Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flat hash aggregation #207

Merged
merged 20 commits into from
Dec 19, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ public boolean isFull()
return sliceOutput.size() > maxBlockSize;
}

public int size()
{
return sliceOutput.size();
}

public int writableBytes()
{
return maxBlockSize - sliceOutput.size();
}

public BlockBuilder append(long value)
{
tupleBuilder.append(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

public class QueryManagerConfig
{
private boolean importsEnabled = true;
private DataSize maxOperatorMemoryUsage = new DataSize(256, Unit.MEGABYTE);
private int maxNumberOfGroups = 1_000_000;

public boolean isImportsEnabled()
{
Expand All @@ -37,17 +35,4 @@ public QueryManagerConfig setMaxOperatorMemoryUsage(DataSize maxOperatorMemoryUs
this.maxOperatorMemoryUsage = maxOperatorMemoryUsage;
return this;
}

@Min(1)
public int getMaxNumberOfGroups()
{
return maxNumberOfGroups;
}

@Config("query.group-by.max-group-count")
public QueryManagerConfig setMaxNumberOfGroups(int maxNumberOfGroups)
{
this.maxNumberOfGroups = maxNumberOfGroups;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class SqlTaskExecution
private final PlanFragment fragment;
private final Metadata metadata;
private final DataSize maxOperatorMemoryUsage;
private final int maxNumberOfGroups;

public SqlTaskExecution(String queryId,
String stageId,
Expand All @@ -60,8 +59,7 @@ public SqlTaskExecution(String queryId,
PlanFragmentSourceProvider sourceProvider,
Metadata metadata,
FairBatchExecutor shardExecutor,
DataSize maxOperatorMemoryUsage,
int maxNumberOfGroups)
DataSize maxOperatorMemoryUsage)
{
Preconditions.checkNotNull(queryId, "queryId is null");
Preconditions.checkNotNull(stageId, "stageId is null");
Expand All @@ -85,7 +83,6 @@ public SqlTaskExecution(String queryId,
this.shardExecutor = shardExecutor;
this.metadata = metadata;
this.maxOperatorMemoryUsage = maxOperatorMemoryUsage;
this.maxNumberOfGroups = maxNumberOfGroups;

// create output buffers
this.taskOutput = new TaskOutput(queryId, stageId, taskId, location, outputIds, pageBufferMax, splits.size());
Expand All @@ -111,7 +108,7 @@ public void run()
final SourceHashProviderFactory sourceHashProviderFactory = new SourceHashProviderFactory(maxOperatorMemoryUsage);
if (splits.size() <= 1) {
PlanFragmentSource split = splits.isEmpty() ? null : splits.get(0);
SplitWorker worker = new SplitWorker(taskOutput, fragment, split, exchangeSources, sourceHashProviderFactory, sourceProvider, metadata, maxOperatorMemoryUsage, maxNumberOfGroups);
SplitWorker worker = new SplitWorker(taskOutput, fragment, split, exchangeSources, sourceHashProviderFactory, sourceProvider, metadata, maxOperatorMemoryUsage);
worker.call();
}
else {
Expand All @@ -120,7 +117,7 @@ public void run()
@Override
public Callable<Void> apply(PlanFragmentSource split)
{
return new SplitWorker(taskOutput, fragment, split, exchangeSources, sourceHashProviderFactory, sourceProvider, metadata, maxOperatorMemoryUsage, maxNumberOfGroups);
return new SplitWorker(taskOutput, fragment, split, exchangeSources, sourceHashProviderFactory, sourceProvider, metadata, maxOperatorMemoryUsage);
}
}));

Expand Down Expand Up @@ -216,8 +213,7 @@ private SplitWorker(TaskOutput taskOutput,
SourceHashProviderFactory sourceHashProviderFactory,
PlanFragmentSourceProvider sourceProvider,
Metadata metadata,
DataSize maxOperatorMemoryUsage,
int maxNumberOfGroups)
DataSize maxOperatorMemoryUsage)
{
this.taskOutput = taskOutput;

Expand All @@ -229,8 +225,8 @@ private SplitWorker(TaskOutput taskOutput,
exchangeSources,
operatorStats,
sourceHashProviderFactory,
maxOperatorMemoryUsage,
maxNumberOfGroups);
maxOperatorMemoryUsage
);

operator = planner.plan(fragment.getRoot());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class SqlTaskManager
private final DataSize maxOperatorMemoryUsage;

private final ConcurrentMap<String, TaskExecution> tasks = new ConcurrentHashMap<>();
private final int maxNumberOfGroups;

@Inject
public SqlTaskManager(
Expand All @@ -66,7 +65,6 @@ public SqlTaskManager(
this.httpServerInfo = httpServerInfo;
this.pageBufferMax = 20;
this.maxOperatorMemoryUsage = config.getMaxOperatorMemoryUsage();
this.maxNumberOfGroups = config.getMaxNumberOfGroups();

int processors = Runtime.getRuntime().availableProcessors();
taskExecutor = new ThreadPoolExecutor(1000,
Expand Down Expand Up @@ -140,8 +138,8 @@ public TaskInfo createTask(String queryId,
sourceProvider,
metadata,
shardExecutor,
maxOperatorMemoryUsage,
maxNumberOfGroups);
maxOperatorMemoryUsage
);

taskExecutor.submit(new TaskStarter(taskExecution));

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package com.facebook.presto.metadata;

import com.facebook.presto.operator.AggregationFunctionDefinition;
import com.facebook.presto.operator.aggregation.AggregationFunction;
import com.facebook.presto.operator.aggregation.Input;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.tuple.TupleInfo;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;

import javax.annotation.Nullable;
import javax.inject.Provider;
import java.util.List;

import static com.facebook.presto.operator.AggregationFunctionDefinition.aggregation;

public class FunctionInfo
{
private final int id;
Expand All @@ -20,16 +22,16 @@ public class FunctionInfo

private final boolean isAggregate;
private final TupleInfo.Type intermediateType;
private final FunctionBinder binder;
private final AggregationFunction function;

public FunctionInfo(int id, QualifiedName name, TupleInfo.Type returnType, List<TupleInfo.Type> argumentTypes, TupleInfo.Type intermediateType, FunctionBinder binder)
public FunctionInfo(int id, QualifiedName name, TupleInfo.Type returnType, List<TupleInfo.Type> argumentTypes, TupleInfo.Type intermediateType, AggregationFunction function)
{
this.id = id;
this.name = name;
this.returnType = returnType;
this.argumentTypes = argumentTypes;
this.intermediateType = intermediateType;
this.binder = binder;
this.function = function;
this.isAggregate = true;
}

Expand All @@ -42,7 +44,7 @@ public FunctionInfo(int id, QualifiedName name, TupleInfo.Type returnType, List<

this.isAggregate = false;
this.intermediateType = null;
this.binder = null;
this.function = null;
}

public FunctionHandle getHandle()
Expand Down Expand Up @@ -75,9 +77,18 @@ public TupleInfo.Type getIntermediateType()
return intermediateType;
}

public Provider<AggregationFunction> bind(List<Input> inputs)
public AggregationFunctionDefinition bind(List<Input> inputs)
{
return binder.bind(inputs);
if (inputs.isEmpty()) {
return aggregation(function, -1);
}
else {
Preconditions.checkArgument(inputs.size() == 1, "expected at most one input");
Input input = inputs.get(0);
// todo remove this assumption that field is 0 when we add field support
Preconditions.checkArgument(input.getField() == 0, "expected field to be 0");
return aggregation(function, input.getChannel());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
package com.facebook.presto.metadata;

import com.facebook.presto.operator.aggregation.CountAggregation;
import com.facebook.presto.operator.aggregation.DoubleAverageAggregation;
import com.facebook.presto.operator.aggregation.DoubleMaxAggregation;
import com.facebook.presto.operator.aggregation.DoubleMinAggregation;
import com.facebook.presto.operator.aggregation.DoubleSumAggregation;
import com.facebook.presto.operator.aggregation.LongAverageAggregation;
import com.facebook.presto.operator.aggregation.LongMaxAggregation;
import com.facebook.presto.operator.aggregation.LongMinAggregation;
import com.facebook.presto.operator.aggregation.LongSumAggregation;
import com.facebook.presto.operator.aggregation.VarBinaryMaxAggregation;
import com.facebook.presto.operator.aggregation.VarBinaryMinAggregation;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.tuple.TupleInfo;
import com.google.common.base.Joiner;
Expand All @@ -22,6 +11,17 @@
import java.util.List;
import java.util.Map;

import static com.facebook.presto.operator.aggregation.CountAggregation.COUNT;
import static com.facebook.presto.operator.aggregation.DoubleAverageAggregation.DOUBLE_AVERAGE;
import static com.facebook.presto.operator.aggregation.DoubleMaxAggregation.DOUBLE_MAX;
import static com.facebook.presto.operator.aggregation.DoubleMinAggregation.DOUBLE_MIN;
import static com.facebook.presto.operator.aggregation.DoubleSumAggregation.DOUBLE_SUM;
import static com.facebook.presto.operator.aggregation.LongAverageAggregation.LONG_AVERAGE;
import static com.facebook.presto.operator.aggregation.LongMaxAggregation.LONG_MAX;
import static com.facebook.presto.operator.aggregation.LongMinAggregation.LONG_MIN;
import static com.facebook.presto.operator.aggregation.LongSumAggregation.LONG_SUM;
import static com.facebook.presto.operator.aggregation.VarBinaryMaxAggregation.VAR_BINARY_MAX;
import static com.facebook.presto.operator.aggregation.VarBinaryMinAggregation.VAR_BINARY_MIN;
import static com.facebook.presto.tuple.TupleInfo.Type.DOUBLE;
import static com.facebook.presto.tuple.TupleInfo.Type.FIXED_INT_64;
import static com.facebook.presto.tuple.TupleInfo.Type.VARIABLE_BINARY;
Expand All @@ -35,17 +35,17 @@ public class FunctionRegistry
public FunctionRegistry()
{
List<FunctionInfo> functions = ImmutableList.of(
new FunctionInfo(1, QualifiedName.of("count"), FIXED_INT_64, ImmutableList.<TupleInfo.Type>of(), FIXED_INT_64, CountAggregation.BINDER),
new FunctionInfo(2, QualifiedName.of("sum"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LongSumAggregation.BINDER),
new FunctionInfo(3, QualifiedName.of("sum"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DoubleSumAggregation.BINDER),
new FunctionInfo(4, QualifiedName.of("avg"), DOUBLE, ImmutableList.of(DOUBLE), VARIABLE_BINARY, DoubleAverageAggregation.BINDER),
new FunctionInfo(5, QualifiedName.of("avg"), DOUBLE, ImmutableList.of(FIXED_INT_64), VARIABLE_BINARY, LongAverageAggregation.BINDER),
new FunctionInfo(6, QualifiedName.of("max"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LongMaxAggregation.BINDER),
new FunctionInfo(7, QualifiedName.of("max"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DoubleMaxAggregation.BINDER),
new FunctionInfo(8, QualifiedName.of("max"), VARIABLE_BINARY, ImmutableList.of(VARIABLE_BINARY), VARIABLE_BINARY, VarBinaryMaxAggregation.BINDER),
new FunctionInfo(9, QualifiedName.of("min"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LongMinAggregation.BINDER),
new FunctionInfo(10, QualifiedName.of("min"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DoubleMinAggregation.BINDER),
new FunctionInfo(11, QualifiedName.of("min"), VARIABLE_BINARY, ImmutableList.of(VARIABLE_BINARY), VARIABLE_BINARY, VarBinaryMinAggregation.BINDER)
new FunctionInfo(1, QualifiedName.of("count"), FIXED_INT_64, ImmutableList.<TupleInfo.Type>of(), FIXED_INT_64, COUNT),
new FunctionInfo(2, QualifiedName.of("sum"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LONG_SUM),
new FunctionInfo(3, QualifiedName.of("sum"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DOUBLE_SUM),
new FunctionInfo(4, QualifiedName.of("avg"), DOUBLE, ImmutableList.of(DOUBLE), VARIABLE_BINARY, DOUBLE_AVERAGE),
new FunctionInfo(5, QualifiedName.of("avg"), DOUBLE, ImmutableList.of(FIXED_INT_64), VARIABLE_BINARY, LONG_AVERAGE),
new FunctionInfo(6, QualifiedName.of("max"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LONG_MAX),
new FunctionInfo(7, QualifiedName.of("max"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DOUBLE_MAX),
new FunctionInfo(8, QualifiedName.of("max"), VARIABLE_BINARY, ImmutableList.of(VARIABLE_BINARY), VARIABLE_BINARY, VAR_BINARY_MAX),
new FunctionInfo(9, QualifiedName.of("min"), FIXED_INT_64, ImmutableList.of(FIXED_INT_64), FIXED_INT_64, LONG_MIN),
new FunctionInfo(10, QualifiedName.of("min"), DOUBLE, ImmutableList.of(DOUBLE), DOUBLE, DOUBLE_MIN),
new FunctionInfo(11, QualifiedName.of("min"), VARIABLE_BINARY, ImmutableList.of(VARIABLE_BINARY), VARIABLE_BINARY, VAR_BINARY_MIN)
);

functionsByName = Multimaps.index(functions, FunctionInfo.nameGetter());
Expand All @@ -67,5 +67,4 @@ public FunctionInfo get(FunctionHandle handle)
{
return functionsByHandle.get(handle);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2004-present Facebook. All Rights Reserved.
*/
package com.facebook.presto.operator;

import com.facebook.presto.operator.aggregation.AggregationFunction;
import com.google.common.base.Preconditions;

public class AggregationFunctionDefinition
{
public static AggregationFunctionDefinition aggregation(AggregationFunction function, int channel)
{
Preconditions.checkNotNull(function, "function is null");
return new AggregationFunctionDefinition(function, channel);
}

private final AggregationFunction function;
private final int channel;

AggregationFunctionDefinition(AggregationFunction function, int channel)
{
this.function = function;
this.channel = channel;
}

public AggregationFunction getFunction()
{
return function;
}

public int getChannel()
{
return channel;
}
}