diff --git a/src/common/deque.rs b/src/common/deque.rs
index 281224c1..f55f0391 100644
--- a/src/common/deque.rs
+++ b/src/common/deque.rs
@@ -654,6 +654,51 @@ mod tests {
         assert!((&mut deque).next().is_none());
     }
 
+    #[test]
+    fn next_node() {
+        let mut deque: Deque<String> = Deque::new(MainProbation);
+
+        let node1 = DeqNode::new(MainProbation, "a".into());
+        deque.push_back(Box::new(node1));
+        let node2 = DeqNode::new(MainProbation, "b".into());
+        let node2_ptr = deque.push_back(Box::new(node2));
+        let node3 = DeqNode::new(MainProbation, "c".into());
+        let node3_ptr = deque.push_back(Box::new(node3));
+
+        // -------------------------------------------------------
+        // First iteration.
+        // peek_front() -> node1
+        let node1a = deque.peek_front().unwrap();
+        assert_eq!(node1a.element, "a".to_string());
+        let node2a = node1a.next_node().unwrap();
+        assert_eq!(node2a.element, "b".to_string());
+        let node3a = node2a.next_node().unwrap();
+        assert_eq!(node3a.element, "c".to_string());
+        assert!(node3a.next_node().is_none());
+
+        // -------------------------------------------------------
+        // Iterate after a move_to_back.
+        // Move "b" to the back. So now "a" -> "c" -> "b".
+        unsafe { deque.move_to_back(node2_ptr) };
+        let node1a = deque.peek_front().unwrap();
+        assert_eq!(node1a.element, "a".to_string());
+        let node3a = node1a.next_node().unwrap();
+        assert_eq!(node3a.element, "c".to_string());
+        let node2a = node3a.next_node().unwrap();
+        assert_eq!(node2a.element, "b".to_string());
+        assert!(node2a.next_node().is_none());
+
+        // -------------------------------------------------------
+        // Iterate after an unlink.
+        // Unlink the second node "c". Now "a" -> "b".
+        unsafe { deque.unlink(node3_ptr) };
+        let node1a = deque.peek_front().unwrap();
+        assert_eq!(node1a.element, "a".to_string());
+        let node2a = node1a.next_node().unwrap();
+        assert_eq!(node2a.element, "b".to_string());
+        assert!(node2a.next_node().is_none());
+    }
+
     #[test]
     fn drop() {
         use std::{cell::RefCell, rc::Rc};
diff --git a/src/future/builder.rs b/src/future/builder.rs
index 20653175..644db156 100644
--- a/src/future/builder.rs
+++ b/src/future/builder.rs
@@ -1,3 +1,5 @@
+use crate::common::Weighter;
+
 use super::Cache;
 
 use std::{
@@ -36,16 +38,17 @@ use std::{
 /// // after 30 minutes (TTL) from the insert().
 /// ```
 ///
-pub struct CacheBuilder<C> {
+pub struct CacheBuilder<K, V, C> {
     max_capacity: Option<usize>,
     initial_capacity: Option<usize>,
+    weighter: Option<Weighter<K, V>>,
     time_to_live: Option<Duration>,
     time_to_idle: Option<Duration>,
     invalidator_enabled: bool,
     cache_type: PhantomData<C>,
 }
 
-impl<K, V> CacheBuilder<Cache<K, V, RandomState>>
+impl<K, V> CacheBuilder<K, V, Cache<K, V, RandomState>>
 where
     K: Eq + Hash + Send + Sync + 'static,
     V: Clone + Send + Sync + 'static,
@@ -54,6 +57,7 @@ where
         Self {
             max_capacity: None,
             initial_capacity: None,
+            weighter: None,
             time_to_live: None,
             time_to_idle: None,
             invalidator_enabled: false,
@@ -77,6 +81,7 @@ where
             self.max_capacity,
             self.initial_capacity,
             build_hasher,
+            self.weighter,
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
@@ -92,6 +97,7 @@ where
             self.max_capacity,
             self.initial_capacity,
             hasher,
+            self.weighter,
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
@@ -99,7 +105,15 @@ where
         )
     }
 }
-impl<C> CacheBuilder<C> {
+impl<K, V, C> CacheBuilder<K, V, C> {
+    /// Sets the max capacity of the cache.
+    pub fn max_capacity(self, max_capacity: usize) -> Self {
+        Self {
+            max_capacity: Some(max_capacity),
+            ..self
+        }
+    }
+
     /// Sets the initial capacity of the cache.
     pub fn initial_capacity(self, capacity: usize) -> Self {
         Self {
@@ -108,6 +122,14 @@ impl<C> CacheBuilder<C> {
         }
     }
 
+    /// Sets the weighter closure of the cache.
+    pub fn weighter(self, weighter: Weighter<K, V>) -> Self {
+        Self {
+            weighter: Some(weighter),
+            ..self
+        }
+    }
+
     /// Sets the time to live of the cache.
     ///
     /// A cached entry will be expired after the specified duration past from
diff --git a/src/future/cache.rs b/src/future/cache.rs
index 494afa7c..e5d08609 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -3,6 +3,7 @@ use super::{
     CacheBuilder, ConcurrentCacheExt,
 };
 use crate::{
+    common::Weighter,
     sync::{
         base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
         housekeeper::InnerSync,
@@ -223,10 +224,18 @@ where
     /// [builder-struct]: ./struct.CacheBuilder.html
     pub fn new(max_capacity: usize) -> Self {
         let build_hasher = RandomState::default();
-        Self::with_everything(Some(max_capacity), None, build_hasher, None, None, false)
+        Self::with_everything(
+            Some(max_capacity),
+            None,
+            build_hasher,
+            None,
+            None,
+            None,
+            false,
+        )
     }
 
-    pub fn builder() -> CacheBuilder<Cache<K, V, RandomState>> {
+    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
         CacheBuilder::unbound()
     }
 }
@@ -241,6 +250,7 @@ where
         max_capacity: Option<usize>,
         initial_capacity: Option<usize>,
         build_hasher: S,
+        weighter: Option<Weighter<K, V>>,
         time_to_live: Option<Duration>,
         time_to_idle: Option<Duration>,
         invalidator_enabled: bool,
@@ -250,6 +260,7 @@ where
             max_capacity,
             initial_capacity,
             build_hasher.clone(),
+            weighter,
             time_to_live,
             time_to_idle,
             invalidator_enabled,
diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs
index 9d854f6c..64b6a34e 100644
--- a/src/sync/base_cache.rs
+++ b/src/sync/base_cache.rs
@@ -86,6 +86,7 @@ where
         max_capacity: Option<usize>,
         initial_capacity: Option<usize>,
         build_hasher: S,
+        weighter: Option<Weighter<K, V>>,
         time_to_live: Option<Duration>,
         time_to_idle: Option<Duration>,
         invalidator_enabled: bool,
@@ -96,6 +97,7 @@ where
             max_capacity,
             initial_capacity,
             build_hasher,
+            weighter,
             r_rcv,
             w_rcv,
             time_to_live,
@@ -454,6 +456,7 @@ where
         max_capacity: Option<usize>,
         initial_capacity: Option<usize>,
         build_hasher: S,
+        weighter: Option<Weighter<K, V>>,
         read_op_ch: Receiver<ReadOp<K, V>>,
         write_op_ch: Receiver<WriteOp<K, V>>,
         time_to_live: Option<Duration>,
@@ -484,7 +487,7 @@ where
             time_to_live,
             time_to_idle,
             valid_after: AtomicU64::new(0),
-            weighter: None,
+            weighter,
             invalidator_enabled,
             // When enabled, this field will be set later via the set_invalidator method.
             invalidator: RwLock::new(None),
diff --git a/src/sync/builder.rs b/src/sync/builder.rs
index 79e709c9..82901794 100644
--- a/src/sync/builder.rs
+++ b/src/sync/builder.rs
@@ -1,3 +1,5 @@
+use crate::common::Weighter;
+
 use super::{Cache, SegmentedCache};
 
 use std::{
@@ -38,28 +40,28 @@ use std::{
 /// // after 30 minutes (TTL) from the insert().
 /// ```
 ///
-pub struct CacheBuilder<C> {
+pub struct CacheBuilder<K, V, C> {
     max_capacity: Option<usize>,
     initial_capacity: Option<usize>,
     num_segments: Option<usize>,
+    weighter: Option<Weighter<K, V>>,
     time_to_live: Option<Duration>,
     time_to_idle: Option<Duration>,
     invalidator_enabled: bool,
     cache_type: PhantomData<C>,
 }
 
-impl<K, V> CacheBuilder<Cache<K, V, RandomState>>
+impl<K, V> CacheBuilder<K, V, Cache<K, V, RandomState>>
 where
     K: Eq + Hash + Send + Sync + 'static,
     V: Clone + Send + Sync + 'static,
 {
-    /// Construct a new `CacheBuilder` that will be used to build a `Cache` or
-    /// `SegmentedCache` holding up to `max_capacity` entries.
-    pub fn new(max_capacity: usize) -> Self {
+    pub(crate) fn unbound() -> Self {
         Self {
-            max_capacity: Some(max_capacity),
+            max_capacity: None,
             initial_capacity: None,
             num_segments: None,
+            weighter: None,
             time_to_live: None,
             time_to_idle: None,
             invalidator_enabled: false,
@@ -67,18 +69,31 @@ where
         }
     }
 
+    /// Construct a new `CacheBuilder` that will be used to build a `Cache` or
+    /// `SegmentedCache` holding up to `max_capacity` entries.
+    pub fn new(max_capacity: usize) -> Self {
+        Self {
+            max_capacity: Some(max_capacity),
+            ..Self::unbound()
+        }
+    }
+
     /// Sets the number of segments of the cache.
     ///
     /// # Panics
     ///
     /// Panics if `num_segments` is less than or equals to 1.
-    pub fn segments(self, num_segments: usize) -> CacheBuilder<SegmentedCache<K, V, RandomState>> {
+    pub fn segments(
+        self,
+        num_segments: usize,
+    ) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
         assert!(num_segments > 1);
 
         CacheBuilder {
             max_capacity: self.max_capacity,
             initial_capacity: self.initial_capacity,
             num_segments: Some(num_segments),
+            weighter: None,
             time_to_live: self.time_to_live,
             time_to_idle: self.time_to_idle,
             invalidator_enabled: self.invalidator_enabled,
@@ -96,6 +111,7 @@ where
             self.max_capacity,
             self.initial_capacity,
             build_hasher,
+            self.weighter,
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
@@ -114,6 +130,7 @@ where
             self.max_capacity,
             self.initial_capacity,
             hasher,
+            self.weighter,
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
@@ -121,7 +138,7 @@ where
     }
 }
 
-impl<K, V> CacheBuilder<SegmentedCache<K, V, RandomState>>
+impl<K, V> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>>
 where
     K: Eq + Hash + Send + Sync + 'static,
     V: Clone + Send + Sync + 'static,
@@ -163,7 +180,15 @@ where
     }
 }
 
-impl<C> CacheBuilder<C> {
+impl<K, V, C> CacheBuilder<K, V, C> {
+    /// Sets the max capacity of the cache.
+    pub fn max_capacity(self, max_capacity: usize) -> Self {
+        Self {
+            max_capacity: Some(max_capacity),
+            ..self
+        }
+    }
+
     /// Sets the initial capacity of the cache.
     pub fn initial_capacity(self, capacity: usize) -> Self {
         Self {
@@ -172,6 +197,14 @@ impl<C> CacheBuilder<C> {
         }
     }
 
+    /// Sets the weighter closure of the cache.
+    pub fn weighter(self, weighter: Weighter<K, V>) -> Self {
+        Self {
+            weighter: Some(weighter),
+            ..self
+        }
+    }
+
     /// Sets the time to live of the cache.
     ///
     /// A cached entry will be expired after the specified duration past from
diff --git a/src/sync/cache.rs b/src/sync/cache.rs
index d37a6553..1fa80f98 100644
--- a/src/sync/cache.rs
+++ b/src/sync/cache.rs
@@ -2,9 +2,9 @@ use super::{
     base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
     housekeeper::InnerSync,
     value_initializer::ValueInitializer,
-    ConcurrentCacheExt, PredicateId, WriteOp,
+    CacheBuilder, ConcurrentCacheExt, PredicateId, WriteOp,
 };
-use crate::{sync::value_initializer::InitResult, PredicateError};
+use crate::{common::Weighter, sync::value_initializer::InitResult, PredicateError};
 use crossbeam_channel::{Sender, TrySendError};
 
 use std::{
@@ -195,7 +195,19 @@ where
     /// [builder-struct]: ./struct.CacheBuilder.html
     pub fn new(max_capacity: usize) -> Self {
         let build_hasher = RandomState::default();
-        Self::with_everything(Some(max_capacity), None, build_hasher, None, None, false)
+        Self::with_everything(
+            Some(max_capacity),
+            None,
+            build_hasher,
+            None,
+            None,
+            None,
+            false,
+        )
+    }
+
+    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
+        CacheBuilder::unbound()
     }
 }
 
@@ -209,6 +221,7 @@ where
         max_capacity: Option<usize>,
         initial_capacity: Option<usize>,
         build_hasher: S,
+        weighter: Option<Weighter<K, V>>,
         time_to_live: Option<Duration>,
         time_to_idle: Option<Duration>,
         invalidator_enabled: bool,
@@ -218,6 +231,7 @@ where
             max_capacity,
             initial_capacity,
             build_hasher.clone(),
+            weighter,
             time_to_live,
             time_to_idle,
             invalidator_enabled,
@@ -574,6 +588,69 @@ mod tests {
         assert_eq!(cache.get(&"b"), None);
     }
 
+    #[test]
+    fn size_aware_admission() {
+        let weighter = |_k: &&str, v: &(&str, u64)| v.1;
+
+        let alice = ("alice", 10u64);
+        let bob = ("bob", 15);
+        let cindy = ("cindy", 5);
+        let david = ("david", 15);
+        let dennis = ("dennis", 15);
+
+        let mut cache = Cache::builder()
+            .max_capacity(31)
+            .weighter(Box::new(weighter))
+            .build();
+        cache.reconfigure_for_testing();
+
+        // Make the cache exterior immutable.
+        let cache = cache;
+
+        cache.insert("a", alice);
+        cache.insert("b", bob);
+        assert_eq!(cache.get(&"a"), Some(alice));
+        assert_eq!(cache.get(&"b"), Some(bob));
+        cache.sync();
+        // order (LRU -> MRU) and counts: a -> 1, b -> 1
+
+        cache.insert("c", cindy);
+        assert_eq!(cache.get(&"c"), Some(cindy));
+        // order and counts: a -> 1, b -> 1, c -> 1
+        cache.sync();
+
+        assert_eq!(cache.get(&"a"), Some(alice));
+        assert_eq!(cache.get(&"b"), Some(bob));
+        cache.sync();
+        // order and counts: c -> 1, a -> 2, b -> 2
+
+        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
+        // "d" must have a higher count than 3, which is the aggregated count of "a" and "c".
+        cache.insert("d", david); // count: d -> 0
+        cache.sync();
+        assert_eq!(cache.get(&"d"), None); // d -> 1
+
+        cache.insert("d", david);
+        cache.sync();
+        assert_eq!(cache.get(&"d"), None); // d -> 2
+
+        cache.insert("d", david);
+        cache.sync();
+        assert_eq!(cache.get(&"d"), None); // d -> 3
+
+        cache.insert("d", david);
+        cache.sync();
+        assert_eq!(cache.get(&"d"), None); // d -> 4
+
+        // Finally "d" should be admitted by evicting "c" and "a".
+        cache.insert("d", dennis);
+        cache.sync();
+        assert_eq!(cache.get(&"a"), None);
+        assert_eq!(cache.get(&"b"), Some(bob));
+        assert_eq!(cache.get(&"c"), None);
+        assert_eq!(cache.get(&"d"), Some(dennis));
+    }
+
     #[test]
     fn basic_multi_threads() {
         let num_threads = 4;
diff --git a/src/sync/segment.rs b/src/sync/segment.rs
index 2068fe73..1e50ba31 100644
--- a/src/sync/segment.rs
+++ b/src/sync/segment.rs
@@ -382,6 +382,7 @@ where
             seg_max_capacity,
             seg_init_capacity,
             build_hasher.clone(),
+            None, // TODO
             time_to_live,
             time_to_idle,
             invalidator_enabled,
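
For reviewers who want to try the new builder API end to end, here is a minimal usage sketch assembled from the `size_aware_admission` test above. It assumes this is the moka crate (as the module layout suggests), that `Weighter` is the boxed `Fn(&K, &V) -> u64` closure implied by the `Box::new(weighter)` call, and that `max_capacity` acts as a total weight budget once a weighter is installed; none of these are documented elsewhere in this diff.

```rust
// Usage sketch only; names and semantics are inferred from the test above,
// not from documentation in this diff.
use moka::sync::Cache;

fn main() {
    // The weight of each entry is carried in the second tuple field,
    // mirroring the size_aware_admission test.
    let weighter = |_key: &&str, value: &(&str, u64)| value.1;

    // Assumption: with a weighter set, max_capacity(50) is a budget of
    // total weight 50, not a count of 50 entries (the test uses 31 against
    // entry weights of 10/15/5/15).
    let cache = Cache::builder()
        .max_capacity(50)
        .weighter(Box::new(weighter))
        .build();

    cache.insert("a", ("alice", 10));
    cache.insert("b", ("bob", 15));

    // Reads work immediately after insert, as in the test above.
    assert_eq!(cache.get(&"a"), Some(("alice", 10)));
    assert_eq!(cache.get(&"b"), Some(("bob", 15)));
}
```

Note that the segmented cache still passes `None` for the weighter (see the `// TODO` in src/sync/segment.rs), so this sketch only covers the non-segmented `sync::Cache`.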