-
Notifications
You must be signed in to change notification settings - Fork 13.6k
[FLINK-37965][FLINK-37889][FLINK-37966] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule and Semantic + Restore + relPlan + execPlan tests #26687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
fdf0bc7
to
d87ff8c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent PR @gustavodemorais! Well tested, well documented, and scoped. I mostly added nit comments.
...r/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinForReorderRule.java
Outdated
Show resolved
Hide resolved
@@ -99,24 +99,24 @@ | |||
* @see CoreRules#JOIN_TO_MULTI_JOIN | |||
*/ | |||
@Value.Enclosing | |||
public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.Config> | |||
public class JoinToMultiJoinForReorderRule extends RelRule<JoinToMultiJoinForReorderRule.Config> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update JavaDoc?
This rule is copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the rule used for reordering and I just tried to give it a better name. The rule is mostly copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule} with some modifications. The javadoc is still accurate sinc.
That said, I fixed the typos I've found.
...ner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
Outdated
Show resolved
Hide resolved
@Override | ||
public boolean matches(RelOptRuleCall call) { | ||
final Join origJoin = call.rel(0); | ||
if (origJoin.getJoinType() != JoinRelType.INNER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember we had some issues with the multi way join rules and FOR SYSTEM TIME AS OF
, is it guaranteed that lookups joins are not merged into multijoin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potentially also interval joins should not be pulled into the multijoin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this look good to you I could add this + tests for the reorder rule in another ticket
Row.ofKind(RowKind.INSERT, "1", "order0", "ProdB"), | ||
Row.ofKind(RowKind.INSERT, "1", "order1", "ProdA"), | ||
Row.ofKind(RowKind.INSERT, "2", "order2", "ProdB"), | ||
Row.ofKind(RowKind.INSERT, "3", "order3", "ProdC"), | ||
Row.ofKind(RowKind.INSERT, "4", "order4", "ProdD"), | ||
Row.ofKind(RowKind.INSERT, "6", "order6", "ProdF"), | ||
Row.ofKind(RowKind.INSERT, "7", "order7", "ProdG"), | ||
Row.ofKind(RowKind.INSERT, "9", "order9", "ProdA")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like the ids are mostly perfectly ordered and also always at column 0, I suggest to reorder some and move columns also to the middle or end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've shuffled rows and the columns for the four way complex join for restore and semantic test. Also shuffled columns for the rel/exec/explain tests 👍 570da7b
...r/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
Outdated
Show resolved
Hide resolved
...way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
Outdated
Show resolved
Hide resolved
"rightFieldIndex" : 0 | ||
} ] | ||
}, | ||
"inputUpsertKeys" : [ [ ], [ ], [ ] ], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some comment as able: #26683
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, I also added @JsonInclude(JsonInclude.Include.NON_EMPTY)
to inputUpsertkeys. However, we'll still have it in the compiled plan because it's not an empty list and has the "relevant information" that we have three inputs.
We can add a workaround like using value = JsonInclude.Include.CUSTOM or some custom logic in the ExecNode but I wasn't sure if it's worth it for the readability for this case. It's always going to be a relatively short list of "empty lists". Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not worth the effort. I agree.
...he/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java
Outdated
Show resolved
Hide resolved
"+I[3, Nomad, order3, payment3, New York]") | ||
"+I[1, Gus, order0, payment1, London]" | ||
// "+I[1, Gus, order1, payment1, London]" // TODO | ||
// Gustavo Why is this being consumed after the restore? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not end up in the Flink code base. Could you figure it out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed it, since you're squashing the PR then it won't land in the codebase nor in the history
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for the first version. Thanks @gustavodemorais!
Thanks for precise and useful reviews, @twalthr! Yeah, let's start with this initial version and I'm constantly mapping new tasks under FLINK-37859 and continuing the work to make this production ready. |
What is the purpose of the change
Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule + e2e Semantic, Restore, relPlan, execPlan and explain tests.
This covers FLINK-37965-FLINK-37889-FLINK-37966
This is the smallest group of changes that makes sense and enables us to test e2e. Out of scope to try not to increase the the PR size even more:
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation