<a href="https://colab.research.google.com/github/s11khushboo/youtube-QandA/blob/main/preprocessing-video.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install yt-dlp openai-whisper sentence-transformers pinecone



Collecting yt-dlp
  Downloading yt_dlp-2025.11.12-py3-none-any.whl.metadata (180 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m180.0/180.0 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting openai-whisper
  Downloading openai_whisper-20250625.tar.gz (803 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m803.2/803.2 kB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pinecone
  Downloading pinecone-8.0.0-py3-none-any.whl.metadata (11 kB)
Collecting pinecone-plugin-assistant<4.0.0,>=3.0.1 (from pinecone)
  Downloading pinecone_plugin_assistant-3.0.1-py3-none-any.whl.metadata (30 kB)
Collecting pinecone-plugin-interface<0.1.0,>=0.0.7 (from pinecone)
  Downloading pinecone_plugin_interface-0.0.7-py3-none-any.whl.metadata (1.2 kB)
Collecting packaging>=20.9 (from

In [27]:
# ingest.py (simplified)
from yt_dlp import YoutubeDL
import whisper
from sentence_transformers import SentenceTransformer
import pinecone
import uuid
import math
import time
from pinecone import Pinecone, ServerlessSpec
from urllib.parse import urlparse, parse_qs
from langchain_openai import OpenAI
from langchain_core.prompts import PromptTemplate  # pseudo imports



INDEX_NAME = "youtube-chunks"
EMBED_MODEL = "all-MiniLM-L6-v2"  # or OpenAI embeddings
WHISPER_MODEL = "small"



In [None]:
def download_audio(youtube_url, out_path="audio.mp3"):
    ydl_opts = {"format": "bestaudio/best", "outtmpl": out_path}
    # download audio
    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
        info = ydl.extract_info(youtube_url, download=False)
        title = info.get("title", None)
    return out_path ,title

In [28]:

def get_video_id(url: str):
    # Extract video ID
    parsed = urlparse(url)
    if "youtu.be" in parsed.hostname:
        video_id = parsed.path[1:]
    elif "watch" in parsed.path:
        video_id = parse_qs(parsed.query)["v"][0]
    elif parsed.path.startswith("/shorts/") or parsed.path.startswith("/embed/"):
        video_id = parsed.path.split("/")[2]
    else:
        raise ValueError("Unsupported YouTube URL format.")
    return video_id

In [44]:
# transcribe
def transcribe_whisper(audio_path):
    model = whisper.load_model(WHISPER_MODEL)
    result = model.transcribe(audio_path, task="transcribe")  # returns segments with timestamps
    return result

In [31]:

# chunking with overlap
def chunk_segments(segments, max_chars=1000, overlap_chars=200):
    chunks = []
    buffer = ""
    buffer_start = None
    buffer_end = None
    for seg in segments:
        text = seg["text"].strip()
        if not buffer:
            buffer_start = seg["start"]
        if len(buffer) + len(text) <= max_chars:
            buffer += (" " + text)
            buffer_end = seg["end"]
        else:
            chunks.append({
                "start": buffer_start, "end": buffer_end, "text": buffer.strip()
            })
            # start new buffer with overlap
            buffer = text[-overlap_chars:]
            buffer_start = seg["start"]
            buffer_end = seg["end"]
    if buffer:
        chunks.append({"start": buffer_start, "end": buffer_end, "text": buffer.strip()})
    return chunks

In [48]:
import os
from google.colab import userdata

os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
os.environ["PINECONE_KEY"] = userdata.get("PINECONE_KEY")


In [49]:


pc = Pinecone(api_key=os.environ["PINECONE_KEY"])

spec = ServerlessSpec(
    cloud="aws", region="us-east-1"
)

In [34]:
# embeddings (sentence-transformers)
embedder = SentenceTransformer(EMBED_MODEL)

def embed_texts(texts):
    return embedder.encode(texts, show_progress_bar=False).tolist()


index_name = "youtube-text-demo"


# check if index already exists (it shouldn't if this is first time)
if index_name not in pc.list_indexes().names():
    # if does not exist, create index
    pc.create_index(
        index_name,
        dimension=384,
        metric='cosine',
        spec=spec
    )
    # wait for index to be initialized
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)

# connect to index
index = pc.Index(index_name)
# view index stats
index.describe_index_stats()



{'_response_info': {'raw_headers': {'connection': 'keep-alive',
                                    'content-length': '181',
                                    'content-type': 'application/json',
                                    'date': 'Wed, 26 Nov 2025 09:23:51 GMT',
                                    'grpc-status': '0',
                                    'server': 'envoy',
                                    'x-envoy-upstream-service-time': '52',
                                    'x-pinecone-request-id': '7836427583778180564',
                                    'x-pinecone-request-latency-ms': '51'}},
 'dimension': 384,
 'index_fullness': 0.0,
 'memoryFullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'__default__': {'vector_count': 8}},
 'storageFullness': 0.0,
 'total_vector_count': 8,
 'vector_type': 'dense'}

In [35]:

def upsert_chunks(video_id, title, chunks):
    texts = [c["text"] for c in chunks]
    embeddings = embed_texts(texts)  # this should be list of lists
    if hasattr(embeddings, "tolist"):
            embeddings = embeddings.tolist()
    vectors = []
    for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
        # ensure emb is a plain Python list
        if not isinstance(emb, list):
            emb = emb.tolist()

        # ensure metadata contains only serializable types
        metadata = {
            "video_id": video_id,
            "start_time": float(chunk["start"]),
            "end_time": float(chunk["end"]),
            "text": str(chunk["text"]),
            "title": str(title)
        }

        vectors.append(
            (f"{video_id}_chunk_{i}", emb, metadata)
        )

    # upsert all vectors
    index.upsert(vectors=vectors)
    print(f"Upserted {len(vectors)} chunks for video {video_id}")


In [37]:
def ingest_youtube_video(url):
    print("Downloading audio...")
    audio_path,title = download_audio(url)
    transcript = transcribe_whisper(audio_path)
    chunks=chunk_segments(transcript["segments"])
    video_id=get_video_id(url)
    upsert_chunks(video_id,title,chunks)
    return f"Successfully ingested video: {url}. Chunks: {len(chunks)}"

In [40]:
ingest_youtube_video("https://www.youtube.com/watch?v=67_aMPDk2zw")

Downloading audio...
[youtube] Extracting URL: https://www.youtube.com/watch?v=67_aMPDk2zw
[youtube] 67_aMPDk2zw: Downloading webpage




[youtube] 67_aMPDk2zw: Downloading android sdkless player API JSON
[youtube] 67_aMPDk2zw: Downloading web safari player API JSON




[youtube] 67_aMPDk2zw: Downloading m3u8 information




[info] 67_aMPDk2zw: Downloading 1 format(s): 251-9
[download] audio.mp3 has already been downloaded
[download] 100% of    4.35MiB
[youtube] Extracting URL: https://www.youtube.com/watch?v=67_aMPDk2zw
[youtube] 67_aMPDk2zw: Downloading webpage




[youtube] 67_aMPDk2zw: Downloading android sdkless player API JSON
[youtube] 67_aMPDk2zw: Downloading web safari player API JSON




[youtube] 67_aMPDk2zw: Downloading m3u8 information


100%|███████████████████████████████████████| 461M/461M [00:06<00:00, 69.9MiB/s]


Upserted 4 chunks for video 67_aMPDk2zw


'Successfully ingested video: https://www.youtube.com/watch?v=67_aMPDk2zw. Chunks: 4'

In [None]:
!pip show langchain

Name: langchain
Version: 1.0.8
Summary: Building applications with LLMs through composability
Home-page: https://docs.langchain.com/
Author: 
Author-email: 
License: MIT
Location: /usr/local/lib/python3.12/dist-packages
Requires: langchain-core, langgraph, pydantic
Required-by: 


In [3]:
!pip install langchain-openai





In [51]:
def search_vector_db(query: str):
       # 1) embed query
      q_emb = embedder.encode([query])[0]
      if hasattr(q_emb, "tolist"):
          q_emb = q_emb.tolist()
      # 2) search vector DB
      results = index.query(
        vector=q_emb,
        top_k=6,             # number of nearest neighbors
        include_metadata=True
      )
      return results

In [50]:
def answer_query(user_query, conversation_id=None):
    results=search_vector_db(user_query)


    # 3) build context
    context = ""
    for r in results["matches"]:
        md = r["metadata"]
        context += f"[{md['start_time']:.1f}s - {md['end_time']:.1f}s] {md['text']}\n\n"
    # 4) system + user prompt
    system_prompt = "You are an assistant that answers queries using ONLY the provided video excerpts"
    query = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {user_query}\nAnswer"
    # 5) call LLM (could be OpenAI or local)

    llm = llm = OpenAI(model="gpt-3.5-turbo-instruct",temperature = 0.0,openai_api_key=os.environ["OPENAI_API_KEY"])

    answer = llm.invoke(query)
    print(answer)
    # 6) store memory (optional)
    # ... save conversation and retrieved ids
    return {"answer": answer, "sources": [m['id'] for m in results['matches']]}


In [25]:
question = "Explain MCP in simple terms."
answer = answer_query(question)

results
: MCP, or Model Context Protocol, is a way for AI agents to interact with software and tools without the need for human developers to write explicit code for every action. It allows AI agents to discover and use tools on their own, making the process faster and more efficient. MCP is different from traditional APIs, which require human developers to manually integrate software systems. Instead, MCP provides a machine-readable menu of its capabilities, allowing AI agents to dynamically and autonomously complete tasks without the need for external documentation or pre-written code. MCP will complement APIs, not replace them, and will work together with them to make systems more resilient and adaptable.


In [41]:
question = "what is LLM"
answer = answer_query(question)

results
: LLM stands for Large Language Model. It is a type of language model that is trained on a large volume of data, such as Wikipedia articles, Google news articles, and online books. LLMs use a neural network with trillions of parameters to capture complex patterns and nuances in language. They are used in applications such as Chat GPT and Gmail auto-complete. LLMs also use reinforcement learning with human feedback to improve their performance.


In [52]:
question = "what is stochastic parrot"
answer = answer_query(question)
print(answer)

: A stochastic parrot is a parrot that uses statistical probability and randomness to predict the next word or set of words based on past conversations it has listened to. It is similar to a language model, which uses neural networks to predict the next set of words in a sentence.
{'answer': ': A stochastic parrot is a parrot that uses statistical probability and randomness to predict the next word or set of words based on past conversations it has listened to. It is similar to a language model, which uses neural networks to predict the next set of words in a sentence.', 'sources': ['67_aMPDk2zw_chunk_0', '67_aMPDk2zw_chunk_2', '67_aMPDk2zw_chunk_3', 'dwlE7TiDXz40_chunk_6', 'dwlE7TiDXz40_chunk_0', '67_aMPDk2zw_chunk_1']}
