Skip to content

[FLINK-37889] [table-planner] Add JoinToMultiJoinRule #26689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

SteveStevenpoor
Copy link
Contributor

@SteveStevenpoor SteveStevenpoor commented Jun 17, 2025

What is the purpose of the change

Introduce FlinkStreamJoinToMultiJoinRule

It also covers FLINK-37890

Brief change log

  • Added FlinkStreamJoinToMultiJoinRule
  • Added FlinkMultiJoinNode

Verifying this change

  • Added rel plan tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @public(Evolving)`: no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@SteveStevenpoor
Copy link
Contributor Author

Hey, @gustavodemorais ! Sorry for the waiting, I was unexpectedly extremely busy last week. Please take a look at this. There will be plenty of refactoring but this version works for now with commonJoinKey checking and right joins enabled.

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 17, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gustavodemorais
Copy link
Contributor

Hey, @gustavodemorais ! Sorry for the waiting, I was unexpectedly extremely busy last week. Please take a look at this. There will be plenty of refactoring but this version works for now with commonJoinKey checking and right joins enabled.

Hey @SteveStevenpoor, I understand, sorry for the rush because of the deadline. We're far with #26687 and will probably merge it today or tomorrow.

If you have the capacity, you could open a PR against that branch to cover mostly FLINK-37890. It'll be automatically retargeted to master as soon as we merge #26687. Some observations from reviewing this PR:

  • No commonJoinKey in general does not mean we don't use MultiJoin, it just means we have to break it into multiple MultiJoins. It's in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey e.g. would have two multiple joins with two inputs each. We want a test to check if we can have for example 2, 3 MultiJoins combined.
  • We want to use calcilte's MultiJoin for now in the rule to be able to combined it with the native project and merge rule for now, as we did in the other PR. commonJoinKeys doesn't have to be stored. We just need to check if the constraint is respected here.
  • The rule shouldn't cover right joins since we got the other for that.
  • We'd want e2e table programs to test it as shown in the other PR.

If you don't have a lot of time now, that's no problem and I'll get to this probably tomorrow. After the deadline, we can discuss you picking other items for the multi way join without the rush of the deadline. We still have some essential jira tickets to get the feature from an experimental state to production ready here.

@SteveStevenpoor
Copy link
Contributor Author

If you have the capacity, you could open a PR against that branch to cover mostly FLINK-37890. It'll be automatically retargeted to master as soon as we merge #26687. Some observations from reviewing this PR:

I'll do it!

No commonJoinKey in general does not mean we don't use MultiJoin, it just means we have to break it into multiple MultiJoins. It's in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey e.g. would have two multiple joins with two inputs each. We want a test to check if we can have for example 2, 3 MultiJoins combined.

Say we have A JOIN B JOIN C JOIN D and A, B, C have commonJoinKey but D doesnt. My code will give MJ(A, B, C) JOIN D. But what we want is actually MJ(MJ(A, B, C), D). Am I correct?

We want to use calcilte's MultiJoin for now in the rule to be able to combined it with the native project and merge rule for now, as we did in the other PR. commonJoinKeys doesn't have to be stored. We just need to check if the constraint is respected here.

I needed it to make check for commonJoinKey more simple. I can use calcite's mj instead.

The rule shouldn't cover right joins since we got the other for that.

What do you mean by covering right joins? The rule will be applied only for inner and left joins:

        final Join origJoin = call.rel(0);
        if (origJoin.getJoinType() != JoinRelType.INNER
                && origJoin.getJoinType() != JoinRelType.LEFT) {
            return false;
        }

If you are talking about checking that right child is projection\join that's because I did JoinToMultiJoinRule with the respect to RightToLeftJoinRule which swaps inputs and adds projection. I needed to cover this case.

We'd want e2e table programs to test it as shown in the other PR.

Affirmative.

If you don't have a lot of time now, that's no problem and I'll get to this probably tomorrow. After the deadline, we can discuss you picking other items for the multi way join without the rush of the deadline. We still have some essential jira tickets to get the feature from an experimental state to production ready here.

I'll start today with 37890 and hope to drop PR by tomorrow. I will keep you informated.

@gustavodemorais
Copy link
Contributor

gustavodemorais commented Jun 18, 2025

Say we have A JOIN B JOIN C JOIN D and A, B, C have commonJoinKey but D doesnt. My code will give MJ(A, B, C) JOIN D. But what we want is actually MJ(MJ(A, B, C), D). Am I correct?

Exactly 👍 Or in a more realistic scenario, if we have A JOIN B JOIN C JOIN D JOIN E, we could have MJ(MJ(A, B, C), D, E). Or even more concatenated multi joins.

What do you mean by covering right joins? The rule will be applied only for inner and left joins:

I saw you had some tests with right joins. I didn't think it through for project and join nodes playing with the RightJoinToLeftJoin rule. I think that the multijoin check (right instanceof FlinkMultiJoinNode) can be dropped?

I'll start today with 37890 and hope to drop PR by tomorrow. I will keep you informated.

If you're not 100% done you can also open a draft PR so I can collaborate tomorrow with you. Or else I'll have to start my own. And if you don't have the time, it's ok and we can sync on the next items. Thanks, Stepan.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants