Skip to content

Commit

Permalink
Create a new PageProcessor instance for each operator
Browse files Browse the repository at this point in the history
PageProcessor was created once and used by all operators created by the
OperatorFactory. 22e78f5 introduced state in PageProcessor, so we
cannot reuse PageProcessors between operators. Change it so that the
operator factory gets a supplier for PageProcessor which creates a new
PageProcessor for every operator.
  • Loading branch information
nileema committed Jan 26, 2016
1 parent e42e1b9 commit 35534e4
Show file tree
Hide file tree
Showing 17 changed files with 46 additions and 38 deletions.
Expand Up @@ -57,7 +57,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
// and quantity < 24;
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "extendedprice", "discount", "shipdate", "quantity");

FilterAndProjectOperator.FilterAndProjectOperatorFactory tpchQuery6Operator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(1, new PlanNodeId("test"), new TpchQuery6Processor(), ImmutableList.<Type>of(DOUBLE));
FilterAndProjectOperator.FilterAndProjectOperatorFactory tpchQuery6Operator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(1, new PlanNodeId("test"), () -> new TpchQuery6Processor(), ImmutableList.<Type>of(DOUBLE));

AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(
2,
Expand Down
Expand Up @@ -45,7 +45,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
FilterAndProjectOperator.FilterAndProjectOperatorFactory filterAndProjectOperator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
1,
new PlanNodeId("test"),
new GenericPageProcessor(new DoubleFilter(50000.00), ImmutableList.of(singleColumn(DOUBLE, 0))),
() -> new GenericPageProcessor(new DoubleFilter(50000.00), ImmutableList.of(singleColumn(DOUBLE, 0))),
ImmutableList.<Type>of(DOUBLE));

return ImmutableList.of(tableScanOperator, filterAndProjectOperator);
Expand Down
Expand Up @@ -420,7 +420,7 @@ public SourceOperator newScanFilterAndProjectOperator(DriverContext driverContex
new PlanNodeId("0"),
(session, split, columnHandles) -> pageSource,
new GenericCursorProcessor(FilterFunctions.TRUE_FUNCTION, projections),
new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, projections),
() -> new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, projections),
columns.stream().map(columnHandle -> (ColumnHandle) columnHandle).collect(toList()),
types
);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.function.Supplier;

import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingDictionaryEnabled;
import static com.facebook.presto.SystemSessionProperties.isColumnarProcessingEnabled;
Expand Down Expand Up @@ -130,11 +131,11 @@ public static class FilterAndProjectOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
private final PageProcessor processor;
private final Supplier<PageProcessor> processor;
private final List<Type> types;
private boolean closed;

public FilterAndProjectOperatorFactory(int operatorId, PlanNodeId planNodeId, PageProcessor processor, List<Type> types)
public FilterAndProjectOperatorFactory(int operatorId, PlanNodeId planNodeId, Supplier<PageProcessor> processor, List<Type> types)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -153,7 +154,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, FilterAndProjectOperator.class.getSimpleName());
return new FilterAndProjectOperator(operatorContext, types, processor);
return new FilterAndProjectOperator(operatorContext, types, processor.get());
}

@Override
Expand Down
Expand Up @@ -300,7 +300,7 @@ public static class ScanFilterAndProjectOperatorFactory
private final int operatorId;
private final PlanNodeId planNodeId;
private final CursorProcessor cursorProcessor;
private final PageProcessor pageProcessor;
private final Supplier<PageProcessor> pageProcessor;
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final List<ColumnHandle> columns;
Expand All @@ -313,7 +313,7 @@ public ScanFilterAndProjectOperatorFactory(
PlanNodeId sourceId,
PageSourceProvider pageSourceProvider,
CursorProcessor cursorProcessor,
PageProcessor pageProcessor,
Supplier<PageProcessor> pageProcessor,
Iterable<ColumnHandle> columns,
List<Type> types)
{
Expand Down Expand Up @@ -349,7 +349,7 @@ public SourceOperator createOperator(DriverContext driverContext)
sourceId,
pageSourceProvider,
cursorProcessor,
pageProcessor,
pageProcessor.get(),
columns,
types);
}
Expand Down
Expand Up @@ -14,13 +14,15 @@
package com.facebook.presto.operator.index;

