In [1]:
from dotenv import load_dotenv
load_dotenv()

True

import dspy
import instructor
wrkr = dspy.OpenAI(model='gpt-3.5-turbo', max_tokens=1000, api_base='http://0.0.0.0:4000', model_type="chat")
bss = dspy.OpenAI(model='gpt-4-turbo', max_tokens=1000, api_base='http://0.0.0.0:4000', model_type="chat")
dspy.configure(lm=wrkr) # our default

from typing import List, Any, Callable, Optional
from pydantic import BaseModel

class Plan(dspy.Signature):
    """Produce a step by step plan to perform the task. 
The plan needs to be in markdown format and should be broken down into big steps (with ## headings) and sub-steps beneath those.
When thinking about your plan, be sure to think about the tools at your disposal and include them in your plan.
    """
    task = dspy.InputField(prefix="Task", desc="The task")
    context = dspy.InputField(format=str, desc="The context around the plan")
    proposed_plan = dspy.OutputField(desc="The proposed, step by step execution plan.")

class Worker(dspy.Module):
    def __init__(self, role:str, tools:List):
        self.role = role
        self.tools = tools
        self.tool_descriptions = "\n".join([f"- {t.name}: {t.description}. To use this tool please provide: `{t.requires}`" for t in tools])
        self.plan = dspy.ChainOfThought(Plan)
    def forward(self, task:str):
        context = f"{self.role}\n{self.tool_descriptions}"
        input_args = dict(
            context = context,
            task = task
        ) # just did args for printing for debugging
        result = self.plan(**input_args)
        print(result.proposed_plan)

class Tool(BaseModel):
    name: str
    description: str
    requires: str
    func: Callable

test_tools = [
    Tool(name="phone", description="a way of making phone calls", requires="phone_number", func=lambda x: "they've got time"),
    Tool(name="local business lookup", description="Look up businesses by category", requires="business category", func=lambda x: "Bills landscaping: 415-555-5555")
]

with dspy.context(lm=wrkr):
    Worker("assistant", test_tools).forward("get this yard cleaned up.")
# you'll see output you might expect.

class Worker2(dspy.Module):
    def __init__(self, role:str, tools:List):
        self.role = role
        self.tools = dict([(t.name, t) for t in tools])
        self.tool_descriptions = "\n".join([f"- {t.name}: {t.description}. To use this tool please provide: `{t.requires}`" for t in tools])
        self._plan = dspy.ChainOfThought(Plan)
        self._tool = dspy.ChainOfThought("task, context -> tool_name, tool_argument")
        
        print(self.tool_descriptions)
    def plan(self, task:str, feedback:Optional[str]=None):
        context = f"Your role:{self.role}\n Tools at your disposal:\n{self.tool_descriptions}"
        if feedback:
            context += f"\nPrevious feedback on your prior plan {feedback}"
        input_args = dict(
            task=task,
            context=context
        )    
        result = self._plan(**input_args)
        return result.proposed_plan
    def execute(self, task:str, use_tool:bool):
        print(f"executing {task}")
        if not use_tool:
            return f"{task} completed successfully"
            
        res = self._tool(task=task, context=self.tool_descriptions)
        t = res.tool_name
        arg = res.tool_argument
        if t in self.tools:
            complete = self.tools[t].func(arg)
            return complete
        return "Not done"

email_tool = Tool(
    name="email",
    description="Send and receive emails",
    requires="email_address",
    func=lambda x: f"Email sent to {x}"
)
schedule_meeting_tool = Tool(
    name="schedule meeting",
    description="Schedule meetings",
    requires="meeting_details",
    func=lambda x: f"Meeting scheduled on {x}"
)

cleaning_supplies_tool = Tool(
    name="cleaning supplies",
    description="List of cleaning supplies needed",
    requires="cleaning_area",
    func=lambda x: f"Need supplies for {x}"
)
maintenance_report_tool = Tool(
    name="maintenance report",
    description="Report maintenance issues",
    requires="issue_description",
    func=lambda x: f"There's too much work for one person. I need help!"
)

code_compiler_tool = Tool(
    name="code compiler",
    description="Compile code",
    requires="source_code",
    func=lambda x: "Code compiled successfully"
)
bug_tracker_tool = Tool(
    name="bug tracker",
    description="Track and report bugs",
    requires="bug_details",
    func=lambda x: f"Bug reported: {x}"
)

recipe_lookup_tool = Tool(
    name="recipe lookup",
    description="Look up recipes",
    requires="dish_name",
    func=lambda x: f"Recipe for {x} found"
)
kitchen_inventory_tool = Tool(
    name="kitchen inventory",
    description="Check kitchen inventory",
    requires="ingredient",
    func=lambda x: f"Inventory checked for {x}"
)

workers = [
    Worker2("assistant", [email_tool, schedule_meeting_tool]),
    Worker2("janitor", [cleaning_supplies_tool, maintenance_report_tool]),
    Worker2("software engineer", [code_compiler_tool, bug_tracker_tool]),
    Worker2("cook", [recipe_lookup_tool, kitchen_inventory_tool])
]

