# Step 1: Setup prerequisites

In [None]:
import os
from pymongo import MongoClient

In [None]:
# If you are using your own MongoDB Atlas cluster, use the connection string for your cluster here
MONGODB_URI = os.getenv("MONGODB_URI")
# Initialize a MongoDB Python client
mongodb_client = MongoClient(MONGODB_URI)
# Check the connection to the server
mongodb_client.admin.command("ping")

In [None]:
SERVERLESS_URL = os.getenv("SERVERLESS_URL")
LLM_PROVIDER = "google"

# Step 2: Read PDF from URL

In [None]:
import pymupdf
import requests

📚 https://pymupdf.readthedocs.io/en/latest/how-to-open-a-file.html#opening-remote-files

In [None]:
# Download the DeepSeek paper
# A timeout is set because `requests` waits indefinitely by default, which
# would hang this cell if the server never responds.
response = requests.get("https://arxiv.org/pdf/2501.12948", timeout=60)
# Fail fast on any non-200 status rather than treating an error body as a PDF
if response.status_code != 200:
    raise ValueError(f"Failed to download PDF. Status code: {response.status_code}")
# Get the content of the response as raw bytes
pdf_stream = response.content
# Open the data in `pdf_stream` as a PDF document and store it in `pdf`.
# HINT: Set the `filetype` argument to "pdf".
pdf = <CODE_BLOCK_1>

# Step 3: Store PDF images locally and extract metadata

In [None]:
from tqdm import tqdm

In [None]:
docs = []

📚 https://pymupdf.readthedocs.io/en/latest/page.html#Page.get_pixmap

In [None]:
# Make sure the output directory exists: the loop below saves each rendered
# page under "data/images/", which fails if the directory is missing.
os.makedirs("data/images", exist_ok=True)
# Zoom factor applied along both axes when rendering each page
zoom = 3.0
# Set image matrix dimensions
mat = pymupdf.Matrix(zoom, zoom)
# Iterate through the pages of the PDF
for n in tqdm(range(pdf.page_count)):
    temp = {}
    # Use the `get_pixmap` method to render the PDF page as a matrix of pixels as specified by the variable `mat`
    # HINT: Access the PDF page as pdf[n]
    pix = <CODE_BLOCK_2>
    # Store image locally
    key = f"data/images/{n+1}.png"
    pix.save(key)
    # Extract image metadata to be stored in MongoDB
    temp["key"] = key
    temp["width"] = pix.width
    temp["height"] = pix.height
    docs.append(temp)

# Step 4: Generate image embeddings

Uncomment this section only if you are generating embeddings using your own Voyage AI API key.

Follow the steps [here](https://docs.voyageai.com/docs/api-key-and-installation#authentication-with-api-keys) to obtain a Voyage AI API key.

In [None]:
# from voyageai import Client
# from PIL import Image

In [None]:
# # Set Voyage AI API Key
# os.environ["VOYAGE_API_KEY"] = "your-api-key"

In [None]:
# voyageai_client = Client()

In [None]:
# def get_embedding(data, input_type):
#     """
#     Get Voyage AI embeddings for images and text.

#     Args:
#         data: An image or text to embed
#         input_type: Input type, either "document" or "query"

#     Returns: Embeddings as a list
#     """
#     embedding = voyageai_client.multimodal_embed(
#         inputs=[[data]], model="voyage-multimodal-3", input_type=input_type
#     ).embeddings[0]
#     return embedding

In [None]:
# embedded_docs = []
# for doc in tqdm(docs):
#     # Open the image from file
#     img = Image.open(f"{doc['key']}")
#     # Add the embeddings to the document
#     doc["embedding"] = get_embedding(img, "document")
#     embedded_docs.append(doc)

# Step 5: Write embeddings and metadata to MongoDB

In this step, we ingest a dataset with pre-generated multimodal embeddings into MongoDB.

If you would like to understand how the embedding process works, uncomment and work through the code in Step 4.

In [None]:
import json

In [None]:
#  Database name
DB_NAME = "mongodb_aiewf"
# Name of the collection to insert documents into
COLLECTION_NAME = "multimodal_workshop"

In [None]:
# Connect to the collection
collection = mongodb_client[DB_NAME][COLLECTION_NAME]

In [None]:
# Read data from local file
# The encoding is given explicitly so the JSON is decoded as UTF-8 regardless
# of the platform's default text encoding.
with open("data/embeddings.json", "r", encoding="utf-8") as data_file:
    json_data = data_file.read()
# Parse the JSON text into Python objects to be inserted into MongoDB
data = json.loads(json_data)

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_many

In [None]:
# Delete existing documents from the `collection` collection
collection.delete_many({})
print(f"Deleted existing documents from the {COLLECTION_NAME} collection.")
# Bulk insert documents in `data`, into the `collection` collection.
<CODE_BLOCK_3>
print(
    f"{collection.count_documents({})} documents ingested into the {COLLECTION_NAME} collection."
)

# Step 6: Create a vector search index

In [None]:
VS_INDEX_NAME = "vector_index"

In [None]:
# Create vector index definition specifying:
# path: Path to the embeddings field
# numDimensions: Number of embedding dimensions; depends on the embedding model used
# similarity: Similarity metric. One of cosine, euclidean, dotProduct.
model = {
    "name": VS_INDEX_NAME,
    "type": "vectorSearch",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1024,
                "similarity": "cosine",
            }
        ]
    },
}

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_search_index

