Skip to content

[FLINK-37913][table] Add built-in OBJECT_OF function #26704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

raminqaf
Copy link

What is the purpose of the change

This pull request implements the OBJECT_OF function as specified in FLIP-520 to enable creation of structured types in SQL and Table API. The function allows users to construct structured objects from key-value pairs without requiring UDFs, making structured type creation SQL-serializable and improving the overall usability of structured types in Flink.

Brief change log

  • Added OBJECT_OF function definition to BuiltInFunctionDefinitions
  • Implemented ObjectOfInputTypeStrategy for input validation (odd argument count, string keys, unique field names)
  • Implemented ObjectOfTypeStrategy for output type inference (creates structured types)
  • Added ObjectOfFunction runtime implementation that creates RowData from key-value pairs
  • Enhanced Expressions.java with objectOf() methods for Table API support
  • Added Python API support with object_of() function in expressions.py
  • Updated SQL functions documentation with OBJECT_OF function details

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for ObjectOfInputTypeStrategy to validate argument count, type validation, and field name uniqueness
  • Added integration tests in StructuredFunctionsITCase to test end-to-end functionality
  • Verified SQL usage: SELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)
  • Verified Table API usage: objectOf(User.class, "name", "Bob", "age", 42)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (additions to Expressions.java and BuiltInFunctionDefinitions)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs (updated SQL functions documentation, comprehensive JavaDocs for all new classes and methods, Python API documentation)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 19, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gustavodemorais
Copy link
Contributor

I think there's a checkstyle violation making the CI fail. Make sure to run ./mvnw checkstyle:check locally

Comment on lines 126 to 131
"NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = "
+ "OBJECT_OF('"
+ NestedType.class.getName()
+ "', 'n1', OBJECT_OF('"
+ Type1.class.getName()
+ "', 'a', 42, 'b', 'Bob'), "
+ "'n2', OBJECT_OF('"
+ Type2.class.getName()
+ "', 'a', 15, 'b', 'Alice'))",
Copy link
Contributor

@snuyanzin snuyanzin Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it was missed during PTF implementation
will it work with record s?
Asking since we have a dedicated folder for java17 tests
for instance https://github.com/apache/flink/blob/1c34ca011cacdbb3b0f48b485eac89dd913d29bf/flink-tests-java17/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoRecordSerializerUpgradeTestSpecifications.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. So far we don't have any tests for records because the language level was too low.

Comment on lines 99 to 118
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
throw new ValidationException(
"The field key at position "
+ keyIndex
+ " must be a STRING/VARCHAR type, but was "
+ logicalType.asSummaryString()
+ ".");
}
Copy link
Contributor

@snuyanzin snuyanzin Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if it is not null (or not nullable)?
Otherwise
output for nullable classname is confusing like

SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice');

gives

Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
OBJECT_OF(STRING, CHAR(2) NOT NULL, CHAR(1) NOT NULL, CHAR(1) NOT NULL, CHAR(5) NOT NULL)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.

which in fact satisfies the required

Supported signatures are:
OBJECT_OF(STRING, [STRING, ANY]*...)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I have addressed the nullability check and added tests.

Copy link
Contributor

@snuyanzin snuyanzin Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will it behave for the case when arg is nullable however we can not resolve during planning like ?

  1. there is a table input with data
class_name1
class_name2
NULL
class_name3
  1. and there is a query
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input

@raminqaf raminqaf force-pushed the FLINK-37913 branch 2 times, most recently from 8f915c5 to bd655e0 Compare June 21, 2025 14:10
@raminqaf
Copy link
Author

@twalthr There is also another case that I have prepared, but so far, it has been stashed on my branch. In the FLIP-520, you mentioned under the table API, this signature:

Expressions.objectOf(DataType, Object... kv);

In SQL, this would be equivalent to:

SELECT OBJECT_OF(OBJECT_OF(...)));

Is this a case we should also support in SQL? Does this mean that we can construct structure types from structured types?

string literals and values can be arbitrary expressions.

Users are responsible for providing a valid fully qualified class name that exists
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User').
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the user classpath?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class name does not need to exist in the class path. It is only used for distinguishing two objects with identical fields. Only when leaving the SQL world to Table API (e.g. for collecting results or UDFs) it should match a class name. But also this is optional if you don't want to use your desired class.


Users are responsible for providing a valid fully qualified class name that exists
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User').
If an invalid or non-existent class name is provided, the function will fall back to using
Copy link
Contributor

@davidradl davidradl Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we fallback to row- instead of erroring?
If we fallback to row.class, what is the implication for the user? i.e. what have they lost?
When we refer to a classname, does this have to be a java bean with matching properties - if so we should document this? If so, can we not introspect the bean for the field names? I see for Row.class we would need the list of field names to be supplied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of how StructuredTypes behave under the hood. It is also explained in the FLIP-520 (Last bulletpoint under Notes)

* <p>Field type normalization includes:
*
* <ul>
* <li>Converting CHARACTER_STRING types (CHAR, VARCHAR) to nullable VARCHAR (STRING)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should mention the approach to nullables in the docs.

Copy link
Author

@raminqaf raminqaf Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! We are still discussing how to handle null values.
#26704 (comment)

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 10, 2025
@raminqaf raminqaf force-pushed the FLINK-37913 branch 2 times, most recently from dabb692 to c01c36d Compare July 11, 2025 09:26
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants