# Processing Invoices Using A Multi-Agent Solution


In [None]:
import os
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    MessageSendParams,
    SendMessageRequest,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from a2a.client import A2AClient

from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import (
    MCPToolset,
    StdioServerParameters,
    StdioConnectionParams,
)
from dotenv import load_dotenv
import asyncio
import threading
import time
import json
from typing import Any
import uuid
import httpx
import nest_asyncio
import uvicorn


# Load settings from a local .env file into the process environment.
# override=True lets values from .env replace variables that are already set.
load_dotenv(override=True)

In [None]:
# Model used by every agent defined in this notebook.
MODEL_NAME = "gemini-2.5-pro"

# Local ports on which each agent's A2A server listens.
FILES_AGENT_PORT = 10021
EXTRACTION_AGENT_PORT = 10022
ORCHESTRATOR_AGENT_PORT = 10023

# ID of the Box folder to process; this is None when BOX_FOLDER_ID is not set.
BOX_FOLDER_ID = os.getenv("BOX_FOLDER_ID")

# Directory of the local Box MCP server checkout. Set BOX_MCP_SERVER_PATH
# (for example in .env) to point at your own checkout; the original path is
# kept as the fallback so existing setups behave exactly as before.
BOX_MCP_SERVER_PATH = os.getenv(
    "BOX_MCP_SERVER_PATH", "/Users/svpino/dev/mcp-server-box"
)

# Patch asyncio with nest_asyncio so event loops can be nested. This is needed
# in Jupyter, where the notebook itself already runs inside an event loop.
nest_asyncio.apply()

## Configuring access to Box's MCP Server

In [None]:
# Start the Box MCP server as a subprocess via `uv run` and talk to it over stdio.
connection_params = StdioConnectionParams(
    server_params=StdioServerParameters(
        command="uv",
        args=[
            "--directory",
            BOX_MCP_SERVER_PATH,
            "run",
            "src/mcp_server_box.py",
        ],
    ),
    # The timeout is a field of StdioConnectionParams. StdioServerParameters
    # has no such field, so passing it there leaves the default in effect.
    timeout=60,
)

## Creating the Agents

We'll create three agents:

1. Files Agent - Returns every file inside a specified folder in Box.
2. Extraction Agent - Extracts structured data from a specific document stored in Box.
3. Orchestrator Agent - Orchestrates the work of the other agents.

### Agent 1 - Files Agent

This agent will return every file inside a specified folder in Box.

In [None]:
# Agent that lists the files in a Box folder. It reaches Box through the MCP
# server configured in `connection_params`.
files_agent = Agent(
    model=MODEL_NAME,
    name="files_agent",
    instruction="""
    Your job is to list the ID of every file inside the folder with the given ID.
    """,
    tools=[
        MCPToolset(
            connection_params=connection_params,
            # Only the folder-listing tool is passed in the filter for this agent.
            tool_filter=["box_list_folder_content_by_folder_id"],
        )
    ],
)

Let's now create the Agent Card.

In [None]:
# A2A agent card for the Files Agent: the URL where it is served, the content
# types it accepts and returns, and the single skill it advertises.
files_agent_card = AgentCard(
    name="Files Agent",
    url=f"http://localhost:{FILES_AGENT_PORT}",
    description="List files in a folder given its folder ID.",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    skills=[
        AgentSkill(
            id="list_files_in_folder",
            name="List existing files in a folder",
            description="List files in a folder given its folder ID.",
            tags=["files", "folder", "find", "list"],
            examples=["List files in folder 12345"],
        )
    ],
)

Let's now create the client to communicate with the A2A Server.

In [None]:
# Remote handle for the Files Agent. It is given the URL of the agent card
# served on FILES_AGENT_PORT and is later passed to the orchestrator as a sub-agent.
remote_files_agent = RemoteA2aAgent(
    name="list_files_in_folder",
    description="List files in a folder given the folder ID.",
    agent_card=f"http://localhost:{FILES_AGENT_PORT}{AGENT_CARD_WELL_KNOWN_PATH}",
)

### Agent 2 - Extraction Agent

This agent will extract invoice data from a given document.

In [None]:
# Agent that extracts invoice fields from one Box file. The prompt tells the
# model to reply with a bare JSON object.
extraction_agent = Agent(
    model=MODEL_NAME,
    name="data_extraction_agent",
    instruction="""
    Your job is to extract invoice data from a file given its ID.

    You will receive the ID of a file. You will extract the following fields from that file:
    client_name, invoice_amount, product_name.

    You MUST return your response in the following JSON format:
    {
        "ID": "<file_id>",
        "client_name": "<name_of_the_client>",
        "invoice_amount": <amount>,
        "product_name": "<name_of_the_product>"
    }

    Only return the JSON object, no additional text, and no markdown formatting.
    """,
    tools=[
        MCPToolset(
            connection_params=connection_params,
            # Only the Box AI extraction tool is passed in the filter for this agent.
            tool_filter=["box_ai_extract_tool"],
        )
    ],
)

