In [3]:
from gai.messages.typing import MessagePydantic
from gai.sessions.message_bus_local import LocalMessageBus
from gai.messages import message_helper
from gai.lib.constants import DEFAULT_GUID
from rich import print

# Create the local message bus, configured with MessagePydantic as its message type.
amb = LocalMessageBus(message_type=MessagePydantic)

# 1. Start the background bus loop

await amb.start()
print("Bus is ready.")

# 2. Subscribe and handle send to LLM


async def handle_send(msg: MessagePydantic):
    """Forward a user "send" message to the LLM and publish the answer as a reply.

    The prompt is ``msg.body.content``. The reply swaps the incoming header's
    sender and recipient, keeps the incoming round number and advances the
    step number by one.

    Args:
        msg: The incoming send message delivered by the bus.
    """
    # Forward user message to LLM

    # Local import: the LLM client is only needed inside this handler.
    from gai.llm.openai import OpenAI

    client = OpenAI(client_config={"model": "gpt-4o-mini"})
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": msg.body.content}],
        max_tokens=100,
    )

    # Forward LLM response to user.
    # Round/step bookkeeping is read from the incoming message: `resp` is the
    # chat completion and is only read through `.choices`. The dialogue id was
    # previously an undefined name; DEFAULT_GUID is the id the user message in
    # this notebook is published with.
    assistant_message = message_helper.create_chat_reply_message(
        dialogue_id=DEFAULT_GUID,
        round_no=msg.body.round_no,
        step_no=msg.body.step_no + 1,
        chunk_no=0,
        sender=msg.header.recipient,
        recipient=msg.header.sender,
        chunk="<eom>",
        content=resp.choices[0].message.content,
    )
    await amb.publish(assistant_message)


# Register handle_send for "send" messages under the key "Alice"
# (presumably the subscriber name — confirm against LocalMessageBus.subscribe).
await amb.subscribe("send", {"Alice": handle_send})

# 3. Subscribe and handle replies


async def handle_reply(msg: MessagePydantic):
    """Print a reply's content in green, prefixed with the name of its sender."""
    sender = msg.header.sender
    content = msg.body.content
    print(f"Reply from {sender}: [green]{content}[/]")


# Register handle_reply for "reply" messages under the key "User".
await amb.subscribe("reply", {"User": handle_reply})

# 4. Publish a message to the bus
# Build the user's opening "send" message: round 0, step 0, recipient "Agent".
user_message = message_helper.create_chat_send_message(
    dialogue_id=DEFAULT_GUID,
    round_no=0,
    step_no=0,
    recipient="Agent",
    content="Hello, what is your name?",
)

# NOTE(review): unlike the later cells, this one neither sleeps nor calls
# amb.stop() after publishing — confirm the bus is meant to keep running.
await amb.publish(user_message)


### b) Same Message Type - Multiple Subscribers

In [6]:
import asyncio
from gai.messages import MessagePydantic
from gai.sessions.message_bus_local import LocalMessageBus
from rich import print

# Fresh bus instance for this example (rebinds `amb`).
# NOTE(review): the bus started in the previous cell is never stopped in the visible code.
amb = LocalMessageBus(message_type=MessagePydantic)

# 1. Start the background bus loop
await amb.start()
print("Bus is ready.")

# 2. Create 2 handlers to show multiple subscriptions to the same message type


async def handler_1(msg: MessagePydantic):
    """Echo the received message with a ``handler_1`` tag in bright-yellow rich markup."""
    rendered = f"[bright_yellow] handler_1 > {msg}[/bright_yellow]"
    print(rendered)


async def handler_2(msg: MessagePydantic):
    """Echo the received message with a ``handler_2`` tag in bright-green rich markup."""
    rendered = f"[bright_green] handler_2 > {msg}[/bright_green]"
    print(rendered)


# 3. Subscribe both handlers to the message type "send"
# NOTE(review): the dict keys ("Alice", "Bob") are presumably subscriber names — confirm.

await amb.subscribe("send", {"Alice": handler_1})
await amb.subscribe("send", {"Bob": handler_2})
print("Subscribed to message type 'send'")

# 4. Confirm the bus is started and publish a message

if not amb.is_started:
    raise RuntimeError("Bus is not started, cannot publish message.")

# NOTE(review): message_helper and DEFAULT_GUID are not imported in this cell;
# they are only available if an earlier cell that imports them has been run.
user_message = message_helper.create_chat_send_message(
    dialogue_id=DEFAULT_GUID,
    round_no=0,
    step_no=0,
    recipient="Agent",
    content="Hello, what is your name?",
)

print("Publishing message.")
await amb.publish(user_message)
print("Message published.")
# 5. Wait a little for the message to be received before stopping the bus
await asyncio.sleep(0.1)  # Give time for the message to be processed

# 6. Stop the bus
await amb.stop()

print("Bus stopped.")


### b) Multiple Message Types - One Subscriber

In [7]:
import asyncio
from gai.messages import MessagePydantic, message_helper
from gai.sessions.message_bus_local import LocalMessageBus
from rich import print

# Fresh bus instance for this example (rebinds `amb`).
amb = LocalMessageBus(message_type=MessagePydantic)

# 1. Start the background bus loop AND wait for signal to be ready
await amb.start()
print("Bus is ready.")

