raftstore: check stale region with region id

BusyJay committed Oct 18, 2016
1 parent f633c17 commit 058e25b
Showing 4 changed files with 23 additions and 72 deletions.
7 changes: 1 addition & 6 deletions src/raftstore/store/peer.rs
@@ -55,7 +55,6 @@ const TRANSFER_LEADER_ALLOW_LOG_LAG: u64 = 10;
pub enum StaleState {
Valid,
ToValidate,
Stale,
}

pub struct PendingCmd {
@@ -454,11 +453,7 @@ impl Peer {
// Resets the `leader_missing_time` to avoid sending the same tasks to
// the PD worker continuously during the leader missing timeout.
self.leader_missing_time = None;
if self.is_initialized() {
return StaleState::ToValidate;
} else {
return StaleState::Stale;
}
return StaleState::ToValidate;
}
StaleState::Valid
}
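
With the Stale variant gone, a peer whose leader has been missing for longer than the configured timeout always asks PD to validate its membership, whether or not it has been initialized with data. The following is a minimal, self-contained sketch of the resulting check (a hypothetical illustration: only StaleState, leader_missing_time, and the two return values come from the diff above; the reduced Peer struct and everything else are assumptions):

use std::time::{Duration, Instant};

// After this commit the only outcomes are "everything is fine" and
// "ask PD whether this peer is still a member"; the old `Stale`
// (destroy locally) outcome is gone.
pub enum StaleState {
    Valid,
    ToValidate,
}

// Reduced stand-in for the real Peer struct.
pub struct Peer {
    leader_missing_time: Option<Instant>,
}

impl Peer {
    pub fn check_stale_state(&mut self, max_leader_missing: Duration) -> StaleState {
        if let Some(since) = self.leader_missing_time {
            if since.elapsed() >= max_leader_missing {
                // Reset the timer so the same validation task is not sent to
                // the PD worker on every tick, then ask PD to validate the
                // peer regardless of whether it is initialized.
                self.leader_missing_time = None;
                return StaleState::ToValidate;
            }
        }
        StaleState::Valid
    }
}
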
22 changes: 1 addition & 21 deletions src/raftstore/store/store.rs
@@ -424,7 +424,6 @@ impl<T: Transport, C: PdClient> Store<T, C> {

fn on_raft_base_tick(&mut self, event_loop: &mut EventLoop<Self>) {
let t = Instant::now();
let mut region_to_be_destroyed = vec![];
for (&region_id, peer) in &mut self.region_peers {
if !peer.get_store().is_applying() {
peer.raft_group.tick();
@@ -433,7 +432,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
// it should consider itself as a stale peer which is removed from
// the original cluster.
// This most likely happens in the following scenario:
// 1. At first, there are three peer A, B, C in the cluster, and A is leader.
// At first, there are three peers A, B, and C in the cluster, and A is the leader.
// Peer B goes down, and then A adds D, E, and F to the cluster.
// Peer D becomes the leader of the new cluster, and then removes peers A, B, and C.
// After all these peers move in and out, the cluster now has peers D, E, and F.
Expand All @@ -444,12 +443,6 @@ impl<T: Transport, C: PdClient> Store<T, C> {
// In this case, peer B would notice that the leader is missing for a long time,
// and it would check with pd to confirm whether it's still a member of the cluster.
// If not, it destroys itself as a stale peer that has already been removed.
// 2. A peer, B is initialized as a replicated peer without data after
// receiving a single raft AE message. But then it goes through some process like 1,
// it's removed out of the region and wouldn't be contacted anymore.
// In this case, peer B would notice that the leader is missing for a long time,
// and it's an uninitialized peer without any data. It would destroy itself as
// a stale peer directly.
match peer.check_stale_state(self.cfg.max_leader_missing_duration) {
StaleState::Valid => {
self.pending_raft_groups.insert(region_id);
@@ -469,23 +462,10 @@ impl<T: Transport, C: PdClient> Store<T, C> {

self.pending_raft_groups.insert(region_id);
}
StaleState::Stale => {
info!("{} detects peer stale, to be destroyed", peer.tag);
// for peer B in case 2 above
// directly destroy peer without data since it doesn't have region range,
// so that it doesn't have the correct region start_key to
// validate peer with PD
region_to_be_destroyed.push((region_id, peer.peer.clone()));
}
}
}
}

// do perform the peer destroy
for (region_id, peer) in region_to_be_destroyed {
self.destroy_peer(region_id, peer);
}

PEER_RAFT_PROCESS_NANOS_COUNTER_VEC.with_label_values(&["tick"])
.inc_by(duration_to_nanos(t.elapsed()) as f64)
.unwrap();
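
Because peers are no longer destroyed directly from the tick loop, the stale-state match shrinks to the two arms kept above. Below is a hypothetical, condensed view of what the tick now does per peer (the helper is a placeholder for handing a validation task to the PD worker, not a real store method):

use std::collections::HashSet;

enum StaleState { Valid, ToValidate }

// Placeholder for scheduling a validation task on the PD worker.
fn schedule_validate_peer(_region_id: u64) {}

fn handle_stale_check(state: StaleState, region_id: u64, pending: &mut HashSet<u64>) {
    match state {
        StaleState::Valid => {
            pending.insert(region_id);
        }
        StaleState::ToValidate => {
            // The leader has been missing for too long: ask PD whether the
            // peer is still a member. Any destruction now happens only after
            // PD answers, never synchronously in the tick.
            schedule_validate_peer(region_id);
            pending.insert(region_id);
        }
    }
}
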
33 changes: 3 additions & 30 deletions src/raftstore/store/worker/pd.rs
@@ -28,7 +28,7 @@ use util::transport::SendCh;
use pd::PdClient;
use raftstore::store::Msg;
use raftstore::Result;
use raftstore::store::util::{check_key_in_region, is_epoch_stale};
use raftstore::store::util::is_epoch_stale;

use super::metrics::*;

@@ -221,35 +221,9 @@ impl<T: PdClient> Runner<T> {

fn handle_validate_peer(&self, local_region: metapb::Region, peer: metapb::Peer) {
PD_REQ_COUNTER_VEC.with_label_values(&["get region", "all"]).inc();
match self.pd_client.get_region(local_region.get_start_key()) {
match self.pd_client.get_region_by_id(local_region.get_id()) {
Ok(pd_region) => {
PD_REQ_COUNTER_VEC.with_label_values(&["get region", "success"]).inc();
if let Err(_) = check_key_in_region(local_region.get_start_key(), &pd_region) {
// The region [start_key, ...) is missing in pd currently. It's probably
// that a pending region split is happenning right now and that region
// doesn't report it's heartbeat(with updated region info) yet.
// We should sit tight and try another get_region task later.
warn!("[region {}] {} fails to get region info from pd with start key: {:?}, \
retry later",
local_region.get_id(),
peer.get_id(),
local_region.get_start_key());
PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["region info missing"]).inc();
return;
}
if pd_region.get_id() != local_region.get_id() {
// The region range is covered by another region(different region id).
// Local peer must be obsolete.
info!("[region {}] {} the region has change its id to {}, this peer must be \
stale and destroyed later",
local_region.get_id(),
peer.get_id(),
pd_region.get_id());
PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["region id changed"]).inc();
self.send_destroy_peer_message(local_region, peer, pd_region);
return;
}

if is_epoch_stale(pd_region.get_region_epoch(),
local_region.get_region_epoch()) {
// The local region epoch is fresher than region epoch in PD
@@ -266,8 +240,7 @@ impl<T: PdClient> Runner<T> {
return;
}

let valid = pd_region.get_peers().into_iter().any(|p| p.to_owned() == peer);
if !valid {
if pd_region.get_peers().into_iter().all(|p| p != &peer) {
// Peer is not a member of this region anymore. It has probably been removed.
// Send it a raft message to destroy it, since it's obsolete.
info!("[region {}] {} is not a valid member of region {:?}. To be destroyed \
33 changes: 18 additions & 15 deletions tests/raftstore/test_stale_peer.rs
@@ -137,56 +137,59 @@ fn test_stale_peer_without_data<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");
let region = cluster.get_region(b"");
cluster.must_split(&region, b"k2");
pd_client.must_add_peer(r1, new_peer(2, 2));
cluster.must_split(&region, b"k2");
pd_client.must_add_peer(r1, new_peer(3, 3));

let engine2 = cluster.get_engine(2);
must_get_equal(&engine2, b"k1", b"v1");
must_get_none(&engine2, b"k3");
let engine3 = cluster.get_engine(3);
must_get_equal(&engine3, b"k1", b"v1");
must_get_none(&engine3, b"k3");

let new_region = cluster.get_region(b"k3");
let new_region_id = new_region.get_id();
// Block peer (new_region_id, 2) at receiving snapshot, but not the heartbeat
cluster.add_send_filter(CloneFilterFactory(RegionPacketFilter::new(new_region_id, 2)
// Block peer (new_region_id, 4) from receiving the snapshot, but not the heartbeat
cluster.add_send_filter(CloneFilterFactory(RegionPacketFilter::new(new_region_id, 4)
.msg_type(MessageType::MsgSnapshot)));

pd_client.must_add_peer(new_region_id, new_peer(2, 3));
pd_client.must_add_peer(new_region_id, new_peer(3, 4));

// Wait for the heartbeat broadcasted from peer (1, 1) to peer (2, 2).
// Wait for the heartbeat broadcasted from peer (1, 1000) to peer (3, 4).
thread::sleep(Duration::from_millis(60));

// And then isolate peer (2, 2) from peer (1, 1).
cluster.add_send_filter(IsolationFilterFactory::new(2));
// And then isolate peer (3, 4) from peer (1, 1000).
cluster.add_send_filter(IsolationFilterFactory::new(3));

pd_client.must_remove_peer(new_region_id, new_peer(3, 4));

// Wait for max_leader_missing_duration to time out.
thread::sleep(max_leader_missing_duration);
// Sleep one more second to make sure there is enough time for the peer to be destroyed.
thread::sleep(Duration::from_secs(1));

// There must be no data on store 3 that belongs to the new region
must_get_none(&engine2, b"k3");
must_get_none(&engine3, b"k3");

// Check whether peer (3, 4) is destroyed.
// Before the peer is destroyed, a tombstone mark will be written into the engine.
// So we can check the tombstone mark to make sure the peer is destroyed.
let state_key = keys::region_state_key(new_region_id);
let state: RegionLocalState = engine2.get_msg(&state_key).unwrap().unwrap();
let state: RegionLocalState = engine3.get_msg(&state_key).unwrap().unwrap();
assert_eq!(state.get_state(), PeerState::Tombstone);

// The other region should not be affected.
must_get_equal(&engine2, b"k1", b"v1");
must_get_equal(&engine3, b"k1", b"v1");
}

#[test]
fn test_node_stale_peer_without_data() {
let count = 2;
let count = 3;
let mut cluster = new_node_cluster(0, count);
test_stale_peer_without_data(&mut cluster);
}

#[test]
fn test_server_stale_peer_without_data() {
let count = 2;
let count = 3;
let mut cluster = new_server_cluster(0, count);
test_stale_peer_without_data(&mut cluster);
}
