# [STARTER] Udaplay Project

## Part 02 - Agent

In this part of the project, you'll use your VectorDB to be part of your Agent as a tool.

You're building UdaPlay, an AI Research Agent for the video game industry. The agent will:
1. Answer questions using internal knowledge (RAG)
2. Search the web when needed
3. Maintain conversation state
4. Return structured outputs
5. Store useful information for future use

### Setup

In [None]:
# Only needed for Udacity workspace

import importlib.util
import sys

# Check if 'pysqlite3' is available before importing
if importlib.util.find_spec("pysqlite3") is not None:
    import pysqlite3
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [None]:
# TODO: Import the necessary libs
# For example: 
import os
import json
from typing import TypedDict, Dict, Union, Optional, List
from dotenv import load_dotenv
import requests
from pydantic import BaseModel

from lib.tooling import tool
from lib.vector_db import  VectorStoreManager
from lib.state_machine import StateMachine, Step, EntryPoint, Termination
from lib.memory import ShortTermMemory, LongTermMemory, MemoryFragment

In [3]:
# TODO: Load environment variables
load_dotenv()

 # Verify API keys are loaded
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
CHROMA_OPENAI_API_KEY = os.getenv('CHROMA_OPENAI_API_KEY')
TAVILY_API_KEY = os.getenv('TAVILY_API_KEY')

# ✅ Validate keys with more readable error messaging
def require_env(var, name):
    assert var, f"[ConfigError] 🔒 {name} is missing! Please set it in your .env file."

require_env(OPENAI_API_KEY, "OPENAI_API_KEY")
require_env(CHROMA_OPENAI_API_KEY, "CHROMA_OPENAI_API_KEY")

# ✅ Detect and configure Vocareum keys
if OPENAI_API_KEY.startswith('voc-'):
    print("📡 Vocareum OpenAI key detected — routing requests through Vocareum endpoint.")
    os.environ['OPENAI_API_BASE'] = 'https://openai.vocareum.com/v1'

if CHROMA_OPENAI_API_KEY.startswith('voc-'):
    print("📡 Vocareum ChromaDB key detected — applying Vocareum configuration.")

📡 Vocareum OpenAI key detected — routing requests through Vocareum endpoint.
📡 Vocareum ChromaDB key detected — applying Vocareum configuration.


In [4]:
vector_manager = VectorStoreManager(CHROMA_OPENAI_API_KEY, persist_path="chroma-db")

vector_store = vector_manager.get_store("udaplay_games")

if vector_store:
    test_results = vector_store.get()
    print(f"✅ Connected to vector store. Sample docs: {len(test_results['ids'])}")
else:
    print("❌ Could not locate 'udaplay_games' vector store.")
    print("Please run Part 1 first!")

✅ Connected to vector store. Sample docs: 15


### Tools

Build at least 3 tools:
- retrieve_game: To search the vector DB
- evaluate_retrieval: To assess the retrieval performance
- game_web_search: If no good, search the web


#### Retrieve Game Tool

In [5]:
# TODO: Create retrieve_game tool
# It should use chroma client and collection you created
# chroma_client = chromadb.PersistentClient(path="chromadb")
# collection = chroma_client.get_collection("udaplay")
# Tool Docstring:
#    Semantic search: Finds most results in the vector DB
#    args:
#    - query: a question about game industry. 
#
#    You'll receive results as list. Each element contains:
#    - Platform: like Game Boy, Playstation 5, Xbox 360...)
#    - Name: Name of the Game
#    - YearOfRelease: Year when that game was released for that platform
#    - Description: Additional details about the game

