A distributed DAG execution engine built with Python and Rust (via PyO3 / Maturin). Oxidizer orchestrates multi-tier data pipelines across workers using Redis Streams for messaging, Redis JSON for state, and S3/MinIO for config storage.
Supports both batch DAG runs and live (streaming) topologies. Live nodes use a stateless batch+republish model — each worker processes one batch, re-publishes the task back to the stream, and any worker can claim it next. Redistribution across workers is automatic; scaling requires zero coordination.
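The batch+republish loop can be sketched in plain Python. This is a simulation only: a `deque` stands in for the Redis Stream, the cursor-carrying task tuple and worker names are invented for illustration, and none of these names are Oxidizer APIs.

```python
from collections import deque
from itertools import cycle

stream = deque([("sensor_agg", 0)])   # one live task on the stream
source = list(range(10))              # stand-in for upstream records
BATCH = 4
workers = cycle(["worker-a", "worker-b"])
processed = []

while stream:
    worker = next(workers)
    node_id, cursor = stream.popleft()            # any idle worker claims the task
    batch = source[cursor:cursor + BATCH]         # process exactly one batch
    processed.append((worker, batch))
    if cursor + BATCH < len(source):
        stream.append((node_id, cursor + BATCH))  # republish with an advanced cursor

# The task hops between workers with no coordination: whoever is idle claims it next.
```

Because the task itself carries the cursor, no worker holds state between batches, which is why scaling up or down needs no rebalancing step.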
| Component | Role |
|---|---|
| Oxidizer | DAG controller — builds execution tiers, dispatches nodes to workers |
| Reagent | Worker — consumes tasks from a stream and runs user-defined processing |
| Microscope | FastMCP control plane — HTTP API, MCP tools, resources, and prompts |
| Catalyst | Redis/Valkey cache client (Rust core) — streams, state, locking |
| Alloy | Pipeline config manager — loads, validates, and stores YAML configs in S3/MinIO |
| Anvil | S3 operations toolkit — static helpers for object storage via Rust engine |
| Formula | DAG engine — constructs runs from cached configs, manages DAG lifecycle |
| Residue | Structured logging — structlog + Rust pyo3_log bridge, optional Redis log sink |
All I/O (Redis, S3, stream reads/writes) runs in async Rust via Tokio. Python only handles business logic.
```
oxidizer/
├── examples/          # Self-contained pipeline examples
│   ├── Dockerfile     # Shared multi-stage build
│   ├── main.py        # Controller entrypoint
│   ├── api.py         # API entrypoint
│   ├── skeleton/      # Minimal starter template (batch)
│   ├── batch_etl/     # E-Commerce ETL pipeline (batch)
│   ├── live_events/   # IoT sensor monitoring (live)
│   └── mixed_pipeline/ # Fintech transactions (mixed)
├── oxidizer/          # Python package
│   ├── ui.html        # Dashboard UI (served by Microscope)
│   └── *.py           # Core modules
├── rust/src/          # Rust source (PyO3 extension)
├── tests/             # Pytest + Rust unit tests
├── Cargo.toml
├── pyproject.toml
└── requirements.txt
```
Each example folder contains its own docker-compose.yml, worker file(s), and YAML config. See examples/README.md for details.
- Python 3.10+
- Rust 1.70+ (for local builds)
- Docker & Docker Compose (for containerized runs)
- Redis (with JSON module) and MinIO (or S3-compatible store)
The fastest way to run the full stack (using the skeleton example):
```bash
docker compose -f examples/skeleton/docker-compose.yml up --build -d
```

Each example starts the following core services:
| Service | Description |
|---|---|
| `main` | Oxidizer controller — executes DAG runs |
| `worker` (×N) | Reagent workers — process nodes from streams |
| `api` | Microscope — FastMCP server on port 8000 (HTTP + MCP + Dashboard) |
| `redis` | Redis Stack (with JSON module) on port 6379 |
| `minio` | MinIO object store on ports 9000/9001 |
| `minio-init` | Sidecar — creates the bucket and uploads alloy configs |
Some examples add additional workers (e.g. dedicated stream workers). See each example's docker-compose.yml for specifics.
Once running, the dashboard is at http://localhost:8000, the API at http://localhost:8000/health, the MCP endpoint at http://localhost:8000/mcp, and the MinIO console at http://localhost:9001.
To stop:
```bash
docker compose -f examples/skeleton/docker-compose.yml down
```

See examples/README.md for all four examples and their deployment instructions.
```bash
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop  # builds the Rust extension in-place
pip install -r requirements.txt
```

The controller builds a DAG from an alloy config, splits it into execution tiers, and dispatches each node to a worker stream.
```python
from oxidizer import configure_logging
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.oxidizer import Oxidizer

configure_logging()  # structlog + Rust pyo3_log bridge

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))
oxidizer = Oxidizer(catalyst=catalyst, alloy=alloy)

# Submit a DAG run by alloy name (non-blocking — queued to the alloy stream)
oxidizer.submit_run("example")

# Start the controller loop (blocks forever, dispatches runs as they arrive)
oxidizer.start()
```

Workers consume tasks from the oxidizer stream. Define your processing logic with the `@reagent.react()` decorator:
```python
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.reagent import Reagent

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
reagent = Reagent(catalyst=catalyst)

@reagent.react()
def process(data: dict, context: dict):
    node_id = context["node_id"]
    run_id = context["run_id"]
    print(f"Processing {node_id} for run {run_id}")
    # data: dict mapping alias → resolved upstream records
    # context: run_id, node_id, alloy, layer, node_config, connections
    return {"data": data}
```

The decorated function receives `(data, context)`:
- `data`: dict mapping alias → resolved upstream records (tiered retrieval handled by the framework)
- `context`: dict with keys `run_id`, `node_id`, `alloy`, `layer`, `node_config`, `connections`
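To illustrate the shapes involved, here is a hypothetical handler joining two upstream aliases; the alias names (`orders`, `users`), record fields, and context values are invented for this sketch and are not part of the framework.

```python
def process(data: dict, context: dict):
    # Join records from two hypothetical upstream aliases on "id".
    users = {u["id"]: u for u in data["users"]}
    return {"data": [o | users.get(o["id"], {}) for o in data["orders"]]}

# The framework would resolve upstream records into `data` before the call;
# here we pass them by hand to show the expected shape.
result = process(
    {
        "orders": [{"id": 1, "total": 9.99}],
        "users": [{"id": 1, "name": "Ada"}],
    },
    {"run_id": "r1", "node_id": "join_orders"},
)
```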
For dedicated streams (e.g. routing specific nodes to specialized workers):
```python
@reagent.react(dedicated_stream="gold_summary_stream")
def process(data: dict, context: dict):
    ...
```

Microscope is a FastMCP server that exposes HTTP endpoints for dashboards and curl, plus MCP tools, resources, and prompts for AI agent integration.
```python
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.microscope import Microscope

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))
microscope = Microscope(catalyst=catalyst, alloy=alloy)
microscope.run(host="0.0.0.0", port=8000)
```

See API Reference for the full list of HTTP endpoints, MCP tools, resources, and prompts.
Oxidizer supports live (streaming) topologies alongside batch DAG runs. A live topology deploys long-running nodes that continuously process data from Redis Streams using a batch+republish model — each worker processes one batch, re-publishes the task, and any worker can claim it next.
```bash
# Deploy a live topology
curl -X POST http://localhost:8000/topology/deploy \
  -H 'Content-Type: application/json' \
  -d '{"alloy_name": "example"}'

# Check status
curl http://localhost:8000/topology/{run_id}/status

# Graceful stop
curl -X POST http://localhost:8000/topology/{run_id}/stop
```

See Live Topologies for the full architecture, node pause/unpause, rebalancing, and crash recovery details.
Detailed documentation lives in the docs/ folder:
| Document | Description |
|---|---|
| Architecture | Component overview, data flow, node state machines, locking, error handling |
| API Reference | All HTTP endpoints, MCP tools, resources, prompts, metrics schemas |
| Configuration | Alloy YAML reference — layers, nodes, output blocks, scheduling, retention |
| Live Topologies | Batch+republish model, topology entity, deploy/stop/rebalance, pause/unpause |
| Batching | XRANGE cursor loop, per-record fan-out, auto-flatten, batch_size config |
| Retention | Task stream cleanup, producer-owned output, hierarchical data_retention |
| Plugins | Python + Rust plugin system with Maturin/PyO3, entry points |
| Testing | Pytest suite, integration tests, Docker Compose test process, pass/fail criteria |
| Examples | Four self-contained pipeline examples (skeleton, batch_etl, live_events, mixed_pipeline) |
| PyPI Publishing | Publishing to PyPI with GitHub Actions, trusted publisher, versioning strategies |
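The cursor loop named in the Batching document can be approximated in plain Python. The `xrange` helper and the entry IDs below are simulated stand-ins patterned after Redis Stream semantics ("return entries after this ID, up to `count`"); they are not the Catalyst API.

```python
# Simulated stream: (entry_id, payload) pairs in ascending ID order.
entries = [(f"17000-{i}", {"v": i}) for i in range(7)]

def xrange(start, count):
    """Stand-in for XRANGE: entries with ID greater than `start`, up to `count`."""
    rows = [(eid, p) for eid, p in entries if eid > start]
    return rows[:count]

batches = []
cursor = "0-0"               # start before the first entry
while True:
    batch = xrange(cursor, count=3)
    if not batch:
        break                # stream drained for now
    batches.append([p["v"] for _, p in batch])
    cursor = batch[-1][0]    # advance cursor past the last ID read
```

Carrying only the last-seen ID between iterations is what lets the batch+republish model hand the loop to a different worker mid-stream.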
```bash
# Rust unit tests (no external services needed)
cargo test

# Python tests (no external services needed)
python -m pytest tests/ -v
```

TBD