Skip to content

Commit

Permalink
Implement the invalidate_entries_if method
Browse files Browse the repository at this point in the history
- Add invalidate_entries_if to the unsync cache.
  • Loading branch information
tatsuya6502 committed May 5, 2021
1 parent 1c82344 commit 869edb3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/unsync/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct CacheBuilder<C> {
initial_capacity: Option<usize>,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
invalidator_enabled: bool,
cache_type: PhantomData<C>,
}

Expand All @@ -56,6 +57,7 @@ where
initial_capacity: None,
time_to_live: None,
time_to_idle: None,
invalidator_enabled: false,
cache_type: PhantomData::default(),
}
}
Expand All @@ -69,6 +71,7 @@ where
build_hasher,
self.time_to_live,
self.time_to_idle,
self.invalidator_enabled,
)
}

Expand All @@ -83,6 +86,7 @@ where
hasher,
self.time_to_live,
self.time_to_idle,
self.invalidator_enabled,
)
}
}
Expand Down Expand Up @@ -117,6 +121,13 @@ impl<C> CacheBuilder<C> {
..self
}
}

pub fn support_invalidation_closures(self) -> Self {
Self {
invalidator_enabled: true,
..self
}
}
}

#[cfg(test)]
Expand Down
104 changes: 97 additions & 7 deletions src/unsync/cache.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use super::{deques::Deques, KeyDate, KeyHashDate, ValueEntry};
use crate::common::{
deque::{CacheRegion, DeqNode, Deque},
frequency_sketch::FrequencySketch,
AccessTime,
use crate::{
common::{
deque::{CacheRegion, DeqNode, Deque},
frequency_sketch::FrequencySketch,
AccessTime,
},
PredicateRegistrationError,
};

use quanta::{Clock, Instant};
Expand Down Expand Up @@ -117,6 +120,7 @@ pub struct Cache<K, V, S = RandomState> {
frequency_sketch: FrequencySketch,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
invalidator_enabled: bool,
expiration_clock: Option<Clock>,
}

Expand All @@ -132,7 +136,7 @@ where
/// [builder-struct]: ./struct.CacheBuilder.html
pub fn new(max_capacity: usize) -> Self {
let build_hasher = RandomState::default();
Self::with_everything(max_capacity, None, build_hasher, None, None)
Self::with_everything(max_capacity, None, build_hasher, None, None, false)
}
}

Expand All @@ -150,6 +154,7 @@ where
build_hasher: S,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
invalidator_enabled: bool,
) -> Self {
let cache = HashMap::with_capacity_and_hasher(
initial_capacity.unwrap_or_default(),
Expand All @@ -165,6 +170,7 @@ where
frequency_sketch,
time_to_live,
time_to_idle,
invalidator_enabled,
expiration_clock: None,
}
}
Expand Down Expand Up @@ -248,6 +254,18 @@ where
self.deques.clear();
}

pub fn invalidate_entries_if(
&mut self,
predicate: impl FnMut(&K, &V) -> bool,
) -> Result<(), PredicateRegistrationError> {
if self.invalidator_enabled {
self.do_invalidate_entries(predicate);
Ok(())
} else {
Err(PredicateRegistrationError::InvalidationClosuresDisabled)
}
}

/// Returns the `max_capacity` of this cache.
pub fn max_capacity(&self) -> usize {
self.max_capacity
Expand Down Expand Up @@ -353,7 +371,7 @@ where
KeyHashDate::new(Rc::clone(&key), hash, timestamp),
&mut entry,
);
if self.time_to_live.is_some() {
if self.time_to_live.is_some() || self.invalidator_enabled {
deqs.push_back_wo(KeyDate::new(key, timestamp), &mut entry);
}
} else {
Expand Down Expand Up @@ -382,7 +400,7 @@ where
KeyHashDate::new(Rc::clone(&key), hash, timestamp),
&mut entry,
);
if self.time_to_live.is_some() {
if self.time_to_live.is_some() || self.invalidator_enabled {
deqs.push_back_wo(KeyDate::new(key, timestamp), &mut entry);
}
} else {
Expand Down Expand Up @@ -530,6 +548,31 @@ where
}
}
}

// Avoid a false Clippy warning about needless collect to create keys_to_invalidate.
// clippy 0.1.52 (9a1dfd2dc5c 2021-04-30) in Rust 1.52.0-beta.7
#[allow(clippy::needless_collect)]
fn do_invalidate_entries(&mut self, mut predicate: impl FnMut(&K, &V) -> bool) {
let Self { cache, deques, .. } = self;

let keys_to_invalidate = (&mut deques.write_order)
.filter(|kd| {
if let Some((k, entry)) = cache.get_key_value(&kd.key) {
(predicate)(k, &entry.value)
} else {
false
}
})
.map(|kd| Rc::clone(&kd.key))
.collect::<Vec<_>>();

keys_to_invalidate.into_iter().for_each(|k| {
if let Some(mut entry) = cache.remove(&k) {
deques.unlink_ao(&mut entry);
Deques::unlink_wo(&mut deques.write_order, &mut entry);
}
});
}
}

//
Expand Down Expand Up @@ -613,6 +656,53 @@ mod tests {
assert_eq!(cache.get(&"d"), Some(&"david"));
}

#[test]
fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashSet;

let mut cache = CacheBuilder::new(100)
.support_invalidation_closures()
.build();

let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));

cache.insert(0, "alice");
cache.insert(1, "bob");
cache.insert(2, "alex");

mock.increment(Duration::from_secs(5)); // 5 secs from the start.

assert_eq!(cache.get(&0), Some(&"alice"));
assert_eq!(cache.get(&1), Some(&"bob"));
assert_eq!(cache.get(&2), Some(&"alex"));

let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;

mock.increment(Duration::from_secs(5)); // 10 secs from the start.

cache.insert(3, "alice");

assert!(cache.get(&0).is_none());
assert!(cache.get(&2).is_none());
assert_eq!(cache.get(&1), Some(&"bob"));
// This should survive as it was inserted after calling invalidate_entries_if.
assert_eq!(cache.get(&3), Some(&"alice"));
assert_eq!(cache.cache.len(), 2);

mock.increment(Duration::from_secs(5)); // 15 secs from the start.

cache.invalidate_entries_if(|_k, &v| v == "alice")?;
cache.invalidate_entries_if(|_k, &v| v == "bob")?;

assert!(cache.get(&1).is_none());
assert!(cache.get(&3).is_none());
assert_eq!(cache.cache.len(), 0);

Ok(())
}

#[test]
fn time_to_live() {
let mut cache = CacheBuilder::new(100)
Expand Down

0 comments on commit 869edb3

Please sign in to comment.