Skip to content

Commit

Permalink
*: Add table scan details logging; change default logging level to "i…
Browse files Browse the repository at this point in the history
…nfo" (#8616)

close #8563
  • Loading branch information
JaySon-Huang committed Dec 29, 2023
1 parent 4844f60 commit ce42814
Show file tree
Hide file tree
Showing 41 changed files with 359 additions and 107 deletions.
1 change: 0 additions & 1 deletion dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <Flash/Coprocessor/RemoteExecutionSummary.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <common/logger_useful.h>

#include <chrono>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/transformProfiles.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/KVStore/TMTContext.h>
#include <kvproto/disaggregated.pb.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -473,7 +474,7 @@ RU DAGContext::getReadRU() const
for (const auto & [id, sc] : scan_context_map)
{
(void)id; // Disable unused variable warnning.
read_bytes += sc->total_user_read_bytes;
read_bytes += sc->user_read_bytes;
}
return bytesToRU(read_bytes);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <Operators/OperatorProfileInfo.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <TiDB/Schema/TiDB.h>
namespace DB
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,14 +634,15 @@ void DAGStorageInterpreter::prepare()
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();
scan_context->total_learner_read_ns += watch.elapsed();
scan_context->learner_read_ns += watch.elapsed();

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(context.getSettingsRef().schema_version);
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, may_need_add_cast_column) = getColumnsForTableScan();
scan_context->num_columns = required_columns.size();
}

void DAGStorageInterpreter::executeCastAfterTableScan(
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

#include <Flash/Coprocessor/ExecutionSummary.h>
#include <Flash/Statistics/BaseRuntimeStatistics.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB
{
ExecutionSummary::ExecutionSummary()
: scan_context(std::make_shared<DM::ScanContext>())
{}

void ExecutionSummary::merge(const ExecutionSummary & other)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <common/types.h>
#include <tipb/select.pb.h>

Expand All @@ -32,9 +32,9 @@ struct ExecutionSummary
UInt64 num_iterations = 0;
UInt64 concurrency = 0;

DM::ScanContextPtr scan_context = std::make_shared<DB::DM::ScanContext>();
DM::ScanContextPtr scan_context;

ExecutionSummary() = default;
ExecutionSummary();

void merge(const ExecutionSummary & other);
void merge(const tipb::ExecutorExecutionSummary & other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct MockWriter
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->total_create_snapshot_time_ns = 5;
summary.scan_context->create_snapshot_time_ns = 5;
summary.scan_context->total_local_region_num = 10;
summary.scan_context->total_remote_region_num = 5;
return summary;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Statistics/ExecutionSummaryHelper.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <Flash/Statistics/JoinImpl.h>
#include <Flash/Statistics/TableScanImpl.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Statistics/ExecutorStatisticsCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Common/Exception.h>
#include <Flash/Statistics/ExecutorStatisticsBase.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <tipb/executor.pb.h>
#include <tipb/select.pb.h>

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Statistics/TableScanImpl.h>
#include <Interpreters/Join.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB
{
Expand All @@ -25,10 +26,14 @@ String TableScanDetail::toJson() const

void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
{
auto scan_ctx_it = dag_context.scan_context_map.find(executor_id);
fmt_buffer.fmtAppend(
R"("connection_details":[{},{}])",
R"("connection_details":[{},{}],"scan_details":{})",
local_table_scan_detail.toJson(),
remote_table_scan_detail.toJson());
remote_table_scan_detail.toJson(),
scan_ctx_it != dag_context.scan_context_map.end() ? scan_ctx_it->second->toJson()
: "{}" // empty json object for nullptr
);
}

void TableScanStatistics::updateTableScanDetail(const std::vector<ConnectionProfileInfo> & connection_profile_infos)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Encryption/FileProvider.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Operators/UnorderedSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ class UnorderedSourceOp : public SourceOp
{
setHeader(AddExtraTableIDColumnTransformAction::buildHeader(columns_to_read_, extra_table_id_index_));
ref_no = task_pool->increaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
}

~UnorderedSourceOp() override
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no);
if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1)
{
LOG_INFO(
log,
"All unordered input streams are finished, pool_id={} last_stream_ref_no={}",
task_pool->pool_id,
ref_no);
}
}

