# A2A Agent Building Blocks

This notebook contains all the necessary building blocks to create an A2A-compliant agent.

## How to Use This Notebook

1. **Copy the code cells** you need for your agent
2. **Modify** the agent name, description, and business logic
3. **Duplicate tool cells** if you need multiple tools
4. **Combine all cells** at the end to create your complete agent

## Key Concepts (A2A Specification)

- **Messages** (§6.4): Requests sent TO agents (role: user/agent)
- **Artifacts** (§6.7): Outputs generated BY agents (artifactId + parts)
- **Parts** (§6.5): Content units - TextPart or DataPart with 'kind' discriminator
- **Tasks** (§6.1): Execution context containing history (Messages) and artifacts (outputs)

The flow: Orchestrator sends **Message** → Agent processes → Returns **Artifact**

## 1. Setup and Imports

Essential imports for any A2A agent. The base class handles all protocol compliance.

In [None]:
#!/usr/bin/env python3
"""
Your Agent Name - Brief description of what your agent does.
Fully A2A-compliant agent following specification v0.3.0.
"""

import os
import sys
import json
from pathlib import Path
from typing import List, Optional, Union, Dict, Any

# Add parent directories to path for imports (if running as script)
sys.path.insert(0, str(Path(__file__).parent.parent))

# Core A2A imports
from base import A2AAgent
from a2a.types import AgentSkill
from utils.logging import get_logger

# Optional: For LLM integration (if not using tools)
from utils.llm_utils import generate_text

# Set up logging
logger = get_logger(__name__)

## 2. Base Agent Class

Every agent must extend `A2AAgent` and implement the required methods.
The base class handles all A2A protocol details - you just implement business logic.

In [None]:
class MyAgent(A2AAgent):
    """
    Your agent implementation.
    
    The A2AAgent base class handles:
    - Task lifecycle management (§6.1)
    - Message extraction and validation (§6.4)
    - Artifact generation (§6.7)
    - Error handling and status updates
    - Protocol compliance
    
    You only need to implement:
    - Metadata methods (name, description)
    - Business logic (process_message)
    - Optional: tools, skills, system instruction
    """
    
    def __init__(self):
        """Initialize the agent."""
        super().__init__()
        # Add any agent-specific initialization here
        self.config = {
            "max_retries": 3,
            "timeout": 30
        }

## 3. Required Metadata Methods

These methods define your agent's identity and are used in the AgentCard (§5.5).

In [None]:
    # REQUIRED: Agent name for discovery
    def get_agent_name(self) -> str:
        """Return the agent's name (used in AgentCard)."""
        return "My Custom Agent"
    
    # REQUIRED: Agent description for users/orchestrators
    def get_agent_description(self) -> str:
        """Return detailed description of agent capabilities."""
        return (
            "An agent that processes data and returns insights. "
            "Supports both text and structured data input. "
            "Returns analyzed results as text or JSON."
        )

## 4. Optional Metadata Methods

Enhance your agent with version, skills, and streaming support.

In [None]:
    # OPTIONAL: Version tracking
    def get_agent_version(self) -> str:
        """Return agent version."""
        return "1.0.0"
    
    # OPTIONAL: Streaming support declaration
    def supports_streaming(self) -> bool:
        """Return True if agent supports streaming responses."""
        return False  # Set to True if you implement streaming
    
    # OPTIONAL: Declare agent capabilities/skills (§5.5.4)
    def get_agent_skills(self) -> List[AgentSkill]:
        """Declare specific capabilities for the AgentCard."""
        return [
            AgentSkill(
                id="data_analysis",
                name="Data Analysis",
                description="Analyze structured data and provide insights",
                tags=["analysis", "data", "insights"],
                examples=[
                    "Analyze this dataset",
                    "What patterns do you see?",
                    "Summarize the key findings"
                ],
                inputModes=["text/plain", "application/json"],
                outputModes=["text/plain", "application/json"]
            ),
            # Add more skills as needed
        ]

