Skip to content

Commit

Permalink
Per-entry expiration
Browse files Browse the repository at this point in the history
Change the iterator created by `TimerWheel::advance` method to iterate over
`TimerEvent`s.
  • Loading branch information
tatsuya6502 committed Apr 3, 2023
1 parent d5bf6eb commit f3db798
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 52 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"Deque",
"Deques",
"deschedule",
"Descheduled",
"devcontainer",
"docsrs",
"Einziger",
Expand Down
1 change: 1 addition & 0 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub(crate) trait AccessTime {
fn set_last_modified(&self, timestamp: Instant);
}

#[derive(Debug)]
pub(crate) struct KeyHash<K> {
pub(crate) key: Arc<K>,
pub(crate) hash: u64,
Expand Down
1 change: 1 addition & 0 deletions src/common/concurrent/atomic_time/atomic_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
sync::atomic::{AtomicU64, Ordering},
};

#[derive(Debug)]
pub(crate) struct AtomicInstant {
instant: AtomicU64,
}
Expand Down
1 change: 1 addition & 0 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use super::{AccessTime, KeyHash};
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};

#[derive(Debug)]
pub(crate) struct EntryInfo<K> {
#[allow(unused)] // TODO: Remove this.
key_hash: KeyHash<K>,
Expand Down
4 changes: 3 additions & 1 deletion src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ impl<T> Deque<T> {
/// This method takes care not to create mutable references to `element`, to
/// maintain validity of aliasing pointers.
///
/// Panics:
/// IMPORTANT: This method does not drop the node. If the node is no longer
/// needed, use `unlink_and_drop` instead, or drop it at the caller side.
/// Otherwise, the node will leak.
pub(crate) unsafe fn unlink(&mut self, mut node: NonNull<DeqNode<T>>) {
if self.is_at_cursor(node.as_ref()) {
self.advance_cursor();
Expand Down
144 changes: 93 additions & 51 deletions src/common/timer_wheel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,29 +134,31 @@ impl<K> TimerWheel<K> {
}
}

/// Reschedules an active timer event for the node.
pub(crate) fn reschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) {
unsafe { self.unlink_timer(node) };

fn schedule_existing_node(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) -> bool {
// Since cache entry's ValueEntry has a pointer to this node, we must reuse
// the node.
let elem = &unsafe { node.as_ref() }.element;
if let Some(t) = elem.entry_info.expiration_time() {
let (level, index) = self.bucket_indices(t);
elem.level.store(level as u8, Ordering::Release);
elem.index.store(index as u8, Ordering::Release);
let node = unsafe { Box::from_raw(node.as_ptr()) };
self.wheels[level][index].push_back(node);
true // Successfully rescheduled.
} else {
// TODO: We must decide what to do when the expiration time is unset. We
// will have to unset the pointer to this node in the ValueEntry and then
// drop the node.
//
// For now, we will let the node to be leaked. Just unset the level and
// index.
// Unset the level and index.
elem.level.store(u8::MAX, Ordering::Release);
elem.index.store(u8::MAX, Ordering::Release);
false // The node no longer has the expiration time. Removed.
}
}

/// Reschedules an active timer event for the node.
pub(crate) fn reschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) -> bool {
unsafe { self.unlink_timer(node) };
self.schedule_existing_node(node)
}

/// Removes a timer event for this node if present.
pub(crate) fn deschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) {
unsafe {
Expand All @@ -178,26 +180,23 @@ impl<K> TimerWheel<K> {
}

/// Advances the timer wheel to the current time, and returns an iterator over
/// expired cache entries.
/// timer events.
pub(crate) fn advance(
&mut self,
current_time: Instant,
) -> impl Iterator<Item = TrioArc<EntryInfo<K>>> + '_ {
) -> impl Iterator<Item = TimerEvent<K>> + '_ {
let previous_time = self.current;
self.current = current_time;
ExpiredEntries::new(self, previous_time, current_time)
TimerEventsIter::new(self, previous_time, current_time)
}

/// Returns a reference to the timer event (cache entry) at the front of the
/// queue.
pub(crate) fn pop_timer(
/// Returns a pointer to the timer event (cache entry) at the front of the queue.
pub(crate) fn pop_timer_node(
&mut self,
level: usize,
index: usize,
) -> Option<TrioArc<EntryInfo<K>>> {
self.wheels[level][index]
.pop_front()
.map(|node| TrioArc::clone(&node.element.entry_info))
) -> Option<Box<DeqNode<TimerNode<K>>>> {
self.wheels[level][index].pop_front()
}

/// Returns the bucket indices to locate the bucket that the timer event
Expand Down Expand Up @@ -227,8 +226,22 @@ impl<K> TimerWheel<K> {
}
}

/// A timer event, which is either an expired/rescheduled cache entry, or a
/// descheduled timer. `TimerWheel::advance` returns an iterator over timer events.
#[derive(Debug)]
pub(crate) enum TimerEvent<K> {
/// This cache entry has expired.
Expired(TrioArc<EntryInfo<K>>),
// This cache entry has been rescheduled. Rescheduling includes moving a timer
// from one wheel to another in a lower level of the hierarchy. (This variant
// is mainly used for testing)
Rescheduled(TrioArc<EntryInfo<K>>),
/// This timer node (containing a cache entry) has been removed from the timer.
Descheduled(Box<DeqNode<TimerNode<K>>>),
}

/// An iterator over expired cache entries.
pub(crate) struct ExpiredEntries<'iter, K> {
pub(crate) struct TimerEventsIter<'iter, K> {
timer_wheel: &'iter mut TimerWheel<K>,
previous_time: Instant,
current_time: Instant,
Expand All @@ -240,7 +253,7 @@ pub(crate) struct ExpiredEntries<'iter, K> {
is_index_set: bool,
}

