Flink: Backport apache#8553 to v1.15, v1.16
Peter Vary committed Nov 23, 2023
1 parent feb2172 commit 67d5cdc
Showing 32 changed files with 2,430 additions and 86 deletions.
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;

// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table) {
Table table,
SerializableRecordEmitter<T> emitter) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
this.emitter = emitter;
}

String name() {
@@ -152,7 +161,8 @@ public Boundedness getBoundedness() {
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}

@Override
@@ -216,6 +226,8 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -429,6 +444,33 @@ public Builder<T> setAll(Map<String, String> properties) {
return this;
}

/**
* Emits watermarks once per split based on the min value of column statistics from file
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
* setting {@link #watermarkTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files into a single
* split when the watermark is used for watermark alignment.
*/
public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
return this;
}

/**
* When the type of the {@link #watermarkColumn} is {@link
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
return this;
}

/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
if (watermarkColumn != null) {
// Column statistics are needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));

SplitWatermarkExtractor watermarkExtractor =
new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public IcebergSource<T> build() {

checkRequired();
// Since the builder already loaded the table, pass it to the source to avoid double loading
return new IcebergSource<T>(
tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
return new IcebergSource<>(
tableLoader,
context,
readerFunction,
splitAssignerFactory,
splitComparator,
table,
emitter);
}

private void checkRequired() {
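For context on the new watermarkColumn and watermarkTimeUnit builder options above, a minimal usage sketch follows. It assumes a table exposing a timestamp column named event_time and an already-created Hadoop table location; the column name, table path, and surrounding job setup are illustrative only and not part of this commit.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Illustrative table location; replace with a real warehouse path.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(30))
            // Emits one watermark per split from the column's min statistic and orders splits by it.
            .watermarkColumn("event_time")
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            // The source itself emits the split watermarks, so no extra strategy is needed here.
            WatermarkStrategy.noWatermarks(),
            "iceberg-events",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("Iceberg watermark example");
  }
}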
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;

/**
* {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
* to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
* WatermarkExtractorRecordEmitter} along with the actual records.
*/
@Internal
public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
private final int eventTimeFieldId;
private final String eventTimeFieldName;
private final TimeUnit timeUnit;

/**
* Creates the extractor.
*
* @param schema The schema of the Table
* @param eventTimeFieldName The column which should be used as an event time
* @param timeUnit Used for converting the long value to epoch milliseconds
*/
public ColumnStatsWatermarkExtractor(
Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
Types.NestedField field = schema.findField(eventTimeFieldName);
TypeID typeID = field.type().typeId();
Preconditions.checkArgument(
typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
"Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
typeID);
this.eventTimeFieldId = field.fieldId();
this.eventTimeFieldName = eventTimeFieldName;
// Use the timeUnit only for Long columns.
this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
}

@VisibleForTesting
ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
this.eventTimeFieldId = eventTimeFieldId;
this.eventTimeFieldName = eventTimeFieldName;
this.timeUnit = TimeUnit.MICROSECONDS;
}

/**
* Get the watermark for a split using column statistics.
*
* @param split The split
* @return The watermark
* @throws IllegalArgumentException if there are no statistics for the column
*/
@Override
public long extractWatermark(IcebergSourceSplit split) {
return split.task().files().stream()
.map(
scanTask -> {
Preconditions.checkArgument(
scanTask.file().lowerBounds() != null
&& scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
"Missing statistics for column name = %s in file = %s",
eventTimeFieldName,
scanTask.file());
return timeUnit.toMillis(
Conversions.fromByteBuffer(
Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
})
.min(Comparator.comparingLong(l -> l))
.get();
}
}
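As a small illustration of the constructor contract above, the sketch below builds an extractor for a hypothetical long column storing epoch milliseconds and reuses it for split ordering, mirroring what IcebergSource.Builder#build() does; the schema and column name are invented for the example.

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.types.Types;

public class ColumnStatsWatermarkExample {
  public static void main(String[] args) {
    // Invented schema: "event_ts_millis" holds epoch milliseconds as a long column.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "event_ts_millis", Types.LongType.get()));

    // Long columns need an explicit unit; timestamp columns always use microseconds.
    ColumnStatsWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(schema, "event_ts_millis", TimeUnit.MILLISECONDS);

    // The same extractor drives split ordering, as wired up in IcebergSource.Builder#build().
    SerializableComparator<IcebergSourceSplit> comparator = SplitComparators.watermark(extractor);
    System.out.println("Comparator ready: " + comparator);
  }
}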
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

public IcebergSourceReader(
SerializableRecordEmitter<T> emitter,
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
new IcebergSourceRecordEmitter<>(),
emitter,
context.getConfiguration(),
context);
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

@Internal
@FunctionalInterface
public interface SerializableRecordEmitter<T>
extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
static <T> SerializableRecordEmitter<T> defaultEmitter() {
return (element, output, split) -> {
output.collect(element.record());
split.updatePosition(element.fileOffset(), element.recordOffset());
};
}

static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
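The WatermarkExtractorRecordEmitter referenced by emitterWithWatermark is not part of this excerpt. Purely as a sketch of what such an emitter might look like, the class below pairs the default emit behaviour with a once-per-split watermark; the class name and the last-split tracking are assumptions, not the actual implementation.

package org.apache.iceberg.flink.source.reader;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

// Sketch only; the real WatermarkExtractorRecordEmitter may differ.
class WatermarkEmitterSketch<T> implements SerializableRecordEmitter<T> {
  private final SplitWatermarkExtractor extractor;
  private String lastSplitId = null;

  WatermarkEmitterSketch(SplitWatermarkExtractor extractor) {
    this.extractor = extractor;
  }

  @Override
  public void emitRecord(
      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
    // Emit the split-level watermark once, when records from a new split are first seen.
    if (!split.splitId().equals(lastSplitId)) {
      output.emitWatermark(new Watermark(extractor.extractWatermark(split)));
      this.lastSplitId = split.splitId();
    }

    output.collect(element.record());
    split.updatePosition(element.fileOffset(), element.recordOffset());
  }
}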
@@ -18,19 +18,11 @@
*/
package org.apache.iceberg.flink.source.reader;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import java.io.Serializable;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

final class IcebergSourceRecordEmitter<T>
implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {

IcebergSourceRecordEmitter() {}

@Override
public void emitRecord(
RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
output.collect(element.record());
split.updatePosition(element.fileOffset(), element.recordOffset());
}
/** The interface used to extract watermarks from splits. */
public interface SplitWatermarkExtractor extends Serializable {
/** Get the watermark for a split. */
long extractWatermark(IcebergSourceSplit split);
}
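To make the contract concrete, a hypothetical minimal implementation (not part of this commit) could simply return a fixed value, e.g. for tests:

import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

// Hypothetical constant-valued extractor, e.g. for unit tests; not part of this commit.
public class FixedSplitWatermarkExtractor implements SplitWatermarkExtractor {
  private final long watermarkMillis;

  public FixedSplitWatermarkExtractor(long watermarkMillis) {
    this.watermarkMillis = watermarkMillis;
  }

  @Override
  public long extractWatermark(IcebergSourceSplit split) {
    return watermarkMillis;
  }
}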
