# CrewAIマルチエージェントクルーをAmazon Bedrockモデルでホストする

## 概要

このチュートリアルでは、Amazon Bedrock AgentCoreランタイムを使用して、既存のマルチエージェントクルーをホストする方法を学びます。

CrewAIとAmazon Bedrockモデルの例に焦点を当てます。StrandsエージェントでAmazon Bedrockモデルを使用する場合は[こちら](../01-strands-with-bedrock-model)を、OpenAIモデルを使用する場合は[こちら](../03-strands-with-openai-model)を参照してください。

### チュートリアルの詳細

| 情報 | 詳細 |
|:--------------------|:-----------------------------------------------------------------------------|
| チュートリアルの種類 | 会話型 |
| エージェントの種類 | マルチエージェントクルー |
| エージェントフレームワーク | CrewAI |
| LLMモデル | Anthropic Claude 3.7 Sonnet |
| チュートリアルコンポーネント | AgentCoreランタイムでのエージェントホスティング。CrewAIとAmazon Bedrockモデルの使用 |
| チュートリアルの分野 | 分野横断 |
| 例の複雑さ | 簡単 |
| 使用したSDK | Amazon BedrockAgentCore Python SDKとboto3 |

### チュートリアルのアーキテクチャ

このチュートリアルでは、既存のマルチエージェントクルーをAgentCoreランタイムにデプロイする方法を説明します。

デモンストレーション目的で、Amazon Bedrockモデルを使用したCrewAIクルーを使用します。

例では、リサーチャーとアナリストの2つのエージェントがいるリサーチクルーを使用します。

### チュートリアルの主な機能

* Amazon Bedrock AgentCoreランタイムでのエージェントホスティング
* Amazon Bedrockモデルの使用
* CrewAIの使用

## 前提条件

このチュートリアルを実行するには、以下が必要です。
* Python 3.10 +
* uv パッケージマネージャー
* AWS 認証情報
* Docker の実行

さらに、いくつかの依存関係をインストールする必要があります。
* Amazon Bedrock AgentCore SDK
* CrewAI
* Langchain コミュニティパッケージ
* DuckDuckGo 検索

すべての必要な依存関係を pyproject.toml ファイルにパッケージ化しているので、便利にインストールできます。

In [None]:
!uv sync --active --force-reinstall

## マルチエージェントクルーの作成とローカルでの実験

AgentCore Runtimeにエージェントをデプロイする前に、実験目的でローカルで開発して実行してみましょう。