In [None]:
# Create a vector search index with the above `model` for the `collection` collection
<CODE_BLOCK_4>

In [None]:
# Verify that the index is in READY status before proceeding
list(collection.list_search_indexes())

# Step 7: Create agent tools


In [None]:
from typing import List

📚 https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples (Refer to Basic Example)

In [None]:
def get_information_for_question_answering(user_query: str) -> List[str]:
    """
    Retrieve information using vector search to answer a user query.

    The query is embedded by POSTing it to the serverless endpoint at
    `SERVERLESS_URL`; the resulting vector is then used in an aggregation
    pipeline executed against `collection`.

    Args:
    user_query (str): The user's query string.

    Returns:
    List[str]: The `key` field of every document returned by the pipeline.
    Callers in this notebook pass these values to `Image.open`, so they are
    expected to be image file paths.
    """
    # Embed the user query using our serverless endpoint
    # NOTE(review): the HTTP status is not checked and no timeout is set, so an
    # error payload surfaces as a failure in `response.json()["embedding"]` below.
    response = requests.post(
        url=SERVERLESS_URL,
        json={
            "task": "get_embedding",
            "data": {"input": user_query, "input_type": "query"},
        },
    )
    # Extract the embedding from the response
    query_embedding = response.json()["embedding"]

    # Define an aggregation pipeline consisting of a $vectorSearch stage followed by a $project stage
    # Set the number of candidates to 150 and only return the top 2 documents from the vector search
    # In the $project stage, exclude the `_id` field, include these fields: `key`, `width`, `height`, and the `vectorSearchScore`
    # NOTE: Use variables defined previously for the `index`, `queryVector` and `path` fields in the $vectorSearch stage
    pipeline = <CODE_BLOCK_5>

    # Execute the aggregation `pipeline` against the `collection` collection and store the results in `results`
    results = <CODE_BLOCK_6>
    # Collect the `key` of each result; the images themselves are not opened here
    keys = [result["key"] for result in results]
    print(f"Keys: {keys}")
    return keys

📚 https://ai.google.dev/gemini-api/docs/function-calling?example=meeting#step_1_define_function_declaration

In [None]:
# Define the function declaration for the `get_information_for_question_answering` function
get_information_for_question_answering_declaration = {
    "name": <CODE_BLOCK_7>,
    "description": "Retrieve information using vector search to answer a user query.",
    "parameters": {
        "type": "object",
        "properties": {
            "user_query": {
                "type": <CODE_BLOCK_8>,
                "description": "Query string to use for vector search",
            }
        },
        "required": <CODE_BLOCK_9>,
    },
}

# Step 8: Instantiate the Gemini client

In [None]:
from google import genai
from google.genai import types

In [None]:
LLM = "gemini-2.0-flash"

In [None]:
api_key = requests.post(
    url=SERVERLESS_URL, json={"task": "get_api_key", "data": LLM_PROVIDER}
).json()["api_key"]
gemini_client = genai.Client(api_key=api_key)

