Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Reduce lock contention in RegionPersister::doPersist #8584

Merged
merged 5 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ namespace DB
M(pause_query_init) \
M(pause_before_prehandle_snapshot) \
M(pause_before_prehandle_subtask) \
M(pause_when_persist_region) \
M(pause_before_wn_establish_task) \
M(pause_passive_flush_before_persist_region)

Expand Down Expand Up @@ -161,6 +162,7 @@ namespace DB
M(random_pipeline_model_execute_prefix_failpoint) \
M(random_pipeline_model_execute_suffix_failpoint) \
M(random_spill_to_disk_failpoint) \
M(random_region_persister_latency_failpoint) \
M(random_restore_from_disk_failpoint) \
M(random_exception_when_connect_local_tunnel) \
M(random_exception_when_construct_async_request_handler) \
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,7 @@ dt_page_gc_low_write_prob = 0.2
return;
}
auto & global_path_pool = global_ctx.getPathPool();
RegionManager region_manager;
RegionPersister persister(global_ctx, region_manager);
RegionPersister persister(global_ctx);
persister.restore(global_path_pool, nullptr, PageStorageConfig{});

auto verify_persister_reload_config = [&global_ctx](RegionPersister & persister) {
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Decode/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/nocopyable.h>
#include <Core/Block.h>
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/DeltaMerge/ExternalDTFileInfo.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/KVStore/Decode/RegionDataRead.h>
Expand All @@ -41,7 +42,6 @@ struct TableInfo;
namespace DB
{
struct ColumnsDescription;
class Context;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
class TMTContext;
Expand Down
39 changes: 15 additions & 24 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ extern const char force_not_clean_fap_on_destroy[];

KVStore::KVStore(Context & context)
: region_persister(
context.getSharedContextDisagg()->isDisaggregatedComputeMode()
? nullptr
: std::make_unique<RegionPersister>(context, region_manager))
context.getSharedContextDisagg()->isDisaggregatedComputeMode() ? nullptr
: std::make_unique<RegionPersister>(context))
, raft_cmd_res(std::make_unique<RaftCommandResult>())
, log(Logger::get())
, region_compact_log_min_rows(40 * 1024)
Expand Down Expand Up @@ -375,33 +374,25 @@ void KVStore::setRegionCompactLogConfig(UInt64 rows, UInt64 bytes, UInt64 gap, U

void KVStore::persistRegion(
const Region & region,
std::optional<const RegionTaskLock *> region_task_lock,
const RegionTaskLock & region_task_lock,
PersistRegionReason reason,
const char * extra_msg) const
{
RUNTIME_CHECK_MSG(
region_persister,
"try access to region_persister without initialization, stack={}",
StackTrace().toString());
if (region_task_lock.has_value())
{
auto reason_id = magic_enum::enum_underlying(reason);
std::string caller = fmt::format("{} {}", PersistRegionReasonMap[reason_id], extra_msg);
LOG_INFO(
log,
"Start to persist {}, cache size: {} bytes for `{}`",
region.getDebugString(),
region.dataSize(),
caller);
region_persister->persist(region, *region_task_lock.value());
LOG_DEBUG(log, "Persist {} done", region.toString(false));
}
else
{
LOG_INFO(log, "Try to persist {}", region.toString(false));
region_persister->persist(region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize());
}

auto reason_id = magic_enum::enum_underlying(reason);
std::string caller = fmt::format("{} {}", PersistRegionReasonMap[reason_id], extra_msg);
LOG_INFO(
log,
"Start to persist {}, cache size: {} bytes for `{}`",
region.getDebugString(),
region.dataSize(),
caller);
region_persister->persist(region, region_task_lock);
LOG_DEBUG(log, "Persist {} done, cache size: {} bytes", region.toString(false), region.dataSize());

switch (reason)
{
Expand Down Expand Up @@ -611,7 +602,7 @@ bool KVStore::forceFlushRegionDataImpl(
}

// Flushing the cache at the storage level is done; now persist the region info
persistRegion(curr_region, &region_task_lock, PersistRegionReason::Flush, "");
persistRegion(curr_region, region_task_lock, PersistRegionReason::Flush, "");
// CompactLog will be done in the proxy soon; we advance the eager truncate index in TiFlash
curr_region.updateRaftLogEagerIndex(index);
curr_region.cleanApproxMemCacheInfo();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Interpreters/Context_fwd.h>
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/KVStore/Decode/RegionDataRead.h>
Expand All @@ -32,7 +33,6 @@ struct TableInfo;
}
namespace DB
{
class Context;
namespace RegionBench
{
extern void concurrentBatchInsert(const TiDB::TableInfo &, Int64, Int64, Int64, UInt64, UInt64, Context &);
Expand Down Expand Up @@ -357,7 +357,7 @@ class KVStore final : private boost::noncopyable

void persistRegion(
const Region & region,
std::optional<const RegionTaskLock *> region_task_lock,
const RegionTaskLock & region_task_lock,
PersistRegionReason reason,
const char * extra_msg) const;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
old_region->setStateApplying();
tmt.getRegionTable().tryWriteBlockByRegion(old_region);
tryFlushRegionCacheInStorage(tmt, *old_region, log);
persistRegion(*old_region, &region_lock, PersistRegionReason::ApplySnapshotPrevRegion, "");
persistRegion(*old_region, region_lock, PersistRegionReason::ApplySnapshotPrevRegion, "");
}
}

Expand Down Expand Up @@ -303,7 +303,7 @@ void KVStore::onSnapshot(
}

GET_METRIC(tiflash_raft_write_flow_bytes, type_snapshot_uncommitted).Observe(new_region->dataSize());
persistRegion(*new_region, &region_lock, PersistRegionReason::ApplySnapshotCurRegion, "");
persistRegion(*new_region, region_lock, PersistRegionReason::ApplySnapshotCurRegion, "");

tmt.getRegionTable().shrinkRegionRange(*new_region);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(
{
// We always try to flush dm cache and region if possible for every IngestSST,
// in order to have the raft log truncated and sst deleted.
persistRegion(*region, &region_task_lock, PersistRegionReason::IngestSst, "");
persistRegion(*region, region_task_lock, PersistRegionReason::IngestSst, "");
return EngineStoreApplyRes::Persist;
}
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmdInner(
/// We should execute eager RaftLog GC, persist the Region in both TiFlash and proxy
// Persist RegionMeta on the storage engine
tryFlushRegionCacheInStorage(tmt, *region, Logger::get());
persistRegion(*region, &region_persist_lock, PersistRegionReason::EagerRaftGc, "");
persistRegion(*region, region_persist_lock, PersistRegionReason::EagerRaftGc, "");
// return "Persist" to proxy for persisting the RegionMeta
apply_res = EngineStoreApplyRes::Persist;
}
Expand Down Expand Up @@ -133,7 +133,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(
curr_region,
&region_task_lock,
region_task_lock,
PersistRegionReason::UselessAdminCommand,
raft_cmdpb::AdminCmdType_Name(cmd_type).c_str());
return EngineStoreApplyRes::Persist;
Expand Down Expand Up @@ -236,7 +236,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(

const auto persist_and_sync = [&](const Region & region) {
tryFlushRegionCacheInStorage(tmt, region, log);
persistRegion(region, &region_task_lock, PersistRegionReason::AdminCommand, "");
persistRegion(region, region_task_lock, PersistRegionReason::AdminCommand, "");
};

const auto handle_batch_split = [&](Regions & split_regions) {
Expand Down
53 changes: 35 additions & 18 deletions dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand All @@ -24,12 +25,16 @@
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/WriteBatchImpl.h>
#include <Storages/Page/WriteBatchWrapperImpl.h>
#include <Storages/PathPool.h>
#include <common/logger_useful.h>
#include <fiu.h>

#include <chrono>
#include <magic_enum.hpp>
#include <memory>
#include <thread>

namespace CurrentMetrics
{
Expand All @@ -42,6 +47,11 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes
namespace FailPoints
{
extern const char pause_when_persist_region[];
extern const char random_region_persister_latency_failpoint[];
} // namespace FailPoints

void RegionPersister::drop(RegionID region_id, const RegionTaskLock &)
{
Expand Down Expand Up @@ -75,25 +85,12 @@ size_t RegionPersister::computeRegionWriteBuffer(const Region & region, WriteBuf
}

void RegionPersister::persist(const Region & region, const RegionTaskLock & lock)
{
doPersist(region, &lock);
}

void RegionPersister::persist(const Region & region)
{
doPersist(region, nullptr);
}

void RegionPersister::doPersist(const Region & region, const RegionTaskLock * lock)
{
// Support only one thread persist.
RegionCacheWriteElement region_buffer;
computeRegionWriteBuffer(region, region_buffer);

if (lock)
doPersist(region_buffer, *lock, region);
else
doPersist(region_buffer, region_manager.genRegionTaskLock(region.id()), region);
doPersist(region_buffer, lock, region);
}

void RegionPersister::doPersist(
Expand All @@ -103,8 +100,6 @@ void RegionPersister::doPersist(
{
auto & [region_id, buffer, region_size, applied_index] = region_write_buffer;

std::lock_guard lock(mutex);

auto entry = page_reader->getPageEntry(region_id);
if (entry.isValid() && entry.tag > applied_index)
return;
Expand All @@ -121,13 +116,35 @@ void RegionPersister::doPersist(
wb.putPage(region_id, applied_index, read_buf, region_size);
page_writer->write(std::move(wb), global_context.getWriteLimiter());

#ifdef FIU_ENABLE
fiu_do_on(FailPoints::pause_when_persist_region, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::pause_when_persist_region); v)
{
// Only pause for the given region_id
auto pause_region_id = std::any_cast<RegionID>(v.value());
if (region_id == pause_region_id)
{
SYNC_FOR("before_RegionPersister::persist_write_done");
}
}
else
{
// Pause for all persisting requests
SYNC_FOR("before_RegionPersister::persist_write_done");
}
});
fiu_do_on(FailPoints::random_region_persister_latency_failpoint, {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1ms);
});
#endif

region.updateLastCompactLogApplied(region_task_lock);
}

RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_)
RegionPersister::RegionPersister(Context & global_context_)
: global_context(global_context_)
, run_mode(global_context.getPageStorageRunMode())
, region_manager(region_manager_)
, log(Logger::get())
{}

Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Storages/KVStore/MultiRaft/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

#include <Common/Logger.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/KVStore/Types.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/Page/WriteBatchImpl.h>

namespace DB
{
class Context;

class PathPool;
class Region;
using RegionPtr = std::shared_ptr<Region>;
Expand All @@ -39,10 +37,9 @@ struct TiFlashRaftProxyHelper;
class RegionPersister final : private boost::noncopyable
{
public:
RegionPersister(Context & global_context_, const RegionManager & region_manager_);
explicit RegionPersister(Context & global_context_);

void drop(RegionID region_id, const RegionTaskLock &);
void persist(const Region & region);
void persist(const Region & region, const RegionTaskLock & lock);
RegionMap restore(
PathPool & path_pool,
Expand All @@ -62,9 +59,7 @@ class RegionPersister final : private boost::noncopyable
void forceTransformKVStoreV2toV3();

void doPersist(RegionCacheWriteElement & region_write_buffer, const RegionTaskLock & lock, const Region & region);
void doPersist(const Region & region, const RegionTaskLock * lock);

private:
inline std::variant<String, NamespaceID> getWriteBatchPrefix() const
{
switch (run_mode)
Expand All @@ -82,9 +77,7 @@ class RegionPersister final : private boost::noncopyable
PageWriterPtr page_writer;
PageReaderPtr page_reader;

NamespaceID ns_id = KVSTORE_NAMESPACE_ID;
const RegionManager & region_manager;
std::mutex mutex;
const NamespaceID ns_id = KVSTORE_NAMESPACE_ID;
LoggerPtr log;
};
} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Storages/KVStore/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Interpreters/Context_fwd.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/GCManager.h>
#include <Storages/KVStore/Decode/RegionTable.h>
Expand All @@ -23,8 +24,6 @@

namespace DB
{
class Context;

class PathPool;

class KVStore;
Expand Down
Loading