<a href="https://colab.research.google.com/github/m-gian/agents/blob/main/Create_Agent_from_scratch_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install (or upgrade, `-U`) the Google GenAI SDK quietly (`-q`).
# It provides the `google.genai` package imported in the next cell.
!pip install -U -q google-genai

In [None]:
# Import necessary libraries for file operations, JSON handling, unique IDs, timing, and data structures.
import os
import json
import uuid
import time
from pathlib import Path
from datetime import datetime, UTC
from typing import Dict, List, Callable, Any, Literal, Optional, Union
# Import Google GenAI types and client for AI model interaction.
from google import genai
from google.genai import types
# Import Pydantic for data validation and settings management.
from pydantic import BaseModel, Field

In [None]:
# Import `userdata` from `google.colab` to securely access user-defined secrets.
from google.colab import userdata

# Retrieve the Gemini API key from Colab's user data secrets.
# NOTE: the secret is looked up under the name 'GOOGLE_API_KEY', not 'GEMINI_API_KEY'.
GEMINI_API_KEY = userdata.get('GOOGLE_API_KEY')

# Initialize the GenAI client with the retrieved API key.
# This single client instance is later handed to `GeminiLLM`.
client = genai.Client(api_key=GEMINI_API_KEY)

In [None]:
# Define an `AgentObserver` class to log events and trace the agent's execution.
class AgentObserver:
    """Collect trace events for one run and mirror them to a JSONL file.

    Every instance gets a fresh UUID ``trace_id``. Each logged event is kept
    in ``self.events`` and appended as one JSON line to
    ``<log_dir>/trace_<trace_id>.jsonl``.

    Args:
        log_dir: Directory that receives the trace file. It is created,
            together with any missing parent directories, when absent.
    """

    def __init__(self, log_dir="/content/logs"):
        # Unique identifier shared by every event of this trace.
        self.trace_id = str(uuid.uuid4())
        # In-memory copy of everything written to the file.
        self.events = []

        log_path = Path(log_dir)
        # parents=True: a nested or not-yet-existing parent directory must
        # not make tracing fail with FileNotFoundError.
        log_path.mkdir(parents=True, exist_ok=True)
        self.file_path = log_path / f"trace_{self.trace_id}.jsonl"

    def log(self, event_type, data=None):
        """Record one event in memory and append it to the trace file.

        Args:
            event_type: Short event name stored under the ``"event"`` key.
            data: Optional payload; a falsy value is stored as ``{}``.
        """
        entry = {
            "trace_id": self.trace_id,
            "timestamp": time.time(),
            "event": event_type,
            "data": data or {}
        }

        self.events.append(entry)

        # default=str is deliberate: payloads may carry arbitrary tool
        # results, and a value JSON cannot encode should degrade to its
        # string form instead of aborting the run being traced.
        with open(self.file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, default=str) + "\n")

    def span(self, name):
        """Return a `Span` context manager that times the block named ``name``."""
        return Span(self, name)

# Define a `Span` class for measuring the duration of operations (context manager).
class Span:
    """Context manager that logs the start, end and duration of a block.

    Args:
        observer: Object exposing ``log(event_type, data)``; it receives a
            ``"span_start"`` and a ``"span_end"`` event.
        name: Label stored in both events.
    """

    def __init__(self, observer, name):
        self.observer = observer
        self.name = name

    def __enter__(self):
        # perf_counter is monotonic, so the measured duration cannot go
        # negative when the wall clock is adjusted mid-span.
        self.start = time.perf_counter()
        self.observer.log("span_start", {"name": self.name})
        # Return the span so ``with observer.span(...) as s`` binds it.
        return self

    def __exit__(self, exc_type, exc, tb):
        # The implicit None return lets any exception from the block propagate.
        duration = time.perf_counter() - self.start
        self.observer.log("span_end", {
            "name": self.name,
            "duration_sec": round(duration, 3)
        })

