In [2]:
from dataclasses import dataclass # decorator is used to automically generate boilerplate code for classes that are mainly used to store data (like __init__, __repr__, __eq__, etc.).
from typing import Callable

from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler


@dataclass
class Message:
    content: int


@default_subscription
class Modifier(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nModifier:\nModified {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())  # type: ignore


@default_subscription
class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\nChecker:\n{message.content} passed the check, continue.")
            await self.publish_message(Message(content=message.content), DefaultTopicId())
        else:
            print(f"{'-'*80}\nChecker:\n{message.content} failed the check, stopping.")


### ✅ 1. **The key idea: `publish_message` → routed by the runtime**

This multi-agent setup uses **`autogen_core`**, where each agent is **"routed"** by a runtime (`SingleThreadedAgentRuntime`). Agents don’t call each other directly — they send messages to a **shared topic**, and the **runtime routes messages** to all subscribed agents.

---

### ✅ 2. **What connects them: `DefaultTopicId()`**

Both agents use:

```python
await self.publish_message(..., DefaultTopicId())
```

This means:

* They are **both subscribed to the default topic**.
* When an agent **publishes a message**, **all agents** listening to that topic receive it.
* Message flow is **broadcasted**, and agents decide **individually** whether to act.

---

### 🔄 3. **How the flow works:**

#### 🔹 Step-by-step:

1. `Checker` receives the initial message:

   ```python
   await runtime.send_message(Message(10), AgentId("checker", "default"))
   ```

2. `Checker` checks the value:

   ```python
   if not self._run_until(message.content):  # i.e., value > 1
   ```

3. If it **passes**, `Checker` **publishes** the same message to the **default topic**:

   ```python
   await self.publish_message(Message(content=message.content), DefaultTopicId())
   ```

4. `Modifier`, which is also subscribed to that topic, receives the message:

   ```python
   val = self._modify_val(message.content)  # subtract 1
   ```

5. `Modifier` then **publishes** the **modified value** again → loop continues.

---

### 🔁 4. **Loop until exit condition is met**

This creates a message loop:

```
Checker → Modifier → Checker → Modifier → ...
```

...until `Checker` sees a value that **fails** its condition (`<= 1`), and **does not re-publish**.

---


### ✅ Summary:

| Component          | Role                                            |
| ------------------ | ----------------------------------------------- |
| `Checker`          | Checks the condition, re-publishes if valid     |
| `Modifier`         | Modifies value (subtracts 1), re-publishes      |
| `DefaultTopicId()` | Shared channel/topic for message exchange       |
| `runtime`          | Routes messages between agents on shared topics |

---

Let me know if you'd like a flow diagram or to visualize the message loop!


In [3]:
from autogen_core import AgentId, SingleThreadedAgentRuntime

# Create a local embedded runtime.
runtime = SingleThreadedAgentRuntime()

# Register the modifier and checker agents by providing
# their agent types, the factory functions for creating instance and subscriptions.
await Modifier.register(
    runtime,
    "modifier",
    # Modify the value by subtracting 1
    lambda: Modifier(modify_val=lambda x: x - 1),
)

await Checker.register(
    runtime,
    "checker",
    # Run until the value is less than or equal to 1
    lambda: Checker(run_until=lambda x: x <= 1),
)

# Start the runtime and send a direct message to the checker.
runtime.start()
await runtime.send_message(Message(10), AgentId("checker", "default"))
await runtime.stop_when_idle()


--------------------------------------------------------------------------------
Checker:
10 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 10 to 9
--------------------------------------------------------------------------------
Checker:
9 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 9 to 8
--------------------------------------------------------------------------------
Checker:
8 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 8 to 7
--------------------------------------------------------------------------------
Checker:
7 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 7 to 6
--------------------------------------------------------------------------------
Checker:
