# AutoGen: Multi-Agent Orchestration

## Setup

In [None]:
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent, UserProxyAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import (
    ExternalTermination,
    HandoffTermination,
    MaxMessageTermination,
    TextMentionTermination,
)
from autogen_agentchat.messages import (
    AgentEvent,
    ChatMessage,
    HandoffMessage,
    MultiModalMessage,
    TextMessage,
)
from autogen_agentchat.teams import (
    MagenticOneGroupChat,
    RoundRobinGroupChat,
    SelectorGroupChat,
    Swarm,
)
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_core import Image as AGImage
from autogen_core.code_executor import CodeBlock
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.tools import FunctionTool
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai import (
    AzureOpenAIChatCompletionClient,
    OpenAIChatCompletionClient,
)
from autogen_ext.tools import semantic_kernel
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

## Connection

In [None]:
# api token auth
# model connection
model_client = AzureOpenAIChatCompletionClient(
    model="gpt-4o-mini",
    api_version="2024-10-21",
    azure_endpoint="https://XXXXXXXXXXXXXXXXXXXXXXXXX.openai.azure.com",
    api_key="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
)

# azure token auth
# need to have "Cog Services OpenAI User" role assinged in cog services resource
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
az_model_client = AzureOpenAIChatCompletionClient(
    azure_deployment="gpt-4o-mini",
    model="gpt-4o-mini",
    api_version="2024-10-21",
    azure_endpoint="https://XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.openai.azure.com/",
    azure_ad_token_provider=token_provider,
)

## Tools

In [None]:
# tools
async def get_weather(city: str) -> str:
    """Get the weather/temperature for a given city."""
    if city.upper() == "CHICAGO":
        temp = 12
    elif city.upper() == "NEW YORK":
        temp = 43
    elif city.upper() == "NOTRE DAME":
        temp = 1842
    else:
        temp = 79
    return f"The weather in {city} is {temp} degrees."


async def web_search(query: str) -> str:
    """Find information on the web"""
    if "notre dame" in query.lower():
        return "Notre Dame is the best college in the United States"
    else:
        return "no information found"

In [None]:
web_search_function_tool = FunctionTool(
    web_search, description="Find information on the web"
)
web_search_function_tool.schema

## Assistant Agent

In [None]:
# define agent
# has built-in menory/history, maintains state on method calls
agent = AssistantAgent(
    name="question_agent",
    model_client=model_client,
    tools=[get_weather, web_search],
    system_message="""
    Use tools to solve tasks 
    1. talk in a pirate accent
    2. answer in complete sentences 
    """,
    reflect_on_tool_use=True,
    model_client_stream=True,
)

In [None]:
# to limit history, use model_context
# model_context=BufferedChatCompletionContext(buffer_size=5)

## Messages

In [None]:
# message agent
# same as agent run()
response = await agent.on_messages(
    [TextMessage(content="Tell me about Notre Dame", source="user")],
    cancellation_token=CancellationToken(),
)
print(response.inner_messages)
print(response.chat_message)

In [None]:
# stream message
# same as agent.run_stream()
response = await Console(
    agent.on_messages_stream(
        [TextMessage(content="Tell me about Notre Dame", source="user")],
        cancellation_token=CancellationToken(),
    ),
    output_stats=True,
)

In [None]:
# multi-modal messages
# img = AGImage(pil_image)
# multi_modal_message = MultiModalMessage(content=["Can you describe the content of this image?", img], source="User")

## State

In [None]:
agent_state = await agent.save_state()
print(agent_state)
await agent.load_state(agent_state)

## Pre-Built Agents

In [None]:
# UserProxyAgent: An agent that takes user input returns it as responses.
# CodeExecutorAgent: An agent that can execute code.
# MultimodalWebSurfer: A multi-modal agent that can search the web and visit web pages for information.
# FileSurfer: An agent that can search and browse local files for information.
# VideoSurfer: An agent that can watch videos for information.

