## Overview

In this lab, we will deploy a Retrieval-Augmented Generation (RAG) powered AI chatbot. By cloning this project to your [NVIDIA AI Workbench](https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/workbench/), you have already taken care of much of the configuration and many of the prerequisites for this blueprint.

This system enhances traditional large language models by incorporating external knowledge, allowing the model to provide more accurate and contextually relevant responses.

The system works in two key stages:

### Data Ingestion Pipeline
- **Ingests and processes enterprise data**: Ingests and processes user documents. Processing includes detecting and extracting graphic elements such as tables, charts, and infographics.

- **Creates embeddings**: Converts text into vector representations that capture semantic meaning

- **Builds vector database**: Stores these embeddings in a searchable database for efficient retrieval

### Query / Response Pipeline
- **Embeds user queries**: Converts user questions into vector embeddings

- **Retrieves relevant context**: Finds semantically similar documents in the vector database

- **Reranks results**: Prioritizes the most relevant information

- **Generates responses**: Uses an LLM to craft comprehensive responses based on retrieved context

Both data and queries are encoded as vectors through an embedding process, enabling efficient similarity search based on semantic meaning rather than simple keyword matching.
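
To make the idea of similarity search concrete, here is a minimal sketch using hand-made three-dimensional vectors. The vectors, document names, and the `top_k` helper are illustrative assumptions; the blueprint itself uses an embedding model and a vector database for this step.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, doc_vecs, k=2):
    """Return the names of the k documents most similar to the query."""
    ranked = sorted(
        doc_vecs,
        key=lambda name: cosine_similarity(query_vec, doc_vecs[name]),
        reverse=True,
    )
    return ranked[:k]


# Hand-made "embeddings" (illustrative values only, not model output).
toy_docs = {
    "football_history": [0.9, 0.1, 0.0],
    "gpu_datasheet": [0.0, 0.2, 0.9],
    "world_cup_winners": [0.8, 0.3, 0.1],
}
toy_query = [1.0, 0.2, 0.0]

print(top_k(toy_query, toy_docs))
```

Documents whose vectors point in a similar direction to the query rank first, regardless of whether they share any keywords with it.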

## Table of Contents

| Section | Description |
|---------|-------------|
| 1. [Spin Up the Blueprint](#1.-Spin-Up-the-Blueprint) | Set up Docker containers and deploy services |
| 2. [Interact with the Microservices](#2.-Interact-with-the-microservices) | Test API endpoints and microservice functionality |
| 3. [Interacting with the Chatbot](#3.-Interacting-with-the-Chatbot) | Use the chatbot with and without knowledge base |
| 4. [Understanding Document Retrieval and Reranking](#4.-Understanding-Document-Retrieval-and-Reranking) | Understand how the RAG system works |

## 1. Spin Up the Blueprint

Congratulations! You have cloned this blueprint to your NVIDIA AI Workbench. Now, let's walk through how to spin up the blueprint so you can interact with it through the chatbot interface or, later on, through this deep-dive notebook.

The blueprint can run against locally deployed NIM microservices or against NVIDIA-hosted **Build Endpoints**, which keep the deployment lightweight. Both options are described below.

**Note:** By default, this blueprint deploys the referenced NIM microservices locally. This requires at least one of the following GPU configurations:

 - 2xH100
 - 2xB200
 - 3xA100

#### 1.1.1 Use Locally-Deployed Microservices (default)

1. On the **Project Dashboard** in AI Workbench, select ``ingest``, ``rag``, ``vectordb``, and ``local`` compose profiles under the **Compose** section.

   - Note: ``observability`` and ``guardrails`` are optional profiles you may enable.

1. Select **Start**. The compose services may take several minutes to pull and build.

1. When all compose services are ready, open the frontend at the host's IP address, e.g., ``http://<ip_addr>:8090``.

1. You can now interact with the RAG Chatbot through its browser interface. 

#### 1.1.2 Use Build Endpoints

1. Inside JupyterLab, locate the ``variables.env`` file at the top level of this repo.

1. Make the following edits: 

   - Comment out the variables for ``on-prem NIMs``
   - Uncomment the variables for using ``cloud NIMs``
   - Save your changes

1. On the **Project Dashboard** in AI Workbench, select ``ingest``, ``rag``, and ``vectordb`` compose profiles under the **Compose** section.

   - Note: ``observability`` and ``guardrails`` are optional profiles you may enable.

1. Select **Start**. The compose services may take several minutes to pull and build.

1. When the compose services are ready, open the frontend at the host's IP address, e.g., ``http://<ip_addr>:8090``.

1. You can now interact with the RAG Chatbot through its browser interface.
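
If the frontend does not load right away, the services may still be starting. The following is a minimal sketch of a readiness poll, assuming it runs somewhere the RAG server's `/v1/health` endpoint (used later in this notebook) is reachable. The helper names `http_ok` and `wait_until_healthy` and the retry settings are hypothetical, not part of the blueprint.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=5.0):
    """Return True if a GET request to url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_healthy(url, attempts=30, delay=10.0, probe=http_ok):
    """Poll url until probe(url) succeeds or the attempts run out."""
    for attempt in range(1, attempts + 1):
        if probe(url):
            return True
        if attempt < attempts:
            time.sleep(delay)  # give the services time to finish starting
    return False


# Example call (assumes the compose service name and port used in this notebook):
# wait_until_healthy("http://rag-server:8081/v1/health")
```

The `probe` argument is injectable so the loop can be exercised without a running server.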

### 1.2 Testing the Chatbot

To evaluate how the RAG system works, try these experiments:

1. Create a new collection

2. Add source -- upload a document

3. Try asking a question about the document without selecting the new Collection (knowledge base off)

4. Ask a question with knowledge base on

## 2. Interact with the microservices
In this section, we'll explore how to directly interact with the various microservices that make up our RAG system using their APIs.


### 2.1 RAG Server API Usage

#### 2.1.1 Test the OpenAI-compatible /chat/completions endpoint
- Ensure the compose services are running and ready following the steps above.

##### 2.1.1.1 Setup Base Configuration

Here we will set up the base configuration for the RAG server and a helper function to print the API responses.



In [None]:
import json
import os

import aiohttp
import requests

# Name for the compose service
IPADDRESS = "rag-server"

# Port number for the service
rag_server_port = "8081"

# Base URL constructed from IP and port for making API requests
RAG_BASE_URL = f"http://{IPADDRESS}:{rag_server_port}"


async def print_raw_response(response):
    """Helper function to print API responses."""
    try:
        response_json = await response.json()
        print(json.dumps(response_json, indent=2))
    except (aiohttp.ClientResponseError, json.JSONDecodeError):
        # Body is not JSON (wrong content type or malformed), so show it as text
        print(await response.text())

##### 2.1.1.2 Test the RAG server health endpoint and chat completion endpoint
We will test both the health endpoint and the chat completion endpoint of our RAG server.


- **Health Check Endpoint purpose:**
This endpoint performs a health check on the server. It returns a 200 status code if the server is operational.

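- **Chat Completion Endpoint purpose:**
This endpoint accepts user queries and returns an LLM-generated response. When `use_knowledge_base` is enabled, it first converts the query to an embedding, retrieves semantically similar document chunks from the knowledge base, and passes them to the LLM as context.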
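
For non-streaming calls, the answer text sits under `choices -> message -> content`, the same structure the streaming helper later in this notebook expects. The sketch below shows one way to pull it out; `extract_answer` is a hypothetical helper and the sample response is hand-written, not real server output.

```python
def extract_answer(response_json):
    """Return the assistant text from a chat-completion style response."""
    if "error" in response_json:
        raise ValueError(f"Server reported an error: {response_json['error']}")
    # Fall back to an empty choice so a missing or empty list does not raise.
    choices = response_json.get("choices") or [{}]
    return choices[0].get("message", {}).get("content", "")


# Hand-written example response (assumed shape, for illustration only).
sample = {"choices": [{"message": {"role": "assistant", "content": "Hello!"}}]}
print(extract_answer(sample))
```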


In [None]:
# 1. Test the RAG server health endpoint to verify it's running properly
url = f"{RAG_BASE_URL}/v1/health"
print("\nStep 1: Testing RAG server health endpoint")
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        await print_raw_response(response)

# 2. Test basic chat completion endpoint without using the knowledge base
payload = {
    "messages": [
        {
            "role": "user",
            "content": "Hi",  # Simple test message
        }
    ],
    "use_knowledge_base": False,  # Disable RAG functionality
    "temperature": 0.2,  # Lower temperature for more focused responses
    "model": "nvidia/llama-3.3-nemotron-super-49b-v1",  # Specify LLM model to use
}

chat_url = f"{RAG_BASE_URL}/v1/chat/completions"

print("\nStep 2: Testing chat completion endpoint")
print("\nSending request to:", chat_url)
print("\nWith payload:", json.dumps(payload, indent=2))

async with aiohttp.ClientSession() as session:
    async with session.post(chat_url, json=payload) as response:
        await print_raw_response(response)

##### 2.1.1.3 (Optional) Direct LLM service usage

You can also call the `nim-llm-ms` service directly to generate a response. This is useful when you want to bypass the RAG server and use the NIM service on its own.

<span style='color: #e74c3c; '>**NOTE:** This cell requires locally running NIM microservices. If you are using NVIDIA-hosted **Build Endpoints** instead, you may skip this cell. </span>

<span style='color: #e74c3c; '>To convert this blueprint to using locally-running NVIDIA NIMs, </span>

1. <span style='color: #e74c3c; '>Comment out the overriding environment variables in  ``variables.env`` </span>

2. <span style='color: #e74c3c; '>Select the ``local`` compose profile from the dropdown when starting and/or restarting the compose services in AI Workbench. </span>


In [None]:
# Port number for the service
nim_llm_server_port = "8000"

# Name for the compose service
IPADDRESS = "nim-llm-ms"

# Base URL constructed from IP and port for making API requests
NIM_LLM_BASE_URL = f"http://{IPADDRESS}:{nim_llm_server_port}"

nim_chat_url = f"{NIM_LLM_BASE_URL}/v1/chat/completions"

payload = {
    "messages": [
        {"role": "user", "content": "What is Retrieval Augmented Generation?"}
    ],
    "stream": False,
    "model": "nvidia/llama-3.3-nemotron-super-49b-v1",
    "max_tokens": 1024,
    "temperature": 0.2,
}


print("\nStep 3: Testing NIM LLM endpoint")
print("\nSending request to:", nim_chat_url)
print("\nWith payload:", json.dumps(payload, indent=2))

async with aiohttp.ClientSession() as session:
    async with session.post(nim_chat_url, json=payload) as response:
        await print_raw_response(response)

### 2.2 Ingestor Server API Usage

- Ensure the compose services are running and ready following the steps above.

- The upload example reads files from `DATA_DIR` (set to `../../data/multimodal` in the cell that defines it). Change it to the correct location of your dataset.


#### 2.2.1 Health Check Endpoint
**Purpose:**
This endpoint performs a health check on the server. It returns a 200 status code if the server is operational.

In [None]:
# Name for the compose service
IPADDRESS = "ingestor-server"

# Port number for the service
ingestor_server_port = "8082"

# Base URL constructed from IP and port for making API requests
INGESTOR_BASE_URL = f"http://{IPADDRESS}:{ingestor_server_port}"

# Test the ingestor server health endpoint to verify it's running properly
url = f"{INGESTOR_BASE_URL}/v1/health"
print("\nStep 1: Testing ingestor server health endpoint")
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        await print_raw_response(response)

#### 2.2.2 Vector DB APIs Usage

##### 2.2.2.1 Create collection Endpoint
**Purpose:**
This endpoint is used to create a collection in the vector store. 

In [None]:
async def create_collections(
    collection_names: list = None,
    collection_type: str = "text",
    embedding_dimension: int = 2048,
):
    """Create one or more collections in the vector store.

    Args:
        collection_names (list): List of collection names to create
        collection_type (str): Type of collection, defaults to "text"
        embedding_dimension (int): Dimension of embeddings, defaults to 2048

    Returns:
        None. Prints the API response, or an error message if the request fails.
    """
    # Parameters for creating collections
    params = {
        "vdb_endpoint": "http://milvus:19530",  # Milvus vector DB endpoint
        "collection_type": collection_type,  # Type of collection
        "embedding_dimension": embedding_dimension,  # Dimension of embeddings
    }

    HEADERS = {"Content-Type": "application/json"}

    # Make API request to create collections
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{INGESTOR_BASE_URL}/v1/collections",
                params=params,
                json=collection_names,
                headers=HEADERS,
            ) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            # Report the failure the same way the other helpers in this notebook do
            print(f"Error: {e}")


# Create a collection named "multimodal_data"
await create_collections(collection_names=["multimodal_data"])

##### 2.2.2.2 Get collections Endpoint
**Purpose:**
This endpoint returns the list of collection names stored on the Milvus server.



In [None]:
# First let's create another collection
await create_collections(collection_names=["multimodal_data1"])


# Now let's get the list of collections
async def fetch_collections():
    """Retrieve a list of all collections from the Milvus vector database.

    Makes a GET request to the ingestor API endpoint to fetch all collection names
    from the specified Milvus server.

    Returns:
        Response from the API endpoint containing the list of collections,
        or prints error message if request fails.
    """
    url = f"{INGESTOR_BASE_URL}/v1/collections"
    params = {"vdb_endpoint": "http://milvus:19530"}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await fetch_collections()

##### 2.2.2.3 Delete collections Endpoint

**Purpose:**
This endpoint deletes the provided list of collections from the specified vector database server.


In [None]:
from typing import Optional


async def delete_collections(collection_names: Optional[list[str]] = None) -> None:
    """Delete specified collections from the Milvus vector database.

    Makes a DELETE request to the ingestor API endpoint to remove the specified
    collections from the Milvus server.

    Args:
        collection_names (Optional[list[str]]): List of collection names to delete.
            Defaults to None, which sends an empty list.

    Returns:
        None. Prints response from API or error message if request fails.

    Example:
        await delete_collections(collection_names=["collection1", "collection2"])
    """
    url = f"{INGESTOR_BASE_URL}/v1/collections"
    params = {"vdb_endpoint": "http://milvus:19530"}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.delete(
                url, params=params, json=collection_names or []
            ) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


# Delete the collection from the previous section
print("\nDeleting collection 'multimodal_data1'...")
await delete_collections(collection_names=["multimodal_data1"])

# Fetch collections
print("\nFetching remaining collections:")
print("-" * 30)
await fetch_collections()

#### 2.2.3 Ingestion API Usage

##### 2.2.3.1 Upload Document Endpoint

**Purpose:**
This endpoint uploads new documents to the vector store. 
1. You can specify the collection name where documents should be stored.

2. The collection must exist in the vector database before uploading documents.

3. Documents must not already exist in the collection. To update existing documents, use `session.patch(...)` instead of `session.post(...)`

4. Multiple files can be uploaded in a single request for efficiency

**Configuration Options:**

You can customize the document processing with these parameters:

- `extraction_options`: Control what content is extracted (text, tables, charts)

- `split_options`: Define how documents are chunked (size, overlap)

- Custom metadata: Add additional information to your documents
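
To illustrate how `chunk_size` and `chunk_overlap` interact, here is a minimal sketch of overlapping windows over a token list. It assumes tokens are already available as a Python list; `chunk_tokens` is a hypothetical helper, and the ingestor's actual splitter may tokenize and split differently.

```python
def chunk_tokens(tokens, chunk_size=1024, chunk_overlap=150):
    """Split a token list into windows that share chunk_overlap tokens."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start : start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reaches the end of the input
    return chunks


# Small numbers make the overlap easy to see.
tokens = [f"t{i}" for i in range(10)]
for chunk in chunk_tokens(tokens, chunk_size=4, chunk_overlap=1):
    print(chunk)
```

Each window starts `chunk_size - chunk_overlap` tokens after the previous one, so consecutive chunks repeat the last `chunk_overlap` tokens. The repeated tokens help preserve context that would otherwise be cut at a chunk boundary.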

**The next cell uploads every file in `DATA_DIR` to the `multimodal_data` collection.**

In [None]:
# Directory containing multimodal documents to upload
DATA_DIR = "../../data/multimodal"


async def upload_documents(collection_name: str = "") -> None:
    """
    Uploads documents from DATA_DIR to the specified collection in the vector store.

    This function:
    1. Reads all files from DATA_DIR
    2. Configures extraction and chunking options
    3. Uploads documents via POST request to the documents endpoint

    Args:
        collection_name (str): Name of the collection to upload documents to.
                             Collection must exist before uploading.

    Extraction options:
        - Extracts text, tables and charts by default
        - Uses pdfium for extraction
        - Processes at page level granularity

    Chunking options:
        - chunk_size: 1024 tokens
        - chunk_overlap: 150 tokens

    """
    # Get list of files from DATA_DIR
    files = [
        os.path.join(DATA_DIR, f)
        for f in os.listdir(DATA_DIR)
        if os.path.isfile(os.path.join(DATA_DIR, f))
    ]

    # Configure upload and document processing parameters
    data = {
        # Milvus vector database endpoint
        "vdb_endpoint": "http://milvus:19530",
        # Target collection name for document storage
        "collection_name": collection_name,
        # Document extraction configuration
        "extraction_options": {
            "extract_text": True,  # Extract text content
            "extract_tables": True,  # Extract tabular data
            "extract_charts": True,  # Extract charts/figures
            "extract_images": False,  # Skip image extraction
            "extract_method": "pdfium",  # Use pdfium PDF parser
            "text_depth": "page",  # Process at page granularity
        },
        # Text chunking configuration
        "split_options": {
            "chunk_size": 1024,  # Size of each text chunk in tokens
            "chunk_overlap": 150,  # Overlap between chunks in tokens
        },
    }

    # Prepare multipart form data with files and config
    form_data = aiohttp.FormData()
    for file_path in files:
        form_data.add_field(
            "documents",
            open(file_path, "rb"),
            filename=os.path.basename(file_path),
            content_type="application/pdf",
        )
    form_data.add_field("data", json.dumps(data), content_type="application/json")

    # Upload documents
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{INGESTOR_BASE_URL}/v1/documents", data=form_data
            ) as response:  # Replace with session.patch for reingesting
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


# Upload documents to the multimodal_data collection
await upload_documents(collection_name="multimodal_data")

##### 2.2.3.2 Delete Document Endpoint

**Purpose:**
This endpoint removes specific documents from the vector store.

To demonstrate the functionality of this endpoint, we'll perform a complete document management workflow:

| Step | Description | Endpoint |
|------|-------------|----------|
| 1. Create | Generate a sample text document | N/A |
| 2. Upload | Add the document to the vector store | `POST /v1/documents` |
| 3. Verify | Check that the document was ingested | `GET /v1/documents` |
| 4. Delete | Remove the document from the vector store | `DELETE /v1/documents` |

This workflow demonstrates the full lifecycle of document management in the RAG system, allowing you to update your knowledge base as needed.


In [None]:
import tempfile

# Step 1. Create a sample text document
sample_text = """This is a sample text document.
It contains multiple lines of text.
This will be uploaded to the vector store for retrieval."""

# Create temporary text file
with tempfile.NamedTemporaryFile(mode="w+", suffix=".txt", delete=False) as temp_file:
    temp_file.write(sample_text)
    temp_file_path = temp_file.name

try:
    data = {
        "vdb_endpoint": "http://milvus:19530",
        "collection_name": "multimodal_data",
        "extraction_options": {
            "extract_text": True,
            "extract_tables": False,
            "extract_charts": False,
            "extract_images": False,
            "extract_method": "pdfium",
            "text_depth": "page",
        },
        # Text chunking configuration
        "split_options": {"chunk_size": 1024, "chunk_overlap": 150},
    }

    # Step 2. Upload file
    form_data = aiohttp.FormData()
    form_data.add_field(
        "documents",
        open(temp_file_path, "rb"),
        filename="sample_document.txt",
        content_type="text/plain",
    )
    form_data.add_field("data", json.dumps(data), content_type="application/json")

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{INGESTOR_BASE_URL}/v1/documents", data=form_data
            ) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")
finally:
    # Clean up the temporary file
    os.unlink(temp_file_path)

**Now fetch the documents to verify ingestion, and then delete the document.**

In [None]:
# Step 3: Fetch documents to verify ingestion
async def fetch_documents(collection_name: str = ""):
    url = f"{INGESTOR_BASE_URL}/v1/documents"
    params = {"collection_name": collection_name, "vdb_endpoint": "http://milvus:19530"}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await fetch_documents(collection_name="multimodal_data")


# Step 4: Delete the test document
async def delete_documents(collection_name: str = "", file_names: list[str] = []):
    url = f"{INGESTOR_BASE_URL}/v1/documents"
    params = {"collection_name": collection_name, "vdb_endpoint": "http://milvus:19530"}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.delete(url, params=params, json=file_names) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await delete_documents(
    collection_name="multimodal_data", file_names=["sample_document.txt"]
)

## 3. Interacting with the Chatbot

Now that all services are deployed, we can interact directly with the RAG chatbot to see how it handles queries with and without using its knowledge base. This section will demonstrate how to programmatically interact with the system, evaluate its responses, and understand the impact of retrieval-augmented generation on the quality of answers.

You can interact with the chatbot in two ways:

1. **Through the Playground UI** - A visual interface for testing (see [section 1](#1.-Spin-Up-the-Blueprint))

2. **Through the API** - Programmatic access for integration into applications

In this section, we'll focus on the API and cover the following interaction types:

| Interaction Type | Endpoint | Purpose |
|------------------|----------|---------|
| **Direct LLM queries** | `/v1/chat/completions` with `use_knowledge_base: false` | Use the LLM without RAG |
| **RAG-enhanced queries** | `/v1/chat/completions` with `use_knowledge_base: true` | Enhance responses with document knowledge |
| **Search** | `/v1/search` | Retrieve documents based on a query |
| **Reranking** | `/v1/rerank` | Improve search results with reranking |

We'll start with basic LLM queries (no knowledge base) and then show how to leverage the RAG capabilities.
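
The first two rows of the table differ only in the `use_knowledge_base` flag. The sketch below builds both request bodies from one hypothetical helper, `build_chat_payload`. The field names are taken from the payloads used in this notebook, but which fields the server strictly requires is an assumption here.

```python
def build_chat_payload(question, use_knowledge_base, collection_name="multimodal_data"):
    """Build a minimal request body for /v1/chat/completions."""
    payload = {
        "messages": [{"role": "user", "content": question}],
        "use_knowledge_base": use_knowledge_base,
        "temperature": 0.2,
    }
    if use_knowledge_base:
        # Retrieval settings are only relevant when the knowledge base is on.
        payload["collection_name"] = collection_name
        payload["enable_reranker"] = True
    return payload


direct = build_chat_payload("Who won the last FIFA World Cup?", use_knowledge_base=False)
rag = build_chat_payload("Who won the last FIFA World Cup?", use_knowledge_base=True)
print(sorted(set(rag) - set(direct)))
```

Keeping the question identical and toggling a single flag makes it easier to attribute any difference in the answers to retrieval.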

### 3.1 Setting Up API Interaction
First, let's create a utility function to handle streaming responses from the RAG server. This is particularly important because the LLM generates text token by token, and the API can stream these tokens as they're generated.
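
As a standalone illustration of the streaming format, the sketch below parses a hand-written Server-Sent Events body and joins the `delta` content of each `data:` line. The sample stream is an assumption about the response shape, modeled on the `choices -> delta -> content` structure handled in this section, and `collect_sse_content` is a hypothetical helper.

```python
import json


def collect_sse_content(raw_text):
    """Concatenate the delta content of every `data:` line in an SSE body."""
    parts = []
    for line in raw_text.splitlines():
        if not line.startswith("data: "):
            continue  # skip blank separator lines and other SSE fields
        body = line[len("data: "):].strip()
        if body == "[DONE]":
            continue  # end-of-stream marker carries no content
        try:
            event = json.loads(body)
        except json.JSONDecodeError:
            continue  # ignore lines that are not valid JSON
        choices = event.get("choices") or [{}]
        parts.append(choices[0].get("delta", {}).get("content") or "")
    return "".join(parts)


# Hand-written sample stream (not real server output).
sample_stream = (
    'data: {"choices": [{"delta": {"content": "Hel"}}]}\n'
    "\n"
    'data: {"choices": [{"delta": {"content": "lo"}}]}\n'
    "\n"
    "data: [DONE]\n"
)
print(collect_sse_content(sample_stream))
```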


#### 3.1.1 Streaming Response Handler
This function processes the server's streaming response format, concatenating tokens to form the complete response:



In [None]:
async def generate_answer(payload, url=None):
    """
    Asynchronously generates an answer from the RAG server by sending a POST request with the given payload.

    This function handles both streaming and non-streaming responses from the server.
    For streaming responses (text/event-stream), it concatenates the content from multiple chunks.
    For regular JSON responses, it extracts the content directly from the response.

    Args:
        payload (dict): The request payload containing messages and other parameters for the RAG server
        url (str, optional): Endpoint to send the request to. Defaults to the
            /v1/chat/completions endpoint built from RAG_BASE_URL.

    Returns:
        None: Prints the generated content to stdout

    The function expects the response to be in one of two formats:
    1. Streaming response with Server-Sent Events (SSE)
    2. Regular JSON response with a choices->message->content structure
    """
    if url is None:
        # Avoid depending on whichever global `url` an earlier cell left behind
        url = f"{RAG_BASE_URL}/v1/chat/completions"

    async with aiohttp.ClientSession() as session:
        async with session.post(url=url, json=payload) as response:
            # Check if we're getting a streaming response
            content_type = response.headers.get("Content-Type", "")

            if "text/event-stream" in content_type:
                # Handle streaming response
                response_text = await response.text()
                concatenated_content = ""

                for line in response_text.split("\n"):
                    if line.startswith("data: "):
                        json_str = line[len("data: ") :]
                        if json_str.strip() == "[DONE]":
                            continue
                        try:
                            json_obj = json.loads(json_str)
                            content = (
                                json_obj.get("choices", [{}])[0]
                                .get("delta", {})
                                .get("content", "")
                            )
                            concatenated_content += content
                        except json.JSONDecodeError:
                            continue

                print(concatenated_content)
            else:
                # Handle regular JSON response
                response_json = await response.json()
                if "error" in response_json:
                    print(f"Error: {response_json['error']}")
                    return

                content = (
                    response_json.get("choices", [{}])[0]
                    .get("message", {})
                    .get("content", "")
                )
                print(content)

#### 3.1.2 Test Endpoint Health
Verify the health of the RAG server:

In [None]:
import aiohttp

# Testing RAG Server health
print("\nStep 1: Testing RAG server health endpoint")
async with aiohttp.ClientSession() as session:
    async with session.get(f"{RAG_BASE_URL}/v1/health") as response:
        await print_raw_response(response)

### 3.2 Preparing Test Documents
To demonstrate the difference between standard LLM responses and RAG-enhanced responses, we'll ingest a document containing information about FIFA World Cup winners. This will allow us to compare how the system responds to queries with and without access to this knowledge.


#### 3.2.1 Downloading the Sample Document
First, let's download a sample document about FIFA World Cup winners:


In [None]:
!wget -O fifa_world_cup_winners.pdf https://s3-ap-south-1.amazonaws.com/adda247jobs-wp-assets-adda247/jobs/wp-content/uploads/sites/2/2022/12/20155949/FIFA-World-Cup-Winners-List-From-1930-to-2022.pdf

#### 3.2.2 Ingest Document into Knowledge Base
Now we need to process this document and add it to our vector database:


In [None]:
# Name for the compose service
IPADDRESS = "ingestor-server"

# Port number for the service
ingestor_server_port = "8082"

# Base URL constructed from IP and port for making API requests
INGESTOR_BASE_URL = f"http://{IPADDRESS}:{ingestor_server_port}"


# Upload FIFA World Cup Winners PDF
async def upload_fifa_document(collection_name: str = "") -> None:
    fifa_pdf_path = "fifa_world_cup_winners.pdf"

    data = {
        "vdb_endpoint": "http://milvus:19530",
        "collection_name": collection_name,
        "extraction_options": {
            "extract_text": True,
            "extract_tables": True,
            "extract_charts": True,
            "extract_images": False,
            "extract_method": "pdfium",
            "text_depth": "page",
        },
        "split_options": {"chunk_size": 1024, "chunk_overlap": 150},
    }

    form_data = aiohttp.FormData()
    form_data.add_field(
        "documents",
        open(fifa_pdf_path, "rb"),
        filename="fifa_world_cup_winners.pdf",
        content_type="application/pdf",
    )
    form_data.add_field("data", json.dumps(data), content_type="application/json")

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{INGESTOR_BASE_URL}/v1/documents", data=form_data
            ) as response:
                await print_raw_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await upload_fifa_document(collection_name="multimodal_data")

### 3.3 Comparing LLM vs. RAG Responses
Now let's compare how the system responds to the same query with and without leveraging the knowledge base.

#### 3.3.1 Query Without Knowledge Base
First, let's see how the model responds using only its pre-trained knowledge:


In [None]:
# Name for the compose service
IPADDRESS = "rag-server"

# Port number for the service
rag_server_port = "8081"

# Base URL constructed from IP and port for making API requests
RAG_BASE_URL = f"http://{IPADDRESS}:{rag_server_port}"

url = f"{RAG_BASE_URL}/v1/chat/completions"
payload = {
    "messages": [{"role": "user", "content": "Who won the last FIFA World Cup?"}],
    "use_knowledge_base": False,
    "temperature": 0.2,
    "top_p": 0.7,
    "max_tokens": 1024,
    "reranker_top_k": 10,
    "vdb_top_k": 100,
    "vdb_endpoint": "http://milvus:19530",
    "collection_name": "multimodal_data",
    "enable_query_rewriting": False,
    "enable_reranker": True,
    "enable_guardrails": False,
    "enable_citations": True,
    "model": "nvidia/llama-3.3-nemotron-super-49b-v1",
    "llm_endpoint": "nim-llm:8000",
    "embedding_model": "nvidia/llama-3.2-nv-embedqa-1b-v2",
    "embedding_endpoint": "nemoretriever-embedding-ms:8000",
    "reranker_model": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
    "reranker_endpoint": "nemoretriever-ranking-ms:8000",
    "stop": [],
}

await generate_answer(payload)

**What's happening:**

> - The request sets `use_knowledge_base: false`
>
> - The model relies solely on its pre-trained knowledge
> - No document retrieval or context augmentation occurs 
> - The response is based on what the model "knows" from training
> - For recent or specialized information, this may lead to outdated or incorrect answers

#### 3.3.2 Query With Knowledge Base (RAG)
Now, let's run the same query but enable the knowledge base:


In [None]:
# Name for the compose service
IPADDRESS = "rag-server"

# Port number for the service
rag_server_port = "8081"

# Base URL constructed from IP and port for making API requests
RAG_BASE_URL = f"http://{IPADDRESS}:{rag_server_port}"

url = f"{RAG_BASE_URL}/v1/chat/completions"
payload = {
    "messages": [{"role": "user", "content": "Who won the last FIFA World Cup?"}],
    "use_knowledge_base": True,
    "temperature": 0.2,
    "top_p": 0.7,
    "max_tokens": 1024,
    "reranker_top_k": 10,
    "vdb_top_k": 100,
    "vdb_endpoint": "http://milvus:19530",
    "collection_name": "multimodal_data",
    "enable_query_rewriting": False,
    "enable_reranker": True,
    "enable_guardrails": False,
    "enable_citations": True,
    "model": "nvidia/llama-3.3-nemotron-super-49b-v1",
    "llm_endpoint": "nim-llm:8000",
    "embedding_model": "nvidia/llama-3.2-nv-embedqa-1b-v2",
    "embedding_endpoint": "nemoretriever-embedding-ms:8000",
    "reranker_model": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
    "reranker_endpoint": "nemoretriever-ranking-ms:8000",
    "stop": [],
}

await generate_answer(payload)

**What's happening:**

> - The request sets `use_knowledge_base: true`
>
> - The system converts the query into an embedding
> - It searches the vector database for semantically similar document chunks
> - The retrieved chunks are added as context to the LLM prompt
> - The model generates a response based on this augmented context
> - The response should contain specific information from our ingested document
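
The "added as context" step above can be pictured with a small sketch. The prompt wording and the `build_augmented_prompt` helper are assumptions made for illustration; the RAG server's actual prompt template is not shown in this notebook.

```python
def build_augmented_prompt(question, chunks, max_chunks=3):
    """Place retrieved chunks ahead of the question in a single prompt string."""
    selected = chunks[:max_chunks]  # keep only the highest-ranked chunks
    context = "\n\n".join(
        f"[{i}] {text}" for i, text in enumerate(selected, start=1)
    )
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )


# Placeholder chunk texts stand in for real retrieval results.
example_chunks = ["first retrieved chunk", "second retrieved chunk"]
print(build_augmented_prompt("Who won the last FIFA World Cup?", example_chunks))
```

Because the chunks are assumed to arrive already ranked, truncating to `max_chunks` keeps the most relevant context while bounding the prompt length.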

## 4. Understanding Document Retrieval and Reranking 
### 4.1 Utility function to print search results
First, let's create a utility function to better visualize search results:


In [None]:
def print_search_results(response):
    """
    Nicely formats and prints search results from the RAG system.
    Also renders base64 encoded images when present.

    Args:
        response (dict): The response from the search API

    Returns:
        None: Prints formatted results to the console and displays images when applicable
    """
    if "results" not in response or "total_results" not in response:
        print("Invalid response format or no results found.")
        return

    print(f"\n=== SEARCH RESULTS ({response['total_results']} total) ===\n")

    for i, result in enumerate(response["results"], 1):
        print(f"Result #{i}")
        print(f"Document: {result.get('document_name', 'Unknown')}")
        print(f"Score: {result.get('score')}")

        # Handle different document types
        document_type = result.get("document_type", "text")
        print(f"Type: {document_type}")

        print("\nContent:")
        print("-" * 80)

        content = result.get("content", "")

        # Check if content looks like base64 image data
        # Base64 images typically start with specific patterns
        is_likely_image = False
        if content and isinstance(content, str):
            # Common base64 image prefixes to check
            image_prefixes = ["iVBOR", "/9j/", "R0lGOD", "PD94", "PHN2"]
            is_likely_image = any(
                content.startswith(prefix) for prefix in image_prefixes
            )

        # For chart/image type documents or content that looks like base64
        if document_type in ["chart", "image"] or is_likely_image:
            try:
                import base64
                import io

                from IPython.display import display
                from PIL import Image

                # Get base64 string and decode
                img_data = base64.b64decode(content)

                # Convert to PIL Image
                img = Image.open(io.BytesIO(img_data))

                # Display the image
                display(img)

                # Print metadata description if available
                if "metadata" in result and "description" in result["metadata"]:
                    print(f"\nDescription: {result['metadata']['description']}")
            except Exception as e:
                print(f"Error rendering image: {str(e)}")
                print("Raw content (base64 encoded, truncated):")
                if len(content) > 100:
                    content = content[:100] + "... [base64 content truncated]"
                print(content)
        else:
            # For text documents, print the content with optional truncation
            if len(content) > 500:
                content = content[:500] + "... [content truncated]"
            print(content)

        print("-" * 80)
        print("\n")
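
The `image_prefixes` list in the function above is easier to trust once you see where the strings come from. As a small, self-contained check, the sketch below base64-encodes the leading bytes of a few file formats and prints the result. The `MAGIC_BYTES` dictionary is an illustrative name introduced here, not part of the blueprint.

```python
import base64

# Leading bytes of the formats the utility function looks for.
MAGIC_BYTES = {
    "PNG": b"\x89PNG\r\n\x1a\n",
    "JPEG": b"\xff\xd8\xff",
    "GIF": b"GIF89a",
    "XML": b"<?xml",
    "SVG": b"<svg",
}

# Base64-encode each signature; the first few characters are what
# print_search_results compares against.
base64_prefixes = {
    name: base64.b64encode(magic).decode("ascii")
    for name, magic in MAGIC_BYTES.items()
}

for name, prefix in base64_prefixes.items():
    print(f"{name}: {prefix}")
```

The prefix check is only a heuristic, which is why the function wraps the decode and display steps in a `try`/`except` and falls back to printing truncated raw content.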

### 4.2 Basic Search Without Reranking
Let's perform a basic search query without reranking:


In [None]:
# Search without reranking
url = f"{RAG_BASE_URL}/v1/search"
payload = {
    "query": "",
    "reranker_top_k": 3,
    "vdb_top_k": 100,
    "vdb_endpoint": "http://milvus:19530",
    "collection_name": "multimodal_data",
    "messages": [
        {"role": "user", "content": "What is the rationale for Clear Print Guidelines"}
    ],
    "enable_query_rewriting": False,
    "enable_reranker": False,
    "embedding_model": "nvidia/llama-3.2-nv-embedqa-1b-v2",
    "embedding_endpoint": "nemoretriever-embedding-ms:8000",
    "reranker_model": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
    "reranker_endpoint": "nemoretriever-ranking-ms:8000",
}


async def document_search(payload):
    """
    Performs a search against the RAG system and prints formatted results.

    The request is posted to the module-level `url` defined above.

    Args:
        payload (dict): The search query payload

    Returns:
        None: Results are printed rather than returned, so the notebook
        does not echo the raw JSON response under the cell
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url=url, json=payload) as response:
                response_json = await response.json()
                print_search_results(response_json)  # Format and print the results
                return None  # Return nothing so the raw response is not echoed
        except aiohttp.ClientError as e:
            print(f"Error: {e}")
            return None


await document_search(payload)

### 4.3 Enhanced Search With Reranking
Now let's see how reranking improves search results:


In [None]:
# Search with reranking
url = f"{RAG_BASE_URL}/v1/search"
payload = {
    "query": "",
    "reranker_top_k": 3,
    "vdb_top_k": 100,
    "vdb_endpoint": "http://milvus:19530",
    "collection_name": "multimodal_data",
    "messages": [
        {"role": "user", "content": "What is the rationale for Clear Print Guidelines"}
    ],
    "enable_query_rewriting": False,
    "enable_reranker": True,
    "embedding_model": "nvidia/llama-3.2-nv-embedqa-1b-v2",
    "embedding_endpoint": "nemoretriever-embedding-ms:8000",
    "reranker_model": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
    "reranker_endpoint": "nemoretriever-ranking-ms:8000",
}


async def document_search(payload):
    """
    Performs a search against the RAG system and prints formatted results.

    The request is posted to the module-level `url` defined above.

    Args:
        payload (dict): The search query payload

    Returns:
        None: Results are printed rather than returned, so the notebook
        does not echo the raw JSON response under the cell
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url=url, json=payload) as response:
                response_json = await response.json()
                print_search_results(response_json)  # Format and print the results
                return None  # Return nothing so the raw response is not echoed
        except aiohttp.ClientError as e:
            print(f"Error: {e}")
            return None


await document_search(payload)

**What's happening:**

> - Initial retrieval is performed using vector similarity (same as basic search)
> - The reranker model then examines each retrieved document more carefully
> - It scores the relevance of each document to the specific query
> - Results are reordered based on these relevance scores
> - The most relevant documents appear at the top

Notice how the scores change and how the ordering of results may differ from the basic search in section 4.2.
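
The two-stage flow described above can be sketched in a few lines. This is a toy illustration under stated assumptions: `toy_rerank_score` is a hypothetical word-overlap score standing in for the reranker model, and the candidate texts and their `score` values are invented for the example rather than taken from the ingested document.

```python
def toy_rerank_score(query, text):
    """Stand-in relevance score: fraction of query words found in the text."""
    query_words = set(query.lower().split())
    text_words = set(text.lower().split())
    return len(query_words & text_words) / len(query_words)


def rerank(query, candidates, top_k):
    """Rescore first-stage candidates and keep the top_k most relevant."""
    rescored = [
        {**candidate, "rerank_score": toy_rerank_score(query, candidate["content"])}
        for candidate in candidates
    ]
    rescored.sort(key=lambda c: c["rerank_score"], reverse=True)
    return rescored[:top_k]


# Invented first-stage results, already ordered by vector similarity score.
candidates = [
    {"content": "print guidelines cover font size", "score": 0.81},
    {"content": "the rationale for clear print guidelines is readability", "score": 0.78},
    {"content": "annual report summary", "score": 0.75},
]

query = "rationale for clear print guidelines"
reranked = rerank(query, candidates, top_k=2)

for result in reranked:
    print(f"{result['rerank_score']:.2f}  {result['content']}")
```

The candidate that ranked second by vector similarity moves to the top after rescoring, which is the kind of reordering to look for when comparing the outputs of sections 4.2 and 4.3.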

## Next Steps
Now that you have deployed and tested the RAG system, here are some suggested next steps:

- **Upload your own documents**: Try ingesting different document types (PDFs, text files, etc.)

- **Experiment with parameters**: Adjust temperature, top_k values, and other settings

- **Compare quality**: Set up side-by-side comparisons of RAG vs. non-RAG responses

- **Integrate with applications**: Use the APIs to build your own applications on top of this system

- **Custom prompt engineering**: Experiment with different prompt formats to improve response quality

The system you've deployed provides a powerful foundation for building AI-powered applications that can leverage both the knowledge in your documents and the capabilities of large language models.
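
As a starting point for parameter experiments, a small helper can generate search payloads that differ in only one setting. The sketch below is one possible approach: `build_search_payload` is a hypothetical helper introduced here, and its field names and default values are copied from the search cells in section 4.

```python
def build_search_payload(question, enable_reranker, reranker_top_k=3, vdb_top_k=100):
    """Build a /v1/search payload using the values from section 4 (hypothetical helper)."""
    return {
        "query": "",
        "reranker_top_k": reranker_top_k,
        "vdb_top_k": vdb_top_k,
        "vdb_endpoint": "http://milvus:19530",
        "collection_name": "multimodal_data",
        "messages": [{"role": "user", "content": question}],
        "enable_query_rewriting": False,
        "enable_reranker": enable_reranker,
        "embedding_model": "nvidia/llama-3.2-nv-embedqa-1b-v2",
        "embedding_endpoint": "nemoretriever-embedding-ms:8000",
        "reranker_model": "nvidia/llama-3.2-nv-rerankqa-1b-v2",
        "reranker_endpoint": "nemoretriever-ranking-ms:8000",
    }


question = "What is the rationale for Clear Print Guidelines"
without_rerank = build_search_payload(question, enable_reranker=False)
with_rerank = build_search_payload(question, enable_reranker=True)

# Confirm that the two payloads differ only in the setting under test.
changed_keys = {k for k in with_rerank if with_rerank[k] != without_rerank[k]}
print(changed_keys)
```

Each payload can then be passed to `document_search` in the notebook, which keeps every other setting fixed while you vary one.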