Skip to content

Commit

Permalink
Merge branch 'master' into busyjay/trigger-size-scan-for-new-region
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Jul 7, 2016
2 parents a2012ec + c2d53d6 commit 802e994
Show file tree
Hide file tree
Showing 21 changed files with 343 additions and 183 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/bin/tikv-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ use std::net::UdpSocket;
use std::time::Duration;

use getopts::{Options, Matches};
use rocksdb::{DB, Options as RocksdbOptions, BlockBasedOptions};
use rocksdb::{Options as RocksdbOptions, BlockBasedOptions};
use mio::tcp::TcpListener;
use fs2::FileExt;
use cadence::{StatsdClient, NopMetricSink};

use tikv::storage::{Storage, Dsn, TEMP_DIR};
use tikv::storage::{Storage, Dsn, TEMP_DIR, DEFAULT_CFS};
use tikv::util::{self, logger, panic_hook};
use tikv::util::metric::{self, BufferedUdpMetricSink};
use tikv::server::{DEFAULT_LISTENING_ADDR, SendCh, Server, Node, Config, bind, create_event_loop,
create_raft_storage};
use tikv::server::{ServerTransport, ServerRaftStoreRouter, MockRaftStoreRouter};
use tikv::server::{MockStoreAddrResolver, PdStoreAddrResolver};
use tikv::raftstore::store::{self, SnapManager};
use tikv::raftstore::store::{self, SnapManager, new_engine_opt};
use tikv::pd::{new_rpc_client, RpcClient};