In [None]:
# Define a `MemoryStore` class for persistent storage of conversational memory.
class MemoryStore:
    """Persist memory entries as a single JSON array in one file.

    Args:
        file_path: Location of the JSON file; it is created (with an empty
            array) when missing.
        max_entries: Default number of entries returned by `get_recent`.
            The file itself is not trimmed to this size.
    """

    def __init__(self, file_path: str, max_entries: int = 50):
        self.file_path = file_path
        self.max_entries = max_entries
        # Ensure the memory file exists upon initialization.
        self._ensure_file()

    def _write(self, data: List[dict]) -> None:
        """Overwrite the memory file with ``data``."""
        with open(self.file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def _ensure_file(self):
        """Create the file holding an empty JSON array if it does not exist."""
        if os.path.exists(self.file_path):
            return
        parent = os.path.dirname(self.file_path)
        if parent:
            # A missing parent directory would otherwise make open() fail.
            os.makedirs(parent, exist_ok=True)
        self._write([])

    def load_all(self) -> List[dict]:
        """Return every stored entry, or ``[]`` if the file is unusable.

        A missing, unreadable or malformed file, or one whose top-level JSON
        value is not a list, yields an empty list.
        """
        try:
            with open(self.file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return []
        # `append` and slicing both rely on a list.
        return data if isinstance(data, list) else []

    def append(self, entry: dict):
        """Add ``entry`` to the end of the stored list and rewrite the file."""
        data = self.load_all()
        data.append(entry)
        self._write(data)

    def get_recent(self, limit: Optional[int] = None) -> List[dict]:
        """Return the last ``limit`` entries, oldest first.

        Args:
            limit: Number of entries wanted; ``None`` means
                ``self.max_entries``. Zero or a negative value returns ``[]``.
        """
        if limit is None:
            limit = self.max_entries
        if limit <= 0:
            # ``data[-0:]`` would return the whole list, not nothing.
            return []
        return self.load_all()[-limit:]

    def delete_all(self):
        """Replace the stored list with an empty one."""
        self._write([])

In [None]:
# Initialize the `MemoryStore` for the agent's long-term memory.
memory_store = MemoryStore(
    file_path="/content/agent_memory.json", # Path to the memory file.
    max_entries=10, # Default number of recent entries `get_recent()` returns; `append()` never trims the file.
)

In [None]:
# Define a `Tool` class to represent an external function callable by the agent.
class Tool:
    """A named callable that the agent can invoke on behalf of the LLM.

    Args:
        name: Identifier used to look the tool up in the registry.
        description: Human-readable summary shown to the LLM.
        input_schema: Pydantic model class describing the keyword arguments.
            The registry calls ``input_schema.model_json_schema()`` on it,
            so it must be a model class rather than a plain dict.
        output_schema: Free-form description of the result.
        func: The function executed when the tool is called.
    """

    def __init__(
        self,
        name: str,
        description: str,
        input_schema: "type[BaseModel]",
        output_schema: Dict[str, Any],
        func: Callable[..., Any],
    ):
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.output_schema = output_schema
        self.func = func

    def __repr__(self) -> str:
        return f"{type(self).__name__}(name={self.name!r})"

    def __call__(self, **kwargs):
        """Invoke the wrapped function with ``kwargs`` and return its result."""
        return self.func(**kwargs)

In [None]:
# Define a `ToolRegistry` class to manage and provide access to available tools.
class ToolRegistry:
    """Name-indexed collection of `Tool` objects.

    Besides lookup, it derives the typing constructs (a ``Literal`` of tool
    names and a ``Union`` of argument models) from whatever is registered at
    the moment those methods are called.
    """

    def __init__(self):
        self.tools: Dict[str, "Tool"] = {}

    def register(self, tool: "Tool") -> None:
        """Add ``tool`` under ``tool.name``, replacing any earlier tool of that name."""
        self.tools[tool.name] = tool

    def get(self, name: str) -> "Tool":
        """Return the tool registered as ``name``.

        Raises:
            ValueError: If no tool with that name is registered.
        """
        try:
            return self.tools[name]
        except KeyError:
            raise ValueError(f"Tool '{name}' not found") from None

    def list_tools(self) -> List[Dict[str, Any]]:
        """Describe every tool as a dict with name, description and input JSON schema."""
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema.model_json_schema(),
            }
            for tool in self.tools.values()
        ]

    def get_tool_call_args_type(self) -> Any:
        """Return ``Union[...]`` over the input schemas of all registered tools.

        The result is a typing construct built at call time, so it cannot be
        given a more precise static annotation than ``Any``.
        """
        input_args_models = tuple(tool.input_schema for tool in self.tools.values())
        return Union[input_args_models]

    def get_tool_names(self) -> Any:
        """Return ``Literal[...]`` over the names of all registered tools."""
        # Subscripting with a tuple equals listing the members one by one and
        # avoids star-unpacking inside a subscript.
        return Literal[tuple(self.tools)]

In [None]:
# Define simple mathematical functions that can be used as tools.
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total

def multiply(a: int, b: int) -> int:
    """Return the product of ``a`` and ``b``."""
    product = a * b
    return product

# Factory function to create a tool for deleting all memory.
def make_delete_all_memory_tool(memory_store: "MemoryStore"):
    """Build a tool function that wipes ``memory_store`` after confirmation.

    Args:
        memory_store: Object exposing ``delete_all()``; captured by the
            returned closure.

    Returns:
        A function ``delete_all_memory(confirm)`` that clears the store and
        returns a confirmation message.
    """
    def delete_all_memory(confirm: str):
        """Delete all memory if ``confirm`` is the string ``"true"``.

        Raises:
            ValueError: If ``confirm`` is anything other than "true"
                (compared case-insensitively, ignoring surrounding spaces).
        """
        # str() first: a JSON boolean ``true`` arrives as Python ``True``,
        # which has no ``.lower()`` and would fail with AttributeError.
        if str(confirm).strip().lower() != "true":
            raise ValueError(
                "delete_all_memory called without explicit confirmation"
            )

        memory_store.delete_all()
        return "All long-term memory has been permanently deleted."

    return delete_all_memory

# Bind the deletion tool to the module-level `memory_store`; the resulting
# closure is what gets registered as the 'delete_all_memory' tool.
delete_all_memory_fn = make_delete_all_memory_tool(memory_store)

In [None]:
# Create the shared `ToolRegistry`; the tools registered in this cell are
# later looked up by name when the LLM requests a tool call.
registry = ToolRegistry()

# Define Pydantic models for the input arguments of each tool.
class ToolAddArgs(BaseModel):
    # Input schema of the 'add' tool. Field names mirror the keyword
    # parameters of `add(a, b)`, because the agent calls tools as `tool(**args)`.
    a: int
    b: int

class ToolMultiplyArgs(BaseModel):
    # Input schema of the 'multiply' tool. Field names mirror the keyword
    # parameters of `multiply(a, b)`.
    # NOTE(review): structurally identical to `ToolAddArgs`, so the two
    # cannot be told apart inside a Union of argument models.
    a: int
    b: int

class DeleteAllMemoryArgs(BaseModel):
    # Input schema of the 'delete_all_memory' tool. The only accepted value
    # is the literal string "true", which the tool function re-checks itself.
    confirm: Literal["true"] = Field(
        description="Must be 'true' to confirm permanent deletion of all memory."
    )

# Describe every tool up front, then register them in declaration order:
# 'add', 'multiply' and finally the destructive 'delete_all_memory'.
for _tool in (
    Tool(
        name="add",
        description="Add two numbers",
        input_schema=ToolAddArgs,
        output_schema={"result": "int"},
        func=add,
    ),
    Tool(
        name="multiply",
        description="Multiply two numbers",
        input_schema=ToolMultiplyArgs,
        output_schema={"result": "int"},
        func=multiply,
    ),
    Tool(
        name="delete_all_memory",
        description="Permanently delete all long-term memory. This action is irreversible.",
        input_schema=DeleteAllMemoryArgs,
        output_schema={"result": "string"},
        func=delete_all_memory_fn,
    ),
):
    registry.register(_tool)

In [None]:
# Get a Literal type of all registered tool names for type hinting.
# It is computed once, here, so tools registered after this cell runs are
# not part of it.
ToolNameLiteral = registry.get_tool_names()
# Get a Union type of all tool argument schemas for type hinting.
# Same caveat: it is a snapshot of the registry at this point.
ToolArgsUnion = registry.get_tool_call_args_type()

