Skip to content

Commit

Permalink
FAP: Support cancel FAP tasks (#8458)
Browse files Browse the repository at this point in the history
close #8382
  • Loading branch information
CalvinNeo committed Jan 11, 2024
1 parent 4e5e3e4 commit 7779b00
Show file tree
Hide file tree
Showing 37 changed files with 1,315 additions and 342 deletions.
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 22 files
+133 −60 proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
+175 −0 proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs
+2 −0 proxy_components/engine_store_ffi/src/core/forward_raft/mod.rs
+28 −8 proxy_components/engine_store_ffi/src/core/forward_raft/region.rs
+23 −87 proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs
+1 −0 proxy_components/engine_tiflash/src/mixed_engine/elementary.rs
+47 −20 proxy_components/engine_tiflash/src/proxy_utils/cached_region_info_manager.rs
+5 −0 proxy_components/engine_tiflash/src/proxy_utils/mod.rs
+3 −0 proxy_components/engine_tiflash/src/proxy_utils/proxy_ext.rs
+19 −4 proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
+60 −5 proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs
+4 −0 proxy_components/mock-engine-store/src/mock_store/mock_snapshot_impls.rs
+30 −2 proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
+27 −2 proxy_components/proxy_ffi/src/interfaces.rs
+4 −0 proxy_components/proxy_server/src/config.rs
+5 −5 proxy_components/proxy_server/src/setup.rs
+243 −444 proxy_tests/proxy/shared/fast_add_peer/fp.rs
+4 −0 proxy_tests/proxy/shared/fast_add_peer/mod.rs
+475 −0 proxy_tests/proxy/shared/fast_add_peer/simple.rs
+1 −1 proxy_tests/proxy/v2_compat/tablet_snapshot.rs
+1 −1 raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
+14 −1 raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ namespace DB
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_ingesting_stage, {{"type", "ingesting_stage"}}), \
F(type_writing_stage, {{"type", "writing_stage"}}), \
F(type_queueing_stage, {{"type", "queueing_stage"}}), \
F(type_blocking_cancel_stage, {{"type", "blocking_cancel_stage"}}), \
F(type_selecting_stage, {{"type", "selecting_stage"}})) \
M(tiflash_fap_nomatch_reason, \
"", \
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Debug/MockKVStore/MockProxyRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ raft_serverpb::RegionLocalState MockProxyRegion::getState() NO_THREAD_SAFETY_ANA
return state;
}

// Returns a mutable reference to this mock region's `state` (a raft_serverpb::RegionLocalState),
// in contrast to getState(), which returns it by value.
// NOTE(review): the guard obtained from genLockGuard() is a local that goes out of scope when this
// function returns, so the returned reference is presumably no longer protected by the lock while
// the caller reads or mutates it — confirm callers do not use it concurrently with other accessors.
raft_serverpb::RegionLocalState & MockProxyRegion::mutState() NO_THREAD_SAFETY_ANALYSIS
{
auto _ = genLockGuard();
return state;
}

raft_serverpb::RaftApplyState MockProxyRegion::getApply() NO_THREAD_SAFETY_ANALYSIS
{
auto _ = genLockGuard();
Expand Down
62 changes: 48 additions & 14 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,35 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle)
{
auto region = getRegion(region_id);
auto old_kv_region = kvs.getRegion(region_id);
RUNTIME_CHECK(old_kv_region != nullptr);
return snapshot(
kvs,
tmt,
region_id,
std::move(cfs),
old_kv_region->cloneMetaRegion(),
old_kv_region->mutMeta().peerId(),
index,
term,
deadline_index,
cancel_after_prehandle);
}

std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::vector<MockSSTGenerator> && cfs,
metapb::Region && region_meta,
UInt64 peer_id,
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle)
{
auto region = getRegion(region_id);
RUNTIME_CHECK(region != nullptr);
// We have caught up to `index` by snapshot.
// We assume there is newly updated data, so we increase the index by 1.
if (index == 0)
Expand All @@ -605,8 +632,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
term = region->getLatestCommitTerm();
}

auto new_kv_region
= kvs.genRegionPtr(old_kv_region->cloneMetaRegion(), old_kv_region->mutMeta().peerId(), index, term);
auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term);
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
new_kv_region->setApplied(index, term);
Expand All @@ -621,20 +647,28 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
}
}
SSTViewVec snaps{ssts.data(), ssts.size()};
auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt);

auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)};
if (cancel_after_prehandle)
try
{
kvs.releasePreHandledSnapshot(rg, tmt);
auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt);
auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)};
if (cancel_after_prehandle)
{
kvs.releasePreHandledSnapshot(rg, tmt);
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
}
kvs.checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(rg, tmt);
// Though it is persisted earlier in real proxy, but the state is changed to Normal here.
region->updateAppliedIndex(index, true);
// Region changes during applying snapshot, must re-get.
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
}
kvs.checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(rg, tmt);
// Though it is persisted earlier in real proxy, but the state is changed to Normal here.
region->updateAppliedIndex(index, true);