Let's now create the Agent Card.

In [None]:
# A2A agent card for the extraction agent. It declares both plain text and
# JSON as input and output modes, and the examples show both request styles.
extraction_agent_card = AgentCard(
    name="Invoice Data Extraction Agent",
    url=f"http://localhost:{EXTRACTION_AGENT_PORT}",
    description="Extract invoice data from a given file",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain", "application/json"],
    default_output_modes=["text/plain", "application/json"],
    skills=[
        AgentSkill(
            id="extract_invoice_data_from_file",
            name="Extract Invoice Data From File",
            description="Extracts invoice data from a specific file",
            tags=["extract", "data"],
            examples=[
                "Extract invoice data from file ID 12345",
                '{"file_id": "67890"}',
            ],
        )
    ],
)

Let's now create the client to communicate with the A2A Server.

In [None]:
# Remote handle for the extraction agent. It is given the URL of the agent card
# served on EXTRACTION_AGENT_PORT and is later passed to the orchestrator as a sub-agent.
remote_extraction_agent = RemoteA2aAgent(
    name="extract_invoice_data_from_file",
    description="Extracts invoice data from a specific file",
    agent_card=f"http://localhost:{EXTRACTION_AGENT_PORT}{AGENT_CARD_WELL_KNOWN_PATH}",
)

### Agent 3 - Orchestrator Agent

This agent will coordinate between the other agents to process every invoice in Box and extract their data.

In [None]:
# Orchestrator agent. It has no tools of its own; its prompt tells it to
# delegate, and the two remote agents are attached as sub-agents.
orchestrator_agent = Agent(
    model=MODEL_NAME,
    name="orchestrator_agent",
    instruction="""
You are an expert AI Orchestrator.

Your primary responsibility is to interpret user requests, plan the necessary
sequence of actions if multiple steps are involved, and delegate them to the
most appropriate specialized remote agents.

You do not perform the tasks yourself but manage their assignment, sequence, 
and can monitor their status.

* Always prioritize selecting the correct agent(s) based on their documented purpose.
* Ensure all information required by the chosen remote agent is included in the
  call, including outputs from previous agents if it's a sequential task.
* Focus on the most recent parts of the conversation for immediate context, 
  but maintain awareness of the overall goal, especially for multi-step requests.
""",
    sub_agents=[remote_files_agent, remote_extraction_agent],
)

Let's now create the Agent Card.

In [None]:
# A2A agent card for the orchestrator: plain-text input, plain-text or JSON
# output, and one skill covering folder-level invoice extraction.
orchestrator_agent_card = AgentCard(
    name="Orchestrator Agent",
    url=f"http://localhost:{ORCHESTRATOR_AGENT_PORT}",
    description="Orchestrates listing files in folders and extracting invoice data from them",
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain", "application/json"],
    skills=[
        AgentSkill(
            id="folder_data_extraction",
            name="Folder Data Extraction",
            description="Extracts invoice data from files in a specific folder",
            tags=["files", "extract", "orchestration"],
            examples=[
                "Extract invoice data from files in folder 123",
                "Process folder 123 and extract data from all files",
            ],
        )
    ],
)

## Running the agents

Let's start by creating a function to run an agent as an A2A server.

In [None]:
def create_agent_a2a_server(agent, agent_card):
    """Build the A2A Starlette application that serves ``agent``.

    Args:
        agent: Agent to expose; its ``name`` is used as the runner's app name.
        agent_card: Agent card published by the application.

    Returns:
        An ``A2AStarletteApplication`` wired to a request handler for ``agent``.
    """
    # All state (artifacts, sessions, memory, tasks) is kept in memory.
    agent_runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )
    agent_executor = A2aAgentExecutor(
        runner=agent_runner,
        config=A2aAgentExecutorConfig(),
    )
    handler = DefaultRequestHandler(
        agent_executor=agent_executor,
        task_store=InMemoryTaskStore(),
    )
    return A2AStarletteApplication(agent_card=agent_card, http_handler=handler)

Let's now define a few functions to run each agent as an A2A Server in the background.

In [None]:
# Every uvicorn server created by run_server_notebook is appended here.
servers = []


async def run_server_notebook(create_agent_function, port):
    """Build an agent app and serve it with uvicorn on ``127.0.0.1:port``.

    Args:
        create_agent_function: Zero-argument callable returning an object
            whose ``build()`` result is handed to uvicorn as the app.
        port: TCP port to listen on.

    Any ``Exception`` raised along the way is printed, not propagated.
    """
    try:
        print(f"\nStarting agent on port {port}...")
        application = create_agent_function()
        server = uvicorn.Server(
            uvicorn.Config(
                application.build(),
                host="127.0.0.1",
                port=port,
                log_level="error",
                loop="asyncio",
            )
        )
        # Register the server before the serve() call that runs it.
        servers.append(server)
        await server.serve()
    except Exception as e:
        print(f"Error running agent server: {e}")


