Skip to content

Commit

Permalink
KVStore: Reduce lock contention in RegionPersister::doPersist (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Dec 26, 2023
1 parent 9f7e054 commit 78846de
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 4 deletions.
7 changes: 5 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ namespace DB
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task)
M(pause_before_register_non_root_mpp_task) \
M(pause_when_persist_region)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
Expand Down Expand Up @@ -148,7 +149,9 @@ namespace DB
M(random_pipeline_model_cancel_failpoint) \
M(random_spill_to_disk_failpoint) \
M(random_restore_from_disk_failpoint) \
M(random_exception_when_connect_local_tunnel)
M(random_exception_when_connect_local_tunnel) \
M(random_region_persister_latency_failpoint)

namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
Expand Down
31 changes: 30 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
Expand Down Expand Up @@ -45,6 +46,12 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char pause_when_persist_region[];
extern const char random_region_persister_latency_failpoint[];
} // namespace FailPoints

void RegionPersister::drop(RegionID region_id, const RegionTaskLock &)
{
DB::WriteBatchWrapper wb{run_mode, getWriteBatchPrefix()};
Expand Down Expand Up @@ -95,7 +102,6 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
{
auto & [region_id, buffer, region_size, applied_index] = region_write_buffer;

std::lock_guard lock(mutex);

auto entry = page_reader->getPageEntry(region_id);
if (entry.isValid() && entry.tag > applied_index)
Expand All @@ -112,6 +118,29 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
DB::WriteBatchWrapper wb{run_mode, getWriteBatchPrefix()};
wb.putPage(region_id, applied_index, read_buf, region_size);
page_writer->write(std::move(wb), global_context.getWriteLimiter());

#ifdef FIU_ENABLE
fiu_do_on(FailPoints::pause_when_persist_region, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::pause_when_persist_region); v)
{
// Only pause for the given region_id
auto pause_region_id = std::any_cast<RegionID>(v.value());
if (region_id == pause_region_id)
{
SYNC_FOR("before_RegionPersister::persist_write_done");
}
}
else
{
// Pause for all persisting requests
SYNC_FOR("before_RegionPersister::persist_write_done");
}
});
fiu_do_on(FailPoints::random_region_persister_latency_failpoint, {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1ms);
});
#endif
}

RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class RegionPersister final : private boost::noncopyable

NamespaceID ns_id = KVSTORE_NAMESPACE_ID;
const RegionManager & region_manager;
std::mutex mutex;
LoggerPtr log;
};
} // namespace DB
76 changes: 76 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/Ctl.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/Context.h>
Expand All @@ -32,9 +33,15 @@
#include <common/types.h>

#include <ext/scope_guard.h>
#include <future>

namespace DB
{
namespace FailPoints
{
extern const char force_region_persist_version[];
extern const char pause_when_persist_region[];
} // namespace FailPoints

namespace tests
{
Expand Down Expand Up @@ -257,6 +264,75 @@ class RegionPersisterTest
LoggerPtr log;
};

TEST_P(RegionPersisterTest, Concurrency)
try
{
RegionManager region_manager;

auto ctx = TiFlashTestEnv::getGlobalContext();

RegionMap regions;
const TableID table_id = 100;

PageStorageConfig config;
config.file_roll_size = 128 * MB;

UInt64 diff = 0;
RegionPersister persister(ctx, region_manager);
persister.restore(*mocked_path_pool, nullptr, config);

// Persist region by region
const RegionID region_100 = 100;
FailPointHelper::enableFailPoint(FailPoints::pause_when_persist_region, region_100);
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::pause_when_persist_region); });

auto sp_persist_region_100 = SyncPointCtl::enableInScope("before_RegionPersister::persist_write_done");
auto th_persist_region_100 = std::async([&]() {
auto region_task_lock = region_manager.genRegionTaskLock(region_100);

auto region = std::make_shared<Region>(createRegionMeta(region_100, table_id));
TiKVKey key = RecordKVFormat::genKey(table_id, region_100, diff++);
region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
region->insert(
ColumnFamilyType::Lock,
TiKVKey::copyFrom(key),
RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

persister.persist(*region, region_task_lock);

regions.emplace(region->id(), region);
});
LOG_INFO(log, "paused before persisting region 100");
sp_persist_region_100.waitAndPause();

LOG_INFO(log, "before persisting region 101");
const RegionID region_101 = 101;
{
auto region_task_lock = region_manager.genRegionTaskLock(region_101);

auto region = std::make_shared<Region>(createRegionMeta(region_101, table_id));
TiKVKey key = RecordKVFormat::genKey(table_id, region_101, diff++);
region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
region->insert(
ColumnFamilyType::Lock,
TiKVKey::copyFrom(key),
RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

persister.persist(*region, region_task_lock);

regions.emplace(region->id(), region);
}
LOG_INFO(log, "after persisting region 101");

sp_persist_region_100.next();
th_persist_region_100.get();

LOG_INFO(log, "finished");
}
CATCH

TEST_P(RegionPersisterTest, persister)
try
{
Expand Down

0 comments on commit 78846de

Please sign in to comment.