In [None]:
# Install required libraries
!pip install gradio
!pip install openai
!pip install chromadb
!pip install moviepy
!pip install opencv-python
!pip install python-dotenv
!pip install langchain
!pip install langchain-openai
!pip install tiktoken

print("✅ All libraries installed successfully!")

In [None]:
!pip install -U google-generativeai

In [None]:
# Load API keys from Colab Secrets (persists across sessions!)
from google.colab import userdata
import os

os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_API_KEY')
os.environ["ASSEMBLYAI_API_KEY"] = userdata.get('ASSEMBLYAI_API_KEY')

print("✅ API keys loaded from secrets!")

✅ API keys loaded from secrets!


In [None]:
# Install Gemini and AssemblyAI libraries
!pip install -q google-generativeai
!pip install -q assemblyai

print("✅ Gemini and AssemblyAI libraries installed!")

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/51.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.6/51.6 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h✅ Gemini and AssemblyAI libraries installed!


In [None]:
# Final working version for Google Colab - WITH COLAB SECRETS
import google.generativeai as genai
import assemblyai as aai
import os
from google.colab import userdata  # Add this import

print("🔧 Setting up APIs...\n")

# Configure APIs with Colab Secrets
try:
    # Load from Colab secrets
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
    ASSEMBLYAI_API_KEY = userdata.get('ASSEMBLYAI_API_KEY')

    print("✅ API keys loaded from Colab secrets")

    # Configure Gemini API
    genai.configure(api_key=GOOGLE_API_KEY)

    # Test Gemini API
    try:
        model = genai.GenerativeModel('models/gemini-2.0-flash')
        response = model.generate_content("Say 'Gemini API is working!'")
        print("✅ Gemini API Test:")
        print(response.text)
        print()
    except Exception as e:
        print(f"❌ Gemini Error: {e}")
        print("\nTrying alternative model...")
        try:
            model = genai.GenerativeModel('models/gemini-1.5-flash')
            response = model.generate_content("Say 'Gemini API is working!'")
            print("✅ Gemini API Test (with gemini-1.5-flash):")
            print(response.text)
            print()
        except Exception as e2:
            print(f"❌ Still error: {e2}\n")

    # Test AssemblyAI API
    try:
        aai.settings.api_key = ASSEMBLYAI_API_KEY
        transcriber = aai.Transcriber()
        print("✅ AssemblyAI API: Connected successfully!")
        print()
    except Exception as e:
        print(f"❌ AssemblyAI Error: {e}\n")

    print("🎉 Setup complete! Ready to build!")

except Exception as e:
    print(f"❌ Error loading API keys: {e}")
    print("\n💡 Make sure you've set up Colab secrets:")
    print("1. Click the 🔑 key icon in left sidebar")
    print("2. Add two secrets:")
    print("   - Name: GOOGLE_API_KEY, Value: your_gemini_api_key")
    print("   - Name: ASSEMBLYAI_API_KEY, Value: your_assemblyai_api_key")
    print("3. Restart runtime and run this cell again")

🔧 Setting up APIs...

✅ API keys loaded from Colab secrets
✅ Gemini API Test:
Gemini API is working!


✅ AssemblyAI API: Connected successfully!

🎉 Setup complete! Ready to build!


In [None]:
#@title Upload video here
# Create a sample upload area
from google.colab import files
import shutil

print("📹 Upload a test video file (MP4, MOV, AVI, etc.)")
print("Recommended: A short video (1-3 minutes) for testing\n")

uploaded = files.upload()

# Get the uploaded filename
video_filename = list(uploaded.keys())[0]
print(f"\n✅ Video uploaded: {video_filename}")

In [None]:
!pip install assemblyai

Collecting assemblyai
  Downloading assemblyai-0.45.5-py3-none-any.whl.metadata (27 kB)
