Skip to content

Commit

Permalink
Per-entry expiration
Browse files Browse the repository at this point in the history
- Add an argument `last_modified_at` to `Expiry::expire_after_read`.
- Fix FIXME TODOs in the `TimerWheel`.
  • Loading branch information
tatsuya6502 committed Apr 11, 2023
1 parent 2642356 commit dc5fd49
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 35 deletions.
33 changes: 23 additions & 10 deletions src/common/timer_wheel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,10 @@ impl<K> TimerWheel<K> {
/// Returns the bucket indices to locate the bucket that the timer event
/// should be added to.
fn bucket_indices(&self, time: Instant) -> (usize, usize) {
let duration = time
.checked_duration_since(self.current)
// FIXME: unwrap will panic if the time is earlier than self.current.
.unwrap()
.as_nanos() as u64;
let duration_nanos = self.duration_nanos_since_last_advanced(time);
let time_nanos = self.time_nanos(time);
for level in 0..=NUM_LEVELS {
if duration < SPANS[level + 1] {
if duration_nanos < SPANS[level + 1] {
let ticks = time_nanos >> SHIFT[level];
let index = ticks & (BUCKET_COUNTS[level] - 1);
return (level, index as usize);
Expand All @@ -321,11 +317,28 @@ impl<K> TimerWheel<K> {
(OVERFLOW_QUEUE_INDEX, 0)
}

// Nano-seconds since the timer wheel was created.
// Returns nano-seconds between the given `time` and the time when this timer
// wheel was advanced. If the `time` is earlier than other, returns zero.
fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 {
time.checked_duration_since(self.current)
// If `time` is earlier than `self.current`, use zero. This could happen
// when a user provided `Expiry` method returned zero or a very short
// duration.
.unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`.
.as_nanos() as u64
}

// Returns nano-seconds between the given `time` and the time when this timer
// wheel was created. If the `time` is earlier than other, returns zero.
fn time_nanos(&self, time: Instant) -> u64 {
// ENHANCEME: Check overflow? (u128 -> u64)
// FIXME: unwrap will panic if the time is earlier than self.origin.
time.checked_duration_since(self.origin).unwrap().as_nanos() as u64
time.checked_duration_since(self.origin)
// If `time` is earlier than `self.origin`, use zero. This would never
// happen in practice as there should be some delay between the timer
// wheel was created and the first timer event is scheduled. But we will
// do this just in case.
.unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`.
// TODO ENHANCEME: Check overflow? (u128 -> u64)
.as_nanos() as u64
}
}

Expand Down
1 change: 1 addition & 0 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2640,6 +2640,7 @@ mod tests {
_value: &&str,
_current_time: StdInstant,
_current_duration: Option<Duration>,
_last_modified_at: StdInstant,
) -> Option<Duration> {
Some(Duration::from_secs(10))
}
Expand Down
1 change: 1 addition & 0 deletions src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub trait Expiry<K, V> {
value: &V,
current_time: Instant,
current_duration: Option<Duration>,
last_modified_at: Instant,
) -> Option<Duration> {
current_duration
}
Expand Down
58 changes: 33 additions & 25 deletions src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ where
return None;
}

let now = self.current_time_from_expiration_clock();
let mut now = self.current_time_from_expiration_clock();

let maybe_entry = self
.inner
Expand All @@ -324,15 +324,28 @@ where
} else {
// Valid entry.
let maybe_key = if need_key { Some(Arc::clone(k)) } else { None };
Some((maybe_key, TrioArc::clone(entry), now))
Some((maybe_key, TrioArc::clone(entry)))
}
});

if let Some((maybe_key, entry, now)) = maybe_entry {
if let Some((maybe_key, entry)) = maybe_entry {
let mut is_expiry_modified = false;

// Call the user supplied `expire_after_read` method if any.
if let Some(expiry) = &self.inner.expiration_policy.expiry() {
let lm = entry.last_modified().expect("Last modified is not set");
// Check if the `last_modified` of entry is earlier than or equals to
// `now`. If not, update the `now` to `last_modified`. This is needed
// because there is a small chance that other threads have inserted
// the entry _after_ we obtained `now`.
now = now.max(lm);

// Convert `last_modified` from `moka::common::time::Instant` to
// `std::time::Instant`.
let lm = self.inner.clocks().to_std_instant(lm);

is_expiry_modified = Self::expire_after_read_or_update(
|k, v, t, d| expiry.expire_after_read(k, v, t, d),
|k, v, t, d| expiry.expire_after_read(k, v, t, d, lm),
&entry.entry_info().key_hash().key,
&entry,
self.inner.expiration_policy.time_to_live(),
Expand All @@ -341,6 +354,7 @@ where
self.inner.clocks(),
);
}

let v = entry.value.clone();
let op = ReadOp::Hit {
value_entry: entry,
Expand Down Expand Up @@ -952,7 +966,10 @@ pub(crate) struct Inner<K, V, S> {
clocks: Clocks,
}

//
// functions/methods used by BaseCache
//

impl<K, V, S> Inner<K, V, S> {
fn name(&self) -> Option<&str> {
self.name.as_deref()
Expand Down Expand Up @@ -987,6 +1004,14 @@ impl<K, V, S> Inner<K, V, S> {
.unwrap_or_default()
}

fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
where
K: Hash + Eq,
S: BuildHasher,
{
self.key_locks.as_ref().map(|kls| kls.key_lock(key))
}

#[cfg(feature = "unstable-debug-counters")]
pub fn debug_stats(&self) -> CacheDebugStats {
let ec = self.entry_count.load();
Expand Down Expand Up @@ -1016,6 +1041,10 @@ impl<K, V, S> Inner<K, V, S> {
}
}

fn clocks(&self) -> &Clocks {
&self.clocks
}

fn num_cht_segments(&self) -> usize {
self.cache.actual_num_segments()
}
Expand Down Expand Up @@ -1057,27 +1086,6 @@ impl<K, V, S> Inner<K, V, S> {
}
}

// functions/methods used by BaseCache
impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq,
S: BuildHasher,
{
fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>> {
self.key_locks.as_ref().map(|kls| kls.key_lock(key))
}
}

// functions/methods used by BaseCache
impl<K, V, S> Inner<K, V, S> {
fn clocks(&self) -> &Clocks {
&self.clocks
}
// fn to_std_instant(&self, instant: Instant) -> StdInstant {
// self.clocks.to_std_instant(instant)
// }
}

impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
Expand Down

0 comments on commit dc5fd49

Please sign in to comment.