In [46]:
from textwrap import dedent

class Agent:
    """A node in a small dependency graph of cooperating agents.

    Each agent has a name, a task description and an optional description of
    the expected output. Agents are wired together with ``>>`` / ``<<`` (or
    ``add_dependent`` / ``add_dependency``). When an agent runs, its output is
    appended to the ``context`` of every agent that depends on it.

    Attributes:
        name: Display name; also selects the canned reply in ``dummy_response``.
        task_description: Text placed in the ``<task_description>`` prompt section.
        task_expected_output: Text placed in the ``<task_expected_output>`` section.
        dependencies: Agents this agent depends on.
        dependents: Agents that depend on this agent.
        context: Newline-terminated outputs received from dependencies so far.
    """

    # The prompt layout is dedented once, *before* any value is substituted.
    # Dedenting after interpolation breaks as soon as a value spans several
    # lines: its continuation lines start at column 0, so the common margin
    # is empty and the template keeps all of its source indentation.
    _PROMPT_TEMPLATE = dedent(
        """
        You are an AI agent. You are part of a team of agents working together to complete a task.
        <task_description>
        {task_description}
        </task_description>

        <task_expected_output>
        {task_expected_output}
        </task_expected_output>

        <context>
        {context}
        </context>

        Your response:
        """
    ).strip()

    # Canned replies keyed by agent name, used by dummy_response().
    _CANNED_RESPONSES = {
        "Poet Agent": "A gentle breeze through silent trees.",
        "Translator Agent": "Una brisa suave entre árboles silenciosos.",
        "Writer Agent": "Poem saved to file: poem.txt",
    }

    def __init__(self, name, task_description, task_expected_output=""):
        """Create an unconnected agent with an empty context."""
        self.name = name
        self.task_description = task_description
        self.task_expected_output = task_expected_output
        self.dependencies = []
        self.dependents = []
        self.context = ""

    def __repr__(self):
        """Return the agent's name."""
        return f"{self.name}"

    def __rshift__(self, other):
        """``self >> other``: make ``other`` depend on ``self``.

        Returns ``other`` so that ``a >> b >> c`` wires a -> b -> c.
        """
        self.add_dependent(other)
        return other

    def __lshift__(self, other):
        """``self << other``: make ``self`` depend on ``other``.

        Returns ``other`` so that ``a << b << c`` wires c -> b -> a.
        """
        self.add_dependency(other)
        return other

    @staticmethod
    def _as_agents(other, error_message):
        """Normalize an Agent or a list of Agents to a list, else raise TypeError."""
        if isinstance(other, Agent):
            return [other]
        if isinstance(other, list) and all(isinstance(item, Agent) for item in other):
            return other
        raise TypeError(error_message)

    @staticmethod
    def _link(upstream, downstream):
        """Record that ``downstream`` depends on ``upstream``, at most once.

        The membership checks make wiring idempotent, so re-running a
        notebook cell such as ``a >> b`` does not create duplicate edges
        (which would deliver ``a``'s output to ``b`` twice per run).
        """
        if upstream not in downstream.dependencies:
            downstream.dependencies.append(upstream)
        if downstream not in upstream.dependents:
            upstream.dependents.append(downstream)

    def add_dependency(self, other):
        """Declare that this agent depends on ``other``.

        Args:
            other: An ``Agent`` or a list of ``Agent`` instances.

        Raises:
            TypeError: If ``other`` is neither an Agent nor a list of Agents.
                Nothing is modified in that case.
        """
        upstream_agents = self._as_agents(
            other, "The dependency must be an instance or list of Agent."
        )
        for upstream in upstream_agents:
            self._link(upstream, self)

    def add_dependent(self, other):
        """Declare that ``other`` depends on this agent.

        Args:
            other: An ``Agent`` or a list of ``Agent`` instances.

        Raises:
            TypeError: If ``other`` is neither an Agent nor a list of Agents.
                Nothing is modified in that case.
        """
        downstream_agents = self._as_agents(
            other, "The dependent must be an instance or list of Agent."
        )
        for downstream in downstream_agents:
            self._link(self, downstream)

    def receive_context(self, input_data):
        """Append ``input_data`` followed by a newline to this agent's context."""
        self.context += input_data + "\n"

    def create_prompt(self):
        """Build the prompt text from the task fields and the stripped context.

        Returns:
            The filled-in prompt, with no leading indentation on template
            lines even when a field or the context spans several lines.
        """
        return self._PROMPT_TEMPLATE.format(
            task_description=self.task_description,
            task_expected_output=self.task_expected_output,
            context=self.context.strip(),
        )

    def dummy_response(self):
        """Return a canned reply for known agent names, else a generic echo."""
        fallback = f"{self.name} responding to: {self.task_description}"
        return self._CANNED_RESPONSES.get(self.name, fallback)

    def run(self):
        """Produce this agent's output, print it, and pass it to its dependents.

        Returns:
            The output string.
        """
        # The prompt is built but not consumed: dummy_response() ignores it.
        msg = self.create_prompt()
        output = self.dummy_response()
        print(f"{self.name} Output:\n{output}\n")

        for dependent in self.dependents:
            dependent.receive_context(output)
        return output



In [56]:

# Example usage: one agent per stage of a write -> translate -> save pipeline.
agent_1 = Agent(
    name="Poet Agent",
    task_description="Write a poem about nature",
)
agent_2 = Agent(
    name="Translator Agent",
    task_description="Translate the poem into Spanish",
)
agent_3 = Agent(
    name="Writer Agent",
    task_description="Save the translated poem to a file",
)



In [57]:
# Wire the pipeline: agent_1 feeds agent_2, which feeds agent_3.
# `>>` returns its right-hand operand, so the chain evaluates to agent_3,
# which is why this cell displays "Writer Agent".
agent_1 >> agent_2 >> agent_3

Writer Agent

In [58]:
# Run the agents in pipeline order, showing each one's accumulated context
# just before it runs. The last agent's output is kept as the final
# expression so the cell still displays it.
for label, agent in (
    ("agent_1", agent_1),
    ("agent_2", agent_2),
    ("agent_3", agent_3),
):
    print(f"{label} context: ", agent.context)
    last_output = agent.run()
last_output


agent_1 context:  
Poet Agent Output:
A gentle breeze through silent trees.

agent_2 context:  A gentle breeze through silent trees.

Translator Agent Output:
Una brisa suave entre árboles silenciosos.

agent_3 context:  Una brisa suave entre árboles silenciosos.

Writer Agent Output:
Poem saved to file: poem.txt



'Poem saved to file: poem.txt'