Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Proactive flush of delta merge layer phase 1 #7558

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace DB
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk) \
M(force_triggle_background_merge_delta) \
M(force_triggle_foreground_flush) \
M(exception_before_mpp_register_non_root_mpp_task) \
M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \
M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \
Expand Down Expand Up @@ -101,7 +100,10 @@ namespace DB
M(force_set_mocked_s3_object_mtime) \
M(force_stop_background_checkpoint_upload) \
M(skip_seek_before_read_dmfile) \
M(exception_after_large_write_exceed)
M(exception_after_large_write_exceed) \
M(proactive_flush_force_set_type) \
M(proactive_flush_between_persist_cache_and_region) \
M(proactive_flush_between_persist_regions)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand All @@ -114,13 +116,15 @@ namespace DB
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_query_init)
#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_query_init) \
M(pause_proactive_flush_before_persist_region) \
M(pause_passive_flush_before_persist_region)


#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ namespace DB
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
F(type_place_index_update, {"type", "place_index_update"}), \
F(type_compact_log_segment_bg, {"type", "compact_log_segment_bg"}), \
F(type_compact_log_segment_fg, {"type", "compact_log_segment_fg"}), \
F(type_compact_log_region_bg, {"type", "compact_log_region_bg"}), \
F(type_compact_log_region_fg, {"type", "compact_log_region_fg"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
Expand All @@ -157,7 +161,9 @@ namespace DB
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20}), \
F(type_compact_log_bg, {{"type", "compact_log_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_compact_log_fg, {{"type", "compact_log_fg"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
F(type_ingest, {"type", "ingest"}), /**/ \
Expand Down Expand Up @@ -255,6 +261,10 @@ namespace DB
F(type_write, {{"type", "write"}}, ExpBuckets{0.001, 2, 30})) \
M(tiflash_raft_write_data_to_storage_duration_seconds, "Bucketed histogram of writting region into storage layer", Histogram, \
F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_raft_raft_log_lag_count, "Bucketed histogram of applying write command Raft logs", Histogram, \
F(type_compact_index, {{"type", "compact_index"}}, EqualWidthBuckets{0, 200, 5})) \
M(tiflash_raft_raft_events_count, "Raft event counter", Counter, \
F(type_pre_exec_compact, {{"type", "pre_exec_compact"}})) \
/* required by DBaaS */ \
M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \
Expand Down
146 changes: 139 additions & 7 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
#include <Debug/MockSSTReader.h>
#include <Debug/MockTiDB.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ProxyFFICommon.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionMeta.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/tests/region_helper.h>
#include <TestUtils/TiFlashTestEnv.h>
#include <google/protobuf/text_format.h>


namespace DB
{
namespace RegionBench
Expand Down Expand Up @@ -134,6 +138,20 @@ KVGetStatus fn_get_region_local_state(RaftStoreProxyPtr ptr, uint64_t region_id,
return KVGetStatus::NotFound;
}

void fn_notify_compact_log(RaftStoreProxyPtr ptr, uint64_t region_id, uint64_t compact_index, uint64_t compact_term, uint64_t applied_index)
{
// Update flushed applied_index and truncated state.
auto & x = as_ref(ptr);
auto region = x.getRegion(region_id);
ASSERT(region);
LOG_INFO(&Poco::Logger::get("!!!!!"), "!!!! fn_notify_compact_log {} commit index {} applied_index {} compact_index {} compact_term {}", region_id, region->getLatestCommitIndex(), applied_index, compact_index, compact_term);
// `applied_index` in proxy's disk can still be less than the `applied_index` here when fg flush.
if (region && region->getApply().truncated_state().index() < compact_index)
{
region->updateTruncatedState(compact_index, compact_term);
}
}

TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr proxy_ptr)
{
TiFlashRaftProxyHelper res{};
Expand All @@ -143,6 +161,7 @@ TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreP
res.fn_make_async_waker = fn_make_async_waker;
res.fn_handle_batch_read_index = fn_handle_batch_read_index;
res.fn_get_region_local_state = fn_get_region_local_state;
res.fn_notify_compact_log = fn_notify_compact_log;
{
// make sure such function pointer will be set at most once.
static std::once_flag flag;
Expand Down Expand Up @@ -172,17 +191,26 @@ void MockProxyRegion::updateAppliedIndex(uint64_t index)

uint64_t MockProxyRegion::getLatestAppliedIndex()
{
return this->getApply().applied_index();
auto _ = genLockGuard();
return this->apply.applied_index();
}

uint64_t MockProxyRegion::getLatestCommitTerm()
{
return this->getApply().commit_term();
auto _ = genLockGuard();
return this->apply.commit_term();
}

uint64_t MockProxyRegion::getLatestCommitIndex()
{
return this->getApply().commit_index();
auto _ = genLockGuard();
return this->apply.commit_index();
}

void MockProxyRegion::updateTruncatedState(uint64_t index, uint64_t term)
{
this->apply.mutable_truncated_state()->set_index(index);
this->apply.mutable_truncated_state()->set_term(term);
}

void MockProxyRegion::updateCommitIndex(uint64_t index)
Expand Down Expand Up @@ -406,6 +434,38 @@ void MockRaftStoreProxy::debugAddRegions(
}
}

void MockRaftStoreProxy::loadRegionFromKVStore(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id)
{
UNUSED(tmt);
auto kvr = kvs.getRegion(region_id);
auto ori_r = getRegion(region_id);
auto commit_index = RAFT_INIT_LOG_INDEX;
auto commit_term = RAFT_INIT_LOG_TERM;
if (!ori_r)
{
regions.emplace(region_id, std::make_shared<MockProxyRegion>(region_id));
}
else
{
commit_index = ori_r->getLatestCommitIndex();
commit_term = ori_r->getLatestCommitTerm();
}
MockProxyRegionPtr r = getRegion(region_id);
{
r->state = kvr->mutMeta().getRegionState().getBase();
r->apply = kvr->mutMeta().clonedApplyState();
if (r->apply.commit_index() == 0)
{
r->apply.set_commit_index(commit_index);
r->apply.set_commit_term(commit_term);
}
}
LOG_INFO(log, "loadRegionFromKVStore [region_id={}] region_state {} apply_state {}", region_id, r->state.DebugString(), r->apply.DebugString());
}

std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::normalWrite(
UInt64 region_id,
std::vector<HandleID> && keys,
Expand Down Expand Up @@ -481,15 +541,20 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::rawWrite(
}


std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest && request, raft_cmdpb::AdminResponse && response)
std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::adminCommand(
UInt64 region_id,
raft_cmdpb::AdminRequest && request,
raft_cmdpb::AdminResponse && response,
std::optional<uint64_t> forced_index)
{
uint64_t index = 0;
uint64_t term = 0;
{
auto region = getRegion(region_id);
assert(region != nullptr);
// We have a new entry.
index = region->getLatestCommitIndex() + 1;
index = forced_index.value_or(region->getLatestCommitIndex() + 1);
RUNTIME_CHECK(index > region->getLatestCommitIndex());
term = region->getLatestCommitTerm();
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
Expand Down Expand Up @@ -520,6 +585,20 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id,
return adminCommand(region_id, std::move(request), std::move(response));
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeCompactLog(MockProxyRegionPtr region, UInt64 compact_index)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.contains(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[compact_index].term);
}
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2)
{
raft_cmdpb::AdminRequest request;
Expand Down Expand Up @@ -603,7 +682,8 @@ void MockRaftStoreProxy::doApply(
TMTContext & tmt,
const FailCond & cond,
UInt64 region_id,
uint64_t index)
uint64_t index,
std::optional<bool> check_proactive_flush)
{
auto region = getRegion(region_id);
assert(region != nullptr);
Expand Down Expand Up @@ -652,10 +732,39 @@ void MockRaftStoreProxy::doApply(
if (cmd.has_raw_write_request())
{
// TiFlash write
kvs.handleWriteRaftCmd(std::move(request), region_id, index, term, tmt);
DB::DM::WriteResult write_task;
kvs.handleWriteRaftCmdDebug(std::move(request), region_id, index, term, tmt, write_task);
if (check_proactive_flush)
{
if (check_proactive_flush.value())
{
// fg flush
ASSERT(write_task.has_value());
}
else
{
// bg flush
ASSERT(!write_task.has_value());
}
}
}
if (cmd.has_admin_request())
{
if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::CompactLog)
{
auto res = kvs.tryFlushRegionData(region_id, false, true, tmt, index, term, region->getApply().truncated_state().index(), region->getApply().truncated_state().term());
auto compact_index = cmd.admin().request.compact_log().compact_index();
auto compact_term = cmd.admin().request.compact_log().compact_term();
if (!res)
{
LOG_DEBUG(log, "mock pre exec reject");
}
else
{
region->updateTruncatedState(compact_index, compact_term);
LOG_DEBUG(log, "mock pre exec success, update to {},{}", compact_index, compact_term);
}
}
kvs.handleAdminRaftCmd(std::move(cmd.admin().request), std::move(cmd.admin().response), region_id, index, term, tmt);
}

Expand All @@ -673,9 +782,18 @@ void MockRaftStoreProxy::doApply(
// TODO We should remove (0, index] here, it is enough to remove exactly index now.
region->commands.erase(i);
}
else if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::BatchSplit)
{
for (auto && sp : cmd.admin().response.splits().regions())
{
auto r = sp.id();
loadRegionFromKVStore(kvs, tmt, r);
}
}
}

// Proxy advance
// We currently consider a flush for every command for simplify.
if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE)
return;
region->updateAppliedIndex(index);
Expand Down Expand Up @@ -822,6 +940,20 @@ TableID MockRaftStoreProxy::bootstrapTable(
return table_id;
}

std::pair<std::string, std::string> MockRaftStoreProxy::generateTiKVKeyValue(uint64_t tso, int64_t t) const
{
WriteBufferFromOwnString buff;
writeChar(RecordKVFormat::CFModifyFlag::PutFlag, buff);
EncodeVarUInt(tso, buff);
std::string value_write = buff.releaseStr();
buff.restart();
auto && table_info = MockTiDB::instance().getTableInfoByID(table_id);
std::vector<Field> f{Field{std::move(t)}};
encodeRowV1(*table_info, f, buff);
std::string value_default = buff.releaseStr();
return std::make_pair(value_write, value_default);
}

void GCMonitor::add(RawObjType type, int64_t diff)
{
auto _ = genLockGuard();
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct MockProxyRegion : MutexLockWrap
uint64_t getLatestCommitTerm();
uint64_t getLatestCommitIndex();
void updateCommitIndex(uint64_t index);
void updateTruncatedState(uint64_t index, uint64_t term);
void setSate(raft_serverpb::RegionLocalState);
explicit MockProxyRegion(uint64_t id);
UniversalWriteBatch persistMeta();
Expand Down Expand Up @@ -209,6 +210,11 @@ struct MockRaftStoreProxy : MutexLockWrap
std::vector<UInt64> region_ids,
std::vector<std::pair<std::string, std::string>> && ranges);

void loadRegionFromKVStore(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id);

/// We assume that we generate one command, and immediately commit.
/// Normal write to a region.
std::tuple<uint64_t, uint64_t> normalWrite(
Expand All @@ -228,8 +234,9 @@ struct MockRaftStoreProxy : MutexLockWrap
/// Create a compactLog admin command, returns (index, term) of the admin command itself.
std::tuple<uint64_t, uint64_t> compactLog(UInt64 region_id, UInt64 compact_index);

std::tuple<uint64_t, uint64_t> adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest &&, raft_cmdpb::AdminResponse &&);
std::tuple<uint64_t, uint64_t> adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest &&, raft_cmdpb::AdminResponse &&, std::optional<uint64_t> forced_index = std::nullopt);

static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeCompactLog(MockProxyRegionPtr region, UInt64 compact_index);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2 = true);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composePrepareMerge(metapb::Region && target, UInt64 min_index);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeCommitMerge(metapb::Region && source, UInt64 commit);
Expand Down Expand Up @@ -280,7 +287,8 @@ struct MockRaftStoreProxy : MutexLockWrap
TMTContext & tmt,
const FailCond & cond,
UInt64 region_id,
uint64_t index);
uint64_t index,
std::optional<bool> check_proactive_flush = std::nullopt);

void replay(
KVStore & kvs,
Expand All @@ -294,6 +302,8 @@ struct MockRaftStoreProxy : MutexLockWrap
regions.clear();
}

std::pair<std::string, std::string> generateTiKVKeyValue(uint64_t tso, int64_t t) const;

MockRaftStoreProxy()
{
log = Logger::get("MockRaftStoreProxy");
Expand Down
Loading