## AgentCore、Strands Agents、A2A の入門

[A2A プロトコル](https://a2a-protocol.org/dev/specification/)は、独立した、潜在的に不透明な AI エージェントシステム間の通信と相互運用性を促進するために設計されたオープンスタンダードです。エージェントが異なるフレームワーク、言語、または異なるベンダーによって構築される可能性があるエコシステムでは、A2A は共通の言語とインタラクションモデルを提供します。

[Amazon AgentCore Runtime](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/agents-tools-runtime.html)は、AI エージェントまたはツールをデプロイして実行するための安全でサーバーレスな専用ホスティング環境を提供します。

最近、AWS は AgentCore Runtime の [A2A サポート](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-a2a.html)を発表しました。

このワークショップでは、AgentCore Runtime を使用して以下のアーキテクチャを構築します：

<img src="images/architecture-getting-started.png" style="width: 80%;">

この入門ノートブックでは、2つのエージェントを構築します。最初のエージェントは AWS Docs エキスパートです。AWS Docs MCP をクエリして AWS ドキュメントを読み取り、検索し、推奨事項を生成します。2番目のエージェントは AWS Blog エキスパートです。ウェブ検索を使用して最新の AWS ブログとニュースを調べます。

それでは始めましょう！

### セットアップ

依存関係のインストール

In [None]:
%pip install -q -r requirements.txt --no-cache-dir --force-reinstall

**新しいバージョンが反映されるように環境を再起動してください！**

In [None]:
#import IPython

#IPython.Application.instance().kernel.do_shutdown(True)

`bedrock-agentcore-starter-toolkit` のバージョンが 0.1.21 であることを確認

In [None]:
!pip freeze | grep boto
!pip freeze | grep agentcore

In [None]:
# ライブラリをインポート
import os
import json
import requests
import boto3
import time
from boto3.session import Session
from strands.tools import tool

# boto セッションを取得
boto_session = Session()

### 1 - 2つのエージェント用のコードを作成

`agents` フォルダが作成されていない場合は作成します。

In [None]:
![ ! -d "agents" ] && mkdir agents

#### 1.1 - AWS Docs エキスパートエージェント

まず、最初のエージェントのコードをローカルファイルに書き込みましょう。このエージェントは後で AgentCore Runtime にデプロイされます。

In [None]:
%%writefile agents/strands_aws_docs.py
import os
import logging
import asyncio
from mcp import stdio_client, StdioServerParameters
from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.tools.mcp import MCPClient
from fastapi import FastAPI
import uvicorn

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()
runtime_url = os.environ.get('AGENTCORE_RUNTIME_URL', 'http://127.0.0.1:9000/')
host, port = "0.0.0.0", 9000

# 遅延初期化を使用したグローバル MCP クライアント
_mcp_client = None

async def get_mcp_client():
    """タイムアウト付きの MCP クライアントの遅延初期化"""
    global _mcp_client
    if _mcp_client is None:
        try:
            _mcp_client = MCPClient(
                lambda: stdio_client(
                    StdioServerParameters(
                        command="uvx", 
                        args=["awslabs.aws-documentation-mcp-server@latest"]
                    )
                )
            )
            # タイムアウト付きで起動
            await asyncio.wait_for(_mcp_client.start(), timeout=10.0)
            logger.info("MCP クライアント初期化完了")
        except asyncio.TimeoutError:
            logger.error("MCP クライアントの起動がタイムアウト")
            _mcp_client = None
        except Exception as e:
            logger.error(f"MCP クライアント失敗: {e}")
            _mcp_client = None
    return _mcp_client

system_prompt = """あなたは AWS Documentation MCP サーバーを活用した AWS ドキュメントアシスタントです。あなたの役割は、ユーザーが AWS ドキュメントから正確で最新の情報を見つけるのを支援することです。

重要: レスポンスは短く、焦点を絞ってください。

ガイドライン:
- 簡潔で実用的な回答を提供（最大3文）
- リストには箇条書きを使用
- 冗長な説明は省略
- MCP が利用できない場合は、基本的な AWS 知識を提供
- 操作は8秒後にタイムアウト
- 完全性よりも速度を優先

利用可能な場合、AWS ドキュメント検索ツールにアクセスできます。"""

# 最小限のツールでエージェントを初期化
agent = Agent(
    system_prompt=system_prompt, 
    tools=[],  # ツールなしで開始し、動的に追加
    name="AWS Docs Agent",
    description="An agent to query AWS Docs using AWS MCP.",
)

# MCP が準備できたらツールを動的に追加
async def setup_agent_tools():
    """MCP クライアントの準備ができたらエージェントのツールをセットアップ"""
    try:
        mcp_client = await get_mcp_client()
        if mcp_client:
            tools = await asyncio.wait_for(
                mcp_client.list_tools_async(), 
                timeout=5.0
            )
            agent.tools = [tools] if tools else []
            logger.info("エージェントのツール設定完了")
    except Exception as e:
        logger.warning(f"MCP ツールのセットアップ失敗: {e}")

a2a_server = A2AServer(
    agent=agent,
    http_url=runtime_url,
    serve_at_root=True
)

@app.get("/ping")
def ping():
    return {"status": "healthy"}

@app.on_event("startup")
async def startup_event():
    """起動時に MCP クライアントを初期化"""
    await setup_agent_tools()

app.mount("/", a2a_server.to_fastapi_app())

if __name__ == "__main__":
    uvicorn.run(app, host=host, port=port)

#### **オプション** - ローカルテスト

このコードをローカルでテストしたい場合は、bash/ターミナルウィンドウを開いて以下のスニペットを実行できます：

```bash
python agents/strands_aws_docs.py
```

サーバーがローカルで起動します。次に、別のターミナル/bash で以下のコマンドを実行してテストします：

```bash
curl -X POST http://0.0.0.0:9000 \-H "Content-Type: application/json" \-d '{  "jsonrpc": "2.0",  "id": "req-001",  "method": "message/send",  "params": {  "message": {  "role": "user",  "parts": [  {  "kind": "text",  "text": "What's AWS Lambda?"  }  ],  "messageId": "d0673ab9-796d-4270-9435-451912020cd1"  }  } }' | jq .
```

MCP をクエリして、AWS Lambda を説明する回答を返します。

エージェントカード情報の取得もテストできます。以下のコマンドを使用します：

```bash
curl http://localhost:9000/.well-known/agent-card.json | jq .
```

#### 1.2 - AWS Blogs エキスパートエージェント

次に、2番目のエージェントのコードをローカルファイルに書き込みましょう。

In [None]:
%%writefile agents/strands_aws_blogs_news.py
import logging
import os
import asyncio
from strands import Agent, tool
from strands.multiagent.a2a import A2AServer
import uvicorn
from fastapi import FastAPI

from ddgs import DDGS
from ddgs.exceptions import RatelimitException, DDGSException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

runtime_url = os.environ.get('AGENTCORE_RUNTIME_URL', 'http://127.0.0.1:9000/')

@tool
async def fast_internet_search(keywords: str, max_results: int = 3) -> str:
    """タイムアウト付きの高速 Web 検索
    Args:
        keywords (str): 検索クエリのキーワード
        max_results (int): 最大結果数（速度のためデフォルト3）
    Returns:
        検索結果
    """
    try:
        # より良い結果のために AWS 固有の用語を追加
        aws_keywords = f"site:aws.amazon.com {keywords} AWS"
        
        # 検索に asyncio タイムアウトを使用
        async def search_with_timeout():
            return DDGS().text(
                aws_keywords, 
                region="us-en", 
                max_results=max_results
            )
        
        results = await asyncio.wait_for(search_with_timeout(), timeout=8.0)
        
        if results:
            # 結果を簡潔にフォーマット
            formatted = []
            for i, result in enumerate(results[:max_results], 1):
                formatted.append(f"{i}. {result.get('title', 'No title')}\n   {result.get('href', '')}")
            
            return "\n".join(formatted)
        else:
            return "AWS の結果が見つかりませんでした。"
            
    except asyncio.TimeoutError:
        logger.warning(f"検索タイムアウト: {keywords}")
        return "検索がタイムアウトしました。より具体的なクエリを試してください。"
    except RatelimitException:
        logger.warning("レート制限に達しました")
        return "レート制限に達しました。しばらくしてからもう一度お試しください。"
    except (DDGSException, Exception) as e:
        logger.error(f"検索エラー: {e}")
        return f"検索が利用できません: {str(e)[:50]}"

system_prompt = """あなたは AWS ブログエキスパートです。

重要: レスポンスは短く、最新の情報を優先してください。

ガイドライン:
- 最大3件の最新結果を提供
- 公式 AWS ブログ記事のみに焦点を当てる
- 簡潔な要約を使用（各結果につき1-2文）
- 可能な場合は直接リンクを含める
- 検索は8秒後にタイムアウト
- 検索が失敗した場合は、制限事項を認める

検索戦略:
- 検索には常に「AWS」を含める
- aws.amazon.com/blogs/ のコンテンツに焦点を当てる
- 最近の発表を優先"""

agent = Agent(
    system_prompt=system_prompt, 
    tools=[fast_internet_search],
    name="AWS Blog/News Agent",
    description="An agent to search on Web latest AWS Blogs and News.",
)

host, port = "0.0.0.0", 9000

a2a_server = A2AServer(
    agent=agent,
    http_url=runtime_url,
    serve_at_root=True
)

app = FastAPI()

@app.get("/ping")
def ping():
    return {"status": "healthy"}

app.mount("/", a2a_server.to_fastapi_app())

if __name__ == "__main__":
    uvicorn.run(app, host=host, port=port)

エージェントに必要な依存関係を含む requirements.txt ファイルを書き込みましょう。

In [None]:
%%writefile agents/requirements.txt
boto3==1.40.50
bedrock-agentcore==0.1.7
strands-agents[a2a]
strands-agents-tools
pyyaml
ddgs

### 2 - AgentCore Runtime へのデプロイ

次に、このソリューションを AgentCore Runtime にデプロイしましょう。

#### 2.1 - Cognito ユーザープールのセットアップ

エージェントをデプロイする前に、Cognito ユーザープールをセットアップして、エージェントにアクセスするユーザーを検証できるようにする必要があります。または Okta、Microsoft Entra ID などの他の ID プロバイダーを使用することもできます。

ワークショップのいくつかのステップを簡素化するためのメソッドを持つヘルパークラスをインポートします。このヘルパークラスは Cognito ユーザープールを作成するメソッドをインポートします。

In [None]:
from helpers.utils import setup_cognito_user_pool, reauthenticate_user

print("Amazon Cognito ユーザープールをセットアップ中...")
cognito_config = (
    setup_cognito_user_pool()
)  # ベアラートークンはこのセルの出力から取得できます
print("Cognito セットアップ完了 ✓")

#### 2.2 - エージェント用の IAM ロールを作成

##### 2.2.1 AWS Docs エージェントの実行ロール

In [None]:
from helpers.utils import create_agentcore_runtime_execution_role, AWS_DOCS_ROLE_NAME

execution_role_arn_mcp = create_agentcore_runtime_execution_role(AWS_DOCS_ROLE_NAME)

##### 2.2.2 AWS Blogs エージェントの実行ロール

In [None]:
from helpers.utils import create_agentcore_runtime_execution_role, AWS_BLOG_ROLE_NAME

execution_role_arn_blogs = create_agentcore_runtime_execution_role(AWS_BLOG_ROLE_NAME)

##### AgentCore Runtime へのデプロイ用の設定を作成

以下のセクションでは、[スターターツールキット](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/getting-started-starter-toolkit.html)を活用しています。スターターツールキットは、AI エージェントを AgentCore Runtime にデプロイするために使用できるコマンドラインインターフェース (CLI) ツールキットです。

次に、AgentCore Runtime 内で A2A プロトコルをサポートするエージェントを作成します。

##### 2.2.3 - 最初のエージェント（AWS Docs エージェント）を設定してデプロイしましょう：

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime

agentcore_runtime_mcp_agent = Runtime()
aws_docs_agent_name="aws_docs_assistant"

region = boto_session.region_name

# デプロイメントを設定
response_aws_docs_agent = agentcore_runtime_mcp_agent.configure(
    entrypoint="agents/strands_aws_docs.py",
    execution_role=execution_role_arn_mcp,
    auto_create_ecr=True,
    requirements_file="agents/requirements.txt",
    region=region,
    agent_name=aws_docs_agent_name,
    authorizer_configuration={
        "customJWTAuthorizer": {
            "allowedClients": [cognito_config.get("client_id")],
            "discoveryUrl": cognito_config.get("discovery_url"),
        }
    },
    protocol="A2A",
)

print("設定完了:", response_aws_docs_agent)

最初のエージェントを AgentCore Runtime で起動

In [None]:
launch_result_mcp = agentcore_runtime_mcp_agent.launch()
print("起動完了:", launch_result_mcp.agent_arn)

docs_agent_arn = launch_result_mcp.agent_arn

**デプロイステータスの確認**

デプロイが完了したか確認しましょう：

In [None]:
status_response = agentcore_runtime_mcp_agent.status()
status = status_response.endpoint["status"]

print(f"最終ステータス: {status}")

##### 2.2.4 - 2番目のエージェント（AWS Blogs and News エージェント）を設定してデプロイしましょう：

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime

agentcore_runtime_blogs = Runtime()
aws_blogs_agent_name="aws_blog_assistant"

# デプロイメントを設定
response_aws_blogs_agent = agentcore_runtime_blogs.configure(
    entrypoint="agents/strands_aws_blogs_news.py",
    execution_role=execution_role_arn_blogs,
    auto_create_ecr=True,
    requirements_file="agents/requirements.txt",
    region=region,
    agent_name=aws_blogs_agent_name,
    authorizer_configuration={
        "customJWTAuthorizer": {
            "allowedClients": [cognito_config.get("client_id")],
            "discoveryUrl": cognito_config.get("discovery_url"),
        }
    },
    protocol="A2A"
)

print("設定完了:", response_aws_blogs_agent)

2番目のエージェントを AgentCore Runtime で起動

In [None]:
launch_result_blog = agentcore_runtime_blogs.launch()
print("起動完了:", launch_result_blog.agent_arn)

blog_agent_arn = launch_result_blog.agent_arn

**デプロイステータスの確認**

2番目のエージェントのデプロイが完了したか確認しましょう：

In [None]:
status_response = agentcore_runtime_blogs.status()
status = status_response.endpoint["status"]

print(f"最終ステータス: {status}")

##### 2.2.5 - 出力のエクスポートと保存

次のノートブックで使用する変数をエクスポート：

In [None]:
MCP_AGENT_ID = launch_result_mcp.agent_id
MCP_AGENT_ARN = launch_result_mcp.agent_arn
MCP_AGENT_NAME = aws_docs_agent_name

BLOG_AGENT_ID = launch_result_blog.agent_id
BLOG_AGENT_ARN = launch_result_blog.agent_arn
BLOG_AGENT_NAME = aws_blogs_agent_name

COGNITO_CLIENT_ID = cognito_config.get("client_id")
COGNITO_SECRET = cognito_config.get("client_secret")
DISCOVERY_URL = cognito_config.get("discovery_url")

%store MCP_AGENT_ID
%store MCP_AGENT_ARN
%store MCP_AGENT_NAME
%store BLOG_AGENT_ID
%store BLOG_AGENT_ARN
%store BLOG_AGENT_NAME
%store COGNITO_CLIENT_ID
%store COGNITO_SECRET
%store DISCOVERY_URL

エージェントの ARN を SSM に保存して、オーケストレーターで使用できるようにします：

In [None]:
from helpers.utils import put_ssm_parameter, SSM_DOCS_AGENT_ARN, SSM_BLOGS_AGENT_ARN

put_ssm_parameter(SSM_DOCS_AGENT_ARN, MCP_AGENT_ARN)

put_ssm_parameter(SSM_BLOGS_AGENT_ARN, BLOG_AGENT_ARN)

### 3 - A2A エージェントの呼び出し

まず、認証トークンを更新しましょう：

In [None]:
bearer_token = reauthenticate_user(
    cognito_config.get("client_id"), 
    cognito_config.get("client_secret")
)

#### 3.1 エージェントカードの取得

最初のエージェント（AWS Docs MCP エキスパート）からエージェントカード情報を取得してみましょう：

In [None]:
import logging
from uuid import uuid4
from urllib.parse import quote

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def fetch_agent_card(agent_arn):
    # エージェント ARN を URL エンコード
    escaped_agent_arn = quote(agent_arn, safe='')

    # URL を構築
    url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{escaped_agent_arn}/invocations/.well-known/agent-card.json"
    logger.info(url)
    # 一意のセッション ID を生成
    session_id = str(uuid4())
    logger.info(f"生成されたセッション ID: {session_id}")

    # ヘッダーを設定
    headers = {
        'Accept': '*/*',
        'Authorization': f'Bearer {bearer_token}',
        'X-Amzn-Bedrock-AgentCore-Runtime-Session-Id': session_id,
        'X-Amzn-Trace-Id': f'aws_docs_assistant_{session_id}'
    }

    try:
        # リクエストを実行
        response = requests.get(url, headers=headers)
        response.raise_for_status()

        # JSON を解析して整形して出力
        agent_card = response.json()
        logger.info(json.dumps(agent_card, indent=2))

        return agent_card

    except requests.exceptions.RequestException as e:
        logger.error(f"エージェントカード取得エラー: {e}")
        return None

In [None]:
fetch_agent_card(docs_agent_arn)

次に、2番目のエージェント（AWS Blogs and News エキスパート）のエージェントカードを確認しましょう：

In [None]:
fetch_agent_card(blog_agent_arn)

#### 3.2 - エージェントのテスト

それでは、A2A を使用して最初のエージェントを呼び出してみましょう：

In [None]:
import asyncio
import logging
import os
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 300  # リクエストタイムアウトを5分に設定

def format_agent_response(response):
    """エージェントのレスポンスを抽出して人間が読みやすい形式にフォーマット"""
    # アーティファクトからメインのレスポンステキストを取得
    if response.artifacts and len(response.artifacts) > 0:
        artifact = response.artifacts[0]
        if artifact.parts and len(artifact.parts) > 0:
            return artifact.parts[0].root.text
    
    # フォールバック: 履歴からすべてのエージェントメッセージを連結
    agent_messages = [
        msg.parts[0].root.text 
        for msg in response.history 
        if msg.role.value == 'agent' and msg.parts
    ]
    return ''.join(agent_messages)


def create_message(*, role: Role = Role.user, text: str) -> Message:
    return Message(
        kind="message",
        role=role,
        parts=[Part(TextPart(kind="text", text=text))],
        message_id=uuid4().hex,
    )

async def send_sync_message(agent_arn, message: str):
    # エージェント ARN を URL エンコード
    escaped_agent_arn = quote(agent_arn, safe='')

    # URL を構築
    runtime_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{escaped_agent_arn}/invocations/"
    
    # 一意のセッション ID を生成
    session_id = str(uuid4())
    print(f"生成されたセッション ID: {session_id}")

    # AgentCore 用の認証ヘッダーを追加
    headers = {"Authorization": f"Bearer {bearer_token}",
              'X-Amzn-Bedrock-AgentCore-Runtime-Session-Id': session_id}
        
    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, headers=headers) as httpx_client:
        # ランタイム URL からエージェントカードを取得
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=runtime_url)
        agent_card = await resolver.get_agent_card()
        print(agent_card)

        # エージェントカードには正しい URL が含まれている（この場合は runtime_url と同じ）
        # 手動でのオーバーライドは不要 - これはパスベースのマウントパターン

        # ファクトリーを使用してクライアントを作成
        config = ClientConfig(
            httpx_client=httpx_client,
            streaming=False,  # 同期レスポンス用に非ストリーミングモードを使用
        )
        factory = ClientFactory(config)
        client = factory.create(agent_card)

        # メッセージを作成して送信
        msg = create_message(text=message)

        # streaming=False の場合、これは正確に1つの結果を返す
        async for event in client.send_message(msg):
            if isinstance(event, Message):
                logger.info(event.model_dump_json(exclude_none=True, indent=2))
                return event
            elif isinstance(event, tuple) and len(event) == 2:
                # (Task, UpdateEvent) タプル
                task, update_event = event
                logger.info(f"タスク: {task.model_dump_json(exclude_none=True, indent=2)}")
                if update_event:
                    logger.info(f"更新: {update_event.model_dump_json(exclude_none=True, indent=2)}")
                return task
            else:
                # その他のレスポンスタイプのフォールバック
                logger.info(f"レスポンス: {str(event)}")
                return event

