このアプリはAWS Bedrock（生成AIを使うサービス）を使用したチャットボットアプリです。  
  
技術的な特徴としては、以下の3点です。
- AWS Bedrockの使用  
- 生成AI APIリクエスト時にファンクションコーリング(Web検索)の使用  
- streaming生成関数による生成AIのレスポンスリアルタイム生成＆表示  
  
ハイレベルな内容ですが、ここまで出来たら社内のDX部隊にも自身をもって見せられるレベルです。  
- 全部理解できなくとも、このアプリを動かせるところまで行けたら十分にすごいレベルです。
- 不明な用語は適宜調べながら進めてください。


以下のようなイメージのアプリです。
<img src = "./img_chatbot.png">

このアプリでは、2種類のAPIを使用します  
 - AWS Bedrock Claude Sonnet 3.5 v2.0 
 - tavily

まずはAWS Bedrockを使用するためにやることを並べます。以下の順序で実践してみてください。  
- 1.AWS アカウントの作成：12ヶ月間無料で使用出来ます。12ヶ月間を超えても従量課金制のため、使用しなければ費用は発生しません。  
- 2.AWS BedorckでClaude 3.5 Sonnet v2.0有効化：以下の記事を参考にClaude 3.5 Sonnet V2.0へのリクエストを有効化してください
https://qiita.com/minorun365/items/e2202774ea357f311243
- 3.AWS ModelIDの取得：今回はクロスリージョンモデルを使用します。モデルIDをコピーしてメモ帳に保存しておいてください。
- 4.AWS accessidとaccess Keyの取得：AWSにリクエストを送る際に必要な認証情報を取得します。  
　以下の記事を参考に「aws_access_key_id」と「aws_secret_access_key」を取得してください。メモ帳に保存しておいてください。
https://qiita.com/raimu_hosoda/items/24b587fe44ced5262722

以下に参考画像を入れておきます。  
AWS Bedrockのモデルアクセスが有効化されている状態。  
<img src = "./斎藤aws環境_bedrock-model-access.png">
AWS Model IDを取得する画面
<img src = "./斎藤aws環境_cross-regin-inference.png">

次に、Tavily APIのAPI Keyを取得します。以下のリンクからAPI Keyを取得してください。(API Keyはメモ帳に保存しておいてください)  
https://tavily.com/  
以下のような画面です。  
<img src = "./Tavily_API画面.png">

ここまで作成できたら、.envファイルを作成して、環境変数を記載します。

In [None]:
#.envファイル内に以下を記載
aws_access_key_id ="AWSのアクセスキーid"
aws_secret_access_key="AWSのシークレットアクセスキー"
TAVILY_API_KEY="Tavily API Key"

いよいよコードを書いていきます。  
これから書くコードの理解度をあげるために、以下の動画の19:30まで見てから臨んでください。  
▼参考：AWS Bedrockによるモデル推論解説動画  
https://youtu.be/YnFch0WIPqw

いよいよコードを書いていきます。  
  
初めに、今回使用するライブラリをインポートします。  
 - boto3：AWS Bedrock（生成AIを使うためのサービス）を使用  
 - tavily：Web Searchエンジンとして使用  

In [5]:
import json
import boto3
import streamlit as st
from tavily import TavilyClient
import os
from dotenv import load_dotenv
import time
from datetime import datetime, timedelta

.envに記載した内容を読み込めるようにします。

In [6]:
load_dotenv()

True

aws bedrockからClaudeを使用するためのセッションを定義します。

In [8]:
#boto3のセッションを明示して使用。値は.envで管理（斎藤）
session = boto3.Session(
    aws_access_key_id=os.getenv("aws_access_key_id"),#自分のものに変更
    aws_secret_access_key=os.getenv("aws_secret_access_key"), #自分のものに変更
    region_name="ap-southeast-2"  # 必要に応じて
)

このセッションを用いて、クライアントを立ち上げます。  
また、モデルの設定も行います。

In [9]:
# Bedrock モデルの設定
# 注意：リクエスト数の制限を緩和するため、クロスリージョンモデルを使用。クロスリージョンモデルを使わない場合はほぼ確実にエラーが出る（斎藤）
#claude3.5 sonnet v2の場合：apac.anthropic.claude-3-5-sonnet-20241022-v2:0 → （斎藤aws環境）1分間に1リクエストのみ使用可能
#claude 3 Haikuの場合：apac.anthropic.claude-3-haiku-20240307-v1:0 → （斎藤aws環境）1分間に4リクエストのみ使用可能
MODEL_ID = "apac.anthropic.claude-3-5-sonnet-20241022-v2:0" #自分のものに変更