# Step 9: Create generation config

In [None]:
# Create a generation config with the `get_information_for_question_answering_declaration` function declaration and `temperature` set to 0.0
tools = types.Tool(
    function_declarations=[get_information_for_question_answering_declaration]
)
tools_config = types.GenerateContentConfig(tools=[tools], temperature=0.0)

# Step 10: Define a function for tool selection

In [None]:
from google.genai.types import FunctionCall

📚 https://ai.google.dev/gemini-api/docs/function-calling?example=meeting#step_4_create_user_friendly_response_with_function_result_and_call_the_model_again

In [None]:
def select_tool(messages: List) -> FunctionCall | None:
    """
    Use an LLM to decide which tool to call

    Args:
        messages (List): Messages as a list. They are appended to the system
            prompt to form the LLM input.

    Returns:
        FunctionCall | None: The `function_call` attribute of the first part of
            the first response candidate. Per the return annotation this may be
            None, which callers treat as "no tool requested".
    """
    # NOTE(review): the adjacent string literals below are joined with no
    # separator, so the sentences run together in the prompt
    # ("...which tool to use.If the user is asking..."). Confirm this is intended.
    system_prompt = [
        (
            "You're an AI assistant. Based on the given information, decide which tool to use."
            "If the user is asking to explain an image, don't call any tools unless that would help you better explain the image."
            "Here is the provided information:\n"
        )
    ]
    # Input to the LLM
    contents = system_prompt + messages
    # Use the `gemini_client`, `LLM`, `contents` and `tools_config` defined previously to generate a response using Gemini
    response = <CODE_BLOCK_10>
    # Extract and return the function call from the response
    # Only the first part of the first candidate is inspected.
    return response.candidates[0].content.parts[0].function_call

# Step 10: Define a function to execute tools and generate responses

In [None]:
from PIL import Image

📚 https://ai.google.dev/gemini-api/docs/function-calling?example=meeting#step_3_execute_set_light_values_function_code

In [None]:
def generate_answer(user_query: str, images: List = []) -> str:
    """
    Execute any tools and generate a response

    Args:
        user_query (str): User's query string
        images (List): List of filepaths. Defaults to [].

    Returns:
        str: LLM-generated response
    """
    # Use the `select_tool` function above to get the tool config
    # NOTE: Input to `select_tool` should be a list
    tool_call = <CODE_BLOCK_11>
    # If a tool call is found and the name is `get_information_for_question_answering`
    if (
        tool_call is not None
        and tool_call.name == "get_information_for_question_answering"
    ):
        print(f"Agent: Calling tool: {tool_call.name}")
        # Call the tool with the arguments extracted by the LLM
        tool_images = <CODE_BLOCK_12>
        # Add images return by the tool to the list of input images if any
        images.extend(tool_images)


    system_prompt = f"Answer the questions based on the provided context only. If the context is not sufficient, say I DON'T KNOW. DO NOT use any other information to answer the question."
    # Pass the system prompt, user query, and content retrieved using vector search (`images`) as input to the LLM
    contents = [system_prompt] + [user_query] + [Image.open(image) for image in images]

    # Get the response from the LLM
    response = gemini_client.models.generate_content(
        model=LLM,
        contents=contents,
        config=types.GenerateContentConfig(temperature=0.0),
    )
    answer = response.text
    return answer

# Step 11: Define a function to execute the agent

In [None]:
def execute_agent(user_query: str, images: List | None = None) -> None:
    """
    Execute the agent.

    Args:
        user_query (str): User query
        images (List | None, optional): List of filepaths. Defaults to None,
            meaning no images.
    """
    # Create a fresh list on every call. A shared `[]` default would be handed
    # to `generate_answer`, which extends the list it receives, so images from
    # one query would carry over into the next.
    if images is None:
        images = []
    response = generate_answer(user_query, images)
    print("Agent:", response)

In [None]:
# Test the agent with a text input
execute_agent("What is the Pass@1 accuracy of Deepseek R1 on the MATH500 benchmark?")

In [None]:
# Test the agent with an image input
execute_agent("Explain the graph in this image:", ["data/test.png"])

