raftstore: only persist merge target if the merge is known to have succeeded (#12251) #12293

8 changes: 7 additions & 1 deletion components/raftstore/src/store/peer.rs
@@ -851,7 +851,13 @@ where
             &mut kv_wb,
             &region,
             PeerState::Tombstone,
-            self.pending_merge_state.clone(),
+            // Only persist the `merge_state` if the merge is known to have
+            // succeeded, which is determined by the `keep_data` flag.
+            if keep_data {
+                self.pending_merge_state.clone()
+            } else {
+                None
+            },
         )?;
         // write kv rocksdb first in case a restart happens between the two writes
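For context, the hunk above boils down to the following self-contained sketch (simplified stand-in types and a hypothetical helper name, not TiKV's actual API): the pending merge state is written into the tombstone record only when the peer is destroyed because the merge succeeded, that is, when `keep_data` is true; on rollback paths it is dropped.

```rust
// Minimal sketch of the fix. `MergeState` and `TombstoneRecord` are
// simplified stand-ins for TiKV's protobuf types.
#[derive(Clone, Debug)]
struct MergeState {
    target_region_id: u64,
}

#[derive(Debug)]
struct TombstoneRecord {
    merge_state: Option<MergeState>,
}

fn tombstone_record(pending: Option<MergeState>, keep_data: bool) -> TombstoneRecord {
    TombstoneRecord {
        // Before the fix, `pending` was persisted unconditionally, even when
        // the merge had actually been rolled back.
        merge_state: if keep_data { pending } else { None },
    }
}

fn main() {
    let pending = Some(MergeState { target_region_id: 2 });
    // Merge succeeded: keep the state so a restart can finish the merge.
    assert!(tombstone_record(pending.clone(), true).merge_state.is_some());
    // Merge rolled back: drop it so no stale gc messages are sent later.
    assert!(tombstone_record(pending, false).merge_state.is_none());
}
```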
22 changes: 22 additions & 0 deletions components/test_raftstore/src/cluster.rs
@@ -1108,6 +1108,28 @@ impl<T: Simulator> Cluster<T> {
             .unwrap()
     }
 
+    pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
+        for _ in 0..100 {
+            let state = self
+                .get_engine(store_id)
+                .c()
+                .get_msg_cf::<RegionLocalState>(
+                    engine_traits::CF_RAFT,
+                    &keys::region_state_key(region_id),
+                )
+                .unwrap()
+                .unwrap();
+            if state.get_state() == peer_state {
+                return;
+            }
+            sleep_ms(10);
+        }
+        panic!(
+            "[region {}] peer state has still not reached {:?}",
+            region_id, peer_state
+        );
+    }
+
     pub fn wait_last_index(
         &mut self,
         region_id: u64,
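The new helper above polls the persisted `RegionLocalState` (up to 100 attempts, 10 ms apart) and panics if the peer never reaches the expected state. The merge test added below uses it like this:

```rust
// Wait until peer 5 of the source region has applied PrepareMerge and is
// persisted in the Merging state.
cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);
```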
136 changes: 136 additions & 0 deletions tests/failpoints/cases/test_merge.rs
@@ -1363,3 +1363,139 @@ fn test_prewrite_before_max_ts_is_synced() {
     let resp = do_prewrite(&mut cluster);
     assert!(!resp.get_region_error().has_max_timestamp_not_synced());
 }
+
+/// If the term changes while catching up logs, the follower needs to update
+/// the term correctly, otherwise it will be left with corrupted states.
+#[test]
+fn test_merge_election_and_restart() {
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_merge(&mut cluster);
+
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let on_raft_gc_log_tick_fp = "on_raft_gc_log_tick";
+    fail::cfg(on_raft_gc_log_tick_fp, "return()").unwrap();
+
+    cluster.run();
+
+    let region = pd_client.get_region(b"k1").unwrap();
+    cluster.must_split(&region, b"k2");
+
+    let r1 = pd_client.get_region(b"k1").unwrap();
+    let r1_on_store1 = find_peer(&r1, 1).unwrap().to_owned();
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store1.clone());
+    cluster.must_put(b"k11", b"v11");
+    must_get_equal(&cluster.get_engine(2), b"k11", b"v11");
+
+    let r1_on_store2 = find_peer(&r1, 2).unwrap().to_owned();
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store2);
+    cluster.must_put(b"k12", b"v12");
+    must_get_equal(&cluster.get_engine(1), b"k12", b"v12");
+
+    cluster.add_send_filter(CloneFilterFactory(RegionPacketFilter::new(r1.get_id(), 2)));
+
+    // Wait until the new leader is elected.
+    cluster.must_transfer_leader(r1.get_id(), r1_on_store1);
+    cluster.must_put(b"k13", b"v13");
+    must_get_equal(&cluster.get_engine(1), b"k13", b"v13");
+    must_get_none(&cluster.get_engine(2), b"k13");
+
+    // Don't actually execute the commit merge.
+    fail::cfg("after_handle_catch_up_logs_for_merge", "return()").unwrap();
+    // Region 1 can still be merged into region 2 because the leader has a
+    // committed-index cache.
+    let r2 = pd_client.get_region(b"k3").unwrap();
+    cluster.must_try_merge(r1.get_id(), r2.get_id());
+    // r1 on store 2 should be able to apply all committed logs.
+    must_get_equal(&cluster.get_engine(2), b"k13", b"v13");
+
+    cluster.shutdown();
+    cluster.clear_send_filters();
+    fail::remove("after_handle_catch_up_logs_for_merge");
+    cluster.start().unwrap();
+
+    // Wait for the region's leader to be elected to avoid timeout and backoff.
+    cluster.leader_of_region(r2.get_id());
+    // If the merge is resumed correctly, the put should succeed.
+    cluster.must_put(b"k14", b"v14");
+    // If logs from different terms are processed correctly, store 2 should
+    // have the latest updates.
+    must_get_equal(&cluster.get_engine(2), b"k14", b"v14");
+}
+
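Both failpoints used in the test above come from the `fail` crate that TiKV's failpoint tests build on: `fail::cfg(name, "return()")` makes the named failpoint return early, and `fail::remove(name)` restores normal behavior. A minimal standalone illustration (hypothetical failpoint name `my_fp`; assumes the crate is compiled with its `failpoints` feature, without which the macro is a no-op):

```rust
fn do_work() -> u64 {
    // When "my_fp" is configured with "return()", this early-returns the
    // closure's value (0) from `do_work`.
    fail::fail_point!("my_fp", |_| 0);
    42
}

fn main() {
    assert_eq!(do_work(), 42);
    fail::cfg("my_fp", "return()").unwrap();
    assert_eq!(do_work(), 0);
    fail::remove("my_fp");
    assert_eq!(do_work(), 42);
}
```

The second test added by this PR continues below.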
+// Tests that when the source peer is destroyed while merging, it does not
+// persist the `merge_state` and thus won't generate gc messages that destroy
+// other peers.
+#[test]
+fn test_destroy_source_peer_while_merging() {
+    let mut cluster = new_node_cluster(0, 5);
+    configure_for_merge(&mut cluster);
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    cluster.run();
+
+    cluster.must_put(b"k1", b"v1");
+    cluster.must_put(b"k3", b"v3");
+    for i in 1..=5 {
+        must_get_equal(&cluster.get_engine(i), b"k1", b"v1");
+        must_get_equal(&cluster.get_engine(i), b"k3", b"v3");
+    }
+
+    cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2");
+    let left = pd_client.get_region(b"k1").unwrap();
+    let right = pd_client.get_region(b"k3").unwrap();
+    cluster.must_transfer_leader(right.get_id(), new_peer(1, 1));
+
+    let schedule_merge_fp = "on_schedule_merge";
+    fail::cfg(schedule_merge_fp, "return()").unwrap();
+
+    // Start the merge and wait until peer 5 has applied PrepareMerge.
+    cluster.must_try_merge(right.get_id(), left.get_id());
+    cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);
+
+    // Filter heartbeat and append messages for peer 5.
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(right.get_id(), 5)
+            .direction(Direction::Recv)
+            .msg_type(MessageType::MsgHeartbeat)
+            .msg_type(MessageType::MsgAppend),
+    ));
+
+    // Remove a peer from the target region to trigger a merge rollback.
+    pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
+    must_get_none(&cluster.get_engine(2), b"k1");
+
+    // The merge must have been rolled back if we can put more data into the
+    // source region.
+    fail::remove(schedule_merge_fp);
+    cluster.must_put(b"k4", b"v4");
+    for i in 1..=4 {
+        must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
+    }
+
+    // Remove peer 5 from the peer list so it destroys itself via a tombstone
+    // message; it should not persist the `merge_state`.
+    pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
+    must_get_none(&cluster.get_engine(5), b"k3");
+
+    // So that other peers will send messages to store 5.
+    pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
+    // But it is still in Tombstone state due to the message filter.
+    let state = cluster.region_local_state(right.get_id(), 5);
+    assert_eq!(state.get_state(), PeerState::Tombstone);
+
+    // Let the peer on store 4 have a larger peer id.
+    pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
+    pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
+    must_get_equal(&cluster.get_engine(4), b"k4", b"v4");
+
+    // If store 5 had persisted the merge state, peers 2 and 3 would be
+    // destroyed because store 5 would respond to their request-vote messages
+    // with a gc message, and peer 7 would cause store 5 to panic because
+    // peer 7 has a larger peer id than the peer in the merge state.
+    cluster.clear_send_filters();
+    cluster.add_send_filter(IsolationFilterFactory::new(1));
+
+    cluster.must_put(b"k5", b"v5");
+    assert!(!state.has_merge_state(), "{:?}", state);
+    for i in 2..=5 {
+        must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
+    }
+}
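To make that final comment concrete, here is a toy model (hypothetical names, heavily simplified; not raftstore's actual message handler) of how a restarted tombstone peer reacts to a message from another peer of the merged-away region, depending on whether a merge state was persisted:

```rust
#[derive(Debug, PartialEq)]
enum Reaction {
    // No merge_state persisted: the post-fix behavior after a rollback.
    Ignore,
    // merge_state persisted: tell the sender to destroy itself.
    SendGcMessage,
    // Sender's peer id is larger than the one recorded in the merge state.
    Panic,
}

fn react(recorded_peer_id: Option<u64>, from_peer_id: u64) -> Reaction {
    match recorded_peer_id {
        None => Reaction::Ignore,
        Some(recorded) if from_peer_id > recorded => Reaction::Panic,
        Some(_) => Reaction::SendGcMessage,
    }
}

fn main() {
    // After the fix, the rolled-back merge leaves no merge_state on store 5,
    // so peers 2 and 3 keep running and peer 7 cannot trigger a panic.
    assert_eq!(react(None, 7), Reaction::Ignore);
    // Pre-fix: a stale merge_state would gc legitimate voters...
    assert_eq!(react(Some(4), 3), Reaction::SendGcMessage);
    // ...and a larger peer id (7 > 4) would panic the store.
    assert_eq!(react(Some(4), 7), Reaction::Panic);
}
```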