In [1]:
# Example usage
from audio_downloader import download_youtube_audio


youtube_url = "https://www.youtube.com/watch?v=8kMaTybvDUw"  # Replace with actual URL

audio_file = download_youtube_audio(youtube_url)

if audio_file:
    print(f"Audio downloaded successfully: {audio_file}")
else:
    print("Failed to download audio")

[youtube] Extracting URL: https://www.youtube.com/watch?v=8kMaTybvDUw
[youtube] 8kMaTybvDUw: Downloading webpage
[youtube] 8kMaTybvDUw: Downloading tv client config
[youtube] 8kMaTybvDUw: Downloading tv player API JSON
[youtube] 8kMaTybvDUw: Downloading ios player API JSON
[youtube] 8kMaTybvDUw: Downloading m3u8 information
[Downloader] Title: 12-Factor Agents: Patterns of reliable LLM applications — Dex Horthy, HumanLayer
[Downloader] Duration: 17:05
[Downloader] Metadata saved to: ./metadata\12-Factor_Agents_Patterns_of_reliable_LLM_applicat.json
[Downloader] metadata: {'title': '12-Factor Agents: Patterns of reliable LLM applications\xa0—\xa0Dex Horthy, HumanLayer', 'url': 'https://www.youtube.com/watch?v=8kMaTybvDUw', 'uploader': 'AI Engineer', 'duration': 1025, 'upload_date': '20250703'}
[youtube] Extracting URL: https://www.youtube.com/watch?v=8kMaTybvDUw
[youtube] 8kMaTybvDUw: Downloading webpage
[youtube] 8kMaTybvDUw: Downloading tv client config
[youtube] 8kMaTybvDUw: Download

In [4]:
import os

from audio_transcriber import format_and_save_transcription, transcribe_webm_directly
# Main execution
wav_file = r"MultiModalRAG\downloads\12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer.wav"
    
# Check if file exists (handle the unicode characters in filename)
if not os.path.exists(wav_file):
    # Try to find WebM files in downloads directory
    downloads_dir = "downloads"
    if os.path.exists(downloads_dir):
        wav_files = [f for f in os.listdir(downloads_dir) if f.endswith('.wav')]
        if wav_files:
            wav_file = os.path.join(downloads_dir, wav_files[0])
            print(f"Found WebM file: {wav_files[0]}")
        else:
            print("❌ No WebM files found in downloads directory")
            exit(1)
    else:
        print("❌ Downloads directory not found")
        exit(1)

# Transcribe the WebM file
result = transcribe_webm_directly(wav_file)

if result:
    # Format and save
    formatted_result = format_and_save_transcription(result, wav_file)
    print("\n🎉 Transcription completed successfully!")
    print("You can now proceed to the next step: creating embeddings for RAG!")
else:
    print("❌ Transcription failed")

Found WebM file: 12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer.wav
✅ Found WebM file: 12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer.wav
File size: 187.74 MB
Loading Whisper model...
✅ Model loaded
Transcribing WebM file (this may take a few minutes)...
Detected language: English


100%|██████████| 102530/102530 [39:27<00:00, 43.30frames/s]


✅ Transcription completed!
✅ Transcription saved to:
   JSON: ./transcripts\12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer_transcription.json
   Text: ./transcripts\12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer_transcript.txt

Transcription Summary:
Language: en
Duration: 1017.56 seconds
Segments: 351
Words: ~3720

First few segments:
1. [0:00:14] Who here's building agents?...
2. [0:00:17] Who here's, leave your hand up if you've built like 10 plus agents....
3. [0:00:21] Anyone here built like 100 agents?...

🎉 Transcription completed successfully!
You can now proceed to the next step: creating embeddings for RAG!


In [7]:
# Load metadata
import json

from chunker import process_transcription_for_rag_langchain, save_chunks_for_vectordb


metadata_file_path = "metadata/12-Factor_Agents_Patterns_of_reliable_LLM_applicat.json"
with open(metadata_file_path, 'r', encoding='utf-8') as f:
    video_metadata = json.load(f)

# Process transcription
transcription_file = r"transcripts\12-Factor Agents： Patterns of reliable LLM applications — Dex Horthy, HumanLayer_transcription.json"

if os.path.exists(transcription_file):
    chunks = process_transcription_for_rag_langchain(transcription_file, video_metadata)
    
    # Save for vector database
    save_chunks_for_vectordb(chunks, "./vectordb_data/video_chunks.json")
    
    # Show example chunk
    if chunks:
        print(f"\nExample chunk:")
        example_chunk = chunks[0]
        print(f"Text: {example_chunk['text'][:200]}...")
        print(f"Embedding dimension: {len(example_chunk['embedding'])}")
