# Agent2Agent (A2A) Protocol: Build Interoperable Agents

In this notebook, you'll learn the Agent2Agent (A2A) protocol basics and build two simple interoperable agents that can exchange structured messages. We'll:

- Understand the motivation behind A2A and a minimal message schema
- Configure an LLM backend (OpenAI, Gemini, or Ollama) and a base `Agent` class that speaks A2A
- Implement two example agents (Researcher and Writer) and a simple mediator
- Run a multi-turn exchange end-to-end
- Practice with an exercise and a bonus tool-use pattern

Prerequisites: 
- For OpenAI: environment variable `OPENAI_API_KEY` set with a valid key
- For Gemini: environment variable `GOOGLE_API_KEY` set with a valid key
- For Ollama: a running Ollama server with the `mistral` model installed
- Set `USE_GEMINI=1` or `USE_OLLAMA=1` in your environment or `.env` file to use those backends (defaults to OpenAI; if both are set, Ollama is used)


In [None]:
# Compatibility fix for langchain_google_genai with newer langchain_core
# Run this cell FIRST before importing langchain_google_genai
import sys
import pydantic
import warnings

# Create a compatibility shim for pydantic_v1
# This provides Pydantic v1 compatibility for langchain_google_genai
class PydanticV1Compat:
    """Stand-in object for a missing ``langchain_core.pydantic_v1`` module.

    Every attribute lookup is forwarded to the installed ``pydantic`` package,
    except ``root_validator``, which is replaced by a wrapper built on
    ``pydantic.model_validator(mode='before')`` when that function exists.
    """

    def __getattr__(self, name):
        """Resolve ``name`` against ``pydantic``, special-casing ``root_validator``.

        Raises:
            AttributeError: if ``pydantic`` has no attribute called ``name``.
        """
        # Handle root_validator specially for Pydantic v2 compatibility
        if name == 'root_validator':
            def root_validator(*args, **kwargs):
                """Accept both ``@root_validator`` and ``@root_validator(...)``.

                Keyword options of the v1 decorator (``pre``,
                ``skip_on_failure``, ``allow_reuse``) are accepted and dropped:
                ``model_validator`` takes only ``mode`` and would raise
                TypeError if they were forwarded.
                NOTE(review): the validator always runs in 'before' mode, even
                when the v1 code asked for a post validator - confirm that is
                acceptable for the classes being patched.
                """
                # Bare use passes the decorated function as the only positional argument.
                target = args[0] if len(args) == 1 and callable(args[0]) else None

                def passthrough(func):
                    return func

                # Suppress warnings while building/applying the validator
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    if hasattr(pydantic, 'model_validator'):
                        decorator = pydantic.model_validator(mode='before')
                    else:
                        # No model_validator available: leave the function untouched.
                        decorator = passthrough
                    if target is not None:
                        return decorator(target)
                    return decorator
            return root_validator
        return getattr(pydantic, name)

# Patch langchain_core.pydantic_v1 only when the real module is genuinely missing
try:
    import importlib.util
    import langchain_core
    # A submodule becomes an attribute of its package only after it has been
    # imported, so hasattr() alone cannot tell "missing" from "not imported yet".
    # Ask the import system as well before shadowing anything.
    if (
        not hasattr(langchain_core, 'pydantic_v1')
        and importlib.util.find_spec('langchain_core.pydantic_v1') is None
    ):
        # Register the shim as an attribute and in sys.modules so that
        # `from langchain_core.pydantic_v1 import ...` resolves to it.
        langchain_core.pydantic_v1 = PydanticV1Compat()
        sys.modules['langchain_core.pydantic_v1'] = langchain_core.pydantic_v1
        print("✅ Compatibility fix applied for langchain_google_genai")
except Exception as e:
    # Best-effort: report the problem and let the notebook continue.
    print(f"⚠️ Warning: Could not apply compatibility fix: {e}")

import os

# Try to load from .env file if available (optional)
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # dotenv not installed, that's okay

# Determine which backend to use.
# Ollama is checked first, so it wins when both flags are set.
USE_OLLAMA = os.environ.get("USE_OLLAMA", "").lower() in ("1", "true", "yes")
USE_GEMINI = os.environ.get("USE_GEMINI", "").lower() in ("1", "true", "yes")

# Explicit raises instead of `assert`: assertions are removed under `python -O`,
# which would let a missing key slip through to the first API call.
if USE_OLLAMA:
    print("⚙️ Will use Ollama backend")
    # OLLAMA_BASE_URL is optional (defaults to http://localhost:11434),
    # so there is nothing to validate for this backend.
