[![Lab Documentation and Solutions](https://img.shields.io/badge/Lab%20Documentation%20and%20Solutions-purple)](https://mongodb-developer.github.io/ai-rag-lab/)


# Step 1: Setup prerequisites

In [1]:
import os
from pymongo import MongoClient
from utils import track_progress, set_env

In [2]:
# If you are using your own MongoDB Atlas cluster, use the connection string for your cluster here
MONGODB_URI = os.environ.get("MONGODB_URI")
# Initialize a MongoDB Python client
mongodb_client = MongoClient(MONGODB_URI)
# Check the connection to the server
mongodb_client.admin.command("ping")

{'ok': 1.0,
 '$clusterTime': {'clusterTime': Timestamp(1756153591, 1),
  'signature': {'hash': b'\xda^\xd7\x8em\xfbi\xbbD\xb2\x0c\xedZ\xb4\xb9\x1b`7?T',
   'keyId': 7541088992607862791}},
 'operationTime': Timestamp(1756153591, 1)}

In [3]:
# Track progress of key steps-- DO NOT CHANGE
track_progress("cluster_creation", "ai_rag_lab")

Tracking progress for task cluster_creation


In [4]:
# Set the URL for our AI model proxy endpoint
SERVERLESS_URL = os.environ.get("SERVERLESS_URL")

In [None]:
# Set the passkey provided by your workshop instructor
PASSKEY = "replace-with-passkey"

In [6]:
# Obtain Voyage API key using the passkey and set it as an environment variable-- DO NOT CHANGE
set_env("VOYAGE_API_KEY", PASSKEY)

Setting VOYAGE_API_KEY environment variable successful.


# Step 2: Load the dataset

In [7]:
import json

In [8]:
with open("../data/mongodb_docs.json", "r") as data_file:
    json_data = data_file.read()

docs = json.loads(json_data)

In [9]:
# Note the number of documents in the dataset
len(docs)

20

In [10]:
# Preview a document to understand its structure
docs[0]

{'updated': '2024-05-20T17:30:49.148Z',
 'metadata': {'contentType': None,
  'productName': 'MongoDB Atlas',
  'tags': ['atlas', 'docs'],
  'version': None},
 'action': 'created',
 'sourceName': 'snooty-cloud-docs',
 'body': '# View Database Access History\n\n- This feature is not available for `M0` free clusters, `M2`, and `M5` clusters. To learn more, see Atlas M0 (Free Cluster), M2, and M5 Limits.\n\n- This feature is not supported on Serverless instances at this time. To learn more, see Serverless Instance Limitations.\n\n## Overview\n\nAtlas parses the MongoDB database logs to collect a list of authentication requests made against your clusters through the following methods:\n\n- `mongosh`\n\n- Compass\n\n- Drivers\n\nAuthentication requests made with API Keys through the Atlas Administration API are not logged.\n\nAtlas logs the following information for each authentication request within the last 7 days:\n\n<table>\n<tr>\n<th id="Field">\nField\n\n</th>\n<th id="Description">\nD

# Step 3: Chunk up and embed the data


In [None]:
# You might see a warning after running this cell-- You can ignore it
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import Dict, List
import voyageai
from tqdm import tqdm

In [22]:
# Common list of separators for text data
separators = ["\n\n", "\n", " ", "", "#", "##", "###"]

In [55]:
# Use the `RecursiveCharacterTextSplitter` from LangChain to first split a piece of text on the list of `separators` above.
# The resulting pieces are then merged back together until the specified chunk size (measured in tokens) is reached.
# For text data, you typically want to keep 1-2 paragraphs (~200 tokens) in a single chunk.
# Set chunk overlap to 0 for contextualized embeddings, otherwise 15-20% of chunk size.
# The `model_name` parameter indicates which encoder to use for tokenization, in this case GPT-4's encoder.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    model_name="gpt-4", separators=separators, chunk_size=200, chunk_overlap=0
)

📚 https://api.python.langchain.com/en/latest/character/langchain_text_splitters.character.RecursiveCharacterTextSplitter.html
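
To make the split-then-merge idea concrete, here is a toy sketch. It is not LangChain's implementation: `recursive_split` is a hypothetical helper, chunk size is measured in characters rather than tokens, and chunk overlap is ignored.

```python
from typing import List


def recursive_split(text: str, separators: List[str], max_len: int) -> List[str]:
    """Toy splitter: try separators in order, merging pieces up to `max_len` characters."""
    if len(text) <= max_len:
        return [text] if text else []
    if not separators:
        # No separators left: fall back to a hard cut every `max_len` characters
        return [text[i : i + max_len] for i in range(0, len(text), max_len)]

    sep, remaining = separators[0], separators[1:]
    # An empty separator means "split into single characters"
    pieces = text.split(sep) if sep else list(text)

    chunks: List[str] = []
    current = ""
    for piece in pieces:
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= max_len:
            # The piece still fits, so keep merging into the current chunk
            current = candidate
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(piece) <= max_len:
            current = piece
        else:
            # The piece alone is too large: retry with the finer separators
            chunks.extend(recursive_split(piece, remaining, max_len))
    if current:
        chunks.append(current)
    return chunks


toy_chunks = recursive_split("aaa bbb\n\nccc ddd eee", ["\n\n", " "], max_len=7)
print(toy_chunks)
```

The coarse separator (`"\n\n"`) is tried first, and the finer one (`" "`) is only used for pieces that are still too large.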

In [57]:
def get_chunks(doc: Dict, text_field: str) -> List[Dict]:
    """
    Chunk up a document.

    Args:
        doc (Dict): Parent document to generate chunks from.
        text_field (str): Text field to chunk.

    Returns:
        List[Dict]: List of chunked documents.
    """
    # Extract the field to chunk from `doc`
    text = doc[text_field]
    # Split `text` using the `split_text` method of the `text_splitter` object
    chunks = text_splitter.split_text(text)
    return chunks

In [None]:
# Initialize the Voyage AI client
vo = voyageai.Client()

📚 https://docs.voyageai.com/docs/contextualized-chunk-embeddings#approach-2-contextualized-chunk-embeddings

In [106]:
def get_embeddings(content: List[str], input_type: str) -> List[List[float]]:
    """
    Get contextualized embeddings for each chunk.

    Args:
        content (List[str]): List of chunked texts or the user query as a list
        input_type (str): Type of input, either "document" or "query" 

    Returns:
        List[List[float]]: Contextualized embeddings
    """
    # Use the `contextualized_embed` method of the Voyage AI API to get contextualized embeddings for each chunk with the following arguments:
    # inputs: `content` wrapped in another list
    # model: `voyage-context-3`
    # input_type: `input_type`
    embds_obj = vo.contextualized_embed(inputs=[content], model="voyage-context-3", input_type=input_type)
    # If `input_type` is "document", there is a single result with multiple embeddings, one for each chunk
    if input_type == "document":
        embeddings = [emb for r in embds_obj.results for emb in r.embeddings]
    # If `input_type` is "query", there is a single result with a single embedding
    elif input_type == "query":
        embeddings = embds_obj.results[0].embeddings[0]
    # Any other value would otherwise leave `embeddings` undefined
    else:
        raise ValueError(f'input_type must be "document" or "query", got {input_type!r}')
    return embeddings
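
The two branches above unpack the same response shape in different ways. The sketch below uses stand-in objects to show that shape without calling the API. The `SimpleNamespace` objects are hypothetical and mimic only the `results` and `embeddings` attributes used in `get_embeddings`.

```python
from types import SimpleNamespace
from typing import List


def flatten_embeddings(response) -> List[List[float]]:
    """Collect every chunk embedding from every result, preserving order."""
    return [emb for result in response.results for emb in result.embeddings]


# Stand-in for a "document" call: one input document with three chunks
# gives one result holding three embeddings
fake_document_response = SimpleNamespace(
    results=[SimpleNamespace(embeddings=[[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])]
)
chunk_embeddings = flatten_embeddings(fake_document_response)

# Stand-in for a "query" call: one result holding a single embedding
fake_query_response = SimpleNamespace(
    results=[SimpleNamespace(embeddings=[[0.9, 0.8]])]
)
query_vector = fake_query_response.results[0].embeddings[0]

print(len(chunk_embeddings), query_vector)
```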

In [107]:
embedded_docs = []
# Iterate through `docs` from Step 2
for doc in tqdm(docs):
    # Use the `get_chunks` function to chunk up the "body" field in each document
    chunks = get_chunks(doc, "body")
    # Pass all the `chunks` to the `get_embeddings` function to get contextualized embeddings for each chunk
    # `input_type` should be set to "document" since we are embedding the "documents" for RAG
    chunk_embeddings = get_embeddings(chunks, "document")
    # For each chunk, create a new document with original metadata but replace the `body` and add an `embedding` field
    for chunk, embedding in zip(chunks, chunk_embeddings):
        # Create a new document by copying the original document
        chunk_doc = doc.copy()
        # Replace the `body` field with the chunk content
        chunk_doc["body"] = chunk
        # Add an `embedding` field containing the embedding for this chunk
        chunk_doc["embedding"] = embedding
        # Append `chunk_doc` to `embedded_docs`
        embedded_docs.append(chunk_doc)

100%|██████████| 20/20 [00:06<00:00,  3.20it/s]


In [None]:
# Notice that the length of `embedded_docs` is greater than the length of `docs` from Step 2 above
# This is because each document in `docs` has been split into multiple chunks
len(embedded_docs)

101
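
One detail worth knowing about the loop above: `doc.copy()` makes a shallow copy. That is sufficient here because only the top-level `body` and `embedding` fields are replaced. If you later mutate nested fields such as `metadata` per chunk, a deep copy may be safer. A small sketch with made-up data:

```python
import copy

# Made-up parent document with a nested `metadata` field
parent = {"body": "full text", "metadata": {"tags": ["atlas"]}}

# Shallow copy: replacing a top-level field leaves the parent untouched
shallow = parent.copy()
shallow["body"] = "chunk text"

# Mutating a nested object through the shallow copy also changes the parent
shallow["metadata"]["tags"].append("shared")

# Deep copy: nested objects are duplicated, so the parent is unaffected
deep = copy.deepcopy(parent)
deep["metadata"]["tags"].append("private")

print(parent)
```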

In [None]:
# Preview a chunked document to understand its structure
# Note that the structure looks similar to the original docs, except the `body` field now contains smaller chunks of text
# Each document also has an additional `embedding` field
embedded_docs[0]

{'updated': '2024-05-20T17:30:49.148Z',
 'metadata': {'contentType': None,
  'productName': 'MongoDB Atlas',
  'tags': ['atlas', 'docs'],
  'version': None},
 'action': 'created',
 'sourceName': 'snooty-cloud-docs',
 'body': '# View Database Access History\n\n- This feature is not available for `M0` free clusters, `M2`, and `M5` clusters. To learn more, see Atlas M0 (Free Cluster), M2, and M5 Limits.\n\n- This feature is not supported on Serverless instances at this time. To learn more, see Serverless Instance Limitations.\n\n## Overview\n\nAtlas parses the MongoDB database logs to collect a list of authentication requests made against your clusters through the following methods:\n\n- `mongosh`\n\n- Compass\n\n- Drivers\n\nAuthentication requests made with API Keys through the Atlas Administration API are not logged.\n\nAtlas logs the following information for each authentication request within the last 7 days:\n\n<table>\n<tr>\n<th id="Field">\nField\n\n</th>\n<th id="Description">\nD

# Step 4: Ingest data into MongoDB


### **Do not change the values assigned to the variables below**

In [110]:
# Name of the database -- Change if needed or leave as is
DB_NAME = "mongodb_genai_devday_rag"
# Name of the collection -- Change if needed or leave as is
COLLECTION_NAME = "knowledge_base"
# Name of the vector search index -- Change if needed or leave as is
ATLAS_VECTOR_SEARCH_INDEX_NAME = "vector_index"

In [111]:
# Connect to the `COLLECTION_NAME` collection.
collection = mongodb_client[DB_NAME][COLLECTION_NAME]

In [112]:
# Bulk delete all existing records from the collection defined above
collection.delete_many({})

DeleteResult({'n': 101, 'electionId': ObjectId('7fffffff0000000000000003'), 'opTime': {'ts': Timestamp(1756158739, 11), 't': 3}, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1756158739, 11), 'signature': {'hash': b'\xac_\x9c\xd7\xe6\x89\xcfO\xcd\x86q\xae)\x17v\x7f\x00\xd3&\x14', 'keyId': 7541088992607862791}}, 'operationTime': Timestamp(1756158739, 11)}, acknowledged=True)

📚 https://pymongo.readthedocs.io/en/stable/examples/bulk.html#bulk-insert


In [113]:
# Bulk insert `embedded_docs` into the `collection` defined above -- should be a one-liner
collection.insert_many(embedded_docs)

print(f"Ingested {collection.count_documents({})} documents into the {COLLECTION_NAME} collection.")

Ingested 101 documents into the knowledge_base collection.
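
For a dataset this small, a single `insert_many` call is fine. If the embedded documents were produced incrementally or in much larger volumes, you might prefer to insert them in batches. The sketch below shows only the batching logic. `batched` is a hypothetical helper and the batch size of 50 is arbitrary.

```python
from typing import Dict, Iterator, List


def batched(items: List[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


# Made-up stand-in for `embedded_docs`
sample_docs = [{"n": i} for i in range(101)]
batch_sizes = [len(batch) for batch in batched(sample_docs, 50)]
print(batch_sizes)

# With a live collection, each batch would be passed to `collection.insert_many(batch)`
```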


# Step 5: Create a vector search index

In [114]:
from utils import create_index, check_index_ready

In [115]:
# Create vector index definition specifying:
# path: Path to the embeddings field
# numDimensions: Number of embedding dimensions- depends on the embedding model used
# similarity: Similarity metric. One of cosine, euclidean, dotProduct.
model = {
    "name": ATLAS_VECTOR_SEARCH_INDEX_NAME,
    "type": "vectorSearch",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1024,
                "similarity": "cosine",
            }
        ]
    },
}

📚 Refer to the `utils.py` script under `notebooks/utils`
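
Since `numDimensions` depends on the embedding model, it can help to confirm that the stored embeddings have the length the index definition expects before creating the index. A minimal sketch, assuming the same shape as `model` above. `find_dimension_mismatches` is a hypothetical helper and the toy data uses 3 dimensions for brevity.

```python
from typing import Dict, List


def find_dimension_mismatches(docs: List[Dict], index_model: Dict, path: str = "embedding") -> List[int]:
    """Return the positions of documents whose embedding length differs from `numDimensions`."""
    expected = next(
        field["numDimensions"]
        for field in index_model["definition"]["fields"]
        if field["type"] == "vector" and field["path"] == path
    )
    return [i for i, doc in enumerate(docs) if len(doc[path]) != expected]


toy_model = {
    "definition": {
        "fields": [
            {"type": "vector", "path": "embedding", "numDimensions": 3, "similarity": "cosine"}
        ]
    }
}
toy_docs = [{"embedding": [0.1, 0.2, 0.3]}, {"embedding": [0.1, 0.2]}]
mismatches = find_dimension_mismatches(toy_docs, toy_model)
print(mismatches)
```

In the notebook you could call it as `find_dimension_mismatches(embedded_docs, model)` and expect an empty list.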

In [86]:
# Use the `create_index` function from the `utils` module to create a vector search index with the above definition for the `collection` collection
create_index(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME, model)

Creating the vector_index index
vector_index index already exists, recreating...
Dropping vector_index index
vector_index index deletion complete
Creating new vector_index index
Successfully recreated the vector_index index


In [87]:
# Use the `check_index_ready` function from the `utils` module to verify that the index was created and is in READY status before proceeding
check_index_ready(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME)

vector_index index status: READY
vector_index index definition: {'fields': [{'type': 'vector', 'path': 'embedding', 'numDimensions': 1024, 'similarity': 'cosine'}]}


In [72]:
# Track progress of key steps-- DO NOT CHANGE
track_progress("vs_index_creation", "ai_rag_lab")

Tracking progress for task vs_index_creation


# Step 6: Perform vector search on your data


### Define a vector search function

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples (Refer to the "Basic Example")


In [134]:
# Define a function to retrieve relevant documents for a user query using vector search
def vector_search(user_query: str) -> List[Dict]:
    """
    Retrieve relevant documents for a user query using vector search.

    Args:
        user_query (str): The user's query string.

    Returns:
        List[Dict]: A list of matching documents.
    """

    # Generate embedding for the `user_query` using the `get_embeddings` function defined in Step 3
    # NOTE: Wrap the user_query in a list since the function expects a list of strings
    # `input_type` should be set to "query" since we are embedding the query
    query_embedding = get_embeddings([user_query], "query")

    # Define an aggregation pipeline consisting of a $vectorSearch stage, followed by a $project stage
    # Set the number of candidates to 150 and only return the top 5 documents from the vector search
    # In the $project stage, exclude the `_id` field and include these fields: `body`, `metadata.productName`, `metadata.contentType`, `updated` and the `vectorSearchScore`
    # NOTE: Use variables defined previously for the `index`, `queryVector` and `path` fields in the $vectorSearch stage
    pipeline = [
        {
            "$vectorSearch": {
                "index": ATLAS_VECTOR_SEARCH_INDEX_NAME,
                "queryVector": query_embedding,
                "path": "embedding",
                "numCandidates": 150,
                "limit": 5
            }
        },
        {
            "$project": {
                "_id": 0,
                "body": 1,
                "metadata.productName": 1, 
                "metadata.contentType": 1,
                "updated": 1,
                "score": {"$meta": "vectorSearchScore"}
            }
        }
    ]

    # Execute the aggregation `pipeline` and store the results in `results`
    results = collection.aggregate(pipeline)
    return list(results)
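
The `score` projected above comes from `vectorSearchScore`. As a rough illustration of what a cosine-based score measures, the sketch below computes cosine similarity in plain Python and applies a `(1 + cosine) / 2` normalization to map it into the range 0 to 1. The normalization is an assumption based on how the Atlas Vector Search documentation describes cosine scores, so verify it against the current docs. Both function names are hypothetical.

```python
import math
from typing import List


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def normalized_score(a: List[float], b: List[float]) -> float:
    """Map cosine similarity from [-1, 1] to [0, 1] (assumed normalization)."""
    return (1 + cosine_similarity(a, b)) / 2


print(normalized_score([1.0, 0.0], [1.0, 0.0]))
print(normalized_score([1.0, 0.0], [0.0, 1.0]))
```

Under this assumption, identical directions score 1.0 and orthogonal vectors score 0.5.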

### Run vector search queries


In [135]:
vector_search("What are some best practices for data backups in MongoDB?")

[{'updated': '2024-05-20T17:31:07.735Z',
  'metadata': {'contentType': None, 'productName': 'MongoDB Server'},
  'body': '# Backup and Restore Sharded Clusters\n\nThe following tutorials describe backup and restoration for sharded clusters:\n\nTo use `mongodump` and `mongorestore` as a backup strategy for sharded clusters, you must stop the sharded cluster balancer and use the `fsync` command or the `db.fsyncLock()` method on `mongos` to block writes on the cluster during backups.\n\nSharded clusters can also use one of the following coordinated backup and restore processes, which maintain the atomicity guarantees of transactions across shards:\n\n- MongoDB Atlas\n\n- MongoDB Cloud Manager\n\n- MongoDB Ops Manager\n\nUse file system snapshots back up each component in the sharded cluster individually. The procedure involves stopping the cluster balancer. If your system configuration allows file system backups, this might be more efficient than using MongoDB tools.\n\nCreate backups usi

In [136]:
vector_search("How to resolve alerts in MongoDB?")

[{'updated': '2024-05-20T17:30:49.148Z',
  'metadata': {'contentType': None, 'productName': 'MongoDB Atlas'},
  'body': '</td>\n<td headers="Description">\nPercentage of used disk space on a partition reaches a specified threshold.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nQuery Targeting Alerts\n\n</td>\n<td headers="Description">\nIndicates inefficient queries.\n\nThe change streams cursors that the Atlas Search process (`mongot`) uses to keep Atlas Search indexes updated can contribute to the query targeting ratio and trigger query targeting alerts if the ratio is high.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nReplica Set Has No Primary\n\n</td>\n<td headers="Description">\nNo primary is detected in replica set.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nReplication Oplog Alerts\n\n</td>\n<td headers="Description">\nAmount of oplog data generated on a primary cluster member is larger than the cluster\'s configured oplog size.',
  'score': 0.752524971961

# 🦹‍♀️ Combine pre-filtering with vector search

### Filter for documents where the product name is `MongoDB Atlas`

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-type/#about-the-filter-type

In [119]:
# Modify the vector search index `model` from Step 5 to include the `metadata.productName` field as a `filter` field
model = {
    "name": ATLAS_VECTOR_SEARCH_INDEX_NAME,
    "type": "vectorSearch",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1024,
                "similarity": "cosine"
            },
            {"type": "filter", "path": "metadata.productName"}
        ]
    }
}

In [120]:
# Use the `create_index` function from the `utils` module to re-create the vector search index with the modified model
create_index(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME, model)

Creating the vector_index index
vector_index index already exists, recreating...
Dropping vector_index index
vector_index index deletion complete
Creating new vector_index index
Successfully recreated the vector_index index


In [121]:
# Use the `check_index_ready` function from the `utils` module to verify that the index has the right filter fields and is in READY status before proceeding
check_index_ready(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME)

vector_index index status: READY
vector_index index definition: {'fields': [{'type': 'vector', 'path': 'embedding', 'numDimensions': 1024, 'similarity': 'cosine'}, {'type': 'filter', 'path': 'metadata.productName'}]}


In [None]:
# Embed the user query
query_embedding = get_embeddings(
    ["What are some best practices for data backups in MongoDB?"], "query"
)

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples (Refer to the "Filter Example")

In [138]:
# Modify the aggregation pipeline defined in Step 6 to:
# Include a filter in the $vectorSearch stage for documents where the `metadata.productName` field has the value "MongoDB Atlas".
# Include the `metadata.productName` in the $project stage of the pipeline.
pipeline = [
    {
        "$vectorSearch": {
            "index": ATLAS_VECTOR_SEARCH_INDEX_NAME,
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5,
            "filter": {"metadata.productName": "MongoDB Atlas"}
        }
    },
    {
        "$project": {
            "_id": 0,
            "body": 1,
            "metadata.productName": 1, 
            "metadata.contentType": 1,
            "updated": 1,
            "score": {"$meta": "vectorSearchScore"}
        }
    }
]
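
The filtered pipelines in this section differ from the Step 6 pipeline only in the `filter` field of the `$vectorSearch` stage. One way to avoid repeating the rest is a small builder function. This is a sketch: `build_pipeline` and its parameter names are hypothetical, and the defaults mirror the values used in this notebook.

```python
from typing import Any, Dict, List, Optional


def build_pipeline(
    query_embedding: List[float],
    index_name: str = "vector_index",
    filter_expr: Optional[Dict[str, Any]] = None,
    num_candidates: int = 150,
    limit: int = 5,
) -> List[Dict[str, Any]]:
    """Build a $vectorSearch + $project pipeline, adding `filter` only when given."""
    vector_stage: Dict[str, Any] = {
        "index": index_name,
        "path": "embedding",
        "queryVector": query_embedding,
        "numCandidates": num_candidates,
        "limit": limit,
    }
    if filter_expr is not None:
        vector_stage["filter"] = filter_expr
    project_stage = {
        "_id": 0,
        "body": 1,
        "metadata.productName": 1,
        "metadata.contentType": 1,
        "updated": 1,
        "score": {"$meta": "vectorSearchScore"},
    }
    return [{"$vectorSearch": vector_stage}, {"$project": project_stage}]


unfiltered = build_pipeline([0.1, 0.2])
filtered = build_pipeline([0.1, 0.2], filter_expr={"metadata.productName": "MongoDB Atlas"})
print(filtered[0]["$vectorSearch"]["filter"])
```

Any field referenced in `filter_expr` still has to be declared as a `filter` field in the index definition, as done above.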

In [139]:
# Execute the aggregation pipeline and view the results
results = collection.aggregate(pipeline)
list(results)

[{'updated': '2024-05-20T17:30:49.148Z',
  'metadata': {'contentType': None, 'productName': 'MongoDB Atlas'},
  'body': '</td>\n<td headers="Description">\nPercentage of used disk space on a partition reaches a specified threshold.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nQuery Targeting Alerts\n\n</td>\n<td headers="Description">\nIndicates inefficient queries.\n\nThe change streams cursors that the Atlas Search process (`mongot`) uses to keep Atlas Search indexes updated can contribute to the query targeting ratio and trigger query targeting alerts if the ratio is high.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nReplica Set Has No Primary\n\n</td>\n<td headers="Description">\nNo primary is detected in replica set.\n\n</td>\n</tr>\n<tr>\n<td headers="Alert%20Type">\nReplication Oplog Alerts\n\n</td>\n<td headers="Description">\nAmount of oplog data generated on a primary cluster member is larger than the cluster\'s configured oplog size.',
  'score': 0.630968928337

### Filter on documents which have been updated on or after `2024-05-19` and where the content type is `Tutorial`

In [146]:
# Modify the vector search index `model` from Step 5 to include `metadata.contentType` and `updated` as `filter` fields
model = {
    "name": ATLAS_VECTOR_SEARCH_INDEX_NAME,
    "type": "vectorSearch",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1024,
                "similarity": "cosine"
            },
            {"type": "filter", "path": "metadata.contentType"},
            {"type": "filter", "path": "updated"}
        ]
    }
}

In [147]:
# Use the `create_index` function from the `utils` module to re-create the vector search index with the modified model
create_index(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME, model)

Creating the vector_index index
vector_index index already exists, recreating...
Dropping vector_index index
vector_index index deletion complete
Creating new vector_index index
Successfully recreated the vector_index index


In [148]:
# Use the `check_index_ready` function from the `utils` module to verify that the index has the right filter fields and is in READY status before proceeding
check_index_ready(collection, ATLAS_VECTOR_SEARCH_INDEX_NAME)

vector_index index status: READY
vector_index index definition: {'fields': [{'type': 'vector', 'path': 'embedding', 'numDimensions': 1024, 'similarity': 'cosine'}, {'type': 'filter', 'path': 'metadata.contentType'}, {'type': 'filter', 'path': 'updated'}]}


In [149]:
# Embed the user query
query_embedding = get_embeddings(
    ["What are some best practices for data backups in MongoDB?"], "query"
)

In [None]:
# Modify the aggregation pipeline defined in Step 6 to:
# Include a filter in the $vectorSearch stage for documents where the `metadata.contentType` field is "Tutorial" AND the `updated` field is greater than or equal to "2024-05-19".
# Include the `metadata.contentType` and `updated` fields in the $project stage of the pipeline.
pipeline = [
    {
        "$vectorSearch": {
            "index": ATLAS_VECTOR_SEARCH_INDEX_NAME,
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5,
            "filter": {
                "$and": [
                    {"metadata.contentType": "Tutorial"},
                    {"updated": {"$gte": "2024-05-19"}}
                ]
            }
        }
    },
    {
        "$project": {
            "_id": 0,
            "body": 1,
            "updated": 1,
            "score": {"$meta": "vectorSearchScore"}
        }
    }
]
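
The `$gte` filter above compares strings, because `updated` is stored as an ISO 8601 string rather than a date. This works because timestamps written in one consistent ISO 8601 format sort lexicographically in the same order as chronologically. The sketch below illustrates that in plain Python. `on_or_after` is a hypothetical helper and the second and third timestamps are made up.

```python
from datetime import datetime


def on_or_after(updated: str, cutoff: str) -> bool:
    """String comparison, valid only if both values share one ISO 8601 format."""
    return updated >= cutoff


print(on_or_after("2024-05-20T17:30:49.148Z", "2024-05-19"))
print(on_or_after("2024-05-18T23:59:59.999Z", "2024-05-19"))
# A longer string with the same prefix compares as greater, so midnight is included
print(on_or_after("2024-05-19T00:00:00.000Z", "2024-05-19"))

# The same check with parsed datetimes, for comparison
parsed = datetime.strptime("2024-05-20T17:30:49.148Z", "%Y-%m-%dT%H:%M:%S.%fZ")
cutoff = datetime.strptime("2024-05-19", "%Y-%m-%d")
print(parsed >= cutoff)
```

If the collection mixed formats or time zone offsets, string comparison would no longer be reliable, and storing real date values would be the safer choice.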

In [151]:
# Execute the aggregation pipeline and view the results
results = collection.aggregate(pipeline)
list(results)

[{'updated': '2024-05-20T17:32:23.500Z',
  'body': '### Main trigger logic\n\nThe main trigger logic is invoked when an update change event is detected with a `"process" : false` field.\n```javascript\nexports = async function(changeEvent) {\n  // A Database Trigger will always call a function with a changeEvent.\n  // Documentation on ChangeEvents: https://www.mongodb.com/docs/manual/reference/change-events\n\n  // This sample function will listen for events and replicate them to a collection in a different Database\nfunction sampleReviews(reviews) {\n// Logic above...\n   if (reviews.length <= 50) {\n        return reviews;\n    }\n    const sampledReviews = [];\n    const seenIndices = new Set();\n\n    while (sampledReviews.length < 50) {\n        const randomIndex = Math.floor(Math.random() * reviews.length);\n        if (!seenIndices.has(randomIndex)) {\n            seenIndices.add(randomIndex);\n            sampledReviews.push(reviews[randomIndex]);\n        }\n    }\n\n    retu

# Step 7: Build the RAG application


In [155]:
import requests

### Define a function to create the chat prompt

In [None]:
# Define a function to create the user prompt for our RAG application
def create_prompt(user_query: str) -> str:
    """
    Create a chat prompt that includes the user query and retrieved context.

    Args:
        user_query (str): The user's query string.

    Returns:
        str: The chat prompt string.
    """
    # Retrieve the most relevant documents for the `user_query` using the `vector_search` function defined in Step 6
    context = vector_search(user_query)
    # Join the retrieved documents into a single string, where each document is separated by two new lines ("\n\n")
    context = "\n\n".join([doc.get('body') for doc in context])
    # Prompt consisting of the question and relevant context to answer it
    prompt = f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"
    return prompt

### Define a function to answer user queries

In [153]:
# Define a function to answer user queries
def generate_answer(user_query: str) -> None:
    """
    Generate an answer to the user query.

    Args:
        user_query (str): The user's query string.
    """
    # Use the `create_prompt` function above to create a chat prompt
    prompt = create_prompt(user_query)
    # Format the message to the LLM in the format [{"role": <role_value>, "content": <content_value>}]
    # The role value for user messages must be "user"
    # Use the `prompt` created above to populate the `content` field in the chat message
    messages = [{"role": "user", "content": prompt}]
    # Send the chat messages to a serverless function to get back an LLM response
    response = requests.post(url=SERVERLESS_URL, json={"task": "completion", "data": messages})
    # Print the final answer
    print(response.json()["text"])

### Query the RAG application


In [156]:
generate_answer("What are some best practices for data backups in MongoDB?")

Based on the given context, some best practices for data backups in MongoDB sharded clusters include:

1. Stopping the sharded cluster balancer before performing backups.

2. Using the `fsync` command or `db.fsyncLock()` method on `mongos` to block writes during backups when using `mongodump` and `mongorestore`.

3. Using coordinated backup and restore processes that maintain atomicity guarantees across shards, such as MongoDB Atlas, MongoDB Cloud Manager, or MongoDB Ops Manager.

4. Using file system snapshots to back up each component in the sharded cluster individually, if the system configuration allows.

5. Using `mongodump` to back up each component in the cluster individually.

6. Limiting the operation of the cluster balancer to provide a window for regular backup operations.

7. For file system backups, stopping the cluster balancer before proceeding.

The context also mentions that these methods can help maintain the atomicity guarantees of transactions across shards in a sha

In [157]:
# Notice that the LLM does not remember the conversation history at this stage
generate_answer("What did I just ask you?")

I DON'T KNOW

The context provided does not contain any information about a question you asked previously. The context appears to be about MongoDB documentation and tips for using CodeWhisperer, but does not include any prior question from you.


# 🦹‍♀️ Re-rank retrieved results


📚 https://docs.voyageai.com/docs/reranker#python-api (See Example)

In [None]:
# Add a re-ranking step to the following function
def create_prompt(user_query: str) -> str:
    """
    Create a chat prompt that includes the user query and retrieved context.

    Args:
        user_query (str): The user's query string.

    Returns:
        str: The chat prompt string.
    """
    # Retrieve the most relevant documents for the `user_query` using the `vector_search` function defined in Step 6
    context = vector_search(user_query)
    # Extract the "body" field from each document in `context`
    documents = [d.get("body") for d in context]
    # Use the `rerank` method of the Voyage AI API to re-rank the `documents` with the following arguments:
    # model: "rerank-2.5"
    # top_k: 5 
    reranked_documents = vo.rerank(user_query, documents, model="rerank-2.5", top_k=5).results
    # Join the re-ranked documents into a single string, where each document is separated by two new lines ("\n\n")
    context = "\n\n".join([d.document for d in reranked_documents])
    # Prompt consisting of the question and relevant context to answer it
    prompt = f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"
    return prompt

In [167]:
# Note the impact of re-ranking on the generated answer
# You might not see a difference in this example since we are only re-ranking 5 documents
# In practice, you would send a larger number of documents to the re-ranker, and get the top few AFTER reranking
generate_answer("What are some best practices for data backups in MongoDB?")

Based on the provided context, some best practices for data backups in MongoDB sharded clusters include:

1. Stopping the sharded cluster balancer before performing backups.

2. Using the `fsync` command or `db.fsyncLock()` method on `mongos` to block writes on the cluster during backups when using `mongodump` and `mongorestore`.

3. Using coordinated backup and restore processes that maintain atomicity guarantees across shards, such as MongoDB Atlas, MongoDB Cloud Manager, or MongoDB Ops Manager.

4. Using file system snapshots to back up each component in the sharded cluster individually, if the system configuration allows it.

5. Using `mongodump` to back up each component in the cluster individually.

6. Limiting the operation of the cluster balancer to provide a window for regular backup operations.

7. For file system backups, stopping the cluster balancer before proceeding.

These practices help ensure consistent and reliable backups of sharded clusters in MongoDB.
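
The note about sending more documents to the re-ranker can be sketched as "over-fetch, then keep the top few". The example below uses a toy keyword-overlap scorer purely as a stand-in for a real re-ranker such as the Voyage one. `retrieve_then_rerank` and `keyword_overlap` are hypothetical names and the candidate texts are made up.

```python
from typing import Callable, List


def retrieve_then_rerank(
    query: str,
    candidates: List[str],
    score_fn: Callable[[str, str], float],
    top_k: int,
) -> List[str]:
    """Score every candidate against the query and keep the `top_k` best."""
    ranked = sorted(candidates, key=lambda doc: score_fn(query, doc), reverse=True)
    return ranked[:top_k]


def keyword_overlap(query: str, doc: str) -> float:
    """Toy relevance score: number of lowercase words shared by query and document."""
    return float(len(set(query.lower().split()) & set(doc.lower().split())))


# Pretend these came from a vector search with a larger `limit`
candidates = [
    "Alerts overview",
    "How to backup a sharded cluster",
    "Cluster balancer settings",
    "Backup with mongodump",
]
top_docs = retrieve_then_rerank("backup sharded cluster", candidates, keyword_overlap, top_k=2)
print(top_docs)
```

In the notebook, the equivalent change would be to raise `limit` in `vector_search` and keep `top_k` small in the `rerank` call.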


# Step 8: Add memory to the RAG application


In [168]:
from datetime import datetime

In [169]:
history_collection = mongodb_client[DB_NAME]["chat_history"]

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index


In [170]:
# Create an index on the key `session_id` for the `history_collection` collection
history_collection.create_index("session_id")

'session_id_1'

### Define a function to store chat messages in MongoDB

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one

In [171]:
def store_chat_message(session_id: str, role: str, content: str) -> None:
    """
    Store a chat message in a MongoDB collection.

    Args:
        session_id (str): Session ID of the message.
        role (str): Role for the message. One of `system`, `user` or `assistant`.
        content (str): Content of the message.
    """
    # Create a message object with `session_id`, `role`, `content` and `timestamp` fields
    # `timestamp` should be set to the current timestamp
    message = {
        "session_id": session_id,
        "role": role,
        "content": content,
        "timestamp": datetime.now(),
    }
    # Insert the `message` into the `history_collection` collection
    history_collection.insert_one(message)
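
`datetime.now()` returns a naive local time. If messages for one session might be written from machines in different time zones, a timezone-aware UTC timestamp keeps the ordering unambiguous. A minimal sketch of building the message that way, without touching the database. `build_message` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def build_message(session_id: str, role: str, content: str) -> Dict[str, Any]:
    """Build a chat message document with a timezone-aware UTC timestamp."""
    return {
        "session_id": session_id,
        "role": role,
        "content": content,
        "timestamp": datetime.now(timezone.utc),
    }


first = build_message("1", "user", "Hello")
second = build_message("1", "assistant", "Hi there")
print(first["timestamp"].tzinfo, first["timestamp"] <= second["timestamp"])
```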

### Define a function to retrieve chat history from MongoDB

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/cursor.html#pymongo.cursor.Cursor.sort

In [172]:
def retrieve_session_history(session_id: str) -> List:
    """
    Retrieve chat message history for a particular session.

    Args:
        session_id (str): Session ID to retrieve chat message history for.

    Returns:
        List: List of chat messages.
    """
    # Query the `history_collection` collection for documents where the "session_id" field has the value of the input `session_id`
    # Sort the results in increasing order of the values in `timestamp` field
    cursor = history_collection.find({"session_id": session_id}).sort("timestamp", 1)

    # Iterate through the cursor and extract the `role` and `content` field from each entry
    # Then format each entry as: {"role": <role_value>, "content": <content_value>}
    # A cursor object is always truthy, so no emptiness check is needed:
    # iterating over a cursor with no matches simply produces an empty list
    messages = [{"role": msg["role"], "content": msg["content"]} for msg in cursor]

    return messages

### Handle chat history in the `generate_answer` function

In [173]:
def generate_answer(session_id: str, user_query: str) -> None:
    """
    Generate an answer to the user's query taking chat history into account.

    Args:
        session_id (str): Session ID to retrieve chat history for.
        user_query (str): The user's query string.
    """
    # Initialize list of messages to pass to the chat completion model
    messages = []

    # Retrieve documents relevant to the user query and convert them to a single string
    context = vector_search(user_query)
    context = "\n\n".join([d.get("body", "") for d in context])
    # Create a system prompt containing the retrieved context
    system_message = {
        "role": "user",
        "content": f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}",
    }
    # Append the system prompt to the `messages` list
    messages.append(system_message)

    # Use the `retrieve_session_history` function to retrieve message history from MongoDB for the session ID `session_id` 
    # And add all messages in the message history to the `messages` list 
    message_history = retrieve_session_history(session_id)
    messages.extend(message_history)

    # Format the user query in the format {"role": <role_value>, "content": <content_value>}
    # The role value for user messages must be "user"
    # And append the user message to the `messages` list
    user_message = {"role": "user", "content": user_query}
    messages.append(user_message)

    # Send the chat messages to a serverless function to get back an LLM response
    response = requests.post(url=SERVERLESS_URL, json={"task": "completion", "data": messages})

    # Extract the answer from the response
    answer = response.json()["text"]

    # Use the `store_chat_message` function to store the user message and also the generated answer in the message history collection
    # The role value for user messages is "user", and "assistant" for the generated answer
    store_chat_message(session_id, "user", user_query)
    store_chat_message(session_id, "assistant", answer)

    print(answer)
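
The order in which messages are assembled matters: retrieved context first, then prior turns, then the new question. The sketch below isolates that assembly step so it can be checked without a database or an LLM. `build_messages` is a hypothetical helper and the history shown is made up.

```python
from typing import Dict, List


def build_messages(context: str, history: List[Dict[str, str]], user_query: str) -> List[Dict[str, str]]:
    """Assemble chat messages as: context prompt, prior turns, current query."""
    context_message = {
        "role": "user",
        "content": (
            "Answer the question based only on the following context. "
            f"If the context is empty, say I DON'T KNOW\n\nContext:\n{context}"
        ),
    }
    messages = [context_message]
    messages.extend(history)
    messages.append({"role": "user", "content": user_query})
    return messages


history = [
    {"role": "user", "content": "What are some best practices for data backups in MongoDB?"},
    {"role": "assistant", "content": "Stop the balancer before backing up a sharded cluster."},
]
messages = build_messages("(retrieved chunks)", history, "What did I just ask you?")
print([m["role"] for m in messages])
```

Because the history sits between the context and the new question, the model can answer follow-ups such as "What did I just ask you?".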

In [174]:
generate_answer(
    session_id="1",
    user_query="What are some best practices for data backups in MongoDB?",
)

Based on the provided context, some best practices for data backups in MongoDB sharded clusters include:

1. Stop the sharded cluster balancer before performing backups.

2. Use the fsync command or db.fsyncLock() method on mongos to block writes during backups when using mongodump and mongorestore.

3. Consider using coordinated backup and restore processes that maintain transaction atomicity across shards, such as MongoDB Atlas, Cloud Manager, or Ops Manager.

4. Use file system snapshots to back up each component of the sharded cluster individually if your system configuration allows it.

5. Use mongodump to back up each component in the cluster individually.

6. Limit the operation of the cluster balancer to provide a window for regular backup operations.

7. For file system backups, stop the cluster balancer before taking snapshots.

8. Consider using MongoDB's built-in backup solutions like Atlas, Cloud Manager or Ops Manager for coordinated backups that maintain consistency acro

In [175]:
generate_answer(
    session_id="1",
    user_query="What did I just ask you?",
)

You asked: "What are some best practices for data backups in MongoDB?"
