# Lesson 06 — Wrapping Agents as A2A Servers

Transform the multi-skill agent into a fully **A2A-compliant server** that
demonstrates the core protocol capabilities.

## What You'll Learn

- Implement the `AgentExecutor` interface from the A2A Python SDK
- Handle **multi-turn conversations** with `TaskState.input_required`
- Return **Artifacts** with `TextPart` and `DataPart` (structured JSON)
- Support **task cancellation** via `cancel()`
- Define a **rich Agent Card** with multiple skills and capabilities
- Emit **streaming events** (`TaskStatusUpdateEvent`, `TaskArtifactUpdateEvent`)
- Wire up `DefaultRequestHandler` + `A2AStarletteApplication`

## A2A Protocol Features Covered

| Feature           | Protocol Reference         | Implementation                   |
| ----------------- | -------------------------- | -------------------------------- |
| Multi-Turn        | `TaskState.input_required` | Claims filing skill              |
| Artifacts         | `TaskArtifactUpdateEvent`  | Claim receipts, policy summaries |
| Structured Data   | `DataPart` in artifacts    | JSON claim receipts              |
| Text Parts        | `TextPart` in messages     | Q&A answers                      |
| Multiple Skills   | Agent Card `skills[]`      | 3 skills declared                |
| Streaming Events  | `TaskStatusUpdateEvent`    | Working → output → completed     |
| Task Cancellation | `cancel()` method          | Claims session cleanup           |
| Task Lifecycle    | TaskState transitions      | Full state machine               |

## Prerequisites

- Lesson 05 completed (agent classes)
- `GITHUB_TOKEN` in `.env` or AI Toolkit LocalFoundry running

> **A2A SDK docs:** [pypi.org/project/a2a-sdk](https://pypi.org/project/a2a-sdk/)


## A2A Server Architecture

Understand how the components fit together before you write any code:

```mermaid
graph TB
    Client["A2A Client\n(Lesson 07)"]

    subgraph A2AServer["A2A Server Stack (port 10001)"]
        direction TB
        App["A2AStarletteApplication\n(ASGI app - Starlette)"]
        Handler["DefaultRequestHandler\n(routes JSON-RPC methods)"]
        Store["InMemoryTaskStore\n(persists task state for multi-turn)"]
        Executor["InsuranceAgentExecutor\n(YOUR code — implements AgentExecutor)"]
        Queue["EventQueue\n(bridge: executor → handler → client)"]

        App --> Handler
        Handler --> Store
        Handler --> Executor
        Executor --> Queue
    end

    subgraph AgentLayer["Agent Layer (Lesson 05)"]
        MS["MultiSkillAgent"]
        QA["QAAgent"]
        CA["ClaimsAgent"]
        PS["PolicySummaryAgent"]
        MS --> QA
        MS --> CA
        MS --> PS
    end

    Client -->|JSON-RPC over HTTP| App
    Executor --> MS

    style A2AServer fill:#fef9e7,stroke:#f39c12
    style AgentLayer fill:#e8f4f8,stroke:#4a90d9
```

**The key abstraction is `AgentExecutor`:**
You implement just two methods — `execute()` and `cancel()`. The SDK handles all the
JSON-RPC routing, task state management, SSE streaming, and error formatting.

**`InMemoryTaskStore` is critical for multi-turn:**
It persists the `Task` object between HTTP requests, so when the client sends a
follow-up message with `taskId`, the handler can look up the existing task and
resume it.
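To make the lookup concrete, here is a minimal sketch of the idea, assuming a plain dictionary keyed by task ID. `TaskRecord` and `TinyTaskStore` are hypothetical names invented for this illustration; they are not the SDK's `Task` or `InMemoryTaskStore`, which carry more state and expose an async interface.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """Hypothetical, simplified stand-in for an A2A Task."""

    task_id: str
    state: str = "submitted"
    history: list[str] = field(default_factory=list)


class TinyTaskStore:
    """Illustrative in-memory store; NOT the SDK's InMemoryTaskStore."""

    def __init__(self) -> None:
        self._tasks: dict[str, TaskRecord] = {}

    def save(self, task: TaskRecord) -> None:
        self._tasks[task.task_id] = task

    def get(self, task_id: str) -> TaskRecord | None:
        return self._tasks.get(task_id)


store = TinyTaskStore()

# Request 1: a new task pauses and waits for more input.
first = TaskRecord(task_id="task-1", state="input_required")
first.history.append("I need to file a dental claim")
store.save(first)

# Request 2: the follow-up carries the same task ID, so the task is resumed.
resumed = store.get("task-1")
assert resumed is not None
resumed.history.append("It was on 2025-01-15 and cost 150.00")
resumed.state = "working"

print(resumed.state, len(resumed.history))
```

Because the store lives in process memory, every task is lost when the server restarts. That is acceptable for a lesson but worth keeping in mind for anything longer-lived.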


## Step 1 — Install the A2A SDK


In [1]:
%pip install "a2a-sdk[http-server]" openai python-dotenv --quiet
print("Dependencies ready.")

Note: you may need to restart the kernel to use updated packages.
Dependencies ready.





In [2]:
import os
from dotenv import find_dotenv, load_dotenv

env_path = find_dotenv(raise_error_if_not_found=False)
if env_path:
    load_dotenv(env_path)
    print(f"Loaded .env from: {env_path}")
else:
    print("NOTE: .env not found — set GITHUB_TOKEN manually if needed")

PROVIDER = "github"  # ← change to "localfoundry" to use AI Toolkit LocalFoundry

if PROVIDER == "github":
    token = os.environ.get("GITHUB_TOKEN", "")
    if token:
        print(f"GITHUB_TOKEN is set ({token[:8]}...)")
    else:
        print("ERROR: GITHUB_TOKEN is NOT set")
    ENDPOINT = "https://models.inference.ai.azure.com"
    API_KEY = token
    MODEL = "Phi-4"
elif PROVIDER == "localfoundry":
    ENDPOINT = os.environ.get("LOCALFOUNDRY_ENDPOINT", "http://localhost:5272/v1/")
    API_KEY = "unused"
    MODEL = os.environ.get("LOCALFOUNDRY_MODEL", "qwen2.5-0.5b-instruct-generic-gpu:4")
    print(f"NOTE: AI Toolkit LocalFoundry — ensure a model is running at {ENDPOINT}")
else:
    raise ValueError(f"Unknown PROVIDER: {PROVIDER!r}")

print(f"Provider  : {PROVIDER}")
print(f"Endpoint  : {ENDPOINT}")
print(f"Model     : {MODEL}")

Loaded .env from: y:\.sources\localm-tuts\a2a\_examples\.env
GITHUB_TOKEN is set (github_p...)
Provider  : github
Endpoint  : https://models.inference.ai.azure.com
Model     : Phi-4


## Step 2 — Define the Agent Classes

In Lesson 05 you learned how to build these four agent classes interactively.
Here we **define them inline** so this notebook is fully self-contained —
you can run it top-to-bottom without any external `.py` files.

| Class                | What it does                                           |
| -------------------- | ------------------------------------------------------ |
| `QAAgent`            | Policy Q&A — question → text answer                    |
| `ClaimsAgent`        | Multi-turn claims filing — collects 4 required fields  |
| `PolicySummaryAgent` | Returns structured JSON policy summary as a `DataPart` |
| `MultiSkillAgent`    | Routes user messages to the correct skill agent        |


In [None]:
import json
import re
from datetime import datetime, timezone
from pathlib import Path

from openai import AsyncOpenAI

# ── Shared helpers ─────────────────────────────────────────────────────────

SYSTEM_PROMPT = """\
You are a helpful insurance policy assistant.
Use the following policy document to answer questions accurately.
If the answer is not in the document, say so clearly.
Always cite the relevant section when possible.

--- POLICY DOCUMENT ---
{policy_text}
--- END DOCUMENT ---
"""


def load_knowledge(path: str) -> str:
    """Load a knowledge document from disk."""
    return Path(path).read_text(encoding="utf-8")


# ── QAAgent ────────────────────────────────────────────────────────────────


class QAAgent:
    """Question-answering agent backed by the configured model provider."""

    def __init__(
        self,
        knowledge_path: str,
        model: str = MODEL,
        endpoint: str = ENDPOINT,
        api_key: str = API_KEY,
        temperature: float = 0.2,
    ):
        self.client = AsyncOpenAI(base_url=endpoint, api_key=api_key)
        self.model = model
        self.temperature = temperature
        self.knowledge = load_knowledge(knowledge_path)
        self.system_prompt = SYSTEM_PROMPT.format(policy_text=self.knowledge)

    async def query(self, question: str) -> str:
        """Send a question to the model and return the answer."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": question},
            ],
            temperature=self.temperature,
            max_tokens=2048,
        )
        return response.choices[0].message.content


# ── ClaimsAgent ────────────────────────────────────────────────────────────

REQUIRED_CLAIM_FIELDS = ["claim_type", "date_of_service", "amount", "description"]

CLAIMS_SYSTEM_PROMPT = """\
You are an insurance claims filing assistant. Extract claim data from user messages.

Required claim fields:
- claim_type: one of "medical", "dental", "prescription", "emergency"
- date_of_service: date in YYYY-MM-DD format
- amount: dollar amount as a string (e.g. "150.00")
- description: brief description of the service

STRICT RULES — you must follow these exactly:
- ONLY extract fields that are EXPLICITLY and CLEARLY stated by the user
- Do NOT infer, guess, assume, or fabricate any field value
- Do NOT add a field unless the user clearly provided that exact value
- If a field is absent or ambiguous, omit it from extracted_fields entirely
- Do NOT re-extract fields already collected (listed below)

Already collected fields (do NOT include these in extracted_fields):
{collected}

Return ONLY a JSON object with no markdown, no explanation:
{{
  "extracted_fields": {{"field_name": "value"}}
}}

If the user provided nothing extractable, return: {{"extracted_fields": {{}}}}
"""


class ClaimsAgent:
    """Multi-turn claims filing agent.

    Maintains state across turns to collect all required claim fields.
    Returns structured JSON with extracted fields and missing info.
    """

    def __init__(
        self,
        model: str = MODEL,
        endpoint: str = ENDPOINT,
        api_key: str = API_KEY,
    ):
        self.client = AsyncOpenAI(base_url=endpoint, api_key=api_key)
        self.model = model
        self.sessions: dict[str, dict[str, str]] = {}

    async def process(self, user_text: str, session_id: str = "default") -> dict:
        """Process a claims turn. Returns status + data."""
        collected = self.sessions.get(session_id, {})

        prompt = CLAIMS_SYSTEM_PROMPT.format(
            collected=json.dumps(collected) if collected else "(none yet)"
        )

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": user_text},
            ],
            temperature=0.0,
            max_tokens=1024,
        )

        raw = response.choices[0].message.content

        try:
            json_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", raw, re.DOTALL)
            if json_match:
                parsed = json.loads(json_match.group())
            else:
                parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = {"extracted_fields": {}}

        extracted = parsed.get("extracted_fields", {})
        collected.update({k: v for k, v in extracted.items() if v})
        self.sessions[session_id] = collected

        missing = [f for f in REQUIRED_CLAIM_FIELDS if f not in collected]

        if missing:
            return {
                "status": "input_required",
                "collected_fields": collected,
                "missing_fields": missing,
                "message": f"Please provide: {', '.join(missing)}",
            }
        else:
            receipt = self._generate_receipt(collected, session_id)
            del self.sessions[session_id]
            return {
                "status": "completed",
                "claim_receipt": receipt,
                "message": f"Claim {receipt['claim_id']} filed successfully.",
            }

    def clear_session(self, session_id: str) -> None:
        """Remove a claims session (used by cancel)."""
        self.sessions.pop(session_id, None)

    def _generate_receipt(self, fields: dict, session_id: str) -> dict:
        """Generate a structured claim receipt."""
        from uuid import uuid4 as _uuid4

        return {
            "claim_id": f"CLM-{_uuid4().hex[:8].upper()}",
            "policy_number": "ACME-STD-2025-001",
            "claim_type": fields.get("claim_type", "unknown"),
            "date_of_service": fields.get("date_of_service", "unknown"),
            "amount": fields.get("amount", "0.00"),
            "description": fields.get("description", ""),
            "status": "submitted",
            "filed_at": datetime.now(timezone.utc).isoformat(),
            "estimated_processing_days": 30,
        }


# ── PolicySummaryAgent ─────────────────────────────────────────────────────


class PolicySummaryAgent:
    """Returns structured policy summaries as JSON (DataPart in A2A)."""

    def __init__(
        self,
        knowledge_path: str = "data/insurance_policy.txt",
        model: str = MODEL,
        endpoint: str = ENDPOINT,
        api_key: str = API_KEY,
    ):
        self.client = AsyncOpenAI(base_url=endpoint, api_key=api_key)
        self.model = model
        self.knowledge = load_knowledge(knowledge_path)

    async def summarize(self) -> dict:
        """Generate a structured policy summary."""
        prompt = f"""\
Extract the key facts from this insurance policy and return ONLY a JSON object:

{self.knowledge}

Return this exact JSON structure (fill in values from the document):
{{
  "policy_number": "...",
  "plan_name": "...",
  "monthly_premium": "...",
  "annual_premium": "...",
  "deductible": "...",
  "annual_maximum": "...",
  "covered_services": ["..."],
  "exclusions": ["..."]
}}
"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
            max_tokens=2048,
        )

        raw = response.choices[0].message.content

        try:
            json_match = re.search(r"\{.*\}", raw, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"error": "Failed to parse structured output", "raw": raw}


# ── MultiSkillAgent ────────────────────────────────────────────────────────


class MultiSkillAgent:
    """Routes requests to QAAgent, ClaimsAgent, or PolicySummaryAgent."""

    CLAIM_PATTERN = re.compile(
        r"\b(file|submit|make|start)\b.*\bclaims?\b"
        r"|\bclaims?\s+(for|about|regarding)\b"
        r"|\b(dental|medical|prescription|emergency)\s+claims?\b",
        re.IGNORECASE,
    )
    SUMMARY_KEYWORDS = ["summary", "summarize", "overview", "key facts"]

    def __init__(self, knowledge_path: str = "data/insurance_policy.txt"):
        self.qa_agent = QAAgent(knowledge_path)
        self.claims_agent = ClaimsAgent()
        self.summary_agent = PolicySummaryAgent(knowledge_path)

    def detect_skill(self, user_text: str) -> str:
        """Detect which skill to route to based on user input."""
        lower = user_text.lower()
        if self.CLAIM_PATTERN.search(lower):
            return "claims-filing"
        if any(kw in lower for kw in self.SUMMARY_KEYWORDS):
            return "policy-summary"
        return "policy-qa"

    async def handle(
        self, user_text: str, skill: str | None = None, session_id: str = "default"
    ) -> dict:
        """Handle a request by routing to the correct skill."""
        detected_skill = skill or self.detect_skill(user_text)

        if detected_skill == "claims-filing":
            result = await self.claims_agent.process(user_text, session_id)
            return {"skill": "claims-filing", "type": "multi_turn", **result}

        elif detected_skill == "policy-summary":
            summary = await self.summary_agent.summarize()
            return {
                "skill": "policy-summary",
                "type": "structured",
                "status": "completed",
                "data": summary,
            }

        else:  # policy-qa
            answer = await self.qa_agent.query(user_text)
            return {
                "skill": "policy-qa",
                "type": "text",
                "status": "completed",
                "answer": answer,
            }


print(
    "Agent classes defined: QAAgent, ClaimsAgent, PolicySummaryAgent, MultiSkillAgent"
)

Agent classes defined: QAAgent, ClaimsAgent, PolicySummaryAgent, MultiSkillAgent
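The routing in `MultiSkillAgent.detect_skill()` can be checked without calling any model. The sketch below copies the pattern and keyword list from the class into a standalone function so that it runs on its own; the sample sentences are made up for illustration.

```python
import re

# Copied from MultiSkillAgent so the snippet is self-contained.
CLAIM_PATTERN = re.compile(
    r"\b(file|submit|make|start)\b.*\bclaims?\b"
    r"|\bclaims?\s+(for|about|regarding)\b"
    r"|\b(dental|medical|prescription|emergency)\s+claims?\b",
    re.IGNORECASE,
)
SUMMARY_KEYWORDS = ["summary", "summarize", "overview", "key facts"]


def detect_skill(user_text: str) -> str:
    """Standalone copy of MultiSkillAgent.detect_skill()."""
    lower = user_text.lower()
    if CLAIM_PATTERN.search(lower):
        return "claims-filing"
    if any(kw in lower for kw in SUMMARY_KEYWORDS):
        return "policy-summary"
    return "policy-qa"


samples = [
    "I need to file a claim for a dental visit",
    "Give me a summary of my policy",
    "What is the deductible for the Standard plan?",
    "How do I make sure my claims are paid on time?",
]
routes = {text: detect_skill(text) for text in samples}
for text, skill in routes.items():
    print(f"{skill:15} <- {text}")
```

The last sample is a question, yet it is routed to `claims-filing` because "make" followed later by "claims" satisfies the first alternative of the pattern. Keyword routing is a heuristic, so expect occasional misroutes like this one.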


## Step 3 — Implement the Enhanced AgentExecutor

The `AgentExecutor` is the bridge between the A2A protocol and your agent logic.
Our enhanced version handles:

1. **Skill routing** — detects which skill to invoke from the message
2. **Multi-turn** — emits `TaskState.input_required` when claims info is incomplete
3. **Artifacts** — emits `TaskArtifactUpdateEvent` for structured outputs
4. **Streaming events** — `working` → artifact/message → `completed`
5. **Cancellation** — cleans up claims sessions
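The multi-turn decision in item 2 comes down to merging newly extracted fields into the session and checking what is still missing. Here is a minimal sketch with the model call replaced by hand-written extraction results. `merge_turn` is a hypothetical helper that mirrors the bookkeeping in `ClaimsAgent.process()`; it is not part of the lesson code.

```python
REQUIRED_CLAIM_FIELDS = ["claim_type", "date_of_service", "amount", "description"]


def merge_turn(collected: dict[str, str], extracted: dict[str, str]) -> dict:
    """Merge newly extracted fields and report the resulting status."""
    # Skip empty values, as ClaimsAgent.process() does.
    collected.update({k: v for k, v in extracted.items() if v})
    missing = [f for f in REQUIRED_CLAIM_FIELDS if f not in collected]
    status = "input_required" if missing else "completed"
    return {"status": status, "missing_fields": missing}


session: dict[str, str] = {}

# Turn 1: the user only names the claim type.
turn1 = merge_turn(session, {"claim_type": "dental"})
print(turn1)

# Turn 2: the follow-up supplies the remaining fields.
turn2 = merge_turn(
    session,
    {"date_of_service": "2025-01-15", "amount": "150.00", "description": "Cleaning"},
)
print(turn2)
```

In the executor, an `input_required` result becomes a `TaskStatusUpdateEvent` with `TaskState.input_required`, and a `completed` result becomes an artifact followed by a final status update.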

### Event Types in the EventQueue

| Event Type                | When to Use                                                  |
| ------------------------- | ------------------------------------------------------------ |
| `TaskStatusUpdateEvent`   | State changes (working, input_required, completed, canceled) |
| `TaskArtifactUpdateEvent` | Outputting structured artifacts                              |


### The `execute()` Event Sequence

```mermaid
sequenceDiagram
    participant Client
    participant Handler as DefaultRequestHandler
    participant Executor as InsuranceAgentExecutor
    participant Queue as EventQueue
    participant Agent as MultiSkillAgent

    Client->>Handler: POST / (message/send or message/stream)
    Handler->>Executor: execute(context, event_queue)

    Executor->>Queue: enqueue(TaskStatusUpdateEvent working)
    Note right of Queue: Client sees: state=working

    Executor->>Agent: detect_skill(user_text)
    Executor->>Agent: handle(user_text, skill, task_id)

    alt Q&A skill
        Agent-->>Executor: {answer: "..."}
        Executor->>Queue: enqueue(TaskStatusUpdateEvent completed + answer)
        Note right of Queue: Client sees: Task(completed, TextPart)
    else claims-filing: input_required
        Agent-->>Executor: {status: input_required, missing: [...]}
        Executor->>Queue: enqueue(TaskStatusUpdateEvent input_required)
        Note right of Queue: Client sees: state=input_required
        Note over Executor: Stores skill in active_skills[task_id]\nClient must send follow-up with taskId
    else claims-filing: completed
        Agent-->>Executor: {status: completed, claim_receipt: {...}}
        Executor->>Queue: enqueue(TaskArtifactUpdateEvent DataPart)
        Executor->>Queue: enqueue(TaskStatusUpdateEvent completed)
        Note right of Queue: Client sees: Artifact + state=completed
    else policy-summary
        Agent-->>Executor: {data: {...}}
        Executor->>Queue: enqueue(TaskArtifactUpdateEvent DataPart)
        Executor->>Queue: enqueue(TaskStatusUpdateEvent completed)
    end

    Queue->>Handler: drain events
    Handler-->>Client: JSON-RPC response (blocking) or SSE stream
```

The `EventQueue` is the decoupling mechanism: your executor enqueues events without
knowing whether the client requested a blocking response or a stream. The handler drains
the queue and returns the events either as a single JSON-RPC response or as an SSE stream.
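The producer/consumer split can be sketched with a plain `asyncio.Queue` standing in for the SDK's `EventQueue`. The events here are simple dictionaries and `fake_executor` and `drain` are hypothetical names; the real handler does considerably more (task persistence, error handling, SSE framing).

```python
import asyncio


async def fake_executor(queue: asyncio.Queue) -> None:
    """Producer: enqueue events without knowing how they will be delivered."""
    await queue.put({"kind": "status-update", "state": "working", "final": False})
    await queue.put({"kind": "artifact-update", "name": "Policy Summary"})
    await queue.put({"kind": "status-update", "state": "completed", "final": True})


async def drain(queue: asyncio.Queue) -> list[dict]:
    """Consumer: read events until one is marked final."""
    events = []
    while True:
        event = await queue.get()
        events.append(event)
        if event.get("final"):
            return events


async def main() -> tuple[dict, list[dict]]:
    queue: asyncio.Queue = asyncio.Queue()
    await fake_executor(queue)
    events = await drain(queue)
    blocking_view = events[-1]  # a blocking caller mostly cares about the end state
    streaming_view = events     # a streaming caller sees every event in order
    return blocking_view, streaming_view


# As a script use asyncio.run(); inside a notebook cell use `await main()` instead.
blocking_view, streaming_view = asyncio.run(main())
print(blocking_view["state"], [e["kind"] for e in streaming_view])
```

The executor code is identical in both cases; only the consumer decides how much of the event sequence the caller gets to see.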


In [4]:
from uuid import uuid4

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
    Artifact,
    DataPart,
    Message,
    Part,
    TaskArtifactUpdateEvent,
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
    TextPart,
)


class InsuranceAgentExecutor(AgentExecutor):
    """Enhanced A2A executor with multi-turn, artifacts, and cancellation.

    Supported flows:
    - policy-qa:      user question → text answer (Task with completed status)
    - claims-filing:  user claim → input_required → follow-up → artifact
    - policy-summary: user request → structured JSON artifact
    """

    def __init__(self, knowledge_path: str = "data/insurance_policy.txt"):
        self.agent = MultiSkillAgent(knowledge_path)
        # Track which skill is active for each task (for follow-up routing)
        self.active_skills: dict[str, str] = {}

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Execute the agent for an incoming A2A request."""
        user_text = context.get_user_input()
        task_id = context.task_id or uuid4().hex
        context_id = context.context_id or uuid4().hex

        # Determine skill: use stored skill for follow-ups, detect for new tasks
        skill = self.active_skills.get(task_id)
        if skill is None:
            skill = self.agent.detect_skill(user_text)

        # ── 1. Emit "working" status ──────────────────────────────────────────
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                taskId=task_id,
                contextId=context_id,
                status=TaskStatus(
                    state=TaskState.working,
                    message=Message(
                        role="agent",
                        parts=[
                            Part(
                                root=TextPart(text=f"Processing with {skill} skill...")
                            )
                        ],
                        messageId=uuid4().hex,
                    ),
                ),
                final=False,
            )
        )

        # ── 2. Call the multi-skill agent ─────────────────────────────────────
        result = await self.agent.handle(user_text, skill=skill, session_id=task_id)

        # ── 3. Handle result based on type ────────────────────────────────────

        if result["status"] == "input_required":
            # Multi-turn: store active skill and emit input_required
            self.active_skills[task_id] = result["skill"]
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    status=TaskStatus(
                        state=TaskState.input_required,
                        message=Message(
                            role="agent",
                            parts=[Part(root=TextPart(text=result["message"]))],
                            messageId=uuid4().hex,
                        ),
                    ),
                    final=False,  # Not terminal — client should follow up
                )
            )

        elif result["type"] == "structured":
            # Structured data: emit DataPart artifact, then completed
            self.active_skills.pop(task_id, None)
            await event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    artifact=Artifact(
                        artifactId=uuid4().hex,
                        name="Policy Summary",
                        description="Structured JSON summary of the insurance policy",
                        parts=[
                            Part(root=DataPart(data=result["data"])),
                        ],
                    ),
                    append=False,
                    lastChunk=True,
                )
            )
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    status=TaskStatus(
                        state=TaskState.completed,
                        message=Message(
                            role="agent",
                            parts=[
                                Part(root=TextPart(text="Policy summary generated."))
                            ],
                            messageId=uuid4().hex,
                        ),
                    ),
                    final=True,
                )
            )

        elif result["type"] == "multi_turn" and result["status"] == "completed":
            # Claims completed: emit receipt as DataPart artifact
            self.active_skills.pop(task_id, None)
            receipt = result["claim_receipt"]
            await event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    artifact=Artifact(
                        artifactId=uuid4().hex,
                        name=f"Claim Receipt — {receipt['claim_id']}",
                        description="Insurance claim receipt with processing details",
                        parts=[
                            Part(root=DataPart(data=receipt)),
                        ],
                    ),
                    append=False,
                    lastChunk=True,
                )
            )
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    status=TaskStatus(
                        state=TaskState.completed,
                        message=Message(
                            role="agent",
                            parts=[Part(root=TextPart(text=result["message"]))],
                            messageId=uuid4().hex,
                        ),
                    ),
                    final=True,
                )
            )

        else:
            # Simple text Q&A — emit completed (not a bare Message)
            self.active_skills.pop(task_id, None)
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    taskId=task_id,
                    contextId=context_id,
                    status=TaskStatus(
                        state=TaskState.completed,
                        message=Message(
                            role="agent",
                            parts=[Part(root=TextPart(text=result["answer"]))],
                            messageId=uuid4().hex,
                        ),
                    ),
                    final=True,
                )
            )

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Cancel an in-progress task.

        Cleans up claims sessions and emits a canceled status.
        """
        task_id = context.task_id or "unknown"
        context_id = context.context_id or uuid4().hex

        # Clean up any active claims session
        self.agent.claims_agent.clear_session(task_id)
        self.active_skills.pop(task_id, None)

        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                taskId=task_id,
                contextId=context_id,
                status=TaskStatus(
                    state=TaskState.canceled,
                    message=Message(
                        role="agent",
                        parts=[Part(root=TextPart(text="Task canceled by client."))],
                        messageId=uuid4().hex,
                    ),
                ),
                final=True,
            )
        )


print("InsuranceAgentExecutor defined (multi-turn, artifacts, cancellation).")

InsuranceAgentExecutor defined (multi-turn, artifacts, cancellation).


## Step 4 — Define the Rich Agent Card

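The Agent Card is served at `/.well-known/agent-card.json`, the path the client requests in the Step 6 server log (SDK releases before protocol version 0.3.0 used `/.well-known/agent.json`). Our enhanced card declares: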

- **3 skills** (policy-qa, claims-filing, policy-summary)
- **Streaming** capability
- **State transition history** — clients can inspect past task states
- **Text + data** output modes
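A client typically reads these declarations to decide what it can ask for. The sketch below parses an abbreviated card, shaped like the JSON preview printed in this step, and picks out skills by tag. `skills_with_tag` is a hypothetical helper, and the card is trimmed to the fields the example needs.

```python
import json

# Abbreviated card using the same camelCase keys as the serialized Agent Card.
card_json = """
{
  "name": "InsuranceAgent",
  "capabilities": {"streaming": true, "stateTransitionHistory": true},
  "defaultOutputModes": ["text", "data"],
  "skills": [
    {"id": "policy-qa", "tags": ["qa", "insurance", "policy", "coverage"]},
    {"id": "claims-filing", "tags": ["claims", "filing", "multi-turn", "structured"]},
    {"id": "policy-summary", "tags": ["summary", "structured", "json", "data"]}
  ]
}
"""

card = json.loads(card_json)


def skills_with_tag(card: dict, tag: str) -> list[str]:
    """Return the IDs of all skills that declare the given tag."""
    return [s["id"] for s in card["skills"] if tag in s.get("tags", [])]


structured_skills = skills_with_tag(card, "structured")
supports_streaming = card["capabilities"].get("streaming", False)

print(structured_skills, supports_streaming)
```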


In [5]:
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

agent_card = AgentCard(
    name="InsuranceAgent",
    description=(
        f"Multi-skill insurance agent using {MODEL} via {PROVIDER}. "
        "Handles policy Q&A, claims filing (multi-turn), and structured policy summaries."
    ),
    url="http://localhost:10001/",
    version="2.0.0",
    capabilities=AgentCapabilities(
        streaming=True,
        stateTransitionHistory=True,
    ),
    default_input_modes=["text"],
    default_output_modes=["text", "data"],
    skills=[
        AgentSkill(
            id="policy-qa",
            name="Policy Question Answering",
            description="Answer questions about insurance policy coverage, deductibles, premiums, and exclusions",
            tags=["qa", "insurance", "policy", "coverage"],
            examples=[
                "What is the deductible for the Standard plan?",
                "Are cosmetic procedures covered?",
                "What medications are excluded?",
            ],
        ),
        AgentSkill(
            id="claims-filing",
            name="Claims Filing",
            description=(
                "File insurance claims through a multi-turn conversation. "
                "Collects claim type, date, amount, and description. "
                "Returns a structured claim receipt as a DataPart artifact."
            ),
            tags=["claims", "filing", "multi-turn", "structured"],
            examples=[
                "I need to file a claim for a dental visit",
                "Submit a medical claim for emergency room visit on 2025-01-15",
            ],
        ),
        AgentSkill(
            id="policy-summary",
            name="Policy Summary",
            description=(
                "Generate a structured JSON summary of the insurance policy. "
                "Returns machine-readable data as a DataPart artifact."
            ),
            tags=["summary", "structured", "json", "data"],
            examples=[
                "Give me a summary of my policy",
                "What are the key facts about my coverage?",
            ],
        ),
    ],
)

# Preview the Agent Card
print(agent_card.model_dump_json(indent=2, exclude_none=True))

{
  "capabilities": {
    "stateTransitionHistory": true,
    "streaming": true
  },
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text",
    "data"
  ],
  "description": "Multi-skill insurance agent using Phi-4 via github. Handles policy Q&A, claims filing (multi-turn), and structured policy summaries.",
  "name": "InsuranceAgent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "Answer questions about insurance policy coverage, deductibles, premiums, and exclusions",
      "examples": [
        "What is the deductible for the Standard plan?",
        "Are cosmetic procedures covered?",
        "What medications are excluded?"
      ],
      "id": "policy-qa",
      "name": "Policy Question Answering",
      "tags": [
        "qa",
        "insurance",
        "policy",
        "coverage"
      ]
    },
    {
      "description": "File insurance claims through a multi-turn conversation. Collects claim 

## Step 5 — Wire Up the Server

Three components are wired together:

1. **`InsuranceAgentExecutor`** — our enhanced executor with multi-turn + artifacts
2. **`DefaultRequestHandler`** — routes requests, manages task state in `InMemoryTaskStore`
3. **`A2AStarletteApplication`** — builds the ASGI app (Starlette-based)

The `InMemoryTaskStore` is critical for multi-turn: it stores task state between
the client's initial message and follow-up messages.
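On the wire, the follow-up differs from the initial message only in that it carries the task ID (and usually the context ID) returned by the server. The sketch below builds both JSON-RPC payloads, assuming the `message/send` shape used by protocol version 0.3.0. `build_send_request` is a hypothetical helper, and `task-123` and `ctx-456` are placeholder IDs; in practice they come from the server's first response.

```python
from __future__ import annotations

import json
from uuid import uuid4


def build_send_request(
    text: str, task_id: str | None = None, context_id: str | None = None
) -> dict:
    """Build a JSON-RPC `message/send` request body (assumed 0.3.0 shape)."""
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": uuid4().hex,
    }
    # Only follow-ups reference an existing task.
    if task_id:
        message["taskId"] = task_id
    if context_id:
        message["contextId"] = context_id
    return {
        "jsonrpc": "2.0",
        "id": uuid4().hex,
        "method": "message/send",
        "params": {"message": message},
    }


first = build_send_request("I need to file a claim for a dental visit")
follow_up = build_send_request(
    "It was on 2025-01-15 and cost 150.00",
    task_id="task-123",    # placeholder: use the ID from the first response
    context_id="ctx-456",  # placeholder
)

print(json.dumps(follow_up, indent=2))
```

When the handler sees `taskId` in the incoming message, it loads the stored task and passes its ID to the executor, which is how `active_skills[task_id]` finds the skill that was waiting for input.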


In [None]:
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

# Knowledge file lives in this lesson's data/ directory
KNOWLEDGE_PATH = "data/insurance_policy.txt"
print(f"Knowledge path: {KNOWLEDGE_PATH}")

request_handler = DefaultRequestHandler(
    agent_executor=InsuranceAgentExecutor(KNOWLEDGE_PATH),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    agent_card=agent_card,
    http_handler=request_handler,
)

app = server.build()
print(f"ASGI app built — type: {type(app).__name__}")
print(f"Agent Card URL: {agent_card.url}.well-known/agent-card.json")

Knowledge path: data/insurance_policy.txt
ASGI app built — type: Starlette
Agent Card URL: http://localhost:10001/.well-known/agent-card.json



## Step 6 — Run the Server

> **Note:** Running the server blocks this notebook kernel.
> It will keep running until you interrupt the kernel (Kernel → Interrupt).
> Keep it running and open the **Lesson 07 client notebook** in a separate tab.


In [None]:
import uvicorn

config = uvicorn.Config(app, host="0.0.0.0", port=10001)
server_instance = uvicorn.Server(config)

print("Starting InsuranceAgent A2A Server on http://localhost:10001")
print("Agent Card: http://localhost:10001/.well-known/agent.json")
print(f"Skills: {[s.id for s in agent_card.skills]}")
print("Press Ctrl+C or interrupt kernel to stop")
await server_instance.serve()

INFO:     Started server process [34852]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:10001 (Press CTRL+C to quit)


Starting InsuranceAgent A2A Server on http://localhost:10001
Agent Card: http://localhost:10001/.well-known/agent.json
Skills: ['policy-qa', 'claims-filing', 'policy-summary']
Press Ctrl+C or interrupt kernel to stop
INFO:     127.0.0.1:53700 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK


Queue is closed. Event will not be dequeued.


INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK


Queue is closed. Event will not be dequeued.


INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK


Request Error (ID: fd6ed778-265a-4ebe-b8fa-2982764d68ab): Code=-32601, Message='Method not found'


INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53703 - "POST / HTTP/1.1" 200 OK


## Step 7 — Test with curl

With the server running, test each capability:

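### Fetch Agent Card

The server log in Step 6 shows a client requesting `/.well-known/agent-card.json`, the well-known path in protocol version 0.3.0. If the `agent.json` path below returns 404, use `agent-card.json` instead.
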
```bash
curl http://localhost:10001/.well-known/agent.json | python -m json.tool
```
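
The same check can be done from Python with only the standard library. This is a sketch, assuming the server from Step 6 is listening on `http://localhost:10001`; the helper names `fetch_agent_card` and `skill_ids` are made up for this example, and `SAMPLE_CARD` is a trimmed stand-in built from the skill IDs printed in Step 6 rather than a real response.

```python
import json
from urllib.request import urlopen

# Assumption: the Step 6 server is reachable at this base URL.
BASE_URL = "http://localhost:10001"


def fetch_agent_card(base_url: str = BASE_URL,
                     path: str = "/.well-known/agent.json") -> dict:
    """Download and decode the Agent Card JSON document.

    The Step 6 log also shows requests to /.well-known/agent-card.json;
    pass that as `path` if the default returns 404.
    """
    with urlopen(f"{base_url}{path}", timeout=10) as response:
        return json.load(response)


def skill_ids(card: dict) -> list:
    """Return the `id` of every skill declared in an Agent Card."""
    return [skill["id"] for skill in card.get("skills", [])]


# Trimmed, hand-written stand-in for the card (not a captured response).
SAMPLE_CARD = {
    "name": "InsuranceAgent",
    "skills": [{"id": "policy-qa"}, {"id": "claims-filing"}, {"id": "policy-summary"}],
}

print(skill_ids(SAMPLE_CARD))
```

With the server running, `skill_ids(fetch_agent_card())` performs the live request.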

### Test Policy Q&A (Text Response)

```bash
curl -X POST http://localhost:10001 \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-qa",
    "method": "message/send",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "What is the deductible?"}],
        "messageId": "msg-001"
      }
    }
  }'
```

### Test Claims Filing (Multi-Turn — Input Required)

```bash
# Turn 1: Partial claim info → expect input_required
curl -X POST http://localhost:10001 \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-claim-1",
    "method": "message/send",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "I need to file a dental claim"}],
        "messageId": "msg-002"
      }
    }
  }'
```

Note the task `id` in the `result` of the response, then send the follow-up with that value as `taskId` inside the `message` object:

```bash
# Turn 2: Provide remaining details (replace TASK_ID with the actual task ID)
curl -X POST http://localhost:10001 \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-claim-2",
    "method": "message/send",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "Root canal on 2025-01-15 for $450"}],
        "messageId": "msg-003",
        "taskId": "TASK_ID"
      }
    }
  }'
```
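
When scripting these calls, it can help to build the JSON-RPC body in code instead of editing JSON by hand. The sketch below mirrors the shape of the curl payloads in this step. `build_send_request` is a hypothetical helper, not part of the A2A SDK, and it rests on one assumption: the `taskId` of a follow-up turn goes on the message object, which is where the A2A `Message` type defines that field.

```python
import json
import uuid
from typing import Optional


def build_send_request(text: str, task_id: Optional[str] = None,
                       method: str = "message/send") -> dict:
    """Build a JSON-RPC 2.0 request body for a single user text message."""
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": uuid.uuid4().hex,  # fresh ID for every message
    }
    if task_id is not None:
        # A follow-up turn names the task it continues.
        message["taskId"] = task_id
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,  # JSON-RPC request ID, unrelated to the task ID
        "method": method,
        "params": {"message": message},
    }


# Turn 1 starts a new task; turn 2 continues it.
first = build_send_request("I need to file a dental claim")
follow_up = build_send_request("Root canal on 2025-01-15 for $450", task_id="TASK_ID")

print(json.dumps(follow_up, indent=2))
```

Passing `method="message/stream"` produces the body for the streaming test with the same message shape.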

### Test Policy Summary (Structured Data Artifact)

```bash
curl -X POST http://localhost:10001 \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-summary",
    "method": "message/send",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "Give me a policy summary"}],
        "messageId": "msg-004"
      }
    }
  }'
```

### Test Streaming

```bash
curl -N -X POST http://localhost:10001 \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-stream",
    "method": "message/stream",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "What is covered?"}],
        "messageId": "msg-005"
      }
    }
  }'
```

You should see multiple SSE events: `working` → response → `completed`.
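
To read that stream programmatically, each SSE event has to be split out and its `data:` payload decoded. The following is a minimal sketch of that parsing step using only the standard library. `SAMPLE_STREAM` is hand-written for illustration, not captured from the server, and the field names it uses (`kind`, `status.state`) are assumptions about the event shape; compare them with the real curl output before relying on them.

```python
import json
from typing import Iterable, Iterator


def iter_sse_payloads(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON body of each SSE event.

    In the SSE format, an event's `data:` lines are joined with newlines
    and the event ends at a blank line.
    """
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):  # one optional space after the colon
                value = value[1:]
            buffer.append(value)
        elif line == "" and buffer:
            yield json.loads("\n".join(buffer))
            buffer = []
    if buffer:  # stream ended without a trailing blank line
        yield json.loads("\n".join(buffer))


def status_states(payloads: Iterable[dict]) -> list:
    """Collect the task state from every status-update event."""
    states = []
    for payload in payloads:
        result = payload.get("result", {})
        if result.get("kind") == "status-update":
            states.append(result["status"]["state"])
    return states


# Illustrative stream only; the real events carry more fields.
SAMPLE_STREAM = [
    'data: {"jsonrpc": "2.0", "id": "test-stream", "result": {"kind": "status-update", "status": {"state": "working"}}}',
    "",
    'data: {"jsonrpc": "2.0", "id": "test-stream", "result": {"kind": "status-update", "status": {"state": "completed"}}}',
    "",
]

states = status_states(iter_sse_payloads(SAMPLE_STREAM))
print(states)
```

Events of other kinds, such as artifact updates, pass through `iter_sse_payloads` unchanged and are simply skipped by `status_states`.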


## Port Convention

| Port  | Agent             | Lesson |
| ----- | ----------------- | ------ |
| 10001 | InsuranceAgent    | 5-7    |
| 10002 | ResearchAgent     | 9      |
| 10003 | CodeAgent         | 10     |
| 10004 | PlannerAgent      | 11     |
| 10005 | TaskAgent         | 12     |
| 10006 | AssistantAgent    | 13     |
| 10007 | CopilotAgent      | 14     |
| 10008 | OrchestratorAgent | 15     |


## A2A Protocol Coverage Summary

| Protocol Feature         | How It's Implemented                                         |
| ------------------------ | ------------------------------------------------------------ |
| Agent Card (discovery)   | `/.well-known/agent.json` with 3 skills                      |
| Multiple Skills          | `policy-qa`, `claims-filing`, `policy-summary`               |
| TaskState.working        | Emitted at start of every execute()                          |
| TaskState.input_required | Claims filing when info is incomplete                        |
| TaskState.completed      | After Q&A answer or final artifact                           |
| TaskState.canceled       | Via cancel() with session cleanup                            |
| TextPart messages        | Q&A answers via `TaskStatusUpdateEvent(completed, TextPart)` |
| DataPart artifacts       | Claims receipts and policy summaries                         |
| Streaming events         | `message/stream` yields working → output → completed         |
| Multi-turn               | Claims follow-ups via stored task_id sessions                |
| Task store               | `InMemoryTaskStore` persists task state between turns        |

## Next Steps

**Keep the server running!** In Lesson 7, you'll build a client that exercises
all these capabilities:

- Multi-turn claims conversations
- Artifact parsing (DataPart)
- Task state inspection
- Streaming event handling
- Task cancellation

→ Continue to [Lesson 07 — A2A Client Fundamentals](../07-a2a-client/)
