Skip to content

Commit

Permalink
Add preliminary versions of variadic join and select
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Jun 30, 2018
1 parent 38379ea commit d67e293
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 6 deletions.
38 changes: 38 additions & 0 deletions futures-util/src/await/join.rs
@@ -0,0 +1,38 @@
//! The `join` macro.

#[macro_export]
macro_rules! join {
($($fut:ident),*) => { {
$(
let mut $fut = $crate::future::maybe_done($fut);
pin_mut!($fut);
)*
loop {
let mut all_done = true;
$(
if let ::core::task::Poll::Pending = poll!($fut.reborrow()) {
all_done = false;
}
)*
if all_done {
break;
} else {
pending!();
}
}

($(
$fut.reborrow().take_output().unwrap(),
)*)
} }
}

async fn a() {}
async fn b() -> usize { 5 }

#[allow(unused)]
async fn test_join_compiles() -> ((), usize) {
let a = a();
let b = b();
join!(a, b)
}
81 changes: 81 additions & 0 deletions futures-util/src/await/mod.rs
@@ -0,0 +1,81 @@
//! Await
//!
//! This module contains a number of functions and combinators for working
//! with `async`/`await` code.

use futures_core::{task, Future, Poll};
use std::marker::Unpin;
use std::mem::PinMut;

#[doc(hidden)]
pub fn assert_unpin<T: Future + Unpin>(_: &T) {}

/// A macro which returns the result of polling a future once within the
/// current `async` context.
///
/// This macro is only usable inside of `async` functions, closures, and blocks.
#[macro_export]
macro_rules! poll {
($x:expr) => {
await!($crate::await::poll($x))
}
}

#[doc(hidden)]
pub fn poll<F: Future + Unpin>(future: F) -> impl Future<Output = Poll<F::Output>> {
PollOnce { future }
}

#[allow(missing_debug_implementations)]
struct PollOnce<F: Future + Unpin> {
future: F,
}

impl<F: Future + Unpin> Future for PollOnce<F> {
type Output = Poll<F::Output>;
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
Poll::Ready(PinMut::new(&mut self.future).poll(cx))
}
}

/// A macro which yields to the event loop once.
/// This is similar to returning `Poll::Pending` from a `Future::poll` implementation.
/// If `pending!` is used, the current task should be scheduled to receive a wakeup
/// when it is ready to make progress.
///
/// This macro is only usable inside of `async` functions, closures, and blocks.
#[macro_export]
macro_rules! pending {
() => {
await!($crate::await::pending_once())
}
}

#[doc(hidden)]
pub fn pending_once() -> impl Future<Output = ()> {
PendingOnce { is_ready: false }
}

#[allow(missing_debug_implementations)]
struct PendingOnce {
is_ready: bool,
}

impl Future for PendingOnce {
type Output = ();
fn poll(mut self: PinMut<Self>, _: &mut task::Context) -> Poll<Self::Output> {
if self.is_ready {
Poll::Ready(())
} else {
self.is_ready = true;
Poll::Pending
}
}
}

// Primary export is a macro
mod join;

// Primary export is a macro
mod select;

52 changes: 52 additions & 0 deletions futures-util/src/await/select.rs
@@ -0,0 +1,52 @@
//! The `select` macro.

#[macro_export]
macro_rules! select {
() => {
compile_error!("The `select!` macro requires at least one branch")
};
($(
$name:ident => $body:expr,
)*) => { {
$(
$crate::await::assert_unpin(&$name);
let mut $name = $crate::future::maybe_done(&mut $name);
let mut $name = ::core::mem::PinMut::new(&mut $name);
)*
loop {
$(
if let ::core::task::Poll::Ready(()) = poll!($name.reborrow()) {
break;
}
)*
pending!();
}
if false {
unreachable!()
}
$(
else if let Some($name) = $name.take_output() {
$body
}
)*
else {
unreachable!()
}
} };
}

async fn num() -> usize { 5 }

