# 🧪 ADK with A2A Application Testing

This notebook demonstrates how to test an ADK (Agent Development Kit) application that implements the Agent2Agent (A2A) protocol.
It covers both local and remote testing, both with Agent Engine and Cloud Run.

## Install dependencies

In [None]:
!pip install google-cloud-aiplatform a2a-sdk --upgrade

In [None]:
# Uncomment the following lines if you're not using the virtual environment created by uv
# import sys

# sys.path.append("../")

### Import libraries

In [None]:
# ruff: noqa
import asyncio
import json
import os
import requests
import uuid

import vertexai
from a2a.types import (
    Message,
    MessageSendParams,
    Part,
    Role,
    SendStreamingMessageRequest,
    TextPart,
)
from IPython.display import Markdown, display
from google.adk.artifacts import InMemoryArtifactService
from google.adk.sessions import InMemorySessionService

from app.agent_engine_app import AgentEngineApp
from tests.helpers import (
    build_get_request,
    build_post_request,
    poll_task_completion,
)

### Initialize Vertex AI Client

In [None]:
# Initialize the Vertex AI client
LOCATION = "us-central1"

client = vertexai.Client(
    location=LOCATION,
)

## If you are using Agent Engine
See more documentation at [Agent Engine Overview](https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/overview)

### Remote Testing

In [None]:
# Set to None to auto-detect from ./deployment_metadata.json, or specify manually
# "projects/PROJECT_ID/locations/us-central1/reasoningEngines/ENGINE_ID"
REASONING_ENGINE_ID = None

if REASONING_ENGINE_ID is None:
    try:
        with open("../deployment_metadata.json") as f:
            metadata = json.load(f)
            REASONING_ENGINE_ID = metadata.get("remote_agent_engine_id")
    except (FileNotFoundError, json.JSONDecodeError):
        pass

print(f"Using REASONING_ENGINE_ID: {REASONING_ENGINE_ID}")

# Extract project_id, location, and engine_id from REASONING_ENGINE_ID
parts = REASONING_ENGINE_ID.split("/")
project_id = parts[1]
location = parts[3]
engine_id = parts[5]

# Construct API endpoints
base_url = f"https://{location}-aiplatform.googleapis.com"
a2a_base_path = f"/v1beta1/projects/{project_id}/locations/{location}/reasoningEngines/{engine_id}/a2a/v1"

print(f"Base URL: {base_url}")
print(f"A2A base path: {a2a_base_path}")

#### Fetch Agent Card

In [None]:
# Fetch agent card using REST API
import google.auth
import google.auth.transport.requests

# Get authentication token
creds, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)

headers = {"Content-Type": "application/json", "Authorization": f"Bearer {creds.token}"}

# GET request to fetch agent card
response = requests.get(
    f"{base_url}{a2a_base_path}/card",
    headers=headers,
)

print(f"Response status code: {response.status_code}")

if response.status_code == 200:
    remote_a2a_agent_card = response.json()
    print(f"Agent: {remote_a2a_agent_card.get('name')}")
    print(f"URL: {remote_a2a_agent_card.get('url')}")
    print(
        f"Skills: {[s.get('description') for s in remote_a2a_agent_card.get('skills', [])]}"
    )
    print(f"Protocol Version: {remote_a2a_agent_card.get('protocolVersion')}")
else:
    print(f"Error: {response.text}")

#### Send Message

In [None]:
# Send the message using A2A REST API
import google.auth
import google.auth.transport.requests

# Get authentication token
creds, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)

headers = {"Content-Type": "application/json", "Authorization": f"Bearer {creds.token}"}

data = {
    "message": {
        "messageId": f"msg-{os.urandom(8).hex()}",
        "content": [{"text": "What is the weather in New York?"}],
        "role": "ROLE_USER",
    }
}

# Send POST request to message:send endpoint
response = requests.post(
    f"{base_url}{a2a_base_path}/message:send",
    headers=headers,
    json=data,
)

print(f"Response status code: {response.status_code}")

if response.status_code == 200:
    response_data = response.json()
    task_id = response_data["task"]["id"]
    print(f"Task started: {task_id}")
else:
    print(f"Error: {response.text}")

#### Poll for response

In [None]:
# Poll for task completion using REST API
max_attempts = 30
for attempt in range(max_attempts):
    poll_response = requests.get(
        f"{base_url}{a2a_base_path}/tasks/{task_id}",
        headers=headers,
    )

    if poll_response.status_code != 200:
        print(f"Poll failed with status code: {poll_response.status_code}")
        break

    result = poll_response.json()
    task_state = result.get("status", {}).get("state")
    print(f"Attempt {attempt + 1}: {task_state}")

    if task_state == "TASK_STATE_COMPLETED":
        print("Task completed!")
        break
    elif task_state in ["TASK_STATE_FAILED", "TASK_STATE_CANCELLED"]:
        print(f"Task failed: {result}")
        break

    await asyncio.sleep(1)