// Region changes during applying snapshot, must re-get.
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
catch (const Exception & e)
{
LOG_ERROR(log, "mock apply snapshot error {}", e.message());
e.rethrow();
}
LOG_FATAL(DB::Logger::get(), "Should not happen");
exit(-1);
}

TableID MockRaftStoreProxy::bootstrapTable(Context & ctx, KVStore & kvs, TMTContext & tmt, bool drop_at_first)
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t star
struct MockProxyRegion : MutexLockWrap
{
raft_serverpb::RegionLocalState getState();
raft_serverpb::RegionLocalState & mutState();
raft_serverpb::RaftApplyState getApply();
void persistAppliedIndex();
void persistAppliedIndex(const std::lock_guard<Mutex> & lock);
Expand Down Expand Up @@ -231,6 +232,17 @@ struct MockRaftStoreProxy : MutexLockWrap
std::vector<std::pair<std::string, std::string>> && ranges,
metapb::RegionEpoch old_epoch);

/// Mock a snapshot (prehandle + apply) for `region_id`, building the new KVStore region from a
/// caller-supplied `region_meta` and `peer_id` instead of cloning them from the region already
/// stored in `kvs`.
/// - `cfs`: generators for the SST files that make up the snapshot.
/// - `index` / `term`: passed to the new region and to prehandling; the definition special-cases
///   `index == 0` by taking the term from the proxy region's latest commit.
/// - `deadline_index`: forwarded to `KVStore::preHandleSnapshotToFiles`.
/// - `cancel_after_prehandle`: if true, the prehandled snapshot is released instead of applied.
/// Returns the region re-fetched from `kvs` together with the prehandle result.
/// Exceptions thrown while prehandling/applying are logged and rethrown.
std::tuple<RegionPtr, PrehandleResult> snapshot(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::vector<MockSSTGenerator> && cfs,
metapb::Region && region_meta,
UInt64 peer_id,
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle);
std::tuple<RegionPtr, PrehandleResult> snapshot(
KVStore & kvs,
TMTContext & tmt,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ struct Settings
M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
\
M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 80, "The max time wait for a usable checkpoint for FAP") \
M(SettingUInt64, fap_task_timeout_seconds, 120, "The max time FAP can take before fallback") \
M(SettingUInt64, fap_handle_concurrency, 25, "The number of threads for handling FAP tasks") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/SharedContexts/Disagg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <Storages/DeltaMerge/Remote/RNDeltaIndexCache.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache.h>
#include <Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathPool.h>

Expand Down Expand Up @@ -101,6 +101,7 @@ void SharedContextDisagg::initRemoteDataStore(const FileProviderPtr & file_provi

// Creates the FastAddPeerContext stored in `fap_context`, constructing it with `fap_concur`.
// The value is logged as "concurrency" before the context is created.
// NOTE(review): presumably `fap_concur` is the number of threads handling FAP tasks — confirm
// against FastAddPeerContext's constructor and the caller.
void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
{
LOG_INFO(Logger::get(), "Init FAP Context, concurrency={}", fap_concur);
fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
}

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/DeltaMerge/Segment_fwd.h>
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h>
#include <Storages/Page/PageStorage_fwd.h>
Expand Down Expand Up @@ -76,8 +77,6 @@ namespace tests
class DeltaMergeStoreTest;
}

inline static const PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1;

struct SegmentStats
{
UInt64 segment_id = 0;
Expand Down
54 changes: 35 additions & 19 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/PathPool.h>

Expand Down Expand Up @@ -1145,26 +1145,42 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo(
"Build checkpoint from remote, store_id={} region_id={}",
checkpoint_info->remote_store_id,
checkpoint_info->region_id);
auto segment_meta_infos = Segment::readAllSegmentsMetaInfoInRange(*dm_context, range, checkpoint_info);
LOG_INFO(log, "Ingest checkpoint segments num {}", segment_meta_infos.size());
WriteBatches wbs{*dm_context->storage_pool};
auto restored_segments = Segment::createTargetSegmentsFromCheckpoint( //
log,
*dm_context,
checkpoint_info->remote_store_id,
segment_meta_infos,
range,
checkpoint_info->temp_ps,
wbs);

if (restored_segments.empty())
try
{
LOG_DEBUG(log, "No segments to ingest.");
return {};
auto segment_meta_infos = Segment::readAllSegmentsMetaInfoInRange(*dm_context, range, checkpoint_info);
auto restored_segments = Segment::createTargetSegmentsFromCheckpoint( //
log,
*dm_context,
checkpoint_info->remote_store_id,
segment_meta_infos,
range,
checkpoint_info->temp_ps,
wbs);
if (restored_segments.empty())
{
return {};
}
wbs.writeLogAndData();
LOG_INFO(
log,
"Finish write fap checkpoint, region_id={} segments_num={}",
checkpoint_info->region_id,
segment_meta_infos.size());
return restored_segments;
}
catch (const Exception & e)
{
LOG_INFO(
log,
"Build checkpoint from remote failed for {}, region_id={} remote_store_id={}",
e.message(),
checkpoint_info->region_id,
checkpoint_info->remote_store_id);
wbs.setRollback();
e.rethrow();
}
wbs.writeLogAndData();
LOG_INFO(log, "Finish write fap checkpoint, region_id={}", checkpoint_info->region_id);
return restored_segments;
return {};
}