from pydantic import Field
import instructor
from openai import OpenAI
_client = instructor.from_openai(OpenAI(base_url="http://0.0.0.0:4000/"))
class SubTask(BaseModel):
    action:str
    assignee: str
    requires_tool: bool = Field(..., description="Does this require the use of a specific tool?")
                               
class Task(BaseModel):
    sub_tasks:List[SubTask]
    
class ParsedPlan(BaseModel):
    tasks: List[Task]
def get_plan(plan:str, context:str):
    return _client.chat.completions.create(
        response_model=ParsedPlan,
        model="gpt-3.5-turbo",
        messages=[
            dict(role="system", content="You help parse markdown into a structured format."),
            dict(role="user", content=f"Here is the context about the plan including the available tools: \n{context} \n\n The plan: \n\n {plan}")
        ],
    )

class Boss(dspy.Module):
    def __init__(self, base_context:str, direct_reports=List, lm=bss):
        self.base_context = base_context
        self._plan = dspy.ChainOfThought("task, context -> assignee")
        self._approve = dspy.ChainOfThought("task, context -> approve")
        self._critique = dspy.ChainOfThought("task, context -> critique")
        self.reports = dict((d.role,d) for d in direct_reports)
        self.lm = lm
        report_capabilities = []
        for r in direct_reports:
            report_capabilities.append(f"{r.role} has the follow tools:\n{r.tool_descriptions}")
        self.report_capabilities = "\n".join(report_capabilities) 
        print(self.report_capabilities)
    # The `critique` method allows the Boss to provide feedback on a proposed plan. It takes the task, the proposed plan, and an optional extra context as input, and uses the `_critique` chain of thought to generate a critique.
    def critique(self, task:str, plan:str, extra_context:Optional[str]=None):
        context=self.base_context
        if extra_context:
            context += "\n"
            context += extra_context
        
        crit_args = dict(
            context=context,
            task=task,
            proposed_plan=plan)
        with dspy.context(lm=self.lm):
            result = self._critique(**crit_args)
        return result.critique
    # The `approve` method allows the Boss to approve a proposed plan. It takes the task, the proposed plan, and an optional extra context as input, and uses the `_approve` chain of thought to generate an approval decision.
    def approve(self, task:str, plan:str, extra_context:Optional[str]=None):
        context=self.base_context + "\n You only approve plans after 2 iterations"
        if extra_context:
            context += "\n"
            context += extra_context
        
        approval_args = dict(
            context=context,
            task=task,
            proposed_plan=plan)
        with dspy.context(lm=self.lm):
            result = self._approve(**approval_args)
        return result.approve        
    # The `plan` method is the core of the Boss class. It takes a task as input and uses the `_plan` chain of thought to determine which of the direct reports should be assigned to the task. The method then iterates through the assignment process, providing feedback and critiques until a suitable assignee is found. Once the assignee is determined, the Boss calls the `plan` method of the assigned worker to generate a plan for the task. The Boss then approves the plan, providing critiques and feedback as necessary, until the plan is approved.
    def plan(self, task:str):
        # note: this function is a mess, don't judge me
        # I haven't built an agent framework before, so I'm not sure of the ergonomics
        # and best approach
        context=self.base_context + f"Here are your direct report capabilities: {self.report_capabilities}"
        
        plan_args = dict(
            context = context,
            task=f"Which person should take on the following task: {task}"
        )
        assignee = self._plan(**plan_args).assignee
        is_assigned = assignee.lower() in self.reports
        report = None
        print("assigning")
        for x in range(3):
            if is_assigned:
                report = self.reports[assignee]
            else:
                context += f"\n\n you tried to assign to {assignee} but that's not a valid one. Think carefully and assign the proper report"
                plan_args = dict(
                    context = context,
                    task=f"Which person should take on the following task: {task}"
                )
                assignee = self._plan(**plan_args).assignee
        assert report, "Failed to assign"
        print("assigning complete")
        print("planning")
        reports_plan = report.plan(task)
        with dspy.context(lm=self.lm):
            approval = self.approve(task, reports_plan)
            is_approved = "yes" in approval.lower() and "no" not in approval.lower()
        
        for x in range(2): # I created cycles to simulate failures, this might be a while loop in production
            print(f"Cycle {x}: {approval}")
            if is_approved:
                break
            feedback = self.critique(task, reports_plan)
            feedback = f"Prior plan: {reports_plan}\n Boss's Feedback: {feedback}"
            print(feedback)
            reports_plan = report.plan(task, feedback)
            print("new plan===>")
            print(reports_plan)
            complete = f"{feedback}\n\nNew plan:\n\n{reports_plan}"
            approval = self.approve(task, reports_plan)
            is_approved = "yes" in approval.lower()
        print("Now working")
        parsed_plan = get_plan(reports_plan, f"The assignee is: {assignee}. The rest of the team is: {self.report_capabilities}")
        results = []
        for task in parsed_plan.tasks:
            for sub_task in task.sub_tasks:    
                task_result = self.reports[sub_task.assignee].execute(sub_task.action, sub_task.requires_tool)
                results.append(f"\n{task_result}: {sub_task.action}\n")
        print("end result")
        print("\n".join(results))

b = Boss("You are a boss that manages a team of people, you're responsible for them doing well and completing the tasks you are given.", workers)


b.plan("clean up the yard")
