From 77caf9d566af419be493b59ff9b1b4bcd8df050a Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 11:43:09 +0800 Subject: [PATCH 1/9] add engine_traits component Signed-off-by: 5kbpers --- Cargo.toml | 1 + components/engine_traits/Cargo.toml | 40 +++ components/engine_traits/src/cf.rs | 11 + components/engine_traits/src/engine.rs | 70 ++++++ components/engine_traits/src/errors.rs | 66 +++++ components/engine_traits/src/iterable.rs | 112 +++++++++ components/engine_traits/src/lib.rs | 121 +++++++++ components/engine_traits/src/mutable.rs | 35 +++ components/engine_traits/src/options.rs | 192 +++++++++++++++ components/engine_traits/src/peekable.rs | 44 ++++ components/engine_traits/src/util.rs | 301 +++++++++++++++++++++++ 11 files changed, 993 insertions(+) create mode 100644 components/engine_traits/Cargo.toml create mode 100644 components/engine_traits/src/cf.rs create mode 100644 components/engine_traits/src/engine.rs create mode 100644 components/engine_traits/src/errors.rs create mode 100644 components/engine_traits/src/iterable.rs create mode 100644 components/engine_traits/src/lib.rs create mode 100644 components/engine_traits/src/mutable.rs create mode 100644 components/engine_traits/src/options.rs create mode 100644 components/engine_traits/src/peekable.rs create mode 100644 components/engine_traits/src/util.rs diff --git a/Cargo.toml b/Cargo.toml index 736712d917d..bdb54a41537 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ engine = { path = "components/engine" } tidb_query = { path = "components/tidb_query" } pd_client = { path = "components/pd_client" } keys = { path = "components/keys" } +engine_traits = { path = "components/engine_traits" } [dependencies.murmur3] git = "https://github.com/pingcap/murmur3.git" diff --git a/components/engine_traits/Cargo.toml b/components/engine_traits/Cargo.toml new file mode 100644 index 00000000000..601a3717865 --- /dev/null +++ b/components/engine_traits/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "engine_traits" +version = "0.0.1" +edition = "2018" +publish = false + +[features] +jemalloc = ["engine_rocksdb/jemalloc"] +portable = ["engine_rocksdb/portable"] +sse = ["engine_rocksdb/sse"] + +[dependencies] +kvproto = { git = "https://github.com/pingcap/kvproto.git" } +protobuf = "2" +raft = "0.6.0-alpha" +quick-error = "1.2.2" +crc = "1.8" +lazy_static = "1.3" +slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } +slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" } +time = "0.1" +sys-info = "0.5.7" +tikv_alloc = { path = "../tikv_alloc", default-features = false } +serde = "1.0" +serde_derive = "1.0" +toml = "0.4" +hex = "0.3" +tikv_util = { path = "../tikv_util" } + +[dependencies.prometheus] +version = "0.4.2" +default-features = false +features = ["nightly"] + +[dependencies.engine_rocksdb] +git = "https://github.com/pingcap/rust-rocksdb.git" +package = "rocksdb" + +[dev-dependencies] +tempfile = "3.0" diff --git a/components/engine_traits/src/cf.rs b/components/engine_traits/src/cf.rs new file mode 100644 index 00000000000..f6caca40623 --- /dev/null +++ b/components/engine_traits/src/cf.rs @@ -0,0 +1,11 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +pub type CfName = &'static str; +pub const CF_DEFAULT: CfName = "default"; +pub const CF_LOCK: CfName = "lock"; +pub const CF_WRITE: CfName = "write"; +pub const CF_RAFT: CfName = "raft"; +// Cfs that should be very large generally. +pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE]; +pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT]; +pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE]; diff --git a/components/engine_traits/src/engine.rs b/components/engine_traits/src/engine.rs new file mode 100644 index 00000000000..d952a92893d --- /dev/null +++ b/components/engine_traits/src/engine.rs @@ -0,0 +1,70 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::fmt::Debug; +use std::path::Path; + +use crate::*; + +pub trait Snapshot: 'static + Peekable + Send + Sync + Debug { + fn cf_names(&self) -> Vec<&str>; +} + +pub trait WriteBatch: Mutable { + fn data_size(&self) -> usize; + fn count(&self) -> usize; + fn is_empty(&self) -> bool; + fn clear(&self); + + fn set_save_point(&mut self); + fn pop_save_point(&mut self) -> Result<()>; + fn rollback_to_save_point(&mut self) -> Result<()>; +} + +pub trait KvEngine: Peekable + Mutable + Iterable + Send + Sync + Clone { + type Snap: Snapshot; + type Batch: WriteBatch; + + fn write_opt(&self, opts: &WriteOptions, wb: &Self::Batch) -> Result<()>; + fn write(&self, wb: &Self::Batch) -> Result<()> { + self.write_opt(&WriteOptions::default(), wb) + } + fn write_batch(&self, cap: usize) -> Self::Batch; + fn snapshot(&self) -> Self::Snap; + fn sync(&self) -> Result<()>; + fn cf_names(&self) -> Vec<&str>; + fn delete_all_in_range( + &self, + t: DeleteRangeType, + start_key: &[u8], + end_key: &[u8], + ) -> Result<()> { + if start_key >= end_key { + return Ok(()); + } + for cf in self.cf_names() { + self.delete_all_in_range_cf(t, cf, start_key, end_key)?; + } + Ok(()) + } + fn delete_all_in_range_cf( + &self, + t: DeleteRangeType, + cf: &str, + start_key: &[u8], + end_key: &[u8], + ) -> Result<()>; + fn ingest_external_file_cf( + &self, + opts: &IngestExternalFileOptions, + cf: &str, + files: &[&str], + ) -> Result<()>; + + fn validate_file_for_ingestion>( + &self, + cf: &str, + path: P, + expected_size: u64, + expected_checksum: u32, + ) -> Result<()>; +} diff --git a/components/engine_traits/src/errors.rs b/components/engine_traits/src/errors.rs new file mode 100644 index 00000000000..a4f246cae09 --- /dev/null +++ b/components/engine_traits/src/errors.rs @@ -0,0 +1,66 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{error, result}; + +quick_error! { + #[derive(Debug)] + pub enum Error { + // Engine uses plain string as the error. + Engine(msg: String) { + from() + description("Storage Engine error") + display("Storage Engine {}", msg) + } + // FIXME: It should not know Region. + NotInRange( key: Vec, regoin_id: u64, start: Vec, end: Vec) { + description("Key is out of range") + display( + "Key {} is out of [region {}] [{}, {})", + hex::encode_upper(&key), regoin_id, hex::encode_upper(&start), hex::encode_upper(&end) + ) + } + Protobuf(err: protobuf::ProtobufError) { + from() + cause(err) + description(err.description()) + display("Protobuf {}", err) + } + Io(err: std::io::Error) { + from() + cause(err) + description(err.description()) + display("Io {}", err) + } + + Other(err: Box) { + from() + cause(err.as_ref()) + description(err.description()) + display("{:?}", err) + } + } +} + +pub type Result = result::Result; + +impl From for raft::Error { + fn from(err: Error) -> raft::Error { + raft::Error::Store(raft::StorageError::Other(err.into())) + } +} + +impl From for kvproto::errorpb::Error { + fn from(err: Error) -> kvproto::errorpb::Error { + let mut errorpb = kvproto::errorpb::Error::default(); + errorpb.set_message(format!("{}", err)); + + if let Error::NotInRange(key, region_id, start_key, end_key) = err { + errorpb.mut_key_not_in_region().set_key(key); + errorpb.mut_key_not_in_region().set_region_id(region_id); + errorpb.mut_key_not_in_region().set_start_key(start_key); + errorpb.mut_key_not_in_region().set_end_key(end_key); + } + + errorpb + } +} diff --git a/components/engine_traits/src/iterable.rs b/components/engine_traits/src/iterable.rs new file mode 100644 index 00000000000..4ae79fa96a0 --- /dev/null +++ b/components/engine_traits/src/iterable.rs @@ -0,0 +1,112 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use tikv_util::keybuilder::KeyBuilder; + +use crate::*; + +#[derive(Clone, PartialEq)] +pub enum SeekMode { + TotalOrder, + Prefix, +} + +pub enum SeekKey<'a> { + Start, + End, + Key(&'a [u8]), +} + +pub trait Iterator { + fn seek(&mut self, key: SeekKey) -> bool; + fn seek_for_prev(&mut self, key: SeekKey) -> bool; + + fn prev(&mut self) -> bool; + fn next(&mut self) -> bool; + + fn key(&self) -> Result<&[u8]>; + fn value(&self) -> Result<&[u8]>; + + fn valid(&self) -> bool; + fn status(&self) -> Result<()>; +} + +pub trait Iterable { + type Iter: Iterator; + + fn iterator_opt(&self, opts: &IterOptions) -> Result; + fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result; + + fn iterator(&self) -> Result { + self.iterator_opt(&IterOptions::default()) + } + + fn iterator_cf(&self, cf: &str) -> Result { + self.iterator_cf_opt(&IterOptions::default(), cf) + } + + fn scan(&self, start_key: &[u8], end_key: &[u8], fill_cache: bool, f: F) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result, + { + let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); + let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); + let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); + scan_impl(self.iterator_opt(&iter_opt)?, start_key, f) + } + + // like `scan`, only on a specific column family. + fn scan_cf( + &self, + cf: &str, + start_key: &[u8], + end_key: &[u8], + fill_cache: bool, + f: F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result, + { + let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); + let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); + let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); + scan_impl(self.iterator_cf_opt(&iter_opt, cf)?, start_key, f) + } + + // Seek the first key >= given key, if no found, return None. + fn seek(&self, key: &[u8]) -> Result, Vec)>> { + let mut iter = self.iterator()?; + iter.seek(SeekKey::Key(key)); + if iter.valid() { + Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + } else { + Ok(None) + } + } + + // Seek the first key >= given key, if no found, return None. + fn seek_cf(&self, cf: &str, key: &[u8]) -> Result, Vec)>> { + let mut iter = self.iterator_cf(cf)?; + iter.seek(SeekKey::Key(key)); + if iter.valid() { + Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + } else { + Ok(None) + } + } +} + +fn scan_impl(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()> +where + F: FnMut(&[u8], &[u8]) -> Result, +{ + it.seek(SeekKey::Key(start_key)); + while it.valid() { + let r = f(it.key()?, it.value()?)?; + + if !r || !it.next() { + break; + } + } + + it.status().map_err(From::from) +} diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs new file mode 100644 index 00000000000..073b85417d9 --- /dev/null +++ b/components/engine_traits/src/lib.rs @@ -0,0 +1,121 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +#![recursion_limit = "200"] + +#[macro_use(slog_error, slog_warn)] +extern crate slog; +#[macro_use] +extern crate slog_global; +#[macro_use] +extern crate prometheus; +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate quick_error; +#[macro_use] +extern crate serde_derive; +#[allow(unused_extern_crates)] +extern crate tikv_alloc; + +pub mod util; + +mod errors; +pub use crate::errors::*; +mod peekable; +pub use crate::peekable::*; +mod iterable; +pub use crate::iterable::*; +mod mutable; +pub use crate::mutable::*; +mod cf; +pub use crate::cf::*; +mod engine; +pub use crate::engine::*; +mod options; +pub use crate::options::*; +use engine_rocksdb::{WriteBatch, DB}; + +use std::sync::Arc; + +pub const DATA_KEY_PREFIX_LEN: usize = 1; + +// In our tests, we found that if the batch size is too large, running delete_all_in_range will +// reduce OLTP QPS by 30% ~ 60%. We found that 32K is a proper choice. +pub const MAX_DELETE_BATCH_SIZE: usize = 32 * 1024; + +#[derive(Clone, Debug)] +pub struct KvEngines { + pub kv: E, + pub raft: E, + pub shared_block_cache: bool, +} + +pub type Engines = KvEngines>; + +impl KvEngines { + pub fn new(kv_engine: E, raft_engine: E, shared_block_cache: bool) -> Self { + KvEngines { + kv: kv_engine, + raft: raft_engine, + shared_block_cache, + } + } + + pub fn write_kv(&self, wb: &E::Batch) -> Result<()> { + self.kv.write(wb) + } + + pub fn write_kv_opt(&self, wb: &E::Batch, opts: &WriteOptions) -> Result<()> { + self.kv.write_opt(opts, wb) + } + + pub fn sync_kv(&self) -> Result<()> { + self.kv.sync() + } + + pub fn write_raft(&self, wb: &E::Batch) -> Result<()> { + self.raft.write(wb) + } + + pub fn write_raft_opt(&self, wb: &E::Batch, opts: &WriteOptions) -> Result<()> { + self.raft.write_opt(opts, wb) + } + + pub fn sync_raft(&self) -> Result<()> { + self.raft.sync() + } +} + +impl Engines { + pub fn new(kv_engine: Arc, raft_engine: Arc, shared_block_cache: bool) -> Self { + KvEngines { + kv: kv_engine, + raft: raft_engine, + shared_block_cache, + } + } + + pub fn write_kv(&self, wb: &RawWriteBatch) -> Result<()> { + self.kv.write(wb).map_err(Error::Engine) + } + + pub fn write_kv_opt(&self, wb: &RawWriteBatch, opts: &WriteOptions) -> Result<()> { + self.kv.write_opt(wb, &opts.into()).map_err(Error::Engine) + } + + pub fn sync_kv(&self) -> Result<()> { + self.kv.sync_wal().map_err(Error::Engine) + } + + pub fn write_raft(&self, wb: &RawWriteBatch) -> Result<()> { + self.raft.write(wb).map_err(Error::Engine) + } + + pub fn write_raft_opt(&self, wb: &RawWriteBatch, opts: &WriteOptions) -> Result<()> { + self.raft.write_opt(wb, &opts.into()).map_err(Error::Engine) + } + + pub fn sync_raft(&self) -> Result<()> { + self.raft.sync_wal().map_err(Error::Engine) + } +} diff --git a/components/engine_traits/src/mutable.rs b/components/engine_traits/src/mutable.rs new file mode 100644 index 00000000000..27066a4ddba --- /dev/null +++ b/components/engine_traits/src/mutable.rs @@ -0,0 +1,35 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::*; + +pub trait Mutable { + fn put_opt(&self, opts: &WriteOptions, key: &[u8], value: &[u8]) -> Result<()>; + fn put_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8], value: &[u8]) -> Result<()>; + + fn delete_opt(&self, opts: &WriteOptions, key: &[u8]) -> Result<()>; + fn delete_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8]) -> Result<()>; + + fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.put_opt(&WriteOptions::default(), key, value) + } + + fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + self.put_cf_opt(&WriteOptions::default(), cf, key, value) + } + + fn delete(&self, key: &[u8]) -> Result<()> { + self.delete_opt(&WriteOptions::default(), key) + } + + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { + self.delete_cf_opt(&WriteOptions::default(), cf, key) + } + + fn put_msg(&self, key: &[u8], m: &M) -> Result<()> { + self.put(key, &m.write_to_bytes()?) + } + + fn put_msg_cf(&self, cf: &str, key: &[u8], m: &M) -> Result<()> { + self.put_cf(cf, key, &m.write_to_bytes()?) + } +} diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs new file mode 100644 index 00000000000..2c28aceef17 --- /dev/null +++ b/components/engine_traits/src/options.rs @@ -0,0 +1,192 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use tikv_util::keybuilder::KeyBuilder; + +use crate::SeekMode; + +#[derive(Clone)] +pub struct ReadOptions {} + +impl ReadOptions { + pub fn new() -> ReadOptions { + ReadOptions {} + } +} + +impl Default for ReadOptions { + fn default() -> ReadOptions { + ReadOptions {} + } +} + +#[derive(Clone)] +pub struct WriteOptions { + pub sync: bool, +} + +impl WriteOptions { + pub fn new() -> WriteOptions { + WriteOptions { sync: false } + } + + pub fn set_sync(&mut self, sync: bool) { + self.sync = sync; + } +} + +impl Default for WriteOptions { + fn default() -> WriteOptions { + WriteOptions { sync: false } + } +} + +#[derive(Clone)] +pub struct CFOptions {} + +impl CFOptions { + pub fn new() -> CFOptions { + CFOptions {} + } +} + +impl Default for CFOptions { + fn default() -> CFOptions { + CFOptions {} + } +} + +#[derive(Clone)] +pub struct IterOptions { + pub lower_bound: Option, + pub upper_bound: Option, + pub prefix_same_as_start: bool, + pub fill_cache: bool, + pub key_only: bool, + pub seek_mode: SeekMode, +} + +impl IterOptions { + pub fn new( + lower_bound: Option, + upper_bound: Option, + fill_cache: bool, + ) -> IterOptions { + IterOptions { + lower_bound, + upper_bound, + prefix_same_as_start: false, + fill_cache, + key_only: false, + seek_mode: SeekMode::TotalOrder, + } + } + + #[inline] + pub fn use_prefix_seek(mut self) -> IterOptions { + self.seek_mode = SeekMode::Prefix; + self + } + + #[inline] + pub fn total_order_seek_used(&self) -> bool { + self.seek_mode == SeekMode::TotalOrder + } + + #[inline] + pub fn fill_cache(&mut self, v: bool) { + self.fill_cache = v; + } + + #[inline] + pub fn key_only(&mut self, v: bool) { + self.key_only = v; + } + + #[inline] + pub fn lower_bound(&self) -> Option<&[u8]> { + self.lower_bound.as_ref().map(|v| v.as_slice()) + } + + #[inline] + pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { + let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); + self.lower_bound = Some(builder); + } + + pub fn set_vec_lower_bound(&mut self, bound: Vec) { + self.lower_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); + } + + pub fn set_lower_bound_prefix(&mut self, prefix: &[u8]) { + if let Some(ref mut builder) = self.lower_bound { + builder.set_prefix(prefix); + } + } + + #[inline] + pub fn upper_bound(&self) -> Option<&[u8]> { + self.upper_bound.as_ref().map(|v| v.as_slice()) + } + + #[inline] + pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { + let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); + self.upper_bound = Some(builder); + } + + pub fn set_vec_upper_bound(&mut self, bound: Vec) { + self.upper_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); + } + + pub fn set_upper_bound_prefix(&mut self, prefix: &[u8]) { + if let Some(ref mut builder) = self.upper_bound { + builder.set_prefix(prefix); + } + } + + #[inline] + pub fn set_prefix_same_as_start(mut self, enable: bool) -> IterOptions { + self.prefix_same_as_start = enable; + self + } +} + +impl Default for IterOptions { + fn default() -> IterOptions { + IterOptions { + lower_bound: None, + upper_bound: None, + prefix_same_as_start: false, + fill_cache: false, + key_only: false, + seek_mode: SeekMode::TotalOrder, + } + } +} + +#[derive(Clone, Copy)] +pub enum DeleteRangeType { + Normal, + DeleteFiles, + DeleteRange, +} + +#[derive(Clone)] +pub struct IngestExternalFileOptions { + pub move_files: bool, +} + +impl IngestExternalFileOptions { + pub fn new() -> IngestExternalFileOptions { + IngestExternalFileOptions { move_files: false } + } + + pub fn move_files(&mut self, v: bool) { + self.move_files = v; + } +} + +impl Default for IngestExternalFileOptions { + fn default() -> IngestExternalFileOptions { + IngestExternalFileOptions { move_files: false } + } +} diff --git a/components/engine_traits/src/peekable.rs b/components/engine_traits/src/peekable.rs new file mode 100644 index 00000000000..6729fcec6e3 --- /dev/null +++ b/components/engine_traits/src/peekable.rs @@ -0,0 +1,44 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::*; + +pub trait Peekable { + fn get_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>>; + fn get_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> Result>>; + + fn get(&self, key: &[u8]) -> Result>> { + self.get_opt(&ReadOptions::default(), key) + } + + fn get_cf(&self, cf: &str, key: &[u8]) -> Result>> { + self.get_cf_opt(&ReadOptions::default(), cf, key) + } + + fn get_msg(&self, key: &[u8]) -> Result> { + let value = self.get(key)?; + + if value.is_none() { + return Ok(None); + } + + let mut m = M::default(); + m.merge_from_bytes(&value.unwrap())?; + Ok(Some(m)) + } + + fn get_msg_cf( + &self, + cf: &str, + key: &[u8], + ) -> Result> { + let value = self.get_cf(cf, key)?; + + if value.is_none() { + return Ok(None); + } + + let mut m = M::default(); + m.merge_from_bytes(&value.unwrap())?; + Ok(Some(m)) + } +} diff --git a/components/engine_traits/src/util.rs b/components/engine_traits/src/util.rs new file mode 100644 index 00000000000..10b9f4e2d24 --- /dev/null +++ b/components/engine_traits/src/util.rs @@ -0,0 +1,301 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::u64; + +use crate::rocks; +use crate::rocks::util::get_cf_handle; +use crate::rocks::{DBIterator, Range, RawWriteBatch, TablePropertiesCollection, Writable, DB}; +use crate::CF_LOCK; + +use super::{Error, Result}; +use super::{IterOptions, MAX_DELETE_BATCH_SIZE}; +use tikv_util::keybuilder::KeyBuilder; + +pub fn put_msg_cf( + db: &DB, + wb: &RawWriteBatch, + cf: &str, + key: &[u8], + m: &M, +) -> Result<()> { + let handle = get_cf_handle(db, cf)?; + wb.put_cf(handle, key, &m.write_to_bytes()?)?; + Ok(()) +} + +/// Check if key in range [`start_key`, `end_key`). +pub fn check_key_in_range( + key: &[u8], + region_id: u64, + start_key: &[u8], + end_key: &[u8], +) -> Result<()> { + if key >= start_key && (end_key.is_empty() || key < end_key) { + Ok(()) + } else { + Err(Error::NotInRange( + key.to_vec(), + region_id, + start_key.to_vec(), + end_key.to_vec(), + )) + } +} + +pub fn delete_all_in_range( + db: &DB, + start_key: &[u8], + end_key: &[u8], + use_delete_range: bool, +) -> Result<()> { + if start_key >= end_key { + return Ok(()); + } + + for cf in db.cf_names() { + delete_all_in_range_cf(db, cf, start_key, end_key, use_delete_range)?; + } + + Ok(()) +} + +pub fn delete_all_in_range_cf( + db: &DB, + cf: &str, + start_key: &[u8], + end_key: &[u8], + use_delete_range: bool, +) -> Result<()> { + let handle = rocks::util::get_cf_handle(db, cf)?; + let wb = RawWriteBatch::default(); + if use_delete_range && cf != CF_LOCK { + wb.delete_range_cf(handle, start_key, end_key)?; + } else { + let start = KeyBuilder::from_slice(start_key, 0, 0); + let end = KeyBuilder::from_slice(end_key, 0, 0); + let mut iter_opt = IterOptions::new(Some(start), Some(end), false); + if db.is_titan() { + // Cause DeleteFilesInRange may expose old blob index keys, setting key only for Titan + // to avoid referring to missing blob files. + iter_opt.key_only(true); + } + let handle = rocks::util::get_cf_handle(db, cf)?; + let mut it = DBIterator::new_cf(db, handle, iter_opt.into()); + it.seek(start_key.into()); + while it.valid() { + wb.delete_cf(handle, it.key())?; + if wb.data_size() >= MAX_DELETE_BATCH_SIZE { + // Can't use write_without_wal here. + // Otherwise it may cause dirty data when applying snapshot. + db.write(&wb)?; + wb.clear(); + } + + if !it.next() { + break; + } + } + it.status()?; + } + + if wb.count() > 0 { + db.write(&wb)?; + } + + Ok(()) +} + +pub fn delete_all_files_in_range(db: &DB, start_key: &[u8], end_key: &[u8]) -> Result<()> { + if start_key >= end_key { + return Ok(()); + } + + for cf in db.cf_names() { + let handle = rocks::util::get_cf_handle(db, cf)?; + db.delete_files_in_range_cf(handle, start_key, end_key, false)?; + } + + Ok(()) +} + +pub fn get_range_properties_cf( + db: &DB, + cfname: &str, + start_key: &[u8], + end_key: &[u8], +) -> Result { + let cf = rocks::util::get_cf_handle(db, cfname)?; + let range = Range::new(start_key, end_key); + db.get_properties_of_tables_in_range(cf, &[range]) + .map_err(|e| e.into()) +} + +#[cfg(test)] +mod tests { + use tempfile::Builder; + + use crate::rocks; + use crate::rocks::util::{get_cf_handle, new_engine_opt, CFOptions}; + use crate::rocks::{ + DBOptions, RawCFOptions as ColumnFamilyOptions, RawSeekKey as SeekKey, RawWriteBatch, + Writable, DB, + }; + use crate::ALL_CFS; + + use super::*; + + fn check_data(db: &DB, cfs: &[&str], expected: &[(&[u8], &[u8])]) { + for cf in cfs { + let handle = get_cf_handle(db, cf).unwrap(); + let mut iter = db.iter_cf(handle); + iter.seek(SeekKey::Start); + for &(k, v) in expected { + assert_eq!(k, iter.key()); + assert_eq!(v, iter.value()); + iter.next(); + } + assert!(!iter.valid()); + } + } + + fn test_delete_all_in_range(use_delete_range: bool) { + let path = Builder::new() + .prefix("engine_delete_all_in_range") + .tempdir() + .unwrap(); + let path_str = path.path().to_str().unwrap(); + + let cfs_opts = ALL_CFS + .iter() + .map(|cf| CFOptions::new(cf, ColumnFamilyOptions::new())) + .collect(); + let db = new_engine_opt(path_str, DBOptions::new(), cfs_opts).unwrap(); + + let wb = RawWriteBatch::default(); + let ts: u8 = 12; + let keys: Vec<_> = vec![ + b"k1".to_vec(), + b"k2".to_vec(), + b"k3".to_vec(), + b"k4".to_vec(), + ] + .into_iter() + .map(|mut k| { + k.append(&mut vec![ts; 8]); + k + }) + .collect(); + + let mut kvs: Vec<(&[u8], &[u8])> = vec![]; + for (_, key) in keys.iter().enumerate() { + kvs.push((key.as_slice(), b"value")); + } + let kvs_left: Vec<(&[u8], &[u8])> = vec![(kvs[0].0, kvs[0].1), (kvs[3].0, kvs[3].1)]; + for &(k, v) in kvs.as_slice() { + for cf in ALL_CFS { + let handle = get_cf_handle(&db, cf).unwrap(); + wb.put_cf(handle, k, v).unwrap(); + } + } + db.write(&wb).unwrap(); + check_data(&db, ALL_CFS, kvs.as_slice()); + + // Delete all in ["k2", "k4"). + let start = b"k2"; + let end = b"k4"; + delete_all_in_range(&db, start, end, use_delete_range).unwrap(); + check_data(&db, ALL_CFS, kvs_left.as_slice()); + } + + #[test] + fn test_delete_all_in_range_use_delete_range() { + test_delete_all_in_range(true); + } + + #[test] + fn test_delete_all_in_range_not_use_delete_range() { + test_delete_all_in_range(false); + } + + #[test] + fn test_delete_all_files_in_range() { + let path = Builder::new() + .prefix("engine_delete_all_files_in_range") + .tempdir() + .unwrap(); + let path_str = path.path().to_str().unwrap(); + + let cfs_opts = ALL_CFS + .iter() + .map(|cf| { + let mut cf_opts = ColumnFamilyOptions::new(); + cf_opts.set_level_zero_file_num_compaction_trigger(1); + CFOptions::new(cf, cf_opts) + }) + .collect(); + let db = new_engine_opt(path_str, DBOptions::new(), cfs_opts).unwrap(); + + let keys = vec![b"k1", b"k2", b"k3", b"k4"]; + + let mut kvs: Vec<(&[u8], &[u8])> = vec![]; + for key in keys { + kvs.push((key, b"value")); + } + let kvs_left: Vec<(&[u8], &[u8])> = vec![(kvs[0].0, kvs[0].1), (kvs[3].0, kvs[3].1)]; + for cf in ALL_CFS { + let handle = get_cf_handle(&db, cf).unwrap(); + for &(k, v) in kvs.as_slice() { + db.put_cf(handle, k, v).unwrap(); + db.flush_cf(handle, true).unwrap(); + } + } + check_data(&db, ALL_CFS, kvs.as_slice()); + + delete_all_files_in_range(&db, b"k2", b"k4").unwrap(); + check_data(&db, ALL_CFS, kvs_left.as_slice()); + } + + #[test] + fn test_delete_range_prefix_bloom_case() { + let path = Builder::new() + .prefix("engine_delete_range_prefix_bloom") + .tempdir() + .unwrap(); + let path_str = path.path().to_str().unwrap(); + + let mut opts = DBOptions::new(); + opts.create_if_missing(true); + + let mut cf_opts = ColumnFamilyOptions::new(); + // Prefix extractor(trim the timestamp at tail) for write cf. + cf_opts + .set_prefix_extractor( + "FixedSuffixSliceTransform", + Box::new(rocks::util::FixedSuffixSliceTransform::new(8)), + ) + .unwrap_or_else(|err| panic!("{:?}", err)); + // Create prefix bloom filter for memtable. + cf_opts.set_memtable_prefix_bloom_size_ratio(0.1 as f64); + let cf = "default"; + let db = DB::open_cf(opts, path_str, vec![(cf, cf_opts)]).unwrap(); + let wb = RawWriteBatch::default(); + let kvs: Vec<(&[u8], &[u8])> = vec![ + (b"kabcdefg1", b"v1"), + (b"kabcdefg2", b"v2"), + (b"kabcdefg3", b"v3"), + (b"kabcdefg4", b"v4"), + ]; + let kvs_left: Vec<(&[u8], &[u8])> = vec![(b"kabcdefg1", b"v1"), (b"kabcdefg4", b"v4")]; + + for &(k, v) in kvs.as_slice() { + let handle = get_cf_handle(&db, cf).unwrap(); + wb.put_cf(handle, k, v).unwrap(); + } + db.write(&wb).unwrap(); + check_data(&db, &[cf], kvs.as_slice()); + + // Delete all in ["k2", "k4"). + delete_all_in_range(&db, b"kabcdefg2", b"kabcdefg4", true).unwrap(); + check_data(&db, &[cf], kvs_left.as_slice()); + } +} From 112aaf3899a7ee70b466f9836fe10ffe8ce7498c Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 11:46:43 +0800 Subject: [PATCH 2/9] remove util Signed-off-by: 5kbpers --- Cargo.lock | 26 +++ components/engine_traits/src/lib.rs | 2 - components/engine_traits/src/util.rs | 301 --------------------------- 3 files changed, 26 insertions(+), 303 deletions(-) delete mode 100644 components/engine_traits/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 6e6d3050532..f51013fd079 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,6 +682,31 @@ dependencies = [ "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "engine_traits" +version = "0.0.1" +dependencies = [ + "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "kvproto 0.0.2 (git+https://github.com/pingcap/kvproto.git)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "raft 0.6.0-alpha (registry+https://github.com/rust-lang/crates.io-index)", + "rocksdb 0.3.0 (git+https://github.com/pingcap/rust-rocksdb.git)", + "serde 1.0.71 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-global 0.1.0 (git+https://github.com/breeswish/slog-global.git?rev=91904ade)", + "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tikv_alloc 0.1.0", + "tikv_util 0.1.0", + "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", + "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "enum-primitive-derive" version = "0.1.2" @@ -2835,6 +2860,7 @@ dependencies = [ "crossbeam 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "engine 0.0.1", + "engine_traits 0.0.1", "fail 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "farmhash 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 073b85417d9..96ff66597a4 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -17,8 +17,6 @@ extern crate serde_derive; #[allow(unused_extern_crates)] extern crate tikv_alloc; -pub mod util; - mod errors; pub use crate::errors::*; mod peekable; diff --git a/components/engine_traits/src/util.rs b/components/engine_traits/src/util.rs deleted file mode 100644 index 10b9f4e2d24..00000000000 --- a/components/engine_traits/src/util.rs +++ /dev/null @@ -1,301 +0,0 @@ -// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - -use std::u64; - -use crate::rocks; -use crate::rocks::util::get_cf_handle; -use crate::rocks::{DBIterator, Range, RawWriteBatch, TablePropertiesCollection, Writable, DB}; -use crate::CF_LOCK; - -use super::{Error, Result}; -use super::{IterOptions, MAX_DELETE_BATCH_SIZE}; -use tikv_util::keybuilder::KeyBuilder; - -pub fn put_msg_cf( - db: &DB, - wb: &RawWriteBatch, - cf: &str, - key: &[u8], - m: &M, -) -> Result<()> { - let handle = get_cf_handle(db, cf)?; - wb.put_cf(handle, key, &m.write_to_bytes()?)?; - Ok(()) -} - -/// Check if key in range [`start_key`, `end_key`). -pub fn check_key_in_range( - key: &[u8], - region_id: u64, - start_key: &[u8], - end_key: &[u8], -) -> Result<()> { - if key >= start_key && (end_key.is_empty() || key < end_key) { - Ok(()) - } else { - Err(Error::NotInRange( - key.to_vec(), - region_id, - start_key.to_vec(), - end_key.to_vec(), - )) - } -} - -pub fn delete_all_in_range( - db: &DB, - start_key: &[u8], - end_key: &[u8], - use_delete_range: bool, -) -> Result<()> { - if start_key >= end_key { - return Ok(()); - } - - for cf in db.cf_names() { - delete_all_in_range_cf(db, cf, start_key, end_key, use_delete_range)?; - } - - Ok(()) -} - -pub fn delete_all_in_range_cf( - db: &DB, - cf: &str, - start_key: &[u8], - end_key: &[u8], - use_delete_range: bool, -) -> Result<()> { - let handle = rocks::util::get_cf_handle(db, cf)?; - let wb = RawWriteBatch::default(); - if use_delete_range && cf != CF_LOCK { - wb.delete_range_cf(handle, start_key, end_key)?; - } else { - let start = KeyBuilder::from_slice(start_key, 0, 0); - let end = KeyBuilder::from_slice(end_key, 0, 0); - let mut iter_opt = IterOptions::new(Some(start), Some(end), false); - if db.is_titan() { - // Cause DeleteFilesInRange may expose old blob index keys, setting key only for Titan - // to avoid referring to missing blob files. - iter_opt.key_only(true); - } - let handle = rocks::util::get_cf_handle(db, cf)?; - let mut it = DBIterator::new_cf(db, handle, iter_opt.into()); - it.seek(start_key.into()); - while it.valid() { - wb.delete_cf(handle, it.key())?; - if wb.data_size() >= MAX_DELETE_BATCH_SIZE { - // Can't use write_without_wal here. - // Otherwise it may cause dirty data when applying snapshot. - db.write(&wb)?; - wb.clear(); - } - - if !it.next() { - break; - } - } - it.status()?; - } - - if wb.count() > 0 { - db.write(&wb)?; - } - - Ok(()) -} - -pub fn delete_all_files_in_range(db: &DB, start_key: &[u8], end_key: &[u8]) -> Result<()> { - if start_key >= end_key { - return Ok(()); - } - - for cf in db.cf_names() { - let handle = rocks::util::get_cf_handle(db, cf)?; - db.delete_files_in_range_cf(handle, start_key, end_key, false)?; - } - - Ok(()) -} - -pub fn get_range_properties_cf( - db: &DB, - cfname: &str, - start_key: &[u8], - end_key: &[u8], -) -> Result { - let cf = rocks::util::get_cf_handle(db, cfname)?; - let range = Range::new(start_key, end_key); - db.get_properties_of_tables_in_range(cf, &[range]) - .map_err(|e| e.into()) -} - -#[cfg(test)] -mod tests { - use tempfile::Builder; - - use crate::rocks; - use crate::rocks::util::{get_cf_handle, new_engine_opt, CFOptions}; - use crate::rocks::{ - DBOptions, RawCFOptions as ColumnFamilyOptions, RawSeekKey as SeekKey, RawWriteBatch, - Writable, DB, - }; - use crate::ALL_CFS; - - use super::*; - - fn check_data(db: &DB, cfs: &[&str], expected: &[(&[u8], &[u8])]) { - for cf in cfs { - let handle = get_cf_handle(db, cf).unwrap(); - let mut iter = db.iter_cf(handle); - iter.seek(SeekKey::Start); - for &(k, v) in expected { - assert_eq!(k, iter.key()); - assert_eq!(v, iter.value()); - iter.next(); - } - assert!(!iter.valid()); - } - } - - fn test_delete_all_in_range(use_delete_range: bool) { - let path = Builder::new() - .prefix("engine_delete_all_in_range") - .tempdir() - .unwrap(); - let path_str = path.path().to_str().unwrap(); - - let cfs_opts = ALL_CFS - .iter() - .map(|cf| CFOptions::new(cf, ColumnFamilyOptions::new())) - .collect(); - let db = new_engine_opt(path_str, DBOptions::new(), cfs_opts).unwrap(); - - let wb = RawWriteBatch::default(); - let ts: u8 = 12; - let keys: Vec<_> = vec![ - b"k1".to_vec(), - b"k2".to_vec(), - b"k3".to_vec(), - b"k4".to_vec(), - ] - .into_iter() - .map(|mut k| { - k.append(&mut vec![ts; 8]); - k - }) - .collect(); - - let mut kvs: Vec<(&[u8], &[u8])> = vec![]; - for (_, key) in keys.iter().enumerate() { - kvs.push((key.as_slice(), b"value")); - } - let kvs_left: Vec<(&[u8], &[u8])> = vec![(kvs[0].0, kvs[0].1), (kvs[3].0, kvs[3].1)]; - for &(k, v) in kvs.as_slice() { - for cf in ALL_CFS { - let handle = get_cf_handle(&db, cf).unwrap(); - wb.put_cf(handle, k, v).unwrap(); - } - } - db.write(&wb).unwrap(); - check_data(&db, ALL_CFS, kvs.as_slice()); - - // Delete all in ["k2", "k4"). - let start = b"k2"; - let end = b"k4"; - delete_all_in_range(&db, start, end, use_delete_range).unwrap(); - check_data(&db, ALL_CFS, kvs_left.as_slice()); - } - - #[test] - fn test_delete_all_in_range_use_delete_range() { - test_delete_all_in_range(true); - } - - #[test] - fn test_delete_all_in_range_not_use_delete_range() { - test_delete_all_in_range(false); - } - - #[test] - fn test_delete_all_files_in_range() { - let path = Builder::new() - .prefix("engine_delete_all_files_in_range") - .tempdir() - .unwrap(); - let path_str = path.path().to_str().unwrap(); - - let cfs_opts = ALL_CFS - .iter() - .map(|cf| { - let mut cf_opts = ColumnFamilyOptions::new(); - cf_opts.set_level_zero_file_num_compaction_trigger(1); - CFOptions::new(cf, cf_opts) - }) - .collect(); - let db = new_engine_opt(path_str, DBOptions::new(), cfs_opts).unwrap(); - - let keys = vec![b"k1", b"k2", b"k3", b"k4"]; - - let mut kvs: Vec<(&[u8], &[u8])> = vec![]; - for key in keys { - kvs.push((key, b"value")); - } - let kvs_left: Vec<(&[u8], &[u8])> = vec![(kvs[0].0, kvs[0].1), (kvs[3].0, kvs[3].1)]; - for cf in ALL_CFS { - let handle = get_cf_handle(&db, cf).unwrap(); - for &(k, v) in kvs.as_slice() { - db.put_cf(handle, k, v).unwrap(); - db.flush_cf(handle, true).unwrap(); - } - } - check_data(&db, ALL_CFS, kvs.as_slice()); - - delete_all_files_in_range(&db, b"k2", b"k4").unwrap(); - check_data(&db, ALL_CFS, kvs_left.as_slice()); - } - - #[test] - fn test_delete_range_prefix_bloom_case() { - let path = Builder::new() - .prefix("engine_delete_range_prefix_bloom") - .tempdir() - .unwrap(); - let path_str = path.path().to_str().unwrap(); - - let mut opts = DBOptions::new(); - opts.create_if_missing(true); - - let mut cf_opts = ColumnFamilyOptions::new(); - // Prefix extractor(trim the timestamp at tail) for write cf. - cf_opts - .set_prefix_extractor( - "FixedSuffixSliceTransform", - Box::new(rocks::util::FixedSuffixSliceTransform::new(8)), - ) - .unwrap_or_else(|err| panic!("{:?}", err)); - // Create prefix bloom filter for memtable. - cf_opts.set_memtable_prefix_bloom_size_ratio(0.1 as f64); - let cf = "default"; - let db = DB::open_cf(opts, path_str, vec![(cf, cf_opts)]).unwrap(); - let wb = RawWriteBatch::default(); - let kvs: Vec<(&[u8], &[u8])> = vec![ - (b"kabcdefg1", b"v1"), - (b"kabcdefg2", b"v2"), - (b"kabcdefg3", b"v3"), - (b"kabcdefg4", b"v4"), - ]; - let kvs_left: Vec<(&[u8], &[u8])> = vec![(b"kabcdefg1", b"v1"), (b"kabcdefg4", b"v4")]; - - for &(k, v) in kvs.as_slice() { - let handle = get_cf_handle(&db, cf).unwrap(); - wb.put_cf(handle, k, v).unwrap(); - } - db.write(&wb).unwrap(); - check_data(&db, &[cf], kvs.as_slice()); - - // Delete all in ["k2", "k4"). - delete_all_in_range(&db, b"kabcdefg2", b"kabcdefg4", true).unwrap(); - check_data(&db, &[cf], kvs_left.as_slice()); - } -} From 77f44a4c74d34a91a91303bac6a82f34488e4214 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 11:50:47 +0800 Subject: [PATCH 3/9] fix typo & derive Clone for KeyBuilder Signed-off-by: 5kbpers --- components/engine_traits/src/lib.rs | 16 ++++++++-------- components/tikv_util/src/keybuilder.rs | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 96ff66597a4..220973dd9d4 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -2,10 +2,10 @@ #![recursion_limit = "200"] -#[macro_use(slog_error, slog_warn)] -extern crate slog; -#[macro_use] -extern crate slog_global; +//#[macro_use(slog_error, slog_warn)] +//extern crate slog; +//#[macro_use] +//extern crate slog_global; #[macro_use] extern crate prometheus; #[macro_use] @@ -93,11 +93,11 @@ impl Engines { } } - pub fn write_kv(&self, wb: &RawWriteBatch) -> Result<()> { + pub fn write_kv(&self, wb: &WriteBatch) -> Result<()> { self.kv.write(wb).map_err(Error::Engine) } - pub fn write_kv_opt(&self, wb: &RawWriteBatch, opts: &WriteOptions) -> Result<()> { + pub fn write_kv_opt(&self, wb: &WriteBatch, opts: &WriteOptions) -> Result<()> { self.kv.write_opt(wb, &opts.into()).map_err(Error::Engine) } @@ -105,11 +105,11 @@ impl Engines { self.kv.sync_wal().map_err(Error::Engine) } - pub fn write_raft(&self, wb: &RawWriteBatch) -> Result<()> { + pub fn write_raft(&self, wb: &WriteBatch) -> Result<()> { self.raft.write(wb).map_err(Error::Engine) } - pub fn write_raft_opt(&self, wb: &RawWriteBatch, opts: &WriteOptions) -> Result<()> { + pub fn write_raft_opt(&self, wb: &WriteBatch, opts: &WriteOptions) -> Result<()> { self.raft.write_opt(wb, &opts.into()).map_err(Error::Engine) } diff --git a/components/tikv_util/src/keybuilder.rs b/components/tikv_util/src/keybuilder.rs index b0026e75752..3d089561724 100644 --- a/components/tikv_util/src/keybuilder.rs +++ b/components/tikv_util/src/keybuilder.rs @@ -2,6 +2,7 @@ use std::ptr; +#[derive(Clone)] pub struct KeyBuilder { buf: Vec, start: usize, From a1b024aad239e7037ebab748a4e258131e05e3c2 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 11:54:21 +0800 Subject: [PATCH 4/9] remove type Engines & unused import Signed-off-by: 5kbpers --- components/engine_traits/src/lib.rs | 49 ----------------------------- 1 file changed, 49 deletions(-) diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 220973dd9d4..3599ae51f67 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -2,18 +2,8 @@ #![recursion_limit = "200"] -//#[macro_use(slog_error, slog_warn)] -//extern crate slog; -//#[macro_use] -//extern crate slog_global; -#[macro_use] -extern crate prometheus; -#[macro_use] -extern crate lazy_static; #[macro_use] extern crate quick_error; -#[macro_use] -extern crate serde_derive; #[allow(unused_extern_crates)] extern crate tikv_alloc; @@ -31,9 +21,6 @@ mod engine; pub use crate::engine::*; mod options; pub use crate::options::*; -use engine_rocksdb::{WriteBatch, DB}; - -use std::sync::Arc; pub const DATA_KEY_PREFIX_LEN: usize = 1; @@ -48,8 +35,6 @@ pub struct KvEngines { pub shared_block_cache: bool, } -pub type Engines = KvEngines>; - impl KvEngines { pub fn new(kv_engine: E, raft_engine: E, shared_block_cache: bool) -> Self { KvEngines { @@ -83,37 +68,3 @@ impl KvEngines { self.raft.sync() } } - -impl Engines { - pub fn new(kv_engine: Arc, raft_engine: Arc, shared_block_cache: bool) -> Self { - KvEngines { - kv: kv_engine, - raft: raft_engine, - shared_block_cache, - } - } - - pub fn write_kv(&self, wb: &WriteBatch) -> Result<()> { - self.kv.write(wb).map_err(Error::Engine) - } - - pub fn write_kv_opt(&self, wb: &WriteBatch, opts: &WriteOptions) -> Result<()> { - self.kv.write_opt(wb, &opts.into()).map_err(Error::Engine) - } - - pub fn sync_kv(&self) -> Result<()> { - self.kv.sync_wal().map_err(Error::Engine) - } - - pub fn write_raft(&self, wb: &WriteBatch) -> Result<()> { - self.raft.write(wb).map_err(Error::Engine) - } - - pub fn write_raft_opt(&self, wb: &WriteBatch, opts: &WriteOptions) -> Result<()> { - self.raft.write_opt(wb, &opts.into()).map_err(Error::Engine) - } - - pub fn sync_raft(&self) -> Result<()> { - self.raft.sync_wal().map_err(Error::Engine) - } -} From ad673f893f09afab7372c967f466f033de7c2205 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 11:59:45 +0800 Subject: [PATCH 5/9] remove rocksdb dependency Signed-off-by: 5kbpers --- Cargo.lock | 1 - components/engine_traits/Cargo.toml | 9 --------- components/engine_traits/src/options.rs | 2 +- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f51013fd079..91acc85672a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -694,7 +694,6 @@ dependencies = [ "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "raft 0.6.0-alpha (registry+https://github.com/rust-lang/crates.io-index)", - "rocksdb 0.3.0 (git+https://github.com/pingcap/rust-rocksdb.git)", "serde 1.0.71 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/engine_traits/Cargo.toml b/components/engine_traits/Cargo.toml index 601a3717865..60b2f7913f2 100644 --- a/components/engine_traits/Cargo.toml +++ b/components/engine_traits/Cargo.toml @@ -4,11 +4,6 @@ version = "0.0.1" edition = "2018" publish = false -[features] -jemalloc = ["engine_rocksdb/jemalloc"] -portable = ["engine_rocksdb/portable"] -sse = ["engine_rocksdb/sse"] - [dependencies] kvproto = { git = "https://github.com/pingcap/kvproto.git" } protobuf = "2" @@ -32,9 +27,5 @@ version = "0.4.2" default-features = false features = ["nightly"] -[dependencies.engine_rocksdb] -git = "https://github.com/pingcap/rust-rocksdb.git" -package = "rocksdb" - [dev-dependencies] tempfile = "3.0" diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index 2c28aceef17..eb46760e951 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -165,7 +165,7 @@ impl Default for IterOptions { #[derive(Clone, Copy)] pub enum DeleteRangeType { - Normal, + WriteBatch, DeleteFiles, DeleteRange, } From 03340c180928bc4002267c6bd43fdfa2f3f45623 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 16:47:12 +0800 Subject: [PATCH 6/9] address comments Signed-off-by: 5kbpers --- Cargo.lock | 13 ------------- components/engine_traits/Cargo.toml | 19 ------------------- components/engine_traits/src/errors.rs | 22 ---------------------- components/engine_traits/src/lib.rs | 18 +++++++++--------- 4 files changed, 9 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91acc85672a..938a1157a06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -686,24 +686,11 @@ dependencies = [ name = "engine_traits" version = "0.0.1" dependencies = [ - "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "kvproto 0.0.2 (git+https://github.com/pingcap/kvproto.git)", - "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prometheus 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "raft 0.6.0-alpha (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.71 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", - "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", - "slog-global 0.1.0 (git+https://github.com/breeswish/slog-global.git?rev=91904ade)", - "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tempfile 3.0.8 (registry+https://github.com/rust-lang/crates.io-index)", "tikv_alloc 0.1.0", "tikv_util 0.1.0", - "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", - "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/components/engine_traits/Cargo.toml b/components/engine_traits/Cargo.toml index 60b2f7913f2..c9180432c97 100644 --- a/components/engine_traits/Cargo.toml +++ b/components/engine_traits/Cargo.toml @@ -5,27 +5,8 @@ edition = "2018" publish = false [dependencies] -kvproto = { git = "https://github.com/pingcap/kvproto.git" } protobuf = "2" -raft = "0.6.0-alpha" quick-error = "1.2.2" -crc = "1.8" -lazy_static = "1.3" -slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } -slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" } -time = "0.1" -sys-info = "0.5.7" tikv_alloc = { path = "../tikv_alloc", default-features = false } -serde = "1.0" -serde_derive = "1.0" -toml = "0.4" hex = "0.3" tikv_util = { path = "../tikv_util" } - -[dependencies.prometheus] -version = "0.4.2" -default-features = false -features = ["nightly"] - -[dev-dependencies] -tempfile = "3.0" diff --git a/components/engine_traits/src/errors.rs b/components/engine_traits/src/errors.rs index a4f246cae09..5ea4e948e65 100644 --- a/components/engine_traits/src/errors.rs +++ b/components/engine_traits/src/errors.rs @@ -42,25 +42,3 @@ quick_error! { } pub type Result = result::Result; - -impl From for raft::Error { - fn from(err: Error) -> raft::Error { - raft::Error::Store(raft::StorageError::Other(err.into())) - } -} - -impl From for kvproto::errorpb::Error { - fn from(err: Error) -> kvproto::errorpb::Error { - let mut errorpb = kvproto::errorpb::Error::default(); - errorpb.set_message(format!("{}", err)); - - if let Error::NotInRange(key, region_id, start_key, end_key) = err { - errorpb.mut_key_not_in_region().set_key(key); - errorpb.mut_key_not_in_region().set_region_id(region_id); - errorpb.mut_key_not_in_region().set_start_key(start_key); - errorpb.mut_key_not_in_region().set_end_key(end_key); - } - - errorpb - } -} diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 3599ae51f67..50a0a3d682d 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -29,14 +29,14 @@ pub const DATA_KEY_PREFIX_LEN: usize = 1; pub const MAX_DELETE_BATCH_SIZE: usize = 32 * 1024; #[derive(Clone, Debug)] -pub struct KvEngines { - pub kv: E, - pub raft: E, +pub struct KvEngines { + pub kv: K, + pub raft: R, pub shared_block_cache: bool, } -impl KvEngines { - pub fn new(kv_engine: E, raft_engine: E, shared_block_cache: bool) -> Self { +impl KvEngines { + pub fn new(kv_engine: K, raft_engine: R, shared_block_cache: bool) -> Self { KvEngines { kv: kv_engine, raft: raft_engine, @@ -44,11 +44,11 @@ impl KvEngines { } } - pub fn write_kv(&self, wb: &E::Batch) -> Result<()> { + pub fn write_kv(&self, wb: &K::Batch) -> Result<()> { self.kv.write(wb) } - pub fn write_kv_opt(&self, wb: &E::Batch, opts: &WriteOptions) -> Result<()> { + pub fn write_kv_opt(&self, wb: &K::Batch, opts: &WriteOptions) -> Result<()> { self.kv.write_opt(opts, wb) } @@ -56,11 +56,11 @@ impl KvEngines { self.kv.sync() } - pub fn write_raft(&self, wb: &E::Batch) -> Result<()> { + pub fn write_raft(&self, wb: &R::Batch) -> Result<()> { self.raft.write(wb) } - pub fn write_raft_opt(&self, wb: &E::Batch, opts: &WriteOptions) -> Result<()> { + pub fn write_raft_opt(&self, wb: &R::Batch, opts: &WriteOptions) -> Result<()> { self.raft.write_opt(opts, wb) } From bf74a50f8263edb2e43fcdcbe1f7ac7a83b6a6e2 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 11 Sep 2019 17:00:08 +0800 Subject: [PATCH 7/9] address comments Signed-off-by: 5kbpers --- components/engine_traits/src/engine.rs | 7 +------ components/engine_traits/src/options.rs | 21 --------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/components/engine_traits/src/engine.rs b/components/engine_traits/src/engine.rs index d952a92893d..c1a73231b31 100644 --- a/components/engine_traits/src/engine.rs +++ b/components/engine_traits/src/engine.rs @@ -53,13 +53,8 @@ pub trait KvEngine: Peekable + Mutable + Iterable + Send + Sync + Clone { start_key: &[u8], end_key: &[u8], ) -> Result<()>; - fn ingest_external_file_cf( - &self, - opts: &IngestExternalFileOptions, - cf: &str, - files: &[&str], - ) -> Result<()>; + fn ingest_external_file_cf(&self, cf: &str, files: &[&str]) -> Result<()>; fn validate_file_for_ingestion>( &self, cf: &str, diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index eb46760e951..458921e815e 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -169,24 +169,3 @@ pub enum DeleteRangeType { DeleteFiles, DeleteRange, } - -#[derive(Clone)] -pub struct IngestExternalFileOptions { - pub move_files: bool, -} - -impl IngestExternalFileOptions { - pub fn new() -> IngestExternalFileOptions { - IngestExternalFileOptions { move_files: false } - } - - pub fn move_files(&mut self, v: bool) { - self.move_files = v; - } -} - -impl Default for IngestExternalFileOptions { - fn default() -> IngestExternalFileOptions { - IngestExternalFileOptions { move_files: false } - } -} From 848483dbd0c1612dcc1d546b186b583bc3be7169 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Thu, 19 Sep 2019 11:38:50 +0800 Subject: [PATCH 8/9] address comments Signed-off-by: 5kbpers --- components/backup/src/endpoint.rs | 2 +- components/engine/src/errors.rs | 4 +-- components/engine/src/iterable.rs | 4 +-- components/engine_traits/src/errors.rs | 5 ++-- components/engine_traits/src/iterable.rs | 8 ++--- components/engine_traits/src/options.rs | 30 +++++++------------ components/engine_traits/src/peekable.rs | 2 -- .../raftstore/test_update_region_size.rs | 4 +-- 8 files changed, 23 insertions(+), 36 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 8d4e6800329..636acdf48bb 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -748,7 +748,7 @@ pub mod tests { let commit = alloc_ts(); must_commit(&engine, key.as_bytes(), start, commit); - // Test whether it can correctly convert not leader to regoin error. + // Test whether it can correctly convert not leader to region error. engine.trigger_not_leader(); let now = alloc_ts(); req.set_start_version(now); diff --git a/components/engine/src/errors.rs b/components/engine/src/errors.rs index da5c7b77a49..bc81621eb44 100644 --- a/components/engine/src/errors.rs +++ b/components/engine/src/errors.rs @@ -12,11 +12,11 @@ quick_error! { display("RocksDb {}", msg) } // FIXME: It should not know Region. - NotInRange( key: Vec, regoin_id: u64, start: Vec, end: Vec) { + NotInRange( key: Vec, region_id: u64, start: Vec, end: Vec) { description("Key is out of range") display( "Key {} is out of [region {}] [{}, {})", - hex::encode_upper(&key), regoin_id, hex::encode_upper(&start), hex::encode_upper(&end) + hex::encode_upper(&key), region_id, hex::encode_upper(&start), hex::encode_upper(&end) ) } Protobuf(err: protobuf::ProtobufError) { diff --git a/components/engine/src/iterable.rs b/components/engine/src/iterable.rs index 0b2b672c51c..af2e026e82e 100644 --- a/components/engine/src/iterable.rs +++ b/components/engine/src/iterable.rs @@ -174,14 +174,14 @@ pub trait Iterable { scan_impl(self.new_iterator_cf(cf, iter_opt)?, start_key, f) } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek(&self, key: &[u8]) -> Result, Vec)>> { let mut iter = self.new_iterator(IterOption::default()); iter.seek(key.into()); Ok(iter.kv()) } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek_cf(&self, cf: &str, key: &[u8]) -> Result, Vec)>> { let mut iter = self.new_iterator_cf(cf, IterOption::default())?; iter.seek(key.into()); diff --git a/components/engine_traits/src/errors.rs b/components/engine_traits/src/errors.rs index 5ea4e948e65..8f22b64b25f 100644 --- a/components/engine_traits/src/errors.rs +++ b/components/engine_traits/src/errors.rs @@ -12,11 +12,11 @@ quick_error! { display("Storage Engine {}", msg) } // FIXME: It should not know Region. - NotInRange( key: Vec, regoin_id: u64, start: Vec, end: Vec) { + NotInRange( key: Vec, region_id: u64, start: Vec, end: Vec) { description("Key is out of range") display( "Key {} is out of [region {}] [{}, {})", - hex::encode_upper(&key), regoin_id, hex::encode_upper(&start), hex::encode_upper(&end) + hex::encode_upper(&key), region_id, hex::encode_upper(&start), hex::encode_upper(&end) ) } Protobuf(err: protobuf::ProtobufError) { @@ -31,7 +31,6 @@ quick_error! { description(err.description()) display("Io {}", err) } - Other(err: Box) { from() cause(err.as_ref()) diff --git a/components/engine_traits/src/iterable.rs b/components/engine_traits/src/iterable.rs index 4ae79fa96a0..55bcdb151f6 100644 --- a/components/engine_traits/src/iterable.rs +++ b/components/engine_traits/src/iterable.rs @@ -72,7 +72,7 @@ pub trait Iterable { scan_impl(self.iterator_cf_opt(&iter_opt, cf)?, start_key, f) } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek(&self, key: &[u8]) -> Result, Vec)>> { let mut iter = self.iterator()?; iter.seek(SeekKey::Key(key)); @@ -83,7 +83,7 @@ pub trait Iterable { } } - // Seek the first key >= given key, if no found, return None. + // Seek the first key >= given key, if not found, return None. fn seek_cf(&self, cf: &str, key: &[u8]) -> Result, Vec)>> { let mut iter = self.iterator_cf(cf)?; iter.seek(SeekKey::Key(key)); @@ -95,14 +95,14 @@ pub trait Iterable { } } -fn scan_impl(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()> +fn scan_impl(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()> where + Iter: Iterator, F: FnMut(&[u8], &[u8]) -> Result, { it.seek(SeekKey::Key(start_key)); while it.valid() { let r = f(it.key()?, it.value()?)?; - if !r || !it.next() { break; } diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index 458921e815e..686d5a12354 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -20,7 +20,7 @@ impl Default for ReadOptions { #[derive(Clone)] pub struct WriteOptions { - pub sync: bool, + sync: bool, } impl WriteOptions { @@ -56,12 +56,12 @@ impl Default for CFOptions { #[derive(Clone)] pub struct IterOptions { - pub lower_bound: Option, - pub upper_bound: Option, - pub prefix_same_as_start: bool, - pub fill_cache: bool, - pub key_only: bool, - pub seek_mode: SeekMode, + lower_bound: Option, + upper_bound: Option, + prefix_same_as_start: bool, + fill_cache: bool, + key_only: bool, + seek_mode: SeekMode, } impl IterOptions { @@ -80,33 +80,27 @@ impl IterOptions { } } - #[inline] pub fn use_prefix_seek(mut self) -> IterOptions { self.seek_mode = SeekMode::Prefix; self } - #[inline] pub fn total_order_seek_used(&self) -> bool { self.seek_mode == SeekMode::TotalOrder } - #[inline] - pub fn fill_cache(&mut self, v: bool) { + pub fn set_fill_cache(&mut self, v: bool) { self.fill_cache = v; } - #[inline] - pub fn key_only(&mut self, v: bool) { + pub fn set_key_only(&mut self, v: bool) { self.key_only = v; } - #[inline] pub fn lower_bound(&self) -> Option<&[u8]> { self.lower_bound.as_ref().map(|v| v.as_slice()) } - #[inline] pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); self.lower_bound = Some(builder); @@ -122,12 +116,10 @@ impl IterOptions { } } - #[inline] pub fn upper_bound(&self) -> Option<&[u8]> { self.upper_bound.as_ref().map(|v| v.as_slice()) } - #[inline] pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); self.upper_bound = Some(builder); @@ -143,10 +135,8 @@ impl IterOptions { } } - #[inline] - pub fn set_prefix_same_as_start(mut self, enable: bool) -> IterOptions { + pub fn set_prefix_same_as_start(&mut self, enable: bool) { self.prefix_same_as_start = enable; - self } } diff --git a/components/engine_traits/src/peekable.rs b/components/engine_traits/src/peekable.rs index 6729fcec6e3..4fe3d6a2cef 100644 --- a/components/engine_traits/src/peekable.rs +++ b/components/engine_traits/src/peekable.rs @@ -16,7 +16,6 @@ pub trait Peekable { fn get_msg(&self, key: &[u8]) -> Result> { let value = self.get(key)?; - if value.is_none() { return Ok(None); } @@ -32,7 +31,6 @@ pub trait Peekable { key: &[u8], ) -> Result> { let value = self.get_cf(cf, key)?; - if value.is_none() { return Ok(None); } diff --git a/tests/integrations/raftstore/test_update_region_size.rs b/tests/integrations/raftstore/test_update_region_size.rs index 6154ee2db79..eae9be2c638 100644 --- a/tests/integrations/raftstore/test_update_region_size.rs +++ b/tests/integrations/raftstore/test_update_region_size.rs @@ -13,7 +13,7 @@ fn flush(cluster: &mut Cluster) { } } -fn test_update_regoin_size(cluster: &mut Cluster) { +fn test_update_region_size(cluster: &mut Cluster) { cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(50); cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(50); cluster.cfg.raft_store.region_split_check_diff = ReadableSize::kb(1); @@ -70,5 +70,5 @@ fn test_update_regoin_size(cluster: &mut Cluster) { fn test_server_update_region_size() { let count = 1; let mut cluster = new_server_cluster(0, count); - test_update_regoin_size(&mut cluster); + test_update_region_size(&mut cluster); } From 7c93374b665e5acdfc1c4ba19b2a68b0e68d8037 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Thu, 19 Sep 2019 11:57:00 +0800 Subject: [PATCH 9/9] add util.rs Signed-off-by: 5kbpers --- components/engine_traits/src/lib.rs | 1 + components/engine_traits/src/util.rs | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) create mode 100644 components/engine_traits/src/util.rs diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 50a0a3d682d..f10e76b7cb9 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -21,6 +21,7 @@ mod engine; pub use crate::engine::*; mod options; pub use crate::options::*; +mod util; pub const DATA_KEY_PREFIX_LEN: usize = 1; diff --git a/components/engine_traits/src/util.rs b/components/engine_traits/src/util.rs new file mode 100644 index 00000000000..d1da9142dd1 --- /dev/null +++ b/components/engine_traits/src/util.rs @@ -0,0 +1,23 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use super::{Error, Result}; + +/// Check if key in range [`start_key`, `end_key`). +#[allow(dead_code)] +pub fn check_key_in_range( + key: &[u8], + region_id: u64, + start_key: &[u8], + end_key: &[u8], +) -> Result<()> { + if key >= start_key && (end_key.is_empty() || key < end_key) { + Ok(()) + } else { + Err(Error::NotInRange( + key.to_vec(), + region_id, + start_key.to_vec(), + end_key.to_vec(), + )) + } +}