From 4df9290256f45b34ae5216eaa14262c2e4335332 Mon Sep 17 00:00:00 2001 From: Nezih Yigitbasi Date: Fri, 7 Jul 2017 15:57:45 -0700 Subject: [PATCH] Add stage id/plan node id to generated page projection class name --- .../presto/benchmark/HandTpchQuery6.java | 2 +- .../presto/execution/SqlTaskExecution.java | 2 +- .../index/DynamicTupleFilterFactory.java | 2 +- .../presto/sql/gen/ExpressionCompiler.java | 9 ++++- .../presto/sql/gen/PageFunctionCompiler.java | 20 ++++++++-- .../sql/planner/LocalExecutionPlanner.java | 40 +++++++++++-------- .../presto/testing/LocalQueryRunner.java | 2 +- .../sql/gen/TestPageFunctionCompiler.java | 23 ++++++++++- 8 files changed, 73 insertions(+), 27 deletions(-) diff --git a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java index af4d670fe53f..1ad49e024879 100644 --- a/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java +++ b/presto-benchmark/src/main/java/com/facebook/presto/benchmark/HandTpchQuery6.java @@ -69,7 +69,7 @@ protected List createOperatorFactories() // and quantity < 24; OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "extendedprice", "discount", "shipdate", "quantity"); - Supplier projection = new PageFunctionCompiler(localQueryRunner.getMetadata()).compileProjection(field(0, BIGINT)); + Supplier projection = new PageFunctionCompiler(localQueryRunner.getMetadata()).compileProjection(field(0, BIGINT), Optional.empty()); FilterAndProjectOperator.FilterAndProjectOperatorFactory tpchQuery6Operator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory( 1, diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java index 4974d5d4c284..948d5e5a0421 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java @@ -160,7 +160,7 @@ private SqlTaskExecution( List driverFactories; try { LocalExecutionPlan localExecutionPlan = planner.plan( - taskContext.getSession(), + taskContext, fragment.getRoot(), fragment.getSymbols(), fragment.getPartitioningScheme(), diff --git a/presto-main/src/main/java/com/facebook/presto/operator/index/DynamicTupleFilterFactory.java b/presto-main/src/main/java/com/facebook/presto/operator/index/DynamicTupleFilterFactory.java index da90b6a035ba..a4d32712404d 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/index/DynamicTupleFilterFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/index/DynamicTupleFilterFactory.java @@ -72,7 +72,7 @@ public DynamicTupleFilterFactory(int filterOperatorId, PlanNodeId planNodeId, in this.outputTypes = ImmutableList.copyOf(outputTypes); PageFunctionCompiler pageFunctionCompiler = new PageFunctionCompiler(metadata); this.outputProjections = IntStream.range(0, outputTypes.size()) - .mapToObj(field -> pageFunctionCompiler.compileProjection(Expressions.field(field, outputTypes.get(field)))) + .mapToObj(field -> pageFunctionCompiler.compileProjection(Expressions.field(field, outputTypes.get(field)), Optional.empty())) .collect(toImmutableList()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/gen/ExpressionCompiler.java b/presto-main/src/main/java/com/facebook/presto/sql/gen/ExpressionCompiler.java index 89fdc8c22461..9c350a94e92a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/gen/ExpressionCompiler.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/gen/ExpressionCompiler.java @@ -97,12 +97,12 @@ public Supplier compileCursorProcessor(Optional }; } - public Supplier compilePageProcessor(Optional filter, List projections) + public Supplier compilePageProcessor(Optional filter, List projections, Optional classNameSuffix) { PageFunctionCompiler pageFunctionCompiler = new PageFunctionCompiler(metadata); Optional> filterFunctionSupplier = filter.map(pageFunctionCompiler::compileFilter); List> pageProjectionSuppliers = projections.stream() - .map(pageFunctionCompiler::compileProjection) + .map(projection -> pageFunctionCompiler.compileProjection(projection, classNameSuffix)) .collect(toImmutableList()); return () -> { @@ -114,6 +114,11 @@ public Supplier compilePageProcessor(Optional filt }; } + public Supplier compilePageProcessor(Optional filter, List projections) + { + return compilePageProcessor(filter, projections, Optional.empty()); + } + private Class compile(Optional filter, List projections, BodyCompiler bodyCompiler, Class superType) { // create filter and project page iterator class diff --git a/presto-main/src/main/java/com/facebook/presto/sql/gen/PageFunctionCompiler.java b/presto-main/src/main/java/com/facebook/presto/sql/gen/PageFunctionCompiler.java index bdb876e77d52..418d8fdc5849 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/gen/PageFunctionCompiler.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/gen/PageFunctionCompiler.java @@ -19,6 +19,7 @@ import com.facebook.presto.bytecode.FieldDefinition; import com.facebook.presto.bytecode.MethodDefinition; import com.facebook.presto.bytecode.Parameter; +import com.facebook.presto.bytecode.ParameterizedType; import com.facebook.presto.bytecode.Scope; import com.facebook.presto.bytecode.Variable; import com.facebook.presto.bytecode.control.ForLoop; @@ -58,6 +59,7 @@ import javax.inject.Inject; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.function.Consumer; @@ -104,7 +106,7 @@ public PageFunctionCompiler(Metadata metadata) this.determinismEvaluator = new DeterminismEvaluator(metadata.getFunctionRegistry()); } - public Supplier compileProjection(RowExpression projection) + public Supplier compileProjection(RowExpression projection, Optional classNameSuffix) { requireNonNull(projection, "projection is null"); @@ -123,7 +125,7 @@ public Supplier compileProjection(RowExpression projection) PageFieldsToInputParametersRewriter.Result result = rewritePageFieldsToInputParameters(projection); CallSiteBinder callSiteBinder = new CallSiteBinder(); - ClassDefinition classDefinition = defineProjectionClass(result.getRewrittenExpression(), result.getInputChannels(), callSiteBinder); + ClassDefinition classDefinition = defineProjectionClass(result.getRewrittenExpression(), result.getInputChannels(), callSiteBinder, classNameSuffix); Class projectionClass; try { @@ -143,11 +145,21 @@ public Supplier compileProjection(RowExpression projection) }; } - private ClassDefinition defineProjectionClass(RowExpression projection, InputChannels inputChannels, CallSiteBinder callSiteBinder) + private ParameterizedType generateProjectionClassName(Optional classNameSuffix) + { + StringBuilder className = new StringBuilder(PageProjection.class.getSimpleName()); + classNameSuffix.ifPresent(suffix -> className.append("_").append(suffix.replace('.', '_'))); + return makeClassName(className.toString()); + } + + private ClassDefinition defineProjectionClass(RowExpression projection, + InputChannels inputChannels, + CallSiteBinder callSiteBinder, + Optional classNameSuffix) { ClassDefinition classDefinition = new ClassDefinition( a(PUBLIC, FINAL), - makeClassName(PageProjection.class.getSimpleName()), + generateProjectionClassName(classNameSuffix), type(Object.class), type(PageProjection.class)); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 1c94317f6899..5a9b4c283193 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -17,6 +17,7 @@ import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.cost.CostCalculator; import com.facebook.presto.execution.QueryPerformanceFetcher; +import com.facebook.presto.execution.StageId; import com.facebook.presto.execution.TaskManagerConfig; import com.facebook.presto.execution.buffer.OutputBuffer; import com.facebook.presto.execution.buffer.PagesSerdeFactory; @@ -57,6 +58,7 @@ import com.facebook.presto.operator.SetBuilderOperator.SetSupplier; import com.facebook.presto.operator.SourceOperatorFactory; import com.facebook.presto.operator.TableScanOperator.TableScanOperatorFactory; +import com.facebook.presto.operator.TaskContext; import com.facebook.presto.operator.TaskOutputOperator.TaskOutputFactory; import com.facebook.presto.operator.TopNOperator.TopNOperatorFactory; import com.facebook.presto.operator.TopNRowNumberOperator; @@ -296,7 +298,7 @@ public LocalExecutionPlanner( } public LocalExecutionPlan plan( - Session session, + TaskContext taskContext, PlanNode plan, Map types, PartitioningScheme partitioningScheme, @@ -307,7 +309,7 @@ public LocalExecutionPlan plan( partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) || partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) { - return plan(session, plan, outputLayout, types, new TaskOutputFactory(outputBuffer)); + return plan(taskContext, plan, outputLayout, types, new TaskOutputFactory(outputBuffer)); } // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed @@ -342,7 +344,7 @@ public LocalExecutionPlan plan( .collect(toImmutableList()); } - PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(session, partitioningScheme, partitionChannelTypes); + PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes); OptionalInt nullChannel = OptionalInt.empty(); Set partitioningColumns = partitioningScheme.getPartitioning().getColumns(); @@ -353,7 +355,7 @@ public LocalExecutionPlan plan( } return plan( - session, + taskContext, plan, outputLayout, types, @@ -367,13 +369,14 @@ public LocalExecutionPlan plan( maxPagePartitioningBufferSize)); } - public LocalExecutionPlan plan(Session session, + public LocalExecutionPlan plan(TaskContext taskContext, PlanNode plan, List outputLayout, Map types, OutputFactory outputOperatorFactory) { - LocalExecutionPlanContext context = new LocalExecutionPlanContext(session, types); + Session session = taskContext.getSession(); + LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, types); PhysicalOperation physicalOperation = plan.accept(new Visitor(session), context); @@ -441,7 +444,7 @@ private static void addLookupOuterDrivers(LocalExecutionPlanContext context) private static class LocalExecutionPlanContext { - private final Session session; + private final TaskContext taskContext; private final Map types; private final List driverFactories; private final Optional indexSourceContext; @@ -453,19 +456,19 @@ private static class LocalExecutionPlanContext private boolean inputDriver = true; private OptionalInt driverInstanceCount = OptionalInt.empty(); - public LocalExecutionPlanContext(Session session, Map types) + public LocalExecutionPlanContext(TaskContext taskContext, Map types) { - this(session, types, new ArrayList<>(), Optional.empty(), new AtomicInteger(0)); + this(taskContext, types, new ArrayList<>(), Optional.empty(), new AtomicInteger(0)); } private LocalExecutionPlanContext( - Session session, + TaskContext taskContext, Map types, List driverFactories, Optional indexSourceContext, AtomicInteger nextPipelineId) { - this.session = session; + this.taskContext = taskContext; this.types = types; this.driverFactories = driverFactories; this.indexSourceContext = indexSourceContext; @@ -484,7 +487,12 @@ private List getDriverFactories() public Session getSession() { - return session; + return taskContext.getSession(); + } + + public StageId getStageId() + { + return taskContext.getTaskId().getStageId(); } public Map getTypes() @@ -520,12 +528,12 @@ private void setInputDriver(boolean inputDriver) public LocalExecutionPlanContext createSubContext() { checkState(!indexSourceContext.isPresent(), "index build plan can not have sub-contexts"); - return new LocalExecutionPlanContext(session, types, driverFactories, indexSourceContext, nextPipelineId); + return new LocalExecutionPlanContext(taskContext, types, driverFactories, indexSourceContext, nextPipelineId); } public LocalExecutionPlanContext createIndexSourceSubContext(IndexSourceContext indexSourceContext) { - return new LocalExecutionPlanContext(session, types, driverFactories, Optional.of(indexSourceContext), nextPipelineId); + return new LocalExecutionPlanContext(taskContext, types, driverFactories, Optional.of(indexSourceContext), nextPipelineId); } public OptionalInt getDriverInstanceCount() @@ -1070,7 +1078,7 @@ private PhysicalOperation visitScanFilterAndProject( try { if (columns != null) { Supplier cursorProcessor = expressionCompiler.compileCursorProcessor(translatedFilter, translatedProjections, sourceNode.getId()); - Supplier pageProcessor = expressionCompiler.compilePageProcessor(translatedFilter, translatedProjections); + Supplier pageProcessor = expressionCompiler.compilePageProcessor(translatedFilter, translatedProjections, Optional.of(context.getStageId() + "_" + planNodeId)); SourceOperatorFactory operatorFactory = new ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory( context.getNextOperatorId(), @@ -1085,7 +1093,7 @@ private PhysicalOperation visitScanFilterAndProject( return new PhysicalOperation(operatorFactory, outputMappings); } else { - Supplier pageProcessor = expressionCompiler.compilePageProcessor(translatedFilter, translatedProjections); + Supplier pageProcessor = expressionCompiler.compilePageProcessor(translatedFilter, translatedProjections, Optional.of(context.getStageId() + "_" + planNodeId)); OperatorFactory operatorFactory = new FilterAndProjectOperator.FilterAndProjectOperatorFactory( context.getNextOperatorId(), diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 300e072d46c1..3569deb4a097 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -597,7 +597,7 @@ public List createDrivers(Session session, @Language("SQL") String sql, // plan query LocalExecutionPlan localExecutionPlan = executionPlanner.plan( - session, + taskContext, subplan.getFragment().getRoot(), subplan.getFragment().getPartitioningScheme().getOutputLayout(), plan.getTypes(), diff --git a/presto-main/src/test/java/com/facebook/presto/sql/gen/TestPageFunctionCompiler.java b/presto-main/src/test/java/com/facebook/presto/sql/gen/TestPageFunctionCompiler.java index bf4874aee615..4fbaef2fded7 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/gen/TestPageFunctionCompiler.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/gen/TestPageFunctionCompiler.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import org.testng.annotations.Test; +import java.util.Optional; import java.util.function.Supplier; import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; @@ -35,6 +36,7 @@ import static com.facebook.presto.sql.relational.Expressions.field; import static com.facebook.presto.testing.TestingConnectorSession.SESSION; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class TestPageFunctionCompiler @@ -51,7 +53,7 @@ public void testFailureDoesNotCorruptFutureResults() field(0, BIGINT), constant(10L, BIGINT)); - Supplier projectionSupplier = functionCompiler.compileProjection(add10); + Supplier projectionSupplier = functionCompiler.compileProjection(add10, Optional.empty()); PageProjection projection = projectionSupplier.get(); // process good page and verify we got the expected number of result rows @@ -75,6 +77,25 @@ public void testFailureDoesNotCorruptFutureResults() assertEquals(goodPage.getPositionCount(), goodResult.getPositionCount()); } + @Test + public void testGeneratedClassName() + { + PageFunctionCompiler functionCompiler = new PageFunctionCompiler(createTestMetadataManager()); + RowExpression add10 = call( + Signature.internalOperator(ADD, BIGINT.getTypeSignature(), ImmutableList.of(BIGINT.getTypeSignature(), BIGINT.getTypeSignature())), + BIGINT, + field(0, BIGINT), + constant(10L, BIGINT)); + + String planNodeId = "7"; + String stageId = "20170707_223500_67496_zguwn.2"; + String classSuffix = stageId + "_" + planNodeId; + Supplier projectionSupplier = functionCompiler.compileProjection(add10, Optional.of(classSuffix)); + PageProjection projection = projectionSupplier.get(); + // class name should look like PageProjection_20170707_223500_67496_zguwn_2_7_XX + assertTrue(projection.getClass().getSimpleName().startsWith("PageProjection_" + stageId.replace('.', '_') + "_" + planNodeId)); + } + private static Page createLongBlockPage(long... values) { BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(values.length);