Skip to content

Commit

Permalink
Minor improvements to future::join!'s implementation
Browse files Browse the repository at this point in the history
This is a follow-up from #91645, regarding [some remarks I made](https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async-foundations/topic/join!/near/264293660).

Mainly:
  - it hides the recursive munching through a private `macro`, to avoid leaking such details (a corollary is getting rid of the need to use `@` to disambiguate);
  - it uses a `match` binding, _outside_ the `async move` block, to better match the semantics from function-like syntax;
  - it pre-pins the future before calling into `poll_fn`, since `poll_fn`, alone, cannot guarantee that its capture does not move;
  - it uses `.ready()?` since it's such a neat pattern;
  - it renames `Took` to `Taken` for consistency with `Done`.
  • Loading branch information
danielhenrymantilla committed Dec 9, 2021
1 parent 0b42dea commit e936071
Showing 1 changed file with 97 additions and 53 deletions.
150 changes: 97 additions & 53 deletions library/core/src/future/join.rs
Expand Up @@ -45,59 +45,102 @@ use crate::task::{Context, Poll};
/// # };
/// ```
#[unstable(feature = "future_join", issue = "91642")]
pub macro join {
( $($fut:expr),* $(,)?) => {
join! { @count: (), @futures: {}, @rest: ($($fut,)*) }
},
// Recurse until we have the position of each future in the tuple
pub macro join( $($fut:expr),+ $(,)? ) {
// Funnel through an internal macro not to leak implementation details.
join_internal! {
current_position[]
futures_and_positions[]
munching[ $($fut)+ ]
}
}

/// To be able to *name* the i-th future in the tuple (say we want the .4-th),
/// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
/// In order to do that, we need to generate a `i`-long repetition of `_`,
/// for each i-th fut. Hence the recursive muncher approach.
macro join_internal {
// Recursion step: map each future with its "position" (underscore count).
(
// A token for each future that has been expanded: "_ _ _"
@count: ($($count:tt)*),
// Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
@futures: { $($fut:tt)* },
// Take a future from @rest to expand
@rest: ($current:expr, $($rest:tt)*)
) => {
join! {
@count: ($($count)* _),
@futures: { $($fut)* $current => ($($count)*), },
@rest: ($($rest)*)
// Accumulate a token for each future that has been expanded: "_ _ _".
current_position[
$($underscores:tt)*
]
// Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`.
futures_and_positions[
$($acc:tt)*
]
// Munch one future.
munching[
$current:tt
$($rest:tt)*
]
) => (
join_internal! {
current_position[
$($underscores)*
_
]
futures_and_positions[
$($acc)*
$current ( $($underscores)* )
]
munching[
$($rest)*
]
}
},
// Now generate the output future
(
@count: ($($count:tt)*),
@futures: {
$( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
},
@rest: ()
) => {
async move {
let mut futures = ( $( MaybeDone::Future($fut), )* );
),

// End of recursion: generate the output future.
(
current_position $_:tt
futures_and_positions[
$(
$fut_expr:tt ( $($pos:tt)* )
)*
]
// Nothing left to munch.
munching[]
) => (
match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
let mut futures = futures;
// SAFETY: this is `pin_mut!`.
let futures = unsafe { Pin::new_unchecked(&mut futures) };
poll_fn(move |cx| {
let mut done = true;

// For each `fut`, pin-project to it, and poll it.
$(
let ( $($pos,)* fut, .. ) = &mut futures;

// SAFETY: The futures are never moved
done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() };
// SAFETY: pinning projection
let fut = unsafe {
futures.as_mut().map_unchecked_mut(|it| {
let ( $($pos,)* fut, .. ) = it;
fut
})
};
// Despite how tempting it may be to `let () = fut.poll(cx).ready()?;`
// doing so would defeat the point of `join!`: to start polling eagerly all
// of the futures, to allow parallelizing the waits.
done &= fut.poll(cx).is_ready();
)*

if done {
// Extract all the outputs
Poll::Ready(($({
let ( $($pos,)* fut, .. ) = &mut futures;

fut.take_output().unwrap()
}),*))
} else {
Poll::Pending
if !done {
return Poll::Pending;
}
// All ready; time to extract all the outputs.

// SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
let futures = unsafe {
futures.as_mut().get_unchecked_mut()
};
Poll::Ready(
($(
{
let ( $($pos,)* fut, .. ) = &mut *futures;
fut.take_output().unwrap()
}
),*) // <- no trailing comma since we don't want 1-tuples.
)
}).await
}
}
}}
),
}

/// Future used by `join!` that stores it's output to
Expand All @@ -109,14 +152,14 @@ pub macro join {
pub enum MaybeDone<F: Future> {
Future(F),
Done(F::Output),
Took,
Taken,
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
pub fn take_output(&mut self) -> Option<F::Output> {
match &*self {
MaybeDone::Done(_) => match mem::replace(self, Self::Took) {
match *self {
MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
MaybeDone::Done(val) => Some(val),
_ => unreachable!(),
},
Expand All @@ -132,13 +175,14 @@ impl<F: Future> Future for MaybeDone<F> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: pinning in structural for `f`
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) {
Poll::Ready(val) => self.set(Self::Done(val)),
Poll::Pending => return Poll::Pending,
},
// Do not mix match ergonomics with unsafe.
match *self.as_mut().get_unchecked_mut() {
MaybeDone::Future(ref mut f) => {
let val = Pin::new_unchecked(f).poll(cx).ready()?;
self.set(Self::Done(val));
}
MaybeDone::Done(_) => {}
MaybeDone::Took => unreachable!(),
MaybeDone::Taken => unreachable!(),
}
}

Expand Down

0 comments on commit e936071

Please sign in to comment.