Flink: Dynamic Iceberg Sink: Optimise RowData evolution #13340
Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:
1. Adding caching to the conversion write path
2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)
I wonder if we can do (1) as a first step. RowDataEvolver so far has been static and I understand that it needs to become an object in order to add the cache, but perhaps we can use a central RowDataEvolver instance with a cache for source and target schema first. I'm not sure adding the code generation yields much performance and I would like to minimize the objects getting created.
According to the profile in my previous comment #13340 (comment), schema caching alone would not be sufficient: we also need to cache field accessors and converters to minimise the CPU overhead. The object overhead is minimal, as each converter only stores field accessors and conversion lambdas. The cache overhead is also minimal because it is an identity cache and the same schema objects are already cached in TableMetadataCache.
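To make the argument above concrete, here is a minimal sketch of an identity cache that holds precomputed converters. It is not the code from this PR: `ConverterCacheSketch`, `Schema`, `RowConverter`, and `converterFor` are hypothetical names, and plain `Object[]` rows stand in for Flink `RowData` so that the example has no dependencies.

```java
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class ConverterCacheSketch {

  /** Hypothetical stand-in for a schema: an ordered list of field names. */
  static final class Schema {
    final List<String> fields;

    Schema(List<String> fields) {
      this.fields = fields;
    }
  }

  /** Resolves source positions once, so convert() does no per-record lookups. */
  static final class RowConverter {
    private final int[] sourcePositions;

    RowConverter(Schema source, Schema target) {
      sourcePositions = new int[target.fields.size()];
      for (int i = 0; i < sourcePositions.length; i++) {
        // -1 marks a target field that the source does not have.
        sourcePositions[i] = source.fields.indexOf(target.fields.get(i));
      }
    }

    Object[] convert(Object[] row) {
      Object[] out = new Object[sourcePositions.length];
      for (int i = 0; i < out.length; i++) {
        int pos = sourcePositions[i];
        out[i] = pos < 0 ? null : row[pos];
      }
      return out;
    }
  }

  // Keys are compared by reference (==), so a lookup never inspects schema contents.
  private final Map<Schema, Map<Schema, RowConverter>> cache = new IdentityHashMap<>();

  /** Returns the cached converter for this exact pair of schema objects, building it on first use. */
  RowConverter converterFor(Schema source, Schema target) {
    return cache
        .computeIfAbsent(source, s -> new IdentityHashMap<>())
        .computeIfAbsent(target, t -> new RowConverter(source, target));
  }
}
```

An identity cache only pays off when callers keep passing the same schema instances. Two structurally equal but distinct `Schema` objects produce two cache entries in this sketch.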
Thanks for explaining the rationale behind the change. This is an excellent contribution!
LGTM +1
A few small changes, and we are ready
RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
TableMetadataCache already contains an identity cache to store schema comparison results. Let's move the row data converter cache into SchemaInfo and make it configurable.
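As a rough illustration of a size-bounded, configurable cache like the one the commit message describes, the sketch below builds a small LRU cache on top of `java.util.LinkedHashMap` in access order. It is not the `LRUCache` class from this PR: `BoundedLruCacheSketch`, `maxSize`, and the loader-based `get` are assumptions made for the example. Unlike the identity cache mentioned above, `LinkedHashMap` compares keys with `equals`, and the class is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class BoundedLruCacheSketch<K, V> {
  private final LinkedHashMap<K, V> entries;

  BoundedLruCacheSketch(int maxSize) {
    // accessOrder = true keeps the least recently used entry at the head.
    this.entries =
        new LinkedHashMap<K, V>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Called after each insertion; evict once the limit is exceeded.
            return size() > maxSize;
          }
        };
  }

  /** Returns the cached value, computing and storing it with the loader on a miss. */
  V get(K key, Function<K, V> loader) {
    V value = entries.get(key);
    if (value == null) {
      value = loader.apply(key);
      entries.put(key, value);
    }
    return value;
  }

  /** Membership check that does not count as an access. */
  boolean contains(K key) {
    return entries.containsKey(key);
  }

  int size() {
    return entries.size();
  }
}
```

In this sketch, making the cache configurable amounts to passing `maxSize` through from whatever sink option controls it. The loader would be the place to build a converter for a schema pair that has not been seen yet.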
Thanks a lot @aiborodin!
Nice last commits 😂
Merged to main. @aiborodin: Could you please create a backport PR to port these changes to Flink 1.20 and 1.19?
Also, if you need to change anything beyond cleanly applying the change, please highlight it so it is easier to review. Thanks for all of your work on this! Happy to have you as a contributor!