

do-over


do-over is an async-first resilience and transient-fault-handling library for Rust, inspired by the .NET Polly library.

Goals

  • Explicit failure modeling
  • Async-native (Tokio)
  • No global state
  • Observable and composable
  • Familiar mental model for Polly users

Installation

From crates.io (recommended)

Add do-over to your Cargo.toml:

[dependencies]
do-over = "0.1"
tokio = { version = "1", features = ["full"] }

From GitHub

To use the latest development version directly from GitHub:

[dependencies]
do-over = { git = "https://github.com/nwpz/do-over.git", branch = "main" }
tokio = { version = "1", features = ["full"] }

Or pin to a specific commit:

[dependencies]
do-over = { git = "https://github.com/nwpz/do-over.git", rev = "d01bf82" }
tokio = { version = "1", features = ["full"] }

Feature Flags

Feature              Description
http                 Enables reqwest integration for HTTP clients
metrics-prometheus   Prometheus metrics integration
metrics-otel         OpenTelemetry metrics integration

Enable optional features in your Cargo.toml:

[dependencies]
do-over = { version = "0.1", features = ["http", "metrics-prometheus"] }

Quick Start

1. Create a new project

cargo new my-resilient-app
cd my-resilient-app

2. Add dependencies to Cargo.toml

[dependencies]
do-over = "0.1"
tokio = { version = "1", features = ["full"] }

3. Write your resilient code

Replace src/main.rs with:

use do_over::{policy::Policy, retry::RetryPolicy, timeout::TimeoutPolicy, wrap::Wrap};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a resilient policy: retry 3 times with 5s timeout per attempt
    let policy = Wrap::new(
        RetryPolicy::fixed(3, Duration::from_millis(100)),
        TimeoutPolicy::new(Duration::from_secs(5)),
    );

    // Execute your operation with resilience
    let result = policy.execute(|| async {
        // Your async operation here (e.g., HTTP request, database query)
        Ok::<_, std::io::Error>("Hello, resilience!")
    }).await?;

    println!("{}", result);
    Ok(())
}

4. Run your application

cargo run

Real-World Example: Resilient HTTP Client

use do_over::{
    policy::Policy,
    error::DoOverError,
    retry::RetryPolicy,
    timeout::TimeoutPolicy,
    circuit_breaker::CircuitBreaker,
    wrap::Wrap,
};
use std::time::Duration;

// Define your error type
type AppError = DoOverError<String>;

#[tokio::main]
async fn main() -> Result<(), AppError> {
    // Build a resilient policy stack
    let policy = Wrap::new(
        CircuitBreaker::new(5, Duration::from_secs(30)),  // Open after 5 failures
        Wrap::new(
            RetryPolicy::exponential(3, Duration::from_millis(100), 2.0),  // Retry with backoff
            TimeoutPolicy::new(Duration::from_secs(10)),  // 10s timeout per attempt
        ),
    );

    // Use the policy for API calls
    let user_data = policy.execute(|| async {
        // Simulate an API call
        println!("Calling API...");
        Ok::<_, AppError>("{ \"name\": \"Alice\", \"id\": 123 }".to_string())
    }).await?;

    println!("Response: {}", user_data);
    Ok(())
}

Development Setup (VS Code Dev Containers)

Prerequisites

  • Docker
  • Visual Studio Code
  • VS Code Remote Containers extension

Steps

  1. Clone this repository
  2. Open the folder in VS Code
  3. When prompted: Reopen in Container
  4. The container will:
    • Install Rust
    • Run cargo build
    • Enable rust-analyzer

You are now ready to develop.


Core Concepts

Policy Trait

All resilience policies in do-over implement the Policy<E> trait:

#[async_trait::async_trait]
pub trait Policy<E>: Send + Sync {
    async fn execute<F, Fut, T>(&self, f: F) -> Result<T, E>
    where
        F: Fn() -> Fut + Send + Sync,
        Fut: Future<Output = Result<T, E>> + Send,
        T: Send;
}

This trait allows you to wrap any async operation with resilience patterns. The operation must return a Result<T, E>, making error handling explicit.
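
To illustrate the shape of the trait, here is a minimal sketch of a hypothetical pass-through policy that runs the operation exactly once; real policies add behavior around the call to f(). The exact bounds (for example E: Send) may differ slightly in the published API:

use do_over::policy::Policy;
use std::future::Future;

// Hypothetical pass-through policy: executes the operation once, unchanged.
struct PassThrough;

#[async_trait::async_trait]
impl<E: Send> Policy<E> for PassThrough {
    async fn execute<F, Fut, T>(&self, f: F) -> Result<T, E>
    where
        F: Fn() -> Fut + Send + Sync,
        Fut: Future<Output = Result<T, E>> + Send,
        T: Send,
    {
        // A real policy would add retries, timeouts, etc. around this call.
        f().await
    }
}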

Error Handling

do-over uses the DoOverError<E> type to wrap your application errors with policy-specific failures:

pub enum DoOverError<E> {
    Timeout,           // Operation exceeded timeout
    CircuitOpen,       // Circuit breaker is open
    BulkheadFull,      // Bulkhead or rate limiter rejected
    Inner(E),          // Your application error
}

This allows you to distinguish between infrastructure failures (timeout, circuit open) and application failures.
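
For example, a caller can branch on the variants to treat infrastructure failures differently from application errors (a sketch using the enum above):

use do_over::error::DoOverError;

fn describe(err: &DoOverError<std::io::Error>) -> &'static str {
    match err {
        DoOverError::Timeout => "infrastructure: the attempt timed out",
        DoOverError::CircuitOpen => "infrastructure: the circuit breaker is open",
        DoOverError::BulkheadFull => "infrastructure: rejected for lack of capacity",
        DoOverError::Inner(_) => "application: the operation itself failed",
    }
}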


Policies

Retry Policy

The retry policy automatically retries failed operations with configurable backoff strategies.

Features

  • Fixed Backoff: Wait a constant duration between retries
  • Exponential Backoff: Increase delay exponentially with each retry
  • Metrics Integration: Track retry attempts and outcomes

flowchart TD
    Start([Execute Operation]) --> Execute[Run Operation]
    Execute --> Check{Success?}
    Check -->|Yes| Success([Return Result])
    Check -->|No| CountCheck{Retries<br/>Remaining?}
    CountCheck -->|Yes| Wait[Wait with Backoff]
    Wait --> Increment[Increment Attempt]
    Increment --> Execute
    CountCheck -->|No| Fail([Return Error])
    
    style Success fill:#90EE90
    style Fail fill:#FFB6C6
    style Wait fill:#FFE4B5

Usage

Fixed Backoff:

use do_over::retry::RetryPolicy;
use std::time::Duration;

// Retry up to 3 times with 100ms between attempts
let retry = RetryPolicy::fixed(3, Duration::from_millis(100));

let result = retry.execute(|| async {
    // Your async operation here
    Ok::<_, std::io::Error>("success")
}).await?;

Exponential Backoff:

// Retry up to 5 times with exponential backoff
// Base delay: 100ms, multiplier: 2.0
// Delays will be: 100ms, 200ms, 400ms, 800ms, 1600ms
let retry = RetryPolicy::exponential(
    5,
    Duration::from_millis(100),
    2.0
);

With Metrics:

let retry = RetryPolicy::fixed(3, Duration::from_millis(100))
    .with_metrics(metrics_collector);

When to Use

  • Network calls that may fail due to transient issues
  • Database operations that might temporarily fail
  • Any operation where temporary failures are expected
  • External API calls with occasional timeouts

Circuit Breaker

The circuit breaker prevents cascading failures by stopping requests to a failing service, giving it time to recover.

How It Works

The circuit breaker has three states:

  1. Closed: Requests flow normally. Failures are counted.
  2. Open: After reaching the failure threshold, the circuit opens and immediately rejects requests.
  3. Half-Open: After the reset timeout, one request is allowed through to test if the service recovered.

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: Failure threshold reached
    Open --> HalfOpen: Reset timeout elapsed
    HalfOpen --> Closed: Request succeeds
    HalfOpen --> Open: Request fails
    Closed --> Closed: Request succeeds
    
    note right of Closed
        Requests pass through
        Failures are counted
    end note
    
    note right of Open
        Requests fail immediately
        No calls to service
    end note
    
    note right of HalfOpen
        One test request allowed
        Determines next state
    end note

Usage

use do_over::circuit_breaker::CircuitBreaker;
use do_over::error::DoOverError;
use std::time::Duration;

// Open after 5 failures, reset after 60 seconds
let breaker = CircuitBreaker::new(5, Duration::from_secs(60));

match breaker.execute(|| async {
    // Call to potentially failing service
    external_service_call().await
}).await {
    Ok(result) => println!("Success: {:?}", result),
    Err(DoOverError::CircuitOpen) => println!("Circuit is open, not calling service"),
    Err(e) => println!("Service error: {:?}", e),
}

Configuration

  • failure_threshold: Number of consecutive failures before opening the circuit
  • reset_timeout: How long to wait before transitioning from Open to Half-Open
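
The transitions can be observed directly. The sketch below drives a small breaker to the Open state with failing calls, then probes it after the reset timeout; it assumes, as in the usage above, that the closure's error type is DoOverError<_> so the breaker can inject CircuitOpen:

use do_over::circuit_breaker::CircuitBreaker;
use do_over::error::DoOverError;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Open after 3 failures, reset after 2 seconds.
    let breaker = CircuitBreaker::new(3, Duration::from_secs(2));

    // Failures 1-3 are counted; call 4 is rejected immediately.
    for attempt in 1..=4 {
        let result = breaker
            .execute(|| async { Err::<(), DoOverError<&str>>(DoOverError::Inner("boom")) })
            .await;
        match result {
            Err(DoOverError::CircuitOpen) => println!("attempt {attempt}: rejected, circuit open"),
            Err(_) => println!("attempt {attempt}: failed, counted by the breaker"),
            Ok(_) => unreachable!(),
        }
    }

    // After the reset timeout the breaker allows one Half-Open probe;
    // a success here closes the circuit again.
    tokio::time::sleep(Duration::from_secs(2)).await;
    let probe = breaker
        .execute(|| async { Ok::<_, DoOverError<&str>>("recovered") })
        .await;
    println!("probe result: {probe:?}");
}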

When to Use

  • Protecting your application from cascading failures
  • Preventing resource exhaustion when a dependency is down
  • Giving failing services time to recover
  • Fast-failing when a service is known to be down

Timeout Policy

The timeout policy ensures operations complete within a specified duration, preventing indefinite hangs.

sequenceDiagram
    participant C as Client
    participant T as Timeout Policy
    participant O as Operation
    participant Timer as Timer
    
    C->>T: execute()
    T->>Timer: Start timeout
    T->>O: Start operation
    
    alt Operation completes first
        O-->>T: Result
        T->>Timer: Cancel
        T-->>C: Return result
    else Timer expires first
        Timer-->>T: Timeout!
        T->>O: Cancel
        T-->>C: DoOverError::Timeout
    end

Usage

use do_over::timeout::TimeoutPolicy;
use do_over::error::DoOverError;
use std::time::Duration;

// Fail if operation takes longer than 5 seconds
let timeout = TimeoutPolicy::new(Duration::from_secs(5));

match timeout.execute(|| async {
    slow_operation().await
}).await {
    Ok(result) => println!("Completed in time: {:?}", result),
    Err(DoOverError::Timeout) => println!("Operation timed out"),
    Err(e) => println!("Other error: {:?}", e),
}

When to Use

  • HTTP requests to external services
  • Database queries that might hang
  • Any operation with SLA requirements
  • Preventing resource leaks from hanging operations
  • Ensuring responsive applications with bounded latency

Bulkhead Isolation

The bulkhead policy limits concurrent executions, preventing resource exhaustion and isolating failures.

Features

  • Concurrency Limiting: Control maximum parallel operations
  • Queue Timeout: Optionally fail fast when bulkhead is full
  • Resource Protection: Prevent thread pool or connection pool exhaustion

flowchart LR
    subgraph Bulkhead["Bulkhead (Max: 3)"]
        S1[Slot 1: 🔵]
        S2[Slot 2: 🔵]
        S3[Slot 3: 🔵]
    end
    
    R1[Request 1] --> S1
    R2[Request 2] --> S2
    R3[Request 3] --> S3
    R4[Request 4] -.->|Rejected| Reject[❌ BulkheadFull]
    R5[Request 5] -.->|Waiting| Queue[⏳ Queue]
    
    S1 --> Complete1[✅ Complete]
    Queue -.->|Slot available| S1
    
    style Reject fill:#FFB6C6
    style Queue fill:#FFE4B5
    style Complete1 fill:#90EE90
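Conceptually, a bulkhead is a counting semaphore wrapped around the operation. A minimal Tokio-only sketch of the fail-fast mechanism (illustrative, not do-over's actual implementation):

use std::future::Future;
use tokio::sync::Semaphore;

// Stand-in for DoOverError::BulkheadFull in this sketch.
#[derive(Debug)]
struct BulkheadFull;

async fn with_bulkhead<T, F, Fut>(slots: &Semaphore, f: F) -> Result<T, BulkheadFull>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = T>,
{
    // try_acquire fails fast when all permits are in use (no queue timeout here).
    let _permit = slots.try_acquire().map_err(|_| BulkheadFull)?;
    Ok(f().await) // the permit is released when `_permit` drops
}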

Usage

Basic Bulkhead:

use do_over::bulkhead::Bulkhead;

// Allow maximum 10 concurrent executions
let bulkhead = Bulkhead::new(10);

let result = bulkhead.execute(|| async {
    expensive_operation().await
}).await;

With Queue Timeout:

use do_over::error::DoOverError;
use std::time::Duration;

// Allow 10 concurrent, wait max 1 second for a slot
let bulkhead = Bulkhead::new(10)
    .with_queue_timeout(Duration::from_secs(1));

match bulkhead.execute(|| async {
    expensive_operation().await
}).await {
    Ok(result) => println!("Executed: {:?}", result),
    Err(DoOverError::BulkheadFull) => println!("No capacity available"),
    Err(e) => println!("Operation failed: {:?}", e),
}

When to Use

  • Protecting limited resources (database connections, file handles)
  • Preventing one service from monopolizing thread pools
  • Isolating different types of work
  • Rate-limiting resource-intensive operations
  • Implementing fair resource allocation

Rate Limiter

The rate limiter controls the rate of operations using a token bucket algorithm.

How It Works

The rate limiter maintains a bucket of tokens that refill at regular intervals:

  • Each operation consumes one token
  • If no tokens are available, the operation is rejected
  • Tokens refill to capacity after each interval

flowchart TD
    Start([Request]) --> Check{Tokens<br/>Available?}
    Check -->|Yes| Consume[Consume Token]
    Consume --> Execute[Execute Operation]
    Execute --> Success([Return Result])
    Check -->|No| Reject([Rate Limit Exceeded])
    
    Interval[Time Interval] -.->|Refill| Bucket[(Token Bucket<br/>🪙🪙🪙)]
    Bucket -.-> Check
    Consume -.->|Update| Bucket
    
    style Success fill:#90EE90
    style Reject fill:#FFB6C6
    style Bucket fill:#E6E6FA
    style Interval fill:#FFE4B5
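A minimal sketch of that bookkeeping (illustrative only, not do-over's internals):

use std::time::{Duration, Instant};

// Token bucket with `capacity` tokens, refilled to full every `interval`.
struct TokenBucket {
    capacity: u32,
    tokens: u32,
    interval: Duration,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: u32, interval: Duration) -> Self {
        Self { capacity, tokens: capacity, interval, last_refill: Instant::now() }
    }

    // Consume one token if available; otherwise the request is rejected.
    fn try_acquire(&mut self) -> bool {
        if self.last_refill.elapsed() >= self.interval {
            self.tokens = self.capacity; // refill to capacity after each interval
            self.last_refill = Instant::now();
        }
        if self.tokens > 0 {
            self.tokens -= 1;
            true
        } else {
            false
        }
    }
}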

Usage

use do_over::rate_limit::RateLimiter;
use do_over::error::DoOverError;
use std::time::Duration;

// Allow 100 requests per second
let limiter = RateLimiter::new(100, Duration::from_secs(1));

match limiter.execute(|| async {
    api_call().await
}).await {
    Ok(result) => println!("Success: {:?}", result),
    Err(DoOverError::BulkheadFull) => println!("Rate limit exceeded"),
    Err(e) => println!("Error: {:?}", e),
}

When to Use

  • Complying with API rate limits
  • Protecting services from overload
  • Implementing fair usage policies
  • Throttling expensive operations
  • Controlling request rates to external services

Hedge Policy

The hedge policy improves latency by starting a backup request if the primary request is slow.

How It Works

  1. Start the primary request
  2. After a configured delay, start a hedged (backup) request
  3. Return whichever completes first
  4. Cancel the slower request

This reduces tail latency when some requests are unexpectedly slow.

sequenceDiagram
    participant C as Client
    participant H as Hedge Policy
    participant P1 as Primary Request
    participant P2 as Hedged Request
    
    C->>H: execute()
    H->>P1: Start primary
    Note over H: Wait hedge delay
    H->>P2: Start hedged request
    
    alt Primary completes first
        P1-->>H: Result ✅
        H->>P2: Cancel
        H-->>C: Return result
    else Hedged completes first
        P2-->>H: Result ✅
        H->>P1: Cancel
        H-->>C: Return result
    end

Usage

use do_over::hedge::Hedge;
use std::time::Duration;

// Start backup request after 100ms
let hedge = Hedge::new(Duration::from_millis(100));

let result = hedge.execute(|| async {
    potentially_slow_operation().await
}).await?;
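
Under the hood, hedging is a race between the primary call and a delayed backup; the loser is dropped, which cancels it. A minimal Tokio-only sketch of the idea (do-over handles this for you):

use std::future::Future;
use std::time::Duration;

async fn hedged<T, F, Fut>(delay: Duration, f: F) -> T
where
    F: Fn() -> Fut,
    Fut: Future<Output = T>,
{
    let primary = f();
    tokio::pin!(primary);
    tokio::select! {
        // Primary finishes first: the pending backup (if any) is dropped.
        result = &mut primary => result,
        // Backup starts only after `delay`; if it wins, the primary is dropped.
        result = async { tokio::time::sleep(delay).await; f().await } => result,
    }
}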

When to Use

  • Read operations where latency matters more than cost
  • Services with high latency variance
  • Operations where sending duplicate requests is safe
  • Improving P99 latency
  • Important: Only use with idempotent operations (safe to execute multiple times)

Advanced Usage

Composing Policies with Wrap

The Wrap utility allows you to compose multiple policies together, creating sophisticated resilience strategies by layering policies.

How Wrap Works

Wrap takes two policies (outer and inner) and chains them together. Execution flows from outer → inner → operation.

flowchart LR
    Request([Request]) --> Outer[Outer Policy]
    Outer --> Inner[Inner Policy]
    Inner --> Operation[Your Operation]
    Operation --> Inner
    Inner --> Outer
    Outer --> Response([Response])
    
    style Outer fill:#E6F3FF
    style Inner fill:#FFF4E6
    style Operation fill:#E8F5E9

Basic Usage

Simple Two-Policy Composition:

use do_over::wrap::Wrap;
use do_over::retry::RetryPolicy;
use do_over::timeout::TimeoutPolicy;
use std::time::Duration;

// Create individual policies
let retry = RetryPolicy::fixed(3, Duration::from_millis(100));
let timeout = TimeoutPolicy::new(Duration::from_secs(5));

// Wrap them together
let policy = Wrap::new(retry, timeout);

// Execute with composed policy
let result = policy.execute(|| async {
    // Your operation here
    external_api_call().await
}).await?;

In this example, the retry policy wraps the timeout policy. Each retry attempt has a 5-second timeout.

sequenceDiagram
    participant C as Client
    participant R as Retry Policy
    participant T as Timeout Policy
    participant O as Operation
    
    C->>R: execute()
    R->>T: attempt 1
    T->>O: call with timeout
    O--xT: fails
    T--xR: error
    R->>R: wait backoff
    R->>T: attempt 2
    T->>O: call with timeout
    O-->>T: success ✅
    T-->>R: result
    R-->>C: result
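Order matters. Swapping the two arguments puts the timeout on the outside, so the 5-second budget then covers all retry attempts together instead of each attempt individually:

// Total-budget variant: one 5s timeout spanning every retry attempt.
let policy = Wrap::new(
    TimeoutPolicy::new(Duration::from_secs(5)),
    RetryPolicy::fixed(3, Duration::from_millis(100)),
);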

Multi-Layer Composition

Wrapping Multiple Policies:

use do_over::wrap::Wrap;
use do_over::retry::RetryPolicy;
use do_over::circuit_breaker::CircuitBreaker;
use do_over::timeout::TimeoutPolicy;
use do_over::bulkhead::Bulkhead;
use std::time::Duration;

// Create policies
let bulkhead = Bulkhead::new(10);
let breaker = CircuitBreaker::new(5, Duration::from_secs(60));
let retry = RetryPolicy::exponential(3, Duration::from_millis(100), 2.0);
let timeout = TimeoutPolicy::new(Duration::from_secs(5));

// Nest wraps for complex composition
let policy = Wrap::new(
    bulkhead,
    Wrap::new(
        breaker,
        Wrap::new(retry, timeout)
    )
);

// Execution order: bulkhead → breaker → retry → timeout → operation
let result = policy.execute(|| async {
    database_query().await
}).await?;

flowchart TB
    Client([Client Request])
    
    subgraph Wrap1["Outermost Wrap"]
        BH[Bulkhead<br/>10 concurrent max]
        subgraph Wrap2["Middle Wrap"]
            CB[Circuit Breaker<br/>5 failure threshold]
            subgraph Wrap3["Inner Wrap"]
                RT[Retry Policy<br/>3 attempts, exponential]
                TO[Timeout<br/>5 seconds]
            end
        end
    end
    
    OP[Operation]
    Result([Result])
    
    Client --> BH
    BH -->|Has capacity| CB
    CB -->|Circuit closed| RT
    RT --> TO
    TO --> OP
    OP --> TO
    TO --> RT
    RT -->|Success/Max retries| CB
    CB --> BH
    BH --> Result
    
    style BH fill:#E6F3FF
    style CB fill:#FFE6E6
    style RT fill:#FFF4E6
    style TO fill:#E6FFE6
    style OP fill:#E8F5E9
    style Result fill:#90EE90

Common Patterns

Pattern 1: Retry with Timeout
Each retry attempt is individually timed out:

let policy = Wrap::new(
    RetryPolicy::fixed(3, Duration::from_millis(100)),
    TimeoutPolicy::new(Duration::from_secs(5))
);

Pattern 2: Circuit Breaker with Retry
Retries only happen while the circuit is closed:

let policy = Wrap::new(
    CircuitBreaker::new(5, Duration::from_secs(60)),
    RetryPolicy::exponential(3, Duration::from_millis(100), 2.0)
);

Pattern 3: Bulkhead with Everything
Limit concurrency before applying other policies:

let policy = Wrap::new(
    Bulkhead::new(20),
    Wrap::new(
        CircuitBreaker::new(10, Duration::from_secs(30)),
        Wrap::new(
            RetryPolicy::fixed(2, Duration::from_millis(50)),
            TimeoutPolicy::new(Duration::from_secs(3))
        )
    )
);

Best Practices for Policy Ordering

The order in which you wrap policies matters significantly:

  1. Bulkhead (Outermost): Limit concurrency first to protect resources
  2. Circuit Breaker: Fast-fail before attempting expensive operations
  3. Rate Limiter: Throttle before retrying
  4. Retry: Attempt multiple times for transient failures
  5. Timeout (Innermost): Apply time bounds to individual attempts
  6. Hedge: Use for read operations where duplicates are acceptable

flowchart TD
    Start([Request]) --> Order{Recommended<br/>Policy Order}
    
    Order --> L1[1️⃣ Bulkhead<br/>Control concurrency]
    L1 --> L2[2️⃣ Circuit Breaker<br/>Fast fail if open]
    L2 --> L3[3️⃣ Rate Limiter<br/>Throttle requests]
    L3 --> L4[4️⃣ Retry<br/>Handle transients]
    L4 --> L5[5️⃣ Timeout<br/>Bound attempts]
    L5 --> L6[6️⃣ Operation<br/>Your code]
    
    L6 --> Result([Result])
    
    style L1 fill:#E6F3FF
    style L2 fill:#FFE6E6
    style L3 fill:#F0E6FF
    style L4 fill:#FFF4E6
    style L5 fill:#E6FFE6
    style L6 fill:#E8F5E9
    style Result fill:#90EE90
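Putting the recommended order together, including a rate limiter, looks like this (a sketch; tune the thresholds for your workload):

use do_over::wrap::Wrap;
use do_over::bulkhead::Bulkhead;
use do_over::circuit_breaker::CircuitBreaker;
use do_over::rate_limit::RateLimiter;
use do_over::retry::RetryPolicy;
use do_over::timeout::TimeoutPolicy;
use std::time::Duration;

// bulkhead → circuit breaker → rate limiter → retry → timeout → operation
let policy = Wrap::new(
    Bulkhead::new(20),
    Wrap::new(
        CircuitBreaker::new(10, Duration::from_secs(30)),
        Wrap::new(
            RateLimiter::new(100, Duration::from_secs(1)),
            Wrap::new(
                RetryPolicy::fixed(2, Duration::from_millis(50)),
                TimeoutPolicy::new(Duration::from_secs(3)),
            ),
        ),
    ),
);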

Real-World Example

Complete example for a resilient HTTP client:

use do_over::wrap::Wrap;
use do_over::bulkhead::Bulkhead;
use do_over::circuit_breaker::CircuitBreaker;
use do_over::retry::RetryPolicy;
use do_over::timeout::TimeoutPolicy;
use do_over::error::DoOverError;
use std::time::Duration;

async fn fetch_user_data(user_id: u64) -> Result<String, DoOverError<reqwest::Error>> {
    // Build resilient policy stack
    let policy = Wrap::new(
        Bulkhead::new(100),  // Max 100 concurrent requests
        Wrap::new(
            CircuitBreaker::new(10, Duration::from_secs(60)),  // Open after 10 failures
            Wrap::new(
                RetryPolicy::exponential(3, Duration::from_millis(100), 2.0),  // 3 retries
                TimeoutPolicy::new(Duration::from_secs(10))  // 10s per attempt
            )
        )
    );
    
    // Execute HTTP request with full resilience
    policy.execute(|| async {
        let response = reqwest::get(format!("https://api.example.com/users/{}", user_id))
            .await
            .map_err(DoOverError::Inner)?;
        
        let body = response.text()
            .await
            .map_err(DoOverError::Inner)?;
        
        Ok(body)
    }).await
}

Executing Operations

All policies use the same execution pattern:

let result = policy.execute(|| async {
    // Your async operation
    Ok::<_, YourErrorType>(value)
}).await;

The operation must be a closure that returns a Future<Output = Result<T, E>>.
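
Note that the closure is Fn, not FnOnce: policies such as retry and hedge may call it several times, so it cannot move captured values out. Clone per attempt instead (send_request below is a hypothetical async function):

let request_body = String::from("payload");

let result = policy.execute(|| {
    // Clone per attempt so the closure stays `Fn` and can be re-invoked.
    let body = request_body.clone();
    async move { send_request(body).await }
}).await;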


Tower Integration

do-over integrates with Tower middleware for HTTP services:

use do_over::tower::PolicyLayer;
use tower::ServiceBuilder;

let service = ServiceBuilder::new()
    .layer(PolicyLayer::new(retry_policy))
    .service(your_service);

This allows you to apply resilience policies to Tower services, including:

  • Hyper HTTP services
  • Tonic gRPC services
  • Any service implementing tower::Service

Metrics

Observability

do-over provides hooks for metrics collection to monitor the health of your resilience policies.

Prometheus Integration

Enable Prometheus metrics:

do-over = { version = "0.1", features = ["metrics-prometheus"] }

OpenTelemetry Integration

Enable OpenTelemetry metrics:

do-over = { version = "0.1", features = ["metrics-otel"] }

Custom Metrics

Implement the Metrics trait to integrate with your observability system:

pub trait Metrics: Send + Sync {
    fn on_success(&self);
    fn on_failure(&self);
    fn on_retry(&self);
}
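
A minimal sketch of a custom collector backed by atomic counters; the do_over::metrics module path is an assumption here, and the collector would be attached with .with_metrics(...) as shown in the retry section:

use std::sync::atomic::{AtomicU64, Ordering};

// In-process counters implementing the Metrics trait.
#[derive(Default)]
struct Counters {
    successes: AtomicU64,
    failures: AtomicU64,
    retries: AtomicU64,
}

impl do_over::metrics::Metrics for Counters {
    fn on_success(&self) { self.successes.fetch_add(1, Ordering::Relaxed); }
    fn on_failure(&self) { self.failures.fetch_add(1, Ordering::Relaxed); }
    fn on_retry(&self) { self.retries.fetch_add(1, Ordering::Relaxed); }
}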

Philosophy

Unlike Polly, do-over:

  • Uses Result<T, E> instead of exceptions
  • Makes failures explicit
  • Avoids hidden background behavior

This makes do-over ideal for high-reliability systems.

Design Principles

  1. Explicit over Implicit: All errors are returned as Result types
  2. Async-First: Built on Tokio for native async/await support
  3. Zero Global State: All state is explicit and contained in policy instances
  4. Composable: Policies can be easily combined and nested
  5. Type-Safe: Leverages Rust's type system for correctness
  6. Observable: Metrics and instrumentation built into the design

Examples

The examples/ directory contains comprehensive demonstrations of each policy:

Example           Description                                     Run Command
retry             Fixed and exponential backoff strategies        cargo run --example retry
circuit_breaker   State transitions: Closed → Open → Half-Open    cargo run --example circuit_breaker
timeout           Time-bounded operations                         cargo run --example timeout
bulkhead          Concurrency limiting with queue timeout         cargo run --example bulkhead
rate_limiter      Token bucket rate limiting                      cargo run --example rate_limiter
hedge             Hedged requests for latency reduction           cargo run --example hedge
composition       Policy composition patterns with Wrap           cargo run --example composition
comprehensive     Real-world order processing system              cargo run --example comprehensive

Running Examples

# Run a specific example
cargo run --example retry

# Run all examples (build check)
cargo build --examples

Example Output

Each example produces visual output showing policy behavior:

=== Do-Over Retry Policy Example ===

📌 Scenario 1: Fixed Backoff (fails twice, succeeds on third)
   Configuration: max_retries=2, delay=100ms

   [+   0ms] Attempt 1: ❌ Simulated failure
   [+ 105ms] Attempt 2: ❌ Simulated failure
   [+ 209ms] Attempt 3: ✅ Success!

   Result: "Operation completed successfully"

API Reference

Core Types

Type             Description
Policy<E>        Trait implemented by all resilience policies
DoOverError<E>   Error type wrapping policy and application errors
Wrap<O, I>       Composes two policies together

Policy Constructors

// Retry
RetryPolicy::fixed(max_retries, delay)
RetryPolicy::exponential(max_retries, base_delay, factor)

// Circuit Breaker
CircuitBreaker::new(failure_threshold, reset_timeout)

// Timeout
TimeoutPolicy::new(duration)

// Bulkhead
Bulkhead::new(max_concurrent)
Bulkhead::new(max_concurrent).with_queue_timeout(duration)

// Rate Limiter
RateLimiter::new(capacity, interval)

// Hedge
Hedge::new(delay)

// Composition
Wrap::new(outer_policy, inner_policy)

License

Licensed under either of the Apache License 2.0 (LICENSE-APACHE) or the MIT license (LICENSE-MIT), at your option.
