Skip to content

Commit

Permalink
Add WorkProcessorOperatorAdapter and use it in TopNOperatorFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Jun 3, 2019
1 parent 1dc8599 commit 005b7b7
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 326 deletions.
114 changes: 72 additions & 42 deletions presto-main/src/main/java/io/prestosql/operator/TopNOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
import com.google.common.collect.ImmutableList;
import io.prestosql.Session;
import io.prestosql.memory.context.MemoryTrackingContext;
import io.prestosql.operator.WorkProcessor.TransformationState;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.prestosql.operator.WorkProcessorOperatorAdapter.PageBuffer;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.SortOrder;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
Expand All @@ -30,10 +34,10 @@
* Returns the top N rows from the source sorted according to the specified ordering in the keyChannelIndex channel.
*/
public class TopNOperator
implements Operator
implements WorkProcessorOperator
{
public static class TopNOperatorFactory
implements OperatorFactory, WorkProcessorOperatorFactory
implements OperatorFactory, AdapterWorkProcessorOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
Expand Down Expand Up @@ -82,12 +86,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, getOperatorType());
return new TopNOperator(
operatorContext,
sourceTypes,
n,
sortChannels,
sortOrders);
return new WorkProcessorOperatorAdapter(operatorContext, this);
}

@Override
Expand All @@ -109,72 +108,103 @@ public WorkProcessorOperator create(
DriverYieldSignal yieldSignal,
WorkProcessor<Page> sourcePages)
{
return new TopNWorkProcessorOperator(memoryTrackingContext, sourcePages, sourceTypes, n, sortChannels, sortOrders);
return new TopNOperator(
memoryTrackingContext,
sourcePages,
sourceTypes,
n,
sortChannels,
sortOrders,
Optional.empty());
}

@Override
public WorkProcessorOperator create(
Session session,
MemoryTrackingContext memoryTrackingContext,
DriverYieldSignal yieldSignal,
PageBuffer sourcePageBuffer)
{
return new TopNOperator(
memoryTrackingContext,
sourcePageBuffer.pages(),
sourceTypes,
n,
sortChannels,
sortOrders,
Optional.of(sourcePageBuffer));
}
}

private final OperatorContext operatorContext;
private final TopNProcessor topNProcessor;
private boolean finishing;
private final WorkProcessor<Page> pages;

public TopNOperator(
OperatorContext operatorContext,
MemoryTrackingContext memoryTrackingContext,
WorkProcessor<Page> sourcePages,
List<Type> types,
int n,
List<Integer> sortChannels,
List<SortOrder> sortOrders)
List<SortOrder> sortOrders,
Optional<PageBuffer> pageBuffer)
{
this.operatorContext = operatorContext;
this.topNProcessor = new TopNProcessor(
requireNonNull(operatorContext, "operatorContext is null").aggregateUserMemoryContext(),
requireNonNull(memoryTrackingContext, "memoryTrackingContext is null").aggregateUserMemoryContext(),
types,
n,
sortChannels,
sortOrders);

if (n == 0) {
finishing = true;
pages = WorkProcessor.of();
}
else {
pages = sourcePages.transform(new TopNPages());
}
}

@Override
public OperatorContext getOperatorContext()
{
return operatorContext;
pageBuffer.ifPresent(buffer -> buffer.setAddPageListener(() -> {
addPage(buffer.poll());
}));
}

@Override
public void finish()
public WorkProcessor<Page> getOutputPages()
{
finishing = true;
return pages;
}

@Override
public boolean isFinished()
{
return finishing && topNProcessor.noMoreOutput();
}
public void close()
throws Exception
{}

@Override
public boolean needsInput()
private void addPage(Page page)
{
return !finishing && !topNProcessor.noMoreOutput();
}

@Override
public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
topNProcessor.addInput(page);
}

@Override
public Page getOutput()
private class TopNPages
implements WorkProcessor.Transformation<Page, Page>
{
if (!finishing || topNProcessor.noMoreOutput()) {
return null;
@Override
public TransformationState<Page> process(Page inputPage)
{
if (inputPage != null) {
addPage(inputPage);
return TransformationState.needsMoreData();
}

// no more input, return results
Page page = null;
while (page == null && !topNProcessor.noMoreOutput()) {
page = topNProcessor.getOutput();
}

if (page != null) {
return TransformationState.ofResult(page, false);
}

return TransformationState.finished();
}

return topNProcessor.getOutput();
}
}

This file was deleted.

Loading

0 comments on commit 005b7b7

Please sign in to comment.