Skip to content

Commit

Permalink
add engine_traits component
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Sep 11, 2019
1 parent d2e3b6a commit a296cf8
Show file tree
Hide file tree
Showing 11 changed files with 994 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ engine = { path = "components/engine" }
tidb_query = { path = "components/tidb_query" }
pd_client = { path = "components/pd_client" }
keys = { path = "components/keys" }
engine_traits = { path = "components/engine_traits" }

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
Expand Down
40 changes: 40 additions & 0 deletions components/engine_traits/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "engine_traits"
version = "0.0.1"
edition = "2018"
publish = false

[features]
jemalloc = ["engine_rocksdb/jemalloc"]
portable = ["engine_rocksdb/portable"]
sse = ["engine_rocksdb/sse"]

[dependencies]
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
protobuf = "2"
raft = "0.6.0-alpha"
quick-error = "1.2.2"
crc = "1.8"
lazy_static = "1.3"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" }
time = "0.1"
sys-info = "0.5.7"
tikv_alloc = { path = "../tikv_alloc", default-features = false }
serde = "1.0"
serde_derive = "1.0"
toml = "0.4"
hex = "0.3"
tikv_util = { path = "../tikv_util" }

[dependencies.prometheus]
version = "0.4.2"
default-features = false
features = ["nightly"]

[dependencies.engine_rocksdb]
git = "https://github.com/pingcap/rust-rocksdb.git"
package = "rocksdb"

[dev-dependencies]
tempfile = "3.0"
11 changes: 11 additions & 0 deletions components/engine_traits/src/cf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

pub type CfName = &'static str;
pub const CF_DEFAULT: CfName = "default";
pub const CF_LOCK: CfName = "lock";
pub const CF_WRITE: CfName = "write";
pub const CF_RAFT: CfName = "raft";
// Cfs that should be very large generally.
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE];
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];
70 changes: 70 additions & 0 deletions components/engine_traits/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::Debug;
use std::path::Path;

use crate::*;

pub trait Snapshot: 'static + Peekable + Send + Sync + Debug {
fn cf_names(&self) -> Vec<&str>;
}

pub trait WriteBatch: Mutable {
fn data_size(&self) -> usize;
fn count(&self) -> usize;
fn is_empty(&self) -> bool;
fn clear(&self);

fn set_save_point(&mut self);
fn pop_save_point(&mut self) -> Result<()>;
fn rollback_to_save_point(&mut self) -> Result<()>;
}

pub trait KvEngine: Peekable + Mutable + Iterable + Send + Sync + Clone {
type Snap: Snapshot;
type Batch: WriteBatch;

fn write_opt(&self, opts: &WriteOptions, wb: &Self::Batch) -> Result<()>;
fn write(&self, wb: &Self::Batch) -> Result<()> {
self.write_opt(&WriteOptions::default(), wb)
}
fn write_batch(&self, cap: usize) -> Self::Batch;
fn snapshot(&self) -> Self::Snap;
fn sync(&self) -> Result<()>;
fn cf_names(&self) -> Vec<&str>;
fn delete_all_in_range(
&self,
t: DeleteRangeType,
start_key: &[u8],
end_key: &[u8],
) -> Result<()> {
if start_key >= end_key {
return Ok(());
}
for cf in self.cf_names() {
self.delete_all_in_range_cf(t, cf, start_key, end_key)?;
}
Ok(())
}
fn delete_all_in_range_cf(
&self,
t: DeleteRangeType,
cf: &str,
start_key: &[u8],
end_key: &[u8],
) -> Result<()>;
fn ingest_external_file_cf(
&self,
opts: &IngestExternalFileOptions,
cf: &str,
files: &[&str],
) -> Result<()>;

fn validate_file_for_ingestion<P: AsRef<Path>>(
&self,
cf: &str,
path: P,
expected_size: u64,
expected_checksum: u32,
) -> Result<()>;
}
66 changes: 66 additions & 0 deletions components/engine_traits/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::{error, result};

