Skip to content

Commit

Permalink
Added a circuit breaker layer
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jun 17, 2024
1 parent 89ce762 commit 1a29c81
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 1 deletion.
214 changes: 214 additions & 0 deletions quickwit/quickwit-common/src/tower/circuit_breaker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use coarsetime::Instant;
use pin_project::pin_project;
use tower::{Layer, Service};

/// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html).
///
/// It counts the errors emitted by the inner service, and if the number of errors exceeds a certain threshold
/// within a certain time window, it will "open" the circuit.
///
/// Requests will then be rejected for a given timeout.
/// After this timeout, the circuit breaker ends up in a HalfOpen state. It will allow a single request to pass through.
/// Depending on the result of this request, the circuit breaker will either close the circuit again or open it again.
///
/// Implementation detail:
///
/// The circuit breaker does not attempt to measure accurately the error rate.
/// Instead, it counts errors, and check for the time window in which these errors occurred.
/// This approach is accurate enough, robust, very easy to code and avoids calling the `Instant::now()`
/// at every error in the open state.
///
/// Similarly all of the data is stored in Atomics and using the Relaxed state.
/// The loose nature of circuit breaker does not require a stricted ordering.
#[derive(Debug, Clone, Copy)]
pub struct CircuitBreakerLayer {
pub max_error_count_per_time_window: u64,
pub time_window: Duration,
pub timeout: Duration,
}

impl CircuitBreakerLayer {
/// Creates a new `LoadShedLayer` allowing at most `max_in_flight_requests` in-flight requests
/// before rejecting new incoming requests.
pub fn with_max_error_per_secs(max_num_errors_per_secs: u64, timeout: Duration) -> CircuitBreakerLayer {
CircuitBreakerLayer {
max_error_count_per_time_window: max_num_errors_per_secs,
time_window: Duration::from_secs(1),
timeout,
}
}
}

pub trait MakeCircuitBreakerError {
fn make_circuit_break_error() -> Self;
}

impl<S> Layer<S> for CircuitBreakerLayer {
type Service = CircuitBreaker<S>;

fn layer(&self, service: S) -> CircuitBreaker<S> {
CircuitBreaker {
underlying: service,
circuit_breaker_inner: Arc::new(CircuitBreakerInner {
max_error_count_per_time_window: self.max_error_count_per_time_window,
time_window: coarsetime::Duration::from_millis(self.time_window.as_millis() as u64),
timeout: coarsetime::Duration::from_millis(self.timeout.as_millis() as u64),
closed_until_ticks: AtomicU64::new(0u64),
error_counter: AtomicU64::new(0u64),
error_window_end_ticks: AtomicU64::new(coarsetime::Instant::now().as_ticks()),
}),
}
}
}


struct CircuitBreakerInner {
max_error_count_per_time_window: u64,
time_window: coarsetime::Duration,
timeout: coarsetime::Duration,

closed_until_ticks: AtomicU64,

error_counter: AtomicU64,
error_window_end_ticks: AtomicU64,
}

impl CircuitBreakerInner {
fn get_state(&self) -> CircuitBreakerState {
let closed_until_ticks: u64 = self.closed_until_ticks.load(Ordering::Relaxed);
if closed_until_ticks == 0u64 {
return CircuitBreakerState::Open;
}
// This could be a call to `Instant::recent()` if we had an updating task.
let now = coarsetime::Instant::now();
if now.as_ticks() < closed_until_ticks {
CircuitBreakerState::HalfOpen
} else {
CircuitBreakerState::Closed
}
}

fn set_state_as_closed(&self) {
self.closed_until_ticks.store(0u64, Ordering::Relaxed)
}

fn set_state_as_open(&self) {
let now = Instant::now();
let closed_until_ticks = (now + coarsetime::Duration::from_millis(self.timeout.as_millis() as u64)).as_ticks();
self.closed_until_ticks.fetch_max(closed_until_ticks, Ordering::Relaxed);
}
}

#[derive(Clone)]
pub struct CircuitBreaker<S> {
underlying: S,
circuit_breaker_inner: Arc<CircuitBreakerInner>,
}


#[derive(Debug, Clone, Copy)]
enum CircuitBreakerState {
Open,
HalfOpen,
Closed,
}

