diff --git a/.travis.yml b/.travis.yml index 68289e3aab..edb1a91433 100644 --- a/.travis.yml +++ b/.travis.yml @@ -52,6 +52,7 @@ matrix: - cargo build --manifest-path futures-io/Cargo.toml --all-features - cargo build --manifest-path futures-sink/Cargo.toml --all-features - cargo build --manifest-path futures-util/Cargo.toml --all-features + - cargo build --manifest-path futures-test/Cargo.toml --all-features - name: cargo build --all-features (with minimal versions) rust: nightly @@ -81,6 +82,7 @@ matrix: - RUSTDOCFLAGS=-Dwarnings cargo doc --all --exclude futures-preview --exclude futures-executor-preview + --exclude futures-test-preview - cargo doc script: diff --git a/Cargo.toml b/Cargo.toml index fa26b657f8..f02526a563 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ members = [ "futures-io", "futures-sink", "futures-util", + "futures-test", ] diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml new file mode 100644 index 0000000000..d27b11c55d --- /dev/null +++ b/futures-test/Cargo.toml @@ -0,0 +1,26 @@ +cargo-features = ["edition"] + +[package] +name = "futures-test-preview" +edition = "2018" +version = "0.3.0-alpha.3" +authors = ["Wim Looman "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang-nursery/futures-rs" +homepage = "https://rust-lang-nursery.github.io/futures-rs" +documentation = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test" +description = """ +Common utilities for testing components built off futures-rs. +""" + +[lib] +name = "futures_test" + +[dependencies] +futures-core-preview = { version = "0.3.0-alpha.2", path = "../futures-core", default-features = false } +futures-util-preview = { version = "0.3.0-alpha.2", path = "../futures-util", default-features = false } +futures-executor-preview = { version = "0.3.0-alpha.2", path = "../futures-executor", default-features = false } +pin-utils = { version = "0.1.0-alpha.1", default-features = false } + +[dev-dependencies] +futures-preview = { version = "0.3.0-alpha.2", path = "../futures", default-features = false, features = ["std"] } diff --git a/futures-test/LICENSE-APACHE b/futures-test/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-test/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-test/LICENSE-MIT b/futures-test/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-test/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-test/src/assert.rs b/futures-test/src/assert.rs new file mode 100644 index 0000000000..c02b08bd52 --- /dev/null +++ b/futures-test/src/assert.rs @@ -0,0 +1,125 @@ +use futures_core::stream::Stream; +use std::marker::Unpin; + +#[doc(hidden)] +pub fn assert_is_unpin_stream(_: &mut S) {} + +/// Assert that the next poll to the provided stream will return +/// [`Poll::Pending`](futures_core::task::Poll::Pending). +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_pending { + ($stream:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + let poll = $crate::futures_core_reexport::stream::Stream::poll_next( + stream, cx, + ); + if poll.is_ready() { + panic!("assertion failed: stream is not pending"); + } + }}; +} + +/// Assert that the next poll to the provided stream will return +/// [`Poll::Ready`](futures_core::task::Poll::Ready) with the provided item. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_next { + ($stream:expr, $item:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) { + $crate::futures_core_reexport::task::Poll::Ready(Some(x)) => { + assert_eq!(x, $item); + } + $crate::futures_core_reexport::task::Poll::Ready(None) => { + panic!("assertion failed: expected stream to provide item but stream is at its end"); + } + $crate::futures_core_reexport::task::Poll::Pending => { + panic!("assertion failed: expected stream to provide item but stream wasn't ready"); + } + } + }} +} + +/// Assert that the next poll to the provided stream will return an empty +/// [`Poll::Ready`](futures_core::task::Poll::Ready) signalling the +/// completion of the stream. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_done { + ($stream:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) { + $crate::futures_core_reexport::task::Poll::Ready(Some(_)) => { + panic!("assertion failed: expected stream to be done but had more elements"); + } + $crate::futures_core_reexport::task::Poll::Ready(None) => {} + $crate::futures_core_reexport::task::Poll::Pending => { + panic!("assertion failed: expected stream to be done but was pending"); + } + } + }} +} diff --git a/futures-test/src/future/delay.rs b/futures-test/src/future/delay.rs new file mode 100644 index 0000000000..6b6d5427a8 --- /dev/null +++ b/futures-test/src/future/delay.rs @@ -0,0 +1,45 @@ +use futures_core::future::Future; +use futures_core::task::{self, Poll}; +use std::mem::PinMut; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Combinator that guarantees one [`Poll::Pending`] before polling its inner +/// future. +/// +/// This is created by the [`FutureTestExt::delay`](super::FutureTestExt::delay) +/// method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Delayed { + future: Fut, + polled_before: bool, +} + +impl Delayed { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(polled_before: bool); + + pub(super) fn new(future: Fut) -> Self { + Self { + future, + polled_before: false, + } + } +} + +impl Future for Delayed { + type Output = Fut::Output; + + fn poll( + mut self: PinMut, + cx: &mut task::Context, + ) -> Poll { + if *self.polled_before() { + self.future().poll(cx) + } else { + *self.polled_before() = true; + cx.waker().wake(); + Poll::Pending + } + } +} diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs new file mode 100644 index 0000000000..8f3fb1632f --- /dev/null +++ b/futures-test/src/future/mod.rs @@ -0,0 +1,65 @@ +//! Additional combinators for testing futures. + +mod delay; + +use self::delay::Delayed; +use futures_core::future::Future; +use futures_executor; +use std::thread; + +/// Additional combinators for testing futures. +pub trait FutureTestExt: Future { + /// Introduces one [`Poll::Pending`](futures_core::task::Poll::Pending) + /// before polling the given future + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, futures_api, pin)] + /// use futures::task::Poll; + /// use futures::future::FutureExt; + /// use futures_test::task; + /// use futures_test::future::FutureTestExt; + /// use pin_utils::pin_mut; + /// + /// let future = (async { 5 }).delay(); + /// pin_mut!(future); + /// + /// let cx = &mut task::no_spawn_context(); + /// + /// assert_eq!(future.poll_unpin(cx), Poll::Pending); + /// assert_eq!(future.poll_unpin(cx), Poll::Ready(5)); + /// ``` + fn delay(self) -> Delayed + where + Self: Sized, + { + delay::Delayed::new(self) + } + + /// Runs this future on a dedicated executor running in a background thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, futures_api, pin)] + /// use futures::channel::oneshot; + /// use futures::executor::block_on; + /// use futures_test::future::FutureTestExt; + /// + /// let (tx, rx) = oneshot::channel::(); + /// + /// (async { tx.send(5).unwrap() }).run_in_background(); + /// + /// assert_eq!(block_on(rx), Ok(5)); + /// ``` + fn run_in_background(self) + where + Self: Sized + Send + 'static, + Self::Output: Send, + { + thread::spawn(|| futures_executor::block_on(self)); + } +} + +impl FutureTestExt for Fut where Fut: Future {} diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs new file mode 100644 index 0000000000..fc38aac255 --- /dev/null +++ b/futures-test/src/lib.rs @@ -0,0 +1,28 @@ +//! Utilities to make testing [`Future`s](futures_core::Future) easier + +#![feature( + arbitrary_self_types, + async_await, + await_macro, + futures_api, + pin, +)] +#![warn(missing_docs, missing_debug_implementations)] +#![deny(bare_trait_objects)] +#![doc( + html_root_url = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test" +)] + +#[doc(hidden)] +pub use std as std_reexport; + +#[doc(hidden)] +pub extern crate futures_core as futures_core_reexport; + +#[macro_use] +#[doc(hidden)] +pub mod assert; + +pub mod task; + +pub mod future; diff --git a/futures-test/src/task/context.rs b/futures-test/src/task/context.rs new file mode 100644 index 0000000000..2b73aae9f4 --- /dev/null +++ b/futures-test/src/task/context.rs @@ -0,0 +1,65 @@ +use crate::task::{spawn, wake}; +use futures_core::task::Context; + +/// Create a new [`task::Context`](futures_core::task::Context) where both +/// the [`waker`](futures_core::task::Context::waker) and +/// [`spawner`](futures_core::task::Context::spawner) will panic if used. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(futures_api)] +/// use futures_test::task; +/// +/// let cx = task::panic_context(); +/// cx.waker().wake(); // Will panic +/// ``` +pub fn panic_context() -> Context<'static> { + Context::new(wake::panic_local_waker_ref(), spawn::panic_mut()) +} + +/// Create a new [`task::Context`](futures_core::task::Context) where the +/// [`waker`](futures_core::task::Context::waker) will ignore any calls to +/// `wake` while the [`spawner`](futures_core::task::Context::spawner) will +/// panic if used. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::future::Future; +/// use futures::task::Poll; +/// use futures_test::task::no_spawn_context; +/// use pin_utils::pin_mut; +/// +/// let mut future = async { 5 }; +/// pin_mut!(future); +/// +/// assert_eq!(future.poll(&mut no_spawn_context()), Poll::Ready(5)); +/// ``` +pub fn no_spawn_context() -> Context<'static> { + Context::new(wake::noop_local_waker_ref(), spawn::panic_mut()) +} + +/// Create a new [`task::Context`](futures_core::task::Context) where the +/// [`waker`](futures_core::task::Context::waker) and +/// [`spawner`](futures_core::task::Context::spawner) will both ignore any +/// uses. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::future::Future; +/// use futures::task::Poll; +/// use futures_test::task::noop_context; +/// use pin_utils::pin_mut; +/// +/// let mut future = async { 5 }; +/// pin_mut!(future); +/// +/// assert_eq!(future.poll(&mut noop_context()), Poll::Ready(5)); +/// ``` +pub fn noop_context() -> Context<'static> { + Context::new(wake::noop_local_waker_ref(), spawn::noop_mut()) +} diff --git a/futures-test/src/task/mod.rs b/futures-test/src/task/mod.rs new file mode 100644 index 0000000000..94040dfba1 --- /dev/null +++ b/futures-test/src/task/mod.rs @@ -0,0 +1,19 @@ +//! Task related utilities. +//! +//! In the majority of use cases you can use the functions exported below to +//! create a [`Context`](futures_core::task::Context) appropriate to use in your +//! tests. +//! +//! For more complex test cases you can take a `Context` from one of these +//! functions and then use the +//! [`Context::with_waker`](futures_core::task::Context::with_waker) and +//! [`Context::with_spawner`](futures_core::task::Context::with_spawner) +//! methods to change the implementations used. See the examples on +//! the provided implementations in [`wake`] and +//! [`spawn`] for more details. + +pub mod spawn; +pub mod wake; + +mod context; +pub use self::context::{no_spawn_context, noop_context, panic_context}; diff --git a/futures-test/src/task/spawn/mod.rs b/futures-test/src/task/spawn/mod.rs new file mode 100644 index 0000000000..a40e9788eb --- /dev/null +++ b/futures-test/src/task/spawn/mod.rs @@ -0,0 +1,11 @@ +//! Implementations of [`Spawn`](futures_core::task::Spawn) with various +//! behaviour for test purposes. + +mod noop; +pub use self::noop::{Noop, noop_mut}; + +mod panic; +pub use self::panic::{Panic, panic_mut}; + +mod record; +pub use self::record::Record; diff --git a/futures-test/src/task/spawn/noop.rs b/futures-test/src/task/spawn/noop.rs new file mode 100644 index 0000000000..c4027f33e4 --- /dev/null +++ b/futures-test/src/task/spawn/noop.rs @@ -0,0 +1,55 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; +use std::cell::UnsafeCell; + +/// An implementation of [`Spawn`](futures_core::task::Spawn) that +/// discards spawned futures when used. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{panic_context, spawn}; +/// +/// let mut cx = panic_context(); +/// let mut spawn = spawn::Noop::new(); +/// let cx = &mut cx.with_spawner(&mut spawn); +/// +/// cx.spawner().spawn(async { }); +/// ``` +#[derive(Debug)] +pub struct Noop { + _reserved: (), +} + +impl Noop { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } +} + +impl Spawn for Noop { + fn spawn_obj( + &mut self, + _future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + Ok(()) + } +} + +impl Default for Noop { + fn default() -> Self { + Self::new() + } +} + +/// Get a thread local reference to a singleton instance of [`Noop`]. +pub fn noop_mut() -> &'static mut Noop { + thread_local! { + static INSTANCE: UnsafeCell = + UnsafeCell::new(Noop { _reserved: () }); + } + INSTANCE.with(|i| unsafe { &mut *i.get() }) +} diff --git a/futures-test/src/task/spawn/panic.rs b/futures-test/src/task/spawn/panic.rs new file mode 100644 index 0000000000..34b518bfc8 --- /dev/null +++ b/futures-test/src/task/spawn/panic.rs @@ -0,0 +1,55 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; +use std::cell::UnsafeCell; + +/// An implementation of [`Spawn`](futures_core::task::Spawn) that panics +/// when used. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{noop_context, spawn}; +/// +/// let mut cx = noop_context(); +/// let mut spawn = spawn::Panic::new(); +/// let cx = &mut cx.with_spawner(&mut spawn); +/// +/// cx.spawner().spawn(async { }); // Will panic +/// ``` +#[derive(Debug)] +pub struct Panic { + _reserved: (), +} + +impl Panic { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } +} + +impl Spawn for Panic { + fn spawn_obj( + &mut self, + _future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + panic!("should not spawn") + } +} + +impl Default for Panic { + fn default() -> Self { + Self::new() + } +} + +/// Get a thread local reference to a singleton instance of [`Panic`]. +pub fn panic_mut() -> &'static mut Panic { + thread_local! { + static INSTANCE: UnsafeCell = + UnsafeCell::new(Panic { _reserved: () }); + } + INSTANCE.with(|i| unsafe { &mut *i.get() }) +} diff --git a/futures-test/src/task/spawn/record.rs b/futures-test/src/task/spawn/record.rs new file mode 100644 index 0000000000..0e72998b91 --- /dev/null +++ b/futures-test/src/task/spawn/record.rs @@ -0,0 +1,57 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; + +/// An implementation of [`Spawn`](futures_core::task::Spawn) that records +/// any [`Future`](futures_core::future::Future)s spawned on it. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{panic_context, spawn}; +/// +/// let mut recorder = spawn::Record::new(); +/// +/// { +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_spawner(&mut recorder); +/// cx.spawner().spawn(async { }); +/// } +/// +/// assert_eq!(recorder.spawned().len(), 1); +/// ``` +#[derive(Debug)] +pub struct Record { + spawned: Vec>, +} + +impl Record { + /// Create a new instance + pub fn new() -> Self { + Self { + spawned: Vec::new(), + } + } + + /// Inspect any futures that were spawned onto this [`Spawn`]. + pub fn spawned(&self) -> &[FutureObj<'static, ()>] { + &self.spawned + } +} + +impl Spawn for Record { + fn spawn_obj( + &mut self, + future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + self.spawned.push(future); + Ok(()) + } +} + +impl Default for Record { + fn default() -> Self { + Self::new() + } +} diff --git a/futures-test/src/task/wake/counter.rs b/futures-test/src/task/wake/counter.rs new file mode 100644 index 0000000000..d097e4c3c7 --- /dev/null +++ b/futures-test/src/task/wake/counter.rs @@ -0,0 +1,71 @@ +use futures_core::task::{self, LocalWaker, Wake}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// An implementation of [`Wake`](futures_core::task::Wake) that tracks how many +/// times it has been woken. +/// +/// # Examples +/// +/// ``` +/// #![feature(futures_api)] +/// use futures_test::task::{panic_context, wake}; +/// +/// let wake_counter = wake::Counter::new(); +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_waker(wake_counter.local_waker()); +/// +/// assert_eq!(wake_counter.count(), 0); +/// +/// cx.waker().wake(); +/// cx.waker().wake(); +/// +/// assert_eq!(wake_counter.count(), 2); +/// ``` +#[derive(Debug)] +pub struct Counter { + inner: Arc, + local_waker: LocalWaker, +} + +#[derive(Debug)] +struct Inner { + count: AtomicUsize, +} + +impl Counter { + /// Create a new [`Counter`] + pub fn new() -> Counter { + let inner = Arc::new(Inner { + count: AtomicUsize::new(0), + }); + Counter { + local_waker: task::local_waker_from_nonlocal(inner.clone()), + inner, + } + } + + /// Creates an associated [`LocalWaker`]. Every call to its + /// [`wake`](LocalWaker::wake) and + /// [`wake_local`](LocalWaker::wake) methods increments the counter. + pub fn local_waker(&self) -> &LocalWaker { + &self.local_waker + } + + /// Get the number of times this [`Counter`] has been woken + pub fn count(&self) -> usize { + self.inner.count.load(Ordering::SeqCst) + } +} + +impl Default for Counter { + fn default() -> Self { + Self::new() + } +} + +impl Wake for Inner { + fn wake(arc_self: &Arc) { + arc_self.count.fetch_add(1, Ordering::SeqCst); + } +} diff --git a/futures-test/src/task/wake/mod.rs b/futures-test/src/task/wake/mod.rs new file mode 100644 index 0000000000..0fe1e177b9 --- /dev/null +++ b/futures-test/src/task/wake/mod.rs @@ -0,0 +1,11 @@ +//! Implementations of [`Wake`](futures_core::task::Wake) with various behaviour +//! for test purposes. + +mod counter; +pub use self::counter::Counter; + +mod noop; +pub use self::noop::{noop_local_waker, noop_local_waker_ref, Noop}; + +mod panic; +pub use self::panic::{panic_local_waker, panic_local_waker_ref, Panic}; diff --git a/futures-test/src/task/wake/noop.rs b/futures-test/src/task/wake/noop.rs new file mode 100644 index 0000000000..7db3a989fa --- /dev/null +++ b/futures-test/src/task/wake/noop.rs @@ -0,0 +1,76 @@ +use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker}; +use std::cell::UnsafeCell; +use std::ptr::NonNull; +use std::sync::Arc; + +/// An implementation of [`Wake`](futures_core::task::Wake) that does nothing +/// when woken. +/// +/// # Examples +/// +/// ``` +/// #![feature(futures_api)] +/// use futures_test::task::{panic_context, wake}; +/// +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_waker(wake::noop_local_waker_ref()); +/// +/// cx.waker().wake(); +/// ``` +#[derive(Debug)] +pub struct Noop { + _reserved: (), +} + +impl Noop { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } +} + +impl Default for Noop { + fn default() -> Self { + Self::new() + } +} + +impl Wake for Noop { + fn wake(_arc_self: &Arc) {} +} + +unsafe impl UnsafeWake for Noop { + unsafe fn clone_raw(&self) -> Waker { + noop_waker() + } + + unsafe fn drop_raw(&self) {} + + unsafe fn wake(&self) {} +} + +fn noop_unsafe_wake() -> NonNull { + static mut INSTANCE: Noop = Noop { _reserved: () }; + unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) } +} + +fn noop_waker() -> Waker { + unsafe { Waker::new(noop_unsafe_wake()) } +} + +/// Create a new [`LocalWaker`](futures_core::task::LocalWaker) referencing a +/// singleton instance of [`Noop`]. +pub fn noop_local_waker() -> LocalWaker { + unsafe { LocalWaker::new(noop_unsafe_wake()) } +} + +/// Get a thread local reference to a +/// [`LocalWaker`](futures_core::task::LocalWaker) referencing a singleton +/// instance of [`Noop`]. +pub fn noop_local_waker_ref() -> &'static LocalWaker { + thread_local! { + static LOCAL_WAKER_INSTANCE: UnsafeCell = + UnsafeCell::new(noop_local_waker()); + } + LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() }) +} diff --git a/futures-test/src/task/wake/panic.rs b/futures-test/src/task/wake/panic.rs new file mode 100644 index 0000000000..71d8c64628 --- /dev/null +++ b/futures-test/src/task/wake/panic.rs @@ -0,0 +1,80 @@ +use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker}; +use std::cell::UnsafeCell; +use std::ptr::NonNull; +use std::sync::Arc; + +/// An implementation of [`Wake`](futures_core::task::Wake) that panics when +/// woken. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(futures_api)] +/// use futures_test::task::{noop_context, wake}; +/// +/// let mut cx = noop_context(); +/// let cx = &mut cx.with_waker(wake::panic_local_waker_ref()); +/// +/// cx.waker().wake(); // Will panic +/// ``` +#[derive(Debug)] +pub struct Panic { + _reserved: (), +} + +impl Panic { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } +} + +impl Default for Panic { + fn default() -> Self { + Self::new() + } +} + +impl Wake for Panic { + fn wake(_arc_self: &Arc) { + panic!("should not be woken") + } +} + +unsafe impl UnsafeWake for Panic { + unsafe fn clone_raw(&self) -> Waker { + panic_waker() + } + + unsafe fn drop_raw(&self) {} + + unsafe fn wake(&self) { + panic!("should not be woken") + } +} + +fn panic_unsafe_wake() -> NonNull { + static mut INSTANCE: Panic = Panic { _reserved: () }; + unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) } +} + +fn panic_waker() -> Waker { + unsafe { Waker::new(panic_unsafe_wake()) } +} + +/// Create a new [`LocalWaker`](futures_core::task::LocalWaker) referencing +/// a singleton instance of [`Panic`]. +pub fn panic_local_waker() -> LocalWaker { + unsafe { LocalWaker::new(panic_unsafe_wake()) } +} + +/// Get a thread local reference to a +/// [`LocalWaker`](futures_core::task::LocalWaker) referencing a singleton +/// instance of [`Panic`]. +pub fn panic_local_waker_ref() -> &'static LocalWaker { + thread_local! { + static LOCAL_WAKER_INSTANCE: UnsafeCell = + UnsafeCell::new(panic_local_waker()); + } + LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() }) +} diff --git a/futures-util/src/compat/compat.rs b/futures-util/src/compat/compat.rs index 78f44abdab..8c43c4c6bc 100644 --- a/futures-util/src/compat/compat.rs +++ b/futures-util/src/compat/compat.rs @@ -1,7 +1,7 @@ -/// Converts a futures 0.3 [`TryFuture`][futures_core::future::TryFuture], -/// [`TryStream`][futures_core::stream::TryStream] or -/// [`Sink`][futures_sink::Sink] into a futures 0.1 [`Future`][futures::Future], -/// [`Stream`][futures::Stream] or [`Sink`][futures::Sink] and vice versa. +/// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture), +/// [`TryStream`](futures_core::stream::TryStream) or +/// [`Sink`](futures_sink::Sink) into a futures 0.1 [`Future`](futures::Future), +/// [`Stream`](futures::Stream) or [`Sink`](futures::Sink) and vice versa. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct Compat { diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs index 681e90967c..6066601240 100644 --- a/futures-util/src/compat/executor.rs +++ b/futures-util/src/compat/executor.rs @@ -7,15 +7,15 @@ use futures_core::task as task03; use futures_core::future::FutureObj; /// A future that can run on a futures 0.1 -/// [`Executor`][futures::future::Executor]. +/// [`Executor`](futures::future::Executor). pub type Executor01Future = Compat>, Box>; -/// Extension trait for futures 0.1 [`Executor`][futures::future::Executor]. +/// Extension trait for futures 0.1 [`Executor`](futures::future::Executor). pub trait Executor01CompatExt: Executor01 + Clone + Send + 'static { - /// Converts a futures 0.1 [`Executor`][futures::future::Executor] into a - /// futures 0.3 [`Executor`][futures_core::task::Executor]. + /// Converts a futures 0.1 [`Executor`](futures::future::Executor) into a + /// futures 0.3 [`Executor`](futures_core::task::Executor). fn compat(self) -> Executor01As03 where Self: Sized; } @@ -30,8 +30,8 @@ where Ex: Executor01 + Clone + Send + 'static } } -/// Converts a futures 0.1 [`Executor`][futures::future::Executor] into a -/// futures 0.3 [`Executor`][futures_core::task::Executor]. +/// Converts a futures 0.1 [`Executor`](futures::future::Executor) into a +/// futures 0.3 [`Executor`](futures_core::task::Executor). #[derive(Clone)] pub struct Executor01As03 { executor01: Ex diff --git a/futures-util/src/compat/future01ext.rs b/futures-util/src/compat/future01ext.rs index 170bc59381..aa4e6cad06 100644 --- a/futures-util/src/compat/future01ext.rs +++ b/futures-util/src/compat/future01ext.rs @@ -3,11 +3,11 @@ use futures::Future as Future01; impl Future01CompatExt for Fut {} -/// Extension trait for futures 0.1 [`Future`][futures::Future] +/// Extension trait for futures 0.1 [`Future`](futures::Future) pub trait Future01CompatExt: Future01 { - /// Converts a futures 0.1 [`Future`][futures::Future] + /// Converts a futures 0.1 [`Future`](futures::Future) /// into a futures 0.3 [`Future>`][futures_core::Future]. + /// E>>`](futures_core::Future). fn compat(self) -> Compat where Self: Sized { Compat::new(self, None) } diff --git a/futures-util/src/compat/stream01ext.rs b/futures-util/src/compat/stream01ext.rs index c83bf161c1..73a338ed21 100644 --- a/futures-util/src/compat/stream01ext.rs +++ b/futures-util/src/compat/stream01ext.rs @@ -3,11 +3,11 @@ use futures::Stream as Stream01; impl Stream01CompatExt for St {} -/// Extension trait for futures 0.1 [`Stream`][futures::Stream] +/// Extension trait for futures 0.1 [`Stream`](futures::Stream) pub trait Stream01CompatExt: Stream01 { - /// Converts a futures 0.1 [`Stream`][futures::Stream] + /// Converts a futures 0.1 [`Stream`](futures::Stream) /// into a futures 0.3 [`Stream>`][futures_core::Stream]. + /// E>>`](futures_core::Stream). fn compat(self) -> Compat where Self: Sized { Compat::new(self, None) } diff --git a/futures-util/src/compat/tokio.rs b/futures-util/src/compat/tokio.rs index 4553605fe4..2ff4c92eb0 100644 --- a/futures-util/src/compat/tokio.rs +++ b/futures-util/src/compat/tokio.rs @@ -4,7 +4,7 @@ use futures_core::task::{Spawn, SpawnErrorKind, SpawnObjError}; use tokio_executor::{DefaultExecutor, Executor as TokioExecutor}; /// A spawner that delegates to `tokio`'s -/// [`DefaultExecutor`][tokio_executor::DefaultExecutor], will panic if used in +/// [`DefaultExecutor`](tokio_executor::DefaultExecutor), will panic if used in /// the context of a task that is not running on `tokio`'s executor. /// /// *NOTE* The future of this struct in `futures` is uncertain. It may be diff --git a/futures/Cargo.toml b/futures/Cargo.toml index a9f7b74de6..4f20e19ffe 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -34,6 +34,7 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.3", de [dev-dependencies] pin-utils = "0.1.0-alpha.1" +futures-test-preview = { path = "../futures-test", version = "0.3.0-alpha.3", default-features = false } [features] nightly = ["futures-util-preview/nightly"] diff --git a/futures/tests/abortable.rs b/futures/tests/abortable.rs index fc9368694b..53493058aa 100644 --- a/futures/tests/abortable.rs +++ b/futures/tests/abortable.rs @@ -1,13 +1,10 @@ #![feature(pin, arbitrary_self_types, futures_api)] -use futures::FutureExt; use futures::channel::oneshot; use futures::executor::block_on; -use futures::future::{abortable, Aborted}; +use futures::future::{abortable, Aborted, FutureExt}; use futures::task::Poll; - -mod support; -use self::support::with_counter_waker_context; +use futures_test::task::{panic_context, wake}; #[test] fn abortable_works() { @@ -23,14 +20,15 @@ fn abortable_awakens() { let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); - with_counter_waker_context(|cx, counter| { - assert_eq!(0, counter.get()); - assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx)); - assert_eq!(0, counter.get()); - abort_handle.abort(); - assert_eq!(1, counter.get()); - assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx)); - }) + let wake_counter = wake::Counter::new(); + let mut cx = panic_context(); + let cx = &mut cx.with_waker(wake_counter.local_waker()); + assert_eq!(0, wake_counter.count()); + assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx)); + assert_eq!(0, wake_counter.count()); + abort_handle.abort(); + assert_eq!(1, wake_counter.count()); + assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx)); } #[test] diff --git a/futures/tests/basic_combinators.rs b/futures/tests/basic_combinators.rs index cff77a4ab4..e370a05ad6 100644 --- a/futures/tests/basic_combinators.rs +++ b/futures/tests/basic_combinators.rs @@ -1,11 +1,9 @@ #![feature(pin, arbitrary_self_types, futures_api)] use futures::future::{self, FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; use std::sync::mpsc; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn basic_future_combinators() { let (tx1, rx) = mpsc::channel(); diff --git a/futures/tests/eager_drop.rs b/futures/tests/eager_drop.rs index 2b97b7b2fa..7f9bff7d47 100644 --- a/futures/tests/eager_drop.rs +++ b/futures/tests/eager_drop.rs @@ -3,13 +3,11 @@ use futures::channel::oneshot; use futures::future::{self, Future, FutureExt, TryFutureExt}; use futures::task::{self, Poll}; +use futures_test::future::FutureTestExt; use pin_utils::unsafe_pinned; use std::mem::PinMut; use std::sync::mpsc; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn map_ok() { // The closure given to `map_ok` should have been dropped by the time `map` diff --git a/futures/tests/fuse.rs b/futures/tests/fuse.rs index 67c672e877..1c7d8e5e80 100644 --- a/futures/tests/fuse.rs +++ b/futures/tests/fuse.rs @@ -1,14 +1,12 @@ #![feature(pin, arbitrary_self_types, futures_api)] use futures::future::{self, FutureExt}; - -mod support; +use futures_test::task::panic_context; #[test] fn fuse() { let mut future = future::ready::(2).fuse(); - support::with_panic_waker_context(|cx| { - assert!(future.poll_unpin(cx).is_ready()); - assert!(future.poll_unpin(cx).is_pending()); - }) + let cx = &mut panic_context(); + assert!(future.poll_unpin(cx).is_ready()); + assert!(future.poll_unpin(cx).is_pending()); } diff --git a/futures/tests/futures_ordered.rs b/futures/tests/futures_ordered.rs index a98a32dfe2..8389febbbc 100644 --- a/futures/tests/futures_ordered.rs +++ b/futures/tests/futures_ordered.rs @@ -4,8 +4,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt, FutureObj}; use futures::stream::{StreamExt, futures_ordered, FuturesOrdered}; - -mod support; +use futures_test::task::no_spawn_context; #[test] fn works_1() { @@ -16,9 +15,7 @@ fn works_1() { let mut stream = futures_ordered(vec![a_rx, b_rx, c_rx]); b_tx.send(99).unwrap(); - support::with_noop_waker_context(|cx| { - assert!(stream.poll_next_unpin(cx).is_pending()); - }); + assert!(stream.poll_next_unpin(&mut no_spawn_context()).is_pending()); a_tx.send(33).unwrap(); c_tx.send(33).unwrap(); @@ -41,14 +38,13 @@ fn works_2() { FutureObj::new(Box::new(b_rx.join(c_rx).map(|(a, b)| Ok(a? + b?)))), ]); - support::with_noop_waker_context(|cx| { - a_tx.send(33).unwrap(); - b_tx.send(33).unwrap(); - assert!(stream.poll_next_unpin(cx).is_ready()); - assert!(stream.poll_next_unpin(cx).is_pending()); - c_tx.send(33).unwrap(); - assert!(stream.poll_next_unpin(cx).is_ready()); - }) + let cx = &mut no_spawn_context(); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); + assert!(stream.poll_next_unpin(cx).is_pending()); + c_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); } #[test] @@ -74,7 +70,7 @@ fn queue_never_unblocked() { Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box))) as _, ]); - support::with_noop_waker_context(f)(|cx| { + with_no_spawn_context(|cx| { for _ in 0..10 { assert!(stream.poll_next(cx).unwrap().is_pending()); } diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index de958c94f2..1d3fb89b95 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -5,10 +5,9 @@ use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt, FutureObj}; use futures::stream::{StreamExt, futures_unordered, FuturesUnordered}; use futures::task::Poll; +use futures_test::task::no_spawn_context; use std::boxed::Box; -mod support; - #[test] fn works_1() { let (a_tx, a_rx) = oneshot::channel::(); @@ -40,12 +39,12 @@ fn works_2() { a_tx.send(9).unwrap(); b_tx.send(10).unwrap(); - support::with_noop_waker_context(|cx| { - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9)))); - c_tx.send(20).unwrap(); - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30)))); - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None)); - }) + + let cx = &mut no_spawn_context(); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9)))); + c_tx.send(20).unwrap(); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30)))); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None)); } #[test] diff --git a/futures/tests/oneshot.rs b/futures/tests/oneshot.rs index b74a7c1a58..956680f6da 100644 --- a/futures/tests/oneshot.rs +++ b/futures/tests/oneshot.rs @@ -2,12 +2,10 @@ use futures::channel::oneshot; use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; use std::sync::mpsc; use std::thread; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn oneshot_send1() { let (tx1, rx1) = oneshot::channel::(); diff --git a/futures/tests/support/assert.rs b/futures/tests/support/assert.rs deleted file mode 100644 index d72a6fbebc..0000000000 --- a/futures/tests/support/assert.rs +++ /dev/null @@ -1,38 +0,0 @@ -use futures::stream::Stream; -use futures::task::Poll; -use std::fmt; -use std::mem::PinMut; - -use super::{with_noop_waker_context, with_panic_waker_context}; - -pub fn assert_stream_pending(stream: PinMut) { - with_noop_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(_) => panic!("stream is not pending"), - Poll::Pending => {}, - } - }) -} - -pub fn assert_stream_next(stream: PinMut, item: S::Item) - where S::Item: Eq + fmt::Debug -{ - with_panic_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(Some(x)) => assert_eq!(x, item), - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Pending => panic!("stream wasn't ready"), - } - }) -} - -pub fn assert_stream_done(stream: PinMut) -{ - with_panic_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(Some(_)) => panic!("stream had more elements"), - Poll::Ready(None) => {}, - Poll::Pending => panic!("stream wasn't ready"), - } - }) -} diff --git a/futures/tests/support/counter_waker_context.rs b/futures/tests/support/counter_waker_context.rs deleted file mode 100644 index 1a875acc17..0000000000 --- a/futures/tests/support/counter_waker_context.rs +++ /dev/null @@ -1,33 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - -pub struct CounterWaker(AtomicUsize); - -impl CounterWaker { - pub fn get(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - pub fn set(&self, x: usize) { - self.0.store(x, Ordering::SeqCst) - } -} - -pub fn with_counter_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context, &Arc) -> R -{ - impl Wake for CounterWaker { - fn wake(arc_self: &Arc) { - arc_self.0.fetch_add(1, Ordering::SeqCst); - } - } - - let counter_arc = Arc::new(CounterWaker(AtomicUsize::new(0))); - let counter_waker = unsafe { task::local_waker_ref(&counter_arc) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&counter_waker, exec); - f(cx, &counter_arc) -} diff --git a/futures/tests/support/delayed.rs b/futures/tests/support/delayed.rs deleted file mode 100644 index 82c7ed962e..0000000000 --- a/futures/tests/support/delayed.rs +++ /dev/null @@ -1,35 +0,0 @@ -use futures::future::Future; -use futures::task::{self, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use std::mem::PinMut; - -pub struct Delayed { - future: F, - polled_before: bool -} - -impl Delayed { - unsafe_pinned!(future: F); - unsafe_unpinned!(polled_before: bool); -} - -impl Future for Delayed { - type Output = F::Output; - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll { - if *self.polled_before() { - self.future().poll(cx) - } else { - *self.polled_before() = true; - cx.waker().wake(); - Poll::Pending - } - } -} - -/// Introduces one `Poll::Pending` before polling the given future -pub fn delayed(future: F) -> Delayed - where F: Future, -{ - Delayed { future, polled_before: false } -} diff --git a/futures/tests/support/mod.rs b/futures/tests/support/mod.rs deleted file mode 100644 index 1a501c089e..0000000000 --- a/futures/tests/support/mod.rs +++ /dev/null @@ -1,44 +0,0 @@ -#![allow(dead_code)] - -pub mod assert; - -mod delayed; -pub use self::delayed::{delayed, Delayed}; - -mod run_in_background; -pub use self::run_in_background::RunInBackgroundExt; - -mod counter_waker_context; -pub use self::counter_waker_context::with_counter_waker_context; - -mod noop_waker_context; -pub use self::noop_waker_context::with_noop_waker_context; - -mod panic_executor; - -mod panic_waker_context; -pub use self::panic_waker_context::with_panic_waker_context; - - -// pub fn f_ok(a: i32) -> FutureResult { Ok(a).into_future() } -// pub fn f_err(a: u32) -> FutureResult { Err(a).into_future() } -// pub fn r_ok(a: i32) -> Result { Ok(a) } -// pub fn r_err(a: u32) -> Result { Err(a) } - -// pub fn assert_done(f: F, result: Result) -// where T: Future, -// T::Item: Eq + fmt::Debug, -// T::Error: Eq + fmt::Debug, -// F: FnOnce() -> T, -// { -// assert_eq!(block_on(f()), result); -// } - -// pub fn assert_empty T>(mut f: F) -// where T::Error: Debug -// { -// panic_waker_cx(|cx| { -// assert!(f().poll(cx).unwrap().is_pending()) -// }) -// } - diff --git a/futures/tests/support/noop_waker_context.rs b/futures/tests/support/noop_waker_context.rs deleted file mode 100644 index 44cfed035f..0000000000 --- a/futures/tests/support/noop_waker_context.rs +++ /dev/null @@ -1,19 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; - -pub fn with_noop_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context) -> R -{ - struct NoopWake; - - impl Wake for NoopWake { - fn wake(_: &Arc) {} - } - - let noop_waker = unsafe { task::local_waker(Arc::new(NoopWake)) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&noop_waker, exec); - f(cx) -} diff --git a/futures/tests/support/panic_executor.rs b/futures/tests/support/panic_executor.rs deleted file mode 100644 index 9af821409f..0000000000 --- a/futures/tests/support/panic_executor.rs +++ /dev/null @@ -1,10 +0,0 @@ -use futures::future::FutureObj; -use futures::task::{Spawn, SpawnObjError}; - -pub struct PanicExecutor; - -impl Spawn for PanicExecutor { - fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> { - panic!("should not spawn") - } -} diff --git a/futures/tests/support/panic_waker_context.rs b/futures/tests/support/panic_waker_context.rs deleted file mode 100644 index 134c536499..0000000000 --- a/futures/tests/support/panic_waker_context.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; - -pub fn with_panic_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context) -> R -{ - struct PanicWake; - - impl Wake for PanicWake { - fn wake(_: &Arc) { - panic!("should not be woken"); - } - } - - let panic_waker = unsafe { task::local_waker(Arc::new(PanicWake)) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&panic_waker, exec); - f(cx) -} diff --git a/futures/tests/support/run_in_background.rs b/futures/tests/support/run_in_background.rs deleted file mode 100644 index 16634b1f2e..0000000000 --- a/futures/tests/support/run_in_background.rs +++ /dev/null @@ -1,16 +0,0 @@ -use futures::executor::block_on; -use futures::future::Future; -use std::thread; - -pub trait RunInBackgroundExt { - fn run_in_background(self); -} - -impl RunInBackgroundExt for F - where F: Future + Sized + Send + 'static, - F::Output: Send, -{ - fn run_in_background(self) { - thread::spawn(|| block_on(self)); - } -} diff --git a/futures/tests/unfold.rs b/futures/tests/unfold.rs index d7f15d8799..ed6fb3732a 100644 --- a/futures/tests/unfold.rs +++ b/futures/tests/unfold.rs @@ -2,37 +2,36 @@ use futures::future; use futures::stream; -use pin_utils::pin_mut; -mod support; -use self::support::assert::*; +use futures_test::{ + assert_stream_pending, assert_stream_next, assert_stream_done, +}; +use futures_test::future::FutureTestExt; #[test] fn unfold1() { - let stream = stream::unfold(0, |state| { + let mut stream = stream::unfold(0, |state| { if state <= 2 { - support::delayed(future::ready(Some((state * 2, state + 1)))) + future::ready(Some((state * 2, state + 1))).delay() } else { - support::delayed(future::ready(None)) + future::ready(None).delay() } }); - pin_mut!(stream); - // Creates the future with the closure // Not ready (delayed future) - assert_stream_pending(stream.reborrow()); + assert_stream_pending!(stream); // Future is ready, yields the item - assert_stream_next(stream.reborrow(), 0); + assert_stream_next!(stream, 0); // Repeat - assert_stream_pending(stream.reborrow()); - assert_stream_next(stream.reborrow(), 2); + assert_stream_pending!(stream); + assert_stream_next!(stream, 2); - assert_stream_pending(stream.reborrow()); - assert_stream_next(stream.reborrow(), 4); + assert_stream_pending!(stream); + assert_stream_next!(stream, 4); // No more items - assert_stream_pending(stream.reborrow()); - assert_stream_done(stream.reborrow()); + assert_stream_pending!(stream); + assert_stream_done!(stream); }