***AgentRAG: Autonomous Research Assistant***
This notebook implements an intelligent agent that can research topics, retrieve information from the web, and provide comprehensive answers to complex questions.

In [None]:
!pip install numpy pandas torch transformers scikit-learn requests openai python-dotenv beautifulsoup4 tqdm

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
C

In [None]:
import os

# Keep any key that is already exported in the environment (shell, .env loader,
# Colab secrets, ...). The empty string is only a placeholder for sessions where
# nothing is configured yet; assigning it unconditionally would wipe a real key.
# To configure a key from the notebook, export it before starting the kernel or
# replace the placeholder below (and do not commit the notebook with it filled in).
os.environ.setdefault("OPENAI_API_KEY", "")
os.environ.setdefault("SERPER_API_KEY", "")

Imports and Initial Setup
Let's import all the necessary libraries:

In [None]:
# Import required libraries
import json
import numpy as np
import requests
import pandas as pd
from typing import List, Dict, Any, Tuple, Optional
from dataclasses import dataclass, field
import torch
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity
import time
import re
from collections import deque
import warnings
from bs4 import BeautifulSoup

# Suppress specific warnings
warnings.filterwarnings("ignore", category=UserWarning, module="transformers")

In [None]:
@dataclass
class AgentConfig:
    """Configuration for the Agentic RAG system.

    Attributes:
        model_name: Hugging Face identifier of the embedding model.
        chunk_size: Length, in characters, of each document chunk.
        chunk_overlap: Number of characters shared by consecutive chunks.
        top_k: Default number of chunks returned by a similarity search.
        similarity_threshold: Minimum cosine similarity for a chunk to be returned.
        max_iterations: Iteration cap; not read by any code in this notebook
            (NOTE(review): confirm whether it is still needed).
        api_key: OpenAI API key. Defaults to the ``OPENAI_API_KEY`` environment
            variable, read when the instance is created.
        serper_api_key: Serper API key. Defaults to the ``SERPER_API_KEY``
            environment variable, read when the instance is created.
    """
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    chunk_size: int = 512
    chunk_overlap: int = 50
    top_k: int = 5
    similarity_threshold: float = 0.7
    max_iterations: int = 3
    # default_factory defers the environment lookup to instantiation time, so keys
    # set after this cell has run are still picked up by AgentConfig().
    api_key: str = field(
        default_factory=lambda: os.environ.get("OPENAI_API_KEY", "your-openai-api-key")
    )
    serper_api_key: str = field(
        default_factory=lambda: os.environ.get("SERPER_API_KEY", "your-serper-api-key")
    )

@dataclass
class Document:
    """Representation of a document with content and metadata.

    Attributes:
        content: Full text of the document.
        metadata: Free-form details about the document; an ``"id"`` entry, when
            present, is used as the document id.
        embedding: Embedding of the whole document, or ``None`` until computed.
        chunks: Chunk records derived from ``content``.
        id: Set in ``__post_init__``: ``metadata["id"]`` when provided, otherwise
            an integer in ``[0, 9999]`` derived from ``content``.
    """
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[np.ndarray] = None
    chunks: List[Dict[str, Any]] = field(default_factory=list)

    def __post_init__(self):
        """Assign ``self.id`` from the metadata or from a stable content digest."""
        if "id" in self.metadata:
            self.id = self.metadata["id"]
            return

        # The built-in hash() of a str is salted per interpreter process, so it
        # yields a different id after every kernel restart. A cryptographic digest
        # gives the same id for the same content on every run.
        import hashlib

        digest = hashlib.sha256(self.content.encode("utf-8", errors="replace")).digest()
        self.id = int.from_bytes(digest[:8], "big") % 10000

**Vector Store Implementation**

The Vector Store handles document embeddings and similarity searches:

In [None]:
class VectorStore:
    """In-memory vector store for document embeddings.

    Documents are split into overlapping character windows, each window is
    embedded with the model named by ``config.model_name``, and queries are
    answered by cosine similarity against every stored chunk.
    """

    # Windows shorter than this many characters are not indexed.
    MIN_CHUNK_CHARS = 50

    def __init__(self, config: AgentConfig):
        """Load the tokenizer and embedding model named by ``config.model_name``.

        Args:
            config: Settings for the embedding model, chunking and retrieval.
        """
        self.config = config
        self.documents: List[Document] = []
        self.chunks: List[Dict[str, Any]] = []
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        print(f"Loading embedding model on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModel.from_pretrained(config.model_name).to(self.device)
        print("Embedding model loaded successfully!")

    def _get_embedding(self, text: str) -> np.ndarray:
        """Generate an embedding for ``text`` using the embedding model.

        The text is tokenized with truncation at 512 tokens, and the model's last
        hidden state is mean-pooled over the positions selected by the attention
        mask.

        Args:
            text: Text to embed.

        Returns:
            The pooled embedding of the first (only) sequence as a NumPy array.
        """
        inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(self.device)
        with torch.no_grad():
            outputs = self.model(**inputs)

        # Mean pooling: sum the token vectors where the mask is 1, divide by the
        # mask count (clamped so an all-zero mask cannot divide by zero).
        token_embeddings = outputs.last_hidden_state
        attention_mask = inputs['attention_mask']
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        embedding = torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return embedding[0].cpu().numpy()

    def _chunk_document(self, document: Document) -> List[Dict[str, Any]]:
        """Split a document into overlapping character windows and embed each.

        Args:
            document: Document whose ``content`` is to be chunked.

        Returns:
            One record per indexed window with keys ``text``, ``embedding``,
            ``doc_id``, ``chunk_id`` and ``metadata``.

        Raises:
            ValueError: If ``chunk_overlap`` is not smaller than ``chunk_size``;
                the window could then never advance through the text.
        """
        stride = self.config.chunk_size - self.config.chunk_overlap
        if stride <= 0:
            # A zero stride would make range() raise an opaque error and a
            # negative one would silently index nothing; fail with a clear message.
            raise ValueError(
                "chunk_overlap must be smaller than chunk_size "
                f"(got chunk_size={self.config.chunk_size}, "
                f"chunk_overlap={self.config.chunk_overlap})"
            )

        text = document.content
        chunks = []

        for start in range(0, len(text), stride):
            chunk_text = text[start:start + self.config.chunk_size]
            if len(chunk_text) < self.MIN_CHUNK_CHARS:  # Skip very small chunks
                continue

            chunks.append({
                "text": chunk_text,
                "embedding": self._get_embedding(chunk_text),
                "doc_id": document.id,
                "chunk_id": len(chunks),
                "metadata": document.metadata
            })

        return chunks

    def add_document(self, document: Document) -> None:
        """Embed and chunk ``document``, then add it and its chunks to the store.

        Args:
            document: Document to index; its ``embedding`` and ``chunks``
                attributes are overwritten.
        """
        document.embedding = self._get_embedding(document.content)
        document.chunks = self._chunk_document(document)
        self.documents.append(document)
        self.chunks.extend(document.chunks)

    def similarity_search(self, query: str, top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """Find the stored chunks most similar to ``query``.

        Args:
            query: Text to search for.
            top_k: Maximum number of chunks to consider; defaults to
                ``config.top_k``. Zero or a negative value yields no results.

        Returns:
            Up to ``top_k`` records ``{"chunk": ..., "similarity": float}`` in
            descending similarity order, limited to chunks whose similarity is
            at least ``config.similarity_threshold``.
        """
        if top_k is None:
            top_k = self.config.top_k

        # Nothing indexed or nothing requested. The explicit top_k guard matters
        # because a "last 0 items" slice would select every chunk instead of none.
        if not self.chunks or top_k <= 0:
            return []

        query_embedding = self._get_embedding(query)
        chunk_embeddings = np.array([chunk["embedding"] for chunk in self.chunks])

        similarities = cosine_similarity([query_embedding], chunk_embeddings)[0]
        # Indices of the top_k highest similarities, best first.
        top_indices = np.argsort(similarities)[::-1][:top_k]

        return [
            {"chunk": self.chunks[idx], "similarity": float(similarities[idx])}
            for idx in top_indices
            if similarities[idx] >= self.config.similarity_threshold
        ]

# **Web Retriever Implementation**
The Web Retriever handles searching the web and extracting content from web pages:

In [None]:
class WebRetriever:
    """Component for retrieving information from the web using real search APIs"""

    # Seconds to wait on any single HTTP request (search API and page fetches).
    REQUEST_TIMEOUT = 10

    def __init__(self, config: AgentConfig):
        """Store the configuration and resolve the Serper API key.

        Args:
            config: Agent configuration; ``config.serper_api_key`` is the fallback
                when the environment does not provide a key.
        """
        self.config = config
        # The environment still takes precedence, but an unset *or empty*
        # SERPER_API_KEY no longer hides the key supplied through the config.
        self.serper_api_key = os.environ.get("SERPER_API_KEY") or config.serper_api_key

    def search(self, query: str, num_results: int = 3) -> List[Document]:
        """Search the web for information related to the query using Serper API.

        Each organic result is fetched with ``extract_text_from_url``; when the
        fetch fails or yields less text than the title/snippet pair, a document
        built from the title and snippet is used instead.

        Args:
            query: Search query string.
            num_results: Maximum number of organic results to turn into documents.

        Returns:
            The documents built from the results, or an empty list on a non-200
            response or any error (which is printed, not raised).
        """
        try:
            # Use Serper.dev Google Search API
            headers = {
                'X-API-KEY': self.serper_api_key,
                'Content-Type': 'application/json'
            }

            payload = {
                "q": query,
                "num": num_results
            }

            # Without a timeout a stalled connection would block the notebook
            # indefinitely; page fetches below use the same limit.
            response = requests.post(
                'https://google.serper.dev/search',
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code != 200:
                print(f"Search API error: Status code {response.status_code}")
                return []

            search_results = response.json()
            documents = []

            # Process organic search results
            for result in (search_results.get("organic") or [])[:num_results]:
                title = result.get("title", "")
                snippet = result.get("snippet", "")
                url = result.get("link", "")

                # Start with the snippet as minimal content
                content = f"Title: {title}\n\nExcerpt: {snippet}"

                # Prefer the full page text when it carries more than the snippet
                full_doc = self.extract_text_from_url(url)
                if full_doc and len(full_doc.content) > len(content):
                    documents.append(full_doc)
                else:
                    # Use snippet if extraction failed
                    documents.append(Document(
                        content=content,
                        metadata={"source": url, "title": title}
                    ))

            return documents

        except Exception as e:
            print(f"Error during web search: {e}")
            return []

    def extract_text_from_url(self, url: str) -> Optional[Document]:
        """Extract text content from a URL using requests and basic HTML parsing.

        Args:
            url: Address of the page to download.

        Returns:
            A ``Document`` holding the visible page text (at most 10000 characters
            plus a truncation marker) with ``source`` and ``title`` metadata, or
            ``None`` when the URL is empty, the response is not 200, or any error
            occurs (which is printed, not raised).
        """
        if not url:
            # Nothing to fetch; avoids a pointless failing request.
            return None

        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            }

            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)

            if response.status_code != 200:
                return None

            # Use BeautifulSoup for content extraction
            soup = BeautifulSoup(response.text, 'html.parser')

            # Capture the title before the tree is pruned. `.string` is None when
            # the <title> tag has no single text child, so fall back to the URL.
            raw_title = soup.title.string if soup.title else None
            title = str(raw_title).strip() if raw_title else ""

            # Remove script/style and page-chrome elements
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.extract()

            # Get text content
            text = soup.get_text(separator="\n", strip=True)

            # Clean up text (remove extra newlines, etc.)
            lines = [line.strip() for line in text.splitlines() if line.strip()]
            content = "\n".join(lines)

            # Limit content length to avoid extremely long documents
            max_length = 10000
            if len(content) > max_length:
                content = content[:max_length] + "... [content truncated]"

            return Document(
                content=content,
                metadata={"source": url, "title": title or url}
            )

        except Exception as e:
            print(f"Error extracting text from {url}: {str(e)}")
            return None

**LLM Interface**

The LLM Interface connects to OpenAI's API to generate responses:

In [None]:
class LLMInterface:
    """Interface to the OpenAI API for LLM capabilities"""

    def __init__(self, config: AgentConfig):
        """Store the configuration and resolve the OpenAI API key.

        Args:
            config: Agent configuration; ``config.api_key`` is the fallback when
                the environment does not provide a key.
        """
        self.config = config
        # The environment still takes precedence, but an unset *or empty*
        # OPENAI_API_KEY no longer hides the key supplied through the config.
        self.api_key = os.environ.get("OPENAI_API_KEY") or config.api_key

    def generate(self, prompt: str, temperature: float = 0.7, max_tokens: int = 800,
                 model: str = "gpt-3.5-turbo") -> str:
        """Generate text using OpenAI's chat completions API.

        Args:
            prompt: User message sent to the model.
            temperature: Sampling temperature passed to the API.
            max_tokens: Completion length limit passed to the API.
            model: Chat model name; the default matches the previously
                hard-coded value.

        Returns:
            The stripped text of the first choice. If anything fails, the error
            is printed and a human-readable error message is returned instead of
            raising, so callers always receive a string.
        """
        try:
            import openai

            # Set the API key
            openai.api_key = self.api_key

            # Make API call
            response = openai.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": "You are a helpful research assistant that provides factual, concise information."},
                    {"role": "user", "content": prompt}
                ],
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=1.0,
                frequency_penalty=0.0,
                presence_penalty=0.0
            )

            # The message content can be None; treat that as an empty answer
            # rather than failing on .strip().
            content = response.choices[0].message.content
            return (content or "").strip()

        except Exception as e:
            print(f"Error generating LLM response: {e}")

            # Fallback response if API call fails
            fallback_msg = f"Error generating response: {str(e)}. "
            fallback_msg += "Please check your API key and connection. "

            # Match on the exception's class name: the message text usually does
            # not contain it, so checking str(e) alone rarely triggered the hints.
            error_name = type(e).__name__
            if error_name == "RateLimitError" or "RateLimitError" in str(e):
                fallback_msg += "You've hit a rate limit. Please try again in a minute."
            elif error_name == "AuthenticationError" or "AuthenticationError" in str(e):
                fallback_msg += "Authentication failed. Please check your API key."

            return fallback_msg

In [None]:
# Complete AgentRAG definition: construction, helpers, query processing and
# recursive research.
class AgentRAG:
    """Main agent class that orchestrates the RAG process"""

    def __init__(self, config: Optional[AgentConfig] = None):
        """Build the vector store, web retriever and LLM interface.

        Args:
            config: Agent configuration; a default ``AgentConfig()`` is used when
                omitted.
        """
        print("Initializing AgentRAG...")
        self.config = config or AgentConfig()
        self.vector_store = VectorStore(self.config)
        self.web_retriever = WebRetriever(self.config)
        self.llm = LLMInterface(self.config)
        self.memory = deque(maxlen=10)  # Short-term memory for conversation
        print("AgentRAG initialized!")

    def add_to_memory(self, item: Dict[str, Any]) -> None:
        """Add an interaction to the agent's memory (only the last 10 are kept)."""
        self.memory.append(item)

    def _format_context(self, chunks: List[Dict[str, Any]]) -> str:
        """Format retrieved chunks into context for the LLM.

        Args:
            chunks: Search results as returned by ``VectorStore.similarity_search``.

        Returns:
            One "Source/Content" paragraph per chunk, separated by blank lines.
        """
        context = "\n\n".join([
            f"Source: {chunk['chunk']['metadata'].get('source', 'Unknown')}\n"
            f"Content: {chunk['chunk']['text']}"
            for chunk in chunks
        ])
        return context

    def _generate_search_queries(self, user_query: str) -> List[str]:
        """Generate three search-query variants from the user query."""
        # In production, use LLM to generate these
        base_query = user_query.strip("?").lower()
        return [
            base_query,
            f"latest information about {base_query}",
            f"{base_query} explanation"
        ]

    def _needs_web_search(self, query: str, results: List[Dict[str, Any]]) -> bool:
        """Determine if web search is needed based on query and existing results.

        Returns:
            ``True`` when there are no results or their mean similarity is
            below 0.75.
        """
        if not results:
            return True

        # Check if results are relevant enough
        avg_similarity = sum(r["similarity"] for r in results) / len(results)
        return avg_similarity < 0.75

    def process_query(self, query: str) -> Dict[str, Any]:
        """Process a user query through the full agentic RAG pipeline.

        Looks the query up in the vector store, falls back to a web search when
        the stored results are missing or weak, asks the LLM to answer from the
        retrieved context, and records the interaction in memory.

        Args:
            query: The user's question.

        Returns:
            A dict with ``query``, ``answer`` and ``sources`` (the distinct source
            values of the chunks used, in ranking order).
        """
        # 1. Check existing knowledge
        search_results = self.vector_store.similarity_search(query)

        # 2. Always perform web search for the first query or if needed
        if not search_results or self._needs_web_search(query, search_results):
            print("Retrieving information from the web...")
            search_queries = self._generate_search_queries(query)

            # The query variants (and earlier calls) tend to return the same
            # pages; remember which sources are indexed so each page is embedded
            # only once instead of filling the store with duplicate chunks.
            seen_sources = {
                doc.metadata.get("source") for doc in self.vector_store.documents
            }

            # 3. Perform web search and add results to vector store
            for i, search_query in enumerate(search_queries):
                print(f"  Search query {i+1}/{len(search_queries)}: {search_query}")
                documents = self.web_retriever.search(search_query)
                print(f"  Found {len(documents)} documents")
                for doc in documents:
                    source = doc.metadata.get("source")
                    if source and source in seen_sources:
                        continue
                    if source:
                        seen_sources.add(source)
                    self.vector_store.add_document(doc)

            # 4. Search again with new information
            search_results = self.vector_store.similarity_search(query)

        # 5. Format context from search results
        if search_results:
            context = self._format_context(search_results)
            print(f"Using {len(search_results)} relevant chunks of information")
        else:
            context = "No specific information found. Generating response based on general knowledge."
            print("No specific information found in vector store")

        # 6. Generate answer using LLM
        print("Generating answer using LLM...")
        prompt = f"""
        Answer the following question based on the provided context. If the information is not in the context, say so.

        Context:
        {context}

        Question: {query}

        Answer:
        """

        answer = self.llm.generate(prompt)

        # 7. Store interaction in memory
        interaction = {
            "query": query,
            "context": context,
            "answer": answer,
            "timestamp": time.time()
        }
        self.add_to_memory(interaction)

        # Several chunks usually come from the same page; list each source once,
        # keeping the order in which it first appears.
        sources = list(dict.fromkeys(
            r["chunk"]["metadata"].get("source") for r in search_results
        ))

        return {
            "query": query,
            "answer": answer,
            "sources": sources
        }

    def recursive_research(self, query: str, max_depth: int = 2) -> Dict[str, Any]:
        """Perform recursive research to answer complex queries.

        Runs up to ``max_depth`` rounds of ``process_query``; between rounds the
        LLM proposes the next follow-up question. All findings are then
        synthesized into one final answer.

        Args:
            query: The original research question.
            max_depth: Maximum number of research rounds.

        Returns:
            A dict with ``original_query``, ``final_answer`` and ``research_path``
            (the query/answer pair of every round).
        """
        depth = 0
        findings = []
        current_query = query

        print(f"\n🔍 Starting recursive research on: {query}")

        while depth < max_depth:
            print(f"\n📚 Research iteration {depth+1}/{max_depth} - Query: {current_query}")

            # Process the current query
            result = self.process_query(current_query)
            findings.append(result)

            print(f"\n✓ Found: {result['answer'][:100]}...")

            depth += 1
            if depth >= max_depth:
                # No further round will run, so a follow-up question would be
                # an LLM call whose result is never used.
                break

            # Generate follow-up query based on findings
            print("\n🤔 Generating follow-up question...")

            follow_up_prompt = f"""
            Based on what we've learned so far about "{query}",
            what important follow-up question should we research next?

            Current findings: {result['answer']}

            Return ONLY the follow-up question, nothing else.
            """

            next_query = self.llm.generate(follow_up_prompt)

            # Clean up the generated query
            next_query = re.sub(r'^[^a-zA-Z0-9]+', '', next_query)  # Remove leading non-alphanumeric chars
            next_query = next_query.split('\n')[0].strip()  # Take first line only

            print(f"📝 Follow-up question: {next_query}")

            # Break if we're not getting meaningful follow-ups
            if len(next_query) < 10 or next_query.lower() == current_query.lower():
                print("No meaningful follow-up questions. Ending research cycle.")
                break

            current_query = next_query

        # Synthesize final answer from all findings
        print("\n🧠 Synthesizing comprehensive answer...")

        synthesis_prompt = f"""
        Synthesize a comprehensive answer to the original question based on all research findings.

        Original question: {query}

        Research findings:
        {json.dumps([f['answer'] for f in findings], indent=2)}

        Comprehensive answer:
        """

        final_answer = self.llm.generate(synthesis_prompt)

        return {
            "original_query": query,
            "final_answer": final_answer,
            "research_path": [{"query": f["query"], "answer": f["answer"]} for f in findings]
        }

**Main Agent Implementation**

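Now we implement the main `AgentRAG` class that orchestrates the entire process. This cell contains only the constructor and helper methods, and because it reuses the name `AgentRAG`, running it replaces the class defined in the previous cell: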

In [None]:
class AgentRAG:
    """Main agent class that orchestrates the RAG process.

    This definition covers construction, conversation memory and the retrieval
    helpers only; it does not itself define ``process_query`` or
    ``recursive_research``. Running it rebinds the name ``AgentRAG``, replacing
    any class of the same name defined earlier in the session.
    """

    def __init__(self, config: AgentConfig = None):
        """Create the vector store, web retriever, LLM interface and memory.

        Args:
            config: Agent configuration; a default ``AgentConfig()`` is built
                when none (or a falsy value) is given.
        """
        print("Initializing AgentRAG...")
        if config:
            self.config = config
        else:
            self.config = AgentConfig()
        self.vector_store = VectorStore(self.config)
        self.web_retriever = WebRetriever(self.config)
        self.llm = LLMInterface(self.config)
        # Bounded short-term conversation memory: the oldest entry is dropped
        # once ten interactions are stored.
        self.memory = deque(maxlen=10)
        print("AgentRAG initialized!")

    def add_to_memory(self, item: Dict[str, Any]) -> None:
        """Append one interaction record to the agent's memory."""
        self.memory.append(item)

    def _format_context(self, chunks: List[Dict[str, Any]]) -> str:
        """Render search results as "Source/Content" paragraphs for the LLM.

        Args:
            chunks: Search results, each holding a ``"chunk"`` record with
                ``"metadata"`` and ``"text"`` entries.

        Returns:
            The paragraphs joined by blank lines.
        """
        paragraphs = []
        for hit in chunks:
            record = hit['chunk']
            origin = record['metadata'].get('source', 'Unknown')
            paragraphs.append(f"Source: {origin}\nContent: {record['text']}")
        return "\n\n".join(paragraphs)

    def _generate_search_queries(self, user_query: str) -> List[str]:
        """Derive three web-search queries from the user's question."""
        # In production, use LLM to generate these
        topic = user_query.strip("?").lower()
        variants = [topic]
        variants.append(f"latest information about {topic}")
        variants.append(f"{topic} explanation")
        return variants

    def _needs_web_search(self, query: str, results: List[Dict[str, Any]]) -> bool:
        """Decide whether the stored results are too weak to answer from.

        Returns:
            ``True`` when ``results`` is empty or its mean similarity is
            below 0.75.
        """
        if not results:
            return True

        # Mean similarity of the hits decides whether they are relevant enough.
        total = 0
        for hit in results:
            total += hit["similarity"]
        return (total / len(results)) < 0.75

**Query Processing Method**

This is the main method that processes user queries:

In [None]:
def process_query(self, query: str) -> Dict[str, Any]:
        """Process a user query through the full agentic RAG pipeline.

        Written as a module-level function whose first parameter is the agent
        instance; the statement after the function attaches it to ``AgentRAG``
        so that ``agent.process_query(...)`` works.

        Looks the query up in the vector store, falls back to a web search when
        the stored results are missing or weak, asks the LLM to answer from the
        retrieved context, and records the interaction in memory.

        Args:
            query: The user's question.

        Returns:
            A dict with ``query``, ``answer`` and ``sources`` (the distinct source
            values of the chunks used, in ranking order).
        """
        # 1. Check existing knowledge
        search_results = self.vector_store.similarity_search(query)

        # 2. Always perform web search for the first query or if needed
        if not search_results or self._needs_web_search(query, search_results):
            print("Retrieving information from the web...")
            search_queries = self._generate_search_queries(query)

            # The query variants (and earlier calls) tend to return the same
            # pages; remember which sources are indexed so each page is embedded
            # only once instead of filling the store with duplicate chunks.
            seen_sources = {
                doc.metadata.get("source") for doc in self.vector_store.documents
            }

            # 3. Perform web search and add results to vector store
            for i, search_query in enumerate(search_queries):
                print(f"  Search query {i+1}/{len(search_queries)}: {search_query}")
                documents = self.web_retriever.search(search_query)
                print(f"  Found {len(documents)} documents")
                for doc in documents:
                    source = doc.metadata.get("source")
                    if source and source in seen_sources:
                        continue
                    if source:
                        seen_sources.add(source)
                    self.vector_store.add_document(doc)

            # 4. Search again with new information
            search_results = self.vector_store.similarity_search(query)

        # 5. Format context from search results
        if search_results:
            context = self._format_context(search_results)
            print(f"Using {len(search_results)} relevant chunks of information")
        else:
            context = "No specific information found. Generating response based on general knowledge."
            print("No specific information found in vector store")

        # 6. Generate answer using LLM
        print("Generating answer using LLM...")
        prompt = f"""
        Answer the following question based on the provided context. If the information is not in the context, say so.

        Context:
        {context}

        Question: {query}

        Answer:
        """

        answer = self.llm.generate(prompt)

        # 7. Store interaction in memory
        interaction = {
            "query": query,
            "context": context,
            "answer": answer,
            "timestamp": time.time()
        }
        self.add_to_memory(interaction)

        # Several chunks usually come from the same page; list each source once,
        # keeping the order in which it first appears.
        sources = list(dict.fromkeys(
            r["chunk"]["metadata"].get("source") for r in search_results
        ))

        return {
            "query": query,
            "answer": answer,
            "sources": sources
        }


# The AgentRAG class cell above defines only the helper methods, so attach this
# function as a method; otherwise `agent.process_query(...)` raises AttributeError
# after a top-to-bottom run. The lookup is guarded so this cell can also be
# executed on its own before the class exists.
_agent_cls = globals().get("AgentRAG")
if _agent_cls is not None:
    _agent_cls.process_query = process_query

**Recursive Research Method**

This method enables deeper research through follow-up questions:

In [None]:
def recursive_research(self, query: str, max_depth: int = 2) -> Dict[str, Any]:
        """Perform recursive research to answer complex queries.

        Written as a module-level function whose first parameter is the agent
        instance; the statement after the function attaches it to ``AgentRAG``
        so that ``agent.recursive_research(...)`` works.

        Runs up to ``max_depth`` rounds of ``self.process_query``; between rounds
        the LLM proposes the next follow-up question. All findings are then
        synthesized into one final answer.

        Args:
            query: The original research question.
            max_depth: Maximum number of research rounds.

        Returns:
            A dict with ``original_query``, ``final_answer`` and ``research_path``
            (the query/answer pair of every round).
        """
        depth = 0
        findings = []
        current_query = query

        print(f"\n🔍 Starting recursive research on: {query}")

        while depth < max_depth:
            print(f"\n📚 Research iteration {depth+1}/{max_depth} - Query: {current_query}")

            # Process the current query
            result = self.process_query(current_query)
            findings.append(result)

            print(f"\n✓ Found: {result['answer'][:100]}...")

            depth += 1
            if depth >= max_depth:
                # No further round will run, so a follow-up question would be
                # an LLM call whose result is never used.
                break

            # Generate follow-up query based on findings
            print("\n🤔 Generating follow-up question...")

            follow_up_prompt = f"""
            Based on what we've learned so far about "{query}",
            what important follow-up question should we research next?

            Current findings: {result['answer']}

            Return ONLY the follow-up question, nothing else.
            """

            next_query = self.llm.generate(follow_up_prompt)

            # Clean up the generated query
            next_query = re.sub(r'^[^a-zA-Z0-9]+', '', next_query)  # Remove leading non-alphanumeric chars
            next_query = next_query.split('\n')[0].strip()  # Take first line only

            print(f"📝 Follow-up question: {next_query}")

            # Break if we're not getting meaningful follow-ups
            if len(next_query) < 10 or next_query.lower() == current_query.lower():
                print("No meaningful follow-up questions. Ending research cycle.")
                break

            current_query = next_query

        # Synthesize final answer from all findings
        print("\n🧠 Synthesizing comprehensive answer...")

        synthesis_prompt = f"""
        Synthesize a comprehensive answer to the original question based on all research findings.

        Original question: {query}

        Research findings:
        {json.dumps([f['answer'] for f in findings], indent=2)}

        Comprehensive answer:
        """

        final_answer = self.llm.generate(synthesis_prompt)

        return {
            "original_query": query,
            "final_answer": final_answer,
            "research_path": [{"query": f["query"], "answer": f["answer"]} for f in findings]
        }


# The AgentRAG class cell above defines only the helper methods, so attach this
# function as a method; otherwise `agent.recursive_research(...)` raises
# AttributeError after a top-to-bottom run. The lookup is guarded so this cell
# can also be executed on its own before the class exists.
_agent_cls = globals().get("AgentRAG")
if _agent_cls is not None:
    _agent_cls.recursive_research = recursive_research

**Main Execution Function**

In [None]:
def run_agent():
    """Run one interactive research session on the console.

    Reads the OpenAI and Serper API keys from the environment, prompts for a
    research question, prints an initial answer with its sources, and offers an
    optional deeper recursive research pass.

    Returns:
        None. Returns early, after printing a warning, when either API key is
        missing or still a placeholder. Errors raised during research are printed
        together with their traceback instead of propagating.
    """
    # Values that mean "not configured": the "-here" variants were the ones
    # checked originally, the shorter ones are the defaults used by AgentConfig.
    openai_placeholders = {"your-openai-api-key-here", "your-openai-api-key"}
    serper_placeholders = {"your-serper-api-key-here", "your-serper-api-key"}

    # Check for API keys (surrounding whitespace is a common copy/paste artifact)
    openai_key = (os.environ.get("OPENAI_API_KEY") or "").strip()
    serper_key = (os.environ.get("SERPER_API_KEY") or "").strip()

    if not openai_key or openai_key in openai_placeholders:
        print("⚠️ Warning: OpenAI API key not found or not set.")
        print("Please set your OpenAI API key in the environment variables.")
        return

    if not serper_key or serper_key in serper_placeholders:
        print("⚠️ Warning: Serper API key not found or not set.")
        print("Please set your Serper API key in the environment variables.")
        return

    # Initialize the agent
    config = AgentConfig(
        api_key=openai_key,
        serper_api_key=serper_key
    )

    agent = AgentRAG(config)

    # Get query from user; blank or whitespace-only input selects the default
    query = input("Enter your research question: ").strip()

    if not query:
        query = "What are the environmental impacts of electric vehicles compared to gas vehicles?"
        print(f"Using default question: {query}")

    print(f"\n{'='*80}")
    print(f"🔍 PROCESSING QUERY: {query}")
    print(f"{'='*80}\n")

    try:
        # Process with regular search first for a quick answer
        print("Performing initial research...")
        basic_result = agent.process_query(query)

        print("\n📝 INITIAL ANSWER:")
        print("-" * 80)
        print(basic_result["answer"])
        print("-" * 80)

        if basic_result["sources"]:
            print("\n📚 SOURCES:")
            for i, source in enumerate(basic_result["sources"], 1):
                print(f"{i}. {source}")

        # Ask user if they want more in-depth research
        user_input = input("\n🤔 Would you like deeper research on this topic? (y/n): ")

        if user_input.strip().lower() in ['y', 'yes']:
            print("\nPerforming deeper recursive research. This may take a few minutes...\n")
            result = agent.recursive_research(query, max_depth=2)

            print("\n🎯 COMPREHENSIVE ANSWER:")
            print("=" * 80)
            print(result["final_answer"])
            print("=" * 80)

            print("\n🔍 RESEARCH PATH:")
            for i, step in enumerate(result["research_path"]):
                answer = step['answer']
                # Show an ellipsis only when the finding was actually cut off.
                preview = answer[:300] + ("..." if len(answer) > 300 else "")
                print(f"\nStep {i+1}:")
                print(f"Query: {step['query']}")
                print(f"Finding: {preview}")

    except Exception as e:
        print(f"\n❌ Error during research: {str(e)}")
        import traceback
        traceback.print_exc()

    print("\n✨ Done!")

In [None]:
# Run this cell to start the agent. It is interactive: run_agent() prompts for a
# research question, prints an initial answer, and then asks whether to run the
# deeper recursive research pass.
run_agent()

Initializing AgentRAG...
Loading embedding model on cpu...
Embedding model loaded successfully!
AgentRAG initialized!
Enter your research question: what is the purpose of agents

🔍 PROCESSING QUERY: what is the purpose of agents

Performing initial research...
Retrieving information from the web...
  Search query 1/3: what is the purpose of agents
  Found 3 documents
  Search query 2/3: latest information about what is the purpose of agents
  Found 3 documents
  Search query 3/3: what is the purpose of agents explanation
  Found 3 documents
No specific information found in vector store
Generating answer using LLM...

📝 INITIAL ANSWER:
--------------------------------------------------------------------------------
The purpose of agents can vary depending on the context in which they are used. In general, agents are individuals or entities that act on behalf of others to represent their interests, negotiate deals, provide services, or perform specific tasks. In various fields such as re