Skip to content

Commit

Permalink
Add an examples that demonstrates p2c & rr behavior (#39)
Browse files Browse the repository at this point in the history
The new _demo_ example sends a million simulated requests through each
load balancer configuration and records the observed latency
distributions.

Furthermore, this fixes a critical bug in `Balancer`, where we did not
properly iterate through not-ready nodes.

* Use (0..n-1).rev() to iterate from right-to-left
  • Loading branch information
olix0r committed Jan 25, 2018
1 parent 4bedc52 commit 777888d
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 2 deletions.
6 changes: 6 additions & 0 deletions tower-balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ publish = false

[dependencies]
futures = "0.1"
log = "0.3"
rand = "0.4"
tower = { version = "0.1", path = "../" }
tower-discover = { version = "0.1", path = "../tower-discover" }
ordermap = "0.2"

[dev-dependencies]
log = "0.3"
env_logger = "0.4"
hdrsample = "6.0"
tokio-core = "^0.1.12"
tokio-timer = "0.1"
quickcheck = "0.6"
210 changes: 210 additions & 0 deletions tower-balance/examples/demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
//! Exercises load balancers with mocked services.

extern crate env_logger;
#[macro_use]
extern crate futures;
extern crate hdrsample;
#[macro_use]
extern crate log;
extern crate rand;
extern crate tokio_core;
extern crate tokio_timer;
extern crate tower;
extern crate tower_balance;
extern crate tower_discover;

use futures::{Async, Future, Stream, Poll, future, stream};
use hdrsample::Histogram;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio_core::reactor::Core;
use tokio_timer::{Timer, TimerError, Sleep};
use tower::Service;
use tower_balance::*;
use tower_discover::{Change, Discover};

struct DelayService(Timer, Duration);

struct Delay(Sleep, Instant);

struct Disco(VecDeque<Change<usize, DelayService>>);

impl Service for DelayService {
type Request = ();
type Response = Duration;
type Error = TimerError;
type Future = Delay;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
debug!("polling delay service: ready");
Ok(Async::Ready(()))
}

fn call(&mut self, _: ()) -> Delay {
Delay(self.0.sleep(self.1), Instant::now())
}
}

impl Future for Delay {
type Item = Duration;
type Error = TimerError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
try_ready!(self.0.poll());
Ok(Async::Ready(self.1.elapsed()))
}
}

impl Discover for Disco {
type Key = usize;
type Request = ();
type Response = Duration;
type Error = TimerError;
type Service = DelayService;
type DiscoverError = ();

fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
let r = self.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
}

fn gen_disco(timer: &Timer) -> Disco {
use self::Change::Insert;

let mut changes = VecDeque::new();

let quick = Duration::from_millis(500);
for i in 0..8 {
changes.push_back(Insert(i, DelayService(timer.clone(), quick)));
}

let slow = Duration::from_secs(2);
changes.push_back((Insert(9, DelayService(timer.clone(), slow))));

Disco(changes)
}

struct SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
{
lb: Balance<D, C>,
send_remaining: usize,
responses: stream::FuturesUnordered<ResponseFuture<<D::Service as Service>::Future, D::DiscoverError>>,
}

impl<D, C> Stream for SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
{
type Item = Duration;
type Error = Error<D::Error, D::DiscoverError>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!("sending requests {} / {}", self.send_remaining, self.responses.len());
while self.send_remaining > 0 {
if !self.responses.is_empty() {
if let Async::Ready(Some(rsp)) = self.responses.poll()? {
return Ok(Async::Ready(Some(rsp)));
}
}

if self.send_remaining > 0 {
debug!("polling lb ready");
try_ready!(self.lb.poll_ready());

debug!("sending request");
let rsp = self.lb.call(());
self.responses.push(rsp);

self.send_remaining -= 1;
}
}

if !self.responses.is_empty() {
return self.responses.poll();
}

Ok(Async::Ready(None))
}
}

fn compute_histo<S>(times: S)
-> Box<Future<Item = Histogram<u64>, Error = S::Error> + 'static>
where
S: Stream<Item = Duration> + 'static
{
// The max delay is 2000ms. At 3 significant figures.
let histo = Histogram::<u64>::new_with_max(3_000, 3).unwrap();
let fut = times
.fold(histo, |mut histo, elapsed| {
let ns: u32 = elapsed.subsec_nanos();
let ms = u64::from(ns) / 1_000 / 1_000
+ elapsed.as_secs() * 1_000;
histo += ms;

future::ok(histo)
});

Box::new(fut)
}

fn report(pfx: &str, histo: &Histogram<u64>) {
println!("{} samples: {}", pfx, histo.len());

if histo.len () < 2 {
return;
}
println!("{} p50: {}", pfx, histo.value_at_quantile(0.5));

if histo.len () < 10 {
return;
}
println!("{} p90: {}", pfx, histo.value_at_quantile(0.9));

if histo.len () < 50 {
return;
}
println!("{} p95: {}", pfx, histo.value_at_quantile(0.95));

if histo.len () < 100 {
return;
}
println!("{} p99: {}", pfx, histo.value_at_quantile(0.99));

if histo.len () < 1000 {
return;
}
println!("{} p999: {}", pfx, histo.value_at_quantile(0.999));
}

fn main() {
env_logger::init().unwrap();

let timer = Timer::default();
let mut core = Core::new().unwrap();
let requests = 1_000_000;

{
let lb = {
let loaded = load::WithPendingRequests::new(gen_disco(&timer));
power_of_two_choices(loaded, rand::thread_rng())
};
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("p2c", &histo)
}

{
let lb = round_robin(gen_disco(&timer));
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("rr", &histo)
}
}
14 changes: 12 additions & 2 deletions tower-balance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
extern crate ordermap;
#[cfg(test)]
extern crate quickcheck;
Expand Down Expand Up @@ -110,6 +112,7 @@ where
fn update_from_discover(&mut self)
-> Result<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
debug!("updating from discover");
use tower_discover::Change::*;

while let Async::Ready(change) = self.discover.poll().map_err(Error::Balance)? {
Expand Down Expand Up @@ -144,9 +147,11 @@ where
fn promote_to_ready(&mut self)
-> Result<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
let n = self.not_ready.len();
debug!("promoting to ready: {}", n);
// Iterate through the not-ready endpoints from right to left to prevent removals
// from reordering services in a way that could prevent a service from being polled.
for idx in self.not_ready.len()-1..0 {
for idx in (0..n-1).rev() {
let is_ready = {
let (_, svc) = self.not_ready
.get_index_mut(idx)
Expand All @@ -155,10 +160,13 @@ where
};

if is_ready {
debug!("promoting to ready");
let (key, svc) = self.not_ready
.swap_remove_index(idx)
.expect("invalid not_ready index");
self.ready.insert(key, svc);
} else {
debug!("not promoting to ready");
}
}

Expand Down Expand Up @@ -195,7 +203,9 @@ where
-> Poll<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
loop {
let idx = match self.ready.len() {
let n = self.ready.len();
debug!("choosing from {} replicas", n);
let idx = match n {
0 => return Ok(Async::NotReady),
1 => 0,
_ => {
Expand Down

0 comments on commit 777888d

Please sign in to comment.