Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support persistent CTE's #20887

Merged
merged 3 commits into from Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

Expand Up @@ -579,7 +579,7 @@ public CteMaterializationStrategy getCteMaterializationStrategy()
}

@Config("cte-materialization-strategy")
@ConfigDescription("Set strategy used to determine whether to materialize ctes (ALL, NONE)")
@ConfigDescription("Set strategy used to determine whether to materialize CTEs (ALL, NONE)")
public FeaturesConfig setCteMaterializationStrategy(CteMaterializationStrategy cteMaterializationStrategy)
{
this.cteMaterializationStrategy = cteMaterializationStrategy;
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.base.VerifyException;
Expand Down Expand Up @@ -61,6 +62,14 @@ public Void visitTableWriter(TableWriterNode node, Void context)
return null;
}

/**
 * Visits a sequence node by descending only into its primary source.
 *
 * @param node the sequence node being visited
 * @param context unused visitor context
 * @return always {@code null}
 */
@Override
public Void visitSequence(SequenceNode node, Void context)
{
    // Left children of sequence are ignored since they don't output anything
    node.getPrimarySource().accept(this, context);

    return null;
}

@Override
public Void visitPlan(PlanNode node, Void context)
{
Expand Down
Expand Up @@ -144,12 +144,14 @@
import com.facebook.presto.sql.planner.optimizations.IndexJoinOptimizer;
import com.facebook.presto.sql.planner.optimizations.KeyBasedSampler;
import com.facebook.presto.sql.planner.optimizations.LimitPushDown;
import com.facebook.presto.sql.planner.optimizations.LogicalCteOptimizer;
import com.facebook.presto.sql.planner.optimizations.MergeJoinForSortedInputOptimizer;
import com.facebook.presto.sql.planner.optimizations.MergePartialAggregationsWithFilter;
import com.facebook.presto.sql.planner.optimizations.MetadataDeleteOptimizer;
import com.facebook.presto.sql.planner.optimizations.MetadataQueryOptimizer;
import com.facebook.presto.sql.planner.optimizations.OptimizeMixedDistinctAggregations;
import com.facebook.presto.sql.planner.optimizations.PayloadJoinOptimizer;
import com.facebook.presto.sql.planner.optimizations.PhysicalCteOptimizer;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.optimizations.PredicatePushDown;
import com.facebook.presto.sql.planner.optimizations.PrefilterForLimitingAggregation;
Expand Down Expand Up @@ -275,6 +277,8 @@ public PlanOptimizers(
new PruneLimitColumns(),
new PruneTableScanColumns());

builder.add(new LogicalCteOptimizer(metadata));

IterativeOptimizer inlineProjections = new IterativeOptimizer(
metadata,
ruleStats,
Expand Down Expand Up @@ -760,6 +764,7 @@ public PlanOptimizers(
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new PushTableWriteThroughUnion()))); // Must run before AddExchanges
builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, sqlParser, partitioningProviderManager)));
}

Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.CteReferenceNode;
import com.facebook.presto.spi.plan.ExceptNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.IntersectNode;
Expand Down Expand Up @@ -106,6 +107,7 @@
import java.util.Set;
import java.util.stream.IntStream;

