Skip to content

Commit

Permalink
stack: Make failfast::Gate general purpose (#2279)
Browse files Browse the repository at this point in the history
`failfast::Gate` is a stack middleware that supports changing its
readiness based on some external signal (the failfast state changing).
Nothing about the gate's behavior is specific to failfast, though; and
this type of gating is general useful on its own.

To setup further reuse, this change moves `stack::failfast::gate` to
`stack::gate`. The gate is now tested independently from failfast.
Failfast continues to maintain an (optional) `gate::Tx` so that it can
control a gate.
  • Loading branch information
olix0r committed Mar 1, 2023
1 parent a1a8b07 commit c63af32
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 168 deletions.
69 changes: 17 additions & 52 deletions linkerd/stack/src/failfast.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
//! A middleware that limits the amount of time the service may be not ready
//! before requests are failed.

mod gate;
#[cfg(test)]
mod test;

pub use self::gate::Gate;

use crate::layer;
use crate::{gate, layer};
use futures::{FutureExt, TryFuture};
use linkerd_error::Error;
use pin_project::pin_project;
use std::{
future::Future,
mem,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
};
use thiserror::Error;
use tokio::{
sync::Notify,
task,
time::{self, Duration, Instant, Sleep},
};
Expand All @@ -48,7 +40,7 @@ pub struct FailFast<S> {
timeout: Duration,
wait: Pin<Box<Sleep>>,
state: State<S>,
shared: Arc<Shared>,
gate: Option<gate::Tx>,
}

/// An error representing that an operation timed out.
Expand All @@ -65,12 +57,6 @@ enum State<S> {
Invalid,
}

#[derive(Debug)]
struct Shared {
notify: Notify,
in_failfast: AtomicBool,
}

#[pin_project(project = ResponseFutureProj)]
pub enum ResponseFuture<F> {
Inner(#[pin] F),
Expand All @@ -80,18 +66,9 @@ pub enum ResponseFuture<F> {
// === impl FailFast ===

impl<S> FailFast<S> {
/// Returns a layer for producing a `FailFast` without a paired [`Gate`].
/// Returns a layer for producing a `FailFast` without a paired [`gate::Gate`].
pub fn layer(timeout: Duration) -> impl layer::Layer<S, Service = Self> + Clone {
layer::mk(move |inner| {
Self::new(
timeout,
Arc::new(Shared {
notify: Notify::new(),
in_failfast: AtomicBool::new(false),
}),
inner,
)
})
layer::mk(move |inner| Self::new(timeout, None, inner))
}

/// Returns a layer for producing a `FailFast` pair wrapping an inner
Expand All @@ -104,30 +81,26 @@ impl<S> FailFast<S> {
pub fn layer_gated<L>(
timeout: Duration,
inner_layer: L,
) -> impl layer::Layer<S, Service = Gate<L::Service>> + Clone
) -> impl layer::Layer<S, Service = gate::Gate<L::Service>> + Clone
where
L: layer::Layer<Self> + Clone,
{
layer::mk(move |inner| {
let shared = Arc::new(Shared {
notify: Notify::new(),
in_failfast: AtomicBool::new(false),
});
let inner = Self::new(timeout, shared.clone(), inner);
let inner = inner_layer.layer(inner);
Gate::new(inner, shared)
let (tx, rx) = gate::channel();
let inner = inner_layer.layer(Self::new(timeout, Some(tx), inner));
gate::Gate::new(inner, rx)
})
}

fn new(timeout: Duration, shared: Arc<Shared>, inner: S) -> Self {
fn new(timeout: Duration, gate: Option<gate::Tx>, inner: S) -> Self {
Self {
timeout,
// The sleep is reset whenever the service becomes unavailable; this
// initial one will never actually be used, so it's okay to start it
// now.
wait: Box::pin(time::sleep(Duration::default())),
state: State::Open(inner),
shared,
gate,
}
}
}
Expand Down Expand Up @@ -195,15 +168,19 @@ where
}

warn!("Service entering failfast after {:?}", self.timeout);
self.shared.in_failfast.store(true, Ordering::Release);
if let Some(gate) = self.gate.as_ref() {
gate.shut();
}

let shared = self.shared.clone();
let gate = self.gate.clone();
self.state = State::FailFast(task::spawn(async move {
let res = inner.ready().await;
// Notify the paired `Gate` instances to begin
// advertising readiness so that the failfast
// service can advance.
shared.exit_failfast();
if let Some(gate) = gate {
gate.open();
}
match res {
Ok(_) => {
info!("Service has recovered");
Expand Down Expand Up @@ -285,15 +262,3 @@ where
}
}
}

// === impl Shared ===

impl Shared {
fn exit_failfast(&self) {
// The load part of this operation can be `Relaxed` because this task
// is the only place where the the value is ever set.
if self.in_failfast.swap(false, Ordering::Release) {
self.notify.notify_waiters();
}
}
}
107 changes: 0 additions & 107 deletions linkerd/stack/src/failfast/gate.rs

This file was deleted.

6 changes: 1 addition & 5 deletions linkerd/stack/src/failfast/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ async fn fails_fast() {

let max_unavailable = Duration::from_millis(100);
let (service, mut handle) = mock::pair::<(), ()>();
let shared = Arc::new(Shared {
notify: tokio::sync::Notify::new(),
in_failfast: AtomicBool::new(false),
});
let mut service = Spawn::new(FailFast::new(max_unavailable, shared, service));
let mut service = Spawn::new(FailFast::new(max_unavailable, None, service));

// The inner starts unavailable.
handle.allow(0);
Expand Down
Loading

0 comments on commit c63af32

Please sign in to comment.