Skip to content

Commit

Permalink
Add the DirectService trait
Browse files Browse the repository at this point in the history
This patch adds the `DirectService` trait, and related implementations
over it in `tower_balance` and `tower_buffer`. `DirectService` is
similar to a `Service`, but must be "driven" through calls to
`poll_service` for the futures returned by `call` to make progress.

The motivation behind adding this trait is that many current `Service`
implementations spawn long-running futures when the service is created,
which then drive the work necessary to turn requests into responses. A
simple example of this is a service that writes requests over a
`TcpStream` and reads responses over that same `TcpStream`. The
underlying stream must be read from to discover new responses, but there
is no single entity to drive that task. The returned futures would share
access to the stream (and worse yet, may get responses out of order),
and then service itself is not guaranteed to see any more calls to it as
the client is waiting for its requests to finish.

`DirectService` solves this by introducing a new method, `poll_service`,
which must be called to make progress on in-progress futures.
Furthermore, like `Future::poll`, `poll_service` must be called whenever
the associated task is notified so that the service can also respect
time-based operations like heartbeats.

The PR includes changes to both `tower_balance::Balance` and
`tower_buffer::Buffer` to add support for wrapping `DirectService`s. For
`Balance` this is straightforward: if the inner service is a `Service`,
the `Balance` also implements `Service`; if the inner service is a
`DirectService`, the `Balance` is itself also a `DirectService`. For
`Buffer`, this is more involved, as a `Buffer` turns any `DirectService`
*into* a `Service`. The `Buffer`'s `Worker` is spawned, and will
therefore drive the wrapped `DirectService`.

One complication arises in that `Buffer<T>` requires that `T: Service`,
but you can safely construct a `Buffer` over a `DirectService` per the
above. `Buffer` works around this by exposing

```rust
impl Service for HandleTo<S> where S: DirectService {}
```

And giving out `Buffer<HandleTo<S>>` when the `new_directed(s: S)`
constructor is invoked. Since `Buffer` never calls any methods on the
service it wraps, `HandleTo`'s implementation just consists of calls to
`unreachable!()`.

Note that `tower_buffer` now also includes a `DirectedService` type,
which is a wrapper around a `Service` that implements `DirectService`.
In theory, we could do away with this by adding a blanket impl:

```rust
impl<T> DirectedService for T where T: Service {}
```

but until we have specialization, this would prevent downstream users
from implementing `DirectService` themselves.

Fixes tower-rs#110.
  • Loading branch information
jonhoo committed Nov 5, 2018
1 parent a422ee5 commit d052c7a
Show file tree
Hide file tree
Showing 3 changed files with 464 additions and 89 deletions.
198 changes: 156 additions & 42 deletions tower-balance/src/lib.rs
@@ -1,5 +1,3 @@
#![deny(dead_code)]

#[macro_use]
extern crate futures;
#[macro_use]
Expand All @@ -12,20 +10,20 @@ extern crate tokio_timer;
extern crate tower_discover;
extern crate tower_service;

use choose::Choose;
use futures::{Async, Future, Poll};
use indexmap::IndexMap;
use rand::{SeedableRng, rngs::SmallRng};
use load::Load;
use rand::{rngs::SmallRng, SeedableRng};
use std::collections::HashSet;
use std::{fmt, error};
use std::marker::PhantomData;
use tower_discover::Discover;
use tower_service::Service;
use tower_service::{DirectService, Service};

pub mod choose;
pub mod load;

pub use choose::Choose;
pub use load::Load;

