In [None]:
# Workflow Orchestration Patterns

This notebook demonstrates advanced workflow orchestration patterns including:
- Parallel execution with concurrency control
- Sub-workflows and workflow composition
- Queue management and rate limiting
- Scheduling and cron-based execution
- Lifecycle handlers and notifications


In [None]:
import os
from dotenv import load_dotenv
from kubiya_workflow_sdk import workflow, step, execute_workflow_logged
from kubiya_workflow_sdk.execution import LogLevel
from kubiya_workflow_sdk.dsl import retry_policy, when, continue_on

# Load variables from a local .env file (if one exists) into the process
# environment, so the API key never has to be hardcoded in the notebook.
load_dotenv()

# Fail fast: the execution cell further down passes this key to
# execute_workflow_logged, so there is no point continuing without it.
api_key = os.getenv("KUBIYA_API_KEY")
if not api_key:
    raise ValueError("KUBIYA_API_KEY is required")

print("✅ SDK loaded for orchestration examples")
# Nothing has contacted the API at this point — only the credential has been
# read — so report exactly that rather than claiming a live connection.
print("🔑 Kubiya API key loaded")


In [None]:
# Example 1: Parallel Execution with Concurrency Control
# Two separate limits are configured below: max_active_steps limits how many
# workflow steps run concurrently, and max_concurrent limits how many files the
# parallel step processes at a time. The per-file limit lives in one constant
# so the summary printed at the end always matches the configured value.
MAX_CONCURRENT_FILES = 2

parallel_workflow = (
    workflow("parallel-processing")
    .description("Process multiple files in parallel")
    .max_active_steps(3)  # Limit concurrent steps
)

# Add parallel steps: one execution of `command` per entry in `files`.
# NOTE(review): `${ITEM}` sits inside single quotes, so a plain shell would not
# expand it; this presumably relies on the workflow engine substituting the
# current item before the shell runs — confirm against the SDK docs.
files = ["data1.csv", "data2.csv", "data3.csv", "data4.csv", "data5.csv"]
parallel_workflow.parallel_steps(
    name="process-files",
    items=files,
    command="echo 'Processing ${ITEM}' && sleep 2",
    max_concurrent=MAX_CONCURRENT_FILES,  # Process at most this many files at a time
)

# Add aggregation step after parallel processing
parallel_workflow.step(
    "aggregate-results",
    "echo 'Aggregating results from all files'"
)

print("📋 Parallel workflow created with concurrency control")
print(f"   Files to process: {len(files)}")
print(f"   Max concurrent: {MAX_CONCURRENT_FILES}")


In [None]:
# Example 2: Sub-workflow Composition
# Build a three-stage ETL workflow: a plain command step, a step that delegates
# to a separately defined workflow, and a final command step. Steps are added
# in the order extract -> transform -> load.
main_workflow = (
    workflow("etl-pipeline")
    .description("Main ETL pipeline with sub-workflows")
    # Intended default for `date`: the DATE env var if set, otherwise today's
    # date. NOTE(review): this relies on the runner applying shell-style
    # `${VAR:-default}` and `$(...)` expansion to param defaults — confirm.
    .params(date="${DATE:-$(date +%Y-%m-%d)}")
)

# Step 1: Extract data (could be a sub-workflow)
# NOTE(review): `${date}` is inside single quotes; this presumably depends on
# the engine substituting params before the shell runs — confirm.
main_workflow.step(
    "extract-data",
    "echo 'Extracting data for date: ${date}'"
)

# Step 2: Transform data with sub-workflow
# "data-transformation" is referenced by name only; it is not defined in the
# visible notebook and presumably must already exist where this runs.
main_workflow.sub_workflow(
    name="transform-data",
    workflow="data-transformation",  # Reference to another workflow
    # Params are passed as a JSON-encoded string, not a dict.
    # TODO(review): check whether the SDK also accepts a dict here.
    params='{"input_date": "${date}", "format": "parquet"}'
)

# Step 3: Load results
main_workflow.step(
    "load-results",
    "echo 'Loading transformed data to warehouse'"
)

print("📋 Main workflow with sub-workflow composition created")


In [None]:
# Execute the parallel workflow with real API
# execute_workflow_logged is iterated to drive the run; its logger prints each
# event, so the loop body itself has nothing to do.
print("\n🚀 Executing parallel workflow...")
print("=" * 60)

try:
    for _event in execute_workflow_logged(
        workflow_definition=parallel_workflow.to_dict(),
        api_key=api_key,
        log_level=LogLevel.NORMAL,
    ):
        pass  # The logger handles output

    print("\n✅ Parallel workflow execution complete!")

except Exception as e:
    # Deliberately best-effort: report the failure and keep going so a
    # misconfigured runner does not abort the rest of the notebook. Include the
    # exception type, because str(e) is empty for exceptions raised without a
    # message and would otherwise print a bare "Error:".
    print(f"\n❌ Error: {type(e).__name__}: {e}")
    print("\nNote: Parallel execution requires proper runner configuration")