elif USE_GEMINI:
    print("⚙️ Will use Gemini backend")
    if not os.getenv("GOOGLE_API_KEY"):
        raise RuntimeError("⚠️ Please set GOOGLE_API_KEY in your environment!")
else:
    print("⚙️ Will use OpenAI backend (default)")
    if not os.getenv("OPENAI_API_KEY"):
        raise RuntimeError("⚠️ Please set OPENAI_API_KEY in your environment!")

print("✅ Backend configuration OK")

In [None]:
from typing import List, Literal, Optional, Dict, Any, Union

# Initialize the appropriate client based on backend selection.
# Exactly one of `client` / `llm_backend` is populated; the other is None.
if USE_OLLAMA:
    # Use Ollama
    from langchain_community.chat_models import ChatOllama
    from langchain_core.language_models.chat_models import BaseChatModel
    llm_backend: BaseChatModel = ChatOllama(
        model="mistral",
        temperature=0.2,
        # Honor the optional OLLAMA_BASE_URL override; `or` also covers an
        # empty value and falls back to the local default server.
        base_url=os.getenv("OLLAMA_BASE_URL") or "http://localhost:11434",
    )
    client = None  # Ollama doesn't use OpenAI client
    print("⚙️ Using Ollama backend (mistral)")
elif USE_GEMINI:
    # Use Gemini
    import google.generativeai as genai
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
    client = genai  # Store genai module as client
    llm_backend = None
    print("⚙️ Using Gemini backend (gemini-2.5-flash)")
else:
    # Use OpenAI (default)
    from openai import OpenAI
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    llm_backend = None
    print("⚙️ Using OpenAI backend (gpt-4o-mini)")

print("✅ Client initialized")

## Minimal A2A: Message Schema

We'll use a compact message format inspired by A2A ideas:

- `role`: "system" | "user" | "assistant" | "tool" | "agent"
- `name`: optional identifier of the speaker agent
- `content`: natural language content
- `actions`: optional list of proposed actions (e.g., tool calls)
- `metadata`: optional dict (e.g., routing hints)

Agents exchange lists of these messages. A mediator (router) decides who speaks next.


In [None]:
from pydantic import BaseModel, Field

class A2AMessage(BaseModel):
    """One message in the conversation history that agents exchange."""
    # Speaker category; Agent.build_prompt sends "tool" and "agent" to the LLM as "user"
    role: Literal["system", "user", "assistant", "tool", "agent"]
    content: str  # natural-language text of the message
    name: Optional[str] = None  # optional identifier of the speaker
    actions: Optional[List[Dict[str, Any]]] = None  # optional proposed actions (e.g., tool calls)
    metadata: Optional[Dict[str, Any]] = None  # optional extra data (e.g., routing hints)

class AgentConfig(BaseModel):
    """Static settings for one agent: its name, system prompt, and model parameters."""
    name: str  # copied into the `name` field of every message the agent produces
    system_prompt: str = ""  # when non-empty, sent as the leading system message
    model: str = "gpt-4o-mini"  # small, fast model suitable for classroom (or gemini-2.5-flash for Gemini)
    temperature: float = 0.2  # passed to the OpenAI and Gemini calls in Agent.step

class AgentResponse(BaseModel):
    """Result of a single agent turn."""
    message: A2AMessage  # the message the agent produced this turn
    # allow agent to signal completion (not set or read by the code shown in this notebook)
    stop: bool = False

