Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add engine_traits component #5445

Merged
merged 15 commits into from Sep 25, 2019
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -120,6 +120,7 @@ engine = { path = "components/engine" }
tidb_query = { path = "components/tidb_query" }
pd_client = { path = "components/pd_client" }
keys = { path = "components/keys" }
engine_traits = { path = "components/engine_traits" }
sst_importer = { path = "components/sst_importer" }

[dependencies.murmur3]
Expand Down
12 changes: 12 additions & 0 deletions components/engine_traits/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "engine_traits"
version = "0.0.1"
edition = "2018"
publish = false

[dependencies]
protobuf = "2"
quick-error = "1.2.2"
tikv_alloc = { path = "../tikv_alloc", default-features = false }
hex = "0.3"
tikv_util = { path = "../tikv_util" }
11 changes: 11 additions & 0 deletions components/engine_traits/src/cf.rs
@@ -0,0 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

/// Name of a column family, expressed as a `'static` string slice.
pub type CfName = &'static str;
overvenus marked this conversation as resolved.
Show resolved Hide resolved
/// Name of the "default" column family.
pub const CF_DEFAULT: CfName = "default";
/// Name of the "lock" column family.
pub const CF_LOCK: CfName = "lock";
/// Name of the "write" column family.
pub const CF_WRITE: CfName = "write";
/// Name of the "raft" column family.
pub const CF_RAFT: CfName = "raft";
/// Column families that are generally expected to be very large.
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE];
/// Every column family name declared above.
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
/// The entries of `ALL_CFS` except `CF_RAFT`.
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];
65 changes: 65 additions & 0 deletions components/engine_traits/src/engine.rs
@@ -0,0 +1,65 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::Debug;
use std::path::Path;

use crate::*;

/// Snapshot handle of an engine; this is the bound placed on `KvEngine::Snap`, the
/// type returned by `KvEngine::snapshot`.
///
/// A snapshot must be readable through `Peekable`, be `Send + Sync + Debug`, and hold
/// no non-`'static` borrows.
pub trait Snapshot: 'static + Peekable + Send + Sync + Debug {
/// Returns column family names.
// NOTE(review): presumably the column families readable through this snapshot —
// confirm against the implementors.
fn cf_names(&self) -> Vec<&str>;
}

/// A batch of writes; this is the bound placed on `KvEngine::Batch`, the type that
/// `KvEngine::write` and `KvEngine::write_opt` accept.
///
/// Every write batch is also `Mutable`, so it exposes the same put/delete methods.
pub trait WriteBatch: Mutable {
/// Size of the batched data.
// NOTE(review): presumably measured in bytes — TODO confirm with implementors.
fn data_size(&self) -> usize;
/// Number of entries held by the batch.
// NOTE(review): presumably one per recorded put/delete — TODO confirm.
fn count(&self) -> usize;
/// Whether the batch holds nothing.
fn is_empty(&self) -> bool;
/// Empties the batch.
// NOTE(review): takes `&self` although it mutates, while the save-point methods
// below take `&mut self`; implementors presumably rely on interior mutability — confirm.
fn clear(&self);

/// Records a save point.
// NOTE(review): presumably mirrors the save-point stack of the underlying engine's
// write batch — confirm.
fn set_save_point(&mut self);
/// Discards the most recent save point; the `Result` reports failure.
// NOTE(review): presumably fails when no save point exists — confirm.
fn pop_save_point(&mut self) -> Result<()>;
/// Rolls the batch back to the most recent save point; the `Result` reports failure.
fn rollback_to_save_point(&mut self) -> Result<()>;
}

/// Interface of a key-value engine.
///
/// An engine can be read (`Peekable`), written (`Mutable`) and iterated (`Iterable`),
/// and must additionally be `Send + Sync + Clone`.
pub trait KvEngine: Peekable + Mutable + Iterable + Send + Sync + Clone {
    /// Snapshot type returned by `snapshot`.
    type Snap: Snapshot;
    /// Write batch type returned by `write_batch` and accepted by `write`/`write_opt`.
    type Batch: WriteBatch;

    /// Writes the batch `wb` using the write options `opts`.
    fn write_opt(&self, opts: &WriteOptions, wb: &Self::Batch) -> Result<()>;

    /// Writes the batch `wb` using `WriteOptions::default()`.
    fn write(&self, wb: &Self::Batch) -> Result<()> {
        let default_opts = WriteOptions::default();
        self.write_opt(&default_opts, wb)
    }

    /// Creates a write batch.
    // NOTE(review): `cap` is presumably an initial capacity hint — confirm with implementors.
    fn write_batch(&self, cap: usize) -> Self::Batch;

    /// Takes a snapshot of the engine.
    fn snapshot(&self) -> Self::Snap;

    /// Syncs the engine; the `Result` reports failure.
    // NOTE(review): presumably flushes pending writes to durable storage — confirm.
    fn sync(&self) -> Result<()>;

    /// Returns the names of the engine's column families.
    fn cf_names(&self) -> Vec<&str>;

    /// Calls `delete_all_in_range_cf` for every name returned by `cf_names`, stopping
    /// at the first error. Does nothing and returns `Ok(())` when `start_key >= end_key`.
    fn delete_all_in_range(
        &self,
        t: DeleteRangeType,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Result<()> {
        if start_key < end_key {
            self.cf_names()
                .into_iter()
                .try_for_each(|cf| self.delete_all_in_range_cf(t, cf, start_key, end_key))
        } else {
            // Empty or inverted range: nothing to delete.
            Ok(())
        }
    }

    /// Deletes the keys between `start_key` and `end_key` in column family `cf`, using
    /// the strategy selected by `t`.
    // NOTE(review): the range is presumably half-open, `[start_key, end_key)` — confirm.
    fn delete_all_in_range_cf(
        &self,
        t: DeleteRangeType,
        cf: &str,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Result<()>;

    /// Ingests the external files `files` into column family `cf`.
    // NOTE(review): `files` are presumably file paths — confirm.
    fn ingest_external_file_cf(&self, cf: &str, files: &[&str]) -> Result<()>;

    /// Validates the file at `path` before it is ingested into `cf`, given its
    /// expected size and checksum.
    fn validate_file_for_ingestion<P>(
        &self,
        cf: &str,
        path: P,
        expected_size: u64,
        expected_checksum: u32,
    ) -> Result<()>
    where
        P: AsRef<Path>;
}
44 changes: 44 additions & 0 deletions components/engine_traits/src/errors.rs
@@ -0,0 +1,44 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::{error, result};

quick_error! {
    // Error type shared by the traits of this crate; `Result<T>` is defined in terms of it.
    #[derive(Debug)]
    pub enum Error {
        // Engine uses plain string as the error.
        Engine(msg: String) {
            from()
            description("Storage Engine error")
            display("Storage Engine {}", msg)
        }
        // A key falls outside the `[start, end)` range of a region. Keys are rendered
        // as upper-case hex in the message.
        // FIXME: It should not know Region.
        NotInRange(key: Vec<u8>, region_id: u64, start: Vec<u8>, end: Vec<u8>) {
            description("Key is out of range")
            display(
                "Key {} is out of [region {}] [{}, {})",
                hex::encode_upper(&key), region_id, hex::encode_upper(&start), hex::encode_upper(&end)
            )
        }
        // Wraps a protobuf error; convertible with `?` thanks to `from()`.
        Protobuf(err: protobuf::ProtobufError) {
            from()
            cause(err)
            description(err.description())
            display("Protobuf {}", err)
        }
        // Wraps an I/O error; convertible with `?` thanks to `from()`.
        Io(err: std::io::Error) {
            from()
            cause(err)
            description(err.description())
            display("Io {}", err)
        }

        // Any other boxed error. Note that it is displayed with `{:?}`.
        Other(err: Box<dyn error::Error + Sync + Send>) {
            from()
            cause(err.as_ref())
            description(err.description())
            display("{:?}", err)
        }
    }
}

/// Shorthand for a `std::result::Result` whose error type is this module's `Error`.
pub type Result<T> = result::Result<T, Error>;
112 changes: 112 additions & 0 deletions components/engine_traits/src/iterable.rs
@@ -0,0 +1,112 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use tikv_util::keybuilder::KeyBuilder;

use crate::*;

/// Seek mode of an iterator.
///
/// Derives `Copy`, `Debug` and `Eq` in addition to `Clone` and `PartialEq`, so values
/// can be passed by value, logged, and compared with `assert_eq!`.
// NOTE(review): the variants presumably select total-order versus prefix seeking in the
// underlying engine — confirm against the engine implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SeekMode {
    TotalOrder,
    Prefix,
}

/// Target of a seek: the start, the end, or a specific borrowed key.
///
/// The type only holds a borrowed slice, so it derives `Clone`, `Copy`, `Debug`,
/// `PartialEq` and `Eq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SeekKey<'a> {
    /// Seek to the start.
    Start,
    /// Seek to the end.
    End,
    /// Seek relative to the given key bytes.
    Key(&'a [u8]),
}

/// Cursor-style iterator over key/value pairs.
///
/// This trait shares its name with `std::iter::Iterator` but is a separate trait.
pub trait Iterator {
/// Positions the iterator according to `key`.
// NOTE(review): presumably lands on the first entry >= the key and returns whether the
// iterator is valid afterwards — confirm with implementors.
fn seek(&mut self, key: SeekKey) -> bool;
/// Positions the iterator according to `key`, for backward iteration.
// NOTE(review): presumably lands on the last entry <= the key and returns whether the
// iterator is valid afterwards — confirm with implementors.
fn seek_for_prev(&mut self, key: SeekKey) -> bool;

/// Steps to the previous entry.
// NOTE(review): the returned flag presumably tells whether the iterator is still valid — confirm.
fn prev(&mut self) -> bool;
/// Steps to the next entry. `scan_impl` stops iterating as soon as this returns `false`.
fn next(&mut self) -> bool;

/// Key of the current entry.
fn key(&self) -> Result<&[u8]>;
/// Value of the current entry.
fn value(&self) -> Result<&[u8]>;

/// Whether the iterator currently points at an entry.
fn valid(&self) -> bool;
/// Status of the iterator; an `Err` reports a failure of the iterator.
fn status(&self) -> Result<()>;
}

/// A source of iterators, either over the default view or over a named column family.
pub trait Iterable {
    /// Iterator type handed out by this source.
    type Iter: Iterator;

