diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index 230523ac557..4f521dd149d 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -851,7 +851,13 @@ where
             &mut kv_wb,
             &region,
             PeerState::Tombstone,
-            self.pending_merge_state.clone(),
+            // Only persist the `merge_state` if the merge is known to have succeeded,
+            // which is indicated by the `keep_data` flag.
+            if keep_data {
+                self.pending_merge_state.clone()
+            } else {
+                None
+            },
         )?;
         // write kv rocksdb first in case of restart happen between two write
         let mut write_opts = WriteOptions::new();
diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs
index 251946a1cda..1483fccde97 100644
--- a/components/test_raftstore/src/cluster.rs
+++ b/components/test_raftstore/src/cluster.rs
@@ -1108,6 +1108,28 @@ impl<T: Simulator> Cluster<T> {
             .unwrap()
     }
 
+    pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
+        for _ in 0..100 {
+            let state = self
+                .get_engine(store_id)
+                .c()
+                .get_msg_cf::<RegionLocalState>(
+                    engine_traits::CF_RAFT,
+                    &keys::region_state_key(region_id),
+                )
+                .unwrap()
+                .unwrap();
+            if state.get_state() == peer_state {
+                return;
+            }
+            sleep_ms(10);
+        }
+        panic!(
+            "[region {}] peer state did not reach {:?}",
+            region_id, peer_state
+        );
+    }
+
     pub fn wait_last_index(
         &mut self,
         region_id: u64,
diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs
index f783a12a76c..a696c147ce2 100644
--- a/tests/failpoints/cases/test_merge.rs
+++ b/tests/failpoints/cases/test_merge.rs
@@ -1363,3 +1363,143 @@ fn test_prewrite_before_max_ts_is_synced() {
     let resp = do_prewrite(&mut cluster);
     assert!(!resp.get_region_error().has_max_timestamp_not_synced());
 }
+
+/// If the term changes while catching up logs, the follower needs to update its term
+/// correctly; otherwise it will be left with corrupted state.
+#[test]
+fn test_merge_election_and_restart() {
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_merge(&mut cluster);
+
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let on_raft_gc_log_tick_fp = "on_raft_gc_log_tick";
+    fail::cfg(on_raft_gc_log_tick_fp, "return()").unwrap();
+
+    cluster.run();
+
+    let region = pd_client.get_region(b"k1").unwrap();
+    cluster.must_split(&region, b"k2");
+
+    let r1 = pd_client.get_region(b"k1").unwrap();
+    let r1_on_store1 = find_peer(&r1, 1).unwrap().to_owned();
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store1.clone());
+    cluster.must_put(b"k11", b"v11");
+    must_get_equal(&cluster.get_engine(2), b"k11", b"v11");
+
+    let r1_on_store2 = find_peer(&r1, 2).unwrap().to_owned();
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store2);
+    cluster.must_put(b"k12", b"v12");
+    must_get_equal(&cluster.get_engine(1), b"k12", b"v12");
+
+    cluster.add_send_filter(CloneFilterFactory(RegionPacketFilter::new(r1.get_id(), 2)));
+
+    // Wait for the new leader to be elected.
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store1);
+    cluster.must_put(b"k13", b"v13");
+    must_get_equal(&cluster.get_engine(1), b"k13", b"v13");
+    must_get_none(&cluster.get_engine(2), b"k13");
+
+    // Don't actually execute the commit merge.
+    fail::cfg("after_handle_catch_up_logs_for_merge", "return()").unwrap();
+    // Region 1 can still be merged into region 2 because the leader caches the committed index.
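+    // The lagging source peer on store 2 will catch up the missing logs through the
+    // catch-up-logs path during the merge (hence the failpoint configured above).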
+
+    let r2 = pd_client.get_region(b"k3").unwrap();
+    cluster.must_try_merge(r1.get_id(), r2.get_id());
+    // r1 on store 2 should be able to apply all committed logs.
+    must_get_equal(&cluster.get_engine(2), b"k13", b"v13");
+
+    cluster.shutdown();
+    cluster.clear_send_filters();
+    fail::remove("after_handle_catch_up_logs_for_merge");
+    cluster.start().unwrap();
+
+    // Wait for the region's leader to be elected to avoid timeout and backoff.
+    cluster.leader_of_region(r2.get_id());
+    // If the merge can be resumed correctly, the put should succeed.
+    cluster.must_put(b"k14", b"v14");
+    // If logs from different terms are processed correctly, store 2 should have the latest updates.
+    must_get_equal(&cluster.get_engine(2), b"k14", b"v14");
+}
+
+// Test that when the source peer is destroyed while merging, it does not persist the
+// `merge_state` and thus won't generate gc messages that destroy other peers.
+#[test]
+fn test_destroy_source_peer_while_merging() {
+    let mut cluster = new_node_cluster(0, 5);
+    configure_for_merge(&mut cluster);
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    cluster.run();
+
+    cluster.must_put(b"k1", b"v1");
+    cluster.must_put(b"k3", b"v3");
+    for i in 1..=5 {
+        must_get_equal(&cluster.get_engine(i), b"k1", b"v1");
+        must_get_equal(&cluster.get_engine(i), b"k3", b"v3");
+    }
+
+    cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2");
+    let left = pd_client.get_region(b"k1").unwrap();
+    let right = pd_client.get_region(b"k3").unwrap();
+    cluster.must_transfer_leader(right.get_id(), new_peer(1, 1));
+
+    let schedule_merge_fp = "on_schedule_merge";
+    fail::cfg(schedule_merge_fp, "return()").unwrap();
+
+    // Start the merge and wait until peer 5 applies PrepareMerge.
+    cluster.must_try_merge(right.get_id(), left.get_id());
+    cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);
+
+    // Filter heartbeat and append messages for peer 5.
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(right.get_id(), 5)
+            .direction(Direction::Recv)
+            .msg_type(MessageType::MsgHeartbeat)
+            .msg_type(MessageType::MsgAppend),
+    ));
+
+    // Remove a peer from the target region to trigger a merge rollback.
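+    // (the conf change bumps the target region's epoch, so the pending merge no
+    // longer matches the target and has to be rolled back)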
+    pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
+    must_get_none(&cluster.get_engine(2), b"k1");
+
+    // The merge must have been rolled back if we can put more data into the source region.
+    fail::remove(schedule_merge_fp);
+    cluster.must_put(b"k4", b"v4");
+    for i in 1..=4 {
+        must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
+    }
+
+    // Remove peer 5 from the peer list so it destroys itself via a tombstone message;
+    // it should not persist the `merge_state`.
+    pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
+    must_get_none(&cluster.get_engine(5), b"k3");
+
+    // Add a new peer so that other peers will send messages to store 5.
+    pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
+    // But it is still in Tombstone state due to the message filter.
+    let state = cluster.region_local_state(right.get_id(), 5);
+    assert_eq!(state.get_state(), PeerState::Tombstone);
+
+    // Let the peer on store 4 have a larger peer id.
+    pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
+    pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
+    must_get_equal(&cluster.get_engine(4), b"k4", b"v4");
+
+    // If store 5 had persisted the merge state, peers 2 and 3 would be destroyed because
+    // store 5 would respond to their request-vote messages with gc messages, and peer 7 would
+    // cause store 5 to panic, since peer 7 has a larger peer id than the peer in the merge state.
+    cluster.clear_send_filters();
+    cluster.add_send_filter(IsolationFilterFactory::new(1));
+
+    cluster.must_put(b"k5", b"v5");
+    assert!(!state.has_merge_state(), "{:?}", state);
+    for i in 2..=5 {
+        must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
+    }
+}
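
For reference, `must_peer_state` above is one instance of a generic poll-until-deadline pattern. Below is a minimal standalone sketch of that pattern, not part of the test_raftstore API: `must_reach` and its `check` closure are hypothetical stand-ins for the region-state lookup, reusing the same 100-iteration, 10ms budget.

use std::{thread, time::Duration};

// Poll `check` every 10ms, up to 100 times (~1s total), panicking on timeout;
// `check` stands in for the `state.get_state() == peer_state` test above.
fn must_reach(mut check: impl FnMut() -> bool, what: &str) {
    for _ in 0..100 {
        if check() {
            return;
        }
        thread::sleep(Duration::from_millis(10));
    }
    panic!("condition {:?} still not reached", what);
}

fn main() {
    let mut polls = 0;
    must_reach(
        || {
            polls += 1;
            polls > 3
        },
        "polled more than 3 times",
    );
}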