diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index b7500ab0475..fabe2d6800a 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit b7500ab0475f04cc61a91cfded5b1b4c95b7283d +Subproject commit fabe2d6800aaf1b049140fb9b29a80b74bb2a0ad diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 939a8e017b3..d636e715819 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -107,6 +107,7 @@ namespace DB M(force_set_parallel_prehandle_threshold) \ M(force_raise_prehandle_exception) \ M(force_agg_on_partial_block) \ + M(force_set_fap_candidate_store_id) \ M(force_not_clean_fap_on_destroy) \ M(delta_tree_create_node_fail) \ M(disable_flush_cache) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 3320fdd9547..3ec774feb22 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -408,6 +408,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_ingesting_stage, {{"type", "ingesting_stage"}}), \ F(type_writing_stage, {{"type", "writing_stage"}}), \ F(type_queueing_stage, {{"type", "queueing_stage"}}), \ + F(type_blocking_cancel_stage, {{"type", "blocking_cancel_stage"}}), \ F(type_selecting_stage, {{"type", "selecting_stage"}})) \ M(tiflash_fap_nomatch_reason, \ "", \ diff --git a/dbms/src/Debug/MockKVStore/MockProxyRegion.cpp b/dbms/src/Debug/MockKVStore/MockProxyRegion.cpp index fef456f12d6..5b390669ef3 100644 --- a/dbms/src/Debug/MockKVStore/MockProxyRegion.cpp +++ b/dbms/src/Debug/MockKVStore/MockProxyRegion.cpp @@ -35,6 +35,12 @@ raft_serverpb::RegionLocalState MockProxyRegion::getState() NO_THREAD_SAFETY_ANA return state; } +raft_serverpb::RegionLocalState & MockProxyRegion::mutState() NO_THREAD_SAFETY_ANALYSIS +{ + auto _ = genLockGuard(); + return state; +} + raft_serverpb::RaftApplyState MockProxyRegion::getApply() NO_THREAD_SAFETY_ANALYSIS { auto _ = genLockGuard(); diff --git a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp index f36e679d474..2c958ef2068 100644 --- a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp @@ -595,8 +595,35 @@ std::tuple MockRaftStoreProxy::snapshot( std::optional deadline_index, bool cancel_after_prehandle) { - auto region = getRegion(region_id); auto old_kv_region = kvs.getRegion(region_id); + RUNTIME_CHECK(old_kv_region != nullptr); + return snapshot( + kvs, + tmt, + region_id, + std::move(cfs), + old_kv_region->cloneMetaRegion(), + old_kv_region->mutMeta().peerId(), + index, + term, + deadline_index, + cancel_after_prehandle); +} + +std::tuple MockRaftStoreProxy::snapshot( + KVStore & kvs, + TMTContext & tmt, + UInt64 region_id, + std::vector && cfs, + metapb::Region && region_meta, + UInt64 peer_id, + uint64_t index, + uint64_t term, + std::optional deadline_index, + bool cancel_after_prehandle) +{ + auto region = getRegion(region_id); + RUNTIME_CHECK(region != nullptr); // We have catch up to index by snapshot. // So we assume there are new data updated, so we inc index by 1. if (index == 0) @@ -605,8 +632,7 @@ std::tuple MockRaftStoreProxy::snapshot( term = region->getLatestCommitTerm(); } - auto new_kv_region - = kvs.genRegionPtr(old_kv_region->cloneMetaRegion(), old_kv_region->mutMeta().peerId(), index, term); + auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term); // The new entry is committed on Proxy's side. 
region->updateCommitIndex(index); new_kv_region->setApplied(index, term); @@ -621,20 +647,28 @@ std::tuple MockRaftStoreProxy::snapshot( } } SSTViewVec snaps{ssts.data(), ssts.size()}; - auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt); - - auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)}; - if (cancel_after_prehandle) + try { - kvs.releasePreHandledSnapshot(rg, tmt); + auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt); + auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)}; + if (cancel_after_prehandle) + { + kvs.releasePreHandledSnapshot(rg, tmt); + return std::make_tuple(kvs.getRegion(region_id), prehandle_result); + } + kvs.checkAndApplyPreHandledSnapshot(rg, tmt); + // Though it is persisted earlier in real proxy, but the state is changed to Normal here. + region->updateAppliedIndex(index, true); + // Region changes during applying snapshot, must re-get. return std::make_tuple(kvs.getRegion(region_id), prehandle_result); } - kvs.checkAndApplyPreHandledSnapshot(rg, tmt); - // Though it is persisted earlier in real proxy, but the state is changed to Normal here. - region->updateAppliedIndex(index, true); - - // Region changes during applying snapshot, must re-get. - return std::make_tuple(kvs.getRegion(region_id), prehandle_result); + catch (const Exception & e) + { + LOG_ERROR(log, "mock apply snapshot error {}", e.message()); + e.rethrow(); + } + LOG_FATAL(DB::Logger::get(), "Should not happen"); + exit(-1); } TableID MockRaftStoreProxy::bootstrapTable(Context & ctx, KVStore & kvs, TMTContext & tmt, bool drop_at_first) diff --git a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h index 8c35459a4e1..1e1973cf066 100644 --- a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h @@ -30,6 +30,7 @@ kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t star struct MockProxyRegion : MutexLockWrap { raft_serverpb::RegionLocalState getState(); + raft_serverpb::RegionLocalState & mutState(); raft_serverpb::RaftApplyState getApply(); void persistAppliedIndex(); void persistAppliedIndex(const std::lock_guard & lock); @@ -231,6 +232,17 @@ struct MockRaftStoreProxy : MutexLockWrap std::vector> && ranges, metapb::RegionEpoch old_epoch); + std::tuple snapshot( + KVStore & kvs, + TMTContext & tmt, + UInt64 region_id, + std::vector && cfs, + metapb::Region && region_meta, + UInt64 peer_id, + uint64_t index, + uint64_t term, + std::optional deadline_index, + bool cancel_after_prehandle); std::tuple snapshot( KVStore & kvs, TMTContext & tmt, diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 44e5b19714f..bc29b9ea27f 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -253,6 +253,7 @@ struct Settings M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \ \ M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 80, "The max time wait for a usable checkpoint for FAP") \ + M(SettingUInt64, fap_task_timeout_seconds, 120, "The max time FAP can take before fallback") \ M(SettingUInt64, fap_handle_concurrency, 25, "The number of threads for handling FAP tasks") \ \ M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting 
from the execution of the IN section.") \ diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.cpp b/dbms/src/Interpreters/SharedContexts/Disagg.cpp index e13352ac266..eee4d0f4547 100644 --- a/dbms/src/Interpreters/SharedContexts/Disagg.cpp +++ b/dbms/src/Interpreters/SharedContexts/Disagg.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include @@ -101,6 +101,7 @@ void SharedContextDisagg::initRemoteDataStore(const FileProviderPtr & file_provi void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur) { + LOG_INFO(Logger::get(), "Init FAP Context, concurrency={}", fap_concur); fap_context = std::make_shared(fap_concur); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index da7610776fb..2ad8159ee64 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -76,8 +77,6 @@ namespace tests class DeltaMergeStoreTest; } -inline static const PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1; - struct SegmentStats { UInt64 segment_id = 0; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index b4fcfc82346..a57594a8dbc 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include @@ -1145,26 +1145,42 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo( "Build checkpoint from remote, store_id={} region_id={}", checkpoint_info->remote_store_id, checkpoint_info->region_id); - auto segment_meta_infos = Segment::readAllSegmentsMetaInfoInRange(*dm_context, range, checkpoint_info); - LOG_INFO(log, "Ingest checkpoint segments num {}", segment_meta_infos.size()); WriteBatches wbs{*dm_context->storage_pool}; - auto restored_segments = Segment::createTargetSegmentsFromCheckpoint( // - log, - *dm_context, - checkpoint_info->remote_store_id, - segment_meta_infos, - range, - checkpoint_info->temp_ps, - wbs); - - if (restored_segments.empty()) + try { - LOG_DEBUG(log, "No segments to ingest."); - return {}; + auto segment_meta_infos = Segment::readAllSegmentsMetaInfoInRange(*dm_context, range, checkpoint_info); + auto restored_segments = Segment::createTargetSegmentsFromCheckpoint( // + log, + *dm_context, + checkpoint_info->remote_store_id, + segment_meta_infos, + range, + checkpoint_info->temp_ps, + wbs); + if (restored_segments.empty()) + { + return {}; + } + wbs.writeLogAndData(); + LOG_INFO( + log, + "Finish write fap checkpoint, region_id={} segments_num={}", + checkpoint_info->region_id, + segment_meta_infos.size()); + return restored_segments; + } + catch (const Exception & e) + { + LOG_INFO( + log, + "Build checkpoint from remote failed for {}, region_id={} remote_store_id={}", + e.message(), + checkpoint_info->region_id, + checkpoint_info->remote_store_id); + wbs.setRollback(); + e.rethrow(); } - wbs.writeLogAndData(); - LOG_INFO(log, "Finish write fap checkpoint, region_id={}", checkpoint_info->region_id); - return restored_segments; + return {}; } void DeltaMergeStore::ingestSegmentsFromCheckpointInfo( @@ -1211,7 +1227,7 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo( // TODO(fap) This could be executed in a dedicated thread if it consumes too much time. 
for (auto & segment : updated_segments) - checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft); + checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index f02f77a9f76..0f275e59697 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -400,28 +401,39 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( // // If cache is empty, we read from DELTA_MERGE_FIRST_SEGMENT_ID to the end and build the cache. // Otherwise, we just read the segment that cover the range. - PageIdU64 current_segment_id = 1; + PageIdU64 current_segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; auto end_to_segment_id_cache = checkpoint_info->checkpoint_data_holder->getEndToSegmentIdCache( KeyspaceTableID{context.keyspace_id, context.physical_table_id}); auto lock = end_to_segment_id_cache->lock(); bool is_cache_ready = end_to_segment_id_cache->isReady(lock); if (is_cache_ready) { - // TODO bisect for end current_segment_id = end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue()); } LOG_DEBUG(Logger::get(), "Read segment meta info from segment {}", current_segment_id); std::vector> end_key_and_segment_ids; SegmentMetaInfos segment_infos; - // TODO(fap) After #7642 there could be no segment, so it could panic later. while (current_segment_id != 0) { Segment::SegmentMetaInfo segment_info; auto target_id = UniversalPageIdFormat::toFullPageId( UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Meta, context.physical_table_id), current_segment_id); - auto page = checkpoint_info->temp_ps->read(target_id); + auto page = checkpoint_info->temp_ps->read(target_id, nullptr, {}, false); + if unlikely (!page.isValid()) + { + // After #7642, DELTA_MERGE_FIRST_SEGMENT_ID may not exist, however, such checkpoint won't be selected. + // If it were to be selected, the FAP task could fallback to regular snapshot. 
+ throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find page id {}, keyspace={} table_id={} current_segment_id={} range={}", + target_id, + context.keyspace_id, + context.physical_table_id, + current_segment_id, + target_range.toDebugString()); + } segment_info.segment_id = current_segment_id; ReadBufferFromMemory buf(page.data.begin(), page.data.size()); readSegmentMetaInfo(buf, segment_info); @@ -2350,6 +2362,16 @@ void Segment::drop(const FileProviderPtr & file_provider, WriteBatches & wbs) stable->drop(file_provider); } +void Segment::dropAsFAPTemp(const FileProviderPtr & file_provider, WriteBatches & wbs) +{ + // The segment_id, delta_id, stable_id are invalid, just cleanup the persisted page_id in + // delta layer and stable layer + delta->recordRemoveColumnFilesPages(wbs); + stable->recordRemovePacksPages(wbs); + wbs.writeAll(); + stable->drop(file_provider); +} + Segment::ReadInfo Segment::getReadInfo( const DMContext & dm_context, const ColumnDefines & read_columns, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 85bb94886e9..b90f7f52040 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -580,6 +580,9 @@ class Segment void setValidDataRatioChecked() { check_valid_data_ratio.store(true, std::memory_order_relaxed); } void drop(const FileProviderPtr & file_provider, WriteBatches & wbs); + /// Only used in FAP. + /// Drop a segment built with invalid id. + void dropAsFAPTemp(const FileProviderPtr & file_provider, WriteBatches & wbs); bool isFlushing() const { return delta->isFlushing(); } diff --git a/dbms/src/Storages/DeltaMerge/Segment_fwd.h b/dbms/src/Storages/DeltaMerge/Segment_fwd.h new file mode 100644 index 00000000000..0c8dc140058 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Segment_fwd.h @@ -0,0 +1,27 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include + +namespace DB +{ +namespace DM +{ + +constexpr PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1; + +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp index f9744618474..5fb5fcf1fab 100644 --- a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp @@ -161,7 +161,7 @@ ContextPtr init(WorkloadOptions & opts) if (!opts.s3_bucket.empty()) { auto & kvstore = context->getTMTContext().getKVStore(); - auto store_meta = kvstore->getStoreMeta(); + auto store_meta = kvstore->clonedStoreMeta(); store_meta.set_id(test_store_id); kvstore->setStore(store_meta); context->getSharedContextDisagg()->initRemoteDataStore(context->getFileProvider(), /*is_s3_enabled*/ true); diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index a8e288f5112..07125e0ec32 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -56,7 +56,7 @@ bool GCManager::work() // For disagg enabled, we must wait before the store meta inited before doing compaction // on segments. Or it will upload new data with incorrect remote path. auto & kvstore = global_context.getTMTContext().getKVStore(); - auto store_info = kvstore->getStoreMeta(); + auto store_info = kvstore->clonedStoreMeta(); if (store_info.id() == InvalidStoreID) { LOG_INFO(log, "Skip GC because store meta is not initialized"); diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 9e6f54b3e4a..c6781d2ead1 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -749,6 +749,20 @@ void ReleasePreHandledSnapshot(EngineStoreServerWrap * server, RawVoidPtr res, R } } +bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id) +{ + try + { + auto & kvstore = server->tmt->getKVStore(); + return kvstore->getRegion(region_id) != nullptr; + } + catch (...) 
+ { + tryLogCurrentFatalException(__PRETTY_FUNCTION__); + exit(-1); + } +} + void GcRawCppPtr(RawVoidPtr ptr, RawCppPtrType type) { if (ptr) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h index 16ca0ef1882..d317250bd53 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h @@ -187,7 +187,10 @@ void HandleSafeTSUpdate( uint64_t self_safe_ts, uint64_t leader_safe_ts); FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, uint64_t new_peer_id); -void ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id); +uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist); +FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id); +void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id); +bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id); } inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap * tiflash_instance_wrap) @@ -237,6 +240,9 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap .fn_set_pb_msg_by_bytes = SetPBMsByBytes, .fn_handle_safe_ts_update = HandleSafeTSUpdate, .fn_fast_add_peer = FastAddPeer, + .fn_query_fap_snapshot_state = QueryFapSnapshotState, + .fn_clear_fap_snapshot = ClearFapSnapshot, + .fn_kvstore_region_exists = KvstoreRegionExists, }; } diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 4133dcb556d..4864109df5e 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -333,6 +333,14 @@ void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt) void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock & task_lock) { const auto region = getRegion(region_id); + // Always try to clean obsolete FAP snapshot + if (tmt.getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) + { + // Everytime we remove region, we try to clean obsolete fap ingest info. + auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; + fiu_do_on(FailPoints::force_not_clean_fap_on_destroy, { return; }); + fap_ctx->resolveFapSnapshotState(tmt, proxy_helper, region_id, false); + } if (region == nullptr) { LOG_INFO(log, "region_id={} not found, might be removed already", region_id); @@ -346,14 +354,6 @@ void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTas tmt.getRegionTable(), task_lock, region_manager.genRegionTaskLock(region_id)); - - if (tmt.getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) - { - fiu_do_on(FailPoints::force_not_clean_fap_on_destroy, { return; }); - // Everytime we remove region, we try to clean obsolete fap ingest info. 
- auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; - fap_ctx->cleanCheckpointIngestInfo(tmt, region_id); - } } void KVStore::setRegionCompactLogConfig(UInt64 rows, UInt64 bytes, UInt64 gap, UInt64 eager_gc_gap) @@ -627,11 +627,21 @@ KVStore::StoreMeta::Base KVStore::StoreMeta::getMeta() const return base; } -metapb::Store KVStore::getStoreMeta() const +metapb::Store KVStore::clonedStoreMeta() const { return getStore().getMeta(); } +const metapb::Store & KVStore::getStoreMeta() const +{ + return this->store.base; +} + +metapb::Store & KVStore::debugMutStoreMeta() +{ + return this->store.base; +} + KVStore::StoreMeta & KVStore::getStore() { return this->store; @@ -705,4 +715,9 @@ RegionPtr KVStore::genRegionPtr(metapb::Region && region, UInt64 peer_id, UInt64 return std::make_shared(std::move(meta), proxy_helper); } +RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const +{ + return region_manager.genRegionTaskLock(region_id); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 02b7a77c4a0..e26c60e91cd 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -214,7 +214,9 @@ class KVStore final : private boost::noncopyable // May return 0 if uninitialized StoreID getStoreID(std::memory_order = std::memory_order_relaxed) const; - metapb::Store getStoreMeta() const; + metapb::Store clonedStoreMeta() const; + const metapb::Store & getStoreMeta() const; + metapb::Store & debugMutStoreMeta(); BatchReadIndexRes batchReadIndex(const std::vector & req, uint64_t timeout_ms) const; @@ -366,6 +368,7 @@ class KVStore final : private boost::noncopyable void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper); + RegionTaskLock genRegionTaskLock(UInt64 region_id) const; #ifndef DBMS_PUBLIC_GTEST private: diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp index a5ae1443df9..2dfbcf43387 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp @@ -20,7 +20,8 @@ #include #include #include -#include +#include +#include #include #include #include @@ -115,25 +116,19 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, } } } - - onSnapshot(new_region, old_region, old_applied_index, tmt); - + // NOTE Do NOT move it to prehandle stage! + // Otherwise a fap snapshot may be cleaned when prehandling after restarted. if (tmt.getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) { - auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; - // Everytime we meet a legacy snapshot, we try to clean obsolete fap ingest info. if constexpr (!std::is_same_v) { - // TODO(fap): Better cancel fap process in first, however, there is no case currently where a legacy snapshot runs with fap phase1/phase2 in parallel. - // The only case is a fap failed after phase 1 and fallback and failed to clean its phase 1 result. - fap_ctx->cleanCheckpointIngestInfo(tmt, new_region->id()); - } - // Another FAP will not take place if this stage is not finished. 
- if (fap_ctx->tasks_trace->leakingDiscardTask(new_region->id())) - { - LOG_ERROR(log, "FastAddPeer: find old fap task, region_id={}", new_region->id()); + auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; + auto region_id = new_region->id(); + // Everytime we meet a regular snapshot, we try to clean obsolete fap ingest info. + fap_ctx->resolveFapSnapshotState(tmt, proxy_helper, region_id, true); } } + onSnapshot(new_region, old_region, old_applied_index, tmt); } // This function get tiflash replica count from local schema. @@ -314,19 +309,28 @@ void KVStore::onSnapshot( template void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) { - LOG_INFO(log, "Begin apply snapshot, new_region={}", new_region->toString(true)); + try + { + LOG_INFO(log, "Begin apply snapshot, new_region={}", new_region->toString(true)); - Stopwatch watch; - SCOPE_EXIT({ - GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_flush).Observe(watch.elapsedSeconds()); - }); + Stopwatch watch; + SCOPE_EXIT({ + GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_flush) + .Observe(watch.elapsedSeconds()); + }); - checkAndApplyPreHandledSnapshot(new_region, tmt); + checkAndApplyPreHandledSnapshot(new_region, tmt); - FAIL_POINT_PAUSE(FailPoints::pause_until_apply_raft_snapshot); + FAIL_POINT_PAUSE(FailPoints::pause_until_apply_raft_snapshot); - // `new_region` may change in the previous function, just log the region_id down - LOG_INFO(log, "Finish apply snapshot, cost={:.3f}s region_id={}", watch.elapsedSeconds(), new_region->id()); + // `new_region` may change in the previous function, just log the region_id down + LOG_INFO(log, "Finish apply snapshot, cost={:.3f}s region_id={}", watch.elapsedSeconds(), new_region->id()); + } + catch (Exception & e) + { + e.addMessage(fmt::format("(while applyPreHandledSnapshot region_id={})", new_region->id())); + e.rethrow(); + } } template void KVStore::applyPreHandledSnapshot( diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index 7da4356c89a..b565b8f548c 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -14,13 +14,14 @@ #include #include +#include #include #include #include -#include #include #include #include +#include #include #include #include @@ -28,31 +29,27 @@ namespace DB { - CheckpointIngestInfoPtr CheckpointIngestInfo::restore( TMTContext & tmt, const TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id) { - GET_METRIC(tiflash_fap_task_result, type_restore).Increment(); RegionPtr region; DM::Segments restored_segments; auto log = DB::Logger::get("CheckpointIngestInfo"); auto uni_ps = tmt.getContext().getWriteNodePageStorage(); + RUNTIME_CHECK(uni_ps != nullptr); auto snapshot = uni_ps->getSnapshot(fmt::format("read_fap_i_{}", region_id)); + RUNTIME_CHECK(snapshot != nullptr); auto page_id = UniversalPageIdFormat::toLocalKVPrefix(UniversalPageIdFormat::LocalKVKeyType::FAPIngestInfo, region_id); Page page = uni_ps->read(page_id, nullptr, snapshot, /*throw_on_not_exist*/ false); if (!page.isValid()) { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Failed to restore CheckpointIngestInfo, region_id={} peer_id={} store_id={}", - region_id, - peer_id, - tmt.getKVStore()->getStoreID(std::memory_order_relaxed)); + // The restore failed, we can safely return null here 
to make FAP fallback in ApplyFapSnapshotImpl. + return nullptr; } FastAddPeerProto::CheckpointIngestInfoPersisted ingest_info_persisted; @@ -97,10 +94,11 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( LOG_DEBUG( log, - "Restore segments for checkpoint, remote_segment_id={} range={} remote_store_id={}", + "Restore segments for checkpoint, remote_segment_id={} range={} remote_store_id={} region_id={}", segment_info.segment_id, segment_info.range.toDebugString(), - remote_store_id); + remote_store_id, + region_id); restored_segments.push_back(std::make_shared( log, segment_info.epoch, @@ -126,23 +124,12 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( /*begin_time_*/ 0); } -void CheckpointIngestInfo::persistToLocal() const +FastAddPeerProto::CheckpointIngestInfoPersisted CheckpointIngestInfo::serializeMeta() const { - if (region->isPendingRemove()) - { - // A pending remove region should not be selected as candidate. - LOG_ERROR(log, "candidate region {} is pending remove", region->toString(false)); - return; - } - auto uni_ps = tmt.getContext().getWriteNodePageStorage(); - UniversalWriteBatch wb; - // Write: // - The region, which is actually data and meta in KVStore. // - The segment ids point to segments which are already persisted but not ingested. - FastAddPeerProto::CheckpointIngestInfoPersisted ingest_info_persisted; - { for (const auto & restored_segment : restored_segments) { @@ -158,7 +145,21 @@ void CheckpointIngestInfo::persistToLocal() const ingest_info_persisted.set_region_info(wb.releaseStr()); } ingest_info_persisted.set_remote_store_id(remote_store_id); + return ingest_info_persisted; +} +void CheckpointIngestInfo::persistToLocal() const +{ + if (region->isPendingRemove()) + { + // A pending remove region should not be selected as candidate. + LOG_ERROR(log, "candidate region {} is pending remove", region->toString(false)); + return; + } + auto uni_ps = tmt.getContext().getWriteNodePageStorage(); + UniversalWriteBatch wb; + + auto ingest_info_persisted = serializeMeta(); auto s = ingest_info_persisted.SerializeAsString(); auto data_size = s.size(); auto read_buf = std::make_shared(s); @@ -168,17 +169,16 @@ void CheckpointIngestInfo::persistToLocal() const uni_ps->write(std::move(wb), DB::PS::V3::PageType::Local, nullptr); LOG_INFO( log, - "Successfully persist CheckpointIngestInfo, region_id={} peer_id={} remote_store_id={} region={}", + "Successfully persist CheckpointIngestInfo, region_id={} peer_id={} remote_store_id={} region={} segments={}", region_id, peer_id, remote_store_id, - region->getDebugString()); + region->getDebugString(), + restored_segments.size()); } -void CheckpointIngestInfo::removeFromLocal(TMTContext & tmt, UInt64 region_id) +static void removeFromLocal(TMTContext & tmt, UInt64 region_id) { - auto log = DB::Logger::get(); - LOG_INFO(log, "Erase CheckpointIngestInfo from disk, region_id={}", region_id); auto uni_ps = tmt.getContext().getWriteNodePageStorage(); UniversalWriteBatch del_batch; del_batch.delPage( @@ -186,24 +186,98 @@ void CheckpointIngestInfo::removeFromLocal(TMTContext & tmt, UInt64 region_id) uni_ps->write(std::move(del_batch), PageType::Local); } -// Like removeFromLocal, but is static and with check. 
-bool CheckpointIngestInfo::forciblyClean(TMTContext & tmt, UInt64 region_id, bool pre_check) +void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, const DM::Segments & segments) { - if (!pre_check) + auto region_id = region->id(); + auto & storages = tmt.getStorages(); + auto keyspace_id = region->getKeyspaceID(); + auto table_id = region->getMappedTableID(); + auto storage = storages.get(keyspace_id, table_id); + + auto log = DB::Logger::get("CheckpointIngestInfo"); + if (storage && storage->engineType() == TiDB::StorageEngine::DT) { - CheckpointIngestInfo::removeFromLocal(tmt, region_id); - return true; + auto dm_storage = std::dynamic_pointer_cast(storage); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + for (const auto & segment_to_drop : segments) + { + DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter()); + // No need to call `abandon`, since the segment is not ingested or in use. + LOG_DEBUG( + log, + "Delete segment from local, segment_id={}, page_id={}, region_id={}", + segment_to_drop->segmentId(), + UniversalPageIdFormat::toFullPageId( + UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Meta, table_id), + segment_to_drop->segmentId()), + region->id()); + segment_to_drop->dropAsFAPTemp(tmt.getContext().getFileProvider(), wbs); + } + } + else + { + LOG_INFO( + log, + "No storage found when cleaning stale FAP data region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); } - auto uni_ps = tmt.getContext().getWriteNodePageStorage(); - auto snapshot = uni_ps->getSnapshot(fmt::format("read_fap_i_{}", region_id)); - auto page_id - = UniversalPageIdFormat::toLocalKVPrefix(UniversalPageIdFormat::LocalKVKeyType::FAPIngestInfo, region_id); + UniversalWriteBatch wb; + auto wn_ps = tmt.getContext().getWriteNodePageStorage(); + RaftDataReader raft_data_reader(*wn_ps); + raft_data_reader.traverseRemoteRaftLogForRegion( + region_id, + [&](const UniversalPageId & page_id, PageSize size, const PS::V3::CheckpointLocation &) { + LOG_DEBUG( + log, + "Delete raft log size {}, region_id={} index={}", + size, + region_id, + UniversalPageIdFormat::getU64ID(page_id)); + wb.delPage(page_id); + }); + wn_ps->write(std::move(wb)); + + LOG_INFO( + log, + "Finish cleaning stale FAP data region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); +} + +bool CheckpointIngestInfo::cleanOnSuccess(TMTContext & tmt, UInt64 region_id) +{ + auto log = DB::Logger::get(); + LOG_INFO(log, "Erase CheckpointIngestInfo from disk on success, region_id={}", region_id); + removeFromLocal(tmt, region_id); + return true; +} + +bool CheckpointIngestInfo::forciblyClean( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool in_memory) +{ + auto log = DB::Logger::get(); // For most cases, ingest infos are deleted in `removeFromLocal`.
- Page page = uni_ps->read(page_id, nullptr, snapshot, /*throw_on_not_exist*/ false); - if (unlikely(page.isValid())) + auto checkpoint_ptr = CheckpointIngestInfo::restore(tmt, proxy_helper, region_id, 0); + LOG_INFO( + log, + "Erase CheckpointIngestInfo from disk by force, region_id={} exist={} in_memory={}", + region_id, + checkpoint_ptr != nullptr, + in_memory); + if (unlikely(checkpoint_ptr)) { - CheckpointIngestInfo::removeFromLocal(tmt, region_id); + CheckpointIngestInfo::deleteWrittenData( + tmt, + checkpoint_ptr->getRegion(), + checkpoint_ptr->getRestoredSegments()); + removeFromLocal(tmt, region_id); return true; } return false; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h index bbf36eb4b51..edcb57055b9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include @@ -74,14 +75,21 @@ struct CheckpointIngestInfo UInt64 peer_id); // Only call to clean dangling CheckpointIngestInfo. - static bool forciblyClean(TMTContext & tmt, UInt64 region_id, bool pre_check = true); + static bool forciblyClean( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool in_memory); + static bool cleanOnSuccess(TMTContext & tmt, UInt64 region_id); + + FastAddPeerProto::CheckpointIngestInfoPersisted serializeMeta() const; private: friend class FastAddPeerContext; // Safety: raftstore ensures a region is handled in a single thread. // `persistToLocal` is called at a fixed place in this thread. void persistToLocal() const; - static void removeFromLocal(TMTContext & tmt, UInt64 region_id); + static void deleteWrittenData(TMTContext & tmt, RegionPtr region, const DM::Segments & segments); private: TMTContext & tmt; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 166df50b682..407443b4654 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -13,18 +13,20 @@ // limitations under the License. 
#include +#include #include #include #include #include #include +#include #include #include #include #include #include -#include #include +#include #include #include #include @@ -47,6 +49,12 @@ namespace DB { + +namespace FailPoints +{ +extern const char force_set_fap_candidate_store_id[]; +} // namespace FailPoints + FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_str, std::string && region_str) { auto * apply = RawCppString::New(apply_str); @@ -60,6 +68,7 @@ FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_ std::vector getCandidateStoreIDsForRegion(TMTContext & tmt_context, UInt64 region_id, UInt64 current_store_id) { + fiu_do_on(FailPoints::force_set_fap_candidate_store_id, { return {1234}; }); auto pd_client = tmt_context.getPDClient(); auto resp = pd_client->getRegionByID(region_id); const auto & region = resp.region(); @@ -87,7 +96,7 @@ std::optional tryParseRegionInfoFromCheckpointData( ParsedCheckpointDataHolderPtr checkpoint_data_holder, UInt64 remote_store_id, UInt64 region_id, - TiFlashRaftProxyHelper * proxy_helper) + const TiFlashRaftProxyHelper * proxy_helper) { auto * log = &Poco::Logger::get("FastAddPeer"); RegionPtr region; @@ -103,7 +112,6 @@ std::optional tryParseRegionInfoFromCheckpointData( else { GET_METRIC(tiflash_fap_nomatch_reason, type_no_meta).Increment(); - LOG_DEBUG(log, "Failed to find region key region_id={}", region_id); return std::nullopt; } } @@ -174,7 +182,7 @@ bool tryResetPeerIdInRegion(RegionPtr region, const RegionLocalState & region_st std::variant FastAddPeerImplSelect( TMTContext & tmt, - TiFlashRaftProxyHelper * proxy_helper, + const TiFlashRaftProxyHelper * proxy_helper, uint64_t region_id, uint64_t new_peer_id) { @@ -185,9 +193,13 @@ std::variant FastAddPeerImplSelect( Stopwatch watch; std::unordered_map checked_seq_map; auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; + auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id); + + // Get candidate stores. const auto & settings = tmt.getContext().getSettingsRef(); - auto current_store_id = tmt.getKVStore()->getStoreMeta().id(); - auto candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id); + auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id(); + std::vector candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id); + if (candidate_store_ids.empty()) { LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id); @@ -195,16 +207,18 @@ std::variant FastAddPeerImplSelect( return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); } LOG_DEBUG(log, "Begin to select checkpoint for region_id={}", region_id); + // It will return with FastAddPeerRes or failed with timeout result wrapped in FastAddPeerRes. while (true) { // Check all candidate stores in this loop. for (const auto store_id : candidate_store_ids) { - RUNTIME_CHECK(store_id != current_store_id); + RUNTIME_CHECK(store_id != current_store_id, store_id, current_store_id); auto iter = checked_seq_map.find(store_id); auto checked_seq = (iter == checked_seq_map.end()) ? 
0 : iter->second; auto [data_seq, checkpoint_data] = fap_ctx->getNewerCheckpointData(tmt.getContext(), store_id, checked_seq); + checked_seq_map[store_id] = data_seq; if (data_seq > checked_seq) { @@ -244,19 +258,33 @@ std::variant FastAddPeerImplSelect( { if (watch.elapsedSeconds() >= settings.fap_wait_checkpoint_timeout_seconds) { - // TODO(fap) Cancel and remove from AsyncTasks // This could happen if there are too many pending tasks in queue, - LOG_INFO(log, "FastAddPeer timeout region_id={} new_peer_id={}", region_id, new_peer_id); + LOG_INFO( + log, + "FastAddPeer timeout when select checkpoints region_id={} new_peer_id={}", + region_id, + new_peer_id); GET_METRIC(tiflash_fap_task_result, type_failed_timeout).Increment(); return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + SYNC_FOR("in_FastAddPeerImplSelect::before_sleep"); + if (cancel_handle->blockedWaitFor(std::chrono::milliseconds(1000))) + { + LOG_INFO(log, "FAP is canceled during peer selecting, region_id={}", region_id); + // Just remove the task from AsyncTasks, it will not write anything in disk during this stage. + // NOTE once canceled, Proxy should no longer polling `FastAddPeer`, since it will result in `OtherError`. + fap_ctx->tasks_trace->leakingDiscardTask(region_id); + // We immediately increase this metrics when cancel, since a canceled task may not be fetched. + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + } } } } FastAddPeerRes FastAddPeerImplWrite( TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 new_peer_id, CheckpointRegionInfoAndData && checkpoint, @@ -264,6 +292,7 @@ FastAddPeerRes FastAddPeerImplWrite( { auto log = Logger::get("FastAddPeer"); auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; + auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id); const auto & settings = tmt.getContext().getSettingsRef(); Stopwatch watch; @@ -273,19 +302,10 @@ FastAddPeerRes FastAddPeerImplWrite( auto [checkpoint_info, region, apply_state, region_state] = checkpoint; - auto & storages = tmt.getStorages(); auto keyspace_id = region->getKeyspaceID(); auto table_id = region->getMappedTableID(); - auto storage = storages.get(keyspace_id, table_id); - if (!storage) - { - // TODO(fap) add some ddl syncing to prevent fallback. 
- throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Can't get storage engine keyspace_id={} table_id={}", - keyspace_id, - table_id); - } + const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region, tmt); + UNUSED(schema_snap); RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine"); auto dm_storage = std::dynamic_pointer_cast(storage); auto new_key_range = DM::RowKeyRange::fromRegionRange( @@ -294,7 +314,20 @@ FastAddPeerRes FastAddPeerImplWrite( storage->isCommonHandle(), storage->getRowKeyColumnSize()); + if (cancel_handle->isCanceled()) + { + LOG_INFO( + log, + "FAP is canceled before write, region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); + fap_ctx->cleanTask(tmt, proxy_helper, region_id, false); + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + } auto segments = dm_storage->buildSegmentsFromCheckpointInfo(new_key_range, checkpoint_info, settings); + fap_ctx->insertCheckpointIngestInfo( tmt, region_id, @@ -304,6 +337,19 @@ FastAddPeerRes FastAddPeerImplWrite( std::move(segments), start_time); + SYNC_FOR("in_FastAddPeerImplWrite::after_write_segments"); + if (cancel_handle->isCanceled()) + { + LOG_INFO( + log, + "FAP is canceled after write segments, region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); + fap_ctx->cleanTask(tmt, proxy_helper, region_id, false); + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + } // Write raft log to uni ps, we do this here because we store raft log seperately. // Currently, FAP only handle when the peer is newly created in this store. // TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots. @@ -322,17 +368,36 @@ FastAddPeerRes FastAddPeerImplWrite( }); auto wn_ps = tmt.getContext().getWriteNodePageStorage(); wn_ps->write(std::move(wb)); - + SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log"); + if (cancel_handle->isCanceled()) + { + LOG_INFO( + log, + "FAP is canceled after write raft log, region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); + fap_ctx->cleanTask(tmt, proxy_helper, region_id, false); + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + } + LOG_DEBUG( + log, + "Finish write FAP snapshot, region_id={} keyspace_id={} table_id={}", + region_id, + keyspace_id, + table_id); return genFastAddPeerRes( FastAddPeerStatus::Ok, apply_state.SerializeAsString(), region_state.region().SerializeAsString()); } +// This function executes FAP phase 1 from a thread in a dedicated pool. FastAddPeerRes FastAddPeerImpl( FastAddPeerContextPtr fap_ctx, TMTContext & tmt, - TiFlashRaftProxyHelper * proxy_helper, + const TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 new_peer_id, UInt64 start_time) @@ -349,11 +414,15 @@ FastAddPeerRes FastAddPeerImpl( auto elapsed = maybe_elapsed.value(); GET_METRIC(tiflash_fap_task_duration_seconds, type_queue_stage).Observe(elapsed / 1000.0); GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Decrement(); + // Consider phase1 -> restart -> phase1 -> fallback -> regular snapshot, + // We may find stale fap snapshot. 
+ CheckpointIngestInfo::forciblyClean(tmt, proxy_helper, region_id, false); auto res = FastAddPeerImplSelect(tmt, proxy_helper, region_id, new_peer_id); if (std::holds_alternative(res)) { auto final_res = FastAddPeerImplWrite( tmt, + proxy_helper, region_id, new_peer_id, std::move(std::get(res)), @@ -390,44 +459,94 @@ } } -void ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id) +uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id) { auto log = Logger::get("FastAddPeer"); - LOG_INFO(log, "Begin apply fap snapshot, region_id={}, peer_id={}", region_id, peer_id); - GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Increment(); - SCOPE_EXIT({ GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Decrement(); }); Stopwatch watch_ingest; auto kvstore = tmt.getKVStore(); auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; auto checkpoint_ingest_info = fap_ctx->getOrRestoreCheckpointIngestInfo(tmt, proxy_helper, region_id, peer_id); - kvstore->handleIngestCheckpoint(checkpoint_ingest_info->getRegion(), checkpoint_ingest_info, tmt); - fap_ctx->cleanCheckpointIngestInfo(tmt, region_id); - GET_METRIC(tiflash_fap_task_duration_seconds, type_ingest_stage).Observe(watch_ingest.elapsedSeconds()); + if (!checkpoint_ingest_info) + { + // If fap is enabled, and this region does not currently exist on proxy's side, + // proxy will check if we have a fap snapshot first. + // If we don't, the snapshot should be a regular snapshot. + LOG_DEBUG( + log, + "Failed to get fap snapshot, it's a regular snapshot, region_id={}, peer_id={}", + region_id, + peer_id); + return false; + } auto begin = checkpoint_ingest_info->beginTime(); - auto current = FAPAsyncTasks::getCurrentMillis(); - if (begin != 0) + if (kvstore->getRegion(region_id)) { - GET_METRIC(tiflash_fap_task_duration_seconds, type_total).Observe((current - begin) / 1000.0); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Don't support FAP for an existing region, region_id={} peer_id={} begin_time={}", + region_id, + peer_id, + begin); + } + LOG_INFO(log, "Begin apply fap snapshot, region_id={} peer_id={} begin_time={}", region_id, peer_id, begin); + // If there is `checkpoint_ingest_info`, it is exactly the data we want to ingest. Consider two scenarios: + // 1. If there was a failed FAP which failed to clean, its data will be overwritten by the current FAP which has finished phase 1. + // 2. It is not possible that a restart happens at FAP phase 2, and a regular snapshot is sent, because snapshots can only be accepted once the previous snapshot is handled.
+ { + GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Increment(); + SCOPE_EXIT({ GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Decrement(); }); + kvstore->handleIngestCheckpoint(checkpoint_ingest_info->getRegion(), checkpoint_ingest_info, tmt); + fap_ctx->cleanTask(tmt, proxy_helper, region_id, true); + GET_METRIC(tiflash_fap_task_duration_seconds, type_ingest_stage).Observe(watch_ingest.elapsedSeconds()); + auto current = FAPAsyncTasks::getCurrentMillis(); + auto elapsed = (current - begin) / 1000.0; + if (begin != 0) + { + GET_METRIC(tiflash_fap_task_duration_seconds, type_total).Observe(elapsed); + } + LOG_INFO(log, "Finish apply fap snapshot, region_id={} peer_id={} elapsed={}", region_id, peer_id, elapsed); + GET_METRIC(tiflash_fap_task_result, type_succeed).Increment(); + return true; } - LOG_INFO( - log, - "Finish apply fap snapshot, region_id={} peer_id={} begin_time={} current_time={}", - region_id, - peer_id, - begin, - current); - GET_METRIC(tiflash_fap_task_result, type_succeed).Increment(); } -void ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id) +FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id) { try { RUNTIME_CHECK_MSG(server->tmt, "TMTContext is null"); RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null"); if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) - return; - ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id); + return FapSnapshotState::Other; + auto fap_ctx = server->tmt->getContext().getSharedContextDisagg()->fap_context; + // We just restore it, since if there is, it will soon be used. + if (fap_ctx->getOrRestoreCheckpointIngestInfo(*(server->tmt), server->proxy_helper, region_id, peer_id) + != nullptr) + { + return FapSnapshotState::Persisted; + } + return FapSnapshotState::NotFound; + } + catch (...) + { + DB::tryLogCurrentFatalException( + "QueryFapSnapshotState", + fmt::format("Failed query fap snapshot state region_id={} peer_id={}", region_id, peer_id)); + exit(-1); + } +} + +uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist) +{ + // TODO(fap) use assert_exist to check. + UNUSED(assert_exist); + try + { + RUNTIME_CHECK_MSG(server->tmt, "TMTContext is null"); + RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null"); + if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) + return false; + return ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id); } catch (...) { @@ -456,33 +575,48 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u // We need to schedule the task. 
auto current_time = FAPAsyncTasks::getCurrentMillis(); GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Increment(); - auto res - = fap_ctx->tasks_trace->addTask(region_id, [server, region_id, new_peer_id, fap_ctx, current_time]() { - std::string origin_name = getThreadName(); - SCOPE_EXIT({ setThreadName(origin_name.c_str()); }); - setThreadName("fap-builder"); - return FastAddPeerImpl( - fap_ctx, - *(server->tmt), - server->proxy_helper, - region_id, - new_peer_id, - current_time); - }); + auto job_func = [server, region_id, new_peer_id, fap_ctx, current_time]() { + std::string origin_name = getThreadName(); + SCOPE_EXIT({ setThreadName(origin_name.c_str()); }); + setThreadName("fap-builder"); + return FastAddPeerImpl( + fap_ctx, + *(server->tmt), + server->proxy_helper, + region_id, + new_peer_id, + current_time); + }; + auto res = fap_ctx->tasks_trace->addTaskWithCancel(region_id, job_func, [log, region_id, new_peer_id]() { + LOG_INFO( + log, + "FAP is canceled in queue due to timeout region_id={} new_peer_id={}", + region_id, + new_peer_id); + // It is already canceled in the queue. + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + }); if (res) { GET_METRIC(tiflash_fap_task_state, type_ongoing).Increment(); - LOG_INFO(log, "Add new task new_peer_id={} region_id={}", new_peer_id, region_id); + LOG_INFO(log, "Add new task success, new_peer_id={} region_id={}", new_peer_id, region_id); } else { // If the queue is full, the task won't be registered, return OtherError for quick fallback. - LOG_ERROR(log, "Add new task fail(queue full) new_peer_id={} region_id={}", new_peer_id, region_id); + // If the proxy still mistakenly polls a canceled task, it will also fail here. + LOG_ERROR( + log, + "Add new task fail(queue full) or poll canceled, new_peer_id={} region_id={}", + new_peer_id, + region_id); GET_METRIC(tiflash_fap_task_result, type_failed_other).Increment(); return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", ""); } } + // If the task is canceled, the task will not be `isScheduled`. if (fap_ctx->tasks_trace->isReady(region_id)) { GET_METRIC(tiflash_fap_task_state, type_ongoing).Decrement(); @@ -500,11 +634,39 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u } else { + const auto & settings = server->tmt->getContext().getSettingsRef(); + auto maybe_elapsed = fap_ctx->tasks_trace->queryElapsed(region_id); + RUNTIME_CHECK_MSG( + maybe_elapsed.has_value(), + "Task not found, region_id={} new_peer_id={}", + region_id, + new_peer_id); + auto elapsed = maybe_elapsed.value(); + if (elapsed >= 1000 * settings.fap_task_timeout_seconds) + { + /// NOTE: Make sure FastAddPeer is the only place to cancel FAP phase 1. + // If the task is running, we have to wait for it to return on cancel and then clean, + // otherwise a later regular snapshot may race with this clean. + auto prev_state = fap_ctx->tasks_trace->queryState(region_id); + LOG_INFO( + log, + "Cancel FAP due to timeout region_id={} new_peer_id={} prev_state={}", + region_id, + new_peer_id, + magic_enum::enum_name(prev_state)); + GET_METRIC(tiflash_fap_task_state, type_blocking_cancel_stage).Increment(); + { + [[maybe_unused]] auto s = fap_ctx->tasks_trace->blockedCancelRunningTask(region_id); + } + GET_METRIC(tiflash_fap_task_state, type_blocking_cancel_stage).Decrement(); + // Return Canceled because it is canceled from outside the FAP worker.
+ return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); + } LOG_DEBUG(log, "Task is still pending new_peer_id={} region_id={}", new_peer_id, region_id); return genFastAddPeerRes(FastAddPeerStatus::WaitForData, "", ""); } } - catch (...) + catch (const Exception & e) { DB::tryLogCurrentException( "FastAddPeer", @@ -512,8 +674,35 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u "Failed when try to restore from checkpoint region_id={} new_peer_id={} {}", region_id, new_peer_id, - StackTrace().toString())); + e.message())); + return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", ""); + } + catch (...) + { + DB::tryLogCurrentException( + "FastAddPeer", + fmt::format( + "Failed when try to restore from checkpoint region_id={} new_peer_id={}", + region_id, + new_peer_id)); return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", ""); } } + +void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id) +{ + try + { + RUNTIME_CHECK_MSG(server->tmt, "TMTContext is null"); + RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null"); + if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) + return; + CheckpointIngestInfo::forciblyClean(*(server->tmt), server->proxy_helper, region_id, false); + } + catch (...) + { + tryLogCurrentFatalException(__PRETTY_FUNCTION__); + exit(-1); + } +} } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h index 5119fce89d3..cffe7db7d3a 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h @@ -1,4 +1,4 @@ -// Copyright 2023 PingCAP, Inc. +// Copyright 2024 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,93 +15,36 @@ #pragma once #include -#include #include namespace DB { -using FAPAsyncTasks = AsyncTasks, FastAddPeerRes>; struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr; class Region; using RegionPtr = std::shared_ptr; using CheckpointRegionInfoAndData = std::tuple; -struct CheckpointIngestInfo; -using CheckpointIngestInfoPtr = std::shared_ptr; - -namespace DM -{ -class Segment; -using SegmentPtr = std::shared_ptr; -using Segments = std::vector; -} // namespace DM - -class FastAddPeerContext -{ -public: - explicit FastAddPeerContext(uint64_t thread_count = 0); - - // Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair - std::pair getNewerCheckpointData( - Context & context, - UInt64 store_id, - UInt64 required_seq); - - // Checkpoint ingest management - CheckpointIngestInfoPtr getOrRestoreCheckpointIngestInfo( - TMTContext & tmt, - const struct TiFlashRaftProxyHelper * proxy_helper, - UInt64 region_id, - UInt64 peer_id); - void insertCheckpointIngestInfo( - TMTContext & tmt, - UInt64 region_id, - UInt64 peer_id, - UInt64 remote_store_id, - RegionPtr region, - DM::Segments && segments, - UInt64 start_time); - std::optional tryGetCheckpointIngestInfo(UInt64 region_id) const; - void cleanCheckpointIngestInfo(TMTContext & tmt, UInt64 region_id); - - // Remove the checkpoint ingest info from memory. Only for testing. 
- void debugRemoveCheckpointIngestInfo(UInt64 region_id); - -public: - std::shared_ptr tasks_trace; - -private: - class CheckpointCacheElement - { - public: - explicit CheckpointCacheElement(const String & manifest_key_, UInt64 dir_seq_) - : manifest_key(manifest_key_) - , dir_seq(dir_seq_) - {} - - ParsedCheckpointDataHolderPtr getParsedCheckpointData(Context & context); - - private: - std::mutex mu; - String manifest_key; - UInt64 dir_seq; - ParsedCheckpointDataHolderPtr parsed_checkpoint_data; - }; - using CheckpointCacheElementPtr = std::shared_ptr; - - std::atomic temp_ps_dir_sequence = 0; - - std::mutex cache_mu; - // Store the latest manifest data for every store - // StoreId -> std::pair - std::unordered_map> checkpoint_cache_map; - - // Checkpoint that is persisted, but yet to be ingested into DeltaTree. - mutable std::mutex ingest_info_mu; - // RegionID->CheckpointIngestInfoPtr - std::unordered_map checkpoint_ingest_info_map; - - LoggerPtr log; -}; +FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_str, std::string && region_str); +std::variant FastAddPeerImplSelect( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + uint64_t region_id, + uint64_t new_peer_id); +FastAddPeerRes FastAddPeerImplWrite( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + UInt64 new_peer_id, + CheckpointRegionInfoAndData && checkpoint, + UInt64 start_time); + +FastAddPeerRes FastAddPeerImpl( + FastAddPeerContextPtr fap_ctx, + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + UInt64 new_peer_id, + UInt64 start_time); +uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id); } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.cpp index 3eea4573520..02f7731c1c1 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.cpp @@ -159,7 +159,7 @@ ParsedCheckpointDataHolderPtr buildParsedCheckpointData(Context & context, const { if (!record.entry.checkpoint_info.data_location.isValid()) { - // Eventually fallback to legacy snapshot + // Eventually fallback to regular snapshot throw Exception( ErrorCodes::LOGICAL_ERROR, "buildParsedCheckpointData: can't put remote page with empty data_location, page_id={}", @@ -185,7 +185,7 @@ ParsedCheckpointDataHolderPtr buildParsedCheckpointData(Context & context, const RUNTIME_CHECK(record.entry.checkpoint_info.has_value()); if (!record.entry.checkpoint_info.data_location.isValid()) { - // Eventually fallback to legacy snapshot + // Eventually fallback to regular snapshot throw Exception( ErrorCodes::LOGICAL_ERROR, "buildParsedCheckpointData: can't put external page with empty data_location, page_id={}", diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h index 3dea23f91e9..0165102c707 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h @@ -38,7 +38,8 @@ class EndToSegmentId bool isReady(std::unique_lock & lock) const; - // The caller must ensure `end_key_and_segment_id` is ordered + // The caller must ensure `end_key_and_segment_id` is ordered. + // Called in `Segment::readAllSegmentsMetaInfoInRange`. 
void build( std::unique_lock & lock, std::vector> && end_key_and_segment_ids); @@ -69,6 +70,7 @@ class ParsedCheckpointDataHolder { for (const auto & path : paths) { + LOG_DEBUG(DB::Logger::get(), "ParsedCheckpointDataHolder destroyed path={}", path); Poco::File(path).remove(true); } } diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp index 79d5b989abf..95295077045 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -128,8 +128,12 @@ CheckpointIngestInfoPtr FastAddPeerContext::getOrRestoreCheckpointIngestInfo( // The caller ensure there is no concurrency operation on the same region_id so // that we can call restore without locking `ingest_info_mu` auto info = CheckpointIngestInfo::restore(tmt, proxy_helper, region_id, peer_id); - std::scoped_lock lock(ingest_info_mu); - checkpoint_ingest_info_map.emplace(region_id, info); + if (info) + { + GET_METRIC(tiflash_fap_task_result, type_restore).Increment(); + std::scoped_lock lock(ingest_info_mu); + checkpoint_ingest_info_map.emplace(region_id, info); + } return info; } } @@ -140,24 +144,31 @@ void FastAddPeerContext::debugRemoveCheckpointIngestInfo(UInt64 region_id) checkpoint_ingest_info_map.erase(region_id); } -void FastAddPeerContext::cleanCheckpointIngestInfo(TMTContext & tmt, UInt64 region_id) +void FastAddPeerContext::cleanTask( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool is_succeed) { - // TODO(fap) We can move checkpoint_ingest_info to a dedicated queue, and schedule a timed task to clean it, if this costs much. + bool in_memory = false; + // TODO(fap) We could use a dedicated queue and thread for cleaning, if this costs too much. // However, we have to make sure the clean task will not override if a new fap snapshot of the same region comes later. - bool pre_check = true; { - // If it's still managed by fap context. std::scoped_lock lock(ingest_info_mu); auto iter = checkpoint_ingest_info_map.find(region_id); if (iter != checkpoint_ingest_info_map.end()) { - // the ingest info exist, do not need to check again later - pre_check = false; + in_memory = true; checkpoint_ingest_info_map.erase(iter); } } - // clean without locking `ingest_info_mu` - CheckpointIngestInfo::forciblyClean(tmt, region_id, pre_check); + if (is_succeed) + CheckpointIngestInfo::cleanOnSuccess(tmt, region_id); + else + { + RUNTIME_CHECK(proxy_helper != nullptr); + CheckpointIngestInfo::forciblyClean(tmt, proxy_helper, region_id, in_memory); + } } std::optional FastAddPeerContext::tryGetCheckpointIngestInfo(UInt64 region_id) const @@ -209,4 +220,39 @@ void FastAddPeerContext::insertCheckpointIngestInfo( info->persistToLocal(); } +void FastAddPeerContext::resolveFapSnapshotState( + TMTContext & tmt, + const TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool is_regular_snapshot) +{ + auto prev_state = tasks_trace->queryState(region_id); + /// --- The regular snapshot case --- + /// A regular snapshot is never concurrent with a FAP snapshot, in either phase 1 or phase 2: + /// Can't be InQueue/Running because: + /// - Proxy will not actively cancel FAP, so it will not fall back if FAP phase 1 is still running. + /// Cancel in `FastAddPeer` is blocking, so a regular snapshot won't meet a canceling snapshot.
+ /// Can't be Finished because: + /// - A finished task would have been fetched by the proxy on its next `FastAddPeer` call. + /// --- The destroy region case --- + /// While FAP is in progress, it blocks all MsgAppend messages to this region peer, so the destroy won't happen. + /// If the region is destroyed now and sent to this store later, it must be with another peer_id. + RUNTIME_CHECK_MSG( + prev_state == FAPAsyncTasks::TaskState::NotScheduled, + "FastAddPeer: found scheduled fap task, region_id={} fap_state={} is_regular_snapshot={}", + region_id, + magic_enum::enum_name(prev_state), + is_regular_snapshot); + // Reaching here, one of the following holds: + // 1. Some non-ingested data is left on disk after a restart. + // 2. There has been no FAP at all. + // 3. FAP was enabled before, but is disabled now. + LOG_DEBUG( + log, + "FastAddPeer: found no ongoing fap task, region_id={} is_regular_snapshot={}", + region_id, + is_regular_snapshot); + // Still need to clean because there could be data left. + cleanTask(tmt, proxy_helper, region_id, false); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h new file mode 100644 index 00000000000..59169b3e012 --- /dev/null +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h @@ -0,0 +1,116 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +using FAPAsyncTasks = AsyncTasks, FastAddPeerRes>; +struct CheckpointInfo; +using CheckpointInfoPtr = std::shared_ptr; +class Region; +using RegionPtr = std::shared_ptr; +using CheckpointRegionInfoAndData + = std::tuple; +struct CheckpointIngestInfo; +using CheckpointIngestInfoPtr = std::shared_ptr; + +namespace DM +{ +class Segment; +using SegmentPtr = std::shared_ptr; +using Segments = std::vector; +} // namespace DM + +class FastAddPeerContext +{ +public: + explicit FastAddPeerContext(uint64_t thread_count = 0); + + // Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair + std::pair getNewerCheckpointData( + Context & context, + UInt64 store_id, + UInt64 required_seq); + + // Checkpoint ingest management + CheckpointIngestInfoPtr getOrRestoreCheckpointIngestInfo( + TMTContext & tmt, + const struct TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + UInt64 peer_id); + void insertCheckpointIngestInfo( + TMTContext & tmt, + UInt64 region_id, + UInt64 peer_id, + UInt64 remote_store_id, + RegionPtr region, + DM::Segments && segments, + UInt64 start_time); + std::optional tryGetCheckpointIngestInfo(UInt64 region_id) const; + void cleanTask( + TMTContext & tmt, + const struct TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool is_succeed); + void resolveFapSnapshotState( + TMTContext & tmt, + const struct TiFlashRaftProxyHelper * proxy_helper, + UInt64 region_id, + bool is_regular_snapshot); + + // Remove the checkpoint ingest info from memory.
Only for testing. + void debugRemoveCheckpointIngestInfo(UInt64 region_id); + +public: + std::shared_ptr tasks_trace; + +private: + class CheckpointCacheElement + { + public: + explicit CheckpointCacheElement(const String & manifest_key_, UInt64 dir_seq_) + : manifest_key(manifest_key_) + , dir_seq(dir_seq_) + {} + + ParsedCheckpointDataHolderPtr getParsedCheckpointData(Context & context); + + private: + std::mutex mu; + String manifest_key; + UInt64 dir_seq; + ParsedCheckpointDataHolderPtr parsed_checkpoint_data; + }; + using CheckpointCacheElementPtr = std::shared_ptr; + + std::atomic temp_ps_dir_sequence = 0; + + std::mutex cache_mu; + // Store the latest manifest data for every store + // StoreId -> std::pair + std::unordered_map> checkpoint_cache_map; + + // Checkpoint that is persisted, but yet to be ingested into DeltaTree. + mutable std::mutex ingest_info_mu; + // RegionID->CheckpointIngestInfoPtr + std::unordered_map checkpoint_ingest_info_map; + + LoggerPtr log; +}; +} // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h b/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h index 97709c0cc4c..604afcde63f 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h +++ b/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h @@ -54,6 +54,7 @@ struct PreHandlingTrace : MutexLockWrap std::atomic abort_error; }; + // Prehandle uses the thread pool from Proxy's side, so it can't benefit from AsyncTasks. std::unordered_map> tasks; std::atomic ongoing_prehandle_subtask_count{0}; std::mutex cpu_resource_mut; @@ -81,10 +82,7 @@ struct PreHandlingTrace : MutexLockWrap tasks.erase(it); return b; } - else - { - return nullptr; - } + return nullptr; } bool hasTask(uint64_t region_id) NO_THREAD_SAFETY_ANALYSIS { diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index 93f0478f9ae..caf0e10a29c 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -16,12 +16,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -244,6 +246,7 @@ PrehandleResult KVStore::preHandleSnapshotToFiles( TMTContext & tmt) { new_region->beforePrehandleSnapshot(new_region->id(), deadline_index); + ongoing_prehandle_task_count.fetch_add(1); FAIL_POINT_PAUSE(FailPoints::pause_before_prehandle_snapshot); @@ -293,7 +296,7 @@ size_t KVStore::getMaxParallelPrehandleSize() const return total_concurrency; } -// If size is 0, do not parallel prehandle for this snapshot, which is legacy. +// If size is 0, do not prehandle this snapshot in parallel, which is the regular path. // If size is non-zero, use extra this many threads to prehandle. static inline std::pair, size_t> getSplitKey( LoggerPtr log, @@ -673,7 +676,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Can't get table"); } - // Get a gc safe point for compacting + // Get a gc safe point for the compaction filter.
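+ // Outdated MVCC versions below this safe point can be dropped while decoding the snapshot into DTFiles.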
Timestamp gc_safepoint = 0; if (auto pd_client = tmt.getPDClient(); !pd_client->isMock()) { diff --git a/dbms/src/Storages/KVStore/tests/gtests_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp similarity index 100% rename from dbms/src/Storages/KVStore/tests/gtests_async_tasks.cpp rename to dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 43b4d7d0f46..9c1b648b88f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1181,10 +1181,12 @@ try ASSERT_EQ( e.message(), fmt::format( - "try to apply with older index, region_id={} applied_index={} new_index={}", + "try to apply with older index, region_id={} applied_index={} new_index={}: (while " + "applyPreHandledSnapshot region_id={})", region_id, 8, - 6)); + 6, + region_id)); } } diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 8f766be1bcd..1e13eaf0259 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include +#include #include #include #include @@ -40,14 +42,10 @@ using raft_serverpb::RegionLocalState; namespace DB { -FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_str, std::string && region_str); -FastAddPeerRes FastAddPeerImplWrite( - TMTContext & tmt, - UInt64 region_id, - UInt64 new_peer_id, - CheckpointRegionInfoAndData && checkpoint, - UInt64 start_time); -void ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id); +namespace FailPoints +{ +extern const char force_set_fap_candidate_store_id[]; +} // namespace FailPoints namespace FailPoints { @@ -56,17 +54,45 @@ extern const char force_not_clean_fap_on_destroy[]; namespace tests { + +struct FAPTestOpt +{ + bool mock_add_new_peer = false; + bool persist_empty_segment = false; +}; + class RegionKVStoreTestFAP : public KVStoreTestBase { public: void SetUp() override { - KVStoreTestBase::SetUp(); + auto & global_context = TiFlashTestEnv::getGlobalContext(); + // clean data and create path pool instance + path_pool = TiFlashTestEnv::createCleanPathPool(test_path); + + initStorages(); + + // Must be called before `initializeWriteNodePageStorageIfNeed` to have S3 lock services registered. DB::tests::TiFlashTestEnv::enableS3Config(); auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); - auto & global_context = TiFlashTestEnv::getGlobalContext(); + orig_disagg_mode = global_context.getSharedContextDisagg()->disaggregated_mode; + global_context.getSharedContextDisagg()->disaggregated_mode = DisaggregatedMode::Storage; + if (global_context.getWriteNodePageStorage() == nullptr) + { + already_initialize_write_ps = false; + orig_mode = global_context.getPageStorageRunMode(); + global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); + global_context.tryReleaseWriteNodePageStorageForTest(); + global_context.initializeWriteNodePageStorageIfNeed(*path_pool); + } + else + { + // It will currently happen in `initStorages` when we call `getContext`. 
+ already_initialize_write_ps = true; + } + if (global_context.getSharedContextDisagg()->remote_data_store == nullptr) { already_initialize_data_store = false; @@ -79,34 +105,33 @@ class RegionKVStoreTestFAP : public KVStoreTestBase { already_initialize_data_store = true; } - if (global_context.getWriteNodePageStorage() == nullptr) - { - already_initialize_write_ps = false; - orig_mode = global_context.getPageStorageRunMode(); - global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); - global_context.tryReleaseWriteNodePageStorageForTest(); - global_context.initializeWriteNodePageStorageIfNeed(global_context.getPathPool()); - } - else + + global_context.getSharedContextDisagg()->initFastAddPeerContext(25); + proxy_instance = std::make_unique(); + proxy_helper = proxy_instance->generateProxyHelper(); + KVStoreTestBase::reloadKVSFromDisk(false); { - already_initialize_write_ps = true; + auto store = metapb::Store{}; + store.set_id(1234); + kvstore->setStore(store); + ASSERT_EQ(kvstore->getStoreID(), store.id()); } - orig_mode = global_context.getPageStorageRunMode(); - global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); - global_context.getSharedContextDisagg()->initFastAddPeerContext(25); - KVStoreTestBase::SetUp(); + LOG_INFO(log, "Finished setup"); } void TearDown() override { auto & global_context = TiFlashTestEnv::getGlobalContext(); + KVStoreTestBase::TearDown(); + global_context.getSharedContextDisagg()->fap_context.reset(); if (!already_initialize_data_store) { global_context.getSharedContextDisagg()->remote_data_store = nullptr; } - global_context.setPageStorageRunMode(orig_mode); + global_context.getSharedContextDisagg()->disaggregated_mode = orig_disagg_mode; if (!already_initialize_write_ps) { + global_context.tryReleaseWriteNodePageStorageForTest(); global_context.setPageStorageRunMode(orig_mode); } auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); @@ -114,7 +139,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase DB::tests::TiFlashTestEnv::disableS3Config(); } - CheckpointRegionInfoAndData prepareForRestart(); + CheckpointRegionInfoAndData prepareForRestart(FAPTestOpt); protected: void dumpCheckpoint() @@ -159,6 +184,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase bool already_initialize_data_store = false; bool already_initialize_write_ps = false; DB::PageStorageRunMode orig_mode; + DisaggregatedMode orig_disagg_mode; }; void persistAfterWrite( @@ -179,6 +205,64 @@ void persistAfterWrite( ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0, 0, 0), true); } +template +void eventuallyThrow(F f) +{ + using namespace std::chrono_literals; + bool thrown = false; + for (int i = 0; i < 5; i++) + { + try + { + f(); + } + catch (...) 
+ { + thrown = true; + break; + } + std::this_thread::sleep_for(500ms); + } + ASSERT_TRUE(thrown); +} + +template +void eventuallyPredicate(F f) +{ + using namespace std::chrono_literals; + for (int i = 0; i < 5; i++) + { + if (f()) + return; + std::this_thread::sleep_for(500ms); + } + ASSERT_TRUE(false); +} + +void assertNoSegment( + TMTContext & tmt, + const RegionPtr & region, + const FastAddPeerProto::CheckpointIngestInfoPersisted & ingest_info_persisted) +{ + auto & storages = tmt.getStorages(); + auto keyspace_id = region->getKeyspaceID(); + auto table_id = region->getMappedTableID(); + auto storage = storages.get(keyspace_id, table_id); + RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT); + auto dm_storage = std::dynamic_pointer_cast(storage); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + for (const auto & seg_persisted : ingest_info_persisted.segments()) + { + ReadBufferFromString buf(seg_persisted.segment_meta()); + DM::Segment::SegmentMetaInfo segment_info; + readSegmentMetaInfo(buf, segment_info); + + // Delta layer is persisted with `CheckpointIngestInfoPersisted`. + ReadBufferFromString buf_stable(seg_persisted.stable_meta()); + EXPECT_THROW(DM::StableValueSpace::restore(*dm_context, buf_stable, segment_info.stable_id), Exception); + } +} + TEST_F(RegionKVStoreTestFAP, RestoreRaftState) try { @@ -224,10 +308,16 @@ try region_state.ParseFromArray(page.data.begin(), page.data.size()); } + { + auto region_key = UniversalPageIdFormat::toKVStoreKey(region_id); + auto page = checkpoint_data_holder->getUniversalPageStorage() + ->read(region_key, /*read_limiter*/ nullptr, {}, /*throw_on_not_exist*/ false); + RUNTIME_CHECK(page.isValid()); + } + ASSERT_TRUE(apply_state == region->getApply()); ASSERT_TRUE(region_state == region->getState()); } - { auto [data_seq, checkpoint_data_holder] = fap_context->getNewerCheckpointData(global_context, store_id, upload_sequence); @@ -258,16 +348,15 @@ void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRan ASSERT_INPUTSTREAM_NROWS(in, rows); } -CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart() +CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt opt) { auto & global_context = TiFlashTestEnv::getGlobalContext(); - global_context.getSharedContextDisagg()->disaggregated_mode = DisaggregatedMode::Storage; uint64_t region_id = 1; auto peer_id = 1; - initStorages(); KVStore & kvs = getKVS(); global_context.getTMTContext().debugSetKVStore(kvstore); auto page_storage = global_context.getWriteNodePageStorage(); + TableID table_id = proxy_instance->bootstrapTable(global_context, kvs, global_context.getTMTContext()); auto fap_context = global_context.getSharedContextDisagg()->fap_context; proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt); @@ -277,28 +366,51 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart() region->addPeer(store_id, peer_id, metapb::PeerRole::Learner); // Write some data, and persist meta. 
- auto k1 = RecordKVFormat::genKey(table_id, 1, 111); - auto && [value_write1, value_default1] = proxy_instance->generateTiKVKeyValue(111, 999); - auto [index, term] = proxy_instance->rawWrite( - region_id, - {k1, k1}, - {value_default1, value_write1}, - {WriteCmdType::Put, WriteCmdType::Put}, - {ColumnFamilyType::Default, ColumnFamilyType::Write}); + UInt64 index = 0; + if (!opt.persist_empty_segment) + { + auto k1 = RecordKVFormat::genKey(table_id, 1, 111); + auto && [value_write1, value_default1] = proxy_instance->generateTiKVKeyValue(111, 999); + UInt64 term = 0; + std::tie(index, term) = proxy_instance->rawWrite( + region_id, + {k1, k1}, + {value_default1, value_write1}, + {WriteCmdType::Put, WriteCmdType::Put}, + {ColumnFamilyType::Default, ColumnFamilyType::Write}); + } + kvs.setRegionCompactLogConfig(0, 0, 0, 0); + if (opt.mock_add_new_peer) + { + *kvs.getRegion(region_id)->mutMeta().debugMutRegionState().getMutRegion().add_peers() = createPeer(2333, true); + proxy_instance->getRegion(region_id)->addPeer(store_id, 2333, metapb::PeerRole::Learner); + } persistAfterWrite(global_context, kvs, proxy_instance, page_storage, region_id, index); auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); RUNTIME_CHECK(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); dumpCheckpoint(); + LOG_INFO(log, "build checkpoint manifest from {}", upload_sequence); const auto manifest_key = S3::S3Filename::newCheckpointManifest(kvs.getStoreID(), upload_sequence).toFullKey(); auto checkpoint_info = std::make_shared(); checkpoint_info->remote_store_id = kvs.getStoreID(); checkpoint_info->region_id = 1000; checkpoint_info->checkpoint_data_holder = buildParsedCheckpointData(global_context, manifest_key, /*dir_seq*/ 100); + { + auto region_key = UniversalPageIdFormat::toKVStoreKey(region_id); + auto page = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage() + ->read(region_key, /*read_limiter*/ nullptr, {}, /*throw_on_not_exist*/ false); + RUNTIME_CHECK(page.isValid()); + } checkpoint_info->temp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage(); RegionPtr kv_region = kvs.getRegion(1); + { + auto task_lock = kvs.genTaskLock(); + auto region_lock = kvs.region_manager.genRegionTaskLock(region_id); + kvs.removeRegion(region_id, false, global_context.getTMTContext().getRegionTable(), task_lock, region_lock); + } CheckpointRegionInfoAndData mock_data = std::make_tuple( checkpoint_info, kv_region, @@ -311,14 +423,23 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart() TEST_F(RegionKVStoreTestFAP, RestoreFromRestart1) try { - CheckpointRegionInfoAndData mock_data = prepareForRestart(); - KVStore & kvs = getKVS(); - RegionPtr kv_region = kvs.getRegion(1); + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); + RegionPtr kv_region = std::get<1>(mock_data); auto & global_context = TiFlashTestEnv::getGlobalContext(); auto fap_context = global_context.getSharedContextDisagg()->fap_context; uint64_t region_id = 1; - FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + + std::mutex exe_mut; + std::unique_lock exe_lock(exe_mut); + fap_context->tasks_trace->addTask(region_id, [&]() { + // Keep the task in `tasks_trace` to prevent from canceling. 
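+ // The lambda blocks on `exe_mut` until the test releases `exe_lock`, so the task stays registered while `FastAddPeerImplWrite` below persists the checkpoint ingest info.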
+ std::scoped_lock wait_exe_lock(exe_mut); + return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); + }); + FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); + exe_lock.unlock(); + fap_context->tasks_trace->fetchResult(region_id); // Remove the checkpoint ingest info and region from memory. // Testing whether FAP can be handled properly after restart. @@ -346,10 +467,10 @@ try 1); } // CheckpointIngestInfo is removed. - ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); - EXPECT_THROW( - CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333), - Exception); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + ASSERT_FALSE(fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); } CATCH @@ -357,21 +478,30 @@ CATCH TEST_F(RegionKVStoreTestFAP, RestoreFromRestart2) try { - CheckpointRegionInfoAndData mock_data = prepareForRestart(); - KVStore & kvs = getKVS(); - RegionPtr kv_region = kvs.getRegion(1); + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); + RegionPtr kv_region = std::get<1>(mock_data); auto & global_context = TiFlashTestEnv::getGlobalContext(); auto fap_context = global_context.getSharedContextDisagg()->fap_context; uint64_t region_id = 1; - FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + std::mutex exe_mut; + std::unique_lock exe_lock(exe_mut); + fap_context->tasks_trace->addTask(region_id, [&]() { + // Keep the task in `tasks_trace` to prevent from canceling. + std::scoped_lock wait_exe_lock(exe_mut); + return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); + }); + FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); + exe_lock.unlock(); + fap_context->tasks_trace->fetchResult(region_id); + fap_context->debugRemoveCheckpointIngestInfo(region_id); kvstore->handleDestroy(region_id, global_context.getTMTContext()); // CheckpointIngestInfo is removed. - ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); - EXPECT_THROW( - CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333), - Exception); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + ASSERT_FALSE(fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); } CATCH @@ -379,18 +509,26 @@ CATCH TEST_F(RegionKVStoreTestFAP, RestoreFromRestart3) try { - CheckpointRegionInfoAndData mock_data = prepareForRestart(); + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); KVStore & kvs = getKVS(); - RegionPtr kv_region = kvs.getRegion(1); + RegionPtr kv_region = std::get<1>(mock_data); auto & global_context = TiFlashTestEnv::getGlobalContext(); auto fap_context = global_context.getSharedContextDisagg()->fap_context; uint64_t region_id = 1; + std::mutex exe_mut; + std::unique_lock exe_lock(exe_mut); + fap_context->tasks_trace->addTask(region_id, [&]() { + // Keep the task in `tasks_trace` to prevent from canceling. + std::scoped_lock wait_exe_lock(exe_mut); + return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); + }); // Will generate and persist some information in local ps, which will not be uploaded. 
- FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); dumpCheckpoint(); - FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); + exe_lock.unlock(); auto in_mem_ingest_info = fap_context->getOrRestoreCheckpointIngestInfo( global_context.getTMTContext(), proxy_helper.get(), @@ -412,5 +550,282 @@ try } CATCH +// Test cancel from peer select +TEST_F(RegionKVStoreTestFAP, Cancel1) +try +{ + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + EngineStoreServerWrap server = { + .tmt = &global_context.getTMTContext(), + .proxy_helper = proxy_helper.get(), + }; + + kvstore->getStore().store_id.store(1, std::memory_order_release); + kvstore->debugMutStoreMeta().set_id(1); + ASSERT_EQ(1, kvstore->getStoreID()); + ASSERT_EQ(1, kvstore->clonedStoreMeta().id()); + FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id); + auto sp = SyncPointCtl::enableInScope("in_FastAddPeerImplSelect::before_sleep"); + // The FAP will fail because it doesn't contain the new peer in region meta. + auto t = std::thread([&]() { FastAddPeer(&server, region_id, 2333); }); + // Retry for some times, then cancel. + sp.waitAndPause(); + sp.next(); + sp.waitAndPause(); + fap_context->tasks_trace->asyncCancelTask(region_id); + sp.next(); + sp.disable(); + t.join(); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); + FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id); +} +CATCH + +// Test cancel from write +TEST_F(RegionKVStoreTestFAP, Cancel2) +try +{ + using namespace std::chrono_literals; + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{ + .mock_add_new_peer = true, + }); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + EngineStoreServerWrap server = { + .tmt = &global_context.getTMTContext(), + .proxy_helper = proxy_helper.get(), + }; + + kvstore->getStore().store_id.store(1, std::memory_order_release); + kvstore->debugMutStoreMeta().set_id(1); + ASSERT_EQ(1, kvstore->getStoreID()); + ASSERT_EQ(1, kvstore->clonedStoreMeta().id()); + FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id); + auto sp = SyncPointCtl::enableInScope("in_FastAddPeerImplWrite::after_write_segments"); + // The FAP will fail because it doesn't contain the new peer in region meta. + auto t = std::thread([&]() { FastAddPeer(&server, region_id, 2333); }); + sp.waitAndPause(); + // Make sure the data is written. 
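+ // The sync point `in_FastAddPeerImplWrite::after_write_segments` holds the FAP thread right after segments are written, so the ingest info can be inspected before the cancel.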
+ auto maybe_info = fap_context->getOrRestoreCheckpointIngestInfo( + global_context.getTMTContext(), + proxy_helper.get(), + region_id, + 2333); + ASSERT_NE(maybe_info, nullptr); + auto ingest_info_persisted = maybe_info->serializeMeta(); + auto region = maybe_info->getRegion(); + fap_context->tasks_trace->asyncCancelTask(region_id); + sp.next(); + sp.disable(); + t.join(); + // Cancel async tasks, and make sure the data is cleaned after limited time. + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id)); + FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id); + assertNoSegment(global_context.getTMTContext(), region, ingest_info_persisted); +} +CATCH + +// Test cancel and destroy +TEST_F(RegionKVStoreTestFAP, Cancel3) +try +{ + using namespace std::chrono_literals; + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{ + .mock_add_new_peer = true, + }); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + EngineStoreServerWrap server = { + .tmt = &global_context.getTMTContext(), + .proxy_helper = proxy_helper.get(), + }; + + kvstore->getStore().store_id.store(1, std::memory_order_release); + kvstore->debugMutStoreMeta().set_id(1); + ASSERT_EQ(1, kvstore->getStoreID()); + ASSERT_EQ(1, kvstore->clonedStoreMeta().id()); + FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id); + auto sp = SyncPointCtl::enableInScope("in_FastAddPeerImplWrite::after_write_segments"); + auto t = std::thread([&]() { FastAddPeer(&server, region_id, 2333); }); + sp.waitAndPause(); + EXPECT_THROW(kvstore->handleDestroy(region_id, global_context.getTMTContext()), Exception); + sp.next(); + sp.disable(); + t.join(); + auto prev_fap_task_timeout_seconds = server.tmt->getContext().getSettingsRef().fap_task_timeout_seconds; + SCOPE_EXIT({ server.tmt->getContext().getSettingsRef().fap_task_timeout_seconds = prev_fap_task_timeout_seconds; }); + server.tmt->getContext().getSettingsRef().fap_task_timeout_seconds = 0; + // Use another call to cancel + FastAddPeer(&server, region_id, 2333); + LOG_INFO(log, "Try another destroy"); + kvstore->handleDestroy(region_id, global_context.getTMTContext()); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + // Wait async cancel in `FastAddPeerImplWrite`. 
+ ASSERT_FALSE(fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); + FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id); +} +CATCH + +// Test cancel and regular snapshot +TEST_F(RegionKVStoreTestFAP, Cancel4) +try +{ + using namespace std::chrono_literals; + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{ + .mock_add_new_peer = true, + }); + KVStore & kvs = getKVS(); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + EngineStoreServerWrap server = { + .tmt = &global_context.getTMTContext(), + .proxy_helper = proxy_helper.get(), + }; + + kvstore->getStore().store_id.store(1, std::memory_order_release); + kvstore->debugMutStoreMeta().set_id(1); + ASSERT_EQ(1, kvstore->getStoreID()); + ASSERT_EQ(1, kvstore->clonedStoreMeta().id()); + FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id); + auto sp = SyncPointCtl::enableInScope("in_FastAddPeerImplWrite::after_write_segments"); + auto t = std::thread([&]() { FastAddPeer(&server, region_id, 2333); }); + sp.waitAndPause(); + + // Test of ingesting multiple files with MultiSSTReader. + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{region_id, 1, ColumnFamilyType::Default}; + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + // Exception: found running scheduled fap task + EXPECT_THROW( + proxy_instance->snapshot(kvs, global_context.getTMTContext(), region_id, {default_cf}, 10, 10, std::nullopt), + Exception); + sp.next(); + sp.disable(); + t.join(); + + server.tmt->getContext().getSettingsRef().fap_task_timeout_seconds = 0; + // Use another call to cancel + FastAddPeer(&server, region_id, 2333); + LOG_INFO(log, "Try another snapshot"); + proxy_instance->snapshot( + kvs, + global_context.getTMTContext(), + region_id, + {default_cf}, + kv_region->cloneMetaRegion(), + 2, + 11, + 11, + std::nullopt, + false); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + // Wait async cancel in `FastAddPeerImplWrite`. 
+ ASSERT_FALSE(fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); + FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id); +} +CATCH + +TEST_F(RegionKVStoreTestFAP, EmptySegment) +try +{ + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{.persist_empty_segment = true}); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + fap_context->tasks_trace->addTask(region_id, []() { + return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); + }); + EXPECT_THROW( + FastAddPeerImplWrite( + global_context.getTMTContext(), + proxy_helper.get(), + region_id, + 2333, + std::move(mock_data), + 0), + Exception); +} +CATCH + +TEST_F(RegionKVStoreTestFAP, OnExistingPeer) +try +{ + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + KVStore & kvs = getKVS(); + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{region_id, 1, ColumnFamilyType::Default}; + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + proxy_instance->snapshot( + kvs, + global_context.getTMTContext(), + region_id, + {default_cf}, + kv_region->cloneMetaRegion(), + 2, + 10, + 10, + std::nullopt, + false); + + std::mutex exe_mut; + std::unique_lock exe_lock(exe_mut); + fap_context->tasks_trace->addTask(region_id, [&]() { + // Keep the task in `tasks_trace` to prevent from canceling. + std::scoped_lock wait_exe_lock(exe_mut); + return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", ""); + }); + FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); + exe_lock.unlock(); + fap_context->tasks_trace->fetchResult(region_id); + + // Make sure prehandling will not clean fap snapshot. 
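+ // An empty SST view is enough here: prehandling a regular snapshot for the same region must not clean up the FAP snapshot persisted above.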
+ std::vector ssts; + SSTViewVec snaps{ssts.data(), ssts.size()}; + kvs.preHandleSnapshotToFiles(kv_region, snaps, 100, 100, std::nullopt, global_context.getTMTContext()); + + EXPECT_THROW(ApplyFapSnapshotImpl(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333), Exception); +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp index 992b54202fb..d22ae3db9e4 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp @@ -131,7 +131,7 @@ bool UniversalPageStorageService::uploadCheckpoint() auto & tmt = global_context.getTMTContext(); - auto store_info = tmt.getKVStore()->getStoreMeta(); + auto store_info = tmt.getKVStore()->clonedStoreMeta(); if (store_info.id() == InvalidStoreID) { LOG_INFO(log, "Skip checkpoint because store meta is not initialized"); diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h b/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h index 9cd158ca855..4d40d9c76f2 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h +++ b/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h @@ -196,6 +196,8 @@ class UniversalWriteBatch : private boost::noncopyable bool empty() const { return writes.empty(); } + size_t size() const { return writes.size(); } + const Writes & getWrites() const { return writes; } Writes & getMutWrites() { return writes; } diff --git a/dbms/src/Storages/Page/WriteBatchImpl.h b/dbms/src/Storages/Page/WriteBatchImpl.h index cfb6cc812d6..8cbec70528f 100644 --- a/dbms/src/Storages/Page/WriteBatchImpl.h +++ b/dbms/src/Storages/Page/WriteBatchImpl.h @@ -217,6 +217,8 @@ class WriteBatch : private boost::noncopyable total_data_size = 0; } + size_t size() const { return writes.size(); } + SequenceID getSequence() const { return sequence; } size_t getTotalDataSize() const { return total_data_size; } diff --git a/dbms/src/Storages/Page/WriteBatchWrapperImpl.h b/dbms/src/Storages/Page/WriteBatchWrapperImpl.h index fb8bc2eb73f..b2f6d60982e 100644 --- a/dbms/src/Storages/Page/WriteBatchWrapperImpl.h +++ b/dbms/src/Storages/Page/WriteBatchWrapperImpl.h @@ -156,6 +156,14 @@ class WriteBatchWrapper : private boost::noncopyable return uwb->empty(); } + size_t size() const + { + if (wb) + return wb->size(); + else + return uwb->size(); + } + void clear() { if (wb)