Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Reduce lock contention in RegionPersister::doPersist (release-6.5) #8593

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ namespace DB
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task)
M(pause_before_register_non_root_mpp_task) \
M(pause_when_persist_region)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
Expand All @@ -131,7 +132,8 @@ namespace DB
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint)
M(random_min_tso_scheduler_failpoint) \
M(random_region_persister_latency_failpoint)

namespace FailPoints
{
Expand Down
28 changes: 26 additions & 2 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
Expand Down Expand Up @@ -46,6 +47,8 @@ namespace FailPoints
{
extern const char force_enable_region_persister_compatible_mode[];
extern const char force_disable_region_persister_compatible_mode[];
extern const char pause_when_persist_region[];
extern const char random_region_persister_latency_failpoint[];
} // namespace FailPoints

void RegionPersister::drop(RegionID region_id, const RegionTaskLock &)
Expand Down Expand Up @@ -107,8 +110,6 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
{
auto & [region_id, buffer, region_size, applied_index] = region_write_buffer;

std::lock_guard lock(mutex);

if (page_reader)
{
auto entry = page_reader->getPageEntry(region_id);
Expand Down Expand Up @@ -142,6 +143,29 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
wb.putPage(region_id, applied_index, read_buf, region_size);
stable_page_storage->write(std::move(wb));
}

#ifdef FIU_ENABLE
fiu_do_on(FailPoints::pause_when_persist_region, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::pause_when_persist_region); v)
{
// Only pause for the given region_id
auto pause_region_id = std::any_cast<RegionID>(v.value());
if (region_id == pause_region_id)
{
SYNC_FOR("before_RegionPersister::persist_write_done");
}
}
else
{
// Pause for all persisting requests
SYNC_FOR("before_RegionPersister::persist_write_done");
}
});
fiu_do_on(FailPoints::random_region_persister_latency_failpoint, {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1ms);
});
#endif
}

RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class RegionPersister final : private boost::noncopyable

NamespaceId ns_id = KVSTORE_NAMESPACE_ID;
const RegionManager & region_manager;
std::mutex mutex;
LoggerPtr log;
};
} // namespace DB
73 changes: 73 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/Ctl.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
Expand All @@ -29,13 +30,16 @@
#include <common/logger_useful.h>

#include <ext/scope_guard.h>
#include <future>

namespace DB
{
// Names of the failpoints toggled by the tests in this file. These are
// declarations only (`extern`, no initializer); the definitions live in
// another translation unit.
namespace FailPoints
{
extern const char force_enable_region_persister_compatible_mode[];
extern const char force_disable_region_persister_compatible_mode[];
extern const char force_region_persist_version[];
// Enabled by the `Concurrency` test with a RegionID as the failpoint value.
extern const char pause_when_persist_region[];
} // namespace FailPoints

namespace tests
Expand Down Expand Up @@ -239,6 +243,75 @@ class RegionPersisterTest : public ::testing::Test
LoggerPtr log;
};

// Checks that persisting one region is not blocked while another region's
// persist is paused inside RegionPersister.
//
// Choreography:
//   1. Enable `pause_when_persist_region` with region 100 as its value and
//      enable the sync point "before_RegionPersister::persist_write_done".
//   2. A worker thread persists region 100 and is expected to stop at the
//      sync point.
//   3. The test thread persists region 101 while the worker is paused. If the
//      two persists were serialized by a common lock, this step would hang.
//   4. The worker is released and joined.
//
// NOTE(review): `diff` and `regions` are touched by both threads without a
// lock; presumably the sync point hand-off (waitAndPause / next) orders those
// accesses -- confirm against the SyncPointCtl implementation.
TEST_F(RegionPersisterTest, Concurrency)
try
{
    RegionManager region_manager;

    auto ctx = TiFlashTestEnv::getGlobalContext();

    RegionMap regions;
    const TableID table_id = 100;

    PageStorageConfig config;
    config.file_roll_size = 128 * MB;

    UInt64 diff = 0;
    RegionPersister persister(ctx, region_manager);
    persister.restore(*mocked_path_pool, nullptr, config);

    // Persist region by region
    const RegionID region_100 = 100;
    // Passing the region id as the failpoint value asks for the pause to apply
    // to that region only.
    FailPointHelper::enableFailPoint(FailPoints::pause_when_persist_region, region_100);
    SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::pause_when_persist_region); });

    auto sp_persist_region_100 = SyncPointCtl::enableInScope("before_RegionPersister::persist_write_done");
    // The launch policy must be explicit: with the default policy the
    // implementation is allowed to defer the task until `.get()`, in which
    // case the worker would never reach the sync point and `waitAndPause()`
    // below would block forever.
    auto th_persist_region_100 = std::async(std::launch::async, [&]() {
        auto region_task_lock = region_manager.genRegionTaskLock(region_100);

        auto region = std::make_shared<Region>(createRegionMeta(region_100, table_id));
        TiKVKey key = RecordKVFormat::genKey(table_id, region_100, diff++);
        region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
        region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
        region->insert(
            ColumnFamilyType::Lock,
            TiKVKey::copyFrom(key),
            RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

        persister.persist(*region, region_task_lock);

        regions.emplace(region->id(), region);
    });
    LOG_INFO(log, "paused before persisting region 100");
    // Presumably blocks until the worker has reached the sync point and keeps
    // it parked there until `next()` is called -- confirm.
    sp_persist_region_100.waitAndPause();

    LOG_INFO(log, "before persisting region 101");
    const RegionID region_101 = 101;
    {
        // Same steps as the worker, for a different region, on the test thread.
        auto region_task_lock = region_manager.genRegionTaskLock(region_101);

        auto region = std::make_shared<Region>(createRegionMeta(region_101, table_id));
        TiKVKey key = RecordKVFormat::genKey(table_id, region_101, diff++);
        region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
        region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
        region->insert(
            ColumnFamilyType::Lock,
            TiKVKey::copyFrom(key),
            RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

        persister.persist(*region, region_task_lock);

        regions.emplace(region->id(), region);
    }
    LOG_INFO(log, "after persisting region 101");

    // Release the worker and wait for it; `.get()` also rethrows any exception
    // raised inside the worker.
    sp_persist_region_100.next();
    th_persist_region_100.get();

    LOG_INFO(log, "finished");
}
CATCH

TEST_F(RegionPersisterTest, persister)
try
{
Expand Down