# ラボ 4: 永続性とストリーミング

## 環境設定

まず、エージェント環境を確立します。このプロセスには、必要な環境変数の読み込み、必要なモジュールのインポート、Tavily 検索ツールの初期化、エージェント状態の定義、そして最後にエージェントの構築が含まれます。

In [None]:
from dotenv import load_dotenv
import os
import sys
import json
import re
import pprint
import boto3
from botocore.client import Config
import warnings

warnings.filterwarnings("ignore")
import logging

# import local modules
dir_current = os.path.abspath("")
dir_parent = os.path.dirname(dir_current)
if dir_parent not in sys.path:
    sys.path.append(dir_parent)
from utils import utils

bedrock_config = Config(
    connect_timeout=120, read_timeout=120, retries={"max_attempts": 0}
)

# Set basic configs
logger = utils.set_logger()
pp = utils.set_pretty_printer()

# Load environment variables from .env file or Secret Manager
_ = load_dotenv("../.env")
aws_region = os.getenv("AWS_REGION")
tavily_ai_api_key = utils.get_tavily_api("TAVILY_API_KEY", aws_region)

# Set bedrock configs
bedrock_config = Config(
    connect_timeout=120, read_timeout=120, retries={"max_attempts": 0}
)

# Create a bedrock runtime client
bedrock_rt = boto3.client(
    "bedrock-runtime", region_name=aws_region, config=bedrock_config
)

# Create a bedrock client to check available models
bedrock = boto3.client("bedrock", region_name=aws_region, config=bedrock_config)


In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_aws import ChatBedrockConverse
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [None]:
tool = TavilySearchResults(max_results=2)

## 永続性の実装

次に、永続性の実装に注目します。これを実現するために、LangGraph にチェックポインタの概念を導入します。チェックポインタの機能は、エージェントの処理グラフ内の各ノードの後と各ノード間で状態スナップショットを作成することです。

#リソース LangGraph の機能と使用方法をより包括的に理解するには、公式の LangGraph ドキュメントを参照してください。

この実装では、チェックポインタとして SQLite セーバーを使用します。この軽量ソリューションは、組み込みデータベース エンジンである SQLite を活用します。このデモではメモリ内データベースを使用していますが、実稼働環境の外部データベースに接続するために簡単に適応できることに留意してください。LangGraph は、より堅牢なデータベース システムを必要とするシナリオ向けに、Redis や Postgres などの他の永続性ソリューションもサポートしています。

チェックポインタを初期化した後、`graph.compile` メソッドに渡します。エージェントを拡張して、メモリ オブジェクトに設定する `checkpointer` パラメータを受け入れるようにしました。

In [None]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

## エージェント クラス: 詳細な調査

`Agent` クラスは実装の要として機能し、言語モデル (Claude)、ツール (Tavily 検索など)、および全体的な会話フローの間のやり取りを調整します。その主要なコンポーネントを調べてみましょう。

1. `__init__` メソッド: この初期化子は、モデル、ツール、チェックポインター、およびオプションのシステム メッセージを使用してエージェントを設定します。エージェントの動作を定義する状態グラフを構築します。

2. `call_bedrock` メソッド: このメソッドは、Amazon Bedrock を介して Claude モデルを呼び出す役割を担います。現在の状態 (メッセージ) を処理し、モデルの応答を返します。

3. `exists_action` メソッド: このメソッドは、モデルからの最新のメッセージにツール呼び出し (実行するアクション) が含まれているかどうかを評価します。

4. `take_action` メソッド: このメソッドは、モデルによって指定されたツール呼び出しを実行し、結果を返します。

`Agent` クラスは `StateGraph` を使用して会話フローを管理し、明確で管理しやすい構造を維持しながら複雑なインタラクションを可能にします。この設計選択により、永続性とストリーミング機能の実装が容易になります。

## ストリーミングの実装

エージェントが構成されたので、ストリーミング機能を実装できます。考慮すべきストリーミングの主な側面は 2 つあります。

1. メッセージ ストリーミング: 次のアクションを決定する AI メッセージや、アクションの結果を表す観察メッセージなど、個々のメッセージをストリーミングします。

2. トークン ストリーミング: 言語モデルの応答の各トークンが生成されるたびにストリーミングします。

まず、メッセージ ストリーミングを実装します。人間のメッセージ (「サンフランシスコの天気はどうですか?」など) を作成し、スレッド構成を導入します。このスレッド構成は、永続的なチェックポインター内で複数の会話を同時に管理するために重要であり、複数のユーザーにサービスを提供する実稼働アプリケーションに不可欠です。

