Skip to content

Commit

Permalink
Fix 1.45 backward compatibility + feature gate
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Mar 13, 2022
1 parent 49f1b08 commit 969a247
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 34 deletions.
6 changes: 3 additions & 3 deletions futures-util/src/future/try_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}

type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
Expand Down Expand Up @@ -57,9 +60,6 @@ where
})
}

type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;

impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
where
A: TryFuture,
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};
mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFlattenUnordered, TryNext,
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile,
TryStreamExt, TryTakeWhile, TryUnfold,
};

#[cfg(feature = "io")]
Expand All @@ -64,7 +64,7 @@ pub use self::try_stream::IntoAsyncRead;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered};
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered};

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
Expand Down
11 changes: 6 additions & 5 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,13 +679,14 @@ pub trait TryStreamExt: TryStream {
/// # });
/// ```
#[cfg(feature = "alloc")]
fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
fn try_flatten_unordered<I, E>(
self,
limit: impl Into<Option<usize>>,
) -> TryFlattenUnordered<Self, I, E>
where
Self: TryStream,
Self::Ok: TryStream
// Needed because either way compiler can't infer types properly...
+ Stream<Item = Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>>,
<Self::Ok as TryStream>::Error: From<Self::Error>,
Self::Ok: Stream<Item = Result<I, E>>,
E: From<Self::Error>,
Self: Sized,
{
assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
Expand Down
54 changes: 31 additions & 23 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,52 @@ use crate::StreamExt;

delegate_all!(
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
TryFlattenUnordered<St>(
FlattenUnordered<TryFlattenSuccessful<St>>
TryFlattenUnordered<St, I, E>(
FlattenUnordered<TryFlattenSuccessful<St, I, E>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ New[
|stream: St, limit: impl Into<Option<usize>>|
TryFlattenSuccessful::new(stream).flatten_unordered(limit)
]
where
St: TryStream,
St::Ok: TryStream,
<St::Ok as TryStream>::Error: From<St::Error>,
// Needed because either way compiler can't infer types properly...
St::Ok: Stream<Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>>
St::Ok: Stream<Item = Result<I, E>>,
E: From<St::Error>
);

pin_project! {
/// Flattens successful streams from the given stream, bubbling up the errors.
/// This's a wrapper for `TryFlattenUnordered` to reuse `FlattenUnordered` logic over `TryStream`.
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
#[derive(Debug)]
pub struct TryFlattenSuccessful<St> {
#[pin]
stream: St,
}
pub struct TryFlattenSuccessful<St, I, E>
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
E: From<St::Error>
{
#[pin]
stream: St,
}
}

impl<St> TryFlattenSuccessful<St> {
impl<St, I, E> TryFlattenSuccessful<St, I, E>
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
E: From<St::Error>,
{
fn new(stream: St) -> Self {
Self { stream }
}

delegate_access_inner!(stream, St, ());
}

impl<St> FusedStream for TryFlattenSuccessful<St>
impl<St, I, E> FusedStream for TryFlattenSuccessful<St, I, E>
where
St: TryStream + FusedStream,
St::Ok: TryStream,
<St::Ok as TryStream>::Error: From<St::Error>,
St::Ok: Stream<Item = Result<I, E>>,
E: From<St::Error>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
Expand Down Expand Up @@ -80,11 +88,11 @@ type OneResult<St> = One<
Result<<<St as TryStream>::Ok as TryStream>::Ok, <<St as TryStream>::Ok as TryStream>::Error>,
>;

impl<St> Stream for TryFlattenSuccessful<St>
impl<St, I, E> Stream for TryFlattenSuccessful<St, I, E>
where
St: TryStream,
St::Ok: TryStream,
<St::Ok as TryStream>::Error: From<St::Error>,
St::Ok: Stream<Item = Result<I, E>>,
E: From<St::Error>,
{
// Item is either an inner stream or a stream containing a single error.
// This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
Expand All @@ -110,13 +118,13 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for TryFlattenSuccessful<S>
impl<St, I, E, Item> Sink<Item> for TryFlattenSuccessful<St, I, E>
where
S: TryStream + Sink<Item>,
S::Ok: TryStream,
<S::Ok as TryStream>::Error: From<<S as TryStream>::Error>,
St: TryStream + Sink<Item>,
St::Ok: Stream<Item = Result<I, E>>,
E: From<<St as TryStream>::Error>,
{
type Error = <S as Sink<Item>>::Error;
type Error = <St as Sink<Item>>::Error;

delegate_sink!(stream, Item);
}

0 comments on commit 969a247

Please sign in to comment.