In [8]:
%pip install pydantic-ai python-dotenv nest_asyncio
from IPython.display import clear_output ; clear_output()

from dotenv import load_dotenv ; load_dotenv()
import nest_asyncio ; nest_asyncio.apply()

In [9]:
import asyncio
from typing import List, Dict
from pprint import pprint, pformat

from pydantic import BaseModel, Field
from pydantic_ai import Agent

# Model identifier passed as the first argument to every `Agent(...)` built in `orchestrate`.
# NOTE(review): the string looks like a "<provider>:<model-name>" pair — confirm the accepted
# format against the installed pydantic-ai version.
AI_MODEL = 'openai:gpt-4o'

In [10]:
class Task(BaseModel):
    # A single sub-task produced by the orchestrator: a short `type` label plus the
    # instructions (`description`) a worker needs to carry it out.
    # Documented with comments rather than a docstring on purpose: pydantic copies a class
    # docstring into the generated JSON schema, which would alter what the model is sent.
    type: str = Field(
        ...,
        description='The type of task. For example: "formal", "conversational", "hybrid", ...',
    )
    description: str = Field(
        ...,
        description='Clear description for executing this task.',
    )


class OrchestratorResponse(BaseModel):
    # Structured result requested from the orchestrator agent (used as `result_type` in
    # `orchestrate`): a free-text `analysis` plus the list of sub-tasks to hand to workers.
    # Documented with comments rather than a docstring on purpose: pydantic copies a class
    # docstring into the generated JSON schema, which would alter what the model is sent.
    analysis: str = Field(
        ...,
        description=(
            'Explain your understanding of the task and which variations would be valuable. '
            'Focus on how each approach serves different aspects of the task.'
        ),
    )
    tasks: List[Task] = Field(
        ...,
        description="List of tasks",
    )


async def orchestrate(task: str) -> Dict:
    """Break a task into sub-tasks with an orchestrator agent, then run workers concurrently.

    Step 1 asks an orchestrator agent for an ``OrchestratorResponse`` (an analysis plus a
    list of sub-tasks). Step 2 sends every sub-task, together with the original task text,
    to a worker agent; all worker calls are awaited together with ``asyncio.gather``.
    Progress is printed to stdout along the way.

    Args:
        task: The original task text. It is sent to the orchestrator as-is and is also
            included in every worker prompt under the ``'original_task'`` key.

    Returns:
        A dict with two keys:
            ``'analysis'``: the orchestrator's analysis text.
            ``'worker_results'``: one dict per sub-task with the keys ``'type'``,
            ``'description'`` and ``'result'`` (the worker's output), in the same order
            as the orchestrator's task list.
    """
    # NOTE(review): `result_type=` / `.data` match the pydantic-ai API this notebook was
    # written against; newer releases appear to rename them (`output_type` / `.output`).
    # The install cell does not pin a version — confirm before re-running on a fresh kernel.

    # Step 1: ask the orchestrator for an analysis and a list of sub-tasks.
    orchestrator_response = await Agent(
        AI_MODEL,
        system_prompt='Analyze this task and break it down into 2-3 distinct approaches.',
        result_type=OrchestratorResponse,
    ).run(task)

    analysis = orchestrator_response.data.analysis
    subtasks = orchestrator_response.data.tasks

    print("\n=== ORCHESTRATOR OUTPUT ===")
    print(f"\nANALYSIS:\n{analysis}")
    print("\nTASKS:")
    # The loop variable must NOT be called `task`: that would rebind the function parameter,
    # and the worker prompts below would then carry the last sub-task object instead of the
    # caller's original task text.
    for subtask in subtasks:
        pprint(subtask.model_dump())

    # Step 2: process all sub-tasks concurrently and collect the results.
    worker_agent = Agent(AI_MODEL, system_prompt='Generate content based on the task specification.')
    worker_responses = await asyncio.gather(*[
        worker_agent.run(pformat({'original_task': task} | subtask.model_dump()))
        for subtask in subtasks
    ])

    # gather() preserves input order, so sub-tasks and responses line up pairwise.
    worker_results = [
        {
            'type': subtask.type,
            'description': subtask.description,
            'result': response.data,
        }
        for subtask, response in zip(subtasks, worker_responses)
    ]

    for worker_result in worker_results:
        print(f"\n=== WORKER RESULT ({worker_result['type']}) ===\n{worker_result['result']}\n")

    return {
        "analysis": analysis,
        "worker_results": worker_results,
    }

In [11]:
# Run the orchestrator/worker flow on a sample prompt. Progress is printed by `orchestrate`;
# the returned dict (keys 'analysis' and 'worker_results') is kept in `results`.
results = await orchestrate('Write a product description for a new eco-friendly water bottle.')


=== ORCHESTRATOR OUTPUT ===

ANALYSIS:
Writing a product description for an eco-friendly water bottle can take various approaches, each appealing to different customer preferences and marketing strategies. The task could focus on sustainability attributes, design and usability features, or a balanced narrative combining both aspects. Each approach highlights unique selling points that may cater to diverse audiences looking for eco-friendly products.

TASKS:
{'description': 'Create a product description that emphasizes the eco-friendly '
                'aspects of the water bottle, such as the sustainable '
                'materials used, environmental impact, and benefits of '
                'reducing plastic waste.',
 'type': 'sustainability-focused'}
{'description': 'Develop a product description that highlights the design and '
                'usability features of the water bottle, including ergonomic '
                'design, color options, durability, and functionality.',
 