# Strands Agents レスポンスの高度な処理

Strands Agents では、以下の 2 つの方法を使用して、エージェント実行中に発生したイベントをインターセプトして処理できます。

- **非同期イテレータ**: FastAPI、aiohttp、Django Channels などの非同期フレームワークに最適です。これらの環境向けに、SDK は非同期イテレータを返す `stream_async` メソッドを提供しています。
- **コールバックハンドラ**: エージェント実行中に発生したイベントをインターセプトして処理できます。これにより、リアルタイム監視、カスタム出力フォーマット、外部システムとの統合が可能になります。

この例では、両方の方法を使用してエージェントの呼び出しを処理する方法を説明します。

## エージェントの詳細
<div style="float: left; margin-right: 20px; ">

|機能 |説明 |
|--------------------|---------------------------------------------------|
|使用する機能 |非同期イテレータ、コールバックハンドラ |
|エージェントの構造 |単一エージェントアーキテクチャ |
|使用するネイティブツール |計算機 |
|カスタムツールを作成しました|天気予報|

</div>

## アーキテクチャ

<div style="text-align:left;">
<img src="images/architecture.png" width="65%" />
</div>

## 主な機能
* ストリーミング用非同期イテレータ
* コールバックハンドラー

## セットアップと前提条件

### 前提条件
* Python 3.10 以上
* AWS アカウント
* Amazon Bedrock で有効化された Anthropic Claude 3.7

Strands Agent に必要なパッケージをインストールしましょう

In [None]:
# 前提条件のインストール
!pip install -r requirements.txt

### 依存パッケージのインポート

依存パッケージをインポートしましょう

In [None]:
import asyncio

import httpx
import nest_asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from strands import Agent, tool
from strands_tools import calculator

## 方法 1 - ストリーミング用の非同期イテレータ


Strands Agents は、`stream_async` メソッドを通じて非同期イテレータをサポートし、Web サーバー、API、その他の非同期アプリケーションなどの非同期環境でエージェントの応答をリアルタイムにストリーミングできるようにします。

この例はノートブックで説明しているため、`asyncio.run` と `loop.run_until_complete` をネストして使用できるように `nest_asyncio` を適用する必要があります。

In [None]:
nest_asyncio.apply()

### stream_async を使用したエージェントの作成と呼び出し

組み込みの計算ツールを備え、`callback_handler` を持たないエージェントを作成しましょう。`stream_async` メソッドを使用して、ストリーミングされたエージェントイベントを反復処理します。

In [None]:
# コールバックハンドラなしでエージェントを初期化する
agent = Agent(tools=[calculator], callback_handler=None)

# ストリームされたエージェントイベントを反復する非同期関数


async def process_streaming_response():
    agent_stream = agent.stream_async("Calculate 2+2")
    async for event in agent_stream:
        print(event)


# Run the agent
asyncio.run(process_streaming_response())

### イベントループのライフサイクルの追跡

この例は、イベントループのライフサイクルとイベント同士の関連性を示しています。Strands Agent の実行フローを理解するのに役立ちます。

エージェントストリームイベントをより適切に分析するために、出力形式のコードを作成しましょう。このコードでは、引き続き同じエージェントを使用します。

In [None]:
# ストリームされたエージェントイベントを反復する非同期関数


async def process_streaming_response():
    agent_stream = agent.stream_async("フランスの首都はどこですか？42+7はいくつですか？")
    async for event in agent_stream:
        # イベントループのライフサイクルを追跡
        if event.get("init_event_loop", False):
            print("🔄 Event loop initialized")
        elif event.get("start_event_loop", False):
            print("▶️ Event loop cycle starting")
        elif event.get("start", False):
            print("📝 New cycle started")
        elif "message" in event:
            print(f"📬 New message created: {event['message']['role']}")
        elif event.get("complete", False):
            print("✅ Cycle completed")
        elif event.get("force_stop", False):
            print(
                f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}"
            )

        # ツールの使用状況を追跡
        if "current_tool_use" in event and event["current_tool_use"].get("name"):
            tool_name = event["current_tool_use"]["name"]
            print(f"🔧 Using tool: {tool_name}")

        # 出力をすっきりと保つためにテキストのスニペットのみを表示
        if "data" in event:
            # デモ用に各チャンクの最初の 20 文字のみを表示
            data_snippet = event["data"][:20] + (
                "..." if len(event["data"]) > 20 else ""
            )
            print(f"📟 Text: {data_snippet}")