## 5. Core Processing Logic

The `process_message` method is where your agent's business logic lives.
It receives extracted message content and returns output for an Artifact.

In [None]:
    # REQUIRED: Main processing logic
    async def process_message(self, message: str) -> Union[str, Dict, List]:
        """
        Process the incoming message and return output.
        
        The base class has already:
        - Extracted message content from all Parts (§6.5)
        - Concatenated TextParts and serialized DataParts
        - Created the Task and updated status to 'working'
        
        Args:
            message: Extracted content (could be text or JSON string)
            
        Returns:
            - str: Will be wrapped in TextPart within an Artifact
            - dict/list: Will be wrapped in DataPart within an Artifact
            
        The return value is automatically wrapped in an Artifact (§6.7)
        by the base class - agents produce outputs, not conversations.
        """
        logger.info(f"Processing message of length {len(message)}")
        
        # Try to detect if message is structured data
        try:
            data = json.loads(message)
            # Process structured data
            result = await self._process_data(data)
            return result  # Return dict/list for DataPart
        except (json.JSONDecodeError, TypeError):
            # Process as text
            result = await self._process_text(message)
            return result  # Return string for TextPart
    
    # Helper methods for processing
    async def _process_text(self, text: str) -> str:
        """Process text input and return text output."""
        # Your text processing logic here
        analysis = f"Processed {len(text.split())} words"
        return f"Analysis complete: {analysis}"
    
    async def _process_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process structured data and return structured output."""
        # Your data processing logic here
        return {
            "status": "analyzed",
            "input_keys": list(data.keys()),
            "record_count": len(data.get("records", [])),
            "summary": "Data processed successfully"
        }

## 6. Using Pydantic for Structured Outputs

When your agent needs to return structured data, Pydantic models provide validation and clear contracts. 
Each agent handles its own data formatting - no magic in base.py.

## 6. Adding LLM Integration

For agents that need LLM capabilities without tools.

In [None]:
    # OPTIONAL: Custom system instruction for LLM
    def get_system_instruction(self) -> str:
        """Provide system instruction for LLM-based processing."""
        return (
            "You are a specialized data analysis agent. "
            "Analyze the provided data and return clear, actionable insights. "
            "Be concise but thorough in your analysis."
        )
    
    # Example: Using LLM in process_message
    async def process_message_with_llm(self, message: str) -> str:
        """Alternative process_message using LLM."""
        # Use the auto-detecting LLM utility
        response = await generate_text(
            prompt=f"Analyze this: {message}",
            system_instruction=self.get_system_instruction(),
            temperature=0.7,
            max_tokens=1000
        )
        return response or "Unable to generate analysis"

## 7. Adding Tools (Google ADK FunctionTool)

Tools allow the LLM to perform specific actions. Each tool is a function that the LLM can call.
**Copy this cell multiple times** if you need multiple tools.

In [None]:
# Tool definition (create one cell per tool)
from google.adk.tools import FunctionTool

def analyze_metrics(
    data: Dict[str, Any],
    metric_type: str = "summary",
    include_trends: bool = False
) -> str:
    """
    Analyze metrics from provided data.
    
    Args:
        data: Dictionary containing metric data
        metric_type: Type of analysis (summary, detailed, comparison)
        include_trends: Whether to include trend analysis
        
    Returns:
        JSON string with analysis results
    """
    # Tool implementation
    results = {
        "type": metric_type,
        "metrics_analyzed": len(data),
        "summary": f"Analyzed {len(data)} metrics"
    }
    
    if include_trends:
        results["trends"] = "Upward trend detected"
    
    return json.dumps(results, indent=2)

# Create another tool (duplicate this pattern)
def query_database(
    query: str,
    limit: int = 10
) -> str:
    """
    Execute a database query (simulated).
    
    Args:
        query: SQL-like query string
        limit: Maximum results to return
        
    Returns:
        Query results as JSON string
    """
    # Simulated database query
    return json.dumps({
        "query": query,
        "results": [{"id": i, "value": f"row_{i}"} for i in range(min(limit, 3))],
        "count": min(limit, 3)
    })

In [None]:
    # Add tools to your agent class
    def get_tools(self) -> List[FunctionTool]:
        """
        Return tools available to this agent.
        The LLM will automatically use these tools when appropriate.
        """
        return [
            FunctionTool(analyze_metrics),
            FunctionTool(query_database),
            # Add more FunctionTool instances for each tool
        ]
    
    # When tools are provided, process_message won't be called directly
    # The base class handles tool execution via the LLM
    async def process_message(self, message: str) -> str:
        """This won't be called when tools are provided."""
        return "Handled by tool execution"

