In [1]:
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from autogen_core import SingleThreadedAgentRuntime
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from dotenv import load_dotenv

# Load settings from a .env file before any model client is created below.
# NOTE(review): override=True presumably lets .env values replace variables that are
# already set in the environment — confirm against the python-dotenv docs.
load_dotenv(override=True)

True

In [2]:
@dataclass
class Message:
    """Plain text payload exchanged between the agents in this notebook."""

    content: str

In [3]:
class SimpleAgent(RoutedAgent):
    """Agent that answers every ``Message`` with a canned disagreement."""

    def __init__(self) -> None:
        super().__init__("Simple")

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        """Reply with this agent's type and key, echoing the text it received.

        ``ctx`` is accepted but not used.
        """
        sender_label = f"{self.id.type}-{self.id.key}"
        reply_text = f"This is {sender_label}. You said '{message.content}' and I disagee."
        return Message(content=reply_text)

Creating a Single-Threaded Standalone Runtime

In [4]:
# Create the runtime, then register SimpleAgent on it under the type name "simple_agent".
# The lambda is the factory handed to register(); presumably the runtime calls it
# when it needs to build the agent — TODO confirm.
runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())

AgentType(type='simple_agent')

In [5]:
# Start the runtime; the following cells send messages through it.
runtime.start()

In [6]:
# Address the agent by its registered type ("simple_agent") plus the key "default".
agent_id = AgentId("simple_agent", "default")
# Awaiting send_message yields the handler's reply, which is a Message.
response = await runtime.send_message(Message("Well, hi there!"), agent_id)
print(">>>", response.content)

>>> This is simple_agent-default. You said 'Well, hi there!' and I disagee.


In [7]:
# Shut the runtime down: stop it first, then close it.
await runtime.stop()
await runtime.close()

Using AgentChat Assistant

In [8]:
class MyLLMAgent(RoutedAgent):
    """Routed agent that forwards each ``Message`` to an ``AssistantAgent`` delegate."""

    def __init__(self) -> None:
        super().__init__("LLMAgent")
        self._delegate = AssistantAgent(
            "LLMAgent",
            model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Pass the incoming text to the delegate and return its reply as a ``Message``.

        Both the received text and the reply are printed.
        """
        agent_type = self.id.type
        print(f"{agent_type} received message: {message.content}")
        user_turn = TextMessage(content=message.content, source="user")
        delegate_result = await self._delegate.on_messages([user_turn], ctx.cancellation_token)
        answer = delegate_result.chat_message.content
        print(f"{agent_type} responded: {answer}")
        return Message(content=answer)

In [9]:
from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())
await MyLLMAgent.register(runtime, "LLMAgent", lambda: MyLLMAgent())

AgentType(type='LLMAgent')

In [10]:
runtime.start()
response = await runtime.send_message(Message("Hi there!"), AgentId("LLMAgent", "default"))
print(">>>", response.content)
response = await runtime.send_message(Message(response.content), AgentId("simple_agent", "default"))
print(">>>", response.content)
response = await runtime.send_message(Message(response.content), AgentId("LLMAgent", "default"))

LLMAgent received message: Hi there!
LLMAgent responded: Hello! How can I assist you today?
>>> Hello! How can I assist you today?
>>> This is simple_agent-default. You said 'Hello! How can I assist you today?' and I disagee.
LLMAgent received message: This is simple_agent-default. You said 'Hello! How can I assist you today?' and I disagee.
LLMAgent responded: I appreciate your feedback! How can I better assist you today?


In [11]:
# Shut the runtime down: stop it first, then close it.
await runtime.stop()
await runtime.close()

Create Rock Paper Scissors with Three Agents

In [13]:
class _LLMPlayerAgent(RoutedAgent):
    """Shared implementation for the players: relay each ``Message`` to an LLM delegate.

    The two player classes used to be copy-pasted and differed only in the model
    name, so the common code lives here and subclasses just override ``MODEL``.
    """

    # Chat model passed to OpenAIChatCompletionClient; overridden per player.
    MODEL: str = "gpt-4o-mini"
    # Sampling temperature passed to the model client (same for both players).
    TEMPERATURE: float = 1.0

    def __init__(self, name: str) -> None:
        """Create the agent and its ``AssistantAgent`` delegate, both using ``name``."""
        super().__init__(name)
        model_client = OpenAIChatCompletionClient(model=self.MODEL, temperature=self.TEMPERATURE)
        self._delegate = AssistantAgent(name, model_client=model_client)

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Send the incoming text to the delegate as a user turn and return its reply."""
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], ctx.cancellation_token)
        return Message(content=response.chat_message.content)


class Player1Agent(_LLMPlayerAgent):
    """First player, backed by ``gpt-4o-mini``."""

    MODEL = "gpt-4o-mini"


class Player2Agent(_LLMPlayerAgent):
    """Second player, backed by ``gpt-4``."""

    MODEL = "gpt-4"

In [15]:
# Opening of the prompt given to the judging model; the players' moves are appended to it.
JUDGE = "You are judging a game of Rock, Paper and Scissors. The players have made these choices: \n"


class RockPaperScissorAgent(RoutedAgent):
    """Referee agent: asks both players for a move, then has an LLM delegate pick the winner."""

    def __init__(self, name: str) -> None:
        """Create the agent and its ``AssistantAgent`` delegate, both using ``name``."""
        super().__init__(name)
        model_client = OpenAIChatCompletionClient(model="gpt-4o-mini", temperature=1.0)
        self._delegate = AssistantAgent(name, model_client=model_client)

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Run one round and return the players' moves followed by the delegate's verdict.

        The content of ``message`` is ignored; receiving any ``Message`` triggers a round.
        The players are addressed as agent types "player1" and "player2" with key "default".
        """
        instruction = "You are playing Rock, Paper and Scissors. Respond with only one word out of: Rock, Paper or Scissors"
        # Use dedicated names rather than rebinding the `message` parameter, which
        # previously ended up holding a TextMessage despite its Message annotation.
        move_request = Message(content=instruction)
        player1_id = AgentId("player1", "default")
        player2_id = AgentId("player2", "default")
        response1 = await self.send_message(move_request, player1_id)
        response2 = await self.send_message(move_request, player2_id)
        result = f"Player 1 : {response1.content}\n Player 2 : {response2.content}\n"
        judgement = f"{JUDGE}{result} Who wins?"
        judge_request = TextMessage(content=judgement, source="user")
        response = await self._delegate.on_messages([judge_request], ctx.cancellation_token)
        return Message(content=result + response.chat_message.content)

In [16]:
# New runtime for the game: register the two players and the referee, then start it.
# The type names "player1" and "player2" must match the AgentIds that
# RockPaperScissorAgent sends its move requests to.
runtime = SingleThreadedAgentRuntime()
await Player1Agent.register(runtime, "player1", lambda : Player1Agent("player1"))
await Player2Agent.register(runtime, "player2", lambda : Player2Agent("player2"))
await RockPaperScissorAgent.register(runtime, "rock_paper_scissors", lambda : RockPaperScissorAgent("rock_paper_scissors"))
runtime.start()

In [17]:
# Play one round by messaging the referee agent.
agent_id = AgentId("rock_paper_scissors", "default")
# The referee's handler does not read the message text, so "go" is only a trigger.
message = Message(content = "go")
response = await runtime.send_message(message, agent_id)
# The reply holds both players' moves followed by the judge's verdict.
print(response.content)

Player 1 : Rock
 Player 2 : Paper
Player 2 wins as Paper covers Rock. TERMINATE


In [18]:
# Shut the runtime down: stop it first, then close it.
await runtime.stop()
await runtime.close()