From e829c31ba0ce7e5c6d8e29c1769201d186821bf7 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 12 Sep 2023 09:30:16 +0800 Subject: [PATCH 1/6] support agg resize callback Signed-off-by: xufei --- dbms/src/Common/HashTable/TwoLevelHashTable.h | 6 + .../HashTable/TwoLevelStringHashTable.h | 6 + dbms/src/Core/CachedSpillHandler.cpp | 2 + dbms/src/Interpreters/Aggregator.cpp | 124 ++++++++++++++---- dbms/src/Interpreters/Aggregator.h | 2 + 5 files changed, 111 insertions(+), 29 deletions(-) diff --git a/dbms/src/Common/HashTable/TwoLevelHashTable.h b/dbms/src/Common/HashTable/TwoLevelHashTable.h index 6fbca128d38..6778cd4a3e8 100644 --- a/dbms/src/Common/HashTable/TwoLevelHashTable.h +++ b/dbms/src/Common/HashTable/TwoLevelHashTable.h @@ -65,6 +65,12 @@ class TwoLevelHashTable : private boost::noncopyable /// NOTE Bad for hash tables with more than 2^32 cells. static size_t getBucketFromHash(size_t hash_value) { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; } + void setResizeCallback(const ResizeCallback & resize_callback) + { + for (auto & impl : impls) + impl.setResizeCallback(resize_callback); + } + protected: typename Impl::iterator beginOfNextNonEmptyBucket(size_t & bucket) { diff --git a/dbms/src/Common/HashTable/TwoLevelStringHashTable.h b/dbms/src/Common/HashTable/TwoLevelStringHashTable.h index 1903716282d..5bdb24a3d13 100644 --- a/dbms/src/Common/HashTable/TwoLevelStringHashTable.h +++ b/dbms/src/Common/HashTable/TwoLevelStringHashTable.h @@ -38,6 +38,12 @@ class TwoLevelStringHashTable : private boost::noncopyable }); } + void setResizeCallback(const ResizeCallback & resize_callback) + { + for (auto & impl : impls) + impl.setResizeCallback(resize_callback); + } + size_t operator()(const Key & x) const { return hash(x); } /// NOTE Bad for hash tables with more than 2^32 cells. 
diff --git a/dbms/src/Core/CachedSpillHandler.cpp b/dbms/src/Core/CachedSpillHandler.cpp index cde2523da35..3dd6ba8cb4c 100644 --- a/dbms/src/Core/CachedSpillHandler.cpp +++ b/dbms/src/Core/CachedSpillHandler.cpp @@ -43,6 +43,8 @@ bool CachedSpillHandler::batchRead() { if unlikely (is_cancelled()) return false; + if unlikely (block.rows() == 0) + continue; ret.push_back(std::move(block)); current_return_size += ret.back().estimateBytesForSpill(); if (bytes_threshold > 0 && current_return_size >= bytes_threshold) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 84372008bea..f9c11c866c9 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -151,6 +151,33 @@ size_t AggregatedDataVariants::getBucketNumberForTwoLevelHashTable(Type type) } } +void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) +{ + if (aggregator) + { + auto agg_spill_context = aggregator->agg_spill_context; + if (agg_spill_context->isSpillEnabled() && agg_spill_context->isInAutoSpillMode()) + { + auto resize_callback = [agg_spill_context, thread_num]() { + return !agg_spill_context->isThreadMarkedForAutoSpill(thread_num); + }; +#define M(NAME) \ + case AggregationMethodType(NAME): \ + { \ + ToAggregationMethodPtr(NAME, aggregation_method_impl)->data.setResizeCallback(resize_callback); \ + break; \ + } + switch (type) + { + APPLY_FOR_VARIANTS_TWO_LEVEL(M) + default: + throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); + } +#undef M + } + } +} + void AggregatedDataVariants::convertToTwoLevel() { switch (type) @@ -645,9 +672,21 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( { /// For all rows. 
AggregateDataPtr place = aggregates_pool->alloc(0); - for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) - state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers).setMapped(place); - agg_process_info.start_row += agg_size; + size_t processed_rows = std::numeric_limits<size_t>::max(); + try + { + for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) + { + state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers).setMapped(place); + processed_rows = i; + } + } + catch (ResizeException &) + { + LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); + } + if (processed_rows != std::numeric_limits<size_t>::max()) + agg_process_info.start_row = processed_rows + 1; return; } @@ -657,6 +696,7 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; ++inst) { + /// no resize will happen for this kind of hash table, so don't catch resize exception inst->batch_that->addBatchLookupTable8( agg_process_info.start_row, agg_size, @@ -678,43 +718,65 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( /// Generic case. std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[agg_size]); + size_t processed_rows = std::numeric_limits<size_t>::max(); + bool allow_exception = false; - for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) + try { - AggregateDataPtr aggregate_data = nullptr; + for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) + { + AggregateDataPtr aggregate_data = nullptr; - auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers); + allow_exception = true; - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. 
- if (emplace_result.isInserted()) - { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.setMapped(nullptr); + auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers); - aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); - createAggregateStates(aggregate_data); + allow_exception = false; - emplace_result.setMapped(aggregate_data); - } - else - aggregate_data = emplace_result.getMapped(); + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.isInserted()) + { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.setMapped(nullptr); + + aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); + createAggregateStates(aggregate_data); + + emplace_result.setMapped(aggregate_data); + } + else + aggregate_data = emplace_result.getMapped(); - places[i - agg_process_info.start_row] = aggregate_data; + places[i - agg_process_info.start_row] = aggregate_data; + processed_rows = i; + } + } + catch (ResizeException &) + { + LOG_INFO( + log, + "HashTable resize throw ResizeException since the data is already marked for spill, allow_exception: {}", + allow_exception); + if unlikely (!allow_exception) + throw; } - /// Add values to the aggregate functions. - for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; - ++inst) + if (processed_rows != std::numeric_limits::max()) { - inst->batch_that->addBatch( - agg_process_info.start_row, - agg_size, - places.get(), - inst->state_offset, - inst->batch_arguments, - aggregates_pool); + /// Add values to the aggregate functions. 
+ for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; + ++inst) + { + inst->batch_that->addBatch( + agg_process_info.start_row, + processed_rows - agg_process_info.start_row + 1, + places.get(), + inst->state_offset, + inst->batch_arguments, + aggregates_pool); + } + agg_process_info.start_row = processed_rows + 1; } - agg_process_info.start_row += agg_size; } void NO_INLINE @@ -896,7 +958,10 @@ bool Aggregator::executeOnBlock(AggProcessInfo & agg_process_info, AggregatedDat * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel. */ if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level) + { result.convertToTwoLevel(); + result.setResizeCallbackIfNeeded(thread_num); + } /** Flush data to disk if too much RAM is consumed. */ @@ -953,6 +1018,7 @@ void Aggregator::spill(AggregatedDataVariants & data_variants, size_t thread_num /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones. 
data_variants.init(data_variants.type); + data_variants.setResizeCallbackIfNeeded(thread_num); data_variants.need_spill = false; data_variants.aggregates_pools = Arenas(1, std::make_shared()); data_variants.aggregates_pool = data_variants.aggregates_pools.back().get(); diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 6acdccd99f1..6e69a3c1358 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -938,6 +938,8 @@ struct AggregatedDataVariants : private boost::noncopyable void convertToTwoLevel(); + void setResizeCallbackIfNeeded(size_t thread_num); + #define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \ M(key32_two_level) \ M(key64_two_level) \ From 895fb5e10e558806c610c673c59e04a8cf86c84a Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 12 Sep 2023 14:18:33 +0800 Subject: [PATCH 2/6] address comments Signed-off-by: xufei --- dbms/src/Interpreters/Aggregator.cpp | 12 +++++------- dbms/src/Interpreters/Aggregator.h | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index f9c11c866c9..2ffb52440aa 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -151,7 +151,7 @@ size_t AggregatedDataVariants::getBucketNumberForTwoLevelHashTable(Type type) } } -void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) +void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) const { if (aggregator) { @@ -672,21 +672,19 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( { /// For all rows. 
AggregateDataPtr place = aggregates_pool->alloc(0); - size_t processed_rows = std::numeric_limits<size_t>::max(); try { - for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) + for (size_t i = 0; i < agg_size; ++i) { - state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers).setMapped(place); - processed_rows = i; + state.emplaceKey(method.data, agg_process_info.start_row, *aggregates_pool, sort_key_containers) + .setMapped(place); + ++agg_process_info.start_row; } } catch (ResizeException &) { LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); } - if (processed_rows != std::numeric_limits<size_t>::max()) - agg_process_info.start_row = processed_rows + 1; return; } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 6e69a3c1358..54f63f52d73 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -938,7 +938,7 @@ struct AggregatedDataVariants : private boost::noncopyable void convertToTwoLevel(); - void setResizeCallbackIfNeeded(size_t thread_num); + void setResizeCallbackIfNeeded(size_t thread_num) const; #define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \ M(key32_two_level) \ From fed8b2ef5d7ce99a3f09854ea8c82b32e81434fd Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 14 Sep 2023 10:58:04 +0800 Subject: [PATCH 3/6] save work Signed-off-by: xufei --- dbms/src/Interpreters/Aggregator.cpp | 2 +- dbms/src/Interpreters/JoinPartition.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 2ffb52440aa..3902e4fed23 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -159,7 +159,7 @@ void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) const if (agg_spill_context->isSpillEnabled() && agg_spill_context->isInAutoSpillMode()) { auto resize_callback = 
[agg_spill_context, thread_num]() { - return !agg_spill_context->isThreadMarkedForAutoSpill(thread_num); + return !(agg_spill_context->supportAutoTriggerSpill() && agg_spill_context->isThreadMarkedForAutoSpill(thread_num)); }; #define M(NAME) \ case AggregationMethodType(NAME): \ diff --git a/dbms/src/Interpreters/JoinPartition.cpp b/dbms/src/Interpreters/JoinPartition.cpp index e5b86697515..5039923bdbb 100644 --- a/dbms/src/Interpreters/JoinPartition.cpp +++ b/dbms/src/Interpreters/JoinPartition.cpp @@ -236,7 +236,7 @@ void JoinPartition::setResizeCallbackIfNeeded() if (hash_join_spill_context->isSpillEnabled() && hash_join_spill_context->isInAutoSpillMode()) { auto resize_callback = [this]() { - return !hash_join_spill_context->isPartitionMarkedForAutoSpill(partition_index); + return !(hash_join_spill_context->supportFurtherSpill() && hash_join_spill_context->isPartitionMarkedForAutoSpill(partition_index)); }; assert(pool != nullptr); pool->setResizeCallback(resize_callback); From 86cfd48e375bf289b5e53593d777a75de6f234ec Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 14 Sep 2023 15:03:55 +0800 Subject: [PATCH 4/6] refine Signed-off-by: xufei --- dbms/src/Interpreters/Aggregator.cpp | 76 ++++++++++++++----------- dbms/src/Interpreters/Aggregator.h | 18 ++++++ dbms/src/Interpreters/JoinPartition.cpp | 4 +- 3 files changed, 64 insertions(+), 34 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 3902e4fed23..44f2432f056 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -159,7 +159,9 @@ void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) const if (agg_spill_context->isSpillEnabled() && agg_spill_context->isInAutoSpillMode()) { auto resize_callback = [agg_spill_context, thread_num]() { - return !(agg_spill_context->supportAutoTriggerSpill() && agg_spill_context->isThreadMarkedForAutoSpill(thread_num)); + return !( + 
agg_spill_context->supportAutoTriggerSpill() + && agg_spill_context->isThreadMarkedForAutoSpill(thread_num)); }; #define M(NAME) \ case AggregationMethodType(NAME): \ @@ -652,6 +654,24 @@ void NO_INLINE Aggregator::executeImpl( executeImplBatch(method, state, aggregates_pool, agg_process_info); } +template <typename Method> +std::optional<typename Method::EmplaceResult> Aggregator::emplaceKey( + Method & method, + typename Method::State & state, + size_t index, + Arena & aggregates_pool, + std::vector & sort_key_containers) const +{ + try + { + return state.emplaceKey(method.data, index, aggregates_pool, sort_key_containers); + } + catch (ResizeException &) + { + return {}; + } +} + template <typename Method> ALWAYS_INLINE void Aggregator::executeImplBatch( Method & method, @@ -717,46 +737,36 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[agg_size]); size_t processed_rows = std::numeric_limits<size_t>::max(); - bool allow_exception = false; - try + for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) { - for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) - { - AggregateDataPtr aggregate_data = nullptr; - - allow_exception = true; - - auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers); + AggregateDataPtr aggregate_data = nullptr; - allow_exception = false; + auto emplace_result_holder = emplaceKey(method, state, i, *aggregates_pool, sort_key_containers); + if unlikely (!emplace_result_holder.has_value()) + { + LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); + break; + } - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.isInserted()) - { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. 
- emplace_result.setMapped(nullptr); + auto & emplace_result = emplace_result_holder.value(); - aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); - createAggregateStates(aggregate_data); + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.isInserted()) + { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.setMapped(nullptr); - emplace_result.setMapped(aggregate_data); - } - else - aggregate_data = emplace_result.getMapped(); + aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); + createAggregateStates(aggregate_data); - places[i - agg_process_info.start_row] = aggregate_data; - processed_rows = i; + emplace_result.setMapped(aggregate_data); } - } - catch (ResizeException &) - { - LOG_INFO( - log, - "HashTable resize throw ResizeException since the data is already marked for spill, allow_exception: {}", - allow_exception); - if unlikely (!allow_exception) - throw; + else + aggregate_data = emplace_result.getMapped(); + + places[i - agg_process_info.start_row] = aggregate_data; + processed_rows = i; } if (processed_rows != std::numeric_limits::max()) diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 54f63f52d73..4b20dd4010c 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -131,6 +131,7 @@ struct AggregationMethodOneNumber /// To use one `Method` in different threads, use different `State`. using State = ColumnsHashing:: HashMethodOneNumber; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; /// Shuffle key columns before `insertKeyIntoColumns` call if needed. 
std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -166,6 +167,7 @@ struct AggregationMethodString {} using State = ColumnsHashing::HashMethodString; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -198,6 +200,7 @@ struct AggregationMethodStringNoCache // Remove last zero byte. using State = ColumnsHashing::HashMethodString; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -229,6 +232,7 @@ struct AggregationMethodOneKeyStringNoCache {} using State = ColumnsHashing::HashMethodStringBin; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -262,6 +266,7 @@ struct AggregationMethodMultiStringNoCache {} using State = ColumnsHashing::HashMethodMultiString; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -292,6 +297,7 @@ struct AggregationMethodFastPathTwoKeysNoCache using State = ColumnsHashing::HashMethodFastPathTwoKeysSerialized; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -386,6 +392,7 @@ struct AggregationMethodFixedString {} using State = ColumnsHashing::HashMethodFixedString; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -417,6 +424,7 @@ struct AggregationMethodFixedStringNoCache {} using State = ColumnsHashing::HashMethodFixedString; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const 
Sizes &) { return {}; } @@ -451,6 +459,7 @@ struct AggregationMethodKeysFixed using State = ColumnsHashing::HashMethodKeysFixed; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector & key_columns, const Sizes & key_sizes) { @@ -538,6 +547,7 @@ struct AggregationMethodSerialized {} using State = ColumnsHashing::HashMethodSerialized; + using EmplaceResult = ColumnsHashing::columns_hashing_impl::EmplaceResultImpl; std::optional shuffleKeyColumns(std::vector &, const Sizes &) { return {}; } @@ -1268,6 +1278,14 @@ class Aggregator Arena * aggregates_pool, AggProcessInfo & agg_process_info) const; + template + std::optional emplaceKey( + Method & method, + typename Method::State & state, + size_t index, + Arena & aggregates_pool, + std::vector & sort_key_containers) const; + /// For case when there are no keys (all aggregate into one row). static void executeWithoutKeyImpl(AggregatedDataWithoutKey & res, AggProcessInfo & agg_process_info, Arena * arena); diff --git a/dbms/src/Interpreters/JoinPartition.cpp b/dbms/src/Interpreters/JoinPartition.cpp index 5039923bdbb..20fb1be0715 100644 --- a/dbms/src/Interpreters/JoinPartition.cpp +++ b/dbms/src/Interpreters/JoinPartition.cpp @@ -236,7 +236,9 @@ void JoinPartition::setResizeCallbackIfNeeded() if (hash_join_spill_context->isSpillEnabled() && hash_join_spill_context->isInAutoSpillMode()) { auto resize_callback = [this]() { - return !(hash_join_spill_context->supportFurtherSpill() && hash_join_spill_context->isPartitionMarkedForAutoSpill(partition_index)); + return !( + hash_join_spill_context->supportFurtherSpill() + && hash_join_spill_context->isPartitionMarkedForAutoSpill(partition_index)); }; assert(pool != nullptr); pool->setResizeCallback(resize_callback); From d13ceb6cb9480cdcd6e170f8dc5505ba530297ff Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 14 Sep 2023 15:10:08 +0800 Subject: [PATCH 5/6] fix Signed-off-by: xufei --- 
dbms/src/Interpreters/Aggregator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 44f2432f056..b46d4e3c11e 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -160,7 +160,7 @@ void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) const { auto resize_callback = [agg_spill_context, thread_num]() { return !( - agg_spill_context->supportAutoTriggerSpill() + agg_spill_context->supportFurtherSpill() && agg_spill_context->isThreadMarkedForAutoSpill(thread_num)); }; #define M(NAME) \ From 8668ec24e22b944b9dbbb66072131dee773f6ba5 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 14 Sep 2023 16:30:32 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: xufei --- dbms/src/Interpreters/Aggregator.cpp | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index b46d4e3c11e..3c80b56ce02 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -692,18 +692,20 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( { /// For all rows. 
AggregateDataPtr place = aggregates_pool->alloc(0); - try + for (size_t i = 0; i < agg_size; ++i) { - for (size_t i = 0; i < agg_size; ++i) + auto emplace_result_hold + = emplaceKey(method, state, agg_process_info.start_row, *aggregates_pool, sort_key_containers); + if likely (emplace_result_hold.has_value()) { - state.emplaceKey(method.data, agg_process_info.start_row, *aggregates_pool, sort_key_containers) - .setMapped(place); + emplace_result_hold.value().setMapped(place); ++agg_process_info.start_row; } - } - catch (ResizeException &) - { - LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); + else + { + LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); + break; + } } return; } @@ -714,7 +716,6 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; ++inst) { - /// no resize will happen for this kind of hash table, so don't catch resize exception inst->batch_that->addBatchLookupTable8( agg_process_info.start_row, agg_size, @@ -736,7 +737,7 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( /// Generic case. std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[agg_size]); - size_t processed_rows = std::numeric_limits<size_t>::max(); + std::optional<size_t> processed_rows; for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) { @@ -769,7 +770,7 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( processed_rows = i; } - if (processed_rows != std::numeric_limits<size_t>::max()) + if (processed_rows) { /// Add values to the aggregate functions. 
for (AggregateFunctionInstruction * inst = agg_process_info.aggregate_functions_instructions.data(); inst->that; @@ -777,13 +778,13 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( { inst->batch_that->addBatch( agg_process_info.start_row, - processed_rows - agg_process_info.start_row + 1, + *processed_rows - agg_process_info.start_row + 1, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); } - agg_process_info.start_row = processed_rows + 1; + agg_process_info.start_row = *processed_rows + 1; } }