## 8. Inter-Agent Communication

Agents can call other agents using the unified `call_agent` method.

In [None]:
    async def process_with_other_agents(self, message: str) -> Dict[str, Any]:
        """
        Example of calling other agents from your agent.
        """
        # Call another agent with text (auto-wrapped in TextPart)
        analysis_result = await self.call_agent(
            "http://analyzer-agent:8000",  # Or use agent name from config
            f"Analyze this data: {message}"
        )
        
        # Call with structured data (auto-wrapped in DataPart)
        validation_result = await self.call_agent(
            "http://validator-agent:8000",
            {
                "data": message,
                "rules": ["check_format", "verify_completeness"]
            }
        )
        
        # Call with pre-formatted Message (multiple parts)
        complex_result = await self.call_agent(
            "http://processor-agent:8000",
            {
                "role": "user",
                "parts": [
                    {"kind": "text", "text": "Process this:"},
                    {"kind": "data", "data": {"input": message}}
                ]
            }
        )
        
        # Combine results (all are Artifacts with outputs)
        return {
            "analysis": analysis_result,
            "validation": validation_result,
            "processing": complex_result
        }

## 9. Error Handling

Proper error handling ensures robust agent operation.

In [None]:
    async def process_message_with_errors(self, message: str) -> Union[str, Dict]:
        """
        Example with comprehensive error handling.
        """
        try:
            # Validate input
            if not message or len(message) < 10:
                return {"error": "Input too short", "min_length": 10}
            
            # Process with timeout
            import asyncio
            try:
                result = await asyncio.wait_for(
                    self._heavy_processing(message),
                    timeout=30.0
                )
                return result
            except asyncio.TimeoutError:
                logger.error("Processing timeout")
                return {"error": "Processing timeout", "partial_result": None}
                
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error: {e}")
            return {"error": "Invalid JSON format", "details": str(e)}
            
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            # Return error as structured data
            return {
                "error": "Processing failed",
                "type": type(e).__name__,
                "message": str(e)
            }

## 10. Complete Agent Example

Here's how all the pieces come together into a complete agent file (`agent.py`):

In [None]:
#!/usr/bin/env python3
"""
Complete A2A Agent Example
Copy this entire cell to create your agent.py file.
"""

import os
import sys
import json
from pathlib import Path
from typing import List, Optional, Union, Dict, Any

sys.path.insert(0, str(Path(__file__).parent.parent))

from base import A2AAgent
from a2a.types import AgentSkill
from utils.logging import get_logger
from google.adk.tools import FunctionTool

logger = get_logger(__name__)


# Define tools (if using)
def process_data(data: Dict[str, Any], operation: str = "analyze") -> str:
    """Process data with specified operation."""
    result = {
        "operation": operation,
        "input_size": len(str(data)),
        "status": "completed"
    }
    return json.dumps(result)