In [None]:
# code executor
# note: need to install docker on host system
code_executor = DockerCommandLineCodeExecutor(work_dir="coding")
code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor)
await code_executor.start()
task = TextMessage(
    content="""Here is some code
```python
x=2+2
print(x)
```
""",
    source="user",
)
response = await code_executor_agent.on_messages([task], CancellationToken())
await code_executor.stop()
print(response)
print(response.chat_message.content)

## Teams: RoundRobin Group Chat 

In [None]:
# define agents
# agent 1
researcher_agent = AssistantAgent(
    "researcher",
    model_client=model_client,
    system_message="""
    You are a helpful AI assistant for conducting scientific research. 
    You are an expert in biology and chemistry. 
    """,
)
# agent 2
critic_agent = AssistantAgent(
    "critic",
    model_client=model_client,
    system_message="""
    Provide constructive feedback to the researcher.
    You are an expert in biology and chemistry. 
    Respond with 'APPROVE' to when your feedbacks are addressed. 
    Do not approve the first version you read, review the revision(s) and confrim it has been improved per your reccomendations
    """,
)

In [None]:
# termination
text_termination = TextMentionTermination("APPROVE")
# other options:
# messages -> MaxMessageTermination
# tokens -> TokenUsageTermination
# timeout -> TimeoutTermination
# stop message -> StopMessageTermination
# tool call -> FunctionCallTermination
# external control -> ExternalTermination
# - run = asyncio.create_task(Console(team.run_stream(task="...")))
# - external_termination = ExternalTermination()
# - use external_termination.set() to stop async run
# - use cancellation_token.cancel() to cancel async run

In [None]:
# define team
team = RoundRobinGroupChat(
    participants=[researcher_agent, critic_agent],
    termination_condition=text_termination,
    max_turns=10,
)

In [None]:
# run team
result = await team.run(task="Write a summary about the central dogma of biology")

In [None]:
# messages
for each_message in result.messages:
    print(each_message, end="\n\n")

In [None]:
# stream messages
await team.reset()
async for message in team.run_stream(
    task="Write a summary about the central dogma of biology"
):
    if isinstance(message, TaskResult):
        print("Stop Reason:", message.stop_reason)
    else:
        print(message)

In [None]:
# console printing
await team.reset()
reponse = await Console(
    team.run_stream(task="Write a summary about the central dogma of biology"),
    output_stats=True,
)

## Teams: Selector Group Chat

In [None]:
# note: for reasoning models (i.e. o3-mini), you don't need a planning agent

### tools

In [None]:
def bank_balance_tool(year: str) -> str:
    """
    Tool that looks up bank balance in US Dollars for a year (4-digits, i.e. 1999)
    """
    if year == "2020":
        return f"Your bank balance was $1,000 in {year}"
    elif year == "2021":
        return f"Your bank balance was $5,000 in {year}"
    elif year == "2022":
        return f"Your bank balance was $10,000 in {year}"
    elif year == "2023":
        return f"Your bank balance was $100,000 in {year}"
    elif year == "2024":
        return f"Your bank balance was $200,000 in {year}"
    elif year == "2025":
        return f"Your bank balance is $500,000 in {year}, wow!"
    else:
        return f"No Data for {year}"


def bank_credit_apply_tool(balance: float | int) -> str:
    """
    Tools that uses a customer's bank balance to determine if they are approved for a line of credit
    """
    if balance >= 100_000:
        return "Approved"
    else:
        return "Not Approved"


def average_tool(list_of_numbers: list[int | float]) -> float:
    """
    Tool that calcualtes the average (mean) of an array of numbers
    """
    n = len(list_of_numbers)
    sum_of_numbers = sum(list_of_numbers)
    return sum_of_numbers / n

### agents

In [None]:
# planning agent
planning_agent = AssistantAgent(
    "PlanningAgent",
    description="An agent for planning tasks, this agent should be the first to engage when given a new task.",
    model_client=model_client,
    system_message="""
    You are a planning agent.
    Your job is to break down complex tasks into smaller, manageable subtasks.
    
    Your team members are:
        BankLookupAgent: Searches for bank balance information
        DataAnalystAgent: Performs calculations
        
    You only plan and delegate tasks - you do not execute them yourself.
    
    When assigning tasks, use this format:
    1. <agent> : <task>
    
    After all tasks are completed by other agents, summarize the findings and end with "TERMINATE".
    """,
)

