
Optimise RowData evolution #13340

Open
aiborodin wants to merge 1 commit into base: main

Conversation

aiborodin

RowDataEvolver recomputes the Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache the field getters and column converters to optimise RowData conversion.
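
A minimal sketch of the caching idea, using Flink's RowData.createFieldGetter; the variable names are illustrative, not the actual PR code:

// Precompute field getters once per source RowType, so per-record conversion
// only performs array lookups instead of rebuilding this state every time.
RowType sourceType = FlinkSchemaUtil.convert(sourceSchema);
RowData.FieldGetter[] getters = new RowData.FieldGetter[sourceType.getFieldCount()];
for (int i = 0; i < getters.length; i++) {
  getters[i] = RowData.createFieldGetter(sourceType.getTypeAt(i), i);
}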
@mxm (Contributor) left a comment:

Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:

  1. Adding caching to the conversion write path
  2. Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)

I wonder if we can do (1) as a first step. RowDataEvolver has been static so far, and I understand that it needs to become an object in order to hold the cache, but perhaps we can start with a central RowDataEvolver instance that caches per source and target schema. I'm not sure the code generation yields much of a performance gain, and I would like to minimize the number of objects created.
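
A rough sketch of step (1), assuming a Caffeine cache (com.github.benmanes.caffeine.cache) keyed by the Iceberg schema; the class name and the hook into the existing static code are hypothetical:

// One shared evolver that caches converted RowTypes instead of recomputing
// them per record; the conversion logic itself would stay unchanged.
class CachingRowDataEvolver {
  private final Cache<Schema, RowType> rowTypeCache =
      Caffeine.newBuilder().maximumSize(1000).build();

  RowData convert(RowData row, Schema sourceSchema, Schema targetSchema) {
    RowType sourceType = rowTypeCache.get(sourceSchema, FlinkSchemaUtil::convert);
    RowType targetType = rowTypeCache.get(targetSchema, FlinkSchemaUtil::convert);
    // Assumed delegation to the existing static conversion path.
    return RowDataEvolver.convert(row, sourceType, targetType);
  }
}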

data.schema(),
dataSchema ->
new RowDataConverter(
FlinkSchemaUtil.convert(dataSchema), FlinkSchemaUtil.convert(schema)));
mxm (Contributor) commented:

Have we measured which conversion steps take the most time? Would it suffice to simply cache source and target schema while retaining the static conversion code? My gut feeling is that the schema conversion is the most expensive. Apart from caching the schema, the code here creates a series of objects, which adds to the memory footprint.
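
One way to answer the measurement question, sketched as a JMH microbenchmark (org.openjdk.jmh.annotations); the schemas, the sample row, and RowDataEvolver.convert's signature are assumptions:

@State(Scope.Benchmark)
public class RowDataEvolverBench {
  private Schema sourceSchema; // built in setup(), along with the row below
  private Schema targetSchema;
  private RowData row;

  @Setup
  public void setup() {
    // Construct representative source/target schemas and one input row.
  }

  @Benchmark
  public RowType schemaConversionOnly() {
    // The step suspected to dominate: Iceberg Schema -> Flink RowType.
    return FlinkSchemaUtil.convert(sourceSchema);
  }

  @Benchmark
  public RowData fullConversion() {
    return RowDataEvolver.convert(row, sourceSchema, targetSchema);
  }
}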

public class RowDataConverter implements DataConverter {
private final RowData.FieldGetter[] fieldGetters;
private final DataConverter[] dataConverters;
mxm (Contributor) commented:

I don't quite understand why we need to break apart RowDataEvolver. Could we simply add a cache inside RowDataEvolver? I don't think the quasi code generation here yields much performance gain, while it does add to the memory footprint.

private final DataConverter keyConverter;
private final DataConverter valueConverter;

public MapConverter(MapType sourceType, MapType targetType) {
mxm (Contributor) commented:

Do we need separate classes we instantiate for every schema?
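
For contrast, a sketch of the static alternative the question implies: dispatch on Flink's LogicalTypeRoot inside one static method instead of instantiating a converter object per schema (helper names are illustrative):

static Object convertValue(Object value, LogicalType sourceType, LogicalType targetType) {
  switch (targetType.getTypeRoot()) {
    case MAP:
      return convertMap((MapData) value, (MapType) sourceType, (MapType) targetType);
    case ROW:
      return convertRow((RowData) value, (RowType) sourceType, (RowType) targetType);
    default:
      return value; // remaining cases (arrays, type promotions) omitted here
  }
}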


@Internal
class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
implements Collector<DynamicRecord> {
private static final int ROW_DATA_CONVERTER_CACHE_MAXIMUM_SIZE = 1000;
mxm (Contributor) commented:

This should be configurable, similarly to the other caches.
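
A sketch of what configurable could look like with Flink's ConfigOptions API; the option key is illustrative, not an existing Iceberg setting:

// Hypothetical option, sized like the dynamic sink's other caches.
public static final ConfigOption<Integer> ROW_DATA_CONVERTER_CACHE_SIZE =
    ConfigOptions.key("row-data-converter-cache-maximum-size")
        .intType()
        .defaultValue(1000)
        .withDescription("Maximum number of cached RowData converters.");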

Comment on lines +75 to +77
new RowDataConverter(
FlinkSchemaUtil.convert(data.schema()), FlinkSchemaUtil.convert(newData.f0))
.convert(data.rowData());
mxm (Contributor) commented:

There is no caching here, or is there?
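
If it is missing, a sketch of routing this call through the same converter cache (converterCache is assumed to be the Caffeine cache used elsewhere in this PR; newData.f0 comes from the surrounding code):

RowDataConverter converter =
    converterCache.get(
        data.schema(),
        dataSchema ->
            new RowDataConverter(
                FlinkSchemaUtil.convert(dataSchema), FlinkSchemaUtil.convert(newData.f0)));
converter.convert(data.rowData());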