In [None]:
result = await send_sync_message(docs_agent_arn, "what is DynamoDB")
formatted_output = format_agent_response(result)
print(formatted_output)

次に、2番目のエージェントをテストしましょう：

In [None]:
result = await send_sync_message(blog_agent_arn, "Give me the latest published blog for Bedrock AgentCore?")
formatted_output = format_agent_response(result)
print(formatted_output)

以下は、エージェントが実行したステップを示すより詳細な出力です。

エージェントへの質問を変更して、ステップバイステップの結果を確認してみてください。

In [None]:
def format_agent_trace(response):
    """エージェントのレスポンスを呼び出しの読みやすいトレースとしてフォーマット"""
    print("=" * 60)
    print("AGENT EXECUTION TRACE（エージェント実行トレース）")
    print("=" * 60)
    
    # コンテキスト情報
    print(f"コンテキスト ID: {response.context_id}")
    print(f"タスク ID: {response.id}")
    print(f"ステータス: {response.status.state.value}")
    print(f"完了時刻: {response.status.timestamp}")
    print()
    
    # 履歴をトレース
    print("実行フロー:")
    print("-" * 40)
    
    for i, msg in enumerate(response.history, 1):
        role_icon = "[USER]" if msg.role.value == "user" else "[AGENT]"
        text = msg.parts[0].root.text if msg.parts else "[コンテンツなし]"
        
        # トレース表示用に長いメッセージを切り詰め
        if len(text) > 80:
            text = text[:77] + "..."
            
        print(f"{i:2d}. {role_icon} {msg.role.value.upper()}: {text}")
    
    print()
    print("最終結果:")
    print("-" * 40)
    
    # 最終アーティファクト
    if response.artifacts:
        final_text = response.artifacts[0].parts[0].root.text
        print(final_text[:200] + "..." if len(final_text) > 200 else final_text)
    
    print("=" * 60)

In [None]:
format_agent_trace(result)

おめでとうございます！Amazon AgentCore Runtime で A2A プロトコルを使用した最初のエージェントをデプロイしました！

それでは、次のラボに進みましょう。