A lightweight workflow orchestration system written in Rust, providing high-performance workflow execution with minimal dependencies. This is a Rust implementation of the Python ddeutil-workflow package, designed to make metadata-driven data workflows easy to define with YAML templates.
- YAML-based workflow configuration - Define workflows using intuitive YAML syntax
- High-performance execution - Built with Rust for speed and memory safety
- Async/await support - Full asynchronous execution with Tokio
- Job and stage orchestration - Hierarchical execution with jobs containing stages
- Cron scheduling - Built-in scheduler supporting cron expressions
- Parallel execution - Multi-threading support for concurrent job execution
- Matrix strategies - Parameterized workflows with cross-product execution
- Comprehensive error handling - Structured error types with detailed context
- Extensible stage types - Support for Bash, Python, Docker, and custom stages
- Audit and tracing - Complete execution tracking and logging
- Template engine - Handlebars-based parameter templating (see the sketch after this list)
- CLI interface - Command-line tool for workflow management
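As noted above, parameter templating is built on Handlebars. A minimal sketch of that style of templating using the handlebars crate directly; the workflow engine resolves its own `${{ params.* }}` syntax internally, so this only illustrates the underlying mechanism, not this crate's API:

use handlebars::Handlebars;
use serde_json::json;

fn main() -> Result<(), handlebars::RenderError> {
    let hb = Handlebars::new();
    // Render a template against a JSON parameter map, analogous to how
    // workflow parameters are substituted into stage definitions.
    let rendered = hb.render_template(
        "Extracting {{source}} into {{target}}",
        &json!({ "source": "input.csv", "target": "output.csv" }),
    )?;
    assert_eq!(rendered, "Extracting input.csv into output.csv");
    Ok(())
}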
To get started, add this to your Cargo.toml:
[dependencies]
ddeutil-workflow = "0.1"
# For async runtime
tokio = { version = "1.0", features = ["full"] }
Or install the CLI binary:
cargo install ddeutil-workflow
To enable optional features, declare them on the dependency:
[dependencies]
ddeutil-workflow = { version = "0.1", features = ["docker", "api"] }
Available features:
- docker - Docker container stage support
- api - REST API server functionality
- all - All optional features
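These flags gate code at compile time in the usual Cargo way. A small sketch of feature-gated code; the function below is hypothetical, shown only to illustrate how the docker feature could gate behavior:

// Compiled only when the `docker` feature is enabled.
#[cfg(feature = "docker")]
fn run_in_container(image: &str) {
    println!("would run inside container image: {image}");
}

fn main() {
    #[cfg(feature = "docker")]
    run_in_container("python:3.11");

    // Fallback path when the feature is disabled.
    #[cfg(not(feature = "docker"))]
    println!("docker feature disabled; falling back to local execution");
}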
use ddeutil_workflow::Workflow;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Load workflow from configuration
    let workflow = Workflow::from_config("my-workflow").await?;

    // Execute with parameters
    let mut params = HashMap::new();
    params.insert("source".to_string(), serde_json::json!("input.csv"));
    params.insert("target".to_string(), serde_json::json!("output.csv"));

    let result = workflow.execute(params, None, None, None, None).await?;

    if result.status.is_success() {
        println!("✅ Workflow completed successfully");
        println!("Run ID: {}", result.run_id);
    } else {
        println!("❌ Workflow failed: {:?}", result.get_errors());
    }

    Ok(())
}
# Execute a workflow
workflow-cli run data-pipeline --params '{"source": "input.csv", "target": "output.csv"}'
# List available workflows
workflow-cli list
# Validate workflow configuration
workflow-cli validate data-pipeline
# Show workflow information
workflow-cli info data-pipeline
# Start scheduler daemon
workflow-cli schedule --workflows data-pipeline,backup-job --interval 60
Workflows are defined using YAML configuration files:
# workflow.yaml
type: Workflow
name: data-processing
desc: "Process daily data files"

on:
  - cronjob: '0 */6 * * *'
    timezone: "UTC"

params:
  source:
    type: str
    description: "Source file path"
  target:
    type: str
    description: "Target file path"
  batch_size:
    type: int
    default: 1000

jobs:
  extract-data:
    runs-on:
      type: local
    stages:
      - name: "Extract Data"
        echo: "Starting data extraction from ${{ params.source }}"
      - name: "Process Data"
        uses: "tasks/process-data@v1"
        with:
          input: ${{ params.source }}
          output: ${{ params.target }}
          batch_size: ${{ params.batch_size }}

  validate-output:
    needs: [extract-data]
    runs-on:
      type: local
    stages:
      - name: "Validate Output"
        bash: |
          if [ -f "${{ params.target }}" ]; then
            echo "✅ Output file created successfully"
            wc -l "${{ params.target }}"
          else
            echo "❌ Output file not found"
            exit 1
          fi
The workflow system is built around several core concepts:
- Workflow: Top-level orchestration container
- Job: Execution unit containing multiple stages
- Stage: Individual task execution (Bash, Python, Docker, etc.)
- Result: Execution status and output management
- Event: Scheduling and trigger management
- Audit: Execution tracking and logging
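Conceptually, these nest: a workflow owns named jobs, and each job owns an ordered list of stages. A minimal sketch of that hierarchy; the type and field names are illustrative, not the crate's actual API:

use std::collections::HashMap;

struct Stage {
    name: String,
    command: String, // e.g. a bash snippet or an echo message
}

struct Job {
    needs: Vec<String>, // names of jobs that must finish first
    stages: Vec<Stage>, // executed sequentially within the job
}

struct Workflow {
    name: String,
    jobs: HashMap<String, Job>, // jobs may run in parallel once deps clear
}

fn main() {
    let wf = Workflow {
        name: "data-processing".into(),
        jobs: HashMap::from([(
            "extract-data".into(),
            Job {
                needs: vec![],
                stages: vec![Stage {
                    name: "Extract Data".into(),
                    command: "echo start".into(),
                }],
            },
        )]),
    };
    println!("workflow {} has {} job(s)", wf.name, wf.jobs.len());
}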
Echo stage:
- name: "Log Message"
  echo: "Processing started"
  sleep: 2
Bash stage:
- name: "Run Script"
  bash: |
    echo "Hello $NAME"
    ls -la
  env:
    NAME: "World"
Python stage:
- name: "Python Processing"
  run: |
    import json
    data = {"processed": True, "count": len(items)}
    print(json.dumps(data))
  vars:
    items: [1, 2, 3, 4, 5]
Call stage (custom task):
- name: "Custom Task"
  uses: "tasks/process-data@v1"
  with:
    input_file: ${{ params.source }}
    output_file: ${{ params.target }}
Docker stage:
- name: "Docker Processing"
  image: "python:3.11"
  env:
    PYTHONPATH: "/app"
  volume:
    "./data": "/app/data"
  bash: "python /app/process.py"
Execute jobs with multiple parameter combinations:
jobs:
  test-matrix:
    strategy:
      matrix:
        python_version: ["3.9", "3.10", "3.11"]
        os: ["ubuntu", "windows"]
      max_parallel: 2
      fail_fast: true
    stages:
      - name: "Test on ${{ matrix.python_version }} - ${{ matrix.os }}"
        echo: "Testing Python ${{ matrix.python_version }} on ${{ matrix.os }}"
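The matrix above expands to the cross product of its axes: 3 Python versions × 2 operating systems = 6 combinations, run at most 2 at a time with fail-fast cancellation. A standalone sketch of the expansion logic, not the crate's internals:

fn main() {
    let python_versions = ["3.9", "3.10", "3.11"];
    let oses = ["ubuntu", "windows"];

    // Cross product: every python_version paired with every os,
    // yielding 3 x 2 = 6 matrix combinations.
    let combinations: Vec<(&str, &str)> = python_versions
        .iter()
        .flat_map(|v| oses.iter().map(move |os| (*v, *os)))
        .collect();

    assert_eq!(combinations.len(), 6);
    for (version, os) in &combinations {
        println!("Testing Python {version} on {os}");
    }
}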
Environment variables for configuration:
# Core settings
WORKFLOW_CORE_CONFIG_PATH="/path/to/configs"
WORKFLOW_CORE_TIMEZONE="UTC"
WORKFLOW_CORE_MAX_JOB_PARALLEL="2"
WORKFLOW_CORE_DEFAULT_TIMEOUT="3600"
# Logging
WORKFLOW_TRACE_LEVEL="INFO"
WORKFLOW_TRACE_PATH="/var/log/workflow"
# Registry
WORKFLOW_REGISTRY_CALLER="my_tasks"
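A typical pattern for consuming these settings is to read each variable with a fallback default. A minimal sketch; the struct and default values are illustrative assumptions, not the crate's actual configuration loader:

use std::env;

struct CoreConfig {
    config_path: String,
    timezone: String,
    max_job_parallel: usize,
}

fn load_config() -> CoreConfig {
    CoreConfig {
        // Each setting falls back to a sensible default when unset.
        config_path: env::var("WORKFLOW_CORE_CONFIG_PATH")
            .unwrap_or_else(|_| "./configs".to_string()),
        timezone: env::var("WORKFLOW_CORE_TIMEZONE")
            .unwrap_or_else(|_| "UTC".to_string()),
        max_job_parallel: env::var("WORKFLOW_CORE_MAX_JOB_PARALLEL")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(2),
    }
}

fn main() {
    let cfg = load_config();
    println!(
        "configs: {}, tz: {}, parallel: {}",
        cfg.config_path, cfg.timezone, cfg.max_job_parallel
    );
}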
Run the test suite:
# Run all tests
cargo test
# Run with output
cargo test -- --nocapture
# Run specific test
cargo test test_workflow_execution
# Run integration tests
cargo test --test integration
The Rust implementation provides significant performance improvements over the Python version:
- Memory Usage: ~50% reduction in memory footprint
- Execution Speed: ~3-5x faster workflow execution
- Startup Time: ~10x faster cold start
- Concurrency: Better handling of parallel job execution
Beyond raw performance, the implementation also hardens the runtime:
- Memory Safety: Rust's ownership system prevents memory-related vulnerabilities
- Type Safety: Compile-time type checking prevents runtime errors
- Secure Defaults: Safe configuration defaults and input validation
- Audit Logging: Comprehensive execution tracking for compliance
Contributions are welcome! Please see our Contributing Guide for details.
# Clone the repository
git clone https://github.com/ddeutils/ddeutil-workflow
cd ddeutil-workflow/.rust
# Install Rust (if not already installed)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Build the project
cargo build
# Run tests
cargo test
# Run an example workflow with the CLI
cargo run --bin workflow-cli -- run example-workflow
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by the Python ddeutil-workflow package
- Built with the amazing Rust ecosystem including Tokio, Serde, and Clap
- Thanks to the workflow orchestration community for inspiration and feedback
- Python ddeutil-workflow - Original Python implementation
- Tokio - Asynchronous runtime for Rust
- Serde - Serialization framework for Rust