# Multi-Agent Simulation

In [None]:
import nest_asyncio

# Patch asyncio so event loops can be nested.
# NOTE(review): presumably required because the notebook already runs an event
# loop and the queue/consumer code starts work inside it — confirm.
nest_asyncio.apply()

## Set Up the Multi-Agent System

In [None]:
import asyncio
from typing import Any, List
from pydantic import PrivateAttr
from llama_agents.message_consumers.base import BaseMessageQueueConsumer
from llama_agents.message_queues.simple import SimpleMessageQueue
from llama_agents.messages.base import QueueMessage
from datetime import datetime


class AgentConsumer(BaseMessageQueueConsumer):
    """Queue consumer that simulates servicing a task and records its timings.

    Each processed message gets the following keys added to ``message.data``
    (all durations in seconds, all timestamps from ``datetime.now()``):
    ``service_start_time``, ``wait_time``, ``departure_time``,
    ``service_time`` and ``flow_time``. The message must already carry an
    ``"arrival_time"`` datetime in ``message.data``.
    """

    # Messages this consumer has finished servicing, in completion order.
    processed_messages: List[QueueMessage] = []
    # Serializes the end-of-service bookkeeping on this consumer.
    _lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock)
    # Optional coroutine function; awaited with the message id after a
    # type "A" message has been serviced.
    async_callback: Any = None

    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> None:
        """Simulate servicing ``message`` and record wait/service/flow times.

        Type "A" messages have no startup delay and a mean service time of 2
        seconds; every other type has a 3 second startup delay and a mean
        service time of 8 seconds. After a type "A" message is serviced,
        ``async_callback`` (if set) is awaited with the message id.
        """
        # Imported here so the class does not depend on a later notebook cell
        # having already run ``import numpy as np`` when a message arrives.
        import numpy as np

        # Start service data
        service_start_time = datetime.now()
        wait_time = (service_start_time - message.data["arrival_time"]).total_seconds()
        message.data.update(
            {"service_start_time": service_start_time, "wait_time": wait_time}
        )

        # Service
        # NOTE: np.random.exponential takes the distribution's scale (its
        # mean), not a rate, hence "mean_service_time".
        if message.type == "A":
            mean_service_time = 2
            startup = 0
        else:
            mean_service_time = 8
            startup = 3
        await asyncio.sleep(startup + np.random.exponential(mean_service_time))
        print("Processed message.")

        # Track data
        async with self._lock:
            departure_time = datetime.now()
            flow_time = (departure_time - message.data["arrival_time"]).total_seconds()
            service_time = (
                departure_time - message.data["service_start_time"]
            ).total_seconds()
            message.data.update(
                {
                    "departure_time": departure_time,
                    "service_time": service_time,
                    "flow_time": flow_time,
                }
            )
            self.processed_messages.append(message)

        # Publish task B if current task is A
        if self.async_callback and message.type == "A":
            await self.async_callback(message.id_)

In [None]:
mq = SimpleMessageQueue()
task = asyncio.create_task(mq.launch_local())


async def publish_B_task(parent_id):
    # print("publish B task", flush=True)
    new_agent_task = QueueMessage(
        type="B", data={"arrival_time": datetime.now(), "parent_A": parent_id}
    )
    await mq.publish(new_agent_task)


agent_consumers = [
    AgentConsumer(message_type="A", async_callback=publish_B_task),
    AgentConsumer(message_type="B"),
]

for agent_consumer in agent_consumers:
    await mq.register_consumer(agent_consumer)

INFO:llama_agents.message_queues.simple - Consumer ac571b31-c950-4eae-a29f-cbbba2e9dd05: A has been registered.
INFO:llama_agents.message_queues.simple - Consumer 2f122c54-6750-43f7-889e-acddfd109f0f: B has been registered.
INFO:llama_agents.message_queues.simple - Launching message queue locally
INFO:llama_agents.message_queues.base - Publishing message to 'B' with action 'None'
INFO:llama_agents.message_queues.simple - Successfully published message 'A' to consumer.


Processed message.


INFO:llama_agents.message_queues.simple - Successfully published message 'B' to consumer.


Processed message.


INFO:llama_agents.message_queues.base - Publishing message to 'B' with action 'None'
INFO:llama_agents.message_queues.simple - Successfully published message 'A' to consumer.


Processed message.


INFO:llama_agents.message_queues.simple - Successfully published message 'B' to consumer.


Processed message.


INFO:llama_agents.message_queues.base - Publishing message to 'B' with action 'None'
INFO:llama_agents.message_queues.simple - Successfully published message 'A' to consumer.


Processed message.


INFO:llama_agents.message_queues.simple - Successfully published message 'B' to consumer.


Processed message.


## Simulation: Incoming Agent Tasks

In [None]:
import asyncio
import numpy as np
from contextvars import ContextVar

In [None]:
async def simulate_incoming_tasks(max_tasks=10, *, mean_interarrival=5):
    """Publish type "A" tasks to ``mq`` with exponentially distributed gaps.

    Before each task the coroutine sleeps for a random inter-arrival time,
    then publishes a ``QueueMessage`` of type "A" whose ``arrival_time`` is
    the current ``datetime.now()``.

    Args:
        max_tasks: Number of tasks to publish. Zero or a negative value
            publishes nothing and returns immediately. ``None`` keeps
            publishing until the coroutine is cancelled.
        mean_interarrival: Mean, in seconds, of the exponential distribution
            the gap before each arrival is drawn from.
    """
    # A plain local counter is enough here; the "<" comparison also
    # terminates for zero, negative, or non-integer limits, which an
    # equality test would step over.
    published = 0
    while max_tasks is None or published < max_tasks:
        interarrival_time = np.random.exponential(mean_interarrival)
        await asyncio.sleep(interarrival_time)
        print("New Task Arrival")
        new_agent_task = QueueMessage(type="A", data={"arrival_time": datetime.now()})
        await mq.publish(new_agent_task)
        published += 1

