## 📝 Prerequisites

This notebook requires an OpenAI API key to run the workflows.

To set your API key:

```python
import os
os.environ['OPENAI_API_KEY'] = 'your-api-key-here'
```

All examples in this notebook are **fully executable** and will show real LLM-generated results.


# 14. Workflows - Orchestrating Complex AI Tasks

This tutorial introduces AILib's workflow system, which extends beyond simple chains to provide powerful orchestration capabilities while maintaining simplicity.

## What You'll Learn

- Creating basic workflows
- Adding conditional logic
- Using loops and iteration
- Parallel execution
- State management
- Error handling and retries
- Human-in-the-loop workflows
- Composing complex workflows


In [1]:
import os
import asyncio
from ailib import create_workflow
from pydantic import BaseModel
from typing import List

# For async execution in Jupyter
import nest_asyncio

nest_asyncio.apply()

# Check for API key
if not os.getenv("OPENAI_API_KEY"):
    print("❌ Error: OPENAI_API_KEY environment variable is required")
    print("\nTo set it in this notebook, run:")
    print("os.environ['OPENAI_API_KEY'] = 'your-api-key-here'")
    raise ValueError("Please set your OpenAI API key to continue")

print("✅ API key found! Ready to run workflows.")

✅ API key found! Ready to run workflows.


## Setup

First, let's import the necessary modules and set up our environment:


In [2]:
# The simplest workflow - one line!
simple_workflow = create_workflow("Write a haiku about AI")

# Run the workflow
result = await simple_workflow.run()
print("Generated haiku:")
print(result)

print(f"\n✓ Created and ran a simple workflow with {len(simple_workflow.steps)} step")

Generated haiku:
Artificial mind,
Learning and evolving fast,
Future unknown path.

✓ Created and ran a simple workflow with 1 step


## 1. Basic Workflows

Let's start with the simplest possible workflow:


In [3]:
# Multi-step workflow - with proper context passing
multi_step = create_workflow(
    [
        "Write a haiku about {topic}",
        "Translate this to Japanese: {previous}",
        "Explain the meaning of each line from the original haiku: {previous_result}",
    ]
)

print(f"Created workflow with {len(multi_step.steps)} steps")

# Run with parameters
result = await multi_step.run(topic="spring rain")
print("\nWorkflow result:")
print(result)

# Alternative: Use named steps for more explicit control
print("\n" + "=" * 50 + "\n")

multi_step_named = (
    create_workflow()
    .step("Write a haiku about {topic}", name="haiku")
    .step("Translate to Japanese: {haiku}", name="translation")
    .step("Explain the meaning of each line from: {haiku}", name="explanation")
)

result_named = await multi_step_named.run(topic="autumn leaves")
print("Named steps workflow result:")
print(result_named)

Created workflow with 3 steps

Workflow result:
1. 優しい雫が落ちる (Yasashii shizuku ga ochiru) - Gentle drops fall
2. 地球の眠りを覚ます (Chikyuu no nemuri wo samasu) - Waking the earth from its slumber
3. 春の甘いセレナーデ (Haru no amai serenade) - Sweet serenade of spring


Named steps workflow result:
1. Crisp leaves fall to the ground - This line describes the action of leaves falling from trees during autumn. The word "crisp" suggests that the leaves are dry and brittle, characteristic of the fall season.

2. Colors of red, gold, and brown - This line refers to the variety of colors that leaves turn during autumn. The colors red, gold, and brown are commonly associated with fall foliage and add to the beauty of the season.

3. Nature's tapestry - This line conveys the idea that the changing colors of the leaves create a beautiful and intricate pattern, much like a tapestry. It suggests that nature's beauty and artistry can be seen in the changing of the seasons.


## 2. Conditional Logic

Add decision-making to your workflows:


In [4]:
# Parallel search across multiple sources
search_workflow = (
    create_workflow()
    .parallel(
        "Write 3 facts about quantum computing",
        "List 3 recent breakthroughs in quantum computing",
        "Name 3 challenges facing quantum computing",
    )
    .all()  # Wait for all results
    .step("Synthesize all findings into a brief summary")
)

print("Parallel search workflow created")

import time

start = time.time()
results = await search_workflow.run()
elapsed = time.time() - start

print(f"\nCompleted in {elapsed:.2f} seconds (parallel execution is faster!)")
print("\nSynthesized results:")
print(results)

Parallel search workflow created

Completed in 5.92 seconds (parallel execution is faster!)