@tool(name="retrieve_game", description="Semantic search: Finds relevant games in the vector DB.")
def retrieve_game(query: str, n_results: int = 3) -> Dict:
    """
    Semantic search: Finds the most relevant games in the vector DB.

    Args:
        query (str): A question about the game industry.
        n_results (int): Number of top matches to return (default: 3).

    Returns:
        Dict: The query, matched game entries, and number of results.
    """
    try:
        results = vector_store.query(query_texts=[query], n_results=n_results)
        matches = []

        if results.get("documents") and results["documents"][0]:
            for doc, distance, metadata in zip(
                results["documents"][0],
                results["distances"][0],
                results["metadatas"][0],
            ):
                matches.append({
                    "Name": metadata.get("name"),
                    "Platform": metadata.get("platform"),
                    "YearOfRelease": metadata.get("release_year"),
                    "Description": metadata.get("description"),
                    "Similarity": round(1 - distance, 4)
                })

        return {
            "query": query,
            "results": matches,
            "num_results": len(matches)
        }

    except Exception as e:
        return {
            "query": query,
            "results": [],
            "error": str(e)
        }

test_output = retrieve_game("Pokemon RPGs on Game Boy")
print(f"Retrieved {test_output['num_results']} games")
if test_output["results"]:
    top = test_output["results"][0]
    print(f"Top result: {top['Name']} ({top['YearOfRelease']}) on {top['Platform']}")


Retrieved 3 games
Top result: Pokémon Ruby and Sapphire (2002) on Game Boy Advance


#### Evaluate Retrieval Tool

In [7]:
# TODO: Create evaluate_retrieval tool
# You might use an LLM as judge in this tool to evaluate the performance
# You need to prompt that LLM with something like:
# "Your task is to evaluate if the documents are enough to respond the query. "
# "Give a detailed explanation, so it's possible to take an action to accept it or not."
# Use EvaluationReport to parse the result
# Tool Docstring:
#    Based on the user's question and on the list of retrieved documents, 
#    it will analyze the usability of the documents to respond to that question. 
#    args: 
#    - question: original question from user
#    - retrieved_docs: retrieved documents most similar to the user query in the Vector Database
#    The result includes:
#    - useful: whether the documents are useful to answer the question
#    - description: description about the evaluation result

# Evaluation schema as a typed response
class EvaluationReport(BaseModel):
    useful: bool
    description: str

@tool(name="evaluate_retrieval", description="Evaluates if retrieved documents are useful for the question.")
def evaluate_retrieval(question: str, retrieved_docs: Union[str, Dict]) -> Dict:
    """
    Based on the user's question and on the list of retrieved documents,
    this tool analyzes the usability of the documents to answer the question.

    Args:
        question (str): The user's original query.
        retrieved_docs (str | dict): Most similar documents from the vector DB.

    Returns:
        dict: Contains whether the documents are useful and a description of the evaluation.
    """

    # Detect key and configure evaluator
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    if not OPENAI_API_KEY:
        raise ValueError("OPENAI_API_KEY is not set.")

    from lib.llm import LLM  # make sure this is the updated LLM class with voc- support
    evaluator = LLM(model="gpt-4o-mini", temperature=0.1, api_key=OPENAI_API_KEY)

    def format_docs(docs) -> str:
        return "\n\n".join(
            [
                f"{i+1}. {doc.get('Name')} ({doc.get('YearOfRelease')}) on {doc.get('Platform')}\n"
                f"Description: {doc.get('Description', 'No description.')}"
                for i, doc in enumerate(docs)
            ]
        )

    try:
        # Parse retrieved_docs if it's a JSON string
        if isinstance(retrieved_docs, str):
            parsed = json.loads(retrieved_docs)
        else:
            parsed = retrieved_docs

        doc_list = parsed.get("results", [])
        formatted = format_docs(doc_list) if doc_list else "No relevant documents found."

        prompt = f"""
Your task is to decide whether the following documents contain **enough information** to reasonably answer the user's question.

User Question:
"{question}"

Retrieved Documents:
{formatted}

Respond ONLY in valid JSON with these fields:
- "useful" (true or false)
- "description" (string explaining your decision in one sentence)

Guidelines for `"useful": true`:
✅ If **any** document provides a relevant **year**, **platform**, or **game title** that reasonably supports the answer — even partially — mark it as `true`.

✅ You can assume the user is okay with brief or high-level answers if the core fact (like a release date or game name) is included.

❌ Only mark `"useful": false` if **none** of the documents help answer the question — not even partially — or the content is entirely unrelated.

Respond in strict JSON. No extra commentary.

Example:
{{
  "useful": true,
  "description": "One document states the game was released in 1996, which directly supports the question."
}}
"""

        # Call the LLM
        llm_response = evaluator.invoke(prompt)
        print("🔍 Raw LLM Response:", repr(llm_response.content))  # Debug print

        # Parse response content
        try:
            eval_data = json.loads(llm_response.content)
        except json.JSONDecodeError:
            return {
                "question": question,
                "useful": False,
                "description": f"Evaluation failed: LLM response was not valid JSON.\n\nRaw output:\n{llm_response.content}",
                "num_documents_evaluated": len(doc_list)
            }

        # Validate against schema
        report = EvaluationReport(**eval_data)

        return {
            "question": question,
            "useful": report.useful,
            "description": report.description,
            "num_documents_evaluated": len(doc_list)
        }

    except Exception as e:
        return {
            "question": question,
            "useful": False,
            "description": f"Evaluation failed: {e}",
            "num_documents_evaluated": 0
        }


