Skip to content

Commit

Permalink
Merge branch 'master' into busyjay/support-avg
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Jul 15, 2016
2 parents 45f0914 + 45c68ce commit e140eb6
Show file tree
Hide file tree
Showing 22 changed files with 271 additions and 229 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ENABLE_FEATURES ?= dev
ENABLE_FEATURES ?= default

DEPS_PATH = $(CURDIR)/tmp
BIN_PATH = $(CURDIR)/bin
Expand All @@ -7,6 +7,9 @@ BIN_PATH = $(CURDIR)/bin

all: format build test

dev:
@export ENABLE_FEATURES=dev && make

build:
cargo build --features ${ENABLE_FEATURES}

Expand Down
1 change: 1 addition & 0 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
+ Linux or Mac OS X.
+ Rust, [nightly version](https://www.rust-lang.org/downloads.html) is required.
+ Go, [1.5+](https://golang.org/doc/install) is required.
+ GCC 4.8+ is required.

### Installing TiKV

Expand Down
1 change: 0 additions & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#![feature(test)]
#![cfg_attr(feature = "dev", plugin(clippy))]
#![cfg_attr(not(feature = "dev"), allow(unknown_lints))]
#![allow(new_without_default)]

#[macro_use]
extern crate log;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#![feature(plugin)]
#![feature(box_syntax)]
#![feature(const_fn)]
#![feature(iter_arith)]
#![cfg_attr(feature = "dev", plugin(clippy))]
#![cfg_attr(not(feature = "dev"), allow(unknown_lints))]
#![recursion_limit="100"]
Expand Down
71 changes: 51 additions & 20 deletions src/storage/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
use std::{error, result};
use std::fmt::Debug;
use std::cmp::Ordering;
use std::boxed::FnBox;
use std::time::Duration;

use self::rocksdb::EngineRocksdb;
use storage::{Key, Value, CfName};
use kvproto::kvrpcpb::Context;
use kvproto::errorpb::Error as ErrorHeader;
use util::event::Event;

mod rocksdb;
pub mod raftkv;
Expand All @@ -30,6 +33,9 @@ pub const TEMP_DIR: &'static str = "";
pub const DEFAULT_CFNAME: CfName = "default";

const SEEK_BOUND: usize = 30;
const DEFAULT_TIMEOUT_SECS: u64 = 5;

pub type Callback<T> = Box<FnBox(Result<T>) + Send>;

#[derive(Debug)]
pub enum Modify {
Expand All @@ -38,10 +44,32 @@ pub enum Modify {
}

pub trait Engine: Send + Sync + Debug {
fn get(&self, ctx: &Context, key: &Key) -> Result<Option<Value>>;
fn get_cf(&self, ctx: &Context, cf: CfName, key: &Key) -> Result<Option<Value>>;
fn write(&self, ctx: &Context, batch: Vec<Modify>) -> Result<()>;
fn snapshot<'a>(&'a self, ctx: &Context) -> Result<Box<Snapshot + 'a>>;
fn async_write(&self, ctx: &Context, batch: Vec<Modify>, callback: Callback<()>) -> Result<()>;
fn async_snapshot(&self, ctx: &Context, callback: Callback<Box<Snapshot>>) -> Result<()>;

fn write(&self, ctx: &Context, batch: Vec<Modify>) -> Result<()> {
let finished = Event::new();
let finished2 = finished.clone();
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);

try!(self.async_write(ctx, batch, box move |res| finished2.set(res)));
if finished.wait_timeout(Some(timeout)) {
return finished.take().unwrap();
}
Err(Error::Timeout(timeout))
}

fn snapshot(&self, ctx: &Context) -> Result<Box<Snapshot>> {
let finished = Event::new();
let finished2 = finished.clone();
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);

try!(self.async_snapshot(ctx, box move |res| finished2.set(res)));
if finished.wait_timeout(Some(timeout)) {
return finished.take().unwrap();
}
Err(Error::Timeout(timeout))
}

fn put(&self, ctx: &Context, key: Key, value: Value) -> Result<()> {
self.put_cf(ctx, DEFAULT_CFNAME, key, value)
Expand All @@ -58,14 +86,9 @@ pub trait Engine: Send + Sync + Debug {
fn delete_cf(&self, ctx: &Context, cf: CfName, key: Key) -> Result<()> {
self.write(ctx, vec![Modify::Delete(cf, key)])
}

fn iter<'a>(&'a self, ctx: &Context) -> Result<Box<Cursor + 'a>>;

// maybe mut is better.
fn close(&self) {}
}

pub trait Snapshot {
pub trait Snapshot: Send {
fn get(&self, key: &Key) -> Result<Option<Value>>;
fn get_cf(&self, cf: CfName, key: &Key) -> Result<Option<Value>>;
fn iter<'a>(&'a self) -> Result<Box<Cursor + 'a>>;
Expand Down Expand Up @@ -186,6 +209,10 @@ quick_error! {
description("RocksDb error")
display("RocksDb {}", msg)
}
Timeout(d: Duration) {
description("request timeout")
display("timeout after {:?}", d)
}
Other(err: Box<error::Error + Send + Sync>) {
from()
cause(err.as_ref())
Expand Down Expand Up @@ -256,26 +283,28 @@ mod tests {
}

fn assert_has(engine: &Engine, key: &[u8], value: &[u8]) {
assert_eq!(engine.get(&Context::new(), &make_key(key)).unwrap().unwrap(),
value);
let snapshot = engine.snapshot(&Context::new()).unwrap();
assert_eq!(snapshot.get(&make_key(key)).unwrap().unwrap(), value);
}

fn assert_has_cf(engine: &Engine, cf: CfName, key: &[u8], value: &[u8]) {
assert_eq!(engine.get_cf(&Context::new(), cf, &make_key(key)).unwrap().unwrap(),
value);
let snapshot = engine.snapshot(&Context::new()).unwrap();
assert_eq!(snapshot.get_cf(cf, &make_key(key)).unwrap().unwrap(), value);
}

fn assert_none(engine: &Engine, key: &[u8]) {
assert_eq!(engine.get(&Context::new(), &make_key(key)).unwrap(), None);
let snapshot = engine.snapshot(&Context::new()).unwrap();
assert_eq!(snapshot.get(&make_key(key)).unwrap(), None);
}

fn assert_none_cf(engine: &Engine, cf: CfName, key: &[u8]) {
assert_eq!(engine.get_cf(&Context::new(), cf, &make_key(key)).unwrap(),
None);
let snapshot = engine.snapshot(&Context::new()).unwrap();
assert_eq!(snapshot.get_cf(cf, &make_key(key)).unwrap(), None);
}

fn assert_seek(engine: &Engine, key: &[u8], pair: (&[u8], &[u8])) {
let mut iter = engine.iter(&Context::new()).unwrap();
let snapshot = engine.snapshot(&Context::new()).unwrap();
let mut iter = snapshot.iter().unwrap();
iter.seek(&make_key(key)).unwrap();
assert_eq!((iter.key(), iter.value()),
(&*bytes::encode_bytes(pair.0), pair.1));
Expand Down Expand Up @@ -325,7 +354,8 @@ mod tests {
must_put(engine, b"z", b"2");
assert_seek(engine, b"y", (b"z", b"2"));
assert_seek(engine, b"x\x00", (b"z", b"2"));
let mut iter = engine.iter(&Context::new()).unwrap();
let snapshot = engine.snapshot(&Context::new()).unwrap();
let mut iter = snapshot.iter().unwrap();
assert!(!iter.seek(&make_key(b"z\x00")).unwrap());
must_delete(engine, b"x");
must_delete(engine, b"z");
Expand All @@ -334,7 +364,8 @@ mod tests {
fn test_near_seek(engine: &Engine) {
must_put(engine, b"x", b"1");
must_put(engine, b"z", b"2");
let mut cursor = engine.iter(&Context::new()).unwrap();
let snapshot = engine.snapshot(&Context::new()).unwrap();
let mut cursor = snapshot.iter().unwrap();
let cursor_mut = cursor.as_mut();
assert_near_seek(cursor_mut, b"x", (b"x", b"1"));
assert_near_seek(cursor_mut, b"a", (b"x", b"1"));
Expand Down
94 changes: 42 additions & 52 deletions src/storage/engine/raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ use rocksdb::DB;
use protobuf::RepeatedField;

use storage::engine;
use super::{Engine, Modify, Cursor, Snapshot, DEFAULT_CFNAME};
use util::event::Event;
use super::{Engine, Modify, Cursor, Snapshot, Callback, DEFAULT_CFNAME};
use storage::{Key, Value, CfName};

const DEFAULT_TIMEOUT_SECS: u64 = 5;

quick_error! {
#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -138,26 +135,17 @@ impl<C: PdClient> RaftKv<C> {
}
}

fn call_command(&self, req: RaftCmdRequest) -> Result<CmdRes> {
fn call_command(&self, req: RaftCmdRequest, cb: Callback<CmdRes>) -> Result<()> {
let uuid = req.get_header().get_uuid().to_vec();
let l = req.get_requests().len();
let db = self.db.clone();
let finished = Event::new();
let finished2 = finished.clone();
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);

try!(self.router.rl().send_command(req,
box move |resp| {
let res = on_result(resp, l, &uuid, db);
finished2.set(res);
cb(on_result(resp, l, &uuid, db)
.map_err(Error::into));
Ok(())
}));

if finished.wait_timeout(Some(timeout)) {
return finished.take().unwrap();
}

Err(Error::Timeout(timeout))
Ok(())
}

fn new_request_header(&self, ctx: &Context) -> RaftRequestHeader {
Expand All @@ -169,21 +157,12 @@ impl<C: PdClient> RaftKv<C> {
header
}

fn exec_requests(&self, ctx: &Context, reqs: Vec<Request>) -> Result<CmdRes> {
fn exec_requests(&self, ctx: &Context, reqs: Vec<Request>, cb: Callback<CmdRes>) -> Result<()> {
let header = self.new_request_header(ctx);
let mut cmd = RaftCmdRequest::new();
cmd.set_header(header);
cmd.set_requests(RepeatedField::from_vec(reqs));
self.call_command(cmd)
}

fn raw_snapshot(&self, ctx: &Context) -> Result<RegionSnapshot> {
let mut req = Request::new();
req.set_cmd_type(CmdType::Snap);
match try!(self.exec_requests(ctx, vec![req])) {
CmdRes::Resp(rs) => Err(invalid_resp_type(CmdType::Snap, rs[0].get_cmd_type()).into()),
CmdRes::Snap(s) => Ok(s),
}
self.call_command(cmd, cb)
}
}

Expand All @@ -198,22 +177,11 @@ impl<C: PdClient> Debug for RaftKv<C> {
}

impl<C: PdClient> Engine for RaftKv<C> {
fn get(&self, ctx: &Context, key: &Key) -> engine::Result<Option<Value>> {
let snap = try!(self.snapshot(ctx));
snap.get(key)
}

fn get_cf(&self, ctx: &Context, cf: CfName, key: &Key) -> engine::Result<Option<Value>> {
let snap = try!(self.snapshot(ctx));
snap.get_cf(cf, key)
}

fn iter<'a>(&'a self, ctx: &Context) -> engine::Result<Box<Cursor + 'a>> {
let snap = try!(self.raw_snapshot(ctx));
Ok(box RegionIterator::new(self.db.iter(), snap.get_region().clone()))
}

fn write(&self, ctx: &Context, mut modifies: Vec<Modify>) -> engine::Result<()> {
fn async_write(&self,
ctx: &Context,
mut modifies: Vec<Modify>,
cb: Callback<()>)
-> engine::Result<()> {
if modifies.len() == 0 {
return Ok(());
}
Expand Down Expand Up @@ -244,18 +212,40 @@ impl<C: PdClient> Engine for RaftKv<C> {
}
reqs.push(req);
}
match try!(self.exec_requests(ctx, reqs)) {
CmdRes::Resp(_) => Ok(()),
CmdRes::Snap(_) => Err(box_err!("unexpect snapshot, should mutate instead.")),
}
try!(self.exec_requests(ctx,
reqs,
box move |res| {
match res {
Ok(CmdRes::Resp(_)) => cb(Ok(())),
Ok(CmdRes::Snap(_)) => {
cb(Err(box_err!("unexpect snapshot, should mutate instead.")))
}
Err(e) => cb(Err(e)),
}
}));
Ok(())
}

fn snapshot<'a>(&'a self, ctx: &Context) -> engine::Result<Box<Snapshot + 'a>> {
let snap = try!(self.raw_snapshot(ctx));
Ok(box snap)
fn async_snapshot(&self, ctx: &Context, cb: Callback<Box<Snapshot>>) -> engine::Result<()> {
let mut req = Request::new();
req.set_cmd_type(CmdType::Snap);
try!(self.exec_requests(ctx,
vec![req],
box move |res| {
match res {
Ok(CmdRes::Resp(r)) => {
cb(Err(invalid_resp_type(CmdType::Snap, r[0].get_cmd_type()).into()))
}
Ok(CmdRes::Snap(s)) => cb(Ok(box s)),
Err(e) => cb(Err(e)),
}
}));
Ok(())
}
}

fn close(&self) {
impl<C: PdClient> Drop for RaftKv<C> {
fn drop(&mut self) {
self.node.lock().unwrap().stop();
}
}
Expand Down
Loading

0 comments on commit e140eb6

Please sign in to comment.