class MyCompleteAgent(A2AAgent):
    """Complete A2A-compliant agent with all features."""
    
    def get_agent_name(self) -> str:
        return "Complete Example Agent"
    
    def get_agent_description(self) -> str:
        return (
            "A complete example agent demonstrating all A2A features. "
            "Processes text and data, uses tools, and can call other agents."
        )
    
    def get_agent_version(self) -> str:
        return "1.0.0"
    
    def get_system_instruction(self) -> str:
        return (
            "You are a helpful agent that processes requests accurately. "
            "Use the available tools when appropriate. "
            "Return clear, structured responses."
        )
    
    def get_tools(self) -> List[FunctionTool]:
        """Provide tools for LLM to use."""
        return [
            FunctionTool(process_data),
            # Add more tools as needed
        ]
    
    def get_agent_skills(self) -> List[AgentSkill]:
        return [
            AgentSkill(
                id="processing",
                name="Data Processing",
                description="Process and analyze various data formats",
                tags=["data", "analysis", "processing"],
                inputModes=["text/plain", "application/json"],
                outputModes=["application/json"]
            )
        ]
    
    async def process_message(self, message: str) -> Union[str, Dict, List]:
        """
        Main processing logic.
        Note: This won't be called if tools are provided.
        """
        # Since we have tools, this is just a fallback
        return "Processing handled by tools"


# Module-level instance for import
agent = MyCompleteAgent()

## 11. Main Entry Point

The `main.py` file is almost always the same - it just imports your agent and starts the server:

In [None]:
#!/usr/bin/env python3
"""
main.py - Entry point for your agent.
This file is mostly the same for all agents.
"""

import os
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from utils.logging import get_logger, setup_logging

# Import YOUR agent here (change this line)
from agent import MyCompleteAgent

setup_logging()
logger = get_logger(__name__)

def create_app():
    """Create the A2A application."""
    # Instantiate your agent
    agent = MyCompleteAgent()
    logger.info(f"Initializing {agent.get_agent_name()} v{agent.get_agent_version()}")
    
    # Create A2A components
    agent_card = agent.create_agent_card()
    task_store = InMemoryTaskStore()
    request_handler = DefaultRequestHandler(
        agent_executor=agent,
        task_store=task_store
    )
    
    # Build Starlette app with A2A endpoints
    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler
    ).build()
    
    return app, agent

app, agent = create_app()

if __name__ == "__main__":
    port = int(os.getenv("PORT", "8000"))
    host = os.getenv("HOST", "0.0.0.0")
    
    logger.info(f"Starting {agent.get_agent_name()} on http://{host}:{port}")
    logger.info(f"Agent Card: http://localhost:{port}/.well-known/agent-card.json")
    logger.info(f"A2A Endpoint: http://localhost:{port}/a2a/v1/message/sync")
    
    uvicorn.run(app, host=host, port=port, log_level="info")

## 12. Configuration (agents.json)

Configure known agents for inter-agent communication:

In [None]:
# config/agents.json
{
    "agents": {
        "analyzer": {
            "url": "http://analyzer-agent:8000",
            "description": "Data analysis agent"
        },
        "validator": {
            "url": "http://validator-agent:8000",
            "description": "Data validation agent"
        },
        "processor": {
            "url": "http://processor-agent:8000",
            "description": "Data processing agent"
        }
    }
}

## Summary

You now have all the building blocks to create an A2A-compliant agent:

1. **Copy the cells** you need
2. **Modify** names, descriptions, and logic
3. **Add tools** by duplicating tool cells
4. **Save as `agent.py`** in your agent directory
5. **Copy the main.py template** (changing only the import)
6. **Run** with `python main.py`

Remember the key flow:
- Agents receive **Messages** (requests)
- Process them (with optional tools)
- Return **Artifacts** (outputs)

The base class handles all A2A protocol details - you just implement business logic!

In [None]:
# Define your output structure with Pydantic
from pydantic import BaseModel, Field, ValidationError
from typing import List, Dict, Any

class AnalysisResult(BaseModel):
    """Define what your agent returns - clear contract for outputs."""
    summary: str = Field(description="Brief summary of findings")
    confidence: float = Field(ge=0, le=1, description="Confidence score between 0 and 1")
    findings: List[str] = Field(description="List of key findings")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
    
    # You can add examples for documentation
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "summary": "Analysis of medical record",
                    "confidence": 0.95,
                    "findings": ["Patient shows improvement", "No adverse reactions"],
                    "metadata": {"processed_at": "2024-01-01", "version": "1.0"}
                }
            ]
        }
    }

