# ADK 101

ADK documentation:
https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/use/adk



## Part 0: Environment Setup


In [None]:
!pip install -r requirements.txt

### Restart current runtime
To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which will restart the current kernel.
<div class="alert alert-block alert-success"> 
<b>NOTE:</b> Only restart the current runtime if you installed libraries. If you did not install new libraries, you do not need to restart the kernel.
</div>

In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython
import time

# Look up the running IPython application and ask its kernel to shut down.
# The True argument is what makes this a restart, per the note above this cell
# (presumably the `restart` flag of `do_shutdown` — TODO confirm).
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Part 1: Import necessary libraries

In [None]:
import os

# Cloud project id.
PROJECT_IDS = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_IDS[0]  # @param {type:"string"}

if not PROJECT_ID:
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = "us-central1" # @param {type:"string"}

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE" # Use Vertex AI API
# [your-project-id]

# Print project and location details
print(f"Project ID:", PROJECT_ID)
print(f"Project Region:", LOCATION)

In [None]:
from google.cloud import storage

client = storage.Client()

GCS_BUCKET_LOCATION = LOCATION
UNIQUE_PREFIX = 'ly' #"<REPLACE_WITH_PREFIX>"

GCS_BUCKET_NAME = f"{PROJECT_ID}-{UNIQUE_PREFIX}"
GCS_BUCKET_URI = f"gs://{GCS_BUCKET_NAME}"

bucket = storage.Bucket(client, GCS_BUCKET_NAME)

if bucket.exists()==False:
    # Create a Cloud Storage Bucket
    !gcloud storage buckets create $GCS_BUCKET_URI --location=$GCS_BUCKET_LOCATION

else:    
    print(f"\n{GCS_BUCKET_NAME} already exists.")
    
def gcs_file(blob_name):
    """Return the blob named ``blob_name`` from the module-level ``bucket``."""
    blob = bucket.blob(blob_name)
    return blob


In [None]:
import vertexai
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import AdkApp

# NOTE: LOCATION is assigned the same value in the project-setup cell; it is
# repeated here with its own `#@param` marker.
LOCATION = "us-central1" #@param {type:"string"}
# Reuse the gs:// URI built in the bucket cell as the staging bucket.
STAGING_BUCKET = GCS_BUCKET_URI

# Initialise the Vertex AI SDK with the project, region and staging bucket.
vertexai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET,
)

In [None]:
import random
import datetime
import re

from google.adk.runners import InMemoryRunner, Runner
from google.adk.agents import BaseAgent, LlmAgent, Agent, SequentialAgent, LoopAgent, ParallelAgent
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools import ToolContext, LongRunningFunctionTool
from google.adk.tools.agent_tool import AgentTool
from google.adk.sessions import Session
from google.adk.events import Event
from google.adk.agents.callback_context import CallbackContext
from google.adk.sessions import InMemorySessionService

from google.genai import types

from pydantic import BaseModel, Field
from typing import List, Optional
import logging

logging.basicConfig(level=logging.ERROR)

## Part 2: Utility Functions

In [None]:
import json
import time

def pprint_events(events):
    """Print a readable trace of the events produced by an ADK runner.

    For every event this prints one of: the final response text (with the time
    elapsed since the call started), the function calls, the function
    responses, or the raw content as a fallback. Events whose content cannot be
    dumped are reported and skipped. A total elapsed time is printed at the end.

    Args:
        events: Iterable of runner events.
    """
    started_at = time.time()

    def _elapsed_ms():
        # Milliseconds since this function was entered, rounded to 3 decimals.
        return round((time.time() - started_at) * 1000, 3)

    for event in events:
        final = event.is_final_response()
        calls = event.get_function_calls()
        responses = event.get_function_responses()

        try:
            content_json = event.content.model_dump_json(indent=2, exclude_none=True)
            agent_res = json.loads(content_json)
        except AttributeError as e:
            # Raised e.g. when the event carries no content object.
            print(f"Error parsing event content: {e}")
            continue

        if final:
            if event.content.parts:
                final_response = event.content.parts[0].text
            else:
                final_response = "No content available"
            print(f'>>> Final Response ({_elapsed_ms()} ms):\n{final_response}')
            print("-" * 30)
            continue

        if calls:
            print('+++ Function Calls:')
            for call in calls:
                print(f"Function Name: {call.name}, Args: {call.args}")
            continue

        if responses:
            print('--- Function Responses:')
            for resp in responses:
                details = resp.response
                values = list(details.values()) if details else []
                print(f"Function Name: {resp.name}")
                print(f"Function Results: {json.dumps(values)}")
            continue

        print('No function calls or responses available.')
        print(f"Agent Response: {agent_res}")

    print(f"Total elapsed time: {_elapsed_ms()} ms")
    
# Agent Interaction
def call_agent(runner, query, user_id="user12345", session_id="session12345"):
    """Send ``query`` to ``runner`` as a user message and return its events.

    Args:
        runner: Runner used to execute the agent.
        query: Text of the user message.
        user_id: User id passed to ``runner.run``.
        session_id: Session id passed to ``runner.run``.

    Returns:
        Whatever ``runner.run`` returns for this message.
    """
    user_part = types.Part(text=query)
    message = types.Content(role='user', parts=[user_part])
    return runner.run(user_id=user_id, session_id=session_id, new_message=message)


In [None]:
from typing import Optional, Any

APP_NAME = 'test_app'
USER_ID = 'test_user'

