diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index f9c11c866c9..2ffb52440aa 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -151,7 +151,7 @@ size_t AggregatedDataVariants::getBucketNumberForTwoLevelHashTable(Type type) } } -void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) +void AggregatedDataVariants::setResizeCallbackIfNeeded(size_t thread_num) const { if (aggregator) { @@ -672,21 +672,19 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( { /// For all rows. AggregateDataPtr place = aggregates_pool->alloc(0); - size_t processed_rows = std::numeric_limits::max(); try { - for (size_t i = agg_process_info.start_row; i < agg_process_info.start_row + agg_size; ++i) + for (size_t i = 0; i < agg_size; ++i) { - state.emplaceKey(method.data, i, *aggregates_pool, sort_key_containers).setMapped(place); - processed_rows = i; + state.emplaceKey(method.data, agg_process_info.start_row, *aggregates_pool, sort_key_containers) + .setMapped(place); + ++agg_process_info.start_row; } } catch (ResizeException &) { LOG_INFO(log, "HashTable resize throw ResizeException since the data is already marked for spill"); } - if (processed_rows != std::numeric_limits::max()) - agg_process_info.start_row = processed_rows + 1; return; } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 6e69a3c1358..54f63f52d73 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -938,7 +938,7 @@ struct AggregatedDataVariants : private boost::noncopyable void convertToTwoLevel(); - void setResizeCallbackIfNeeded(size_t thread_num); + void setResizeCallbackIfNeeded(size_t thread_num) const; #define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \ M(key32_two_level) \