In [8]:
test_result = retrieve_game("What are the most influential Pokémon titles on Game Boy?")
eval_result = evaluate_retrieval(test_result["query"], json.dumps(test_result))

print(f"Useful: {eval_result['useful']}")
print(f"Details: {eval_result['description']}")

📡 Vocareum key detected — using Vocareum API endpoint.
🔍 Raw LLM Response: '{\n  "useful": true,\n  "description": "The documents mention Pokémon Gold and Silver and Pokémon Ruby and Sapphire, both of which are influential Pokémon titles on handheld platforms."\n}'
Useful: True
Details: The documents mention Pokémon Gold and Silver and Pokémon Ruby and Sapphire, both of which are influential Pokémon titles on handheld platforms.


#### Game Web Search Tool

In [9]:
# TODO: Create game_web_search tool
# Please use Tavily client to search the web
# Tool Docstring:
#    Semantic search: Finds most results in the vector DB
#    args:
#    - question: a question about game industry. 

@tool(name="game_web_search", description="Web search: Uses the Tavily API to find real-time information about video games.")
def game_web_search(question: str) -> Dict:
    """
    Web search: Uses the Tavily API to find real-time information about video games.

    Args:
        question (str): A question about the game industry.

    Returns:
        dict: A quick answer and a list of relevant search results.
    """
    tavily_api_key = os.getenv("TAVILY_API_KEY")
    if not tavily_api_key:
        return {
            "error": "TAVILY_API_KEY is not set in environment.",
            "question": question,
            "results": []
        }

    url = "https://api.tavily.com/search"
    payload = {
        "api_key": tavily_api_key,
        "query": f"{question} video game",
        "search_depth": "advanced",
        "include_answer": True,
        "include_raw_content": False,
        "max_results": 5,
    }

    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
        data = response.json()

        results = [
            {
                "title": item.get("title", ""),
                "url": item.get("url", ""),
                "snippet": item.get("content", ""),
                "score": item.get("score", 0),
            }
            for item in data.get("results", [])
        ]

        return {
            "question": question,
            "answer": data.get("answer", ""),
            "results": results,
            "num_results": len(results),
        }

    except requests.exceptions.RequestException as e:
        return {
            "error": f"Tavily request failed: {e}",
            "question": question,
            "results": []
        }
    except Exception as e:
        return {
            "error": f"Unexpected error: {e}",
            "question": question,
            "results": []
        }


result = game_web_search("Pokemon Gold Silver release date")
print(f"Web search found {result['num_results']} results")
if result.get("answer"):
    print(f"Quick answer: {result['answer'][:120]}...")


Web search found 5 results
Quick answer: Pokémon Gold and Silver were released in Japan in 1999, in North America in October 2000, and in Europe in April 2001. T...


### Agent

In [10]:
# TODO: Create your Agent abstraction using StateMachine
# Equip with an appropriate model
# Craft a good set of instructions 
# Plug all Tools you developed