import static com.facebook.presto.SystemSessionProperties.getCteMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getQueryAnalyzerTimeout;
import static com.facebook.presto.common.type.TypeUtils.isEnumType;
import static com.facebook.presto.metadata.MetadataUtil.createQualifiedObjectName;
Expand All @@ -116,9 +118,11 @@
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.getSourceLocation;
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.isEqualComparisonExpression;
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.resolveEnumLiteral;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.CteMaterializationStrategy.ALL;
import static com.facebook.presto.sql.analyzer.SemanticExceptions.notSupportedException;
import static com.facebook.presto.sql.planner.PlannerUtils.newVariable;
import static com.facebook.presto.sql.planner.TranslateExpressionsUtil.toRowExpression;
import static com.facebook.presto.sql.planner.optimizations.CteUtils.isCteMaterializable;
import static com.facebook.presto.sql.tree.Join.Type.INNER;
import static com.facebook.presto.sql.tree.Join.Type.LEFT;
import static com.facebook.presto.sql.tree.Join.Type.RIGHT;
Expand Down Expand Up @@ -180,8 +184,18 @@ protected RelationPlan visitTable(Table node, SqlPlannerContext context)
if (namedQuery.isFromView()) {
cteName = createQualifiedObjectName(session, node, node.getName()).toString();
}
session.getCteInformationCollector().addCTEReference(cteName, namedQuery.isFromView());
context.getNestedCteStack().push(cteName, namedQuery.getQuery());
RelationPlan subPlan = process(namedQuery.getQuery(), context);
context.getNestedCteStack().pop(namedQuery.getQuery());
boolean shouldBeMaterialized = getCteMaterializationStrategy(session).equals(ALL) && isCteMaterializable(subPlan.getRoot().getOutputVariables());
session.getCteInformationCollector().addCTEReference(cteName, namedQuery.isFromView(), shouldBeMaterialized);
if (shouldBeMaterialized) {
subPlan = new RelationPlan(
new CteReferenceNode(getSourceLocation(node.getLocation()),
idAllocator.getNextId(), subPlan.getRoot(), context.getNestedCteStack().getRawPath(cteName)),
subPlan.getScope(),
subPlan.getFieldMappings());
}

// Add implicit coercions if view query produces types that don't match the declared output types
// of the view (e.g., if the underlying tables referenced by the view changed)
Expand Down
Expand Up @@ -16,6 +16,12 @@
import com.facebook.presto.Session;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.relational.SqlToRowExpressionTranslator;
import com.facebook.presto.sql.tree.Query;
import com.google.common.annotations.VisibleForTesting;

import java.util.HashMap;
import java.util.Map;
import java.util.Stack;

import static com.facebook.presto.SystemSessionProperties.getMaxLeafNodesInPlan;
import static com.facebook.presto.SystemSessionProperties.isLeafNodeLimitEnabled;
Expand All @@ -28,10 +34,18 @@ public class SqlPlannerContext
private int leafNodesInLogicalPlan;
private final SqlToRowExpressionTranslator.Context translatorContext;

private final NestedCteStack nestedCteStack;

/**
 * Creates a planner context with a fresh nested-CTE stack, a fresh
 * row-expression translator context, and the given starting leaf-node count.
 *
 * @param leafNodesInLogicalPlan initial value of the leaf-node counter
 */
public SqlPlannerContext(int leafNodesInLogicalPlan)
{
    this.nestedCteStack = new NestedCteStack();
    this.translatorContext = new SqlToRowExpressionTranslator.Context();
    this.leafNodesInLogicalPlan = leafNodesInLogicalPlan;
}

/**
 * Returns the nested-CTE stack owned by this planner context.
 */
public NestedCteStack getNestedCteStack()
{
return nestedCteStack;
}

public SqlToRowExpressionTranslator.Context getTranslatorContext()
Expand All @@ -49,4 +63,58 @@ public void incrementLeafNodes(Session session)
}
}
}

public class NestedCteStack
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NestedCteStack is responsible for managing the scope of the CTE
cteStack keeps track of the current CTEs that are accessible outside this scope.
The rawCtePathMap holds the localCtes along with their respective scopes, keeping them updated.

