Agg auto spill refine (#8038)
ref #7738
windtalker committed Aug 31, 2023
1 parent d114bd4 commit 352100d
Showing 9 changed files with 77 additions and 11 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -43,7 +43,7 @@ Block AggregatingBlockInputStream::readImpl()
/// no new spill can be triggered anymore
aggregator.getAggSpillContext()->finishSpillableStage();

if (!aggregator.hasSpilledData() && !aggregator.getAggSpillContext()->needFinalSpill(0))
if (!aggregator.hasSpilledData() && !aggregator.getAggSpillContext()->isThreadMarkedForAutoSpill(0))
{
ManyAggregatedDataVariants many_data{data_variants};
auto merging_buckets = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
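Context for this one-line change, summarized from the rest of the diff rather than from the commit message: needFinalSpill is renamed to isThreadMarkedForAutoSpill (see the AggSpillContext.h hunk below), so the end-of-input check reads more literally. A minimal self-contained model of the decision this call site makes, with illustrative names only:

    // Sketch, not the TiFlash API: once input is exhausted, merge in memory only if
    // nothing was spilled and thread 0 has no pending auto-spill request; otherwise
    // take the final-spill-and-restore path.
    bool shouldMergeInMemory(bool has_spilled_data, bool thread0_marked_for_auto_spill)
    {
        return !has_spilled_data && !thread0_marked_for_auto_spill;
    }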
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -178,7 +178,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
bool need_final_spill = false;
for (size_t i = 0; i < parent.many_data.size(); ++i)
{
if (parent.aggregator.getAggSpillContext()->needFinalSpill(i))
if (parent.aggregator.getAggSpillContext()->isThreadMarkedForAutoSpill(i))
{
/// corner case, auto spill is triggered at the last time
need_final_spill = true;
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
@@ -81,7 +81,7 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec
bool need_final_spill = false;
for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i)
{
if (aggregate_context->getAggSpillContext()->needFinalSpill(i))
if (aggregate_context->getAggSpillContext()->isThreadMarkedForAutoSpill(i))
{
need_final_spill = true;
break;
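Both this call site and ParallelAggregatingBlockInputStream::Handler::onFinish above apply the same pattern: before the final merge, scan every build thread and schedule one last spill if any thread was marked for auto spill at the very end of the build (the "corner case" in the comment). A small self-contained model, with illustrative names:

    #include <vector>

    // Sketch only: true when at least one build thread still has a pending
    // auto-spill mark, meaning a final spill is needed before merging.
    bool needFinalSpillAcrossThreads(const std::vector<bool> & marked_for_auto_spill)
    {
        for (bool marked : marked_for_auto_spill)
            if (marked)
                return true;
        return false;
    }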
42 changes: 39 additions & 3 deletions dbms/src/Interpreters/AggSpillContext.cpp
@@ -77,11 +77,47 @@ Int64 AggSpillContext::getTotalRevocableMemoryImpl()

Int64 AggSpillContext::triggerSpillImpl(Int64 expected_released_memories)
{
for (size_t i = 0; i < per_thread_revocable_memories.size() && expected_released_memories > 0; ++i)
size_t checked_thread = 0;
for (; checked_thread < per_thread_revocable_memories.size(); ++checked_thread)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
per_thread_auto_spill_status[i].compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - per_thread_revocable_memories[i], 0);
if (per_thread_auto_spill_status[checked_thread].compare_exchange_strong(
old_value,
AutoSpillStatus::NEED_AUTO_SPILL))
{
LOG_DEBUG(
log,
"Mark thread {} to spill, expect to release {} bytes",
checked_thread,
per_thread_revocable_memories[checked_thread]);
}
expected_released_memories
= std::max(expected_released_memories - per_thread_revocable_memories[checked_thread], 0);
if (expected_released_memories == 0)
break;
}
if (spill_config.max_cached_data_bytes_in_spiller > 0)
{
auto spill_threshold = static_cast<Int64>(spill_config.max_cached_data_bytes_in_spiller);
for (size_t i = checked_thread + 1; i < per_thread_revocable_memories.size(); ++i)
{
/// unlike sort and hash join, the implementation of current agg spill does not support partial spill, that is to say,
/// once agg spill is triggered, all the data will be spilled in the end, so here to spill the data if memory usage is large enough
if (per_thread_revocable_memories[i] >= spill_threshold)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
if (per_thread_auto_spill_status[i].compare_exchange_strong(
old_value,
AutoSpillStatus::NEED_AUTO_SPILL))
{
LOG_DEBUG(
log,
"Mark thread {} to spill, expect to release {} bytes",
i,
per_thread_revocable_memories[i]);
}
}
}
}
return expected_released_memories;
}
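The refined triggerSpillImpl works in two passes: it first marks threads one by one until the requested number of bytes is covered, then, because the current agg spill is all-or-nothing per thread (see the comment in the diff), it also marks any remaining thread whose revocable memory already reaches the spiller's cache threshold. A self-contained model of that logic in standard C++ (triggerSpillModel and the simplified two-state enum are illustrative, not the TiFlash types):

    #include <algorithm>
    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    enum class AutoSpillStatus { NO_NEED_AUTO_SPILL, NEED_AUTO_SPILL };

    int64_t triggerSpillModel(
        int64_t expected_released,
        int64_t max_cached_bytes,
        const std::vector<int64_t> & revocable,
        std::vector<std::atomic<AutoSpillStatus>> & status)
    {
        // Pass 1: mark threads until the requested amount of memory is covered.
        size_t checked = 0;
        for (; checked < revocable.size(); ++checked)
        {
            auto expected = AutoSpillStatus::NO_NEED_AUTO_SPILL;
            status[checked].compare_exchange_strong(expected, AutoSpillStatus::NEED_AUTO_SPILL);
            expected_released = std::max<int64_t>(expected_released - revocable[checked], 0);
            if (expected_released == 0)
                break;
        }
        // Pass 2: agg spill cannot spill partially, so also mark the remaining threads
        // that already hold at least max_cached_bytes of revocable memory.
        if (max_cached_bytes > 0)
        {
            for (size_t i = checked + 1; i < revocable.size(); ++i)
            {
                if (revocable[i] >= max_cached_bytes)
                {
                    auto expected = AutoSpillStatus::NO_NEED_AUTO_SPILL;
                    status[i].compare_exchange_strong(expected, AutoSpillStatus::NEED_AUTO_SPILL);
                }
            }
        }
        return expected_released;
    }

With max_cached_bytes set to 0 the second pass is skipped entirely, which is the behavior the new unit test below exercises.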
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/AggSpillContext.h
@@ -43,7 +43,7 @@ class AggSpillContext final : public OperatorSpillContext
Int64 triggerSpillImpl(Int64 expected_released_memories) override;
bool supportAutoTriggerSpill() const override { return true; }
void finishOneSpill(size_t thread_num);
bool needFinalSpill(size_t thread_num) const
bool isThreadMarkedForAutoSpill(size_t thread_num) const
{
return per_thread_auto_spill_status[thread_num] != AutoSpillStatus::NO_NEED_AUTO_SPILL;
}
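The header change is only the rename; the mechanism stays the same: the revocation side sets a per-thread atomic status via compare-and-swap, and the worker thread later reads it through this accessor. A minimal self-contained model of that handshake (the struct and its names are illustrative, and AutoSpillStatus is simplified to two states here):

    #include <atomic>
    #include <cstddef>
    #include <vector>

    enum class AutoSpillStatus { NO_NEED_AUTO_SPILL, NEED_AUTO_SPILL };

    struct SpillStatusBoard
    {
        explicit SpillStatusBoard(size_t threads) : status(threads) {}

        // Revocation side: request an auto spill for a thread if none is pending.
        void markThread(size_t thread_num)
        {
            auto expected = AutoSpillStatus::NO_NEED_AUTO_SPILL;
            status[thread_num].compare_exchange_strong(expected, AutoSpillStatus::NEED_AUTO_SPILL);
        }

        // Worker side: is there still a pending auto-spill request for this thread?
        bool isThreadMarkedForAutoSpill(size_t thread_num) const
        {
            return status[thread_num] != AutoSpillStatus::NO_NEED_AUTO_SPILL;
        }

        std::vector<std::atomic<AutoSpillStatus>> status;
    };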
6 changes: 4 additions & 2 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -79,7 +79,6 @@ bool AggregatedDataVariants::tryMarkNeedSpill()
convertToTwoLevel();
}
need_spill = true;
aggregator->agg_spill_context->markSpilled();
return true;
}

@@ -891,7 +890,9 @@ bool Aggregator::executeOnBlock(

/** Flush data to disk if too much RAM is consumed.
*/
if (agg_spill_context->updatePerThreadRevocableMemory(result.revocableBytes(), thread_num))
auto revocable_bytes = result.revocableBytes();
LOG_TRACE(log, "Revocable bytes after insert one block {}, thread {}", revocable_bytes, thread_num);
if (agg_spill_context->updatePerThreadRevocableMemory(revocable_bytes, thread_num))
{
result.tryMarkNeedSpill();
}
@@ -922,6 +923,7 @@ void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_dat
void Aggregator::spill(AggregatedDataVariants & data_variants, size_t thread_num)
{
assert(data_variants.need_spill);
agg_spill_context->markSpilled();
/// Flush only two-level data and possibly overflow data.
#define M(NAME) \
case AggregationMethodType(NAME): \
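Two adjustments land in Aggregator.cpp: the revocable byte count is now logged at trace level before the per-thread update, and markSpilled() moves out of AggregatedDataVariants::tryMarkNeedSpill() into Aggregator::spill(), so the spill context appears to be flagged only when a spill is actually performed rather than when one is merely scheduled. A tiny self-contained model of the new ordering (illustrative names, not the real classes):

    #include <cassert>

    struct SpillContextModel
    {
        bool spilled = false;
        void markSpilled() { spilled = true; }
    };

    struct DataVariantsModel
    {
        bool need_spill = false;
        // Scheduling a spill only records the intent on the data variants...
        bool tryMarkNeedSpill()
        {
            need_spill = true;
            return true;
        }
    };

    // ...and the context learns about it when the spill actually runs.
    void spill(DataVariantsModel & data, SpillContextModel & ctx)
    {
        assert(data.need_spill);
        ctx.markSpilled();
        // ... flush the two-level data to disk here ...
    }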
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
@@ -309,7 +309,7 @@ struct Settings
M(SettingUInt64, async_recv_version, 2, "1: reactor mode, 2: no additional threads") \
M(SettingUInt64, recv_queue_size, 0, "size of ExchangeReceiver queue, 0 means the size is set to data_source_mpp_task_num * 50") \
M(SettingUInt64, shallow_copy_cross_probe_threshold, 0, "minimum right rows to use shallow copy probe mode for cross join, default is max(1, max_block_size/10)") \
M(SettingInt64, max_buffered_bytes_in_executor, 100LL * 1024 * 1024, "The max buffered size in each executor, 0 mean unlimited, use 200MB as the default value") \
M(SettingInt64, max_buffered_bytes_in_executor, 100LL * 1024 * 1024, "The max buffered size in each executor, 0 mean unlimited, use 100MB as the default value") \
M(SettingUInt64, ddl_sync_interval_seconds, 60, "The interval of background DDL sync schema in seconds") \
M(SettingUInt64, ddl_restart_wait_seconds, 180, "The wait time for sync schema in seconds when restart") \
M(SettingDouble, auto_memory_revoke_trigger_threshold, 0.0, "Trigger auto memory revocation when the memory usage is above this percentage.") \
28 changes: 28 additions & 0 deletions dbms/src/Interpreters/tests/gtest_operator_spill_context.cpp
@@ -91,6 +91,34 @@
}
CATCH

TEST_F(TestOperatorSpillContext, AggAutoTriggerSpill)
try
{
auto agg_spill_config = *spill_config_ptr;
/// unlimited cache bytes
std::vector<Int64> max_cached_bytes
= {0, OperatorSpillContext::MIN_SPILL_THRESHOLD + 1, OperatorSpillContext::MIN_SPILL_THRESHOLD - 1};
std::vector<size_t> expected_spill_threads = {5, 5, 10};
size_t threads = 10;
for (size_t index = 0; index < max_cached_bytes.size(); ++index)
{
agg_spill_config.max_cached_data_bytes_in_spiller = max_cached_bytes[index];
auto spill_context = std::make_shared<AggSpillContext>(threads, agg_spill_config, 0, logger);
spill_context->setAutoSpillMode();
for (size_t i = 0; i < threads; ++i)
spill_context->updatePerThreadRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD, i);
ASSERT_TRUE(spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 5) == 0);
size_t spilled_partition = 0;
for (size_t i = 0; i < threads; ++i)
{
if (spill_context->isThreadMarkedForAutoSpill(i))
spilled_partition++;
}
ASSERT_TRUE(spilled_partition == expected_spill_threads[index]);
}
}
CATCH

TEST_F(TestOperatorSpillContext, SortMarkSpill)
try
{
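For reference, my reading of the expected_spill_threads values {5, 5, 10}: every one of the 10 threads reports MIN_SPILL_THRESHOLD revocable bytes and triggerSpill asks for 5 * MIN_SPILL_THRESHOLD, so the first pass of triggerSpillImpl always marks threads 0 through 4 and then stops. The second pass marks a remaining thread only when its revocable memory is at least max_cached_data_bytes_in_spiller: with 0 that pass is disabled (5 threads total), with MIN_SPILL_THRESHOLD + 1 no remaining thread qualifies (still 5), and with MIN_SPILL_THRESHOLD - 1 all five remaining threads qualify (10).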
2 changes: 1 addition & 1 deletion dbms/src/Operators/LocalAggregateTransform.cpp
@@ -51,7 +51,7 @@ OperatorStatus LocalAggregateTransform::transformImpl(Block & block)
if unlikely (!block)
{
agg_context.getAggSpillContext()->finishSpillableStage();
return agg_context.hasSpilledData() || agg_context.getAggSpillContext()->needFinalSpill(0)
return agg_context.hasSpilledData() || agg_context.getAggSpillContext()->isThreadMarkedForAutoSpill(0)
? fromBuildToFinalSpillOrRestore()
: fromBuildToConvergent(block);
}