### Approach 1: Manual Creation with Validation

When you know exactly what data you're returning, create the Pydantic model directly:

In [None]:
    async def process_message(self, message: str) -> Dict[str, Any]:
        """Process and return structured data manually."""
        
        # Do your actual processing
        word_count = len(message.split())
        has_questions = '?' in message
        
        # Create Pydantic model for validation
        # This ensures your data is correctly structured
        result = AnalysisResult(
            summary=f"Analyzed text with {word_count} words",
            confidence=0.95 if word_count > 10 else 0.7,
            findings=[
                f"Word count: {word_count}",
                f"Contains questions: {has_questions}",
                "Analysis complete"
            ],
            metadata={
                "agent": self.get_agent_name(),
                "timestamp": time.time()
            }
        )
        
        # IMPORTANT: Return as dict for DataPart (A2A requirement)
        # DataPart.data must be a dictionary, not a Pydantic model
        return result.model_dump()

### Approach 2: LLM-Generated with Validation

When using an LLM to generate structured output, provide the schema and validate the response:

In [None]:
    async def process_message_with_llm(self, message: str) -> Dict[str, Any]:
        """Use LLM to generate structured output matching Pydantic schema."""
        
        # Get the JSON schema from Pydantic model
        schema_json = AnalysisResult.model_json_schema()
        
        # Create prompt with schema
        prompt = f"""
        Analyze the following text and return a JSON response that exactly matches this schema:
        
        Schema:
        {json.dumps(schema_json, indent=2)}
        
        Text to analyze:
        {message}
        
        Return ONLY valid JSON, no additional text.
        """
        
        # Use Google ADK via llm_utils (stays within A2A framework)
        from utils.llm_utils import generate_text
        
        llm_response = await generate_text(
            prompt=prompt,
            system_instruction="You are a JSON generator. Return only valid JSON matching the provided schema.",
            temperature=0.1,  # Low temperature for consistent structure
            max_tokens=500
        )
        
        # Validate LLM output with Pydantic
        try:
            # Parse and validate the JSON
            result = AnalysisResult.model_validate_json(llm_response)
            
            # Return as dict for DataPart
            return result.model_dump()
            
        except ValidationError as e:
            # Handle invalid LLM output
            logger.error(f"LLM generated invalid JSON: {e}")
            
            # Return error as structured data
            return {
                "error": "LLM generated invalid structure",
                "validation_errors": e.errors(),
                "raw_response": llm_response[:200]  # First 200 chars for debugging
            }
        except json.JSONDecodeError as e:
            # Handle non-JSON response
            logger.error(f"LLM didn't return JSON: {e}")
            return {
                "error": "LLM response was not valid JSON",
                "details": str(e),
                "raw_response": llm_response[:200]
            }

### Why We Use `.model_dump()`

The A2A specification requires that DataPart contains a dictionary (§6.5.3):

```typescript
// From A2A Specification
export interface DataPart extends PartBase {
    readonly kind: "data";
    data: { [key: string]: any };  // <-- Must be a dictionary/object
}
```

Since:
- Pydantic models are Python class instances, not dictionaries
- DataPart.data requires a dict type
- `.model_dump()` converts the Pydantic model to a dict

This is explicit and clear - each agent handles its own data formatting, no hidden magic!

## 13. Common LLM Integration Pitfalls 🚨

Here are critical issues we discovered during debugging that can save you hours:

### F-String Formatting with JSON Examples

**Problem:** Using f-strings with JSON examples causes "Invalid format specifier" errors.

```python
# ❌ THIS WILL FAIL - f-string sees { and } as format specifiers
prompt = f"""
Generate JSON like this:
{
    "patterns": ["\\d+"],  # <-- Error: Invalid format specifier
    "data": {"key": "value"}
}
"""

# ✅ CORRECT - Escape braces with {{ and }}
prompt = f"""
Generate JSON like this:
{{
    "patterns": ["\\\\d+"],
    "data": {{"key": "value"}}
}}
Document preview: {document_preview}  # <-- f-string substitution still works
"""
```

