Optimise RowData evolution #13340
Conversation
RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
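To make the idea concrete, here is a minimal sketch of caching field getters keyed by the source RowType; the class name CachingRowDataEvolver and the cache layout are illustrative only, not the PR's exact code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Illustrative sketch: reuse field getters instead of rebuilding them for every
// record. Keyed by the source RowType, since the target table schema is fixed
// for a given writer.
class CachingRowDataEvolver {

  private final Map<RowType, RowData.FieldGetter[]> getterCache = new ConcurrentHashMap<>();

  RowData.FieldGetter[] fieldGetters(RowType sourceType) {
    return getterCache.computeIfAbsent(sourceType, type -> {
      RowData.FieldGetter[] getters = new RowData.FieldGetter[type.getFieldCount()];
      for (int i = 0; i < getters.length; i++) {
        // RowData.createFieldGetter is part of Flink's public API
        getters[i] = RowData.createFieldGetter(type.getTypeAt(i), i);
      }
      return getters;
    });
  }
}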
Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:
1. Adding caching to the conversion write path
2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)
I wonder if we can do (1) as a first step. RowDataEvolver has been static so far, and I understand it needs to become an object in order to hold the cache, but perhaps we can start with a central RowDataEvolver instance that caches per source and target schema. I'm not sure the code generation yields much of a performance gain, and I would like to minimize the number of objects being created.
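As a rough illustration of that alternative, the sketch below keeps a single evolver instance with one cache entry per source/target schema pair while leaving the conversion logic itself untouched; the class and the string-based cache key are hypothetical simplifications (Iceberg's Schema does not override equals, so a real key would need more care), and FlinkSchemaUtil.convert is assumed to return the Flink RowType, as in the diff below:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

// Hypothetical central evolver: the Schema -> RowType conversions are computed
// once per (source, target) schema pair and reused for every record.
class CentralRowDataEvolver {

  private static final class ConvertedTypes {
    final RowType sourceType;
    final RowType targetType;

    ConvertedTypes(RowType sourceType, RowType targetType) {
      this.sourceType = sourceType;
      this.targetType = targetType;
    }
  }

  // String key is a simplification; Schema does not implement equals()/hashCode().
  private final Map<String, ConvertedTypes> cache = new ConcurrentHashMap<>();

  ConvertedTypes typesFor(Schema source, Schema target) {
    String key = source.toString() + "->" + target.toString();
    return cache.computeIfAbsent(
        key,
        ignored ->
            new ConvertedTypes(FlinkSchemaUtil.convert(source), FlinkSchemaUtil.convert(target)));
  }
}

The existing static conversion methods could then take the cached RowTypes as parameters instead of recomputing them per record.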
        data.schema(),
        dataSchema ->
            new RowDataConverter(
                FlinkSchemaUtil.convert(dataSchema), FlinkSchemaUtil.convert(schema)));
Have we measured which conversion steps take the most time? Would it suffice to simply cache the source and target schema while retaining the static conversion code? My gut feeling is that the schema conversion is the most expensive step. Apart from caching the schema, the code here creates a series of objects, which adds to the memory footprint.
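On the measurement question, a JMH skeleton along these lines could check whether the per-record schema conversion really dominates; the schema fields and benchmark setup here are made up for illustration:

import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// Hypothetical micro-benchmark: isolates the Schema -> RowType conversion that
// RowDataEvolver currently repeats for every input record.
@State(Scope.Thread)
public class SchemaConversionBenchmark {

  private Schema schema;

  @Setup
  public void setup() {
    schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
  }

  @Benchmark
  public RowType convertSchema() {
    return FlinkSchemaUtil.convert(schema);
  }
}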
 */
public class RowDataConverter implements DataConverter {
  private final RowData.FieldGetter[] fieldGetters;
  private final DataConverter[] dataConverters;
I don't quite understand why we need to break apart RowDataEvolver. Could we simply add a cache in RowDataEvolver? I don't think the quasi code generation here leads to much of a performance gain, while it does add to the memory footprint.
  private final DataConverter keyConverter;
  private final DataConverter valueConverter;

  public MapConverter(MapType sourceType, MapType targetType) {
Do we need separate classes we instantiate for every schema?
@Internal
class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
    implements Collector<DynamicRecord> {
  private static final int ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE = 1000;
This should be configurable, similarly to the other caches.
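For illustration, one way the limit could be threaded through, assuming a Caffeine-backed cache like the other caches in the sink; the constructor parameter and option plumbing are hypothetical, not the existing configuration surface:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.function.Function;
import org.apache.iceberg.Schema;

// Hypothetical wiring: the converter cache size comes from sink configuration
// instead of the hard-coded ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE constant.
// RowDataConverter is the converter class introduced in this PR; keying by the
// source Schema mirrors the cache key used in the diff above.
class ConfigurableConverterCache {

  private final Cache<Schema, RowDataConverter> converters;

  ConfigurableConverterCache(long maximumSize) {
    this.converters = Caffeine.newBuilder().maximumSize(maximumSize).build();
  }

  RowDataConverter get(Schema sourceSchema, Function<Schema, RowDataConverter> loader) {
    return converters.get(sourceSchema, loader);
  }
}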
    new RowDataConverter(
            FlinkSchemaUtil.convert(data.schema()), FlinkSchemaUtil.convert(newData.f0))
        .convert(data.rowData());
There is no caching here, or is there?