diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f86d9ed97f0..6aea2b15cb3 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -527,9 +527,9 @@ impl TimerEntry { unsafe { self.driver().clear_entry(NonNull::from(self.inner())) }; } - pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) { + pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time; - unsafe { self.as_mut().get_unchecked_mut() }.registered = true; + unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister; let tick = self.driver().time_source().deadline_to_tick(new_time); @@ -537,9 +537,11 @@ impl TimerEntry { return; } - unsafe { - self.driver() - .reregister(&self.driver.driver().io, tick, self.inner().into()); + if reregister { + unsafe { + self.driver() + .reregister(&self.driver.driver().io, tick, self.inner().into()); + } } } @@ -553,7 +555,7 @@ impl TimerEntry { if !self.registered { let deadline = self.deadline; - self.as_mut().reset(deadline); + self.as_mut().reset(deadline, true); } let this = unsafe { self.get_unchecked_mut() }; diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 2468a1ae67b..155d99a348f 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -164,7 +164,7 @@ fn reset_future() { .as_mut() .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - entry.as_mut().reset(start + Duration::from_secs(2)); + entry.as_mut().reset(start + Duration::from_secs(2), true); // shouldn't complete before 2s block_on(futures::future::poll_fn(|cx| { diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index d933164daad..cc17c5a2627 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -482,7 +482,10 @@ impl Interval { timeout + self.period }; - self.delay.as_mut().reset(next); + // When we arrive here, the internal delay returned `Poll::Ready`. + // Reset the delay but do not register it. It should be registered with + // the next call to [`poll_tick`]. + self.delay.as_mut().reset_without_reregister(next); // Return the time when we were scheduled to tick Poll::Ready(timeout) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 370a98b9902..ccf53ef8ca2 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -353,9 +353,22 @@ impl Sleep { self.reset_inner(deadline) } + /// Resets the `Sleep` instance to a new deadline without reregistering it + /// to be woken up. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state and + /// without having it registered. This is required in e.g. the + /// [crate::time::Interval] where we want to reset the internal [Sleep] + /// without having it wake up the last task that polled it. + pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset(deadline, false); + } + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { let mut me = self.project(); - me.entry.as_mut().reset(deadline); + me.entry.as_mut().reset(deadline, true); #[cfg(all(tokio_unstable, feature = "tracing"))] { diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 186582e2e52..42285364a1a 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -1,10 +1,12 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, Duration, Instant, MissedTickBehavior}; -use tokio_test::{assert_pending, assert_ready_eq, task}; +use std::pin::Pin; +use std::task::{Context, Poll}; -use std::task::Poll; +use futures::{Stream, StreamExt}; +use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior}; +use tokio_test::{assert_pending, assert_ready_eq, task}; // Takes the `Interval` task, `start` variable, and optional time deltas // For each time delta, it polls the `Interval` and asserts that the result is @@ -209,3 +211,113 @@ fn poll_next(interval: &mut task::Spawn) -> Poll { fn ms(n: u64) -> Duration { Duration::from_millis(n) } + +/// Helper struct to test the [tokio::time::Interval::poll_tick()] method. +/// +/// `poll_tick()` should register the waker in the context only if it returns +/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an +/// interval timer and counts up on every tick when used as stream. When the +/// counter is a multiple of four, it yields the current counter value. +/// Depending on the value for `wake_on_pending`, it will reschedule itself when +/// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`, +/// we expect that the stream stalls because the timer will **not** reschedule +/// the next wake-up itself once it returned `Poll::Ready`. +struct IntervalStreamer { + counter: u32, + timer: Interval, + wake_on_pending: bool, +} + +impl Stream for IntervalStreamer { + type Item = u32; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + if this.counter > 12 { + return Poll::Ready(None); + } + + match this.timer.poll_tick(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => { + this.counter += 1; + if this.counter % 4 == 0 { + Poll::Ready(Some(this.counter)) + } else { + if this.wake_on_pending { + // Schedule this task for wake-up + cx.waker().wake_by_ref(); + } + Poll::Pending + } + } + } + } +} + +#[tokio::test(start_paused = true)] +async fn stream_with_interval_poll_tick_self_waking() { + let stream = IntervalStreamer { + counter: 0, + timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), + wake_on_pending: true, + }; + + let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); + + // Wrap task in timeout so that it will finish eventually even if the stream + // stalls. + tokio::spawn(tokio::time::timeout( + tokio::time::Duration::from_millis(150), + async move { + tokio::pin!(stream); + + while let Some(item) = stream.next().await { + res_tx.send(item).await.ok(); + } + }, + )); + + let mut items = Vec::with_capacity(3); + while let Some(result) = res_rx.recv().await { + items.push(result); + } + + // We expect the stream to yield normally and thus three items. + assert_eq!(items, vec![4, 8, 12]); +} + +#[tokio::test(start_paused = true)] +async fn stream_with_interval_poll_tick_no_waking() { + let stream = IntervalStreamer { + counter: 0, + timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), + wake_on_pending: false, + }; + + let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); + + // Wrap task in timeout so that it will finish eventually even if the stream + // stalls. + tokio::spawn(tokio::time::timeout( + tokio::time::Duration::from_millis(150), + async move { + tokio::pin!(stream); + + while let Some(item) = stream.next().await { + res_tx.send(item).await.ok(); + } + }, + )); + + let mut items = Vec::with_capacity(0); + while let Some(result) = res_rx.recv().await { + items.push(result); + } + + // We expect the stream to stall because it does not reschedule itself on + // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the + // task when returning `Poll::Ready`. + assert_eq!(items, vec![]); +}