Skip to content

Commit

Permalink
ext: add ServiceExt::oneshot to call the service when it is ready (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Feb 23, 2019
1 parent f423389 commit 0dc8281
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
10 changes: 10 additions & 0 deletions tower-util/src/ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod apply;
mod from_err;
mod map;
mod map_err;
mod oneshot;
mod ready;
mod then;

Expand All @@ -16,6 +17,7 @@ pub use self::apply::Apply;
pub use self::from_err::FromErr;
pub use self::map::Map;
pub use self::map_err::MapErr;
pub use self::oneshot::Oneshot;
pub use self::ready::Ready;
pub use self::then::Then;

Expand Down Expand Up @@ -116,4 +118,12 @@ pub trait ServiceExt<Request>: Service<Request> {
{
MapErr::new(self, f)
}

/// Consume this `Service`, calling with the providing request once it is ready.
fn oneshot(self, req: Request) -> Oneshot<Self, Request>
where
Self: Sized,
{
Oneshot::new(self, req)
}
}
62 changes: 62 additions & 0 deletions tower-util/src/ext/oneshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::mem;

use futures::{Async, Future, Poll};
use tower_service::Service;

/// A `Future` consuming a `Service` and request, waiting until the `Service`
/// is ready, and then calling `Service::call` with the request, and
/// waiting for that `Future`.
pub struct Oneshot<S: Service<Req>, Req> {
state: State<S, Req>,
}

enum State<S: Service<Req>, Req> {
NotReady(S, Req),
Called(S::Future),
Tmp,
}

impl<S, Req> Oneshot<S, Req>
where
S: Service<Req>,
{
pub(super) fn new(svc: S, req: Req) -> Self {
Oneshot {
state: State::NotReady(svc, req),
}
}
}

impl<S, Req> Future for Oneshot<S, Req>
where
S: Service<Req>,
{
type Item = S::Response;
type Error = S::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.state, State::Tmp) {
State::NotReady(mut svc, req) => match svc.poll_ready()? {
Async::Ready(()) => {
self.state = State::Called(svc.call(req));
}
Async::NotReady => {
self.state = State::NotReady(svc, req);
return Ok(Async::NotReady);
}
},
State::Called(mut fut) => match fut.poll()? {
Async::Ready(res) => {
return Ok(Async::Ready(res));
}
Async::NotReady => {
self.state = State::Called(fut);
return Ok(Async::NotReady);
}
},
State::Tmp => panic!("polled after complete"),
}
}
}
}

0 comments on commit 0dc8281

Please sign in to comment.