import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.PageProcessor;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.function.Supplier;

import static com.facebook.presto.operator.FilterAndProjectOperator.FilterAndProjectOperatorFactory;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -54,7 +56,7 @@ public DynamicTupleFilterFactory(int filterOperatorId, PlanNodeId planNodeId, in
public OperatorFactory filterWithTuple(Page tuplePage)
{
Page normalizedTuplePage = normalizeTuplePage(tuplePage);
TupleFilterProcessor processor = new TupleFilterProcessor(normalizedTuplePage, outputTypes, outputFilterChannels);
Supplier<PageProcessor> processor = () -> new TupleFilterProcessor(normalizedTuplePage, outputTypes, outputFilterChannels);
return new FilterAndProjectOperatorFactory(filterOperatorId, planNodeId, processor, outputTypes);
}

Expand Down
Expand Up @@ -31,6 +31,7 @@

import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

import static com.facebook.presto.bytecode.Access.FINAL;
import static com.facebook.presto.bytecode.Access.PUBLIC;
Expand Down Expand Up @@ -91,15 +92,17 @@ public CursorProcessor compileCursorProcessor(RowExpression filter, List<RowExpr
}
}

public PageProcessor compilePageProcessor(RowExpression filter, List<RowExpression> projections)
public Supplier<PageProcessor> compilePageProcessor(RowExpression filter, List<RowExpression> projections)
{
try {
return pageProcessors.getUnchecked(new CacheKey(filter, projections, null))
.newInstance();
}
catch (ReflectiveOperationException e) {
throw Throwables.propagate(e);
}
Class<? extends PageProcessor> pageProcessor = pageProcessors.getUnchecked(new CacheKey(filter, projections, null));
return () -> {
try {
return pageProcessor.newInstance();
}
catch (ReflectiveOperationException e) {
throw Throwables.propagate(e);
}
};
}

private <T> Class<? extends T> compile(RowExpression filter, List<RowExpression> projections, BodyCompiler<T> bodyCompiler, Class<? extends T> superType)
Expand Down
Expand Up @@ -162,6 +162,7 @@
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.getTaskAggregationConcurrency;
Expand Down Expand Up @@ -1025,7 +1026,7 @@ private PhysicalOperation visitScanFilterAndProject(
try {
if (columns != null) {
CursorProcessor cursorProcessor = compiler.compileCursorProcessor(translatedFilter, translatedProjections, sourceNode.getId());
PageProcessor pageProcessor = compiler.compilePageProcessor(translatedFilter, translatedProjections);
Supplier<PageProcessor> pageProcessor = compiler.compilePageProcessor(translatedFilter, translatedProjections);

SourceOperatorFactory operatorFactory = new ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory(
context.getNextOperatorId(),
Expand All @@ -1040,7 +1041,7 @@ private PhysicalOperation visitScanFilterAndProject(
return new PhysicalOperation(operatorFactory, outputMappings);
}
else {
PageProcessor processor = compiler.compilePageProcessor(translatedFilter, translatedProjections);
Supplier<PageProcessor> processor = compiler.compilePageProcessor(translatedFilter, translatedProjections);

OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
context.getNextOperatorId(),
Expand Down Expand Up @@ -1097,7 +1098,7 @@ private PhysicalOperation visitScanFilterAndProject(
sourceNode.getId(),
pageSourceProvider,
new GenericCursorProcessor(filterFunction, projectionFunctions),
new GenericPageProcessor(filterFunction, projectionFunctions),
() -> new GenericPageProcessor(filterFunction, projectionFunctions),
columns,
toTypes(projectionFunctions));

Expand All @@ -1107,7 +1108,7 @@ private PhysicalOperation visitScanFilterAndProject(
OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
context.getNextOperatorId(),
planNodeId,
new GenericPageProcessor(filterFunction, projectionFunctions),
() -> new GenericPageProcessor(filterFunction, projectionFunctions),
toTypes(projectionFunctions));
return new PhysicalOperation(operatorFactory, outputMappings, source);
}
Expand Down
Expand Up @@ -676,7 +676,7 @@ public OperatorFactory createHashProjectOperator(int operatorId, PlanNodeId plan
return new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
operatorId,
planNodeId,
new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, projectionFunctions.build()),
() -> new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, projectionFunctions.build()),
ImmutableList.copyOf(Iterables.concat(columnTypes, ImmutableList.of(BIGINT))));
}

Expand Down
Expand Up @@ -89,7 +89,7 @@ public boolean filter(RecordCursor cursor)
OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
0,
new PlanNodeId("test"),
new GenericPageProcessor(filter, ImmutableList.of(singleColumn(VARCHAR, 0), new Add5Projection(1))),
() -> new GenericPageProcessor(filter, ImmutableList.of(singleColumn(VARCHAR, 0), new Add5Projection(1))),
ImmutableList.<Type>of(VARCHAR, BIGINT));

