Skip to content

tasker-systems/tasker-core-rs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Tasker Core Rust

High-performance Rust implementation of workflow orchestration designed to complement the existing Ruby on Rails Tasker engine. Built for production-scale deployments with comprehensive orchestration capabilities, fault tolerance, and 10-100x performance improvements over Rails equivalents.

🎯 Current Status: Production Ready

βœ… TAS-37 COMPREHENSIVE ORCHESTRATION ENHANCEMENT COMPLETE (August 17, 2025) βœ… 645+ tests passing with comprehensive orchestration system πŸš€ Production deployment ready with race condition elimination and dynamic scaling

Major Achievements

  • πŸ—οΈ Complete Orchestration System: Dynamic executor pools, health monitoring, auto-scaling
  • πŸ›‘οΈ Race Condition Elimination: Atomic SQL-based finalization claiming prevents duplicate processing
  • ⚑ Circuit Breaker Integration: Production-ready fault tolerance with configurable thresholds
  • πŸ“Š Component-Based Configuration: 10 focused TOML files with environment-specific overrides
  • πŸ”„ UUID v7 Architecture: Complete migration to time-ordered UUIDs for distributed safety

πŸš€ Architecture Overview

PostgreSQL Message Queue (PGMQ) Based System where Rust handles orchestration coordination and Ruby workers process steps autonomously through queue polling.

Core Components

  • OrchestrationCore: Unified bootstrap system with consistent component initialization
  • OrchestrationLoopCoordinator: Dynamic executor pool management with auto-scaling and health monitoring
  • Finalization System: Race-condition-free task finalization with atomic SQL claiming
  • Circuit Breaker Integration: Resilient messaging with configurable protection
  • Operational State Management: Context-aware system state with shutdown coordination

Queue Design Pattern

fulfillment_queue           - All fulfillment namespace steps
inventory_queue            - All inventory namespace steps
notifications_queue        - All notification namespace steps
orchestration_step_results - Step completion results processing

Key Technical Patterns

  • PostgreSQL-Centric Architecture: Shared database as API layer with PGMQ for reliable messaging
  • Component-Based Configuration: TOML configuration with environment overrides and validation
  • Atomic Operations: SQL-based claiming prevents race conditions and ensures consistency
  • Operational State Awareness: Health monitoring adapts behavior based on system state
  • Circuit Breaker Protection: Configurable resilience for database and messaging operations

πŸ“š Quick Start

Prerequisites

  • Rust: 1.75+ with Cargo
  • PostgreSQL: 14+ with PGMQ extension enabled
  • Ruby: 3.1+ (for Ruby bindings)

Installation

# Clone the repository
git clone https://github.com/tasker-systems/tasker-core
cd tasker-core

# Set up database (PostgreSQL with PGMQ extension)
createdb tasker_rust_development
psql tasker_rust_development -c "CREATE EXTENSION IF NOT EXISTS pgmq;"

# Run database migrations
DATABASE_URL=postgresql://user:pass@localhost/tasker_rust_development cargo sqlx migrate run

# Build with all features
cargo build --all-features

# Run tests
cargo test --all-features

Basic Usage

use tasker_core::orchestration::OrchestrationCore;
use tasker_core::config::ConfigManager;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize with environment-aware configuration
    let orchestration_core = OrchestrationCore::new().await?;

    // Or specify configuration explicitly
    let config_manager = ConfigManager::load()?;
    let orchestration_core = OrchestrationCore::from_config(config_manager).await?;

    println!("βœ… Orchestration core initialized successfully");
    Ok(())
}

πŸ› οΈ Configuration

Component-Based Configuration Structure

The system uses 10 focused TOML configuration files with environment-specific overrides:

config/tasker/base/
β”œβ”€β”€ auth.toml              # Authentication settings
β”œβ”€β”€ circuit_breakers.toml  # Circuit breaker configuration
β”œβ”€β”€ database.toml          # Database connection settings
β”œβ”€β”€ engine.toml            # Core engine configuration
β”œβ”€β”€ executor_pools.toml    # Executor pool settings
β”œβ”€β”€ orchestration.toml     # Orchestration parameters
β”œβ”€β”€ pgmq.toml             # Message queue configuration
β”œβ”€β”€ query_cache.toml      # Query caching settings
β”œβ”€β”€ system.toml           # System-level settings
└── telemetry.toml        # Monitoring and metrics

config/tasker/environments/
β”œβ”€β”€ development/          # Development overrides
β”œβ”€β”€ production/          # Production settings
└── test/               # Test environment

Example Configuration

# config/tasker/base/orchestration.toml
[orchestration]
auto_scaling_enabled = true
target_utilization = 0.75
scaling_interval_seconds = 30
health_check_interval_seconds = 10

[executor_pools.task_claimer]
min_executors = 2
max_executors = 10
polling_interval_ms = 50
batch_size = 20
circuit_breaker_enabled = true

πŸ—οΈ Core Development Commands

Rust Core

# Core development (ALWAYS use --all-features for consistency)
cargo build --all-features                         # Build project
cargo test --all-features                          # Run tests
cargo clippy --all-targets --all-features          # Lint code
cargo fmt                                          # Format code

# Coverage and benchmarking
cargo llvm-cov --all-features                      # Generate coverage
cargo bench --all-features                         # Performance benchmarks

# Database operations
cargo sqlx migrate run                             # Run migrations
cargo check --all-features                        # Fast compilation check

Ruby Integration

cd workers/ruby
bundle install                                     # Install dependencies
bundle exec rake compile                           # Compile Ruby extension

# Integration testing
DATABASE_URL=postgresql://user:pass@localhost/tasker_rust_test \
TASKER_ENV=test bundle exec rspec spec/integration/ --format documentation

πŸ§ͺ Testing

The project maintains comprehensive test coverage across multiple dimensions:

Test Categories

  • Unit Tests: 645+ tests covering all components
  • Integration Tests: End-to-end workflow validation with real database operations
  • Race Condition Tests: Concurrent processor simulation for finalization claiming
  • Configuration Tests: All configuration scenarios with environment overrides
  • Ruby Integration: Full Ruby FFI binding validation

Running Tests

# Core orchestration tests
cargo test --all-features orchestration

# Configuration system tests
cargo test --all-features config

# Circuit breaker integration tests
cargo test --all-features circuit_breaker

# Ruby integration tests
TASKER_ENV=test bundle exec rspec spec/integration/ --format documentation

# Specific workflow validation
TASKER_ENV=test bundle exec rspec spec/integration/linear_workflow_integration_spec.rb
TASKER_ENV=test bundle exec rspec spec/integration/order_fulfillment_spec.rb

πŸ“Š Performance & Monitoring

Performance Targets Achieved

  • 10-100x faster dependency resolution vs PostgreSQL functions βœ…
  • <1ms overhead per step coordination handoff βœ…
  • >10k events/sec cross-language event processing βœ…
  • Zero race conditions through atomic finalization claiming βœ…

Key Metrics

The system provides comprehensive observability:

Finalization Claiming Metrics

  • finalization_claim_duration_seconds - Time taken to attempt claiming
  • finalization_claim_success_total - Successfully acquired claims
  • finalization_claim_contention_total - Failed attempts due to contention
  • finalization_claim_contention_by_reason_total - Contention breakdown by reason

Circuit Breaker Metrics

  • Circuit breaker state transitions and failure rates
  • Protected operation success/failure ratios
  • Recovery time measurements

Executor Pool Metrics

  • Pool utilization and scaling events
  • Health state transitions
  • Throughput measurements per executor type

Health Check Endpoint

curl http://localhost:3000/health/orchestration
{
  "status": "healthy",
  "coordinator": {
    "running": true,
    "uptime_seconds": 3600
  },
  "executor_pools": {
    "task_request_processor": {
      "healthy": 3,
      "unhealthy": 0,
      "throughput": 150.5
    }
  }
}

πŸ”§ Architecture Deep Dive

Orchestration Loop Coordinator

The OrchestrationLoopCoordinator manages pools of orchestration executors with dynamic scaling:

pub struct OrchestrationLoopCoordinator {
    executor_pools: Arc<DashMap<ExecutorType, ExecutorPool>>,
    config: CoordinatorConfig,
    db_pool: PgPool,
    circuit_breakers: Arc<DashMap<ExecutorType, Arc<CircuitBreaker>>>,
    scaling_state: Arc<RwLock<ScalingState>>,
}

Key Responsibilities:

  • Pool Management: Maintains min/max executors per type
  • Health Monitoring: Regular health checks and heartbeats
  • Auto-scaling: Scales based on load and performance metrics
  • Backpressure: Responds to database pool saturation
  • Circuit Breaking: Integrates with existing circuit breaker

Race Condition Elimination

TAS-37 implemented atomic finalization claiming using dedicated SQL functions:

-- Atomic claim function prevents multiple processors from finalizing same task
SELECT * FROM claim_task_for_finalization($1, $2, $3)

Benefits:

  • Zero "unclear state" logs in production
  • Transactional safety with automatic claim release on rollback
  • Complete audit trail for debugging
  • Database-visible claims for operational monitoring

Circuit Breaker Integration

Production-ready circuit breaker system protects all critical paths:

impl PgmqClient {
    pub async fn read_messages_with_circuit_breaker(&self, queue: &str) -> Result<Vec<Message>> {
        self.circuit_breaker
            .call(|| self.read_messages_internal(queue))
            .await
    }
}

