# Textual RAG

![RAG Image](../data/rag.png)

Retrieval-Augmented Generation (RAG) is an AI technique that combines information retrieval with text generation. Instead of relying solely on a pre-trained language model’s internal knowledge, RAG dynamically retrieves relevant documents from an external knowledge base before generating a response.

![Why RAG Image](../data/why_rag.png)

1. **Improved Accuracy:** RAG enhances the factual correctness of generated responses by retrieving up-to-date and domain-specific information, reducing the likelihood of hallucinations (fabricated information).

2. **Better Generalization:** Since RAG dynamically retrieves relevant documents, it performs well across various domains without requiring extensive fine-tuning, making it more adaptable to new topics.

3. **Reduced Model Size Requirements:** Instead of embedding all knowledge within a large model, RAG leverages external databases, allowing for smaller, more efficient models while maintaining high-quality responses.

4. **Enhanced Explainability:** By referencing retrieved documents, RAG provides verifiable sources for its answers, making it more transparent and easier to trust compared to purely generative models.

5. **And more...**

In this exercise, you will learn how to implement a Retrieval-Augmented Generation (RAG) pipeline from scratch, without relying on tools like `langchain`. While `langchain` is a powerful framework that simplifies the development of RAG pipelines, it can sometimes lack flexibility for custom implementations, as it abstracts many components.

The different components of the pipeline are:  

- **Text extraction from PDFs** – Extract raw text from PDF files to make the content processable.  
- **Text chunking** – Break the extracted text into smaller, meaningful segments to improve retrieval efficiency.  
- **Embedding of the chunks** – Convert text chunks into numerical representations (embeddings) using a pre-trained model.  
- **Storage of the embeddings in a vector store** – Save the embeddings in a specialized database (vector store) to enable fast similarity searches.  
- **Relevant chunks retrieval** – Query the vector store to find the most relevant text chunks based on user input.  
- **Setting and prompting of the LLM for a RAG** – Structure prompts and configure the language model to integrate retrieved information into its responses.  
- **Additional tools for improved retrieval** – Use techniques like query expansion to reformulate user queries for better recall and reciprocal rank fusion to combine results from multiple retrieval methods.  
- **Final RAG pipeline implementation** – Integrate all components into a complete system that retrieves relevant information and generates enhanced responses using the language model.  

**Note:** To complete this exercise, you need an OpenAI API key, the PDF files, and the necessary libraries installed (see `requirements.txt`).  

In [None]:
!pip install -r requirements.txt

In [None]:
import os
import getpass
import json

import chromadb

from src.data_classes import Chunk
from src.data_processing import SimpleChunker, PDFExtractorAPI
from src.embedding import (
    OpenAITextEmbeddings,
    compute_openai_large_embedding_cost,
)
from src.vectorstore import (
    ChromaDBVectorStore,
    VectorStoreRetriever,
)
from src.llm import OpenAILLM
from src.rag import Generator, DefaultRAG, query_expansion

In [None]:
data_folder = "../data"

pdf_files = [
    "Explainable_machine_learning_prediction_of_edema_a.pdf",
    "Modeling tumor size dynamics based on real‐world electronic health records.pdf",
]
example_pdf_file = "Explainable_machine_learning_prediction_of_edema_a.pdf"
example_pdf_path = os.path.join(data_folder, example_pdf_file)

vector_store_collection = "text_collection"

In [None]:
os.environ["OPENAI_API_KEY"] = getpass.getpass()

# Example

The example uses only `Explainable_machine_learning_prediction_of_edema_a.pdf`. Please take a quick look at it before starting the exercise.

In [None]:
test_question = "According to SHAP analysis, which factors were the most influential in predicting higher-grade edema (Grade 2+)?"

## LLM  

The LLM is the core of the RAG system, responsible for generating responses based on the retrieved information. There are many options available on-premise or online, each with different performance, speed, specialized knowledge and cost trade-offs. In this case, we use `gpt-4o-mini`.  

This LLM expects input in the form of a list of messages, where each message includes the content and the role of the speaker (e.g., system, user, assistant).  

In this project, messages and roles are defined as follows:

```python
class Roles(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"

class LLMMessage(BaseModel):
    content: Optional[str] = None
    role: Optional[Roles] = None
```
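
As a minimal sketch of how such a message list can be assembled, the snippet below rebuilds the `Roles` enum with the standard library only. The helper `build_messages` is hypothetical and is not part of `src.llm`; it only illustrates the expected shape of the input (a system message, optional earlier turns, then the user query).

```python
from enum import Enum


class Roles(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


def build_messages(system_prompt, history, user_query):
    """Return a list of {"role", "content"} dicts (hypothetical helper).

    `history` is a list of (Roles, text) pairs for earlier turns.
    """
    messages = [{"role": Roles.SYSTEM.value, "content": system_prompt}]
    for role, text in history:
        messages.append({"role": role.value, "content": text})
    messages.append({"role": Roles.USER.value, "content": user_query})
    return messages


messages = build_messages(
    "You are a helpful assistant.",
    [(Roles.USER, "Hi"), (Roles.ASSISTANT, "Hello, how can I help?")],
    "What is RAG?",
)
for message in messages:
    print(message["role"], "->", message["content"])
```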

In [None]:
llm = OpenAILLM(temperature=0.5)

In [None]:
print(test_question)

In [None]:
answer, price = llm.generate([{"role": "user", "content": test_question}], verbose=True)

In [None]:
print(answer.content)

## PDF Text Extraction  

The first step in the pipeline is to extract text from the document.  

In this exercise, we use the `MinerU` library, which relies on several models under the hood, including `doclayout_yolo` for layout segmentation. Note that the license of this model is not commercially permissive.

The choice of extraction tool should be carefully considered. Depending on the document type and formatting, different methods may be required to preserve text integrity and to leverage structural elements such as headings, tables, or metadata. For example, `pdfplumber` handles tables better, while `Tesseract OCR` is suited to scanned PDFs.

In [None]:
data_extractor = PDFExtractorAPI()
_, text, _ = data_extractor.extract_text_and_images(example_pdf_path)

In [None]:
print(text[:1000])

## Text Chunking  

The second step is to split the extracted text into smaller chunks, which will later be embedded and retrieved efficiently.  

In this exercise, we use a simple heuristic approach: the text is split iteratively—first by heading levels (`#`), then by line breaks (`\n`), and finally by sentence (`.`). Splitting only occurs if the resulting chunk exceeds a predefined length. However, more advanced techniques exist, such as **semantic chunking** (which splits based on meaning rather than syntax) or **agentic chunking** (which dynamically adapts chunk sizes based on context).  
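
The heuristic described above can be sketched as a small recursive function. This is an illustration under stated assumptions, not the code of `SimpleChunker`: the function name `split_recursively` and the exact separators (`"\n# "`, `"\n"`, `". "`) are chosen for the example, the separators are dropped from the output, and small pieces are not merged back together.

```python
def split_recursively(text, max_chunk_size, separators=("\n# ", "\n", ". ")):
    """Split `text` on progressively finer separators until pieces fit.

    A piece is only split further if it is longer than `max_chunk_size`.
    If no separator is left, an oversized piece is returned as-is.
    """
    text = text.strip()
    if not text:
        return []
    if len(text) <= max_chunk_size or not separators:
        return [text]
    first, rest = separators[0], separators[1:]
    pieces = []
    for part in text.split(first):
        pieces.extend(split_recursively(part, max_chunk_size, rest))
    return pieces


sample = (
    "# Intro\nShort line.\n# Methods\n"
    "First sentence is here. Second sentence is here. Third one."
)
for piece in split_recursively(sample, max_chunk_size=40):
    print(repr(piece))
```

The first section fits within the limit and is kept whole, while the second one is broken down by line and then by sentence.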

Each chunk is enriched with metadata, including:  
- **Source file** – The document from which the chunk originates.  
- **Chunk counter** – The position of the chunk within the file.  
- **Unique identifier (`chunk_id`)** – Ensures each chunk can be referenced independently.  

Additional metadata could be included to enable more refined filtering and retrieval strategies.  

Here, our chunks are defined as:
```python
class Chunk(BaseModel):
    chunk_id: int
    content: str
    metadata: dict = Field(default_factory=dict)
    score: Optional[float] = None
```  

In [None]:
file_metadata = {"source_text": example_pdf_file}

text_chunker = SimpleChunker(max_chunk_size=1000)

chunks = text_chunker.chunk_text(text, file_metadata)

In [None]:
print(len(chunks))
chunks[0]

## Embedding Model  

Once the text is split into chunks, each chunk is converted into a numerical representation (embedding) that captures its meaning.  

Here, we use OpenAI’s `text-embedding-3-large`, but other options exist, both on-premise and online, each with different trade-offs in accuracy, speed, and cost. The choice of model depends on the specific needs of the retrieval task.
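
To make "captures its meaning" more concrete: texts with similar meaning should map to vectors that point in similar directions, which is commonly measured with cosine similarity. The sketch below uses made-up three-dimensional vectors purely for illustration; real embeddings have far more dimensions, and the function name `cosine_similarity` is our own.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


# Toy vectors, invented for the example (not real model outputs).
edema_chunk = [0.9, 0.1, 0.0]
swelling_chunk = [0.8, 0.2, 0.1]
tumor_chunk = [0.0, 0.2, 0.9]

print(cosine_similarity(edema_chunk, swelling_chunk))
print(cosine_similarity(edema_chunk, tumor_chunk))
```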

In [None]:
_ = compute_openai_large_embedding_cost(chunks, verbose=True)

In [None]:
embedding_model = OpenAITextEmbeddings()
embeddings = embedding_model.get_embedding([chunk.content for chunk in chunks])

In [None]:
print(embeddings.shape)
embeddings[0]

## Vector Store and Retriever  

After embedding the chunks, they need to be stored for efficient retrieval. The choice of vector store depends on factors like accuracy, speed, and filtering options. In this exercise, we use `ChromaDB`.  

The next step is retrieving the most relevant chunks based on a query. In this implementation, the retriever uses only embeddings (dense search). However, in some cases, sparse search methods like BM25, or hybrid approaches combining both dense and sparse search, can be used for better accuracy.
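
For comparison with embedding-based retrieval, BM25 scores a chunk by keyword overlap with the query, weighting rare terms more and normalizing by chunk length. The following is a minimal sketch of the standard BM25 formula with whitespace tokenization; the function name `bm25_scores`, the parameter defaults (`k1=1.5`, `b=0.75`), and the toy documents are assumptions for illustration and are not used elsewhere in this notebook.

```python
import math
from collections import Counter


def bm25_scores(query, documents, k1=1.5, b=0.75):
    """Return one BM25 score per document for a whitespace-tokenized query."""
    if not documents:
        return []
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs

    # Document frequency: in how many documents each term appears.
    doc_freq = Counter()
    for tokens in tokenized:
        doc_freq.update(set(tokens))

    scores = []
    for tokens in tokenized:
        term_freq = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in term_freq:
                continue
            idf = math.log((n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5) + 1)
            length_norm = 1 - b + b * len(tokens) / avg_len
            tf = term_freq[term]
            score += idf * tf * (k1 + 1) / (tf + k1 * length_norm)
        scores.append(score)
    return scores


docs = [
    "edema risk increases with dose",
    "tumor size dynamics over time",
    "serum albumin predicts edema",
]
for doc, score in zip(docs, bm25_scores("edema dose", docs)):
    print(f"{score:.3f}  {doc}")
```

A hybrid retriever would combine a ranking like this one with the embedding-based ranking.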

In [None]:
vector_store = ChromaDBVectorStore(vector_store_collection)
vector_store.insert_documents(chunks, embeddings)

In [None]:
print(test_question)

In [None]:
retriever = VectorStoreRetriever(embedding_model, vector_store)
results = retriever.retrieve(test_question, 5)
results

## Generator  

Once the LLM is set up, a specific prompt needs to be defined for the RAG system. This prompt must include the retrieved chunks as context. The prompt has to be adapted to each specific project.

In addition to the basic prompt, we incorporate **prompt engineering** by asking the LLM to justify its answer. The model is also instructed to indicate which chunks were most relevant in forming its response, improving **interpretability**, and to provide the answer in **JSON format** for easier data management.
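
How the retrieved chunks end up in the prompt, and how the JSON reply can be read back, can be sketched as follows. The helpers `build_rag_prompt` and `parse_rag_answer`, the shortened template, and the `Document {i}` numbering are assumptions made for this example; the actual `Generator` class may format and parse things differently.

```python
import json

# Shortened, illustrative template with the same two placeholders.
RAG_TEMPLATE = """Here are the relevant DOCUMENTS:
{context}

Here is the USER QUESTION:
{query}

Please think step-by-step and generate your output in json:
"""


def build_rag_prompt(query, chunk_texts, template=RAG_TEMPLATE):
    """Number each chunk so the model can cite it, then fill the template."""
    context = "\n\n".join(
        f"Document {i}: {text}" for i, text in enumerate(chunk_texts)
    )
    return template.format(context=context, query=query)


def parse_rag_answer(raw_reply):
    """Parse the model reply as JSON, falling back to plain text on failure."""
    fallback = {"step_by_step_thinking": "", "document_used": [], "answer": raw_reply}
    try:
        parsed = json.loads(raw_reply)
    except json.JSONDecodeError:
        return fallback
    return parsed if isinstance(parsed, dict) else fallback


prompt = build_rag_prompt(
    "Which factors drive edema?",
    ["Chunk about SHAP values.", "DATE: 1999.12.02"],
)
print(prompt)

# Hand-written reply standing in for a real LLM response.
reply = '{"step_by_step_thinking": "Used document 0.", "document_used": [0], "answer": "See SHAP."}'
print(parse_rag_answer(reply)["document_used"])
```

The fallback matters in practice because a model may occasionally return text that is not valid JSON.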

In [None]:
default_system_prompt = """You are a helpful assistant, and your task is to answer questions using relevant documents. Please first think step-by-step by mentioning which documents you used and then answer the question. Organize your output in a json formatted as dict{"step_by_step_thinking": Str(explanation), "document_used": List(integers), "answer": Str{answer}}. Your responses will be read by someone without specialized knowledge, so please have a definite and concise answer."""
print(default_system_prompt)

In [None]:
default_rag_template = """
Here are the relevant DOCUMENTS:
{context}

--------------------------------------------

Here is the USER QUESTION:
{query}

--------------------------------------------

Please think step-by-step and generate your output in json:
"""
print(default_rag_template)

In [None]:
print(test_question)

In [None]:
generator = Generator(
    llm, system_prompt=default_system_prompt, rag_template=default_rag_template
)

In [None]:
answer, cost = generator.generate(
    history=[],
    query=test_question,
    chunks=[
        results[0][0]["chunk"],
        Chunk(chunk_id=1, content="DATE: 1999.12.02", metadata={}),
    ],
    verbose=True,
)

In [None]:
print(answer.content)

## RAG Tools  

There are several methods to improve the efficiency of a RAG pipeline, such as query contextualization, query reformulation, re-ranking, query expansion, etc.

In this notebook, we implement **query expansion** to enhance retrieval and apply **reciprocal rank fusion** to optimize the ranking of chunks when multiple queries are involved.
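
Reciprocal rank fusion gives each chunk a score of `1 / (k + rank)` in every ranking where it appears and sums these scores across rankings, so chunks that rank well for several queries rise to the top. The sketch below is a minimal version of that idea; the function name, the chunk ids, and the choice of `k=60` (a commonly used default) are assumptions and may differ from the implementation in `src.rag`.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of chunk ids into one list of (id, score).

    `rankings` is a list of lists, each ordered from most to least relevant.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# One ranking per (expanded) query, with made-up chunk ids.
rankings = [
    [3, 1, 2],
    [1, 3, 4],
    [1, 2, 3],
]
for chunk_id, score in reciprocal_rank_fusion(rankings):
    print(chunk_id, round(score, 5))
```

Only ranks are used, not raw similarity scores, which makes the method easy to apply when rankings come from different retrieval methods.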

In [None]:
query_expansion_system_message = {
    "role": "system",
    "content": "You are a focused assistant designed to generate multiple, relevant search queries based solely on a single input query. Your task is to produce a list of these queries in English, without adding any further explanations or information.",
}

query_expansion_template_query = """
        Generate multiple search queries related to: {query}, and translate them in english if they are not already in english. Only output {expansion_number} queries in english.
        OUTPUT ({expansion_number} queries):
    """

In [None]:
print(test_question)

In [None]:
answer, cost = query_expansion(
    test_question,
    llm,
    query_expansion_system_message,
    template_query_expansion=query_expansion_template_query,
    expansion_number=5,
)

answer

## RAG  

Finally, the RAG pipeline is defined by integrating all the previously discussed components into a unified process.
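
One plausible way to wire these steps together is sketched below with plain callables, so that it runs without any API key. The function `run_rag` and the toy `expand`, `retrieve`, and `generate` stand-ins are hypothetical; the internals of `DefaultRAG` are not shown in this notebook and may be organized differently.

```python
from collections import defaultdict


def run_rag(query, expand, retrieve, generate, top_k=5, rrf_k=60):
    """Expand the query, retrieve per query, fuse the ranks, then generate."""
    queries = [query] + expand(query)
    scores = defaultdict(float)
    for q in queries:
        for rank, chunk_id in enumerate(retrieve(q, top_k), start=1):
            scores[chunk_id] += 1.0 / (rrf_k + rank)  # reciprocal rank fusion
    best = sorted(scores, key=scores.get, reverse=True)[:top_k]
    return generate(query, best), best


# Toy stand-ins for the LLM-based expansion, the retriever and the generator.
def toy_expand(query):
    return [f"{query} risk factors"]


def toy_retrieve(query, top_k):
    ranked = ["c2", "c3", "c1"] if "risk" in query else ["c1", "c2", "c4"]
    return ranked[:top_k]


def toy_generate(query, chunk_ids):
    return f"Answer to {query!r} using {chunk_ids}"


answer, used = run_rag("edema", toy_expand, toy_retrieve, toy_generate, top_k=3)
print(used)
print(answer)
```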

In [None]:
rag = DefaultRAG(
    llm=llm,
    text_embedding_model=embedding_model,
    text_vector_store=vector_store,
    generator=generator,
    query_expansion_system_message=query_expansion_system_message,
    query_expansion_template_query=query_expansion_template_query,
    params={"top_k": 5, "number_query_expansion": 3},
)

In [None]:
print(test_question)

In [None]:
answer, sources, cost = rag.execute(test_question, {}, verbose=True)

In [None]:
print(json.dumps(answer, indent=3))

In [None]:
# The documents retrieved by the retriever:
print(len(sources))
print(sources[0])

In [None]:
print(cost)

# Exercises

The different blocks are redefined below, and a new pipeline is created that uses both PDFs.

1. Quickly go through the code and the notebook above to ensure you understand how each block works.
2. Answer the following questions related to `Explainable_machine_learning_prediction_of_edema_a.pdf` and analyze the answers:
   1. "What was identified as the most important predictor for edema occurrence?"
   2. "Which machine learning algorithm performed best for predicting edema, and what was its F1 score?"
   3. "How did cumulative tepotinib dose impact edema predictions, and what insights did SHAP provide about this relationship?"
   4. Propose your own question.
3. Review `Modeling tumor size dynamics based on real‐world electronic health records.pdf` and come up with a question. Ask it, analyze the answer, and confirm that the retriever uses relevant chunks from this source.
4. Discuss how the pipeline could be improved to achieve better answers. If time permits, implement those changes.

In [None]:
data_extractor = PDFExtractorAPI()
text_chunker = SimpleChunker(max_chunk_size=1000)

chunks = []

for pdf_file in pdf_files:
    pdf_path = os.path.join(data_folder, pdf_file)
    _, text, _ = data_extractor.extract_text_and_images(pdf_path)
    chunks_curr = text_chunker.chunk_text(text, {"source_text": pdf_file})
    chunks.extend(chunks_curr)
    print(len(chunks))

len(chunks)

In [None]:
_ = compute_openai_large_embedding_cost(chunks)

In [None]:
embedding_model = OpenAITextEmbeddings()
embeddings = embedding_model.get_embedding([chunk.content for chunk in chunks])

In [None]:
# Reset previous
client = chromadb.Client()
client.delete_collection(vector_store_collection)

# Create new one
vector_store = ChromaDBVectorStore(vector_store_collection)
vector_store.insert_documents(chunks, embeddings)

In [None]:
llm = OpenAILLM(temperature=1.0)

In [None]:
system_prompt = """You are a helpful assistant, and your task is to answer questions using relevant documents. Please first think step-by-step by mentioning which documents you used and then answer the question. Organize your output in a json formatted as dict{"step_by_step_thinking": Str(explanation), "document_used": List(integers), "answer": Str{answer}}. Your responses will be read by someone without specialized knowledge, so please have a definite and concise answer."""
print(system_prompt)

In [None]:
rag_template = """
Here are the relevant DOCUMENTS:
{context}

--------------------------------------------

Here is the USER QUESTION:
{query}

--------------------------------------------

Please think step-by-step and generate your output in json:
"""
print(rag_template)

In [None]:
query_expansion_system_message = {
    "role": "system",
    "content": "You are a focused assistant designed to generate multiple, relevant search queries based solely on a single input query. Your task is to produce a list of these queries in English, without adding any further explanations or information.",
}

query_expansion_template_query = """
        Generate multiple search queries related to: {query}, and translate them in english if they are not already in english. Only output {expansion_number} queries in english.
        OUTPUT ({expansion_number} queries):
    """

In [None]:
generator = Generator(llm, system_prompt=system_prompt, rag_template=rag_template)

In [None]:
rag = DefaultRAG(
    llm=llm,
    text_embedding_model=embedding_model,
    text_vector_store=vector_store,
    generator=generator,
    query_expansion_system_message=query_expansion_system_message,
    query_expansion_template_query=query_expansion_template_query,
    params={"top_k": 1, "number_query_expansion": 0},
)

In [None]:
answer, sources, cost = rag.execute(
    "Here goes my amazing question!",
    {},
    verbose=True,
)

In [None]:
print(json.dumps(answer, indent=3))

In [None]:
# The documents retrieved by the retriever:
print(len(sources))
print(sources[0])

In [None]:
print(cost)

----------------