In [None]:
#1. Install necessary libraries
!pip install -q "openai>=1.0.0" "qdrant-client[fastembed]" langchain langchain-openai yt-dlp langchain-community pydub

In [None]:
# 2. Import libraries (the API key is configured in the next cell)
import os
import re
import json
import time
import math
import yt_dlp
import collections
from google.colab import userdata
from openai import OpenAI
from pydub import AudioSegment
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Qdrant

In [None]:
# Securely load the OpenAI API key from Colab Secrets and create the shared
# `client` used by the transcription and question-answering cells below.
#
# userdata.get() reports a missing secret with SecretNotFoundError and a secret
# this notebook is not allowed to read with NotebookAccessError; a None value
# would make the os.environ assignment raise TypeError. All of these mean
# "no usable key", so they are handled together.
try:
    os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
    client = OpenAI()
    print("OpenAI client initialized.")
except (
    ImportError,
    KeyError,
    TypeError,
    userdata.SecretNotFoundError,
    userdata.NotebookAccessError,
) as exc:
    print(" OpenAI API key not found. Please add it to Colab Secrets.")
    # Fail this cell (which also stops "Run all") instead of calling exit(),
    # which would shut down the whole kernel.
    raise RuntimeError(
        "OPENAI_API_KEY is not available from Colab Secrets."
    ) from exc

OpenAI client initialized.


In [None]:
# Define the Segment structure once for clarity and reuse.
# Fields: `start` and `end` are the segment's time offsets (treated as seconds
# elsewhere in this notebook) and `text` is the segment's text.
Segment = collections.namedtuple("Segment", ["start", "end", "text"])

def get_video_id(url: str) -> str | None:
    """Extracts the YouTube video ID from a URL."""
    match = re.search(r"(?:v=|\/|embed\/|youtu\.be\/)([a-zA-Z0-9_-]{11})", url)
    return match.group(1) if match else None

def format_time(seconds: float) -> str:
    """Render a duration in seconds as a zero-padded ``MM:SS`` string.

    Fractions of a second are dropped. Minutes are not wrapped into hours,
    so durations of an hour or more show a minute count of 60 or above.
    """
    whole_minutes, leftover_seconds = divmod(seconds, 60)
    return f"{int(whole_minutes):02d}:{int(leftover_seconds):02d}"

In [None]:
def _load_cached_segments(cache_file_path: str, video_id: str):
    """Return the cached segments for ``video_id``, or None if there is no usable cache."""
    if not os.path.exists(cache_file_path):
        return None
    try:
        print(f" Loading translated transcript from cache for video ID: {video_id}")
        with open(cache_file_path, 'r', encoding='utf-8') as f:
            cached_data = json.load(f)
        if not cached_data:
            raise ValueError("Cache file is empty")
        return [Segment(s['start'], s['end'], s['text']) for s in cached_data]
    except (ValueError, KeyError, TypeError):
        # json.JSONDecodeError is a ValueError; KeyError/TypeError cover files
        # that parse as JSON but do not have the expected list-of-dicts shape.
        print(f" Corrupted cache file found for {video_id}. Deleting and re-processing.")
        os.remove(cache_file_path)
        return None


def _translate_audio_file(audio_path: str, time_offset_s=0) -> list:
    """Translate one audio file with Whisper and return JSON-serializable segment dicts.

    ``time_offset_s`` is added to every segment's start/end so that segments of
    a chunk can be expressed on the timeline of the full recording.
    """
    with open(audio_path, "rb") as audio_file_obj:
        translation_response = client.audio.translations.create(
            model="whisper-1",
            file=audio_file_obj,
            response_format="verbose_json"
        )
    return [
        {
            'start': s.start + time_offset_s,
            'end': s.end + time_offset_s,
            'text': s.text
        }
        # `segments` can be None/empty (e.g. silent audio); treat as no segments.
        for s in (translation_response.segments or [])
    ]


