diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs index 1890bde1fe5..b79889f7174 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs @@ -153,8 +153,17 @@ impl ProxyForwarder { } } MapEntry::Vacant(_) => { - // Compat no fast add peer logic - // panic!("unknown snapshot!"); + if !self.is_initialized(region_id) { + // If there is an fap snapshot, and we'll skip here after restared. + // Otherwise, there could be redunduant prehandling. + if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted { + info!("fast path: prehandle first snapshot after restart {}:{} {}", self.store_id, region_id, peer_id; + "snap_key" => ?snap_key, + "region_id" => region_id, + ); + should_skip = true; + } + } } }, ).is_err() { diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index c91875e1a5a..9a082d05a17 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -365,6 +365,7 @@ pub fn gen_engine_store_server_helper( fn_handle_safe_ts_update: Some(ffi_handle_safe_ts_update), fn_fast_add_peer: Some(ffi_fast_add_peer), fn_apply_fap_snapshot: Some(ffi_apply_fap_snapshot), + fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state), ps: PageStorageInterfaces { fn_create_write_batch: Some(ffi_mockps_create_write_batch), fn_wb_put_page: Some(ffi_mockps_wb_put_page), diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs index f56207b026f..9a30480fc98 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs @@ -9,6 +9,21 @@ use super::{ }; use crate::mock_cluster; +pub(crate) unsafe extern "C" fn ffi_query_fap_snapshot_state( + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + region_id: u64, + peer_id: u64, +) -> interfaces_ffi::FapSnapshotState { + let store = into_engine_store_server_wrap(arg1); + if (*store.engine_store_server) + .tmp_fap_regions + .contains_key(&(region_id, peer_id)) + { + return interfaces_ffi::FapSnapshotState::Persisted; + } + interfaces_ffi::FapSnapshotState::NotFound +} + pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot( arg1: *mut interfaces_ffi::EngineStoreServerWrap, region_id: u64, diff --git a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs index d75ca68c9a6..fc7693513ff 100644 --- a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs @@ -232,6 +232,17 @@ impl EngineStoreServerHelper { } } + pub fn query_fap_snapshot_state( + &self, + region_id: u64, + new_peer_id: u64, + ) -> interfaces_ffi::FapSnapshotState { + debug_assert!(self.fn_query_fap_snapshot_state.is_some()); + unsafe { + (self.fn_query_fap_snapshot_state.into_inner())(self.inner, region_id, new_peer_id) + } + } + pub fn handle_ingest_sst( &self, snaps: Vec<(&[u8], ColumnFamilyType)>, diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index 2a7a48643e2..76d914b6049 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -350,6 +350,13 @@ pub mod root { pub apply_state: root::DB::CppStrWithView, pub region: root::DB::CppStrWithView, } + #[repr(u32)] + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] + pub enum FapSnapshotState { + NotFound = 0, + Persisted = 1, + Other = 2, + } #[repr(u64)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum ConfigJsonType { @@ -703,6 +710,13 @@ pub mod root { new_peer_id: u64, ) -> root::DB::FastAddPeerRes, >, + pub fn_query_fap_snapshot_state: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + region_id: u64, + new_peer_id: u64, + ) -> root::DB::FapSnapshotState, + >, } extern "C" { pub fn ffi_get_server_info_from_proxy( @@ -711,7 +725,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 12212357911599932158; + pub const RAFT_STORE_PROXY_VERSION: u64 = 17148513290160444417; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_tests/proxy/shared/fast_add_peer.rs b/proxy_tests/proxy/shared/fast_add_peer.rs index 6a7153f43eb..37e9ff8a77a 100644 --- a/proxy_tests/proxy/shared/fast_add_peer.rs +++ b/proxy_tests/proxy/shared/fast_add_peer.rs @@ -24,7 +24,7 @@ enum PauseType { fn basic_fast_add_peer() { tikv_util::set_panic_hook(true, "./"); let (mut cluster, pd_client) = new_mock_cluster(0, 2); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; // fail::cfg("on_pre_write_apply_state", "return").unwrap(); fail::cfg("fap_mock_fake_snapshot", "return(1)").unwrap(); @@ -67,7 +67,7 @@ fn test_overlap_last_apply_old() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); pd_client.disable_default_operator(); disable_auto_gen_compact_log(&mut cluster); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; tikv_util::set_panic_hook(true, "./"); // Can always apply snapshot immediately @@ -198,7 +198,7 @@ fn test_overlap_apply_legacy_in_the_middle() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); pd_client.disable_default_operator(); disable_auto_gen_compact_log(&mut cluster); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; cluster.cfg.tikv.raft_store.store_batch_system.pool_size = 4; cluster.cfg.tikv.raft_store.apply_batch_system.pool_size = 4; @@ -350,7 +350,7 @@ fn simple_fast_add_peer( // The case in TiFlash is (DelayedPeer, false, Build) tikv_util::set_panic_hook(true, "./"); let (mut cluster, pd_client) = new_mock_cluster(0, 3); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; if !check_timeout { fail::cfg("fap_core_fallback_millis", "return(1000000)").unwrap(); @@ -766,7 +766,7 @@ fn test_existing_peer() { tikv_util::set_panic_hook(true, "./"); let (mut cluster, pd_client) = new_mock_cluster(0, 2); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; // fail::cfg("on_pre_write_apply_state", "return").unwrap(); disable_auto_gen_compact_log(&mut cluster); @@ -815,7 +815,7 @@ fn test_existing_peer() { fn test_apply_snapshot() { tikv_util::set_panic_hook(true, "./"); let (mut cluster, pd_client) = new_mock_cluster(0, 3); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; // fail::cfg("on_pre_write_apply_state", "return").unwrap(); disable_auto_gen_compact_log(&mut cluster); @@ -895,7 +895,7 @@ fn test_apply_snapshot() { fn test_split_no_fast_add() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); pd_client.disable_default_operator(); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; tikv_util::set_panic_hook(true, "./"); @@ -938,7 +938,7 @@ fn test_split_no_fast_add() { fn test_split_merge() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); pd_client.disable_default_operator(); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; tikv_util::set_panic_hook(true, "./"); @@ -991,7 +991,7 @@ fn test_split_merge() { fn test_fall_back_to_slow_path() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2); pd_client.disable_default_operator(); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; tikv_util::set_panic_hook(true, "./"); @@ -1030,7 +1030,7 @@ fn test_fall_back_to_slow_path() { fn test_single_replica_migrate() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); pd_client.disable_default_operator(); - fail::cfg("post_apply_snapshot_allow_no_unips", ""); + fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap(); cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; tikv_util::set_panic_hook(true, "./"); diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 761d62cfdcd..7d2d963ae7a 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 12212357911599932158ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 17148513290160444417ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index bc112351667..bdf5432cb9e 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -240,6 +240,12 @@ struct FastAddPeerRes { CppStrWithView region; }; +enum class FapSnapshotState : uint32_t { + NotFound, + Persisted, + Other, +}; + enum class ConfigJsonType : uint64_t { ProxyConfigAddressed = 1 }; struct RaftStoreProxyFFIHelper { @@ -353,6 +359,9 @@ struct EngineStoreServerHelper { uint64_t leader_safe_ts); FastAddPeerRes (*fn_fast_add_peer)(EngineStoreServerWrap *, uint64_t region_id, uint64_t new_peer_id); + FapSnapshotState (*fn_query_fap_snapshot_state)(EngineStoreServerWrap *, + uint64_t region_id, + uint64_t new_peer_id); }; #ifdef __cplusplus