diff --git a/Cargo.lock b/Cargo.lock index e3973a438c1..567154f8150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,7 +426,7 @@ dependencies = [ "panic_hook", "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.2", - "static_assertions 1.0.0", + "static_assertions 1.1.0", "tikv_alloc", ] @@ -727,6 +727,7 @@ dependencies = [ name = "engine" version = "0.0.1" dependencies = [ + "engine_traits", "hex", "kvproto", "lazy_static", @@ -765,6 +766,7 @@ dependencies = [ "serde_derive", "slog", "slog-global", + "static_assertions 1.1.0", "sys-info", "tempfile", "tikv_alloc", @@ -2983,9 +2985,9 @@ checksum = "c19be23126415861cb3a23e501d34a708f7f9b2183c5252d690941c2e69199d5" [[package]] name = "static_assertions" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa13613355688665b68639b1c378a62dbedea78aff0fc59a4fa656cbbdec657" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "str_stack" diff --git a/components/engine/Cargo.toml b/components/engine/Cargo.toml index 48b05b2ddad..5887f335ae5 100644 --- a/components/engine/Cargo.toml +++ b/components/engine/Cargo.toml @@ -25,6 +25,7 @@ serde_derive = "1.0" toml = "0.4" hex = "0.3" tikv_util = { path = "../tikv_util" } +engine_traits = { path = "../engine_traits" } [dependencies.prometheus] git = "https://github.com/pingcap/rust-prometheus.git" diff --git a/components/engine/src/iterable.rs b/components/engine/src/iterable.rs index af2e026e82e..885eda33029 100644 --- a/components/engine/src/iterable.rs +++ b/components/engine/src/iterable.rs @@ -5,141 +5,36 @@ pub use crate::rocks::{DBIterator, ReadOptions, DB}; use crate::Result; use tikv_util::keybuilder::KeyBuilder; -#[derive(Clone, PartialEq)] -enum SeekMode { - TotalOrder, - Prefix, -} +pub use engine_traits::IterOptions as IterOption; +pub use engine_traits::SeekMode; -pub struct IterOption { - lower_bound: Option, - upper_bound: 
Option, - prefix_same_as_start: bool, - fill_cache: bool, - // only supported when Titan enabled, otherwise it doesn't take effect. - titan_key_only: bool, - seek_mode: SeekMode, +pub trait IterOptionsExt { + fn build_read_opts(self) -> ReadOptions; } -impl IterOption { - pub fn new( - lower_bound: Option, - upper_bound: Option, - fill_cache: bool, - ) -> IterOption { - IterOption { - lower_bound, - upper_bound, - prefix_same_as_start: false, - fill_cache, - titan_key_only: false, - seek_mode: SeekMode::TotalOrder, - } - } - - #[inline] - pub fn use_prefix_seek(mut self) -> IterOption { - self.seek_mode = SeekMode::Prefix; - self - } - - #[inline] - pub fn total_order_seek_used(&self) -> bool { - self.seek_mode == SeekMode::TotalOrder - } - - #[inline] - pub fn fill_cache(&mut self, v: bool) { - self.fill_cache = v; - } - - #[inline] - pub fn titan_key_only(&mut self, v: bool) { - self.titan_key_only = v; - } - - #[inline] - pub fn lower_bound(&self) -> Option<&[u8]> { - self.lower_bound.as_ref().map(|v| v.as_slice()) - } - - #[inline] - pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { - let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); - self.lower_bound = Some(builder); - } - - pub fn set_vec_lower_bound(&mut self, bound: Vec) { - self.lower_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); - } - - pub fn set_lower_bound_prefix(&mut self, prefix: &[u8]) { - if let Some(ref mut builder) = self.lower_bound { - builder.set_prefix(prefix); - } - } - - #[inline] - pub fn upper_bound(&self) -> Option<&[u8]> { - self.upper_bound.as_ref().map(|v| v.as_slice()) - } - - #[inline] - pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { - let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); - self.upper_bound = Some(builder); - } - - pub fn set_vec_upper_bound(&mut self, bound: Vec) { - self.upper_bound = Some(KeyBuilder::from_vec(bound, 0, 0)); - } - - pub fn set_upper_bound_prefix(&mut 
self, prefix: &[u8]) { - if let Some(ref mut builder) = self.upper_bound { - builder.set_prefix(prefix); - } - } - - #[inline] - pub fn set_prefix_same_as_start(mut self, enable: bool) -> IterOption { - self.prefix_same_as_start = enable; - self - } - - pub fn build_read_opts(self) -> ReadOptions { +impl IterOptionsExt for IterOption { + fn build_read_opts(self) -> ReadOptions { let mut opts = ReadOptions::new(); - opts.fill_cache(self.fill_cache); - if self.titan_key_only { + opts.fill_cache(self.fill_cache()); + if self.key_only() { opts.set_titan_key_only(true); } if self.total_order_seek_used() { opts.set_total_order_seek(true); - } else if self.prefix_same_as_start { + } else if self.prefix_same_as_start() { opts.set_prefix_same_as_start(true); } - if let Some(builder) = self.lower_bound { - opts.set_iterate_lower_bound(builder.build()); + let (lower, upper) = self.build_bounds(); + if let Some(lower) = lower { + opts.set_iterate_lower_bound(lower); } - if let Some(builder) = self.upper_bound { - opts.set_iterate_upper_bound(builder.build()); + if let Some(upper) = upper { + opts.set_iterate_upper_bound(upper); } opts } } -impl Default for IterOption { - fn default() -> IterOption { - IterOption { - lower_bound: None, - upper_bound: None, - prefix_same_as_start: false, - fill_cache: true, - titan_key_only: false, - seek_mode: SeekMode::TotalOrder, - } - } -} - // TODO: refactor this trait into rocksdb trait. 
pub trait Iterable { fn new_iterator(&self, iter_opt: IterOption) -> DBIterator<&DB>; diff --git a/components/engine/src/rocks/db.rs b/components/engine/src/rocks/db.rs index dd2c6b0f1c2..6fb53659579 100644 --- a/components/engine/src/rocks/db.rs +++ b/components/engine/src/rocks/db.rs @@ -3,6 +3,7 @@ use std::option::Option; use super::{util, DBIterator, DBVector, WriteBatch, DB}; +use crate::iterable::IterOptionsExt; use crate::{IterOption, Iterable, Mutable, Peekable, Result}; impl Peekable for DB { diff --git a/components/engine/src/rocks/snapshot.rs b/components/engine/src/rocks/snapshot.rs index 829f1fcfbb5..e525cd0c204 100644 --- a/components/engine/src/rocks/snapshot.rs +++ b/components/engine/src/rocks/snapshot.rs @@ -6,8 +6,10 @@ use std::option::Option; use std::sync::Arc; use super::{CFHandle, DBVector, ReadOptions, UnsafeSnap, DB}; +use crate::iterable::IterOptionsExt; use crate::{DBIterator, Error, IterOption, Iterable, Peekable, Result}; +#[repr(C)] // Guarantee same representation as in engine_rocks pub struct Snapshot { db: Arc, snap: UnsafeSnap, @@ -43,23 +45,6 @@ impl Snapshot { pub fn get_db(&self) -> Arc { Arc::clone(&self.db) } - - pub fn db_iterator(&self, iter_opt: IterOption) -> DBIterator> { - let mut opt = iter_opt.build_read_opts(); - unsafe { - opt.set_snapshot(&self.snap); - } - DBIterator::new(Arc::clone(&self.db), opt) - } - - pub fn db_iterator_cf(&self, cf: &str, iter_opt: IterOption) -> Result>> { - let handle = super::util::get_cf_handle(&self.db, cf)?; - let mut opt = iter_opt.build_read_opts(); - unsafe { - opt.set_snapshot(&self.snap); - } - Ok(DBIterator::new_cf(Arc::clone(&self.db), handle, opt)) - } } impl Debug for Snapshot { @@ -77,6 +62,7 @@ impl Drop for Snapshot { } #[derive(Debug, Clone)] +#[repr(transparent)] // Guarantee same representation as in engine_rocks pub struct SyncSnapshot(Arc); impl Deref for SyncSnapshot { diff --git a/components/engine/src/util.rs b/components/engine/src/util.rs index 
0bbe6ff5423..47e512e1b50 100644 --- a/components/engine/src/util.rs +++ b/components/engine/src/util.rs @@ -47,7 +47,7 @@ pub fn delete_all_in_range_cf( if db.is_titan() { // Cause DeleteFilesInRange may expose old blob index keys, setting key only for Titan // to avoid referring to missing blob files. - iter_opt.titan_key_only(true); + iter_opt.set_key_only(true); } let mut it = db.new_iterator_cf(cf, iter_opt)?; it.seek(start_key.into()); diff --git a/components/engine_rocks/Cargo.toml b/components/engine_rocks/Cargo.toml index fc2ba6d1eb1..0790e98ef54 100644 --- a/components/engine_rocks/Cargo.toml +++ b/components/engine_rocks/Cargo.toml @@ -17,6 +17,7 @@ quick-error = "1.2.2" lazy_static = "1.3" slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "0e23a5baff302a9d7bccd85f8f31e43339c2f2c1" } +static_assertions = "1.1.0" time = "0.1" sys-info = "0.5.7" tikv_alloc = { path = "../tikv_alloc" } diff --git a/components/engine_rocks/src/compat.rs b/components/engine_rocks/src/compat.rs new file mode 100644 index 00000000000..5001327534b --- /dev/null +++ b/components/engine_rocks/src/compat.rs @@ -0,0 +1,47 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::engine::RocksEngine; +use crate::snapshot::RocksSnapshot; +use crate::snapshot::RocksSyncSnapshot; +use engine::Snapshot as RawSnapshot; +use engine::SyncSnapshot as RawSyncSnapshot; +use engine::DB; +use std::sync::Arc; + +/// A trait to enter the world of engine traits from a raw `Arc` +/// with as little syntax as possible. +/// +/// This will be used during the transition from RocksDB to the +/// `KvEngine` abstraction and then discarded. 
+pub trait Compat { + type Other; + + fn c(&self) -> &Self::Other; +} + +impl Compat for Arc { + type Other = RocksEngine; + + #[inline] + fn c(&self) -> &RocksEngine { + RocksEngine::from_ref(self) + } +} + +impl Compat for RawSnapshot { + type Other = RocksSnapshot; + + #[inline] + fn c(&self) -> &RocksSnapshot { + RocksSnapshot::from_ref(self) + } +} + +impl Compat for RawSyncSnapshot { + type Other = RocksSyncSnapshot; + + #[inline] + fn c(&self) -> &RocksSyncSnapshot { + RocksSyncSnapshot::from_ref(self) + } +} diff --git a/components/engine_rocks/src/db_vector.rs b/components/engine_rocks/src/db_vector.rs new file mode 100644 index 00000000000..3a7fa4fa7ac --- /dev/null +++ b/components/engine_rocks/src/db_vector.rs @@ -0,0 +1,36 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use engine_traits::DBVector; +use rocksdb::DBVector as RawDBVector; +use std::fmt::{self, Debug, Formatter}; +use std::ops::Deref; + +pub struct RocksDBVector(RawDBVector); + +impl RocksDBVector { + pub fn from_raw(raw: RawDBVector) -> RocksDBVector { + RocksDBVector(raw) + } +} + +impl DBVector for RocksDBVector {} + +impl Deref for RocksDBVector { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.0 + } +} + +impl Debug for RocksDBVector { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "{:?}", &**self) + } +} + +impl<'a> PartialEq<&'a [u8]> for RocksDBVector { + fn eq(&self, rhs: &&[u8]) -> bool { + **rhs == **self + } +} diff --git a/components/engine_rocks/src/engine.rs b/components/engine_rocks/src/engine.rs index 16eb9b2c246..896d200b485 100644 --- a/components/engine_rocks/src/engine.rs +++ b/components/engine_rocks/src/engine.rs @@ -9,9 +9,10 @@ use engine_traits::{ }; use rocksdb::{DBIterator, Writable, DB}; +use crate::db_vector::RocksDBVector; use crate::options::{RocksReadOptions, RocksWriteOptions}; use crate::util::get_cf_handle; -use crate::{RocksEngineIterator, Snapshot}; +use 
crate::{RocksEngineIterator, RocksSnapshot}; #[derive(Clone, Debug)] #[repr(transparent)] @@ -48,7 +49,7 @@ impl RocksEngine { } impl KvEngine for RocksEngine { - type Snapshot = Snapshot; + type Snapshot = RocksSnapshot; type WriteBatch = crate::WriteBatch; fn write_opt(&self, opts: &WriteOptions, wb: &Self::WriteBatch) -> Result<()> { @@ -69,8 +70,8 @@ impl KvEngine for RocksEngine { Self::WriteBatch::new(Arc::clone(&self.0)) } - fn snapshot(&self) -> Snapshot { - Snapshot::new(self.0.clone()) + fn snapshot(&self) -> RocksSnapshot { + RocksSnapshot::new(self.0.clone()) } fn sync(&self) -> Result<()> { @@ -85,7 +86,7 @@ impl KvEngine for RocksEngine { impl Iterable for RocksEngine { type Iterator = RocksEngineIterator; - fn iterator_opt(&self, opts: &IterOptions) -> Result { + fn iterator_opt(&self, opts: IterOptions) -> Result { let opt: RocksReadOptions = opts.into(); Ok(RocksEngineIterator::from_raw(DBIterator::new( self.0.clone(), @@ -93,7 +94,7 @@ impl Iterable for RocksEngine { ))) } - fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result { + fn iterator_cf_opt(&self, cf: &str, opts: IterOptions) -> Result { let handle = get_cf_handle(&self.0, cf)?; let opt: RocksReadOptions = opts.into(); Ok(RocksEngineIterator::from_raw(DBIterator::new_cf( @@ -105,17 +106,24 @@ impl Iterable for RocksEngine { } impl Peekable for RocksEngine { - fn get_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>> { + type DBVector = RocksDBVector; + + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { let opt: RocksReadOptions = opts.into(); let v = self.0.get_opt(key, &opt.into_raw())?; - Ok(v.map(|v| v.to_vec())) + Ok(v.map(RocksDBVector::from_raw)) } - fn get_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> Result>> { + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result> { let opt: RocksReadOptions = opts.into(); let handle = get_cf_handle(&self.0, cf)?; let v = self.0.get_cf_opt(handle, key, 
&opt.into_raw())?; - Ok(v.map(|v| v.to_vec())) + Ok(v.map(RocksDBVector::from_raw)) } } @@ -147,7 +155,7 @@ mod tests { use std::sync::Arc; use tempfile::Builder; - use crate::{RocksEngine, Snapshot}; + use crate::{RocksEngine, RocksSnapshot}; #[test] fn test_base() { @@ -197,9 +205,9 @@ mod tests { engine.put(b"k1", b"v1").unwrap(); engine.put_cf(cf, b"k1", b"v2").unwrap(); - assert_eq!(&*engine.get(b"k1").unwrap().unwrap(), b"v1"); - assert!(engine.get_cf("foo", b"k1").is_err()); - assert_eq!(&*engine.get_cf(cf, b"k1").unwrap().unwrap(), b"v2"); + assert_eq!(&*engine.get_value(b"k1").unwrap().unwrap(), b"v1"); + assert!(engine.get_value_cf("foo", b"k1").is_err()); + assert_eq!(&*engine.get_value_cf(cf, b"k1").unwrap().unwrap(), b"v2"); } #[test] @@ -264,7 +272,7 @@ mod tests { assert_eq!(data.len(), 1); - let snap = Snapshot::new(engine.get_sync_db()); + let snap = RocksSnapshot::new(engine.get_sync_db()); engine.put(b"a3", b"v3").unwrap(); assert!(engine.seek(b"a3").unwrap().is_some()); diff --git a/components/engine_rocks/src/engine_iterator.rs b/components/engine_rocks/src/engine_iterator.rs index d61b5835b7d..cce2a16297a 100644 --- a/components/engine_rocks/src/engine_iterator.rs +++ b/components/engine_rocks/src/engine_iterator.rs @@ -34,12 +34,12 @@ impl engine_traits::Iterator for RocksEngineIterator { self.0.next() } - fn key(&self) -> Result<&[u8]> { - Ok(self.0.key()) + fn key(&self) -> &[u8] { + self.0.key() } - fn value(&self) -> Result<&[u8]> { - Ok(self.0.value()) + fn value(&self) -> &[u8] { + self.0.value() } fn valid(&self) -> bool { diff --git a/components/engine_rocks/src/import.rs b/components/engine_rocks/src/import.rs index 95206fb2e8e..e784a7259da 100644 --- a/components/engine_rocks/src/import.rs +++ b/components/engine_rocks/src/import.rs @@ -188,7 +188,10 @@ mod tests { fn check_db_with_kvs(db: &RocksEngine, cf: &str, kvs: &[(&str, &str)]) { for &(k, v) in kvs { - assert_eq!(db.get_cf(cf, k.as_bytes()).unwrap().unwrap(), v.as_bytes()); + 
assert_eq!( + db.get_value_cf(cf, k.as_bytes()).unwrap().unwrap(), + v.as_bytes() + ); } } diff --git a/components/engine_rocks/src/lib.rs b/components/engine_rocks/src/lib.rs index 5ab542f9a18..839dde46068 100644 --- a/components/engine_rocks/src/lib.rs +++ b/components/engine_rocks/src/lib.rs @@ -25,6 +25,8 @@ mod cf_options; pub use crate::cf_options::*; mod db_options; pub use crate::db_options::*; +mod db_vector; +pub use crate::db_vector::*; mod engine; pub use crate::engine::*; mod import; @@ -43,3 +45,6 @@ pub use crate::engine_iterator::*; mod options; pub mod util; + +mod compat; +pub use compat::*; diff --git a/components/engine_rocks/src/options.rs b/components/engine_rocks/src/options.rs index 6f1cc51da82..ffc6c8ce265 100644 --- a/components/engine_rocks/src/options.rs +++ b/components/engine_rocks/src/options.rs @@ -1,5 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use engine::IterOptionsExt; use rocksdb::{ReadOptions as RawReadOptions, WriteOptions as RawWriteOptions}; pub struct RocksReadOptions(RawReadOptions); @@ -46,28 +47,7 @@ impl From<&engine_traits::WriteOptions> for RocksWriteOptions { impl From for RocksReadOptions { fn from(opts: engine_traits::IterOptions) -> Self { - let mut r = RawReadOptions::default(); - r.fill_cache(opts.fill_cache()); - if opts.key_only() { - r.set_titan_key_only(true); - } - if opts.total_order_seek_used() { - r.set_total_order_seek(true); - } else if opts.prefix_same_as_start() { - r.set_prefix_same_as_start(true); - } - if let Some(builder) = opts.lower_bound { - r.set_iterate_lower_bound(builder.build()); - } - if let Some(builder) = opts.upper_bound { - r.set_iterate_upper_bound(builder.build()); - } + let r = opts.build_read_opts(); RocksReadOptions(r) } } - -impl From<&engine_traits::IterOptions> for RocksReadOptions { - fn from(opts: &engine_traits::IterOptions) -> Self { - opts.clone().into() - } -} diff --git a/components/engine_rocks/src/snapshot.rs 
b/components/engine_rocks/src/snapshot.rs index d8e20d07b24..da1e8fc4fc6 100644 --- a/components/engine_rocks/src/snapshot.rs +++ b/components/engine_rocks/src/snapshot.rs @@ -4,35 +4,46 @@ use std::fmt::{self, Debug, Formatter}; use std::ops::Deref; use std::sync::Arc; -use engine_traits::{self, IterOptions, Iterable, Peekable, ReadOptions, Result}; +use engine::rocks::Snapshot as RawSnapshot; +use engine::rocks::SyncSnapshot as RawSyncSnapshot; +use engine_traits::{self, IterOptions, Iterable, Peekable, ReadOptions, Result, Snapshot}; use rocksdb::rocksdb_options::UnsafeSnap; use rocksdb::{DBIterator, DB}; +use crate::db_vector::RocksDBVector; use crate::options::RocksReadOptions; use crate::util::get_cf_handle; use crate::RocksEngineIterator; -pub struct Snapshot { +#[repr(C)] // Guarantee same representation as in engine/rocks +pub struct RocksSnapshot { // TODO: use &DB. db: Arc, snap: UnsafeSnap, } -unsafe impl Send for Snapshot {} -unsafe impl Sync for Snapshot {} +static_assertions::assert_eq_size!(RocksSnapshot, RawSnapshot); +static_assertions::assert_eq_align!(RocksSnapshot, RawSnapshot); -impl Snapshot { +unsafe impl Send for RocksSnapshot {} +unsafe impl Sync for RocksSnapshot {} + +impl RocksSnapshot { pub fn new(db: Arc) -> Self { unsafe { - Snapshot { + RocksSnapshot { snap: db.unsafe_snap(), db, } } } - pub fn into_sync(self) -> SyncSnapshot { - SyncSnapshot(Arc::new(self)) + pub fn from_ref(raw: &RawSnapshot) -> &RocksSnapshot { + unsafe { &*(raw as *const _ as *const _) } + } + + pub fn into_sync(self) -> RocksSyncSnapshot { + RocksSyncSnapshot(Arc::new(self)) } pub fn get_db(&self) -> &DB { @@ -40,19 +51,19 @@ impl Snapshot { } } -impl engine_traits::Snapshot for Snapshot { +impl Snapshot for RocksSnapshot { fn cf_names(&self) -> Vec<&str> { self.db.cf_names() } } -impl Debug for Snapshot { +impl Debug for RocksSnapshot { fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { write!(fmt, "Engine Snapshot Impl") } } -impl Drop for Snapshot { 
+impl Drop for RocksSnapshot { fn drop(&mut self) { unsafe { self.db.release_snap(&self.snap); @@ -60,10 +71,10 @@ impl Drop for Snapshot { } } -impl Iterable for Snapshot { +impl Iterable for RocksSnapshot { type Iterator = RocksEngineIterator; - fn iterator_opt(&self, opts: &IterOptions) -> Result { + fn iterator_opt(&self, opts: IterOptions) -> Result { let opt: RocksReadOptions = opts.into(); let mut opt = opt.into_raw(); unsafe { @@ -75,7 +86,7 @@ impl Iterable for Snapshot { ))) } - fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result { + fn iterator_cf_opt(&self, cf: &str, opts: IterOptions) -> Result { let opt: RocksReadOptions = opts.into(); let mut opt = opt.into_raw(); unsafe { @@ -90,18 +101,25 @@ impl Iterable for Snapshot { } } -impl Peekable for Snapshot { - fn get_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>> { +impl Peekable for RocksSnapshot { + type DBVector = RocksDBVector; + + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { let opt: RocksReadOptions = opts.into(); let mut opt = opt.into_raw(); unsafe { opt.set_snapshot(&self.snap); } let v = self.db.get_opt(key, &opt)?; - Ok(v.map(|v| v.to_vec())) + Ok(v.map(RocksDBVector::from_raw)) } - fn get_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> Result>> { + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result> { let opt: RocksReadOptions = opts.into(); let mut opt = opt.into_raw(); unsafe { @@ -109,23 +127,28 @@ impl Peekable for Snapshot { } let handle = get_cf_handle(self.db.as_ref(), cf)?; let v = self.db.get_cf_opt(handle, key, &opt)?; - Ok(v.map(|v| v.to_vec())) + Ok(v.map(RocksDBVector::from_raw)) } } #[derive(Clone, Debug)] -pub struct SyncSnapshot(Arc); +#[repr(transparent)] // Guarantee same representation as in engine/rocks +pub struct RocksSyncSnapshot(Arc); -impl Deref for SyncSnapshot { - type Target = Snapshot; +impl Deref for RocksSyncSnapshot { + type Target = RocksSnapshot; - fn 
deref(&self) -> &Snapshot { + fn deref(&self) -> &RocksSnapshot { &self.0 } } -impl SyncSnapshot { - pub fn new(db: Arc) -> SyncSnapshot { - SyncSnapshot(Arc::new(Snapshot::new(db))) +impl RocksSyncSnapshot { + pub fn new(db: Arc) -> RocksSyncSnapshot { + RocksSyncSnapshot(Arc::new(RocksSnapshot::new(db))) + } + + pub fn from_ref(raw: &RawSyncSnapshot) -> &RocksSyncSnapshot { + unsafe { &*(raw as *const _ as *const _) } } } diff --git a/components/engine_rocks/src/sst.rs b/components/engine_rocks/src/sst.rs index bd97fbdfe02..bc02569bda1 100644 --- a/components/engine_rocks/src/sst.rs +++ b/components/engine_rocks/src/sst.rs @@ -54,7 +54,7 @@ impl SstReader for RocksSstReader { impl Iterable for RocksSstReader { type Iterator = RocksSstIterator; - fn iterator_opt(&self, opts: &IterOptions) -> Result { + fn iterator_opt(&self, opts: IterOptions) -> Result { let opt: RocksReadOptions = opts.into(); let opt = opt.into_raw(); Ok(RocksSstIterator(SstFileReader::iter_opt_rc( @@ -63,7 +63,7 @@ impl Iterable for RocksSstReader { ))) } - fn iterator_cf_opt(&self, _opts: &IterOptions, _cf: &str) -> Result { + fn iterator_cf_opt(&self, _cf: &str, _opts: IterOptions) -> Result { unimplemented!() // FIXME: What should happen here? } } @@ -90,12 +90,12 @@ impl Iterator for RocksSstIterator { self.0.next() } - fn key(&self) -> Result<&[u8]> { - Ok(self.0.key()) + fn key(&self) -> &[u8] { + self.0.key() } - fn value(&self) -> Result<&[u8]> { - Ok(self.0.value()) + fn value(&self) -> &[u8] { + self.0.value() } fn valid(&self) -> bool { diff --git a/components/engine_traits/src/db_vector.rs b/components/engine_traits/src/db_vector.rs new file mode 100644 index 00000000000..ec06dae4b7d --- /dev/null +++ b/components/engine_traits/src/db_vector.rs @@ -0,0 +1,10 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::fmt::Debug; +use std::ops::Deref; + +/// A type that holds buffers queried from the database. 
+/// +/// The database may optimize this type to be a view into +/// its own cache. +pub trait DBVector: Debug + Deref + for<'a> PartialEq<&'a [u8]> {} diff --git a/components/engine_traits/src/iterable.rs b/components/engine_traits/src/iterable.rs index 287633d919f..788ab9cb568 100644 --- a/components/engine_traits/src/iterable.rs +++ b/components/engine_traits/src/iterable.rs @@ -4,12 +4,6 @@ use tikv_util::keybuilder::KeyBuilder; use crate::*; -#[derive(Clone, PartialEq)] -pub enum SeekMode { - TotalOrder, - Prefix, -} - pub enum SeekKey<'a> { Start, End, @@ -23,13 +17,13 @@ pub trait Iterator { fn prev(&mut self) -> bool; fn next(&mut self) -> bool; - fn key(&self) -> Result<&[u8]>; - fn value(&self) -> Result<&[u8]>; + fn key(&self) -> &[u8]; + fn value(&self) -> &[u8]; fn kv(&self) -> Option<(Vec, Vec)> { if self.valid() { - let k = self.key().ok()?; - let v = self.value().ok()?; + let k = self.key(); + let v = self.value(); Some((k.to_vec(), v.to_vec())) } else { None @@ -50,15 +44,15 @@ pub trait Iterator { pub trait Iterable { type Iterator: Iterator; - fn iterator_opt(&self, opts: &IterOptions) -> Result; - fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result; + fn iterator_opt(&self, opts: IterOptions) -> Result; + fn iterator_cf_opt(&self, cf: &str, opts: IterOptions) -> Result; fn iterator(&self) -> Result { - self.iterator_opt(&IterOptions::default()) + self.iterator_opt(IterOptions::default()) } fn iterator_cf(&self, cf: &str) -> Result { - self.iterator_cf_opt(&IterOptions::default(), cf) + self.iterator_cf_opt(cf, IterOptions::default()) } fn scan(&self, start_key: &[u8], end_key: &[u8], fill_cache: bool, f: F) -> Result<()> @@ -68,7 +62,7 @@ pub trait Iterable { let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); - scan_impl(self.iterator_opt(&iter_opt)?, start_key, f) + 
scan_impl(self.iterator_opt(iter_opt)?, start_key, f) } // like `scan`, only on a specific column family. @@ -86,7 +80,7 @@ pub trait Iterable { let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0); let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0); let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache); - scan_impl(self.iterator_cf_opt(&iter_opt, cf)?, start_key, f) + scan_impl(self.iterator_cf_opt(cf, iter_opt)?, start_key, f) } // Seek the first key >= given key, if not found, return None. @@ -94,7 +88,7 @@ pub trait Iterable { let mut iter = self.iterator()?; iter.seek(SeekKey::Key(key)); if iter.valid() { - Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + Ok(Some((iter.key().to_vec(), iter.value().to_vec()))) } else { Ok(None) } @@ -105,7 +99,7 @@ pub trait Iterable { let mut iter = self.iterator_cf(cf)?; iter.seek(SeekKey::Key(key)); if iter.valid() { - Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec()))) + Ok(Some((iter.key().to_vec(), iter.value().to_vec()))) } else { Ok(None) } @@ -119,7 +113,7 @@ where { it.seek(SeekKey::Key(start_key)); while it.valid() { - let r = f(it.key()?, it.value()?)?; + let r = f(it.key(), it.value())?; if !r || !it.next() { break; } @@ -143,3 +137,9 @@ impl<'a, I: Iterator> std::iter::Iterator for StdIterator<'a, I> { kv } } + +impl<'a> From<&'a [u8]> for SeekKey<'a> { + fn from(bs: &'a [u8]) -> SeekKey { + SeekKey::Key(bs) + } +} diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index a5e1b8b4231..16dbf84c704 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -2,12 +2,71 @@ //! A generic TiKV storage engine //! -//! This is a work-in-progress attempt to abstract all the features -//! needed by TiKV to persist its data. +//! This is a work-in-progress attempt to abstract all the features needed by +//! TiKV to persist its data. //! -//! 
This crate must not have any transitive dependencies on RocksDB. +//! This crate must not have any transitive dependencies on RocksDB. The RocksDB +//! implementation is in the `engine_rocks` crate. //! -//! # Notes +//! This documentation contains a description of the porting process, current +//! design decisions and design guidelines, and refactoring tips. +//! +//! +//! # The porting process +//! +//! These are some guidelines that seem to make the porting manageable. As the +//! process continues new strategies are discovered and written here. This is a +//! big refactoring and will take many months. +//! +//! Refactoring is a craft, not a science, and figuring out how to overcome any +//! particular situation takes experience and intuition, but these principles +//! can help. +//! +//! A guiding principle is to do one thing at a time. In particular, don't +//! redesign while encapsulating. +//! +//! The port is happening in stages: +//! +//! 1) Migrating the `engine` crate +//! 2) Eliminating the `rocksdb` dep from TiKV +//! 3) "Pulling up" the generic abstractions through TiKV +//! +//! These stages are described in more detail: +//! +//! ## 1) Migrating the `engine` crate +//! +//! Migrating the `engine` crate. The engine crate was an earlier attempt to +//! abstract the storage engine. Much of its structure is duplicated +//! near-identically in engine_traits, the difference being that engine_traits +//! has no RocksDB dependencies. Having no RocksDB dependencies makes it trivial +//! to guarantee that the abstractions are truly abstract. +//! +//! During this stage, we will eliminate the `engine` crate to reduce code +//! duplication. We do this by identifying a small subsystem within `engine`, +//! duplicating it within `engine_traits` and `engine_rocks`, deleting the code +//! from `engine`, and fixing all the callers to work with the abstracted +//! implementation. +//! +//! At the end of this stage the `engine` dependency will be deleted, but +//! 
TiKV will still depend on the concrete RocksDB implementations from +//! `engine_rocks`, as well as the raw APIs from the `rocksdb` crate. +//! +//! ## 2) Eliminating the `rocksdb` dep from TiKV +//! +//! TiKV uses RocksDB via both the `rocksdb` crate and the `engine` crate. +//! During this stage we need to convert all callers to use the `engine_rocks` +//! crate instead. +//! +//! ## 3) "Pulling up" the generic abstractions through TiKV +//! +//! Finally, with all of TiKV using the `engine_traits` traits in conjunction +//! with the concrete `engine_rocks` types, we can push generic type parameters +//! up through the application. Then we will remove the concrete `engine_rocks` +//! dependency from TiKV so that it is impossible to re-introduce +//! engine-specific code again. +//! +//! +//! # Design notes //! //! - `KvEngine` is the main engine trait. It requires many other traits, which //! have many other associated types that implement yet more traits. @@ -17,13 +76,39 @@ //! a trait, and an "extension" trait that associates that type with `KvEngine`, //! which is part of `KvEngine's trait requirements. //! -//! - For now, for simplicity, all extension traits are required +//! - For now, for simplicity, all extension traits are required by `KvEngine`. +//! In the future it may be feasible to separate them for engines with +//! different feature sets. //! //! - Associated types generally have the same name as the trait they -//! are required to implement. +//! are required to implement. Engine extensions generally have the same +//! name suffixed with `Ext`. Concrete implementations usually have the +//! same name prefixed with the database name, i.e. `Rocks`. +//! +//! Example: +//! +//! ``` +//! // in engine_traits +//! +//! trait IOLimiterExt { +//! type IOLimiter: IOLimiter; +//! } +//! +//! trait IOLimiter { } +//! ``` +//! +//! ``` +//! // in engine_rocks +//! +//! impl IOLimiterExt for RocksEngine { +//! type IOLimiter = RocksIOLimiter; +//! } +//! +//! 
impl IOLimiter for RocksIOLimiter { } +//! ``` //! //! - All engines use the same error type, defined in this crate. Thus -//! engine-specefic type information is boxed and hidden. +//! engine-specific type information is boxed and hidden. //! //! - `KvEngine` is a factory type for some of its associated types, but not //! others. For now, use factory methods when RocksDB would require factory @@ -32,13 +117,30 @@ //! use a standard new method). If future engines require factory methods, the //! traits can be converted then. //! -//! # Porting suggestions +//! - Types that require a handle to the engine (or some other "parent" type) +//! do so with either Rc or Arc. An example is EngineIterator. The reason +//! for this is that associated types cannot contain lifetimes. That requires +//! "generic associated types". See +//! +//! - https://github.com/rust-lang/rfcs/pull/1598 +//! - https://github.com/rust-lang/rust/issues/44265 +//! +//! +//! # Refactoring tips //! //! - Port modules with the fewest RocksDB dependencies at a time, modifying //! those modules's callers to convert to and from the engine traits as //! needed. Move in and out of the engine_traits world with the //! `RocksDB::from_ref` and `RocksDB::as_inner` methods. //! +//! - Don't follow the type system too far "down the rabbit hole". When you see +//! that another subsystem is blocking you from refactoring the system you +//! are trying to refactor, stop, stash your changes, and focus on the other +//! system instead. +//! +//! - You will throw away branches that lead to dead ends. Learn from the +//! experience and try again from a different angle. +//! //! - For now, use the same APIs as the RocksDB bindings, as methods //! on the various engine traits, and with this crate's error type. //! @@ -49,6 +151,19 @@ //! - Port methods directly from the existing `engine` crate by re-implementing //! it in engine_traits and engine_rocks, replacing all the callers with calls //!
into the traits, then delete the versions in the `engine` crate. +//! +//! - Use the .c() method from engine_rocks::compat::Compat to get a +//! KvEngine reference from Arc in the fewest characters. It also +//! works on Snapshot, and can be adapted to other types. +//! +//! - Use tikv::into_other::IntoOther to adapt between error types of dependencies +//! that are not themselves interdependent. E.g. raft::Error can be created +//! from engine_traits::Error even though neither `raft` nor `engine_traits` +//! know about each other. +//! +//! - "Plain old data" types in `engine` can be moved directly into +//! `engine_traits` and reexported from `engine` to ease the transition. +//! Likewise `engine_rocks` can temporarily call code from inside `engine`. #![recursion_limit = "200"] @@ -69,6 +184,8 @@ mod cf_options; pub use crate::cf_options::*; mod db_options; pub use crate::db_options::*; +mod db_vector; +pub use crate::db_vector::*; mod engine; pub use crate::engine::*; mod import; diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index 0fce9469186..76ebdae6e68 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -1,8 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use tikv_util::keybuilder::KeyBuilder; -use crate::SeekMode; - #[derive(Clone)] pub struct ReadOptions {} @@ -43,12 +41,18 @@ impl Default for WriteOptions { } } -#[derive(Clone)] +#[derive(Clone, PartialEq)] +pub enum SeekMode { + TotalOrder, + Prefix, +} + pub struct IterOptions { - pub lower_bound: Option, - pub upper_bound: Option, + lower_bound: Option, + upper_bound: Option, prefix_same_as_start: bool, fill_cache: bool, + // only supported when Titan enabled, otherwise it doesn't take effect.
key_only: bool, seek_mode: SeekMode, } @@ -69,35 +73,43 @@ impl IterOptions { } } + #[inline] pub fn use_prefix_seek(mut self) -> IterOptions { self.seek_mode = SeekMode::Prefix; self } + #[inline] pub fn total_order_seek_used(&self) -> bool { self.seek_mode == SeekMode::TotalOrder } - pub fn set_fill_cache(&mut self, v: bool) { - self.fill_cache = v; - } - + #[inline] pub fn fill_cache(&self) -> bool { self.fill_cache } - pub fn set_key_only(&mut self, v: bool) { - self.key_only = v; + #[inline] + pub fn set_fill_cache(&mut self, v: bool) { + self.fill_cache = v; } + #[inline] pub fn key_only(&self) -> bool { self.key_only } + #[inline] + pub fn set_key_only(&mut self, v: bool) { + self.key_only = v; + } + + #[inline] pub fn lower_bound(&self) -> Option<&[u8]> { self.lower_bound.as_ref().map(|v| v.as_slice()) } + #[inline] pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); self.lower_bound = Some(builder); @@ -113,10 +125,12 @@ impl IterOptions { } } + #[inline] pub fn upper_bound(&self) -> Option<&[u8]> { self.upper_bound.as_ref().map(|v| v.as_slice()) } + #[inline] pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) { let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0); self.upper_bound = Some(builder); @@ -132,13 +146,23 @@ impl IterOptions { } } - pub fn set_prefix_same_as_start(&mut self, enable: bool) { - self.prefix_same_as_start = enable; + #[inline] + pub fn build_bounds(self) -> (Option>, Option>) { + let lower = self.lower_bound.map(KeyBuilder::build); + let upper = self.upper_bound.map(KeyBuilder::build); + (lower, upper) } + #[inline] pub fn prefix_same_as_start(&self) -> bool { self.prefix_same_as_start } + + #[inline] + pub fn set_prefix_same_as_start(mut self, enable: bool) -> IterOptions { + self.prefix_same_as_start = enable; + self + } } impl Default for IterOptions { @@ -147,7 +171,7 @@ impl Default for 
IterOptions { lower_bound: None, upper_bound: None, prefix_same_as_start: false, - fill_cache: false, + fill_cache: true, key_only: false, seek_mode: SeekMode::TotalOrder, } diff --git a/components/engine_traits/src/peekable.rs b/components/engine_traits/src/peekable.rs index 4fe3d6a2cef..ea2d378c9fd 100644 --- a/components/engine_traits/src/peekable.rs +++ b/components/engine_traits/src/peekable.rs @@ -3,19 +3,26 @@ use crate::*; pub trait Peekable { - fn get_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>>; - fn get_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> Result>>; + type DBVector: DBVector; - fn get(&self, key: &[u8]) -> Result>> { - self.get_opt(&ReadOptions::default(), key) + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>; + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result>; + + fn get_value(&self, key: &[u8]) -> Result> { + self.get_value_opt(&ReadOptions::default(), key) } - fn get_cf(&self, cf: &str, key: &[u8]) -> Result>> { - self.get_cf_opt(&ReadOptions::default(), cf, key) + fn get_value_cf(&self, cf: &str, key: &[u8]) -> Result> { + self.get_value_cf_opt(&ReadOptions::default(), cf, key) } fn get_msg(&self, key: &[u8]) -> Result> { - let value = self.get(key)?; + let value = self.get_value(key)?; if value.is_none() { return Ok(None); } @@ -30,7 +37,7 @@ pub trait Peekable { cf: &str, key: &[u8], ) -> Result> { - let value = self.get_cf(cf, key)?; + let value = self.get_value_cf(cf, key)?; if value.is_none() { return Ok(None); } diff --git a/components/engine_traits/src/snapshot.rs b/components/engine_traits/src/snapshot.rs index 174e903abd5..6026cb52bb5 100644 --- a/components/engine_traits/src/snapshot.rs +++ b/components/engine_traits/src/snapshot.rs @@ -1,8 +1,9 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
+use crate::iterable::Iterable; use crate::peekable::Peekable; use std::fmt::Debug; -pub trait Snapshot: 'static + Peekable + Send + Sync + Debug { +pub trait Snapshot: 'static + Peekable + Iterable + Send + Sync + Debug { fn cf_names(&self) -> Vec<&str>; } diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 3963210b371..700ff25a324 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -188,7 +188,7 @@ impl SSTImporter { // the SST is empty, so no need to iterate at all (should be impossible?) return Ok(Some(meta.get_range().clone())); } - let start_key = keys::origin_key(iter.key()?); + let start_key = keys::origin_key(iter.key()); if is_before_start_bound(start_key, &range_start) { // SST's start is before the range to consume, so needs to iterate to skip over return Ok(None); @@ -197,7 +197,7 @@ impl SSTImporter { // seek to end and fetch the last (inclusive) key of the SST. 
iter.seek(SeekKey::End); - let last_key = keys::origin_key(iter.key()?); + let last_key = keys::origin_key(iter.key()); if is_after_end_bound(last_key, &range_end) { // SST's end is after the range to consume return Ok(None); @@ -228,7 +228,7 @@ impl SSTImporter { Bound::Excluded(_) => unreachable!(), }; while iter.valid() { - let old_key = keys::origin_key(iter.key()?); + let old_key = keys::origin_key(iter.key()); if is_after_end_bound(old_key, &range_end) { break; } @@ -242,7 +242,7 @@ impl SSTImporter { key.truncate(new_prefix_data_key_len); key.extend_from_slice(&old_key[old_prefix.len()..]); - sst_writer.put(&key, iter.value()?)?; + sst_writer.put(&key, iter.value())?; iter.next(); if first_key.is_none() { first_key = Some(keys::origin_key(&key).to_vec()); diff --git a/components/test_sst_importer/src/lib.rs b/components/test_sst_importer/src/lib.rs index 7b329321edd..8a76a8a75f3 100644 --- a/components/test_sst_importer/src/lib.rs +++ b/components/test_sst_importer/src/lib.rs @@ -46,7 +46,7 @@ where { for i in range.0..range.1 { let k = keys::data_key(&[i]); - assert_eq!(db.get(&k).unwrap().unwrap(), &[i]); + assert_eq!(db.get_value(&k).unwrap().unwrap(), &[i]); } } diff --git a/src/into_other.rs b/src/into_other.rs new file mode 100644 index 00000000000..8359ec5443f --- /dev/null +++ b/src/into_other.rs @@ -0,0 +1,35 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +// Various conversions between types that can't have their own +// interdependencies. These are used to convert between engine_traits::Error and +// other error types that engine_traits can't depend on.
+ +use engine_traits::Error as EngineTraitsError; +use kvproto::errorpb::Error as ProtoError; +use raft::Error as RaftError; + +pub trait IntoOther { + fn into_other(self) -> O; +} + +impl IntoOther for EngineTraitsError { + fn into_other(self) -> ProtoError { + let mut errorpb = ProtoError::default(); + errorpb.set_message(format!("{}", self)); + + if let EngineTraitsError::NotInRange(key, region_id, start_key, end_key) = self { + errorpb.mut_key_not_in_region().set_key(key); + errorpb.mut_key_not_in_region().set_region_id(region_id); + errorpb.mut_key_not_in_region().set_start_key(start_key); + errorpb.mut_key_not_in_region().set_end_key(end_key); + } + + errorpb + } +} + +impl IntoOther for EngineTraitsError { + fn into_other(self) -> RaftError { + RaftError::Store(raft::StorageError::Other(self.into())) + } +} diff --git a/src/lib.rs b/src/lib.rs index 1e807273ca2..9db86da7f97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ extern crate test; pub mod config; pub mod coprocessor; pub mod import; +pub mod into_other; pub mod raftstore; pub mod server; pub mod storage; diff --git a/src/raftstore/store/region_snapshot.rs b/src/raftstore/store/region_snapshot.rs index bb5a9b4656a..4c41c994085 100644 --- a/src/raftstore/store/region_snapshot.rs +++ b/src/raftstore/store/region_snapshot.rs @@ -1,10 +1,12 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. 
-use engine::rocks::{DBIterator, DBVector, SeekKey, TablePropertiesCollection, DB}; +use engine::rocks::{DBVector, TablePropertiesCollection, DB}; use engine::{ self, Error as EngineError, IterOption, Peekable, Result as EngineResult, Snapshot, SyncSnapshot, }; +use engine_rocks::{Compat, RocksEngineIterator}; +use engine_traits::SeekKey; use kvproto::metapb::Region; use std::sync::Arc; @@ -12,6 +14,7 @@ use crate::raftstore::store::keys::DATA_PREFIX_KEY; use crate::raftstore::store::{keys, util, PeerStorage}; use crate::raftstore::Result; use engine_traits::util::check_key_in_range; +use engine_traits::{Iterable, Iterator}; use tikv_util::keybuilder::KeyBuilder; use tikv_util::metrics::CRITICAL_ERROR; use tikv_util::{panic_when_unexpected_key_or_data, set_panic_mark}; @@ -200,7 +203,7 @@ impl Peekable for RegionSnapshot { /// iterate in the region. It behaves as if underlying /// db only contains one region. pub struct RegionIterator { - iter: DBIterator>, + iter: RocksEngineIterator, valid: bool, region: Arc, start_key: Vec, @@ -238,7 +241,10 @@ impl RegionIterator { update_upper_bound(&mut iter_opt, ®ion); let start_key = iter_opt.lower_bound().unwrap().to_vec(); let end_key = iter_opt.upper_bound().unwrap().to_vec(); - let iter = snap.db_iterator(iter_opt); + let iter = snap + .c() + .iterator_opt(iter_opt) + .expect("creating snapshot iterator"); // FIXME error handling RegionIterator { iter, valid: false, @@ -258,7 +264,10 @@ impl RegionIterator { update_upper_bound(&mut iter_opt, ®ion); let start_key = iter_opt.lower_bound().unwrap().to_vec(); let end_key = iter_opt.upper_bound().unwrap().to_vec(); - let iter = snap.db_iterator_cf(cf, iter_opt).unwrap(); + let iter = snap + .c() + .iterator_cf_opt(cf, iter_opt) + .expect("creating snapshot iterator"); // FIXME error handling RegionIterator { iter, valid: false, @@ -362,10 +371,7 @@ impl RegionIterator { #[inline] pub fn status(&self) -> Result<()> { - self.iter - .status() - .map_err(|e| 
EngineError::RocksDb(e)) - .map_err(From::from) + self.iter.status().map_err(From::from) } #[inline] diff --git a/src/server/debug.rs b/src/server/debug.rs index b4906a457ff..2fc4a96371c 100644 --- a/src/server/debug.rs +++ b/src/server/debug.rs @@ -13,6 +13,7 @@ use engine::rocks::{ CompactOptions, DBBottommostLevelCompaction, DBIterator as RocksIterator, Kv, ReadOptions, SeekKey, Writable, WriteBatch, WriteOptions, DB, }; +use engine::IterOptionsExt; use engine::{self, Engines, IterOption, Iterable, Mutable, Peekable}; use engine::{CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use kvproto::debugpb::{self, Db as DBType, Module}; diff --git a/src/storage/kv/mod.rs b/src/storage/kv/mod.rs index 1aaf3aa9328..a3f6d8af841 100644 --- a/src/storage/kv/mod.rs +++ b/src/storage/kv/mod.rs @@ -11,6 +11,7 @@ use engine::{CfName, CF_DEFAULT}; use kvproto::errorpb::Error as ErrorHeader; use kvproto::kvrpcpb::Context; +use crate::into_other::IntoOther; use crate::raftstore::coprocessor::SeekRegionCallback; use crate::storage::{Key, Value}; @@ -186,6 +187,12 @@ impl From for ErrorInner { } } +impl From for ErrorInner { + fn from(err: engine_traits::Error) -> ErrorInner { + ErrorInner::Request(err.into_other()) + } +} + impl ErrorInner { pub fn maybe_clone(&self) -> Option { match *self { diff --git a/src/storage/kv/rocksdb_engine.rs b/src/storage/kv/rocksdb_engine.rs index 1a8309b6fbb..e0cf593376d 100644 --- a/src/storage/kv/rocksdb_engine.rs +++ b/src/storage/kv/rocksdb_engine.rs @@ -9,11 +9,15 @@ use std::time::Duration; use engine::rocks; use engine::rocks::util::CFOptions; -use engine::rocks::{ColumnFamilyOptions, DBIterator, SeekKey, Writable, WriteBatch, DB}; +use engine::rocks::{ + ColumnFamilyOptions, DBIterator, SeekKey as DBSeekKey, Writable, WriteBatch, DB, +}; use engine::Engines; use engine::Error as EngineError; use engine::{CfName, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use engine::{IterOption, Peekable}; +use engine_rocks::{Compat, RocksEngineIterator}; +use 
engine_traits::{Iterable, Iterator, SeekKey}; use kvproto::kvrpcpb::Context; use tempfile::{Builder, TempDir}; @@ -291,7 +295,7 @@ impl Engine for RocksEngine { } impl Snapshot for RocksSnapshot { - type Iter = DBIterator>; + type Iter = RocksEngineIterator; fn get(&self, key: &Key) -> Result> { trace!("RocksSnapshot: get"; "key" => %key); @@ -307,7 +311,7 @@ impl Snapshot for RocksSnapshot { fn iter(&self, iter_opt: IterOption, mode: ScanMode) -> Result> { trace!("RocksSnapshot: create iterator"); - let iter = self.db_iterator(iter_opt); + let iter = self.c().iterator_opt(iter_opt)?; Ok(Cursor::new(iter, mode)) } @@ -318,11 +322,56 @@ impl Snapshot for RocksSnapshot { mode: ScanMode, ) -> Result> { trace!("RocksSnapshot: create cf iterator"); - let iter = self.db_iterator_cf(cf, iter_opt)?; + let iter = self.c().iterator_cf_opt(cf, iter_opt)?; Ok(Cursor::new(iter, mode)) } } +impl EngineIterator for RocksEngineIterator { + fn next(&mut self) -> bool { + Iterator::next(self) + } + + fn prev(&mut self) -> bool { + Iterator::prev(self) + } + + fn seek(&mut self, key: &Key) -> Result { + Ok(Iterator::seek(self, key.as_encoded().as_slice().into())) + } + + fn seek_for_prev(&mut self, key: &Key) -> Result { + Ok(Iterator::seek_for_prev( + self, + key.as_encoded().as_slice().into(), + )) + } + + fn seek_to_first(&mut self) -> bool { + Iterator::seek(self, SeekKey::Start) + } + + fn seek_to_last(&mut self) -> bool { + Iterator::seek(self, SeekKey::End) + } + + fn valid(&self) -> bool { + Iterator::valid(self) + } + + fn status(&self) -> Result<()> { + Iterator::status(self).map_err(From::from) + } + + fn key(&self) -> &[u8] { + Iterator::key(self) + } + + fn value(&self) -> &[u8] { + Iterator::value(self) + } +} + impl + Send> EngineIterator for DBIterator { fn next(&mut self) -> bool { DBIterator::next(self) @@ -344,11 +393,11 @@ impl + Send> EngineIterator for DBIterator { } fn seek_to_first(&mut self) -> bool { - DBIterator::seek(self, SeekKey::Start) + 
DBIterator::seek(self, DBSeekKey::Start) } fn seek_to_last(&mut self) -> bool { - DBIterator::seek(self, SeekKey::End) + DBIterator::seek(self, DBSeekKey::End) } fn valid(&self) -> bool { @@ -358,7 +407,7 @@ impl + Send> EngineIterator for DBIterator { fn status(&self) -> Result<()> { DBIterator::status(self) .map_err(|e| EngineError::RocksDb(e)) - .map_err(From::from) + .map_err(Error::from) } fn key(&self) -> &[u8] {