Skip to content

Commit

Permalink
merge checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Nov 9, 2016
1 parent 9fbeb68 commit bd3df73
Show file tree
Hide file tree
Showing 14 changed files with 656 additions and 42 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ lazy_static = "0.2.1"
signal = "0.2"
backtrace = "0.2.3"
clap = "2"
sha1 = "0.2"

# The getopts in crate.io is outdated, use the latest getopts instead.
[dependencies.getopts]
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ extern crate prometheus;
#[macro_use]
extern crate lazy_static;
extern crate backtrace;
extern crate sha1;

#[macro_use]
pub mod util;
Expand Down
5 changes: 5 additions & 0 deletions src/raftstore/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS: u64 = 60 * 10; // 10 min
// a peer should consider itself as a stale peer that is out of region.
const DEFAULT_MAX_LEADER_MISSING_SECS: u64 = 2 * 60 * 60;
const DEFAULT_SNAPSHOT_APPLY_BATCH_SIZE: usize = 1024 * 1024 * 10; // 10m
const DEFAULT_CONSISTENCY_CHECK_INTERVAL_SECS: u64 = 60 * 20; // 20 min

#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -99,6 +100,9 @@ pub struct Config {
pub max_leader_missing_duration: Duration,

pub snap_apply_batch_size: usize,

// Interval (s) to check region whether the data are consistent.
pub consistency_check_tick_interval: u64,
}

impl Default for Config {
Expand Down Expand Up @@ -129,6 +133,7 @@ impl Default for Config {
max_leader_missing_duration: Duration::from_secs(DEFAULT_MAX_LEADER_MISSING_SECS),
snap_apply_batch_size: DEFAULT_SNAPSHOT_APPLY_BATCH_SIZE,
lock_cf_compact_interval_secs: DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS,
consistency_check_tick_interval: DEFAULT_CONSISTENCY_CHECK_INTERVAL_SECS,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/raftstore/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use std::option::Option;
use std::sync::Arc;
use std::fmt::{self, Debug, Formatter};

use rocksdb::{DB, Writable, DBIterator, DBVector, WriteBatch, ReadOptions, CFHandle};
use rocksdb::rocksdb_options::UnsafeSnap;
Expand Down Expand Up @@ -59,6 +60,12 @@ impl Drop for Snapshot {
}
}

impl Debug for Snapshot {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
write!(fmt, "Engine Snapshot Impl")
}
}

// TODO: refactor this trait into rocksdb trait.
pub trait Peekable {
fn get_value(&self, key: &[u8]) -> Result<Option<DBVector>>;
Expand Down
16 changes: 16 additions & 0 deletions src/raftstore/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use kvproto::raft_serverpb::RaftMessage;
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse};
use kvproto::metapb::RegionEpoch;
use raft::SnapshotStatus;
use util::escape;

pub type Callback = Box<FnBox(RaftCmdResponse) + Send>;

Expand All @@ -32,6 +33,7 @@ pub enum Tick {
PdStoreHeartbeat,
SnapGc,
CompactLockCf,
ConsistencyCheck,
}

pub enum Msg {
Expand Down Expand Up @@ -65,6 +67,13 @@ pub enum Msg {
region_id: u64,
snap: Option<Snapshot>,
},

// For consistency check
ComputeHashResult {
region_id: u64,
index: u64,
hash: Vec<u8>,
},
}

impl fmt::Debug for Msg {
Expand Down Expand Up @@ -94,6 +103,13 @@ impl fmt::Debug for Msg {
region_id,
snap.is_some())
}
Msg::ComputeHashResult { region_id, index, ref hash } => {
write!(fmt,
"ComputeHashResult [region_id: {}, index: {}, hash: {}]",
region_id,
index,
escape(&hash))
}
}
}
}
Expand Down
47 changes: 45 additions & 2 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ pub enum ExecResult {
},
CompactLog { state: RaftTruncatedState },
SplitRegion { regions: Vec<metapb::Region> },
ComputeHash {
region: metapb::Region,
index: u64,
snap: Snapshot,
},
VerifyHash { index: u64, hash: Vec<u8> },
}

