Make Python async functions durable and resumable with a minimal, pluggable runtime.
This package treats durability as a core runtime concern. Async functions are transformed into a sequence of CodeBlocks that can be checkpointed, paused, and resumed through a DurableRuntime. Everything is open and pluggable: orchestration backends implement a small interface, and persistence is provided through StateStore implementations.
Key primitives:
DurableRuntime— drives execution and persistence.CallStack/FunctionCall/CodeBlock— durable execution model.OrchestrationBackend/DurableBackend— orchestration + event waiting.StateStore/InMemoryStateStore/DiskStateStore— persistence.DurableAstTransformer/make_durable— transform async functions into durable programs.
import asyncio
from durable import DurableBackend, InMemoryStateStore, PauseForEventException, make_durable
# Define your async function
async def greet(name: str):
print("waiting for permission to greet")
raise PauseForEventException("allow")
return f"hello {name}"
# Make it durable
durable_greet = make_durable(greet)
async def main():
backend = DurableBackend(InMemoryStateStore())
async def run_once():
# Triggers runtime execution; will pause on PauseForEventException
try:
return await durable_greet("Ada", orchestration_backend=backend, instance_id="greet-1")
except PauseForEventException as e:
print(f"paused for event {e.event}")
await run_once() # pauses and persists state
backend.publish_event("greet-1", "allow") # deliver event
result = await durable_greet("Ada", orchestration_backend=backend, instance_id="greet-1")
print(result) # "hello Ada"
asyncio.run(main())- Execution model:
FunctionCallholds locals, an ordered list ofCodeBlocks, and the current index.CallStackorchestrates nested calls and return propagation. - Durable runtime:
DurableRuntimerestores saved state (if any), drives the call stack, and delegates persistence plus event waiting to anOrchestrationBackend. - Backends:
OrchestrationBackenddefinesload_state,save_state, andwait_for_event.DurableBackendis the OSS implementation backed by aStateStoreand an in-memory event bus. - Persistence:
StateStoredefinesload/save. The OSS package shipsInMemoryStateStoreandDiskStateStore(path). - Transformation:
DurableAstTransformerrewrites async functions into checkpointable code.make_durable(fn)wraps a function into aDurableProgramthat builds a runtime and executes it.
Implement OrchestrationBackend to integrate with your own orchestration layer:
from durable import OrchestrationBackend
class CustomBackend(OrchestrationBackend):
async def load_state(self, instance_id: str) -> dict | None:
...
async def save_state(self, instance_id: str, state: dict) -> None:
...
async def wait_for_event(self, instance_id: str, event_name: str) -> None:
...Use it by passing orchestration_backend= to the callable returned by make_durable.
Install dev dependencies and run pytest:
poetry install --with dev
poetry run pytest