Skip to content

Commit

Permalink
Format tower with rustfmt; check in CI (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbarsky authored and carllerche committed Feb 11, 2019
1 parent 4c5ba67 commit d7e1b8f
Show file tree
Hide file tree
Showing 33 changed files with 308 additions and 319 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---
language: rust
sudo: false
before_script:
- rustup component add rustfmt

matrix:
include:
Expand All @@ -11,6 +13,7 @@ matrix:
- rust: nightly

script:
- cargo fmt --all -- --check
- cargo test --all

deploy:
Expand Down
30 changes: 13 additions & 17 deletions examples/channel_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#![deny(warnings)]

extern crate futures;
extern crate tokio_timer;
extern crate futures_cpupool;
extern crate tokio_timer;
extern crate tower_service;
extern crate tower_util;

Expand All @@ -23,9 +23,9 @@ extern crate env_logger;
use tower_service::Service;
use tower_util::{MakeService, ServiceExt};

use futures::{Future, Stream, IntoFuture, Poll, Async};
use futures::future::{Executor, FutureResult};
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures_cpupool::CpuPool;
use tokio_timer::Timer;

Expand Down Expand Up @@ -95,12 +95,11 @@ impl Service<()> for NewChannelService {
// Create the task that proceses the request
self.pool
.execute(rx.for_each(move |(msg, tx)| {
timer.sleep(Duration::from_millis(500))
.then(move |res| {
res.unwrap();
let _ = tx.send(msg);
Ok(())
})
timer.sleep(Duration::from_millis(500)).then(move |res| {
res.unwrap();
let _ = tx.send(msg);
Ok(())
})
}))
.map(|_| ChannelService { tx })
.map_err(|_| io::ErrorKind::Other.into())
Expand All @@ -114,8 +113,7 @@ impl Service<String> for ChannelService {
type Future = ResponseFuture;

fn poll_ready(&mut self) -> Poll<(), Error> {
self.tx.poll_ready()
.map_err(|_| Error::Failed)
self.tx.poll_ready().map_err(|_| Error::Failed)
}

fn call(&mut self, request: String) -> ResponseFuture {
Expand All @@ -138,13 +136,11 @@ impl Future for ResponseFuture {

fn poll(&mut self) -> Poll<String, Error> {
match self.rx {
Some(ref mut rx) => {
match rx.poll() {
Ok(Async::Ready(v)) => Ok(v.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(Error::Failed),
}
}
Some(ref mut rx) => match rx.poll() {
Ok(Async::Ready(v)) => Ok(v.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(Error::Failed),
},
None => Err(Error::AtCapacity),
}
}
Expand Down
12 changes: 10 additions & 2 deletions tower-balance/examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,21 @@ fn main() {
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(d, DEFAULT_RTT, decay, lb::load::NoInstrument));
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(
d,
DEFAULT_RTT,
decay,
lb::load::NoInstrument,
));
run("P2C+PeakEWMA", pe)
});

let fut = fut.and_then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(lb::load::WithPendingRequests::new(d, lb::load::NoInstrument));
let ll = lb::Balance::p2c(lb::load::WithPendingRequests::new(
d,
lb::load::NoInstrument,
));
run("P2C+LeastLoaded", ll)
});

Expand Down
49 changes: 27 additions & 22 deletions tower-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ extern crate indexmap;
extern crate quickcheck;
extern crate rand;
extern crate tokio_timer;
extern crate tower_direct_service;
extern crate tower_discover;
extern crate tower_service;
extern crate tower_direct_service;

use futures::{Async, Future, Poll};
use indexmap::IndexMap;
use rand::{rngs::SmallRng, SeedableRng};
use std::{fmt, error};
use std::marker::PhantomData;
use std::{error, fmt};
use tower_direct_service::DirectService;
use tower_discover::Discover;
use tower_service::Service;
use tower_direct_service::DirectService;

pub mod choose;
pub mod load;
Expand Down Expand Up @@ -90,10 +90,7 @@ where
/// Initializes a P2C load balancer from the provided randomization source.
///
/// This may be preferable when an application instantiates many balancers.
pub fn p2c_with_rng<R: rand::Rng>(
discover: D,
rng: &mut R,
) -> Result<Self, rand::Error> {
pub fn p2c_with_rng<R: rand::Rng>(discover: D, rng: &mut R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self::new(discover, choose::PowerOfTwoChoices::new(rng)))
}
Expand Down Expand Up @@ -205,15 +202,17 @@ where
// from reordering services in a way that could prevent a service from being polled.
for idx in (0..n).rev() {
let is_ready = {
let (_, svc) = self.not_ready
let (_, svc) = self
.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
poll_ready(svc).map_err(Error::Inner)?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
debug!("not_ready[{:?}]: promoting to ready", idx);
let (key, svc) = self.not_ready
let (key, svc) = self
.not_ready
.swap_remove_index(idx)
.expect("invalid not_ready index");
self.ready.insert(key, svc);
Expand Down Expand Up @@ -241,16 +240,17 @@ where
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => {
match poll_ready(svc) {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
}
}
Some((_, svc)) => match poll_ready(svc) {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
},
}

let (key, svc) = self.ready.swap_remove_index(idx).expect("invalid ready index");
let (key, svc) = self
.ready
.swap_remove_index(idx)
.expect("invalid ready index");
self.not_ready.insert(key, svc);
Some(Ok(Async::NotReady))
}
Expand Down Expand Up @@ -315,7 +315,10 @@ where
FF: Future,
{
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
let (_, svc) = self
.ready
.get_index_mut(idx)
.expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);

let rsp = call(svc, request);
Expand Down Expand Up @@ -430,7 +433,6 @@ impl<F: Future, E> Future for ResponseFuture<F, E> {
}
}


// ===== impl Error =====

impl<T, U> fmt::Display for Error<T, U>
Expand All @@ -441,8 +443,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Balance(ref why) =>
write!(f, "load balancing failed: {}", why),
Error::Balance(ref why) => write!(f, "load balancing failed: {}", why),
Error::NotReady => f.pad("not ready"),
}
}
Expand Down Expand Up @@ -491,7 +492,11 @@ mod tests {
type Error = ();

fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
let r = self.0.pop_front().map(Async::Ready).unwrap_or(Async::NotReady);
let r = self
.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
Expand Down
5 changes: 1 addition & 4 deletions tower-balance/src/load/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ pub struct Constant<T, M> {

impl<T, M: Copy> Constant<T, M> {
pub fn new(inner: T, load: M) -> Self {
Self {
inner,
load,
}
Self { inner, load }
}
}

Expand Down
4 changes: 2 additions & 2 deletions tower-balance/src/load/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod instrument;
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;

pub use self::instrument::{Instrument, InstrumentFuture, NoInstrument};
pub use self::constant::Constant;
pub use self::instrument::{Instrument, InstrumentFuture, NoInstrument};
pub use self::peak_ewma::{PeakEwma, WithPeakEwma};
pub use self::pending_requests::{PendingRequests, WithPendingRequests};

Expand Down
40 changes: 28 additions & 12 deletions tower-balance/src/load/peak_ewma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ const NANOS_PER_MILLI: f64 = 1_000_000.0;

// ===== impl PeakEwma =====

impl<D, I> WithPeakEwma<D, I>
{
impl<D, I> WithPeakEwma<D, I> {
/// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
Expand All @@ -86,7 +85,7 @@ impl<D, I> WithPeakEwma<D, I>
where
D: Discover,
D::Service: Service<Request>,
I: Instrument<Handle, <D::Service as Service<Request>>::Response>
I: Instrument<Handle, <D::Service as Service<Request>>::Response>,
{
WithPeakEwma {
discover,
Expand All @@ -111,7 +110,12 @@ where

let change = match try_ready!(self.discover.poll()) {
Insert(k, svc) => {
let s = PeakEwma::new(svc, self.default_rtt, self.decay_ns, self.instrument.clone());
let s = PeakEwma::new(
svc,
self.default_rtt,
self.decay_ns,
self.instrument.clone(),
);
Insert(k, s)
}
Remove(k) => Remove(k),
Expand Down Expand Up @@ -156,7 +160,11 @@ where
}

fn call(&mut self, req: Request) -> Self::Future {
InstrumentFuture::new(self.instrument.clone(), self.handle(), self.service.call(req))
InstrumentFuture::new(
self.instrument.clone(),
self.handle(),
self.service.call(req),
)
}
}

Expand Down Expand Up @@ -285,11 +293,11 @@ mod tests {
extern crate tokio_executor;
extern crate tokio_timer;

use futures::{Future, Poll, future};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use self::tokio_executor::enter;
use self::tokio_timer::clock;
use futures::{future, Future, Poll};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use super::*;

Expand Down Expand Up @@ -324,8 +332,12 @@ mod tests {

let mut enter = enter().expect("enter");
clock::with_default(&clock, &mut enter, |_| {

let svc = PeakEwma::new(Svc, Duration::from_millis(10), NANOS_PER_MILLI * 1_000.0, NoInstrument);
let svc = PeakEwma::new(
Svc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
NoInstrument,
);
let Cost(load) = svc.load();
assert_eq!(load, 10.0 * NANOS_PER_MILLI);

Expand All @@ -348,8 +360,12 @@ mod tests {

let mut enter = enter().expect("enter");
clock::with_default(&clock, &mut enter, |_| {

let mut svc = PeakEwma::new(Svc, Duration::from_millis(20), NANOS_PER_MILLI * 1_000.0, NoInstrument);
let mut svc = PeakEwma::new(
Svc,
Duration::from_millis(20),
NANOS_PER_MILLI * 1_000.0,
NoInstrument,
);
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));

*time.lock().unwrap() += Duration::from_millis(100);
Expand Down

0 comments on commit d7e1b8f

Please sign in to comment.