# RFC 001: Global run function

In [1]:
from abc import ABC, abstractmethod
from typing import Annotated, Any, Iterable, Optional, Protocol, Union, runtime_checkable
from unittest.mock import MagicMock
from uuid import UUID

from pydantic import BaseModel, Field

from autogen import Agent

## Events

The runtime uses five abstract kinds of events: input requests (in a synchronous and an asynchronous variant), input responses, agent messages, outputs, and system events.

In [2]:
@runtime_checkable
class EventProtocol(Protocol):
    """Structural base of all events: any object exposing a ``uuid`` attribute."""

    @property
    def uuid(self) -> UUID:
        """Identifier of the event."""
        ...


@runtime_checkable
class InputRequestProtocol(EventProtocol, Protocol):
    """Event that carries a prompt and is answered through a synchronous ``respond``."""

    @property
    def prompt(self) -> str:
        """Prompt text associated with the request."""
        ...

    def respond(self, response: Union[str, "InputResponseProtocol"]) -> "InputResponseProtocol":
        """Answer the request with either a string or an input-response event."""
        ...


@runtime_checkable
class AsyncInputRequestProtocol(EventProtocol, Protocol):
    """Event that carries a prompt and is answered through an awaitable ``respond``."""

    @property
    def prompt(self) -> str:
        """Prompt text associated with the request."""
        ...

    async def respond(self, response: Union[str, "InputResponseProtocol"]) -> "InputResponseProtocol":
        """Answer the request with either a string or an input-response event (coroutine)."""
        ...


@runtime_checkable
class InputResponseProtocol(EventProtocol, Protocol):
    """Event type for input responses; declares no members beyond the inherited ``uuid``."""

@runtime_checkable
class AgentMessageProtocol(EventProtocol, Protocol):
    """Event type for agent messages: the inherited ``uuid`` plus a ``message`` string."""

    @property
    def message(self) -> str:
        """Text of the agent message."""
        ...


@runtime_checkable
class OutputProtocol(EventProtocol, Protocol):
    """Event type for outputs: the inherited ``uuid`` plus a ``message`` string."""

    @property
    def message(self) -> str:
        """Text of the output."""
        ...


@runtime_checkable
class SystemEventProtocol(EventProtocol, Protocol):
    """Event type for system events; declares no members beyond the inherited ``uuid``."""

Helper predicates for checking which event protocol an object satisfies:

