Skip to content

Commit

Permalink
Collect column statistics on table write: Planner
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Aug 2, 2018
1 parent 6df8ff2 commit 561d60c
Show file tree
Hide file tree
Showing 13 changed files with 680 additions and 100 deletions.
Expand Up @@ -2146,10 +2146,8 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl
inputChannels, inputChannels,
session); session);


Map<Symbol, Integer> layout = ImmutableMap.<Symbol, Integer>builder() Map<Symbol, Integer> layout = IntStream.range(0, node.getOutputSymbols().size()).boxed()
.put(node.getOutputSymbols().get(0), 0) .collect(toImmutableMap(i -> node.getOutputSymbols().get(i), i -> i));
.put(node.getOutputSymbols().get(1), 1)
.build();


return new PhysicalOperation(operatorFactory, layout, context, source); return new PhysicalOperation(operatorFactory, layout, context, source);
} }
Expand Down
Expand Up @@ -23,13 +23,15 @@
import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.Analysis; import com.facebook.presto.sql.analyzer.Analysis;
import com.facebook.presto.sql.analyzer.Field; import com.facebook.presto.sql.analyzer.Field;
import com.facebook.presto.sql.analyzer.RelationId; import com.facebook.presto.sql.analyzer.RelationId;
import com.facebook.presto.sql.analyzer.RelationType; import com.facebook.presto.sql.analyzer.RelationType;
import com.facebook.presto.sql.analyzer.Scope; import com.facebook.presto.sql.analyzer.Scope;
import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.StatisticsAggregationPlanner.TableStatisticAggregation;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.plan.Assignments; import com.facebook.presto.sql.planner.plan.Assignments;
import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.DeleteNode;
Expand All @@ -38,6 +40,7 @@
import com.facebook.presto.sql.planner.plan.OutputNode; import com.facebook.presto.sql.planner.plan.OutputNode;
import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.ProjectNode; import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.StatisticAggregations;
import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.ValuesNode; import com.facebook.presto.sql.planner.plan.ValuesNode;
Expand All @@ -58,10 +61,12 @@
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;


import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;


import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
Expand All @@ -73,7 +78,10 @@
import static com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import static com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
import static com.facebook.presto.sql.planner.sanity.PlanSanityChecker.DISTRIBUTED_PLAN_SANITY_CHECKER; import static com.facebook.presto.sql.planner.sanity.PlanSanityChecker.DISTRIBUTED_PLAN_SANITY_CHECKER;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Streams.zip;
import static java.lang.String.format; import static java.lang.String.format;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


Expand All @@ -92,6 +100,7 @@ public enum Stage
private final SymbolAllocator symbolAllocator = new SymbolAllocator(); private final SymbolAllocator symbolAllocator = new SymbolAllocator();
private final Metadata metadata; private final Metadata metadata;
private final SqlParser sqlParser; private final SqlParser sqlParser;
private final StatisticsAggregationPlanner statisticsAggregationPlanner;


public LogicalPlanner(Session session, public LogicalPlanner(Session session,
List<PlanOptimizer> planOptimizers, List<PlanOptimizer> planOptimizers,
Expand Down Expand Up @@ -122,6 +131,7 @@ public LogicalPlanner(Session session,
this.idAllocator = idAllocator; this.idAllocator = idAllocator;
this.metadata = metadata; this.metadata = metadata;
this.sqlParser = sqlParser; this.sqlParser = sqlParser;
this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(symbolAllocator, metadata);
} }


public Plan plan(Analysis analysis) public Plan plan(Analysis analysis)
Expand Down Expand Up @@ -216,12 +226,15 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
.map(ColumnMetadata::getName) .map(ColumnMetadata::getName)
.collect(toImmutableList()); .collect(toImmutableList());


TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadata(session, destination.getCatalogName(), tableMetadata);

return createTableWriterPlan( return createTableWriterPlan(
analysis, analysis,
plan, plan,
new CreateName(destination.getCatalogName(), tableMetadata, newTableLayout), new CreateName(destination.getCatalogName(), tableMetadata, newTableLayout),
columnNames, columnNames,
newTableLayout); newTableLayout,
statisticsMetadata);
} }


private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement)
Expand Down Expand Up @@ -275,26 +288,26 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement)
plan = new RelationPlan(projectNode, scope, projectNode.getOutputSymbols()); plan = new RelationPlan(projectNode, scope, projectNode.getOutputSymbols());


Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, insert.getTarget()); Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, insert.getTarget());
String catalogName = insert.getTarget().getConnectorId().getCatalogName();
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadata(session, catalogName, tableMetadata.getMetadata());


return createTableWriterPlan( return createTableWriterPlan(
analysis, analysis,
plan, plan,
new InsertReference(insert.getTarget()), new InsertReference(insert.getTarget()),
visibleTableColumnNames, visibleTableColumnNames,
newTableLayout); newTableLayout,
statisticsMetadata);
} }


private RelationPlan createTableWriterPlan( private RelationPlan createTableWriterPlan(
Analysis analysis, Analysis analysis,
RelationPlan plan, RelationPlan plan,
WriterTarget target, WriterTarget target,
List<String> columnNames, List<String> columnNames,
Optional<NewTableLayout> writeTableLayout) Optional<NewTableLayout> writeTableLayout,
TableStatisticsMetadata statisticsMetadata)
{ {
List<Symbol> writerOutputs = ImmutableList.of(
symbolAllocator.newSymbol("partialrows", BIGINT),
symbolAllocator.newSymbol("fragment", VARBINARY));

PlanNode source = plan.getRoot(); PlanNode source = plan.getRoot();


if (!analysis.isCreateTableAsSelectWithData()) { if (!analysis.isCreateTableAsSelectWithData()) {
Expand Down Expand Up @@ -325,23 +338,69 @@ private RelationPlan createTableWriterPlan(
outputLayout)); outputLayout));
} }