impl<S, R> Service<R> for CircuitBreaker<S>
where
S: Service<R>,
S::Error: MakeCircuitBreakerError,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let state = self.circuit_breaker_inner.get_state();
match state {
CircuitBreakerState::Open | CircuitBreakerState::HalfOpen => {
self.underlying.poll_ready(cx)
}
CircuitBreakerState::Closed => {
Poll::Ready(Err(S::Error::make_circuit_break_error()))
}
}
}

fn call(&mut self, request: R) -> Self::Future {
self.underlying.call(request)
}
}


#[pin_project]
struct CircuitBreakerFuture<F> {
#[pin]
underlying_fut: F,
circuit_breaker_inner: Arc<CircuitBreakerInner>,
state: CircuitBreakerState,
}

impl<O, E, F> Future for CircuitBreakerFuture<F>
where
F: Future<Output = Result<O, E>>
{
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = self.state;
let circuit_breaker_inner = self.circuit_breaker_inner.clone();
let poll_res = self.project().underlying_fut.poll(cx);
match poll_res {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
if result.is_err() {
match state {
CircuitBreakerState::Closed => {
let error_count = circuit_breaker_inner.error_counter.fetch_add(1u64, Ordering::Relaxed);
if error_count >= circuit_breaker_inner.max_error_count_per_time_window {
let error_window_end_ticks = circuit_breaker_inner.error_window_end_ticks.load(Ordering::Relaxed);
let now = coarsetime::Instant::now();
if error_window_end_ticks >= now.as_ticks() {
// We exhausted our error budget before the end of the window.
// We "open" the circuit breaker!
circuit_breaker_inner.set_state_as_open();
} else {
// We exceeded the error count, but the window was not elapsed.
// Let's reset both the window end and the counter.
circuit_breaker_inner.error_counter.store(1, Ordering::Relaxed);
let error_window_end = (now + circuit_breaker_inner.time_window).as_ticks();
circuit_breaker_inner.error_window_end_ticks.fetch_max(error_window_end, Ordering::Relaxed);
circuit_breaker_inner.error_counter.store(0u64, Ordering::Relaxed);
}
}
},
CircuitBreakerState::HalfOpen => {
circuit_breaker_inner.set_state_as_closed();
},
CircuitBreakerState::Open => {
unreachable!();
},
}
} else {
match state {
CircuitBreakerState::Closed => {},
CircuitBreakerState::HalfOpen => {
circuit_breaker_inner.set_state_as_closed();
},
CircuitBreakerState::Open => {
unreachable!();
},
}
}
Poll::Ready(result)
}
}
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod box_layer;
mod box_service;
mod buffer;
mod change;
mod circuit_breaker;
mod delay;
mod estimate_rate;
mod event_listener;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub use rate_estimator::{RateEstimator, SmaRateEstimator};
pub use rate_limit::{RateLimit, RateLimitLayer};
pub use retry::{RetryLayer, RetryPolicy};
pub use transport::{make_channel, warmup_channel, BalanceChannel};
pub use circuit_breaker::{CircuitBreaker, CircuitBreakerLayer, MakeCircuitBreakerError};

pub type BoxError = Box<dyn error::Error + Send + Sync + 'static>;

Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use bytes::Bytes;
use bytesize::ByteSize;
use quickwit_common::tower::MakeLoadShedError;
use quickwit_common::tower::{MakeCircuitBreakerError, MakeLoadShedError};

use self::ingester::{PersistFailureReason, ReplicateFailureReason};
use self::router::IngestFailureReason;
Expand Down Expand Up @@ -50,6 +50,12 @@ pub enum IngestV2Error {
Unavailable(String),
}

impl MakeCircuitBreakerError for IngestV2Error {
fn make_circuit_break_error() -> IngestV2Error {
IngestV2Error::TooManyRequests
}
}

impl From<quickwit_common::tower::TaskCancelled> for IngestV2Error {
fn from(task_cancelled: quickwit_common::tower::TaskCancelled) -> IngestV2Error {
IngestV2Error::Internal(task_cancelled.to_string())
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ fn ingester_service_layer_stack(
layer_stack
.stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
.stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_persist_layer(quickwit_common::tower::CircuitBreakerLayer::with_max_error_per_secs(100, Duration::from_millis(500)))
.stack_open_replication_stream_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_init_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_retain_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
Expand Down

0 comments on commit 1a29c81

Please sign in to comment.