🚦 Deployment

Environment Configuration

Set the environment and the system will automatically load the appropriate configuration:

export TASKER_ENV=production
cargo run --all-features

Configuration Validation

The system validates configuration on startup:

# Test configuration loading
TASKER_ENV=production cargo run --example config_demo --all-features

Production Considerations

  • Database Pool: Configure appropriate pool size for your load
  • Circuit Breakers: Tune thresholds based on your failure patterns
  • Executor Pools: Set min/max executors based on expected throughput
  • Health Monitoring: Set up monitoring for circuit breaker states
  • Resource Limits: Ensure sufficient database connections for executor pools

πŸ”— Related Projects

  • tasker-engine: Production-ready Rails engine for workflow orchestration
  • tasker-blog: GitBook documentation with real-world engineering stories

πŸ“– Documentation

Key Documentation

  • CLAUDE.md: Complete project context and architecture overview
  • docs/ticket-specs/: Detailed implementation specifications for major features
  • Cargo.toml: Dependencies and feature configuration

Implementation Tickets

Completed Major Features:

  • TAS-31: Production resilience & performance optimization βœ…
  • TAS-32: Unified configuration manager βœ…
  • TAS-33: UUID v7 primary key migration βœ…
  • TAS-34: Component-based configuration architecture βœ…
  • TAS-37: Task finalization race condition elimination βœ…

Available for Implementation:

  • TAS-35: Executor pool lifecycle management
  • TAS-36: Auto-scaling algorithm enhancement
  • TAS-39: Health check integration

πŸ† Success Metrics Achieved

TAS-37: Orchestration Enhancement βœ…

  • Zero Race Conditions: Atomic finalization claiming eliminates "unclear state" logs
  • Production Ready: Comprehensive error handling, logging, and metrics collection
  • Operational Excellence: Context-aware health monitoring prevents false alerts during shutdowns
  • Auto-Scaling: Dynamic executor pools with threshold and rate-based algorithms
  • Thread Safety: Proper concurrent programming patterns with Arc/RwLock
  • Comprehensive Observability: 12+ metrics for finalization operations

Overall System Quality βœ…

  • Production Ready: All major components implemented with comprehensive testing
  • Comprehensive Testing: 645+ tests covering all scenarios and edge cases
  • Code Quality: Excellent assessment from comprehensive code review
  • Documentation: Thorough inline documentation and architectural guides
  • Observability: Detailed metrics and logging for production monitoring

🐳 Docker Deployment

Standalone Server Mode

The project includes a standalone server binary (tasker-server) that combines orchestration and web API for containerized deployment:

# Build and run with Docker Compose
docker-compose --profile server up -d

# Or build the Docker image directly
docker build -t tasker-server .
docker run -p 3000:8080 \
  -e DATABASE_URL=postgresql://user:pass@host/db \
  -e TASKER_ENV=production \
  tasker-server

Available Services

  • postgres: PostgreSQL database with PGMQ extension (always started)
  • tasker-server: Orchestration + Web API server (profile: server)

Usage Examples

# Start just the database
docker-compose up -d postgres

# Start database + server
docker-compose --profile server up -d

# View server logs
docker-compose logs -f tasker-server

# Stop everything
docker-compose down

Configuration

The server uses the component-based TOML configuration system. Key settings:

  • Environment: Set via TASKER_ENV (development, test, production)
  • Namespaces: Configured in config/tasker/base/orchestration.toml
  • Web API: Configured in config/tasker/base/web.toml
  • Database: Override via DATABASE_URL environment variable

Health Checks

  • Server Health: http://localhost:3000/health
  • API Documentation: http://localhost:3000/api-docs/ui

Production Deployment

The Docker image is optimized for production with:

  • Multi-stage build for minimal image size
  • Non-root user for security
  • Health checks for container orchestration
  • Proper signal handling for graceful shutdown

🀝 Contributing

Development Setup

  1. Clone the repository
  2. Install Rust 1.75+
  3. Set up PostgreSQL with PGMQ extension
  4. Run database migrations
  5. Install Ruby dependencies for integration tests
  6. Run the test suite to verify setup

Development Workflow

  • Branch Naming: Use feature/description or fix/description
  • Testing: All changes must include tests
  • Linting: Run cargo clippy before committing
  • Integration: Ensure Ruby integration tests pass

Code Style

  • Use cargo fmt for formatting
  • Follow Rust API guidelines
  • Include comprehensive error handling
  • Add tracing for production debugging
  • Update documentation for public APIs

πŸ“„ License

MIT License - see LICENSE for details.


Built with ❀️ by the Tasker Systems team

Ready for production deployment with comprehensive orchestration capabilities, fault tolerance, and performance optimization.

About

The Tasker Core in Rust

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •