Skip to content

Commit

Permalink
Remove try_poll_next method
Browse files Browse the repository at this point in the history
  • Loading branch information
steffahn committed Oct 13, 2021
1 parent 3601bb7 commit b7f417d
Show file tree
Hide file tree
Showing 18 changed files with 24 additions and 63 deletions.
27 changes: 1 addition & 26 deletions futures-core/src/stream.rs
Expand Up @@ -154,32 +154,14 @@ where
}
}

mod private_try_stream {
use super::Stream;

pub trait Sealed {}

impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
}

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream: Stream + private_try_stream::Sealed {
pub trait TryStream: Stream<Item = Result<Self::Ok, Self::Error>> {
/// The type of successful values yielded by this future
type Ok;

/// The type of failures yielded by this future
type Error;

/// Poll this `TryStream` as if it were a `Stream`.
///
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Stream` trait; in the future it won't be
/// needed.
fn try_poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

impl<S, T, E> TryStream for S
Expand All @@ -188,13 +170,6 @@ where
{
type Ok = T;
type Error = E;

fn try_poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
self.poll_next(cx)
}
}

#[cfg(feature = "alloc")]
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/compat/compat03as01.rs
Expand Up @@ -114,7 +114,7 @@ where
type Error = St::Error;

fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
with_context(self, |inner, cx| match inner.poll_next(cx)? {
task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
task03::Poll::Pending => Ok(Async01::NotReady),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/try_future/try_flatten.rs
Expand Up @@ -95,7 +95,7 @@ where
}
},
TryFlattenProj::Second { f } => {
let output = ready!(f.try_poll_next(cx));
let output = ready!(f.poll_next(cx));
if output.is_none() {
self.set(Self::Empty);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/sink/send_all.rs
Expand Up @@ -81,7 +81,7 @@ where

loop {
let this = self.as_mut().project();
match this.stream.try_poll_next(cx)? {
match this.stream.poll_next(cx)? {
Poll::Ready(Some(item)) => ready!(self.as_mut().try_start_send(cx, item))?,
Poll::Ready(None) => {
ready!(Pin::new(this.sink).poll_flush(cx))?;
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/and_then.rs
Expand Up @@ -62,7 +62,7 @@ where
let item = ready!(fut.try_poll(cx));
this.future.set(None);
break Some(item);
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)?) {
this.future.set(Some((this.f)(item)));
} else {
break None;
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/try_stream/into_async_read.rs
@@ -1,4 +1,4 @@
use crate::stream::TryStreamExt;
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
Expand Down Expand Up @@ -69,7 +69,7 @@ where

return Poll::Ready(Ok(len));
}
ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
ReadState::PendingChunk => match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
Expand Down Expand Up @@ -121,7 +121,7 @@ where
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
while let ReadState::PendingChunk = self.state {
match ready!(self.stream.try_poll_next_unpin(cx)) {
match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/into_stream.rs
Expand Up @@ -35,7 +35,7 @@ impl<St: TryStream> Stream for IntoStream<St> {

#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.try_poll_next(cx)
self.project().stream.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
18 changes: 2 additions & 16 deletions futures-util/src/stream/try_stream/mod.rs
Expand Up @@ -829,10 +829,10 @@ pub trait TryStreamExt: TryStream {
///
/// let mut buffered = stream_of_futures.try_buffered(10);
///
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
/// assert!(buffered.poll_next_unpin(cx).is_pending());
///
/// send_two.send(2i32)?;
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
/// assert!(buffered.poll_next_unpin(cx).is_pending());
/// Ok::<_, i32>(buffered)
/// }).await?;
///
Expand Down Expand Up @@ -873,20 +873,6 @@ pub trait TryStreamExt: TryStream {
))
}

// TODO: false positive warning from rustdoc. Verify once #43466 settles
//
/// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
/// stream types.
fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>
where
Self: Unpin,
{
Pin::new(self).try_poll_next(cx)
}

/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/or_else.rs
Expand Up @@ -63,7 +63,7 @@ where
this.future.set(None);
break Some(item);
} else {
match ready!(this.stream.as_mut().try_poll_next(cx)) {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(item)) => break Some(Ok(item)),
Some(Err(e)) => {
this.future.set(Some((this.f)(e)));
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_chunks.rs
Expand Up @@ -48,7 +48,7 @@ impl<St: TryStream> Stream for TryChunks<St> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().try_poll_next(cx)) {
match ready!(this.stream.as_mut().poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_collect.rs
Expand Up @@ -43,7 +43,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
match ready!(this.stream.as_mut().poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
None => break mem::replace(this.items, Default::default()),
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_concat.rs
Expand Up @@ -37,7 +37,7 @@ where
let mut this = self.project();

Poll::Ready(Ok(loop {
if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
if let Some(x) = ready!(this.stream.as_mut().poll_next(cx)?) {
if let Some(a) = this.accum {
a.extend(x)
} else {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_filter.rs
Expand Up @@ -80,7 +80,7 @@ where
break this.pending_item.take().map(Ok);
}
*this.pending_item = None;
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)?) {
this.pending_fut.set(Some((this.f)(&item)));
*this.pending_item = Some(item);
} else {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_filter_map.rs
Expand Up @@ -73,7 +73,7 @@ where
if item.is_some() {
break item.map(Ok);
}
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)?) {
// No item in progress, but the stream is still going
this.pending.set(Some((this.f)(item)));
} else {
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/stream/try_stream/try_flatten.rs
Expand Up @@ -58,12 +58,12 @@ where

Poll::Ready(loop {
if let Some(s) = this.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.try_poll_next(cx)?) {
if let Some(item) = ready!(s.poll_next(cx)?) {
break Some(Ok(item));
} else {
this.next.set(None);
}
} else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)?) {
this.next.set(Some(s));
} else {
break None;
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/stream/try_stream/try_next.rs
@@ -1,4 +1,4 @@
use crate::stream::TryStreamExt;
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, TryStream};
Expand Down Expand Up @@ -29,6 +29,6 @@ impl<St: ?Sized + TryStream + Unpin> Future for TryNext<'_, St> {
type Output = Result<Option<St::Ok>, St::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.stream.try_poll_next_unpin(cx)?.map(Ok)
self.stream.poll_next_unpin(cx)?.map(Ok)
}
}
4 changes: 2 additions & 2 deletions futures-util/src/stream/try_stream/try_skip_while.rs
Expand Up @@ -64,7 +64,7 @@ where
let mut this = self.project();

if *this.done_skipping {
return this.stream.try_poll_next(cx);
return this.stream.poll_next(cx);
}

Poll::Ready(loop {
Expand All @@ -77,7 +77,7 @@ where
*this.done_skipping = true;
break item.map(Ok);
}
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)?) {
this.pending_fut.set(Some((this.f)(&item)));
*this.pending_item = Some(item);
} else {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_take_while.rs
Expand Up @@ -82,7 +82,7 @@ where
*this.done_taking = true;
break None;
}
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)?) {
this.pending_fut.set(Some((this.f)(&item)));
*this.pending_item = Some(item);
} else {
Expand Down

0 comments on commit b7f417d

Please sign in to comment.