client = session.client("bedrock-runtime", region_name="ap-southeast-2")#リージョンは自分のものに変更

ここからファンクションコーリングを行う際のツールを設定していきます。  
まずは、ファンクションコーリングでWeb検索を実行する際の関数を定義します。

In [10]:
def web_search(query: str) -> dict:
    """
    Tavilyクライアントを使用してWeb検索を実行する関数

    Args:
        query: 検索クエリ文字列

    Returns:
        検索結果を含む辞書
    """
    #TavilyClient内にapi_keyを明示して使用。
    tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    return tavily_client.search(query)

ファンクションコーリングを行う際の設定を書いていきます。  

In [11]:
# 利用可能なツールのマッピング
tools = {"web_search": web_search}

# ツール設定（ClaudeでWeb_searchを使えるよ、ということを教える→claudeのapiを叩くときに使う）
tool_config = {
    "tools": [
        {
            "toolSpec": {
                "name": "web_search",
                "description": "Web Search",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Search query",
                            }
                        },
                        "required": ["query"],
                    }
                },
            }
        }
    ]
}

続いて、レスポンスをリアルタイムで表示する処理を書いていきます。  
ストリーミング処理については、以下の記事を読んでイメージを付けてから臨むと良いです。  
▼参考：AWS Bedrockのストリーミング応答を実装してみた - converse_streamのPythonサンプル  
https://dev.classmethod.jp/articles/aws-bedrock-converse-stream-python-sample/

In [12]:
def process_stream(response):
    """
    Bedrockからのストリームレスポンスを処理する関数

    Args:
        response: Bedrockからのレスポンスストリーム

    Returns:
        処理されたメッセージオブジェクト
    """
    content = []
    message = {"content": content}
    text = ""
    tool_use = {}
    reasoning = {}

    # UI出力要素を管理する辞書
    st_out = {}

    for chunk in response["stream"]:
        # メッセージ開始情報の処理
        if "messageStart" in chunk:#roleがuser or assitantを分けて格納
            message["role"] = chunk["messageStart"]["role"]

        # コンテンツブロック開始情報の処理（toolを使う場合の記録）
        elif "contentBlockStart" in chunk:
            tool = chunk["contentBlockStart"]["start"]["toolUse"]
            tool_use["toolUseId"] = tool["toolUseId"]
            tool_use["name"] = tool["name"]

        # コンテンツブロックのデルタ情報処理（chunk毎に送られてくる差分を処理）
        elif "contentBlockDelta" in chunk:
            delta = chunk["contentBlockDelta"]["delta"]
            index = str(chunk["contentBlockDelta"]["contentBlockIndex"])

            # ツール使用情報の処理（ツールを使う場合に検索ワードを繋げる）
            if "toolUse" in delta:
                if "input" not in tool_use:
                    tool_use["input"] = ""
                tool_use["input"] += delta["toolUse"]["input"]

                # UIへのツール使用情報の表示（ツールを使用している時に表示）
                if index not in st_out:
                    st_out[index] = st.expander("Tool use...", expanded=False).empty()
                st_out[index].write(tool_use["input"])

            # テキスト情報の処理
            elif "text" in delta:
                text += delta["text"]

                # UIへのテキスト情報の表示
                if index not in st_out:
                    st_out[index] = st.chat_message("assistant").empty()
                st_out[index].write(text)

            #以下はClaude 3.7のThinkingが使える場合に有効
            # # 推論内容の処理
            # if "reasoningContent" in delta:
            #     if "reasoningText" not in reasoning:
            #         reasoning["reasoningText"] = {"text": "", "signature": ""}

            #     if "text" in delta["reasoningContent"]:
            #         reasoning["reasoningText"]["text"] += delta["reasoningContent"][
            #             "text"
            #         ]
            #     if "signature" in delta["reasoningContent"]:
            #         reasoning["reasoningText"]["signature"] = delta["reasoningContent"][
            #             "signature"
            #         ]

            #     # UIへの推論内容の表示
            #     if index not in st_out:
            #         st_out[index] = st.expander("Thinking...", expanded=True).empty()
            #     st_out[index].write(reasoning["reasoningText"]["text"])

        # コンテンツブロック終了情報の処理
        elif "contentBlockStop" in chunk:
            # ツール使用情報のコンテンツへの追加
            if "input" in tool_use:
                tool_use["input"] = json.loads(tool_use["input"])
                content.append({"toolUse": tool_use})
                tool_use = {}
                
            # 推論内容のコンテンツへの追加
            elif "reasoningText" in reasoning:
                content.append({"reasoningContent": reasoning})
                reasoning = {}
                
            # テキスト情報のコンテンツへの追加
            else:
                content.append({"text": text})
                text = ""

        # メッセージ終了情報の処理
        elif "messageStop" in chunk:
            stop_reason = chunk["messageStop"]["stopReason"]

    return message