PlanNode writerNode = new TableWriterNode( List<Symbol> writerOutputs = ImmutableList.of(
idAllocator.getNextId(), symbolAllocator.newSymbol("partialrows", BIGINT),
source, symbolAllocator.newSymbol("fragment", VARBINARY));
target,
symbols, List<Symbol> commitOutputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT));
columnNames,
writerOutputs, if (!statisticsMetadata.isEmpty()) {
partitioningScheme); verify(columnNames.size() == symbols.size(), "columnNames.size() != symbols.size(): %s and %s", columnNames, symbols);
Map<String, Symbol> columnToSymbolMap = zip(columnNames.stream(), symbols.stream(), SimpleImmutableEntry::new)
.collect(toImmutableMap(Entry::getKey, Entry::getValue));

TableStatisticAggregation result = statisticsAggregationPlanner.createStatisticsAggregation(statisticsMetadata, columnToSymbolMap);

StatisticAggregations.Parts aggregations = result.getAggregations().createPartialAggregations(symbolAllocator, metadata.getFunctionRegistry());

// The partial aggregation runs within the TableWriteOperator to calculate statistics for
// the data consumed by that operator.
// The final aggregation runs within the TableFinishOperator to summarize the statistics
// collected by the partial aggregations from all of the writer nodes.
StatisticAggregations partialAggregation = aggregations.getPartialAggregation();
List<Symbol> writerOutputSymbols = ImmutableList.<Symbol>builder()
.addAll(writerOutputs)
.addAll(partialAggregation.getGroupingSymbols())
.addAll(partialAggregation.getAggregations().keySet())
.build();

PlanNode writerNode = new TableWriterNode(
idAllocator.getNextId(),
source,
target,
symbols,
columnNames,
writerOutputSymbols,
partitioningScheme,
Optional.of(partialAggregation));

TableFinishNode commitNode = new TableFinishNode(
idAllocator.getNextId(),
writerNode,
target,
commitOutputs,
Optional.of(aggregations.getFinalAggregation()),
Optional.of(result.getDescriptor()));

return new RelationPlan(commitNode, analysis.getRootScope(), commitOutputs);
}


List<Symbol> outputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT));
TableFinishNode commitNode = new TableFinishNode( TableFinishNode commitNode = new TableFinishNode(
idAllocator.getNextId(), idAllocator.getNextId(),
writerNode, new TableWriterNode(
idAllocator.getNextId(),
source,
target,
symbols,
columnNames,
writerOutputs,
partitioningScheme,
Optional.empty()),
target, target,
outputs); commitOutputs,

Optional.empty(),
return new RelationPlan(commitNode, analysis.getRootScope(), outputs); Optional.empty());
return new RelationPlan(commitNode, analysis.getRootScope(), commitOutputs);
} }


private RelationPlan createDeletePlan(Analysis analysis, Delete node) private RelationPlan createDeletePlan(Analysis analysis, Delete node)
Expand All @@ -350,7 +409,13 @@ private RelationPlan createDeletePlan(Analysis analysis, Delete node)
.plan(node); .plan(node);


List<Symbol> outputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)); List<Symbol> outputs = ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT));
TableFinishNode commitNode = new TableFinishNode(idAllocator.getNextId(), deleteNode, deleteNode.getTarget(), outputs); TableFinishNode commitNode = new TableFinishNode(
idAllocator.getNextId(),
deleteNode,
deleteNode.getTarget(),
outputs,
Optional.empty(),
Optional.empty());


return new RelationPlan(commitNode, analysis.getScope(node), commitNode.getOutputSymbols()); return new RelationPlan(commitNode, analysis.getScope(node), commitNode.getOutputSymbols());
} }
Expand Down Expand Up @@ -413,7 +478,7 @@ private static List<ColumnMetadata> getOutputTableColumns(RelationPlan plan, Opt
private static Map<NodeRef<LambdaArgumentDeclaration>, Symbol> buildLambdaDeclarationToSymbolMap(Analysis analysis, SymbolAllocator symbolAllocator) private static Map<NodeRef<LambdaArgumentDeclaration>, Symbol> buildLambdaDeclarationToSymbolMap(Analysis analysis, SymbolAllocator symbolAllocator)
{ {
Map<NodeRef<LambdaArgumentDeclaration>, Symbol> resultMap = new LinkedHashMap<>(); Map<NodeRef<LambdaArgumentDeclaration>, Symbol> resultMap = new LinkedHashMap<>();
for (Map.Entry<NodeRef<Expression>, Type> entry : analysis.getTypes().entrySet()) { for (Entry<NodeRef<Expression>, Type> entry : analysis.getTypes().entrySet()) {
if (!(entry.getKey().getNode() instanceof LambdaArgumentDeclaration)) { if (!(entry.getKey().getNode() instanceof LambdaArgumentDeclaration)) {
continue; continue;
} }
Expand Down

0 comments on commit 561d60c

Please sign in to comment.