Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.
Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG
Zero required dependencies. Pure asyncio. Python 3.9+.
Most async task runners group tasks into "waves" by dependency depth:
```
A (10s) ─┐
         ├─→ C (1s)
B (1s)  ─┘

D (1s) ──→ E (1s)
```
A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.
DAGExecutor starts each task the moment its specific dependencies complete.
E starts at t=1, completes at t=2 — 9 seconds earlier.
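The difference is easy to reproduce with plain asyncio. A minimal sketch of the idea (not the library's implementation), with the diagram's timings scaled down 10× so it runs quickly — each task signals completion through its own `asyncio.Event`, so `E` waits only on `D`:

```python
import asyncio
import time

# Scaled-down timings from the diagram above (10s -> 1.0s, 1s -> 0.1s)
DURATIONS = {"A": 1.0, "B": 0.1, "C": 0.1, "D": 0.1, "E": 0.1}
DEPS = {"C": {"A", "B"}, "E": {"D"}}

async def run(name: str, done: dict[str, asyncio.Event], t0: float):
    # Wait only for this task's own dependencies, not a whole depth level
    await asyncio.gather(*(done[d].wait() for d in DEPS.get(name, set())))
    started_at = time.monotonic() - t0
    await asyncio.sleep(DURATIONS[name])
    done[name].set()
    return name, started_at

async def main():
    done = {n: asyncio.Event() for n in DURATIONS}
    t0 = time.monotonic()
    results = await asyncio.gather(*(run(n, done, t0) for n in DURATIONS))
    return dict(results)

starts = asyncio.run(main())
# E starts as soon as D finishes, long before A completes
assert starts["E"] < DURATIONS["A"]
```

A level-based executor would instead hold `E` until the whole depth-0 set `{A, B, D}` had drained.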
```bash
pip install wave-executor
```

```python
import asyncio

from wave_executor import DAGExecutor, WaveConfig


async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"


async def main():
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))

    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report": {"analyze"},
    }
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report": {},
        },
        dependencies=dependencies,
        task_executor=process,
    )
    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")


asyncio.run(main())
```

Key features:
- `asyncio.Event` per task — zero-overhead dependency signalling
- `AdaptiveSemaphore` — AIMD-controlled concurrency window (self-adjusts under load)
- Failed deps automatically skip all downstream tasks
- Optional dep-output injection: completed results are injected into downstream specs under `_dep_outputs`
- Exponential-backoff retry, per-task timeout, total execution timeout
- Cycle detection (Kahn's algorithm) before execution starts
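The AIMD idea behind `AdaptiveSemaphore` can be sketched in a few lines. This is an illustrative model of the control rule, not the library's class: grow the window by one on success (additive increase), halve it on failure or saturation (multiplicative decrease), never dropping below a floor:

```python
class AIMDWindow:
    """Toy AIMD concurrency window (illustrative, not wave_executor's class)."""

    def __init__(self, initial: int = 10, floor: int = 2, ceiling: int = 50):
        self.size = initial
        self.floor = floor
        self.ceiling = ceiling

    def on_success(self):
        # Additive increase: probe for more concurrency, one slot at a time
        self.size = min(self.size + 1, self.ceiling)

    def on_pressure(self):
        # Multiplicative decrease: back off quickly under failure/saturation
        self.size = max(self.size // 2, self.floor)


w = AIMDWindow(initial=16)
w.on_success()   # size -> 17
w.on_pressure()  # size -> 8
w.on_pressure()  # size -> 4
```

The same additive-increase/multiplicative-decrease rule is what keeps TCP congestion windows stable: cautious growth, aggressive retreat.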
Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.
```python
from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy


async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...


reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)
await reconciler.start()

await reconciler.update(
    tasks={
        "validator": {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator": "always",       # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,  # exponential: 0s, 3s, 6s, 12s...
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()
```

Key features:
- Three restart policies: `no` / `on-failure` / `always`
- Exponential backoff: `base * 2^(n-1)` seconds between retries
- Hot-update desired state without stopping everything
- `max_restart_attempts` ceiling — task marked `"blocked"` when exceeded
- Dependency-aware: tasks only start once upstream deps have `"success"` status
- Dep-output injection: outputs from completed deps injected into downstream specs
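The restart decision reduces to a pure function. A sketch under the semantics above — policy names, the `max_restart_attempts` ceiling, and the `base * 2^(n-1)` backoff come from this README, while the function itself is illustrative (the "0s" in the example is the initial launch, which is not a restart):

```python
from typing import Optional

def restart_delay(policy: str, failed: bool, restarts_used: int,
                  max_attempts: int, base: float) -> Optional[float]:
    """Delay before the next restart, or None if the task should stay down."""
    if restarts_used >= max_attempts:
        return None  # ceiling hit: the task would be marked "blocked"
    if policy == "no":
        return None
    if policy == "on-failure" and not failed:
        return None  # clean exit under on-failure is terminal
    # "always" restarts after any exit; the nth restart waits base * 2^(n-1)
    n = restarts_used + 1
    return base * 2 ** (n - 1)

restart_delay("on-failure", True, 0, 5, base=3.0)  # first restart: 3.0s
restart_delay("always", False, 1, 5, base=3.0)     # second restart: 6.0s
```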
Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.
```python
from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec


async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...


meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": {"url": "..."}, "page_b": {"url": "..."}},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,  # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())
```

Nodes can also be WaveReconciler instances (set `node_type="reconciler"`); such a node runs until all of its tasks complete before it is considered done.
```python
from wave_executor import WaveConfig

config = WaveConfig(
    max_parallel_per_wave=50,       # global concurrency cap
    default_task_timeout=30.0,      # per-task timeout (seconds)
    total_execution_timeout=600.0,  # hard limit for the whole execution
    max_retries=3,                  # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,     # AIMD concurrency auto-scaling
    min_parallel=2,                 # floor for adaptive window
    saturation_threshold=20,        # waiting tasks that trigger shrink
)
```

Subscribe to execution events on any level:

```python
executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED
```

MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.
```bash
pip install wave-executor[dev]
pytest
```

Structured escalation reports
When a task exhausts all retries or hits a timeout, DAGExecutor now emits an EscalationReport instead of a bare error string. The report is stored under task_result.output["escalation"] and contains:
- `failure_history` — error message from every attempt
- `root_cause` — heuristic: consistent errors across attempts → "code bug or bad input spec"; circuit trip → "downstream service unavailable"
- `resolution_options` — suggested next actions
- `circuit_state` — circuit breaker state at escalation time
- `escalated_at` — ISO timestamp
A TASK_ESCALATED event is also fired to all registered on_event callbacks.
Circuit breaker HALF_OPEN pre-entry guard
_check_circuit_breaker_state() is now called at the top of every retry iteration (attempt > 0), before the task executor and before any backoff sleep. It fast-fails if the circuit is already HALF_OPEN (a probe is in flight — additional retries must not pile on as extra probes) or OPEN with recovery timeout not yet elapsed. This makes the single-probe recovery semantics strict rather than advisory.
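The guard's decision reduces to a small state check. An illustrative model of the semantics described above, not the library's `_check_circuit_breaker_state()` itself:

```python
import time
from typing import Optional

def may_attempt(state: str, opened_at: float, recovery_timeout: float,
                now: Optional[float] = None) -> bool:
    """Fast-fail check run at the top of every retry iteration (attempt > 0)."""
    now = time.monotonic() if now is None else now
    if state == "HALF_OPEN":
        return False  # a probe is already in flight; extra retries must not pile on
    if state == "OPEN":
        # Stay failed until the recovery timeout elapses; the first retry
        # allowed through becomes the single HALF_OPEN probe
        return now - opened_at >= recovery_timeout
    return True  # CLOSED: the circuit imposes no restriction
```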
EscalationReport is now exported from the top-level package.
- Circuit breaker (`CLOSED → OPEN → HALF_OPEN → CLOSED` lifecycle) on `DAGExecutor`
- Checkpoint persistence on task failure
- `AdaptiveSemaphore` AIMD concurrency control
- Initial release: `DAGExecutor`, `WaveReconciler`, `MetaDAG`
- Kahn's algorithm cycle detection
- Dep-output injection, exponential-backoff retry, per-task timeout
- `on_event` observability callbacks
MIT