Operator operator = operatorFactory.createOperator(driverContext);
Expand Down
Expand Up @@ -72,7 +72,7 @@ public ConnectorPageSource createPageSource(Session session, Split split, List<C
}
},
new GenericCursorProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
() -> new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
ImmutableList.<ColumnHandle>of(),
ImmutableList.<Type>of(VARCHAR));

Expand Down Expand Up @@ -106,7 +106,7 @@ public ConnectorPageSource createPageSource(Session session, Split split, List<C
}
},
new GenericCursorProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
() -> new GenericPageProcessor(FilterFunctions.TRUE_FUNCTION, ImmutableList.of(singleColumn(VARCHAR, 0))),
ImmutableList.<ColumnHandle>of(),
ImmutableList.<Type>of(VARCHAR));

Expand Down
Expand Up @@ -123,7 +123,7 @@ public void setup()
}

ImmutableList<RowExpression> projections = projectionsBuilder.build();
pageProcessor = compiler.compilePageProcessor(new ConstantExpression(true, BooleanType.BOOLEAN), projections);
pageProcessor = compiler.compilePageProcessor(new ConstantExpression(true, BooleanType.BOOLEAN), projections).get();
pageBuilder = new PageBuilder(projections.stream().map(RowExpression::getType).collect(Collectors.toList()));
page = new Page(blocks);
}
Expand Down
Expand Up @@ -81,6 +81,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.block.BlockAssertions.createBooleansBlock;
Expand Down Expand Up @@ -543,7 +544,7 @@ private Operator interpretedFilterProject(Expression filter, Expression projecti
session
);

OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(0, new PlanNodeId("test"), new GenericPageProcessor(filterFunction, ImmutableList.of(projectionFunction)), toTypes(
OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(0, new PlanNodeId("test"), () -> new GenericPageProcessor(filterFunction, ImmutableList.of(projectionFunction)), toTypes(
ImmutableList.of(projectionFunction)));
return operatorFactory.createOperator(createDriverContext(session));
}
Expand All @@ -555,7 +556,7 @@ private OperatorFactory compileFilterWithNoInputColumns(Expression filter, Expre
IdentityHashMap<Expression, Type> expressionTypes = getExpressionTypesFromInput(TEST_SESSION, metadata, SQL_PARSER, INPUT_TYPES, ImmutableList.of(filter));

try {
PageProcessor processor = compiler.compilePageProcessor(toRowExpression(filter, expressionTypes), ImmutableList.of());
Supplier<PageProcessor> processor = compiler.compilePageProcessor(toRowExpression(filter, expressionTypes), ImmutableList.of());

return new FilterAndProjectOperator.FilterAndProjectOperatorFactory(0, new PlanNodeId("test"), processor, ImmutableList.<Type>of());
}
Expand All @@ -577,7 +578,7 @@ private OperatorFactory compileFilterProject(Expression filter, Expression proje

try {
List<RowExpression> projections = ImmutableList.of(toRowExpression(projection, expressionTypes));
PageProcessor processor = compiler.compilePageProcessor(toRowExpression(filter, expressionTypes), projections);
Supplier<PageProcessor> processor = compiler.compilePageProcessor(toRowExpression(filter, expressionTypes), projections);

return new FilterAndProjectOperator.FilterAndProjectOperatorFactory(0, new PlanNodeId("test"), processor, ImmutableList.of(expressionTypes.get(projection)));
}
Expand All @@ -603,7 +604,7 @@ private SourceOperatorFactory compileScanFilterProject(Expression filter, Expres
ImmutableList.of(toRowExpression(projection, expressionTypes)),
SOURCE_ID);

PageProcessor pageProcessor = compiler.compilePageProcessor(
Supplier<PageProcessor> pageProcessor = compiler.compilePageProcessor(
toRowExpression(filter, expressionTypes),
ImmutableList.of(toRowExpression(projection, expressionTypes)));

Expand Down
Expand Up @@ -58,8 +58,8 @@ public void testNoCaching()
projectionsBuilder.add(new CallExpression(signature, arrayType, ImmutableList.of(new InputReferenceExpression(0, arrayType), new InputReferenceExpression(1, arrayType))));

ImmutableList<RowExpression> projections = projectionsBuilder.build();
PageProcessor pageProcessor = compiler.compilePageProcessor(new ConstantExpression(true, BOOLEAN), projections);
PageProcessor pageProcessor2 = compiler.compilePageProcessor(new ConstantExpression(true, BOOLEAN), projections);
PageProcessor pageProcessor = compiler.compilePageProcessor(new ConstantExpression(true, BOOLEAN), projections).get();
PageProcessor pageProcessor2 = compiler.compilePageProcessor(new ConstantExpression(true, BOOLEAN), projections).get();
assertTrue(pageProcessor != pageProcessor2);
}

Expand All @@ -68,7 +68,7 @@ public void testSanityRLE()
throws Exception
{
PageProcessor processor = new ExpressionCompiler(createTestMetadataManager())
.compilePageProcessor(new ConstantExpression(TRUE, BOOLEAN), ImmutableList.of(new InputReferenceExpression(0, BIGINT), new InputReferenceExpression(1, VARCHAR)));
.compilePageProcessor(new ConstantExpression(TRUE, BOOLEAN), ImmutableList.of(new InputReferenceExpression(0, BIGINT), new InputReferenceExpression(1, VARCHAR))).get();

Slice varcharValue = Slices.utf8Slice("hello");
Page page = new Page(RunLengthEncodedBlock.create(BIGINT, 123L, 100), RunLengthEncodedBlock.create(VARCHAR, varcharValue, 100));
Expand All @@ -90,7 +90,7 @@ public void testSanityColumnarDictionary()
throws Exception
{
PageProcessor processor = new ExpressionCompiler(createTestMetadataManager())
.compilePageProcessor(new ConstantExpression(TRUE, BOOLEAN), ImmutableList.of(new InputReferenceExpression(0, VARCHAR)));
.compilePageProcessor(new ConstantExpression(TRUE, BOOLEAN), ImmutableList.of(new InputReferenceExpression(0, VARCHAR))).get();

Page page = new Page(createDictionaryBlock(createExpectedValues(10), 100));
Page outputPage = processor.processColumnarDictionary(null, page, ImmutableList.of(VARCHAR));
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void setup()

handCodedProcessor = new Tpch1FilterAndProject();

compiledProcessor = new ExpressionCompiler(MetadataManager.createTestMetadataManager()).compilePageProcessor(FILTER, ImmutableList.of(PROJECT));
compiledProcessor = new ExpressionCompiler(MetadataManager.createTestMetadataManager()).compilePageProcessor(FILTER, ImmutableList.of(PROJECT)).get();
}

@Benchmark
Expand Down
Expand Up @@ -127,7 +127,7 @@ public void setup()
BOOLEAN,
arguments);

processor = new ExpressionCompiler(MetadataManager.createTestMetadataManager()).compilePageProcessor(filter, ImmutableList.of(project));
processor = new ExpressionCompiler(MetadataManager.createTestMetadataManager()).compilePageProcessor(filter, ImmutableList.of(project)).get();
}

@Benchmark
Expand Down
Expand Up @@ -108,7 +108,7 @@ public void setup()
types = projections.stream().map(RowExpression::getType).collect(toList());

inputPage = createPage(types, dictionaryBlocks);
processor = new ExpressionCompiler(createTestMetadataManager()).compilePageProcessor(getFilter(type), projections);
processor = new ExpressionCompiler(createTestMetadataManager()).compilePageProcessor(getFilter(type), projections).get();
}

@Benchmark
Expand Down

0 comments on commit 35534e4

Please sign in to comment.