Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load: Add "weight" load variant #695

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::load::weight::{HasWeight, Weight};
use tower::util::ServiceExt;
use tower_service::Service;

Expand All @@ -35,6 +36,7 @@ static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
Duration::from_millis(500),
Duration::from_millis(1000),
];
static ENDPOINT_WEIGHTS: [f64; 10] = [1.0, 1.0, 0.0, 0.01, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];

struct Summary {
latencies: Histogram<u64>,
Expand All @@ -55,6 +57,11 @@ async fn main() {
print!("{}ms, ", l);
}
println!("]");
print!("ENDPOINT_WEIGHTS=[");
for weight in &ENDPOINT_WEIGHTS {
print!("{}, ", weight);
}
println!("]");

let decay = Duration::from_secs(10);
let d = gen_disco();
Expand All @@ -66,17 +73,42 @@ async fn main() {
));
run("P2C+PeakEWMA...", pe).await;

let d = gen_disco();
let pe = lb::p2c::Balance::new(load::WeightedDiscover::new(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::CompleteOnResponse::default(),
)));
run("P2C+PeakEWMA+Weighted...", pe).await;

let d = gen_disco();
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
run("P2C+LeastLoaded...", ll).await;

let d = gen_disco();
let ll = lb::p2c::Balance::new(load::WeightedDiscover::new(
load::PendingRequestsDiscover::new(d, load::CompleteOnResponse::default()),
));
run("P2C+LeastLoaded+Weighted...", ll).await;
}

type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
instance: usize,
weight: Weight,
}

impl HasWeight for Key {
fn weight(&self) -> Weight {
self.weight
}
}