# Define Pydantic models for different types of LLM responses.
class ToolCall(BaseModel):
    # LLM response asking the agent to execute one registered tool.
    action: Literal["tool"]  # discriminator checked by `Agent.run`
    thought: str  # the model's free-text rationale for the call
    tool_name: ToolNameLiteral  # must be one of the registered tool names
    # Keyword arguments for the tool. The schema does not tie the chosen
    # argument model to `tool_name`; they are independent fields.
    args: ToolArgsUnion

class FinalAnswer(BaseModel):
    # LLM response that ends the run; `answer` is returned to the user.
    action: Literal["final"]
    answer: str

class HumanApproval(BaseModel):
    # LLM response requesting human confirmation before proceeding;
    # `reason` explains why approval is needed.
    action: Literal["human"]
    reason: str

# Define a Union type for all possible LLM responses.
# It is passed to Gemini as `response_schema`; the `action` field of each
# member tells the agent which branch to take.
LLMResponse = Union[ToolCall, FinalAnswer, HumanApproval]

# Define the `GeminiLLM` class to interact with the Google Gemini model.
class GeminiLLM:
    """Thin wrapper that asks a Gemini model for the agent's next action.

    Args:
        client: A ``genai.Client`` used to call the model.
        tool_registry: Registry whose ``list_tools()`` output is embedded in
            the system instruction.
        model: Gemini model name.
    """

    def __init__(self, client, tool_registry, model="gemini-2.5-flash"):
        self.client = client
        self.model = model
        self.tool_registry = tool_registry
        # Built once; tools registered afterwards are not reflected in it.
        self.system_instruction = self._create_system_instruction()

    def _create_system_instruction(self) -> str:
        """Return the system prompt: fixed rules followed by the tool list as JSON."""
        tools_description = json.dumps(
            self.tool_registry.list_tools(),
            indent=2
        )

        # The tool-call example uses the key "args" because that is the field
        # name in the enforced response schema and the key the agent reads.
        system_prompt = """
You are a conversational AI agent that can interact with external tools.

CRITICAL RULES (MUST FOLLOW):
- You are NOT allowed to perform actions internally if a tool is provided in the registry for that action.
- If a tool exists that can perform any part of the task, you MUST use that tool.
- You MUST NOT skip tools, even for simple or obvious steps.
- You MUST NOT combine multiple operations into a single step unless a tool explicitly supports it.
- You may ONLY produce a final answer when no available tool can further advance the task.

HUMAN-IN-THE-LOOP (MANDATORY):
- You have a special action called "human".
- You MUST choose the "human" action BEFORE performing any irreversible, destructive, or sensitive operation.
- Examples include (but are not limited to): deleting memory, resetting state, or permanently altering stored data.
- When using the "human" action, you MUST clearly explain the reason approval is required.
- After asking for human approval, you have two options depending on the response:
    1. In case the approval is given: You MUST continue the task by selecting the appropriate next action (usually a tool call).
    2. In case the approval is denied: You MUST continue the conversation, informing the use that the original action won't be perfomed because
    approval was not given.
- Do not repeat this action consecutively. You must always follow a "human" action by a "tool" action.

IMPORTANT CLARIFICATION:
- Internal reasoning, text parsing, extraction, summarization, and analysis of the provided input
  are NOT considered tool-eligible operations.
- These cognitive operations MUST be performed internally by the model.
- Tools are ONLY required for external actions, side effects, or interactions with systems
  outside the model (APIs, databases, files, network, state mutation).

TOOL USAGE RULES:
- Each tool call must perform exactly ONE meaningful operation.
- If the task requires multiple operations, you MUST call tools sequentially.
- If multiple tools could apply, choose the most specific one.
- Tools MUST NOT be called unless explicitly selected as an action.

RESPONSE FORMAT (STRICT):
- You MUST respond ONLY in valid JSON.
- Never include explanations outside JSON.
- You must choose exactly ONE action per response.

Allowed actions:
1. Tool call:
{
"action": "tool",
"thought": "...",
"tool_name": "...",
"args": { ... }
}

2. Human approval request:
{
"action": "human",
"reason": "Clear explanation of why human approval is required."
}

3. Final answer (only if no tool or approval is needed):
{
"action": "final",
"answer": "..."
}
""" + "\n\nAvailable tools in the registry with description:\n" + tools_description
        return system_prompt

    def _format_gemini_chat_history(self, history: list[dict]) -> list:
        """Convert the agent's history dicts into Gemini ``types.Content`` objects.

        'user' and 'assistant' messages become text parts (the latter with
        role "model"); 'tool' messages become function responses. Messages
        with any other role are skipped.
        """
        formatted_history = []
        for message in history:
            role = message["role"]
            if role == "user":
                content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=message["content"])],
                )
            elif role == "assistant":
                content = types.Content(
                    role="model",
                    parts=[types.Part.from_text(text=message["content"])],
                )
            elif role == "tool":
                content = types.Content(
                    role="tool",
                    parts=[
                        types.Part.from_function_response(
                            name=message["tool_name"],
                            response={'result': message["tool_response"]},
                        )
                    ],
                )
            else:
                continue
            formatted_history.append(content)
        return formatted_history

    def generate(self, history: list[dict]) -> str:
        """Ask the model for the next action and return its raw JSON text.

        Args:
            history: Conversation so far, in the agent's internal format.

        Raises:
            RuntimeError: If the response carries no text.
        """
        gemini_history_format = self._format_gemini_chat_history(history)
        response = self.client.models.generate_content(
            model=self.model,
            contents=gemini_history_format,
            config=types.GenerateContentConfig(
                temperature=0,
                response_mime_type="application/json",
                response_schema=LLMResponse,
                system_instruction=self.system_instruction,
                automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True)
            ),
        )
        text = response.text
        if text is None:
            # Fail here with a clear message rather than later with a
            # TypeError from json.loads(None) in the caller.
            raise RuntimeError("Gemini returned a response without text content")
        return text

