raftstore: only persist merge target if the merge is known to be succeeded (#12251) #12296

Merged (5 commits) on Jun 15, 2022. Changes from 3 commits.
8 changes: 7 additions & 1 deletion components/raftstore/src/store/peer.rs
@@ -960,7 +960,13 @@ where
&mut kv_wb,
&region,
PeerState::Tombstone,
self.pending_merge_state.clone(),
// Only persist the `merge_state` if the merge is known to have succeeded,
// which is determined by the `keep_data` flag
if keep_data {
self.pending_merge_state.clone()
} else {
None
},
)?;
// write kv rocksdb first in case a restart happens between the two writes
let mut write_opts = WriteOptions::new();
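For context: the diff above gates the `merge_state` argument of `write_peer_state` on `keep_data`, which is true only when the merge is known to have succeeded and the target peer takes over the data. A minimal sketch of just that decision, pulled out of TiKV for illustration (the helper name and standalone form are hypothetical, not actual TiKV code):

```rust
use kvproto::raft_serverpb::MergeState;

// Illustrative helper (not in TiKV): decide which merge state to persist
// when the source peer is written as Tombstone during destroy.
fn merge_state_to_persist(
    keep_data: bool,
    pending_merge_state: Option<&MergeState>,
) -> Option<MergeState> {
    if keep_data {
        // Merge succeeded: the persisted merge target is safe to act on later.
        pending_merge_state.cloned()
    } else {
        // Merge outcome unknown or rolled back: a persisted stale target could
        // later be used to send GC messages that destroy healthy peers.
        None
    }
}
```

If `keep_data` is false the merge may have been rolled back, so persisting the stale merge target is unsafe; that failure mode is exactly what the new test below exercises.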
22 changes: 22 additions & 0 deletions components/test_raftstore/src/cluster.rs
@@ -1120,6 +1120,28 @@ impl<T: Simulator> Cluster<T> {
.unwrap()
}

pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
for _ in 0..100 {
let state = self
.get_engine(store_id)
.c()
.get_msg_cf::<RegionLocalState>(
engine_traits::CF_RAFT,
&keys::region_state_key(region_id),
)
.unwrap()
.unwrap();
if state.get_state() == peer_state {
return;
}
sleep_ms(10);
}
panic!(
"[region {}] peer state still not reach {:?}",
region_id, peer_state
);
}

pub fn wait_last_index(
&mut self,
region_id: u64,
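The new `must_peer_state` helper above polls the persisted `RegionLocalState` of a region on one store (for roughly one second in total) and panics if it never reaches the expected `PeerState`. The test added below uses it like this:

```rust
// Wait until peer 5 of the source (right) region has applied PrepareMerge,
// i.e. its RegionLocalState reports PeerState::Merging.
cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);
```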
78 changes: 78 additions & 0 deletions tests/failpoints/cases/test_merge.rs
@@ -1302,3 +1302,81 @@ fn test_merge_election_and_restart() {
// If logs from different terms are processed correctly, store 2 should have the latest updates.
must_get_equal(&cluster.get_engine(2), b"k14", b"v14");
}

// Test that when the source peer is destroyed while merging, it should not persist the `merge_state`,
// and thus won't generate GC messages that destroy other peers
#[test]
fn test_destroy_source_peer_while_merging() {
let mut cluster = new_node_cluster(0, 5);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

cluster.run();

cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");
for i in 1..=5 {
must_get_equal(&cluster.get_engine(i), b"k1", b"v1");
must_get_equal(&cluster.get_engine(i), b"k3", b"v3");
}

cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2");
let left = pd_client.get_region(b"k1").unwrap();
let right = pd_client.get_region(b"k3").unwrap();
cluster.must_transfer_leader(right.get_id(), new_peer(1, 1));

let schedule_merge_fp = "on_schedule_merge";
fail::cfg(schedule_merge_fp, "return()").unwrap();

// Start the merge and wait until peer 5 applies the PrepareMerge
cluster.must_try_merge(right.get_id(), left.get_id());
cluster.must_peer_state(right.get_id(), 5, PeerState::Merging);

// filter heartbeat and append messages for peer 5
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 5)
.direction(Direction::Recv)
.msg_type(MessageType::MsgHeartbeat)
.msg_type(MessageType::MsgAppend),
));

// remove peer from target region to trigger merge rollback.
pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
must_get_none(&cluster.get_engine(2), b"k1");

// The merge must have been rolled back if we can put more data into the source region
fail::remove(schedule_merge_fp);
cluster.must_put(b"k4", b"v4");
for i in 1..=4 {
must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
}

// remove peer 5 from the peer list so it will destroy itself via a tombstone message,
// and it should not persist the `merge_state`
pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
must_get_none(&cluster.get_engine(5), b"k3");

// so that other peers will send messages to store 5
pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
// but it is still in tombstone state due to the message filter
let state = cluster.region_local_state(right.get_id(), 5);
assert_eq!(state.get_state(), PeerState::Tombstone);

// let the peer on store 4 have a larger peer id
pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
must_get_equal(&cluster.get_engine(4), b"k4", b"v4");

// If store 5 had persisted the merge state, peers 2 and 3 would be destroyed because
// store 5 would respond to their RequestVote messages with a GC message, and peer 7 would cause
// store 5 to panic because peer 7 has a larger peer id than the peer in the merge state
cluster.clear_send_filters();
cluster.add_send_filter(IsolationFilterFactory::new(1));

cluster.must_put(b"k5", b"v5");
assert!(!state.has_merge_state(), "{:?}", state);
for i in 2..=5 {
must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
}
}