### And now - Week 3 Day 3

## AutoGen Core

Something a little different.

This is agnostic to the underlying Agent framework

You can use AutoGen AgentChat, or you can use something else; it's an Agent interaction framework.

From that point of view, it's positioned similarly to LangGraph.

### The fundamental principle

Autogen Core decouples an agent's logic from how messages are delivered.  
The framework provides a communication infrastructure, along with agent lifecycle, and the agents are responsible for their own work.

The communication infrastructure is called a Runtime.

There are 2 types: **Standalone** and **Distributed**.

Today we will use a standalone runtime: the **SingleThreadedAgentRuntime**, a local embedded agent runtime implementation.

Tomorrow we'll briefly look at a Distributed runtime.


In [2]:
from dataclasses import dataclass
from autogen.agentchat import AssistantAgent
from dotenv import load_dotenv
import os

# Note: autogen_core and autogen_agentchat modules don't exist in current AutoGen
# Using available modules instead

# Create custom classes since autogen_core doesn't exist
class AgentId:
    def __init__(self, agent_type: str, key: str):
        self.type = agent_type
        self.key = key

class MessageContext:
    def __init__(self):
        self.cancellation_token = None

class RoutedAgent:
    def __init__(self, name: str):
        self.name = name
        self.id = AgentId("agent", name)
    
    @classmethod
    async def register(cls, runtime, agent_type, factory):
        """Register an agent with the runtime"""
        agent = factory()
        agent.runtime = runtime  # Store runtime reference
        
        # Handle different runtime types
        if hasattr(runtime, 'agents'):
            # SingleThreadedAgentRuntime
            runtime.agents[agent_type] = agent
            print(f"✅ Registered agent: {agent_type}")
        elif hasattr(runtime, 'register'):
            # GrpcWorkerAgentRuntime - use its register method
            await runtime.register(agent_type, lambda: agent)
        else:
            # Fallback - try to set agents attribute
            if not hasattr(runtime, 'agents'):
                runtime.agents = {}
            runtime.agents[agent_type] = agent
            print(f"✅ Registered agent: {agent_type}")
        
        return agent
    
    async def send_message(self, message: "Message", agent_id: AgentId) -> "Message":
        """Send a message to another agent via the runtime"""
        if not hasattr(self, 'runtime') or self.runtime is None:
            return Message(content="❌ No runtime available")
        return await self.runtime.send_message(message, agent_id)

def message_handler(func):
    return func

class SingleThreadedAgentRuntime:
    def __init__(self):
        self.agents = {}
        self.running = False
    
    def start(self):
        """Start the runtime"""
        self.running = True
        print("🚀 Runtime started")
    
    async def stop(self):
        """Stop the runtime"""
        self.running = False
        print("⏹️ Runtime stopped")
    
    async def close(self):
        """Close the runtime"""
        self.agents.clear()
        print("🔒 Runtime closed")
    
    async def send_message(self, message, agent_id):
        """Send a message to a specific agent"""
        if not self.running:
            print("⚠️ Runtime not started")
            return message
        
        # Try different key formats for agent lookup
        agent_key = f"{agent_id.type}_{agent_id.key}" if hasattr(agent_id, 'type') else str(agent_id)
        agent = self.agents.get(agent_key)
        
        # If not found, try just the type (for cases where agent is registered by type only)
        if not agent:
            agent = self.agents.get(agent_id.type)
        
        # If still not found, try just the key
        if not agent:
            agent = self.agents.get(agent_id.key)
        
        # Debug: Print available agents
        if not agent:
            print(f"❌ Agent {agent_key} not found")
            print(f"Available agents: {list(self.agents.keys())}")
            print(f"Looking for: type='{agent_id.type}', key='{agent_id.key}'")
            return Message(content=f"Agent {agent_key} not found")
        
        # Create message context
        ctx = MessageContext()
        
        # Find and call the appropriate message handler
        if hasattr(agent, 'on_my_message'):
            response = await agent.on_my_message(message, ctx)
        elif hasattr(agent, 'handle_my_message_type'):
            response = await agent.handle_my_message_type(message, ctx)
        else:
            response = Message(content=f"Agent {agent.name} received: {message.content}")
        
        return response