ここから、
- StreamlitによるチャットボットのUI実装
- Claude Sonnet 3.5 v2.0へのリクエスト
- ファンクションコーリング使用時の処理  
をするコードを書いていきます。  
少し複雑ですが、頑張って読み解いてみてください。  
streamlitのコードはjupyterファイルでは動作しないので注意が必要です。

In [None]:
# ==============================
# Streamlit UIの実装
# ==============================

st.title("Claude 3.5 Sonnet on Bedrock") #斎藤AWS環境ではClaude 3.7が使えなかったため、3.5 Sonnetを使用
st.subheader("extended thinking with web search")

# セッション状態の初期化（アプリ起動時にメッセージを格納する箱を用意）
if "messages" not in st.session_state:
    st.session_state.messages = []
messages = st.session_state.messages

# 過去のメッセージの表示
for message in messages:
    # テキストコンテンツのみをフィルタリング
    text_content = list(filter(lambda x: "text" in x.keys(), message["content"]))

    for content in text_content:
        with st.chat_message(message["role"]):
            st.write(content["text"])

# ユーザー入力の処理
if prompt := st.chat_input("質問を入力してください"):
    #Claude 3.7が使えないので、思考過程を吐かせるプロンプトでラッピング
    wrapped_prompt = f"""以下の問いに、1度だけ思考プロセスを回してから答えてください。
    
    #条件
    1つの質問につき、情報の検索は1度しか行ってはいけない

    Q: {prompt}"""
    with st.chat_message("user"):
        st.write(prompt)

    # ユーザーメッセージの追加
    messages.append({"role": "user", "content": [{"text": wrapped_prompt}]})
    
    # ツール使用のループ処理
    while True:
        # Bedrockモデルへのリクエスト
        response = client.converse_stream(
            modelId=MODEL_ID,
            messages=messages,
            toolConfig=tool_config,
            #claude 3.7以降だと以下のthinking機能も使える（斎藤）
            # additionalModelRequestFields={
            #     "thinking": {
            #         "type": "enabled",
            #         "budget_tokens": 1024,
            #     },
            # },
        )

        # レスポンスの処理とメッセージへの追加
        message = process_stream(response)
        messages.append(message)

        # ツール使用コンテンツのフィルタリング
        tool_use_content = list(
            filter(lambda x: "toolUse" in x.keys(), message["content"])
        )

        # ツール使用がなければループを終了
        if len(tool_use_content) == 0:
            break

        # 各ツール使用の処理
        for content in tool_use_content:
            tool_use_id = content["toolUse"]["toolUseId"]
            name = content["toolUse"]["name"]
            input = content["toolUse"]["input"]

            # ツールの実行と結果の取得
            result = tools[name](**input)

            # ツール結果メッセージの作成
            tool_result = {
                "toolUseId": tool_use_id,
                "content": [{"text": json.dumps(result, ensure_ascii=False)}],
            }

            # ツール結果メッセージの追加
            tool_result_message = {
                "role": "user",
                "content": [{"toolResult": tool_result}],
            }
            messages.append(tool_result_message)

ここまでで処理の全容が書き終わりましたので、上記をまとめてアプリの完成コードを作ります。  
以下のコードを.pyに書いてstreamlit run ●●.pyを実行するとアプリが動くようになります。  

In [None]:
import json
import boto3
import streamlit as st
from tavily import TavilyClient
import os
from dotenv import load_dotenv
import time
from datetime import datetime, timedelta

