[Flink] Add Flink DataStream Sink for Arrow RecordBatch (#491)
* add LakeSoulArrow Type

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* add LakeSoulArrowWriterBucket

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* tmp commit for rebase

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix after rebase

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* find error with NATIVE_METADATA_UPDATE_ENABLED=true

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* bigger checkpoint interval&timeout for test

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* minor fix

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* cargo fmt

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* optimize transactionInsert

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix spark timestamp type partition column

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix case of empty partition columns

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix case of empty partition columns

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* update test params

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* update test params

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* update suite test expected result

Signed-off-by: zenghua <huazeng@dmetasoul.com>

---------

Signed-off-by: zenghua <huazeng@dmetasoul.com>
Co-authored-by: zenghua <huazeng@dmetasoul.com>
Ceng23333 and zenghua committed Jun 4, 2024
1 parent dccd535 commit 9958faf
Showing 74 changed files with 3,085 additions and 993 deletions.
@@ -491,7 +491,7 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
} else {
if (middleCommitOps.contains(CommitOp.UpdateCommit) || middleCommitOps.contains(CommitOp.CompactionCommit)) {
partitionDescList.remove(partitionDesc);
- snapshotList.removeAll(partitionInfo.getSnapshotList());
+ snapshotList.removeAll(partitionInfo.getSnapshotList().stream().map(uuid -> DBUtil.toJavaUUID(uuid).toString()).collect(Collectors.toList()));
continue;
}
curPartitionInfo = updateSubmitPartitionSnapshot(partitionInfo, curPartitionInfo, readPartition);
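Note on the one-line change above: snapshotList holds snapshot ids as java.util.UUID strings, while PartitionInfo.getSnapshotList() returns protobuf Uuid messages, so the raw removeAll call could never match an element. A minimal sketch of the conversion, using only the DBUtil helper already referenced in the diff (the helper name dropCommittedSnapshots is illustrative, not part of the commit):

// Illustrative helper: convert proto Uuid messages to their string form
// before removing them from a List<String> of snapshot ids.
private static void dropCommittedSnapshots(List<String> snapshotList, PartitionInfo partitionInfo) {
    List<String> committed = partitionInfo.getSnapshotList().stream()
            .map(uuid -> DBUtil.toJavaUUID(uuid).toString())
            .collect(Collectors.toList());
    snapshotList.removeAll(committed);
}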
@@ -339,4 +339,38 @@ public static String protoUuidToJniString(Uuid uuid) {
sb.append(low);
return sb.toString();
}

public static class Timer {

private final String name;
private long totalCost;
private long times;

private long startTimeOnce;

public Timer(String name) {
this.name = name;
initTimer();
}

public void initTimer() {
totalCost = 0L;
times = 0L;
startTimeOnce = 0L;
}

public void start() {
startTimeOnce = System.currentTimeMillis();
}

public void end() {
times += 1;
totalCost += System.currentTimeMillis() - startTimeOnce;
}

public void report() {
System.out.printf("Timer %s: totalCost=%d, times=%d, avgCost=%.3f\n", name, totalCost, times, (double) totalCost / times);
}

}
}
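The Timer added above is a small accumulating stopwatch: one start/end pair per measured call, a running total and call count, and no thread-safety. A usage sketch, not part of the commit:

// Illustrative only: time a repeated operation and print the aggregate numbers.
public final class TimerDemo {
    public static void main(String[] args) throws InterruptedException {
        DBUtil.Timer timer = new DBUtil.Timer("demo");
        for (int i = 0; i < 5; i++) {
            timer.start();
            Thread.sleep(10);   // stand-in for the call being measured
            timer.end();
        }
        timer.report();         // prints roughly "Timer demo: totalCost=50, times=5, avgCost=10.000"
    }
}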
@@ -15,6 +15,7 @@
import java.util.stream.Collectors;

public class PartitionInfoDao {
final DBUtil.Timer transactionInsertTimer = new DBUtil.Timer("transactionInsert");

public void insert(PartitionInfo partitionInfo) {
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
@@ -38,50 +39,62 @@ public void insert(PartitionInfo partitionInfo) {
}

public boolean transactionInsert(List<PartitionInfo> partitionInfoList, List<String> snapshotList) {
+ try {
+     transactionInsertTimer.start();
      if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
          if (partitionInfoList.isEmpty()) return true;
-         Integer count = NativeMetadataJavaClient.insert(
-                 NativeUtils.CodedDaoType.TransactionInsertPartitionInfo,
-                 JniWrapper.newBuilder().addAllPartitionInfo(partitionInfoList).build());
+         PartitionInfo snapshotContainer = PartitionInfo.newBuilder().addAllSnapshot(snapshotList.stream().map(s -> DBUtil.toProtoUuid(UUID.fromString(s))).collect(Collectors.toList())).build();
+
+         Integer count = NativeMetadataJavaClient.insert(
+                 NativeUtils.CodedDaoType.TransactionInsertPartitionInfo,
+                 JniWrapper.newBuilder()
+                         .addAllPartitionInfo(partitionInfoList)
+                         .addPartitionInfo(snapshotContainer)
+                         .build());
          return count > 0;
      }
      boolean flag = true;
      Connection conn = null;
      PreparedStatement pstmt = null;
      try {
          conn = DBConnector.getConn();
          pstmt = conn.prepareStatement("insert into partition_info (table_id, partition_desc, version, " +
                  "commit_op, snapshot, expression, domain) values (?, ?, ?, ? ,?, ?, ?)");
          conn.setAutoCommit(false);
          for (PartitionInfo partitionInfo : partitionInfoList) {
              insertSinglePartitionInfo(conn, pstmt, partitionInfo);
          }
          pstmt = conn.prepareStatement("update data_commit_info set committed = 'true' where commit_id = ?");
          for (String uuid : snapshotList) {
              pstmt.setString(1, uuid);
              pstmt.execute();
          }
          conn.commit();
      } catch (SQLException e) {
          flag = false;
          try {
              if (conn != null) {
                  conn.rollback();
              }
          } catch (SQLException ex) {
              ex.printStackTrace();
          }
          if (e.getMessage().contains("duplicate key value violates unique constraint")) {
              // only when primary key conflicts could we ignore the exception
              e.printStackTrace();
          } else {
              // throw exception in all other cases
              throw new RuntimeException(e);
          }
      } finally {
          DBConnector.closeConn(pstmt, conn);
      }
      return flag;
+ } finally {
+     transactionInsertTimer.end();
+     // transactionInsertTimer.report();
+ }
}

private void insertSinglePartitionInfo(Connection conn, PreparedStatement pstmt, PartitionInfo partitionInfo)
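The native path above now piggybacks the snapshot ids on the same JniWrapper by appending one extra PartitionInfo that carries only the snapshot list. How the receiving side unpacks that payload is not shown in this excerpt; a hedged sketch of one way to split it back apart, assuming the trailing element is treated as the snapshot container:

// Assumption: the last PartitionInfo is the snapshot container built in transactionInsert.
static List<String> splitOffSnapshotIds(JniWrapper wrapper) {
    List<PartitionInfo> all = wrapper.getPartitionInfoList();
    PartitionInfo snapshotContainer = all.get(all.size() - 1);
    return snapshotContainer.getSnapshotList().stream()
            .map(uuid -> DBUtil.toJavaUUID(uuid).toString())
            .collect(Collectors.toList());
}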
@@ -11,12 +11,14 @@
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.BinarySourceRecord;
import org.apache.flink.lakesoul.types.LakeSoulRecordConvert;
import org.apache.flink.lakesoul.types.arrow.LakeSoulArrowWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;
@@ -54,12 +56,12 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B

public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
context.conf.set(DYNAMIC_BUCKETING, false);
- LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl(
+ LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl<RowData>(
context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME));
OutputFileConfig fileNameConfig = OutputFileConfig.builder()
.withPartSuffix(".parquet")
.build();
- LakeSoulMultiTablesSink<BinarySourceRecord> sink = LakeSoulMultiTablesSink.forMultiTablesBulkFormat(context.conf)
+ LakeSoulMultiTablesSink<BinarySourceRecord, RowData> sink = LakeSoulMultiTablesSink.forMultiTablesBulkFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
@@ -68,6 +70,21 @@ public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<Binary
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
}

public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context, DataStream<LakeSoulArrowWrapper> stream) {
LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl<LakeSoulArrowWrapper>(
context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME));
OutputFileConfig fileNameConfig = OutputFileConfig.builder()
.withPartSuffix(".parquet")
.build();
LakeSoulMultiTablesSink<LakeSoulArrowWrapper, LakeSoulArrowWrapper> sink = LakeSoulMultiTablesSink.forMultiTablesArrowFormat(context.conf)
.withBucketCheckInterval(context.conf.getLong(BUCKET_CHECK_INTERVAL))
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable Arrow Sink")
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
}

public DataStreamSink<BinarySourceRecord> printStream(DataStream<BinarySourceRecord> stream, String name) {
PrintSinkFunction<BinarySourceRecord> printFunction = new PrintSinkFunction<>(name, false);
return stream.addSink(printFunction).name(name);
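For orientation, a hedged sketch of how a job could use the new buildArrowSink entry point end to end. The checkpoint interval, the warehouse path, and the buildArrowWrapperSource and createSinkContext helpers are assumptions for illustration, not part of the commit:

// Hedged sketch: wire a DataStream<LakeSoulArrowWrapper> into the new Arrow sink.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // part files are rolled and committed on checkpoint

    Configuration conf = new Configuration();
    conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, "file:///tmp/lakesoul");  // assumed warehouse location

    DataStream<LakeSoulArrowWrapper> arrowStream = buildArrowWrapperSource(env);  // hypothetical source helper
    Context context = createSinkContext(env, conf);                               // hypothetical Context wiring

    buildArrowSink(context, arrowStream);
    env.execute("LakeSoul Arrow sink demo");
}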
@@ -12,6 +12,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.lakesoul.sink.bucket.BucketsBuilder;
import org.apache.flink.lakesoul.sink.bucket.DefaultMultiTablesArrowFormatBuilder;
import org.apache.flink.lakesoul.sink.bucket.DefaultMultiTablesBulkFormatBuilder;
import org.apache.flink.lakesoul.sink.bucket.DefaultOneTableBulkFormatBuilder;
import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkCommittable;
@@ -20,6 +21,7 @@
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
@@ -30,13 +32,13 @@

import static org.apache.flink.util.Preconditions.checkNotNull;

- public class LakeSoulMultiTablesSink<IN> implements
-         Sink<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState,
-                 LakeSoulMultiTableSinkGlobalCommittable> {
+ public class LakeSoulMultiTablesSink<IN, OUT> implements
+         Sink<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState,
+                 LakeSoulMultiTableSinkGlobalCommittable> {

- private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
+ private final BucketsBuilder<IN, OUT, ? extends BucketsBuilder<IN, OUT, ?>> bucketsBuilder;

- public LakeSoulMultiTablesSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder) {
+ public LakeSoulMultiTablesSink(BucketsBuilder<IN, OUT, ? extends BucketsBuilder<IN, OUT, ?>> bucketsBuilder) {
this.bucketsBuilder = checkNotNull(bucketsBuilder);
}

@@ -51,11 +53,16 @@ public static DefaultMultiTablesBulkFormatBuilder forMultiTablesBulkFormat(Confi
conf);
}

public static DefaultMultiTablesArrowFormatBuilder forMultiTablesArrowFormat(Configuration conf) {
return new DefaultMultiTablesArrowFormatBuilder(new Path(conf.getString(LakeSoulSinkOptions.WAREHOUSE_PATH)),
conf);
}

@Override
public SinkWriter<IN, LakeSoulMultiTableSinkCommittable, LakeSoulWriterBucketState> createWriter(
InitContext context, List<LakeSoulWriterBucketState> states) throws IOException {
int subTaskId = context.getSubtaskId();
- AbstractLakeSoulMultiTableSinkWriter<IN> writer = bucketsBuilder.createWriter(context, subTaskId);
+ AbstractLakeSoulMultiTableSinkWriter<IN, OUT> writer = bucketsBuilder.createWriter(context, subTaskId);
writer.initializeState(states);
return writer;
}
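For readers tracking the new second type parameter: IN is the element type accepted by the sink, and OUT appears to be the record type handled by the writer and rolling policy. The bindings implied by the two builders touched in this commit, shown as plain declarations for illustration only:

// Illustration only, derived from the diffs above.
class SinkTypeBindings {
    // CDC/row path: IN = BinarySourceRecord, OUT = RowData
    LakeSoulMultiTablesSink<BinarySourceRecord, RowData> rowSink;
    // Arrow path: IN = OUT = LakeSoulArrowWrapper
    LakeSoulMultiTablesSink<LakeSoulArrowWrapper, LakeSoulArrowWrapper> arrowSink;
}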
@@ -13,67 +13,67 @@
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_TIME;

- public class LakeSoulRollingPolicyImpl extends CheckpointRollingPolicy<RowData, String> {
+ public class LakeSoulRollingPolicyImpl<In> extends CheckpointRollingPolicy<In, String> {

    private boolean rollOnCheckpoint;

    private long rollingSize;

    private long rollingTime;

    public LakeSoulRollingPolicyImpl(long rollingSize, long rollingTime) {
        this.rollOnCheckpoint = true;
        this.rollingSize = rollingSize;
        this.rollingTime = rollingTime;
    }

    public LakeSoulRollingPolicyImpl(boolean rollOnCheckpoint) {
        this.rollingSize = DEFAULT_BUCKET_ROLLING_SIZE;
        this.rollingTime = DEFAULT_BUCKET_ROLLING_TIME;
        this.rollOnCheckpoint = rollOnCheckpoint;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
        return this.rollOnCheckpoint;
    }

    @Override
-   public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) throws IOException {
+   public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, In element) throws IOException {
        return partFileState.getSize() >= this.rollingSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(
            PartFileInfo<String> partFileState, long currentTime) {
        return currentTime - partFileState.getLastUpdateTime() > rollingTime;
    }

    public boolean shouldRollOnMaxSize(long size) {
        return size > rollingSize;
    }

    public long getRollingSize() {
        return rollingSize;
    }

    public void setRollingSize(long rollingSize) {
        this.rollingSize = rollingSize;
    }

    public long getRollingTime() {
        return rollingTime;
    }

    public void setRollingTime(long rollingTime) {
        this.rollingTime = rollingTime;
    }

    public boolean isRollOnCheckpoint() {
        return rollOnCheckpoint;
    }

    public void setRollOnCheckpoint(boolean rollOnCheckpoint) {
        this.rollOnCheckpoint = rollOnCheckpoint;
    }
}
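With the policy now generic in its element type, the same implementation serves both the row and the Arrow writers. An instantiation sketch with illustrative thresholds; production values come from FILE_ROLLING_SIZE and FILE_ROLLING_TIME rather than the literals used here:

// Illustrative values: 128 MB rolling size, 5 minute rolling time.
LakeSoulRollingPolicyImpl<RowData> rowPolicy =
        new LakeSoulRollingPolicyImpl<>(128L * 1024 * 1024, 5L * 60 * 1000);
LakeSoulRollingPolicyImpl<LakeSoulArrowWrapper> arrowPolicy =
        new LakeSoulRollingPolicyImpl<>(128L * 1024 * 1024, 5L * 60 * 1000);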
@@ -19,7 +19,7 @@
/**
* The base abstract class for the {@link BulkFormatBuilder}.
*/
- public abstract class BucketsBuilder<IN, T extends BucketsBuilder<IN, T>>
+ public abstract class BucketsBuilder<IN, OUT, T extends BucketsBuilder<IN, OUT, T>>
implements Serializable {

private static final long serialVersionUID = 1L;
@@ -31,7 +31,7 @@ protected T self() {
return (T) this;
}

- public abstract AbstractLakeSoulMultiTableSinkWriter<IN> createWriter(final Sink.InitContext context, int subTaskId) throws IOException;
+ public abstract AbstractLakeSoulMultiTableSinkWriter<IN, OUT> createWriter(final Sink.InitContext context, int subTaskId) throws IOException;

public abstract LakeSoulSinkCommitter createCommitter() throws IOException;