String getName() const override { return "UnorderedSourceOp"; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ size_t ColumnFileSetReader::readRows(
if (row_ids == nullptr)
lac_bytes_collector.collect(delta_bytes);
if (likely(context.scan_context))
context.scan_context->total_user_read_bytes += delta_bytes;
context.scan_context->user_read_bytes += delta_bytes;
}

return actual_read;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>

namespace DB
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r

total_rows += rows;
passed_rows += rows;
if (scan_context)
{
scan_context->mvcc_input_rows += rows;
scan_context->mvcc_input_bytes += cur_raw_block.bytes();
scan_context->mvcc_output_rows += rows;
}

initNextBlock();

Expand Down Expand Up @@ -399,6 +405,12 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
++total_blocks;
total_rows += rows;
passed_rows += passed_count;
if (scan_context)
{
scan_context->mvcc_input_rows += rows;
scan_context->mvcc_input_bytes += cur_raw_block.bytes();
scan_context->mvcc_output_rows += passed_count;
}

// This block is empty after filter, continue to process next block
if (passed_count == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <DataStreams/SelectionByColumnIdTransformAction.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <common/logger_useful.h>

namespace DB
Expand Down Expand Up @@ -49,11 +50,13 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_,
const String & tracing_id = "")
const String & tracing_id = "",
const ScanContextPtr & scan_context_ = nullptr)
: version_limit(version_limit_)
, is_common_handle(is_common_handle_)
, header(toEmptyBlock(read_columns))
, select_by_colid_action(input->getHeader(), header)
, scan_context(scan_context_)
, log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME), tracing_id))
{
children.push_back(input);
Expand Down Expand Up @@ -270,6 +273,8 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream

SelectionByColumnIdTransformAction select_by_colid_action;

const ScanContextPtr scan_context;

const LoggerPtr log;
};
} // namespace DM
Expand Down
34 changes: 28 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,7 @@ BlockInputStreams DeltaMergeStore::read(
enable_read_thread,
final_num_stream,
dm_context->scan_context->resource_group_name);
dm_context->scan_context->read_mode = read_mode;

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down Expand Up @@ -1235,7 +1236,11 @@ BlockInputStreams DeltaMergeStore::read(
}
res.push_back(stream);
}
LOG_DEBUG(tracing_logger, "Read create stream done");
LOG_INFO(
tracing_logger,
"Read create stream done, pool_id={} num_streams={}",
read_task_pool->pool_id,
final_num_stream);

return res;
}
Expand Down Expand Up @@ -1308,8 +1313,9 @@ void DeltaMergeStore::read(
enable_read_thread,
final_num_stream,
dm_context->scan_context->resource_group_name);
const auto & columns_after_cast = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
dm_context->scan_context->read_mode = read_mode;

const auto & columns_after_cast = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
if (enable_read_thread)
{
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down Expand Up @@ -1350,7 +1356,11 @@ void DeltaMergeStore::read(
});
}

LOG_DEBUG(tracing_logger, "Read create PipelineExec done");
LOG_INFO(
tracing_logger,
"Read create PipelineExec done, pool_id={} num_streams={}",
read_task_pool->pool_id,
final_num_stream);
}

Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemoteReadSnapshot(
Expand All @@ -1370,10 +1380,14 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote
// could fetch the data segment by segment with these snapshots later.
// `try_split_task` is false because we need to ensure only one segment task
// for one segment.
SegmentReadTasks tasks
= getReadTasksByRanges(dm_context, sorted_ranges, num_streams, read_segments, /* try_split_task */ false);
SegmentReadTasks tasks = getReadTasksByRanges(
dm_context,
sorted_ranges,
num_streams,
read_segments,
/* try_split_task */ false);
GET_METRIC(tiflash_disaggregated_read_tasks_count).Increment(tasks.size());
LOG_DEBUG(tracing_logger, "Read create segment snapshot done");
LOG_INFO(tracing_logger, "Read create segment snapshot done");

return std::make_unique<Remote::DisaggPhysicalTableReadSnapshot>(
KeyspaceTableID{keyspace_id, physical_table_id},
Expand Down Expand Up @@ -2039,6 +2053,8 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
++seg_it;
}
}

// how many segments involved for the given key ranges
const auto tasks_before_split = tasks.size();
if (try_split_task)
{
Expand All @@ -2054,6 +2070,12 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
total_ranges += task->ranges.size();
}

if (dm_context->scan_context)
{
dm_context->scan_context->num_segments += tasks_before_split;
dm_context->scan_context->num_read_tasks += tasks.size();
}

auto tracing_logger = log->getChild(getLogTracingId(*dm_context));
LOG_INFO(
tracing_logger,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>


Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ size_t DMFileReader::skipNextBlock()
if (likely(read_rows > 0))
use_packs[next_pack_id - 1] = false;

scan_context->late_materialization_skip_rows += read_rows;
return read_rows;
}

Expand Down Expand Up @@ -438,6 +439,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
auto skip = std::distance(begin, it);
while (next_pack_id_cp < use_packs.size() && skip >= pack_stats[next_pack_id_cp].rows)
{
scan_context->late_materialization_skip_rows += pack_stats[next_pack_id_cp].rows;
use_packs[next_pack_id_cp] = false;
skip -= pack_stats[next_pack_id_cp].rows;
read_rows += pack_stats[next_pack_id_cp].rows;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <Storages/DeltaMerge/File/DMFilePackFilter.h>
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/MarkCache.h>

namespace DB
Expand Down Expand Up @@ -98,6 +98,8 @@ class DMFileReader
/// Return false if it is the end of stream.
bool getSkippedRows(size_t & skip_rows);

/// NOTE: skipNextBlock and readWithFilter are only used by late materialization.

/// Skip the packs to read next
/// Return the number of rows skipped.
/// Return 0 if it is the end of file.
Expand Down

0 comments on commit ce42814

Please sign in to comment.