From b5459f1b86660799a55a119e9d6d45ea4f7f4561 Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 12 Jan 2019 14:10:55 +0800 Subject: [PATCH] util/timer: introduce steady timer (#4060) Steady timer is used to create futures that is notified after a given duration. It's called steady because it won't be affected by the time adjustment. This is important for raftstore as the lease mechanism is based on a steady time. --- Cargo.lock | 21 +++++---- Cargo.toml | 1 + src/lib.rs | 1 + src/util/time.rs | 2 +- src/util/timer.rs | 118 ++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 129 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b814b93921a..82def21fb01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -798,7 +798,7 @@ dependencies = [ "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1413,7 +1413,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1625,7 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "scoped-tls" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1916,7 +1916,7 @@ dependencies = [ "slog-global 0.1.0 (git+https://github.com/breeswish/slog-global.git?rev=91904ade)", "tempdir 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "tikv 3.0.0-alpha", - "tokio 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2028,8 +2028,9 @@ dependencies = [ "test_util 0.0.1", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "tipb 0.0.1 (git+https://github.com/pingcap/tipb.git)", - "tokio 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2060,7 +2061,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2100,8 +2101,8 @@ dependencies = [ "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", - "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2611,7 +2612,7 @@ dependencies = [ "checksum rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a54aa04a10c68c1c4eacb4337fd883b435997ede17a9385784b990777686b09a" "checksum ryu 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "16aa12da69951804cddf5f74d96abcc414a31b064e610dc81e37c1536082f491" "checksum safemem 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dca453248a96cb0749e36ccdfe2b0b4e54a61bfef89fb97ec621eb8e0a93dd9" -"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" +"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" "checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" @@ -2650,7 +2651,7 @@ dependencies = [ "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" "checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520" "checksum tipb 0.0.1 (git+https://github.com/pingcap/tipb.git)" = "" -"checksum tokio 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "a7817d4c98cc5be21360b3b37d6036fe9b7aefa5b7a201b7b16ff33423822f7d" +"checksum tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4790d0be6f4ba6ae4f48190efa2ed7780c9e3567796abdb285003cf39840d9c5" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" "checksum tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "331c8acc267855ec06eb0c94618dcbbfea45bed2d20b77252940095273fb58f6" diff --git a/Cargo.toml b/Cargo.toml index e4ff7e83dbb..f64bc459b7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ futures-cpupool = "0.1" tokio = "0.1" tokio-core = "0.1" tokio-timer = "0.2" +tokio-executor = "0.1" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" diff --git a/src/lib.rs b/src/lib.rs index af524577782..f2c7bf5d045 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,6 +106,7 @@ extern crate time; extern crate tipb; extern crate tokio; extern crate tokio_core; +extern crate tokio_executor; extern crate tokio_timer; #[cfg(test)] extern crate toml; diff --git a/src/util/time.rs b/src/util/time.rs index 16f990d20d0..2bc2c62ba35 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -295,7 +295,7 @@ impl Instant { } } - fn elapsed_duration(later: Timespec, earlier: Timespec) -> Duration { + pub fn elapsed_duration(later: Timespec, earlier: Timespec) -> Duration { if later >= earlier { (later - earlier).to_std().unwrap() } else { diff --git a/src/util/timer.rs b/src/util/timer.rs index 82c4e1e479d..3b12a5ba55a 100644 --- a/src/util/timer.rs +++ b/src/util/timer.rs @@ -13,11 +13,13 @@ use std::cmp::{Ord, Ordering, Reverse}; use std::collections::BinaryHeap; -use std::sync::mpsc; +use std::sync::{mpsc, Arc}; use std::thread::Builder; use std::time::Duration; -use tokio_timer::{self, timer::Handle}; -use util::time::Instant; +use time::Timespec; +use tokio_executor::park::ParkThread; +use tokio_timer::{self, clock::Clock, clock::Now, timer::Handle, Delay}; +use util::time::{monotonic_raw_now, Instant}; pub struct Timer { pending: BinaryHeap>>, @@ -107,6 +109,107 @@ fn start_global_timer() -> Handle { rx.recv().unwrap() } +/// A struct that marks the *zero* time. +/// +/// A *zero* time can be any time, as what it represents is `Instant`, +/// which is Opaque. +struct TimeZero { + /// An arbitrary time used as the zero time. + /// + /// Note that `zero` doesn't have to be related to `steady_time_point`, as what's + /// observed here is elapsed time instead of time point. + zero: ::std::time::Instant, + /// A base time point. + /// + /// The source of time point should grow steady. + steady_time_point: Timespec, +} + +/// A clock that produces time in a steady speed. +/// +/// Time produced by the clock is not affected by clock jump or time adjustment. +/// Internally it uses CLOCK_MONOTONIC_RAW to get a steady time source. +/// +/// `Instant`s produced by this clock can't be compared or used to calculate elapse +/// unless they are produced using the same zero time. +#[derive(Clone)] +pub struct SteadyClock { + zero: Arc, +} + +lazy_static! { + static ref STEADY_CLOCK: SteadyClock = SteadyClock { + zero: Arc::new(TimeZero { + zero: ::std::time::Instant::now(), + steady_time_point: monotonic_raw_now(), + }), + }; +} + +impl Default for SteadyClock { + #[inline] + fn default() -> SteadyClock { + STEADY_CLOCK.clone() + } +} + +impl Now for SteadyClock { + #[inline] + fn now(&self) -> ::std::time::Instant { + let n = monotonic_raw_now(); + let dur = Instant::elapsed_duration(n, self.zero.steady_time_point); + self.zero.zero + dur + } +} + +/// A timer that creates steady delays. +/// +/// Delay created by this timer will not be affected by time adjustment. +#[derive(Clone)] +pub struct SteadyTimer { + clock: SteadyClock, + handle: Handle, +} + +impl SteadyTimer { + /// Creates a delay future that will be notified after the given duration. + pub fn delay(&self, dur: Duration) -> Delay { + self.handle.delay(self.clock.now() + dur) + } +} + +lazy_static! { + static ref GLOBAL_STEADY_TIMER: SteadyTimer = start_global_steady_timer(); +} + +impl Default for SteadyTimer { + #[inline] + fn default() -> SteadyTimer { + GLOBAL_STEADY_TIMER.clone() + } +} + +fn start_global_steady_timer() -> SteadyTimer { + let (tx, rx) = mpsc::channel(); + let clock = SteadyClock::default(); + let clock_ = clock.clone(); + Builder::new() + .name(thd_name!("steady-timer")) + .spawn(move || { + let c = Clock::new_with_now(clock_); + let mut timer = tokio_timer::Timer::new_with_now(ParkThread::new(), c); + tx.send(timer.handle()).unwrap(); + loop { + timer.turn(None).unwrap(); + } + }) + .unwrap(); + SteadyTimer { + clock, + handle: rx.recv().unwrap(), + } +} + #[cfg(test)] mod tests { use super::*; @@ -226,4 +329,13 @@ mod tests { delay.wait().unwrap(); assert!(timer.elapsed() >= Duration::from_millis(100)); } + + #[test] + fn test_global_steady_timer() { + let t = SteadyTimer::default(); + let timer = t.clock.now(); + let delay = t.delay(Duration::from_millis(100)); + delay.wait().unwrap(); + assert!(timer.elapsed() >= Duration::from_millis(100)); + } }