else:
    print(f"Transcription file not found: {transcription_file}")

Created 24 chunks using LangChain
Loading embedding model...
Creating embeddings...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]


Chunking Summary:
Total chunks: 24
Average words per chunk: 180.4
Chunks saved to: ./vectordb_data/video_chunks.json

Example chunk:
Text: Who here's building agents? Who here's, leave your hand up if you've built like 10 plus agents. Anyone here built like 100 agents? All right, we got a few. Awesome. Love it. So I think a lot of us hav...
Embedding dimension: 384


In [5]:

# Step 1: Add chunks to ChromaDB
from qa import add_chunks_to_chromadb, answer_question, search_videos


chunks_file = "./vectordb_data/video_chunks.json"
import os
if os.path.exists(chunks_file):
    collection = add_chunks_to_chromadb(chunks_file)
    
    # Step 2: Test search
    query = "What are the main points?"
    results = search_videos(query)
    
    # print(f"\nSearch results for: '{query}'")
    # for i, doc in enumerate(results['documents'][0]): # type: ignore
    #     metadata = results['metadatas'][0][i] # type: ignore
    #     print(f"\n{i+1}. From: {metadata['video_title']}")
    #     print(f"Text: {doc[:200]}...")
    
    # Step 3: Test Q&A
    answer = answer_question(query) 
    print(f"\nAnswer:\n{answer}")

else:
    print(f"Chunks file not found: {chunks_file}")
    print("Please run the chunking step first.")

Using existing collection: youtube_videos
Added 24 chunks to ChromaDB
Using existing collection: youtube_videos
Using existing collection: youtube_videos

Answer:
The main points are about building reliable agents and using frameworks to serve the needs of good builders. The speaker also discusses the 12 factors of building agents and how they can be applied to the practice of building reliable agents. Additionally, the speaker mentions the importance of turning sentences into JSON and the potential harm of using the "go to" abstraction in programming languages.


In [8]:
query = "The speaker mentions 12 factors, can you tell me the top 2 factors? Please provide a detailed answer with examples from the video."
answer = answer_question(query) 
print(f"\nAnswer:\n{answer}")

Using existing collection: youtube_videos

Answer:
The top 2 factors mentioned by the speaker are factor one and factor four. 

Factor one is about the ability of LLMs (Language Model Managers) to turn a sentence into JSON. This factor highlights the power and flexibility of LLMs in handling data and converting it into a usable format. The speaker mentions that this factor is the most magical thing that LLMs can do and it has nothing to do with loops, switch statements, or code. This factor is essential for building reliable and efficient LLM applications. An example of this factor in action would be using an LLM to convert a sentence like "I am going to the store" into JSON format, which could look like {"subject": "I", "verb": "am going", "object": "store"}. This conversion can be done quickly and accurately by an LLM, making it a valuable factor for building LLM applications.

Factor four is about the use of abstractions in programming languages and the potential harm they can cause

In [None]:
import os
import uuid
import json
from audio_downloader import download_youtube_audio
from audio_transcriber import transcribe_webm_directly, format_and_save_transcription
from chunker import process_transcription_for_rag_langchain, save_chunks_for_vectordb
from qa import add_chunks_to_chromadb, answer_question

