Skip to content

Commit

Permalink
z
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Jan 5, 2024
1 parent 1eaec8a commit 99a00ea
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,17 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
}
}
MapEntry::Vacant(_) => {
// Compat no fast add peer logic
// panic!("unknown snapshot!");
if !self.is_initialized(region_id) {
// If there is an fap snapshot, and we'll skip here after restared.
// Otherwise, there could be redunduant prehandling.
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
info!("fast path: prehandle first snapshot after restart {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
should_skip = true;
}
}
}
},
).is_err() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ pub fn gen_engine_store_server_helper(
fn_handle_safe_ts_update: Some(ffi_handle_safe_ts_update),
fn_fast_add_peer: Some(ffi_fast_add_peer),
fn_apply_fap_snapshot: Some(ffi_apply_fap_snapshot),
fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state),
ps: PageStorageInterfaces {
fn_create_write_batch: Some(ffi_mockps_create_write_batch),
fn_wb_put_page: Some(ffi_mockps_wb_put_page),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ use super::{
};
use crate::mock_cluster;

pub(crate) unsafe extern "C" fn ffi_query_fap_snapshot_state(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
region_id: u64,
peer_id: u64,
) -> interfaces_ffi::FapSnapshotState {
let store = into_engine_store_server_wrap(arg1);
if (*store.engine_store_server)
.tmp_fap_regions
.contains_key(&(region_id, peer_id))
{
return interfaces_ffi::FapSnapshotState::Persisted;
}
interfaces_ffi::FapSnapshotState::NotFound
}

pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
region_id: u64,
Expand Down
11 changes: 11 additions & 0 deletions proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ impl EngineStoreServerHelper {
}
}

pub fn query_fap_snapshot_state(
&self,
region_id: u64,
new_peer_id: u64,
) -> interfaces_ffi::FapSnapshotState {
debug_assert!(self.fn_query_fap_snapshot_state.is_some());
unsafe {
(self.fn_query_fap_snapshot_state.into_inner())(self.inner, region_id, new_peer_id)
}
}

pub fn handle_ingest_sst(
&self,
snaps: Vec<(&[u8], ColumnFamilyType)>,
Expand Down
16 changes: 15 additions & 1 deletion proxy_components/proxy_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,13 @@ pub mod root {
pub apply_state: root::DB::CppStrWithView,
pub region: root::DB::CppStrWithView,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum FapSnapshotState {
NotFound = 0,
Persisted = 1,
Other = 2,
}
#[repr(u64)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum ConfigJsonType {
Expand Down Expand Up @@ -703,6 +710,13 @@ pub mod root {
new_peer_id: u64,
) -> root::DB::FastAddPeerRes,
>,
pub fn_query_fap_snapshot_state: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
region_id: u64,
new_peer_id: u64,
) -> root::DB::FapSnapshotState,
>,
}
extern "C" {
pub fn ffi_get_server_info_from_proxy(
Expand All @@ -711,7 +725,7 @@ pub mod root {
arg3: root::DB::RawVoidPtr,
) -> u32;
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 12212357911599932158;
pub const RAFT_STORE_PROXY_VERSION: u64 = 17148513290160444417;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
20 changes: 10 additions & 10 deletions proxy_tests/proxy/shared/fast_add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum PauseType {
fn basic_fast_add_peer() {
tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster(0, 2);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
// fail::cfg("on_pre_write_apply_state", "return").unwrap();
fail::cfg("fap_mock_fake_snapshot", "return(1)").unwrap();
Expand Down Expand Up @@ -67,7 +67,7 @@ fn test_overlap_last_apply_old() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
pd_client.disable_default_operator();
disable_auto_gen_compact_log(&mut cluster);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
tikv_util::set_panic_hook(true, "./");
// Can always apply snapshot immediately
Expand Down Expand Up @@ -198,7 +198,7 @@ fn test_overlap_apply_legacy_in_the_middle() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
pd_client.disable_default_operator();
disable_auto_gen_compact_log(&mut cluster);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
cluster.cfg.tikv.raft_store.store_batch_system.pool_size = 4;
cluster.cfg.tikv.raft_store.apply_batch_system.pool_size = 4;
Expand Down Expand Up @@ -350,7 +350,7 @@ fn simple_fast_add_peer(
// The case in TiFlash is (DelayedPeer, false, Build)
tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
if !check_timeout {
fail::cfg("fap_core_fallback_millis", "return(1000000)").unwrap();
Expand Down Expand Up @@ -766,7 +766,7 @@ fn test_existing_peer() {

tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster(0, 2);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
// fail::cfg("on_pre_write_apply_state", "return").unwrap();
disable_auto_gen_compact_log(&mut cluster);
Expand Down Expand Up @@ -815,7 +815,7 @@ fn test_existing_peer() {
fn test_apply_snapshot() {
tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
// fail::cfg("on_pre_write_apply_state", "return").unwrap();
disable_auto_gen_compact_log(&mut cluster);
Expand Down Expand Up @@ -895,7 +895,7 @@ fn test_apply_snapshot() {
fn test_split_no_fast_add() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
pd_client.disable_default_operator();
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

tikv_util::set_panic_hook(true, "./");
Expand Down Expand Up @@ -938,7 +938,7 @@ fn test_split_no_fast_add() {
fn test_split_merge() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
pd_client.disable_default_operator();
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

tikv_util::set_panic_hook(true, "./");
Expand Down Expand Up @@ -991,7 +991,7 @@ fn test_split_merge() {
fn test_fall_back_to_slow_path() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
pd_client.disable_default_operator();
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

tikv_util::set_panic_hook(true, "./");
Expand Down Expand Up @@ -1030,7 +1030,7 @@ fn test_fall_back_to_slow_path() {
fn test_single_replica_migrate() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
pd_client.disable_default_operator();
fail::cfg("post_apply_snapshot_allow_no_unips", "");
fail::cfg("post_apply_snapshot_allow_no_unips", "").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

tikv_util::set_panic_hook(true, "./");
Expand Down
2 changes: 1 addition & 1 deletion raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once
#include <cstdint>
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 12212357911599932158ull; }
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 17148513290160444417ull; }
9 changes: 9 additions & 0 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ struct FastAddPeerRes {
CppStrWithView region;
};

enum class FapSnapshotState : uint32_t {
NotFound,
Persisted,
Other,
};

enum class ConfigJsonType : uint64_t { ProxyConfigAddressed = 1 };

struct RaftStoreProxyFFIHelper {
Expand Down Expand Up @@ -353,6 +359,9 @@ struct EngineStoreServerHelper {
uint64_t leader_safe_ts);
FastAddPeerRes (*fn_fast_add_peer)(EngineStoreServerWrap *,
uint64_t region_id, uint64_t new_peer_id);
FapSnapshotState (*fn_query_fap_snapshot_state)(EngineStoreServerWrap *,
uint64_t region_id,
uint64_t new_peer_id);
};

#ifdef __cplusplus
Expand Down

0 comments on commit 99a00ea

Please sign in to comment.