// When we apply commands in handing ready, we should also need a way to
Expand Down Expand Up @@ -179,6 +185,8 @@ pub struct Peer {
/// delete keys' count since last reset.
pub delete_keys_hint: u64,

pub last_consistency_check_time: Instant,

leader_missing_time: Option<Instant>,

pub tag: String,
Expand Down Expand Up @@ -270,6 +278,7 @@ impl Peer {
leader_missing_time: Some(Instant::now()),
tag: tag,
last_compacted_idx: 0,
last_consistency_check_time: Instant::now(),
};

peer.load_all_coprocessors();
Expand Down Expand Up @@ -737,7 +746,9 @@ impl Peer {
if req.has_admin_request() {
match req.get_admin_request().get_cmd_type() {
AdminCmdType::CompactLog |
AdminCmdType::InvalidAdmin => {}
AdminCmdType::InvalidAdmin |
AdminCmdType::ComputeHash |
AdminCmdType::VerifyHash => {}
AdminCmdType::Split => check_ver = true,
AdminCmdType::ChangePeer => check_conf_ver = true,
AdminCmdType::TransferLeader => {
Expand Down Expand Up @@ -1069,7 +1080,9 @@ impl Peer {
ExecResult::ChangePeer { ref region, .. } => {
storage.region = region.clone();
}
ExecResult::CompactLog { .. } => {}
ExecResult::CompactLog { .. } |
ExecResult::ComputeHash { .. } |
ExecResult::VerifyHash { .. } => {}
ExecResult::SplitRegion { ref regions, .. } => {
storage.region = regions.last().unwrap().clone();
}
Expand Down Expand Up @@ -1184,6 +1197,8 @@ impl Peer {
AdminCmdType::Split => self.exec_split(ctx, request),
AdminCmdType::CompactLog => self.exec_compact_log(ctx, request),
AdminCmdType::TransferLeader => Err(box_err!("transfer leader won't exec")),
AdminCmdType::ComputeHash => self.exec_compute_hash(ctx, request),
AdminCmdType::VerifyHash => self.exec_verify_hash(ctx, request),
AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")),
});
response.set_cmd_type(cmd_type);
Expand Down Expand Up @@ -1407,6 +1422,34 @@ impl Peer {
Some(ExecResult::CompactLog { state: ctx.apply_state.get_truncated_state().clone() })))
}

fn exec_compute_hash(&self,
ctx: &ExecContext,
_: &AdminRequest)
-> Result<(AdminResponse, Option<ExecResult>)> {
let resp = AdminResponse::new();
Ok((resp,
Some(ExecResult::ComputeHash {
region: self.region().clone(),
index: ctx.index,
snap: Snapshot::new(self.engine.clone()),
})))
}

fn exec_verify_hash(&self,
_: &ExecContext,
req: &AdminRequest)
-> Result<(AdminResponse, Option<ExecResult>)> {
let verify_req = req.get_verify_hash();
let index = verify_req.get_index();
let hash = verify_req.get_hash().to_vec();
let resp = AdminResponse::new();
Ok((resp,
Some(ExecResult::VerifyHash {
index: index,
hash: hash,
})))
}

fn exec_write_cmd(&mut self, ctx: &ExecContext) -> Result<RaftCmdResponse> {
let requests = ctx.req.get_requests();
let mut responses = Vec::with_capacity(requests.len());
Expand Down
10 changes: 1 addition & 9 deletions src/raftstore/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,9 +949,8 @@ mod test {
use raft::{StorageError, Error as RaftError};
use tempdir::*;
use protobuf;
use raftstore;
use raftstore::store::{Msg, bootstrap, new_snap_mgr, SnapKey};
use raftstore::store::worker::{RegionRunner, MsgSender};
use raftstore::store::worker::RegionRunner;
use util::codec::number::NumberEncoder;
use raftstore::store::worker::RegionTask;
use util::worker::{Worker, Scheduler};
Expand All @@ -962,13 +961,6 @@ mod test {

use super::*;

impl MsgSender for Sender<Msg> {
fn send(&self, msg: Msg) -> raftstore::Result<()> {
Sender::send(self, msg).unwrap();
Ok(())
}
}

fn new_storage(sched: Scheduler<RegionTask>, path: &TempDir) -> PeerStorage {
let db = new_engine(path.path().to_str().unwrap(), &[CF_DEFAULT, CF_RAFT]).unwrap();
let db = Arc::new(db);
Expand Down

0 comments on commit bd3df73

Please sign in to comment.