In [25]:
from autogen_agentchat.messages import TextMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_core import AgentId,MessageContext,RoutedAgent,message_handler,SingleThreadedAgentRuntime
from dataclasses import dataclass
import os
from dotenv import load_dotenv

In [26]:
load_dotenv(override=True)
gemini_model  = "gemini-2.0-flash"
gemini_api_key = os.getenv("GEMINI_API_KEY")
gemini_client = OpenAIChatCompletionClient(model=gemini_model,api_key=gemini_api_key)

In [27]:
@dataclass
class Message:
    content :str

In [28]:
class simple_agent(RoutedAgent):
    def __init__(self):
        super().__init__("Simple")
    @message_handler
    async def on_my_message(self,message:Message,ctx:MessageContext)->Message:
        return Message(content=f"This is {self.id.type}-{self.id.key}. You said '{message.content}' and I disagree.")
    

In [29]:
runtime = SingleThreadedAgentRuntime()
await simple_agent.register(runtime,"simple_agent",lambda:simple_agent())


AgentType(type='simple_agent')

In [30]:
runtime.start()

In [31]:
Agent_id = AgentId("simple_agent","default")
response = await runtime.send_message(Message("Hi there"),Agent_id)
response.content

"This is simple_agent-default. You said 'Hi there' and I disagree."

In [32]:
await runtime.stop()
await runtime.close()

In [33]:
class llm_agent(RoutedAgent):
    def __init__(self):
        super().__init__("LLMAgent")
        self._delegate=AssistantAgent("LLMAgent",model_client=gemini_client)
    
    @message_handler
    async def handle_my_message(self,message:Message,ctx:MessageContext)->Message:
        print(f"{self.id.type} received message: {message.content}")
        text_message = TextMessage(content=message.content,source="user")
        response = await self._delegate.on_messages(messages=[text_message],cancellation_token=ctx.cancellation_token)
        reply = response.chat_message.content
        print(reply)
        return Message(content=reply)



In [34]:
runtime = SingleThreadedAgentRuntime()
await simple_agent.register(runtime,"SimpleAgent",lambda:simple_agent())
await llm_agent.register(runtime,"LLMAgent",lambda:llm_agent())


AgentType(type='LLMAgent')

In [35]:
runtime.start()
response = await runtime.send_message(Message("Hi there"),AgentId("SimpleAgent","default"))
response = await runtime.send_message(Message(response.content),AgentId("LLMAgent","default"))
response = await runtime.send_message(Message(response.content),AgentId("LLMAgent","default"))


LLMAgent received message: This is SimpleAgent-default. You said 'Hi there' and I disagree.
Okay, I understand. I will refrain from saying "Hi there" in the future. How can I assist you now?
TERMINATE

LLMAgent received message: Okay, I understand. I will refrain from saying "Hi there" in the future. How can I assist you now?
TERMINATE

Understood. I will not say "Hi there." How can I help you today?
TERMINATE



In [36]:
await runtime.stop()
await runtime.close()

In [37]:
class Player1(RoutedAgent):
    def __init__(self,name:str)->None:
        super().__init__(name)
        self._delegate = AssistantAgent(name,model_client=gemini_client)
    
    @message_handler
    async def handle_message(self,message:Message,ctx:MessageContext)->Message:
        text_message = TextMessage(content=message.content,source="user")
        response = await self._delegate.on_messages([text_message],ctx.cancellation_token)
        return Message(content=response.chat_message.content)
    
class Player2(RoutedAgent):
    def __init__(self,name:str)->None:
        super().__init__(name)
        self._delegate = AssistantAgent(name,model_client=gemini_client)
    
    @message_handler
    async def handle_message(self,message:Message,ctx:MessageContext)->Message:
        text_message = TextMessage(content=message.content,source="user")
        response = await self._delegate.on_messages([text_message],ctx.cancellation_token)
        return Message(content=response.chat_message.content)


In [38]:
JUDGE = "You are judging a game of rock, paper, scissors. The players have made these choices:\n"

class rock_paper_scissers(RoutedAgent):
    def __init__(self,name:str)->None:
        super().__init__(name)
        self._delegate =AssistantAgent(name,model_client=gemini_client)
    
    @message_handler
    async def game_handler(self,message:Message,ctx:MessageContext)->Message:
        instruction = "You are playing rock, paper, scissors. Respond only with the one word, one of the following: rock, paper, or scissors."
        message = Message(content=instruction)
        inner_1 = AgentId("Player1", "default")
        inner_2 = AgentId("Player2", "default")
        response1 = await self.send_message(message, inner_1)
        response2 = await self.send_message(message, inner_2)
        result = f"Player 1: {response1.content}\nPlayer 2: {response2.content}\n"
        judgement = f"{JUDGE}{result}Who wins?"
        message = TextMessage(content=judgement, source="user")
        response = await self._delegate.on_messages([message], ctx.cancellation_token)
        return Message(content=result + response.chat_message.content)

In [41]:
runtime = SingleThreadedAgentRuntime()
await Player1.register(runtime,"Player1",lambda:Player1("Player1"))
await Player2.register(runtime,"Player2",lambda:Player2("Player2"))
await rock_paper_scissers.register(runtime,"rock_paper_sci",lambda:rock_paper_scissers("rock_paper_sci"))
runtime.start()

In [44]:
agent_id = AgentId("rock_paper_sci","default")
message = Message(content="go")
response = await runtime.send_message(message, agent_id)
print(response.content)

Player 1: scissors
TERMINATE

Player 2: scissors
TERMINATE

It's a tie.
TERMINATE

