
Optimise RowData evolution #13340


Open · wants to merge 2 commits into main from optimise-row-data-conversion

Conversation

aiborodin (Author):

RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
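For illustration only (not the PR's actual classes), the core of the change is to compute field getters once per Flink RowType and reuse them for every record of that type; a minimal sketch, assuming a simple map keyed by RowType:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class FieldGetterCache {
  private final Map<RowType, RowData.FieldGetter[]> cache = new ConcurrentHashMap<>();

  // Returns the cached getters for this row type, creating them on first use.
  RowData.FieldGetter[] gettersFor(RowType rowType) {
    return cache.computeIfAbsent(
        rowType,
        type -> {
          RowData.FieldGetter[] getters = new RowData.FieldGetter[type.getFieldCount()];
          for (int i = 0; i < getters.length; i++) {
            getters[i] = RowData.createFieldGetter(type.getTypeAt(i), i);
          }
          return getters;
        });
  }
}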

@mxm (Contributor) left a comment:

Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:

  1. Adding caching to the conversion write path
  2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)

I wonder if we can do (1) as a first step. RowDataEvolver has so far been static, and I understand that it needs to become an object in order to add the cache, but perhaps we can start with a central RowDataEvolver instance that caches per source and target schema. I'm not sure the code generation yields much of a performance gain, and I would like to minimize the number of objects created.
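For illustration, option (1) could look roughly like the sketch below; the class and factory names are assumptions, and keying on Schema relies on the instances being canonical (they are identity-cached in TableMetadataCache, as discussed later in this thread):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;

class RowDataEvolverCache {
  private final Cache<Tuple2<Schema, Schema>, RowDataConverter> converters =
      Caffeine.newBuilder().maximumSize(1_000).build();

  // One lookup per (source, target) schema pair instead of recomputing per record.
  RowDataConverter converterFor(Schema source, Schema target) {
    return converters.get(Tuple2.of(source, target), key -> RowDataConverter.of(key.f0, key.f1));
  }

  // Placeholder for the per-pair state that the static RowDataEvolver currently recomputes.
  @FunctionalInterface
  interface RowDataConverter {
    RowData convert(RowData row);

    static RowDataConverter of(Schema source, Schema target) {
      return row -> row; // identity placeholder; a real converter adapts rows to the target schema
    }
  }
}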


@Internal
class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
    implements Collector<DynamicRecord> {
  private static final int ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE = 1000;
Contributor:
This should be configurable, similarly to the other caches.

Author:
I added a configuration option for this cache. However, it feels like we should do it differently. I made a change locally, which I can raise as a PR, that merges the RowDataConverter cache into the SchemaInfo in the TableMetadataCache. With this design, each converter is stored alongside its source schema and the comparison result from the schema visitor. We can then make the static constant TableMetadataCache.MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE configurable to adjust the converter cache size, instead of the option I added.
The only problem with the second approach is that the cache will recompute converters every refreshMs (1 second by default) for all operators. This would probably add a small performance overhead, but I need to confirm.
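A sketch of the merged cache entry described above, with assumed names and generic stand-ins for the comparison-result and converter types; each input schema maps (by identity) to the matching table schema, the comparison result, and the converter built for that pairing, so a single lookup resolves all three:

import org.apache.iceberg.Schema;

class SchemaCompareInfo<R, C> {
  private final Schema tableSchema; // resolved destination table schema
  private final R compareResult;    // result from the schema comparison visitor
  private final C converter;        // converter cached for this input schema

  SchemaCompareInfo(Schema tableSchema, R compareResult, C converter) {
    this.tableSchema = tableSchema;
    this.compareResult = compareResult;
    this.converter = converter;
  }

  Schema tableSchema() {
    return tableSchema;
  }

  R compareResult() {
    return compareResult;
  }

  C converter() {
    return converter;
  }
}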

Contributor:
@aiborodin: Tests are needed, but my guess is that adding the converter to the TableMetadataCache is a good idea. The cache timeout is only used for missing items, so I don't think we will have an issue with the refresh.

I'm a bit concerned about the size of the converters if they live in the TableMetadataCache. Cache items are only evicted when they are unused or the table is updated, so old converters are kept for as long as the table stays in use. This might not be a big issue, but we have to be aware of it.

Contributor:
So, I support moving the cache to the TableMetadataCache.

Author:
I pushed a commit to this PR, which moves the converter cache into TableMetadataCache.

@aiborodin force-pushed the optimise-row-data-conversion branch from 913c0c6 to 0a6af3a on June 19, 2025, 08:35
@aiborodin (Author):
According to the profile in my previous comment #13340 (comment), schema caching alone would not be sufficient; we also need to cache field accessors and converters to minimise the CPU overhead. The object overhead is minimal, as each converter only stores field accessors and conversion lambdas. The cache overhead is minimal because it is an identity cache, and the same schema objects are already cached in TableMetadataCache.
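To illustrate the point about object overhead (names assumed, not the PR's classes): a cached converter only needs one field getter and one conversion lambda per target column, so each instance amounts to two small arrays.

import java.io.Serializable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

class CachedRowConverter implements Serializable {
  @FunctionalInterface
  interface ColumnConverter extends Serializable {
    Object convert(Object sourceValue);
  }

  private final RowData.FieldGetter[] fieldGetters; // one per target column
  private final ColumnConverter[] columnConverters; // one per target column

  CachedRowConverter(RowData.FieldGetter[] fieldGetters, ColumnConverter[] columnConverters) {
    this.fieldGetters = fieldGetters;
    this.columnConverters = columnConverters;
  }

  RowData convert(RowData source) {
    GenericRowData result = new GenericRowData(source.getRowKind(), fieldGetters.length);
    for (int i = 0; i < fieldGetters.length; i++) {
      Object value = fieldGetters[i].getFieldOrNull(source);
      result.setField(i, value == null ? null : columnConverters[i].convert(value));
    }
    return result;
  }
}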

for (int i = 0; i < convertedArray.length; i++) {
  Object element = elementGetter.getElementOrNull(arrayData, i);
  convertedArray[i] = elementConverter.convert(element);
}
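For context, a sketch of how the elementGetter and elementConverter used above might be created once per array type rather than per record; the ColumnConverter type here is an assumption standing in for the PR's converter:

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.types.logical.ArrayType;

class ArrayConverterSketch {
  @FunctionalInterface
  interface ColumnConverter {
    Object convert(Object value);
  }

  private final ArrayData.ElementGetter elementGetter; // built once from the element type
  private final ColumnConverter elementConverter;      // built once for the element conversion

  ArrayConverterSketch(ArrayType sourceType, ColumnConverter elementConverter) {
    this.elementGetter = ArrayData.createElementGetter(sourceType.getElementType());
    this.elementConverter = elementConverter;
  }

  ArrayData convert(ArrayData arrayData) {
    Object[] convertedArray = new Object[arrayData.size()];
    for (int i = 0; i < convertedArray.length; i++) {
      Object element = elementGetter.getElementOrNull(arrayData, i);
      convertedArray[i] = element == null ? null : elementConverter.convert(element);
    }
    return new GenericArrayData(convertedArray);
  }
}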
Author:
I wonder why this is not enforced by checkstyle / spotless :)

Contributor:
I was wondering the same :) Shall we start a thread on the mailing list?

Contributor:
Create a PR to enforce it.

Contributor:
Or a discussion to remove this style ;)

RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.

TableMetadataCache already contains an identity cache to store schema comparison results. Let's move the row data converter cache into SchemaInfo and make it configurable.
@aiborodin force-pushed the optimise-row-data-conversion branch from 0a6af3a to 5c63747 on June 20, 2025, 07:12
@mxm (Contributor) left a comment:
Thanks for explaining the rationale behind the change. This is an excellent contribution!

Comment on lines +319 to +328
/**
 * Maximum number of input {@link org.apache.iceberg.Schema} objects to cache per Iceberg table.
 * The cache improves Dynamic Sink performance by storing {@link org.apache.iceberg.Schema}
 * comparison results.
 */
public Builder<T> inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaximumSize) {
  this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
  return this;
}

Contributor:
Not sure if we need an extra option or whether we can use cacheMaxSize for now. The conversion cache isn't fundamentally different from the other caches. Eventually, I think we want something like

public Builder<T> cacheConfiguration(CacheConfiguration config);

where we configure all caches in one go.
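For illustration, the consolidated configuration object suggested here might look roughly like the following; this class does not exist in the PR and every name in it is hypothetical:

class CacheConfiguration {
  private final int tableMetadataCacheMaxSize;   // tables cached in TableMetadataCache
  private final int inputSchemasPerTableMaxSize; // input schemas / converters cached per table
  private final long refreshMs;                  // refresh interval for missing entries

  CacheConfiguration(int tableMetadataCacheMaxSize, int inputSchemasPerTableMaxSize, long refreshMs) {
    this.tableMetadataCacheMaxSize = tableMetadataCacheMaxSize;
    this.inputSchemasPerTableMaxSize = inputSchemasPerTableMaxSize;
    this.refreshMs = refreshMs;
  }

  int tableMetadataCacheMaxSize() {
    return tableMetadataCacheMaxSize;
  }

  int inputSchemasPerTableMaxSize() {
    return inputSchemasPerTableMaxSize;
  }

  long refreshMs() {
    return refreshMs;
  }
}

A builder could then expose a single cacheConfiguration(CacheConfiguration) setter instead of one option per cache.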

Contributor:
We are more conservative about adding configuration options. I would suggest refraining from introducing a new config until we know how we want to do it; otherwise we have to keep the old one for backward-compatibility reasons.
If we are not sure, we can start with a reasonable static value and add the config later.

Comment on lines 60 to +62
TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) {
  this(catalog, maximumSize, refreshMs, MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE);
}
Contributor:
Can we remove this constructor or annotate it with @VisibleForTesting? I think it's best to remove it to avoid confusion.

  this.converter = converter;
}

Schema tableSchema() {
Contributor:
Maybe this could be selectedTableSchema? Or something more descriptive than tableSchema.

@@ -220,37 +238,59 @@ SchemaInfo getSchemaInfo() {
    */
   static class SchemaInfo {
     private final Map<Integer, Schema> schemas;
-    private final Map<Schema, Tuple2<Schema, CompareSchemasVisitor.Result>> lastResults;
+    private final Cache<Schema, SchemaCompareInfo> lastResults;
Contributor:
What is the performance of this cache?
I specifically used LimitedLinkedHashMap to avoid the extra performance hit from the cache and its eviction-policy enforcement.
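For reference, the LimitedLinkedHashMap approach mentioned here boils down to a LinkedHashMap that drops its eldest entry once a size limit is exceeded, with no separate eviction machinery; a sketch of that pattern, not the exact class from the codebase:

import java.util.LinkedHashMap;
import java.util.Map;

class SizeLimitedMap<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  SizeLimitedMap(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the oldest entry (insertion order) once the map grows past maxSize;
    // no extra bookkeeping, statistics, or background eviction is involved.
    return size() > maxSize;
  }
}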
