Skip to content

Commit

Permalink
Minor tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Mar 13, 2022
1 parent 1e482ef commit c21306c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
1 change: 0 additions & 1 deletion futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ pub trait TryStreamExt: TryStream {
limit: impl Into<Option<usize>>,
) -> TryFlattenUnordered<Self, I, E>
where
Self: TryStream,
Self::Ok: futures_core::Stream<Item = Result<I, E>>,
E: From<Self::Error>,
Self: Sized,
Expand Down
21 changes: 11 additions & 10 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ delegate_all!(
);

pin_project! {
/// Flattens successful streams from the given stream, bubbling up the errors.
/// Emits either successful streams or single-item streams containing the underlying errors.
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlattenSuccessful<St, I, E>
where
St: TryStream,
Expand Down Expand Up @@ -66,13 +67,13 @@ where
}
}

/// Emits one item immediately, then stream will be terminated.
/// Emits single item immediately, then stream will be terminated.
#[derive(Debug, Clone)]
pub struct One<T>(Option<T>);
pub struct Single<T>(Option<T>);

impl<T> Unpin for One<T> {}
impl<T> Unpin for Single<T> {}

impl<T> Stream for One<T> {
impl<T> Stream for Single<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -84,7 +85,7 @@ impl<T> Stream for One<T> {
}
}

type OneResult<St> = One<
type SingleResult<St> = Single<
Result<<<St as TryStream>::Ok as TryStream>::Ok, <<St as TryStream>::Ok as TryStream>::Error>,
>;

Expand All @@ -96,19 +97,19 @@ where
{
// Item is either an inner stream or a stream containing a single error.
// This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
type Item = Either<St::Ok, OneResult<St>>;
type Item = Either<St::Ok, SingleResult<St>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.project().stream.try_poll_next(cx));

let out = item.map(|res| match res {
// Emit inner stream as is
// Emit successful inner stream as is
Ok(stream) => Either::Left(stream),
// Wrap an error into stream wrapper containing one item
// Wrap an error into a stream containing a single item
err @ Err(_) => {
let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);

Either::Right(One(Some(res)))
Either::Right(Single(Some(res)))
}
});

Expand Down

0 comments on commit c21306c

Please sign in to comment.