Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring cleaning for tower::balance #449

Merged
merged 5 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() {

let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
Expand All @@ -67,7 +67,7 @@ async fn main() {
run("P2C+PeakEWMA...", pe).await;

let d = gen_disco();
let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
Expand Down
4 changes: 2 additions & 2 deletions tower/src/balance/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Error types
//! Error types for the `tower::balance` middleware.

use std::fmt;

/// An error returned when the balancer's endpoint discovery stream fails.
/// The balancer's endpoint discovery stream failed.
#[derive(Debug)]
pub struct Discover(pub(crate) crate::BoxError);

Expand Down
55 changes: 54 additions & 1 deletion tower/src/balance/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,57 @@
//! Load balancing middlewares.
//! Middleware that allows balancing load among multiple services.
//!
//! In larger systems, multiple endpoints are often available for a given service. As load
//! increases, you want to ensure that that load is spread evenly across the available services.
//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
//! service, even when spare capacity is available to handle that request elsewhere.
//!
//! This module provides two pieces of middleware that helps with this type of load balancing:
//!
//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
//! technique for spreading load across services with only inexact load measurements. Use this if
//! the set of available services is not within your control, and you simply want to spread load
//! among that set of services.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//!
//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
//! current load by tracking successful and unsuccessful calls to `poll_ready`, and uses an
//! exponentially weighted moving average to add (using [`tower::make_service::MakeService`]) or
//! remove (by dropping) services in response to increases or decreases in load. Use this if you
//! are able to dynamically add more service endpoints to the system to handle added load.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! # #[cfg(feature = "load")]
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
//! # fn warnings_are_errors() {
//! use tower::balance::p2c::Balance;
//! use tower::load::Load;
//! use tower::{Service, ServiceExt};
//! use futures_util::pin_mut;
//! # use futures_core::Stream;
//! # use futures_util::StreamExt;
//!
//! async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
//! where
//! S::Error: Into<tower::BoxError>,
//! # // this bound is pretty unfortunate, and the compiler does _not_ help
//! S::Metric: std::fmt::Debug,
//! {
//! // Spread load evenly across the two services
//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
//!
//! // Issue all the requests that come in.
//! // Some will go to svc1, some will go to svc2.
//! pin_mut!(reqs);
//! let mut responses = p2c.call_all(reqs);
//! while let Some(rsp) = responses.next().await {
//! // ...
//! }
//! }
//! # }
//! ```

pub mod error;
pub mod p2c;
Expand Down
32 changes: 21 additions & 11 deletions tower/src/balance/p2c/layer.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
use super::BalanceMake;
use super::MakeBalance;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;

/// Efficiently distributes requests across an arbitrary number of services
/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the
/// "inner" service in response to requests coming from the "outer" service.
///
/// This construction may seem a little odd at first glance. This is not a layer that takes
/// requests and produces responses in the traditional sense. Instead, it is more like
/// [`MakeService`](tower::make_service::MakeService) in that it takes service _descriptors_ (see
/// `Target` on `MakeService`) and produces _services_. Since [`Balance`] spreads requests across a
/// _set_ of services, the inner service should produce a [`Discover`], not just a single
/// [`Service`], given a service descriptor.
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone)]
pub struct BalanceLayer<D, Req> {
pub struct MakeBalanceLayer<D, Req> {
rng: SmallRng,
_marker: PhantomData<fn(D, Req)>,
}

impl<D, Req> BalanceLayer<D, Req> {
/// Builds a balancer using the system entropy.
impl<D, Req> MakeBalanceLayer<D, Req> {
/// Build balancers using operating system entropy.
pub fn new() -> Self {
Self {
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}

/// Builds a balancer from the provided RNG.
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
Expand All @@ -31,17 +41,17 @@ impl<D, Req> BalanceLayer<D, Req> {
}
}

impl<S, Req> Layer<S> for BalanceLayer<S, Req> {
type Service = BalanceMake<S, Req>;
impl<S, Req> Layer<S> for MakeBalanceLayer<S, Req> {
type Service = MakeBalance<S, Req>;

fn layer(&self, make_discover: S) -> Self::Service {
BalanceMake::new(make_discover, self.rng.clone())
MakeBalance::from_rng(make_discover, self.rng.clone()).expect("SmallRng is infallible")
}
}

impl<D, Req> fmt::Debug for BalanceLayer<D, Req> {
impl<D, Req> fmt::Debug for MakeBalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BalanceLayer")
f.debug_struct("MakeBalanceLayer")
.field("rng", &self.rng)
.finish()
}
Expand Down
41 changes: 28 additions & 13 deletions tower/src/balance/p2c/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
Expand All @@ -12,15 +12,22 @@ use std::{
};
use tower_service::Service;

/// Makes `Balancer`s given an inner service that makes `Discover`s.
/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service.
///
/// This is effectively an implementation of [`MakeService`](tower::make_service::MakeService),
/// except that it forwards the service descriptors (`Target`) to an inner service (`S`), and
/// expects that service to produce a service set in the form of a [`Discover`]. It then wraps the
/// service set in a [`Balance`] before returning it as the "made" service.
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone, Debug)]
pub struct BalanceMake<S, Req> {
pub struct MakeBalance<S, Req> {
inner: S,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
}

/// Makes a balancer instance.
/// A [`Balance`] in the making.
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
Expand All @@ -30,22 +37,30 @@ pub struct MakeFuture<F, Req> {
_marker: PhantomData<fn(Req)>,
}

impl<S, Req> BalanceMake<S, Req> {
pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
impl<S, Req> MakeBalance<S, Req> {
/// Build balancers using operating system entropy.
pub fn new(make_discover: S) -> Self {
Self {
inner,
rng,
inner: make_discover,
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}

/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(make_discover: S) -> Self {
Self::new(make_discover, SmallRng::from_entropy())
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(inner: S, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
inner,
rng,
_marker: PhantomData,
})
}
}

impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
impl<S, Target, Req> Service<Target> for MakeBalance<S, Req>
where
S: Service<Target>,
S::Response: Discover,
Expand Down Expand Up @@ -83,7 +98,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner, this.rng.clone());
let svc = Balance::from_rng(inner, this.rng.clone()).expect("SmallRng is infallible");
Poll::Ready(Ok(svc))
}
}
34 changes: 31 additions & 3 deletions tower/src/balance/p2c/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,32 @@
//! A Power-of-Two-Choices Load Balancer
//! This module implements the "[Power of Two Random Choices]" load balancing algorithm.
//!
//! It is a simple but robust technique for spreading load across services with only inexact load
//! measurements. As its name implies, whenever a request comes in, it samples two ready services
//! at random, and issues the request to whichever service is less loaded. How loaded a service is
//! is determined by the return value of [`Load`](tower::load::Load).
//!
//! As described in the [Finagle Guide][finagle]:
//!
//! > The algorithm randomly picks two services from the set of ready endpoints and
//! > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > expect a manageable upper bound on the maximum load of any server.
//! >
//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > `n` is the number of servers in the cluster.
//!
//! The balance service and layer implementations rely on _service discovery_ to provide the
//! underlying set of services to balance requests across. This happens through the
//! [`Discover`](tower::discover::Discover) trait, which is essentially a `Stream` that indicates
//! when services become available or go away. If you have a fixed set of services, consider using
//! [`ServiceList`](tower::discover::ServiceList).
//!
//! Since the load balancer needs to perform _random_ choices, the constructors in this module
//! usually come in two forms: one that uses randomness provided by the operating system, and one
//! that lets you specify the random seed to use. Usually the former is what you'll want, though
//! the latter may come in handy for reproducability or to reduce reliance on the operating system.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded

mod layer;
mod make;
Expand All @@ -7,6 +35,6 @@ mod service;
#[cfg(test)]
mod test;

pub use layer::BalanceLayer;
pub use make::{BalanceMake, MakeFuture};
pub use layer::MakeBalanceLayer;
pub use make::{MakeBalance, MakeFuture};
pub use service::Balance;
39 changes: 19 additions & 20 deletions tower/src/balance/p2c/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
Expand All @@ -18,24 +18,15 @@ use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};

