Skip to content

Commit

Permalink
util: Add then combinator
Browse files Browse the repository at this point in the history
Currently, `ServiceExt` and `ServiceBuilder` provide combinators for
mapping successful responses to other responses, and mapping errors to
other errors, but don't provide a way to map between `Ok` and `Err`
results.

For completeness, this branch adds a new `then` combinator, which takes
a function from `Result` to `Result` and applies it when the service's
future completes. This can be used for recovering from some errors or
for rejecting some `Ok` responses. It can also be used for behaviors
that should be run when a service's future completes regardless of
whether it completed successfully or not.

Depends on #499
  • Loading branch information
hawkw committed Dec 28, 2020
1 parent 1e7283c commit d2a0f29
Show file tree
Hide file tree
Showing 3 changed files with 300 additions and 0 deletions.
17 changes: 17 additions & 0 deletions tower/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ impl<L> ServiceBuilder<L> {
self.layer(crate::util::MapErrLayer::new(f))
}

/// Apply a function after the service, regardless of whether the future
/// succeeds or fails.
///
/// This is similar to the [`map_response`] and [`map_err] functions,
/// except that the *same* function is invoked when the service's future
/// completes, whether it completes successfully or fails. This function
/// takes the `Result` returned by the service's future, and returns a
/// `Result`.
///
/// See the documentation for the [`then` combinator] for details.
///
/// [`then` combinator]: crate::util::ServiceExt::then
#[cfg(feature = "util")]
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::ThenLayer<F>, L>> {
self.layer(crate::util::ThenLayer::new(f))
}

/// Obtains the underlying `Layer` implementation.
pub fn into_inner(self) -> L {
self.layer
Expand Down
211 changes: 211 additions & 0 deletions tower/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod oneshot;
mod optional;
mod ready;
mod service_fn;
mod then;
mod try_map_request;

pub use self::{
Expand All @@ -22,6 +23,7 @@ pub use self::{
optional::Optional,
ready::{Ready, ReadyAnd, ReadyOneshot},
service_fn::{service_fn, ServiceFn},
then::{Then, ThenFuture, ThenLayer},
try_map_request::{TryMapRequest, TryMapRequestLayer},
};

Expand Down Expand Up @@ -327,6 +329,215 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
{
TryMapRequest::new(self, f)
}

/// Composes a function after the service, regardless of whether the future
/// succeeds or fails.
///
/// This is similar to the [`map_response`] and [`map_err] combinators,
/// except that the *same* function is invoked when the service's future
/// completes, whether it completes successfully or fails. This function
/// takes the `Result` returned by the service's future, and returns a
/// `Result`.
///
/// Like the standard library's [`Result::and_then`], this method can be
/// used to implement control flow based on `Result` values. For example, it
/// may be used to implement error recovery, by turning some `Err`
/// responses from the service into `Ok` responses. Similarly, some
/// successful responses from the service could be rejected, by returning an
/// `Err` conditionally, depending on the value inside the `Ok`. Finally,
/// this method can also be used to implement behaviors that must run when a
/// service's future completes, regardless of whether it succeeded or failed.
///
/// This method can be used to change the [`Response`] type of the service
/// into a different type. It can also be used to change the [`Error`] type
/// of the service. However, because the `then` function is not applied
/// to the errors returned by the service's [`poll_ready`] method, it must
/// be possible to convert the service's [`Error`] type into the error type
/// returned by the `then` function. This is trivial when the function
/// returns the same error type as the service, but in other cases, it can
/// be useful to use [`BoxError`] to erase differing error types.
///
/// # Examples
///
/// Recovering from certain errors:
///
/// ```
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # struct Record {
/// # pub name: String,
/// # pub age: u16
/// # }
/// # enum DbError {
/// # Parse(std::num::ParseIntError)
/// # NoRecordsFound,
/// # }
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service returning Result<Vec<Record>, DbError>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // If the database returns no records for the query, we just want an empty `Vec`.
/// let mut new_service = service.then(|result| match result {
/// // If the error indicates that no records matched the query, return an empty
/// // `Vec` instead.
/// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
/// // Propagate all other responses (`Ok` and `Err`) unchanged
/// x => x,
/// });
///
/// // Call the new service
/// let id = 13;
/// let name = new_service.call(id).await.unwrap();
/// # };
/// # }
/// ```
///
/// Rejecting some `Ok` responses:
///
/// ```
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # struct Record {
/// # pub name: String,
/// # pub age: u16
/// # }
/// # type DbError = String;
/// # type AppError = String;
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// use tower::BoxError;
///
/// // A service returning Result<Record, DbError>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // If the user is zero years old, return an error.
/// let mut new_service = service.then(|result| {
/// let record = result?;
///
/// if record.age == 0 {
/// // Users must have been born to use our app!
/// let app_error = AppError::from("users cannot be 0 years old!");
///
/// // Box the error to erase its type (as it can be an `AppError`
/// // *or* the inner service's `DbError`).
/// return Err(BoxError::from(app_error));
/// }
///
/// // Otherwise, return the record.
/// Ok(records)
/// });
///
/// // Call the new service
/// let id = 13;
/// let name = new_service.call(id).await.unwrap();
/// # };
/// # }
/// ```
///
/// Performing an action that must be run for both successes and failures:
///
/// ```
/// # use std::convert::TryFrom;
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # impl Service<String> for DatabaseService {
/// # type Response = u8;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: String) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service returning Result<Record, DbError>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Print a message whenever a query completes.
/// let mut new_service = service.then(|result| {
/// println!("query completed; success={}", result.is_ok());
/// result
/// });
///
/// // Call the new service
/// let id = 13;
/// let response = new_service.call(id).await;
/// # };
/// # }
/// ```
///
/// [`Error`]: crate::Service::Error
/// [`poll_ready`]: crate::Service::poll_ready
/// [`BoxError`]: crate::BoxError
fn then<F, Response, Error>(self, f: F) -> Then<Self, F>
where
Self: Sized,
Error: From<Self::Error>,
F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
{
Then::new(self, f)
}
}

impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
72 changes: 72 additions & 0 deletions tower/src/util/then.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use futures_util::FutureExt;
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;

pub use futures_util::future::Map as ThenFuture;

/// Service returned by the [`then`] combinator.
///
/// [`then`]: crate::util::ServiceExt::then
#[derive(Clone, Debug)]
pub struct Then<S, F> {
inner: S,
f: F,
}

/// A [`Layer`] that produces a [`Then`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug, Clone)]
pub struct ThenLayer<F> {
f: F,
}

impl<S, F> Then<S, F> {
/// Creates a new `Then` service.
pub fn new(inner: S, f: F) -> Self {
Then { f, inner }
}
}

impl<S, F, Request, Response, Error> Service<Request> for Then<S, F>
where
S: Service<Request>,
Error: From<S::Error>,
F: FnOnce(Result<S::Response, S::Error>) -> Result<Response, Error> + Clone,
{
type Response = Response;
type Error = Error;
type Future = ThenFuture<S::Future, F>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

#[inline]
fn call(&mut self, request: Request) -> Self::Future {
self.inner.call(request).map(self.f.clone())
}
}

impl<F> ThenLayer<F> {
/// Creates a new [`ThenLayer`] layer.
pub fn new(f: F) -> Self {
ThenLayer { f }
}
}

impl<S, F> Layer<S> for ThenLayer<F>
where
F: Clone,
{
type Service = Then<S, F>;

fn layer(&self, inner: S) -> Self::Service {
Then {
f: self.f.clone(),
inner,
}
}
}

0 comments on commit d2a0f29

Please sign in to comment.