Skip to content

Commit

Permalink
In-memory engine: clean lock cf tombstone in a background worker (tik…
Browse files Browse the repository at this point in the history
…v#17128)

ref tikv#16141, close tikv#17127

clean lock cf tombstone in a background worker

Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang authored and overvenus committed Jun 17, 2024
1 parent fb87c7a commit 56ae893
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 106 deletions.
7 changes: 5 additions & 2 deletions components/hybrid_engine/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ impl<EK: KvEngine> WriteBatch for HybridEngineWriteBatch<EK> {

fn write_callback_opt(&mut self, opts: &WriteOptions, mut cb: impl FnMut(u64)) -> Result<u64> {
let called = AtomicBool::new(false);
self.disk_write_batch
let res = self
.disk_write_batch
.write_callback_opt(opts, |s| {
if !called.fetch_or(true, Ordering::SeqCst) {
self.cache_write_batch.set_sequence_number(s).unwrap();
Expand All @@ -53,7 +54,9 @@ impl<EK: KvEngine> WriteBatch for HybridEngineWriteBatch<EK> {
.map(|s| {
cb(s);
s
})
});
self.cache_write_batch.maybe_compact_lock_cf();
res
}

fn data_size(&self) -> usize {
Expand Down
95 changes: 95 additions & 0 deletions components/region_cache_memory_engine/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub enum BackgroundTask {
LoadRange,
MemoryCheckAndEvict,
DeleteRange(Vec<CacheRange>),
CleanLockTombstone(u64),
SetRocksEngine(RocksEngine),
}

Expand All @@ -80,6 +81,10 @@ impl Display for BackgroundTask {
BackgroundTask::DeleteRange(ref r) => {
f.debug_struct("DeleteRange").field("range", r).finish()
}
BackgroundTask::CleanLockTombstone(ref r) => f
.debug_struct("CleanLockTombstone")
.field("seqno", r)
.finish(),
BackgroundTask::SetRocksEngine(_) => f.debug_struct("SetDiskEngine").finish(),
}
}
Expand Down Expand Up @@ -489,6 +494,11 @@ pub struct BackgroundRunner {
gc_range_remote: Remote<yatp::task::future::TaskCell>,
gc_range_worker: Worker,

lock_cleanup_remote: Remote<yatp::task::future::TaskCell>,
lock_cleanup_worker: Worker,

// The last sequence number for the lock cf tombstone cleanup
last_seqno: u64,
// RocksEngine is used to get the oldest snapshot sequence number.
rocks_engine: Option<RocksEngine>,
}
Expand All @@ -498,6 +508,7 @@ impl Drop for BackgroundRunner {
self.range_load_worker.stop();
self.delete_range_worker.stop();
self.gc_range_worker.stop();
self.lock_cleanup_worker.stop();
}
}

Expand All @@ -516,6 +527,9 @@ impl BackgroundRunner {
let delete_range_worker = Worker::new("background-delete-range_worker");
let delete_range_remote = delete_range_worker.remote();

let lock_cleanup_worker = Worker::new("lock-cleanup-worker");
let lock_cleanup_remote = lock_cleanup_worker.remote();

let gc_range_worker = Builder::new("background-range-load-worker")
// Gc must also use exactly one thread to handle it.
.thread_count(1)
Expand All @@ -532,6 +546,9 @@ impl BackgroundRunner {
delete_range_remote,
gc_range_worker,
gc_range_remote,
lock_cleanup_remote,
lock_cleanup_worker,
last_seqno: 0,
rocks_engine: None,
}
}
Expand Down Expand Up @@ -707,6 +724,84 @@ impl Runnable for BackgroundRunner {
let f = async move { core.delete_ranges(&ranges) };
self.delete_range_remote.spawn(f);
}
BackgroundTask::CleanLockTombstone(snapshot_seqno) => {
if snapshot_seqno < self.last_seqno {
return;
}
self.last_seqno = snapshot_seqno;
let core = self.core.clone();

let f = async move {
info!(
"begin to cleanup tombstones in lock cf";
"seqno" => snapshot_seqno,
);

let mut last_user_key = vec![];
let mut remove_rest = false;
let mut cached_to_remove: Option<Vec<u8>> = None;

let mut removed = 0;
let mut total = 0;
let now = Instant::now();
let lock_handle = core.engine.read().engine().cf_handle("lock");
let guard = &epoch::pin();
let mut iter = lock_handle.iterator();
iter.seek_to_first(guard);
while iter.valid() {
total += 1;
let InternalKey {
user_key,
v_type,
sequence,
} = decode_key(iter.key().as_bytes());
if user_key != last_user_key {
if let Some(remove) = cached_to_remove.take() {
removed += 1;
lock_handle.remove(&InternalBytes::from_vec(remove), guard);
}
last_user_key = user_key.to_vec();
if sequence >= snapshot_seqno {
remove_rest = false;
} else {
remove_rest = true;
if v_type == ValueType::Deletion {
cached_to_remove = Some(iter.key().as_bytes().to_vec());
}
}
} else if remove_rest {
assert!(sequence < snapshot_seqno);
removed += 1;
lock_handle.remove(iter.key(), guard);
} else if sequence < snapshot_seqno {
remove_rest = true;
if v_type == ValueType::Deletion {
assert!(cached_to_remove.is_none());
cached_to_remove = Some(iter.key().as_bytes().to_vec());
}
}

iter.next(guard);
}
if let Some(remove) = cached_to_remove.take() {
removed += 1;
lock_handle.remove(&InternalBytes::from_vec(remove), guard);
}

info!(
"cleanup tombstones in lock cf";
"seqno" => snapshot_seqno,
"total" => total,
"removed" => removed,
"duration" => ?now.saturating_elapsed(),
"current_count" => lock_handle.len(),
);

fail::fail_point!("clean_lock_tombstone_done");
};

self.lock_cleanup_remote.spawn(f);
}
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion components/region_cache_memory_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
fmt::{self, Debug},
ops::Bound,
result,
sync::Arc,
sync::{atomic::AtomicU64, Arc},
};

use crossbeam::epoch::{self, default_collector, Guard};
Expand Down Expand Up @@ -102,6 +102,14 @@ impl SkiplistHandle {
) -> OwnedIter<Arc<SkipList<InternalBytes, InternalBytes>>, InternalBytes, InternalBytes> {
self.0.owned_iter()
}

pub fn len(&self) -> usize {
self.0.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// A single global set of skiplists shared by all cached ranges
Expand Down Expand Up @@ -249,6 +257,11 @@ pub struct RangeCacheMemoryEngine {
memory_controller: Arc<MemoryController>,
statistics: Arc<Statistics>,
config: Arc<VersionTrack<RangeCacheEngineConfig>>,

// The increment amount of tombstones in the lock cf.
// When reaching to the threshold, a CleanLockTombstone task will be scheduled to clean lock cf
// tombstones.
pub(crate) lock_modification_bytes: Arc<AtomicU64>,
}

impl RangeCacheMemoryEngine {
Expand All @@ -274,6 +287,7 @@ impl RangeCacheMemoryEngine {
memory_controller,
statistics,
config,
lock_modification_bytes: Arc::default(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion components/region_cache_memory_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod statistics;
pub mod test_util;
mod write_batch;

pub use background::{BackgroundRunner, GcTask};
pub use background::{BackgroundRunner, BackgroundTask, GcTask};
pub use engine::{RangeCacheMemoryEngine, SkiplistHandle};
pub use keys::{decode_key, encoding_for_filter, InternalBytes, InternalKey, ValueType};
pub use metrics::flush_range_cache_engine_statistics;
Expand Down
Loading

0 comments on commit 56ae893

Please sign in to comment.