diff --git a/Cargo.lock b/Cargo.lock index 54d01fdec57..ffdfa700e32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -699,6 +699,17 @@ dependencies = [ "toml", ] +[[package]] +name = "engine_traits" +version = "0.0.1" +dependencies = [ + "hex", + "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error", + "tikv_alloc", + "tikv_util", +] + [[package]] name = "error-chain" version = "0.12.1" @@ -2939,6 +2950,7 @@ dependencies = [ "crossbeam", "derive_more", "engine", + "engine_traits", "fail", "failure", "farmhash", diff --git a/Cargo.toml b/Cargo.toml index deebe63be63..25e805627cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ engine = { path = "components/engine" } tidb_query = { path = "components/tidb_query" } pd_client = { path = "components/pd_client" } keys = { path = "components/keys" } +engine_traits = { path = "components/engine_traits" } sst_importer = { path = "components/sst_importer" } [dependencies.murmur3] diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 8d4e6800329..636acdf48bb 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -748,7 +748,7 @@ pub mod tests { let commit = alloc_ts(); must_commit(&engine, key.as_bytes(), start, commit); - // Test whether it can correctly convert not leader to regoin error. + // Test whether it can correctly convert not leader to region error. engine.trigger_not_leader(); let now = alloc_ts(); req.set_start_version(now); diff --git a/components/engine/src/errors.rs b/components/engine/src/errors.rs index da5c7b77a49..bc81621eb44 100644 --- a/components/engine/src/errors.rs +++ b/components/engine/src/errors.rs @@ -12,11 +12,11 @@ quick_error! { display("RocksDb {}", msg) } // FIXME: It should not know Region. - NotInRange( key: Vec, regoin_id: u64, start: Vec, end: Vec) { + NotInRange( key: Vec, region_id: u64, start: Vec, end: Vec) { description("Key is out of range") display( "Key {} is out of [region {}] [{}, {})", - hex::encode_upper(&key), regoin_id, hex::encode_upper(&start), hex::encode_upper(&end) + hex::encode_upper(&key), region_id, hex::encode_upper(&start), hex::encode_upper(&end) ) } Protobuf(err: protobuf::ProtobufError) { diff --git a/components/engine/src/iterable.rs b/components/engine/src/iterable.rs index 0b2b672c51c..af2e026e82e 100644 --- a/components/engine/src/iterable.rs +++ b/components/engine/src/iterable.rs @@ -174,14 +174,14 @@ pub trait Iterable { scan_impl(self.new_iterator_cf(cf, iter_opt)?, start_key, f) } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek(&self, key: &[u8]) -> Result, Vec)>> { let mut iter = self.new_iterator(IterOption::default()); iter.seek(key.into()); Ok(iter.kv()) } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek_cf(&self, cf: &str, key: &[u8]) -> Result, Vec)>> { let mut iter = self.new_iterator_cf(cf, IterOption::default())?; iter.seek(key.into()); diff --git a/components/engine_traits/Cargo.toml b/components/engine_traits/Cargo.toml new file mode 100644 index 00000000000..c9180432c97 --- /dev/null +++ b/components/engine_traits/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "engine_traits" +version = "0.0.1" +edition = "2018" +publish = false + +[dependencies] +protobuf = "2" +quick-error = "1.2.2" +tikv_alloc = { path = "../tikv_alloc", default-features = false } +hex = "0.3" +tikv_util = { path = "../tikv_util" } diff --git a/components/engine_traits/src/cf.rs b/components/engine_traits/src/cf.rs new file mode 100644 index 00000000000..f6caca40623 --- /dev/null +++ b/components/engine_traits/src/cf.rs @@ -0,0 +1,11 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +pub type CfName = &'static str; +pub const CF_DEFAULT: CfName = "default"; +pub const CF_LOCK: CfName = "lock"; +pub const CF_WRITE: CfName = "write"; +pub const CF_RAFT: CfName = "raft"; +// Cfs that should be very large generally. +pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE]; +pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT]; +pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE]; diff --git a/components/engine_traits/src/engine.rs b/components/engine_traits/src/engine.rs new file mode 100644 index 00000000000..c1a73231b31 --- /dev/null +++ b/components/engine_traits/src/engine.rs @@ -0,0 +1,65 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::fmt::Debug; +use std::path::Path; + +use crate::*; + +pub trait Snapshot: 'static + Peekable + Send + Sync + Debug { + fn cf_names(&self) -> Vec<&str>; +} + +pub trait WriteBatch: Mutable { + fn data_size(&self) -> usize; + fn count(&self) -> usize; + fn is_empty(&self) -> bool; + fn clear(&self); + + fn set_save_point(&mut self); + fn pop_save_point(&mut self) -> Result<()>; + fn rollback_to_save_point(&mut self) -> Result<()>; +} + +pub trait KvEngine: Peekable + Mutable + Iterable + Send + Sync + Clone { + type Snap: Snapshot; + type Batch: WriteBatch; + + fn write_opt(&self, opts: &WriteOptions, wb: &Self::Batch) -> Result<()>; + fn write(&self, wb: &Self::Batch) -> Result<()> { + self.write_opt(&WriteOptions::default(), wb) + } + fn write_batch(&self, cap: usize) -> Self::Batch; + fn snapshot(&self) -> Self::Snap; + fn sync(&self) -> Result<()>; + fn cf_names(&self) -> Vec<&str>; + fn delete_all_in_range( + &self, + t: DeleteRangeType, + start_key: &[u8], + end_key: &[u8], + ) -> Result<()> { + if start_key >= end_key { + return Ok(()); + } + for cf in self.cf_names() { + self.delete_all_in_range_cf(t, cf, start_key, end_key)?; + } + Ok(()) + } + fn delete_all_in_range_cf( + &self, + t: DeleteRangeType, + cf: &str, + start_key: &[u8], + end_key: &[u8], + ) -> Result<()>; + + fn ingest_external_file_cf(&self, cf: &str, files: &[&str]) -> Result<()>; + fn validate_file_for_ingestion>( + &self, + cf: &str, + path: P, + expected_size: u64, + expected_checksum: u32, + ) -> Result<()>; +} diff --git a/components/engine_traits/src/errors.rs b/components/engine_traits/src/errors.rs new file mode 100644 index 00000000000..8f22b64b25f --- /dev/null +++ b/components/engine_traits/src/errors.rs @@ -0,0 +1,43 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{error, result}; + +quick_error! { + #[derive(Debug)] + pub enum Error { + // Engine uses plain string as the error. + Engine(msg: String) { + from() + description("Storage Engine error") + display("Storage Engine {}", msg) + } + // FIXME: It should not know Region. + NotInRange( key: Vec, region_id: u64, start: Vec, end: Vec) { + description("Key is out of range") + display( + "Key {} is out of [region {}] [{}, {})", + hex::encode_upper(&key), region_id, hex::encode_upper(&start), hex::encode_upper(&end) + ) + } + Protobuf(err: protobuf::ProtobufError) { + from() + cause(err) + description(err.description()) + display("Protobuf {}", err) + } + Io(err: std::io::Error) { + from() + cause(err) + description(err.description()) + display("Io {}", err) + } + Other(err: Box) { + from() + cause(err.as_ref()) + description(err.description()) + display("{:?}", err) + } + } +} + +pub type Result = result::Result; diff --git a/components/engine_traits/src/iterable.rs b/components/engine_traits/src/iterable.rs new file mode 100644 index 00000000000..55bcdb151f6 --- /dev/null +++ b/components/engine_traits/src/iterable.rs @@ -0,0 +1,112 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use tikv_util::keybuilder::KeyBuilder; + +use crate::*; + +#[derive(Clone, PartialEq)] +pub enum SeekMode { + TotalOrder, + Prefix, +} + +pub enum SeekKey<'a> { + Start, + End, + Key(&'a [u8]), +} + +pub trait Iterator { + fn seek(&mut self, key: SeekKey) -> bool; + fn seek_for_prev(&mut self, key: SeekKey) -> bool; + + fn prev(&mut self) -> bool; + fn next(&mut self) -> bool; + + fn key(&self) -> Result<&[u8]>; + fn value(&self) -> Result<&[u8]>; + + fn valid(&self) -> bool; + fn status(&self) -> Result<()>; +} + +pub trait Iterable { + type Iter: Iterator; + + fn iterator_opt(&self, opts: &IterOptions) -> Result; + fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result; + + fn iterator(&self) -> Result { + self.iterator_opt(&IterOptions::default()) + } + + fn iterator_cf(&self, cf: &str) -> Result { + self.iterator_cf_opt(&IterOptions::default(), cf) + } + + fn scan(&self, start_key: &[u8], end_key: &[u8], fill_cache: bool, f: F) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result, + { + let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); + let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); + let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); + scan_impl(self.iterator_opt(&iter_opt)?, start_key, f) + } + + // like `scan`, only on a specific column family. + fn scan_cf( + &self, + cf: &str, + start_key: &[u8], + end_key: &[u8], + fill_cache: bool, + f: F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result, + { + let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); + let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); + let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); + scan_impl(self.iterator_cf_opt(&iter_opt, cf)?, start_key, f) + } + + // Seek the first key >= given key, if not found, return None. + fn seek(&self, key: &[u8]) -> Result, Vec)>> { + let mut iter = self.iterator()?; + iter.seek(SeekKey::Key(key)); + if iter.valid() { + Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + } else { + Ok(None) + } + } + + // Seek the first key >= given key, if not found, return None. + fn seek_cf(&self, cf: &str, key: &[u8]) -> Result, Vec)>> { + let mut iter = self.iterator_cf(cf)?; + iter.seek(SeekKey::Key(key)); + if iter.valid() { + Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + } else { + Ok(None) + } + } +} + +fn scan_impl(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()> +where + Iter: Iterator, + F: FnMut(&[u8], &[u8]) -> Result, +{ + it.seek(SeekKey::Key(start_key)); + while it.valid() { + let r = f(it.key()?, it.value()?)?; + if !r || !it.next() { + break; + } + } + + it.status().map_err(From::from) +} diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs new file mode 100644 index 00000000000..f10e76b7cb9 --- /dev/null +++ b/components/engine_traits/src/lib.rs @@ -0,0 +1,71 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +#![recursion_limit = "200"] + +#[macro_use] +extern crate quick_error; +#[allow(unused_extern_crates)] +extern crate tikv_alloc; + +mod errors; +pub use crate::errors::*; +mod peekable; +pub use crate::peekable::*; +mod iterable; +pub use crate::iterable::*; +mod mutable; +pub use crate::mutable::*; +mod cf; +pub use crate::cf::*; +mod engine; +pub use crate::engine::*; +mod options; +pub use crate::options::*; +mod util; + +pub const DATA_KEY_PREFIX_LEN: usize = 1; + +// In our tests, we found that if the batch size is too large, running delete_all_in_range will +// reduce OLTP QPS by 30% ~ 60%. We found that 32K is a proper choice. +pub const MAX_DELETE_BATCH_SIZE: usize = 32 * 1024; + +#[derive(Clone, Debug)] +pub struct KvEngines { + pub kv: K, + pub raft: R, + pub shared_block_cache: bool, +} + +impl KvEngines { + pub fn new(kv_engine: K, raft_engine: R, shared_block_cache: bool) -> Self { + KvEngines { + kv: kv_engine, + raft: raft_engine, + shared_block_cache, + } + } + + pub fn write_kv(&self, wb: &K::Batch) -> Result<()> { + self.kv.write(wb) + } + + pub fn write_kv_opt(&self, wb: &K::Batch, opts: &WriteOptions) -> Result<()> { + self.kv.write_opt(opts, wb) + } + + pub fn sync_kv(&self) -> Result<()> { + self.kv.sync() + } + + pub fn write_raft(&self, wb: &R::Batch) -> Result<()> { + self.raft.write(wb) + } + + pub fn write_raft_opt(&self, wb: &R::Batch, opts: &WriteOptions) -> Result<()> { + self.raft.write_opt(opts, wb) + } + + pub fn sync_raft(&self) -> Result<()> { + self.raft.sync() + } +} diff --git a/components/engine_traits/src/mutable.rs b/components/engine_traits/src/mutable.rs new file mode 100644 index 00000000000..27066a4ddba --- /dev/null +++ b/components/engine_traits/src/mutable.rs @@ -0,0 +1,35 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::*; + +pub trait Mutable { + fn put_opt(&self, opts: &WriteOptions, key: &[u8], value: &[u8]) -> Result<()>; + fn put_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8], value: &[u8]) -> Result<()>; + + fn delete_opt(&self, opts: &WriteOptions, key: &[u8]) -> Result<()>; + fn delete_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8]) -> Result<()>; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.put_opt(&WriteOptions::default(), key, value) + } + + fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + self.put_cf_opt(&WriteOptions::default(), cf, key, value) + } + + fn delete(&self, key: &[u8]) -> Result<()> { + self.delete_opt(&WriteOptions::default(), key) + } + + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { + self.delete_cf_opt(&WriteOptions::default(), cf, key) + } + + fn put_msg(&self, key: &[u8], m: &M) -> Result<()> { + self.put(key, &m.write_to_bytes()?) + } + + fn put_msg_cf(&self, cf: &str, key: &[u8], m: &M) -> Result<()> { + self.put_cf(cf, key, &m.write_to_bytes()?) + } +} diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs new file mode 100644 index 00000000000..686d5a12354 --- /dev/null +++ b/components/engine_traits/src/options.rs @@ -0,0 +1,161 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use tikv_util::keybuilder::KeyBuilder; + +use crate::SeekMode; + +#[derive(Clone)] +pub struct ReadOptions {} + +impl ReadOptions { + pub fn new() -> ReadOptions { + ReadOptions {} + } +} + +impl Default for ReadOptions { + fn default() -> ReadOptions { + ReadOptions {} + } +} + +#[derive(Clone)] +pub struct WriteOptions { + sync: bool, +} + +impl WriteOptions { + pub fn new() -> WriteOptions { + WriteOptions { sync: false } + } + + pub fn set_sync(&mut self, sync: bool) { + self.sync = sync; + } +} + +impl Default for WriteOptions { + fn default() -> WriteOptions { + WriteOptions { sync: false } + } +} + +#[derive(Clone)] +pub struct CFOptions {} + +impl CFOptions { + pub fn new() -> CFOptions { + CFOptions {} + } +} + +impl Default for CFOptions { + fn default() -> CFOptions { + CFOptions {} + } +} + +#[derive(Clone)] +pub struct IterOptions { + lower_bound: Option, + upper_bound: Option, + prefix_same_as_start: bool, + fill_cache: bool, + key_only: bool, + seek_mode: SeekMode, +} + +impl IterOptions { + pub fn new( + lower_bound: Option, + upper_bound: Option, + fill_cache: bool, + ) -> IterOptions { + IterOptions { + lower_bound, + upper_bound, + prefix_same_as_start: false, + fill_cache, + key_only: false, + seek_mode: SeekMode::TotalOrder, + } + } + + pub fn use_prefix_seek(mut self) -> IterOptions { + self.seek_mode = SeekMode::Prefix; + self + } + + pub fn total_order_seek_used(&self) -> bool { + self.seek_mode == SeekMode::TotalOrder + } + + pub fn set_fill_cache(&mut self, v: bool) { + self.fill_cache = v; + } + + pub fn set_key_only(&mut self, v: bool) { + self.key_only = v; + } + + pub fn lower_bound(&self) -> Option<&[u8]> { + self.lower_bound.as_ref().map(|v| v.as_slice()) + } + + pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { + let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); + self.lower_bound = Some(builder); + } + + pub fn set_vec_lower_bound(&mut self, bound: Vec) { + self.lower_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); + } + + pub fn set_lower_bound_prefix(&mut self, prefix: &[u8]) { + if let Some(ref mut builder) = self.lower_bound { + builder.set_prefix(prefix); + } + } + + pub fn upper_bound(&self) -> Option<&[u8]> { + self.upper_bound.as_ref().map(|v| v.as_slice()) + } + + pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { + let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); + self.upper_bound = Some(builder); + } + + pub fn set_vec_upper_bound(&mut self, bound: Vec) { + self.upper_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); + } + + pub fn set_upper_bound_prefix(&mut self, prefix: &[u8]) { + if let Some(ref mut builder) = self.upper_bound { + builder.set_prefix(prefix); + } + } + + pub fn set_prefix_same_as_start(&mut self, enable: bool) { + self.prefix_same_as_start = enable; + } +} + +impl Default for IterOptions { + fn default() -> IterOptions { + IterOptions { + lower_bound: None, + upper_bound: None, + prefix_same_as_start: false, + fill_cache: false, + key_only: false, + seek_mode: SeekMode::TotalOrder, + } + } +} + +#[derive(Clone, Copy)] +pub enum DeleteRangeType { + WriteBatch, + DeleteFiles, + DeleteRange, +} diff --git a/components/engine_traits/src/peekable.rs b/components/engine_traits/src/peekable.rs new file mode 100644 index 00000000000..4fe3d6a2cef --- /dev/null +++ b/components/engine_traits/src/peekable.rs @@ -0,0 +1,42 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::*; + +pub trait Peekable { + fn get_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>>; + fn get_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> Result>>; + + fn get(&self, key: &[u8]) -> Result>> { + self.get_opt(&ReadOptions::default(), key) + } + + fn get_cf(&self, cf: &str, key: &[u8]) -> Result>> { + self.get_cf_opt(&ReadOptions::default(), cf, key) + } + + fn get_msg(&self, key: &[u8]) -> Result> { + let value = self.get(key)?; + if value.is_none() { + return Ok(None); + } + + let mut m = M::default(); + m.merge_from_bytes(&value.unwrap())?; + Ok(Some(m)) + } + + fn get_msg_cf( + &self, + cf: &str, + key: &[u8], + ) -> Result> { + let value = self.get_cf(cf, key)?; + if value.is_none() { + return Ok(None); + } + + let mut m = M::default(); + m.merge_from_bytes(&value.unwrap())?; + Ok(Some(m)) + } +} diff --git a/components/engine_traits/src/util.rs b/components/engine_traits/src/util.rs new file mode 100644 index 00000000000..d1da9142dd1 --- /dev/null +++ b/components/engine_traits/src/util.rs @@ -0,0 +1,23 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use super::{Error, Result}; + +/// Check if key in range [`start_key`, `end_key`). +#[allow(dead_code)] +pub fn check_key_in_range( + key: &[u8], + region_id: u64, + start_key: &[u8], + end_key: &[u8], +) -> Result<()> { + if key >= start_key && (end_key.is_empty() || key < end_key) { + Ok(()) + } else { + Err(Error::NotInRange( + key.to_vec(), + region_id, + start_key.to_vec(), + end_key.to_vec(), + )) + } +} diff --git a/components/tikv_util/src/keybuilder.rs b/components/tikv_util/src/keybuilder.rs index b0026e75752..3d089561724 100644 --- a/components/tikv_util/src/keybuilder.rs +++ b/components/tikv_util/src/keybuilder.rs @@ -2,6 +2,7 @@ use std::ptr; +#[derive(Clone)] pub struct KeyBuilder { buf: Vec, start: usize, diff --git a/tests/integrations/raftstore/test_update_region_size.rs b/tests/integrations/raftstore/test_update_region_size.rs index 6154ee2db79..eae9be2c638 100644 --- a/tests/integrations/raftstore/test_update_region_size.rs +++ b/tests/integrations/raftstore/test_update_region_size.rs @@ -13,7 +13,7 @@ fn flush(cluster: &mut Cluster) { } } -fn test_update_regoin_size(cluster: &mut Cluster) { +fn test_update_region_size(cluster: &mut Cluster) { cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(50); cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(50); cluster.cfg.raft_store.region_split_check_diff = ReadableSize::kb(1); @@ -70,5 +70,5 @@ fn test_update_regoin_size(cluster: &mut Cluster) { fn test_server_update_region_size() { let count = 1; let mut cluster = new_server_cluster(0, count); - test_update_regoin_size(&mut cluster); + test_update_region_size(&mut cluster); }