raftstore: add split unit test #1742

Merged (9 commits, Apr 13, 2017)
5 changes: 3 additions & 2 deletions src/raftstore/store/store.rs
@@ -400,7 +400,8 @@ impl<T: Transport, C: PdClient> Store<T, C> {
self.register_consistency_check_tick(event_loop);
self.register_report_region_flow_tick(event_loop);

let split_check_runner = SplitCheckRunner::new(self.sendch.clone(),
let split_check_runner = SplitCheckRunner::new(self.engine.clone(),
self.sendch.clone(),
self.cfg.region_max_size,
self.cfg.region_split_size);
box_try!(self.split_check_worker.start(split_check_runner));
@@ -1433,7 +1434,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
peer.tag,
peer.size_diff_hint,
self.cfg.region_check_size_diff);
let task = SplitCheckTask::new(peer.get_store());
let task = SplitCheckTask::new(peer.region());
if let Err(e) = self.split_check_worker.schedule(task) {
error!("{} failed to schedule split check: {}", self.tag, e);
}
120 changes: 104 additions & 16 deletions src/raftstore/store/worker/split_check.rs
@@ -19,12 +19,14 @@ use std::cmp::Ordering;
use rocksdb::DB;

use kvproto::metapb::RegionEpoch;
use raftstore::store::{PeerStorage, keys, Msg};
use kvproto::metapb::Region;

use raftstore::store::{keys, Msg};
use raftstore::store::engine::{Iterable, IterOption};
use raftstore::Result;
use rocksdb::DBIterator;
use util::escape;
use util::transport::SendCh;
use util::transport::{RetryableSendCh, Sender};
use util::worker::Runnable;
use storage::{CfName, LARGE_CFS};

@@ -121,17 +123,15 @@ pub struct Task {
epoch: RegionEpoch,
start_key: Vec<u8>,
end_key: Vec<u8>,
engine: Arc<DB>,
}

impl Task {
pub fn new(ps: &PeerStorage) -> Task {
pub fn new(region: &Region) -> Task {
Task {
region_id: ps.get_region_id(),
epoch: ps.get_region().get_region_epoch().clone(),
start_key: keys::enc_start_key(&ps.region),
end_key: keys::enc_end_key(&ps.region),
engine: ps.get_engine().clone(),
region_id: region.get_id(),
epoch: region.get_region_epoch().clone(),
start_key: keys::enc_start_key(region),
end_key: keys::enc_end_key(region),
}
}
}
@@ -142,23 +142,29 @@ impl Display for Task {
}
}

pub struct Runner {
ch: SendCh<Msg>,
pub struct Runner<C> {
engine: Arc<DB>,
ch: RetryableSendCh<Msg, C>,
region_max_size: u64,
split_size: u64,
}

impl Runner {
pub fn new(ch: SendCh<Msg>, region_max_size: u64, split_size: u64) -> Runner {
impl<C> Runner<C> {
pub fn new(engine: Arc<DB>,
ch: RetryableSendCh<Msg, C>,
region_max_size: u64,
split_size: u64)
-> Runner<C> {
Runner {
engine: engine,
ch: ch,
region_max_size: region_max_size,
split_size: split_size,
}
}
}

impl Runnable<Task> for Runner {
impl<C: Sender<Msg>> Runnable<Task> for Runner<C> {
fn run(&mut self, task: Task) {
debug!("[region {}] executing task {} {}",
task.region_id,
@@ -169,8 +175,7 @@ impl Runnable<Task> for Runner<C> {
let mut size = 0;
let mut split_key = vec![];
let timer = CHECK_SPILT_HISTOGRAM.start_timer();

let res = MergedIterator::new(task.engine.as_ref(),
let res = MergedIterator::new(self.engine.as_ref(),
LARGE_CFS,
&task.start_key,
&task.end_key,
@@ -223,3 +228,86 @@ fn new_split_check_result(region_id: u64, epoch: RegionEpoch, split_key: Vec<u8>
split_key: split_key,
}
}

#[cfg(test)]
mod tests {
use std::sync::mpsc::{self, TryRecvError};
use std::sync::Arc;

use tempdir::TempDir;
use rocksdb::Writable;
use kvproto::metapb::Peer;

use storage::ALL_CFS;
use util::rocksdb;
use super::*;

#[test]
fn test_split_check() {
let path = TempDir::new("test-raftstore").unwrap();
let engine = Arc::new(rocksdb::new_engine(path.path().to_str().unwrap(), ALL_CFS).unwrap());

let mut region = Region::new();
region.set_id(1);
region.set_start_key(vec![]);
region.set_end_key(vec![]);
region.mut_peers().push(Peer::new());
region.mut_region_epoch().set_version(2);
region.mut_region_epoch().set_conf_ver(5);

let (tx, rx) = mpsc::sync_channel(100);
let ch = RetryableSendCh::new(tx, "test-split");
let mut runnable = Runner::new(engine.clone(), ch, 100, 60);
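// Each key/value pair written below is roughly 10 bytes ("z" + 4 digits for
// both the key and the value), so region_max_size = 100 corresponds to about
// 10 entries and split_size = 60 to about 6 entries in a single column family.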

// Write keys 0000..0006 (~70 bytes in the default CF), so the split key will be z0006.
for i in 0..7 {
let s = keys::data_key(format!("{:04}", i).as_bytes());
engine.put(&s, &s).unwrap();
}

runnable.run(Task::new(&region));
// The total size has not reached region_max_size (100) yet, so no split is reported.
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
others => panic!("expect recv empty, but got {:?}", others),
}

for i in 7..11 {
let s = keys::data_key(format!("{:04}", i).as_bytes());
engine.put(&s, &s).unwrap();
}

runnable.run(Task::new(&region));
match rx.try_recv() {
Ok(Msg::SplitCheckResult { region_id, epoch, split_key }) => {
assert_eq!(region_id, region.get_id());
assert_eq!(&epoch, region.get_region_epoch());
assert_eq!(split_key, keys::data_key(b"0006"));
}
others => panic!("expect split check result, but got {:?}", others),
}

// Write keys 0000..0005 into every column family, so the split key moves to z0003.
for i in 0..6 {
let s = keys::data_key(format!("{:04}", i).as_bytes());
for cf in ALL_CFS {
let handle = engine.cf_handle(cf).unwrap();
engine.put_cf(handle, &s, &s).unwrap();
}
}

runnable.run(Task::new(&region));
match rx.try_recv() {
Ok(Msg::SplitCheckResult { region_id, epoch, split_key }) => {
assert_eq!(region_id, region.get_id());
assert_eq!(&epoch, region.get_region_epoch());
assert_eq!(split_key, keys::data_key(b"0003"));
}
others => panic!("expect split check result, but got {:?}", others),
}

drop(rx);
// It should be safe even if the result can't be sent back.
runnable.run(Task::new(&region));
}
}
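Assuming a standard Cargo layout, the new test can be run on its own with a name filter, for example: cargo test test_split_check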
2 changes: 1 addition & 1 deletion src/util/transport.rs
@@ -87,7 +87,7 @@ impl<T> Sender<T> for mpsc::SyncSender<T> {
}

/// A channel that handles errors with automatic retries.
pub struct RetryableSendCh<T, C: Sender<T>> {
pub struct RetryableSendCh<T, C> {
ch: C,
name: &'static str,

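The transport.rs change drops the Sender<T> bound from the RetryableSendCh struct definition; the bound remains only on the impls that actually send. A minimal sketch of that pattern, using illustrative types rather than the real TiKV definitions:

use std::marker::PhantomData;

// Illustrative stand-ins for the real trait and types.
trait Sender<T> {
    fn send(&self, t: T) -> Result<(), String>;
}

// No `C: Sender<T>` bound on the struct definition itself...
struct Channel<T, C> {
    ch: C,
    _msg: PhantomData<T>,
}

// ...the bound lives only on the impl that actually sends.
impl<T, C: Sender<T>> Channel<T, C> {
    fn try_send(&self, t: T) -> Result<(), String> {
        self.ch.send(t)
    }
}

// A generic holder (like Runner<C> above) can embed the channel
// without repeating the bound in its own definition.
struct Worker<C> {
    ch: Channel<u64, C>,
}

impl<C: Sender<u64>> Worker<C> {
    fn notify(&self, region_id: u64) {
        let _ = self.ch.try_send(region_id);
    }
}

This is why Runner<C> in split_check.rs carries no bound on its struct, and only the Runnable impl requires C: Sender<Msg>.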