# Lab 4: Persistence and Streaming

## 環境設定

エージェント環境の構築から始めます。このプロセスには、必要な環境変数の読み込み、必要なモジュールのインポート、Tavilyサーチツールの初期化、エージェントの状態の定義、そして最終的にエージェントの構築が含まれます。

In [1]:
from dotenv import load_dotenv
import json
import os
import re
import sys
import warnings
import boto3
from botocore.config import Config
warnings.filterwarnings("ignore")
import logging

# ローカルモジュールのインポート
dir_current = os.path.abspath("")
dir_parent = os.path.dirname(dir_current)
if dir_parent not in sys.path:
    sys.path.append(dir_parent)
from utils import utils

# 基本設定
logger = utils.set_logger()  # ロガーの設定
pp = utils.set_pretty_printer()  # 整形出力用のプリンターを設定

# .envファイルまたはSecret Managerから環境変数を読み込む
_ = load_dotenv("../.env")
aws_region = os.getenv("AWS_REGION")  # AWS地域を環境変数から取得
tavily_ai_api_key = utils.get_tavily_api("TAVILY_API_KEY", aws_region)  # Tavily APIキーを取得

# Bedrockの設定
bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={"max_attempts": 0})  # タイムアウトと再試行の設定

# Bedrockランタイムクライアントの作成
bedrock_rt = boto3.client("bedrock-runtime", region_name=aws_region, config=bedrock_config)

# 利用可能なモデルを確認するためのBedrockクライアントの作成
bedrock = boto3.client("bedrock", region_name=aws_region, config=bedrock_config)

[2025-07-09 19:07:56,410] p1185132 {utils.py:66} INFO - TAVILY_API_KEY variable correctly retrieved from the .env file.


In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_aws import ChatBedrockConverse
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [3]:
tool = TavilySearchResults(max_results=2)

  tool = TavilySearchResults(max_results=2)


## 永続性の実装

次に、永続性の実装に注目します。これを達成するために、LangGraphにおけるチェックポインターの概念を導入します。チェックポインターの機能は、エージェントの処理グラフの各ノードの後および間に状態のスナップショットを作成することです。

#リソース LangGraphの機能と使用法についてより包括的に理解するには、公式のLangGraphドキュメントを参照してください。

この実装では、チェックポインターとしてSQLiteセーバーを使用します。この軽量なソリューションは、組み込みのデータベースエンジンであるSQLiteを活用しています。このデモでは、インメモリデータベースを使用していますが、本番環境では外部データベースに接続するように簡単に適応できることに注意することが重要です。LangGraphは、より堅牢なデータベースシステムが必要なシナリオのために、RedisやPostgresなどの他の永続性ソリューションもサポートしています。

チェックポインターを初期化した後、それを`graph.compile`メソッドに渡します。エージェントを強化して`checkpointer`パラメータを受け入れるようにし、それをメモリオブジェクトに設定しました。

In [4]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

## エージェントクラス：詳細な検証

`Agent`クラスは私たちの実装の基盤として機能し、言語モデル（Claude）、ツール（Tavilyサーチなど）、および全体的な会話の流れの間の相互作用を調整します。その主要なコンポーネントを検討しましょう：

1. `__init__`メソッド：このイニシャライザはモデル、ツール、チェックポインター、およびオプションのシステムメッセージでエージェントを設定します。エージェントの動作を定義する状態グラフを構築します。

2. `call_bedrock`メソッド：このメソッドはAmazon Bedrockを介してClaudeモデルを呼び出す責任があります。現在の状態（メッセージ）を処理し、モデルの応答を返します。

3. `exists_action`メソッド：このメソッドは、モデルからの最新のメッセージに何らかのツール呼び出し（実行されるアクション）が含まれているかどうかを評価します。

4. `take_action`メソッド：このメソッドはモデルによって指定されたツール呼び出しを実行し、結果を返します。

`Agent`クラスは会話の流れを管理するために`StateGraph`を利用し、明確で管理しやすい構造を維持しながら複雑な相互作用を可能にします。この設計選択により、永続性とストリーミング機能の実装が容易になります。

## ストリーミングの実装

エージェントが設定されたので、ストリーミング機能を実装できます。考慮すべきストリーミングの主な側面は2つあります：

1. メッセージストリーミング：これには、次のアクションを決定するAIメッセージやアクションの結果を表す観察メッセージなど、個々のメッセージのストリーミングが含まれます。

2. トークンストリーミング：これには、生成される際の言語モデルの応答の各トークンのストリーミングが含まれます。

まず、メッセージストリーミングを実装します。人間のメッセージ（例：「SFの天気はどうですか？」）を作成し、スレッド設定を導入します。このスレッド設定は、永続的なチェックポインター内で複数の会話を同時に管理するために重要であり、複数のユーザーにサービスを提供する本番アプリケーションには必須です。

`invoke`の代わりに`stream`メソッドを使用してグラフを呼び出し、メッセージ辞書とスレッド設定を渡します。これにより、状態のリアルタイム更新を表すイベントのストリームが返されます。

実行すると、結果のストリームが観察されます：まず、取るべきアクションを決定するClaudeからのAIメッセージ、次にTavilyサーチ結果を含むツールメッセージ、そして最後に、最初のクエリに答えるClaudeからの別のAIメッセージです。

In [5]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # システムメッセージを設定
        self.system = system
        # 状態グラフを初期化
        graph = StateGraph(AgentState)
        # LLMノードを追加（Bedrockを呼び出す）
        graph.add_node("llm", self.call_bedrock)
        # アクションノードを追加（ツールを実行する）
        graph.add_node("action", self.take_action)
        # 条件付きエッジを追加：ツール呼び出しがあればアクションへ、なければ終了
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        # アクションからLLMへのエッジを追加（ツール実行後に再びLLMへ）
        graph.add_edge("action", "llm")
        # エントリーポイントをLLMに設定
        graph.set_entry_point("llm")
        # チェックポインターを使用してグラフをコンパイル
        self.graph = graph.compile(checkpointer=checkpointer)
        # ツールを名前でアクセスできるように辞書に格納
        self.tools = {t.name: t for t in tools}
        # モデルにツールをバインド
        self.model = model.bind_tools(tools)

    def call_bedrock(self, state: AgentState):
        # 現在のメッセージ履歴を取得
        messages = state["messages"]
        # システムメッセージがあれば先頭に追加
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        # モデルを呼び出して応答を取得
        message = self.model.invoke(messages)
        # 応答をメッセージリストに追加して返す
        return {"messages": [message]}

    def exists_action(self, state: AgentState):
        # 最新のメッセージを取得
        result = state["messages"][-1]
        # ツール呼び出しがあるかどうかを確認
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        # 最新のメッセージからツール呼び出しを取得
        tool_calls = state["messages"][-1].tool_calls
        results = []
        # 各ツール呼び出しを実行
        for t in tool_calls:
            print(f"Calling: {t}")
            # ツールを名前で呼び出し、引数を渡して実行
            result = self.tools[t["name"]].invoke(t["args"])
            # 結果をツールメッセージとして追加
            results.append(ToolMessage(tool_call_id=t["id"], name=t["name"], content=str(result)))
        print("Back to the model!")
        # ツール実行結果をメッセージリストとして返す
        return {"messages": results}

In [6]:
prompt = """あなたはスマートな研究アシスタントです。検索エンジンを使用して情報を調べてください。\
複数の呼び出しを行うことができます（同時または連続して）。\
何を求めているか確信がある場合にのみ、情報を検索してください。\
フォローアップの質問をする前に情報を調べる必要がある場合は、それも許可されています！
"""

model = ChatBedrockConverse(
    client=bedrock_rt,
    model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
    temperature=0,
    max_tokens=None,
)
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [7]:
# 「サンフランシスコの天気は？」という質問を含む人間のメッセージを作成
messages = [HumanMessage(content="サンフランシスコの天気は？")]

In [8]:
thread = {"configurable": {"thread_id": "1"}}

In [9]:
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v["messages"])

[AIMessage(content=[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'input': {'query': 'サンフランシスコ 現在の天気'}, 'id': 'tooluse_cvFg5jASQaKjge6ynQaIxA'}], additional_kwargs={}, response_metadata={'ResponseMetadata': {'RequestId': '66f12951-b481-4914-950f-9c30e12edc99', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 09 Jul 2025 19:19:36 GMT', 'content-type': 'application/json', 'content-length': '328', 'connection': 'keep-alive', 'x-amzn-requestid': '66f12951-b481-4914-950f-9c30e12edc99'}, 'RetryAttempts': 0}, 'stopReason': 'tool_use', 'metrics': {'latencyMs': [1823]}, 'model_name': 'us.anthropic.claude-3-5-haiku-20241022-v1:0'}, id='run--f46242e8-45bf-42d0-9392-74875fb8204c-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'サンフランシスコ 現在の天気'}, 'id': 'tooluse_cvFg5jASQaKjge6ynQaIxA', 'type': 'tool_call'}], usage_metadata={'input_tokens': 496, 'output_tokens': 71, 'total_tokens': 567, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})]
Calling:

