## ADK Features

- This notebook contains the key features of ADK that can help you build your custom agent.
- Credit to lavinigam@

<span style="color:red">**YOU NEED TO UPDATE YOUR PROJECT_ID AND LOCATION**</span>

<span style="color:red">**Verify google-adk version is 0.5.0**</span>

In [1]:
!uv pip show google-adk | grep Version

Version: 0.5.0


In [2]:
%load_ext autoreload
%autoreload 2

In [None]:
# Temporary: suppress noisy warnings (e.g. "WARNING:google_genai.types") in notebook output.
import warnings

# Blanket filter: hides every warning category, not only the GenAI ones.
warnings.filterwarnings("ignore")

# Narrow filter. It is redundant while the blanket filter above is active, but it
# records the specific warning to keep silenced once the blanket filter is removed.
warnings.filterwarnings(
    "ignore", category=UserWarning, module="google.generativeai.types.content_types"
)  # Suppress harmless warning

In [None]:
import os

# Replace these two values with your own Google Cloud project and region.
PROJECT_ID = "hello-world-418507"
LOCATION = "us-central1"

# Export the settings through the process environment in a single call.
os.environ.update(
    {
        "GOOGLE_CLOUD_PROJECT": PROJECT_ID,
        "GOOGLE_CLOUD_LOCATION": LOCATION,
        "GOOGLE_GENAI_USE_VERTEXAI": "TRUE",  # Use Vertex AI API
    }
)


### LLM Agent with a Single Tool

In [5]:
from google.adk.agents import LlmAgent
from google.genai import types
from pydantic import BaseModel
from google.adk.agents import Agent
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner

# Constants
APP_NAME = "capital_city_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "capital_agent"
# Pinned to the same model version the later sections use, instead of the "-exp" alias.
GEMINI_2_FLASH = "gemini-2.0-flash-001"


# Define a simple tool
def get_capital_city(country: str) -> str:
    """Retrieves the capital city of a given country.

    Args:
        country: The name of the country.

    Returns:
        The capital city of the country.
    """
    # Docstring intentionally left verbatim: this function is handed to the agent
    # via `tools=[...]`, and ADK presumably exposes the docstring to the model as
    # the tool description (confirm before rewording it).
    capitals_by_country = {
        "united states": "washington, d.c.",
        "canada": "ottawa",
        "france": "paris",
    }
    try:
        # Keys are lower-case, so the lookup is case-insensitive.
        return capitals_by_country[country.lower()]
    except KeyError:
        return "Capital not found"


# Agent: a single LLM agent with one function tool.
capital_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    # Reuse the AGENT_NAME constant rather than repeating the literal, matching
    # how the later sections name their agents.
    name=AGENT_NAME,
    description="An agent that can retrieve the capital city of a country.",
    instruction="""You are an agent that can retrieve the capital city of a country.
    When a user provides a prompt, extract the country name.
    Then, use the `get_capital_city` tool to retrieve the capital city for that country.
    Finally, present the capital city to the user in a clear and concise manner.
    """,
    tools=[get_capital_city],
    # Caps each model reply at 100 output tokens.
    generate_content_config=types.GenerateContentConfig(
        max_output_tokens=100,
    ),
)

# Session and Runner: the session is created with the same ids that
# `call_agent` later passes to `runner.run`.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=capital_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send one user message to the agent and print its final response text.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)


# Demo query: "france" is one of the keys in get_capital_city's lookup table.
call_agent("What is the capital of france?")



Agent Response:  The capital of France is Paris.



### LLM Agent with an Input/Output Schema

In [None]:
from pydantic import BaseModel
from pydantic import Field
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent
from google.genai import types

# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created below and are
# passed again to runner.run() inside call_agent.
APP_NAME = "capital_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "capital_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"


# Input shape for the agent: a single `country` string. Passed as `input_schema` below.
# (Documented with comments rather than a class docstring so the model's schema is unchanged.)
class InputSchema(BaseModel):
    country: str = Field(description="The country to find the capital of.")


# Output shape for the agent: a single `capital` string. Passed as `output_schema` below.
# (Documented with comments rather than a class docstring so the model's schema is unchanged.)
class OutputSchema(BaseModel):
    capital: str = Field(description="The capital of the country.")


# Agent: structured input/output via pydantic schemas.
capital_agent = Agent(
    model=GEMINI_2_FLASH,
    name=AGENT_NAME,
    # The original instruction announced "the following JSON format" but never
    # showed it; the example below mirrors OutputSchema's single `capital` field.
    instruction="""You are a Capital Information Agent. Your task is to provide the capital of a given country.

    When a user provides a prompt, extract the country name.
    Then, respond with the capital of that country in the following JSON format:

    {"capital": "<capital city>"}
    """,
    description="""You are an agent who can tell the capital of a country.""",
    # Transfers are disabled alongside output_schema; presumably ADK requires this
    # for schema-constrained agents (confirm against the ADK LLM agent docs).
    disallow_transfer_to_peers=True,
    disallow_transfer_to_parent=True,
    input_schema=InputSchema,
    output_schema=OutputSchema,
)

# Session and Runner: the session is created with the same ids that
# `call_agent` later passes to `runner.run`.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=capital_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send one user message to the schema agent and print its final response text.

    Args:
        query: The user's message; the calls below pass a JSON string such as
            '{"country": "France"}'.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)


# Each payload is a JSON string with the single `country` field that InputSchema declares.
for payload in (
    '{"country": "France"}',
    '{"country": "Germany"}',
    '{"country": "Japan"}',
):
    call_agent(payload)

: 

### LLM Agent with an `output_key`

- Set `output_key` to store the agent's response in session state under that key.

In [None]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent
import json


# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created below and are
# passed again to runner.run() and get_session() inside call_agent.
APP_NAME = "capital_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "capital_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"

# Agent
capital_agent = Agent(
    model=GEMINI_2_FLASH,
    name=AGENT_NAME,
    instruction="""You are a Capital Information Agent. Your task is to provide the capital of a given country.

    When a user provides a prompt, extract the country name.

    """,
    description="""You are an agent who can tell the capital of a country.""",
    # Session-state key for this agent's output; call_agent below reads it back
    # with state.get("capital_output").
    output_key="capital_output",
)

# Session and Runner
# A fresh in-memory session service, so state from earlier cells is not reused.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=capital_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send one user message, then print the final response and the stored state value.

    After each final response event, the session is re-read from the session
    service and the value under the "capital_output" key is printed.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)
        print(
            "Session State:",
            session_service.get_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
            ).state.get("capital_output"),
        )


# Three turns in the same session; each turn prints the response and the stored state value.
for question in (
    "What's the capital of France?",
    "What's the capital of Germany?",
    "What's the capital of Japan?",
):
    call_agent(question)

: 

### LLM Agent with a built_in_code_execution

In [None]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent
import json
from google.adk.tools import built_in_code_execution

# Constants
# Renamed from the copy-pasted "capital_*" values so they describe this section's agent.
APP_NAME = "code_execution_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "code_execution_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"

# Agent: uses ADK's built-in code execution tool instead of a user-defined function.
code_agent = LlmAgent(
    name=AGENT_NAME,
    model=GEMINI_2_FLASH,
    tools=[built_in_code_execution],
    instruction="Generate python code to solve the user's request. "
    "If the user asks for a specific output, return the output of the code execution.",
)

# Session and Runner: the session is created with the same ids that
# `call_agent` later passes to `runner.run`.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=code_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send one user message and print every text part the agent emits.

    Unlike the other sections, this prints text from all events (not only the
    final one) and dumps any event that has no content parts.

    Args:
        query: The user's prompt text.
    """
    user_message = types.Content(role="user", parts=[types.Part(text=query)])

    for event in runner.run(
        user_id=USER_ID, session_id=SESSION_ID, new_message=user_message
    ):
        # Events without content, or with an empty parts list, are printed raw.
        if not (event.content and event.content.parts):
            print(f"Event: {event}")
            continue
        for part in event.content.parts:
            if part.text:
                print(f"\n--- Agent Response ---\n{part.text}\n")


# Arithmetic prompt; code_agent's instruction asks it to generate Python code for the request.
call_agent("what is 111 + 222")

: 

### LLM Agent with Multiple Tools

In [None]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent

# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created further down and
# are passed again to runner.run() inside call_agent.
APP_NAME = "recipe_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "recipe_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"

# --- Mock Data ---
# Keys are lower-case recipe names; the tools below lower-case their input before lookup.
# Each entry holds an "ingredients" list and a "dietary_restrictions" list
# (["none"] is used when no restriction label applies).
recipes = {
    "pasta carbonara": {
        "ingredients": [
            "pasta",
            "eggs",
            "guanciale",
            "pecorino romano",
            "black pepper",
        ],
        "dietary_restrictions": ["none"],
    },
    "chicken tikka masala": {
        "ingredients": ["chicken", "yogurt", "ginger", "garlic", "masala blend"],
        "dietary_restrictions": ["none"],
    },
    "vegan lentil soup": {
        "ingredients": ["lentils", "carrots", "celery", "onion", "vegetable broth"],
        "dietary_restrictions": ["vegan"],
    },
}


# --- Tools ---
def search_recipes(keyword: str) -> str:
    """Searches for recipes based on a keyword.

    Args:
        keyword: The keyword to search for (e.g., ingredient).

    Returns:
        A string containing the names of matching recipes, or a message if no recipes are found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    needle = keyword.lower()  # computed once instead of per recipe
    matching_recipes = [
        recipe_name
        for recipe_name, recipe_data in recipes.items()
        if needle in recipe_name.lower()
        # Substring match per ingredient, consistent with the substring match on
        # the recipe name: "pepper" now finds "black pepper", "egg" finds "eggs".
        or any(needle in ingredient.lower() for ingredient in recipe_data["ingredients"])
    ]
    if matching_recipes:
        return f"Recipes matching '{keyword}': {', '.join(matching_recipes)}."
    else:
        return f"No recipes found matching '{keyword}'."


def check_dietary_restrictions(recipe_name: str, dietary_restriction: str) -> str:
    """Checks if a recipe is suitable for a given dietary restriction.

    Args:
        recipe_name: The name of the recipe to check.
        dietary_restriction: The dietary restriction to check for (e.g., "vegan").

    Returns:
        A string indicating if the recipe is suitable or not.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    entry = recipes.get(recipe_name.lower())
    if not entry:
        return f"Recipe '{recipe_name}' not found."

    # Suitability is a plain membership test against the recipe's labels.
    if dietary_restriction.lower() not in entry["dietary_restrictions"]:
        return f"'{recipe_name}' is not suitable for a '{dietary_restriction}' diet."
    return f"'{recipe_name}' is suitable for a '{dietary_restriction}' diet."


def get_ingredient_list(recipe_name: str) -> str:
    """Returns a list of ingredients for a given recipe.

    Args:
        recipe_name: The name of the recipe.

    Returns:
        A string containing the list of ingredients, or a message if the recipe is not found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    entry = recipes.get(recipe_name.lower())
    if not entry:
        return f"Recipe '{recipe_name}' not found."

    joined_ingredients = ", ".join(entry["ingredients"])
    return f"Ingredients for '{recipe_name}': {joined_ingredients}."


# --- Agent ---
# One agent with three function tools; the instruction tells the model which
# tool to pick for each kind of request.
recipe_agent = Agent(
    model=GEMINI_2_FLASH,
    name=AGENT_NAME,
    instruction="""You are a Recipe Agent. Your task is to help users find recipes and check their suitability for dietary restrictions.

    You have access to three tools:
    1. `search_recipes`: Use this tool to find recipes based on a keyword (e.g., ingredient).
    2. `check_dietary_restrictions`: Use this tool to check if a recipe is suitable for a given dietary restriction.
    3. `get_ingredient_list`: Use this tool to get a list of ingredients for a given recipe.

    When a user provides a prompt, first determine what they are asking for.
    - If they are asking for recipes based on a keyword, use the `search_recipes` tool.
    - If they are asking if a recipe is suitable for a dietary restriction, use the `check_dietary_restrictions` tool.
    - If they are asking for a list of ingredients for a recipe, use the `get_ingredient_list` tool.
    Finally, present the information to the user in a clear and concise manner.
    """,
    description="""An agent that can find recipes, check dietary restrictions, and list ingredients.
    It has access to the `search_recipes`, `check_dietary_restrictions`, and `get_ingredient_list` tools.""",
    # The plain Python functions defined above are passed directly as tools.
    tools=[
        search_recipes,
        check_dietary_restrictions,
        get_ingredient_list,
    ],
)

# --- Session and Runner ---
# The session is created with the same ids that call_agent passes to runner.run().
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=recipe_agent, app_name=APP_NAME, session_service=session_service)


# --- Agent Interaction ---
def call_agent(query):
    """Send one user message to the recipe agent and print its final response text.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)


# One prompt per tool: keyword search, dietary check, ingredient listing.
for prompt in (
    "Find recipes with chicken.",
    "Is pasta carbonara suitable for a vegan diet?",
    "What are the ingredients in vegan lentil soup?",
):
    call_agent(prompt)

: 

### LLM Agent with an Async Run

- It is as simple as using `runner.run_async` instead of `runner.run`.

In [None]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent


# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created below and are
# passed again to runner.run_async() inside call_agent_async.
APP_NAME = "weather_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "weather_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"


# Tool
def get_weather(city: str) -> str:
    """Retrieves weather information for the given city.

    Args:
        city: The name of the city for which to retrieve weather information.

    Returns:
        A string containing the weather information for the specified city,
        or a message indicating that the weather information was not found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    # Mock data keyed by lower-case city name.
    mock_reports = {
        "chicago": {"temperature": 25, "condition": "sunny", "sky": "clear"},
        "toronto": {"temperature": 30, "condition": "partly cloudy", "sky": "overcast"},
        "chennai": {"temperature": 15, "condition": "rainy", "sky": "cloudy"},
    }

    report = mock_reports.get(city.lower())
    if report is None:
        return f"Weather information for {city} not found."
    return f"Weather in {city} is {report['temperature']} degrees Celsius, {report['condition']} with a {report['sky']} sky."


# Agent
# Same single-tool setup as the synchronous examples; only the runner call in
# call_agent_async differs (run_async instead of run).
root_agent = Agent(
    model=GEMINI_2_FLASH,
    name=AGENT_NAME,
    instruction="""You are a Weather Information Agent. Your task is to provide weather information for a given city.

    When a user provides a prompt, extract the city name.
    Then, use the `get_weather` tool to retrieve the weather information for that city.
    Finally, present the weather information to the user in a clear and concise manner.""",
    description="""You are an agent who can fetch weather information for a city.
    You have access to the `get_weather` tool to accomplish this task.""",
    tools=[get_weather],
)

