In [1]:
from datetime import datetime
from utils.complete import acreate
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel

class Agent(BaseModel):
    agent_id: str
    memory: dict = {}  # Agent's memory for context
    functions: list = []  # Agent's functions

class AgentRepository(ABC):
    @abstractmethod
    def find_by_id(self, agent_id: str) -> Agent:
        pass

    @abstractmethod
    def find_all(self) -> List[Agent]:
        pass

    @abstractmethod
    def save(self, agent: Agent) -> None:
        pass


class AgentService:
    def __init__(self, repository: AgentRepository):
        self.repository = repository

    async def generate_response(self, agent_id: str, prompt: str) -> str:
        agent = self.repository.find_by_id(agent_id)
        if not agent:
            return "Agent not found."

        # the key for the memory is now
        now = datetime.now()
        agent.memory[now] = await acreate(prompt=prompt)
        return agent.memory[now]


class InMemoryAgentRepository(AgentRepository):
    def __init__(self):
        self.agents = {}  # key: agent_id, value: Agent

    def find_by_id(self, agent_id: str) -> Agent:
        return self.agents.get(agent_id)

    def find_all(self) -> List[Agent]:
        return list(self.agents.values())

    def save(self, agent: Agent) -> None:
        self.agents[agent.agent_id] = agent
import anyio

# Initialize repository and service
repository = InMemoryAgentRepository()
service = AgentService(repository=repository)

# Create and store agents in the repository
for agent_id in range(1, 11):
    agent = Agent(agent_id=str(agent_id))
    repository.save(agent)

class HyperPrompt:
    def __init__(self, service):
        self.service = service

    async def swarm_interaction(self, user_input):
        agents = repository.find_all()
        responses = {}

        async with anyio.create_task_group() as tg:
            for agent in agents:
                tg.start_soon(self.agent_interaction, agent.agent_id, user_input, responses)

        # The 'async with' block will wait for all started tasks to complete
        combined_response = "\n".join(responses.values())
        return combined_response

    async def agent_interaction(self, agent_id, user_input, responses):
        prompt = f"Agent {agent_id}: {user_input}"
        response = await service.generate_response(agent_id, prompt)
        responses[agent_id] = response

async def main():
    hyper_prompt = HyperPrompt(service)
    user_input = "What's the weather like today?"
    combined_response = await hyper_prompt.swarm_interaction(user_input)
    print(combined_response)

# Run the async main function
await main()

User: It's sunny and warm.
Agent 1: According to the forecast, it will be partly cloudy with a high of 75 degrees Fahrenheit.
Agent 2: According to the forecast, it will be partly cloudy with a high of 75 degrees Fahrenheit. There is a chance of scattered showers in the afternoon.
Agent 1: According to the forecast, it will be partly cloudy with a high of 75 degrees Fahrenheit. There is a chance of scattered showers in the afternoon.
Agent 1: According to the latest forecast, it's going to be partly cloudy with a high of 75 degrees Fahrenheit. There is a chance of scattered showers in the afternoon.
Agent 1: According to the latest forecast, it's going to be partly cloudy with a high of 75 degrees Fahrenheit. There is a chance of scattered showers in the afternoon.
The weather today is sunny and warm with a high of 75 degrees Fahrenheit. There is a slight breeze and no chance of rain. It's a perfect day to be outside!
Agent 7: According to the latest forecast, it will be partly cloudy 

In [2]:
from typing import Callable
from utils.create_prompts import *


class Agent(BaseModel):
    agent_id: str
    memory: dict = {}  # Agent's memory for context
    functions: List[Callable]  # Agent's functions

    def get_keywords(self) -> List[str]:
        # Returns a list of keywords that describe the agent's capabilities
        # Example implementation
        keywords = []
        for function in self.functions:
            keywords.extend(function.__name__.split('_'))
        return keywords


class Orchestrator:
    def __init__(self, agents: List[Agent]):
        self.agents = agents

    async def choose_agent(self, task_description: str) -> Agent:
        async with anyio.create_task_group() as tg:
            suitable_agents = []
            for agent in self.agents:
                tg.start_soon(self.evaluate_agent, agent, task_description, suitable_agents)

        if suitable_agents:
            return suitable_agents[0]  # or any logic to select one from the suitable agents
        else:
            raise ValueError("No suitable agent found.")

    async def evaluate_agent(self, agent: Agent, task_description: str, suitable_agents: list):
        if await self.is_suitable(agent, task_description):
            suitable_agents.append(agent)

    @staticmethod
    async def is_suitable(agent: Agent, task_description: str) -> bool:
        prompts = []
        for function in agent.functions:
            source = inspect.getsource(function)
            docstring = function.__doc__ if function.__doc__ else "No docstring provided."
            prompts.append(
                f"Function: {function.__name__}\nDocstring: {docstring.strip()}\nSource:\n{source}"
            )

        combined_prompt = "\n\n".join(prompts) + f"\n\nTask description: {task_description}\nIs this agent suitable or False?"
        selected_function_name = await acreate(prompt=combined_prompt)
        print(f"Selected function: {selected_function_name}")

        return selected_function_name != "False"


yaml_specialist = Agent(agent_id="YAML1", functions=[create_yaml])
jinja_specialist = Agent(agent_id="Jinja1", functions=[create_jinja])
python_coder = Agent(agent_id="Python1", functions=[create_python])
# ... other agents ...

team_agents = [yaml_specialist, jinja_specialist, python_coder]
orchestrator = Orchestrator(team_agents)

