Skip to content

Commit

Permalink
Refine auto spill for fine grained executor (#8045)
Browse files Browse the repository at this point in the history
ref #7738
  • Loading branch information
windtalker committed Sep 4, 2023
1 parent 2ae9bc0 commit 569b567
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 25 deletions.
59 changes: 59 additions & 0 deletions dbms/src/Core/FineGrainedOperatorSpillContext.cpp
@@ -0,0 +1,59 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/FineGrainedOperatorSpillContext.h>

namespace DB
{
bool FineGrainedOperatorSpillContext::supportFurtherSpill() const
{
for (const auto & operator_spill_context : operator_spill_contexts)
{
if (operator_spill_context->supportFurtherSpill())
return true;
}
return false;
}

void FineGrainedOperatorSpillContext::addOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
/// fine grained operator spill context only used in auto spill
if likely (operator_spill_context->supportSpill() && operator_spill_context->supportAutoTriggerSpill())
{
operator_spill_contexts.push_back(operator_spill_context);
operator_spill_context->setAutoSpillMode();
}
}

Int64 FineGrainedOperatorSpillContext::getTotalRevocableMemoryImpl()
{
Int64 ret = 0;
for (auto & operator_spill_context : operator_spill_contexts)
ret += operator_spill_context->getTotalRevocableMemory();
return ret;
}

Int64 FineGrainedOperatorSpillContext::triggerSpillImpl(Int64 expected_released_memories)
{
Int64 original_expected_released_memories = expected_released_memories;
for (auto & operator_spill_context : operator_spill_contexts)
{
if (expected_released_memories <= 0)
operator_spill_context->triggerSpill(original_expected_released_memories);
else
expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories);
}
return expected_released_memories;
}
} // namespace DB
52 changes: 52 additions & 0 deletions dbms/src/Core/FineGrainedOperatorSpillContext.h
@@ -0,0 +1,52 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/OperatorSpillContext.h>

namespace DB
{
/// FineGrainedOperatorSpillContext is a wrap for all the operator spill contexts that belongs to a same executor
/// with fine grained shuffle enable. When fine grained shuffle is enabled, executors like sort and aggregation
/// will have n independent operators in TiFlash side, each of them have their own operator spill context. However
/// these n operators are not really independent: they share the same source operator(exchange receiver), if any one
/// of the operator stop consuming data from exchange receiver(for example, begin spill data), all the others will
/// be stuck because the exchange receiver has a bounded queue for all the operators, any one of the operator stop
/// consuming will make the queue full, and no other data can be pushed to exchange receiver. If all the n operator
/// is triggered to spill serially, it will affects the overall performance seriously.
/// FineGrainedOperatorSpillContext is used to make sure that all the operators belongs to the same executor
/// will be triggered to spill almost at the same time
class FineGrainedOperatorSpillContext final : public OperatorSpillContext
{
private:
std::vector<OperatorSpillContextPtr> operator_spill_contexts;

protected:
Int64 getTotalRevocableMemoryImpl() override;

public:
FineGrainedOperatorSpillContext(const String op_name, const LoggerPtr & log)
: OperatorSpillContext(0, op_name, log)
{
auto_spill_mode = true;
}
bool supportFurtherSpill() const override;
bool supportAutoTriggerSpill() const override { return true; }
Int64 triggerSpillImpl(Int64 expected_released_memories) override;
void addOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context);
/// only for test
size_t getOperatorSpillCount() const { return operator_spill_contexts.size(); }
};
} // namespace DB
13 changes: 13 additions & 0 deletions dbms/src/Core/OperatorSpillContext.cpp
Expand Up @@ -21,10 +21,12 @@ bool OperatorSpillContext::isSpillEnabled() const
{
return enable_spill && (auto_spill_mode || operator_spill_threshold > 0);
}

bool OperatorSpillContext::supportSpill() const
{
return enable_spill && (supportAutoTriggerSpill() || operator_spill_threshold > 0);
}

Int64 OperatorSpillContext::getTotalRevocableMemory()
{
assert(isSpillEnabled());
Expand All @@ -33,6 +35,7 @@ Int64 OperatorSpillContext::getTotalRevocableMemory()
else
return 0;
}

void OperatorSpillContext::markSpilled()
{
bool init_value = false;
Expand All @@ -41,11 +44,13 @@ void OperatorSpillContext::markSpilled()
LOG_INFO(log, "Begin spill in {}", op_name);
}
}

void OperatorSpillContext::finishSpillableStage()
{
LOG_INFO(log, "Operator finish spill stage");
in_spillable_stage = false;
}

Int64 OperatorSpillContext::triggerSpill(Int64 expected_released_memories)
{
assert(isSpillEnabled());
Expand All @@ -56,4 +61,12 @@ Int64 OperatorSpillContext::triggerSpill(Int64 expected_released_memories)
return triggerSpillImpl(expected_released_memories);
return expected_released_memories;
}

