Skip to content

Commit

Permalink
Add tidb executor id in Join's log (#7964)
Browse the repository at this point in the history
ref #7738
  • Loading branch information
windtalker committed Aug 18, 2023
1 parent 42d357b commit 8cb548e
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 72 deletions.
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Expand Up @@ -346,6 +346,7 @@ void DAGQueryBlockInterpreter::handleJoin(
match_helper_name,
flag_mapped_entry_helper_name,
0,
0,
context.isTest());

recordJoinExecuteInfo(tiflash_join.build_side_index, join_ptr);
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Expand Up @@ -35,7 +35,7 @@ PhysicalPlanNode::PhysicalPlanNode(
, type(type_)
, schema(schema_)
, fine_grained_shuffle(fine_grained_shuffle_)
, log(Logger::get(req_id, type_.toString(), executor_id_))
, log(Logger::get(fmt::format("{}_{}_{}", req_id, type_.toString(), executor_id_)))
{}

String PhysicalPlanNode::toString()
Expand Down Expand Up @@ -87,7 +87,11 @@ void PhysicalPlanNode::buildBlockInputStream(DAGPipeline & pipeline, Context & c
if (is_restore_concurrency)
{
context.getDAGContext()->updateFinalConcurrency(pipeline.streams.size(), max_streams);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, context.getSettingsRef().max_buffered_bytes_in_executor, log);
restoreConcurrency(
pipeline,
context.getDAGContext()->final_concurrency,
context.getSettingsRef().max_buffered_bytes_in_executor,
log);
}
}

Expand All @@ -102,7 +106,10 @@ void PhysicalPlanNode::buildPipelineExecGroup(
context.getDAGContext()->addOperatorProfileInfos(executor_id, group_builder.getCurProfileInfos());
}

void PhysicalPlanNode::buildPipeline(PipelineBuilder & builder, Context & context, PipelineExecutorContext & exec_context)
void PhysicalPlanNode::buildPipeline(
PipelineBuilder & builder,
Context & context,
PipelineExecutorContext & exec_context)
{
RUNTIME_CHECK(childrenSize() <= 1);
if (childrenSize() == 1)
Expand Down
121 changes: 85 additions & 36 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Expand Up @@ -83,7 +83,8 @@ PhysicalPlanNodePtr PhysicalJoin::build(
const Block & build_side_header = build_plan->getSampleBlock();

String match_helper_name = tiflash_join.genMatchHelperName(left_input_header, right_input_header);
NamesAndTypes join_output_schema = tiflash_join.genJoinOutputColumns(left_input_header, right_input_header, match_helper_name);
NamesAndTypes join_output_schema
= tiflash_join.genJoinOutputColumns(left_input_header, right_input_header, match_helper_name);
auto & dag_context = *context.getDAGContext();

/// add necessary transformation if the join key is an expression
Expand All @@ -92,41 +93,70 @@ PhysicalPlanNodePtr PhysicalJoin::build(

JoinNonEqualConditions join_non_equal_conditions;
// prepare probe side
auto [probe_side_prepare_actions, probe_key_names, original_probe_key_names, probe_filter_column_name] = JoinInterpreterHelper::prepareJoin(
context,
probe_side_header,
tiflash_join.getProbeJoinKeys(),
tiflash_join.join_key_types,
/*left=*/true,
is_tiflash_right_join,
tiflash_join.getProbeConditions());
auto [probe_side_prepare_actions, probe_key_names, original_probe_key_names, probe_filter_column_name]
= JoinInterpreterHelper::prepareJoin(
context,
probe_side_header,
tiflash_join.getProbeJoinKeys(),
tiflash_join.join_key_types,
/*left=*/true,
is_tiflash_right_join,
tiflash_join.getProbeConditions());
RUNTIME_ASSERT(probe_side_prepare_actions, log, "probe_side_prepare_actions cannot be nullptr");
/// in TiFlash, left side is always the probe side
join_non_equal_conditions.left_filter_column = std::move(probe_filter_column_name);

// prepare build side
auto [build_side_prepare_actions, build_key_names, original_build_key_names, build_filter_column_name] = JoinInterpreterHelper::prepareJoin(
context,
build_side_header,
tiflash_join.getBuildJoinKeys(),
tiflash_join.join_key_types,
/*left=*/false,
is_tiflash_right_join,
tiflash_join.getBuildConditions());
auto [build_side_prepare_actions, build_key_names, original_build_key_names, build_filter_column_name]
= JoinInterpreterHelper::prepareJoin(
context,
build_side_header,
tiflash_join.getBuildJoinKeys(),
tiflash_join.join_key_types,
/*left=*/false,
is_tiflash_right_join,
tiflash_join.getBuildConditions());
RUNTIME_ASSERT(build_side_prepare_actions, log, "build_side_prepare_actions cannot be nullptr");
/// in TiFlash, right side is always the build side
join_non_equal_conditions.right_filter_column = std::move(build_filter_column_name);

tiflash_join.fillJoinOtherConditionsAction(context, left_input_header, right_input_header, probe_side_prepare_actions, original_probe_key_names, original_build_key_names, join_non_equal_conditions);
tiflash_join.fillJoinOtherConditionsAction(
context,
left_input_header,
right_input_header,
probe_side_prepare_actions,
original_probe_key_names,
original_build_key_names,
join_non_equal_conditions);

const Settings & settings = context.getSettingsRef();
size_t max_bytes_before_external_join = settings.max_bytes_before_external_join;
SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size);
SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size);
auto join_req_id = fmt::format("{}_{}", log->identifier(), executor_id);
SpillConfig build_spill_config(
context.getTemporaryPath(),
fmt::format("{}_0_build", join_req_id),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
context.getFileProvider(),
settings.max_threads,
settings.max_block_size);
SpillConfig probe_spill_config(
context.getTemporaryPath(),
fmt::format("{}_0_probe", join_req_id),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
context.getFileProvider(),
settings.max_threads,
settings.max_block_size);
size_t max_block_size = settings.max_block_size;
fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size = 1; });

String flag_mapped_entry_helper_name = tiflash_join.genFlagMappedEntryHelperName(left_input_header, right_input_header, join_non_equal_conditions.other_cond_expr != nullptr);
String flag_mapped_entry_helper_name = tiflash_join.genFlagMappedEntryHelperName(
left_input_header,
right_input_header,
join_non_equal_conditions.other_cond_expr != nullptr);
Names join_output_column_names;
for (const auto & col : join_output_schema)
join_output_column_names.emplace_back(col.name);
Expand All @@ -140,7 +170,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
build_key_names,
tiflash_join.kind,
tiflash_join.strictness,
log->identifier(),
join_req_id,
fine_grained_shuffle.enable(),
fine_grained_shuffle.stream_count,
max_bytes_before_external_join,
Expand All @@ -155,6 +185,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
match_helper_name,
flag_mapped_entry_helper_name,
0,
0,
context.isTest(),
runtime_filter_list);

Expand All @@ -178,15 +209,26 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
{
const auto & settings = context.getSettingsRef();
/// probe side streams
executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side");
executeExpression(
probe_pipeline,
probe_side_prepare_actions,
log,
"append join key and join filters for probe side");
/// add join input stream
String join_probe_extra_info = fmt::format("join probe, join_executor_id = {}, scan_hash_map_after_probe = {}", execId(), needScanHashMapAfterProbe(join_ptr->getKind()));
join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(),
probe_pipeline.streams.size());
String join_probe_extra_info = fmt::format(
"join probe, join_executor_id = {}, scan_hash_map_after_probe = {}",
execId(),
needScanHashMapAfterProbe(join_ptr->getKind()));
join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(), probe_pipeline.streams.size());
size_t probe_index = 0;
for (auto & stream : probe_pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, probe_index++, log->identifier(), settings.max_block_size);
stream = std::make_shared<HashJoinProbeBlockInputStream>(
stream,
join_ptr,
probe_index++,
log->identifier(),
settings.max_block_size);
stream->setExtraInfo(join_probe_extra_info);
}
}
Expand All @@ -197,7 +239,11 @@ void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & co
size_t join_build_concurrency = build_pipeline.streams.size();

/// build side streams
executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side");
executeExpression(
build_pipeline,
build_side_prepare_actions,
log,
"append join key and join filters for build side");
// add a HashJoinBuildBlockInputStream to build a shared hash table
String join_build_extra_info = fmt::format("join build, build_side_root_executor_id = {}", build()->execId());
if (fine_grained_shuffle.enable())
Expand All @@ -207,20 +253,26 @@ void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & co
size_t build_index = 0;
for (auto & stream : streams)
{
stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
stream
= std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
stream->setExtraInfo(join_build_extra_info);
join_execute_info.join_build_streams.push_back(stream);
}
};
build_streams(build_pipeline.streams);
// for test, join executor need the return blocks to output.
executeUnion(build_pipeline, max_streams, context.getSettingsRef().max_buffered_bytes_in_executor, log, /*ignore_block=*/!context.isTest(), "for join");
executeUnion(
build_pipeline,
max_streams,
context.getSettingsRef().max_buffered_bytes_in_executor,
log,
/*ignore_block=*/!context.isTest(),
"for join");

