From 68ff2df66e10e69111ea33ecec1bdc7b66c4e6f2 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 03:42:07 +0000 Subject: [PATCH 01/17] balance: Specialize the balancer for P2C As described in #286, `Balance` had a few problems: - it is responsible for driving all inner services to readiness, making its `poll_ready` O(n) and not O(1); - the `choose` abstraction was a hinderance. If a round-robin balancer is needed it can be implemented separately without much duplicate code; and - endpoint errors were considered fatal to the balancer. This changes replaces `Balance` with `P2CBalance` and removes the `choose` module. Endpoint service failures now cause the service to be removed from the balancer gracefully. Endpoint selection is now effectively constant time, though it biases for availability in the case when random selection does not yield an available endpoint. `tower-test` had to be updated so that a mocked service could fail after advertising readiness. --- tower-balance/Cargo.toml | 2 + tower-balance/examples/demo.rs | 23 +- tower-balance/src/error.rs | 11 +- tower-balance/src/layer.rs | 49 ++++ tower-balance/src/lib.rs | 481 +++++++++++++++------------------ tower-balance/src/pool.rs | 67 ++--- tower-balance/src/test.rs | 240 ++++++++-------- tower-test/src/mock/mod.rs | 8 +- 8 files changed, 445 insertions(+), 436 deletions(-) create mode 100644 tower-balance/src/layer.rs diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index ebde3fcb8..48952ac97 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -29,6 +29,7 @@ log = "0.4.1" rand = "0.6.5" tokio-timer = "0.2.4" tower-discover = "0.1.0" +tower-layer = "0.1.0" tower-load = { version = "0.1.0", path = "../tower-load" } tower-service = "0.2.0" tower-util = "0.1.0" @@ -43,3 +44,4 @@ tokio-executor = "0.1.2" tower = { version = "0.1", path = "../tower" } tower-buffer = { version = "0.1", path = "../tower-buffer" } tower-limit = { version = "0.1", path = "../tower-limit" } +tower-test = { version = "0.1.0", path = "../tower-test" } diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index d338abefd..8e1caa1e2 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -14,7 +14,7 @@ use tower::{ use tower_balance as lb; use tower_load as load; -const REQUESTS: usize = 50_000; +const REQUESTS: usize = 100_000; const CONCURRENCY: usize = 500; const DEFAULT_RTT: Duration = Duration::from_millis(30); static ENDPOINT_CAPACITY: usize = CONCURRENCY; @@ -55,24 +55,19 @@ fn main() { let fut = future::lazy(move || { let decay = Duration::from_secs(10); let d = gen_disco(); - let pe = lb::Balance::p2c(load::PeakEwmaDiscover::new( + let pe = lb::P2CBalance::new(load::PeakEwmaDiscover::new( d, DEFAULT_RTT, decay, load::NoInstrument, )); - run("P2C+PeakEWMA", pe) + run("P2C+PeakEWMA...", pe) }); let fut = fut.then(move |_| { let d = gen_disco(); - let ll = lb::Balance::p2c(load::PendingRequestsDiscover::new(d, load::NoInstrument)); - run("P2C+LeastLoaded", ll) - }); - - let fut = fut.and_then(move |_| { - let rr = lb::Balance::round_robin(gen_disco()); - run("RoundRobin", rr) + let ll = lb::P2CBalance::new(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + run("P2C+LeastLoaded...", ll) }); rt.spawn(fut); @@ -133,18 +128,20 @@ fn gen_disco() -> impl Discover< ) } -fn run(name: &'static str, lb: lb::Balance) -> impl Future +fn run(name: &'static str, lb: lb::P2CBalance) -> impl Future where D: Discover + Send + 'static, D::Error: Into, D::Key: Send, - D::Service: Service + Send, + D::Service: Service + load::Load + Send, >::Future: Send, - C: lb::Choose + Send + 'static, + ::Metric: std::fmt::Debug, { println!("{}", name); let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64); + fn check>(_: &S) {} + check(&lb); let service = ConcurrencyLimit::new(lb, CONCURRENCY); let responses = service.call_all(requests).unordered(); diff --git a/tower-balance/src/error.rs b/tower-balance/src/error.rs index 9fa7959af..2836d4268 100644 --- a/tower-balance/src/error.rs +++ b/tower-balance/src/error.rs @@ -1,17 +1,20 @@ +//! Error types + use std::fmt; pub(crate) type Error = Box; +/// An error returned when the balancer's endpoint discovery stream fails. #[derive(Debug)] -pub struct Balance(pub(crate) Error); +pub struct Discover(pub(crate) Error); -impl fmt::Display for Balance { +impl fmt::Display for Discover { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "load balancing discover error: {}", self.0) + write!(f, "load balancer discovery error: {}", self.0) } } -impl std::error::Error for Balance { +impl std::error::Error for Discover { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { Some(&*self.0) } diff --git a/tower-balance/src/layer.rs b/tower-balance/src/layer.rs new file mode 100644 index 000000000..8cc9e4da6 --- /dev/null +++ b/tower-balance/src/layer.rs @@ -0,0 +1,49 @@ +use crate::P2CBalance; +use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng}; +use std::{fmt, marker::PhantomData}; +use tower_discover::Discover; +use tower_layer::Layer; + +/// Efficiently distributes requests across an arbitrary number of services +#[derive(Clone)] +pub struct P2CBalanceLayer { + rng: SmallRng, + _marker: PhantomData, +} + +impl P2CBalanceLayer { + /// Builds a balancer using the system entropy. + pub fn new() -> Self { + Self { + rng: SmallRng::from_entropy(), + _marker: PhantomData, + } + } + + /// Builds a balancer from the provided RNG. + /// + /// This may be preferrable when many balancers are initialized. + pub fn from_rng(rng: &mut R) -> Result { + let rng = SmallRng::from_rng(rng)?; + Ok(Self { + rng, + _marker: PhantomData, + }) + } +} + +impl Layer for P2CBalanceLayer { + type Service = P2CBalance; + + fn layer(&self, discover: D) -> Self::Service { + P2CBalance::from_rng(discover, self.rng.clone()) + } +} + +impl fmt::Debug for P2CBalanceLayer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("P2CBalanceLayer") + .field("rng", &self.rng) + .finish() + } +} diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index ff2d8478e..fdd41015b 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -1,337 +1,286 @@ +//! A load balancing middleware. + #![doc(html_root_url = "https://docs.rs/tower-balance/0.1.0")] +#![deny(missing_docs)] #![deny(rust_2018_idioms)] #![allow(elided_lifetimes_in_paths)] -#[cfg(test)] -extern crate quickcheck; - -use futures::{Async, Poll}; -use indexmap::IndexMap; -use log::{debug, trace}; -use rand::{rngs::SmallRng, SeedableRng}; -use std::fmt; -use tower_discover::Discover; -use tower_load::Load; -use tower_service::Service; +#![deny(warnings)] -pub mod choose; pub mod error; -pub mod future; +//pub mod future; +mod layer; pub mod pool; +pub use layer::P2CBalanceLayer; + #[cfg(test)] mod test; -pub use self::{choose::Choose, pool::Pool}; - -use self::{error::Error, future::ResponseFuture}; +use futures::{future, try_ready, Async, Future, Poll}; +use indexmap::IndexMap; +use log::{debug, info, trace}; +use rand::{rngs::SmallRng, FromEntropy}; +use std::cmp; +use tower_discover::{Change, Discover}; +use tower_load::Load; +use tower_service::Service; -/// Balances requests across a set of inner services. +/// Distributes requests across inner services using the [Power of Two Choices][p2c]. +/// +/// As described in the [Finagle Guide][finagle]: +/// +/// > The algorithm randomly picks two services from the set of ready endpoints and +/// > selects the least loaded of the two. By repeatedly using this strategy, we can +/// > expect a manageable upper bound on the maximum load of any server. +/// > +/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where +/// > `n` is the number of servers in the cluster. +/// +/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded +/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf #[derive(Debug)] -pub struct Balance { - /// Provides endpoints from service discovery. - discover: D, - - /// Determines which endpoint is ready to be used next. - choose: C, - - /// Holds an index into `ready`, indicating the service that has been chosen to - /// dispatch the next request. - chosen_ready_index: Option, +pub struct P2CBalance { + // XXX Pool requires direct access to this... Not ideal. + pub(crate) discover: D, - /// Holds an index into `ready`, indicating the service that dispatched the last - /// request. - dispatched_ready_index: Option, + endpoints: IndexMap, - /// Holds all possibly-available endpoints (i.e. from `discover`). - ready: IndexMap, + /// Holds an index into `endpoints`, indicating the service that has been + /// chosen to dispatch the next request. + ready_index: Option, - /// Newly-added endpoints that have not yet become ready. - not_ready: IndexMap, + rng: SmallRng, } -// ===== impl Balance ===== +// ===== impl P2CBalance ===== -impl Balance -where - D: Discover, - D::Service: Load, - ::Metric: PartialOrd + fmt::Debug, -{ - /// Chooses services using the [Power of Two Choices][p2c]. - /// - /// This configuration is prefered when a load metric is known. - /// - /// As described in the [Finagle Guide][finagle]: - /// - /// > The algorithm randomly picks two services from the set of ready endpoints and - /// > selects the least loaded of the two. By repeatedly using this strategy, we can - /// > expect a manageable upper bound on the maximum load of any server. - /// > - /// > The maximum load variance between any two servers is bound by `ln(ln(n))` where - /// > `n` is the number of servers in the cluster. - /// - /// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded - /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf - pub fn p2c(discover: D) -> Self { - Self::new(discover, choose::PowerOfTwoChoices::default()) +impl P2CBalance { + /// Initializes a P2C load balancer from the OS's entropy source. + pub fn new(discover: D) -> Self { + Self::from_rng(discover, SmallRng::from_entropy()) } /// Initializes a P2C load balancer from the provided randomization source. - /// - /// This may be preferable when an application instantiates many balancers. - pub fn p2c_with_rng(discover: D, rng: &mut R) -> Result { - let rng = SmallRng::from_rng(rng)?; - Ok(Self::new(discover, choose::PowerOfTwoChoices::new(rng))) - } -} - -impl Balance { - /// Attempts to choose services sequentially. - /// - /// This configuration is prefered when no load metric is known. - pub fn round_robin(discover: D) -> Self { - Self::new(discover, choose::RoundRobin::default()) - } -} - -impl Balance -where - D: Discover, - C: Choose, -{ - /// Creates a new balancer. - pub fn new(discover: D, choose: C) -> Self { + pub fn from_rng(discover: D, rng: SmallRng) -> Self { Self { + rng, discover, - choose, - chosen_ready_index: None, - dispatched_ready_index: None, - ready: IndexMap::default(), - not_ready: IndexMap::default(), + ready_index: None, + endpoints: IndexMap::default(), } } - /// Returns true iff there are ready services. - /// - /// This is not authoritative and is only useful after `poll_ready` has been called. - pub fn is_ready(&self) -> bool { - !self.ready.is_empty() - } - - /// Returns true iff there are no ready services. - /// - /// This is not authoritative and is only useful after `poll_ready` has been called. - pub fn is_not_ready(&self) -> bool { - self.ready.is_empty() - } - - /// Counts the number of services considered to be ready. - /// - /// This is not authoritative and is only useful after `poll_ready` has been called. - pub fn num_ready(&self) -> usize { - self.ready.len() + /// Returns the number of endpoints currently tracked by the balancer. + pub fn len(&self) -> usize { + self.endpoints.len() } - /// Counts the number of services not considered to be ready. - /// - /// This is not authoritative and is only useful after `poll_ready` has been called. - pub fn num_not_ready(&self) -> usize { - self.not_ready.len() - } -} - -impl Balance -where - D: Discover, - D::Error: Into, - C: Choose, -{ /// Polls `discover` for updates, adding new items to `not_ready`. /// /// Removals may alter the order of either `ready` or `not_ready`. - fn update_from_discover(&mut self) -> Result<(), error::Balance> { + fn poll_discover(&mut self) -> Poll<(), error::Discover> + where + D::Error: Into, + { debug!("updating from discover"); - use tower_discover::Change::*; - - while let Async::Ready(change) = - self.discover.poll().map_err(|e| error::Balance(e.into()))? - { - match change { - Insert(key, svc) => { - // If the `Insert`ed service is a duplicate of a service already - // in the ready list, remove the ready service first. The new - // service will then be inserted into the not-ready list. - self.ready.remove(&key); - self.not_ready.insert(key, svc); - } - - Remove(key) => { - let _ejected = match self.ready.remove(&key) { - None => self.not_ready.remove(&key), - Some(s) => Some(s), - }; - // XXX is it safe to just drop the Service? Or do we need some sort of - // graceful teardown? - // TODO: poll_close + loop { + match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { + Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), + Change::Remove(rm_key) => { + // Update the ready index to account for reordering of endpoints. + let orig_sz = self.endpoints.len(); + if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { + self.ready_index = match self.ready_index { + Some(i) => Self::repair_index(i, rm_idx, orig_sz), + None => None, + }; + } } } } - - Ok(()) } - /// Calls `poll_ready` on all services in `not_ready`. - /// - /// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted - /// into `ready`, potentially altering the order of `ready` and/or `not_ready`. - fn promote_to_ready(&mut self) -> Result<(), >::Error> - where - D::Service: Service, - { - let n = self.not_ready.len(); - if n == 0 { - trace!("promoting to ready: not_ready is empty, skipping."); - return Ok(()); - } - - debug!("promoting to ready: {}", n); - // Iterate through the not-ready endpoints from right to left to prevent removals - // from reordering services in a way that could prevent a service from being polled. - for idx in (0..n).rev() { - let is_ready = { - let (_, svc) = self - .not_ready - .get_index_mut(idx) - .expect("invalid not_ready index");; - svc.poll_ready()?.is_ready() - }; - trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready); - if is_ready { - debug!("not_ready[{:?}]: promoting to ready", idx); - let (key, svc) = self - .not_ready - .swap_remove_index(idx) - .expect("invalid not_ready index"); - self.ready.insert(key, svc); - } else { - debug!("not_ready[{:?}]: not promoting to ready", idx); - } - } - - debug!("promoting to ready: done"); - - Ok(()) + // Returns the updated index of `orig_idx` after the entry at `rm_idx` was + // swap-removed from an IndexMap with `orig_sz` items. + // + // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that + // index cannot be repaired. + fn repair_index(orig_idx: usize, rm_idx: usize, orig_sz: usize) -> Option { + debug_assert!(orig_sz > orig_idx && orig_sz > rm_idx); + let repaired = match orig_idx { + i if i == rm_idx => None, // removed + i if i == orig_sz - 1 => Some(rm_idx), // swapped + i => Some(i), // uneffected + }; + trace!( + "repair_index: orig={}; rm={}; sz={}; => {:?}", + orig_idx, + rm_idx, + orig_sz, + repaired, + ); + repaired } - /// Polls a `ready` service or moves it to `not_ready`. + /// Performs P2C on inner services to find a suitable endpoint. /// - /// If the service exists in `ready` and does not poll as ready, it is moved to - /// `not_ready`, potentially altering the order of `ready` and/or `not_ready`. - fn poll_ready_index( - &mut self, - idx: usize, - ) -> Option>::Error>> + /// When this function returns Ready, `self.ready_index` is set with the + /// value of a suitable (ready endpoint). When + fn poll_ready_index(&mut self) -> Poll where - D::Service: Service, + D: Discover, + Svc: Service + Load, + Svc::Error: Into, + Svc::Metric: std::fmt::Debug, { - match self.ready.get_index_mut(idx) { - None => return None, - Some((_, svc)) => match svc.poll_ready() { - Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))), - Err(e) => return Some(Err(e)), - Ok(Async::NotReady) => {} - }, + match self.endpoints.len() { + 0 => Ok(Async::NotReady), + 1 => { + // If there's only one endpoint, ignore its but require that it + // is ready. + match self.poll_endpoint_index_load(0) { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + self.ready_index = Some(0); + Ok(Async::Ready(0)) + } + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(0); + Ok(Async::NotReady) + } + } + } + len => { + // Get two distinct random indexes (in a random order). Poll the + // service at each index. + // + // If either fails, the service is removed from the set of + // endpoints. + let idxs = rand::seq::index::sample(&mut self.rng, len, 2); + + let aidx = idxs.index(0); + let bidx = idxs.index(1); + + let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { + Ok(ready) => (ready, bidx), + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(aidx); + let new_bidx = Self::repair_index(bidx, aidx, len) + .expect("random indices must be distinct"); + (Async::NotReady, new_bidx) + } + }; + + let (bload, aidx) = match self.poll_endpoint_index_load(bidx) { + Ok(ready) => (ready, aidx), + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(bidx); + let new_aidx = Self::repair_index(aidx, bidx, len) + .expect("random indices must be distinct"); + (Async::NotReady, new_aidx) + } + }; + + trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload); + + let ready = match (aload, bload) { + (Async::Ready(aload), Async::Ready(bload)) => { + if aload <= bload { + Async::Ready(aidx) + } else { + Async::Ready(bidx) + } + } + (Async::Ready(_), Async::NotReady) => Async::Ready(aidx), + (Async::NotReady, Async::Ready(_)) => Async::Ready(bidx), + (Async::NotReady, Async::NotReady) => Async::NotReady, + }; + trace!(" -> ready={:?}", ready); + Ok(ready) + } } - - let (key, svc) = self - .ready - .swap_remove_index(idx) - .expect("invalid ready index"); - self.not_ready.insert(key, svc); - Some(Ok(Async::NotReady)) } - /// Chooses the next service to which a request will be dispatched. - /// - /// Ensures that . - fn choose_and_poll_ready( + /// Accesses an endpoint by index and, if it is ready, returns its current load. + fn poll_endpoint_index_load( &mut self, - ) -> Poll<(), >::Error> + index: usize, + ) -> Poll where - D::Service: Service, + D: Discover, + Svc: Service + Load, + Svc::Error: Into, { - loop { - let n = self.ready.len(); - debug!("choosing from {} replicas", n); - let idx = match n { - 0 => return Ok(Async::NotReady), - 1 => 0, - _ => { - let replicas = choose::replicas(&self.ready).expect("too few replicas"); - self.choose.choose(replicas) - } - }; - - // XXX Should we handle per-endpoint errors? - if self - .poll_ready_index(idx) - .expect("invalid ready index")? - .is_ready() - { - self.chosen_ready_index = Some(idx); - return Ok(Async::Ready(())); - } - } + let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index"); + try_ready!(svc.poll_ready()); + Ok(Async::Ready(svc.load())) } } -impl Service for Balance +impl Service for P2CBalance where D: Discover, - D::Error: Into, - Svc: Service, - Svc::Error: Into, - C: Choose, + D::Error: Into, + Svc: Service + Load, + Svc::Error: Into, + Svc::Metric: std::fmt::Debug, { - type Response = Svc::Response; - type Error = Error; - type Future = ResponseFuture; + type Response = >::Response; + type Error = error::Error; + type Future = + future::MapErr<>::Future, fn(Svc::Error) -> error::Error>; /// Prepares the balancer to process a request. /// - /// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index + /// When `Async::Ready` is returned, `chosen` is set with a valid index /// into `ready` referring to a `Service` that is ready to disptach a request. fn poll_ready(&mut self) -> Poll<(), Self::Error> { - // Clear before `ready` is altered. - self.chosen_ready_index = None; + // First and foremost, process discovery updates. This removes or updates a + // previously-selected `ready_index` if appropriate. + self.poll_discover()?; + + if let Some(index) = self.ready_index { + debug_assert!(!self.endpoints.is_empty()); + // Ensure the selected endpoint is still ready. + match self.poll_endpoint_index_load(index) { + Ok(Async::Ready(_)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {} + Err(e) => { + drop(self.endpoints.swap_remove_index(index)); + info!("evicting failed endpoint: {}", e.into()); + } + } - // Before `ready` is altered, check the readiness of the last-used service, moving it - // to `not_ready` if appropriate. - if let Some(idx) = self.dispatched_ready_index.take() { - // XXX Should we handle per-endpoint errors? - self.poll_ready_index(idx) - .expect("invalid dispatched ready key") - .map_err(Into::into)?; + self.ready_index = None; } - // Update `not_ready` and `ready`. - self.update_from_discover()?; - self.promote_to_ready().map_err(Into::into)?; + let tries = match self.endpoints.len() { + 0 => return Ok(Async::NotReady), + n => cmp::max(1, n / 2), + }; + for _ in 0..tries { + if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? { + trace!("ready: {:?}", idx); + self.ready_index = Some(idx); + return Ok(Async::Ready(())); + } + } - // Choose the next service to be used by `call`. - self.choose_and_poll_ready().map_err(Into::into) + trace!("exhausted {} attempts", tries); + Ok(Async::NotReady) } fn call(&mut self, request: Request) -> Self::Future { - let idx = self.chosen_ready_index.take().expect("not ready"); + let index = self.ready_index.take().expect("not ready"); let (_, svc) = self - .ready - .get_index_mut(idx) - .expect("invalid chosen ready index"); - self.dispatched_ready_index = Some(idx); + .endpoints + .get_index_mut(index) + .expect("invalid ready index"); - let rsp = svc.call(request); - ResponseFuture::new(rsp) + svc.call(request).map_err(Into::into) } } diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool.rs index c86f81125..a69c273f9 100644 --- a/tower-balance/src/pool.rs +++ b/tower-balance/src/pool.rs @@ -14,13 +14,14 @@ //! added or removed. #![deny(missing_docs)] -use super::{Balance, Choose}; +use super::P2CBalance; use futures::{try_ready, Async, Future, Poll}; use tower_discover::{Change, Discover}; +use tower_load::Load; use tower_service::Service; use tower_util::MakeService; -enum Load { +enum Level { /// Load is low -- remove a service instance. Low, /// Load is normal -- keep the service set as it is. @@ -38,7 +39,7 @@ where maker: MS, making: Option, target: Target, - load: Load, + load: Level, services: usize, } @@ -59,7 +60,7 @@ where self.making = Some(self.maker.make_service(self.target.clone())); } - if let Load::High = self.load { + if let Level::High = self.load { if self.making.is_none() { try_ready!(self.maker.poll_ready()); // TODO: it'd be great if we could avoid the clone here and use, say, &Target @@ -70,7 +71,7 @@ where if let Some(mut fut) = self.making.take() { if let Async::Ready(s) = fut.poll()? { self.services += 1; - self.load = Load::Normal; + self.load = Level::Normal; return Ok(Async::Ready(Change::Insert(self.services, s))); } else { self.making = Some(fut); @@ -79,13 +80,13 @@ where } match self.load { - Load::High => { + Level::High => { unreachable!("found high load but no Service being made"); } - Load::Normal => Ok(Async::NotReady), - Load::Low if self.services == 1 => Ok(Async::NotReady), - Load::Low => { - self.load = Load::Normal; + Level::Normal => Ok(Async::NotReady), + Level::Low if self.services == 1 => Ok(Async::NotReady), + Level::Low => { + self.load = Level::Normal; let rm = self.services; self.services -= 1; Ok(Async::Ready(Change::Remove(rm))) @@ -177,29 +178,29 @@ impl Builder { } /// See [`Pool::new`]. - pub fn build( + pub fn build( &self, make_service: MS, target: Target, - choose: C, - ) -> Pool + ) -> Pool where MS: MakeService, + MS::Service: Load, + ::Metric: std::fmt::Debug, MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, - C: Choose, { let d = PoolDiscoverer { maker: make_service, making: None, target, - load: Load::Normal, + load: Level::Normal, services: 0, }; Pool { - balance: Balance::new(d, choose), + balance: P2CBalance::new(d), options: *self, ewma: self.init, } @@ -207,25 +208,26 @@ impl Builder { } /// A dynamically sized, load-balanced pool of `Service` instances. -pub struct Pool +pub struct Pool where MS: MakeService, MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, { - balance: Balance, C>, + balance: P2CBalance>, options: Builder, ewma: f64, } -impl Pool +impl Pool where MS: MakeService, + MS::Service: Load, + ::Metric: std::fmt::Debug, MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, - C: Choose, { /// Construct a new dynamically sized `Pool`. /// @@ -233,22 +235,23 @@ where /// `Service` that is then added to the load-balanced pool. If multiple services are available, /// `choose` is used to determine which one to use (just as in `Balance`). If many calls to /// `poll_ready` succeed, the most recently added `Service` is dropped from the pool. - pub fn new(make_service: MS, target: Target, choose: C) -> Self { - Builder::new().build(make_service, target, choose) + pub fn new(make_service: MS, target: Target) -> Self { + Builder::new().build(make_service, target) } } -impl Service for Pool +impl Service for Pool where MS: MakeService, + MS::Service: Load, + ::Metric: std::fmt::Debug, MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, - C: Choose, { - type Response = , C> as Service>::Response; - type Error = , C> as Service>::Error; - type Future = , C> as Service>::Future; + type Response = > as Service>::Response; + type Error = > as Service>::Error; + type Future = > as Service>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if let Async::Ready(()) = self.balance.poll_ready()? { @@ -257,14 +260,14 @@ where self.ewma = (1.0 - self.options.alpha) * self.ewma; if self.ewma < self.options.low { - self.balance.discover.load = Load::Low; + self.balance.discover.load = Level::Low; if self.balance.discover.services > 1 { // reset EWMA so we don't immediately try to remove another service self.ewma = self.options.init; } } else { - self.balance.discover.load = Load::Normal; + self.balance.discover.load = Level::Normal; } Ok(Async::Ready(())) @@ -274,12 +277,12 @@ where self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma; if self.ewma > self.options.high { - self.balance.discover.load = Load::High; + self.balance.discover.load = Level::High; // don't reset the EWMA -- in theory, poll_ready should now start returning // `Ready`, so we won't try to launch another service immediately. } else { - self.balance.discover.load = Load::Normal; + self.balance.discover.load = Level::Normal; } Ok(Async::NotReady) @@ -290,6 +293,6 @@ where } fn call(&mut self, req: Request) -> Self::Future { - Service::call(&mut self.balance, req) + self.balance.call(req) } } diff --git a/tower-balance/src/test.rs b/tower-balance/src/test.rs index 37fb5b761..a180eedc7 100644 --- a/tower-balance/src/test.rs +++ b/tower-balance/src/test.rs @@ -1,136 +1,142 @@ -use futures::{future, Async, Poll}; -use quickcheck::*; -use std::collections::VecDeque; -use tower_discover::Change; +use futures::{Async, Future}; +use tower_discover::ServiceList; +use tower_load as load; use tower_service::Service; +use tower_test::mock; use crate::*; -type Error = Box; +//type Error = Box; -struct ReluctantDisco(VecDeque>); - -struct ReluctantService { - polls_until_ready: usize, +macro_rules! assert_ready { + ($svc:expr) => {{ + assert_ready!($svc, "must be ready"); + }}; + ($svc:expr, $msg:expr) => {{ + assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg); + }}; } -impl Discover for ReluctantDisco { - type Key = usize; - type Service = ReluctantService; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let r = self - .0 - .pop_front() - .map(Async::Ready) - .unwrap_or(Async::NotReady); - debug!("polling disco: {:?}", r.is_ready()); - Ok(r) - } +macro_rules! assert_not_ready { + ($svc:expr) => {{ + assert_not_ready!($svc, "must not be ready"); + }}; + ($svc:expr, $msg:expr) => {{ + assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg); + }}; } -impl Service<()> for ReluctantService { - type Response = (); - type Error = Error; - type Future = future::FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.polls_until_ready == 0 { - return Ok(Async::Ready(())); - } - - self.polls_until_ready -= 1; - return Ok(Async::NotReady); - } - - fn call(&mut self, _: ()) -> Self::Future { - future::ok(()) - } +#[test] +fn empty() { + let empty: Vec, usize>> = vec![]; + let disco = ServiceList::new(empty); + let mut svc = P2CBalance::new(disco); + assert_not_ready!(svc); } -quickcheck! { - /// Creates a random number of services, each of which must be polled a random - /// number of times before becoming ready. As the balancer is polled, ensure that - /// it does not become ready prematurely and that services are promoted from - /// not_ready to ready. - fn poll_ready(service_tries: Vec) -> TestResult { - // Stores the number of pending services after each poll_ready call. - let mut pending_at = Vec::new(); - - let disco = { - let mut changes = VecDeque::new(); - for (i, n) in service_tries.iter().map(|n| *n).enumerate() { - for j in 0..n { - if j == pending_at.len() { - pending_at.push(1); - } else { - pending_at[j] += 1; - } - } - - let s = ReluctantService { polls_until_ready: n }; - changes.push_back(Change::Insert(i, s)); - } - ReluctantDisco(changes) - }; - pending_at.push(0); - - let mut balancer = Balance::new(disco, choose::RoundRobin::default()); - - let services = service_tries.len(); - let mut next_pos = 0; - for pending in pending_at.iter().map(|p| *p) { - assert!(pending <= services); - let ready = services - pending; - - match balancer.poll_ready() { - Err(_) => return TestResult::error("poll_ready failed"), - Ok(p) => { - if p.is_ready() != (ready > 0) { - return TestResult::failed(); - } - } - } - - if balancer.num_ready() != ready { - return TestResult::failed(); - } - - if balancer.num_not_ready() != pending { - return TestResult::failed(); - } +#[test] +fn single_endpoint() { + let (mock, mut handle) = mock::pair(); + let mock = load::Constant::new(mock, 0); + + let disco = ServiceList::new(vec![mock].into_iter()); + let mut svc = P2CBalance::new(disco); + + with_task(|| { + handle.allow(0); + assert_not_ready!(svc); + assert_eq!( + svc.endpoints.len(), + 1, + "balancer must have discovered endpoint" + ); + + handle.allow(1); + assert_ready!(svc); + + let fut = svc.call(()); + + let ((), rsp) = handle.next_request().unwrap(); + rsp.send_response(1); + + assert_eq!(fut.wait().expect("call must complete"), 1); + handle.allow(1); + assert_ready!(svc); + + handle.send_error("endpoint lost"); + assert_not_ready!(svc); + assert!( + svc.endpoints.is_empty(), + "balancer must drop failed endpoints" + ); + }); +} - if balancer.is_ready() != (ready > 0) { - return TestResult::failed(); - } - if balancer.is_not_ready() != (ready == 0) { - return TestResult::failed(); - } +#[test] +fn two_endpoints_with_equal_weight() { + let (mock_a, mut handle_a) = mock::pair(); + let (mock_b, mut handle_b) = mock::pair(); + let mock_a = load::Constant::new(mock_a, 1); + let mock_b = load::Constant::new(mock_b, 1); + + let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); + let mut svc = P2CBalance::new(disco); + + with_task(|| { + handle_a.allow(0); + handle_b.allow(0); + assert_not_ready!(svc); + assert_eq!( + svc.endpoints.len(), + 2, + "balancer must have discovered both endpoints" + ); + + handle_a.allow(1); + handle_b.allow(0); + assert_ready!(svc, "must be ready when one of two services is ready"); + { + let fut = svc.call(()); + let ((), rsp) = handle_a.next_request().unwrap(); + rsp.send_response("a"); + assert_eq!(fut.wait().expect("call must complete"), "a"); + } - if balancer.dispatched_ready_index.is_some() { - return TestResult::failed(); - } + handle_a.allow(0); + handle_b.allow(1); + assert_ready!(svc, "must be ready when both endpoints are ready"); + { + let fut = svc.call(()); + let ((), rsp) = handle_b.next_request().unwrap(); + rsp.send_response("b"); + assert_eq!(fut.wait().expect("call must complete"), "b"); + } - if ready == 0 { - if balancer.chosen_ready_index.is_some() { - return TestResult::failed(); - } - } else { - // Check that the round-robin chooser is doing its thing: - match balancer.chosen_ready_index { - None => return TestResult::failed(), - Some(idx) => { - if idx != next_pos { - return TestResult::failed(); - } - } + handle_a.allow(1); + handle_b.allow(1); + assert_ready!(svc, "must be ready when both endpoints are ready"); + { + let fut = svc.call(()); + for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] { + if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() { + tx.send_response(c); } - - next_pos = (next_pos + 1) % ready; } + fut.wait().expect("call must complete"); } - TestResult::passed() - } + handle_a.send_error("endpoint lost"); + handle_b.allow(1); + assert_ready!(svc, "must be ready after one endpoint is removed"); + assert_eq!( + svc.endpoints.len(), + 1, + "balancer must drop failed endpoints", + ); + }); +} + +fn with_task U, U>(f: F) -> U { + use futures::future::lazy; + lazy(|| Ok::<_, ()>(f())).wait().unwrap() } diff --git a/tower-test/src/mock/mod.rs b/tower-test/src/mock/mod.rs index e3e398896..97f6d7aef 100644 --- a/tower-test/src/mock/mod.rs +++ b/tower-test/src/mock/mod.rs @@ -93,14 +93,14 @@ impl Service for Mock { return Err(error::Closed::new().into()); } - if self.can_send { - return Ok(().into()); - } - if let Some(e) = state.err_with.take() { return Err(e); } + if self.can_send { + return Ok(().into()); + } + if state.rem > 0 { assert!(!state.tasks.contains_key(&self.id)); From cf5cc8cba78cd9394596ee21f76283c5e72bddb2 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 04:08:36 +0000 Subject: [PATCH 02/17] remove unneeded future impl --- tower-balance/src/future.rs | 23 ----------------------- tower-balance/src/lib.rs | 1 - 2 files changed, 24 deletions(-) delete mode 100644 tower-balance/src/future.rs diff --git a/tower-balance/src/future.rs b/tower-balance/src/future.rs deleted file mode 100644 index 58a76d082..000000000 --- a/tower-balance/src/future.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::error::Error; -use futures::{Future, Poll}; - -pub struct ResponseFuture(F); - -impl ResponseFuture { - pub(crate) fn new(future: F) -> ResponseFuture { - ResponseFuture(future) - } -} - -impl Future for ResponseFuture -where - F: Future, - F::Error: Into, -{ - type Item = F::Item; - type Error = Error; - - fn poll(&mut self) -> Poll { - self.0.poll().map_err(Into::into) - } -} diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index fdd41015b..810c12629 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -7,7 +7,6 @@ #![deny(warnings)] pub mod error; -//pub mod future; mod layer; pub mod pool; From 499ef37c425e920a9014900837a431f210f94d45 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 04:31:14 +0000 Subject: [PATCH 03/17] constructor naming... --- tower-balance/examples/demo.rs | 4 ++-- tower-balance/src/layer.rs | 2 +- tower-balance/src/lib.rs | 14 +++++++------- tower-balance/src/pool.rs | 2 +- tower-balance/src/test.rs | 6 +++--- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index 8e1caa1e2..e3770d7c7 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -55,7 +55,7 @@ fn main() { let fut = future::lazy(move || { let decay = Duration::from_secs(10); let d = gen_disco(); - let pe = lb::P2CBalance::new(load::PeakEwmaDiscover::new( + let pe = lb::P2CBalance::from_entropy(load::PeakEwmaDiscover::new( d, DEFAULT_RTT, decay, @@ -66,7 +66,7 @@ fn main() { let fut = fut.then(move |_| { let d = gen_disco(); - let ll = lb::P2CBalance::new(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + let ll = lb::P2CBalance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); run("P2C+LeastLoaded...", ll) }); diff --git a/tower-balance/src/layer.rs b/tower-balance/src/layer.rs index 8cc9e4da6..b19e45e83 100644 --- a/tower-balance/src/layer.rs +++ b/tower-balance/src/layer.rs @@ -36,7 +36,7 @@ impl Layer for P2CBalanceLayer { type Service = P2CBalance; fn layer(&self, discover: D) -> Self::Service { - P2CBalance::from_rng(discover, self.rng.clone()) + P2CBalance::new(discover, self.rng.clone()) } } diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index 810c12629..ae27d55d4 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -54,21 +54,21 @@ pub struct P2CBalance { // ===== impl P2CBalance ===== impl P2CBalance { - /// Initializes a P2C load balancer from the OS's entropy source. - pub fn new(discover: D) -> Self { - Self::from_rng(discover, SmallRng::from_entropy()) - } - /// Initializes a P2C load balancer from the provided randomization source. - pub fn from_rng(discover: D, rng: SmallRng) -> Self { + pub fn new(discover: D, rng: SmallRng) -> Self { Self { rng, discover, - ready_index: None, endpoints: IndexMap::default(), + ready_index: None, } } + /// Initializes a P2C load balancer from the OS's entropy source. + pub fn from_entropy(discover: D) -> Self { + Self::new(discover, SmallRng::from_entropy()) + } + /// Returns the number of endpoints currently tracked by the balancer. pub fn len(&self) -> usize { self.endpoints.len() diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool.rs index a69c273f9..0756a1bce 100644 --- a/tower-balance/src/pool.rs +++ b/tower-balance/src/pool.rs @@ -200,7 +200,7 @@ impl Builder { }; Pool { - balance: P2CBalance::new(d), + balance: P2CBalance::from_entropy(d), options: *self, ewma: self.init, } diff --git a/tower-balance/src/test.rs b/tower-balance/src/test.rs index a180eedc7..73d2ca737 100644 --- a/tower-balance/src/test.rs +++ b/tower-balance/src/test.rs @@ -30,7 +30,7 @@ macro_rules! assert_not_ready { fn empty() { let empty: Vec, usize>> = vec![]; let disco = ServiceList::new(empty); - let mut svc = P2CBalance::new(disco); + let mut svc = P2CBalance::from_entropy(disco); assert_not_ready!(svc); } @@ -40,7 +40,7 @@ fn single_endpoint() { let mock = load::Constant::new(mock, 0); let disco = ServiceList::new(vec![mock].into_iter()); - let mut svc = P2CBalance::new(disco); + let mut svc = P2CBalance::from_entropy(disco); with_task(|| { handle.allow(0); @@ -80,7 +80,7 @@ fn two_endpoints_with_equal_weight() { let mock_b = load::Constant::new(mock_b, 1); let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); - let mut svc = P2CBalance::new(disco); + let mut svc = P2CBalance::from_entropy(disco); with_task(|| { handle_a.allow(0); From de7813461ae80c3e00daaea3f4eab4a1cd506976 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 13:57:11 +0000 Subject: [PATCH 04/17] fmt --- tower-balance/examples/demo.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index e3770d7c7..dc76a17cd 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -66,7 +66,8 @@ fn main() { let fut = fut.then(move |_| { let d = gen_disco(); - let ll = lb::P2CBalance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + let ll = + lb::P2CBalance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); run("P2C+LeastLoaded...", ll) }); From 538e7378432925698c6ff78c0f68d6092e3a76a3 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 15:53:22 +0000 Subject: [PATCH 05/17] Introduce MakeP2CBalance The balance layer now operates over MakeP2CBalance, which composes over layers that produce Discover instances. --- tower-balance/src/layer.rs | 11 +++---- tower-balance/src/lib.rs | 2 ++ tower-balance/src/make.rs | 64 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) create mode 100644 tower-balance/src/make.rs diff --git a/tower-balance/src/layer.rs b/tower-balance/src/layer.rs index b19e45e83..a7c953826 100644 --- a/tower-balance/src/layer.rs +++ b/tower-balance/src/layer.rs @@ -1,7 +1,6 @@ -use crate::P2CBalance; +use crate::MakeP2CBalance; use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng}; use std::{fmt, marker::PhantomData}; -use tower_discover::Discover; use tower_layer::Layer; /// Efficiently distributes requests across an arbitrary number of services @@ -32,11 +31,11 @@ impl P2CBalanceLayer { } } -impl Layer for P2CBalanceLayer { - type Service = P2CBalance; +impl Layer for P2CBalanceLayer { + type Service = MakeP2CBalance; - fn layer(&self, discover: D) -> Self::Service { - P2CBalance::new(discover, self.rng.clone()) + fn layer(&self, make_discover: S) -> Self::Service { + MakeP2CBalance::new(make_discover, self.rng.clone()) } } diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index ae27d55d4..c86cac7b1 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -8,9 +8,11 @@ pub mod error; mod layer; +mod make; pub mod pool; pub use layer::P2CBalanceLayer; +pub use make::{MakeFuture, MakeP2CBalance}; #[cfg(test)] mod test; diff --git a/tower-balance/src/make.rs b/tower-balance/src/make.rs new file mode 100644 index 000000000..e5fdbab79 --- /dev/null +++ b/tower-balance/src/make.rs @@ -0,0 +1,64 @@ +use crate::P2CBalance; +use futures::{try_ready, Future, Poll}; +use rand::{rngs::SmallRng, FromEntropy}; +use tower_discover::Discover; +use tower_service::Service; + +/// Makes `P2CBalancers` given an inner service that makes `Discover`s. +pub struct MakeP2CBalance { + inner: S, + rng: SmallRng, +} + +/// Makes a balancer instance. +pub struct MakeFuture { + inner: F, + rng: SmallRng, +} + +impl MakeP2CBalance { + pub(crate) fn new(inner: S, rng: SmallRng) -> Self { + Self { inner, rng } + } + + /// Initializes a P2C load balancer from the OS's entropy source. + pub fn from_entropy(make_discover: S) -> Self { + Self::new(make_discover, SmallRng::from_entropy()) + } +} + +impl Service for MakeP2CBalance +where + S: Service, + S::Response: Discover, +{ + type Response = P2CBalance; + type Error = S::Error; + type Future = MakeFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, target: Target) -> Self::Future { + MakeFuture { + inner: self.inner.call(target), + rng: self.rng.clone(), + } + } +} + +impl Future for MakeFuture +where + F: Future, + F::Item: Discover, +{ + type Item = P2CBalance; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let inner = try_ready!(self.inner.poll()); + let svc = P2CBalance::new(inner, self.rng.clone()); + Ok(svc.into()) + } +} From ba8653a5fe846847da982376456ccc9b75b5f843 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 30 May 2019 19:37:53 +0000 Subject: [PATCH 06/17] Impl clone for MakeP2CBalance --- tower-balance/src/make.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tower-balance/src/make.rs b/tower-balance/src/make.rs index e5fdbab79..2ccbc1090 100644 --- a/tower-balance/src/make.rs +++ b/tower-balance/src/make.rs @@ -5,6 +5,7 @@ use tower_discover::Discover; use tower_service::Service; /// Makes `P2CBalancers` given an inner service that makes `Discover`s. +#[derive(Clone, Debug)] pub struct MakeP2CBalance { inner: S, rng: SmallRng, From a998084d340cbda3d19e59cf1116c96ab3d9d051 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 01:14:32 +0000 Subject: [PATCH 07/17] remove commented code --- tower-balance/src/test.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tower-balance/src/test.rs b/tower-balance/src/test.rs index 73d2ca737..1066b2cc0 100644 --- a/tower-balance/src/test.rs +++ b/tower-balance/src/test.rs @@ -6,8 +6,6 @@ use tower_test::mock; use crate::*; -//type Error = Box; - macro_rules! assert_ready { ($svc:expr) => {{ assert_ready!($svc, "must be ready"); From 5d00a447e59dfa7de0df42eed0e5648de1d1af79 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 01:17:42 +0000 Subject: [PATCH 08/17] Remove the choose module --- tower-balance/src/choose/mod.rs | 49 ----------- tower-balance/src/choose/p2c.rs | 108 ------------------------ tower-balance/src/choose/round_robin.rs | 23 ----- 3 files changed, 180 deletions(-) delete mode 100644 tower-balance/src/choose/mod.rs delete mode 100644 tower-balance/src/choose/p2c.rs delete mode 100644 tower-balance/src/choose/round_robin.rs diff --git a/tower-balance/src/choose/mod.rs b/tower-balance/src/choose/mod.rs deleted file mode 100644 index 7eda391ea..000000000 --- a/tower-balance/src/choose/mod.rs +++ /dev/null @@ -1,49 +0,0 @@ -use indexmap::IndexMap; - -mod p2c; -mod round_robin; - -pub use self::{p2c::PowerOfTwoChoices, round_robin::RoundRobin}; - -/// A strategy for choosing nodes. -// TODO hide `K` -pub trait Choose { - /// Returns the index of a replica to be used next. - /// - /// `replicas` cannot be empty, so this function must always return a valid index on - /// [0, replicas.len()-1]. - fn choose(&mut self, replicas: Replicas) -> usize; -} - -/// Creates a `Replicas` if there are two or more services. -/// -pub(crate) fn replicas(inner: &IndexMap) -> Result, TooFew> { - if inner.len() < 2 { - return Err(TooFew); - } - - Ok(Replicas(inner)) -} - -/// Indicates that there were not at least two services. -#[derive(Copy, Clone, Debug)] -pub struct TooFew; - -/// Holds two or more services. -// TODO hide `K` -pub struct Replicas<'a, K, S>(&'a IndexMap); - -impl Replicas<'_, K, S> { - pub fn len(&self) -> usize { - self.0.len() - } -} - -impl ::std::ops::Index for Replicas<'_, K, S> { - type Output = S; - - fn index(&self, idx: usize) -> &Self::Output { - let (_, service) = self.0.get_index(idx).expect("out of bounds"); - service - } -} diff --git a/tower-balance/src/choose/p2c.rs b/tower-balance/src/choose/p2c.rs deleted file mode 100644 index 89a26a14e..000000000 --- a/tower-balance/src/choose/p2c.rs +++ /dev/null @@ -1,108 +0,0 @@ -use log::trace; -use rand::{rngs::SmallRng, FromEntropy, Rng}; - -use crate::{ - choose::{Choose, Replicas}, - Load, -}; - -/// Chooses nodes using the [Power of Two Choices][p2c]. -/// -/// This is a load-aware strategy, so this may only be used to choose over services that -/// implement `Load`. -/// -/// As described in the [Finagle Guide][finagle]: -/// > The algorithm randomly picks two nodes from the set of ready endpoints and selects -/// > the least loaded of the two. By repeatedly using this strategy, we can expect a -/// > manageable upper bound on the maximum load of any server. -/// > -/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where `n` -/// > is the number of servers in the cluster. -/// -/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded -/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf -#[derive(Debug)] -pub struct PowerOfTwoChoices { - rng: SmallRng, -} - -// ==== impl PowerOfTwoChoices ==== - -impl Default for PowerOfTwoChoices { - fn default() -> Self { - Self::new(SmallRng::from_entropy()) - } -} - -impl PowerOfTwoChoices { - pub fn new(rng: SmallRng) -> Self { - Self { rng } - } - - /// Returns two random, distinct indices into `ready`. - fn random_pair(&mut self, len: usize) -> (usize, usize) { - debug_assert!(len >= 2); - - // Choose a random number on [0, len-1]. - let idx0 = self.rng.gen::() % len; - - let idx1 = { - // Choose a random number on [1, len-1]. - let delta = (self.rng.gen::() % (len - 1)) + 1; - // Add it to `idx0` and then mod on `len` to produce a value on - // [idx0+1, len-1] or [0, idx0-1]. - (idx0 + delta) % len - }; - - debug_assert!(idx0 != idx1, "random pair must be distinct"); - return (idx0, idx1); - } -} - -impl Choose for PowerOfTwoChoices -where - L: Load, - L::Metric: PartialOrd + ::std::fmt::Debug, -{ - /// Chooses two distinct nodes at random and compares their load. - /// - /// Returns the index of the lesser-loaded node. - fn choose(&mut self, replicas: Replicas) -> usize { - let (a, b) = self.random_pair(replicas.len()); - - let a_load = replicas[a].load(); - let b_load = replicas[b].load(); - trace!( - "choose node[{a}]={a_load:?} node[{b}]={b_load:?}", - a = a, - b = b, - a_load = a_load, - b_load = b_load - ); - if a_load <= b_load { - a - } else { - b - } - } -} - -#[cfg(test)] -mod tests { - use quickcheck::*; - - use super::*; - - quickcheck! { - fn distinct_random_pairs(n: usize) -> TestResult { - if n < 2 { - return TestResult::discard(); - } - - let mut p2c = PowerOfTwoChoices::default(); - - let (a, b) = p2c.random_pair(n); - TestResult::from_bool(a != b) - } - } -} diff --git a/tower-balance/src/choose/round_robin.rs b/tower-balance/src/choose/round_robin.rs deleted file mode 100644 index 6eeef670d..000000000 --- a/tower-balance/src/choose/round_robin.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::choose::{Choose, Replicas}; - -/// Chooses nodes sequentially. -/// -/// This strategy is load-agnostic and may therefore be used to choose over any type of -/// service. -/// -/// Note that ordering is not strictly enforced, especially when services are removed by -/// the balancer. -#[derive(Debug, Default)] -pub struct RoundRobin { - /// References the index of the next node to be used. - pos: usize, -} - -impl Choose for RoundRobin { - fn choose(&mut self, nodes: Replicas) -> usize { - let len = nodes.len(); - let idx = self.pos % len; - self.pos = (idx + 1) % len; - idx - } -} From d034739c6129fa6778219cd55f9569defad9cbc1 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 02:29:08 +0000 Subject: [PATCH 09/17] Move P2C into a submodule --- tower-balance/examples/demo.rs | 6 +- tower-balance/src/lib.rs | 280 +-------------------------- tower-balance/src/{ => p2c}/layer.rs | 16 +- tower-balance/src/{ => p2c}/make.rs | 16 +- tower-balance/src/p2c/mod.rs | 12 ++ tower-balance/src/p2c/service.rs | 273 ++++++++++++++++++++++++++ tower-balance/src/{ => p2c}/test.rs | 16 +- tower-balance/src/pool.rs | 37 ++-- 8 files changed, 333 insertions(+), 323 deletions(-) rename tower-balance/src/{ => p2c}/layer.rs (75%) rename tower-balance/src/{ => p2c}/make.rs (77%) create mode 100644 tower-balance/src/p2c/mod.rs create mode 100644 tower-balance/src/p2c/service.rs rename tower-balance/src/{ => p2c}/test.rs (92%) diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index dc76a17cd..b1b473802 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -55,7 +55,7 @@ fn main() { let fut = future::lazy(move || { let decay = Duration::from_secs(10); let d = gen_disco(); - let pe = lb::P2CBalance::from_entropy(load::PeakEwmaDiscover::new( + let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new( d, DEFAULT_RTT, decay, @@ -67,7 +67,7 @@ fn main() { let fut = fut.then(move |_| { let d = gen_disco(); let ll = - lb::P2CBalance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); run("P2C+LeastLoaded...", ll) }); @@ -129,7 +129,7 @@ fn gen_disco() -> impl Discover< ) } -fn run(name: &'static str, lb: lb::P2CBalance) -> impl Future +fn run(name: &'static str, lb: lb::p2c::Balance) -> impl Future where D: Discover + Send + 'static, D::Error: Into, diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index c86cac7b1..131004d75 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -1,4 +1,4 @@ -//! A load balancing middleware. +//! Load balancing middlewares. #![doc(html_root_url = "https://docs.rs/tower-balance/0.1.0")] #![deny(missing_docs)] @@ -7,281 +7,5 @@ #![deny(warnings)] pub mod error; -mod layer; -mod make; +pub mod p2c; pub mod pool; - -pub use layer::P2CBalanceLayer; -pub use make::{MakeFuture, MakeP2CBalance}; - -#[cfg(test)] -mod test; - -use futures::{future, try_ready, Async, Future, Poll}; -use indexmap::IndexMap; -use log::{debug, info, trace}; -use rand::{rngs::SmallRng, FromEntropy}; -use std::cmp; -use tower_discover::{Change, Discover}; -use tower_load::Load; -use tower_service::Service; - -/// Distributes requests across inner services using the [Power of Two Choices][p2c]. -/// -/// As described in the [Finagle Guide][finagle]: -/// -/// > The algorithm randomly picks two services from the set of ready endpoints and -/// > selects the least loaded of the two. By repeatedly using this strategy, we can -/// > expect a manageable upper bound on the maximum load of any server. -/// > -/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where -/// > `n` is the number of servers in the cluster. -/// -/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded -/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf -#[derive(Debug)] -pub struct P2CBalance { - // XXX Pool requires direct access to this... Not ideal. - pub(crate) discover: D, - - endpoints: IndexMap, - - /// Holds an index into `endpoints`, indicating the service that has been - /// chosen to dispatch the next request. - ready_index: Option, - - rng: SmallRng, -} - -// ===== impl P2CBalance ===== - -impl P2CBalance { - /// Initializes a P2C load balancer from the provided randomization source. - pub fn new(discover: D, rng: SmallRng) -> Self { - Self { - rng, - discover, - endpoints: IndexMap::default(), - ready_index: None, - } - } - - /// Initializes a P2C load balancer from the OS's entropy source. - pub fn from_entropy(discover: D) -> Self { - Self::new(discover, SmallRng::from_entropy()) - } - - /// Returns the number of endpoints currently tracked by the balancer. - pub fn len(&self) -> usize { - self.endpoints.len() - } - - /// Polls `discover` for updates, adding new items to `not_ready`. - /// - /// Removals may alter the order of either `ready` or `not_ready`. - fn poll_discover(&mut self) -> Poll<(), error::Discover> - where - D::Error: Into, - { - debug!("updating from discover"); - - loop { - match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { - Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), - Change::Remove(rm_key) => { - // Update the ready index to account for reordering of endpoints. - let orig_sz = self.endpoints.len(); - if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { - self.ready_index = match self.ready_index { - Some(i) => Self::repair_index(i, rm_idx, orig_sz), - None => None, - }; - } - } - } - } - } - - // Returns the updated index of `orig_idx` after the entry at `rm_idx` was - // swap-removed from an IndexMap with `orig_sz` items. - // - // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that - // index cannot be repaired. - fn repair_index(orig_idx: usize, rm_idx: usize, orig_sz: usize) -> Option { - debug_assert!(orig_sz > orig_idx && orig_sz > rm_idx); - let repaired = match orig_idx { - i if i == rm_idx => None, // removed - i if i == orig_sz - 1 => Some(rm_idx), // swapped - i => Some(i), // uneffected - }; - trace!( - "repair_index: orig={}; rm={}; sz={}; => {:?}", - orig_idx, - rm_idx, - orig_sz, - repaired, - ); - repaired - } - - /// Performs P2C on inner services to find a suitable endpoint. - /// - /// When this function returns Ready, `self.ready_index` is set with the - /// value of a suitable (ready endpoint). When - fn poll_ready_index(&mut self) -> Poll - where - D: Discover, - Svc: Service + Load, - Svc::Error: Into, - Svc::Metric: std::fmt::Debug, - { - match self.endpoints.len() { - 0 => Ok(Async::NotReady), - 1 => { - // If there's only one endpoint, ignore its but require that it - // is ready. - match self.poll_endpoint_index_load(0) { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - self.ready_index = Some(0); - Ok(Async::Ready(0)) - } - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(0); - Ok(Async::NotReady) - } - } - } - len => { - // Get two distinct random indexes (in a random order). Poll the - // service at each index. - // - // If either fails, the service is removed from the set of - // endpoints. - let idxs = rand::seq::index::sample(&mut self.rng, len, 2); - - let aidx = idxs.index(0); - let bidx = idxs.index(1); - - let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { - Ok(ready) => (ready, bidx), - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(aidx); - let new_bidx = Self::repair_index(bidx, aidx, len) - .expect("random indices must be distinct"); - (Async::NotReady, new_bidx) - } - }; - - let (bload, aidx) = match self.poll_endpoint_index_load(bidx) { - Ok(ready) => (ready, aidx), - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(bidx); - let new_aidx = Self::repair_index(aidx, bidx, len) - .expect("random indices must be distinct"); - (Async::NotReady, new_aidx) - } - }; - - trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload); - - let ready = match (aload, bload) { - (Async::Ready(aload), Async::Ready(bload)) => { - if aload <= bload { - Async::Ready(aidx) - } else { - Async::Ready(bidx) - } - } - (Async::Ready(_), Async::NotReady) => Async::Ready(aidx), - (Async::NotReady, Async::Ready(_)) => Async::Ready(bidx), - (Async::NotReady, Async::NotReady) => Async::NotReady, - }; - trace!(" -> ready={:?}", ready); - Ok(ready) - } - } - } - - /// Accesses an endpoint by index and, if it is ready, returns its current load. - fn poll_endpoint_index_load( - &mut self, - index: usize, - ) -> Poll - where - D: Discover, - Svc: Service + Load, - Svc::Error: Into, - { - let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index"); - try_ready!(svc.poll_ready()); - Ok(Async::Ready(svc.load())) - } -} - -impl Service for P2CBalance -where - D: Discover, - D::Error: Into, - Svc: Service + Load, - Svc::Error: Into, - Svc::Metric: std::fmt::Debug, -{ - type Response = >::Response; - type Error = error::Error; - type Future = - future::MapErr<>::Future, fn(Svc::Error) -> error::Error>; - - /// Prepares the balancer to process a request. - /// - /// When `Async::Ready` is returned, `chosen` is set with a valid index - /// into `ready` referring to a `Service` that is ready to disptach a request. - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - // First and foremost, process discovery updates. This removes or updates a - // previously-selected `ready_index` if appropriate. - self.poll_discover()?; - - if let Some(index) = self.ready_index { - debug_assert!(!self.endpoints.is_empty()); - // Ensure the selected endpoint is still ready. - match self.poll_endpoint_index_load(index) { - Ok(Async::Ready(_)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => {} - Err(e) => { - drop(self.endpoints.swap_remove_index(index)); - info!("evicting failed endpoint: {}", e.into()); - } - } - - self.ready_index = None; - } - - let tries = match self.endpoints.len() { - 0 => return Ok(Async::NotReady), - n => cmp::max(1, n / 2), - }; - for _ in 0..tries { - if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? { - trace!("ready: {:?}", idx); - self.ready_index = Some(idx); - return Ok(Async::Ready(())); - } - } - - trace!("exhausted {} attempts", tries); - Ok(Async::NotReady) - } - - fn call(&mut self, request: Request) -> Self::Future { - let index = self.ready_index.take().expect("not ready"); - let (_, svc) = self - .endpoints - .get_index_mut(index) - .expect("invalid ready index"); - - svc.call(request).map_err(Into::into) - } -} diff --git a/tower-balance/src/layer.rs b/tower-balance/src/p2c/layer.rs similarity index 75% rename from tower-balance/src/layer.rs rename to tower-balance/src/p2c/layer.rs index a7c953826..ebd43d678 100644 --- a/tower-balance/src/layer.rs +++ b/tower-balance/src/p2c/layer.rs @@ -1,16 +1,16 @@ -use crate::MakeP2CBalance; +use super::BalanceMake; use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng}; use std::{fmt, marker::PhantomData}; use tower_layer::Layer; /// Efficiently distributes requests across an arbitrary number of services #[derive(Clone)] -pub struct P2CBalanceLayer { +pub struct BalanceLayer { rng: SmallRng, _marker: PhantomData, } -impl P2CBalanceLayer { +impl BalanceLayer { /// Builds a balancer using the system entropy. pub fn new() -> Self { Self { @@ -31,17 +31,17 @@ impl P2CBalanceLayer { } } -impl Layer for P2CBalanceLayer { - type Service = MakeP2CBalance; +impl Layer for BalanceLayer { + type Service = BalanceMake; fn layer(&self, make_discover: S) -> Self::Service { - MakeP2CBalance::new(make_discover, self.rng.clone()) + BalanceMake::new(make_discover, self.rng.clone()) } } -impl fmt::Debug for P2CBalanceLayer { +impl fmt::Debug for BalanceLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("P2CBalanceLayer") + f.debug_struct("BalanceLayer") .field("rng", &self.rng) .finish() } diff --git a/tower-balance/src/make.rs b/tower-balance/src/p2c/make.rs similarity index 77% rename from tower-balance/src/make.rs rename to tower-balance/src/p2c/make.rs index 2ccbc1090..cd6addd03 100644 --- a/tower-balance/src/make.rs +++ b/tower-balance/src/p2c/make.rs @@ -1,12 +1,12 @@ -use crate::P2CBalance; +use super::Balance; use futures::{try_ready, Future, Poll}; use rand::{rngs::SmallRng, FromEntropy}; use tower_discover::Discover; use tower_service::Service; -/// Makes `P2CBalancers` given an inner service that makes `Discover`s. +/// Makes `Balancer`s given an inner service that makes `Discover`s. #[derive(Clone, Debug)] -pub struct MakeP2CBalance { +pub struct BalanceMake { inner: S, rng: SmallRng, } @@ -17,7 +17,7 @@ pub struct MakeFuture { rng: SmallRng, } -impl MakeP2CBalance { +impl BalanceMake { pub(crate) fn new(inner: S, rng: SmallRng) -> Self { Self { inner, rng } } @@ -28,12 +28,12 @@ impl MakeP2CBalance { } } -impl Service for MakeP2CBalance +impl Service for BalanceMake where S: Service, S::Response: Discover, { - type Response = P2CBalance; + type Response = Balance; type Error = S::Error; type Future = MakeFuture; @@ -54,12 +54,12 @@ where F: Future, F::Item: Discover, { - type Item = P2CBalance; + type Item = Balance; type Error = F::Error; fn poll(&mut self) -> Poll { let inner = try_ready!(self.inner.poll()); - let svc = P2CBalance::new(inner, self.rng.clone()); + let svc = Balance::new(inner, self.rng.clone()); Ok(svc.into()) } } diff --git a/tower-balance/src/p2c/mod.rs b/tower-balance/src/p2c/mod.rs new file mode 100644 index 000000000..a97c2dc7b --- /dev/null +++ b/tower-balance/src/p2c/mod.rs @@ -0,0 +1,12 @@ +//! A Power-of-Two-Choices Load Balancer + +mod layer; +mod make; +mod service; + +#[cfg(test)] +mod test; + +pub use layer::BalanceLayer; +pub use make::{MakeFuture, BalanceMake}; +pub use service::Balance; diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs new file mode 100644 index 000000000..ffb22ab3f --- /dev/null +++ b/tower-balance/src/p2c/service.rs @@ -0,0 +1,273 @@ +use crate::error; +use futures::{future, try_ready, Async, Future, Poll}; +use indexmap::IndexMap; +use log::{debug, info, trace}; +use rand::{rngs::SmallRng, FromEntropy}; +use std::cmp; +use tower_discover::{Change, Discover}; +use tower_load::Load; +use tower_service::Service; + +/// Distributes requests across inner services using the [Power of Two Choices][p2c]. +/// +/// As described in the [Finagle Guide][finagle]: +/// +/// > The algorithm randomly picks two services from the set of ready endpoints and +/// > selects the least loaded of the two. By repeatedly using this strategy, we can +/// > expect a manageable upper bound on the maximum load of any server. +/// > +/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where +/// > `n` is the number of servers in the cluster. +/// +/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded +/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf +#[derive(Debug)] +pub struct Balance { + discover: D, + + endpoints: IndexMap, + + /// Holds an index into `endpoints`, indicating the service that has been + /// chosen to dispatch the next request. + ready_index: Option, + + rng: SmallRng, +} + +// ===== impl P2CBalance ===== + +impl Balance { + /// Initializes a P2C load balancer from the provided randomization source. + pub fn new(discover: D, rng: SmallRng) -> Self { + Self { + rng, + discover, + endpoints: IndexMap::default(), + ready_index: None, + } + } + + /// Initializes a P2C load balancer from the OS's entropy source. + pub fn from_entropy(discover: D) -> Self { + Self::new(discover, SmallRng::from_entropy()) + } + + /// Returns the number of endpoints currently tracked by the balancer. + pub fn len(&self) -> usize { + self.endpoints.len() + } + + // XXX `pool::Pool` requires direct access to this... Not ideal. + pub(crate) fn discover_mut(&mut self) -> &mut D { + &mut self.discover + } + + /// Polls `discover` for updates, adding new items to `not_ready`. + /// + /// Removals may alter the order of either `ready` or `not_ready`. + fn poll_discover(&mut self) -> Poll<(), error::Discover> + where + D::Error: Into, + { + debug!("updating from discover"); + + loop { + match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { + Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), + Change::Remove(rm_key) => { + // Update the ready index to account for reordering of endpoints. + let orig_sz = self.endpoints.len(); + if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { + self.ready_index = match self.ready_index { + Some(i) => Self::repair_index(i, rm_idx, orig_sz), + None => None, + }; + } + } + } + } + } + + // Returns the updated index of `orig_idx` after the entry at `rm_idx` was + // swap-removed from an IndexMap with `orig_sz` items. + // + // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that + // index cannot be repaired. + fn repair_index(orig_idx: usize, rm_idx: usize, orig_sz: usize) -> Option { + debug_assert!(orig_sz > orig_idx && orig_sz > rm_idx); + let repaired = match orig_idx { + i if i == rm_idx => None, // removed + i if i == orig_sz - 1 => Some(rm_idx), // swapped + i => Some(i), // uneffected + }; + trace!( + "repair_index: orig={}; rm={}; sz={}; => {:?}", + orig_idx, + rm_idx, + orig_sz, + repaired, + ); + repaired + } + + /// Performs P2C on inner services to find a suitable endpoint. + /// + /// When this function returns Ready, `self.ready_index` is set with the + /// value of a suitable (ready endpoint). When + fn poll_ready_index(&mut self) -> Poll + where + D: Discover, + Svc: Service + Load, + Svc::Error: Into, + Svc::Metric: std::fmt::Debug, + { + match self.endpoints.len() { + 0 => Ok(Async::NotReady), + 1 => { + // If there's only one endpoint, ignore its but require that it + // is ready. + match self.poll_endpoint_index_load(0) { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + self.ready_index = Some(0); + Ok(Async::Ready(0)) + } + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(0); + Ok(Async::NotReady) + } + } + } + len => { + // Get two distinct random indexes (in a random order). Poll the + // service at each index. + // + // If either fails, the service is removed from the set of + // endpoints. + let idxs = rand::seq::index::sample(&mut self.rng, len, 2); + + let aidx = idxs.index(0); + let bidx = idxs.index(1); + + let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { + Ok(ready) => (ready, bidx), + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(aidx); + let new_bidx = Self::repair_index(bidx, aidx, len) + .expect("random indices must be distinct"); + (Async::NotReady, new_bidx) + } + }; + + let (bload, aidx) = match self.poll_endpoint_index_load(bidx) { + Ok(ready) => (ready, aidx), + Err(e) => { + info!("evicting failed endpoint: {}", e.into()); + let _ = self.endpoints.swap_remove_index(bidx); + let new_aidx = Self::repair_index(aidx, bidx, len) + .expect("random indices must be distinct"); + (Async::NotReady, new_aidx) + } + }; + + trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload); + + let ready = match (aload, bload) { + (Async::Ready(aload), Async::Ready(bload)) => { + if aload <= bload { + Async::Ready(aidx) + } else { + Async::Ready(bidx) + } + } + (Async::Ready(_), Async::NotReady) => Async::Ready(aidx), + (Async::NotReady, Async::Ready(_)) => Async::Ready(bidx), + (Async::NotReady, Async::NotReady) => Async::NotReady, + }; + trace!(" -> ready={:?}", ready); + Ok(ready) + } + } + } + + /// Accesses an endpoint by index and, if it is ready, returns its current load. + fn poll_endpoint_index_load( + &mut self, + index: usize, + ) -> Poll + where + D: Discover, + Svc: Service + Load, + Svc::Error: Into, + { + let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index"); + try_ready!(svc.poll_ready()); + Ok(Async::Ready(svc.load())) + } +} + +impl Service for Balance +where + D: Discover, + D::Error: Into, + Svc: Service + Load, + Svc::Error: Into, + Svc::Metric: std::fmt::Debug, +{ + type Response = >::Response; + type Error = error::Error; + type Future = + future::MapErr<>::Future, fn(Svc::Error) -> error::Error>; + + /// Prepares the balancer to process a request. + /// + /// When `Async::Ready` is returned, `chosen` is set with a valid index + /// into `ready` referring to a `Service` that is ready to disptach a request. + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // First and foremost, process discovery updates. This removes or updates a + // previously-selected `ready_index` if appropriate. + self.poll_discover()?; + + if let Some(index) = self.ready_index { + debug_assert!(!self.endpoints.is_empty()); + // Ensure the selected endpoint is still ready. + match self.poll_endpoint_index_load(index) { + Ok(Async::Ready(_)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {} + Err(e) => { + drop(self.endpoints.swap_remove_index(index)); + info!("evicting failed endpoint: {}", e.into()); + } + } + + self.ready_index = None; + } + + let tries = match self.endpoints.len() { + 0 => return Ok(Async::NotReady), + n => cmp::max(1, n / 2), + }; + for _ in 0..tries { + if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? { + trace!("ready: {:?}", idx); + self.ready_index = Some(idx); + return Ok(Async::Ready(())); + } + } + + trace!("exhausted {} attempts", tries); + Ok(Async::NotReady) + } + + fn call(&mut self, request: Request) -> Self::Future { + let index = self.ready_index.take().expect("not ready"); + let (_, svc) = self + .endpoints + .get_index_mut(index) + .expect("invalid ready index"); + + svc.call(request).map_err(Into::into) + } +} diff --git a/tower-balance/src/test.rs b/tower-balance/src/p2c/test.rs similarity index 92% rename from tower-balance/src/test.rs rename to tower-balance/src/p2c/test.rs index 1066b2cc0..45eac1a88 100644 --- a/tower-balance/src/test.rs +++ b/tower-balance/src/p2c/test.rs @@ -4,7 +4,7 @@ use tower_load as load; use tower_service::Service; use tower_test::mock; -use crate::*; +use super::*; macro_rules! assert_ready { ($svc:expr) => {{ @@ -28,7 +28,7 @@ macro_rules! assert_not_ready { fn empty() { let empty: Vec, usize>> = vec![]; let disco = ServiceList::new(empty); - let mut svc = P2CBalance::from_entropy(disco); + let mut svc = Balance::from_entropy(disco); assert_not_ready!(svc); } @@ -38,13 +38,13 @@ fn single_endpoint() { let mock = load::Constant::new(mock, 0); let disco = ServiceList::new(vec![mock].into_iter()); - let mut svc = P2CBalance::from_entropy(disco); + let mut svc = Balance::from_entropy(disco); with_task(|| { handle.allow(0); assert_not_ready!(svc); assert_eq!( - svc.endpoints.len(), + svc.len(), 1, "balancer must have discovered endpoint" ); @@ -64,7 +64,7 @@ fn single_endpoint() { handle.send_error("endpoint lost"); assert_not_ready!(svc); assert!( - svc.endpoints.is_empty(), + svc.len() == 0, "balancer must drop failed endpoints" ); }); @@ -78,14 +78,14 @@ fn two_endpoints_with_equal_weight() { let mock_b = load::Constant::new(mock_b, 1); let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); - let mut svc = P2CBalance::from_entropy(disco); + let mut svc = Balance::from_entropy(disco); with_task(|| { handle_a.allow(0); handle_b.allow(0); assert_not_ready!(svc); assert_eq!( - svc.endpoints.len(), + svc.len(), 2, "balancer must have discovered both endpoints" ); @@ -127,7 +127,7 @@ fn two_endpoints_with_equal_weight() { handle_b.allow(1); assert_ready!(svc, "must be ready after one endpoint is removed"); assert_eq!( - svc.endpoints.len(), + svc.len(), 1, "balancer must drop failed endpoints", ); diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool.rs index 0756a1bce..e79bea323 100644 --- a/tower-balance/src/pool.rs +++ b/tower-balance/src/pool.rs @@ -14,7 +14,7 @@ //! added or removed. #![deny(missing_docs)] -use super::P2CBalance; +use super::p2c::Balance; use futures::{try_ready, Async, Future, Poll}; use tower_discover::{Change, Discover}; use tower_load::Load; @@ -200,7 +200,7 @@ impl Builder { }; Pool { - balance: P2CBalance::from_entropy(d), + balance: Balance::from_entropy(d), options: *self, ewma: self.init, } @@ -215,7 +215,7 @@ where MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, { - balance: P2CBalance>, + balance: Balance>, options: Builder, ewma: f64, } @@ -249,9 +249,9 @@ where MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, { - type Response = > as Service>::Response; - type Error = > as Service>::Error; - type Future = > as Service>::Future; + type Response = > as Service>::Response; + type Error = > as Service>::Error; + type Future = > as Service>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if let Async::Ready(()) = self.balance.poll_ready()? { @@ -259,37 +259,38 @@ where // update ewma with a 0 sample self.ewma = (1.0 - self.options.alpha) * self.ewma; + let discover = self.balance.discover_mut(); if self.ewma < self.options.low { - self.balance.discover.load = Level::Low; + discover.load = Level::Low; - if self.balance.discover.services > 1 { + if discover.services > 1 { // reset EWMA so we don't immediately try to remove another service self.ewma = self.options.init; } } else { - self.balance.discover.load = Level::Normal; + discover.load = Level::Normal; } - Ok(Async::Ready(())) - } else if self.balance.discover.making.is_none() { + return Ok(Async::Ready(())) + } + + let discover = self.balance.discover_mut(); + if discover.making.is_none() { // no services are ready -- we're overloaded // update ewma with a 1 sample self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma; if self.ewma > self.options.high { - self.balance.discover.load = Level::High; + discover.load = Level::High; // don't reset the EWMA -- in theory, poll_ready should now start returning // `Ready`, so we won't try to launch another service immediately. } else { - self.balance.discover.load = Level::Normal; + discover.load = Level::Normal; } - - Ok(Async::NotReady) - } else { - // no services are ready, but we're already making another service! - Ok(Async::NotReady) } + + Ok(Async::NotReady) } fn call(&mut self, req: Request) -> Self::Future { From 703bd83cb3eb0b79788c82d347df978bdc7853df Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 02:50:52 +0000 Subject: [PATCH 10/17] Cleanup per review --- tower-balance/examples/demo.rs | 6 ++++-- tower-balance/src/p2c/mod.rs | 2 +- tower-balance/src/p2c/service.rs | 20 +++++++++++--------- tower-balance/src/p2c/test.rs | 23 ++++------------------- tower-balance/src/pool.rs | 10 +++++----- 5 files changed, 25 insertions(+), 36 deletions(-) diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index b1b473802..45ab78c6d 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -66,8 +66,10 @@ fn main() { let fut = fut.then(move |_| { let d = gen_disco(); - let ll = - lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument)); + let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new( + d, + load::NoInstrument, + )); run("P2C+LeastLoaded...", ll) }); diff --git a/tower-balance/src/p2c/mod.rs b/tower-balance/src/p2c/mod.rs index a97c2dc7b..cf42fd2b9 100644 --- a/tower-balance/src/p2c/mod.rs +++ b/tower-balance/src/p2c/mod.rs @@ -8,5 +8,5 @@ mod service; mod test; pub use layer::BalanceLayer; -pub use make::{MakeFuture, BalanceMake}; +pub use make::{BalanceMake, MakeFuture}; pub use service::Balance; diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index ffb22ab3f..4d119fcee 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -76,12 +76,10 @@ impl Balance { Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), Change::Remove(rm_key) => { // Update the ready index to account for reordering of endpoints. - let orig_sz = self.endpoints.len(); if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { - self.ready_index = match self.ready_index { - Some(i) => Self::repair_index(i, rm_idx, orig_sz), - None => None, - }; + self.ready_index = self + .ready_index + .and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len() + 1)); } } } @@ -112,8 +110,12 @@ impl Balance { /// Performs P2C on inner services to find a suitable endpoint. /// - /// When this function returns Ready, `self.ready_index` is set with the - /// value of a suitable (ready endpoint). When + /// When this function returns NotReady, `ready_index` is unset. When this + /// function returns Ready, `ready_index` is set with an index into + /// `endpoints` of a ready endpoint service. + /// + /// If `endpoints` is reordered due to removals, `ready_index` is updated via + /// `repair_index()`. fn poll_ready_index(&mut self) -> Poll where D: Discover, @@ -124,7 +126,7 @@ impl Balance { match self.endpoints.len() { 0 => Ok(Async::NotReady), 1 => { - // If there's only one endpoint, ignore its but require that it + // If there's only one endpoint, ignore its load but require that it // is ready. match self.poll_endpoint_index_load(0) { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -223,7 +225,7 @@ where /// Prepares the balancer to process a request. /// - /// When `Async::Ready` is returned, `chosen` is set with a valid index + /// When `Async::Ready` is returned, `ready_index` is set with a valid index /// into `ready` referring to a `Service` that is ready to disptach a request. fn poll_ready(&mut self) -> Poll<(), Self::Error> { // First and foremost, process discovery updates. This removes or updates a diff --git a/tower-balance/src/p2c/test.rs b/tower-balance/src/p2c/test.rs index 45eac1a88..5f86f5fe2 100644 --- a/tower-balance/src/p2c/test.rs +++ b/tower-balance/src/p2c/test.rs @@ -43,11 +43,7 @@ fn single_endpoint() { with_task(|| { handle.allow(0); assert_not_ready!(svc); - assert_eq!( - svc.len(), - 1, - "balancer must have discovered endpoint" - ); + assert_eq!(svc.len(), 1, "balancer must have discovered endpoint"); handle.allow(1); assert_ready!(svc); @@ -63,10 +59,7 @@ fn single_endpoint() { handle.send_error("endpoint lost"); assert_not_ready!(svc); - assert!( - svc.len() == 0, - "balancer must drop failed endpoints" - ); + assert!(svc.len() == 0, "balancer must drop failed endpoints"); }); } @@ -84,11 +77,7 @@ fn two_endpoints_with_equal_weight() { handle_a.allow(0); handle_b.allow(0); assert_not_ready!(svc); - assert_eq!( - svc.len(), - 2, - "balancer must have discovered both endpoints" - ); + assert_eq!(svc.len(), 2, "balancer must have discovered both endpoints"); handle_a.allow(1); handle_b.allow(0); @@ -126,11 +115,7 @@ fn two_endpoints_with_equal_weight() { handle_a.send_error("endpoint lost"); handle_b.allow(1); assert_ready!(svc, "must be ready after one endpoint is removed"); - assert_eq!( - svc.len(), - 1, - "balancer must drop failed endpoints", - ); + assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",); }); } diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool.rs index e79bea323..cfcba1d79 100644 --- a/tower-balance/src/pool.rs +++ b/tower-balance/src/pool.rs @@ -231,10 +231,10 @@ where { /// Construct a new dynamically sized `Pool`. /// - /// If many calls to `poll_ready` return `NotReady`, `new_service` is used to construct another - /// `Service` that is then added to the load-balanced pool. If multiple services are available, - /// `choose` is used to determine which one to use (just as in `Balance`). If many calls to - /// `poll_ready` succeed, the most recently added `Service` is dropped from the pool. + /// If many calls to `poll_ready` return `NotReady`, `new_service` is used to + /// construct another `Service` that is then added to the load-balanced pool. + /// If many calls to `poll_ready` succeed, the most recently added `Service` + /// is dropped from the pool. pub fn new(make_service: MS, target: Target) -> Self { Builder::new().build(make_service, target) } @@ -271,7 +271,7 @@ where discover.load = Level::Normal; } - return Ok(Async::Ready(())) + return Ok(Async::Ready(())); } let discover = self.balance.discover_mut(); From 75cdf4b050f01ffd5ad303195d97b0f32107ba72 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 03:30:32 +0000 Subject: [PATCH 11/17] remove an unnecessary assertion --- tower-balance/examples/demo.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index 45ab78c6d..076348eda 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -143,8 +143,6 @@ where println!("{}", name); let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64); - fn check>(_: &S) {} - check(&lb); let service = ConcurrencyLimit::new(lb, CONCURRENCY); let responses = service.call_all(requests).unordered(); From 335d29fd6ef2e08a9d90fe8da5a55b5087e16783 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 16:12:43 +0000 Subject: [PATCH 12/17] incorrect comment --- tower-balance/src/p2c/service.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 4d119fcee..26611e35e 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -34,8 +34,6 @@ pub struct Balance { rng: SmallRng, } -// ===== impl P2CBalance ===== - impl Balance { /// Initializes a P2C load balancer from the provided randomization source. pub fn new(discover: D, rng: SmallRng) -> Self { From 6d29026438b53c5554dfa636a01e8df31af02062 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 16:15:27 +0000 Subject: [PATCH 13/17] +debug_assert --- tower-balance/src/p2c/service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 26611e35e..2075494a8 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -149,6 +149,7 @@ impl Balance { let aidx = idxs.index(0); let bidx = idxs.index(1); + debug_assert_ne!(aidx, bidx, "random indices must be distinct"); let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { Ok(ready) => (ready, bidx), From 9ad8ce82b6ed06ac4f074a845f629d7cc4a13e24 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 16:16:36 +0000 Subject: [PATCH 14/17] +debug_assert --- tower-balance/src/p2c/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 2075494a8..32d602979 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -232,7 +232,7 @@ where self.poll_discover()?; if let Some(index) = self.ready_index { - debug_assert!(!self.endpoints.is_empty()); + debug_assert!(index < self.endpoints.len()); // Ensure the selected endpoint is still ready. match self.poll_endpoint_index_load(index) { Ok(Async::Ready(_)) => return Ok(Async::Ready(())), From a7c9b49d1580c066ce36ae515ec6df30347d08da Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 16:26:29 +0000 Subject: [PATCH 15/17] temp val --- tower-balance/src/p2c/service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 32d602979..5eb3e3a2b 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -75,9 +75,10 @@ impl Balance { Change::Remove(rm_key) => { // Update the ready index to account for reordering of endpoints. if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { + let orig_sz = self.endpoints.len() + 1; self.ready_index = self .ready_index - .and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len() + 1)); + .and_then(|i| Self::repair_index(i, rm_idx, orig_sz)); } } } From 7479265e57f6c3f28eedae3e6ca19a56cba7d158 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 16:35:50 +0000 Subject: [PATCH 16/17] clearer logic for determining number of tries --- tower-balance/src/p2c/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 5eb3e3a2b..e577b0369 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -3,7 +3,6 @@ use futures::{future, try_ready, Async, Future, Poll}; use indexmap::IndexMap; use log::{debug, info, trace}; use rand::{rngs::SmallRng, FromEntropy}; -use std::cmp; use tower_discover::{Change, Discover}; use tower_load::Load; use tower_service::Service; @@ -249,7 +248,8 @@ where let tries = match self.endpoints.len() { 0 => return Ok(Async::NotReady), - n => cmp::max(1, n / 2), + 1 => 1, + n => n / 2, }; for _ in 0..tries { if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? { From 264b2a8031c6858f4b0860600246ad23558806cd Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 31 May 2019 17:21:16 +0000 Subject: [PATCH 17/17] less brittle repair_index --- tower-balance/src/p2c/service.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index e577b0369..c0bd7095e 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -74,10 +74,9 @@ impl Balance { Change::Remove(rm_key) => { // Update the ready index to account for reordering of endpoints. if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { - let orig_sz = self.endpoints.len() + 1; self.ready_index = self .ready_index - .and_then(|i| Self::repair_index(i, rm_idx, orig_sz)); + .and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len())); } } } @@ -89,18 +88,18 @@ impl Balance { // // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that // index cannot be repaired. - fn repair_index(orig_idx: usize, rm_idx: usize, orig_sz: usize) -> Option { - debug_assert!(orig_sz > orig_idx && orig_sz > rm_idx); + fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option { + debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz); let repaired = match orig_idx { - i if i == rm_idx => None, // removed - i if i == orig_sz - 1 => Some(rm_idx), // swapped - i => Some(i), // uneffected + i if i == rm_idx => None, // removed + i if i == new_sz => Some(rm_idx), // swapped + i => Some(i), // uneffected }; trace!( "repair_index: orig={}; rm={}; sz={}; => {:?}", orig_idx, rm_idx, - orig_sz, + new_sz, repaired, ); repaired @@ -156,7 +155,7 @@ impl Balance { Err(e) => { info!("evicting failed endpoint: {}", e.into()); let _ = self.endpoints.swap_remove_index(aidx); - let new_bidx = Self::repair_index(bidx, aidx, len) + let new_bidx = Self::repair_index(bidx, aidx, self.endpoints.len()) .expect("random indices must be distinct"); (Async::NotReady, new_bidx) } @@ -167,7 +166,7 @@ impl Balance { Err(e) => { info!("evicting failed endpoint: {}", e.into()); let _ = self.endpoints.swap_remove_index(bidx); - let new_aidx = Self::repair_index(aidx, bidx, len) + let new_aidx = Self::repair_index(aidx, bidx, self.endpoints.len()) .expect("random indices must be distinct"); (Async::NotReady, new_aidx) }