Skip to content

Commit

Permalink
Merge branch 'master' into busyjay/conditional-replica
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Nov 7, 2016
2 parents 187c8a4 + 528fc94 commit fafbd39
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ run:
cargo run --features "${ENABLE_FEATURES}"

release:
cargo build --release --bin tikv-server --features "${ENABLE_FEATURES}"
cargo build --release --features "${ENABLE_FEATURES}"
@mkdir -p bin
cp -f ./target/release/tikv-server ./bin
cp -f ./target/release/tikv-ctl ./target/release/tikv-server ./bin

static_release:
ROCKSDB_SYS_STATIC=1 ROCKSDB_SYS_PORTABLE=1 make release
Expand Down
37 changes: 24 additions & 13 deletions src/bin/tikv-ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ fn main() {
.arg(Arg::with_name("index")
.short("i")
.takes_value(true)
.help("set the raft log index")))
.help("set the raft log index"))
.arg(Arg::with_name("key")
.short("k")
.takes_value(true)
.help("set the raw key")))
.subcommand(SubCommand::with_name("region")
.about("print region info")
.arg(Arg::with_name("region")
Expand Down Expand Up @@ -110,9 +114,15 @@ fn main() {
dump_raw_value(db, cf_name, key);
} else if let Some(matches) = matches.subcommand_matches("raft") {
if let Some(matches) = matches.subcommand_matches("log") {
let region = String::from(matches.value_of("region").unwrap());
let index = String::from(matches.value_of("index").unwrap());
dump_raft_log_entry(db, region, index);
let key = match matches.value_of("key") {
None => {
let region = String::from(matches.value_of("region").unwrap());
let index = String::from(matches.value_of("index").unwrap());
keys::raft_log_key(region.parse().unwrap(), index.parse().unwrap())
}
Some(k) => unescape(k),
};
dump_raft_log_entry(db, &key);
} else if let Some(matches) = matches.subcommand_matches("region") {
let region = String::from(matches.value_of("region").unwrap());
dump_region_info(db, region);
Expand Down Expand Up @@ -259,18 +269,18 @@ fn dump_raw_value(db: DB, cf: &str, key: String) {
println!("value: {}", value.map_or("None".to_owned(), |v| escape(&v)));
}

fn dump_raft_log_entry(db: DB, region_id_str: String, idx_str: String) {
let region_id = u64::from_str_radix(&region_id_str, 10).unwrap();
let idx = u64::from_str_radix(&idx_str, 10).unwrap();

let idx_key = keys::raft_log_key(region_id, idx);
println!("idx_key: {}", escape(&idx_key));
let mut ent: Entry = db.get_msg_cf(CF_RAFT, &idx_key).unwrap().unwrap();
fn dump_raft_log_entry(db: DB, idx_key: &[u8]) {
let (region_id, idx) = keys::decode_raft_log_key(idx_key).unwrap();
println!("idx_key: {}", escape(idx_key));
println!("region: {}", region_id);
println!("log index: {}", idx);
let mut ent: Entry = db.get_msg_cf(CF_RAFT, idx_key).unwrap().unwrap();
let data = ent.take_data();
println!("entry {:?}", ent);
let mut msg = RaftCmdRequest::new();
msg.merge_from_bytes(&data).unwrap();
println!("msg {:?}", msg);
println!("msg len: {}", data.len());
println!("{:?}", msg);
}

fn dump_region_info(db: DB, region_id_str: String) {
Expand Down Expand Up @@ -307,7 +317,8 @@ fn dump_range(db: DB, from: String, to: Option<String>, limit: Option<u64>, cf:
&to,
true,
&mut |k, v| {
println!("key: {}, value: {}", escape(k), escape(v));
println!("key: {}, value len: {}", escape(k), v.len());
println!("{}", escape(v));
cnt += 1;
Ok(cnt < limit)
})
Expand Down
4 changes: 2 additions & 2 deletions src/raftstore/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::time::Duration;
use raftstore::Result;

const RAFT_BASE_TICK_INTERVAL: u64 = 100;
const RAFT_HEARTBEAT_TICKS: usize = 3;
const RAFT_ELECTION_TIMEOUT_TICKS: usize = 15;
const RAFT_HEARTBEAT_TICKS: usize = 10;
const RAFT_ELECTION_TIMEOUT_TICKS: usize = 50;
const RAFT_MAX_SIZE_PER_MSG: u64 = 1024 * 1024;
const RAFT_MAX_INFLIGHT_MSGS: usize = 256;
const RAFT_LOG_GC_INTERVAL: u64 = 5000;
Expand Down
28 changes: 28 additions & 0 deletions src/raftstore/store/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ pub fn raft_log_index(key: &[u8]) -> Result<u64> {
Ok(BigEndian::read_u64(&key[expect_key_len - mem::size_of::<u64>()..]))
}

/// Get the region id and index from raft log key generated by `raft_log_key`.
pub fn decode_raft_log_key(key: &[u8]) -> Result<(u64, u64)> {
let suffix_idx = REGION_RAFT_PREFIX_KEY.len() + mem::size_of::<u64>();
let expect_key_len = suffix_idx + mem::size_of::<u8>() + mem::size_of::<u64>();
if key.len() != expect_key_len || !key.starts_with(REGION_RAFT_PREFIX_KEY) ||
key[suffix_idx] != RAFT_LOG_SUFFIX {
return Err(box_err!("key {} is not a valid raft log key", escape(key)));
}
let region_id = BigEndian::read_u64(&key[REGION_RAFT_PREFIX_KEY.len()..suffix_idx]);
let index = BigEndian::read_u64(&key[suffix_idx + mem::size_of::<u8>()..]);
Ok((region_id, index))
}

pub fn raft_log_prefix(region_id: u64) -> Vec<u8> {
make_region_id_key(region_id, RAFT_LOG_SUFFIX, 0)
}
Expand Down Expand Up @@ -191,6 +204,7 @@ pub fn data_end_key(region_end_key: &[u8]) -> Vec<u8> {
#[cfg(test)]
mod tests {
use super::*;
use byteorder::{BigEndian, WriteBytesExt};
use kvproto::metapb::{Region, Peer};
use std::cmp::Ordering;

Expand Down Expand Up @@ -268,8 +282,22 @@ mod tests {
for idx_id in 1..10 {
let key = raft_log_key(region_id, idx_id);
assert_eq!(idx_id, raft_log_index(&key).unwrap());
assert_eq!((region_id, idx_id), decode_raft_log_key(&key).unwrap());
}
}

let mut state_key = raft_state_key(1);
// invalid length
assert!(decode_raft_log_key(&state_key).is_err());

state_key.write_u64::<BigEndian>(2).unwrap();
// invalid suffix
assert!(decode_raft_log_key(&state_key).is_err());

let mut region_state_key = region_state_key(1);
region_state_key.write_u64::<BigEndian>(2).unwrap();
// invalid prefix
assert!(decode_raft_log_key(&region_state_key).is_err());
}

#[test]
Expand Down
13 changes: 12 additions & 1 deletion src/raftstore/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
};
}

fn store_heartbeat_pd(&self) {
fn store_heartbeat_pd(&mut self) {
let mut stats = StoreStats::new();
let disk_stat = match get_disk_stat(self.engine.path()) {
Ok(disk_stat) => disk_stat,
Expand Down Expand Up @@ -1378,6 +1378,17 @@ impl<T: Transport, C: PdClient> Store<T, C> {
STORE_SNAPSHOT_TRAFFIC_GAUGE_VEC.with_label_values(&["receiving"])
.set(snap_stats.receiving_count as f64);

let mut apply_snapshot_count = 0;
for peer in self.region_peers.values_mut() {
if peer.mut_store().check_applying_snap() {
apply_snapshot_count += 1;
}
}

stats.set_applying_snap_count(apply_snapshot_count as u32);
STORE_SNAPSHOT_TRAFFIC_GAUGE_VEC.with_label_values(&["applying"])
.set(apply_snapshot_count as f64);

stats.set_start_time(self.start_time.sec as u32);

if let Err(e) = self.pd_worker.schedule(PdTask::StoreHeartbeat { stats: stats }) {
Expand Down

0 comments on commit fafbd39

Please sign in to comment.