# Your First RAG Application

In this notebook, we'll walk you through each of the components that are involved in a simple RAG application.

We won't be leveraging any fancy tools, just the OpenAI Python SDK, NumPy, and some classic Python.

> NOTE: This was done with Python 3.12.3.

> NOTE: There might be [compatibility issues](https://github.com/wandb/wandb/issues/7683) if you're on an NVIDIA driver newer than 552.44. As an interim solution, you can roll back your drivers to 552.44.

## Table of Contents:

- Task 1: Imports and Utilities
- Task 2: Documents
- Task 3: Embeddings and Vectors
- Task 4: Prompts
- Task 5: Retrieval Augmented Generation
  - 🚧 Activity #1: Augment RAG

Let's look at a rather complicated-looking visual representation of a basic RAG application.

<img src="https://i.imgur.com/vD8b016.png" />

## Task 1: Imports and Utilities

We're just doing some imports and enabling `async` to work within the Jupyter environment here, nothing too crazy!

In [1]:
# %pip install numpy
import numpy as np
print("NumPy version:", np.__version__)
print("Import successful!")

NumPy version: 2.3.3
Import successful!


In [2]:
# %pip install python-dotenv
# %pip install openai

from aimakerspace.text_utils import TextFileLoader, CharacterTextSplitter
from aimakerspace.vectordatabase import VectorDatabase
import asyncio

In [3]:
import nest_asyncio
nest_asyncio.apply()

## Task 2: Documents

We'll be concerning ourselves with this part of the flow in the following section:

<img src="https://i.imgur.com/jTm9gjk.png" />

### Loading Source Documents

So, first things first, we need some documents to work with.

While we could work directly with the `.txt` files (or whatever file-types you wanted to extend this to) we can instead do some batch processing of those documents at the beginning in order to store them in a more machine compatible format.

In this case, we're going to parse our text file into a single document in memory.

Let's look at the relevant bits of the `TextFileLoader` class:

```python
def load_file(self):
        with open(self.path, "r", encoding=self.encoding) as f:
            self.documents.append(f.read())
```

We're simply loading the document using the built-in `open` function and storing its contents in our `self.documents` list.
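To make the loading step concrete, here is a minimal, self-contained sketch of the same idea. The function name `load_text_files` and the choice of the `utf-8-sig` encoding (which drops a leading byte-order mark if one is present) are assumptions for illustration, not part of the `aimakerspace` library.

```python
import tempfile
from pathlib import Path
from typing import List


def load_text_files(path: str, encoding: str = "utf-8-sig") -> List[str]:
    """Return one string per .txt file found at `path` (a file or a directory).

    Hypothetical helper: it mirrors the idea of TextFileLoader, not its API.
    """
    root = Path(path)
    if root.is_file():
        files = [root]
    else:
        # Sort so the document order is stable between runs.
        files = sorted(root.rglob("*.txt"))
    return [f.read_text(encoding=encoding) for f in files]


# Demo on a throwaway file that starts with a UTF-8 byte-order mark.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.txt"
    sample.write_bytes(b"\xef\xbb\xbfHello, RAG!")
    demo_documents = load_text_files(tmp)

print(demo_documents)
```

Each file becomes one in-memory string, which is the same shape of data the rest of the notebook works with.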

> NOTE: We're using blogs from PMarca (Marc Andreessen) as our sample data. This data is largely irrelevant as we want to focus on the mechanisms of RAG, which includes our data's shape and quality - but not specifically what the contents of the data are.


In [4]:
text_loader = TextFileLoader("data/PMarcaBlogs.txt")
documents = text_loader.load_documents()
len(documents)

1

In [5]:
print(documents[0][:100])

﻿
The Pmarca Blog Archives
(select posts from 2007-2009)
Marc Andreessen
copyright: Andreessen Horow


### Splitting Text Into Chunks

As we can see, there is one massive document.

We'll want to chunk the document into smaller parts so it's easier to pass the most relevant snippets to the LLM.

There is no fixed way to split/chunk documents - and you'll need to rely on some intuition as well as knowing your data *very* well in order to build the most robust system.

For this toy example, we'll just split blindly on length.

>There's an opportunity to clear up some terminology here. For this course, we will stick to the following:
>
>- "source documents" : The `.txt`, `.pdf`, `.html`, ..., files that make up the files and information we start with in its raw format
>- "document(s)" : single (or more) text object(s)
>- "corpus" : the combination of all of our documents

As you can imagine (though it's not specifically true in this toy example) the idea of splitting documents is to break them into manageably sized chunks that retain the most relevant local context.
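Splitting blindly on length can be sketched in a few lines. The function name `split_by_length` and the default `chunk_size` and `chunk_overlap` values below are illustrative assumptions; they are not necessarily what `CharacterTextSplitter` uses internally.

```python
from typing import List


def split_by_length(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Cut `text` into fixed-size windows that overlap by `chunk_overlap` characters."""
    if chunk_size <= chunk_overlap:
        raise ValueError("chunk_size must be larger than chunk_overlap")
    # Each new chunk starts `step` characters after the previous one.
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]


demo_chunks = split_by_length("abcdefghij", chunk_size=4, chunk_overlap=1)
print(demo_chunks)  # ['abcd', 'defg', 'ghij', 'j']
```

The overlap is a cheap way to keep a sentence that straddles a boundary visible in both neighbouring chunks.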

In [6]:
text_splitter = CharacterTextSplitter()
split_documents = text_splitter.split_texts(documents)
len(split_documents)

373

Let's take a look at some of the documents we've managed to split.

In [7]:
split_documents[0:1]

['\ufeff\nThe Pmarca Blog Archives\n(select posts from 2007-2009)\nMarc Andreessen\ncopyright: Andreessen Horowitz\ncover design: Jessica Hagy\nproduced using: Pressbooks\nContents\nTHE PMARCA GUIDE TO STARTUPS\nPart 1: Why not to do a startup 2\nPart 2: When the VCs say "no" 10\nPart 3: "But I don\'t know any VCs!" 18\nPart 4: The only thing that matters 25\nPart 5: The Moby Dick theory of big companies 33\nPart 6: How much funding is too little? Too much? 41\nPart 7: Why a startup\'s initial business plan doesn\'t\nmatter that much\n49\nTHE PMARCA GUIDE TO HIRING\nPart 8: Hiring, managing, promoting, and Dring\nexecutives\n54\nPart 9: How to hire a professional CEO 68\nHow to hire the best people you\'ve ever worked\nwith\n69\nTHE PMARCA GUIDE TO BIG COMPANIES\nPart 1: Turnaround! 82\nPart 2: Retaining great people 86\nTHE PMARCA GUIDE TO CAREER, PRODUCTIVITY,\nAND SOME OTHER THINGS\nIntroduction 97\nPart 1: Opportunity 99\nPart 2: Skills and education 107\nPart 3: Where to go and wh

## Task 3: Embeddings and Vectors

Next, we have to convert our corpus into a "machine readable" format as we explored in the Embedding Primer notebook.

Today, we're going to talk about the actual process of creating, and then storing, these embeddings, and how we can leverage that to intelligently add context to our queries.

### OpenAI API Key

In order to access OpenAI's APIs, we'll need to provide our OpenAI API Key!

You can work through the folder "OpenAI API Key Setup" for more information on this process if you don't already have an API Key!

In [8]:
import os
import openai
from getpass import getpass

openai.api_key = getpass("OpenAI API Key: ")
os.environ["OPENAI_API_KEY"] = openai.api_key

OpenAI API Key:  ········


### Vector Database

Let's set up our vector database to hold all our documents and their embeddings!

While this is all baked into 1 call - we can look at some of the code that powers this process to get a better understanding:

Let's look at our `VectorDatabase().__init__()`:

```python
def __init__(self, embedding_model: EmbeddingModel = None):
        self.vectors = defaultdict(np.array)
        self.embedding_model = embedding_model or EmbeddingModel()
```

As you can see - our vectors are merely stored as a dictionary of `np.array` objects.

Secondly, our `VectorDatabase()` has a default `EmbeddingModel()` which is a wrapper for OpenAI's `text-embedding-3-small` model.

> **Quick Info About `text-embedding-3-small`**:
> - It has a context window of **8191** tokens
> - It returns vectors with dimension **1536**

#### ❓Question #1:

The default embedding dimension of `text-embedding-3-small` is 1536, as noted above. 

1. Is there any way to modify this dimension?
2. What technique does OpenAI use to achieve this?

> NOTE: Check out this [API documentation](https://platform.openai.com/docs/api-reference/embeddings/create) for the answer to question #1.1, and [this documentation](https://platform.openai.com/docs/guides/embeddings/use-cases) for an answer to question #1.2!


##### ✅ Answer:
1. You can pass a value to the `dimensions` parameter to reduce the embedding size below the default of 1536.
2. The `dimensions` parameter is only supported by the `text-embedding-3-small` and `text-embedding-3-large` models. With these models you can shorten an embedding without it losing its concept-representing properties, which is not the case for older models. Ideally, you should set this value when creating the embedding. OpenAI achieves this with Matryoshka Representation Learning, a training technique that packs the most important information into the leading dimensions, so the trailing dimensions can be dropped with limited loss.
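If you already have a full-size embedding, shortening it by hand amounts to keeping the leading components and re-normalizing to unit length. The sketch below uses a random vector as a stand-in for a real embedding, and `shorten_embedding` is a hypothetical helper name; in practice, passing `dimensions` to the API is the simpler route.

```python
import numpy as np


def shorten_embedding(embedding, dimensions: int) -> np.ndarray:
    """Keep the first `dimensions` components and rescale to unit L2 norm."""
    vector = np.asarray(embedding, dtype=float)
    if not 0 < dimensions <= vector.shape[0]:
        raise ValueError("dimensions must be between 1 and the embedding length")
    truncated = vector[:dimensions]
    norm = np.linalg.norm(truncated)
    # Avoid dividing by zero for an all-zero vector.
    return truncated if norm == 0 else truncated / norm


# Random stand-in for a 1536-dimensional embedding (not real model output).
rng = np.random.default_rng(0)
full = rng.normal(size=1536)
full /= np.linalg.norm(full)

short = shorten_embedding(full, 256)
print(short.shape, round(float(np.linalg.norm(short)), 6))
```

Re-normalizing matters because cosine-based search assumes vectors of comparable length.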

We can call the `async_get_embeddings` method of our `EmbeddingModel()` on a list of `str` and receive a list of embeddings (each one a list of `float`) back!

```python
async def async_get_embeddings(self, list_of_text: List[str]) -> List[List[float]]:
        return await aget_embeddings(
            list_of_text=list_of_text, engine=self.embeddings_model_name
        )
```

We cast those to `np.array` when we build our `VectorDatabase()`:

```python
async def abuild_from_list(self, list_of_text: List[str]) -> "VectorDatabase":
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        for text, embedding in zip(list_of_text, embeddings):
            self.insert(text, np.array(embedding))
        return self
```

And that's all we need to do!

In [9]:
vector_db = VectorDatabase()
vector_db = asyncio.run(vector_db.abuild_from_list(split_documents))

#### ❓Question #2:

What are the benefits of using an `async` approach to collecting our embeddings?

> NOTE: Determining the core difference between `async` and `sync` will be useful! If you get stuck - ask ChatGPT!

##### ✅ Answer:
Making requests synchronously means the work is done one request at a time: the second request cannot start until the first has finished. An asynchronous approach lets requests run concurrently, so while the first request is waiting on the network, the second and third can already be in flight.
Some benefits of using an `async` approach include:
1. If you have a large number of texts (in the hundreds or thousands), an `async` approach lets you make many requests concurrently. A `synchronous` approach means you have to wait for one request to finish before moving on to the next. This would take too long for a large data set. A synchronous approach might work if you have a very small data set - i.e. 5-10 queries total.
2. Async scales better. Since you are not limited by the order in which things are done, you can work with other heavy operations like API calls. This also helps web servers scale because you need to serve many users at the same time.
3. An async approach means faster processing of large batches and more efficient resource usage. 
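The difference is easy to see with a simulated request. In this sketch `fake_embed` is a made-up stand-in that sleeps instead of calling an API, so the timings only illustrate the waiting behaviour, not real embedding latency.

```python
import asyncio
import time
from typing import List


async def fake_embed(text: str, delay: float = 0.1) -> List[float]:
    """Pretend network call: wait `delay` seconds, then return a dummy vector."""
    await asyncio.sleep(delay)
    return [float(len(text))]


async def embed_sequentially(texts: List[str]) -> List[List[float]]:
    # Each request starts only after the previous one has finished.
    return [await fake_embed(t) for t in texts]


async def embed_concurrently(texts: List[str]) -> List[List[float]]:
    # All requests are started up front and awaited together.
    return list(await asyncio.gather(*(fake_embed(t) for t in texts)))


def timed(coro_factory, texts):
    start = time.perf_counter()
    result = asyncio.run(coro_factory(texts))
    return result, time.perf_counter() - start


texts = ["alpha", "beta", "gamma", "delta", "epsilon"]
seq_result, seq_seconds = timed(embed_sequentially, texts)
con_result, con_seconds = timed(embed_concurrently, texts)
print(f"sequential: {seq_seconds:.2f}s, concurrent: {con_seconds:.2f}s")
```

Both versions return the same vectors in the same order; only the total waiting time changes.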


So, to review what we've done so far in natural language:

1. We load source documents
2. We split those source documents into smaller chunks (documents)
3. We send each of those documents to the `text-embedding-3-small` OpenAI API endpoint
4. We store each of the text representations with the vector representations as keys/values in a dictionary

### Semantic Similarity

The next step is to be able to query our `VectorDatabase()` with a `str` and have it return the vectors and text that are most relevant from our corpus.

We're going to use the following process to achieve this in our toy example:

1. We need to embed our query with the same `EmbeddingModel()` as we used to construct our `VectorDatabase()`
2. We loop through every vector in our `VectorDatabase()` and use a distance measure to compare how related they are
3. We return a list of the top `k` closest vectors, with their text representations

There's some very heavy optimization that can be done at each of these steps - but let's just focus on the basic pattern in this notebook.

> We are using [cosine similarity](https://www.engati.com/glossary/cosine-similarity) as our measure of relatedness in this example (higher scores mean closer vectors) - but there are many other distance metrics you could use - like [these](https://flavien-vidal.medium.com/similarity-distances-for-natural-language-processing-16f63cd5ba55)

> We are using a rather inefficient way of calculating relative distance between the query vector and all other vectors - there are more advanced approaches that are much more efficient, like [ANN](https://towardsdatascience.com/comprehensive-guide-to-approximate-nearest-neighbors-algorithms-8b94f057d6b6)
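The three-step process above can be sketched with NumPy alone. The toy two-dimensional vectors and the helper names `cosine_similarity` and `top_k` are assumptions for illustration; the real `VectorDatabase` stores 1536-dimensional embeddings and may be implemented differently.

```python
from typing import Dict, List, Tuple

import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(np.dot(a, b) / denom) if denom else 0.0


def top_k(query_vector: np.ndarray, vectors: Dict[str, np.ndarray], k: int = 3) -> List[Tuple[str, float]]:
    """Brute-force search: score every stored vector, keep the k best."""
    scored = [(text, cosine_similarity(query_vector, vec)) for text, vec in vectors.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Hand-made vectors standing in for real embeddings.
toy_vectors = {
    "cats purr": np.array([1.0, 0.0]),
    "dogs bark": np.array([0.0, 1.0]),
    "kittens meow": np.array([0.9, 0.1]),
}
demo_hits = top_k(np.array([1.0, 0.0]), toy_vectors, k=2)
print(demo_hits)
```

Because every stored vector is scored, the cost grows linearly with the size of the corpus, which is the inefficiency that approximate nearest neighbour methods address.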

In [10]:
vector_db.search_by_text("What is the Michael Eisner Memorial Weak Executive Problem?", k=3)

[('ordingly.\nSeventh, when hiring the executive to run your former specialty, be\ncareful you don’t hire someone weak on purpose.\nThis sounds silly, but you wouldn’t believe how oaen it happens.\nThe CEO who used to be a product manager who has a weak\nproduct management executive. The CEO who used to be in\nsales who has a weak sales executive. The CEO who used to be\nin marketing who has a weak marketing executive.\nI call this the “Michael Eisner Memorial Weak Executive Problem” — aaer the CEO of Disney who had previously been a brilliant TV network executive. When he bought ABC at Disney, it\npromptly fell to fourth place. His response? “If I had an extra\ntwo days a week, I could turn around ABC myself.” Well, guess\nwhat, he didn’t have an extra two days a week.\nA CEO — or a startup founder — oaen has a hard time letting\ngo of the function that brought him to the party. The result: you\nhire someone weak into the executive role for that function so\nthat you can continue to b

## Task 4: Prompts

In the following section, we'll be looking at the role of prompts - and how they help us to guide our application in the right direction.

In this notebook, we're going to rely on the idea of "zero-shot in-context learning".

This is a lot of words to say: "We will ask it to perform our desired task in the prompt, and provide no examples."

### XYZRolePrompt

Before we do that, let's stop and think a bit about how OpenAI's chat models work.

We know they have roles - as is indicated in the following API [documentation](https://platform.openai.com/docs/api-reference/chat/create#chat/create-messages)

There are three roles, and they function as follows (taken directly from [OpenAI](https://platform.openai.com/docs/guides/gpt/chat-completions-api)):

- `{"role" : "system"}` : The system message helps set the behavior of the assistant. For example, you can modify the personality of the assistant or provide specific instructions about how it should behave throughout the conversation. However note that the system message is optional and the model’s behavior without a system message is likely to be similar to using a generic message such as "You are a helpful assistant."
- `{"role" : "user"}` : The user messages provide requests or comments for the assistant to respond to.
- `{"role" : "assistant"}` : Assistant messages store previous assistant responses, but can also be written by you to give examples of desired behavior.

The main idea is this:

1. You start with a system message that outlines how the LLM should respond, what kind of behaviours you can expect from it, and more
2. Then, you can provide a few examples in the form of "user"/"assistant" pairs
3. Then, you prompt the model with the true "user" message.

In this example, we'll be forgoing the 2nd step for simplicity's sake.
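As a rough sketch of that message layout, the hypothetical helper `build_messages` below assembles the list in the order described: system message, optional example pairs, then the real user message. Passing an empty list of examples gives the zero-shot form used in this notebook.

```python
from typing import Dict, List, Tuple


def build_messages(system: str, examples: List[Tuple[str, str]], user: str) -> List[Dict[str, str]]:
    """Assemble a chat message list: system, example pairs, then the real query."""
    messages = [{"role": "system", "content": system}]
    for question, answer in examples:
        # Each example is a user turn followed by the desired assistant reply.
        messages.append({"role": "user", "content": question})
        messages.append({"role": "assistant", "content": answer})
    messages.append({"role": "user", "content": user})
    return messages


zero_shot = build_messages("You are a terse assistant.", [], "What is RAG?")
few_shot = build_messages(
    "You are a terse assistant.",
    [("What is an LLM?", "A large language model.")],
    "What is RAG?",
)
print([m["role"] for m in zero_shot])
print([m["role"] for m in few_shot])
```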

#### Utility Functions

You'll notice that we're using some utility functions from the `aimakerspace` module - let's take a peek at these and see what they're doing!

##### XYZRolePrompt

Here we have our `system`, `user`, and `assistant` role prompts.

Let's take a peek at what they look like:

```python
class BasePrompt:
    def __init__(self, prompt):
        """
        Initializes the BasePrompt object with a prompt template.

        :param prompt: A string that can contain placeholders within curly braces
        """
        self.prompt = prompt
        self._pattern = re.compile(r"\{([^}]+)\}")

    def format_prompt(self, **kwargs):
        """
        Formats the prompt string using the keyword arguments provided.

        :param kwargs: The values to substitute into the prompt string
        :return: The formatted prompt string
        """
        matches = self._pattern.findall(self.prompt)
        return self.prompt.format(**{match: kwargs.get(match, "") for match in matches})

    def get_input_variables(self):
        """
        Gets the list of input variable names from the prompt string.

        :return: List of input variable names
        """
        return self._pattern.findall(self.prompt)
```

Then we have our `RolePrompt` which laser focuses us on the role pattern found in most API endpoints for LLMs.

```python
class RolePrompt(BasePrompt):
    def __init__(self, prompt, role: str):
        """
        Initializes the RolePrompt object with a prompt template and a role.

        :param prompt: A string that can contain placeholders within curly braces
        :param role: The role for the message ('system', 'user', or 'assistant')
        """
        super().__init__(prompt)
        self.role = role

    def create_message(self, **kwargs):
        """
        Creates a message dictionary with a role and a formatted message.

        :param kwargs: The values to substitute into the prompt string
        :return: Dictionary containing the role and the formatted message
        """
        return {"role": self.role, "content": self.format_prompt(**kwargs)}
```

We'll look at how the `SystemRolePrompt` is constructed to get a better idea of how that extension works:

```python
class SystemRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "system")
```

That pattern is repeated for our `UserRolePrompt` and our `AssistantRolePrompt` as well.
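To see the whole pattern end to end, here is a condensed, self-contained stand-in for the classes above. It folds `BasePrompt` and `RolePrompt` into one class for brevity, so treat it as a sketch of the behaviour rather than the library's actual code.

```python
import re


class RolePrompt:
    """Condensed stand-in for the BasePrompt/RolePrompt pair shown above."""

    _pattern = re.compile(r"\{([^}]+)\}")

    def __init__(self, prompt: str, role: str):
        self.prompt = prompt
        self.role = role

    def create_message(self, **kwargs) -> dict:
        # Placeholders without a matching keyword argument become empty strings.
        names = self._pattern.findall(self.prompt)
        content = self.prompt.format(**{name: kwargs.get(name, "") for name in names})
        return {"role": self.role, "content": content}


class UserRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "user")


class AssistantRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "assistant")


template = "Context: {context}\nQuestion: {question}"
demo_message = UserRolePrompt(template).create_message(question="What is RAG?")
print(demo_message)
```

Note that the missing `context` value is silently replaced with an empty string, which is convenient for a demo but can hide mistakes in a real pipeline.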

##### ChatOpenAI

Next we have our model, which is converted to a format analogous to libraries like LangChain and LlamaIndex.

Let's take a peek at how that is constructed:

```python
class ChatOpenAI:
    def __init__(self, model_name: str = "gpt-4.1-mini"):
        self.model_name = model_name
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        if self.openai_api_key is None:
            raise ValueError("OPENAI_API_KEY is not set")

    def run(self, messages, text_only: bool = True):
        if not isinstance(messages, list):
            raise ValueError("messages must be a list")

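        # openai>=1.0 removed `openai.ChatCompletion`; requests go through a client object
        client = openai.OpenAI(api_key=self.openai_api_key)
        response = client.chat.completions.create(
            model=self.model_name, messages=messages
        )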

        if text_only:
            return response.choices[0].message.content

        return response
```

#### ❓ Question #3:

When calling the OpenAI API - are there any ways we can achieve more reproducible outputs?

> NOTE: Check out [this section](https://platform.openai.com/docs/guides/text-generation/) of the OpenAI documentation for the answer!

##### ✅ Answer:
Prompt engineering - writing effective instructions for the model so that the output has clear success criteria and meets your requirements more consistently.
There are some techniques that work with any model, such as using message roles. However, the prompts may need to be adjusted based on the model. Even different snapshots of models within the same family can produce different results with the same prompts. For that reason, it is recommended to pin your applications to model snapshots for consistent behavior. It is also recommended to build evaluations that measure the behavior and performance of your prompts, so you can update them as necessary.
You can pass in instructions, which behave like messages with the `developer` role, and you can pass in prompts under specific roles. The `developer` role has the highest priority and carries the instructions for the model. The `user` role is prioritized after `developer` and carries the instructions given by the user, i.e. the query being asked. There is also the `assistant` role, which marks messages generated by the model.
You can also pass in reusable prompts, which can be updated without changing any of the code. This allows you to iterate until you consistently get successful responses.
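Alongside prompt design, request parameters also affect repeatability. The sketch below builds keyword arguments for a Chat Completions call with `temperature` set to 0 and a fixed `seed`; OpenAI describes seeded sampling as best-effort, so identical outputs are not guaranteed. The helper name `reproducible_request` is hypothetical, and the snapshot name is an assumption you should check against the current model list.

```python
from typing import Any, Dict, List


def reproducible_request(
    messages: List[Dict[str, str]],
    model: str = "gpt-4.1-mini-2025-04-14",  # assumed snapshot name; verify before use
    seed: int = 42,
) -> Dict[str, Any]:
    """Build request kwargs that reduce run-to-run variation."""
    return {
        "model": model,        # pin a dated snapshot rather than a moving alias
        "messages": messages,
        "temperature": 0,      # make sampling as greedy as the API allows
        "seed": seed,          # best-effort deterministic sampling
    }


demo_request = reproducible_request([{"role": "user", "content": "Say hello."}])
print(demo_request)

# With an API key configured, the kwargs could be passed along like this:
# from openai import OpenAI
# response = OpenAI().chat.completions.create(**demo_request)
```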


### Creating and Prompting OpenAI's `gpt-4.1-mini`!

Let's tie all these together and use it to prompt `gpt-4.1-mini`!

In [11]:
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)

from aimakerspace.openai_utils.chatmodel import ChatOpenAI

chat_openai = ChatOpenAI()
user_prompt_template = "{content}"
user_role_prompt = UserRolePrompt(user_prompt_template)
system_prompt_template = (
    "You are an expert in {expertise}, you always answer in a kind way."
)
system_role_prompt = SystemRolePrompt(system_prompt_template)

messages = [
    system_role_prompt.create_message(expertise="Python"),
    user_role_prompt.create_message(
        content="What is the best way to write a loop?"
    ),
]

response = chat_openai.run(messages)

In [12]:
print(response)

Hello! The "best" way to write a loop in Python depends on what you're trying to achieve, but generally, using a `for` loop to iterate over sequences (like lists, strings, or ranges) is very clean and Pythonic.

Here's an example of a simple and readable `for` loop:

```python
for item in iterable:
    # do something with item
```

For example, to print numbers from 0 to 4:

```python
for i in range(5):
    print(i)
```

If you need a loop that continues until a condition becomes false, a `while` loop works well:

```python
count = 0
while count < 5:
    print(count)
    count += 1
```

If you want, feel free to share your specific use case, and I can help suggest the best loop style for it! 😊


## Task 5: Retrieval Augmented Generation

Now we can create a RAG prompt - which will help our system behave in a way that makes sense!

There is much you could do here, many tweaks and improvements to be made!

In [13]:
RAG_SYSTEM_TEMPLATE = """You are a knowledgeable and thoughtful assistant that provides deep analysis based on provided context.

Before answering, ask yourself these questions:
- What is the core issue being described?
- What causes or factors contribute to this situation?
- What are the consequences or implications?
- What patterns or principles does this illustrate?
- How might this apply in similar situations?

Instructions:
- Think through your response step-by-step before providing your final answer
- First, analyze what the context tells us about the topic
- Then, identify the key insights and implications
- Finally, synthesize this into a comprehensive response
- Only answer questions using information from the provided context
- If the context doesn't contain relevant information, respond with "I don't know"
- Be accurate and cite specific parts of the context when possible
- Keep responses {response_style} and {response_length}
- Show your reasoning process before giving your final answer

Then provide a comprehensive response that addresses these analytical dimensions while staying strictly within the provided context.

Format your response as:
**Analysis:** [Your step-by-step thinking]
**Answer:** [Your final comprehensive response]"""


RAG_USER_TEMPLATE = """Context Information: {context}
Number of relevant sources found: {context_count}
{similarity_scores}

Question: {user_query}

Please analyze this question by considering:
1. What does the context directly state about this topic?
2. What are the underlying principles or patterns described?
3. What implications or consequences are mentioned or can be inferred?
4. How does this connect to broader concepts mentioned in the context?

Provide your answer based solely on the context above, showing your analytical process."""

rag_system_prompt = SystemRolePrompt(
    RAG_SYSTEM_TEMPLATE,
    strict=True,
    defaults={
        "response_style": "detailed",
        "response_length": "comprehensive"
    }
)

rag_user_prompt = UserRolePrompt(
    RAG_USER_TEMPLATE,
    strict=True,
    defaults={
        "context_count": "",
        "similarity_scores": ""
    }
)

Now we can create our pipeline!

In [14]:
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase, 
                 response_style: str = "detailed", include_scores: bool = False) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever
        self.response_style = response_style
        self.include_scores = include_scores

    def run_pipeline(self, user_query: str, k: int = 4, **system_kwargs) -> dict:
        # Retrieve relevant contexts
        context_list = self.vector_db_retriever.search_by_text(user_query, k=k)
        
        context_prompt = ""
        similarity_scores = []
        
        for i, (context, score) in enumerate(context_list, 1):
            context_prompt += f"[Source {i}]: {context}\n\n"
            similarity_scores.append(f"Source {i}: {score:.3f}")
        
        # Create system message with parameters
        system_params = {
            "response_style": self.response_style,
            "response_length": system_kwargs.get("response_length", "detailed")
        }
        
        formatted_system_prompt = rag_system_prompt.create_message(**system_params)
        
        user_params = {
            "user_query": user_query,
            "context": context_prompt.strip(),
            "context_count": len(context_list),
            "similarity_scores": f"Relevance scores: {', '.join(similarity_scores)}" if self.include_scores else ""
        }
        
        formatted_user_prompt = rag_user_prompt.create_message(**user_params)

        return {
            "response": self.llm.run([formatted_system_prompt, formatted_user_prompt]), 
            "context": context_list,
            "context_count": len(context_list),
            "similarity_scores": similarity_scores if self.include_scores else None,
            "prompts_used": {
                "system": formatted_system_prompt,
                "user": formatted_user_prompt
            }
        }

In [15]:
rag_pipeline = RetrievalAugmentedQAPipeline(
    vector_db_retriever=vector_db,
    llm=chat_openai,
    response_style="detailed",
    include_scores=True
)

result = rag_pipeline.run_pipeline(
    "What is the 'Michael Eisner Memorial Weak Executive Problem'?",
    k=3,
    response_length="comprehensive", 
    include_warnings=True,
    confidence_required=True
)

print(f"Response: {result['response']}")
print(f"\nContext Count: {result['context_count']}")
print(f"Similarity Scores: {result['similarity_scores']}")

Response: **Analysis:**  
1. The context from Source 1 explicitly defines the "Michael Eisner Memorial Weak Executive Problem." It occurs when a CEO or startup founder, previously strong or personally involved in a specific function (e.g., product management, sales, marketing), intentionally hires a weak executive to run that same function after ascending to the CEO role. The reason for this is the CEO's reluctance to fully relinquish control or influence over the function that initially made them successful or got them "to the party."  
   
2. The underlying pattern is that the founder or CEO struggles with letting go of their previous role or specialty, which leads to hiring a weaker executive in that area. This allows the CEO to remain the ultimate authority or key player in the function, even if it undermines the function's effectiveness.

3. The consequence, as illustrated by the Michael Eisner example from Disney, is that the function or division deteriorates under weak leadershi

#### ❓ Question #4:

What prompting strategies could you use to make the LLM have a more thoughtful, detailed response?

What is that strategy called?

> NOTE: You can look through our [OpenAI Responses API](https://colab.research.google.com/drive/14SCfRnp39N7aoOx8ZxadWb0hAqk4lQdL?usp=sharing) notebook for an answer to this question if you get stuck!

##### ✅ Answer:
The overall strategy for improving LLM responses is to apply advanced, analytical prompt engineering techniques. Used together, the smaller strategies below improve response quality and consistency.
1. Chain-of-thought prompting. Have the LLM think step by step and show its reasoning before outputting the final response.
2. Right now the LLM isn't allowed to use any external information. If we want a more accurate response, we should allow for different paths of reasoning which includes information that wasn't explicitly given to it by the user. We can also have the system think about the problem in different ways and then converge the ideas at the end for a more comprehensive response.
3. We tell the LLM to `respond with "I don't know"` if it doesn't know the answer to a query. We can also have the system ask itself questions to create more in depth reasoning for each question.
4. We can assign more well defined roles to the system so it understands what we expect from the responses better.

### 🏗️ Activity #1:

Enhance your RAG application in some way! 

Suggestions are: 

- Allow it to work with PDF files
- Implement a new distance metric
- Add metadata support to the vector database
- Use a different embedding model
- Add the capability to ingest a YouTube link

While these are suggestions, you should feel free to make whatever augmentations you desire! If you shared an idea during Session 1, think about features you might need to incorporate for your use case! 

When you're finished making the augments to your RAG application - vibe check it against the old one - see if you can "feel the improvement"!

> NOTE: These additions might require you to work within the `aimakerspace` library - that's expected!

> NOTE: If you're not sure where to start - ask Cursor (CMD/CTRL+L) to guide you through the changes!

Now on to YouTube.

In [16]:
# from youtube_transcript_api import YouTubeTranscriptApi
# from urllib.parse import urlparse, parse_qs

# def get_video_id(url: str) -> str:
#     parsed_url = urlparse(url)
#     # Extract query parameters
#     query_params = parse_qs(parsed_url.query)
#     return query_params.get("v", [None])[0]

# def get_video_transcript(video_id: str, lang: str = "en") -> list:
#     print("video_id", video_id)
#     youtube_api = YouTubeTranscriptApi()
#     transcript = youtube_api.fetch(video_id, languages=[lang])
#     # transcript is a list of dicts: [{'text': '...', 'start': 0.0, 'duration': 5.0}, ...]
#     return [{'text': snippet.text, 'start': snippet.start, 'duration': snippet.duration} for snippet in transcript]

# def chunk_transcript(transcript, chunk_size=120):
#     chunks, current_chunk, current_start = [], [], 0

#     for entry in transcript:
#         start_time = entry['start']
#         if start_time - current_start < chunk_size:
#             current_chunk.append(entry['text'])
#         else:
#             chunks.append((current_start, " ".join(current_chunk)))
#             current_chunk = [entry['text']]
#             current_start = start_time
#     if current_chunk:
#         chunks.append((current_start, " ".join(current_chunk)))
#     return chunks

# def summarize_chunks(chunks, llm):
#     summaries = []
#     for start_time, text in chunks:
#         response = llm.run([
#             {"role": "system", "content": "You are a helpful assistant that summarizes YouTube transcripts."},
#             {"role": "user", "content": f"Summarize this segment:\n\n{text}"}
#         ])
#         summaries.append((start_time, response))
#     return summaries

# def format_summary(summaries):
#     for start_time, summary in summaries:
#         minutes = int(start_time // 60)
#         print(f"[{minutes:02d}:00] {summary}")




In [17]:
# # youtube_url = "8vFGrNjT4P4"
# youtube_url =input("Youtube Url: ")
# video_id = get_video_id(youtube_url)
# print(f"URL, {video_id}")


# transcript = get_video_transcript(video_id)

# print(f"transcript, {transcript}")

# chunked_transcript = chunk_transcript(transcript)

# print(f"chunked_transcript, {chunked_transcript}")



In [18]:
# summarized_chunks = summarize_chunks(chunked_transcript, chat_openai)

# print(f"summarized_chunks, {summarized_chunks}")


# formatted_summary = format_summary(summarized_chunks)

# print(f"formatted_summary, {formatted_summary}")

YouTube Transcript RAG System

Key features:
1. Index YouTube videos by URL
2. Search through transcript content semantically
3. Ask questions about video content
4. Get video summaries
5. Access timestamped sources

Example usage:

#### Index a video
`video_id = await youtube_rag.index_video("https://www.youtube.com/watch?v=VIDEO_ID")`

#### Search for specific content
`results = youtube_rag.search_transcript("speaking techniques", k=3)`

#### Ask questions about the video
`answer = youtube_rag.ask_question("What are the main speaking tips mentioned?", video_id)`

#### Get a summary
`summary = youtube_rag.get_video_summary(video_id)`

#### List all indexed videos
`videos = youtube_rag.list_indexed_videos()`
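Indexing by URL depends on pulling the video ID out of the link. YouTube links come in several shapes, so here is a hedged sketch that handles a few common ones; the function name `extract_video_id` and the set of supported paths are assumptions, and other URL forms may exist.

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_video_id(url: str) -> Optional[str]:
    """Return the video ID from a few common YouTube URL shapes, else None."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()

    # Short links: https://youtu.be/<id>
    if host == "youtu.be":
        return parsed.path.lstrip("/").split("/")[0] or None

    if host == "youtube.com" or host.endswith(".youtube.com"):
        # Standard links: https://www.youtube.com/watch?v=<id>
        if parsed.path == "/watch":
            return parse_qs(parsed.query).get("v", [None])[0]
        # Path-based links: /embed/<id>, /shorts/<id>, /live/<id>
        parts = parsed.path.split("/")
        if len(parts) >= 3 and parts[1] in {"embed", "shorts", "live"}:
            return parts[2] or None

    return None


print(extract_video_id("https://www.youtube.com/watch?v=8vFGrNjT4P4"))
print(extract_video_id("https://youtu.be/8vFGrNjT4P4?t=10"))
print(extract_video_id("https://example.com/watch?v=8vFGrNjT4P4"))
```

Returning `None` for unrecognized links lets the caller fail early with a clear message instead of sending a bad ID to the transcript API.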

In [19]:
# Enhanced YouTube Transcript RAG System
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.chatmodel import ChatOpenAI
from aimakerspace.openai_utils.prompts import SystemRolePrompt, UserRolePrompt
import asyncio
import numpy as np
from typing import List, Dict, Tuple, Optional
import json

class YouTubeTranscriptRAG:
    """
    A RAG system for YouTube transcripts that can index, search, and answer questions
    about video content using semantic similarity and LLM generation.
    """
    
    def __init__(self, llm: Optional[ChatOpenAI] = None, embedding_model=None):
        self.llm = llm or ChatOpenAI()
        self.vector_db = VectorDatabase(embedding_model)
        self.transcript_data = {}  # Store original transcript data
        self.video_metadata = {}   # Store video information
        
    def get_video_id(self, url: str) -> Optional[str]:
        """Extract the video ID from a watch-style YouTube URL (the `v` query parameter); returns None if it is missing"""
        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)
        return query_params.get("v", [None])[0]
    
    def get_video_transcript(self, video_id: str, lang: str = "en") -> List[Dict]:
        """Fetch transcript from YouTube API"""
        print(f"Fetching transcript for video ID: {video_id}")
        youtube_api = YouTubeTranscriptApi()
        transcript = youtube_api.fetch(video_id, languages=[lang])
        return [{'text': snippet.text, 'start': snippet.start, 'duration': snippet.duration} for snippet in transcript]
    
    def chunk_transcript(self, transcript: List[Dict], chunk_size: int = 120) -> List[Tuple[float, str, Dict]]:
        """
        Chunk transcript into segments with metadata
        Returns: List of (start_time, text, metadata) tuples
        """
        chunks = []
        current_chunk = []
        current_start = 0
        
        for entry in transcript:
            start_time = entry['start']
            if start_time - current_start < chunk_size:
                current_chunk.append(entry['text'])
            else:
                if current_chunk:
                    # Create metadata for this chunk
                    metadata = {
                        'start_time': current_start,
                        'end_time': start_time,
                        'duration': start_time - current_start,
                        'video_id': getattr(self, 'current_video_id', None)
                    }
                    chunks.append((current_start, " ".join(current_chunk), metadata))
                current_chunk = [entry['text']]
                current_start = start_time
        
        # Add the last chunk
        if current_chunk:
            metadata = {
                'start_time': current_start,
                'end_time': transcript[-1]['start'] + transcript[-1]['duration'],
                'duration': (transcript[-1]['start'] + transcript[-1]['duration']) - current_start,
                'video_id': getattr(self, 'current_video_id', None)
            }
            chunks.append((current_start, " ".join(current_chunk), metadata))
        
        return chunks
    
    async def index_video(self, video_url: str, chunk_size: int = 120, lang: str = "en") -> str:
        """
        Index a YouTube video for RAG search
        Returns the video ID
        """
        video_id = self.get_video_id(video_url)
        if not video_id:
            raise ValueError("Invalid YouTube URL - could not extract video ID")
        
        self.current_video_id = video_id
        
        # Fetch transcript
        transcript = self.get_video_transcript(video_id, lang)
        self.transcript_data[video_id] = transcript
        
        # Store video metadata
        self.video_metadata[video_id] = {
            'url': video_url,
            'video_id': video_id,
            'language': lang,
            'total_segments': len(transcript),
            'total_duration': transcript[-1]['start'] + transcript[-1]['duration'] if transcript else 0
        }
        
        # Chunk the transcript
        chunks = self.chunk_transcript(transcript, chunk_size)
        
        # Prepare texts for embedding
        chunk_texts = [chunk[1] for chunk in chunks]  # Extract text from chunks
        
        # Create embeddings and add to vector database
        await self.vector_db.abuild_from_list(chunk_texts)
        
        # Store chunk metadata for later retrieval
        # Metadata is keyed by chunk text, so identical chunk texts overwrite each other
        if not hasattr(self.vector_db, 'metadata'):
            self.vector_db.metadata = {}
        for _start_time, text, metadata in chunks:
            self.vector_db.metadata[text] = metadata
        
        print(f"Successfully indexed video {video_id} with {len(chunks)} chunks")
        return video_id
    
    def search_transcript(self, query: str, k: int = 5, video_id: Optional[str] = None) -> List[Tuple[str, float, Dict]]:
        """
        Search through indexed transcripts
        Returns: List of (text, similarity_score, metadata) tuples
        Note: the video_id filter runs after the top-k lookup, so fewer than k results may come back
        """
        # Get search results from vector database
        results = self.vector_db.search_by_text(query, k=k)
        
        # Add metadata to results
        enhanced_results = []
        for text, score in results:
            metadata = getattr(self.vector_db, 'metadata', {}).get(text, {})
            
            # Filter by video_id if specified
            if video_id and metadata.get('video_id') != video_id:
                continue
                
            enhanced_results.append((text, score, metadata))
        
        return enhanced_results
    
    def ask_question(self, question: str, video_id: Optional[str] = None, k: int = 5) -> Dict:
        """
        Ask a question about the indexed video content using RAG
        """
        # Search for relevant chunks
        relevant_chunks = self.search_transcript(question, k=k, video_id=video_id)
        
        if not relevant_chunks:
            return {
                "answer": "No relevant information found in the indexed videos.",
                "sources": [],
                "video_id": video_id
            }
        
        # Prepare context from relevant chunks
        context_parts = []
        sources = []
        
        for i, (text, score, metadata) in enumerate(relevant_chunks, 1):
            start_minutes = int(metadata.get('start_time', 0) // 60)
            start_seconds = int(metadata.get('start_time', 0) % 60)
            timestamp = f"{start_minutes:02d}:{start_seconds:02d}"
            
            context_parts.append(f"[Source {i} - {timestamp}]: {text}")
            sources.append({
                'text': text,
                'timestamp': timestamp,
                'start_time': metadata.get('start_time', 0),
                'similarity_score': score,
                'video_id': metadata.get('video_id')
            })
        
        context = "\n\n".join(context_parts)
        
        # Create RAG prompt
        system_prompt = SystemRolePrompt("""
        You are a helpful assistant that answers questions about YouTube video content.
        Use only the provided context from the video transcript to answer questions.
        If the context doesn't contain enough information to answer the question, say so.
        When referencing specific parts of the video, mention the timestamp if available.
        """)
        
        user_prompt = UserRolePrompt("""
        Context from YouTube video transcript:
        {context}
        
        Question: {question}
        
        Please provide a comprehensive answer based on the video content above.
        """)
        
        # Generate response
        messages = [
            system_prompt.create_message(),
            user_prompt.create_message(context=context, question=question)
        ]
        
        answer = self.llm.run(messages)
        
        return {
            "answer": answer,
            "sources": sources,
            "video_id": video_id or "all_videos",
            "context_count": len(relevant_chunks)
        }
    
    def get_video_summary(self, video_id: str, max_chunks: int = 10) -> str:
        """
        Generate a summary from the first max_chunks chunks of the video
        (in time order); later parts of a long video are not included
        """
        if video_id not in self.transcript_data:
            return "Video not found in indexed data."
        
        # Get all chunks for this video
        all_chunks = []
        for text, metadata in getattr(self.vector_db, 'metadata', {}).items():
            if metadata.get('video_id') == video_id:
                all_chunks.append((metadata.get('start_time', 0), text))
        
        # Sort by start time and take first max_chunks
        all_chunks.sort(key=lambda x: x[0])
        selected_chunks = all_chunks[:max_chunks]
        
        # Combine text
        combined_text = " ".join([chunk[1] for chunk in selected_chunks])
        
        # Generate summary
        system_prompt = SystemRolePrompt("""
        You are a helpful assistant that creates concise summaries of YouTube video content.
        Create a clear, structured summary that captures the main points and key insights.
        """)
        
        user_prompt = UserRolePrompt("""
        Please summarize the following YouTube video content:
        
        {content}
        
        Provide a structured summary with main points and key insights.
        """)
        
        messages = [
            system_prompt.create_message(),
            user_prompt.create_message(content=combined_text)
        ]
        
        return self.llm.run(messages)
    
    def list_indexed_videos(self) -> List[Dict]:
        """Get list of all indexed videos with metadata"""
        return list(self.video_metadata.values())
    
    def get_video_stats(self, video_id: str) -> Dict:
        """Get statistics about an indexed video"""
        if video_id not in self.video_metadata:
            return {"error": "Video not found"}
        
        metadata = self.video_metadata[video_id]
        chunk_count = len([m for m in getattr(self.vector_db, 'metadata', {}).values() 
                          if m.get('video_id') == video_id])
        
        return {
            **metadata,
            "indexed_chunks": chunk_count,
            "average_chunk_duration": metadata['total_duration'] / chunk_count if chunk_count > 0 else 0
        }


In [20]:
#  Using the RAG system with the existing video data

# Initialize the RAG system
youtube_rag = YouTubeTranscriptRAG()

# Using Patrick Winston's "How to Speak" lecture
video_url = "https://www.youtube.com/watch?v=Unzc731iCUY" 

# Index the video (this will create embeddings and make it searchable)
video_id = await youtube_rag.index_video(video_url, chunk_size=60)

print(f"Successfully indexed video: {video_id}")
print(f"Video stats: {youtube_rag.get_video_stats(video_id)}")




Fetching transcript for video ID: Unzc731iCUY
Successfully indexed video Unzc731iCUY with 61 chunks
Successfully indexed video: Unzc731iCUY
Video stats: {'url': 'https://www.youtube.com/watch?v=Unzc731iCUY', 'video_id': 'Unzc731iCUY', 'language': 'en', 'total_segments': 1166, 'total_duration': 3789.3399999999997, 'indexed_chunks': 61, 'average_chunk_duration': 62.120327868852456}


In [21]:
# 1. Search for specific content
print("\n1. Searching for 'speaking techniques':")
search_results = youtube_rag.search_transcript("speaking techniques", k=3, video_id=video_id)
for i, (text, score, metadata) in enumerate(search_results, 1):
    timestamp = f"{int(metadata['start_time']//60):02d}:{int(metadata['start_time']%60):02d}"
    print(f"  Result {i} (Score: {score:.3f}, Time: {timestamp}):")
    print(f"  {text[:100]}...")
    print()


1. Searching for 'speaking techniques':
  Result 1 (Score: 0.536, Time: 09:23):
  It can't be too obvious
because then people will be embarrassed to say
it, but the answers can't be ...

  Result 2 (Score: 0.532, Time: 02:03):
  And I was a better skier because
I had the K, and I had the P, and all she had
was the T. So you can...

  Result 3 (Score: 0.461, Time: 05:13):
  that they didn't know at
the beginning of the hour. It's an empowerment promise. It's the reason for...



In [22]:
# 2. Ask a question about the video content
print("2. Asking: 'What are the main speaking tips mentioned?'")
question_result = youtube_rag.ask_question("What are the main speaking tips mentioned?", video_id=video_id)
print(f"Answer: {question_result['answer']}")
print(f"Sources used: {question_result['context_count']} chunks")
print()


2. Asking: 'What are the main speaking tips mentioned?'
Answer: The main speaking tips mentioned in the video are as follows:

1. **Build a Personal Repertoire of Techniques (09:23)**  
   Observe effective speakers you admire and analyze why they are successful. Use this understanding to develop your own personal style by building up a repertoire (or "armamentarium") of speaking techniques and heuristics. The speaker emphasizes the importance of having a variety of ideas and methods that you can draw upon when presenting.

2. **Have an Empowerment Promise to Start Your Talk (04:11, 05:13)**  
   Instead of starting with a joke—which often falls flat because the audience is still settling in—you should begin your talk by making an empowerment promise. This means clearly telling the audience what new knowledge or benefits they will gain by the end of your presentation. For example, "At the end of this 60 minutes, you will know things about speaking you don’t know now, and something amon

In [23]:
# 3. Ask more questions about the video content
print("3. Asking: 'What is the main topic of what happens between minutes 10 and 20 in the video?'")
question_result = youtube_rag.ask_question("What is the main topic of what happens between minutes 10 and 20 in the video?", video_id=video_id)
print(f"Answer: {question_result['answer']}")
print(f"Sources used: {question_result['context_count']} chunks")
print()



3. Asking: 'What is the main topic of what happens between minutes 10 and 20 in the video?'
Answer: The video transcript provided does not contain information specifically covering the content or main topic occurring between minutes 10 and 20. The closest timestamps available in the context are around 5:13, 8:22, 35:22, 38:27, and 39:29, which address topics such as how to start a talk with an "empowerment promise," techniques for engaging the audience (like asking questions and pausing for answers), reviewing tools, and expressing passion in presentations. However, there is no detailed content or description from the 10 to 20-minute segment. Therefore, based on the given context, it is not possible to determine the main topic of the video during minutes 10 to 20.
Sources used: 5 chunks



In [24]:
print("4. Asking: 'Given that I should not start with humor, where in my speeches should I try and get people to laugh?'")
question_result = youtube_rag.ask_question("Given that I should not start with humor, where in my speeches should I try and get people to laugh?", video_id=video_id)
print(f"Answer: {question_result['answer']}")
print(f"Sources used: {question_result['context_count']} chunks")
print()



4. Asking: 'Given that I should not start with humor, where in my speeches should I try and get people to laugh?'
Answer: Based on the content from the video transcript, you should avoid starting your speech with a joke because at the beginning, people are still settling in—they might be putting their laptops away and adjusting to your vocal style, so they are not ready for humor (Source 1 - 04:11).

Instead, an effective place to incorporate humor is at the end of your talk. According to the speaker's conversation with Doug Lenat, a fantastic speaker, the secret to his success is that he always finishes with a joke. Ending with humor leaves the audience with the impression that they've had fun throughout the presentation (Source 2 - 57:04).

So, while starting with humor is generally not recommended, concluding your talk with a joke is a good strategy to create a positive, memorable ending and engagement with your audience.
Sources used: 5 chunks



In [25]:
print("5. Asking: 'How do I get people to stay engaged throughout the entire speech? Give examples of differences I should incorporate depending on the time length, and the different mediums used while presenting.'")
question_result = youtube_rag.ask_question("How do I get people to stay engaged throughout the entire speech? Give examples of differences I should incorporate depending on the time length, and the different mediums used while presenting.", video_id=video_id)
print(f"Answer: {question_result['answer']}")
print(f"Sources used: {question_result['context_count']} chunks")
print()



5. Asking: 'How do I get people to stay engaged throughout the entire speech? Give examples of differences I should incorporate depending on the time length, and the different mediums used while presenting.'
Answer: To keep people engaged throughout your entire speech, several strategies and considerations are advised based on the video content:

1. **Start Strong with an Empowerment Promise (Source 5 - 04:11):**  
   Instead of beginning with a joke, which often falls flat because the audience is still settling in, start by telling your audience what they will know or gain by the end of the talk. This "empowerment promise" sets expectations and motivates listeners to stay focused.

2. **Balance Content Difficulty (Source 1 - 09:23):**  
   Your content should neither be too obvious nor too hard. If the material is too obvious, people may be embarrassed to respond or engage; if it's too hard, they may have nothing to say or feel lost. Aim for that sweet spot to maintain engagement and en

In [26]:
# 6. Get a summary of the video
print("6. Getting video summary:")
summary = youtube_rag.get_video_summary(video_id, max_chunks=5)
print(f"Summary: {summary}")
print()



6. Getting video summary:
Summary: Summary of Patrick Winston's Talk on Communication and Public Speaking:

1. Importance of Communication Skills
   - Success in life is largely determined by the ability to speak, write, and the quality of ideas (in that order).
   - Students should be equipped with communication skills before entering life, similar to soldiers being armed before battle.

2. Formula for Quality Communication
   - Quality communication depends on:
     - Knowledge (K)
     - Practice (P)
     - Talent (T)
   - Talent plays a small role; knowledge and practice are far more significant.
   - Example: Patrick compares his skiing ability to Olympic gymnast Mary Lou Retton’s novice skiing to illustrate how knowledge and practice can outweigh inherent talent.

3. Objective and Promise of the Talk
   - The talk aims to provide practical speaking techniques and heuristics.
   - Attendees may find at least one technique that could help secure a job or opportunity.
   - The learn

In [27]:
# 7. List indexed videos
print("7. Indexed videos:")
videos = youtube_rag.list_indexed_videos()
for video in videos:
    print(f"  - {video['video_id']}: {video['total_duration']:.1f}s, {video['total_segments']} segments")

7. Indexed videos:
  - Unzc731iCUY: 3789.3s, 1166 segments


In [28]:
# User-supplied YouTube URL
video_url = input("Enter a video URL: ")

# Index the video (this will create embeddings and make it searchable)
video_id = await youtube_rag.index_video(video_url, chunk_size=60)

# Get a summary of the video
print("Getting video summary:")
summary = youtube_rag.get_video_summary(video_id, max_chunks=5)
print(f"Summary: {summary}")
print()




Enter a video URL:  https://www.youtube.com/watch?v=vo4pMVb0R6M&ab_channel=CrashCourse


Fetching transcript for video ID: vo4pMVb0R6M
Successfully indexed video vo4pMVb0R6M with 11 chunks
Getting video summary:
Summary: **Summary of YouTube Video Content on Psychology and Its History**

---

### Introduction to Psychology and the Mind
- The human mind is described as the most complicated known piece of the universe, governing consciousness and behavior.
- Psychology is the scientific study of behavior and mental processes.
- Despite the brain’s complexity, humans persistently attempt to understand it.
- The term "psychology" originates from Latin meaning "study of the soul," evolving considerably over centuries.

### Historical Background
- Early ideas about the mind date back to Aristotle, who mistakenly placed consciousness in the heart.
- Around 2000 years ago, the Chinese implemented psychological exams for public officials.
- In the 9th century, Persian doctor Muhammad ibn Zakariya al-Rhazes described mental illness and established early psychiatric care.

### Centra

In [29]:
# Have the user ask a question about the video
user_inputted_question = input("Ask a question: ")

question_result = youtube_rag.ask_question(user_inputted_question, video_id=video_id)
print(f"Answer: {question_result['answer']}")
print(f"Sources used: {question_result['context_count']} chunks")
print()

Ask a question:  How does psychology differ from philosophy?


Answer: Based on the video content, psychology differs from philosophy primarily in its approach and methods. While philosophy has long pondered questions about the mind, soul, and human nature (as mentioned with Aristotle and the early use of the term "psychology" around the sixteenth century [Source 2 - 00:00]), psychology emerged as a formal science in the mid-1800s that focuses on the scientific study of behavior and mental processes.

Psychology seeks to understand how the brain works, how it can break or be healed, and why we behave the way we do, employing empirical methods including observation and experimentation [Source 4 - 02:01]. For example, Wilhelm Wundt established the first psychology laboratory in 1879, aiming to study the structures of consciousness through introspection in a systematic way inspired by approaches in chemistry and physics [Source 1 - 03:02]. This scientific, experimental approach contrasts with philosophy's more speculative and conceptual methods.

Mor

# Advanced Usage Examples

## Advanced RAG Usage Patterns

### 1. Multiple Video Support:
- Index multiple videos: `await youtube_rag.index_video(url1)`, `await youtube_rag.index_video(url2)`
- Search across all videos: `youtube_rag.search_transcript('query')`
- Search specific video: `youtube_rag.search_transcript('query', video_id='specific_id')`

### 2. Custom Chunking Strategies:
- Short videos: `chunk_size=30` (30 seconds)
- Educational content: `chunk_size=120` (2 minutes)
- Dense technical content: `chunk_size=60` (1 minute)
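
To make the effect of `chunk_size` concrete, here is a minimal, self-contained sketch of time-based chunking run on a synthetic transcript. `chunk_by_time` is a hypothetical simplification of the notebook's `chunk_transcript`: it keeps only the start time and text, and it starts each chunk at its first snippet rather than at zero. The transcript data is made up for illustration.

```python
from typing import Dict, List, Tuple


def chunk_by_time(transcript: List[Dict], chunk_size: float) -> List[Tuple[float, str]]:
    """Group transcript snippets into windows of roughly chunk_size seconds."""
    chunks: List[Tuple[float, str]] = []
    current_texts: List[str] = []
    current_start = 0.0

    for entry in transcript:
        # Close the current window once this snippet starts chunk_size seconds after it began
        if current_texts and entry["start"] - current_start >= chunk_size:
            chunks.append((current_start, " ".join(current_texts)))
            current_texts = []
        if not current_texts:
            current_start = entry["start"]
        current_texts.append(entry["text"])

    # Flush whatever is left as the final chunk
    if current_texts:
        chunks.append((current_start, " ".join(current_texts)))
    return chunks


# Synthetic transcript: one 10-second snippet every 10 seconds for 5 minutes
fake_transcript = [
    {"text": f"snippet {i}", "start": i * 10.0, "duration": 10.0} for i in range(30)
]

for size in (30, 60, 120):
    print(f"chunk_size={size}: {len(chunk_by_time(fake_transcript, size))} chunks")
```

Smaller windows give more, shorter chunks (finer timestamps, more embeddings to compute); larger windows give fewer chunks with more context each.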

### 3. Advanced Search Options:
- Adjust k parameter for more/fewer results
- Filter by video_id for targeted search
- Access similarity scores for relevance ranking
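
The three options above interact: in this notebook, `search_transcript` asks the vector database for the top `k` hits and only then drops hits from other videos, so a filtered search can return fewer than `k` results. The sketch below shows the alternative ordering (filter first, then rank) on toy 2-D vectors. It is an illustration with plain NumPy, not the `VectorDatabase` implementation; `top_k`, `toy_index`, and the video IDs are hypothetical.

```python
import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def top_k(query_vec, index, k=5, video_id=None):
    """index is a list of (text, vector, metadata); filter first, then rank and cut to k."""
    candidates = [
        (text, cosine_similarity(query_vec, vec), meta)
        for text, vec, meta in index
        if video_id is None or meta.get("video_id") == video_id
    ]
    candidates.sort(key=lambda item: item[1], reverse=True)
    return candidates[:k]


# Toy 2-D "embeddings" for chunks from two hypothetical videos
toy_index = [
    ("chunk a", np.array([1.0, 0.0]), {"video_id": "vid_1", "start_time": 0}),
    ("chunk b", np.array([0.8, 0.6]), {"video_id": "vid_2", "start_time": 60}),
    ("chunk c", np.array([0.0, 1.0]), {"video_id": "vid_1", "start_time": 120}),
]
query = np.array([1.0, 0.0])

for text, score, meta in top_k(query, toy_index, k=2, video_id="vid_1"):
    print(f"{text}: score={score:.3f}, video={meta['video_id']}")
```

Filtering before ranking guarantees up to `k` results from the requested video, at the cost of scoring every chunk of that video.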

### 4. Integration with Existing RAG Pipeline:
- Use the same VectorDatabase class
- Compatible with existing embedding models
- Can combine with text documents

### 5. Rich Metadata Support:
- Automatic timestamp extraction
- Video duration and segment tracking
- Source attribution for answers
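
The stored `start_time` and `end_time` values also allow lookups that semantic search handles poorly, such as the "between minutes 10 and 20" question earlier, where the retrieved chunks came from other parts of the video. Below is a minimal sketch of a time-window lookup over chunk metadata, assuming the same `{text: metadata}` shape used in this notebook. `format_timestamp` and `chunks_in_window` are hypothetical helpers, the toy values are made up, and switching to `H:MM:SS` past one hour is a choice made here, not what the class does.

```python
from typing import Dict, List, Tuple


def format_timestamp(seconds: float) -> str:
    """Format seconds as MM:SS, switching to H:MM:SS past one hour."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"


def chunks_in_window(metadata_by_text: Dict[str, Dict], start: float, end: float) -> List[Tuple[str, str]]:
    """Return (timestamp, text) for chunks overlapping [start, end), in time order."""
    hits = [
        (meta["start_time"], text)
        for text, meta in metadata_by_text.items()
        if meta["start_time"] < end and meta["end_time"] > start
    ]
    return [(format_timestamp(t), text) for t, text in sorted(hits)]


# Toy metadata in the same shape the chunker produces (values are made up)
toy_metadata = {
    "intro": {"start_time": 0.0, "end_time": 563.0},
    "middle": {"start_time": 563.0, "end_time": 1230.0},
    "late": {"start_time": 3424.0, "end_time": 3789.0},
}

# Chunks that overlap minutes 10 to 20
print(chunks_in_window(toy_metadata, start=600, end=1200))
```

The chunks returned by such a lookup could then be passed to the LLM as context in place of the semantic search results.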

### 6. Combining with Existing RAG:

```python
# Reuse the YouTube vector database as the retriever for RetrievalAugmentedQAPipeline
combined_rag = RetrievalAugmentedQAPipeline(
    vector_db_retriever=youtube_rag.vector_db,  # Same vector DB
    llm=chat_openai
)

# This searches whatever is stored in that database; text documents are only
# covered if their chunks were also inserted into youtube_rag.vector_db
result = combined_rag.run_pipeline("What are the key insights about communication?")
```

---

## Key Improvements Over the Original Code

✅ Semantic search instead of simple text matching  
✅ Timestamped results with video context  
✅ Question-answering with source attribution  
✅ Video summarization capabilities  
✅ Multiple video support  
✅ Integration with existing RAG infrastructure  
✅ Metadata preservation and retrieval  
✅ Async embedding calls while indexing
