<a href="https://colab.research.google.com/github/kr5red/Project-4-Business-Case-Multimodal-AI-ChatBot-for-YouTube-Video-QA/blob/main/main_version5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# YouTube RAG Pipeline

0. Installation & Imports
1. Ingest YouTube Videos → DataFrame
2. Convert Transcripts → LangChain Documents
3. Build Vector Store (Chroma + OpenAI Embeddings)
4. (Next Steps – Implemented in Later Cells)

## 0. Installation & Imports

In [2]:
!pip install youtube-transcript-api chromadb pytube
!pip install -q -U langchain langchain-openai langchain-core langchain-community langsmith
!pip install -q openai



In [3]:
import os
import pandas as pd
from urllib.parse import urlparse, parse_qs

# Colab secrets
from google.colab import userdata

# LangChain imports (modern API)
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool, tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_community.chat_message_histories import ChatMessageHistory


# Text splitting + Documents
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# VectorDB
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# YouTube transcript
from youtube_transcript_api import (
    YouTubeTranscriptApi,
    TranscriptsDisabled,
    NoTranscriptFound,
)

# Metadata enrichment
from pytube import YouTube

#Speech recognition
from google.colab import files
from openai import OpenAI
import uuid

#Tools
from collections import defaultdict
from typing import Dict, List
from langchain_core.messages import BaseMessage

#deployment
import gradio as gr

In [4]:
from openai import OpenAI

# Read both API keys from the Colab secrets store (`userdata` is google.colab.userdata).
openai_key = userdata.get("OPENAI_API_KEY")
langchain_key = userdata.get("LANGCHAIN_API_KEY")

# Stop here with a readable message if either key is missing.
if openai_key is None:
    raise ValueError("OPENAI_API_KEY not found in Colab secrets.")
if langchain_key is None:
    raise ValueError("LANGCHAIN_API_KEY not found in Colab secrets.")

# Export the keys and the LangSmith settings as environment variables.
# NOTE(review): presumably the OpenAI/LangChain clients created later read
# these variables, since none of them is given a key explicitly - confirm.
os.environ["OPENAI_API_KEY"] = openai_key
os.environ["LANGCHAIN_API_KEY"] = langchain_key
os.environ["LANGCHAIN_TRACING_V2"] = "true"    # required for LangSmith
os.environ["LANGCHAIN_PROJECT"] = "youtube-qa-bot"

# Reusable OpenAI client for audio transcription etc.
# (used by transcribe_audio_to_text and tts_from_text further down)
openai_client = OpenAI()

print("Keys loaded")
print("LangSmith enabled ‚Äî project:", os.environ["LANGCHAIN_PROJECT"])

Keys loaded
LangSmith enabled ‚Äî project: youtube-qa-bot


YouTube ingestion
- URL -> video_id
- video_id -> transcript (list)
- transcript -> plain text

In [5]:
#Extract the YouTube video ID from URL formats
def extract_video_id(url: str) -> str:
    """Return the YouTube video ID contained in ``url``.

    Supported forms:
      * short links:      ``https://youtu.be/<id>``
      * watch links:      ``https://www.youtube.com/watch?v=<id>``
      * path-based links: ``https://www.youtube.com/{embed,shorts,live,v}/<id>``

    Args:
        url: Full video URL (scheme included).

    Returns:
        The video ID as a non-empty string.

    Raises:
        ValueError: If the host is not a recognised YouTube host or no ID is present.
    """
    parsed = urlparse(url.strip())

    # `hostname` is lower-cased and has any ":port" removed, unlike `netloc`.
    host = parsed.hostname or ""
    # Non-empty path segments, so trailing or duplicate slashes never end up in the ID.
    segments = [segment for segment in parsed.path.split("/") if segment]

    video_id = None
    if host in ("youtu.be", "www.youtu.be"):
        # Short links carry the ID as the first path segment.
        video_id = segments[0] if segments else None
    elif host in ("www.youtube.com", "youtube.com", "m.youtube.com", "music.youtube.com"):
        # Regular watch links carry the ID in the `v` query parameter ...
        video_id = parse_qs(parsed.query).get("v", [None])[0]
        # ... while embed/shorts/live links carry it as the second path segment.
        if not video_id and len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
            video_id = segments[1]

    if video_id:
        return video_id

    raise ValueError(f"Could not extract video_id from URL: {url}")