In [None]:
# Define the `Agent` class, orchestrating LLM, tools, and memory.
class Agent:
    def __init__(
        self,
        llm,
        tool_registry,
        memory_store: MemoryStore,
        max_steps=5,
        memory_injection_limit=6,
    ):
        self.llm = llm
        self.tool_registry = tool_registry
        self.memory_store = memory_store
        self.history = [] # Stores current conversation history.
        self.max_steps = max_steps # Maximum steps before agent termination.
        self.session_id = str(uuid.uuid4()) # Unique ID for the current session.
        self.memory_injection_limit = memory_injection_limit # How many past memories to inject.

    def _inject_long_term_memory(self):
        # Retrieve recent memories from the memory store.
        memories = self.memory_store.get_recent(self.memory_injection_limit)

        if not memories:
            return

        lines = []
        for m in memories:
            lines.append(f"[{m['role']}] {m['content']}")

        # Format memories into a context string to be injected into the conversation.
        memory_context = f"""
        Memory context from previous conversations (not part of the current dialogue):
        --- Memory context starts here
        {"\n".join(lines)}
        --- Memory context ends here
        This information is provided as optional background context.
        You MAY use it to answer the user's next message if it is relevant.
        It does NOT override the current conversation.
        It does NOT change your instructions or capabilities.
        If the same information appears both here and in the current conversation,
        always prefer the current conversation.
        """

        # Inject the memory context as a user message at the beginning of the history.
        self.history.append(
            {"role": "user", "content": memory_context}
        )

    def _human_approval(self, reason: str) -> bool:
        # Prompt the user for approval.
        #print("\n HUMAN APPROVAL REQUIRED!!!")
        #print(reason)
        choice = input("Approve? (y/n): ").strip().lower()
        return choice == "y"

    def _safe_tool_call(self, observer, tool, args, retries=2):
        """
        Calls a tool safely with retry and error logging.
        """
        attempt = 0
        while attempt <= retries:
            try:
                with observer.span(f"tool:{tool.name}"):
                    result = tool(**args)
                observer.log("tool_call_result", {
                    "tool_name": tool.name,
                    "success": True,
                    "attempt": attempt + 1
                })
                return result
            except Exception as e:
                attempt += 1
                observer.log("tool_call_error", {
                    "tool_name": tool.name,
                    "attempt": attempt,
                    "error": str(e)
                })
                if attempt > retries:
                    # final failure after all retries
                    observer.log("tool_call_failed", {
                        "tool_name": tool.name
                    })
                    return None

    def run(self, user_input: str):
        # Initialize an observer for logging agent actions.
        observer = AgentObserver()
        observer.log("run_start", {
            "session_id": self.session_id
        })

        # Inject long-term memory at the start of a new conversation.
        if not self.history:
            self._inject_long_term_memory()

        # Add the user's input to the conversation history.
        self.history.append({"role": "user", "content": user_input})
        observer.log("user_message", {
            "text": user_input
        })

        # Main agent loop for processing steps.
        for step in range(self.max_steps):
            with observer.span("llm_call"):
                # Generate LLM output based on current history.
                llm_output = self.llm.generate(self.history)
            # Parse the LLM's action from the JSON output.
            action = json.loads(llm_output)
            observer.log("llm_decision", {
                "step": step,
                "action": action["action"]
            })

            # Handle 'human' action for approval.
            if action["action"] == "human":
                observer.log("human_approval_requested", {
                    "reason": action["reason"]
                })
                self.history.append(
                    {"role": "assistant", "content": action["reason"]}
                )
                approved = self._human_approval(action["reason"])
                observer.log("human_approval_result", {
                    "approved": approved
                })

                if not approved:
                    # Inform the agent if approval is denied.
                    self.history.append({
                        "role": "user",
                        "content": "Human approval was demanded and it is not given. You can not perform the action that required the approval."
                    })

                # Inform the agent if approval is given.
                self.history.append({
                    "role": "user",
                    "content": "Human approval was demanded and it is given. You can now proceed with the action that required the approval."
                })
                continue

            # Handle 'tool' action for tool execution.
            if action["action"] == "tool":
                observer.log("tool_call_requested", {
                    "tool_name": action["tool_name"],
                    "args": action["args"]
                })
                self.history.append(
                    {"role": "assistant", "content": llm_output}
                )
                # Get the tool from the registry and execute it.
                tool = self.tool_registry.get(action["tool_name"])
                result = self._safe_tool_call(observer, tool, action["args"])
                observer.log("tool_call_result", {
                    "tool_name": tool.name,
                    "tool_response": result,
                })

                # Special handling for memory deletion to clear history.
                if action["tool_name"] == "delete_all_memory":
                    observer.log("memory_cleared")
                    self.history = self.history[1:] # Clear chat history after memory deletion

                # Add tool result to history.
                self.history.append(
                    {
                        "role": "tool",
                        "tool_name": tool.name,
                        "tool_response": result,
                    }
                )
                continue

            # Handle 'final' action to conclude the conversation.
            if action["action"] == "final":
                self.history.append(
                    {"role": "assistant", "content": llm_output}
                )
                observer.log("final_answer", {
                    "text": action["answer"]
                })

                timestamp = datetime.now(UTC).isoformat()

                # Persist meaningful turns (user input and final answer) to long-term memory.
                self.memory_store.append({
                    "session_id": self.session_id,
                    "timestamp": timestamp,
                    "role": "user",
                    "content": user_input,
                })

                self.memory_store.append({
                    "session_id": self.session_id,
                    "timestamp": timestamp,
                    "role": "assistant",
                    "content": action["answer"],
                })

                observer.log("run_complete", {
                    "steps_used": step + 1
                })

                return action["answer"]

        # Raise an error if the agent doesn't terminate within `max_steps`.
        raise RuntimeError("Agent did not terminate within max_steps")