#エージェントを実行
asyncio.run(process_streaming_response())

### FastAPI 統合

`stream_async` を FastAPI と統合して、アプリケーションへのストリーミングエンドポイントを作成することもできます。そのために、エージェントに `weather_forecast` ツールを追加します。アーキテクチャの更新は次のようになります。

<div style="text-align:left;">
<img src="images/architecture_2.png" width="65%" />
</div>

In [None]:
# ツールの定義


@tool
def weather_forecast(city: str, days: int = 3) -> str:
    return f"Weather forecast for {city} for the next {days} days..."


# FastAPI app
app = FastAPI()


class PromptRequest(BaseModel):
    prompt: str


@app.post("/stream")
async def stream_response(request: PromptRequest):
    async def generate():
        agent = Agent(tools=[calculator, weather_forecast], callback_handler=None)
        try:
            async for event in agent.stream_async(request.prompt):
                if "data" in event:
                    yield event["data"]
        except Exception as e:
            yield f"Error: {str(e)}"

    return StreamingResponse(generate(), media_type="text/plain")


# ブロックせずにサーバーを起動する関数


async def start_server():
    config = uvicorn.Config(app, host="0.0.0.0", port=8001, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()


# サーバーをバックグラウンドタスクとして実行
if "server_task" not in globals():
    server_task = asyncio.create_task(start_server())
    await asyncio.sleep(0.1)  # サーバーの起動に時間をかける

print("✅ Server is running at http://0.0.0.0:8001")

#### FastAPIエージェントの呼び出し
これで、プロンプトでエージェントを呼び出すことができます

In [None]:
async def fetch_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://0.0.0.0:8001/stream",
            json={"prompt": "ニューヨークの天気はどうですか?"},
        ) as response:
            async for line in response.aiter_lines():
                if line.strip():  # Skip empty lines
                    print("Received:", line)


await fetch_stream()

## 方法 2 - ストリーミング用のコールバックハンドラー

コールバックハンドラは、Strandsエージェントの強力な機能であり、エージェント実行中に発生したイベントをインターセプトして処理することができます。これにより、リアルタイム監視、カスタム出力フォーマット、外部システムとの統合が可能になります。



コールバックハンドラーは、エージェントのライフサイクル中に発生するイベントをリアルタイムで受信します。

- モデルからのテキスト生成
- ツールの選択と実行
- 推論プロセス
- エラーと補完

では、イベント入力をフォーマットしてツールの使用状況とモデル出力を強調表示するカスタムコールバックハンドラー関数を作成しましょう。そのためには、エージェントを計算機ツールのみで使用します。

<div style="text-align:left;">
<img src="images/architecture.png" width="65%" />
</div>

In [None]:
def custom_callback_handler(**kwargs):
    # ストリームデータの処理
    if "data" in kwargs:
        print(f"MODEL OUTPUT: {kwargs['data']}")
    elif "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
        print(f"\nUSING TOOL: {kwargs['current_tool_use']['name']}")


# カスタムコールバックハンドラを持つエージェントを作成
agent = Agent(tools=[calculator], callback_handler=custom_callback_handler)

agent("2+2を計算してください")

### おめでとうございます！

このノートブックでは、非同期イテレータとコールバックハンドラを使用してエージェントの出力をストリーミングする方法を学びました。