Skip to content

Commit

Permalink
fix cancel
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Jan 10, 2024
1 parent 0124951 commit 0128eaf
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,6 @@ use crate::{
};

impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
/// Returns whether the snapshot for `region_id` should be treated as the
/// first snapshot after restart.
///
/// The result is `true` when either of the following holds:
/// - `cached_info` is present, its `inited_or_fallback` flag is unset and its
///   `snapshot_inflight` value is non-zero (a fap snapshot is in flight);
/// - `kvstore_region_exist` reports that the region does not exist.
pub fn is_first_snapshot(
    &self,
    region_id: u64,
    cached_info: Option<Arc<CachedRegionInfo>>,
) -> bool {
    // An in-flight fap snapshot must be the first snapshot.
    let fap_snapshot_inflight = cached_info.map_or(false, |info| {
        let not_inited = !info.inited_or_fallback.load(Ordering::SeqCst);
        not_inited && info.snapshot_inflight.load(Ordering::SeqCst) != 0
    });

    // After a restart, both `inited_or_fallback` and `snapshot_inflight` could
    // have been cleaned, so TiFlash is asked whether an inited region peer
    // already exists. Querying kvstore's memory is faster than reading rocksdb.
    fap_snapshot_inflight
        || !self
            .engine_store_server_helper
            .kvstore_region_exist(region_id)
}

pub fn pre_apply_snapshot_for_fap_snapshot(
&self,
ob_region: &Region,
Expand Down Expand Up @@ -161,18 +138,18 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(o) => {
maybe_cached_info = Some(o.get().clone());
let is_first_snapshot = self.is_first_snapshot(region_id, Some(o.get().clone()));
let already_existed = self.engine_store_server_helper.kvstore_region_exist(region_id);
debug!("fast path: check should apply fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
"is_first_snapshot" => is_first_snapshot,
"already_existed" => already_existed,
);
if is_first_snapshot {
if !already_existed {
// May be a fap snapshot, try to apply.
applied_fap = try_apply_fap_snapshot(o.get().clone(), false);
}
// Otherwise, it could not be an fap snapshot.
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,9 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
let ctx = lock.deref_mut();
ctx.tracer.remove(&region_id)
};
if self.packed_envs.engine_store_cfg.enable_fast_add_peer {
self.engine_store_server_helper
.clear_fap_snapshot(region_id);
}

// We don't clear fap snapshot, because applying could be resumed after restart.
// Once resumed, the fap snapshot could be reused.
if let Some(t) = maybe_prehandle_task {
// If `cancel_applying_snap` is called, applying snapshot is cancelled.
// It will happen only if the peer is scheduled away.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub struct CachedRegionInfo {
// TiKV assumes a region's learner peer is added through snapshot.
// If `inited_or_fallback=false`, will try fast path when meet MsgAppend.
// If `inited_or_fallback=true`, fap has either finished or fallen back.
// If fap fallbacks, we must set inited_or_fallback to true,
// NOTE After `apply_snapshot`, `is_initialized` will return true, but `inited_or_fallback` is
// false. If fap fallbacks, we must set inited_or_fallback to true,
// Otherwise, a tikv snapshot will be neglected in `post_apply_snapshot`, causing data loss.
pub inited_or_fallback: AtomicBool,
// If set to non-zero, a fap is sent and not handled.
Expand Down
41 changes: 0 additions & 41 deletions proxy_tests/proxy/shared/fast_add_peer/fp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,6 @@ use engine_tiflash::CachedRegionInfo;

use crate::utils::v1::*;

// Adds a learner peer with fast add peer enabled while the
// `on_ob_cancel_after_pre_handle_snapshot` fail point is active and
// `on_ob_post_apply_snapshot` is paused, then asserts that the cached region
// info manager visited by `iter_ffi_helpers` still contains entry `1`.
// NOTE(review): the name suggests this covers cancelling a snapshot after fap
// phase 1; the cancel itself is driven by fail points defined elsewhere — confirm.
#[test]
fn test_cancel_after_fap_phase1() {
tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster(0, 2);
fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
// fail::cfg("on_pre_write_apply_state", "return").unwrap();
fail::cfg("fap_mock_fake_snapshot", "return(1)").unwrap();
// fail::cfg("before_tiflash_check_double_write", "return").unwrap();
disable_auto_gen_compact_log(&mut cluster);
// Disable auto generate peer.
pd_client.disable_default_operator();
let _ = cluster.run_conf_change();

cluster.must_put(b"k0", b"v0");
// Both fail points are armed before the learner is added, so they are
// already in effect when the add-peer request below is issued.
fail::cfg("on_ob_cancel_after_pre_handle_snapshot", "return").unwrap();
fail::cfg("on_ob_post_apply_snapshot", "pause").unwrap();

pd_client.must_add_peer(1, new_learner_peer(2, 2));
// Presumably `Some(vec![2])` restricts the check to store 2 and `contains(1)`
// looks up region 1 — verify against `iter_ffi_helpers`.
iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
let r = ffi
.engine_store_server
.engines
.as_ref()
.unwrap()
.kv
.proxy_ext
.cached_region_info_manager
.as_ref();
assert!(r.is_some());
assert!(r.unwrap().contains(1));
});

// Remove the paused fail point before shutting the cluster down.
fail::remove("on_ob_post_apply_snapshot");

cluster.shutdown();
// NOTE(review): `fap_core_no_fallback` is removed here but never configured in
// this test, while `post_apply_snapshot_allow_no_unips` is configured above and
// never removed — confirm whether this is intended.
fail::remove("on_ob_cancel_after_pre_handle_snapshot");
fail::remove("fap_core_no_fallback");
fail::remove("fap_mock_fake_snapshot");
}

#[test]
fn test_restart_meta_info() {
fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap();
Expand Down

0 comments on commit 0128eaf

Please sign in to comment.