*: add lru cache to speed up scanning logs #302

Draft
wants to merge 6 commits into master
1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ required-features = ["failpoints"]

[dependencies]
byteorder = "1.2"
bytes = "1.0"
crc32fast = "1.2"
crossbeam = "0.8"
fail = "0.5"
321 changes: 321 additions & 0 deletions src/cache.rs
@@ -0,0 +1,321 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use std::{collections::HashMap, mem, ptr::NonNull};

use bytes::Bytes;

use crate::pipe_log::FileBlockHandle;

struct CacheEntry {
    key: FileBlockHandle,
    data: Bytes,
}

struct Node {
    prev: NonNull<Node>,
    next: NonNull<Node>,
    entry: CacheEntry,
    #[cfg(test)]
    _leak_check: self::tests::LeakCheck,
}

pub struct LruCache {
    cache: HashMap<FileBlockHandle, NonNull<Node>>,

    cap: usize,
    free: usize,
    head: *mut Node,
    tail: *mut Node,

    #[cfg(test)]
    leak_check: self::tests::LeakCheck,
}

#[inline]
fn need_size(payload: &[u8]) -> usize {
    mem::size_of::<Node>() + payload.len()
}

#[inline]
unsafe fn promote(mut node: NonNull<Node>, head: &mut *mut Node, tail: &mut *mut Node) {
    if node.as_ptr() == *tail {
        return;
    }
    let mut prev = unsafe { node.as_ref().prev };
    let mut next = unsafe { node.as_ref().next };
    if node.as_ptr() == *head {
        *head = next.as_ptr();
        next.as_mut().prev = NonNull::dangling();
    } else {
        prev.as_mut().next = next;
        next.as_mut().prev = prev;
    }
    (**tail).next = node;
    node.as_mut().prev = NonNull::new_unchecked(*tail);
    node.as_mut().next = NonNull::dangling();
    *tail = node.as_ptr();
}

impl LruCache {
    #[inline]
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            cache: HashMap::default(),
            cap,
            free: cap,
            head: std::ptr::null_mut(),
            tail: std::ptr::null_mut(),
            #[cfg(test)]
            leak_check: self::tests::LeakCheck::default(),
        }
    }

    #[inline]
    pub fn insert(&mut self, key: FileBlockHandle, data: Bytes) -> Option<Bytes> {
        match self.cache.get_mut(&key) {
            None => (),
            Some(node) => {
                unsafe {
                    // Technically they should be exactly the same. Use the new
                    // version to avoid potential bugs.
                    assert_eq!(data.len(), node.as_ref().entry.data.len());
                    node.as_mut().entry.data = data.clone();
                    promote(*node, &mut self.head, &mut self.tail);
                    return Some(data);
                }
            }
        }
        let need_size = need_size(&data);
        if need_size > self.cap {
            return Some(data);
        }
        while self.free < need_size && self.remove_head() {}

        let node = Box::new(Node {
            prev: NonNull::dangling(),
            next: NonNull::dangling(),
            entry: CacheEntry { key, data },
            #[cfg(test)]
            _leak_check: self.leak_check.clone(),
        });

        let node = Box::into_raw(node);

        if self.head.is_null() {
            self.head = node;
            self.tail = node;
        } else {
            unsafe {
                (*self.tail).next = NonNull::new_unchecked(node);
                (*node).prev = NonNull::new_unchecked(self.tail);
                self.tail = node;
            }
        }
        self.free -= need_size;
        self.cache
            .insert(key, unsafe { NonNull::new_unchecked(node) });
        None
    }

    #[inline]
    pub fn get(&mut self, key: &FileBlockHandle) -> Option<Bytes> {
        let node = self.cache.get(key)?;
        unsafe {
            promote(*node, &mut self.head, &mut self.tail);
            Some(node.as_ref().entry.data.clone())
        }
    }

    #[inline]
    fn remove_head(&mut self) -> bool {
        if self.head.is_null() {
            return false;
        }
        let mut head = unsafe { Box::from_raw(self.head) };
        if self.head != self.tail {
            self.head = head.next.as_ptr();
            head.prev = NonNull::dangling();
        } else {
            self.head = std::ptr::null_mut();
            self.tail = std::ptr::null_mut();
        }
        self.free += need_size(&head.entry.data);
        self.cache.remove(&head.entry.key);
        true
    }

    #[inline]
    pub fn resize(&mut self, new_cap: usize) {
        while (self.cap - self.free) > new_cap {
            self.remove_head();
        }
        self.free = new_cap - (self.cap - self.free);
        self.cap = new_cap;
    }

    #[inline]
    pub fn clear(&mut self) {
        for (_, node) in self.cache.drain() {
            unsafe {
                drop(Box::from_raw(node.as_ptr()));
            }
        }
        self.head = std::ptr::null_mut();
        self.tail = std::ptr::null_mut();
        self.free = self.cap;
    }
}

impl Drop for LruCache {
    fn drop(&mut self) {
        self.clear();
    }
}

unsafe impl Sync for LruCache {}
unsafe impl Send for LruCache {}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicIsize, Ordering},
        Arc,
    };

    use crate::internals::LogQueue;

    use super::*;

    pub struct LeakCheck {
        cnt: Arc<AtomicIsize>,
    }

    impl LeakCheck {
        pub fn clone(&self) -> Self {
            self.cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            Self {
                cnt: self.cnt.clone(),
            }
        }
    }

    impl Default for LeakCheck {
        fn default() -> Self {
            Self {
                cnt: Arc::new(AtomicIsize::new(1)),
            }
        }
    }

    impl Drop for LeakCheck {
        fn drop(&mut self) {
            self.cnt.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
        }
    }

    #[test]
    fn test_basic_lru() {
        let mut cache = LruCache::with_capacity(1024);
        let lc = cache.leak_check.clone();
        let mut key = FileBlockHandle::dummy(LogQueue::Append);
        for offset in 0..100 {
            key.offset = offset;
            cache.insert(key, vec![offset as u8; 10].into());
        }
        for offset in 0..100 {
            key.offset = offset;
            cache.insert(key, vec![offset as u8; 10].into());
        }
        let entry_len = need_size(&[0; 10]);
        let entries_fit = (1024 / entry_len) as u64;
        assert_eq!(cache.cap, 1024);
        assert_eq!(cache.free, 1024 % entry_len);
        for offset in 0..(100 - entries_fit) {
            key.offset = offset;
            assert_eq!(cache.get(&key).as_deref(), None, "{offset}");
        }
        for offset in (100 - entries_fit)..100 {
            key.offset = offset;
            assert_eq!(
                cache.get(&key).as_deref(),
                Some(&[offset as u8; 10] as &[u8]),
                "{offset}"
            );
        }

        let offset = 100 - entries_fit;
        key.offset = offset;
        // Get promotes the entry, so it will be the last one to be removed.
        assert_eq!(
            cache.get(&key).as_deref(),
            Some(&[offset as u8; 10] as &[u8]),
            "{offset}"
        );
        for i in 1..entries_fit {
            key.offset = 200 + i;
            cache.insert(key, vec![key.offset as u8; 10].into());
            key.offset = offset + i;
            assert_eq!(cache.get(&key).as_deref(), None, "{i}");
        }
        key.offset = offset;
        assert_eq!(
            cache.get(&key).as_deref(),
            Some(&[offset as u8; 10] as &[u8]),
            "{offset}"
        );

        cache.resize(2048);
        assert_eq!(cache.cap, 2048);
        assert_eq!(cache.free, 2048 - (entries_fit as usize * entry_len));
        key.offset = 201;
        assert_eq!(
            cache.get(&key).as_deref(),
            Some(&[key.offset as u8; 10] as &[u8])
        );
        key.offset = offset;
        assert_eq!(
            cache.get(&key).as_deref(),
            Some(&[offset as u8; 10] as &[u8])
        );

        cache.resize(entry_len);
        assert_eq!(cache.cap, entry_len);
        assert_eq!(cache.free, 0);
        key.offset = 201;
        assert_eq!(cache.get(&key).as_deref(), None);
        key.offset = offset;
        assert_eq!(
            cache.get(&key).as_deref(),
            Some(&[offset as u8; 10] as &[u8])
        );

        cache.resize(entry_len - 1);
        assert_eq!(cache.cap, entry_len - 1);
        assert_eq!(cache.free, entry_len - 1);
        key.offset = offset;
        assert_eq!(cache.get(&key).as_deref(), None);

        cache.resize(1024);
        cache.clear();
        for offset in 0..100 {
            key.offset = offset;
            cache.insert(key, vec![offset as u8; 10].into());
        }
        for offset in 0..(100 - entries_fit) {
            key.offset = offset;
            assert_eq!(cache.get(&key).as_deref(), None, "{offset}");
        }
        for offset in (100 - entries_fit)..100 {
            key.offset = offset;
            assert_eq!(
                cache.get(&key).as_deref(),
                Some(&[offset as u8; 10] as &[u8]),
                "{offset}"
            );
        }

        drop(cache);
        // If there is a leak or double free, the count is unlikely to be 1.
        assert_eq!(lc.cnt.load(Ordering::Relaxed), 1);
    }
}
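For context, the cache pairs a HashMap keyed by FileBlockHandle with an intrusive doubly linked list: the head is the least recently used entry, the tail the most recently used, and capacity is accounted in bytes via need_size (node overhead plus payload). A minimal sketch of the intended cache-aside read path follows; it assumes the module is exposed as crate::cache and that read_block is a caller-supplied I/O callback, neither of which is part of this diff.

use bytes::Bytes;

use crate::cache::LruCache;
use crate::pipe_log::FileBlockHandle;

// Illustrative only: consult the cache before issuing a read, and populate it
// after a miss. `read_block` stands in for the real file I/O.
fn cached_read<F>(
    cache: &mut LruCache,
    handle: FileBlockHandle,
    read_block: F,
) -> std::io::Result<Bytes>
where
    F: FnOnce(&FileBlockHandle) -> std::io::Result<Bytes>,
{
    // Hit: `get` hands back a cheap clone of the Bytes and promotes the entry
    // to the most-recently-used position.
    if let Some(bytes) = cache.get(&handle) {
        return Ok(bytes);
    }
    // Miss: read the block and cache it. `insert` returns the data unchanged
    // when it cannot be cached (e.g. larger than the whole capacity).
    let bytes = read_block(&handle)?;
    cache.insert(handle, bytes.clone());
    Ok(bytes)
}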
4 changes: 4 additions & 0 deletions src/config.rs
@@ -115,6 +115,9 @@ pub struct Config {
    ///
    /// Default: None
    pub prefill_limit: Option<ReadableSize>,

    /// Initial cache capacity for entries. Default: 256MB.
    pub cache_capacity: ReadableSize,
}

@@ -136,6 +139,7 @@ impl Default for Config {
            memory_limit: None,
            enable_log_recycle: false,
            prefill_for_recycle: false,
            cache_capacity: ReadableSize::mb(256),
            prefill_limit: None,
        };
        // Test-specific configurations.
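As a usage sketch for the new option (assuming ReadableSize is re-exported at the crate root, as the existing public API suggests; the 512MB value is arbitrary):

use raft_engine::{Config, ReadableSize};

fn tuned_config() -> Config {
    Config {
        // Raise the cache above its 256MB default.
        cache_capacity: ReadableSize::mb(512),
        ..Default::default()
    }
}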