In [None]:
# Run the arrival simulation for 3 tasks and wait until all 3 have been
# published (this does not wait for the consumers to finish servicing them).
await asyncio.create_task(simulate_incoming_tasks(3))

INFO:llama_agents.message_queues.base - Publishing message to 'A' with action 'None'


New Task Arrival


INFO:llama_agents.message_queues.base - Publishing message to 'A' with action 'None'


New Task Arrival


INFO:llama_agents.message_queues.base - Publishing message to 'A' with action 'None'


New Task Arrival


### System Metrics

In [None]:
import pandas as pd

# Column name -> function that pulls that column's value out of one
# processed message. Insertion order fixes the DataFrame's column order.
column_getters = {
    "message_id": lambda m: m.id_,
    "message_type": lambda m: m.type,
    "flow_times": lambda m: m.data["flow_time"],
    "service_times": lambda m: m.data["service_time"],
    "wait_times": lambda m: m.data["wait_time"],
    "arrival_times": lambda m: m.data["arrival_time"],
    "service_start_times": lambda m: m.data["service_start_time"],
    "departure_times": lambda m: m.data["departure_time"],
}

message_data = {column: [] for column in column_getters}

# Append each consumer's processed messages in turn, so rows are grouped by
# consumer in ``agent_consumers`` order.
for agent_consumer in agent_consumers:
    for column, getter in column_getters.items():
        message_data[column] += [
            getter(m) for m in agent_consumer.processed_messages
        ]

system_data = pd.DataFrame(message_data)

In [None]:
# Display the per-message timing table (one row per processed message).
system_data

Unnamed: 0,message_id,message_type,flow_times,service_times,wait_times,arrival_times,service_start_times,departure_times
0,93cece3e-5cdc-4a92-b2da-6273b82f01a5,A,0.533361,0.522281,0.01108,2024-07-01 14:18:30.513627,2024-07-01 14:18:30.524707,2024-07-01 14:18:31.046988
1,0b81016c-1e90-42fd-9356-f12cfffa9e9c,A,15.296713,7.304995,7.991718,2024-07-01 14:18:31.261285,2024-07-01 14:18:39.253003,2024-07-01 14:18:46.557998
2,04fecfdf-fa57-4efe-9b92-f8e9f5e65cbf,A,20.511926,1.818384,18.693542,2024-07-01 14:18:36.101715,2024-07-01 14:18:54.795257,2024-07-01 14:18:56.613641
3,299dd9ee-f47b-40a1-9621-cd9d67ca583f,B,8.103188,8.100804,0.002384,2024-07-01 14:18:31.047035,2024-07-01 14:18:31.049419,2024-07-01 14:18:39.150223
4,6d307430-2b38-4d0f-9445-509403179c03,B,8.134733,8.131872,0.002861,2024-07-01 14:18:46.558066,2024-07-01 14:18:46.560927,2024-07-01 14:18:54.692799
5,2472b164-9df5-469c-9e54-21c9d18f162a,B,28.854296,28.851548,0.002748,2024-07-01 14:18:56.613688,2024-07-01 14:18:56.616436,2024-07-01 14:19:25.467984


In [None]:
# Mean of each timing metric, in seconds, across all processed messages.
metric_cols = ["flow_times", "service_times", "wait_times"]
system_data[metric_cols].mean()

flow_times       13.572370
service_times     9.121647
wait_times        4.450722
dtype: float64

In [None]:
# Mean of each timing metric, in seconds, broken down by message type.
system_data.groupby("message_type")[metric_cols].mean()

Unnamed: 0_level_0,flow_times,service_times,wait_times
message_type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
A,12.114,3.21522,8.89878
B,15.030739,15.028075,0.002664


In [None]:
# Inspect the second consumer (created with message_type="B"), including the
# messages it has processed.
agent_consumers[1]

AgentConsumer(id_='2f122c54-6750-43f7-889e-acddfd109f0f', message_type='B', processed_messages=[QueueMessage(id_='299dd9ee-f47b-40a1-9621-cd9d67ca583f', publisher_id='default', data={'arrival_time': datetime.datetime(2024, 7, 1, 14, 18, 31, 47035), 'parent_A': '93cece3e-5cdc-4a92-b2da-6273b82f01a5', 'service_start_time': datetime.datetime(2024, 7, 1, 14, 18, 31, 49419), 'wait_time': 0.002384, 'departure_time': datetime.datetime(2024, 7, 1, 14, 18, 39, 150223), 'service_time': 8.100804, 'flow_time': 8.103188}, action=None, stats=QueueMessageStats(publish_time='2024-07-01 14:18:31', process_start_time='2024-07-01 14:18:31', process_end_time='2024-07-01 14:18:39'), type='B'), QueueMessage(id_='6d307430-2b38-4d0f-9445-509403179c03', publisher_id='default', data={'arrival_time': datetime.datetime(2024, 7, 1, 14, 18, 46, 558066), 'parent_A': '0b81016c-1e90-42fd-9356-f12cfffa9e9c', 'service_start_time': datetime.datetime(2024, 7, 1, 14, 18, 46, 560927), 'wait_time': 0.002861, 'departure_time'