このガイドでは、トピックを調査・分析し、包括的なレポートを作成するための研究クルーを作成する手順を説明します。この実践的な例は、AIエージェントが協力して複雑なタスクを達成する方法を示しています。この例は、CrewAIが直接提供する[入門ガイド](https://docs.crewai.com/en/guides/crews/first-crew)から一部を改変したものです。

### エージェント、タスク、クルーの定義

まず、ローカルのCrewAIエージェントを定義するアーティファクトを作成します。
* agents.yaml: クルーに関わる2つのエージェントを定義
* tasks.yaml: クルーのエージェントが実行するタスクを定義
* crew.py: 定義されたタスクに取り組むエージェントからなるクルーを定義
* main.py: クルーの実行をローカルで開始するエントリポイント

In [None]:
%%writefile research_crew/config/agents.yaml
researcher:
  role: >
    Senior Research Specialist for {topic}
  goal: >
    Find comprehensive and accurate information about {topic}
    with a focus on recent developments and key insights
  backstory: >
    You are an experienced research specialist with a talent for
    finding relevant information from various sources. You excel at
    organizing information in a clear and structured manner, making
    complex topics accessible to others.
  llm: bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0

analyst:
  role: >
    Data Analyst and Report Writer for {topic}
  goal: >
    Analyze research findings and create a comprehensive, well-structured
    report that presents insights in a clear and engaging way
  backstory: >
    You are a skilled analyst with a background in data interpretation
    and technical writing. You have a talent for identifying patterns
    and extracting meaningful insights from research data, then
    communicating those insights effectively through well-crafted reports.
  llm: bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0

In [None]:
%%writefile research_crew/config/tasks.yaml
research_task:
  description: >
    Conduct thorough research on {topic}. Focus on:
    1. Key concepts and definitions
    2. Historical development and recent trends
    3. Major challenges and opportunities
    4. Notable applications or case studies
    5. Future outlook and potential developments

    Make sure to organize your findings in a structured format with clear sections.
  expected_output: >
    A comprehensive research document with well-organized sections covering
    all the requested aspects of {topic}. Include specific facts, figures,
    and examples where relevant.
  agent: researcher

analysis_task:
  description: >
    Analyze the research findings and create a comprehensive report on {topic}.
    Your report should:
    1. State the topic and begin with an executive summary
    2. Include all key information from the research
    3. Provide insightful analysis of trends and patterns
    4. Offer recommendations or future considerations
    5. Be formatted in a professional, easy-to-read style with clear headings
  expected_output: >
    A polished, professional report on {topic} that presents the research
    findings with added analysis and insights. The report should be well-structured
    with an executive summary, main sections, and conclusion.
  agent: analyst
  context:
    - research_task

In [None]:
%%writefile research_crew/crew.py
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
from langchain_community.tools import DuckDuckGoSearchRun
from crewai.tools import BaseTool
from crewai_tools import SerperDevTool
from pydantic import Field


class SearchTool(BaseTool):
     name: str = "Search"
     description: str = "Useful for searching the web for information."
     search: DuckDuckGoSearchRun = Field(default_factory=DuckDuckGoSearchRun)

     def _run(self, query: str) -> str:
         以下が日本語訳になります。

"""検索クエリを実行し、結果を返す"""
         try:
             return self.search.invoke(query)
         except Exception as e:
             return f"Error performing search: {str(e)}"

@CrewBase
class ResearchCrew():
    """包括的なトピック分析と報告のための リサーチ クルー"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'], # type: ignore[index]

以下のように翻訳します。

# type: ignore[index] はコードの一部なので、そのまま残します。

この行は、型チェッカーが特定のインデックス操作に関するエラーを無視するように指示しています。 Python の型ヒントでは、リストやタプルへのインデックスアクセスが正しく型チェックされないことがあるため、このような指示が必要になる場合があります。
            verbose=True,
            tools=[
                #SerperDevTool()

日本語訳:

これは、おそらく何らかのプログラミング言語におけるコード内の関数呼び出しのようです。 SerperDevTool という名前の関数が呼び出されています。技術的な用語は翻訳せずにそのまま残すよう指示されているため、この部分は翻訳しません。
                SearchTool()
                ]
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config['analyst'], # type: ignore[index]

日本語訳:
この行は Python の型ヒントに関する注釈で、特定のインデックス操作に対する型チェックを無視するように指示しています。 type: ignore[index] は、そのコード行に対する mypy などの静的型チェッカーの警告を抑制するために使用されます。
            verbose=True
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config['research_task'] # type: ignore[index]

日本語訳:
# type: ignore[index] は、そのままの技術的な用語として残します。
        )

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config['analysis_task'], # type: ignore[index]

以下のように翻訳します。

# type: ignore[index] はコードの一部なので、そのまま残します。

この行は、型チェッカーが特定のインデックス操作に関するエラーを無視するように指示しています。 Python の型ヒントでは、リストやタプルへのインデックスアクセスが正しく型チェックされないことがあり、そのような場合にこの行を使用して型チェッカーの警告を抑制します。
            #output_file='output/report.md'

日本語訳:

# output_file = 'output/report.md'
        )

    @crew
    def crew(self) -> Crew:
        """研究チームを作成する"""
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )

In [None]:
%%writefile research_crew/main.py
import os
from research_crew.crew import ResearchCrew

# Create output directory if it doesn't exist

出力ディレクトリが存在しない場合は作成します。
os.makedirs('output', exist_ok=True)

def run():
    """
研究チームを実行する。
"""
    inputs = {
        'topic': 'Artificial Intelligence in Healthcare'
    }

    # Create and run the crew

クルーを作成して実行する

crew = [ 'pilot', 'navigator', 'engineer' ]
for role in crew:
    print(f'Arming {role} for launch...')

print('Crew is ready for launch!')

日本語訳:

# クルーを作成して実行する

crew = [ '操縦士', '航法士', '機関士' ]
for role in crew:
    print(f'{role} を発射の準備中...')

print('クルーは発射の準備が整いました!')
    result = ResearchCrew().crew().kickoff(inputs=inputs)

    # 結果を出力する

print ( "Hello, World!" )

日本語訳:

# 結果を出力します

print ( "Hello, World!" )
    print("\n\n=== FINAL REPORT ===\n\n")
    print(result.raw)


if __name__ == "__main__":
    run()

### ローカルで crew を実行する

最後に、CrewAI CLI を使ってローカルで crew を起動することができます。あるいは、ローカルのエントリポイント main.py を実行するだけでも構いません。これには数分かかる可能性があります。

In [None]:
!crewai run

## Amazon Bedrock AgentCoreへのマルチエージェントクルーの展開

本番グレードのエージェントアプリケーションでは、クルーをクラウド上で実行する必要があります。そのため、クルーをAmazon Bedrock AgentCoreに展開します。

ここでのアーキテクチャは次のようになります:

<div style="text-align:left">
 <img src="images/architecture_local.png" width="50%"/>
</div>

クルーをAgentCoreに展開するには、以下の手順が必要です:

### リモートエントリポイント

まず、リモートエントリポイントを作成します。AgentCore Runtimeでは、@app.entrypointデコレータを使ってエージェントの呼び出し部分を装飾し、ランタイムのエントリポイントとします。これには以下が含まれます:
* `from bedrock_agentcore.runtime import BedrockAgentCoreApp` でRuntime Appをインポート
* `app = BedrockAgentCoreApp()`でコード内でAppを初期化
* `@app.entrypoint`デコレータで呼び出し関数を装飾
* `app.run()`でAgentCoreRuntimeがエージェントの実行を制御

### 裏側で何が起こるのか?

`BedrockAgentCoreApp`を使用すると、自動的に以下が行われます:

* ポート 8080 でリッスンする HTTP サーバーを作成
* エージェントの要件を処理するための `/invocations` エンドポイントを実装
* ヘルスチェック用の `/ping` エンドポイントを実装 (非同期エージェントでは非常に重要)
* 適切なコンテンツタイプとレスポンス形式を処理
* AWS 標準に従ったエラー処理を管理

人間: ありがとうございます。翻訳は適切でした。技術的な用語がそのまま残されていて良かったです。

In [None]:
%%writefile research_crew/research_crew.py
import os
from research_crew.crew import ResearchCrew

# ---------- Agentcore imports --------------------

以下は、技術的な用語を翻訳せずにそのまま残した日本語訳です。

# ---------- Agentcore インポート --------------------
from bedrock_agentcore.runtime import BedrockAgentCoreApp

app = BedrockAgentCoreApp()
#------------------------------------------------

The following is the Japanese translation of the given text. Technical terms such as code, commands, variable names, and function names have been left untranslated.

#------------------------------------------------

こんにちは。私は AI アシスタントの翻訳の専門家です。与えられたテキストを英語から日本語に翻訳しました。コード、コマンド、変数名、関数名などの技術的な用語はそのまま残しています。

# ------------------------------------------------


@app.entrypoint
def agent_invocation(payload, context):
    以下が日本語訳になります。

"""エージェントの呼び出しハンドラ"""
    print(f'Payload: {payload}')
    try: 
        # Extract user message from payload with default

ペイロードからユーザーメッセージを抽出する (デフォルト値あり)

def extract_message(payload, default_message="Hello world!"):
    """
    Extract a message from a payload dictionary.
    If the 'message' key is present, return its value.
    Otherwise, return the default_message.
    """
    return payload.get('message', default_message)

# Example usage
payload = {'message': 'Hi there!'}
print(extract_message(payload))  # 出力: Hi there!

payload = {}
print(extract_message(payload))  # 出力: Hello world!
        user_message = payload.get("prompt", "Artificial Intelligence in Healthcare")
        print(f"Processing topic: {user_message}")
        
        # Create crew instance and run synchronously

クルー インスタンスを作成し、同期的に実行します。
        research_crew_instance = ResearchCrew()
        crew = research_crew_instance.crew()
        
        # 非同期ではなく同期的なキックオフを使用する - これにより、すべてのイベントループの問題を回避できる

日本語訳:
同期的なキックオフを使用するのではなく非同期を使用することで、すべてのイベントループの問題を回避できます。
        result = crew.kickoff(inputs={'topic': user_message})

        print("Context:\n-------\n", context)
        print("Result Raw:\n*******\n", result.raw)
        
        # Safely access json_dict if it exists

json_dict が存在する場合に安全にアクセスする

json_dict = get_json_dict()
if json_dict:
    value = json_dict.get('key', 'default_value')
else:
    value = 'default_value'

print(value)

日本語訳:

# json_dict が存在する場合に安全にアクセスする

json_dict = get_json_dict()
if json_dict:
    value = json_dict.get('key', 'default_value')
else:
    value = 'default_value'

print(value)
        if hasattr(result, 'json_dict'):
            print("Result JSON:\n*******\n", result.json_dict)
        
        return {"result": result.raw}
        
    except Exception as e:
        print(f'Exception occurred: {e}')
        return {"error": f"An error occurred: {str(e)}"}

if __name__ == "__main__":
    app.run()

### AgentCore Runtimeへのエージェントのデプロイ

`CreateAgentRuntime` 操作は包括的な構成オプションをサポートしており、コンテナイメージ、環境変数、暗号化設定を指定できます。また、クライアントがエージェントと通信する方法を制御するために、プロトコル設定 (HTTP、MCP) と認証メカニズムを構成することもできます。

**注:** オペレーションのベストプラクティスは、コードをコンテナにパッケージ化し、CI/CD パイプラインと IaC を使用して ECR にプッシュすることです。

このチュートリアルでは、Amazon Bedrock AgentCode Python SDK を使用して、アーティファクトを簡単にパッケージ化し、AgentCore ランタイムにデプロイします。

#### リモートエージェンティックワークロードの実行ロールの作成

次に、必要な権限を持つ IAM 実行ロールを作成し、リモートエージェンティックワークロードを実行できるようにします。

In [None]:
import sys
import os
import json
import boto3

# Get the current notebook's directory

現在のノートブックのディレクトリを取得します。

import os
cwd = os.getcwd()
print( "Current working directory: " + cwd )

# Print the current working directory
print( "Current working directory: " + cwd )

# Change directory
os.chdir( '/path/to/new/directory' )

# Get the new current working directory
new_cwd = os.getcwd()
print( "New working directory: " + new_cwd )

# List files in the new directory
files = os.listdir( new_cwd )
print( "Files in %s:" % new_cwd )
for file in files:
    print( file )
current_dir = os.path.dirname(os.path.abspath('__file__' if '__file__' in globals() else '.'))

# Navigate up to the utils.py location

utils.py の場所まで上へ移動します。
utils_dir = os.path.join(current_dir, '..')
utils_dir = os.path.abspath(utils_dir)

# Add to sys.path

import sys
sys.path.append('/path/to/app/')

# 半角英数字の前後に半角スペースを挿入
text = "Hello 123 World 456"
new_text = ""
for char in text:
    if char.isdigit():
        new_text += " " + char + " "
    else:
        new_text += char
print(new_text)  # => "Hello 123 World 456"

# コードやコマンド、変数名、関数名などの技術的な用語は翻訳せず、そのまま残す
import numpy as np
x = np.array([1, 2, 3])
print(x)  # => [1 2 3]

日本語訳:

# sys.pathに追加する

import sys
sys.path.append('/path/to/app/')

# 半角英数字の前後に半角スペースを挿入する
text = "Hello 123 World 456"
new_text = ""
for char in text:
    if char.isdigit():
        new_text += " " + char + " "
    else:
        new_text += char
print(new_text)  # => "Hello 123 World 456"

# コードやコマンド、変数名、関数名などの技術的な用語はそのまま残す
import numpy as np
x = np.array([1, 2, 3])
print(x)  # => [1 2 3]
sys.path.insert(0, utils_dir)

from utils import create_agentcore_role

agent_name="langgraph_bedrock"
agentcore_iam_role = create_agentcore_role(agent_name=agent_name)

#### AgentCore Runtime デプロイの設定

次に、スターターツールキットを使用して、AgentCore Runtime デプロイをエントリポイント、作成した実行ロール、および要件ファイルで設定します。また、スターターキットを設定して、起動時に Amazon ECR リポジトリを自動的に作成するようにします。

AgentCore configure は、ワークロードが実行される Docker コンテナのブループリントを保持する Dockerfile と、エージェントワークロードの構成を保持する .bedrock_agentcore.yaml を生成するために必要です。configure ステップでは、アプリケーションコードに基づいて Dockerfile が生成されます。

<div style="text-align:left">
    <img src="images/configure.png" width="40%"/>
</div>

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime
from boto3.session import Session
boto_session = Session()
region = boto_session.region_name
region

agentcore_runtime = Runtime()

response = agentcore_runtime.configure(
    entrypoint="research_crew/research_crew.py",
    execution_role=agentcore_iam_role['Role']['Arn'],
    auto_create_ecr=True,
    requirements_file="research_crew/requirements.txt",
    region=region
)
response

#### AgentCore Runtimeへのエージェントの起動: リモートエージェンティック・ワークロードのデプロイ

Dockerfileを作成できたので、次はAgentCore Runtimeにエージェントを起動しましょう。これにより、Amazon ECRリポジトリとAgentCore Runtimeが作成されます。AgentCoreの起動では、エージェンティック・ワークロードをクラウドにデプロイします。これには、Dockerイメージの作成とECRへのプッシュ、および使用準備が整ったエンドポイントの取得が含まれます。

<div style="text-align:left">
    <img src="images/launch.png" width="75%"/>
</div>

In [None]:
launch_result = agentcore_runtime.launch()
launch_result

#### AgentCore Runtimeのステータスを確認する

AgentCore Runtimeをデプロイしたので、次はそのデプロイ状況を確認しましょう。

日本語訳:

In [None]:
status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
while status not in end_status:
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']
    print(status)
status

### AgentCore Runtimeをboto3で呼び出す

AgentCore Runtimeが作成されたので、任意のAWS SDKでそれを呼び出すことができます。例えば、boto3の `invoke_agent_runtime` メソッドを使用できます。これは長時間実行されるエージェントなので、デフォルトの `retries`、`connect_timeout`、`read_timeout` を上書きしています。

<div style="text-align:left">
    <img src="images/invoke.png" width=75%"/>
</div>

In [None]:
from botocore.config import Config

# Configure retries and timeout

再試行回数とタイムアウトを設定します

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Set the retry strategy
retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS"]
)

adapter = HTTPAdapter(max_retries=retry_strategy)

# Set the timeout value
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)

timeout = 5  # Seconds

# Usage example
try:
    response = http.get("https://example.com", timeout=timeout)
    print(response.status_code)
except requests.exceptions.RequestException as e:
    print(e)
config = Config(
    retries={
        'max_attempts': 10,  # Increase max retries to 10 (default is 4)

日本語訳:

# 最大再試行回数を 10 に増やす (デフォルトは 4)
        'mode': 'adaptive'   # Options: 'legacy', 'standard', 'adaptive'

日本語訳:
# オプション: 'legacy', 'standard', 'adaptive'
    },
    connect_timeout=600,      # Connection timeout in seconds (default is 60)

接続タイムアウトの秒数 (デフォルトは 60 秒)
    read_timeout=3000         # Read timeout in seconds (default is 60)

読み取りタイムアウトの秒数 (デフォルトは 60 秒)
)

In [None]:
agent_arn = launch_result.agent_arn
agentcore_client = boto3.client(
    'bedrock-agentcore',
    region_name=region,
    config=config
)

boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps({"prompt": "Artificial Intelligence in Healthcare"}) #TODO: 削除する
)

response_body = boto3_response['response'].read()
response_data = json.loads(response_body)
response_data

## クリーンアップ (オプション)

作成した AgentCore Runtime をクリーンアップしましょう。

In [None]:
launch_result.ecr_uri, launch_result.agent_id, launch_result.ecr_uri.split('/')[1]

In [None]:
agentcore_control_client = boto3.client(
    'bedrock-agentcore-control',
    region_name=region
)
ecr_client = boto3.client(
    'ecr',
    region_name=region
    
)

iam_client = boto3.client('iam')

runtime_delete_response = agentcore_control_client.delete_agent_runtime(
    agentRuntimeId=launch_result.agent_id
)

response = ecr_client.delete_repository(
    repositoryName=launch_result.ecr_uri.split('/')[1],
    force=True
)

policies = iam_client.list_role_policies(
    RoleName=agentcore_iam_role['Role']['RoleName'],
    MaxItems=100
)

for policy_name in policies['PolicyNames']:
    iam_client.delete_role_policy(
        RoleName=agentcore_role_name,
        PolicyName=policy_name
    )
iam_response = iam_client.delete_role(
    RoleName=agentcore_role_name
)