# agent 2
bank_agent = AssistantAgent(
    "BankAgent",
    description="An agent for getting bank information and applying for credit",
    tools=[bank_balance_tool, bank_credit_apply_tool],
    model_client=model_client,
    system_message="""
    You are a bank agent.
    Your tools are 
        1. bank_balance_tool  - use it to find the customer's bank balance for a given year
        2. bank_credit_apply_tool - use it to apply for credit
    You make only one search call at a time.
    Once you have the results, do not do any calculations with them.
    """,
)

# agent 3
data_analyst_agent = AssistantAgent(
    "DataAnalystAgent",
    description="An agent for performing calculations",
    model_client=model_client,
    tools=[average_tool],
    system_message="""
    You are a data analyst.
    Given the tasks you have been assigned, you should analyze the data and provide results using the tools provided.
    If you do not have the data needed, ask the other agents for it and then try again. 
    """,
)

# termination conditions
text_mention_termination = TextMentionTermination("TERMINATE")
max_messages_termination = MaxMessageTermination(max_messages=15)
termination = text_mention_termination | max_messages_termination

## team

In [None]:
selector_prompt = """
Select an agent to perform task.
{roles}
Current conversation context:
{history}
Read the above conversation, then select an agent from {participants} to perform the next task.
Make sure the planner agent has assigned tasks before other agents start working.
Only select one agent.
"""

In [None]:
team = SelectorGroupChat(
    [planning_agent, bank_agent, data_analyst_agent],
    model_client=model_client,
    termination_condition=termination,
    selector_prompt=selector_prompt,
    allow_repeated_speaker=True,
)

### run

In [None]:
task = """
Instructions:
Determine my average bank balance for the years 2019-2025, and then check if the bank will aprove a line of credit for me
Note: If a year has no bank blaance data, then ignore that year, do not use $0 for that year
"""
await Console(team.run_stream(task=task))

In [None]:
# note: use selector_func param to customize selection process/sequence
def selector_func(messages) -> str | None:
    if messages[-1].source != planning_agent.name:
        return planning_agent.name
    return None


team = SelectorGroupChat(
    [planning_agent, bank_agent, data_analyst_agent],
    model_client=model_client,
    termination_condition=termination,
    selector_prompt=selector_prompt,
    allow_repeated_speaker=True,
)

await Console(team.run_stream(task=task))

## Teams: Swarm 

In [None]:
# note: swam does not use a "planner agent"
# agent 1
bank_agent = AssistantAgent(
    "BankAgent",
    description="An agent for getting bank information and applying for credit",
    tools=[bank_balance_tool, bank_credit_apply_tool],
    model_client=model_client,
    handoffs=["data_analyst_agent", "user"],
    system_message="""
    You are a bank agent.
    Your tools are 
        1. bank_balance_tool  - use it to find the customer's bank balance for a given year
        2. bank_credit_apply_tool - use it to apply for credit
    You make only one search call at a time.
    Once you have the results, do not do any calculations with them.
    """,
)

# agent 2
data_analyst_agent = AssistantAgent(
    "DataAnalystAgent",
    description="An agent for performing calculations",
    model_client=model_client,
    handoffs=["bank_agent", "user"],
    tools=[average_tool],
    system_message="""
    You are a data analyst.
    Given the tasks you have been assigned, you should analyze the data and provide results using the tools provided.
    If you do not have the data needed, ask the other agents for it and then try again. 
    """,
)

# termination
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm([bank_agent, data_analyst_agent], termination_condition=termination)

In [None]:
task = """
Instructions:
Determine my average bank balance for the years 20024-2025, and then check if the bank will aprove a line of credit for me
"""

task_result = await Console(team.run_stream(task=task))
last_message = task_result.messages[-1]

while isinstance(last_message, HandoffMessage) and last_message.target == "user":
    user_message = input("User: ")

    task_result = await Console(
        team.run_stream(
            task=HandoffMessage(
                source="user", target=last_message.source, content=user_message
            )
        )
    )
    last_message = task_result.messages[-1]