Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an examples that demonstrates p2c rr behavior #39

Merged
merged 3 commits into from
Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tower-balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ publish = false

[dependencies]
futures = "0.1"
log = "0.3"
rand = "0.4"
tower = { version = "0.1", path = "../" }
tower-discover = { version = "0.1", path = "../tower-discover" }
ordermap = "0.2"

[dev-dependencies]
log = "0.3"
env_logger = "0.4"
hdrsample = "6.0"
tokio-core = "^0.1.12"
tokio-timer = "0.1"
quickcheck = "0.6"
210 changes: 210 additions & 0 deletions tower-balance/examples/demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
//! Exercises load balancers with mocked services.

extern crate env_logger;
#[macro_use]
extern crate futures;
extern crate hdrsample;
#[macro_use]
extern crate log;
extern crate rand;
extern crate tokio_core;
extern crate tokio_timer;
extern crate tower;
extern crate tower_balance;
extern crate tower_discover;

use futures::{Async, Future, Stream, Poll, future, stream};
use hdrsample::Histogram;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio_core::reactor::Core;
use tokio_timer::{Timer, TimerError, Sleep};
use tower::Service;
use tower_balance::*;
use tower_discover::{Change, Discover};

struct DelayService(Timer, Duration);

struct Delay(Sleep, Instant);

struct Disco(VecDeque<Change<usize, DelayService>>);

impl Service for DelayService {
type Request = ();
type Response = Duration;
type Error = TimerError;
type Future = Delay;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
debug!("polling delay service: ready");
Ok(Async::Ready(()))
}

fn call(&mut self, _: ()) -> Delay {
Delay(self.0.sleep(self.1), Instant::now())
}
}

impl Future for Delay {
type Item = Duration;
type Error = TimerError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
try_ready!(self.0.poll());
Ok(Async::Ready(self.1.elapsed()))
}
}

impl Discover for Disco {
type Key = usize;
type Request = ();
type Response = Duration;
type Error = TimerError;
type Service = DelayService;
type DiscoverError = ();

fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
let r = self.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
}

fn gen_disco(timer: &Timer) -> Disco {
use self::Change::Insert;

let mut changes = VecDeque::new();

let quick = Duration::from_millis(500);
for i in 0..8 {
changes.push_back(Insert(i, DelayService(timer.clone(), quick)));
}

let slow = Duration::from_secs(2);
changes.push_back((Insert(9, DelayService(timer.clone(), slow))));

Disco(changes)
}

struct SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
{
lb: Balance<D, C>,
send_remaining: usize,
responses: stream::FuturesUnordered<ResponseFuture<<D::Service as Service>::Future, D::DiscoverError>>,
}

impl<D, C> Stream for SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
{
type Item = Duration;
type Error = Error<D::Error, D::DiscoverError>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!("sending requests {} / {}", self.send_remaining, self.responses.len());
while self.send_remaining > 0 {
if !self.responses.is_empty() {
if let Async::Ready(Some(rsp)) = self.responses.poll()? {
return Ok(Async::Ready(Some(rsp)));
}
}

if self.send_remaining > 0 {
debug!("polling lb ready");
try_ready!(self.lb.poll_ready());

debug!("sending request");
let rsp = self.lb.call(());
self.responses.push(rsp);

self.send_remaining -= 1;
}
}

if !self.responses.is_empty() {
return self.responses.poll();
}

Ok(Async::Ready(None))
}
}

fn compute_histo<S>(times: S)
-> Box<Future<Item = Histogram<u64>, Error = S::Error> + 'static>
where
S: Stream<Item = Duration> + 'static
{
// The max delay is 2000ms. At 3 significant figures.
let histo = Histogram::<u64>::new_with_max(3_000, 3).unwrap();
let fut = times
.fold(histo, |mut histo, elapsed| {
let ns: u32 = elapsed.subsec_nanos();
let ms = u64::from(ns) / 1_000 / 1_000
+ elapsed.as_secs() * 1_000;
histo += ms;

future::ok(histo)
});

Box::new(fut)
}

fn report(pfx: &str, histo: &Histogram<u64>) {
println!("{} samples: {}", pfx, histo.len());

if histo.len () < 2 {
return;
}
println!("{} p50: {}", pfx, histo.value_at_quantile(0.5));

if histo.len () < 10 {
return;
}
println!("{} p90: {}", pfx, histo.value_at_quantile(0.9));

if histo.len () < 50 {
return;
}
println!("{} p95: {}", pfx, histo.value_at_quantile(0.95));

if histo.len () < 100 {
return;
}
println!("{} p99: {}", pfx, histo.value_at_quantile(0.99));

if histo.len () < 1000 {
return;
}
println!("{} p999: {}", pfx, histo.value_at_quantile(0.999));
}

fn main() {
env_logger::init().unwrap();

let timer = Timer::default();
let mut core = Core::new().unwrap();
let requests = 1_000_000;

{
let lb = {
let loaded = load::WithPendingRequests::new(gen_disco(&timer));
power_of_two_choices(loaded, rand::thread_rng())
};
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("p2c", &histo)
}

{
let lb = round_robin(gen_disco(&timer));
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("rr", &histo)
}
}
14 changes: 12 additions & 2 deletions tower-balance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
extern crate ordermap;
#[cfg(test)]
extern crate quickcheck;
Expand Down Expand Up @@ -110,6 +112,7 @@ where
fn update_from_discover(&mut self)
-> Result<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
debug!("updating from discover");
use tower_discover::Change::*;

while let Async::Ready(change) = self.discover.poll().map_err(Error::Balance)? {
Expand Down Expand Up @@ -144,9 +147,11 @@ where
fn promote_to_ready(&mut self)
-> Result<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
let n = self.not_ready.len();
debug!("promoting to ready: {}", n);
// Iterate through the not-ready endpoints from right to left to prevent removals
// from reordering services in a way that could prevent a service from being polled.
for idx in self.not_ready.len()-1..0 {
for idx in (0..n-1).rev() {
let is_ready = {
let (_, svc) = self.not_ready
.get_index_mut(idx)
Expand All @@ -155,10 +160,13 @@ where
};

if is_ready {
debug!("promoting to ready");
let (key, svc) = self.not_ready
.swap_remove_index(idx)
.expect("invalid not_ready index");
self.ready.insert(key, svc);
} else {
debug!("not promoting to ready");
}
}

Expand Down Expand Up @@ -195,7 +203,9 @@ where
-> Poll<(), Error<<D::Service as Service>::Error, D::DiscoverError>>
{
loop {
let idx = match self.ready.len() {
let n = self.ready.len();
debug!("choosing from {} replicas", n);
let idx = match n {
0 => return Ok(Async::NotReady),
1 => 0,
_ => {
Expand Down