-
-
Notifications
You must be signed in to change notification settings - Fork 62
/
housekeeper.rs
81 lines (69 loc) · 2.3 KB
/
housekeeper.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use crate::common::{
concurrent::{
atomic_time::AtomicInstant,
constants::{
MAX_SYNC_REPEATS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, READ_LOG_FLUSH_POINT,
WRITE_LOG_FLUSH_POINT,
},
},
time::{CheckedTimeOps, Instant},
};
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use async_trait::async_trait;
#[async_trait]
/// Operations the `Housekeeper` needs from the cache it maintains.
///
/// `Housekeeper::try_sync` is generic over this trait: it reads the time via
/// `now` and then awaits `run_pending_tasks`.
pub(crate) trait InnerSync {
/// Runs the implementor's pending maintenance tasks.
///
/// `max_sync_repeats` is supplied by the caller (`Housekeeper::try_sync`
/// passes `MAX_SYNC_REPEATS`); presumably it bounds how many passes a single
/// call may make -- confirm against the implementor.
async fn run_pending_tasks(&self, max_sync_repeats: usize);
/// Returns the current time as seen by the implementor. `Housekeeper::try_sync`
/// uses it as the base for the next sync deadline.
fn now(&self) -> Instant;
}
/// Decides when a cache's pending tasks should be run and makes sure only one
/// such run goes through `try_sync` at a time.
pub(crate) struct Housekeeper {
// Set to `true` by `try_sync` (via `compare_exchange`) while it is running the
// cache's pending tasks, and stored back to `false` once they have finished.
is_sync_running: AtomicBool,
// Deadline produced by `Self::sync_after`: a reference time plus
// `PERIODICAL_SYNC_INITIAL_DELAY_MILLIS`. Refreshed at the start of every
// successful `try_sync` and compared with the caller's `now` in `should_apply`.
sync_after: AtomicInstant,
}
impl Default for Housekeeper {
    /// Creates a housekeeper with no sync in progress and the first sync
    /// deadline set to `PERIODICAL_SYNC_INITIAL_DELAY_MILLIS` after the moment
    /// of construction.
    fn default() -> Self {
        // Compute the initial deadline up front so the struct literal stays flat.
        let first_deadline = Self::sync_after(Instant::now());
        Self {
            is_sync_running: AtomicBool::default(),
            sync_after: AtomicInstant::new(first_deadline),
        }
    }
}
impl Housekeeper {
pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now)
}
pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now)
}
#[inline]
fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now
}
pub(crate) async fn try_sync<T: InnerSync>(&self, cache: &T) -> bool {
// Try to flip the value of sync_scheduled from false to true.
match self.is_sync_running.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let now = cache.now();
self.sync_after.set_instant(Self::sync_after(now));
cache.run_pending_tasks(MAX_SYNC_REPEATS).await;
self.is_sync_running.store(false, Ordering::Release);
true
}
Err(_) => false,
}
}
fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is current wall clock time, this should never fail at
// least next millions of years.
ts.expect("Timestamp overflow")
}
}