<a href="https://colab.research.google.com/github/sampathn2005/AI-For-Beginners/blob/main/notebooks/a2a_quickstart.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Copyright 2025 Psitron Technologies

# Getting Started with Google A2A (Agent-to-Agent) Communication

This notebook introduces you to Google's Agent-to-Agent (A2A) protocol, a standardized way for AI agents to communicate and collaborate.  

## What You'll Build

A three-agent system that works together to analyze trending topics:
1. **Trending Topics Agent** - Searches the web for current trending topics
2. **Trend Analyzer Agent** - Performs deep analysis with quantitative data
3. **Host Agent** - Orchestrates the other agents to provide comprehensive insights

<img src="https://storage.googleapis.com/github-repo/a2a/a2a-diagram.png" alt="drawing" width="1000"/>

### Setup and Installation

First, let's install the required dependencies:

In [1]:
# Install required packages
%pip install --upgrade -q google-genai google-adk==1.9.0 a2a-sdk==0.3.0 python-dotenv aiohttp uvicorn requests mermaid-python nest-asyncio

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m26.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m130.3/130.3 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m245.3/245.3 kB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.7/64.7 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m51.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-colab 1.0.0 requires requests==2.32.4, but you have requests 2.32.5 which is incompatible.[0m

## 1. Introduction to A2A

### What is Agent-to-Agent (A2A) Communication?

A2A is a standardized protocol that enables AI agents to:
- **Discover** each other's capabilities
- **Communicate** using a common JSON-RPC based protocol
- **Collaborate** to solve complex tasks
- **Stream** responses for real-time interactions

### Environment Configuration

In [None]:

import sys

from a2a.client import client as real_client_module
from a2a.client.card_resolver import A2ACardResolver


class PatchedClientModule:
    def __init__(self, real_module) -> None:
        for attr in dir(real_module):
            if not attr.startswith('_'):
                setattr(self, attr, getattr(real_module, attr))
        self.A2ACardResolver = A2ACardResolver


patched_module = PatchedClientModule(real_client_module)
sys.modules['a2a.client.client'] = patched_module  # type: ignore

In [None]:
import asyncio
import logging
import os
import sys
import threading
import time

from typing import Any

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from dotenv import load_dotenv
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search

In [None]:
# Set Google Cloud Configuration
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'TRUE'
os.environ['GOOGLE_CLOUD_PROJECT'] = (
    'a2aproject-472304'
)
os.environ['GOOGLE_CLOUD_LOCATION'] = (
    'us-central1'  # Replace with your location
)

load_dotenv()

print('Environment variables configured:')
print(f'GOOGLE_GENAI_USE_VERTEXAI: {os.environ["GOOGLE_GENAI_USE_VERTEXAI"]}')
print(f'GOOGLE_CLOUD_PROJECT: {os.environ["GOOGLE_CLOUD_PROJECT"]}')
print(f'GOOGLE_CLOUD_LOCATION: {os.environ["GOOGLE_CLOUD_LOCATION"]}')

In [None]:
# Authenticate your notebook environment (Colab only)
if 'google.colab' in sys.modules:
    from google.colab import auth

    auth.authenticate_user(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

In [None]:
# Setup logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
)

## 2. Building Your A2A System

Let's build our three-agent system step by step. We'll create:

1. **Trending Topics Agent** - Finds current trending topics
2. **Trend Analyzer Agent** - Analyzes trends with quantitative data
3. **Host Agent** - Orchestrates the other agents (sequentially)

### Agent 1: Trending Topics Agent

This agent searches the web for trending topics and returns a list of current trends.

In [None]:
# Create the Trending Topics ADK Agent
trending_agent = Agent(
    model='gemini-2.5-pro',
    name='trending_topics_agent',
    instruction="""
    You are a social media trends analyst. Your job is to search the web for current trending topics,
    particularly from social platforms.

    When asked about trends:
    1. Search for "trending topics today" or similar queries
    2. Extract the top 3 trending topics
    3. Return them in a JSON format

    Focus on current, real-time trends from the last 24 hours.

    You MUST return your response in the following JSON format:
    {
        "trends": [
            {
                "topic": "Topic name",
                "description": "Brief description (1-2 sentences)",
                "reason": "Why it's trending"
            },
            {
                "topic": "Topic name",
                "description": "Brief description (1-2 sentences)",
                "reason": "Why it's trending"
            },
            {
                "topic": "Topic name",
                "description": "Brief description (1-2 sentences)",
                "reason": "Why it's trending"
            }
        ]
    }

    Only return the JSON object, no additional text.
    """,
    tools=[google_search],
)

print('Trending Topics Agent created successfully!')

In [None]:
trending_agent_card = AgentCard(
    name='Trending Topics Agent',
    url='http://localhost:10020',
    description='Searches the web for current trending topics from social media',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='find_trends',
            name='Find Trending Topics',
            description='Searches for current trending topics on social media',
            tags=['trends', 'social media', 'twitter', 'current events'],
            examples=[
                "What's trending today?",
                'Show me current Twitter trends',
                'What are people talking about on social media?',
            ],
        )
    ],
)

