# Your First RAG Application

In this notebook, we'll walk you through each of the components that are involved in a simple RAG application.

We won't be leveraging any fancy tools, just the OpenAI Python SDK, Numpy, and some classic Python.

> NOTE: This was done with Python 3.11.4.

> NOTE: There might be [compatibility issues](https://github.com/wandb/wandb/issues/7683) if you're on an NVIDIA driver newer than 552.44. As an interim solution, you can roll back your drivers to 552.44.

## Table of Contents:

- Task 1: Imports and Utilities
- Task 2: Documents
- Task 3: Embeddings and Vectors
- Task 4: Prompts
- Task 5: Retrieval Augmented Generation
  - 🚧 Activity #1: Augment RAG

Let's look at a rather complicated-looking visual representation of a basic RAG application.

<img src="https://i.imgur.com/vD8b016.png" />

## Task 1: Imports and Utilities

We're just doing some imports and enabling `async` to work within the Jupyter environment here, nothing too crazy!

In [53]:
from aimakerspace.text_utils import TextFileLoader, CharacterTextSplitter
from aimakerspace.vectordatabase import VectorDatabase
import asyncio

In [54]:
import nest_asyncio
nest_asyncio.apply()

## Task 2: Documents

We'll be concerning ourselves with this part of the flow in the following section:

<img src="https://i.imgur.com/jTm9gjk.png" />

### Loading Source Documents

So, first things first, we need some documents to work with.

While we could work directly with the `.txt` files (or whatever file types you want to extend this to), we can instead batch-process those documents up front and store them in a more machine-friendly format.

In this case, we're going to parse our text file into a single document in memory.

Let's look at the relevant bits of the `TextFileLoader` class:

```python
def load_file(self):
        with open(self.path, "r", encoding=self.encoding) as f:
            self.documents.append(f.read())
```

We're simply loading the document using the built-in `open` function, and storing that output in our `self.documents` list.

> NOTE: We're using blogs from PMarca (Marc Andreessen) as our sample data. This data is largely irrelevant as we want to focus on the mechanisms of RAG, which includes our data's shape and quality - but not specifically what the contents of the data are.


In [55]:
text_loader = TextFileLoader("data/PMarcaBlogs.txt")
documents = text_loader.load_documents()
len(documents)

1

In [56]:
print(documents[0][:100])


The Pmarca Blog Archives
(select posts from 2007-2009)
Marc Andreessen
copyright: Andreessen Horow


### Splitting Text Into Chunks

As we can see, there is one massive document.

We'll want to chunk the document into smaller parts so it's easier to pass the most relevant snippets to the LLM.

There is no fixed way to split/chunk documents - and you'll need to rely on some intuition as well as knowing your data *very* well in order to build the most robust system.

For this toy example, we'll just split blindly on length.

>There's an opportunity to clear up some terminology here. For this course we will stick to the following:
>
>- "source documents" : The `.txt`, `.pdf`, `.html`, ..., files that make up the files and information we start with in its raw format
>- "document(s)" : single (or more) text object(s)
>- "corpus" : the combination of all of our documents

As you can imagine (though it's not specifically true in this toy example) the idea of splitting documents is to break them into manageable chunks that retain the most relevant local context.
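
To make "splitting blindly on length" concrete, here is a minimal sketch that cuts a string into fixed-size windows with some overlap between neighbours. The function name `split_by_length` and its default values are illustrative assumptions; this is not the `aimakerspace` `CharacterTextSplitter` implementation.

```python
from typing import List


def split_by_length(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split `text` into windows of at most `chunk_size` characters.

    Consecutive windows share `chunk_overlap` characters, so text near a
    boundary appears in both neighbouring chunks.
    """
    if chunk_size <= chunk_overlap:
        raise ValueError("chunk_size must be larger than chunk_overlap")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = split_by_length(sample, chunk_size=10, chunk_overlap=3)
print(chunks)
```

The overlap is a design choice: it costs some duplicated text, but it reduces the chance that a relevant sentence is cut in half at a chunk boundary.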

In [57]:
text_splitter = CharacterTextSplitter()
split_documents = text_splitter.split_texts(documents)
len(split_documents)

373

Let's take a look at some of the documents we've managed to split.

In [58]:
split_documents[0:1]

['\ufeff\nThe Pmarca Blog Archives\n(select posts from 2007-2009)\nMarc Andreessen\ncopyright: Andreessen Horowitz\ncover design: Jessica Hagy\nproduced using: Pressbooks\nContents\nTHE PMARCA GUIDE TO STARTUPS\nPart 1: Why not to do a startup 2\nPart 2: When the VCs say "no" 10\nPart 3: "But I don\'t know any VCs!" 18\nPart 4: The only thing that matters 25\nPart 5: The Moby Dick theory of big companies 33\nPart 6: How much funding is too little? Too much? 41\nPart 7: Why a startup\'s initial business plan doesn\'t\nmatter that much\n49\nTHE PMARCA GUIDE TO HIRING\nPart 8: Hiring, managing, promoting, and Dring\nexecutives\n54\nPart 9: How to hire a professional CEO 68\nHow to hire the best people you\'ve ever worked\nwith\n69\nTHE PMARCA GUIDE TO BIG COMPANIES\nPart 1: Turnaround! 82\nPart 2: Retaining great people 86\nTHE PMARCA GUIDE TO CAREER, PRODUCTIVITY,\nAND SOME OTHER THINGS\nIntroduction 97\nPart 1: Opportunity 99\nPart 2: Skills and education 107\nPart 3: Where to go and wh

## Task 3: Embeddings and Vectors

Next, we have to convert our corpus into a "machine readable" format as we explored in the Embedding Primer notebook.

Today, we're going to talk about the actual process of creating, and then storing, these embeddings, and how we can leverage that to intelligently add context to our queries.

### OpenAI API Key

In order to access OpenAI's APIs, we'll need to provide our OpenAI API Key!

You can work through the folder "OpenAI API Key Setup" for more information on this process if you don't already have an API Key!

In [59]:
import os
import openai
from getpass import getpass

openai.api_key = getpass("OpenAI API Key: ")
os.environ["OPENAI_API_KEY"] = openai.api_key

### Vector Database

Let's set up our vector database to hold all our documents and their embeddings!

While this is all baked into one call, we can look at some of the code that powers this process to get a better understanding.

Let's look at our `VectorDatabase().__init__()`:

```python
def __init__(self, embedding_model: EmbeddingModel = None):
        self.vectors = defaultdict(np.array)
        self.embedding_model = embedding_model or EmbeddingModel()
```

As you can see - our vectors are merely stored as a dictionary of `np.array` objects.

Secondly, our `VectorDatabase()` has a default `EmbeddingModel()` which is a wrapper for OpenAI's `text-embedding-3-small` model.

> **Quick Info About `text-embedding-3-small`**:
> - It has a context window of **8191** tokens
> - It returns vectors with dimension **1536**

#### ❓Question #1:

The default embedding dimension of `text-embedding-3-small` is 1536, as noted above. 

1. Is there any way to modify this dimension?
2. What technique does OpenAI use to achieve this?

> NOTE: Check out this [API documentation](https://platform.openai.com/docs/api-reference/embeddings/create) for the answer to question #1, and [this documentation](https://platform.openai.com/docs/guides/embeddings/use-cases) for an answer to question #2!

#### Answer #1

1. Yes, by passing the optional `dimensions` parameter when calling the embeddings endpoint (`https://api.openai.com/v1/embeddings`).

2. Matryoshka Representation Learning: the v3 embedding models were trained with this technique, which is what lets developers shorten embeddings and trade off performance against embedding usage costs.
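
To make the second point concrete, here is a hedged sketch of what shortening an embedding amounts to: keep the leading components and re-normalize to unit length. The vector below is random stand-in data, not a real API response, and `shorten_embedding` is a hypothetical helper name. With the real API, passing `dimensions` asks the service to return the shorter vector directly.

```python
import numpy as np


def shorten_embedding(embedding: np.ndarray, dimensions: int) -> np.ndarray:
    """Keep the first `dimensions` components and rescale to unit length."""
    truncated = embedding[:dimensions]
    norm = np.linalg.norm(truncated)
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return truncated / norm


rng = np.random.default_rng(seed=0)
full = rng.normal(size=1536)
full = full / np.linalg.norm(full)  # stand-in for a unit-length 1536-d embedding

short = shorten_embedding(full, dimensions=256)
print(short.shape, round(float(np.linalg.norm(short)), 6))
```

Truncating like this is only expected to preserve quality for models trained so that the leading components carry most of the information, which is what Matryoshka-style training aims for.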


---

We can call the `async_get_embeddings` method of our `EmbeddingModel()` on a list of `str` and receive a list of embeddings (each a list of `float`) back!

```python
async def async_get_embeddings(self, list_of_text: List[str]) -> List[List[float]]:
        return await aget_embeddings(
            list_of_text=list_of_text, engine=self.embeddings_model_name
        )
```

We cast those to `np.array` when we build our `VectorDatabase()`:

```python
async def abuild_from_list(self, list_of_text: List[str]) -> "VectorDatabase":
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        for text, embedding in zip(list_of_text, embeddings):
            self.insert(text, np.array(embedding))
        return self
```

And that's all we need to do!

In [60]:
# Clean the documents to remove problematic Unicode characters
def clean_text_for_embedding(text):
    """Clean text to avoid Unicode encoding issues"""
    import re
    # Remove emojis and other non-ASCII characters
    cleaned = re.sub(r'[^\x00-\x7F]+', ' ', text)
    # Remove extra whitespace
    cleaned = ' '.join(cleaned.split())
    return cleaned

# Clean all split documents
cleaned_split_documents = [clean_text_for_embedding(doc) for doc in split_documents]

print(f"Original documents: {len(split_documents)}")
print(f"Cleaned documents: {len(cleaned_split_documents)}")
print(f"Sample cleaned text: {cleaned_split_documents[0][:100]}...")

vector_db = VectorDatabase()
vector_db = asyncio.run(vector_db.abuild_from_list(cleaned_split_documents))

Original documents: 373
Cleaned documents: 373
Sample cleaned text: The Pmarca Blog Archives (select posts from 2007-2009) Marc Andreessen copyright: Andreessen Horowit...


#### ❓Question #2:

What are the benefits of using an `async` approach to collecting our embeddings?

> NOTE: Determining the core difference between `async` and `sync` will be useful! If you get stuck - ask ChatGPT!

#### Answer #2

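1. To put it simply, synchronous calls run one after another, each blocking until its response arrives. Asynchronous calls can be in flight concurrently: while one request is waiting on the network, the program can issue other requests or do other work, so the total wait can be much shorter than the sum of the individual waits.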
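
The difference can be illustrated without touching the network. In this sketch, `fake_embed` is a made-up stand-in for an embedding request (it just sleeps briefly), and the counters record how many requests are waiting at the same time.

```python
import asyncio
from typing import List

in_flight = 0  # requests currently waiting on the simulated network
peak_in_flight = 0  # highest number of simultaneous requests observed


async def fake_embed(text: str) -> List[float]:
    """Stand-in for a network call: waits briefly, then returns a dummy vector."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # while this coroutine waits, others can run
    in_flight -= 1
    return [float(len(text))]


async def embed_all(texts: List[str]) -> List[List[float]]:
    # gather starts every coroutine and returns results in input order
    return await asyncio.gather(*(fake_embed(t) for t in texts))


texts = ["alpha", "be", "gam"]
vectors = asyncio.run(embed_all(texts))
print(vectors, peak_in_flight)
```

All three simulated requests are waiting at once, which is the behaviour we want when the bottleneck is network latency. Inside Jupyter, `asyncio.run` needs the `nest_asyncio.apply()` call from Task 1.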


---

So, to review what we've done so far in natural language:

1. We load source documents
2. We split those source documents into smaller chunks (documents)
3. We send each of those documents to the `text-embedding-3-small` OpenAI API endpoint
4. We store each chunk's text as a key, and its vector representation as the value, in a dictionary

### Semantic Similarity

The next step is to be able to query our `VectorDatabase()` with a `str` and have it return the vectors and text that are most relevant from our corpus.

We're going to use the following process to achieve this in our toy example:

1. We need to embed our query with the same `EmbeddingModel()` as we used to construct our `VectorDatabase()`
2. We loop through every vector in our `VectorDatabase()` and use a distance measure to compare how related they are
3. We return a list of the top `k` closest vectors, with their text representations

There's some very heavy optimization that can be done at each of these steps - but let's just focus on the basic pattern in this notebook.
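
The basic pattern can be sketched in a few lines of NumPy. The 2-d vectors below are made up for illustration and the query is assumed to be already embedded (step 1); the function names are hypothetical rather than the `aimakerspace` API.

```python
from typing import Dict, List, Tuple

import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def search(
    query_vector: np.ndarray, vectors: Dict[str, np.ndarray], k: int
) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and keep the k best."""
    scored = [(text, cosine_similarity(query_vector, vec)) for text, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)  # highest similarity first
    return scored[:k]


# Toy 2-d "embeddings"; real ones from text-embedding-3-small have 1536 dimensions.
store = {
    "cats purr": np.array([1.0, 0.1]),
    "dogs bark": np.array([0.8, 0.6]),
    "stock markets fell": np.array([-0.2, 1.0]),
}
query = np.array([1.0, 0.0])  # pretend this is the embedded query
top = search(query, store, k=2)
print([text for text, _ in top])
```

This is a brute-force scan: every query touches every stored vector, which is fine for a few hundred chunks but is exactly the part that approximate nearest-neighbor indexes replace at scale.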

> We are using [cosine similarity](https://www.engati.com/glossary/cosine-similarity) to measure closeness in this example - but there are many other distance metrics you could use - like [these](https://flavien-vidal.medium.com/similarity-distances-for-natural-language-processing-16f63cd5ba55)

> We are using a rather inefficient way of calculating relative distance between the query vector and all other vectors - there are more advanced approaches that are much more efficient, like [ANN](https://towardsdatascience.com/comprehensive-guide-to-approximate-nearest-neighbors-algorithms-8b94f057d6b6)

In [61]:
vector_db.search_by_text("What is the Michael Eisner Memorial Weak Executive Problem?", k=3)

[('ordingly. Seventh, when hiring the executive to run your former specialty, be careful you don t hire someone weak on purpose. This sounds silly, but you wouldn t believe how oaen it happens. The CEO who used to be a product manager who has a weak product management executive. The CEO who used to be in sales who has a weak sales executive. The CEO who used to be in marketing who has a weak marketing executive. I call this the Michael Eisner Memorial Weak Executive Problem aaer the CEO of Disney who had previously been a brilliant TV network executive. When he bought ABC at Disney, it promptly fell to fourth place. His response? If I had an extra two days a week, I could turn around ABC myself. Well, guess what, he didn t have an extra two days a week. A CEO or a startup founder oaen has a hard time letting go of the function that brought him to the party. The result: you hire someone weak into the executive role for that function so that you can continue to be the man cons',
  np.flo

## Task 4: Prompts

In the following section, we'll be looking at the role of prompts - and how they help us to guide our application in the right direction.

In this notebook, we're going to rely on the idea of "zero-shot in-context learning".

This is a lot of words to say: "We will ask it to perform our desired task in the prompt, and provide no examples."

### XYZRolePrompt

Before we do that, let's stop and think a bit about how OpenAI's chat models work.

We know they have roles - as is indicated in the following API [documentation](https://platform.openai.com/docs/api-reference/chat/create#chat/create-messages)

There are three roles, and they function as follows (taken directly from [OpenAI](https://platform.openai.com/docs/guides/gpt/chat-completions-api)):

- `{"role" : "system"}` : The system message helps set the behavior of the assistant. For example, you can modify the personality of the assistant or provide specific instructions about how it should behave throughout the conversation. However note that the system message is optional and the model’s behavior without a system message is likely to be similar to using a generic message such as "You are a helpful assistant."
- `{"role" : "user"}` : The user messages provide requests or comments for the assistant to respond to.
- `{"role" : "assistant"}` : Assistant messages store previous assistant responses, but can also be written by you to give examples of desired behavior.

The main idea is this:

1. You start with a system message that outlines how the LLM should respond, what kind of behaviours you can expect from it, and more
2. Then, you can provide a few examples in the form of "assistant"/"user" pairs
3. Then, you prompt the model with the true "user" message.

In this example, we'll be forgoing the 2nd step for simplicity's sake.
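
As a rough sketch of that three-step structure, the helper below assembles a message list in the role format described above. `build_messages` is a hypothetical name for illustration and is not part of the `aimakerspace` library.

```python
from typing import Dict, List, Sequence, Tuple


def build_messages(
    system: str,
    user: str,
    examples: Sequence[Tuple[str, str]] = (),
) -> List[Dict[str, str]]:
    """Assemble a chat payload: system message, optional few-shot pairs, real question."""
    messages = [{"role": "system", "content": system}]
    for example_question, example_answer in examples:
        messages.append({"role": "user", "content": example_question})
        messages.append({"role": "assistant", "content": example_answer})
    messages.append({"role": "user", "content": user})
    return messages


zero_shot = build_messages(
    system="You are a concise Python tutor.",
    user="What does enumerate() do?",
)
few_shot = build_messages(
    system="You are a concise Python tutor.",
    user="What does enumerate() do?",
    examples=[("What does len() do?", "It returns the number of items in a container.")],
)
print([m["role"] for m in zero_shot])
print([m["role"] for m in few_shot])
```

The zero-shot variant, with no example pairs, is the shape used in the rest of this notebook.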

#### Utility Functions

You'll notice that we're using some utility functions from the `aimakerspace` module - let's take a peek at these and see what they're doing!

##### XYZRolePrompt

Here we have our `system`, `user`, and `assistant` role prompts.

Let's take a peek at what they look like:

```python
class BasePrompt:
    def __init__(self, prompt):
        """
        Initializes the BasePrompt object with a prompt template.

        :param prompt: A string that can contain placeholders within curly braces
        """
        self.prompt = prompt
        self._pattern = re.compile(r"\{([^}]+)\}")

    def format_prompt(self, **kwargs):
        """
        Formats the prompt string using the keyword arguments provided.

        :param kwargs: The values to substitute into the prompt string
        :return: The formatted prompt string
        """
        matches = self._pattern.findall(self.prompt)
        return self.prompt.format(**{match: kwargs.get(match, "") for match in matches})

    def get_input_variables(self):
        """
        Gets the list of input variable names from the prompt string.

        :return: List of input variable names
        """
        return self._pattern.findall(self.prompt)
```

Then we have our `RolePrompt` which laser focuses us on the role pattern found in most API endpoints for LLMs.

```python
class RolePrompt(BasePrompt):
    def __init__(self, prompt, role: str):
        """
        Initializes the RolePrompt object with a prompt template and a role.

        :param prompt: A string that can contain placeholders within curly braces
        :param role: The role for the message ('system', 'user', or 'assistant')
        """
        super().__init__(prompt)
        self.role = role

    def create_message(self, **kwargs):
        """
        Creates a message dictionary with a role and a formatted message.

        :param kwargs: The values to substitute into the prompt string
        :return: Dictionary containing the role and the formatted message
        """
        return {"role": self.role, "content": self.format_prompt(**kwargs)}
```

We'll look at how the `SystemRolePrompt` is constructed to get a better idea of how that extension works:

```python
class SystemRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "system")
```

That pattern is repeated for our `UserRolePrompt` and our `AssistantRolePrompt` as well.

##### ChatOpenAI

Next we have our model, which is wrapped in an interface analogous to those of libraries like LangChain and LlamaIndex.

Let's take a peek at how that is constructed:

```python
class ChatOpenAI:
    def __init__(self, model_name: str = "gpt-4o-mini"):
        self.model_name = model_name
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        if self.openai_api_key is None:
            raise ValueError("OPENAI_API_KEY is not set")

    def run(self, messages, text_only: bool = True):
        if not isinstance(messages, list):
            raise ValueError("messages must be a list")

        openai.api_key = self.openai_api_key
        response = openai.ChatCompletion.create(
            model=self.model_name, messages=messages
        )

        if text_only:
            return response.choices[0].message.content

        return response
```

#### ❓ Question #3:

When calling the OpenAI API - are there any ways we can achieve more reproducible outputs?

> NOTE: Check out [this section](https://platform.openai.com/docs/guides/text-generation/) of the OpenAI documentation for the answer!

#### Answer #3

1. A combination of the below would help with reproducing similar outputs:
- pinning to specific model snapshots, as different snapshots of models can produce different results
- consistent instruction helps to produce desired responses
- consider using reusable prompts to keep format and tone consistent
- few-shot learning can help make responses more reliable by helping the model know what sort of response you're expecting
- if available, opting for a lower temperature would help keep responses less random
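
A few of these points map directly onto request settings. The sketch below only builds the keyword arguments so it runs offline; `build_reproducible_request` is a hypothetical helper, and the seed value is arbitrary. Even with these settings, identical outputs are not guaranteed, only more likely.

```python
from typing import Any, Dict, List


def build_reproducible_request(
    messages: List[Dict[str, str]],
    model: str = "gpt-4o-mini-2024-07-18",  # a dated snapshot rather than a moving alias
    seed: int = 42,
) -> Dict[str, Any]:
    """Collect request settings that reduce run-to-run variation."""
    return {
        "model": model,
        "messages": messages,
        "temperature": 0,  # minimize sampling randomness
        "seed": seed,  # best-effort determinism where the API supports it
    }


request = build_reproducible_request(
    [{"role": "user", "content": "Summarize RAG in one sentence."}]
)
print(sorted(request))

# With the `openai` package (v1+) installed and OPENAI_API_KEY set, the request
# could be sent like this (left commented out so the sketch runs offline):
# from openai import OpenAI
# response = OpenAI().chat.completions.create(**request)
# print(response.choices[0].message.content)
```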


---

### Creating and Prompting OpenAI's `gpt-4o-mini`!

Let's tie all these together and use it to prompt `gpt-4o-mini`!

In [62]:
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)

from aimakerspace.openai_utils.chatmodel import ChatOpenAI

chat_openai = ChatOpenAI()
user_prompt_template = "{content}"
user_role_prompt = UserRolePrompt(user_prompt_template)
system_prompt_template = (
    "You are an expert in {expertise}, you always answer in a kind way."
)
system_role_prompt = SystemRolePrompt(system_prompt_template)

messages = [
    system_role_prompt.create_message(expertise="Python"),
    user_role_prompt.create_message(
        content="What is the best way to write a loop?"
    ),
]

response = chat_openai.run(messages)

In [63]:
print(response)

The best way to write a loop in Python largely depends on the specific task you're trying to accomplish and the structure of your data. There are different types of loops you can use in Python: `for` loops and `while` loops. Here are some guidelines and examples for each type:

### For Loop
A `for` loop is generally used when you want to iterate over a sequence (like a list, tuple, or string).

**Example: Looping through a list**
```python
fruits = ['apple', 'banana', 'cherry']
for fruit in fruits:
    print(fruit)
```

### While Loop
A `while` loop is useful when you want to repeat an action as long as a condition is true.

**Example: Using a while loop**
```python
count = 0
while count < 5:
    print(count)
    count += 1
```

### Tips for Writing Loops
1. **Keep it Simple**: Aim to make your loops as simple and readable as possible.
2. **Use Built-in Functions**: Consider using built-in functions like `enumerate()` or `zip()` when dealing with loops to make your code more elegant.
 

## Task 5: Retrieval Augmented Generation

Now we can create a RAG prompt - which will help our system behave in a way that makes sense!

There is much you could do here, many tweaks and improvements to be made!

In [64]:
RAG_SYSTEM_TEMPLATE = """You are a knowledgeable assistant that answers questions based strictly on provided context.

Instructions:
- Only answer questions using information from the provided context
- If the context doesn't contain relevant information, respond with "I don't know"
- Be accurate and cite specific parts of the context when possible
- Keep responses {response_style} and {response_length}
- Only use the provided context. Do not use external knowledge.
- Only provide answers when you are confident the context supports your response."""

RAG_USER_TEMPLATE = """Context Information:
{context}

Number of relevant sources found: {context_count}
{similarity_scores}

Question: {user_query}

Please provide your answer based solely on the context above."""

rag_system_prompt = SystemRolePrompt(
    RAG_SYSTEM_TEMPLATE,
    strict=True,
    defaults={
        "response_style": "concise",
        "response_length": "brief"
    }
)

rag_user_prompt = UserRolePrompt(
    RAG_USER_TEMPLATE,
    strict=True,
    defaults={
        "context_count": "",
        "similarity_scores": ""
    }
)

Now we can create our pipeline!

In [65]:
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase,
                 response_style: str = "detailed", include_scores: bool = False) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever
        self.response_style = response_style
        self.include_scores = include_scores

    def run_pipeline(self, user_query: str, k: int = 4, **system_kwargs) -> dict:
        # Retrieve relevant contexts
        context_list = self.vector_db_retriever.search_by_text(user_query, k=k)
        
        context_prompt = ""
        similarity_scores = []
        
        for i, (context, score) in enumerate(context_list, 1):
            context_prompt += f"[Source {i}]: {context}\n\n"
            similarity_scores.append(f"Source {i}: {score:.3f}")
        
        # Create system message with parameters
        system_params = {
            "response_style": self.response_style,
            "response_length": system_kwargs.get("response_length", "detailed")
        }
        
        formatted_system_prompt = rag_system_prompt.create_message(**system_params)
        
        user_params = {
            "user_query": user_query,
            "context": context_prompt.strip(),
            "context_count": len(context_list),
            "similarity_scores": f"Relevance scores: {', '.join(similarity_scores)}" if self.include_scores else ""
        }
        
        formatted_user_prompt = rag_user_prompt.create_message(**user_params)

        return {
            "response": self.llm.run([formatted_system_prompt, formatted_user_prompt]), 
            "context": context_list,
            "context_count": len(context_list),
            "similarity_scores": similarity_scores if self.include_scores else None,
            "prompts_used": {
                "system": formatted_system_prompt,
                "user": formatted_user_prompt
            }
        }

In [66]:
rag_pipeline = RetrievalAugmentedQAPipeline(
    vector_db_retriever=vector_db,
    llm=chat_openai,
    response_style="detailed",
    include_scores=True
)

result = rag_pipeline.run_pipeline(
    "What is the 'Michael Eisner Memorial Weak Executive Problem'?",
    k=3,
    response_length="comprehensive", 
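    # NOTE: run_pipeline only reads `response_length` from its extra keyword
    # arguments; the two flags below are accepted but currently ignored.
    include_warnings=True,
    confidence_required=True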
)

print(f"Response: {result['response']}")
print(f"\nContext Count: {result['context_count']}")
print(f"Similarity Scores: {result['similarity_scores']}")


Response: The 'Michael Eisner Memorial Weak Executive Problem' refers to the tendency of CEOs or startup founders to hire executives who are weak in the core functional areas that the CEO is familiar with or has expertise in. This phenomenon occurs when a CEO, who might have a background as a strong executive in a specific area, hires a weak individual to fill that role, allowing the CEO to maintain control and continue to influence that function. The example given is of Michael Eisner, the former CEO of Disney, who, despite being a brilliant TV network executive, oversaw the decline of ABC after acquiring it, ultimately expressing dissatisfaction about not having enough time to manage it himself. This reflects the challenge leaders often face in relinquishing control and the potential negative impact of hiring weaker executives in their areas of expertise (Source 1).

Context Count: 3
Similarity Scores: ['Source 1: 0.691', 'Source 2: 0.510', 'Source 3: 0.491']


#### ❓ Question #4:

What prompting strategies could you use to make the LLM have a more thoughtful, detailed response?

What is that strategy called?

> NOTE: You can look through ["Accessing GPT-3.5-turbo Like a Developer"](https://colab.research.google.com/drive/1mOzbgf4a2SP5qQj33ZxTz2a01-5eXqk2?usp=sharing) for an answer to this question if you get stuck!

#### Answer #4

- have the LLM take the "role" of the type of person who would ideally provide the type of answer you're looking for, such as a professor or a field expert
- CoT reasoning and prompting the model to think through its answers
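
Both ideas can be combined in a single system prompt. The template wording and the `make_cot_messages` helper below are illustrative assumptions, offered as one possible phrasing rather than a recommended standard.

```python
COT_SYSTEM_TEMPLATE = (
    "You are a {role}. Before answering, reason through the problem step by step, "
    "then give your final answer on a new line starting with 'Answer:'."
)


def make_cot_messages(role: str, question: str) -> list:
    """Build a system + user message pair that asks for step-by-step reasoning."""
    return [
        {"role": "system", "content": COT_SYSTEM_TEMPLATE.format(role=role)},
        {"role": "user", "content": question},
    ]


messages = make_cot_messages(
    role="venture capital professor",
    question="Why might a founder hire a weak executive for their old specialty?",
)
print(messages[0]["content"])
```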


---

### 🏗️ Activity #1:

Enhance your RAG application in some way! 

Suggestions are: 

- Allow it to work with PDF files
- Implement a new distance metric
- Add metadata support to the vector database

While these are suggestions, you should feel free to make whatever augmentations you desire! 

> NOTE: These additions might require you to work within the `aimakerspace` library - that's expected!

> NOTE: If you're not sure where to start - ask Cursor (CMD/CTRL+L) to guide you through the changes!
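
As a starting point for the "new distance metric" suggestion, here is a small sketch using Euclidean distance. The vectors are toy data and the function names are hypothetical; wiring this into `VectorDatabase` would depend on how its search method accepts a metric.

```python
import numpy as np


def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Straight-line distance between two vectors (0.0 means identical)."""
    return float(np.linalg.norm(a - b))


def rank_by_distance(query: np.ndarray, vectors: dict, k: int) -> list:
    """Return the k nearest (text, distance) pairs, smallest distance first."""
    scored = [(text, euclidean_distance(query, vec)) for text, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1])  # ascending: unlike similarity, lower is better
    return scored[:k]


store = {
    "near": np.array([1.0, 1.0]),
    "far": np.array([4.0, 5.0]),
    "middle": np.array([2.0, 2.0]),
}
nearest = rank_by_distance(np.array([0.0, 0.0]), store, k=2)
print(nearest)
```

Note the flipped sort order compared with a similarity score. For unit-length vectors, ranking by Euclidean distance and ranking by cosine similarity give the same order, so the choice matters most when vectors are not normalized.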

In [67]:
# Enhanced RAG Comparison: Testing Three Different Chunking Strategies
# Based on the comprehensive comparison functionality from the handler.py
# Updated for modern OpenAI API compatibility and Unicode handling

import numpy as np
from collections import defaultdict

# First, let's create a larger chunk version of our vector database for comparison
class LargeChunkVectorDB:
    """Vector database with larger chunks for comparison"""
    
    def __init__(self, embedding_model, chunk_size=3000, chunk_overlap=200):
        self.vectors = defaultdict(np.array)
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
    async def build_from_documents(self, documents):
        """Build vector database with large chunks"""
        large_splitter = CharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap
        )
        large_chunks = large_splitter.split_texts(documents)
        
        # Clean the large chunks to avoid Unicode issues
        cleaned_large_chunks = [clean_text_for_embedding(chunk) for chunk in large_chunks]
        
        # Get embeddings for large chunks using the embedding model
        embeddings = await self.embedding_model.async_get_embeddings(cleaned_large_chunks)
        
        # Store in vectors dictionary (using cleaned text)
        for text, embedding in zip(cleaned_large_chunks, embeddings):
            self.vectors[text] = np.array(embedding)
            
        return cleaned_large_chunks
    
    def search_by_text(self, query_text, k=5):
        """Search for similar chunks - SYNCHRONOUS version for RAG pipeline compatibility"""
        if not self.vectors:
            return []
            
        # For synchronous compatibility, we'll use a simpler approach
        # Note: This is a limitation - ideally we'd want async throughout
        try:
            # Try to use the embedding model synchronously if possible
            import openai
            import os
            
            # Clean the query text too
            clean_query = clean_text_for_embedding(query_text)
            
            # Get query embedding synchronously 
            client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
            response = client.embeddings.create(
                input=[clean_query],
                model="text-embedding-3-small"
            )
            query_embedding = np.array(response.data[0].embedding)
            
            # Calculate similarities
            similarities = []
            for text, vector in self.vectors.items():
                similarity = np.dot(query_embedding, vector)
                similarities.append((text, similarity))
            
            # Sort by similarity and return top k
            similarities.sort(key=lambda x: x[1], reverse=True)
            return similarities[:k]
            
        except Exception as e:
            print(f"Error in synchronous search: {e}")
            return []
    
    async def async_search_by_text(self, query_text, k=5):
        """Async version of search for when we can use it"""
        if not self.vectors:
            return []
            
        # Clean the query text
        clean_query = clean_text_for_embedding(query_text)
        
        # Get query embedding using the embedding model (async)
        query_embedding = np.array(await self.embedding_model.async_get_embedding(clean_query))
        
        # Calculate similarities
        similarities = []
        for text, vector in self.vectors.items():
            similarity = np.dot(query_embedding, vector)
            similarities.append((text, similarity))
        
        # Sort by similarity and return top k
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:k]


class EnhancedRAGComparison:
    """Compare three different RAG strategies"""
    
    def __init__(self, documents, embedding_model, chat_model):
        self.documents = documents
        self.embedding_model = embedding_model
        self.chat_model = chat_model
        
        # Strategy 1: Small chunks (already exists) - use cleaned documents
        self.small_vector_db = vector_db
        
        # Strategy 2: We'll create adjacent chunk functionality using cleaned documents
        self.chunk_metadata = {}
        self.build_chunk_metadata()
        
        # Strategy 3: Large chunks
        self.large_vector_db = None
        
    def build_chunk_metadata(self):
        """Build metadata for adjacent chunk functionality using cleaned documents"""
        for i, chunk in enumerate(cleaned_split_documents):
            self.chunk_metadata[i] = {
                'text': chunk,
                'index': i,
                'total_chunks': len(cleaned_split_documents)
            }
    
    async def setup_large_chunks(self):
        """Setup large chunk vector database"""
        self.large_vector_db = LargeChunkVectorDB(self.embedding_model)
        large_chunks = await self.large_vector_db.build_from_documents(self.documents)
        print(f"Created {len(large_chunks)} large chunks (vs {len(cleaned_split_documents)} small chunks)")
        
    def get_adjacent_chunks(self, center_idx, adjacent_count=1):
        """Get adjacent chunks around a center chunk"""
        start_idx = max(0, center_idx - adjacent_count)
        end_idx = min(len(cleaned_split_documents), center_idx + adjacent_count + 1)
        
        adjacent_texts = []
        for i in range(start_idx, end_idx):
            if i < len(cleaned_split_documents):
                adjacent_texts.append(cleaned_split_documents[i])
        
        return "\n\n".join(adjacent_texts)
    
    async def strategy_1_small_chunks(self, query, k=5):
        """Strategy 1: Regular small chunks"""
        results = self.small_vector_db.search_by_text(query, k=k)
        return [{
            'text': text,
            'score': float(score),
            'strategy': 'Small Chunks',
            'full_text': text,
            'original_rank': i + 1  # Track original rank for comparison
        } for i, (text, score) in enumerate(results)]
    
    async def strategy_2_adjacent_expansion(self, query, k=5, adjacent_count=1):
        """Strategy 2: Small chunks + adjacent expansion with re-embedding"""
        # First get initial results
        initial_results = self.small_vector_db.search_by_text(query, k=k)
        
        # Embed the query once; it does not change between results
        clean_query = clean_text_for_embedding(query)
        query_embedding = await self.embedding_model.async_get_embedding(clean_query)
        
        enhanced_results = []
        for i, (text, score) in enumerate(initial_results):
            # Find the chunk index
            chunk_idx = -1
            for j, chunk in enumerate(cleaned_split_documents):
                if chunk == text:
                    chunk_idx = j
                    break
            
            if chunk_idx != -1:
                # Get adjacent chunks
                expanded_text = self.get_adjacent_chunks(chunk_idx, adjacent_count)
                
                # Clean the expanded text before embedding
                clean_expanded = clean_text_for_embedding(expanded_text)
                
                # Re-embed the expanded text
                expanded_embedding = await self.embedding_model.async_get_embedding(clean_expanded)
                
                # Calculate new similarity.
                # The dot product equals cosine similarity only if the embeddings
                # are unit length, which is assumed here.
                new_score = float(np.dot(expanded_embedding, query_embedding))
                
                enhanced_results.append({
                    'text': text,  # Original chunk for comparison
                    'score': new_score,
                    'original_score': float(score),
                    'original_rank': i + 1,  # Original rank before re-scoring
                    'strategy': f'Adjacent Expansion ({adjacent_count})',
                    'full_text': clean_expanded,
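                    # Count the chunks actually joined; the window is clamped at the
                    # start and end of the document, so it can be smaller than
                    # 2 * adjacent_count + 1
                    'chunks_used': (
                        min(len(cleaned_split_documents), chunk_idx + adjacent_count + 1)
                        - max(0, chunk_idx - adjacent_count)
                    ),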
                    'score_change': new_score - float(score)
                })
        
        # Re-sort by new scores and calculate rank changes
        enhanced_results.sort(key=lambda x: x['score'], reverse=True)
        
        # Add new ranks and calculate rank changes
        for i, result in enumerate(enhanced_results):
            result['new_rank'] = i + 1
            result['rank_change'] = result['original_rank'] - result['new_rank']  # Positive means improved (moved up)
            
        return enhanced_results
    
    async def strategy_3_large_chunks(self, query, k=5):
        """Strategy 3: Large chunks from the start"""
        if not self.large_vector_db:
            await self.setup_large_chunks()
            
        results = await self.large_vector_db.async_search_by_text(query, k=k)
        return [{
            'text': text[:200] + "..." if len(text) > 200 else text,  # Truncate for display
            'score': float(score),
            'strategy': 'Large Chunks',
            'full_text': text,
            'chunk_size': len(text),
            'original_rank': i + 1  # Track rank
        } for i, (text, score) in enumerate(results)]
    
    async def compare_all_strategies(self, query, k=5, adjacent_count=1, display_chars=200):
        """Compare all three strategies side by side"""
        print(f"🔍 COMPREHENSIVE RAG STRATEGY COMPARISON")
        print(f"Query: '{query}'")
        print(f"Results per strategy: {k}")
        print("=" * 100)
        
        # Get results from all strategies
        small_results = await self.strategy_1_small_chunks(query, k)
        adjacent_results = await self.strategy_2_adjacent_expansion(query, k, adjacent_count)
        large_results = await self.strategy_3_large_chunks(query, k)
        
        # Compare rank by rank
        for rank in range(k):
            print(f"\n🏆 RANK #{rank + 1} COMPARISON")
            print("-" * 50)
            
            # Small chunks
            if rank < len(small_results):
                result = small_results[rank]
                text_preview = result['text'][:display_chars] + "..." if len(result['text']) > display_chars else result['text']
                print(f"📄 SMALL CHUNKS: Score {result['score']:.4f}")
                print(f"   {text_preview}")
            else:
                print(f"📄 SMALL CHUNKS: [No result at rank {rank + 1}]")
            
            print()
            
            # Adjacent expansion
            if rank < len(adjacent_results):
                result = adjacent_results[rank]
                text_preview = result['text'][:display_chars] + "..." if len(result['text']) > display_chars else result['text']
                
                # Show rank change information
                rank_change_info = ""
                if 'rank_change' in result:
                    rank_change = result['rank_change']
                    if rank_change > 0:
                        rank_change_info = f" (was rank #{result['original_rank']}, ↑{rank_change})"
                    elif rank_change < 0:
                        rank_change_info = f" (was rank #{result['original_rank']}, ↓{abs(rank_change)})"
                    else:
                        rank_change_info = f" (rank unchanged from #{result['original_rank']})"
                
                score_change = result.get('score_change', 0)
                print(f"🔗 ADJACENT EXPANSION: Score {result['score']:.4f} (was {result['original_score']:.4f}, {score_change:+.4f}){rank_change_info}")
                print(f"   {text_preview}")
                print(f"   (Expanded to {result['chunks_used']} chunks)")
            else:
                print(f"🔗 ADJACENT EXPANSION: [No result at rank {rank + 1}]")
            
            print()
            
            # Large chunks
            if rank < len(large_results):
                result = large_results[rank]
                text_preview = result['text'][:display_chars] + "..." if len(result['text']) > display_chars else result['text']
                print(f"📚 LARGE CHUNKS: Score {result['score']:.4f}")
                print(f"   {text_preview}")
                print(f"   (Chunk size: {result.get('chunk_size', 'Unknown')} chars)")
            else:
                print(f"📚 LARGE CHUNKS: [No result at rank {rank + 1}]")
        
        # Calculate average scores
        small_avg = sum(r['score'] for r in small_results) / len(small_results) if small_results else 0
        adjacent_avg = sum(r['score'] for r in adjacent_results) / len(adjacent_results) if adjacent_results else 0
        large_avg = sum(r['score'] for r in large_results) / len(large_results) if large_results else 0
        
        print(f"\n📊 AVERAGE SCORES")
        print("-" * 40)
        print(f"Small Chunks:      {small_avg:.4f}")
        print(f"Adjacent Expanded: {adjacent_avg:.4f} ({adjacent_avg - small_avg:+.4f})")
        print(f"Large Chunks:      {large_avg:.4f} ({large_avg - small_avg:+.4f})")
        
        # Add rank change summary for adjacent expansion
        if adjacent_results and any('rank_change' in r for r in adjacent_results):
            print(f"\n📈 RANK CHANGE ANALYSIS (Adjacent Expansion)")
            print("-" * 40)
            improved = sum(1 for r in adjacent_results if r.get('rank_change', 0) > 0)
            worsened = sum(1 for r in adjacent_results if r.get('rank_change', 0) < 0)
            unchanged = sum(1 for r in adjacent_results if r.get('rank_change', 0) == 0)
            
            print(f"Rank improvements: {improved} results moved up")
            print(f"Rank degradations: {worsened} results moved down") 
            print(f"Rank unchanged:    {unchanged} results stayed same")
            
            # Average rank change
            avg_rank_change = sum(r.get('rank_change', 0) for r in adjacent_results) / len(adjacent_results)
            print(f"Average rank change: {avg_rank_change:+.2f} positions")
        
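        # Determine winner by average similarity score.
        # This ranks retrieval similarity only: scores for chunks of different
        # sizes are not strictly comparable, and similarity is not answer quality.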
        strategies = [
            ('Small Chunks', small_avg),
            ('Adjacent Expanded', adjacent_avg), 
            ('Large Chunks', large_avg)
        ]
        best_strategy = max(strategies, key=lambda x: x[1])
        print(f"\n🏆 BEST STRATEGY: {best_strategy[0]} (avg score: {best_strategy[1]:.4f})")
        
        return {
            'small_results': small_results,
            'adjacent_results': adjacent_results,
            'large_results': large_results,
            'average_scores': {
                'small': small_avg,
                'adjacent': adjacent_avg,
                'large': large_avg
            },
            'best_strategy': best_strategy[0]
        }

# Initialize the comparison system
print("Setting up Enhanced RAG Comparison...")
comparison = EnhancedRAGComparison(documents, vector_db.embedding_model, chat_openai)

Setting up Enhanced RAG Comparison...
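
The windowing inside `get_adjacent_chunks` is easy to check in isolation. Below is a minimal sketch of the same clamping logic on a toy list. The function name `adjacent_window` and the `toy_chunks` data are hypothetical stand-ins, not part of the notebook. It shows that a hit at the first or last chunk produces a smaller window than `2 * adjacent_count + 1`.

```python
def adjacent_window(chunks, center_idx, adjacent_count=1):
    """Return the chunks around center_idx, clamped to the list bounds."""
    start_idx = max(0, center_idx - adjacent_count)
    end_idx = min(len(chunks), center_idx + adjacent_count + 1)
    return chunks[start_idx:end_idx]


# Hypothetical stand-in for cleaned_split_documents
toy_chunks = ["chunk 0", "chunk 1", "chunk 2", "chunk 3", "chunk 4"]

middle = adjacent_window(toy_chunks, center_idx=2)                  # full window
first = adjacent_window(toy_chunks, center_idx=0)                   # clamped at the start
last = adjacent_window(toy_chunks, center_idx=4, adjacent_count=2)  # clamped at the end

print(len(middle), len(first), len(last))  # prints: 3 2 3
```

Any bookkeeping that reports how many chunks were merged should therefore count the window actually returned rather than assume the full width.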


In [68]:
# Run the comprehensive comparison on PMarcaBlogs.txt
# This will test all three strategies against the same query

# Example usage - test different queries to see how each strategy performs
test_queries = [
    "What is the Michael Eisner Memorial Weak Executive Problem?",
    "What does Marc Andreessen think about technology and startups?",
    "How should companies approach innovation and disruption?"
]

print("🚀 Running Enhanced RAG Strategy Comparison")
print("=" * 60)

# Run comparison for the first query
query = test_queries[0]
print(f"\nTesting with query: '{query}'")

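# Run the comparison.
# asyncio.run() works inside Jupyter here only because nest_asyncio.apply()
# was called in Task 1; without it, use `await` directly in the cell instead.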
comparison_results = asyncio.run(
    comparison.compare_all_strategies(
        query=query,
        k=5,                    # Number of results per strategy
        adjacent_count=1,       # Number of adjacent chunks on each side
        display_chars=200       # Characters to display in preview
    )
)

print("\n" + "="*80)
print("🏁 COMPARISON COMPLETE!")
print(f"Best performing strategy: {comparison_results['best_strategy']}")
print("="*80)

🚀 Running Enhanced RAG Strategy Comparison

Testing with query: 'What is the Michael Eisner Memorial Weak Executive Problem?'
🔍 COMPREHENSIVE RAG STRATEGY COMPARISON
Query: 'What is the Michael Eisner Memorial Weak Executive Problem?'
Results per strategy: 5
Created 107 large chunks (vs 373 small chunks)

🏆 RANK #1 COMPARISON
--------------------------------------------------
📄 SMALL CHUNKS: Score 0.6884
   ordingly. Seventh, when hiring the executive to run your former specialty, be careful you don t hire someone weak on purpose. This sounds silly, but you wouldn t believe how oaen it happens. The CEO w...

🔗 ADJACENT EXPANSION: Score 0.5223 (was 0.5056, +0.0167) (was rank #2, ↑1)
   m. They have areas where they are truly deXcient in judgment or skill set. That s just life. Almost nobody is brilliant at everything. When hiring and when Hring executives, you must therefore focus o...
   (Expanded to 3 chunks)

📚 LARGE CHUNKS: Score 0.5447
   g executive. I call this the Michael Eisner
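
When reading the "was" and new scores above, keep in mind that Strategy 2 re-scores with a plain dot product, while the first-pass score comes from `search_by_text`. The two are only directly comparable if the embeddings are unit length, which is an assumption about the embedding model rather than something the notebook checks. The sketch below uses small hand-written vectors (hypothetical values, pure Python so that it runs without any API call) to show where the two measures agree and where they diverge.

```python
import math


def dot(a, b):
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    """Dot product divided by the product of the vector lengths."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


# Hypothetical toy vectors, not real embeddings
unit_a, unit_b = [0.6, 0.8], [0.8, 0.6]   # both have length 1
scaled_a = [3.0, 4.0]                     # same direction as unit_a, length 5

# Unit-length vectors: dot product and cosine similarity agree
print(dot(unit_a, unit_b), cosine_similarity(unit_a, unit_b))

# Non-unit vector: the dot product grows with length, cosine does not
print(dot(scaled_a, unit_b), cosine_similarity(scaled_a, unit_b))
```

If the embeddings in use are not normalized, swapping `np.dot` for an explicit cosine similarity keeps the re-scored values on the same scale as the original ones.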

In [75]:
# RAG Pipeline Comparison: Test Actual Response Quality
# Run the full RAG pipeline with each chunking strategy to compare final responses

async def test_rag_pipeline_with_all_strategies():
    """Test RAG pipeline responses using all three chunking strategies"""
    
    query = "What are the 3 most valuable advice for startups to succeed? Rank them in order of importance. Format your reply to be between 75 and 100 words."
    k = 3
    
    print("🤖 RAG PIPELINE RESPONSE COMPARISON")
    print("=" * 80)
    print(f"Query: {query}")
    print(f"Retrieved chunks per method: {k}")
    print("=" * 80)
    
    # Strategy 1: Original Small Chunks
    print(f"\n📄 STRATEGY 1: SMALL CHUNKS RAG RESPONSE")
    print("-" * 60)
    
    rag_pipeline_small = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db,
        llm=chat_openai,
        response_style="detailed",
        include_scores=True
    )
    
    result_small = rag_pipeline_small.run_pipeline(
        query,
        k=k,
        response_length="comprehensive"
    )
    
    print(f"📝 Response:")
    print(result_small['response'])
    print(f"\n📊 Context Count: {result_small['context_count']}")
    print(f"📊 Similarity Scores: {result_small['similarity_scores']}")
    
    # Strategy 2: Adjacent Expansion (simulate by getting expanded context)
    print(f"\n\n🔗 STRATEGY 2: ADJACENT EXPANSION RAG RESPONSE")
    print("-" * 60)
    
    # Get the small chunks first
    small_results = vector_db.search_by_text(query, k=k)
    
    # Embed the query once; it is the same for every retrieved chunk
    clean_query = clean_text_for_embedding(query)
    query_embedding = await vector_db.embedding_model.async_get_embedding(clean_query)
    
    # Create expanded context by getting adjacent chunks
    expanded_contexts = []
    expanded_scores = []
    
    for i, (text, score) in enumerate(small_results):
        # Find chunk index in cleaned documents
        chunk_idx = -1
        for j, chunk in enumerate(cleaned_split_documents):
            if chunk == text:
                chunk_idx = j
                break
        
        if chunk_idx != -1:
            # Get adjacent chunks (1 on each side)
            expanded_text = comparison.get_adjacent_chunks(chunk_idx, 1)
            
            # Clean the text before embedding
            clean_expanded = clean_text_for_embedding(expanded_text)
            
            # Re-calculate similarity for expanded text
            expanded_embedding = await vector_db.embedding_model.async_get_embedding(clean_expanded)
            new_score = float(np.dot(expanded_embedding, query_embedding))
            
            expanded_contexts.append((clean_expanded, new_score))
            expanded_scores.append(f"Source {i+1}: {new_score:.3f}")
    
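    # Sort by new scores
    expanded_contexts.sort(key=lambda x: x[1], reverse=True)
    
    # Rebuild the score labels after sorting so "Source N" refers to the same
    # context in both the prompt and the similarity-score list
    expanded_scores = [
        f"Source {i}: {score:.3f}"
        for i, (_, score) in enumerate(expanded_contexts, 1)
    ]
    
    # Create expanded context prompt
    expanded_context_prompt = ""
    for i, (context, score) in enumerate(expanded_contexts, 1):
        expanded_context_prompt += f"[Source {i}]: {context}\n\n"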
    
    # Format expanded user prompt manually
    expanded_user_params = {
        "user_query": query,
        "context": expanded_context_prompt.strip(),
        "context_count": len(expanded_contexts),
        "similarity_scores": f"Relevance scores: {', '.join(expanded_scores)}"
    }
    
    formatted_user_prompt_expanded = rag_user_prompt.create_message(**expanded_user_params)
    formatted_system_prompt = rag_system_prompt.create_message(
        response_style="detailed",
        response_length="comprehensive"
    )
    
    # Get response from LLM
    response_expanded = chat_openai.run([formatted_system_prompt, formatted_user_prompt_expanded])
    
    print(f"📝 Response:")
    print(response_expanded)
    print(f"\n📊 Context Count: {len(expanded_contexts)}")
    print(f"📊 Similarity Scores: {expanded_scores}")
    print(f"📊 Note: Using expanded chunks (3 chunks each = ~3000 chars per context)")
    
    # Strategy 3: Large Chunks - Create a synchronous wrapper
    print(f"\n\n📚 STRATEGY 3: LARGE CHUNKS RAG RESPONSE")
    print("-" * 60)
    
    # Ensure large chunk vector DB is set up
    if not comparison.large_vector_db:
        await comparison.setup_large_chunks()
    
    # Get large chunk results manually and create a mock vector DB for RAG pipeline
    large_chunk_results = await comparison.large_vector_db.async_search_by_text(query, k=k)
    
    # Create a temporary mock vector DB that returns the large chunk results
    class MockLargeChunkVectorDB:
        def __init__(self, results):
            self.cached_results = results
            
        def search_by_text(self, query_text, k=5):
            # Return the pre-computed results
            return self.cached_results[:k]
    
    mock_large_db = MockLargeChunkVectorDB(large_chunk_results)
    
    # Create RAG pipeline with mock DB
    rag_pipeline_large = RetrievalAugmentedQAPipeline(
        vector_db_retriever=mock_large_db,
        llm=chat_openai,
        response_style="detailed", 
        include_scores=True
    )
    
    result_large = rag_pipeline_large.run_pipeline(
        query,
        k=k,
        response_length="comprehensive"
    )
    
    print(f"📝 Response:")
    print(result_large['response'])
    print(f"\n📊 Context Count: {result_large['context_count']}")
    print(f"📊 Similarity Scores: {result_large['similarity_scores']}")
    print(f"📊 Note: Using large chunks (~3000 chars each)")
    
    # Summary comparison
    print(f"\n\n📈 RESPONSE COMPARISON SUMMARY")
    print("=" * 50)
    print(f"🔍 Query: {query}")
    print(f"\n📄 Small Chunks: {len(result_small['response'])} characters")
    print(f"🔗 Adjacent Expansion: {len(response_expanded)} characters") 
    print(f"📚 Large Chunks: {len(result_large['response'])} characters")
    
    # Analyze which contexts were most useful
    def average_score(labels):
        """Average the numeric part of 'Source N: 0.123' labels (0.0 if empty)."""
        values = [float(label.split(': ')[1]) for label in labels]
        return sum(values) / len(values) if values else 0.0
    
    print(f"\n📊 Context Analysis:")
    print(f"Small Chunks - Average similarity: {average_score(result_small['similarity_scores']):.3f}")
    print(f"Adjacent Expansion - Average similarity: {average_score(expanded_scores):.3f}")
    print(f"Large Chunks - Average similarity: {average_score(result_large['similarity_scores']):.3f}")
    
    print(f"\n💡 RESPONSE QUALITY INSIGHTS:")
    print(f"📊 Response lengths give us a hint about detail level")
    print(f"📊 Similarity scores show retrieval relevance")
    print(f"📊 Compare the actual responses above to see which strategy")
    print(f"   provides the most comprehensive and accurate answer!")
    
    return {
        'small_response': result_small,
        'expanded_response': response_expanded,
        'large_response': result_large,
        'query': query,
        'comparison_summary': {
            'small_length': len(result_small['response']),
            'expanded_length': len(response_expanded),
            'large_length': len(result_large['response'])
        }
    }

# Run the RAG pipeline comparison
print("Testing RAG pipeline responses with all three strategies...")
print("This will show how each chunking method affects the final answer quality.")
pipeline_results = await test_rag_pipeline_with_all_strategies()

Testing RAG pipeline responses with all three strategies...
This will show how each chunking method affects the final answer quality.
🤖 RAG PIPELINE RESPONSE COMPARISON
Query: What are the 3 most valuable advice for startups to succeed? Rank them in order of importance. Format your reply to be between 75 and 100 words.
Retrieved chunks per method: 3

📄 STRATEGY 1: SMALL CHUNKS RAG RESPONSE
------------------------------------------------------------
📝 Response:
Based on the provided context, the three most valuable pieces of advice for startups to succeed, ranked in order of importance, are:

1. **Market**: A great market with potential customers is crucial, as it can drive product development (Source 3).
2. **Product**: Building a strong product is essential, as it directly correlates to a startup's viability and ability to attract customers (Source 3).
3. **Team**: While important, the team is secondary to market and product, as even in successful startups, the quality of the team ca