raftstore: only persist merge target if the merge is known to have succeeded (#12251) (#12295)

close #12232, ref #12251

raftstore: only persist merge target if the merge is known to have succeeded

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: linning <linningde25@gmail.com>

Co-authored-by: NingLin-P <linningde25@gmail.com>
Co-authored-by: linning <linningde25@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
3 people committed Apr 24, 2022
1 parent 7871728 commit 60a9315
Showing 3 changed files with 107 additions and 1 deletion.
8 changes: 7 additions & 1 deletion components/raftstore/src/store/peer.rs
@@ -946,7 +946,13 @@ where
                &mut kv_wb,
                &region,
                PeerState::Tombstone,
                // Only persist the `merge_state` if the merge is known to have succeeded,
                // which is determined by the `keep_data` flag
                if keep_data {
                    self.pending_merge_state.clone()
                } else {
                    None
                },
            )?;
            // write kv rocksdb first in case a restart happens between the two writes
            let mut write_opts = WriteOptions::new();
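The core of the fix is the gating expression above: `keep_data` is true only when the merge actually committed, so a source peer destroyed after a rolled-back merge no longer records a merge target. The standalone sketch below restates that decision in isolation; `MergeState` and `merge_state_to_persist` are simplified, hypothetical stand-ins for the real raftstore/kvproto types, not the actual API.

```rust
// Simplified stand-in for kvproto's merge state; hypothetical, for illustration only.
#[derive(Clone, Debug)]
struct MergeState {
    target_region_id: u64,
}

// Decide what to persist when a source peer is destroyed: keep the merge
// state only when `keep_data` says the merge succeeded (the target region
// takes over the data); otherwise persist `None` so the tombstone cannot
// later emit gc messages based on a rolled-back merge.
fn merge_state_to_persist(
    keep_data: bool,
    pending_merge_state: &Option<MergeState>,
) -> Option<MergeState> {
    if keep_data {
        pending_merge_state.clone()
    } else {
        None
    }
}

fn main() {
    let pending = Some(MergeState { target_region_id: 1 });
    // Merge rolled back: nothing is persisted.
    assert!(merge_state_to_persist(false, &pending).is_none());
    // Merge succeeded: the target is recorded alongside the tombstone.
    assert!(merge_state_to_persist(true, &pending).is_some());
}
```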
22 changes: 22 additions & 0 deletions components/test_raftstore/src/cluster.rs
@@ -1120,6 +1120,28 @@ impl<T: Simulator> Cluster<T> {
            .unwrap()
    }

    pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
        for _ in 0..100 {
            let state = self
                .get_engine(store_id)
                .c()
                .get_msg_cf::<RegionLocalState>(
                    engine_traits::CF_RAFT,
                    &keys::region_state_key(region_id),
                )
                .unwrap()
                .unwrap();
            if state.get_state() == peer_state {
                return;
            }
            sleep_ms(10);
        }
        panic!(
            "[region {}] peer state did not reach {:?}",
            region_id, peer_state
        );
    }

    pub fn wait_last_index(
        &mut self,
        region_id: u64,
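`must_peer_state` follows the usual test_raftstore polling pattern: re-read the on-disk `RegionLocalState` up to 100 times with a 10 ms sleep (roughly a one-second deadline) and panic if the expected state is never observed. A generic, self-contained version of that pattern, using a hypothetical `must_eventually` helper, looks like this:

```rust
use std::{thread::sleep, time::Duration};

// Hypothetical generic form of the poll-until-or-panic pattern used by
// `must_peer_state`: retry a predicate with a fixed delay and fail loudly
// on timeout so the test surfaces the stuck state instead of hanging.
fn must_eventually(mut check: impl FnMut() -> bool, retries: usize, delay: Duration) {
    for _ in 0..retries {
        if check() {
            return;
        }
        sleep(delay);
    }
    panic!("condition not reached after {} retries", retries);
}

fn main() {
    let mut attempts = 0;
    // Succeeds on the third poll; 100 tries x 10 ms mirrors `must_peer_state`.
    must_eventually(
        || {
            attempts += 1;
            attempts >= 3
        },
        100,
        Duration::from_millis(10),
    );
}
```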
78 changes: 78 additions & 0 deletions tests/failpoints/cases/test_merge.rs
@@ -1302,3 +1302,81 @@ fn test_merge_election_and_restart() {
    // If logs from different terms are processed correctly, store 2 should have the latest updates.
    must_get_equal(&cluster.get_engine(2), b"k14", b"v14");
}

// Test that when the source peer is destroyed while merging, it does not persist the
// `merge_state` and thus won't generate gc messages that destroy other peers
#[test]
fn test_destroy_source_peer_while_merging() {
    let mut cluster = new_node_cluster(0, 5);
    configure_for_merge(&mut cluster);
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    cluster.run();

    cluster.must_put(b"k1", b"v1");
    cluster.must_put(b"k3", b"v3");
    for i in 1..=5 {
        must_get_equal(&cluster.get_engine(i), b"k1", b"v1");
        must_get_equal(&cluster.get_engine(i), b"k3", b"v3");
    }

    cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2");
    let left = pd_client.get_region(b"k1").unwrap();
    let right = pd_client.get_region(b"k3").unwrap();
    cluster.must_transfer_leader(right.get_id(), new_peer(1, 1));

    let schedule_merge_fp = "on_schedule_merge";
    fail::cfg(schedule_merge_fp, "return()").unwrap();

    // Start the merge and wait until peer 5 applies the PrepareMerge
    cluster.must_try_merge(right.get_id(), left.get_id());
    cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);

    // Filter heartbeat and append messages for peer 5
    cluster.add_send_filter(CloneFilterFactory(
        RegionPacketFilter::new(right.get_id(), 5)
            .direction(Direction::Recv)
            .msg_type(MessageType::MsgHeartbeat)
            .msg_type(MessageType::MsgAppend),
    ));

    // Remove a peer from the target region to trigger a merge rollback.
    pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
    must_get_none(&cluster.get_engine(2), b"k1");

    // The merge must have been rolled back if we can put more data into the source region
    fail::remove(schedule_merge_fp);
    cluster.must_put(b"k4", b"v4");
    for i in 1..=4 {
        must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
    }

    // Remove peer 5 from the peer list so it will destroy itself upon receiving a
    // tombstone message, and it should not persist the `merge_state`
    pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
    must_get_none(&cluster.get_engine(5), b"k3");

    // Add a new peer so that other peers will send messages to store 5
    pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
    // But the peer on store 5 is still in the Tombstone state due to the message filter
    let state = cluster.region_local_state(right.get_id(), 5);
    assert_eq!(state.get_state(), PeerState::Tombstone);

    // Let the peer on store 4 have a larger peer id
    pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
    pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
    must_get_equal(&cluster.get_engine(4), b"k4", b"v4");

    // If store 5 had persisted the merge state, peers 2 and 3 would be destroyed because
    // store 5 would respond to their RequestVote messages with a gc message, and peer 7
    // would cause store 5 to panic because peer 7 has a larger peer id than the peer in
    // the merge state
    cluster.clear_send_filters();
    cluster.add_send_filter(IsolationFilterFactory::new(1));

    cluster.must_put(b"k5", b"v5");
    assert!(!state.has_merge_state(), "{:?}", state);
    for i in 2..=5 {
        must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
    }
}
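For context on the failure mode the test pins down: a tombstoned peer that kept a stale merge state treats the (rolled-back) target region's peers as obsolete and answers their messages with gc, and it panics on a peer id newer than the one it recorded. The sketch below is a hypothetical condensation of that logic for illustration, not the real raftstore message handler:

```rust
// Hypothetical condensation of the hazard: how a tombstoned store might react
// to an incoming message once a stale merge state survived the rollback.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Reaction {
    Ignore, // nothing persisted (the fixed behavior): leave the sender alone
    SendGc, // sender looks merged away: tell it to destroy itself
    Panic,  // sender's peer id is newer than the one recorded in the merge state
}

// `recorded_peer_id` is the peer id kept in the stale merge state, if any.
fn react(recorded_peer_id: Option<u64>, sender_peer_id: u64) -> Reaction {
    match recorded_peer_id {
        None => Reaction::Ignore,
        Some(recorded) if sender_peer_id > recorded => Reaction::Panic,
        Some(_) => Reaction::SendGc,
    }
}

fn main() {
    // With the fix nothing is persisted, so live peers (e.g. peers 2 and 3) are left alone.
    assert_eq!(react(None, 2), Reaction::Ignore);
    // Without the fix, the re-created peer 7 (id 7 > recorded id 4) would crash store 5,
    // and older peers would be gc'd away.
    assert_eq!(react(Some(4), 7), Reaction::Panic);
    assert_eq!(react(Some(4), 2), Reaction::SendGc);
}
```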