def create_runner(agent: BaseAgent):
    """Build an ``InMemoryRunner`` for ``agent`` under the current ``APP_NAME``."""
    runner = InMemoryRunner(agent, app_name=APP_NAME)
    return runner

def _content_to_text(content: types.Content | None) -> str:
    """Concatenate the text of every part in ``content``.

    Returns an empty string when ``content`` is missing or has no parts; parts
    without text contribute nothing.
    """
    if content and content.parts:
        return ''.join(part.text or '' for part in content.parts)
    return ''

def print_event(event: Event):
    """Print an event's author, its concatenated text and its dumped fields."""
    print(f'Author: {event.author}')
    text = _content_to_text(event.content)
    print(f'Content Text: {text}')
    # Dump without None values and without fields left at their defaults.
    dumped = event.model_dump(exclude_none=True, exclude_defaults=True)
    print(f'Event: {dumped}')

async def run_session(new_message: types.Content, *, runner: Runner, session: Optional[Session] = None, state: Optional[dict[str, Any]] = None) -> Session:
    """Run one user turn through ``runner`` and return the refreshed session.

    Args:
        new_message: The user message to send.
        runner: Runner whose ``session_service`` owns the session.
        session: Existing session to continue. When ``None``, a new session is
            created for ``APP_NAME`` / ``USER_ID``.
        state: Initial state passed to ``create_session``; only used when a
            new session is created.

    Returns:
        The session as re-read from the session service after the run.
    """
    if session is None:
        session = await runner.session_service.create_session(app_name=APP_NAME, user_id=USER_ID, state=state)

    print(f'User: {_content_to_text(new_message)}')
    print('----------------------------------')
    # Use the session's own user id so a caller-supplied session created for a
    # different user is still found by the runner.
    async for e in runner.run_async(user_id=session.user_id, session_id=session.id, new_message=new_message):
        print_event(e)
    print('----------------------------------')

    # The session service is awaited here just like create_session above;
    # without the await the caller would receive a coroutine, not a Session.
    session = await runner.session_service.get_session(app_name=session.app_name, user_id=session.user_id, session_id=session.id)
    return session

def content_text(msg: str) -> types.Content:
    """Wrap the plain string ``msg`` in a single-part ``types.UserContent``."""
    text_part = types.Part(text=msg)
    return types.UserContent(parts=[text_part])

## Part 3: Start Building Agents!

## Let's Start with the Agent concepts

### Key Building Block of Agents:

- Model: The LLM that orchestrates the agent's execution (like a brain!)
- Description: A high-level description of the agent's goal
- Instruction: Detailed instructions on how the agent is supposed to perform its task
- Tool (Optional): Tools that give the agent access and abilities beyond what is available within the LLM

### Example 1: A basic Hello World Agent

An agent that says "hello world" in a random language.

In [None]:
MODEL = "gemini-2.5-flash"

In [None]:
# A minimal agent: no tools, only a model, a description and an instruction.
hello_world_agent = Agent(
    model=MODEL,
    name="hello_world_agent",
    description="An agent that says 'hello world'",
    # The instruction ends with two example outputs that show the expected
    # "<greeting> (<language>)" format.
    instruction="""You always say 'hello world' to the user, and nothing else.
    Output 'hello world' in a random language.
    Put the language in brackets.

    Example Output 1:
    hello world (English)

    Example Output 2:
    你好，世界 (Chinese)
    """,
)

To run the agent, we need a service to handle user sessions and a runner to drive the agent:
- Session - Maintains the state of one invocation (chat log, events, etc.). The Agent Framework handles session management for you, allowing you to focus on building your agent's logic. You can use the provided BaseSession interface with managed storage or easily plug in your own. The InMemorySession is available for rapid development and debugging.
- Runner - The orchestration layer of agents (the "state machine"). The Runner drives agents to move the state of a session forward.

In [None]:
# Constants
APP_NAME = "hello_world_example"
USER_ID = "user12345"
SESSION_ID = "session12345"
# Spelled to match the `name` given to hello_world_agent.
AGENT_NAME = "hello_world_agent"

In [None]:
# Session and Runner
session_service = InMemorySessionService()
session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=hello_world_agent, app_name=APP_NAME, session_service=session_service)

In [None]:
events = call_agent(runner, "hello")
pprint_events(events)

In [None]:
#session.id

In [None]:
#await session_service.list_sessions(app_name=APP_NAME, user_id=USER_ID)



### Example 2: Now, let's build an agent that can have multi-turn conversation (with historical context!)

The session service also enables the agent to "remember" your conversation context, so you don't have to repeat yourself.
Let's build another agent that leverages this capability.

Use case:

- An agent that tries to learn your name and then says "hello USERNAME"

In [None]:
# Constants
AGENT_NAME = "hello_name_agent"

In [None]:
# Agent
# No tools here either: the behaviour comes entirely from the instruction.
hello_name_agent = Agent(
    model=MODEL,
    # AGENT_NAME is set to "hello_name_agent" in the preceding cell.
    name=AGENT_NAME,
    description="An agent that says 'hello USERNAME'",
    instruction="""
    You need to first ask the user's name.
    Try best to convince the user to give you a name, let it be first name, last name, or nick name.
    Once you get the user's name, say 'hello USERNAME'.
    """,
)

In [None]:
session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=hello_name_agent, app_name=APP_NAME, session_service=session_service)

In [None]:
# Test the agent
events = call_agent(runner, "I don't tell you my name")
pprint_events(events)

In [None]:
events = call_agent(runner, "I don't tell you my name")
pprint_events(events)

