[FLINK-37795] rewrite ml_evaluate to ml_predict and aggregate function #26667

Open
wants to merge 3 commits into
base: master
from

Conversation

lihaosky
Contributor

@lihaosky lihaosky commented Jun 10, 2025

What is the purpose of the change

Rewrite the ml_evaluate table function scan into an ml_predict table function scan followed by a LogicalAggregate.

Brief change log

Rewrite the ml_evaluate table function scan into an ml_predict table function scan followed by a LogicalAggregate.

Verifying this change

Unit test

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@airlock-confluentinc airlock-confluentinc bot force-pushed the model-evaluate-rewrite branch from de7bce9 to b0cc113 Compare June 10, 2025 21:49
@flinkbot
Collaborator

flinkbot commented Jun 10, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@airlock-confluentinc airlock-confluentinc bot force-pushed the model-evaluate-rewrite branch 2 times, most recently from 10b36e8 to 84452ad Compare June 16, 2025 19:20
@airlock-confluentinc airlock-confluentinc bot force-pushed the model-evaluate-rewrite branch from 84452ad to e301fb6 Compare June 17, 2025 02:38
}

public static boolean isValidTaskType(String name) {
    return Arrays.stream(values()).anyMatch(taskType -> taskType.name.equals(name));
}

public static Optional<RuntimeException> throwOrReturnInvalidTaskType(
Contributor

I think it would make more sense to call this method validateTaskType.
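To make the suggested rename concrete, here is a minimal sketch of what a validateTaskType that throws directly could look like. The enum constants and the exception type are assumptions for illustration only; the actual values and error handling in this PR are not shown in the thread.

```java
import java.util.Arrays;

public class TaskTypeSketch {

    // Hypothetical task types; the real constants are not visible in this review thread.
    public enum TaskType {
        REGRESSION("regression"),
        CLASSIFICATION("classification");

        private final String name;

        TaskType(String name) {
            this.name = name;
        }

        public static boolean isValidTaskType(String name) {
            return Arrays.stream(values()).anyMatch(taskType -> taskType.name.equals(name));
        }

        // Validates and throws, rather than returning Optional<RuntimeException>.
        // IllegalArgumentException is an assumed choice of exception type.
        public static void validateTaskType(String name) {
            if (!isValidTaskType(name)) {
                throw new IllegalArgumentException("Invalid task type: '" + name + "'");
            }
        }
    }

    public static void main(String[] args) {
        TaskType.validateTaskType("regression"); // passes silently
        try {
            TaskType.validateTaskType("unknown");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A caller that needs to return the error instead of throwing can still wrap the call in a try/catch.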

String task = null;
if (taskNode instanceof RexLiteral) {
    task = ((RexLiteral) taskNode).getValueAs(NlsString.class).getValue();
    if (task == null || task.isEmpty()) {
Contributor

We do not need the task == null check if we are going to set task to null anyway.
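One caveat when simplifying this: removing the null test outright would call isEmpty() on a possibly null value. A minimal sketch of an equivalent form, assuming the only intent is to normalize an empty task to null (normalizeTask is a hypothetical helper name, not part of the PR):

```java
public class NormalizeTaskSketch {

    // Hypothetical helper: maps an empty task to null and leaves everything else unchanged.
    public static String normalizeTask(String task) {
        // A null task is already null, so only the empty case needs rewriting.
        // The null guard stays on the left so isEmpty() is never invoked on null.
        if (task != null && task.isEmpty()) {
            return null;
        }
        return task;
    }

    public static void main(String[] args) {
        System.out.println(normalizeTask(""));
        System.out.println(normalizeTask("regression"));
    }
}
```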

        task = null;
    }
}
if (task == null) {
Contributor

This message is not accurate: we could have a task that is not a RexLiteral.

private static String getTask(RexCall rexCall) {
    final RexNode taskNode = rexCall.getOperands().get(4);
    String task = null;
    if (taskNode instanceof RexLiteral) {
Contributor

It would be good to include a comment as to why we need a RexLiteral here.

Contributor

I wonder if it would be better to have one method

String task = getRexLiteralFrom(rexCall.getOperands().get(4), true);
then all the checking and validation is done in one place.

Also, I suggest removing the boolean and always throwing an exception from the method, which the caller can catch and handle as required.
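A rough sketch of that shape follows. To keep it compilable on its own, it uses small stand-in types (Node, Literal, InputRef) rather than Calcite's RexNode and RexLiteral, and the method name getStringLiteral, the messages, and the exception type are all assumptions. It also gives separate messages for "not a literal" and "empty literal", which is one way to address the inaccurate error message raised earlier in this review.

```java
public class LiteralExtractionSketch {

    // Stand-ins for Calcite's RexNode / RexLiteral so the sketch has no dependencies.
    public interface Node {}

    public static final class Literal implements Node {
        final String value;

        public Literal(String value) {
            this.value = value;
        }
    }

    // Represents any non-literal expression, e.g. a column reference.
    public static final class InputRef implements Node {}

    /** Returns the non-empty string held by a literal node; throws in every other case. */
    public static String getStringLiteral(Node node, String argName) {
        if (!(node instanceof Literal)) {
            throw new IllegalArgumentException(
                    argName + " must be a string literal, not a computed expression.");
        }
        String value = ((Literal) node).value;
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(argName + " must not be null or empty.");
        }
        return value;
    }

    // A caller that only needs a yes/no answer catches the exception itself.
    public static boolean hasValidTask(Node node) {
        try {
            getStringLiteral(node, "task");
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(getStringLiteral(new Literal("regression"), "task"));
        System.out.println(hasValidTask(new InputRef()));
    }
}
```

With all checks in one method, callers such as a rule's matches() step can decide locally whether to swallow or propagate the failure.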

if (!(scan.getCall() instanceof RexCall)) {
    return false;
}
RexCall call = (RexCall) scan.getCall();
Contributor

Can't call also be declared final?

@@ -77,6 +76,7 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(MODEL_VERSION);
    options.add(TASK);
Contributor

I am curious about this test: you have added task as a config option to a test model provider factory. I notice that OpenAIModelProviderFactory implements ModelProviderFactory; should it not pick up the task in the same way as the test here?

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 1, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 5, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 12, 2025
Labels
community-reviewed PR has been reviewed by the community.
3 participants