Skip to content

Commit

Permalink
Per-entry expiration
Browse files Browse the repository at this point in the history
- Add a public `Expiry` trait.
- Update `base_cache` module to use it when reading from and writing to a cache.
- Update `future::CacheBuilder` to allow to set an `Expiry` to a `future::Cache`.
  • Loading branch information
tatsuya6502 committed Apr 8, 2023
1 parent 2febcff commit 068083a
Show file tree
Hide file tree
Showing 11 changed files with 406 additions and 113 deletions.
6 changes: 5 additions & 1 deletion src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,12 @@ impl<K, V> AccessTime for TrioArc<ValueEntry<K, V>> {
}

pub(crate) enum ReadOp<K, V> {
Hit {
value_entry: TrioArc<ValueEntry<K, V>>,
timestamp: Instant,
is_expiry_modified: bool,
},
// u64 is the hash of the key.
Hit(u64, TrioArc<ValueEntry<K, V>>, Instant),
Miss(u64),
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/concurrent/atomic_time/atomic_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ impl AtomicInstant {
ai
}

pub(crate) fn clear(&self) {
self.instant.store(u64::MAX, Ordering::Release);
}

pub(crate) fn is_set(&self) -> bool {
self.instant.load(Ordering::Acquire) != u64::MAX
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/concurrent/atomic_time/atomic_time_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl AtomicInstant {
ai
}

pub(crate) fn clear(&self) {
*self.instant.write() = None;
}

pub(crate) fn is_set(&self) -> bool {
self.instant.read().is_some()
}
Expand Down
8 changes: 6 additions & 2 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ impl<K> EntryInfo<K> {
self.expiration_time.instant()
}

pub(crate) fn set_expiration_time(&self, time: Instant) {
self.expiration_time.set_instant(time);
pub(crate) fn set_expiration_time(&self, time: Option<Instant>) {
if let Some(t) = time {
self.expiration_time.set_instant(t);
} else {
self.expiration_time.clear();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/timer_wheel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ mod tests {
let key_hash = KeyHash::new(Arc::new(key), hash);
let policy_weight = 0;
let entry_info = TrioArc::new(EntryInfo::new(key_hash, now, policy_weight));
entry_info.set_expiration_time(now.checked_add(ttl).unwrap());
entry_info.set_expiration_time(Some(now.checked_add(ttl).unwrap()));
timer.schedule(entry_info);
}

Expand Down
40 changes: 22 additions & 18 deletions src/future/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use super::Cache;
use crate::{
common::{builder_utils, concurrent::Weigher},
notification::{self, DeliveryMode, EvictionListener, RemovalCause},
policy::ExpirationPolicy,
Expiry,
};

use std::{
Expand Down Expand Up @@ -60,8 +62,7 @@ pub struct CacheBuilder<K, V, C> {
weigher: Option<Weigher<K, V>>,
eviction_listener: Option<EvictionListener<K, V>>,
eviction_listener_conf: Option<notification::Configuration>,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
cache_type: PhantomData<C>,
}
Expand All @@ -79,8 +80,7 @@ where
weigher: None,
eviction_listener: None,
eviction_listener_conf: None,
time_to_live: None,
time_to_idle: None,
expiration_policy: Default::default(),
invalidator_enabled: false,
cache_type: Default::default(),
}
Expand Down Expand Up @@ -110,7 +110,8 @@ where
/// expiration.
pub fn build(self) -> Cache<K, V, RandomState> {
let build_hasher = RandomState::default();
builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle);
let exp = &self.expiration_policy;
builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle());
Cache::with_everything(
self.name,
self.max_capacity,
Expand All @@ -119,8 +120,7 @@ where
self.weigher,
self.eviction_listener,
self.eviction_listener_conf,
self.time_to_live,
self.time_to_idle,
self.expiration_policy,
self.invalidator_enabled,
builder_utils::housekeeper_conf(true),
)
Expand Down Expand Up @@ -208,7 +208,8 @@ where
where
S: BuildHasher + Clone + Send + Sync + 'static,
{
builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle);
let exp = &self.expiration_policy;
builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle());
Cache::with_everything(
self.name,
self.max_capacity,
Expand All @@ -217,8 +218,7 @@ where
self.weigher,
self.eviction_listener,
self.eviction_listener_conf,
self.time_to_live,
self.time_to_idle,
self.expiration_policy,
self.invalidator_enabled,
builder_utils::housekeeper_conf(true),
)
Expand Down Expand Up @@ -302,10 +302,9 @@ impl<K, V, C> CacheBuilder<K, V, C> {
/// than 1000 years. This is done to protect against overflow when computing key
/// expiration.
pub fn time_to_live(self, duration: Duration) -> Self {
Self {
time_to_live: Some(duration),
..self
}
let mut builder = self;
builder.expiration_policy.set_time_to_live(duration);
builder
}

/// Sets the time to idle of the cache.
Expand All @@ -319,10 +318,15 @@ impl<K, V, C> CacheBuilder<K, V, C> {
/// than 1000 years. This is done to protect against overflow when computing key
/// expiration.
pub fn time_to_idle(self, duration: Duration) -> Self {
Self {
time_to_idle: Some(duration),
..self
}
let mut builder = self;
builder.expiration_policy.set_time_to_idle(duration);
builder
}

pub fn expire_after(self, expiry: impl Expiry<K, V> + Send + Sync + 'static) -> Self {
let mut builder = self;
builder.expiration_policy.set_expiry(Arc::new(expiry));
builder
}

/// Enables support for [Cache::invalidate_entries_if][cache-invalidate-if]
Expand Down
14 changes: 6 additions & 8 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
time::Instant,
},
notification::{self, EvictionListener},
policy::ExpirationPolicy,
sync_base::base_cache::{BaseCache, HouseKeeperArc},
Entry, Policy, PredicateError,
};
Expand Down Expand Up @@ -661,8 +662,7 @@ where
None,
None,
None,
None,
None,
Default::default(),
false,
housekeeper::Configuration::new_thread_pool(true),
)
Expand Down Expand Up @@ -693,8 +693,7 @@ where
weigher: Option<Weigher<K, V>>,
eviction_listener: Option<EvictionListener<K, V>>,
eviction_listener_conf: Option<notification::Configuration>,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
housekeeper_conf: housekeeper::Configuration,
) -> Self {
Expand All @@ -707,8 +706,7 @@ where
weigher,
eviction_listener,
eviction_listener_conf,
time_to_live,
time_to_idle,
expiration_policy,
invalidator_enabled,
housekeeper_conf,
),
Expand Down Expand Up @@ -1220,7 +1218,7 @@ where

let hash = self.base.hash(&key);
let key = Arc::new(key);
let (op, now) = self.base.do_insert_with_hash(key, hash, value, None);
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::blocking_schedule_write_op(
self.base.inner.as_ref(),
Expand Down Expand Up @@ -1733,7 +1731,7 @@ where
return;
}

let (op, now) = self.base.do_insert_with_hash(key, hash, value, None);
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub use common::error::PredicateError;
#[cfg(any(feature = "sync", feature = "future"))]
pub use common::entry::Entry;

pub use policy::Policy;
pub use policy::{Expiry, Policy};

#[cfg(feature = "dash")]
compile_error!(
Expand Down
105 changes: 104 additions & 1 deletion src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::time::Duration;
use std::{
sync::Arc,
time::{Duration, Instant},
};

#[derive(Clone, Debug)]
/// The policy of a cache.
Expand Down Expand Up @@ -54,3 +57,103 @@ impl Policy {
self.time_to_idle
}
}

/// Calculates when cache entries expire. A single expiration time is retained on
/// each entry so that the lifetime of an entry may be extended or reduced by
/// subsequent evaluations.
pub trait Expiry<K, V> {
/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after the entry's creation. Returning `None`
/// indicates no expiration for the entry.
#[allow(unused_variables)]
fn expire_after_create(&self, key: &K, value: &V, current_time: Instant) -> Option<Duration> {
None
}

/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after its last read. Returning `None` indicates no
/// expiration for the entry. Returning `current_duration` will not modify the
/// expiration time.
#[allow(unused_variables)]
fn expire_after_read(
&self,
key: &K,
value: &V,
current_time: Instant,
current_duration: Option<Duration>,
) -> Option<Duration> {
None
}

/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after the replacement of its value. Returning `None`
/// indicates no expiration for the entry. Returning `current_duration` will not
/// modify the expiration time.
#[allow(unused_variables)]
fn expire_after_update(
&self,
key: &K,
value: &V,
current_time: Instant,
current_duration: Option<Duration>,
) -> Option<Duration> {
None
}
}

#[derive(Clone)]
pub(crate) struct ExpirationPolicy<K, V> {
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
expiry: Option<Arc<dyn Expiry<K, V> + Send + Sync + 'static>>,
}

impl<K, V> Default for ExpirationPolicy<K, V> {
fn default() -> Self {
Self {
time_to_live: None,
time_to_idle: None,
expiry: None,
}
}
}

impl<K, V> ExpirationPolicy<K, V> {
pub(crate) fn new(
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
expiry: Option<Arc<dyn Expiry<K, V> + Send + Sync + 'static>>,
) -> Self {
Self {
time_to_live,
time_to_idle,
expiry,
}
}

/// Returns the `time_to_live` of the cache.
pub(crate) fn time_to_live(&self) -> Option<Duration> {
self.time_to_live
}

pub(crate) fn set_time_to_live(&mut self, duration: Duration) {
self.time_to_live = Some(duration);
}

/// Returns the `time_to_idle` of the cache.
pub(crate) fn time_to_idle(&self) -> Option<Duration> {
self.time_to_idle
}

pub(crate) fn set_time_to_idle(&mut self, duration: Duration) {
self.time_to_idle = Some(duration);
}

pub(crate) fn expiry(&self) -> Option<Arc<dyn Expiry<K, V> + Send + Sync + 'static>> {
self.expiry.as_ref().map(Arc::clone)
}

pub(crate) fn set_expiry(&mut self, expiry: Arc<dyn Expiry<K, V> + Send + Sync + 'static>) {
self.expiry = Some(expiry);
}
}
8 changes: 5 additions & 3 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
time::Instant,
},
notification::{self, EvictionListener},
policy::ExpirationPolicy,
sync::{Iter, PredicateId},
sync_base::{
base_cache::{BaseCache, HouseKeeperArc},
Expand Down Expand Up @@ -851,6 +852,8 @@ where
invalidator_enabled: bool,
housekeeper_conf: housekeeper::Configuration,
) -> Self {
// TODO
let expiration_policy = ExpirationPolicy::new(time_to_live, time_to_idle, None);
Self {
base: BaseCache::new(
name,
Expand All @@ -860,8 +863,7 @@ where
weigher,
eviction_listener,
eviction_listener_conf,
time_to_live,
time_to_idle,
expiration_policy,
invalidator_enabled,
housekeeper_conf,
),
Expand Down Expand Up @@ -1606,7 +1608,7 @@ where
return;
}

let (op, now) = self.base.do_insert_with_hash(key, hash, value, None);
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
Expand Down
Loading

0 comments on commit 068083a

Please sign in to comment.