class AgentState(TypedDict):
    query: str
    retrieved: Optional[Dict]
    useful: Optional[bool]
    description: Optional[str]
    web_results: Optional[Dict]

retrieve_step = Step("retrieve_game", lambda state: {
    "retrieved": retrieve_game(state["query"])
})

evaluate_step = Step("evaluate_retrieval", lambda state: {
    **evaluate_retrieval(state["query"], state["retrieved"])
})

web_search_step = Step("game_web_search", lambda state: {
    "web_results": game_web_search(state["query"])
})

final_step = Termination()
entry_step = EntryPoint()


In [11]:
# Custom fallback rule for date or uncertainty-based queries
def force_web_search(state: AgentState) -> bool:
    query = state["query"].lower()
    return any(kw in query for kw in [
        "release", "released", "release date", "when", "what year", "first", "date"
    ])

# Enhanced condition handler with logging
def smart_fallback_decision(state: AgentState):
    if not state.get("useful", False):
        if force_web_search(state):
            print("[💡 Info] Web search fallback triggered due to date-related keywords.")
        else:
            print("[💡 Info] Web search fallback triggered due to insufficient internal data.")
        return web_search_step
    return final_step

# Initialize StateMachine with the AgentState schema
machine = StateMachine(AgentState)

# Register all the steps
machine.add_steps([
    entry_step,
    retrieve_step,
    evaluate_step,
    web_search_step,
    final_step
])

# Define the workflow connections
machine.connect(entry_step, retrieve_step)
machine.connect(retrieve_step, evaluate_step)

# 🧠 Smart fallback based on LLM judgment, with logging
machine.connect(
    evaluate_step,
    [web_search_step, final_step],
    condition=smart_fallback_decision
)

# Final transition after optional web search
machine.connect(web_search_step, final_step)


In [12]:
class UdaPlayAgentStateful:
    def __init__(self):
        self.machine = machine  # State machine already configured

    def invoke(self, query: str):
        state: AgentState = {"query": query}
        run = self.machine.run(state)
        final = run.get_final_state()

        print("=" * 60)
        print(f"📌 Final Query: {final.get('query', '[unknown]')}")

        if final.get("useful"):
            print("✅ Internal data was sufficient.")
            if final.get("description"):
                print(f"🎮 Evaluation Summary: {final['description']}")
        elif final.get("web_results"):
            print("🔍 Internal data was insufficient — used web search.")
            if final["web_results"].get("answer"):
                print(f"🌐 Tavily Answer: {final['web_results']['answer'][:150]}...")
            elif final["web_results"].get("results"):
                print(f"🌐 Top Web Result: {final['web_results']['results'][0]['title']}")
            else:
                print("⚠️ Tavily returned no results.")
        else:
            print("❌ No useful data found.")

        print("=" * 60)
        return final


In [13]:
# TODO: Invoke your agent
# - When Pokémon Gold and Silver was released?
# - Which one was the first 3D platformer Mario game?
# - Was Mortal Kombat X realeased for Playstation 5?

# Define your agent
agent = UdaPlayAgentStateful()

# List of questions
questions = [
    "When Pokémon Gold and Silver was released?",
    "Which one was the first 3D platformer Mario game?",
    "Was Mortal Kombat X released for Playstation 5?"
]

# Run the agent on each question
results = []

for q in questions:
    print("\n" + "=" * 60)
    result = agent.invoke(q)
    results.append({
        "question": q,
        "final_state": result
    })