Synthesized results:
Overall, the research conducted on the effects of social media on mental health has shown a complex relationship between the two. While excessive use of social media has been linked to negative mental health outcomes such as depression, anxiety, and feelings of loneliness, there are also positive aspects to consider. Social media can provide a sense of connection and support, and can be a valuable tool for self-expression and communication. It is important for individuals to be mindful of their social media use and to prioritize real-life interactions and self-care practices to maintain a healthy balance.


In [5]:
# For-each loop
batch_workflow = (
    create_workflow()
    .step("List 3 popular programming languages")
    .for_each("language")
    .do("Write a 2-sentence description of {language} highlighting its main use case")
)

print("Batch processing workflow created")

result = await batch_workflow.run()
print("\nDescriptions generated:")
for i, desc in enumerate(result, 1):
    print(f"\n{i}. {desc}")

Batch processing workflow created

Descriptions generated:

1. 1. Python is a versatile programming language known for its simplicity and readability, making it ideal for tasks such as web development, data analysis, and artificial intelligence.

2. Java is a widely-used programming language known for its platform independence, making it a popular choice for developing desktop, web, and mobile applications.

3. JavaScript is a scripting language primarily used for adding interactivity to web pages, allowing developers to create dynamic and interactive user experiences on websites.


## 3. Loops and Iteration

Process lists of items or repeat operations:


In [6]:
# Race condition - first to complete wins
# Let's create tasks with different complexity levels
import time

fast_response = (
    create_workflow()
    .parallel(
        "Say 'quick'", "Write a detailed 500-word essay about the history of computing"
    )
    .race()  # Return whichever completes first
)

print("Race workflow created - will return fastest result")

start = time.time()
result = await fast_response.run()
elapsed = time.time() - start

print(f"\nCompleted in {elapsed:.2f} seconds")
print(f"Winner: {result[:50]}..." if len(result) > 50 else f"Winner: {result}")

Race workflow created - will return fastest result

Completed in 5.97 seconds
Winner: Quick


## 4. Parallel Execution

Run multiple operations concurrently for better performance:


In [7]:
# Parallel search across multiple sources
search_workflow = (
    create_workflow()
    .parallel(
        "Write 3 facts about quantum computing",
        "List 3 recent breakthroughs in quantum computing",
        "Name 3 challenges facing quantum computing",
    )
    .all()  # Wait for all results
    .step("Synthesize all findings into a brief summary")
)

print("Parallel search workflow created")

import time

start = time.time()
results = await search_workflow.run()
elapsed = time.time() - start

print(f"\nCompleted in {elapsed:.2f} seconds (parallel execution is faster!)")
print("\nSynthesized results:")
print(results)

Parallel search workflow created

Completed in 5.95 seconds (parallel execution is faster!)

Synthesized results:
The research indicates that there is a growing concern about the impact of social media on mental health, particularly among young people. Excessive use of social media has been linked to feelings of loneliness, depression, anxiety, and low self-esteem. Additionally, the comparison culture and unrealistic standards portrayed on social media can contribute to negative body image issues. It is important for individuals to be mindful of their social media usage and seek support if they are experiencing negative effects on their mental health.


In [8]:
# Automatic retry on failure
resilient_workflow = (
    create_workflow()
    .step(
        "Call external API to get data for: {query}",
        retry=3,  # Retry up to 3 times
        timeout=5.0,  # 5 second timeout
    )
    .step("Process the API response")
    .step("Store results in database")
)

print("Created workflow with automatic retry on API calls")

Created workflow with automatic retry on API calls


In [9]:
# Custom error handling
error_handling_workflow = (
    create_workflow()
    .step("Perform risky operation")
    .on_error()
    .retry(max_attempts=2, backoff_factor=2.0)
    .do("Log error details", "Send alert to admin")
    .finally_do("Clean up resources")
)

print("Created workflow with comprehensive error handling")

Created workflow with comprehensive error handling


In [10]:
# Define expected output structure
class ProductAnalysis(BaseModel):
    name: str
    category: str
    price_range: str
    key_features: List[str]
    target_audience: str
    sentiment_score: float


# Workflow with validated outputs
analysis_workflow = (
    create_workflow()
    .step(
        "Analyze this product description: {description}",
        output_schema=ProductAnalysis,  # Ensures valid structure
        retry=2,  # Retry if validation fails
    )
    .step("Generate marketing tagline based on analysis")
)

print("Created workflow with schema validation")
# result = await analysis_workflow.run(description="...")
# result is guaranteed to be a valid ProductAnalysis object

Created workflow with schema validation


## 5. State Management

Maintain context and data throughout workflow execution:


In [11]:
# Workflow with persistent state
stateful_workflow = (
    create_workflow()
    .with_state({"total_cost": 0, "items_processed": 0, "errors": []})
    .step("List 3 items to process")
    .for_each("item")
    .do(
        create_workflow()
        .step("Calculate cost for {item} (return a number between 10 and 100)")
        .step(
            lambda ctx: {
                # Parse the cost from the response and update state
                "total_cost": ctx.state["total_cost"] + 50,  # Simplified for demo
                "items_processed": ctx.state["items_processed"] + 1,
            }
        )
        .build()  # IMPORTANT: Build the sub-workflow
    )
    .step(
        "Generate report: Processed {items_processed} items with total cost ${total_cost}"
    )
)

print("Stateful aggregation workflow created")
result = await stateful_workflow.run()
print(f"\nResult: {result}")

Stateful aggregation workflow created


KeyError: 'items_processed'

In [None]:
# Import json for error handling demo
import json
from pydantic import validator

# Example 1: Demonstrating retry with a task that might fail
# We'll ask for a very specific format that might not work on first try
retry_workflow = (
    create_workflow()
    .step(
        "Generate EXACTLY 5 random numbers between 1 and 100, formatted as a Python list like [1, 2, 3, 4, 5]",
        retry=3,  # Will retry up to 3 times if parsing fails
    )
    .step(lambda ctx: f"Sum of numbers: {sum(eval(ctx.current_result))}")
)

print("Created workflow with automatic retry")
result = await retry_workflow.run()
print(f"Result: {result}")

print("\n" + "=" * 50 + "\n")

# Example 2: Demonstrating error handling with intentional failure
# We'll create a workflow that tries to parse JSON and handles errors
error_demo_workflow = (
    create_workflow()
    .step("Generate a product name")
    .step(  # This step intentionally tries something that might fail
        lambda ctx: {
            "parsed": json.loads(
                ctx.current_result
            ),  # This will fail since result isn't JSON
            "status": "success",
        }
    )
    .on_error()
    .do(
        "Log: Previous step failed to parse JSON",
        "Fallback: Use the raw product name instead",
    )
    .finally_do("Continue with workflow")
)

print("Created workflow with error handling")
try:
    result = await error_demo_workflow.run()
    print(f"Result: {result}")
except Exception as e:
    print(f"Workflow handled the error and continued!")

print("\n" + "=" * 50 + "\n")


# Example 3: Schema validation with retry
# This demonstrates how validation errors trigger retries
class StrictFormat(BaseModel):
    items: List[str]  # Must be exactly 3 items
    total: int  # Must be the count of items

    @validator("items")
    def validate_items_count(cls, v):
        if len(v) != 3:
            raise ValueError("Must have exactly 3 items")
        return v

    @validator("total")
    def validate_total(cls, v, values):
        if "items" in values and v != len(values["items"]):
            raise ValueError("Total must match item count")
        return v


validation_retry_workflow = (
    create_workflow()
    .step(
        "Generate a response with exactly 3 items and their count",
        output_schema=StrictFormat,
        retry=2,  # Will retry if validation fails
    )
    .step("Format the items as a numbered list")
)

print("Created workflow with strict validation (may retry if format is wrong)")
result = await validation_retry_workflow.run()
print(f"Result after validation: {result}")

Created workflow with automatic retry
Result: Sum of numbers: 250


Created workflow with error handling
Result: Once the workflow is established, it is important to monitor and evaluate its effectiveness regularly. This can be done by collecting feedback from team members, analyzing key performance indicators, and making necessary adjustments to improve efficiency and productivity.

Additionally, communication is key in any workflow. Make sure to keep all team members informed about any changes or updates to the workflow, and encourage open communication to address any issues or concerns that may arise.

Lastly, don't be afraid to experiment with different tools or strategies to optimize the workflow. Continuous improvement is essential for ensuring that the workflow remains effective and efficient over time.


Created workflow with strict validation (may retry if format is wrong)


/var/folders/jc/w0z7zygn0nx8x5dw98tm2k1w0000gn/T/ipykernel_95335/64480649.py:59: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator("items")
/var/folders/jc/w0z7zygn0nx8x5dw98tm2k1w0000gn/T/ipykernel_95335/64480649.py:65: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator("total")


ValueError: Output validation failed - expected StrictFormat, got: I have 3 apples....

