From 71292ee68394c4e767c55f0ebd6c949de3f67936 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Thu, 31 Mar 2022 11:09:04 -0700
Subject: [PATCH] balance: remove `Pool` (#658)

Per #456, there are a number of issues with the `balance::Pool` API that
limit its usability, and it isn't widely used. In the discussion on that
issue, we agreed that it should probably just be removed in 0.5 --- it can
be replaced with something more useful later.

This branch removes `balance::Pool`. Closes #456.
---
 tower/src/balance/mod.rs         |  21 +-
 tower/src/balance/p2c/service.rs |   4 -
 tower/src/balance/pool/mod.rs    | 459 -------------------------------
 tower/src/balance/pool/test.rs   | 190 -------------
 4 files changed, 5 insertions(+), 669 deletions(-)
 delete mode 100644 tower/src/balance/pool/mod.rs
 delete mode 100644 tower/src/balance/pool/test.rs

diff --git a/tower/src/balance/mod.rs b/tower/src/balance/mod.rs
index 4e27f4ab3..7b4fc9c0e 100644
--- a/tower/src/balance/mod.rs
+++ b/tower/src/balance/mod.rs
@@ -5,21 +5,14 @@
 //! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
 //! service, even when spare capacity is available to handle that request elsewhere.
 //!
-//! This module provides two pieces of middleware that helps with this type of load balancing:
-//!
-//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
-//! technique for spreading load across services with only inexact load measurements. Use this if
-//! the set of available services is not within your control, and you simply want to spread load
-//! among that set of services.
+//! This module provides the [`p2c`] middleware, which implements the "[Power of
+//! Two Random Choices]" algorithm. This is a simple but robust technique for
+//! spreading load across services with only inexact load measurements. Use this
+//! if the set of available services is not within your control, and you simply
+//! want to spread load among that set of services.
 //!
 //! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
 //!
-//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
-//! current load by tracking successful and unsuccessful calls to [`poll_ready`], and uses an
-//! exponentially weighted moving average to add (using [`MakeService`]) or remove (by dropping)
-//! services in response to increases or decreases in load. Use this if you are able to
-//! dynamically add more service endpoints to the system to handle added load.
-//!
 //! # Examples
 //!
 //! ```rust
@@ -52,10 +45,6 @@
 //! }
 //! # }
 //! ```
-//!
-//! [`MakeService`]: crate::MakeService
-//! [`poll_ready`]: crate::Service::poll_ready
 
 pub mod error;
 pub mod p2c;
-pub mod pool;
diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs
index f85d2419c..48c6b45f1 100644
--- a/tower/src/balance/p2c/service.rs
+++ b/tower/src/balance/p2c/service.rs
@@ -213,10 +213,6 @@ where
         let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
         svc.load()
     }
-
-    pub(crate) fn discover_mut(&mut self) -> &mut D {
-        &mut self.discover
-    }
 }
 
 impl<D, Req> Service<Req> for Balance<D, Req>
diff --git a/tower/src/balance/pool/mod.rs b/tower/src/balance/pool/mod.rs
deleted file mode 100644
index ab43936fe..000000000
--- a/tower/src/balance/pool/mod.rs
+++ /dev/null
@@ -1,459 +0,0 @@
-//! This module defines a load-balanced pool of services that adds new services when load is high.
-//!
-//! 
The pool uses `poll_ready` as a signal indicating whether additional services should be spawned -//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service -//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`] -//! considers it a 1. [`Pool`] then maintains an [exponential moving -//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those -//! samples, which gives an estimate of how often the underlying service has been ready when it was -//! needed "recently" (see [`Builder::urgency`]). If the service is loaded (see -//! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`]. -//! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or -//! more services, then the latest added service is removed. In either case, the load estimate is -//! reset to its initial value (see [`Builder::initial`] to prevent services from being rapidly -//! added or removed. -#![deny(missing_docs)] - -use super::p2c::Balance; -use crate::discover::Change; -use crate::load::Load; -use crate::make::MakeService; -use futures_core::{ready, Stream}; -use pin_project_lite::pin_project; -use slab::Slab; -use std::{ - fmt, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use tower_service::Service; - -#[cfg(test)] -mod test; - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -enum Level { - /// Load is low -- remove a service instance. - Low, - /// Load is normal -- keep the service set as it is. - Normal, - /// Load is high -- add another service instance. - High, -} - -pin_project! { - /// A wrapper around `MakeService` that discovers a new service when load is high, and removes a - /// service when load is low. See [`Pool`]. 
- pub struct PoolDiscoverer - where - MS: MakeService, - { - maker: MS, - #[pin] - making: Option, - target: Target, - load: Level, - services: Slab<()>, - died_tx: tokio::sync::mpsc::UnboundedSender, - #[pin] - died_rx: tokio::sync::mpsc::UnboundedReceiver, - limit: Option, - } -} - -impl fmt::Debug for PoolDiscoverer -where - MS: MakeService + fmt::Debug, - Target: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PoolDiscoverer") - .field("maker", &self.maker) - .field("making", &self.making.is_some()) - .field("target", &self.target) - .field("load", &self.load) - .field("services", &self.services) - .field("limit", &self.limit) - .finish() - } -} - -impl Stream for PoolDiscoverer -where - MS: MakeService, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone, -{ - type Item = Result>, MS::MakeError>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) { - this.services.remove(sid); - tracing::trace!( - pool.services = this.services.len(), - message = "removing dropped service" - ); - } - - if this.services.is_empty() && this.making.is_none() { - let _ = ready!(this.maker.poll_ready(cx))?; - tracing::trace!("construct initial pool connection"); - this.making - .set(Some(this.maker.make_service(this.target.clone()))); - } - - if let Level::High = this.load { - if this.making.is_none() { - if this - .limit - .map(|limit| this.services.len() >= limit) - .unwrap_or(false) - { - return Poll::Pending; - } - - tracing::trace!( - pool.services = this.services.len(), - message = "decided to add service to loaded pool" - ); - ready!(this.maker.poll_ready(cx))?; - tracing::trace!("making new service"); - // TODO: it'd be great if we could avoid the clone here and use, say, &Target - this.making - .set(Some(this.maker.make_service(this.target.clone()))); - } - } - - if let Some(fut) = this.making.as_mut().as_pin_mut() { - let svc = ready!(fut.poll(cx))?; - this.making.set(None); - - let id = this.services.insert(()); - let svc = DropNotifyService { - svc, - id, - notify: this.died_tx.clone(), - }; - tracing::trace!( - pool.services = this.services.len(), - message = "finished creating new service" - ); - *this.load = Level::Normal; - return Poll::Ready(Some(Ok(Change::Insert(id, svc)))); - } - - match this.load { - Level::High => { - unreachable!("found high load but no Service being made"); - } - Level::Normal => Poll::Pending, - Level::Low if this.services.len() == 1 => Poll::Pending, - Level::Low => { - *this.load = Level::Normal; - // NOTE: this is a little sad -- we'd prefer to kill short-living services - let rm = this.services.iter().next().unwrap().0; - // note that we _don't_ remove from self.services here - // that'll happen automatically on drop - tracing::trace!( - pool.services = this.services.len(), - message = "removing service for over-provisioned pool" - ); - Poll::Ready(Some(Ok(Change::Remove(rm)))) - } - } - } -} - -/// A [builder] that lets you configure how a [`Pool`] determines whether the underlying service is -/// loaded or not. See the [module-level documentation](self) and the builder's methods for -/// details. 
-/// -/// [builder]: https://rust-lang-nursery.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder -#[derive(Copy, Clone, Debug)] -pub struct Builder { - low: f64, - high: f64, - init: f64, - alpha: f64, - limit: Option, -} - -impl Default for Builder { - fn default() -> Self { - Builder { - init: 0.1, - low: 0.00001, - high: 0.2, - alpha: 0.03, - limit: None, - } - } -} - -impl Builder { - /// Create a new builder with default values for all load settings. - /// - /// If you just want to use the defaults, you can just use [`Pool::new`]. - pub fn new() -> Self { - Self::default() - } - - /// When the estimated load (see the [module-level docs](self)) drops below this - /// threshold, and there are at least two services active, a service is removed. - /// - /// The default value is 0.01. That is, when one in every 100 `poll_ready` calls return - /// `Pending`, then the underlying service is considered underutilized. - pub fn underutilized_below(&mut self, low: f64) -> &mut Self { - self.low = low; - self - } - - /// When the estimated load (see the [module-level docs](self)) exceeds this - /// threshold, and no service is currently in the process of being added, a new service is - /// scheduled to be added to the underlying [`Balance`]. - /// - /// The default value is 0.5. That is, when every other call to `poll_ready` returns - /// `Pending`, then the underlying service is considered highly loaded. - pub fn loaded_above(&mut self, high: f64) -> &mut Self { - self.high = high; - self - } - - /// The initial estimated load average. - /// - /// This is also the value that the estimated load will be reset to whenever a service is added - /// or removed. - /// - /// The default value is 0.1. - pub fn initial(&mut self, init: f64) -> &mut Self { - self.init = init; - self - } - - /// How aggressively the estimated load average is updated. - /// - /// This is the α parameter of the formula for the [exponential moving - /// average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average), and - /// dictates how quickly new samples of the current load affect the estimated load. If the - /// value is closer to 1, newer samples affect the load average a lot (when α is 1, the load - /// average is immediately set to the current load). If the value is closer to 0, newer samples - /// affect the load average very little at a time. - /// - /// The given value is clamped to `[0,1]`. - /// - /// The default value is 0.05, meaning, in very approximate terms, that each new load sample - /// affects the estimated load by 5%. - pub fn urgency(&mut self, alpha: f64) -> &mut Self { - self.alpha = alpha.max(0.0).min(1.0); - self - } - - /// The maximum number of backing `Service` instances to maintain. - /// - /// When the limit is reached, the load estimate is clamped to the high load threshhold, and no - /// new service is spawned. - /// - /// No maximum limit is imposed by default. - pub fn max_services(&mut self, limit: Option) -> &mut Self { - self.limit = limit; - self - } - - /// See [`Pool::new`]. 
- pub fn build( - &self, - make_service: MS, - target: Target, - ) -> Pool - where - MS: MakeService, - MS::Service: Load, - ::Metric: std::fmt::Debug, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone, - { - let (died_tx, died_rx) = tokio::sync::mpsc::unbounded_channel(); - let d = PoolDiscoverer { - maker: make_service, - making: None, - target, - load: Level::Normal, - services: Slab::new(), - died_tx, - died_rx, - limit: self.limit, - }; - - Pool { - balance: Balance::new(Box::pin(d)), - options: *self, - ewma: self.init, - } - } -} - -/// A dynamically sized, load-balanced pool of `Service` instances. -pub struct Pool -where - MS: MakeService, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone, -{ - // the Pin> here is needed since Balance requires the Service to be Unpin - balance: Balance>>, Request>, - options: Builder, - ewma: f64, -} - -impl fmt::Debug for Pool -where - MS: MakeService + fmt::Debug, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone + fmt::Debug, - MS::Service: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Pool") - .field("balance", &self.balance) - .field("options", &self.options) - .field("ewma", &self.ewma) - .finish() - } -} - -impl Pool -where - MS: MakeService, - MS::Service: Load, - ::Metric: std::fmt::Debug, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone, -{ - /// Construct a new dynamically sized `Pool`. - /// - /// If many calls to `poll_ready` return `Pending`, `new_service` is used to - /// construct another `Service` that is then added to the load-balanced pool. - /// If many calls to `poll_ready` succeed, the most recently added `Service` - /// is dropped from the pool. - pub fn new(make_service: MS, target: Target) -> Self { - Builder::new().build(make_service, target) - } -} - -type PinBalance = Balance>, Request>; - -impl Service for Pool -where - MS: MakeService, - MS::Service: Load, - ::Metric: std::fmt::Debug, - MS::MakeError: Into, - MS::Error: Into, - Target: Clone, -{ - type Response = , Req> as Service>::Response; - type Error = , Req> as Service>::Error; - type Future = , Req> as Service>::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if let Poll::Ready(()) = self.balance.poll_ready(cx)? 
{ - // services was ready -- there are enough services - // update ewma with a 0 sample - self.ewma *= 1.0 - self.options.alpha; - - let discover = self.balance.discover_mut().as_mut().project(); - if self.ewma < self.options.low { - if *discover.load != Level::Low { - tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned"); - } - *discover.load = Level::Low; - - if discover.services.len() > 1 { - // reset EWMA so we don't immediately try to remove another service - self.ewma = self.options.init; - } - } else { - if *discover.load != Level::Normal { - tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned"); - } - *discover.load = Level::Normal; - } - - return Poll::Ready(Ok(())); - } - - let discover = self.balance.discover_mut().as_mut().project(); - if discover.making.is_none() { - // no services are ready -- we're overloaded - // update ewma with a 1 sample - self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma; - - if self.ewma > self.options.high { - if *discover.load != Level::High { - tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned"); - } - *discover.load = Level::High; - - // don't reset the EWMA -- in theory, poll_ready should now start returning - // `Ready`, so we won't try to launch another service immediately. - // we clamp it to high though in case the # of services is limited. - self.ewma = self.options.high; - - // we need to call balance again for PoolDiscover to realize - // it can make a new service - return self.balance.poll_ready(cx); - } else { - *discover.load = Level::Normal; - } - } - - Poll::Pending - } - - fn call(&mut self, req: Req) -> Self::Future { - self.balance.call(req) - } -} - -#[doc(hidden)] -#[derive(Debug)] -pub struct DropNotifyService { - svc: Svc, - id: usize, - notify: tokio::sync::mpsc::UnboundedSender, -} - -impl Drop for DropNotifyService { - fn drop(&mut self) { - let _ = self.notify.send(self.id).is_ok(); - } -} - -impl Load for DropNotifyService { - type Metric = Svc::Metric; - fn load(&self) -> Self::Metric { - self.svc.load() - } -} - -impl> Service for DropNotifyService { - type Response = Svc::Response; - type Future = Svc::Future; - type Error = Svc::Error; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.svc.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.svc.call(req) - } -} diff --git a/tower/src/balance/pool/test.rs b/tower/src/balance/pool/test.rs deleted file mode 100644 index 6861b25a9..000000000 --- a/tower/src/balance/pool/test.rs +++ /dev/null @@ -1,190 +0,0 @@ -use crate::load; -use futures_util::pin_mut; -use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; -use tower_test::{assert_request_eq, mock}; - -use super::*; - -#[tokio::test] -async fn basic() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let mut pool = mock::Spawn::new(Builder::new().build(mock, ())); - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // send a request to the one backing service - let mut fut = task::spawn(pool.call(())); - - assert_pending!(fut.poll()); - assert_request_eq!(svc1, ()).send_response("foobar"); - assert_eq!(assert_ready_ok!(fut.poll()), "foobar"); -} - -#[tokio::test] -async fn high_load() { - // start the pool - let (mock, 
handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so _any_ Pending will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .max_services(Some(2)) - .build(mock, ()); - let mut pool = mock::Spawn::new(pool); - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // make the one backing service not ready - let mut fut1 = task::spawn(pool.call(())); - - // if we poll_ready again, pool should notice that load is increasing - // since urgency == 1.0, it should immediately enter high load - assert_pending!(pool.poll_ready()); - // it should ask the maker for another service, so we give it one - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - - // the pool should now be ready again for one more request - assert_ready_ok!(pool.poll_ready()); - let mut fut2 = task::spawn(pool.call(())); - - assert_pending!(pool.poll_ready()); - - // the pool should _not_ try to add another service - // sicen we have max_services(2) - assert_pending!(handle.as_mut().poll_request()); - - // let see that each service got one request - assert_request_eq!(svc1, ()).send_response("foo"); - assert_request_eq!(svc2, ()).send_response("bar"); - assert_eq!(assert_ready_ok!(fut1.poll()), "foo"); - assert_eq!(assert_ready_ok!(fut2.poll()), "bar"); -} - -#[tokio::test] -async fn low_load() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so any event will change the service count - .build(mock, ()); - - let mut pool = mock::Spawn::new(pool); - - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // cycling a request should now work - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc1, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - // and pool should now not be ready (since svc1 isn't ready) - // it should immediately try to add another service - // which we give it - assert_pending!(pool.poll_ready()); - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - // pool is now ready - // which (because of urgency == 1.0) should immediately cause it to drop a service - // it'll drop svc1, so it'll still be ready - assert_ready_ok!(pool.poll_ready()); - // and even with another ready, it won't drop svc2 since its now the only service - assert_ready_ok!(pool.poll_ready()); - - // cycling a request should now work on svc2 - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - - // and again (still svc2) - svc2.allow(1); - assert_ready_ok!(pool.poll_ready()); - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); -} - -#[tokio::test] -async fn 
failing_service() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so _any_ Pending will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .build(mock, ()); - - let mut pool = mock::Spawn::new(pool); - - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // one request-response cycle - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc1, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - - // now make svc1 fail, so it has to be removed - svc1.send_error("ouch"); - // polling now should recognize the failed service, - // try to create a new one, and then realize the maker isn't ready - assert_pending!(pool.poll_ready()); - // then we release another service - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - - // the pool should now be ready again - assert_ready_ok!(pool.poll_ready()); - // and a cycle should work (and go through svc2) - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("bar"); - assert_eq!(assert_ready_ok!(fut.poll()), "bar"); -}
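
For readers skimming the removed module above: the heart of `Pool` was a single scalar load estimate, an exponentially weighted moving average over `poll_ready` outcomes (0 for `Ready`, 1 for `Pending`) compared against a low and a high watermark. The sketch below restates just that bookkeeping outside of tower, using the constants from the removed `Builder::default()`; the type and method names are invented for illustration and are not tower API.

```rust
/// The load estimate the removed `Pool` maintained, restated standalone:
/// an exponential moving average over "was the inner service ready when we
/// needed it?" samples, where `Ready` counts as 0 and `Pending` counts as 1.
struct LoadEstimate {
    ewma: f64,  // current estimate; starts at `Builder::initial`
    alpha: f64, // `Builder::urgency`: how strongly each sample moves the estimate
    low: f64,   // `Builder::underutilized_below`
    high: f64,  // `Builder::loaded_above`
}

enum Decision {
    RemoveService, // estimate fell below `low`: the pool looks over-provisioned
    KeepAsIs,
    AddService, // estimate rose above `high`: the pool looks under-provisioned
}

impl LoadEstimate {
    fn new() -> Self {
        // Constants taken from the removed `Builder::default()`.
        Self { ewma: 0.1, alpha: 0.03, low: 0.00001, high: 0.2 }
    }

    /// `ready == true` corresponds to the inner `poll_ready` returning `Ready`
    /// (a 0 sample); `ready == false` corresponds to `Pending` (a 1 sample).
    fn observe(&mut self, ready: bool) -> Decision {
        let sample = if ready { 0.0 } else { 1.0 };
        // The same update the removed `Pool::poll_ready` performed, written as
        // one formula: ewma <- alpha * sample + (1 - alpha) * ewma.
        self.ewma = self.alpha * sample + (1.0 - self.alpha) * self.ewma;
        if self.ewma > self.high {
            Decision::AddService
        } else if self.ewma < self.low {
            Decision::RemoveService
        } else {
            Decision::KeepAsIs
        }
    }
}

fn main() {
    let mut estimate = LoadEstimate::new();
    // A run of `Pending` polls pushes the estimate toward 1.0; with these
    // constants it crosses the high watermark after a handful of samples.
    for n in 1..=10 {
        if let Decision::AddService = estimate.observe(false) {
            println!("after {n} pending polls: ewma = {:.3}, would add a service", estimate.ewma);
        }
    }
}
```

The removed implementation also reset the estimate to `Builder::initial` whenever a service was added or removed, and clamped it to the high watermark while overloaded, so that a pool capped by `max_services` did not ratchet the estimate upward indefinitely.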
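
The other mechanism worth calling out from the removed code is how `PoolDiscoverer` learned that the balancer had dropped one of its services: every service it handed out was wrapped in `DropNotifyService`, whose `Drop` impl sent the service's slab index back over an unbounded channel, and `poll_next` drained that channel to free the corresponding slots. Below is a minimal restatement of that pattern with invented names (only `tokio::sync::mpsc` is real API here), assuming a tokio 1.x runtime.

```rust
use tokio::sync::mpsc;

/// Wraps a value handed to some consumer so that dropping the wrapper reports
/// the wrapped value's slot index back to whoever is tracking live slots.
struct DropNotify<S> {
    inner: S,
    id: usize,
    notify: mpsc::UnboundedSender<usize>,
}

impl<S> Drop for DropNotify<S> {
    fn drop(&mut self) {
        // If the tracking side is gone there is nobody left to tell, so the
        // send error is deliberately ignored, as in the removed code.
        let _ = self.notify.send(self.id);
    }
}

#[tokio::main]
async fn main() {
    let (died_tx, mut died_rx) = mpsc::unbounded_channel();
    let svc = DropNotify { inner: "some balanced service", id: 7, notify: died_tx };
    println!("balancing requests across {:?} ...", svc.inner);

    // The balancer dropping a failed service is all it takes for the tracking
    // side to learn which slot to free, mirroring `died_rx` in `poll_next`.
    drop(svc);
    assert_eq!(died_rx.recv().await, Some(7));
}
```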
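
Finally, since the commit message defers a replacement to a later release, here is one way current `Pool` users can keep load balancing in the meantime: drive `p2c::Balance` with a `Discover` implementation they control. This is a minimal sketch, not a drop-in replacement; it assumes tower 0.4 with the `balance`, `discover`, `load`, and `util` features plus a tokio runtime, and the endpoint names and `service_fn` handlers are invented for illustration. `Balance::new`, `ServiceList`, `load::Constant`, and `service_fn` are existing tower items, but check their signatures against the exact version in use.

```rust
use std::convert::Infallible;
use tower::balance::p2c::Balance;
use tower::discover::ServiceList;
use tower::load::Constant;
use tower::{service_fn, Service, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), tower::BoxError> {
    // A fixed set of endpoints stands in for whatever discovery mechanism you
    // actually have. Each service is wrapped in `load::Constant` so the
    // balancer has a load metric to compare (swap in a real load measure such
    // as `load::PeakEwma` in practice).
    let endpoints = vec!["replica-a", "replica-b"].into_iter().map(|name| {
        Constant::new(
            service_fn(move |req: String| async move {
                Ok::<_, Infallible>(format!("{name} handled {req}"))
            }),
            0usize,
        )
    });

    // `p2c::Balance` accepts any `Discover` implementation. `ServiceList`
    // wraps a static collection; a stream of `Change::Insert`/`Change::Remove`
    // events gives you the dynamic service set that `Pool` used to manage
    // internally.
    let mut balance = Balance::new(ServiceList::new(endpoints));

    let rsp = balance.ready().await?.call("hello".to_owned()).await?;
    println!("{rsp}");
    Ok(())
}
```

What this does not recreate is `Pool`'s elastic sizing: growing or shrinking the endpoint set in response to load is now the job of whatever feeds the `Discover` stream, which is precisely the part this patch removes from tower itself.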