# Session and Runner
# The session is created with the same ids that call_agent_async passes to run_async().
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
async def call_agent_async(query):
    """Send one user message via `runner.run_async` and print the final response text.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    async for event in runner.run_async(
        user_id=USER_ID, session_id=SESSION_ID, new_message=content
    ):
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)


# Top-level `await` works in a notebook cell; "toronto" is a key in get_weather's mock data.
await call_agent_async("What's the weather in toronto?")

: 

### LLM Agent with single sub-agent

In [12]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent
import asyncio

# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created below.
# NOTE(review): AGENT_NAME is not referenced in this cell; the agents below use literal names.
APP_NAME = "weather_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "weather_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"


# Tool
def get_weather(city: str) -> str:
    """Retrieves weather information for the given city.

    Args:
        city: The name of the city for which to retrieve weather information.

    Returns:
        A string containing the weather information for the specified city,
        or a message indicating that the weather information was not found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    weather_by_city = {
        "chicago": {"temperature": 25, "condition": "sunny", "sky": "clear"},
        "toronto": {"temperature": 30, "condition": "partly cloudy", "sky": "overcast"},
        "chennai": {"temperature": 15, "condition": "rainy", "sky": "cloudy"},
    }

    try:
        # Keys are lower-case, so the lookup is case-insensitive.
        entry = weather_by_city[city.lower()]
    except KeyError:
        return f"Weather information for {city} not found."
    return f"Weather in {city} is {entry['temperature']} degrees Celsius, {entry['condition']} with a {entry['sky']} sky."


def get_greeting(name: str) -> str:
    """Greets the given name.

    Args:
        name: The name to greet.

    Returns:
        A greeting message.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    greeting = f"Hello, {name}!"
    return greeting


# Agents
# The child is defined first so it can be handed to the parent's constructor.
greeting_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    name="greeting_agent",
    instruction="""You are a Greeting Agent. Your task is to greet the user.

    When a user provides a prompt, extract the name.
    Then, use the `get_greeting` tool to greet the user.
    Finally, present the greeting to the user in a clear and concise manner.
    If the user asks for weather information, transfer to the weather agent.""",
    description="""You are an agent who can greet a user.
    You have access to the `get_greeting` tool to accomplish this task.""",
    tools=[get_greeting],
    # NOTE(review): these flags prevent the transfer back that the last line of
    # the instruction asks for; confirm which behavior is intended.
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

root_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    name="root_agent",
    instruction="""You are a helpful agent with tool and sub-agents.
    - When user ask about weather, extract the city name, then use the `get_weather` tool to retrieve the weather information for that city.
    - If the user asks for a greeting, transfer to the greeting_agent.""",
    description="""You are an agent who can fetch weather information for a city, and also greet a user.""",
    tools=[get_weather],
    # Declare the hierarchy at construction time. Per the ADK multi-agent docs the
    # framework wires each child's parent link during initialization; assigning
    # `root_agent.sub_agents` afterwards presumably skips that step.
    sub_agents=[greeting_agent],
)

# Session and Runner: the runner is given the root agent only.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
async def call_agent_async(query):
    """Send one user message via `runner.run_async` and print the final response text.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    async for event in runner.run_async(
        user_id=USER_ID, session_id=SESSION_ID, new_message=content
    ):
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)


: 

In [None]:
# Weather question: root_agent's instruction says to answer it with the get_weather tool.
await call_agent_async("What's the weather in toronto?")

: 

In [None]:
# Greeting: root_agent's instruction says to transfer greetings to greeting_agent.
await call_agent_async("Hello, Jimmy!")

: 

In [None]:
# Weather question again, sent in the same session right after the greeting turn.
await call_agent_async("What's the weather in chennai?")

: 

### Weather Agent with multiple sub-agents

In [16]:
# Tool
def get_weather(city: str) -> str:
    """Retrieves weather information for the given city.

    Args:
        city: The name of the city for which to retrieve weather information.

    Returns:
        A string containing the weather information for the specified city,
        or a message indicating that the weather information was not found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    forecasts = {
        "chicago": {"temperature": 25, "condition": "sunny", "sky": "clear"},
        "toronto": {"temperature": 30, "condition": "partly cloudy", "sky": "overcast"},
        "chennai": {"temperature": 15, "condition": "rainy", "sky": "cloudy"},
    }

    key = city.lower()
    # Unknown city: report that instead of raising.
    if key not in forecasts:
        return f"Weather information for {city} not found."

    forecast = forecasts[key]
    return f"Weather in {city} is {forecast['temperature']} degrees Celsius, {forecast['condition']} with a {forecast['sky']} sky."


# Tool for the Greeting Agent
def say_hello(name: str = "there") -> str:
    """Provides a simple greeting."""
    # Docstring left verbatim (agent tool). `name` defaults to "there" when omitted.
    greeting = f"Hello, {name}!"
    return greeting


# Tool for the Farewell Agent
def say_goodbye() -> str:
    """Provides a simple farewell message."""
    # Docstring left verbatim (agent tool). Takes no arguments; the message is fixed.
    farewell = "Goodbye! Have a great day."
    return farewell

: 

In [None]:
# --- Import LlmAgent ---
from google.adk.agents import LlmAgent
import google.genai.types as types  # For Content/Part later

# --- Agent Definitions ---
AGENT_NAME_WEATHER = "weather_agent"
AGENT_NAME_GREETING = "greeting_agent"
AGENT_NAME_FAREWELL = "farewell_agent"
MODEL_NAME = "gemini-2.0-flash-001"  # Use a recent flash model

# The children are defined first so they can be handed to the parent's constructor.

# --- Child Agent 1: Greeting ---
greeting_agent = LlmAgent(
    model=MODEL_NAME,
    name=AGENT_NAME_GREETING,
    instruction="You are the Greeting Agent. Use the 'say_hello' tool to greet the user. Do nothing else.",
    description="Handles simple greetings using the 'say_hello' tool.",
    tools=[say_hello],
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

# --- Child Agent 2: Farewell ---
farewell_agent = LlmAgent(
    model=MODEL_NAME,
    name=AGENT_NAME_FAREWELL,
    instruction="You are the Farewell Agent. Use the 'say_goodbye' tool when the user indicates they are leaving. Do nothing else.",
    description="Handles simple farewells using the 'say_goodbye' tool.",
    tools=[say_goodbye],
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

# --- Parent Agent: Weather ---
root_agent = LlmAgent(
    model=MODEL_NAME,
    name=AGENT_NAME_WEATHER,
    instruction=f"""You are the main Weather Agent in charge. Your primary responsibility is providing weather information.
    - **IF** the user asks specifically about the weather (e.g., 'weather in city', 'forecast'), use the 'get_weather' tool YOURSELF. **DO NOT transfer weather requests.**
    - **ONLY IF** the user gives a simple greeting (like 'Hi', 'Hello') with NO other request, transfer to the '{AGENT_NAME_GREETING}'.
    - **ONLY IF** the user explicitly says goodbye (like 'Bye', 'See you'), transfer to the '{AGENT_NAME_FAREWELL}'.
    - Handle only weather requests directly.
    """,
    description="Provides weather forecasts using 'get_weather'. Delegates greetings/farewells.",
    tools=[get_weather],
    # --- Define the Parent-Child Relationship ---
    # Passing the children to the constructor is what lets the framework set each
    # child's parent link (per the ADK multi-agent docs); assigning
    # `root_agent.sub_agents` after construction presumably skips that wiring.
    sub_agents=[greeting_agent, farewell_agent],
)
print(f"Defined parent agent: {root_agent.name}")

print(
    "Defined weather_agent (parent), greeting_agent (child), and farewell_agent (child)."
)
print(f"{root_agent.name} children: {[child.name for child in root_agent.sub_agents]}")

: 

In [18]:
from google.adk.sessions import InMemorySessionService
import uuid
from google.adk.runners import Runner

# Fresh in-memory session store for this section.
session_service = InMemorySessionService()

# Required. Unique identifier for the application.
APP_NAME = "weather_app"
# Required. Identifier for the user interacting with the agent. This is a dynamic variable.
USER_ID = "12345"

# A uuid4-based id, so every run of this cell creates a distinct session.
SESSION_ID = f"session_{uuid.uuid4()}"  # Use a dynamic session ID
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)


# The runner is given the root agent (defined in the previous cell) and the session service.
runner = Runner(
    agent=root_agent,
    app_name=APP_NAME,
    session_service=session_service,
)


def call_agent(user_query):
    """Send one user message to the root agent and print the final response text.

    Args:
        user_query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=user_query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)

: 

In [None]:
# Weather question; get_weather lower-cases the city, so "Chennai" matches the "chennai" entry.
call_agent(user_query="What's the weather like in Chennai?")

: 

In [None]:
# Follow-up weather question sent in the same session.
call_agent(user_query="What about the weather in toronto?")

: 

In [None]:
# Farewell: the root agent's instruction says to transfer explicit goodbyes to farewell_agent.
call_agent(user_query="Okay, thanks, bye!")

: 

In [None]:
# Greeting: the root agent's instruction says to transfer simple greetings to greeting_agent.
call_agent(user_query="Hi, my friend")

: 

### LLM Agents with Callbacks (Agent, Model & Tool)

- This is useful when you want to inspect the messages exchanged between agents, models, and tools.
- This cell shows the function parameters that are available in each callback.


This is what the sequence looks like:
- Before Agent Callback > Before Model Callback > After Model Callback > Before Tool Callback > After Tool Callback > Before Model Callback > After Model Callback > After Agent Callback

In [23]:
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent
import asyncio

# Constants
# APP_NAME, USER_ID and SESSION_ID identify the session created below.
# NOTE(review): AGENT_NAME is not referenced in this cell; the agents below use literal names.
APP_NAME = "weather_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "weather_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"


# Tool
def get_weather(city: str) -> str:
    """Retrieves weather information for the given city.

    Args:
        city: The name of the city for which to retrieve weather information.

    Returns:
        A string containing the weather information for the specified city,
        or a message indicating that the weather information was not found.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    city_table = {
        "chicago": {"temperature": 25, "condition": "sunny", "sky": "clear"},
        "toronto": {"temperature": 30, "condition": "partly cloudy", "sky": "overcast"},
        "chennai": {"temperature": 15, "condition": "rainy", "sky": "cloudy"},
    }

    # Case-insensitive lookup; None marks an unknown city.
    details = city_table.get(city.lower())
    if details is not None:
        return f"Weather in {city} is {details['temperature']} degrees Celsius, {details['condition']} with a {details['sky']} sky."
    return f"Weather information for {city} not found."


def get_greeting(name: str) -> str:
    """Greets the given name.

    Args:
        name: The name to greet.

    Returns:
        A greeting message.
    """
    # Docstring left verbatim: this function is registered as an agent tool, and
    # ADK presumably exposes the docstring to the model (confirm before rewording).
    message = f"Hello, {name}!"
    return message


# Callbacks


def before_model_callback(callback_context, llm_request):
    """Log the outgoing model request for the current agent.

    Args:
        callback_context: Callback context; only its `agent_name` is read.
        llm_request: The request about to be sent; its `contents` are printed.

    Returns:
        None, so the model call proceeds unchanged.
    """
    # Use the public `agent_name` accessor (as the before_agent_callback example
    # later in this notebook does) instead of the private `_invocation_context`.
    print(
        f"Before Model Callback: Agent {callback_context.agent_name}, Request: {llm_request.contents}"
    )
    return None


def after_model_callback(callback_context, llm_response):
    """Log the model response received by the current agent.

    Args:
        callback_context: Callback context; only its `agent_name` is read.
        llm_response: The model response; its `content` is printed.

    Returns:
        None, so the response is used unchanged.
    """
    # Use the public `agent_name` accessor (as the before_agent_callback example
    # later in this notebook does) instead of the private `_invocation_context`.
    print(
        f"After Model Callback: Agent {callback_context.agent_name}, Response: {llm_response.content}"
    )
    return None


def before_tool_callback(tool, args, tool_context):
    """Log which tool is about to run and with what arguments.

    Args:
        tool: The tool being invoked; its `name` is printed.
        args: The arguments for the tool call.
        tool_context: Tool context (unused here).

    Returns:
        None, so the tool call proceeds unchanged.
    """
    log_line = f"Before Tool Callback: Tool {tool.name}, Args: {args}"
    print(log_line)
    return None


def after_tool_callback(tool, args, tool_context, tool_response):
    """Log the result a tool returned.

    Args:
        tool: The tool that ran; its `name` is printed.
        args: The arguments the tool was called with (unused here).
        tool_context: Tool context (unused here).
        tool_response: The value the tool returned.

    Returns:
        None, so the tool response is used unchanged.
    """
    log_line = f"After Tool Callback: Tool {tool.name}, Response: {tool_response}"
    print(log_line)
    return None


def before_agent_callback(callback_context):
    """Log that an agent is about to run.

    Args:
        callback_context: Callback context; only its `agent_name` is read.

    Returns:
        None, so the agent runs normally.
    """
    # Use the public `agent_name` accessor (as the before_agent_callback example
    # later in this notebook does) instead of the private `_invocation_context`.
    print(f"Before Agent Callback: Agent {callback_context.agent_name}")
    return None


def after_agent_callback(callback_context):
    """Log that an agent has finished running.

    Args:
        callback_context: Callback context; only its `agent_name` is read.

    Returns:
        None, so the agent's output is used unchanged.
    """
    # Use the public `agent_name` accessor (as the before_agent_callback example
    # later in this notebook does) instead of the private `_invocation_context`.
    print(f"After Agent Callback: Agent {callback_context.agent_name}")
    return None


# The child is defined first so it can be handed to the parent's constructor.
greeting_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    name="greeting_agent",
    instruction="""You are a Greeting Agent. Your task is to greet the user.

    When a user provides a prompt, extract the name.
    Then, use the `get_greeting` tool to greet the user.
    Finally, present the greeting to the user in a clear and concise manner.
    If the user asks for weather information, transfer to the weather agent.""",
    description="""You are an agent who can greet a user.
    You have access to the `get_greeting` tool to accomplish this task.""",
    tools=[get_greeting],
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
    # Both agents share the same six logging callbacks defined above.
    before_model_callback=before_model_callback,
    after_model_callback=after_model_callback,
    before_tool_callback=before_tool_callback,
    after_tool_callback=after_tool_callback,
    before_agent_callback=before_agent_callback,
    after_agent_callback=after_agent_callback,
)

root_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    name="weather_agent",
    instruction="""You are a Weather Information Agent. Your task is to provide weather information for a given city.

    When a user provides a prompt, extract the city name.
    Then, use the `get_weather` tool to retrieve the weather information for that city.
    Finally, present the weather information to the user in a clear and concise manner.
    If the user asks for a greeting, transfer to the greeting agent.""",
    description="""You are an agent who can fetch weather information for a city.
    You have access to the `get_weather` tool to accomplish this task.""",
    tools=[get_weather],
    # Declare the hierarchy at construction time. Per the ADK multi-agent docs the
    # framework wires each child's parent link during initialization; assigning
    # `root_agent.sub_agents` afterwards presumably skips that step.
    sub_agents=[greeting_agent],
    before_model_callback=before_model_callback,
    after_model_callback=after_model_callback,
    before_tool_callback=before_tool_callback,
    after_tool_callback=after_tool_callback,
    before_agent_callback=before_agent_callback,
    after_agent_callback=after_agent_callback,
)

# Session and Runner: the runner is given the root agent only.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
async def call_agent_async(query):
    """Send one user message via `runner.run_async` and print the final response text.

    The callbacks registered on the agents print their own log lines while
    this runs.

    Args:
        query: The user's prompt text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    async for event in runner.run_async(
        user_id=USER_ID, session_id=SESSION_ID, new_message=content
    ):
        if not event.is_final_response():
            continue
        # Guard against a final event with no content or an empty parts list;
        # indexing `parts[0]` unconditionally would raise in that case.
        has_parts = event.content and event.content.parts
        final_response = event.content.parts[0].text if has_parts else None
        print("Agent Response: ", final_response)