class Agent:
    """LLM-backed agent that reads an A2A history and answers with one message.

    The backend (Ollama, Gemini, or OpenAI) is taken from the module-level
    ``USE_OLLAMA`` / ``USE_GEMINI`` flags at construction time.
    """

    def __init__(self, config: "AgentConfig", client: Union[Any, None] = None, llm_backend: Union[Any, None] = None):
        """Store the configuration and backend handles.

        Args:
            config: Name, system prompt, model name, and temperature.
            client: Client object used by the OpenAI branch of ``step``.
            llm_backend: Chat model object used by the Ollama branch of ``step``.
        """
        self.config = config
        self.client = client
        self.llm_backend = llm_backend
        # Determine backend type
        self.use_ollama = USE_OLLAMA
        self.use_gemini = USE_GEMINI
        self.use_openai = not USE_OLLAMA and not USE_GEMINI

    def build_prompt(self, history: "List[A2AMessage]") -> List[Dict[str, str]]:
        """Turn an A2A history into chat-style ``{"role", "content"}`` dicts.

        The agent's own system prompt (if any) comes first. Roles other than
        system/user/assistant are sent as "user", and a speaker name is
        prefixed to the content as ``"<name>: "`` so the model can tell the
        participants apart.
        """
        messages: List[Dict[str, str]] = []
        if self.config.system_prompt:
            messages.append({"role": "system", "content": self.config.system_prompt})
        for m in history:
            # map A2A roles to chat roles where reasonable
            role = m.role if m.role in {"system", "user", "assistant"} else "user"
            speaker = f"{m.name}: " if m.name else ""
            messages.append({"role": role, "content": speaker + m.content})
        return messages

    @staticmethod
    def _to_gemini_turns(messages: List[Dict[str, str]]) -> tuple:
        """Convert chat messages into ``(history, prompt)`` for the Gemini branch.

        All system messages are joined and placed in front as user text.
        Consecutive messages of the same role are merged (separated by a blank
        line) instead of replacing one another, so the whole conversation
        reaches the model. This matters because ``build_prompt`` maps every
        agent message to the "user" role.

        Returns:
            ``history``: list of ``{"role": "user" | "model", "parts": [text]}``
            dicts preceding the final user turn; ``prompt``: text of the final
            user turn, or ``""`` when the conversation does not end on one.
        """
        system_text = "\n\n".join(
            m["content"] for m in messages if m["role"] == "system"
        )
        turns: List[Dict[str, Any]] = []
        if system_text:
            turns.append({"role": "user", "parts": [system_text]})
        for m in messages:
            if m["role"] == "system":
                continue
            role = "model" if m["role"] == "assistant" else "user"
            if turns and turns[-1]["role"] == role:
                turns[-1]["parts"][0] += "\n\n" + m["content"]
            else:
                turns.append({"role": role, "parts": [m["content"]]})
        if turns and turns[-1]["role"] == "user":
            return turns[:-1], turns[-1]["parts"][0]
        return turns, ""

    def step(self, history: "List[A2AMessage]") -> "AgentResponse":
        """Send the history to the configured backend and wrap the reply.

        Returns:
            An ``AgentResponse`` whose message has role "agent", this agent's
            name, and the model's text as content.

        Exceptions raised by the backend call are not caught here.
        """
        messages = self.build_prompt(history)

        if self.use_ollama:
            # Use LangChain ChatOllama
            from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
            message_types = {
                "system": SystemMessage,
                "user": HumanMessage,
                "assistant": AIMessage,
            }
            langchain_messages = [
                message_types[msg["role"]](content=msg["content"])
                for msg in messages
                if msg["role"] in message_types
            ]

            response_obj = self.llm_backend.invoke(langchain_messages)
            content = response_obj.content if hasattr(response_obj, 'content') else str(response_obj)

        elif self.use_gemini:
            # Use Google Gemini
            import google.generativeai as genai
            gemini_history, current_prompt = self._to_gemini_turns(messages)

            # Use the configured model; the OpenAI default name is swapped for a Gemini one
            model_name = self.config.model if self.config.model != "gpt-4o-mini" else "gemini-2.5-flash"
            model = genai.GenerativeModel(
                model_name=model_name,
                generation_config={"temperature": self.config.temperature}
            )

            # Start a chat if we have history, otherwise generate directly
            if gemini_history:
                chat = model.start_chat(history=gemini_history)
                gemini_response = chat.send_message(current_prompt)
            else:
                gemini_response = model.generate_content(current_prompt)

            content = gemini_response.text if hasattr(gemini_response, 'text') else str(gemini_response)

        else:
            # Use OpenAI (default)
            completion = self.client.chat.completions.create(
                model=self.config.model,
                temperature=self.config.temperature,
                messages=[{"role": m["role"], "content": m["content"]} for m in messages],
            )
            content = completion.choices[0].message.content or ""

        return AgentResponse(
            message=A2AMessage(role="agent", name=self.config.name, content=content)
        )

In [None]:
# Two specialized agents share one A2A interface; only name and system prompt differ.
# Choose the model identifier that matches the active backend.
if USE_GEMINI:
    model_name = "gemini-2.5-flash"
elif USE_OLLAMA:
    model_name = "mistral"
else:
    model_name = "gpt-4o-mini"

# Gathers and summarizes facts for the other agents.
researcher = Agent(
    config=AgentConfig(
        name="Researcher",
        system_prompt=(
            "You are a helpful research assistant. "
            "Summarize facts concisely, cite sources if possible. "
            "When uncertain, ask clarifying questions."
        ),
        model=model_name,
    ),
    client=client,
    llm_backend=llm_backend,
)

