Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TryFlattenUnordered: propagate base stream error #2607

Merged
merged 7 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions futures-util/benches/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::executor::block_on;
use futures::future;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use futures_util::FutureExt;
use std::collections::VecDeque;
use std::thread;

Expand All @@ -34,17 +35,9 @@ fn oneshot_streams(b: &mut Bencher) {
}
});

let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
Box::pin(async {
if let Some(next) = vals.next() {
let val = next.await.unwrap();
Some((val, vals))
} else {
None
}
})
})
.flatten_unordered(None);
let mut flatten = stream::iter(rxs)
.map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
.flatten_unordered(None);

block_on(future::poll_fn(move |cx| {
let mut count = 0;
Expand Down
80 changes: 65 additions & 15 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::{
cell::UnsafeCell,
convert::identity,
fmt,
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::atomic::{AtomicU8, Ordering},
Expand All @@ -22,6 +23,10 @@ use futures_task::{waker, ArcWake};

use crate::stream::FuturesUnordered;

/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
/// method.
pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>;

/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.
const NONE: u8 = 0;

Expand Down Expand Up @@ -154,7 +159,7 @@ impl SharedPollState {

/// Resets current state allowing to poll the stream and wake up wakers.
fn reset(&self) -> u8 {
self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
}
}

Expand Down Expand Up @@ -276,10 +281,10 @@ impl<St: Stream + Unpin> Future for PollStreamFut<St> {

pin_project! {
/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
/// method.
#[project = FlattenUnorderedProj]
/// method with ability to specify flow controller.
#[project = FlattenUnorderedWithFlowControllerProj]
#[must_use = "streams do nothing unless polled"]
pub struct FlattenUnordered<St> where St: Stream {
pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
Expand All @@ -289,34 +294,40 @@ pin_project! {
is_stream_done: bool,
inner_streams_waker: Arc<WrappedWaker>,
stream_waker: Arc<WrappedWaker>,
flow_controller: PhantomData<Fc>
}
}

impl<St> fmt::Debug for FlattenUnordered<St>
impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + fmt::Debug,
St::Item: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FlattenUnordered")
f.debug_struct("FlattenUnorderedWithFlowController")
.field("poll_state", &self.poll_state)
.field("inner_streams", &self.inner_streams)
.field("limit", &self.limit)
.field("stream", &self.stream)
.field("is_stream_done", &self.is_stream_done)
.field("flow_controller", &self.flow_controller)
.finish()
}
}

impl<St> FlattenUnordered<St>
impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
pub(crate) fn new(
stream: St,
limit: Option<usize>,
) -> FlattenUnorderedWithFlowController<St, Fc> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);

FlattenUnordered {
FlattenUnorderedWithFlowController {
inner_streams: FuturesUnordered::new(),
stream,
is_stream_done: false,
Expand All @@ -332,13 +343,35 @@ where
need_to_poll: NEED_TO_POLL_STREAM,
}),
poll_state,
flow_controller: PhantomData,
}
}

delegate_access_inner!(stream, St, ());
}

impl<St> FlattenUnorderedProj<'_, St>
/// Returns the next flow step based on the received item.
pub trait FlowController<I, O> {
/// Handles an item producing `FlowStep` describing the next flow step.
fn next_step(item: I) -> FlowStep<I, O>;
}

impl<I, O> FlowController<I, O> for () {
fn next_step(item: I) -> FlowStep<I, O> {
FlowStep::Continue(item)
}
}

/// Describes the next flow step.
#[derive(Debug, Clone)]
pub enum FlowStep<C, R> {
/// Just yields an item and continues standard flow.
Continue(C),
/// Immediately returns an underlying item from the function.
Return(R),
}

impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>
where
St: Stream,
{
Expand All @@ -348,19 +381,21 @@ where
}
}

impl<St> FusedStream for FlattenUnordered<St>
impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>
where
St: FusedStream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.inner_streams.is_empty()
}
}

impl<St> Stream for FlattenUnordered<St>
impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
type Item = <St::Item as Stream>::Item;
Expand Down Expand Up @@ -405,8 +440,23 @@ where
let mut cx = Context::from_waker(stream_waker.as_ref().unwrap());

match this.stream.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(inner_stream)) => {
let next_item_fut = PollStreamFut::new(inner_stream);
Poll::Ready(Some(item)) => {
let next_item_fut = match Fc::next_step(item) {
// Propagates an item immediately (the main use-case is for errors)
FlowStep::Return(item) => {
need_to_poll_next |= NEED_TO_POLL_STREAM
| (poll_state_value & NEED_TO_POLL_INNER_STREAMS);
poll_state_value &= !NEED_TO_POLL_INNER_STREAMS;

next_item = Some(item);

break;
}
// Yields an item and continues processing (normal case)
FlowStep::Continue(inner_stream) => {
PollStreamFut::new(inner_stream)
}
};
// Add new stream to the inner streams bucket
this.inner_streams.as_mut().push(next_item_fut);
// Inner streams must be polled afterward
Expand Down Expand Up @@ -478,7 +528,7 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for FlattenUnordered<St>
impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + Sink<Item>,
{
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub use self::buffered::Buffered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
mod flatten_unordered;
pub(crate) mod flatten_unordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
3 changes: 2 additions & 1 deletion futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,8 @@ pub trait TryStreamExt: TryStream {
}

/// Flattens a stream of streams into just one continuous stream. Produced streams
/// will be polled concurrently and any errors are passed through without looking at them.
/// will be polled concurrently and any errors will be passed through without looking at them.
/// If the underlying base stream returns an error, it will be **immediately** propagated.
///
/// The only argument is an optional limit on the number of concurrently
/// polled streams. If this limit is not `None`, no more than `limit` streams
Expand Down
Loading