# Orchestrator Subtask Agent Workflow
Author: [Zain Hasan](https://x.com/ZainHasan6)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/togethercomputer/together-cookbook/blob/main/Parallel_Subtask_Agent_Workflow.ipynb)

## Introduction

In this notebook we'll demonstrate how to create an agent workflow that breaks an original task into multiple subtasks, completes those subtasks with parallel LLM calls, and then either returns the answers directly or synthesizes them into a single response.

This strategy is similar to the parallel execution of LLMs proposed in the [Mixture of Agents paper](https://arxiv.org/abs/2406.04692). However, in this setup a dedicated orchestrator LLM breaks the task into smaller subtasks, and each parallel worker LLM executes a different aspect of the main task: the prompt sent to each worker can differ, and so can the worker LLM itself.

For example, given a coding task, we might break it into planning, coding, and testing steps, each with its own prompt and possibly its own LLM.
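The difference from Mixture of Agents shows up in the jobs each pattern fans out. Below is a minimal sketch with no API calls: `moa_jobs` sends one prompt to every model, while `subtask_jobs` routes each subtask type through a hypothetical `ROUTES` table that pairs it with its own instruction and model. The subtask types mirror the `plan`, `code/solve`, and `test` types used later in this notebook; the instructions, the model assignments, and the placeholder names `model-a`/`model-b` are illustrative assumptions.

```python
from typing import Dict, List, Tuple

# Hypothetical routing table: subtask type -> (model, instruction).
# The assignments are illustrative, not the notebook's configuration.
ROUTES: Dict[str, Tuple[str, str]] = {
    "plan": ("meta-llama/Llama-3.3-70B-Instruct-Turbo",
             "Write a step-by-step plan. Do not write code."),
    "code/solve": ("Qwen/Qwen2.5-Coder-32B-Instruct",
                   "Write a complete, working solution."),
    "test": ("Qwen/Qwen2.5-Coder-32B-Instruct",
             "Write a test plan. Do not solve the task."),
}

def moa_jobs(task: str, models: List[str]) -> List[Tuple[str, str]]:
    """Mixture-of-Agents style: every model receives the same prompt."""
    return [(model, task) for model in models]

def subtask_jobs(task: str, subtask_types: List[str]) -> List[Tuple[str, str]]:
    """Orchestrator style: each subtask gets its own instruction and model."""
    jobs = []
    for subtask_type in subtask_types:
        model, instruction = ROUTES[subtask_type]
        jobs.append((model, f"{instruction}\n\nTask: {task}"))
    return jobs

demo_task = "Write a program that prints the next 20 leap years."
print(moa_jobs(demo_task, ["model-a", "model-b"]))  # same prompt twice
for model, prompt in subtask_jobs(demo_task, ["plan", "code/solve", "test"]):
    print(f"{model} <- {prompt.splitlines()[0]}")
```

In the MoA-style list every job carries the identical prompt; in the orchestrator-style list both the prompt and, potentially, the model change per subtask.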

## Orchestrator Subtask Workflow

<img src="../images/parallel_different.png" width="700">

In this parallel **orchestrator subtask workflow** we demonstrate how you can create an agent system that intelligently decomposes complex tasks into smaller, specialized components. 

The workflow begins with an orchestrator LLM that analyzes the main task and strategically breaks it down into distinct subtasks, which are then executed simultaneously by different worker LLMs. 

Unlike traditional parallel LLM approaches, where multiple models work on the same task, this system assigns each worker LLM a unique aspect of the overall problem, with a customized prompt and potentially a different model suited to that specific subtask.

For our specific use case, the workflow operates as follows:
1. The orchestrator LLM analyzes the main task and breaks it into distinct, parallel subtasks
2. Each subtask is assigned to an appropriate worker LLM with specialized prompts
3. The results are either presented individually or synthesized into a unified response
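The three steps above can be sketched end to end without an API key. In the minimal example below, `fake_orchestrator`, `fake_worker`, and `fake_synthesizer` are hypothetical stubs standing in for the real LLM calls; only the control flow (decompose, run the workers concurrently with `asyncio.gather`, then optionally synthesize) reflects the workflow described here.

```python
import asyncio
from typing import Dict, List, Union

def fake_orchestrator(task: str) -> List[Dict[str, str]]:
    """Step 1 (stub): decompose the task into typed subtasks."""
    return [
        {"type": "plan", "description": f"Plan how to: {task}"},
        {"type": "code/solve", "description": f"Solve: {task}"},
        {"type": "test", "description": f"Test a solution to: {task}"},
    ]

async def fake_worker(subtask: Dict[str, str]) -> str:
    """Step 2 (stub): one worker handles one subtask."""
    await asyncio.sleep(0.01)  # simulate network latency
    return f"[{subtask['type']}] {subtask['description']}"

def fake_synthesizer(task: str, results: List[str]) -> str:
    """Step 3 (stub): merge the worker outputs into one answer."""
    return f"Answer to '{task}':\n" + "\n".join(results)

async def run_workflow(task: str, synthesize: bool = True) -> Union[str, List[str]]:
    subtasks = fake_orchestrator(task)
    # Workers run concurrently; gather returns results in subtask order.
    results = await asyncio.gather(*(fake_worker(s) for s in subtasks))
    if synthesize:
        return fake_synthesizer(task, list(results))
    return list(results)  # present the results individually

print(asyncio.run(run_workflow("print the next 20 leap years")))
```

`asyncio.run(...)` cannot be called inside an already running event loop, so in a notebook cell write `await run_workflow(...)` instead, the same way the notebook awaits `orchestrator_workflow` further down.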

Now let's look at the implementation of this workflow.


## Setup and Utils

```python
# Install libraries
!pip install -qU pydantic together
```

```python
# Import libraries
import json
import asyncio
import together
from together import AsyncTogether, Together

from typing import Any, Optional, Dict, List, Literal
from pydantic import Field, BaseModel, ValidationError

TOGETHER_API_KEY = "--Your API Key--"

client = Together(api_key=TOGETHER_API_KEY)
async_client = AsyncTogether(api_key=TOGETHER_API_KEY)
```

```python
# Simple LLM call helper function
def run_llm(user_prompt: str, model: str, system_prompt: Optional[str] = None) -> str:
    """Run the language model with the given user prompt and optional system prompt."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    messages.append({"role": "user", "content": user_prompt})

    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
        max_tokens=4000,
    )

    return response.choices[0].message.content

# Simple JSON mode LLM call helper function
def JSON_llm(user_prompt: str, schema: type[BaseModel], system_prompt: Optional[str] = None) -> dict:
    """Run a language model in JSON mode and return the reply as a dict validated against `schema`."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    messages.append({"role": "user", "content": user_prompt})

    extract = client.chat.completions.create(
        messages=messages,
        model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        response_format={
            "type": "json_object",
            "schema": schema.model_json_schema(),
        },
    )

    try:
        # json.loads raises JSONDecodeError on malformed output;
        # model_validate raises ValidationError if the JSON does not match the schema
        response = json.loads(extract.choices[0].message.content)
        return schema.model_validate(response).model_dump()
    except (json.JSONDecodeError, ValidationError) as e:
        raise ValueError(f"Schema validation failed: {str(e)}")
```

## Orchestrator Agent Workflow

```python
# Async helper for a single worker call; the workflow below runs several of
# these concurrently with asyncio.gather
async def run_llm_parallel(user_prompt: str, model: str, system_prompt: Optional[str] = None) -> str:
    """Run one async LLM call, retrying with backoff on rate-limit errors."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    messages.append({"role": "user", "content": user_prompt})

    for sleep_time in [1, 2, 4]:
        try:
            response = await async_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.7,
                max_tokens=2000,
            )
            return response.choices[0].message.content
        except together.error.RateLimitError as e:
            print(e)
            await asyncio.sleep(sleep_time)

    # All retries hit the rate limit: fail loudly instead of referencing an unset `response`
    raise RuntimeError(f"Rate limit retries exhausted for model {model}")
```
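The retry loop in `run_llm_parallel` is a fixed-schedule backoff. Below is a dependency-free sketch of the same pattern so it can be exercised without an API key; `FakeRateLimitError`, `make_flaky`, and `with_backoff` are hypothetical names, with `FakeRateLimitError` standing in for `together.error.RateLimitError`, and the delays are shortened for the demo.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")

class FakeRateLimitError(Exception):
    """Hypothetical stand-in for together.error.RateLimitError."""

async def with_backoff(call: Callable[[], Awaitable[T]], delays: Sequence[float] = (1, 2, 4)) -> T:
    """Await call(); on a rate-limit error, sleep for the next delay and retry."""
    for delay in delays:
        try:
            return await call()
        except FakeRateLimitError as e:
            print(f"rate limited ({e}); retrying in {delay}s")
            await asyncio.sleep(delay)
    # One last attempt after the final delay; if it fails, the error propagates.
    return await call()

def make_flaky(failures: int):
    """Build a hypothetical API call that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky_call() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise FakeRateLimitError(f"attempt {state['calls']}")
        return "ok"

    return flaky_call, state

call, state = make_flaky(failures=2)
print(asyncio.run(with_backoff(call, delays=(0.01, 0.02, 0.04))))  # two retries, then "ok"
```

One design choice worth noting: unlike the loop in `run_llm_parallel`, this version makes one last attempt after the final delay, so the last sleep is not wasted, and it lets the error propagate if that attempt also fails. For real use, the `except` clause would catch `together.error.RateLimitError` instead of the stand-in. In a notebook cell, replace `asyncio.run(...)` with a top-level `await`.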

Now we will implement the workflow that uses this parallel LLM calling function.

```python
from pydantic import BaseModel, Field
from typing import Literal, List

ORCHESTRATOR_PROMPT = """
Analyze this task and break it down into 2-3 distinct approaches:

Task: {task}

Provide an Analysis:

Explain your understanding of the task and which variations would be valuable.
Focus on how each approach serves different aspects of the task.

Along with the analysis, provide 2-3 approaches to tackle the task, each with a brief description:

Planning for the task: Write a detailed plan to execute on the task without actually solving the task
Solving the task: Write a technical solution for the task provided
Tests for the task: Write a potential test plan for a solution to the task provided, do not solve the task

Return only JSON output.
"""

WORKER_PROMPT = """
Generate content based on:
Task: {original_task}
Style: {task_type}
Guidelines: {task_description}

Return only your response:
[Your content here, maintaining the specified task and fully addressing requirements.]
"""

task = """Write a program that prints the next 20 leap years.
"""
```

```python
class Task(BaseModel):
    type: Literal["plan", "code/solve", "test"]
    description: str

class TaskList(BaseModel):
    analysis: str
    tasks: List[Task] = Field(default_factory=list)

async def orchestrator_workflow(task: str, orchestrator_prompt: str, worker_prompt: str):
    """Use an orchestrator model to break a task into subtasks, then use worker models to generate a response for each subtask."""

    # Use the orchestrator model to break the task up into subtasks
    orchestrator_response = JSON_llm(orchestrator_prompt.format(task=task), schema=TaskList)

    # Parse the orchestrator response
    analysis = orchestrator_response["analysis"]
    tasks = orchestrator_response["tasks"]

    print("\n=== ORCHESTRATOR OUTPUT ===")
    print(f"\nANALYSIS:\n{analysis}")
    print(f"\nTASKS:\n{json.dumps(tasks, indent=2)}")

    # One worker model per subtask (here the same coding model for every subtask)
    worker_models = ["Qwen/Qwen2.5-Coder-32B-Instruct"] * len(tasks)

    # Build one worker call per subtask and run them concurrently;
    # asyncio.gather returns the responses in the same order as `tasks`
    worker_calls = [
        run_llm_parallel(
            user_prompt=worker_prompt.format(
                original_task=task,
                task_type=task_info["type"],
                task_description=task_info["description"],
            ),
            model=model,
        )
        for task_info, model in zip(tasks, worker_models)
    ]
    return tasks, await asyncio.gather(*worker_calls)
```
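The notebook later pairs each subtask with its result via `zip(tasks, worker_resp)`. That is safe because `asyncio.gather` returns results in the order the awaitables were passed in, not the order in which they finish. The sketch below checks this with hypothetical workers (`timed_worker`, `gather_demo`) whose simulated latencies are reversed, so the last subtask finishes first.

```python
import asyncio
from typing import List, Tuple

async def timed_worker(name: str, delay: float, finished: List[str]) -> str:
    """Hypothetical worker: wait `delay` seconds, record completion, return a result."""
    await asyncio.sleep(delay)
    finished.append(name)
    return f"result for {name}"

async def gather_demo() -> Tuple[List[str], List[str]]:
    subtasks = ["plan", "code/solve", "test"]
    delays = [0.06, 0.04, 0.02]  # later subtasks finish sooner
    finished: List[str] = []
    results = await asyncio.gather(
        *(timed_worker(name, delay, finished) for name, delay in zip(subtasks, delays))
    )
    return list(results), finished

results, finished = asyncio.run(gather_demo())
print("completion order:", finished)
print("gather order:   ", results)
```

The completion order is reversed, yet the returned list still lines up with `subtasks`. As with the other sketches, use `await gather_demo()` instead of `asyncio.run(...)` inside a notebook cell.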

```python
task = """Write a program that prints the next 20 leap years.
"""

tasks, worker_resp = await orchestrator_workflow(task, orchestrator_prompt=ORCHESTRATOR_PROMPT, worker_prompt=WORKER_PROMPT)
```


```text
=== ORCHESTRATOR OUTPUT ===

ANALYSIS:
The task of writing a program that prints the next 20 leap years can be broken down into distinct approaches, each serving a different aspect of the task. These approaches include planning, solving, and testing. The planning approach involves outlining the steps required to execute the task, the solving approach involves writing a technical solution to the task, and the testing approach involves verifying that the solution works as expected. Valuable variations of this task could include handling different date ranges, handling different cultures or calendar systems, or optimizing the solution for performance.

TASKS:
[
  {
    "type": "plan",
    "description": "Write a detailed plan to execute on the task"
  },
  {
    "type": "code/solve",
    "description": "Write a technical solution for the task provided"
  },
  {
    "type": "test",
    "description": "Write a potential test plan for a solution to the task provided"
  }
]
```


```python
for task_info, response in zip(tasks, worker_resp):
    print(f"\n=== WORKER RESULT ({task_info['type']}) ===\n{response}\n")
```


=== WORKER RESULT (plan) ===
### Plan to Write a Program That Prints the Next 20 Leap Years

#### 1. Problem Understanding
- **Objective**: Develop a program that outputs the next 20 leap years from the current year.
- **Leap Year Criteria**:
  - A year is a leap year if it is divisible by 4.
  - However, if the year is divisible by 100, it is not a leap year, unless:
  - The year is also divisible by 400, in which case it is a leap year.

#### 2. Environment Setup
- **Programming Language**: Choose Python due to its simplicity and readability.
- **Development Environment**: Set up a Python development environment, such as using an IDE like PyCharm or a simple text editor with a Python interpreter.

#### 3. Algorithm Development
- **Step 1**: Determine the current year using the `datetime` module.
- **Step 2**: Initialize a counter to keep track of the number of leap years found.
- **Step 3**: Create a loop that checks each subsequent year to see if it is a leap year.
- **Step 4**: Fo

Next, synthesize the worker results into a single final answer.

```python
SYNTHESIZER_PROMPT = """Given the following task: {task} and the following responses below, that each address different aspects of the 
task, synthesize a final response.

{worker_responses}
"""
concatenated_responses = " ".join([f"\n=== WORKER RESULT ({task_info['type']}) ===\n{response}\n" for task_info, response in zip(tasks, worker_resp)])

final_answer = run_llm(SYNTHESIZER_PROMPT.format(task=task, worker_responses=concatenated_responses), model="meta-llama/Llama-3.3-70B-Instruct-Turbo")
```
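Step 3 of the workflow says results are either presented individually or synthesized. One way to make that choice explicit is a small helper like the hypothetical `present_results` below: it labels each response with the same `=== WORKER RESULT (type) ===` header used in this notebook and, only when a synthesizer callable is supplied, hands the combined text to it. The `synthesize` parameter and the length check in `format_results` are assumptions for illustration; in this notebook the callable could wrap `run_llm` with `SYNTHESIZER_PROMPT`.

```python
from typing import Callable, Dict, List, Optional

def format_results(tasks: List[Dict[str, str]], responses: List[str]) -> str:
    """Label each worker response with its subtask type, keeping subtask order."""
    if len(tasks) != len(responses):
        raise ValueError("expected exactly one response per subtask")
    return "".join(
        f"\n=== WORKER RESULT ({t['type']}) ===\n{r}\n"
        for t, r in zip(tasks, responses)
    )

def present_results(
    tasks: List[Dict[str, str]],
    responses: List[str],
    synthesize: Optional[Callable[[str], str]] = None,
) -> str:
    """Return the labelled results as-is, or pass them through a synthesizer."""
    combined = format_results(tasks, responses)
    return synthesize(combined) if synthesize is not None else combined

demo_tasks = [{"type": "plan"}, {"type": "test"}]
demo_responses = ["Step 1: ...", "Test 1: ..."]
print(present_results(demo_tasks, demo_responses))  # individual results
print(present_results(
    demo_tasks,
    demo_responses,
    synthesize=lambda text: f"{text.count('WORKER RESULT')} results merged",
))
```

The stub synthesizer here only counts sections; a real one would send `combined` to an LLM, as the cell above does with `run_llm`.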

```python
print(final_answer)
```

### Final Response: Program to Print the Next 20 Leap Years

To develop a program that prints the next 20 leap years from the current year, we have outlined a comprehensive plan, implemented the solution in Python, and devised a thorough test plan. Below is a synthesized version of the key aspects:

#### Program Overview

The program is designed to identify and print the next 20 leap years, starting from the current year or any specified year. A leap year is defined as a year that is divisible by 4, except for years that are divisible by 100 but not by 400.

#### Implementation

The solution is implemented in Python, utilizing its simplicity and readability. The code consists of two primary functions:

1. **`is_leap_year(year)`**: This function determines whether a given year is a leap year based on the leap year criteria.
2. **`find_next_leap_years(start_year, count)`**: This function finds the next 'count' leap years starting from 'start_year'. It iterates through the years, checks e