In [2]:
from dotenv import load_dotenv
load_dotenv()

True

In [6]:
from typing import AsyncGenerator, List, Sequence, Tuple

from autogen_agentchat.agents import BaseChatAgent, AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken

from typing import Callable, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import ChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


Custom Agents
You may have agents with behaviors that do not fall into a preset. In such cases, you can build custom agents.

All agents in AgentChat inherit from BaseChatAgent class and implement the following abstract methods and attributes:

on_messages(): The abstract method that defines the behavior of the agent in response to messages. This method is called when the agent is asked to provide a response in run(). It returns a Response object.

on_reset(): The abstract method that resets the agent to its initial state. This method is called when the agent is asked to reset itself.

produced_message_types: The list of possible ChatMessage message types the agent can produce in its response.

Optionally, you can implement the the on_messages_stream() method to stream messages as they are generated by the agent. If this method is not implemented, the agent uses the default implementation of on_messages_stream() that calls the on_messages() method and yields all messages in the response.

In [7]:
class CustomAgent(BaseChatAgent):
    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        return Response(chat_message=TextMessage(content="Custom reply", source=self.name))

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

In [4]:

class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Calls the on_messages_stream.
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        inner_messages: List[AgentEvent | ChatMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        # The response is returned at the end of the stream.
        # It contains the final message and all the inner messages.
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countdown_agent() -> None:
    # Create a countdown agent.
    countdown_agent = CountDownAgent("countdown")

    # Run the agent with a given task and stream the response.
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)


# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()

3...
2...
1...
Done!


In [5]:
class ArithmeticAgent(BaseChatAgent):
    def __init__(self,name:str,description: str, operator_func: Callable[[int],[int]]) -> None:
        super().__init__(name,description=description)
        self._operator_func = operator_func
        self._message_history: List[ChatMessage] = []
    
    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)
    
    async def on_messages(self,messages: Sequence[ChatMessage],cancellation_token: CancellationToken) -> Response:
        self._message_history.extend(messages)

        assert isinstance(self._message_history[-1],TextMessage)
        number=int(self._message_history[-1].content)   
        result = self._operator_func(number)

        response_message = TextMessage(content=str(result),source=self.name)

        self._message_history.append(response_message)

        return Response(chat_message=response_message)
    
    async def on_reset(self,cancellation_token: CancellationToken) -> None:
        self._message_history = []
        pass

    

In [10]:
async def run_number_agents() -> None:
    # Create agents for number operations.
    add_agent = ArithmeticAgent("add_agent", "Adds 2 to the number.", lambda x: x + 2)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
    subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
    divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
    identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)

    # The termination condition is to stop after 10 messages.
    termination_condition = MaxMessageTermination(10)

    # Create a selector group chat.
    selector_group_chat = SelectorGroupChat(
        [add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
        termination_condition=termination_condition,
        allow_repeated_speaker=True,  # Allow the same agent to speak multiple times, necessary for this task.
        selector_prompt=(
            "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
            "Current conversation history:\n{history}\n"
            "Please select the most appropriate role for the next message, and only return the role name."
        ),
    )

    # Run the selector group chat with a given task and stream the response.
    task: List[ChatMessage] = [
        TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
        TextMessage(content="10", source="user"),
    ]
    stream = selector_group_chat.run_stream(task=task)
    await Console(stream)


# Use asyncio.run(run_number_agents()) when running in a script.
await run_number_agents()

---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- multiply_agent ----------
20
---------- add_agent ----------
22
---------- add_agent ----------
24
---------- add_agent ----------
26
---------- subtract_agent ----------
25
---------- identity_agent ----------
25
---------- identity_agent ----------
25
---------- identity_agent ----------
25


In [7]:
#Custom Response Type
from typing import Literal

from pydantic import BaseModel


# The response format for the agent as a Pydantic base model.
class AgentResponse(BaseModel):
    thoughts: str
    response: Literal["happy", "sad", "neutral"]


# Create an agent that uses the OpenAI GPT-4o model with the custom response format.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o-mini",
    response_format=AgentResponse,  # type: ignore
)
agent = AssistantAgent(
    "assistant",
    model_client=model_client,
    system_message="Categorize the input as happy, sad, or neutral following the JSON format.",
)

await Console(agent.run_stream(task="I am happy."))

---------- user ----------
I am happy.
---------- assistant ----------
{"thoughts":"The user expressed a positive emotion by stating they are happy, indicating a joyful or content state.","response":"happy"}


TaskResult(messages=[TextMessage(source='user', models_usage=None, content='I am happy.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=89, completion_tokens=28), content='{"thoughts":"The user expressed a positive emotion by stating they are happy, indicating a joyful or content state.","response":"happy"}', type='TextMessage')], stop_reason=None)