Optimize statistics aggregation for wide tables
This patch optimizes statistics aggregation for extra-wide tables (1000+ columns).

For extra-wide tables, creating the InMemoryHashAggregationBuilder can be expensive, as
it creates ~4 aggregators (one for every statistic collected) for every column. After each
partial results flush the InMemoryHashAggregationBuilder has to be recreated, which takes far more
CPU time than the aggregations themselves.

As an optimization this patch:

- Removes the partial aggregation memory limit to avoid frequent flushes
- Sets the expected number of entries to 200 instead of 10_000. The value 200 was chosen
  because statistics aggregation is per partition, and currently it is not allowed
  to insert more than 100 partitions at once.
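
To illustrate the first change, here is a minimal, self-contained sketch (plain Java, not Presto code; names and values are illustrative) of the flush policy this patch introduces: when no partial memory limit is set, the builder is never considered full, so it is never flushed and recreated.

import java.util.Optional;
import java.util.OptionalLong;

public class PartialFlushSketch
{
    private final OptionalLong maxPartialMemoryBytes;

    public PartialFlushSketch(Optional<Long> maxPartialMemoryBytes)
    {
        // mirrors the Optional<DataSize> -> OptionalLong conversion in the diff below
        this.maxPartialMemoryBytes = maxPartialMemoryBytes
                .map(OptionalLong::of)
                .orElseGet(OptionalLong::empty);
    }

    public boolean shouldFlush(long currentMemoryBytes)
    {
        // an absent limit means "never full", so the builder is never rebuilt
        return maxPartialMemoryBytes.isPresent()
                && currentMemoryBytes > maxPartialMemoryBytes.getAsLong();
    }

    public static void main(String[] args)
    {
        PartialFlushSketch limited = new PartialFlushSketch(Optional.of(16L << 20));
        PartialFlushSketch unlimited = new PartialFlushSketch(Optional.empty());
        System.out.println(limited.shouldFlush(32L << 20));   // true: above the 16MB limit
        System.out.println(unlimited.shouldFlush(32L << 20)); // false: limit disabled
    }
}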
arhimondr committed Sep 25, 2018
1 parent 19b42d8 commit f8c51c3
Showing 9 changed files with 49 additions and 33 deletions.
@@ -124,7 +124,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
Optional.empty(),
Optional.empty(),
10_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
JOIN_COMPILER);

return ImmutableList.of(tableScanOperator, tpchQuery1Operator, aggregationOperator);
@@ -62,7 +62,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
Optional.empty(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
JOIN_COMPILER);
return ImmutableList.of(tableScanOperator, aggregationOperator);
}
@@ -63,7 +63,7 @@ public static class HashAggregationOperatorFactory
private final Optional<Integer> groupIdChannel;

private final int expectedGroups;
- private final DataSize maxPartialMemory;
+ private final Optional<DataSize> maxPartialMemory;
private final boolean spillEnabled;
private final DataSize memoryLimitForMerge;
private final DataSize memoryLimitForMergeWithMemory;
@@ -84,7 +84,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler)
{
this(operatorId,
@@ -120,7 +120,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize unspillMemoryLimit,
SpillerFactory spillerFactory,
@@ -158,7 +158,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize memoryLimitForMerge,
DataSize memoryLimitForMergeWithMemory,
@@ -250,7 +250,7 @@ public OperatorFactory duplicate()
private final Optional<Integer> hashChannel;
private final Optional<Integer> groupIdChannel;
private final int expectedGroups;
- private final DataSize maxPartialMemory;
+ private final Optional<DataSize> maxPartialMemory;
private final boolean spillEnabled;
private final DataSize memoryLimitForMerge;
private final DataSize memoryLimitForMergeWithMemory;
@@ -280,7 +280,7 @@ public HashAggregationOperator(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize memoryLimitForMerge,
DataSize memoryLimitForMergeWithMemory,
@@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+ import java.util.OptionalLong;

import static com.facebook.presto.SystemSessionProperties.isDictionaryAggregationEnabled;
import static com.facebook.presto.operator.GroupByHash.createGroupByHash;
@@ -59,7 +60,7 @@ public class InMemoryHashAggregationBuilder
private final List<Aggregator> aggregators;
private final OperatorContext operatorContext;
private final boolean partial;
- private final long maxPartialMemory;
+ private final OptionalLong maxPartialMemory;
private final LocalMemoryContext systemMemoryContext;
private final LocalMemoryContext localUserMemoryContext;

@@ -73,7 +74,7 @@ public InMemoryHashAggregationBuilder(
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation)
{
@@ -98,7 +99,7 @@ public InMemoryHashAggregationBuilder(
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
- DataSize maxPartialMemory,
+ Optional<DataSize> maxPartialMemory,
Optional<Integer> overwriteIntermediateChannelOffset,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation)
@@ -126,7 +127,7 @@ public InMemoryHashAggregationBuilder(
updateMemory);
this.operatorContext = operatorContext;
this.partial = step.isOutputPartial();
- this.maxPartialMemory = maxPartialMemory.toBytes();
+ this.maxPartialMemory = maxPartialMemory.map(dataSize -> OptionalLong.of(dataSize.toBytes())).orElseGet(OptionalLong::empty);
this.systemMemoryContext = operatorContext.newLocalSystemMemoryContext(InMemoryHashAggregationBuilder.class.getSimpleName());
this.localUserMemoryContext = operatorContext.localUserMemoryContext();

@@ -326,9 +327,10 @@ public List<Type> buildTypes()
private boolean updateMemoryWithYieldInfo()
{
long memorySize = getSizeInMemory();
- if (partial) {
+ // if partial limit is not set, memory is considered as user memory
+ if (partial && maxPartialMemory.isPresent()) {
systemMemoryContext.setBytes(memorySize);
- full = (memorySize > maxPartialMemory);
+ full = (memorySize > maxPartialMemory.getAsLong());
return true;
}
// Operator/driver will be blocked on memory after we call setBytes.
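
The updateMemoryWithYieldInfo hunk above encodes the accounting rule the new comment describes: a partial aggregation with a limit tracks its hash table as system memory and flushes past the limit, while one without a limit falls through to the user-memory path and is simply blocked by the regular memory framework when it grows. A standalone sketch of that branch structure (simplified; systemBytes/userBytes are illustrative stand-ins for the memory contexts):

import java.util.OptionalLong;

public class MemoryAccountingSketch
{
    long systemBytes;
    long userBytes;
    boolean full;

    // simplified analogue of updateMemoryWithYieldInfo()
    boolean updateMemory(boolean partial, OptionalLong maxPartialMemory, long memorySize)
    {
        // with a limit: partial memory is system memory and may trigger a flush
        if (partial && maxPartialMemory.isPresent()) {
            systemBytes = memorySize;
            full = memorySize > maxPartialMemory.getAsLong();
            return true;
        }
        // without a limit: memory is user memory; the operator may block instead
        userBytes = memorySize;
        return false;
    }

    public static void main(String[] args)
    {
        MemoryAccountingSketch sketch = new MemoryAccountingSketch();
        sketch.updateMemory(true, OptionalLong.of(1024), 2048);
        System.out.println(sketch.full);      // true: over the partial limit, flush
        sketch.updateMemory(true, OptionalLong.empty(), 2048);
        System.out.println(sketch.userBytes); // 2048: accounted as user memory
    }
}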
@@ -144,7 +144,7 @@ private void rebuildHashAggregationBuilder()
groupByPartialChannels,
hashChannel,
operatorContext,
- DataSize.succinctBytes(0),
+ Optional.of(DataSize.succinctBytes(0)),
Optional.of(overwriteIntermediateChannelOffset),
joinCompiler,
false);
@@ -299,7 +299,7 @@ private void rebuildHashAggregationBuilder()
groupByChannels,
hashChannel,
operatorContext,
- DataSize.succinctBytes(0),
+ Optional.of(DataSize.succinctBytes(0)),
joinCompiler,
false);
emptyHashAggregationBuilderSize = hashAggregationBuilder.getSizeInMemory();
@@ -2173,7 +2173,14 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl
new DataSize(0, BYTE),
context,
2,
- outputMapping);
+ outputMapping,
+ 200,
+ // This aggregation must behave as INTERMEDIATE.
+ // Using INTERMEDIATE aggregation directly
+ // is not possible, as it doesn't accept raw input data.
+ // Disabling partial pre-aggregation memory limit effectively
+ // turns PARTIAL aggregation into INTERMEDIATE.
+ Optional.empty());
}).orElse(new DevNullOperatorFactory(context.getNextOperatorId(), node.getId()));

List<Integer> inputChannels = node.getColumns().stream()
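
The comment in the hunk above is the heart of the patch: an aggregation step that accepts raw input must be PARTIAL, but a PARTIAL step with no memory limit never flushes early and therefore emits a single accumulated result at finish, which is exactly how an INTERMEDIATE step behaves. A toy model of that equivalence (plain Java, not the actual operator):

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public class PartialAsIntermediate
{
    // a partial aggregation with no memory limit emits one accumulated result
    // at finish, just like an intermediate aggregation would
    public static List<Long> runPartial(List<Long> input, OptionalLong limitBytes)
    {
        List<Long> flushes = new ArrayList<>();
        long accumulated = 0;
        for (long value : input) {
            accumulated += value;
            // with an absent limit this branch never fires, so no early flush
            if (limitBytes.isPresent() && accumulated > limitBytes.getAsLong()) {
                flushes.add(accumulated);
                accumulated = 0;
            }
        }
        flushes.add(accumulated); // final flush on finish
        return flushes;
    }

    public static void main(String[] args)
    {
        List<Long> input = List.of(5L, 5L, 5L);
        System.out.println(runPartial(input, OptionalLong.of(8)));   // [10, 5]: flushed early
        System.out.println(runPartial(input, OptionalLong.empty())); // [15]: single result
    }
}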
@@ -2227,7 +2234,10 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
new DataSize(0, BYTE),
context,
0,
- outputMapping);
+ outputMapping,
+ 200,
+ // final aggregation ignores partial pre-aggregation memory limit
+ Optional.empty());
}).orElse(new DevNullOperatorFactory(context.getNextOperatorId(), node.getId()));

Map<Symbol, Integer> aggregationOutput = outputMapping.build();
@@ -2544,7 +2554,9 @@ private PhysicalOperation planGroupByAggregation(
unspillMemoryLimit,
context,
0,
- mappings);
+ mappings,
+ 10_000,
+ Optional.of(maxPartialAggregationMemorySize));
return new PhysicalOperation(operatorFactory, mappings.build(), context, source);
}

@@ -2563,7 +2575,9 @@ private OperatorFactory createHashAggregationOperatorFactory(
DataSize unspillMemoryLimit,
LocalExecutionPlanContext context,
int startOutputChannel,
- ImmutableMap.Builder<Symbol, Integer> outputMappings)
+ ImmutableMap.Builder<Symbol, Integer> outputMappings,
+ int expectedGroups,
+ Optional<DataSize> maxPartialAggregationMemorySize)
{
List<Symbol> aggregationOutputSymbols = new ArrayList<>();
List<AccumulatorFactory> accumulatorFactories = new ArrayList<>();
@@ -2626,7 +2640,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
accumulatorFactories,
hashChannel,
groupIdChannel,
- 10_000,
+ expectedGroups,
maxPartialAggregationMemorySize,
spillEnabled,
unspillMemoryLimit,
@@ -165,7 +165,7 @@ private OperatorFactory createHashAggregationOperatorFactory(Optional<Integer> h
hashChannel,
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
false,
succinctBytes(8),
succinctBytes(Integer.MAX_VALUE),
@@ -183,7 +183,7 @@ public void testHashAggregation(boolean hashEnabled, boolean spillEnabled, long
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
@@ -243,7 +243,7 @@ public void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEna
rowPagesBuilder.getHashChannel(),
groupIdChannel,
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
@@ -290,7 +290,7 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
@@ -335,7 +335,7 @@ public void testMemoryLimit(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

toPages(operatorFactory, driverContext, input);
@@ -370,7 +370,7 @@ public void testHashBuilderResize(boolean hashEnabled, boolean spillEnabled, lon
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
@@ -395,7 +395,7 @@ public void testMemoryReservationYield(Type type)
Optional.of(1),
Optional.empty(),
1,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

// get result with yield; pick a relatively small buffer for aggregator's memory usage
@@ -446,7 +446,7 @@ public void testHashBuilderResizeLimit(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

toPages(operatorFactory, driverContext, input);
@@ -479,7 +479,7 @@ public void testMultiSliceAggregationOutput(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

assertEquals(toPages(operatorFactory, createDriverContext(), input).size(), 2);
@@ -509,7 +509,7 @@ public void testMultiplePartialFlushes(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(1, KILOBYTE),
+ Optional.of(new DataSize(1, KILOBYTE)),
joinCompiler);

DriverContext driverContext = createDriverContext(1024);
@@ -584,7 +584,7 @@ public void testMergeWithMemorySpill()
rowPagesBuilder.getHashChannel(),
Optional.empty(),
1,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
true,
new DataSize(smallPagesSpillThresholdSize, Unit.BYTE),
succinctBytes(Integer.MAX_VALUE),
@@ -639,7 +639,7 @@ public void testSpillerFailure()
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
- new DataSize(16, MEGABYTE),
+ Optional.of(new DataSize(16, MEGABYTE)),
true,
succinctBytes(8),
succinctBytes(Integer.MAX_VALUE),
