Skip to content

Commit 5334e7d

Browse files
[FLINK-37962][table-planner] Add visitMultiJoin to RelTimeIndicatorConverter
This closes #26703.
1 parent 8e7273d commit 5334e7d

File tree

15 files changed

+916
-166
lines changed

15 files changed

+916
-166
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,7 @@ public RelNode visit(RelNode node) {
164164
} else if (node instanceof FlinkLogicalJoin) {
165165
return visitJoin((FlinkLogicalJoin) node);
166166
} else if (node instanceof FlinkLogicalMultiJoin) {
167-
// TODO FLINK-37962 add visitMultiJoin https://issues.apache.org/jira/browse/FLINK-37962
168-
return visitSimpleRel(node);
167+
return visitMultiJoin((FlinkLogicalMultiJoin) node);
169168
} else if (node instanceof FlinkLogicalSink) {
170169
return visitSink((FlinkLogicalSink) node);
171170
} else if (node instanceof FlinkLogicalLegacySink) {
@@ -521,6 +520,45 @@ private RelNode visitWindowTableAggregate(FlinkLogicalWindowTableAggregate table
521520
tableAgg.getNamedProperties());
522521
}
523522

523+
private RelNode visitMultiJoin(FlinkLogicalMultiJoin multiJoin) {
524+
// visit and materialize children
525+
final List<RelNode> newInputs =
526+
multiJoin.getInputs().stream()
527+
.map(input -> input.accept(this))
528+
.map(this::materializeTimeIndicators)
529+
.collect(Collectors.toList());
530+
531+
final List<RelDataType> allFields =
532+
newInputs.stream().map(RelNode::getRowType).collect(Collectors.toList());
533+
534+
RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(allFields);
535+
536+
final RexNode newJoinFilter = multiJoin.getJoinFilter().accept(materializer);
537+
538+
final List<RexNode> newJoinConditions =
539+
multiJoin.getJoinConditions().stream()
540+
.map(cond -> cond == null ? null : cond.accept(materializer))
541+
.collect(Collectors.toList());
542+
543+
final RexNode newPostJoinFilter =
544+
multiJoin.getPostJoinFilter() == null
545+
? null
546+
: multiJoin.getPostJoinFilter().accept(materializer);
547+
548+
// materialize all output types and remove special time indicator types
549+
RelDataType newOutputType = getRowTypeWithoutTimeIndicator(multiJoin.getRowType());
550+
551+
return FlinkLogicalMultiJoin.create(
552+
multiJoin.getCluster(),
553+
newInputs,
554+
newJoinFilter,
555+
newOutputType,
556+
newJoinConditions,
557+
multiJoin.getJoinTypes(),
558+
newPostJoinFilter,
559+
multiJoin.getHints());
560+
}
561+
524562
private RelNode visitInvalidRel(RelNode node) {
525563
throw new TableException(
526564
String.format(
@@ -640,6 +678,10 @@ private void validateType(RelDataType l, RelDataType r) {
640678
}
641679
}
642680

681+
private RelDataType getRowTypeWithoutTimeIndicator(RelDataType relType) {
682+
return getRowTypeWithoutTimeIndicator(relType, s -> true);
683+
}
684+
643685
private RelDataType getRowTypeWithoutTimeIndicator(
644686
RelDataType relType, Predicate<String> shouldMaterialize) {
645687
Map<String, RelDataType> convertedFields =

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
132132
traitSet,
133133
inputs,
134134
joinFilter,
135-
getRowType(),
135+
rowType,
136136
joinConditions,
137137
joinTypes,
138138
postJoinFilter,
@@ -161,11 +161,6 @@ public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadata
161161
return planner.getCostFactory().makeCost(rowCount, cpu, io);
162162
}
163163

164-
@Override
165-
protected RelDataType deriveRowType() {
166-
return rowType;
167-
}
168-
169164
public RexNode getJoinFilter() {
170165
return joinFilter;
171166
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java

Lines changed: 53 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,33 @@
1919
package org.apache.flink.table.planner.plan.rules.logical;
2020

2121
import org.apache.flink.table.api.TableException;
22-
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
23-
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
22+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
2423
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin;
25-
import org.apache.flink.table.planner.plan.utils.JoinUtil;
24+
import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
2625

2726
import org.apache.calcite.plan.RelOptRuleCall;
2827
import org.apache.calcite.plan.RelOptUtil;
2928
import org.apache.calcite.plan.RelRule;
29+
import org.apache.calcite.plan.hep.HepRelVertex;
30+
import org.apache.calcite.plan.volcano.RelSubset;
3031
import org.apache.calcite.rel.RelNode;
32+
import org.apache.calcite.rel.SingleRel;
3133
import org.apache.calcite.rel.core.Join;
3234
import org.apache.calcite.rel.core.JoinInfo;
3335
import org.apache.calcite.rel.core.JoinRelType;
3436
import org.apache.calcite.rel.logical.LogicalJoin;
37+
import org.apache.calcite.rel.logical.LogicalSnapshot;
3538
import org.apache.calcite.rel.rules.CoreRules;
3639
import org.apache.calcite.rel.rules.FilterMultiJoinMergeRule;
3740
import org.apache.calcite.rel.rules.MultiJoin;
3841
import org.apache.calcite.rel.rules.ProjectMultiJoinMergeRule;
3942
import org.apache.calcite.rel.rules.TransformationRule;
40-
import org.apache.calcite.rel.type.RelDataType;
4143
import org.apache.calcite.rel.type.RelDataTypeField;
4244
import org.apache.calcite.rex.RexBuilder;
4345
import org.apache.calcite.rex.RexInputRef;
4446
import org.apache.calcite.rex.RexNode;
4547
import org.apache.calcite.rex.RexUtil;
4648
import org.apache.calcite.rex.RexVisitorImpl;
47-
import org.apache.calcite.sql.validate.SqlValidatorUtil;
4849
import org.apache.calcite.tools.RelBuilderFactory;
4950
import org.apache.calcite.util.ImmutableBitSet;
5051
import org.apache.calcite.util.ImmutableIntList;
@@ -140,25 +141,21 @@ public JoinToMultiJoinRule(
140141

141142
// ~ Methods ----------------------------------------------------------------
142143

144+
/**
145+
* This rule matches only INNER and LEFT joins. Right joins are expected to be rewritten to left
146+
* joins by the optimizer with {@link FlinkRightJoinToLeftJoinRule}
147+
*/
143148
@Override
144149
public boolean matches(RelOptRuleCall call) {
145150
final Join origJoin = call.rel(0);
146151
if (origJoin.getJoinType() != JoinRelType.INNER
147152
&& origJoin.getJoinType() != JoinRelType.LEFT) {
148-
/* This rule expects only INNER and LEFT joins. Right joins are expected to be
149-
rewritten to left joins by the optimizer with {@link FlinkRightJoinToLeftJoinRule} */
150-
return false;
151-
}
152-
final RelNode left = call.rel(1);
153-
final RelNode right = call.rel(2);
154-
155-
// Check for temporal/lookup joins (FOR SYSTEM_TIME AS OF) - these should not be merged
156-
if (containsSnapshot(left) || containsSnapshot(right)) {
157153
return false;
158154
}
159155

160-
// Check for interval joins - these should not be merged as they have special time semantics
161-
if (hasTimeAttributes(origJoin, left, right)) {
156+
// Check for interval joins and temporal join - these should not be merged
157+
// as they have special time semantics
158+
if (isIntervalJoin(origJoin) || isTemporalJoin(call)) {
162159
return false;
163160
}
164161

@@ -663,66 +660,62 @@ private List<RexNode> combinePostJoinFilters(Join joinRel, RelNode left, RelNode
663660
}
664661

665662
/**
666-
* Checks if a RelNode tree contains FlinkLogicalSnapshot nodes, which indicate temporal/lookup
667-
* joins. These joins have special semantics and should not be merged into MultiJoin.
663+
* Checks if a join is an interval join. Interval joins have special time-based semantics and
664+
* should not be merged into MultiJoin.
668665
*
669-
* @param node the RelNode to check
670-
* @return true if the node or its children contain FlinkLogicalSnapshot
666+
* @param join the join to check
667+
* @return true if the join condition or outputs access time attributes
671668
*/
672-
private boolean containsSnapshot(RelNode node) {
673-
if (node instanceof FlinkLogicalSnapshot) {
669+
private boolean isIntervalJoin(Join join) {
670+
if (!(join instanceof LogicalJoin)) {
674671
return true;
675672
}
676673

677-
// Check if any input contains a snapshot
678-
for (RelNode input : node.getInputs()) {
679-
if (containsSnapshot(input)) {
680-
return true;
681-
}
682-
}
683-
684-
return false;
674+
FlinkLogicalJoin flinkLogicalJoin =
675+
(FlinkLogicalJoin) FlinkLogicalJoin.CONVERTER().convert(join);
676+
return IntervalJoinUtil.satisfyIntervalJoin(flinkLogicalJoin);
685677
}
686678

687679
/**
688-
* Checks if a join accesses time attributes, which indicates an interval join. Interval joins
689-
* have special time-based semantics and should not be merged into MultiJoin.
680+
* Checks if a join is a temporal/lookup join. Interval joins have special time-based semantics
681+
* (FOR SYSTEM_TIME AS OF) and should not be merged into a MultiJoin.
690682
*
691-
* @param join the join to check
692-
* @param left the left input
693-
* @param right the right input
683+
* @param call the join call to check
694684
* @return true if the join condition or outputs access time attributes
695685
*/
696-
private boolean hasTimeAttributes(Join join, RelNode left, RelNode right) {
697-
// Time attributes must not be in the output type of regular join
698-
final boolean timeAttrInOutput =
699-
join.getRowType().getFieldList().stream()
700-
.anyMatch(f -> FlinkTypeFactory.isTimeIndicatorType(f.getType()));
701-
if (timeAttrInOutput) {
702-
return true;
703-
}
686+
private boolean isTemporalJoin(RelOptRuleCall call) {
687+
final RelNode left = call.rel(1);
688+
final RelNode right = call.rel(2);
704689

705-
// Join condition must not access time attributes
706-
if (join.getCondition() != null) {
707-
final RelDataType inputsRowType =
708-
createInputsRowType(left, right, join.getCluster().getTypeFactory());
709-
return JoinUtil.accessesTimeAttribute(join.getCondition(), inputsRowType);
690+
if (containsSnapshot(left) || containsSnapshot(right)) {
691+
return true;
710692
}
711-
712693
return false;
713694
}
714695

715-
/** Creates a combined row type from the left and right inputs for time attribute checking. */
716-
private RelDataType createInputsRowType(
717-
RelNode left,
718-
RelNode right,
719-
org.apache.calcite.rel.type.RelDataTypeFactory typeFactory) {
720-
return SqlValidatorUtil.createJoinType(
721-
typeFactory,
722-
left.getRowType(),
723-
right.getRowType(),
724-
null,
725-
java.util.Collections.emptyList());
696+
/**
697+
* Checks if a RelNode tree contains FlinkLogicalSnapshot nodes, which indicate temporal/lookup
698+
* joins. These joins have special semantics and should not be merged into MultiJoin.
699+
*
700+
* @param relNode the RelNode to check
701+
* @return true if the node or its children contain FlinkLogicalSnapshot
702+
*/
703+
private boolean containsSnapshot(RelNode relNode) {
704+
RelNode original = null;
705+
if (relNode instanceof RelSubset) {
706+
original = ((RelSubset) relNode).getOriginal();
707+
} else if (relNode instanceof HepRelVertex) {
708+
original = ((HepRelVertex) relNode).getCurrentRel();
709+
} else {
710+
original = relNode;
711+
}
712+
if (original instanceof LogicalSnapshot) {
713+
return true;
714+
} else if (original instanceof SingleRel) {
715+
return containsSnapshot(((SingleRel) original).getInput());
716+
} else {
717+
return false;
718+
}
726719
}
727720

728721
// ~ Inner Classes ----------------------------------------------------------

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMultiJoinRule.java

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,22 @@
2323
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMultiJoin;
2424
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
2525
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin;
26-
import org.apache.flink.table.planner.plan.utils.JoinUtil;
2726
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
2827
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef;
2928
import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
3029
import org.apache.flink.table.types.logical.RowType;
3130

3231
import org.apache.calcite.plan.RelOptRule;
33-
import org.apache.calcite.plan.RelOptRuleCall;
3432
import org.apache.calcite.plan.RelTraitSet;
3533
import org.apache.calcite.rel.RelNode;
3634
import org.apache.calcite.rel.convert.ConverterRule;
37-
import org.apache.calcite.rel.type.RelDataType;
38-
import org.apache.calcite.rel.type.RelDataTypeFactory;
3935
import org.apache.calcite.rex.RexCall;
4036
import org.apache.calcite.rex.RexInputRef;
4137
import org.apache.calcite.rex.RexNode;
4238
import org.apache.calcite.sql.SqlKind;
43-
import org.apache.calcite.sql.validate.SqlValidatorUtil;
4439
import org.checkerframework.checker.nullness.qual.Nullable;
4540

4641
import java.util.ArrayList;
47-
import java.util.Collections;
4842
import java.util.HashMap;
4943
import java.util.List;
5044
import java.util.Map;
@@ -63,19 +57,6 @@ private StreamPhysicalMultiJoinRule() {
6357
"StreamPhysicalMultiJoinRule"));
6458
}
6559

66-
@Override
67-
public boolean matches(final RelOptRuleCall call) {
68-
final FlinkLogicalMultiJoin multiJoin = call.rel(0);
69-
70-
if (isTemporalJoin(multiJoin)) {
71-
return false;
72-
}
73-
74-
// Time attributes are not allowed in regular joins, as they are used for time-based
75-
// operations like windowing. See FLINK-37962 for more details.
76-
return !hasTimeAttributes(multiJoin);
77-
}
78-
7960
@Override
8061
public RelNode convert(final RelNode rel) {
8162
final FlinkLogicalMultiJoin multiJoin = (FlinkLogicalMultiJoin) rel;
@@ -121,45 +102,6 @@ private boolean isTemporalJoin(final FlinkLogicalMultiJoin multiJoin) {
121102
.anyMatch(input -> input instanceof FlinkLogicalSnapshot);
122103
}
123104

124-
private boolean hasTimeAttributes(final FlinkLogicalMultiJoin multiJoin) {
125-
// Time attributes must not be in the output type of regular join.
126-
final boolean timeAttrInOutput =
127-
multiJoin.getRowType().getFieldList().stream()
128-
.anyMatch(f -> FlinkTypeFactory.isTimeIndicatorType(f.getType()));
129-
if (timeAttrInOutput) {
130-
return true;
131-
}
132-
133-
// Join condition must not access time attributes.
134-
final List<RelNode> inputs = multiJoin.getInputs();
135-
if (inputs.isEmpty()) {
136-
return false;
137-
}
138-
139-
final RelDataType inputsRowType = createInputsRowType(multiJoin);
140-
return JoinUtil.accessesTimeAttribute(multiJoin.getJoinFilter(), inputsRowType);
141-
}
142-
143-
private RelDataType createInputsRowType(final FlinkLogicalMultiJoin multiJoin) {
144-
final List<RelDataType> inputTypes =
145-
multiJoin.getInputs().stream()
146-
.map(RelNode::getRowType)
147-
.collect(Collectors.toList());
148-
final RelDataTypeFactory typeFactory = multiJoin.getCluster().getTypeFactory();
149-
150-
RelDataType connectedInputsRowType = inputTypes.get(0);
151-
for (int i = 1; i < inputTypes.size(); i++) {
152-
connectedInputsRowType =
153-
SqlValidatorUtil.createJoinType(
154-
typeFactory,
155-
connectedInputsRowType,
156-
inputTypes.get(i),
157-
null,
158-
Collections.emptyList());
159-
}
160-
return connectedInputsRowType;
161-
}
162-
163105
private Map<Integer, List<ConditionAttributeRef>> createJoinAttributeMap(
164106
final FlinkLogicalMultiJoin multiJoin) {
165107
final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap = new HashMap<>();

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinRestoreTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public List<TableTestProgram> programs() {
3636
return Arrays.asList(
3737
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_RESTORE,
3838
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_RESTORE,
39-
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX_WITH_RESTORE);
39+
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX_WITH_RESTORE,
40+
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION_WITH_RESTORE);
4041
}
4142
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
3030
public List<TableTestProgram> programs() {
3131
return List.of(
3232
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN,
33-
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_WHERE,
3433
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN,
3534
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_UPDATING,
36-
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX);
35+
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_WHERE,
36+
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX,
37+
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION);
3738
}
3839
}

0 commit comments

Comments
 (0)