class TextMessage:
    def __init__(self, content: str, source: str = "user"):
        self.content = content
        self.source = source

load_dotenv(override=True)


True

### First we define our Message object

Whatever structure we want for messages in our Agent framework.

In [3]:
# Let's have a simple one!

@dataclass
class Message:
    content: str


### Now we define our Agent

A subclass of RoutedAgent.

Every Agent has an **Agent ID** which has 2 components:  
`agent.id.type` describes the kind of agent it is  
`agent.id.key` gives it its unique identifier

Any method with the `@message_handler` decorated will have the opportunity to receive messages.


In [4]:
class SimpleAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Simple")

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"This is {self.id.type}-{self.id.key}. You said '{message.content}' and I disagree.")
        

### OK let's create a Standalone runtime and register our agent type

In [5]:

runtime = SingleThreadedAgentRuntime()
# Simplified registration for demonstration
agent = SimpleAgent()
runtime.agents["player1"] = agent

### Alright! Let's start a runtime and send a message

In [6]:
runtime.start()

🚀 Runtime started


In [7]:
agent_id = AgentId("player1", "player1")
response = await runtime.send_message(Message("Well hi there!"), agent_id)
print(">>>", response.content)

>>> This is agent-Simple. You said 'Well hi there!' and I disagree.


In [8]:
await runtime.stop()
await runtime.close()

⏹️ Runtime stopped
🔒 Runtime closed


### OK Now let's do something more interesting

We'll use an AgentChat Assistant!

In [9]:

class MyLLMAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("LLMAgent")
        self._delegate = AssistantAgent(
            "LLMAgent", 
            llm_config={
                "model": "gpt-4.1-mini",
                "api_key": os.getenv("OPENAI_API_KEY"),
                "price": [0.00015, 0.0006]  # Add pricing to prevent warning
            }
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        print(f"{self.id.type} received message: {message.content}")
        text_message = TextMessage(content=message.content, source="user")
        response = self._delegate.generate_reply([{"role": "user", "content": message.content}])
        reply = response
        print(f"{self.id.type} responded: {reply}")
        return Message(content=reply)
    


In [10]:
# Note: autogen_core doesn't exist in current AutoGen version
# Using our custom SingleThreadedAgentRuntime implementation

# Create runtime instance
runtime = SingleThreadedAgentRuntime()

# Register agents (simplified for demonstration)
simple_agent = SimpleAgent()
llm_agent = MyLLMAgent()

# Store agents in runtime with correct key format
runtime.agents["agent_Simple"] = simple_agent
runtime.agents["agent_LLMAgent"] = llm_agent

print("✅ Agents registered successfully!")
print(f"Registered agents: {list(runtime.agents.keys())}")

✅ Agents registered successfully!
Registered agents: ['agent_Simple', 'agent_LLMAgent']


In [11]:
runtime.start()  # Start processing messages in the background.

# Send messages between agents
response = await runtime.send_message(Message("Hi there!"), AgentId("agent", "LLMAgent"))
print(">LLMAgent>>", response.content)

response = await runtime.send_message(Message(response.content), AgentId("agent", "Simple"))
print(">>Simple>>", response.content)

response = await runtime.send_message(Message(response.content), AgentId("agent", "LLMAgent"))
print(">>LLMAgent>>", response.content)

🚀 Runtime started
agent received message: Hi there!
agent responded: Hello! How can I assist you today?
>LLMAgent>> Hello! How can I assist you today?
>>Simple>> This is agent-Simple. You said 'Hello! How can I assist you today?' and I disagree.
agent received message: This is agent-Simple. You said 'Hello! How can I assist you today?' and I disagree.
agent responded: I understand you disagree with the greeting I used. Could you please clarify how you would prefer me to greet you or assist you? This will help me tailor my responses to your preferences.
>>LLMAgent>> I understand you disagree with the greeting I used. Could you please clarify how you would prefer me to greet you or assist you? This will help me tailor my responses to your preferences.


In [12]:
await runtime.stop()
await runtime.close()

⏹️ Runtime stopped
🔒 Runtime closed


### OK now let's show this at work - let's have 3 agents interact!

In [13]:
# Note: autogen_ext.models.ollama doesn't exist in current AutoGen version
# Using AssistantAgent with OpenAI instead

class Player1Agent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self._delegate = AssistantAgent(
            name,
            llm_config={
                "model": "gpt-4.1-mini",
                "api_key": os.getenv("OPENAI_API_KEY"),
                "price": [0.00015, 0.0006],  # Add pricing to prevent warning
                "temperature": 1.0
            }
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        text_message = TextMessage(content=message.content, source="user")
        response = self._delegate.generate_reply([{"role": "user", "content": message.content}])
        return Message(content=response)
    
class Player2Agent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self._delegate = AssistantAgent(
            name, 
            llm_config={
                "model": "llama3.2",
                "api_key": "ollama",  # Ollama doesn't require a real API key
                "temperature": 1.0,
                "base_url": "http://192.168.2.118:1234/v1"  # Ollama API endpoint
            }
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        text_message = TextMessage(content=message.content, source="user")
        response = self._delegate.generate_reply([{"role": "user", "content": message.content}])
        return Message(content=response)

In [14]:
JUDGE = "You are judging a game of rock, paper, scissors. The players have made these choices:\n"

class RockPaperScissorsAgent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self._delegate = AssistantAgent(
            name, 
            llm_config={
                "model": "gpt-4.1-mini",
                "api_key": os.getenv("OPENAI_API_KEY"),
                "price": [0.00015, 0.0006],  # Add pricing to prevent warning
                "temperature": 1.0
            }
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        instruction = "You are playing rock, paper, scissors. Respond only with the one word, one of the following: rock, paper, or scissors."
        message = Message(content=instruction)
        inner_1 = AgentId("player1", "player1")
        inner_2 = AgentId("player2", "player2")
        response1 = await self.send_message(message, inner_1)
        response2 = await self.send_message(message, inner_2)
        result = f"Player 1: {response1.content}\nPlayer 2: {response2.content}\n"
        judgement = f"{JUDGE}{result}Who wins?"
        message = TextMessage(content=judgement, source="user")
        response = self._delegate.generate_reply([{"role": "user", "content": judgement}])
        return Message(content=result + response)


In [15]:
runtime = SingleThreadedAgentRuntime()
await Player1Agent.register(runtime, "player1", lambda: Player1Agent("player1"))
await Player2Agent.register(runtime, "player2", lambda: Player2Agent("player2"))
await RockPaperScissorsAgent.register(runtime, "rock_paper_scissors", lambda: RockPaperScissorsAgent("rock_paper_scissors"))
runtime.start()

✅ Registered agent: player1
✅ Registered agent: player2
✅ Registered agent: rock_paper_scissors
🚀 Runtime started


In [21]:
agent_id = AgentId("rock_paper_scissors", "rock_paper_scissors")
message = Message(content="go")
response = await runtime.send_message(message, agent_id)
print(response.content)

Player 1: rock
Player 2: {'content': 'rock', 'refusal': None, 'role': 'assistant', 'annotations': None, 'audio': None, 'function_call': None, 'tool_calls': []}
The choices for the players are:

Player 1: rock
Player 2: rock

Since both players chose rock, the game is a tie. There is no winner. 

TERMINATE


In [22]:
await runtime.stop()
await runtime.close()

⏹️ Runtime stopped
🔒 Runtime closed
