Optimise RowData evolution #13340
Conversation
Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:
1. Adding caching to the conversion write path
2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)

I wonder if we can do (1) as a first step. RowDataEvolver has so far been static, and I understand it needs to become an object in order to add the cache, but perhaps we can start with a central RowDataEvolver instance that keeps a cache keyed by source and target schema. I'm not sure the code generation adds much performance, and I would like to minimize the number of objects being created.
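To make option (1) concrete, here is a minimal sketch of a single shared evolver with a bounded cache keyed by the source/target schema pair. All names are illustrative rather than the actual RowDataEvolver API, the Caffeine dependency is assumed to be available as it is elsewhere in Iceberg, and the prepare step is only a placeholder for the existing conversion logic.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.function.Function;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;

// Illustrative only: one shared evolver whose sole addition over a static RowDataEvolver
// is a bounded cache keyed by the (source schema, target schema) pair.
class CachingRowDataEvolver {

  private final Cache<SchemaPair, Function<RowData, RowData>> converters =
      Caffeine.newBuilder().maximumSize(1000).build();

  RowData convert(RowData row, Schema sourceSchema, Schema targetSchema) {
    // The expensive work (deriving the Flink RowType, creating field getters) happens at most
    // once per schema pair; subsequent records reuse the prepared function.
    return converters
        .get(new SchemaPair(sourceSchema, targetSchema), pair -> prepare(sourceSchema, targetSchema))
        .apply(row);
  }

  private Function<RowData, RowData> prepare(Schema sourceSchema, Schema targetSchema) {
    // Placeholder for the existing conversion logic; returning the row unchanged keeps the
    // sketch compilable without reproducing RowDataEvolver here.
    return row -> row;
  }

  // Key with identity semantics: the schema instances handed to the sink are already cached
  // upstream, so reference equality is sufficient and cheap.
  private static final class SchemaPair {
    private final Schema source;
    private final Schema target;

    SchemaPair(Schema source, Schema target) {
      this.source = source;
      this.target = target;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof SchemaPair)) {
        return false;
      }
      SchemaPair that = (SchemaPair) other;
      return this.source == that.source && this.target == that.target;
    }

    @Override
    public int hashCode() {
      return 31 * System.identityHashCode(source) + System.identityHashCode(target);
    }
  }
}
```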
@Internal
class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
    implements Collector<DynamicRecord> {
  private static final int ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE = 1000;
This should be configurable, similarly to the other caches.
I added a configuration option for this cache. However, it feels like we should do this differently. I made a change locally, which I can raise as a PR, that merges the RowDataConverter cache into the SchemaInfo in the TableMetadataCache. With this design, each converter is stored together with its source schema and the comparison result from the schema visitor. We could then make the static constant TableMetadataCache.MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE configurable to adjust the converter cache size, instead of the option I added.
The only problem with the second approach is that the cache will recompute converters every refreshMs for all operators (1 second by default). This would probably have a small performance overhead, but I need to confirm.
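Roughly, the merged layout would look like the sketch below. The class and field names are illustrative, the import paths for CompareSchemasVisitor.Result and RowDataConverter are assumed from the files touched in this PR, and the constant value is a placeholder.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.flink.sink.dynamic.convert.RowDataConverter;

// Illustrative layout: a single lookup by the (identity-cached) input schema returns the
// resolved table schema, the comparison result, and the prepared converter together.
class SchemaInfoSketch {

  // Placeholder value; this is the constant that would become configurable.
  private static final int MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE = 10;

  private final Cache<Schema, CachedEntry> byInputSchema =
      Caffeine.newBuilder().maximumSize(MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE).build();

  CachedEntry lookup(Schema inputSchema) {
    return byInputSchema.getIfPresent(inputSchema);
  }

  static final class CachedEntry {
    private final Schema tableSchema;
    private final CompareSchemasVisitor.Result compareResult;
    private final RowDataConverter converter;

    CachedEntry(
        Schema tableSchema,
        CompareSchemasVisitor.Result compareResult,
        RowDataConverter converter) {
      this.tableSchema = tableSchema;
      this.compareResult = compareResult;
      this.converter = converter;
    }

    RowDataConverter converter() {
      return converter;
    }
  }
}
```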
@aiborodin: Tests are needed, but my guess is that adding the converter to the TableMetadataCache is a good idea. The cache timeout is only used for missing items, so I don't think we will have an issue with the refresh.
I'm a bit concerned about the size of the converters if they live in the TableMetadataCache. The cache items are only evicted if they are not used or the tables are updated, so old converters are kept until the table is no longer used. This might not be a big issue, but we have to be aware of it.
So, I support moving the cache to the TableMetadataCache.
I pushed a commit to this PR which moves the converter cache into TableMetadataCache.
Force-pushed from 913c0c6 to 0a6af3a.
According to the profile in my previous comment #13340 (comment), schema caching alone would not be sufficient; we also need to cache field accessors and converters to minimise the CPU overhead. The object overhead is minimal, as each converter only stores field accessors and conversion lambdas. The cache overhead is minimal because it is an identity cache and the same schema objects are already cached in TableMetadataCache.
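To illustrate the object-overhead point, a struct converter along these lines only needs an array of Flink field getters plus one conversion function per field. This is a hypothetical sketch using standard Flink table APIs, not the RowDataConverter added by this PR, and the identity lambdas stand in for the real per-field conversions.

```java
import java.util.function.Function;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Hypothetical struct converter: the only per-entry state is the field getters and the
// per-field conversion lambdas, both prepared once up front.
class StructConverterSketch {
  private final RowData.FieldGetter[] getters;
  private final Function<Object, Object>[] fieldConverters;

  @SuppressWarnings("unchecked")
  StructConverterSketch(RowType sourceType) {
    int arity = sourceType.getFieldCount();
    this.getters = new RowData.FieldGetter[arity];
    this.fieldConverters = new Function[arity];
    for (int i = 0; i < arity; i++) {
      getters[i] = RowData.createFieldGetter(sourceType.getTypeAt(i), i);
      // Identity conversion as a stand-in; the real converters adapt each field to the target
      // type (type promotions, added columns, nested rows, maps, arrays, ...).
      fieldConverters[i] = value -> value;
    }
  }

  RowData convert(RowData source) {
    GenericRowData result = new GenericRowData(source.getRowKind(), getters.length);
    for (int i = 0; i < getters.length; i++) {
      result.setField(i, fieldConverters[i].apply(getters[i].getFieldOrNull(source)));
    }

    return result;
  }
}
```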
for (int i = 0; i < convertedArray.length; i++) {
  Object element = elementGetter.getElementOrNull(arrayData, i);
  convertedArray[i] = elementConverter.convert(element);
}
nit: newline
https://iceberg.apache.org/contribute/#block-spacing
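For illustration, the linked rule just asks for a blank line after the closing brace of a multi-line block before the next statement. The method below paraphrases the loop above with a stand-in element conversion; names are illustrative only.

```java
import org.apache.flink.table.data.ArrayData;

class BlockSpacingExample {
  Object[] copyElements(ArrayData arrayData) {
    Object[] converted = new Object[arrayData.size()];
    for (int i = 0; i < converted.length; i++) {
      // Stand-in for elementConverter.convert(elementGetter.getElementOrNull(arrayData, i)).
      converted[i] = arrayData.isNullAt(i) ? null : arrayData.getInt(i);
    }

    return converted; // the blank line above the return is what the nit asks for
  }
}
```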
I wonder why this is not enforced by checkstyle / spotless :)
I was wondering the same :) Shall we start a thread on the mailing list?
Create a PR to enforce it.
Or a discussion to remove this style ;)
RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
TableMetadataCache already contains an identity cache to store schema comparison results. Let's move the row data converter cache into SchemaInfo and make it configurable.
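For readers unfamiliar with the term, an identity cache compares keys by reference rather than by structural schema equality, so a hit is essentially a pointer comparison. The IdentityHashMap below is only a stand-in for the bounded cache actually used.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import org.apache.iceberg.Schema;

// Keys are compared with ==, so a hit requires the exact Schema instance that
// TableMetadataCache already hands out; no field-by-field schema comparison is needed.
class IdentityCacheExample<V> {
  private final Map<Schema, V> byInstance = new IdentityHashMap<>();

  V get(Schema inputSchema) {
    return byInstance.get(inputSchema);
  }

  void put(Schema inputSchema, V value) {
    byInstance.put(inputSchema, value);
  }
}
```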
Force-pushed from 0a6af3a to 5c63747.
Thanks for explaining the rationale behind the change. This is an excellent contribution!
/**
 * Maximum input {@link org.apache.iceberg.Schema} objects to cache per each Iceberg table. The
 * cache improves Dynamic Sink performance by storing {@link org.apache.iceberg.Schema}
 * comparison results.
 */
public Builder<T> inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaximumSize) {
  this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
  return this;
}
Not sure if we need an extra option or whether we can use cacheMaxSize for now. The conversion cache isn't fundamentally different from the other caches. Eventually, I think we want something like
public Builder<T> cacheConfiguration(CacheConfiguration config);
where we configure all caches in one go.
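One possible shape for such a holder is sketched below; the class, its fields, and the defaults are entirely hypothetical and do not exist in Iceberg today.

```java
import java.io.Serializable;

// Hypothetical: a single object the sink builder would accept once, instead of adding
// one builder method per cache knob.
public class CacheConfiguration implements Serializable {
  private final int tableMetadataCacheMaxSize;
  private final int inputSchemasPerTableMaxSize;
  private final long refreshMs;

  private CacheConfiguration(
      int tableMetadataCacheMaxSize, int inputSchemasPerTableMaxSize, long refreshMs) {
    this.tableMetadataCacheMaxSize = tableMetadataCacheMaxSize;
    this.inputSchemasPerTableMaxSize = inputSchemasPerTableMaxSize;
    this.refreshMs = refreshMs;
  }

  public int tableMetadataCacheMaxSize() {
    return tableMetadataCacheMaxSize;
  }

  public int inputSchemasPerTableMaxSize() {
    return inputSchemasPerTableMaxSize;
  }

  public long refreshMs() {
    return refreshMs;
  }

  public static Builder builder() {
    return new Builder();
  }

  public static class Builder {
    private int tableMetadataCacheMaxSize = 100; // placeholder defaults
    private int inputSchemasPerTableMaxSize = 10;
    private long refreshMs = 1_000L;

    public Builder tableMetadataCacheMaxSize(int size) {
      this.tableMetadataCacheMaxSize = size;
      return this;
    }

    public Builder inputSchemasPerTableMaxSize(int size) {
      this.inputSchemasPerTableMaxSize = size;
      return this;
    }

    public Builder refreshMs(long millis) {
      this.refreshMs = millis;
      return this;
    }

    public CacheConfiguration build() {
      return new CacheConfiguration(
          tableMetadataCacheMaxSize, inputSchemasPerTableMaxSize, refreshMs);
    }
  }
}
```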
We tend to be conservative about adding configuration options. I would suggest refraining from introducing a new config option until we know how we want to do it; otherwise we need to keep the old one for backward compatibility reasons.
If we are not sure, we can start with a reasonable static value and add the config later.
TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) {
  this(catalog, maximumSize, refreshMs, MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE);
}
Can we remove this constructor or annotate it with @VisibleForTesting? I think it's best to remove it to avoid confusion.
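If the delegating constructor stays, the annotation option would look roughly like this, assuming Iceberg's relocated Guava annotation as used elsewhere in the codebase.

```java
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

// Marks the convenience constructor as test-only without removing it.
@VisibleForTesting
TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) {
  this(catalog, maximumSize, refreshMs, MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE);
}
```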
  this.converter = converter;
}

Schema tableSchema() {
Maybe this could be selectedTableSchema? Or something more descriptive than tableSchema.
@@ -220,37 +238,59 @@ SchemaInfo getSchemaInfo() {
   */
  static class SchemaInfo {
    private final Map<Integer, Schema> schemas;
-   private final Map<Schema, Tuple2<Schema, CompareSchemasVisitor.Result>> lastResults;
+   private final Cache<Schema, SchemaCompareInfo> lastResults;
What is the performance of this cache? I specifically used LimitedLinkedHashMap to avoid the extra performance hit caused by the cache and its eviction policy enforcement.
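For context, a size-limited LinkedHashMap presumably boils down to the sketch below: lookups stay a plain hash probe and eviction is just dropping the eldest entry on insert, whereas a Caffeine cache additionally records accesses for its eviction policy, which is the overhead being asked about. The class name and insertion-order choice are assumptions, not the actual LimitedLinkedHashMap.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded map with no separate eviction machinery: once the limit is reached, each insert
// drops the eldest entry. Passing accessOrder=true to the LinkedHashMap constructor would
// turn this into a simple LRU instead of FIFO.
class LimitedLinkedHashMapSketch<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  LimitedLinkedHashMapSketch(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxSize;
  }
}
```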