# [STARTER] Udaplay Project

## Part 02 - Agent

In this part of the project, you'll expose your VectorDB to your Agent as a tool.

You're building UdaPlay, an AI Research Agent for the video game industry. The agent will:
1. Answer questions using internal knowledge (RAG)
2. Search the web when needed
3. Maintain conversation state
4. Return structured outputs
5. Store useful information for future use

### Setup

In [1]:
# Only needed for Udacity workspace

import importlib.util
import sys

# Check if 'pysqlite3' is available before importing
if importlib.util.find_spec("pysqlite3") is not None:
    import pysqlite3
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [2]:
# Standard lib imports
import os
from dotenv import load_dotenv
from tavily import TavilyClient
from rich import print as rprint
from typing import Optional 
# Custom lib imports
from lib.agents import Agent
from lib.llm import LLM
from lib.messages import UserMessage, SystemMessage, ToolMessage, AIMessage
from lib.tooling import tool
from lib.vector_db import VectorStoreManager

In [3]:

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")

In [4]:

# subclass the VectorStoreManager to add a persistent store
import chromadb
from chromadb.utils import embedding_functions
class PersistentVectorStoreManager(VectorStoreManager):
    def __init__(self, openai_api_key: str, chroma_path: Optional[str] = None):
        if chroma_path:
            self.chroma_client = chromadb.PersistentClient(chroma_path)
        else:
            self.chroma_client = chromadb.Client()
        self.embedding_function = self._create_embedding_function(openai_api_key)

manager = PersistentVectorStoreManager(
    openai_api_key=OPENAI_API_KEY,
    chroma_path="chromadb",)

vector_store = manager.get_store("udaplay")

In [5]:
# test if store has loaded well
if vector_store is None:
    print("❌ Store not found! Did you use the correct name ('udaplay')?")
    print("Existing collections:", [c.name for c in manager.chroma_client.list_collections()])
else:
    # Access the underlying Chroma collection to count documents
    count = vector_store._collection.count()
    print("✅ VectorStore loaded successfully!")
    print(f"📊 Document Count: {count}")
    
    # Sanity check: fetch one item to ensure data is readable
    if count > 0:
        sample = vector_store.query(query_texts="Which game has action?", n_results=1)
        print("\n Sample Document ID:", sample['ids'][0])
        rprint("\n Content Snippet:", sample['documents'] )

✅ VectorStore loaded successfully!
📊 Document Count: 15

 Sample Document ID: ['002']


### Tools

Build at least 3 tools:
- retrieve_game: To search the vector DB
- evaluate_retrieval: To assess the retrieval performance
- game_web_search: To search the web when the retrieved documents are not good enough
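
The three tools form a fallback chain: retrieve first, judge the result, and only then go to the web. The sketch below shows that control flow in isolation. The function `answer_with_fallback` and the `fake_*` helpers are hypothetical stand-ins introduced here for illustration; they are not part of `lib` and do not call any API. In the notebook itself the agent's LLM decides when to call each tool, so treat this as one possible reading of the intended order rather than the agent's actual logic.

```python
from typing import Callable


def answer_with_fallback(
    question: str,
    retrieve: Callable[[str], str],
    evaluate: Callable[[str, str], bool],
    web_search: Callable[[str], str],
) -> dict:
    """Try the internal knowledge base first; fall back to the web if it is not enough."""
    docs = retrieve(question)
    if evaluate(question, docs):
        return {"source": "vector_db", "context": docs}
    return {"source": "web", "context": web_search(question)}


# Hypothetical stand-ins so the sketch runs without API keys.
def fake_retrieve(question: str) -> str:
    return "Gran Turismo (PlayStation 1, 1997)"


def fake_evaluate(question: str, docs: str) -> bool:
    # Toy judge: useful if any longer word of the question appears in the documents.
    words = [w for w in question.lower().split() if len(w) > 3]
    return any(w in docs.lower() for w in words)


def fake_web_search(question: str) -> str:
    return f"web results for: {question}"


internal = answer_with_fallback(
    "When was Gran Turismo released?", fake_retrieve, fake_evaluate, fake_web_search
)
external = answer_with_fallback(
    "Who founded Nintendo?", fake_retrieve, fake_evaluate, fake_web_search
)
print(internal["source"], external["source"])
```

The first question is answered from the stubbed knowledge base, while the second falls through to the stubbed web search because the toy judge finds no overlap.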


#### Retrieve Game Tool

In [6]:
# TODO: Create retrieve_game tool
# It should use chroma client and collection you created
# chroma_client = chromadb.PersistentClient(path="chromadb")
# collection = chroma_client.get_collection("udaplay")
# Tool Docstring:
#    Semantic search: Finds the most relevant results in the vector DB
#    args:
#    - query: a question about game industry. 
#
#    You'll receive results as list. Each element contains:
#    - Platform: like Game Boy, Playstation 5, Xbox 360...
#    - Name: Name of the Game
#    - YearOfRelease: Year when that game was released for that platform
#    - Description: Additional details about the game

@tool
def retrieve_game(query: str, results: Optional[int] = 3) -> str:
    """
    Search for video game information in the internal knowledge base.
    Args:
        query (str): A question about the game industry.
        results (int, optional): Number of top results to retrieve. Defaults to 3.
    Returns:
        str: Formatted string containing game information from the knowledge base.
    """
    # 1. Query the vector store; the formatting below expects documents, metadatas and distances
    # (stored under a new name so the `results` argument is not overwritten)
    query_results = vector_store.query(
        query_texts=[query],
        n_results=results,
    )

    # 2. Unpack the batch results (we only sent one query, so we take index 0)
    docs = query_results.get('documents', [[]])[0]
    metas = query_results.get('metadatas', [[]])[0]
    distances = query_results.get('distances', [[]])[0]
    
    if not docs:
        return "No relevant game information found in the knowledge base."

    # 3. Format the output nicely
    formatted_output = []
    for doc, meta, distance in zip(docs, metas, distances):
        # Create a structured block for each game found
        entry = (
            f"### Game: {meta.get('Name', 'Unknown Title')}\n"
            f"- **Platform**: {meta.get('Platform', 'N/A')}\n"
            f"- **Year**: {meta.get('YearOfRelease', 'N/A')}\n"
            f"- **Publisher**: {meta.get('Publisher', 'N/A')}\n"
            f"- **Details**: {doc}\n"
            # The store returns a distance, so smaller values mean a closer match
            f"- **Distance (lower is more similar)**: {distance:.4f}\n"
        )
        formatted_output.append(entry)

    return "\n".join(formatted_output)

# Test the tool
rprint(retrieve_game("Which game has action gameplay and was released on PlayStation 2?"))

#### Evaluate Retrieval Tool

In [7]:
# TODO: Create evaluate_retrieval tool
# You might use an LLM as judge in this tool to evaluate the performance
# You need to prompt that LLM with something like:
# "Your task is to evaluate if the documents are enough to respond to the query. "
# "Give a detailed explanation, so it's possible to take an action to accept it or not."
# Use EvaluationReport to parse the result
# Tool Docstring:
#    Based on the user's question and on the list of retrieved documents, 
#    it will analyze the usability of the documents to respond to that question. 
#    args: 
#    - question: original question from user
#    - retrieved_docs: retrieved documents most similar to the user query in the Vector Database
#    The result includes:
#    - useful: whether the documents are useful to answer the question
#    - description: description about the evaluation result

@tool
def evaluate_retrieval(question: str, retrieved_docs: str) -> str:
    """
    Evaluate the usability of retrieved documents to answer a user's question.
    Args:
        question (str): The original question from the user.
        retrieved_docs (str): Retrieved documents most similar to the user query in the Vector Database.
    Returns:
        str: Evaluation report indicating usefulness and description.
        - useful: whether the documents are useful to answer the question
        - description: description about the evaluation result
    """
    # Initialize LLM for evaluation
    llm = LLM(
        model="gpt-4o-mini",
        api_key=OPENAI_API_KEY,
        temperature=0.3
    )
    
    # Create the evaluation prompt
    evaluation_prompt = (
        f"Your task is to evaluate if the following documents are sufficient to respond to the question.\n\n"
        f"Question: {question}\n\n"
        f"Retrieved Documents:\n{retrieved_docs}\n\n"
        f"Please provide an evaluation report that includes:\n"
        f"- useful: whether the documents are useful to answer the question (True/False)\n"
        f"- description: detailed explanation of your evaluation."
    )
    
    # Get evaluation from LLM
    evaluation_response = llm.invoke([UserMessage(content=evaluation_prompt)])
    
    return evaluation_response.content

# Test evaluate_retrieval tool
rprint(evaluate_retrieval(
    "Which game has action gameplay and was released on PlayStation 2?",
    retrieve_game("Which game has action gameplay and was released on PlayStation 2?")
))
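
The TODO above suggests parsing the judge's answer into an `EvaluationReport`, while the tool as written returns the raw text. Below is a minimal sketch of what that parsing step could look like. It assumes the judge is asked to reply with a JSON object holding `useful` and `description`; the prompt in this notebook does not request JSON, so that format, the dataclass definition, and the helper `parse_evaluation` are all assumptions made for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class EvaluationReport:
    useful: bool       # whether the documents are useful to answer the question
    description: str   # explanation of the evaluation result


def parse_evaluation(raw: str) -> EvaluationReport:
    """Parse a judge reply that is expected to be a JSON object (assumed format)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        # Not a JSON object: be conservative and keep the raw text as the description.
        return EvaluationReport(useful=False, description=raw.strip())
    useful = data.get("useful", False)
    if isinstance(useful, str):
        # Accept "True"/"False" written as strings.
        useful = useful.strip().lower() == "true"
    return EvaluationReport(
        useful=bool(useful),
        description=str(data.get("description", "")),
    )


good = parse_evaluation('{"useful": true, "description": "The documents name the release year."}')
bad = parse_evaluation("I think the documents are fine.")
print(good)
print(bad)
```

Treating unparseable replies as not useful is a deliberate choice in this sketch: it pushes the agent towards the web search fallback instead of trusting an answer it could not read.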

#### Game Web Search Tool

In [8]:
# TODO: Create game_web_search tool
# Please use Tavily client to search the web
# Tool Docstring:
#    Web search: Finds relevant results on the web
#    args:
#    - query: a question about game industry. 

tavily_client = TavilyClient(api_key=TAVILY_API_KEY)    

@tool
def game_web_search(query: str, num_results: Optional[int] = 3) -> str:
    """
    Search the web for current video game information.
    Args:
        query (str): A question about the game industry.
        num_results (int, optional): Maximum number of web results to return. Defaults to 3.
    Returns:
        str: Formatted string containing web search results.
    """
    try:
        response = tavily_client.search(
            query, 
            search_depth="basic",
            max_results=num_results
            )
        
        formatted_output = []
        # Tavily may return fewer results than requested, or none at all
        results = response.get('results') or []

        for result in results:
            title = result.get('title', 'No Title')
            url = result.get('url', '#')
            content = result.get('content', 'No content available.')
            
            # Format as a clear source block
            entry = (
                f"### Web Result: {title}\n"
                f"- **Source**: {url}\n"
                f"- **Summary**: {content}\n"
            )
            formatted_output.append(entry)
            
        return "\n".join(formatted_output)

    except Exception as e:
        return f"Error performing web search: {str(e)}"

# Test game_web_search tool
rprint(game_web_search("Which game has action gameplay and was released on PlayStation 2?"))

### Agent

In [9]:
# TODO: Create your Agent abstraction using StateMachine
# Equip with an appropriate model
# Craft a good set of instructions 
# Plug all Tools you developed

agent = Agent(
    model_name="gpt-4o-mini",
    instructions=(
        "You are an Agentic video game expert that can intelligently decide which tools to use "
        "to answer user questions about video games. Reason about the response, change the query and call "
        "the tool again if needed in order to get better results. Always explain your reasoning and provide "
        "comprehensive answers."
        ),
    tools=[retrieve_game, game_web_search, evaluate_retrieval],
)


In [10]:
# TODO: Invoke your agent
# - When were Pokémon Gold and Silver released?
# - Which one was the first 3D platformer Mario game?
# - Was Mortal Kombat X released for Playstation 5?
questions = [
    "When was Pokémon Gold and Silver released?",
    "Which one was the first 3D platformer Mario game?",
    "Was Mortal Kombat X released for Playstation 5?"
]
for question in questions:
    rprint(f"\n\n### ❓ Question: {question}\n")
    response = agent.invoke(question)
    rprint(f"### 🤖 Agent Response:\n{response.get_final_state()['messages'][-1].content}\n")



[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__


[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__


[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__


### (Optional) Advanced

This version uses an explicit state machine in which document retrieval, web search, and evaluation are predefined nodes that always run before the LLM. The LLM can still plan afterwards and call the tools again if it needs more information.
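
To make the control flow concrete before the real implementation, here is a minimal, library-free sketch of the same idea: a fixed sequence of steps followed by a loop between the LLM and a tool step. Everything in it (`run_pipeline`, the stub steps, the `pending_tool_call` and `tool_runs` keys) is hypothetical and only mimics the shape of the flow; it does not use or reproduce `lib.state_machine`.

```python
from typing import Any, Callable, Dict, List, Optional

State = Dict[str, Any]


def run_pipeline(
    state: State,
    steps: Dict[str, Callable[[State], State]],
    next_step: Callable[[str, State], Optional[str]],
    start: str,
    max_steps: int = 20,
) -> List[str]:
    """Run steps until next_step returns None; return the names of the executed steps."""
    trace: List[str] = []
    current: Optional[str] = start
    while current is not None and len(trace) < max_steps:
        state.update(steps[current](state))  # each step returns the keys it changed
        trace.append(current)
        current = next_step(current, state)
    return trace


# Stub steps standing in for retrieval, web search, evaluation, the LLM and tool execution.
def doc_search(state: State) -> State:
    return {"retrieved_docs": "internal docs"}

def web_search(state: State) -> State:
    return {"web_results": "web docs"}

def evaluate(state: State) -> State:
    return {"evaluation_report": "useful: True"}

def llm(state: State) -> State:
    # Toy model: request one tool call on the first pass, then answer.
    if state.get("tool_runs", 0) < 1:
        return {"pending_tool_call": True}
    return {"pending_tool_call": False, "answer": "done"}

def tool(state: State) -> State:
    return {"tool_runs": state.get("tool_runs", 0) + 1, "pending_tool_call": False}


STEPS = {"doc_search": doc_search, "web_search": web_search,
         "evaluate": evaluate, "llm": llm, "tool": tool}
FIXED_ORDER = {"doc_search": "web_search", "web_search": "evaluate",
               "evaluate": "llm", "tool": "llm"}


def next_step(current: str, state: State) -> Optional[str]:
    # Only the LLM step branches: run a tool if one is pending, otherwise stop.
    if current == "llm":
        return "tool" if state.get("pending_tool_call") else None
    return FIXED_ORDER[current]


final_state: State = {}
trace = run_pipeline(final_state, STEPS, next_step, start="doc_search")
print(trace)
```

The fixed prefix guarantees that context is gathered before the model answers, and the single branching point keeps the tool loop easy to reason about; `max_steps` is a guard against a model that never stops requesting tools.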


In [12]:
from typing import TypedDict, List, Optional, Union, TypeVar
import json

from lib.state_machine import StateMachine, Step, EntryPoint, Termination, Run
from lib.llm import LLM
from lib.messages import AIMessage, UserMessage, SystemMessage, ToolMessage
from lib.tooling import Tool, ToolCall
from lib.memory import ShortTermMemory

# Define the state schema
class AgentState(TypedDict):
    user_query: str  # The current user query being processed
    instructions: str  # System instructions for the agent
    messages: List[dict]  # List of conversation messages
    web_results: Optional[str]  # Results from web search tool
    retrieved_docs: Optional[str]  # Documents retrieved from vector DB
    evaluation_report: Optional[str]  # Report from evaluation tool
    current_tool_calls: Optional[List[ToolCall]]  # Current pending tool calls
    total_tokens: int  # Track the cumulative total

class AdvancedAgent(Agent):
    def __init__(
            self, 
            model_name: str, 
            instructions: str, 
            tools: List[Tool], 
            persistent_db_path: Optional[str] = None,
            vector_store = None):
        
        super().__init__(model_name, instructions, tools)
        if vector_store:
            self.vector_store = vector_store
        elif persistent_db_path:
            self.manager = PersistentVectorStoreManager(
                openai_api_key=OPENAI_API_KEY,
                chroma_path=persistent_db_path,
            )
            self.vector_store = self.manager.get_store("udaplay")
        else:
            self.manager = VectorStoreManager(
                openai_api_key=OPENAI_API_KEY,
            )
            self.vector_store = self.manager.get_or_create_store("udaplay")

        return


    def _web_search(self, state: AgentState) -> AgentState:
        """
        Search the web for current video game information.
        Reads the query from state["user_query"] and returns the state with
        "web_results" set to a formatted string of search results.
        """
        query = state['user_query']
        num_results = 5

        try:
            response = tavily_client.search(
                query, 
                search_depth="basic",
                max_results=num_results
                )
            
            formatted_output = []
            # Tavily may return fewer results than requested, or none at all
            results = response.get('results') or []

            for result in results:
                title = result.get('title', 'No Title')
                url = result.get('url', '#')
                content = result.get('content', 'No content available.')
                
                # Format as a clear source block
                entry = (
                    f"### Web Result: {title}\n"
                    f"- **Source**: {url}\n"
                    f"- **Summary**: {content}\n"
                )
                formatted_output.append(entry)
            
            return {
                **state,
                "web_results": "\n".join(formatted_output)
            }

        except Exception as e:
            return {
                **state,
                "web_results": f"Error performing web search: {str(e)}"
            }

    def _retrieve_documents(self, state: AgentState) -> AgentState:
        """
        Search for video game information in the internal knowledge base.
        Reads the query from state["user_query"], retrieves the top 5 matches,
        and returns the state with "retrieved_docs" set to a formatted string.
        """
        query = state["user_query"]
        
        results = self.vector_store.query(
            query_texts=[query], 
            n_results=5,
        )
        
        # unpack the results for first query [0] - since we only sent one query :)
        docs = results.get('documents', [[]])[0]
        metas = results.get('metadatas', [[]])[0]
        distances = results.get('distances', [[]])[0]
        
        if not docs:
            return {
                **state,
                "retrieved_docs": "No relevant game information found in the knowledge base."
            }

        formatted_output = []
        for doc, meta, distance in zip(docs, metas, distances):
            entry = (
                f"### Game: {meta.get('Name', 'Unknown Title')}\n"
                f"- **Platform**: {meta.get('Platform', 'N/A')}\n"
                f"- **Year**: {meta.get('YearOfRelease', 'N/A')}\n"
                f"- **Publisher**: {meta.get('Publisher', 'N/A')}\n"
                f"- **Details**: {doc}\n"
                f"- **Relevance Score**: {distance:.4f}\n"
            )
            formatted_output.append(entry)
        return {
            **state,
            "retrieved_docs": "\n".join(formatted_output)
        }
    
    def _evaluate_retrieval(self, state: AgentState) -> AgentState:
        
        llm = LLM(
            model="gpt-4o-mini",
            api_key=OPENAI_API_KEY,
            temperature=0.3
        )
        question = state["user_query"]
        retrieved_docs = state.get("retrieved_docs", "")
        web_results = state.get("web_results", "")
        # Create the evaluation prompt
        evaluation_prompt = (
            f"Your task is to evaluate if the following documents are sufficient to respond to the question.\n\n"
            f"Question: {question}\n\n"
            f"Retrieved Documents:\n{retrieved_docs}\n\n"
            f"Web Results:\n{web_results}\n\n"
            f"Please provide an evaluation report that includes:\n"
            f"- useful: whether the documents are useful to answer the question (True/False)\n"
            f"- description: detailed explanation of your evaluation."
        )
    
        # Get evaluation from LLM
        evaluation_response = llm.invoke([UserMessage(content=evaluation_prompt)])
        # Record the evaluation as an AI message so the LLM sees it in the conversation
        ai_message = AIMessage(
            content=evaluation_response.content
        )
        return {
            **state,
            "messages": state["messages"] + [ai_message],
            "evaluation_report": evaluation_response.content
        }
    

    def _evaluate_documents(self, state: AgentState) -> AgentState:
        """
        Checks retrieved docs and web results, then injects them into 
        the message history so the LLM can use them.
        """
        internal_docs = state.get("retrieved_docs", "")
        web_docs = state.get("web_results", "")
        
        full_context = (
            f"--- INTERNAL DATABASE RESULTS ---\n{internal_docs}\n\n"
            f"--- WEB SEARCH RESULTS ---\n{web_docs}\n"
        )

        context_message = SystemMessage(
            content=(
                f"You have the following context available to answer the user's question:\n"
                f"{full_context}\n\n"
                "If the context is relevant, use it. If not, state that you don't know."
            )
        )
        

        current_messages = state["messages"]
        
        # Insert the context just before the last message (normally the user query);
        # if there are no messages yet, append it instead.
        new_messages = list(current_messages)
        if new_messages:
            new_messages.insert(-1, context_message)
        else:
            new_messages.append(context_message)

        return {
            **state,
            "messages": new_messages,
            "evaluation_report": "Context merged and injected."
        }    

    def _create_state_machine(self) -> StateMachine[AgentState]:
        """Create the internal state machine for the agent"""
        machine = StateMachine[AgentState](AgentState)
        
        # Create steps
        entry = EntryPoint[AgentState]()
        message_prep = Step[AgentState]("message_prep", self._prepare_messages_step)
        doc_search = Step[AgentState]("doc_search", self._retrieve_documents)
        web_search = Step[AgentState]("web_search", self._web_search)
        evaluate_retrieval = Step[AgentState]("evaluate_retrieval", self._evaluate_retrieval)
        llm_processor = Step[AgentState]("llm_processor", self._llm_step)
        tool_executor = Step[AgentState]("tool_executor", self._tool_step)
        termination = Termination[AgentState]()
        
        machine.add_steps([entry, message_prep, doc_search, web_search, evaluate_retrieval, llm_processor, tool_executor, termination])
        
        # Add transitions
        machine.connect(entry, message_prep)
        machine.connect(message_prep, doc_search)
        machine.connect(doc_search, web_search)
        machine.connect(web_search, evaluate_retrieval)
        machine.connect(evaluate_retrieval, llm_processor)

        
        # Transition based on whether there are tool calls
        def check_tool_calls(state: AgentState) -> Union[Step[AgentState], str]:
            """Transition logic: Check if there are tool calls"""
            if state.get("current_tool_calls"):
                return tool_executor
            return termination
        
        machine.connect(llm_processor, [tool_executor, termination], check_tool_calls)
        machine.connect(tool_executor, llm_processor)  # Go back to llm after tool execution
        
        return machine
    

Create the agent as a defined state machine

In [13]:
advanced_agent = AdvancedAgent(
    model_name="gpt-4o-mini",
    instructions=(
        "You are an Agentic video game expert that can intelligently decide which tools to use "
        "to answer user questions about video games. Reason about the response, change the query and call "
        "the tool again if needed in order to get better results. Always explain your reasoning and provide "
        "comprehensive answers."
        ),
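    tools=[retrieve_game, game_web_search, evaluate_retrieval],
    # Reuse the persistent store loaded above; without it the agent falls back to a new in-memory store
    vector_store=vector_store,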
)


In [14]:
questions = [
    "When was Pokémon Gold and Silver released?",
    "Which one was the first 3D platformer Mario game?",
    "Was Mortal Kombat X released for Playstation 5?"
]
for question in questions:
    rprint(f"\n\n### ❓ Question: {question}\n")
    response = advanced_agent.invoke(question)
    rprint(f"### 🤖 Agent Response:\n{response.get_final_state()['messages'][-1].content}\n")



[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: doc_search
[StateMachine] Executing step: web_search
[StateMachine] Executing step: evaluate_retrieval
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__


[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: doc_search
[StateMachine] Executing step: web_search
[StateMachine] Executing step: evaluate_retrieval
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__


[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: doc_search
[StateMachine] Executing step: web_search
[StateMachine] Executing step: evaluate_retrieval
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__