    /// Creates an iterator configured by `opts`.
    fn iterator_opt(&self, opts: &IterOptions) -> Result<Self::Iter>;

    /// Creates an iterator over column family `cf`, configured by `opts`.
    fn iterator_cf_opt(&self, opts: &IterOptions, cf: &str) -> Result<Self::Iter>;

    /// Same as `iterator_opt` with `IterOptions::default()`.
    fn iterator(&self) -> Result<Self::Iter> {
        let default_opts = IterOptions::default();
        self.iterator_opt(&default_opts)
    }

    /// Same as `iterator_cf_opt` with `IterOptions::default()`.
    fn iterator_cf(&self, cf: &str) -> Result<Self::Iter> {
        let default_opts = IterOptions::default();
        self.iterator_cf_opt(&default_opts, cf)
    }

    /// Seeks to `start_key` and feeds every following key/value pair to `f` until `f`
    /// returns `Ok(false)`, the iterator is exhausted, or an error occurs.
    ///
    /// The iterator is created from `IterOptions::new` with `start_key` and `end_key`
    /// wrapped in `KeyBuilder`s, plus `fill_cache`.
    // NOTE(review): the two keys presumably act as lower/upper iteration bounds — confirm
    // against `IterOptions::new`.
    fn scan<F>(&self, start_key: &[u8], end_key: &[u8], fill_cache: bool, f: F) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool>,
    {
        let bounded_opts = IterOptions::new(
            Some(KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0)),
            Some(KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0)),
            fill_cache,
        );
        let cursor = self.iterator_opt(&bounded_opts)?;
        scan_impl(cursor, start_key, f)
    }

    /// Like `scan`, but restricted to column family `cf`.
    fn scan_cf<F>(
        &self,
        cf: &str,
        start_key: &[u8],
        end_key: &[u8],
        fill_cache: bool,
        f: F,
    ) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool>,
    {
        let bounded_opts = IterOptions::new(
            Some(KeyBuilder::from_slice(start_key, DATA_KEY_PREFIX_LEN, 0)),
            Some(KeyBuilder::from_slice(end_key, DATA_KEY_PREFIX_LEN, 0)),
            fill_cache,
        );
        let cursor = self.iterator_cf_opt(&bounded_opts, cf)?;
        scan_impl(cursor, start_key, f)
    }

    /// Seeks the first key >= the given key and returns that entry as owned
    /// `(key, value)` vectors; returns `Ok(None)` if nothing is found.
    // NOTE(review): an invalid iterator yields `Ok(None)` without consulting `status()`,
    // unlike `scan_impl` — confirm this is intended.
    fn seek(&self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        let mut cursor = self.iterator()?;
        cursor.seek(SeekKey::Key(key));
        if !cursor.valid() {
            return Ok(None);
        }
        let found_key = cursor.key()?.to_vec();
        let found_value = cursor.value()?.to_vec();
        Ok(Some((found_key, found_value)))
    }

    /// Like `seek`, but restricted to column family `cf`.
    fn seek_cf(&self, cf: &str, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        let mut cursor = self.iterator_cf(cf)?;
        cursor.seek(SeekKey::Key(key));
        if !cursor.valid() {
            return Ok(None);
        }
        let found_key = cursor.key()?.to_vec();
        let found_value = cursor.value()?.to_vec();
        Ok(Some((found_key, found_value)))
    }
}

