#### Orchestrator and Worker - Agentic Design Pattern

In [None]:
import os

from llama_stack_client import LlamaStackClient, Agent, AgentEventLogger
from termcolor import colored
from dotenv import load_dotenv
from pydantic import BaseModel

import json
from rich.pretty import pprint
from uuid import uuid4

In [None]:
# Load variables from a local .env file (python-dotenv) into the process
# environment so the os.environ / os.getenv lookups in later cells can see them.
load_dotenv()

In [None]:
# Endpoint and Together credentials are mandatory: a missing variable raises
# KeyError right here rather than failing later inside the client.
host = os.environ["TOGETHER_URL"]
together_api_key = os.environ["TOGETHER_API_KEY"]

# Credentials handed to the client as provider_data. os.getenv returns None
# when TAVILY_SEARCH_API_KEY is not set.
provider_credentials = {
    "tavily_search_api_key": os.getenv("TAVILY_SEARCH_API_KEY"),
    "together_api_key": together_api_key,
}

client = LlamaStackClient(base_url=host, provider_data=provider_credentials)

In [None]:
# Model identifier shared by every agent in this notebook.
MODEL_ID = "meta-llama/Llama-3.3-70B-Instruct"

# Settings common to all agents; the orchestrator and worker configs copy this
# mapping and override individual keys.
base_agent_config = {
    "model": MODEL_ID,
    "instructions": "You are a helpful assistant.",
    "sampling_params": {
        "strategy": {
            "type": "top_p",
            "temperature": 1.0,
            "top_p": 0.9,
        },
    },
}

In [None]:
from typing import List, Dict

# Shape of the JSON reply requested from the orchestrator agent; its JSON
# schema is supplied as the orchestrator's response_format.
# NOTE: documented with comments rather than a class docstring, because a
# docstring would be emitted as a "description" in model_json_schema().
class OrchestratorOutputSchema(BaseModel):
    # Free-text analysis of the user's task.
    analysis: str
    # One string-to-string mapping per subtask. The workflow reads the "type"
    # and "description" keys, which this annotation alone does not enforce.
    tasks: List[Dict[str, str]]


In [None]:
# Orchestrator settings: the shared base config with planner-specific
# instructions and a response format constrained to OrchestratorOutputSchema.
orchestrator_agent_config = {
    **base_agent_config,
    # This is a plain string literal (not an f-string and never passed through
    # str.format), so it reaches the model verbatim. The JSON example therefore
    # uses single braces; doubled braces would show the model malformed JSON.
    "instructions": """Your job is to analyze the task provided by the user and break it down into 2-3 distinct approaches:

    Return your response in the following JSON format:
    {
        "analysis": "<Your understanding of the task and which variations would be valuable. Focus on how each approach serves different aspects of the task.>",
        "tasks": [
            {
                "type": "formal",
                "description": "Write a precise, technical version that emphasizes specifications"
            },
            {
                "type": "conversational",
                "description": "Write an engaging, friendly version that connects with readers"
            }
        ]
    }
    """,
    # Ask the server to constrain the reply to the schema's JSON structure so
    # the workflow can parse it with json.loads.
    "response_format": {
        "type": "json_schema",
        "json_schema": OrchestratorOutputSchema.model_json_schema(),
    },
}

# Worker settings: a copy of the shared base config whose "instructions" entry
# is replaced by the content-generation prompt below. The prompt is assembled
# from adjacent literals so every newline and trailing space is explicit.
worker_agent_config = dict(
    base_agent_config,
    instructions=(
        "You will be given a task guideline. Generate content based on the provided\n"
        "    task, following the style and guideline descriptions. \n"
        "\n"
        "    Return your response in this format:\n"
        "\n"
        "    Response: Your content here, maintaining the specified style and fully addressing requirements.\n"
        "    "
    ),
)