In [None]:
# More realistic error handling example
# This workflow demonstrates handling API-like errors
api_workflow = (
    create_workflow()
    .step(  # Simulate an API call that might fail
        lambda ctx: (
            {"success": False, "error": "API rate limit exceeded"}  # Simulate failure
            if "simulate_error" in ctx.variables
            else {"success": True, "data": "API response data"}
        )
    )
    .step(  # This step checks the response and might raise an error
        lambda ctx: (
            ctx.current_result["data"]
            if ctx.current_result["success"]
            else (_ for _ in ()).throw(Exception(ctx.current_result["error"]))
        )
    )
    .on_error()
    .retry(max_attempts=2, backoff_factor=2.0)
    .do("Log error: API call failed", "Use cached data as fallback")
    .finally_do("Clean up API connection")
)

print("Testing error handling with simulated API failure:")
try:
    # This will trigger the error path
    result = await api_workflow.run(simulate_error=True)
    print(f"Result with error handling: {result}")
except Exception as e:
    print(f"Even with retries, the workflow failed: {e}")

print("\nTesting same workflow without error:")
result = await api_workflow.run()  # No simulate_error flag
print(f"Result without error: {result}")

Testing error handling with simulated API failure:
Result with error handling: Here are some steps to clean up an API connection:

1. Check for any unused or redundant connections: Review all the API connections in your system and identify any connections that are no longer in use or are redundant. Remove these connections to declutter your system.

2. Update outdated connections: Check if any API connections are using outdated endpoints or authentication methods. Update these connections to ensure they are using the latest standards and protocols.

3. Consolidate similar connections: If you have multiple API connections that serve a similar purpose, consider consolidating them into a single connection. This can help simplify your system and reduce maintenance overhead.

4. Secure connections: Ensure that all API connections are secure by using appropriate authentication mechanisms, encryption, and other security measures. This will help protect your data and prevent unauthorized acces

In [None]:
# Workflow requiring approval
content_workflow = (
    create_workflow()
    .step("Generate blog post about: {topic}")
    .step("Create social media snippets")
    .step("Design email newsletter")
    .require_approval(
        notify=["content-team@company.com"],
        timeout="4 hours",
        message="Please review generated content before publishing",
    )
    .step("Publish approved content to all channels")
)

print("Created workflow with human approval gate")

Created workflow with human approval gate


## 9. Workflow Composition

Build complex workflows by combining simpler ones:


In [None]:
# Define expected output structure
class ProductAnalysis(BaseModel):
    name: str
    category: str
    price_range: str
    key_features: List[str]
    target_audience: str
    sentiment_score: float


# Workflow with validated outputs
analysis_workflow = (
    create_workflow()
    .step(
        "Analyze this product description: {description}",
        output_schema=ProductAnalysis,  # Ensures valid structure
        retry=2,  # Retry if validation fails
    )
    .step("Generate marketing tagline based on: {name} for {target_audience}")
)

print("Created workflow with schema validation")

# Run with a real product description
description = """
The EcoFlow Portable Power Station is a compact, lightweight battery pack
perfect for camping and outdoor adventures. Features 500Wh capacity,
solar charging capability, and multiple USB/AC outlets. Priced at $399.
"""

result = await analysis_workflow.run(description=description)
print(f"\nGenerated tagline: {result}")
print(
    "\nNote: The first step's output was automatically validated as a ProductAnalysis object!"
)

Created workflow with schema validation


ValueError: Output validation failed - expected ProductAnalysis, got: This product description highlights the key features of the EcoFlow Portable Power Station, positioning it as a convenient and essential item for outdoor activities such as camping. The mention of its...

In [None]:
# Create reusable sub-workflows
validation_workflow = (
    create_workflow()
    .step("Check data format")
    .step("Validate against business rules")
    .step("Ensure compliance requirements")
)

enrichment_workflow = (
    create_workflow()
    .step("Add metadata")
    .step("Enhance with external data")
    .step("Calculate derived fields")
)

# Compose into larger workflow
data_pipeline = (
    create_workflow()
    .step("Extract data from source: {source}")
    .use(validation_workflow)  # Embed validation
    .use(enrichment_workflow)  # Embed enrichment
    .step("Load into data warehouse")
    .step("Send completion notification")
)

print("Created composite data pipeline workflow")

Created composite data pipeline workflow


## 10. Real-World Example: Customer Support Automation

Let's build a complete customer support workflow that combines everything we've learned:


In [None]:
# Define response structure
class SupportResponse(BaseModel):
    category: str
    priority: str
    suggested_response: str
    escalation_needed: bool
    sentiment: str


