diff --git a/src/execution/aggregate_hashtable.cpp b/src/execution/aggregate_hashtable.cpp
index d15583c2e17..8a15d15e341 100644
--- a/src/execution/aggregate_hashtable.cpp
+++ b/src/execution/aggregate_hashtable.cpp
@@ -512,7 +512,7 @@ void GroupedAggregateHashTable::Combine(GroupedAggregateHashTable &other) {
 	}
 }
 
-void GroupedAggregateHashTable::Combine(TupleDataCollection &other_data) {
+void GroupedAggregateHashTable::Combine(TupleDataCollection &other_data, optional_ptr<atomic<double>> progress) {
 	D_ASSERT(other_data.GetLayout().GetAggrWidth() == layout.GetAggrWidth());
 	D_ASSERT(other_data.GetLayout().GetDataWidth() == layout.GetDataWidth());
 	D_ASSERT(other_data.GetLayout().GetRowWidth() == layout.GetRowWidth());
@@ -523,6 +523,9 @@ void GroupedAggregateHashTable::Combine(TupleDataCollection &other_data) {
 
 	FlushMoveState fm_state(other_data);
 	RowOperationsState row_state(*aggregate_allocator);
+
+	idx_t chunk_idx = 0;
+	const auto chunk_count = other_data.ChunkCount();
 	while (fm_state.Scan()) {
 		FindOrCreateGroups(fm_state.groups, fm_state.hashes, fm_state.group_addresses, fm_state.new_groups_sel);
 		RowOperations::CombineStates(row_state, layout, fm_state.scan_state.chunk_state.row_locations,
@@ -531,6 +534,10 @@ void GroupedAggregateHashTable::Combine(TupleDataCollection &other_data) {
 			RowOperations::DestroyStates(row_state, layout, fm_state.scan_state.chunk_state.row_locations,
 			                             fm_state.groups.size());
 		}
+
+		if (progress) {
+			*progress = double(++chunk_idx) / double(chunk_count);
+		}
 	}
 
 	Verify();
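The hunk above threads an optional progress pointer through `Combine`: after each scanned chunk, the loop publishes the fraction of completed chunks into an atomic that another thread (the progress bar) can poll. Below is a minimal standalone sketch of that pattern; `Chunk` and `process_chunks` are illustrative names, not DuckDB API.

```cpp
// Sketch: a worker publishes a monotonically growing fraction into an
// atomic<double> that an observer may poll concurrently.
#include <atomic>
#include <cstddef>
#include <iostream>
#include <vector>

struct Chunk {}; // payload omitted

// Passing nullptr disables reporting, mirroring the optional_ptr<atomic<double>>
// parameter that defaults to nullptr in the header change further down.
void process_chunks(const std::vector<Chunk> &chunks, std::atomic<double> *progress) {
	const std::size_t chunk_count = chunks.size();
	std::size_t chunk_idx = 0;
	for (const auto &chunk : chunks) {
		(void)chunk; // ... combine this chunk into the hash table ...
		if (progress) {
			// Atomic store; the value is advisory, so exact timing does not matter
			*progress = double(++chunk_idx) / double(chunk_count);
		}
	}
}

int main() {
	std::atomic<double> progress {0.0};
	process_chunks(std::vector<Chunk>(8), &progress);
	std::cout << "progress = " << progress.load() << '\n'; // prints 1
}
```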
diff --git a/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/execution/operator/aggregate/physical_hash_aggregate.cpp
index 8561c5fb182..037f68d42f0 100644
--- a/src/execution/operator/aggregate/physical_hash_aggregate.cpp
+++ b/src/execution/operator/aggregate/physical_hash_aggregate.cpp
@@ -782,13 +782,13 @@ class HashAggregateGlobalSourceState : public GlobalSourceState {
 		}
 
 		auto &ht_state = op.sink_state->Cast<HashAggregateGlobalSinkState>();
-		idx_t partitions = 0;
+		idx_t threads = 0;
 		for (size_t sidx = 0; sidx < op.groupings.size(); ++sidx) {
 			auto &grouping = op.groupings[sidx];
 			auto &grouping_gstate = ht_state.grouping_states[sidx];
-			partitions += grouping.table_data.NumberOfPartitions(*grouping_gstate.table_state);
+			threads += grouping.table_data.MaxThreads(*grouping_gstate.table_state);
 		}
-		return MaxValue<idx_t>(1, partitions);
+		return MaxValue<idx_t>(1, threads);
 	}
 };
 
@@ -850,6 +850,17 @@ SourceResultType PhysicalHashAggregate::GetData(ExecutionContext &context, DataC
 	return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
 }
 
+double PhysicalHashAggregate::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const {
+	auto &sink_gstate = sink_state->Cast<HashAggregateGlobalSinkState>();
+	auto &gstate = gstate_p.Cast<HashAggregateGlobalSourceState>();
+	double total_progress = 0;
+	for (idx_t radix_idx = 0; radix_idx < groupings.size(); radix_idx++) {
+		total_progress += groupings[radix_idx].table_data.GetProgress(
+		    context, *sink_gstate.grouping_states[radix_idx].table_state, *gstate.radix_states[radix_idx]);
+	}
+	return total_progress / double(groupings.size());
+}
+
 string PhysicalHashAggregate::ParamsToString() const {
 	string result;
 	auto &groups = grouped_aggregate_data.groups;
diff --git a/src/execution/physical_plan/plan_create_table.cpp b/src/execution/physical_plan/plan_create_table.cpp
index c5f809334d5..be854e0da30 100644
--- a/src/execution/physical_plan/plan_create_table.cpp
+++ b/src/execution/physical_plan/plan_create_table.cpp
@@ -1,16 +1,17 @@
 #include "duckdb/catalog/catalog_entry/scalar_function_catalog_entry.hpp"
+#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
 #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
+#include "duckdb/catalog/duck_catalog.hpp"
+#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"
+#include "duckdb/execution/operator/persistent/physical_insert.hpp"
 #include "duckdb/execution/operator/schema/physical_create_table.hpp"
 #include "duckdb/execution/physical_plan_generator.hpp"
+#include "duckdb/main/config.hpp"
+#include "duckdb/parallel/task_scheduler.hpp"
 #include "duckdb/parser/parsed_data/create_table_info.hpp"
-#include "duckdb/execution/operator/persistent/physical_insert.hpp"
+#include "duckdb/planner/constraints/bound_check_constraint.hpp"
 #include "duckdb/planner/expression/bound_function_expression.hpp"
 #include "duckdb/planner/operator/logical_create_table.hpp"
-#include "duckdb/main/config.hpp"
-#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"
-#include "duckdb/planner/constraints/bound_check_constraint.hpp"
-#include "duckdb/parallel/task_scheduler.hpp"
-#include "duckdb/catalog/duck_catalog.hpp"
 
 namespace duckdb {
 
@@ -21,10 +22,10 @@ unique_ptr<PhysicalOperator> DuckCatalog::PlanCreateTableAs(ClientContext &conte
 	auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
 	unique_ptr<PhysicalOperator> create;
 	if (!parallel_streaming_insert && use_batch_index) {
-		create = make_uniq<PhysicalBatchInsert>(op, op.schema, std::move(op.info), op.estimated_cardinality);
+		create = make_uniq<PhysicalBatchInsert>(op, op.schema, std::move(op.info), 0);
 
 	} else {
-		create = make_uniq<PhysicalInsert>(op, op.schema, std::move(op.info), op.estimated_cardinality,
+		create = make_uniq<PhysicalInsert>(op, op.schema, std::move(op.info), 0,
 		                                   parallel_streaming_insert && num_threads > 1);
 	}
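`PhysicalHashAggregate::GetProgress` above reduces the per-grouping (per radix table) percentages with an unweighted mean. A minimal sketch of that reduction, assuming each grouping reports a value in [0, 100]; `AverageProgress` is a hypothetical helper, not DuckDB API.

```cpp
// Sketch: average per-grouping progress percentages into one operator-level value.
#include <iostream>
#include <vector>

double AverageProgress(const std::vector<double> &per_grouping_percent) {
	if (per_grouping_percent.empty()) {
		return 0; // no groupings, nothing to report
	}
	double total_progress = 0;
	for (auto percent : per_grouping_percent) {
		total_progress += percent;
	}
	return total_progress / double(per_grouping_percent.size());
}

int main() {
	// Two groupings (e.g. GROUPING SETS): one 80% done, one 40% done
	std::cout << AverageProgress({80.0, 40.0}) << "%\n"; // prints 60%
}
```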
diff --git a/src/execution/radix_partitioned_hashtable.cpp b/src/execution/radix_partitioned_hashtable.cpp
index 9a106960912..4f7bbd66352 100644
--- a/src/execution/radix_partitioned_hashtable.cpp
+++ b/src/execution/radix_partitioned_hashtable.cpp
@@ -69,9 +69,11 @@ unique_ptr<GroupedAggregateHashTable> RadixPartitionedHashTable::CreateHT(Client
 // Sink
 //===--------------------------------------------------------------------===//
 struct AggregatePartition {
-	explicit AggregatePartition(unique_ptr<TupleDataCollection> data_p) : data(std::move(data_p)), finalized(false) {
+	explicit AggregatePartition(unique_ptr<TupleDataCollection> data_p)
+	    : data(std::move(data_p)), progress(0), finalized(false) {
 	}
 	unique_ptr<TupleDataCollection> data;
+	atomic<double> progress;
 	atomic<bool> finalized;
 };
 
@@ -135,6 +137,8 @@ class RadixHTGlobalSinkState : public GlobalSinkState {
 	void Destroy();
 
 public:
+	ClientContext &context;
+
 	//! The radix HT
 	const RadixPartitionedHashTable &radix_ht;
 	//! Config for partitioning
@@ -168,10 +172,10 @@ class RadixHTGlobalSinkState : public GlobalSinkState {
 	idx_t count_before_combining;
 };
 
-RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context, const RadixPartitionedHashTable &radix_ht_p)
-    : radix_ht(radix_ht_p), config(context, *this), finalized(false), external(false), active_threads(0),
-      any_combined(false), finalize_idx(0), scan_pin_properties(TupleDataPinProperties::DESTROY_AFTER_DONE),
-      count_before_combining(0) {
+RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht_p)
+    : context(context_p), radix_ht(radix_ht_p), config(context, *this), finalized(false), external(false),
+      active_threads(0), any_combined(false), finalize_idx(0),
+      scan_pin_properties(TupleDataPinProperties::DESTROY_AFTER_DONE), count_before_combining(0) {
 }
 
 RadixHTGlobalSinkState::~RadixHTGlobalSinkState() {
@@ -479,9 +483,32 @@ void RadixPartitionedHashTable::Finalize(ClientContext &, GlobalSinkState &gstat
 //===--------------------------------------------------------------------===//
 // Source
 //===--------------------------------------------------------------------===//
-idx_t RadixPartitionedHashTable::NumberOfPartitions(GlobalSinkState &sink_p) const {
+idx_t RadixPartitionedHashTable::MaxThreads(GlobalSinkState &sink_p) const {
 	auto &sink = sink_p.Cast<RadixHTGlobalSinkState>();
-	return sink.partitions.size();
+	if (sink.partitions.empty()) {
+		return 0;
+	}
+
+	// We take the largest partition as an example
+	reference<TupleDataCollection> largest_partition = *sink.partitions[0]->data;
+	for (idx_t i = 1; i < sink.partitions.size(); i++) {
+		auto &partition = *sink.partitions[i]->data;
+		if (partition.Count() > largest_partition.get().Count()) {
+			largest_partition = partition;
+		}
+	}
+
+	// Worst-case size if every value is unique
+	const auto maximum_combined_partition_size =
+	    GroupedAggregateHashTable::GetCapacityForCount(largest_partition.get().Count()) * sizeof(aggr_ht_entry_t) +
+	    largest_partition.get().SizeInBytes();
+
+	// How many of these can we fit in 60% of memory
+	const idx_t memory_limit = 0.6 * BufferManager::GetBufferManager(sink.context).GetMaxMemory();
+	const auto partitions_that_fit = MaxValue<idx_t>(memory_limit / maximum_combined_partition_size, 1);
+
+	// Of course, limit it to the number of partitions
+	return MinValue<idx_t>(sink.partitions.size(), partitions_that_fit);
 }
 
 void RadixPartitionedHashTable::SetMultiScan(GlobalSinkState &sink_p) {
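To make the sizing arithmetic in `MaxThreads` concrete, here is a standalone sketch with made-up numbers. The 1.5 load factor, the 8-byte entry size, and the simplified power-of-two `get_capacity_for_count` are illustrative assumptions, not the exact DuckDB constants or implementation.

```cpp
// Sketch: how many worst-case partition HTs fit in 60% of the memory limit.
#include <algorithm>
#include <cstdint>
#include <iostream>

static constexpr double LOAD_FACTOR = 1.5; // assumed stand-in for GroupedAggregateHashTable::LOAD_FACTOR
static constexpr uint64_t ENTRY_SIZE = 8;  // assumed stand-in for sizeof(aggr_ht_entry_t)

static uint64_t next_power_of_two(uint64_t v) {
	uint64_t p = 1;
	while (p < v) {
		p <<= 1;
	}
	return p;
}

// Simplified: capacity is the next power of two that respects the load factor
static uint64_t get_capacity_for_count(uint64_t count) {
	return next_power_of_two(static_cast<uint64_t>(count * LOAD_FACTOR));
}

int main() {
	// Largest partition: 10M rows, 400MB of row data (illustrative numbers)
	const uint64_t largest_count = 10'000'000;
	const uint64_t largest_bytes = 400ULL * 1024 * 1024;

	// Worst case: every group is unique, so the HT entries cover every row
	const uint64_t combined_size = get_capacity_for_count(largest_count) * ENTRY_SIZE + largest_bytes;

	// How many such partitions fit in 60% of an 8GB memory limit?
	const uint64_t memory_limit = static_cast<uint64_t>(0.6 * 8ULL * 1024 * 1024 * 1024);
	const uint64_t partitions_that_fit = std::max<uint64_t>(memory_limit / combined_size, 1);

	// With e.g. 32 partitions, the source phase is capped at this many threads
	std::cout << std::min<uint64_t>(32, partitions_that_fit) << " threads\n"; // prints 9
}
```

The effect is that a query with many small partitions can still use all partitions' worth of parallelism, while huge partitions throttle the thread count so the combined hash tables stay within the memory budget.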
@@ -649,7 +676,17 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob
 	if (!ht) {
 		// Create a HT with sufficient capacity
 		const auto capacity = GroupedAggregateHashTable::GetCapacityForCount(partition.data->Count());
-		ht = sink.radix_ht.CreateHT(gstate.context, capacity, 0);
+
+		// However, we will limit the initial capacity so we don't do a huge over-allocation
+		const idx_t n_threads = TaskScheduler::GetScheduler(gstate.context).NumberOfThreads();
+		const idx_t memory_limit = BufferManager::GetBufferManager(gstate.context).GetMaxMemory();
+		const idx_t thread_limit = 0.6 * memory_limit / n_threads;
+
+		const idx_t size_per_entry = partition.data->SizeInBytes() / partition.data->Count() +
+		                             idx_t(GroupedAggregateHashTable::LOAD_FACTOR * sizeof(aggr_ht_entry_t));
+		const auto capacity_limit = NextPowerOfTwo(thread_limit / size_per_entry);
+
+		ht = sink.radix_ht.CreateHT(gstate.context, MinValue<idx_t>(capacity, capacity_limit), 0);
 	} else {
 		// We may want to resize here to the size of this partition, but for now we just assume uniform partition sizes
 		ht->InitializePartitionedData();
@@ -658,7 +695,7 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob
 	}
 
 	// Now combine the uncombined data using this thread's HT
-	ht->Combine(*partition.data);
+	ht->Combine(*partition.data, &partition.progress);
 	ht->UnpinData();
 
 	// Move the combined data back to the partition
@@ -812,4 +849,25 @@ SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, D
 	}
 }
 
+double RadixPartitionedHashTable::GetProgress(ClientContext &, GlobalSinkState &sink_p,
+                                              GlobalSourceState &gstate_p) const {
+	auto &sink = sink_p.Cast<RadixHTGlobalSinkState>();
+	auto &gstate = gstate_p.Cast<RadixHTGlobalSourceState>();
+
+	// Get partition combine progress, weigh it 2x
+	double total_progress = 0;
+	for (auto &partition : sink.partitions) {
+		total_progress += partition->progress * 2.0;
+	}
+
+	// Get scan progress, weigh it 1x
+	total_progress += gstate.scan_done;
+
+	// Divide by 3x for the weights, and the number of partitions to get a value between 0 and 1 again
+	total_progress /= 3.0 * sink.partitions.size();
+
+	// Multiply by 100 to get a percentage
+	return 100.0 * total_progress;
+}
+
 } // namespace duckdb
diff --git a/src/include/duckdb/execution/aggregate_hashtable.hpp b/src/include/duckdb/execution/aggregate_hashtable.hpp
index b8a4a8fa483..c48d81bee3e 100644
--- a/src/include/duckdb/execution/aggregate_hashtable.hpp
+++ b/src/include/duckdb/execution/aggregate_hashtable.hpp
@@ -138,7 +138,7 @@ class GroupedAggregateHashTable : public BaseAggregateHashTable {
 	//! Executes the filter(if any) and update the aggregates
 	void Combine(GroupedAggregateHashTable &other);
-	void Combine(TupleDataCollection &other_data);
+	void Combine(TupleDataCollection &other_data, optional_ptr<atomic<double>> progress = nullptr);
 
 	//! Unpins the data blocks
 	void UnpinData();
diff --git a/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp b/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp
index d36b17c0ae4..3d1d6749b56 100644
--- a/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp
+++ b/src/include/duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp
@@ -93,6 +93,8 @@ class PhysicalHashAggregate : public PhysicalOperator {
 	                                                 GlobalSourceState &gstate) const override;
 	SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
 
+	double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override;
+
 	bool IsSource() const override {
 		return true;
 	}
diff --git a/src/include/duckdb/execution/radix_partitioned_hashtable.hpp b/src/include/duckdb/execution/radix_partitioned_hashtable.hpp
index c9827a357bd..2134de075ba 100644
--- a/src/include/duckdb/execution/radix_partitioned_hashtable.hpp
+++ b/src/include/duckdb/execution/radix_partitioned_hashtable.hpp
@@ -50,8 +50,10 @@ class RadixPartitionedHashTable {
 	SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, GlobalSinkState &sink,
 	                         OperatorSourceInput &input) const;
 
+	double GetProgress(ClientContext &context, GlobalSinkState &sink_p, GlobalSourceState &gstate) const;
+
 	const TupleDataLayout &GetLayout() const;
-	idx_t NumberOfPartitions(GlobalSinkState &sink) const;
+	idx_t MaxThreads(GlobalSinkState &sink) const;
 	static void SetMultiScan(GlobalSinkState &sink);
 
 private:
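The weighting in `RadixPartitionedHashTable::GetProgress` above gives each partition's combine phase twice the weight of the scan phase. A minimal sketch of the formula, taking `scan_done` to be the number of fully scanned partitions as the code suggests; `aggregate_source_progress` is a hypothetical stand-in, not DuckDB API.

```cpp
// Sketch: weighted combine/scan progress for a partitioned aggregate source.
#include <cstddef>
#include <iostream>
#include <vector>

// combine_progress holds one value in [0, 1] per partition (assumed non-empty)
double aggregate_source_progress(const std::vector<double> &combine_progress, std::size_t scan_done) {
	double total = 0;
	for (auto p : combine_progress) {
		total += p * 2.0; // combine progress, weighed 2x
	}
	total += double(scan_done);                     // scan progress, weighed 1x
	total /= 3.0 * double(combine_progress.size()); // normalize back to [0, 1]
	return 100.0 * total;                           // as a percentage
}

int main() {
	// 4 partitions: two fully combined, one halfway, one untouched; one scanned
	std::cout << aggregate_source_progress({1.0, 1.0, 0.5, 0.0}, 1) << "%\n"; // prints 50%
}
```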