In [None]:
remote_trending_agent = RemoteA2aAgent(
    name='find_trends',
    description='Searches for current trending topics on social media',
    agent_card=f'http://localhost:10020{AGENT_CARD_WELL_KNOWN_PATH}',
)

### Agent 2: Trend Analyzer Agent

This agent takes a specific trend and performs deep analysis with quantitative data.

In [None]:
# Create the Trend Analyzer ADK Agent
analyzer_agent = Agent(
    model='gemini-2.5-pro',
    name='trend_analyzer_agent',
    instruction="""
    You are a data analyst specializing in trend analysis. When given a trending topic,
    perform deep research to find quantitative data and insights.

    For each trend you analyze:
    1. Search for statistics, numbers, and metrics related to the trend
    2. Look for:
       - Engagement metrics (views, shares, mentions)
       - Growth rates and timeline
       - Geographic distribution
       - Related hashtags or keywords
    3. Provide concrete numbers and data points

    Keep it somehow concise

    Always prioritize quantitative information over qualitative descriptions.
    """,
    tools=[google_search],
)

print('Trend Analyzer Agent created successfully!')

In [None]:
analyzer_agent_card = AgentCard(
    name='Trend Analyzer Agent',
    url='http://localhost:10021',
    description='Performs deep analysis of trends with quantitative data',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='analyze_trend',
            name='Analyze Trend',
            description='Provides quantitative analysis of a specific trend',
            tags=['analysis', 'data', 'metrics', 'statistics'],
            examples=[
                'Analyze the #ClimateChange trend',
                'Get metrics for the Taylor Swift trend',
                'Provide data analysis for AI adoption trend',
            ],
        )
    ],
)

In [None]:
remote_analyzer_agent = RemoteA2aAgent(
    name='analyze_trend',
    description='Provides quantitative analysis of a specific trend',
    agent_card=f'http://localhost:10021{AGENT_CARD_WELL_KNOWN_PATH}',
)

### Agent 3: Host Agent (Orchestrator)

The Host Agent coordinates between the other two agents to provide comprehensive trend analysis.

In [None]:
# Create the Host ADK Agent
host_agent = SequentialAgent(
    name='trend_analysis_host',
    sub_agents=[remote_trending_agent, remote_analyzer_agent],
)

In [None]:
host_agent_card = AgentCard(
    name='Trend Analysis Host',
    url='http://localhost:10022',
    description='Orchestrates, sequentially, trend discovery and analysis using specialized agents',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['application/json'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='comprehensive_trend_analysis',
            name='Comprehensive Trend Analysis',
            description='Finds trending topics and provides deep analysis of the most relevant one',
            tags=['trends', 'analysis', 'orchestration', 'insights'],
            examples=[
                'Analyze current trends',
                "What's trending and why is it important?",
                'Give me a comprehensive trend report',
            ],
        )
    ],
)

## 3. Running

Now let's put everything together. We'll create helper functions to start our agents and run the complete system.

### Starting the A2A Servers

