add test case for the gc of uninitialized stale peer
hhkbp2 committed Sep 4, 2016
1 parent a8d655b commit dda06dc
Showing 5 changed files with 169 additions and 52 deletions.
8 changes: 7 additions & 1 deletion src/raftstore/store/peer.rs
@@ -453,7 +453,13 @@ impl Peer {
self.leader_missing_time = Some(Instant::now())
}
} else {
if self.leader_missing_time.is_some() {
// When a peer is not initialized, it has no data in storage.
// Even if a leader sends heartbeats to it, we still consider it to be
// in the `leader missing` state. That is because if it's isolated from the leader
// before it can successfully receive a snapshot from the leader and
// apply that snapshot, no raft ready event will be triggered,
// so we could not detect here that the leader is missing for it.
if self.is_initialized() && self.leader_missing_time.is_some() {
self.leader_missing_time = None
}
}
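For context, a minimal self-contained sketch of the bookkeeping this hunk tightens. The struct and method names below are illustrative stand-ins, not the real `Peer` type, and the guard on the timer-setting branch is assumed, since the diff only shows the branch that clears the timer:

use std::time::Instant;

// Stand-in for the relevant fields of a peer.
struct PeerLike {
    leader_missing_time: Option<Instant>,
    initialized: bool,
}

impl PeerLike {
    fn is_initialized(&self) -> bool {
        self.initialized
    }

    // Mirrors the change above: the missing-leader timer is started whenever the
    // leader is unknown, but only an initialized peer may clear it on contact,
    // because an uninitialized peer might never receive and apply its snapshot.
    fn update_leader_missing(&mut self, leader_known: bool) {
        if !leader_known {
            if self.leader_missing_time.is_none() {
                self.leader_missing_time = Some(Instant::now());
            }
        } else if self.is_initialized() && self.leader_missing_time.is_some() {
            self.leader_missing_time = None;
        }
    }
}

fn main() {
    let mut peer = PeerLike {
        leader_missing_time: None,
        initialized: false,
    };
    peer.update_leader_missing(false); // leader unknown: the timer starts
    peer.update_leader_missing(true); // heartbeats arrive, but the peer is still uninitialized
    assert!(peer.leader_missing_time.is_some()); // so the timer keeps running
}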
19 changes: 11 additions & 8 deletions src/raftstore/store/store.rs
@@ -283,20 +283,23 @@ impl<T: Transport, C: PdClient> Store<T, C> {
// it should consider itself as a stale peer which is removed from
// the original cluster.
// This most likely happens in the following scenario:
// 1. At first, there are three node A, B, C in the cluster, and A is leader.
// Node B gets down. And then A adds D, E, F into the cluster.
// Node D becomes leader of the new cluster, and then removes node A, B, C.
// After all these node in and out, now the cluster has node D, E, F.
// If node B goes up at this moment, it still thinks it is one of the cluster
// 1. At first, there are three peers A, B, C in the cluster, and A is the leader.
// Peer B goes down. Then A adds D, E, F into the cluster.
// Peer D becomes leader of the new cluster, and then removes peers A, B, C.
// After all this churn of peers, the cluster now has peers D, E, F.
// If peer B comes back up at this moment, it still thinks it is part of the cluster
// and has peers A, C. However, it could not reach A, C since they are removed
// from the cluster or probably destroyed.
// Meantime, D, E, F would not reach B, since it's not in the cluster anymore.
// In this case, peer B would notice that the leader is missing for a long time,
// and it would check with pd to confirm whether it's still a member of the cluster.
// If not, it destroys itself as a stale peer that has already been removed.
// 2. A peer B is initialized as a replicated peer without data after
// receiving a single raft AppendEntries (AE) message. But then it goes
// through a process like case 1: it's removed from the region and won't be
// contacted anymore.
// In both cases, Node B would notice that the leader is missing for a long time,
// and it would check with pd to confirm whether it's still a member of the cluster.
// If not, it destroys itself as a stale peer which is removed out already.
// In this case, peer B would notice that the leader is missing for a long time,
// and it's an uninitialized peer without any data. It would destroy itself as
// a stale peer directly.
let duration = peer.since_leader_missing();
if duration >= self.cfg.max_leader_missing_duration {
info!("{} detects leader missing for a long time. To check with pd whether \
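The comment above describes two GC paths once the leader has been missing for too long: an initialized peer asks pd whether it is still a member of the region, while an uninitialized peer destroys itself directly. A minimal sketch of that decision; the `Action` enum and the function name are invented for this illustration and are not the store's actual API:

use std::time::Duration;

enum Action {
    Keep,            // leader not missing long enough; do nothing
    ValidateWithPd,  // case 1: an initialized peer asks pd whether it is still a member
    DestroyDirectly, // case 2: an uninitialized peer has no data and destroys itself
}

fn decide_stale_peer_action(leader_missing: Duration,
                            max_leader_missing: Duration,
                            initialized: bool)
                            -> Action {
    if leader_missing < max_leader_missing {
        return Action::Keep;
    }
    if initialized {
        Action::ValidateWithPd
    } else {
        Action::DestroyDirectly
    }
}

fn main() {
    // An uninitialized peer whose leader has been missing for the full duration
    // is destroyed directly, without a round trip to pd.
    match decide_stale_peer_action(Duration::from_secs(3), Duration::from_secs(3), false) {
        Action::DestroyDirectly => println!("destroy the uninitialized stale peer"),
        _ => unreachable!(),
    }
}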
25 changes: 25 additions & 0 deletions src/raftstore/store/util.rs
@@ -78,6 +78,31 @@ mod tests {

use super::*;

// Tests the util function `check_key_in_region`.
#[test]
fn test_check_key_in_region() {
let test_cases = vec![// key, region start_key, region end_key, result(Ok: true, Err: false)
("", "", "", true),
("", "", "6", true),
("", "3", "6", false),
("4", "3", "6", true),
("4", "3", "", true),
("2", "3", "6", false),
("", "3", "6", false),
("", "3", "", false)];
for c in &test_cases {
let key = c.0.as_bytes();
let mut region = metapb::Region::new();
region.set_start_key(c.1.as_bytes().to_vec());
region.set_end_key(c.2.as_bytes().to_vec());
let result = check_key_in_region(key, &region);
match result {
Ok(_) => assert_eq!(c.3, true),
Err(_) => assert_eq!(c.3, false),
}
}
}

#[test]
fn test_peer() {
let mut region = metapb::Region::new();
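The new `test_check_key_in_region` above pins down the expected semantics: a key belongs to a region when it is at or after `start_key` and, unless `end_key` is empty (meaning unbounded), strictly before `end_key`. A plausible standalone predicate consistent with that table; the real `check_key_in_region` returns a raftstore `Result` rather than a bool:

// Hypothetical, simplified predicate matching the table in the test above.
fn key_in_region(key: &[u8], start_key: &[u8], end_key: &[u8]) -> bool {
    key >= start_key && (end_key.is_empty() || key < end_key)
}

fn main() {
    assert!(key_in_region(b"4", b"3", b"6"));
    assert!(key_in_region(b"4", b"3", b"")); // empty end_key means unbounded
    assert!(!key_in_region(b"2", b"3", b"6"));
    assert!(!key_in_region(b"", b"3", b"")); // the empty key sorts before "3"
}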
70 changes: 41 additions & 29 deletions src/raftstore/store/worker/pd.rs
@@ -28,6 +28,7 @@ use util::transport::SendCh;
use pd::PdClient;
use raftstore::store::Msg;
use raftstore::Result;
use raftstore::store::util::check_key_in_region;

// Use an asynchronous thread to tell pd something.
pub enum Task {
@@ -183,19 +184,34 @@ impl<T: PdClient> Runner<T> {
}
}

// Send a raft message to destroy the specified stale peer.
fn send_destroy_peer_message(&self,
local_region: metapb::Region,
peer: metapb::Peer,
pd_region: metapb::Region) {
info!("[region {}] {} is not a valid member of region {:?}. To be destroyed soon.",
local_region.get_id(),
peer.get_id(),
pd_region);
let mut message = RaftMessage::new();
message.set_region_id(local_region.get_id());
message.set_from_peer(peer.clone());
message.set_to_peer(peer.clone());
message.set_region_epoch(pd_region.get_region_epoch().clone());
message.set_is_tombstone(true);
if let Err(e) = self.ch.send(Msg::RaftMessage(message)) {
error!("send gc peer request to region {} err {:?}",
local_region.get_id(),
e)
}
}

fn handle_validate_peer(&self, local_region: metapb::Region, peer: metapb::Peer) {
metric_incr!("pd.get_region");
match self.pd_client.get_region(local_region.get_start_key()) {
Ok(mut pd_region) => {
Ok(pd_region) => {
metric_incr!("pd.get_region");
if pd_region == local_region {
// Local region info is as fresh as pd, which means
// local peer is still in the region, let it be there.
info!("peer {:?} is still valid because local region info is as fresh as pd",
peer);
return;
}
if !(pd_region.get_start_key() == local_region.get_start_key()) {
if let Err(_) = check_key_in_region(local_region.get_start_key(), &pd_region) {
// The region [start_key, ...) is missing in pd currently. It's probably
// because a pending region split is happening right now and that region
// hasn't reported its heartbeat (with updated region info) yet.
@@ -205,33 +221,29 @@ impl<T: PdClient> Runner<T> {
local_region.get_id(),
peer.get_id(),
local_region.get_start_key());
warn!("xxx local region: {:?}", local_region);
warn!("xxx pd region: {:?}", pd_region);
return;
}
if pd_region.get_id() != local_region.get_id() {
// The region range is covered by another region (with a different region id).
// Local peer must be obsolete.
self.send_destroy_peer_message(local_region, peer, pd_region);
return;
}

let valid = pd_region.get_peers().into_iter().any(|p| p.to_owned() == peer);
if !valid {
// Peer is not a member of this region anymore. It has probably been removed.
// Send it a raft message to destroy it since it's obsolete.
info!("[region {}] {} is not a valid member of region {:?}. To be destroyed \
soon.",
local_region.get_id(),
peer.get_id(),
pd_region);
let mut message = RaftMessage::new();
message.set_region_id(local_region.get_id());
message.set_from_peer(peer.clone());
message.set_to_peer(peer.clone());
message.set_region_epoch(pd_region.take_region_epoch());
message.set_is_tombstone(true);
if let Err(e) = self.ch.send(Msg::RaftMessage(message)) {
error!("send gc peer request to region {} err {:?}",
local_region.get_id(),
e)
}
} else {
info!("peer {} is still valid in region {:?}",
peer.get_id(),
pd_region);
self.send_destroy_peer_message(local_region, peer, pd_region);
return;
}
info!("[region {}] {} is still valid in region {:?}",
local_region.get_id(),
peer.get_id(),
pd_region);

}
Err(e) => error!("get region failed {:?}", e),
}
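Read top to bottom, the refactored `handle_validate_peer` above applies its checks in a fixed order: identical region info means the peer is still valid; a start key not covered by pd's region means a split may be pending, so nothing is done; a different region id or a missing membership entry means the local peer is stale and gets a destroy message. A condensed, self-contained model of that order, using placeholder types (`RegionInfo`, `Verdict`) instead of the real `metapb` ones:

#[derive(PartialEq)]
struct RegionInfo {
    id: u64,
    start_key: Vec<u8>,
    end_key: Vec<u8>,
    peers: Vec<u64>, // peer ids
}

enum Verdict {
    StillValid, // identical info, or the peer is still listed as a member
    SkipForNow, // pd does not cover start_key yet (likely a pending split)
    Destroy,    // covered by another region id, or no longer a member
}

fn key_in_region(key: &[u8], r: &RegionInfo) -> bool {
    key >= &r.start_key[..] && (r.end_key.is_empty() || key < &r.end_key[..])
}

fn validate(local: &RegionInfo, peer_id: u64, pd: &RegionInfo) -> Verdict {
    if pd == local {
        return Verdict::StillValid;
    }
    if !key_in_region(&local.start_key, pd) {
        return Verdict::SkipForNow;
    }
    if pd.id != local.id || !pd.peers.contains(&peer_id) {
        return Verdict::Destroy;
    }
    Verdict::StillValid
}

fn main() {
    let local = RegionInfo { id: 1, start_key: b"a".to_vec(), end_key: vec![], peers: vec![1, 2] };
    let pd = RegionInfo { id: 1, start_key: b"a".to_vec(), end_key: vec![], peers: vec![3, 4] };
    // Same region id and key coverage, but peer 2 is no longer listed: destroy it.
    match validate(&local, 2, &pd) {
        Verdict::Destroy => println!("peer 2 is stale and will be destroyed"),
        _ => unreachable!(),
    }
}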
99 changes: 85 additions & 14 deletions tests/raftstore/test_stale_peer.rs
@@ -16,25 +16,30 @@
use std::time::Duration;
use std::thread;

use kvproto::eraftpb::MessageType;
use kvproto::raft_serverpb::{RegionLocalState, PeerState};
use tikv::raftstore::store::{keys, Peekable};

use super::cluster::{Cluster, Simulator};
use super::node::new_node_cluster;
use super::server::new_server_cluster;
use super::transport_simulate::Isolate;
use super::transport_simulate::{Isolate, IsolateRegionStore};
use super::util::*;

/// This test case tests the behaviour of the gc of stale peer which is out of region.
/// A helper function for testing the behaviour of the gc of stale peer
/// which is out of region.
/// If a peer detects the leader is missing for a specified long time,
/// it should consider itself as a stale peer which is removed from the region.
/// This most likely happens in the following scenario:
/// At first, there are three node A, B, C in the cluster, and A is leader.
/// Node B gets down. And then A adds D, E, F int the cluster.
/// Node D becomes leader of the new cluster, and then removes node A, B, C.
/// After all these node in and out, now the cluster has node D, E, F.
/// If node B goes up at this moment, it still thinks it is one of the cluster
/// This test case covers the following scenario:
/// At first, there are three peers A, B, C in the cluster, and A is the leader.
/// Peer B goes down. Then A adds D, E, F into the cluster.
/// Peer D becomes leader of the new cluster, and then removes peers A, B, C.
/// After all this churn of peers, the cluster now has peers D, E, F.
/// If peer B comes back up at this moment, it still thinks it is part of the cluster
/// and has peers A, C. However, it could not reach A, C since they are removed from
/// the cluster or probably destroyed.
/// Meanwhile, D, E, F would not reach B, since it's not in the cluster anymore.
/// In this case, Node B would notice that the leader is missing for a long time,
/// In this case, peer B would notice that the leader is missing for a long time,
/// and it would check with pd to confirm whether it's still a member of the cluster.
/// If not, it should destroy itself as a stale peer that has already been removed.
fn test_stale_peer_out_of_region<T: Simulator>(cluster: &mut Cluster<T>) {
@@ -44,10 +49,10 @@ fn test_stale_peer_out_of_region<T: Simulator>(cluster: &mut Cluster<T>) {
// so that this test case works without depending on the default value of them
cluster.cfg.raft_store.raft_heartbeat_ticks = 3;
cluster.cfg.raft_store.raft_election_timeout_ticks = 15;
// Use a value of twice the election timeout here just for test.
// Use a value of 3 seconds as the max leader missing duration here, just for the test.
// In a production environment, the value of max_leader_missing_duration
// should be configured far beyond the election timeout.
let max_leader_missing_duration = Duration::from_secs(2);
let max_leader_missing_duration = Duration::from_secs(3);
cluster.cfg.raft_store.max_leader_missing_duration = max_leader_missing_duration;
let pd_client = cluster.pd_client.clone();
// Disable default max peer number check.
@@ -83,21 +88,87 @@ fn test_stale_peer_out_of_region<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.must_put(key2, value2);
assert_eq!(cluster.get(key2), Some(value2.to_vec()));

// check whether peer 2 and its data are destroyed
// check whether peer(2, 2) and its data are destroyed
must_get_none(&engine_2, key);
must_get_none(&engine_2, key2);
}

#[test]
fn test_node_stale_peer() {
fn test_node_stale_peer_out_of_region() {
let count = 6;
let mut cluster = new_node_cluster(0, count);
test_stale_peer_out_of_region(&mut cluster);
}

#[test]
fn test_server_stale_peer() {
fn test_server_stale_peer_out_of_region() {
let count = 6;
let mut cluster = new_server_cluster(0, count);
test_stale_peer_out_of_region(&mut cluster);
}

/// A helper function for testing the behaviour of the gc of stale peer
/// which is out of region.
/// If a peer detects the leader is missing for a specified long time,
/// it should consider itself as a stale peer which is removed from the region.
/// This test case covers the following scenario:
/// A peer B is initialized as a replicated peer without data after
/// receiving a single raft AppendEntries (AE) message. But then it goes through
/// a process like the one in `test_stale_peer_out_of_region`: it's removed
/// from the region and won't be contacted anymore.
/// In this case, peer B would notice that the leader is missing for a long time,
/// and it's an uninitialized peer without any data. It would destroy itself
/// as a stale peer directly.
fn test_stale_peer_without_data<T: Simulator>(cluster: &mut Cluster<T>) {
// Lower the raft base tick interval to make this test case run fast.
cluster.cfg.raft_store.raft_base_tick_interval = 10;
// Specify the heartbeat and election timeout ticks here
// so that this test case works without depending on the default value of them
cluster.cfg.raft_store.raft_heartbeat_ticks = 3;
cluster.cfg.raft_store.raft_election_timeout_ticks = 15;
cluster.cfg.raft_store.pd_heartbeat_tick_interval = 5;
// Use a value of 3 seconds as the max leader missing duration here, just for the test.
// In a production environment, the value of max_leader_missing_duration
// should be configured far beyond the election timeout.
let max_leader_missing_duration = Duration::from_secs(3);
cluster.cfg.raft_store.max_leader_missing_duration = max_leader_missing_duration;
let pd_client = cluster.pd_client.clone();
// Disable default max peer number check.
pd_client.disable_default_rule();

let r1 = cluster.run_conf_change();
// Block peer (2, 2) from receiving snapshots, but not heartbeats.
cluster.add_send_filter(IsolateRegionStore::new(1, 2).msg_type(MessageType::MsgSnapshot));

pd_client.must_add_peer(r1, new_peer(2, 2));

// Wait for the heartbeat broadcast from peer (1, 1) to reach peer (2, 2).
thread::sleep(Duration::from_millis(60));

// and then isolate peer (2, 2) from peer (1, 1)
cluster.add_send_filter(Isolate::new(2));

// Wait for max_leader_missing_duration to elapse.
thread::sleep(max_leader_missing_duration);

// Check whether peer (2, 2) is destroyed.
// If it is destroyed, a tombstone will have been written into the engine.
let engine = cluster.get_engine(2);
let state_key = keys::region_state_key(1);
let state: RegionLocalState = engine.get_msg(&state_key).unwrap().unwrap();
assert_eq!(state.get_state(), PeerState::Tombstone);
}

#[test]
fn test_node_stale_peer_without_data() {
let count = 2;
let mut cluster = new_node_cluster(0, count);
test_stale_peer_without_data(&mut cluster);
}

#[test]
fn test_server_stale_peer_without_data() {
let count = 2;
let mut cluster = new_server_cluster(0, count);
test_stale_peer_without_data(&mut cluster);
}
