diff --git a/src/stream.rs b/src/stream.rs index 2def61c..9fa7bb6 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -793,27 +793,27 @@ where F: FnMut(T) -> Fut, Fut: Future>, { - enum State { - Paused(T), - Running(Pin>), - } - let mut state = Some(State::Paused(init)); + let mut state = Some(init); + let mut running = Box::pin(None); + crate::stream::poll_fn(move |context| -> Poll> { - let mut future = match state.take() { - Some(State::Running(fut)) => fut, - Some(State::Paused(st)) => Box::pin(f(st)), - None => panic!("this stream must not be polled any more"), - }; - match future.as_mut().poll(context) { - Poll::Pending => { - state = Some(State::Running(future)); - Poll::Pending - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some((item, new_state))) => { - state = Some(State::Paused(new_state)); + if let Some(state) = state.take() { + let fut = f(state); + running.set(Some(fut)); + } + + let fut = Option::as_pin_mut(running.as_mut()).expect("this stream must not be polled any more"); + let step = futures_core::ready!(fut.poll(context)); + + match step { + None => { + Poll::Ready(None) + }, + Some((item, new_state)) => { + state = Some(new_state); + running.set(None); Poll::Ready(Some(item)) - } + }, } }) }