address code review feedback 2 -- close tikv#11746
Signed-off-by: qi.xu <tonxuqi@outlook.com>
qi.xu committed Jan 11, 2022
1 parent 8545efc commit c92984c
Showing 2 changed files with 68 additions and 13 deletions.
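
Both diffs below hinge on the fail crate's failpoints: fail_point!("name") marks a spot in the code, a test arms it with fail::cfg("name", action) -- for example "pause" to block the thread or "return(true)" to make the enclosing function return early -- and disarms it with fail::remove("name"). As a rough, minimal sketch (not TiKV code: the function and failpoint names are invented, and a fail dependency with the failpoints feature enabled is assumed), the return(true) pattern used by destroy_peer_after_pending_move works roughly like this:

// Sketch only: fail = { version = "0.5", features = ["failpoints"] } is assumed.
fn destroy(pending_remove: &mut bool) -> bool {
    // Mirror of destroy_peer: mark the peer as pending_remove first.
    *pending_remove = true;
    // When fail::cfg("after_pending_remove", "return(true)") is active, the
    // closure runs and its value is returned from destroy() immediately,
    // skipping the rest of the work.
    fail::fail_point!("after_pending_remove", |_| true);
    // The remaining destroy work would run here when the failpoint is off.
    false
}

#[test]
fn failpoint_short_circuits_destroy() {
    fail::cfg("after_pending_remove", "return(true)").unwrap();
    let mut pending_remove = false;
    // The failpoint fires: destroy() returns true without doing the real work.
    assert!(destroy(&mut pending_remove));
    assert!(pending_remove);
    fail::remove("after_pending_remove");
}

The tests in test_snap.rs below combine the same three calls (cfg, the armed failpoint, remove) to widen race windows deterministically.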
4 changes: 1 addition & 3 deletions components/raftstore/src/store/fsm/peer.rs
@@ -1036,8 +1036,6 @@ where
}
};
self.ctx.snap_mgr.delete_snapshot(&key, a.as_ref(), false);
} else if self.fsm.peer.pending_remove {
fail_point!("pending_remove_is_true");
}
}
}
@@ -2361,7 +2359,7 @@ where
fail_point!("destroy_peer");
// Mark itself as pending_remove
self.fsm.peer.pending_remove = true;

fail_point!("destroy_peer_after_pending_move", |_| {true});
if self.fsm.peer.has_unpersisted_ready() {
assert!(self.ctx.sync_write_worker.is_none());
// The destroy must be delayed if there are some unpersisted readies.
77 changes: 67 additions & 10 deletions tests/failpoints/cases/test_snap.rs
@@ -231,28 +231,81 @@ fn test_destroy_peer_on_pending_snapshot() {
let before_handle_normal_3_fp = "before_handle_normal_3";
fail::cfg(before_handle_normal_3_fp, "pause").unwrap();

let (sx, rx) = mpsc::sync_channel::<bool>(10);
// Because before_handle_normal_3 pauses the apply thread, the destroy_peer task does not run,
// so the window between maybe_destroy and destroy_peer can be large enough for a snapshot gc to run.
fail::cfg_callback("pending_remove_is_true", move || {
sx.send(true).unwrap();
})
.unwrap();

cluster.clear_send_filters();
// Wait for the leader to send a message to peer 3,
// then destroy peer 3 and create peer 4.
sleep_ms(100);

fail::remove(apply_snapshot_fp);
// Give gc extra time to finish in case of a slow CI machine, though 100ms is already 2x the gc interval.
rx.recv_timeout(Duration::from_millis(50)).unwrap();
fail::remove(before_handle_normal_3_fp);

cluster.must_put(b"k120", b"v1");
// After peer 4 has applied the snapshot, the data should be readable.
must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
}

// Peer 3 in store 3 is isolated for a while and then recovers.
// While it is applying the snapshot, the peer is destroyed, so applying the snapshot is canceled.
// Before the destroy finishes, the machine restarts.
// After the restart, the snapshot should be applied successfully,
// and new data should be written to store 3 successfully.
#[test]
fn test_destroy_peer_on_pending_snapshot_and_restart() {
let mut cluster = new_server_cluster(0, 3);
configure_for_snapshot(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

let r1 = cluster.run_conf_change();
pd_client.must_add_peer(r1, new_peer(2, 2));
pd_client.must_add_peer(r1, new_peer(3, 3));

cluster.must_put(b"k1", b"v1");
// Ensure peer 3 is initialized.
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

cluster.must_transfer_leader(1, new_peer(1, 1));
let destroy_peer_fp = "destroy_peer_after_pending_move";
fail::cfg(destroy_peer_fp, "return(true)").unwrap();

cluster.add_send_filter(IsolationFilterFactory::new(3));

for i in 0..20 {
cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
}

// Skip applying the snapshot into RocksDB to keep the peer status as Applying.
let apply_snapshot_fp = "apply_pending_snapshot";
fail::cfg(apply_snapshot_fp, "return()").unwrap();

cluster.clear_send_filters();
// Wait for the leader to send the snapshot.
sleep_ms(100);

// Don't send check stale msg to PD
let peer_check_stale_state_fp = "peer_check_stale_state";
fail::cfg(peer_check_stale_state_fp, "return()").unwrap();

pd_client.must_remove_peer(r1, new_peer(3, 3));
pd_client.must_add_peer(r1, new_peer(3, 4)); // Without it, pd_client.must_remove_peer does not trigger destroy_peer!

let before_handle_normal_3_fp = "before_handle_normal_3";
fail::cfg(before_handle_normal_3_fp, "pause").unwrap(); // pause handling ApplyTaskRes::Destroy so that the peer gc can finish
// Wait for the leader to send a message to peer 3,
// then destroy peer 3.
sleep_ms(100);

fail::remove(before_handle_normal_3_fp); // allow the destroy to run

// restart node 3
cluster.stop_node(3);
fail::remove(apply_snapshot_fp);
fail::remove(peer_check_stale_state_fp);
fail::remove(destroy_peer_fp);
cluster.run_node(3).unwrap();
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
// After peer 3 has applied the snapshot, the data should be readable.
must_get_equal(&cluster.get_engine(3), b"k119", b"v1");
// In the end the snapshot file should be gc-ed anyway, either by the new peer or by the store.
let now = Instant::now();
loop {
@@ -268,6 +321,10 @@ fn test_destroy_peer_on_pending_snapshot() {
}
sleep_ms(20);
}

cluster.must_put(b"k120", b"v1");
// new data should be replicated to peer 4 in store 3
must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
}

#[test]