void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
Expand Down Expand Up @@ -1211,7 +1227,7 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(

// TODO(fap) This could be executed in a dedicated thread if it consumes too much time.
for (auto & segment : updated_segments)
checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft);
checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap);
}

} // namespace DM
Expand Down
30 changes: 26 additions & 4 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <Storages/DeltaMerge/RowKeyOrderedBlockInputStream.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/DeltaMerge/Segment_fwd.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/KVStore/KVStore.h>
Expand Down Expand Up @@ -400,28 +401,39 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //

// If the cache is empty, we read from DELTA_MERGE_FIRST_SEGMENT_ID to the end and build the cache.
// Otherwise, we just read the segments that cover the range.
PageIdU64 current_segment_id = 1;
PageIdU64 current_segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
auto end_to_segment_id_cache = checkpoint_info->checkpoint_data_holder->getEndToSegmentIdCache(
KeyspaceTableID{context.keyspace_id, context.physical_table_id});
auto lock = end_to_segment_id_cache->lock();
bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
if (is_cache_ready)
{
// TODO bisect for end
current_segment_id
= end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue());
}
LOG_DEBUG(Logger::get(), "Read segment meta info from segment {}", current_segment_id);
std::vector<std::pair<DM::RowKeyValue, UInt64>> end_key_and_segment_ids;
SegmentMetaInfos segment_infos;
// TODO(fap) After #7642 there could be no segment, so it could panic later.
while (current_segment_id != 0)
{
Segment::SegmentMetaInfo segment_info;
auto target_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Meta, context.physical_table_id),
current_segment_id);
auto page = checkpoint_info->temp_ps->read(target_id);
auto page = checkpoint_info->temp_ps->read(target_id, nullptr, {}, false);
if unlikely (!page.isValid())
{
// After #7642, DELTA_MERGE_FIRST_SEGMENT_ID may not exist, however, such checkpoint won't be selected.
// If it were to be selected, the FAP task could fallback to regular snapshot.
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't find page id {}, keyspace={} table_id={} current_segment_id={} range={}",
target_id,
context.keyspace_id,
context.physical_table_id,
current_segment_id,
target_range.toDebugString());
}
segment_info.segment_id = current_segment_id;
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
readSegmentMetaInfo(buf, segment_info);
Expand Down Expand Up @@ -2350,6 +2362,16 @@ void Segment::drop(const FileProviderPtr & file_provider, WriteBatches & wbs)
stable->drop(file_provider);
}

// Drops a temporary segment created for FAP.
// Removal of the delta layer's column-file pages and the stable layer's pack pages is recorded
// into `wbs`, then `wbs` is written, and finally the stable layer is dropped via `file_provider`.
void Segment::dropAsFAPTemp(const FileProviderPtr & file_provider, WriteBatches & wbs)
{
// The segment_id, delta_id and stable_id are invalid, so only clean up the persisted page ids
// in the delta layer and the stable layer.
delta->recordRemoveColumnFilesPages(wbs);
stable->recordRemovePacksPages(wbs);
// Write the recorded removals before dropping the stable layer.
wbs.writeAll();
stable->drop(file_provider);
}

Segment::ReadInfo Segment::getReadInfo(
const DMContext & dm_context,
const ColumnDefines & read_columns,
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ class Segment
void setValidDataRatioChecked() { check_valid_data_ratio.store(true, std::memory_order_relaxed); }

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);
/// Only used in FAP.
/// Drops a segment that was built with invalid ids: removal of the pages persisted by its
/// delta and stable layers is recorded into `wbs` and written, then the stable layer is dropped.
void dropAsFAPTemp(const FileProviderPtr & file_provider, WriteBatches & wbs);

bool isFlushing() const { return delta->isFlushing(); }

Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
namespace DM
{

/// Page id of the first segment; used as the starting id when reading segment meta info.
/// Declared `inline constexpr` so that every translation unit including this header refers to
/// one single definition, rather than each getting its own internal-linkage copy.
inline constexpr PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1;

} // namespace DM
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ ContextPtr init(WorkloadOptions & opts)
if (!opts.s3_bucket.empty())
{
auto & kvstore = context->getTMTContext().getKVStore();
auto store_meta = kvstore->getStoreMeta();
auto store_meta = kvstore->clonedStoreMeta();
store_meta.set_id(test_store_id);
kvstore->setStore(store_meta);
context->getSharedContextDisagg()->initRemoteDataStore(context->getFileProvider(), /*is_s3_enabled*/ true);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool GCManager::work()
// For disagg enabled, we must wait before the store meta inited before doing compaction
// on segments. Or it will upload new data with incorrect remote path.
auto & kvstore = global_context.getTMTContext().getKVStore();
auto store_info = kvstore->getStoreMeta();
auto store_info = kvstore->clonedStoreMeta();
if (store_info.id() == InvalidStoreID)
{
LOG_INFO(log, "Skip GC because store meta is not initialized");
Expand Down

0 comments on commit 7779b00

Please sign in to comment.