# Lesson 3: Basic Workflow Ingredients: Chaining, Routing, Parallelization, and Orchestration

This Jupyter notebook demonstrates AI agent workflow patterns using Google Gemini, covering chaining, parallelization, routing, and orchestration.

The sections are:

1. Single LLM Call Problems - Demonstrates issues with complex prompts that try to do everything at once.

2. Sequential Workflows - Breaking tasks into steps (generate questions → answer questions → find sources) for better consistency.

3. Parallel Workflows - Running tasks in parallel (answering questions in parallel) for higher speed.

4. Routing Workflows - Classifying user intent and routing to specialized handlers (technical support, billing, general questions).

5. Orchestrator-Worker Pattern - A system where an orchestrator breaks complex queries into subtasks, specialized workers handle each task, and a synthesizer combines the results into a cohesive response.

The examples include FAQ generation from multiple sources and a customer service system handling billing inquiries, product returns, and order status updates simultaneously.

## Setup: Installing Required Dependencies

First, we need to install the Google Gen AI SDK (`google-genai`) to interact with Gemini models:

In [1]:
%pip install -q google-genai

## Configuring the Gemini Client

Next, we import the necessary libraries and configure the Gemini client. The API key is read from Google Colab's `userdata` when running in Colab, and from the `GOOGLE_API_KEY` environment variable otherwise.


In [2]:
import os
import json
import asyncio
import random
from pydantic import BaseModel, RootModel, Field
from typing import List, Optional
import time
from enum import Enum
from google import genai


# Read the API key from Colab's userdata when available,
# otherwise fall back to the GOOGLE_API_KEY environment variable
try:
    from google.colab import userdata
    GOOGLE_API_KEY = userdata.get("GOOGLE_API_KEY")
except ImportError:
    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY not found. Set it in your environment or Colab userdata.")

# Create Gemini client
client = genai.Client(api_key=GOOGLE_API_KEY)
print("Gemini client initialized successfully.")

Gemini client initialized successfully.


# The problem with a single, large LLM call

## Setting Up Mock Data

We'll create three mock webpages about renewable energy topics that will serve as our source content for the FAQ generation examples. Each webpage has a title and detailed content about solar energy, wind turbines, and energy storage:


In [3]:
# Here we define our mock webpage content. Each source has a title and text.

webpage_1 = {
    "title": "The Benefits of Solar Energy",
    "content": """
    Solar energy is a renewable powerhouse, offering numerous environmental and economic benefits.
    By converting sunlight into electricity through photovoltaic (PV) panels, it reduces reliance on fossil fuels,
    thereby cutting down greenhouse gas emissions. Homeowners who install solar panels can significantly
    lower their monthly electricity bills, and in some cases, sell excess power back to the grid.
    While the initial installation cost can be high, government incentives and long-term savings make
    it a financially viable option for many. Solar power is also a key component in achieving energy
    independence for nations worldwide.
    """,
}

webpage_2 = {
    "title": "Understanding Wind Turbines",
    "content": """
    Wind turbines are towering structures that capture kinetic energy from the wind and convert it into
    electrical power. They are a critical part of the global shift towards sustainable energy.
    Turbines can be installed both onshore and offshore, with offshore wind farms generally producing more
    consistent power due to stronger, more reliable winds. The main challenge for wind energy is its
    intermittency—it only generates power when the wind blows. This necessitates the use of energy
    storage solutions, like large-scale batteries, to ensure a steady supply of electricity.
    """,
}

webpage_3 = {
    "title": "Energy Storage Solutions",
    "content": """
    Effective energy storage is the key to unlocking the full potential of renewable sources like solar
    and wind. Because these sources are intermittent, storing excess energy when it's plentiful and
    releasing it when it's needed is crucial for a stable power grid. The most common form of
    large-scale storage is pumped-hydro storage, but battery technologies, particularly lithium-ion,
    are rapidly becoming more affordable and widespread. These batteries can be used in homes, businesses,
    and at the utility scale to balance energy supply and demand, making our energy system more
    resilient and reliable.
    """,
}

all_sources = [webpage_1, webpage_2, webpage_3]

# We'll combine the content for the LLM to process
combined_content = "\n\n".join(
    [f"Source Title: {source['title']}\nContent: {source['content']}" for source in all_sources]
)

## Example: Complex Single LLM Call

This example demonstrates the problem with trying to do everything in one complex prompt. We're asking the LLM to generate questions, find answers, and cite sources all in a single call, which can lead to inconsistent results:
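To make "inconsistent" concrete, it can help to check the parsed output programmatically instead of by eye. The sketch below is a hypothetical helper (`find_faq_problems` is not part of the notebook) that works on plain dicts and flags the usual failure modes of a do-everything prompt: the wrong number of FAQs, empty fields, and cited titles that do not match any real source.

```python
from typing import Dict, List


def find_faq_problems(faqs: List[Dict], n_expected: int, valid_titles: List[str]) -> List[str]:
    """Return human-readable problems found in a list of FAQ dicts (empty list = no problems)."""
    problems = []
    if len(faqs) != n_expected:
        problems.append(f"expected {n_expected} FAQs, got {len(faqs)}")
    for i, faq in enumerate(faqs):
        # Every FAQ needs a non-empty question and answer.
        for key in ("question", "answer"):
            if not str(faq.get(key, "")).strip():
                problems.append(f"FAQ {i}: missing {key}")
        sources = faq.get("sources") or []
        if not sources:
            problems.append(f"FAQ {i}: no sources cited")
        # A cited title that is not one of the real titles is likely hallucinated or misspelled.
        for title in sources:
            if title not in valid_titles:
                problems.append(f"FAQ {i}: unknown source '{title}'")
    return problems
```

After running the cell below, a call such as `find_faq_problems(result_complex.model_dump()["faqs"], n_questions, [s["title"] for s in all_sources])` would list any issues; an empty list means none of these checks failed, not that the answers are correct.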


In [4]:
# This prompt tries to do everything at once: generate questions, find answers,
# and cite sources. This complexity can often confuse the model.
n_questions = 10
prompt_complex = f"""
Based on the provided content from three webpages, generate a list of exactly {n_questions} frequently asked questions (FAQs).
For each question, provide a concise answer derived ONLY from the text.
After each answer, you MUST include a list of the 'Source Title's that were used to formulate that answer.

Your final output should be a JSON array where each object has three keys: "question", "answer", and "sources" (which is an array of strings).

<provided_content>
{combined_content}
</provided_content>
""".strip()

# Pydantic classes for structured outputs
class FAQ(BaseModel):
    question: str
    answer: str
    sources: List[str]

class FAQList(BaseModel):
    faqs: List[FAQ]

# Generate FAQs
response_complex = client.models.generate_content(
    model="gemini-2.5-flash",
    contents=prompt_complex,
    config={
        "response_mime_type": "application/json",
        "response_schema": FAQList
    },
)
result_complex = response_complex.parsed

print("Complex prompt result (might be inconsistent):")
print(result_complex.model_dump_json(indent=2))

Complex prompt result (might be inconsistent):
{
  "faqs": [
    {
      "question": "How does solar energy work?",
      "answer": "Solar energy converts sunlight into electricity through photovoltaic (PV) panels.",
      "sources": [
        "The Benefits of Solar Energy"
      ]
    },
    {
      "question": "What are the environmental advantages of using solar energy?",
      "answer": "Solar energy reduces reliance on fossil fuels and cuts down greenhouse gas emissions.",
      "sources": [
        "The Benefits of Solar Energy"
      ]
    },
    {
      "question": "What economic benefits can homeowners gain from installing solar panels?",
      "answer": "Homeowners can significantly lower their monthly electricity bills and potentially sell excess power back to the grid.",
      "sources": [
        "The Benefits of Solar Energy"
      ]
    },
    {
      "question": "What is the primary function of wind turbines?",
      "answer": "Wind turbines capture kinetic energy from 

# Building a sequential workflow

Now, let's split the complex prompt above into a chain of simpler prompts.

## Question Generation Function

Let's create a function to generate questions from the content. This step focuses solely on creating relevant questions based on the provided material:


In [5]:
class QuestionList(BaseModel):
    questions: List[str]

prompt_generate_questions = """
Based on the content below, generate a list of {n_questions} relevant and distinct questions that a user might have.
Return these questions as a JSON array of strings.

<provided_content>
{combined_content}
</provided_content>
""".strip()

def generate_questions(content, n_questions=10):
    """
    Generate a list of questions based on the provided content.

    Args:
        content: The combined content from all sources
        n_questions: How many questions to ask the model for

    Returns:
        list: A list of generated questions
    """
    response_questions = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt_generate_questions.format(n_questions=n_questions, combined_content=content),
        config={
            "response_mime_type": "application/json",
            "response_schema": QuestionList
        }
    )

    return response_questions.parsed.questions

# Test the question generation function
questions = generate_questions(combined_content, n_questions=10)
print(f"Successfully generated {len(questions)} questions.")
print(f"\nFirst few questions:")
for i, q in enumerate(questions[:3]):
    print(f"{i+1}. {q}")

Successfully generated 10 questions.

First few questions:
1. What are the environmental and economic advantages of solar energy?
2. How do photovoltaic (PV) panels generate electricity from sunlight?
3. What are the financial considerations for homeowners interested in solar panels, including initial costs and potential savings?
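Because each step in a chain feeds the next, a cheap programmatic check between steps can catch problems before more LLM calls are spent on them. The prompt asks for *distinct* questions, but nothing enforces that. Here is a minimal sketch of such a gate, using a hypothetical `clean_questions` helper that is not part of the notebook:

```python
from typing import List


def clean_questions(questions: List[str], n_questions: int) -> List[str]:
    """Drop blanks and case/whitespace duplicates, keep order, and cap at n_questions."""
    seen = set()
    cleaned = []
    for q in questions:
        # Normalize for comparison only; the original wording is kept in the output.
        key = " ".join(q.lower().split())
        if not key or key in seen:
            continue
        seen.add(key)
        cleaned.append(q.strip())
    return cleaned[:n_questions]
```

It could be placed between the first two steps, e.g. `questions = clean_questions(generate_questions(combined_content, 10), 10)`. It only catches exact duplicates after normalization; near-paraphrases would need a more elaborate check.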


## Answer Generation Function

Next, we create a function to generate answers for individual questions using only the provided content:

In [6]:
prompt_answer_question = """
Using ONLY the provided content below, answer the following question.
The answer should be concise and directly address the question.

Question:
"{question}"

Provided Content:
---
{combined_content}
---
""".strip()

def answer_question(question, content):
    """
    Generate an answer for a specific question using only the provided content.

    Args:
        question: The question to answer
        content: The combined content from all sources

    Returns:
        str: The generated answer
    """
    answer_response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt_answer_question.format(question=question, combined_content=content),
    )
    return answer_response.text

# Test the answer generation function
test_question = questions[0]
test_answer = answer_question(test_question, combined_content)
print(f"Question: {test_question}")
print(f"Answer: {test_answer}")

Question: What are the environmental and economic advantages of solar energy?
Answer: Environmental advantages of solar energy include reduced reliance on fossil fuels and a decrease in greenhouse gas emissions. Economic advantages include significantly lower monthly electricity bills for homeowners, the ability to sell excess power back to the grid, long-term savings, and contributing to energy independence for nations.


## Source Finding Function

Finally, we create a function to identify which sources were used to generate an answer:


In [7]:
class SourceList(BaseModel):
    sources: List[str]

prompt_find_sources = """
You will be given a question and an answer that was generated from a set of documents.
Your task is to identify which of the original documents were used to create the answer.
Return a JSON object with a single key "sources" which is a list of the titles of the relevant documents.

Question: "{question}"
Answer: "{answer}"

<provided_content>
{combined_content}
</provided_content>
""".strip()

def find_sources(question, answer, content):
    """
    Identify which sources were used to generate an answer.

    Args:
        question: The original question
        answer: The generated answer
        content: The combined content from all sources

    Returns:
        list: A list of source titles that were used
    """
    sources_response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt_find_sources.format(question=question, answer=answer, combined_content=content),
        config={
            "response_mime_type": "application/json",
            "response_schema": SourceList
        }
    )
    return sources_response.parsed.sources

# Test the source finding function
test_sources = find_sources(test_question, test_answer, combined_content)
print(f"Question: {test_question}")
print(f"Answer: {test_answer}")
print(f"Sources: {test_sources}")

Question: What are the environmental and economic advantages of solar energy?
Answer: Environmental advantages of solar energy include reduced reliance on fossil fuels and a decrease in greenhouse gas emissions. Economic advantages include significantly lower monthly electricity bills for homeowners, the ability to sell excess power back to the grid, long-term savings, and contributing to energy independence for nations.
Sources: ['The Benefits of Solar Energy']


## Executing the Sequential Workflow

Now we combine all three functions into a sequential workflow: Generate Questions → Answer Questions → Find Sources. Each step is executed one after another for each question. Notice how much time it takes to run the full workflow.
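The notebook measures elapsed time with `time.time()`. For timing code, Python also offers `time.perf_counter()`, a high-resolution clock intended for measuring durations. A small context manager, sketched here as a hypothetical `timed` helper, keeps the start/stop bookkeeping in one place:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Store the elapsed time of the with-block, in seconds, under results[label]."""
    start = time.perf_counter()  # clock meant for measuring intervals
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed runs are still timed.
        results[label] = time.perf_counter() - start
```

Usage: `timings = {}` and then `with timed("sequential", timings): sequential_faqs = sequential_workflow(combined_content, n_questions=4)`, after which `timings["sequential"]` holds the duration.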


In [8]:
def sequential_workflow(content, n_questions=10):
    """
    Execute the complete sequential workflow for FAQ generation.

    Args:
        content: The combined content from all sources
        n_questions: How many questions to generate and answer

    Returns:
        list: A list of FAQs with questions, answers, and sources
    """
    # Generate questions
    questions = generate_questions(content, n_questions)

    # Answer and find sources for each question sequentially
    final_faqs = []
    for question in questions:
        print(f"  - Processing: '{question[:50]}...'")

        # Generate an answer for the current question
        answer = answer_question(question, content)

        # Identify the sources for the generated answer
        sources = find_sources(question, answer, content)

        final_faqs.append({"question": question, "answer": answer, "sources": sources})

    return final_faqs

# Execute the sequential workflow (measure time for comparison)
start_time = time.time()
sequential_faqs = sequential_workflow(combined_content, n_questions=4)
end_time = time.time()
print(f"\nSequential processing completed in {end_time - start_time:.2f} seconds")

# Display the final result
print("\nGenerated FAQ List (Sequential):")
print(json.dumps(sequential_faqs, indent=2))

  - Processing: 'What are the primary economic and environmental be...'
  - Processing: 'What are the main challenges associated with wind ...'
  - Processing: 'Why is effective energy storage crucial for integr...'
  - Processing: 'What are the different types of large-scale energy...'

Sequential processing completed in 20.49 seconds

Generated FAQ List (Sequential):
[
  {
    "question": "What are the primary economic and environmental benefits of adopting solar energy?",
    "answer": "The primary economic benefits of adopting solar energy include significantly lower monthly electricity bills for homeowners, the potential to sell excess power back to the grid, and contributing to energy independence for nations. The primary environmental benefit is the reduction of greenhouse gas emissions by decreasing reliance on fossil fuels.",
    "sources": [
      "The Benefits of Solar Energy"
    ]
  },
  {
    "question": "What are the main challenges associated with wind power, and how ca

# Parallelization: Improving Efficiency

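While the sequential workflow works well, we can speed it up by running some steps in parallel. For a single question, the answer step and the source-finding step still have to run in order, because finding sources requires the answer, but different questions do not depend on each other. We can therefore process all the questions concurrently, which can significantly reduce the overall processing time.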
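To see where the speed-up comes from without calling the API, each LLM call can be simulated with `asyncio.sleep`. In this sketch (all names are hypothetical stand-ins, not notebook functions), every question still runs its two steps in order, but the concurrent version overlaps the questions, so total time is roughly two call durations instead of two per question:

```python
import asyncio
import time


async def fake_llm_call(delay: float) -> None:
    """Stand-in for an API call: it just waits, yielding control to the event loop."""
    await asyncio.sleep(delay)


async def process_one(question: str, delay: float) -> str:
    # The two steps stay sequential: finding sources needs the answer first.
    await fake_llm_call(delay)  # plays the role of answer_question
    await fake_llm_call(delay)  # plays the role of find_sources
    return question


async def run_all(questions, delay: float, concurrent: bool):
    if concurrent:
        # All per-question chains overlap in time.
        return await asyncio.gather(*(process_one(q, delay) for q in questions))
    # One question after another, like the sequential workflow.
    return [await process_one(q, delay) for q in questions]


async def compare(n: int = 4, delay: float = 0.1) -> dict:
    questions = [f"q{i}" for i in range(n)]
    timings = {}
    for concurrent in (False, True):
        start = time.perf_counter()
        await run_all(questions, delay, concurrent)
        timings["concurrent" if concurrent else "sequential"] = time.perf_counter() - start
    return timings
```

In a notebook you can run `await compare()`; in a script, `asyncio.run(compare())`. With the defaults, the sequential run should take roughly 0.8 seconds and the concurrent one roughly 0.2.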

**Important**: you may hit the rate limits of your account if you do this for a lot of questions. Requests over the limit fail with rate-limit errors, so plan for limiting how many calls run at once and for retrying failed calls after a delay. Make sure to take this into account when building real-world products!
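One common way to handle this is to cap concurrency with an `asyncio.Semaphore` and retry failed calls with exponential backoff. The sketch below is generic and hypothetical (`call_with_retry` and `gather_limited` are not notebook or SDK functions). It catches every `Exception` for simplicity; in real code you would narrow that to the SDK's rate-limit error. It takes *factories* (functions returning a coroutine) rather than coroutines, because a coroutine can only be awaited once and a retry needs a fresh one.

```python
import asyncio
import random


async def call_with_retry(make_call, max_attempts: int = 4, base_delay: float = 1.0):
    """Await make_call(); on failure wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the error
            # Exponential backoff with random jitter so retries don't all fire at once.
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))


async def gather_limited(make_calls, limit: int = 3, **retry_kwargs):
    """Run the coroutine factories in make_calls with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(make_call):
        async with semaphore:
            return await call_with_retry(make_call, **retry_kwargs)

    # gather keeps results in the same order as make_calls.
    return await asyncio.gather(*(run_one(mc) for mc in make_calls))
```

With the functions defined below, usage might look like `await gather_limited([lambda q=q: process_question_parallel(q, combined_content) for q in questions], limit=3)`; the `q=q` default binds each question to its own lambda.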

## Implementing Parallel Processing

Let's implement a parallel version of our workflow using Python's `asyncio` library.


In [9]:
async def answer_question_async(question, content):
    """
    Async version of answer_question function.
    """
    prompt = prompt_answer_question.format(question=question, combined_content=content)
    response = await client.aio.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt
    )
    return response.text

async def find_sources_async(question, answer, content):
    """
    Async version of find_sources function.
    """
    prompt = prompt_find_sources.format(question=question, answer=answer, combined_content=content)
    response = await client.aio.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config={
            "response_mime_type": "application/json",
            "response_schema": SourceList
        }
    )
    return response.parsed.sources

async def process_question_parallel(question, content):
    """
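    Process a single question: generate the answer, then find its sources.
    The two steps run in order because find_sources needs the answer; the
    speed-up comes from running this coroutine for many questions at once.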
    """
    answer = await answer_question_async(question, content)
    sources = await find_sources_async(question, answer, content)
    return {"question": question, "answer": answer, "sources": sources}

## Executing the Parallel Workflow

Now let's process all questions using parallel execution. We'll process multiple questions concurrently, which can significantly reduce the total processing time. Notice how much time it takes to run the full workflow and compare it with the execution time of the sequential workflow.


In [10]:
async def parallel_workflow(content, n_questions=10):
    """
    Execute the complete parallel workflow for FAQ generation.

    Args:
        content: The combined content from all sources
        n_questions: How many questions to generate and answer

    Returns:
        list: A list of FAQs with questions, answers, and sources
    """
    # Generate questions (this step remains sequential)
    questions = generate_questions(content, n_questions)

    # Process all questions in parallel
    tasks = [process_question_parallel(question, content) for question in questions]

    # Execute all tasks concurrently
    parallel_faqs = await asyncio.gather(*tasks)

    return parallel_faqs

# Execute the parallel workflow (measure time for comparison)
start_time = time.time()
parallel_faqs = await parallel_workflow(combined_content, n_questions=4)
end_time = time.time()
print(f"\nParallel processing completed in {end_time - start_time:.2f} seconds")

# Display the final result
print("\nGenerated FAQ List (Parallel):")
print(json.dumps(parallel_faqs, indent=2))


Parallel processing completed in 9.15 seconds

Generated FAQ List (Parallel):
[
  {
    "question": "What are the primary environmental and economic benefits of utilizing solar energy?",
    "answer": "The primary environmental benefit of utilizing solar energy is cutting down greenhouse gas emissions by reducing reliance on fossil fuels. The economic benefits include significantly lowering monthly electricity bills, the potential to sell excess power back to the grid, and contributing to national energy independence.",
    "sources": [
      "The Benefits of Solar Energy"
    ]
  },
  {
    "question": "What is the main challenge associated with wind energy, and how is this issue typically addressed?",
    "answer": "The main challenge for wind energy is its intermittency, meaning it only generates power when the wind blows. This issue is typically addressed through the use of energy storage solutions, such as large-scale batteries.",
    "sources": [
      "Understanding Wind Turbin

## Sequential vs Parallel: Key Differences

The main differences between sequential and parallel approaches:

**Sequential Processing:**
- Questions are processed one at a time
- Predictable execution order
- Easier to debug and understand
- Higher total processing time

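**Parallel Processing:**
- Multiple questions are processed simultaneously
- Much lower total processing time (about 9 seconds instead of 20 for four questions in the runs above)
- More complex error handling
- Less time spent idle waiting for API responses

Both approaches run the same steps and return FAQs in the same format, though the exact questions and answers vary from run to run because the model's output is not deterministic. Parallel processing pays off most as the number of questions grows, up to your account's rate limits.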
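On the error-handling point: by default, `asyncio.gather` propagates the first exception to the code awaiting it, and the results of the other calls are lost to that `await`. Passing `return_exceptions=True` returns exceptions in the result list instead, so one failed question does not discard the rest. A minimal sketch, with a hypothetical `gather_faqs_safely` helper:

```python
import asyncio


async def gather_faqs_safely(coros):
    """Run all coroutines; return (successes, failures) instead of failing on the first error."""
    results = await asyncio.gather(*coros, return_exceptions=True)
    successes, failures = [], []
    for index, result in enumerate(results):
        # With return_exceptions=True, a failed coroutine's exception sits in its result slot.
        if isinstance(result, BaseException):
            failures.append((index, result))
        else:
            successes.append(result)
    return successes, failures
```

Applied to the workflow above, `successes, failures = await gather_faqs_safely([process_question_parallel(q, combined_content) for q in questions])` would keep the FAQs that succeeded and record which question indices failed, for example so they can be retried.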


# Building a routing workflow

Routing is a method that categorizes an input and then sends it to a specific task designed to handle that type of input. This approach helps keep different functions separate and lets you create more specialized prompts. If you don't use routing, trying to optimize for one kind of input might negatively affect how well the system performs with other kinds of inputs.
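Stripped of the LLM calls, routing is "classify, then look up the handler". The sketch below shows that shape with a dictionary dispatch table. The keyword-based `toy_classifier` is a hypothetical stand-in for the LLM classifier built next, and the handlers only tag the query instead of calling a model:

```python
from enum import Enum
from typing import Callable, Dict


class Route(str, Enum):
    TECHNICAL_SUPPORT = "Technical Support"
    BILLING_INQUIRY = "Billing Inquiry"
    GENERAL_QUESTION = "General Question"


def toy_classifier(query: str) -> Route:
    """Keyword stand-in for an LLM classifier (for illustration only)."""
    text = query.lower()
    if any(word in text for word in ("invoice", "bill", "charge")):
        return Route.BILLING_INQUIRY
    if any(word in text for word in ("error", "not working", "crash")):
        return Route.TECHNICAL_SUPPORT
    return Route.GENERAL_QUESTION


# Each route maps to its own specialized handler; here they just tag the query.
HANDLERS: Dict[Route, Callable[[str], str]] = {
    Route.TECHNICAL_SUPPORT: lambda q: f"[tech] {q}",
    Route.BILLING_INQUIRY: lambda q: f"[billing] {q}",
    Route.GENERAL_QUESTION: lambda q: f"[general] {q}",
}


def route(query: str) -> str:
    intent = toy_classifier(query)
    # Fall back to the general handler if a route has no dedicated handler.
    handler = HANDLERS.get(intent, HANDLERS[Route.GENERAL_QUESTION])
    return handler(query)
```

With a table like this, adding a route means adding one enum member and one handler entry. The notebook's `if`/`elif` version below is equally valid for three routes.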

## Intent Classification

First, we create a classification prompt and function to determine the user's intent. This will help us route the query to the appropriate handler:

In [11]:
class IntentEnum(str, Enum):
    """
    Defines the allowed values for the 'intent' field.
    Inheriting from 'str' ensures that the values are treated as strings.
    """
    TECHNICAL_SUPPORT = "Technical Support"
    BILLING_INQUIRY = "Billing Inquiry"
    GENERAL_QUESTION = "General Question"

class UserIntent(BaseModel):
    intent: IntentEnum

prompt_classification = """
Classify the user's query into one of the following categories:
{categories}

Return only the category name and nothing else.

User Query: "{user_query}"
""".strip()


def classify_intent(user_query):
    """Uses an LLM to classify a user query."""
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt_classification.format(
            user_query=user_query,
            categories=[intent.value for intent in IntentEnum]
        ),
        config={
            "response_mime_type": "application/json",
            "response_schema": UserIntent
        }
    )
    return response.parsed.intent


query_1 = "My internet connection is not working."
query_2 = "I think there is a mistake on my last invoice."
query_3 = "What are your opening hours?"

intent_1 = classify_intent(query_1)
print(f"Query: {query_1}\nIntent: {intent_1}\n")
intent_2 = classify_intent(query_2)
print(f"Query: {query_2}\nIntent: {intent_2}\n")
intent_3 = classify_intent(query_3)
print(f"Query: {query_3}\nIntent: {intent_3}\n")

Query: My internet connection is not working.
Intent: IntentEnum.TECHNICAL_SUPPORT

Query: I think there is a mistake on my last invoice.
Intent: IntentEnum.BILLING_INQUIRY

Query: What are your opening hours?
Intent: IntentEnum.GENERAL_QUESTION
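The cell above relies on `response_schema`, so the SDK hands back an `IntentEnum` member directly (use `intent.value` to get the plain label such as `"Technical Support"`). If you instead asked for plain text, you would have to map the model's reply onto the enum yourself and decide what happens with unexpected labels. A sketch of that, with a hypothetical `parse_intent` helper; it repeats the `IntentEnum` definition so it runs on its own:

```python
from enum import Enum


class IntentEnum(str, Enum):
    TECHNICAL_SUPPORT = "Technical Support"
    BILLING_INQUIRY = "Billing Inquiry"
    GENERAL_QUESTION = "General Question"


def parse_intent(raw: str, default: IntentEnum = IntentEnum.GENERAL_QUESTION) -> IntentEnum:
    """Map a free-text label such as ' billing inquiry.' onto IntentEnum, else return default."""
    # Trim whitespace, then surrounding quotes and periods, then compare case-insensitively.
    cleaned = raw.strip().strip("\"'.").lower()
    for intent in IntentEnum:
        if cleaned == intent.value.lower():
            return intent
    return default
```

Falling back to the general-question route mirrors the `else` branch of `handle_query` below, which also treats anything unrecognized as a general question.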



## Defining Specialized Handlers

Next, we create specialized prompts for each type of query and a routing function that directs queries to the appropriate handler based on the classified intent:

In [12]:
prompt_technical_support = """
You are a helpful technical support agent.
The user says: '{user_query}'.
Provide a helpful first response, asking for more details like what troubleshooting steps they have already tried.
""".strip()

prompt_billing_inquiry = """
You are a helpful billing support agent.
The user says: '{user_query}'.
Acknowledge their concern and inform them that you will need to look up their account, asking for their account number.
""".strip()

prompt_general_question = """
You are a general assistant.
The user says: '{user_query}'.
Apologize that you are not sure how to help and ask them to rephrase their question.
""".strip()


def handle_query(user_query, intent):
    """Routes a query to the correct handler based on its classified intent."""
    if intent == IntentEnum.TECHNICAL_SUPPORT:
        prompt = prompt_technical_support.format(user_query=user_query)
    elif intent == IntentEnum.BILLING_INQUIRY:
        prompt = prompt_billing_inquiry.format(user_query=user_query)
    elif intent == IntentEnum.GENERAL_QUESTION:
        prompt = prompt_general_question.format(user_query=user_query)
    else:
        prompt = prompt_general_question.format(user_query=user_query)
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt
    )
    return response.text


response_1 = handle_query(query_1, intent_1)
print(f"Query: {query_1}\nIntent: {intent_1}\nResponse: {response_1}\n")

response_2 = handle_query(query_2, intent_2)
print(f"Query: {query_2}\nIntent: {intent_2}\nResponse: {response_2}\n")

response_3 = handle_query(query_3, intent_3)
print(f"Query: {query_3}\nIntent: {intent_3}\nResponse: {response_3}\n")

Query: My internet connection is not working.
Intent: IntentEnum.TECHNICAL_SUPPORT
Response: I'm sorry to hear your internet connection isn't working! That's definitely frustrating.

To help me understand what might be going on and avoid suggesting steps you've already tried, could you tell me a bit more about it?

For example:
*   **What troubleshooting steps have you already attempted** (like restarting your modem/router, checking cables, etc.)?
*   Are you seeing any specific error messages, or are there any unusual lights on your modem or router?
*   Is it affecting all your devices (computer, phone, tablet), or just one?

Query: I think there is a mistake on my last invoice.
Intent: IntentEnum.BILLING_INQUIRY
Response: "I understand you're concerned about a mistake on your last invoice. I'd be happy to look into that for you. To access your billing details, could you please provide your account number?"

Query: What are your opening hours?
Intent: IntentEnum.GENERAL_QUESTION
Respo

# Orchestrator-worker pattern

The orchestrator-workers workflow uses a main LLM to dynamically break down complex tasks into smaller subtasks, which are then assigned to other "worker" LLMs. The workers' results are then combined into a single response; in this notebook, that final step is handled by a separate synthesizer.

This approach is ideal for complex problems where the specific steps or subtasks can't be known in advance. For instance, in a coding project, the orchestrator can decide which files need modifying and how, based on the initial request. While it might look similar to parallel processing, its key advantage is flexibility: instead of pre-defined subtasks, the orchestrator LLM determines them on the fly according to the given input.
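The fan-out step can be sketched independently of any LLM: given the task list the orchestrator produced at runtime, look up a worker for each `query_type` and call it with that task's parameters. The `run_tasks` helper and the worker registry below are hypothetical; note that the notebook's `handle_billing_worker` also needs the original user query, which real wiring would have to pass in separately.

```python
from typing import Any, Callable, Dict, List


def run_tasks(tasks: List[Dict[str, Any]], workers: Dict[str, Callable[..., Dict]]) -> List[Dict]:
    """Dispatch each task to the worker registered for its query_type."""
    results = []
    for task in tasks:
        # Pass only the parameters that were actually filled in.
        params = {k: v for k, v in task.items() if k != "query_type" and v is not None}
        worker = workers.get(task.get("query_type"))
        if worker is None:
            # The orchestrator produced a type we have no worker for: report it, don't crash.
            results.append({"task": task.get("query_type"), "error": "no worker for this task type"})
            continue
        results.append(worker(**params))
    return results
```

`None` values are dropped because a model dump of a task with optional fields (for example `task.model_dump()` on the `Task` class below) includes every field, most of them unset for any given task type.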

## Defining the Orchestrator

The orchestrator is the central coordinator that breaks down complex user queries into structured JSON tasks. It analyzes the input and identifies what types of actions need to be taken, such as billing inquiries, product returns, or status updates:


In [13]:
# Orchestrator
class QueryTypeEnum(str, Enum):
    BILLING_INQUIRY = "BillingInquiry"
    PRODUCT_RETURN = "ProductReturn"
    STATUS_UPDATE = "StatusUpdate"

class Task(BaseModel):
    query_type: str
    invoice_number: Optional[str] = None
    product_name: Optional[str] = None
    reason_for_return: Optional[str] = None
    order_id: Optional[str] = None

class TaskList(BaseModel):
    tasks: List[Task]

prompt_orchestrator = f"""
You are a master orchestrator. Your job is to break down a complex user query into a JSON array of objects.
Each object represents one sub-task and must have a "query_type" and relevant parameters.

The possible "query_type" values are:
1. "{QueryTypeEnum.BILLING_INQUIRY.value}": Requires "invoice_number".
2. "{QueryTypeEnum.PRODUCT_RETURN.value}": Requires "product_name" and "reason_for_return".
3. "{QueryTypeEnum.STATUS_UPDATE.value}": Requires "order_id".

User Query:
---
{{query}}
---
""".strip()


def orchestrator(query):
    """Breaks down a complex query into a list of tasks."""
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt_orchestrator.format(query=query),
        config={
            "response_mime_type": "application/json",
            "response_schema": TaskList
        }
    )
    return response.parsed.tasks
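The orchestrator prompt says which parameters each `query_type` requires, but the `Task` model declares them all `Optional` and types `query_type` as a plain `str`, so the schema alone cannot enforce those rules. A cheap post-check is sketched below; `REQUIRED_FIELDS` is copied by hand from the prompt text and `missing_fields` is a hypothetical helper:

```python
from typing import Dict, List

# Required parameters per query_type, copied from the orchestrator prompt.
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "BillingInquiry": ["invoice_number"],
    "ProductReturn": ["product_name", "reason_for_return"],
    "StatusUpdate": ["order_id"],
}


def missing_fields(task: Dict) -> List[str]:
    """Return the required fields that are absent or empty for this task's query_type."""
    query_type = task.get("query_type")
    if query_type not in REQUIRED_FIELDS:
        return ["query_type"]  # unknown or missing type: nothing else can be checked
    return [name for name in REQUIRED_FIELDS[query_type] if not task.get(name)]
```

For example, `[missing_fields(t.model_dump()) for t in orchestrator(query)]` would show which tasks are incomplete, so you could ask the customer for the missing details instead of calling a worker. Typing `query_type` as `QueryTypeEnum`, the way `UserIntent` uses `IntentEnum`, would additionally let the response schema constrain the allowed types.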

## Billing Worker Implementation

The billing worker specializes in handling invoice-related inquiries. It extracts the specific concern from the user's query, simulates opening an investigation, and returns structured information about the action taken:

In [14]:
# Worker for Billing Inquiry
prompt_billing_worker_extractor = """
You are a specialized assistant. A user has a query regarding invoice '{invoice_number}'.
From the full user query provided below, extract the specific concern or question the user has voiced about this particular invoice.
Respond with ONLY the extracted concern/question. If no specific concern is mentioned beyond a general inquiry about the invoice, state 'General inquiry regarding the invoice'.

Full User Query:
---
{original_user_query}
---

Extracted concern about invoice {invoice_number}:
""".strip()


def handle_billing_worker(invoice_number, original_user_query):
    """
    Handles a billing inquiry.
    1. Uses an LLM to extract the specific concern about the invoice from the original query.
    2. Simulates opening an investigation.
    3. Returns structured data about the action taken.
    """
    extraction_prompt = prompt_billing_worker_extractor.format(
        invoice_number=invoice_number, original_user_query=original_user_query
    )
    response = client.models.generate_content(model="gemini-2.5-flash", contents=extraction_prompt)
    extracted_concern = response.text

    # Simulate backend action: opening an investigation
    print(f"  [Billing Worker] Action: Investigating invoice {invoice_number} for concern: '{extracted_concern}'")
    investigation_id = f"INV_CASE_{random.randint(1000, 9999)}"
    eta_days = 2

    return {
        "task": "Billing Inquiry",
        "invoice_number": invoice_number,
        "user_concern": extracted_concern,
        "action_taken": f"An investigation (Case ID: {investigation_id}) has been opened regarding your concern.",
        "resolution_eta": f"{eta_days} business days",
    }

## Product Return Worker

The return worker handles product return requests by generating RMA (Return Merchandise Authorization) numbers and providing detailed shipping instructions for customers:

In [15]:
# Worker for Product Return
def handle_return_worker(product_name, reason_for_return):
    """
    Handles a product return request.
    1. Simulates generating an RMA number and providing return instructions.
    2. Returns structured data.
    """
    # Simulate backend action: generating RMA and getting instructions
    rma_number = f"RMA-{random.randint(10000, 99999)}"
    shipping_instructions = (
        "Please pack the '{product_name}' securely in its original packaging if possible. "
        "Include all accessories and manuals. Write the RMA number ({rma_number}) clearly on the outside of the package. "
        "Ship to: Returns Department, 123 Automation Lane, Tech City, TC 98765."
    ).format(product_name=product_name, rma_number=rma_number)
    print(f"  [Return Worker] Action: Generated RMA {rma_number} for {product_name} (Reason: {reason_for_return})")

    return {
        "task": "Product Return",
        "product_name": product_name,
        "reason_for_return": reason_for_return,
        "rma_number": rma_number,
        "shipping_instructions": shipping_instructions,
    }

## Order Status Worker

The status worker retrieves and formats order status information, including shipping details, tracking numbers, and delivery estimates:

In [16]:
# Worker for Status Update
def handle_status_worker(order_id):
    """
    Handles an order status update request.
    1. Simulates fetching order status from a backend system.
    2. Returns structured data.
    """
    # Simulate backend action: fetching order status
    # Possible statuses and details to make it more dynamic
    possible_statuses = [
        {"status": "Processing", "carrier": "N/A", "tracking": "N/A", "delivery_estimate": "3-5 business days"},
        {
            "status": "Shipped",
            "carrier": "SuperFast Shipping",
            "tracking": f"SF{random.randint(100000, 999999)}",
            "delivery_estimate": "Tomorrow",
        },
        {
            "status": "Delivered",
            "carrier": "Local Courier",
            "tracking": f"LC{random.randint(10000, 99999)}",
            "delivery_estimate": "Delivered yesterday",
        },
        {
            "status": "Delayed",
            "carrier": "Standard Post",
            "tracking": f"SP{random.randint(10000, 99999)}",
            "delivery_estimate": "Expected in 2-3 additional days",
        },
    ]
    # Pick a status at random for demonstration purposes. Because of this, the same
    # order_id can get a different status on each call; choosing the status from a
    # hash of the order_id would make the result repeatable instead.
    status_details = random.choice(possible_statuses)
    print(f"  [Status Worker] Action: Fetched status for order {order_id}: {status_details['status']}")

    return {
        "task": "Status Update",
        "order_id": order_id,
        "current_status": status_details["status"],
        "carrier": status_details["carrier"],
        "tracking_number": status_details["tracking"],
        "expected_delivery": status_details["delivery_estimate"],
    }
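The comments in `handle_status_worker` mention hashing the `order_id` so that the same order always gets the same fake status. Below is a minimal sketch of that idea. It assumes a SHA-256 digest of the order ID as the seed for a private random generator. The helper `fake_status_for` and the `STATUS_TEMPLATES` list are hypothetical names, not part of the lesson:

```python
import hashlib
import random

# Same four outcomes as handle_status_worker; "tracking" is (prefix, low, high) or None.
STATUS_TEMPLATES = [
    {"status": "Processing", "carrier": "N/A", "tracking": None,
     "delivery_estimate": "3-5 business days"},
    {"status": "Shipped", "carrier": "SuperFast Shipping", "tracking": ("SF", 100000, 999999),
     "delivery_estimate": "Tomorrow"},
    {"status": "Delivered", "carrier": "Local Courier", "tracking": ("LC", 10000, 99999),
     "delivery_estimate": "Delivered yesterday"},
    {"status": "Delayed", "carrier": "Standard Post", "tracking": ("SP", 10000, 99999),
     "delivery_estimate": "Expected in 2-3 additional days"},
]


def fake_status_for(order_id: str) -> dict:
    """Return a fake status that is always the same for a given order_id."""
    # hash() on strings is salted per interpreter run, so derive a stable seed from SHA-256.
    seed = int(hashlib.sha256(order_id.encode("utf-8")).hexdigest(), 16)
    rng = random.Random(seed)  # private generator: the global random state is untouched
    template = rng.choice(STATUS_TEMPLATES)
    tracking = "N/A"
    if template["tracking"] is not None:
        prefix, low, high = template["tracking"]
        tracking = f"{prefix}{rng.randint(low, high)}"
    return {
        "status": template["status"],
        "carrier": template["carrier"],
        "tracking": tracking,
        "delivery_estimate": template["delivery_estimate"],
    }


print(fake_status_for("A-12345") == fake_status_for("A-12345"))  # True
```

The returned dictionary uses the same keys as the entries in `possible_statuses`. That means `status_details = fake_status_for(order_id)` could stand in for the `random.choice` call without changing the rest of the worker.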

## Response Synthesizer

The synthesizer formats the structured results from all workers as bullet points, then asks Gemini to turn them into a single, coherent, customer-friendly email:

In [17]:
# Synthesizer
prompt_synthesizer = """
You are a master communicator. Combine several distinct pieces of information from our support team into a single, well-formatted, and friendly email to a customer.

Here are the points to include, based on the actions taken for their query:
---
{formatted_results}
---

Combine these points into one cohesive response. Start with a friendly greeting (e.g., "Dear Customer," or "Hi there,") and end with a polite closing (e.g., "Sincerely," or "Best regards,").
Ensure the tone is helpful and professional.
""".strip()


def synthesizer(results):
    """Combines structured results from workers into a single user-facing message."""
    bullet_points = []
    for res in results:
        point = f"Regarding your {res['task']}:\n"
        if res["task"] == "Billing Inquiry":
            point += f"  - Invoice Number: {res['invoice_number']}\n"
            point += f'  - Your Stated Concern: "{res["user_concern"]}"\n'
            point += f"  - Our Action: {res['action_taken']}\n"
            point += f"  - Expected Resolution: We will get back to you within {res['resolution_eta']}."
        elif res["task"] == "Product Return":
            point += f"  - Product: {res['product_name']}\n"
            point += f'  - Reason for Return: "{res["reason_for_return"]}"\n'
            point += f"  - Return Authorization (RMA): {res['rma_number']}\n"
            point += f"  - Instructions: {res['shipping_instructions']}"
        elif res["task"] == "Status Update":
            point += f"  - Order ID: {res['order_id']}\n"
            point += f"  - Current Status: {res['current_status']}\n"
            if res["carrier"] != "N/A":
                point += f"  - Carrier: {res['carrier']}\n"
            if res["tracking_number"] != "N/A":
                point += f"  - Tracking Number: {res['tracking_number']}\n"
            point += f"  - Delivery Estimate: {res['expected_delivery']}"
        bullet_points.append(point)

    formatted_results = "\n\n".join(bullet_points)
    prompt = prompt_synthesizer.format(formatted_results=formatted_results)
    response = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
    return response.text
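The synthesizer's `if`/`elif` chain matches on exact task strings. If a task string differs by even one space or letter from what a worker emits, the result is a "Regarding your ..." heading with no details under it. A table-driven alternative keeps all labels in one place and raises an error on unknown tasks. The sketch below assumes the result keys match the worker dictionaries in this lesson. `FIELD_LABELS` and `format_results` are hypothetical names, and the wording is simplified compared with the original (no quotes around the customer's concern, no "We will get back to you within" phrasing):

```python
# Hypothetical field tables: task name -> ordered list of (label, result key).
FIELD_LABELS = {
    "Billing Inquiry": [
        ("Invoice Number", "invoice_number"),
        ("Your Stated Concern", "user_concern"),
        ("Our Action", "action_taken"),
        ("Expected Resolution", "resolution_eta"),
    ],
    "Product Return": [
        ("Product", "product_name"),
        ("Reason for Return", "reason_for_return"),
        ("Return Authorization (RMA)", "rma_number"),
        ("Instructions", "shipping_instructions"),
    ],
    "Status Update": [
        ("Order ID", "order_id"),
        ("Current Status", "current_status"),
        ("Carrier", "carrier"),
        ("Tracking Number", "tracking_number"),
        ("Delivery Estimate", "expected_delivery"),
    ],
}


def format_results(results: list[dict]) -> str:
    """Turn worker results into the bullet text passed to the synthesizer prompt."""
    blocks = []
    for res in results:
        task = res["task"]
        if task not in FIELD_LABELS:
            # Fail loudly instead of silently emitting a section with no details.
            raise ValueError(f"No formatting rules for task {task!r}")
        lines = [f"Regarding your {task}:"]
        for label, key in FIELD_LABELS[task]:
            value = res.get(key)
            if value in (None, "N/A"):
                continue  # skip fields the worker could not fill in
            lines.append(f"  - {label}: {value}")
        blocks.append("\n".join(lines))
    return "\n\n".join(blocks)


sample = [{"task": "Status Update", "order_id": "A-12345", "current_status": "Processing",
           "carrier": "N/A", "tracking_number": "N/A", "expected_delivery": "3-5 business days"}]
print(format_results(sample))
```

The resulting string could be passed to `prompt_synthesizer.format(formatted_results=...)` exactly as the bullet text is in `synthesizer`.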

## Main Orchestrator-Worker Pipeline

This function coordinates the entire orchestrator-worker workflow. It runs the orchestrator to break down the query, dispatches each task to the matching worker one after another, and synthesizes the final response:

In [18]:
def process_user_query(user_query):
    """Processes a query using the Orchestrator-Worker-Synthesizer pattern."""

    print(f"User query:\n---\n{user_query}\n---")

    # 1. Run orchestrator
    tasks_list = orchestrator(user_query)
    if not tasks_list:
        print("\nOrchestrator did not return any tasks. Exiting.")
        return

    print("\nDeconstructed tasks from Orchestrator:")
    for task in tasks_list:
        print(task.model_dump_json(indent=2))

    # 2. Run workers (tasks_list is non-empty here because of the early return above)
    worker_results = []
    print(f"\nDispatching {len(tasks_list)} workers...")
    for task in tasks_list:
        if task.query_type == QueryTypeEnum.BILLING_INQUIRY:
            worker_results.append(handle_billing_worker(task.invoice_number, user_query))
        elif task.query_type == QueryTypeEnum.PRODUCT_RETURN:
            # The orchestrator should capture the reason, but fall back to a default if it is missing
            reason = task.reason_for_return or "Not specified"
            worker_results.append(handle_return_worker(task.product_name, reason))
        elif task.query_type == QueryTypeEnum.STATUS_UPDATE:
            worker_results.append(handle_status_worker(task.order_id))
        else:
            print(f"Warning: Unknown query_type '{task.query_type}' found in orchestrator tasks.")

    if worker_results:
        print(f"Ran {len(worker_results)} workers sequentially.")
        print("\nWorkers finished their jobs. Results:")
        for i, res in enumerate(worker_results):
            print(f"--- Worker Result {i + 1} ---")
            print(json.dumps(res, indent=2))
            print("----------------------")
    else:
        print("\nNo valid worker tasks to run.")

    # 3. Run synthesizer
    if worker_results:
        print("\nSynthesizing final response...")
        final_user_message = synthesizer(worker_results)
        print("\n--- Final Synthesized Response ---")
        print(final_user_message)
        print("---------------------------------")
    else:
        print("\nSkipping synthesis because there were no worker results.")
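`process_user_query` runs the workers one after another. Because the three workers are independent, they could also run concurrently, in the spirit of the parallel workflow earlier in the lesson. The sketch below combines a dispatch table (replacing the `if`/`elif` chain) with a thread pool. It is self-contained, so it uses a hypothetical `Task` dataclass and stub workers that only sleep, on the assumption that real workers are I/O-bound. Its keys are the `query_type` strings that appear in the orchestrator output for the test query. With the notebook's `QueryTypeEnum`, you might key the table on the enum members instead:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    # Hypothetical stand-in for the orchestrator's pydantic task model.
    query_type: str
    invoice_number: Optional[str] = None
    product_name: Optional[str] = None
    reason_for_return: Optional[str] = None
    order_id: Optional[str] = None


# Stub workers that simulate slow backend calls.
def billing_stub(task):
    time.sleep(0.2)
    return {"task": "Billing Inquiry", "invoice_number": task.invoice_number}


def return_stub(task):
    time.sleep(0.2)
    return {"task": "Product Return", "product_name": task.product_name}


def status_stub(task):
    time.sleep(0.2)
    return {"task": "Status Update", "order_id": task.order_id}


# Dispatch table: query_type -> worker function.
WORKERS = {
    "BillingInquiry": billing_stub,
    "ProductReturn": return_stub,
    "StatusUpdate": status_stub,
}


def run_workers_concurrently(tasks):
    """Run one worker per known task in a thread pool; unknown query types are skipped."""
    known = [t for t in tasks if t.query_type in WORKERS]
    if not known:
        return []  # ThreadPoolExecutor rejects max_workers=0
    with ThreadPoolExecutor(max_workers=len(known)) as pool:
        # pool.map yields results in input order, not completion order.
        return list(pool.map(lambda t: WORKERS[t.query_type](t), known))


tasks = [
    Task("BillingInquiry", invoice_number="INV-7890"),
    Task("ProductReturn", product_name="SuperWidget 5000"),
    Task("StatusUpdate", order_id="A-12345"),
]
start = time.perf_counter()
results = run_workers_concurrently(tasks)
print(f"{len(results)} results in {time.perf_counter() - start:.2f}s")
```

With three 0.2-second stubs, the concurrent run should take roughly the time of one call instead of three. Results come back in task order, so the synthesizer would still see them in the order the orchestrator produced them.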

## Testing the Complete Workflow

Let's test our orchestrator-worker pattern with a complex customer query that involves multiple tasks: a billing inquiry, a product return, and an order status update:

In [19]:
# Test with customer query
complex_customer_query = """
Hi, I'm writing to you because I have a question about invoice #INV-7890. It seems higher than I expected.
Also, I would like to return the 'SuperWidget 5000' I bought because it's not compatible with my system.
Finally, can you give me an update on my order #A-12345?
""".strip()

process_user_query(complex_customer_query)

User query:
---
Hi, I'm writing to you because I have a question about invoice #INV-7890. It seems higher than I expected.
Also, I would like to return the 'SuperWidget 5000' I bought because it's not compatible with my system.
Finally, can you give me an update on my order #A-12345?
---

Deconstructed tasks from Orchestrator:
{
  "query_type": "BillingInquiry",
  "invoice_number": "INV-7890",
  "product_name": null,
  "reason_for_return": null,
  "order_id": null
}
{
  "query_type": "ProductReturn",
  "invoice_number": null,
  "product_name": "SuperWidget 5000",
  "reason_for_return": "not compatible with my system",
  "order_id": null
}
{
  "query_type": "StatusUpdate",
  "invoice_number": null,
  "product_name": null,
  "reason_for_return": null,
  "order_id": "A-12345"
}

Dispatching 3 workers...
  [Billing Worker] Action: Investigating invoice INV-7890 for concern: 'It seems higher than I expected.'
  [Return Worker] Action: Generated RMA RMA-91987 for SuperWidget 5000 (Reason: no