Downloading assemblyai-0.45.5-py3-none-any.whl (51 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/51.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.6/51.6 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: assemblyai
Successfully installed assemblyai-0.45.5


In [None]:
#@title Transcript Agent
# AGENT 1: Transcription Agent
import assemblyai as aai
from moviepy.editor import VideoFileClip
import os
from google.colab import userdata  # Add this import

class TranscriptionAgent:
    def __init__(self, assemblyai_api_key):
        """Initialize the Transcription Agent"""
        aai.settings.api_key = assemblyai_api_key
        self.transcriber = aai.Transcriber()
        print("✅ Transcription Agent initialized")

    def extract_audio(self, video_path):
        """Extract audio from video file"""
        try:
            print(f"🎬 Extracting audio from: {video_path}")
            video = VideoFileClip(video_path)
            audio_path = "extracted_audio.mp3"
            video.audio.write_audiofile(audio_path, verbose=False, logger=None)
            video.close()
            print(f"✅ Audio extracted: {audio_path}")
            return audio_path
        except Exception as e:
            print(f"❌ Audio extraction error: {e}")
            return None

    def transcribe_audio(self, audio_path):
        """Transcribe audio file using AssemblyAI"""
        try:
            print(f"🎤 Transcribing audio... (this may take a minute)")

            config = aai.TranscriptionConfig(
                speech_model=aai.SpeechModel.best,
                punctuate=True,
                format_text=True
            )

            transcript = self.transcriber.transcribe(audio_path, config=config)

            if transcript.status == aai.TranscriptStatus.error:
                print(f"❌ Transcription error: {transcript.error}")
                return None

            print("✅ Transcription complete!")
            return {
                'text': transcript.text,
                'words': transcript.words,
                'duration': transcript.audio_duration
            }

        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return None

    def process_video(self, video_path):
        """Complete workflow: extract audio + transcribe"""
        print("\n" + "="*50)
        print("🤖 TRANSCRIPTION AGENT STARTED")
        print("="*50 + "\n")

        # Step 1: Extract audio
        audio_path = self.extract_audio(video_path)
        if not audio_path:
            return None

        # Step 2: Transcribe
        result = self.transcribe_audio(audio_path)

        if result:
            print(f"\n📊 Transcription Stats:")
            print(f"   Duration: {result['duration']/1000:.1f} seconds")
            print(f"   Words: {len(result['words'])}")
            print(f"   Characters: {len(result['text'])}")
            print("\n" + "="*50)

        return result

# Initialize the agent - FIXED VERSION
try:
    # Method 1: Using Colab userdata (recommended)
    ASSEMBLYAI_API_KEY = userdata.get('ASSEMBLYAI_API_KEY')
    print("✅ AssemblyAI API key loaded from Colab secrets")
except Exception as e:
    try:
        # Method 2: Traditional environment variable
        ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
        print("✅ AssemblyAI API key loaded from environment variable")
    except:
        print("❌ Could not load AssemblyAI API key. Please check your Colab secrets.")
        ASSEMBLYAI_API_KEY = None

if ASSEMBLYAI_API_KEY:
    transcription_agent = TranscriptionAgent(ASSEMBLYAI_API_KEY)
    print("\n🎯 Transcription Agent ready to use!")
else:
    print("\n⚠️  Please set up your ASSEMBLYAI_API_KEY in Colab secrets first!")

✅ AssemblyAI API key loaded from Colab secrets
✅ Transcription Agent initialized

🎯 Transcription Agent ready to use!


In [None]:
#@title write the transcript to file
# Test the Transcription Agent
result = transcription_agent.process_video(video_filename)

if result:
    print("\n📝 TRANSCRIPT PREVIEW (first 500 characters):")
    print("-" * 50)
    print(result['text'][:500] + "...")
    print("-" * 50)

    # Save full transcript
    with open('transcript.txt', 'w') as f:
        f.write(result['text'])
    print("\n💾 Full transcript saved to: transcript.txt")
else:
    print("\n❌ Transcription failed. Please check the error messages above.")


In [None]:
#@title AGENT 2: Summary Agent
import google.generativeai as genai
import os
from google.colab import userdata  # Add this import

class SummaryAgent:
    def __init__(self, gemini_api_key):
        """Initialize the Summary Agent with Gemini"""
        genai.configure(api_key=gemini_api_key)
        self.model = genai.GenerativeModel('models/gemini-2.0-flash')
        print("✅ Summary Agent initialized")

    def generate_summary(self, transcript_text):
        """Generate a comprehensive summary of the transcript."""
        try:
            print("\n" + "="*50)
            print("🤖 SUMMARY AGENT STARTED")
            print("="*50 + "\n")

            print("📝 Analyzing transcript and generating summary...")

            prompt = f"""
            Analyze the following video transcript and provide:

            1. **Main Topic**: What is this video about? (1-2 sentences)
            2. **Key Points**: List the 5-7 most important points discussed (bullet points)
            3. **Brief Summary**: A concise 2-3 paragraph summary of the entire content. can be less if video is short
            4. **Key Takeaways**: 3-5 actionable insights or conclusions

            Transcript:
            {transcript_text}

            Format your response clearly with headers for each section. and use bold wherever required.
            """

            response = self.model.generate_content(prompt)

            print("✅ Summary generated successfully!\n")
            print("="*50)

            return response.text

        except Exception as e:
            print(f"❌ Summary generation error: {e}")
            return None

    def generate_chapter_markers(self, transcript_text):
        """Generate chapter markers/timestamps for the video"""
        try:
            print("\n📌 Generating chapter markers...")

            prompt = f"""
            Based on this video transcript, identify the main topics/sections discussed.
            Use bold to highlight info.
            Create chapter markers with:
            - Chapter title (concise, descriptive)
            - Brief description (1 sentence)

            Since we don't have exact timestamps, suggest logical breakpoints.
            Create 3-5 chapters depending on content.

            Transcript:
            {transcript_text}

            Format as:
            Chapter 1: [Title]
            - Description: [1 sentence]

            Chapter 2: [Title]
            - Description: [1 sentence]

            (etc.)
            """

            response = self.model.generate_content(prompt)

            print("✅ Chapter markers generated!\n")

            return response.text

        except Exception as e:
            print(f"❌ Chapter marker error: {e}")
            return None

    def process_transcript(self, transcript_text):
        """Complete workflow: generate summary + chapter markers"""
        # Generate summary
        summary = self.generate_summary(transcript_text)

        # Generate chapters
        chapters = self.generate_chapter_markers(transcript_text)

        return {
            'summary': summary,
            'chapters': chapters
        }

# Initialize the Summary Agent - FIXED VERSION
try:
    # Method 1: Using Colab userdata (recommended)
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
    print("✅ Google API key loaded from Colab secrets")
except Exception as e:
    try:
        # Method 2: Traditional environment variable
        GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
        print("✅ Google API key loaded from environment variable")
    except:
        print("❌ Could not load Google API key. Please check your Colab secrets.")
        GOOGLE_API_KEY = None

if GOOGLE_API_KEY:
    summary_agent = SummaryAgent(GOOGLE_API_KEY)
    print("\n🎯 Summary Agent ready to use!")
else:
    print("\n⚠️  Please set up your GOOGLE_API_KEY in Colab secrets first!")

✅ Google API key loaded from Colab secrets
✅ Summary Agent initialized

🎯 Summary Agent ready to use!


In [None]:
#@title Test the Summary Agent with the transcript we generated
with open('transcript.txt', 'r') as f:
    transcript = f.read()

# Generate summary and chapters
results = summary_agent.process_transcript(transcript)

# Display results
if results['summary']:
    print("\n📊 SUMMARY:")
    print("="*60)
    print(results['summary'])
    print("="*60)

if results['chapters']:
    print("\n\n📑 CHAPTER MARKERS:")
    print("="*60)
    print(results['chapters'])
    print("="*60)

# Save to file
with open('summary_output.txt', 'w') as f:
    f.write("VIDEO SUMMARY\n")
    f.write("="*60 + "\n\n")
    f.write(results['summary'])
    f.write("\n\n" + "="*60 + "\n\n")
    f.write("CHAPTER MARKERS\n")
    f.write("="*60 + "\n\n")
    f.write(results['chapters'])

print("\n\n💾 Summary saved to: summary_output.txt")

In [None]:
# Install ChromaDB with proper dependency handling
# !pip install --upgrade pip setuptools wheel
!pip install jedi>=0.16  # Fix the missing dependency
!pip install chromadb

print("✅ ChromaDB installed successfully!")

Collecting chromadb
  Downloading chromadb-1.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.38.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?

✅ ChromaDB installed successfully!


In [None]:
#@title Embedding Agent
import chromadb
from chromadb.utils import embedding_functions
import google.generativeai as genai
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Any, Optional
import logging

class EnhancedEmbeddingAgent:
    def __init__(self, embedding_model: str = "sentence_transformer", gemini_api_key: Optional[str] = None):
        """
        Initialize Embedding Agent with multiple embedding options

        Args:
            embedding_model: "sentence_transformer" (free), "gemini" (paid), or "openai" (paid)
            gemini_api_key: Optional API key for Gemini (only needed if using Gemini)
        """
        self.embedding_model = embedding_model
        self.chroma_client = chromadb.Client()

        # Clean up existing collection
        try:
            self.chroma_client.delete_collection("video_transcripts")
        except:
            pass

        # Initialize embedding function based on choice
        if embedding_model == "sentence_transformer":
            print("🆓 Using free SentenceTransformer embeddings (all-MiniLM-L6-v2)")
            self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
                model_name="all-MiniLM-L6-v2"
            )
        elif embedding_model == "gemini" and gemini_api_key:
            print("🔑 Using Gemini embeddings (paid)")
            genai.configure(api_key=gemini_api_key)
            self.embedding_function = self._gemini_embedding_function
        elif embedding_model == "openai":
            print("🔑 Using OpenAI embeddings (paid)")
            # You would need to install openai package and set API key
            # self.embedding_function = embedding_functions.OpenAIEmbeddingFunction(...)
            raise NotImplementedError("OpenAI embeddings not implemented in this example")
        else:
            print("🆓 Defaulting to free SentenceTransformer embeddings")
            self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
                model_name="all-MiniLM-L6-v2"
            )

        # Create collection
        self.collection = self.chroma_client.create_collection(
            name="video_transcripts",
            embedding_function=self.embedding_function
        )

        # Initialize SentenceTransformer for manual embeddings if needed
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

        print(f"✅ Embedding Agent initialized with {embedding_model}")
        print(f"✅ Vector database created: {self.collection.name}")

    def _gemini_embedding_function(self, texts: List[str]) -> List[List[float]]:
        """Custom embedding function for Gemini"""
        try:
            embeddings = []
            for text in texts:
                # Gemini embedding API call
                result = genai.embed_content(
                    model="models/embedding-001",
                    content=text,
                    task_type="retrieval_document"
                )
                embeddings.append(result['embedding'])
            return embeddings
        except Exception as e:
            print(f"❌ Gemini embedding failed: {e}")
            # Fallback to SentenceTransformer
            print("🔄 Falling back to SentenceTransformer embeddings")
            return self.sentence_model.encode(texts).tolist()

    def chunk_transcript(self, transcript_text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Split transcript into overlapping chunks with smart paragraph boundaries"""
        words = transcript_text.split()
        chunks = []

        # If transcript is short, return as single chunk
        if len(words) <= chunk_size:
            return [transcript_text]

        # Smart chunking with sentence/paragraph awareness
        sentences = transcript_text.split('. ')
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_words = sentence.split()
            sentence_length = len(sentence_words)

            # If adding this sentence exceeds chunk size, save current chunk
            if current_length + sentence_length > chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                # Start new chunk with overlap
                overlap_words = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk
                current_chunk = overlap_words
                current_length = len(overlap_words)

            current_chunk.extend(sentence_words)
            current_length += sentence_length

        # Add final chunk
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        print(f"✂️  Created {len(chunks)} chunks from transcript")
        return chunks

    def store_embeddings(self, transcript_text: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]:
        """Chunk transcript and store embeddings in vector DB"""
        try:
            print("\n" + "="*50)
            print("🤖 ENHANCED EMBEDDING AGENT STARTED")
            print("="*50)

            print("✂️  Chunking transcript with smart boundaries...")
            chunks = self.chunk_transcript(transcript_text)

            if not chunks:
                print("❌ No chunks created from transcript")
                return None

            print(f"✅ Created {len(chunks)} chunks")
            print(f"📊 Chunk sizes: {[len(chunk.split()) for chunk in chunks]} words")

            print("\n🧠 Generating embeddings...")

            # Prepare metadata for each chunk
            base_metadata = metadata or {}
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                chunk_metadata.update({
                    "chunk_index": i,
                    "chunk_length": len(chunk),
                    "word_count": len(chunk.split()),
                    "embedding_model": self.embedding_model
                })
                metadatas.append(chunk_metadata)

            # Add to ChromaDB collection
            ids = [f"chunk_{i}_{hash(chunk) % 10000}" for i, chunk in enumerate(chunks)]

            self.collection.add(
                documents=chunks,
                ids=ids,
                metadatas=metadatas
            )

            # Verify storage
            collection_count = self.collection.count()
            print(f"✅ Stored {collection_count} chunks in vector database")

            stats = {
                'total_chunks': len(chunks),
                'collection_name': self.collection.name,
                'embedding_model': self.embedding_model,
                'average_chunk_size': np.mean([len(chunk.split()) for chunk in chunks])
            }

            print(f"📊 Embedding Stats: {stats}")
            print("="*50)

            return stats

        except Exception as e:
            print(f"❌ Embedding storage error: {e}")
            logging.error(f"Embedding storage failed: {e}")
            return None

    def semantic_search(self, query: str, n_results: int = 3, filter_metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Enhanced semantic search with filtering"""
        try:
            print(f"🔍 Performing semantic search: '{query}'")

            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                where=filter_metadata  # Optional metadata filtering
            )

            if results and results['documents']:
                print(f"✅ Found {len(results['documents'][0])} relevant results")
                return {
                    'documents': results['documents'][0],
                    'metadatas': results['metadatas'][0],
                    'distances': results['distances'][0],
                    'query': query
                }
            else:
                print("❌ No results found")
                return None

        except Exception as e:
            print(f"❌ Semantic search error: {e}")
            return None

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about the vector database"""
        try:
            count = self.collection.count()
            # Get sample of documents to analyze
            sample_results = self.collection.get(limit=min(10, count))

            stats = {
                'total_documents': count,
                'embedding_model': self.embedding_model,
                'sample_chunk_sizes': [len(doc.split()) for doc in sample_results['documents']] if sample_results['documents'] else []
            }

            return stats
        except Exception as e:
            print(f"❌ Error getting collection stats: {e}")
            return {}

    def clear_database(self):
        """Clear the vector database"""
        try:
            self.chroma_client.delete_collection("video_transcripts")
            self.collection = self.chroma_client.create_collection(
                name="video_transcripts",
                embedding_function=self.embedding_function
            )
            print("✅ Vector database cleared")
        except Exception as e:
            print(f"❌ Error clearing database: {e}")

# Initialize with free embeddings by default
embedding_agent = EnhancedEmbeddingAgent(embedding_model="sentence_transformer")
print("\n🎯 Enhanced Embedding Agent ready! (Using free embeddings)")

🆓 Using free SentenceTransformer embeddings (all-MiniLM-L6-v2)
✅ Embedding Agent initialized with sentence_transformer
✅ Vector database created: video_transcripts

🎯 Enhanced Embedding Agent ready! (Using free embeddings)


In [None]:
#@title Test the Embedding Agent
print("🧪 Testing Embedding Agent...")

# Read the transcript file
try:
    with open('transcript.txt', 'r') as f:
        transcript = f.read()
    print(f"📖 Loaded transcript: {len(transcript)} characters")

    # Store embeddings
    result = embedding_agent.store_embeddings(transcript)

    if result:
        print(f"📊 Embedding Stats:")
        print(f"   Total chunks: {result['total_chunks']}")
        print(f"   Collection: {result['collection_name']}")

        # Test search functionality
        print("\n🔍 Testing search functionality...")
        test_query = "main topic of the video"
        search_results = embedding_agent.search_similar(test_query, n_results=2)

        if search_results:
            print(f"✅ Search test successful!")
            print(f"   Query: '{test_query}'")
            print(f"   Found {len(search_results['documents'][0])} results")
        else:
            print("❌ Search test failed")

        print(f"\n✅ Vector database is ready for Q&A!")
    else:
        print("❌ Failed to store embeddings")

except FileNotFoundError:
    print("❌ transcript.txt file not found. Please run the transcription agent first.")
except Exception as e:
    print(f"❌ Testing error: {e}")

🧪 Testing Embedding Agent...
❌ transcript.txt file not found. Please run the transcription agent first.


In [None]:
#@title AGENT 4: Q&A Agent (uses RAG) - FIXED VERSION
import google.generativeai as genai
from google.colab import userdata

class QAAgent:
    def __init__(self, embedding_agent):
        """Initialize Q&A Agent with Gemini and access to vector DB"""
        # Get API key from Colab secrets
        gemini_api_key = userdata.get('GOOGLE_API_KEY')
        genai.configure(api_key=gemini_api_key)
        self.model = genai.GenerativeModel('models/gemini-2.0-flash')
        self.embedding_agent = embedding_agent
        print("✅ Q&A Agent initialized")

    def answer_question(self, question):
        """Answer a question using RAG (Retrieval Augmented Generation)"""
        try:
            print("\n" + "="*50)
            print("🤖 Q&A AGENT STARTED")
            print("="*50 + "\n")

            print(f"❓ Question: {question}\n")

            # Step 1: Search vector DB for relevant context
            print("🔍 Searching vector database for relevant content...")

            # Use the semantic_search method that exists in your embedding agent
            search_results = self.embedding_agent.semantic_search(question, n_results=3)

            print(f"📊 Search results type: {type(search_results)}")

            if not search_results or not search_results.get('documents'):
                print("❌ No documents found in search results")
                print(f"🔍 Search results: {search_results}")
                return "I couldn't find relevant information in the video to answer this question."

            # Get relevant chunks - handle the response format
            relevant_chunks = search_results['documents']
            context = "\n\n".join(relevant_chunks)

            print(f"✅ Found {len(relevant_chunks)} relevant sections")
            print(f"📝 First chunk preview: {relevant_chunks[0][:200]}...\n")

            # Step 2: Generate answer using context
            print("💭 Generating answer using context...")

            prompt = f"""
            Based on the following content from a video transcript, answer the user's question.

            Context from video:
            {context}

            User's question: {question}

            Instructions:
            - Answer directly and concisely
            - If the user is asking questions abount the video and not the content in the video, then use your knowledge and answer him. In this case, dont say Idk
            - Use only information from the provided context to answer questions related to it.
            - If the context doesn't contain what user is asking, say "I cannot find this information in the video"
            - Be helpful and clear
            - Interact with the user in a friendly way
            - You can use emojis and images to make the user understand better

            Answer:
            """

            response = self.model.generate_content(prompt)
            answer = response.text

            print("✅ Answer generated!")
            print(f"🤖 Answer: {answer}\n")
            print("="*50)

            return {
                'question': question,
                'answer': answer,
                'sources': relevant_chunks
            }

        except Exception as e:
            print(f"❌ Q&A error: {e}")
            import traceback
            print(f"🔍 Full traceback: {traceback.format_exc()}")
            return None

# Re-initialize the Q&A Agent with the correct method
qa_agent = QAAgent(embedding_agent)
print("\n🎯 Q&A Agent ready to use!")

✅ Q&A Agent initialized

🎯 Q&A Agent ready to use!


In [None]:
#@title Test the Q&A Agent with sample questions

# You can modify these questions based on your video content
test_questions = [
    "What is the main topic of this video?",
    "What are the key points discussed?",
    "Can you summarize what was said?"
]

print("🧪 Testing Q&A Agent with sample questions:\n")

for question in test_questions:
    result = qa_agent.answer_question(question)

    if result:
        print(f"\n💬 ANSWER:")
        print("-" * 60)
        print(result['answer'])
        print("-" * 60)
        print()

    # Small delay between questions
    import time
    time.sleep(2)

print("\n✅ Q&A Agent testing complete!")

🧪 Testing Q&A Agent with sample questions:


🤖 Q&A AGENT STARTED

❓ Question: What is the main topic of this video?

🔍 Searching vector database for relevant content...
🔍 Performing semantic search: 'What is the main topic of this video?'
✅ Found 3 relevant results
📊 Search results type: <class 'dict'>
✅ Found 3 relevant sections
📝 First chunk preview: Alright, let's learn some trigonometry or Algebra three Or Precalculus Okay, lesson number one These are all the same thing Trigonometry is algebra three, trig meaning three, and onometry, meaning the...

💭 Generating answer using context...
✅ Answer generated!
🤖 Answer: The main topic of this video is trigonometry, specifically focusing on the unit circle and trigonometric functions like sine, cosine, and tangent.



💬 ANSWER:
------------------------------------------------------------
The main topic of this video is trigonometry, specifically focusing on the unit circle and trigonometric functions like sine, cosine, and tangent.

----

KeyboardInterrupt: 

In [None]:
#@title Frame Extraction
import cv2
from moviepy.editor import VideoFileClip
import matplotlib.pyplot as plt
from IPython.display import Image, display
import os
import base64
from PIL import Image as PILImage

class FrameExtractor:
    def __init__(self):
        """Initialize Frame Extractor"""
        print("✅ Frame Extractor initialized")

    def extract_key_frames(self, video_path, num_frames=5):
        """Extract key frames from video at regular intervals"""
        try:
            print("\n" + "="*50)
            print("🎬 FRAME EXTRACTION STARTED")
            print("="*50 + "\n")

            print(f"📹 Processing video: {video_path}")

            # Get video info
            video = VideoFileClip(video_path)
            duration = video.duration
            fps = video.fps
            total_frames = int(duration * fps)

            print(f"   Duration: {duration:.1f} seconds")
            print(f"   FPS: {fps}")
            print(f"   Total frames: {total_frames}\n")

            # Calculate frame intervals
            interval = duration / (num_frames + 1)
            timestamps = [interval * (i + 1) for i in range(num_frames)]

            print(f"🎞️  Extracting {num_frames} key frames...")

            # Create frames directory
            os.makedirs('extracted_frames', exist_ok=True)

            frames_data = []

            for idx, timestamp in enumerate(timestamps):
                # Extract frame at timestamp
                frame = video.get_frame(timestamp)

                # Save frame
                frame_path = f'extracted_frames/frame_{idx+1}_at_{timestamp:.1f}s.jpg'
                plt.imsave(frame_path, frame)

                frames_data.append({
                    'frame_number': idx + 1,
                    'timestamp': timestamp,
                    'path': frame_path
                })

                print(f"   ✓ Frame {idx+1} at {timestamp:.1f}s")

            video.close()

            print(f"\n✅ Extracted {len(frames_data)} frames")
            print("="*50 + "\n")

            return frames_data

        except Exception as e:
            print(f"❌ Frame extraction error: {e}")
            return None

    def display_frames(self, frames_data):
        """Display extracted frames"""
        print("🖼️  EXTRACTED FRAMES:\n")

        for frame_info in frames_data:
            print(f"Frame {frame_info['frame_number']} (at {frame_info['timestamp']:.1f}s):")
            display(Image(filename=frame_info['path'], width=400))
            print()

# Initialize Frame Extractor
frame_extractor = FrameExtractor()
print("\n🎯 Frame Extractor ready to use!")

✅ Frame Extractor initialized

🎯 Frame Extractor ready to use!


In [None]:
#@title Complete Gradio Interface (Fixed Frame Display + Download Features)
import gradio as gr
import base64
from PIL import Image
import tempfile
import os
import zipfile

class VideoContentAnalyzer:
    def __init__(self, transcription_agent, summary_agent, embedding_agent, qa_agent, frame_extractor):
        self.transcription_agent = transcription_agent
        self.summary_agent = summary_agent
        self.embedding_agent = embedding_agent
        self.qa_agent = qa_agent
        self.frame_extractor = frame_extractor
        self.current_transcript = None
        self.current_frames = None
        self.current_summary = None

    def image_to_base64(self, image_path):
        """Convert image to base64 for HTML display"""
        try:
            with open(image_path, "rb") as img_file:
                return base64.b64encode(img_file.read()).decode('utf-8')
        except Exception as e:
            print(f"Error converting image to base64: {e}")
            return ""

    def create_text_file(self, content, filename):
        """Create a temporary text file for download"""
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8')
        temp_file.write(content)
        temp_file.close()
        return temp_file.name

    def create_frames_zip(self, frames_data, zip_filename="frames.zip"):
        """Create a ZIP file containing all extracted frames"""
        try:
            temp_zip = tempfile.NamedTemporaryFile(suffix='.zip', delete=False)
            temp_zip.close()

            with zipfile.ZipFile(temp_zip.name, 'w') as zipf:
                for frame in frames_data:
                    if os.path.exists(frame['path']):
                        # Use descriptive filename in the zip
                        frame_filename = f"frame_{frame['frame_number']}_at_{frame['timestamp']:.1f}s.jpg"
                        zipf.write(frame['path'], frame_filename)

            return temp_zip.name
        except Exception as e:
            print(f"Error creating frames ZIP: {e}")
            return None

    def process_video(self, video_file):
        """Process uploaded video - all agents work together"""
        if video_file is None:
            return "Please upload a video file!", "", "", "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

        try:
            # Initialize download files as None
            transcript_file = None
            summary_file = None
            frames_zip = None

            # Step 1: Transcription
            status = "🎤 Transcribing video...\n"
            yield status, "", "", "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

            transcript_result = self.transcription_agent.process_video(video_file)
            if not transcript_result:
                yield "❌ Transcription failed!", "", "", "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)
                return

            self.current_transcript = transcript_result['text']
            status += f"✅ Transcription complete! ({len(transcript_result['words'])} words)\n\n"

            # Step 2: Summary
            status += "📝 Generating summary...\n"
            yield status, self.current_transcript[:500] + "...", "", "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

            summary_result = self.summary_agent.process_transcript(self.current_transcript)
            summary_text = f"{summary_result['summary']}\n\n---\n\n{summary_result['chapters']}"
            self.current_summary = summary_text
            status += "✅ Summary generated!\n\n"

            # Step 3: Embeddings
            status += "🧠 Creating vector database...\n"
            yield status, self.current_transcript[:500] + "...", summary_text, "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

            embedding_result = self.embedding_agent.store_embeddings(self.current_transcript)
            status += f"✅ Vector DB ready! ({embedding_result['total_chunks']} chunks)\n\n"

            # Step 4: Frame Extraction
            status += "🎬 Extracting key frames...\n"
            yield status, self.current_transcript[:500] + "...", summary_text, "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

            self.current_frames = self.frame_extractor.extract_key_frames(video_file, num_frames=5)

            # Create frames display with base64 images
            frames_html = "<div style='display: grid; grid-template-columns: repeat(2, 1fr); gap: 15px; padding: 10px;'>"
            if self.current_frames:
                for frame in self.current_frames:
                    base64_image = self.image_to_base64(frame['path'])
                    if base64_image:
                        frames_html += f"""
                        <div style='text-align: center; border: 1px solid #ddd; border-radius: 10px; padding: 10px; background: #f9f9f9;'>
                            <img src='data:image/jpeg;base64,{base64_image}' style='width: 100%; max-width: 300px; border-radius: 8px;'>
                            <p style='margin: 8px 0; font-weight: bold;'>Frame {frame['frame_number']}</p>
                            <p style='margin: 0; color: #666;'>Timestamp: {frame['timestamp']:.1f}s</p>
                        </div>
                        """
                    else:
                        frames_html += f"""
                        <div style='text-align: center; border: 1px solid #ddd; border-radius: 10px; padding: 10px; background: #f9f9f9;'>
                            <p style='color: red;'>Image not found</p>
                            <p>Frame {frame['frame_number']} @ {frame['timestamp']:.1f}s</p>
                        </div>
                        """
            else:
                frames_html += "<p>No frames extracted</p>"

            frames_html += "</div>"

            # Create downloadable files
            transcript_file = self.create_text_file(self.current_transcript, "transcript.txt")
            summary_file = self.create_text_file(self.current_summary, "summary.txt")
            frames_zip = self.create_frames_zip(self.current_frames) if self.current_frames else None

            status += "✅ Frame extraction complete!\n\n"
            status += "🎉 All processing complete! You can now ask questions about the video."

            # Return results with download buttons
            yield (
                status,
                self.current_transcript,
                summary_text,
                frames_html,
                gr.DownloadButton(visible=True, value=transcript_file, label="📥 Download Transcript"),
                gr.DownloadButton(visible=True, value=summary_file, label="📥 Download Summary"),
                gr.DownloadButton(visible=True, value=frames_zip, label="📥 Download Frames") if frames_zip else gr.DownloadButton(visible=False)
            )

        except Exception as e:
            yield f"❌ Error: {str(e)}", "", "", "", gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

    def answer_question(self, question):
        """Answer question about the video"""
        if not self.current_transcript:
            return "Please process a video first!"

        if not question or question.strip() == "":
            return "Please enter a question!"

        result = self.qa_agent.answer_question(question)

        if result:
            answer = f"**Question:** {result['question']}\n\n"
            answer += f"**Answer:** {result['answer']}\n\n"
            answer += "---\n\n**Relevant Context:**\n\n"
            for i, source in enumerate(result['sources'][:2], 1):
                answer += f"{i}. {source[:200]}...\n\n"
            return answer
        else:
            return "Sorry, I couldn't generate an answer."

# Initialize the complete system
analyzer = VideoContentAnalyzer(
    transcription_agent,
    summary_agent,
    embedding_agent,
    qa_agent,
    frame_extractor
)

print("✅ Video Content Analyzer System Ready!")

# Create Gradio Interface
with gr.Blocks(title="Video Content Analyzer - Agentic AI", theme=gr.themes.Soft()) as demo:

    gr.Markdown("""
    # 🎥 Video Content Analyzer - Agentic AI System

    Upload a video and let multiple AI agents analyze it for you!

    **Features:**
    - 🎤 Automatic transcription
    - 📝 Intelligent summarization
    - 🧠 Semantic search with vector database
    - 💬 Q&A about video content
    - 🎬 Key frame extraction
    - 📥 Download all results
    """)

    with gr.Tab("📤 Upload & Process Video"):
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video File")
                process_btn = gr.Button("🚀 Process Video", variant="primary", size="lg")

            with gr.Column():
                status_output = gr.Textbox(
                    label="Processing Status",
                    lines=10,
                    max_lines=15
                )

        with gr.Row():
            with gr.Column():
                transcript_output = gr.Textbox(
                    label="📝 Transcript",
                    lines=10,
                    max_lines=20
                )
                download_transcript = gr.DownloadButton(
                    "📥 Download Transcript",
                    visible=False,
                    variant="secondary"
                )

            with gr.Column():
                summary_output = gr.Textbox(
                    label="📊 Summary & Chapters",
                    lines=10,
                    max_lines=20
                )
                download_summary = gr.DownloadButton(
                    "📥 Download Summary",
                    visible=False,
                    variant="secondary"
                )

        frames_output = gr.HTML(label="🎬 Extracted Key Frames")

        with gr.Row():
            download_frames = gr.DownloadButton(
                "📥 Download All Frames (ZIP)",
                visible=False,
                variant="secondary"
            )

    with gr.Tab("💬 Ask Questions"):
        gr.Markdown("### Ask questions about the video content")

        with gr.Row():
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="e.g., What is the main topic discussed?",
                lines=2
            )

        ask_btn = gr.Button("🔍 Get Answer", variant="primary")

        answer_output = gr.Markdown(label="Answer")

        gr.Markdown("""
        **Example Questions:**
        - What is the main topic of this video?
        - What are the key points discussed?
        - Can you explain [specific topic] mentioned in the video?
        - What conclusions were drawn?
        """)

    # Event handlers
    process_btn.click(
        fn=analyzer.process_video,
        inputs=[video_input],
        outputs=[
            status_output,
            transcript_output,
            summary_output,
            frames_output,
            download_transcript,
            download_summary,
            download_frames
        ]
    )

    ask_btn.click(
        fn=analyzer.answer_question,
        inputs=[question_input],
        outputs=[answer_output]
    )

# Launch the interface with Colab-friendly settings
print("🚀 Launching Gradio Interface...")
try:
    demo.launch(share=True, debug=True)
except Exception as e:
    print(f"Note: If you encounter issues with share=True, try running with share=False")
    demo.launch(share=False, debug=True)

✅ Video Content Analyzer System Ready!
🚀 Launching Gradio Interface...
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://b8f755e56b0d1f440b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)



🤖 TRANSCRIPTION AGENT STARTED

🎬 Extracting audio from: /tmp/gradio/2ba17ff7c4873ce113d539bab9f36197d7501f98b018417a01805fdf6bf2dc71/All of Trigonometry Explained in 5 Minutes fDjLmYlweUA.webm
✅ Audio extracted: extracted_audio.mp3
🎤 Transcribing audio... (this may take a minute)
✅ Transcription complete!

📊 Transcription Stats:
   Duration: 0.3 seconds
   Words: 1041
   Characters: 5412


🤖 SUMMARY AGENT STARTED

📝 Analyzing transcript and generating summary...
✅ Summary generated successfully!


📌 Generating chapter markers...
✅ Chapter markers generated!


🤖 ENHANCED EMBEDDING AGENT STARTED
✂️  Chunking transcript with smart boundaries...
✂️  Created 3 chunks from transcript
✅ Created 3 chunks
📊 Chunk sizes: [494, 487, 160] words

🧠 Generating embeddings...
✅ Stored 3 chunks in vector database
📊 Embedding Stats: {'total_chunks': 3, 'collection_name': 'video_transcripts', 'embedding_model': 'sentence_transformer', 'average_chunk_size': np.float64(380.3333333333333)}

🎬 FRAME EXTRACT

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fastapi/applications.py", line 1134, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/error

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://b8f755e56b0d1f440b.gradio.live


In [None]:
# In a Colab cell, create .env template
env_template = """
API_KEY=your_api_key_here
SECRET_KEY=your_secret_here
DATABASE_URL=your_database_url
"""

with open('.env.template', 'w') as f:
    f.write(env_template)

In [None]:
# This creates your actual .env file with real keys (but DON'T commit this!)
real_env = """
API_KEY=your_actual_secret_key_12345
SECRET_KEY=your_actual_secret_67890
DATABASE_URL=your_actual_database_url
"""

with open('.env', 'w') as f:
    f.write(real_env)

In [None]:
gitignore_content = """
# Secrets
.env
*.env
secrets.json
config.json

# Other files...
"""
with open('.gitignore', 'w') as f:
    f.write(gitignore_content)