Skip to content

Commit

Permalink
util: evict until fit when inserting (#14847) (#14850)
Browse files Browse the repository at this point in the history
close #14815

LRU cache now will try to make its size strictly less than the capacity.

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: hillium <yujuncen@pingcap.com>
Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 12, 2023
1 parent 68a4845 commit 2e7c907
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
32 changes: 32 additions & 0 deletions components/cdc/src/old_value.rs
Expand Up @@ -615,4 +615,36 @@ mod tests {
let perf_delta = perf_instant.delta();
assert_eq!(perf_delta.block_read_count, 1);
}

#[test]
fn test_old_value_capacity_not_exceed_quota() {
let mut cache = OldValueCache::new(ReadableSize(1000));
fn short_val() -> OldValue {
OldValue::Value {
value: b"s".to_vec(),
}
}
fn long_val() -> OldValue {
OldValue::Value {
value: vec![b'l'; 1024],
}
}
fn enc(i: i32) -> Key {
Key::from_encoded(i32::to_ne_bytes(i).to_vec())
}

for i in 0..100 {
cache.insert(enc(i), (short_val(), None));
}
for i in 100..200 {
// access the previous key for making it not be evicted
cache.cache.get(&enc(i - 1));
cache.insert(enc(i), (long_val(), None));
}
assert!(
cache.cache.size() <= 1000,
"but it is {}",
cache.cache.size()
);
}
}
47 changes: 45 additions & 2 deletions components/tikv_util/src/lru.rs
Expand Up @@ -245,7 +245,6 @@ where
let current_size = SizePolicy::<K, V>::current(&self.size_policy);
match self.map.entry(key) {
HashMapEntry::Occupied(mut e) => {
// TODO: evict entries if size exceeds capacity.
self.size_policy.on_remove(e.key(), &e.get().value);
self.size_policy.on_insert(e.key(), &value);
let mut entry = e.get_mut();
Expand All @@ -254,7 +253,6 @@ where
}
HashMapEntry::Vacant(v) => {
let record = if self.capacity <= current_size {
// TODO: evict not only one entry to fit capacity.
let res = self.trace.reuse_tail(v.key().clone());
old_key = Some(res.0);
res.1
Expand All @@ -270,6 +268,28 @@ where
let entry = self.map.remove(&o).unwrap();
self.size_policy.on_remove(&o, &entry.value);
}

// NOTE: now when inserting a value larger than the capacity, actually this
// implementation will clean the whole cache.
// Perhaps we can reject entries larger than capacity goes in the LRU cache, but
// that is impossible for now: the `SizePolicy` trait doesn't provide the
// interface of querying the actual size of an item.
self.evict_until_fit()
}

fn evict_until_fit(&mut self) {
let cap = self.capacity;
loop {
let current_size = self.size_policy.current();
// Should we keep at least one entry? So our users won't lose their fresh record
// once it exceeds the capacity.
if current_size <= cap || self.map.is_empty() {
break;
}
let key = self.trace.remove_tail();
let val = self.map.remove(&key).unwrap();
self.size_policy.on_remove(&key, &val.value);
}
}

#[inline]
Expand Down Expand Up @@ -583,4 +603,27 @@ mod tests {
assert_eq!(map.get(&i), Some(&vec![b' ']));
}
}

#[test]
fn test_oversized() {
let mut cache = LruCache::with_capacity_sample_and_trace(42, 0, TestTracker(0));
cache.insert(
42,
b"this is the answer... but will it being inserted?".to_vec(),
);
assert!(cache.size() <= 42);
cache.insert(42, b"Aha, perhaps an shorter answer.".to_vec());
assert!(cache.size() <= 42);
cache.insert(43, b"Yet a new challenger.".to_vec());
assert!(cache.size() <= 42);

for i in 0..100 {
cache.insert(i, vec![i as _]);
assert!(cache.size() <= 42);
}
for i in 90..200 {
cache.insert(i, vec![i as _; 8]);
assert!(cache.size() <= 42);
}
}
}

0 comments on commit 2e7c907

Please sign in to comment.