raftstore: skip deleting snapshot files in peer when pending_remove is true (tikv#11782)

* raftstore: skip deleting snapshot files when the peer is pending removal and its in-progress snapshot apply is canceled -- close tikv#11746

This avoids a potential panic when the snapshot files have been deleted but the peer's status (Tombstone) has not been persisted to disk due to a TiKV crash.

Signed-off-by: tonyxuqqi <tonyxuqi@outlook.com>

* address code review feedback -- close tikv#11746

Signed-off-by: qi.xu <tonxuqi@outlook.com>

* address code review feedback 2 -- close tikv#11746

Signed-off-by: qi.xu <tonxuqi@outlook.com>

Co-authored-by: qi.xu <tonxuqi@outlook.com>
Signed-off-by: xiejiandong <xiejiandong@pingcap.com>
2 people authored and xiejiandong committed Feb 14, 2022
1 parent 30293c7 commit 0f45a79
Showing 2 changed files with 90 additions and 3 deletions.
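
For orientation before the diff: a rough, simplified sketch of the snapshot-GC guard that the first hunk in peer.rs introduces. The flags are flattened into plain parameters here; the names mirror the hunk, but this helper is illustrative, not an actual TiKV function.

// Sketch of the GC decision after this change: a snap file at exactly
// compacted_idx is only deleted when the peer is neither applying that
// snapshot nor pending removal. Otherwise a crash before the Tombstone
// state reaches disk could leave a peer that still needs a snapshot whose
// files were already removed.
fn can_delete_applied_snap(
    key_term: u64,
    key_idx: u64,
    compacted_term: u64,
    compacted_idx: u64,
    is_applying_snap: bool,
    pending_remove: bool,
) -> bool {
    key_term <= compacted_term
        && (key_idx < compacted_idx
            || (key_idx == compacted_idx && !is_applying_snap && !pending_remove))
}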
7 changes: 5 additions & 2 deletions components/raftstore/src/store/fsm/peer.rs
@@ -1015,7 +1015,10 @@ where
                     }
                 }
             } else if key.term <= compacted_term
-                && (key.idx < compacted_idx || key.idx == compacted_idx && !is_applying_snap)
+                && (key.idx < compacted_idx
+                    || key.idx == compacted_idx
+                        && !is_applying_snap
+                        && !self.fsm.peer.pending_remove)
             {
                 info!(
                     "deleting applied snap file";
@@ -2395,7 +2398,7 @@ where
         fail_point!("destroy_peer");
         // Mark itself as pending_remove
         self.fsm.peer.pending_remove = true;
-
+        fail_point!("destroy_peer_after_pending_move", |_| { true });
         if self.fsm.peer.has_unpersisted_ready() {
             assert!(self.ctx.sync_write_worker.is_none());
             // The destroy must be delayed if there are some unpersisted readies.
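
The second hunk above adds the destroy_peer_after_pending_move fail point right after pending_remove is set; the new test below drives it. As a rough, self-contained sketch of that mechanism (assumes the fail crate with the failpoints feature enabled, as TiKV's failpoint tests use; destroy_peer_sketch is an illustrative helper, not TiKV code):

use fail::fail_point;

fn destroy_peer_sketch(pending_remove: &mut bool) -> bool {
    *pending_remove = true;
    // When "return(true)" is configured for this fail point, the closure runs
    // and its value is returned from this function, mimicking a crash window
    // right after pending_remove is set; otherwise this line is a no-op.
    fail_point!("destroy_peer_after_pending_move", |_| true);
    // ... the real destroy work would continue here ...
    false
}

fn main() {
    let mut pending_remove = false;
    fail::cfg("destroy_peer_after_pending_move", "return(true)").unwrap();
    assert!(destroy_peer_sketch(&mut pending_remove)); // stopped early
    fail::remove("destroy_peer_after_pending_move");
    assert!(!destroy_peer_sketch(&mut pending_remove)); // ran to the end
}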
86 changes: 85 additions & 1 deletion tests/failpoints/cases/test_snap.rs
@@ -237,14 +237,98 @@ fn test_destroy_peer_on_pending_snapshot() {
     sleep_ms(100);
 
     fail::remove(apply_snapshot_fp);
-
     fail::remove(before_handle_normal_3_fp);
 
     cluster.must_put(b"k120", b"v1");
     // After peer 4 has applied snapshot, data should be got.
     must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
 }
 
+// Peer 3 in store 3 is isolated for a while and then recovers.
+// While it is applying the snapshot, the peer is destroyed, so the snapshot apply is canceled.
+// Before the destroy finishes, the machine restarts.
+// After the restart, the snapshot should be applied successfully
+// and new data should be written to store 3 successfully.
+#[test]
+fn test_destroy_peer_on_pending_snapshot_and_restart() {
+    let mut cluster = new_server_cluster(0, 3);
+    configure_for_snapshot(&mut cluster);
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let r1 = cluster.run_conf_change();
+    pd_client.must_add_peer(r1, new_peer(2, 2));
+    pd_client.must_add_peer(r1, new_peer(3, 3));
+
+    cluster.must_put(b"k1", b"v1");
+    // Ensure peer 3 is initialized.
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    let destroy_peer_fp = "destroy_peer_after_pending_move";
+    fail::cfg(destroy_peer_fp, "return(true)").unwrap();
+
+    cluster.add_send_filter(IsolationFilterFactory::new(3));
+
+    for i in 0..20 {
+        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
+    }
+
+    // Skip applying the snapshot into RocksDB so the peer's status stays Applying.
+    let apply_snapshot_fp = "apply_pending_snapshot";
+    fail::cfg(apply_snapshot_fp, "return()").unwrap();
+
+    cluster.clear_send_filters();
+    // Wait for the leader to send the snapshot.
+    sleep_ms(100);
+
+    // Don't send check-stale messages to PD.
+    let peer_check_stale_state_fp = "peer_check_stale_state";
+    fail::cfg(peer_check_stale_state_fp, "return()").unwrap();
+
+    pd_client.must_remove_peer(r1, new_peer(3, 3));
+    // Without it, pd_client.must_remove_peer does not trigger destroy_peer!
+    pd_client.must_add_peer(r1, new_peer(3, 4));
+
+    let before_handle_normal_3_fp = "before_handle_normal_3";
+    // Pause ApplyTaskRes::Destroy so that peer gc can finish.
+    fail::cfg(before_handle_normal_3_fp, "pause").unwrap();
+    // Wait for the leader to send messages to peer 3,
+    // then destroy peer 3.
+    sleep_ms(100);
+
+    fail::remove(before_handle_normal_3_fp); // Allow the destroy to run.
+
+    // Restart node 3.
+    cluster.stop_node(3);
+    fail::remove(apply_snapshot_fp);
+    fail::remove(peer_check_stale_state_fp);
+    fail::remove(destroy_peer_fp);
+    cluster.run_node(3).unwrap();
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+    // After peer 3 has applied the snapshot, the data should be readable.
+    must_get_equal(&cluster.get_engine(3), b"k119", b"v1");
+    // In the end the snapshot files should be gc-ed anyway, either by the new peer or by the store.
+    let now = Instant::now();
+    loop {
+        let mut snap_files = vec![];
+        let snap_dir = cluster.get_snap_dir(3);
+        // The snap files should be gc-ed.
+        snap_files.extend(fs::read_dir(snap_dir).unwrap().map(|p| p.unwrap().path()));
+        if snap_files.is_empty() {
+            break;
+        }
+        if now.saturating_elapsed() > Duration::from_secs(5) {
+            panic!("snap files are not gc-ed");
+        }
+        sleep_ms(20);
+    }
+
+    cluster.must_put(b"k120", b"v1");
+    // New data should be replicated to peer 4 in store 3.
+    must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
+}
+
 #[test]
 fn test_shutdown_when_snap_gc() {
     let mut cluster = new_node_cluster(0, 2);
