<a href="https://colab.research.google.com/github/postak/colazione-con-adk/blob/main/2025_09_Partners_ADK_Learning_session_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

```
Copyright 2025 Google LLC.
SPDX-License-Identifier: Apache-2.0
```

In [None]:
#@title Third Session
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 🚀 Welcome to the Third Session of our Journey! 🚀

Welcome back! This notebook dives into the heart of the Google Agent Development Kit (ADK): orchestrating teams of specialized agents to tackle complex, multi-step problems that a single agent cannot handle alone.

By the end of this session, you will have hands-on experience with these advanced agentic workflows:

- **SequentialAgent**: You'll learn to chain agents together, creating pipelines where the output of one agent becomes the input for the next.

- **LoopAgent**: You'll build iterative systems where agents can plan, critique, and refine their work until a specific goal is met, making them "perfectionists."

- **ParallelAgent**: You'll unleash efficiency by running multiple agents simultaneously and then synthesizing their collective findings into a single, comprehensive answer.

- **The Router**: You will construct a "master" router agent that intelligently analyzes a user's request and delegates it to the correct agent or workflow.

Let's get this adventure started!


-------------
### 🎁 🛑 Important Prerequisite: Setup Your Environment! 🛑 🎁
-----------------------------------------------------------------------------

You will need a **Google AI API Key** to run this notebook.

👉 Follow the instructions [here](https://github.com/postak/colazione-con-adk/blob/main/Setting%20Up%20Your%20GCP%20Project%20%26%20Gemini%20API%20Key.pdf)


 -----------------------------------------------------------------------------

## Part 0: Setup & Authentication 🔑

First things first, let's get all our tools ready. This step installs the necessary libraries and securely configures your Google API key so your agents can access the power of Gemini.

In [None]:
!pip install google-adk google-generativeai -q

# --- Import all necessary libraries for our entire adventure ---
import os
import re
import asyncio
from IPython.display import display, Markdown
import google.generativeai as genai
from google.adk.agents import Agent, SequentialAgent, LoopAgent, ParallelAgent
from google.adk.tools import google_search, ToolContext
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.genai.types import Content, Part
from getpass import getpass

print("✅ All libraries are ready to go!")

In [None]:
# --- Securely Configure Your API Key ---

# Get your API key here 👉 https://codelabs.developers.google.com/onramp/instructions#0
# Prompt for the API key without echoing it to the screen
api_key = getpass('Enter your Google API Key: ')

# Configure the generative AI library with the provided key
genai.configure(api_key=api_key)

# Set the API key as an environment variable for ADK to use
os.environ['GOOGLE_API_KEY'] = api_key

print("✅ API Key configured successfully! Let the fun begin.")

In [None]:
# --- A Helper Function to Run Our Agents ---
# We'll use this function throughout the notebook to make running queries easy.

async def run_agent_query(agent: Agent, query: str, session: Session, user_id: str, is_router: bool = False):
    """Initializes a runner and executes a query for a given agent and session."""
    print(f"\n🚀 Running query for agent: '{agent.name}' in session: '{session.id}'...")

    runner = Runner(
        agent=agent,
        session_service=session_service,
        app_name=agent.name
    )

    final_response = ""
    try:
        async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=Content(parts=[Part(text=query)], role="user")
        ):
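            # Not every event carries text (tool calls, for example), so read it defensively.
            parts = event.content.parts if event.content and event.content.parts else []
            text = parts[0].text if parts and parts[0].text else ""
            if not is_router:
                # Let's see what the agent is thinking!
                print(f"==> EVENT: {str(event)[:300]} \n...\n...\n<output truncated>\n\n")
                print(f"==> EVENT content: {text}\n\n")
            if event.is_final_response() and text:
                final_response = text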
    except Exception as e:
        final_response = f"An error occurred: {e}"

    if not is_router:
        print("\n" + "-"*50)
        print(f"✅ Final Response ({agent.name}):")
        display(Markdown(final_response))
        print("-"*50 + "\n")

    return final_response

# --- Initialize our Session Service ---
# This one service will manage all the different sessions in our notebook.
session_service = InMemorySessionService()
my_user_id = "adk_adventurer_001"
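Not every event in the stream carries a text part: tool calls and some intermediate events have none. Below is a minimal sketch of a defensive text reader. It uses `SimpleNamespace` objects as hypothetical stand-ins for ADK events (the real event class has more fields), and the helper name `event_text` is an assumption made for illustration.

```python
from types import SimpleNamespace
from typing import Any

def event_text(event: Any) -> str:
    """Join the text of all parts in an event, skipping parts that carry no text."""
    content = getattr(event, "content", None)
    parts = getattr(content, "parts", None) or []
    return "".join(getattr(part, "text", None) or "" for part in parts)

# Hypothetical stand-ins for three kinds of events.
text_event = SimpleNamespace(content=SimpleNamespace(parts=[SimpleNamespace(text="Jin Sho")]))
tool_call_event = SimpleNamespace(content=SimpleNamespace(parts=[SimpleNamespace(text=None)]))
empty_event = SimpleNamespace(content=None)

print(repr(event_text(text_event)))
print(repr(event_text(tool_call_event)))
print(repr(event_text(empty_event)))
```

Returning an empty string instead of raising keeps a streaming loop alive when an event has nothing to display.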

---
## Part 1: Sequential Workflows 🧠→🤖→🤖

Some tasks are too complex for one agent. A user might ask, "Find me a great restaurant and then tell me how to get there." This requires two different skills: food recommendation and navigation.


---
### 1.1 Manual Sequential Workflows

```
+------------------+    +-------------------------+
| foodie_agent 🍣  |    | transportation_agent 🚗 |
| Finds place      | -->| Uses {destination}      |
| Output: 'Jin Sho'|    | Output: Directions      |
+------------------+    +-------------------------+
```

In [None]:
# --- Agent Definitions for our Specialist Team ---

foodie_agent = Agent(
    name="foodie_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="You are an expert food critic. Your goal is to find the absolute best food, restaurants, or culinary experiences based on a user's request. When you recommend a place, state its name clearly. For example: 'The best sushi is at **Jin Sho**.'"
)


transportation_agent = Agent(
    name="transportation_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="You are a navigation assistant. Given a starting point and a destination, provide clear directions on how to get from the start to the end."
)



In [None]:
# The "Hard Way": chain the two agents manually with plain Python
async def run_manual_app():
    query = "Find me the best sushi in Palo Alto and then tell me how to get there from the Caltrain station."

    # 1. Run the first agent
    print("➡️ Running foodie_agent to find the destination...")
    foodie_session = await session_service.create_session(app_name=foodie_agent.name, user_id=my_user_id)
    foodie_response = await run_agent_query(foodie_agent, query, foodie_session, my_user_id)

    # 2. Manually parse the output
    print("...parsing response to find the name...")
    match = re.search(r'\*\*(.*?)\*\*', foodie_response)
    if not match:
        print("🚨 Could not determine the restaurant name.")
        return # Exit if parsing fails
    destination = match.group(1)
    print(f"💡 Destination found: {destination}")

    # 3. Manually create a new query for the next agent
    directions_query = f"Give me directions to {destination} from the Palo Alto Caltrain station."
    print(f"➡️ Running transportation_agent with new query: '{directions_query}'")

    # 4. Create a fresh session for the second agent
    transport_session = await session_service.create_session(app_name=transportation_agent.name, user_id=my_user_id)

    # 5. Run the second agent and capture its final output
    final_directions = await run_agent_query(transportation_agent, directions_query, transport_session, my_user_id)

    # 6. Print the final output
    print("\n--- ✅ Final Response from Manual Workflow ---")
    print(final_directions)
    print("-----------------------------------------")

await run_manual_app()
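The parsing step is the fragile part of the manual approach. The sketch below runs the same regular expression against made-up response strings (not real model output) to show what happens when the model does or does not bold the name. The helper name `extract_destination` is hypothetical.

```python
import re
from typing import Optional

# Same pattern as in run_manual_app: the first **bold** span in the response.
BOLD_PATTERN = re.compile(r"\*\*(.*?)\*\*")

def extract_destination(response: str) -> Optional[str]:
    """Return the first bolded phrase, or None if nothing is bolded."""
    match = BOLD_PATTERN.search(response)
    return match.group(1) if match else None

# Illustrative strings only, not real agent output.
well_formed = "The best sushi is at **Jin Sho**."
no_bold = "The best sushi is at Jin Sho."
wrong_bold = "**Great news!** The best sushi is at **Jin Sho**."

print(extract_destination(well_formed))  # Jin Sho
print(extract_destination(no_bold))      # None
print(extract_destination(wrong_bold))   # Great news!
```

The third case is the subtle one: parsing succeeds but returns the wrong phrase, so the second agent would be asked for directions to "Great news!".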

---
### 1.2 (The ADK Way): Multi-Agent Mayhem with `SequentialAgent`

You've seen how to manually link agents together with custom Python code. It works, but it can get complicated. Now, let's refactor our workflow to use a powerful, built-in ADK feature designed specifically for this: the **`SequentialAgent`**.

The `SequentialAgent` is a *workflow agent*. It's not powered by an LLM itself; instead, its only job is to execute a list of other agents in a strict, predefined order.

The real magic ✨ is how it passes information. The ADK uses a shared `state` dictionary that each agent in the sequence can read from and write to.

**Our New Workflow:**

1.  **Foodie Agent**: Finds the restaurant and saves the name to `state['destination']`.
2.  **Transportation Agent**: Automatically reads `state['destination']` and uses it to find directions.

This means we no longer need custom Python code to extract text or build new queries! The ADK handles the plumbing for us.

```
+-------------------------------+
|  find_and_navigate_agent 🧭   |
| SequentialAgent:              |
| 1. Find destination           |
| 2. Get directions             |
+---------------+---------------+
                |
                |
                v
      +---------+--------+   
      | foodie_agent 🍣  |   
      | Finds place      |   
      | Output: 'Jin Sho'|  
      +---------+--------+   
                |
                |
                v
    +-------------------------+
    | transportation_agent 🚗 |
    | Uses {destination}      |
    | Output: Directions      |
    +-------------------------+

    
Final Output: 🍣 Restaurant + 🚗 Route
```

**Step 1: Define the Specialist "Worker" Agents**

First, we define our individual specialist agents. The magic lies in two key parameters: `output_key` and the `{placeholder}` syntax.

- **The Information Producer (`foodie_agent`)**: This agent's job is to find a piece of information. We'll add an `output_key` to tell the framework where to save its final answer.

- **The Information Consumer (`transportation_agent`)**: This agent's job is to use the information from the first agent. We'll use a `{placeholder}` in its instructions that matches the `output_key` from the producer.
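To build intuition for the producer/consumer hand-off, here is a toy model in plain Python. It is a sketch of the idea only, not how the ADK is implemented: `Worker` and `run_in_order` are hypothetical names, and the "models" are canned lambdas rather than Gemini calls.

```python
from typing import Callable, Dict, List, NamedTuple

class Worker(NamedTuple):
    """Hypothetical stand-in for an agent: a prompt template plus a fake model."""
    name: str
    instruction: str                 # may contain {placeholders}
    respond: Callable[[str], str]    # fake "model": prompt in, text out
    output_key: str = ""             # where to store the reply, if anywhere

def run_in_order(workers: List[Worker], state: Dict[str, str]) -> str:
    """Run workers one after another over a shared state dictionary."""
    reply = ""
    for worker in workers:
        # Fill {placeholders} from the state written by earlier workers.
        prompt = worker.instruction.format(**state)
        reply = worker.respond(prompt)
        if worker.output_key:
            state[worker.output_key] = reply
    return reply

producer = Worker(
    name="producer",
    instruction="Name one sushi restaurant.",
    respond=lambda prompt: "Jin Sho",  # canned reply, not a real model
    output_key="destination",
)
consumer = Worker(
    name="consumer",
    instruction="Give directions to {destination}.",
    respond=lambda prompt: f"PROMPT SEEN: {prompt}",
)

state: Dict[str, str] = {}
final = run_in_order([producer, consumer], state)
print(state)  # {'destination': 'Jin Sho'}
print(final)  # PROMPT SEEN: Give directions to Jin Sho.
```

The consumer never parses the producer's text. It only needs a placeholder whose name matches the producer's `output_key`.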



**Step 2: Define the `SequentialAgent`**

Now, we define the `SequentialAgent`. We simply list the worker agents in the exact order we want them to run. The framework handles the rest.

In [None]:
# --- Agent Definitions for our Specialist Team (Refactored for Sequential Workflow) ---

# Note the new `output_key` and the more specific instruction.
foodie_agent = Agent(
    name="foodie_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="""You are an expert food critic. Your goal is to find the best restaurant based on a user's request.

    When you recommend a place, you must output *only* the name of the establishment and nothing else.
    For example, if the best sushi is at 'Jin Sho', you should output only: Jin Sho
    """,
    output_key="destination"  # ADK will save the agent's final response to state['destination']
)

# The `{destination}` placeholder is automatically filled by the ADK from the state.
transportation_agent = Agent(
    name="transportation_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="""You are a navigation assistant. Given a destination, provide clear directions.
    The user wants to go to: {destination}.

    Analyze the user's full original query to find their starting point.
    Then, provide clear directions from that starting point to {destination}.
    """,
)

# This agent will run foodie_agent, then transportation_agent, in that exact order.
find_and_navigate_agent = SequentialAgent(
    name="find_and_navigate_agent",
    sub_agents=[foodie_agent, transportation_agent],
    description="A workflow that first finds a location and then provides directions to it."
)

In [None]:
# --- Let's Test the Streamlined Workflow! ---

async def run_sequential_workflow():
    """
    A simplified test function that directly invokes the SequentialAgent.
    """

    # The query contains all the information needed for the entire sequence.
    query = "Find me the best sushi restaurant in Palo Alto, and then tell me how to get there from the downtown Caltrain station."

    print(f"\n{'='*60}\n🗣️  Processing Query: '{query}'\n{'='*60}")
    print(f"🚀 Handing off the entire task to the '{find_and_navigate_agent.name}'...")

    # 1. Create a single session for our sequential agent
    # The session will manage the state (like the 'destination' variable) across the sub-agent calls.
    session = await session_service.create_session(app_name=find_and_navigate_agent.name, user_id=my_user_id)

    # 2. Run the query
    # The SequentialAgent will automatically:
    #   - Call foodie_agent with the query.
    #   - Take its output and save it to the state as `state['destination']`.
    #   - Call transportation_agent, injecting the destination into its prompt.
    #   - Stream the final response from the transportation_agent.
    await run_agent_query(find_and_navigate_agent, query, session, my_user_id)

    print(f"\n--- ✅ '{find_and_navigate_agent.name}' Workflow Complete ---")


# Execute the simplified test
await run_sequential_workflow()

---
## Part 2: Parallel Power with `ParallelAgent` 🧠→⚡️→🤖🤖🤖

What if a user wants to find multiple, unrelated things at once? "Find me a museum, a concert, AND a restaurant." Running these searches one by one is slow and inefficient.

Enter the **`ParallelAgent`**. This workflow agent executes a list of sub-agents *concurrently*, dramatically speeding up tasks that can be performed independently.

**Our New Workflow: The Multi-Researcher**

1.  **Parallel Agent**: Simultaneously runs three specialist agents:
    - `museum_finder_agent`: Finds a museum.
    - `concert_finder_agent`: Finds a concert.
    - `restaurant_finder_agent`: Finds a restaurant.
2.  **Synthesis Agent**: Once all three parallel searches are complete, this final agent gathers the results (which were saved to the shared `state`) and formats them into a single, neat summary for the user.

This pattern lets us get a lot of work done, fast! 🚀
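The speed-up from running independent searches concurrently can be seen with plain `asyncio`. The sketch below uses a hypothetical `fake_finder` coroutine that sleeps instead of searching, so it illustrates the concurrency pattern only and does not call the ADK.

```python
import asyncio
import time
from typing import Dict

async def fake_finder(output_key: str, answer: str, state: Dict[str, str]) -> None:
    """Stand-in for a search agent: wait, then write a canned answer to state."""
    await asyncio.sleep(0.2)  # pretend this is a slow web search
    state[output_key] = answer

async def research_in_parallel() -> Dict[str, str]:
    state: Dict[str, str] = {}
    # gather() starts all three coroutines and waits for every one to finish.
    await asyncio.gather(
        fake_finder("museum_result", "Museum XYZ", state),
        fake_finder("concert_result", "Artist at Venue", state),
        fake_finder("restaurant_result", "Restaurant ABC", state),
    )
    return state

def timed_run():
    start = time.perf_counter()
    result = asyncio.run(research_in_parallel())
    return result, time.perf_counter() - start

state, elapsed = timed_run()
summary = "\n".join(f"- {key}: {value}" for key, value in sorted(state.items()))
print(summary)
print(f"elapsed: {elapsed:.2f}s (three 0.2s tasks run one by one would take about 0.6s)")
```

`asyncio.run` works in a standalone script. Inside a notebook, where an event loop is already running, use `state = await research_in_parallel()` instead.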

```
+-------------------------------+
|  sequential_planner_agent ⚡   |
| SequentialAgent:              |
| 1. Run parallel research      |
| 2. Synthesize results         |
+---------------+---------------+
                |
                |  
                v  
+------------------------------+  
| parallel_research_agent ⚡    |
| ParallelAgent:               |
| - museum_finder_agent 🖼️     |   
| - concert_finder_agent 🎵    |  
| - restaurant_finder_agent 🍽️ |
+---------------+--------------+
                |
                |
                v
+---------------+------------+
| synthesis_agent 📋         |
| Combine results            |
| Output: Bulleted summary   |
+----------------------------+

Final Output:
• Museum: XYZ  
• Concert: Artist at Venue  
• Restaurant: ABC
```

In [None]:
# --- Agent Definitions for a Parallel Workflow ---

# Specialist Agent 1
museum_finder_agent = Agent(
    name="museum_finder_agent", model="gemini-2.5-flash", tools=[google_search],
    instruction="You are a museum expert. Find the best museum based on the user's query. Output only the museum's name.",
    output_key="museum_result"
)

# Specialist Agent 2
concert_finder_agent = Agent(
    name="concert_finder_agent", model="gemini-2.5-flash", tools=[google_search],
    instruction="You are an events guide. Find a concert based on the user's query. Output only the concert name and artist.",
    output_key="concert_result"
)

# Specialist Agent 3
# This mirrors foodie_agent from Part 1, but it is defined as a separate agent
# with its own name and output_key for this workflow.
restaurant_finder_agent = Agent(
    name="restaurant_finder_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="""You are an expert food critic. Your goal is to find the best restaurant based on a user's request.

    When you recommend a place, you must output *only* the name of the establishment.
    For example, if the best sushi is at 'Jin Sho', you should output only: Jin Sho
    """,
    output_key="restaurant_result" # Set the correct output key for this workflow
)


# ✨ The ParallelAgent runs all three specialists at once ✨
parallel_research_agent = ParallelAgent(
    name="parallel_research_agent",
    sub_agents=[museum_finder_agent, concert_finder_agent, restaurant_finder_agent]
)

# Agent to synthesize the parallel results
synthesis_agent = Agent(
    name="synthesis_agent", model="gemini-2.5-flash",
    instruction="""You are a helpful assistant. Combine the following research results into a clear, bulleted list for the user.
    - Museum: {museum_result}
    - Concert: {concert_result}
    - Restaurant: {restaurant_result}
    """
)

# ✨ The SequentialAgent runs the parallel search, then the synthesis ✨
sequential_planner_agent = SequentialAgent(
    name="sequential_planner_agent",
    sub_agents=[parallel_research_agent, synthesis_agent],
    description="A workflow that finds multiple things in parallel and then summarizes the results."
)

print("🤖 Agent team supercharged with a ParallelAgent workflow!")

In [None]:
# --- Let's Test the Parallel Workflow! ---

async def run_parallel_workflow():
    """
    A test function that invokes the ParallelAgent within a SequentialAgent.
    It demonstrates how to gather different pieces of information at the same time
    and then synthesize them into a single response.
    """

    # 1. Define a query that requires multiple, independent pieces of information.
    # This is a perfect use case for parallel execution!
    query = "I'm planning a trip to San Francisco. Find me the best art museum, a cool rock concert, and the best Italian restaurant."

    print(f"\n{'='*60}\n🗣️  Processing Query: '{query}'\n{'='*60}")
    print(f"🚀 Handing off the entire task to the '{sequential_planner_agent.name}'...")

    # 2. Create a session for the full workflow.
    session = await session_service.create_session(app_name=sequential_planner_agent.name, user_id=my_user_id)

    # 3. Run the query.
    # The sequential_planner_agent will automatically:
    #  - Kick off the three finder agents (museum, concert, restaurant) simultaneously. 🚀🚀🚀
    #  - Wait for all of them to complete.
    #  - Store their results in the session state ('museum_result', 'concert_result', 'restaurant_result').
    #  - Pass control to the synthesis_agent, which will have its prompt placeholders filled from the state.
    #  - Stream the final, synthesized response.
    await run_agent_query(sequential_planner_agent, query, session, my_user_id)

    print(f"\n--- ✅ '{sequential_planner_agent.name}' Workflow Complete ---")


# Execute the parallel test
await run_parallel_workflow()

---
## Part 3: Iterative Ideas with `LoopAgent` 🧠→🔁→🤖

Sometimes a task isn't a straight line; it's a loop of refinement. A user might ask for a plan, but have constraints that require checking and re-planning. For this, the ADK provides the **`LoopAgent`**.

The `LoopAgent` executes a sequence of sub-agents repeatedly until a condition is met. This is perfect for workflows involving trial and error, like planning a trip with a tight schedule.

**Our New Workflow: The Perfectionist Planner**

1. **Planner Agent**: Proposes an itinerary (e.g., a museum and a restaurant).
2. **Critic Agent**: Checks the plan against a constraint (e.g., "Is the travel time between these two places less than 45 minutes?").
3. **Refiner Agent**: If the critic finds a problem, this agent takes the feedback and creates a new, improved plan. If the critic is happy, it calls a special `exit_loop` tool to stop the process.

The `LoopAgent` manages this cycle, ensuring we don't get stuck in an infinite loop by setting a `max_iterations` limit.
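The control flow of this cycle can be sketched without any LLM. In the toy version below, `critic`, `refiner`, and `run_loop` are hypothetical plain functions. The refiner's `True` return value plays the role of the `exit_loop` tool, and halving the travel time stands in for a real revision.

```python
from typing import Any, Dict, Tuple

def critic(state: Dict[str, Any]) -> None:
    """Toy critic: approve the plan only if travel time is 45 minutes or less."""
    if state["travel_minutes"] > 45:
        state["criticism"] = "Too far apart. Pick a closer restaurant."
    else:
        state["criticism"] = "APPROVED"

def refiner(state: Dict[str, Any]) -> bool:
    """Toy refiner: return True to stop the loop, otherwise revise the plan."""
    if state["criticism"] == "APPROVED":
        return True  # plays the role of the exit_loop tool
    # Pretend each revision halves the travel time.
    state["travel_minutes"] = state["travel_minutes"] // 2
    return False

def run_loop(state: Dict[str, Any], max_iterations: int) -> Tuple[int, bool]:
    """Run critic then refiner until the refiner exits or the cap is hit."""
    for iteration in range(1, max_iterations + 1):
        critic(state)
        if refiner(state):
            return iteration, True
    return max_iterations, False

# A plan that needs one revision: approved on the second pass.
quick = {"travel_minutes": 80}
print(run_loop(quick, max_iterations=3), quick["travel_minutes"])  # (2, True) 40

# A plan that is too far off: the cap stops the loop without approval.
stuck = {"travel_minutes": 400}
print(run_loop(stuck, max_iterations=3), stuck["travel_minutes"])  # (3, False) 50
```

The second case shows why the cap matters: the loop ends after three passes even though the critic never approved the plan.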

```
+-------------------------------+
|  iterative_planner_agent 🔁   |
|                               |
| SequentialAgent:              |
| 1. Propose Plan               |
| 2. Refine in loop (≤ 3 times) |
+---------------+---------------+
                |
                |
                v
        +-------+--------+
        | planner_agent  |
        |                |
        | Propose plan   |
        | e.g., Activity |
        | Restaurant     |
        +-------+--------+
                |
                |
                v
    +-----------+-----------+
    | refinement_loop 🔁    |
    |                       |
    | LoopAgent             |
    | 1. Critic (time check)|
    | 2. Refiner (fix/exit) |
    +-----------------------+


Uses shared state: {current_plan}, {criticism}
Exits when: "Plan is feasible..."

```

In [None]:
# --- Agent Definitions for an Iterative Workflow ---

# A tool to signal that the loop should terminate
COMPLETION_PHRASE = "The plan is feasible and meets all constraints."
def exit_loop(tool_context: ToolContext):
  """Call this function ONLY when the plan is approved, signaling the loop should end."""
  print(f"  [Tool Call] exit_loop triggered by {tool_context.agent_name}")
  tool_context.actions.escalate = True
  return {}

# Agent 1: Proposes an initial plan
planner_agent = Agent(
    name="planner_agent", model="gemini-2.5-flash", tools=[google_search],
    instruction="You are a trip planner. Based on the user's request, propose a single activity and a single restaurant. Output only the names, like: 'Activity: Exploratorium, Restaurant: La Mar'.",
    output_key="current_plan"
)

# Agent 2 (in loop): Critiques the plan
critic_agent = Agent(
    name="critic_agent", model="gemini-2.5-flash", tools=[google_search],
    instruction=f"""You are a logistics expert. Your job is to critique a travel plan. The user has a strict constraint: travel time between the two locations must not exceed 45 minutes.
    Current Plan: {{current_plan}}
    Use your tools to check the travel time between the two locations.
    IF the travel time is over 45 minutes, provide a critique, like: 'This plan is inefficient. Find a restaurant closer to the activity.'
    ELSE, respond with the exact phrase: '{COMPLETION_PHRASE}'""",
    output_key="criticism"
)

# Agent 3 (in loop): Refines the plan or exits
refiner_agent = Agent(
    name="refiner_agent", model="gemini-2.5-flash", tools=[exit_loop],
    instruction=f"""You are a trip planner, refining a plan based on criticism.
    The user's original request appears earlier in the conversation.
    Critique: {{criticism}}
    IF the critique is '{COMPLETION_PHRASE}', you MUST call the 'exit_loop' tool.
    ELSE, generate a NEW plan that addresses the critique. Output only the new plan names, like: 'Activity: de Young Museum, Restaurant: Nopa'.""",
    output_key="current_plan"
)

# ✨ The LoopAgent orchestrates the critique-refine cycle ✨
refinement_loop = LoopAgent(
    name="refinement_loop",
    sub_agents=[critic_agent, refiner_agent],
    max_iterations=3
)

# ✨ The SequentialAgent puts it all together ✨
iterative_planner_agent = SequentialAgent(
    name="iterative_planner_agent",
    sub_agents=[planner_agent, refinement_loop],
    description="A workflow that iteratively plans and refines a trip to meet constraints."
)

print("🤖 Agent team updated with an iterative LoopAgent workflow!")

In [None]:
# --- Let's Test the Iterative Workflow! ---

async def run_iterative_workflow():
    """
    A test function that invokes the LoopAgent to iteratively refine a plan
    until it meets a specific constraint.
    """

    # 1. Define a query where the initial, naive plan is likely to fail the constraint.
    # Here, Muir Woods (in Marin County) and San Jose are very far apart,
    # so the critic_agent will almost certainly find the travel time too long.
    query = "Plan a day for me. I want to see the redwoods at Muir Woods and eat at a top-rated Vietnamese restaurant in San Jose."

    print(f"\n{'='*60}\n🗣️  Processing Query: '{query}'\n{'='*60}")
    print(f"🚀 Handing off the task to the '{iterative_planner_agent.name}'...")

    # 2. Create a session.
    session = await session_service.create_session(app_name=iterative_planner_agent.name, user_id=my_user_id)


    # 3. Run the query.
    # The iterative_planner_agent will:
    #  - Call `planner_agent` to create an initial plan (Muir Woods & a San Jose restaurant).
    #  - Start the `refinement_loop`.
    #  - LOOP 1: 🔄
    #    - `critic_agent` checks the long travel time and outputs a critique.
    #    - `refiner_agent` receives the critique and proposes a *new* plan (e.g., Muir Woods & a closer restaurant in Marin).
    #  - LOOP 2: 🔄
    #    - `critic_agent` checks the new, shorter travel time and finds it acceptable. It outputs the completion phrase.
    #    - `refiner_agent` sees the completion phrase and calls the `exit_loop` tool, which escalates and terminates the loop.
    #  - The final, approved plan from the previous step is returned as the result.
    await run_agent_query(iterative_planner_agent, query, session, my_user_id)

    print(f"\n--- ✅ '{iterative_planner_agent.name}' Workflow Complete ---")


# Execute the iterative test
await run_iterative_workflow()

## Part 4: The Agent-as-a-Tool: Consulting a Specialist 🧑‍🍳

Why build one agent that does everything when you can build a **team of specialist agents?** The **Agent-as-a-Tool** pattern allows one agent to delegate a task to another agent.

**Key Concept:** This is different from a sub-agent. When Agent A calls Agent B as a tool, Agent B's response is passed **back to Agent A**. Agent A then uses that information to form its own final response to the user. It's a powerful way to compose complex behaviors from simpler, focused, and reusable agents.

### How It Works

Our top-level agent, the `trip_data_concierge_agent`, acts as the **Orchestrator**. It has two tools at its disposal:

1.  `call_db_agent`: A function that internally calls our `db_agent` to fetch raw data.
2.  `call_concierge_agent`: A function that calls the `concierge_agent`.

The `concierge_agent` itself has a tool: the `food_critic_agent`.

The flow for a complex query is:

1.  **User** asks the `trip_data_concierge_agent` for a hotel and a nearby restaurant.
2.  The **Orchestrator** first calls `call_db_agent` to get hotel data.
3.  The data is saved in `tool_context.state`.
4.  The **Orchestrator** then calls `call_concierge_agent`, which retrieves the hotel data from the context.
5.  The `concierge_agent` receives the request and decides it needs to use its own tool, the `food_critic_agent`.
6.  The `food_critic_agent` provides a witty recommendation.
7.  The `concierge_agent` gets the critic's response and politely formats it.
8.  This final, polished response is returned to the **Orchestrator**, which presents it to the user.
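The eight steps above can be traced with plain functions standing in for the agents. This is a sketch of the delegation pattern only: every function below is a hypothetical stand-in with canned output, and a plain dictionary plays the role of `tool_context.state`.

```python
from typing import Any, Dict, List

def food_critic(location: str) -> str:
    """Deepest specialist: a canned one-line suggestion (no real model call)."""
    return f"Dine at the bistro beside {location}, if you must."

def concierge(request: str, location: str) -> str:
    """Calls the critic as a tool, then wraps the critic's answer politely."""
    opinion = food_critic(location)
    return f"Certainly. Our critic says: {opinion}"

def call_db(state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """First tool: fetch mock hotel rows and stash them in shared state."""
    rows = [{"name": "The Grand Hotel", "rating": 5}, {"name": "Seaside Inn", "rating": 4}]
    state["retrieved_data"] = rows
    return rows

def call_concierge(question: str, state: Dict[str, Any]) -> str:
    """Second tool: read the stashed rows and pass the top hotel to the concierge."""
    rows = state.get("retrieved_data", [])
    if not rows:
        return "No data found."
    best = max(rows, key=lambda row: row["rating"])
    return concierge(question, best["name"])

def orchestrator(question: str) -> str:
    """Top-level agent: tool results come back here, and it answers the user."""
    state: Dict[str, Any] = {}
    call_db(state)
    advice = call_concierge(question, state)
    return f"Plan: {advice}"

answer = orchestrator("Find a hotel and a restaurant nearby.")
print(answer)
```

Each layer receives the result of the layer below and decides what to do with it, which is the difference from a sub-agent hand-off where control does not return to the caller.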

                         +-----------------------------------------------------------+
                         |              🧭 Trip Data Concierge Agent                 |
                         |-----------------------------------------------------------|
                         |  Model: gemini-2.5-flash                                  |
                         |  Description:                                             |
                         |-----------------------------------------------------------|
                         |  🔧 Tools:                                                |
                         |   1. call_db_agent                                        |
                         |   2. call_concierge_agent                                 |
                         +-----------------------------------------------------------+
                                      /                                \
                                     /                                  \
                                    ▼                                    ▼
        +-------------------------------------------+    +---------------------------------------------+
        |            🔧 Tool: call_db_agent         |    |         🔧 Tool: call_concierge_agent       |
        |-------------------------------------------|    |---------------------------------------------|
        | Calls: db_agent                           |    | Calls: concierge_agent                      |
        |                                           |    | Uses data from db_agent for recommendations |
        +-------------------------------------------+    +---------------------------------------------+
                                |                                          |
                                ▼                                          ▼
       +--------------------------------------------+   +------------------------------------------------+
       |              📦 db_agent                   |   |             🤵 concierge_agent                 |
       |--------------------------------------------|   |------------------------------------------------|
       | Model: gemini-2.5-flash                    |   | Model: gemini-2.5-flash                        |
       | Role: Return mock JSON hotel data          |   | Role: Hotel staff that handles user Q&A        |
       +--------------------------------------------+   | Tools:                                         |
                                                        |  - food_critic_agent                           |
                                                        +------------------------------------------------+
                                                                                 |
                                                                                 ▼
                                                       +------------------------------------------------+
                                                       |          🍽️ food_critic_agent                  |
                                                       |------------------------------------------------|
                                                       | Model: gemini-2.5-flash                        |
                                                       | Role: Gives a witty restaurant recommendation  |
                                                       +------------------------------------------------+


In [None]:
import asyncio
from google.adk.tools import ToolContext
from google.adk.tools.agent_tool import AgentTool

# In a real application, `db_agent` would be an NL2SQL agent.
# For this example, it is a placeholder that returns mock data.

db_agent = Agent(
    name="db_agent",
    model="gemini-2.5-flash",
    instruction="You are a database agent. When asked for data, return this mock JSON object: {'status': 'success', 'data': [{'name': 'The Grand Hotel', 'rating': 5, 'reviews': 450}, {'name': 'Seaside Inn', 'rating': 4, 'reviews': 620}]}",
)

# --- 1. Define the Specialist Agents ---

# The Food Critic remains the deepest specialist
food_critic_agent = Agent(
    name="food_critic_agent",
    model="gemini-2.5-flash",
    instruction="You are a snobby but brilliant food critic. You ONLY respond with a single, witty restaurant suggestion near the provided location.",
)

# The Concierge knows how to use the Food Critic
concierge_agent = Agent(
    name="concierge_agent",
    model="gemini-2.5-flash",
    instruction="You are a five-star hotel concierge. If the user asks for a restaurant recommendation, you MUST use the `food_critic_agent` tool. Present the opinion to the user politely.",
    tools=[AgentTool(agent=food_critic_agent)]
)


# --- 2. Define the Tools for the Orchestrator ---

async def call_db_agent(
    question: str,
    tool_context: ToolContext,
):
    """
    Use this tool FIRST to connect to the database and retrieve a list of places, like hotels or landmarks.
    """
    print("--- TOOL CALL: call_db_agent ---")
    agent_tool = AgentTool(agent=db_agent)
    db_agent_output = await agent_tool.run_async(
        args={"request": question}, tool_context=tool_context
    )
    # Store the retrieved data in the context's state
    tool_context.state["retrieved_data"] = db_agent_output
    return db_agent_output


async def call_concierge_agent(
    question: str,
    tool_context: ToolContext,
):
    """
    After getting data with call_db_agent, use this tool to get travel advice, opinions, or recommendations.
    """
    print("--- TOOL CALL: call_concierge_agent ---")
    # Retrieve the data fetched by the previous tool
    input_data = tool_context.state.get("retrieved_data", "No data found.")

    # Formulate a new prompt for the concierge, giving it the data context
    question_with_data = f"""
    Context: The database returned the following data: {input_data}

    User's Request: {question}
    """

    agent_tool = AgentTool(agent=concierge_agent)
    concierge_output = await agent_tool.run_async(
        args={"request": question_with_data}, tool_context=tool_context
    )
    return concierge_output


# --- 3. Define the Top-Level Orchestrator Agent ---

trip_data_concierge_agent = Agent(
    name="trip_data_concierge",
    model="gemini-2.5-flash",
    description="Top-level agent that queries a database for travel data, then calls a concierge agent for recommendations.",
    tools=[call_db_agent, call_concierge_agent],
    instruction="""
    You are a master travel planner who uses data to make recommendations.

    1.  **ALWAYS start with the `call_db_agent` tool** to fetch a list of places (like hotels) that match the user's criteria.

    2.  After you have the data, **use the `call_concierge_agent` tool** to answer any follow-up questions for recommendations, opinions, or advice related to the data you just found.
    """,
)

print(f"✅ Orchestrator Agent '{trip_data_concierge_agent.name}' is defined and ready.")
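
The handoff between the two tools relies only on shared state: the first tool writes `retrieved_data`, and the second reads it back, falling back to a default string if nothing was stored. The sketch below mimics that flow with a plain dict standing in for `tool_context.state`. The names `fake_db_tool` and `build_concierge_prompt` are hypothetical helpers for illustration and are not part of ADK.

```python
# Minimal sketch of the state handoff, with a plain dict standing in for
# tool_context.state. Both function names are hypothetical.

def fake_db_tool(state: dict) -> dict:
    """Pretend to query the database and stash the result in shared state."""
    result = {"status": "success", "data": [{"name": "Seaside Inn", "reviews": 620}]}
    state["retrieved_data"] = result
    return result


def build_concierge_prompt(question: str, state: dict) -> str:
    """Compose the concierge prompt from the user's question plus stored data."""
    input_data = state.get("retrieved_data", "No data found.")
    return (
        f"Context: The database returned the following data: {input_data}\n\n"
        f"User's Request: {question}"
    )


state: dict = {}

# Built before the DB tool has run: the fallback text is used.
prompt_without_data = build_concierge_prompt("Where should I eat?", state)

# Built after the DB tool has run: the stored result is embedded in the prompt.
fake_db_tool(state)
prompt_with_data = build_concierge_prompt("Where should I eat?", state)
print(prompt_with_data)
```

This is why the orchestrator's instruction insists on calling `call_db_agent` first: if the order is reversed, the concierge only ever sees the fallback text.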

In [None]:
# --- Let's test the Trip Data Concierge Agent ---

async def run_trip_data_concierge():
    """
    Sets up a session and runs a query against the top-level
    trip_data_concierge_agent.
    """
    # Create a new, single-use session for this query
    concierge_session = await session_service.create_session(
        app_name=trip_data_concierge_agent.name,
        user_id=my_user_id
    )

    # This query is specifically designed to trigger the full two-step process:
    # 1. Get data from the db_agent.
    # 2. Get a recommendation from the concierge_agent based on that data.
    query = "Find the top-rated hotels in San Francisco from the database, then suggest a dinner spot near the one with the most reviews."
    print(f"🗣️ User Query: '{query}'")

    # We call our existing helper function with the top-level orchestrator agent
    await run_agent_query(trip_data_concierge_agent, query, concierge_session, my_user_id)

# Run the test
await run_trip_data_concierge()

---
## Part 5: Final Step: Updating the Router and Running the App

Now we just have one last thing to do: make our `router_agent` aware of these powerful new workflows! We'll add the loop workflow (`iterative_planner_agent`) and the parallel workflow (registered in the code below as `sequential_planner_agent`) to its list of available options.

Then we can run our app with new queries designed to trigger these advanced, multi-agent workflows.

```
                    +---------------------+
                    |    User Query 🗣️    |
                    +----------+----------+
                               |
                               v
                    +---------------------+
                    |   Router Agent 🤖   |
                    |  (Classify Request) |
                    +----------+----------+
                               |
          +----------------+---------------------+---------------------+--------------------+
          |                |                     |                     |                    |
          v                v                     v                     v                    v
+--------------+  +------------------+  +------------------+  +------------------+  +-----------------+
| foodie_agent |  | find_and_navigate|  | iterative_planner|  | parallel_planner |  | day_trip_agent  |
| 🍣 Food Only |  | 🧭 Seq Workflow  |  | 🔁 Loop Workflow |  | ⚡ Parallel Tasks |  | 🧳 Basic Plan   |
+--------------+  +------------------+  +------------------+  +------------------+  +-----------------+
```

In [None]:
# --- The ULTIMATE Router Agent --- #

# In this approach there are two steps: the `router_agent` returns the name of the
# most appropriate agent for the request, and that agent is then invoked manually.


from google.adk.agents import LlmAgent

router_agent = Agent(
    name="router_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are a master request router. Your job is to analyze a user's query and decide which of the following agents or workflows is best suited to handle it.
    Do not answer the query yourself, only return the name of the most appropriate choice.

    Available Options:
    - 'foodie_agent': For queries *only* about finding a single food place.
    - 'find_and_navigate_agent': For queries that ask to *first find a place* and *then get directions* to it.
    - 'iterative_planner_agent': For planning a trip with a specific constraint that needs checking, like travel time.
    - 'sequential_planner_agent': For queries that ask to find multiple, independent things at once (e.g., a museum AND a concert AND a restaurant).

    Only return the single, most appropriate option's name and nothing else.
    """
)

# The master dictionary of all our executable agents and workflows
worker_agents = {
    "foodie_agent": foodie_agent, # For simple food queries
    "find_and_navigate_agent": find_and_navigate_agent, # Sequential
    "iterative_planner_agent": iterative_planner_agent, # Loop
    "sequential_planner_agent": sequential_planner_agent,   # Parallel
}

# --- Let's Test Everything! ---

async def run_fully_loaded_app():
    queries = [
        # Test Case 1: Simple Sequential Flow
        "Find me the best sushi in Palo Alto and then tell me how to get there from the Caltrain station.",

        # Test Case 2: Iterative Loop Flow
        "Plan me a day in San Francisco with a museum and a nice dinner, but make sure the travel time between them is very short.",

        # Test Case 3: Parallel Flow
        "Help me plan a trip to SF. I need one museum, one concert, and one great restaurant."
    ]

    for query in queries:
        print(f"\n{'='*60}\n🗣️ Processing New Query: '{query}'\n{'='*60}")

        # 1. Ask the Router Agent to choose the right agent or workflow
        router_session = await session_service.create_session(app_name=router_agent.name, user_id=my_user_id)
        print("🧠 Asking the router agent to make a decision...")
        chosen_route = await run_agent_query(router_agent, query, router_session, my_user_id, is_router=True)
        chosen_route = chosen_route.strip().replace("'", "")
        print(f"🚦 Router has selected route: '{chosen_route}'")

        # 2. Execute the chosen route
        if chosen_route in worker_agents:
            worker_agent = worker_agents[chosen_route]
            print(f"--- Handing off to {worker_agent.name} ---")
            worker_session = await session_service.create_session(app_name=worker_agent.name, user_id=my_user_id)
            await run_agent_query(worker_agent, query, worker_session, my_user_id)
            print(f"--- {worker_agent.name} Complete ---")
        else:
            print(f"🚨 Error: Router chose an unknown route: '{chosen_route}'")

await run_fully_loaded_app()
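
The manual dispatch above only works when the router's reply matches a dictionary key exactly, which is why the reply is stripped and unquoted before the lookup. As a rough sketch of that cleanup step in isolation, the hypothetical helper `normalize_route` below (not part of the notebook or of ADK) trims whitespace and surrounding quote characters, then returns `None` for anything that is not a known route. It is a slightly stricter variant of the `.strip().replace("'", "")` used above.

```python
from typing import Optional


def normalize_route(raw: str, known_routes: set) -> Optional[str]:
    """Return the matching route name, or None if the reply is not a known route."""
    # Trim whitespace, then any quotes or backticks wrapped around the name.
    cleaned = raw.strip().strip("'\"`").strip()
    return cleaned if cleaned in known_routes else None


routes = {"foodie_agent", "find_and_navigate_agent"}

# A reply wrapped in quotes and whitespace still resolves to the route name.
print(normalize_route(" 'foodie_agent'\n", routes))

# A chatty reply is rejected instead of being passed on as a route.
print(normalize_route("I think foodie_agent is best", routes))
```

Returning `None` keeps the "unknown route" branch explicit, so a verbose router reply surfaces as an error rather than a failed dictionary lookup.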

#### In this approach, a single coordinator agent decides which sub-agent should handle the request and then routes the request to it

In [None]:
from google.adk.agents import LlmAgent

billing_agent = LlmAgent(name="Billing", description="Handles billing inquiries.")
support_agent = LlmAgent(name="Support", description="Handles technical support requests.")

coordinator = LlmAgent(
    name="HelpDeskCoordinator",
    model="gemini-2.0-flash",
    instruction="Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",
    description="Main help desk router.",
    # allow_transfer=True is often implicit with sub_agents in AutoFlow
    sub_agents=[billing_agent, support_agent]
)

In [None]:
async def run_coordinator_app():
    queries = [
        "Help me with a billing issue.",
        "Help me with a technical issue."
    ]

    for query in queries:
        # Create a new, single-use session for this query
        coordinator_session = await session_service.create_session(
            app_name=coordinator.name,
            user_id=my_user_id
        )

        print(f"\n{'='*60}\n🗣️ Processing New Query: '{query}'\n{'='*60}")

        # Run the query using the top-level coordinator agent
        await run_agent_query(coordinator, query, coordinator_session, my_user_id)

# Make sure you're inside an async context, or run with asyncio
await run_coordinator_app()


## Part 6: Human-in-the-Loop Pattern



In [None]:
def request_human_approval(amount: float, reason: str) -> dict:
    """
    Pauses the workflow and asks for human approval for an expense.
    This tool simulates a human-in-the-loop interaction by using input().
    """
    print("\n" + "="*60)
    print("🚨 HUMAN ACTION REQUIRED 🚨")
    print("An agent is requesting approval for an expense.")
    print(f"  - Amount: ${amount:,.2f}")
    print(f"  - Reason: {reason}")
    print("="*60)

    # Loop until valid input is received
    while True:
        # The input() function pauses execution here!
        decision = input("Please type 'yes' to approve or 'no' to deny: ").lower().strip()
        if decision in ["yes", "no"]:
            print(f"✅ Human has decided: '{decision}'. Resuming workflow...")
            return {"human_decision": decision}
        else:
            print("Invalid input. Please enter 'yes' or 'no'.")

print("🛠️ Human interaction tool is defined.")
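
Because the tool blocks on `input()`, it is awkward to exercise without a person at the keyboard. One possible way to try the validation loop on its own is to make the prompt function injectable. The sketch below does that with a hypothetical `request_approval` function and an `ask` parameter; both are assumptions for illustration, meant for testing outside an agent, and how ADK would treat such an extra parameter when registering the function as a tool is not covered here.

```python
from typing import Callable


def request_approval(
    amount: float, reason: str, ask: Callable[[str], str] = input
) -> dict:
    """Same yes/no loop as above, but the prompt function can be swapped out."""
    print(f"Approval requested: ${amount:,.2f} for {reason}")
    while True:
        decision = ask("Please type 'yes' to approve or 'no' to deny: ").lower().strip()
        if decision in ("yes", "no"):
            return {"human_decision": decision}
        print("Invalid input. Please enter 'yes' or 'no'.")


# Scripted answers replace the keyboard: one invalid reply, then an approval.
scripted = iter(["maybe", " YES "])
result = request_approval(782.55, "conference flight", ask=lambda _prompt: next(scripted))
print(result)
```

With the default `ask=input`, the function behaves like the interactive version and still pauses for a real person.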

In [None]:
# --- Define a clearly-typed function to be used as a tool ---
def save_expense_details(amount: float, reason: str, tool_context: ToolContext):
    """Saves the extracted expense details to the session state."""
    print(f"🛠️ TOOL CALLED: save_expense_details(amount={amount}, reason='{reason}')")
    tool_context.state.update({
        "expense_amount": amount,
        "expense_reason": reason
    })
    return {"status": "success", "message": f"Saved amount: {amount}, reason: {reason}"}


# --- Agent 1: The Expense Preparer ---
# This agent's only job is to parse the user's request and get the data ready.
expense_preparer_agent = Agent(
    name="expense_preparer_agent",
    model="gemini-2.5-flash",
    instruction="""You are an expense report assistant. From the user's plain-text request, extract the dollar amount and the reason for the expense.
    The 'amount' should be a number, ignoring any currency symbols like '$'.
    You MUST call the `save_expense_details` tool with these details.""",
    tools=[save_expense_details],
)


# --- Agent 2: The Approval Requester ---
# This agent's only job is to call the tool that pauses for human input.
approval_requester_agent = Agent(
    name="approval_requester_agent",
    model="gemini-2.5-flash",
    instruction="Your job is to request human approval for the expense. You MUST use the `request_human_approval` tool, passing it the `expense_amount` and `expense_reason` from the session state.",
    tools=[request_human_approval],
    # Save this agent's final text response (which reports the human's decision)
    # to state['human_decision'] for the next agent
    output_key="human_decision"
)


# --- Agent 3: The Decision Processor ---
# This agent acts on the human's final decision.
decision_processor_agent = Agent(
    name="decision_processor_agent",
    model="gemini-2.5-flash",
    instruction="You are the final step in the expense process. Read the `{human_decision}` from the state. If the decision was 'yes', respond to the user that their expense report has been approved and submitted. If it was 'no', inform them that the expense report was denied by the manager.",
)


print("✅ Agent team for expense approval is ready.")

In [None]:
# The SequentialAgent ensures our agents run in the correct order.
human_approval_workflow = SequentialAgent(
    name="human_approval_workflow",
    sub_agents=[
        expense_preparer_agent,
        approval_requester_agent,
        decision_processor_agent
    ],
    description="A workflow that prepares an expense report and requires human approval before finalizing."
)

print(f"🤖 Workflow '{human_approval_workflow.name}' is online.")

# --- Let's run the test! ---
async def run_human_in_loop_test():
    """Sets up a session and runs a query that requires human intervention."""

    query = "Hi there, I need to file an expense report for my flight to the Next '25 conference in Las Vegas. The total cost was $782.55."
    print(f"🗣️ User Query: '{query}'")

    # Create a session for this specific job
    approval_session = await session_service.create_session(
        app_name=human_approval_workflow.name,
        user_id=my_user_id
    )

    # Call our helper function. It will pause mid-execution!
    await run_agent_query(
        agent=human_approval_workflow,
        query=query,
        session=approval_session,
        user_id=my_user_id
    )

# When you run this cell, be ready to type "yes" or "no" into the input box!
await run_human_in_loop_test()

## Part 7: Plugins

### 1) Define a Plugin

In [None]:
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.plugins.base_plugin import BasePlugin

class CountInvocationPlugin(BasePlugin):
  """A custom plugin that counts agent runs and LLM requests."""

  def __init__(self) -> None:
    """Initialize the plugin with counters."""
    super().__init__(name="count_invocation")
    self.agent_count: int = 0
    self.tool_count: int = 0
    self.llm_request_count: int = 0

  async def before_agent_callback(
      self, *, agent: BaseAgent, callback_context: CallbackContext
  ) -> None:
    """Count agent runs."""
    self.agent_count += 1
    print(f"[Plugin] Agent run count: {self.agent_count}")
    print(f"[Plugin] Agent name: {agent.name}")

  async def before_model_callback(
      self, *, callback_context: CallbackContext, llm_request: LlmRequest
  ) -> None:
    """Count LLM requests."""
    self.llm_request_count += 1
    print(f"[Plugin] LLM request count: {self.llm_request_count}")

### 2) Define a Runner with Plugin

In [None]:
# --- A Helper Function to Run Our Agents with a Plugin ---
# This variant of our query helper creates the Runner with CountInvocationPlugin attached.
from google.adk import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.genai.types import Content, Part
from IPython.display import display, Markdown

async def run_agent_query_with_plugin(agent: Agent, query: str, session: Session, user_id: str, is_router: bool = False):
    """Initializes a runner with the counting plugin and executes a query for a given agent and session."""
    print(f"\n🚀 Running query for agent: '{agent.name}' in session: '{session.id}'...")

    runner = Runner(
        agent=agent,
        session_service=session_service,
        app_name=agent.name,
        plugins=[CountInvocationPlugin()],
    )
    final_response = ""
    try:
        async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=Content(parts=[Part(text=query)], role="user"),
        ):
            if not is_router:
                # Let's see what the agent is thinking!
                print(f"** Got event from {event.author}")
            # Guard against final events that carry no text part
            if event.is_final_response() and event.content and event.content.parts:
                final_response = event.content.parts[0].text or ""
    except Exception as e:
        final_response = f"An error occurred: {e}"

    if not is_router:
        print("\n" + "-"*50)
        print("✅ Final Response:")
        display(Markdown(final_response))
        print("-"*50 + "\n")

    return final_response

# --- Initialize our Session Service ---
# This one service will manage all the different sessions in our notebook.
session_service = InMemorySessionService()
my_user_id = "adk_adventurer_001"
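
The helper above creates `CountInvocationPlugin()` inline, so its counters are only visible through the plugin's own print statements. If you want to read the totals after a run, one option is to keep a reference to the plugin instance. The toy model below illustrates that idea with plain Python: `CountingHook` and `toy_run` are hypothetical stand-ins and do not reflect how ADK's `Runner` invokes plugins internally.

```python
import asyncio


class CountingHook:
    """Hypothetical stand-in for a plugin: counts how often its hook fires."""

    def __init__(self) -> None:
        self.agent_count = 0

    async def before_agent(self, agent_name: str) -> None:
        self.agent_count += 1


async def toy_run(agent_names: list, hooks: list) -> None:
    """Toy driver: fire every hook before each 'agent' step. Not ADK's runner."""
    for name in agent_names:
        for hook in hooks:
            await hook.before_agent(name)


# Keep a reference to the hook so the total can be read after the run.
counter = CountingHook()
asyncio.run(toy_run(["foodie_agent", "transportation_agent"], [counter]))
print(counter.agent_count)
```

Inside a notebook, where an event loop is already running, replace `asyncio.run(...)` with `await toy_run(...)`.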

### 3) Define a Sequential Agent

In [None]:
# --- Agent Definitions for our Specialist Team (Refactored for Sequential Workflow) ---

# Note the new `output_key` and the more specific instruction.
foodie_agent = Agent(
    name="foodie_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="""You are an expert food critic. Your goal is to find the best restaurant based on a user's request.

    When you recommend a place, you must output *only* the name of the establishment and nothing else.
    For example, if the best sushi is at 'Jin Sho', you should output only: Jin Sho
    """,
    output_key="destination"  # ADK will save the agent's final response to state['destination']
)

# The `{destination}` placeholder is automatically filled by the ADK from the state.
transportation_agent = Agent(
    name="transportation_agent",
    model="gemini-2.5-flash",
    tools=[google_search],
    instruction="""You are a navigation assistant. Given a destination, provide clear directions.
    The user wants to go to: {destination}.

    Analyze the user's full original query to find their starting point.
    Then, provide clear directions from that starting point to {destination}.
    """,
)

# This agent will run foodie_agent, then transportation_agent, in that exact order.
find_and_navigate_agent = SequentialAgent(
    name="find_and_navigate_agent",
    sub_agents=[foodie_agent, transportation_agent],
    description="A workflow that first finds a location and then provides directions to it."
)
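
To make the `output_key` and `{destination}` handoff concrete, here is a simplified model of the substitution step: the first agent's output lands in a state dict, and placeholders in the next agent's instruction are filled from it. The helper `fill_placeholders` is a hypothetical illustration and not ADK's implementation. Raising `KeyError` for a missing key is an assumption made for this sketch, since the notebook does not show how ADK handles that case.

```python
import re


def fill_placeholders(template: str, state: dict) -> str:
    """Replace each {key} in the template with str(state[key])."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in state:
            # Assumption for this sketch: a missing key is treated as an error.
            raise KeyError(f"State has no value for placeholder '{key}'")
        return str(state[key])

    return re.sub(r"\{(\w+)\}", substitute, template)


# The foodie agent's final answer, as it would be stored under output_key.
state = {"destination": "Jin Sho"}

instruction = "The user wants to go to: {destination}."
filled = fill_placeholders(instruction, state)
print(filled)
```

The takeaway is that the second agent never receives the first agent's reply directly: it only sees whatever value sits under the shared key when its instruction is built.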

### 4) Run the Sequential Agent with Plugin

In [None]:
# --- Let's Test the Streamlined Workflow! ---

async def run_sequential_workflow():
    """
    A simplified test function that directly invokes the SequentialAgent.
    """

    # The query contains all the information needed for the entire sequence.
    query = "Find me the best sushi restaurant in Palo Alto, and then tell me how to get there from the downtown Caltrain station."

    print(f"\n{'='*60}\n🗣️  Processing Query: '{query}'\n{'='*60}")
    print(f"🚀 Handing off the entire task to the '{find_and_navigate_agent.name}'...")

    # 1. Create a single session for our sequential agent
    # The session will manage the state (like the 'destination' variable) across the sub-agent calls.
    session = await session_service.create_session(app_name=find_and_navigate_agent.name, user_id=my_user_id)

    # 2. Run the query
    # The SequentialAgent will automatically:
    #   - Call foodie_agent with the query.
    #   - Take its output and save it to the state as `state['destination']`.
    #   - Call transportation_agent, injecting the destination into its prompt.
    #   - Stream the final response from the transportation_agent.
    await run_agent_query_with_plugin(find_and_navigate_agent, query, session, my_user_id)

    print(f"\n--- ✅ '{find_and_navigate_agent.name}' Workflow Complete ---")


# Execute the simplified test
await run_sequential_workflow()