### JSON Regex Escaping

**Problem:** JSON requires double escaping for regex patterns.

```python
# ❌ WRONG - Single backslash breaks JSON parsing
prompt = f"""Return this JSON:
{{
    "patterns": ["\d+", "\w+"]  # <-- JSON parsing error
}}"""

# ✅ CORRECT - Use double backslash for JSON
prompt = f"""Return this JSON:
{{
    "patterns": ["\\\\d+", "\\\\w+"]  # <-- Proper JSON escaping
}}"""

# In the actual regex, this becomes \d+ and \w+ as expected
```

### Token Limits and Truncation

**Problem:** LLM responses get truncated, causing invalid JSON.

```python
# ❌ RISKY - May truncate mid-JSON
response = await generate_text(
    prompt=prompt,
    max_tokens=500  # <-- Too small for complex JSON
)

# ✅ SAFE - Ensure enough tokens for complete response
response = await generate_text(
    prompt=prompt,
    max_tokens=3000,  # <-- Generous limit
    temperature=0.3   # <-- Lower temp for consistent structure
)

# Always check for truncation
if response and response.endswith('...'):
    logger.warning("Response may be truncated")
```

### Artifact Extraction Pattern

**Problem:** Agent responses are wrapped in artifact structure, not raw data.

```python
# ❌ WRONG - Treating response as direct data
keyword_response = await self.call_agent("keyword", message)
patterns = keyword_response["patterns"]  # <-- KeyError!

# ✅ CORRECT - Extract from artifact first
def _extract_from_artifact(self, response):
    """Helper to extract data from artifact structure."""
    if isinstance(response, dict):
        if "parts" in response:
            for part in response["parts"]:
                if part.get("kind") == "data":
                    return part.get("data")
                elif part.get("kind") == "text":
                    return part.get("text")
    return response

# Use the helper
keyword_response = await self.call_agent("keyword", message)
data = self._extract_from_artifact(keyword_response)
patterns = data.get("patterns", [])
```

### Clean JSON Extraction from Code Blocks

**Problem:** LLM often returns JSON wrapped in markdown code blocks.

```python
# LLM might return:
# ```json
# {"key": "value"}
# ```

# ✅ Handle code block extraction
if llm_response and '```json' in llm_response:
    start = llm_response.find('```json') + 7
    end = llm_response.find('```', start)
    if end > start:
        cleaned_response = llm_response[start:end]
    else:
        cleaned_response = llm_response
else:
    cleaned_response = llm_response

# Then parse the cleaned JSON
data = json.loads(cleaned_response)
```

### No Fallback Patterns

**Problem:** Hardcoded fallback patterns mask LLM failures.

```python
# ❌ WRONG - Hides LLM problems
try:
    patterns = extract_patterns_from_llm(response)
except:
    patterns = ["(?i)patient", "\\d+"]  # <-- NO! This hides issues

# ✅ CORRECT - Return empty on failure, log the error
try:
    patterns = extract_patterns_from_llm(response)
except Exception as e:
    logger.error(f"LLM pattern generation failed: {e}")
    patterns = []  # <-- Empty is honest, shows something went wrong
```

### Testing Without Real Orchestrator

**Problem:** Direct agent calls don't trigger tool execution.

```python
# ❌ WRONG - Bypasses tools
agent = SimpleOrchestratorAgent()
result = await agent.process_message("analyze this")  # <-- Tools not invoked!

# ✅ CORRECT - Call through execute_pipeline for orchestrators
agent = SimpleOrchestratorAgent()
result = await agent.execute_pipeline("analyze this")  # <-- Proper execution

# OR override process_message to call execute_pipeline
async def process_message(self, message: str) -> str:
    return await self.execute_pipeline(message)
```