{
@VisibleForTesting
public static final String delimiter = "_*%$_";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming convention note: static final fields should be in all caps (so DELIMITER)

private final Stack<String> cteStack;
private final Map<String, String> rawCtePathMap;

public NestedCteStack()
{
this.cteStack = new Stack<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialize these where they are declared. They don't need anything from the constructor.

this.rawCtePathMap = new HashMap<>();
}

public void push(String cteName, Query query)
{
this.cteStack.push(cteName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style note: we generally only use the "this" prefix in the constructor or setters. Otherwise, we refer to fields without the "this" prefix as long as there is no conflict requiring it. (Not sure why; I think it is just to be concise.)

if (query.getWith().isPresent()) {
// All ctes defined in this context should have their paths updated
query.getWith().get().getQueries().forEach(with -> this.addNestedCte(with.getName().toString()));
}
}

/**
 * Pops the most recently pushed CTE name and, when the given query has a
 * WITH clause, removes the raw-path entry of every query declared in it.
 *
 * @param query the query whose scope is being left
 */
public void pop(Query query)
{
    cteStack.pop();
    query.getWith().ifPresent(withClause -> withClause.getQueries()
            .forEach(withQuery -> removeNestedCte(withQuery.getName().toString())));
}

/**
 * Returns the path recorded for the given CTE name, or the name itself
 * when no path has been recorded for it.
 *
 * @param cteName the CTE name to look up
 * @return the recorded raw path, falling back to {@code cteName}
 */
public String getRawPath(String cteName)
{
    return rawCtePathMap.getOrDefault(cteName, cteName);
}

// Records the CTE under a path made of the current relative CTE path, the delimiter, and its own name.
private void addNestedCte(String cteName)
{
    String scopedPath = getCurrentRelativeCtePath() + delimiter + cteName;
    rawCtePathMap.put(cteName, scopedPath);
}

// Drops the raw-path entry recorded for the given CTE name, if there is one.
private void removeNestedCte(String cteName)
{
this.rawCtePathMap.remove(cteName);
}

/**
 * Returns the CTE names currently on the stack joined with the delimiter.
 * An empty stack yields an empty string.
 */
public String getCurrentRelativeCtePath()
{
return String.join(delimiter, cteStack);
}
}
}
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.cost.TaskCountEstimator;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand Down Expand Up @@ -198,7 +199,7 @@ static double getSourceTablesSizeInBytes(PlanNode node, Lookup lookup, StatsProv
}

List<PlanNode> sourceNodes = PlanNodeSearcher.searchFrom(node, lookup)
.whereIsInstanceOfAny(ImmutableList.of(TableScanNode.class, ValuesNode.class, RemoteSourceNode.class))
.whereIsInstanceOfAny(ImmutableList.of(TableScanNode.class, ValuesNode.class, RemoteSourceNode.class, CteConsumerNode.class))
.findAll();

return sourceNodes.stream()
Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
Expand Down Expand Up @@ -95,6 +96,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.getAggregationPartitioningMergingStrategy;
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
Expand Down Expand Up @@ -616,6 +618,22 @@ public PlanWithProperties visitFilter(FilterNode node, PreferredProperties prefe
return rebaseAndDeriveProperties(node, planChild(node, preferredProperties));
}

/**
 * Plans every CTE producer with no preferred properties, then plans the
 * primary source with the caller's preferences. The rebuilt sequence node
 * reports the properties derived for the primary source only.
 */
@Override
public PlanWithProperties visitSequence(SequenceNode node, PreferredProperties preferredProperties)
{
    // Producers are collected eagerly so they are planned before the primary source
    List<PlanNode> producerNodes = node.getCteProducers().stream()
            .map(producer -> accept(producer, PreferredProperties.any()))
            .map(PlanWithProperties::getNode)
            .collect(toImmutableList());

    PlanWithProperties primaryPlan = accept(node.getPrimarySource(), preferredProperties);

    // Children keep their order: all producers first, the primary source last
    List<PlanNode> children = Stream.concat(producerNodes.stream(), Stream.of(primaryPlan.getNode()))
            .collect(toImmutableList());

    return new PlanWithProperties(node.replaceChildren(children), primaryPlan.getProperties());
}