In [None]:
events = call_agent(runner, "what is your name?")
pprint_events(events)

In [None]:
events = call_agent(runner, "my name is Yuan")
pprint_events(events)

In [None]:
events = call_agent(runner, "what is my name?")
pprint_events(events)

#### Switch session

In [None]:
new_session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID + '-new')

In [None]:
SESSION_ID = SESSION_ID + '-new'

In [None]:
events = call_agent(runner, "what is my name?", session_id=SESSION_ID)
pprint_events(events)

### Example 3: Let's now start using Tool

**Function Tools:** To extend your agent's capabilities, you can provide it with Python functions as tools. When the agent determines it needs to perform a specific action, it can automatically call these functions.

Concept:
- Tool (Python function) -- calling a tool works the same way whether it is invoked by the agent (via the LLM) or by a human (from Python), which makes tools easy to test.

Use case:
- Simple math

In [None]:
def add(numbers: list[int]) -> int:
    """Return the sum of all integers in a list.

    Every element of the list is added together using the built-in `sum()`.

    Args:
        numbers: A list of integers to be added.

    Returns:
        The sum of the integers in the list, or 0 when the list is empty.

    Examples:
        add([1, 2, 3]) == 6
        add([-1, 0, 1]) == 0
        add([]) == 0
    """
    total = sum(numbers)
    return total

def subtract(numbers: list[int]) -> int:
    """Subtract the numbers in a list from left to right.

    The first element is the starting value and every following element is
    subtracted from it in order, so [10, 2, 5] is evaluated as 10 - 2 - 5.

    Args:
        numbers: A list of integers to be subtracted.

    Returns:
        The result of the sequential subtraction, or 0 when the list is empty.

    Examples:
        subtract([10, 2, 5]) == 3  # (10 - 2) - 5 = 3
        subtract([10, 2]) == 8     # 10 - 2 = 8
        subtract([]) == 0
    """
    if not numbers:
        return 0  # Nothing to subtract from.
    first, *rest = numbers
    remaining = first
    for value in rest:
        remaining = remaining - value
    return remaining

def multiply(numbers: list[int]) -> int:
    """Return the product of all integers in a list.

    Starting from 1, each element of the list is multiplied into the running
    product in order.

    Args:
        numbers: A list of integers to be multiplied.

    Returns:
        The product of the integers in the list, or 1 when the list is empty.

    Examples:
        multiply([2, 3, 4]) == 24   # 2 * 3 * 4 = 24
        multiply([1, -2, 5]) == -10 # 1 * -2 * 5 = -10
        multiply([]) == 1
    """
    result = 1
    for factor in numbers:
        result = result * factor
    return result

def divide(numbers: list[int]) -> float:  # Use float for division
    """Divides numbers in a list sequentially from left to right.

    This function performs division on a list of integers, applying the division
    operation from left to right.  For example, given the list [10, 2, 5], the
    function will calculate 10 / 2 / 5.

    Args:
        numbers: A list of integers to be divided.

    Returns:
        The result of the sequential division as a float. Returns 0.0 if the
        input list is empty. A single-element list returns that element as a
        float.

    Raises:
        ZeroDivisionError: If any number in the list *after* the first element
                           is zero. Division by zero is not permitted.

    Examples:
        divide([10, 2, 5]) == 1.0  # (10 / 2) / 5 = 5 / 5 = 1.0
        divide([10, 2]) == 5.0      # 10 / 2 = 5.0
        divide([7]) == 7.0
        divide([10, 0, 5])  # Raises ZeroDivisionError
        divide([]) == 0.0
    """
    if not numbers:
        return 0.0  # Handle empty list
    dividend, *divisors = numbers
    # Reject a zero divisor up front, before any division is performed.
    if 0 in divisors:
        raise ZeroDivisionError("Cannot divide by zero.")
    result = dividend
    for divisor in divisors:
        result /= divisor
    # With no divisors `result` is still the original int; convert so the
    # declared float return type always holds.
    return float(result)


Let's quickly check that the function works:

In [None]:
add([2, 2, 3])

In [None]:
# Agent that exposes the four arithmetic functions defined above as tools.
simple_math_agent = LlmAgent(
    model=MODEL,
    name="simple_math_agent",
    description="This agent performs basic arithmetic operations (addition, subtraction, multiplication, and division) on user-provided numbers, including ranges.",
    instruction="""
      I can perform addition, subtraction, multiplication, and division operations on numbers you provide.
      Tell me the numbers you want to operate on.
      For example, you can say 'add 3 5', 'multiply 2, 4 and 3', 'Subtract 10 from 20', 'Divide 10 by 2'.
      You can also provide a range: 'Multiply the numbers between 1 and 10'.
    """,
    # Optional generation settings, currently disabled.
    #generate_content_config=types.GenerateContentConfig(temperature=0.2),
    # Plain Python functions are passed directly as tools.
    tools=[add, subtract, multiply, divide],
)

In [None]:
APP_NAME = "Math_Agent_App"

session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=simple_math_agent, app_name=APP_NAME, session_service=session_service)