#Convert a transcript (list of {text, start, duration}) to a single text string
def transcript_to_text(transcript, include_timestamps: bool = False) -> str:
    """Join the ``"text"`` of every transcript entry into one space-separated string.

    Args:
        transcript: Iterable of mappings indexed with ``"text"`` (and ``"start"``
            when timestamps are requested).
        include_timestamps: When True, prefix each piece with its start time,
            formatted with one decimal as ``[<start>s]``.

    Returns:
        The concatenated text; an empty string for an empty transcript.
    """
    if include_timestamps:
        pieces = (f"[{item['start']:.1f}s] {item['text']}" for item in transcript)
    else:
        pieces = (item["text"] for item in transcript)
    return " ".join(pieces)


#Fetch transcript for a single video_id and turn it into plain text.
def fetch_transcript_text(video_id: str, languages=None) -> str:
    """Download the transcript of one video and return it as plain text.

    Args:
        video_id: ID of the video whose transcript is requested.
        languages: Passed to ``YouTubeTranscriptApi.fetch`` as ``languages=``
            when not None; omitted from the call otherwise.

    Returns:
        The snippet texts joined by ``transcript_to_text`` (no timestamps).

    Raises:
        RuntimeError: For disabled transcripts, a missing transcript, or any
            other failure while fetching or converting.
    """
    try:
        api = YouTubeTranscriptApi()

        # Only forward `languages` when the caller actually supplied one.
        fetch_kwargs = {} if languages is None else {"languages": languages}
        fetched = api.fetch(video_id, **fetch_kwargs)

        # Convert the snippets of the fetched transcript into the
        # {text, start, duration} dicts that transcript_to_text() consumes.
        entries = []
        for snippet in fetched.snippets:
            entries.append(
                {"text": snippet.text, "start": snippet.start, "duration": snippet.duration}
            )

        return transcript_to_text(entries, include_timestamps=False)

    except TranscriptsDisabled:
        raise RuntimeError(f"Transcripts are disabled for video_id={video_id}")
    except NoTranscriptFound:
        raise RuntimeError(f"No transcript found for video_id={video_id} in languages={languages}")
    except Exception as e:
        raise RuntimeError(f"Error fetching transcript for {video_id}: {e}")


Ingest YouTube videos into a DataFrame

In [6]:
def ingest_youtube_videos(urls, languages="en, de") -> pd.DataFrame:
    """Fetch the transcript of every URL and collect the results in a DataFrame.

    Args:
        urls: Iterable of YouTube URLs; a single URL string is also accepted.
        languages: Preferred transcript languages, either an iterable of
            language codes or one comma-separated string such as ``"en, de"``.

    Returns:
        DataFrame with the columns ``video_id``, ``url`` and ``transcript``,
        one row per successfully ingested video. Videos that fail are reported
        with a "Skipping ..." message and left out; the columns are present
        even when no video could be ingested.
    """
    # A comma-separated string would otherwise be handed on as ONE value and be
    # iterated character by character; split it into separate language codes.
    if isinstance(languages, str):
        languages = [code.strip() for code in languages.split(",") if code.strip()] or None

    # A bare string would be iterated character by character as well.
    if isinstance(urls, str):
        urls = [urls]

    rows = []
    for url in urls:
        try:
            video_id = extract_video_id(url)
            transcript = fetch_transcript_text(video_id, languages=languages)
            rows.append({
                "video_id": video_id,
                "url": url,
                "transcript": transcript,
            })
        except Exception as e:
            # Best effort: one bad video must not abort the whole batch.
            print(f"Skipping {url}: {e}")

    # Fixed columns keep the frame's schema stable even when `rows` is empty.
    return pd.DataFrame(rows, columns=["video_id", "url", "transcript"])


In [9]:
#Ingest multiple videos ----
# URLs of the videos to index; add more entries to ingest several videos.
video_urls = ["https://www.youtube.com/watch?v=HG68Ymazo18"]

df_videos = ingest_youtube_videos(video_urls, languages=["en"])

# Preview the ingested rows, or report that nothing could be ingested.
if not df_videos.empty:
    print(df_videos.head())