class YouTubeRAGPipeline:
    def __init__(self, base_dir="./rag_data"):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)
    
    def process_video(self, youtube_url):
        """Complete pipeline for a YouTube video"""
        
        # Generate unique ID for this video
        video_uuid = str(uuid.uuid4())[:8]
        print(f"Processing video with UUID: {video_uuid}")
        
        # Create directories for this video
        video_dir = os.path.join(self.base_dir, video_uuid)
        downloads_dir = os.path.join(video_dir, "downloads")
        metadata_dir = os.path.join(video_dir, "metadata")
        transcripts_dir = os.path.join(video_dir, "transcripts")
        chunks_dir = os.path.join(video_dir, "chunks")
        
        try:
            # Step 1: Download audio
            print("\n=== Step 1: Downloading Audio ===")
            audio_file = download_youtube_audio(
                youtube_url, 
                output_path=downloads_dir, 
                metadata_path=metadata_dir
            )
            
            if not audio_file:
                print("Failed to download audio")
                return None
            
            # Step 2: Transcribe audio
            print("\n=== Step 2: Transcribing Audio ===")
            result = transcribe_webm_directly(audio_file)
            
            if not result:
                print("Failed to transcribe audio")
                return None
            
            # Save transcription
            os.makedirs(transcripts_dir, exist_ok=True)
            base_name = os.path.splitext(os.path.basename(audio_file))[0]
            transcription_file = os.path.join(transcripts_dir, f"{base_name}_transcription.json")
            
            formatted_result = format_and_save_transcription(result, audio_file)
            
            # Move transcription to correct location
            original_transcript = f"./transcripts/{base_name}_transcription.json"
            if os.path.exists(original_transcript):
                os.rename(original_transcript, transcription_file)
            
            # Step 3: Load metadata
            print("\n=== Step 3: Loading Metadata ===")
            metadata_files = [f for f in os.listdir(metadata_dir) if f.endswith('.json')]
            if not metadata_files:
                print("No metadata file found")
                return None
            
            metadata_file = os.path.join(metadata_dir, metadata_files[0])
            with open(metadata_file, 'r', encoding='utf-8') as f:
                video_metadata = json.load(f)
            
            # Add UUID to metadata
            video_metadata['uuid'] = video_uuid
            
            # Step 4: Create chunks
            print("\n=== Step 4: Creating Chunks ===")
            os.makedirs(chunks_dir, exist_ok=True)
            
            chunks = process_transcription_for_rag_langchain(transcription_file, video_metadata)
            chunks_file = os.path.join(chunks_dir, "video_chunks.json")
            save_chunks_for_vectordb(chunks, chunks_file)
            
            # Step 5: Add to ChromaDB
            print("\n=== Step 5: Adding to Vector Database ===")
            add_chunks_to_chromadb(chunks_file)
            
            print(f"\n✅ Successfully processed video!")
            print(f"Video UUID: {video_uuid}")
            print(f"Title: {video_metadata['title']}")
            print(f"Data saved in: {video_dir}")
            
            return {
                'uuid': video_uuid,
                'metadata': video_metadata,
                'audio_file': audio_file,
                'transcription_file': transcription_file,
                'chunks_file': chunks_file
            }
            
        except Exception as e:
            print(f"Error in pipeline: {str(e)}")
            return None
    
    def ask_question(self, question):
        """Ask a question across all processed videos"""
        return answer_question(question)
    
    def list_videos(self):
        """List all processed videos"""
        videos = []
        if not os.path.exists(self.base_dir):
            return videos
        
        for video_uuid in os.listdir(self.base_dir):
            video_dir = os.path.join(self.base_dir, video_uuid)
            metadata_dir = os.path.join(video_dir, "metadata")
            
            if os.path.exists(metadata_dir):
                metadata_files = [f for f in os.listdir(metadata_dir) if f.endswith('.json')]
                if metadata_files:
                    metadata_file = os.path.join(metadata_dir, metadata_files[0])
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)
                    videos.append({
                        'uuid': video_uuid,
                        'title': metadata.get('title', 'Unknown'),
                        'uploader': metadata.get('uploader', 'Unknown'),
                        'url': metadata.get('url', '')
                    })
        
        return videos

    # Initialize pipeline
pipeline = YouTubeRAGPipeline()

# Process a video
youtube_url = "https://www.youtube.com/watch?v=w5unVTO7mLQ"

# result = pipeline.process_video(youtube_url)

if result:
    print("\n" + "="*50)
    print("PIPELINE COMPLETED SUCCESSFULLY!")
    print("="*50)
    
    # Interactive Q&A
    while True:
        question = input("\nAsk a question (or 'quit' to exit): ")
        if question.lower() == 'quit':
            break
        
        answer = pipeline.ask_question(question)
        print(f"\nAnswer: {answer}")

# Show all videos
print("\n=== All Processed Videos ===")
videos = pipeline.list_videos()
for i, video in enumerate(videos, 1):
    print(f"{i}. {video['title']} (UUID: {video['uuid']})")



Using existing collection: youtube_videos

Answer: The video is a tutorial on how to make an offline GPT voice assistant in Python. The speaker goes through the steps of setting up the necessary libraries and models, and demonstrates how to use the voice assistant to perform tasks such as taking a screenshot, opening a specific website, and asking and answering questions using a language model. The video also mentions future videos on using machine learning to solve problems.
Using existing collection: youtube_videos

Answer: Python, GPT, LLM, chat GPT, open AI

=== All Processed Videos ===
1. Make an Offline GPT Voice Assistant in Python (UUID: e19e81b0)


In [4]:
pipeline = YouTubeRAGPipeline()
import os
# Process a video
youtube_url = "https://www.youtube.com/watch?v=w5unVTO7mLQ"

