Skip to content

Commit

Permalink
Remove invalidator_enabled field from sync::base_cache::Inner
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya6502 committed Sep 17, 2023
1 parent 6dce691 commit 42149b0
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 34 deletions.
39 changes: 25 additions & 14 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// TODO: Maybe we can just call ScanningGet::scanning_get.
self.inner
.get_key_value_and(key, hash, |k, entry| {
let i = &self.inner;
Expand Down Expand Up @@ -495,6 +496,10 @@ where

let ts = self.current_time_from_expiration_clock();

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
// conflicted with other concurrent hash table operations. In that case, it
Expand Down Expand Up @@ -1089,6 +1094,14 @@ impl<K, V, S> Inner<K, V, S> {
)
}

fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
where
K: Hash + Eq,
S: BuildHasher,
{
self.key_locks.as_ref().map(|kls| kls.key_lock(key))
}

#[inline]
fn current_time_from_expiration_clock(&self) -> Instant {
if self.clocks.has_expiration_clock.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -1150,20 +1163,6 @@ impl<K, V, S> Inner<K, V, S> {
}
}

impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq,
S: BuildHasher,
{
fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
where
K: Hash + Eq,
S: BuildHasher,
{
self.key_locks.as_ref().map(|kls| kls.key_lock(key))
}
}

impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
Expand All @@ -1185,6 +1184,8 @@ where
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
) -> Self {
// TODO: Calculate the number of segments based on the max capacity and
// the number of CPUs.
let (num_segments, initial_capacity) = if max_capacity == Some(0) {
(1, 0)
} else {
Expand Down Expand Up @@ -1367,6 +1368,9 @@ where
}
}

// TODO: Calculate the batch size based on the number of entries in the cache (or an
// estimated number of entries to evict)

#[cfg(feature = "unstable-debug-counters")]
mod batch_size {
pub(crate) const EVICTION_BATCH_SIZE: usize = 10_000;
Expand Down Expand Up @@ -1452,6 +1456,8 @@ where
.await;
}