const ROCKSDB_DSN: &'static str = "rocksdb";
Expand Down Expand Up @@ -337,7 +337,7 @@ fn build_raftkv(matches: &Matches,
let opts = get_rocksdb_option(matches, config);
let mut db_path = path.clone();
db_path.push("db");
let engine = Arc::new(DB::open(&opts, db_path.to_str().unwrap()).unwrap());
let engine = new_engine_opt(opts, db_path.to_str().unwrap(), DEFAULT_CFS).unwrap();

let mut event_loop = store::create_event_loop(&cfg.store_cfg).unwrap();
let mut node = Node::new(&mut event_loop, cfg, pd_client);
Expand Down
5 changes: 5 additions & 0 deletions src/raft/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,9 @@ impl<T: Storage> RawNode<T> {
m.set_from(transferee);
self.raft.step(m).is_ok();
}

#[inline]
pub fn get_store(&self) -> &T {
self.raft.get_store()
}
}
3 changes: 2 additions & 1 deletion src/raftstore/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ mod test {
use std::sync::*;
use std::fmt::Debug;
use protobuf::RepeatedField;
use storage::DEFAULT_CFS;

use kvproto::metapb::Region;
use kvproto::raft_cmdpb::{AdminRequest, Request, AdminResponse, Response, RaftCmdRequest,
Expand Down Expand Up @@ -214,7 +215,7 @@ mod test {
}

fn new_peer_storage(path: &TempDir) -> PeerStorage {
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = new_engine(path.path().to_str().unwrap(), DEFAULT_CFS).unwrap();
PeerStorage::new(engine, &Region::new(), worker::dummy_scheduler()).unwrap()
}

Expand Down
10 changes: 8 additions & 2 deletions src/raftstore/coprocessor/region_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ impl Peekable for RegionSnapshot {
let data_key = keys::data_key(key);
self.snap.get_value(&data_key)
}

fn get_value_cf(&self, cf: &str, key: &[u8]) -> Result<Option<DBVector>> {
try!(util::check_key_in_region(key, &self.region));
let data_key = keys::data_key(key);
self.snap.get_value_cf(cf, &data_key)
}
}

/// `RegionIterator` wrap a rocksdb iterator and only allow it to
Expand Down Expand Up @@ -209,7 +215,7 @@ mod tests {
use raftstore::store::engine::*;
use raftstore::store::keys::*;
use raftstore::store::PeerStorage;
use storage::{Cursor, Key};
use storage::{Cursor, Key, DEFAULT_CFS};
use util::worker;

use super::*;
Expand All @@ -219,7 +225,7 @@ mod tests {
type DataSet = Vec<(Vec<u8>, Vec<u8>)>;

fn new_temp_engine(path: &TempDir) -> Arc<DB> {
new_engine(path.path().to_str().unwrap()).unwrap()
new_engine(path.path().to_str().unwrap(), DEFAULT_CFS).unwrap()
}

fn new_peer_storage(engine: Arc<DB>, r: &Region) -> PeerStorage {
Expand Down
5 changes: 3 additions & 2 deletions src/raftstore/coprocessor/split_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ mod test {
use util::codec::bytes::encode_bytes;
use util::worker;
use byteorder::{BigEndian, WriteBytesExt};
use storage::DEFAULT_CFS;

fn new_peer_storage(path: &TempDir) -> PeerStorage {
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = new_engine(path.path().to_str().unwrap(), DEFAULT_CFS).unwrap();
PeerStorage::new(engine, &Region::new(), worker::dummy_scheduler()).unwrap()
}

Expand Down Expand Up @@ -156,7 +157,7 @@ mod test {
let region_start_key = new_row_key(256, 1, 0, 0);
let key = new_row_key(256, 2, 1, 0);
let path = TempDir::new("test-split").unwrap();
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = new_engine(path.path().to_str().unwrap(), DEFAULT_CFS).unwrap();
let mut r = Region::new();
r.set_id(10);
r.set_start_key(region_start_key);
Expand Down
70 changes: 63 additions & 7 deletions src/raftstore/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
use std::option::Option;
use std::sync::Arc;

use rocksdb::{DB, Writable, DBIterator, DBVector, WriteBatch, ReadOptions};
use rocksdb::{DB, Writable, DBIterator, DBVector, WriteBatch, ReadOptions, Options};
use rocksdb::rocksdb::UnsafeSnap;
use protobuf;
use byteorder::{ByteOrder, BigEndian};

use raftstore::Result;
use raftstore::{Error, Result};


pub struct Snapshot {
Expand Down Expand Up @@ -50,15 +50,39 @@ impl Drop for Snapshot {
}
}

pub fn new_engine(path: &str) -> Result<Arc<DB>> {
// TODO: set proper options here,
let db = try!(DB::open_default(path));
pub fn new_engine(path: &str, cfs: &[&str]) -> Result<Arc<DB>> {
let opts = Options::new();
new_engine_opt(opts, path, cfs)
}

pub fn new_engine_opt(mut opts: Options, path: &str, cfs: &[&str]) -> Result<Arc<DB>> {
// TODO: configurable opts for each CF.
// Currently we support 1) Create new db. 2) Open a db with CFs we want. 3) Open db with no
// CF.
// TODO: Support open db with incomplete CFs.
opts.create_if_missing(false);
match DB::open_cf(&opts, path, cfs) {
Ok(db) => return Ok(Arc::new(db)),
Err(e) => warn!("open rocksdb fail: {}", e),
}

opts.create_if_missing(true);
let mut db = match DB::open(&opts, path) {
Ok(db) => db,
Err(e) => return Err(Error::RocksDb(e)),
};
for cf in cfs {
if let Err(e) = db.create_cf(cf, &opts) {
return Err(Error::RocksDb(e));
}
}
Ok(Arc::new(db))
}

// TODO: refactor this trait into rocksdb trait.
pub trait Peekable {
fn get_value(&self, key: &[u8]) -> Result<Option<DBVector>>;
fn get_value_cf(&self, cf: &str, key: &[u8]) -> Result<Option<DBVector>>;

fn get_msg<M>(&self, key: &[u8]) -> Result<Option<M>>
where M: protobuf::Message + protobuf::MessageStatic
Expand Down Expand Up @@ -141,6 +165,13 @@ impl Peekable for DB {
let v = try!(self.get(key));
Ok(v)
}

fn get_value_cf(&self, cf: &str, key: &[u8]) -> Result<Option<DBVector>> {
let handle = try!(self.cf_handle(cf)
.ok_or_else(|| Error::RocksDb(format!("cf {} not found.", cf))));
let v = try!(self.get_cf(*handle, key));
Ok(v)
}
}

impl Iterable for DB {
Expand All @@ -158,6 +189,17 @@ impl Peekable for Snapshot {
let v = try!(self.db.get_opt(key, &opt));
Ok(v)
}
fn get_value_cf(&self, cf: &str, key: &[u8]) -> Result<Option<DBVector>> {
let handle = try!(self.db
.cf_handle(cf)
.ok_or_else(|| Error::RocksDb(format!("cf {} not found.", cf))));
let mut opt = ReadOptions::new();
unsafe {
opt.set_snapshot(&self.snap);
}
let v = try!(self.db.get_cf_opt(*handle, key, &opt));
Ok(v)
}
}

impl Iterable for Snapshot {
Expand Down Expand Up @@ -208,7 +250,7 @@ mod tests {
#[test]
fn test_base() {
let path = TempDir::new("var").unwrap();
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = new_engine(path.path().to_str().unwrap(), &[]).unwrap();

let mut r = Region::new();
r.set_id(10);
Expand Down Expand Up @@ -246,10 +288,24 @@ mod tests {
assert_eq!(snap.get_i64(key).unwrap(), Some(-1));
}

#[test]
fn test_peekable() {
let path = TempDir::new("var").unwrap();
let engine = new_engine(path.path().to_str().unwrap(), &["cf"]).unwrap();

engine.put(b"k1", b"v1").unwrap();
let handle = engine.cf_handle("cf").unwrap();
engine.put_cf(*handle, b"k1", b"v2").unwrap();

assert_eq!(&*engine.get_value(b"k1").unwrap().unwrap(), b"v1");
assert!(engine.get_value_cf("foo", b"k1").is_err());
assert_eq!(&*engine.get_value_cf("cf", b"k1").unwrap().unwrap(), b"v2");
}

#[test]
fn test_scan() {
let path = TempDir::new("var").unwrap();
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = new_engine(path.path().to_str().unwrap(), &[]).unwrap();

engine.put(b"a1", b"v1").unwrap();
engine.put(b"a2", b"v2").unwrap();
Expand Down
14 changes: 10 additions & 4 deletions src/raftstore/store/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub const REGION_META_MAX_KEY: &'static [u8] = &[LOCAL_PREFIX, REGION_META_PREFI
// For region id
pub const RAFT_LOG_SUFFIX: u8 = 0x01;
pub const RAFT_STATE_SUFFIX: u8 = 0x02;
pub const APPLY_STATE_SUFFIX: u8 = 0x03;

// For region meta
pub const REGION_STATE_SUFFIX: u8 = 0x01;
Expand Down Expand Up @@ -90,6 +91,10 @@ pub fn raft_state_key(region_id: u64) -> Vec<u8> {
make_region_id_key(region_id, RAFT_STATE_SUFFIX, 0)
}

pub fn apply_state_key(region_id: u64) -> Vec<u8> {
make_region_id_key(region_id, APPLY_STATE_SUFFIX, 0)
}

/// Get the log index from raft log key generated by `raft_log_key`.
pub fn raft_log_index(key: &[u8]) -> Result<u64> {
let expect_key_len = REGION_RAFT_PREFIX_KEY.len() + mem::size_of::<u64>() +
Expand Down Expand Up @@ -192,6 +197,7 @@ mod tests {
assert!(raft_log_prefix(region_id).starts_with(&prefix));
assert!(raft_log_key(region_id, 1).starts_with(&prefix));
assert!(raft_state_key(region_id).starts_with(&prefix));
assert!(apply_state_key(region_id).starts_with(&prefix));
}

// test sort.
Expand All @@ -200,14 +206,14 @@ mod tests {
let lhs = region_raft_prefix(lid);
let rhs = region_raft_prefix(rid);
assert_eq!(lhs.partial_cmp(&rhs), Some(order));
}

// test state sort.
let tbls = vec![(1, 0, Ordering::Greater), (1, 1, Ordering::Equal), (1, 2, Ordering::Less)];
for (lid, rid, order) in tbls {
let lhs = raft_state_key(lid);
let rhs = raft_state_key(rid);
assert_eq!(lhs.partial_cmp(&rhs), Some(order));

let lhs = apply_state_key(lid);
let rhs = apply_state_key(rid);
assert_eq!(lhs.partial_cmp(&rhs), Some(order));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use self::config::Config;
pub use self::transport::Transport;
pub use self::peer::Peer;
pub use self::bootstrap::{bootstrap_store, bootstrap_region, write_region, clear_region};
pub use self::engine::{Peekable, Iterable, Mutable};
pub use self::engine::{Peekable, Iterable, Mutable, new_engine, new_engine_opt};
pub use self::peer_storage::{PeerStorage, do_snapshot, SnapState, RaftStorage, RAFT_INIT_LOG_TERM,
RAFT_INIT_LOG_INDEX};
pub use self::snap::{SnapFile, SnapKey, SnapManager, new_snap_mgr, SnapEntry};

0 comments on commit 802e994

Please sign in to comment.