#[allow(unused)]
async fn test_select_compiles() -> usize {
let a = num();
let b = num();
pin_mut!(a, b);
select! {
a => {
let x = num();
a + await!(x)
},
b => b + 4,
}
}
70 changes: 70 additions & 0 deletions futures-util/src/future/maybe_done.rs
@@ -0,0 +1,70 @@
//! Definition of the MaybeDone combinator

use core::mem::{self, PinMut};
use core::marker::Unpin;

use futures_core::{Future, Poll};
use futures_core::task;

/// `MaybeDone`, a future that may have completed.
///
/// This is created by the `maybe_done` function.
#[derive(Debug)]
pub enum MaybeDone<F: Future> {
/// A not-yet-completed future
Future(F),
/// The output of the completed future
Done(F::Output),
/// The empty variant after the result of a `maybe_done` has been
/// taken using the `take_output` method.
Gone,
}

// safe because we never generate `PinMut<A::Output>`
impl<F: Future + Unpin> Unpin for MaybeDone<F> {}

/// Creates a new future from a closure.
pub fn maybe_done<F: Future>(f: F) -> MaybeDone<F> {
MaybeDone::Future(f)
}

impl<F: Future> MaybeDone<F> {
/// Attempt to take the output of a `MaybeDone` without driving it
/// towards completion.
pub fn take_output(self: PinMut<Self>) -> Option<F::Output> {
unsafe {
let this = PinMut::get_mut_unchecked(self);
match this {
MaybeDone::Done(_) => {},
MaybeDone::Future(_) | MaybeDone::Gone => return None,
};
if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
Some(output)
} else {
unreachable!()
}
}
}
}

impl<A: Future> Future for MaybeDone<A> {
type Output = ();

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let res = unsafe {
match PinMut::get_mut_unchecked(self.reborrow()) {
MaybeDone::Future(a) => {
if let Poll::Ready(res) = PinMut::new_unchecked(a).poll(cx) {
res
} else {
return Poll::Pending
}
}
MaybeDone::Done(_) => return Poll::Ready(()),
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
};
PinMut::set(self, MaybeDone::Done(res));
Poll::Ready(())
}
}
2 changes: 2 additions & 0 deletions futures-util/src/future/mod.rs
Expand Up @@ -8,9 +8,11 @@ use futures_core::{Future, Stream};
// Primitive futures
mod empty;
mod lazy;
mod maybe_done;
mod poll_fn;
pub use self::empty::{empty, Empty};
pub use self::lazy::{lazy, Lazy};
pub use self::maybe_done::{maybe_done, MaybeDone};
pub use self::poll_fn::{poll_fn, PollFn};

// combinators
Expand Down
14 changes: 8 additions & 6 deletions futures-util/src/lib.rs
@@ -1,10 +1,9 @@
//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
//! and the `AsyncRead` and `AsyncWrite` traits.

#![feature(pin, arbitrary_self_types, futures_api)]

#![no_std]
#![feature(async_await, await_macro, pin, arbitrary_self_types, futures_api)]

#![cfg_attr(not(feature = "std"), no_std)]
#![deny(missing_docs, missing_debug_implementations, warnings)]
#![deny(bare_trait_objects)]

Expand Down Expand Up @@ -32,9 +31,8 @@ macro_rules! if_std {
)*)
}

#[cfg(feature = "std")]
//#[macro_use]
extern crate std;
#[macro_use]
extern crate core;

macro_rules! delegate_sink {
($field:ident) => {
Expand All @@ -57,6 +55,10 @@ macro_rules! delegate_sink {
}
}

// FIXME: currently async/await is only available with std
#[cfg(feature = "std")]
pub mod await;

#[cfg(all(feature = "std", any(test, feature = "bench")))]
pub mod lock;
#[cfg(all(feature = "std", not(any(test, feature = "bench"))))]
Expand Down

0 comments on commit d67e293

Please sign in to comment.