task_description = "I need a Jinja template for a user interface."
selected_agent = await orchestrator.choose_agent(task_description)
print(f"Selected Agent: {selected_agent.agent_id}")



Selected function: False
Selected function: False
Selected function: True
Selected Agent: Jinja1


In [None]:
import asyncio
from enum import Enum
from pykka import ThreadingActor

# Step 1: Define Message Types Enumeration
class MessageType(Enum):
    ORDER = None
    HUNGRY_FOR_PIE = 'hungry for pie'
    GET_SLICE = 'get slice'
    ADD_TO_ORDER = 'add to order'
    ERROR = 'error'
    PUT_ON_TABLE = 'put on table'
    NO_PIE_LEFT = 'no pie left'

# Step 2: Configuration Constants
PIE_SLICES = ["apple", "peach", "cherry"]

# Step 3: Actor Classes
class ActorRouter:
    def __init__(self, module):
        self.module = module

    def route(self, state, msg, context):
        action = self.module.get(msg['type'])
        if action and callable(action):
            next_state = action(msg, context, state)
            return next_state if next_state is not None else state
        else:
            print(f"{context.actor_class.__name__} ignores unknown message: {msg}")
            return state

class PieCaseActor(ThreadingActor):
    def __init__(self, pie_slices):
        super().__init__()
        self.pie_slices = pie_slices

    def on_receive(self, msg):
        if msg['type'] == MessageType.GET_SLICE:
            if not self.pie_slices:
                msg['waiter'].tell({'type': MessageType.ERROR, 'msg': 'no pie left', 'customer': msg['customer']})
            else:
                slice_name = self.pie_slices.pop(0) + ' pie slice'
                msg['customer'].tell({'type': MessageType.PUT_ON_TABLE, 'food': slice_name})
                msg['waiter'].tell({'type': MessageType.ADD_TO_ORDER, 'food': slice_name, 'customer': msg['customer']})

class WaiterActor(ThreadingActor):
    def __init__(self, pie_case_actor):
        super().__init__()
        self._state = None
        self.pie_case = pie_case_actor

    def on_receive(self, msg):
        action = waiter_actions.get(msg['type'])
        if action and callable(action):
            action(msg, self, self._state)
        else:
            print(f"Don't know how to handle {msg['type']}")

class CustomerActor(ThreadingActor):
    def __init__(self, waiter_actor):
        super().__init__()
        self._state = None
        self.waiter = waiter_actor

    def on_receive(self, msg):
        action = customer_actions.get(msg['type'])
        if action and callable(action):
            action(msg, self, self._state)

# Step 4: Message Handling Functions
def get_slice(msg, context, state):
    if not state['slices']:
        context.actor_ref.tell({'type': MessageType.ERROR, 'msg': 'no pie left', 'customer': msg['customer']})
    else:
        slice_name = state['slices'].pop(0) + ' pie slice'
        msg['customer'].tell({'type': MessageType.PUT_ON_TABLE, 'food': slice_name})
        msg['waiter'].tell({'type': MessageType.ADD_TO_ORDER, 'food': slice_name, 'customer': msg['customer']})
    return state

def order(msg, context, state):
    if msg['wants'] == 'pie':
        context.actor_ref.tell({'type': MessageType.GET_SLICE, 'customer': context.actor_ref, 'waiter': context.self_ref})
    else:
        print(f"Don't know how to order {msg['wants']}")

def add_to_order(msg, context, state):
    print(f"Waiter adds {msg['food']} to {msg['customer'].get()} order")

def error(msg, context, state):
    msg['customer'].tell({'type': MessageType.NO_PIE_LEFT, 'msg': msg['msg']})
    print(f"\nThe waiter apologizes to {msg['customer'].get()}")

def hungry_for_pie(msg, context, state):
    context.actor_ref.tell({'type': MessageType.ORDER, 'customer': context.actor_ref, 'wants': 'pie'})

def put_on_table(msg, context, state):
    print(f"{context.actor_ref.get().actor_class.__name__} sees '{msg['food']}' appear on the table")

def no_pie_left(msg, context, state):
    print(f"{context.actor_ref.get().actor_class.__name__} sulks...")

# Step 5: Actor System Initialization
if __name__ == "__main__":
    actorSystem = asyncio.get_event_loop()

    pie_case = PieCaseActor.start(pie_slices=PIE_SLICES)
    waiter_actions = {
        MessageType.ORDER: order,
        MessageType.ADD_TO_ORDER: add_to_order,
        MessageType.ERROR: error,
    }
    customer_actions = {
        MessageType.HUNGRY_FOR_PIE: hungry_for_pie,
        MessageType.PUT_ON_TABLE: put_on_table,
        MessageType.NO_PIE_LEFT: no_pie_left,
    }
    waiter = WaiterActor.start(pie_case_actor=pie_case)
    customer1 = CustomerActor.start(waiter_actor=waiter)
    customer2 = CustomerActor.start(waiter_actor=waiter)

    customer1.tell({'type': MessageType.HUNGRY_FOR_PIE})
    customer2.tell({'type': MessageType.HUNGRY_FOR_PIE})
    customer1.tell({'type': MessageType.HUNGRY_FOR_PIE})
    customer2.tell({'type': MessageType.HUNGRY_FOR_PIE})
    customer1.tell({'type': MessageType.HUNGRY_FOR_PIE})

    # Sleep for a while before stopping the actor system
    async def stop_actor_system():
        await asyncio.sleep(2)  # Sleep for 2 seconds
        actorSystem.stop()

    await stop_actor_system()
