<center>
    <h1>Retrieval-Augmented Generation (RAG)</h1>
</center>

# Brief Recap of Retrieval-Augmented Generation (RAG)

Imagine you're having a conversation with someone who has read thousands of books but can only remember general concepts, not specific details. Now imagine giving this person access to a library where they can quickly look up exact information while talking to you. This is essentially what RAG does for artificial intelligence systems.

Retrieval-Augmented Generation (RAG) combines two powerful capabilities: the ability to retrieve specific information from a knowledge base and the ability to generate human-like text responses. Think of it as giving an AI system both a powerful memory (retrieval) and the ability to explain things in its own words (generation).

Let's understand this through an example. If you ask a standard language model "What were the key points in the company's Q3 2023 earnings report?", it might generate a plausible but potentially incorrect response based on its training data. However, a RAG system would first retrieve the actual earnings report, find the relevant sections, and then generate a response based on that specific, accurate information.

<center>
    <img src="static/image1.gif" alt="RAG" style="width:50%;">
</center>

## Why Traditional Approaches Are Challenging

### Standard Language Model Limitations

When we use traditional language models without retrieval capabilities, we encounter several significant challenges:

1. **Knowledge Cutoff**: Language models are trained on data up to a certain date. Imagine trying to discuss current events using only information from a year ago. This creates a fundamental limitation in handling recent or updated information.

2. **Hallucination Problems**: Without access to specific reference material, language models might generate plausible-sounding but incorrect information. It's like asking someone to recall the details of a book they read years ago – they might mix up details or fill in gaps with assumptions.

3. **Specialization Difficulties**: Language models struggle with highly specialized or technical information. While they might understand general medical concepts, they wouldn't have access to a specific patient's medical history or the latest research in a particular field.

### Traditional Search Engine Limitations

Traditional search and retrieval systems also have their own set of challenges:

1. **Literal Matching**: Keyword-based search engines work by matching exact words or close variants, so they can miss relevant information expressed in different terms. For example, a search for "heart attack symptoms" might not return documents discussing "myocardial infarction indicators."

2. **Context Understanding**: Search engines struggle to understand the broader context of a query. They might return documents containing the right keywords but missing the actual intent of the question.

3. **Response Generation**: Even if they find relevant information, traditional search engines can't synthesize or explain it – they can only show you the original text.
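
The literal-matching limitation can be illustrated with a minimal sketch. The helper `keyword_overlap` below is a hypothetical stand-in for keyword search (it is not how any particular search engine works): it scores a document by the fraction of query words it contains.

```python
def keyword_overlap(query: str, document: str) -> float:
    """Fraction of query words that also appear in the document (toy keyword matcher)."""
    query_words = set(query.lower().split())
    document_words = set(document.lower().split())
    if not query_words:
        return 0.0
    return len(query_words & document_words) / len(query_words)


query = "heart attack symptoms"
literal_doc = "common heart attack symptoms include chest pain"
paraphrased_doc = "myocardial infarction indicators include chest pain"

literal_score = keyword_overlap(query, literal_doc)          # all three query words match
paraphrased_score = keyword_overlap(query, paraphrased_doc)  # no query word matches

print(literal_score, paraphrased_score)
```

Both documents describe the same topic, yet the paraphrased one shares no words with the query and gets the lowest possible score.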

## Understanding RAG Architecture: Core Components

RAG systems are built on a carefully designed architecture that combines several sophisticated components. Let's break this down using an analogy of a highly efficient library system:

### 1. Document Processing Pipeline

Think of this as the library's cataloging system. Just as a library needs to process new books – categorizing them, creating summaries, and deciding where they belong – the document processing pipeline in RAG:
- Breaks down large documents into manageable chunks
- Cleans and standardizes the text
- Extracts key information and metadata
- Prepares the content for efficient storage and retrieval

### 2. Embedding System

This component is like creating a detailed index of every concept in your library. But instead of just keywords, it creates rich mathematical representations that capture the meaning of the text. For example:
- The phrases "the heart pumps blood" and "the cardiac muscle circulates fluid" would be recognized as similar concepts
- These mathematical representations help find relevant information even when the exact words don't match
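
To make "similar concepts" concrete, here is a small sketch of cosine similarity, a common way to compare embedding vectors. The three-dimensional vectors are hand-picked for illustration and are not the output of any real embedding model; real embeddings typically have hundreds of dimensions.

```python
import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


# Hand-picked illustrative values, not output from a real embedding model.
heart_pumps_blood = np.array([0.9, 0.1, 0.2])
cardiac_muscle_circulates_fluid = np.array([0.8, 0.2, 0.3])
stock_prices_fell = np.array([0.1, 0.9, 0.1])

similar = cosine_similarity(heart_pumps_blood, cardiac_muscle_circulates_fluid)
unrelated = cosine_similarity(heart_pumps_blood, stock_prices_fell)
print(f"similar: {similar:.2f}, unrelated: {unrelated:.2f}")
```

Vectors that point in nearly the same direction score close to 1, while vectors for unrelated text score much lower.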

### 3. Retrieval Mechanism

This works like an extremely intelligent librarian who:
- Understands the true meaning behind your questions
- Knows exactly where to find relevant information
- Can quickly assess which sources are most relevant
- Brings back not just exact matches, but contextually relevant information

### 4. Generation System

This is like having an expert who:
- Reads through all the retrieved information
- Understands how different pieces of information relate to each other
- Synthesizes the information into a coherent response
- Explains complex concepts in a way that matches your level of understanding

<center>
    <img src="static/image2.png" alt="RAG" style="width:50%;">
</center>

## Key Advantages of RAG

1. **Accuracy and Reliability**: RAG systems significantly improve the accuracy of AI responses by:
   - Grounding answers in specific, verifiable sources
   - Reducing hallucinations and speculation
   - Providing up-to-date information
   - Maintaining factual consistency

2. **Transparency and Explainability**: Unlike traditional "black box" AI systems, RAG offers:
   - Clear sources for information
   - Traceable reasoning paths
   - Verifiable references
   - The ability to audit and validate responses

3. **Adaptability and Maintenance**: RAG systems excel in staying current because:
   - New information can be added without retraining
   - Knowledge bases can be updated in real-time
   - Different sources can be used for different contexts
   - Content can be easily modified or removed

# Implementing RAG Core Components From Scratch

In this section, we'll build each core component of a RAG system from the ground up. We'll start with simple implementations and progressively add sophistication, explaining each step along the way.

## 1. Document Processing Component

The document processor is our first building block. It handles the crucial task of breaking down documents into manageable, meaningful chunks while preserving context.

In [None]:
from typing import Dict, List, Optional
import re

class DocumentProcessor:
    def __init__(self, 
                 chunk_size: int = 500,
                 chunk_overlap: int = 50,
                 min_chunk_size: int = 100):
        """
        Initialize the document processor with configurable parameters.
        
        Parameters:
            chunk_size: Target size for each text chunk
            chunk_overlap: Set above 0 to carry the last two sentences of a chunk into the next one
            min_chunk_size: Minimum size for a valid chunk
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    def clean_text(self, text: str) -> str:
        """
        Clean and normalize text content for better processing.
        
        This method handles common text issues like extra whitespace,
        special characters, and formatting inconsistencies.
        """
        # Collapse runs of whitespace (including newlines) into single spaces
        text = re.sub(r'\s+', ' ', text)
        
        # Remove special characters while preserving essential punctuation
        # (the % sign is kept so figures such as "99.8%" survive cleaning)
        text = re.sub(r'[^\w\s.,!?;:()\[\]{}"\'`%-]', '', text)
        
        # Put each sentence on its own line to help with chunking.
        # Requiring whitespace after the punctuation keeps decimals like "99.8" intact.
        text = re.sub(r'([.!?])\s+', r'\1\n', text)
        
        return text.strip()

    def create_chunks(self, 
                     document: Dict[str, str], 
                     respect_sentences: bool = True) -> List[Dict[str, str]]:
        """
        Break document into chunks while trying to preserve semantic meaning.
        
        Parameters:
            document: Dictionary containing document text and metadata
            respect_sentences: Whether to avoid breaking mid-sentence
            
        Returns:
            List of chunk dictionaries with text and metadata
        """
        text = self.clean_text(document['text'])
        chunks = []
        current_chunk = []
        current_length = 0
        
        # Split into sentences if we want to respect sentence boundaries
        sentences = text.split('\n') if respect_sentences else [text]
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
                
            sentence_length = len(sentence)
            
            # If adding this sentence exceeds chunk size
            if current_length + sentence_length > self.chunk_size:
                # Save current chunk if it's large enough
                if current_length >= self.min_chunk_size:
                    chunk_text = ' '.join(current_chunk)
                    chunks.append({
                        'text': chunk_text,
                        'document_id': document.get('id', 'unknown'),
                        'chunk_size': len(chunk_text),
                        'position': len(chunks)
                    })
                
                # Start new chunk, including overlap from previous chunk
                if current_chunk and self.chunk_overlap > 0:
                    # Carry over the last 2 sentences for context; chunk_overlap
                    # only switches this on or off, it is not a character count
                    overlap_text = ' '.join(current_chunk[-2:])
                    current_chunk = [overlap_text, sentence]
                    current_length = len(overlap_text) + sentence_length
                else:
                    current_chunk = [sentence]
                    current_length = sentence_length
            else:
                current_chunk.append(sentence)
                current_length += sentence_length
        
        # Add any remaining text as the final chunk
        if current_chunk and current_length >= self.min_chunk_size:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                'text': chunk_text,
                'document_id': document.get('id', 'unknown'),
                'chunk_size': len(chunk_text),
                'position': len(chunks)
            })
            
        return chunks

    def process_document(self, document: Dict[str, str]) -> List[Dict[str, str]]:
        """
        Process a document from start to finish.
        
        This is the main method that orchestrates the entire document
        processing pipeline.
        """
        # Validate document
        if not document.get('text'):
            raise ValueError("Document must contain 'text' field")
            
        # Process the document
        chunks = self.create_chunks(document)
        
        # Add metadata to each chunk
        for chunk in chunks:
            chunk.update({
                'source': document.get('source', 'unknown'),
                'title': document.get('title', 'unknown'),
                'timestamp': document.get('timestamp', None)
            })
            
        return chunks

Let's break down the key aspects of our document processor:

1. **Text Cleaning**: The `clean_text` method handles common text issues:
   - Normalizes whitespace and line endings
   - Removes problematic special characters
   - Standardizes sentence endings for better chunking
   - This creates a consistent format for further processing

2. **Chunking Strategy**: The `create_chunks` method:
   - Respects sentence boundaries to maintain readability
   - Carries the last two sentences of each chunk into the next one to preserve context
   - Skips chunks shorter than `min_chunk_size` so that every stored chunk has meaningful content
   - Attaches the document ID and position to every chunk
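
As a point of comparison, the sketch below shows one way overlap could be driven by a character budget instead of a fixed number of sentences. The function `chunk_sentences` is a hypothetical standalone helper, not part of `DocumentProcessor`, and it assumes the text has already been split into sentences.

```python
from typing import List


def chunk_sentences(sentences: List[str],
                    chunk_size: int = 500,
                    chunk_overlap: int = 50) -> List[str]:
    """Group sentences into chunks of roughly chunk_size characters.

    Each new chunk starts with trailing sentences of the previous chunk,
    as long as they fit within chunk_overlap characters.
    """
    chunks: List[str] = []
    current: List[str] = []

    for sentence in sentences:
        candidate = ' '.join(current + [sentence])
        if current and len(candidate) > chunk_size:
            chunks.append(' '.join(current))
            # Walk backwards to collect overlap sentences within the character budget
            overlap: List[str] = []
            for previous in reversed(current):
                if len(' '.join([previous] + overlap)) > chunk_overlap:
                    break
                overlap.insert(0, previous)
            current = overlap
        current.append(sentence)

    # Keep whatever is left as the final chunk
    if current:
        chunks.append(' '.join(current))
    return chunks


sentences = [
    "RAG retrieves documents.",
    "It then generates an answer.",
    "Chunks keep context small.",
    "Overlap preserves continuity.",
]
for chunk in chunk_sentences(sentences, chunk_size=60, chunk_overlap=30):
    print(repr(chunk))
```

With these toy settings, each chunk after the first begins with the last sentence of the previous chunk, because only one sentence fits inside the 30-character overlap budget.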

## 2. Embedding System Implementation

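The embedding system turns text into vectors so that similar texts can be compared numerically. A production system would use a trained embedding model to capture semantic meaning; the simplified implementation below derives a deterministic pseudo-random vector for each word, so it captures word overlap rather than true semantics: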

In [None]:
import numpy as np
from hashlib import md5
from typing import Dict, List, Union, Optional

class EmbeddingEngine:
    def __init__(self, dimensions: int = 768, cache_size: int = 10000):
        """
        Initialize the embedding engine.
        
        Parameters:
            dimensions: Size of the embedding vectors
            cache_size: Maximum number of embeddings to cache
        """
        self.dimensions = dimensions
        self.cache_size = cache_size
        self.cache = {}
        self.word_vectors = {}
        
    def _create_word_vector(self, word: str) -> np.ndarray:
        """
        Create a deterministic vector for a word using a hashing approach.
        This is a simplified version - in practice, you'd use a proper
        embedding model.
        """
        
        # If we've already created a vector for this word, return it
        if word in self.word_vectors:
            return self.word_vectors[word]
            
        # Create a deterministic seed from the word.
        # The first 8 hex characters of the MD5 hash give an integer
        # in the range 0 to 2^32 - 1, which is a valid NumPy seed.
        word_hash = int(md5(word.encode()).hexdigest()[:8], 16)
        
        # Use a local generator so NumPy's global random state is left untouched
        rng = np.random.RandomState(word_hash)
        
        # Create a unit vector for the word
        vector = rng.randn(self.dimensions)
        vector = vector / np.linalg.norm(vector)
        
        # Cache the vector
        self.word_vectors[word] = vector
        return vector
        
    def compute_embedding(self, 
                         text: str, 
                         use_cache: bool = True) -> np.ndarray:
        """
        Convert text into a vector representation.
        
        This implementation uses a bag-of-words approach with position
        weighting and a per-word adjustment that shrinks as a word repeats.
        """
        # Check cache first
        if use_cache and text in self.cache:
            return self.cache[text]
            
        # Preprocess text
        words = text.lower().split()
        if not words:
            return np.zeros(self.dimensions)
            
        # Initialize embedding
        embedding = np.zeros(self.dimensions)
        word_count = {}
        
        # Compute word vectors with position weighting
        for position, word in enumerate(words):
            # Count word frequency
            word_count[word] = word_count.get(word, 0) + 1
            
            # Get word vector
            word_vector = self._create_word_vector(word)
            
            # Apply position-based weighting (words earlier in the text get slightly more weight)
            position_weight = 1.0 / (1 + np.log1p(position))
            
            # Add to embedding
            embedding += word_vector * position_weight
            
        # Add one extra contribution per distinct word, scaled down for words
        # that repeat often (a within-text heuristic, not a corpus-level IDF)
        for word, count in word_count.items():
            # Repeated words receive a smaller bonus
            scaling = 1.0 / np.sqrt(count)
            word_vector = self._create_word_vector(word)
            embedding += word_vector * scaling
            
        # Normalize the final embedding
        embedding_norm = np.linalg.norm(embedding)
        if embedding_norm > 0:
            embedding = embedding / embedding_norm
            
        # Cache the result if caching is enabled
        if use_cache:
            if len(self.cache) >= self.cache_size:
                # Remove oldest entry if cache is full
                self.cache.pop(next(iter(self.cache)))
            self.cache[text] = embedding
            
        return embedding
        
    def compute_similarity(self, 
                          vector1: np.ndarray, 
                          vector2: np.ndarray) -> float:
        """
        Compute cosine similarity between two vectors.
        """
        if vector1.shape != vector2.shape:
            raise ValueError("Vectors must have the same dimensions")
            
        dot_product = np.dot(vector1, vector2)
        # compute_embedding returns unit vectors, so the dot product is the cosine similarity
        return float(dot_product)

Our embedding system includes the following features:

1. **Position Weighting**: Words earlier in the text get slightly more weight
2. **Frequency Scaling**: Each distinct word gets an extra contribution that shrinks as the word repeats
3. **Caching System**: Maintains a fixed-size cache of computed embeddings, evicting the oldest entry when full
4. **Deterministic Word Vectors**: The same word always maps to the same vector
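
One consequence of hashing words into pseudo-random vectors is worth seeing directly: texts score as similar only when they share words. The sketch below is a stripped-down stand-in for `EmbeddingEngine` (the helper names `word_vector` and `embed` are hypothetical, and the position and frequency weighting is omitted), intended only to show that behavior.

```python
import hashlib

import numpy as np


def word_vector(word: str, dims: int = 768) -> np.ndarray:
    """Deterministic pseudo-random unit vector derived from the word's MD5 hash."""
    seed = int(hashlib.md5(word.encode()).hexdigest()[:8], 16)
    rng = np.random.default_rng(seed)
    vector = rng.standard_normal(dims)
    return vector / np.linalg.norm(vector)


def embed(text: str, dims: int = 768) -> np.ndarray:
    """Sum the word vectors of a non-empty text and normalize the result."""
    words = text.lower().split()
    total = np.sum([word_vector(w, dims) for w in words], axis=0)
    return total / np.linalg.norm(total)


query = embed("heart attack symptoms")
shared = float(query @ embed("heart attack warning signs"))          # two words in common
synonyms = float(query @ embed("myocardial infarction indicators"))  # no words in common

print(f"shared words: {shared:.2f}, synonyms only: {synonyms:.2f}")
```

The synonym pair scores close to zero because unrelated random vectors in high dimensions are nearly orthogonal. A trained embedding model is what would place "heart attack" and "myocardial infarction" near each other.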

## 3. Retrieval System Implementation

The retrieval system finds relevant chunks based on semantic similarity:

In [None]:
from typing import Any, Dict, List, Optional

import numpy as np

class RetrievalSystem:
    def __init__(self, 
                 embedding_engine: EmbeddingEngine,
                 top_k: int = 5,
                 similarity_threshold: float = 0.2):  # Low threshold favors recall
        """
        Initialize the retrieval system.
        
        Parameters:
            embedding_engine: Instance of EmbeddingEngine for vector operations
            top_k: Maximum number of chunks to retrieve
            similarity_threshold: Minimum similarity score for a chunk to be returned
        """
        self.embedding_engine = embedding_engine
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold
        self.chunk_vectors = []
        self.chunks = []
        
    def add_chunk(self, 
                  chunk: Dict[str, Any],
                  vector: Optional[np.ndarray] = None) -> None:
        """
        Add a chunk and its vector to the retrieval system.
        Prints confirmation for debugging purposes.
        """
        if vector is None:
            vector = self.embedding_engine.compute_embedding(chunk['text'])
            
        self.chunk_vectors.append(vector)
        self.chunks.append(chunk)
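        # Chunks from DocumentProcessor carry 'document_id' instead of 'id'
        chunk_id = chunk.get('id', chunk.get('document_id', 'unknown'))
        print(f"Added chunk with id: {chunk_id}")  # Debug print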
        
    def search(self, 
               query: str,
               filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Search for relevant chunks based on the query.
        
        Parameters:
            query: Search query text
            filters: Optional metadata filters to apply
            
        Returns:
            List of relevant chunks with similarity scores
        """
        # Compute query vector
        query_vector = self.embedding_engine.compute_embedding(query)
        
        # Initialize results
        results = []
        
        # Compare with all chunks
        for idx, (chunk_vector, chunk) in enumerate(zip(self.chunk_vectors, self.chunks)):
            # Check filters if provided
            if filters and not self._apply_filters(chunk, filters):
                continue
                
            # Compute similarity
            similarity = self.embedding_engine.compute_similarity(
                query_vector, chunk_vector
            )
            
            print(f"Chunk {idx} similarity: {similarity}")  # Debug print
            
            # Add to results if above threshold
            if similarity >= self.similarity_threshold:
                results.append({
                    'chunk': chunk,
                    'similarity': float(similarity),  # Ensure similarity is regular float
                    'index': idx
                })
                
        # Sort by similarity and get top k
        top_results = sorted(
            results, 
            key=lambda x: x['similarity'],
            reverse=True
        )[:self.top_k]
        
        return top_results
        
    def _apply_filters(self, 
                      chunk: Dict[str, Any], 
                      filters: Dict[str, Any]) -> bool:
        """
        Apply metadata filters to a chunk.
        """
        for key, value in filters.items():
            chunk_value = chunk.get(key)
            if chunk_value != value:
                return False
        return True

The retrieval system includes several important features:

1. **Vector Search**: Scores every stored chunk against the query by vector similarity
2. **Filtering**: Supports metadata-based filtering of results
3. **Threshold-based Selection**: Only returns sufficiently similar results
4. **Ranking**: Sorts results by similarity score and keeps the top `top_k`
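
The search loop scores chunks one at a time, which is easy to follow but slow for large collections. As a rough sketch of an alternative, the chunk vectors can be stacked into a matrix and scored with a single matrix-vector product. The function `top_k_by_similarity` is a hypothetical helper, not part of `RetrievalSystem`, and it assumes all vectors are already unit length.

```python
import numpy as np


def top_k_by_similarity(query_vector, chunk_matrix, top_k=2, threshold=0.2):
    """Return (index, score) pairs for the best-matching rows of chunk_matrix.

    Assumes query_vector and every row of chunk_matrix are unit vectors,
    so the dot product equals cosine similarity.
    """
    scores = chunk_matrix @ query_vector        # one similarity per chunk
    ranked = np.argsort(scores)[::-1][:top_k]   # highest scores first
    return [(int(i), float(scores[i])) for i in ranked if scores[i] >= threshold]


# Toy unit vectors chosen by hand for illustration.
chunk_matrix = np.array([
    [1.0, 0.0, 0.0],
    [0.6, 0.8, 0.0],
    [0.0, 0.0, 1.0],
])
query_vector = np.array([1.0, 0.0, 0.0])

results = top_k_by_similarity(query_vector, chunk_matrix)
print(results)
```

The first row matches the query exactly, the second partially, and the third not at all, so only the first two indices are returned, in that order.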

## 4. Complete RAG Pipeline

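Finally, let's put the embedding and retrieval components together with an OpenAI chat model. To keep the example short, each document is stored as a single chunk instead of being passed through `DocumentProcessor`: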

In [None]:
from openai import OpenAI
from typing import List, Dict
import os

class RAGPipeline:
    def __init__(self, openai_api_key: str):
        self.embedding_engine = EmbeddingEngine()
        self.retrieval_system = RetrievalSystem(self.embedding_engine)
        self.client = OpenAI(api_key=openai_api_key)
        
        # System prompt that tells the model to ground its answers in the retrieved context
        self.system_prompt = """You are a helpful assistant that answers questions based on the provided context. 
        Your responses should:
        1. Use specific information from the provided contexts
        2. Acknowledge when information is or isn't available in the context
        3. Synthesize information from multiple contexts when relevant
        4. Stay focused on answering the specific question asked
        
        If the context contains relevant information, use it to provide a detailed answer.
        If the context doesn't contain enough information, clearly state what information is missing."""
    
    def add_document(self, document: Dict[str, str]) -> None:
        """
        Add a document to our knowledge base as a single chunk.
        """
        try:
            chunk = {
                'text': document['text'].strip(),  # Clean whitespace
                'source': document.get('source', 'unknown'),
                'id': document.get('id', 'unknown')
            }
            
            vector = self.embedding_engine.compute_embedding(chunk['text'])
            self.retrieval_system.add_chunk(chunk, vector)
            print(f"Successfully added document {chunk['id']}")  # Debug print
            
        except Exception as e:
            print(f"Error adding document: {str(e)}")
    
    def query(self, question: str, temperature: float = 0.7) -> Dict:
        """
        Retrieve relevant contexts for a question and generate an answer from them.
        """
        # Retrieve relevant contexts
        relevant_contexts = self.retrieval_system.search(question)
        
        if not relevant_contexts:
            return {
                'answer': "I apologize, but I couldn't find any relevant information in the provided contexts to answer your question.",
                'contexts': [],
                'model': 'gpt-3.5-turbo'
            }
        
        # Format contexts with more structure
        formatted_contexts = "\n\n".join([
            f"Context {i+1} (Relevance Score: {ctx['similarity']:.3f}):\n{ctx['chunk']['text']}"
            for i, ctx in enumerate(relevant_contexts)
        ])
        
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": f"""Please answer this question based on the following contexts:
                    
                    {formatted_contexts}
                    
                    Question: {question}
                    
                    Remember to use specific information from the contexts and acknowledge if any relevant information is missing."""}
                ],
                temperature=temperature
            )
            
            return {
                'answer': response.choices[0].message.content,
                'contexts': relevant_contexts,
                'model': 'gpt-3.5-turbo'
            }
            
        except Exception as e:
            return {
                'error': f"Error generating response: {str(e)}",
                'contexts': relevant_contexts
            }
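
The context-formatting step inside `query` can be looked at in isolation. The sketch below pulls it into a hypothetical standalone function, `format_contexts`, and feeds it made-up search results shaped like the output of `RetrievalSystem.search`, so the prompt section can be inspected without calling the API.

```python
from typing import Any, Dict, List


def format_contexts(relevant_contexts: List[Dict[str, Any]]) -> str:
    """Render retrieved chunks as numbered, scored context blocks."""
    return "\n\n".join(
        f"Context {i} (Relevance Score: {ctx['similarity']:.3f}):\n{ctx['chunk']['text']}"
        for i, ctx in enumerate(relevant_contexts, start=1)
    )


# Hypothetical search results; the scores are invented for illustration.
sample_results = [
    {'chunk': {'text': 'Earth is the third planet from the Sun.'}, 'similarity': 0.61, 'index': 1},
    {'chunk': {'text': 'The Sun contains 99.8% of the mass.'}, 'similarity': 0.34, 'index': 0},
]
prompt_section = format_contexts(sample_results)
print(prompt_section)
```

Printing the formatted contexts before sending them to the model is a cheap way to check what the model will actually see.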

## Using the RAG Pipeline

In [None]:
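# Set OPENAI_API_KEY in your environment before running, or replace the placeholder below.
# setdefault leaves an existing key untouched.
os.environ.setdefault("OPENAI_API_KEY", "YOUR_API_KEY")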

In [None]:
def demonstrate_rag(api_key: str):
    """
    Demonstrate the RAG system's capabilities with example documents and questions.
    
    This function shows how the system:
    1. Processes and stores documents
    2. Retrieves relevant information
    3. Generates context-aware answers
    """
    # Initialize our RAG system
    rag = RAGPipeline(api_key)
    
    # Our sample documents about the solar system and Earth
    documents = [
        {
            'id': '1',
            'text': '''
            The solar system consists of the Sun and all the objects that orbit around it. 
            The Sun contains 99.8% of all the mass in our solar system. The planets that 
            orbit the Sun are Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, and 
            Neptune. Each planet has unique characteristics and conditions.
            ''',
            'source': 'astronomy_basics'
        },
        {
            'id': '2',
            'text': '''
            Earth is the third planet from the Sun and the only known planet to support life. 
            It has one natural satellite, the Moon, which influences Earth's tides. The Earth's 
            atmosphere is composed mainly of nitrogen (78%) and oxygen (21%), creating conditions 
            perfect for life as we know it.
            ''',
            'source': 'earth_science'
        }
    ]
    
    print("Adding documents to the RAG system...")
    for document in documents:
        rag.add_document(document)
    
    print("\nDemonstrating RAG System Question-Answering:")
    print("=" * 60)
    
    # Test questions that explore different aspects of our documents
    questions = [
        "What is unique about Earth compared to other planets?",
        "How does the Moon affect Earth?",
        "What are the main components of Earth's atmosphere?"
    ]
    
    for question in questions:
        print(f"\nQuestion: {question}")
        result = rag.query(question)
        
        if 'error' not in result:
            print("\nAnswer:")
            print(result['answer'])
            print("\nBased on these contexts:")
            for i, ctx in enumerate(result['contexts'], 1):
                print(f"\nContext {i} (Relevance: {ctx['similarity']:.3f}):")
                print(ctx['chunk']['text'])
        else:
            print("\nError:", result['error'])
        
        print("\n" + "=" * 60)

In [None]:
# Run the demonstration using the environment variable for the API key
demonstrate_rag(os.getenv("OPENAI_API_KEY"))

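In the demonstration above, each answer is followed by the contexts that were retrieved for it, along with their relevance scores. Both contexts come from the retrieval step; a higher score means that the chunk's embedding is closer to the question's embedding, so that chunk is listed first in the prompt.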