Skip to content

Commit

Permalink
util/timer: introduce steady timer (tikv#4060)
Browse files Browse the repository at this point in the history
Steady timer is used to create futures that is notified after a given
duration. It's called steady because it won't be affected by the time
adjustment. This is important for raftstore as the lease mechanism is
based on a steady time.
  • Loading branch information
BusyJay committed Jan 12, 2019
1 parent dcfa37d commit b5459f1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 14 deletions.
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -85,6 +85,7 @@ futures-cpupool = "0.1"
tokio = "0.1"
tokio-core = "0.1"
tokio-timer = "0.2"
tokio-executor = "0.1"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -106,6 +106,7 @@ extern crate time;
extern crate tipb;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_executor;
extern crate tokio_timer;
#[cfg(test)]
extern crate toml;
Expand Down
2 changes: 1 addition & 1 deletion src/util/time.rs
Expand Up @@ -295,7 +295,7 @@ impl Instant {
}
}

fn elapsed_duration(later: Timespec, earlier: Timespec) -> Duration {
pub fn elapsed_duration(later: Timespec, earlier: Timespec) -> Duration {
if later >= earlier {
(later - earlier).to_std().unwrap()
} else {
Expand Down
118 changes: 115 additions & 3 deletions src/util/timer.rs
Expand Up @@ -13,11 +13,13 @@

use std::cmp::{Ord, Ordering, Reverse};
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use std::thread::Builder;
use std::time::Duration;
use tokio_timer::{self, timer::Handle};
use util::time::Instant;
use time::Timespec;
use tokio_executor::park::ParkThread;
use tokio_timer::{self, clock::Clock, clock::Now, timer::Handle, Delay};
use util::time::{monotonic_raw_now, Instant};

pub struct Timer<T> {
pending: BinaryHeap<Reverse<TimeoutTask<T>>>,
Expand Down Expand Up @@ -107,6 +109,107 @@ fn start_global_timer() -> Handle {
rx.recv().unwrap()
}

/// A struct that marks the *zero* time.
///
/// A *zero* time can be any time, as what it represents is `Instant`,
/// which is Opaque.
struct TimeZero {
/// An arbitrary time used as the zero time.
///
/// Note that `zero` doesn't have to be related to `steady_time_point`, as what's
/// observed here is elapsed time instead of time point.
zero: ::std::time::Instant,
/// A base time point.
///
/// The source of time point should grow steady.
steady_time_point: Timespec,
}

/// A clock that produces time in a steady speed.
///
/// Time produced by the clock is not affected by clock jump or time adjustment.
/// Internally it uses CLOCK_MONOTONIC_RAW to get a steady time source.
///
/// `Instant`s produced by this clock can't be compared or used to calculate elapse
/// unless they are produced using the same zero time.
#[derive(Clone)]
pub struct SteadyClock {
zero: Arc<TimeZero>,
}

lazy_static! {
static ref STEADY_CLOCK: SteadyClock = SteadyClock {
zero: Arc::new(TimeZero {
zero: ::std::time::Instant::now(),
steady_time_point: monotonic_raw_now(),
}),
};
}

impl Default for SteadyClock {
#[inline]
fn default() -> SteadyClock {
STEADY_CLOCK.clone()
}
}

impl Now for SteadyClock {
#[inline]
fn now(&self) -> ::std::time::Instant {
let n = monotonic_raw_now();
let dur = Instant::elapsed_duration(n, self.zero.steady_time_point);
self.zero.zero + dur
}
}

/// A timer that creates steady delays.
///
/// Delay created by this timer will not be affected by time adjustment.
#[derive(Clone)]
pub struct SteadyTimer {
clock: SteadyClock,
handle: Handle,
}

impl SteadyTimer {
/// Creates a delay future that will be notified after the given duration.
pub fn delay(&self, dur: Duration) -> Delay {
self.handle.delay(self.clock.now() + dur)
}
}

lazy_static! {
static ref GLOBAL_STEADY_TIMER: SteadyTimer = start_global_steady_timer();
}

impl Default for SteadyTimer {
#[inline]
fn default() -> SteadyTimer {
GLOBAL_STEADY_TIMER.clone()
}
}

fn start_global_steady_timer() -> SteadyTimer {
let (tx, rx) = mpsc::channel();
let clock = SteadyClock::default();
let clock_ = clock.clone();
Builder::new()
.name(thd_name!("steady-timer"))
.spawn(move || {
let c = Clock::new_with_now(clock_);
let mut timer = tokio_timer::Timer::new_with_now(ParkThread::new(), c);
tx.send(timer.handle()).unwrap();
loop {
timer.turn(None).unwrap();
}
})
.unwrap();
SteadyTimer {
clock,
handle: rx.recv().unwrap(),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -226,4 +329,13 @@ mod tests {
delay.wait().unwrap();
assert!(timer.elapsed() >= Duration::from_millis(100));
}

#[test]
fn test_global_steady_timer() {
let t = SteadyTimer::default();
let timer = t.clock.now();
let delay = t.delay(Duration::from_millis(100));
delay.wait().unwrap();
assert!(timer.elapsed() >= Duration::from_millis(100));
}
}

0 comments on commit b5459f1

Please sign in to comment.