Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <Common/TiFlashMetrics.h>
#include <common/defines.h>

#include <magic_enum.hpp>

namespace DB
{
TiFlashMetrics & TiFlashMetrics::instance()
Expand Down Expand Up @@ -66,6 +68,11 @@ TiFlashMetrics::TiFlashMetrics()

registered_storage_thread_memory_usage_family
= &prometheus::BuildGauge().Name(storages_thread_memory_usage).Help("").Register(*registry);

registered_storage_ru_read_bytes_family = &prometheus::BuildCounter()
.Name("tiflash_storage_ru_read_bytes")
.Help("Read bytes for storage RU calculation")
.Register(*registry);
}

void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru)
Expand Down Expand Up @@ -210,4 +217,36 @@ void TiFlashMetrics::setProvideProxyProcessMetrics(bool v)
process_collector->include_proxy_metrics = v;
}

prometheus::Counter & TiFlashMetrics::getStorageRUReadBytesCounter(
KeyspaceID keyspace,
const String & resource_group,
DM::ReadRUType type)
{
auto key = fmt::format("{}_{}_{}", keyspace, resource_group, magic_enum::enum_name(type));

// Fast path
{
std::shared_lock lock(storage_ru_read_bytes_mtx);
auto it = registered_storage_ru_read_bytes_metrics.find(key);
if (it != registered_storage_ru_read_bytes_metrics.end())
return *(it->second);
}

// Create counter for new keyspace/resource_group/type.
{
std::unique_lock lock(storage_ru_read_bytes_mtx);
// double-check: other threads may create the same counter
auto it = registered_storage_ru_read_bytes_metrics.find(key);
if (it != registered_storage_ru_read_bytes_metrics.end())
return *(it->second);

prometheus::Labels labels
= {{"keyspace", std::to_string(keyspace)},
{"resource_group", resource_group},
{"type", std::string(magic_enum::enum_name(type))}};
auto & counter = registered_storage_ru_read_bytes_family->Add(labels);
registered_storage_ru_read_bytes_metrics[key] = &counter;
return counter;
}
}
} // namespace DB
14 changes: 13 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/ProcessCollector_fwd.h>
#include <Common/TiFlashBuildInfo.h>
#include <Common/nocopyable.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <common/types.h>
#include <prometheus/counter.h>
#include <prometheus/exposer.h>
Expand Down Expand Up @@ -1169,6 +1170,8 @@ namespace tests
struct TiFlashMetricsHelper;
}

using KeyspaceID = UInt32;

/// Centralized registry of TiFlash metrics.
/// Cope with MetricsPrometheus by registering
/// profile events, current metrics and customized metrics (as individual member for caller to access) into registry ahead of being updated.
Expand All @@ -1193,6 +1196,11 @@ class TiFlashMetrics
void registerStorageThreadMemory(const std::string & k);
void setProvideProxyProcessMetrics(bool v);

prometheus::Counter & getStorageRUReadBytesCounter(
KeyspaceID keyspace,
const String & resource_group,
const DM::ReadRUType type);

private:
TiFlashMetrics();

Expand All @@ -1213,7 +1221,6 @@ class TiFlashMetrics
std::unordered_map<std::string, prometheus::Gauge *> registered_async_metrics;

prometheus::Family<prometheus::Gauge> * registered_keypace_store_used_family;
using KeyspaceID = UInt32;
std::unordered_map<KeyspaceID, prometheus::Gauge *> registered_keypace_store_used_metrics;
prometheus::Gauge * store_used_total_metric;

Expand All @@ -1230,6 +1237,11 @@ class TiFlashMetrics
std::shared_mutex storage_thread_report_mtx;
std::unordered_map<std::string, prometheus::Gauge *> registered_storage_thread_memory_usage_metrics;

prometheus::Family<prometheus::Counter> * registered_storage_ru_read_bytes_family;
std::shared_mutex storage_ru_read_bytes_mtx;
// {keyspace}_{resource_group}_{type} -> Counter
std::unordered_map<std::string, prometheus::Counter *> registered_storage_ru_read_bytes_metrics;