In [None]:
events = call_agent(runner, "Hello", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "what is three plus 9?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "multiply that by 2", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "Here is my math problem that is about apple counting. Let us start saying that I have three apple", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "Alice gave anoter 2 apples", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "how many apples do I have?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "Bob gave me 3 apples. how many apples do I have?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

### Example 4: Multi-agent system: Using Agent As Tool

**AgentTool:** You can embed the power of one agent within another by using AgentTool. This allows you to treat an entire agent as a tool within a parent agent. AgentTools are executed in an isolated environment, promoting safety and modularity. Combined with input/output schemas, this enables the creation of sophisticated interactions.

Concepts:
- AgentTool -- If you want another agent to handle a task and always return to the caller, wrap that `Agent` in an `AgentTool` so it can be used as a `Tool`.

Use case:
- Advanced math agent

In [None]:
agent_math_advanced_instruction = '''
I am an advanced math agent. I handle user query in the below steps:

1. I shall analyse the chat log to understand current question and make a math formula for it.
2. Break down a complex computation based on arithmetic priority and hand over to simple_math_agent for the calculation.
3. Note that simple_math_agent can only understand numbers, so I need to convert natural language expression of numbers into digits.

<example>
<input> alice gives us 3 apples, bob gives us 5 apples. They do this seven times. Then we eat four apples. How many apples do we have now? </input>
<think> what is (3+5) * 7 -4 </think>
<think>I need to first calculate (3+5) as the highest priority operation.</think>
<call_tool> pass (3+5) to simple_math_agent </call_tool>
<tool_response>8</tool_response>
<think> The question now becomes 8 * 7 - 4, and next highest operation is 8 * 7</think>
<call_tool> pass 8 * 7 to simple_math_agent </call_tool>
<tool_response>56</tool_response>
<think> The question now becomes 56 - 4, and next highest operation is 56 - 4</think>
<call_tool> pass 56 - 4 to simple_math_agent </call_tool>
<tool_response>52</tool_response>
<think>There is a single number, so it is the final answer.</think>
<output>The result of "(3+5) * 7 - 4" is 52</output>
</example>
'''

# Parent agent that delegates each arithmetic step to simple_math_agent.
agent_math_advanced = Agent(
    model=MODEL,
    name="agent_math_advanced",
    # The description names the tool exactly as it is registered
    # ("simple_math_agent"), matching the wording used in the instruction.
    description="The advanced math agent can break down a complex computation into multiple simple operations and use simple_math_agent to solve them.",
    instruction=agent_math_advanced_instruction,
    # Wrapping the agent in AgentTool makes it callable like any other tool.
    tools=[AgentTool(agent=simple_math_agent)],
    generate_content_config=types.GenerateContentConfig(temperature=0.2),
)

In [None]:
APP_NAME = "Adv_Math_Agent_App"

session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=agent_math_advanced, app_name=APP_NAME, session_service=session_service)

