-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37913][table] Add built-in OBJECT_OF function #26704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
...on/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java
Outdated
Show resolved
Hide resolved
I think there's a checkstyle violation making the CI fail. Make sure to run |
"NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = " | ||
+ "OBJECT_OF('" | ||
+ NestedType.class.getName() | ||
+ "', 'n1', OBJECT_OF('" | ||
+ Type1.class.getName() | ||
+ "', 'a', 42, 'b', 'Bob'), " | ||
+ "'n2', OBJECT_OF('" | ||
+ Type2.class.getName() | ||
+ "', 'a', 15, 'b', 'Alice'))", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it was missed during PTF implementation
will it work with record
s?
Asking since we have a dedicated folder for java17 tests
for instance https://github.com/apache/flink/blob/1c34ca011cacdbb3b0f48b485eac89dd913d29bf/flink-tests-java17/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoRecordSerializerUpgradeTestSpecifications.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. So far we don't have any tests for records because the language level was too low.
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { | ||
throw new ValidationException( | ||
"The field key at position " | ||
+ keyIndex | ||
+ " must be a STRING/VARCHAR type, but was " | ||
+ logicalType.asSummaryString() | ||
+ "."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check if it is not null (or not nullable)?
Otherwise
output for nullable classname is confusing like
SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice');
gives
Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
OBJECT_OF(STRING, CHAR(2) NOT NULL, CHAR(1) NOT NULL, CHAR(1) NOT NULL, CHAR(5) NOT NULL)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.
which in fact satisfies the required
Supported signatures are:
OBJECT_OF(STRING, [STRING, ANY]*...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback! I have addressed the nullability check and added tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how will it behave for the case when arg is nullable however we can not resolve during planning like ?
- there is a table
input
with data
class_name1
class_name2
NULL
class_name3
- and there is a query
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input
...lanner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
Show resolved
Hide resolved
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...on/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java
Show resolved
Hide resolved
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...on/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java
Show resolved
Hide resolved
...on/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java
Show resolved
Hide resolved
8f915c5
to
bd655e0
Compare
@twalthr There is also another case that I have prepared, but so far, it has been stashed on my branch. In the FLIP-520, you mentioned under the table API, this signature:
In SQL, this would be equivalent to: SELECT OBJECT_OF(OBJECT_OF(...))); Is this a case we should also support in SQL? Does this mean that we can construct structure types from structured types? |
docs/data/sql_functions.yml
Outdated
string literals and values can be arbitrary expressions. | ||
|
||
Users are responsible for providing a valid fully qualified class name that exists | ||
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User'). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the user classpath?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is also mentioned in the StructuredType JavaDocs
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java#L85-L88
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class name does not need to exist in the class path. It is only used for distinguishing two objects with identical fields. Only when leaving the SQL world to Table API (e.g. for collecting results or UDFs) it should match a class name. But also this is optional if you don't want to use your desired class.
docs/data/sql_functions.yml
Outdated
|
||
Users are responsible for providing a valid fully qualified class name that exists | ||
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User'). | ||
If an invalid or non-existent class name is provided, the function will fall back to using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we fallback to row- instead of erroring?
If we fallback to row.class, what is the implication for the user? i.e. what have they lost?
When we refer to a classname, does this have to be a java bean with matching properties - if so we should document this? If so, can we not introspect the bean for the field names? I see for Row.class we would need the list of field names to be supplied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of how StructuredTypes behave under the hood. It is also explained in the FLIP-520 (Last bulletpoint under Notes
)
* <p>Field type normalization includes: | ||
* | ||
* <ul> | ||
* <li>Converting CHARACTER_STRING types (CHAR, VARCHAR) to nullable VARCHAR (STRING) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should mention the approach to nullables in the docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! We are still discussing how to handle null values.
#26704 (comment)
dabb692
to
c01c36d
Compare
What is the purpose of the change
This pull request implements the
OBJECT_OF
function as specified in FLIP-520 to enable creation of structured types in SQL and Table API. The function allows users to construct structured objects from key-value pairs without requiring UDFs, making structured type creation SQL-serializable and improving the overall usability of structured types in Flink.Brief change log
OBJECT_OF
function definition toBuiltInFunctionDefinitions
ObjectOfInputTypeStrategy
for input validation (odd argument count, string keys, unique field names)ObjectOfTypeStrategy
for output type inference (creates structured types)ObjectOfFunction
runtime implementation that createsRowData
from key-value pairsExpressions.java
withobjectOf()
methods for Table API supportobject_of()
function inexpressions.py
OBJECT_OF
function detailsVerifying this change
This change added tests and can be verified as follows:
ObjectOfInputTypeStrategy
to validate argument count, type validation, and field name uniquenessStructuredFunctionsITCase
to test end-to-end functionalitySELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)
objectOf(User.class, "name", "Bob", "age", 42)
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yes (additions toExpressions.java
andBuiltInFunctionDefinitions
)Documentation