In [1]:
import asyncio
from abc import abstractmethod
from collections import deque
from typing import ClassVar

from lionagi.core.communication.action_response import ActionResponse
from lionagi import Session
from lionagi.core.action.action_manager import ActionManager, Tool
from lionagi.core.communication.message import RoledMessage
from lionagi.core.generic.node import Node
from lionagi.core.communication.action_request import ActionRequest
from lionagi.protocols.operatives.action import ActionRequestModel, ActionResponseModel
from pydantic import BaseModel, PrivateAttr
from abc import ABC

RESERVED_METHODS = (
    "__init__",
    "actions",
    "generate_tools",
)


class Ability(BaseModel):

    _tool: ClassVar[Tool | None] = None

    @classmethod
    def func_callable(cls, *args, **kwargs):
        """
        Return a function that does the actual work.
        Must be overridden in concrete classes.
        """

        async def some_function(*args, **kwargs) -> BaseModel:
            pass

        return some_function

    @classmethod
    def as_tool(cls) -> Tool:
        """
        Convert this Ability into a Tool for registration with ActionManager.
        """
        if cls._tool is None:
            func = cls.func_callable()
            tool = Tool(function=func)
            cls._tool = tool
        return cls._tool

In [2]:
# --------------------------------------------------------------------------
# 3. Capability Base
# --------------------------------------------------------------------------
class Capability(ABC):
    """
    A Capability groups multiple Abilities (Tools) under one manager.
    Each Capability can process ActionRequests directed to its abilities.
    """

    default_abilities: ClassVar[list[type[Ability]]] = []

    def __init__(self):
        self.ability_registry = {}
        self.action_manager = ActionManager()

    def register_ability(self, ability: type[Ability]):
        tool = ability.as_tool()
        func_name = tool.function.__name__
        self.action_manager.register_tool(tool, update=True)
        self.ability_registry[func_name] = ability

    def dump_log(self):
        self.action_manager.logger.dump()

    async def process(self, action_request: ActionRequest) -> ActionResponse:
        """
        Override if you want advanced logic before/after invocation.
        By default, just invoke the tool and wrap the response.
        """
        response = await self.action_manager.invoke(action_request)
        return ActionResponseModel(
            function=action_request.function,
            arguments=action_request.arguments,
            output=response,
        )

    @classmethod
    def create(cls):
        """
        Factory method that registers all default_abilities.
        """
        capability = cls()
        for ability_cls in cls.default_abilities:
            capability.register_ability(ability_cls)
        return capability

In [3]:
from pydantic import Field


class Recipient(BaseModel):
    name: str


class Agent(Node):

    session: Session | None = Field(default_factory=Session)

    mailbox: dict[str, dict[str, deque[RoledMessage]]] = {
        "pending_in": deque(),
        "pending_out": {},
    }

    capabilities: dict[str, Capability] = Field(default_factory=dict)
    _execute_mode: bool = PrivateAttr(default=False)

    @property
    def name(self): ...

    def register_capability(self, capability: type[Capability]):
        capability = capability.create()
        self.capabilities[capability.__class__.__name__] = capability

    def send(self, message: RoledMessage):
        if not self.mailbox["pending_out"].get(message.recipient, None):
            self.mailbox["pending_out"][message.recipient] = deque()
        self.mailbox["pending_out"][message.recipient].append(message)

    async def execute(self):
        self._execute_mode = True
        while self.mailbox["pending_in"]:
            await self.forward()
        self._execute_mode = False

    async def forward(self):
        messages = self.mailbox["pending_in"].popleft()
        response = await self.process(messages)
        self.send(response)

    async def _decide_recipient(
        self, message: ActionResponse, available_recipients: list[str]
    ) -> str:
        recipient = await self.session.default_branch.operate(
            instruction="please decide a the recipient for our message",
            context=message.content.to_dict(),
            guidance=f"must choose one of the following: {available_recipients}",
            operative_model=Recipient,
        )
        return recipient.name

    async def process(
        self, action_request: ActionRequest, available_recipients: list[str]
    ):
        """
        The simplest approach:
        1) find the right Capability that can handle the function
        2) call its .process()
        """

        # Search each capability to see if it has the requested tool
        for cap_name, cap_obj in self.capabilities.items():
            if action_request.function in cap_obj.action_manager.registry:
                action_response_model = await cap_obj.process(action_request)
                action_response = ActionResponse(
                    action_request=action_request,
                    output=action_response_model.output,
                )
                action_response.sender = self.name
                action_response.recipient = await self._decide_recipient(
                    action_response, available_recipients=available_recipients
                )
                self.send(action_response)
                return
        raise ValueError(
            f"No capability can handle function '{action_request.function}'!"
        )

    def join_orchestrator(self, orchestrator):
        orchestrator.agents[self.session.ln_id] = self

