# 第4章: マルチエージェント・ワークフロー

## 準備

以下のセルを順番に実行して、演習に必要な環境をセットアップします。

### LLMプロバイダーの選択

このセルでは、使用するLLMプロバイダーを選択します。
`LLM_PROVIDER` 変数に、利用したいプロバイダー名を設定してください。
選択可能なプロバイダー: `"openai"`, `"azure"`, `"google"` (Vertex AI), `"google_genai"` (Gemini API), `"anthropic"`, `"bedrock"`

In [None]:
# === LLMプロバイダーの選択 ===
# 利用したいLLMプロバイダーを以下の変数で指定してください。
# "openai", "azure", "google" (Vertex AI), "google_genai" (Gemini API), "anthropic", "bedrock" のいずれかを選択できます。
LLM_PROVIDER = "openai"  # 例: OpenAI を利用する場合

### APIキー/環境変数の設定

以下のセルを実行する前に、選択したLLMプロバイダーに応じたAPIキーまたは環境変数を設定する必要があります。

**手順:**
1.  `.env.sample` ファイルをコピーして `.env` ファイルを作成します。
2.  `.env` ファイルを開き、選択したLLMプロバイダーに対応するAPIキーや必要な情報を記述します。
    *   **OpenAI:** `OPENAI_API_KEY`
    *   **Azure OpenAI:** `AZURE_OPENAI_API_KEY`, `AZURE_OPENAI_ENDPOINT`, `OPENAI_API_VERSION`, `AZURE_OPENAI_DEPLOYMENT_NAME`
    *   **Google (Vertex AI):** `GOOGLE_CLOUD_PROJECT_ID`, `GOOGLE_CLOUD_LOCATION` (Colab環境外で実行する場合、`GOOGLE_APPLICATION_CREDENTIALS` 環境変数の設定も必要になることがあります)
    *   **Google (Gemini API):** `GOOGLE_API_KEY`
    *   **Anthropic:** `ANTHROPIC_API_KEY`
    *   **AWS Bedrock:** `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION_NAME` (IAMロールを使用する場合は、これらのキー設定は不要な場合がありますが、リージョン名は必須です)
3.  ファイルを保存します。

**Google Colab を使用している場合:**
上記の `.env` ファイルを使用する代わりに、Colabのシークレットマネージャーに必要なキーを登録してください。
例えば、OpenAIを使用する場合は `OPENAI_API_KEY` という名前でシークレットを登録します。
Vertex AI を利用する場合は、Colab上での認証 (`google.colab.auth.authenticate_user()`) が実行されます。

このセルは、設定された情報に基づいて環境変数をロードし、LLMクライアントを初期化します。

In [None]:
# === APIキー/環境変数の設定 ===
import os
from dotenv import load_dotenv

# .envファイルから環境変数を読み込む (存在する場合)
load_dotenv()

try:
    from google.colab import userdata
    IS_COLAB = True
except ImportError:
    IS_COLAB = False

# --- OpenAI ---
if LLM_PROVIDER == "openai":
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    if not OPENAI_API_KEY and IS_COLAB:
        OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")
    if not OPENAI_API_KEY:
        raise ValueError("OpenAI APIキーが設定されていません。環境変数 OPENAI_API_KEY を設定するか、Colab環境の場合はシークレットに OPENAI_API_KEY を設定してください。")
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

# --- Azure OpenAI ---
elif LLM_PROVIDER == "azure":
    AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
    AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
    OPENAI_API_VERSION = os.environ.get("OPENAI_API_VERSION")
    AZURE_OPENAI_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")

    if IS_COLAB:
        if not AZURE_OPENAI_API_KEY: AZURE_OPENAI_API_KEY = userdata.get("AZURE_OPENAI_API_KEY")
        if not AZURE_OPENAI_ENDPOINT: AZURE_OPENAI_ENDPOINT = userdata.get("AZURE_OPENAI_ENDPOINT")
        if not OPENAI_API_VERSION: OPENAI_API_VERSION = userdata.get("OPENAI_API_VERSION") # 例: "2023-07-01-preview"
        if not AZURE_OPENAI_DEPLOYMENT_NAME: AZURE_OPENAI_DEPLOYMENT_NAME = userdata.get("AZURE_OPENAI_DEPLOYMENT_NAME")

    if not AZURE_OPENAI_API_KEY: raise ValueError("Azure OpenAI APIキー (AZURE_OPENAI_API_KEY) が設定されていません。")
    if not AZURE_OPENAI_ENDPOINT: raise ValueError("Azure OpenAI エンドポイント (AZURE_OPENAI_ENDPOINT) が設定されていません。")
    if not OPENAI_API_VERSION: OPENAI_API_VERSION = "2023-07-01-preview" # デフォルトを設定することも可能
    if not AZURE_OPENAI_DEPLOYMENT_NAME: raise ValueError("Azure OpenAI デプロイメント名 (AZURE_OPENAI_DEPLOYMENT_NAME) が設定されていません。")

    os.environ["AZURE_OPENAI_API_KEY"] = AZURE_OPENAI_API_KEY
    os.environ["AZURE_OPENAI_ENDPOINT"] = AZURE_OPENAI_ENDPOINT
    os.environ["OPENAI_API_VERSION"] = OPENAI_API_VERSION

# --- Google Cloud Vertex AI (Gemini) ---
elif LLM_PROVIDER == "google":
    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT_ID") # .env 用に修正
    LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION")

    if IS_COLAB:
        if not PROJECT_ID: PROJECT_ID = userdata.get("GOOGLE_CLOUD_PROJECT_ID")
        if not LOCATION: LOCATION = userdata.get("GOOGLE_CLOUD_LOCATION") # 例: "us-central1"
        from google.colab import auth as google_auth
        google_auth.authenticate_user() # Vertex AI を使う場合は Colab での認証を推奨
    else: # Colab外の場合、.envから読み込んだ値で環境変数を設定
        if PROJECT_ID: os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID # Vertex AI SDKが参照する標準的な環境変数名
        if LOCATION: os.environ['GOOGLE_CLOUD_LOCATION'] = LOCATION

    if not PROJECT_ID: raise ValueError("Google Cloud Project ID が設定されていません。環境変数 GOOGLE_CLOUD_PROJECT_ID を設定するか、Colab環境の場合はシークレットに GOOGLE_CLOUD_PROJECT_ID を設定してください。")
    if not LOCATION: LOCATION = "us-central1" # デフォルトロケーション

# --- Google Gemini API (langchain-google-genai) ---
elif LLM_PROVIDER == "google_genai":
    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
    if not GOOGLE_API_KEY and IS_COLAB:
        GOOGLE_API_KEY = userdata.get("GOOGLE_API_KEY")
    if not GOOGLE_API_KEY:
        raise ValueError("Google APIキーが設定されていません。環境変数 GOOGLE_API_KEY を設定するか、Colab環境の場合はシークレットに GOOGLE_API_KEY を設定してください。")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# --- Anthropic (Claude) ---
elif LLM_PROVIDER == "anthropic":
    ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")
    if not ANTHROPIC_API_KEY and IS_COLAB:
        ANTHROPIC_API_KEY = userdata.get("ANTHROPIC_API_KEY")
    if not ANTHROPIC_API_KEY:
        raise ValueError("Anthropic APIキーが設定されていません。環境変数 ANTHROPIC_API_KEY を設定するか、Colab環境の場合はシークレットに ANTHROPIC_API_KEY を設定してください。")
    os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY

# --- Amazon Bedrock (Claude) ---
elif LLM_PROVIDER == "bedrock":
    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
    AWS_REGION_NAME = os.environ.get("AWS_REGION_NAME")

    if IS_COLAB: 
        if not AWS_ACCESS_KEY_ID: AWS_ACCESS_KEY_ID = userdata.get("AWS_ACCESS_KEY_ID")
        if not AWS_SECRET_ACCESS_KEY: AWS_SECRET_ACCESS_KEY = userdata.get("AWS_SECRET_ACCESS_KEY")
        if not AWS_REGION_NAME: AWS_REGION_NAME = userdata.get("AWS_REGION_NAME")

    if not AWS_REGION_NAME:
         raise ValueError("AWSリージョン名 (AWS_REGION_NAME) が設定されていません。Bedrock利用にはリージョン指定が必要です。")

    # 環境変数に設定 (boto3がこれらを自動で読み込む)
    if AWS_ACCESS_KEY_ID: os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY_ID
    if AWS_SECRET_ACCESS_KEY: os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY
    os.environ["AWS_DEFAULT_REGION"] = AWS_REGION_NAME # boto3が参照する標準的なリージョン環境変数名
    os.environ["AWS_REGION"] = AWS_REGION_NAME # いくつかのライブラリはこちらを参照することもある

print(f"APIキー/環境変数の設定完了 (プロバイダー: {LLM_PROVIDER})")

### LLMクライアントの初期化

このセルは、上で選択・設定したLLMプロバイダーに基づいて、対応するLLMクライアントを初期化します。

In [None]:
# === LLMクライアントの動的初期化 ===
llm = None

if LLM_PROVIDER == "openai":
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
elif LLM_PROVIDER == "azure":
    from langchain_openai import AzureChatOpenAI
    llm = AzureChatOpenAI(
        azure_deployment=os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME"), # 環境変数から取得
        openai_api_version=os.environ.get("OPENAI_API_VERSION"), # 環境変数から取得
        temperature=0,
    )
elif LLM_PROVIDER == "google":
    from langchain_google_vertexai import ChatVertexAI
    # PROJECT_ID, LOCATION は前のセルで環境変数に設定済みか、Colabの場合は直接利用
    llm = ChatVertexAI(model_name="gemini-2.0-flash", temperature=0, project=os.environ.get("GOOGLE_CLOUD_PROJECT"), location=os.environ.get("GOOGLE_CLOUD_LOCATION"))
elif LLM_PROVIDER == "google_genai":
    from langchain_google_genai import ChatGoogleGenerativeAI
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
elif LLM_PROVIDER == "anthropic":
    from langchain_anthropic import ChatAnthropic
    llm = ChatAnthropic(model="claude-3-haiku-20240307", temperature=0)
elif LLM_PROVIDER == "bedrock":
    from langchain_aws import ChatBedrock # langchain_community.chat_models から langchain_aws に変更の可能性あり
    # AWS_REGION_NAME は前のセルで環境変数 AWS_DEFAULT_REGION に設定済み
    llm = ChatBedrock( # BedrockChat ではなく ChatBedrock が一般的
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        # region_name=os.environ.get("AWS_DEFAULT_REGION"), # 通常、boto3が環境変数から自動で読み込む
        model_kwargs={"temperature": 0},
    )
else:
    raise ValueError(
        f"Unsupported LLM_PROVIDER: {LLM_PROVIDER}. "
        "Please choose from 'openai', 'azure', 'google', 'google_genai', 'anthropic', or 'bedrock'."
    )

print(f"LLM Provider: {LLM_PROVIDER}")
if llm:
    print(f"LLM Client Type: {type(llm)}")
    # モデル名取得の試行を汎用的に
    model_attr = (
                 getattr(llm, 'model', None) or
                 getattr(llm, 'model_name', None) or
                 getattr(llm, 'model_id', None) or
                 (hasattr(llm, 'llm') and getattr(llm.llm, 'model', None)) # 一部のLLMクライアントのネスト構造に対応
    )
    if hasattr(llm, 'azure_deployment') and not model_attr: # Azure特有の属性
        model_attr = llm.azure_deployment
        
    if model_attr:
        print(f"LLM Model: {model_attr}")
    else:
        print("LLM Model: (Could not determine model name from client attributes)")


---

### ■ 問題001: 基本的な2エージェント会話（リサーチャーとライター）

複数のLLMエージェントが協調してタスクを解決する第一歩として、2つの異なる役割を持つエージェント（リサーチャーとライター）を作成し、それらが交互に会話（状態を更新）しながら一つのタスク（例: 特定のトピックに関する短い記事の作成）を進めるグラフを構築します。

*   **学習内容:**
    *   異なる役割（プロンプトや利用ツールが異なる）を持つ複数のエージェント関数（ノード）を定義する方法。
    *   状態（State）に「次に実行すべきエージェント」を示すキー（例: `next_agent`）を持たせ、それに基づいて処理をルーティングする方法。
    *   エージェント間で情報を引き継ぎながら（例: リサーチャーの調査結果をライターが利用）、タスクを段階的に進める基本的な流れ。

In [None]:
# 解答欄001 - グラフ構築
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import ____, ____
from langgraph.graph.message import ____
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from uuid import uuid4 # 解答例より

# --- 1. 状態定義 ---
class ____(____):
    messages: Annotated[List[BaseMessage], add_messages]
    next_agent: str # 次に実行するエージェント名 ("Researcher", "Writer", "FINISH")
    topic: str
    research_findings: Optional[str]
    draft_article: Optional[str]