# Check if video already processed
videos = pipeline.list_videos()
existing_video = None
for video in videos:
    if video['url'] == youtube_url:
        existing_video = video
        break

if existing_video:
    print(f"✅ Video already processed!")
    print(f"UUID: {existing_video['uuid']}")
    print(f"Title: {existing_video['title']}")
    print("Loading existing data into ChromaDB...")
    
    # Load existing chunks into ChromaDB
    chunks_file = f"./rag_data/{existing_video['uuid']}/chunks/video_chunks.json"
    if os.path.exists(chunks_file):
        from qa import add_chunks_to_chromadb
        add_chunks_to_chromadb(chunks_file)
        print("✅ Data loaded successfully!")
    else:
        print("❌ Chunks file not found")
        
else:
    print("🔄 Video not found. Processing new video...")
    result = pipeline.process_video(youtube_url)
    if not result:
        print("❌ Failed to process video")
        exit()
    print(f"✅ Video processed successfully!")
    print(f"UUID: {result['uuid']}")

print("\n" + "="*50)
print("Ready for Q&A!")
print("="*50)

while True:
    question = input("\nAsk a question (or 'quit' to exit): ")
    if question.lower() == 'quit':
        break
        
    answer = pipeline.ask_question(question)
    print(f"\nQuestion: {question}")
    print(f"Answer: {answer}")

✅ Video already processed!
UUID: e19e81b0
Title: Make an Offline GPT Voice Assistant in Python
Loading existing data into ChromaDB...
Using existing collection: youtube_videos
Added 27 chunks to ChromaDB
✅ Data loaded successfully!

Ready for Q&A!
Using existing collection: youtube_videos

Question: what is this video about?
Answer: The video is about creating an offline GPT voice assistant in Python.
Using existing collection: youtube_videos

Question: how many people are speaking?
Answer: One person is speaking in the video transcript.
Using existing collection: youtube_videos

Question: for speech recogniton what is getting used?
Answer: For speech recognition, the speech recognition library is being used in Python.
Using existing collection: youtube_videos

Question: for speech recogniton what is geting used?
Answer: For speech recognition, the speech recognition package in Python is being used.


In [None]:
import re
from youtube_transcript_api._api import YouTubeTranscriptApi
import json
import os
from datetime import timedelta

def extract_video_id(youtube_url):
    """Extract video ID from YouTube URL"""
    patterns = [
        r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
        r'(?:embed\/)([0-9A-Za-z_-]{11})',
        r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})'
    ]
    
    for pattern in patterns:
        match = re.search(pattern, youtube_url)
        if match:
            return match.group(1)
    return None

def get_youtube_transcript_fast(youtube_url):
    """
    Fast transcript extraction using YouTube's existing captions
    Falls back to None if no captions available
    """
    
    # Extract video ID
    video_id = extract_video_id(youtube_url)
    if not video_id:
        print("❌ Invalid YouTube URL")
        return None
    
    try:
        print(f"🔍 Fetching transcript for video ID: {video_id}")
        
        # Try to get transcript (auto-generated or manual)
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # Prefer manual captions over auto-generated
        transcript = None
        try:
            # Try manual captions first
            transcript = transcript_list.find_manually_created_transcript(['en'])
            print("✅ Found manual captions")
        except:
            try:
                # Fall back to auto-generated
                transcript = transcript_list.find_generated_transcript(['en'])
                print("✅ Found auto-generated captions")
            except:
                # Try any available language
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"✅ Found transcript in {transcript.language}")
        
        if not transcript:
            print("❌ No transcripts available")
            return None
        
        # Fetch the transcript
        transcript_data = transcript.fetch()
        
        # Format the data
        formatted_transcript = {
            'language': transcript.language,
            'is_generated': transcript.is_generated,
            'segments': [],
            'full_text': '',
            'total_duration': 0
        }
        
        full_text_parts = []
        
        for item in transcript_data:
            segment = {
                'start_time': str(timedelta(seconds=int(item.start))),
                'end_time': str(timedelta(seconds=int(item.start + item.duration))),
                'start_seconds': item.start,
                'end_seconds': item.start + item.duration,
                'text': item.text.strip(),
                'duration': item.duration
            }
            
            formatted_transcript['segments'].append(segment)
            full_text_parts.append(item.text)
        
        formatted_transcript['full_text'] = ' '.join(full_text_parts)
        if formatted_transcript['segments']:
            formatted_transcript['total_duration'] = formatted_transcript['segments'][-1]['end_seconds']
        
        print(f"✅ Transcript extracted successfully!")
        print(f"Language: {formatted_transcript['language']}")
        print(f"Generated: {formatted_transcript['is_generated']}")
        print(f"Duration: {formatted_transcript['total_duration']:.1f} seconds")
        print(f"Segments: {len(formatted_transcript['segments'])}")
        
        return formatted_transcript
        
    except Exception as e:
        print(f"❌ Error fetching transcript: {str(e)}")
        return None

