Skip to content

Commit

Permalink
turn combinators into async trait methods
Browse files Browse the repository at this point in the history
  • Loading branch information
kpp committed Jul 25, 2019
1 parent 0896eeb commit 5191cdb
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -12,6 +12,7 @@ edition = "2018"

[dependencies]
pin-utils = "=0.1.0-alpha.4"
async-trait = "0.1.3"

[dependencies.futures]
version = "=0.3.0-alpha.17"
Expand Down
4 changes: 2 additions & 2 deletions examples/future.rs
Expand Up @@ -6,8 +6,8 @@ use futures::executor;
fn main() {
executor::block_on(async {
let future = ready(Ok::<i32, i32>(1));
let future = and_then(future, |x| ready(Ok::<i32, i32>(x + 3)));
let future = inspect(future, |x| { dbg!(x); });
let future = future.and_then(|x| ready(Ok::<i32, i32>(x + 3)));
let future = future.inspect(|x| { dbg!(x); });
assert_eq!(future.await, Ok(4));
});
}
199 changes: 111 additions & 88 deletions src/future.rs
@@ -1,99 +1,122 @@
use futures::future::Future;
use core::future::Future;
use futures::stream::Stream;

use core::task::{Poll, Context};
use async_trait::async_trait;

pub async fn ready<T>(value: T) -> T {
value
}

pub async fn map<Fut, U, F>(future: Fut, f: F) -> U
where F: FnOnce(Fut::Output) -> U,
Fut: Future,
{
f(future.await)
}
impl<T: ?Sized> FutureExt for T where T: Future {}

pub async fn then<FutA, FutB, F>(future: FutA, f: F) -> FutB::Output
where F: FnOnce(FutA::Output) -> FutB,
FutA: Future,
FutB: Future,
{
let new_future = f(future.await);
new_future.await
}
#[async_trait]
pub trait FutureExt: Future {
async fn map<U, F>(self, f: F) -> U
where F: FnOnce(Self::Output) -> U + Send,
Self: Sized,
{
f(self.await)
}

pub async fn and_then<FutA, FutB, F, T, U, E>(future: FutA, f: F) -> Result<U, E>
where F: FnOnce(T) -> FutB,
FutA: Future<Output = Result<T,E>>,
FutB: Future<Output = Result<U,E>>,
{
match future.await {
Ok(ok) => {
let new_future = f(ok);
new_future.await
},
Err(err) => Err(err),
async fn then<Fut, F>(self, f: F) -> Fut::Output
where F: FnOnce(Self::Output) -> Fut + Send,
Fut: Future + Send,
Self: Sized,
{
let new_future = f(self.await);
new_future.await
}
}

pub async fn or_else<FutA, FutB, F, T, E, U>(future: FutA, f: F) -> Result<T, U>
where F: FnOnce(E) -> FutB,
FutA: Future<Output = Result<T,E>>,
FutB: Future<Output = Result<T,U>>,
{
match future.await {
Ok(ok) => Ok(ok),
Err(err) => {
let new_future = f(err);
new_future.await
},
async fn flatten(self) -> <Self::Output as Future>::Output
where Self::Output: Future + Send,
Self: Sized,
{
let nested_future = self.await;
nested_future.await
}
}

pub async fn map_ok<Fut, F, T, U, E>(future: Fut, f: F) -> Result<U, E>
where F: FnOnce(T) -> U,
Fut: Future<Output = Result<T,E>>,
{
future.await.map(f)
async fn inspect<F>(self, f: F) -> Self::Output
where F: FnOnce(&Self::Output) + Send,
Self: Sized,
{
let future_result = self.await;
f(&future_result);
future_result
}
}

pub async fn map_err<Fut, F, T, E, U>(future: Fut, f: F) -> Result<T, U>
where F: FnOnce(E) -> U,
Fut: Future<Output = Result<T,E>>,
{
future.await.map_err(f)
}
impl<T, E, Fut: ?Sized> TryFutureExt<T, E> for Fut where Fut: Future<Output = Result<T, E>> {}

#[async_trait]
pub trait TryFutureExt<T, E>: Future<Output = Result<T, E>> {

async fn and_then<U, F, FutB>(self, f: F) -> Result<U, E>
where F: FnOnce(T) -> FutB + Send,
FutB: Future<Output = Result<U, E>> + Send,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
match self.await {
Ok(ok) => {
let new_future = f(ok);
new_future.await
},
Err(err) => Err(err),
}
}

pub async fn flatten<FutA, FutB>(future: FutA) -> FutB::Output
where FutA: Future<Output = FutB>,
FutB: Future,
{
let nested_future = future.await;
nested_future.await
}
async fn or_else<U, F, FutB>(self, f: F) -> Result<T, U>
where F: FnOnce(E) -> FutB + Send,
FutB: Future<Output = Result<T, U>> + Send,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
match self.await {
Ok(ok) => Ok(ok),
Err(err) => {
let new_future = f(err);
new_future.await
},
}
}

pub async fn inspect<Fut, F>(future: Fut, f: F) -> Fut::Output
where Fut: Future,
F: FnOnce(&Fut::Output),
{
let future_result = future.await;
f(&future_result);
future_result
}
async fn map_ok<U, F>(self, f: F) -> Result<U, E>
where F: FnOnce(T) -> U + Send,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
self.await.map(f)
}

pub async fn err_into<Fut, T, E, U>(future: Fut) -> Result<T,U>
where Fut: Future<Output = Result<T,E>>,
E: Into<U>,
{
future.await.map_err(Into::into)
}
async fn map_err<U, F>(self, f: F) -> Result<T, U>
where F: FnOnce(E) -> U + Send,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
self.await.map_err(f)
}