In [None]:
def orchestrator_worker_workflow(task, context):
    """Run one orchestrator agent, then one worker agent per planned subtask.

    The orchestrator is asked to analyze ``task`` (together with ``context``)
    and to reply with JSON holding an ``analysis`` string and a ``tasks`` list.
    Each entry of ``tasks`` is given to a newly created worker agent, and the
    worker's reply is printed.

    Args:
        task: Natural-language description of the overall job.
        context: Additional information; it is interpolated into the prompts
            with f-string formatting, so any object with a useful ``str()``
            works.

    Returns:
        A tuple ``(orchestrator_agent, workers)``. ``workers`` maps each
        subtask's ``"type"`` to the worker agent that handled it; if two
        subtasks share a type, the later worker replaces the earlier one.

    Raises:
        json.JSONDecodeError: If the orchestrator reply is not valid JSON.
        KeyError: If the reply has no ``"tasks"`` key, or a subtask lacks
            ``"type"`` or ``"description"``.
    """
    # Imported locally so this cell does not depend on edits to the import
    # cell. rich.print renders console markup such as [bold cyan]; passing the
    # same string to rich.pretty.pprint would show the tags as a quoted literal.
    from rich import print as rich_print
    from rich.markup import escape

    # single orchestrator agent
    orchestrator_agent = Agent(client, **orchestrator_agent_config)
    orchestrator_session_id = orchestrator_agent.create_session(
        session_name=f"orchestrator_agent_{uuid4()}"
    )

    orchestrator_response = orchestrator_agent.create_turn(
        messages=[
            {
                "role": "user",
                "content": f"Your task is to {task}. Here is some context: {context}",
            }
        ],
        stream=False,
        session_id=orchestrator_session_id,
    )

    orchestrator_result = json.loads(orchestrator_response.output_message.content)
    rich_print("[bold cyan] Orchestrator Analysis: [/bold cyan]")
    pprint(orchestrator_result)

    workers = {}
    # spawn multiple worker agents; the loop variable is deliberately not
    # called `task`, so the original task stays available for the worker prompt
    for subtask in orchestrator_result["tasks"]:
        subtask_type = subtask["type"]

        worker_agent = Agent(client, **worker_agent_config)
        worker_session_id = worker_agent.create_session(
            session_name=f"worker_agent_{uuid4()}"
        )
        workers[subtask_type] = worker_agent

        # The subtask description only names a style, so the worker also gets
        # the original task and context to know what it is writing about.
        worker_prompt = (
            f"Original task: {task}\n"
            f"Context: {context}\n"
            f"Style: {subtask_type}\n"
            f"Your task is to {subtask['description']}."
        )
        worker_response = worker_agent.create_turn(
            messages=[{"role": "user", "content": worker_prompt}],
            stream=False,
            session_id=worker_session_id,
        )

        # Escape the model-supplied type so stray brackets are not read as markup.
        rich_print(
            f"[bold yellow] >>> Worker {escape(str(subtask_type))} <<< [/bold yellow]"
        )
        # Model output is pretty-printed rather than markup-rendered on purpose.
        pprint(worker_response.output_message.content)

    return orchestrator_agent, workers

In [None]:
# Example run: ask for a product description and let the orchestrator plan the
# stylistic variants that the workers then write.
demo_context = {
    "target_audience": "environmentally conscious millennials",
    "key_features": ["plastic-free", "insulated", "lifetime warranty"],
}

orchestrator_agent, workers = orchestrator_worker_workflow(
    task="Write a product description for a new eco-friendly water bottle",
    context=demo_context,
)

In [None]:
# Retrieve the orchestrator's stored session from the server and dump it.
orchestrator_session = client.agents.session.retrieve(
    agent_id=orchestrator_agent.agent_id,
    session_id=orchestrator_agent.session_id,
)
pprint(orchestrator_session.to_dict())

# Do the same for every worker, labelled by the subtask type it handled.
for worker_type in workers:
    worker = workers[worker_type]
    worker_session = client.agents.session.retrieve(
        agent_id=worker.agent_id,
        session_id=worker.session_id,
    )
    print(f"Worker {worker_type} Session:")
    pprint(worker_session.to_dict())