`invoke` ではなく `stream` メソッドを使用してグラフを呼び出し、メッセージ ディクショナリとスレッド構成を渡します。これにより、状態へのリアルタイム更新を表すイベント ストリームが返されます。

実行すると、結果のストリームが表示されます。最初は、実行するアクションを決定する Claude からの AI メッセージ、次に Tavily 検索結果を含むツール メッセージ、最後に最初のクエリに回答する Claude からの別の AI メッセージです。

In [None]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_bedrock)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm", self.exists_action, {True: "action", False: END}
        )
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_bedrock(self, state: AgentState):
        messages = state["messages"]
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {"messages": [message]}

    def exists_action(self, state: AgentState):
        result = state["messages"][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state["messages"][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t["name"]].invoke(t["args"])
            results.append(
                ToolMessage(tool_call_id=t["id"], name=t["name"], content=str(result))
            )
        print("Back to the model!")
        return {"messages": results}

In [None]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""


model = ChatBedrockConverse(
    client=bedrock_rt,
    model="anthropic.claude-3-haiku-20240307-v1:0",
    temperature=0,
    max_tokens=None,
)
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

> 追記　プロンプトの翻訳

```
あなたは賢いリサーチ アシスタントです。検索エンジンを使用して情報を検索してください。\
複数の通話 (同時にまたは連続して) を行うことができます。\
必要な情報がわかっている場合にのみ情報を検索してください。\
フォローアップの質問をする前に情報を検索する必要がある場合は、検索することができます。
```

In [None]:
messages = [HumanMessage(content="What is the weather in sf?")]

In [None]:
thread = {"configurable": {"thread_id": "1"}}

In [None]:
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v["messages"])

## 永続性のデモンストレーション

永続性の実装の有効性を示すために、会話を続け、「LA はどうですか?」というフォローアップの質問を行います。同じスレッド ID を使用することで、前回のやり取りからの連続性が確保されます。Claude はコンテキストを維持し、チェックポイント システムによって提供される永続性により、私たちがまだ天候について尋ねていることを理解します。

スレッド ID を変更して「どちらが暖かいですか?」という質問を行うことで、スレッド ID の重要性をさらに強調できます。元のスレッド ID を使用すると、Claude は正確に温度を比較できます。ただし、スレッド ID を変更すると、会話履歴にアクセスできなくなるため、Claude はコンテキストを失います。

In [None]:
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

In [None]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

In [None]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

## トークン レベルのストリーミング

ストリーミングをより細かく行うために、`astream_events` メソッドを使用してトークン レベルの更新を実装します。この非同期メソッドには非同期チェックポイントが必要であり、これを `AsyncSqliteSaver` を使用して実装します。

非同期プログラミングにより、アプリケーションはメイン実行スレッドをブロックすることなく、複数の操作を同時に処理できます。AI モデルからのトークンのストリーミングのコンテキストでは、これはトークンが生成されるときに処理および表示することになり、ユーザー エクスペリエンスの応答性が向上します。`astream_events` メソッドは、この非同期アプローチを利用して、Claude からのトークン レベルの更新を効率的にストリーミングします。

新しいスレッド ID で新しい会話を開始し、イベントを反復処理します。具体的には、"on_chat_model_stream" タイプのイベントを探します。これらのイベントに遭遇すると、コンテンツを抽出して表示します。

実行すると、トークンのストリーミングがリアルタイムで観察されます。 Claude が関数 (ストリーミング可能なコンテンツは生成しません) を呼び出し、その後に最終応答がトークンごとにストリーミングされることがわかります。

In [None]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# # If you are using a newer version of LangGraph, the package was separated:
# # !pip install langgraph-checkpoint-sqlite

# from langgraph.checkpoint.memory import MemorySaver
# from langgraph.checkpoint.sqlite import SqliteSaver
# from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events(
        {"messages": messages}, thread, version="v1"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of Amazon Bedrock means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")

## 結論

このラボでは、Amazon Bedrock 上の Anthropic の Claude モデルを使用して、永続性とストリーミングの実装について包括的に調査しました。これらの概念は簡単に実装できますが、本番環境レベルの AI アプリケーションを構築するための強力な機能を提供します。

複数の同時会話を管理する機能と、会話再開用の堅牢なメモリ システムを組み合わせることは、スケーラブルな AI ソリューションにとって不可欠です。さらに、最終トークンと中間メッセージの両方をストリーミングする機能により、AI の意思決定プロセスに対する比類のない可視性が提供されます。

永続性は、人間が関与するインタラクションを可能にする上でも重要な役割を果たします。このトピックについては、次のラボでさらに詳しく調査します。

これらの概念の実際的な意味をより深く理解するには、本番環境の AI アプリケーションにおける永続性とストリーミングの実際のケース スタディを調べることをお勧めします。