# 2. Create 2 handlers: one for SendMessagePydantic and one for ReplyMessagePydantic


async def send_handler(msg: MessagePydantic):
    """Print a received "send" message in bold bright-yellow rich markup."""
    banner = f"[bold bright_yellow]Send Message Received: {msg}[/bold bright_yellow]"
    print(banner)


async def reply_handler(msg: MessagePydantic):
    """Print a received "reply" message in bold bright-green rich markup.

    Args:
        msg: The reply message delivered by the bus; it is rendered via its string form.
    """
    # Label fixed: this handler is subscribed to "reply", so it must not
    # announce the message as a "Send" message.
    print(f"[bold bright_green]Reply Message Received: {msg}[/bold bright_green]")


# 3. Subscribe one handler per message type: "send" and "reply"

await amb.subscribe("send", {"Alice": send_handler})
await amb.subscribe("reply", {"Alice": reply_handler})
print("Subscribed to send and reply message types")

# 4. Confirm the bus is started and publish a message

if not amb.is_started:
    raise RuntimeError("Bus is not started, cannot publish message.")

# NOTE(review): DEFAULT_GUID is not imported in this cell; it is only
# available if an earlier cell that imports it has been run.
user_message = message_helper.create_chat_send_message(
    dialogue_id=DEFAULT_GUID,
    round_no=0,
    step_no=0,
    recipient="Agent",
    content="Tell me a one sentence story.",
)

print("Publishing send message.")
await amb.publish(user_message)
print("Send message published.")

# Hand-built reply (no LLM involved) so the "reply" subscription has something to receive.
assistant_message = message_helper.create_chat_reply_message(
    dialogue_id=DEFAULT_GUID,
    round_no=0,
    step_no=1,
    chunk_no=0,
    sender="Sara",
    recipient="User",
    chunk="<eom>",
    content="Once upon a time, a brave knight saved a kingdom from a dragon.",
)

print("Publishing reply message.")
await amb.publish(assistant_message)
print("Reply message published.")

# 5. Wait a little for the messages to be received before stopping the bus

await asyncio.sleep(0.1)

# 6. Stop the bus
await amb.stop()
print("Bus stopped.")


### c) Same Message Type - Same Handler - Different Handler State

In [9]:
import asyncio
from gai.messages import MessagePydantic, message_helper
from gai.sessions.message_bus_local import LocalMessageBus

from rich import print

# 3. Start the background bus loop AND wait for signal to be ready
# `amb` is a module-level name; Host.send_handler below publishes its replies through it.

amb = LocalMessageBus(message_type=MessagePydantic)
await amb.start()
print("Bus is ready.")

# 1. Create 2 assistant send_handlers to show multiple subscriptions to the same message type


class Host:
    """A named participant whose bound ``send_handler`` replies with its own name."""

    def __init__(self, name: str):
        # The name is the only per-instance state; it tags both the printed
        # output and the published reply.
        self.name = name

    async def send_handler(self, msg: MessagePydantic):
        """Echo the incoming message, then publish a reply stating this host's name.

        The reply keeps the incoming round number, advances the step number by
        one, is addressed to "User", and is published on the module-level ``amb``.
        """
        host_name = self.name
        print(f"[bold bright_yellow]{host_name} > {msg}[/bold bright_yellow]")

        body = msg.body
        reply = message_helper.create_chat_reply_message(
            dialogue_id=DEFAULT_GUID,
            round_no=body.round_no,
            step_no=body.step_no + 1,
            chunk_no=0,
            sender=host_name,
            recipient="User",
            chunk="<eom>",
            content=f"My name is {host_name}",
        )

        await amb.publish(reply)


# Two independent Host instances: same handler code, different per-instance name.
host_1 = Host("Host 1")
host_2 = Host("Host 2")

# 2. Create 1 user reply_handler to handle replies


async def reply_handler(msg: MessagePydantic):
    """Print a received reply with a ``User >`` prefix in bold bright-green rich markup."""
    rendered = f"[bold bright_green]User > {msg}[/bold bright_green]"
    print(rendered)


# 3. Subscribe the same send_handler method, bound to two different Host objects,
#    to the message type "send"; subscribe reply_handler to "reply"

await amb.subscribe("send", {"Alice": host_1.send_handler})
await amb.subscribe("send", {"Bob": host_2.send_handler})
await amb.subscribe("reply", {"User": reply_handler})

print("Subscribed to message type 'send'")

# 4. Confirm the bus is started and publish a message

if not amb.is_started:
    raise RuntimeError("Bus is not started, cannot publish message.")

# NOTE(review): DEFAULT_GUID is not imported in this cell; it is only
# available if an earlier cell that imports it has been run.
# recipient="*" is presumably a broadcast wildcard — confirm against the bus routing rules.
user_message = message_helper.create_chat_send_message(
    dialogue_id=DEFAULT_GUID,
    round_no=0,
    step_no=0,
    recipient="*",
    content="Hello, what is your name?",
)
print("Publishing message.")
await amb.publish(user_message)
print("Message published.")

# 5. Wait a little for the messages to be received, then stop the bus

await asyncio.sleep(0.1)
# NOTE(review): `status` is not used anywhere in the visible code.
status = await amb.stop()
print("Bus stopped.")