: 

In [None]:
# Weather question; the callbacks registered on the agent print a log line at each stage.
await call_agent_async("What's the weather in Chicago?")

: 

### LLM Agent with before_agent_callback and state

- In this example, we check the session state key `skip_agent` and decide how the agent will respond.
- If the condition is met, we skip the LLM response.


In [None]:
import asyncio
from typing import AsyncGenerator, Optional
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
# Redefined here so this cell does not rely on an earlier cell having set it.
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define the Callback Function (Same as before) ---
def simple_before_agent_logger(
    callback_context: CallbackContext,
) -> Optional[types.Content]:
    """Log agent entry and decide, from session state, whether to skip the agent.

    Args:
        callback_context: Supplies ``agent_name``, ``invocation_id`` and the
            session ``state`` mapping.

    Returns:
        A ``types.Content`` holding a "skipped" message when ``state["skip_agent"]``
        is truthy (returned in place of the agent's own run), otherwise ``None``
        so the agent runs normally.
    """
    name = callback_context.agent_name
    invocation = callback_context.invocation_id
    print(f"[Callback] Entering agent: {name} (Invocation: {invocation})")

    # Guard clause: without the flag the agent proceeds untouched.
    if not callback_context.state.get("skip_agent", False):
        print(f"[Callback] Condition not met: Proceeding with agent {name}.")
        return None

    print(f"[Callback] Condition met: Skipping agent {name}.")
    skip_notice = types.Part(text=f"Agent {name} was skipped by callback.")
    return types.Content(parts=[skip_notice])


# --- Setup and Run ---
async def main():
    """Run one agent against two sessions to demonstrate before_agent_callback.

    The first session has empty state, so the agent answers normally. The second
    session is created with ``skip_agent=True`` in state, so
    ``simple_before_agent_logger`` returns its own Content instead.
    Only final responses that carry content are printed.
    """
    app_name = "llm_demo_app"
    user_id = "llm_test_user"
    session_id_run = "llm_session_run_1"
    session_id_skip = "llm_session_skip_1"

    # 1. Create LlmAgent and Assign Callback
    my_llm_agent = LlmAgent(
        name="SimpleLlmAgent",
        model=GEMINI_2_FLASH,
        instruction="You are a simple agent. Just say 'Hello!'",
        description="An LLM agent demonstrating before_agent_callback",
        before_agent_callback=simple_before_agent_logger,
    )

    # 2. Setup Runner and Sessions
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent, app_name=app_name, session_service=session_service
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id_skip,
        state={"skip_agent": True},  # Set state to trigger skip condition
    )

    async def run_turn(session_id, text):
        """Send one user message to a session and print the final response."""
        # Always tag the message with role="user"; the original skip-run message
        # omitted the role, unlike every other message in this notebook.
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Only print final LLM response or callback override
            if event.is_final_response() and event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print("--- Running LLM Agent Normally ---")
    await run_turn(session_id_run, "Run normally")

    print("\n--- Running LLM Agent with Skip Condition ---")
    await run_turn(session_id_skip, "Skip this agent")


await main()

: 

### LLM Agent with after_agent_callback and state

- In this use case, we check the `add_concluding_note` session state.
- If the condition is met, we append an extra message to the LLM response.

In [None]:
import asyncio
from typing import AsyncGenerator, Optional
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define the Callback Function ---
def simple_after_agent_logger(
    callback_context: CallbackContext,
) -> Optional[types.Content]:
    """Log agent exit and optionally return a concluding note.

    Args:
        callback_context: Supplies ``agent_name``, ``invocation_id`` and the
            session ``state`` mapping.

    Returns:
        A ``types.Content`` with a fixed concluding note when
        ``state["add_concluding_note"]`` is truthy, otherwise ``None``.
    """
    name = callback_context.agent_name
    invocation = callback_context.invocation_id
    print(f"[Callback] Exiting agent: {name} (Invocation: {invocation})")

    wants_note = callback_context.state.get("add_concluding_note", False)
    if not wants_note:
        print(f"[Callback] No concluding note added for agent {name}.")
        return None  # nothing extra is emitted

    print(f"[Callback] Adding concluding note for agent {name}.")
    note = types.Part(text="Concluding note added by after_agent_callback.")
    return types.Content(parts=[note])


# --- Setup and Run ---
async def main():
    """Run one agent against two sessions to demonstrate after_agent_callback.

    The first session has no state, so the callback returns ``None``. The second
    is created with ``add_concluding_note=True``, so the callback returns its
    concluding-note Content. Every event that carries content is printed.
    """
    app_name = "llm_demo_app_after"
    user_id = "llm_test_user_after"
    session_id_run = "llm_session_run_after_1"
    session_id_conclude = "llm_session_conclude_1"

    # 1. Agent under test, wired to the after-agent callback
    my_llm_agent = LlmAgent(
        name="SimpleLlmAgentWithAfter",
        model=GEMINI_2_FLASH,
        instruction="You are a simple agent. Just say 'Processing complete!'",
        description="An LLM agent demonstrating after_agent_callback",
        after_agent_callback=simple_after_agent_logger,
    )

    # 2. Runner plus one plain session and one session that requests the note
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent,
        app_name=app_name,
        session_service=session_service,
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id_conclude,
        state={"add_concluding_note": True},
    )

    async def run_turn(session_id, text):
        """Send one user message and print every event that has content."""
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Print any event content from agent or callback
            if event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print("--- Running LLM Agent Normally (with after_agent_callback) ---")
    await run_turn(session_id_run, "Run normally")

    print("\n--- Running LLM Agent with Concluding Note Condition ---")
    await run_turn(session_id_conclude, "Run and conclude")


await main()

: 

### LLM Agent with before_model_callback and state

In this example, we demonstrate how to

- modify system instruction
- block model response based on keyword in query

In [None]:
import asyncio
from typing import AsyncGenerator, Optional
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext

# Need LlmRequest and LlmResponse for the callback signature and return type
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define the Callback Function ---
def simple_before_model_modifier(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Prefix the system instruction and optionally short-circuit the LLM call.

    Args:
        callback_context: Supplies the current ``agent_name``.
        llm_request: The outgoing request; its ``config.system_instruction`` is
            rewritten in place with a ``"[Modified by Callback] "`` prefix.

    Returns:
        An ``LlmResponse`` with a fixed "blocked" message when the last user
        message contains "BLOCK" (any casing), which skips the real LLM call;
        otherwise ``None`` so the modified request is sent to the model.
    """
    agent_name = callback_context.agent_name
    print(f"[Callback] Before model call for agent: {agent_name}")

    # Inspect the last user message in the request contents
    last_user_message = ""
    if llm_request.contents and llm_request.contents[-1].role == "user":
        last_parts = llm_request.contents[-1].parts
        if last_parts:
            # A part's text can be None; normalise to "" so the .upper()
            # call below cannot fail.
            last_user_message = last_parts[0].text or ""
    print(f"[Callback] Inspecting last user message: '{last_user_message}'")

    # --- Modification Example ---
    # Add a prefix to the system instruction
    original_instruction = llm_request.config.system_instruction or types.Content(
        role="system", parts=[]
    )
    prefix = "[Modified by Callback] "
    # Ensure system_instruction is Content and parts list exists
    if not isinstance(original_instruction, types.Content):
        # Handle case where it might be a string (though config expects Content)
        original_instruction = types.Content(
            role="system", parts=[types.Part(text=str(original_instruction))]
        )
    if not original_instruction.parts:
        # parts may be None as well as []; assigning a fresh list covers both,
        # whereas .append() would raise AttributeError on None.
        original_instruction.parts = [types.Part(text="")]

    # Modify the text of the first part
    modified_text = prefix + (original_instruction.parts[0].text or "")
    original_instruction.parts[0].text = modified_text
    llm_request.config.system_instruction = original_instruction
    print(f"[Callback] Modified system instruction to: '{modified_text}'")

    # --- Skip Example ---
    # Check if the last user message contains "BLOCK"
    if "BLOCK" in last_user_message.upper():
        print("[Callback] 'BLOCK' keyword found. Skipping LLM call.")
        # Return an LlmResponse to skip the actual LLM call
        return LlmResponse(
            content=types.Content(
                role="model",
                parts=[
                    types.Part(text="LLM call was blocked by before_model_callback.")
                ],
            )
        )

    print("[Callback] Proceeding with LLM call.")
    # Return None to allow the (modified) request to go to the LLM
    return None


# --- Setup and Run ---
async def main():
    """Run one agent against two sessions to demonstrate before_model_callback.

    The first message goes to the model with the callback-modified system
    instruction. The second contains "BLOCK", so the callback answers itself.
    Only final responses that carry content are printed.
    """
    app_name = "llm_model_cb_app"
    user_id = "model_cb_user"
    session_id_run = "model_cb_run_1"
    session_id_block = "model_cb_block_1"

    # 1. Agent under test, wired to the before-model callback
    my_llm_agent = LlmAgent(
        name="ModelCallbackAgent",
        model=GEMINI_2_FLASH,
        instruction="You are a helpful assistant.",  # Base instruction
        description="An LLM agent demonstrating before_model_callback",
        before_model_callback=simple_before_model_modifier,
    )

    # 2. Runner and one session per scenario
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent, app_name=app_name, session_service=session_service
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_block
    )

    async def run_turn(session_id, text):
        """Send one user message and print the final response."""
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Only print final LLM response or callback override
            if event.is_final_response() and event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print(
        "--- Running LLM Agent Normally (with before_model_callback modification) ---"
    )
    await run_turn(session_id_run, "Tell me a short joke.")

    print("\n--- Running LLM Agent with BLOCK Keyword (triggering skip) ---")
    await run_turn(session_id_block, "BLOCK this request.")


await main()

: 

### LLM Agent with after_model_callback and state

This use case demonstrates:
- overwriting model response with custom text

In [None]:
import asyncio
from typing import AsyncGenerator, Optional
import copy  # Needed to safely modify response content

from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define the Callback Function ---
def simple_after_model_modifier(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Inspect the model response and prefix each "scientists" with "CUSTOM ".

    The match is case-insensitive and the matched word keeps its own casing
    ("Scientists" -> "CUSTOM Scientists"). The previous str.replace() approach
    only rewrote the all-lowercase and capitalised spellings, and
    ``capitalize()`` turned the replacement into "Custom scientists".

    Args:
        callback_context: Supplies the current ``agent_name``.
        llm_response: The response received from the model.

    Returns:
        A new ``LlmResponse`` with the rewritten text (carrying over
        ``grounding_metadata``) when the search term occurs in the first part's
        text. ``None`` - meaning "keep the original response" - for function
        calls, errors, empty responses, or text without the search term.
    """
    import re  # Local import: keeps this cell runnable without touching the import block.

    agent_name = callback_context.agent_name
    print(f"[Callback] After model call for agent: {agent_name}")

    # --- Inspection ---
    original_text = ""
    if llm_response.content and llm_response.content.parts:
        first_part = llm_response.content.parts[0]
        # Assuming simple text response for this example
        if first_part.text:
            original_text = first_part.text
            print(
                f"[Callback] Inspected original response text: '{original_text[:100]}...'"
            )  # Log snippet
        elif first_part.function_call:
            print(
                f"[Callback] Inspected response: Contains function call '{first_part.function_call.name}'. No text modification."
            )
            return None  # Don't modify tool calls in this example
        else:
            print("[Callback] Inspected response: No text content found.")
            return None
    elif llm_response.error_message:
        print(
            f"[Callback] Inspected response: Contains error '{llm_response.error_message}'. No modification."
        )
        return None
    else:
        print("[Callback] Inspected response: Empty LlmResponse.")
        return None  # Nothing to modify

    # --- Modification Example ---
    # Prefix every occurrence of "scientists" (any casing) with "CUSTOM ".
    search_term = "scientists"
    replacement_prefix = "CUSTOM "
    pattern = re.compile(re.escape(search_term), re.IGNORECASE)
    if not pattern.search(original_text):
        print(
            f"[Callback] '{search_term}' not found. Passing original response through."
        )
        # Return None to use the original llm_response
        return None

    print(f"[Callback] Found '{search_term}'. Modifying response.")
    modified_text = pattern.sub(
        lambda match: replacement_prefix + match.group(0), original_text
    )

    # Create a NEW LlmResponse with the modified content
    # Deep copy parts to avoid modifying original if other callbacks exist
    modified_parts = [copy.deepcopy(part) for part in llm_response.content.parts]
    modified_parts[0].text = modified_text  # Update the text in the copied part

    new_response = LlmResponse(
        content=types.Content(role="model", parts=modified_parts),
        # Copy other relevant fields if necessary, e.g., grounding_metadata
        grounding_metadata=llm_response.grounding_metadata,
    )
    print("[Callback] Returning modified response.")
    return new_response  # Return the modified response


# --- Setup and Run ---
async def main():
    """Run one agent against two sessions to demonstrate after_model_callback.

    The first prompt ("Say hello.") is expected to pass through the callback
    unchanged. The second asks about scientists, so the reply is likely to
    contain the term the callback rewrites. Only final responses that carry
    content are printed.
    """
    app_name = "llm_after_model_cb_app"
    user_id = "after_model_cb_user"
    session_id_run = "after_model_cb_run_1"
    session_id_modify = "after_model_cb_modify_1"

    # 1. Agent under test, wired to the after-model callback
    my_llm_agent = LlmAgent(
        name="AfterModelCallbackAgent",
        model=GEMINI_2_FLASH,
        instruction="You are a helpful assistant.",
        description="An LLM agent demonstrating after_model_callback",
        after_model_callback=simple_after_model_modifier,
    )

    # 2. Runner and one session per scenario
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent,
        app_name=app_name,
        session_service=session_service,
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_modify
    )

    async def run_turn(session_id, text):
        """Send one user message and print the final response."""
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Only print final LLM response
            if event.is_final_response() and event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print("--- Running LLM Agent Normally (Callback passes response through) ---")
    await run_turn(session_id_run, "Say hello.")

    print("\n--- Running LLM Agent with Input Triggering Modification ---")
    await run_turn(session_id_modify, "Why don't scientists trust atoms")


await main()

: 

### LLM Agent with before_tool_callback and state

This cell demonstrates:
- inspection of tool arguments, and overwriting of argument and response.

In [None]:
import asyncio
from typing import AsyncGenerator, Optional, Dict, Any
import copy

from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext

# Need LlmRequest, LlmResponse for context
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse

# Need BaseTool, ToolContext for the callback signature
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext

# Using FunctionTool to easily create a tool
from google.adk.tools.function_tool import FunctionTool
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define a Simple Tool Function ---
def get_capital_city(country: str) -> str:
    """Retrieves the capital city of a given country."""
    # The one-line docstring is kept verbatim on purpose: this function is
    # wrapped by FunctionTool below, which presumably surfaces it to the model.
    print(f"--- Tool 'get_capital_city' executing with country: {country} ---")
    known_capitals = {
        "united states": "Washington, D.C.",
        "canada": "Ottawa",  # Intentionally correct here
        "france": "Paris",
        "germany": "Berlin",
    }
    # Lookup is case-insensitive; the miss message echoes the caller's spelling.
    key = country.lower()
    if key in known_capitals:
        return known_capitals[key]
    return f"Capital not found for {country}"


# --- Wrap the function into a Tool ---
capital_tool = FunctionTool(func=get_capital_city)


# --- Define the Callback Function ---
def simple_before_tool_modifier(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """Rewrite or short-circuit calls to the ``get_capital_city`` tool.

    Args:
        tool: The tool about to run; only ``tool.name`` is read.
        args: Arguments for the tool call. Mutated in place when the country is
            "Canada" (any casing), which is replaced by "France".
        tool_context: Supplies the calling ``agent_name`` for logging.

    Returns:
        A result dict that stands in for the tool's output when the country is
        "BLOCK" (any casing); otherwise ``None`` so the tool runs with the
        original or rewritten arguments.
    """
    print(
        f"[Callback] Before tool call for tool '{tool.name}' in agent '{tool_context.agent_name}'"
    )
    print(f"[Callback] Original args: {args}")

    if tool.name == "get_capital_city":
        requested_country = args.get("country", "")

        # --- Modification Example: Canada -> France ---
        if requested_country.lower() == "canada":
            print("[Callback] Detected 'Canada'. Modifying args to 'France'.")
            args["country"] = "France"
            print(f"[Callback] Modified args: {args}")
            return None  # Proceed with modified args

        # --- Skip Example: BLOCK ---
        if requested_country.upper() == "BLOCK":
            print("[Callback] Detected 'BLOCK'. Skipping tool execution.")
            # The returned dict is used as the tool result; the tool itself is not run.
            return {"result": "Tool execution was blocked by before_tool_callback."}

    print("[Callback] Proceeding with original or previously modified args.")
    return None


# --- Setup and Run ---
async def main():
    """Run one agent against three sessions to demonstrate before_tool_callback.

    The scenarios are: a normal tool call (Germany), a call whose argument the
    callback rewrites (Canada -> France), and a call the callback answers itself
    (BLOCK). Only final responses that carry content are printed.
    """
    app_name = "llm_tool_cb_app"
    user_id = "tool_cb_user"
    session_id_run = "tool_cb_run_1"
    session_id_modify = "tool_cb_modify_1"
    session_id_block = "tool_cb_block_1"

    # 1. Agent with the capital-city tool and the before-tool callback
    my_llm_agent = LlmAgent(
        name="ToolCallbackAgent",
        model=GEMINI_2_FLASH,
        instruction="You are an agent that can find capital cities. Use the get_capital_city tool.",
        description="An LLM agent demonstrating before_tool_callback",
        tools=[capital_tool],
        before_tool_callback=simple_before_tool_modifier,
    )

    # 2. Runner and one session per scenario
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent, app_name=app_name, session_service=session_service
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_modify
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_block
    )

    async def run_turn(session_id, text):
        """Send one user message and print the final response."""
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Only print final LLM response
            if event.is_final_response() and event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print("--- Running Agent (Normal Tool Call - Germany) ---")
    await run_turn(session_id_run, "What is the capital of Germany?")

    print(
        "\n--- Running Agent (Tool Call Triggering Modification - Canada -> France) ---"
    )
    await run_turn(session_id_modify, "What is the capital of Canada?")

    print("\n--- Running Agent (Tool Call Triggering Skip - BLOCK) ---")
    await run_turn(session_id_block, "What is the capital of BLOCK?")


await main()

: 

### LLM Agent with after_tool_callback and state

- Modify tool call result based on tool argument.


In [None]:
import asyncio
from typing import AsyncGenerator, Optional, Dict, Any
import copy  # Good practice for modifying results

from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.function_tool import FunctionTool
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event
from google.genai import types

# Ensure GEMINI_2_FLASH is defined (replace if needed)
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred model


# --- Define a Simple Tool Function (this variant returns a {"result": ...} dict) ---
def get_capital_city(country: str) -> Dict[str, str]:
    """Retrieves the capital city of a given country.

    Args:
        country: The name of the country (matched case-insensitively).

    Returns:
        A dict of the form ``{"result": <capital>}``; for an unknown country the
        value is ``"Capital not found for <country>"``.
    """
    # The return annotation used to say `str` although a dict is returned; it now
    # matches the value, which is what simple_after_tool_modifier reads "result" from.
    print(f"--- Tool 'get_capital_city' executing with country: {country} ---")
    country_capitals = {
        "united states": "Washington, D.C.",
        "canada": "Ottawa",
        "france": "Paris",
        "germany": "Berlin",
    }
    return {
        "result": country_capitals.get(
            country.lower(), f"Capital not found for {country}"
        )
    }


# --- Wrap the function into a Tool ---
capital_tool = FunctionTool(func=get_capital_city)


# --- Define the Callback Function ---
def simple_after_tool_modifier(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext, tool_response: Dict
) -> Optional[Dict]:
    """Annotate the ``get_capital_city`` result when it is "Washington, D.C.".

    Args:
        tool: The tool that just ran; only ``tool.name`` is read.
        args: Arguments the tool was called with (logged only).
        tool_context: Supplies the calling ``agent_name`` for logging.
        tool_response: The tool's result. Normally a ``{"result": ...}`` dict;
            a bare (non-dict) value is also tolerated and treated as the result.

    Returns:
        A new dict with an extended ``"result"`` string and
        ``"note_added_by_callback": True`` when the result is
        "Washington, D.C."; otherwise ``None`` to keep the original response.
    """
    agent_name = tool_context.agent_name
    tool_name = tool.name
    print(f"[Callback] After tool call for tool '{tool_name}' in agent '{agent_name}'")
    print(f"[Callback] Args used: {args}")
    print(f"[Callback] Original tool_response: {tool_response}")

    # The tool in this cell returns {"result": <value>}. A tool that returns a
    # bare value would previously crash on .get(), so accept both shapes.
    # NOTE(review): ADK appears to hand the tool's raw return value to this
    # callback - confirm.
    response_is_dict = isinstance(tool_response, dict)
    if response_is_dict:
        original_result_value = tool_response.get("result", "")
    else:
        original_result_value = tool_response

    # --- Modification Example ---
    # If the tool was 'get_capital_city' and result is 'Washington, D.C.'
    if tool_name == "get_capital_city" and original_result_value == "Washington, D.C.":
        print("[Callback] Detected 'Washington, D.C.'. Modifying tool response.")

        # IMPORTANT: work on a copy so the original response is never mutated.
        modified_response = copy.deepcopy(tool_response) if response_is_dict else {}
        modified_response["result"] = (
            f"{original_result_value} (Note: This is the capital of the USA)."
        )
        modified_response["note_added_by_callback"] = True  # Add extra info if needed

        print(f"[Callback] Modified tool_response: {modified_response}")
        return modified_response  # Return the modified dictionary

    print("[Callback] Passing original tool response through.")
    # Return None to use the original tool_response
    return None


# --- Setup and Run ---
async def main():
    """Run one agent against two sessions to demonstrate after_tool_callback.

    The France query is expected to pass through the callback unchanged; the
    United States query yields "Washington, D.C.", which the callback annotates.
    Only final responses that carry content are printed.
    """
    app_name = "llm_after_tool_cb_app"
    user_id = "after_tool_cb_user"
    session_id_run = "after_tool_cb_run_1"
    session_id_modify = "after_tool_cb_modify_1"

    # 1. Agent with the capital-city tool and the after-tool callback
    my_llm_agent = LlmAgent(
        name="AfterToolCallbackAgent",
        model=GEMINI_2_FLASH,
        instruction="You are an agent that finds capital cities using the get_capital_city tool. Report the result clearly.",
        description="An LLM agent demonstrating after_tool_callback",
        tools=[capital_tool],
        after_tool_callback=simple_after_tool_modifier,
    )

    # 2. Runner and one session per scenario
    session_service = InMemorySessionService()
    runner = Runner(
        agent=my_llm_agent,
        app_name=app_name,
        session_service=session_service,
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_run
    )
    session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id_modify
    )

    async def run_turn(session_id, text):
        """Send one user message and print the final response."""
        message = types.Content(role="user", parts=[types.Part(text=text)])
        async for event in runner.run_async(
            user_id=user_id, session_id=session_id, new_message=message
        ):
            # Only print final LLM response
            if event.is_final_response() and event.content:
                print(
                    f"Event Output: {event.author}: {event.content.parts[0].text.strip()}"
                )

    print("--- Running Agent (Callback passes result through - France) ---")
    await run_turn(session_id_run, "What is the capital of France?")

    print("\n--- Running Agent (Callback modifies result - United States) ---")
    await run_turn(session_id_modify, "What is the capital of the United States?")


await main()

: 

### LLM Agent with Guardrail (Profanity Checker with before_model_callback)

- Skip the model call when a bad word is detected in the request.

In [7]:
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.llm_agent import AfterModelCallback, BeforeModelCallback
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.agents.llm_agent import LlmAgent
from typing import Any, List, Optional
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.agents import LlmAgent


