A modern, async task processing framework written in Rust for building scalable workflow systems with support for multiple task types, dependency management, and distributed execution.
- Multiple Task Types: HTTP requests, shell commands, and custom task handlers
- Dependency Management: Tasks can depend on the completion of other tasks
- Async Execution: Built on Tokio for high-performance async execution
- Retry Mechanisms: Configurable retry policies with exponential backoff
- Real-time Monitoring: Live task metrics and status tracking
- Extensible Storage: Pluggable storage backends (in-memory, Redis, etc.)
- Worker System: Distributed execution with multiple worker nodes
- Comprehensive Logging: Structured logging with execution traces
- TaskFlow: Main framework orchestrator
- Scheduler: Manages task scheduling and dependency resolution
- Executor: Handles task execution with worker coordination
- Storage: Abstract storage layer for task persistence
- Task Handlers: Extensible system for different task types
- In-Memory: For development and testing
- Redis: For distributed deployments (planned)
- PostgreSQL: For persistent storage (planned)
Add to your Cargo.toml
:
cargo add taskflow-rs
Check the examples/
directory for complete working examples:
basic_usage.rs
: Basic task submission and executionyaml_config_usage.rs
: Using YAML configuration for taskssimple_execution.rs
: Basic task submission and executioncustom_handler.rs
: Custom task handler example
impl TaskFlow {
/// Create a new TaskFlow with configuration
pub async fn new(config: TaskFlowConfig) -> Result<Self, TaskFlowError>;
/// Create from YAML configuration file
pub async fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<Self, TaskFlowError>;
/// Create from YAML configuration string
pub async fn from_yaml_str(config_content: &str) -> Result<Self, TaskFlowError>;
/// Register a custom task handler
pub async fn register_handler(&self, handler: Arc<dyn TaskHandler>);
/// Submit a task for execution
pub async fn submit_task(&self, definition: TaskDefinition) -> Result<String, TaskFlowError>;
/// Submit an HTTP task (convenience method)
pub async fn submit_http_task(
&self,
name: &str,
url: &str,
method: Option<&str>
) -> Result<String, TaskFlowError>;
/// Submit a shell command task (convenience method)
pub async fn submit_shell_task(
&self,
name: &str,
command: &str,
args: Vec<&str>
) -> Result<String, TaskFlowError>;
/// Get task status
pub async fn get_task_status(&self, task_id: &str) -> Result<Option<TaskStatus>, TaskFlowError>;
/// Cancel a task
pub async fn cancel_task(&self, task_id: &str) -> Result<(), TaskFlowError>;
/// List tasks with optional status filter
pub async fn list_tasks(&self, status: Option<TaskStatus>) -> Result<Vec<Task>, TaskFlowError>;
/// Wait for task completion with timeout
pub async fn wait_for_completion(
&self,
task_id: &str,
timeout_seconds: Option<u64>
) -> Result<Task, TaskFlowError>;
/// Get task execution metrics
pub async fn get_task_metrics(&self) -> Result<TaskMetrics, TaskFlowError>;
/// Start the framework (scheduler and executor)
pub async fn start(&self) -> Result<(), TaskFlowError>;
/// Shutdown the framework
pub async fn shutdown(&self) -> Result<(), TaskFlowError>;
}
impl TaskDefinition {
/// Create a new task definition
pub fn new(name: &str, task_type: &str) -> Self;
/// Add a payload parameter (serde_json::Value)
pub fn with_payload(mut self, key: &str, value: serde_json::Value) -> Self;
/// Set task priority
pub fn with_priority(mut self, priority: i32) -> Self;
/// Set task timeout in seconds
pub fn with_timeout(mut self, timeout_seconds: u64) -> Self;
/// Set task dependencies
pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self;
/// Set task tags
pub fn with_tags(mut self, tags: Vec<String>) -> Self;
/// Schedule task for future execution
pub fn schedule_at(mut self, scheduled_at: DateTime<Utc>) -> Self;
}
use taskflow_rs::framework::TaskFlowConfig;
let config = TaskFlowConfig {
max_workers: 4, // Maximum concurrent workers
task_timeout_ms: 30000, // Default task timeout (30 seconds)
retry_delay_ms: 1000, // Delay between retries
max_retries: 3, // Maximum retry attempts
storage_type: StorageType::Memory, // Storage backend type
};
let taskflow = TaskFlow::new(config).await?;
The framework uses TaskFlowError
for all error cases, with detailed error variants:
pub enum TaskFlowError {
StorageError(String),
TaskNotFound(String),
TaskValidationError(String),
DependencyCycle(String),
ExecutionError(String),
TimeoutError(String),
ConfigurationError(String),
}
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
This project is licensed under the Apache 2.0 License.