/// Distributes requests across inner services using the [Power of Two Choices][p2c].
/// Efficiently distributes requests across an arbitrary number of services.
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
/// See the [module-level documentation](..) for details.
///
/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement
/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes
/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you
/// construct the `Balance` instance. For more details, see [#319].
Comment on lines 25 to 28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably best saved for a follow-up PR, but should Discover just have an Unpin bound on it? Since the unpin bound is only on the Service impl for Balance, this will currently fail very late, when a user tries to wrap a Balance in another layer or actually call a Balance, and it probably won't be super clear why Service isn't implemented. If Discover required Self: Unpin, the compiler error would occur when the user writes a bad Discover impl, which would be much clearer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid Unpin bounds wherever possible. They are really sad, especially when we one day get generators that implement Stream. This also ties into the larger discussion in #319.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but if Discover is going to be polled in poll_ready, it will always have to be Unpin unless poll_ready is changed to take a Pin<&mut Self> receiver. In that case, we would be making a breaking change anyway and could remove an Unpin bound from Discover.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if Discover will only ever be polled in poll_ready though? But you're right that if poll_ready later changes to be Pin, we could remove the bound then (although I think removing the bound is backwards-compatible anyway?).

///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
pub struct Balance<D, Req>
Expand Down Expand Up @@ -68,10 +59,10 @@ where
}
}

#[pin_project]
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
Expand All @@ -94,10 +85,10 @@ where
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
/// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self {
/// Constructs a load balancer that uses operating system entropy.
pub fn new(discover: D) -> Self {
Self {
rng,
rng: SmallRng::from_entropy(),
discover,
services: ReadyCache::default(),
ready_index: None,
Expand All @@ -106,9 +97,17 @@ where
}
}

/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(discover: D) -> Self {
Self::new(discover, SmallRng::from_entropy())
/// Constructs a load balancer seeded with the provided random number generator.
pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
discover,
services: ReadyCache::default(),
ready_index: None,

_req: PhantomData,
})
}

/// Returns the number of endpoints currently tracked by the balancer.
Expand Down
6 changes: 3 additions & 3 deletions tower/src/balance/p2c/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::*;
async fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));
assert_pending!(svc.poll_ready());
}

Expand All @@ -20,7 +20,7 @@ async fn single_endpoint() {
let (mut svc, mut handle) = mock::spawn_with(|s| {
let mock = load::Constant::new(s, 0);
let disco = ServiceList::new(vec![mock].into_iter());
Balance::from_entropy(disco)
Balance::new(disco)
});

handle.allow(0);
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn two_endpoints_with_equal_load() {
pin_mut!(handle_b);

let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));

handle_a.allow(0);
handle_b.allow(0);
Expand Down
2 changes: 1 addition & 1 deletion tower/src/balance/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl Builder {
};

Pool {
balance: Balance::from_entropy(Box::pin(d)),
balance: Balance::new(Box::pin(d)),
options: *self,
ewma: self.init,
}
Expand Down