def run_agent_in_background(create_agent_function, port, name):
    """Run an agent server in a daemon thread with its own event loop.

    Args:
        create_agent_function: Zero-argument callable forwarded to
            ``run_server_notebook`` to build the app.
        port: TCP port forwarded to ``run_server_notebook``.
        name: Human-readable agent name; used as the thread name and in the
            error message printed if the thread's work fails.

    Returns:
        The started ``threading.Thread``.
    """

    def run():
        # Each thread gets its own loop, separate from the caller's.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(run_server_notebook(create_agent_function, port))
        except Exception as e:
            print(f"{name} error: {e}")
        finally:
            # Release the loop's resources once the server has stopped or failed.
            loop.close()

    # Naming the thread makes it identifiable in debuggers and thread dumps.
    thread = threading.Thread(target=run, name=name, daemon=True)
    thread.start()
    return thread

Let's now start the agent servers.

In [None]:
# Start one background server per agent. Each factory is a lambda, so the A2A
# app is only built when run_server_notebook calls it inside the new thread.
files_thread = run_agent_in_background(
    lambda: create_agent_a2a_server(files_agent, files_agent_card),
    FILES_AGENT_PORT,
    "Files Agent",
)

extraction_thread = run_agent_in_background(
    lambda: create_agent_a2a_server(extraction_agent, extraction_agent_card),
    EXTRACTION_AGENT_PORT,
    "Invoice Data Extraction Agent",
)

# The orchestrator's sub-agents point at the two servers started above.
orchestrator_thread = run_agent_in_background(
    lambda: create_agent_a2a_server(orchestrator_agent, orchestrator_agent_card),
    ORCHESTRATOR_AGENT_PORT,
    "Orchestrator Agent",
)

Let's wait for the servers to start and check if every agent is running.

In [None]:
# Give the background servers a moment to start before checking on them.
time.sleep(3)

# A thread that has already exited means its server stopped or never started.
if any(
    not thread.is_alive()
    for thread in (files_thread, extraction_thread, orchestrator_thread)
):
    print("Agent servers failed to start.")
else:
    print("Agent servers are running.")

## Running the Multi-Agent system

Let's create a simple client class to access the A2A system.

In [None]:
class A2ASimpleClient:
    def __init__(self, default_timeout: float = 3600.0):
        self._agent_info_cache: dict[
            str, dict[str, Any] | None
        ] = {}  # Cache for agent metadata
        self.default_timeout = default_timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        """Send a message following the official A2A SDK pattern."""
        # Configure httpx client with timeout
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )

        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            # Check if we have cached agent card data
            if (
                agent_url in self._agent_info_cache
                and self._agent_info_cache[agent_url] is not None
            ):
                agent_card_data = self._agent_info_cache[agent_url]
            else:
                # Fetch the agent card
                agent_card_response = await httpx_client.get(
                    f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}"
                )
                agent_card_data = self._agent_info_cache[agent_url] = (
                    agent_card_response.json()
                )

            # Create AgentCard from data
            agent_card = AgentCard(**agent_card_data)

            # Create A2A client with the agent card
            client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

            # Build the message parameters following official structure
            send_message_payload = {
                "message": {
                    "role": "user",
                    "parts": [{"kind": "text", "text": message}],
                    "messageId": uuid.uuid4().hex,
                }
            }

            # Create the request
            request = SendMessageRequest(
                id=str(uuid.uuid4()), params=MessageSendParams(**send_message_payload)
            )

            # Send the message with timeout configuration
            response = await client.send_message(request)

            # Extract text from response
            try:
                response_dict = response.model_dump(mode="json", exclude_none=True)

                if "result" in response_dict and "artifacts" in response_dict["result"]:
                    artifacts = response_dict["result"]["artifacts"]
                    for artifact in artifacts:
                        if "parts" in artifact:
                            for part in artifact["parts"]:
                                if "text" in part:
                                    return part["text"]

                # If we couldn't extract text, return the full response as formatted JSON
                return json.dumps(response_dict, indent=2)

            except Exception as e:
                print(f"Error parsing response: {e}")
                return str(response)


a2a_client = A2ASimpleClient()

Let's now ask the orchestrator agent to list every file in a folder.

In [None]:
# Send a plain-text request to the orchestrator's A2A endpoint and print the reply.
# The top-level `await` relies on the notebook environment.
# NOTE(review): BOX_FOLDER_ID comes from os.getenv and is None when the
# variable is unset, which would render as "folder None" - confirm it is set.
report = await a2a_client.create_task(
    f"http://localhost:{ORCHESTRATOR_AGENT_PORT}",
    f"List files in folder {BOX_FOLDER_ID}.",
)

print(report)

We can also ask the orchestrator to extract data from a specific file.

In [None]:
# Ask the orchestrator to extract data from a single file and print the reply.
# The file ID is hard-coded in the request text; replace it with the ID of a
# file you want to process.
report = await a2a_client.create_task(
    f"http://localhost:{ORCHESTRATOR_AGENT_PORT}",
    "Extract data from file 1920593027952",
)

print(report)