# Rewrites the accumulated notes as a short, polished paragraph.
writer = Agent(
    config=AgentConfig(
        name="Writer",
        system_prompt=(
            "You are a clear technical writer. "
            "Transform research notes into a polished, short paragraph."
        ),
        model=model_name,
    ),
    client=client,
    llm_backend=llm_backend,
)

In [None]:
from typing import Callable
from dataclasses import dataclass

@dataclass
class Route:
    """Lookup table telling the mediator which agent speaks after another."""

    # Speaker name -> name of the next speaker; None means the exchange stops
    next_map: Dict[str, Optional[str]]

    def next_speaker(self, current_name: str) -> Optional[str]:
        """Return who follows ``current_name``, or None if nobody does."""
        if current_name in self.next_map:
            return self.next_map[current_name]
        return None

# Routing table for the two-agent demo: Researcher hands over to Writer.
route = Route(next_map={
    "Researcher": "Writer",
    "Writer": None,  # stop after writer responds
})

# Shared conversation history; every agent reads it and appends its reply.
history: List[A2AMessage] = [
    A2AMessage(role="user", name="Instructor", content="Explain what Agent2Agent (A2A) is, briefly."),
]

# First, the Researcher responds
resp_r = researcher.step(history)
history.append(resp_r.message)
print(f"{resp_r.message.name}:\n{resp_r.message.content}\n\n")

# Then route to Writer, who polishes the response
# (the Writer sees both the Instructor's question and the Researcher's reply)
next_name = route.next_speaker("Researcher")
if next_name == "Writer":
    resp_w = writer.step(history)
    history.append(resp_w.message)
    print(f"{resp_w.message.name}:\n{resp_w.message.content}")

## Exercise: Add a Fact-Checker Agent

- Create a third agent `FactChecker` whose role is to critique the Researcher output and ask for clarifications if needed.
- Update the routing so that the turn order is: Researcher -> FactChecker -> Writer.
- Keep the same `A2AMessage` history so all agents see the conversation context.

Hint: copy the `Agent` instantiation pattern used for `researcher` and `writer` with a system prompt like: "You verify factual claims, request sources, and highlight ambiguities."


## Bonus: Simple Tool-Use via A2A Actions

We can simulate tool-use by allowing an agent to propose an action in `actions`, and a separate tool-runner to execute it. Below is a toy calculator tool the Researcher can call.

Run this cell once to define the tool and an agent that may propose actions.


In [None]:
from math import sqrt

# Registry of callable tools, keyed by the name used to request them
TOOLS: Dict[str, Callable[..., Any]] = {}

def tool(name: str):
    """Return a decorator that stores the decorated function in ``TOOLS[name]``.

    The function itself is returned unchanged, so it stays directly callable.
    """
    def register(func: Callable[..., Any]):
        TOOLS.update({name: func})
        return func
    return register

@tool("calculator")
def calculator(expr: str) -> str:
    """Evaluate ``expr`` as a Python expression and return the result as text.

    Only ``sqrt`` is provided as a name. Any exception is reported as the
    string ``"ERROR: <message>"`` instead of being raised.
    """
    # SECURITY: this runs eval() on text that may come from model output.
    # Emptying __builtins__ narrows the namespace but is not a real sandbox
    # (attribute access on literals is still possible, and an expression such
    # as a huge power can exhaust CPU or memory). Acceptable for a classroom
    # toy; replace with a proper expression parser before any real use.
    restricted_globals = {"__builtins__": {}}
    allowed_names = {"sqrt": sqrt}
    try:
        return str(eval(expr, restricted_globals, allowed_names))
    except Exception as exc:
        return f"ERROR: {exc}"