quick_error! {
#[derive(Debug)]
pub enum Error {
// Engine uses plain string as the error.
Engine(msg: String) {
from()
description("Storage Engine error")
display("Storage Engine {}", msg)
}
// FIXME: It should not know Region.
NotInRange( key: Vec<u8>, regoin_id: u64, start: Vec<u8>, end: Vec<u8>) {
description("Key is out of range")
display(
"Key {} is out of [region {}] [{}, {})",
hex::encode_upper(&key), regoin_id, hex::encode_upper(&start), hex::encode_upper(&end)
)
}
Protobuf(err: protobuf::ProtobufError) {
from()
cause(err)
description(err.description())
display("Protobuf {}", err)
}
Io(err: std::io::Error) {
from()
cause(err)
description(err.description())
display("Io {}", err)
}

Other(err: Box<dyn error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
display("{:?}", err)
}
}
}

pub type Result<T> = result::Result<T, Error>;

impl From<Error> for raft::Error {
fn from(err: Error) -> raft::Error {
raft::Error::Store(raft::StorageError::Other(err.into()))
}
}

impl From<Error> for kvproto::errorpb::Error {
fn from(err: Error) -> kvproto::errorpb::Error {
let mut errorpb = kvproto::errorpb::Error::default();
errorpb.set_message(format!("{}", err));

if let Error::NotInRange(key, region_id, start_key, end_key) = err {
errorpb.mut_key_not_in_region().set_key(key);
errorpb.mut_key_not_in_region().set_region_id(region_id);
errorpb.mut_key_not_in_region().set_start_key(start_key);
errorpb.mut_key_not_in_region().set_end_key(end_key);
}

errorpb
}
}
112 changes: 112 additions & 0 deletions components/engine_traits/src/iterable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use tikv_util::keybuilder::KeyBuilder;

use crate::*;

#[derive(Clone, PartialEq)]
pub enum SeekMode {
TotalOrder,
Prefix,
}

pub enum SeekKey<'a> {
Start,
End,
Key(&'a [u8]),
}

pub trait Iterator {
fn seek(&mut self, key: SeekKey) -> bool;
fn seek_for_prev(&mut self, key: SeekKey) -> bool;

fn prev(&mut self) -> bool;
fn next(&mut self) -> bool;

fn key(&self) -> Result<&[u8]>;
fn value(&self) -> Result<&[u8]>;

fn valid(&self) -> bool;
fn status(&self) -> Result<()>;
}

pub trait Iterable {
type Iter: Iterator;

fn iterator_opt(&self, opts: &IterOptions) -> Result<Self::Iter>;
fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result<Self::Iter>;

fn iterator(&self) -> Result<Self::Iter> {
self.iterator_opt(&IterOptions::default())
}

fn iterator_cf(&self, cf: &str) -> Result<Self::Iter> {
self.iterator_cf_opt(&IterOptions::default(), cf)
}

fn scan<F>(&self, start_key: &[u8], end_key: &[u8], fill_cache: bool, f: F) -> Result<()>
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0);
let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0);
let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache);
scan_impl(self.iterator_opt(&iter_opt)?, start_key, f)
}

// like `scan`, only on a specific column family.
fn scan_cf<F>(
&self,
cf: &str,
start_key: &[u8],
end_key: &[u8],
fill_cache: bool,
f: F,
) -> Result<()>
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let start = KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0);
let end = KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0);
let iter_opt = IterOptions::new(Some(start), Some(end), fill_cache);
scan_impl(self.iterator_cf_opt(&iter_opt, cf)?, start_key, f)
}

// Seek the first key >= given key, if no found, return None.
fn seek(&self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let mut iter = self.iterator()?;
iter.seek(SeekKey::Key(key));
if iter.valid() {
Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec())))
} else {
Ok(None)
}
}

// Seek the first key >= given key, if no found, return None.
fn seek_cf(&self, cf: &str, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let mut iter = self.iterator_cf(cf)?;
iter.seek(SeekKey::Key(key));
if iter.valid() {
Ok(Some((iter.key()?.to_vec(), iter.value()?.to_vec())))
} else {
Ok(None)
}
}
}

fn scan_impl<Iter: Iterator, F>(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()>
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
it.seek(SeekKey::Key(start_key));
while it.valid() {
let r = f(it.key()?, it.value()?)?;

if !r || !it.next() {
break;
}
}

it.status().map_err(From::from)
}
Loading

0 comments on commit a296cf8

Please sign in to comment.