New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support persistent CTE's #20887
Support persistent CTE's #20887
Conversation
a226e08
to
61c61cb
Compare
61c61cb
to
ed19859
Compare
ed19859
to
2b45bcf
Compare
deb4054
to
49dd17c
Compare
85ae4bd
to
6d1c216
Compare
Codenotify: Notifying subscribers in CODENOTIFY files for diff 3b8e8cc...8993b02. No notifications. |
General comment: I wonder if it would have been possible to achieve the same without introducing new syntax (for example through a session parameter). I know if't not as flexible as allowing specification per CTE but also not as disruptive as new syntax |
presto-spi/src/main/java/com/facebook/presto/spi/plan/SequenceNode.java
Outdated
Show resolved
Hide resolved
305b9bc
to
9852616
Compare
makes sense. |
Yes, the optimizer and entire flow remains unchanged unless the relational planner adds a cteReferenceNode, which only occurs if the specific flag is enabled. |
Does this PR look good to merge? Will need a committer approval |
As a follow up PR, I will also add a materialization strategy to only materialize ctes if references > threshold times and have a join or aggregate and maybe reduce data size |
I'll have a look this week, other committers are free to get to it sooner! |
catch (PrestoException e) { | ||
if (e.getErrorCode().equals(NOT_SUPPORTED.toErrorCode())) { | ||
throw new PrestoException( | ||
NOT_SUPPORTED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add this check to is isCteMaterializable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here its just a catch for the metadata.createTemporaryTable() which cannot be invoked in the parser since we do not have partition metadata and other info there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
9852616
to
15a1700
Compare
Adding a nit (using partitioningProviderCatalog) instead of hardcoded "hive" string in a use case. |
15a1700
to
90b46c7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make sure to consistently validate inputs. Please squash the Tests
commit.
@Override | ||
public PlanNode replaceChildren(List<PlanNode> newChildren) | ||
{ | ||
checkArgument(newChildren.size() == 1, "expected newChildren to contain 1 node"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also validate that the input is not null with Objects.requireNonNull
.
{ | ||
} | ||
|
||
public static TableScanNode createTemporaryTableScan( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and elsewhere: Please validate inputs are not null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks added in other cases, not adding here because not really needed at this static function.
|
||
public List<PlanNode> getTopologicalOrdering() | ||
{ | ||
List<PlanNode> topSortedCteProducerList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use immutable list
|
||
public Map<String, CteProducerNode> getCteProducerMap() | ||
{ | ||
return cteProducerMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this return an immutable copy
} | ||
|
||
@Override | ||
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and elsewhere--please always validate the inputs are not null
|
||
public CteProducerRewiter(Session session, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator) | ||
{ | ||
this.idAllocator = idAllocator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
|
||
@Override | ||
public PlanNode visitCteProducer(CteProducerNode node, RewriteContext<PhysicalCteTransformerContext> context) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
tempScan.getCurrentConstraint(), | ||
tempScan.getEnforcedConstraint()); | ||
|
||
/* The temporary table scan might have columns removed by the UnaliasSymbolReferences and other optimizers (its a plan tree after all), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general we prefer //
comments
public TemporaryTableInfo(TableScanNode tableScanNode, List<VariableReferenceExpression> originalOutputVariables) | ||
{ | ||
this.tableScanNode = tableScanNode; | ||
this.originalOutputVariables = originalOutputVariables; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check inputs are not null, and create an immutable copy of the original output variables (no cost if the input is immutable as well)
{ | ||
// Since this is topologically sorted by the LogicalCtePlanner, need to make sure that execution order follows | ||
// Can be optimized further to avoid non dependents from getting blocked | ||
int n = node.getCteProducers().size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not use single letter variables. cteProducerCount
or something similar
Please look into the merge conflicts. |
4e97080
to
a268f5e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one last nit, and I think good to merge.
presto-spi/src/main/java/com/facebook/presto/spi/plan/CteReferenceNode.java
Outdated
Show resolved
Hide resolved
ab89eb0
to
8993b02
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments for Presto style and best practices that are likely to come up elsewhere in the future.
String cteName) | ||
{ | ||
super(sourceLocation, id, statsEquivalentPlanNode); | ||
this.cteName = cteName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always check requireNonNull() when setting Object fields in a constructor. (sometimes other argument checks are appropriate as well, but we almost never allow nulls. Generally we enforce this in constructors, and then elsewhere assume that nothing is null.)
public class NestedCteStack | ||
{ | ||
@VisibleForTesting | ||
public static final String delimiter = "_*%$_"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming convention note: static final fields should be in all caps (so DELIMETER)
|
||
public NestedCteStack() | ||
{ | ||
this.cteStack = new Stack<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialize these where they are declared. They don't need anything from the constructor.
|
||
public void push(String cteName, Query query) | ||
{ | ||
this.cteStack.push(cteName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style note: we generally only use the "this" prefix in the constructor or setters. Otherwise, we refer to fields without the "this" prefix as long as there is no conflict requiring it. (not sure why, i think just to be concise).
PlanNode primarySource) | ||
{ | ||
super(sourceLocation, planNodeId, statsEquivalentPlanNode); | ||
this.cteProducers = leftList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when setting a field to a collection passed in as input, make a defensive copy using ImmutableList.copyOf(). If it's already an immutable list this will be a no-op. (these should have requireNonNullChecks too)
{ | ||
List<PlanNode> children = new ArrayList<>(cteProducers); | ||
children.add(primarySource); | ||
return children; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use immutable collections whenever possible. It prevents bugs caused by things being modified unexpectedly. mutable collections should only be used when you plan something to be modified, and should generally stay local to wherever they are.
Here you could do
return ImmutableList.builder()
.addAll(cteProducers);
.add(primarySource)
.build();
Description
Support persistent query-level CTEs
The goal is to allow users to define persistent ctes and support them
Session property cte_materialization_strategy is added which when set to ALL will materialize all ctes to HDFS
This will be the first step for 19744.
Motivation and Context
This change is based on the design. The eventual goal is to materialize or stream the CTEs (either in local memory or remote) using CBO rules and the architecture reflects that.
The temporary tables created for each cte will be partitioned on the first input column
In the current implementation, all CTEs will be evaluated before the query is run.
This change has 5 main commits
Add session property and ast changes
This change adds the session property which allows CTE materialization. This information is also added to the CTEInformation collector.
The relational planner has the main change where a CTEReference Node is added if a CTE is persistent. This reference node will be used by the LogicalCteOptimizer to process.
Planner changes
Main changes
LogicalCteOptimizer - Processes the CTE references into CteConsumers and CteProducers. It also creates a sequence node which is added as a child below the topmost node. The sequence node has a list of leftSources and a right source. The leftSources list has the ctes ordered in a topological order of execution. {a,b,c,d} --> order of execution {d->c->b->a}.
PhysicalCteOptimizer - Processes the CteConsumers and CteProducers into table writes and temporary table scan. The processing code is based on basePlanFragmenter. From logical to physical optimizers, the CteProducer, CteConsumers, and Sequence Node are present to enable further logical optimizations. Physical optimization is applied just before adding exchanges. Post this optimization stage, CteConsumers and CteProducers are no longer present.
Code change in other optimizers and visitors
AddExchanges.java - Handling Sequence Node
HashGenerationOptimizer.java - Handling Sequence Node
StreamPropertyDerivations.java - Handling Sequence Node
PropertyDerivations.java - Handling Sequence Node
PruneUnreferencedOutputs.java - Handling CteConsumer, CteProducer and Sequnce
PushdownSubfields.java - handling CteProducer Node
StreamPropertyDerivations.java - handling sequence
UnaliasSymbolReferences.java - Handling CteConsumer, CteProducer and Sequnce
Scheduling changes
BasePlanFragmenter - Handle and transform sequence nodes. {a,b,c,d} --> order of execution {d->c->b->a}
We will create a chain of different sections of plans {d->c->b->a} and add it as a child of the main query section.
StreamingPlanSection will make sure that the children are scheduled and executed before the parent
Tests
Integration test to test end to end and result correctness
Testing logical cte optimizer
Current wips
Impact
Test Plan
Production tested with ALL CTEs materialized
Contributor checklist
Todo in future PRs
Final Quality ToDos
Release Notes
Please follow release notes guidelines and fill in the release notes below.