# Step 12: Add memory to the agent

In [None]:
from datetime import datetime

In [None]:
# Instantiate the history collection
history_collection = mongodb_client[DB_NAME]["history"]

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index

In [None]:
# Create an index on `session_id` on the `history_collection` collection
<CODE_BLOCK_13>

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one

In [None]:
def store_chat_message(session_id: str, role: str, type: str, content: str) -> None:
    """
    Create chat history document and store it in MongoDB

    Args:
        session_id (str): Session ID
        role (str): Message role. The instructions in this notebook use `user`
            for user queries and images, and `agent` for LLM responses.
        type (str): Type of message, one of `text` or `image`.
        content (str): Content of the message. For images, this is the image key.
    """
    # Create a message object with `session_id`, `role`, `type`, `content` and `timestamp` fields
    # `timestamp` should be set to the current timestamp
    # NOTE: `datetime.now()` without a timezone yields a naive local-time value.
    message = {
        "session_id": session_id,
        "role": role,
        "type": type,
        "content": content,
        "timestamp": datetime.now(),
    }
    # Insert the `message` into the `history_collection` collection
    <CODE_BLOCK_14>

📚 https://pymongo.readthedocs.io/en/stable/api/pymongo/cursor.html#pymongo.cursor.Cursor.sort

In [None]:
def retrieve_session_history(session_id: str) -> List:
    """
    Retrieve chat history for a particular session.

    Args:
        session_id (str): Session ID

    Returns:
        List: List of messages. Can be a combination of text and images.
            Text messages are returned as their stored content; image messages
            are returned as opened images. The message `role` is not included.
    """
    # Query the `history_collection` collection for documents where the "session_id" field has the value of the input `session_id`
    # Sort the results in increasing order of the values in `timestamp` field
    cursor = <CODE_BLOCK_15>
    messages = []
    # NOTE(review): this truthiness check looks redundant for a query cursor -- confirm.
    if cursor:
        for msg in cursor:
            # If the message type is `text`, append the content as is
            if msg["type"] == "text":
                messages.append(msg["content"])
            # If message type is `image`, open the image
            # (the stored content is the image key, used here as a file path)
            elif msg["type"] == "image":
                messages.append(Image.open(msg["content"]))
            # Messages of any other type are skipped
    return messages

In [None]:
def generate_answer(session_id: str, user_query: str, images: List = []) -> str:
    """
    Execute any tools and generate a response

    Args:
        session_id (str): Session ID
        user_query (str): User's query string
        images (List): List of filepaths. Defaults to [].

    Returns:
        str: LLM-generated response
    """
    # Retrieve past conversation history for the specified `session_id` using the `retrieve_session_history` method
    history = <CODE_BLOCK_16>
    # Determine if any additional tools need to be called
    tool_call = select_tool(history + [user_query])
    if (
        tool_call is not None
        and tool_call.name == "get_information_for_question_answering"
    ):
        print(f"Agent: Calling tool: {tool_call.name}")
        # Call the tool with the arguments extracted by the LLM
        tool_images = get_information_for_question_answering(**tool_call.args)
        # Add images return by the tool to the list of input images if any
        images.extend(tool_images)

    # Pass the system prompt, conversation history, user query and retrieved context (`images`) to the LLM to generate an answer
    system_prompt = f"Answer the questions based on the provided context only. If the context is not sufficient, say I DON'T KNOW. DO NOT use any other information to answer the question."
    contents = (
        [system_prompt]
        + history
        + [user_query]
        + [Image.open(image) for image in images]
    )
    # Get a response from the LLM
    response = gemini_client.models.generate_content(
        model=LLM,
        contents=contents,
        config=types.GenerateContentConfig(temperature=0.0),
    )
    answer = response.text
    # Write the current user query to memory using the `store_chat_message` function
    # The `role` for user queries is "user" and `type` is "text"
    <CODE_BLOCK_17>
    # Write the filepaths of input/retrieved images to memory using the store_chat_message` function
    # The `role` for these is "user" and `type` is "image"
    for image in images:
        <CODE_BLOCK_18>
    # Write the LLM generated response to memory
    # The `role` for these is "agent" and `type` is "text"
    <CODE_BLOCK_19>
    return answer