void OperatorSpillContext::setAutoSpillMode()
{
RUNTIME_CHECK_MSG(supportAutoTriggerSpill(), "Only operator that support auto spill can be set in auto spill mode");
auto_spill_mode = true;
/// once auto spill is enabled, operator_spill_threshold will be ignored
operator_spill_threshold = 0;
}
} // namespace DB
7 changes: 1 addition & 6 deletions dbms/src/Core/OperatorSpillContext.h
Expand Up @@ -57,12 +57,7 @@ class OperatorSpillContext
void disableSpill() { enable_spill = false; }
virtual bool supportFurtherSpill() const { return in_spillable_stage; }
bool isInAutoSpillMode() const { return auto_spill_mode; }
void setAutoSpillMode()
{
auto_spill_mode = true;
/// once auto spill is enabled, operator_spill_threshold will be ignored
operator_spill_threshold = 0;
}
void setAutoSpillMode();
Int64 getTotalRevocableMemory();
UInt64 getOperatorSpillThreshold() const { return operator_spill_threshold; }
void markSpilled();
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Expand Up @@ -15,6 +15,7 @@
#include <Common/FailPoint.h>
#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/FineGrainedOperatorSpillContext.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ExchangeSenderBlockInputStream.h>
Expand Down Expand Up @@ -519,6 +520,10 @@ void DAGQueryBlockInterpreter::executeAggregation(

if (enable_fine_grained_shuffle)
{
std::shared_ptr<FineGrainedOperatorSpillContext> fine_grained_spill_context;
if (context.getDAGContext() != nullptr && context.getDAGContext()->isInAutoSpillMode()
&& pipeline.hasMoreThanOneStream())
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("aggregation", log);
/// Go straight forward without merging phase when enable_fine_grained_shuffle
pipeline.transform([&](auto & stream) {
stream = std::make_shared<AggregatingBlockInputStream>(
Expand All @@ -527,13 +532,17 @@ void DAGQueryBlockInterpreter::executeAggregation(
true,
log->identifier(),
[&](const OperatorSpillContextPtr & operator_spill_context) {
if (context.getDAGContext() != nullptr)
if (fine_grained_spill_context != nullptr)
fine_grained_spill_context->addOperatorSpillContext(operator_spill_context);
else if (context.getDAGContext() != nullptr)
{
context.getDAGContext()->registerOperatorSpillContext(operator_spill_context);
}
});
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
});
if (fine_grained_spill_context != nullptr)
context.getDAGContext()->registerOperatorSpillContext(fine_grained_spill_context);
recordProfileStreams(pipeline, query_block.aggregation_name);
}
else if (pipeline.streams.size() > 1)
Expand Down
29 changes: 22 additions & 7 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <Core/FineGrainedOperatorSpillContext.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
Expand Down Expand Up @@ -193,6 +194,10 @@ void orderStreams(

if (enable_fine_grained_shuffle)
{
std::shared_ptr<FineGrainedOperatorSpillContext> fine_grained_spill_context;
if (context.getDAGContext() != nullptr && context.getDAGContext()->isInAutoSpillMode()
&& pipeline.hasMoreThanOneStream())
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("sort", log);
pipeline.transform([&](auto & stream) {
stream = std::make_shared<MergeSortingBlockInputStream>(
stream,
Expand All @@ -209,13 +214,15 @@ void orderStreams(
context.getFileProvider()),
log->identifier(),
[&](const OperatorSpillContextPtr & operator_spill_context) {
if (context.getDAGContext() != nullptr)
{
if (fine_grained_spill_context != nullptr)
fine_grained_spill_context->addOperatorSpillContext(operator_spill_context);
else if (context.getDAGContext() != nullptr)
context.getDAGContext()->registerOperatorSpillContext(operator_spill_context);
}
});
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
});
if (fine_grained_spill_context != nullptr)
context.getDAGContext()->registerOperatorSpillContext(fine_grained_spill_context);
}
else
{
Expand Down Expand Up @@ -252,6 +259,7 @@ void executeLocalSort(
PipelineExecGroupBuilder & group_builder,
const SortDescription & order_descr,
std::optional<size_t> limit,
bool for_fine_grained_executor,
const Context & context,
const LoggerPtr & log)
{
Expand Down Expand Up @@ -281,6 +289,10 @@ void executeLocalSort(
const Settings & settings = context.getSettingsRef();
size_t max_bytes_before_external_sort
= getAverageThreshold(settings.max_bytes_before_external_sort, group_builder.concurrency());
std::shared_ptr<FineGrainedOperatorSpillContext> fine_grained_spill_context;
if (for_fine_grained_executor && context.getDAGContext() != nullptr
&& context.getDAGContext()->isInAutoSpillMode() && group_builder.concurrency() > 1)
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("sort", log);
SpillConfig spill_config{
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
Expand All @@ -296,8 +308,11 @@ void executeLocalSort(
limit.value_or(0), // 0 means that no limit in MergeSortTransformOp.
settings.max_block_size,
max_bytes_before_external_sort,
spill_config));
spill_config,
fine_grained_spill_context));
});
if (fine_grained_spill_context != nullptr)
exec_context.registerOperatorSpillContext(fine_grained_spill_context);
}
}

Expand Down Expand Up @@ -336,7 +351,6 @@ void executeFinalSort(
const Settings & settings = context.getSettingsRef();
executeUnion(exec_context, group_builder, settings.max_buffered_bytes_in_executor, log);

size_t max_bytes_before_external_sort = getAverageThreshold(settings.max_bytes_before_external_sort, 1);
SpillConfig spill_config{
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
Expand All @@ -351,8 +365,9 @@ void executeFinalSort(
order_descr,
limit.value_or(0), // 0 means that no limit in MergeSortTransformOp.
settings.max_block_size,
max_bytes_before_external_sort,
spill_config));
settings.max_bytes_before_external_sort,
spill_config,
nullptr));
});
}
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Expand Up @@ -82,6 +82,7 @@ void executeLocalSort(
PipelineExecGroupBuilder & group_builder,
const SortDescription & order_descr,
std::optional<size_t> limit,
bool for_fine_grained_executor,
const Context & context,
const LoggerPtr & log);

Expand Down
24 changes: 20 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Expand Up @@ -14,6 +14,7 @@

#include <Common/Logger.h>
#include <Common/TiFlashException.h>
#include <Core/FineGrainedOperatorSpillContext.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
Expand Down Expand Up @@ -119,6 +120,10 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont

if (fine_grained_shuffle.enable())
{
std::shared_ptr<FineGrainedOperatorSpillContext> fine_grained_spill_context;
if (context.getDAGContext() != nullptr && context.getDAGContext()->isInAutoSpillMode()
&& pipeline.hasMoreThanOneStream())
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("aggregation", log);
/// For fine_grained_shuffle, just do aggregation in streams independently
pipeline.transform([&](auto & stream) {
stream = std::make_shared<AggregatingBlockInputStream>(
Expand All @@ -127,13 +132,15 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
true,
log->identifier(),
[&](const OperatorSpillContextPtr & operator_spill_context) {
if (context.getDAGContext() != nullptr)
{
if (fine_grained_spill_context != nullptr)
fine_grained_spill_context->addOperatorSpillContext(operator_spill_context);
else if (context.getDAGContext() != nullptr)
context.getDAGContext()->registerOperatorSpillContext(operator_spill_context);
}
});
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
});
if (fine_grained_spill_context != nullptr)
context.getDAGContext()->registerOperatorSpillContext(fine_grained_spill_context);
}
else if (pipeline.streams.size() > 1)
{
Expand Down Expand Up @@ -203,6 +210,9 @@ void PhysicalAggregation::buildPipelineExecGroupImpl(

Block before_agg_header = group_builder.getCurrentHeader();
size_t concurrency = group_builder.concurrency();
std::shared_ptr<FineGrainedOperatorSpillContext> fine_grained_spill_context;
if (context.getDAGContext() != nullptr && context.getDAGContext()->isInAutoSpillMode() && concurrency > 1)
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("aggregation", log);
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
Expand All @@ -224,8 +234,14 @@ void PhysicalAggregation::buildPipelineExecGroupImpl(
is_final_agg,
spill_config);
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<LocalAggregateTransform>(exec_context, log->identifier(), params));
builder.appendTransformOp(std::make_unique<LocalAggregateTransform>(
exec_context,
log->identifier(),
params,
fine_grained_spill_context));
});
if (fine_grained_spill_context != nullptr)
context.getDAGContext()->registerOperatorSpillContext(fine_grained_spill_context);

executeExpression(exec_context, group_builder, expr_after_agg, log);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp
Expand Up @@ -79,7 +79,7 @@ void PhysicalTopN::buildPipelineExecGroupImpl(
// TODO find a suitable threshold is necessary; 10000 is just a value picked without much consideration.
if (group_builder.concurrency() * limit <= 10000)
{
executeLocalSort(exec_context, group_builder, order_descr, limit, context, log);
executeLocalSort(exec_context, group_builder, order_descr, limit, false, context, log);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalWindowSort.cpp
Expand Up @@ -65,7 +65,7 @@ void PhysicalWindowSort::buildPipelineExecGroupImpl(
size_t /*concurrency*/)
{
if (fine_grained_shuffle.enable())
executeLocalSort(exec_context, group_builder, order_descr, {}, context, log);
executeLocalSort(exec_context, group_builder, order_descr, {}, true, context, log);
else
executeFinalSort(exec_context, group_builder, order_descr, {}, context, log);
}
Expand Down

0 comments on commit 569b567

Please sign in to comment.