# --- 2. エージェント（ノード）定義 ---
def ____(state: TwoAgentConversationState) -> dict: # 戻り値の型アノテーションをdictに変更 (解答例より)
    print(f"
[リサーチャーエージェント] トピック「{state['topic']}」について調査します。")
    findings = f"「{state['topic']}」に関する詳細な調査結果: 主要なポイントはA, B, Cであり、背景にはDが存在します。今後の展望としてはEが考えられます。" # 解答例よりメッセージ変更
    if LLM_PROVIDER != "fake" and llm: # 解答例より llm の存在チェック追加
        prompt = f"あなたは優秀なリサーチャーです。トピック「{state['topic']}」について、架空の調査結果を詳細に（3つの主要ポイント、背景、今後の展望を含めて）生成してください。" # 解答例よりプロンプト変更
        response = llm.invoke([HumanMessage(content=prompt)])
        findings = response.content
    
    print(f"  調査結果: {findings[:100]}...") # 解答例より出力短縮
    return {
        "messages": [AIMessage(content=f"調査結果報告: {findings}", name="Researcher")], # 解答例よりメッセージ変更
        "research_findings": findings,
        "next_agent": "Writer" # 次はライターエージェントへ (解答例より)
    }

def ____(state: TwoAgentConversationState) -> dict: # 戻り値の型アノテーションをdictに変更 (解答例より)
    print(f"
[ライターエージェント] 調査結果を元に記事を作成します。")
    findings = state.get("research_findings", "調査結果が提供されていません。") # 解答例より
    article = f"タイトル案: {state['topic']}の深掘り\n\n{findings}\n\n本稿は上記の調査に基づき構成されました。" # 解答例よりメッセージ変更
    if LLM_PROVIDER != "fake" and llm: # 解答例より llm の存在チェック追加
        prompt = f"あなたは熟練のライターです。以下の調査結果に基づいて、読者の関心を引くような、約200字程度の解説記事を作成してください。タイトルも提案してください。\n調査結果:\n{findings}" # 解答例よりプロンプト変更
        response = llm.invoke([HumanMessage(content=prompt)])
        article = response.content

    print(f"  作成された記事: {article[:100]}...") # 解答例より出力短縮
    return {
        "messages": [AIMessage(content=f"記事ドラフト:\n{article}", name="Writer")], # 解答例よりメッセージ変更
        "draft_article": article,
        "next_agent": "FINISH" # これで終了 (解答例より)
    }

# --- 3. ルーター関数の定義 ---
def ____(state: TwoAgentConversationState) -> str:
    next_node_name = state.get("next_agent") # 解答例より変数名変更
    print(f"  -> ルーター: 次のエージェントは「{next_node_name}」です。") # 解答例より変数名変更
    if next_node_name == "Writer":
        return "writer_node" # ノード名と合わせる (解答例より)
    elif next_node_name == "Researcher": 
        return "researcher_node" # 解答例より
    else: 
        return END

# --- 4. グラフの構築 ---
workflow_q1_ch4 = StateGraph(TwoAgentConversationState)

workflow_q1_ch4.____("researcher_node", researcher_agent) # 解答例よりノード名変更
workflow_q1_ch4.add_node("writer_node", writer_agent) # 解答例よりノード名変更

workflow_q1_ch4.____(
    route_to_next_agent, 
    {
        "researcher_node": "researcher_node", # 解答例より
        "writer_node": "writer_node", # 解答例より
        END: END
    }
)

workflow_q1_ch4.add_conditional_edges("researcher_node", route_to_next_agent, {"writer_node": "writer_node", END: END}) # 解答例よりノード名変更
workflow_q1_ch4.add_conditional_edges("writer_node", route_to_next_agent, {END: END}) # 解答例よりノード名変更

graph_q1_ch4 = workflow_q1_ch4.____()

In [None]:
# 解答欄001 - グラフ可視化
from IPython.display import Image, display # 解答例より

try:
    display(Image(graph_q1_ch4.____().____()))
except Exception as e:
    print(f"グラフの可視化に失敗しました。Graphvizが正しくインストールされているか確認してください。エラー: {e}")

In [None]:
# 解答欄001 - グラフ実行
topic_q1_ch4 = "LangGraphを用いた高度な自律エージェントの設計パターン" # 解答例よりトピック変更
thread_id_q1 = f"thread-2agents-{uuid4()[:4]}" # 解答例より
config_q1 = {"____": {"____": thread_id_q1}} # 解答例より

initial_state_q1_ch4 = {
    "messages": [____(content=f"トピック「{topic_q1_ch4}」について記事を作成してください。まず調査からお願いします。初期状態です。")], # 解答例よりメッセージ変更
    "____": "Researcher", 
    "topic": topic_q1_ch4,
    "____": None,
    "____": None
}

print(f"--- 2エージェント会話テスト (トピック: {topic_q1_ch4}) ---")
final_state_q1_ch4_values = None # 解答例より
for event_chunk in graph_q1_ch4.____(initial_state_q1_ch4, ____=config_q1, recursion_limit=5): # 解答例より config 追加, recursion_limitは元の解答から
    print(f"Event Chunk: {event_chunk}") # 解答例より
    for key, value_dict in event_chunk.items(): # 解答例より
        if key == END: # 解答例より
            final_state_q1_ch4_values = value_dict # 解答例より
    print("----");

if not final_state_q1_ch4_values: # 解答例より
    print("ENDイベントから最終状態を取得できませんでした。get_stateを試みます。") # 解答例より
    current_state_obj = graph_q1_ch4.____(config=config_q1) # 解答例より
    if current_state_obj: # 解答例より
        final_state_q1_ch4_values = current_state_obj.values # 解答例より

if final_state_q1_ch4_values: # 解答例より
    print("
--- 最終結果 ---")
    print(f"トピック: {final_state_q1_ch4_values.get('topic')}")
    print(f"調査結果: {final_state_q1_ch4_values.get('research_findings')}")
    print(f"最終記事: {final_state_q1_ch4_values.get('draft_article')}")
else:
    print("最終状態が取得できませんでした。")

<details><summary>解答001</summary>

``````python
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from IPython.display import Image, display
from uuid import uuid4

# --- 1. 状態定義 ---
class TwoAgentConversationState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    next_agent: str 
    topic: str
    research_findings: Optional[str]
    draft_article: Optional[str]

# --- 2. エージェント（ノード）定義 ---
def researcher_agent(state: TwoAgentConversationState) -> dict:
    print(f"
[リサーチャーエージェント] トピック「{state['topic']}」について調査します。")
    findings = f"「{state['topic']}」に関する詳細な調査結果: 主要なポイントはA, B, Cであり、背景にはDが存在します。今後の展望としてはEが考えられます。"
    if LLM_PROVIDER != "fake" and llm:
        prompt = f"あなたは優秀なリサーチャーです。トピック「{state['topic']}」について、架空の調査結果を詳細に（3つの主要ポイント、背景、今後の展望を含めて）生成してください。"
        response = llm.invoke([HumanMessage(content=prompt)])
        findings = response.content
    
    print(f"  調査結果: {findings[:100]}...")
    return {
        "messages": [AIMessage(content=f"調査結果報告: {findings}", name="Researcher")],
        "research_findings": findings,
        "next_agent": "Writer" 
    }

def writer_agent(state: TwoAgentConversationState) -> dict:
    print(f"
[ライターエージェント] 調査結果を元に記事を作成します。")
    findings = state.get("research_findings", "調査結果が提供されていません。")
    article = f"タイトル案: {state['topic']}の深掘り\n\n{findings}\n\n本稿は上記の調査に基づき構成されました。"
    if LLM_PROVIDER != "fake" and llm:
        prompt = f"あなたは熟練のライターです。以下の調査結果に基づいて、読者の関心を引くような、約200字程度の解説記事を作成してください。タイトルも提案してください。\n調査結果:\n{findings}"
        response = llm.invoke([HumanMessage(content=prompt)])
        article = response.content

    print(f"  作成された記事: {article[:100]}...")
    return {
        "messages": [AIMessage(content=f"記事ドラフト:\n{article}", name="Writer")],
        "draft_article": article,
        "next_agent": "FINISH" 
    }

# --- 3. ルーター関数の定義 ---
def route_to_next_agent(state: TwoAgentConversationState) -> str:
    next_node_name = state.get("next_agent")
    print(f"  -> ルーター: 次のエージェントは「{next_node_name}」です。")
    if next_node_name == "Writer":
        return "writer_node" # ノード名と合わせる
    elif next_node_name == "Researcher":
        return "researcher_node"
    else: # FINISH または不明な場合は終了
        return END

# --- 4. グラフの構築 ---
workflow_q1_ch4 = StateGraph(TwoAgentConversationState)

workflow_q1_ch4.add_node("researcher_node", researcher_agent)
workflow_q1_ch4.add_node("writer_node", writer_agent)

workflow_q1_ch4.set_conditional_entry_point(
    route_to_next_agent,
    {
        "researcher_node": "researcher_node",
        "writer_node": "writer_node",
        END: END
    }
)

workflow_q1_ch4.add_conditional_edges("researcher_node", route_to_next_agent, {"writer_node": "writer_node", END: END})
workflow_q1_ch4.add_conditional_edges("writer_node", route_to_next_agent, {END: END})

graph_q1_ch4 = workflow_q1_ch4.compile()
try:
    display(Image(graph_q1_ch4.get_graph().draw_png()))
except Exception as e:
    print(f"グラフ描画に失敗: {e}")

# --- 5. グラフの実行 ---
topic_q1_ch4 = "LangGraphを用いた高度な自律エージェントの設計パターン"
thread_id_q1 = f"thread-2agents-{uuid4()[:4]}"
config_q1 = {"configurable": {"thread_id": thread_id_q1}}

initial_state_q1_ch4 = {
    "messages": [HumanMessage(content=f"トピック「{topic_q1_ch4}」について記事を作成してください。まず調査からお願いします。初期状態です。")],
    "next_agent": "Researcher",
    "topic": topic_q1_ch4,
    "research_findings": None,
    "draft_article": None
}

print(f"--- 2エージェント会話テスト (トピック: {topic_q1_ch4}) ---")
final_state_q1_ch4_values = None
for event_chunk in graph_q1_ch4.stream(initial_state_q1_ch4, config=config_q1, recursion_limit=5):
    # streamから返る各chunkは {'node_name': {'state_key': value, ...}} の形式
    # 最後のイベントがENDの場合、そのキーに対応する値が最終状態全体になる
    print(f"Event Chunk: {event_chunk}")
    for key, value_dict in event_chunk.items():
        if key == END:
            final_state_q1_ch4_values = value_dict
    print("----");

if not final_state_q1_ch4_values: # ENDイベントでキャッチできなかった場合 (例: recursion_limit)
    print("ENDイベントから最終状態を取得できませんでした。get_stateを試みます。")
    current_state_obj = graph_q1_ch4.get_state(config=config_q1)
    if current_state_obj:
        final_state_q1_ch4_values = current_state_obj.values

if final_state_q1_ch4_values:
    print("
--- 最終結果 ---")
    print(f"トピック: {final_state_q1_ch4_values.get('topic')}")
    print(f"調査結果: {final_state_q1_ch4_values.get('research_findings')}")
    print(f"最終記事: {final_state_q1_ch4_values.get('draft_article')}")
    # print(f"最終メッセージ履歴: {final_state_q1_ch4_values.get('messages')}") # 必要なら表示
else:
    print("最終状態が取得できませんでした。")
``````
</details>

<details><summary>解説001</summary>

#### この問題のポイント

*   **状態によるエージェントの切り替え:** `TwoAgentConversationState` に `next_agent` というキーを設け、このキーの値を "Researcher", "Writer", "FINISH" のように変更することで、次にどのアクション（ノード）を実行するかを制御します。
*   **エージェントノードの役割分担:**
    *   `researcher_agent`: 指定されたトピックについて（ダミーの）調査を行い、結果を `research_findings` に保存し、次に `Writer` を指定します。
    *   `writer_agent`: `research_findings` を使って（ダミーの）記事を作成し、`draft_article` に保存し、次に `FINISH` を指定します。
    *   実際のアプリケーションでは、これらのノード内部でLLMを呼び出し、より高度な調査や記事作成を行います。この解答例では、`LLM_PROVIDER != "fake"` の場合にLLMを呼び出すようにしています。
*   **ルーター (`route_to_next_agent`):** `next_agent` の値を見て、次に実行すべきノード名（`"researcher_node"` や `"writer_node"`）または特別な終了マーカー `END` を返します。これにより、グラフの実行フローが動的に決定されます。
*   **`set_conditional_entry_point`:** グラフの開始点を固定せず、初期状態の `next_agent` の値に基づいて最初のノードを決定するために使用しています。これにより、例えば途中から処理を再開するようなシナリオにも対応しやすくなります（ただし、この問題では初期状態は常にリサーチャーから開始）。
*   **情報の引き継ぎ:** リサーチャーが生成した `research_findings` は状態を通じてライターに引き継がれ、ライターはそれを利用して記事を作成します。このように、状態オブジェクトがエージェント間の情報共有媒体として機能します。

---</details>

### ■ 問題002: エージェントスウォームの基本 - スーパーバイザーによるタスク割り振り

より複雑なタスクでは、複数の専門エージェントチーム（スウォーム）を統括するスーパーバイザー（監督者）エージェントを導入するアプローチが有効です。スーパーバイザーは、全体のタスクを分析し、適切なサブタスクを特定のエージェント（またはエージェントチーム）に割り振ります。この問題では、スーパーバイザーがユーザーの要求に応じて、「リサーチャー」または「ライター」のいずれか一方のエージェントを選択して処理を委任する、基本的なタスク割り振りグラフを構築します。

*   **学習内容:**
    *   スーパーバイザー役のLLMノードが、ユーザー入力に基づいて次に実行すべきエージェント（または処理）を決定する方法。
    *   状態に「選択されたエージェント」や「タスク指示」を格納し、それに基づいて条件付きエッジで処理を分岐させる方法。
    *   委任されたエージェントが処理を実行し、その結果をスーパーバイザー（または次のステップ）に返す流れ。

In [None]:
# 解答欄002 - グラフ構築
from langchain_core.messages import SystemMessage
from typing import TypedDict, Annotated, List, Optional, ____ # Literalを追加 (解答例より)
from langgraph.graph import ____, END # (既にインポート済みだが明示)
from langgraph.graph.message import add_messages # (既にインポート済みだが明示)
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage # (既にインポート済みだが明示)
from langchain_community.tools.tavily_search import ____ # search_tool用 (解答例より)
from langchain_core.tools import tool # dummy_search_tool用 (解答例より)
from uuid import uuid4 # 解答例より
import re # 解答例より

# --- 1. 状態定義 ---
class ____(____):
    messages: Annotated[List[BaseMessage], add_messages]
    user_request: str
    next_action: Optional[Literal["DelegateToResearcher", "DelegateToWriter", "RespondToUser", "FINISH"]] # 解答例よりOptional追加
    research_task_description: Optional[str]
    research_result: Optional[str]
    writing_task_description: Optional[str]
    final_draft: Optional[str]
    supervisor_response: Optional[str] 

# --- 2. エージェント（ノード）定義 ---
def ____(state: SupervisorAgentState) -> dict:
    print(f"
[スーパーバイザーエージェント]")
    user_req = state.get("user_request") or (state["messages"][-1].content if state.get("messages") else "") # 解答例より修正
    print(f"  ユーザーリクエスト: {user_req}")
    
    next_act: Optional[Literal["DelegateToResearcher", "DelegateToWriter", "RespondToUser", "FINISH"]] = None # 解答例よりOptional
    research_desc, write_desc, sup_resp = None, None, None

    if LLM_PROVIDER != "fake" and llm:
        system_prompt = (
            "あなたはタスクを分析し、リサーチャー、ライター、または自分自身（ユーザーへの応答）のいずれに "
            "処理を割り振るかを決定するスーパーバイザーです。"
            "可能なアクションは、「DelegateToResearcher: [調査指示]」、「DelegateToWriter: [執筆指示]」、"
            "「RespondToUser: [ユーザーへの直接応答内容]」、「FINISH」のいずれかの形式で答えてください。"
            "指示や応答内容には必ず具体的な内容を入れてください。"
            "例1: ユーザーが「明日の東京の天気を調べて」と依頼したら、「DelegateToResearcher: 明日の東京の天気調査」と応答します。"
            "例2: ユーザーが「AI倫理に関する記事を書いて」と依頼したら、「DelegateToWriter: AI倫理に関する記事執筆」と応答します。"
            "例3: ユーザーが「こんにちは」と挨拶したら、「RespondToUser: こんにちは！ご用件は何でしょう？」と応答します。"
        )
        response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=user_req)])
        decision_text = response.content.strip()
        print(f"  LLMによる判断: {decision_text}")
        if decision_text.startswith("DelegateToResearcher:"):
            next_act = "DelegateToResearcher"
            research_desc = decision_text.replace("DelegateToResearcher:", "").strip() or user_req # 解答例より
        elif decision_text.startswith("DelegateToWriter:"):
            next_act = "DelegateToWriter"
            write_desc = decision_text.replace("DelegateToWriter:", "").strip() or user_req # 解答例より
        elif decision_text.startswith("RespondToUser:"):
            next_act = "RespondToUser"
            sup_resp = decision_text.replace("RespondToUser:", "").strip() or "ごめんなさい、よくわかりませんでした。" # 解答例より
        elif decision_text == "FINISH":
            next_act = "FINISH"
        else: 
            print("  WARN: LLMが期待した形式で判断を返しませんでした。フォールバックロジックを使用します。") # 解答例より
            if any(kw in user_req.lower() for kw in ["調べ", "調査", "research", "find out"]): # 解答例より
                next_act = "DelegateToResearcher"; research_desc = user_req
            elif any(kw in user_req.lower() for kw in ["書い", "記事", "作成", "write", "article"]): # 解答例より
                next_act = "DelegateToWriter"; write_desc = user_req
            else:
                next_act = "RespondToUser"; sup_resp = "ご要望を理解できませんでした。具体的な指示をいただけますか？" # 解答例より
    else: 
        if any(kw in user_req.lower() for kw in ["調べ", "調査", "research", "find out"]): # 解答例より
            next_act = "DelegateToResearcher"; research_desc = user_req
        elif any(kw in user_req.lower() for kw in ["書い", "記事", "作成", "write", "article"]): # 解答例より
            next_act = "DelegateToWriter"; write_desc = user_req
        else:
            next_act = "RespondToUser"; sup_resp = "FakeSupervisor: ご要望を理解できませんでした。"
            
    print(f"  決定された次のアクション: {next_act}")
    return {
        "messages": [AIMessage(content=f"スーパーバイザー判断: {next_act}. 指示: {research_desc or write_desc or sup_resp}", name="Supervisor")], # 解答例より
        "next_action": next_act,
        "research_task_description": research_desc,
        "writing_task_description": write_desc,
        "supervisor_response": sup_resp
    }

def ____(state: SupervisorAgentState) -> dict:
    task = state.get("research_task_description", "指定なし")
    print(f"
[簡易リサーチャーノード] タスク: {task}")
    result = f"「{task}」に関するダミー調査結果です。非常に詳細な情報が得られました。"
    if search_tool and task != "指定なし" and TAVILY_API_KEY and search_tool.name != "dummy_search_tool": # 解答例より条件修正
        try: result = search_tool.invoke(task)
        except Exception as e: print(f"  リサーチャーツールエラー: {e}") # 解答例より
    elif search_tool and search_tool.name == "dummy_search_tool": result = search_tool.invoke(task) # 解答例より
    print(f"  調査結果: {str(result)[:100]}...") # 解答例より str() で囲む
    return {"messages": [AIMessage(content=str(result), name="Researcher") ], "research_result": str(result), "next_action": "FINISH"} 

def ____(state: SupervisorAgentState) -> dict:
    task_desc = state.get("writing_task_description", "トピック指定なし")
    research_data = state.get("research_result") # 解答例より
    input_for_writing = f"執筆指示: {task_desc}\n" # 解答例より
    if research_data: input_for_writing += f"利用可能な調査結果: {research_data[:150]}...\n" # 解答例より
    print(f"
[簡易ライターノード] {input_for_writing}") # 解答例より
    draft = f"「{task_desc}」についての素晴らしい記事が完成しました。内容は次の通り..."
    if LLM_PROVIDER != "fake" and llm: # 解答例より
        response = llm.invoke(f"以下の指示と情報に基づいて記事を作成してください。\n{input_for_writing}") # 解答例より
        draft = response.content # 解答例より
    print(f"  作成ドラフト: {draft[:100]}...")
    return {"messages": [AIMessage(content=draft, name="Writer")], "final_draft": draft, "next_action": "FINISH"}

def user_responder_node(state: SupervisorAgentState) -> dict:
    response = state.get("supervisor_response", "エラーが発生したか、応答がありませんでした。")
    print(f"
[ユーザー応答ノード] スーパーバイザーからの応答: {response}")
    return {"messages": [AIMessage(content=response, name="SupervisorDirectResponse")], "next_action": "FINISH"}

# --- 3. ルーター関数の定義 ---
def ____(state: SupervisorAgentState) -> str:
    decision = state.get("next_action")
    print(f"  -> ルーター(SupervisorDecision): 次のアクションは「{decision}」です。")
    if decision == "DelegateToResearcher": return "researcher"
    if decision == "DelegateToWriter": return "writer"
    if decision == "RespondToUser": return "user_responder"
    return END # 終了 (解答例より)

# --- 4. グラフの構築 ---
workflow_q2_ch4 = StateGraph(SupervisorAgentState)
workflow_q2_ch4.____("supervisor", supervisor_agent)
workflow_q2_ch4.add_node("researcher", simple_researcher_node)
workflow_q2_ch4.add_node("writer", simple_writer_node)
workflow_q2_ch4.add_node("user_responder", user_responder_node)

workflow_q2_ch4.____("supervisor")

workflow_q2_ch4.____(
    "supervisor", route_by_supervisor_decision,
    {
        "researcher": "researcher", "writer": "writer", 
        "user_responder": "user_responder", END: END
    }
)
workflow_q2_ch4.add_conditional_edges("researcher", route_by_supervisor_decision, {END: END}) 
workflow_q2_ch4.add_conditional_edges("writer", route_by_supervisor_decision, {END: END})
workflow_q2_ch4.add_conditional_edges("user_responder", route_by_supervisor_decision, {END: END})

graph_q2_ch4 = workflow_q2_ch4.compile()

In [None]:
# 解答欄002 - グラフ可視化
from IPython.display import Image, display # 解答例より

try:
    display(Image(graph_q2_ch4.____().____()))
except Exception as e:
    print(f"グラフ描画に失敗: {e}")

In [None]:
# 解答欄002 - グラフ実行
test_requests_q2 = [
    "LangGraphの分散型エージェントアーキテクチャについて調査し、その利点をまとめてください。", # 解答例より
    "AIが創造性を発揮する事例について、感動的なブログ記事を執筆してください。", # 解答例より
    "今日の天気は良いですね！" # 解答例より
]

for i, req in enumerate(test_requests_q2):
    print(f"
--- スーパーバイザーテスト {i+1} (リクエスト: {req}) ---")
    initial_state_q2_ch4 = {
        "messages": [____(content=req)], "____": req, # 解答例より
        "____": None, "____": None, "research_result": None,
        "____": None, "final_draft": None, "____": None
    }
    thread_q2 = {"____": {"thread_id": f"supervisor-test-{i}-{uuid4()[:4]}"}}}
    final_run_state_q2 = None # 解答例より
    for event in graph_q2_ch4.____(initial_state_q2_ch4, config=thread_q2, ____=5): # 解答例より config 追加, recursion_limit は元の解答から
        print(f"Event: {event}")
        if END in event: final_run_state_q2 = event[END] # 解答例より
        print("----");
    
    if not final_run_state_q2: final_run_state_q2 = graph_q2_ch4.get_state(thread_q2).values # 解答例より

    print("
  最終結果:")
    if final_run_state_q2.get("research_result"): # 解答例より
        print(f"    調査結果: {final_run_state_q2['research_result'][:100]}...")
    if final_run_state_q2.get("final_draft"): # 解答例より
        print(f"    最終ドラフト: {final_run_state_q2['final_draft'][:100]}...")
    if final_run_state_q2.get("supervisor_response") and not final_run_state_q2.get("research_result") and not final_run_state_q2.get("final_draft"): # 解答例より
        print(f"    スーパーバイザーからの直接応答: {final_run_state_q2['supervisor_response']}")

<details><summary>解答002</summary>

``````python
from typing import TypedDict, Annotated, List, Optional, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_community.tools.tavily_search import TavilySearchResults # search_tool用
from langchain_core.tools import tool # dummy_search_tool用
from IPython.display import Image, display
from uuid import uuid4
import re

# --- 1. 状態定義 ---
class SupervisorAgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    user_request: str
    next_action: Optional[Literal["DelegateToResearcher", "DelegateToWriter", "RespondToUser", "FINISH"]]
    research_task_description: Optional[str]
    research_result: Optional[str]
    writing_task_description: Optional[str]
    final_draft: Optional[str]
    supervisor_response: Optional[str] 

# --- 2. エージェント（ノード）定義 ---
def supervisor_agent(state: SupervisorAgentState) -> dict:
    print(f"
[スーパーバイザーエージェント]")
    user_req = state.get("user_request") or (state["messages"][-1].content if state.get("messages") else "")
    print(f"  ユーザーリクエスト: {user_req}")
    
    next_act: Optional[Literal["DelegateToResearcher", "DelegateToWriter", "RespondToUser", "FINISH"]] = None
    research_desc, write_desc, sup_resp = None, None, None

    if LLM_PROVIDER != "fake" and llm:
        system_prompt = (
            "あなたはタスクを分析し、リサーチャー、ライター、または自分自身（ユーザーへの応答）のいずれに "
            "処理を割り振るかを決定するスーパーバイザーです。"
            "可能なアクションは、「DelegateToResearcher: [調査指示]」、「DelegateToWriter: [執筆指示]」、"
            "「RespondToUser: [ユーザーへの直接応答内容]」、「FINISH」のいずれかの形式で答えてください。"
            "指示や応答内容には必ず具体的な内容を入れてください。"
            "例1: ユーザーが「明日の東京の天気を調べて」と依頼したら、「DelegateToResearcher: 明日の東京の天気調査」と応答します。"
            "例2: ユーザーが「AI倫理に関する記事を書いて」と依頼したら、「DelegateToWriter: AI倫理に関する記事執筆」と応答します。"
            "例3: ユーザーが「こんにちは」と挨拶したら、「RespondToUser: こんにちは！ご用件は何でしょう？」と応答します。"
        )
        response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=user_req)])
        decision_text = response.content.strip()
        print(f"  LLMによる判断: {decision_text}")
        if decision_text.startswith("DelegateToResearcher:"):
            next_act = "DelegateToResearcher"
            research_desc = decision_text.replace("DelegateToResearcher:", "").strip() or user_req # 指示が空なら元のリクエスト
        elif decision_text.startswith("DelegateToWriter:"):
            next_act = "DelegateToWriter"
            write_desc = decision_text.replace("DelegateToWriter:", "").strip() or user_req
        elif decision_text.startswith("RespondToUser:"):
            next_act = "RespondToUser"
            sup_resp = decision_text.replace("RespondToUser:", "").strip() or "ごめんなさい、よくわかりませんでした。"
        elif decision_text == "FINISH":
            next_act = "FINISH"
        else: 
            print("  WARN: LLMが期待した形式で判断を返しませんでした。フォールバックロジックを使用します。")
            # フォールバックロジック (FakeLLMと同様)
            if any(kw in user_req.lower() for kw in ["調べ", "調査", "research", "find out"]):
                next_act = "DelegateToResearcher"; research_desc = user_req
            elif any(kw in user_req.lower() for kw in ["書い", "記事", "作成", "write", "article"]):
                next_act = "DelegateToWriter"; write_desc = user_req
            else:
                next_act = "RespondToUser"; sup_resp = "ご要望を理解できませんでした。具体的な指示をいただけますか？"
    else: 
        if any(kw in user_req.lower() for kw in ["調べ", "調査", "research", "find out"]):
            next_act = "DelegateToResearcher"; research_desc = user_req
        elif any(kw in user_req.lower() for kw in ["書い", "記事", "作成", "write", "article"]):
            next_act = "DelegateToWriter"; write_desc = user_req
        else:
            next_act = "RespondToUser"; sup_resp = "FakeSupervisor: ご要望を理解できませんでした。"
            
    print(f"  決定された次のアクション: {next_act}")
    return {
        "messages": [AIMessage(content=f"スーパーバイザー判断: {next_act}. 指示: {research_desc or write_desc or sup_resp}", name="Supervisor")],
        "next_action": next_act,
        "research_task_description": research_desc,
        "writing_task_description": write_desc,
        "supervisor_response": sup_resp,
        # 他のエージェントの結果フィールドはクリアしない（後続のスーパーバイザー判断で使う可能性があるため）
    }

def simple_researcher_node(state: SupervisorAgentState) -> dict:
    task = state.get("research_task_description", "指定なし")
    print(f"
[簡易リサーチャーノード] タスク: {task}")
    result = f"「{task}」に関するダミー調査結果です。非常に詳細な情報が得られました。"
    if search_tool and task != "指定なし" and TAVILY_API_KEY and search_tool.name != "dummy_search_tool":
        try: result = search_tool.invoke(task)
        except Exception as e: print(f"  リサーチャーツールエラー: {e}")
    elif search_tool and search_tool.name == "dummy_search_tool": result = search_tool.invoke(task)
    print(f"  調査結果: {str(result)[:100]}...")
    return {"messages": [AIMessage(content=str(result), name="Researcher") ], "research_result": str(result), "next_action": "FINISH"} 

def simple_writer_node(state: SupervisorAgentState) -> dict:
    task_desc = state.get("writing_task_description", "トピック指定なし")
    research_data = state.get("research_result") # リサーチャーの結果も使えるように
    input_for_writing = f"執筆指示: {task_desc}\n" 
    if research_data: input_for_writing += f"利用可能な調査結果: {research_data[:150]}...\n"
    print(f"
[簡易ライターノード] {input_for_writing}")
    draft = f"「{task_desc}」についての素晴らしい記事が完成しました。内容は次の通り..."
    if LLM_PROVIDER != "fake" and llm:
        response = llm.invoke(f"以下の指示と情報に基づいて記事を作成してください。\n{input_for_writing}")
        draft = response.content
    print(f"  作成ドラフト: {draft[:100]}...")
    return {"messages": [AIMessage(content=draft, name="Writer")], "final_draft": draft, "next_action": "FINISH"}

def user_responder_node(state: SupervisorAgentState) -> dict:
    response = state.get("supervisor_response", "エラーが発生したか、応答がありませんでした。")
    print(f"
[ユーザー応答ノード] スーパーバイザーからの応答: {response}")
    return {"messages": [AIMessage(content=response, name="SupervisorDirectResponse")], "next_action": "FINISH"}

# --- 3. ルーター関数の定義 ---
def route_by_supervisor_decision(state: SupervisorAgentState) -> str:
    decision = state.get("next_action")
    print(f"  -> ルーター(SupervisorDecision): 次のアクションは「{decision}」です。")
    if decision == "DelegateToResearcher": return "researcher"
    if decision == "DelegateToWriter": return "writer"
    if decision == "RespondToUser": return "user_responder"
    return END 

# --- 4. グラフの構築 ---
workflow_q2_ch4 = StateGraph(SupervisorAgentState)
workflow_q2_ch4.add_node("supervisor", supervisor_agent)
workflow_q2_ch4.add_node("researcher", simple_researcher_node)
workflow_q2_ch4.add_node("writer", simple_writer_node)
workflow_q2_ch4.add_node("user_responder", user_responder_node)

workflow_q2_ch4.set_entry_point("supervisor")

workflow_q2_ch4.add_conditional_edges(
    "supervisor", route_by_supervisor_decision,
    {
        "researcher": "researcher", "writer": "writer", 
        "user_responder": "user_responder", END: END
    }
)
workflow_q2_ch4.add_conditional_edges("researcher", route_by_supervisor_decision, {END: END}) # 実行後、next_action='FINISH'で終了
workflow_q2_ch4.add_conditional_edges("writer", route_by_supervisor_decision, {END: END})
workflow_q2_ch4.add_conditional_edges("user_responder", route_by_supervisor_decision, {END: END})

graph_q2_ch4 = workflow_q2_ch4.compile()
try: display(Image(graph_q2_ch4.get_graph().draw_png()))
except Exception as e: print(f"グラフ描画失敗: {e}")

# --- 5. グラフの実行 ---
test_requests_q2 = [
    "LangGraphの分散型エージェントアーキテクチャについて調査し、その利点をまとめてください。",
    "AIが創造性を発揮する事例について、感動的なブログ記事を執筆してください。",
    "今日の天気は良いですね！"
]

for i, req in enumerate(test_requests_q2):
    print(f"
--- スーパーバイザーテスト {i+1} (リクエスト: {req}) ---")
    initial_state_q2_ch4 = {
        "messages": [HumanMessage(content=req)], "user_request": req,
        "next_action": None, "research_task_description": None, "research_result": None,
        "writing_task_description": None, "final_draft": None, "supervisor_response": None
    }
    thread_q2 = {"configurable": {"thread_id": f"supervisor-test-{i}-{uuid4()[:4]}"}}}
    final_run_state_q2 = None
    for event in graph_q2_ch4.stream(initial_state_q2_ch4, config=thread_q2, recursion_limit=5):
        print(f"Event: {event}")
        # streamの各要素は {'node_name': state_update_dict } という形式
        # 最後のイベントがENDノードからのものであれば、そのvalueが最終状態
        if END in event:
            final_run_state_q2 = event[END]
        print("----");
    
    if not final_run_state_q2: # ENDから取得できなかった場合 (例: recursion limit)
        final_run_state_q2 = graph_q2_ch4.get_state(thread_q2).values

    print("
  最終結果:")
    if final_run_state_q2.get("research_result"):
        print(f"    調査結果: {final_run_state_q2['research_result'][:100]}...")
    if final_run_state_q2.get("final_draft"):
        print(f"    最終ドラフト: {final_run_state_q2['final_draft'][:100]}...")
    if final_run_state_q2.get("supervisor_response") and not final_run_state_q2.get("research_result") and not final_run_state_q2.get("final_draft"):
        print(f"    スーパーバイザーからの直接応答: {final_run_state_q2['supervisor_response']}")
``````
</details>

<details><summary>解説002</summary>

#### この問題のポイント

*   **スーパーバイザーの役割:** `supervisor_agent` ノードが、ユーザーの要求 (`user_request`) を解釈し、次にどのアクションを取るべきか（`next_action`）、そしてそのための具体的な指示（`research_task_description` や `writing_task_description`）を決定します。この判断は、実際のシステムではLLMの推論能力に大きく依存します。解答例では、LLMが期待する形式で判断を返すようにプロンプトで指示し、もし期待通りでなければフォールバックとしてキーワードベースの単純なロジックで割り振りを試みています。
*   **状態によるタスク情報の伝達:** スーパーバイザーが決定したタスク指示は、状態オブジェクトの対応するキー（例: `research_task_description`）に格納され、委任先の専門エージェントノード（`simple_researcher_node` など）はその情報を読み取って処理を実行します。
*   **ルーターによる処理分岐:** `route_by_supervisor_decision` ルーターが、スーパーバイザーの決定（`state['next_action']`）に基づいて、次に実行する専門エージェントのノード、またはユーザーへの直接応答ノード、あるいは終了（`END`）へと処理を振り分けます。
*   **専門エージェントの処理:** `simple_researcher_node` や `simple_writer_node` は、割り当てられたタスクを実行し、その結果を状態に書き戻します。この問題では、各専門エージェントは一度処理を実行したら `next_action` を `"FINISH"` に設定し、スーパーバイザーには戻らずに処理を終了する単純な流れになっています。
*   **拡張性:** この基本構造は、より多くの専門エージェントを追加したり、専門エージェントの処理後に再度スーパーバイザーに結果を報告させて次の指示を仰ぐような、より複雑な協調ワークフローへと拡張する際の基礎となります。

---</details>

### ■ 問題003: スーパーバイザーによる逐次連携ワークフロー

問題002のスーパーバイザーモデルを拡張し、複数の専門エージェントがスーパーバイザーの指示のもとで逐次的に連携するワークフローを構築します。例えば、「トピックについて調査（リサーチャー）し、その結果を元に記事を作成（ライター）し、最後に記事をレビュー（レビュアー）する」といった流れです。スーパーバイザーは各ステップの完了を確認し、次のエージェントに必要な情報を渡しながらタスクを進めます。

*   **学習内容:**
    *   スーパーバイザーがタスクの進行状況を管理し、複数のエージェントに順番に処理を委任していく方法。
    *   状態（State）に現在のタスクフェーズ（例: "RESEARCHING", "WRITING", "REVIEWING"）や、各エージェントの成果物を保持し、それらを次のエージェントに引き渡す方法。
    *   スーパーバイザーが全工程の完了を判断し、最終成果物を生成（または選択）して終了するロジック。

In [None]:
# 解答欄003 - グラフ構築
from typing import TypedDict, List, Optional, Annotated, Literal # Literal をインポート (解答例より)
from langgraph.graph import ____, END # (既にインポート済みだが明示)
from langgraph.graph.message import add_messages # (既にインポート済みだが明示)
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage # (既にインポート済みだが明示)
from uuid import uuid4 # 解答例より
import re # 解答例より

# --- 1. 状態定義 ---
class ____(____):
    messages: Annotated[List[BaseMessage], add_messages]
    user_request: str
    ____: Optional[Literal["PLANNING", "RESEARCHING", "WRITING", "REVIEWING", "DONE", "ERROR"]] # 解答例よりOptional追加
    ____: Optional[str]
    research_findings: Optional[str]
    ____: Optional[str]
    review_comments: Optional[str]
    final_product: Optional[str]
    error_message: Optional[str]

# --- 2. エージェント（ノード）定義 ---
def ____(state: SequentialWorkflowState) -> dict:
    current_phase = state.get("current_phase") # 解答例より
    user_req = state.get("user_request", "") # 解答例より
    print(f"
[スーパーバイザー・プランナー] 現在フェーズ: {current_phase}, リクエスト: {user_req[:50]}...") # 解答例より
    
    if not current_phase: # 初回実行 (解答例より)
        topic = user_req # 解答例より
        match = re.search(r"「([^」]+)」について", user_req) or re.search(r"『([^』]+)』について", user_req) # 解答例より
        if match: topic = match.group(1) # 解答例より
        else: topic = user_req.replace("に関する記事を作成","").replace("について記事を書いて","").strip("。 ")[:30] # 解答例より
        
        print(f"  -> 計画: リサーチフェーズ開始。トピック: {topic}")
        return {"current_phase": "RESEARCHING", "research_topic": topic, "messages": [AIMessage(content=f"計画: 「{topic}」の調査を開始します。", name="Supervisor")]}
    
    next_phase: Optional[Literal["PLANNING", "RESEARCHING", "WRITING", "REVIEWING", "DONE", "ERROR"]] = None # 解答例より
    supervisor_log = ""
    update_dict = {} # 解答例より

    if current_phase == "RESEARCHING": # 解答例より
        if state.get("research_findings"):
            next_phase = "WRITING"
            supervisor_log = "調査完了。執筆フェーズへ。"
        elif state.get("error_message"): next_phase = "ERROR"
    elif current_phase == "WRITING": # 解答例より
        if state.get("article_draft"):
            next_phase = "REVIEWING"
            supervisor_log = "執筆完了。レビューフェーズへ。"
        elif state.get("error_message"): next_phase = "ERROR"
    elif current_phase == "REVIEWING": # 解答例より
        if state.get("review_comments"):
            next_phase = "DONE"
            supervisor_log = f"レビュー完了。コメント: 「{state['review_comments']}」。処理を終了します。"
            update_dict["final_product"] = state.get("article_draft") 
        elif state.get("error_message"): next_phase = "ERROR"
    
    if state.get("error_message") and not next_phase: # 解答例より
        next_phase = "ERROR"
        supervisor_log = f"エラー発生のため処理を中断します: {state['error_message']}"
        
    if next_phase: # 解答例より
        print(f"  -> スーパーバイザー判断: 次のフェーズ「{next_phase}」へ。ログ: {supervisor_log}")
        update_dict["current_phase"] = next_phase
        update_dict["messages"] = [AIMessage(content=supervisor_log if supervisor_log else f"次のフェーズ: {next_phase}", name="Supervisor")]
        return update_dict
    
    print("  -> スーパーバイザー判断: 現状維持または不明な状態。") # 解答例より
    return {} # 変更なし (解答例より)

def ____(state: SequentialWorkflowState) -> dict:
    topic = state["research_topic"]
    print(f"
[リサーチノード] トピック: {topic}")
    findings = f"「{topic}」に関するダミー調査結果。ポイントX, Y, Z。" # 解答例より
    print(f"  -> 調査結果: {findings[:100]}...")
    return {"research_findings": findings, "messages": [AIMessage(content=findings, name="Researcher")]}

def ____(state: SequentialWorkflowState) -> dict:
    findings = state["research_findings"]
    topic = state["research_topic"]
    print(f"
[ライティングノード] 調査結果に基づいて「{topic}」の記事を作成します。")
    draft = f"タイトル: {topic}の全貌\n\n{findings}\n\nこの記事は、提供された情報に基づきAIによって生成されました。" # 解答例より
    print(f"  -> 作成ドラフト: {draft[:100]}...")
    return {"article_draft": draft, "messages": [AIMessage(content=draft, name="Writer")]}

def ____(state: SequentialWorkflowState) -> dict:
    draft = state["article_draft"]
    print(f"
[レビューノード] ドラフトをレビューします: {draft[:50]}...")
    comments = "素晴らしい内容です。特に導入部分が読者の興味を引きます。改善点は特に見当たりません。" # 解答例より
    print(f"  -> レビューコメント: {comments}")
    return {"review_comments": comments, "messages": [AIMessage(content=comments, name="Reviewer")]}

def error_node(state: SequentialWorkflowState) -> dict:
    err_msg = state.get('error_message', '不明なエラーが発生しました。処理を終了します。') # 解答例より
    print(f"
[エラー処理ノード] エラーメッセージ: {err_msg}")
    return {"messages": [AIMessage(content=f"エラー発生: {err_msg}", name="Error Handler")]} # 解答例より

# --- 3. ルーター関数の定義 ---
def ____(state: SequentialWorkflowState) -> str:
    phase = state.get("current_phase")
    print(f"  -> ルーター(Phase): 現在のフェーズは「{phase}」です。")
    if phase == "RESEARCHING": return "researcher"
    if phase == "WRITING": return "writer"
    if phase == "REVIEWING": return "reviewer"
    if phase == "DONE": return END
    if phase == "ERROR": return "error_handler"
    print(f"  WARN: 不明なフェーズ「{phase}」です。スーパーバイザーに戻します。") # 解答例より
    return "supervisor_planner" # 不明なフェーズはスーパーバイザーに戻す (解答例より)

# --- 4. グラフの構築 ---
workflow_q3_ch4 = StateGraph(SequentialWorkflowState)
workflow_q3_ch4.add_node("supervisor_planner", supervisor_planner_node)
workflow_q3_ch4.add_node("researcher", research_node)
workflow_q3_ch4.add_node("writer", writing_node)
workflow_q3_ch4.add_node("reviewer", review_node)
workflow_q3_ch4.add_node("error_handler", error_node)

workflow_q3_ch4.set_entry_point("supervisor_planner") 

workflow_q3_ch4.add_conditional_edges(
    "supervisor_planner", route_by_phase,
    {
        "researcher": "researcher", "writer": "writer", "reviewer": "reviewer",
        "supervisor_planner": "supervisor_planner", # ルーターが不明なフェーズと判断した場合など (解答例より)
        END: END, "error_handler": "error_handler"
    }
)

workflow_q3_ch4.____("researcher", "supervisor_planner") # 解答例より
workflow_q3_ch4.add_edge("writer", "supervisor_planner") # 解答例より
workflow_q3_ch4.add_edge("reviewer", "supervisor_planner") # 解答例より

workflow_q3_ch4.add_edge("error_handler", END) 

graph_q3_ch4 = workflow_q3_ch4.compile()

In [None]:
# 解答欄003 - グラフ可視化
from IPython.display import Image, display # 解答例より

try:
    display(Image(graph_q3_ch4.____().____()))
except Exception as e:
    print(f"グラフの可視化に失敗しました。Graphvizが正しくインストールされているか確認してください。エラー: {e}")

In [None]:
# 解答欄003 - グラフ実行
user_req_q3 = "LangGraphの条件付きエッジ機能について、その利点と簡単な使用例を含む技術ブログ記事を作成してください。"
initial_state_q3_ch4 = {
    "messages": [____(content=user_req_q3)], "____": user_req_q3, # 解答例より
    "____": None, "research_topic": None, "____": None, 
    "____": None, "____": None, "____": None, "error_message": None
}
thread_q3 = {"____": {"thread_id": f"seq-workflow-{uuid4()[:4]}"}}}

print(f"--- 逐次連携ワークフローテスト (リクエスト: {user_req_q3}) ---")
final_q3_state_val = None
for event in graph_q3_ch4.____(initial_state_q3_ch4, config=thread_q3, recursion_limit=10): # 解答例より recursion_limit 変更
    print(f"Event: {event}")
    if END in event: final_q3_state_val = event[END]
    print("----");

if not final_q3_state_val: final_q3_state_val = graph_q3_ch4.____(thread_q3).values

print("
  最終成果物:")
if final_q3_state_val.get("final_product"):
    print(f"    {final_q3_state_val['final_product']}")
elif final_q3_state_val.get("error_message"):
    print(f"    エラー: {final_q3_state_val['error_message']}")
else:
    print(f"    最終成果物がありませんでした。最終フェーズ: {final_q3_state_val.get('current_phase')}") # 解答例より

<details><summary>解答003</summary>

``````python
from typing import TypedDict, List, Optional, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from IPython.display import Image, display
from uuid import uuid4
import re

# --- 1. 状態定義 ---
class SequentialWorkflowState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    user_request: str
    current_phase: Optional[Literal["PLANNING", "RESEARCHING", "WRITING", "REVIEWING", "DONE", "ERROR"]]
    research_topic: Optional[str]
    research_findings: Optional[str]
    article_draft: Optional[str]
    review_comments: Optional[str]
    final_product: Optional[str]
    error_message: Optional[str]

# --- 2. エージェント（ノード）定義 ---
def supervisor_planner_node(state: SequentialWorkflowState) -> dict:
    current_phase = state.get("current_phase")
    user_req = state.get("user_request", "")
    print(f"
[スーパーバイザー・プランナー] 現在フェーズ: {current_phase}, リクエスト: {user_req[:50]}...")
    
    if not current_phase: # 初回実行
        # 簡単なリクエスト解析（実際はLLMでトピック抽出）
        topic = user_req
        match = re.search(r"「([^」]+)」について", user_req) or re.search(r"『([^』]+)』について", user_req)
        if match: topic = match.group(1)
        else: topic = user_req.replace("に関する記事を作成","").replace("について記事を書いて","").strip("。 ")[:30] # 短縮
        
        print(f"  -> 計画: リサーチフェーズ開始。トピック: {topic}")
        return {"current_phase": "RESEARCHING", "research_topic": topic, "messages": [AIMessage(content=f"計画: 「{topic}」の調査を開始します。", name="Supervisor")]}
    
    next_phase: Optional[Literal["PLANNING", "RESEARCHING", "WRITING", "REVIEWING", "DONE", "ERROR"]] = None
    supervisor_log = ""
    update_dict = {}

    if current_phase == "RESEARCHING":
        if state.get("research_findings"):
            next_phase = "WRITING"
            supervisor_log = "調査完了。執筆フェーズへ。"
        elif state.get("error_message"): next_phase = "ERROR"
    elif current_phase == "WRITING":
        if state.get("article_draft"):
            next_phase = "REVIEWING"
            supervisor_log = "執筆完了。レビューフェーズへ。"
        elif state.get("error_message"): next_phase = "ERROR"
    elif current_phase == "REVIEWING":
        if state.get("review_comments"):
            next_phase = "DONE"
            supervisor_log = f"レビュー完了。コメント: 「{state['review_comments']}」。処理を終了します。"
            update_dict["final_product"] = state.get("article_draft") # レビュー後のドラフトを最終成果物とする
        elif state.get("error_message"): next_phase = "ERROR"
    
    if state.get("error_message") and not next_phase:
        next_phase = "ERROR"
        supervisor_log = f"エラー発生のため処理を中断します: {state['error_message']}"
        
    if next_phase:
        print(f"  -> スーパーバイザー判断: 次のフェーズ「{next_phase}」へ。ログ: {supervisor_log}")
        update_dict["current_phase"] = next_phase
        update_dict["messages"] = [AIMessage(content=supervisor_log if supervisor_log else f"次のフェーズ: {next_phase}", name="Supervisor")]
        return update_dict
    
    print("  -> スーパーバイザー判断: 現状維持または不明な状態。")
    return {} # 変更なし

def research_node(state: SequentialWorkflowState) -> dict:
    topic = state["research_topic"]
    print(f"
[リサーチノード] トピック: {topic}")
    findings = f"「{topic}」に関するダミー調査結果。ポイントX, Y, Z。"
    # if search_tool and TAVILY_API_KEY and search_tool.name != "dummy_search_tool":
    #     try: findings = search_tool.invoke(topic)
    #     except Exception as e: return {"error_message": str(e), "current_phase": state["current_phase"]}
    print(f"  -> 調査結果: {findings[:100]}...")
    return {"research_findings": findings, "messages": [AIMessage(content=findings, name="Researcher")]}

def writing_node(state: SequentialWorkflowState) -> dict:
    findings = state["research_findings"]
    topic = state["research_topic"]
    print(f"
[ライティングノード] 調査結果に基づいて「{topic}」の記事を作成します。")
    draft = f"タイトル: {topic}の全貌\n\n{findings}\n\nこの記事は、提供された情報に基づきAIによって生成されました。"
    print(f"  -> 作成ドラフト: {draft[:100]}...")
    return {"article_draft": draft, "messages": [AIMessage(content=draft, name="Writer")]}

def review_node(state: SequentialWorkflowState) -> dict:
    draft = state["article_draft"]
    print(f"
[レビューノード] ドラフトをレビューします: {draft[:50]}...")
    comments = "素晴らしい内容です。特に導入部分が読者の興味を引きます。改善点は特に見当たりません。"
    print(f"  -> レビューコメント: {comments}")
    return {"review_comments": comments, "messages": [AIMessage(content=comments, name="Reviewer")]}

def error_node(state: SequentialWorkflowState) -> dict:
    err_msg = state.get('error_message', '不明なエラーが発生しました。処理を終了します。')
    print(f"
[エラー処理ノード] エラーメッセージ: {err_msg}")
    # final_product にエラー情報を入れてもよい
    return {"messages": [AIMessage(content=f"エラー発生: {err_msg}", name="Error Handler")]}

# --- 3. ルーター関数の定義 ---
def route_by_phase(state: SequentialWorkflowState) -> str:
    phase = state.get("current_phase")
    print(f"  -> ルーター(Phase): 現在のフェーズは「{phase}」です。")
    if phase == "RESEARCHING": return "researcher"
    if phase == "WRITING": return "writer"
    if phase == "REVIEWING": return "reviewer"
    if phase == "DONE": return END
    if phase == "ERROR": return "error_handler"
    # 初期状態や予期せぬフェーズの場合はスーパーバイザーに戻すか、エラーにする
    # ここでは、supervisor_plannerが最初のフェーズを設定するので、基本的には上記に分岐するはず
    print(f"  WARN: 不明なフェーズ「{phase}」です。スーパーバイザーに戻します。")
    return "supervisor_planner" 

# --- 4. グラフの構築 ---
workflow_q3_ch4 = StateGraph(SequentialWorkflowState)
workflow_q3_ch4.add_node("supervisor_planner", supervisor_planner_node)
workflow_q3_ch4.add_node("researcher", research_node)
workflow_q3_ch4.add_node("writer", writing_node)
workflow_q3_ch4.add_node("reviewer", review_node)
workflow_q3_ch4.add_node("error_handler", error_node)

workflow_q3_ch4.set_entry_point("supervisor_planner")

workflow_q3_ch4.add_conditional_edges(
    "supervisor_planner", route_by_phase,
    {
        "researcher": "researcher", "writer": "writer", "reviewer": "reviewer",
        "supervisor_planner": "supervisor_planner", # ルーターが不明なフェーズと判断した場合など
        END: END, "error_handler": "error_handler"
    }
)

workflow_q3_ch4.add_edge("researcher", "supervisor_planner")
workflow_q3_ch4.add_edge("writer", "supervisor_planner")
workflow_q3_ch4.add_edge("reviewer", "supervisor_planner")
workflow_q3_ch4.add_edge("error_handler", END)

graph_q3_ch4 = workflow_q3_ch4.compile()
try: display(Image(graph_q3_ch4.get_graph().draw_png()))
except Exception as e: print(f"グラフ描画失敗: {e}")

# --- 5. グラフの実行 ---
user_req_q3 = "LangGraphの条件付きエッジ機能について、その利点と簡単な使用例を含む技術ブログ記事を作成してください。"
initial_state_q3_ch4 = {
    "messages": [HumanMessage(content=user_req_q3)], "user_request": user_req_q3,
    "current_phase": None, "research_topic": None, "research_findings": None, 
    "article_draft": None, "review_comments": None, "final_product": None, "error_message": None
}
thread_q3 = {"configurable": {"thread_id": f"seq-workflow-{uuid4()[:4]}"}}}

print(f"--- 逐次連携ワークフローテスト (リクエスト: {user_req_q3}) ---")
final_q3_state_val = None
for event in graph_q3_ch4.stream(initial_state_q3_ch4, config=thread_q3, recursion_limit=10):
    print(f"Event: {event}")
    if END in event: final_q3_state_val = event[END]
    print("----");

if not final_q3_state_val: final_q3_state_val = graph_q3_ch4.get_state(thread_q3).values

print("
  最終成果物:")
if final_q3_state_val.get("final_product"):
    print(f"    {final_q3_state_val['final_product']}")
elif final_q3_state_val.get("error_message"):
    print(f"    エラー: {final_q3_state_val['error_message']}")
else:
    print(f"    最終成果物がありませんでした。最終フェーズ: {final_q3_state_val.get('current_phase')}")
``````
</details>

<details><summary>解説003</summary>

#### この問題のポイント

*   **スーパーバイザーによるフェーズ管理:** `supervisor_planner_node` がワークフロー全体の進行管理を行います。状態キー `current_phase` を見て、次にどの専門エージェントを呼び出すべきか（または処理を終了すべきか）を決定します。
    *   初回実行時は、`user_request` からトピックを（簡易的に）抽出し、`current_phase` を "RESEARCHING" に設定してリサーチャーを起動します。
    *   各専門エージェント（リサーチャー、ライター、レビュアー）の処理が完了すると、エッジは再び `supervisor_planner_node` に戻ります。スーパーバイザーは、その時点での状態（例: `research_findings` が存在するか）を確認し、次のフェーズ（例: "WRITING"）に進むよう `current_phase` を更新します。
*   **専門エージェントの役割:**
    *   `research_node`: `research_topic` に基づいて調査を行い、結果を `research_findings` に格納します。
    *   `writing_node`: `research_findings` を利用して記事ドラフトを `article_draft` に作成します。
    *   `review_node`: `article_draft` をレビューし、コメントを `review_comments` に格納します。
    *   各専門ノードは、自身の処理結果を状態に書き込んだ後、制御をスーパーバイザーに戻します。
*   **ルーター (`route_by_phase`):** スーパーバイザーが更新した `current_phase` の値に基づいて、実際に次に実行する専門エージェントのノード、またはエラー処理ノード、あるいは終了 (`END`) へと処理をルーティングします。
*   **逐次処理の実現:** 「専門エージェント処理 → スーパーバイザーによる次のフェーズ決定 → ルーターによる分岐」というサイクルを繰り返すことで、リサーチ、執筆、レビューという一連のタスクが順番に実行されます。
*   **エラーハンドリング:** 各専門エージェントの処理中にエラーが発生した場合（この解答例ではシミュレートしていませんが、発生しうる）、`error_message` に情報を記録し、スーパーバイザーがそれを検知して `error_handler_node` に処理を移すような拡張が考えられます（解答例では簡易的なエラーパスのみ）。
*   **最終成果物:** 全てのフェーズが正常に完了すると（この例ではレビュー完了後）、スーパーバイザーは `current_phase` を "DONE" に設定し、`final_product` に最終的な成果物（ここではレビュー後の記事ドラフト）を格納して終了します。

このパターンは、複数のステップからなる複雑なタスクを、各ステップの専門家（エージェント）に分担させ、スーパーバイザーが全体を統括する、という現実世界のプロジェクト進行に近い形で自動化する際に有効です。

---</details>

### ■ 問題004: 階層型エージェント - タスクの委任と報告

エージェントの組織構造を階層的にすることで、より複雑な問題解決に対応できます。この問題では、上位の「マネージャーエージェント」がタスクを受け取り、それをより具体的なサブタスクに分解して、下位の「ワーカーエージェント」に委任します。ワーカーエージェントはサブタスクを実行し、その結果をマネージャーに報告。マネージャーは全ワーカーの結果を統合して最終的な成果を出す、という階層的な協調作業をシミュレートします。

*   **学習内容:**
    *   異なる階層レベルのエージェント（マネージャー、ワーカー）を定義し、それぞれの役割（タスク分解、サブタスク実行、結果統合）を実装する方法。
    *   マネージャーがタスクを複数のサブタスクに分割し、それらを状態を通じてワーカーに渡す方法。
    *   ワーカーが並列または逐次でサブタスクを実行し、結果を状態に格納する方法（ファンアウト・ファンインの応用）。
    *   マネージャーがワーカーからの報告を集約し、最終的な応答を生成するプロセス。

In [None]:
# 解答欄004 - グラフ構築
from typing import TypedDict, List, Optional, Annotated, Dict, Any
from langgraph.graph import StateGraph, END # 解答例より (既にインポート済みだが明示)
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage # 解答例より (既にインポート済みだが明示)
from langchain_core.tools import tool # 解答例より (既にインポート済みだが明示)
from uuid import uuid4 # 解答例より (既にインポート済みだが明示)
import re # 解答例より (既にインポート済みだが明示)
import json # 解答例より (既にインポート済みだが明示)

# --- 1. 状態定義 ---
class ____(____):
    messages: Annotated[List[BaseMessage], add_messages]
    ____: str
    ____: Optional[List[Dict[str, Any]]] 
    ____: Optional[str]
    # current_worker_index: Optional[int] # マネージャーの実行調整ノード内で管理・更新されるので、状態としては必須ではない (解答例より)
    # error_message: Optional[str] # エラー処理を追加する場合 (解答例より)

# --- 2. ワーカーエージェントのシミュレーション関数 ---
def ____(worker_id: str, instruction: str, dependencies_results: Optional[Dict[str, str]] = None) -> str: # 解答例より
    print(f"    [ワーカー {worker_id}] 指示: 「{instruction}」を実行開始。")
    if dependencies_results:
        print(f"      依存結果: {dependencies_results}")
    
    result = f"ワーカー「{worker_id}」による「{instruction[:20]}...」の高品質な実行結果。"
    if "天気" in instruction.lower() and search_tool and search_tool.name != "dummy_search_tool" and TAVILY_API_KEY:
        try: result = search_tool.invoke(instruction)
        except Exception as e: result = f"天気検索エラー: {e}"
    elif "レポート作成" in instruction and dependencies_results:
        report_content = "レポート:\n"
        for task_id, dep_res in dependencies_results.items():
            report_content += f"  - {task_id}の結果: {str(dep_res)[:50]}...\n"
        result = report_content + "上記を統合しました。"
    
    print(f"    -> ワーカー {worker_id} の結果: {str(result)[:80]}...")
    return str(result)

# --- 3. マネージャーエージェント（ノード）定義 ---
def ____(state: HierarchicalAgentState) -> dict: # 解答例よりノード名変更
    print(f"
[マネージャー: タスク分解] メインタスク: {state['main_task_description']}")
    sub_tasks_list: List[Dict[str, Any]] = [] # 解答例より
    main_task_lower = state['main_task_description'].lower()
    
    if "天気" in main_task_lower and ("観光" in main_task_lower or "スポット" in main_task_lower) and "レポート" in main_task_lower: # 解答例の条件分岐
        location = "東京" 
        if "大阪" in main_task_lower: location = "大阪"
        sub_tasks_list.append({'id': 'task_weather', 'instruction': f'{location}の今日の天気調査', 'assigned_to': 'WeatherWorker', 'status': 'pending', 'result': None, 'dependencies': []}) # 解答例より dependencies 追加
        sub_tasks_list.append({'id': 'task_spots', 'instruction': f'{location}の主要観光スポット3箇所調査', 'assigned_to': 'TourismWorker', 'status': 'pending', 'result': None, 'dependencies': []}) # 解答例より dependencies 追加
        sub_tasks_list.append({'id': 'task_report', 'instruction': f'{location}の天気と観光スポットに関する統合レポート作成', 'assigned_to': 'ReportWriter', 'status': 'pending', 'result': None, '____': ['task_weather', 'task_spots']}) # 解答例より dependencies 追加
    else:
        sub_tasks_list.append({'id': 'task_general', 'instruction': state['main_task_description'], 'assigned_to': 'GeneralWorker', 'status': 'pending', 'result': None, 'dependencies': []}) # 解答例より dependencies 追加
        
    print(f"  分解されたサブタスク: {json.dumps(sub_tasks_list, ensure_ascii=False, indent=2)}") # 解答例より
    return {"sub_tasks": sub_tasks_list, "messages": [AIMessage(content=f"タスクを{len(sub_tasks_list)}個のサブタスクに分解しました。", name="ManagerDecomposer")]} # current_worker_index は coordinator で管理 (解答例より)

def ____(state: HierarchicalAgentState) -> dict: # 解答例よりノード名変更
    print(f"
[マネージャー: 実行調整・ワーカー呼び出し]")
    current_sub_tasks = state.get("sub_tasks", [])
    updated_sub_tasks = [st.copy() for st in current_sub_tasks] # 解答例より
    all_done = True # 解答例より
    
    for i, task in enumerate(updated_sub_tasks): # 解答例より
        if task["status"] == "pending":
            all_done = False 
            can_execute = True
            dependency_results_for_worker = {}
            if task.get("dependencies"):
                for dep_id in task["dependencies"]:
                    dep_task = next((st for st in updated_sub_tasks if st["id"] == dep_id), None)
                    if not dep_task or dep_task["status"] != "completed":
                        can_execute = False
                        print(f"  サブタスク「{task['id']}」は依存先「{dep_id}」が未完了のため待機します。")
                        break
                    dependency_results_for_worker[dep_id] = dep_task.get("result")
            
            if can_execute:
                print(f"  サブタスク「{task['id']}」({task['instruction']}) をワーカー「{task['assigned_to']}」に実行させます。")
                task_result = simulated_worker_execution(task['assigned_to'], task['instruction'], dependency_results_for_worker)
                updated_sub_tasks[i]["result"] = task_result
                updated_sub_tasks[i]["status"] = "completed"
                break 
    
    if all_done and any(st["status"] == "completed" for st in updated_sub_tasks): # 解答例より
         print("  全サブタスクの処理が完了したか、これ以上進められるタスクがありません。")
         return {"sub_tasks": updated_sub_tasks, "messages": [AIMessage(content="全サブタスクの実行調整が完了。", name="ManagerCoordinator")]}
    
    return {"sub_tasks": updated_sub_tasks, "messages": [AIMessage(content="サブタスク実行調整中...", name="ManagerCoordinator")]}

def ____(state: HierarchicalAgentState) -> dict: # 解答例よりノード名変更
    print("
[マネージャー: 結果集約]")
    sub_tasks = state.get("sub_tasks", [])
    final_report_parts = [] # 解答例より
    for task in sub_tasks:
        if task["status"] == "completed" and task["result"] is not None: # 解答例より is not None 追加
            final_report_parts.append(f"- 「{task['instruction'][:30]}...」の結果: {task['result'][:70]}...") # 解答例より
        else: # 解答例より
            final_report_parts.append(f"- 「{task['instruction'][:30]}...」は未完了または結果なし (状態: {task['status']})。") # 解答例より
    
    final_report = "統合最終報告書:\n" + "\n".join(final_report_parts) # 解答例より
    if LLM_PROVIDER != "fake" and llm and any(st["status"] == "completed" for st in sub_tasks): # 解答例より
        prompt = f"以下のサブタスクの実行結果を元に、ユーザーへの最終報告をまとめてください。\n{final_report}"
        final_report = llm.invoke(prompt).content

    print(f"  最終報告: {final_report[:200]}...") # 解答例より出力文字数変更
    return {"final_aggregated_result": final_report, "messages": [AIMessage(content=final_report, name="ManagerAggregator")]} # 解答例より name 変更

# --- 4. ルーター関数 ---
def ____(state: HierarchicalAgentState) -> str: # 解答例より関数名変更
    sub_tasks = state.get("sub_tasks")
    if not sub_tasks: 
        print("  -> ルーター(Hierarchical): 計画未作成のため、タスク分解ノードへ。") # 解答例より
        return "manager_decomposer_node" 
    
    all_completed = all(task.get("status") == "completed" for task in sub_tasks) # 解答例より
    
    if all_completed: # 解答例より
        print("  -> ルーター(Hierarchical): 全サブタスク完了。結果集約ノードへ。")
        return "manager_aggregator_node"
    else: # 解答例より
        print("  -> ルーター(Hierarchical): 未完了のサブタスクあり。実行調整ノードへ。")
        return "manager_coordinator_node" # 実行調整ノードへのキー (解答例より)

# --- 5. グラフの構築 ---
workflow_q4_ch4 = StateGraph(HierarchicalAgentState)
workflow_q4_ch4.add_node("manager_decomposer_node", manager_decomposer_node) # 解答例よりノード名変更
workflow_q4_ch4.add_node("manager_coordinator_node", manager_executor_coordinator_node) 
workflow_q4_ch4.add_node("manager_aggregator_node", manager_aggregator_node) # 解答例よりノード名変更

workflow_q4_ch4.____( # 解答例よりエントリーポイント変更
    route_hierarchical_manager_actions,
    {
        "manager_decomposer_node": "manager_decomposer_node",
        "manager_coordinator_node": "manager_coordinator_node",
        "manager_aggregator_node": "manager_aggregator_node"
    }
)

workflow_q4_ch4.add_edge("manager_decomposer_node", "manager_coordinator_node") # 解答例より

workflow_q4_ch4.add_conditional_edges(
    "manager_coordinator_node",
    route_hierarchical_manager_actions, 
    {
        "manager_coordinator_node": "manager_coordinator_node", 
        "manager_aggregator_node": "manager_aggregator_node",
        "manager_decomposer_node": "manager_decomposer_node" 
    }
)
workflow_q4_ch4.add_edge("manager_aggregator_node", END)

graph_q4_ch4 = workflow_q4_ch4.compile()

In [None]:
# 解答欄004 - グラフ可視化
from IPython.display import Image, display # 解答例より

try:
    display(Image(graph_q4_ch4.____().____()))
except Exception as e:
    print(f"グラフ描画に失敗: {e}")

In [None]:
# 解答欄004 - グラフ実行
main_task_q4 = "東京の天気と主要観光スポット（3箇所）を調べて、それらをまとめた短いレポートを作成してください。" # 解答例より
initial_state_q4_ch4 = {
    "messages": [____(content=main_task_q4)],
    "____": main_task_q4,
    "____": None, "____": None # current_worker_index, error_message は削除 (解答例より)
}
thread_q4 = {"____": {"____": f"hierarchical-agent-{uuid4()[:4]}"}}}

print(f"--- 階層型エージェントテスト (メインタスク: {main_task_q4}) ---")
final_q4_state_val = None
for ____, event in ____(graph_q4_ch4.____(initial_state_q4_ch4, config=thread_q4, recursion_limit=15)): # 解答例より event_idx 追加
    print(f"Event {event_idx}: {event}") # 解答例より event_idx 追加
    if END in event: final_q4_state_val = event[END]
    print("----");

if not final_q4_state_val: final_q4_state_val = graph_q4_ch4.____(thread_q4).values

print("
  最終成果物:")
if final_q4_state_val.get("final_aggregated_result"):
    print(f"    {final_q4_state_val['final_aggregated_result']}")
else:
    print("    最終成果物が生成されませんでした。")
print("
  サブタスク状況:") # 解答例より追加
for st in final_q4_state_val.get("sub_tasks",[]): # 解答例より追加
    print(f"    - ID: {st['id']}, ステータス: {st['status']}, 結果: {str(st.get('result','N/A'))[:50]}...") # 解答例より追加

<details><summary>解答004</summary>

``````python
from typing import TypedDict, List, Optional, Annotated, Dict, Any
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, ToolCall # ToolMessage, ToolCallは直接は使わないが概念として
from langchain_core.tools import tool
from IPython.display import Image, display
from uuid import uuid4
import re
import json

# --- 1. 状態定義 ---
class HierarchicalAgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    main_task_description: str
    sub_tasks: Optional[List[Dict[str, Any]]]
    final_aggregated_result: Optional[str]
    # current_worker_indexはマネージャーの実行調整ノード内で管理・更新されるので、状態としては必須ではない
    # error_message: Optional[str] # エラー処理を追加する場合

# --- 2. ワーカーエージェントのシミュレーション関数 ---
# 実際のワーカーは独立したグラフやLLMChainである可能性が高い
def simulated_worker_execution(worker_id: str, instruction: str, dependencies_results: Optional[Dict[str, str]] = None) -> str:
    print(f"    [ワーカー {worker_id}] 指示: 「{instruction}」を実行開始。")
    if dependencies_results:
        print(f"      依存結果: {dependencies_results}")
    
    # ダミー処理 (LLM呼び出しやツール実行を模倣)
    result = f"ワーカー「{worker_id}」による「{instruction[:20]}...」の高品質な実行結果。"
    if "天気" in instruction.lower() and search_tool and search_tool.name != "dummy_search_tool" and TAVILY_API_KEY:
        try: result = search_tool.invoke(instruction)
        except Exception as e: result = f"天気検索エラー: {e}"
    elif "レポート作成" in instruction and dependencies_results:
        # 依存結果を使ってレポート作成を模倣
        report_content = "レポート:\n"
        for task_id, dep_res in dependencies_results.items():
            report_content += f"  - {task_id}の結果: {str(dep_res)[:50]}...\n"
        result = report_content + "上記を統合しました。"
    
    print(f"    -> ワーカー {worker_id} の結果: {str(result)[:80]}...")
    return str(result)

# --- 3. マネージャーエージェントのノード定義 ---
def manager_decomposer_node(state: HierarchicalAgentState) -> dict:
    print(f"
[マネージャー: タスク分解] メインタスク: {state['main_task_description']}")
    sub_tasks_list: List[Dict[str, Any]] = []
    main_task_lower = state['main_task_description'].lower()
    
    # ダミータスク分解ロジック (実際はLLMが行う)
    if "天気" in main_task_lower and ("観光" in main_task_lower or "スポット" in main_task_lower) and "レポート" in main_task_lower:
        location = "東京" # デフォルト
        if "大阪" in main_task_lower: location = "大阪"
        sub_tasks_list.append({'id': 'task_weather', 'instruction': f'{location}の今日の天気調査', 'assigned_to': 'WeatherWorker', 'status': 'pending', 'result': None, 'dependencies': []})
        sub_tasks_list.append({'id': 'task_spots', 'instruction': f'{location}の主要観光スポット3箇所調査', 'assigned_to': 'TourismWorker', 'status': 'pending', 'result': None, 'dependencies': []})
        sub_tasks_list.append({'id': 'task_report', 'instruction': f'{location}の天気と観光スポットに関する統合レポート作成', 'assigned_to': 'ReportWriter', 'status': 'pending', 'result': None, 'dependencies': ['task_weather', 'task_spots']})
    else:
        sub_tasks_list.append({'id': 'task_general', 'instruction': state['main_task_description'], 'assigned_to': 'GeneralWorker', 'status': 'pending', 'result': None, 'dependencies': []})
        
    print(f"  分解されたサブタスク: {json.dumps(sub_tasks_list, ensure_ascii=False, indent=2)}")
    return {"sub_tasks": sub_tasks_list, "messages": [AIMessage(content=f"タスクを{len(sub_tasks_list)}個のサブタスクに分解しました。", name="ManagerDecomposer")]}

def manager_executor_coordinator_node(state: HierarchicalAgentState) -> dict:
    print(f"
[マネージャー: 実行調整・ワーカー呼び出し]")
    current_sub_tasks = state.get("sub_tasks", [])
    updated_sub_tasks = [st.copy() for st in current_sub_tasks] # 変更用にコピー
    all_done = True
    
    for i, task in enumerate(updated_sub_tasks):
        if task["status"] == "pending":
            all_done = False # まだ保留中のタスクがある
            # 依存関係チェック
            can_execute = True
            dependency_results_for_worker = {}
            if task.get("dependencies"):
                for dep_id in task["dependencies"]:
                    dep_task = next((st for st in updated_sub_tasks if st["id"] == dep_id), None)
                    if not dep_task or dep_task["status"] != "completed":
                        can_execute = False
                        print(f"  サブタスク「{task['id']}」は依存先「{dep_id}」が未完了のため待機します。")
                        break
                    dependency_results_for_worker[dep_id] = dep_task.get("result")
            
            if can_execute:
                print(f"  サブタスク「{task['id']}」({task['instruction']}) をワーカー「{task['assigned_to']}」に実行させます。")
                # ワーカー実行のシミュレーション
                task_result = simulated_worker_execution(task['assigned_to'], task['instruction'], dependency_results_for_worker)
                updated_sub_tasks[i]["result"] = task_result
                updated_sub_tasks[i]["status"] = "completed"
                # このループでは1ステップに1タスク実行（または全実行可能タスク実行）とする
                # ここでは1つ実行したら一度マネージャーに戻る形にするためbreak
                # (より高度な並列実行や依存関係解決は複雑になる)
                break 
    
    if all_done and any(st["status"] == "completed" for st in updated_sub_tasks): # 全て完了（または初期からタスクなし）
         print("  全サブタスクの処理が完了したか、これ以上進められるタスクがありません。")
         # next_action を設定してルーターに判断させる (このノードでは直接FINISHさせない)
         return {"sub_tasks": updated_sub_tasks, "messages": [AIMessage(content="全サブタスクの実行調整が完了。", name="ManagerCoordinator")]}
    
    return {"sub_tasks": updated_sub_tasks, "messages": [AIMessage(content="サブタスク実行調整中...", name="ManagerCoordinator")]}

def manager_aggregator_node(state: HierarchicalAgentState) -> dict:
    print("
[マネージャー: 結果集約]")
    sub_tasks = state.get("sub_tasks", [])
    final_report_parts = []
    for task in sub_tasks:
        if task["status"] == "completed" and task["result"] is not None:
            final_report_parts.append(f"- 「{task['instruction'][:30]}...」の結果: {task['result'][:70]}...")
        else:
            final_report_parts.append(f"- 「{task['instruction'][:30]}...」は未完了または結果なし (状態: {task['status']})。")
    
    final_report = "統合最終報告書:\n" + "\n".join(final_report_parts)
    if LLM_PROVIDER != "fake" and llm and any(st["status"] == "completed" for st in sub_tasks):
        prompt = f"以下のサブタスクの実行結果を元に、ユーザーへの最終報告をまとめてください。\n{final_report}"
        final_report = llm.invoke(prompt).content

    print(f"  最終報告: {final_report[:200]}...")
    return {"final_aggregated_result": final_report, "messages": [AIMessage(content=final_report, name="ManagerAggregator")]}

# --- 4. ルーター関数 ---
def route_hierarchical_manager_actions(state: HierarchicalAgentState) -> str:
    sub_tasks = state.get("sub_tasks")
    if not sub_tasks: # 初回、または計画がまだない
        print("  -> ルーター(Hierarchical): 計画未作成のため、タスク分解ノードへ。")
        return "manager_decomposer_node" 
    
    # すべてのタスクが 'completed' になっているか確認
    all_completed = all(task.get("status") == "completed" for task in sub_tasks)
    
    if all_completed:
        print("  -> ルーター(Hierarchical): 全サブタスク完了。結果集約ノードへ。")
        return "manager_aggregator_node"
    else:
        print("  -> ルーター(Hierarchical): 未完了のサブタスクあり。実行調整ノードへ。")
        return "manager_coordinator_node"

# --- 5. グラフの構築 ---
workflow_q4_ch4 = StateGraph(HierarchicalAgentState)
workflow_q4_ch4.add_node("manager_decomposer_node", manager_decomposer_node)
workflow_q4_ch4.add_node("manager_coordinator_node", manager_executor_coordinator_node)
workflow_q4_ch4.add_node("manager_aggregator_node", manager_agent_aggregator)

# エントリポイントはルーターにし、初期状態に基づいて最初のノードを決定
workflow_q4_ch4.set_conditional_entry_point(
    route_hierarchical_manager_actions,
    {
        "manager_decomposer_node": "manager_decomposer_node",
        "manager_coordinator_node": "manager_coordinator_node", # 通常ここには直接来ないはず
        "manager_aggregator_node": "manager_aggregator_node" # 通常ここには直接来ないはず
    }
)

workflow_q4_ch4.add_edge("manager_decomposer_node", "manager_coordinator_node") # 分解後は必ず調整へ
workflow_q4_ch4.add_conditional_edges(
    "manager_coordinator_node",
    route_hierarchical_manager_actions, # 実行調整後、再度ルーターで判断
    {
        "manager_coordinator_node": "manager_coordinator_node", # 未完了タスクあれば再度調整 (ループ)
        "manager_aggregator_node": "manager_aggregator_node",
        "manager_decomposer_node": "manager_decomposer_node" # 基本ここには来ない想定
    }
)
workflow_q4_ch4.add_edge("manager_aggregator_node", END)

graph_q4_ch4 = workflow_q4_ch4.compile()
try: display(Image(graph_q4_ch4.get_graph().draw_png()))
except Exception as e: print(f"グラフ描画失敗: {e}")

# --- 6. グラフの実行 ---
main_task_q4 = "東京の天気と主要観光スポット（3箇所）を調べて、それらをまとめた短いレポートを作成してください。"
initial_state_q4_ch4 = {
    "messages": [HumanMessage(content=main_task_q4)],
    "main_task_description": main_task_q4,
    "sub_tasks": None, "final_aggregated_result": None
}
thread_q4 = {"configurable": {"thread_id": f"hierarchical-agent-{uuid4()[:4]}"}}}

print(f"--- 階層型エージェントテスト (メインタスク: {main_task_q4}) ---")
final_q4_state_val = None
for event_idx, event in enumerate(graph_q4_ch4.stream(initial_state_q4_ch4, config=thread_q4, recursion_limit=15)):
    print(f"Event {event_idx}: {event}")
    if END in event: final_q4_state_val = event[END]
    print("----");

if not final_q4_state_val: final_q4_state_val = graph_q4_ch4.get_state(thread_q4).values

print("
  最終成果物:")
if final_q4_state_val.get("final_aggregated_result"):
    print(f"    {final_q4_state_val['final_aggregated_result']}")
else:
    print("    最終成果物が生成されませんでした。")
print("
  サブタスク状況:")
for st in final_q4_state_val.get("sub_tasks",[]):
    print(f"    - ID: {st['id']}, ステータス: {st['status']}, 結果: {str(st.get('result','N/A'))[:50]}...")
``````
</details>

<details><summary>解説004</summary>

#### この問題のポイント

*   **階層構造の役割分担:**
    *   **マネージャーエージェント:** 複数のノード（`manager_decomposer_node`、`manager_executor_coordinator_node`、`manager_aggregator_node`）にまたがって実装されています。
        *   `manager_decomposer_node`: メインのタスクを受け取り、それを実行可能なサブタスクのリストに分解（プランニング）します。各サブタスクには、指示内容、担当ワーカー（想定）、依存関係などが含まれます。
        *   `manager_executor_coordinator_node`: 分解されたサブタスクの実行を調整・管理します。どのサブタスクを次に実行すべきか（依存関係を考慮しつつ）、どのワーカーに委任するかを決定し、ワーカーの実行を（この例ではシミュレーションで）トリガーします。全てのサブタスクが完了するまで、この調整役を繰り返します。
        *   `manager_aggregator_node`: 全てのサブタスクが完了した後、それらの結果を収集・統合し、最終的な成果物（レポートなど）を生成します。
    *   **ワーカーエージェント（`simulated_worker_execution`）:** マネージャーから委任された特定のサブタスクを実行します。この解答例では、ワーカーは独立したノードではなく、マネージャーの調整ノード内で呼び出されるシミュレーション関数として実装されていますが、実際のシステムでは各ワーカーが専用のツールや特化LLMを持つ独立したグラフやLangChainのChainとして実装されることが多いです。
*   **状態 (`HierarchicalAgentState`):**
    *   `main_task_description`: マネージャーが最初に受け取るタスク。
    *   `sub_tasks`: マネージャーが生成したサブタスクのリスト。各要素は辞書で、サブタスクID、指示、担当、現在の状態（pending, completed）、実行結果、依存タスクIDリストなどを持ちます。
    *   `final_aggregated_result`: マネージャーが集約して作成した最終成果物。
*   **グラフのフローとロジック:**
    1.  エントリーポイントはルーター `route_hierarchical_manager_actions` ですが、初期状態では `sub_tasks` がないため、`manager_decomposer_node` に処理が移ります。
    2.  `manager_decomposer_node` がサブタスクリストを作成します。
    3.  その後、`manager_coordinator_node` が呼び出されます。このノードは `sub_tasks` リストをイテレートし、実行可能な（依存関係が解決された）ペンディング中のタスクを見つけて、（シミュレーションで）ワーカーに実行させ、結果を `sub_tasks` 内の対応するタスクに記録し、ステータスを "completed" に更新します。この調整処理は、全てのタスクが完了するまでルーターを介して繰り返されます。
    4.  `route_hierarchical_manager_actions` ルーターは、全てのサブタスクが完了したと判断すると、`manager_aggregator_node` に処理を移します。
    5.  `manager_aggregator_node` が全サブタスクの結果を統合し、最終成果物を作成して `END` で終了します。
*   **依存関係の簡易処理:** この解答例では、サブタスク間の依存関係（例: レポート作成は天気調査と観光スポット調査が完了してから）を `dependencies` キーで表現し、`manager_executor_coordinator_node` が簡易的にチェックしています。より堅牢なシステムでは、タスクスケジューリングや依存関係解決のためのより洗練されたロジックが必要になります。
*   **ワーカーの抽象化:** `simulated_worker_execution` 関数は、様々な種類のワーカーの処理を模倣しています。実際には、各ワーカー（例: `WeatherWorker`, `TourismWorker`, `ReportWriter`）は、それぞれ特化したツールやプロンプトを持つLLM、あるいは専用のLangGraphサブグラフとして実装されるでしょう。

この階層型アプローチは、複雑な問題を管理しやすく、各コンポーネントの専門性を高めることができるため、高度なAIエージェントシステムの設計において非常に強力なパラダイムです。

---</details>

### ■ 問題005: 第4章のまとめ - 複数エージェントによる協調型リサーチボット

第4章で学んだマルチエージェントのコンセプト（基本的な2エージェント会話、スーパーバイザーによるタスク割り振り、逐次連携、階層型委任）を組み合わせ、より洗練された「協調型リサーチボット」を構築します。このボットは以下の役割を持つエージェントで構成されることを目指します（全てを完全に実装する必要はなく、主要な連携フローを示すことを目標とします）。

1.  **ユーザーインターフェースエージェント（UI Agent）:** ユーザーからのリサーチ要求を受け付け、明確化する。
2.  **プランニングスーパーバイザーエージェント（Planner Supervisor）:** 明確化された要求に基づき、リサーチ計画（必要な情報収集タスク、分析タスクなど）を立案し、適切な専門エージェントにタスクを割り振る。
3.  **専門リサーチエージェント（Specialist Researcher(s)）:** 特定の種類の情報収集（例: ウェブ検索、データベース検索、特定APIからのデータ取得など）を担当する。複数のリサーチャーが並行して動作することも考えられる。
4.  **データ分析エージェント（Data Analyst）:** 収集された情報を分析・統合し、洞察を抽出する。
5.  **レポート生成エージェント（Report Generator）:** 分析結果と洞察に基づいて、最終的なレポートを作成する。
6.  （オプション）**レビュー・品質管理エージェント（Reviewer Agent）:** 生成されたレポートの品質をチェックし、必要であれば修正を指示する。

*   **学習内容:**
    *   複数の異なる役割と能力を持つエージェントを定義し、それらを一つの状態（State）とグラフ定義の下で連携させる複雑なワークフローの設計。
    *   スーパーバイザーを中心としたタスクの委任、専門エージェントによる処理、結果の報告と統合、というマルチエージェントシステムの典型的なパターンの実装。
    *   状態（State）の設計が、エージェント間の情報共有とワークフロー全体の制御においていかに重要であるかの再確認。
    *   （概念的に）エラーハンドリングや、特定条件下での処理の分岐（例: リサーチ結果が不十分なら追加リサーチ）なども組み込む余地があることの理解。

In [None]:
# 解答欄005 - グラフ構築
from typing import TypedDict, List, Optional, Annotated, Dict, Any, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
import json
from uuid import uuid4
import re # 解答例より

# --- 0. 共通ツールの準備 (search_toolは準備セルで定義済み) ---
# @tool def summarize_text(text: str) -> str: ... (必要なら定義)

# --- 1. 状態定義 ---
____ = Literal[
    "CLARIFYING_REQUEST", "PLANNING", "RESEARCHING", "ANALYZING", 
    "REPORTING", "REVIEWING", "FINALIZING", "DONE", "ERROR"
]

class ____(____):
    messages: Annotated[List[BaseMessage], add_messages]
    user_original_request: str
    ____: Optional[str]
    ____: Optional[List[Dict[str, Any]]] 
    ____: Dict[str, Any] 
    analysis_summary: Optional[str]
    report_draft: Optional[str]
    review_feedback: Optional[str]
    final_report: Optional[str]
    current_phase: Optional[ResearchPhase]
    ____: Optional[str] 
    error_message_for_chapter4: Optional[str] 

# --- 2. 各エージェント/ノードの簡易的な実装 ---
def ____(state: CollaborativeResearchBotState) -> dict:
    print(f"
[UIエージェント] 元のリクエスト: {state['user_original_request']}")
    clarified = state['user_original_request']
    print(f"  -> 明確化されたリクエスト (今回は変更なし): {clarified}") # 解答例より
    return {"clarified_request": clarified, "current_phase": "PLANNING", "messages": [AIMessage(content=f"リクエスト「{clarified}」として承りました。計画を作成します。", name="UIAgent")]}

def ____(state: CollaborativeResearchBotState) -> dict:
    req = state.get("clarified_request", state["user_original_request"]) # 解答例より
    print(f"
[プランニングスーパーバイザー] リクエスト「{req}」の計画立案開始。")
    plan: List[Dict[str, Any]] = [] # 解答例より
    if "LangGraph" in req and "マルチエージェント" in req and ("ベストプラクティス" in req or "レポート" in req): # 解答例より
        plan.append({'task_id': 'lg_ma_bp_search', 'instruction': 'LangGraphマルチエージェントのベストプラクティスに関する情報をウェブで検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]})
        plan.append({'task_id': 'lg_ma_ex_search', 'instruction': 'LangGraphマルチエージェントの具体的なコード例や応用事例を検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]})
        plan.append({'task_id': 'lg_ma_analysis', 'instruction': '上記検索結果(ベストプラクティスと事例)を統合・分析し、主要な設計パターンと利点を抽出', 'assigned_to': 'DataAnalyst', 'status': 'pending', 'result': None, 'dependencies':['lg_ma_bp_search', 'lg_ma_ex_search']})
        plan.append({'task_id': 'lg_ma_reporting', 'instruction': '分析結果に基づき、LangGraphマルチエージェントのベストプラクティスに関する簡潔なレポートを作成', 'assigned_to': 'ReportGenerator', 'status': 'pending', 'result': None, 'dependencies':['lg_ma_analysis']})
    else:
        plan.append({'task_id': 'generic_search_main', 'instruction': f'{req}についてウェブで包括的に検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]}) # 解答例より
        plan.append({'task_id': 'generic_report_main', 'instruction': f'{req}の検索結果を元に要点をまとめたレポートを作成', 'assigned_to': 'ReportGenerator', 'status': 'pending', 'result': None, 'dependencies':['generic_search_main']}) # 解答例より
    print(f"  -> 生成された計画: {json.dumps(plan, ensure_ascii=False, indent=2)}")
    return {"research_plan": plan, "current_phase": "RESEARCHING", "collected_data": {}, "messages": [AIMessage(content=f"計画を立案しました: {len(plan)}ステップ。リサーチを開始します。", name="PlannerSupervisor")]} # 解答例より

def web_searcher_worker(state: CollaborativeResearchBotState) -> dict:
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"}
    
    print(f"
[WebSearcherワーカー] タスク「{task['instruction']}」を実行します。")
    search_query = task['instruction']
    search_result_content = f"「{search_query}」に関するウェブ検索結果: 多数の関連情報が見つかりました。主要な情報源はA, B, Cです。" # 解答例より
    if search_tool and search_tool.name != "dummy_search_tool" and TAVILY_API_KEY:
        try: search_result_content = str(search_tool.invoke({"query": search_query})) # 解答例より
        except Exception as e: search_result_content = f"検索エラー({search_query}): {e}"
    elif search_tool: search_result_content = str(search_tool.invoke(search_query)) # 解答例より
    
    print(f"  -> 検索結果 ({task_id}): {search_result_content[:100]}...")
    updated_collected_data = state.get("collected_data", {}).copy()
    updated_collected_data[task_id] = search_result_content
    updated_plan = [p.copy() for p in plan] # 解答例より
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = search_result_content; break
    return {"collected_data": updated_collected_data, "research_plan": updated_plan, "messages": [AIMessage(content=f"検索タスク「{task_id}」完了。結果を保存しました。", name="WebSearcher")]} # 解答例より

def data_analyst_worker(state: CollaborativeResearchBotState) -> dict: 
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"} # 解答例より
    print(f"
[DataAnalystワーカー] タスク「{task['instruction']}」を実行します。")
    input_for_analysis = ""
    for dep_id in task.get('dependencies', []):
        input_for_analysis += f"依存データ({dep_id}): {str(state.get('collected_data',{}).get(dep_id,'データなし'))[:100]}...\n" # 解答例より
    analysis = f"分析結果 ({task['instruction']}):\n{input_for_analysis}上記データから、重要な洞察として「パターンX」と「傾向Y」が抽出されました。これらは相互に関連しています。" # 解答例より
    print(f"  -> 分析結果 ({task_id}): {analysis}")
    updated_collected_data = state.get("collected_data", {}).copy()
    updated_collected_data[task_id] = analysis 
    updated_plan = [p.copy() for p in plan] # 解答例より
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = analysis; break
    return {"collected_data": updated_collected_data, "research_plan": updated_plan, "analysis_summary": analysis, "messages": [AIMessage(content=f"分析タスク「{task_id}」完了。サマリーを生成しました。", name="DataAnalyst")]} # 解答例より

def report_generator_worker(state: CollaborativeResearchBotState) -> dict:
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"} # 解答例より
    print(f"
[ReportGeneratorワーカー] タスク「{task['instruction']}」を実行します。")
    input_for_report = ""
    for dep_id in task.get('dependencies', []):
        input_for_report += f"参照データ({dep_id}): {str(state.get('collected_data',{}).get(dep_id,'データなし'))[:100]}...\n" # 解答例より
    report = f"最終レポート: {task['instruction']}\n{input_for_report}上記情報を総合的に判断し、以下に要点をまとめました。\n1. 主要な発見点A\n2. 注目すべき傾向B\n3. 今後の課題C\n以上が本件に関する報告です。" # 解答例より
    print(f"  -> レポートドラフト ({task_id}): {report[:150]}...")
    updated_plan = [p.copy() for p in plan] # 解答例より
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = report; break
    return {"report_draft": report, "research_plan": updated_plan, "messages": [AIMessage(content=f"レポート生成タスク「{task_id}」完了。", name="ReportGenerator")]} # 解答例より

def ____(state: CollaborativeResearchBotState) -> dict:
    print(f"
[統括スーパーバイザー] 現在フェーズ: {state.get('current_phase')}, 直前処理タスクID: {state.get('current_task_id_being_processed')}") # 解答例より
    current_phase = state.get("current_phase")
    plan = state.get("research_plan", [])
    next_phase: Optional[ResearchPhase] = None
    next_task_id_to_process: Optional[str] = None # 解答例より変数名変更
    supervisor_log = "" # 解答例より

    if state.get("error_message_for_chapter4"): next_phase = "ERROR"; supervisor_log = f"エラー発生: {state['error_message_for_chapter4']}" # 解答例より
    elif current_phase == "PLANNING": next_phase = "RESEARCHING"; supervisor_log = "計画に基づきリサーチ開始。"
    elif current_phase == "RESEARCHING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == 'WebSearcher'), None)
        if pending_task: next_task_id_to_process = pending_task['task_id']; next_phase = "RESEARCHING"; supervisor_log = f"リサーチタスク「{next_task_id_to_process}」へ。"
        else: next_phase = "ANALYZING"; supervisor_log = "全リサーチ完了。分析へ。"
    elif current_phase == "ANALYZING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == "DataAnalyst"), None)
        if pending_task:
            deps_completed = all(state.get("collected_data",{}).get(dep_id) for dep_id in pending_task.get('dependencies',[])) # 解答例より
            if deps_completed: next_task_id_to_process = pending_task['task_id']; next_phase = "ANALYZING"; supervisor_log = f"分析タスク「{next_task_id_to_process}」へ。"
            else: supervisor_log = f"分析タスク「{pending_task['task_id']}」の依存未解決。待機。"
        else: next_phase = "REPORTING"; supervisor_log = "全分析完了。レポート作成へ。"
    elif current_phase == "REPORTING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == "ReportGenerator"), None)
        if pending_task:
            deps_completed = all(state.get("collected_data",{}).get(dep_id) for dep_id in pending_task.get('dependencies',[])) # 解答例より
            if deps_completed: next_task_id_to_process = pending_task['task_id']; next_phase = "REPORTING"; supervisor_log = f"レポートタスク「{next_task_id_to_process}」へ。"
            else: supervisor_log = f"レポートタスク「{pending_task['task_id']}」の依存未解決。待機。"
        else: next_phase = "DONE"; supervisor_log = "全レポート作成完了。終了。"; return {"current_phase": next_phase, "final_report": state.get("report_draft"), "messages":[AIMessage(content=supervisor_log, name="OverallSupervisor")]} # 解答例より
    
    if next_phase:
        print(f"  -> 統括SV判断: 次フェーズ「{next_phase}」{', タスクID「'+next_task_id_to_process+'」' if next_task_id_to_process else ''}. ログ: {supervisor_log}") # 解答例より
        return {"current_phase": next_phase, "current_task_id_being_processed": next_task_id_to_process, "messages": [AIMessage(content=supervisor_log, name="OverallSupervisor")]}
    
    print(f"  -> 統括SV判断: フェーズ「{current_phase}」で待機または進展なし。") # 解答例より
    return {"messages": [AIMessage(content=f"フェーズ「{current_phase}」で待機中。", name="OverallSupervisor")]} # 解答例より

# --- 3. ルーター定義 ---
def ____(state: CollaborativeResearchBotState) -> str:
    phase = state.get("current_phase")
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    print(f"  -> ルーター(PhaseRouter): フェーズ「{phase}」、タスクID「{task_id}」")

    if phase == "ERROR": return END 
    if phase == "DONE": return END

    if task_id: # 解答例より
        task_to_run = next((t for t in plan if t['task_id'] == task_id and t['status'] == 'pending'), None)
        if task_to_run:
            assigned_worker = task_to_run['assigned_to']
            if assigned_worker == "WebSearcher": return "web_searcher"
            if assigned_worker == "DataAnalyst": return "data_analyst"
            if assigned_worker == "ReportGenerator": return "report_generator"
    
    return "overall_supervisor" # 解答例より

# --- 4. グラフ構築 ---
workflow_q5_ch4 = StateGraph(CollaborativeResearchBotState)
workflow_q5_ch4.add_node("ui_agent", ui_agent_clarifier)
workflow_q5_ch4.add_node("planner_supervisor", planning_supervisor_agent)
workflow_q5_ch4.add_node("overall_supervisor", overall_supervisor_node)
workflow_q5_ch4.add_node("web_searcher", web_searcher_worker)
workflow_q5_ch4.add_node("data_analyst", data_analyst_worker)
workflow_q5_ch4.add_node("report_generator", report_generator_worker)

workflow_q5_ch4.set_entry_point("ui_agent")
workflow_q5_ch4.add_edge("ui_agent", "planner_supervisor") 
workflow_q5_ch4.add_edge("planner_supervisor", "overall_supervisor") 

workflow_q5_ch4.add_conditional_edges("overall_supervisor", route_by_current_phase,
    {"web_searcher": "web_searcher", "data_analyst": "data_analyst", 
     "report_generator": "report_generator", "overall_supervisor": "overall_supervisor", END: END}) # 解答例より

workflow_q5_ch4.add_edge("web_searcher", "overall_supervisor") # 解答例より
workflow_q5_ch4.add_edge("data_analyst", "overall_supervisor") # 解答例より
workflow_q5_ch4.add_edge("report_generator", "overall_supervisor") # 解答例より

graph_q5_ch4 = workflow_q5_ch4.compile()

In [None]:
# 解答欄005 - グラフ可視化
from IPython.display import Image, display # 解答例より

try:
    display(Image(graph_q5_ch4.____().____()))
except Exception as e:
    print(f"グラフの可視化に失敗しました。Graphvizが正しくインストールされているか確認してください。エラー: {e}")

In [None]:
# 解答欄005 - グラフ実行
user_main_request_q5 = "LangGraphを使ったマルチエージェントシステム構築のベストプラクティスについて包括的に調査し、その結果を詳細な技術レポートとしてまとめてください。" # 解答例より
initial_state_q5_ch4 = {
    "messages": [____(content=user_main_request_q5)], "____": user_main_request_q5, # 解答例より
    "clarified_request": None, "____": None, "____": {},
    "____": None, "____": None, "review_feedback": None,
    "____": None, "____": None, 
    "current_task_id_being_processed": None, "error_message_for_chapter4": None
}
thread_q5 = {"____": {"thread_id": f"collab-research-bot-{uuid4()[:4]}"}}}

print(f"--- 協調型リサーチボットテスト (リクエスト: {user_main_request_q5}) ---")
final_q5_val = None
for event_idx, event in enumerate(graph_q5_ch4.____(initial_state_q5_ch4, config=thread_q5, recursion_limit=25)): # 解答例より recursion_limit変更
    print(f"Event {event_idx}: {event}") # 解答例より
    if END in event: final_q5_val = event[END]
    print("----");

if not final_q5_val: final_q5_val = graph_q5_ch4.get_state(thread_q5).values

print("
  最終成果物:")
if final_q5_val.get("final_report"):
    print(f"    {final_q5_val['final_report']}")
elif final_q5_val.get("error_message_for_chapter4"):
    print(f"    エラー終了: {final_q5_val['error_message_for_chapter4']}")
else:
    print(f"    最終成果物がありませんでした。最終フェーズ: {final_q5_val.get('current_phase')}")
print("
  収集データ概要:")
for task_id, data_item in final_q5_val.get("collected_data", {}).items():
    print(f"    - {task_id}: {str(data_item)[:100]}...")

<details><summary>解答005</summary>

``````python
from typing import TypedDict, List, Optional, Annotated, Dict, Any, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
import json
from uuid import uuid4
from IPython.display import Image, display
import re

# --- 0. 共通ツールの準備 (search_toolは準備セルで定義済み) ---

# --- 1. 状態定義 ---
ResearchPhase = Literal[
    "CLARIFYING_REQUEST", "PLANNING", "RESEARCHING", "ANALYZING", 
    "REPORTING", "REVIEWING", "FINALIZING", "DONE", "ERROR"
]

class CollaborativeResearchBotState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    user_original_request: str
    clarified_request: Optional[str]
    research_plan: Optional[List[Dict[str, Any]]]
    collected_data: Dict[str, Any] 
    analysis_summary: Optional[str]
    report_draft: Optional[str]
    review_feedback: Optional[str]
    final_report: Optional[str]
    current_phase: Optional[ResearchPhase]
    current_task_id_being_processed: Optional[str]
    error_message_for_chapter4: Optional[str]

# --- 2. 各エージェント/ノードの簡易的な実装 ---
def ui_agent_clarifier(state: CollaborativeResearchBotState) -> dict:
    print(f"
[UIエージェント] 元のリクエスト: {state['user_original_request']}")
    clarified = state['user_original_request']
    # if "あいまい" in clarified.lower(): clarified = clarified.replace("あいまいな", "明確な") + " (明確化済み)"
    print(f"  -> 明確化されたリクエスト (今回は変更なし): {clarified}")
    return {"clarified_request": clarified, "current_phase": "PLANNING", "messages": [AIMessage(content=f"リクエスト「{clarified}」として承りました。計画を作成します。", name="UIAgent")]}

def planning_supervisor_agent(state: CollaborativeResearchBotState) -> dict:
    req = state.get("clarified_request", state["user_original_request"])
    print(f"
[プランニングスーパーバイザー] リクエスト「{req}」の計画立案開始。")
    plan: List[Dict[str, Any]] = []
    if "LangGraph" in req and "マルチエージェント" in req and ("ベストプラクティス" in req or "レポート" in req):
        plan.append({'task_id': 'lg_ma_bp_search', 'instruction': 'LangGraphマルチエージェントのベストプラクティスに関する情報をウェブで検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]})
        plan.append({'task_id': 'lg_ma_ex_search', 'instruction': 'LangGraphマルチエージェントの具体的なコード例や応用事例を検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]})
        plan.append({'task_id': 'lg_ma_analysis', 'instruction': '上記検索結果(ベストプラクティスと事例)を統合・分析し、主要な設計パターンと利点を抽出', 'assigned_to': 'DataAnalyst', 'status': 'pending', 'result': None, 'dependencies':['lg_ma_bp_search', 'lg_ma_ex_search']})
        plan.append({'task_id': 'lg_ma_reporting', 'instruction': '分析結果に基づき、LangGraphマルチエージェントのベストプラクティスに関する簡潔なレポートを作成', 'assigned_to': 'ReportGenerator', 'status': 'pending', 'result': None, 'dependencies':['lg_ma_analysis']})
    else:
        plan.append({'task_id': 'generic_search_main', 'instruction': f'{req}についてウェブで包括的に検索', 'assigned_to': 'WebSearcher', 'status': 'pending', 'result': None, 'dependencies':[]})
        plan.append({'task_id': 'generic_report_main', 'instruction': f'{req}の検索結果を元に要点をまとめたレポートを作成', 'assigned_to': 'ReportGenerator', 'status': 'pending', 'result': None, 'dependencies':['generic_search_main']})
    print(f"  -> 生成された計画: {json.dumps(plan, ensure_ascii=False, indent=2)}")
    return {"research_plan": plan, "current_phase": "RESEARCHING", "collected_data": {}, "messages": [AIMessage(content=f"計画を立案しました: {len(plan)}ステップ。リサーチを開始します。", name="PlannerSupervisor")]}

def web_searcher_worker(state: CollaborativeResearchBotState) -> dict:
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"}
    
    print(f"
[WebSearcherワーカー] タスク「{task['instruction']}」を実行します。")
    search_query = task['instruction']
    search_result_content = f"「{search_query}」に関するウェブ検索結果: 多数の関連情報が見つかりました。主要な情報源はA, B, Cです。"
    if search_tool and search_tool.name != "dummy_search_tool" and TAVILY_API_KEY:
        try: search_result_content = str(search_tool.invoke({"query": search_query})) # Tavilyはdictを期待
        except Exception as e: search_result_content = f"検索エラー({search_query}): {e}"
    elif search_tool: search_result_content = str(search_tool.invoke(search_query))
    
    print(f"  -> 検索結果 ({task_id}): {search_result_content[:100]}...")
    updated_collected_data = state.get("collected_data", {}).copy()
    updated_collected_data[task_id] = search_result_content
    updated_plan = [p.copy() for p in plan]
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = search_result_content; break
    return {"collected_data": updated_collected_data, "research_plan": updated_plan, "messages": [AIMessage(content=f"検索タスク「{task_id}」完了。結果を保存しました。", name="WebSearcher")]}

def data_analyst_worker(state: CollaborativeResearchBotState) -> dict: 
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"}
    print(f"
[DataAnalystワーカー] タスク「{task['instruction']}」を実行します。")
    input_for_analysis = ""
    for dep_id in task.get('dependencies', []):
        input_for_analysis += f"依存データ({dep_id}): {str(state.get('collected_data',{}).get(dep_id,'データなし'))[:100]}...\n"
    analysis = f"分析結果 ({task['instruction']}):\n{input_for_analysis}上記データから、重要な洞察として「パターンX」と「傾向Y」が抽出されました。これらは相互に関連しています。"
    print(f"  -> 分析結果 ({task_id}): {analysis}")
    updated_collected_data = state.get("collected_data", {}).copy()
    updated_collected_data[task_id] = analysis 
    updated_plan = [p.copy() for p in plan]
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = analysis; break
    return {"collected_data": updated_collected_data, "research_plan": updated_plan, "analysis_summary": analysis, "messages": [AIMessage(content=f"分析タスク「{task_id}」完了。サマリーを生成しました。", name="DataAnalyst")]}

def report_generator_worker(state: CollaborativeResearchBotState) -> dict:
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    task = next((t for t in plan if t['task_id'] == task_id), None)
    if not task: return {"error_message_for_chapter4": f"タスクID {task_id} がプランに見つかりません。"}
    print(f"
[ReportGeneratorワーカー] タスク「{task['instruction']}」を実行します。")
    input_for_report = ""
    for dep_id in task.get('dependencies', []):
        input_for_report += f"参照データ({dep_id}): {str(state.get('collected_data',{}).get(dep_id,'データなし'))[:100]}...\n"
    report = f"最終レポート: {task['instruction']}\n{input_for_report}上記情報を総合的に判断し、以下に要点をまとめました。\n1. 主要な発見点A\n2. 注目すべき傾向B\n3. 今後の課題C\n以上が本件に関する報告です。"
    print(f"  -> レポートドラフト ({task_id}): {report[:150]}...")
    updated_plan = [p.copy() for p in plan]
    for p_task in updated_plan: 
        if p_task['task_id'] == task_id: p_task['status'] = 'completed'; p_task['result'] = report; break
    return {"report_draft": report, "research_plan": updated_plan, "messages": [AIMessage(content=f"レポート生成タスク「{task_id}」完了。", name="ReportGenerator")]}

def overall_supervisor_node(state: CollaborativeResearchBotState) -> dict:
    print(f"
[統括スーパーバイザー] 現在フェーズ: {state.get('current_phase')}, 直前処理タスクID: {state.get('current_task_id_being_processed')}")
    current_phase = state.get("current_phase")
    plan = state.get("research_plan", [])
    next_phase: Optional[ResearchPhase] = None
    next_task_id_to_process: Optional[str] = None
    supervisor_log = ""

    if state.get("error_message_for_chapter4"): next_phase = "ERROR"; supervisor_log = f"エラー発生: {state['error_message_for_chapter4']}"
    elif current_phase == "PLANNING": next_phase = "RESEARCHING"; supervisor_log = "計画に基づきリサーチ開始。"
    elif current_phase == "RESEARCHING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == 'WebSearcher'), None)
        if pending_task: next_task_id_to_process = pending_task['task_id']; next_phase = "RESEARCHING"; supervisor_log = f"リサーチタスク「{next_task_id_to_process}」へ。"
        else: next_phase = "ANALYZING"; supervisor_log = "全リサーチ完了。分析へ。"
    elif current_phase == "ANALYZING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == 'DataAnalyst'), None)
        if pending_task:
            deps_completed = all(state.get("collected_data",{}).get(dep_id) for dep_id in pending_task.get('dependencies',[]))
            if deps_completed: next_task_id_to_process = pending_task['task_id']; next_phase = "ANALYZING"; supervisor_log = f"分析タスク「{next_task_id_to_process}」へ。"
            else: supervisor_log = f"分析タスク「{pending_task['task_id']}」の依存未解決。待機。"
        else: next_phase = "REPORTING"; supervisor_log = "全分析完了。レポート作成へ。"
    elif current_phase == "REPORTING":
        pending_task = next((t for t in plan if t['status'] == 'pending' and t['assigned_to'] == 'ReportGenerator'), None)
        if pending_task:
            deps_completed = all(state.get("collected_data",{}).get(dep_id) for dep_id in pending_task.get('dependencies',[]))
            if deps_completed: next_task_id_to_process = pending_task['task_id']; next_phase = "REPORTING"; supervisor_log = f"レポートタスク「{next_task_id_to_process}」へ。"
            else: supervisor_log = f"レポートタスク「{pending_task['task_id']}」の依存未解決。待機。"
        else: next_phase = "DONE"; supervisor_log = "全レポート作成完了。終了。"; return {"current_phase": next_phase, "final_report": state.get("report_draft"), "messages":[AIMessage(content=supervisor_log, name="OverallSupervisor")]}
    
    if next_phase:
        print(f"  -> 統括SV判断: 次フェーズ「{next_phase}」{', タスクID「'+next_task_id_to_process+'」' if next_task_id_to_process else ''}. ログ: {supervisor_log}")
        return {"current_phase": next_phase, "current_task_id_being_processed": next_task_id_to_process, "messages": [AIMessage(content=supervisor_log, name="OverallSupervisor")]}
    
    print(f"  -> 統括SV判断: フェーズ「{current_phase}」で待機または進展なし。")
    return {"messages": [AIMessage(content=f"フェーズ「{current_phase}」で待機中。", name="OverallSupervisor")]}

# --- 3. ルーター定義 ---
def route_by_current_phase(state: CollaborativeResearchBotState) -> str:
    phase = state.get("current_phase")
    task_id = state.get("current_task_id_being_processed")
    plan = state.get("research_plan", [])
    print(f"  -> ルーター(PhaseRouter): フェーズ「{phase}」、タスクID「{task_id}」")

    if phase == "ERROR": return END # エラーなら即終了
    if phase == "DONE": return END

    # どの専門エージェントを呼び出すか決定
    if task_id:
        task_to_run = next((t for t in plan if t['task_id'] == task_id and t['status'] == 'pending'), None)
        if task_to_run:
            assigned_worker = task_to_run['assigned_to']
            if assigned_worker == "WebSearcher": return "web_searcher"
            if assigned_worker == "DataAnalyst": return "data_analyst"
            if assigned_worker == "ReportGenerator": return "report_generator"
    
    # 適切なワーカーが見つからない、またはタスクIDがない場合は統括スーパーバイザーに戻す
    return "overall_supervisor"

# --- 4. グラフ構築 ---
workflow_q5_ch4 = StateGraph(CollaborativeResearchBotState)
workflow_q5_ch4.add_node("ui_agent", ui_agent_clarifier)
workflow_q5_ch4.add_node("planner_supervisor", planning_supervisor_agent)
workflow_q5_ch4.add_node("overall_supervisor", overall_supervisor_node)
workflow_q5_ch4.add_node("web_searcher", web_searcher_worker)
workflow_q5_ch4.add_node("data_analyst", data_analyst_worker)
workflow_q5_ch4.add_node("report_generator", report_generator_worker)

workflow_q5_ch4.set_entry_point("ui_agent")
workflow_q5_ch4.add_edge("ui_agent", "planner_supervisor")
workflow_q5_ch4.add_edge("planner_supervisor", "overall_supervisor")

workflow_q5_ch4.add_conditional_edges("overall_supervisor", route_by_current_phase,
    {"web_searcher": "web_searcher", "data_analyst": "data_analyst", 
     "report_generator": "report_generator", "overall_supervisor": "overall_supervisor", END: END})

workflow_q5_ch4.add_edge("web_searcher", "overall_supervisor")
workflow_q5_ch4.add_edge("data_analyst", "overall_supervisor")
workflow_q5_ch4.add_edge("report_generator", "overall_supervisor")

graph_q5_ch4 = workflow_q5_ch4.compile()
try: display(Image(graph_q5_ch4.get_graph().draw_png()))
except Exception as e: print(f"グラフ描画失敗: {e}")

# --- 5. グラフの実行 ---
user_main_request_q5 = "LangGraphを使ったマルチエージェントシステム構築のベストプラクティスについて包括的に調査し、その結果を詳細な技術レポートとしてまとめてください。"
initial_state_q5_ch4 = {
    "messages": [HumanMessage(content=user_main_request_q5)], "user_original_request": user_main_request_q5,
    "clarified_request": None, "research_plan": None, "collected_data": {},
    "analysis_summary": None, "report_draft": None, "review_feedback": None,
    "final_report": None, "current_phase": None, 
    "current_task_id_being_processed": None, "error_message_for_chapter4": None
}
thread_q5 = {"configurable": {"thread_id": f"collab-research-bot-{uuid4()[:4]}"}}}

print(f"--- 協調型リサーチボットテスト (リクエスト: {user_main_request_q5}) ---")
final_q5_val = None
for event_idx, event in enumerate(graph_q5_ch4.stream(initial_state_q5_ch4, config=thread_q5, recursion_limit=25)):
    print(f"Event {event_idx}: {event}")
    if END in event: final_q5_val = event[END]
    print("----");

if not final_q5_val: final_q5_val = graph_q5_ch4.get_state(thread_q5).values

print("
  最終成果物:")
if final_q5_val.get("final_report"):
    print(f"    {final_q5_val['final_report']}")
elif final_q5_val.get("error_message_for_chapter4"):
    print(f"    エラー終了: {final_q5_val['error_message_for_chapter4']}")
else:
    print(f"    最終成果物がありませんでした。最終フェーズ: {final_q5_val.get('current_phase')}")
print("
  収集データ概要:")
for task_id, data_item in final_q5_val.get("collected_data", {}).items():
    print(f"    - {task_id}: {str(data_item)[:100]}...")
``````
</details>