def save_fast_transcript(transcript_data, youtube_url, output_dir="./transcripts"):
    """Save the fast transcript to JSON file"""
    
    if not transcript_data:
        return None
    
    os.makedirs(output_dir, exist_ok=True)
    
    # Create filename from video ID
    video_id = extract_video_id(youtube_url)
    filename = f"fast_transcript_{video_id}.json"
    filepath = os.path.join(output_dir, filename)
    
    # Add source info
    transcript_data['source'] = 'youtube_captions'
    transcript_data['video_url'] = youtube_url
    transcript_data['video_id'] = video_id
    
    # Save to file
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(transcript_data, f, indent=2, ensure_ascii=False)
    
    print(f"💾 Transcript saved to: {filepath}")
    return filepath

def get_video_metadata_fast(youtube_url):
    """
    Get basic video metadata without downloading
    Uses yt-dlp extract_info with download=False
    """
    import yt_dlp  # type: ignore
    
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
    }
    
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)
            
            metadata = {
                'title': info.get('title', 'Unknown'),
                'url': youtube_url,
                'uploader': info.get('uploader', 'Unknown'),
                'duration': info.get('duration', 0),
                'upload_date': info.get('upload_date', ''),
                'view_count': info.get('view_count', 0),
                'description': info.get('description', '')[:500] + '...' if info.get('description') else ''
            }
            
            return metadata
            
    except Exception as e:
        print(f"❌ Error getting metadata: {str(e)}")
        return None

# Combined fast processing function
def process_youtube_fast(youtube_url):
    """
    Fast processing: Get transcript + metadata without downloading audio
    """
    
    print("🚀 Fast Processing Mode")
    print("="*50)
    
    # Step 1: Get transcript (fast)
    print("\n📝 Step 1: Getting transcript...")
    transcript_data = get_youtube_transcript_fast(youtube_url)
    
    if not transcript_data:
        print("❌ No captions available. Would need to use Whisper (slow mode).")
        return None, None
    
    # Step 2: Get metadata (fast)
    print("\n📊 Step 2: Getting metadata...")
    metadata = get_video_metadata_fast(youtube_url)
    
    if not metadata:
        print("❌ Could not get video metadata")
        return None, None
    
    # Step 3: Save transcript
    print("\n💾 Step 3: Saving transcript...")
    transcript_file = save_fast_transcript(transcript_data, youtube_url)
    
    print("\n✅ Fast processing complete!")
    print(f"Time saved: ~5-10 minutes vs Whisper approach")
    
    return transcript_data, metadata
# Example usage
youtube_url = "https://www.youtube.com/watch?v=w5unVTO7mLQ"

transcript_data, metadata = process_youtube_fast(youtube_url)

if transcript_data and metadata:
    print(f"\n📋 Summary:")
    print(f"Title: {metadata['title']}")
    print(f"Duration: {transcript_data['total_duration']:.1f} seconds")
    print(f"Language: {transcript_data['language']}")
    print(f"Word count: ~{len(transcript_data['full_text'].split())}")
    print(f"First 100 chars: {transcript_data['full_text'][:100]}...")
else:
    print("❌ Fast processing failed - would need Whisper fallback")

🚀 Fast Processing Mode

📝 Step 1: Getting transcript...
🔍 Fetching transcript for video ID: w5unVTO7mLQ
✅ Found manual captions
✅ Transcript extracted successfully!
Language: English
Generated: False
Duration: 1468.5 seconds
Segments: 899

📊 Step 2: Getting metadata...


ModuleNotFoundError: No module named 'yt_dlp'

In [2]:
import json