# Build comprehensive support workflow
support_automation = (
    create_workflow()
    .with_state({"tickets_processed": 0, "escalations": [], "avg_response_time": 0})
    # Initial analysis
    .step(
        "Analyze customer message: {message}",
        name="analysis",
        output_schema=SupportResponse,
        retry=2,
    )
    # Route based on priority
    .if_(lambda r: r.priority == "urgent" or r.escalation_needed)
    .then(
        create_workflow()
        # Urgent path
        .parallel(
            "Search knowledge base for: {message}",
            "Find similar resolved tickets",
            "Check customer history",
        )
        .all()
        .step("Generate detailed response incorporating all findings")
        .require_approval(notify=["support-lead@company.com"], timeout="30 minutes")
        .step(lambda ctx: ctx.state["escalations"].append(ctx.analysis))
        .build()  # Build the sub-workflow
    )
    .else_(
        # Standard path
        create_workflow()
        .step("Generate automated response based on analysis")
        .step("Add personalization based on customer profile")
        .build()  # Build the sub-workflow
    )
    # Follow-up actions
    .step("Send response to customer")
    .step(lambda ctx: {"tickets_processed": ctx.state["tickets_processed"] + 1})
    # Conditional follow-up
    .if_(lambda r: "refund" in r.message.lower())
    .then("Create refund ticket in billing system")
    .elif_(lambda r: "bug" in r.message.lower())
    .then("Create issue in bug tracking system")
    .else_("Schedule follow-up in 24 hours")
    # Error handling
    .on_error()
    .do("Log error to monitoring system", "Route to human agent")
    .finally_do("Update metrics dashboard")
)

print("Created comprehensive customer support automation workflow")
print(f"Workflow has {len(support_automation.build().steps)} main steps")
print("Features: conditional routing, parallel search, approval gates, error handling")

Created comprehensive customer support automation workflow
Workflow has 7 main steps
Features: conditional routing, parallel search, approval gates, error handling


In [None]:
# Complete example: Smart content generator
content_generator = (
    create_workflow()
    .step("Generate a blog post title about: {topic}")
    .step("Write an engaging introduction paragraph")
    .if_(lambda r: len(r) < 100)
    .then("Expand the introduction with more detail")
    .else_("Continue with the current introduction")
    .parallel(
        "Write 3 main points",
        "Find 3 relevant statistics",
        "Generate a compelling conclusion",
    )
    .all()
    .step("Combine all sections into a complete blog post")
)

# Run it!
topic = "the future of remote work"
print(f"Generating blog post about: {topic}\n")

result = await content_generator.run(topic=topic)
print("📝 Generated Blog Post:")
print("=" * 50)
print(result)
print("=" * 50)

Generating blog post about: the future of remote work

📝 Generated Blog Post:
In today's fast-paced world, finding time to relax and unwind can be a challenge. However, taking the time to prioritize self-care is crucial for our mental and physical well-being. One way to achieve this is by creating a self-care routine that suits your lifestyle and needs.

Start by identifying activities that help you feel relaxed and rejuvenated. This could include activities such as reading a book, going for a walk in nature, practicing yoga or meditation, or indulging in a hobby you enjoy. The key is to find activities that help you feel calm and centered.

Incorporating self-care into your daily routine can be as simple as setting aside a few minutes each day to practice mindfulness or relaxation techniques. This could involve deep breathing exercises, taking a short walk outside, or simply taking a moment to sit quietly and reflect on your day.

It's important to remember that self-care is not selfi

## Try It Yourself!

Here's a complete example you can run right now:


## Best Practices

1. **Start Simple**: Begin with basic sequential workflows
2. **Name Your Steps**: Use descriptive names for better debugging
3. **Validate Important Data**: Use schemas for critical transformations
4. **Handle Errors Gracefully**: Add retries and fallbacks
5. **Use Parallel Execution**: For independent operations
6. **Compose and Reuse**: Build a library of workflow components

## Summary

AILib's workflow system provides powerful orchestration capabilities while maintaining simplicity:

- **Simple API**: Start with one-liners, add complexity as needed
- **Flexible Control Flow**: Conditions, loops, parallel execution
- **Robust Error Handling**: Retries, fallbacks, custom handlers
- **Type Safety**: Schema validation with Pydantic
- **Human Integration**: Approval gates and interventions
- **Composable**: Build complex workflows from simple parts

## Next Steps

- Explore the [Workflow API Reference](../../docs/workflows/api-reference.md)
- Check out [Advanced Examples](../workflow_advanced.py)
- Build your own custom workflows!
