Skip to content

Commit

Permalink
Use WorkProcessor instead of Iterators in hash aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Apr 3, 2018
1 parent babca20 commit 06719a2
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 117 deletions.
Expand Up @@ -32,7 +32,6 @@
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
Expand Down Expand Up @@ -271,7 +270,7 @@ public OperatorFactory duplicate()
private final HashCollisionsCounter hashCollisionsCounter; private final HashCollisionsCounter hashCollisionsCounter;


private HashAggregationBuilder aggregationBuilder; private HashAggregationBuilder aggregationBuilder;
private Iterator<Page> outputIterator; private WorkProcessor<Page> outputPages;
private boolean inputProcessed; private boolean inputProcessed;
private boolean finishing; private boolean finishing;
private boolean finished; private boolean finished;
Expand Down Expand Up @@ -349,7 +348,7 @@ public boolean isFinished()
@Override @Override
public boolean needsInput() public boolean needsInput()
{ {
if (finishing || outputIterator != null) { if (finishing || outputPages != null) {
return false; return false;
} }
else if (aggregationBuilder != null && aggregationBuilder.isFull()) { else if (aggregationBuilder != null && aggregationBuilder.isFull()) {
Expand Down Expand Up @@ -456,10 +455,7 @@ public Page getOutput()
unfinishedWork = null; unfinishedWork = null;
} }


if (outputIterator == null) { if (outputPages == null) {
// current output iterator is done
outputIterator = null;

if (finishing) { if (finishing) {
if (!inputProcessed && produceDefaultOutput) { if (!inputProcessed && produceDefaultOutput) {
// global aggregations always generate an output row with the default aggregation output (e.g. 0 for COUNT, NULL for SUM) // global aggregations always generate an output row with the default aggregation output (e.g. 0 for COUNT, NULL for SUM)
Expand All @@ -478,20 +474,19 @@ public Page getOutput()
return null; return null;
} }


outputIterator = aggregationBuilder.buildResult(); outputPages = aggregationBuilder.buildResult();
}


if (!outputIterator.hasNext()) { if (!outputPages.process()) {
// current output iterator is done return null;
closeAggregationBuilder();
return null;
}
} }


Page output = outputIterator.next(); if (outputPages.isFinished()) {
if (!outputIterator.hasNext()) {
closeAggregationBuilder(); closeAggregationBuilder();
return null;
} }
return output;
return outputPages.getResult();
} }


@Override @Override
Expand All @@ -508,7 +503,7 @@ public HashAggregationBuilder getAggregationBuilder()


private void closeAggregationBuilder() private void closeAggregationBuilder()
{ {
outputIterator = null; outputPages = null;
if (aggregationBuilder != null) { if (aggregationBuilder != null) {
aggregationBuilder.recordHashCollisions(hashCollisionsCounter); aggregationBuilder.recordHashCollisions(hashCollisionsCounter);
aggregationBuilder.close(); aggregationBuilder.close();
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.util.MergeSortedPages.PageWithPosition; import com.facebook.presto.util.MergeSortedPages.PageWithPosition;


import java.io.Closeable; import java.io.Closeable;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.function.BiPredicate; import java.util.function.BiPredicate;
import java.util.stream.IntStream; import java.util.stream.IntStream;
Expand Down Expand Up @@ -48,20 +47,18 @@ public MergeHashSort(AggregatedMemoryContext memoryContext)
/** /**
* Rows with same hash value are guaranteed to be in the same result page. * Rows with same hash value are guaranteed to be in the same result page.
*/ */
public Iterator<Page> merge(List<Type> keyTypes, List<Type> allTypes, List<Iterator<Page>> channels) public WorkProcessor<Page> merge(List<Type> keyTypes, List<Type> allTypes, List<WorkProcessor<Page>> channels, DriverYieldSignal driverYieldSignal)
{ {
InterpretedHashGenerator hashGenerator = createHashGenerator(keyTypes); InterpretedHashGenerator hashGenerator = createHashGenerator(keyTypes);
return mergeSortedPages( return mergeSortedPages(
channels.stream() channels,
.map(WorkProcessor::fromIterator)
.collect(toImmutableList()),
createHashPageWithPositionComparator(hashGenerator), createHashPageWithPositionComparator(hashGenerator),
IntStream.range(0, allTypes.size()).boxed().collect(toImmutableList()), IntStream.range(0, allTypes.size()).boxed().collect(toImmutableList()),
allTypes, allTypes,
keepSameHashValuesWithinSinglePage(hashGenerator), keepSameHashValuesWithinSinglePage(hashGenerator),
true, true,
memoryContext, memoryContext,
new DriverYieldSignal()).iterator(); driverYieldSignal);
} }


@Override @Override
Expand Down
Expand Up @@ -15,17 +15,16 @@


import com.facebook.presto.operator.HashCollisionsCounter; import com.facebook.presto.operator.HashCollisionsCounter;
import com.facebook.presto.operator.Work; import com.facebook.presto.operator.Work;
import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;


import java.util.Iterator;

public interface HashAggregationBuilder public interface HashAggregationBuilder
extends AutoCloseable extends AutoCloseable
{ {
Work<?> processPage(Page page); Work<?> processPage(Page page);


Iterator<Page> buildResult(); WorkProcessor<Page> buildResult();


boolean isFull(); boolean isFull();


Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.facebook.presto.operator.TransformWork; import com.facebook.presto.operator.TransformWork;
import com.facebook.presto.operator.UpdateMemory; import com.facebook.presto.operator.UpdateMemory;
import com.facebook.presto.operator.Work; import com.facebook.presto.operator.Work;
import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.operator.WorkProcessor.ProcessorState;
import com.facebook.presto.operator.aggregation.AccumulatorFactory; import com.facebook.presto.operator.aggregation.AccumulatorFactory;
import com.facebook.presto.operator.aggregation.GroupedAccumulator; import com.facebook.presto.operator.aggregation.GroupedAccumulator;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
Expand All @@ -32,7 +34,6 @@
import com.facebook.presto.sql.planner.plan.AggregationNode; import com.facebook.presto.sql.planner.plan.AggregationNode;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step; import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -42,7 +43,6 @@
import it.unimi.dsi.fastutil.ints.IntIterators; import it.unimi.dsi.fastutil.ints.IntIterators;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;


Expand Down Expand Up @@ -250,15 +250,15 @@ public long getGroupCount()
} }


@Override @Override
public Iterator<Page> buildResult() public WorkProcessor<Page> buildResult()
{ {
for (Aggregator aggregator : aggregators) { for (Aggregator aggregator : aggregators) {
aggregator.prepareFinal(); aggregator.prepareFinal();
} }
return buildResult(consecutiveGroupIds()); return buildResult(consecutiveGroupIds());
} }


public Iterator<Page> buildHashSortedResult() public WorkProcessor<Page> buildHashSortedResult()
{ {
return buildResult(hashSortedGroupIds()); return buildResult(hashSortedGroupIds());
} }
Expand All @@ -278,37 +278,32 @@ public int getCapacity()
return groupByHash.getCapacity(); return groupByHash.getCapacity();
} }


private Iterator<Page> buildResult(IntIterator groupIds) private WorkProcessor<Page> buildResult(IntIterator groupIds)
{ {
final PageBuilder pageBuilder = new PageBuilder(buildTypes()); final PageBuilder pageBuilder = new PageBuilder(buildTypes());
return new AbstractIterator<Page>() return WorkProcessor.create(() -> {
{ if (!groupIds.hasNext()) {
@Override return ProcessorState.finished();
protected Page computeNext() }
{
if (!groupIds.hasNext()) {
return endOfData();
}


pageBuilder.reset(); pageBuilder.reset();


List<Type> types = groupByHash.getTypes(); List<Type> types = groupByHash.getTypes();
while (!pageBuilder.isFull() && groupIds.hasNext()) { while (!pageBuilder.isFull() && groupIds.hasNext()) {
int groupId = groupIds.nextInt(); int groupId = groupIds.nextInt();


groupByHash.appendValuesTo(groupId, pageBuilder, 0); groupByHash.appendValuesTo(groupId, pageBuilder, 0);


pageBuilder.declarePosition(); pageBuilder.declarePosition();
for (int i = 0; i < aggregators.size(); i++) { for (int i = 0; i < aggregators.size(); i++) {
Aggregator aggregator = aggregators.get(i); Aggregator aggregator = aggregators.get(i);
BlockBuilder output = pageBuilder.getBlockBuilder(types.size() + i); BlockBuilder output = pageBuilder.getBlockBuilder(types.size() + i);
aggregator.evaluate(groupId, output); aggregator.evaluate(groupId, output);
}
} }

return pageBuilder.build();
} }
};
return ProcessorState.ofResult(pageBuilder.build());
});
} }


public List<Type> buildTypes() public List<Type> buildTypes()
Expand Down
Expand Up @@ -15,6 +15,9 @@


import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.operator.WorkProcessor.ProcessorState;
import com.facebook.presto.operator.WorkProcessor.Transformation;
import com.facebook.presto.operator.aggregation.AccumulatorFactory; import com.facebook.presto.operator.aggregation.AccumulatorFactory;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
Expand All @@ -24,8 +27,6 @@
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import java.io.Closeable; import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;


Expand All @@ -40,7 +41,7 @@ public class MergingHashAggregationBuilder
private final ImmutableList<Integer> groupByPartialChannels; private final ImmutableList<Integer> groupByPartialChannels;
private final Optional<Integer> hashChannel; private final Optional<Integer> hashChannel;
private final OperatorContext operatorContext; private final OperatorContext operatorContext;
private final Iterator<Page> sortedPages; private final WorkProcessor<Page> sortedPages;
private InMemoryHashAggregationBuilder hashAggregationBuilder; private InMemoryHashAggregationBuilder hashAggregationBuilder;
private final List<Type> groupByTypes; private final List<Type> groupByTypes;
private final LocalMemoryContext systemMemoryContext; private final LocalMemoryContext systemMemoryContext;
Expand All @@ -55,7 +56,7 @@ public MergingHashAggregationBuilder(
List<Type> groupByTypes, List<Type> groupByTypes,
Optional<Integer> hashChannel, Optional<Integer> hashChannel,
OperatorContext operatorContext, OperatorContext operatorContext,
Iterator<Page> sortedPages, WorkProcessor<Page> sortedPages,
LocalMemoryContext systemMemoryContext, LocalMemoryContext systemMemoryContext,
long memoryLimitForMerge, long memoryLimitForMerge,
int overwriteIntermediateChannelOffset, int overwriteIntermediateChannelOffset,
Expand All @@ -82,40 +83,44 @@ public MergingHashAggregationBuilder(
rebuildHashAggregationBuilder(); rebuildHashAggregationBuilder();
} }


public Iterator<Page> buildResult() public WorkProcessor<Page> buildResult()
{ {
return new Iterator<Page>() return sortedPages.flatTransform(new Transformation<Page, WorkProcessor<Page>>()
{ {
private Iterator<Page> resultPages = Collections.emptyIterator(); boolean reset = true;
long memorySize;


@Override public ProcessorState<WorkProcessor<Page>> process(Optional<Page> inputPageOptional)
public boolean hasNext()
{ {
return sortedPages.hasNext() || resultPages.hasNext(); if (reset) {
}

@Override
public Page next()
{
if (!resultPages.hasNext()) {
rebuildHashAggregationBuilder(); rebuildHashAggregationBuilder();
long memorySize = 0; // ensure that at least one merged page will be processed memorySize = 0;

reset = false;
// we can produce output after every page, because sortedPages does not have }
// hash values that span multiple pages (guaranteed by MergeHashSort)
while (sortedPages.hasNext() && !shouldProduceOutput(memorySize)) { boolean finished = !inputPageOptional.isPresent();
boolean done = hashAggregationBuilder.processPage(sortedPages.next()).process(); if (finished && memorySize == 0) {
// TODO: this class does not yield wrt memory limit; enable it // no more pages and aggregation builder is empty
verify(done); return ProcessorState.finished();
memorySize = hashAggregationBuilder.getSizeInMemory(); }
systemMemoryContext.setBytes(memorySize);
if (!finished) {
Page inputPage = inputPageOptional.get();
boolean done = hashAggregationBuilder.processPage(inputPage).process();
// TODO: this class does not yield wrt memory limit; enable it
verify(done);
memorySize = hashAggregationBuilder.getSizeInMemory();
systemMemoryContext.setBytes(memorySize);

if (!shouldProduceOutput(memorySize)) {
return ProcessorState.needsMoreData();
} }
resultPages = hashAggregationBuilder.buildResult();
} }


return resultPages.next(); reset = true;
return ProcessorState.ofResult(hashAggregationBuilder.buildResult());
} }
}; });
} }


@Override @Override
Expand Down

0 comments on commit 06719a2

Please sign in to comment.