pub async fn unwrap_or_else<Fut, T, E, F>(future: Fut, f: F) -> T
where Fut: Future<Output = Result<T,E>>,
F: FnOnce(E) -> T,
{
future.await.unwrap_or_else(f)
async fn err_into<U>(self) -> Result<T, U>
where E: Into<U>,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
self.await.map_err(Into::into)
}

async fn unwrap_or_else<F>(self, f: F) -> T
where F: FnOnce(E) -> T + Send,
Self: Sized,
T: Send + 'async_trait,
E: Send + 'async_trait,
{
self.await.unwrap_or_else(f)
}
}

pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
Expand Down Expand Up @@ -165,7 +188,7 @@ mod tests {
fn test_map() {
executor::block_on(async {
let future = ready(1);
let new_future = map(future, |x| x + 3);
let new_future = future.map(|x| x + 3);
assert_eq!(new_future.await, 4);
});
}
Expand All @@ -174,7 +197,7 @@ mod tests {
fn test_then() {
executor::block_on(async {
let future = ready(1);
let new_future = then(future, |x| ready(x + 3));
let new_future = future.then(|x| ready(x + 3));
assert_eq!(new_future.await, 4);
});
}
Expand All @@ -183,7 +206,7 @@ mod tests {
fn test_and_then_ok() {
executor::block_on(async {
let future = ready(Ok::<i32, i32>(1));
let new_future = and_then(future, |x| ready(Ok::<i32, i32>(x + 3)));
let new_future = future.and_then(|x| ready(Ok::<i32, i32>(x + 3)));
assert_eq!(new_future.await, Ok(4));
});
}
Expand All @@ -192,7 +215,7 @@ mod tests {
fn test_and_then_err() {
executor::block_on(async {
let future = ready(Err::<i32, i32>(1));
let new_future = and_then(future, |x| ready(Ok::<i32, i32>(x + 3)));
let new_future = future.and_then(|x| ready(Ok(x + 3)));
assert_eq!(new_future.await, Err(1));
});
}
Expand All @@ -201,7 +224,7 @@ mod tests {
fn test_or_else() {
executor::block_on(async {
let future = ready(Err::<i32, i32>(1));
let new_future = or_else(future, |x| ready(Err::<i32, i32>(x + 3)));
let new_future = future.or_else(|x| ready(Err(x + 3)));
assert_eq!(new_future.await, Err(4));
});
}
Expand All @@ -210,7 +233,7 @@ mod tests {
fn test_map_ok() {
executor::block_on(async {
let future = ready(Ok::<i32, i32>(1));
let new_future = map_ok(future, |x| x + 3);
let new_future = future.map_ok(|x| x + 3);
assert_eq!(new_future.await, Ok(4));
});
}
Expand All @@ -219,7 +242,7 @@ mod tests {
fn test_map_err() {
executor::block_on(async {
let future = ready(Err::<i32, i32>(1));
let new_future = map_err(future, |x| x + 3);
let new_future = future.map_err(|x| x + 3);
assert_eq!(new_future.await, Err(4));
});
}
Expand All @@ -228,7 +251,7 @@ mod tests {
fn test_flatten() {
executor::block_on(async {
let nested_future = ready(ready(1));
let future = flatten(nested_future);
let future = nested_future.flatten();
assert_eq!(future.await, 1);
});
}
Expand All @@ -237,7 +260,7 @@ mod tests {
fn test_inspect() {
executor::block_on(async {
let future = ready(1);
let new_future = inspect(future, |&x| assert_eq!(x, 1));
let new_future = future.inspect(|&x| assert_eq!(x, 1));
assert_eq!(new_future.await, 1);
});
}
Expand All @@ -246,7 +269,7 @@ mod tests {
fn test_err_into() {
executor::block_on(async {
let future_err_u8 = ready(Err::<(), u8>(1));
let future_err_i32 = err_into::<_, _, _, i32>(future_err_u8);
let future_err_i32 = future_err_u8.err_into();

assert_eq!(future_err_i32.await, Err::<(), i32>(1));
});
Expand All @@ -256,7 +279,7 @@ mod tests {
fn test_unwrap_or_else() {
executor::block_on(async {
let future = ready(Err::<(), &str>("Boom!"));
let new_future = unwrap_or_else(future, |_| ());
let new_future = future.unwrap_or_else(|_| ());
assert_eq!(new_future.await, ());
});
}
Expand Down

0 comments on commit 5191cdb

Please sign in to comment.