In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Get started

### Install required packages

First, we'll install the necessary packages.


In [None]:
# Install the A2A SDK and the Vertex AI SDK with the Agent Engine + ADK extras.
# NOTE(review): --force-reinstall makes pip reinstall every package (including
# dependencies) on each run, which is slow; --quiet is passed twice, which is
# redundant but harmless. You may need to restart the kernel after installing.
%pip install --upgrade --quiet "a2a-sdk>=0.3.4" --force-reinstall --quiet
%pip install --upgrade --quiet "google-cloud-aiplatform[agent_engines, adk]>=1.112.0" --force-reinstall --quiet 

### Authenticate your notebook environment (Colab only)

If you're running this notebook on Google Colab, run the cell below to authenticate your environment.

In [None]:
# Authenticate the Colab runtime's user. Outside Colab this is a no-op, and the
# notebook relies on Application Default Credentials / the .env file instead.
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
# Resolve project / region / bucket settings from the environment (a local .env
# file is loaded first), then initialize the Vertex AI SDK and client.
import os

import vertexai
from google.genai import types

from dotenv import load_dotenv

load_dotenv()  # Populate os.environ from a local .env file, if one exists.


# Use the environment variable if the user doesn't provide Project ID.
PROJECT_ID = os.environ.get("PROJECT_ID") # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
if not PROJECT_ID:
    # Fail fast instead of continuing with the literal project id "None".
    raise ValueError(
        "No Google Cloud project configured: set PROJECT_ID or GOOGLE_CLOUD_PROJECT."
    )

# GOOGLE_CLOUD_REGION wins; GOOGLE_CLOUD_LOCATION is also honored because a
# later cell of this notebook reads that variable for the same setting.
LOCATION = os.environ.get("GOOGLE_CLOUD_REGION") or os.environ.get(
    "GOOGLE_CLOUD_LOCATION", "us-central1"
)

BUCKET_NAME = f"{PROJECT_ID}-bucket"   # @param {type: "string", placeholder: "[your-bucket-name]", isTemplate: true}
if not BUCKET_NAME or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID

BUCKET_URI = f"gs://{BUCKET_NAME}"

# Uncomment to create the staging bucket if it does not exist yet:
# !gsutil mb -l $LOCATION -p $PROJECT_ID $BUCKET_URI

# Initialize Vertex AI session
vertexai.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

# Initialize the Gen AI client using http_options.
# The parameter customizes how the Vertex AI client communicates with Google
# Cloud's backend services; v1beta1 is used here to access pre-release features.
client = vertexai.Client(
    project=PROJECT_ID,
    location=LOCATION,
    http_options=types.HttpOptions(
        api_version="v1beta1", base_url=f"https://{LOCATION}-aiplatform.googleapis.com/"
    ),
)

In [None]:
# Numeric project number (distinct from PROJECT_ID); the Agent Engine resource
# name built near the end of the notebook uses it.
PROJECT_NUMBER = os.getenv("PROJECT_NUMBER")
print("PROJECT_NUMBER: " + f"{PROJECT_NUMBER}")

### Import libraries

Here, we're importing all the necessary Python classes and functions we'll use throughout the notebook.

In [None]:
# Helpers
import json
import logging
import time
from collections.abc import Awaitable, Callable
from datetime import datetime
from pprint import pprint
from typing import Any, NoReturn

import httpx
from IPython.display import Markdown, display
from google.auth import default
from google.auth.transport.requests import Request as req
from starlette.requests import Request

logging.root.setLevel(logging.INFO)  # Show INFO-level logs from all libraries in cell output.


# A2A
from a2a.client import ClientConfig, ClientFactory
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import (
    AgentSkill,
    Message,
    Part,
    Role,
    TaskState,
    TaskQueryParams,
    TextPart,
    TransportProtocol,
    UnsupportedOperationError,
)
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError

# ADK
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search_tool
from google.genai import types

# Agent Engine
from vertexai.preview.reasoning_engines import A2aAgent
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card

### Helpers

These are simple utility functions to make our lives easier, especially for local testing. They help create mock HTTP requests (`build_post_request`, `build_get_request`) and fetch authentication tokens (`get_bearer_token`).

In [None]:
def receive_wrapper(data: dict) -> Callable[[], Awaitable[dict]]:
    """Wrap ``data`` in an ASGI ``receive`` coroutine function for tests.

    Each call of the returned coroutine yields one complete ``http.request``
    message (``more_body`` is False) whose body is ``data`` as UTF-8 JSON.
    """

    async def _receive():
        payload = json.dumps(data)
        message = {
            "type": "http.request",
            "body": payload.encode("utf-8"),
            "more_body": False,
        }
        return message

    return _receive


def build_post_request(
    data: dict[str, Any] | None = None, path_params: dict[str, str] | None = None
) -> Request:
    """Build a mock Starlette ``Request`` for a JSON POST.

    Args:
        data: JSON-serializable payload delivered as the request body
            (``None`` is sent as the JSON literal ``null``).
        path_params: Optional route parameters exposed as ``request.path_params``.

    Returns:
        A ``starlette.requests.Request`` whose ``receive`` callable yields the
        JSON-encoded body in a single message.
    """
    scope = {
        "type": "http",
        "http_version": "1.1",
        # The ASGI HTTP scope defines method/path/query_string; include them so
        # the mock's .method, .url and .query_params accessors work.
        "method": "POST",
        "path": "/",
        "query_string": b"",
        "headers": [(b"content-type", b"application/json")],
        "app": None,
    }
    if path_params:
        scope["path_params"] = path_params
    return Request(scope, receive_wrapper(data))


def build_get_request(path_params: dict[str, str]) -> Request:
    """Build a mock Starlette ``Request`` for a body-less GET.

    Args:
        path_params: Route parameters exposed as ``request.path_params``;
            an empty/falsy value leaves them unset.

    Returns:
        A ``starlette.requests.Request`` whose ``receive`` callable reports an
        ``http.disconnect`` message (there is no body to read).
    """
    scope = {
        "type": "http",
        "http_version": "1.1",
        # The ASGI HTTP scope defines method/path/headers; include them so the
        # mock's .method, .url and .headers accessors work.
        "method": "GET",
        "path": "/",
        "query_string": b"",
        "headers": [],
        "app": None,
    }
    if path_params:
        scope["path_params"] = path_params

    async def receive():
        return {"type": "http.disconnect"}

    return Request(scope, receive)


def get_bearer_token() -> str | None:
    """Return an OAuth2 access token from Application Default Credentials.

    The credentials are scoped to ``cloud-platform`` and refreshed before the
    token is read. Any failure is reported with a hint about
    ``gcloud auth application-default login`` and ``None`` is returned
    instead of raising.
    """
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    try:
        creds, _project = default(scopes=scopes)
        # `req` is google.auth's transport Request, imported under an alias so
        # it does not collide with starlette.requests.Request.
        creds.refresh(req())
        return creds.token
    except Exception as err:
        print(f"Error getting credentials: {err}")
        print(
            "Please ensure you have authenticated with 'gcloud auth application-default login'."
        )
        return None

In [None]:
# Base URLs of the two remote A2A agents (weather and cocktail).
WEA_AGENT_URL, CT_AGENT_URL = (
    os.getenv(var_name) for var_name in ("WEA_AGENT_URL", "CT_AGENT_URL")
)
print("WEA_AGENT_URL: " + f"{WEA_AGENT_URL}")
print("CT_AGENT_URL: " + f"{CT_AGENT_URL}")

## Test ADK root agent

In [None]:
from google.auth import default
from google.auth.transport.requests import Request as AuthRequest
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent

In [None]:
class GoogleAuth(httpx.Auth):
    """httpx ``Auth`` that attaches a Google Cloud bearer token to each request.

    Credentials come from Application Default Credentials and are refreshed
    lazily: on the first request, and whenever the cached token is no longer
    valid.

    Args:
        scopes: OAuth scopes to request. Defaults to ``cloud-platform``.
    """

    def __init__(self, scopes=None):
        if scopes is None:
            scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        self.credentials, self.project = default(scopes=scopes)
        self.auth_request = AuthRequest()

    def auth_flow(self, request):
        """Refresh the token if needed, then set the ``Authorization`` header."""
        # Newly loaded ADC credentials typically have no token yet, so this
        # branch also runs on the first request, not only after expiry.
        if not self.credentials.valid:
            logging.info("Google credentials missing or expired; refreshing.")
            self.credentials.refresh(self.auth_request)

        request.headers["Authorization"] = f"Bearer {self.credentials.token}"
        yield request

In [None]:
class GoogleAuthRefresh(httpx.Auth):
    """httpx ``Auth`` that signs requests with an ADC bearer token.

    A token is fetched eagerly at construction time. Before each request the
    credentials are refreshed again if they are no longer valid.

    Args:
        scopes: OAuth scopes passed to ``google.auth.default``.
    """

    def __init__(self, scopes):
        creds, _project = default(scopes=scopes)
        self.credentials = creds
        self.transport_request = req()
        # Eager refresh so the very first request already carries a token.
        self.credentials.refresh(self.transport_request)

    def auth_flow(self, request):
        creds = self.credentials
        if not creds.valid:
            creds.refresh(self.transport_request)
        token = creds.token
        request.headers["Authorization"] = f"Bearer {token}"
        yield request


### Local testing

In [None]:
# Authenticated async HTTP client for the local RemoteA2aAgent tests: JSON
# content type, 120 s timeout, bearer token supplied by GoogleAuth.
httpx_client = httpx.AsyncClient(
    auth=GoogleAuth(),
    headers={"Content-Type": "application/json"},
    timeout=120,
)

In [None]:
# A2A client factory for local testing: reuses the authenticated httpx client
# and supports the JSON-RPC and HTTP+JSON transports, in that order.
config = ClientConfig(
    supported_transports=[TransportProtocol.jsonrpc, TransportProtocol.http_json],
    httpx_client=httpx_client,
)
client_factory = ClientFactory(config)

In [None]:
# Read the remote agent endpoints and fail fast if either is missing. Otherwise
# the agent-card URLs built below would silently become "None/v1/card".
WEA_AGENT_URL = os.environ.get("WEA_AGENT_URL")
CT_AGENT_URL = os.environ.get("CT_AGENT_URL")
print(f"WEA_AGENT_URL: {WEA_AGENT_URL}")
print(f"CT_AGENT_URL: {CT_AGENT_URL}")

_missing_urls = [
    name
    for name, value in (("WEA_AGENT_URL", WEA_AGENT_URL), ("CT_AGENT_URL", CT_AGENT_URL))
    if not value
]
if _missing_urls:
    raise ValueError(f"Missing environment variable(s): {', '.join(_missing_urls)}")

# Drop a trailing "/" so f"{url}/v1/card" never produces "//v1/card".
WEA_AGENT_URL = WEA_AGENT_URL.rstrip("/")
CT_AGENT_URL = CT_AGENT_URL.rstrip("/")

In [None]:
# Local proxy for the remote weather agent; its agent card is read from the
# deployed agent's /v1/card endpoint.
weather_agent_remoteA2a = RemoteA2aAgent(
    name="weather_assistant",
    agent_card=f"{WEA_AGENT_URL}/v1/card",
    description=(
        "\nAn agent that gathers the necessary information for weather information\n    "
    ),
    a2a_client_factory=client_factory,
)

In [None]:
# Local proxy for the remote cocktail agent; its agent card is read from the
# deployed agent's /v1/card endpoint.
cocktail_agent_remoteA2a = RemoteA2aAgent(
    name="cocktail_assistant",
    agent_card=f"{CT_AGENT_URL}/v1/card",
    description=(
        "\nAn agent that gathers the necessary information for cocktail information\n    "
    ),
    a2a_client_factory=client_factory,
)

In [None]:
# System instruction for the routing (host) agent. The sub-agent names it
# mentions must match the `name=` given to the RemoteA2aAgent instances, and it
# must only advertise topics that some sub-agent can actually handle.
root_instruction = """
**Role:** You are a Virtual Assistant acting as a Request Router. You can help users with questions about cocktails and weather.

**Primary Goal:** Analyze user requests and route them to the correct specialist sub-agent.

**Capabilities & Routing:**
* **Greetings:** If the user greets you, respond warmly and directly.
* **Cocktails:** Route requests about cocktails, drinks, recipes, or ingredients to `cocktail_assistant`.
* **Weather:** Route requests about checking the weather to `weather_assistant`.
* **Out-of-Scope:** If the request is unrelated (e.g., general knowledge, math), state directly that you cannot assist with that topic.

**Key Directives:**
* **Delegate Immediately:** Once a suitable sub-agent is identified, route the request without asking permission.
* **Do Not Answer Delegated Topics:** You must **not** attempt to answer questions related to cocktails or weather yourself. Always delegate.
* **Formatting:** Format your final response to the user using Markdown for readability.
"""

In [None]:
# Local host agent that routes each user request to one of the two remote
# A2A sub-agents according to root_instruction.
root_agent = LlmAgent(
    name="host_agent",
    model="gemini-2.5-flash",
    description=(
        "This agent orchestrates the decomposition of the user request into"
        " tasks that can be performed by the child agents."
    ),
    instruction=root_instruction,
    sub_agents=[weather_agent_remoteA2a, cocktail_agent_remoteA2a],
)

In [None]:
from vertexai import agent_engines

# Wrap the agent in an AdkApp, which provides the async session and
# streaming-query helpers used in the next cells.
app = agent_engines.AdkApp(agent=root_agent)

In [None]:
# Create a local session to maintain conversation history
session = await app.async_create_session(user_id="u_123")
print(session)

In [None]:
events = []
async for event in app.async_stream_query(
    user_id="u_123",
    session_id=session.id,
    message="whats the weather in new york, ny",
):
    events.append(event)

# The full event stream shows the agent's thought process
print("--- Full Event Stream ---")
for event in events:
    print(event)

# For quick tests, you can extract just the final text response
final_text_responses = [
    e for e in events
    if e.get("content", {}).get("parts", [{}])[0].get("text")
    and not e.get("content", {}).get("parts", [{}])[0].get("function_call")
]
if final_text_responses:
    print("\n--- Final Response ---")
    print(final_text_responses[0]["content"]["parts"][0]["text"])

## Deploy to Agent Engine

Reference: https://github.com/google-cloud-japan/sa-ml-workshop/blob/main/blog/Agentic-workflow-AgentEngine-A2A.ipynb

Currently, deploying a root agent that uses `RemoteA2aAgent` instances as sub-agents requires some workarounds to keep the root agent instance serializable.

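Below, we define wrapper classes for `ClientFactory` and `RemoteA2aAgent`. Each creates its authenticated `httpx.AsyncClient` lazily, on first use, instead of receiving it at construction time. As a result, the client object is not held by the agent when it is serialized for deployment.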

## Wrapper classes

In [None]:
class MyClientFactory(ClientFactory):
    """A2A ``ClientFactory`` that builds its authenticated httpx client lazily.

    The module-level ``client_factory`` below is configured without an
    ``httpx_client``. So, until ``create`` runs, no client object is held when
    the agent tree is serialized for deployment; ``create`` builds one on first
    use.
    """

    def create(self, card, consumers=None, interceptors=None):
        """Create an A2A client for ``card``, building the httpx client on first call.

        Args:
            card: Agent card of the remote agent, forwarded to ``ClientFactory.create``.
            consumers: Optional consumers, forwarded unchanged.
            interceptors: Optional interceptors, forwarded unchanged.
        """
        if not self._config.httpx_client:
            # Authenticated client: JSON content type, 60 s timeout, and a
            # bearer token from GoogleAuthRefresh (cloud-platform scope).
            self._config.httpx_client=httpx.AsyncClient(
                timeout=60,
                headers={'Content-Type': 'application/json'},
                auth=GoogleAuthRefresh(scopes=['https://www.googleapis.com/auth/cloud-platform'])
            )
            # NOTE(review): calls the private ClientFactory._register_defaults
            # again after setting the client, presumably so the default
            # transports pick it up; verify against the installed a2a-sdk.
            self._register_defaults(self._config.supported_transports)
        return super().create(card, consumers, interceptors)


class MyRemoteA2aAgent(RemoteA2aAgent):
    """``RemoteA2aAgent`` whose authenticated httpx client is built lazily.

    NOTE(review): overrides the private ``_ensure_httpx_client`` hook of
    ``RemoteA2aAgent``; confirm its name/signature against the installed ADK.
    """

    async def _ensure_httpx_client(self) -> httpx.AsyncClient:
        """Return ``self._httpx_client``, creating an authenticated one on first call."""
        if not self._httpx_client:
            # Same client settings as MyClientFactory.create.
            self._httpx_client=httpx.AsyncClient(
                timeout=60,
                headers={'Content-Type': 'application/json'},
                auth=GoogleAuthRefresh(scopes=['https://www.googleapis.com/auth/cloud-platform'])
            )
        return self._httpx_client


# Factory shared by both wrapped sub-agents. No httpx_client is passed, so
# MyClientFactory.create builds one lazily. HTTP+JSON is listed first together
# with use_client_preference=True; presumably this makes the client-side order
# decide the transport. Confirm against the a2a-sdk ClientConfig docs.
client_factory = MyClientFactory(
    ClientConfig(
        supported_transports=[TransportProtocol.http_json, TransportProtocol.jsonrpc],
        use_client_preference=True,
    )
)

In [None]:
# Deployable weather sub-agent: same settings as the local one, but built on
# the wrapper classes so its httpx client is created lazily.
weather_agent_remoteA2a = MyRemoteA2aAgent(
    name="weather_assistant",
    agent_card=f"{WEA_AGENT_URL}/v1/card",
    description=(
        "\nAn agent that gathers the necessary information for weather information\n    "
    ),
    a2a_client_factory=client_factory,
)

In [None]:
# Deployable cocktail sub-agent: same settings as the local one, but built on
# the wrapper classes so its httpx client is created lazily.
cocktail_agent_remoteA2a = MyRemoteA2aAgent(
    name="cocktail_assistant",
    agent_card=f"{CT_AGENT_URL}/v1/card",
    description=(
        "\nAn agent that gathers the necessary information for cocktail information\n    "
    ),
    a2a_client_factory=client_factory,
)

In [None]:
# Deployable host agent: identical to the local root agent, but its
# sub-agents are the MyRemoteA2aAgent wrappers defined above.
root_agent = LlmAgent(
    name="host_agent",
    model="gemini-2.5-flash",
    description=(
        "This agent orchestrates the decomposition of the user request into"
        " tasks that can be performed by the child agents."
    ),
    instruction=root_instruction,
    sub_agents=[weather_agent_remoteA2a, cocktail_agent_remoteA2a],
)

In [None]:
# Deploy the root agent to Agent Engine. `requirements` are installed in the
# managed runtime; pydantic and cloudpickle are pinned, presumably to match the
# local environment that serializes root_agent (TODO confirm).
remote_agent = agent_engines.create(
    agent_engine=root_agent,
    display_name="A2A_ADK_Agent_on_Agent_Engine",
    requirements=[
        "google-cloud-aiplatform[agent_engines,adk]>=1.112.0",
        "a2a-sdk >= 0.3.4",
        "pydantic==2.11.9",
        "cloudpickle==3.1.1",
    ],
)

In [None]:
# The resource name has the form
#   projects/{PROJECT_NUMBER}/locations/{LOCATION}/reasoningEngines/{RESOURCE_ID}
# where PROJECT_NUMBER is the numeric project number, not PROJECT_ID.
for _line in ("Deployment finished!", f"Resource Name: {remote_agent.resource_name}"):
    print(_line)

In [None]:
from vertexai import agent_engines

In [None]:
# Connection settings for an already-deployed Agent Engine, read from the
# environment so this section also works from a fresh kernel.
AGENT_ENGINE_ID = os.getenv("AGENT_ENGINE_ID")
# GOOGLE_CLOUD_LOCATION keeps priority here; GOOGLE_CLOUD_REGION (read by the
# setup cell) is the fallback, so both cells agree on the region.
LOCATION = os.getenv("GOOGLE_CLOUD_LOCATION") or os.getenv(
    "GOOGLE_CLOUD_REGION", "us-central1"
)
PROJECT_NUMBER = os.getenv("PROJECT_NUMBER")

# Agent Engine resource names use the numeric project number, not the project
# ID, and must use the same region the engine was deployed to.
resource_name = (
    f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/reasoningEngines/{AGENT_ENGINE_ID}"
)

In [None]:
# If you are in a new script or used the ADK CLI to deploy, you can connect like this:
remote_agent = agent_engines.get(resource_name)
remote_session = await remote_agent.async_create_session(user_id="u_456")
print(remote_session)

In [None]:
async for event in remote_agent.async_stream_query(
    user_id="u_456",
    session_id=remote_session["id"],
    message="whats the weather in new york, ny",
):
    print(event)
    event["content"]