pin_project! {
struct Disco<S> {
Expand Down Expand Up @@ -117,8 +149,9 @@ fn gen_disco() -> impl Discover<
Disco::new(
MAX_ENDPOINT_LATENCIES
.iter()
.zip(&ENDPOINT_WEIGHTS)
.enumerate()
.map(|(instance, latency)| {
.map(|(instance, (latency, weight))| {
let svc = tower::service_fn(move |_| {
let start = Instant::now();

Expand All @@ -133,7 +166,12 @@ fn gen_disco() -> impl Discover<
}
});

(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
let key = Key {
instance,
weight: Weight::from(*weight),
};

(key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
})
.collect(),
)
Expand Down
8 changes: 7 additions & 1 deletion tower/src/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! - [`Constant`] — Always returns the same constant load value for a service.
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
//! - [`Weight`] - Adds a weighting to an inner Load.
//!
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
//! balance services depending on their load. Which load metric to use depends on your exact
Expand Down Expand Up @@ -63,16 +64,21 @@ pub mod completion;
mod constant;
pub mod peak_ewma;
pub mod pending_requests;
pub mod weight;

pub use self::{
completion::{CompleteOnResponse, TrackCompletion},
constant::Constant,
peak_ewma::PeakEwma,
pending_requests::PendingRequests,
weight::Weight,
};

#[cfg(feature = "discover")]
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
pub use self::{
peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover,
weight::WeightedDiscover,
};

/// Types that implement this trait can give an estimate of how loaded they are.
///
Expand Down
10 changes: 10 additions & 0 deletions tower/src/load/peak_ewma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::weight::Weight;
use super::Load;
use std::task::{Context, Poll};
use std::{
ops,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -69,6 +71,14 @@ pin_project! {
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);

impl ops::Div<Weight> for Cost {
type Output = f64;

fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}

/// Tracks an in-flight request and updates the RTT-estimate on Drop.
#[derive(Debug)]
pub struct Handle {
Expand Down
10 changes: 10 additions & 0 deletions tower/src/load/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use crate::discover::{Change, Discover};
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
use std::ops;
#[cfg(feature = "discover")]
use std::pin::Pin;

use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::weight::Weight;
use super::Load;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -43,6 +45,14 @@ pin_project! {
#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)]
pub struct Count(usize);

impl ops::Div<Weight> for Count {
type Output = f64;

fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}

/// Tracks an in-flight request by reference count.
#[derive(Debug)]
pub struct Handle(RefCount);
Expand Down
198 changes: 198 additions & 0 deletions tower/src/load/weight.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
//! A [`Load`] implementation which implements weighting on top of an inner [`Load`].
//!
//! This can be useful in such cases as canary deployments, where it is desirable for a
//! particular service to receive less than its fair share of load than other services.

#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::ready;
#[cfg(feature = "discover")]
use futures_core::Stream;
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;

use std::ops;
use std::task::{Context, Poll};
use tower_service::Service;

use super::Load;

/// A weight on [0.0, ∞].
///
/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes.
///
/// This is represented internally as an integer, rather than a float, so that it can implement
/// `Hash` and `Eq`.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct Weight(u32);

impl Weight {
/// Minimum Weight
pub const MIN: Weight = Weight(0);
/// Unit of Weight - what 1.0_f64 corresponds to
pub const UNIT: Weight = Weight(10_000);
/// Maximum Weight
pub const MAX: Weight = Weight(u32::MAX);
}

impl Default for Weight {
fn default() -> Self {
Weight::UNIT
}
}

impl From<f64> for Weight {
fn from(w: f64) -> Self {
if w < 0.0 || w == f64::NAN {
Self::MIN
} else if w == f64::INFINITY {
Self::MAX
} else {
Weight((w * (Weight::UNIT.0 as f64)).round() as u32)
}
}
}

impl Into<f64> for Weight {
fn into(self) -> f64 {
(self.0 as f64) / (Weight::UNIT.0 as f64)
}
}

impl ops::Div<Weight> for f64 {
type Output = f64;

fn div(self, w: Weight) -> f64 {
if w == Weight::MIN {
f64::INFINITY
} else {
let w: f64 = w.into();
self / w
}
}
}

impl ops::Div<Weight> for usize {
type Output = f64;

fn div(self, w: Weight) -> f64 {
(self as f64) / w
}
}

/// Measures the load of the underlying service by weighting that service's load by a constant
/// weighting factor.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Weighted<S> {
inner: S,
weight: Weight,
}

impl<S> Weighted<S> {
/// Wraps an `S`-typed service so that its load is weighted by the given weight.
pub fn new<W: Into<Weight>>(inner: S, w: W) -> Self {
let weight = w.into();
Self { inner, weight }
}
}

impl<S> Load for Weighted<S>
where
S: Load,
S::Metric: ops::Div<Weight>,
<S::Metric as ops::Div<Weight>>::Output: PartialOrd,
{
type Metric = <S::Metric as ops::Div<Weight>>::Output;

fn load(&self) -> Self::Metric {
self.inner.load() / self.weight
}
}

impl<R, S: Service<R>> Service<R> for Weighted<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: R) -> Self::Future {
self.inner.call(req)
}
}

#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with [`Weighted`].
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct WeightedDiscover<D>{
#[pin]
discover: D,
}
}

#[cfg(feature = "discover")]
impl<D> WeightedDiscover<D> {
/// Wraps a [`Discover`], wrapping all of its services with [`Weighted`].
pub fn new(discover: D) -> Self {
Self { discover }
}
}

/// Allows [`Discover::Key`] to expose a weight, so that they can be included in a discover
/// stream
pub trait HasWeight {
/// Returns the [`Weight`]
fn weight(&self) -> Weight;
}

impl<T: HasWeight> From<T> for Weighted<T> {
fn from(inner: T) -> Self {
let weight = inner.weight();
Self { inner, weight }
}
}

impl<T> HasWeight for Weighted<T> {
fn weight(&self) -> Weight {
self.weight
}
}

#[cfg(feature = "discover")]
impl<D> Stream for WeightedDiscover<D>
where
D: Discover,
D::Key: HasWeight,
{
type Item = Result<Change<D::Key, Weighted<D::Service>>, D::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;

let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => {
let w = k.weight();
Insert(k, Weighted::new(svc, w))
}
Some(Remove(k)) => Remove(k),
};

Poll::Ready(Some(Ok(change)))
}
}

#[test]
fn div_min() {
assert_eq!(10.0 / Weight::MIN, f64::INFINITY);
assert_eq!(10 / Weight::MIN, f64::INFINITY);
assert_eq!(0 / Weight::MIN, f64::INFINITY);
}