In [4]:
class EchoAbility(Ability):
    """
    Example ability that echoes a message.
    """

    @classmethod
    def func_callable(cls, *args, **kwargs):
        # This is the actual function the tool calls
        def echo_message(message: str) -> str:
            return f"Echo: {message}"

        return echo_message


class CalculatorAbility(Ability):
    """
    Simple ability that adds two numbers.
    """

    @classmethod
    def func_callable(cls, *args, **kwargs):
        def add_numbers(x: float, y: float) -> float:
            return x + y

        return add_numbers

In [5]:
class EchoCapability(Capability):
    """
    A simple capability that includes an echo ability and a calculator ability.
    """

    # You can define which abilities are by default available
    default_abilities: ClassVar[list[type[Ability]]] = [EchoAbility, CalculatorAbility]

In [6]:
echo1 = ActionRequestModel(
    function="echo_message",
    arguments={"message": "Hello, world!"},
)

add_numbers1 = ActionRequestModel(
    function="add_numbers",
    arguments={"x": 1.0, "y": 2.0},
)

In [7]:
echo_capability = EchoCapability.create()

In [8]:
echo1_response = await echo_capability.process(echo1)
add_numbers1_response = await echo_capability.process(add_numbers1)

In [9]:
echo1_response.model_dump()

{'function': 'echo_message',
 'arguments': {'message': 'Hello, world!'},
 'output': 'Echo: Hello, world!'}

In [10]:
add_numbers1_response.model_dump()

{'function': 'add_numbers', 'arguments': {'x': 1.0, 'y': 2.0}, 'output': 3.0}

In [11]:
class DemoAgent(Agent):

    @property
    def name(self):
        return "alligator"


class Demo2Agent(Agent):

    @property
    def name(self):
        return "crocodile"


class Demo3Agent(Agent):

    @property
    def name(self):
        return "barracuda"

In [12]:
demo1 = DemoAgent()
demo1.register_capability(EchoCapability)

demo2 = Demo2Agent()
demo2.register_capability(EchoCapability)

demo3 = Demo3Agent()
demo3.register_capability(EchoCapability)

In [13]:
echo2 = ActionRequest(
    sender="user",
    function="echo_message",
    arguments={"message": "User: Hello, world!"},
)

echo3 = ActionRequest(
    sender="assistant",
    function="echo_message",
    arguments={"message": "Assistant Crocodile: Hello, world2!"},
)

In [14]:
await demo1.capabilities["EchoCapability"].process(echo2)

ActionResponseModel(function='echo_message', arguments={'message': 'User: Hello, world!'}, output='Echo: User: Hello, world!')

In [15]:
await demo1.process(
    action_request=echo2, available_recipients=["crocodile", "barracuda", "alice"]
)

In [16]:
demo1.mailbox