Create function to run each agent as an A2A server:

In [None]:
def create_agent_a2a_server(agent, agent_card):
    """Create an A2A server for any ADK agent.

    Args:
        agent: The ADK agent instance
        agent_card: The ADK agent card

    Returns:
        A2AStarletteApplication instance
    """
    runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    config = A2aAgentExecutorConfig()
    executor = A2aAgentExecutor(runner=runner, config=config)

    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    # Create A2A application
    return A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )

In [None]:
# Apply nest_asyncio
nest_asyncio.apply()

# Store server tasks
server_tasks: list[asyncio.Task] = []


async def run_agent_server(agent, agent_card, port) -> None:
    """Run a single agent server."""
    app = create_agent_a2a_server(agent, agent_card)

    config = uvicorn.Config(
        app.build(),
        host='127.0.0.1',
        port=port,
        log_level='warning',
        loop='none',  # Important: let uvicorn use the current loop
    )

    server = uvicorn.Server(config)
    await server.serve()


async def start_all_servers() -> None:
    """Start all servers in the same event loop."""
    # Create tasks for all servers
    tasks = [
        asyncio.create_task(
            run_agent_server(trending_agent, trending_agent_card, 10020)
        ),
        asyncio.create_task(
            run_agent_server(analyzer_agent, analyzer_agent_card, 10021)
        ),
        asyncio.create_task(
            run_agent_server(host_agent, host_agent_card, 10022)
        ),
    ]

    # Give servers time to start
    await asyncio.sleep(2)

    print('✅ All agent servers started!')
    print('   - Trending Agent: http://127.0.0.1:10020')
    print('   - Analyzer Agent: http://127.0.0.1:10021')
    print('   - Host Agent: http://127.0.0.1:10022')

    # Keep servers running
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print('Shutting down servers...')


# Run in a background thread


def run_servers_in_background() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_all_servers())


# Start the thread
server_thread = threading.Thread(target=run_servers_in_background, daemon=True)
server_thread.start()

# Wait for servers to be ready
time.sleep(3)

## 4. Testing the System

### Call the A2A agents (the 2 remote agents, and the host agent that refers to the 2 remote agents as sub agents)

In [None]:
class A2ASimpleClient:
    """A2A Simple to call A2A servers."""

    def __init__(self, default_timeout: float = 240.0):
        self._agent_info_cache: dict[
            str, dict[str, Any] | None
        ] = {}  # Cache for agent metadata
        self.default_timeout = default_timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        """Send a message following the official A2A SDK pattern."""
        # Configure httpx client with timeout
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )

        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            # Check if we have cached agent card data
            if (
                agent_url in self._agent_info_cache
                and self._agent_info_cache[agent_url] is not None
            ):
                agent_card_data = self._agent_info_cache[agent_url]
            else:
                # Fetch the agent card
                agent_card_response = await httpx_client.get(
                    f'{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}'
                )
                agent_card_data = self._agent_info_cache[agent_url] = (
                    agent_card_response.json()
                )

            # Create AgentCard from data
            agent_card = AgentCard(**agent_card_data)

            # Create A2A client with the agent card
            config = ClientConfig(
                httpx_client=httpx_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )

            factory = ClientFactory(config)
            client = factory.create(agent_card)

            # Create the message object
            message_obj = create_text_message_object(content=message)

            # Send the message and collect responses
            responses = []
            async for response in client.send_message(message_obj):
                responses.append(response)

            # The response is a tuple - get the first element (Task object)
            if (
                responses
                and isinstance(responses[0], tuple)
                and len(responses[0]) > 0
            ):
                task = responses[0][0]  # First element of the tuple

                # Extract text: task.artifacts[0].parts[0].root.text
                try:
                    return task.artifacts[0].parts[0].root.text
                except (AttributeError, IndexError):
                    return str(task)

            return 'No response received'

In [None]:
a2a_client = A2ASimpleClient()

In [None]:
async def test_trending_topics() -> None:
    """Test trending topics agent."""
    trending_topics = await a2a_client.create_task(
        'http://localhost:10020', "What's trending today?"
    )
    print(trending_topics)


# Run the async function
asyncio.run(test_trending_topics())

In [None]:
async def test_analysis() -> None:
    """Test analysis agent."""
    analysis = await a2a_client.create_task(
        'http://localhost:10021', 'Analyze the trend AI in Social Media'
    )
    print(analysis)


# Run the async function
asyncio.run(test_analysis())

In [None]:
async def test_host_analysis() -> None:
    """Test host analysis agent."""
    host_analysis = await a2a_client.create_task(
        'http://localhost:10022',
        'Find the most relevant trends in the web today, choose randomly one of the top '
        'trends, and give me a complete analysis of it with quantitative data',
    )
    print(host_analysis)


# Run the async function
asyncio.run(test_host_analysis())

## Summary

Congratulations! You've successfully built a multi-agent system using Google's A2A protocol. Here's what you've learned:

1. **A2A Protocol Basics**: How agents discover and communicate with each other
2. **ADK Integration**: Creating ADK agents and wrapping them for A2A
3. **Agent Orchestration**: Building a Host Agent that coordinates multiple agents
4. **Practical Implementation**: Running and testing a complete multi-agent system

### Next Steps

- **Deploy Your Agents**: Deploy agents to Cloud Run or other platforms
- **Add Authentication**: Implement security for production use
- **Create More Agents**: Build agents for your specific use cases, even using other frameworks
- **Advanced Patterns**: Explore agent chains, parallel execution, and more
- **Callbacks**: Add in the Google ADK agents the before and after callbacks of the agent, model and tool, to increase observability

Happy agent building! 🚀

# Appendix

### Why Use Google A2A (Agent-to-Agent) Protocol

Google's Agent-to-Agent (A2A) protocol is a standardized communication framework that enables AI agents to discover, communicate, and collaborate with each other using a common JSON-RPC based protocol.  
It provides a uniform way for agents to interact, regardless of their underlying implementation.  

#### 1. Standardized Communication Protocol

- A2A provides a consistent, JSON-RPC based protocol that any agent can implement
- Agents can communicate without needing to know each other's internal implementation details
- The protocol supports streaming responses for real-time interactions

#### 2. Agent Discovery and Metadata

- Agents expose their capabilities through standardized metadata (AgentCard)
- Each agent publishes its skills, input/output modes, and capabilities
- Host agents can dynamically discover what other agents can do through the `.well-known/agent-card.json` endpoint

#### 3. Orchestration and Composition

- Enables building complex multi-agent systems where a host agent can orchestrate multiple specialized agents
- Supports sequential and parallel task execution patterns
- Allows for sophisticated agent collaboration workflows

#### 4. Platform Independence

- A2A servers can wrap agents from different frameworks (not just ADK)
- Agents can be deployed as independent services on different infrastructure
- Promotes loose coupling between agents

### Differences: Using ADK Agents Directly vs. Through A2A

#### Using ADK Agents Directly

```python
# Conceptual Example: Defining Hierarchy
from google.adk.agents import LlmAgent, BaseAgent

# Define individual agents
greeter = LlmAgent(name="Greeter", model="gemini-2.5-pro")
task_doer = BaseAgent(name="TaskExecutor") # Custom non-LLM agent

# Create parent agent and assign children via sub_agents
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.5-pro",
    description="I coordinate greetings and tasks.",
    sub_agents=[ # Assign sub_agents here
        greeter,
        task_doer
    ]
)
```

__Use Direct ADK for Multi-Agents System When:__

- All agents are tightly related and always used together
- Google ADK is the framework choice, and simplicity is prioritized
- Performance of in-process communication is critical
- You don't need distributed deployment
- No built-in service discovery is needed

#### Using ADK Agents Through A2A

__Use A2A for Multi-Agents System When:__

- Building complex multi-agent systems
- Agents need to be developed, deployed, and scaled independently
- You want to integrate agents from different teams or frameworks
- You need dynamic agent discovery and composition
- Building a platform where agents can be added/removed dynamically
- You want to enable third-party agent integration