# GraphFlow (Workflows)

このセクションでは、GraphFlow（または単に「フロー」と呼ぶこともできます）を使用してマルチエージェントワークフローを作成する方法について学びます。このワークフローは構造化された実行を採用し、エージェント間の相互作用を正確に制御してタスクを完了させます。

AutoGen AgentChat は、有向グラフの実行を管理するチームを提供します： 

GraphFlow: エージェント間の実行フローを制御するために `DiGraph` に従うチーム。**順次実行、並列実行、条件分岐、ループ実行**をサポートします。 


## GraphFlow はいつ使用すべきですか？

*エージェントの動作順序を厳密に制御する必要がある場合、または異なる結果が異なる次のステップに接続する必要がある場合に使用します。アドホックな会話フローが十分であれば、`RoundRobinGroupChat` や `SelectorGroupChat` のようなシンプルなチームから始めてください。タスクが確定的な制御、条件分岐、またはサイクルを含む複雑なマルチステッププロセスを処理する必要がある場合は、構造化されたワークフローに移行してください。*

`DiGraphBuilder` は、ワークフローの実行グラフを簡単に構築できる直感的なユーティリティです。以下の機能をサポートしています：

- シーケンシャルチェーン
- 並列ファンアウト
- 条件分岐
- 安全な終了条件付きループ

グラフ内の各ノードはエージェントを表し、エッジは許可された実行パスを定義します。エッジには、エージェントのメッセージに基づく条件をオプションで設定できます。

In [None]:
#!pip install -U "autogen-agentchat"
#!pip install "autogen-ext[mcp,openai,azure]"

In [None]:
from typing import Any, List  
import os  
from dotenv import load_dotenv  

from autogen_agentchat.agents import AssistantAgent  
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

from autogen_core import CancellationToken  
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient  
from autogen_ext.tools.mcp import SseServerParams, mcp_server_tools  
import warnings
warnings.simplefilter('ignore')

load_dotenv()

## OpenTelemetry によるトレーサーのセット
マルチエージェントのデバッグには OpenTelemetry によるトレーサーを利用すると便利。`OpenAIInstrumentor` を使用して OpenAI コールをキャプチャできます。ここではトレース UI として [Jaeger](https://www.jaegertracing.io/download/) を使用しています。

In [None]:
#!pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-openai

In [None]:
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.openai import OpenAIInstrumentor

service_name = "autogen"

# OTLPエクスポーターの設定 (gRPC経由で送信)
otlp_exporter = OTLPSpanExporter(
    endpoint="http://localhost:4317",  # JaegerのgRPCエンドポイント
)
tracer_provider = TracerProvider(resource=Resource({"service.name": service_name}))
    
# トレーサープロバイダーの設定
trace.set_tracer_provider(tracer_provider)

# バッチスパンプロセッサーを設定
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)

# トレーサーを取得
tracer = tracer_provider.get_tracer(service_name)

OpenAIInstrumentor().instrument()

In [None]:
mcp_server_uri = os.getenv("MCP_SERVER_URI")
azure_openai_key = os.getenv("AZURE_OPENAI_KEY")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
azure_openai_model = os.getenv("AZURE_OPENAI_MODEL")
azure_deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
openai_model_name = os.getenv("OPENAI_MODEL_NAME")

## Tools の定義(MCP over SSE)

In [None]:
server_params = SseServerParams(  
    url=mcp_server_uri,  
    headers={"Content-Type": "application/json"},  
    timeout=30  
)  

# Fetch tools (async)  
tools = await mcp_server_tools(server_params)  
print(f"Number of Tools: {len(tools)}")
# Set up the OpenAI/Azure model client  
model_client = AzureOpenAIChatCompletionClient(  
    api_key=azure_openai_key,  
    azure_endpoint=azure_openai_endpoint,  
    api_version=api_version,  
    azure_deployment=azure_deployment,  
    model=openai_model_name,  
)  

## エージェント定義

In [None]:

# 3. -----------------  Agent Definitions -----------------  
primary_agent = AssistantAgent(  
    name="primary",  
    model_client=model_client,  
    tools=tools,  
    description="役立つアシスタント。複数のツールを使用して情報を検索し、質問に回答する",
    system_message=(  
"""
あなたは役立つアシスタントです。複数のツールを使用して情報を検索し、質問に回答することができます。
利用可能なツールを確認し、必要に応じて使用してください。ユーザーが不明な点がある場合は、確認のための質問をすることもできます。
"""
)
)  

critic_agent = AssistantAgent(  
    name="critic",  
    model_client=model_client,  
    description="建設的なフィードバックを提供するデータアナリスト。主に他のエージェントの出力を評価し、改善点を提案する役割を担う。",
    tools=tools,  
    system_message=(
"""
プロのデータアナリストとして建設的なフィードバックを提供してください。フィードバックが反映された場合は 'APPROVE' と回答してください。
"""
    ),  
)  



# シーケンシャルフロー
まず、primary が利用可能なツールを実行し、critic(批評家) がフィードバックを提供するというシンプルなワークフローを作成します。このグラフは、critic(批評家) がライターにコメントした時点で終了します。フローはグラフのすべてのソース ノードとリーフ ノードを自動的に計算し、実行はグラフ内のすべてのソース ノードで開始され、実行するノードがなくなると実行が完了することに注意してください。

In [None]:
# Build the graph
builder = DiGraphBuilder()
builder.add_node(primary_agent).add_node(critic_agent)
builder.add_edge(primary_agent, critic_agent)

# Build and validate the graph
graph = builder.build()

# Create the flow
flow = GraphFlow([primary_agent, critic_agent], graph=graph)

In [None]:
flow._participant_names

In [None]:
with tracer.start_as_current_span("GraphFlow") as rollspan: # ルートスパンを作成
    task = "ユーザーID:123 の出荷状況を確認してください。"

    await Console(flow.run_stream(task=task))

# 結合による並列フロー
ここで、もう少し複雑なフローを作成します。

- 筆者は段落の下書きを作成します。
- 2 人の編集者が独立して文法とスタイルを編集します (並列ファンアウト)。
- 最終レビュー担当者が編集内容を統合します (結合)。

実行はwriterから始まり、 editor1とeditor2に同時に広がり、その後、両方が最終 revieworに送られます。

In [None]:
# Create the writer agent
primary = AssistantAgent(  
    name="primary",  
    model_client=model_client,  
    tools=tools,  
    description="役立つアシスタント。複数のツールを使用して情報を検索し、質問に回答する",
    system_message=(  
"""
あなたは役立つアシスタントです。複数のツールを使用して情報を検索し、質問に回答することができます。
利用可能なツールを確認し、必要に応じて使用してください。ユーザーが不明な点がある場合は、確認のための質問をすることもできます。
"""
)
)  

# Create two editor agents
reviewer1 = AssistantAgent("Reviewer1", model_client=model_client, system_message="批判的なレビュワーとして、以下の段落をレビューしてください。")

reviewer2 = AssistantAgent("Reviewer2", model_client=model_client, system_message="肯定的なレビュワーとして、以下の段落をレビューしてください。")

# Create the final reviewer agent
final_reviewer = AssistantAgent(
    "final_reviewer",
    model_client=model_client,
    system_message="文法とスタイルの修正を統合し、最終版を作成してください。",
)


In [None]:
# Build the workflow graph
builder = DiGraphBuilder()
builder.add_node(primary).add_node(reviewer1).add_node(reviewer2).add_node(final_reviewer)

# Fan-out from writer to editor1 and editor2
builder.add_edge(primary, reviewer1)
builder.add_edge(primary, reviewer2)

# Fan-in both editors into final reviewer
builder.add_edge(reviewer1, final_reviewer)
builder.add_edge(reviewer2, final_reviewer)

# Build and validate the graph
graph = builder.build()

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

flow._graph

In [None]:
flow._participant_names

In [None]:
with tracer.start_as_current_span("GraphFlow") as rollspan: # ルートスパンを作成
    # task = "ユーザーID:123 の出荷状況を確認してください。"
    task = "SNS分析を行い、日付ごとのツイート数を集計してください"
    await Console(flow.run_stream(task=task))

# メッセージフィルタリング
## 実行グラフとメッセージグラフ
`GraphFlow` では、実行グラフは `DiGraph` を使用して定義され、エージェントの実行順序を制御します。ただし、実行グラフはエージェントが他のエージェントから受け取るメッセージを制御しません。デフォルトでは、グラフ内のすべてのエージェントにすべてのメッセージが送信されます。

メッセージフィルタリングは、各エージェントが受信するメッセージをフィルタリングし、そのモデルコンテキストを関連する情報に限定する独立した機能です。メッセージフィルタのセットがフロー内のメッセージグラフを定義します。

メッセージグラフを指定することで、以下の点が改善されます：

- 幻覚の削減
- メモリ負荷の制御
- エージェントを関連する情報にのみ集中させる

これらのルールを定義するには、`MessageFilterAgent` を `MessageFilterConfig` と `PerSourceFilter` と組み合わせて使用できます。


In [None]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

# Create agents
researcher = AssistantAgent(
    "researcher", model_client=model_client, system_message="気候変動に関する主要な事実を要約してください。"
)
analyst = AssistantAgent("analyst", model_client=model_client, system_message="要約を確認し、改善点を提案してください。")
presenter = AssistantAgent(
    "presenter", model_client=model_client, system_message="最終要約に基づいてプレゼンテーションのスライドを作成してください。"
)

# Apply message filtering
filtered_analyst = MessageFilterAgent(
    name="analyst",
    wrapped_agent=analyst,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="researcher", position="last", count=1)]),
)

filtered_presenter = MessageFilterAgent(
    name="presenter",
    wrapped_agent=presenter,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="analyst", position="last", count=1)]),
)

# Build the flow
builder = DiGraphBuilder()
builder.add_node(researcher).add_node(filtered_analyst).add_node(filtered_presenter)
builder.add_edge(researcher, filtered_analyst).add_edge(filtered_analyst, filtered_presenter)
# Build and validate the graph
graph = builder.build()

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph
)


In [None]:
with tracer.start_as_current_span("GraphFlow") as rollspan: # ルートスパンを作成
# Run the flow
    await Console(flow.run_stream(task="気候変動に関する主要な事実を要約してください。"))

# 高度な例: 条件付きループ + フィルタリングされた要約
この例では次のことを示します。

- generator とレビュー担当者間のループ（レビュー担当者が「承認」と言ったときに終了）
- 最初のユーザー入力と最後のレビュー担当者のメッセージのみを表示する要約エージェント



In [None]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter

# Agents
generator = AssistantAgent("generator", model_client=model_client, system_message="創造的なアイデアのリストを作成してください。最初は全く関係のないアイデアを出してください(全く関係のないという意図を明らかにしてはなりません)")
reviewer = AssistantAgent(
    "reviewer",
    model_client=model_client,
    system_message="アイデアを確認し、'REVISE' と入力してフィードバックを提供するか、最終承認の場合は 'APPROVE' と入力してください。",
)
summarizer_core = AssistantAgent(
    "summary", model_client=model_client, system_message="ユーザーの要求を要約し、最終的なフィードバックをまとめます。"
)

# Filtered summarizer
filtered_summarizer = MessageFilterAgent(
    name="summary",
    wrapped_agent=summarizer_core,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="reviewer", position="last", count=1),
        ]
    ),
)

# 条件付きループを使用してグラフを構築する
builder = DiGraphBuilder()
builder.add_node(generator).add_node(reviewer).add_node(filtered_summarizer)
builder.add_edge(generator, reviewer)
builder.add_edge(reviewer, generator, condition="REVISE")
builder.add_edge(reviewer, filtered_summarizer, condition="APPROVE")
builder.set_entry_point(generator)  # ジェネレーターへのエントリポイントを設定します。ソースノードが存在しない場合に必要です。
graph = builder.build()

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)
with tracer.start_as_current_span("GraphFlow") as rollspan: # ルートスパンを作成
    # フローを実行し、コンソールに出力結果を整形して表示します。
    await Console(flow.run_stream(task="プラスチック廃棄物を減らすためのアイデアを出し合おう。"))