SubqueryForSet build_query;
build_query.source = build_pipeline.firstStream();
build_query.join = join_ptr;
join_ptr->initBuild(build_query.source->getHeader(),
join_build_concurrency);
join_ptr->initBuild(build_query.source->getHeader(), join_build_concurrency);
dag_context.addSubquery(execId(), std::move(build_query));
}

Expand All @@ -240,10 +292,7 @@ void PhysicalJoin::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & c
}
}

void PhysicalJoin::buildPipeline(
PipelineBuilder & builder,
Context & context,
PipelineExecutorContext & exec_context)
void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, PipelineExecutorContext & exec_context)
{
// Break the pipeline for join build.
auto join_build = std::make_shared<PhysicalJoinBuild>(
Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Interpreters/Join.cpp
Expand Up @@ -141,6 +141,7 @@ Join::Join(
const String & match_helper_name_,
const String & flag_mapped_entry_helper_name_,
size_t restore_round_,
size_t restore_part,
bool is_test_,
const std::vector<RuntimeFilterPtr> & runtime_filter_list_)
: restore_round(restore_round_)
Expand All @@ -149,6 +150,7 @@ Join::Join(
, kind(kind_)
, strictness(strictness_)
, original_strictness(strictness)
, join_req_id(req_id)
, may_probe_side_expanded_after_join(mayProbeSideExpandedAfterJoin(kind, strictness))
, key_names_left(key_names_left_)
, key_names_right(key_names_right_)
Expand All @@ -166,7 +168,9 @@ Join::Join(
: std::max(1, max_block_size / 10))
, tidb_output_column_names(tidb_output_column_names_)
, is_test(is_test_)
, log(Logger::get(req_id))
, log(Logger::get(
restore_round == 0 ? join_req_id
: fmt::format("{}_round_{}_part_{}", join_req_id, restore_round, restore_part)))
, enable_fine_grained_shuffle(enable_fine_grained_shuffle_)
, fine_grained_shuffle_count(fine_grained_shuffle_count_)
{
Expand Down Expand Up @@ -348,21 +352,19 @@ void Join::setSampleBlock(const Block & block)
sample_block_with_columns_to_add.insert(ColumnWithTypeAndName(Join::match_helper_type, match_helper_name));
}

std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_join_)
std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_join_, size_t restore_partition_id)
{
return std::make_shared<Join>(
key_names_left,
key_names_right,
kind,
original_strictness,
log->identifier(),
join_req_id,
false,
0,
max_bytes_before_external_join_,
hash_join_spill_context->createBuildSpillConfig(
fmt::format("{}_hash_join_{}_build", log->identifier(), restore_round + 1)),
hash_join_spill_context->createProbeSpillConfig(
fmt::format("{}_hash_join_{}_probe", log->identifier(), restore_round + 1)),
hash_join_spill_context->createBuildSpillConfig(fmt::format("{}_{}_build", join_req_id, restore_round + 1)),
hash_join_spill_context->createProbeSpillConfig(fmt::format("{}_{}_probe", join_req_id, restore_round + 1)),
join_restore_concurrency,
tidb_output_column_names,
collators,
Expand All @@ -372,6 +374,7 @@ std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_j
match_helper_name,
flag_mapped_entry_helper_name,
restore_round + 1,
restore_partition_id,
is_test);
}

Expand Down Expand Up @@ -2183,7 +2186,7 @@ std::optional<RestoreInfo> Join::getOneRestoreStream(size_t max_block_size_)
auto new_max_bytes_before_external_join = static_cast<size_t>(
hash_join_spill_context->getOperatorSpillThreshold()
* (static_cast<double>(restore_join_build_concurrency) / build_concurrency));
restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join));
restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join), spilled_partition_index);
restore_join->initBuild(build_sample_block, restore_join_build_concurrency);
restore_join->setInitActiveBuildThreads();
restore_join->initProbe(probe_sample_block, restore_join_build_concurrency);
Expand Down

0 comments on commit 8cb548e

Please sign in to comment.