diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 4fb6b8783f4..572fdefb0da 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -45,8 +45,7 @@ impl Driver { let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); - let (time_driver, time_handle) = - create_time_driver(cfg.enable_time, io_stack, clock.clone()); + let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock); Ok(( Self { inner: time_driver }, @@ -111,10 +110,8 @@ impl Handle { .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") } - cfg_test_util! { - pub(crate) fn clock(&self) -> &Clock { - &self.clock - } + pub(crate) fn clock(&self) -> &Clock { + &self.clock } } } @@ -289,7 +286,7 @@ cfg_time! { fn create_time_driver( enable: bool, io_stack: IoStack, - clock: Clock, + clock: &Clock, ) -> (TimeDriver, TimeHandle) { if enable { let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); @@ -337,7 +334,7 @@ cfg_not_time! { fn create_time_driver( _enable: bool, io_stack: IoStack, - _clock: Clock, + _clock: &Clock, ) -> (TimeDriver, TimeHandle) { (io_stack, ()) } diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f0d613a3bb4..69f93823551 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -579,6 +579,11 @@ impl TimerEntry { pub(crate) fn driver(&self) -> &super::Handle { self.driver.driver().time() } + + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn clock(&self) -> &super::Clock { + self.driver.driver().clock() + } } impl TimerHandle { diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index f81cab8cc35..215714dd576 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -125,7 +125,7 @@ impl Driver { /// thread and `time_source` to get the current time and convert to ticks. /// /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) { + pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) { let time_source = TimeSource::new(clock); let handle = Handle { @@ -186,7 +186,7 @@ impl Driver { match next_wake { Some(when) => { - let now = handle.time_source.now(); + let now = handle.time_source.now(rt_handle.clock()); // Note that we effectively round up to 1ms here - this avoids // very short-duration microsecond-resolution sleeps that the OS // might treat as zero-length. @@ -214,13 +214,13 @@ impl Driver { } // Process pending timers after waking up - handle.process(); + handle.process(rt_handle.clock()); } cfg_test_util! { fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { let handle = rt_handle.time(); - let clock = &handle.time_source.clock; + let clock = rt_handle.clock(); if clock.can_auto_advance() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); @@ -231,7 +231,9 @@ impl Driver { // advance the clock. if !handle.did_wake() { // Simulate advancing time - clock.advance(duration); + if let Err(msg) = clock.advance(duration) { + panic!("{}", msg); + } } } else { self.park.park_timeout(rt_handle, duration); @@ -248,8 +250,8 @@ impl Driver { impl Handle { /// Runs timer related logic, and returns the next wakeup time - pub(self) fn process(&self) { - let now = self.time_source().now(); + pub(self) fn process(&self, clock: &Clock) { + let now = self.time_source().now(clock); self.process_at_time(now) } diff --git a/tokio/src/runtime/time/source.rs b/tokio/src/runtime/time/source.rs index e6788edcaf8..39483b5c0ad 100644 --- a/tokio/src/runtime/time/source.rs +++ b/tokio/src/runtime/time/source.rs @@ -5,15 +5,13 @@ use std::convert::TryInto; /// A structure which handles conversion from Instants to u64 timestamps. #[derive(Debug)] pub(crate) struct TimeSource { - pub(crate) clock: Clock, start_time: Instant, } impl TimeSource { - pub(crate) fn new(clock: Clock) -> Self { + pub(crate) fn new(clock: &Clock) -> Self { Self { start_time: clock.now(), - clock, } } @@ -36,7 +34,7 @@ impl TimeSource { Duration::from_millis(t) } - pub(crate) fn now(&self) -> u64 { - self.instant_to_tick(self.clock.now()) + pub(crate) fn now(&self, clock: &Clock) -> u64 { + self.instant_to_tick(clock.now()) } } diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 88c7d768d46..2468a1ae67b 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -62,12 +62,13 @@ fn single_timer() { thread::yield_now(); - let handle = handle.inner.driver().time(); + let time = handle.inner.driver().time(); + let clock = handle.inner.driver().clock(); // This may or may not return Some (depending on how it races with the // thread). If it does return None, however, the timer should complete // synchronously. - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + time.process_at_time(time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -97,10 +98,11 @@ fn drop_timer() { thread::yield_now(); - let handle = handle.inner.driver().time(); + let time = handle.inner.driver().time(); + let clock = handle.inner.driver().clock(); // advance 2s in the future. - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + time.process_at_time(time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -132,10 +134,11 @@ fn change_waker() { thread::yield_now(); - let handle = handle.inner.driver().time(); + let time = handle.inner.driver().time(); + let clock = handle.inner.driver().clock(); // advance 2s - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + time.process_at_time(time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index cd11a67527f..1e273554ec7 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -29,30 +29,40 @@ cfg_not_test_util! { cfg_test_util! { use crate::time::{Duration, Instant}; - use crate::loom::sync::{Arc, Mutex}; + use crate::loom::sync::Mutex; cfg_rt! { - fn clock() -> Option { + #[track_caller] + fn with_clock(f: impl FnOnce(Option<&Clock>) -> Result) -> R { use crate::runtime::Handle; - match Handle::try_current() { - Ok(handle) => Some(handle.inner.driver().clock().clone()), - Err(ref e) if e.is_missing_context() => None, + let res = match Handle::try_current() { + Ok(handle) => f(Some(handle.inner.driver().clock())), + Err(ref e) if e.is_missing_context() => f(None), Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + }; + + match res { + Ok(ret) => ret, + Err(msg) => panic!("{}", msg), } } } cfg_not_rt! { - fn clock() -> Option { - None + #[track_caller] + fn with_clock(f: impl FnOnce(Option<&Clock>) -> Result) -> R { + match f(None) { + Ok(ret) => ret, + Err(msg) => panic!("{}", msg), + } } } /// A handle to a source of time. - #[derive(Debug, Clone)] + #[derive(Debug)] pub(crate) struct Clock { - inner: Arc>, + inner: Mutex, } #[derive(Debug)] @@ -107,8 +117,12 @@ cfg_test_util! { /// [`advance`]: crate::time::advance #[track_caller] pub fn pause() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.pause(); + with_clock(|maybe_clock| { + match maybe_clock { + Some(clock) => clock.pause(), + None => Err("time cannot be frozen from outside the Tokio runtime"), + } + }) } /// Resumes time. @@ -122,14 +136,21 @@ cfg_test_util! { /// runtime. #[track_caller] pub fn resume() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut inner = clock.inner.lock(); + with_clock(|maybe_clock| { + let clock = match maybe_clock { + Some(clock) => clock, + None => return Err("time cannot be frozen from outside the Tokio runtime"), + }; - if inner.unfrozen.is_some() { - panic!("time is not frozen"); - } + let mut inner = clock.inner.lock(); - inner.unfrozen = Some(std::time::Instant::now()); + if inner.unfrozen.is_some() { + return Err("time is not frozen"); + } + + inner.unfrozen = Some(std::time::Instant::now()); + Ok(()) + }) } /// Advances time. @@ -164,19 +185,27 @@ cfg_test_util! { /// /// [`sleep`]: fn@crate::time::sleep pub async fn advance(duration: Duration) { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.advance(duration); + with_clock(|maybe_clock| { + let clock = match maybe_clock { + Some(clock) => clock, + None => return Err("time cannot be frozen from outside the Tokio runtime"), + }; + + clock.advance(duration) + }); crate::task::yield_now().await; } /// Returns the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { - if let Some(clock) = clock() { - clock.now() - } else { - Instant::from_std(std::time::Instant::now()) - } + with_clock(|maybe_clock| { + Ok(if let Some(clock) = maybe_clock { + clock.now() + } else { + Instant::from_std(std::time::Instant::now()) + }) + }) } impl Clock { @@ -186,34 +215,40 @@ cfg_test_util! { let now = std::time::Instant::now(); let clock = Clock { - inner: Arc::new(Mutex::new(Inner { + inner: Mutex::new(Inner { enable_pausing, base: now, unfrozen: Some(now), auto_advance_inhibit_count: 0, - })), + }), }; if start_paused { - clock.pause(); + if let Err(msg) = clock.pause() { + panic!("{}", msg); + } } clock } - #[track_caller] - pub(crate) fn pause(&self) { + pub(crate) fn pause(&self) -> Result<(), &'static str> { let mut inner = self.inner.lock(); if !inner.enable_pausing { drop(inner); // avoid poisoning the lock - panic!("`time::pause()` requires the `current_thread` Tokio runtime. \ + return Err("`time::pause()` requires the `current_thread` Tokio runtime. \ This is the default Runtime used by `#[tokio::test]."); } - let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed(); + let elapsed = match inner.unfrozen.as_ref() { + Some(v) => v.elapsed(), + None => return Err("time is already frozen") + }; inner.base += elapsed; inner.unfrozen = None; + + Ok(()) } /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). @@ -232,15 +267,15 @@ cfg_test_util! { inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0 } - #[track_caller] - pub(crate) fn advance(&self, duration: Duration) { + pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> { let mut inner = self.inner.lock(); if inner.unfrozen.is_some() { - panic!("time is not frozen"); + return Err("time is not frozen"); } inner.base += duration; + Ok(()) } pub(crate) fn now(&self) -> Instant { diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 0a012e25015..ee46a186c01 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -261,10 +261,11 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { + let clock = handle.driver().clock(); let handle = &handle.driver().time(); let time_source = handle.time_source(); let deadline_tick = time_source.deadline_to_tick(deadline); - let duration = deadline_tick.saturating_sub(time_source.now()); + let duration = deadline_tick.saturating_sub(time_source.now(clock)); let location = location.expect("should have location if tracing"); let resource_span = tracing::trace_span!( @@ -370,8 +371,9 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { + let clock = me.entry.clock(); let time_source = me.entry.driver().time_source(); - let now = time_source.now(); + let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) };