In [None]:
# Initialize the Gemini LLM with the client and tool registry.
# The system instruction is built at this moment from the tools registered so far.
llm = GeminiLLM(client, registry)
# Initialize the Agent with the LLM, tool registry, and memory store
# (defaults: max_steps=5, memory_injection_limit=6).
agent = Agent(llm, registry, memory_store)

In [None]:
# Optional: Print the system instruction given to the LLM for debugging/inspection.
#print(llm.system_instruction)

In [None]:
# Define a function to conduct an interactive chat session with the agent.
def chat_with_agent(agent: "Agent"):
    """Run an interactive console chat until the user quits.

    Each non-empty line is passed to ``agent.run`` and the answer printed.
    Typing 'exit', 'quit' or 'q' (any case, surrounding spaces ignored), or
    closing the input stream / pressing Ctrl-C, ends the session. Errors
    raised by the agent are printed and the loop continues.

    Args:
        agent: Object exposing ``run(user_input) -> str``.
    """
    print("Welcome! Type 'exit' to quit.\n")

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C is a normal way to leave a chat.
            print("\nGoodbye!")
            break

        if not user_input:
            # Nothing to send; do not spend an LLM call on a blank line.
            continue

        if user_input.lower() in ("exit", "quit", "q"):
            print("Goodbye!")
            break

        try:
            # Run the agent with the user's input.
            response = agent.run(user_input)
        except RuntimeError as e:
            print(f"Agent error: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
        else:
            print(f"Agent: {response}")

In [None]:
# Start the chat interface with the initialized agent.
# This call blocks on `input()` until the user types 'exit', 'quit' or 'q'.
chat_with_agent(agent)

Welcome! Type 'exit' to quit.

You: what's 59+12
DD result 71
Agent: 59 + 12 = 71
You: q
Goodbye!


In [None]:
# Display the agent's internal chat history after a conversation.
# Entries are dicts with a 'role' of 'user', 'assistant' or 'tool'; tool
# entries carry 'tool_name'/'tool_response' instead of 'content'.
agent.history

[{'role': 'user',
  'content': 'hello, please delete all memoories of our previous conversations'},
 {'role': 'assistant',
  'content': 'Human approval was demanded and it was given. I can now proceed with the action that required the approval.'},
 {'role': 'assistant',
  'content': '{\n"action": "tool",\n"thought": "Human approval was given to delete all memories. I will now call the `delete_all_memory` tool to perform this irreversible action.",\n"tool_name": "delete_all_memory",\n"args": {\n"confirm": "true"\n}\n}'},
 {'role': 'tool',
  'tool_name': 'delete_all_memory',
  'tool_response': 'All long-term memory has been permanently deleted.'},
 {'role': 'assistant',
  'content': '{\n"action": "final",\n"answer": "All memories of our previous conversations have been permanently deleted."\n}'},
 {'role': 'user', 'content': 'great thanks'},
 {'role': 'assistant',
  'content': '{\n"action": "final",\n"answer": "You\'re welcome!"\n}'}]

In [None]:
# Display the first 50 lines of the agent's trace logs (trace_*.jsonl).
# The glob matches every trace file in `logs/`, and `sed` reads them as one
# concatenated stream, so the 50 lines may span several runs.
# The path is relative; it only matches `AgentObserver`'s default
# `/content/logs` when the working directory is `/content`.
!sed -n '1,50p' logs/trace_*.jsonl

In [None]:
# Parse the JSON content of the last message in the agent's history.
# This is useful for inspecting the LLM's final action or response.
# It only works when the last entry is an assistant message holding JSON;
# 'tool' entries have no 'content' key.
json.loads(agent.history[-1]["content"])

{'action': 'final', 'answer': "I'm glad you think so!"}

In [None]:
# Manually call the LLM to generate a response for a specific chat history.
# This bypasses the full agent run loop and is useful for testing LLM behavior in isolation.
# The history uses the agent's internal format: assistant turns hold the raw
# JSON action text, and the tool turn carries 'tool_name'/'tool_response'.
r = llm.generate([{'role': 'user', 'content': 'hi, Can you add 10 and 32?'},
 {'role': 'assistant',
  'content': '{\n  "action": "tool",\n  "thought": "The user wants to add two numbers, 10 and 32. I should use the \'add\' tool for this.",\n  "tool_name": "add",\n  "args": {\n    "a": 10,\n    "b": 32\n  }\n}'},
 {'role': 'tool', 'tool_name': 'add', 'tool_response': 42},
 {'role': 'assistant',
  'content': '{\n  "action": "final",\n  "answer": "42"\n}'},
 {'role': 'user', 'content': 'now multiply that by 2'}] )

In [None]:
# Display the raw response from the LLM generated in the previous cell.
# `generate` returns the response text, i.e. an unparsed JSON string.
r