diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index 3daf537af..618bf1b7b 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -19,7 +19,7 @@ pin_project! { struct OrderWrapper { #[pin] data: T, // A future or a future's output - index: usize, + index: isize, } } @@ -95,8 +95,8 @@ where pub struct FuturesOrdered { in_progress_queue: FuturesUnordered>, queued_outputs: BinaryHeap>, - next_incoming_index: usize, - next_outgoing_index: usize, + next_incoming_index: isize, + next_outgoing_index: isize, } impl Unpin for FuturesOrdered {} @@ -160,13 +160,9 @@ impl FuturesOrdered { /// task notifications. This future will be the next future to be returned /// complete. pub fn push_front(&mut self, future: Fut) { - if self.next_outgoing_index == 0 { - self.push_back(future) - } else { - let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; - self.next_outgoing_index -= 1; - self.in_progress_queue.push(wrapped); - } + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); } } diff --git a/futures/tests/stream_futures_ordered.rs b/futures/tests/stream_futures_ordered.rs index 8b85a3365..5a4a3e22e 100644 --- a/futures/tests/stream_futures_ordered.rs +++ b/futures/tests/stream_futures_ordered.rs @@ -146,3 +146,27 @@ fn queue_never_unblocked() { assert!(stream.poll_next_unpin(cx).is_pending()); assert!(stream.poll_next_unpin(cx).is_pending()); } + +#[test] +fn test_push_front_negative() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_front(a_rx); + stream.push_front(b_rx); + stream.push_front(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // These should all be recieved in reverse order + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); +}