/// Balances requests across a set of inner services.
#[derive(Debug)]
pub struct Balance<D: Discover, C> {
Expand All @@ -43,6 +41,10 @@ pub struct Balance<D: Discover, C> {
/// request.
dispatched_ready_index: Option<usize>,

/// Holds keys for services that have outstanding requests (i.e., that need
/// `poll_outstanding`).
pending: HashSet<D::Key>,

/// Holds all possibly-available endpoints (i.e. from `discover`).
ready: IndexMap<D::Key, D::Service>,

Expand Down Expand Up @@ -120,6 +122,7 @@ where
choose,
chosen_ready_index: None,
dispatched_ready_index: None,
pending: HashSet::default(),
ready: IndexMap::default(),
not_ready: IndexMap::default(),
}
Expand Down Expand Up @@ -156,11 +159,7 @@ where
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn update_from_discover<Request>(&mut self)
-> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
D::Service: Service<Request>
{
fn update_from_discover<E>(&mut self) -> Result<(), Error<E, D::Error>> {
debug!("updating from discover");
use tower_discover::Change::*;

Expand All @@ -176,12 +175,14 @@ where
}

Remove(key) => {
self.pending.remove(&key);
let _ejected = match self.ready.remove(&key) {
None => self.not_ready.remove(&key),
Some(s) => Some(s),
};
// XXX is it safe to just drop the Service? Or do we need some sort of
// graceful teardown?
// TODO: poll_close
}
}
}
Expand All @@ -193,10 +194,9 @@ where
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<Request>(&mut self)
-> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
fn promote_to_ready<F, E>(&mut self, mut poll_ready: F) -> Result<(), Error<E, D::Error>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
let n = self.not_ready.len();
if n == 0 {
Expand All @@ -212,7 +212,7 @@ where
let (_, svc) = self.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
svc.poll_ready().map_err(Error::Inner)?.is_ready()
poll_ready(svc).map_err(Error::Inner)?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
Expand All @@ -235,15 +235,18 @@ where
///
/// If the service exists in `ready` and does not poll as ready, it is moved to
/// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
fn poll_ready_index<Request>(&mut self, idx: usize)
-> Option<Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>>
fn poll_ready_index<F, E>(
&mut self,
idx: usize,
mut poll_ready: F,
) -> Option<Poll<(), Error<E, D::Error>>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => {
match svc.poll_ready() {
match poll_ready(svc) {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
Expand All @@ -259,10 +262,9 @@ where
/// Chooses the next service to which a request will be dispatched.
///
/// Ensures that .
fn choose_and_poll_ready<Request>(&mut self)
-> Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
fn choose_and_poll_ready<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
loop {
let n = self.ready.len();
Expand All @@ -277,12 +279,52 @@ where
};

// XXX Should we handle per-endpoint errors?
if self.poll_ready_index(idx).expect("invalid ready index")?.is_ready() {
if self
.poll_ready_index(idx, &mut poll_ready)
.expect("invalid ready index")?
.is_ready()
{
self.chosen_ready_index = Some(idx);
return Ok(Async::Ready(()));
}
}
}

fn poll_ready_inner<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
// Clear before `ready` is altered.
self.chosen_ready_index = None;

// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx, &mut poll_ready)
.expect("invalid dispatched ready key")?;
}

// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready(&mut poll_ready)?;

// Choose the next service to be used by `call`.
self.choose_and_poll_ready(&mut poll_ready)
}

fn call<Request, F, FF>(&mut self, call: F, request: Request) -> ResponseFuture<FF, D::Error>
where
F: FnOnce(&mut D::Service, Request) -> FF,
FF: Future,
{
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);

let rsp = call(svc, request);
ResponseFuture(rsp, PhantomData)
}
}

impl<D, C, Request> Service<Request> for Balance<D, C>
Expand All @@ -300,31 +342,103 @@ where
/// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// Clear before `ready` is altered.
self.chosen_ready_index = None;
self.poll_ready_inner(D::Service::poll_ready)
}

// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx).expect("invalid dispatched ready key")?;
}
fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}
}

// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready()?;
impl<D, C, Request> DirectService<Request> for Balance<D, C>
where
D: Discover,
D::Service: DirectService<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as DirectService<Request>>::Response;
type Error = Error<<D::Service as DirectService<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as DirectService<Request>>::Future, D::Error>;

// Choose the next service to be used by `call`.
self.choose_and_poll_ready()
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
}

fn call(&mut self, request: Request) -> Self::Future {
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);
self.call(D::Service::call, request)
}

let rsp = svc.call(request);
ResponseFuture(rsp, PhantomData)
fn poll_service(&mut self) -> Result<Async<()>, Self::Error> {
let ready = &mut self.ready;
let not_ready = &mut self.not_ready;
let mut err = None;
self.pending.retain(|k| {
// NOTE: We here have to decide whether it's more likely for a service to be ready or
// not ready. Choosing incorrectly means we do an extra IndexMap lookup...
if let Some(s) = ready.get_mut(k) {
match s.poll_service() {
Err(e) => {
err = Some(e);
return false;
}
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
}
}

if let Some(s) = not_ready.get_mut(k) {
match s.poll_service() {
Err(e) => {
err = Some(e);
return false;
}
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
}
}

unreachable!("pending service is neither ready nor not ready");
});

if let Some(e) = err {
return Err(Error::Inner(e));
}

if self.pending.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}

fn poll_close(&mut self) -> Result<Async<()>, Self::Error> {
let mut err = None;
self.ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});
self.not_ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});

if let Some(e) = err {
return Err(Error::Inner(e));
}

if self.ready.is_empty() && self.not_ready.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

Expand Down

0 comments on commit d052c7a

Please sign in to comment.