# Extract and display artifacts
if "artifacts" in result and result["artifacts"]:
    for artifact in result["artifacts"]:
        if artifact.get("parts"):
            for part in artifact["parts"]:
                if "text" in part:
                    display(Markdown(f"**Answer**:\\n {part['text']}"))
                else:
                    print("Could not extract text from artifact parts.")
else:
    print("No artifacts found in result")

### Local Testing

You can import directly the AgentEngineApp class within your environment. 
To run the agent locally, follow these steps:
1. Make sure all required packages are installed in your environment
2. The recommended approach is to use the same virtual environment created by the 'uv' tool
3. You can set up this environment by running 'make install' from your agent's root directory
4. Then select this kernel (.venv folder in your project) in your Jupyter notebook to ensure all dependencies are available

In [None]:
local_agent_engine = await AgentEngineApp.create(
    artifact_service_builder=lambda: InMemoryArtifactService(),
    session_service_builder=lambda: InMemorySessionService(),
)
local_agent_engine.set_up()

#### Verify Custom Method is Registered

In [None]:
test = local_agent_engine.register_operations()
print(test)

#### Fetch Agent Card

In [None]:
request = build_get_request(None)
response = await local_agent_engine.handle_authenticated_agent_card(
    request=request, context=None
)
print(response)

#### Send Message

In [None]:
message_data = {
    "message": {
        "messageId": f"msg-{os.urandom(8).hex()}",
        "content": [{"text": "What is the weather in New York?"}],
        "role": "ROLE_USER",
    },
}

request = build_post_request(message_data)

response = await local_agent_engine.on_message_send(request=request, context=None)
print(response)

#### Poll for response

In [None]:
task_id = response["task"]["id"]
print(f"The Task ID is: {task_id}")

# Poll for completion using helper
final_response = await poll_task_completion(local_agent_engine, task_id)

# Extract and display artifacts
for artifact in final_response["artifacts"]:
    if artifact["parts"] and "text" in artifact["parts"][0]:
        display(Markdown(f"**Answer**:\n {artifact['parts'][0]['text']}"))
    else:
        print("Could not extract text from artifact parts.")

#### Register Feedback

In [None]:
local_agent_engine.register_feedback(
    feedback={
        "score": 5,
        "text": "Great response!",
        "invocation_id": "test-invocation-123",
        "user_id": "test",
    }
)

## If you are using Cloud Run

### Remote Testing

For more information about authenticating HTTPS requests to Cloud Run services, see:
[Cloud Run Authentication Documentation](https://cloud.google.com/run/docs/triggering/https-request)

Remote testing involves using a deployed service URL instead of localhost.

Authentication is handled using GCP identity tokens instead of local credentials.

In [None]:
ID_TOKEN = get_ipython().getoutput("gcloud auth print-identity-token -q")[0]

In [None]:
SERVICE_URL = "YOUR_SERVICE_URL_HERE"  # Replace with your Cloud Run service URL

Send a message using A2A protocol

In [None]:
# Create A2A message request
message = Message(
    message_id=f"msg-user-{uuid.uuid4()}",
    role=Role.user,
    parts=[Part(root=TextPart(text="Hello! Weather in New York?"))],
)

request = SendStreamingMessageRequest(
    id=f"req-{uuid.uuid4()}",
    params=MessageSendParams(message=message),
)

# Set up headers with authentication
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {ID_TOKEN}"}

# Send the streaming request to the A2A endpoint
response = requests.post(
    f"{SERVICE_URL}/a2a/app",
    headers=headers,
    json=request.model_dump(mode="json", exclude_none=True),
    stream=True,
    timeout=60,
)

print(f"Response status code: {response.status_code}")

# Parse streaming A2A responses
for line in response.iter_lines():
    if line:
        line_str = line.decode("utf-8")
        if line_str.startswith("data: "):
            event_json = line_str[6:]
            event = json.loads(event_json)
            print(f"Received event: {event}")

### Local Testing

> You can run the application locally via the `make local-backend` command.

Send a message to the local backend service using the A2A protocol and receive a streaming response.

In [None]:
# Create A2A message request
message = Message(
    message_id=f"msg-user-{uuid.uuid4()}",
    role=Role.user,
    parts=[Part(root=TextPart(text="Hello! Weather in New York?"))],
)

request = SendStreamingMessageRequest(
    id=f"req-{uuid.uuid4()}",
    params=MessageSendParams(message=message),
)

# Set up headers
headers = {"Content-Type": "application/json"}

# Send the streaming request to the local A2A endpoint
response = requests.post(
    "http://127.0.0.1:8000/a2a/app",
    headers=headers,
    json=request.model_dump(mode="json", exclude_none=True),
    stream=True,
    timeout=60,
)

print(f"Response status code: {response.status_code}")

# Parse streaming A2A responses
for line in response.iter_lines():
    if line:
        line_str = line.decode("utf-8")
        if line_str.startswith("data: "):
            event_json = line_str[6:]
            event = json.loads(event_json)
            print(f"Received event: {event}")