public:
#define MAKE_METRIC_MEMBER_M(family_name, help, type, ...) \
MetricFamily<prometheus::type> family_name \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ UInt64 DAGContext::getReadBytes() const
for (const auto & [id, sc] : scan_context_map)
{
(void)id; // Disable unused variable warnning.
read_bytes += sc->user_read_bytes;
read_bytes += sc->userReadBytes();
}
return read_bytes;
}
Expand Down
20 changes: 9 additions & 11 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ namespace DB::DM
template <bool need_row_id>
ConcatSkippableBlockInputStream<need_row_id>::ConcatSkippableBlockInputStream(
SkippableBlockInputStreams inputs_,
const ScanContextPtr & scan_context_)
const ScanContextPtr & scan_context_,
ReadTag read_tag_)
: rows(inputs_.size(), 0)
, precede_stream_rows(0)
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
, lac_bytes_collector(scan_context_ ? scan_context_->newLACBytesCollector(read_tag_) : std::nullopt)
, read_tag(read_tag_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand All @@ -37,11 +39,13 @@ template <bool need_row_id>
ConcatSkippableBlockInputStream<need_row_id>::ConcatSkippableBlockInputStream(
SkippableBlockInputStreams inputs_,
std::vector<size_t> && rows_,
const ScanContextPtr & scan_context_)
const ScanContextPtr & scan_context_,
ReadTag read_tag_)
: rows(std::move(rows_))
, precede_stream_rows(0)
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
, lac_bytes_collector(scan_context_ ? scan_context_->newLACBytesCollector(read_tag_) : std::nullopt)
, read_tag(read_tag_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand Down Expand Up @@ -178,13 +182,7 @@ template <bool need_row_id>
void ConcatSkippableBlockInputStream<need_row_id>::addReadBytes(UInt64 bytes)
{
if (likely(scan_context != nullptr))
{
scan_context->user_read_bytes += bytes;
if constexpr (!need_row_id)
{
lac_bytes_collector.collect(bytes);
}
}
scan_context->addUserReadBytes(bytes, read_tag, lac_bytes_collector);
}

template class ConcatSkippableBlockInputStream<false>;
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@

#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>
#include <Storages/DeltaMerge/VectorIndexBlockInputStream.h>


namespace DB::DM
{

template <bool need_row_id = false>
class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
public:
ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, const ScanContextPtr & scan_context_);
ConcatSkippableBlockInputStream(
SkippableBlockInputStreams inputs_,
const ScanContextPtr & scan_context_,
ReadTag read_tag_);

ConcatSkippableBlockInputStream(
SkippableBlockInputStreams inputs_,
std::vector<size_t> && rows_,
const ScanContextPtr & scan_context_);
const ScanContextPtr & scan_context_,
ReadTag read_tag_);

void appendChild(SkippableBlockInputStreamPtr child, size_t rows_);

Expand Down Expand Up @@ -65,7 +69,8 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
std::vector<size_t> rows;
size_t precede_stream_rows;
const ScanContextPtr scan_context;
LACBytesCollector lac_bytes_collector;
std::optional<LACBytesCollector> lac_bytes_collector;
ReadTag read_tag;
};

class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream
Expand Down
16 changes: 13 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Core/Block.h>
#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h>
#include <Storages/DeltaMerge/Delta/MemTableSet.h>
#include <Storages/DeltaMerge/DeltaIndex.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/Page/PageDefinesBase.h>


namespace DB::DM
{
namespace tests
Expand Down Expand Up @@ -456,8 +457,17 @@ class DeltaValueReader
ColumnDefinesPtr col_defs;
RowKeyRange segment_range;

const DMContext & dm_context;
ReadTag read_tag;
std::optional<LACBytesCollector> lac_bytes_collector;

private:
DeltaValueReader() = default;
DeltaValueReader(const DMContext & dm_context_, ReadTag read_tag_)
: dm_context(dm_context_)
, read_tag(read_tag_)
, lac_bytes_collector(
dm_context.scan_context ? dm_context.scan_context->newLACBytesCollector(read_tag_) : std::nullopt)
{}

public:
DeltaValueReader(
Expand Down
24 changes: 22 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>

Expand Down Expand Up @@ -87,11 +88,15 @@ DeltaValueReader::DeltaValueReader(
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
, dm_context(context)
, read_tag(read_tag_)
, lac_bytes_collector(
dm_context.scan_context ? dm_context.scan_context->newLACBytesCollector(read_tag_) : std::nullopt)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag_)
{
auto * new_reader = new DeltaValueReader();
auto * new_reader = new DeltaValueReader(dm_context, read_tag_);
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
Expand All @@ -102,6 +107,14 @@ DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & n
return std::shared_ptr<DeltaValueReader>(new_reader);
}

static size_t columnsBytes(const MutableColumns & columns)
{
size_t bytes = 0;
for (const auto & col : columns)
bytes += col->byteSize();
return bytes;
}

size_t DeltaValueReader::readRows(
MutableColumns & output_cols,
size_t offset,
Expand Down Expand Up @@ -132,6 +145,7 @@ size_t DeltaValueReader::readRows(
? 0
: std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);

const auto initial_bytes = columnsBytes(output_cols);
size_t actual_read = 0;
size_t persisted_read_rows = 0;
if (persisted_files_start < persisted_files_end)
Expand Down Expand Up @@ -163,6 +177,12 @@ size_t DeltaValueReader::readRows(
[mem_table_rows_offset](UInt32 id) { return id + mem_table_rows_offset; });
}

if (dm_context.scan_context)
{
const auto final_bytes = columnsBytes(output_cols);
dm_context.scan_context->addUserReadBytes(final_bytes - initial_bytes, read_tag, lac_bytes_collector);
}

return actual_read;
}

Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/ReadMode.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,16 @@ enum class ReadMode

enum class ReadTag
{
Internal, // Read columns required by some internal tasks.
Internal, // Read columns required by some internal tasks, such as building delta index, building version chain, compaction.
Query, // Read columns required by queries.
MVCC, // Read columns to build MVCC bitmap.
LMFilter, // Read columns required by late-materialization filter.
};

enum class ReadRUType
{
MVCC_ESTIMATE,
MVCC_READ,
QUERY_READ,
};
} // namespace DB::DM
36 changes: 35 additions & 1 deletion dbms/src/Storages/DeltaMerge/ScanContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ String ScanContext::toJson() const
json->set("num_local_region", total_local_region_num.load());
json->set("num_stale_read", num_stale_read.load());

json->set("read_bytes", user_read_bytes.load());
json->set("query_read_bytes", query_read_bytes.load());
json->set("mvcc_read_bytes", mvcc_read_bytes.load());

if (disagg_read_cache_hit_size.load() > 0 && disagg_read_cache_miss_size.load() > 0)
{
Expand Down Expand Up @@ -268,4 +269,37 @@ void ScanContext::initCurrentInstanceId(Poco::Util::AbstractConfiguration & conf
current_instance_id = getCurrentInstanceId(flash_server_addr, log);
LOG_INFO(log, "flash_server_addr={}, current_instance_id={}", flash_server_addr, current_instance_id);
}

std::optional<LACBytesCollector> ScanContext::newLACBytesCollector(ReadTag read_tag)
{
if (resource_group_name.empty())
return std::nullopt;
if (read_tag != ReadTag::Query && read_tag != ReadTag::LMFilter)
return std::nullopt;
return LACBytesCollector(resource_group_name);
}

void ScanContext::addUserReadBytes(
size_t bytes,
ReadTag read_tag,
std::optional<LACBytesCollector> & lac_bytes_collector)
{
if (read_tag != ReadTag::Query && read_tag != ReadTag::LMFilter && read_tag != ReadTag::MVCC)
return;
if (read_tag == ReadTag::MVCC)
{
mvcc_read_bytes += bytes;
if (mvcc_read_bytes_counter)
mvcc_read_bytes_counter->Increment(bytes);
}
else
{
query_read_bytes += bytes;
if (query_read_bytes_counter)
query_read_bytes_counter->Increment(bytes);
if (lac_bytes_collector)
lac_bytes_collector->collect(bytes);
}
}

} // namespace DB::DM
Loading