else:
    print("No videos ingested ...")

Skipping https://www.youtube.com/watch?v=HG68Ymazo18: Error fetching transcript for HG68Ymazo18: 
Could not retrieve a transcript for the video https://www.youtube.com/watch?v=HG68Ymazo18! This is most likely caused by:

YouTube is blocking requests from your IP. This usually is due to one of the following reasons:
- You have done too many requests and your IP has been blocked by YouTube
- You are doing requests from an IP belonging to a cloud provider (like AWS, Google Cloud Platform, Azure, etc.). Unfortunately, most IPs from cloud providers are blocked by YouTube.

There are two things you can do to work around this:
1. Use proxies to hide your IP address, as explained in the "Working around IP bans" section of the README (https://github.com/jdepoix/youtube-transcript-api?tab=readme-ov-file#working-around-ip-bans-requestblocked-or-ipblocked-exception).
2. (NOT RECOMMENDED) If you authenticate your requests using cookies, you will be able to continue doing requests for a while. Howev

In [8]:
# Show the first rows of the ingested-videos frame (video_id, url, transcript).
df_videos.head()

Add Chunking + LangChain Documents on Top of the DataFrame

In [None]:
#Convert each row in df_videos (video_id, url, transcript) into multiple LangChain Documents with metadata.
def df_to_documents(
    df: pd.DataFrame,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
) -> list[Document]:
    """Split every transcript in ``df`` into chunks and wrap them as Documents.

    Args:
        df: Frame with the columns ``video_id``, ``url`` and ``transcript``.
        chunk_size: ``chunk_size`` given to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` given to ``RecursiveCharacterTextSplitter``.

    Returns:
        One Document per chunk. The metadata always holds ``video_id``, ``url``
        and ``chunk_index``; ``title``, ``author`` and ``description`` are added
        only when they could be fetched. Rows without a usable transcript
        contribute no Documents.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )

    docs: list[Document] = []

    for _, row in df.iterrows():
        video_id = row["video_id"]
        url = row["url"]
        transcript = row["transcript"]

        # A missing (None/NaN) or blank transcript cannot be split; skip the row.
        if not isinstance(transcript, str) or not transcript.strip():
            print(f"Skipping {url}: no transcript text to chunk")
            continue

        video_meta = {"video_id": video_id, "url": url}

        # Best-effort metadata enrichment. Values that are None are left out
        # so that consumers using `metadata.get(key, default)` get their
        # default instead of a stored None.
        # NOTE(review): vector stores commonly restrict metadata values to
        # str/int/float/bool, which None would violate - confirm for Chroma.
        try:
            yt = YouTube(url)
            for key in ("title", "author", "description"):
                value = getattr(yt, key)
                if value is not None:
                    video_meta[key] = value
        except Exception as e:
            # Keep whatever was collected so far, but say why the rest is missing.
            print(f"Could not fetch YouTube metadata for {url}: {e}")

        # Split transcript into chunks; each chunk shares the video's metadata.
        for idx, chunk in enumerate(splitter.split_text(transcript)):
            docs.append(
                Document(
                    page_content=chunk,
                    metadata={**video_meta, "chunk_index": idx},
                )
            )

    return docs


In [None]:
# Chunk all ingested transcripts with the default chunk size/overlap and
# report how many chunks were produced.
documents = df_to_documents(df_videos)
print(f"Created {len(documents)} chunks from {len(df_videos)} videos.")

Created 6 chunks from 1 videos.


Build a LangChain VectorStore (Chroma) from Documents

In [None]:
# from langchain_community.embeddings import HuggingFaceEmbeddings

def build_vectorstore_from_documents(
    docs: list[Document],
    collection_name: str = "youtube_rag",
    persist_directory: str | None = None,
    embedding_model: str = "text-embedding-3-small",
):
    """Build a Chroma vector store from LangChain Documents.

    Args:
        docs: Documents to embed and index; must not be empty.
        collection_name: Name of the Chroma collection.
        persist_directory: Directory handed to Chroma as ``persist_directory``;
            can be None for in-memory.
        embedding_model: Name of the OpenAI embedding model to use.

    Returns:
        The vector store returned by ``Chroma.from_documents``.

    Raises:
        ValueError: If ``docs`` is empty (for example because ingestion skipped
            every video), so the problem is reported before any embedding call.
    """
    if not docs:
        raise ValueError(
            "No documents to index: `docs` is empty. "
            "Ingest at least one transcript before building the vector store."
        )

    # OpenAI embedding model
    embeddings = OpenAIEmbeddings(model=embedding_model)

    return Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
        collection_name=collection_name,
        persist_directory=persist_directory,  # can be None for in-memory
    )

In [None]:
# Embed the chunked transcripts and index them in the "youtube_rag" collection,
# passing ./chroma_youtube_rag as the persist directory.
vectorstore = build_vectorstore_from_documents(
    documents,
    collection_name="youtube_rag",
    persist_directory="./chroma_youtube_rag",
)

LLM + Retriever + Memory

In [None]:
# Chat model shared by the plain RAG function and the tool-calling agent below.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

# Retriever over the vector store; k=5 is the number of chunks requested per query.
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# Message history that youtube_rag_query() appends each question/answer pair to.
memory = ChatMessageHistory()

# Prompt with two placeholders, {context} and {question}, which
# youtube_rag_query() fills in before calling the model.
rag_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant answering questions about the content "
            "of YouTube videos indexed in a vector database. "
            "Use the retrieved context to answer accurately."
        ),
        ("human", "Context from videos:\n{context}\n\nQuestion: {question}")
    ]
)

Build the RAG pipeline manually

In [None]:
def _format_retrieved_context(docs) -> str:
    """Render retrieved chunks as the numbered context block used in the prompt."""
    if not docs:
        return "No relevant content found."

    def _meta_or(meta, key, default):
        # Fall back when the key is absent OR stored as None. An explicit None
        # check (not `or`) keeps legitimate falsy values such as chunk_index 0.
        value = meta.get(key)
        return default if value is None else value

    context_parts = []
    for position, doc in enumerate(docs, start=1):
        meta = doc.metadata or {}
        context_parts.append(
            f"[{position}] Title: {_meta_or(meta, 'title', 'Unknown')}\n"
            f"Channel: {_meta_or(meta, 'author', 'Unknown')}\n"
            f"Description: {_meta_or(meta, 'description', 'No description')}\n"
            f"Chunk {_meta_or(meta, 'chunk_index', '?')}:\n{doc.page_content}"
        )
    return "\n\n---\n".join(context_parts)


def youtube_rag_query(question: str):
    """Answer ``question`` from the indexed video chunks.

    Steps:
    1. Retrieve relevant video chunks with the module-level ``retriever``
    2. Format them into the ``rag_prompt`` together with the question
    3. Call the LLM
    4. Record the question and the response in ``memory``

    Note that ``memory`` is only written here; ``rag_prompt`` has no history
    placeholder, so earlier turns are not sent to the model by this function.

    Args:
        question: The user's question.

    Returns:
        Tuple ``(answer_text, context)`` where ``context`` is the exact context
        string that was inserted into the prompt.
    """
    # ---- Retrieval ----
    docs = retriever.invoke(question)
    context = _format_retrieved_context(docs)

    # ---- Build prompt ----
    prompt_msg = rag_prompt.format_messages(
        context=context,
        question=question,
    )

    # ---- LLM call ----
    response = llm.invoke(prompt_msg)

    # ---- Memory update ----
    memory.add_message(HumanMessage(content=question))
    memory.add_message(response)

    return response.content, context


In [None]:
#Tools

# Tool wrapper around youtube_rag_query(): returns only the answer text and
# drops the retrieved context. The docstring below is kept as-is because the
# @tool decorator presumably uses it as the tool description - confirm before
# rewording it.
@tool
def youtube_rag_qa(question: str) -> str:
    """Answer questions about the YouTube videos that have been ingested into the vector store."""
    answer, _ctx = youtube_rag_query(question)
    return answer

# Tool wrapper around transcribe_audio_to_text().
# NOTE(review): transcribe_audio_to_text() is defined in the "Speech
# recognition" cell further down this notebook, so that cell must have been
# run before this tool is actually invoked.
@tool
def transcribe_audio_file(audio_path: str) -> str:
    """Transcribe an audio file at a local path to text."""
    return transcribe_audio_to_text(audio_path)

# All tools the agent may call, plus a name -> tool lookup used by agent_chat()
# to dispatch the tool calls requested by the model.
tools = [youtube_rag_qa, transcribe_audio_file]
tool_map = {t.name: t for t in tools}

# LLM that can call tools
tool_llm = llm.bind_tools(tools)

# ---- Conversation memory (per session) ----

# Maps a session id to the list of messages exchanged in that session;
# defaultdict(list) creates an empty history on first access.
session_history: Dict[str, List[BaseMessage]] = defaultdict(list)

# System message placed at the start of every agent conversation; it names the
# two tools and describes when each should be used.
SYSTEM_PROMPT = (
    "You are an assistant that answers questions about a set of YouTube videos "
    "that have already been ingested into a vector database.\n"
    "If the user asks about the content of the videos, you should use the "
    "`youtube_rag_qa` tool.\n"
    "If the user gives or refers to an audio file path, you can use the "
    "`transcribe_audio_file` tool to turn it into text and then use "
    "`youtube_rag_qa` with the transcribed question.\n"
    "Always give concise, helpful answers."
)

def _run_tool_calls(tool_calls) -> List[ToolMessage]:
    """Execute each requested tool call and wrap its result (or error) in a ToolMessage."""
    tool_messages: List[ToolMessage] = []
    for call in tool_calls:
        tool_name = call["name"]
        selected_tool = tool_map.get(tool_name)
        if selected_tool is None:
            tool_result = f"Error: tool '{tool_name}' not found."
        else:
            try:
                # invoke() expects a dict of arguments for the tool
                tool_result = selected_tool.invoke(call["args"])
            except Exception as e:
                # Report the failure to the model instead of aborting the turn.
                tool_result = f"Error: tool '{tool_name}' failed: {e}"

        tool_messages.append(
            ToolMessage(
                content=str(tool_result),
                tool_call_id=call["id"],
            )
        )
    return tool_messages


def agent_chat(user_input: str, session_id: str = "default", max_tool_rounds: int = 3) -> str:
    """Run one conversational turn of the tool-calling agent.

    The model may request tools over several consecutive rounds. This is what
    lets it follow SYSTEM_PROMPT's "transcribe the audio, then call
    `youtube_rag_qa` with the transcribed question" instruction, which needs two
    dependent tool calls.

    Args:
        user_input: The user's message for this turn.
        session_id: Key into ``session_history``; turns with the same id share
            their history.
        max_tool_rounds: Upper bound on tool-calling rounds in this turn. Once
            it is reached, the final answer is requested from the plain ``llm``
            (no tools bound), so the turn always ends with a text answer.

    Returns:
        The text content of the model's final answer.
    """
    history = session_history[session_id]

    # 1) Build the message list: system + history + new user message
    messages: List[BaseMessage] = [
        SystemMessage(content=SYSTEM_PROMPT),
        *history,
        HumanMessage(content=user_input),
    ]

    # 2) Let the model alternate between requesting tools and reading results
    final_ai = None
    for _ in range(max_tool_rounds):
        ai_msg = tool_llm.invoke(messages)
        messages.append(ai_msg)

        tool_calls = getattr(ai_msg, "tool_calls", None)
        if not tool_calls:
            # The model answered directly; this is the final answer.
            final_ai = ai_msg
            break

        # Every requested call gets a ToolMessage reply before the next model call.
        messages.extend(_run_tool_calls(tool_calls))

    # 3) Round limit reached while the model was still requesting tools:
    #    ask the tool-less model to answer from the results gathered so far.
    if final_ai is None:
        final_ai = llm.invoke(messages)

    # 4) Update history with just the human input and final answer
    history.extend([
        HumanMessage(content=user_input),
        final_ai,
    ])

    return final_ai.content


Test the RAG pipeline

In [None]:
# Smoke test of the plain RAG function (no agent, no tools): ask one question
# and print both the answer and the context string that was put in the prompt.
answer, used_context = youtube_rag_query("Give me an overview of the videos you indexed.")

print("=== ANSWER ===\n")
print(answer)

print("\n=== CONTEXT USED ===\n")
print(used_context)

=== ANSWER ===

The indexed videos focus on job interview preparation and strategies. Here's an overview of the content:

1. **Interview Preparation**: The videos emphasize the importance of being well-prepared for interviews, including having a list of thoughtful questions to ask the employer. Suggested questions include inquiries about the company culture, goals, and performance evaluation.

2. **Positive Framing**: Candidates are advised to avoid speaking negatively about previous employers. Instead, they should highlight what they learned from past experiences and how those experiences can benefit the new role.

3. **Body Language and Etiquette**: The videos stress the significance of body language during interviews. Candidates should maintain good posture, make eye contact, and be aware of their movements to convey confidence and presence.

4. **Answering Common Questions**: Tips are provided on how to answer common interview questions effectively. Candidates are encouraged to be 

Chat loop



In [None]:
def chat_with_youtube_bot(session_id: str = "default"):
    """Simple text-based chat loop in Colab, powered by the tool-calling agent + memory.

    Reads questions with ``input()`` until the user types 'exit' or 'quit', or
    until input ends (EOF) or the cell is interrupted at the prompt. Blank
    lines are ignored. A failure while answering one question is reported and
    the loop continues.

    Args:
        session_id: Session key passed to ``agent_chat`` so all turns of this
            loop share one conversation history.
    """
    print("YouTube QA ChatBot (agent powered)")
    print("Ask me anything about the videos I have indexed.")
    print("Type 'exit' or 'quit' to end the chat.\n")

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # No more input / cell stopped: leave the loop without a traceback.
            print("\nBot: Bye!")
            break

        if user_input.lower() in {"exit", "quit"}:
            print("Bot: Bye!")
            break

        if not user_input:
            continue

        # Use the agent (tools + memory), not the bare RAG function
        try:
            answer = agent_chat(user_input, session_id=session_id)
        except Exception as e:
            # One failed request (e.g. a transient API error) should not end the chat.
            print(f"Bot: Sorry, something went wrong: {e}\n")
            continue
        print(f"Bot: {answer}\n")


# Run this to start chatting (blocks on input() until 'exit' or 'quit' is typed):
chat_with_youtube_bot()


YouTube QA ChatBot (agent powered)
Ask me anything about the videos I have indexed.
Type 'exit' or 'quit' to end the chat.

You: what should I not do?
Bot: In an interview, you should avoid speaking negatively about previous employers and instead focus on what you've learned. Be mindful of your body language; avoid distracting movements like tapping your fingers. Also, prepare a few questions for your employer to show your interest and preparation.

You: exit
Bot: Bye!


Speech recognition

In [None]:
def transcribe_audio_to_text(audio_path: str, model: str = "gpt-4o-mini-transcribe") -> str:
    """Send the audio file at ``audio_path`` to OpenAI transcription and return the text.

    Args:
        audio_path: Path of the audio file; it is opened in binary mode.
        model: Transcription model name; change it to one your account supports.

    Returns:
        The transcription result converted with ``str()``.
    """
    request_options = {"model": model, "response_format": "text"}

    with open(audio_path, "rb") as audio_stream:
        result = openai_client.audio.transcriptions.create(
            file=audio_stream,
            **request_options,
        )

    # Newer clients already return a string here; str() covers object results too.
    return str(result)

def ask_bot_with_audio(session_id: str = "voice-session"):
    """Colab helper: upload a spoken question and print transcript and answer.

    Steps:
    1. Upload an audio file with a spoken question
    2. Use the agent (via answer_from_audio_file) to get transcript + answer
    3. Print the transcript and the answer once each

    Args:
        session_id: Session key forwarded to ``answer_from_audio_file`` so
            repeated voice questions share one conversation history.
    """
    print("🎙️ Upload an audio file (e.g., .wav, .mp3) with your question...")
    uploaded = files.upload()

    if not uploaded:
        print("No file uploaded.")
        return

    # Only the first uploaded file is used.
    audio_filename = next(iter(uploaded.keys()))
    print(f"📁 Uploaded file: {audio_filename}")

    try:
        # This already uses agent_chat under the hood
        transcript_text, answer = answer_from_audio_file(audio_filename, session_id=session_id)
    except Exception as e:
        print("❌ Error while processing the audio:", e)
        return

    print("\n📝 Transcribed question:")
    print(transcript_text)

    print("\n🤖 Bot answer:")
    print(answer)

def tts_from_text(
    text: str,
    output_dir: str = "tts_outputs",
    model: str = "gpt-4o-mini-tts",
    voice: str = "alloy",
) -> str:
    """Convert answer text to speech using OpenAI TTS and return the audio file path.

    Args:
        text: Text to speak; must contain non-whitespace characters.
        output_dir: Directory for the generated file; created if missing.
        model: TTS-capable model name available on your OpenAI account.
        voice: Voice name passed to the speech endpoint.

    Returns:
        Path of the written ``answer_<random hex>.mp3`` file.

    Raises:
        ValueError: If ``text`` is empty or whitespace only. This is checked
            before anything is created on disk or sent to the API.
    """
    if not text or not text.strip():
        raise ValueError("Cannot synthesize speech from empty text.")

    os.makedirs(output_dir, exist_ok=True)
    # A random file name so successive answers never overwrite each other.
    out_path = os.path.join(output_dir, f"answer_{uuid.uuid4().hex}.mp3")

    speech = openai_client.audio.speech.create(
        model=model,
        voice=voice,
        input=text,
    )

    with open(out_path, "wb") as f:
        f.write(speech.read())

    return out_path


In [None]:
# Upload a recorded question and print its transcript and the bot's answer.
# NOTE(review): ask_bot_with_audio() calls answer_from_audio_file(), which is
# defined in the following cell; on a fresh kernel run that cell first.
ask_bot_with_audio()

üéôÔ∏è Upload an audio file (e.g., .wav, .mp3) with your question...


KeyboardInterrupt: 

In [None]:
def answer_from_audio_file(audio_path: str, session_id: str = "voice-session"):
    """Turn a spoken question into text and answer it with the agent.

    Args:
        audio_path: Path of the audio file holding the question.
        session_id: Session key forwarded to ``agent_chat``.

    Returns:
        Tuple ``(transcript, answer)``: the transcribed question and the
        agent's reply to it.
    """
    question_text = transcribe_audio_to_text(audio_path)
    return question_text, agent_chat(question_text, session_id=session_id)


Change for Gradio later

In [None]:
def gradio_audio_qa(audio_path, tts_enabled: bool):
    """Gradio callback: answer a recorded question, optionally with spoken output.

    Args:
        audio_path: File path of the recorded question, or None when there is
            no recording.
        tts_enabled: Whether to also synthesize the answer as audio.

    Returns:
        Tuple ``(transcript, answer, answer_audio_path)``. The audio path is
        None when TTS is disabled or failed; a TTS failure is printed and does
        not affect the text results.
    """
    if audio_path is None:
        return "", "No audio received.", None

    # answer_from_audio_file() runs the transcription and the agent.
    question_text, reply_text = answer_from_audio_file(audio_path)

    if not tts_enabled:
        return question_text, reply_text, None

    try:
        spoken_reply_path = tts_from_text(reply_text)
    except Exception as e:
        print("TTS error:", e)
        spoken_reply_path = None

    return question_text, reply_text, spoken_reply_path


# Gradio UI: a microphone input and a "read aloud" checkbox feed
# gradio_audio_qa(); its three return values fill the transcript box, the
# answer box and the spoken-answer player.
with gr.Blocks() as demo:
    gr.Markdown("# üé¨ YouTube QA Bot with Voice Input")

    with gr.Row():
        audio_input = gr.Audio(
            sources=["microphone"],   # mic only
            type="filepath",          # pass a file path to backend
            label="Hold to record your question",
        )
        tts_toggle = gr.Checkbox(
            label="Read answer aloud",
            value=False,
        )

    transcript_output = gr.Textbox(
        label="Transcribed question",
        lines=2,
    )
    answer_output = gr.Textbox(
        label="Bot answer",
        lines=4,
    )
    answer_audio_output = gr.Audio(
        label="Spoken answer",
        type="filepath",
    )

    # Run the backend whenever the audio input's value changes; input and
    # output order must match gradio_audio_qa's parameters and return tuple.
    audio_input.change(
        fn=gradio_audio_qa,
        inputs=[audio_input, tts_toggle],
        outputs=[transcript_output, answer_output, answer_audio_output],
    )

# Start the app.
demo.launch()