def get_transcript_segments(video_url: str):
    """Return the English (translated) transcript of a YouTube video as Segments.

    If ``transcript_cache_en/<video_id>.json`` exists and is readable, it is
    returned directly. Otherwise the audio is downloaded with yt-dlp,
    translated with the OpenAI ``whisper-1`` translations endpoint (splitting
    the audio into chunks when the file is larger than 24 MB), written to the
    JSON cache and to a human-readable ``text_transcripts_en/<video_id>.txt``,
    and then returned. Temporary audio files are removed even when a step fails.

    Args:
        video_url: URL of the YouTube video.

    Returns:
        A list of ``Segment(start, end, text)`` tuples.

    Raises:
        ValueError: If no video ID can be extracted from ``video_url``.
    """
    CACHE_DIR = "transcript_cache_en"
    TXT_OUTPUT_DIR = "text_transcripts_en"
    # Presumably chosen to stay below the upload size limit of the Whisper API.
    MAX_FILE_SIZE_MB = 24
    os.makedirs(CACHE_DIR, exist_ok=True)
    os.makedirs(TXT_OUTPUT_DIR, exist_ok=True)

    video_id = get_video_id(video_url)
    if not video_id:
        raise ValueError("Could not extract video ID from URL.")

    cache_file_path = os.path.join(CACHE_DIR, f"{video_id}.json")
    cached_segments = _load_cached_segments(cache_file_path, video_id)
    if cached_segments is not None:
        return cached_segments

    print(f" No cache found. Processing video for translation: {video_id}")
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
        'outtmpl': f'{video_id}_temp_audio'
    }
    audio_file_path = f"{video_id}_temp_audio.mp3"
    serializable_segments = []

    try:
        start_download_time = time.perf_counter()
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
        end_download_time = time.perf_counter()
        print(f" Audio download time: {end_download_time - start_download_time:.2f} seconds")

        file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)

        if file_size_mb > MAX_FILE_SIZE_MB:
            print(f" Audio file is large ({file_size_mb:.2f} MB). Splitting into chunks...")
            audio = AudioSegment.from_mp3(audio_file_path)
            num_chunks = math.ceil(file_size_mb / MAX_FILE_SIZE_MB)
            chunk_length_ms = math.ceil(len(audio) / num_chunks)

            for i in range(num_chunks):
                start_ms = i * chunk_length_ms
                if start_ms >= len(audio):
                    # Rounding up the chunk length can leave a trailing empty chunk.
                    break
                end_ms = start_ms + chunk_length_ms
                chunk_file_path = f"{video_id}_chunk_{i}.mp3"
                try:
                    audio[start_ms:end_ms].export(chunk_file_path, format="mp3")

                    print(f"Translating chunk {i+1}/{num_chunks}...")
                    start_chunk_time = time.perf_counter()
                    # Whisper timestamps are relative to the start of the chunk,
                    # so shift them by where the chunk really begins in the full
                    # audio. Using the previous segment's end instead would drift
                    # whenever a chunk ends in silence or yields no segments.
                    serializable_segments.extend(
                        _translate_audio_file(chunk_file_path, time_offset_s=start_ms / 1000)
                    )
                    end_chunk_time = time.perf_counter()
                    print(f" Chunk {i+1} translation time: {end_chunk_time - start_chunk_time:.2f} seconds")
                finally:
                    if os.path.exists(chunk_file_path):
                        os.remove(chunk_file_path)
        else:
            print("Translating with OpenAI Whisper API...")
            start_translation_time = time.perf_counter()
            serializable_segments = _translate_audio_file(audio_file_path)
            end_translation_time = time.perf_counter()
            print(f" Translation time: {end_translation_time - start_translation_time:.2f} seconds")
    finally:
        # Never leave the downloaded audio behind, even if a step above failed.
        if os.path.exists(audio_file_path):
            os.remove(audio_file_path)
    print("Translation complete and audio file cleaned up.")

    with open(cache_file_path, 'w', encoding='utf-8') as f:
        json.dump(serializable_segments, f, indent=2)
    print(f" Saved translated transcript to cache: {cache_file_path}")

    txt_file_path = os.path.join(TXT_OUTPUT_DIR, f"{video_id}.txt")
    with open(txt_file_path, 'w', encoding='utf-8') as f:
        for segment in serializable_segments:
            timestamp = format_time(segment['start'])
            f.write(f"[{timestamp}] {segment['text'].strip()}\n")
    print(f" Saved human-readable translated transcript to: {txt_file_path}")

    return [Segment(s['start'], s['end'], s['text']) for s in serializable_segments]


In [None]:
def group_segments_into_documents(segments, max_duration_seconds=90):
    """Merge consecutive transcript segments into time-windowed Documents.

    Segments are accumulated until the span from the window's first segment
    start to the current segment end reaches ``max_duration_seconds`` (or the
    input is exhausted). Each window becomes one ``Document`` whose text is the
    space-joined segment texts and whose metadata holds the window's ``start``
    and ``end``.

    Args:
        segments: Sequence of objects exposing ``start``, ``end`` and ``text``.
        max_duration_seconds: Span at which the current window is closed.

    Returns:
        A list of ``Document`` objects; empty when ``segments`` is empty.
    """
    if not segments:
        return []

    grouped = []
    final_index = len(segments) - 1
    window_start = segments[0].start
    window_texts = []

    for index, seg in enumerate(segments):
        window_texts.append(seg.text)
        elapsed = seg.end - window_start
        at_end = index == final_index
        if not (elapsed >= max_duration_seconds or at_end):
            continue

        grouped.append(Document(
            page_content=" ".join(window_texts).strip(),
            metadata={'start': window_start, 'end': seg.end}
        ))
        window_texts = []
        if not at_end:
            # The next window begins where the following segment starts.
            window_start = segments[index + 1].start

    return grouped