def profanity_guardrail(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Block the model call when any text part of the request contains a bad word.

    Every content entry in ``llm_request.contents`` is scanned, not only the
    latest one. Matching is a case-insensitive substring test.

    Args:
        callback_context: Its ``state`` gets ``profanity_trigger = True`` when a
            bad word is found.
        llm_request: The outgoing request whose contents are scanned.

    Returns:
        An ``LlmResponse`` saying "No bad word allowed." when a bad word is
        found (returned in place of the model call), otherwise ``None``.
    """
    profanity_list: List[str] = ["badword1", "badword2", "badword3"]
    for content in llm_request.contents or []:
        # A content entry may have parts=None; treat it as empty rather than
        # failing with "NoneType is not iterable".
        for part in content.parts or []:
            if not part.text:
                continue  # non-text parts cannot contain profanity
            lowered_text = part.text.lower()
            if any(profanity in lowered_text for profanity in profanity_list):
                callback_context.state["profanity_trigger"] = True
                return LlmResponse(
                    content=types.Content(
                        role="model",
                        parts=[types.Part(text=("No bad word allowed."))],
                    )
                )
    return None


# Tool
def get_weather(city: str) -> str:
    """Retrieves weather information for the given city.

    Args:
        city: The name of the city for which to retrieve weather information.

    Returns:
        A string containing the weather information for the specified city,
        or a message indicating that the weather information was not found.
    """
    # Hard-coded demo data keyed by lower-cased city name.
    known_weather = {
        "chicago": {"temperature": 25, "condition": "sunny", "sky": "clear"},
        "toronto": {"temperature": 30, "condition": "partly cloudy", "sky": "overcast"},
        "chennai": {"temperature": 15, "condition": "rainy", "sky": "cloudy"},
    }

    report = known_weather.get(city.lower())
    if report is None:
        return f"Weather information for {city} not found."

    # The message echoes the caller's spelling of the city, not the lookup key.
    return (
        f"Weather in {city} is {report['temperature']} degrees Celsius, "
        f"{report['condition']} with a {report['sky']} sky."
    )


async def run_query(query: str):
    """Run one query through a guarded weather agent and print the outcome.

    Builds a weather agent that uses `get_weather` as its tool and
    `profanity_guardrail` as its before-model callback, creates a fresh
    in-memory session, sends `query`, and prints the final response plus any
    `profanity_trigger` entry found in an event's state delta.

    Args:
        query: The user message to send to the agent.
    """
    weather_agent = LlmAgent(
        model=GEMINI_2_FLASH,
        name="weather_agent",
        instruction="""You are a Weather Information Agent. Your task is to provide weather information for a given city.

        When a user provides a prompt, extract the city name.
        Then, use the `get_weather` tool to retrieve the weather information for that city.
        Finally, present the weather information to the user in a clear and concise manner.
        If the user asks for a greeting, transfer to the greeting agent.""",
        description="""You are an agent who can fetch weather information for a city.
        You have access to the `get_weather` tool to accomplish this task.""",
        tools=[get_weather],
        before_model_callback=profanity_guardrail,
    )

    # A new service and session per call keeps each query independent.
    session_service = InMemorySessionService()
    session_service.create_session(
        app_name="weather_app", user_id="12345", session_id="123344"
    )
    runner = Runner(
        agent=weather_agent,
        app_name="weather_app",
        session_service=session_service,
    )

    content = types.Content(role="user", parts=[types.Part(text=query)])
    async for event in runner.run_async(
        user_id="12345", session_id="123344", new_message=content
    ):
        if event.is_final_response():
            # A final event may carry no content or no parts; avoid indexing
            # into it blindly.
            if event.content and event.content.parts:
                final_response = event.content.parts[0].text
            else:
                final_response = "[No text response]"
            print(f"Query: {query}")
            print(f"Agent Response: {final_response}")
            print("-" * 20)
        if "profanity_trigger" in event.actions.state_delta:
            print(
                f"Profanity Triggered: {event.actions.state_delta['profanity_trigger']}"
            )


: 

In [None]:
# Clean query (no banned word): the guardrail is expected to return None so the model answers.
await run_query("What is the weather in Chicago?")

: 

In [None]:
# Contains "badword1" from the guardrail's word list, so the fixed refusal is expected instead of a model answer.
await run_query("what the badword1 is the weather in Chicago?")

: 

### LlmAgent with All Callbacks

In [None]:
import asyncio
import warnings
import os
from typing import Any, Optional, Dict, List, AsyncGenerator

# --- ADK Imports ---
from google.adk.agents import Agent, LlmAgent, BaseAgent  # Using LlmAgent directly
from google.adk.sessions import InMemorySessionService, Session, State
from google.adk.runners import Runner
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.function_tool import FunctionTool
from google.adk.events import Event
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.callback_context import CallbackContext
from google.genai import types

# Suppress specific UserWarning from google.generativeai if necessary
warnings.filterwarnings(
    "ignore", category=UserWarning, module="google.generativeai.types.content_types"
)

# --- Constants ---
# NOTE: APP_NAME, USER_ID, SESSION_ID, AGENT_NAME and GEMINI_2_FLASH are also
# assigned by other cells in this notebook. Re-run this cell before the
# support-agent code below so these are the values in effect.
APP_NAME = "support_ticket_app"
USER_ID = "customer_123"
# log_before_agent derives the simulated ticket ID from the last "_"-separated
# piece of this value.
SESSION_ID = "ticket_session_abc"
AGENT_NAME = "support_agent"
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # Or your preferred Gemini model


# --- Simulated Knowledge Base Tool ---
def kb_search(keywords: List[str]) -> Dict[str, Any]:
    """
    Looks up troubleshooting steps in a mock knowledge base.

    Args:
        keywords: Keywords describing the issue (e.g., ['screen', 'flickering']).
            Matching is case-insensitive and exact per keyword.

    Returns:
        A dictionary with a single "solutions" key. Its value is the list of
        matching steps (duplicates removed, first occurrence kept), or a
        one-item list with a "nothing found" message when no keyword matches.
    """
    print(f"      [Tool Executing: kb_search with keywords: {keywords}]")
    # Simple mock implementation
    articles = {
        "screen": [
            "Check the display cable connection.",
            "Try a different monitor port.",
        ],
        "flickering": [
            "Update the graphics driver.",
            "Adjust the screen refresh rate.",
        ],
        "display": ["Ensure monitor power is on.", "Reboot the computer."],
        "keyboard": ["Check battery if wireless.", "Try a different USB port."],
    }

    # Collect steps in keyword order; unknown keywords contribute nothing.
    matched_steps = [
        step for keyword in keywords for step in articles.get(keyword.lower(), [])
    ]

    if not matched_steps:
        print("      [Tool Result: No relevant KB articles found]")
        return {
            "solutions": [
                "No specific troubleshooting steps found in KB for these keywords."
            ]
        }

    # dict.fromkeys drops repeats while keeping first-seen order.
    deduped_steps = list(dict.fromkeys(matched_steps))
    print(f"      [Tool Result: Found {len(deduped_steps)} steps]")
    return {"solutions": deduped_steps}


# Wrap the function into a FunctionTool; this object is what gets passed in
# the support agent's `tools` list.
kb_search_tool = FunctionTool(func=kb_search)


# --- Callback Implementations ---


def log_before_agent(callback_context: CallbackContext) -> Optional[types.Content]:
    """Log the start of an agent run and record a simulated ticket ID in state.

    The ticket ID is built from the module-level ``SESSION_ID`` (its last
    ``_``-separated piece, upper-cased) and stored under ``state["ticket_id"]``.

    Returns:
        Always ``None``, so the agent run proceeds normally.
    """
    session_suffix = SESSION_ID.rsplit("_", 1)[-1].upper()
    ticket_id = "TICKET-" + session_suffix  # Simulate getting ticket ID

    print("\n[Callback Triggered: before_agent_callback]")
    print(f"  -> Processing Ticket ID: {ticket_id}")

    # Make the ticket ID available to later callbacks via session state.
    callback_context.state["ticket_id"] = ticket_id
    print(f"  -> Agent '{callback_context.agent_name}' starting.")
    return None


def log_after_agent(callback_context: CallbackContext) -> Optional[types.Content]:
    """Log the end of an agent run and mark processing as completed in state.

    Reads ``state["ticket_id"]`` (falling back to ``"UNKNOWN"``), prints a
    simulated external status update, and sets
    ``state["processing_status"]`` to ``"completed"``.

    Returns:
        Always ``None``; no extra content is appended to the agent's output.
    """
    ticket = callback_context.state.get("ticket_id", "UNKNOWN")

    print("\n[Callback Triggered: after_agent_callback]")
    print(f"  -> Finished processing for Ticket ID: {ticket}.")
    # Stand-in for a call to an external ticketing system.
    status_message = (
        f"  -> Updating external system: Ticket {ticket} status set to 'Responded'."
    )
    print(status_message)

    callback_context.state["processing_status"] = "completed"
    return None


def log_before_model(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Log that a request is about to be sent to the LLM.

    This callback only prints; it does not inspect or modify ``llm_request``.
    Real code could log request details here (mind PII) or add instructions
    to the request before it is sent.

    Returns:
        Always ``None``, so the LLM call proceeds.
    """
    print("\n[Callback Triggered: before_model_callback]")
    agent_label = callback_context.agent_name
    print(f"  -> Preparing to call LLM for agent '{agent_label}'.")
    # No real safety check happens here; the message is illustrative only.
    print("  -> Safety check/prompt augmentation applied (simulated).")
    return None


def check_after_model(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Log the LLM response and run a simulated PII keyword check on its text.

    Only the text of the first part is inspected. When the response has no
    content, no parts, or a first part without text, the placeholder
    ``"[No Text]"`` is used instead.

    Returns:
        Always ``None``, so the original LLM response is used unchanged.
    """
    print(f"\n[Callback Triggered: after_model_callback]")
    print(f"  -> Received LLM response for agent '{callback_context.agent_name}'.")
    # Example: Log response details (be careful with PII)
    content = llm_response.content
    first_text = content.parts[0].text if content and content.parts else None
    # The first part's text can be None for non-text parts (e.g. a tool call);
    # slicing or lower-casing None would raise, so fall back to the placeholder.
    response_text = first_text if first_text is not None else "[No Text]"
    print(f"  -> LLM Raw Response (brief): {response_text[:100]}...")
    # Example: Simulate PII check
    lowered_text = response_text.lower()
    if "password" in lowered_text or "credit card" in lowered_text:
        print("  -> !! PII potentially detected in LLM response (simulated) !!")
        # Could modify response here, e.g., return an error or redacted text
        # return LlmResponse(content=types.Content(parts=[types.Part(text="[Response redacted due to potential PII]")]))
    else:
        print("  -> PII check passed (simulated).")
    return (
        None  # Return None to use the original (or potentially modified) LLM response
    )


def validate_before_tool(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """Log an upcoming tool call and validate `kb_search` keyword arguments.

    Validation applies only when the tool is named ``kb_search`` and the
    arguments contain ``keywords``: the value must be a non-empty list.

    Returns:
        ``{"error": ...}`` when validation fails, otherwise ``None`` so the
        tool executes.
    """
    print(f"\n[Callback Triggered: before_tool_callback for Tool: '{tool.name}']")
    print(f"  -> Attempting to call tool '{tool.name}' with args: {args}")

    if tool.name == "kb_search" and "keywords" in args:
        keywords = args["keywords"]
        if isinstance(keywords, list) and keywords:
            print("  -> Keyword validation passed.")
        else:
            print(
                "  -> !! Validation Failed: Keywords must be a non-empty list. Skipping tool call. !!"
            )
            # This dict is returned to the LLM in place of a tool result.
            return {"error": "Invalid keywords provided for KB search."}

    return None


def log_after_tool(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext, tool_response: Dict
) -> Optional[Dict]:
    """Log a finished tool call and a truncated view of its response.

    Nothing is cached or modified here; the "caching" message is illustrative.
    Real code could alter ``tool_response`` (for example, append an extra
    suggestion to its "solutions" list) before it goes back to the LLM.

    Returns:
        Always ``None``, so the tool's own response is used.
    """
    tool_name = tool.name
    print(f"\n[Callback Triggered: after_tool_callback for Tool: '{tool_name}']")
    print(f"  -> Tool '{tool_name}' executed with args: {args}")

    # Keep the log short: at most the first 200 characters of the response.
    response_preview = str(tool_response)[:200]
    print(f"  -> Received tool response (brief): {response_preview}...")
    print("  -> Caching tool result (simulated).")
    return None


# --- Agent Definition ---
# IT support agent wired with one tool (kb_search_tool) and all six callback
# hooks defined above, so each stage of a run prints a trace line.
support_agent = LlmAgent(
    model=GEMINI_2_FLASH,
    name=AGENT_NAME,
    instruction="""You are an IT Support Agent. Your goal is to help users troubleshoot technical issues.
1. Analyze the user's problem description (support ticket).
2. Identify keywords related to the issue.
3. If the issue relates to common hardware problems like 'screen', 'display', 'flickering', 'keyboard', use the `kb_search` tool with the identified keywords to find troubleshooting steps.
4. Based on your analysis and any results from the `kb_search` tool, provide a clear, step-by-step response to the user.
5. If the `kb_search` tool doesn't return useful information, state that and provide general troubleshooting advice (e.g., restart, check connections).
""",
    description="First-level IT support agent that analyzes issues and uses a knowledge base.",
    tools=[kb_search_tool],
    # --- Assign Callbacks ---
    # Agent-level hooks: run around the whole agent invocation.
    before_agent_callback=log_before_agent,
    after_agent_callback=log_after_agent,
    # Model-level hooks: run around each LLM call.
    before_model_callback=log_before_model,
    after_model_callback=check_after_model,
    # Tool-level hooks: run around each tool call.
    before_tool_callback=validate_before_tool,
    after_tool_callback=log_after_tool,
)

# --- Session and Runner Setup ---
# NOTE: `session_service`, `session` and `runner` are notebook-wide names that
# other cells also assign; call_agent_and_show_flow below uses whichever
# objects were bound last.
session_service = InMemorySessionService()
# Ensure session is created before running
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=support_agent, app_name=APP_NAME, session_service=session_service)


# --- Agent Interaction Logic ---
async def call_agent_and_show_flow(query):
    """Send a support ticket to the agent and print a trace of the run.

    Uses the module-level ``runner``, ``session_service``, ``USER_ID``,
    ``SESSION_ID`` and ``APP_NAME``. While events stream in, tool calls, tool
    results and the final response are printed; afterwards the session is
    re-read to show the ``ticket_id`` and ``processing_status`` values the
    callbacks wrote to state.

    Args:
        query: The ticket text submitted by the user.

    Returns:
        The final response text, ``"[Non-text final response]"`` when the
        final event's first part has no text, or a default message when no
        final response with content was seen.
    """
    print(f"\n--- Starting Workflow for Query: '{query}' ---")
    print(f"\n[User Submits Ticket: '{query}']")
    user_message = types.Content(role="user", parts=[types.Part(text=query)])
    final_response_text = "[Agent did not produce a final response text]"

    event_stream = runner.run_async(
        user_id=USER_ID, session_id=SESSION_ID, new_message=user_message
    )
    async for event in event_stream:
        # The before/after tool callbacks fire inside the runner; here we only
        # report what the event stream shows.
        if event.get_function_calls():
            requested_tool = event.get_function_calls()[0].name
            print(f"\n[AFW: LLM decided to use Tool '{requested_tool}']")
        elif event.get_function_responses():
            answered_tool = event.get_function_responses()[0].name
            print(f"\n[AFW: Received result from Tool '{answered_tool}']")
            print(
                "  -> AFW: Sending tool result back to LLM for final response generation..."
            )

        has_final_content = (
            event.is_final_response() and event.content and event.content.parts
        )
        if not has_final_content:
            continue

        # Only the first part is inspected for text.
        first_part_text = event.content.parts[0].text
        if first_part_text:
            final_response_text = first_part_text
            print("\n[AFW: Sending Final Response to User]")
            print(f"  -> Response: {final_response_text}")
        else:
            print(
                f"\n[AFW: Final Response - Non-text content received: {event.content.parts}]"
            )
            final_response_text = "[Non-text final response]"

    print(f"\n--- Workflow Finished for Query: '{query}' ---")
    # Re-read the session to show the state written by the callbacks.
    final_session = session_service.get_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    stored_ticket = final_session.state.get("ticket_id")
    stored_status = final_session.state.get("processing_status")
    print(
        f"Final Session State (example): ticket_id='{stored_ticket}', status='{stored_status}'"
    )
    return final_response_text


# Mentions "screen" and "flickering", both keys in kb_search's mock knowledge base.
# As the last expression in the cell, the returned response text is also displayed.
await call_agent_and_show_flow("Help! My computer screen keeps flickering constantly.")

: 

###  Quickstart session, states and SessionService

- How to retrieve session state

In [None]:
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types

# --- Constants ---
# NOTE: these rebind APP_NAME / USER_ID / SESSION_ID used by earlier cells.
APP_NAME = "capital_finder_app"
USER_ID = "quickstart_user"
SESSION_ID = "session_abc"
MODEL = "gemini-2.0-flash-001"

# Agent
# No tools: the model answers directly, and output_key stores that answer in session state.
capital_agent = LlmAgent(
    model=MODEL,
    name="CapitalFinderAgent",
    instruction="""You are an agent that finds the capital of a given country.
    When asked for the capital, respond *only* with the name of the capital city.
    """,
    output_key="capital_city",  # Save the agent's final response text to state['capital_city']
)

# Session and Runner
# The session is created without initial state, so it starts out empty.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=capital_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send `query` to the module-level runner and print each final response.

    Uses the synchronous ``runner.run`` with the module-level ``USER_ID`` and
    ``SESSION_ID``.

    Args:
        query: The user message text.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # A final event may carry no content or no parts; avoid indexing into
        # it blindly.
        if event.content and event.content.parts:
            final_response = event.content.parts[0].text
        else:
            final_response = "[No text response]"
        print("\nAgent Response: ", final_response)


# Snapshot the state before the agent runs.
initial_session = session_service.get_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
print(f"Initial Session State: {initial_session.state}")  # Should be empty {}

call_agent("What is the capital of france?")

# Fetch the session again to see what the run wrote via output_key.
final_session = session_service.get_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
print(
    f"Final Session State: {final_session.state}"
)  # Expected to contain a 'capital_city' entry (e.g. 'Paris'); exact text depends on the model

: 

### Session State - State Manipulation

- How to manually modify session state.

In [16]:
from google.adk.agents import LlmAgent
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event, EventActions

# --- Constants ---
# No SESSION_ID here: the session ID is generated by the service in a later
# cell and accessed as `session.id`.
APP_NAME = "task_manager_app"
USER_ID = "test_user"
AGENT_NAME = "task_manager_agent"
MODEL_NAME = "gemini-2.0-flash-001"  # Or any suitable model

# --- Agent Definition ---
#  Simplified instruction, as we're handling logic directly
#  (the task helpers below change state themselves; the agent has no tools).
task_agent = LlmAgent(
    model=MODEL_NAME,
    name=AGENT_NAME,
    instruction="""You are a Task Management Agent. Respond to user requests to manage tasks.
    """,
)


# --- Helper Functions ---


def add_task(session, task_description):
    """Adds a pending task to the ``user:tasks`` list in session state.

    The change is applied by appending an event with a ``state_delta`` through
    the module-level ``session_service``.

    Args:
        session: Session whose state holds the task list.
        task_description: Free-text description of the task.

    Returns:
        A confirmation message containing the description and the new task ID.
    """
    existing_tasks = session.state.get("user:tasks", [])
    # Derive the next ID from the highest existing ID rather than the list
    # length, so an ID is never reused after a task has been deleted.
    new_task_id = max((task["id"] for task in existing_tasks), default=0) + 1
    new_task = {"id": new_task_id, "description": task_description, "status": "pending"}
    # Build a new list instead of appending in place, so the state only
    # changes through the state_delta below.
    tasks = [*existing_tasks, new_task]
    # Use EventActions to update the state (delta update)
    add_event = Event(
        author="agent", actions=EventActions(state_delta={"user:tasks": tasks})
    )
    session_service.append_event(session, add_event)
    return f"Task '{task_description}' added with ID {new_task_id}."


def modify_task(session, task_id, new_status):
    """Sets the status of the first task whose ID matches ``task_id``.

    The change is applied by appending an event with a ``state_delta`` through
    the module-level ``session_service``.

    Args:
        session: Session whose state holds the ``user:tasks`` list.
        task_id: Task ID as an int or a numeric string.
        new_status: Status value to store on the task.

    Returns:
        A message saying the task was updated, was not found, or that the ID
        is not a number.
    """
    try:
        task_id = int(task_id)  # Ensure task_id is an integer
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None.
        return "Invalid task ID. Please provide a number."

    tasks = session.state.get("user:tasks", [])
    for index, task in enumerate(tasks):
        if task["id"] != task_id:
            continue
        # Copy the list and the matched task so the state only changes
        # through the state_delta below.
        updated_tasks = list(tasks)
        updated_tasks[index] = {**task, "status": new_status}
        modify_event = Event(
            author="agent",
            actions=EventActions(state_delta={"user:tasks": updated_tasks}),
        )
        session_service.append_event(session, modify_event)
        return f"Task {task_id} status updated to '{new_status}'."
    return f"Task with ID {task_id} not found."


def delete_task(session, task_id):
    """Removes every task whose ID matches ``task_id`` from ``user:tasks``.

    The change is applied by appending an event with a ``state_delta`` through
    the module-level ``session_service``.

    Args:
        session: Session whose state holds the ``user:tasks`` list.
        task_id: Task ID as an int or a numeric string.

    Returns:
        A message saying the task was deleted, was not found, or that the ID
        is not a number.
    """
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None.
        return "Invalid task ID.  Please provide a number."

    tasks = session.state.get("user:tasks", [])
    remaining_tasks = [task for task in tasks if task["id"] != task_id]
    if len(remaining_tasks) == len(tasks):
        # Nothing was filtered out, so no task had this ID.
        return f"Task with ID {task_id} not found."

    # Update state via EventActions
    delete_event = Event(
        author="agent",
        actions=EventActions(state_delta={"user:tasks": remaining_tasks}),
    )
    session_service.append_event(session, delete_event)
    return f"Task {task_id} deleted."


def list_tasks(session):
    """Formats the tasks stored under ``user:tasks`` as a readable listing.

    Args:
        session: Object whose ``state`` mapping may hold a ``user:tasks`` list.

    Returns:
        ``"You have no tasks."`` when the list is missing or empty, otherwise
        a header line followed by one ``id: description (status)`` line per
        task.
    """
    tasks = session.state.get("user:tasks", [])
    if not tasks:
        return "You have no tasks."

    listing = ["Your tasks:"]
    for task in tasks:
        listing.append(f"{task['id']}: {task['description']} ({task['status']})")
    return "\n".join(listing)


def call_agent(user_input, session):
    """Sends user input to the agent and returns the first model reply.

    Uses the module-level ``runner`` and ``USER_ID``; the session is
    identified by ``session.id``.

    Args:
        user_input: The user message text.
        session: Session to run the message in.

    Returns:
        The text of the first part of the first event whose content role is
        ``"model"`` (this can be ``None`` if that part has no text), or an
        empty string when no such event is produced.
    """
    message = types.Content(role="user", parts=[types.Part(text=user_input)])

    # session.id, not SESSION_ID
    for event in runner.run(
        user_id=USER_ID, session_id=session.id, new_message=message
    ):
        if event.content and event.content.role == "model":
            # Stop at the first model-authored event.
            return event.content.parts[0].text

    return ""

: 

In [None]:
# 1. Create a Session (no initial state is passed here)
#  demonstrates: create_session, InMemorySessionService
# NOTE: USER_ID is rebound here, overriding the "test_user" value set in the cell above.
USER_ID = "test_user2"
session_service = InMemorySessionService()

session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID
)  #  Let the service generate the ID

print(f"Created session with ID: {session.id}")
# The task helpers and call_agent defined above use this session_service / runner.
runner = Runner(agent=task_agent, app_name=APP_NAME, session_service=session_service)

: 

In [None]:
# Display the ID the service generated for this session.
session.id

: 

In [None]:
# Read the session back from the service before any task has been added.
retrieved_session = session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=session.id,
)
print("\nRetrieved session state:")
retrieved_session.state

: 

In [None]:
# 2. Add tasks (demonstrates state updates via events carrying a state_delta)
print("\nAdding tasks directly to state via Function...")

# Each call appends one event to `session` through add_task.
add_task(session, "Buy milk")
add_task(session, "Walk the dog")
add_task(session, "Prepare presentation")
add_task(session, "Buy groceries")

: 

In [None]:
# 3. Retrieve the session again and display its state (demonstrates get_session)
retrieved_session = session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=session.id,
)
print("\nRetrieved session state:")
retrieved_session.state

: 

In [None]:
# 4. List tasks (demonstrates accessing state)
print("\nListing tasks...")
# list_tasks reads "user:tasks" from the state of the session retrieved in the previous cell.
print(list_tasks(retrieved_session))  # Using a helper, accessing state directly

: 

In [None]:
# 5. Mark task 2 as completed, then re-read the session to show the result
#    (demonstrates state modification)
print("\nModifying task 2...")
print(modify_task(retrieved_session, "2", "completed"))
retrieved_session = session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=session.id,
)
print("Modified session state:")
retrieved_session.state

: 

In [None]:
# 6. Delete a task (demonstrates state deletion)
print("\nDeleting task 1...")
print(delete_task(retrieved_session, "1"))
# Re-fetch so the next cell displays the state as stored by the service.
retrieved_session = session_service.get_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=session.id
)

: 

In [None]:
# Show the state of the session re-fetched after the deletion.
print("Session state after deletion:")
retrieved_session.state

: 

In [None]:
# 7. List sessions for the user (demonstrates list_sessions)
print(f"\nSessions for user {USER_ID}:")
# Last expression in the cell, so the list_sessions result is displayed.
session_service.list_sessions(app_name=APP_NAME, user_id=USER_ID)

: 

In [None]:
# 8. Delete a session (demonstrates delete_session).
session_service.delete_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=session.id
)
print(f"\nDeleted session: {session.id}")
# Try to fetch the deleted session; the next cell checks whether anything came back.
retrieved_session = session_service.get_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=session.id
)

: 

In [None]:
# A falsy result from the lookup above is treated as confirmation of the deletion.
if retrieved_session:  # Should be None.
    print(retrieved_session)
else:
    print("Session deleted successfully, as could not be retrieved.")

: 

### Session State - delta_states (OPTIONAL)

In [None]:
from google.adk.sessions import InMemorySessionService


# Create the session service
session_service = InMemorySessionService()

# Create a session (with initial state)
session = session_service.create_session(
    app_name="my_app",
    user_id="user1",
    state={"order_status": "pending", "items": ["shirt"], "notes": "Initial order"},
)

# --- Direct State Manipulation (Generally NOT Recommended) ---

# 1. Retrieve the session (to get the current state)
retrieved_session = session_service.get_session(
    app_name="my_app", user_id="user1", session_id=session.id
)

# 2. Access the state dictionary directly
current_state = retrieved_session.state

# --- Simulating a Delta Update (Directly) ---

# Directly modify the state dictionary as if applying a state_delta:
current_state["order_status"] = "shipped"  # Update existing key
current_state["tracking_number"] = "XYZ456"  # Add a new key
current_state["items"].append("pants")  # Modify a list (append)
del current_state["notes"]  # Delete a key

# `current_state` is the same dict as `retrieved_session.state`, so the edits
# above show up on this object immediately.
print(retrieved_session.state)

# --- Demonstrating State Prefixes (Directly) ---
# We need to use setdefault to properly initialize nested dictionaries.
# Show the app-level and user-level stores before anything is written to them.
print("App state (before):", session_service.app_state.setdefault("my_app", {}))
print(
    "User state (before):",
    session_service.user_state.setdefault("my_app", {}).setdefault("user1", {}),
)
session_service.app_state.setdefault("my_app", {})["max_retries"] = 3  # app-level
session_service.user_state.setdefault("my_app", {}).setdefault("user1", {})[
    "pref_contact"
] = "email"
current_state["temp:request_id"] = "temp_value"

print("\nState with prefixes added:")
# The temp: key was written straight into this dict, so it IS printed here.
print(retrieved_session.state)

# NOTE(review): get_session may hand back a copy of the stored session. If so,
# none of the direct edits above (not only the temp: key) will appear after
# this re-fetch. Confirm against the installed ADK version.
retrieved_session = session_service.get_session(
    app_name="my_app", user_id="user1", session_id=session.id
)
print("\nState after retrieval (temp should be gone):")

print(retrieved_session.state)

print("\nApp State:")
print(session_service.app_state)
print("\nUser State:")
print(session_service.user_state)

: 

### Accessing Session Properties

In [None]:
from google.adk.sessions import InMemorySessionService


# Build a throwaway session with one state entry so its attributes can be inspected.
temp_service = InMemorySessionService()
example_session = temp_service.create_session(
    app_name="my_app", user_id="example_user", state={"initial_value": 1}
)

# One line per attribute: identifier, owning app, user, state dict,
# event history (empty right after creation) and last update timestamp.
property_lines = [
    f"ID (`id`):                {example_session.id}",
    f"Application Name (`app_name`): {example_session.app_name}",
    f"User ID (`user_id`):         {example_session.user_id}",
    f"State (`state`):           {example_session.state}",
    f"Events (`events`):         {example_session.events}",
    f"Last Update (`last_update_time`): {example_session.last_update_time:.2f}",
]

print("--- Examining Session Properties ---")
for property_line in property_lines:
    print(property_line)
print("---------------------------------")

: 

### Using InMemorySessionService Methods

- Adding event
- Update state with `state_delta`

In [None]:
# Example: Using InMemorySessionService Methods
from google.adk.sessions import InMemorySessionService
from google.adk.events import Event, EventActions
from google.genai import types
import time
import uuid

print("\n--- Demonstrating InMemorySessionService ---")

# 1. Instantiate
# Lower-case names are used here, so the notebook-wide APP_NAME / USER_ID are left untouched.
session_service = InMemorySessionService()
app_name, user_id = "memory_app", "user_mem"
session_id = "mem_session_1"

# 2. Create Session
current_session = session_service.create_session(
    app_name=app_name, user_id=user_id, session_id=session_id, state={"counter": 0}
)
print(f"Created Session: ID={current_session.id}, State={current_session.state}")

# 3. Append Event with State Delta
# First event: user content only, no actions.
user_event = Event(
    invocation_id="inv_1",
    author="user",
    content=types.Content(parts=[types.Part(text="Increment")]),
)
session_service.append_event(current_session, user_event)  # No state change yet

# Second event: no content, only a state_delta that sets counter to 1.
agent_event = Event(
    invocation_id="inv_2",
    author="agent",
    actions=EventActions(state_delta={"counter": 1}),  # Increment counter
)
session_service.append_event(current_session, agent_event)
print(f"Appended Event, state['counter'] should be 1")

# 4. Get Session
retrieved_session = session_service.get_session(
    app_name=app_name, user_id=user_id, session_id=session_id
)
print(f"Retrieved Session: ID={retrieved_session.id}, State={retrieved_session.state}")
print(
    f"Events in session: {len(retrieved_session.events)}"
)  # Shows 2 events were added

# 5. List Sessions
session_list = session_service.list_sessions(app_name=app_name, user_id=user_id)
print(f"List Sessions for {user_id}: {session_list}")

# 6. Delete Session
session_service.delete_session(
    app_name=app_name, user_id=user_id, session_id=session_id
)
print(f"Deleted Session: {session_id}")

# 7. Get Session (expected to come back falsy rather than raise; the check below relies on that)
deleted_session = session_service.get_session(
    app_name=app_name, user_id=user_id, session_id=session_id
)
print(
    f"Retrieve after delete: {'Session found (unexpected!)' if deleted_session else 'Session not found (correct)'}"
)
print("------------------------------------------")

: 

###  Using DatabaseSessionService Methods (with SQLite for demo)

In [None]:
# Example: Using DatabaseSessionService Methods (with SQLite for demo)

# NOTE: Requires `sqlalchemy` to be installed.
# NOTE: This creates a file 'db_sessions_demo.db' in the current directory.
from google.adk.sessions import DatabaseSessionService
from google.adk.events import Event, EventActions
from google.genai import types  # Make sure types is imported
import time
import uuid
import os  # To manage the demo database file

print("\n--- Demonstrating DatabaseSessionService (SQLite) ---")
DB_FILE = "./db_sessions_demo.db"  # Define path for the database file
DB_DIR = os.path.dirname(DB_FILE)  # Get directory path

# Ensure the directory exists (useful if DB_FILE includes subdirectories)
if DB_DIR and not os.path.exists(DB_DIR):
    os.makedirs(DB_DIR)
    print(f"Created directory: {DB_DIR}")

# Remove the database file if it exists from a previous run for a clean demo
if os.path.exists(DB_FILE):
    os.remove(DB_FILE)
    print(f"Removed existing demo DB file: {DB_FILE}")

# 1. Instantiate (using SQLite file)
# The DatabaseSessionService's __init__ method will handle DB/table creation.
db_service = DatabaseSessionService(db_url=f"sqlite:///{DB_FILE}")
print(f"Instantiated DatabaseSessionService. DB file '{DB_FILE}' ensured/created.")

# Dedicated names for this demo, so the notebook-wide APP_NAME / USER_ID /
# SESSION_ID are left untouched.
APP_DB, USER_DB = "db_app", "user_db"
SESSION_ID_DB = "db_session_1"

# 2. Create Session
session_db = db_service.create_session(
    app_name=APP_DB, user_id=USER_DB, session_id=SESSION_ID_DB, state={"status": "new"}
)
print(f"Created Session: ID={session_db.id}, State={session_db.state}")

# 3. Append Event with State Delta
# *** FIX: Ensure event has a 'content' object, even if minimal ***
event_db_1 = Event(
    invocation_id="inv_db1",
    author="agent",
    content=types.Content(
        parts=[types.Part(text="System update: Processing")]
    ),  # Add content
    actions=EventActions(state_delta={"status": "processing", "db_key": "db_val"}),
)
# Note: append_event updates the state in the DB and the passed session's last_update_time
db_service.append_event(session_db, event_db_1)
print(f"Appended Event, state should be updated in the database.")

# 4. Get Session (re-fetch from DB to see persisted changes)
# Note: Must re-fetch session to see DB changes reflected in the object state
retrieved_session_db = db_service.get_session(
    app_name=APP_DB, user_id=USER_DB, session_id=SESSION_ID_DB
)
print(
    f"Retrieved Session: ID={retrieved_session_db.id}, State={retrieved_session_db.state}"
)
# Note: Events are not automatically loaded by get_session in this implementation by default.

# 5. List Sessions
sessions_list_db = db_service.list_sessions(app_name=APP_DB, user_id=USER_DB)
print(f"List Sessions for {USER_DB}: {sessions_list_db}")

# 6. List Events (Not Implemented in DatabaseSessionService base implementation)
# Both a missing method (AttributeError) and an unimplemented one
# (NotImplementedError) are treated as expected outcomes. If the call
# succeeds, its result is discarded and nothing is printed.
try:
    db_service.list_events(app_name=APP_DB, user_id=USER_DB, session_id=SESSION_ID_DB)
except NotImplementedError as e:
    print(f"List Events: Received NotImplementedError (as expected in base class)")
except AttributeError as e:
    print(
        f"List Events: Method not found or not implemented (as expected in base class)"
    )

# 7. Delete Session
db_service.delete_session(app_name=APP_DB, user_id=USER_DB, session_id=SESSION_ID_DB)
print(f"Deleted Session: {SESSION_ID_DB}")

# 8. Get Session (expected to come back falsy rather than raise; the check below relies on that)
deleted_session_check_db = db_service.get_session(
    app_name=APP_DB, user_id=USER_DB, session_id=SESSION_ID_DB
)
print(
    f"Retrieve after delete: {'Session found (unexpected!)' if deleted_session_check_db else 'Session not found (correct)'}"
)

# Cleanup demo file (optional, good practice for demos)
# Left disabled, so 'db_sessions_demo.db' stays on disk after this cell runs.
# if os.path.exists(DB_FILE):
#     os.remove(DB_FILE)
#     print(f"Cleaned up demo DB file: {DB_FILE}")
print("--------------------------------------------")

: 

### LlmAgent with Anthropic (3rd Party Model)

In [None]:
import os
import asyncio
import warnings

# --- GCP/Vertex AI Configuration ---
# Make sure these are set in your environment OR uncomment and set here
# os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project-id"
# NOTE: this replaces the GOOGLE_CLOUD_LOCATION set in the setup cell at the
# top of the notebook. The new value stays in effect for the rest of the
# kernel session, including any earlier cells that are re-run afterwards.
os.environ["GOOGLE_CLOUD_LOCATION"] = (
    "us-east5"  # Make sure its us-east5 or europe-west1
)

# --- ADK Imports ---
from google.adk.agents import Agent, LlmAgent  # Using LlmAgent directly
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from google.adk.models.anthropic_llm import Claude
from google.adk.models.registry import LLMRegistry

# Manually register the Claude model class with the registry.
# This step is crucial if the framework doesn't do it automatically.
LLMRegistry.register(Claude)


# --- Constants ---
# NOTE: these rebind APP_NAME / USER_ID / SESSION_ID / AGENT_NAME that earlier
# cells also define; cells run afterwards see the values assigned here.
APP_NAME = "anthropic_capital_app"
USER_ID = "anthropic_user"
SESSION_ID = "anthropic_session_1"
AGENT_NAME = "claude_capital_agent"
# --- Anthropic Model Name (via Vertex AI) ---
# Use the model identifier available in your Vertex AI region.
# Example: Claude 3.7 Sonnet. Check Vertex AI documentation for available models.
ANTHROPIC_MODEL = "claude-3-7-sonnet@20250219"
# You might need to adjust this based on availability in your specific GCP project/location.
# Other possibilities: "claude-3-haiku@...", "claude-3-opus@..."

# --- Agent Definition ---
# Simplest agent: takes user input, uses the LLM directly to answer.
# NOTE: this rebinds `capital_agent`, which an earlier cell defined with a tool.
capital_agent = LlmAgent(
    model=ANTHROPIC_MODEL,  # Specify the Anthropic model identifier
    name=AGENT_NAME,
    instruction="You are a helpful assistant. When asked for the capital of a country, provide only the name of the capital city.",
    description="An agent that provides the capital city of a country using an Anthropic model.",
    # No tools needed for this simple task
    tools=[],
)

# --- Session and Runner Setup ---
session_service = InMemorySessionService()
# Ensure the session is created before running; the runner below is given the
# same APP_NAME and session service so it can look the session up by id.
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=capital_agent, app_name=APP_NAME, session_service=session_service)


# --- Agent Interaction Logic ---
# Using async version as it's preferred
async def call_agent_async(query):
    """Send one user query to the module-level ``runner`` and return the reply text.

    Args:
        query: Text of the user message.

    Returns:
        The text of the first part of the final response, an ``Agent Error: ...``
        string when an event carries an error message, an ``Execution Error: ...``
        string when the run raises, or a fallback sentence when no final
        response is produced.
    """
    print(f"\nUser Query: {query}")
    final_response_text = "Agent did not produce a final response."
    user_message = types.Content(role="user", parts=[types.Part(text=query)])

    try:
        async for event in runner.run_async(
            user_id=USER_ID, session_id=SESSION_ID, new_message=user_message
        ):
            carries_final_text = (
                event.is_final_response() and event.content and event.content.parts
            )
            if carries_final_text:
                final_response_text = event.content.parts[0].text
                print(f"Agent Response: {final_response_text}")
                # The first final response ends the interaction in this example.
                break

            # Only reached when the event was not a usable final response.
            if event.error_message:
                final_response_text = f"Agent Error: {event.error_message}"
                print(final_response_text)
                break
    except Exception as e:
        print(f"An error occurred during agent execution: {e}")
        final_response_text = f"Execution Error: {e}"

    return final_response_text


# --- Example Usage ---
async def run_example():
    """Run three capital-city queries against the Anthropic-backed agent.

    Prints a banner with the model and GCP settings, then awaits
    ``call_agent_async`` once per query, in order.
    """
    # This is only a diagnostic message, so a missing variable must not abort the
    # example with a KeyError; show a placeholder instead.
    project = os.environ.get("GOOGLE_CLOUD_PROJECT", "<unset>")
    location = os.environ.get("GOOGLE_CLOUD_LOCATION", "<unset>")

    print("--- Running Anthropic Agent Example ---")
    print(f"Using model: {ANTHROPIC_MODEL}")
    print(
        f"Make sure GCP Project '{project}' and Location '{location}' are correct and the model is available there."
    )
    print(
        "Ensure you are authenticated with GCP (e.g., `gcloud auth application-default login`)."
    )

    for query in (
        "What is the capital of France?",
        "What's the capital of Japan?",
        "Tell me the capital of Germany.",
    ):
        await call_agent_async(query)
    print("--- Example Finished ---")


await run_example()

: 

### Artifact Service

In [None]:
def before_model_callback(callback_context, llm_request):
    """Log the agent name and request contents before the model is called.

    Args:
        callback_context: Callback context; its public ``agent_name`` is read.
        llm_request: Outgoing request; its ``contents`` attribute is printed.

    Returns:
        Always ``None``.
    """
    # Use the public ``agent_name`` accessor rather than reaching into the
    # private ``_invocation_context`` attribute.
    print(
        f"Before Model Callback: Agent {callback_context.agent_name}, Request: {llm_request.contents}"
    )
    return None


def after_model_callback(callback_context, llm_response):
    """Log the agent name and response content after the model returns.

    Args:
        callback_context: Callback context; its public ``agent_name`` is read.
        llm_response: Model response; its ``content`` attribute is printed.

    Returns:
        Always ``None``.
    """
    # Use the public ``agent_name`` accessor rather than reaching into the
    # private ``_invocation_context`` attribute.
    print(
        f"After Model Callback: Agent {callback_context.agent_name}, Response: {llm_response.content}"
    )
    return None


def before_tool_callback(tool, args, tool_context):
    """Log a tool's name and call arguments before the tool runs.

    Args:
        tool: Tool about to be invoked; only its ``name`` is read.
        args: Arguments the tool will be called with.
        tool_context: Unused here.

    Returns:
        Always ``None``.
    """
    log_line = f"Before Tool Callback: Tool {tool.name}, Args: {args}"
    print(log_line)


def after_tool_callback(tool, args, tool_context, tool_response):
    """Log a tool's name and its response after the tool has run.

    Args:
        tool: Tool that was invoked; only its ``name`` is read.
        args: Unused here.
        tool_context: Unused here.
        tool_response: Value produced by the tool; printed as-is.

    Returns:
        Always ``None``.
    """
    log_line = f"After Tool Callback: Tool {tool.name}, Response: {tool_response}"
    print(log_line)


def before_agent_callback(callback_context):
    """Log the agent name before the agent starts running.

    Args:
        callback_context: Callback context; its public ``agent_name`` is read.

    Returns:
        Always ``None``.
    """
    # Use the public ``agent_name`` accessor rather than reaching into the
    # private ``_invocation_context`` attribute.
    print(f"Before Agent Callback: Agent {callback_context.agent_name}")
    return None


def after_agent_callback(callback_context):
    """Log the agent name after the agent has finished running.

    Args:
        callback_context: Callback context; its public ``agent_name`` is read.

    Returns:
        Always ``None``.
    """
    # Use the public ``agent_name`` accessor rather than reaching into the
    # private ``_invocation_context`` attribute.
    print(f"After Agent Callback: Agent {callback_context.agent_name}")
    return None


: 

In [6]:
from google.adk.runners import Runner
from google.adk.artifacts import InMemoryArtifactService
from google.adk.agents import LlmAgent
from google.adk.tools import ToolContext

async def save_dummy_report(tool_context: ToolContext) -> dict:
    """Save a dummy PDF report as an artifact.

    Args:
        tool_context: Tool context used to persist the artifact.

    Returns:
        A dict with ``status``, the artifact ``filename`` and the ``version``
        value returned by ``save_artifact``.
    """
    filename = "generated_report.pdf"
    report_artifact = types.Part.from_bytes(
        data=b"this is dummy content", mime_type="application/pdf"
    )
    print("Generating dummy pdf")
    version = await tool_context.save_artifact(filename=filename, artifact=report_artifact)
    # Surface the saved version instead of discarding it, so the caller can tell
    # which revision of the artifact was written.
    return {"status": "ok", "filename": filename, "version": version}

async def load_dummy_report(tool_context: ToolContext) -> dict:
    """Load the dummy PDF report artifact from storage.

    Args:
        tool_context: Tool context used to look the artifact up.

    Returns:
        ``{"status": "ok", "filename": ...}`` when the artifact exists and has
        inline data, otherwise ``{"status": "not_found", "filename": ...}``.
    """
    filename = "generated_report.pdf"
    report_artifact = await tool_context.load_artifact(filename=filename)

    # Report a miss as a miss: the old code returned status "ok" and replaced
    # the filename with a message, so a failed lookup was reported as success.
    if not (report_artifact and report_artifact.inline_data):
        return {"status": "not_found", "filename": filename}

    print(f"MIME Type: {report_artifact.inline_data.mime_type}")
    pdf_bytes = report_artifact.inline_data.data
    print(f"Report size: {len(pdf_bytes)} bytes.")
    return {"status": "ok", "filename": filename}

async def save_memory(tool_context: ToolContext) -> dict:
    """Store a placeholder PDF artifact named ``test.pdf``.

    Args:
        tool_context: Tool context used to persist the artifact.

    Returns:
        A dict with ``status`` set to ``"ok"`` and the saved ``filename``.
    """
    outcome = {"status": "ok", "filename": "test.pdf"}
    pdf_part = types.Part.from_bytes(
        data=b"this is dummy content", mime_type="application/pdf"
    )
    await tool_context.save_artifact(outcome["filename"], pdf_part)
    return outcome



In [7]:
from google.adk.sessions import InMemorySessionService
import uuid
from google.adk.runners import Runner

# In-memory services: one for sessions, one for artifacts. Both are handed to
# the Runner below.
session_service = InMemorySessionService()
artifact_service = InMemoryArtifactService()

# Required. Unique identifier for the application.
APP_NAME = "weather_app"
# Required. Identifier for the user interacting with the agent. This is a dynamic variable.
USER_ID = "12345"

SESSION_ID = f"session_{uuid.uuid4()}"  # Use a dynamic session ID
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)

# Your agent definition
# NOTE: GEMINI_2_FLASH is not defined in this cell; it must already exist in the
# kernel from an earlier cell.
root_agent = LlmAgent(
    name="my_agent",
    model=GEMINI_2_FLASH,
    tools=[save_dummy_report, load_dummy_report],
    instruction="You are a helpful assistant that saves a dummy PDF report as an artifact, and then read it back to user.",
)

# The artifact service is passed to the Runner explicitly.
# NOTE(review): presumably this is what backs tool_context.save_artifact /
# load_artifact in the tools above -- confirm against the ADK docs.
runner = Runner(
    agent=root_agent,
    app_name=APP_NAME,
    session_service=session_service,
    artifact_service=artifact_service,
)


def call_agent(user_query):
    """Send ``user_query`` to the module-level ``runner`` and print final replies.

    Args:
        user_query: Text of the user message.
    """
    content = types.Content(role="user", parts=[types.Part(text=user_query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        # Guard the content before indexing, as call_agent_async does elsewhere in
        # this notebook: a final event with no content or parts would otherwise
        # raise on ``.parts[0]``.
        if event.is_final_response() and event.content and event.content.parts:
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)


# call_agent("load and retrieve the dummy report")
call_agent("generating save dummy pdf")



Generating dummy pdf
Agent Response:  OK. I have saved a dummy PDF report as an artifact with filename `generated_report.pdf`. Next, please ask me to load the report.



In [8]:
artifact_service.artifacts

{'weather_app/12345/session_533d576a-ff80-4d08-b9a3-9ddc1a8503d2/generated_report.pdf': [Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=Blob(data=b'this is dummy content', mime_type='application/pdf'), text=None)]}

## Non-LLM Agents

### Multi-Agent - SequentialAgent

In [9]:
import asyncio
from uuid import uuid4
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.invocation_context import (
    InvocationContext,
    new_invocation_context_id,
)
from google.adk.events import Event
from typing_extensions import override
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.sessions.session import Session
from google.genai import types
from typing import AsyncGenerator


class AgentA(BaseAgent):
    """Demo agent that emits a "Starting" and a "Finishing" text event."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        for message in ("Agent A: Starting...", "Agent A: Finishing..."):
            yield Event(
                author="AgentA",
                content=types.Content(parts=[types.Part(text=message)]),
            )


class AgentB(BaseAgent):
    """Demo agent that emits a "Starting" and a "Finishing" text event."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        for message in ("Agent B: Starting...", "Agent B: Finishing..."):
            yield Event(
                author="AgentB",
                content=types.Content(parts=[types.Part(text=message)]),
            )


async def main():
    """Drive AgentA and AgentB through a SequentialAgent and print each event's text."""
    service = InMemorySessionService()
    demo_session: Session = service.create_session(
        app_name="test_app", user_id="test_user"
    )

    # Workflow agent wrapping the two demo agents.
    pipeline = SequentialAgent(
        name="SequentialAgent",
        sub_agents=[AgentA(name="AgentA"), AgentB(name="AgentB")],
    )

    # The agent is run directly (no Runner), so the context is built by hand.
    invocation = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=service,
        session=demo_session,
        agent=pipeline,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in pipeline.run_async(invocation):
        if not (event.content and event.content.parts):
            continue
        print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Finishing...


### Passing state between Children

In [10]:
import asyncio
from uuid import uuid4
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.invocation_context import (
    InvocationContext,
    new_invocation_context_id,
)
from google.adk.events import Event
from typing_extensions import override
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.sessions.session import Session
from google.genai import types
from typing import AsyncGenerator


class AgentA(BaseAgent):
    """Demo agent that stores a greeting in session state between two text events."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _text_event(message: str) -> Event:
            return Event(
                author="AgentA",
                content=types.Content(parts=[types.Part(text=message)]),
            )

        yield _text_event("Agent A: Starting...")
        # Written directly on the session object so a later agent can read it.
        # NOTE(review): direct mutation is not carried by any event; confirm
        # whether EventActions(state_delta=...) is preferred outside this demo.
        ctx.session.state["agent_a_value"] = "Hello from Agent A!"
        yield _text_event("Agent A: Finishing...")


class AgentB(BaseAgent):
    """Demo agent that reports the ``agent_a_value`` found in session state."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _text_event(message: str) -> Event:
            return Event(
                author="AgentB",
                content=types.Content(parts=[types.Part(text=message)]),
            )

        yield _text_event("Agent B: Starting...")
        # Looked up after the first event; None when the key was never set.
        agent_a_value = ctx.session.state.get("agent_a_value")
        yield _text_event(f"Agent B: Received value: {agent_a_value}")
        yield _text_event("Agent B: Finishing...")


async def main():
    """Run the state-passing AgentA/AgentB pair sequentially and print event text."""
    store = InMemorySessionService()
    shared_session: Session = store.create_session(
        app_name="test_app", user_id="test_user"
    )

    first = AgentA(name="AgentA")
    second = AgentB(name="AgentB")
    chain = SequentialAgent(name="SequentialAgent", sub_agents=[first, second])

    # Both sub-agents receive this context, and with it the same session object.
    run_ctx = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=store,
        session=shared_session,
        agent=chain,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in chain.run_async(run_ctx):
        has_parts = event.content and event.content.parts
        if has_parts:
            print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Received value: Hello from Agent A!
Event: AgentB: Agent B: Finishing...


### SequentialAgent with LlmAgent and Simple Runner

In [11]:
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.llm_agent import LlmAgent
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner


# Identifiers for this demo. These rebind names that earlier cells also define.
APP_NAME = "sequential_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "sequential_agent"  # not referenced by the code in this cell
GEMINI_2_FLASH = "gemini-2.0-flash-001"  # replaces the "-exp" model id set at the top of the notebook


# First sub-agent.
# NOTE(review): output_key presumably stores this agent's reply in session
# state under "agent_a" -- confirm against the ADK docs.
agent_a = LlmAgent(
    name="AgentA",
    model=GEMINI_2_FLASH,
    instruction="You are Agent A. Respond with 'Agent A: Starting...'",
    output_key="agent_a",
)

# Second sub-agent; no output_key is set for it.
agent_b = LlmAgent(
    name="AgentB",
    model=GEMINI_2_FLASH,
    instruction="You are Agent B. Respond with 'Agent B: Starting...'",
)

# Create the SequentialAgent
sequential_agent = SequentialAgent(
    name="SequentialAgent", sub_agents=[agent_a, agent_b]
)

# Session and Runner: the session is created up front and the runner is given
# the same APP_NAME and session service.
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(
    agent=sequential_agent, app_name=APP_NAME, session_service=session_service
)


# Agent Interaction
def call_agent(query):
    """Send ``query`` to the module-level ``runner`` and print final replies.

    Args:
        query: Text of the user message.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        # Guard the content before indexing, as call_agent_async does elsewhere in
        # this notebook: a final event with no content or parts would otherwise
        # raise on ``.parts[0]``.
        if event.is_final_response() and event.content and event.content.parts:
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)


call_agent("execute")

Agent Response:  Agent A: Starting...

Agent Response:  Agent B: Starting...



## Multi-Agent - LoopAgent

### LoopAgent with Simple Runner

In [12]:
from google.adk.agents.loop_agent import LoopAgent
from google.adk.agents.llm_agent import LlmAgent
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner


# Identifiers for this demo. These rebind names that earlier cells also define.
APP_NAME = "loop_app"
USER_ID = "12345"
SESSION_ID = "123344"
AGENT_NAME = "loop_agent"  # not referenced by the code in this cell
GEMINI_2_FLASH = "gemini-2.0-flash-001"

# Two LLM sub-agents, each instructed to reply with a fixed line.
agent_a = LlmAgent(
    name="AgentA",
    model=GEMINI_2_FLASH,
    instruction="You are Agent A. Respond with 'Agent A: Starting...'",
)

agent_b = LlmAgent(
    name="AgentB",
    model=GEMINI_2_FLASH,
    instruction="You are Agent B. Respond with 'Agent B: Starting...'",
)

# Create the LoopAgent. max_iterations=3 matches the three AgentA/AgentB
# response pairs in the recorded output below.
loop_agent = LoopAgent(
    name="LoopAgent", sub_agents=[agent_a, agent_b], max_iterations=3
)


# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
runner = Runner(agent=loop_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    """Send ``query`` to the module-level ``runner`` and print final replies.

    Args:
        query: Text of the user message.
    """
    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        # Guard the content before indexing, as call_agent_async does elsewhere in
        # this notebook: a final event with no content or parts would otherwise
        # raise on ``.parts[0]``.
        if event.is_final_response() and event.content and event.content.parts:
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)


call_agent("execute")

Agent Response:  Agent A: Starting...

Agent Response:  Agent B: Starting...

Agent Response:  Agent A: Acknowledged. Agent B has started.

Agent Response:  Agent B: Acknowledged.

Agent Response:  Agent A: Acknowledged. Agent B has acknowledged.

Agent Response:  Agent B: Acknowledged.



### LoopAgent with InvocationContext

In [13]:
from google.adk.agents.loop_agent import LoopAgent


class AgentA(BaseAgent):
    """Demo agent that emits a "Starting" and a "Finishing" text event per run."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        for message in ("Agent A: Starting...", "Agent A: Finishing..."):
            body = types.Content(parts=[types.Part(text=message)])
            yield Event(author="AgentA", content=body)


class AgentB(BaseAgent):
    """Demo agent that emits a "Starting" and a "Finishing" text event per run."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        for message in ("Agent B: Starting...", "Agent B: Finishing..."):
            body = types.Content(parts=[types.Part(text=message)])
            yield Event(author="AgentB", content=body)


async def main():
    """Run AgentA and AgentB inside a LoopAgent (max 2 iterations) and print events."""
    service = InMemorySessionService()
    loop_session: Session = service.create_session(
        app_name="test_app", user_id="test_user"
    )

    members = [AgentA(name="AgentA"), AgentB(name="AgentB")]
    looper = LoopAgent(name="LoopAgent", sub_agents=members, max_iterations=2)

    # The agent is run directly (no Runner), so the context is built by hand.
    invocation = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=service,
        session=loop_session,
        agent=looper,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in looper.run_async(invocation):
        if not (event.content and event.content.parts):
            continue
        print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Finishing...
Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Finishing...


### Stop Condition with `EventActions(escalate=True)`

In [14]:
from google.adk.agents.loop_agent import LoopAgent
from google.adk.events.event_actions import EventActions


class AgentA(BaseAgent):
    """Demo agent that emits two plain text events and never escalates."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _content(message: str):
            return types.Content(parts=[types.Part(text=message)])

        yield Event(author="AgentA", content=_content("Agent A: Starting..."))
        yield Event(author="AgentA", content=_content("Agent A: Finishing..."))


class AgentB(BaseAgent):
    """Demo agent whose closing event carries ``escalate=True``."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _content(message: str):
            return types.Content(parts=[types.Part(text=message)])

        yield Event(author="AgentB", content=_content("Agent B: Starting..."))
        # Only the closing event carries the escalate action.
        stop_signal = EventActions(escalate=True)
        yield Event(
            author="AgentB",
            content=_content("Agent B: Finishing..."),
            actions=stop_signal,
        )


async def main():
    """Run the escalating AgentB inside a LoopAgent (max 10 iterations) and print events."""
    store = InMemorySessionService()
    active_session: Session = store.create_session(
        app_name="test_app", user_id="test_user"
    )

    first = AgentA(name="AgentA")
    second = AgentB(name="AgentB")
    looper = LoopAgent(
        name="LoopAgent", sub_agents=[first, second], max_iterations=10
    )

    run_ctx = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=store,
        session=active_session,
        agent=looper,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in looper.run_async(run_ctx):
        has_parts = event.content and event.content.parts
        if has_parts:
            print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Finishing...


### Escalation with Condition - defined in state

In [15]:
class AgentA(BaseAgent):
    """Demo agent that emits a "Starting" and a "Finishing" text event."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        lines = ["Agent A: Starting...", "Agent A: Finishing..."]
        for line in lines:
            yield Event(
                author="AgentA",
                content=types.Content(parts=[types.Part(text=line)]),
            )


class AgentB(BaseAgent):
    """Demo agent that escalates only when session state asks for it.

    The ``escalate_agent_b`` state key (default ``False``) is read after the
    first event and passed to the closing event's ``EventActions``.
    """

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        opening = types.Content(parts=[types.Part(text="Agent B: Starting...")])
        yield Event(author="AgentB", content=opening)

        # Read after the first event has been emitted; a missing key means "do not escalate".
        should_escalate = ctx.session.state.get("escalate_agent_b", False)

        closing = types.Content(parts=[types.Part(text="Agent B: Finishing...")])
        yield Event(
            author="AgentB",
            content=closing,
            actions=EventActions(escalate=should_escalate),
        )


async def main():
    """Run the state-driven escalation demo in a LoopAgent (max 3 iterations)."""
    service = InMemorySessionService()
    loop_session: Session = service.create_session(
        app_name="test_app", user_id="test_user"
    )

    members = [AgentA(name="AgentA"), AgentB(name="AgentB")]
    looper = LoopAgent(name="LoopAgent", sub_agents=members, max_iterations=3)

    invocation = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=service,
        session=loop_session,
        agent=looper,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    # Set the flag AgentB reads before the loop starts.
    invocation.session.state["escalate_agent_b"] = True

    async for event in looper.run_async(invocation):
        if not (event.content and event.content.parts):
            continue
        print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Starting...
Event: AgentB: Agent B: Finishing...


## ParallelAgent

### Simple

In [16]:
from google.adk.agents.parallel_agent import ParallelAgent


class AgentA(BaseAgent):
    """Demo agent that emits two text events one second apart."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _text_event(message: str) -> Event:
            return Event(
                author="AgentA",
                content=types.Content(parts=[types.Part(text=message)]),
            )

        yield _text_event("Agent A: Starting...")
        await asyncio.sleep(1)  # simulated work
        yield _text_event("Agent A: Finishing...")


class AgentB(BaseAgent):
    """Demo agent that emits two text events two seconds apart."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _text_event(message: str) -> Event:
            return Event(
                author="AgentB",
                content=types.Content(parts=[types.Part(text=message)]),
            )

        yield _text_event("Agent B: Starting...")
        await asyncio.sleep(2)  # simulated work, longer than AgentA's
        yield _text_event("Agent B: Finishing...")


async def main():
    """Run AgentA and AgentB under a ParallelAgent and print each event's text."""
    store = InMemorySessionService()
    demo_session: Session = store.create_session(
        app_name="test_app", user_id="test_user"
    )

    first = AgentA(name="AgentA")
    second = AgentB(name="AgentB")
    fan_out = ParallelAgent(name="ParallelAgent", sub_agents=[first, second])

    run_ctx = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=store,
        session=demo_session,
        agent=fan_out,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in fan_out.run_async(run_ctx):
        has_parts = event.content and event.content.parts
        if has_parts:
            print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentA: Agent A: Starting...
Event: AgentB: Agent B: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Finishing...


### Shared State

In [17]:
class AgentA(BaseAgent):
    """Demo agent that writes ``shared_data`` to session state after a one-second pause."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        def _text_event(message: str) -> Event:
            return Event(
                author="AgentA",
                content=types.Content(parts=[types.Part(text=message)]),
            )

        yield _text_event("Agent A: Starting...")
        await asyncio.sleep(1)
        # Written after the pause and before the closing event.
        ctx.session.state["shared_data"] = "Data from Agent A"
        yield _text_event("Agent A: Finishing...")


class AgentB(BaseAgent):
    """Demo agent that reads ``shared_data`` from session state after a two-second pause."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        opening = types.Content(parts=[types.Part(text="Agent B: Starting...")])
        yield Event(author="AgentB", content=opening)

        await asyncio.sleep(2)
        # Falls back to a placeholder string when the key has not been written yet.
        shared_data = ctx.session.state.get("shared_data", "No data from Agent A")

        closing_part = types.Part(text=f"Agent B: Finishing... Received: {shared_data}")
        yield Event(author="AgentB", content=types.Content(parts=[closing_part]))


async def main():
    """Run the shared-state AgentA/AgentB pair under a ParallelAgent and print events."""
    service = InMemorySessionService()
    shared_session: Session = service.create_session(
        app_name="test_app", user_id="test_user"
    )

    members = [AgentA(name="AgentA"), AgentB(name="AgentB")]
    fan_out = ParallelAgent(name="ParallelAgent", sub_agents=members)

    # One session is placed in the context that both sub-agents are run with.
    invocation = InvocationContext(
        invocation_id=new_invocation_context_id(),
        session_service=service,
        session=shared_session,
        agent=fan_out,
        user_content=types.Content(parts=[types.Part(text="execute")]),
    )

    async for event in fan_out.run_async(invocation):
        if not (event.content and event.content.parts):
            continue
        print(f"Event: {event.author}: {event.content.parts[0].text}")


await main()

Event: AgentB: Agent B: Starting...
Event: AgentA: Agent A: Starting...
Event: AgentA: Agent A: Finishing...
Event: AgentB: Agent B: Finishing... Received: Data from Agent A


## CustomAgent

In [18]:
class CustomAgent(BaseAgent):
    """Minimal custom agent: emits one fixed greeting event per invocation."""

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        greeting = types.Part.from_text(text="Hello from Custom Agent!")
        # The event is tagged with the current invocation id and this agent's name.
        yield Event(
            invocation_id=ctx.invocation_id,
            author=self.name,
            content=types.Content(parts=[greeting]),
        )


async def main():
    """Run ``CustomAgent`` through a ``Runner`` and print the text of each event.

    Creates an in-memory session, wires the agent into a runner for the same
    app name, sends a single user message, and prints every event that has
    content parts.
    """
    # Create a custom agent instance
    custom_agent = CustomAgent(name="CustomAgent", description="A custom agent")

    # Create a session service
    session_service = InMemorySessionService()

    # Create a session
    session = session_service.create_session(
        app_name="demo_app", user_id="test_user", session_id="test_session"
    )

    # Create a runner instance
    runner = Runner(
        app_name="demo_app",
        agent=custom_agent,
        session_service=session_service,
    )

    # Run the agent
    async for event in runner.run_async(
        user_id="test_user",
        session_id="test_session",
        new_message=types.Content(
            parts=[types.Part.from_text(text="Hi, how are you?")]
        ),
    ):
        # Same guard as the other demos in this notebook: skip events without
        # content parts instead of failing on ``.parts[0]``.
        if event.content and event.content.parts:
            print(f"Event: {event.content.parts[0].text}")


await main()

Event: Hello from Custom Agent!