## 永続性の実証

永続性の実装の効果を示すために、フォローアップの質問「LAではどうですか？」で会話を続けます。同じスレッドIDを使用することで、前回のやり取りからの連続性を確保します。Claudeはチェックポイントシステムによって提供される永続性のおかげで、私たちがまだ天気状況について問い合わせていることを理解し、コンテキストを維持します。

スレッドIDの重要性をさらに強調するために、それを変更して「どちらが暖かいですか？」という質問をすることができます。元のスレッドIDでは、Claudeは正確に温度を比較できます。しかし、スレッドIDを変更すると、会話履歴にアクセスできなくなるため、Claudeはコンテキストを失います。

In [11]:
messages = [HumanMessage(content="東京はどうですか?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content=[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'input': {'query': '東京 現在の天気 気温'}, 'id': 'tooluse_eIfMJaYWQLaT2HIKNgHQCw'}], additional_kwargs={}, response_metadata={'ResponseMetadata': {'RequestId': '6ea56c40-7ea6-4302-9c92-d31a90dc8c60', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 09 Jul 2025 19:31:15 GMT', 'content-type': 'application/json', 'content-length': '429', 'connection': 'keep-alive', 'x-amzn-requestid': '6ea56c40-7ea6-4302-9c92-d31a90dc8c60'}, 'RetryAttempts': 0}, 'stopReason': 'tool_use', 'metrics': {'latencyMs': [1763]}, 'model_name': 'us.anthropic.claude-3-5-haiku-20241022-v1:0'}, id='run--a9545749-c54f-4bf3-9c90-51520a55288e-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': '東京 現在の天気 気温'}, 'id': 'tooluse_eIfMJaYWQLaT2HIKNgHQCw', 'type': 'tool_call'}], usage_metadata={'input_tokens': 4986, 'output_tokens': 68, 'total_tokens': 5054, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})]

In [12]:
messages = [HumanMessage(content="どちらの方が暖かいですか?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='現在の気温を比較すると：\n\n1. サンフランシスコ：約18℃\n2. ロサンゼルス：約24℃\n3. 東京：約29-31℃\n\n東京が最も暖かく、次にロサンゼルス、そしてサンフランシスコが最も涼しいです。具体的には、東京はロサンゼルスよりも約5-7℃、サンフランシスコよりも約11-13℃高温となっています。\n\nしたがって、東京が最も暖かい都市です。', additional_kwargs={}, response_metadata={'ResponseMetadata': {'RequestId': '7b5840fb-1974-47a8-916c-1dcd35a254a3', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 09 Jul 2025 19:31:47 GMT', 'content-type': 'application/json', 'content-length': '753', 'connection': 'keep-alive', 'x-amzn-requestid': '7b5840fb-1974-47a8-916c-1dcd35a254a3'}, 'RetryAttempts': 0}, 'stopReason': 'end_turn', 'metrics': {'latencyMs': [6824]}, 'model_name': 'us.anthropic.claude-3-5-haiku-20241022-v1:0'}, id='run--3f08d03d-f424-40ef-80fa-7458375fe0c7-0', usage_metadata={'input_tokens': 7808, 'output_tokens': 165, 'total_tokens': 7973, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})]}


In [13]:
messages = [HumanMessage(content="どちらの方が暖かいですか?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='申し訳ありませんが、「どちらの」という具体的な対象が明確ではありません。比較したい2つのものについて、もう少し詳しく教えていただけますか？例えば、地域、気候、物、服、食べ物など、何と何を比較したいのでしょうか？具体的な情報をお聞かせください。', additional_kwargs={}, response_metadata={'ResponseMetadata': {'RequestId': '73045f3e-d24a-4375-b2ed-652107cccf75', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 09 Jul 2025 19:32:40 GMT', 'content-type': 'application/json', 'content-length': '546', 'connection': 'keep-alive', 'x-amzn-requestid': '73045f3e-d24a-4375-b2ed-652107cccf75'}, 'RetryAttempts': 0}, 'stopReason': 'end_turn', 'metrics': {'latencyMs': [3263]}, 'model_name': 'us.anthropic.claude-3-5-haiku-20241022-v1:0'}, id='run--c565b235-15b6-440d-89b4-d76b77210ab1-0', usage_metadata={'input_tokens': 497, 'output_tokens': 111, 'total_tokens': 608, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})]}


## トークンレベルのストリーミング

ストリーミングへのより細かいアプローチとして、`astream_events`メソッドを使用してトークンレベルの更新を実装します。この非同期メソッドは非同期チェックポインターを必要とし、`AsyncSqliteSaver`を使用して実装します。

非同期プログラミングにより、アプリケーションはメイン実行スレッドをブロックすることなく、複数の操作を同時に処理できます。AIモデルからのトークンのストリーミングのコンテキストでは、これはトークンが生成されるにつれて処理して表示することを意味し、より応答性の高いユーザーエクスペリエンスをもたらします。`astream_events`メソッドはこの非同期アプローチを活用して、Claudeからトークンレベルの更新を効率的にストリーミングします。

新しいスレッドIDで新しい会話を開始し、イベントを反復処理して、特に「on_chat_model_stream」タイプのイベントを探します。これらのイベントに遭遇すると、コンテンツを抽出して表示します。

実行すると、リアルタイムでトークンがストリーミングされるのを観察できます。Claudeが関数を呼び出す（ストリーミング可能なコンテンツを生成しない）のを見た後、最終的な応答がトークンごとにストリーミングされるのを見ることができます。

In [17]:
# # 新しいバージョンのLangGraphを使用している場合、パッケージは分離されています：
# # !pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# 非同期SQLiteセーバーを使用してチェックポイントデータベースに接続
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as memory:
    # メモリチェックポインターを使用してエージェントを初期化
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    
    # 「サンフランシスコの天気は？」という質問を含むメッセージを作成
    messages = [HumanMessage(content="サンフランシスコの天気は？")]
    
    # スレッドIDを設定（会話の永続性を管理するため）
    thread = {"configurable": {"thread_id": "4"}}
    
    # イベントストリームを非同期で処理
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        # イベントの種類を取得
        kind = event["event"]
        
        # チャットモデルのストリーミングイベントを処理
        if kind == "on_chat_model_stream":
            # チャンクからコンテンツを抽出
            content = event["data"]["chunk"].content
            
            if content:
                # 空でないコンテンツのみを表示
                # Amazon Bedrockのコンテキストでは、空のコンテンツはモデルがツールの呼び出しを要求していることを意味します
                # そのため、空でないコンテンツのみを表示します
                print(content, end="|")

[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'id': 'tooluse_lW7zo05zRXy65kFqL949tw', 'index': 0}]|[{'type': 'tool_use', 'input': '', 'id': None, 'index': 0}]|[{'type': 'tool_use', 'input': '{"query', 'id': None, 'index': 0}]|[{'type': 'tool_use', 'input': '": "サンフ', 'id': None, 'index': 0}]|[{'type': 'tool_use', 'input': 'ランシスコ 現在', 'id': None, 'index': 0}]|[{'type': 'tool_use', 'input': 'の天気', 'id': None, 'index': 0}]|[{'type': 'tool_use', 'input': '"}', 'id': None, 'index': 0}]|[{'index': 0}]|Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'サンフランシスコ 現在の天気'}, 'id': 'tooluse_lW7zo05zRXy65kFqL949tw', 'type': 'tool_call'}
Back to the model!
[{'type': 'text', 'text': '\n\n現', 'index': 0}]|[{'type': 'text', 'text': '在の', 'index': 0}]|[{'type': 'text', 'text': 'サンフランシ', 'index': 0}]|[{'type': 'text', 'text': 'スコの天', 'index': 0}]|[{'type': 'text', 'text': '気は以', 'index': 0}]|[{'type': 'text', 'text': '下のようです', 'index': 0}]|[{'type': 'text', 'text': '：\n\n- 気'

## 結論

この実習では、Amazon Bedrock上のAnthropic Claudeモデルを使用した永続性とストリーミングの実装について包括的に探求しました。これらの概念は実装が簡単ですが、本番環境グレードのAIアプリケーションを構築するための強力な機能を提供します。

複数の同時会話を管理する能力と、会話再開のための堅牢なメモリシステムは、スケーラブルなAIソリューションにとって重要です。さらに、最終トークンと中間メッセージの両方をストリーミングする能力は、AIの意思決定プロセスに対する比類のない可視性を提供します。

永続性はまた、人間を介在させる相互作用を可能にする上で重要な役割を果たしており、これは次の実習でより深く探求するトピックです。

これらの概念の実践的な意味をより深く理解するために、本番環境のAIアプリケーションにおける永続性とストリーミングの実際のケーススタディを探索することをお勧めします。