impl<'iter, K> ExpiredEntries<'iter, K> {
impl<'iter, K> TimerEventsIter<'iter, K> {
fn new(
timer_wheel: &'iter mut TimerWheel<K>,
previous_time: Instant,
Expand All @@ -260,7 +273,7 @@ impl<'iter, K> ExpiredEntries<'iter, K> {
}
}

impl<'iter, K> Drop for ExpiredEntries<'iter, K> {
impl<'iter, K> Drop for TimerEventsIter<'iter, K> {
fn drop(&mut self) {
// If dropped without completely consuming this iterator, reset the timer
// wheel's current time to the previous time.
Expand All @@ -270,8 +283,8 @@ impl<'iter, K> Drop for ExpiredEntries<'iter, K> {
}
}

impl<'iter, K> Iterator for ExpiredEntries<'iter, K> {
type Item = TrioArc<EntryInfo<K>>;
impl<'iter, K> Iterator for TimerEventsIter<'iter, K> {
type Item = TimerEvent<K>;

fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
Expand Down Expand Up @@ -305,15 +318,30 @@ impl<'iter, K> Iterator for ExpiredEntries<'iter, K> {
// Pop the next timer event (cache entry) from the current level and
// index.
let i = self.index & self.index_mask as u8;
match self.timer_wheel.pop_timer(self.level, i as usize) {
Some(entry_info) => {
if let Some(t) = entry_info.expiration_time() {
match self.timer_wheel.pop_timer_node(self.level, i as usize) {
Some(node) => {
let expiration_time = unsafe { node.as_ref() }
.element
.entry_info()
.expiration_time();
if let Some(t) = expiration_time {
if t <= self.current_time {
// The cache entry has expired. Return it.
return Some(entry_info);
let entry_info = unsafe { node.as_ref() }.element.entry_info();
return Some(TimerEvent::Expired(TrioArc::clone(entry_info)));
} else {
// The cache entry has not expired. Reschedule it.
self.timer_wheel.schedule(entry_info);
let node_p = NonNull::new(Box::into_raw(node)).expect("Got a null ptr");
if self.timer_wheel.schedule_existing_node(node_p) {
let entry_info = unsafe { node_p.as_ref() }.element.entry_info();
return Some(TimerEvent::Rescheduled(TrioArc::clone(entry_info)));
} else {
// The timer event has been removed from the timer
// wheel. Return it, so that the caller can remove the
// pointer to the node from a `ValueEntry`.
let node = unsafe { Box::from_raw(node_p.as_ptr()) };
return Some(TimerEvent::Descheduled(node));
}
}
}
}
Expand All @@ -340,7 +368,7 @@ impl<'iter, K> Iterator for ExpiredEntries<'iter, K> {
mod tests {
use std::{f32::MIN, sync::Arc, time::Duration};

use super::{TimerWheel, SPANS};
use super::{TimerEvent, TimerWheel, SPANS};
use crate::common::{
concurrent::{entry_info::EntryInfo, KeyHash},
time::{CheckedTimeOps, Clock, Instant, Mock},
Expand All @@ -349,7 +377,7 @@ mod tests {
use triomphe::Arc as TrioArc;

#[test]
fn bucket_indices() {
fn test_bucket_indices() {
fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) {
let t = now.checked_add(dur).unwrap();
timer.bucket_indices(t)
Expand Down Expand Up @@ -429,7 +457,7 @@ mod tests {
}

#[test]
fn advance() {
fn test_advance() {
fn schedule_timer(timer: &mut TimerWheel<u32>, key: u32, now: Instant, ttl: Duration) {
let hash = key as u64;
let key_hash = KeyHash::new(Arc::new(key), hash);
Expand All @@ -439,8 +467,20 @@ mod tests {
timer.schedule(entry_info);
}

fn key(maybe_entry: Option<TrioArc<EntryInfo<u32>>>) -> u32 {
*maybe_entry.expect("entry is none").key_hash().key
fn expired_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
let entry = maybe_entry.expect("entry is none");
match entry {
TimerEvent::Expired(entry) => *entry.key_hash().key,
_ => panic!("Expected an expired entry. Got {:?}", entry),
}
}

fn rescheduled_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
let entry = maybe_entry.expect("entry is none");
match entry {
TimerEvent::Rescheduled(entry) => *entry.key_hash().key,
_ => panic!("Expected a rescheduled entry. Got {:?}", entry),
}
}

let (clock, mock) = Clock::mock();
Expand All @@ -456,20 +496,20 @@ mod tests {

let now = advance_clock(&clock, &mock, s2d(4));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 2);
assert_eq!(key(expired_entries.next()), 4);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(4));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 1);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(64 - 8));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 3);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);

Expand All @@ -483,20 +523,20 @@ mod tests {

let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 2);
assert_eq!(key(expired_entries.next()), 4);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 1);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d((64 - 8) * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 3);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);

Expand All @@ -510,20 +550,21 @@ mod tests {

let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 2);
assert_eq!(key(expired_entries.next()), 4);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert_eq!(rescheduled_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 1);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d((32 - 8) * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 3);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);

Expand All @@ -538,20 +579,21 @@ mod tests {

let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 2);
assert_eq!(key(expired_entries.next()), 3);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 1);
assert_eq!(expired_key(expired_entries.next()), 1);
assert_eq!(rescheduled_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);

let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(key(expired_entries.next()), 4);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);
}
Expand Down

0 comments on commit f3db798

Please sign in to comment.