In [3]:
def is_input_request(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``InputRequestProtocol``."""
    satisfies_protocol = isinstance(event, InputRequestProtocol)
    return satisfies_protocol


def is_async_input_request(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``AsyncInputRequestProtocol``."""
    satisfies_protocol = isinstance(event, AsyncInputRequestProtocol)
    return satisfies_protocol


def is_input_response(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``InputResponseProtocol``."""
    satisfies_protocol = isinstance(event, InputResponseProtocol)
    return satisfies_protocol


def is_output(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``OutputProtocol``."""
    satisfies_protocol = isinstance(event, OutputProtocol)
    return satisfies_protocol

def is_agent_message(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``AgentMessageProtocol``."""
    satisfies_protocol = isinstance(event, AgentMessageProtocol)
    return satisfies_protocol

def is_system_event(event: EventProtocol) -> bool:
    """Report whether *event* passes the runtime check for ``SystemEventProtocol``."""
    satisfies_protocol = isinstance(event, SystemEventProtocol)
    return satisfies_protocol

The actual implementation could use Pydantic `BaseModel`:

In [4]:
class Event(BaseModel, ABC):
    """Abstract Pydantic base model shared by all concrete event classes."""

    # ``Field`` must be called to produce field metadata; the bare ``Field``
    # function object inside ``Annotated`` carries no information for Pydantic.
    uuid: Annotated[UUID, Field(description="Unique identifier of the event.")]


class InputRequest(Event, ABC):
    """Abstract event asking for input; subclasses supply ``prompt`` and ``respond``."""

    @property
    @abstractmethod
    def prompt(self) -> str:
        """Prompt text for the requested input."""
        ...

    @abstractmethod
    def respond(self, response: Union[str, "InputResponseProtocol"]) -> "InputResponseProtocol":
        """Answer this request.

        The signature mirrors ``InputRequestProtocol.respond`` so that
        subclasses satisfy the protocol as declared.

        Args:
            response: The answer, as a plain string or an input-response event.

        Returns:
            The input-response event for this request.
        """
        ...


class InputResponse(Event, ABC):
    """Abstract input-response event; adds nothing to ``Event``."""


class Output(Event, ABC):
    """Abstract output event; subclasses supply ``message``."""

    @property
    @abstractmethod
    def message(self) -> str:
        """Text of the output."""
        ...

class AgentMessage(Event, ABC):
    """Abstract agent-message event; subclasses supply ``message``."""

    @property
    @abstractmethod
    def message(self) -> str:
        """Text of the agent message."""
        ...

class SystemEvent(Event, ABC):
    """Abstract system event; adds nothing to ``Event``."""

## The client abstracts away the actual LLM used

In [5]:
class LLMClientProtocol(Protocol):
    """Placeholder protocol for an LLM client; no members are declared yet."""


class LLMMessageProtocol(Protocol):
    """Message paired with the client it belongs to and its raw payload."""

    @property
    def client(self) -> LLMClientProtocol:
        """Client associated with this message."""
        ...

    @property
    def raw_message(self) -> Any:
        """Underlying message object; its type is left unspecified (``Any``)."""
        ...

## Global run function

In [6]:
class RunResponseProtocol(Protocol):
    """Result of a run: its events, its LLM messages and a textual summary."""

    @property
    def events(self) -> list[EventProtocol]:
        """Events of the run, as a list."""
        ...

    @property
    def messages(self) -> list[LLMMessageProtocol]:
        """LLM messages of the run, as a list."""
        ...

    @property
    def summary(self) -> str:
        """Summary of the run."""
        ...


In [7]:
class EventProcessorProtocol(Protocol):
    """Object able to handle a single event through a synchronous ``process``."""

    def process(self, event: EventProtocol) -> None:
        """Handle *event*; nothing is returned."""
        ...

class AsyncEventProcessorProtocol(Protocol):
    """Object able to handle a single event through an awaitable ``process``."""

    async def process(self, event: EventProtocol) -> None:
        """Handle *event* (coroutine); nothing is returned."""
        ...


In [8]:
class ConsoleEventProcessor(EventProcessorProtocol):
    """Event processor that prints events to stdout and reads replies from stdin."""

    def process(self, event: EventProtocol) -> None:
        """Handle one event according to the first helper predicate it satisfies.

        Predicates are tried in this order: input request, output, system
        event, agent message. An input request is answered with the line read
        by ``input``; the other kinds are only printed. An event matching none
        of the predicates is ignored.
        """
        if is_input_request(event):
            print(f"Input request: {event.prompt}")
            reply = input(event.prompt)

            print(f"Response: {reply}")
            event.respond(reply)
            return

        if is_output(event):
            print(f"Output message: {event.message}")
            return

        if is_system_event(event):
            print(f"System event: {event}")
            return

        if is_agent_message(event):
            print(f"Agent message: {event.message}")


In [11]:
def run(
    *agents: Agent,
    message: Optional[str] = None,
    previous_run: Optional[RunResponseProtocol] = None,
    event_processor: Optional[EventProcessorProtocol] = None,
    **kwargs: Any
) -> RunResponseProtocol:
    """Run the agents with the given initial message.

    In this RFC sketch the agents are not actually executed: a fixed list of
    mocked events is fed to the event processor and a mocked response is
    returned.

    Args:
        agents: The agents to run.
        message: The initial message to send to the first agent.
        previous_run: The previous run to continue.
        event_processor: Processor that receives each event in turn. When it
            is ``None``, a ``ConsoleEventProcessor`` is used.
        kwargs: Additional arguments to pass to the first agent

    Returns:
        A response whose ``events`` is the list of processed events and whose
        ``summary`` is the string ``"42"``.

    """
    # Test against None rather than truthiness: a caller-supplied processor
    # that happens to be falsy (e.g. defines __bool__ or __len__) must still
    # be used instead of being silently swapped for the console default.
    if event_processor is None:
        event_processor = ConsoleEventProcessor()

    # we should get those from running the agents
    events = [
        MagicMock(spec=InputRequestProtocol),
        MagicMock(spec=OutputProtocol),
        MagicMock(spec=SystemEventProtocol),
        MagicMock(spec=AgentMessageProtocol),
    ]
    # there will be IOStream class handling this
    for event in events:
        event_processor.process(event)

    response = MagicMock(spec=RunResponseProtocol)
    response.events = events
    response.summary = "42"

    return response


async def a_run(
    *agents: Agent,
    message: Optional[str] = None,
    previous_run: Optional[RunResponseProtocol] = None,
    event_processor: Optional[AsyncEventProcessorProtocol] = None,
    **kwargs: Any
) -> RunResponseProtocol:
    """Asynchronous variant of the global run function (body not implemented here).

    Args:
        agents: Agents taking part in the run.
        message: Initial message for the first agent.
        previous_run: Earlier run to continue from.
        event_processor: Asynchronous processor for the events of the run.
        kwargs: Extra arguments forwarded to the first agent.

    """
    ...

## Event processing loop

In [13]:
# Demo call with no agents at all.
agents: list[Agent] = []

# No event_processor is passed, so run() falls back to ConsoleEventProcessor
# and the input-request event reads a line from stdin during this call.
response = run(*agents, message="What is the meaning of life?")

summary = response.summary

print(f"Summary: {summary}")

Input request: <MagicMock name='mock.prompt' id='5739869424'>
Response: hi
Output message: <MagicMock name='mock.message' id='5740958992'>
System event: <MagicMock spec='SystemEventProtocol' id='5739481216'>
Output message: <MagicMock name='mock.message' id='5741096208'>
Summary: 42


In [None]:
agents: list[Agent] = []

response = await a_run(*agents, message="What is the meaning of life?")

response = MagicMock(spec=AsyncRunResponseProtocol)
events = [
    MagicMock(spec=AsyncInputRequestProtocol),
    MagicMock(spec=OutputProtocol),
    MagicMock(spec=SystemEventProtocol),
    MagicMock(spec=AgentMessageProtocol),
]


async def _events():
    for e in events:
        yield e


response.events = _events()

async for event in response.events:
    if is_input_request(event):
        print(f"Input request: {event.prompt}")
        s = input(event.prompt)

        print(f"Response: {s}")
        await event.respond(s)

    elif is_output(event):
        print(f"Output message: {event.message}")

    elif is_system_event(event):
        print(f"System event: {event}")

    elif is_agent_message(event):
        print(f"Agent message: {event.message}")

summary = response.summary

## Examples

In [None]:
import autogen

# Build the config list with autogen's JSON helper; the argument is a relative
# path, presumably resolved against the notebook's working directory — confirm.
config_list = autogen.config_list_from_json("../../notebook/OAI_CONFIG_LIST")

# Fail early when the loaded list is empty or otherwise falsy.
assert config_list