
Rustflow

A production-ready DAG-based task scheduler written in Rust. Define workflows in Python, TypeScript/JavaScript, YAML, JSON, or TOML and run them on cron schedules with retries, failure handling, and logging. Think Airflow/Dagster, but fast and lightweight.

Features

  • Multi-language DAG definitions — Python SDK, TypeScript SDK, YAML, JSON, TOML
  • Cron scheduling with dependency resolution and topological execution
  • Retry logic with fixed or exponential backoff
  • Worker pool with configurable concurrency and timeout enforcement
  • REST API for managing DAGs, runs, and streaming logs
  • Pluggable storage — SQLite (default) or Postgres + Redis for production
  • Docker support — single-container or full Postgres/Redis stack via Compose
  • Graceful shutdown — in-flight tasks complete before exit
  • Shared utilities — inject PYTHONPATH/NODE_PATH for cross-task code reuse
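The retry behavior above can be sketched in a few lines. The exact delays Rustflow uses are not documented here; the base delay (1 s) and cap (60 s) below are assumed values chosen purely for illustration:

```python
# Illustrative retry-delay schedule: fixed vs. exponential backoff.
# Base delay and cap are assumptions for this sketch, not Rustflow's defaults.

def backoff_delay(attempt: int, strategy: str = "exponential",
                  base_secs: float = 1.0, cap_secs: float = 60.0) -> float:
    """Return the wait time before retry number `attempt` (1-based)."""
    if strategy == "fixed":
        return base_secs
    # exponential: base * 2^(attempt - 1), capped
    return min(base_secs * 2 ** (attempt - 1), cap_secs)

# A task with retries=3 and exponential backoff would wait 1 s, 2 s, 4 s.
schedule = [backoff_delay(a) for a in range(1, 4)]
```

With a cap, late retries stop growing: attempt 10 would wait 60 s, not 512 s.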

Quick Start

# Build
cargo build --release

# Initialize the database
cargo run -- migrate

# Validate your DAGs
cargo run -- validate examples/dags/

# List discovered DAGs
cargo run -- list examples/dags/

# Start the scheduler, workers, and API server
cargo run -- serve

# Trigger a DAG manually
cargo run -- run simple

# Health check
curl localhost:3000/health

Docker

# SQLite mode (single container)
docker compose --profile sqlite up --build

# Postgres + Redis mode (production)
docker compose --profile postgres up --build

Defining DAGs

Python SDK

from rustflow_sdk import DAG, Task, Python, Node, Shell, Binary

with DAG("etl_pipeline", schedule="0 0 */6 * * *", default_retries=2) as dag:
    extract = Task("extract", Python("tasks/extract.py"), retries=3, backoff="exponential")
    transform = Task("transform", Python("tasks/transform.py"), timeout_secs=300)
    load = Task("load", Shell("echo 'done'"))

    extract >> transform >> load

TypeScript SDK

import { DAG, Task, Python, Shell } from "rustflow-sdk";

const dag = new DAG("etl_pipeline", {
  schedule: "0 0 */6 * * *",
  default_retries: 2,
});

dag.build(() => {
  const extract = new Task("extract", new Python("tasks/extract.py"), {
    retries: 3,
    backoff: "exponential",
  });
  const transform = new Task("transform", new Python("tasks/transform.py"), {
    timeout_secs: 300,
  });
  const load = new Task("load", new Shell("echo 'done'"));

  extract.then(transform).then(load);
});

YAML

dag_id: simple
description: "A simple pipeline"
tags: [example]
tasks:
  - task_id: hello
    runner:
      type: shell
      command: "echo 'Hello!'"
    depends_on: []
  - task_id: goodbye
    runner:
      type: shell
      command: "echo 'Bye!'"
    depends_on: [hello]
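The scheduler turns `depends_on` edges like the ones above into an execution order. A minimal sketch of that resolution using Kahn's algorithm (illustrative only, not Rustflow's actual scheduler code):

```python
from collections import deque

# Tasks as in the YAML example above: goodbye depends on hello.
tasks = {"hello": [], "goodbye": ["hello"]}

def topo_order(tasks):
    """Kahn's algorithm: repeatedly run tasks whose dependencies are done."""
    indegree = {t: len(deps) for t, deps in tasks.items()}
    dependents = {t: [] for t in tasks}
    for t, deps in tasks.items():
        for d in deps:
            dependents[d].append(t)
    ready = deque(t for t, n in indegree.items() if n == 0)
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for nxt in dependents[t]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(tasks):
        raise ValueError("cycle detected in DAG")
    return order

order = topo_order(tasks)
# → ["hello", "goodbye"]
```

The cycle check is why `rustflow validate` can reject a DAG before it ever runs.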

JSON

{
  "dag_id": "simple",
  "tasks": [
    {
      "task_id": "hello",
      "runner": { "type": "shell", "command": "echo 'Hello!'" },
      "depends_on": []
    },
    {
      "task_id": "goodbye",
      "runner": { "type": "shell", "command": "echo 'Bye!'" },
      "depends_on": ["hello"]
    }
  ]
}

TOML

dag_id = "my_dag"
schedule = "0 0 * * * *"

[[tasks]]
task_id = "step1"
[tasks.runner]
type = "shell"
command = "echo step1"
depends_on = []

[[tasks]]
task_id = "step2"
[tasks.runner]
type = "python"
script = "tasks/process.py"
depends_on = ["step1"]

Configuration

Create a rustflow.toml in your project root:

[storage]
backend = "sqlite"          # "sqlite" or "postgres"
sqlite_path = "rustflow.db"
# postgres_url = "postgres://user:pass@localhost/rustflow"
# redis_url = "redis://localhost:6379"

[worker]
concurrency = 4
log_dir = "logs"

[scheduler]
tick_interval_secs = 1
dag_dir = "dags"
utils_dir = "utils"

[server]
host = "0.0.0.0"
port = 3000

[secrets]
provider = "env"

Postgres + Redis

For production deployments, use Postgres as the primary store for DAGs, runs, and task history, with Redis handling the delayed retry queue and distributed locks:

[storage]
backend = "postgres"
postgres_url = "postgres://user:pass@localhost/rustflow"
redis_url = "redis://localhost:6379"
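A delayed retry queue is typically built on a sorted set keyed by due time (in Redis: score = retry timestamp, then pop entries whose score has passed). Rustflow's actual Redis schema is not shown in this README, so here is an in-memory sketch of the same pattern:

```python
import heapq
import time

# Sketch of a delayed retry queue: entries become visible only once their
# due time arrives. In-memory heap here; a Redis sorted set plays the same
# role in production (this is the pattern, not Rustflow's implementation).

class DelayedQueue:
    def __init__(self):
        self._heap = []  # (due_time, task_run_id)

    def push(self, task_run_id, delay_secs, now=None):
        now = time.time() if now is None else now
        heapq.heappush(self._heap, (now + delay_secs, task_run_id))

    def pop_due(self, now=None):
        """Return all task runs whose retry time has arrived."""
        now = time.time() if now is None else now
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[1])
        return due

q = DelayedQueue()
q.push("run-1", delay_secs=5, now=100.0)
q.push("run-2", delay_secs=1, now=100.0)
# At t=102 only run-2 is due; run-1 becomes due at t=105.
```

Distributed locks serve a different purpose: they keep multiple scheduler instances from dispatching the same run twice.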

CLI Reference

Command                   Description
rustflow serve            Start scheduler + workers + API (all-in-one)
rustflow scheduler        Run only the scheduler
rustflow worker           Run only the worker pool
rustflow run <dag>        Trigger a DAG run manually
rustflow migrate          Create/update database tables
rustflow list [dir]       List discovered DAGs (default dir: dags/)
rustflow validate <path>  Validate a DAG file or directory

Global flag: --config <path> (default: rustflow.toml)

API Endpoints

Method  Endpoint                                       Description
GET     /health                                        Health check
GET     /api/dags                                      List all DAGs
GET     /api/dags/{dag_id}                             Get DAG definition
DELETE  /api/dags/{dag_id}                             Soft-delete a DAG
POST    /api/dags/{dag_id}/trigger                     Trigger a DAG run
GET     /api/runs?dag_id=&limit=50                     List DAG runs
GET     /api/runs/{dag_run_id}                         Get run details
GET     /api/runs/{dag_run_id}/tasks                   Get tasks in a run
POST    /api/runs/{dag_run_id}/cancel                  Cancel a run
GET     /api/runs/{dag_run_id}/tasks/{task_id}/logs    Stream logs (SSE)
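The logs endpoint streams Server-Sent Events. Assuming standard SSE framing (`data:` lines, events separated by blank lines), a client can recover log messages with a small parser like this one (illustrative helper, not part of Rustflow or its SDKs):

```python
def parse_sse(stream_lines):
    """Yield the data payload of each SSE event.

    Events are separated by blank lines; multiple `data:` lines
    within one event are joined with newlines, per the SSE format.
    """
    data = []
    for line in stream_lines:
        if line.startswith("data:"):
            data.append(line[5:].lstrip())
        elif line == "" and data:  # blank line terminates an event
            yield "\n".join(data)
            data = []
    if data:  # flush a final unterminated event
        yield "\n".join(data)

# Example: two events, framed the way a worker's log stream might emit them.
events = list(parse_sse([
    "data: task hello started",
    "",
    "data: task hello finished",
    "",
]))
# → ["task hello started", "task hello finished"]
```

With curl, the same stream can be followed live by disabling buffering (`curl -N`).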

Project Structure

rustflow/
├── Cargo.toml
├── Dockerfile
├── docker-compose.yml
├── rustflow.toml
├── migrations/
│   ├── 001_init.sql              # SQLite schema
│   └── 002_postgres_init.sql     # Postgres schema
├── src/
│   ├── main.rs                   # CLI entry point
│   ├── config.rs                 # Configuration
│   ├── discovery.rs              # DAG file discovery and parsing
│   ├── scheduler.rs              # Cron evaluation, dependency dispatch
│   ├── worker.rs                 # Process execution, retries, logging
│   ├── models/                   # DagDef, TaskDef, TaskRun, etc.
│   ├── store/
│   │   ├── mod.rs                # Store trait (20 methods)
│   │   ├── sqlite.rs             # SQLite backend
│   │   └── postgres.rs           # Postgres + Redis backend
│   └── api/                      # Axum REST API
├── sdks/
│   ├── python/                   # Python SDK
│   └── typescript/               # TypeScript SDK
└── examples/                     # Sample DAGs, tasks, configs

License

MIT
