[FLINK-37965][FLINK-37889][FLINK-37966] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule and Semantic + Restore + relPlan + execPlan tests #26687

Open
wants to merge 10 commits into
base: master

Contributor

@gustavodemorais gustavodemorais commented Jun 17, 2025

What is the purpose of the change

Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule + e2e Semantic, Restore, relPlan, execPlan and explain tests.

This covers FLINK-37965, FLINK-37889 and FLINK-37966.

This is the smallest group of changes that makes sense and enables us to test e2e. The following are out of scope, to avoid increasing the PR size even more:

  1. Using the right-join-to-left-join rule, plus tests for it; this can be added in another PR.
  2. RelTimeIndicatorConverter
  3. Extending JoinToMultiJoinRule to check for a common join key

Brief change log

  • Add PhysicalMultiJoinRule
  • Add FlinkLogicalMultiJoin
  • Add LogicalJoinToMultiJoinRule

Verifying this change

  • Added Semantic tests
  • Added Restore tests
  • Added verifyRelPlan tests
  • Added verifyExecPlan tests
  • Added verifyExplain tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs / JavaDocs)

@flinkbot
Collaborator

flinkbot commented Jun 17, 2025

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gustavodemorais gustavodemorais force-pushed the FLINK-37965-FLINK-37889-FLINK-37966 branch from fdf0bc7 to d87ff8c June 17, 2025 13:52
@gustavodemorais gustavodemorais changed the title from "[FLINK-37965] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule, Semantic tests and Restore tests" to "[FLINK-37965] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule and Semantic + Restore + relPlan + execPlan tests" Jun 17, 2025
Contributor

@twalthr twalthr left a comment

Excellent PR @gustavodemorais! Well tested, well documented, and scoped. I mostly added nit comments.

@@ -99,24 +99,24 @@
  * @see CoreRules#JOIN_TO_MULTI_JOIN
  */
 @Value.Enclosing
-public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.Config>
+public class JoinToMultiJoinForReorderRule extends RelRule<JoinToMultiJoinForReorderRule.Config>
Contributor

update JavaDoc?

This rule is copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule}

Contributor Author

@gustavodemorais gustavodemorais Jun 17, 2025

This is the rule used for reordering, and I just tried to give it a better name. The rule is mostly copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule} with some modifications, so the JavaDoc is still accurate.

That said, I fixed the typos I found.

@Override
public boolean matches(RelOptRuleCall call) {
    final Join origJoin = call.rel(0);
    if (origJoin.getJoinType() != JoinRelType.INNER
Contributor

I remember we had some issues with the multi-way join rules and FOR SYSTEM TIME AS OF. Is it guaranteed that lookup joins are not merged into the multi-join?

Contributor

Potentially also interval joins should not be pulled into the multijoin.

Contributor Author

Yes, we avoid matching temporal joins. I had basically done the same as we do for binary joins, see

.

I took a look again and I agree this doesn't look complete. I've added the necessary checks in the logical rule + tests for it, see e0ccc9c

Contributor Author

If this looks good to you, I could add this + tests for the reorder rule in another ticket.
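
To make the concern in this thread concrete, here is a minimal sketch of the kind of guard being discussed. It is not the code from this PR and does not use Flink or Calcite classes: `MultiJoinEligibilitySketch`, `JoinKind` and `canMergeIntoMultiJoin` are hypothetical names, and treating interval joins as excluded is an assumption taken from the reviewer's suggestion rather than from the merged rule.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, simplified model; not Flink's or Calcite's actual classes. */
final class MultiJoinEligibilitySketch {

    /** Hypothetical classification of a binary join, for illustration only. */
    enum JoinKind { REGULAR, TEMPORAL, LOOKUP, INTERVAL }

    /**
     * Join kinds that the review thread says should stay out of a multi-join.
     * Excluding INTERVAL is an assumption based on the reviewer's comment.
     */
    private static final Set<JoinKind> EXCLUDED =
            EnumSet.of(JoinKind.TEMPORAL, JoinKind.LOOKUP, JoinKind.INTERVAL);

    /** Returns true only for joins that may be folded into a multi-join. */
    static boolean canMergeIntoMultiJoin(JoinKind kind) {
        return !EXCLUDED.contains(kind);
    }

    public static void main(String[] args) {
        for (JoinKind kind : JoinKind.values()) {
            System.out.println(kind + " -> " + canMergeIntoMultiJoin(kind));
        }
    }
}
```

In a real planner rule the classification would presumably come from inspecting the join condition and inputs inside `matches`, which this sketch leaves out.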

Comment on lines 171 to 178
Row.ofKind(RowKind.INSERT, "1", "order0", "ProdB"),
Row.ofKind(RowKind.INSERT, "1", "order1", "ProdA"),
Row.ofKind(RowKind.INSERT, "2", "order2", "ProdB"),
Row.ofKind(RowKind.INSERT, "3", "order3", "ProdC"),
Row.ofKind(RowKind.INSERT, "4", "order4", "ProdD"),
Row.ofKind(RowKind.INSERT, "6", "order6", "ProdF"),
Row.ofKind(RowKind.INSERT, "7", "order7", "ProdG"),
Row.ofKind(RowKind.INSERT, "9", "order9", "ProdA"))
Contributor

It looks like the IDs are almost perfectly ordered and always in column 0. I suggest reordering some rows and moving the ID column to the middle or end as well.

Contributor Author

@gustavodemorais gustavodemorais Jun 18, 2025

I've shuffled the rows and columns for the four-way complex join in the restore and semantic tests. I also shuffled the columns for the rel/exec/explain tests 👍 570da7b

"rightFieldIndex" : 0
} ]
},
"inputUpsertKeys" : [ [ ], [ ], [ ] ],
Contributor

same comment as above: #26683

Contributor Author

@gustavodemorais gustavodemorais Jun 17, 2025

In this case, I also added @JsonInclude(JsonInclude.Include.NON_EMPTY) to inputUpsertKeys. However, we'll still have it in the compiled plan because it's not an empty list and carries the "relevant information" that we have three inputs.

We could add a workaround, like using value = JsonInclude.Include.CUSTOM or some custom logic in the ExecNode, but I wasn't sure it's worth it for readability in this case. It's always going to be a relatively short list of "empty lists". Wdyt?

Contributor

Not worth the effort. I agree.
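
For readers following the thread, here is a small sketch of why NON_EMPTY does not drop the field, and what the CUSTOM workaround mentioned above could look like. It assumes Jackson's documented behaviour that, with `JsonInclude.Include.CUSTOM`, a value is omitted when the configured filter's `equals(value)` returns true. `AllNestedEmptyFilter` is a hypothetical class that is not part of this PR, and the annotation wiring is shown only in a comment so the block runs with the JDK alone.

```java
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical value filter, for illustration only. Assumed wiring:
 * @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = AllNestedEmptyFilter.class)
 * Jackson would then omit the property whenever equals(value) returns true.
 */
final class AllNestedEmptyFilter {

    /** Deliberately asymmetric: this is a filter predicate, not a real equality. */
    @Override
    public boolean equals(Object value) {
        if (value == null) {
            return true; // treat null like "nothing to serialize"
        }
        if (!(value instanceof Collection)) {
            return false;
        }
        for (Object element : (Collection<?>) value) {
            if (!(element instanceof Collection) || !((Collection<?>) element).isEmpty()) {
                return false; // at least one input has real upsert keys
            }
        }
        return true; // empty outer list, or only empty inner lists
    }

    @Override
    public int hashCode() {
        return 0;
    }

    public static void main(String[] args) {
        List<List<Integer>> upsertKeys = List.of(List.of(), List.of(), List.of());
        // false: the outer list has three elements, so NON_EMPTY keeps it
        System.out.println(upsertKeys.isEmpty());
        // true: a CUSTOM filter like this one would omit it
        System.out.println(new AllNestedEmptyFilter().equals(upsertKeys));
    }
}
```

Note that omitting the field this way would also drop the information that there are three inputs, which is the trade-off weighed in the thread.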

"+I[3, Nomad, order3, payment3, New York]")
"+I[1, Gus, order0, payment1, London]"
// "+I[1, Gus, order1, payment1, London]" // TODO
// Gustavo Why is this being consumed after the restore?
Contributor

This should not end up in the Flink code base. Could you figure it out?

Contributor Author

I've removed it. Since you're squashing the PR, it won't land in the codebase or in the history.

Contributor

@twalthr twalthr left a comment

LGTM for the first version. Thanks @gustavodemorais!

@gustavodemorais gustavodemorais changed the title from "[FLINK-37965] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule and Semantic + Restore + relPlan + execPlan tests" to "[FLINK-37965][FLINK-37889][FLINK-37966] Add PhysicalMultiJoinRule, FlinkLogicalMultiJoin, LogicalJoinToMultiJoinRule and Semantic + Restore + relPlan + execPlan tests" Jun 18, 2025
@gustavodemorais
Contributor Author

LGTM for the first version. Thanks @gustavodemorais!

Thanks for the precise and useful reviews, @twalthr!

Yeah, let's start with this initial version. I'm continuously mapping new tasks under FLINK-37859 and continuing the work to make this production ready.

3 participants