In [None]:
events = call_agent(runner, "who are you?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "what is 1 + (three+2) times 7 ", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
events = call_agent(runner, "how much is 10 / 5 + three times 5?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

### You can also leverage built-in tools like Google Search

In [None]:
# import Google Search built in tool
from google.adk.tools import google_search

In [None]:
# Agent that answers questions with the built-in Google Search tool.
search_agent = Agent(
    name="basic_search_agent",
    model=MODEL,
    description="Agent to answer questions using Google Search.",
    # The instruction also asks the model to cite the page it used.
    instruction="Answer user's questions by searching the internet. Also provide the website link for which you are reading the information from as reference at the end.",
    # google_search is a pre-built tool which allows the agent to perform Google searches.
    tools=[google_search]
)

In [None]:
APP_NAME = "basic_search_agent_app"

session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=search_agent, app_name=APP_NAME, session_service=session_service)

In [None]:
events = call_agent(runner, "How is the weather in Singapore today?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

### Input/Output Format Control

**Input Schema / Output Schema:** Ensure data consistency and validity by defining Pydantic schemas for your agent's input and output. The agent can then verify the input against the schema and guarantee that the generated output conforms to the specified structure using constrained decoding.

In [None]:
from typing import List
from pydantic import BaseModel, Field
from google.genai import types

json_response_config = types.GenerateContentConfig(
  response_mime_type="application/json",
)

# Structured result of the grammar agent: the user's original text, the
# corrected text, and two lists describing and explaining each correction.
# Each Field description is carried into the JSON schema generated below.
class OutputSchema(BaseModel):
    # The text exactly as the user sent it.
    original_query: str = Field(description="The original text from user.")
    # The same text after grammar corrections.
    corrected_text: str = Field(description="The corrected text.")
    # One entry per error found.
    errors: List[str] = Field(description="An array of descriptions of each error.")
    # One entry per correction made.
    explanations: List[str] = Field(description="An array of explanations for each correction.")

json_schema = OutputSchema.model_json_schema()
json_schema

In [None]:
# 1. The {json_schema} placeholder in the instruction is filled in by the
#    f-string when this cell runs; embedding the schema in the prompt is what
#    tells the model which structure to follow.
# 2. output_schema=OutputSchema adds a validation step after the model output.

agent_grammar = Agent(
    model=MODEL,
    name='agent_grammar',
    description="This agent corrects grammar mistakes in text provided by children, explains the errors in simple terms, and returns both the corrected text and the explanations.",
    instruction=f"""
        You are a friendly grammar helper for kids.  Analyze the following text,
        correct any grammar mistakes, and explain the errors in a way that a
        child can easily understand.  Don't just list the errors; explain them
        in a paragraph using simple but concise language.

        Output in a JSON object with the below schema:
        {json_schema}
    """,
    output_schema=OutputSchema,
    # json_response_config requests an "application/json" response MIME type.
    generate_content_config=json_response_config,
)

In [None]:
APP_NAME = "Grammer_Agent"

session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=agent_grammar, app_name=APP_NAME, session_service=session_service)

In [None]:
events = call_agent(runner, "whats the weather in new york?", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

## Part 4 (Bonus): Sample "Deep Research Agent"

A sample "Deep Research Agent" that consists of multiple collaborating agents. Adapted from ADK Samples: https://github.com/google/adk-samples/tree/main/python/agents/gemini-fullstack

Structure:
1. Overall Research Planner - interacts with the user and creates the overall research plan
2. Section Planner - plans the output structure (the sections and what content should go in each)
3. Section Researcher - performs deep research on each section
4. Composer - brings everything together to give you the final report

In [None]:
PLANNER_MODEL = "gemini-2.5-flash"
EXECUTION_MODEL = "gemini-2.5-flash"

#### Researching Part

In [None]:
# Some callbacks as utility tools
def collect_research_sources_callback(callback_context: CallbackContext) -> None:
    """Collects and organizes web-based research sources and their supported claims from agent events.

    This function processes the agent's `session.events` to extract web source details (URLs,
    titles, domains from `grounding_chunks`) and associated text segments with confidence scores
    (from `grounding_supports`). The aggregated source information and a mapping of URLs to short
    IDs are cumulatively stored in `callback_context.state`.

    Every call walks all session events again while starting from the sources already saved in
    state, so a claim is only appended when the same claim is not already recorded for that
    source. This keeps `supported_claims` free of duplicates across repeated calls.

    Args:
        callback_context (CallbackContext): The context object providing access to the agent's
            session events and persistent state.
    """
    session = callback_context._invocation_context.session
    url_to_short_id = callback_context.state.get("url_to_short_id", {})
    sources = callback_context.state.get("sources", {})
    # Short ids are numbered consecutively, continuing after the known URLs.
    id_counter = len(url_to_short_id) + 1
    for event in session.events:
        metadata = event.grounding_metadata
        if not (metadata and metadata.grounding_chunks):
            continue

        # Maps a chunk's index within this event to the short id of its source.
        chunks_info = {}
        for idx, chunk in enumerate(metadata.grounding_chunks):
            web = chunk.web
            if not web:
                continue
            url = web.uri
            if url not in url_to_short_id:
                short_id = f"src-{id_counter}"
                url_to_short_id[url] = short_id
                sources[short_id] = {
                    "short_id": short_id,
                    # The original picked the domain only when it equalled the
                    # title, so the stored value was always the title itself.
                    "title": web.title,
                    "url": url,
                    "domain": web.domain,
                    "supported_claims": [],
                }
                id_counter += 1
            chunks_info[idx] = url_to_short_id[url]

        for support in metadata.grounding_supports or []:
            confidence_scores = support.confidence_scores or []
            chunk_indices = support.grounding_chunk_indices or []
            text_segment = support.segment.text if support.segment else ""
            for i, chunk_idx in enumerate(chunk_indices):
                if chunk_idx not in chunks_info:
                    continue
                # Fall back to 0.5 when no score is given for this chunk index.
                confidence = (
                    confidence_scores[i] if i < len(confidence_scores) else 0.5
                )
                claim = {
                    "text_segment": text_segment,
                    "confidence": confidence,
                }
                claims = sources[chunks_info[chunk_idx]]["supported_claims"]
                # Skip claims recorded by an earlier call over the same events.
                if claim not in claims:
                    claims.append(claim)
    callback_context.state["url_to_short_id"] = url_to_short_id
    callback_context.state["sources"] = sources


def citation_replacement_callback(
    callback_context: CallbackContext,
) -> types.Content:
    """Replaces citation tags in a report with Markdown-formatted links.

    Reads 'final_cited_report' from the context state and converts tags like
    `<cite source="src-N"/>` into Markdown hyperlinks using the entries in
    `callback_context.state["sources"]`. Tags whose id is not a known source
    are logged and removed. Whitespace directly before `.`, `,`, `;` or `:`
    is then stripped, and the result is stored in the state under
    'final_report_with_citations'.

    Args:
        callback_context (CallbackContext): Contains the report and source information.

    Returns:
        types.Content: The processed report with Markdown citation links.
    """
    # `or` guards against the keys being present but set to None.
    final_report = callback_context.state.get("final_cited_report") or ""
    sources = callback_context.state.get("sources") or {}

    def tag_replacer(match: re.Match) -> str:
        """Return the Markdown link for one matched tag, or "" if its id is unknown."""
        short_id = match.group(1)
        if not (source_info := sources.get(short_id)):
            logging.warning(
                "Invalid citation tag found and removed: %s", match.group(0)
            )
            return ""
        # Source entries always carry a "title" key, so a .get() default would
        # never apply; chain with `or` so a None/empty title falls back to the
        # domain and finally to the short id.
        display_text = (
            source_info.get("title") or source_info.get("domain") or short_id
        )
        return f" [{display_text}]({source_info['url']})"

    processed_report = re.sub(
        r'<cite\s+source\s*=\s*["\']?\s*(src-\d+)\s*["\']?\s*/>',
        tag_replacer,
        final_report,
    )
    # Removing tags / inserting links can leave a gap before punctuation.
    processed_report = re.sub(r"\s+([.,;:])", r"\1", processed_report)
    callback_context.state["final_report_with_citations"] = processed_report
    return types.Content(parts=[types.Part(text=processed_report)])

In [None]:
# Outline agent: reads the plan from the 'research_plan' state key (see the
# instruction) and saves its markdown outline under the `report_sections` key.
section_planner = LlmAgent(
    model=EXECUTION_MODEL,
    name="section_planner",
    description="Breaks down the research plan into a structured markdown outline of report sections.",
    instruction="""
    You are an expert report architect. Using the research topic and the plan from the 'research_plan' state key, design a logical structure for the final report.
    Note: Ignore all the tag names ([MODIFIED], [NEW], [RESEARCH], [DELIVERABLE]) in the research plan.
    Your task is to create a markdown outline with 4-6 distinct sections that cover the topic comprehensively without overlap.
    You can use any markdown format you prefer, but here's a suggested structure:
    # Section Name
    A brief overview of what this section covers
    Feel free to add subsections or bullet points if needed to better organize the content.
    Make sure your outline is clear and easy to follow.
    Do not include a "References" or "Sources" section in your outline. Citations will be handled in-line.
    """,
    output_key="report_sections",
)

In [None]:
# Research agent: has the `google_search` tool, saves its output under the
# `section_research_findings` state key, and runs
# collect_research_sources_callback after it finishes.
section_researcher = LlmAgent(
    model=EXECUTION_MODEL,
    name="section_researcher",
    description="Performs the crucial first pass of web research.",
    # Disabled option: a BuiltInPlanner configured with include_thoughts=True.
    #planner=BuiltInPlanner(
    #    thinking_config=genai_types.ThinkingConfig(include_thoughts=True)
    #),
    instruction="""
    You are a highly capable and diligent research and synthesis agent. Your comprehensive task is to execute a provided research plan with **absolute fidelity**, first by gathering necessary information, and then by synthesizing that information into specified outputs.

    You will be provided with a sequential list of research plan goals, stored in the `research_plan` state key. Each goal will be clearly prefixed with its primary task type: `[RESEARCH]` or `[DELIVERABLE]`.

    Your execution process must strictly adhere to these two distinct and sequential phases:

    ---

    **Phase 1: Information Gathering (`[RESEARCH]` Tasks)**

    *   **Execution Directive:** You **MUST** systematically process every goal prefixed with `[RESEARCH]` before proceeding to Phase 2.
    *   For each `[RESEARCH]` goal:
        *   **Query Generation:** Formulate a comprehensive set of 2-3 targeted search queries. These queries must be expertly designed to broadly cover the specific intent of the `[RESEARCH]` goal from multiple angles.
        *   **Execution:** Utilize the `google_search` tool to execute **all** generated queries for the current `[RESEARCH]` goal.
        *   **Summarization:** Synthesize the search results into a detailed, coherent summary that directly addresses the objective of the `[RESEARCH]` goal.
        *   **Internal Storage:** Store this summary, clearly tagged or indexed by its corresponding `[RESEARCH]` goal, for later and exclusive use in Phase 2. You **MUST NOT** lose or discard any generated summaries.

    ---

    **Phase 2: Synthesis and Output Creation (`[DELIVERABLE]` Tasks)**

    *   **Execution Prerequisite:** This phase **MUST ONLY COMMENCE** once **ALL** `[RESEARCH]` goals from Phase 1 have been fully completed and their summaries are internally stored.
    *   **Execution Directive:** You **MUST** systematically process **every** goal prefixed with `[DELIVERABLE]`. For each `[DELIVERABLE]` goal, your directive is to **PRODUCE** the artifact as explicitly described.
    *   For each `[DELIVERABLE]` goal:
        *   **Instruction Interpretation:** You will interpret the goal's text (following the `[DELIVERABLE]` tag) as a **direct and non-negotiable instruction** to generate a specific output artifact.
            *   *If the instruction details a table (e.g., "Create a Detailed Comparison Table in Markdown format"), your output for this step **MUST** be a properly formatted Markdown table utilizing columns and rows as implied by the instruction and the prepared data.*
            *   *If the instruction states to prepare a summary, report, or any other structured output, your output for this step **MUST** be that precise artifact.*
        *   **Data Consolidation:** Access and utilize **ONLY** the summaries generated during Phase 1 (`[RESEARCH]` tasks`) to fulfill the requirements of the current `[DELIVERABLE]` goal. You **MUST NOT** perform new searches.
        *   **Output Generation:** Based on the specific instruction of the `[DELIVERABLE]` goal:
            *   Carefully extract, organize, and synthesize the relevant information from your previously gathered summaries.
            *   Must always produce the specified output artifact (e.g., a concise summary, a structured comparison table, a comprehensive report, a visual representation, etc.) with accuracy and completeness.
        *   **Output Accumulation:** Maintain and accumulate **all** the generated `[DELIVERABLE]` artifacts. These are your final outputs.

    ---

    **Final Output:** Your final output will comprise the complete set of processed summaries from `[RESEARCH]` tasks AND all the generated artifacts from `[DELIVERABLE]` tasks, presented clearly and distinctly.
    """,
    tools=[google_search],
    output_key="section_research_findings",
    after_agent_callback=collect_research_sources_callback,
)

In [None]:
# Composer agent: its instruction references the state keys `research_plan`,
# `section_research_findings`, `sources` and `report_sections` through
# `{...}` placeholders, and its output is saved under `final_cited_report`.
# citation_replacement_callback then turns the <cite .../> tags into links.
report_composer = LlmAgent(
    model=EXECUTION_MODEL,
    name="report_composer_with_citations",
    # NOTE(review): presumably keeps prior conversation contents out of the
    # request so only the templated state is used — confirm against ADK docs.
    include_contents="none",
    description="Transforms research data and a markdown outline into a final, cited report.",
    instruction="""
    Transform the provided data into a polished, professional, and meticulously cited research report.

    ---
    ### INPUT DATA
    *   Research Plan: `{research_plan}`
    *   Research Findings: `{section_research_findings}`
    *   Citation Sources: `{sources}`
    *   Report Structure: `{report_sections}`

    ---
    ### CRITICAL: Citation System
    To cite a source, you MUST insert a special citation tag directly after the claim it supports.

    **The only correct format is:** `<cite source="src-ID_NUMBER" />`

    ---
    ### Final Instructions
    Generate a comprehensive report using ONLY the `<cite source="src-ID_NUMBER" />` tag system for all citations.
    The final report must strictly follow the structure provided in the **Report Structure** markdown outline.
    Do not include a "References" or "Sources" section; all citations must be in-line.
    """,
    output_key="final_cited_report",
    after_agent_callback=citation_replacement_callback,
)

In [None]:
# SequentialAgent wiring the three agents in this order:
# outline (section_planner) -> research (section_researcher) -> report (report_composer).
research_pipeline = SequentialAgent(
    sub_agents=[section_planner, section_researcher, report_composer],
    name="research_pipeline",
    description="Executes a pre-approved research plan. It performs iterative research, evaluation, and composes a final, cited report.",
)

#### Planning part

In [None]:
# Plan-drafting agent. It is wrapped in an AgentTool by
# interactive_planner_agent rather than being used as a sub-agent.
plan_generator = LlmAgent(
    model=EXECUTION_MODEL,
    name="plan_generator",
    description="Generates or refine the existing 5 line action-oriented research plan, using minimal search only for topic clarification.",
    # This is an f-string: the doubled braces below render as the literal text
    # `{ research_plan? }` (presumably an ADK placeholder for an optional state
    # key — confirm against ADK docs), and the "Current date" value is computed
    # once, when this statement runs.
    instruction=f"""
    You are a research strategist. Your job is to create a high-level RESEARCH PLAN, not a summary. If there is already a RESEARCH PLAN in the session state,
    improve upon it based on the user feedback.

    RESEARCH PLAN(SO FAR):
    {{ research_plan? }}

    **GENERAL INSTRUCTION: CLASSIFY TASK TYPES**
    Your plan must clearly classify each goal for downstream execution. Each bullet point should start with a task type prefix:
    - **`[RESEARCH]`**: For goals that primarily involve information gathering, investigation, analysis, or data collection (these require search tool usage by a researcher).
    - **`[DELIVERABLE]`**: For goals that involve synthesizing collected information, creating structured outputs (e.g., tables, charts, summaries, reports), or compiling final output artifacts (these are executed AFTER research tasks, often without further search).

    **INITIAL RULE: Your initial output MUST start with a bulleted list of 5 action-oriented research goals or key questions, followed by any *inherently implied* deliverables.**
    - All initial 5 goals will be classified as `[RESEARCH]` tasks.
    - A good goal for `[RESEARCH]` starts with a verb like "Analyze," "Identify," "Investigate."
    - A bad output is a statement of fact like "The event was in April 2024."
    - **Proactive Implied Deliverables (Initial):** If any of your initial 5 `[RESEARCH]` goals inherently imply a standard output or deliverable (e.g., a comparative analysis suggesting a comparison table, or a comprehensive review suggesting a summary document), you MUST add these as additional, distinct goals immediately after the initial 5. Phrase these as *synthesis or output creation actions* (e.g., "Create a summary," "Develop a comparison," "Compile a report") and prefix them with `[DELIVERABLE][IMPLIED]`.

    **REFINEMENT RULE**:
    - **Integrate Feedback & Mark Changes:** When incorporating user feedback, make targeted modifications to existing bullet points. Add `[MODIFIED]` to the existing task type and status prefix (e.g., `[RESEARCH][MODIFIED]`). If the feedback introduces new goals:
        - If it's an information gathering task, prefix it with `[RESEARCH][NEW]`.
        - If it's a synthesis or output creation task, prefix it with `[DELIVERABLE][NEW]`.
    - **Proactive Implied Deliverables (Refinement):** Beyond explicit user feedback, if the nature of an existing `[RESEARCH]` goal (e.g., requiring a structured comparison, deep dive analysis, or broad synthesis) or a `[DELIVERABLE]` goal inherently implies an additional, standard output or synthesis step (e.g., a detailed report following a summary, or a visual representation of complex data), proactively add this as a new goal. Phrase these as *synthesis or output creation actions* and prefix them with `[DELIVERABLE][IMPLIED]`.
    - **Maintain Order:** Strictly maintain the original sequential order of existing bullet points. New bullets, whether `[NEW]` or `[IMPLIED]`, should generally be appended to the list, unless the user explicitly instructs a specific insertion point.
    - **Flexible Length:** The refined plan is no longer constrained by the initial 5-bullet limit and may comprise more goals as needed to fully address the feedback and implied deliverables.

    **TOOL USE IS STRICTLY LIMITED:**
    Your goal is to create a generic, high-quality plan *without searching*.
    Only use `google_search` if a topic is ambiguous or time-sensitive and you absolutely cannot create a plan without a key piece of identifying information.
    You are explicitly forbidden from researching the *content* or *themes* of the topic. That is the next agent's job. Your search is only to identify the subject, not to investigate it.
    Current date: {datetime.datetime.now().strftime("%Y-%m-%d")}
    """,
    tools=[google_search],
)

In [None]:
# Top-level agent the user talks to. It exposes plan_generator as a tool
# (via AgentTool), has research_pipeline as its only sub-agent, and saves its
# own output under the `research_plan` state key.
interactive_planner_agent = LlmAgent(
    name="interactive_planner_agent",
    model=PLANNER_MODEL,
    description="The primary research assistant. It collaborates with the user to create a research plan, and then executes it upon approval.",
    # f-string: the "Current date" value is computed once, when this statement runs.
    instruction=f"""
    You are a research planning assistant. Your primary function is to convert ANY user request into a research plan.

    **CRITICAL RULE: Never answer a question directly or refuse a request.** Your one and only first step is to use the `plan_generator` tool to propose a research plan for the user's topic.
    If the user asks a question, you MUST immediately call `plan_generator` to create a plan to answer the question.

    Your workflow is:
    1.  **Plan:** Use `plan_generator` to create a draft plan and present it to the user.
    2.  **Refine:** Incorporate user feedback until the plan is approved.
    3.  **Execute:** Once the user gives EXPLICIT approval (e.g., "looks good, run it"), you MUST delegate the task to the `research_pipeline` agent, passing the approved plan.

    Current date: {datetime.datetime.now().strftime("%Y-%m-%d")}
    Do not perform any research yourself. Your job is to Plan, Refine, and Delegate.
    """,
    sub_agents=[research_pipeline],
    tools=[AgentTool(plan_generator)],
    output_key="research_plan",
)

#### Interact with it

In [None]:
session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name="deep_research_agent", 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
runner = Runner(agent=interactive_planner_agent, app_name="deep_research_agent", session_service=session_service)

In [None]:
# Send a plain greeting through the runner and pretty-print the resulting events.
events = call_agent(runner, "hello", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
# Ask the agent to describe its capabilities (same user and session as before).
events = call_agent(runner, "What can you do", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
# Give the agent an actual research topic in the same session.
events = call_agent(runner, "Help me research the weather trend in Singapore", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
# Send feedback asking for a shorter plan.
events = call_agent(runner, "Simplify it, 3 research point and 1 deliverable will do", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

In [None]:
# Send the go-ahead message; the planner's instruction tells it to delegate to
# `research_pipeline` once the user explicitly approves.
events = call_agent(runner, "Lets go", user_id=USER_ID, session_id=SESSION_ID)
pprint_events(events)

#### For more complex pattern 

You may consider adding an "evaluator" to check each research output and, where needed, ask for the search to be redone.

This can be done using "LoopAgent" class in the ADK, with sub-agents conducting 1) Evaluation, and 2) Research

## Part 5: Productionise your Agent: Hosting the Agent on Agent Engine

Agent Engine handles all the session & resource management for you, so you can focus on building the agent logic

### Put agent into an ADK App first

In [None]:
# Wrap the top-level planner agent in an AdkApp; this `app` object is what is
# passed to agent_engines.create() in the hosting step.
app = AdkApp(
    agent=interactive_planner_agent,
    # Tracing is left disabled here.
    #enable_tracing=True,
)

Note: If you see an error on "Cloud Trace" when running the above cell - please ignore for this workshop

### Agent Engine hosting

The following code might take 5–10 minutes to finish.

Agent Engine helps manage sessions and the runtime, so you can focus on building the agent itself instead of worrying about session management.

In [None]:
# Deploy the AdkApp to Agent Engine and keep the returned handle in `remote_app`.
remote_app = agent_engines.create(
    app,
    display_name="deep_research_agent",
    description="Deep Research Agent that uses ADK",
    requirements=["google-cloud-aiplatform[agent_engines,adk]"],
)

In [None]:
# Show the deployed app handle as this cell's output.
remote_app

In [None]:
# Create a session on the deployed app for the fixed user id "u_123";
# call_ae_agent reads remote_session["id"] from this object.
remote_session = remote_app.create_session(user_id="u_123")
remote_session

In [None]:
def call_ae_agent(query):
    """Send `query` to the deployed agent and print every streamed event.

    Streams the reply from the module-level `remote_app` for user "u_123" in
    `remote_session`. For each part of each event it prints the text, the
    function call, or the function response it carries. An event with no
    recognisable part is printed raw so nothing is silently dropped. A blank
    line is printed after every event.

    Args:
        query: The user message to send to the remote agent.
    """
    for event in remote_app.stream_query(
        user_id="u_123",
        session_id=remote_session["id"],
        message=query,
    ):
        content = event.get("content") or {}
        handled = False
        # Inspect every part, not just the first: one event can carry several.
        for part in content.get("parts") or []:
            if part.get("text") is not None:
                print(part["text"])
            elif part.get("function_call"):
                # Key off the function call itself rather than
                # "thought_signature", which need not accompany it.
                print("Calling function with following details: ")
                print(part["function_call"])
            elif part.get("function_response"):
                print("Function response as follows: ")
                print(part["function_response"])
            else:
                continue
            handled = True
        if not handled:
            print(event)
        print("\n")

In [None]:
# Send a greeting to the deployed agent.
call_ae_agent("hello")

In [None]:
# Give the deployed agent a research topic.
call_ae_agent("Help me research weather trends in Singapore")

In [None]:
# Send an approval message in the same remote session.
call_ae_agent("Looks good to me")

### Don't forget to release the resource

In [None]:
# Delete the deployed app.
# NOTE(review): force=True presumably also removes child resources such as
# sessions — confirm against the Agent Engine documentation.
remote_app.delete(force=True)