In [None]:
# Start from a blank .env file: mode 'w' creates it, or empties it if it already exists
open('.env', 'w').close()

print(".env file created successfully.")

.env file created successfully.


In [None]:
# Add a placeholder entry for the Gemini API key to the .env file.
# The trailing newline terminates the entry, so anything appended afterwards
# (including a re-run of this cell) starts on its own line instead of being
# glued onto the end of this value.
with open('.env', 'a') as f:
    f.write('GOOGLE_API_KEY=""\n')

print("Dummy Gemini API key added to .env file.")

Dummy Gemini API key added to .env file.


In [None]:
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '//content/feisty-outrider-471302-k6-597fcfaf6c32.json'

!pip install google-cloud-speech

In [None]:
!pip install google-cloud-speech youtube_transcript_api

Collecting youtube_transcript_api
  Downloading youtube_transcript_api-1.2.2-py3-none-any.whl.metadata (24 kB)
Downloading youtube_transcript_api-1.2.2-py3-none-any.whl (485 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m485.0/485.0 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: youtube_transcript_api
Successfully installed youtube_transcript_api-1.2.2


In [None]:
!pip install langchain langchain-google-genai langchain-community sentence-transformers faiss-cpu python-dotenv pydantic pytube

Collecting langchain-google-genai
  Downloading langchain_google_genai-2.1.10-py3-none-any.whl.metadata (7.2 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.29-py3-none-any.whl.metadata (2.9 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting filetype<2.0.0,>=1.2.0 (from langchain-google-genai)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting google-ai-generativelanguage<0.7.0,>=0.6.18 (from langchain-google-genai)
  Downloading google_ai_generativelanguage-0.6.18-py3-none-any.whl.metadata (9.8 kB)
Collecting requests<3,>=2 (from langchain)
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting dataclasses-json<0.7,>=0.6.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting marshmallo

In [None]:

!pip install yt-dlp SpeechRecognition pydub
!apt update && apt install -y ffmpeg

Collecting yt-dlp
  Downloading yt_dlp-2025.9.5-py3-none-any.whl.metadata (177 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/177.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m174.1/177.1 kB[0m [31m5.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m177.1/177.1 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting SpeechRecognition
  Downloading speechrecognition-3.14.3-py3-none-any.whl.metadata (30 kB)
Downloading yt_dlp-2025.9.5-py3-none-any.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m44.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading speechrecognition-3.14.3-py3-none-any.whl (32.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m32.9/32.9 MB[0m [31m45.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: yt-dlp, SpeechRecognition
Successfully install

In [None]:

# Cell 1: Audio Upload and Processing System - Core Classes
import os
import hashlib
import json
import logging
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import tempfile
import math

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from pydantic import BaseModel, Field
from dotenv import load_dotenv

# Optional dependencies: each import is attempted once and the outcome is recorded
# in a module-level *_AVAILABLE flag that later code checks before using the feature.

# Audio processing libraries
try:
    from pydub import AudioSegment
    from pydub.utils import which
except ImportError:
    print("⚠️ pydub not installed. Run: pip install pydub")
    AUDIO_PROCESSING_AVAILABLE = False
else:
    AUDIO_PROCESSING_AVAILABLE = True
    print("✅ Audio processing (pydub) available")

# Google Cloud Speech
try:
    from google.cloud import speech
except ImportError:
    print("⚠️ Google Cloud Speech not available. Run: pip install google-cloud-speech")
    GOOGLE_CLOUD_SPEECH_AVAILABLE = False
else:
    GOOGLE_CLOUD_SPEECH_AVAILABLE = True
    print("✅ Google Cloud Speech-to-Text available")

# HuggingFace embeddings
try:
    from langchain_community.embeddings import HuggingFaceEmbeddings
except ImportError:
    print("⚠️ sentence-transformers not installed. Run: pip install sentence-transformers")
    EMBEDDINGS_AVAILABLE = False
else:
    EMBEDDINGS_AVAILABLE = True

# File upload for Colab
try:
    from google.colab import files
except ImportError:
    COLAB_AVAILABLE = False
else:
    COLAB_AVAILABLE = True
    print("✅ Google Colab file upload available")

# Configure logging
# force=True replaces any handlers already attached to the root logger; without it
# basicConfig() does nothing when handlers exist, and the INFO progress messages
# emitted throughout this notebook would not be shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True
)
logger = logging.getLogger(__name__)

# Load environment variables
# '/content/.env' is kept for existing setups; '.env' covers the file the setup
# cells create relative to the working directory. load_dotenv() does not override
# variables that are already set, so the first file found wins.
for env_path in ('/content/.env', '.env'):
    load_dotenv(env_path)

if not os.getenv("GOOGLE_API_KEY"):
    logger.warning("GOOGLE_API_KEY is empty or missing; Gemini requests are expected to fail until it is set")

# Initialize the Gemini LLM
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0.3,
    google_api_key=os.getenv("GOOGLE_API_KEY")
)

# Initialize embeddings
if EMBEDDINGS_AVAILABLE:
    print("🔧 Initializing HuggingFace embeddings...")
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    print("✅ Embeddings initialized successfully!")
else:
    # Downstream code treats None as "no vector search available"
    embeddings = None

# Structured output models
class AudioAnalysis(BaseModel):
    # Structured, file-level description of one audio recording.
    # Every field carries its own human-readable description via Field(description=...).
    # NOTE(review): no use of this model is visible in this part of the notebook;
    # presumably it is meant for a PydanticOutputParser (imported above) - confirm.
    audio_title: str = Field(description="Title or name of the audio file")
    main_topics: List[str] = Field(description="Main topics discussed in the audio")
    content_type: str = Field(description="Type of content (lecture, interview, meeting, etc.)")
    speakers: List[str] = Field(description="Identified speakers or voices")
    audio_quality: str = Field(description="Quality assessment of the audio")

class AudioChunkSummary(BaseModel):
    # Structured summary of a single time-bounded segment of an audio recording.
    # Timestamps are plain strings; no format is enforced by the model itself.
    # NOTE(review): no use of this model is visible in this part of the notebook.
    start_time: str = Field(description="Start timestamp of the chunk")
    end_time: str = Field(description="End timestamp of the chunk")
    key_points: List[str] = Field(description="Key points discussed in this chunk")
    detailed_summary: str = Field(description="Comprehensive summary of the chunk content")
    topics_covered: List[str] = Field(description="Specific topics covered in this chunk")

# Audio utility functions
def format_duration(seconds: float) -> str:
    """Render a duration in seconds as a clock string.

    Args:
        seconds: Length of time in seconds; fractional seconds are truncated.

    Returns:
        "HH:MM:SS" when the duration is at least one hour, otherwise "MM:SS".
        Each component is zero-padded to two digits.
    """
    whole_hours = int(seconds // 3600)
    minutes_and_seconds = f"{int((seconds % 3600) // 60):02d}:{int(seconds % 60):02d}"

    # The hour component is only shown when it is non-zero
    if whole_hours > 0:
        return f"{whole_hours:02d}:{minutes_and_seconds}"
    return minutes_and_seconds

def get_audio_info(audio_path: str) -> Dict:
    """Collect descriptive information about an audio file.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        With pydub available: duration (seconds and formatted), channel count,
        frame rate, sample width and file size in MB.
        Without pydub: file size in MB, with the duration reported as 0 / 'Unknown'.
        On any failure: ``{'error': <message>}`` (the error is logged, not raised).
    """
    try:
        if not AUDIO_PROCESSING_AVAILABLE:
            # Without a decoder only the size on disk can be reported
            size_bytes = os.path.getsize(audio_path)
            return {
                'file_size_mb': size_bytes / (1024 * 1024),
                'duration_seconds': 0,
                'duration_formatted': 'Unknown'
            }

        segment = AudioSegment.from_file(audio_path)
        # len() of a pydub segment is its length in milliseconds
        length_seconds = len(segment) / 1000.0
        return {
            'duration_seconds': length_seconds,
            'duration_formatted': format_duration(length_seconds),
            'channels': segment.channels,
            'frame_rate': segment.frame_rate,
            'sample_width': segment.sample_width,
            'file_size_mb': os.path.getsize(audio_path) / (1024 * 1024)
        }
    except Exception as e:
        logger.error(f"Error getting audio info: {e}")
        return {'error': str(e)}

def get_audio_properties(audio_path: str) -> Dict:
    """Read the technical properties of an audio file.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        A dict with 'sample_rate', 'channels', 'duration_ms' and 'sample_width' when
        the file can be decoded with pydub. When pydub is unavailable or decoding
        fails, only ``{'sample_rate': 16000}`` is returned as a fallback.
    """
    try:
        if not AUDIO_PROCESSING_AVAILABLE:
            return {'sample_rate': 16000}  # fallback

        segment = AudioSegment.from_file(audio_path)
        properties = {
            'sample_rate': segment.frame_rate,
            'channels': segment.channels,
            'duration_ms': len(segment),
            'sample_width': segment.sample_width
        }
        return properties
    except Exception as e:
        logger.error(f"Failed to get audio properties: {e}")
        return {'sample_rate': 16000}  # fallback

# Enhanced Audio Processing Manager
class AudioProcessingManager:
    def __init__(self, embeddings, cache_dir: str = "./audio_cache"):
        self.embeddings = embeddings
        self.cache_dir = cache_dir
        self.audio_stores: Dict[str, FAISS] = {}  # filename -> FAISS store
        self.audio_metadata: Dict[str, Dict] = {}  # filename -> metadata
        self.audio_hashes: Dict[str, str] = {}  # filename -> content hash
        self.merged_store: Optional[FAISS] = None
        self.current_files: List[str] = []

        # Create cache directory
        os.makedirs(cache_dir, exist_ok=True)

        # Load existing cache
        self._load_cache()

    def _load_cache(self):
        """Load cached metadata"""
        cache_file = os.path.join(self.cache_dir, "audio_cache_metadata.json")
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'r') as f:
                    cache_data = json.load(f)
                    self.audio_hashes = cache_data.get('audio_hashes', {})
                    self.audio_metadata = cache_data.get('audio_metadata', {})
                    self.current_files = cache_data.get('current_files', [])
                logger.info(f"📋 Loaded cache with {len(self.audio_hashes)} audio entries")
            except Exception as e:
                logger.warning(f"Failed to load cache: {e}")

    def _save_cache(self):
        """Save cache metadata"""
        cache_file = os.path.join(self.cache_dir, "audio_cache_metadata.json")
        try:
            cache_data = {
                'audio_hashes': self.audio_hashes,
                'audio_metadata': self.audio_metadata,
                'current_files': self.current_files,
                'last_updated': datetime.now().isoformat()
            }
            with open(cache_file, 'w') as f:
                json.dump(cache_data, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to save cache: {e}")

    def _get_safe_filename(self, filename: str) -> str:
        """Generate safe filename from original name"""
        return hashlib.md5(filename.encode()).hexdigest()

    def upload_audio_file(self) -> Optional[str]:
        """Ask the user for an audio file through the Colab upload dialog.

        Returns:
            The name of the first uploaded file, or None when running outside
            Colab, when nothing was uploaded, or when the upload failed.
        """
        if not COLAB_AVAILABLE:
            print("❌ File upload not available outside Google Colab")
            return None

        try:
            print("📁 Please select your audio file (MP3, WAV, M4A, etc.)")
            uploaded = files.upload()
            if not uploaded:
                return None

            # Only the first uploaded file is used
            filename = next(iter(uploaded.keys()))
            logger.info(f"📁 Uploaded file: {filename}")

            # Logged for information only; the result is not returned
            audio_info = get_audio_info(filename)
            logger.info(f"🎵 Audio info: {audio_info}")
        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            return None

        return filename

    def split_large_audio(self, audio_path: str, max_chunk_mb: float = 8.0,
                         chunk_duration_minutes: int = 10) -> List[str]:
        """Prepare an audio file for transcription as one or more mono WAV files.

        Files that exceed ``max_chunk_mb`` or ``chunk_duration_minutes`` are cut into
        roughly equal chunks, each exported as mono WAV at the original sample rate.
        Consecutive chunks overlap by 5 seconds so that words on a boundary are not lost.

        A file within both limits is returned unchanged only if it already is a mono,
        16-bit ``.wav`` file. Anything else is converted to a single mono WAV, because
        the transcription step always declares the audio as LINEAR16.

        Args:
            audio_path: Path of the source audio file.
            max_chunk_mb: Size limit, measured on the source file, above which the
                file is split.
            chunk_duration_minutes: Duration limit above which the file is split.

        Returns:
            Paths of the files to transcribe, in playback order. Generated files are
            named ``<source path without extension>_chunk_NN.wav``.

        Raises:
            Exception: If pydub is not available, or (re-raised after logging) if
                decoding or exporting fails.
        """
        if not AUDIO_PROCESSING_AVAILABLE:
            raise Exception("Audio processing not available")

        try:
            audio = AudioSegment.from_file(audio_path)
            file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
            total_duration_ms = len(audio)
            duration_minutes = total_duration_ms / (1000 * 60)

            logger.info(f"🎵 Audio: {duration_minutes:.1f} min, {file_size_mb:.1f} MB, {audio.frame_rate} Hz")

            base_name = os.path.splitext(audio_path)[0]
            # Export options shared by every generated file: keep the original
            # sample rate (don't force 16kHz) and down-mix to mono
            export_parameters = ["-ar", str(audio.frame_rate), "-ac", "1"]

            # Determine if splitting is needed
            chunk_duration_ms = chunk_duration_minutes * 60 * 1000

            if file_size_mb <= max_chunk_mb and duration_minutes <= chunk_duration_minutes:
                logger.info("📁 File size acceptable, no splitting needed")

                already_mono_pcm16_wav = (
                    audio_path.lower().endswith(".wav")
                    and audio.channels == 1
                    and audio.sample_width == 2
                )
                if already_mono_pcm16_wav:
                    return [audio_path]

                # Compressed or multi-channel input would not match the LINEAR16
                # configuration used for transcription, so convert it first
                converted_filename = f"{base_name}_chunk_01.wav"
                audio.export(converted_filename, format="wav", parameters=export_parameters)
                logger.info(f"✅ Converted to mono WAV: {os.path.basename(converted_filename)}")
                return [converted_filename]

            # Calculate optimal chunk size
            # NOTE(review): the count is derived from the size of the source file;
            # exported WAV chunks can be larger than their share of a compressed source.
            num_chunks = max(
                math.ceil(file_size_mb / max_chunk_mb),
                math.ceil(total_duration_ms / chunk_duration_ms)
            )

            chunk_size_ms = total_duration_ms // num_chunks
            overlap_ms = 5000  # 5 second overlap

            logger.info(f"🔄 Splitting into {num_chunks} chunks of ~{chunk_size_ms/60000:.1f} minutes each")

            chunks = []

            for i in range(num_chunks):
                # Only the start of a chunk reaches back into its predecessor, so
                # neighbours share exactly overlap_ms of audio
                start_ms = max(0, i * chunk_size_ms - overlap_ms) if i > 0 else 0
                # The last chunk absorbs the remainder left by the integer division
                is_last_chunk = i == num_chunks - 1
                end_ms = total_duration_ms if is_last_chunk else (i + 1) * chunk_size_ms

                chunk = audio[start_ms:end_ms]
                chunk_filename = f"{base_name}_chunk_{i+1:02d}.wav"

                chunk.export(chunk_filename, format="wav", parameters=export_parameters)
                chunks.append(chunk_filename)

                logger.info(f"✅ Created chunk {i+1}/{num_chunks}: {format_duration(start_ms/1000)} - {format_duration(end_ms/1000)}")

            return chunks

        except Exception as e:
            logger.error(f"❌ Audio splitting failed: {e}")
            raise

    def transcribe_audio_chunks(self, audio_chunks: List[str], file_identifier: str) -> Tuple[List[Document], str]:
        """Transcribe audio files with Google Cloud Speech-to-Text and join the results.

        Each file is sent as LINEAR16 audio at its detected sample rate. A chunk that
        fails is logged and skipped; the remaining chunks are still used.

        Args:
            audio_chunks: Paths of the audio files to transcribe, in playback order.
            file_identifier: Name of the original audio file; stored as the document
                source and used to protect the original from cleanup.

        Returns:
            ``([document], content_hash)`` where the document holds the combined
            transcript and ``content_hash`` is its MD5 digest. If the Speech client is
            unavailable or no chunk could be transcribed, a placeholder sample
            document is returned with the hash ``"sample"``.

        Side effects:
            Every file in ``audio_chunks`` whose base name differs from that of
            ``file_identifier`` is deleted once transcription has finished,
            whether or not it succeeded.
        """
        if not GOOGLE_CLOUD_SPEECH_AVAILABLE:
            return self._create_sample_documents(file_identifier), "sample"

        try:
            logger.info(f"🎤 Transcribing {len(audio_chunks)} audio chunks...")

            client = speech.SpeechClient()
            all_transcripts = []

            for i, chunk_path in enumerate(audio_chunks):
                logger.info(f"🎤 Processing chunk {i+1}/{len(audio_chunks)}: {os.path.basename(chunk_path)}")

                try:
                    # Get audio properties for this chunk
                    audio_props = get_audio_properties(chunk_path)
                    sample_rate = audio_props['sample_rate']

                    logger.info(f"🔊 Detected sample rate: {sample_rate} Hz")

                    with open(chunk_path, 'rb') as audio_file:
                        content = audio_file.read()

                    audio = speech.RecognitionAudio(content=content)
                    config = speech.RecognitionConfig(
                        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                        sample_rate_hertz=sample_rate,  # Use detected sample rate
                        language_code="en-US",
                        enable_automatic_punctuation=True,
                        enable_word_time_offsets=False,
                    )

                    # Synchronous recognition is limited to short audio, so longer or
                    # larger chunks go through the long-running API.
                    # NOTE(review): the service also limits inline (non-URI) audio;
                    # confirm that the configured chunk length stays within that limit.
                    file_size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
                    # 'duration_ms' is absent from the fallback properties
                    duration_ms = audio_props.get('duration_ms', 0)

                    if file_size_mb > 5 or duration_ms > 60_000:
                        operation = client.long_running_recognize(config=config, audio=audio)
                        response = operation.result(timeout=300)
                    else:
                        response = client.recognize(config=config, audio=audio)

                    # Take the top alternative of every result; a result without
                    # alternatives is skipped instead of failing the whole chunk
                    chunk_transcript = " ".join(
                        result.alternatives[0].transcript
                        for result in response.results
                        if result.alternatives
                    ).strip()

                    if chunk_transcript:
                        all_transcripts.append({
                            'chunk_index': i + 1,
                            'transcript': chunk_transcript,
                            'file_path': chunk_path,
                            'sample_rate': sample_rate
                        })
                        logger.info(f"✅ Chunk {i+1} transcribed: {len(chunk_transcript)} characters")
                    else:
                        logger.warning(f"⚠️ Chunk {i+1} produced no transcript")

                except Exception as e:
                    logger.error(f"❌ Failed to transcribe chunk {i+1}: {e}")
                    continue

            if not all_transcripts:
                raise Exception("No successful transcriptions from any chunks")

            # Combine all transcripts
            full_transcript = " ".join([t['transcript'] for t in all_transcripts])

            # Create document
            doc = Document(
                page_content=full_transcript,
                metadata={
                    'source': file_identifier,
                    'total_chunks': len(audio_chunks),
                    'successful_chunks': len(all_transcripts),
                    'transcription_method': 'google_cloud_speech_chunks',
                    'language': 'en',
                    'transcript_length': len(full_transcript),
                    'processed_at': datetime.now().isoformat(),
                    'sample_rate': all_transcripts[0]['sample_rate']
                }
            )

            content_hash = hashlib.md5(full_transcript.encode()).hexdigest()

            logger.info(f"✅ Full transcription completed: {len(full_transcript)} characters from {len(all_transcripts)} chunks")

            return [doc], content_hash

        except Exception as e:
            logger.error(f"❌ Audio transcription failed: {e}")
            return self._create_sample_documents(file_identifier), "sample"

        finally:
            # Cleanup chunk files on success and on failure. Base names are compared
            # because file_identifier is a bare file name while chunk paths may carry
            # a directory; comparing them directly would delete the original file.
            original_name = os.path.basename(file_identifier)
            for chunk_path in audio_chunks:
                if os.path.basename(chunk_path) == original_name:
                    continue  # Don't delete original file
                try:
                    os.remove(chunk_path)
                except FileNotFoundError:
                    pass  # already gone
                except OSError as cleanup_error:
                    logger.warning(f"⚠️ Could not remove chunk file {chunk_path}: {cleanup_error}")

    def _create_sample_documents(self, source_file: str) -> List[Document]:
        """Build a one-element list holding a fixed placeholder transcript.

        Used as the fallback when real transcription is unavailable or has failed.

        Args:
            source_file: Value stored as the document's 'source' metadata.
        """
        sample_content = """
        This is a sample transcription of an audio file about machine learning and artificial intelligence.
        The speaker discusses the fundamentals of neural networks and their applications in modern technology.
        Key topics include deep learning, data preprocessing, model training, and real-world applications.
        The discussion covers both theoretical concepts and practical implementation strategies.
        """

        placeholder_metadata = {
            'source': source_file,
            'title': 'Sample Audio Transcription',
            'transcription_method': 'sample',
            'duration': 300,
            'processed_at': datetime.now().isoformat()
        }
        return [Document(page_content=sample_content, metadata=placeholder_metadata)]

    def create_text_chunks(self, documents: List[Document], chunk_size: int = 1500) -> List[Document]:
        """Split transcribed documents into overlapping text chunks.

        Args:
            documents: Documents to split.
            chunk_size: Maximum number of characters per chunk; consecutive chunks
                overlap by 200 characters.

        Returns:
            All chunks of all documents in order. Each chunk's metadata is extended
            with 'chunk_index', 'total_chunks' (both relative to its own source
            document) and 'chunk_size' (its length in characters).
        """
        all_chunks = []

        for source_doc in documents:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=200,
                length_function=len,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            pieces = splitter.split_documents([source_doc])
            piece_count = len(pieces)

            for position, piece in enumerate(pieces):
                piece.metadata.update({
                    'chunk_index': position,
                    'total_chunks': piece_count,
                    'chunk_size': len(piece.page_content)
                })

            all_chunks.extend(pieces)

        logger.info(f"📊 Created {len(all_chunks)} text chunks")
        return all_chunks

print("✅ Cell 1 Complete: Audio upload and processing system loaded!")

✅ Audio processing (pydub) available
✅ Google Cloud Speech-to-Text available
✅ Google Colab file upload available
🔧 Initializing HuggingFace embeddings...
✅ Embeddings initialized successfully!
✅ Cell 1 Complete: Audio upload and processing system loaded!


In [None]:

# Cell 2: AudioProcessingManager Methods

def create_comprehensive_summary(self, file_identifier: str, documents: List[Document]) -> Dict:
    """Summarise each text chunk with the LLM, then combine the results into a master summary.

    Args:
        file_identifier: Name of the audio file; used for logging only.
        documents: Text chunks to summarise, in order.

    Returns:
        A dict with:
          'chunk_summaries'      - one entry per chunk (start/end label, summary text,
                                   1-based chunk index, length of the chunk content)
          'master_summary'       - the combined summary text
          'total_chunks'         - number of chunk entries
          'total_content_length' - sum of all chunk content lengths
        A chunk that cannot be summarised gets a placeholder entry, and a failed
        master summary is replaced by a fixed failure message; neither raises.
    """
    logger.info(f"📝 Creating comprehensive summary for {file_identifier}")

    # Pass 1: one LLM call per chunk
    chunk_summaries = []
    for position, doc in enumerate(documents, start=1):
        try:
            # Chunks without timestamps are labelled by their position instead
            start_time = doc.metadata.get('start_time', f"Segment {position}")
            end_time = doc.metadata.get('end_time', f"Segment {position}")

            chunk_prompt = ChatPromptTemplate.from_template(
                """Provide a comprehensive summary of this audio segment. Include:
                1. Key points discussed
                2. Important details and context
                3. Any specific examples or explanations given
                4. Concepts or topics introduced

                Segment: {start_time} - {end_time}
                Content: {content}

                Create a detailed summary that preserves all important information:"""
            )

            rendered_prompt = chunk_prompt.format_prompt(
                start_time=start_time,
                end_time=end_time,
                content=doc.page_content
            ).to_string()
            summary_response = llm.invoke(rendered_prompt)

            chunk_summaries.append({
                'start_time': start_time,
                'end_time': end_time,
                'summary': summary_response.content,
                'chunk_index': position,
                'content_length': len(doc.page_content)
            })

            logger.info(f"✅ Completed chunk summary {position}/{len(documents)}")

        except Exception as e:
            logger.error(f"❌ Failed to summarize chunk {position}: {e}")
            # Keep a placeholder so the chunk numbering stays continuous
            chunk_summaries.append({
                'start_time': f"Segment {position}",
                'end_time': f"Segment {position}",
                'summary': f"Summary unavailable for this segment (Segment {position})",
                'chunk_index': position,
                'content_length': len(doc.page_content) if hasattr(doc, 'page_content') else 0
            })

    # Pass 2: a single LLM call over all chunk summaries
    master_failed = False
    try:
        segment_sections = [
            f"**Segment {summary['chunk_index']} ({summary['start_time']} - {summary['end_time']}):**\n{summary['summary']}"
            for summary in chunk_summaries
        ]
        all_summaries = "\n\n".join(segment_sections)

        master_prompt = ChatPromptTemplate.from_template(
            """Based on these detailed segment summaries, create a comprehensive master summary that:

            1. **OVERVIEW**: Provide a high-level overview of the entire audio content
            2. **DETAILED CHRONOLOGICAL SUMMARY**: Create a detailed walkthrough preserving all important information
            3. **KEY INSIGHTS**: Extract the most important insights and takeaways
            4. **TOPICS BREAKDOWN**: Organize content by major topics/themes
            5. **CONCLUSION**: Summarize final conclusions and implications

            **IMPORTANT**: Do not lose any important context or details. Be comprehensive rather than brief.

            Segment Summaries:
            {all_summaries}

            Comprehensive Master Summary:"""
        )

        master_response = llm.invoke(
            master_prompt.format_prompt(all_summaries=all_summaries).to_string()
        )

        logger.info("✅ Created master comprehensive summary")

    except Exception as e:
        logger.error(f"❌ Failed to create master summary: {e}")
        master_failed = True

    if master_failed:
        master_summary = "Master summary generation failed"
    else:
        master_summary = master_response.content

    return {
        'chunk_summaries': chunk_summaries,
        'master_summary': master_summary,
        'total_chunks': len(chunk_summaries),
        'total_content_length': sum(entry['content_length'] for entry in chunk_summaries)
    }

def add_or_update_audio(self, file_path: str) -> bool:
    """Process an audio file end to end and register it with the manager.

    Steps: hash the file and stop early if it was already processed, split it,
    transcribe it, cut the transcript into text chunks, summarise them, index the
    summaries in a vector store (when embeddings are configured) and persist the
    metadata cache.

    Args:
        file_path: Path of the audio file to process.

    Returns:
        True if the file was processed, or was unchanged and already has cached
        summaries. False if the file is missing or a processing step produced nothing.
    """
    logger.info(f"\n🔄 Processing audio file: {file_path}")

    if not os.path.exists(file_path):
        logger.error("❌ File not found")
        return False

    # Get file info
    audio_info = get_audio_info(file_path)
    file_identifier = os.path.basename(file_path)

    # Hash the file in fixed-size blocks inside a context manager, so the handle is
    # always closed and large recordings are never held in memory as a whole
    hasher = hashlib.md5()
    with open(file_path, 'rb') as audio_file:
        for block in iter(lambda: audio_file.read(1024 * 1024), b''):
            hasher.update(block)
    file_hash = hasher.hexdigest()

    # Check if file changed. The shortcut is only valid when the summaries for this
    # file are still present; a matching hash alone gives callers nothing to use.
    if (self.audio_hashes.get(file_identifier) == file_hash
            and file_identifier in self.audio_metadata):
        logger.info("📋 File unchanged, using cached summaries")
        return True

    # Split large audio files
    try:
        audio_chunks = self.split_large_audio(file_path)
    except Exception as e:
        logger.error(f"❌ Audio splitting failed: {e}")
        return False

    # Transcribe audio chunks
    documents, content_hash = self.transcribe_audio_chunks(audio_chunks, file_identifier)

    if not documents:
        logger.error("❌ No transcription available")
        return False

    if content_hash == "sample":
        # The transcriber reports "sample" when it fell back to placeholder text
        logger.warning("⚠️ Transcription unavailable; summaries will be based on the placeholder sample transcript")

    # Create text chunks
    chunks = self.create_text_chunks(documents)

    if not chunks:
        logger.error("❌ No chunks created")
        return False

    # Add metadata to chunks
    for doc in chunks:
        doc.metadata.update({
            'source_file': file_path,
            'file_identifier': file_identifier,
            'content_hash': content_hash,
            'processed_at': datetime.now().isoformat(),
            'audio_info': audio_info
        })

    # Create comprehensive summary
    summary_data = self.create_comprehensive_summary(file_identifier, chunks)

    # Create vector store if embeddings available
    if self.embeddings:
        try:
            # Create documents with summaries for vector search
            summary_documents = [
                Document(
                    page_content=chunk_summary['summary'],
                    metadata={
                        'source_file': file_path,
                        'file_identifier': file_identifier,
                        'start_time': chunk_summary['start_time'],
                        'end_time': chunk_summary['end_time'],
                        'content_type': 'chunk_summary'
                    }
                )
                for chunk_summary in summary_data['chunk_summaries']
            ]

            # Add master summary
            summary_documents.append(Document(
                page_content=summary_data['master_summary'],
                metadata={
                    'source_file': file_path,
                    'file_identifier': file_identifier,
                    'content_type': 'master_summary'
                }
            ))

            # Create vector store
            vector_store = FAISS.from_documents(summary_documents, self.embeddings)
            self.audio_stores[file_identifier] = vector_store

            # Rebuild merged store
            self._rebuild_merged_store()

            logger.info("✅ Created vector store for audio summaries")

        except Exception as e:
            # Vector search is optional; the summaries remain available without it
            logger.warning(f"⚠️ Vector store creation failed: {e}")

    # Store metadata
    self.audio_metadata[file_identifier] = {
        'file_path': file_path,
        'chunks_count': len(chunks),
        'summary_data': summary_data,
        'last_updated': datetime.now().isoformat(),
        'content_hash': content_hash,
        'audio_info': audio_info
    }

    # Update tracking
    self.audio_hashes[file_identifier] = file_hash
    if file_identifier not in self.current_files:
        self.current_files.append(file_identifier)

    self._save_cache()

    logger.info(f"✅ Successfully processed audio file: {file_identifier}")
    return True

def _rebuild_merged_store(self):
    """Rebuild the combined vector store from copies of all per-file stores.

    With no per-file stores the merged store is cleared. Otherwise every store is
    copied (through its byte serialisation) before merging, so the per-file stores
    in ``self.audio_stores`` are never modified. Merging the originals directly
    would add the other files' documents to the first store, and a later rebuild
    would try to add the same document ids to it again.
    """
    if not self.audio_stores:
        self.merged_store = None
        return

    logger.info("🔗 Rebuilding merged vector store...")

    def _copy_store(store):
        # The serialised bytes come from a store created in this process, so
        # loading them back is safe
        return FAISS.deserialize_from_bytes(
            store.serialize_to_bytes(),
            self.embeddings,
            allow_dangerous_deserialization=True
        )

    stores = list(self.audio_stores.values())
    merged = _copy_store(stores[0])

    for store in stores[1:]:
        # NOTE(review): the underlying index merge may empty its source, so a copy
        # is handed over here as well - confirm against the faiss documentation
        merged.merge_from(_copy_store(store))

    self.merged_store = merged

    # Count the stores actually merged; current_files may not yet include the file
    # that triggered this rebuild
    logger.info(f"✅ Merged store ready with {len(stores)} audio sources")

def get_retriever(self, k: int = 5):
    """Return a retriever over the merged vector store.

    Args:
        k: Number of documents the retriever should return per query.

    Raises:
        ValueError: If no merged vector store has been built yet.
    """
    store = self.merged_store
    if store is not None:
        return store.as_retriever(search_kwargs={"k": k})
    raise ValueError("No vector store available. Add audio files first.")

def get_memory_info(self) -> Dict:
    """Summarise what the manager currently holds.

    Returns:
        A dict with the tracked file names ('active_files'), the number of per-file
        vector stores ('total_files'), the summed audio duration in minutes rounded
        to two decimals, the summed number of summarised chunks, and references to
        the metadata and hash registries themselves.
    """
    entries = list(self.audio_metadata.values())

    # Entries without the relevant section simply contribute nothing
    total_duration = sum(
        entry['audio_info'].get('duration_seconds', 0)
        for entry in entries
        if 'audio_info' in entry
    )
    total_chunks = sum(
        entry['summary_data'].get('total_chunks', 0)
        for entry in entries
        if 'summary_data' in entry
    )

    return {
        'active_files': self.current_files,
        'total_files': len(self.audio_stores),
        'total_duration_minutes': round(total_duration / 60, 2),
        'total_chunks': total_chunks,
        'metadata': self.audio_metadata,
        'file_hashes': self.audio_hashes
    }

def remove_audio(self, file_identifier: str) -> bool:
    """Forget an audio file and rebuild the merged store and the cache.

    The file is removed from every registry that knows it. A file that has metadata
    or a cached hash but no vector store (for example when no embeddings are
    configured) is therefore removed as well.

    Args:
        file_identifier: Name of the audio file to remove.

    Returns:
        True if the file was known in any registry and has been removed,
        False if it was unknown.
    """
    is_known = (
        file_identifier in self.audio_stores
        or file_identifier in self.audio_metadata
        or file_identifier in self.audio_hashes
        or file_identifier in self.current_files
    )
    if not is_known:
        return False

    # pop() with a default tolerates registries that never held this file
    self.audio_stores.pop(file_identifier, None)
    self.audio_metadata.pop(file_identifier, None)
    self.audio_hashes.pop(file_identifier, None)
    if file_identifier in self.current_files:
        self.current_files.remove(file_identifier)

    self._rebuild_merged_store()
    self._save_cache()
    logger.info(f"🗑️ Removed {file_identifier} from memory")
    return True

def get_audio_summary(self, file_identifier: str) -> Optional[Dict]:
    """Look up the stored summary data of one audio file.

    Returns:
        The 'summary_data' entry of the file's metadata, or None when the file is
        unknown or has no summary data.
    """
    try:
        file_metadata = self.audio_metadata[file_identifier]
    except KeyError:
        return None
    return file_metadata.get('summary_data')

# Add all methods to the AudioProcessingManager class
AudioProcessingManager.create_comprehensive_summary = create_comprehensive_summary
AudioProcessingManager.add_or_update_audio = add_or_update_audio
AudioProcessingManager._rebuild_merged_store = _rebuild_merged_store
AudioProcessingManager.get_retriever = get_retriever
AudioProcessingManager.get_memory_info = get_memory_info
AudioProcessingManager.remove_audio = remove_audio
AudioProcessingManager.get_audio_summary = get_audio_summary
# get_audio_properties takes only `audio_path` (no `self`). Attached as a plain
# function, a call on an instance would pass the instance as `audio_path`;
# staticmethod keeps both instance and class calls working.
AudioProcessingManager.get_audio_properties = staticmethod(get_audio_properties)

# Initialize the audio processing manager (None means "no vector search")
audio_manager = AudioProcessingManager(embeddings if embeddings else None)

print("✅ Cell 2 Complete: AudioProcessingManager methods added!")

✅ Cell 2 Complete: AudioProcessingManager methods added!


In [None]:

# Cell 3: Interactive Audio Processing System

def audio_qa_pipeline(question: str):
    """Answer a question about the loaded audio files and print the result.

    When the manager has both embeddings and a merged store, the retriever
    (``get_retriever(k=5)``) supplies the context. Otherwise the master
    summary of every active file is used as context.

    Args:
        question: Natural-language question about the loaded audio.

    Returns:
        The answer text returned by the LLM, or None when no files are loaded,
        no context could be gathered, or an error occurred.
    """
    if not audio_manager.current_files:
        print("❌ No audio files loaded. Please upload audio files first.")
        return None

    print(f"❓ Question: {question}")
    print("="*60)

    try:
        sections = []

        if audio_manager.embeddings and audio_manager.merged_store:
            # Use vector search
            retriever = audio_manager.get_retriever(k=5)
            retrieved_docs = retriever.invoke(question)

            for doc in retrieved_docs:
                source = doc.metadata.get('file_identifier', 'Unknown')
                content_type = doc.metadata.get('content_type', 'unknown')

                if content_type == 'chunk_summary':
                    time_info = f"[{doc.metadata.get('start_time', 'Unknown')} - {doc.metadata.get('end_time', 'Unknown')}]"
                    sections.append(f"\n**Audio: {source} {time_info}**\n{doc.page_content}\n")
                else:
                    sections.append(f"\n**Audio: {source} (Master Summary)**\n{doc.page_content}\n")

            context = "".join(sections)

            if context.strip():
                print("📄 Retrieved Context Preview:")
                preview = context[:500] + "..." if len(context) > 500 else context
                print(preview)
                print("\n" + "="*60 + "\n")

        else:
            # Fallback: use all summaries. Memory info is only needed on this path.
            memory_info = audio_manager.get_memory_info()
            for file_id in memory_info['active_files']:
                summary_data = audio_manager.get_audio_summary(file_id)
                master_summary = summary_data.get('master_summary') if summary_data else None
                # Skip files without a usable summary instead of failing the whole answer.
                if master_summary:
                    sections.append(f"\n**Audio: {file_id}**\n{master_summary}\n")

            context = "".join(sections)

        # Without any context the model could only guess, so stop here rather
        # than presenting an ungrounded answer as if it came from the audio.
        if not context.strip():
            print("❌ No audio content available to answer this question.")
            return None

        # Generate answer
        answer_prompt = ChatPromptTemplate.from_template(
            """Answer the following question based on the audio content provided:

            Context from Audio Files:
            {context}

            Question: {question}

            Provide a comprehensive answer that:
            1. Directly addresses the question
            2. References specific audio files and segments when relevant
            3. Synthesizes information from multiple sources if applicable
            4. Includes relevant details and context

            Answer:"""
        )

        final_answer = llm.invoke(
            answer_prompt.format_prompt(
                context=context,
                question=question
            ).to_string()
        ).content

        print("💡 Answer:")
        print(final_answer)
        print("\n" + "="*80 + "\n")

        return final_answer

    except Exception as e:
        print(f"❌ Error during Q&A: {e}")
        return None

def display_audio_summary(file_identifier: str):
    """Print the stored summary report for one audio file.

    The report shows the summary statistics, the master summary and every
    chunk summary with its time range. A short notice is printed instead when
    the manager has no summary for the file.
    """
    report = audio_manager.get_audio_summary(file_identifier)
    if not report:
        print(f"❌ No summary available for {file_identifier}")
        return

    heavy_rule = "=" * 80
    light_rule = "-" * 40

    print(f"\n🎵 COMPREHENSIVE SUMMARY: {file_identifier}")
    print(heavy_rule)

    print("📊 SUMMARY STATISTICS:")
    print(f"  Total Chunks: {report['total_chunks']}")
    print(f"  Total Content Length: {report['total_content_length']} characters")

    print("\n📝 MASTER SUMMARY:")
    print(light_rule)
    print(report['master_summary'])

    segments = report['chunk_summaries']
    print(f"\n🕐 DETAILED SEGMENTS ({len(segments)} segments):")
    print(light_rule)
    for position, segment in enumerate(segments):
        # A dotted divider goes between segments, never after the last one.
        if position:
            print("\n" + "·" * 40)
        print(f"\n⏰ {segment['start_time']} - {segment['end_time']}:")
        print(segment['summary'])

    print("\n" + heavy_rule + "\n")

def compare_audio_files(topic: str):
    """Ask the LLM to compare how the loaded audio files discuss a topic.

    The master summary of every file in ``audio_manager.current_files`` that
    has one is collected into a single context, which is sent to the LLM with
    the topic. The analysis is printed; nothing is returned.

    Args:
        topic: Subject the comparison should focus on.
    """
    if not audio_manager.current_files:
        print("❌ No audio files loaded for comparison")
        return

    print(f"🔍 Comparing audio files on topic: {topic}")
    print("="*60)

    sections = []
    for file_id in audio_manager.current_files:
        summary_data = audio_manager.get_audio_summary(file_id)
        master_summary = summary_data.get('master_summary') if summary_data else None
        # Files without a usable master summary contribute nothing to compare.
        if not master_summary:
            continue
        sections.append(
            f"\n**Audio File: {file_id}**\n"
            f"Summary: {master_summary}\n"
            + "-" * 40 + "\n"
        )

    # With no summaries the model would have nothing to compare and could only
    # invent content, so report the situation instead of calling it.
    if not sections:
        print("❌ No summaries available to compare")
        return

    comparison_context = "".join(sections)

    try:
        comparison_prompt = ChatPromptTemplate.from_template(
            """Compare how these different audio files discuss the topic "{topic}":

            {comparison_context}

            Provide a comprehensive comparison that:
            1. Identifies common themes and approaches
            2. Highlights unique perspectives from each audio file
            3. Notes any contradictions or different viewpoints
            4. Synthesizes the information into key insights
            5. Mentions which files provide the most depth on specific aspects

            Comparison Analysis:"""
        )

        comparison_response = llm.invoke(
            comparison_prompt.format_prompt(
                topic=topic,
                comparison_context=comparison_context
            ).to_string()
        )

        print("📊 Audio File Comparison Analysis:")
        print(comparison_response.content)
        print("\n" + "="*80 + "\n")

    except Exception as e:
        print(f"❌ Error during comparison: {e}")

def export_summaries():
    """Export the metadata and summary data of all loaded files to a JSON file.

    The file is written to the current working directory as
    ``audio_summaries_<YYYYmmdd_HHMMSS>.json``. Failures are reported on
    stdout rather than raised. Nothing is returned.
    """
    if not audio_manager.current_files:
        print("❌ No audio files to export")
        return

    try:
        # One timestamp for both the payload and the filename keeps them in sync.
        exported_at = datetime.now()

        export_data = {
            'export_timestamp': exported_at.isoformat(),
            'total_files': len(audio_manager.current_files),
            'audio_files': {
                file_id: {
                    'metadata': audio_manager.audio_metadata.get(file_id, {}),
                    'summary_data': audio_manager.get_audio_summary(file_id),
                }
                for file_id in audio_manager.current_files
            },
        }

        # Serialize fully before touching the disk so a serialization error
        # cannot leave a truncated, invalid JSON file behind. `default=str`
        # stringifies values JSON has no native form for instead of aborting
        # the whole export.
        serialized = json.dumps(export_data, indent=2, ensure_ascii=False, default=str)

        export_filename = f"audio_summaries_{exported_at.strftime('%Y%m%d_%H%M%S')}.json"

        with open(export_filename, 'w', encoding='utf-8') as f:
            f.write(serialized)

        print(f"✅ Exported summaries to: {export_filename}")

    except Exception as e:
        print(f"❌ Export failed: {e}")

def show_system_stats():
    """Print overall statistics for the audio manager plus a per-file breakdown."""

    def availability(flag) -> str:
        # Human-readable label for an availability flag.
        return 'Available' if flag else 'Not Available'

    stats = audio_manager.get_memory_info()

    print("📊 SYSTEM STATISTICS")
    print("="*50)
    print(f"🎵 Total Audio Files Loaded: {len(stats['active_files'])}")
    print(f"⏱️  Total Content Duration: {stats['total_duration_minutes']:.1f} minutes")
    print(f"📝 Total Summary Chunks: {stats['total_chunks']}")
    print(f"💾 Cache Directory: {audio_manager.cache_dir}")

    if not audio_manager.embeddings:
        print("🔍 Vector Search: Disabled (embeddings not available)")
    else:
        print("🔍 Vector Search: Enabled")
        print(f"📊 Vector Stores: {len(audio_manager.audio_stores)}")

    print(f"🗄️  Audio Processing: {availability(AUDIO_PROCESSING_AVAILABLE)}")
    print(f"🎤 Google Cloud Speech: {availability(GOOGLE_CLOUD_SPEECH_AVAILABLE)}")

    print("\n📈 PER-FILE BREAKDOWN:")
    print("-" * 30)

    for file_id in stats['active_files']:
        # Files without a metadata record fall back to empty dicts / zeros.
        record = stats['metadata'].get(file_id, {})
        summary_block = record.get('summary_data', {})
        audio_props = record.get('audio_info', {})

        minutes = audio_props.get('duration_minutes', 0)
        chunk_total = summary_block.get('total_chunks', 0)

        print(f"🎵 {file_id}")
        print(f"   Duration: {minutes:.1f} min | Chunks: {chunk_total}")
        print(f"   Size: {audio_props.get('file_size_mb', 0):.1f} MB")

# Interactive system
def _print_audio_system_commands():
    """Print the command reference shown at startup and by the `help` command."""
    print("📋 BASIC COMMANDS:")
    print("  upload                - Upload new audio file for processing")
    print("  process <filename>    - Process uploaded audio file")
    print("  add <filename>        - Same as process")
    print("  remove <filename>     - Remove audio file from memory")
    print("  summary <filename>    - Show detailed summary of specific file")
    print("  summaries            - Show summaries of all loaded files")
    print("\n🔍 ANALYSIS COMMANDS:")
    print("  memory               - Show memory status and statistics")
    print("  sources              - List active audio sources")
    print("  search <query>       - Search across all audio summaries")
    print("  compare <topic>      - Compare how files discuss a topic")
    print("  stats                - Show detailed system statistics")
    print("\n💾 UTILITY COMMANDS:")
    print("  export               - Export all summaries to JSON file")
    print("  clear                - Clear all loaded files")
    print("  help                 - Show this help message")
    print("  quit                 - Exit system")
    print("\n❓ QUESTIONS:")
    print("  <question>           - Ask any question about loaded audio files")
    print("="*80)


def _process_audio_path(file_path: str, not_found_message: str) -> None:
    """Process `file_path` with the audio manager if it exists on disk and report the outcome."""
    if file_path and os.path.exists(file_path):
        if audio_manager.add_or_update_audio(file_path):
            print(f"✅ Successfully processed: {file_path}")
        else:
            print(f"❌ Failed to process: {file_path}")
    else:
        print(not_found_message)


def run_audio_processing_system():
    """Run the interactive command loop for audio processing and Q&A.

    Reads one line at a time with ``input()`` and dispatches it:

    * Keyword commands without arguments (``quit``, ``help``, ``upload``,
      ``summaries``, ``memory``, ``sources``, ``stats``, ``export``,
      ``clear``) are matched case-insensitively against the whole line.
    * Commands with an argument (``add``, ``process``, ``remove``,
      ``summary``, ``search``, ``compare``) are matched on the exact
      lower-case first word; a missing argument produces a usage message.
    * Any other non-empty input is treated as a question about the loaded
      audio files.

    The loop ends on ``quit``/``exit``/``q``, or when input is interrupted
    (``KeyboardInterrupt``) or exhausted (``EOFError``).
    """
    print("🚀 Audio Processing & Q&A System")
    print("="*80)
    _print_audio_system_commands()

    # Check system requirements
    if not AUDIO_PROCESSING_AVAILABLE:
        print("\n⚠️  WARNING: pydub not available. Audio splitting disabled.")
    if not GOOGLE_CLOUD_SPEECH_AVAILABLE:
        print("⚠️  WARNING: Google Cloud Speech not available. Using sample data.")

    print(f"\n🏠 Cache Directory: {audio_manager.cache_dir}")
    print("💡 TIP: Start by uploading an audio file with: upload")

    try:
        while True:
            user_input = input("\n🎯 Enter command or question: ").strip()
            lowered = user_input.lower()
            # Split off the first word so that a bare command such as "remove"
            # is recognised as a command missing its argument rather than
            # being forwarded to the LLM as a question.
            command, _, argument = user_input.partition(' ')
            argument = argument.strip()

            if lowered in ('quit', 'exit', 'q'):
                print("👋 Thanks for using the Audio Processing Q&A system!")
                break

            elif lowered in ('help', 'h'):
                _print_audio_system_commands()

            elif lowered == 'upload':
                file_path = input("📁 Enter the path to your audio file: ").strip()
                _process_audio_path(file_path, "❌ File not found or no path provided")

            elif command == 'add':
                _process_audio_path(argument, "❌ File not found or invalid path")

            elif command == 'process':
                _process_audio_path(argument, "❌ File not found or no filename provided")

            elif command == 'remove':
                if argument:
                    if audio_manager.remove_audio(argument):
                        print(f"✅ Successfully removed: {argument}")
                    else:
                        print(f"❌ File not found: {argument}")
                else:
                    print("❌ Please provide a filename")

            elif command == 'summary':
                if argument:
                    display_audio_summary(argument)
                else:
                    print("❌ Please provide a filename")

            elif lowered == 'summaries':
                info = audio_manager.get_memory_info()
                if info['active_files']:
                    for file_id in info['active_files']:
                        display_audio_summary(file_id)
                else:
                    print("❌ No audio files loaded. Upload files first with 'upload'")

            elif lowered == 'memory':
                info = audio_manager.get_memory_info()
                print("🧠 Memory Status:")
                print(f"  Active Files: {len(info['active_files'])}")
                print(f"  Total Duration: {info['total_duration_minutes']} minutes")
                print(f"  Total Chunks: {info['total_chunks']}")

                if info['active_files']:
                    print("\n🎵 Loaded Audio Files:")
                    for file_id, metadata in info['metadata'].items():
                        chunks = metadata.get('chunks_count', 0)
                        audio_info = metadata.get('audio_info', {})
                        duration = audio_info.get('duration_formatted', 'Unknown')
                        print(f"    🎵 {file_id}")
                        print(f"        Duration: {duration} | Chunks: {chunks}")

            elif lowered == 'sources':
                info = audio_manager.get_memory_info()
                if info['active_files']:
                    print("🎵 Active Audio Sources:")
                    for i, file_id in enumerate(info['active_files'], 1):
                        metadata = info['metadata'].get(file_id, {})
                        audio_info = metadata.get('audio_info', {})
                        duration = audio_info.get('duration_formatted', 'Unknown')
                        size = audio_info.get('file_size_mb', 0)
                        print(f"  {i}. {file_id}")
                        print(f"     Duration: {duration} | Size: {size:.1f} MB")
                else:
                    print("❌ No audio files loaded. Upload files first with 'upload'")

            elif command == 'search':
                if argument:
                    audio_qa_pipeline(argument)
                else:
                    print("❌ Please provide a search query")

            elif command == 'compare':
                if argument:
                    compare_audio_files(argument)
                else:
                    print("❌ Please provide a topic to compare")

            elif lowered == 'stats':
                show_system_stats()

            elif lowered == 'export':
                export_summaries()

            elif lowered == 'clear':
                confirm = input("⚠️  Are you sure you want to clear all audio files? (yes/no): ")
                if confirm.strip().lower() in ('yes', 'y'):
                    # Iterate over a copy: remove_audio mutates current_files.
                    not_removed = [
                        file_id for file_id in list(audio_manager.current_files)
                        if not audio_manager.remove_audio(file_id)
                    ]
                    if not_removed:
                        print(f"⚠️  Could not remove: {', '.join(map(str, not_removed))}")
                    else:
                        print("✅ All audio files cleared from memory")
                else:
                    print("❌ Clear operation cancelled")

            elif user_input:
                try:
                    if audio_manager.current_files:
                        audio_qa_pipeline(user_input)
                    else:
                        print("❌ No audio files loaded. Please upload audio files first with 'upload'")
                except Exception as e:
                    print(f"❌ Error: {e}")
                    print("💡 Try rephrasing your question or check if files are properly loaded")
            else:
                print("❓ Please enter a valid command or question. Type 'help' for available commands.")

    except (EOFError, KeyboardInterrupt):
        # Interrupting the kernel or running out of input ends the session
        # cleanly instead of surfacing a traceback.
        print("\n👋 Input interrupted. Exiting the Audio Processing Q&A system.")

# System check and startup
def _print_component_status(component: str, available) -> None:
    """Print one requirements-check line for `component`."""
    icon = '✅' if available else '❌'
    state = 'Available' if available else 'Not Available'
    print(f"{icon} {component}: {state}")


print("🔧 SYSTEM REQUIREMENTS CHECK:")
print("✅ LangChain and Gemini: Available")
_print_component_status("Audio Processing (pydub)", AUDIO_PROCESSING_AVAILABLE)
_print_component_status("Google Cloud Speech", GOOGLE_CLOUD_SPEECH_AVAILABLE)
_print_component_status("HuggingFace Embeddings", EMBEDDINGS_AVAILABLE)
_print_component_status("Google Colab File Upload", COLAB_AVAILABLE)

# Installation hints for whichever optional components are missing
if not AUDIO_PROCESSING_AVAILABLE:
    print("\n📦 To install audio processing:\n   pip install pydub")

if not GOOGLE_CLOUD_SPEECH_AVAILABLE:
    print("\n📦 To install Google Cloud Speech:\n   pip install google-cloud-speech")

if not EMBEDDINGS_AVAILABLE:
    print("\n📦 To install embedding support:\n   pip install sentence-transformers")

# An unset or empty GOOGLE_API_KEY counts as missing
_gemini_key_configured = bool(os.getenv('GOOGLE_API_KEY'))
print(f"\n🔑 Gemini API Key: {'✅ Configured' if _gemini_key_configured else '❌ Missing'}")

if _gemini_key_configured:
    print("\n✅ Cell 3 Complete: System ready!")
    print("🚀 Run: run_audio_processing_system() to start the interactive system")
else:
    print("⚠️  Please set GOOGLE_API_KEY in your .env file")

# Uncomment the line below to auto-start the system
# run_audio_processing_system()

🔧 SYSTEM REQUIREMENTS CHECK:
✅ LangChain and Gemini: Available
✅ Audio Processing (pydub): Available
✅ Google Cloud Speech: Available
✅ HuggingFace Embeddings: Available
✅ Google Colab File Upload: Available

🔑 Gemini API Key: ✅ Configured

✅ Cell 3 Complete: System ready!
🚀 Run: run_audio_processing_system() to start the interactive system


In [None]:
# Start the interactive command loop; it keeps prompting via input() until the user enters quit/exit/q.
run_audio_processing_system()

🚀 Audio Processing & Q&A System
📋 BASIC COMMANDS:
  upload                - Upload new audio file for processing
  process <filename>    - Process uploaded audio file
  remove <filename>     - Remove audio file from memory
  summary <filename>    - Show detailed summary of specific file
  summaries            - Show summaries of all loaded files

🔍 ANALYSIS COMMANDS:
  memory               - Show memory status and statistics
  sources              - List active audio sources
  search <query>       - Search across all audio summaries
  compare <topic>      - Compare how files discuss a topic
  stats                - Show detailed system statistics

💾 UTILITY COMMANDS:
  export               - Export all summaries to JSON file
  clear                - Clear all loaded files
  help                 - Show this help message
  quit                 - Exit system

❓ QUESTIONS:
  <question>           - Ask any question about loaded audio files

🏠 Cache Directory: ./audio_cache
💡 TIP: Start by u

KeyboardInterrupt: Interrupted by user