## Retrieval Augmented Agents



Retrieval-Augmented Generation (RAG) agents combine information retrieval with text generation to produce more factual, context-aware responses. Unlike traditional generative models that rely solely on their pre-trained knowledge, RAG systems dynamically retrieve relevant documents from an external knowledge base and incorporate that information into their responses. This lets the system give up-to-date, verifiable, and domain-specific answers rather than relying only on its training data, which may be outdated or limited.

The core mechanism of a RAG agent involves two main steps: retrieval and synthesis. First, a retrieval model searches a structured or unstructured knowledge base (such as databases, documents, or APIs) to fetch the most relevant information based on the user’s query. Then, a generative model (e.g., GPT) processes this retrieved data and integrates it into a coherent, context-rich response. This approach is particularly useful in applications like customer support, research assistants, coding helpers, and medical or legal AI advisors, where accuracy and contextual awareness are critical.
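
The two steps can be sketched without any model at all. The example below is a toy illustration, not the notebook's implementation: it assumes a bag-of-words similarity in place of a learned embedding, and `bag_of_words`, `retrieve`, and `build_prompt` are hypothetical helper names. It stops at the prompt that would be handed to the generative model.

```python
import math
import re
from collections import Counter

def bag_of_words(text):
    """Lowercased word counts; a crude stand-in for a learned embedding."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def retrieve(query, corpus, top_k=2):
    """Step 1: rank documents by similarity to the query and keep the best."""
    q = bag_of_words(query)
    ranked = sorted(corpus, key=lambda doc: cosine(q, bag_of_words(doc)), reverse=True)
    return ranked[:top_k]

def build_prompt(query, retrieved):
    """Step 2 (input side): put the retrieved text in front of the question."""
    context = "\n".join(f"Document {i + 1}: {doc}" for i, doc in enumerate(retrieved))
    return f"Use this information:\n{context}\n\nQuestion: {query}"

corpus = [
    "FAISS performs fast similarity search over dense vectors.",
    "Pydantic validates data using Python type hints.",
    "A RAG system retrieves documents and passes them to a language model.",
]
query = "How does a RAG system use documents?"
top_docs = retrieve(query, corpus, top_k=1)
prompt = build_prompt(query, top_docs)
print(prompt)
```

In a real agent the prompt would be sent to the generative model, and the word-count vectors would be replaced by dense embeddings.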

By grounding answers in external knowledge sources, retrieval-augmented agents reduce hallucinations, improve response reliability, and adapt to evolving information. Their retrievers can be tuned for, or simply pointed at, domain-specific knowledge, making them more effective in specialized fields. They also address a key limitation of static language models: the knowledge base can be updated without retraining the model.

#### 1. Imports and Setup

In [None]:
import requests
import json
import numpy as np
import concurrent.futures
from sentence_transformers import SentenceTransformer
import faiss
import os
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
import mcp

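# Read the key from the environment; the placeholder is used only if it is unset
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your_api_key")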

# Load embedding model
embedder = SentenceTransformer('all-MiniLM-L6-v2')

# Document store
documents = []

# FAISS index: inner product over L2-normalized vectors equals cosine similarity
dimension = 384  # embedding size of all-MiniLM-L6-v2
index = faiss.IndexFlatIP(dimension)
faiss.omp_set_num_threads(4)  # Use 4 OpenMP threads for FAISS
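
The setup above pairs an inner-product index with vectors that are L2-normalized before they are added. A small sketch of why that pairing yields cosine similarity, using plain Python lists instead of FAISS; `l2_normalize`, `inner_product`, and `cosine_similarity` are illustrative helpers, not library functions:

```python
import math

def inner_product(a, b):
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def l2_normalize(vec):
    """Scale a vector to unit length."""
    norm = math.sqrt(inner_product(vec, vec))
    return [x / norm for x in vec]

def cosine_similarity(a, b):
    """Textbook cosine: dot product divided by the product of the norms."""
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )

a = [3.0, 4.0]
b = [4.0, 3.0]

raw_ip = inner_product(a, b)                                   # depends on vector length
normalized_ip = inner_product(l2_normalize(a), l2_normalize(b))  # length-independent

print(raw_ip, normalized_ip, cosine_similarity(a, b))
```

The raw inner product grows with vector magnitude, while the inner product of the normalized vectors matches the cosine. This is why the notebook calls `faiss.normalize_L2` on both document and query embeddings.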

#### 2. Pydantic Models

This cell defines structured input schemas using Pydantic. These models provide type validation and clear documentation for the tool interfaces. The models include **DocumentInput** for single document additions, **DocumentsInput** for batch processing, **SearchInput** for configuring searches, and **RagInput** for question answering.

In [14]:
# Define tool schemas using Pydantic
class DocumentInput(BaseModel):
    document: str = Field(..., description="Text document to add to the knowledge base")

class DocumentsInput(BaseModel):
    documents: List[str] = Field(..., description="List of text documents to add to the knowledge base")

class SearchInput(BaseModel):
    query: str = Field(..., description="Search query text")
    top_k: int = Field(3, description="Number of top results to return")

class RagInput(BaseModel):
    question: str = Field(..., description="Question to answer using RAG approach")
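
To see what these schemas buy you, the sketch below redefines `SearchInput` on its own and feeds it good and bad input. It assumes Pydantic is installed; the sample values and the flag names are made up for illustration.

```python
from pydantic import BaseModel, Field, ValidationError

class SearchInput(BaseModel):
    query: str = Field(..., description="Search query text")
    top_k: int = Field(3, description="Number of top results to return")

# The default fills in when top_k is omitted
default_search = SearchInput(query="what is RAG?")
print(default_search.top_k)

# A missing required field is rejected when the model is constructed
try:
    SearchInput(top_k=5)
    missing_query_rejected = False
except ValidationError:
    missing_query_rejected = True

# So is a value that cannot be interpreted as an integer
try:
    SearchInput(query="what is RAG?", top_k="many")
    bad_type_rejected = False
except ValidationError:
    bad_type_rejected = True

print(missing_query_rejected, bad_type_rejected)
```

Validation runs at construction time, so a tool that receives a `SearchInput` instance can rely on `query` being present and `top_k` being an integer. The model as written does not bound `top_k`. If that matters, a constraint such as `Field(3, ge=1)` is one option.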

#### 3. Core Functions
This cell implements the essential RAG functionality. The add_documents function handles parallel embedding processing with batching for efficiency. The search_documents function performs vector similarity search using FAISS. The rag_agent function combines retrieval and generation by fetching relevant documents and using them to inform the LLM's response.
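
The batching step relies on one property worth seeing in isolation: `executor.map` returns results in input order, so the stacked embeddings stay aligned with the documents. A minimal sketch, with `fake_encode` as a hypothetical stand-in for `embedder.encode` and `split_into_batches` as an illustrative helper:

```python
import concurrent.futures

def split_into_batches(items, batch_size):
    """Slice a list into consecutive chunks of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

def fake_encode(batch):
    """Hypothetical encoder: one 1-element 'vector' per text (its length)."""
    return [[float(len(text))] for text in batch]

docs = [f"doc {'x' * i}" for i in range(70)]
batches = split_into_batches(docs, batch_size=32)

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map() yields results in the order the batches were submitted
    per_batch = list(executor.map(fake_encode, batches))

# Flatten back to one row per document, in the original order
embeddings = [vec for batch in per_batch for vec in batch]
print([len(b) for b in batches], len(embeddings))
```

Row `i` of `embeddings` still corresponds to `docs[i]`. The notebook depends on this, because it uses a document's position in the FAISS index to look up its text in the `documents` list.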

In [15]:
# Core functions
def add_documents(new_docs):
    """Add documents with parallel embedding processing"""
    global documents, index

    if not new_docs:
        # Nothing to embed; leave the index untouched
        return 0, len(documents)

    if len(new_docs) > 10:
        # For larger sets, split into batches of 32 and encode them in parallel threads
        batch_size = 32
        batches = [new_docs[i:i+batch_size] for i in range(0, len(new_docs), batch_size)]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # executor.map preserves batch order, so rows stay aligned with new_docs
            embeddings_list = list(executor.map(embedder.encode, batches))
        embeddings = np.vstack(embeddings_list)
    else:
        # For smaller sets, encode directly
        embeddings = embedder.encode(new_docs)

    # FAISS expects contiguous float32 arrays; normalize for cosine similarity
    embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
    faiss.normalize_L2(embeddings)

    # Update the database
    documents.extend(new_docs)
    index.add(embeddings)

    return len(new_docs), len(documents)

def search_documents(query, top_k=3):
    """Search documents using FAISS for parallel vector similarity"""
    if not documents:
        return ["No documents in the knowledge base."]

    # Encode and normalize query
    query_embedding = np.ascontiguousarray(embedder.encode([query]), dtype=np.float32)
    faiss.normalize_L2(query_embedding)

    # Never request more neighbours than there are documents
    top_k = max(1, min(top_k, len(documents)))

    # Search in FAISS; with IndexFlatIP the returned "distances" are similarity scores
    distances, indices = index.search(query_embedding, top_k)

    results = []
    for idx, score in zip(indices[0], distances[0]):
        # FAISS pads missing results with -1, which Python would treat as "last item";
        # keep only valid indices with positive similarity
        if 0 <= idx < len(documents) and score > 0:
            results.append(documents[idx])

    return results if results else ["No relevant documents found."]

def rag_agent(question):
    """RAG agent: retrieve relevant documents, then generate an answer from them"""
    print("Retrieving and generating...")

    # Retrieval must finish before generation can start, so the steps run sequentially
    docs = search_documents(question, 3)
    context = "\n\n".join([f"Document {i+1}: {doc}" for i, doc in enumerate(docs)])

    # Build system prompt
    system_prompt = f"""You are an intelligent assistant. Use this relevant information:

{context}

When answering:
1. Synthesize information from sources
2. Use your own words for a coherent response
3. If information is insufficient, acknowledge this
4. Never hallucinate information"""

    # Call the OpenAI Chat Completions API
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "Content-Type": "application/json"
        },
        json={
            "model": "gpt-3.5-turbo",
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": question}
            ],
            "temperature": 0.3
        },
        timeout=60,  # avoid hanging indefinitely on a stalled connection
    )

    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"

    return response.json()["choices"][0]["message"]["content"]
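
The last line of `rag_agent` indexes straight into the response body, which raises if the payload has a different shape. A defensive variant might look like the sketch below. `extract_answer` is a hypothetical helper, and the sample payloads are hand-written to mirror the `choices[0].message.content` layout used above, not captured API output.

```python
from typing import Any, Optional

def extract_answer(payload: Any) -> Optional[str]:
    """Return the first choice's message text, or None if the shape is unexpected."""
    try:
        return payload["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        return None

ok_payload = {
    "choices": [
        {"message": {"role": "assistant", "content": "RAG grounds answers in retrieved text."}}
    ]
}
error_payload = {"error": {"message": "Invalid API key"}}
empty_payload = {"choices": []}

print(extract_answer(ok_payload))
print(extract_answer(error_payload))
print(extract_answer(empty_payload))
```

Returning `None` lets the caller decide how to report the failure instead of surfacing a `KeyError` from deep inside the agent.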

#### 4. MCP Tools Implementation
The MCPRagTools class serves as a standardized interface layer for the RAG system. It transforms the core RAG functionality into a structured, reusable format that's compatible with the Model Context Protocol (MCP).

This class uses static methods to create stateless tool functions that can be registered with MCP or called directly. Each method follows a consistent pattern:

1. Input Validation: Each method accepts a typed Pydantic model. Pydantic validates the data when the model instance is constructed, and the field descriptions document the expected input.
2. Core Function Delegation: The methods don't implement the RAG functionality themselves. Instead, they delegate to the core functions (add_documents, search_documents, rag_agent), passing the appropriate parameters.
3. Structured Response: Each method returns a dictionary with named fields, making it easier for consumers to process the results programmatically.

The class implements four primary tools:

- add_document: Adds a single document to the knowledge base, returning the updated document count.
- add_documents: Adds multiple documents in batch, leveraging parallel processing for efficiency. Returns both the count of newly added documents and the total document count.
- search_documents: Performs semantic search against the knowledge base, returning the most relevant documents along with the original query and the number of results.
- rag_query: The primary end-user tool that combines retrieval and generation to answer questions. It returns both the original question and the generated answer.

This architecture separates the interface concerns from implementation details, allowing either to change independently. Because the methods are static, the tools can be used without instantiating the class, which makes them easier to register with frameworks like MCP.

In [16]:
# Create MCP tools
class MCPRagTools:
    @staticmethod
    def add_document(input_data: DocumentInput) -> Dict[str, Any]:
        """Add a single document to the knowledge base with embedding"""
        added, total = add_documents([input_data.document])
        return {"status": "success", "documents_count": total}
    
    @staticmethod
    def add_documents(input_data: DocumentsInput) -> Dict[str, Any]:
        """Add multiple documents with parallel embedding processing"""
        added, total = add_documents(input_data.documents)
        return {
            "status": "success", 
            "added_count": added,
            "total_documents": total
        }
    
    @staticmethod
    def search_documents(input_data: SearchInput) -> Dict[str, Any]:
        """Search documents using FAISS for parallel vector similarity"""
        results = search_documents(input_data.query, input_data.top_k)
        return {
            "query": input_data.query,
            "results": results,
            "result_count": len(results)
        }
    
    @staticmethod
    def rag_query(input_data: RagInput) -> Dict[str, Any]:
        """Process a question using Retrieval-Augmented Generation"""
        answer = rag_agent(input_data.question)
        return {
            "question": input_data.question,
            "answer": answer
        }

#### 5. MCP Context Creation Function
The create_mcp_context() function sets up an execution environment for the RAG tools. It acts as a compatibility layer across MCP implementations whose registration APIs differ.

The function begins by creating a new MCP Context object, which serves as a container for registered tools. This context provides a standardized way to execute tools by name with appropriate parameters.

Since different MCP implementations might use different methods for registering tools, the function implements a three-tiered fallback strategy:

1. First, it attempts to use the context.register() method, which is common in some MCP implementations.
2. If that fails with an AttributeError (meaning the method doesn't exist), it falls back to context.add_tool(), another common registration pattern.
3. If that also fails, it attempts a third approach using context.register_tool().

Each registration attempt registers all four RAG tools:

- add_document for single document addition
- add_documents for batch document processing
- search_documents for knowledge retrieval
- rag_query for question answering

The function returns the configured context object, which can then be used to execute the registered tools through a single interface. Note that mcp.Context and the three method names are assumptions about the installed library, and the notebook never calls this function. The usage example later in the notebook reports that its mcp module has no execute method, so verify the API against your MCP implementation before relying on it.
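
The idea of a context that executes tools by name can be sketched independently of any MCP library. `ToolRegistry`, `EchoInput`, and `echo_tool` below are hypothetical stand-ins written for this illustration; they are not part of the `mcp` package, and a dataclass takes the place of a Pydantic model to keep the example dependency-free.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple, Type

@dataclass
class EchoInput:
    """Hypothetical input schema, standing in for a Pydantic model."""
    text: str

def echo_tool(input_data: EchoInput) -> Dict[str, Any]:
    """Hypothetical tool: returns its input in a structured response."""
    return {"status": "success", "echo": input_data.text}

class ToolRegistry:
    """Stand-in for an MCP-style context: maps tool names to callables."""

    def __init__(self) -> None:
        self._tools: Dict[str, Tuple[Type, Callable[[Any], Dict[str, Any]]]] = {}

    def register(self, name: str, input_type: Type, func: Callable[[Any], Dict[str, Any]]) -> None:
        self._tools[name] = (input_type, func)

    def execute(self, name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        input_type, func = self._tools[name]
        # Build the typed input from raw parameters, then delegate to the tool
        return func(input_type(**params))

registry = ToolRegistry()
registry.register("echo", EchoInput, echo_tool)
result = registry.execute("echo", {"text": "hello"})
print(result)
```

Storing the input type next to the callable lets `execute` accept plain dictionaries, as in the `mcp.execute("rag_query", {...})` calls attempted later in the notebook, while each tool still receives a typed object.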

In [17]:
# Create MCP context
def create_mcp_context():
    # Create a new MCP context
    # Note: mcp.Context and the registration method names below are assumptions;
    # the exact API may vary depending on your MCP implementation
    context = mcp.Context()

    tools = {
        "add_document": MCPRagTools.add_document,
        "add_documents": MCPRagTools.add_documents,
        "search_documents": MCPRagTools.search_documents,
        "rag_query": MCPRagTools.rag_query,
    }

    # Try each registration method in turn: register, add_tool, register_tool
    for method_name in ("register", "add_tool", "register_tool"):
        try:
            register = getattr(context, method_name)
        except AttributeError:
            # This implementation lacks the method; try the next candidate
            continue
        for name, func in tools.items():
            register(name, func)
        return context

    raise AttributeError("MCP context exposes none of: register, add_tool, register_tool")

In [18]:
# Sample documents
sample_docs = [
    "OpenAI was founded in December 2015 by Sam Altman, Elon Musk, and others with the mission to ensure that artificial general intelligence benefits all of humanity.",
    "GPT-4 is a multimodal large language model created by OpenAI in 2023, capable of processing both text and image inputs.",
    "RAG stands for Retrieval-Augmented Generation, a technique to enhance LLM responses with external knowledge by retrieving relevant information and incorporating it into the generation process.",
    "Vector databases store embeddings of text which can be searched by similarity using mathematical operations like cosine similarity.",
    "Retrieval-Augmented Generation (RAG) helps address hallucination problems in language models by grounding responses in factual information from reliable sources.",
    "The key components of a RAG system include an embedding model, a vector database, a retrieval mechanism, and a text generation model."
]

# Add documents directly
added, total = add_documents(sample_docs)
print(f"Added {added} documents. Knowledge base now has {total} documents.")

# Example query
question = "What are the key components of a RAG system?"
print(f"\nQuestion: {question}")

# Execute RAG query directly
answer = rag_agent(question)
print(f"\nAnswer: {answer}")

Added 6 documents. Knowledge base now has 6 documents.

Question: What are the key components of a RAG system?
Retrieving and generating...

Answer: The key components of a RAG system include an embedding model, a vector database, a retrieval mechanism, and a text generation model. RAG, which stands for Retrieval-Augmented Generation, utilizes these components to enhance language model responses by incorporating external knowledge retrieved from reliable sources to ground the generated text in factual information. This approach helps mitigate hallucination issues in language models by ensuring that the responses are supported by relevant and accurate external data.


In [20]:
# Simplified MCP Usage Example
def run_direct_example():
    """Call the tool wrappers directly, without going through MCP"""
    # sample_docs was already added above, so this call adds it again
    # and the index ends up with duplicate entries
    result = MCPRagTools.add_documents(DocumentsInput(documents=sample_docs))
    print(f"Added {result['added_count']} documents via direct call.")

    question = "What is RAG used for?"
    print(f"\nQuestion: {question}")
    answer_result = MCPRagTools.rag_query(RagInput(question=question))
    print(f"\nDirect Answer: {answer_result['answer']}")

try:
    # Register tools directly with MCP if possible
    if hasattr(mcp, 'register_tool'):
        mcp.register_tool("add_document", MCPRagTools.add_document)
        mcp.register_tool("add_documents", MCPRagTools.add_documents)
        mcp.register_tool("search_documents", MCPRagTools.search_documents)
        mcp.register_tool("rag_query", MCPRagTools.rag_query)

    # Try to execute with MCP if available
    if hasattr(mcp, 'execute'):
        # Execute add_documents
        result = mcp.execute("add_documents", {"documents": sample_docs})
        print(f"Added documents via MCP. Result: {result}")

        # Execute rag_query
        question = "What is RAG used for?"
        print(f"\nQuestion: {question}")
        answer_result = mcp.execute("rag_query", {"question": question})
        print(f"\nMCP Answer: {answer_result.get('answer', answer_result)}")
    else:
        print("MCP execute method not available. Using direct method calls instead.")
        run_direct_example()
except Exception as e:
    print(f"MCP execution failed: {e}")
    print("Falling back to direct method calls...")
    run_direct_example()

MCP execute method not available. Using direct method calls instead.
Added 6 documents via direct call.

Question: What is RAG used for?
Retrieving and generating...

Direct Answer: RAG, which stands for Retrieval-Augmented Generation, is a technique used to enhance Large Language Models (LLMs) responses by incorporating external knowledge into the generation process. This is achieved by retrieving relevant information from a database and integrating it into the generated text. The key components of a RAG system include an embedding model, a vector database, a retrieval mechanism, and a text generation model.


In [21]:
# Additional example queries
example_questions = [
    "How does RAG help with hallucination?",
    "What is the role of vector databases in RAG?"
]

for q in example_questions:
    print(f"\nQuestion: {q}")
    answer = rag_agent(q)
    print(f"Answer: {answer}")


Question: How does RAG help with hallucination?
Retrieving and generating...
Answer: Retrieval-Augmented Generation (RAG) is a technique that aids in mitigating hallucination issues in language models by incorporating factual information obtained from reliable sources into the generation process. By grounding responses in this external knowledge, RAG helps ensure that the generated content is more accurate and less likely to produce misleading or false information. This approach enhances the overall reliability and credibility of the language model's outputs, thereby reducing the occurrence of hallucinations in the generated text.

Question: What is the role of vector databases in RAG?
Retrieving and generating...
Answer: In a Retrieval-Augmented Generation (RAG) system, vector databases play a crucial role in storing and organizing the external knowledge or information that is retrieved to enhance the language model's responses. These databases contain vector representations of the r