[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve_game
📡 Vocareum key detected — using Vocareum API endpoint.
🔍 Raw LLM Response: '{\n  "useful": true,\n  "description": "One document mentions Pokémon Gold and Silver and provides the year 1999, which directly answers the user\'s question."\n}'
[StateMachine] Executing step: evaluate_retrieval
[StateMachine] Terminating: __termination__
📌 Final Query: When Pokémon Gold and Silver was released?
✅ Internal data was sufficient.
🎮 Evaluation Summary: One document mentions Pokémon Gold and Silver and provides the year 1999, which directly answers the user's question.

[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve_game
📡 Vocareum key detected — using Vocareum API endpoint.
🔍 Raw LLM Response: '{\n  "useful": true,\n  "description": "One document identifies Super Mario 64 as a 3D platformer released in 1996, which directly answers the user\'s question."\n}'
[StateMachine] Executing step: e

### (Optional) Advanced

In [14]:
# TODO: Update your agent with long-term memory
# TODO: Convert the agent to be a state machine, with the tools being pre-defined nodes
class AgentState(TypedDict):
    query: str
    retrieved: Optional[Dict]
    useful: Optional[bool]
    description: Optional[str]
    web_results: Optional[Dict]
    memory: Optional[List[Dict]]
    short_term_summary: Optional[str]
    long_term_summary: Optional[str]
    
long_term_memory = []

def load_memory() -> List[Dict]:
    return long_term_memory[-10:]  # retrieve last 10 for context

def save_to_memory(query: str, answer: str):
    long_term_memory.append({
        "query": query,
        "answer": answer
    })


In [15]:
def invoke(self, query: str):
    state: AgentState = {
        "query": query,
        "memory": load_memory(),
        "short_term_summary": "",
        "long_term_summary": ""
    }

    # Run the state machine
    run = self.machine.run(state)
    final = run.get_final_state()

    # Fallback-safe answer extraction
    answer = (
        final.get("description") or
        (final.get("retrieved", {}).get("results", [{}])[0].get("description")) or
        final.get("web_results", {}).get("answer") or
        "No answer available."
    )

    # Save memory entry
    save_to_memory(final.get("query", query), answer)

    print("=" * 60)
    print(f"📌 Final Query: {final.get('query', query)}")

    if final.get("useful"):
        print("✅ Internal data was sufficient.")
        if final.get("retrieved", {}).get("results"):
            print(f"🎮 Top Internal Result: {final['retrieved']['results'][0].get('description')}")
    elif final.get("web_results", {}).get("answer"):
        print("🔍 Internal data was insufficient — used web search.")
        print(f"🌐 Tavily Answer: {final['web_results']['answer'][:150]}...")
    else:
        print("❌ No useful data found.")
    print("=" * 60)

    return final


In [16]:
retrieve_step = Step("retrieve_game", lambda state: {
    "retrieved": retrieve_game(state["query"])
})

evaluate_step = Step("evaluate_retrieval", lambda state: {
    **evaluate_retrieval(state["query"], state["retrieved"])
})

web_search_step = Step("game_web_search", lambda state: {
    "web_results": game_web_search(state["query"])
})


In [17]:
def summarize_short_term(memory: ShortTermMemory, session_id: str = "default"):
    history = memory.get_all_objects(session_id)
    if not history:
        return "🧠 No short-term memory available."

    summary = "\n".join(
        [f"Q: {item.get('query')}\nA: {item.get('answer')}" for item in history if isinstance(item, dict)]
    )
    print("📚 Short-Term Memory Summary:\n", summary)
    return summary

def summarize_long_term(memory: LongTermMemory, owner: str):
    results = memory.search("summary of past questions", owner=owner, limit=5)
    if not results.fragments:
        return "🧠 No long-term memory available."

    summary = "\n".join([f"- {frag.content}" for frag in results.fragments])
    print("📦 Long-Term Memory Summary:\n", summary)
    return summary

# 1. Init memory modules
short_term = ShortTermMemory()
long_term = LongTermMemory(vector_manager)

# 2. Short-term summary step
def summarize_short_term_step_fn(state: AgentState) -> AgentState:
    summary = summarize_short_term(short_term)
    state["short_term_summary"] = summary
    return state

short_term_step = Step("short_term_summary", summarize_short_term_step_fn)

# 3. Long-term summary step
def summarize_long_term_step_fn(state: AgentState) -> AgentState:
    summary = summarize_long_term(long_term, owner="user")
    state["long_term_summary"] = summary
    return state

long_term_step = Step("long_term_summary", summarize_long_term_step_fn)

#4. Commit memory step
def commit_memory_step_fn(state: AgentState) -> AgentState:
    query = state.get("query")
    answer = (
        state.get("web_results", {}).get("answer")
        or state.get("retrieved", {}).get("results", [{}])[0].get("Description")
        or state.get("description")
        or "No answer available."
    )

    if query and answer:
        short_term.add({"query": query, "answer": answer})

        frag = MemoryFragment(content=f"Q: {query}\nA: {answer}", owner="user")
        long_term.register(frag)

        long_term_memory.append({"query": query, "answer": answer})
        state["memory"] = load_memory()

        print("✅ Committed to short and long-term memory.")
    else:
        print("⚠️ Nothing to commit: query or answer missing.")
    
    return state

commit_memory_step = Step("commit_memory", commit_memory_step_fn)


# Final step
final_step = Termination()




In [18]:

# Initialize the state machine
machine = StateMachine(AgentState)

# Register all steps
machine.add_steps([
    entry_step,
    short_term_step,
    long_term_step,
    retrieve_step,
    evaluate_step,
    web_search_step,
    commit_memory_step,
    final_step
])

# Define step transitions
machine.connect(entry_step, short_term_step)
machine.connect(short_term_step, long_term_step)
machine.connect(long_term_step, retrieve_step)
machine.connect(retrieve_step, evaluate_step)

# Conditional branch: fallback to web or commit if retrieval was sufficient
machine.connect(
    evaluate_step,
    [web_search_step, commit_memory_step],
    condition=smart_fallback_decision
)

# After web search, still commit memory
machine.connect(web_search_step, commit_memory_step)

# Commit always transitions to final step
machine.connect(commit_memory_step, final_step)


In [19]:
# Define your agent
agent = UdaPlayAgentStateful()

# Example questions to test full flow
questions = [
    "When Pokémon Gold and Silver was released?",
    "Which one was the first 3D platformer Mario game?",
    "Was Mortal Kombat X released for Playstation 5?",
    "What year was Halo 3 released?",
    "Who published The Legend of Zelda: Ocarina of Time?",
    "When did Sony release the PS5?"
]

# Track all run outputs
results = []

for q in questions:
    print("\n" + "=" * 60)
    result = agent.invoke(q)
    results.append({
        "question": q,
        "state": result
    })

print("\n🧠 Final Long-Term Memory View:")

# Perform a broad retrieval of memory fragments for the user
results = long_term.search("past queries", owner="user", limit=20)

if not results.fragments:
    print("No long-term memory found.")
else:
    # Extract unique Q&A pairs from fragments
    unique_qas = {}
    for frag in results.fragments:
        if "Q:" in frag.content and "A:" in frag.content:
            parts = frag.content.split("A:", 1)
            q = parts[0].replace("Q:", "").strip()
            a = parts[1].strip()
            if q and a and q not in unique_qas:
                unique_qas[q] = a

    if not unique_qas:
        print("No valid Q&A pairs found in memory.")
    else:
        for i, (q, a) in enumerate(unique_qas.items(), 1):
            print(f"\n--- Run {i}: ---")
            print(f"Q: {q}")
            print(f"A: {a}")



[StateMachine] Starting: __entry__
[StateMachine] Executing step: short_term_summary
[StateMachine] Executing step: long_term_summary
[StateMachine] Executing step: retrieve_game
📡 Vocareum key detected — using Vocareum API endpoint.
🔍 Raw LLM Response: '{\n  "useful": true,\n  "description": "One document mentions Pokémon Gold and Silver and provides the year 1999, which directly answers the user\'s question."\n}'
[StateMachine] Executing step: evaluate_retrieval
[StateMachine] Terminating: __termination__
📌 Final Query: When Pokémon Gold and Silver was released?
✅ Internal data was sufficient.
🎮 Evaluation Summary: One document mentions Pokémon Gold and Silver and provides the year 1999, which directly answers the user's question.

[StateMachine] Starting: __entry__
[StateMachine] Executing step: short_term_summary
[StateMachine] Executing step: long_term_summary
[StateMachine] Executing step: retrieve_game
📡 Vocareum key detected — using Vocareum API endpoint.
🔍 Raw LLM Response: '