// TODO: When run_pending_tasks was called explicitly, do not stop evicting
// at the batch size.
if self.has_expiry() || self.has_valid_after() {
self.evict_expired_entries_using_deqs(
&mut deqs,
Expand All @@ -1462,6 +1468,8 @@ where
.await;
}

// TODO: When run_pending_tasks was called explicitly, do not stop
// invalidating at the batch size.
if let Some(invalidator) = &self.invalidator {
if !invalidator.is_empty() {
self.invalidate_entries(
Expand All @@ -1475,6 +1483,9 @@ where
}
}

// TODO: When run_pending_tasks was called explicitly, do not stop evicting
// at the batch size.

// Evict if this cache has more entries than its capacity.
let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
if weights_to_evict > 0 {
Expand Down
1 change: 1 addition & 0 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,7 @@ where
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
// TODO: Like future::Cache, move this method to BaseCache.
#[inline]
fn schedule_write_op(
inner: &impl InnerSync,
Expand Down
53 changes: 33 additions & 20 deletions src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// TODO: Maybe we can just call ScanningGet::scanning_get.
self.inner
.get_key_value_and(key, hash, |k, entry| {
let i = &self.inner;
Expand Down Expand Up @@ -492,6 +493,10 @@ where

let ts = self.current_time_from_expiration_clock();

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
// conflicted with other concurrent hash table operations. In that case, it
Expand Down Expand Up @@ -900,7 +905,6 @@ pub(crate) struct Inner<K, V, S> {
weigher: Option<Weigher<K, V>>,
removal_notifier: Option<RemovalNotifier<K, V>>,
key_locks: Option<KeyLockMap<K, S>>,
invalidator_enabled: bool,
invalidator: Option<Invalidator<K, V, S>>,
clocks: Clocks,
}
Expand Down Expand Up @@ -984,7 +988,7 @@ impl<K, V, S> Inner<K, V, S> {

#[inline]
fn is_write_order_queue_enabled(&self) -> bool {
self.expiration_policy.time_to_live().is_some() || self.invalidator_enabled
self.expiration_policy.time_to_live().is_some() || self.invalidator.is_some()
}

#[inline]
Expand Down Expand Up @@ -1024,6 +1028,8 @@ where
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
) -> Self {
// TODO: Calculate the number of segments based on the max capacity and the
// number of CPUs.
let (num_segments, initial_capacity) = if max_capacity == Some(0) {
(1, 0)
} else {
Expand Down Expand Up @@ -1077,7 +1083,6 @@ where
weigher,
removal_notifier,
key_locks,
invalidator_enabled,
invalidator,
clocks,
}
Expand Down Expand Up @@ -1155,10 +1160,8 @@ where
where
V: Clone,
{
if self.invalidator_enabled {
if let Some(inv) = &self.invalidator {
return inv.apply_predicates(key, entry);
}
if let Some(inv) = &self.invalidator {
return inv.apply_predicates(key, entry);
}
false
}
Expand Down Expand Up @@ -1202,6 +1205,9 @@ where
}
}

// TODO: Calculate the batch size based on the number of entries in the cache (or an
// estimated number of entries to evict)

#[cfg(feature = "unstable-debug-counters")]
mod batch_size {
pub(crate) const EVICTION_BATCH_SIZE: usize = 10_000;
Expand All @@ -1214,12 +1220,6 @@ mod batch_size {
pub(crate) const INVALIDATION_BATCH_SIZE: usize = 500;
}

// TODO: Divide this method into smaller methods so that unit tests can do more
// precise testing.
// - sync_reads
// - sync_writes
// - evict
// - invalidate_entries
impl<K, V, S> InnerSync for Inner<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
Expand All @@ -1246,8 +1246,10 @@ where
return;
}

// Acquire some locks.
let mut deqs = self.deques.lock();
let mut timer_wheel = self.timer_wheel.lock();

let mut calls = 0;
let current_ec = self.entry_count.load();
let current_ws = self.weighted_size.load();
Expand Down Expand Up @@ -1284,6 +1286,8 @@ where
);
}

// TODO: When run_pending_tasks was called explicitly, do not stop evicting
// at the batch size.
if self.has_expiry() || self.has_valid_after() {
self.evict_expired_entries_using_deqs(
&mut deqs,
Expand All @@ -1293,6 +1297,8 @@ where
);
}

// TODO: When run_pending_tasks was called explicitly, do not stop
// invalidating at the batch size.
if let Some(invalidator) = &self.invalidator {
if !invalidator.is_empty() {
self.invalidate_entries(
Expand All @@ -1305,6 +1311,9 @@ where
}
}

// TODO: When run_pending_tasks was called explicitly, do not stop evicting
// at the batch size.

// Evict if this cache has more entries than its capacity.
let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
if weights_to_evict > 0 {
Expand All @@ -1324,6 +1333,9 @@ where
.store(eviction_state.counters.weighted_size);

crossbeam_epoch::pin().flush();

// Ensure the deqs lock is held until here.
drop(deqs);
}
}

Expand Down Expand Up @@ -1841,14 +1853,14 @@ where
deqs: &mut Deques<K>,
timer_wheel: &mut TimerWheel<K>,
batch_size: usize,
eviction_state: &mut EvictionState<'_, K, V>,
state: &mut EvictionState<'_, K, V>,
) where
V: Clone,
{
let now = self.current_time_from_expiration_clock();

if self.is_write_order_queue_enabled() {
self.remove_expired_wo(deqs, timer_wheel, batch_size, now, eviction_state);
self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state);
}

if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() {
Expand All @@ -1860,7 +1872,7 @@ where
);

let mut rm_expired_ao = |name, deq| {
self.remove_expired_ao(name, deq, wo, timer_wheel, batch_size, now, eviction_state)
self.remove_expired_ao(name, deq, wo, timer_wheel, batch_size, now, state)
};

rm_expired_ao("window", window);
Expand Down Expand Up @@ -2258,10 +2270,11 @@ where
S: BuildHasher + Clone,
{
fn invalidation_predicate_count(&self) -> usize {
self.invalidator
.as_ref()
.map(|inv| inv.predicate_count())
.unwrap_or(0)
if let Some(inv) = &self.invalidator {
inv.predicate_count()
} else {
0
}
}

fn set_expiration_clock(&self, clock: Option<Clock>) {
Expand Down

0 comments on commit 42149b0

Please sign in to comment.