[Flink]Support dynamic partition pushdown for streaming source (#489)
* run git action

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* Support dynamic partition pushdown

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* get partition values from split attribute

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix clippy

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix clippy

Signed-off-by: zenghua <huazeng@dmetasoul.com>

---------

Signed-off-by: zenghua <huazeng@dmetasoul.com>
Co-authored-by: zenghua <huazeng@dmetasoul.com>
Ceng23333 and zenghua committed May 23, 2024
1 parent b0f024f commit 793d6d1
Showing 54 changed files with 1,254 additions and 540 deletions.
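What the feature means at the query level: with dynamic partition pushdown, a partition predicate in a streaming query can be resolved against LakeSoul partition metadata instead of being applied only after every split has been read. Below is a minimal Flink SQL sketch of such a query; the catalog name, table name, and partition column "dt" are illustrative, not taken from this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionPushdownQuerySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Illustrative catalog/table names; any LakeSoul table partitioned by 'dt' would do.
        tEnv.executeSql("USE CATALOG lakesoul");

        // The equality predicate on the partition column is the kind of filter the
        // streaming source can now push down to LakeSoul partition metadata.
        tEnv.executeSql("SELECT * FROM user_events WHERE dt = '2024-05-23'").print();
    }
}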
@@ -60,7 +60,7 @@ object DataOperation {
   val dbManager = new DBManager
 
   def getTableDataInfo(tableId: String): Array[DataFileInfo] = {
-    getTableDataInfo(MetaVersion.getAllPartitionInfo(tableId))
+    getTableDataInfo(MetaVersion.getAllPartitionInfoScala(tableId))
   }
 
   def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo] = {
@@ -76,7 +76,7 @@ object DataOperation {
 
 
   def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo] = {
-    val Pars = MetaVersion.getAllPartitionInfo(tableId)
+    val Pars = MetaVersion.getAllPartitionInfoScala(tableId)
     val partitionInfos = new ArrayBuffer[PartitionInfoScala]()
     for (partition_info <- Pars) {
       var contained = true;
@@ -5,6 +5,7 @@
 package com.dmetasoul.lakesoul.meta
 
 import com.alibaba.fastjson.JSONObject
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo
 
 import java.util
 import java.util.UUID
@@ -184,9 +185,13 @@ object MetaVersion {
     (false, "")
   }
 
-  def getAllPartitionInfo(table_id: String): Array[PartitionInfoScala] = {
+  def getAllPartitionInfo(table_id: String): util.List[PartitionInfo] = {
+    dbManager.getAllPartitionInfo(table_id)
+  }
+
+  def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala] = {
     val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]()
-    val res_itr = dbManager.getAllPartitionInfo(table_id).iterator()
+    val res_itr = getAllPartitionInfo(table_id).iterator()
     while (res_itr.hasNext) {
       val res = res_itr.next()
       partitionVersionBuffer += PartitionInfoScala(
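The MetaVersion change above splits partition lookup into a Java-facing getAllPartitionInfo, which now returns the protobuf PartitionInfo list, and a Scala-facing getAllPartitionInfoScala that keeps the old PartitionInfoScala signature. A minimal sketch of how Java code on the Flink side can consume the new variant (the DBManager call mirrors the diffs above; the table id and the printing are illustrative):

import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

import java.util.List;

public class PartitionLookupSketch {
    public static void main(String[] args) {
        // Illustrative table id; in practice it comes from the table's meta info.
        String tableId = "table_00000000-0000-0000-0000-000000000000";

        // Returns protobuf PartitionInfo entities directly, so Java/Flink code no longer
        // needs the PartitionInfoScala wrapper used on the Scala side.
        List<PartitionInfo> partitions = new DBManager().getAllPartitionInfo(tableId);

        for (PartitionInfo partition : partitions) {
            // partitionDesc identifies the partition (e.g. "dt=2024-05-23"); a streaming
            // source can match it against pushed-down partition filters.
            System.out.println(partition.getPartitionDesc());
        }
    }
}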
@@ -46,7 +46,7 @@ protected BulkFormatBuilder(Path basePath, Configuration conf) {
                     conf,
                     DEFAULT_BUCKET_CHECK_INTERVAL,
                     OnCheckpointRollingPolicy.build(),
-                    new DefaultLakeSoulWriterBucketFactory(),
+                    new DefaultLakeSoulWriterBucketFactory(conf),
                     OutputFileConfig.builder().build());
         }
 
@@ -70,7 +70,9 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
 
             LOG.info("Files to commit {}", String.join("; ", files));
 
-            if (files.isEmpty()) continue;
+            if (files.isEmpty() && !LakeSoulSinkOptions.DELETE.equals(committable.getDmlType())) {
+                continue;
+            }
 
             // commit LakeSoul Meta
             TableSchemaIdentity identity = committable.getIdentity();
@@ -195,7 +195,7 @@ public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) thro
                 committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo));
             }
         }
-
+        LOG.info("PrepareCommit with conf={}, \n activeBuckets={}, \n committables={}", conf, activeBuckets, committables);
         return committables;
     }
 
@@ -5,6 +5,7 @@
 package org.apache.flink.lakesoul.sink.writer;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState;
 import org.apache.flink.lakesoul.types.TableSchemaIdentity;
@@ -21,6 +22,12 @@
 @Internal
 public class DefaultLakeSoulWriterBucketFactory implements LakeSoulWriterBucketFactory {
 
+    private final Configuration conf;
+
+    public DefaultLakeSoulWriterBucketFactory(Configuration conf) {
+        this.conf = conf;
+    }
+
     @Override
     public LakeSoulWriterBucket getNewBucket(
             int subTaskId,
@@ -32,7 +39,7 @@ public LakeSoulWriterBucket getNewBucket(
             OutputFileConfig outputFileConfig) {
         return LakeSoulWriterBucket.getNew(
                 subTaskId, tableId,
-                bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig);
+                bucketId, bucketPath, conf, bucketWriter, rollingPolicy, outputFileConfig);
     }
 
@@ -44,7 +51,7 @@ public LakeSoulWriterBucket restoreBucket(
             LakeSoulWriterBucketState bucketState,
             OutputFileConfig outputFileConfig)
             throws IOException {
-        return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter,
+        return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter, conf,
                 rollingPolicy, bucketState, outputFileConfig);
     }
 }
 
@@ -4,10 +4,14 @@
 
 package org.apache.flink.lakesoul.sink.writer;
 
+import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
 import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkCommittable;
 import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState;
+import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
 import org.apache.flink.lakesoul.types.TableSchemaIdentity;
 import org.apache.flink.streaming.api.functions.sink.filesystem.*;
 import org.apache.flink.table.data.RowData;
@@ -47,6 +51,7 @@ public class LakeSoulWriterBucket {
     private final OutputFileConfig outputFileConfig;
 
     private final String uniqueId;
+    private final Configuration conf;
 
     private long tsMs;
 
@@ -67,16 +72,19 @@ public class LakeSoulWriterBucket {
      * Constructor to create a new empty bucket.
      */
     private LakeSoulWriterBucket(
-            int subTaskId, TableSchemaIdentity tableId,
+            int subTaskId,
+            TableSchemaIdentity tableId,
             String bucketId,
             Path bucketPath,
+            Configuration conf,
             BucketWriter<RowData, String> bucketWriter,
             RollingPolicy<RowData, String> rollingPolicy,
             OutputFileConfig outputFileConfig) {
         this.subTaskId = subTaskId;
         this.tableId = checkNotNull(tableId);
         this.bucketId = checkNotNull(bucketId);
         this.bucketPath = checkNotNull(bucketPath);
+        this.conf = checkNotNull(conf);
         this.bucketWriter = checkNotNull(bucketWriter);
         this.rollingPolicy = checkNotNull(rollingPolicy);
         this.outputFileConfig = checkNotNull(outputFileConfig);
@@ -91,6 +99,7 @@ private LakeSoulWriterBucket(
     private LakeSoulWriterBucket(
             int subTaskId,
             TableSchemaIdentity tableId,
+            Configuration conf,
             BucketWriter<RowData, String> partFileFactory,
             RollingPolicy<RowData, String> rollingPolicy,
             LakeSoulWriterBucketState bucketState,
@@ -100,6 +109,7 @@ private LakeSoulWriterBucket(
                 subTaskId, tableId,
                 bucketState.getBucketId(),
                 bucketState.getBucketPath(),
+                conf,
                 partFileFactory,
                 rollingPolicy,
                 outputFileConfig);
@@ -171,12 +181,16 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlT
             long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE :
                     ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime;
 
-            // this.pendingFiles would be cleared later, we need to make a copy
-            // List<InProgressFileWriter.PendingFileRecoverable> tmpPending = new ArrayList<>(pendingFiles);
-            // committables.add(new LakeSoulMultiTableSinkCommittable(
-            //         getBucketId(),
-            //         tmpPending,
-            //         time, tableId, tsMs, dmlType));
+            if (dmlType.equals(LakeSoulSinkOptions.DELETE)) {
+                List<PartitionInfo> sourcePartitionInfoList = JniWrapper
+                        .parseFrom(Base64.getDecoder().decode(sourcePartitionInfo))
+                        .getPartitionInfoList();
+
+                for (PartitionInfo partitionInfo : sourcePartitionInfoList) {
+                    String partitionDesc = partitionInfo.getPartitionDesc();
+                    pendingFilesMap.computeIfAbsent(partitionDesc, _partitionDesc -> new ArrayList());
+                }
+            }
             committables.add(new LakeSoulMultiTableSinkCommittable(
                     // getBucketId(),
                     tableId,
@@ -312,12 +326,13 @@ static LakeSoulWriterBucket getNew(
             final TableSchemaIdentity tableId,
             final String bucketId,
             final Path bucketPath,
+            final Configuration conf,
             final BucketWriter<RowData, String> bucketWriter,
             final RollingPolicy<RowData, String> rollingPolicy,
             final OutputFileConfig outputFileConfig) {
         return new LakeSoulWriterBucket(
                 subTaskId, tableId,
-                bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig);
+                bucketId, bucketPath, conf, bucketWriter, rollingPolicy, outputFileConfig);
     }
 
     /**
@@ -333,9 +348,10 @@ static LakeSoulWriterBucket restore(
             int subTaskId,
             final TableSchemaIdentity tableId,
             final BucketWriter<RowData, String> bucketWriter,
+            final Configuration conf,
             final RollingPolicy<RowData, String> rollingPolicy,
             final LakeSoulWriterBucketState bucketState,
             final OutputFileConfig outputFileConfig) throws IOException {
-        return new LakeSoulWriterBucket(subTaskId, tableId, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
+        return new LakeSoulWriterBucket(subTaskId, tableId, conf, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
     }
 }
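For context on the DELETE branch added to prepareCommit above: sourcePartitionInfo is expected to arrive as a Base64 string wrapping a serialized JniWrapper protobuf whose repeated PartitionInfo entries name the source partitions. A minimal sketch of producing such a payload (illustrative only; the producer side of this commit is not shown on this page):

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

import java.util.Base64;
import java.util.List;

public class SourcePartitionInfoSketch {
    // Encodes the partitions seen by the source into the form prepareCommit() decodes
    // with JniWrapper.parseFrom(Base64.getDecoder().decode(sourcePartitionInfo)).
    static String encode(List<PartitionInfo> sourcePartitions) {
        JniWrapper wrapper = JniWrapper.newBuilder()
                .addAllPartitionInfo(sourcePartitions)
                .build();
        return Base64.getEncoder().encodeToString(wrapper.toByteArray());
    }
}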

This file was deleted.
