# Simple Pig-Latin Translator

## Task Eavesdropper

In [1]:
import asyncio

In [2]:
import nest_asyncio

nest_asyncio.apply()

In [3]:
from llama_agents.message_queues.rabbitmq import RabbitMQMessageQueue
from multi_agent_system.utils import load_from_env

message_queue_host = load_from_env("RABBITMQ_HOST")
message_queue_port = load_from_env("RABBITMQ_NODE_PORT")
message_queue_username = load_from_env("RABBITMQ_DEFAULT_USER")
message_queue_password = load_from_env("RABBITMQ_DEFAULT_PASS")

message_queue = RabbitMQMessageQueue(
    url=f"amqp://{message_queue_username}:{message_queue_password}@{message_queue_host}:{message_queue_port}/"
)

In [4]:
from llama_agents.types import TASK_CONSUMER_NAME
from llama_agents.message_consumers.base import (
    BaseMessageQueueConsumer,
    StartConsumingCallable,
)
from llama_agents import QueueMessage, CallableMessageConsumer
from queue import Queue, Empty
from pydantic import PrivateAttr
from typing import Any

task_message_queue = Queue()

def task_message_queue_producer_closure(queue: Queue):
    """Consumer of the multi-agent system, but producer of stream."""
    def process_message(message: QueueMessage, **kwargs: Any):
        queue.put(message)

    return process_message

message_producer = task_message_queue_producer_closure(queue=task_message_queue)

task_message_eavesdropper = CallableMessageConsumer(
    message_type=TASK_CONSUMER_NAME,
    handler=message_producer
)

In [5]:
task_eavesdropper_start_consuming = await message_queue.register_consumer(task_message_eavesdropper)

INFO:llama_agents.message_queues.rabbitmq - Registered consumer 69f4a4da-d290-427f-bdb8-cf75cfd1c6bc: task_eavesdropper


In [6]:
import asyncio

_ = asyncio.create_task(task_eavesdropper_start_consuming())

## Task Progress Plotter

In [7]:
from typing import List
from collections import deque
import json
%matplotlib inline

import matplotlib
import matplotlib.pyplot as plt
plt.ion()

# figure, ax = plt.subplots(figsize=(10, 8))


class TaskProgress:
    def __init__(self, names: List[str]):
        self.queues = {name: deque() for name in names}
        self.task_position = {}
        self.name_position = {name: ix for ix, name in enumerate(names)}


    def reposition_task(self, task_id: str, name: str, text: str):
        if task_id in self.task_position:
            old_owner = self.task_position.pop(task_id)
            self.queues[old_owner].popleft()  # should be by order, right?

        # assign new position
        self.task_position[task_id] = name
        self.queues[name].append((task_id, text))

    def write_plot_data(self):        
        # prepare data
        data = []
        for name, queue in self.queues.items():
            for ix, (task_id, text) in enumerate(queue):
                data.append((self.name_position[name], ix, text))

        with open("sim_data.json", "w") as f:
            json.dump({
                "xs": [el[0] for el in data],
                "ys": [el[1] for el in data],
                "texts": [el[2] for el in data]
            }, f)

In [8]:
def consumer(queue, event):
    """Consumes the TaskResult and TaskDef messages."""
    task_progress = TaskProgress(names=["remove_ay_agent","correct_first_character_agent", "human"])
    while not event.is_set():
        message = queue.get()
        name = message.data["original_type"]
        if "input" in message.data:
            text = message.data["input"]
        elif "result" in message.data:
            text = message.data["result"]
        else:
            text = ""
        task_id = message.data["task_id"]
        if name in task_progress.queues:
            task_progress.reposition_task(task_id, name, text)
            task_progress.write_plot_data()
            # print(task_progress.queues)

In [9]:
import threading

event = threading.Event()
thread = threading.Thread(target=consumer, args=(task_message_queue, event))
thread.start()

## Send Task

In [10]:
from llama_agents import LlamaAgentsClient

client = LlamaAgentsClient("http://0.0.0.0:8001")

In [14]:
task_id = client.create_task("Please decode the following input: 'ellohay owhay reaay ouyay'")

In [12]:
task_result = client.get_task_result(task_id)
print(task_result.result)

TypeError: llama_agents.types.TaskResult() argument after ** must be a mapping, not NoneType

### Send A Bunch of Tasks

In [16]:
from llama_agents import LlamaAgentsClient

In [17]:
def pig_latin(text: str):
    tokens = text.lower().split()
    tmp = []
    for token in tokens:
        token = token[1:] + token[0] + 'ay'
        tmp.append(token)

    return ' '.join(tmp)

In [18]:
import random
from llama_index.llms.openai import OpenAI
from llama_index.core.bridge.pydantic import BaseModel

client = LlamaAgentsClient("http://0.0.0.0:8001")
llm = OpenAI("gpt-4o", temperature=1)

async def send_new_task():
    seed = random.random()
    response = await llm.acomplete(f"({seed}) Provide a 3 to 5 word phrase. Don't include any punctuation.")
    task = pig_latin(response.text)
    print(f"text: {response.text}, task: {task}")
    client.create_task(task)

In [19]:
await send_new_task()

text: Floating through serene skies, task: loatingfay hroughtay erenesay kiessay


In [20]:
import numpy as np

async def generate_tasks(n = 5):
    try:
        while True:
            interarrival_time = np.random.exponential(0.25)
            await asyncio.sleep(interarrival_time)
            await send_new_task()
    except (CancelledError, KeyboardInterrupt):
        print("Shutting down.")

In [21]:
await generate_tasks()

text: Whisper of the night, task: hisperway foay hetay ightnay
text: Whispering winds of change, task: hisperingway indsway foay hangecay
text: Embrace the journey ahead, task: mbraceeay hetay ourneyjay headaay
text: Whispering winds through trees, task: hisperingway indsway hroughtay reestay
text: Innovate for a better tomorrow, task: nnovateiay orfay aay etterbay omorrowtay
text: Whispering winds of change, task: hisperingway indsway foay hangecay
text: Whispering winds of autumn, task: hisperingway indsway foay utumnaay
text: Serene dawn breaking horizon, task: erenesay awnday reakingbay orizonhay
text: Whisper of the wind, task: hisperway foay hetay indway
text: Skyline over the horizon, task: kylinesay veroay hetay orizonhay
text: Whispers of the past, task: hispersway foay hetay astpay
text: Whispering across still waters, task: hisperingway crossaay tillsay atersway
text: Whispering winds through trees, task: hisperingway indsway hroughtay reestay
text: Whispers of autumn leaves

CancelledError: 