# Lesson 4: Workflow Orchestration Patterns

## 🎯 Learning Objectives

By the end of this lesson, you will be able to:

1. **Understand** when to use workflow orchestration vs routing patterns
2. **Implement** sequential workflows for multi-stage processes
3. **Implement** parallel execution for concurrent information gathering
4. **Compare** the performance and use cases of each pattern
5. **Apply** workflow patterns to real-world production scenarios
6. **Optimize** workflows for cost and performance

## 📚 Quick Recap: Lessons 1-3

In previous lessons, you learned:
- ✅ How to create agents with tools (function calling)
- ✅ How to build hierarchical routing with coordinator + specialists
- ✅ How agents transfer control using `sub_agents` and `transfer_to_agent()`

**New in this lesson**: Instead of intelligent routing, we'll use **deterministic workflow patterns** where execution order is fixed and predictable.

## 🚀 What's New: Workflow Agents

ADK provides two powerful workflow orchestration patterns:

- 📋 **SequentialAgent**: Execute agents in strict order (A → B → C)
- ⚡ **ParallelAgent**: Execute agents concurrently (A + B + C simultaneously)

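**Key difference from Lesson 3**: The workflow agents themselves are NOT LLM-powered. Their sub-agents can still be LLM agents, but the order in which those sub-agents run follows a predetermined, **deterministic** execution pattern.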

## 🏢 Use Case: IT Support Automation

We'll build two workflow systems:
1. **Sequential**: Automated ticket processing pipeline
2. **Parallel**: Multi-source information gathering

---

## 💡 Part 1: Understanding Workflow Patterns

### When to Use Each Pattern

| Pattern | Use When | Example |
|---------|----------|----------|
| **Hierarchical Routing** (Lesson 3) | Request needs ONE specialist | Route "Wi-Fi issue" → Network Specialist |
| **Sequential Workflow** (This lesson) | Task needs MULTIPLE steps in order | Process ticket: Classify → Prioritize → Assign |
| **Parallel Execution** (This lesson) | Need info from MULTIPLE sources | Search: KB + Tickets + Docs simultaneously |

### Sequential vs Parallel Comparison

```
SEQUENTIAL WORKFLOW:           PARALLEL EXECUTION:
Agent A                        Agent A ┐
   ↓                                   │
Agent B                        Agent B ┼─→ Collect Results
   ↓                                   │
Agent C                        Agent C ┘
   ↓                                   ↓
Result                         Result

Time: ~3x (sum of stages)      Time: ~1x (slowest agent)
When: Steps depend on each     When: Steps are independent
      other
```

### Real-World Examples

**Sequential Workflows:**
- 📝 Document pipeline: Write → Review → Publish
- 🎯 Sales process: Qualify → Demo → Quote → Close
- 🏭 Manufacturing: Design → Build → Test → Ship
- 🎓 Course creation: Research → Write → Edit → Publish

**Parallel Execution:**
- 🔍 Research: Search multiple databases at once
- 🌐 API aggregation: Call multiple APIs simultaneously
- 📊 Data analysis: Analyze different datasets in parallel
- 🔐 Validation: Check multiple conditions concurrently

---

## 🔧 Part 2: Environment Setup

In [None]:
# Install the Google Agent Development Kit and dependencies
!pip install -q google-adk litellm openai python-dotenv nest-asyncio

print("✅ Packages installed successfully!")

In [None]:
# Core ADK imports - including workflow agents!
from google.adk.agents import LlmAgent, SequentialAgent, ParallelAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.models.lite_llm import LiteLlm
from google.genai import types

# System imports
import os
import asyncio
import time
from typing import Dict, List, Any
from datetime import datetime

print("✅ Imports successful!")
print("   Workflow agents imported: SequentialAgent, ParallelAgent")

In [None]:
# Configure OpenAI API key
try:
    from google.colab import userdata
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    print("✅ API key loaded from Colab secrets")
except Exception:  # Not running in Colab, or the secret is not set
    from getpass import getpass
    print("💡 To use Colab secrets: Go to 🔑 (left sidebar) → Add new secret → Name: OPENAI_API_KEY")
    OPENAI_API_KEY = getpass("Enter your OpenAI API Key: ")

# Validate BEFORE exporting: os.environ only accepts strings, so a missing key
# would otherwise fail with a confusing TypeError
if not OPENAI_API_KEY or OPENAI_API_KEY.strip() == "":
    raise ValueError("❌ ERROR: No API key provided!")

os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

print("✅ Authentication configured!")

# Model configuration
OPENAI_MODEL = "gpt-5-nano"  # Cost-efficient for learning

print(f"\n🤖 Model: {OPENAI_MODEL}")
print("\n💡 Workflow agents are model-agnostic!")
print("   Same code works with any LLM provider:")
print("   - OpenAI: 'gpt-5-nano', 'gpt-4o-mini', 'gpt-4o'")
print("   - Claude: 'claude-3-5-sonnet-20241022'")
print("   - Gemini: 'gemini-2.0-flash-exp'")

---

## 📋 Part 3: Pattern A - Sequential Workflows

### Use Case: Automated Ticket Processing Pipeline

When a new IT support ticket arrives, it needs to go through multiple stages:
1. **Classify** the issue type (hardware, software, network)
2. **Prioritize** based on urgency and impact
3. **Assign** to the appropriate team

These steps MUST happen in order:
- Can't prioritize before classifying
- Can't assign before knowing priority

### How SequentialAgent Works

```python
pipeline = SequentialAgent(
    name="ticket_pipeline",
    sub_agents=[agent1, agent2, agent3]  # Execute in this exact order
)
```

- ✅ Agents execute in the order specified
- ✅ Each agent sees output from previous agents
- ✅ Shared session state passes data between stages
- ✅ Deterministic - no LLM decides the flow
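
The data flow in the list above can be pictured with plain Python, independent of ADK. In this minimal sketch, the stage functions (`classify`, `prioritize`, `assign`) and the `run_sequential` helper are hypothetical stand-ins for the LLM agents and for `SequentialAgent`, and the keyword rules are invented purely for illustration. Each stage reads what earlier stages wrote to a shared `state` dict and adds its own result.

```python
from typing import Callable, Dict, List

State = Dict[str, str]
Stage = Callable[[State], None]

def classify(state: State) -> None:
    # Toy rule standing in for the classifier LLM
    text = state["description"].lower()
    state["category"] = "hardware" if "laptop" in text else "software"

def prioritize(state: State) -> None:
    # Toy rule standing in for the priority LLM
    text = state["description"].lower()
    state["priority"] = "high" if "hours" in text else "medium"

def assign(state: State) -> None:
    # Needs BOTH earlier results, so it has to run last
    state["team"] = f"{state['category']}_team"
    state["assignee"] = "regular" if state["priority"] == "high" else "queue"

def run_sequential(stages: List[Stage], state: State) -> State:
    # Fixed order: nothing decides at run time which stage comes next
    for stage in stages:
        stage(state)
    return state

result = run_sequential(
    [classify, prioritize, assign],
    {"description": "My laptop won't turn on and I present in 2 hours!"},
)
print(result["category"], result["priority"], result["team"], result["assignee"])
```

Reordering the list to `[assign, classify, prioritize]` raises a `KeyError`, which is the plain-Python analogue of "can't assign before knowing priority".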

Let's build it!

### 3.1: Create Mock Ticket Data

In [None]:
# Sample tickets for testing the pipeline
SAMPLE_TICKETS = [
    {
        "ticket_id": "T-5001",
        "description": "My laptop won't turn on after I spilled coffee on it. It's completely dead and I have a presentation in 2 hours!",
        "user": "john.doe@company.com",
        "submitted_at": "2025-10-06 08:30:00"
    },
    {
        "ticket_id": "T-5002",
        "description": "I can't access my email. It says my password is incorrect but I'm sure it's right.",
        "user": "jane.smith@company.com",
        "submitted_at": "2025-10-06 09:15:00"
    },
    {
        "ticket_id": "T-5003",
        "description": "The Wi-Fi in our conference room is really slow during meetings.",
        "user": "bob.wilson@company.com",
        "submitted_at": "2025-10-06 10:00:00"
    }
]

print("✅ Sample tickets loaded!")
print(f"   Total tickets: {len(SAMPLE_TICKETS)}")

### 3.2: Stage 1 - Ticket Classifier Agent

In [None]:
# Stage 1: Classify the ticket type
ticket_classifier = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="ticket_classifier",
    instruction="""
    You are a Ticket Classifier in an IT support automation pipeline.

    YOUR TASK:
    Analyze the ticket description and classify it into ONE category:
    - hardware: Physical device issues (laptop, printer, monitor)
    - software: Application or license issues
    - network: Connectivity or internet issues
    - access: Account, password, or permissions issues

    OUTPUT FORMAT:
    You must respond with EXACTLY this format:
    CATEGORY: [category]
    REASONING: [brief explanation]

    Example:
    CATEGORY: hardware
    REASONING: User reports physical damage to laptop (coffee spill)

    Be concise and accurate. This classification determines the next steps.
    """
)

print("✅ Ticket Classifier created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Stage 1 - Classify ticket type")

### 3.3: Stage 2 - Priority Assigner Agent

In [None]:
# Stage 2: Assign priority level
priority_assigner = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="priority_assigner",
    instruction="""
    You are a Priority Assigner in an IT support automation pipeline.

    YOUR TASK:
    Based on the ticket description and classification, assign a priority level:
    - critical: System down, data loss, blocking work for multiple people
    - high: Blocking individual user's work, urgent deadline
    - medium: Impacting work but has workarounds
    - low: Minor inconvenience, no immediate impact

    PRIORITY FACTORS:
    - Impact: How many people affected?
    - Urgency: Time-sensitive? Deadline mentioned?
    - Severity: How broken is it? Complete failure vs slow performance?

    OUTPUT FORMAT:
    PRIORITY: [critical/high/medium/low]
    REASONING: [explain why this priority]
    SLA: [response time - e.g., "15 minutes", "2 hours", "24 hours"]

    Be decisive. Consider urgency keywords like "urgent", "ASAP", time mentions.
    """
)

print("✅ Priority Assigner created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Stage 2 - Assign priority level")

### 3.4: Stage 3 - Team Assigner Agent

In [None]:
# Stage 3: Assign to appropriate team
team_assigner = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="team_assigner",
    instruction="""
    You are a Team Assigner in an IT support automation pipeline.

    YOUR TASK:
    Based on the ticket category and priority, assign it to the correct team:

    TEAMS:
    - hardware_team: Physical device repairs and replacements
    - software_team: Application issues, licenses, installations
    - network_team: Connectivity, internet, VPN issues
    - security_team: Access control, passwords, permissions

    ESCALATION RULES:
    - Critical priority: Assign to senior member
    - High priority: Regular team member
    - Medium/Low: Junior team member or queue

    OUTPUT FORMAT:
    TEAM: [team_name]
    ASSIGNEE: [senior/regular/junior/queue]
    NEXT_ACTION: [what the team should do first]

    Make logical assignments based on category and urgency.
    """
)

print("✅ Team Assigner created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Stage 3 - Assign to team")

### 3.5: Create the Sequential Pipeline

In [None]:
# Create the sequential ticket processing pipeline
ticket_pipeline = SequentialAgent(
    name="ticket_processing_pipeline",
    sub_agents=[
        ticket_classifier,    # Stage 1: Classify
        priority_assigner,    # Stage 2: Prioritize
        team_assigner        # Stage 3: Assign
    ]
)

print("✅ Sequential Pipeline created!")
print(f"\n📋 Pipeline stages:")
print(f"   1. {ticket_classifier.name} → Classify ticket type")
print(f"   2. {priority_assigner.name} → Assign priority level")
print(f"   3. {team_assigner.name} → Route to team")
print(f"\n⚡ Execution: Sequential (one after another)")
print(f"   Each stage sees the output from previous stages")

### 3.6: Setup Runner for Sequential Pipeline

In [None]:
# Create session service and runner
session_service = InMemorySessionService()
APP_NAME = "ticket_pipeline_app"

pipeline_runner = Runner(
    app_name=APP_NAME,
    agent=ticket_pipeline,
    session_service=session_service
)

print("✅ Pipeline Runner initialized!")
print(f"   App: {APP_NAME}")
print(f"   Root Agent: {ticket_pipeline.name}")

### 3.7: Test the Sequential Pipeline

In [None]:
# Helper function to process a ticket through the pipeline
_created_sessions = set()

async def process_ticket_async(ticket: Dict, session_id: str = None):
    """
    Process a ticket through the sequential pipeline.
    """
    if session_id is None:
        session_id = f"session_{ticket['ticket_id']}"

    user_id = "pipeline_system"

    # Create session
    session_key = (session_id, user_id)
    if session_key not in _created_sessions:
        await session_service.create_session(
            app_name=APP_NAME,
            user_id=user_id,
            session_id=session_id,
            state={}
        )
        _created_sessions.add(session_key)

    # Format ticket as input message
    ticket_message = f"""
    Ticket ID: {ticket['ticket_id']}
    User: {ticket['user']}
    Submitted: {ticket['submitted_at']}
    Description: {ticket['description']}
    """

    content = types.Content(role='user', parts=[types.Part(text=ticket_message)])

    print(f"\n{'='*80}")
    print(f"🎫 PROCESSING TICKET: {ticket['ticket_id']}")
    print(f"{'='*80}")
    print(f"Description: {ticket['description'][:100]}...")
    print(f"\n⏳ Starting sequential pipeline...\n")

    start_time = time.time()

    # Run pipeline
    events = pipeline_runner.run_async(user_id=user_id, session_id=session_id, new_message=content)

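    stage_outputs = []  # (agent name, response text) per stage, in execution order
    final_response = None

    async for event in events:
        # Every LlmAgent in the pipeline emits its own final response;
        # event.author is the name of the agent that produced the event
        if event.is_final_response() and event.content and event.content.parts:
            final_response = event.content.parts[0].text
            stage_outputs.append((event.author, final_response))

    elapsed_time = time.time() - start_time

    if final_response:
        print(f"\n📊 PIPELINE RESULTS:")
        print(f"{'-'*80}")
        for stage_name, stage_text in stage_outputs:
            print(f"[{stage_name}]")
            print(stage_text)
            print()
        print(f"{'-'*80}")
        print(f"\n⏱️  Processing time: {elapsed_time:.2f} seconds")
        print(f"✅ Ticket {ticket['ticket_id']} processed successfully!")
        print(f"{'='*80}\n")

    return final_response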

def process_ticket(ticket: Dict, session_id: str = None):
    """Synchronous wrapper for process_ticket_async."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running (plain script): asyncio.run works directly
        return asyncio.run(process_ticket_async(ticket, session_id))
    # A loop is already running (Jupyter/Colab): patch it to allow nested runs
    import nest_asyncio
    nest_asyncio.apply()
    return asyncio.run(process_ticket_async(ticket, session_id))

print("✅ Pipeline test function ready!")

### 3.8: Run Pipeline Demonstrations

In [None]:
# Demo 1: Critical hardware issue
process_ticket(SAMPLE_TICKETS[0])

In [None]:
# Demo 2: Access/password issue
process_ticket(SAMPLE_TICKETS[1])

In [None]:
# Demo 3: Network performance issue
process_ticket(SAMPLE_TICKETS[2])

### 3.9: Sequential Pipeline Key Observations

**What you just saw:**

1. ✅ **Strict order**: Classifier → Priority → Team (always this sequence)
2. ✅ **Data flow**: Each stage sees results from previous stages
3. ✅ **Deterministic**: No LLM decides the flow; the order is hardcoded
4. ✅ **Cumulative time**: Total time = sum of all stages

**When Sequential is Perfect:**
- Steps depend on previous results
- Order matters for correctness
- Need predictable execution flow
- Building assembly line / pipeline processes
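
Because every pipeline stage is instructed to answer in `KEY: value` lines (`CATEGORY:`, `PRIORITY:`, `TEAM:` and so on), a downstream system can turn a stage's text into a dictionary. The helper below is a hypothetical sketch, not part of ADK, and the sample text is hand-written rather than real model output. LLMs do not always follow a format exactly, so lines that do not match are skipped.

```python
import re
from typing import Dict

# An upper-case key (letters and underscores), a colon, then the value
_FIELD = re.compile(r"^\s*([A-Z][A-Z_]*)\s*:\s*(.+?)\s*$")

def parse_stage_output(text: str) -> Dict[str, str]:
    """Parse 'KEY: value' lines into a dict, ignoring anything else."""
    fields: Dict[str, str] = {}
    for line in text.splitlines():
        match = _FIELD.match(line)
        if match:
            fields[match.group(1).lower()] = match.group(2)
    return fields

# Hand-written sample in the priority assigner's output format
sample = """
PRIORITY: high
REASONING: Blocks one user, deadline in 2 hours
SLA: 2 hours
Thanks for using the pipeline!
"""
parsed = parse_stage_output(sample)
print(parsed["priority"], "|", parsed["sla"])
```

The last sample line has no upper-case key, so it is dropped instead of raising an error.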

---

## ⚡ Part 4: Pattern B - Parallel Execution

### Use Case: Multi-Source Information Gathering

When researching how to solve a ticket, we want to search multiple sources:
1. **Internal knowledge base** - Published solutions
2. **Past resolved tickets** - Similar issues solved before
3. **System documentation** - Technical specs and configs

These searches are INDEPENDENT:
- Each search doesn't depend on others
- They can all run at the same time
- Results are aggregated afterwards

### How ParallelAgent Works

```python
parallel_research = ParallelAgent(
    name="parallel_researcher",
    sub_agents=[agent1, agent2, agent3]  # Execute simultaneously
)
```

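- ✅ All agents start at the same time
- ✅ Wall-clock time approaches that of the slowest agent instead of the sum of all agents
- ✅ Each agent works independently, in its own branch
- ✅ Each agent's result arrives through the same event stream; combining them is up to the caller or a follow-up agent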
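
The concurrency model can be sketched with `asyncio` alone. This is not ADK's implementation: `fake_search` is a hypothetical stand-in for a sub-agent's LLM call, and the delays are made up. Each branch stores its result under its own key, similar in spirit to giving each parallel sub-agent a distinct `output_key` so results do not overwrite one another.

```python
import asyncio
import time
from typing import Dict

async def fake_search(source: str, delay: float, results: Dict[str, str]) -> None:
    # asyncio.sleep stands in for waiting on an LLM or API response
    await asyncio.sleep(delay)
    results[source] = f"findings from {source}"

async def run_parallel() -> Dict[str, str]:
    results: Dict[str, str] = {}
    # All three coroutines start together; gather returns once the slowest ends
    await asyncio.gather(
        fake_search("kb", 0.3, results),
        fake_search("tickets", 0.1, results),
        fake_search("docs", 0.2, results),
    )
    return results

start = time.perf_counter()
parallel_results = asyncio.run(run_parallel())
elapsed = time.perf_counter() - start

print(sorted(parallel_results))      # every source reported a result
print(f"elapsed: {elapsed:.2f}s")    # roughly the slowest delay, not the sum
```

The branches finish in a different order than they were launched, which is why the results are keyed by name. Inside a notebook, where an event loop is already running, use `await run_parallel()` instead of `asyncio.run(...)`.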

Let's build it!

### 4.1: Create Mock Data Sources

In [None]:
# Mock knowledge base database
KNOWLEDGE_BASE = [
    {"id": "KB-001", "title": "Wi-Fi Connection Issues", "solution": "Reset router, check DNS settings, update drivers"},
    {"id": "KB-002", "title": "Password Reset Procedures", "solution": "Use self-service portal, verify with 2FA, contact security team"},
    {"id": "KB-003", "title": "Laptop Won't Boot", "solution": "Check power supply, test with external display, run diagnostics"},
    {"id": "KB-004", "title": "Email Access Problems", "solution": "Clear browser cache, try different browser, reset password"},
    {"id": "KB-005", "title": "Slow Network Performance", "solution": "Check bandwidth, test different location, verify router settings"},
]

# Mock resolved tickets database
RESOLVED_TICKETS = [
    {"id": "T-4501", "issue": "Laptop power failure", "resolution": "Replaced battery, tested charging port", "time": "2 hours"},
    {"id": "T-4502", "issue": "Email login failure", "resolution": "Reset password via self-service portal", "time": "15 minutes"},
    {"id": "T-4503", "issue": "Conference room Wi-Fi slow", "resolution": "Upgraded router firmware, optimized channel", "time": "3 hours"},
    {"id": "T-4504", "issue": "Cannot access VPN", "resolution": "Renewed VPN certificate, updated client", "time": "1 hour"},
]

# Mock system documentation
SYSTEM_DOCS = [
    {"system": "Email Server", "version": "Exchange 2019", "config": "AD authentication, 2FA enabled"},
    {"system": "Network Infrastructure", "version": "Cisco Catalyst", "config": "VLAN segmentation, 802.11ac"},
    {"system": "Laptop Standard Build", "version": "Windows 11 Pro", "config": "Dell Latitude 5520, 16GB RAM"},
    {"system": "VPN Gateway", "version": "Cisco AnyConnect", "config": "Split tunneling enabled"},
]

print("✅ Mock data sources loaded!")
print(f"   Knowledge Base: {len(KNOWLEDGE_BASE)} articles")
print(f"   Resolved Tickets: {len(RESOLVED_TICKETS)} tickets")
print(f"   System Docs: {len(SYSTEM_DOCS)} systems")

### 4.2: Agent 1 - Knowledge Base Searcher

In [None]:
# Agent 1: Search knowledge base
kb_searcher = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="kb_searcher",
    instruction=f"""
    You are a Knowledge Base Searcher in a parallel research system.

    YOUR TASK:
    Search the knowledge base for relevant articles that could help solve the issue.

    AVAILABLE KNOWLEDGE BASE:
    {KNOWLEDGE_BASE}

    OUTPUT FORMAT:
    SOURCE: Knowledge Base
    RELEVANT ARTICLES: [list article IDs and titles]
    RECOMMENDED SOLUTION: [summarize the most relevant solution]
    CONFIDENCE: [high/medium/low]

    Focus on finding the most relevant articles for the issue described.
    """
)

print("✅ Knowledge Base Searcher created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Search internal KB")

### 4.3: Agent 2 - Ticket History Searcher

In [None]:
# Agent 2: Search resolved tickets
ticket_searcher = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="ticket_searcher",
    instruction=f"""
    You are a Ticket History Searcher in a parallel research system.

    YOUR TASK:
    Search past resolved tickets for similar issues and their solutions.

    AVAILABLE RESOLVED TICKETS:
    {RESOLVED_TICKETS}

    OUTPUT FORMAT:
    SOURCE: Resolved Tickets
    SIMILAR TICKETS: [list ticket IDs and issues]
    PROVEN SOLUTIONS: [what worked in the past]
    RESOLUTION TIME: [typical time to resolve]

    Focus on tickets with similar symptoms and successful resolutions.
    """
)

print("✅ Ticket History Searcher created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Search past tickets")

### 4.4: Agent 3 - Documentation Searcher

In [None]:
# Agent 3: Search system documentation
docs_searcher = LlmAgent(
    model=LiteLlm(model=f"openai/{OPENAI_MODEL}"),
    name="docs_searcher",
    instruction=f"""
    You are a System Documentation Searcher in a parallel research system.

    YOUR TASK:
    Search system documentation for relevant configurations and technical details.

    AVAILABLE SYSTEM DOCUMENTATION:
    {SYSTEM_DOCS}

    OUTPUT FORMAT:
    SOURCE: System Documentation
    RELEVANT SYSTEMS: [list system names]
    TECHNICAL DETAILS: [configuration info that might help]
    CONSIDERATIONS: [any technical constraints or requirements]

    Focus on technical specifications that could be relevant to troubleshooting.
    """
)

print("✅ Documentation Searcher created!")
print(f"   Model: {OPENAI_MODEL}")
print(f"   Role: Search system docs")

### 4.5: Create the Parallel Research Agent

In [None]:
# Create the parallel information gathering system
parallel_researcher = ParallelAgent(
    name="parallel_information_gatherer",
    sub_agents=[
        kb_searcher,        # Search KB simultaneously
        ticket_searcher,    # Search tickets simultaneously
        docs_searcher      # Search docs simultaneously
    ]
)

print("✅ Parallel Research Agent created!")
print(f"\n⚡ Research sources (running in parallel):")
print(f"   1. {kb_searcher.name} → Search knowledge base")
print(f"   2. {ticket_searcher.name} → Search past tickets")
print(f"   3. {docs_searcher.name} → Search system docs")
print(f"\n⚡ Execution: Parallel (all at once)")
print(f"   Up to ~3x faster than running the three searches one after another")

### 4.6: Setup Runner for Parallel Research

In [None]:
# Create separate app for parallel research
RESEARCH_APP_NAME = "parallel_research_app"
research_session_service = InMemorySessionService()

research_runner = Runner(
    app_name=RESEARCH_APP_NAME,
    agent=parallel_researcher,
    session_service=research_session_service
)

print("✅ Parallel Research Runner initialized!")
print(f"   App: {RESEARCH_APP_NAME}")
print(f"   Root Agent: {parallel_researcher.name}")

### 4.7: Test Parallel Research

In [None]:
# Helper function for parallel research
_research_sessions = set()

async def research_issue_async(issue_description: str, session_id: str = "research_001"):
    """
    Research an issue using parallel information gathering.
    Shows individual agent responses for educational purposes.
    """
    user_id = "research_system"

    # Create session
    session_key = (session_id, user_id)
    if session_key not in _research_sessions:
        await research_session_service.create_session(
            app_name=RESEARCH_APP_NAME,
            user_id=user_id,
            session_id=session_id,
            state={}
        )
        _research_sessions.add(session_key)

    content = types.Content(role='user', parts=[types.Part(text=issue_description)])

    print(f"\n{'='*80}")
    print(f"🔍 PARALLEL RESEARCH QUERY")
    print(f"{'='*80}")
    print(f"Issue: {issue_description}")
    print(f"\n⚡ Launching 3 parallel agents...\n")

    start_time = time.time()

    # For educational purposes, let's also run individual agents separately to show their outputs
    # This demonstrates what each agent finds before aggregation
    print(f"{'─'*80}")
    print(f"🔬 INDIVIDUAL AGENT RESPONSES:")
    print(f"{'─'*80}\n")

    # Run each searcher individually to show what they find
    individual_results = []

    # KB Searcher
    print(f"📚 KB SEARCHER:")
    print(f"{'-'*80}")
    kb_session_id = f"{session_id}_kb"
    kb_session_service = InMemorySessionService()
    await kb_session_service.create_session(
        app_name="kb_search",
        user_id=user_id,
        session_id=kb_session_id,
        state={}
    )
    kb_runner = Runner(app_name="kb_search", agent=kb_searcher, session_service=kb_session_service)
    events = kb_runner.run_async(user_id=user_id, session_id=kb_session_id, new_message=content)
    async for event in events:
        if event.is_final_response():
            kb_result = event.content.parts[0].text
            print(kb_result)
            individual_results.append(("KB Searcher", kb_result))
    print(f"{'-'*80}\n")

    # Ticket Searcher
    print(f"🎫 TICKET SEARCHER:")
    print(f"{'-'*80}")
    ticket_session_id = f"{session_id}_ticket"
    ticket_session_service = InMemorySessionService()
    await ticket_session_service.create_session(
        app_name="ticket_search",
        user_id=user_id,
        session_id=ticket_session_id,
        state={}
    )
    ticket_runner = Runner(app_name="ticket_search", agent=ticket_searcher, session_service=ticket_session_service)
    events = ticket_runner.run_async(user_id=user_id, session_id=ticket_session_id, new_message=content)
    async for event in events:
        if event.is_final_response():
            ticket_result = event.content.parts[0].text
            print(ticket_result)
            individual_results.append(("Ticket Searcher", ticket_result))
    print(f"{'-'*80}\n")

    # Docs Searcher
    print(f"📖 DOCS SEARCHER:")
    print(f"{'-'*80}")
    docs_session_id = f"{session_id}_docs"
    docs_session_service = InMemorySessionService()
    await docs_session_service.create_session(
        app_name="docs_search",
        user_id=user_id,
        session_id=docs_session_id,
        state={}
    )
    docs_runner = Runner(app_name="docs_search", agent=docs_searcher, session_service=docs_session_service)
    events = docs_runner.run_async(user_id=user_id, session_id=docs_session_id, new_message=content)
    async for event in events:
        if event.is_final_response():
            docs_result = event.content.parts[0].text
            print(docs_result)
            individual_results.append(("Docs Searcher", docs_result))
    print(f"{'-'*80}\n")

    elapsed_time = time.time() - start_time

    # Now show what happens in parallel mode
    print(f"\n{'='*80}")
    print(f"💡 EDUCATIONAL NOTE:")
    print(f"{'='*80}")
    print(f"Above, we ran each agent SEQUENTIALLY to show you their individual outputs.")
    print(f"In a ParallelAgent, all 3 run SIMULTANEOUSLY and their events arrive interleaved.")
    print(f"Let's now run the actual ParallelAgent and combine its results:")
    print(f"{'='*80}\n")

    # Now run the actual parallel agent
    parallel_start = time.time()
    events = research_runner.run_async(user_id=user_id, session_id=session_id, new_message=content)

    # Each sub-agent emits its own final response, so keep one entry per agent
    # (event.author is the name of the agent that produced the event)
    parallel_results = {}
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            parallel_results[event.author] = event.content.parts[0].text

    parallel_elapsed = time.time() - parallel_start

    # Combine the per-agent responses into one report
    final_response = "\n\n".join(
        f"[{author}]\n{text}" for author, text in parallel_results.items()
    ) or None

    # Display combined results
    if final_response:
        print(f"📊 PARALLEL AGENT - COMBINED RESULTS:")
        print(f"{'='*80}")
        print(final_response)
        print(f"{'='*80}")
        print(f"\n⏱️  Time comparison:")
        print(f"   Sequential (showing each): {elapsed_time:.2f} seconds")
        print(f"   Parallel (actual): {parallel_elapsed:.2f} seconds")
        print(f"   ⚡ Speedup: ~{elapsed_time/parallel_elapsed:.1f}x faster!")
        print(f"\n✅ Research completed - {len(parallel_results)} sources consulted")
        print(f"{'='*80}\n")

    return final_response

def research_issue(issue_description: str, session_id: str = "research_001"):
    """Synchronous wrapper for research_issue_async."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running (plain script): asyncio.run works directly
        return asyncio.run(research_issue_async(issue_description, session_id))
    # A loop is already running (Jupyter/Colab): patch it to allow nested runs
    import nest_asyncio
    nest_asyncio.apply()
    return asyncio.run(research_issue_async(issue_description, session_id))

print("✅ Parallel research function ready!")
print("   💡 Shows individual outputs THEN parallel aggregation!")

### 4.8: Run Parallel Research Demonstrations

**What you'll see in the output:**

For educational purposes, the demonstration runs in two phases:

1. **🔬 Individual Agent Responses (Sequential)** - We run each agent separately to show you what each one finds:
   - 📚 KB Searcher - Results from knowledge base
   - 🎫 Ticket Searcher - Results from past tickets  
   - 📖 Docs Searcher - Results from system documentation

2. **📊 Parallel Agent (Actual)** - Then we run the ParallelAgent to show:
   - How the results of its sub-agents are collected
   - The speed improvement (up to ~3x with three agents of similar latency)
   - The final combined output

**Why show both?** ParallelAgent runs all sub-agents concurrently, so their events arrive interleaved in a single stream and it is hard to follow what each agent contributes. Running the agents individually first lets you read each contribution on its own before seeing the parallel run.

This verbose output helps you understand what happens "under the hood" when agents run in parallel!

In [None]:
# Demo 1: Research laptop power issue
research_issue(
    "Laptop won't turn on after liquid spill. Need to know repair procedures, past incidents, and hardware specs.",
    session_id="research_demo_1"
)

In [None]:
# Demo 2: Research Wi-Fi performance issue
research_issue(
    "Conference room Wi-Fi is very slow during meetings. Need troubleshooting steps and network configuration.",
    session_id="research_demo_2"
)

In [None]:
# Demo 3: Research email access problem
research_issue(
    "User can't login to email despite correct password. Need to understand email system config and past solutions.",
    session_id="research_demo_3"
)

### 4.9: Parallel Execution Key Observations

**What you just saw:**

1. ✅ **Individual agent outputs (sequential demonstration)**:
   - We first ran each agent separately to show what they find
   - 📚 KB Searcher displayed knowledge base matches
   - 🎫 Ticket Searcher displayed similar past tickets
   - 📖 Docs Searcher displayed relevant system documentation
   - This helps you understand what each agent contributes

2. ✅ **Parallel execution (actual ParallelAgent)**:
   - Then ran all 3 agents simultaneously using ParallelAgent
   - All searches happened at the same time (not one after another)
   - Each agent's result arrived through the ParallelAgent's event stream
   - Up to ~3x faster than the sequential demonstration when the agents take similar time

3. ✅ **Time comparison**:
   - Sequential (for demonstration): Shows each agent's work, one at a time
   - Parallel (production use): Finishes in roughly the time of the slowest agent
   - The measured speedup is visible in the timing output

**How ParallelAgent Really Works:**

```
Parallel Agent Execution (what actually happens):
Time 0s:  ParallelAgent launches all 3 sub-agents simultaneously
          ├─ 📚 KB Searcher starts (API call 1)
          ├─ 🎫 Ticket Searcher starts (API call 2)
          └─ 📖 Docs Searcher starts (API call 3)

Time 1-2s: All agents working in parallel
          (3 concurrent API calls to OpenAI)

Time ~2s: Last agent completes
          Every agent's final response has arrived in the event stream
          Caller (or a follow-up agent) combines the results
```

**Important Note:**
ParallelAgent does not merge its sub-agents' responses into a single answer. It runs them concurrently and passes their events along as they arrive, so combining the results is left to the caller or to a follow-up agent. In our demo, we ran agents individually FIRST (sequentially) to show what each finds, then ran the actual ParallelAgent to demonstrate the speed benefit.

**When Parallel is Perfect:**
- Tasks are independent (don't depend on each other)
- Speed is critical
- Gathering information from multiple sources
- API calls or I/O-bound operations

**Cost Consideration:**
- ⚠️ Parallel = Multiple simultaneous API calls
- 3 agents in parallel = 3 concurrent API calls
- Same total cost as sequential (same number of calls), but a higher instantaneous request rate
- Trade-off: Speed vs API rate limits
- Worth it when latency matters and your rate limit has headroom
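
When the instantaneous request rate is the constraint, one common approach is to cap how many calls are in flight at once. The sketch below uses only `asyncio`; `call_agent` is a hypothetical placeholder for a real model call, and the limit of 2 is an arbitrary value chosen for illustration. It does not involve `ParallelAgent` and makes no claim about what ADK offers for throttling.

```python
import asyncio
from typing import List

MAX_IN_FLIGHT = 2      # assumed limit; tune to your provider's quota
in_flight = 0          # calls currently running
peak_in_flight = 0     # highest concurrency observed

async def call_agent(name: str, limiter: asyncio.Semaphore) -> str:
    global in_flight, peak_in_flight
    async with limiter:                    # waits while the cap is reached
        in_flight += 1
        peak_in_flight = max(peak_in_flight, in_flight)
        await asyncio.sleep(0.05)          # placeholder for the API call
        in_flight -= 1
    return f"{name}: done"

async def run_capped(names: List[str]) -> List[str]:
    limiter = asyncio.Semaphore(MAX_IN_FLIGHT)
    # gather returns results in the order the coroutines were passed in
    return await asyncio.gather(*(call_agent(n, limiter) for n in names))

outputs = asyncio.run(run_capped(["kb", "tickets", "docs", "faq"]))
print(outputs)
print("peak concurrent calls:", peak_in_flight)
```

With four tasks and a cap of two, the work finishes in about two "rounds": slower than fully parallel, faster than fully sequential, and never above the chosen concurrency.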

---

## 🔄 Part 5: Comparing Sequential vs Parallel

### Performance Comparison

| Aspect | Sequential | Parallel |
|--------|-----------|----------|
| **Execution Order** | One after another (A→B→C) | All at once (A+B+C) |
| **Total Time** | Sum of all stages | Duration of the slowest stage |
| **Dependencies** | Later stages see earlier results | All stages independent |
| **API Calls** | N stages = N calls | N stages = N simultaneous calls |
| **Cost** | N calls, one at a time | Same N calls, higher peak request rate |
| **Use When** | Steps depend on each other | Steps are independent |

### Example: 3 Agents, Each Takes 2 Seconds

**Sequential:**
```
Agent 1 (2s) → Agent 2 (2s) → Agent 3 (2s)
Total Time: 6 seconds
API Calls: 3 (one at a time)
```

**Parallel:**
```
Agent 1 (2s) ┐
Agent 2 (2s) ├─ All at once
Agent 3 (2s) ┘
Total Time: 2 seconds
API Calls: 3 (simultaneously)
```
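
The same arithmetic generalises to agents with unequal latencies: sequential time is the sum, parallel time is the maximum. The small helpers below (names are illustrative) show that the ideal 3x speedup only appears when all three agents take equally long. The model ignores orchestration overhead and rate limiting.

```python
from typing import Sequence

def sequential_time(durations: Sequence[float]) -> float:
    # Stages run one after another, so their durations add up
    return sum(durations)

def parallel_time(durations: Sequence[float]) -> float:
    # Everything starts together, so the slowest agent sets the total
    return max(durations)

def speedup(durations: Sequence[float]) -> float:
    return sequential_time(durations) / parallel_time(durations)

for durations in ([2, 2, 2], [1, 2, 4]):
    print(
        durations,
        "sequential:", sequential_time(durations),
        "parallel:", parallel_time(durations),
        "speedup:", speedup(durations),
    )
```

For `[2, 2, 2]` the speedup is 3.0; for `[1, 2, 4]` it drops to 1.75, because the 4-second agent dominates the parallel run.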

### Decision Matrix

**Choose Sequential When:**
- ✅ Stage B needs results from Stage A
- ✅ Order matters for correctness
- ✅ Building a pipeline/assembly line
- ✅ Want to minimize concurrent API usage

**Choose Parallel When:**
- ✅ All tasks can run independently
- ✅ Latency matters more than keeping the request rate low
- ✅ Gathering data from multiple sources
- ✅ Have concurrent API quota available

### Real-World Pattern Combinations

**Fan-Out/Gather (Most Common):**
```
Sequential [
    Prepare Query,
    Parallel [Search A, Search B, Search C],  # Fan-out
    Aggregate Results                          # Gather
]
```

**Staged Pipeline with Parallel Steps:**
```
Sequential [
    Initial Processing,
    Parallel [Validation A, Validation B],
    Final Decision
]
```
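
As a rough illustration of the fan-out/gather shape, here is a plain `asyncio` sketch. It does not use ADK: `prepare_query`, `search`, and `aggregate` are hypothetical stand-ins for the three stages, and the "search" is a trivial substring match over a few titles borrowed from the mock data earlier in this lesson.

```python
import asyncio
from typing import Dict, List

SOURCES: Dict[str, List[str]] = {   # small sample for illustration
    "kb": ["Wi-Fi Connection Issues", "Laptop Won't Boot"],
    "tickets": ["Conference room Wi-Fi slow", "Email login failure"],
    "docs": ["VPN Gateway", "Network Infrastructure"],
}

def prepare_query(raw: str) -> str:
    # Stage 1 (sequential): normalise the request
    return raw.strip().lower()

async def search(source: str, query: str) -> List[str]:
    # Stage 2 (parallel): each source is searched independently
    await asyncio.sleep(0.01)       # placeholder for I/O latency
    return [item for item in SOURCES[source] if query in item.lower()]

def aggregate(hits: Dict[str, List[str]]) -> str:
    # Stage 3 (sequential): runs only after every search has returned
    lines = [f"{src}: {', '.join(found)}" for src, found in hits.items() if found]
    return "\n".join(lines)

async def fan_out_gather(raw: str) -> str:
    query = prepare_query(raw)
    names = list(SOURCES)
    # Fan-out: launch all searches at once; gather keeps the input order
    found = await asyncio.gather(*(search(name, query) for name in names))
    return aggregate(dict(zip(names, found)))

report = asyncio.run(fan_out_gather("  Wi-Fi "))
print(report)
```

The outer function is the sequential pipeline and the `gather` call is the parallel step nested inside it, which is the same nesting the pseudocode above expresses with `Sequential [ ..., Parallel [...], ... ]`.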

---

## 🎓 Part 7: Student Exercises

### Exercise 1: Create a Code Review Pipeline (Intermediate)

**Task:** Build a sequential code review pipeline with 3 stages:
1. **Syntax Checker**: Verify code syntax is valid
2. **Security Reviewer**: Check for security vulnerabilities
3. **Performance Optimizer**: Suggest performance improvements

**Requirements:**
- Use `SequentialAgent` with 3 `LlmAgent` sub-agents
- Each stage should provide specific feedback
- Test with a sample code snippet

**Hint:** Each reviewer builds on the findings from the previous stage.

```python
# Exercise 1: Your code here

# TODO: Create 3 reviewer agents
# syntax_checker = LlmAgent(...)
# security_reviewer = LlmAgent(...)
# performance_optimizer = LlmAgent(...)

# TODO: Create sequential pipeline
# code_review_pipeline = SequentialAgent(
#     name="code_review_pipeline",
#     sub_agents=[syntax_checker, security_reviewer, performance_optimizer]
# )

# TODO: Test with sample code
# sample_code = "def add(a, b): return a + b"
# Review the code through the pipeline
```

### Exercise 2: Add a 4th Parallel Searcher (Beginner)

**Task:** Extend the parallel research system with a 4th searcher.

**Requirements:**
1. Create a "FAQ Searcher" agent with mock FAQ data
2. Add it to the `ParallelAgent` sub_agents list
3. Test that all 4 searchers run in parallel

**Hint:** Follow the same pattern as existing searchers.

```python
# Exercise 2: Your code here

# TODO: Create mock FAQ data
# FAQ_DATA = [...]

# TODO: Create FAQ searcher agent
# faq_searcher = LlmAgent(...)

# TODO: Recreate parallel agent with 4 searchers
# extended_parallel_researcher = ParallelAgent(
#     name="extended_researcher",
#     sub_agents=[kb_searcher, ticket_searcher, docs_searcher, faq_searcher]
# )

# TODO: Test it
```

### Exercise 3: Fan-Out/Gather Pattern (Advanced)

**Task:** Combine Sequential and Parallel agents in a fan-out/gather pattern.

**Requirements:**
1. Create a SequentialAgent with 3 stages:
   - Stage 1: Prepare query (1 agent)
   - Stage 2: Search multiple sources (ParallelAgent with 3 searchers)
   - Stage 3: Aggregate results (1 agent)
2. Test the complete workflow

**Challenge:** Show that the parallel stage runs faster than the same searchers would if run sequentially.

```python
# Exercise 3: Your code here

# TODO: Create query preparer
# query_preparer = LlmAgent(...)

# TODO: Use existing parallel_researcher or create new one

# TODO: Create results aggregator
# results_aggregator = LlmAgent(...)

# TODO: Combine in sequential wrapper
# fan_out_gather_pipeline = SequentialAgent(
#     name="fan_out_gather",
#     sub_agents=[query_preparer, parallel_researcher, results_aggregator]
# )

# TODO: Test and time it
```

### Exercise 4: Performance Comparison (Intermediate)

**Task:** Compare sequential vs parallel execution times.

**Requirements:**
1. Create a simple task that can run sequentially or in parallel
2. Time both approaches with the same input
3. Calculate the speedup: `sequential_time / parallel_time`
4. Analyze when the speedup is worth the cost

**Goal:** Understand the speed vs cost trade-off empirically.

```python
# Exercise 4: Your code here

# TODO: Create 3 simple agents (e.g., summarizers)

# TODO: Create sequential version
# sequential_test = SequentialAgent(...)

# TODO: Create parallel version
# parallel_test = ParallelAgent(...)

# TODO: Time both with same input
# (time.perf_counter() is a monotonic clock intended for measuring intervals)
# import time
# start = time.perf_counter()
# # Run sequential
# sequential_time = time.perf_counter() - start

# start = time.perf_counter()
# # Run parallel
# parallel_time = time.perf_counter() - start

# TODO: Calculate and display speedup
# speedup = sequential_time / parallel_time
# print(f"Speedup: {speedup:.2f}x")
```

---

## 🎯 Part 8: Key Takeaways

Congratulations! You've mastered workflow orchestration patterns in ADK!

### What You Learned ✅

1. **Sequential Workflows (SequentialAgent)**
   - Execute agents in strict order
   - Each stage sees previous results
   - Perfect for pipelines and assembly lines
   - Deterministic: no LLM decides the flow

2. **Parallel Execution (ParallelAgent)**
   - Run agents simultaneously
   - Up to ~Nx faster (N = number of agents) when agents take similar time; the slowest agent sets the total
   - Perfect for independent information gathering
   - Same total cost, but higher instantaneous load

3. **When to Use Each**
   - Sequential: Steps depend on each other
   - Parallel: Steps are independent
   - Combine them: Fan-out/gather pattern

4. **Performance Trade-offs**
   - Parallel is faster, not cheaper
   - May need higher API rate limits
   - Worth it for interactive/time-sensitive tasks


### Production Best Practices 💡

1. **Choose the Right Pattern:**
   - Need one expert? → Hierarchical Routing
   - Multi-stage process? → Sequential
   - Parallel info gathering? → Parallel

2. **Optimize Performance:**
   - Cache frequently used results
   - Batch similar requests
   - Use cheaper models where possible

3. **Monitor Costs:**
   - Track API usage per workflow
   - Measure time savings vs cost
   - Adjust based on SLAs and budget

### Real-World Applications

**Sequential Workflows:**
- Document processing pipelines
- Multi-stage approvals
- Manufacturing processes
- Data ETL pipelines

**Parallel Execution:**
- Multi-source research
- API aggregation
- Competitive analysis
- Risk assessment from multiple models

**Combined Patterns:**
- E-commerce: Search (parallel) → Filter (sequential) → Rank (sequential)
- Healthcare: Gather (parallel) → Analyze (sequential) → Recommend (sequential)
- Finance: Fetch data (parallel) → Calculate (sequential) → Report (sequential)


### Resources 📚

- [ADK Sequential Agents](https://google.github.io/adk-docs/agents/workflow-agents/sequential-agents/)
- [ADK Parallel Agents](https://google.github.io/adk-docs/agents/workflow-agents/parallel-agents/)
- [ADK Workflow Patterns](https://google.github.io/adk-docs/agents/workflow-agents/)
- [OpenAI Rate Limits](https://platform.openai.com/docs/guides/rate-limits)

---