In [None]:
def setup_rag_pipeline(video_url: str):
    """Build an in-memory Qdrant vector store from a video's transcript.

    The transcript segments are fetched, grouped into documents, split into
    chunks of 1000 characters with 150 characters of overlap, embedded with
    ``text-embedding-3-small`` and indexed in the ``video_rag_session``
    collection.

    Args:
        video_url: URL of the YouTube video to index.

    Returns:
        The populated vector store.
    """
    transcript_docs = group_segments_into_documents(get_transcript_segments(video_url))
    print(f"\nGrouped transcript into {len(transcript_docs)} documents.")

    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
    doc_chunks = splitter.split_documents(transcript_docs)
    print(f"Created {len(doc_chunks)} chunks for the vector store.")

    # ":memory:" keeps the collection inside this process only.
    qdrant_options = {"location": ":memory:", "collection_name": "video_rag_session"}
    store = Qdrant.from_documents(
        doc_chunks,
        OpenAIEmbeddings(model="text-embedding-3-small"),
        **qdrant_options,
    )
    print("✅ Vector store is ready to be queried.")
    return store

In [None]:
def answer_query(query: str, vector_store, video_url: str, k: int = 5, model: str = "gpt-3.5-turbo"):
    """Answer a question about the video and print timestamped source links.

    The ``k`` most similar chunks are retrieved from ``vector_store``, passed
    to the chat model as the only allowed context, and the answer is printed
    followed by one link per distinct source passage.

    Args:
        query: The user's question.
        vector_store: Store exposing ``similarity_search(query, k=...)`` whose
            documents carry a ``start`` entry in their metadata.
        video_url: URL of the video, used to build the timestamp links.
        k: Number of chunks to retrieve.
        model: Chat completion model used to generate the answer.

    Returns:
        None. The answer and the sources are printed.
    """
    print(f"\n🔍 Searching for content related to: '{query}'")
    start_retrieval_time = time.perf_counter()
    retrieved_docs = vector_store.similarity_search(query, k=k)
    end_retrieval_time = time.perf_counter()
    print(f"⏱️ Retrieval time: {end_retrieval_time - start_retrieval_time:.4f} seconds")

    context = "\n\n".join([doc.page_content for doc in retrieved_docs])

    # Chunks split from the same document share its metadata, so keep only the
    # first occurrence of each start time to avoid printing the same link twice.
    sources = []
    seen_starts = set()
    for doc in retrieved_docs:
        start = doc.metadata['start']
        if start not in seen_starts:
            seen_starts.add(start)
            sources.append(doc.metadata)

    prompt = f"Answer the user's query based ONLY on the following context. If the context doesn't contain the answer, say so.\n\nContext:\n{context}\n\nQuery:\n{query}\n\nAnswer:"
    response = client.chat.completions.create(model=model, messages=[{"role": "user", "content": prompt}])
    answer = response.choices[0].message.content

    print("\n Answer ")
    print(answer)
    print("\n Sources from Video ")
    video_base_url = video_url.split('#')[0].split('&')[0]
    # "&t=" is only valid when the URL already has a query string; short links
    # such as https://youtu.be/<id> need "?t=" instead.
    separator = '&' if '?' in video_base_url else '?'
    for i, source_meta in enumerate(sources):
        start_time = format_time(source_meta['start'])
        timestamp_link = f"{video_base_url}{separator}t={int(source_meta['start'])}s"
        print(f"Source {i+1} (starts at {start_time}): {timestamp_link}")

In [None]:
# Replace with the YouTube video you want to query
YOUTUBE_URL = "https://www.youtube.com/watch?v=l99lcuNhVrI"

# Build the vector store for the video. setup_rag_pipeline fetches the
# transcript (using the cache when one exists) and indexes it.
print(f"Starting setup for video: {YOUTUBE_URL}")
video_vector_store = setup_rag_pipeline(YOUTUBE_URL)

banner_line = "=" * 50
print(f"\n{banner_line}")
print(" Setup complete. You can now run the next cell to ask questions.")
print(banner_line)

In [None]:
# Ask a question about the indexed video; the answer and its sources are printed.
answer_query(
    query="what is the video about",
    vector_store=video_vector_store,
    video_url=YOUTUBE_URL,
)