load_dotenv()

#boto3のセッションを明示して使用。keyは.envで管理（斎藤）
session = boto3.Session(
    aws_access_key_id=os.getenv("aws_access_key_id"),#自分のものに変更
    aws_secret_access_key=os.getenv("aws_secret_access_key"), #自分のものに変更
    region_name="ap-southeast-2"  # 必要に応じて
)

# Bedrock モデルの設定
# 注意：リクエスト数の制限を緩和するため、クロスリージョンモデルを使用。クロスリージョンモデルを使わない場合はほぼ確実にエラーが出る（斎藤）
#claude3.5 sonnet v2の場合：apac.anthropic.claude-3-5-sonnet-20241022-v2:0 → （斎藤aws環境）1分間に1リクエストのみ使用可能
#claude 3 Haikuの場合：apac.anthropic.claude-3-haiku-20240307-v1:0 → （斎藤aws環境）1分間に4リクエストのみ使用可能
MODEL_ID = "apac.anthropic.claude-3-5-sonnet-20241022-v2:0"

client = session.client("bedrock-runtime", region_name="ap-southeast-2")#リージョンは自分のものに変更

# ==============================
# ツール関連の実装
# ==============================


def web_search(query: str) -> dict:
    """
    Tavilyクライアントを使用してWeb検索を実行する関数

    Args:
        query: 検索クエリ文字列

    Returns:
        検索結果を含む辞書
    """
    #TavilyClient内にapi_keyを明示して使用。
    tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    return tavily_client.search(query)


# 利用可能なツールのマッピング
tools = {"web_search": web_search}

# ツール設定（ClaudeでWeb_searchを使えるよ、ということを教える→claudeのapiを叩くときに使う）
tool_config = {
    "tools": [
        {
            "toolSpec": {
                "name": "web_search",
                "description": "Web Search",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Search query",
                            }
                        },
                        "required": ["query"],
                    }
                },
            }
        }
    ]
}


def process_stream(response):
    """
    Bedrockからのストリームレスポンスを処理する関数

    Args:
        response: Bedrockからのレスポンスストリーム

    Returns:
        処理されたメッセージオブジェクト
    """
    content = []
    message = {"content": content}
    text = ""
    tool_use = {}
    reasoning = {}

    # UI出力要素を管理する辞書
    st_out = {}

    for chunk in response["stream"]:
        # メッセージ開始情報の処理
        if "messageStart" in chunk:#roleがuser or assitantを分けて格納
            message["role"] = chunk["messageStart"]["role"]

        # コンテンツブロック開始情報の処理（toolを使う場合の記録）
        elif "contentBlockStart" in chunk:
            tool = chunk["contentBlockStart"]["start"]["toolUse"]
            tool_use["toolUseId"] = tool["toolUseId"]
            tool_use["name"] = tool["name"]

        # コンテンツブロックのデルタ情報処理（chunk毎に送られてくる差分を処理）
        elif "contentBlockDelta" in chunk:
            delta = chunk["contentBlockDelta"]["delta"]
            index = str(chunk["contentBlockDelta"]["contentBlockIndex"])

            # ツール使用情報の処理（ツールを使う場合に検索ワードを繋げる）
            if "toolUse" in delta:
                if "input" not in tool_use:
                    tool_use["input"] = ""
                tool_use["input"] += delta["toolUse"]["input"]

                # UIへのツール使用情報の表示（ツールを使用している時に表示）
                if index not in st_out:
                    st_out[index] = st.expander("Tool use...", expanded=False).empty()
                st_out[index].write(tool_use["input"])

            # テキスト情報の処理
            elif "text" in delta:
                text += delta["text"]

                # UIへのテキスト情報の表示
                if index not in st_out:
                    st_out[index] = st.chat_message("assistant").empty()
                st_out[index].write(text)

            #以下はClaude 3.7のThinkingが使える場合に有効
            # # 推論内容の処理
            # if "reasoningContent" in delta:
            #     if "reasoningText" not in reasoning:
            #         reasoning["reasoningText"] = {"text": "", "signature": ""}

            #     if "text" in delta["reasoningContent"]:
            #         reasoning["reasoningText"]["text"] += delta["reasoningContent"][
            #             "text"
            #         ]
            #     if "signature" in delta["reasoningContent"]:
            #         reasoning["reasoningText"]["signature"] = delta["reasoningContent"][
            #             "signature"
            #         ]

            #     # UIへの推論内容の表示
            #     if index not in st_out:
            #         st_out[index] = st.expander("Thinking...", expanded=True).empty()
            #     st_out[index].write(reasoning["reasoningText"]["text"])

        # コンテンツブロック終了情報の処理
        elif "contentBlockStop" in chunk:
            # ツール使用情報のコンテンツへの追加
            if "input" in tool_use:
                tool_use["input"] = json.loads(tool_use["input"])
                content.append({"toolUse": tool_use})
                tool_use = {}
                
            # 推論内容のコンテンツへの追加
            elif "reasoningText" in reasoning:
                content.append({"reasoningContent": reasoning})
                reasoning = {}
                
            # テキスト情報のコンテンツへの追加
            else:
                content.append({"text": text})
                text = ""

        # メッセージ終了情報の処理
        elif "messageStop" in chunk:
            stop_reason = chunk["messageStop"]["stopReason"]

    return message


