# Search Engine: RAG Pipeline with Semantic Caching

## 🎯 Learning Objectives

By the end of this notebook, you will understand:

1. **Retrieval-Augmented Generation (RAG)** - How to combine document retrieval with language models
2. **Semantic Caching** - How to cache responses based on meaning, not exact matches
3. **Vector Databases** - How embeddings enable semantic search
4. **Query Routing** - How to intelligently route queries to appropriate data sources
5. **Performance Optimization** - How caching improves response times and reduces costs

## 📚 What We'll Build

A complete semantic search engine that:
- Searches through 10-K financial documents and OpenAI documentation
- Uses intelligent routing to determine the best data source
- Implements semantic caching for roughly 10x faster responses on cache hits
- Falls back to web search when needed
- Provides detailed logging for learning and debugging

---
## 1. 🧠 Understanding the Core Concepts

### Retrieval-Augmented Generation (RAG)

RAG combines the power of large language models with external knowledge retrieval:

1. **Query** → User asks a question
2. **Retrieve** → Find relevant documents using semantic search
3. **Augment** → Add retrieved context to the original query
4. **Generate** → LLM generates answer using both query and context
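
The four steps above can be sketched offline as follows. This is a minimal illustration, not the pipeline built later in this notebook: the toy corpus, the word-overlap retriever, and the `echo_llm` stub are all assumptions standing in for a vector database and a real language model.

```python
from typing import Callable, List

# Toy corpus standing in for a real document store (illustrative data only)
DOCS = [
    "Paris is the capital of France.",
    "FAISS is a library for vector similarity search.",
    "A 10-K is an annual report filed with the SEC.",
]

def retrieve(query: str, docs: List[str], k: int = 2) -> List[str]:
    """Retrieve: rank documents by how many lowercase words they share with the query."""
    q_words = set(query.lower().replace("?", "").split())
    ranked = sorted(
        docs,
        key=lambda d: len(q_words & set(d.lower().rstrip(".").split())),
        reverse=True,
    )
    return ranked[:k]

def augment(query: str, context: List[str]) -> str:
    """Augment: build a prompt containing the numbered context and the question."""
    joined = "\n".join(f"[{i}] {c}" for i, c in enumerate(context, start=1))
    return f"Context:\n{joined}\n\nQuestion: {query}"

def rag_answer(query: str, llm: Callable[[str], str]) -> str:
    """Query -> Retrieve -> Augment -> Generate."""
    context = retrieve(query, DOCS)
    prompt = augment(query, context)
    return llm(prompt)

def echo_llm(prompt: str) -> str:
    """Hypothetical stand-in for an LLM: returns the first context line of the prompt."""
    return prompt.splitlines()[1]

answer = rag_answer("What is the capital of France?", echo_llm)
print(answer)
```

Swapping `retrieve` for an embedding search and `echo_llm` for a chat-completion call gives the shape of the real pipeline.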

**Why RAG?**
- ✅ Up-to-date information (not limited by training cutoff)
- ✅ Domain-specific knowledge
- ✅ Traceable sources
- ✅ Reduces hallucinations

### Semantic Caching

Traditional caching uses exact matches, but semantic caching understands meaning:

- **Traditional**: "What is Python?" ≠ "Tell me about Python"
- **Semantic**: Both queries have similar meaning and can share cached results

**How it works:**
1. Convert queries to embeddings (vectors)
2. Use similarity search to find semantically similar past queries
3. Return cached response if similarity > threshold
4. Otherwise, process query normally and cache result
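
As a rough sketch of these four steps, the class below keeps embeddings in a plain Python list and compares them with cosine similarity. The class name `ToySemanticCache`, the 0.9 threshold, and the hand-made 3-dimensional vectors are assumptions for illustration; a real cache would use a model's embeddings and an index such as FAISS.

```python
import numpy as np

class ToySemanticCache:
    """In-memory cache keyed by embedding similarity rather than exact text."""

    def __init__(self, threshold: float = 0.9):
        self.threshold = threshold   # minimum cosine similarity for a hit
        self.embeddings = []         # one unit vector per cached query
        self.responses = []          # cached response for each vector

    @staticmethod
    def _unit(v):
        v = np.asarray(v, dtype=float)
        return v / np.linalg.norm(v)

    def lookup(self, embedding):
        """Return the response of the most similar cached query, or None on a miss."""
        if not self.embeddings:
            return None
        sims = np.stack(self.embeddings) @ self._unit(embedding)
        best = int(np.argmax(sims))
        return self.responses[best] if sims[best] > self.threshold else None

    def store(self, embedding, response):
        """Cache a response under the (normalized) query embedding."""
        self.embeddings.append(self._unit(embedding))
        self.responses.append(response)

# Hand-made vectors standing in for real query embeddings (assumed values)
toy_cache = ToySemanticCache(threshold=0.9)
toy_cache.store([1.0, 0.1, 0.0], "Python is a programming language.")

hit = toy_cache.lookup([0.9, 0.2, 0.0])    # nearly the same direction -> cached response
miss = toy_cache.lookup([0.0, 0.0, 1.0])   # orthogonal direction -> None
```

On a miss the caller would process the query normally and then call `store` with the new embedding and response.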

**Benefits:**
- ⚡ 10x faster responses (0.1s vs 1-3s)
- 💰 Reduced API costs
- 🎯 Handles paraphrasing and similar questions

---

## 2. 🛠️ Environment Setup

Let's start by installing dependencies and setting up our environment.

In [1]:
# Install required packages. Uncomment if you have not installed
#!pip install openai qdrant-client faiss-cpu requests python-dotenv numpy pandas matplotlib seaborn tqdm ipywidgets

# For Nomic embeddings
#!pip install sentence-transformers

In [2]:
# Import required libraries
import os
import json
import time
import logging
from datetime import datetime
import functools
from typing import List, Dict, Any, Optional, Tuple, Callable
from dataclasses import dataclass
import warnings
import re

# Data processing
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML, Markdown, clear_output
import ipywidgets as widgets

# ML and embeddings
from sentence_transformers import SentenceTransformer
import faiss

# APIs
from openai import OpenAI,OpenAIError
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import requests

# Environment variables
from dotenv import load_dotenv
load_dotenv()

# Embedding models (used for text vectorization during retrieval)
from transformers import AutoTokenizer, AutoModel


---

## 3. 📄 Sources

For this demo we will set up the two main external sources of data:
- Web Search: For external queries (e.g., "latest Nvidia earnings") using ARES.
- Local Search from Qdrant Vector database
  1. Vectorized OpenAI documentation
  2. Vectorized 10-K financial filings

---

### Web Search
Let's set up ARES. The get_internet_content function calls ARES with a user query. Most of the code comes from a previous notebook, so see the RAG notebook in [module 1](https://github.com/hamzafarooq/multi-agent-course/blob/main/Module_1/Agentic_RAG/Agentic_RAG_Notebook.ipynb) for details.

In [3]:
ares_api_key=os.getenv('ARES_API_KEY')

import requests  # For sending HTTP POST requests to the ARES API

def get_internet_content(user_query: str, action: str):
    """
    Fetches a response from the internet using ARES-API based on the user's query.

    This function serves as the tool invoked when the router classifies a query
    as requiring real-time information beyond internal datasets—i.e., "INTERNET_QUERY".
    It sends the query to a live search API (ARES) and returns the result.

    Args:
        user_query (str): The user's question that needs a live answer.
        action (str): Route type (always expected to be "INTERNET_QUERY").

    Returns:
        str: Response text generated using internet search or an error message.
    """
    print("Getting your response from the internet 🌐 ...")

    # API endpoint for the ARES live search tool
    url = "https://api-ares.traversaal.ai/live/predict"

    # Payload structure expected by the ARES API
    payload = {"query": [user_query]}

    # Authentication and content headers for API access
    headers = {
        "x-api-key": ares_api_key,  # Your secret API key (should be securely loaded from environment)
        "content-type": "application/json"
    }

    try:
        # Send the query to the ARES API and check for success
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()

        # Extract and return the main response text from the API's nested JSON
        return response.json().get('data', {}).get('response_text', "No response received.")

    # Handle HTTP-level errors (e.g., 400s or 500s)
    except requests.exceptions.HTTPError as http_err:
        return f"HTTP error occurred: {http_err}"

    # Handle general connection, timeout, or request formatting issues
    except requests.exceptions.RequestException as req_err:
        return f"Request error occurred: {req_err}"

    # Catch-all for any unexpected failure
    except Exception as err:
        return f"An unexpected error occurred: {err}"

### Vector Database
Here we will store two types of data: 10-K financial filings and OpenAI documentation. We will take advantage of the metadata field so we can query by type: "10K_DOCUMENT_QUERY" or "OPENAI_QUERY". We need to set up the Qdrant vector database and load the embeddings of the individual documents. But first, what are embeddings?

**Embeddings** are vector representations of text that capture semantic meaning. Documents with similar meanings will have similar embeddings.

### How Embeddings Enable Semantic Search:

1. **Convert text to numbers**: "Python programming" → [0.1, -0.3, 0.7, ...]
2. **Measure similarity**: Use cosine similarity or euclidean distance
3. **Find relevant content**: Similar embeddings = similar meaning
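
The two similarity measures named above can be compared on a small example. The 3-dimensional vectors below are made up for illustration; real embeddings from the model used in this notebook have 768 dimensions.

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Straight-line distance between two vectors (0.0 = identical)."""
    return float(np.linalg.norm(a - b))

# Assumed toy embeddings: two Python-related texts and one unrelated text
python_lang = np.array([0.9, 0.1, 0.0])
python_code = np.array([0.8, 0.2, 0.1])
cooking = np.array([0.0, 0.2, 0.9])

print("cosine, related:     ", cosine_similarity(python_lang, python_code))
print("cosine, unrelated:   ", cosine_similarity(python_lang, cooking))
print("euclidean, related:  ", euclidean_distance(python_lang, python_code))
print("euclidean, unrelated:", euclidean_distance(python_lang, cooking))
```

Related texts should show a higher cosine similarity and a smaller Euclidean distance than unrelated ones.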

### Why Vector Databases?

- **Speed**: Optimized for similarity search across millions of vectors
- **Scale**: Handle large document collections efficiently
- **Flexibility**: Support metadata filtering and hybrid search
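
To make the idea concrete, here is a brute-force sketch of what a vector database does conceptually: filter on metadata, score the remaining vectors against the query, and return the top matches. The `points` list, the `source` payload key, and the `search` helper are hypothetical. Qdrant stores points with payloads in a broadly similar way, but uses optimized indexes instead of a Python loop.

```python
import numpy as np

# Each point pairs a vector with a payload (metadata plus the stored text)
points = [
    {"vector": [0.9, 0.1], "payload": {"source": "10k", "content": "Uber revenue table"}},
    {"vector": [0.8, 0.3], "payload": {"source": "openai", "content": "Embeddings guide"}},
    {"vector": [0.1, 0.9], "payload": {"source": "10k", "content": "Risk factors"}},
]

def search(query, points, source, limit=2):
    """Exhaustive cosine search over points whose payload matches `source`."""
    q = np.asarray(query, dtype=float)
    q = q / np.linalg.norm(q)
    scored = []
    for p in points:
        if p["payload"]["source"] != source:
            continue  # metadata filter: skip points from other sources
        v = np.asarray(p["vector"], dtype=float)
        scored.append((float(q @ (v / np.linalg.norm(v))), p["payload"]["content"]))
    scored.sort(reverse=True)  # highest similarity first
    return scored[:limit]

results = search([1.0, 0.0], points, source="10k")
for score, content in results:
    print(f"{score:.3f}  {content}")
```

The loop is linear in the number of points, which is exactly the cost a dedicated vector index is designed to avoid at scale.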

The documents we will use come from this GitHub repository: https://github.com/hamzafarooq/multi-agent-course.git. Let's create a Qdrant client and point it at the data path inside the repository, which loads the data that has already been converted to embeddings.

## 4. 💾 Semantic Cache Implementation

Now let's implement our semantic cache using FAISS for lightning-fast similarity search.

### How Semantic Caching Works:

1. **Store**: Query → Embedding → Cache with response
2. **Lookup**: New query → Embedding → Find similar cached query
3. **Match**: If similarity > threshold → Return cached response
4. **Miss**: Otherwise → Process query normally → Cache result

### Performance Benefits:
- **Speed**: 0.1s cache hit vs 1-3s API call
- **Cost**: No API charges for cached responses  
- **Intelligence**: Handles paraphrasing and similar questions

Let's create a caching class that stores cached results both in FAISS and in a file. The file lets us save a session's cached results and load them again in a later session. The code below adapts the code from the earlier [semantic cache notebook](https://github.com/hamzafarooq/multi-agent-course/blob/main/Module_3/Semantic_Cache/Semantic_cache_from_scratch.ipynb) to work with other sources: the ask method accepts different sourcing functions for caching.

In [4]:
#!git clone https://github.com/hamzafarooq/multi-agent-course.git
quadrant_client = QdrantClient(path="multi-agent-course/Module_1/Agentic_RAG/qdrant_data")


The get_text_embeddings function below converts the text into an embedding.

In [5]:
text_tokenizer = AutoTokenizer.from_pretrained("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True)
text_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True)

def get_text_embeddings(text):
    """
    Converts input text into a dense embedding using the Nomic embedding model.
    These embeddings are used to query Qdrant for semantically relevant document chunks.

    Args:
        text (str): The input text or query from the user.

    Returns:
        np.ndarray: A fixed-size vector representing the semantic meaning of the input.
    """
    # Tokenize and prepare input for the model
    inputs = text_tokenizer(text, return_tensors="pt", padding=True, truncation=True)

    # Forward pass to get model outputs
    outputs = text_model(**inputs)

    # Take the mean across all token embeddings to get a single vector (pooled representation)
    embeddings = outputs.last_hidden_state.mean(dim=1)

    # Convert to NumPy array and detach from computation graph
    return embeddings[0].detach().numpy()


<All keys matched successfully>


The function below embeds the query, runs a similarity search, and returns the matching content from the vector database.

In [6]:
def get_vectordb_content(user_query: str, action: str) -> str:
    """
    Retrieves relevant text chunks from the appropriate Qdrant collection
    based on the query type and returns a string format.

    This function powers the retrieval side of the RAG pipeline
    for queries that are classified as either OPENAI-related or 10-K related.

    Args:
        user_query (str): The user's input question.
        action (str): The classification label from the router (e.g., "OPENAI_QUERY", "10K_DOCUMENT_QUERY").

    Returns:
        str: output from the source
    """

    # Define mapping of routing labels to their respective Qdrant collections
    collections = {
        "OPENAI_QUERY": "opnai_data",           # Collection of OpenAI documentation embeddings
        "10K_DOCUMENT_QUERY": "10k_data"        # Collection of 10-K financial document embeddings
    }
 
    try:
        # Ensure that the provided action is valid
        if action not in collections:
            return "Invalid action type for retrieval."

        # Step 1: Convert the user query into a dense vector (embedding)
        try:
            query = get_text_embeddings(user_query)
        except Exception as embed_err:
            return f"Embedding error: {embed_err}"  # Fail early if embedding fails

        # Step 2: Retrieve top-matching chunks from the relevant Qdrant collection
        try:
            text_hits = quadrant_client.query_points(
                collection_name=collections[action],  # Choose the right collection based on routing
                query=query,                          # The embedding of the user's query
                limit=3                               # Fetch top 3 relevant chunks
            ).points
        except Exception as qdrant_err:
            return f"Vector DB query error: {qdrant_err}"  # Handle Qdrant access issues

        # Extract the raw content from the retrieved vector hits
        contents = [point.payload['content'] for point in text_hits]

        # If no relevant content is found, return early
        if not contents:
            return "No relevant content found in the database."
        return '\n'.join(contents)
            
    # Catch any unforeseen errors in the overall process
    except Exception as err:
        return f"Unexpected error: {err}"

Let's create the semantic caching class, whose ask method takes a sourcing function as an argument. We will also track some cache statistics.
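
Because the cache calls the sourcing function with the question only, a two-argument source such as get_vectordb_content needs its action bound first. A minimal sketch of that pattern with `functools.partial` follows; `fake_source` and this simplified `ask` are hypothetical stand-ins, not the notebook's functions.

```python
import functools
from typing import Callable

def fake_source(user_query: str, action: str) -> str:
    """Hypothetical two-argument source, shaped like get_vectordb_content."""
    return f"[{action}] results for: {user_query}"

def ask(question: str, source: Callable[[str], str]) -> str:
    """Stand-in for a cache miss: the source is called with the question only."""
    return source(question)

# Bind the routing label once, leaving a one-argument callable for the cache
bound_source = functools.partial(fake_source, action="10K_DOCUMENT_QUERY")
result = ask("What was Uber revenue in 2021?", bound_source)
print(result)
```

This keeps the cache independent of how each source is configured: anything that maps a question to a string can be plugged in.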

In [7]:
class SemanticCaching:

    def __init__(self, json_file='cache.json', clear_on_init=False):
        # Initialize Faiss index with Euclidean distance
        self.index = faiss.IndexFlatL2(768)  # Use IndexFlatL2 with Euclidean distance
        if self.index.is_trained:
            print('Index trained')

        # Initialize Sentence Transformer model
        self.encoder = SentenceTransformer('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)

        # Euclidean distance threshold for cache hits (lower = more similar)
        self.euclidean_threshold = 0.2

        # JSON file to persist cache entries
        self.json_file = json_file

        # Load cache or clear already loaded cache
        if clear_on_init:
          self.clear_cache()
        else:
          self.load_cache()

        self.stats = {
            "hits": 0,
            "miss": 0
        }

    def clear_cache(self):
        """
        Clears in-memory cache, resets FAISS index, and overwrites cache.json with an empty structure.
        """
        self.cache = {
            'questions': [],
            'embeddings': [],
            'answers': [],
            'response_text': []
        }
        self.index = faiss.IndexFlatL2(768)  # Reinitialize FAISS index
        self.save_cache()
        print("Semantic cache cleared.")

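    def load_cache(self):
        """Load existing cache or initialize empty structure."""
        try:
            with open(self.json_file, 'r') as file:
                self.cache = json.load(file)
        except FileNotFoundError:
            # Structure: lists of questions, embeddings, answers, and full response text
            self.cache = {'questions': [], 'embeddings': [], 'answers': [], 'response_text': []}
        # Rebuild the FAISS index so previously cached questions can be matched again
        if self.cache['embeddings']:
            self.index.add(np.array(self.cache['embeddings'], dtype='float32'))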

    def save_cache(self):
        """Persist cache back to disk."""
        with open(self.json_file, 'w') as file:
            json.dump(self.cache, file)

    def ask(self, question: str, source:Callable) -> str:
        """
        Returns a cached answer if within threshold, otherwise generates,
        caches, and returns a new answer.
        """
        start_time = time.time()
        try:
            # Encode the incoming question
            l = [question]
            # embedding = self.encoder.encode(l)
            embedding = self.encoder.encode(l, normalize_embeddings=True)

            # Search for the nearest neighbor in the index
            D, I = self.index.search(embedding, 1)

            # If a neighbor exists and is within threshold → cache hit.
            # IndexFlatL2 returns squared L2 distances; I is -1 when the index is empty.
            if I[0][0] != -1 and D[0][0] <= self.euclidean_threshold:
                row_id = int(I[0][0])
                # Score is 1 - squared distance, a rough similarity (higher = closer)
                print(f'Cache hit at row: {row_id} with score {1 - D[0][0]}')
                print(f"Time taken: {time.time() - start_time:.3f}s")
                self.stats["hits"] += 1
                return self.cache['response_text'][row_id]

            # Handle the case when there are not enough results or Euclidean distance is not met
            answer = self.generate_answer(question, source)

            # Append new entry to cache
            self.cache['questions'].append(question)
            self.cache['embeddings'].append(embedding[0].tolist())
            self.cache['answers'].append(answer)
            self.cache['response_text'].append(answer)
            self.index.add(embedding)
            self.save_cache()
            self.stats["miss"]+=1
            print(f"Time taken: {time.time() - start_time:.3f}s")

            return answer

        except Exception as e:
            raise RuntimeError(f"Error during 'ask' method: {e}")

    def generate_answer(self, question: str, source: Callable) -> str:
        """
        Generate a new answer by calling the supplied source function
        (e.g., web search or vector database retrieval) with the question.
        """
        try:
            result = source(question)
            return result
        except Exception as e:
            raise RuntimeError(f"Error during 'generate_answer' method: {e}")

    def analyze_cache_performance(self):
        """Analyze and visualize cache performance."""
    
        display(Markdown("# 📊 Cache Performance Analysis"))
        total_queries = self.stats["hits"] + self.stats["miss"]
        # Guard against division by zero when no queries have been asked yet
        hit_rate = self.stats["hits"] / total_queries * 100 if total_queries else 0.0
        estimated_cost_savings = self.stats["hits"] * 0.002  # Rough estimate: $0.002 per query
        time_saved = self.stats["hits"] * 2  # Assumes ~2s average LLM latency per avoided call
        # Display statistics
        stats_html = f"""
        <div style="background-color: #f8f9fa; padding: 20px; border-radius: 10px; margin: 20px 0;">
            <h3>🎯 Overall Performance</h3>
            <div style="display: flex; justify-content: space-around; flex-wrap: wrap;">
                <div style="text-align: center; margin: 10px;">
                    <h2 style="color: #007bff; margin: 0;">{self.stats['miss']}</h2>
                    <p><strong>Cache Entries</strong></p>
                </div>
                <div style="text-align: center; margin: 10px;">
                    <h2 style="color: #28a745; margin: 0;">{hit_rate:.1f}%</h2>
                    <p><strong>Hit Rate</strong></p>
                </div>
                <div style="text-align: center; margin: 10px;">
                    <h2 style="color: #ffc107; margin: 0;">{time_saved:.1f}s</h2>
                    <p><strong>Time Saved</strong></p>
                </div>
                <div style="text-align: center; margin: 10px;">
                    <h2 style="color: #17a2b8; margin: 0;">${estimated_cost_savings:.3f}</h2>
                    <p><strong>Est. Cost Saved</strong></p>
                </div>
            </div>
        </div>
        
        <div style="background-color: #e8f5e8; padding: 15px; border-radius: 10px; margin: 20px 0;">
            <h3>📈 Detailed Statistics</h3>
            <p><strong>Total Queries:</strong> {self.stats['hits']+self.stats['miss']}</p>
            <p><strong>Cache Hits:</strong> {self.stats['hits']}</p>
            <p><strong>Cache Misses:</strong> {self.stats['miss']}</p>
            <p><strong>Average Speedup:</strong> ~10x faster for cached responses</p>
        </div>
        """
        
        display(HTML(stats_html))

In [8]:
cache = SemanticCaching(clear_on_init=True)

Index trained


<All keys matched successfully>


Semantic cache cleared.


### Internet Query

To test it, let's check whether the cache.ask call really caches the result.

In [9]:
get_internet = functools.partial(get_internet_content, action="INTERNET_QUERY")
cache.ask("What is the capital of France?", get_internet)

Getting your response from the internet 🌐 ...
Time taken: 3.934s


'The capital of France is Paris.'

---
Let's try again with a question that is semantically equivalent.

---

In [10]:
cache.ask("What is France's capital?", get_internet)

Cache hit at row: 0 with score 0.9677788019180298
Time taken: 0.063s


'The capital of France is Paris.'

---
Notice that the second query scored above 0.9, well inside the cache's distance threshold of 0.2 (a score of at least 0.8), so it was served from the cache.
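
The printed score is 1 minus the distance reported by FAISS. `IndexFlatL2` reports squared Euclidean distances, and for unit-length vectors (the cache encodes with `normalize_embeddings=True`) the squared distance equals 2 − 2·cosine. The sketch below checks that relationship with NumPy on made-up vectors. Under it, the 0.2 threshold corresponds to a cosine similarity of at least 0.9.

```python
import numpy as np

def unit(v):
    """Scale a vector to unit length, as normalize_embeddings=True does."""
    v = np.asarray(v, dtype=float)
    return v / np.linalg.norm(v)

# Assumed toy vectors standing in for two similar query embeddings
a = unit([1.0, 0.2, 0.0])
b = unit([0.9, 0.3, 0.1])

squared_l2 = float(np.sum((a - b) ** 2))  # the quantity a flat L2 index reports
cosine = float(a @ b)

# For unit vectors: squared_l2 == 2 - 2 * cosine
print(squared_l2, 2 - 2 * cosine)

# The cache's printed score is 1 - squared_l2, which equals 2 * cosine - 1
score = 1 - squared_l2
print(score, 2 * cosine - 1)

# A squared-distance threshold of 0.2 therefore means cosine >= 0.9
threshold = 0.2
min_cosine = 1 - threshold / 2
```

So the score is a convenient similarity-like number, but it is not the cosine similarity itself.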

---

### Vector Database Query

We see the same behavior with the vector database when two queries are semantically similar.

In [11]:
get_vectordb = functools.partial(get_vectordb_content, action="10K_DOCUMENT_QUERY")
out = cache.ask("What was Uber revenue in 2021?", get_vectordb)
out[:300]

Time taken: 0.074s


'(6,946)\n(1,025)\nProvision for (benefit from) income taxes\n(192)\n(492)\nLoss from equity method investments\n(34)\n(37)\nNet loss including non-controlling interests\n(6,788)\n(570)\nLess: net loss attributable to non-controlling interests, net of tax\n(20)\n(74)\nNet loss attributable to Uber Technologies, In'

In [12]:
out = cache.ask("How much was Uber's 2021 revenue?", get_vectordb)
out[:300]

Cache hit at row: 1 with score 0.931459367275238
Time taken: 0.027s


'(6,946)\n(1,025)\nProvision for (benefit from) income taxes\n(192)\n(492)\nLoss from equity method investments\n(34)\n(37)\nNet loss including non-controlling interests\n(6,788)\n(570)\nLess: net loss attributable to non-controlling interests, net of tax\n(20)\n(74)\nNet loss attributable to Uber Technologies, In'

## 5. LLM Response

The cache.ask function can wrap any part of the pipeline, including the source functions, but we want to cache the LLM response rather than the raw sources, so let's create a RAG response function. First, let's create the routing and the sub-query division. Routing decides whether to use the vector database's 10-K filings, the vector database's OpenAI documentation, or the internet.

### Router & Sub-Query Division

In [13]:
openai_api_key = os.getenv('OPENAI_API_KEY')

# Initialize the OpenAI client with the retrieved API key
# This client will be used for:
# - Query classification via the router prompt
# - Potentially generating responses from retrieved context
openaiclient = OpenAI(api_key=openai_api_key)

def router(user_query: str) -> Dict[str, Any]:
    router_system_prompt =f"""
    As a professional query router, your objective is to correctly classify user input into one of three categories based on the source most relevant for answering the query:
    1. "OPENAI_QUERY": If the user's query appears to be answerable using information from OpenAI's official documentation, tools, models, APIs, or services (e.g., GPT, ChatGPT, embeddings, moderation API, usage guidelines).
    2. "10K_DOCUMENT_QUERY": If the user's query pertains to a collection of documents from the 10k annual reports, datasets, or other structured documents, typically for research, analysis, or financial content.
    3. "INTERNET_QUERY": If the query is neither related to OpenAI nor the 10k documents specifically, or if the information might require a broader search (e.g., news, trends, tools outside these platforms), route it here.

    Your decision should be made by assessing the domain of the query.

    Always respond in this valid JSON format:
    {{
        "action": "OPENAI_QUERY" or "10K_DOCUMENT_QUERY" or "INTERNET_QUERY",
        "reason": "brief justification",
        "answer": "AT MAX 5 words answer. Leave empty if INTERNET_QUERY"
    }}

    EXAMPLES:

    - User: "How to fine-tune GPT-3?"
    Response:
    {{
        "action": "OPENAI_QUERY",
        "reason": "Fine-tuning is OpenAI-specific",
        "answer": "Use fine-tuning API"
    }}

    - User: "Where can I find the latest financial reports for the last 10 years?"
    Response:
    {{
        "action": "10K_DOCUMENT_QUERY",
        "reason": "Query related to annual reports",
        "answer": "Access through document database"
    }}

    - User: "Top leadership styles in 2024"
    Response:
    {{
        "action": "INTERNET_QUERY",
        "reason": "Needs current leadership trends",
        "answer": ""
    }}

    - User: "What's the difference between ChatGPT and Claude?"
    Response:
    {{
        "action": "INTERNET_QUERY",
        "reason": "Cross-comparison of different providers",
        "answer": ""
    }}

    Strictly follow this format for every query, and never deviate.
    User: {user_query}
    """

    try:
        # Query the gpt-4.1 model with the router prompt and user input
        response = openaiclient.chat.completions.create(
            model="gpt-4.1",
            messages=[{"role": "system", "content": router_system_prompt}]
        )

        # Extract and parse the model's JSON response
        task_response = response.choices[0].message.content
        json_match = re.search(r"\{.*\}", task_response, re.DOTALL)
        json_text = json_match.group()
        parsed_response = json.loads(json_text)
        return parsed_response

    # Handle OpenAI API errors (e.g., rate limits, authentication)
    except OpenAIError as api_err:
        return {
            "action": "INTERNET_QUERY",
            "reason": f"OpenAI API error: {api_err}",
            "answer": ""
        }

    # Handle case where model response isn't valid JSON
    except json.JSONDecodeError as json_err:
        return {
            "action": "INTERNET_QUERY",
            "reason": f"JSON parsing error: {json_err}",
            "answer": ""
        }

    # Catch-all for any other unforeseen issues
    except Exception as err:
        return {
            "action": "INTERNET_QUERY",
            "reason": f"Unexpected error: {err}",
            "answer": ""
        }

Let's add sub-query division, where we take a complex query and break it down into simple, atomic queries.

In [14]:
def sub_queries(user_query):
    sub_queries_prompt = f"""
    You are a query router. If the input contains multiple distinct questions, break it into sub-questions, making sure each question has enough context. Otherwise, keep it as one.

      Rules:
      - If a time frame, location, or other modifier applies to the whole compound sentence, propagate it to *each* sub-question.
      - Ensure each sub-question is fully self-contained and unambiguous.
      - Use the same tense and wording as the original query, unless clarification is required for completeness.

    Return a JSON object like:
    {{
        "subQuestions": ["..."]
    }}
    """

    system_prompt = f"""
    {sub_queries_prompt}

    Query: "{user_query}"
    Output:
    """
    response = openaiclient.chat.completions.create(
          model="gpt-4.1",
          messages=[
              {"role": "system", "content": system_prompt},
          ]
      )
    return response.choices[0].message.content
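
The function above returns the model's raw text, which may include extra prose around the JSON. A defensive parser along the following lines is one way to handle that. The helper name `parse_sub_questions` and its fallback behavior are assumptions, not part of the original notebook.

```python
import json
import re
from typing import List

def parse_sub_questions(raw: str, fallback: str) -> List[str]:
    """Extract the "subQuestions" list from a model reply, or fall back to the original query."""
    match = re.search(r"\{.*\}", raw, re.DOTALL)  # tolerate extra text around the JSON object
    if match:
        try:
            questions = json.loads(match.group()).get("subQuestions", [])
        except json.JSONDecodeError:
            questions = []
        if isinstance(questions, list):
            # Keep only non-empty strings
            cleaned = [q.strip() for q in questions if isinstance(q, str) and q.strip()]
            if cleaned:
                return cleaned
    return [fallback]  # treat the whole query as a single question

reply = 'Sure! {"subQuestions": ["What was Uber revenue in 2021?", "What was Lyft revenue in 2021?"]}'
print(parse_sub_questions(reply, fallback="original query"))
print(parse_sub_questions("not json at all", fallback="original query"))
```

Falling back to the original query means a malformed reply degrades to ordinary single-question handling instead of raising an error.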

### RAG Components

We will combine several functions to complete the RAG system.

The rag_formatted_response function returns the LLM response, grounded in the context retrieved from the vector database or the internet.

In [15]:
def rag_formatted_response(user_query: str, context: str):
    """
    Generate a response to the user query using the provided context,
    with article references formatted as [1][2], etc. from the vector db, or the response from an internet search.

    This function performs the final step in the RAG pipeline—synthesizing an answer
    from retrieved document chunks (context). It prompts the model to generate a
    grounded response, explicitly citing sources using a reference format.

    Args:
        user_query (str): The user's original question.
        context (str): Text chunks retrieved from Qdrant (10-K or OpenAI docs) in str format, or the result of an internet search.

    Returns:
        str: A generated response grounded in the retrieved context, with numbered citations.
    """

    # Construct a RAG prompt that includes both:
    # 1. The user's query
    # 2. The supporting context documents
    # The prompt instructs the model to answer using only the provided context,
    # and to include citations like [1], [2], etc. based on chunk IDs or order.
    rag_prompt = f"""
       Based on the given context, answer the user query: {user_query}\nContext:\n{context}
       and employ references to the ID of articles provided [ID], ensuring their relevance to the query.
       The referencing should always be in the format of [1][2]... etc.
    """

    # Call the gpt-4 model to generate the response using the RAG-style prompt
    response = openaiclient.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": rag_prompt},
        ]
    )

    # Return the model's generated answer
    return response.choices[0].message.content

The retrieve_and_response function queries the appropriate Qdrant collection for the given route and then uses the function above to get a response from the LLM.

In [16]:
def retrieve_and_response(user_query: str, action: str):
    """
    Retrieves relevant text chunks from the appropriate Qdrant collection
    based on the query type, then generates a response using RAG.

    This function powers the retrieval and response generation pipeline
    for queries that are classified as either OPENAI-related or 10-K related.
    It uses semantic search to fetch relevant context from a Qdrant vector store
    and then generates a response using that context via a RAG prompt.

    Args:
        user_query (str): The user's input question.
        action (str): The classification label from the router (e.g., "OPENAI_QUERY", "10K_DOCUMENT_QUERY").

    Returns:
        str: A model-generated response grounded in retrieved documents, or an error message.
    """

    # Define mapping of routing labels to their respective Qdrant collections
    collections = {
        "OPENAI_QUERY": "opnai_data",           # Collection of OpenAI documentation embeddings
        "10K_DOCUMENT_QUERY": "10k_data"        # Collection of 10-K financial document embeddings
    }

    try:
        # Ensure that the provided action is valid
        if action not in collections:
            return "Invalid action type for retrieval."

        # Step 1: Convert the user query into a dense vector (embedding)
        try:
            query = get_text_embeddings(user_query)
        except Exception as embed_err:
            return f"Embedding error: {embed_err}"  # Fail early if embedding fails

        # Step 2: Retrieve top-matching chunks from the relevant Qdrant collection
        try:
            text_hits = quadrant_client.query_points(
                collection_name=collections[action],  # Choose the right collection based on routing
                query=query,                          # The embedding of the user's query
                limit=3                               # Fetch top 3 relevant chunks
            ).points
        except Exception as qdrant_err:
            return f"Vector DB query error: {qdrant_err}"  # Handle Qdrant access issues

        # Extract the raw content from the retrieved vector hits
        contents = [point.payload['content'] for point in text_hits]

        # If no relevant content is found, return early
        if not contents:
            return "No relevant content found in the database."

        # Step 3: Pass the retrieved context to the LLM to generate a response
        try:
            # Number each chunk so the model can cite it as [1], [2], ...
            context = '\n'.join(f"[{i}] {chunk}" for i, chunk in enumerate(contents, start=1))
            response = rag_formatted_response(user_query, context)
            return response
        except Exception as rag_err:
            return f"RAG response error: {rag_err}"  # Handle generation failures

    # Catch any unforeseen errors in the overall process
    except Exception as err:
        return f"Unexpected error: {err}"

Finally, the agentic_semantic_rag function returns the final result, either from the cache or from the LLM with retrieved context.

In [17]:

routes = {
    "OPENAI_QUERY": retrieve_and_response,
    "10K_DOCUMENT_QUERY": retrieve_and_response,
    "INTERNET_QUERY": get_internet_content,
}

def agentic_semantic_rag(user_query: str):
    """
    Main function that runs the full Agentic RAG system with semantic cache and sub-query division.

    This function takes a user's question, decides what type of query it is (OpenAI-related,
    financial document-related, or general internet), and then calls the right function
    to handle it. Finally, it prints out the full conversation and response.

    Args:
        user_query (str): The user's input question.

    Returns:
        str: The combined response for all sub-questions (also printed to the console)
    """

    #  Terminal color codes to make the printed output easier to read and visually structured
    CYAN = "\033[96m"
    GREY = "\033[90m"
    BOLD = "\033[1m"
    RESET = "\033[0m"

    try:
        # Step 1: Print the user's original question to the console
        print(f"{BOLD}{CYAN}👤 User Query:{BOLD}{GREY} {user_query}\n")

        # Step 2: Split the question into sub-queries, then use the router (powered by GPT) to pick a route for each one
        subQuestions = json.loads(sub_queries(user_query))['subQuestions']
        output = []
        for subQuestion in subQuestions:
            
            try:
                response = router(subQuestion)
            except Exception as route_err:
                # If something goes wrong while classifying the query, show an error message
                print(f"{BOLD}{CYAN}🤖 BOT RESPONSE:{RESET}\n")
                print(f"Routing error: {route_err}\n")
                return
    
            # Extract the routing decision and the reason behind it
            action = response.get("action")  # e.g., "OPENAI_QUERY"
            reason = response.get("reason")  # e.g., "Related to OpenAI tools"
    
            # Step 3: Show the selected route and why it was chosen
            print(f"{GREY}📍 Selected Route: {action}")
            print(f"📝 Reason: {reason}")
            print(f"⚙️ Processing query...{RESET}\n")
    
            # Step 4: Call the correct function depending on the route (retrieval or web search)
            try:
                route_function = routes.get(action)  # Find the function to use for this route
                if route_function:
                    # Bind the route so the cache can call the function with just the query
                    rf = functools.partial(route_function, action=action)
                    result = cache.ask(subQuestion, rf)
                else:
                    result = f"Unsupported action: {action}"  # Catch unknown routing types
            except Exception as exec_err:
                result = f"Execution error: {exec_err}"  # Handle failure in the chosen route function

            # Collect every sub-answer, including error messages, so failures are not silently dropped
            output.append(result)

        # Step 5: Combine the sub-answers and print the final response to the user
        result = '\n'.join(output)
        print(f"{BOLD}{CYAN}🤖 BOT RESPONSE:{RESET}\n")
        print(f"{result}\n")
        return result

    except Exception as err:
        # Catch-all for any unexpected errors in the overall logic
        print(f"{BOLD}{CYAN}🤖 BOT RESPONSE:{RESET}\n")
        print(f"Unexpected error occurred: {err}\n")
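
The call `cache.ask(subQuestion, rf)` above relies on a simple contract: the cache receives the query plus a one-argument callable, and only invokes the callable on a miss. The sketch below illustrates that contract under loud assumptions: `ToySemanticCache`, `toy_embed`, and `fake_route` are hypothetical stand-ins, and the bag-of-words vector replaces the real embedding model, so it only matches queries that share words rather than meaning.

```python
import functools
import math
import re
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag-of-words count vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToySemanticCache:
    """Hypothetical cache exposing the same ask(query, fn) shape used above."""

    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.entries = []  # list of (embedding, cached response)

    def ask(self, query: str, fn):
        query_vec = toy_embed(query)
        # Find the most similar previously seen query, if any
        best_score, best_response = 0.0, None
        for vec, response in self.entries:
            score = cosine(query_vec, vec)
            if score > best_score:
                best_score, best_response = score, response
        if best_response is not None and best_score >= self.threshold:
            return best_response  # cache hit: skip retrieval and generation
        response = fn(query)  # cache miss: run the routed function
        self.entries.append((query_vec, response))
        return response


calls = []  # records every time the routed function actually runs


def fake_route(query: str, action: str) -> str:
    """Placeholder for retrieve_and_response / get_internet_content."""
    calls.append((query, action))
    return f"answer for {action}"


toy_cache = ToySemanticCache(threshold=0.8)
bound = functools.partial(fake_route, action="10K_DOCUMENT_QUERY")
first = toy_cache.ask("What was Uber's revenue in 2021?", bound)
second = toy_cache.ask("what was uber's revenue in 2021", bound)  # same words, served from cache
third = toy_cache.ask("How do text embeddings work?", bound)  # no shared words, so a miss
```

`functools.partial` is what lets one cache serve every route: the route-specific `action` argument is bound up front, so the cache never needs to know which data source it is fronting.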


In [18]:
user_input = ""  # Initialize user_input to an empty string
cache.clear_cache()  # Start the session with an empty cache
while user_input != "quit":  # Loop continues until user types "quit"
    user_input = input("Enter something (type 'quit' to exit): ")
    if user_input != "quit":
        response = agentic_semantic_rag(user_input)
        print("-" * 70)

print("Query Search Ended!")

Semantic cache cleared.


Enter something (type 'quit' to exit):  What were Uber and Lyft’s revenues in 2021?

👤 User Query: What were Uber and Lyft’s revenues in 2021?

📍 Selected Route: 10K_DOCUMENT_QUERY
📝 Reason: Question about company's financials
⚙️ Processing query...

Time taken: 1.495s
📍 Selected Route: 10K_DOCUMENT_QUERY
📝 Reason: Revenue figure from annual report
⚙️ Processing query...

Time taken: 1.235s
🤖 BOT RESPONSE:

Uber's revenue in 2021 was $17,455 million [2].
The context does not provide information on Lyft's revenue in 2021.

----------------------------------------------------------------------


Enter something (type 'quit' to exit):  Uber's 2021 revenue?

👤 User Query: Uber's 2021 revenue?

📍 Selected Route: 10K_DOCUMENT_QUERY
📝 Reason: Request for historical financial data
⚙️ Processing query...

Cache hit at row: 0 with score 1.0
Time taken: 0.065s
🤖 BOT RESPONSE:

Uber's revenue in 2021 was $17,455 million [2].

----------------------------------------------------------------------


Enter something (type 'quit' to exit):  Give me revenue numbers by Uber in 2021

👤 User Query: Give me revenue numbers by Uber in 2021

📍 Selected Route: 10K_DOCUMENT_QUERY
📝 Reason: Uber's revenue is found in 10k reports
⚙️ Processing query...

Cache hit at row: 0 with score 1.0
Time taken: 0.162s
🤖 BOT RESPONSE:

Uber's revenue in 2021 was $17,455 million [2].

----------------------------------------------------------------------


Enter something (type 'quit' to exit):  How do I authenticate with OpenAI's API?

👤 User Query: How do I authenticate with OpenAI's API?

📍 Selected Route: OPENAI_QUERY
📝 Reason: OpenAI API authentication is specific
⚙️ Processing query...

Time taken: 8.015s
🤖 BOT RESPONSE:

To authenticate with OpenAI's API, you will need to use API keys for authentication [1]. The process is as follows:

- First, create or manage your API key in your organization settings [1].
- Remember not to share your API key or expose it in client-side code [1].
- API keys should be securely loaded from an environment variable or key management service on the server [1].
- Provide your API keys via HTTP Bearer authentication [1].
  
You can pass a header to specify which organization and project to use for an API request, especially if you belong to multiple organizations or access projects through a legacy user API key [1].

Here is an example of the curl command for authorization:
```
curl https://api.openai.com/v1/models \
  -H "Authorization: Beare
```

Enter something (type 'quit' to exit):  How do text embeddings work?

👤 User Query: How do text embeddings work?

📍 Selected Route: OPENAI_QUERY
📝 Reason: Embeddings are part of OpenAI API
⚙️ Processing query...

Time taken: 6.270s
🤖 BOT RESPONSE:

Text embeddings work by converting pieces of text data into vector form that preserves aspects of their content or meaning. This is done through processing the text in chunks known as tokens, which represent common sequences of characters[1]. Text that has similar content or meaning will have closer embeddings than those that are unrelated[2]. OpenAI offers text embedding models that accept a text string as input and produce an embedding vector as output. These embeddings serve useful for tasks like search, clustering, recommendation systems, anomaly detection, and classification among others[2]. For example, you can request embeddings through OpenAI's API by providing the text you want to input, specifying the model, and the encoding format you want the embeddings retur

Enter something (type 'quit' to exit):  Explain text embeddings to me

👤 User Query: Explain text embeddings to me

📍 Selected Route: OPENAI_QUERY
📝 Reason: Text embeddings are OpenAI feature
⚙️ Processing query...

Cache hit at row: 3 with score 0.8538163304328918
Time taken: 0.082s
🤖 BOT RESPONSE:

Text embeddings work by converting pieces of text data into vector form that preserves aspects of their content or meaning. This is done through processing the text in chunks known as tokens, which represent common sequences of characters[1]. Text that has similar content or meaning will have closer embeddings than those that are unrelated[2]. OpenAI offers text embedding models that accept a text string as input and produce an embedding vector as output. These embeddings serve useful for tasks like search, clustering, recommendation systems, anomaly detection, and classification among others[2]. For example, you can request embeddings through OpenAI's API by providing the text you want to input, specifying the model, a

Enter something (type 'quit' to exit):  quit


Query Search Ended!


### STATS!

Take a look at some caching statistics. The cost and time figures are only rough estimates.

In [19]:
cache.analyze_cache_performance()

# 📊 Cache Performance Analysis
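
As a rough cross-check, the "Time taken" lines logged in the session above can be summarized by hand. The sketch below is not the notebook's `analyze_cache_performance`; it simply copies the seven logged timings into a list and computes a hit rate and average latencies. Seven sub-queries is a very small sample, so treat the resulting numbers as illustrative only.

```python
from statistics import mean

# (seconds, was_cache_hit) pairs copied from the "Time taken" lines in the session above
timings = [
    (1.495, False),
    (1.235, False),
    (0.065, True),
    (0.162, True),
    (8.015, False),
    (6.270, False),
    (0.082, True),
]

hit_times = [t for t, hit in timings if hit]
miss_times = [t for t, hit in timings if not hit]

hit_rate = len(hit_times) / len(timings)
avg_hit = mean(hit_times)
avg_miss = mean(miss_times)
speedup = avg_miss / avg_hit  # how many times faster a hit is than a miss, on average

print(f"Hit rate:         {hit_rate:.0%}")
print(f"Avg hit latency:  {avg_hit:.3f}s")
print(f"Avg miss latency: {avg_miss:.3f}s")
print(f"Speedup on hits:  {speedup:.1f}x")
```

Separating hits from misses before averaging matters here: a single blended average would hide the gap between sub-second cache hits and multi-second LLM calls.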

## 6. 📖 Additional Resources

### 📚 Further Reading

- [RAG Papers and Research](https://arxiv.org/abs/2005.11401) - Original RAG paper
- [Vector Database Comparison](https://liquidmetal.ai/casesAndBlogs/vector-comparison/) 
- [Semantic Search Best Practices](https://www.pinecone.io/learn/semantic-search/)
- [LangChain RAG Tutorial](https://python.langchain.com/docs/tutorials/retrievers/)

### 🛠️ Tools and Frameworks

- **Vector Databases**: Qdrant, Pinecone, Weaviate, Chroma
- **Embedding Models**: OpenAI, Cohere, Sentence-Transformers
- **RAG Frameworks**: LangChain, LlamaIndex, Haystack
- **Monitoring**: Weights & Biases, MLflow, Arize

### 🎓 Next Learning Steps

1. **Advanced RAG**: Multi-hop reasoning, graph-based retrieval
2. **Fine-tuning**: Custom embedding models for domain data
3. **Evaluation**: RAGAS, answer quality metrics
4. **Production**: Deployment, scaling, monitoring

---

*Thank you for following along with this comprehensive semantic search engine tutorial! 🚀*