kowalski21/restate-saga-python

restate-saga

Saga pattern implementation for Restate durable workflows in Python with automatic compensation.

Features

  • Automatic compensation - When a step fails, all previous steps are rolled back in reverse order
  • Flexible step types - Hybrid (create_saga_step) or strict (create_saga_step_strict) compensation modes
  • Decorator API - @saga_step, @saga_step_strict, @saga_workflow decorators for concise definitions
  • Global error registry - Register error classes that should always trigger compensation
  • Composable workflows - Embed workflows within workflows using run_as_step
  • Virtual Object support - Saga pattern for stateful keyed entities
  • Restate Workflows - Long-running workflows with signals, queries, and saga support
  • Pydantic validation - Automatic input validation when type hints use Pydantic models

Installation

pip install restate-saga

Requires: Python 3.11+, restate_sdk[serde]

Quick Start

import restate
from restate_saga import (
    SagaContext,
    StepResponse,
    saga_step_strict,
    saga_workflow,
)

# Define steps with compensation

async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val["customer_id"])
    return StepResponse(output={"order_id": order.id})

async def refund_payment(data):
    await payment_service.refund(data["payment_id"])

@saga_step_strict("ProcessPayment", compensate=refund_payment)
async def process_payment(ctx: restate.Context, input_val):
    payment = await payment_service.charge(input_val["order_id"])
    return StepResponse(output={"payment_id": payment.id})

# Define the workflow

@saga_workflow("CheckoutWorkflow")
async def checkout_workflow(saga: SagaContext, input_val):
    # If process_payment fails, cancel_order (create_order's compensation) runs automatically
    order = await create_order(saga, input_val)
    payment = await process_payment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"], "payment_id": payment["payment_id"]}

Core Concepts

Saga Pattern

Each step has a corresponding compensation (undo) action. If a later step fails, all earlier compensations run in reverse order.

Step 1 → Step 2 → Step 3 (fails!)
                    ↓
    Compensate 1 ← Compensate 2
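
To make the unwinding order concrete, here is a minimal pure-Python sketch that does not use restate-saga or Restate at all. The `run_saga` helper and the `(name, action, compensation)` tuple layout are assumptions made for illustration; the library's real internals may differ.

```python
import asyncio

async def run_saga(steps):
    """Run (name, action, compensation) triples; undo completed ones on failure."""
    completed = []  # compensations of finished steps, in execution order
    log = []
    try:
        for name, action, compensate in steps:
            await action()
            log.append(f"run {name}")
            completed.append((name, compensate))
    except Exception as exc:
        log.append(f"failed: {exc}")
        # Unwind like a stack: the most recently completed step is undone first.
        for name, compensate in reversed(completed):
            await compensate()
            log.append(f"compensate {name}")
    return log

async def ok():
    pass

async def boom():
    raise RuntimeError("step 3 broke")

steps = [("step1", ok, ok), ("step2", ok, ok), ("step3", boom, ok)]
saga_log = asyncio.run(run_saga(steps))
print(saga_log)
```

The log shows step 2 being compensated before step 1, which is the reverse of the execution order.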

Step Types

Hybrid (create_saga_step / @saga_step)

Registers compensation before execution, so compensation runs even if the step fails partway through. The failed keyword argument tells the compensation function whether the forward action raised an error.

async def cancel_order(data, failed=False):
    # `failed` tells you if the step threw an error
    await order_service.cancel(data.get("order_id"))

@saga_step("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order_id = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order_id},
        compensation_data={"order_id": order_id},
    )

Strict (create_saga_step_strict / @saga_step_strict)

Registers compensation after success. Use when compensation requires data that only exists after completion.

async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order.id},
        compensation_data={"order_id": order.id},
    )
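
The difference between the two modes comes down to when the compensation is pushed onto the saga's stack. The toy runner below is only a sketch of that idea: `run_step` and its `strict` flag are hypothetical and are not part of the restate-saga API.

```python
import asyncio

async def run_step(stack, action, compensate, *, strict):
    """Toy step runner; `stack` collects compensation callbacks."""
    if not strict:
        # Hybrid: register first, so a partial failure is still compensated.
        stack.append(compensate)
    result = await action()
    if strict:
        # Strict: register only once the forward action has succeeded.
        stack.append(compensate)
    return result

async def failing_action():
    raise RuntimeError("failed partway through")

async def undo():
    pass

async def registered_after_failure(strict):
    stack = []
    try:
        await run_step(stack, failing_action, undo, strict=strict)
    except RuntimeError:
        pass
    return len(stack)

hybrid_count = asyncio.run(registered_after_failure(strict=False))
strict_count = asyncio.run(registered_after_failure(strict=True))
print(hybrid_count, strict_count)
```

When the forward action raises, the hybrid runner has already registered a compensation while the strict runner has registered none.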

StepResponse

# compensation_data defaults to output when omitted
StepResponse(output={"order_id": "123"})

# explicit compensation_data
StepResponse(output={"order_id": "123"}, compensation_data={"order_id": "123", "extra": True})

# no compensation data (e.g. validation step)
StepResponse(output={"valid": True}, compensation_data=None)

# permanent failure — triggers compensation with provided data
StepResponse.permanent_failure("Payment declined", {"auth_id": "abc"})
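
The first and third cases above imply that an omitted compensation_data is treated differently from an explicit None. One common way to get that behavior is a sentinel default. The `StepResponseSketch` class below is a hypothetical stand-in written for illustration and is not the library's StepResponse.

```python
from dataclasses import dataclass
from typing import Any

_UNSET = object()  # sentinel: distinguishes "omitted" from an explicit None

@dataclass
class StepResponseSketch:
    output: Any
    compensation_data: Any = _UNSET

    def __post_init__(self):
        # Fall back to the output only when the caller passed nothing at all.
        if self.compensation_data is _UNSET:
            self.compensation_data = self.output

defaulted = StepResponseSketch(output={"order_id": "123"})
explicit_none = StepResponseSketch(output={"valid": True}, compensation_data=None)
print(defaulted.compensation_data, explicit_none.compensation_data)
```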

Steps Without Compensation

For validation, read-only operations, or idempotent actions, omit the compensate function:

@saga_step("ValidateInput")
async def validate_input(ctx: restate.Context, input_val):
    if not input_val.get("email"):
        return StepResponse.permanent_failure("Email required", None)
    return StepResponse(output={"valid": True}, compensation_data=None)

Function API

Steps and workflows can also be created without decorators:

from restate_saga import create_saga_step, create_saga_step_strict, create_saga_workflow

create_order = create_saga_step(
    name="CreateOrder",
    run=create_order_handler,
    compensate=cancel_order,
)

checkout = create_saga_workflow(
    name="CheckoutWorkflow",
    handler=checkout_handler,
)

Global Error Registry

Register error classes that should always trigger compensation without retrying:

from restate_saga import register_terminal_errors, saga_step

class ValidationError(Exception): ...
class NotFoundError(Exception): ...

register_terminal_errors([ValidationError, NotFoundError])

# Now any step that raises these will trigger compensation
@saga_step("MyStep")
async def my_step(ctx, input_val):
    raise ValidationError("Invalid input")  # → triggers compensation

Custom error mapping:

import restate
from restate_saga import set_global_error_mapper

# Placeholder application error used by the mapper below
class BusinessError(Exception): ...

def my_mapper(err: Exception) -> restate.TerminalError | None:
    if isinstance(err, BusinessError):
        return restate.TerminalError(str(err))
    return None

set_global_error_mapper(my_mapper)
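
As a rough mental model, resolving an error to a terminal one can be thought of as a lookup in the registry followed by a call to the mapper. The sketch below runs without the SDK; `TerminalErrorSketch`, `register`, `set_mapper`, `resolve`, and the order in which the registry and mapper are consulted are all assumptions, not the library's implementation.

```python
class TerminalErrorSketch(Exception):
    """Stand-in for restate.TerminalError so the example runs without the SDK."""

_terminal_classes = set()
_global_mapper = None

def register(classes):
    _terminal_classes.update(classes)

def set_mapper(mapper):
    global _global_mapper
    _global_mapper = mapper

def resolve(err):
    """Return a terminal error if `err` should stop retries, else None."""
    if isinstance(err, TerminalErrorSketch):
        return err
    if isinstance(err, tuple(_terminal_classes)):
        return TerminalErrorSketch(str(err))
    if _global_mapper is not None:
        return _global_mapper(err)
    return None  # not terminal: the caller may retry

class ValidationError(Exception): ...
class BusinessError(Exception): ...

register([ValidationError])
set_mapper(lambda e: TerminalErrorSketch(str(e)) if isinstance(e, BusinessError) else None)

print(resolve(ValidationError("bad")), resolve(TimeoutError("slow")))
```

Errors that resolve to None are treated as retryable in this sketch, while anything that resolves to a terminal error would trigger compensation.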

Composing Workflows

Use run_as_step to embed a workflow within another, sharing the compensation context:

@saga_workflow("PaymentWorkflow")
async def payment_workflow(saga, input_val):
    auth = await authorize_payment(saga, input_val)
    capture = await capture_payment(saga, {"auth_id": auth["id"]})
    return {"payment_id": capture["id"]}

@saga_workflow("OrderWorkflow")
async def order_workflow(saga, input_val):
    order = await create_order(saga, input_val)

    # Payment workflow's compensations join this saga
    payment = await payment_workflow.run_as_step(saga, {"amount": order["total"]})

    # If shipping fails, both order AND payment are compensated
    shipment = await create_shipment(saga, {"order_id": order["id"]})

    return {"order_id": order["id"], "payment_id": payment["payment_id"]}
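
The effect of sharing a compensation context can be sketched with a single undo stack that both the parent and the embedded workflow push onto. `SagaSketch` and its methods are hypothetical and only illustrate the idea; they are not restate-saga's SagaContext.

```python
import asyncio

class SagaSketch:
    def __init__(self):
        self.compensations = []  # shared undo stack
        self.log = []

    async def step(self, name, fail=False):
        if fail:
            raise RuntimeError(f"{name} failed")
        self.compensations.append(name)

    async def unwind(self):
        while self.compensations:
            self.log.append(f"undo {self.compensations.pop()}")

async def payment(saga):
    # Embedded workflow body: pushes onto the parent's stack.
    await saga.step("authorize")
    await saga.step("capture")

async def order(saga):
    await saga.step("create_order")
    await payment(saga)
    await saga.step("create_shipment", fail=True)

async def main():
    saga = SagaSketch()
    try:
        await order(saga)
    except RuntimeError:
        await saga.unwind()
    return saga.log

undo_log = asyncio.run(main())
print(undo_log)
```

Because the payment steps registered on the same stack, a failure in shipping undoes the capture and authorization before undoing the order.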

Nested Sagas

For inline nested logic without a full workflow:

from restate_saga import run_nested_saga, create_saga_module

# Inline nested saga
async def handle_payment(saga):
    auth = await authorize(saga, {"amount": 100})
    # Use a distinct local name: rebinding `capture` here would shadow the step function
    captured = await capture(saga, {"auth_id": auth["id"]})
    return captured

result = await run_nested_saga(saga, handle_payment)

# Reusable saga module (not a Restate service)
payment_module = create_saga_module(payment_handler)
result = await payment_module(saga, input_val)

Virtual Objects

Stateful entities with saga support:

import restate
from restate_saga import create_saga_virtual_object, SagaContext

cart = create_saga_virtual_object("ShoppingCart")

@cart.handler()
async def checkout(saga: SagaContext, ctx: restate.ObjectContext, input_val):
    payment = await charge_payment(saga, {"amount": input_val["total"]})
    ctx.clear("items")
    return {"order_id": payment["order_id"]}

@cart.handler(kind="shared")
async def get_items(ctx: restate.ObjectSharedContext):
    return await ctx.get("items") or []

Restate Workflows (Long-Running)

For workflows with signals and queries:

import restate
from restate_saga import create_saga_restate_workflow, SagaContext

wf = create_saga_restate_workflow("ApprovalWorkflow")

@wf.main()
async def run(saga: SagaContext, ctx: restate.WorkflowContext, input_val):
    order = await create_order(saga, input_val)

    # Wait for approval signal (durable promise)
    approved = await ctx.promise("approval").value()

    if not approved:
        raise restate.TerminalError("Order rejected")

    shipment = await create_shipment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"]}

@wf.handler()
async def approve(ctx: restate.WorkflowSharedContext, input_val):
    await ctx.promise("approval").resolve(input_val["approved"])

Pydantic Validation

When step or workflow handlers use Pydantic model type hints, inputs are automatically validated:

from pydantic import BaseModel
from restate_saga import SagaContext, saga_workflow

class CheckoutInput(BaseModel):
    customer_id: str

@saga_workflow("CheckoutWorkflow")
async def checkout(saga: SagaContext, input_val: CheckoutInput):
    # input_val is validated and converted to CheckoutInput
    order = await create_order(saga, input_val)
    return {"order_id": order["order_id"]}
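
One plausible way to drive validation from type hints is to inspect the handler's input parameter and call the model's validator when one is present. The sketch below avoids a Pydantic dependency by using `FakeModel`, a stand-in that only mimics the name of Pydantic v2's `model_validate` classmethod. `coerce_input` is hypothetical and may not match how restate-saga does it.

```python
import inspect
from typing import get_type_hints

class FakeModel:
    """Stand-in exposing the same classmethod name as a Pydantic v2 model."""
    def __init__(self, customer_id):
        self.customer_id = customer_id

    @classmethod
    def model_validate(cls, raw):
        if not isinstance(raw.get("customer_id"), str):
            raise ValueError("customer_id must be a string")
        return cls(raw["customer_id"])

def coerce_input(handler, raw):
    """Validate `raw` against the type hint of the handler's second parameter."""
    params = list(inspect.signature(handler).parameters)
    hint = get_type_hints(handler).get(params[1])
    if hint is not None and hasattr(hint, "model_validate"):
        return hint.model_validate(raw)
    return raw  # no model hint: pass the input through unchanged

async def checkout(saga, input_val: FakeModel): ...
async def untyped(saga, input_val): ...

validated = coerce_input(checkout, {"customer_id": "cust_123"})
passthrough = coerce_input(untyped, {"customer_id": 42})
print(type(validated).__name__, passthrough)
```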

Step & Workflow Options

Configure retry policies and service-level options:

from datetime import timedelta
from restate_saga import (
    SagaStepOptions,
    SagaWorkflowOptions,
    StepRetryPolicy,
    WorkflowRetryPolicy,
    create_saga_step,
    create_saga_workflow,
)

# Step-level retry
step = create_saga_step(
    name="ChargePayment",
    run=charge_handler,
    compensate=refund_handler,
    options=SagaStepOptions(
        retry=StepRetryPolicy(
            max_retry_attempts=3,
            initial_retry_interval=timedelta(seconds=1),
            retry_interval_factor=2.0,
            max_retry_interval=timedelta(seconds=30),
        ),
        compensation_retry=StepRetryPolicy(max_retry_attempts=5),
    ),
)

# Workflow-level options
workflow = create_saga_workflow(
    name="OrderWorkflow",
    handler=order_handler,
    options=SagaWorkflowOptions(
        retry_policy=WorkflowRetryPolicy(max_attempts=3),
        idempotency_retention=timedelta(days=1),
        ingress_private=True,
    ),
)
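
To get a feel for what the step-level policy above could mean in practice, the sketch below computes retry delays under capped exponential backoff. That the library applies the parameters exactly this way is an assumption, as is whether max_retry_attempts counts retries or total attempts. `backoff_delays` is a hypothetical helper.

```python
from datetime import timedelta

def backoff_delays(max_attempts, initial, factor, cap):
    """Delay before each retry under capped exponential backoff."""
    delays = []
    delay = initial
    for _ in range(max_attempts):
        delays.append(min(delay, cap))  # never wait longer than the cap
        delay = delay * factor
    return delays

delays = backoff_delays(3, timedelta(seconds=1), 2.0, timedelta(seconds=30))
long_run = backoff_delays(7, timedelta(seconds=1), 2.0, timedelta(seconds=30))
print([d.total_seconds() for d in delays])
print([d.total_seconds() for d in long_run])
```

With the values from the example, three retries would wait 1, 2, and 4 seconds; the 30-second cap only comes into play from the sixth retry onward.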

Project Structure

restate_saga/
├── __init__.py            # Public API exports
├── steps.py               # create_saga_step, create_saga_step_strict, decorators
├── workflows.py           # create_saga_workflow, SagaWorkflowService
├── restate_workflows.py   # create_saga_restate_workflow (long-running)
├── virtual_objects.py     # create_saga_virtual_object
├── step_response.py       # StepResponse class
├── error_registry.py      # Terminal error registration
├── nested.py              # run_nested_saga, create_saga_module
├── types.py               # SagaContext, options, policies
└── _validation.py         # Pydantic input validation
app/
├── main.py                # FastAPI app with Restate mounted
└── services/              # Example service handlers
tests/                     # Test suite

Running the Example App

Prerequisites

  1. Install Restate Server:

    # macOS
    brew install restatedev/tap/restate-server

    # Or Docker (this also starts the server, so step 2 can be skipped)
    docker run -d --name restate -p 8080:8080 -p 9070:9070 docker.io/restatedev/restate:latest

  2. Start Restate Server (if installed via Homebrew):

    restate-server

Start the Service

# Install dependencies
pip install -e ".[dev]"

# Run the server
hypercorn app.main:api --bind 0.0.0.0:9080

Register with Restate

restate deployments register http://localhost:9080/restate/v1

Invoke a Workflow

curl -X POST http://localhost:8080/CheckoutWorkflow/run \
  -H "Content-Type: application/json" \
  -d '{"customer_id": "cust_123"}'
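
The same call can be made from Python with only the standard library. This is a sketch that assumes the ingress URL and payload from the curl command above; `build_request` and `invoke` are hypothetical helper names.

```python
import json
import urllib.request

def build_request(customer_id, base_url="http://localhost:8080"):
    """Build the POST request that the curl command above sends."""
    body = json.dumps({"customer_id": customer_id}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/CheckoutWorkflow/run",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def invoke(customer_id):
    """Send the request; needs Restate and the service to be running."""
    with urllib.request.urlopen(build_request(customer_id)) as resp:
        return json.loads(resp.read())

req = build_request("cust_123")
print(req.get_method(), req.full_url)
```

Calling `invoke("cust_123")` performs the actual HTTP request, so it only works once the service has been registered as described above.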

API Reference

Steps

  • create_saga_step(name, run, compensate?, options?) - Hybrid compensation (registered before execution)
  • create_saga_step_strict(name, run, compensate?, options?) - Strict compensation (registered after success)
  • @saga_step(name, compensate?, options?) - Decorator for hybrid steps
  • @saga_step_strict(name, compensate?, options?) - Decorator for strict steps
  • StepResponse(output, compensation_data?) - Step result with optional compensation data
  • StepResponse.permanent_failure(message, compensation_data) - Signal permanent failure

Workflows

  • create_saga_workflow(name, handler, options?) - Create a saga workflow service
  • @saga_workflow(name, options?) - Decorator for workflows
  • SagaWorkflowService.run_as_step(parent_saga, input) - Embed in parent workflow

Restate Workflows

  • create_saga_restate_workflow(name, options?) - Create a long-running workflow
  • SagaRestateWorkflow.main() - Register main handler with saga support
  • SagaRestateWorkflow.handler() - Register shared signal/query handlers
  • SagaRestateWorkflow.run_as_step(parent_saga, input) - Embed in parent workflow

Virtual Objects

  • create_saga_virtual_object(name, options?) - Create a Virtual Object
  • SagaVirtualObject.handler(kind="exclusive") - Exclusive handler with saga
  • SagaVirtualObject.handler(kind="shared") - Shared handler without saga

Error Registry

  • register_terminal_errors(error_classes) - Register error classes as terminal
  • unregister_terminal_errors(error_classes) - Unregister error classes
  • clear_terminal_errors() - Clear all registered errors
  • set_global_error_mapper(mapper) - Set a custom error mapper
  • resolve_terminal_error(err, step_mapper?) - Resolve error to terminal

Nested Sagas

  • run_nested_saga(saga, handler) - Run inline saga with shared compensation
  • create_saga_module(handler) - Create a reusable saga module

Types

  • SagaContext - Context with Restate ctx and compensations stack
  • StepRetryPolicy - Retry config for step-level operations
  • SagaStepOptions - Step options (retry, compensation_retry, as_terminal_error)
  • WorkflowRetryPolicy - Retry config for service/workflow level
  • SagaWorkflowOptions - Workflow options (retry, retention, timeouts)

License

MIT