# ==============================
# Streamlit UIの実装
# ==============================

st.title("Claude 3.5 Sonnet on Bedrock") #斎藤AWS環境ではClaude 3.7が使えなかったため、3.5 Sonnetを使用
st.subheader("extended thinking with web search")

# セッション状態の初期化（アプリ起動時にメッセージを格納する箱を用意）
if "messages" not in st.session_state:
    st.session_state.messages = []
messages = st.session_state.messages

# 過去のメッセージの表示
for message in messages:
    # テキストコンテンツのみをフィルタリング
    text_content = list(filter(lambda x: "text" in x.keys(), message["content"]))

    for content in text_content:
        with st.chat_message(message["role"]):
            st.write(content["text"])

# ユーザー入力の処理
if prompt := st.chat_input("質問を入力してください"):
    #思考過程を吐かせるプロンプトでラッピング
    wrapped_prompt = f"""以下の問いに、1度だけ思考プロセスを回してから答えてください。
    
    #条件
    1つの質問につき、情報の検索は1度しか行ってはいけない

    Q: {prompt}"""
    with st.chat_message("user"):
        st.write(prompt)

    # ユーザーメッセージの追加
    messages.append({"role": "user", "content": [{"text": wrapped_prompt}]})
    
    # ツール使用のループ処理
    while True:
        # Bedrockモデルへのリクエスト
        response = client.converse_stream(
            modelId=MODEL_ID,
            messages=messages,
            toolConfig=tool_config,
            #claude 3.7以降だと以下のthinking機能も使える（斎藤）
            # additionalModelRequestFields={
            #     "thinking": {
            #         "type": "enabled",
            #         "budget_tokens": 1024,
            #     },
            # },
        )

        # レスポンスの処理とメッセージへの追加
        message = process_stream(response)
        messages.append(message)

        # ツール使用コンテンツのフィルタリング
        tool_use_content = list(
            filter(lambda x: "toolUse" in x.keys(), message["content"])
        )

        # ツール使用がなければループを終了
        if len(tool_use_content) == 0:
            break

        # 各ツール使用の処理
        for content in tool_use_content:
            tool_use_id = content["toolUse"]["toolUseId"]
            name = content["toolUse"]["name"]
            input = content["toolUse"]["input"]

            # ツールの実行と結果の取得
            result = tools[name](**input)

            # ツール結果メッセージの作成
            tool_result = {
                "toolUseId": tool_use_id,
                "content": [{"text": json.dumps(result, ensure_ascii=False)}],
            }

            # ツール結果メッセージの追加
            tool_result_message = {
                "role": "user",
                "content": [{"toolResult": tool_result}],
            }
            messages.append(tool_result_message)


（出る杭&Giver向け任意課題）
- 上記を理解する上で役に立った記事や動画があれば皆さんにシェアしてください
- 上記の資料をより分かりやすくするための解説やコメント、参考情報などを追加して、資料をアップデートしてください。  
  アップデートしたものはシェアしてください。教材として採用される可能性があります！
- 出来る方は、斎藤のGitHubリポジトリにPullRequestを送ってください！
https://github.com/takahiro1313/aws_streamlit_claude3.5sonnet_chatobot