/// Shared driver behind `Iterable::scan` and `Iterable::scan_cf`.
///
/// Seeks `it` to `start_key`, then calls `f(key, value)` for each entry while the
/// iterator stays valid. Iteration ends when `f` returns `Ok(false)` or when `next()`
/// returns `false`. An error from `f`, `key()` or `value()` is returned immediately;
/// otherwise the final result is whatever `it.status()` reports.
fn scan_impl<Iter: Iterator, F>(mut it: Iter, start_key: &[u8], mut f: F) -> Result<()>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you should move the Iterator bound to the where block here in order to be consistent?

where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
// The return value of `seek` is ignored; `valid()` below decides whether to iterate.
it.seek(SeekKey::Key(start_key));
while it.valid() {
// `r` is the callback's "keep going" flag.
let r = f(it.key()?, it.value()?)?;

5kbpers marked this conversation as resolved.
Show resolved Hide resolved
// `next()` is only called when the callback asked to continue.
if !r || !it.next() {
break;
}
}

// Surface any error the iterator recorded while scanning.
it.status().map_err(From::from)
}
70 changes: 70 additions & 0 deletions components/engine_traits/src/lib.rs
@@ -0,0 +1,70 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

#![recursion_limit = "200"]

#[macro_use]
extern crate quick_error;
#[allow(unused_extern_crates)]
extern crate tikv_alloc;

mod errors;
pub use crate::errors::*;
mod peekable;
pub use crate::peekable::*;
mod iterable;
pub use crate::iterable::*;
mod mutable;
pub use crate::mutable::*;
mod cf;
pub use crate::cf::*;
mod engine;
pub use crate::engine::*;
mod options;
pub use crate::options::*;

/// Passed as the second argument of `KeyBuilder::from_slice` when `Iterable::scan` and
/// `Iterable::scan_cf` build their start/end keys.
// NOTE(review): presumably the byte length of the prefix prepended to data keys — confirm.
pub const DATA_KEY_PREFIX_LEN: usize = 1;

// In our tests, we found that if the batch size is too large, running delete_all_in_range will
// reduce OLTP QPS by 30% ~ 60%. We found that 32K is a proper choice.
pub const MAX_DELETE_BATCH_SIZE: usize = 32 * 1024;

/// Bundles the two engines used together: one named `kv` and one named `raft`.
#[derive(Clone, Debug)]
pub struct KvEngines<K, R> {
// Engine targeted by `write_kv`, `write_kv_opt` and `sync_kv`.
pub kv: K,
// Engine targeted by `write_raft`, `write_raft_opt` and `sync_raft`.
pub raft: R,
// NOTE(review): presumably tells whether both engines share one block cache — confirm.
pub shared_block_cache: bool,
}

impl<K: KvEngine, R: KvEngine> KvEngines<K, R> {
pub fn new(kv_engine: K, raft_engine: R, shared_block_cache: bool) -> Self {
KvEngines {
kv: kv_engine,
raft: raft_engine,
shared_block_cache,
}
}

pub fn write_kv(&self, wb: &K::Batch) -> Result<()> {
self.kv.write(wb)
}

pub fn write_kv_opt(&self, wb: &K::Batch, opts: &WriteOptions) -> Result<()> {
self.kv.write_opt(opts, wb)
}

pub fn sync_kv(&self) -> Result<()> {
self.kv.sync()
}

pub fn write_raft(&self, wb: &R::Batch) -> Result<()> {
self.raft.write(wb)
}

pub fn write_raft_opt(&self, wb: &R::Batch, opts: &WriteOptions) -> Result<()> {
self.raft.write_opt(opts, wb)
}

pub fn sync_raft(&self) -> Result<()> {
self.raft.sync()
}
}
35 changes: 35 additions & 0 deletions components/engine_traits/src/mutable.rs
@@ -0,0 +1,35 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::*;

pub trait Mutable {
fn put_opt(&self, opts: &WriteOptions, key: &[u8], value: &[u8]) -> Result<()>;
fn put_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8], value: &[u8]) -> Result<()>;

fn delete_opt(&self, opts: &WriteOptions, key: &[u8]) -> Result<()>;
fn delete_cf_opt(&self, opts: &WriteOptions, cf: &str, key: &[u8]) -> Result<()>;

fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.put_opt(&WriteOptions::default(), key, value)
}

fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> {
self.put_cf_opt(&WriteOptions::default(), cf, key, value)
}

fn delete(&self, key: &[u8]) -> Result<()> {
self.delete_opt(&WriteOptions::default(), key)
}

fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> {
self.delete_cf_opt(&WriteOptions::default(), cf, key)
}

fn put_msg<M: protobuf::Message>(&self, key: &[u8], m: &M) -> Result<()> {
self.put(key, &m.write_to_bytes()?)
}

fn put_msg_cf<M: protobuf::Message>(&self, cf: &str, key: &[u8], m: &M) -> Result<()> {
self.put_cf(cf, key, &m.write_to_bytes()?)
}
}