@Override
public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties preferredProperties)
{
Expand Down
Expand Up @@ -19,6 +19,9 @@
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.spi.plan.CteProducerNode;
import com.facebook.presto.spi.plan.CteReferenceNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.ExceptNode;
import com.facebook.presto.spi.plan.FilterNode;
Expand Down Expand Up @@ -55,6 +58,9 @@ public class ApplyConnectorOptimization
implements PlanOptimizer
{
static final Set<Class<? extends PlanNode>> CONNECTOR_ACCESSIBLE_PLAN_NODES = ImmutableSet.of(
CteProducerNode.class,
CteConsumerNode.class,
CteReferenceNode.class,
Comment on lines +61 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For connector optimizers to apply in cases where plan contains these nodes

DistinctLimitNode.class,
FilterNode.class,
TableScanNode.class,
Expand Down
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.common.type.Varchars;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.relation.VariableReferenceExpression;

import java.util.List;

import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;

public class CteUtils
{
// Private constructor: this class only exposes static helpers and is not meant to be instantiated.
private CteUtils()
{
}

/**
 * Determines whether the CTE can be materialized.
 *
 * A CTE qualifies when at least one output variable is materializable
 * (see {@code isVariableMaterializable}) and every varchar-typed output
 * variable has a supported varchar type.
 *
 * @param outputVariables output variables of the CTE's plan
 * @return {@code true} when both conditions hold; {@code false} otherwise, including for an empty list
 */
public static boolean isCteMaterializable(List<VariableReferenceExpression> outputVariables)
{
    if (outputVariables.stream().noneMatch(CteUtils::isVariableMaterializable)) {
        return false;
    }
    for (VariableReferenceExpression variable : outputVariables) {
        if (Varchars.isVarcharType(variable.getType())
                && !isSupportedVarcharType((VarcharType) variable.getType())) {
            return false;
        }
    }
    return true;
}

/*
Returns the position of the first output variable that can be materialized.
Throws a PrestoException (GENERIC_INTERNAL_ERROR) when no such variable exists.
ToDo: Implement usage of NDV (number of distinct values) statistics to identify the best partitioning variable,
as temporary tables are bucketed.
*/
public static Integer getCtePartitionIndex(List<VariableReferenceExpression> outputVariables)
{
    int position = 0;
    for (VariableReferenceExpression candidate : outputVariables) {
        if (isVariableMaterializable(candidate)) {
            return position;
        }
        position++;
    }
    throw new PrestoException(GENERIC_INTERNAL_ERROR, "No Partitioning index found");
}

/*
Currently, Hive bucketing does not support the Presto type 'ROW'.
*/
public static boolean isVariableMaterializable(VariableReferenceExpression var)
{
return !(var.getType() instanceof RowType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about other types not supported by Hive such as TIME?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, can we add tests for these types?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recommended table format is PageFile, which doesn't require translation to Hive types. Also, there is already special handling that uses the PageFile format for types that are not supported by Hive.

if (isUsePageFileForHiveUnsupportedType(session)) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some tests like this:

with cte as (select current_time as a)
select a from cte;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test based on ttwithtz, thanks

}

/*
While Presto supports Varchar of length 0 (as discussed in https://github.com/trinodb/trino/issues/1136),
Hive does not support this. Unbounded varchars are accepted; bounded ones must have a non-zero length.
*/
private static boolean isSupportedVarcharType(VarcharType varcharType)
{
    // Check unboundedness first so the length is only read for bounded types
    if (varcharType.isUnbounded()) {
        return true;
    }
    return varcharType.getLengthSafe() != 0;
}
}
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.relation.CallExpression;
Expand Down Expand Up @@ -166,6 +167,21 @@ public PlanWithProperties visitApply(ApplyNode node, HashComputationSet context)
return new PlanWithProperties(node, ImmutableMap.of());
}

/**
 * Plans each CTE producer with an empty hash-computation set, then plans
 * the primary source with the caller's context. The rebuilt sequence node
 * exposes only the hash variables of the primary source.
 *
 * @param node the sequence node being rewritten
 * @param context hash computations requested by the parent
 * @return the sequence with replaced children and the primary source's hash variables
 */
@Override
public PlanWithProperties visitSequence(SequenceNode node, HashComputationSet context)
{
    // Each producer is planned with empty hash-computation sets rather than the parent's context
    List<PlanNode> cteProducers = node.getCteProducers().stream()
            .map(c ->
                    planAndEnforce(c, new HashComputationSet(), true, new HashComputationSet()).getNode())
            .collect(ImmutableList.toImmutableList());
    PlanWithProperties primarySource = plan(node.getPrimarySource(), context);
    return new PlanWithProperties(
            replaceChildren(node, ImmutableList.<PlanNode>builder()
                    .addAll(cteProducers)
                    .add(primarySource.getNode())
                    .build()),
            primarySource.getHashVariables());
}

@Override
public PlanWithProperties visitLateralJoin(LateralJoinNode node, HashComputationSet context)
{
Expand Down