Skip to content

Commit

Permalink
Use a constant-time load balancer (#266)
Browse files Browse the repository at this point in the history
tower-rs/tower#288 and tower-rs/tower#293 changed the Balance
implementation substantially. Now, a new layer, `spawn_ready` is
inserted around endpoint stack to ensure that readiness is driven
on a background task.

In support of this change, the `pending` layer was removed from the
endpoint stack and, instead, the discovery system is now responsible for
driving pending services to be materialized.
  • Loading branch information
olix0r committed Jul 11, 2019
1 parent b71349a commit 3a3ec3b
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 127 deletions.
37 changes: 35 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ dependencies = [
"http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.12.31 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-load 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

Expand Down Expand Up @@ -540,9 +541,11 @@ dependencies = [
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-discover 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)",
"tower-load 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-request-modifier 0.1.0 (git+https://github.com/tower-rs/tower-http)",
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-spawn-ready 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing-fmt 0.1.0 (git+https://github.com/tokio-rs/tracing)",
Expand Down Expand Up @@ -1473,14 +1476,17 @@ dependencies = [
[[package]]
name = "tower-balance"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#9b27863a6160e2146bcf1bc6548a0334e7ad1fb8"
source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15"
dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-discover 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-layer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-load 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
Expand Down Expand Up @@ -1555,6 +1561,18 @@ dependencies = [
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tower-load"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15"
dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-discover 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tower-load-shed"
version = "0.1.0"
Expand All @@ -1568,7 +1586,7 @@ dependencies = [
[[package]]
name = "tower-reconnect"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#9b27863a6160e2146bcf1bc6548a0334e7ad1fb8"
source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15"
dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
Expand Down Expand Up @@ -1605,6 +1623,19 @@ dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tower-spawn-ready"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15"
dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-layer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tower-timeout"
version = "0.1.0"
Expand Down Expand Up @@ -2042,11 +2073,13 @@ dependencies = [
"checksum tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "<none>"
"checksum tower-layer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0ddf07e10c07dcc8f41da6de036dc66def1a85b70eb8a385159e3908bb258328"
"checksum tower-limit 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "09d3d0fe82c2373225025d50881794e0792e544df9752dec66288b644b40fbfe"
"checksum tower-load 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-load-shed 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "04fbaf5bfb63d84204db87b9b2aeec61549613f2bbb8706dcc36f5f3ea8cd769"
"checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-request-modifier 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "<none>"
"checksum tower-retry 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "09e80588125061f276ed2a7b0939988b411e570a2dbb2965b1382ef4f71036f7"
"checksum tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2cc0c98637d23732f8de6dfd16494c9f1559c3b9e20b4a46462c8f9b9e827bfa"
"checksum tower-spawn-ready 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "daa179ec4087589dc67148dc661abce5badc2c3ed4197adc7bd64b39f1f33c31"
"checksum tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4792342fac093db5d2558655055a89a04ca909663467a4310c7739d9f8b64698"
"checksum tracing 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9bb7fc03e49466b8388752c8b23856f0ae0782d4bbc0e0320430e9f62933a998"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ tower-service = "0.2"
tower-util = "0.1"
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-load = { git = "https://github.com/tower-rs/tower" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-request-modifier = { git = "https://github.com/tower-rs/tower-http" }
tower-spawn-ready = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] }

# FIXME update to a release when available (>0.11)
Expand Down
1 change: 1 addition & 0 deletions lib/hyper-balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ http = "0.1"
hyper = "0.12"

tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-load = { git = "https://github.com/tower-rs/tower" }
tower-service = "0.2"
4 changes: 2 additions & 2 deletions lib/hyper-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures::{Async, Poll};
use http;
use hyper::body::Payload;
use tower_balance::load::Instrument;
use tower_load::Instrument;

/// Instruments HTTP responses to drop handles when their first body message is received.
#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -178,7 +178,7 @@ mod tests {
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::{Arc, Weak};
use tower_balance::load::Instrument;
use tower_load::Instrument;

use super::{PendingUntilEos, PendingUntilFirstData};

Expand Down
12 changes: 5 additions & 7 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use logging;
use metrics::FmtMetrics;
use never::Never;
use proxy::{
self, accept, buffer,
self, accept,
http::{
client, insert, metrics as http_metrics, normalize_uri, profiles, router, settings,
strip_header,
},
pending, reconnect,
reconnect,
};
use svc::{self, LayerExt};
use tap;
Expand Down Expand Up @@ -483,7 +483,6 @@ where
.layer(strip_header::response::layer(super::L5D_SERVER_ID))
.layer(strip_header::response::layer(super::L5D_REMOTE_IP))
.service(client_stack);

// A per-`dst::Route` layer that uses profile data to configure
// a per-route layer.
//
Expand Down Expand Up @@ -515,13 +514,14 @@ where
ep
},
))
.layer(buffer::layer(max_in_flight, DispatchDeadline::extract));
.buffer_pending(max_in_flight, DispatchDeadline::extract);

// Resolves the target via the control plane and balances requests
// over all endpoints returned from the destination service.
let balancer = svc::builder()
.layer(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.layer(resolve::layer(Resolve::new(resolver)));
.layer(resolve::layer(Resolve::new(resolver)))
.spawn_ready();

let distributor = svc::builder()
.layer(
Expand All @@ -531,8 +531,6 @@ where
fallback::layer(balancer, orig_dst_router)
.on_error::<control::destination::Unresolvable>(),
)
.layer(pending::layer())
.layer(balance::weight::layer())
.service(endpoint_stack);

// A per-`DstAddr` stack that does the following:
Expand Down
14 changes: 1 addition & 13 deletions src/app/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@ use std::{fmt, hash};

use super::identity;
use control::destination::{Metadata, ProtocolHint};
use proxy::{
self,
http::{
balance::{HasWeight, Weight},
settings,
},
};
use proxy::{self, http::settings};
use tap;
use transport::{connect, tls};
use {Conditional, NameAddr};
Expand Down Expand Up @@ -108,12 +102,6 @@ impl connect::HasPeerAddr for Endpoint {
}
}

impl HasWeight for Endpoint {
fn weight(&self) -> Weight {
self.metadata.weight()
}
}

impl settings::HasSettings for Endpoint {
fn http_settings(&self) -> &settings::Settings {
&self.http_settings
Expand Down
6 changes: 0 additions & 6 deletions src/control/destination/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod client;
mod resolution;
use self::client::Client;
pub use self::resolution::{Resolution, ResolveFuture, Unresolvable};
use proxy::http::balance::Weight;
use NameAddr;

/// A handle to request resolutions from the destination service.
Expand Down Expand Up @@ -164,9 +163,4 @@ impl Metadata {
pub fn identity(&self) -> Option<&identity::Name> {
self.identity.as_ref()
}

pub fn weight(&self) -> Weight {
let w: f64 = self.weight.into();
(w / 10_000.0).into()
}
}
Loading

0 comments on commit 3a3ec3b

Please sign in to comment.