Skip to content

Commit

Permalink
Implement DISTINCT in aggregation operator
Browse files Browse the repository at this point in the history
This makes it possible to execute DISTINCT aggregation queries
without relying on the MarkDistinct operator, which requires
one shuffle for each unique combination of DISTINCT arguments
to aggregations.

A new config option (optimizer.use-mark-distinct) and session property
(use_mark_distinct) control whether the MultipleDistinctAggregationToMarkDistinct
optimizer fires.

It's worth noting that when the MarkDistinct optimization is disabled
and aggregations contain DISTINCT inputs, aggregations will be planned
as SINGLE-step. A future improvement could introduce a partial
aggregation step that produces deduped subsets as intermediates.
  • Loading branch information
martint committed Mar 27, 2018
1 parent 3d4d6ca commit 78f8146
Show file tree
Hide file tree
Showing 18 changed files with 411 additions and 72 deletions.
Expand Up @@ -87,6 +87,7 @@ public final class SystemSessionProperties
public static final String FORCE_SINGLE_NODE_OUTPUT = "force_single_node_output";
public static final String FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_SIZE = "filter_and_project_min_output_page_size";
// Experimental: minimum output page row count for filter and project operators.
public static final String FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT = "filter_and_project_min_output_page_row_count";
// Name of the boolean session property that selects whether DISTINCT aggregations
// are implemented using MarkDistinct.
public static final String USE_MARK_DISTINCT = "use_mark_distinct";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -388,6 +389,11 @@ public SystemSessionProperties(
FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT,
"Experimental: Minimum output page row count for filter and project operators",
featuresConfig.getFilterAndProjectMinOutputPageRowCount(),
false),
booleanSessionProperty(
USE_MARK_DISTINCT,
"Implement DISTINCT aggregations using MarkDistinct",
featuresConfig.isUseMarkDistinct(),
false));
}

Expand Down Expand Up @@ -638,6 +644,11 @@ public static int getFilterAndProjectMinOutputPageRowCount(Session session)
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class);
}

/**
 * Reads the {@code use_mark_distinct} system property from the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value of the {@code use_mark_distinct} property
 */
public static boolean useMarkDistinct(Session session)
{
    Boolean markDistinctEnabled = session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class);
    return markDistinctEnabled;
}

private static int validateValueIsPowerOfTwo(Object value, String property)
{
int intValue = ((Number) requireNonNull(value, "value is null")).intValue();
Expand Down
Expand Up @@ -369,8 +369,8 @@ public void addInput(Page page)
inputProcessed = true;

if (aggregationBuilder == null) {
// TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause because ORDER BY is not yet implemented for spilling.
if (step.isOutputPartial() || !spillEnabled || hasOrderBy()) {
// TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause or DISTINCT because they are not yet implemented for spilling.
if (step.isOutputPartial() || !spillEnabled || hasOrderBy() || hasDistinct()) {
aggregationBuilder = new InMemoryHashAggregationBuilder(
accumulatorFactories,
step,
Expand Down Expand Up @@ -417,6 +417,11 @@ private boolean hasOrderBy()
return accumulatorFactories.stream().anyMatch(AccumulatorFactory::hasOrderBy);
}

/**
 * Reports whether any of this operator's accumulator factories has DISTINCT set.
 *
 * @return {@code true} if at least one factory reports {@code hasDistinct()}
 */
private boolean hasDistinct()
{
    for (AccumulatorFactory factory : accumulatorFactories) {
        if (factory.hasDistinct()) {
            return true;
        }
    }
    return false;
}

@Override
public ListenableFuture<?> startMemoryRevoke()
{
Expand Down
Expand Up @@ -28,4 +28,6 @@ public interface AccumulatorFactory
GroupedAccumulator createGroupedIntermediateAccumulator();

boolean hasOrderBy();

boolean hasDistinct();
}
Expand Up @@ -13,14 +13,16 @@
*/
package com.facebook.presto.operator.aggregation;

import com.facebook.presto.Session;
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.spi.block.SortOrder;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.gen.JoinCompiler;

import java.util.List;
import java.util.Optional;

/**
 * Produces an {@link AccumulatorFactory} from argument channels, an optional mask
 * channel, the source types, ORDER BY channels with their sort orders, and a
 * {@link PagesIndex.Factory}.
 */
public interface AccumulatorFactoryBinder
{
// NOTE(review): this six-argument signature appears to be the pre-change form that the
// nine-argument signature below replaces — confirm against the actual file before relying on it.
AccumulatorFactory bind(List<Integer> argumentChannels, Optional<Integer> maskChannel, List<Type> sourceTypes, List<Integer> orderByChannels, List<SortOrder> orderings, PagesIndex.Factory pagesIndexFactory);
// Same inputs plus a distinct flag, a JoinCompiler and the Session.
// NOTE(review): presumably `distinct` requests DISTINCT handling of the aggregation inputs — verify against implementations.
AccumulatorFactory bind(List<Integer> argumentChannels, Optional<Integer> maskChannel, List<Type> sourceTypes, List<Integer> orderByChannels, List<SortOrder> orderings, PagesIndex.Factory pagesIndexFactory, boolean distinct, JoinCompiler joinCompiler, Session session);
}

0 comments on commit 78f8146

Please sign in to comment.