Flink: Dynamic Iceberg Sink: Optimise RowData evolution #13340


Merged: 6 commits from optimise-row-data-conversion into apache:main, Jun 26, 2025

Conversation

@aiborodin (Contributor):

RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
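For illustration, a minimal sketch of the caching idea (CachedRowDataConverter and its wiring are invented names, not the PR's code): build Flink's RowData.FieldGetter instances once per schema and reuse them for every record:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class CachedRowDataConverter {
  // Computed once per schema, reused for every record.
  private final RowData.FieldGetter[] getters;

  CachedRowDataConverter(RowType sourceType) {
    this.getters = new RowData.FieldGetter[sourceType.getFieldCount()];
    for (int i = 0; i < getters.length; i++) {
      getters[i] = RowData.createFieldGetter(sourceType.getTypeAt(i), i);
    }
  }

  RowData convert(RowData source) {
    // A real evolver would also apply per-column conversions to match the
    // destination Iceberg table schema; this sketch only copies fields.
    GenericRowData converted = new GenericRowData(getters.length);
    for (int i = 0; i < getters.length; i++) {
      converted.setField(i, getters[i].getFieldOrNull(source));
    }
    return converted;
  }
}
```

Constructing the getters once removes the per-record RowType traversal and getter allocation that the PR description refers to.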

@mxm (Contributor) left a comment:

Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:

  1. Adding caching to the conversion write path
  2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)

I wonder if we can do (1) as a first step. RowDataEvolver so far has been static, and I understand that it needs to become an object in order to add the cache, but perhaps we can start with a central RowDataEvolver instance holding a cache keyed by source and target schema. I'm not sure the code generation yields much of a performance benefit, and I would like to minimize the number of objects being created.
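For illustration, a sketch of this first step (all names here are hypothetical, not the PR's code): a single, central evolver that looks up a cached converter per (source schema, target schema) pair:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class CachingRowDataEvolver {
  // Cache key: the (source, target) schema pair. A record supplies
  // equals()/hashCode() based on LogicalType equality.
  record SchemaPair(RowType source, RowType target) {}

  private final Map<SchemaPair, Function<RowData, RowData>> converters = new HashMap<>();

  RowData evolve(RowData row, RowType source, RowType target) {
    // The converter is built at most once per schema pair, then reused.
    return converters
        .computeIfAbsent(new SchemaPair(source, target), this::createConverter)
        .apply(row);
  }

  private Function<RowData, RowData> createConverter(SchemaPair pair) {
    // Precompute field getters and per-column conversions here (see the
    // CachedRowDataConverter sketch above); identity pass-through for brevity.
    return row -> row;
  }
}
```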

@aiborodin force-pushed the optimise-row-data-conversion branch from 913c0c6 to 0a6af3a on June 19, 2025 08:35.
@aiborodin (Contributor, Author):

According to the profile in my previous comment #13340 (comment), schema caching alone would not be sufficient; we also need to cache field accessors and converters to minimise the CPU overhead. The object overhead is minimal, as each converter only stores field accessors and conversion lambdas. The cache overhead is minimal because it is an identity cache, and the same schema objects are already cached in TableMetadataCache.
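To make the identity-cache point concrete, a minimal sketch (the class and wiring are hypothetical): since TableMetadataCache hands out the same Schema instances, the converter cache can compare keys by reference, so a hit costs a pointer comparison instead of a structural schema comparison:

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;

class IdentityConverterCache {
  // IdentityHashMap compares keys with ==, not equals(), so lookups never
  // traverse the schema structure. Not thread-safe; a real implementation
  // would need bounding and/or synchronization.
  private final Map<Schema, Function<RowData, RowData>> converters = new IdentityHashMap<>();

  Function<RowData, RowData> converterFor(
      Schema target, Function<Schema, Function<RowData, RowData>> factory) {
    return converters.computeIfAbsent(target, factory);
  }
}
```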

@aiborodin force-pushed the optimise-row-data-conversion branch from 0a6af3a to 5c63747 on June 20, 2025 07:12.
@mxm (Contributor) left a comment:

Thanks for explaining the rationale behind the change. This is an excellent contribution!

@aiborodin force-pushed the optimise-row-data-conversion branch 3 times, most recently from c918919 to 8e45f21, on June 25, 2025 04:27.
@pvary (Contributor) left a comment:

LGTM +1
A few small changes, and we are ready

@aiborodin force-pushed the optimise-row-data-conversion branch 3 times, most recently from eeb0687 to a888dc3, on June 25, 2025 09:25.
Commit: RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.

Commit: TableMetadataCache already contains an identity cache to store schema comparison results. Let's move the row data converter cache into SchemaInfo and make it configurable.
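A hedged sketch of what "configurable" could mean here (the Caffeine-based wiring and the maxSize parameter are assumptions, not the merged code): a bounded cache whose weak keys are compared by identity, matching the identity-cache semantics above:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.function.Function;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;

class BoundedConverterCache {
  private final Cache<Schema, Function<RowData, RowData>> cache;

  BoundedConverterCache(long maxSize) { // maxSize would come from sink configuration
    // weakKeys() makes Caffeine compare keys by identity (==) and lets entries
    // be collected once the schema object is no longer referenced elsewhere.
    this.cache = Caffeine.newBuilder().weakKeys().maximumSize(maxSize).build();
  }

  Function<RowData, RowData> converterFor(
      Schema schema, Function<Schema, Function<RowData, RowData>> factory) {
    return cache.get(schema, factory);
  }
}
```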
@aiborodin force-pushed the optimise-row-data-conversion branch from 83e1150 to 2339e78 on June 26, 2025 04:11.
@mxm (Contributor) left a comment:

Thanks a lot @aiborodin!

@mxm (Contributor) commented on Jun 26, 2025:

Nice last commits 😂

@pvary merged commit 7443e54 into apache:main on Jun 26, 2025. 18 checks passed.
@pvary changed the title from "Optimise RowData evolution" to "Flink: Dynamic Iceberg Sink: Optimise RowData evolution" on Jun 26, 2025.
@pvary (Contributor) commented on Jun 26, 2025:

Merged to main.
Thanks @aiborodin for the optimization and @mxm for the review.

@aiborodin: Could you please create a backport PR to port these changes to Flink 1.20 and 1.19?
This sed command could help:

git diff HEAD~1 HEAD flink/v2.0 | sed "s/v2.0/v1.20/g" > /tmp/patch
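(The rewritten diff can then be applied from the repository root, for example with git apply /tmp/patch.)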

Also, if you need to change anything beyond cleanly applying the patch, please highlight it, so it is easier to review.

Thanks for all of your work on this! Happy to have you as a contributor!

@aiborodin deleted the optimise-row-data-conversion branch on June 27, 2025 06:00.
@aiborodin (Contributor, Author):

Thank you for merging and reviewing the change @pvary!
I appreciate your and @mxm's valuable feedback, and it's a pleasure to have you as reviewers.
I raised this PR to backport the changes to Flink 1.19 / 1.20: #13401.
