From 561d60cae86cd357ead45bd8e9d4fbc49bab020c Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Fri, 20 Jul 2018 15:53:36 -0400 Subject: [PATCH] Collect column statistics on table write: Planner --- .../sql/planner/LocalExecutionPlanner.java | 6 +- .../presto/sql/planner/LogicalPlanner.java | 109 ++++++++--- .../planner/StatisticsAggregationPlanner.java | 181 ++++++++++++++++++ .../rule/PushTableWriteThroughUnion.java | 70 +++++-- .../optimizations/BeginTableWrite.java | 11 +- .../PruneUnreferencedOutputs.java | 22 ++- .../planner/optimizations/SymbolMapper.java | 82 +++++++- .../UnaliasSymbolReferences.java | 39 +--- .../planner/plan/StatisticAggregations.java | 103 ++++++++++ .../plan/StatisticAggregationsDescriptor.java | 109 +++++++++++ .../sql/planner/plan/TableFinishNode.java | 30 ++- .../sql/planner/plan/TableWriterNode.java | 13 +- .../iterative/rule/test/PlanBuilder.java | 5 +- 13 files changed, 680 insertions(+), 100 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregations.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregationsDescriptor.java diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 953cebe715ab..dd5f62c26604 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -2146,10 +2146,8 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl inputChannels, session); - Map layout = ImmutableMap.builder() - .put(node.getOutputSymbols().get(0), 0) - .put(node.getOutputSymbols().get(1), 1) - .build(); + Map layout = IntStream.range(0, node.getOutputSymbols().size()).boxed() + .collect(toImmutableMap(i -> node.getOutputSymbols().get(i), i -> i)); return new PhysicalOperation(operatorFactory, layout, context, source); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index 9b585ec61fe7..906870401cb2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.statistics.TableStatisticsMetadata; import com.facebook.presto.spi.type.Type; import com.facebook.presto.sql.analyzer.Analysis; import com.facebook.presto.sql.analyzer.Field; @@ -30,6 +31,7 @@ import com.facebook.presto.sql.analyzer.RelationType; import com.facebook.presto.sql.analyzer.Scope; import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.StatisticsAggregationPlanner.TableStatisticAggregation; import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; import com.facebook.presto.sql.planner.plan.Assignments; import com.facebook.presto.sql.planner.plan.DeleteNode; @@ -38,6 +40,7 @@ import com.facebook.presto.sql.planner.plan.OutputNode; import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.ProjectNode; +import com.facebook.presto.sql.planner.plan.StatisticAggregations; import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.plan.ValuesNode; @@ -58,10 +61,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; @@ -73,7 +78,10 @@ import static com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import static com.facebook.presto.sql.planner.sanity.PlanSanityChecker.DISTRIBUTED_PLAN_SANITY_CHECKER; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Streams.zip; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -92,6 +100,7 @@ public enum Stage private final SymbolAllocator symbolAllocator = new SymbolAllocator(); private final Metadata metadata; private final SqlParser sqlParser; + private final StatisticsAggregationPlanner statisticsAggregationPlanner; public LogicalPlanner(Session session, List planOptimizers, @@ -122,6 +131,7 @@ public LogicalPlanner(Session session, this.idAllocator = idAllocator; this.metadata = metadata; this.sqlParser = sqlParser; + this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(symbolAllocator, metadata); } public Plan plan(Analysis analysis) @@ -216,12 +226,15 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) .map(ColumnMetadata::getName) .collect(toImmutableList()); + TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadata(session, destination.getCatalogName(), tableMetadata); + return createTableWriterPlan( analysis, plan, new CreateName(destination.getCatalogName(), tableMetadata, newTableLayout), columnNames, - newTableLayout); + newTableLayout, + statisticsMetadata); } private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) @@ -275,13 +288,16 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) plan = new RelationPlan(projectNode, scope, projectNode.getOutputSymbols()); Optional newTableLayout = metadata.getInsertLayout(session, insert.getTarget()); + String catalogName = insert.getTarget().getConnectorId().getCatalogName(); + TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadata(session, catalogName, tableMetadata.getMetadata()); return createTableWriterPlan( analysis, plan, new InsertReference(insert.getTarget()), visibleTableColumnNames, - newTableLayout); + newTableLayout, + statisticsMetadata); } private RelationPlan createTableWriterPlan( @@ -289,12 +305,9 @@ private RelationPlan createTableWriterPlan( RelationPlan plan, WriterTarget target, List columnNames, - Optional writeTableLayout) + Optional writeTableLayout, + TableStatisticsMetadata statisticsMetadata) { - List writerOutputs = ImmutableList.of( - symbolAllocator.newSymbol("partialrows", BIGINT), - symbolAllocator.newSymbol("fragment", VARBINARY)); - PlanNode source = plan.getRoot(); if (!analysis.isCreateTableAsSelectWithData()) { @@ -325,23 +338,69 @@ private RelationPlan createTableWriterPlan( outputLayout)); } - PlanNode writerNode = new TableWriterNode( - idAllocator.getNextId(), - source, - target, - symbols, - columnNames, - writerOutputs, - partitioningScheme); + List writerOutputs = ImmutableList.of( + symbolAllocator.newSymbol("partialrows", BIGINT), + symbolAllocator.newSymbol("fragment", VARBINARY)); + + List commitOutputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)); + + if (!statisticsMetadata.isEmpty()) { + verify(columnNames.size() == symbols.size(), "columnNames.size() != symbols.size(): %s and %s", columnNames, symbols); + Map columnToSymbolMap = zip(columnNames.stream(), symbols.stream(), SimpleImmutableEntry::new) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)); + + TableStatisticAggregation result = statisticsAggregationPlanner.createStatisticsAggregation(statisticsMetadata, columnToSymbolMap); + + StatisticAggregations.Parts aggregations = result.getAggregations().createPartialAggregations(symbolAllocator, metadata.getFunctionRegistry()); + + // partial aggregation is run within the TableWriteOperator to calculate the statistics for + // the data consumed by the TableWriteOperator + // final aggregation is run within the TableFinishOperator to summarize collected statistics + // by the partial aggregation from all of the writer nodes + StatisticAggregations partialAggregation = aggregations.getPartialAggregation(); + List writerOutputSymbols = ImmutableList.builder() + .addAll(writerOutputs) + .addAll(partialAggregation.getGroupingSymbols()) + .addAll(partialAggregation.getAggregations().keySet()) + .build(); + + PlanNode writerNode = new TableWriterNode( + idAllocator.getNextId(), + source, + target, + symbols, + columnNames, + writerOutputSymbols, + partitioningScheme, + Optional.of(partialAggregation)); + + TableFinishNode commitNode = new TableFinishNode( + idAllocator.getNextId(), + writerNode, + target, + commitOutputs, + Optional.of(aggregations.getFinalAggregation()), + Optional.of(result.getDescriptor())); + + return new RelationPlan(commitNode, analysis.getRootScope(), commitOutputs); + } - List outputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)); TableFinishNode commitNode = new TableFinishNode( idAllocator.getNextId(), - writerNode, + new TableWriterNode( + idAllocator.getNextId(), + source, + target, + symbols, + columnNames, + writerOutputs, + partitioningScheme, + Optional.empty()), target, - outputs); - - return new RelationPlan(commitNode, analysis.getRootScope(), outputs); + commitOutputs, + Optional.empty(), + Optional.empty()); + return new RelationPlan(commitNode, analysis.getRootScope(), commitOutputs); } private RelationPlan createDeletePlan(Analysis analysis, Delete node) @@ -350,7 +409,13 @@ private RelationPlan createDeletePlan(Analysis analysis, Delete node) .plan(node); List outputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)); - TableFinishNode commitNode = new TableFinishNode(idAllocator.getNextId(), deleteNode, deleteNode.getTarget(), outputs); + TableFinishNode commitNode = new TableFinishNode( + idAllocator.getNextId(), + deleteNode, + deleteNode.getTarget(), + outputs, + Optional.empty(), + Optional.empty()); return new RelationPlan(commitNode, analysis.getScope(node), commitNode.getOutputSymbols()); } @@ -413,7 +478,7 @@ private static List getOutputTableColumns(RelationPlan plan, Opt private static Map, Symbol> buildLambdaDeclarationToSymbolMap(Analysis analysis, SymbolAllocator symbolAllocator) { Map, Symbol> resultMap = new LinkedHashMap<>(); - for (Map.Entry, Type> entry : analysis.getTypes().entrySet()) { + for (Entry, Type> entry : analysis.getTypes().entrySet()) { if (!(entry.getKey().getNode() instanceof LambdaArgumentDeclaration)) { continue; } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java new file mode 100644 index 000000000000..c6a79c72f125 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java @@ -0,0 +1,181 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner; + +import com.facebook.presto.metadata.FunctionRegistry; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.Signature; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; +import com.facebook.presto.spi.statistics.TableStatisticType; +import com.facebook.presto.spi.statistics.TableStatisticsMetadata; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.sql.analyzer.TypeSignatureProvider; +import com.facebook.presto.sql.planner.plan.AggregationNode; +import com.facebook.presto.sql.planner.plan.StatisticAggregations; +import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor; +import com.facebook.presto.sql.tree.FunctionCall; +import com.facebook.presto.sql.tree.QualifiedName; +import com.facebook.presto.sql.tree.SymbolReference; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Objects.requireNonNull; + +public class StatisticsAggregationPlanner +{ + private final SymbolAllocator symbolAllocator; + private final Metadata metadata; + + public StatisticsAggregationPlanner(SymbolAllocator symbolAllocator, Metadata metadata) + { + this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMetadata statisticsMetadata, Map columnToSymbolMap) + { + StatisticAggregationsDescriptor.Builder descriptor = StatisticAggregationsDescriptor.builder(); + + List groupingColumns = statisticsMetadata.getGroupingColumns(); + List groupingSymbols = groupingColumns.stream() + .map(columnToSymbolMap::get) + .collect(toImmutableList()); + + for (int i = 0; i < groupingSymbols.size(); i++) { + descriptor.addGrouping(groupingSymbols.get(i), groupingColumns.get(i)); + } + + ImmutableMap.Builder aggregations = ImmutableMap.builder(); + FunctionRegistry functionRegistry = metadata.getFunctionRegistry(); + for (TableStatisticType type : statisticsMetadata.getTableStatistics()) { + if (type != ROW_COUNT) { + throw new PrestoException(NOT_SUPPORTED, "Table-wide statistic type not supported: " + type); + } + QualifiedName count = QualifiedName.of("count"); + AggregationNode.Aggregation aggregation = new AggregationNode.Aggregation( + new FunctionCall(count, ImmutableList.of()), + functionRegistry.resolveFunction(count, ImmutableList.of()), + Optional.empty()); + Symbol symbol = symbolAllocator.newSymbol("rowCount", BIGINT); + aggregations.put(symbol, aggregation); + descriptor.addTableStatistic(symbol, ROW_COUNT); + } + + for (ColumnStatisticMetadata columnStatisticMetadata : statisticsMetadata.getColumnStatistics()) { + String columnName = columnStatisticMetadata.getColumnName(); + ColumnStatisticType statisticType = columnStatisticMetadata.getStatisticType(); + Symbol inputSymbol = columnToSymbolMap.get(columnName); + verify(inputSymbol != null, "inputSymbol is null"); + Type inputType = symbolAllocator.getTypes().get(inputSymbol); + verify(inputType != null, "inputType is null for symbol: %s", inputSymbol); + ColumnStatisticsAggregation aggregation = createColumnAggregation(statisticType, inputSymbol, inputType); + Symbol symbol = symbolAllocator.newSymbol(statisticType + ":" + columnName, aggregation.getOutputType()); + aggregations.put(symbol, aggregation.getAggregation()); + descriptor.addColumnStatistic(symbol, columnStatisticMetadata); + } + + StatisticAggregations aggregation = new StatisticAggregations(aggregations.build(), groupingSymbols); + return new TableStatisticAggregation(aggregation, descriptor.build()); + } + + private ColumnStatisticsAggregation createColumnAggregation(ColumnStatisticType statisticType, Symbol input, Type inputType) + { + switch (statisticType) { + case MIN_VALUE: + return createAggregation(QualifiedName.of("min"), input.toSymbolReference(), inputType, inputType); + case MAX_VALUE: + return createAggregation(QualifiedName.of("max"), input.toSymbolReference(), inputType, inputType); + case NUMBER_OF_DISTINCT_VALUES: + return createAggregation(QualifiedName.of("approx_distinct"), input.toSymbolReference(), inputType, BIGINT); + case NUMBER_OF_NON_NULL_VALUES: + return createAggregation(QualifiedName.of("count"), input.toSymbolReference(), inputType, BIGINT); + case NUMBER_OF_TRUE_VALUES: + return createAggregation(QualifiedName.of("count_if"), input.toSymbolReference(), BOOLEAN, BIGINT); + default: + throw new IllegalArgumentException("Unsupported statistic type: " + statisticType); + } + } + + private ColumnStatisticsAggregation createAggregation(QualifiedName functionName, SymbolReference input, Type inputType, Type outputType) + { + Signature signature = metadata.getFunctionRegistry().resolveFunction(functionName, TypeSignatureProvider.fromTypes(ImmutableList.of(inputType))); + Type resolvedType = metadata.getType(getOnlyElement(signature.getArgumentTypes())); + verify(resolvedType.equals(inputType), "resolved function input type does not match the input type: %s != %s", resolvedType, inputType); + return new ColumnStatisticsAggregation( + new AggregationNode.Aggregation( + new FunctionCall(functionName, ImmutableList.of(input)), + signature, + Optional.empty()), + outputType); + } + + public static class TableStatisticAggregation + { + private final StatisticAggregations aggregations; + private final StatisticAggregationsDescriptor descriptor; + + private TableStatisticAggregation( + StatisticAggregations aggregations, + StatisticAggregationsDescriptor descriptor) + { + this.aggregations = requireNonNull(aggregations, "statisticAggregations is null"); + this.descriptor = requireNonNull(descriptor, "descriptor is null"); + } + + public StatisticAggregations getAggregations() + { + return aggregations; + } + + public StatisticAggregationsDescriptor getDescriptor() + { + return descriptor; + } + } + + public static class ColumnStatisticsAggregation + { + private final AggregationNode.Aggregation aggregation; + private final Type outputType; + + private ColumnStatisticsAggregation(AggregationNode.Aggregation aggregation, Type outputType) + { + this.aggregation = requireNonNull(aggregation, "aggregation is null"); + this.outputType = requireNonNull(outputType, "outputType is null"); + } + + public AggregationNode.Aggregation getAggregation() + { + return aggregation; + } + + public Type getOutputType() + { + return outputType; + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java index 07efa01d5b62..c10a7cf17b03 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java @@ -19,18 +19,24 @@ import com.facebook.presto.matching.Pattern; import com.facebook.presto.sql.planner.Symbol; import com.facebook.presto.sql.planner.iterative.Rule; +import com.facebook.presto.sql.planner.optimizations.SymbolMapper; import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.plan.UnionNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import static com.facebook.presto.SystemSessionProperties.isPushTableWriteThroughUnion; import static com.facebook.presto.matching.Capture.newCapture; import static com.facebook.presto.sql.planner.plan.Patterns.source; import static com.facebook.presto.sql.planner.plan.Patterns.tableWriterNode; import static com.facebook.presto.sql.planner.plan.Patterns.union; -import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; public class PushTableWriteThroughUnion implements Rule @@ -59,31 +65,57 @@ public boolean isEnabled(Session session) } @Override - public Result apply(TableWriterNode tableWriterNode, Captures captures, Context context) + public Result apply(TableWriterNode writerNode, Captures captures, Context context) { UnionNode unionNode = captures.get(CHILD); ImmutableList.Builder rewrittenSources = ImmutableList.builder(); - ImmutableListMultimap.Builder mappings = ImmutableListMultimap.builder(); - for (int i = 0; i < unionNode.getSources().size(); i++) { - int index = i; - ImmutableList.Builder newSymbols = ImmutableList.builder(); - for (Symbol outputSymbol : tableWriterNode.getOutputSymbols()) { + List> sourceMappings = new ArrayList<>(); + for (int source = 0; source < unionNode.getSources().size(); source++) { + rewrittenSources.add(rewriteSource(writerNode, unionNode, source, sourceMappings, context)); + } + + ImmutableListMultimap.Builder unionMappings = ImmutableListMultimap.builder(); + sourceMappings.forEach(mappings -> mappings.forEach(unionMappings::put)); + + return Result.ofPlanNode( + new UnionNode( + context.getIdAllocator().getNextId(), + rewrittenSources.build(), + unionMappings.build(), + ImmutableList.copyOf(unionMappings.build().keySet()))); + } + + private static TableWriterNode rewriteSource( + TableWriterNode writerNode, + UnionNode unionNode, + int source, + List> sourceMappings, + Context context) + { + Map inputMappings = getInputSymbolMapping(unionNode, source); + ImmutableMap.Builder mappings = ImmutableMap.builder(); + mappings.putAll(inputMappings); + ImmutableMap.Builder outputMappings = ImmutableMap.builder(); + for (Symbol outputSymbol : writerNode.getOutputSymbols()) { + if (inputMappings.containsKey(outputSymbol)) { + outputMappings.put(outputSymbol, inputMappings.get(outputSymbol)); + } + else { Symbol newSymbol = context.getSymbolAllocator().newSymbol(outputSymbol); - newSymbols.add(newSymbol); + outputMappings.put(outputSymbol, newSymbol); mappings.put(outputSymbol, newSymbol); } - rewrittenSources.add(new TableWriterNode( - context.getIdAllocator().getNextId(), - unionNode.getSources().get(index), - tableWriterNode.getTarget(), - tableWriterNode.getColumns().stream() - .map(column -> unionNode.getSymbolMapping().get(column).get(index)) - .collect(toImmutableList()), - tableWriterNode.getColumnNames(), - newSymbols.build(), - tableWriterNode.getPartitioningScheme())); } + sourceMappings.add(outputMappings.build()); + SymbolMapper symbolMapper = new SymbolMapper(mappings.build()); + return symbolMapper.map(writerNode, unionNode.getSources().get(source), context.getIdAllocator().getNextId()); + } - return Result.ofPlanNode(new UnionNode(context.getIdAllocator().getNextId(), rewrittenSources.build(), mappings.build(), ImmutableList.copyOf(mappings.build().keySet()))); + private static Map getInputSymbolMapping(UnionNode node, int source) + { + return node.getSymbolMapping() + .keySet() + .stream() + .collect(toImmutableMap(key -> key, key -> node.getSymbolMapping().get(key).get(source))); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/BeginTableWrite.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/BeginTableWrite.java index 6c33f99c024d..252c27293e28 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/BeginTableWrite.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/BeginTableWrite.java @@ -95,7 +95,8 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext c node.getColumns(), node.getColumnNames(), node.getOutputSymbols(), - node.getPartitioningScheme()); + node.getPartitioningScheme(), + node.getStatisticsAggregation()); } @Override @@ -121,7 +122,13 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext c context.get().addMaterializedHandle(originalTarget, newTarget); child = child.accept(this, context); - return new TableFinishNode(node.getId(), child, newTarget, node.getOutputSymbols()); + return new TableFinishNode( + node.getId(), + child, + newTarget, + node.getOutputSymbols(), + node.getStatisticsAggregation(), + node.getStatisticsAggregationDescriptor()); } public TableWriterNode.WriterTarget getTarget(PlanNode node) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index 20a78ce09a3c..e92a5dc8db4a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -48,6 +48,7 @@ import com.facebook.presto.sql.planner.plan.SetOperationNode; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; import com.facebook.presto.sql.planner.plan.SortNode; +import com.facebook.presto.sql.planner.plan.StatisticAggregations; import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableScanNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; @@ -607,12 +608,15 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext expectedInputs.addAll(SymbolsExtractor.extractUnique(aggregation.getCall()))); + } PlanNode source = context.rewrite(node.getSource(), expectedInputs.build()); - return new TableWriterNode( node.getId(), source, @@ -620,15 +624,21 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext> context) { - // Maintain the existing inputs needed for TableCommitNode PlanNode source = context.rewrite(node.getSource(), ImmutableSet.copyOf(node.getSource().getOutputSymbols())); - return new TableFinishNode(node.getId(), source, node.getTarget(), node.getOutputSymbols()); + return new TableFinishNode( + node.getId(), + source, + node.getTarget(), + node.getOutputSymbols(), + node.getStatisticsAggregation(), + node.getStatisticsAggregationDescriptor()); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 15c93db71b58..d8f4a17bde6b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -15,12 +15,16 @@ import com.facebook.presto.spi.block.SortOrder; import com.facebook.presto.sql.planner.OrderingScheme; +import com.facebook.presto.sql.planner.PartitioningScheme; import com.facebook.presto.sql.planner.PlanNodeIdAllocator; import com.facebook.presto.sql.planner.Symbol; import com.facebook.presto.sql.planner.plan.AggregationNode; import com.facebook.presto.sql.planner.plan.AggregationNode.Aggregation; import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.PlanNodeId; +import com.facebook.presto.sql.planner.plan.StatisticAggregations; +import com.facebook.presto.sql.planner.plan.TableFinishNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.plan.TopNNode; import com.facebook.presto.sql.tree.Expression; import com.facebook.presto.sql.tree.ExpressionRewriter; @@ -33,9 +37,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.Objects.requireNonNull; public class SymbolMapper @@ -82,14 +88,8 @@ public AggregationNode map(AggregationNode node, PlanNode source, PlanNodeIdAllo private AggregationNode map(AggregationNode node, PlanNode source, PlanNodeId newNodeId) { ImmutableMap.Builder aggregations = ImmutableMap.builder(); - for (Map.Entry entry : node.getAggregations().entrySet()) { - Symbol symbol = entry.getKey(); - Aggregation aggregation = entry.getValue(); - - aggregations.put(map(symbol), new Aggregation( - (FunctionCall) map(aggregation.getCall()), - aggregation.getSignature(), - aggregation.getMask().map(this::map))); + for (Entry entry : node.getAggregations().entrySet()) { + aggregations.put(map(entry.getKey()), map(entry.getValue())); } List> groupingSets = node.getGroupingSets().stream() @@ -107,6 +107,14 @@ private AggregationNode map(AggregationNode node, PlanNode source, PlanNodeId ne node.getGroupIdSymbol().map(this::map)); } + private Aggregation map(Aggregation aggregation) + { + return new Aggregation( + (FunctionCall) map(aggregation.getCall()), + aggregation.getSignature(), + aggregation.getMask().map(this::map)); + } + public TopNNode map(TopNNode node, PlanNode source, PlanNodeId newNodeId) { ImmutableList.Builder symbols = ImmutableList.builder(); @@ -129,6 +137,64 @@ public TopNNode map(TopNNode node, PlanNode source, PlanNodeId newNodeId) node.getStep()); } + public TableWriterNode map(TableWriterNode node, PlanNode source) + { + return map(node, source, node.getId()); + } + + public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId newNodeId) + { + // Intentionally does not use canonicalizeAndDistinct as that would remove columns + ImmutableList columns = node.getColumns().stream() + .map(this::map) + .collect(toImmutableList()); + + return new TableWriterNode( + newNodeId, + source, + node.getTarget(), + columns, + node.getColumnNames(), + map(node.getOutputSymbols()), + node.getPartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)), + node.getStatisticsAggregation().map(this::map)); + } + + public TableFinishNode map(TableFinishNode node, PlanNode source) + { + return new TableFinishNode( + node.getId(), + source, + node.getTarget(), + map(node.getOutputSymbols()), + node.getStatisticsAggregation().map(this::map), + node.getStatisticsAggregationDescriptor().map(descriptor -> descriptor.map(this::map))); + } + + private PartitioningScheme canonicalize(PartitioningScheme scheme, PlanNode source) + { + return new PartitioningScheme( + scheme.getPartitioning().translate(this::map), + mapAndDistinct(source.getOutputSymbols()), + scheme.getHashColumn().map(this::map), + scheme.isReplicateNullsAndAny(), + scheme.getBucketToPartition()); + } + + private StatisticAggregations map(StatisticAggregations statisticAggregations) + { + Map aggregations = statisticAggregations.getAggregations().entrySet().stream() + .collect(toImmutableMap(entry -> map(entry.getKey()), entry -> map(entry.getValue()))); + return new StatisticAggregations(aggregations, map(statisticAggregations.getGroupingSymbols())); + } + + private List map(List outputs) + { + return outputs.stream() + .map(this::map) + .collect(toImmutableList()); + } + private List mapAndDistinct(List outputs) { Set added = new HashSet<>(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index 3959c2b9fa69..ddab96e28862 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -372,7 +372,9 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext context) @Override public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) { - return context.defaultRewrite(node); + PlanNode source = context.rewrite(node.getSource()); + SymbolMapper mapper = new SymbolMapper(mapping); + return mapper.map(node, source); } @Override @@ -556,20 +558,8 @@ private static ImmutableList.Builder rewriteSources(SetOperationNode n public PlanNode visitTableWriter(TableWriterNode node, RewriteContext context) { PlanNode source = context.rewrite(node.getSource()); - - // Intentionally does not use canonicalizeAndDistinct as that would remove columns - ImmutableList columns = node.getColumns().stream() - .map(this::canonicalize) - .collect(toImmutableList()); - - return new TableWriterNode( - node.getId(), - source, - node.getTarget(), - columns, - node.getColumnNames(), - node.getOutputSymbols(), - node.getPartitioningScheme().map(partitioningScheme -> canonicalizePartitionFunctionBinding(partitioningScheme, source))); + SymbolMapper mapper = new SymbolMapper(mapping); + return mapper.map(node, source); } @Override @@ -731,24 +721,5 @@ private ListMultimap canonicalizeSetOperationSymbolMap(ListMulti } return builder.build(); } - - private PartitioningScheme canonicalizePartitionFunctionBinding(PartitioningScheme scheme, PlanNode source) - { - Set addedOutputs = new HashSet<>(); - ImmutableList.Builder outputs = ImmutableList.builder(); - for (Symbol symbol : source.getOutputSymbols()) { - Symbol canonicalOutput = canonicalize(symbol); - if (addedOutputs.add(canonicalOutput)) { - outputs.add(canonicalOutput); - } - } - - return new PartitioningScheme( - scheme.getPartitioning().translate(this::canonicalize), - outputs.build(), - canonicalize(scheme.getHashColumn()), - scheme.isReplicateNullsAndAny(), - scheme.getBucketToPartition()); - } } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregations.java new file mode 100644 index 000000000000..d565730dda8d --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregations.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.plan; + +import com.facebook.presto.metadata.FunctionRegistry; +import com.facebook.presto.metadata.Signature; +import com.facebook.presto.operator.aggregation.InternalAggregationFunction; +import com.facebook.presto.sql.planner.Symbol; +import com.facebook.presto.sql.planner.SymbolAllocator; +import com.facebook.presto.sql.planner.plan.AggregationNode.Aggregation; +import com.facebook.presto.sql.tree.FunctionCall; +import com.facebook.presto.sql.tree.QualifiedName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class StatisticAggregations +{ + private final Map aggregations; + private final List groupingSymbols; + + @JsonCreator + public StatisticAggregations( + @JsonProperty("aggregations") Map aggregations, + @JsonProperty("groupingSymbols") List groupingSymbols) + { + this.aggregations = ImmutableMap.copyOf(requireNonNull(aggregations, "aggregations is null")); + this.groupingSymbols = ImmutableList.copyOf(requireNonNull(groupingSymbols, "groupingSymbols is null")); + } + + @JsonProperty + public Map getAggregations() + { + return aggregations; + } + + @JsonProperty + public List getGroupingSymbols() + { + return groupingSymbols; + } + + public Parts createPartialAggregations(SymbolAllocator symbolAllocator, FunctionRegistry functionRegistry) + { + ImmutableMap.Builder partialAggregation = ImmutableMap.builder(); + ImmutableMap.Builder finalAggregation = ImmutableMap.builder(); + for (Map.Entry entry : aggregations.entrySet()) { + Aggregation originalAggregation = entry.getValue(); + Signature signature = originalAggregation.getSignature(); + InternalAggregationFunction function = functionRegistry.getAggregateFunctionImplementation(signature); + Symbol partialSymbol = symbolAllocator.newSymbol(signature.getName(), function.getIntermediateType()); + partialAggregation.put(partialSymbol, new Aggregation(originalAggregation.getCall(), signature, originalAggregation.getMask())); + finalAggregation.put(entry.getKey(), + new Aggregation( + new FunctionCall(QualifiedName.of(signature.getName()), ImmutableList.of(partialSymbol.toSymbolReference())), + signature, + Optional.empty())); + } + return new Parts( + new StatisticAggregations(partialAggregation.build(), groupingSymbols), + new StatisticAggregations(finalAggregation.build(), groupingSymbols)); + } + + public static class Parts + { + private final StatisticAggregations partialAggregation; + private final StatisticAggregations finalAggregation; + + public Parts(StatisticAggregations partialAggregation, StatisticAggregations finalAggregation) + { + this.partialAggregation = requireNonNull(partialAggregation, "partialAggregation is null"); + this.finalAggregation = requireNonNull(finalAggregation, "finalAggregation is null"); + } + + public StatisticAggregations getPartialAggregation() + { + return partialAggregation; + } + + public StatisticAggregations getFinalAggregation() + { + return finalAggregation; + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregationsDescriptor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregationsDescriptor.java new file mode 100644 index 000000000000..428a41b87329 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/StatisticAggregationsDescriptor.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.plan; + +import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.TableStatisticType; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.function.Function; + +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.util.Objects.requireNonNull; + +public class StatisticAggregationsDescriptor +{ + private final Map grouping; + private final Map tableStatistics; + private final Map columnStatistics; + + @JsonCreator + public StatisticAggregationsDescriptor( + @JsonProperty("grouping") Map grouping, + @JsonProperty("tableStatistics") Map tableStatistics, + @JsonProperty("columnStatistics") Map columnStatistics) + { + this.grouping = ImmutableMap.copyOf(requireNonNull(grouping, "grouping is null")); + this.tableStatistics = ImmutableMap.copyOf(requireNonNull(tableStatistics, "tableStatistics is null")); + this.columnStatistics = ImmutableMap.copyOf(requireNonNull(columnStatistics, "columnStatistics is null")); + } + + @JsonProperty + public Map getGrouping() + { + return grouping; + } + + @JsonProperty + public Map getTableStatistics() + { + return tableStatistics; + } + + @JsonProperty + public Map getColumnStatistics() + { + return columnStatistics; + } + + public static Builder builder() + { + return new Builder<>(); + } + + public StatisticAggregationsDescriptor map(Function mapper) + { + return new StatisticAggregationsDescriptor<>( + map(this.getGrouping(), mapper), + map(this.getTableStatistics(), mapper), + map(this.getColumnStatistics(), mapper)); + } + + private static Map map(Map input, Function mapper) + { + return input.entrySet() + .stream() + .collect(toImmutableMap(entry -> mapper.apply(entry.getKey()), Map.Entry::getValue)); + } + + public static class Builder + { + private final ImmutableMap.Builder grouping = ImmutableMap.builder(); + private final ImmutableMap.Builder tableStatistics = ImmutableMap.builder(); + private final ImmutableMap.Builder columnStatistics = ImmutableMap.builder(); + + public void addGrouping(T key, String column) + { + grouping.put(key, column); + } + + public void addTableStatistic(T key, TableStatisticType type) + { + tableStatistics.put(key, type); + } + + public void addColumnStatistic(T key, ColumnStatisticMetadata statisticMetadata) + { + columnStatistics.put(key, statisticMetadata); + } + + public StatisticAggregationsDescriptor build() + { + return new StatisticAggregationsDescriptor<>(grouping.build(), tableStatistics.build(), columnStatistics.build()); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableFinishNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableFinishNode.java index 86d4bb2f29f9..4004f0e42a35 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableFinishNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableFinishNode.java @@ -22,6 +22,7 @@ import javax.annotation.concurrent.Immutable; import java.util.List; +import java.util.Optional; import static com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import static com.google.common.base.Preconditions.checkArgument; @@ -34,13 +35,17 @@ public class TableFinishNode private final PlanNode source; private final WriterTarget target; private final List outputs; + private final Optional statisticsAggregation; + private final Optional> statisticsAggregationDescriptor; @JsonCreator public TableFinishNode( @JsonProperty("id") PlanNodeId id, @JsonProperty("source") PlanNode source, @JsonProperty("target") WriterTarget target, - @JsonProperty("outputs") List outputs) + @JsonProperty("outputs") List outputs, + @JsonProperty("statisticsAggregation") Optional statisticsAggregation, + @JsonProperty("statisticsAggregationDescriptor") Optional> statisticsAggregationDescriptor) { super(id); @@ -48,6 +53,9 @@ public TableFinishNode( this.source = requireNonNull(source, "source is null"); this.target = requireNonNull(target, "target is null"); this.outputs = ImmutableList.copyOf(requireNonNull(outputs, "outputs is null")); + this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null"); + this.statisticsAggregationDescriptor = requireNonNull(statisticsAggregationDescriptor, "statisticsAggregationDescriptor is null"); + checkArgument(statisticsAggregation.isPresent() == statisticsAggregationDescriptor.isPresent(), "statisticsAggregation and statisticsAggregationDescriptor must both be either present or absent"); } @JsonProperty @@ -69,6 +77,18 @@ public List getOutputSymbols() return outputs; } + @JsonProperty + public Optional getStatisticsAggregation() + { + return statisticsAggregation; + } + + @JsonProperty + public Optional> getStatisticsAggregationDescriptor() + { + return statisticsAggregationDescriptor; + } + @Override public List getSources() { @@ -84,6 +104,12 @@ public R accept(PlanVisitor visitor, C context) @Override public PlanNode replaceChildren(List newChildren) { - return new TableFinishNode(getId(), Iterables.getOnlyElement(newChildren), target, outputs); + return new TableFinishNode( + getId(), + Iterables.getOnlyElement(newChildren), + target, + outputs, + statisticsAggregation, + statisticsAggregationDescriptor); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java index 06a31633288f..fc7770b23bd2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java @@ -46,6 +46,7 @@ public class TableWriterNode private final List columns; private final List columnNames; private final Optional partitioningScheme; + private final Optional statisticsAggregation; @JsonCreator public TableWriterNode( @@ -55,7 +56,8 @@ public TableWriterNode( @JsonProperty("columns") List columns, @JsonProperty("columnNames") List columnNames, @JsonProperty("outputs") List outputs, - @JsonProperty("partitioningScheme") Optional partitioningScheme) + @JsonProperty("partitioningScheme") Optional partitioningScheme, + @JsonProperty("statisticsAggregation") Optional statisticsAggregation) { super(id); @@ -69,6 +71,7 @@ public TableWriterNode( this.columnNames = ImmutableList.copyOf(columnNames); this.outputs = ImmutableList.copyOf(requireNonNull(outputs, "outputs is null")); this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null"); + this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null"); } @JsonProperty @@ -108,6 +111,12 @@ public Optional getPartitioningScheme() return partitioningScheme; } + @JsonProperty + public Optional getStatisticsAggregation() + { + return statisticsAggregation; + } + @Override public List getSources() { @@ -123,7 +132,7 @@ public R accept(PlanVisitor visitor, C context) @Override public PlanNode replaceChildren(List newChildren) { - return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, columns, columnNames, outputs, partitioningScheme); + return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, columns, columnNames, outputs, partitioningScheme, statisticsAggregation); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type") diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index efae4007b1e9..e5aa80dbd675 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -420,7 +420,9 @@ public TableFinishNode tableDelete(SchemaTableName schemaTableName, PlanNode del .addInputsSet(deleteRowId) .singleDistributionPartitioningScheme(deleteRowId)), deleteHandle, - ImmutableList.of(deleteRowId)); + ImmutableList.of(deleteRowId), + Optional.empty(), + Optional.empty()); } public ExchangeNode gatheringExchange(ExchangeNode.Scope scope, PlanNode child) @@ -670,6 +672,7 @@ public TableWriterNode tableWriter(List columns, List columnName columns, columnNames, ImmutableList.of(symbol("partialrows", BIGINT), symbol("fragment", VARBINARY)), + Optional.empty(), Optional.empty()); }