In [None]:
def execute_agent(session_id: str, user_query: str, images: List | None = None) -> None:
    """
    Execute the agent.

    Args:
        session_id (str): Session ID
        user_query (str): User query
        images (List | None, optional): List of filepaths. Defaults to None,
            meaning no images.
    """
    # Create a fresh list on every call. A shared `[]` default would be handed
    # to `generate_answer`, which extends the list it receives, so images from
    # one query would carry over into the next.
    if images is None:
        images = []
    response = generate_answer(session_id, user_query, images)
    print("Agent:", response)

In [None]:
execute_agent(
    "1",
    "What is the Pass@1 accuracy of Deepseek R1 on the MATH500 benchmark?",
)

In [None]:
# Follow-up question to make sure chat history is being used.
execute_agent(
    "1",
    "What did I just ask you?",
)

# 🦸‍♀️ Update to ReAct agent

In [None]:
def generate_answer(user_query: str, images: List | None = None) -> str:
    """
    Implement a ReAct agent

    The agent alternates between a reasoning call to the LLM and, when the LLM
    does not yet have a final answer, a tool-selection step whose retrieved
    images are added to the context for the next iteration.

    Args:
        user_query (str): User's query string
        images (List | None): List of filepaths. Defaults to None, meaning no
            images.

    Returns:
        str: The first LLM response containing "ANSWER", or a fallback message
            if no final answer is produced within the iteration limit.
    """
    # Define reasoning prompt
    # NOTE(review): the adjacent string literals are joined with no separator,
    # so the sentences run together in the prompt. Confirm this is intended.
    system_prompt = [
        (
            "You are an AI assistant. Based on the current information, decide if you have enough to answer the user query, or if you need more information."
            "If you have enough information, respond with 'ANSWER: <your answer>'."
            "If you need more information, respond with 'TOOL: <question for the tool>'. Keep the question concise."
            f"User query: {user_query}\n"
            "Current information:\n"
        )
    ]
    # Set max iterations
    max_iterations = 3
    # Initialize list to accumulate tool outcomes etc.
    # If the user input has images, they seed `current_information`
    current_information = [Image.open(image) for image in images or []]

    # Run the reasoning -> action taking loop for `max_iterations` number of iterations
    for current_iteration in range(1, max_iterations + 1):
        print(f"Iteration {current_iteration}:")
        # Generate action -> final answer/tool call
        response = gemini_client.models.generate_content(
            model=LLM,
            contents=system_prompt + current_information,
            config=types.GenerateContentConfig(temperature=0.0),
        )
        answer = response.text
        print(f"Agent: {answer}")
        # The response may carry no text; skip this iteration rather than
        # failing on the membership test or sending an empty message onwards
        if not answer:
            continue
        # If the agent has the final answer, return it
        if "ANSWER" in answer:
            return answer
        # Otherwise the agent wants more information: determine which tool to call
        tool_call = select_tool([answer])
        if (
            tool_call is not None
            and tool_call.name == "get_information_for_question_answering"
        ):
            print(f"Agent: Calling tool: {tool_call.name}")
            # Call the tool with the arguments extracted by the LLM
            tool_images = get_information_for_question_answering(**tool_call.args)
            # Add images returned by the tool to the accumulated information
            current_information.extend([Image.open(image) for image in tool_images])

    # The iteration budget ran out without a final answer. Return an explicit
    # message so the declared `str` return type holds (previously this fell
    # through and returned None).
    return f"I DON'T KNOW (no final answer after {max_iterations} iterations)"

In [None]:
def execute_agent(user_query: str, images: List | None = None) -> None:
    """
    Execute the agent.

    Args:
        user_query (str): User query
        images (List | None, optional): List of filepaths. Defaults to None,
            meaning no images.
    """
    # Avoid a mutable default argument: build a fresh list per call so no list
    # object is shared between separate invocations.
    if images is None:
        images = []
    response = generate_answer(user_query, images)
    print("Agent:", response)

In [None]:
execute_agent("What is the Pass@1 accuracy of Deepseek R1 on the MATH500 benchmark?")

In [None]:
execute_agent("Explain the graph in this image:", ["data/test.png"])