def debug_transcript_structure(transcription_file: str):
    """Debug function to see what's actually in the transcript file"""
    
    print(f"=== DEBUGGING TRANSCRIPT: {transcription_file} ===")
    
    with open(transcription_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    print("📋 Top-level keys:")
    for key in data.keys():
        print(f"  - {key}: {type(data[key])}")
    
    print(f"\n📝 Full text preview:")
    full_text = data.get('full_text', 'NOT FOUND')
    print(f"  Length: {len(full_text) if full_text != 'NOT FOUND' else 'N/A'}")
    print(f"  Preview: {full_text[:100] if full_text != 'NOT FOUND' else 'N/A'}...")
    
    print(f"\n🎬 Segments info:")
    segments = data.get('segments', [])
    print(f"  Total segments: {len(segments)}")
    
    if segments:
        print(f"  First segment keys: {list(segments[0].keys())}")
        print(f"  First segment: {segments[0]}")
        
        print(f"\n  Sample of first 3 segments:")
        for i, segment in enumerate(segments[:3]):
            print(f"    {i+1}. {segment}")
    else:
        print("  ❌ NO SEGMENTS FOUND!")
    
    return data

# Test with your transcript file
if __name__ == "__main__":
    # Replace with your actual transcript file path
    transcript_file = r"rag_data\eb4371eb\transcripts\transcript.json"
    
    try:
        debug_transcript_structure(transcript_file)
    except FileNotFoundError:
        print(f"❌ File not found: {transcript_file}")
        print("Please update the path to your actual transcript file")
    except Exception as e:
        print(f"❌ Error: {e}")

=== DEBUGGING TRANSCRIPT: rag_data\eb4371eb\transcripts\transcript.json ===
📋 Top-level keys:
  - language: <class 'str'>
  - full_text: <class 'str'>
  - segments: <class 'list'>
  - total_duration: <class 'float'>
  - source_type: <class 'str'>

📝 Full text preview:
  Length: 27364
  Preview: In this video, we're going to make an offline virtual assistant that uses a local LLM, just like cha...

🎬 Segments info:
  Total segments: 899
  First segment keys: ['start_time', 'end_time', 'start_seconds', 'end_seconds', 'text', 'duration']
  First segment: {'start_time': '0:00:00', 'end_time': '0:00:01', 'start_seconds': 0.016, 'end_seconds': 1.733, 'text': "In this video, we're going to make an", 'duration': 1.717}

  Sample of first 3 segments:
    1. {'start_time': '0:00:00', 'end_time': '0:00:01', 'start_seconds': 0.016, 'end_seconds': 1.733, 'text': "In this video, we're going to make an", 'duration': 1.717}
    2. {'start_time': '0:00:01', 'end_time': '0:00:03', 'start_seconds': 1.733

In [3]:
import chromadb
import json
from typing import List, Dict
from dotenv import load_dotenv
load_dotenv()

def setup_chromadb(collection_name="youtube_videos"):
    """Initialize ChromaDB client and collection"""
    client = chromadb.PersistentClient(path="./chroma_db")
    
    # Get or create collection
    try:
        collection = client.get_collection(collection_name)
        print(f"Using existing collection: {collection_name}")
    except:
        collection = client.create_collection(collection_name)
        print(f"Created new collection: {collection_name}")
    
    return collection

def add_chunks_to_chromadb(chunks_file: str, collection_name="youtube_videos"):
    """Load chunks and add them to ChromaDB with citation info"""
    
    # Load chunks
    with open(chunks_file, 'r', encoding='utf-8') as f:
        chunks = json.load(f)
    
    # Setup ChromaDB
    collection = setup_chromadb(collection_name)
    
    # Prepare data for ChromaDB
    ids = []
    embeddings = []
    documents = []
    metadatas = []
    
    for chunk in chunks:
        ids.append(chunk['unique_id'])
        embeddings.append(chunk['embedding'])
        documents.append(chunk['text'])
        
        # Metadata with citation tracking info
        metadata = {
            'chunk_id': chunk['chunk_id'],
            'video_title': chunk['video_title'],
            'video_url': chunk['video_url'],
            'uploader': chunk['uploader'],
            'word_count': chunk['word_count'],
            
            # Citation tracking fields - IMPORTANT: Convert to strings for ChromaDB
            'start_time': chunk.get('start_time', '0:00:00'),
            'end_time': chunk.get('end_time', '0:00:00'),
            'start_seconds': float(chunk.get('start_seconds', 0)),
            'end_seconds': float(chunk.get('end_seconds', 0)),
            'duration': float(chunk.get('duration', 0))
        }
        metadatas.append(metadata)
    
    # Add to ChromaDB
    collection.add(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas
    )
    
    print(f"Added {len(chunks)} chunks to ChromaDB with citation tracking")
    print(f"✅ First chunk citation example: {metadatas[0]['start_time']} - {metadatas[0]['end_time']}")
    return collection

def search_videos(query: str, collection_name="youtube_videos", n_results=3):
    """Search for relevant video chunks"""
    
    # Setup ChromaDB
    collection = setup_chromadb(collection_name)
    
    # Search
    results = collection.query(
        query_texts=[query],
        n_results=n_results
    )
    
    return results

def answer_question_with_citations(query: str, collection_name="youtube_videos"):
    """Enhanced Q&A with proper citation tracking"""
    
    from langchain_openai import ChatOpenAI
    from langchain.prompts import PromptTemplate
    
    # Search for relevant chunks
    results = search_videos(query, collection_name, n_results=3)
    
    if not results['documents'][0]:
        return "No relevant information found."
    
    # Debug: Check what we got back
    print(f"\n🔍 Debug: Found {len(results['documents'][0])} results")
    if results['metadatas'][0]:
        first_meta = results['metadatas'][0][0]
        print(f"📍 First result metadata keys: {list(first_meta.keys())}")
        print(f"🕐 First result timestamps: {first_meta.get('start_time')} - {first_meta.get('end_time')}")
    
    # Prepare context with citation info
    context_chunks = []
    citations = []
    
    for i, doc in enumerate(results['documents'][0]):
        metadata = results['metadatas'][0][i]
        
        # Extract citation info safely
        video_title = metadata.get('video_title', 'Unknown Video')
        start_time = metadata.get('start_time', 'Unknown')
        end_time = metadata.get('end_time', 'Unknown') 
        video_url = metadata.get('video_url', '')
        start_seconds = metadata.get('start_seconds', 0)
        
        # Create citation with clickable timestamp link
        if video_url and start_seconds and start_time != 'Unknown':
            timestamped_url = f"{video_url}&t={int(start_seconds)}s"
            citation = f"Source {i+1}: '{video_title}' at {start_time}-{end_time} ({timestamped_url})"
        else:
            citation = f"Source {i+1}: '{video_title}' at {start_time}-{end_time}"
        
        citations.append(citation)
        context_chunks.append(f"[Source {i+1}] {doc}")
    
    context = "\n\n".join(context_chunks)
    
    # Generate answer with citations
    llm = ChatOpenAI(temperature=0)
    
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""
Based on the following video transcript context, answer the question and include source references.

Context:
{context}

Question: {question}

Answer with citations (use [Source X] format):"""
    )
    
    response = llm.invoke(prompt.format(context=context, question=query))
    answer = response.content.strip()
    
    # Append full citations
    full_response = f"{answer}\n\n" + "="*50 + "\nSources:\n" + "\n".join(citations)
    
    return full_response

def answer_question(query: str, collection_name="youtube_videos"):
    """Wrapper function to maintain compatibility"""
    return answer_question_with_citations(query, collection_name)

# Debug function to check what's in ChromaDB
def debug_chromadb_metadata(collection_name="youtube_videos"):
    """Debug function to see what metadata is actually stored"""
    
    collection = setup_chromadb(collection_name)
    
    # Get a few items to check metadata
    results = collection.get(limit=3)
    
    print(f"\n🔍 ChromaDB Debug for collection: {collection_name}")
    print(f"📊 Total items in collection: {collection.count()}")
    
    if results['metadatas']:
        print(f"📋 Metadata keys in first item: {list(results['metadatas'][0].keys())}")
        print(f"🕐 First item timestamps: {results['metadatas'][0].get('start_time')} - {results['metadatas'][0].get('end_time')}")
        
        for i, meta in enumerate(results['metadatas'][:3]):
            print(f"  Item {i+1}: {meta.get('start_time')} - {meta.get('end_time')} | {meta.get('video_title', 'No title')}")
    else:
        print("❌ No metadata found!")

# Example usage
if __name__ == "__main__":
    
    # Debug what's in ChromaDB
    debug_chromadb_metadata("video_eb4371eb")  # Replace with your actual collection name
    
    # Test a query
    query = "What version of whisper is used?"
    answer = answer_question_with_citations(query, "video_eb4371eb")
    print(f"\nAnswer: {answer}")

Using existing collection: video_eb4371eb

🔍 ChromaDB Debug for collection: video_eb4371eb
📊 Total items in collection: 35
📋 Metadata keys in first item: ['word_count', 'uploader', 'video_title', 'chunk_id', 'video_url']
🕐 First item timestamps: None - None
  Item 1: None - None | Make an Offline GPT Voice Assistant in Python
  Item 2: None - None | Make an Offline GPT Voice Assistant in Python
  Item 3: None - None | Make an Offline GPT Voice Assistant in Python
Using existing collection: video_eb4371eb

🔍 Debug: Found 3 results
📍 First result metadata keys: ['video_url', 'word_count', 'uploader', 'video_title', 'chunk_id']
🕐 First result timestamps: None - None

Answer: The version of whisper used in the video transcript is not explicitly mentioned. The video focuses on utilizing the whisper API from OpenAI for speech recognition [Source 3]. The whisper API is specifically mentioned as being from OpenAI, the company responsible for chat GPT [Source 3]. The video demonstrates how to t

In [1]:
import json

def test_timestamp_mapping():
    """Test the timestamp mapping function directly"""
    
    # Sample transcript data (like your format)
    sample_segments = [
        {
            'start_time': '0:00:00',
            'end_time': '0:00:01', 
            'start_seconds': 0.016,
            'end_seconds': 1.733,
            'text': "In this video, we're going to make an",
            'duration': 1.717
        },
        {
            'start_time': '0:00:01',
            'end_time': '0:00:03',
            'start_seconds': 1.733, 
            'end_seconds': 3.366,
            'text': 'offline virtual assistant',
            'duration': 1.633
        },
        {
            'start_time': '0:00:03',
            'end_time': '0:00:06',
            'start_seconds': 3.366,
            'end_seconds': 6.633, 
            'text': 'that uses a local LLM, just',
            'duration': 3.267
        }
    ]
    
    # Sample chunk text
    sample_chunk = "In this video, we're going to make an offline virtual assistant that uses a local LLM"
    
    print("🧪 Testing timestamp mapping function")
    print("="*50)
    print(f"📝 Sample chunk: {sample_chunk}")
    print(f"🎬 Number of segments: {len(sample_segments)}")
    
    # Import and test the function
    try:
        from chunker import find_chunk_timestamps
        
        result = find_chunk_timestamps(sample_chunk, sample_segments)
        
        print(f"\n✅ Function result: {result}")
        print(f"🕐 Start time: {result['start_time']}")
        print(f"🕐 End time: {result['end_time']}")
        print(f"⏱️  Start seconds: {result['start_seconds']}")
        print(f"⏱️  End seconds: {result['end_seconds']}")
        
        if result['start_time'] == '0:00:00' and result['end_time'] != '0:00:00':
            print("✅ SUCCESS: Timestamps mapped correctly!")
        else:
            print("❌ ISSUE: Timestamps not mapped correctly")
            
    except ImportError as e:
        print(f"❌ Import error: {e}")
    except Exception as e:
        print(f"❌ Function error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    test_timestamp_mapping()

🧪 Testing timestamp mapping function
📝 Sample chunk: In this video, we're going to make an offline virtual assistant that uses a local LLM
🎬 Number of segments: 3


  from .autonotebook import tqdm as notebook_tqdm



✅ Function result: {'start_time': '0:00:00', 'end_time': '0:00:06', 'start_seconds': 0.016, 'end_seconds': 6.633}
🕐 Start time: 0:00:00
🕐 End time: 0:00:06
⏱️  Start seconds: 0.016
⏱️  End seconds: 6.633
✅ SUCCESS: Timestamps mapped correctly!


In [4]:
# Quick test to see what's in your chunker.py
import inspect
from chunker import process_transcription_for_rag_langchain

print("🔍 Current chunker function source:")
print("="*60)

# Get the source code of the function
try:
    source = inspect.getsource(process_transcription_for_rag_langchain)
    
    # Check if it has citation tracking
    if 'start_time' in source and 'find_chunk_timestamps' in source:
        print("✅ Chunker HAS citation tracking")
    else:
        print("❌ Chunker does NOT have citation tracking")
    
    # Check if it has debug output
    if 'Debug timestamp mapping' in source:
        print("✅ Chunker HAS debug output")
    else:
        print("❌ Chunker does NOT have debug output")
        
    print(f"\nFunction length: {len(source)} characters")
    print("First few lines:")
    print("\n".join(source.split('\n')[:10]))
    
except Exception as e:
    print(f"Error: {e}")

  from .autonotebook import tqdm as notebook_tqdm


🔍 Current chunker function source:
✅ Chunker HAS citation tracking
✅ Chunker HAS debug output

Function length: 4900 characters
First few lines:
def process_transcription_for_rag_langchain(transcription_file: str, video_metadata: Dict) -> List[Dict]:
    """
    Enhanced pipeline with citation tracking - handles both fast and slow transcript formats
    
    Args:
        transcription_file: Path to transcription JSON file
        video_metadata: Video information from download
    
    Returns:
        List of processed chunks with citation tracking info
