# Lab 3: Multi-Tool Workflow Orchestration

**Duration:** 90-120 minutes  
**Level:** Advanced

## Learning Objectives

By the end of this lab, you will be able to:
1. Chain multiple tools sequentially with parameter passing
2. Execute tools in parallel with dependency management
3. Build conditional workflows with branching logic
4. Create DAG-based workflows for optimal parallelization
5. Implement the saga pattern for error recovery
6. Monitor and visualize workflow execution

## Setup

In [None]:
# Install required packages
!pip install openai networkx matplotlib -q

In [None]:
import os
import json
import time
import asyncio
from typing import Dict, List, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import networkx as nx
import matplotlib.pyplot as plt
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

print("✓ Setup complete")

## Exercise 1: Sequential Tool Chaining

Chain tools together, passing outputs as inputs.

**Task:** Build a ToolChain that executes tools in sequence with parameter resolution.

In [None]:
@dataclass
class ToolCall:
    """A tool call in a chain."""
    name: str
    params: Dict[str, Any]
    output_key: str  # Where to store output

@dataclass
class ChainResult:
    """Result of chain execution."""
    success: bool
    outputs: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)

# TODO: Implement ToolChain
class ToolChain:
    """Execute tools sequentially with parameter passing."""
    
    def __init__(self, tools: Dict[str, Callable]):
        # TODO: Store available tools
        pass
    
    def resolve_params(
        self,
        params: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Resolve parameters using context.
        
        Example:
            params = {"location": "{city}", "units": "celsius"}
            context = {"city": "London"}
            -> {"location": "London", "units": "celsius"}
        """
        # TODO: Replace {key} with values from context
        pass
    
    def execute(self, chain: List[ToolCall]) -> ChainResult:
        """Execute chain of tools."""
        context = {}
        
        for step in chain:
            try:
                # TODO: Get tool function
                # TODO: Resolve parameters
                # TODO: Execute tool
                # TODO: Store result in context
                pass
            except Exception as e:
                # TODO: Record error and stop
                pass
        
        # TODO: Return result
        pass

# Test chaining
def get_user(user_id: str) -> dict:
    """Get user data."""
    return {"id": user_id, "name": "Alice", "city": "London"}

def get_weather(location: str) -> dict:
    """Get weather for location."""
    return {"location": location, "temp": 15, "condition": "Cloudy"}

def send_notification(user_name: str, message: str) -> dict:
    """Send notification."""
    return {"sent": True, "to": user_name, "message": message}

tools = {
    "get_user": get_user,
    "get_weather": get_weather,
    "send_notification": send_notification
}

chain = ToolChain(tools)

# Build chain: get user -> get weather -> send notification
workflow = [
    ToolCall(
        name="get_user",
        params={"user_id": "123"},
        output_key="user"
    ),
    ToolCall(
        name="get_weather",
        params={"location": "{user.city}"},
        output_key="weather"
    ),
    ToolCall(
        name="send_notification",
        params={
            "user_name": "{user.name}",
            "message": "Weather in {weather.location}: {weather.temp}°C, {weather.condition}"
        },
        output_key="notification"
    )
]

result = chain.execute(workflow)
print(f"Success: {result.success}")
print(f"Outputs: {json.dumps(result.outputs, indent=2)}")

## Exercise 2: Parallel Workflow Execution

Execute independent tools in parallel.

**Task:** Build a WorkflowExecutor that identifies and runs parallel steps.

In [None]:
@dataclass
class WorkflowStep:
    """A step in a workflow."""
    id: str
    tool_name: str
    params: Dict[str, Any]
    depends_on: Set[str] = field(default_factory=set)

# TODO: Implement WorkflowExecutor
class WorkflowExecutor:
    """Execute workflows with parallel steps."""
    
    def __init__(self, tools: Dict[str, Callable]):
        # TODO: Store tools
        pass
    
    async def execute_step(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> Any:
        """Execute a single step."""
        # TODO: Get tool
        # TODO: Resolve parameters from context
        # TODO: Execute (with small delay for demo)
        pass
    
    async def execute_workflow(self, steps: List[WorkflowStep]) -> Dict[str, Any]:
        """Execute workflow with parallel execution."""
        context = {}
        completed = set()
        
        while len(completed) < len(steps):
            # TODO: Find steps ready to execute (dependencies met)
            # TODO: Execute ready steps in parallel
            # TODO: Store results in context
            # TODO: Mark as completed
            pass
        
        return context

# Test parallel execution
def fetch_user_profile(user_id: str) -> dict:
    """Fetch user profile."""
    time.sleep(1)  # Simulate API call
    return {"id": user_id, "name": "Alice"}

def fetch_user_orders(user_id: str) -> dict:
    """Fetch user orders."""
    time.sleep(1)
    return {"user_id": user_id, "orders": ["Order1", "Order2"]}

def fetch_user_reviews(user_id: str) -> dict:
    """Fetch user reviews."""
    time.sleep(1)
    return {"user_id": user_id, "reviews": ["Review1"]}

def aggregate_user_data(profile: dict, orders: dict, reviews: dict) -> dict:
    """Aggregate all user data."""
    return {
        "profile": profile,
        "order_count": len(orders["orders"]),
        "review_count": len(reviews["reviews"])
    }

tools = {
    "fetch_user_profile": fetch_user_profile,
    "fetch_user_orders": fetch_user_orders,
    "fetch_user_reviews": fetch_user_reviews,
    "aggregate_user_data": aggregate_user_data
}

executor = WorkflowExecutor(tools)

# Define workflow with parallel steps
workflow = [
    WorkflowStep(
        id="profile",
        tool_name="fetch_user_profile",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="orders",
        tool_name="fetch_user_orders",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="reviews",
        tool_name="fetch_user_reviews",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="aggregate",
        tool_name="aggregate_user_data",
        params={
            "profile": "{profile}",
            "orders": "{orders}",
            "reviews": "{reviews}"
        },
        depends_on={"profile", "orders", "reviews"}
    )
]

start = time.time()
result = await executor.execute_workflow(workflow)
duration = time.time() - start

print(f"Execution time: {duration:.2f}s")
print(f"Result: {json.dumps(result.get('aggregate'), indent=2)}")
print(f"\nExpected: ~2s (parallel) vs ~4s (sequential)")

## Exercise 3: Conditional Workflows

Add branching logic to workflows.

**Task:** Build conditional execution based on previous step results.

In [None]:
@dataclass
class Condition:
    """A condition to evaluate."""
    key: str  # Context key to check
    operator: str  # eq, ne, gt, lt, gte, lte, in
    value: Any
    
    def evaluate(self, context: Dict[str, Any]) -> bool:
        """Evaluate condition."""
        # TODO: Get value from context using key
        # TODO: Apply operator
        pass

@dataclass
class ConditionalStep:
    """A workflow step with conditions."""
    id: str
    tool_name: str
    params: Dict[str, Any]
    conditions: List[Condition] = field(default_factory=list)
    depends_on: Set[str] = field(default_factory=set)
    
    def should_execute(self, context: Dict[str, Any]) -> bool:
        """Check if all conditions are met."""
        # TODO: Evaluate all conditions
        pass

# TODO: Implement ConditionalWorkflowExecutor
class ConditionalWorkflowExecutor:
    """Execute workflows with conditional steps."""
    
    def __init__(self, tools: Dict[str, Callable]):
        # TODO: Store tools
        pass
    
    def execute_workflow(self, steps: List[ConditionalStep]) -> Dict[str, Any]:
        """Execute workflow with conditions."""
        context = {}
        completed = set()
        skipped = set()
        
        for step in steps:
            # TODO: Check dependencies are met
            # TODO: Check conditions
            # TODO: Execute if conditions met, skip otherwise
            pass
        
        context["_skipped"] = list(skipped)
        return context

# Test conditional execution
def check_inventory(product_id: str) -> dict:
    """Check product inventory."""
    return {"product_id": product_id, "in_stock": True, "quantity": 5}

def calculate_price(product_id: str, quantity: int) -> dict:
    """Calculate price."""
    return {"product_id": product_id, "total": quantity * 10.0}

def process_payment(total: float) -> dict:
    """Process payment."""
    return {"payment_id": "PAY123", "amount": total, "status": "completed"}

def send_error_notification(product_id: str) -> dict:
    """Send out of stock notification."""
    return {"sent": True, "reason": "out_of_stock"}

tools = {
    "check_inventory": check_inventory,
    "calculate_price": calculate_price,
    "process_payment": process_payment,
    "send_error_notification": send_error_notification
}

executor = ConditionalWorkflowExecutor(tools)

# Define conditional workflow
workflow = [
    ConditionalStep(
        id="inventory",
        tool_name="check_inventory",
        params={"product_id": "PROD123"}
    ),
    ConditionalStep(
        id="price",
        tool_name="calculate_price",
        params={"product_id": "PROD123", "quantity": 2},
        conditions=[
            Condition(key="inventory.in_stock", operator="eq", value=True)
        ],
        depends_on={"inventory"}
    ),
    ConditionalStep(
        id="payment",
        tool_name="process_payment",
        params={"total": "{price.total}"},
        conditions=[
            Condition(key="inventory.in_stock", operator="eq", value=True)
        ],
        depends_on={"price"}
    ),
    ConditionalStep(
        id="error_notification",
        tool_name="send_error_notification",
        params={"product_id": "PROD123"},
        conditions=[
            Condition(key="inventory.in_stock", operator="eq", value=False)
        ],
        depends_on={"inventory"}
    )
]

result = executor.execute_workflow(workflow)
print("Workflow result:")
print(json.dumps({k: v for k, v in result.items() if not k.startswith("_")}, indent=2))
print(f"\nSkipped steps: {result['_skipped']}")

## Exercise 4: DAG Workflows

Use directed acyclic graphs for optimal parallelization.

**Task:** Build DAG-based workflows using networkx.

In [None]:
# TODO: Implement DAGWorkflow
class DAGWorkflow:
    """Workflow represented as a DAG."""
    
    def __init__(self, tools: Dict[str, Callable]):
        self.tools = tools
        # TODO: Initialize networkx DiGraph
        pass
    
    def add_step(
        self,
        step_id: str,
        tool_name: str,
        params: Dict[str, Any]
    ):
        """Add a step to the workflow."""
        # TODO: Add node with step data
        pass
    
    def add_dependency(self, from_step: str, to_step: str):
        """Add dependency between steps."""
        # TODO: Add edge from from_step to to_step
        pass
    
    def validate(self) -> bool:
        """Check if DAG is valid (no cycles)."""
        # TODO: Use nx.is_directed_acyclic_graph
        pass
    
    def get_execution_order(self) -> List[List[str]]:
        """
        Get execution order grouped by level.
        Steps in same level can run in parallel.
        """
        # TODO: Use nx.topological_generations
        pass
    
    async def execute(self) -> Dict[str, Any]:
        """Execute workflow."""
        if not self.validate():
            raise ValueError("Workflow contains cycles")
        
        context = {}
        execution_order = self.get_execution_order()
        
        for level in execution_order:
            # TODO: Execute all steps in this level in parallel
            # TODO: Store results in context
            pass
        
        return context
    
    def visualize(self):
        """Visualize the workflow DAG."""
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(self.graph)
        nx.draw(
            self.graph,
            pos,
            with_labels=True,
            node_color='lightblue',
            node_size=2000,
            font_size=10,
            arrows=True
        )
        plt.title("Workflow DAG")
        plt.tight_layout()
        plt.show()

# Test DAG workflow
def step_a() -> str:
    time.sleep(0.5)
    return "A done"

def step_b() -> str:
    time.sleep(0.5)
    return "B done"

def step_c(a_result: str) -> str:
    time.sleep(0.5)
    return f"C done (after {a_result})"

def step_d(a_result: str, b_result: str) -> str:
    time.sleep(0.5)
    return f"D done (after {a_result} and {b_result})"

def step_e(c_result: str, d_result: str) -> str:
    time.sleep(0.5)
    return f"E done (after {c_result} and {d_result})"

tools = {
    "step_a": step_a,
    "step_b": step_b,
    "step_c": step_c,
    "step_d": step_d,
    "step_e": step_e
}

workflow = DAGWorkflow(tools)

# Build DAG
workflow.add_step("a", "step_a", {})
workflow.add_step("b", "step_b", {})
workflow.add_step("c", "step_c", {"a_result": "{a}"})
workflow.add_step("d", "step_d", {"a_result": "{a}", "b_result": "{b}"})
workflow.add_step("e", "step_e", {"c_result": "{c}", "d_result": "{d}"})

workflow.add_dependency("a", "c")
workflow.add_dependency("a", "d")
workflow.add_dependency("b", "d")
workflow.add_dependency("c", "e")
workflow.add_dependency("d", "e")

# Visualize
workflow.visualize()

# Execute
print("\nExecution order:")
for i, level in enumerate(workflow.get_execution_order()):
    print(f"Level {i}: {level} (parallel)")

start = time.time()
result = await workflow.execute()
duration = time.time() - start

print(f"\nExecution time: {duration:.2f}s")
print(f"Expected: ~1.5s (parallel) vs ~2.5s (sequential)")
print(f"\nResults: {json.dumps(result, indent=2)}")

## Exercise 5: Saga Pattern for Error Recovery

Implement compensating transactions for distributed workflows.

**Task:** Build a saga executor that can rollback on failure.

In [None]:
@dataclass
class CompensatingAction:
    """Action to undo a step."""
    tool_name: str
    params: Dict[str, Any]

@dataclass
class SagaStep:
    """Step with compensation."""
    id: str
    tool_name: str
    params: Dict[str, Any]
    compensation: Optional[CompensatingAction] = None

# TODO: Implement SagaWorkflowExecutor
class SagaWorkflowExecutor:
    """Execute workflows with saga pattern."""
    
    def __init__(self, tools: Dict[str, Callable]):
        # TODO: Store tools
        pass
    
    def execute_saga(self, steps: List[SagaStep]) -> Dict[str, Any]:
        """Execute saga workflow."""
        context = {}
        completed_steps = []
        
        try:
            for step in steps:
                # TODO: Execute step
                # TODO: Store result and completed step
                pass
            
            return {"success": True, "context": context}
            
        except Exception as e:
            # TODO: Rollback completed steps
            # TODO: Return error with rollback info
            pass
    
    def _rollback(self, completed_steps: List[SagaStep], context: Dict[str, Any]):
        """Execute compensating actions in reverse order."""
        # TODO: Iterate completed steps in reverse
        # TODO: Execute compensating actions
        pass

# Test saga pattern
def reserve_inventory(product_id: str, quantity: int) -> dict:
    """Reserve inventory."""
    print(f"✓ Reserved {quantity} units of {product_id}")
    return {"reservation_id": "RES123", "product_id": product_id, "quantity": quantity}

def release_inventory(reservation_id: str) -> dict:
    """Release inventory reservation."""
    print(f"↺ Released reservation {reservation_id}")
    return {"released": True}

def charge_payment(amount: float) -> dict:
    """Charge payment."""
    print(f"✓ Charged ${amount}")
    return {"payment_id": "PAY123", "amount": amount}

def refund_payment(payment_id: str) -> dict:
    """Refund payment."""
    print(f"↺ Refunded payment {payment_id}")
    return {"refunded": True}

def ship_order(order_id: str) -> dict:
    """Ship order - this will fail for demo."""
    print(f"✗ Shipping failed for {order_id}")
    raise Exception("Shipping service unavailable")

tools = {
    "reserve_inventory": reserve_inventory,
    "release_inventory": release_inventory,
    "charge_payment": charge_payment,
    "refund_payment": refund_payment,
    "ship_order": ship_order
}

executor = SagaWorkflowExecutor(tools)

# Define saga
saga = [
    SagaStep(
        id="reserve",
        tool_name="reserve_inventory",
        params={"product_id": "PROD123", "quantity": 2},
        compensation=CompensatingAction(
            tool_name="release_inventory",
            params={"reservation_id": "{reserve.reservation_id}"}
        )
    ),
    SagaStep(
        id="payment",
        tool_name="charge_payment",
        params={"amount": 100.0},
        compensation=CompensatingAction(
            tool_name="refund_payment",
            params={"payment_id": "{payment.payment_id}"}
        )
    ),
    SagaStep(
        id="shipping",
        tool_name="ship_order",
        params={"order_id": "ORD123"}
    )
]

print("=== Executing Saga ===")
result = executor.execute_saga(saga)

print(f"\nSuccess: {result['success']}")
if not result['success']:
    print(f"Error: {result['error']}")
    print(f"Rolled back: {result['rolled_back']} steps")

## Exercise 6: Workflow Monitoring

Track and visualize workflow execution.

**Task:** Build monitoring for workflow performance and status.

In [None]:
class WorkflowStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class StepExecution:
    """Record of step execution."""
    step_id: str
    tool_name: str
    status: WorkflowStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    error: Optional[str] = None

# TODO: Implement WorkflowMonitor
class WorkflowMonitor:
    """Monitor workflow execution."""
    
    def __init__(self):
        # TODO: Initialize execution records
        pass
    
    def start_step(self, step_id: str, tool_name: str):
        """Record step start."""
        # TODO: Create execution record
        pass
    
    def complete_step(self, step_id: str):
        """Record step completion."""
        # TODO: Update execution record
        pass
    
    def fail_step(self, step_id: str, error: str):
        """Record step failure."""
        # TODO: Update execution record with error
        pass
    
    def get_workflow_summary(self) -> Dict:
        """Get workflow execution summary."""
        # TODO: Calculate total duration
        # TODO: Count completed, failed steps
        # TODO: Calculate success rate
        pass
    
    def get_step_timeline(self) -> List[Dict]:
        """Get timeline of step executions."""
        # TODO: Return sorted execution records
        pass
    
    def visualize_timeline(self):
        """Visualize execution timeline."""
        # TODO: Create gantt-style chart
        pass

# TODO: Add monitoring to workflow executor
class MonitoredWorkflowExecutor(WorkflowExecutor):
    """Workflow executor with monitoring."""
    
    def __init__(self, tools: Dict[str, Callable]):
        super().__init__(tools)
        self.monitor = WorkflowMonitor()
    
    async def execute_step(
        self,
        step: WorkflowStep,
        context: Dict[str, Any]
    ) -> Any:
        """Execute step with monitoring."""
        # TODO: Record start
        try:
            # TODO: Execute step (use parent class)
            result = None
            # TODO: Record completion
            return result
        except Exception as e:
            # TODO: Record failure
            raise

# Test monitoring
tools = {
    "fetch_user_profile": fetch_user_profile,
    "fetch_user_orders": fetch_user_orders,
    "fetch_user_reviews": fetch_user_reviews,
    "aggregate_user_data": aggregate_user_data
}

executor = MonitoredWorkflowExecutor(tools)

workflow = [
    WorkflowStep(
        id="profile",
        tool_name="fetch_user_profile",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="orders",
        tool_name="fetch_user_orders",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="reviews",
        tool_name="fetch_user_reviews",
        params={"user_id": "123"}
    ),
    WorkflowStep(
        id="aggregate",
        tool_name="aggregate_user_data",
        params={
            "profile": "{profile}",
            "orders": "{orders}",
            "reviews": "{reviews}"
        },
        depends_on={"profile", "orders", "reviews"}
    )
]

result = await executor.execute_workflow(workflow)

# Get monitoring data
summary = executor.monitor.get_workflow_summary()
print("\n=== Workflow Summary ===")
print(json.dumps(summary, indent=2))

timeline = executor.monitor.get_step_timeline()
print("\n=== Step Timeline ===")
for step in timeline:
    print(f"{step['step_id']}: {step['status']} ({step.get('duration_ms', 0):.0f}ms)")

executor.monitor.visualize_timeline()

## Bonus Exercise: Dynamic Workflow Generation

Use LLM to generate workflows from natural language.

**Task:** Build a system that creates workflows from user goals.

In [None]:
class DynamicWorkflowBuilder:
    """Generate workflows using LLM."""
    
    def __init__(self, tools: Dict[str, Callable]):
        self.tools = tools
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    
    def generate_workflow(self, goal: str) -> List[WorkflowStep]:
        """Generate workflow from goal."""
        # Build tool descriptions
        tool_descriptions = "\n".join([
            f"- {name}: {func.__doc__ or 'No description'}"
            for name, func in self.tools.items()
        ])
        
        prompt = f"""
Generate a workflow to achieve this goal: {goal}

Available tools:
{tool_descriptions}

Return a JSON array of steps with format:
{{
  "id": "step_id",
  "tool_name": "tool_name",
  "params": {{"param": "value"}},
  "depends_on": ["other_step_id"]
}}

Use {{step_id.key}} syntax to reference outputs from previous steps.
"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        
        workflow_json = json.loads(response.choices[0].message.content)
        
        # Convert to WorkflowStep objects
        steps = []
        for step_data in workflow_json.get("steps", []):
            steps.append(WorkflowStep(
                id=step_data["id"],
                tool_name=step_data["tool_name"],
                params=step_data["params"],
                depends_on=set(step_data.get("depends_on", []))
            ))
        
        return steps

# Test dynamic workflow generation
tools = {
    "get_user": lambda user_id: {"id": user_id, "name": "Alice", "email": "alice@example.com"},
    "get_orders": lambda user_id: {"orders": ["Order1", "Order2"]},
    "calculate_total": lambda orders: {"total": len(orders) * 50.0},
    "send_email": lambda email, message: {"sent": True, "to": email}
}

builder = DynamicWorkflowBuilder(tools)

# Generate workflow from natural language
goal = "Get user's orders, calculate total, and email them a summary"
print(f"Goal: {goal}\n")

workflow = builder.generate_workflow(goal)

print("Generated workflow:")
for step in workflow:
    print(f"- {step.id}: {step.tool_name}({step.params})")
    if step.depends_on:
        print(f"  Depends on: {step.depends_on}")

# Execute generated workflow
executor = MonitoredWorkflowExecutor(tools)
result = await executor.execute_workflow(workflow)

print("\nExecution result:")
print(json.dumps(result, indent=2))

## Summary

### Key Concepts Covered

1. **Sequential Chaining**: Pass outputs between tools
2. **Parallel Execution**: Run independent steps concurrently
3. **Conditional Logic**: Branch based on runtime conditions
4. **DAG Workflows**: Optimal parallelization with dependency graphs
5. **Saga Pattern**: Compensating transactions for error recovery
6. **Monitoring**: Track execution performance and status
7. **Dynamic Generation**: LLM-powered workflow creation

### Production Considerations

- **State Management**: Persist workflow state for resume capability
- **Error Recovery**: Implement retry and fallback strategies
- **Timeouts**: Set limits on step and workflow duration
- **Cancellation**: Support workflow cancellation
- **Observability**: Log all state transitions
- **Testing**: Test complex dependency graphs thoroughly
- **Validation**: Validate DAG structure before execution
- **Resource Limits**: Control concurrent step execution
- **Audit Trail**: Track who initiated workflows
- **Versioning**: Version workflow definitions

### Next Steps

- Study distributed workflow engines (Temporal, Airflow)
- Explore workflow patterns in microservices
- Learn about saga pattern in distributed systems
- Practice with real-world workflow requirements