Lightweight Workflow Library for Learning and Experimentation
TinyWorkflow is a simple, Python-first workflow library designed for learning workflow concepts, prototyping, and lightweight task orchestration. Perfect for AI experimentation, small projects, and understanding workflow patterns before moving to production systems.
⚠️ Important: TinyWorkflow is designed for learning and lightweight use cases. For production-grade durable workflows with full fault tolerance, use Temporal, Azure Durable Functions, or DBOS.
Perfect for learning and lightweight workflows:
- 🎯 Pure Python - Simple decorator-based API, no DSL to learn
- 💾 State Persistence - SQLite, PostgreSQL, or MySQL (basic state tracking)
- 🔄 Retry Logic - Exponential backoff with jitter for failed activities
- ⚡ Async/Await - Modern Python async for high performance
- 🔀 Parallel Execution - Run activities concurrently (fan-out/fan-in)
- 👥 Human-in-the-Loop - Basic approval workflows
- 📅 Scheduling - Cron expressions and delayed execution
- 📊 Event Sourcing - Audit trail for observability
- 🖥️ Web UI - Simple workflow monitoring interface
- 🛠️ CLI Tool - Command-line interface for operations
- 🚀 Zero Setup - No external services required (SQLite default)
- 📚 Easy to Learn - Small codebase (~2000 LOC), great for education
```bash
# Install from PyPI (once published)
# Includes support for SQLite, PostgreSQL, and MySQL
pip install tinyworkflow

# Or install from source
git clone https://github.com/scionoftech/tinyworkflow
cd tinyworkflow
pip install -e .
```

```python
import asyncio
from tinyworkflow import workflow, activity, WorkflowContext, TinyWorkflowClient
# Define activities
@activity(name="fetch_data")
async def fetch_data(url: str):
    # Your code here
    return {"data": "..."}

@activity(name="process_data")
async def process_data(data: dict):
    # Your code here
    return {"result": "..."}

# Define workflow
@workflow(name="etl_pipeline")
async def etl_workflow(ctx: WorkflowContext):
    url = ctx.get_input("url")

    # Execute activities
    data = await ctx.execute_activity(fetch_data, url)
    result = await ctx.execute_activity(process_data, data)

    return result

# Run workflow
async def main():
    async with TinyWorkflowClient() as client:
        run_id = await client.start_workflow(
            "etl_pipeline",
            input_data={"url": "https://api.example.com"}
        )
        print(f"Workflow started: {run_id}")

asyncio.run(main())
```

```bash
# IMPORTANT: Run from project root directory
cd /path/to/tinyworkflow
# Start server with example workflows
tinyworkflow server --import-workflows examples.workflows
# Open browser to http://localhost:8080
# You'll see all 20 example workflows ready to run!
```

Common Error: If you see "No module named 'examples'", make sure you're running from the project root directory (the directory containing the examples/ folder).
Activities are reusable tasks that perform a single unit of work. They support automatic retries and timeouts.
```python
from tinyworkflow import activity, RetryPolicy

@activity(
    name="fetch_user",
    retry_policy=RetryPolicy(max_retries=5, initial_delay=1.0),
    timeout=30.0
)
async def fetch_user(user_id: str):
    # Activity code
    return {"id": user_id, "name": "John"}
```

Workflows orchestrate multiple activities and define the business logic. Workflow state is persisted automatically, and failed workflows can be retried from the start (see the limitations section below).
```python
from tinyworkflow import workflow, WorkflowContext, RetryPolicy

@workflow(
    name="user_onboarding",
    retry_policy=RetryPolicy(max_retries=3)
)
async def user_onboarding_workflow(ctx: WorkflowContext):
    user_id = ctx.get_input("user_id")

    # Sequential execution
    user = await ctx.execute_activity(fetch_user, user_id)
    await ctx.execute_activity(send_welcome_email, user)

    return {"status": "completed"}
```

Execute multiple activities concurrently for better performance:
@workflow(name="parallel_example")
async def parallel_workflow(ctx: WorkflowContext):
user_id = ctx.get_input("user_id")
# Run activities in parallel
user, orders, preferences = await ctx.execute_parallel(
(fetch_user, (user_id,), {}),
(fetch_orders, (user_id,), {}),
(fetch_preferences, (user_id,), {})
)
return {"user": user, "orders": orders, "preferences": preferences}Pause workflows for manual approval:
@workflow(name="expense_approval")
async def expense_workflow(ctx: WorkflowContext):
amount = ctx.get_input("amount")
if amount > 1000:
# Wait for manager approval
approved = await ctx.wait_for_approval("manager_approval", timeout=3600)
if not approved:
return {"status": "rejected"}
# Process payment
result = await ctx.execute_activity(process_payment, amount)
return resultTinyWorkflow includes a powerful CLI for workflow management:
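The pending approval is granted or rejected from outside the workflow, for example with the CLI commands covered in the next section:

```bash
# See what's waiting, then approve the run
tinyworkflow approvals
tinyworkflow approve <run_id> --approve
```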
TinyWorkflow includes a powerful CLI for workflow management:

```bash
# Start the web UI server (with workflow imports)
tinyworkflow server --import-workflows examples.workflows --port 8080

# Start a background worker (with workflow imports)
tinyworkflow worker --import-workflows examples.workflows

# Start a workflow
tinyworkflow start my_workflow --input '{"key": "value"}'

# Check workflow status
tinyworkflow status <run_id>

# List all workflows
tinyworkflow list --status running

# View workflow events (audit trail)
tinyworkflow events <run_id>

# Schedule a workflow (cron)
tinyworkflow schedule my_workflow "0 9 * * *"

# List pending approvals
tinyworkflow approvals

# Approve a workflow
tinyworkflow approve <run_id> --approve

# List registered workflows
tinyworkflow workflows

# Cancel a workflow
tinyworkflow cancel <run_id>
```

Start the web interface to manage workflows visually:
```bash
# IMPORTANT: Run from project root directory
cd /path/to/tinyworkflow

# Start server with workflow imports
tinyworkflow server --import-workflows examples.workflows --port 8080
```

Then open http://localhost:8080 in your browser. Features include:
- 📊 Dashboard with workflow statistics
- ▶️ Start new workflows with custom input
- 📋 List and filter workflow executions
- 🔍 View detailed workflow status and events
- ⏰ Schedule workflows with cron expressions
- ✅ Approve/reject pending workflows
- 📚 Browse registered workflows and activities
- Must use `--import-workflows` to make workflows available
- Must run from the project root directory
- See Workflow Registration for troubleshooting
Cron scheduling:

```python
async with TinyWorkflowClient() as client:
    # Run daily at 9am
    await client.schedule_workflow("daily_report", "0 9 * * *")

    # Run every 5 minutes
    await client.schedule_workflow("health_check", "*/5 * * * *")
```

Delayed execution:

```python
async with TinyWorkflowClient() as client:
    # Run after 5 minutes
    await client.schedule_delayed_workflow(
        "cleanup_job",
        delay_seconds=300,
        input_data={"resource_id": "abc123"}
    )
```

Perfect for multi-step AI pipelines with automatic retries and state management:
@workflow(name="ai_content_pipeline")
async def ai_content_pipeline(ctx: WorkflowContext):
prompt = ctx.get_input("prompt")
# Generate content with retry logic
content = await ctx.execute_activity(generate_ai_content, prompt)
# Parallel analysis: sentiment, moderation, keywords
sentiment, moderation, keywords = await ctx.execute_parallel(
(analyze_sentiment, (content,), {}),
(moderate_content, (content,), {}),
(extract_keywords, (content,), {})
)
# Check moderation
if moderation["flagged"]:
return {"status": "rejected", "reason": "content_moderation"}
# Translate to multiple languages
translations = await ctx.execute_parallel(
(translate, (content, "es"), {}),
(translate, (content, "fr"), {}),
(translate, (content, "de"), {})
)
# Save results with full audit trail
await ctx.execute_activity(save_results, {
"content": content,
"sentiment": sentiment,
"translations": translations
})
return {"status": "completed", "content": content}Real-world AI use cases:
- Content generation and moderation pipelines
- Document processing and extraction
- Sentiment analysis workflows
- Multi-language translation pipelines
- Image/video processing workflows
- ML model inference pipelines
- Data labeling and annotation workflows
ETL and data pipelines:
@workflow(name="etl")
async def etl_workflow(ctx: WorkflowContext):
# Extract
data = await ctx.execute_activity(extract_from_source)
# Transform
transformed = await ctx.execute_activity(transform_data, data)
# Load
await ctx.execute_activity(load_to_destination, transformed)
return {"status": "success"}Business processes requiring human approval:
@workflow(name="purchase_order")
async def purchase_order_workflow(ctx: WorkflowContext):
order = await ctx.execute_activity(create_order, ctx.get_input("items"))
# Require approval for large orders
if order["total"] > 10000:
approved = await ctx.wait_for_approval("purchase_approval")
if not approved:
return {"status": "rejected"}
await ctx.execute_activity(process_order, order)
return {"status": "completed", "order_id": order["id"]}TinyWorkflow is designed as a simple workflow library with these components:
- State Manager - SQLAlchemy-based persistence (SQLite/PostgreSQL/MySQL)
- Workflow Engine - Executes workflows with state tracking
- Activity Executor - Runs activities with retry logic
- Scheduler - Cron and delayed jobs (APScheduler)
- Worker - Background processor for async execution
- Client API - Python API for workflow management
- CLI - Command-line interface (Click)
- Web UI - FastAPI-based web interface
What TinyWorkflow does NOT provide (by design):
- No Workflow Replay - Failed workflows retry from scratch, not from the failure point
- No Deterministic Execution - You can use `datetime.now()`, `uuid.uuid4()`, and `random()` freely
- No Durable Timers - Using `asyncio.sleep()` loses timer state on a crash (a workaround is sketched after this list)
- No Signal System - Cannot send external events to running workflows
- No Saga/Compensation - No automatic rollback on failures
- No Workflow Versioning - Changing code may break in-flight workflows

These limitations are intentional - implementing them would significantly increase complexity. For workflows requiring these features, use production systems like Temporal or DBOS.
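For long waits, one workaround within these constraints is to end the current workflow and schedule a follow-up run with `schedule_delayed_workflow` (shown in the Scheduling section) instead of sleeping in-process. A minimal sketch, where `send_email` is a hypothetical activity; note that whether the schedule itself survives a process restart depends on the scheduler's persistence:

```python
# Risky here: a crash during the sleep loses the timer entirely.
#   await asyncio.sleep(86400)

# Sturdier: finish this workflow, then run the follow-up as its own workflow.
@workflow(name="send_reminder")
async def send_reminder(ctx: WorkflowContext):
    await ctx.execute_activity(send_email, ctx.get_input("user_id"))
    return {"status": "sent"}

async with TinyWorkflowClient() as client:
    # Kick off the reminder 24 hours from now instead of sleeping in-process
    await client.schedule_delayed_workflow(
        "send_reminder",
        delay_seconds=86400,
        input_data={"user_id": "u42"},
    )
```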
What TinyWorkflow DOES provide:
- ✅ State persistence (workflows/activities stored in database)
- ✅ Retry policies (exponential backoff with jitter)
- ✅ Parallel execution (fan-out/fan-in patterns)
- ✅ Event sourcing (audit trail)
- ✅ Human-in-the-loop (basic approval workflows)
- ✅ Scheduling (cron expressions)
- ✅ Web UI (workflow monitoring)
- ✅ Multi-database support (SQLite/PostgreSQL/MySQL)
TinyWorkflow supports SQLite (default), PostgreSQL, and MySQL for state persistence.
No additional setup required. Perfect for development and small deployments:
```python
from tinyworkflow import TinyWorkflowClient

# Use default SQLite database (tinyworkflow.db in current directory)
async with TinyWorkflowClient() as client:
    pass

# Or specify custom SQLite path
async with TinyWorkflowClient(
    database_url="sqlite+aiosqlite:///path/to/custom.db"
) as client:
    pass
```

Configure PostgreSQL connection (driver included by default):
```python
from tinyworkflow import TinyWorkflowClient

# Connect to PostgreSQL
async with TinyWorkflowClient(
    database_url="postgresql+asyncpg://user:password@localhost:5432/tinyworkflow"
) as client:
    pass
```

Set up the PostgreSQL database:

```bash
# Create database
createdb tinyworkflow
# Or using psql
psql -c "CREATE DATABASE tinyworkflow;"
```

Configure MySQL connection (driver included by default):
```python
from tinyworkflow import TinyWorkflowClient

# Connect to MySQL
async with TinyWorkflowClient(
    database_url="mysql+asyncmy://user:password@localhost:3306/tinyworkflow"
) as client:
    pass

# With charset specification
async with TinyWorkflowClient(
    database_url="mysql+asyncmy://user:password@localhost:3306/tinyworkflow?charset=utf8mb4"
) as client:
    pass
```

Set up the MySQL database:

```bash
# Create database
mysql -u root -p -e "CREATE DATABASE tinyworkflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
```

You can specify the database URL when using the CLI:
```bash
# PostgreSQL
tinyworkflow --db "postgresql+asyncpg://user:pass@localhost/tinyworkflow" server

# MySQL
tinyworkflow --db "mysql+asyncmy://user:pass@localhost/tinyworkflow" worker

# Custom SQLite path
tinyworkflow --db "sqlite+aiosqlite:///custom/path/db.sqlite" server
```

Set the database URL via an environment variable:
```bash
export TINYWORKFLOW_DATABASE_URL="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
tinyworkflow server
```

PostgreSQL and MySQL use connection pooling by default:
- Pool Size: 10 connections
- Max Overflow: 20 additional connections
- Pool Pre-Ping: Enabled (verifies connections before use)
- Pool Recycle: 3600 seconds (1 hour)
These settings are optimized for most use cases and applied automatically.
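For reference, these defaults correspond roughly to the following SQLAlchemy engine options. This is a sketch of what is applied for you automatically, not configuration you need to write:

```python
from sqlalchemy.ext.asyncio import create_async_engine

# Roughly equivalent to TinyWorkflow's built-in pooling defaults
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/tinyworkflow",
    pool_size=10,        # base pool of 10 connections
    max_overflow=20,     # up to 20 extra connections under load
    pool_pre_ping=True,  # verify each connection before use
    pool_recycle=3600,   # recycle connections after 1 hour
)
```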
Customize retry behavior:
```python
from tinyworkflow import RetryPolicy

retry_policy = RetryPolicy(
    max_retries=5,
    initial_delay=1.0,       # seconds
    max_delay=60.0,          # seconds
    backoff_multiplier=2.0,  # exponential backoff
    jitter=True,             # add randomness
    jitter_factor=0.1        # 10% jitter
)

@activity(name="flaky_task", retry_policy=retry_policy)
async def flaky_task():
    # May fail and will be retried
    pass
```
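Under this policy, the delay before retry *n* is `initial_delay * backoff_multiplier ** n`, capped at `max_delay`, with up to 10% jitter applied. A plain-Python illustration of the resulting schedule (an approximation for intuition, not TinyWorkflow's internals; the library's exact jitter formula may differ):

```python
import random

def backoff_schedule(max_retries=5, initial_delay=1.0, max_delay=60.0,
                     backoff_multiplier=2.0, jitter_factor=0.1):
    """Yield the approximate wait (seconds) before each retry."""
    for attempt in range(max_retries):
        delay = min(initial_delay * backoff_multiplier ** attempt, max_delay)
        jitter = delay * jitter_factor
        yield delay + random.uniform(-jitter, jitter)

print([round(d, 2) for d in backoff_schedule()])
# roughly 1, 2, 4, 8, 16 seconds, each nudged by up to 10%
```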
The background worker can start automatically with the client, or be configured manually:

```python
client = TinyWorkflowClient(auto_start_worker=True)

# Or manually configure (state_manager and engine constructed beforehand)
worker = WorkflowWorker(
    state_manager=state_manager,
    workflow_engine=engine,
    poll_interval=1.0,
    max_concurrent_workflows=10
)
```

Every state change is recorded:
```python
events = await client.get_workflow_events(run_id)
for event in events:
    print(f"{event.timestamp}: {event.event_type}")
```

Check a run's current status:

```python
workflow = await client.get_workflow_status(run_id)
print(f"Status: {workflow.status}")
print(f"Created: {workflow.created_at}")
print(f"Retries: {workflow.retry_count}/{workflow.max_retries}")Important: Workflows must be explicitly imported to be available in the CLI and web UI.
Important: Workflows must be explicitly imported to be available in the CLI and web UI.

```bash
# IMPORTANT: Run from project root directory
cd /path/to/tinyworkflow
# Start server with example workflows
tinyworkflow server --import-workflows examples.workflows
# Start worker with your project workflows
tinyworkflow worker --import-workflows myproject.workflows
```

Troubleshooting: If you get a "No module named 'examples'" error:
- Verify you're in the project root directory: `pwd` or `cd`
- Check that `examples/__init__.py` exists
- Try: `python -c "import examples.workflows"` to test imports
```python
# myproject/workflows.py
"""Workflow registry - imports all workflow modules"""
from myproject.orders import order_workflow
from myproject.payments import payment_workflow
from myproject.notifications import notification_workflow
```

Then start the server:

```bash
tinyworkflow server --import-workflows myproject.workflows
```

Workflows are registered when their Python modules are imported via the @workflow decorator. Without explicit imports:
- ❌ Web UI shows "No workflows registered"
- ❌ Cannot start or schedule workflows
- ❌ Registry appears empty
📖 See WORKFLOW_REGISTRATION.md for a detailed guide
Run the test suite:
```bash
pytest tests/ -v
```

Check the examples/ directory for complete examples:
- `simple_workflow.py` - Basic ETL workflow
- `parallel_workflow.py` - Parallel activity execution
- `approval_workflow.py` - Human-in-the-loop approval
- `retry_workflow.py` - Retry policies and failure handling
- `scheduling_workflow.py` - Cron scheduling and delayed execution
- `ai_content_pipeline.py` - AI content generation with sentiment analysis and moderation
- `ai_document_processor.py` - AI document processing with parallel analysis
- `database_configuration.py` - Multi-database configuration examples
Run the core examples to understand TinyWorkflow's features:
```bash
# Retry Policies - Handle failures with automatic retries
python examples/retry_workflow.py

# Scheduling - Cron expressions and delayed execution
python examples/scheduling_workflow.py

# Approval Workflows - Human-in-the-loop patterns
python examples/approval_workflow.py
```

Run the AI examples to see TinyWorkflow in action with AI workloads:
```bash
# AI Content Pipeline - Generate, analyze, and moderate content
python examples/ai_content_pipeline.py

# AI Document Processor - Extract, classify, and analyze documents
python examples/ai_document_processor.py
```

Core features demonstrated:
- ✅ Retry policies with exponential backoff
- ✅ Activity-level and workflow-level retries
- ✅ Cron-based scheduling (daily, weekly, monthly jobs)
- ✅ Delayed workflow execution
- ✅ Human-in-the-loop approval workflows
- ✅ Parallel execution patterns
AI/ML features demonstrated:
- ✅ AI/ML task orchestration
- ✅ Content generation and moderation pipelines
- ✅ Document processing with parallel analysis
- ✅ Sentiment analysis workflows
- ✅ State persistence and recovery
- ✅ Event sourcing for audit trails
- ✅ Batch processing of multiple documents
Perfect for:
- 🎓 Learning workflow orchestration concepts
- 🧪 Prototyping and experimenting with workflow patterns
- 📚 Educational projects and tutorials
- 🚀 Quick demos and POCs
- 📊 Simple data pipelines (< 1 hour execution)
- 🤖 AI experimentation with LLM chains
- 🛠️ Small internal tools and automation scripts
- 📅 Lightweight scheduled jobs
Key advantages:
- Zero infrastructure setup (SQLite by default)
- Simple decorator-based API
- Easy to understand codebase (~2000 LOC)
- Great for learning before Temporal
Use production systems instead for:
- ❌ Critical business processes requiring guaranteed execution
- ❌ Long-running workflows (hours/days) with crash recovery
- ❌ High-scale production workloads (1000s of workflows/sec)
- ❌ Distributed transactions requiring saga patterns
- ❌ Complex compensations and rollback logic
- ❌ Mission-critical systems where downtime costs money
For production, use:
- Temporal - Full-featured durable execution
- Azure Durable Functions - Serverless workflows
- DBOS - Database-backed workflows
- Prefect - Data engineering workflows
- Airflow - Batch data pipelines
| Feature | TinyWorkflow | Temporal | Azure Durable | DBOS |
|---|---|---|---|---|
| Setup Complexity | ⭐ Very Simple | ⭐⭐⭐ Complex | ⭐⭐ Moderate | ⭐⭐ Moderate |
| Target Use Case | Learning/Small | Production | Production | Production |
| Workflow Replay | ❌ | ✅ | ✅ | ✅ |
| Deterministic Execution | ❌ | ✅ | ✅ | ✅ |
| Fault Tolerance | ⚠️ Basic | ✅ Full | ✅ Full | ✅ Full |
| Durable Timers | ❌ | ✅ | ✅ | ✅ |
| Signals/Events | ❌ | ✅ | ✅ | ✅ |
| State Persistence | ✅ SQLite/Postgres/MySQL | ✅ | ✅ | ✅ |
| Retry Policies | ✅ | ✅ | ✅ | ✅ |
| Parallel Execution | ✅ | ✅ | ✅ | ✅ |
| Learning Curve | Low | High | Medium | Medium |
| Best For | Learning & Prototypes | Production Scale | Azure Ecosystem | DB-Centric Apps |
Contributions are welcome! Please feel free to submit a Pull Request.
MIT License - see LICENSE file for details.
Inspired by:
- Temporal - Durable execution primitives
- Prefect - Modern workflow orchestration
- DBOS - Durable execution with databases
- Quick Start Guide - Get started in 5 minutes
- Workflow Registration - How to register workflows
- Limitations - What TinyWorkflow does and doesn't provide
- GitHub Issues: Report bugs
- Discussions: Ask questions