diff --git a/src/server/mod.rs b/src/server/mod.rs
index 76d193890880..f3e78049e572 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -19,6 +19,7 @@ pub mod snap;
 pub mod status_server;
 pub mod transport;
 pub mod ttl;
+mod truncate;
 
 pub use self::config::{Config, ServerConfigManager, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
 pub use self::errors::{Error, Result};
diff --git a/src/server/truncate.rs b/src/server/truncate.rs
new file mode 100644
index 000000000000..b367f0006d89
--- /dev/null
+++ b/src/server/truncate.rs
@@ -0,0 +1,263 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use engine_rocks::{RocksEngine, RocksEngineIterator, RocksWriteBatch};
+use engine_traits::{
+    IterOptions, Iterable, Iterator, Mutable, SeekKey, WriteBatch, WriteBatchExt, CF_DEFAULT,
+    CF_WRITE,
+};
+use txn_types::{Key, TimeStamp, Write, WriteRef};
+
+use super::Result;
+
+/// Number of write-CF entries scanned (and potentially deleted) per write batch.
+const BATCH_SIZE: usize = 256;
+
+/// Progress of a truncate run, shared between the worker thread and callers.
+#[derive(Debug, Clone)]
+pub struct TruncateState {
+    // todo: estimated_total: usize,
+    // todo: delete_count: usize,
+    scan_count: usize,
+    done: bool,
+}
+
+pub struct TruncateWorker {
+    ts: TimeStamp,
+    write_iter: RocksEngineIterator,
+    state: Arc<Mutex<TruncateState>>,
+}
+
+#[allow(dead_code)]
+impl TruncateWorker {
+    pub fn new(
+        mut write_iter: RocksEngineIterator,
+        ts: TimeStamp,
+        state: Arc<Mutex<TruncateState>>,
+    ) -> Self {
+        state
+            .lock()
+            .expect("failed to lock `state` in `TruncateWorker::new`")
+            .done = false;
+        write_iter.seek(SeekKey::Start).unwrap();
+        Self {
+            write_iter,
+            ts,
+            state,
+        }
+    }
+
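+    /// Returns the next (key, write) pair from the write CF and advances the
+    /// iterator, bumping `scan_count`; yields `Ok(None)` once the scan is done.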
+    fn next_write(&mut self) -> Result<Option<(Vec<u8>, Write)>> {
+        if self.write_iter.valid().unwrap() {
+            self.state
+                .lock()
+                .expect("failed to lock `TruncateWorker::state`")
+                .scan_count += 1;
+            let write = box_try!(WriteRef::parse(self.write_iter.value())).to_owned();
+            let key = self.write_iter.key().to_vec();
+            self.write_iter.next().unwrap();
+            return Ok(Some((key, write)));
+        }
+        Ok(None)
+    }
+
+    /// Scans up to `batch_size` writes, keeping those whose `commit_ts` is
+    /// greater than `self.ts`; the returned flag tells whether entries remain.
+    fn scan_next_batch(&mut self, batch_size: usize) -> Result<(Vec<(Vec<u8>, Write)>, bool)> {
+        let mut writes = Vec::with_capacity(batch_size);
+        let mut has_more = true;
+        for _ in 0..batch_size {
+            if let Some((key, write)) = self.next_write()? {
+                let commit_ts = box_try!(Key::decode_ts_from(keys::origin_key(&key)));
+                if commit_ts > self.ts {
+                    writes.push((key, write));
+                }
+            } else {
+                has_more = false;
+                break;
+            }
+        }
+        Ok((writes, has_more))
+    }
+
+    pub fn process_next_batch(
+        &mut self,
+        batch_size: usize,
+        wb: &mut RocksWriteBatch,
+    ) -> Result<bool> {
+        let (writes, has_more) = self.scan_next_batch(batch_size)?;
+        for (key, write) in writes {
+            // The matching default CF key is the user key with the write's start_ts.
+            let default_key =
+                box_try!(Key::from_encoded_slice(&key).truncate_ts()).append_ts(write.start_ts);
+            box_try!(wb.delete_cf(CF_WRITE, &key));
+            box_try!(wb.delete_cf(CF_DEFAULT, default_key.as_encoded()));
+        }
+        box_try!(wb.write());
+        if !has_more {
+            self.state
+                .lock()
+                .expect("failed to lock `TruncateWorker::state` in `process_next_batch`")
+                .done = true;
+        }
+        wb.clear();
+        Ok(has_more)
+    }
+}
+
+pub struct TruncateManager {
+    state: Arc<Mutex<TruncateState>>,
+    engine: RocksEngine,
+    worker_handle: Option<JoinHandle<()>>,
+}
+
+#[allow(dead_code)]
+impl TruncateManager {
+    pub fn new(engine: RocksEngine) -> Self {
+        let state = Arc::new(Mutex::new(TruncateState {
+            scan_count: 0,
+            done: false,
+        }));
+        TruncateManager {
+            state,
+            engine,
+            worker_handle: None,
+        }
+    }
+
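+    /// Spawns a background thread that scans the write CF in batches of
+    /// `BATCH_SIZE`, deleting every version committed after `ts` from both the
+    /// write and default CFs; `state.done` is set once the scan completes.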
+    pub fn start(&mut self, ts: TimeStamp) {
+        let readopts = IterOptions::new(None, None, false);
+        let write_iter = self.engine.iterator_cf_opt(CF_WRITE, readopts).unwrap();
+        let mut worker = TruncateWorker::new(write_iter, ts, self.state.clone());
+        let mut wb = self.engine.write_batch();
+        let props = tikv_util::thread_group::current_properties();
+        let handle = std::thread::Builder::new()
+            .name("truncate".to_string())
+            .spawn(move || {
+                tikv_util::thread_group::set_properties(props);
+                tikv_alloc::add_thread_memory_accessor();
+
+                while worker
+                    .process_next_batch(BATCH_SIZE, &mut wb)
+                    .expect("truncate failed")
+                {}
+
+                tikv_alloc::remove_thread_memory_accessor();
+            })
+            .expect("failed to spawn truncate thread");
+        self.worker_handle = Some(handle);
+    }
+
+    pub fn state(&self) -> TruncateState {
+        self.state
+            .lock()
+            .expect("failed to lock `state` in `TruncateManager::state`")
+            .clone()
+    }
+
+    #[cfg(test)]
+    pub fn wait(&mut self) {
+        self.worker_handle.take().unwrap().join().unwrap();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use engine_rocks::raw::{ColumnFamilyOptions, DBOptions};
+    use engine_rocks::raw_util::CFOptions;
+    use engine_rocks::Compat;
+    use engine_traits::{WriteBatch, WriteBatchExt, CF_LOCK, CF_RAFT};
+    use tempfile::Builder;
+    use txn_types::WriteType;
+
+    #[test]
+    fn test_basic() {
+        enum Expect {
+            Keep,
+            Remove,
+        }
+
+        let tmp = Builder::new().prefix("test_basic").tempdir().unwrap();
+        let path = tmp.path().to_str().unwrap();
+        let fake_engine = Arc::new(
+            engine_rocks::raw_util::new_engine_opt(
+                path,
+                DBOptions::new(),
+                vec![
+                    CFOptions::new(CF_DEFAULT, ColumnFamilyOptions::new()),
+                    CFOptions::new(CF_WRITE, ColumnFamilyOptions::new()),
+                    CFOptions::new(CF_LOCK, ColumnFamilyOptions::new()),
+                    CFOptions::new(CF_RAFT, ColumnFamilyOptions::new()),
+                ],
+            )
+            .unwrap(),
+        );
+
+        // Truncating to ts = 100 removes every version with commit_ts > 100,
+        // so only the version committed at ts 99 is expected to survive.
+        let write = vec![
+            // key, start_ts, commit_ts
+            (b"k", 104, 105, Expect::Remove),
+            (b"k", 102, 103, Expect::Remove),
+            (b"k", 100, 101, Expect::Remove),
+            (b"k", 98, 99, Expect::Keep),
+        ];
+        let default = vec![
+            // key, start_ts
+            (b"k", 104, Expect::Remove),
+            (b"k", 102, Expect::Remove),
+            (b"k", 100, Expect::Remove),
+            (b"k", 98, Expect::Keep),
+        ];
+        let mut kv = vec![];
+        for (key, start_ts, commit_ts, expect) in write {
+            let write = Write::new(WriteType::Put, start_ts.into(), None);
+            kv.push((
+                CF_WRITE,
+                Key::from_raw(key).append_ts(commit_ts.into()),
+                write.as_ref().to_bytes(),
+                expect,
+            ));
+        }
+        for (key, ts, expect) in default {
+            kv.push((
+                CF_DEFAULT,
+                Key::from_raw(key).append_ts(ts.into()),
+                b"v".to_vec(),
+                expect,
+            ));
+        }
+
+        let mut wb = fake_engine.c().write_batch();
+        for &(cf, ref k, ref v, _) in &kv {
+            wb.put_cf(cf, &keys::data_key(k.as_encoded()), v).unwrap();
+        }
+        wb.write().unwrap();
+
+        let mut manager = TruncateManager::new(fake_engine.c().clone());
+        manager.start(100.into());
+        manager.wait();
+
+        let readopts = IterOptions::new(None, None, false);
+        let mut write_iter = fake_engine
+            .c()
+            .iterator_cf_opt(CF_WRITE, readopts.clone())
+            .unwrap();
+        write_iter.seek(SeekKey::Start).unwrap();
+        let mut remaining_writes = vec![];
+        while write_iter.valid().unwrap() {
+            let write = WriteRef::parse(write_iter.value()).unwrap().to_owned();
+            let key = write_iter.key().to_vec();
+            write_iter.next().unwrap();
+            remaining_writes.push((key, write));
+        }
+        let mut default_iter = fake_engine
+            .c()
+            .iterator_cf_opt(CF_DEFAULT, readopts)
+            .unwrap();
+        default_iter.seek(SeekKey::Start).unwrap();
+        let mut remaining_defaults = vec![];
+        while default_iter.valid().unwrap() {
+            let key = default_iter.key().to_vec();
+            let value = default_iter.value().to_vec();
+            default_iter.next().unwrap();
+            remaining_defaults.push((key, value));
+        }
+
+        assert_eq!(remaining_writes.len(), 1);
+        let (key, _) = &remaining_writes[0];
+        assert_eq!(
+            Key::from_encoded(key.clone()).decode_ts().unwrap(),
+            99.into()
+        );
+        assert_eq!(remaining_defaults.len(), 1);
+        let (key, _) = &remaining_defaults[0];
+        assert_eq!(
+            Key::from_encoded(key.clone()).decode_ts().unwrap(),
+            98.into()
+        );
+    }
+}
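
Usage sketch (illustrative, not part of the patch): one way in-module code could drive TruncateManager and poll progress while the worker thread runs. Here `engine` stands for an already-open RocksEngine and the poll interval is an arbitrary choice; note that TruncateState's fields are module-private, so a caller outside src/server/truncate.rs would first need accessors.

    // Hypothetical in-module driver, assuming `engine: RocksEngine` is open.
    let mut manager = TruncateManager::new(engine);
    manager.start(100.into()); // drop every version committed after ts 100
    loop {
        let state = manager.state(); // clones the shared TruncateState
        if state.done {
            break;
        }
        // `scan_count` gives coarse progress; a real caller might export it
        // as a metric rather than busy-polling like this.
        std::thread::sleep(std::time::Duration::from_millis(100));
    }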