diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index bc0ebe45a01..04a72f1227a 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit bc0ebe45a0183c0afb67db609157119776210d5e +Subproject commit 04a72f1227ad382703d050e44c704ef54746d42f diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index a7cf3f2d431..81f83bb4c3b 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -38,7 +38,6 @@ namespace DB M(exception_before_dmfile_remove_encryption) \ M(exception_before_dmfile_remove_from_disk) \ M(force_triggle_background_merge_delta) \ - M(force_triggle_foreground_flush) \ M(exception_before_mpp_make_non_root_mpp_task_active) \ M(exception_before_mpp_register_non_root_mpp_task) \ M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \ @@ -100,6 +99,9 @@ namespace DB M(force_stop_background_checkpoint_upload) \ M(skip_seek_before_read_dmfile) \ M(exception_after_large_write_exceed) \ + M(proactive_flush_force_set_type) \ + M(proactive_flush_between_persist_cache_and_region) \ + M(proactive_flush_between_persist_regions) \ M(exception_when_fetch_disagg_pages) #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \ @@ -114,13 +116,15 @@ namespace DB M(pause_before_register_non_root_mpp_task) \ M(pause_before_make_non_root_mpp_task_active) -#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \ - M(pause_when_reading_from_dt_stream) \ - M(pause_when_writing_to_dt_store) \ - M(pause_when_ingesting_to_dt_store) \ - M(pause_when_altering_dt_store) \ - M(pause_after_copr_streams_acquired) \ - M(pause_query_init) +#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \ + M(pause_when_reading_from_dt_stream) \ + M(pause_when_writing_to_dt_store) \ + M(pause_when_ingesting_to_dt_store) \ + M(pause_when_altering_dt_store) \ + M(pause_after_copr_streams_acquired) \ + M(pause_query_init) \ + M(pause_proactive_flush_before_persist_region) \ + M(pause_passive_flush_before_persist_region) #define APPLY_FOR_RANDOM_FAILPOINTS(M) \ M(random_tunnel_wait_timeout_failpoint) \ diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 92e250ac748..8208d4d382b 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -125,12 +125,6 @@ namespace DB M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \ F(type_sync_schema_apply_duration, {{"type", "sync_schema_duration"}}, ExpBuckets{0.001, 2, 20}), \ F(type_sync_table_schema_apply_duration, {{"type", "sync_table_schema_duration"}}, ExpBuckets{0.001, 2, 20})) \ - M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ - M(tiflash_stale_read_count, "Total number of stale read", Counter) \ - M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \ - F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ - M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \ - F(type_raft_wait_index_duration, {{"type", "tmt_raft_wait_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_syncing_data_freshness, "The freshness of tiflash data with tikv data", Histogram, \ F(type_syncing_data_freshness, {{"type", "data_freshness"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \ @@ -148,7 +142,11 @@ namespace DB F(type_seg_split_fg, {"type", "seg_split_fg"}), \ 
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
        F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
-       F(type_place_index_update, {"type", "place_index_update"})) \
+       F(type_place_index_update, {"type", "place_index_update"}), \
+       F(type_compact_log_segment_bg, {"type", "compact_log_segment_bg"}), \
+       F(type_compact_log_segment_fg, {"type", "compact_log_segment_fg"}), \
+       F(type_compact_log_region_bg, {"type", "compact_log_region_bg"}), \
+       F(type_compact_log_region_fg, {"type", "compact_log_region_fg"})) \
    M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
        F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
        F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
@@ -160,7 +158,11 @@
        F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \
        F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
        F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
-       F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
+       F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20}), \
+       F(type_compact_log_bg, {{"type", "compact_log_bg"}}, ExpBuckets{0.001, 2, 20}), \
+       F(type_compact_log_fg, {{"type", "compact_log_fg"}}, ExpBuckets{0.001, 2, 20}), \
+       F(type_compact_log_fg_dm, {{"type", "compact_log_fg_dm"}}, ExpBuckets{0.001, 2, 20}), \
+       F(type_compact_log_bg_dm, {{"type", "compact_log_bg_dm"}}, ExpBuckets{0.001, 2, 20})) \
    M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
        F(type_write, {"type", "write"}), /**/ \
        F(type_ingest, {"type", "ingest"}), /**/ \
@@ -253,16 +255,42 @@
        F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \
        F(type_apply_snapshot_predecode_upload, {{"type", "snapshot_predecode_upload"}}, ExpBuckets{0.05, 2, 10}), \
        F(type_apply_snapshot_flush, {{"type", "snapshot_flush"}}, ExpBuckets{0.05, 2, 10})) \
-   M(tiflash_raft_process_keys, "Total number of keys processed in some types of Raft commands", Counter, \
-       F(type_apply_snapshot, {"type", "apply_snapshot"}), F(type_ingest_sst, {"type", "ingest_sst"})) \
-   M(tiflash_raft_apply_write_command_duration_seconds, "Bucketed histogram of applying write command Raft logs", Histogram, \
+   M(tiflash_raft_apply_write_command_duration_seconds, "Bucketed histogram of applying write command Raft logs", \
+       Histogram, /* like tiflash_raft_command_duration_seconds, but for smaller tasks */ \
        F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20}), \
        F(type_admin, {{"type", "admin"}}, ExpBuckets{0.0005, 2, 20}), \
        F(type_flush_region, {{"type", "flush_region"}}, ExpBuckets{0.0005, 2, 20})) \
+   M(tiflash_raft_process_keys, "Total number of keys processed in some types of Raft commands", Counter, \
+       F(type_apply_snapshot, {"type", "apply_snapshot"}), F(type_ingest_sst, {"type", "ingest_sst"})) \
    M(tiflash_raft_upstream_latency, "The latency that tikv sends raft log to tiflash.", Histogram, \
        F(type_write, {{"type", "write"}}, ExpBuckets{0.001, 2, 30})) \
    M(tiflash_raft_write_data_to_storage_duration_seconds, "Bucketed histogram of writting region into storage layer", Histogram, \
-       F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
+       F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), \
"decode"}}, ExpBuckets{0.0005, 2, 20}), \ + F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_raft_raft_log_lag_count, "Bucketed histogram raft index lag", Histogram, \ + F(type_compact_index, {{"type", "compact_index"}}, EqualWidthBuckets{0, 200, 5}), \ + F(type_applied_index, {{"type", "applied_index"}}, EqualWidthBuckets{0, 200, 5})) \ + M(tiflash_raft_raft_events_count, "Raft event counter", Counter, \ + F(type_pre_exec_compact, {{"type", "pre_exec_compact"}}), \ + F(type_flush_apply_snapshot, {{"type", "flush_apply_snapshot"}}), \ + F(type_flush_ingest_sst, {{"type", "flush_ingest_sst"}}), \ + F(type_flush_useless_admin, {{"type", "flush_useless_admin"}}), \ + F(type_flush_useful_admin, {{"type", "flush_useful_admin"}}), \ + F(type_flush_passive, {{"type", "flush_passive"}}), \ + F(type_flush_proactive, {{"type", "flush_proactive"}}), \ + F(type_flush_log_gap, {{"type", "flush_log_gap"}}), \ + F(type_flush_size, {{"type", "flush_size"}}), \ + F(type_flush_rowcount, {{"type", "flush_rowcount"}}), \ + F(type_exec_compact, {{"type", "exec_compact"}})) \ + M(tiflash_raft_region_flush_size, "Bucketed histogram of region flushed size", Histogram, \ + F(type_flushed, {{"type", "flushed"}}, ExpBuckets{32, 2, 16}), \ + F(type_unflushed, {{"type", "unflushed"}}, ExpBuckets{32, 2, 16})) \ + M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_stale_read_count, "Total number of stale read", Counter) \ + M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \ + F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \ + F(type_raft_wait_index_duration, {{"type", "tmt_raft_wait_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ /* required by DBaaS */ \ M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \ F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \ diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index 1f72d06a8d5..7b1168fc127 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -20,8 +20,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -31,6 +33,7 @@ #include #include + namespace DB { namespace RegionBench @@ -136,6 +139,20 @@ KVGetStatus fn_get_region_local_state(RaftStoreProxyPtr ptr, uint64_t region_id, return KVGetStatus::NotFound; } +void fn_notify_compact_log(RaftStoreProxyPtr ptr, uint64_t region_id, uint64_t compact_index, uint64_t compact_term, uint64_t applied_index) +{ + // Update flushed applied_index and truncated state. + auto & x = as_ref(ptr); + auto region = x.getRegion(region_id); + ASSERT(region); + LOG_INFO(&Poco::Logger::get("!!!!!"), "!!!! fn_notify_compact_log {} commit index {} applied_index {} compact_index {} compact_term {}", region_id, region->getLatestCommitIndex(), applied_index, compact_index, compact_term); + // `applied_index` in proxy's disk can still be less than the `applied_index` here when fg flush. 
+ if (region && region->getApply().truncated_state().index() < compact_index) + { + region->tryUpdateTruncatedState(compact_index, compact_term); + } +} + RaftstoreVer fn_get_cluster_raftstore_version(RaftStoreProxyPtr ptr, uint8_t, int64_t) @@ -153,6 +170,7 @@ TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreP res.fn_make_async_waker = fn_make_async_waker; res.fn_handle_batch_read_index = fn_handle_batch_read_index; res.fn_get_region_local_state = fn_get_region_local_state; + res.fn_notify_compact_log = fn_notify_compact_log; res.fn_get_cluster_raftstore_version = fn_get_cluster_raftstore_version; { // make sure such function pointer will be set at most once. @@ -181,19 +199,44 @@ void MockProxyRegion::updateAppliedIndex(uint64_t index) this->apply.set_applied_index(index); } +void MockProxyRegion::persistAppliedIndex() +{ + // Assume persist after every advance for simplicity. + // So do nothing here. +} + +uint64_t MockProxyRegion::getPersistedAppliedIndex() +{ + // Assume persist after every advance for simplicity. + auto _ = genLockGuard(); + return this->apply.applied_index(); +} + uint64_t MockProxyRegion::getLatestAppliedIndex() { - return this->getApply().applied_index(); + auto _ = genLockGuard(); + return this->apply.applied_index(); } uint64_t MockProxyRegion::getLatestCommitTerm() { - return this->getApply().commit_term(); + auto _ = genLockGuard(); + return this->apply.commit_term(); } uint64_t MockProxyRegion::getLatestCommitIndex() { - return this->getApply().commit_index(); + auto _ = genLockGuard(); + return this->apply.commit_index(); +} + +void MockProxyRegion::tryUpdateTruncatedState(uint64_t index, uint64_t term) +{ + if (index > this->apply.truncated_state().index()) + { + this->apply.mutable_truncated_state()->set_index(index); + this->apply.mutable_truncated_state()->set_term(term); + } } void MockProxyRegion::updateCommitIndex(uint64_t index) @@ -202,7 +245,7 @@ void MockProxyRegion::updateCommitIndex(uint64_t index) this->apply.set_commit_index(index); } -void MockProxyRegion::setSate(raft_serverpb::RegionLocalState s) +void MockProxyRegion::setState(raft_serverpb::RegionLocalState s) { auto _ = genLockGuard(); this->state = s; @@ -426,6 +469,38 @@ void MockRaftStoreProxy::debugAddRegions( } } +void MockRaftStoreProxy::loadRegionFromKVStore( + KVStore & kvs, + TMTContext & tmt, + UInt64 region_id) +{ + UNUSED(tmt); + auto kvr = kvs.getRegion(region_id); + auto ori_r = getRegion(region_id); + auto commit_index = RAFT_INIT_LOG_INDEX; + auto commit_term = RAFT_INIT_LOG_TERM; + if (!ori_r) + { + regions.emplace(region_id, std::make_shared(region_id)); + } + else + { + commit_index = ori_r->getLatestCommitIndex(); + commit_term = ori_r->getLatestCommitTerm(); + } + MockProxyRegionPtr r = getRegion(region_id); + { + r->state = kvr->mutMeta().getRegionState().getBase(); + r->apply = kvr->mutMeta().clonedApplyState(); + if (r->apply.commit_index() == 0) + { + r->apply.set_commit_index(commit_index); + r->apply.set_commit_term(commit_term); + } + } + LOG_INFO(log, "loadRegionFromKVStore [region_id={}] region_state {} apply_state {}", region_id, r->state.DebugString(), r->apply.DebugString()); +} + std::tuple MockRaftStoreProxy::normalWrite( UInt64 region_id, std::vector && keys, @@ -628,7 +703,8 @@ void MockRaftStoreProxy::doApply( TMTContext & tmt, const FailCond & cond, UInt64 region_id, - uint64_t index) + uint64_t index, + std::optional check_proactive_flush) { auto region = getRegion(region_id); assert(region != nullptr); @@ -677,15 
    if (cmd.has_raw_write_request())
    {
        // TiFlash write
-       kvs.handleWriteRaftCmd(std::move(request), region_id, index, term, tmt);
+       DB::DM::WriteResult write_task;
+       kvs.handleWriteRaftCmdDebug(std::move(request), region_id, index, term, tmt, write_task);
+       if (check_proactive_flush)
+       {
+           if (check_proactive_flush.value())
+           {
+               // fg flush
+               ASSERT(write_task.has_value());
+           }
+           else
+           {
+               // bg flush
+               ASSERT(!write_task.has_value());
+           }
+       }
    }
    if (cmd.has_admin_request())
    {
+       if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::CompactLog)
+       {
+           auto res = kvs.tryFlushRegionData(region_id, false, true, tmt, index, term, region->getApply().truncated_state().index(), region->getApply().truncated_state().term());
+           auto compact_index = cmd.admin().request.compact_log().compact_index();
+           auto compact_term = cmd.admin().request.compact_log().compact_term();
+           if (!res)
+           {
+               LOG_DEBUG(log, "mock pre exec reject");
+           }
+           else
+           {
+               region->tryUpdateTruncatedState(compact_index, compact_term);
+               LOG_DEBUG(log, "mock pre exec success, update to {},{}", compact_index, compact_term);
+           }
+       }
        kvs.handleAdminRaftCmd(std::move(cmd.admin().request), std::move(cmd.admin().response), region_id, index, term, tmt);
    }
    if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_ADVANCE)
    {
+       // We reset the applied index to the old value.
+       // TODO: make persistRegion cooperate with restore.
        kvs.getRegion(region_id)->setApplied(old_applied, old_applied_term);
        return;
    }
@@ -698,12 +805,26 @@
        // TODO We should remove (0, index] here, it is enough to remove exactly index now.
        region->commands.erase(i);
    }
+   else if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::BatchSplit)
+   {
+       for (auto && sp : cmd.admin().response.splits().regions())
+       {
+           auto r = sp.id();
+           loadRegionFromKVStore(kvs, tmt, r);
+       }
+   }
}
// Proxy advance
+ // In raftstore v1, the applied_index in ApplyFsm is advanced before the command is forwarded to TiFlash.
+ // However, ApplyFsm only notifies raft to advance after the applied state has been persisted.
+ // So keeping only an in-memory applied_index would be ambiguous here.
+ // For simplicity, we assume a persist after every command,
+ // so the in-memory applied_index always equals the persisted applied_index.
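+ // Illustrative sketch (not part of this patch): a test can exercise the window between the
+ // KVStore advance and the proxy advance with FailCond, assuming the usual mock test fixture
+ // (`proxy`, `kvs`, `tmt`, and an already-created `region_id`):
+ //
+ //   MockRaftStoreProxy::FailCond cond;
+ //   cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE;
+ //   auto [index, term] = proxy.normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
+ //   proxy.doApply(kvs, tmt, cond, region_id, index);
+ //   // KVStore has applied `index`, but the mock proxy's applied_index still lags behind,
+ //   // mimicking a restart between the TiFlash apply and the proxy persist.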
    if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE)
        return;
    region->updateAppliedIndex(index);
+   region->persistAppliedIndex();
}
void MockRaftStoreProxy::replay(
diff --git a/dbms/src/Debug/MockRaftStoreProxy.h b/dbms/src/Debug/MockRaftStoreProxy.h
index ab0f38be70c..db50c7dfc63 100644
--- a/dbms/src/Debug/MockRaftStoreProxy.h
+++ b/dbms/src/Debug/MockRaftStoreProxy.h
@@ -30,12 +30,15 @@ struct MockProxyRegion : MutexLockWrap
{
    raft_serverpb::RegionLocalState getState();
    raft_serverpb::RaftApplyState getApply();
+   void persistAppliedIndex();
    void updateAppliedIndex(uint64_t index);
+   uint64_t getPersistedAppliedIndex();
    uint64_t getLatestAppliedIndex();
    uint64_t getLatestCommitTerm();
    uint64_t getLatestCommitIndex();
    void updateCommitIndex(uint64_t index);
-   void setSate(raft_serverpb::RegionLocalState);
+   void tryUpdateTruncatedState(uint64_t index, uint64_t term);
+   void setState(raft_serverpb::RegionLocalState);
    explicit MockProxyRegion(uint64_t id);
    UniversalWriteBatch persistMeta();
    void addPeer(uint64_t store_id, uint64_t peer_id, metapb::PeerRole role);
@@ -209,6 +212,11 @@ struct MockRaftStoreProxy : MutexLockWrap
    std::vector region_ids,
    std::vector> && ranges);
+   void loadRegionFromKVStore(
+       KVStore & kvs,
+       TMTContext & tmt,
+       UInt64 region_id);
+
    /// We assume that we generate one command, and immediately commit.
    /// Normal write to a region.
    std::tuple normalWrite(
@@ -281,7 +289,8 @@
        TMTContext & tmt,
        const FailCond & cond,
        UInt64 region_id,
-       uint64_t index);
+       uint64_t index,
+       std::optional<bool> check_proactive_flush = std::nullopt);
    void replay(
        KVStore & kvs,
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeInterfaces.h b/dbms/src/Storages/DeltaMerge/DeltaMergeInterfaces.h
new file mode 100644
index 00000000000..9e87ec8763c
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeInterfaces.h
@@ -0,0 +1,58 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// This header defines the interfaces exposed to the KVStore layer.
+
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace DM
+{
+struct RaftWriteResult
+{
+    // We will find all segments and regions by this range.
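+    // (One range is collected per flushed segment; ranges are currently not merged before
+    //  being handed back, see the TODO in checkSegmentsUpdateForKVStore, so callers may
+    //  receive several adjacent ranges for one write.)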
+    std::vector<RowKeyRange> pending_flush_ranges;
+    KeyspaceID keyspace_id;
+    TableID table_id;
+
+    RaftWriteResult(std::vector<RowKeyRange> && ranges, KeyspaceID keyspace, TableID table_id_)
+        : pending_flush_ranges(std::move(ranges))
+        , keyspace_id(keyspace)
+        , table_id(table_id_)
+    {}
+
+    DISALLOW_COPY(RaftWriteResult);
+
+    RaftWriteResult(RaftWriteResult && other)
+    {
+        pending_flush_ranges = std::move(other.pending_flush_ranges);
+        keyspace_id = other.keyspace_id;
+        table_id = other.table_id;
+    }
+
+    RaftWriteResult & operator=(RaftWriteResult && other)
+    {
+        pending_flush_ranges = std::move(other.pending_flush_ranges);
+        keyspace_id = other.keyspace_id;
+        table_id = other.table_id;
+        return *this;
+    }
+};
+typedef std::optional<RaftWriteResult> WriteResult;
+static_assert(std::is_move_constructible_v<RaftWriteResult>);
+} // namespace DM
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 0bd90fa4f02..bcbcc2d1893 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -49,6 +49,7 @@
#include #include #include +#include #include #include #include
@@ -95,11 +96,12 @@ extern const char skip_check_segment_update[];
extern const char pause_when_writing_to_dt_store[];
extern const char pause_when_altering_dt_store[];
extern const char force_triggle_background_merge_delta[];
-extern const char force_triggle_foreground_flush[];
extern const char random_exception_after_dt_write_done[];
extern const char force_slow_page_storage_snapshot_release[];
extern const char exception_before_drop_segment[];
extern const char exception_after_drop_segment[];
+extern const char pause_proactive_flush_before_persist_region[];
+extern const char proactive_flush_force_set_type[];
} // namespace FailPoints
namespace DM
@@ -128,6 +130,7 @@ std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back
    case TaskType::Compact:
    case TaskType::Flush:
    case TaskType::PlaceIndex:
+   case TaskType::NotifyCompactLog:
        is_heavy = false;
        // reserve some task space for heavy tasks
        if (max_task_num > 1 && light_tasks.size() >= static_cast(max_task_num * 0.9))
@@ -511,7 +514,7 @@ Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, const Co
    return std::move(block);
}
-void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_settings, Block & block)
+DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_settings, Block & block)
{
    LOG_TRACE(log, "Table write block, rows={} bytes={}", block.rows(), block.bytes());
@@ -519,7 +522,7 @@
    const auto rows = block.rows();
    if (rows == 0)
-       return;
+       return std::nullopt;
    auto dm_context = newDMContext(db_context, db_settings, "write");
@@ -637,9 +640,8 @@
        throw Exception("Fail point random_exception_after_dt_write_done is triggered.", ErrorCodes::FAIL_POINT_ERROR);
    });
-   // TODO: Update the tracing_id before checkSegmentUpdate
-   for (auto & segment : updated_segments)
-       checkSegmentUpdate(dm_context, segment, ThreadType::Write);
+   // TODO: Update the tracing_id before checkSegmentsUpdateForKVStore
+   return checkSegmentsUpdateForKVStore(dm_context, updated_segments.begin(), updated_segments.end(), ThreadType::Write, InputType::RaftLog);
}
void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange &
delete_range) @@ -686,7 +688,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings // TODO: Update the tracing_id before checkSegmentUpdate? for (auto & segment : updated_segments) - checkSegmentUpdate(dm_context, segment, ThreadType::Write); + // We don't handle delete range from raft, the delete range is for dm's purpose only. + checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft); } bool DeltaMergeStore::flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed) @@ -891,7 +894,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, }); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { - this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); + this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read, InputType::NotRaft); }; size_t final_num_stream = std::min(num_streams, tasks.size()); String req_info; @@ -1007,7 +1010,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? - this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); + this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read, InputType::NotRaft); }; GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); @@ -1103,7 +1106,7 @@ void DeltaMergeStore::read( auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? - this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); + this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read, InputType::NotRaft); }; GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); @@ -1243,7 +1246,7 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen } // checkSegmentUpdate could do foreground merge delta, so call it before sleep. - checkSegmentUpdate(dm_context, segment, ThreadType::Write); + checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft); size_t sleep_step = 50; // Wait at most `sleep_ms` until the delta is merged. @@ -1253,7 +1256,7 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen size_t ms = std::min(sleep_ms, sleep_step); std::this_thread::sleep_for(std::chrono::milliseconds(ms)); sleep_ms -= ms; - checkSegmentUpdate(dm_context, segment, ThreadType::Write); + checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft); } } @@ -1262,12 +1265,13 @@ void DeltaMergeStore::waitForDeleteRange(const DB::DM::DMContextPtr &, const DB: // TODO: maybe we should wait, if there are too many delete ranges? 
}
-void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const SegmentPtr & segment, ThreadType thread_type)
+bool DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const SegmentPtr & segment, ThreadType thread_type, InputType input_type)
{
-   fiu_do_on(FailPoints::skip_check_segment_update, { return; });
+   bool should_trigger_foreground_kvstore_flush = false;
+   fiu_do_on(FailPoints::skip_check_segment_update, { return should_trigger_foreground_kvstore_flush; });
    if (segment->hasAbandoned())
-       return;
+       return should_trigger_foreground_kvstore_flush;
    const auto & delta = segment->getDelta();
    size_t delta_saved_rows = delta->getRows(/* use_unsaved */ false);
@@ -1304,6 +1308,8 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
    auto delta_cache_limit_rows = dm_context->delta_cache_limit_rows;
    auto delta_cache_limit_bytes = dm_context->delta_cache_limit_bytes;
+
+   bool should_background_compact_log = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes);
    bool should_background_flush = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes) //
        && (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows || delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
@@ -1341,7 +1347,19 @@
        && delta_rows - delta_last_try_place_delta_index_rows >= delta_cache_limit_rows);
    fiu_do_on(FailPoints::force_triggle_background_merge_delta, { should_background_merge_delta = true; });
-   fiu_do_on(FailPoints::force_triggle_foreground_flush, { should_foreground_flush = true; });
+
+   fiu_do_on(FailPoints::proactive_flush_force_set_type, {
+       // Bit layout, from high to low: | set bg bit | bg value bit | set fg bit | fg value bit |
+       if (auto v = FailPointHelper::getFailPointVal(FailPoints::proactive_flush_force_set_type); v)
+       {
+           auto set_kind = std::any_cast<std::shared_ptr<std::atomic<UInt64>>>(v.value());
+           auto set_kind_int = set_kind->load();
+           if ((set_kind_int >> 1) & 1)
+               should_foreground_flush = set_kind_int & 1;
+           if ((set_kind_int >> 3) & 1)
+               should_background_flush = (set_kind_int >> 2) & 1;
+       }
+   });
    auto try_add_background_task = [&](const BackgroundTask & task) {
        if (shutdown_called.load(std::memory_order_relaxed))
@@ -1357,6 +1375,9 @@
        background_task_handle->wake();
    };
+   /// Note that a bg flush task may still be added even when we do a fg flush here.
+   /// That bg flush can still be useful, since it may update the delta index.
+   /// Flush is always tried first.
    if (thread_type != ThreadType::Read)
    {
@@ -1375,8 +1396,16 @@
        delta_last_try_flush_rows = delta_rows;
        delta_last_try_flush_bytes = delta_bytes;
-       LOG_DEBUG(log, "Foreground flush cache in checkSegmentUpdate, thread={} segment={}", thread_type, segment->info());
+       LOG_DEBUG(log, "Foreground flush cache in checkSegmentUpdate, thread={} segment={} input_type={}", thread_type, segment->info(), magic_enum::enum_name(input_type));
        segment->flushCache(*dm_context);
+       if (input_type == InputType::RaftLog)
+       {
+           // Only when the segment update comes from a raft log write do we notify KVStore to trigger a foreground flush.
+           // A raft snapshot always triggers a KVStore fg flush.
+           // Raft IngestSST triggers a KVStore fg flush on a best-effort basis,
+           // which means if the write cf still holds leftover values, we must keep the SST file and wait for the next IngestSST.
+           should_trigger_foreground_kvstore_flush = true;
+       }
    }
    else if (should_background_flush)
    {
@@ -1391,12 +1420,16 @@
        try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment});
    }
}
+   if (should_background_compact_log)
+   {
+       try_add_background_task(BackgroundTask{TaskType::NotifyCompactLog, dm_context, segment});
+   }
}
// Need to check the latest delta (maybe updated after foreground flush). If it is updating by another thread,
// give up adding more tasks on this version of delta.
if (segment->getDelta()->isUpdating())
-   return;
+   return should_trigger_foreground_kvstore_flush;
auto try_fg_merge_delta = [&]() -> SegmentPtr {
    // If the table is already dropped, don't trigger foreground merge delta when executing `remove region peer`,
@@ -1487,19 +1520,19 @@
if (thread_type == ThreadType::Write)
{
    if (try_fg_split(segment))
-       return;
+       return should_trigger_foreground_kvstore_flush;
    if (SegmentPtr new_segment = try_fg_merge_delta(); new_segment)
    {
        // After merge delta, we better check split immediately.
        if (try_bg_split(new_segment))
-           return;
+           return should_trigger_foreground_kvstore_flush;
    }
}
else if (thread_type == ThreadType::BG_MergeDelta)
{
    if (try_bg_split(segment))
-       return;
+       return should_trigger_foreground_kvstore_flush;
}
if (dm_context->enable_logical_split)
@@ -1507,23 +1540,24 @@
    // Logical split point is calculated based on stable. Always try to merge delta into the stable
    // before logical split is good for calculating the split point.
    if (try_bg_merge_delta())
-       return;
+       return should_trigger_foreground_kvstore_flush;
    if (try_bg_split(segment))
-       return;
+       return should_trigger_foreground_kvstore_flush;
}
else
{
    // During the physical split delta will be merged, so we prefer physical split over merge delta.
    if (try_bg_split(segment))
-       return;
+       return should_trigger_foreground_kvstore_flush;
    if (try_bg_merge_delta())
-       return;
+       return should_trigger_foreground_kvstore_flush;
}
if (try_bg_compact())
-   return;
+   return should_trigger_foreground_kvstore_flush;
if (try_place_delta_index())
-   return;
+   return should_trigger_foreground_kvstore_flush;
+   return should_trigger_foreground_kvstore_flush;
// The segment does not need any updates for now.
}
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
index fc273fa344a..2d64034572b 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -25,6 +25,7 @@
#include #include #include +#include #include #include #include
@@ -198,6 +199,14 @@ class DeltaMergeStore : private boost::noncopyable
    BG_GC,
};
+   enum class InputType
+   {
+       // The input is not from raft; it may be from a scheduled background service or a replicated dm snapshot.
+       NotRaft,
+       RaftLog,
+       RaftSSTAndSnap,
+   };
+
    enum TaskType
    {
        Split,
@@ -205,6 +214,7 @@
        Compact,
        Flush,
        PlaceIndex,
+       NotifyCompactLog,
    };
    struct BackgroundTask
@@ -282,7 +292,7 @@ class DeltaMergeStore : private boost::noncopyable
    static Block addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block);
-   void write(const Context & db_context, const DB::Settings & db_settings, Block & block);
+   DM::WriteResult write(const Context & db_context, const DB::Settings & db_settings, Block & block);
    void deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange & delete_range);
@@ -514,16 +524,31 @@
    void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment);
-   /**
-    * Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc.
-    * If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`).
-    * See `segmentSplit`, `segmentMerge`, `segmentMergeDelta` for details.
-    *
-    * This may be called from multiple threads, e.g. at the foreground write moment, or in background threads.
-    * A `thread_type` should be specified indicating the type of the thread calling this function.
-    * Depend on the thread type, the "update" to do may be varied.
-    */
-   void checkSegmentUpdate(const DMContextPtr & context, const SegmentPtr & segment, ThreadType thread_type);
+   // Dereferencing `Iter` yields a pointer to a Segment.
+   template <typename Iter>
+   DM::WriteResult checkSegmentsUpdateForKVStore(const DMContextPtr & context, Iter begin, Iter end, ThreadType thread_type, InputType input_type)
+   {
+       DM::WriteResult result = std::nullopt;
+       std::vector<RowKeyRange> ranges;
+       if (thread_type != ThreadType::Write)
+           return result;
+       for (Iter it = begin; it != end; it++)
+       {
+           if (checkSegmentUpdate(context, *it, thread_type, input_type))
+           {
+               ranges.push_back((*it)->getRowKeyRange());
+           }
+       }
+       // TODO: We could try to merge adjacent ranges here.
+       if (!ranges.empty())
+       {
+           result = RaftWriteResult{
+               std::move(ranges),
+               keyspace_id,
+               physical_table_id};
+       }
+       return result;
+   }
    enum class SegmentSplitReason
    {
@@ -686,6 +711,18 @@ class DeltaMergeStore : private boost::noncopyable
        const SegmentIdSet & read_segments = {},
        bool try_split_task = true);
+private:
+   /**
+    * Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc.
+    * If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`).
+    * See `segmentSplit`, `segmentMerge`, `segmentMergeDelta` for details.
+    *
+    * This may be called from multiple threads, e.g. at the foreground write moment, or in background threads.
+    * A `thread_type` should be specified indicating the type of the thread calling this function.
+    * Depending on the thread type, the "update" performed may vary.
+    */
+   bool checkSegmentUpdate(const DMContextPtr & context, const SegmentPtr & segment, ThreadType thread_type, InputType input_type);
+   void triggerCompactLog(const DMContextPtr & dm_context, const RowKeyRange & range, bool is_background) const;
#ifndef DBMS_PUBLIC_GTEST
private:
#else
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
index b4b1f177b3b..9bf77356e39 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
@@ -737,7 +737,7 @@ UInt64 DeltaMergeStore::ingestFiles(
    // TODO: Update the tracing_id before checkSegmentUpdate?
    for (auto & segment : updated_segments)
-       checkSegmentUpdate(dm_context, segment, ThreadType::Write);
+       checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap);
    return bytes;
}
@@ -1064,7 +1064,7 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
    }
    for (auto & segment : updated_segments)
-       checkSegmentUpdate(dm_context, segment, ThreadType::Write);
+       checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::NotRaft);
}
} // namespace DM
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
index 20c6c17d98d..7447038cb9e 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
@@ -23,6 +23,7 @@
#include #include #include +#include #include #include
@@ -43,7 +44,6 @@ extern const char pause_until_dt_background_delta_merge[];
namespace DM
{
-
// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
@@ -283,7 +283,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
    for (auto & [end, segment] : segments)
    {
        (void)end;
-       checkSegmentUpdate(dm_context, segment, ThreadType::Init);
+       checkSegmentUpdate(dm_context, segment, ThreadType::Init, InputType::NotRaft);
    }
}
@@ -399,6 +399,9 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
    left = task.segment;
    type = ThreadType::BG_Flush;
    break;
+case TaskType::NotifyCompactLog:
+   triggerCompactLog(task.dm_context, task.segment->getRowKeyRange(), true);
+   break; // Without this break, the task would fall through into PlaceIndex below.
case TaskType::PlaceIndex:
    task.segment->placeDeltaIndex(*task.dm_context);
    break;
@@ -424,9 +426,9 @@
// continue to check whether we need to apply more tasks after this task is ended.
if (left)
-   checkSegmentUpdate(task.dm_context, left, type);
+   checkSegmentUpdate(task.dm_context, left, type, InputType::NotRaft);
if (right)
-   checkSegmentUpdate(task.dm_context, right, type);
+   checkSegmentUpdate(task.dm_context, right, type, InputType::NotRaft);
return true;
}
@@ -689,7 +691,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
-   checkSegmentUpdate(dm_context, new_segment, ThreadType::BG_GC);
+   checkSegmentUpdate(dm_context, new_segment, ThreadType::BG_GC, InputType::NotRaft);
}
return new_segment;
@@ -812,7 +814,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte
}
segment_snap = {};
-checkSegmentUpdate(dm_context, new_segment, ThreadType::BG_GC);
+checkSegmentUpdate(dm_context, new_segment, ThreadType::BG_GC, InputType::NotRaft);
return new_segment;
}
@@ -919,5 +921,12 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
return gc_segments_num;
}
+void DeltaMergeStore::triggerCompactLog(const DMContextPtr & dm_context, const RowKeyRange & range, bool is_background) const
+{
+    auto & tmt = dm_context->db_context.getTMTContext();
+    auto & kv_store = tmt.getKVStore();
+
+    kv_store->proactiveFlushCacheAndRegion(tmt, range, keyspace_id, physical_table_id, is_background);
+}
} // namespace DM
} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.h b/dbms/src/Storages/DeltaMerge/RowKeyRange.h
index 46e2024565d..d1acdbcfef1 100644
--- a/dbms/src/Storages/DeltaMerge/RowKeyRange.h
+++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.h
@@ -278,6 +278,11 @@ inline bool operator<(const RowKeyValueRef & a, const RowKeyValueRef & b)
    return compare(a, b) < 0;
}
+inline bool operator<=(const RowKeyValueRef & a, const RowKeyValueRef & b)
+{
+    return compare(a, b) <= 0;
+}
+
inline bool operator<(const StringRef & a, const RowKeyValueRef & b)
{
    return compare(a, b) < 0;
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index a3f1b8e338b..fcfca2a8e2d 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -51,6 +51,7 @@
#include #include #include +#include #include #include #include
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
index aea85de5fd4..f8438ff20a1 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -53,11 +53,11 @@ namespace FailPoints
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
extern const char force_triggle_background_merge_delta[];
-extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_set_segment_physical_split[];
extern const char force_set_page_file_write_errno[];
+extern const char proactive_flush_force_set_type[];
} // namespace FailPoints
namespace DM
@@ -2455,7 +2455,9 @@ try
{
    // write and triggle flush
-   FailPointHelper::enableFailPoint(FailPoints::force_triggle_foreground_flush);
+   std::shared_ptr<std::atomic<UInt64>> ai = std::make_shared<std::atomic<UInt64>>();
+   ai->store(0b11); // set-fg bit + fg-value bit: force a foreground flush (see proactive_flush_force_set_type).
+   FailPointHelper::enableFailPoint(FailPoints::proactive_flush_force_set_type, ai);
    Block block =
DMTestEnv::prepareSimpleWriteBlock(num_rows_write, num_rows_write * 2, false); { diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index a73676d8e52..0b5bf6a7468 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -430,7 +430,8 @@ class DMBlockOutputStream : public IBlockOutputStream if (db_settings.dt_insert_max_rows == 0) { Block to_write = decorator(block); - return store->write(db_context, db_settings, to_write); + store->write(db_context, db_settings, to_write); + return; } Block new_block = decorator(block); @@ -474,7 +475,7 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin return std::make_shared(getAndMaybeInitStore(), decorator, global_context, settings); } -void StorageDeltaMerge::write(Block & block, const Settings & settings) +WriteResult StorageDeltaMerge::write(Block & block, const Settings & settings) { auto & store = getAndMaybeInitStore(); #ifndef NDEBUG @@ -535,7 +536,7 @@ void StorageDeltaMerge::write(Block & block, const Settings & settings) FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_write_to_storage); - store->write(global_context, settings, block); + return store->write(global_context, settings, block); } std::unordered_set parseSegmentSet(const ASTPtr & ast) @@ -1672,7 +1673,7 @@ BlockInputStreamPtr StorageDeltaMerge::status() auto & name_col = columns[0]; auto & value_col = columns[1]; - StoreStats stat; + DM::StoreStats stat; if (storeInited()) { stat = _store->getStoreStats(); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index f3b95e5ff5e..dacc1f7279b 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -90,7 +91,7 @@ class StorageDeltaMerge BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; /// Write from raft layer. - void write(Block & block, const Settings & settings); + DM::WriteResult write(Block & block, const Settings & settings); void flushCache(const Context & context) override; diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index e450d951dd1..99aa53305cd 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -87,7 +87,7 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, old_region->setStateApplying(); tmt.getRegionTable().tryWriteBlockByRegionAndFlush(old_region, false); tryFlushRegionCacheInStorage(tmt, *old_region, log); - persistRegion(*old_region, ®ion_lock, "save previous region before apply"); + persistRegion(*old_region, ®ion_lock, PersistRegionReason::ApplySnapshotPrevRegion, ""); } } @@ -277,7 +277,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re manage_lock.index.add(new_region); } - persistRegion(*new_region, ®ion_lock, "save current region after apply"); + persistRegion(*new_region, ®ion_lock, PersistRegionReason::ApplySnapshotCurRegion, ""); tmt.getRegionTable().shrinkRegionRange(*new_region); } @@ -547,6 +547,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec // try to flush remain data in memory. func_try_flush(); auto tmp_region = handleIngestSSTByDTFile(region, snaps, index, term, tmt); + // Merge data from tmp_region. 
region->finishIngestSSTByDTFile(std::move(tmp_region), index, term); // after `finishIngestSSTByDTFile`, try to flush committed data into storage func_try_flush(); @@ -559,7 +560,9 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec } else { - persistRegion(*region, ®ion_task_lock, __FUNCTION__); + // We always try to flush dm cache and region if possible for every IngestSST, + // in order to have the raft log truncated and sst deleted. + persistRegion(*region, ®ion_task_lock, PersistRegionReason::IngestSst, ""); return EngineStoreApplyRes::Persist; } } diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 2d7013abf15..f82ee6498ce 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,9 @@ #include #include +#include +#include + namespace DB { namespace ErrorCodes @@ -42,6 +46,10 @@ extern const int TABLE_IS_DROPPED; namespace FailPoints { extern const char force_fail_in_flush_region_data[]; +extern const char pause_proactive_flush_before_persist_region[]; +extern const char pause_passive_flush_before_persist_region[]; +extern const char proactive_flush_between_persist_cache_and_region[]; +extern const char proactive_flush_between_persist_regions[]; } // namespace FailPoints KVStore::KVStore(Context & context) @@ -51,8 +59,10 @@ KVStore::KVStore(Context & context) , region_compact_log_period(120) , region_compact_log_min_rows(40 * 1024) , region_compact_log_min_bytes(32 * 1024 * 1024) + , region_compact_log_gap(500) { // default config about compact-log: period 120s, rows 40k, bytes 32MB. + LOG_INFO(log, "KVStore inited"); } void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper) @@ -102,6 +112,7 @@ RegionPtr KVStore::getRegion(RegionID region_id) const return it->second; return nullptr; } +// TODO: may get regions not in segment? 
RegionMap KVStore::getRegionsByRangeOverlap(const RegionRange & range) const { auto manage_lock = genRegionMgrReadLock(); @@ -177,7 +188,7 @@ void KVStore::tryPersistRegion(RegionID region_id) auto region = getRegion(region_id); if (region) { - persistRegion(*region, std::nullopt, ""); + persistRegion(*region, std::nullopt, PersistRegionReason::Debug, ""); } } @@ -238,12 +249,13 @@ RegionManager::RegionWriteLock KVStore::genRegionMgrWriteLock(const KVStoreTaskL return region_manager.genWriteLock(); } -EngineStoreApplyRes KVStore::handleWriteRaftCmd( +EngineStoreApplyRes KVStore::handleWriteRaftCmdInner( raft_cmdpb::RaftCmdRequest && request, UInt64 region_id, UInt64 index, UInt64 term, - TMTContext & tmt) const + TMTContext & tmt, + DM::WriteResult & write_result) { std::vector keys; std::vector vals; @@ -276,32 +288,73 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmd( throw Exception(fmt::format("Unsupport raft cmd {}", raft_cmdpb::CmdType_Name(type)), ErrorCodes::LOGICAL_ERROR); } } - return handleWriteRaftCmd( + return handleWriteRaftCmdInner( WriteCmdsView{.keys = keys.data(), .vals = vals.data(), .cmd_types = cmd_types.data(), .cmd_cf = cmd_cf.data(), .len = keys.size()}, region_id, index, term, - tmt); + tmt, + write_result); } -EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt) const +EngineStoreApplyRes KVStore::handleWriteRaftCmdInner(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt, DM::WriteResult & write_result) { - auto region_persist_lock = region_manager.genRegionTaskLock(region_id); - - const RegionPtr region = getRegion(region_id); - if (region == nullptr) + EngineStoreApplyRes res; { - return EngineStoreApplyRes::NotFound; - } + auto region_persist_lock = region_manager.genRegionTaskLock(region_id); + + const RegionPtr region = getRegion(region_id); + if (region == nullptr) + { + return EngineStoreApplyRes::NotFound; + } - auto res = region->handleWriteRaftCmd(cmds, index, term, tmt); - if (region->getClusterRaftstoreVer() == RaftstoreVer::V2) + auto && [r, w] = region->handleWriteRaftCmd(cmds, index, term, tmt); + + if (region->getClusterRaftstoreVer() == RaftstoreVer::V2) + { + region->orphanKeysInfo().advanceAppliedIndex(index); + } + write_result = std::move(w); + res = r; + } + /// Safety: + /// This call is from Proxy's applying thread of this region, so: + /// 1. No other thread can write from raft to this region even if we unlocked here. + /// 2. If `proactiveFlushCacheAndRegion` causes a write stall, it will be forwarded to raft layer. 
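+   /// A sketch of the resulting call chain introduced by this patch:
+   ///   handleWriteRaftCmdInner -> DeltaMergeStore::write -> checkSegmentsUpdateForKVStore
+   ///     (collects the row-key ranges of segments whose foreground flush was triggered)
+   ///   -> back here, outside the region lock: proactiveFlushCacheAndRegion(..., is_background=false);
+   ///   the background path instead goes BackgroundTask{NotifyCompactLog} -> triggerCompactLog
+   ///   -> proactiveFlushCacheAndRegion(..., is_background=true).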
+ if (write_result) { - region->orphanKeysInfo().advanceAppliedIndex(index); + auto & inner = write_result.value(); + for (auto it = inner.pending_flush_ranges.begin(); it != inner.pending_flush_ranges.end(); it++) + { + proactiveFlushCacheAndRegion(tmt, *it, inner.keyspace_id, inner.table_id, false); + } } return res; } +EngineStoreApplyRes KVStore::handleWriteRaftCmd( + raft_cmdpb::RaftCmdRequest && request, + UInt64 region_id, + UInt64 index, + UInt64 term, + TMTContext & tmt) +{ + DM::WriteResult write_result; + return handleWriteRaftCmdInner(std::move(request), region_id, index, term, tmt, write_result); +} + +EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt) +{ + DM::WriteResult write_result; + return handleWriteRaftCmdInner(cmds, region_id, index, term, tmt, write_result); +} + +EngineStoreApplyRes KVStore::handleWriteRaftCmdDebug(raft_cmdpb::RaftCmdRequest && request, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt, DM::WriteResult & write_result) +{ + return handleWriteRaftCmdInner(std::move(request), region_id, index, term, tmt, write_result); +} + void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt) { handleDestroy(region_id, tmt, genTaskLock()); @@ -320,25 +373,29 @@ void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTas removeRegion(region_id, /* remove_data */ true, tmt.getRegionTable(), task_lock, region_manager.genRegionTaskLock(region_id)); } -void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes) +void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes, UInt64 gap) { region_compact_log_period = sec; region_compact_log_min_rows = rows; region_compact_log_min_bytes = bytes; + region_compact_log_gap = gap; LOG_INFO( log, - "threshold config: period {}, rows {}, bytes {}", + "threshold config: period {}, rows {}, bytes {}, gap {}", sec, rows, - bytes); + bytes, + gap); } -void KVStore::persistRegion(const Region & region, std::optional region_task_lock, const char * caller) +void KVStore::persistRegion(const Region & region, std::optional region_task_lock, PersistRegionReason reason, const char * extra_msg) { RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString()); if (region_task_lock.has_value()) { + auto reason_id = magic_enum::enum_underlying(reason); + std::string caller = fmt::format("{} {}", PersistRegionReasonMap[reason_id], extra_msg); LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller); region_persister->persist(region, *region_task_lock.value()); LOG_DEBUG(log, "Persist {} done", region.toString(false)); @@ -349,6 +406,30 @@ void KVStore::persistRegion(const Region & region, std::optionalpersist(region); LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize()); } + switch (reason) + { + case PersistRegionReason::UselessAdminCommand: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_useless_admin).Increment(1); + break; + case PersistRegionReason::AdminCommand: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_useful_admin).Increment(1); + break; + case PersistRegionReason::Flush: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_passive).Increment(1); + break; + case PersistRegionReason::ProactiveFlush: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_proactive).Increment(1); + break; 
+ case PersistRegionReason::ApplySnapshotPrevRegion: + case PersistRegionReason::ApplySnapshotCurRegion: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_apply_snapshot).Increment(1); + break; + case PersistRegionReason::IngestSst: + GET_METRIC(tiflash_raft_raft_events_count, type_flush_ingest_sst).Increment(1); + break; + default: + break; + } } bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt) @@ -356,13 +437,14 @@ bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt) auto region_task_lock = region_manager.genRegionTaskLock(region_id); const RegionPtr curr_region_ptr = getRegion(region_id); // TODO Should handle when curr_region_ptr is null. - return canFlushRegionDataImpl(curr_region_ptr, false, false, tmt, region_task_lock, 0, 0); + return canFlushRegionDataImpl(curr_region_ptr, false, false, tmt, region_task_lock, 0, 0, 0, 0); } -bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_until_succeed, TMTContext & tmt, UInt64 index, UInt64 term) +bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_until_succeed, TMTContext & tmt, UInt64 index, UInt64 term, uint64_t truncated_index, uint64_t truncated_term) { auto region_task_lock = region_manager.genRegionTaskLock(region_id); const RegionPtr curr_region_ptr = getRegion(region_id); + if (curr_region_ptr == nullptr) { /// If we can't find region here, we return true so proxy can trigger a CompactLog. @@ -372,6 +454,8 @@ bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_ LOG_WARNING(log, "region {} [index: {}, term {}], not exist when flushing, maybe have exec `RemoveNode` first", region_id, index, term); return true; } + + FAIL_POINT_PAUSE(FailPoints::pause_passive_flush_before_persist_region); if (force_persist) { auto & curr_region = *curr_region_ptr; @@ -384,11 +468,11 @@ bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_ } else { - return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock, index, term); + return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock, index, term, truncated_index, truncated_term); } } -bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term) +bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term, UInt64 truncated_index, UInt64 truncated_term) { if (curr_region_ptr == nullptr) { @@ -396,29 +480,46 @@ bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 fl } auto & curr_region = *curr_region_ptr; + bool can_flush = false; auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo(); - LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); - - bool can_flush = false; - if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) - || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) + if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)) { - // if rows or bytes more than threshold, flush cache and persist mem data. 
+       GET_METRIC(tiflash_raft_raft_events_count, type_flush_rowcount).Increment(1);
        can_flush = true;
    }
-   else
+   if (size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
    {
-       // if there is little data in mem, wait until time interval reached threshold.
-       // use random period so that lots of regions will not be persisted at same time.
-       auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
-       can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
+       GET_METRIC(tiflash_raft_raft_events_count, type_flush_size).Increment(1);
+       can_flush = true;
    }
+   auto gap_threshold = region_compact_log_gap.load();
+   if (index > truncated_index + gap_threshold)
+   {
+       GET_METRIC(tiflash_raft_raft_events_count, type_flush_log_gap).Increment(1);
+       can_flush = true;
+   }
+
+   auto current_gap = index > truncated_index ? index - truncated_index : 0;
+   auto last_compact_log_applied = curr_region.lastCompactLogApplied();
+   auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0;
+
+   GET_METRIC(tiflash_raft_raft_events_count, type_pre_exec_compact).Increment(1);
+   GET_METRIC(tiflash_raft_raft_log_lag_count, type_compact_index).Observe(current_gap);
+   LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}, gap {}/{} applied gap {}", curr_region.toString(false), rows, size_bytes, current_gap, gap_threshold, current_applied_gap);
+
    if (can_flush && flush_if_possible)
    {
-       LOG_DEBUG(log, "{} flush region due to tryFlushRegionData, index {} term {}", curr_region.toString(false), index, term);
+       GET_METRIC(tiflash_raft_raft_events_count, type_exec_compact).Increment(1);
+       // This rarely happens: it means many raft logs accumulated without ever triggering a proactive flush.
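+       // For example, with the default region_compact_log_gap of 500 set above, a region that has
+       // accumulated 600 raft-log entries since the last truncation is flushed here even if its
+       // row count and cache size stay below their thresholds.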
+ LOG_INFO(log, "{} flush region due to tryFlushRegionData, index {} term {} truncated_index {} truncated_term {} gap {}/{}", curr_region.toString(false), index, term, truncated_index, truncated_term, current_gap, gap_threshold); + GET_METRIC(tiflash_raft_region_flush_size, type_flushed).Observe(size_bytes); return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term); } + else + { + GET_METRIC(tiflash_raft_region_flush_size, type_unflushed).Observe(size_bytes); + } return can_flush; } @@ -432,7 +533,7 @@ bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succ } if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed)) { - persistRegion(curr_region, &region_task_lock, "tryFlushRegionData"); + persistRegion(curr_region, &region_task_lock, PersistRegionReason::Flush, ""); curr_region.markCompactLog(); curr_region.cleanApproxMemCacheInfo(); GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds()); @@ -488,7 +589,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( || cmd_type == raft_cmdpb::AdminCmdType::BatchSwitchWitness) { tryFlushRegionCacheInStorage(tmt, curr_region, log); - persistRegion(curr_region, &region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str()); + persistRegion(curr_region, &region_task_lock, PersistRegionReason::UselessAdminCommand, fmt::format("{}", cmd_type).c_str()); return EngineStoreApplyRes::Persist; } return EngineStoreApplyRes::None; @@ -571,7 +672,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ const auto persist_and_sync = [&](const Region & region) { tryFlushRegionCacheInStorage(tmt, region, log); - persistRegion(region, &region_task_lock, "admin raft cmd"); + persistRegion(region, &region_task_lock, PersistRegionReason::AdminCommand, ""); }; const auto handle_batch_split = [&](Regions & split_regions) { @@ -911,4 +1012,227 @@ FileUsageStatistics KVStore::getFileUsageStatistics() const return region_persister->getFileUsageStatistics(); } +void KVStore::proactiveFlushCacheAndRegion(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, KeyspaceID keyspace_id, TableID table_id, bool is_background) +{ + Stopwatch general_watch; + UInt64 total_dm_flush_millis = 0; + SCOPE_EXIT({ + if (is_background) + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_bg).Increment(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_bg).Observe(general_watch.elapsedSeconds()); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_bg_dm).Observe(total_dm_flush_millis / 1000.0); + } + else + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_fg).Increment(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_fg).Observe(general_watch.elapsedSeconds()); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_fg_dm).Observe(total_dm_flush_millis / 1000.0); + } + }); + + auto storage = tmt.getStorages().get(keyspace_id, table_id); + if (unlikely(storage == nullptr)) + { + LOG_WARNING(log, + "proactiveFlushCacheAndRegion can not get table for table id {}, ignored", + table_id); + return; + } + auto range = std::make_pair(TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.start.toRegionKey(table_id))), + TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.end.toRegionKey(table_id)))); + Stopwatch watch; + + LOG_INFO(log, "Start proactive flush
region range [{},{}] [table_id={}] [keyspace_id={}] [is_background={}]", range.first.toDebugString(), range.second.toDebugString(), table_id, keyspace_id, is_background); + /// It finds r1,r2,r3 in the following case. + /// |------ range ------| + /// |--- r1 ---|--- r2 ---|--- r3 ---| + struct RegionCompactInfo + { + UInt64 applied_index; + UInt64 applied_term; + DM::RowKeyRange rowkey_range; + RegionPtr region_ptr; + bool skip_flush; + }; + std::unordered_map<RegionID, RegionCompactInfo> region_compact_indexes; + { + // Get the regions. + auto task_lock = genTaskLock(); + auto maybe_region_map = [&]() { + auto manage_lock = genRegionMgrReadLock(); + // Check if the region overlaps. + return manage_lock.index.findByRangeChecked(range); + }(); + + if (std::holds_alternative<RegionsRangeIndex::OverlapInfo>(maybe_region_map)) + { + auto & info = std::get<RegionsRangeIndex::OverlapInfo>(maybe_region_map); + FmtBuffer buffer; + buffer.joinStr( + std::get<1>(info).begin(), + std::get<1>(info).end(), + [&](const auto & e, FmtBuffer & b) { b.fmtAppend("{}", e); }, + " "); + std::string fmt_error = fmt::format("Find overlapped regions at {}, regions are {}, quit", std::get<0>(info).toDebugString(), buffer.toString()); + LOG_ERROR(log, fmt_error); + throw Exception(fmt_error, ErrorCodes::LOGICAL_ERROR); + } + + auto & region_map = std::get<RegionMap>(maybe_region_map); + for (const auto & overlapped_region : region_map) + { + auto region_rowkey_range = DM::RowKeyRange::fromRegionRange( + overlapped_region.second->getRange(), + table_id, + storage->isCommonHandle(), + storage->getRowKeyColumnSize()); + region_compact_indexes[overlapped_region.first] = { + overlapped_region.second->appliedIndex(), + overlapped_region.second->appliedIndexTerm(), + region_rowkey_range, + overlapped_region.second, + false}; + } + } + FAIL_POINT_PAUSE(FailPoints::pause_proactive_flush_before_persist_region); + // Flush all segments in the range of regions. + // TODO: combine adjacent ranges to do one flush. + std::string reason = fmt::format("proactiveFlush{}", is_background ? "Bg" : "Fg"); + for (auto & region_compact_info : region_compact_indexes) + { + const auto & region_rowkey_range = region_compact_info.second.rowkey_range; + auto region_id = region_compact_info.first; + auto region_ptr = region_compact_info.second.region_ptr; + auto applied_index = region_compact_info.second.applied_index; + auto last_flushed_applied = region_ptr->lastCompactLogApplied(); + { + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + enum class SkipReason + { + None = 0, + Time = 1, + Log = 2 + }; + SkipReason skip_reason = SkipReason::None; + if (region_ptr->lastCompactLogTime() + Seconds{region_compact_log_period.load(std::memory_order_relaxed)} > Clock::now()) + { + skip_reason = SkipReason::Time; + } + else if (region_ptr->lastCompactLogApplied() + 15 < applied_index) + { + skip_reason = SkipReason::Log; + } + if (skip_reason != SkipReason::None) + { + LOG_INFO(log, "skip flush region {} for skip reason {}, region range:[{},{}], flushed segment range:[{},{}]", region_id, magic_enum::enum_name(skip_reason), region_rowkey_range.getStart().toDebugString(), region_rowkey_range.getEnd().toDebugString(), rowkey_range.getStart().toDebugString(), rowkey_range.getEnd().toDebugString()); + region_compact_info.second.skip_flush = true; + continue; + } + // Both flushCache and persistRegion should be protected by the region task lock. + // We could avoid calling flushCache with the region lock held, if we saved some meta info before flushing the cache in memory.
+ // After flushCache, we will persist the region and notify Proxy with the previously stored meta info. + // Meanwhile, other write/admin cmds may be executed; we have to handle the following cases: + // For write cmds, we need to support replay from the KVStore level, like enhancing duplicate key detection. + // For admin cmds, they can cause insertion/deletion of regions, so they can't be replayed currently. + // Merely persisting applied_index is not enough, since some cmds lead to modifications of other meta data. + + if (rowkey_range.getStart() <= region_rowkey_range.getStart() + && region_rowkey_range.getEnd() <= rowkey_range.getEnd() + && last_flushed_applied >= applied_index) + { + // `region_rowkey_range` belongs to rowkey_range. + // E.g. [0,9223372036854775807] belongs to [-9223372036854775808,9223372036854775807]. + // This segment has been flushed, and the region is locked. + // However, writes may still come in between the times we lock each region. + + // TODO We can save the applied_index of every region before the last time we call flushCache. + // And we will persistRegion according to this applied_index, following the upper note. + LOG_DEBUG(log, "segment of region {} flushed, [applied_index={}] [applied_term={}] [last_flushed_applied={}]", region_compact_info.first, region_compact_info.second.applied_index, region_compact_info.second.applied_term, last_flushed_applied); + } + else + { + Stopwatch watch2; + LOG_DEBUG(log, "extra segment of region {} to flush, region range:[{},{}], flushed segment range:[{},{}] [last_flushed_applied={}]", region_compact_info.first, region_rowkey_range.getStart().toDebugString(), region_rowkey_range.getEnd().toDebugString(), rowkey_range.getStart().toDebugString(), rowkey_range.getEnd().toDebugString(), last_flushed_applied); + storage->flushCache(tmt.getContext(), region_rowkey_range); + total_dm_flush_millis += watch2.elapsedMillisecondsFromLastTime(); + } + fiu_do_on(FailPoints::proactive_flush_between_persist_cache_and_region, return;); + { + persistRegion(*region_ptr, std::make_optional(&region_task_lock), PersistRegionReason::ProactiveFlush, reason.c_str()); + // So proxy can get the current compact state of this region on TiFlash's side. + region_ptr->markCompactLog(); + region_ptr->cleanApproxMemCacheInfo(); + } + // Drop region task lock. + } + // Flush the original range again, in case new writes arrived while we were handling the individual regions. + storage->flushCache(tmt.getContext(), rowkey_range); + SYNC_FOR("after_proactiveFlushCacheAndRegion::loop_region"); + } + auto elapsed_coupled_flush = watch.elapsedMilliseconds(); + watch.restart(); + + fiu_do_on(FailPoints::proactive_flush_between_persist_regions, return;); + // Notify Proxy for each flushed region. Note that a region can still be removed here, since we don't lock the kvstore. + for (const auto & region_compact_info : region_compact_indexes) + { + // Can truncate to the flushed index, which is applied_index in this case. + if (!region_compact_info.second.skip_flush) + { + auto region_id = region_compact_info.first; + LOG_DEBUG(log, "notify compactlog region {}", region_id); + notifyCompactLog(region_id, region_compact_info.second.applied_index, region_compact_info.second.applied_term, is_background, false); + } + } + auto elapsed_notify_proxy = watch.elapsedMilliseconds(); + + LOG_DEBUG(log, "Finished proactive flush region range [{},{}] of {} regions.
[couple_flush={}] [notify_proxy={}] [table_id={}] [keyspace_id={}] [is_background={}]", range.first.toDebugString(), range.second.toDebugString(), region_compact_indexes.size(), elapsed_coupled_flush, elapsed_notify_proxy, table_id, keyspace_id, is_background); +} + +/// The function will notify Proxy to schedule a CheckCompact task. +/// When this task is handled, Proxy will try to update `applied_index` and `truncated_state`, and then persist. +/// The updated `truncated_state` will not exceed the recorded `max_compact_index`. +/// `max_compact_index` is updated by CompactLog, whether or not it is filtered. +/// Requirements: +/// 1. The caller guarantees that the delta cache has been flushed. +/// 2. The region cache has been persisted before notifying. +/// The truncated_index is monotonically increasing, since: +/// 1. Every non-filtered passive flush uses the index from the `CompactLog`, +/// and `entry_storage::first_index`/`compact_raft_log` will guard that. +/// 2. Every proactive flush uses the newest `applied_index`. +void KVStore::notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background, bool lock_held) +{ + auto region = getRegion(region_id); + if (!region) + { + LOG_INFO(log, "region {} has been removed, ignore", region_id); + return; + } + + if (is_background) + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_bg).Increment(); + } + else + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_fg).Increment(); + } + auto f = [&]() { + // We will notify even if `flush_state.applied_index` is greater than `compact_index`, + // since this greater `applied_index` may not trigger a compact log. + // We will maintain the biggest one on Proxy's side. + getProxyHelper()->notifyCompactLog(region_id, compact_index, compact_term, compact_index); + }; + if (lock_held) + { + f(); + } + else + { + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + f(); + } +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index c0dc93c6d1c..4cf992ddc6d 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -14,11 +14,15 @@ #pragma once +#include +#include #include #include #include #include +#include + namespace TiDB { struct TableInfo; @@ -77,6 +81,30 @@ class RegionPersister; struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>; +enum class PersistRegionReason +{ + Debug, + UselessAdminCommand, // Does not include passive CompactLog + AdminCommand, + Flush, // passive CompactLog + ProactiveFlush, + ApplySnapshotPrevRegion, + ApplySnapshotCurRegion, + IngestSst +}; + +constexpr const char * PersistRegionReasonMap[magic_enum::enum_count<PersistRegionReason>()] = { + "debug", + "admin cmd useless", + "admin raft cmd", + "tryFlushRegionData", + "ProactiveFlush", + "save previous region before apply", + "save current region after apply", + "ingestsst"}; + +static_assert(magic_enum::enum_count<PersistRegionReason>() == sizeof(PersistRegionReasonMap) / sizeof(const char *)); + /// TODO: brief design document.
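As an aside, the table-plus-static_assert pattern above reduces to a minimal standalone sketch. This assumes only that magic_enum is available; `Reason` and `toString` are illustrative names, not part of the diff:

```cpp
// Minimal sketch of the reason-to-string table guarded by magic_enum.
// Only the enum_count/static_assert pattern mirrors the diff; the rest is illustrative.
#include <cstddef>
#include <magic_enum.hpp>

enum class Reason
{
    Debug,
    Flush,
    ProactiveFlush
};

constexpr const char * ReasonMap[magic_enum::enum_count<Reason>()] = {
    "debug",
    "tryFlushRegionData",
    "ProactiveFlush"};

// Fails to compile if a new enum value is added without extending the string table.
static_assert(magic_enum::enum_count<Reason>() == sizeof(ReasonMap) / sizeof(const char *));

constexpr const char * toString(Reason r)
{
    return ReasonMap[static_cast<std::size_t>(r)];
}
```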
class KVStore final : private boost::noncopyable { @@ -111,11 +139,20 @@ class KVStore final : private boost::noncopyable UInt64 region_id, UInt64 index, UInt64 term, - TMTContext & tmt) const; - EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt) const; + TMTContext & tmt); + EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt); + EngineStoreApplyRes handleWriteRaftCmdDebug(raft_cmdpb::RaftCmdRequest && request, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt, DM::WriteResult & write_result); + EngineStoreApplyRes handleWriteRaftCmdInner( + raft_cmdpb::RaftCmdRequest && request, + UInt64 region_id, + UInt64 index, + UInt64 term, + TMTContext & tmt, + DM::WriteResult & write_result); + EngineStoreApplyRes handleWriteRaftCmdInner(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt, DM::WriteResult & write_result); bool needFlushRegionData(UInt64 region_id, TMTContext & tmt); - bool tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_until_succeed, TMTContext & tmt, UInt64 index, UInt64 term); + bool tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_until_succeed, TMTContext & tmt, UInt64 index, UInt64 term, uint64_t truncated_index, uint64_t truncated_term); /** * Only used in tests. In production we will call preHandleSnapshotToFiles + applyPreHandledSnapshot. @@ -137,7 +174,7 @@ class KVStore final : private boost::noncopyable void applyPreHandledSnapshot(const RegionPtrWrap &, TMTContext & tmt); void handleDestroy(UInt64 region_id, TMTContext & tmt); - void setRegionCompactLogConfig(UInt64, UInt64, UInt64); + void setRegionCompactLogConfig(UInt64, UInt64, UInt64, UInt64); EngineStoreApplyRes handleIngestSST(UInt64 region_id, SSTViewVec, UInt64 index, UInt64 term, TMTContext & tmt); RegionPtr genRegionPtr(metapb::Region && region, UInt64 peer_id, UInt64 index, UInt64 term); const TiFlashRaftProxyHelper * getProxyHelper() const { return proxy_helper; } @@ -178,6 +215,8 @@ class KVStore final : private boost::noncopyable FileUsageStatistics getFileUsageStatistics() const; + void proactiveFlushCacheAndRegion(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, KeyspaceID keyspace_id, TableID table_id, bool is_background); + void notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background, bool lock_held = true); #ifndef DBMS_PUBLIC_GTEST private: #endif @@ -247,10 +286,10 @@ class KVStore final : private boost::noncopyable /// Notice that if flush_if_possible is set to false, we only check if a flush is allowed by rowsize/size/interval. /// It will not check if a flush will eventually succeed. /// In other words, `canFlushRegionDataImpl(flush_if_possible=true)` can return false. 
- bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term); + bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term, UInt64 truncated_index, UInt64 truncated_term); bool forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term); - void persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller); + void persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, PersistRegionReason reason, const char * extra_msg); void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); @@ -273,6 +312,7 @@ class KVStore final : private boost::noncopyable std::atomic<UInt64> region_compact_log_period; std::atomic<UInt64> region_compact_log_min_rows; std::atomic<UInt64> region_compact_log_min_bytes; + std::atomic<UInt64> region_compact_log_gap; mutable std::mutex bg_gc_region_data_mutex; std::list<RegionDataReadInfoList> bg_gc_region_data; diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 4fabc7b0078..d40835d2efa 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -53,7 +53,7 @@ extern const int TABLE_IS_DROPPED; } // namespace ErrorCodes -static void writeRegionDataToStorage( +static DM::WriteResult writeRegionDataToStorage( Context & context, const RegionPtrWithBlock & region, RegionDataReadInfoList & data_list_read, @@ -65,6 +65,7 @@ static void writeRegionDataToStorage( TableID table_id = region->getMappedTableID(); UInt64 region_decode_cost = -1, write_part_cost = -1; + DM::WriteResult write_result = std::nullopt; /// Declare lambda of atomic read then write to call multiple times. auto atomic_read_write = [&](bool force_decode) { /// Get storage based on table ID.
@@ -143,9 +144,13 @@ static void writeRegionDataToStorage( { auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage); if (need_decode) - dm_storage->write(*block_ptr, context.getSettingsRef()); + { + write_result = dm_storage->write(*block_ptr, context.getSettingsRef()); + } else - dm_storage->write(block, context.getSettingsRef()); + { + write_result = dm_storage->write(block, context.getSettingsRef()); + } break; } default: @@ -178,7 +183,7 @@ static void writeRegionDataToStorage( { if (atomic_read_write(false)) { - return; + return write_result; } } @@ -196,6 +201,7 @@ static void writeRegionDataToStorage( throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed", ErrorCodes::LOGICAL_ERROR); } + return write_result; } } @@ -334,7 +340,7 @@ static inline void reportUpstreamLatency(const RegionDataReadInfoList & data_lis } } -void RegionTable::writeBlockByRegion( +DM::WriteResult RegionTable::writeBlockByRegion( Context & context, const RegionPtrWithBlock & region, RegionDataReadInfoList & data_list_to_remove, @@ -353,15 +359,16 @@ void RegionTable::writeBlockByRegion( } if (!data_list_read) - return; + return std::nullopt; reportUpstreamLatency(*data_list_read); - writeRegionDataToStorage(context, region, *data_list_read, log); + auto write_result = writeRegionDataToStorage(context, region, *data_list_read, log); RemoveRegionCommitCache(region, *data_list_read, lock_region); /// Save removed data to outer. data_list_to_remove = std::move(*data_list_read); + return write_result; } RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegion(TMTContext & tmt, @@ -387,6 +394,7 @@ RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegi if (data_list_read.empty()) return RegionException::RegionReadStatus::OK; auto & context = tmt.getContext(); + // There is no raft input here, so we can just ignore the fg flush request. writeRegionDataToStorage(context, region, data_list_read, log); RemoveRegionCommitCache(region, data_list_read); return RegionException::RegionReadStatus::OK; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 4f4c0968c53..b5b0993da33 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -145,12 +145,12 @@ uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id) } } -uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t flush_pattern, uint64_t index, uint64_t term) +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t flush_pattern, uint64_t index, uint64_t term, uint64_t truncated_index, uint64_t truncated_term) { try { auto & kvstore = server->tmt->getKVStore(); - return kvstore->tryFlushRegionData(region_id, false, flush_pattern, *server->tmt, index, term); + return kvstore->tryFlushRegionData(region_id, false, flush_pattern, *server->tmt, index, term, truncated_index, truncated_term); } catch (...)
{ @@ -855,6 +855,11 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint return state; } +void TiFlashRaftProxyHelper::notifyCompactLog(uint64_t region_id, uint64_t compact_index, uint64_t compact_term, uint64_t applied_index) const +{ + this->fn_notify_compact_log(this->proxy_ptr, region_id, compact_index, compact_term, applied_index); +} + void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts) { try @@ -869,7 +874,6 @@ void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint } } - std::string_view buffToStrView(const BaseBuffView & buf) { return std::string_view{buf.data, buf.len}; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index f8d2658d60d..b14a888fa77 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -116,6 +116,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper TimerTask makeTimerTask(uint64_t time_ms) const; bool pollTimerTask(TimerTask & task, RawVoidPtr waker = nullptr) const; raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const; + void notifyCompactLog(uint64_t region_id, uint64_t compact_index, uint64_t compact_term, uint64_t applied_index) const; }; extern "C" { @@ -132,7 +133,7 @@ uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id); // `flush_pattern` values: // 0: try, but can fail. // 1: try until succeed. -uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t flush_pattern, uint64_t index, uint64_t term); +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t flush_pattern, uint64_t index, uint64_t term, uint64_t truncated_index, uint64_t truncated_term); RawCppPtr CreateWriteBatch(const EngineStoreServerWrap * dummy); void WriteBatchPutPage(RawVoidPtr ptr, BaseBuffView page_id, BaseBuffView value); void WriteBatchDelPage(RawVoidPtr ptr, BaseBuffView page_id); diff --git a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp index 680771fdc44..abc5c855dc2 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp @@ -526,6 +526,7 @@ void ReadIndexDataNode::runOneRound(const TiFlashRaftProxyHelper & helper, const auto _ = genLockGuard(); { + // Find the task with the maximum ts in all `waiting_tasks`. 
Timestamp max_ts = 0; ReadIndexFuturePtr max_ts_task = nullptr; { diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 9b5bfd95b17..326bb621c51 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -49,7 +50,18 @@ RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIte std::optional<RegionDataReadInfo> Region::readDataByWriteIt(const RegionData::ConstWriteCFIter & write_it, bool need_value, bool hard_error) { - return data.readDataByWriteIt(write_it, need_value, id(), appliedIndex(), hard_error); + try + { + return data.readDataByWriteIt(write_it, need_value, id(), appliedIndex(), hard_error); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(region id {}, applied_index:{}, applied_term:{})", + meta.regionId(), + appliedIndex(), + appliedIndexTerm())); + throw; + } } DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const @@ -195,7 +207,7 @@ Regions RegionRaftCommandDelegate::execBatchSplit( if (new_region_index == -1) throw Exception(std::string(__PRETTY_FUNCTION__) + ": region index not found", ErrorCodes::LOGICAL_ERROR); - RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState()); + RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.clonedApplyState()); new_meta.setApplied(index, term); meta.assignRegionMeta(std::move(new_meta)); } @@ -482,6 +494,13 @@ std::string Region::dataInfo() const void Region::markCompactLog() const { last_compact_log_time = Clock::now(); + uint64_t current_applied_index = appliedIndex(); + if (last_compact_log_applied != 0) + { + uint64_t gap = current_applied_index > last_compact_log_applied ? current_applied_index - last_compact_log_applied : 0; + GET_METRIC(tiflash_raft_raft_log_lag_count, type_applied_index).Observe(gap); + } + last_compact_log_applied = current_applied_index; } Timepoint Region::lastCompactLogTime() const @@ -489,6 +508,11 @@ Timepoint Region::lastCompactLogTime() const return last_compact_log_time; } +UInt64 Region::lastCompactLogApplied() const +{ + return last_compact_log_applied; +} + Region::CommittedScanner Region::createCommittedScanner(bool use_lock, bool need_value) { return Region::CommittedScanner(this->shared_from_this(), use_lock, need_value); } @@ -670,11 +694,11 @@ void Region::tryCompactionFilter(const Timestamp safe_point) } } -EngineStoreApplyRes Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt) +std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt) { if (index <= appliedIndex()) { - return EngineStoreApplyRes::None; + return std::make_pair(EngineStoreApplyRes::None, std::nullopt); } auto & context = tmt.getContext(); Stopwatch watch; @@ -754,17 +778,25 @@ EngineStoreApplyRes Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt6 approx_mem_cache_bytes += cache_written_size; }; + DM::WriteResult write_result = std::nullopt; { - std::unique_lock lock(mutex); - - handle_write_cmd_func(); + { + // RegionTable::writeBlockByRegion may lead to persistRegion when flushing proactively. + // So we can't lock here. + // Safety: Mutations to a region come from raft applying and bg flushing of storage layer. + // 1. A raft applying process should acquire the region task lock. + // 2.
While bg/fg flushing, applying raft logs should also be prevented by the region task lock. + // So between here and RegionTable::writeBlockByRegion, there will be no new data applied. + std::unique_lock lock(mutex); + handle_write_cmd_func(); + } // If transfer-leader happened during ingest-sst, there might be illegal data. if (0 != cmds.len) { /// Flush data right after they are committed. RegionDataReadInfoList data_list_to_remove; - RegionTable::writeBlockByRegion(context, shared_from_this(), data_list_to_remove, log, false); + write_result = RegionTable::writeBlockByRegion(context, shared_from_this(), data_list_to_remove, log, true); } meta.setApplied(index, term); @@ -772,7 +804,7 @@ EngineStoreApplyRes Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt6 meta.notifyAll(); - return EngineStoreApplyRes::None; + return std::make_pair(EngineStoreApplyRes::None, std::move(write_result)); } void Region::finishIngestSSTByDTFile(RegionPtr && rhs, UInt64 index, UInt64 term) diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 25202cf1f57..e47c3e8a591 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -14,6 +14,8 @@ #pragma once +#include +#include #include #include #include @@ -149,6 +151,7 @@ class Region : public std::enable_shared_from_this<Region> void markCompactLog() const; Timepoint lastCompactLogTime() const; + UInt64 lastCompactLogApplied() const; friend bool operator==(const Region & region1, const Region & region2) { @@ -191,7 +194,7 @@ class Region : public std::enable_shared_from_this<Region> TableID getMappedTableID() const; KeyspaceID getKeyspaceID() const; - EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt); + std::pair<EngineStoreApplyRes, DM::WriteResult> handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt); void finishIngestSSTByDTFile(RegionPtr && rhs, UInt64 index, UInt64 term); UInt64 getSnapshotEventFlag() const { return snapshot_event_flag; } @@ -242,6 +245,7 @@ class Region : public std::enable_shared_from_this<Region> std::atomic<UInt64> snapshot_event_flag{1}; const TiFlashRaftProxyHelper * proxy_helper{nullptr}; mutable std::atomic<Timepoint> last_compact_log_time{Timepoint::min()}; + mutable std::atomic<UInt64> last_compact_log_applied{0}; mutable std::atomic<size_t> approx_mem_cache_rows{0}; mutable std::atomic<size_t> approx_mem_cache_bytes{0}; }; diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp index 4114044dc18..15643a65294 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -47,7 +47,6 @@ RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value if (!kv_pair) return 0; - return insert(std::move(*kv_pair), mode); } @@ -71,6 +70,7 @@ RegionDataRes RegionCFDataBase<Trait>::insert(std::pair<TiKVKey, TiKVValue> && kv_pair, prev_value = TiKVValue::copyFrom(getTiKVValue(kv_pair.second)); } auto [it, ok] = map.emplace(std::move(kv_pair)); + // We support duplicated kv pairs if they are the same in a snapshot. // This is because kvs in raftstore v2's snapshot may be overlapped. // However, we still do not permit duplicated kvs from a raft cmd.
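The new comment in `RegionCFDataBase::insert` describes a duplicate-key policy: identical pairs re-sent by an overlapped raftstore v2 snapshot are tolerated, while duplicates from raft cmds are rejected. A self-contained sketch of that policy, with a hypothetical `insertKV` helper and `DupCheck` enum rather than the real `RegionCFDataBase` API:

```cpp
// Sketch of the duplicate-kv policy described above. `insertKV` and
// `DupCheck` are hypothetical stand-ins, not TiFlash API.
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>

enum class DupCheck
{
    Deny,      // raft cmd: duplicates are never allowed
    AllowSame, // snapshot: allow exact duplicates from overlapped ranges
};

std::size_t insertKV(std::map<std::string, std::string> & map,
                     const std::string & key,
                     const std::string & value,
                     DupCheck mode)
{
    auto [it, inserted] = map.emplace(key, value);
    if (inserted)
        return 1;
    // The key already exists: tolerate only an identical pair from a snapshot.
    if (mode == DupCheck::AllowSame && it->second == value)
        return 0;
    throw std::runtime_error("found duplicate key " + key);
}
```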
diff --git a/dbms/src/Storages/Transaction/RegionManager.h b/dbms/src/Storages/Transaction/RegionManager.h index 348ac8e87a8..09e2f6b1ba7 100644 --- a/dbms/src/Storages/Transaction/RegionManager.h +++ b/dbms/src/Storages/Transaction/RegionManager.h @@ -26,10 +26,12 @@ class RegionTaskLock; struct RegionTaskCtrl : MutexLockWrap { + // TODO This lock may be changed back to a simple mutex. + typedef std::recursive_mutex Mut; /// The life time of each RegionTaskElement element should be as long as RegionManager, just return const ref. struct RegionTaskElement : private boost::noncopyable { - mutable std::mutex mutex; + mutable Mut mutex; }; /// Encapsulate the task lock for region RegionTaskLock genRegionTaskLock(RegionID region_id) const; @@ -84,10 +86,10 @@ class RegionTaskLock : private boost::noncopyable { friend struct RegionTaskCtrl; - explicit RegionTaskLock(std::mutex & mutex_) + explicit RegionTaskLock(RegionTaskCtrl::Mut & mutex_) : lock(mutex_) {} - std::lock_guard<std::mutex> lock; + std::lock_guard<RegionTaskCtrl::Mut> lock; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 8911b349193..234e578ca52 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -78,7 +78,7 @@ void RegionMeta::setPeer(metapb::Peer && p) peer = p; } -raft_serverpb::RaftApplyState RegionMeta::getApplyState() const +raft_serverpb::RaftApplyState RegionMeta::clonedApplyState() const { std::lock_guard lock(mutex); return apply_state; } diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index e0b993df280..7f7ecb53371 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -81,7 +81,7 @@ class RegionMeta UInt64 confVer() const; - raft_serverpb::RaftApplyState getApplyState() const; + raft_serverpb::RaftApplyState clonedApplyState() const; void setApplied(UInt64 index, UInt64 term); void notifyAll() const; @@ -113,6 +113,7 @@ class RegionMeta metapb::Region cloneMetaRegion() const; const raft_serverpb::MergeState & getMergeState() const; raft_serverpb::MergeState cloneMergeState() const; + const RegionState & getRegionState() const { return region_state; }; RegionMeta() = delete; diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 4461830cc1c..e849fdd37a5 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -176,11 +177,11 @@ class RegionTable : private boost::noncopyable /// Will trigger schema sync on read error for only once, /// assuming that newer schema can always apply to older data by setting force_decode to true in RegionBlockReader::read. /// Note that table schema must be keep unchanged throughout the process of read then write, we take good care of the lock. - static void writeBlockByRegion(Context & context, - const RegionPtrWithBlock & region, - RegionDataReadInfoList & data_list_to_remove, - const LoggerPtr & log, - bool lock_region = true); + static DM::WriteResult writeBlockByRegion(Context & context, + const RegionPtrWithBlock & region, + RegionDataReadInfoList & data_list_to_remove, + const LoggerPtr & log, + bool lock_region = true); /// Check transaction locks in region, and write committed data in it into storage engine if check passed. Otherwise throw an LockException.
/// The write logic is the same as #writeBlockByRegion, with some extra checks about region version and conf_version. diff --git a/dbms/src/Storages/Transaction/RegionsRangeIndex.cpp b/dbms/src/Storages/Transaction/RegionsRangeIndex.cpp index 8bcb647dff3..ec3b4f76d51 100644 --- a/dbms/src/Storages/Transaction/RegionsRangeIndex.cpp +++ b/dbms/src/Storages/Transaction/RegionsRangeIndex.cpp @@ -73,6 +73,31 @@ RegionMap RegionsRangeIndex::findByRangeOverlap(const RegionRange & range) const return res; } +std::variant<RegionMap, RegionsRangeIndex::OverlapInfo> RegionsRangeIndex::findByRangeChecked(const RegionRange & range) const +{ + auto begin_it = root.lower_bound(range.first); + auto end_it = root.lower_bound(range.second); + if (begin_it->first.compare(range.first) != 0) + --begin_it; + + RegionMap res; + for (auto it = begin_it; it != end_it; ++it) + { + if (it->second.region_map.size() < 2) + res.insert(it->second.region_map.begin(), it->second.region_map.end()); + else + { + std::vector<RegionID> v; + for (const auto & iter : it->second.region_map) + { + v.push_back(iter.first); + } + return std::make_tuple(it->first.copy(), std::move(v)); + } + } + return res; +} + RegionsRangeIndex::RegionsRangeIndex() { clear(); } diff --git a/dbms/src/Storages/Transaction/RegionsRangeIndex.h b/dbms/src/Storages/Transaction/RegionsRangeIndex.h index b4ccb6fee11..73d44f21512 100644 --- a/dbms/src/Storages/Transaction/RegionsRangeIndex.h +++ b/dbms/src/Storages/Transaction/RegionsRangeIndex.h @@ -18,6 +18,7 @@ #include #include +#include namespace DB { @@ -42,6 +43,7 @@ class RegionsRangeIndex : private boost::noncopyable { public: using RootMap = std::map<TiKVRangeKey, IndexNode>; + using OverlapInfo = std::tuple<TiKVRangeKey, std::vector<RegionID>>; void add(const RegionPtr & new_region); @@ -49,6 +51,9 @@ class RegionsRangeIndex : private boost::noncopyable RegionMap findByRangeOverlap(const RegionRange & range) const; + // Returns a map of all regions in the range, or, on failure, the range key where an overlap was detected together with the ids of the overlapping regions. + std::variant<RegionMap, OverlapInfo> findByRangeChecked(const RegionRange & range) const; + RegionsRangeIndex(); const RootMap & getRoot() const; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 3f579bbf0c2..d1f18d3eb33 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -341,6 +341,7 @@ void TMTContext::reloadConfig(const Poco::Util::AbstractConfiguration & config) static constexpr const char * COMPACT_LOG_MIN_PERIOD = "flash.compact_log_min_period"; static constexpr const char * COMPACT_LOG_MIN_ROWS = "flash.compact_log_min_rows"; static constexpr const char * COMPACT_LOG_MIN_BYTES = "flash.compact_log_min_bytes"; + static constexpr const char * COMPACT_LOG_MIN_GAP = "flash.compact_log_min_gap"; static constexpr const char * BATCH_READ_INDEX_TIMEOUT_MS = "flash.batch_read_index_timeout_ms"; static constexpr const char * WAIT_INDEX_TIMEOUT_MS = "flash.wait_index_timeout_ms"; static constexpr const char * WAIT_REGION_READY_TIMEOUT_SEC = "flash.wait_region_ready_timeout_sec"; @@ -349,7 +350,8 @@ void TMTContext::reloadConfig(const Poco::Util::AbstractConfiguration & config) // default config about compact-log: period 120s, rows 40k, bytes 32MB.
getKVStore()->setRegionCompactLogConfig(std::max(config.getUInt64(COMPACT_LOG_MIN_PERIOD, 120), 1), std::max(config.getUInt64(COMPACT_LOG_MIN_ROWS, 40 * 1024), 1), - std::max(config.getUInt64(COMPACT_LOG_MIN_BYTES, 32 * 1024 * 1024), 1)); + std::max(config.getUInt64(COMPACT_LOG_MIN_BYTES, 32 * 1024 * 1024), 1), + std::max(config.getUInt64(COMPACT_LOG_MIN_GAP, 500), 1)); { batch_read_index_timeout_ms = config.getUInt64(BATCH_READ_INDEX_TIMEOUT_MS, DEFAULT_BATCH_READ_INDEX_TIMEOUT_MS); wait_index_timeout_ms = config.getUInt64(WAIT_INDEX_TIMEOUT_MS, DEFAULT_WAIT_INDEX_TIMEOUT_MS); diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 617f4c24ca7..0cb1f341e11 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -37,7 +37,7 @@ try raft_cmdpb::AdminResponse response; auto region = kvs.getRegion(1); region->markCompactLog(); - kvs.setRegionCompactLogConfig(100000, 1000, 1000); + kvs.setRegionCompactLogConfig(100000, 1000, 1000, 0); request.mutable_compact_log(); request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); // CompactLog always returns true now, even if we can't do a flush. @@ -45,7 +45,7 @@ try ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist); // Filter - ASSERT_EQ(kvs.tryFlushRegionData(1, false, false, ctx.getTMTContext(), 0, 0), false); + ASSERT_EQ(kvs.tryFlushRegionData(1, false, false, ctx.getTMTContext(), 0, 0, 0, 0), false); } } CATCH @@ -120,7 +120,7 @@ TEST_F(RegionKVStoreTest, ReadIndex) std::this_thread::sleep_for(std::chrono::milliseconds(2)); auto tar = kvs.getRegion(tar_region_id); ASSERT_EQ( - tar->handleWriteRaftCmd({}, 66, 6, ctx.getTMTContext()), + tar->handleWriteRaftCmd({}, 66, 6, ctx.getTMTContext()).first, EngineStoreApplyRes::None); } { @@ -153,6 +153,7 @@ TEST_F(RegionKVStoreTest, ReadIndex) // Test read index // Note `batchReadIndex` always returns latest committed index in our mock class. + // See `RawMockReadIndexTask::poll`. kvs.asyncRunReadIndexWorkers(); SCOPE_EXIT({ kvs.stopReadIndexWorkers(); @@ -668,7 +669,7 @@ TEST_F(RegionKVStoreTest, Writes) } catch (Exception & e) { - ASSERT_EQ(e.message(), "Raw TiDB PK: 800000000000091D, Prewrite ts: 2333 can not found in default cf for key: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE, region_id: 1, applied: 5"); + ASSERT_EQ(e.message(), "Raw TiDB PK: 800000000000091D, Prewrite ts: 2333 can not found in default cf for key: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE: (region id 1, applied_index:5, applied_term:5)"); ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 ]"); kvs.getRegion(1)->tryCompactionFilter(1000); } @@ -1090,7 +1091,7 @@ try { // A snapshot can set region to Tombstone. 
- proxy_instance->getRegion(22)->setSate(({ + proxy_instance->getRegion(22)->setState(({ raft_serverpb::RegionLocalState s; s.set_state(::raft_serverpb::PeerState::Tombstone); s; })); @@ -1194,6 +1195,8 @@ TEST_F(RegionKVStoreTest, RegionRange) auto res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); ASSERT_EQ(res.size(), 3); + auto res2 = region_index.findByRangeChecked(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(std::holds_alternative<RegionMap>(res2)); region_index.add(makeRegion(4, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4))); @@ -1206,6 +1209,9 @@ TEST_F(RegionKVStoreTest, RegionRange) res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), TiKVKey(""))); ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(res.find(1) != res.end()); + ASSERT_TRUE(res.find(2) != res.end()); + ASSERT_TRUE(res.find(4) != res.end()); res = region_index.findByRangeOverlap( RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 5))); diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp index b81ff68b808..ae6ff90a9db 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp @@ -177,7 +177,7 @@ void persistAfterWrite(Context & ctx, KVStore & kvs, std::unique_ptrwrite(std::move(wb), nullptr); // There shall be data to flush. ASSERT_EQ(kvs.needFlushRegionData(region_id, ctx.getTMTContext()), true); - ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0), true); + ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0, 0, 0), true); } TEST_F(RegionKVStoreTestFAP, RestoreRaftState) diff --git a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp index 0b19223bab7..df49479fc1f 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp @@ -23,7 +23,7 @@ namespace tests TEST_F(RegionKVStoreTest, KVStoreFailRecovery) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); KVStore & kvs = getKVS(); { auto applied_index = 0; @@ -103,6 +103,8 @@ try proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); ASSERT_EQ(kvr1->appliedIndex(), applied_index); + ASSERT_NE(kvr1->appliedIndex(), index); + // The persisted applied_index is `applied_index`.
kvs.tryPersistRegion(region_id); } { @@ -158,7 +160,7 @@ CATCH TEST_F(RegionKVStoreTest, KVStoreInvalidWrites) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); { auto region_id = 1; { @@ -195,58 +197,7 @@ CATCH TEST_F(RegionKVStoreTest, KVStoreAdminCommands) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); - // CompactLog and passive persistence - { - KVStore & kvs = getKVS(); - UInt64 region_id = 1; - { - auto applied_index = 0; - proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); - MockRaftStoreProxy::FailCond cond; - - auto kvr1 = kvs.getRegion(region_id); - auto r1 = proxy_instance->getRegion(region_id); - ASSERT_NE(r1, nullptr); - ASSERT_NE(kvr1, nullptr); - applied_index = r1->getLatestAppliedIndex(); - ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); - auto [index, term] = proxy_instance->normalWrite(region_id, {33}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); - proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); - ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); - ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); - - kvr1->markCompactLog(); - kvs.setRegionCompactLogConfig(0, 0, 0); - auto && [request, response] = MockRaftStoreProxy::composeCompactLog(r1, index); - auto && [index2, term2] = proxy_instance->adminCommand(region_id, std::move(request), std::move(response)); - // In tryFlushRegionData we will call handleWriteRaftCmd, which will already cause an advance. - // Notice kvs is not tmt->getKVStore(), so we can't use the ProxyFFI version. - ASSERT_TRUE(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), index2, term)); - proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2); - ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 2); - ASSERT_EQ(kvr1->appliedIndex(), applied_index + 2); - } - { - proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); - // There shall be data to flush. - ASSERT_EQ(kvs.needFlushRegionData(region_id, ctx.getTMTContext()), true); - // If flush fails, and we don't insist a success. - FailPointHelper::enableFailPoint(FailPoints::force_fail_in_flush_region_data); - ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0), false); - FailPointHelper::disableFailPoint(FailPoints::force_fail_in_flush_region_data); - // Force flush until succeed only for testing. - ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), 0, 0), true); - // Non existing region. - // Flush and CompactLog will not panic. 
- ASSERT_EQ(kvs.tryFlushRegionData(1999, false, true, ctx.getTMTContext(), 0, 0), true); - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - request.mutable_compact_log(); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); - } - } + auto & ctx = TiFlashTestEnv::getGlobalContext(); { KVStore & kvs = getKVS(); UInt64 region_id = 2; @@ -296,7 +247,7 @@ try ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), region_id, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); { - kvs.setRegionCompactLogConfig(0, 0, 0); + kvs.setRegionCompactLogConfig(0, 0, 0, 0); request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response2), region_id, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); } @@ -336,7 +287,7 @@ static void validate(KVStore & kvs, std::unique_ptr & proxy_ TEST_F(RegionKVStoreTest, KVStoreSnapshotV1) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr); { UInt64 region_id = 1; @@ -511,7 +462,7 @@ CATCH TEST_F(RegionKVStoreTest, KVStoreSnapshotV2Extra) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr); UInt64 region_id = 2; TableID table_id; @@ -543,7 +494,7 @@ CATCH TEST_F(RegionKVStoreTest, KVStoreSnapshotV2Basic) try { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr); UInt64 region_id = 1; TableID table_id; @@ -835,6 +786,7 @@ try auto kvr1 = kvs.getRegion(region_id); auto r1 = proxy_instance->getRegion(region_id); + // Trigger a row2col. auto && [req, res] = MockRaftStoreProxy::composeCompactLog(r1, 10); proxy_instance->adminCommand(region_id, std::move(req), std::move(res), 20); EXPECT_THROW(proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, 20), Exception); diff --git a/dbms/src/Storages/Transaction/tests/gtest_proactive_flush.cpp b/dbms/src/Storages/Transaction/tests/gtest_proactive_flush.cpp new file mode 100644 index 00000000000..e81d990eb87 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_proactive_flush.cpp @@ -0,0 +1,345 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
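Before the new proactive-flush tests, here is a standalone sketch of the trigger logic that `canFlushRegionDataImpl` now combines (row count, cache bytes, and the raft-log gap between applied and truncated index). The struct and free function are simplified stand-ins, with defaults mirroring `flash.compact_log_min_rows`/`bytes`/`gap` from the `TMTContext::reloadConfig` hunk above:

```cpp
// Simplified stand-in for the trigger logic in canFlushRegionDataImpl.
// Thresholds default to the values used in TMTContext::reloadConfig.
#include <cstdint>

struct CompactLogThresholds
{
    uint64_t min_rows = 40 * 1024;          // flash.compact_log_min_rows
    uint64_t min_bytes = 32 * 1024 * 1024;  // flash.compact_log_min_bytes
    uint64_t min_gap = 500;                 // flash.compact_log_min_gap
};

bool canFlush(uint64_t rows, uint64_t size_bytes, uint64_t applied_index, uint64_t truncated_index, const CompactLogThresholds & t)
{
    if (rows >= t.min_rows)
        return true; // enough rows buffered in the region cache
    if (size_bytes >= t.min_bytes)
        return true; // enough bytes buffered in the region cache
    // A large gap between the applied index and the last truncated index
    // also forces a flush, so the proxy can compact its raft log.
    return applied_index > truncated_index + t.min_gap;
}
```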
+ +#include "kvstore_helper.h" + + +namespace DB +{ +namespace tests +{ + + +TEST_F(RegionKVStoreTest, KVStorePassivePersistence) +try +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + // CompactLog and passive persistence + { + KVStore & kvs = getKVS(); + UInt64 region_id = 1; + { + auto applied_index = 0; + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); + MockRaftStoreProxy::FailCond cond; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_NE(r1, nullptr); + ASSERT_NE(kvr1, nullptr); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + auto [index, term] = proxy_instance->normalWrite(region_id, {33}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + + kvr1->markCompactLog(); + kvs.setRegionCompactLogConfig(0, 0, 0, 0); + auto && [request, response] = MockRaftStoreProxy::composeCompactLog(r1, index); + auto && [index2, term2] = proxy_instance->adminCommand(region_id, std::move(request), std::move(response)); + // In tryFlushRegionData we will call handleWriteRaftCmd, which will already cause an advance. + // Notice kvs is not tmt->getKVStore(), so we can't use the ProxyFFI version. + ASSERT_TRUE(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), index2, term, 0, 0)); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 2); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 2); + } + { + proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(region_id, ctx.getTMTContext()), true); + // If the flush fails and we don't insist on success. + FailPointHelper::enableFailPoint(FailPoints::force_fail_in_flush_region_data); + ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0, 0, 0), false); + FailPointHelper::disableFailPoint(FailPoints::force_fail_in_flush_region_data); + // Force flushing until success, only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), 0, 0, 0, 0), true); + // Non-existing region. + // Flush and CompactLog will not panic. + ASSERT_EQ(kvs.tryFlushRegionData(1999, false, true, ctx.getTMTContext(), 0, 0, 0, 0), true); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + } +} +CATCH + +std::tuple<TableID, UInt64, UInt64> RegionKVStoreTest::prepareForProactiveFlushTest() +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + // Allow a large enough segment size.
+ ctx.getSettingsRef().dt_segment_limit_rows = 1000000; + ctx.getSettingsRef().dt_segment_limit_size = 1000000; + ctx.getSettingsRef().dt_segment_delta_cache_limit_rows = 0; + ctx.getSettingsRef().dt_segment_delta_cache_limit_size = 0; + UInt64 region_id = 1; + UInt64 region_id2 = 7; + TableID table_id; + KVStore & kvs = getKVS(); + ctx.getTMTContext().debugSetKVStore(kvstore); + MockRaftStoreProxy::FailCond cond; + { + initStorages(); + table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + HandleID end_index = 100; + HandleID mid_index = 50; + auto start = RecordKVFormat::genKey(table_id, 0); + auto end = RecordKVFormat::genKey(table_id, end_index); + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::make_pair(start.toString(), end.toString())); + + auto source_region = kvs.getRegion(region_id); + auto old_epoch = source_region->mutMeta().getMetaRegion().region_epoch(); + auto && [request, response] = MockRaftStoreProxy::composeBatchSplit( + {region_id, region_id2}, + {{RecordKVFormat::genKey(table_id, mid_index), RecordKVFormat::genKey(table_id, end_index)}, + {RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, mid_index)}}, + old_epoch); + auto && [index2, term2] = proxy_instance->adminCommand(region_id, std::move(request), std::move(response)); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2); + + auto kvr1 = kvs.getRegion(region_id); + auto kvr2 = kvs.getRegion(region_id2); + ctx.getTMTContext().getRegionTable().updateRegion(*kvr1); + ctx.getTMTContext().getRegionTable().updateRegion(*kvr2); + } + return std::make_tuple(table_id, region_id, region_id2); +} + +TEST_F(RegionKVStoreTest, ProactiveFlushConsistency) +try +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + auto tp = prepareForProactiveFlushTest(); + // auto table_id = std::get<0>(tp); + auto region_id = std::get<1>(tp); + // auto region_id2 = std::get<2>(tp); + MockRaftStoreProxy::FailCond cond; + KVStore & kvs = getKVS(); + + std::shared_ptr<std::atomic<UInt64>> ai = std::make_shared<std::atomic<UInt64>>(); + DB::FailPointHelper::enableFailPoint(DB::FailPoints::proactive_flush_force_set_type, ai); + ai->store(0b0000); + + { + // Newer passive and older proactive. + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + uint64_t compact_index = 10; + auto && [request, response] = MockRaftStoreProxy::composeCompactLog(r1, compact_index); + auto && [index1, term] = proxy_instance->adminCommand(region_id, std::move(request), std::move(response), 11); + kvs.setRegionCompactLogConfig(0, 0, 0, 500); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index1); + UNUSED(term); + kvs.notifyCompactLog(region_id, 1, 5, false, false); + ASSERT_EQ(r1->getApply().truncated_state().index(), compact_index); + } + + DB::FailPointHelper::disableFailPoint(DB::FailPoints::proactive_flush_force_set_type); +} +CATCH + +TEST_F(RegionKVStoreTest, ProactiveFlushLiveness) +try +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + auto tp = prepareForProactiveFlushTest(); + auto table_id = std::get<0>(tp); + auto region_id = std::get<1>(tp); + auto region_id2 = std::get<2>(tp); + MockRaftStoreProxy::FailCond cond; + KVStore & kvs = getKVS(); + + std::shared_ptr<std::atomic<UInt64>> ai = std::make_shared<std::atomic<UInt64>>(); + DB::FailPointHelper::enableFailPoint(DB::FailPoints::proactive_flush_force_set_type, ai); + { + // A fg flush and a bg flush will not deadlock.
+ DB::FailPointHelper::enableFailPoint(DB::FailPoints::pause_proactive_flush_before_persist_region); + DB::FailPointHelper::enableFailPoint(DB::FailPoints::pause_passive_flush_before_persist_region); + ai->store(0b1011); + auto f1 = [&]() { + auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999); + auto k1 = RecordKVFormat::genKey(table_id, 60, 111); + // Trigger a fg flush on region_id + auto [index, term] = proxy_instance->rawWrite(region_id, {k1}, {value_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + auto [index2, term2] = proxy_instance->rawWrite(region_id, {k1}, {value_write}, {WriteCmdType::Put}, {ColumnFamilyType::Write}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2, std::make_optional(true)); + }; + std::thread t1(f1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ai->store(0b1110); + // Force bg flush. + auto f2 = [&]() { + auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999); + auto k1 = RecordKVFormat::genKey(table_id, 5, 111); + // Trigger a bg flush on region_id2 + auto [index, term] = proxy_instance->rawWrite(region_id2, {k1}, {value_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + auto [index2, term2] = proxy_instance->rawWrite(region_id2, {k1}, {value_write}, {WriteCmdType::Put}, {ColumnFamilyType::Write}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index2, std::make_optional(false)); + }; + std::thread t2(f2); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_proactive_flush_before_persist_region); + DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_passive_flush_before_persist_region); + t1.join(); + t2.join(); + ASSERT_EQ(proxy_instance->getRegion(region_id)->getApply().truncated_state().index(), proxy_instance->getRegion(region_id)->getLatestCommitIndex()); + // We can't assert for region_id2, since the bg flush may not be finished yet. + } + kvs.setRegionCompactLogConfig(0, 0, 0, 500); // Every notify will take effect. + { + // Two fg flushes will not deadlock.
+        DB::FailPointHelper::enableFailPoint(DB::FailPoints::pause_proactive_flush_before_persist_region);
+        ai->store(0b1011);
+        auto f1 = [&]() {
+            auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
+            auto k1 = RecordKVFormat::genKey(table_id, 60, 111);
+            // Trigger a fg flush on region_id.
+            auto [index, term] = proxy_instance->rawWrite(region_id, {k1}, {value_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
+            auto [index2, term2] = proxy_instance->rawWrite(region_id, {k1}, {value_write}, {WriteCmdType::Put}, {ColumnFamilyType::Write});
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2);
+        };
+        auto f2 = [&]() {
+            auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
+            auto k1 = RecordKVFormat::genKey(table_id, 5, 111);
+            // Trigger a fg flush on region_id2.
+            auto [index, term] = proxy_instance->rawWrite(region_id2, {k1}, {value_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
+            auto [index2, term2] = proxy_instance->rawWrite(region_id2, {k1}, {value_write}, {WriteCmdType::Put}, {ColumnFamilyType::Write});
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index);
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index2);
+        };
+        std::thread t1(f1);
+        std::thread t2(f2);
+        std::this_thread::sleep_for(std::chrono::milliseconds(200));
+        DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_proactive_flush_before_persist_region);
+        t1.join();
+        t2.join();
+        ASSERT_EQ(proxy_instance->getRegion(region_id)->getApply().truncated_state().index(), proxy_instance->getRegion(region_id)->getLatestCommitIndex());
+        ASSERT_EQ(proxy_instance->getRegion(region_id2)->getApply().truncated_state().index(), proxy_instance->getRegion(region_id2)->getLatestCommitIndex());
+    }
+    {
+        // An obsolete notification triggered by another region's flush shall not
+        // overwrite a newer truncated state.
+        kvs.notifyCompactLog(region_id, 1, 5, true, false);
+        ASSERT_EQ(proxy_instance->getRegion(region_id)->getApply().truncated_state().index(), proxy_instance->getRegion(region_id)->getLatestCommitIndex());
+    }
+    {
+        // A passive flush and a fg proactive flush of the same region will not
+        // deadlock, since they must be executed in order in one thread. Here we
+        // check that a passive flush and a fg proactive flush on different
+        // regions will not deadlock either.
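+        // The passive flush (an admin CompactLog on region_id2) parks at
+        // pause_passive_flush_before_persist_region while the fg proactive flush
+        // on region_id runs concurrently; since the two flushes work on different
+        // regions, neither should block the other once its pause is lifted.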
+        ai->store(0b1011); // Force fg.
+        DB::FailPointHelper::enableFailPoint(DB::FailPoints::pause_passive_flush_before_persist_region);
+        auto f1 = [&]() {
+            auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
+            auto k1 = RecordKVFormat::genKey(table_id, 60, 111);
+            // Trigger a fg flush on region_id.
+            auto [index, term] = proxy_instance->rawWrite(region_id, {k1}, {value_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default});
+            auto [index2, term2] = proxy_instance->rawWrite(region_id, {k1}, {value_write}, {WriteCmdType::Put}, {ColumnFamilyType::Write});
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2, std::make_optional(true));
+        };
+        auto f2 = [&]() {
+            auto r2 = proxy_instance->getRegion(region_id2);
+            auto && [request, response] = MockRaftStoreProxy::composeCompactLog(r2, 555);
+            auto && [index2, term] = proxy_instance->adminCommand(region_id2, std::move(request), std::move(response), 600);
+            proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index2);
+        };
+        std::thread t1(f1);
+        std::thread t2(f2);
+        std::this_thread::sleep_for(std::chrono::milliseconds(200));
+        DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_passive_flush_before_persist_region);
+        t2.join();
+        ASSERT_EQ(proxy_instance->getRegion(region_id2)->getApply().truncated_state().index(), 555);
+        DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_proactive_flush_before_persist_region);
+        t1.join();
+        ASSERT_EQ(proxy_instance->getRegion(region_id2)->getApply().truncated_state().index(), 555);
+    }
+    DB::FailPointHelper::disableFailPoint(DB::FailPoints::proactive_flush_force_set_type);
+}
+CATCH
+
+TEST_F(RegionKVStoreTest, ProactiveFlushRecover1)
+try
+{
+    auto & ctx = TiFlashTestEnv::getGlobalContext();
+    std::shared_ptr<std::atomic<size_t>> ai = std::make_shared<std::atomic<size_t>>();
+    // Safe to abort between flushCache and persistRegion.
+    DB::FailPointHelper::enableFailPoint(DB::FailPoints::proactive_flush_force_set_type, ai);
+    {
+        auto tp = prepareForProactiveFlushTest();
+        auto table_id = std::get<0>(tp);
+        auto region_id = std::get<1>(tp);
+        auto region_id2 = std::get<2>(tp);
+        MockRaftStoreProxy::FailCond cond;
+
+        DB::FailPointHelper::enableFailPoint(DB::FailPoints::proactive_flush_between_persist_cache_and_region);
+        KVStore & kvs = getKVS();
+        auto && [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
+        auto k2 = RecordKVFormat::genKey(table_id, 5, 111);
+        // Will not trigger a fg flush on region_id2.
+        auto [index2, term2] = proxy_instance->rawWrite(region_id2, {k2, k2}, {value_default, value_write}, {WriteCmdType::Put, WriteCmdType::Put}, {ColumnFamilyType::Default, ColumnFamilyType::Write});
+
+        // Abort before persistRegion, but with DM flushed.
+        cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE;
+        ai->store(0b1011);
+        proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id2, index2);
+
+        // If we reload here, the data is lost since persistRegion was never called.
+        // However, meta is not advanced either in KVStore or Proxy.
+        auto & kvs2 = reloadKVSFromDisk();
+        auto kvr2 = kvs2.getRegion(region_id2);
+        auto r2 = proxy_instance->getRegion(region_id2);
+        ASSERT_EQ(kvr2->appliedIndex() + 1, index2);
+        ASSERT_EQ(r2->getLatestAppliedIndex() + 1, index2);
+
+        cond.type = MockRaftStoreProxy::FailCond::Type::NORMAL;
+        ai->store(0b1010);
+        // No data lost.
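+        // Replaying index2 after the reload is expected to be idempotent: since
+        // neither KVStore nor the proxy advanced their applied index, the same
+        // raft log entry is applied again and the write is recovered even though
+        // persistRegion never ran.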
+        proxy_instance->doApply(kvs2, ctx.getTMTContext(), cond, region_id2, index2);
+        auto [index22, term22] = proxy_instance->rawWrite(region_id2, {k2, k2}, {value_default, value_write}, {WriteCmdType::Put, WriteCmdType::Put}, {ColumnFamilyType::Default, ColumnFamilyType::Write});
+        // There is no flush after the write, so applying the duplicate key will throw.
+        EXPECT_THROW(proxy_instance->doApply(kvs2, ctx.getTMTContext(), cond, region_id2, index22), Exception);
+
+        ai->store(0b1011);
+        DB::FailPointHelper::disableFailPoint(DB::FailPoints::proactive_flush_between_persist_cache_and_region);
+        auto kvr1 = kvs2.getRegion(region_id);
+        auto r1 = proxy_instance->getRegion(region_id);
+        auto && [value_write1, value_default1] = proxy_instance->generateTiKVKeyValue(111, 999);
+        auto k1 = RecordKVFormat::genKey(table_id, 60, 111);
+        // Trigger a fg flush on region_id.
+        auto [index1, term1] = proxy_instance->rawWrite(region_id, {k1, k1}, {value_default1, value_write1}, {WriteCmdType::Put, WriteCmdType::Put}, {ColumnFamilyType::Default, ColumnFamilyType::Write});
+        proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index1);
+
+        auto & kvs3 = reloadKVSFromDisk();
+        {
+            auto kvr1 = kvs3.getRegion(region_id);
+            auto r1 = proxy_instance->getRegion(region_id);
+            ASSERT_EQ(kvr1->appliedIndex(), r1->getLatestAppliedIndex());
+            auto kvr2 = kvs3.getRegion(region_id2);
+            auto r2 = proxy_instance->getRegion(region_id2);
+            ASSERT_EQ(kvr2->appliedIndex(), r2->getLatestAppliedIndex());
+        }
+    }
+
+    DB::FailPointHelper::disableFailPoint(DB::FailPoints::proactive_flush_force_set_type);
+}
+CATCH
+
+} // namespace tests
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp
index 95e98fe9eaa..2302e1cc484 100644
--- a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp
+++ b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp
@@ -303,6 +303,7 @@ void ReadIndexTest::testNormal()
 {
     std::vector<kvrpcpb::ReadIndexRequest> reqs;
     {
+        // One request with start_ts = 10 for every region.
         reqs.reserve(proxy_instance.size());
         for (size_t i = 0; i < proxy_instance.size(); ++i)
         {
diff --git a/dbms/src/Storages/Transaction/tests/kvstore_helper.h b/dbms/src/Storages/Transaction/tests/kvstore_helper.h
index 347b702f165..7ea0152ff3a 100644
--- a/dbms/src/Storages/Transaction/tests/kvstore_helper.h
+++ b/dbms/src/Storages/Transaction/tests/kvstore_helper.h
@@ -16,6 +16,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -53,6 +54,10 @@ namespace FailPoints
 {
 extern const char skip_check_segment_update[];
 extern const char force_fail_in_flush_region_data[];
+extern const char pause_proactive_flush_before_persist_region[];
+extern const char proactive_flush_force_set_type[];
+extern const char pause_passive_flush_before_persist_region[];
+extern const char proactive_flush_between_persist_cache_and_region[];
 } // namespace FailPoints

 namespace RegionBench
@@ -92,6 +97,8 @@ class RegionKVStoreTest : public ::testing::Test
         kvstore->setStore(store);
         ASSERT_EQ(kvstore->getStoreID(), store.id());
     }
+
+    LOG_INFO(Logger::get("Test"), "Finished setup");
 }

 void TearDown() override
@@ -166,6 +173,7 @@ class RegionKVStoreTest : public ::testing::Test
 }

 protected:
+    std::tuple<TableID, UInt64, UInt64> prepareForProactiveFlushTest();
 static void testRaftMerge(KVStore & kvs, TMTContext & tmt);
 static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt);