Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add {Future,Stream}::wait_until #167

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }

[dev-dependencies]
async-io = "2.3.2"
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = [
"async",
Expand Down
39 changes: 39 additions & 0 deletions src/future/futures_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures_core::Future;

use super::join::tuple::Join2;
use super::race::tuple::Race2;
use super::WaitUntil;

/// An extension trait for the `Future` trait.
pub trait FutureExt: Future {
Expand All @@ -19,6 +20,44 @@ pub trait FutureExt: Future {
where
Self: Future<Output = T> + Sized,
S2: IntoFuture<Output = T>;

/// Delay resolving the future until the given deadline.
///
/// The underlying future will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple futures and streams.
///
/// # Example
///
/// ```
/// # #[cfg(miri)]fn main() {}
/// # #[cfg(not(miri))]
/// # fn main() {
/// use async_io::Timer;
/// use futures_concurrency::prelude::*;
/// use futures_lite::future::block_on;
/// use std::time::{Duration, Instant};
///
/// block_on(async {
/// let now = Instant::now();
/// let duration = Duration::from_millis(100);
///
/// async { "meow" }
/// .wait_until(Timer::after(duration))
/// .await;
///
/// assert!(now.elapsed() >= duration);
/// });
/// # }
/// ```
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
WaitUntil::new(self, deadline.into_future())
}
}

impl<F1> FutureExt for F1
Expand Down
2 changes: 2 additions & 0 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub use join::Join;
pub use race::Race;
pub use race_ok::RaceOk;
pub use try_join::TryJoin;
pub use wait_until::WaitUntil;

/// A growable group of futures which act as a single unit.
#[cfg(feature = "alloc")]
Expand All @@ -86,3 +87,4 @@ pub(crate) mod join;
pub(crate) mod race;
pub(crate) mod race_ok;
pub(crate) mod try_join;
pub(crate) mod wait_until;
61 changes: 61 additions & 0 deletions src/future/wait_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// Suspends a future until the specified deadline.
///
/// This `struct` is created by the [`wait_until`] method on [`FutureExt`]. See its
/// documentation for more.
///
/// [`wait_until`]: crate::future::FutureExt::wait_until
/// [`FutureExt`]: crate::future::FutureExt
#[derive(Debug)]
#[pin_project::pin_project]
#[must_use = "futures do nothing unless polled or .awaited"]
pub struct WaitUntil<F, D> {
#[pin]
future: F,
#[pin]
deadline: D,
state: State,
}

/// The internal state
#[derive(Debug)]
enum State {
Started,
PollFuture,
Completed,
}

impl<F, D> WaitUntil<F, D> {
pub(super) fn new(future: F, deadline: D) -> Self {
Self {
future,
deadline,
state: State::Started,
}
}
}

impl<F: Future, D: Future> Future for WaitUntil<F, D> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.state {
State::Started => {
ready!(this.deadline.as_mut().poll(cx));
*this.state = State::PollFuture;
}
State::PollFuture => {
let value = ready!(this.future.as_mut().poll(cx));
*this.state = State::Completed;
return Poll::Ready(value);
}
State::Completed => panic!("future polled after completing"),
}
}
}
}
2 changes: 2 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use stream_ext::StreamExt;
#[doc(inline)]
#[cfg(feature = "alloc")]
pub use stream_group::StreamGroup;
pub use wait_until::WaitUntil;
pub use zip::Zip;

/// A growable group of streams which act as a single unit.
Expand All @@ -64,4 +65,5 @@ pub(crate) mod chain;
mod into_stream;
pub(crate) mod merge;
mod stream_ext;
pub(crate) mod wait_until;
pub(crate) mod zip;
43 changes: 42 additions & 1 deletion src/stream/stream_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use core::future::IntoFuture;

use crate::stream::{IntoStream, Merge};
use futures_core::Stream;

#[cfg(feature = "alloc")]
use crate::concurrent_stream::FromStream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};
use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};

/// An extension trait for the `Stream` trait.
pub trait StreamExt: Stream {
Expand Down Expand Up @@ -34,6 +36,45 @@ pub trait StreamExt: Stream {
{
FromStream::new(self)
}

/// Delay the yielding of items from the stream until the given deadline.
///
/// The underlying stream will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple streams and futures.
///
/// # Example
/// ```
/// # #[cfg(miri)] fn main() {}
/// # #[cfg(not(miri))]
/// # fn main() {
/// use async_io::Timer;
/// use futures_concurrency::prelude::*;
/// use futures_lite::{future::block_on, stream};
/// use futures_lite::prelude::*;
/// use std::time::{Duration, Instant};
///
/// block_on(async {
/// let now = Instant::now();
/// let duration = Duration::from_millis(100);
///
/// stream::once("meow")
/// .wait_until(Timer::after(duration))
/// .next()
/// .await;
///
/// assert!(now.elapsed() >= duration);
/// });
/// # }
/// ```
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
WaitUntil::new(self, deadline.into_future())
}
}

impl<S1> StreamExt for S1
Expand Down
63 changes: 63 additions & 0 deletions src/stream/wait_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::stream::Stream;
use pin_project::pin_project;

/// Delay execution of a stream once for the specified duration.
///
/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its
/// documentation for more.
///
/// [`wait_until`]: crate::stream::StreamExt::wait_until
/// [`StreamExt`]: crate::stream::StreamExt
#[derive(Debug)]
#[must_use = "streams do nothing unless polled or .awaited"]
#[pin_project]
pub struct WaitUntil<S, D> {
#[pin]
stream: S,
#[pin]
deadline: D,
state: State,
}

#[derive(Debug)]
enum State {
Timer,
Streaming,
}

impl<S, D> WaitUntil<S, D> {
pub(crate) fn new(stream: S, deadline: D) -> Self {
WaitUntil {
stream,
deadline,
state: State::Timer,
}
}
}

impl<S, D> Stream for WaitUntil<S, D>
where
S: Stream,
D: Future,
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

match this.state {
State::Timer => match this.deadline.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
*this.state = State::Streaming;
this.stream.poll_next(cx)
}
},
State::Streaming => this.stream.poll_next(cx),
}
}
}