class ToolAwareAgent(Agent):
    """Agent that may request a registered tool via an ``ACTION: {...}`` line."""

    @staticmethod
    def _extract_action(text: str) -> Optional[Dict[str, Any]]:
        """Return the JSON object following the first ``ACTION:`` marker, or None.

        The object is decoded starting at its opening brace and ends at the
        matching closing brace, so braces or prose after the action do not
        break parsing. Malformed JSON or a non-object value yields None.
        """
        import json
        import re

        marker = re.search(r"ACTION:\s*(?=\{)", text)
        if marker is None:
            return None
        try:
            action, _end = json.JSONDecoder().raw_decode(text, marker.end())
        except json.JSONDecodeError:
            return None
        return action if isinstance(action, dict) else None

    def step(self, history: List[A2AMessage]) -> AgentResponse:
        """Answer the history, running one tool call if the model asks for it.

        A hint describing the ACTION format is appended to the history as a
        trailing system message for the first model call. If the reply
        contains a valid action naming a tool in ``TOOLS``, the tool is run
        and the model is called again with the reply and the tool result
        added to the original history (without the hint). Otherwise the first
        reply is returned as is.

        A tool that raises is reported to the model as ``"ERROR: <message>"``.
        Errors from either model call propagate to the caller.
        """
        # Let the LLM propose an action via a protocol hint
        prompt_hint = (
            "If you need to compute something, propose an action in JSON as: "
            "ACTION: {\"tool\": \"calculator\", \"input\": \"...\"}. "
            "Otherwise, answer normally."
        )
        extended = history + [A2AMessage(role="system", content=prompt_hint)]
        resp = super().step(extended)

        action = self._extract_action(resp.message.content)
        if action is None:
            return resp

        tool_name = action.get("tool")
        # isinstance guard: JSON could carry a list/dict here, which is unhashable
        if not isinstance(tool_name, str) or tool_name not in TOOLS:
            return resp

        try:
            tool_result = TOOLS[tool_name](action.get("input", ""))
        except Exception as exc:
            # Same textual convention the calculator tool uses for its own failures
            tool_result = f"ERROR: {exc}"

        tool_msg = A2AMessage(role="tool", name=tool_name, content=str(tool_result))
        # Have the agent incorporate the tool result
        return super().step(history + [resp.message, tool_msg])

# Researcher variant that is allowed to request the calculator tool.
calc_researcher = ToolAwareAgent(
    config=AgentConfig(
        name="Researcher",
        system_prompt=(
            "You are a research assistant who can optionally use a calculator tool. "
            "When asked to compute, propose an ACTION with a simple expression e.g. '2+2' or 'sqrt(9)'."
        ),
        model=model_name,
    ),
    client=client,
    llm_backend=llm_backend,
)

In [None]:
# Ask the tool-aware researcher a question that requires computation
history_calc: List[A2AMessage] = [
    A2AMessage(role="user", name="Instructor", content="What is sqrt(144) + 10? Answer with reasoning and final number."),
]
# step() may call the model twice: once to get a proposed ACTION, and once
# more after the tool has run so the answer can include the tool result.
resp_calc = calc_researcher.step(history_calc)
print(f"{resp_calc.message.name}:\n{resp_calc.message.content}")

In [None]:
# Exercise solution: a FactChecker agent placed between Researcher and Writer
fact_checker = Agent(
    config=AgentConfig(
        name="FactChecker",
        system_prompt=(
            "You verify factual claims, request sources, and highlight ambiguities. "
            "Be concise and constructive; suggest corrections or needed citations."
        ),
        model=model_name,
    ),
    client=client,
    llm_backend=llm_backend,
)

# Turn order for the three-agent run; None marks the end of the exchange
route_fc = {
    "Researcher": "FactChecker",
    "FactChecker": "Writer",
    "Writer": None,
}

def run_with_fact_checker(user_prompt: str, max_turns: int = 10) -> None:
    """Run one exchange that starts with the Researcher and follows ``route_fc``.

    Each agent receives the full conversation so far, its reply is appended
    to that conversation, and the reply is printed. With the default route
    the order is Researcher -> FactChecker -> Writer.

    Args:
        user_prompt: The Instructor's question that opens the conversation.
        max_turns: Upper bound on agent turns, so a route containing a cycle
            cannot loop forever.

    Raises:
        ValueError: if ``route_fc`` names a speaker that has no agent.
    """
    agents = {
        "Researcher": researcher,
        "FactChecker": fact_checker,
        "Writer": writer,
    }
    convo: List[A2AMessage] = [
        A2AMessage(role="user", name="Instructor", content=user_prompt)
    ]

    speaker: Optional[str] = "Researcher"
    for _ in range(max_turns):
        if speaker is None:
            break
        if speaker not in agents:
            raise ValueError(f"route_fc points to unknown agent {speaker!r}")

        resp = agents[speaker].step(convo)
        convo.append(resp.message)

        # Look up the next speaker first: replies are separated by a blank
        # gap, but the final reply is printed without a trailing one.
        speaker = route_fc.get(speaker)
        separator = "\n\n" if speaker is not None else ""
        print(f"{resp.message.name}:\n{resp.message.content}{separator}")

# Example run: prints the Researcher, FactChecker, and Writer replies in order
# (each reply is one call to the configured LLM backend)
run_with_fact_checker("Explain what Agent2Agent (A2A) is, briefly.")