cherry pick tikv#12251 to release-3.0
Signed-off-by: linning <linningde25@gmail.com>
NingLin-P committed Mar 29, 2022
1 parent 5a6afd4 commit 5e00028
Showing 4 changed files with 117 additions and 6 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

29 changes: 28 additions & 1 deletion components/test_raftstore/src/cluster.rs
@@ -10,7 +10,9 @@ use kvproto::errorpb::Error as PbError;
use kvproto::metapb::{self, Peer, RegionEpoch};
use kvproto::pdpb;
use kvproto::raft_cmdpb::*;
use kvproto::raft_serverpb::{RaftApplyState, RaftMessage, RaftTruncatedState};
use kvproto::raft_serverpb::{
PeerState, RaftApplyState, RaftMessage, RaftTruncatedState, RegionLocalState,
};
use tempdir::TempDir;

use engine::rocks;
@@ -953,6 +955,31 @@ impl<T: Simulator> Cluster<T> {
.unwrap()
}

pub fn region_local_state(&self, region_id: u64, store_id: u64) -> RegionLocalState {
self.get_engine(store_id)
.get_msg_cf::<RegionLocalState>(engine::CF_RAFT, &keys::region_state_key(region_id))
.unwrap()
.unwrap()
}

pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
for _ in 0..100 {
let state = self
.get_engine(store_id)
.get_msg_cf::<RegionLocalState>(engine::CF_RAFT, &keys::region_state_key(region_id))
.unwrap()
.unwrap();
if state.get_state() == peer_state {
return;
}
sleep_ms(10);
}
panic!(
"[region {}] peer state still not reach {:?}",
region_id, peer_state
);
}

/// Make sure region exists on that store.
pub fn must_region_exist(&mut self, region_id: u64, store_id: u64) {
let mut try_cnt = 0;
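
For context, the two new helpers read a region's persisted `RegionLocalState` directly from a store's kv engine: `region_local_state` fetches it once, while `must_peer_state` polls (up to 100 times, 10 ms apart) until the peer reaches the expected `PeerState` and panics otherwise. A minimal usage sketch, mirroring the call sites in the test added below (`cluster`, `right`, and the store id come from that test):

    // Block until the peer of region `right` on store 5 has applied PrepareMerge.
    cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);

    // Later, after the peer has been removed, read its persisted state once and inspect it.
    let state = cluster.region_local_state(right.get_id(), 5);
    assert_eq!(state.get_state(), PeerState::Tombstone);
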
8 changes: 7 additions & 1 deletion src/raftstore/store/peer.rs
@@ -554,7 +554,13 @@ impl Peer {
&kv_wb,
&region,
PeerState::Tombstone,
self.pending_merge_state.clone(),
// Only persist the `merge_state` if the merge is known to have succeeded,
// which is determined by the `keep_data` flag
if keep_data {
self.pending_merge_state.clone()
} else {
None
},
)?;
// write kv rocksdb first in case of restart happen between two write
let mut write_opts = WriteOptions::new();
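
In effect, the `peer.rs` change replaces the unconditionally persisted `pending_merge_state` with a value that depends on `keep_data`; an equivalent sketch with the conditional pulled out into a local binding (the binding name is illustrative, everything else is as in the hunk above):

    // Persist the merge state only when the peer is destroyed because the merge
    // succeeded (`keep_data == true`); otherwise persist no merge state, so a source
    // peer destroyed mid-merge can never later use a stale state to gc other peers.
    let merge_state = if keep_data {
        self.pending_merge_state.clone()
    } else {
        None
    };

The binding is then passed as the merge-state argument of the call shown above, in place of the inline `if` expression.
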
78 changes: 78 additions & 0 deletions tests/integrations/raftstore/test_merge.rs
@@ -1104,3 +1104,81 @@ fn test_merge_remove_target_peer_isolated() {
must_get_none(&cluster.get_engine(3), format!("k{}", i).as_bytes());
}
}

// Test that when the source peer is destroyed while merging, it does not persist the `merge_state`
// and thus won't generate gc messages that destroy other peers
#[test]
fn test_destroy_source_peer_while_merging() {
let mut cluster = new_node_cluster(0, 5);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

cluster.run();

cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");
for i in 1..=5 {
must_get_equal(&cluster.get_engine(i), b"k1", b"v1");
must_get_equal(&cluster.get_engine(i), b"k3", b"v3");
}

cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2");
let left = pd_client.get_region(b"k1").unwrap();
let right = pd_client.get_region(b"k3").unwrap();
cluster.must_transfer_leader(right.get_id(), new_peer(1, 1));

let schedule_merge_fp = "on_schedule_merge";
fail::cfg(schedule_merge_fp, "return()").unwrap();

// Start the merge and wait until peer 5 has applied PrepareMerge
cluster.try_merge(right.get_id(), left.get_id());
cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);

// Filter out heartbeat and append messages for peer 5
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 5)
.direction(Direction::Recv)
.msg_type(MessageType::MsgHeartbeat)
.msg_type(MessageType::MsgAppend),
));

// Remove a peer from the target region to trigger merge rollback.
pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
must_get_none(&cluster.get_engine(2), b"k1");

// The merge must have been rolled back if we can put more data into the source region
fail::remove(schedule_merge_fp);
cluster.must_put(b"k4", b"v4");
for i in 1..=4 {
must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
}

// Remove peer 5 from the peer list so it destroys itself via a tombstone message
// and should not persist the `merge_state`
pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
must_get_none(&cluster.get_engine(5), b"k3");

// Add a new peer so that other peers will send messages to store 5
pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
// but it is still in tombstone state due to the message filter
let state = cluster.region_local_state(right.get_id(), 5);
assert_eq!(state.get_state(), PeerState::Tombstone);

// let the peer on store 4 have a larger peer id
pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
must_get_equal(&cluster.get_engine(4), b"k4", b"v4");

// If store 5 had persisted the merge state, peer 2 and peer 3 would be destroyed because
// store 5 would respond to their request-vote messages with a gc message, and peer 7 would
// cause store 5 to panic because peer 7 has a larger peer id than the peer in the merge state
cluster.clear_send_filters();
cluster.add_send_filter(IsolationFilterFactory::new(1));

cluster.must_put(b"k5", b"v5");
assert!(!state.has_merge_state(), "{:?}", state);
for i in 2..=5 {
must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
}
}