{'pending_in': deque([]),
 'pending_out': {'alice': deque([ActionResponse(ln_id=IDType(5b032233-f85e-4ff9-bd72-dade1440411a), created_timestamp=2024-12-27 22:48:50.671391, content={'action_request_id': IDType(288409af-8613-40c9-8202-460be0b589da), 'action_response': {'function..., metadata={'last_updated': {'sender': 1735339730.671427, 'recipient': 1735339731.078203}}, created_timestamp=1735339730.671391, sender=alligator, recipient=alice, role=assistant, extra_fields={})])}}

In [17]:
await demo1.process(
    action_request=echo3, available_recipients=["crocodile", "barracuda", "alice"]
)

In [18]:
demo1.mailbox

{'pending_in': deque([]),
 'pending_out': {'alice': deque([ActionResponse(ln_id=IDType(5b032233-f85e-4ff9-bd72-dade1440411a), created_timestamp=2024-12-27 22:48:50.671391, content={'action_request_id': IDType(288409af-8613-40c9-8202-460be0b589da), 'action_response': {'function..., metadata={'last_updated': {'sender': 1735339730.671427, 'recipient': 1735339731.078203}}, created_timestamp=1735339730.671391, sender=alligator, recipient=alice, role=assistant, extra_fields={})]),
  'crocodile': deque([ActionResponse(ln_id=IDType(4e6e22f6-4959-44f8-a547-220a2f7f26ff), created_timestamp=2024-12-27 22:48:51.088725, content={'action_request_id': IDType(f337c24b-aeb0-4781-8f2b-4873a8cb5865), 'action_response': {'function..., metadata={'last_updated': {'sender': 1735339731.088762, 'recipient': 1735339731.65494}}, created_timestamp=1735339731.088725, sender=alligator, recipient=crocodile, role=assistant, extra_fields={})])}}

In [20]:
class Orchestrator:

    def __init__(self, refresh_rate: int = 60, agents: list[Agent] = []):
        self.agents: dict[str, Agent] = {}
        self.mailbox = {
            "pending_in": deque(),
            "pending_out": {agent.name: deque() for agent in agents},
            "external_out": deque(),
        }
        self.external_sources = {}
        self._stop_event = asyncio.Event()
        self._refresh_rate = refresh_rate
        self._lock = asyncio.Lock()
        self._execute_mode = False

    def collect(self):
        for agent in self.agents.values():
            for k, v in agent.mailbox["pending_out"].items():
                self.mailbox["pending_in"].extend(v)
            self.mailbox["pending_in"].extend(agent.mailbox["pending_out"])

    def send(self):
        while self.mailbox["pending_in"]:
            msg = self.mailbox["pending_in"].popleft()
            if msg.recipient in self.agents:
                self.agents[msg.recipient].mailbox["pending_in"].append(msg)
            else:
                self.mailbox["external_out"].append(msg)

    async def forward(self):
        self.collect()
        self.send()
        tasks = [asyncio.create_task(agent.execute()) for agent in self.agents.values()]
        asyncio.gather(*tasks)
        self.collect()
        await asyncio.sleep(0.1)

    async def stop(self):
        self._stop_event.set()

    async def execute(self):
        self._execute_mode = True
        while not self._stop_event.is_set():
            await self.forward()
            await asyncio.sleep(self._refresh_rate)

        self._execute_mode = False

In [21]:
orch = Orchestrator(refresh_rate=60, agents=[demo1, demo2, demo3])

In [22]:
orch.collect()

In [23]:
orch.mailbox

{'pending_in': deque([]),
 'pending_out': {'alligator': deque([]),
  'crocodile': deque([]),
  'barracuda': deque([])},
 'external_out': deque([])}

In [24]:
orch.send()

In [19]:
echo2 = ActionRequest(
    sender="user",
    function="echo_message",
    arguments={"message": "User: Hello, world!"},
)

demo1.mailbox["pending_in"].append(echo2)

In [20]:
await orch.forward()

In [21]:
demo1.mailbox

{'pending_in': deque([ActionRequest(ln_id=IDType(5064450a-c903-47d4-b41b-a2c9843cc750), created_timestamp=2024-12-27 22:44:57.885043, content={'action_request': {'function': 'echo_message', 'arguments': {'message': 'User: Hello, world!'}}}, metadata={}, created_timestamp=1735339497.885043, sender=user, recipient=N/A, role=assistant, extra_fields={})]),
 'pending_out': {'alice': deque([ActionResponse(ln_id=IDType(f6ebe6cd-98c5-4af3-beea-1124db591f3c), created_timestamp=2024-12-27 22:44:57.222842, content={'action_request_id': IDType(87db6215-b87c-415b-beda-16f836e426f8), 'action_response': {'function..., metadata={'last_updated': {'sender': 1735339497.22289, 'recipient': 1735339497.863748}}, created_timestamp=1735339497.222842, sender=alligator, recipient=alice, role=assistant, extra_fields={})])}}

In [28]:
orch.mailbox

{'pending_in': deque([]), 'pending_out': {}, 'external_out': deque([])}

In [29]:
orch.collect()

In [30]:
orch.mailbox

{'pending_in': deque([]), 'pending_out': {}, 'external_out': deque([])}

In [22]:
demo2.mailbox

{'pending_in': deque([]), 'pending_out': {}}

In [23]:
demo3.mailbox

{'pending_in': deque([]), 'pending_out': {}}

In [24]:
orch.mailbox

{'pending_in': deque([]), 'pending_out': {}, 'external_out': deque([])}

In [25]:
orch.mailbox["external_out"]

deque([])