# Magentic-One

Magentic-One は Microsoft Research が開発した汎用マルチエージェントシステムです。
Magentic-One の Orchestrator エージェントは、計画を作成し、他のエージェントにタスクを委任し、目標に向けた進捗状況を追跡して、必要に応じて計画を動的に修正します。Orchestrator は、ファイルの読み取りと処理を行う FileSurfer エージェント、Web ブラウザーを操作する WebSurfer エージェント、またはコードの記述や実行を行う Coder エージェントまたは Computer Terminal エージェントにタスクを委任できます。

https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/magentic-one.html

In [None]:
#!pip install -U "autogen-agentchat"
#!pip install "autogen-ext[magentic-one,openai]"
#!playwright install --with-deps chromium

In [1]:
from typing import Any, Dict, List

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.agents.web_surfer import MultimodalWebSurfer
from autogen_ext.agents.magentic_one import MagenticOneCoderAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import ExternalTermination, TextMentionTermination
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination, TimeoutTermination

## OpenTelemetry によるトレーサーのセット
マルチエージェントのデバッグには OpenTelemetry によるトレーサーを利用すると便利。`OpenAIInstrumentor` を使用して OpenAI コールをキャプチャできます。ここではトレース UI として [Jaeger](https://www.jaegertracing.io/download/) を使用しています。

In [None]:
#!pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-openai

In [2]:
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.openai import OpenAIInstrumentor

service_name = "autogen"

# OTLPエクスポーターの設定 (gRPC経由で送信)
otlp_exporter = OTLPSpanExporter(
    endpoint="http://localhost:4317",  # JaegerのgRPCエンドポイント
)
tracer_provider = TracerProvider(resource=Resource({"service.name": service_name}))
    
# トレーサープロバイダーの設定
trace.set_tracer_provider(tracer_provider)

# バッチスパンプロセッサーを設定
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)

# トレーサーを取得
tracer = tracer_provider.get_tracer(service_name)

OpenAIInstrumentor().instrument()

# Tools の定義
- search_website: Bing Search API を利用して Web検索を実行

In [3]:

async def search_website(query: str) -> str:
    """
    Perform a Bing search for a given query and return the top snippets.

    This function uses the Bing Search V7 API to send a search request with the provided query. It retrieves 
    the top three web page snippets from the search results. The search is performed in Japanese ('jp-JP').

    :param query: The search query string to search for on the web.
    :type query: str
    :return: A list of the top three web page snippets, or an error message if the request fails.
    :rtype: str
    """
    import os,re
    import requests

    # Add your Bing Search V7 subscription key and endpoint to your environment variables.
    subscription_key = os.environ['BING_SEARCH_V7_SUBSCRIPTION_KEY']
    endpoint = os.environ['BING_SEARCH_V7_ENDPOINT']

    # Construct a request
    mkt = 'jp-JP'
    params = { 'q': query, 'mkt': mkt  ,"textDecorations": True, "textFormat": "Raw", "count": 3}
    headers = { 'Ocp-Apim-Subscription-Key': subscription_key }
    # トレーススパンの作成
    with tracer.start_as_current_span("search_website") as span:
        span.set_attribute("query", query)  # クエリを記録

        try:
            response = requests.get(endpoint, headers=headers, params=params)
            response.raise_for_status()

            jsonres = response.json()
            snippets = [item['snippet'] for item in jsonres['webPages']['value']]

            unicode_pattern = r"\\u[0-9a-fA-F]{4}"
            cleaned_text = re.sub(unicode_pattern, "", str(snippets))
            span.set_attribute("result", cleaned_text)  # 結果を記録
            return cleaned_text
        
        except requests.exceptions.Timeout:
            error_message = "Error: Request timed out"
            span.record_exception(Exception(error_message))  # 例外を記録
            return error_message

        except requests.exceptions.ConnectionError:
            error_message = "Error: Failed to connect to the website"
            span.record_exception(Exception(error_message))
            return error_message

        except requests.exceptions.HTTPError as e:
            error_message = f"Error: HTTP {e.response.status_code} - {e.response.reason}"
            span.record_exception(e)
            return error_message

        except Exception as e:
            error_message = f"Error: {str(e)}"
            span.record_exception(e)
            return error_message

## エージェント定義

In [34]:

client = AzureOpenAIChatCompletionClient(
    azure_deployment="<Your_AzureOpenAI_deployment>",
    model="gpt-4o",
    api_key="<Your_AzureOpenAI_key>",
    api_version="2024-08-01-preview",
    azure_endpoint="<Your_AzureOpenAI_endpoint>",
)

travel_agent = AssistantAgent(
      "travel_agent",
      description="あなたは総合トラベルエージェントです",
      model_client=client,
    #tools=[search_website], #WebSurferを使用しない場合
    reflect_on_tool_use=True,
    system_message="""あなたは優秀な総合トラベルエージェントです。
    地方のトラベルエージェントが回答した情報をもとに、最終的な回答を作成します。
    最終回答が完成したら調査結果を要約し、文の最後に TERMINATE を含めること!
    """,
)

surfer = MultimodalWebSurfer(
    "WebSurfer",
    model_client=client,
    to_save_screenshots=True,
    browser_data_dir="./browser_data",
    debug_dir="./debug",
    downloads_folder="./downloads",
)

corder = MagenticOneCoderAgent(
    "Corder",
    model_client=client
)


# Magentic-One の実行

In [None]:
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination, TimeoutTermination
# Define termination condition
max_msg_termination = MaxMessageTermination(max_messages=10)
text_termination = TextMentionTermination("TERMINATE")
time_terminarion = TimeoutTermination(120)
combined_termination = max_msg_termination | text_termination

with tracer.start_as_current_span("magenticone") as rollspan:

    team = MagenticOneGroupChat([travel_agent, surfer], model_client=client, termination_condition=combined_termination, max_turns=10)

    task = """
1週間のドバイ海外旅行で、おすすめの観光地とホテルを教えてください。
"""
    await Console(team.run_stream(task=task))

# 状態管理
AutoGen では状態管理の仕組みが備わっています。エージェント、チーム、終了条件の状態を保存および読み込むことができます。多くの場合、これらのコンポーネントの状態をディスクに保存し、後で再度読み込むと便利です。


## 状態の保存

In [None]:
agent_state = await team.save_state()
print(agent_state)

import json
with open("./magenticone_team_state.json", "w") as f:
    json.dump(agent_state, f)

## 状態のロード

In [131]:
with tracer.start_as_current_span("magenticone") as rollspan:
    ## load state from disk
    with open("./magenticone_team_state.json", "r") as f:
        team_state = json.load(f)

    new_agent_team = MagenticOneGroupChat([travel_agent, surfer], model_client=client, termination_condition=combined_termination, max_turns=10)

    await new_agent_team.load_state(team_state)
    stream = new_agent_team.run_stream(task="さっきのホテルって何て言ったっけ？")
    await Console(stream)