Better way to drive Service than poll_ready #110

Closed
jonhoo opened this issue Oct 16, 2018 · 19 comments

Comments

@jonhoo
Collaborator

jonhoo commented Oct 16, 2018

I'm looking to implement tokio-tower::pipeline::Client, and as part of that I need to be able to "drive" the Service implementation forward to send pending requests, publish responses etc. Currently, the only way to do that is to do all the work in poll_ready. While that works just fine, it feels like a bit of an abuse of the API. @carllerche mentioned here that we might want a Service::drive method where we do this kind of heavy-lifting.

@carllerche
Member

In theory, I really want something like this as it would reduce the required overhead of using the Tower Service abstraction.

That said, I thought more about it, and it is a bit more complicated...

The "drive" function would have to do two things:

  1. Drive the internal state of the Service
  2. Notify the caller when it is safe to drop the Service (i.e., the socket is closed and all response futures have completed).

The second point also implies that there needs to be some way to tell the service to start shutting down.

So, at the very least, there would need to be two additional functions:

  • Some "drive" function.
  • Some function to signal the service should start shutting down.

(Note that Sink provides APIs to achieve this).

This adds non-trivial complexity to using a Service. It would be unfortunate if all users of Service were subject to this additional API.

This implies that there would need to be a separate trait. How would that work?

@jonhoo
Collaborator Author

jonhoo commented Oct 18, 2018

I don't really have an answer, just more observations about why the current situation isn't ideal: if you do work to "drive" the returned future(s) in poll_ready, you end up in the weird situation where you basically need to alternate between polling the future and calling poll_ready until the former resolves. That's a pretty awkward pattern (maybe you have a nice way to deal with it?). It could be that all tower needs to provide is some nice way to say "drive the Service until this future resolves"?
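To make the awkward pattern concrete, here is a minimal, synchronous sketch of it. Everything in it is an assumption made for illustration: `Async` is a local stand-in for the futures 0.1 type, and `Doubler`, `Response`, and `drive_until` are hypothetical names, not part of tower. A real implementation would rely on task notification rather than a busy loop.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Local stand-in for futures 0.1's `Async` (assumption, not the real type).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical response handle: a slot shared between service and caller.
#[derive(Clone, Default)]
pub struct Response(Rc<RefCell<Option<u32>>>);

impl Response {
    /// Ready only once the service has filled the slot.
    pub fn poll(&self) -> Async<u32> {
        match *self.0.borrow() {
            Some(v) => Async::Ready(v),
            None => Async::NotReady,
        }
    }
}

/// Toy service that does all of its work inside `poll_ready`.
#[derive(Default)]
pub struct Doubler {
    pending: VecDeque<(u32, Response)>,
}

impl Doubler {
    pub fn poll_ready(&mut self) -> Result<Async<()>, ()> {
        // Complete at most one in-flight request per call.
        if let Some((req, rsp)) = self.pending.pop_front() {
            *rsp.0.borrow_mut() = Some(req * 2);
        }
        Ok(Async::Ready(()))
    }

    pub fn call(&mut self, req: u32) -> Response {
        let rsp = Response::default();
        self.pending.push_back((req, rsp.clone()));
        rsp
    }
}

/// Alternate between the response and `poll_ready` until the response resolves.
/// `rsp` must have come from `svc.call`, otherwise this loops forever.
pub fn drive_until(svc: &mut Doubler, rsp: &Response) -> Result<u32, ()> {
    loop {
        if let Async::Ready(v) = rsp.poll() {
            return Ok(v);
        }
        svc.poll_ready()?;
    }
}
```

Calling `drive_until(&mut svc, &rsp)` after `svc.call(21)` yields `Ok(42)`, but only because the caller keeps poking `poll_ready`; the response handle alone never makes progress.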

@jonhoo
Collaborator Author

jonhoo commented Oct 18, 2018

The other advantage of having Service: Future (at least in some cases) is that I could stick a bunch of them in FuturesUnordered or something like that. In my case, I have multiple tokio_tower::pipeline::Client, one to each "shard" of a database. I send a request on some subset of them, and now I need to drive them all until the reply futures resolve...

@carllerche
Member

If you stash them in FuturesUnordered how do you call them?

@jonhoo
Collaborator Author

jonhoo commented Oct 18, 2018

FuturesUnordered::iter_mut, but it's obviously not ideal. FuturesOrdered may be needed to give predictable access to the services.

@jonhoo
Collaborator Author

jonhoo commented Oct 18, 2018

Bah, FuturesOrdered also doesn't allow lookup :'(

@carllerche
Member

It would be possible to implement a similar structure that does enable lookup though...

@jonhoo
Collaborator Author

jonhoo commented Oct 19, 2018

I actually think modelling Service on Sink makes a tonne of sense, since that gives us both shutdown and polling! I don't think it should actually be a Sink though, because Sink doesn't have a notion of returning a future from a send (afaict). We could even then add a .drive() method (similar to .ready()) that keeps polling until there's no more work for the Service to do (i.e., no outstanding requests).

@jonhoo
Collaborator Author

jonhoo commented Oct 23, 2018

Some further discussion happened on Gitter. A brief-ish summary:

  • We'd like implementors of Service not to have to always insert an mpsc of some sort and then spawn something onto the current executor.
  • That means there has to be a way to "drive" the Service to make progress on in-flight requests.
  • Service wrappers also need a way to communicate to a Service that there are no more requests coming, and that it should shut down once idle.
  • We'd like to avoid adding complexity to simple Service implementations.

It seems like we need to figure out what methods Service needs, and a contract for when what should be called. We could add an additional extension trait of some sort for "fancy" Service impls, but it seems easier to just provide sensible default fns. I propose:

trait Service {
    /// Called to make room for another request.
    ///
    /// Should call `poll_outstanding` as necessary to finish in-flight requests.
    ///
    /// The default implementation always returns `Ready`, which must be safe because `call`
    /// (per spec) must always be prepared to be called, even if not ready.
    ///
    /// Implementors are encouraged to implement this if `call` can ever fail.
    default fn poll_ready(&mut self) -> Result<Async<()>, Self::Error> {
        Ok(Async::Ready(()))
    }

    /// Called to make progress on in-flight requests.
    ///
    /// Should return `Ready` when all outstanding requests have been serviced.
    fn poll_outstanding(&mut self) -> Result<Async<()>, Self::Error>;

    /// Called when there will be no more calls to `call` or `poll_ready`.
    ///
    /// This method will be called at most once.
    /// The default implementation does nothing.
    default fn on_shutdown(&mut self) { }

    /// Called when there will be no more calls to `call`.
    ///
    /// The returned `Future` resolves when the shutdown has been completed,
    /// and all outstanding requests have been responded to (i.e., when
    /// `poll_outstanding` next returns `Ready`).
    ///
    /// This will implicitly call `on_shutdown` on the `Service` before polling
    /// outstanding requests.
    fn shutdown(self) -> ShutdownFuture<Self> { ... }

    /// ...
    fn call(&mut self, req: Self::Request) -> Self::Future;

    /// Returns a `Future` that resolves when the given future resolves, and
    /// internally calls [`poll_outstanding`] to drive this `Service` forward.
    ///
    /// Akin to `future::select(fut, self)`.
    fn finish<F>(self, fut: F) -> OutstandingPoller<Self, F> { ... }

    /// Returns a `Future` that resolves the first time `call(req)` completes
    /// after `poll_ready` has returned `Ready`. It produces `Self` and the
    /// `Future` returned from `call`.
    fn enqueue(self, req: Self::Request) -> ReadyCallPoller<Self> { ... }
}

@jonhoo
Collaborator Author

jonhoo commented Oct 23, 2018

Following some further discussion on making the trait a bit more like Sink, and relying more on callers to maintain contract, how about:

trait Service {
    /// Called to make room for another request.
    ///
    /// Should call `poll_outstanding` as necessary to finish in-flight requests.
    ///
    /// The default implementation always returns `Ready`, which must be safe because `call`
    /// (per spec) must always be prepared to be called, even if not ready.
    ///
    /// Implementors are encouraged to implement this if `call` can ever fail.
    default fn poll_ready(&mut self) -> Result<Async<()>, Self::Error> {
        Ok(Async::Ready(()))
    }

    /// Called to make progress on in-flight requests.
    ///
    /// Should return `Ready` when all outstanding requests have been serviced.
    fn poll_outstanding(&mut self) -> Result<Async<()>, Self::Error>;

    /// Called after there will be no more calls to `call`.
    ///
    /// `poll_close` should ensure that all in-progress requests resolve, as well
    /// as perform and finish any required service cleanup.
    default fn poll_close(&mut self) -> Result<Async<()>, Self::Error> { self.poll_outstanding() }

    /// ...
    fn call(&mut self, req: Self::Request) -> Self::Future;
}

trait ServiceExt: Service {
    /// Returns a `Future` that resolves when the given future resolves, and
    /// internally calls [`poll_outstanding`] to drive this `Service` forward.
    ///
    /// Akin to `future::select(fut, self)`.
    fn finish<F>(self, fut: F) -> OutstandingPoller<Self, F> { ... }

    /// Returns a `Future` that resolves the first time `call(req)` completes
    /// after `poll_ready` has returned `Ready`. It produces `Self` and the
    /// `Future` returned from `call`.
    fn enqueue(self, req: Self::Request) -> ReadyCallPoller<Self> { ... }
}

@seanmonstar
Collaborator

I may be missing something obvious, but I don't understand why there would be Service::poll_outstanding, since call already returns a Future to poll to make progress.

@jonhoo
Collaborator Author

jonhoo commented Oct 24, 2018

@seanmonstar the observation is that for some Service implementations, there may be a resource inside the Service that has to be polled for the returned future to make progress. For example imagine the Service wraps a TcpStream that will eventually yield something that resolves the future. The Service has to continue to be polled in some way, otherwise the Future returned from call will never resolve.

@carllerche
Member

@seanmonstar
Sometimes, you can't drive the shared service state from each future. In this case, today, you need to add a message passing layer.

Also, there are cases where the service needs to be driven when there are no futures (think h2's ping / pong).
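The two points above can be sketched with a toy connection whose responses only resolve when the service itself is polled, and which still has work to do (answering pings) when nothing is in flight. This is a simplified model under stated assumptions: `Async` is a local stand-in for the futures 0.1 type, `Connection` and `Frame` are hypothetical, and the "wire" is just an in-memory queue rather than a real `TcpStream` or h2 connection.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Local stand-in for futures 0.1's `Async` (assumption, not the real type).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical wire frames.
#[derive(Debug, PartialEq)]
pub enum Frame {
    Ping,
    Pong,
    Request { id: u32, body: String },
    Response { id: u32, body: String },
}

/// Toy service owning a simulated connection.
#[derive(Default)]
pub struct Connection {
    pub inbound: VecDeque<Frame>, // frames "read" from the peer
    pub outbound: Vec<Frame>,     // frames "written" to the peer
    in_flight: HashSet<u32>,
    completed: HashMap<u32, String>,
    next_id: u32,
}

impl Connection {
    /// Queue a request and return a ticket for its eventual response.
    pub fn call(&mut self, body: &str) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.in_flight.insert(id);
        self.outbound.push(Frame::Request { id, body: body.to_string() });
        id
    }

    /// Process inbound frames; `Ready` means no requests are outstanding.
    pub fn poll_outstanding(&mut self) -> Result<Async<()>, ()> {
        while let Some(frame) = self.inbound.pop_front() {
            match frame {
                // Must be answered even when nothing is in flight.
                Frame::Ping => self.outbound.push(Frame::Pong),
                Frame::Response { id, body } => {
                    if self.in_flight.remove(&id) {
                        self.completed.insert(id, body);
                    }
                }
                // Not expected from the peer in this toy model; ignored.
                Frame::Pong | Frame::Request { .. } => {}
            }
        }
        if self.in_flight.is_empty() {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }

    /// Take a completed response, if `poll_outstanding` has produced one.
    pub fn take_response(&mut self, id: u32) -> Option<String> {
        self.completed.remove(&id)
    }
}
```

In this model, `take_response` keeps returning `None` until someone calls `poll_outstanding` after the matching `Response` frame has arrived, and a `Ping` queued on `inbound` is only answered when the service is polled.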

@carllerche
Member

@jonhoo For clarity, could you move finish and enqueue to a ServiceExt trait?

I'm also leaning towards not having any default implementations for functions. I think that the main implementors of Service will be middleware and client / server implementors. In those cases, they will want to be sure to add implementations of all functions, if only to proxy to the inner service.

For the simple case, we can defer to service_fn:

let my_service = service_fn(|request| {
    // do something with request
    Ok(response)
});

@carllerche
Member

Also, we may want to bikeshed poll_outstanding as a name as the intent is to allow the service to do work even if there are no outstanding responses. For example, HTTP/2.0 needs to be able to respond to Ping frames.

@jonhoo
Collaborator Author

jonhoo commented Oct 24, 2018

We could just call it poll. The reason I went for poll_outstanding was to indicate that even if it returns Ready, that does not mean that the Service is "done". It just means that any futures that have been given out have been resolved.

@jonhoo
Collaborator Author

jonhoo commented Oct 25, 2018

Much more discussion on Gitter today, culminating in this question (all chat history edited for clarity):

@carllerche: There is also an extra wrinkle… Say you want a “spawned” service so that you don’t have to worry about calling poll_outstanding, poll_close... How do you get that without double spawning if you take T: Service?
@jonhoo: Hmm, yeah, it really does feel like the "inner" Service is different. Basically, Buffer changes the contract of the Service. I almost wonder if we'll end up with tons of things basically relying on a Buffer around whatever Service they operate on, or all explicitly adding a Buffer "to be safe", so that we end up with Buffer<Buffer<Buffer<S: Service>>>, which would be very sad indeed.
It could be that we want a Service trait with just call (and maaaaaybe poll_ready) like today which is what all the middleware operate on and expose, and then a DirectService which has all the poll_*, and which you can wrap with a Buffer if you want it to be spawned. Or you could directly implement Service and spawn internally if you really wanted to.
@carllerche: It might be worth experimenting w/ keeping a separate trait basically and see how it all fits together. Having two traits wasn’t super successful with ReadyService, but maybe this would be different. In this case, T: Service would impl DirectService and you would do DirectService::buffer(…) -> Service.

Note also that we should not have DirectService: Service; quoth @carllerche:

The issue is, if a function takes T: Service, and DirectService: Service, then you could pass a DirectService to that thing and it would be broken.

There's also the worry that we now need to have lots of things be able to take both DirectService and Service:

@carllerche: tokio-tower’s Server will need two separate constructors: one from normal service, one from DirectService (unfortunately)
@jonhoo: I guess what I mean is that most middleware will probably just operate on Service
@carllerche: Most middleware in tower can probably take both
@jonhoo: Probably, though that means they will also themselves implement DirectService, not Service
@carllerche: They will have impls based on the inner type
@jonhoo: So I guess the idea is that people construct a big DirectService hierarchy, and then do ::buffer at the end. Which maybe is what you want?
@carllerche: Yes. Another problem is, middleware constructs will not be able to have a where bound unless there is fn new(T: Service) and fn new(T: DirectService).
@jonhoo: I do think we'll want to be able to provide a Service to something that takes a DirectService. That'd make it a lot more ergonomic
@carllerche: Specialization would make our lives much better

It seems like we're basically agreed that DirectService is the thing to add. With one caveat (quoth @carllerche):

IMO, DirectService should not leak into “main” tower until it is proven out.
Well, it can be in the tower crate, but Service should be considered the main way to do things.

So, we would have:

trait Service<Request> {
    /// Like today's Service::poll_ready.
    /// Should this even be present on `Service`, or just on `DirectService`?
    fn poll_ready(&mut self) -> Result<Async<()>, Self::Error>;

    /// Like today's Service::call
    fn call(&mut self, req: Request) -> Self::Future;
}

trait ServiceExt<Request>: Service<Request> {
    /// Returns a `Future` that resolves the first time `call(req)` completes
    /// after `poll_ready` has returned `Ready`. It produces `Self` and the
    /// `Future` returned from `call`.
    fn enqueue(self, req: Request) -> ReadyCallPoller<Self> { ... }
}

trait DirectService<Request> {
    /// Called to make room for another request.
    ///
    /// Should call `poll_outstanding` as necessary to finish in-flight requests.
    fn poll_ready(&mut self) -> Result<Async<()>, Self::Error>;

    /// Called to make progress on in-flight requests.
    ///
    /// Should return `Ready` when all outstanding requests have been serviced.
    fn poll_outstanding(&mut self) -> Result<Async<()>, Self::Error>;

    /// Called after there will be no more calls to `call`.
    ///
    /// `poll_close` should ensure that all in-progress requests resolve, as well
    /// as perform and finish any required service cleanup.
    default fn poll_close(&mut self) -> Result<Async<()>, Self::Error> { self.poll_outstanding() }

    /// Like today's Service::call, but with the caveat that poll_outstanding must
    /// continue to be called for the returned futures to resolve.
    fn call(&mut self, req: Request) -> Self::Future;
}
plus the various associated types.

We may also want an (internal) struct that wraps a Service and impl DirectService with empty implementations for all fns so that it's easy to write middleware or servers that are able to drive an inner DirectService, and which can also trivially then accept a "regular" Service.
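A minimal sketch of such a wrapper, assuming trimmed-down versions of the two traits above: `Async` is a local stand-in for the futures 0.1 type, the `Response` associated type is omitted, `Future` is left unbounded so the block does not depend on the futures crate, and `Direct` and `Echo` are hypothetical names chosen for illustration.

```rust
/// Local stand-in for futures 0.1's `Async` (assumption, not the real type).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Trimmed-down `Service`, following the proposal above.
pub trait Service<Request> {
    type Future;
    type Error;
    fn poll_ready(&mut self) -> Result<Async<()>, Self::Error>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Trimmed-down `DirectService`, without default methods.
pub trait DirectService<Request> {
    type Future;
    type Error;
    fn poll_ready(&mut self) -> Result<Async<()>, Self::Error>;
    fn poll_outstanding(&mut self) -> Result<Async<()>, Self::Error>;
    fn poll_close(&mut self) -> Result<Async<()>, Self::Error>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Hypothetical adapter: lets a regular `Service` be used where a
/// `DirectService` is expected.
pub struct Direct<S>(pub S);

impl<S, Request> DirectService<Request> for Direct<S>
where
    S: Service<Request>,
{
    type Future = <S as Service<Request>>::Future;
    type Error = <S as Service<Request>>::Error;

    fn poll_ready(&mut self) -> Result<Async<()>, Self::Error> {
        self.0.poll_ready()
    }

    // A regular `Service` is driven implicitly, so there is never
    // anything left for the caller to drive or to close.
    fn poll_outstanding(&mut self) -> Result<Async<()>, Self::Error> {
        Ok(Async::Ready(()))
    }

    fn poll_close(&mut self) -> Result<Async<()>, Self::Error> {
        Ok(Async::Ready(()))
    }

    fn call(&mut self, req: Request) -> Self::Future {
        self.0.call(req)
    }
}

/// Toy service whose "future" is an already-computed value.
pub struct Echo;

impl Service<String> for Echo {
    type Future = String;
    type Error = ();

    fn poll_ready(&mut self) -> Result<Async<()>, ()> {
        Ok(Async::Ready(()))
    }

    fn call(&mut self, req: String) -> String {
        req
    }
}
```

With this in place, middleware written against `DirectService` could accept `Direct(Echo)` without a second constructor. Note that a wrapper type is used rather than a blanket impl, which sidesteps the coherence problem that specialization would otherwise be needed for.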

@jonhoo
Collaborator Author

jonhoo commented Oct 26, 2018

We'll probably also want to be very clear in the docs for Service and DirectService about how they're different. In particular, DirectService's contract is that returned futures will only complete if poll_outstanding is called. Service is a DirectService that is implicitly driven, so consumers only need to poll the future returned from call.

jonhoo added a commit to jonhoo/tower that referenced this issue Nov 5, 2018
This patch adds the `DirectService` trait, and related implementations
over it in `tower_balance` and `tower_buffer`. `DirectService` is
similar to a `Service`, but must be "driven" through calls to
`poll_service` for the futures returned by `call` to make progress.

The motivation behind adding this trait is that many current `Service`
implementations spawn long-running futures when the service is created,
which then drive the work necessary to turn requests into responses. A
simple example of this is a service that writes requests over a
`TcpStream` and reads responses over that same `TcpStream`. The
underlying stream must be read from to discover new responses, but there
is no single entity to drive that task. The returned futures would share
access to the stream (and worse yet, may get responses out of order),
and the service itself is not guaranteed to see any more calls to it as
the client is waiting for its requests to finish.

`DirectService` solves this by introducing a new method, `poll_service`,
which must be called to make progress on in-progress futures.
Furthermore, like `Future::poll`, `poll_service` must be called whenever
the associated task is notified so that the service can also respect
time-based operations like heartbeats.

The PR includes changes to both `tower_balance::Balance` and
`tower_buffer::Buffer` to add support for wrapping `DirectService`s. For
`Balance` this is straightforward: if the inner service is a `Service`,
the `Balance` also implements `Service`; if the inner service is a
`DirectService`, the `Balance` is itself also a `DirectService`. For
`Buffer`, this is more involved, as a `Buffer` turns any `DirectService`
*into* a `Service`. The `Buffer`'s `Worker` is spawned, and will
therefore drive the wrapped `DirectService`.

One complication arises in that `Buffer<T>` requires that `T: Service`,
but you can safely construct a `Buffer` over a `DirectService` per the
above. `Buffer` works around this by exposing

```rust
impl<S> Service for HandleTo<S> where S: DirectService {}
```

And giving out `Buffer<HandleTo<S>>` when the `new_directed(s: S)`
constructor is invoked. Since `Buffer` never calls any methods on the
service it wraps, `HandleTo`'s implementation just consists of calls to
`unreachable!()`.

Note that `tower_buffer` now also includes a `DirectedService` type,
which is a wrapper around a `Service` that implements `DirectService`.
In theory, we could do away with this by adding a blanket impl:

```rust
impl<T> DirectService for T where T: Service {}
```

but until we have specialization, this would prevent downstream users
from implementing `DirectService` themselves.

Finally, this also makes `Buffer` use a bounded mpsc channel, which
introduces a new capacity argument to `Buffer::new`.

Fixes tower-rs#110.
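
The way a spawned worker turns something that must be driven into something callers can simply call can be sketched with plain threads and channels. This is only an analogy under stated assumptions, not the `tower_buffer` implementation: it uses `std::thread` and `std::sync::mpsc` instead of a futures executor, and `Upper` and `BufferHandle` are hypothetical names. The bounded `sync_channel` mirrors the capacity argument mentioned in the commit message.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::thread;

/// Toy stand-in for a `DirectService`: it only makes progress when driven.
#[derive(Default)]
pub struct Upper {
    in_flight: Vec<(String, Sender<String>)>,
}

impl Upper {
    /// Accept a request; the reply is produced later by `poll_service`.
    fn call(&mut self, req: String, reply: Sender<String>) {
        self.in_flight.push((req, reply));
    }

    /// Drive all in-flight requests to completion.
    fn poll_service(&mut self) {
        for (req, reply) in self.in_flight.drain(..) {
            // The caller may have gone away; ignore send errors.
            let _ = reply.send(req.to_uppercase());
        }
    }
}

/// Hypothetical cloneable handle playing the role of `Buffer`.
#[derive(Clone)]
pub struct BufferHandle {
    tx: SyncSender<(String, Sender<String>)>,
}

impl BufferHandle {
    /// Spawn a worker thread that owns and drives `svc`.
    /// `capacity` bounds the number of queued requests.
    pub fn new(mut svc: Upper, capacity: usize) -> Self {
        let (tx, rx) = sync_channel::<(String, Sender<String>)>(capacity);
        thread::spawn(move || {
            // Runs until every handle (sender) has been dropped.
            for (req, reply) in rx {
                svc.call(req, reply);
                svc.poll_service();
            }
        });
        BufferHandle { tx }
    }

    /// Send a request; the returned receiver yields the response.
    /// Callers never have to drive the wrapped service themselves.
    pub fn call(&self, req: &str) -> Receiver<String> {
        let (reply_tx, reply_rx) = channel();
        self.tx
            .send((req.to_string(), reply_tx))
            .expect("worker has shut down");
        reply_rx
    }
}
```

A caller would write something like `BufferHandle::new(Upper::default(), 4).call("ping").recv()`; the worker thread is the single entity that drives the wrapped service, which is the role the spawned `Worker` plays for `Buffer`.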
@hawkw hawkw mentioned this issue Feb 2, 2022