From 793d6d14c9a0f8664eefed267642765120d5272a Mon Sep 17 00:00:00 2001 From: Ceng <441651826@qq.com> Date: Thu, 23 May 2024 10:54:35 +0800 Subject: [PATCH] [Flink]Support dynamic partition pushdown for streaming source (#489) * run git action Signed-off-by: zenghua * Support dynamic partition pushdown Signed-off-by: zenghua * get partition values from split attribute Signed-off-by: zenghua * fix clippy Signed-off-by: zenghua * fix clippy Signed-off-by: zenghua --------- Signed-off-by: zenghua Co-authored-by: zenghua --- .../lakesoul/meta/DataOperation.scala | 4 +- .../dmetasoul/lakesoul/meta/MetaVersion.scala | 9 +- .../sink/bucket/BulkFormatBuilder.java | 2 +- .../sink/committer/LakeSoulSinkCommitter.java | 4 +- .../AbstractLakeSoulMultiTableSinkWriter.java | 2 +- .../DefaultLakeSoulWriterBucketFactory.java | 11 +- .../sink/writer/LakeSoulWriterBucket.java | 34 +++- .../LakeSoulWriterDynamicBucketFactory.java | 50 ----- ...oulAllPartitionDynamicSplitEnumerator.java | 176 +++++++++++++++++ .../source/LakeSoulDynSplitAssigner.java | 25 ++- .../LakeSoulDynamicSplitEnumerator.java | 131 ------------- .../source/LakeSoulLookupTableSource.java | 20 +- .../source/LakeSoulOneSplitRecordsReader.java | 62 +++--- ...Split.java => LakeSoulPartitionSplit.java} | 15 +- .../source/LakeSoulPendingSplits.java | 17 +- .../source/LakeSoulRecordEmitter.java | 4 +- .../source/LakeSoulSimpleSplitAssigner.java | 12 +- .../flink/lakesoul/source/LakeSoulSource.java | 138 ++++++++----- .../lakesoul/source/LakeSoulSourceReader.java | 16 +- .../lakesoul/source/LakeSoulSplitReader.java | 39 ++-- .../source/LakeSoulStaticSplitEnumerator.java | 21 +- ...SimpleLakeSoulPendingSplitsSerializer.java | 13 +- .../source/SimpleLakeSoulSerializer.java | 10 +- .../substrait/SubstraitFlinkUtil.java | 51 +++-- .../lakesoul/substrait/SubstraitVisitor.java | 9 +- .../table/LakeSoulDynamicTableFactory.java | 18 +- ...keSoulRowLevelModificationScanContext.java | 47 ++++- .../lakesoul/table/LakeSoulTableSink.java | 23 ++- .../lakesoul/table/LakeSoulTableSource.java | 182 ++++++++++++------ .../apache/flink/lakesoul/tool/FlinkUtil.java | 19 +- .../connector/sink/LakeSoulTableSinkCase.java | 6 +- .../lakesoul/test/flinkSource/DMLSuite.java | 52 +++++ .../test/flinkSource/StreamReadSuite.java | 75 +++++++- native-io/lakesoul-io-java/pom.xml | 6 + .../lakesoul/lakesoul/io/NativeIOBase.java | 18 ++ .../lakesoul/io/jnr/LibLakeSoulIO.java | 6 + .../lakesoul/io/substrait/SubstraitUtil.java | 129 +++++++++++++ rust/Cargo.lock | 1 + rust/lakesoul-datafusion/src/catalog/mod.rs | 2 +- .../datasource/file_format/metadata_format.rs | 2 +- .../src/datasource/table_provider.rs | 10 +- .../src/lakesoul_table/helpers.rs | 2 +- .../src/planner/physical_planner.rs | 18 +- rust/lakesoul-io-c/Cargo.toml | 3 +- rust/lakesoul-io-c/lakesoul_c_bindings.h | 12 ++ rust/lakesoul-io-c/src/lib.rs | 65 ++++++- rust/lakesoul-io/Cargo.toml | 4 +- .../lakesoul-io/src/datasource/file_format.rs | 2 +- rust/lakesoul-io/src/datasource/listing.rs | 19 +- rust/lakesoul-io/src/helpers.rs | 132 +++++++++++-- rust/lakesoul-io/src/lakesoul_io_config.rs | 22 ++- rust/lakesoul-io/src/lakesoul_reader.rs | 4 +- rust/lakesoul-io/src/lakesoul_writer.rs | 31 +-- rust/lakesoul-io/src/repartition/mod.rs | 9 +- 54 files changed, 1254 insertions(+), 540 deletions(-) delete mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java create mode 100644 
lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java delete mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynamicSplitEnumerator.java rename lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/{LakeSoulSplit.java => LakeSoulPartitionSplit.java} (72%) diff --git a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala index 26ecd5eb7..545c102eb 100644 --- a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala +++ b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala @@ -60,7 +60,7 @@ object DataOperation { val dbManager = new DBManager def getTableDataInfo(tableId: String): Array[DataFileInfo] = { - getTableDataInfo(MetaVersion.getAllPartitionInfo(tableId)) + getTableDataInfo(MetaVersion.getAllPartitionInfoScala(tableId)) } def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo] = { @@ -76,7 +76,7 @@ object DataOperation { def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo] = { - val Pars = MetaVersion.getAllPartitionInfo(tableId) + val Pars = MetaVersion.getAllPartitionInfoScala(tableId) val partitionInfos = new ArrayBuffer[PartitionInfoScala]() for (partition_info <- Pars) { var contained = true; diff --git a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala index 9954b641c..4d90f87ef 100644 --- a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala +++ b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala @@ -5,6 +5,7 @@ package com.dmetasoul.lakesoul.meta import com.alibaba.fastjson.JSONObject +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo import java.util import java.util.UUID @@ -184,9 +185,13 @@ object MetaVersion { (false, "") } - def getAllPartitionInfo(table_id: String): Array[PartitionInfoScala] = { + def getAllPartitionInfo(table_id: String): util.List[PartitionInfo] = { + dbManager.getAllPartitionInfo(table_id) + } + + def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala] = { val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() - val res_itr = dbManager.getAllPartitionInfo(table_id).iterator() + val res_itr = getAllPartitionInfo(table_id).iterator() while (res_itr.hasNext) { val res = res_itr.next() partitionVersionBuffer += PartitionInfoScala( diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java index 5fc74e51e..5775ac909 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java @@ -46,7 +46,7 @@ protected BulkFormatBuilder(Path basePath, Configuration conf) { conf, DEFAULT_BUCKET_CHECK_INTERVAL, OnCheckpointRollingPolicy.build(), - new DefaultLakeSoulWriterBucketFactory(), + new DefaultLakeSoulWriterBucketFactory(conf), OutputFileConfig.builder().build()); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java index 
96d609145..25ddc81ca 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java @@ -70,7 +70,9 @@ public List commit(List prepareCommit(boolean flush) thro committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo)); } } - + LOG.info("PrepareCommit with conf={}, \n activeBuckets={}, \n committables={}", conf, activeBuckets, committables); return committables; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DefaultLakeSoulWriterBucketFactory.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DefaultLakeSoulWriterBucketFactory.java index 6e82523d3..f675f1e21 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DefaultLakeSoulWriterBucketFactory.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DefaultLakeSoulWriterBucketFactory.java @@ -5,6 +5,7 @@ package org.apache.flink.lakesoul.sink.writer; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState; import org.apache.flink.lakesoul.types.TableSchemaIdentity; @@ -21,6 +22,12 @@ @Internal public class DefaultLakeSoulWriterBucketFactory implements LakeSoulWriterBucketFactory { + private final Configuration conf; + + public DefaultLakeSoulWriterBucketFactory(Configuration conf) { + this.conf = conf; + } + @Override public LakeSoulWriterBucket getNewBucket( int subTaskId, @@ -32,7 +39,7 @@ public LakeSoulWriterBucket getNewBucket( OutputFileConfig outputFileConfig) { return LakeSoulWriterBucket.getNew( subTaskId, tableId, - bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig); + bucketId, bucketPath, conf, bucketWriter, rollingPolicy, outputFileConfig); } @Override @@ -44,7 +51,7 @@ public LakeSoulWriterBucket restoreBucket( LakeSoulWriterBucketState bucketState, OutputFileConfig outputFileConfig) throws IOException { - return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter, + return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter, conf, rollingPolicy, bucketState, outputFileConfig); } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java index 8042d7753..bd7655d68 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java @@ -4,10 +4,14 @@ package org.apache.flink.lakesoul.sink.writer; +import com.dmetasoul.lakesoul.meta.entity.JniWrapper; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink; import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkCommittable; import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState; +import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions; import org.apache.flink.lakesoul.types.TableSchemaIdentity; import org.apache.flink.streaming.api.functions.sink.filesystem.*; import org.apache.flink.table.data.RowData; @@ -47,6 +51,7 @@ public class LakeSoulWriterBucket { 
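The LakeSoulWriterBucket hunks that follow make prepareCommit aware of the DML type: for a DELETE commit they decode the Base64-encoded JniWrapper protobuf passed in as sourcePartitionInfo and register every source partition in pendingFilesMap, so a committable is still emitted for partitions that received no newly written files. A minimal standalone sketch of that decode-and-register step is below; it reuses the entity classes named in the diff, but the surrounding class and the value type of pendingFilesMap are simplified placeholders, not the committed implementation.

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;

class DeleteCommitPartitionSketch {
    // The real map value type is the bucket's list of pending files; Object is a placeholder here.
    static void registerSourcePartitions(String sourcePartitionInfo,
                                         Map<String, List<Object>> pendingFilesMap) throws Exception {
        // sourcePartitionInfo carries a Base64-encoded JniWrapper protobuf message
        List<PartitionInfo> sourcePartitions = JniWrapper
                .parseFrom(Base64.getDecoder().decode(sourcePartitionInfo))
                .getPartitionInfoList();
        for (PartitionInfo partitionInfo : sourcePartitions) {
            // ensure a (possibly empty) pending-file list exists for every source partition,
            // so DELETE commits produce a committable even when no new files were written
            pendingFilesMap.computeIfAbsent(partitionInfo.getPartitionDesc(), desc -> new ArrayList<>());
        }
    }
}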
private final OutputFileConfig outputFileConfig; private final String uniqueId; + private final Configuration conf; private long tsMs; @@ -67,9 +72,11 @@ public class LakeSoulWriterBucket { * Constructor to create a new empty bucket. */ private LakeSoulWriterBucket( - int subTaskId, TableSchemaIdentity tableId, + int subTaskId, + TableSchemaIdentity tableId, String bucketId, Path bucketPath, + Configuration conf, BucketWriter bucketWriter, RollingPolicy rollingPolicy, OutputFileConfig outputFileConfig) { @@ -77,6 +84,7 @@ private LakeSoulWriterBucket( this.tableId = checkNotNull(tableId); this.bucketId = checkNotNull(bucketId); this.bucketPath = checkNotNull(bucketPath); + this.conf = checkNotNull(conf); this.bucketWriter = checkNotNull(bucketWriter); this.rollingPolicy = checkNotNull(rollingPolicy); this.outputFileConfig = checkNotNull(outputFileConfig); @@ -91,6 +99,7 @@ private LakeSoulWriterBucket( private LakeSoulWriterBucket( int subTaskId, TableSchemaIdentity tableId, + Configuration conf, BucketWriter partFileFactory, RollingPolicy rollingPolicy, LakeSoulWriterBucketState bucketState, @@ -100,6 +109,7 @@ private LakeSoulWriterBucket( subTaskId, tableId, bucketState.getBucketId(), bucketState.getBucketPath(), + conf, partFileFactory, rollingPolicy, outputFileConfig); @@ -171,12 +181,16 @@ List prepareCommit(boolean flush, String dmlT long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE : ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime; - // this.pendingFiles would be cleared later, we need to make a copy -// List tmpPending = new ArrayList<>(pendingFiles); -// committables.add(new LakeSoulMultiTableSinkCommittable( -// getBucketId(), -// tmpPending, -// time, tableId, tsMs, dmlType)); + if (dmlType.equals(LakeSoulSinkOptions.DELETE)) { + List sourcePartitionInfoList = JniWrapper + .parseFrom(Base64.getDecoder().decode(sourcePartitionInfo)) + .getPartitionInfoList(); + + for (PartitionInfo partitionInfo : sourcePartitionInfoList) { + String partitionDesc = partitionInfo.getPartitionDesc(); + pendingFilesMap.computeIfAbsent(partitionDesc, _partitionDesc -> new ArrayList()); + } + } committables.add(new LakeSoulMultiTableSinkCommittable( // getBucketId(), tableId, @@ -312,12 +326,13 @@ static LakeSoulWriterBucket getNew( final TableSchemaIdentity tableId, final String bucketId, final Path bucketPath, + final Configuration conf, final BucketWriter bucketWriter, final RollingPolicy rollingPolicy, final OutputFileConfig outputFileConfig) { return new LakeSoulWriterBucket( subTaskId, tableId, - bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig); + bucketId, bucketPath, conf, bucketWriter, rollingPolicy, outputFileConfig); } /** @@ -333,9 +348,10 @@ static LakeSoulWriterBucket restore( int subTaskId, final TableSchemaIdentity tableId, final BucketWriter bucketWriter, + final Configuration conf, final RollingPolicy rollingPolicy, final LakeSoulWriterBucketState bucketState, final OutputFileConfig outputFileConfig) throws IOException { - return new LakeSoulWriterBucket(subTaskId, tableId, bucketWriter, rollingPolicy, bucketState, outputFileConfig); + return new LakeSoulWriterBucket(subTaskId, tableId, conf, bucketWriter, rollingPolicy, bucketState, outputFileConfig); } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java 
b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java deleted file mode 100644 index a6f49ccfa..000000000 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -package org.apache.flink.lakesoul.sink.writer; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.core.fs.Path; -import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState; -import org.apache.flink.lakesoul.types.TableSchemaIdentity; -import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; -import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; -import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; -import org.apache.flink.table.data.RowData; - -import java.io.IOException; - -/** - * A factory returning {@link AbstractLakeSoulMultiTableSinkWriter writer}. - */ -@Internal -public class LakeSoulWriterDynamicBucketFactory implements LakeSoulWriterBucketFactory { - - @Override - public LakeSoulWriterBucket getNewBucket( - int subTaskId, - TableSchemaIdentity tableId, - String bucketId, - Path bucketPath, - BucketWriter bucketWriter, - RollingPolicy rollingPolicy, - OutputFileConfig outputFileConfig) { - return LakeSoulWriterBucket.getNew( - subTaskId, tableId, - bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig); - } - - @Override - public LakeSoulWriterBucket restoreBucket( - int subTaskId, - TableSchemaIdentity tableId, - BucketWriter bucketWriter, - RollingPolicy rollingPolicy, - LakeSoulWriterBucketState bucketState, - OutputFileConfig outputFileConfig) - throws IOException { - return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter, - rollingPolicy, bucketState, outputFileConfig); - } -} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java new file mode 100644 index 000000000..a3b0228be --- /dev/null +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java @@ -0,0 +1,176 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.flink.lakesoul.source; + +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.dmetasoul.lakesoul.meta.DataFileInfo; +import com.dmetasoul.lakesoul.meta.DataOperation; +import com.dmetasoul.lakesoul.meta.MetaVersion; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import io.substrait.proto.Plan; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.fs.Path; +import org.apache.flink.lakesoul.tool.FlinkUtil; +import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; +import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; +import org.apache.flink.table.runtime.arrow.ArrowUtils; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerator { + private static final Logger LOG = LoggerFactory.getLogger(LakeSoulAllPartitionDynamicSplitEnumerator.class); + + private final SplitEnumeratorContext context; + + private final LakeSoulDynSplitAssigner splitAssigner; + private final long discoveryInterval; + private final Map partitionLatestTimestamp; + private final Set taskIdsAwaitingSplit; + private final Plan partitionFilters; + private final List partitionColumns; + private final TableInfo tableInfo; + String tableId; + private long startTime; + private long nextStartTime; + private int hashBucketNum = -1; + + protected Schema partitionArrowSchema; + + public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext context, + LakeSoulDynSplitAssigner splitAssigner, + RowType rowType, + long discoveryInterval, + long startTime, + String tableId, + String hashBucketNum, + List partitionColumns, + Plan partitionFilters) { + this.context = context; + this.splitAssigner = splitAssigner; + this.discoveryInterval = discoveryInterval; + this.tableId = tableId; + this.startTime = startTime; + this.hashBucketNum = Integer.parseInt(hashBucketNum); + this.taskIdsAwaitingSplit = Sets.newConcurrentHashSet(); + this.partitionLatestTimestamp = Maps.newConcurrentMap(); + this.partitionColumns = partitionColumns; + + Schema tableSchema = ArrowUtils.toArrowSchema(rowType); + List partitionFields = partitionColumns.stream().map(tableSchema::findField).collect(Collectors.toList()); + + this.partitionArrowSchema = new Schema(partitionFields); + this.partitionFilters = partitionFilters; + tableInfo = DataOperation.dbManager().getTableInfoByTableId(tableId); + } + + @Override + public void start() { + context.callAsync(this::enumerateSplits, this::processDiscoveredSplits, discoveryInterval, + discoveryInterval); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + if (!context.registeredReaders().containsKey(subtaskId)) { + // reader failed between sending the request and now. skip this request. 
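The constructor above builds partitionArrowSchema by projecting only the range-partition columns out of the table's Arrow schema; enumerateSplits later hands that schema, the discovered PartitionInfo list, and the pushed-down Substrait plan to SubstraitUtil.applyPartitionFilters so that only matching partitions are scanned. A condensed sketch of that filtering path is below, using the signatures as they appear in this patch (outside this diff, treat them as assumptions):

import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import io.substrait.proto.Plan;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.table.runtime.arrow.ArrowUtils;
import org.apache.flink.table.types.logical.RowType;

import java.util.List;
import java.util.stream.Collectors;

class PartitionFilterSketch {
    static List<PartitionInfo> filterPartitions(RowType tableRowType,
                                                List<String> partitionColumns,
                                                List<PartitionInfo> allPartitionInfo,
                                                Plan partitionFilters) {
        // build an Arrow schema containing only the range-partition columns
        Schema tableSchema = ArrowUtils.toArrowSchema(tableRowType);
        List<Field> partitionFields = partitionColumns.stream()
                .map(tableSchema::findField)
                .collect(Collectors.toList());
        Schema partitionArrowSchema = new Schema(partitionFields);
        // evaluate the pushed-down partition predicate against each partition's values
        return SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters);
    }
}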
+ return; + } + int tasksSize = context.registeredReaders().size(); + Optional nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); + if (nextSplit.isPresent()) { + context.assignSplit(nextSplit.get(), subtaskId); + taskIdsAwaitingSplit.remove(subtaskId); + } else { + taskIdsAwaitingSplit.add(subtaskId); + } + + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + LOG.info("Add split back: {}", splits); + splitAssigner.addSplits(splits); + } + + @Override + public void addReader(int subtaskId) { + } + + @Override + public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { + LakeSoulPendingSplits pendingSplits = + new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "", + this.discoveryInterval, this.hashBucketNum); + LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits); + return pendingSplits; + } + + @Override + public void close() throws IOException { + + } + + private void processDiscoveredSplits(Collection splits, Throwable error) { + if (error != null) { + LOG.error("Failed to enumerate files", error); + return; + } + LOG.info("Process discovered splits {}", splits); + int tasksSize = context.registeredReaders().size(); + this.splitAssigner.addSplits(splits); + for (Integer item : taskIdsAwaitingSplit) { + Optional al = this.splitAssigner.getNext(item, tasksSize); + if (al.isPresent()) { + context.assignSplit(al.get(), item); + taskIdsAwaitingSplit.remove(item); + } + } + } + + public Collection enumerateSplits() { + List allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId); + List filteredPartition = SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters); + + + ArrayList splits = new ArrayList<>(16); + for (PartitionInfo partitionInfo : filteredPartition) { + String partitionDesc = partitionInfo.getPartitionDesc(); + long latestTimestamp = partitionInfo.getTimestamp(); + this.nextStartTime = Math.max(latestTimestamp, this.nextStartTime); + + DataFileInfo[] dataFileInfos; + if (partitionLatestTimestamp.containsKey(partitionDesc)) { + Long lastTimestamp = partitionLatestTimestamp.get(partitionDesc); + dataFileInfos = + DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, lastTimestamp + 1, latestTimestamp, "incremental"); + } else { + dataFileInfos = + DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, startTime, latestTimestamp, "incremental"); + } + if (dataFileInfos.length > 0) { + Map>> splitByRangeAndHashPartition = + FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, dataFileInfos); + for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { + for (Map.Entry> split : entry.getValue().entrySet()) { + splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey(), partitionDesc)); + } + } + } + partitionLatestTimestamp.put(partitionDesc, latestTimestamp); + } + + return splits; + } +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynSplitAssigner.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynSplitAssigner.java index 22678cd23..d27ad7530 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynSplitAssigner.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynSplitAssigner.java @@ -7,20 +7,20 @@ import java.util.*; public class LakeSoulDynSplitAssigner { - private final HashMap> 
splits; + private final HashMap> splits; private int hashBucketNum = -1; - public LakeSoulDynSplitAssigner(Collection splits, String hashBucketNum) { + public LakeSoulDynSplitAssigner(Collection splits, String hashBucketNum) { this.hashBucketNum = Integer.valueOf(hashBucketNum); this.splits = new HashMap<>(100); addSplitsFromCollection(splits); } - private void addSplitsFromCollection(Collection splitsCol) { + private void addSplitsFromCollection(Collection splitsCol) { if (splitsCol == null && splitsCol.size() == 0) { return; } - for (LakeSoulSplit lss : splitsCol) { + for (LakeSoulPartitionSplit lss : splitsCol) { if (!this.splits.containsKey(lss.getBucketId())) { this.splits.put(lss.getBucketId(), new ArrayList<>()); } @@ -34,13 +34,12 @@ public LakeSoulDynSplitAssigner(String hashBucketNum) { } - - public Optional getNext(int taskId, int tasksNum) { + public Optional getNext(int taskId, int tasksNum) { final int size = splits.size(); if (size > 0) { if (-1 == this.hashBucketNum) { - Collection> all = this.splits.values(); - for (ArrayList al : all) { + Collection> all = this.splits.values(); + for (ArrayList al : all) { if (al.size() > 0) { return Optional.of(al.remove(0)); } @@ -48,11 +47,11 @@ public Optional getNext(int taskId, int tasksNum) { return Optional.empty(); } else { if (this.hashBucketNum <= tasksNum) { - ArrayList taskSplits = this.splits.get(taskId); + ArrayList taskSplits = this.splits.get(taskId); return (taskSplits == null || taskSplits.size() == 0) ? Optional.empty() : Optional.of(taskSplits.remove(0)); } else { for (int i = taskId; i < this.hashBucketNum; i += tasksNum) { - ArrayList splits = this.splits.get(i); + ArrayList splits = this.splits.get(i); if (splits != null && splits.size() > 0) { return Optional.of(splits.remove(0)); } @@ -67,12 +66,12 @@ public Optional getNext(int taskId, int tasksNum) { } - public void addSplits(Collection newSplits) { + public void addSplits(Collection newSplits) { addSplitsFromCollection(newSplits); } - public List remainingSplits() { - ArrayList als = new ArrayList<>(100); + public List remainingSplits() { + ArrayList als = new ArrayList<>(100); for (ArrayList al : this.splits.values()) { als.addAll(al); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynamicSplitEnumerator.java deleted file mode 100644 index 25f3769db..000000000 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulDynamicSplitEnumerator.java +++ /dev/null @@ -1,131 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -package org.apache.flink.lakesoul.source; - -import com.dmetasoul.lakesoul.meta.DataFileInfo; -import com.dmetasoul.lakesoul.meta.DataOperation; -import com.dmetasoul.lakesoul.meta.MetaVersion; -import org.apache.flink.api.connector.source.SplitEnumerator; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.core.fs.Path; -import org.apache.flink.lakesoul.tool.FlinkUtil; -import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; - -public class LakeSoulDynamicSplitEnumerator implements SplitEnumerator { - private static final Logger LOG = LoggerFactory.getLogger(LakeSoulDynamicSplitEnumerator.class); - - private final 
SplitEnumeratorContext context; - - private final LakeSoulDynSplitAssigner splitAssigner; - private final long discoveryInterval; - private final String parDesc; - private final Set taskIdsAwaitingSplit; - String tableId; - private long startTime; - private long nextStartTime; - private int hashBucketNum = -1; - - - public LakeSoulDynamicSplitEnumerator(SplitEnumeratorContext context, - LakeSoulDynSplitAssigner splitAssigner, long discoveryInterval, - long startTime, String tableId, String parDesc, String hashBucketNum) { - this.context = context; - this.splitAssigner = splitAssigner; - this.discoveryInterval = discoveryInterval; - this.tableId = tableId; - this.startTime = startTime; - this.parDesc = parDesc; - this.hashBucketNum = Integer.parseInt(hashBucketNum); - this.taskIdsAwaitingSplit = Sets.newConcurrentHashSet(); - } - - @Override - public void start() { - context.callAsync(() -> this.enumerateSplits(tableId), this::processDiscoveredSplits, discoveryInterval, - discoveryInterval); - } - - @Override - public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { - if (!context.registeredReaders().containsKey(subtaskId)) { - // reader failed between sending the request and now. skip this request. - return; - } - int tasksSize = context.registeredReaders().size(); - Optional nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); - if (nextSplit.isPresent()) { - context.assignSplit(nextSplit.get(), subtaskId); - taskIdsAwaitingSplit.remove(subtaskId); - } else { - taskIdsAwaitingSplit.add(subtaskId); - } - - } - - @Override - public void addSplitsBack(List splits, int subtaskId) { - LOG.info("Add split back: {}", splits); - splitAssigner.addSplits(splits); - } - - @Override - public void addReader(int subtaskId) { - } - - @Override - public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { - LakeSoulPendingSplits pendingSplits = - new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, this.parDesc, - this.discoveryInterval, this.hashBucketNum); - LOG.info("LakeSoulDynamicSplitEnumerator snapshotState {}", pendingSplits); - return pendingSplits; - } - - @Override - public void close() throws IOException { - - } - - private void processDiscoveredSplits(Collection splits, Throwable error) { - if (error != null) { - LOG.error("Failed to enumerate files", error); - return; - } - LOG.info("Process discovered splits {}", splits); - int tasksSize = context.registeredReaders().size(); - this.splitAssigner.addSplits(splits); - for (Integer item : taskIdsAwaitingSplit) { - Optional al = this.splitAssigner.getNext(item, tasksSize); - if (al.isPresent()) { - context.assignSplit(al.get(), item); - taskIdsAwaitingSplit.remove(item); - } - } - } - - public Collection enumerateSplits(String tid) { - this.nextStartTime = MetaVersion.getLastedTimestamp(tid, parDesc) + 1; - DataFileInfo[] dfinfos = - DataOperation.getIncrementalPartitionDataInfo(tid, parDesc, this.startTime, this.nextStartTime, - "incremental"); - LOG.info("Found new data info {}", (Object) dfinfos); - ArrayList splits = new ArrayList<>(16); - Map>> splitByRangeAndHashPartition = - FlinkUtil.splitDataInfosToRangeAndHashPartition(tid, dfinfos); - for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { - for (Map.Entry> split : entry.getValue().entrySet()) { - splits.add(new LakeSoulSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey())); - } - } - this.startTime = this.nextStartTime; - return splits; - } -} diff 
--git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java index 9f165c80c..9fbb63336 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java @@ -52,12 +52,14 @@ public LakeSoulLookupTableSource(TableId tableId, RowType rowType, boolean isStreaming, List pkColumns, + List partitionColumns, ResolvedCatalogTable catalogTable, Map optionParams) { super(tableId, rowType, isStreaming, pkColumns, + partitionColumns, optionParams); this.catalogTable = catalogTable; this.producedDataType = catalogTable.getResolvedSchema().toPhysicalRowDataType(); @@ -225,18 +227,20 @@ protected boolean isReadingLatest() { */ @Override public DynamicTableSource copy() { - LakeSoulLookupTableSource lsts = + LakeSoulLookupTableSource newInstance = new LakeSoulLookupTableSource(this.tableId, - this.rowType, - this.isStreaming, + this.tableRowType, + this.isBounded, this.pkColumns, + this.partitionColumns, this.catalogTable, this.optionParams); - lsts.projectedFields = this.projectedFields; - lsts.remainingPartitions = this.remainingPartitions; - lsts.filter = this.filter; - lsts.modificationContext = this.modificationContext; - return lsts; + newInstance.projectedFields = this.projectedFields; + newInstance.remainingPartitions = this.remainingPartitions; + newInstance.pushedFilters = this.pushedFilters; + newInstance.modificationContext = this.modificationContext; + newInstance.partitionFilters = this.partitionFilters; + return newInstance; } /** diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java index dd3c92ec6..21c843bed 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java @@ -6,9 +6,11 @@ import com.dmetasoul.lakesoul.LakeSoulArrowReader; import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader; +import com.dmetasoul.lakesoul.meta.DBUtil; import io.substrait.proto.Plan; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -20,7 +22,6 @@ import org.apache.flink.table.runtime.arrow.ArrowUtils; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.types.RowKind; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,25 +35,28 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds finishedSplit; + private final List partitionColumns; + private final RowType tableRowType; + private final Schema partitionSchema; List pkColumns; - LinkedHashMap partitions; + LinkedHashMap partitionValues; - boolean isStreaming; + boolean isBounded; String cdcColumn; @@ -76,24 +80,33 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds pkColumns, - boolean isStreaming, + boolean isBounded, 
String cdcColumn, + List partitionColumns, Plan filter) throws Exception { this.split = split; this.skipRecords = split.getSkipRecord(); this.conf = new Configuration(conf); - this.schema = schema; - this.schemaWithPk = schemaWithPk; + this.tableRowType = tableRowType; + this.projectedRowType = projectedRowType; + this.projectedRowTypeWithPk = projectedRowTypeWithPk; this.pkColumns = pkColumns; this.splitId = split.splitId(); - this.isStreaming = isStreaming; + this.isBounded = isBounded; this.cdcColumn = cdcColumn; this.finishedSplit = Collections.singleton(splitId); + this.partitionColumns = partitionColumns; + Schema tableSchema = ArrowUtils.toArrowSchema(tableRowType); + List partitionFields = partitionColumns.stream().map(tableSchema::findField).collect(Collectors.toList()); + + this.partitionSchema = new Schema(partitionFields); + this.partitionValues = DBUtil.parsePartitionDesc(split.getPartitionDesc()); this.filter = filter; initializeReader(); recoverFromSkipRecord(); @@ -104,27 +117,28 @@ private void initializeReader() throws IOException { for (Path path : split.getFiles()) { reader.addFile(FlinkUtil.makeQualifiedPath(path).toString()); } - this.partitions = PartitionPathUtils.extractPartitionSpecFromPath(split.getFiles().get(0)); List nonPartitionColumns = - this.schema.getFieldNames().stream().filter(name -> !this.partitions.containsKey(name)) + this.projectedRowType.getFieldNames().stream().filter(name -> !this.partitionValues.containsKey(name)) .collect(Collectors.toList()); if (!nonPartitionColumns.isEmpty()) { ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf)); // native reader requires pk columns in schema - Schema arrowSchema = ArrowUtils.toArrowSchema(schemaWithPk); + Schema arrowSchema = ArrowUtils.toArrowSchema(projectedRowTypeWithPk); reader.setSchema(arrowSchema); reader.setPrimaryKeys(pkColumns); FlinkUtil.setFSConfigs(conf, reader); } + reader.setPartitionSchema(partitionSchema); + if (!cdcColumn.isEmpty()) { - int cdcField = schemaWithPk.getFieldIndex(cdcColumn); + int cdcField = projectedRowTypeWithPk.getFieldIndex(cdcColumn); cdcFieldGetter = RowData.createFieldGetter(new VarCharType(), cdcField); } - for (Map.Entry partition : this.partitions.entrySet()) { + for (Map.Entry partition : this.partitionValues.entrySet()) { reader.setDefaultColumnValue(partition.getKey(), partition.getValue()); } @@ -136,7 +150,7 @@ private void initializeReader() throws IOException { " non partition cols={}, cdc column={}, filter={}", split, pkColumns, - partitions, + partitionValues, nonPartitionColumns, cdcColumn, filter); @@ -148,16 +162,16 @@ private void initializeReader() throws IOException { // final returned row should only contain requested schema in query private void makeCurrentArrowReader() { this.curArrowReader = ArrowUtils.createArrowReader(currentVCR, - this.schemaWithPk); + this.projectedRowTypeWithPk); // this.schema contains only requested fields, which does not include cdc column // and may not include pk columns ArrayList requestedVectors = new ArrayList<>(); - for (String fieldName : schema.getFieldNames()) { - int index = schemaWithPk.getFieldIndex(fieldName); + for (String fieldName : projectedRowType.getFieldNames()) { + int index = projectedRowTypeWithPk.getFieldIndex(fieldName); requestedVectors.add(currentVCR.getVector(index)); } this.curArrowReaderRequestedSchema = - ArrowUtils.createArrowReader(new VectorSchemaRoot(requestedVectors), schema); + ArrowUtils.createArrowReader(new VectorSchemaRoot(requestedVectors), projectedRowType); } private 
void recoverFromSkipRecord() throws Exception { @@ -230,7 +244,7 @@ public RowData nextRecordFromSplit() { // row kind by default is insert rd = this.curArrowReader.read(rowId); if (!cdcColumn.isEmpty()) { - if (this.isStreaming) { + if (!this.isBounded) { // set rowkind according to cdc row kind field value rk = FlinkUtil.operationToRowKind((StringData) cdcFieldGetter.getFieldOrNull(rd)); LOG.debug("Set RowKind to {}", rk); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplit.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPartitionSplit.java similarity index 72% rename from lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplit.java rename to lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPartitionSplit.java index 00c183c9f..0af7e7033 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplit.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPartitionSplit.java @@ -9,12 +9,13 @@ import java.io.Serializable; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** * Source split for LakeSoul's flink source */ -public class LakeSoulSplit implements SourceSplit, Serializable { +public class LakeSoulPartitionSplit implements SourceSplit, Serializable { private final String id; private long skipRecord = 0; @@ -22,19 +23,23 @@ public class LakeSoulSplit implements SourceSplit, Serializable { private final List files; private int bucketId = -1; - public LakeSoulSplit(String id, List files, long skipRecord) { + private final String partitionDesc; + + public LakeSoulPartitionSplit(String id, List files, long skipRecord, String partitionDesc) { assert id != null; this.id = id; this.files = files; this.skipRecord = skipRecord; + this.partitionDesc = partitionDesc; } - public LakeSoulSplit(String id, List files, long skipRecord, int bucketId) { + public LakeSoulPartitionSplit(String id, List files, long skipRecord, int bucketId, String partitionDesc) { assert id != null; this.id = id; this.files = files; this.skipRecord = skipRecord; this.bucketId = bucketId; + this.partitionDesc = partitionDesc; } @Override @@ -46,6 +51,10 @@ public List getFiles() { return files; } + public String getPartitionDesc() { + return partitionDesc; + } + public void incrementRecord() { this.skipRecord++; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java index e4a77f746..86254cf81 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulPendingSplits.java @@ -11,7 +11,7 @@ public class LakeSoulPendingSplits { /** * Split to read for both batch and streaming */ - private final List splits; + private final List splits; /** * Already discovered latest version's timestamp @@ -19,20 +19,21 @@ public class LakeSoulPendingSplits { */ private final long lastReadTimestamp; - private final String tableid; + private final String tableId; private final String parDesc; private final long discoverInterval; private final int hashBucketNum; - public LakeSoulPendingSplits(List splits, long lastReadTimestamp, String tableid, String parDesc, long discoverInterval, int hashBucketNum) { + + public LakeSoulPendingSplits(List splits, long lastReadTimestamp, String tableId, String parDesc, 
long discoverInterval, int hashBucketNum) { this.splits = splits; this.lastReadTimestamp = lastReadTimestamp; - this.tableid = tableid; + this.tableId = tableId; this.parDesc = parDesc; this.discoverInterval = discoverInterval; this.hashBucketNum = hashBucketNum; } - public List getSplits() { + public List getSplits() { return splits; } @@ -40,8 +41,8 @@ public long getLastReadTimestamp() { return lastReadTimestamp; } - public String getTableid() { - return tableid; + public String getTableId() { + return tableId; } public String getParDesc() { @@ -59,7 +60,7 @@ public int getHashBucketNum() { @Override public String toString() { return "LakeSoulPendingSplits{" + "splits=" + splits + ", lastReadTimestamp=" + lastReadTimestamp + - ", tableid='" + tableid + '\'' + ", parDesc='" + parDesc + '\'' + ", discoverInterval=" + + ", tableid='" + tableId + '\'' + ", parDesc='" + parDesc + '\'' + ", discoverInterval=" + discoverInterval + ", hashBucketNum=" + hashBucketNum + '}'; } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulRecordEmitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulRecordEmitter.java index da6dc77bd..60878948f 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulRecordEmitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulRecordEmitter.java @@ -8,9 +8,9 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.table.data.RowData; -public class LakeSoulRecordEmitter implements RecordEmitter { +public class LakeSoulRecordEmitter implements RecordEmitter { @Override - public void emitRecord(RowData element, SourceOutput output, LakeSoulSplit splitState) throws Exception { + public void emitRecord(RowData element, SourceOutput output, LakeSoulPartitionSplit splitState) throws Exception { output.collect(element); splitState.incrementRecord(); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSimpleSplitAssigner.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSimpleSplitAssigner.java index 23ba78e81..e03ae55b3 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSimpleSplitAssigner.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSimpleSplitAssigner.java @@ -7,24 +7,26 @@ import java.util.*; public class LakeSoulSimpleSplitAssigner { - private final List splits; + private final List splits; - public LakeSoulSimpleSplitAssigner(Collection splits) { + public LakeSoulSimpleSplitAssigner(Collection splits) { this.splits = new LinkedList<>(splits); } + public LakeSoulSimpleSplitAssigner() { this.splits = new LinkedList<>(); } - public Optional getNext() { + public Optional getNext() { final int size = splits.size(); return size == 0 ? 
Optional.empty() : Optional.of(splits.remove(0)); } - public void addSplits(Collection newSplits) { + + public void addSplits(Collection newSplits) { splits.addAll(newSplits); } - public List remainingSplits() { + public List remainingSplits() { return splits; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java index 1423b6b95..ffea89fdd 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java @@ -26,46 +26,60 @@ import java.time.format.DateTimeFormatter; import java.util.*; -public class LakeSoulSource implements Source { - TableId tableId; +public class LakeSoulSource implements Source { + final TableId tableId; - RowType rowType; + final RowType projectedRowType; - RowType rowTypeWithPk; + final RowType rowTypeWithPk; - boolean isStreaming; + final boolean isBounded; - List pkColumns; + final List pkColumns; - Map optionParams; + final List partitionColumns; + + final Map optionParams; + + @Nullable + final List> remainingPartitions; @Nullable - List> remainingPartitions; + final Plan pushedFilter; + @Nullable - Plan filter; + final Plan partitionFilters; + private final RowType tableRowType; public LakeSoulSource(TableId tableId, - RowType rowType, + RowType tableRowType, + RowType projectedRowType, RowType rowTypeWithPk, - boolean isStreaming, + boolean isBounded, List pkColumns, + List partitionColumns, Map optionParams, @Nullable List> remainingPartitions, - @Nullable Plan filter) { + @Nullable Plan pushedFilter, + @Nullable Plan partitionFilters + ) { this.tableId = tableId; - this.rowType = rowType; + this.tableRowType = tableRowType; + this.projectedRowType = projectedRowType; this.rowTypeWithPk = rowTypeWithPk; - this.isStreaming = isStreaming; + this.isBounded = isBounded; this.pkColumns = pkColumns; + this.partitionColumns = partitionColumns; this.optionParams = optionParams; this.remainingPartitions = remainingPartitions; - this.filter = filter; + this.pushedFilter = pushedFilter; + this.partitionFilters = partitionFilters; } @Override public Boundedness getBoundedness() { - if (this.isStreaming) { + if (!this.isBounded) { return Boundedness.CONTINUOUS_UNBOUNDED; } else { return Boundedness.BOUNDED; @@ -73,50 +87,63 @@ public Boundedness getBoundedness() { } @Override - public SourceReader createReader(SourceReaderContext readerContext) throws Exception { + public SourceReader createReader(SourceReaderContext readerContext) throws Exception { Configuration conf = Configuration.fromMap(optionParams); conf.addAll(readerContext.getConfiguration()); return new LakeSoulSourceReader( - () -> new LakeSoulSplitReader(conf, - this.rowType, + () -> new LakeSoulSplitReader( + conf, + this.tableRowType, + this.projectedRowType, this.rowTypeWithPk, this.pkColumns, - this.isStreaming, + this.isBounded, this.optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""), - this.filter), + this.partitionColumns, + this.pushedFilter), new LakeSoulRecordEmitter(), readerContext.getConfiguration(), readerContext); } @Override - public SplitEnumerator createEnumerator( - SplitEnumeratorContext enumContext) { + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableId.table(), tableId.schema()); List readStartTimestampWithTimeZone = 
Arrays.asList(optionParams.getOrDefault(LakeSoulOptions.READ_START_TIME(), ""), optionParams.getOrDefault(LakeSoulOptions.TIME_ZONE(), "")); String readType = optionParams.getOrDefault(LakeSoulOptions.READ_TYPE(), ""); - if (this.isStreaming) { - String partDesc = optionParams.getOrDefault(LakeSoulOptions.PARTITION_DESC(), ""); - if (partDesc.isEmpty()) { - if (remainingPartitions != null && !remainingPartitions.isEmpty()) { - // use remaining partition - if (remainingPartitions.size() > 1) { - throw new RuntimeException("Streaming read allows only one specified partition," + - " or no specified partition to incrementally read entire table"); - } - partDesc = DBUtil.formatPartitionDesc(remainingPartitions.get(0)); - } - } - return new LakeSoulDynamicSplitEnumerator(enumContext, + if (getBoundedness().equals(Boundedness.CONTINUOUS_UNBOUNDED)) { + return new LakeSoulAllPartitionDynamicSplitEnumerator(enumContext, new LakeSoulDynSplitAssigner(optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")), + this.tableRowType, Long.parseLong(optionParams.getOrDefault(LakeSoulOptions.DISCOVERY_INTERVAL(), "30000")), convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone), tableInfo.getTableId(), - partDesc, - optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")); + optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1"), + partitionColumns, + partitionFilters); + +// String partDesc = optionParams.getOrDefault(LakeSoulOptions.PARTITION_DESC(), ""); +// if (partDesc.isEmpty()) { +// if (remainingPartitions != null && !remainingPartitions.isEmpty()) { +// // use remaining partition +// if (remainingPartitions.size() > 1) { +// throw new RuntimeException("Streaming read allows only one specified partition," + +// " or no specified partition to incrementally read entire table"); +// } +// partDesc = DBUtil.formatPartitionDesc(remainingPartitions.get(0)); +// } +// } +// return new LakeSoulDynamicSplitEnumerator(enumContext, +// new LakeSoulDynSplitAssigner(optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")), +// Long.parseLong(optionParams.getOrDefault(LakeSoulOptions.DISCOVERY_INTERVAL(), "30000")), +// convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone), +// tableInfo.getTableId(), +// partDesc, +// optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")); } else { return staticSplitEnumerator(enumContext, tableInfo, @@ -125,7 +152,7 @@ public SplitEnumerator createEnumerator( } } - private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorContext enumContext, + private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorContext enumContext, TableInfo tableInfo, List readStartTimestampWithTimeZone, String readType) { @@ -156,24 +183,26 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte } } int capacity = 100; - ArrayList splits = new ArrayList<>(capacity); + ArrayList splits = new ArrayList<>(capacity); if (!FlinkUtil.isExistHashPartition(tableInfo)) { for (DataFileInfo dataFileInfo : dataFileInfoList) { ArrayList tmp = new ArrayList<>(); tmp.add(new Path(dataFileInfo.path())); - splits.add(new LakeSoulSplit(String.valueOf(dataFileInfo.hashCode()), + splits.add(new LakeSoulPartitionSplit(String.valueOf(dataFileInfo.hashCode()), tmp, - 0)); + 0, + dataFileInfo.range_partitions())); } } else { Map>> splitByRangeAndHashPartition = - FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo.getTableId(), + FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, 
dataFileInfoList.toArray(new DataFileInfo[0])); for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { for (Map.Entry> split : entry.getValue().entrySet()) { - splits.add(new LakeSoulSplit(String.valueOf(split.hashCode()), + splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), - 0)); + 0, + entry.getKey())); } } } @@ -203,21 +232,26 @@ private long convertTimeFormatWithTimeZone(List readTimestampWithTimeZon } @Override - public SplitEnumerator restoreEnumerator( - SplitEnumeratorContext enumContext, + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, LakeSoulPendingSplits checkpoint) throws Exception { - return new LakeSoulDynamicSplitEnumerator(enumContext, + return new LakeSoulAllPartitionDynamicSplitEnumerator( + enumContext, new LakeSoulDynSplitAssigner(checkpoint.getSplits(), String.valueOf(checkpoint.getHashBucketNum())), + this.tableRowType, checkpoint.getDiscoverInterval(), checkpoint.getLastReadTimestamp(), - checkpoint.getTableid(), - checkpoint.getParDesc(), - String.valueOf(checkpoint.getHashBucketNum())); + checkpoint.getTableId(), +// checkpoint.getParDesc(), + String.valueOf(checkpoint.getHashBucketNum()), + this.partitionColumns, + this.partitionFilters + ); } @Override - public SimpleVersionedSerializer getSplitSerializer() { + public SimpleVersionedSerializer getSplitSerializer() { return new SimpleLakeSoulSerializer(); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSourceReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSourceReader.java index a272d7761..c56a1a226 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSourceReader.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSourceReader.java @@ -16,34 +16,34 @@ public class LakeSoulSourceReader extends SingleThreadMultiplexSourceReaderBase< - RowData, RowData, LakeSoulSplit, LakeSoulSplit> { + RowData, RowData, LakeSoulPartitionSplit, LakeSoulPartitionSplit> { - public LakeSoulSourceReader(Supplier> splitReaderSupplier, - RecordEmitter recordEmitter, + public LakeSoulSourceReader(Supplier> splitReaderSupplier, + RecordEmitter recordEmitter, Configuration config, SourceReaderContext context) { super(splitReaderSupplier, recordEmitter, config, context); } @Override - public void start(){ - if(getNumberOfCurrentlyAssignedSplits() == 0) { + public void start() { + if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } @Override - protected void onSplitFinished(Map finishedSplitIds) { + protected void onSplitFinished(Map finishedSplitIds) { context.sendSplitRequest(); } @Override - protected LakeSoulSplit initializedState(LakeSoulSplit split) { + protected LakeSoulPartitionSplit initializedState(LakeSoulPartitionSplit split) { return split; } @Override - protected LakeSoulSplit toSplitType(String splitId, LakeSoulSplit splitState) { + protected LakeSoulPartitionSplit toSplitType(String splitId, LakeSoulPartitionSplit splitState) { return splitState; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java index e2c33a7e1..e76ba3335 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java @@ -21,21 +21,22 @@ import 
java.util.Objects; import java.util.Queue; -public class LakeSoulSplitReader implements SplitReader { +public class LakeSoulSplitReader implements SplitReader { private static final Logger LOG = LoggerFactory.getLogger(LakeSoulSplitReader.class); private final Configuration conf; - private final Queue splits; + private final Queue splits; + private final List partitionColumns; + private final RowType tableRowType; + RowType projectedRowType; - RowType rowType; - - RowType rowTypeWithPk; + RowType projectedRowTypeWithPk; List pkColumns; - boolean isStreaming; + boolean isBounded; String cdcColumn; @@ -44,19 +45,23 @@ public class LakeSoulSplitReader implements SplitReader private LakeSoulOneSplitRecordsReader lastSplitReader; public LakeSoulSplitReader(Configuration conf, - RowType rowType, - RowType rowTypeWithPk, + RowType tableRowType, + RowType projectedRowType, + RowType projectedRowTypeWithPk, List pkColumns, - boolean isStreaming, + boolean isBounded, String cdcColumn, + List partitionColumns, Plan filter) { this.conf = conf; this.splits = new ArrayDeque<>(); - this.rowType = rowType; - this.rowTypeWithPk = rowTypeWithPk; + this.tableRowType = tableRowType; + this.projectedRowType = projectedRowType; + this.projectedRowTypeWithPk = projectedRowTypeWithPk; this.pkColumns = pkColumns; - this.isStreaming = isStreaming; + this.isBounded = isBounded; this.cdcColumn = cdcColumn; + this.partitionColumns = partitionColumns; this.filter = filter; } @@ -67,11 +72,13 @@ public RecordsWithSplitIds fetch() throws IOException { lastSplitReader = new LakeSoulOneSplitRecordsReader(this.conf, Objects.requireNonNull(splits.poll()), - this.rowType, - this.rowTypeWithPk, + this.tableRowType, + this.projectedRowType, + this.projectedRowTypeWithPk, this.pkColumns, - this.isStreaming, + this.isBounded, this.cdcColumn, + this.partitionColumns, this.filter ); return lastSplitReader; @@ -81,7 +88,7 @@ public RecordsWithSplitIds fetch() throws IOException { } @Override - public void handleSplitsChanges(SplitsChange splitChange) { + public void handleSplitsChanges(SplitsChange splitChange) { if (!(splitChange instanceof SplitsAddition)) { throw new UnsupportedOperationException( String.format("The SplitChange type of %s is not supported.", diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java index f96327e90..f5d18d019 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java @@ -14,22 +14,23 @@ import java.util.List; import java.util.Optional; -public class LakeSoulStaticSplitEnumerator implements SplitEnumerator { +public class LakeSoulStaticSplitEnumerator implements SplitEnumerator { private static final Logger LOG = LoggerFactory.getLogger(LakeSoulStaticSplitEnumerator.class); - private final SplitEnumeratorContext context; + private final SplitEnumeratorContext context; private final LakeSoulSimpleSplitAssigner splitAssigner; - public LakeSoulStaticSplitEnumerator(SplitEnumeratorContext context, + public LakeSoulStaticSplitEnumerator(SplitEnumeratorContext context, LakeSoulSimpleSplitAssigner splitAssigner) { this.context = context; this.splitAssigner = splitAssigner; } @Override - public void start() {} + public void start() { + } @Override public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname) { @@ -38,9 +39,9 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname return; } - final Optional nextSplit = splitAssigner.getNext(); + final Optional nextSplit = splitAssigner.getNext(); if (nextSplit.isPresent()) { - final LakeSoulSplit split = nextSplit.get(); + final LakeSoulPartitionSplit split = nextSplit.get(); context.assignSplit(split, subtaskId); LOG.info("Assigned split to subtask {} : {}", subtaskId, split); } else { @@ -50,13 +51,14 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List splits, int subtaskId) { + public void addSplitsBack(List splits, int subtaskId) { LOG.info("Add split back: {}", splits); splitAssigner.addSplits(splits); } @Override - public void addReader(int subtaskId) {} + public void addReader(int subtaskId) { + } @Override public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { @@ -65,5 +67,6 @@ public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { } @Override - public void close() throws IOException {} + public void close() throws IOException { + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulPendingSplitsSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulPendingSplitsSerializer.java index 42d7ea62e..8c87b3ea0 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulPendingSplitsSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulPendingSplitsSerializer.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; public class SimpleLakeSoulPendingSplitsSerializer implements SimpleVersionedSerializer { private static final ThreadLocal SERIALIZER_CACHE = @@ -26,10 +27,10 @@ public int getVersion() { @Override public byte[] serialize(LakeSoulPendingSplits splits) throws IOException { final DataOutputSerializer out = SERIALIZER_CACHE.get(); - List lsplits = splits.getSplits(); + List lsplits = splits.getSplits(); out.writeLong(splits.getLastReadTimestamp()); out.writeInt(lsplits.size()); - for (LakeSoulSplit split : lsplits) { + for (LakeSoulPartitionSplit split : lsplits) { out.writeUTF(split.splitId()); List paths = split.getFiles(); out.writeInt(paths.size()); @@ -38,8 +39,9 @@ public byte[] serialize(LakeSoulPendingSplits splits) throws IOException { } out.writeLong(split.getSkipRecord()); out.writeInt(split.getBucketId()); + out.writeUTF(split.getPartitionDesc()); } - out.writeUTF(splits.getTableid()); + out.writeUTF(splits.getTableId()); out.writeUTF(splits.getParDesc()); out.writeLong(splits.getDiscoverInterval()); out.writeInt(splits.getHashBucketNum()); @@ -54,7 +56,7 @@ public LakeSoulPendingSplits deserialize(int version, byte[] serialized) throws final DataInputDeserializer in = new DataInputDeserializer(serialized); final long startReadTime = in.readLong(); final int splitSize = in.readInt(); - final LakeSoulSplit[] lsplits = new LakeSoulSplit[splitSize]; + final LakeSoulPartitionSplit[] lsplits = new LakeSoulPartitionSplit[splitSize]; for (int j = 0; j < splitSize; j++) { final String id = in.readUTF(); final int size = in.readInt(); @@ -65,7 +67,8 @@ public LakeSoulPendingSplits deserialize(int version, byte[] serialized) throws } final long skipRecord = in.readLong(); final int bucketID = in.readInt(); - lsplits[j] = new 
LakeSoulSplit(id, Arrays.asList(paths), skipRecord, bucketID); + final String partitionDesc = in.readUTF(); + lsplits[j] = new LakeSoulPartitionSplit(id, Arrays.asList(paths), skipRecord, bucketID, partitionDesc); } final String tableid = in.readUTF(); final String parDesc = in.readUTF(); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulSerializer.java index a735fde69..4a1c5dc44 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/SimpleLakeSoulSerializer.java @@ -13,7 +13,7 @@ import java.util.Arrays; import java.util.List; -public class SimpleLakeSoulSerializer implements SimpleVersionedSerializer { +public class SimpleLakeSoulSerializer implements SimpleVersionedSerializer { private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); private static final int VERSION = 1; @@ -24,7 +24,7 @@ public int getVersion() { } @Override - public byte[] serialize(LakeSoulSplit split) throws IOException { + public byte[] serialize(LakeSoulPartitionSplit split) throws IOException { final DataOutputSerializer out = SERIALIZER_CACHE.get(); out.writeUTF(split.splitId()); @@ -35,6 +35,7 @@ public byte[] serialize(LakeSoulSplit split) throws IOException { } out.writeLong(split.getSkipRecord()); out.writeInt(split.getBucketId()); + out.writeUTF(split.getPartitionDesc()); final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -42,7 +43,7 @@ public byte[] serialize(LakeSoulSplit split) throws IOException { @Override - public LakeSoulSplit deserialize(int version, byte[] serialized) throws IOException { + public LakeSoulPartitionSplit deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -55,7 +56,8 @@ public LakeSoulSplit deserialize(int version, byte[] serialized) throws IOExcept } final long skipRecord = in.readLong(); final int bucketid = in.readInt(); - return new LakeSoulSplit(id, Arrays.asList(paths), skipRecord, bucketid); + final String partitionDesc = in.readUTF(); + return new LakeSoulPartitionSplit(id, Arrays.asList(paths), skipRecord, bucketid, partitionDesc); } throw new IOException("Unknown version: " + version); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitFlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitFlinkUtil.java index 0d788e0fe..30ed111de 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitFlinkUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitFlinkUtil.java @@ -3,52 +3,61 @@ import io.substrait.expression.Expression; import io.substrait.expression.ExpressionCreator; import io.substrait.extension.SimpleExtension; -import io.substrait.plan.Plan; import io.substrait.type.TypeCreator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; import java.util.ArrayList; import 
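Note on the serializer changes above: both split serializers now persist the split's partition description, so a restored reader or enumerator can recover partition values from the split itself. The following is a minimal, hedged round-trip sketch; the constructor and getters are taken from the hunks above, while the package paths and the use of Flink's Path for split files are assumptions.

    // Illustrative round-trip check for the extended split serializer (not part of the patch).
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.lakesoul.source.LakeSoulPartitionSplit;
    import org.apache.flink.lakesoul.source.SimpleLakeSoulSerializer;

    import java.util.Collections;

    public class SplitSerializerRoundTrip {
        public static void main(String[] args) throws Exception {
            SimpleLakeSoulSerializer serializer = new SimpleLakeSoulSerializer();
            LakeSoulPartitionSplit split = new LakeSoulPartitionSplit(
                    "split-0",
                    Collections.singletonList(new Path("file:///tmp/part-0.parquet")),
                    0L,                               // skipRecord
                    1,                                // bucketId
                    "region=UK,date=1995-10-10");     // partitionDesc carried by the new split type
            byte[] bytes = serializer.serialize(split);
            LakeSoulPartitionSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
            // The partition description must survive checkpoint serialization for dynamic
            // partition pushdown to keep working after restore.
            if (!split.getPartitionDesc().equals(restored.getPartitionDesc())) {
                throw new IllegalStateException("partitionDesc lost in serialization round trip");
            }
        }
    }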
java.util.List; import java.util.Set; -import static com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil.*; +import static com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil.and; public class SubstraitFlinkUtil { - public static Tuple2 flinkExprToSubStraitPlan( - List exprs, - List remaining, - String tableName + public static Tuple2 flinkExprToSubStraitExpr( + List filters ) { - List accepted = new ArrayList<>(); - Expression last = null; - for (ResolvedExpression expr : exprs) { - Expression e = doTransform(expr); - if (e == null) { - remaining.add(expr); + List pushed = new ArrayList<>(); + List remaining = new ArrayList<>(); + Expression combined = null; + for (ResolvedExpression fExpr : filters) { + Expression substraitExpr = toSubstraitExpr(fExpr); + if (substraitExpr == null) { + remaining.add(fExpr); } else { - accepted.add(expr); - if (last != null) { - SimpleExtension.FunctionAnchor fa = SimpleExtension.FunctionAnchor.of(BooleanNamespace, "and:bool"); - last = ExpressionCreator.scalarFunction(EXTENSIONS.getScalarFunction(fa), TypeCreator.NULLABLE.BOOLEAN, last, e); + pushed.add(fExpr); + if (combined != null) { + combined = and(combined, substraitExpr); } else { - last = e; + combined = substraitExpr; } } } - Plan filter = exprToFilter(last, tableName); - return Tuple2.of(SupportsFilterPushDown.Result.of(accepted, remaining), planToProto(filter)); + return Tuple2.of(SupportsFilterPushDown.Result.of(pushed, remaining), combined); } - public static Expression doTransform(ResolvedExpression flinkExpression) { + public static Expression toSubstraitExpr(ResolvedExpression flinkExpression) { SubstraitVisitor substraitVisitor = new SubstraitVisitor(); return flinkExpression.accept(substraitVisitor); } + public static boolean filterAllPartitionColumn(ResolvedExpression expression, Set partitionCols) { + if (expression instanceof FieldReferenceExpression) { + return partitionCols.contains(((FieldReferenceExpression) expression).getName()); + } else if (expression instanceof CallExpression) { + return expression.getResolvedChildren().stream().allMatch(child -> filterAllPartitionColumn(child, partitionCols)); + + } else if (expression instanceof ValueLiteralExpression) { + return true; + } + return false; + } + public static boolean filterContainsPartitionColumn(ResolvedExpression expression, Set partitionCols) { if (expression instanceof FieldReferenceExpression) { return partitionCols.contains(((FieldReferenceExpression) expression).getName()); @@ -58,6 +67,8 @@ public static boolean filterContainsPartitionColumn(ResolvedExpression expressio return true; } } + } else if (expression instanceof ValueLiteralExpression) { + return false; } return false; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitVisitor.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitVisitor.java index 8e1db804f..78c57ab07 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitVisitor.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/substrait/SubstraitVisitor.java @@ -223,7 +223,7 @@ public FieldReference visit(FieldReferenceExpression fieldReference) { fieldReference = (FieldReferenceExpression) fieldReference.getChildren().get(0); } LogicalType logicalType = fieldReference.getOutputDataType().getLogicalType(); - Type type = mapType(logicalType); + Type type = logicalTypeToSubstraitType(logicalType); if (type == null) { // not supported return null; @@ -242,7 +242,7 @@ protected 
FieldReference defaultMethod(org.apache.flink.table.expressions.Expres return null; } - public static Type mapType(LogicalType logicalType) { + public static Type logicalTypeToSubstraitType(LogicalType logicalType) { LogicalTypeRoot typeRoot = logicalType.getTypeRoot(); boolean nullable = logicalType.isNullable(); TypeCreator R = TypeCreator.of(nullable); @@ -290,9 +290,10 @@ public static Type mapType(LogicalType logicalType) { return R.TIMESTAMP_TZ; } default: - LOG.info("unsupported type"); + String msg = String.format("Unsupported LogicalType %s for LogicalTypeToSubstraitType", typeRoot); + LOG.info(msg); + throw new RuntimeException(msg); } - return null; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java index d1cdccf58..a1af3dae0 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java @@ -4,6 +4,8 @@ package org.apache.flink.lakesoul.table; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -22,9 +24,12 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.runtime.arrow.ArrowUtils; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import java.util.*; +import java.util.stream.Collectors; import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CATALOG_PATH; import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FACTORY_IDENTIFIER; @@ -85,29 +90,32 @@ public DynamicTableSource createDynamicTableSource(Context context) { ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); ResolvedCatalogTable catalogTable = context.getCatalogTable(); TableSchema schema = catalogTable.getSchema(); + RowType tableRowType = (RowType) catalogTable.getResolvedSchema().toSourceRowDataType().notNull().getLogicalType(); List pkColumns; if (schema.getPrimaryKey().isPresent()) { pkColumns = schema.getPrimaryKey().get().getColumns(); } else { pkColumns = new ArrayList<>(); } - catalogTable.getPartitionKeys(); - boolean isStreaming = true; + List partitionColumns = catalogTable.getPartitionKeys(); + + boolean isBounded = false; final RuntimeExecutionMode mode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE); if (mode == RuntimeExecutionMode.AUTOMATIC) { throw new RuntimeException( String.format("Runtime execution mode '%s' is not supported yet.", mode)); } else { if (mode == RuntimeExecutionMode.BATCH) { - isStreaming = false; + isBounded = true; } } return new LakeSoulLookupTableSource( new TableId(io.debezium.relational.TableId.parse(objectIdentifier.asSummaryString())), - (RowType) catalogTable.getResolvedSchema().toSourceRowDataType().notNull().getLogicalType(), - isStreaming, + tableRowType, + isBounded, pkColumns, + partitionColumns, catalogTable, options.toMap() ); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java 
b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java index ae5ae110e..7440b0610 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java @@ -2,6 +2,7 @@ import com.dmetasoul.lakesoul.meta.entity.JniWrapper; import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import io.substrait.proto.Plan; import org.apache.flink.table.connector.RowLevelModificationScanContext; import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; @@ -11,15 +12,21 @@ public class LakeSoulRowLevelModificationScanContext implements RowLevelModificationScanContext { - private final JniWrapper sourcePartitionInfo; + private JniWrapper sourcePartitionInfo; private final SupportsRowLevelModificationScan.RowLevelModificationType type; - private List> remainingPartitions; + Plan partitionFilters; + + Plan nonPartitionFilters; + public LakeSoulRowLevelModificationScanContext(SupportsRowLevelModificationScan.RowLevelModificationType type, List listPartitionInfo) { this.type = type; sourcePartitionInfo = JniWrapper.newBuilder().addAllPartitionInfo(listPartitionInfo).build(); - remainingPartitions = null; + } + + public void setSourcePartitionInfo(JniWrapper sourcePartitionInfo) { + this.sourcePartitionInfo = sourcePartitionInfo; } public JniWrapper getSourcePartitionInfo() { @@ -34,11 +41,37 @@ public SupportsRowLevelModificationScan.RowLevelModificationType getType() { return type; } - public void setRemainingPartitions(List> remainingPartitions) { - this.remainingPartitions = remainingPartitions; + public void setNonPartitionFilters(Plan nonPartitionFilters) { + this.nonPartitionFilters = nonPartitionFilters; + } + + public void setPartitionFilters(Plan partitionFilters) { + this.partitionFilters = partitionFilters; + } + + public Plan getNonPartitionFilters() { + return nonPartitionFilters; + } + + public Plan getPartitionFilters() { + return partitionFilters; + } + + public boolean isDelete() { + return type == SupportsRowLevelModificationScan.RowLevelModificationType.DELETE; + } + + public boolean isUpdate() { + return type == SupportsRowLevelModificationScan.RowLevelModificationType.UPDATE; } - public List> getRemainingPartitions() { - return remainingPartitions; + @Override + public String toString() { + return "LakeSoulRowLevelModificationScanContext{" + + "sourcePartitionInfo=" + sourcePartitionInfo + + ", type=" + type + + ", partitionFilters=" + partitionFilters + + ", nonPartitionFilters=" + nonPartitionFilters + + '}'; } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java index 514fbadd7..28f8f27a3 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java @@ -28,10 +28,13 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; @@ -44,6 +47,8 @@ public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite, SupportsRowLevelDelete, SupportsRowLevelUpdate { + private static final Logger LOG = LoggerFactory.getLogger(LakeSoulTableSink.class); + private final String summaryName; private final String tableName; private final DataType dataType; @@ -54,6 +59,10 @@ public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning private boolean overwrite; private LakeSoulRowLevelModificationScanContext modificationContext; + public LakeSoulRowLevelModificationScanContext getModificationContext() { + return modificationContext; + } + public LakeSoulTableSink(String summaryName, String tableName, DataType dataType, List primaryKeyList, List partitionKeyList, ReadableConfig flinkConf, ResolvedSchema schema) { this.summaryName = summaryName; @@ -114,7 +123,7 @@ private DataStreamSink createStreamingSink(DataStream dataStream, Co throws IOException { if (modificationContext != null) { - if (modificationContext.getRemainingPartitions() != null) { + if (modificationContext.getPartitionFilters() != null) { flinkConf.set(DML_TYPE, PARTITION_DELETE); } } @@ -154,7 +163,7 @@ public DynamicTableSink copy() { @Override public String asSummaryString() { - return "lakeSoul table sink"; + return "LakeSoul Table Sink"; } @Override @@ -166,6 +175,16 @@ public void applyOverwrite(boolean newOverwrite) { public void applyStaticPartition(Map map) { } + private boolean isDelete() { + LakeSoulRowLevelModificationScanContext context = getModificationContext(); + return context != null && context.isDelete(); + } + + private boolean isUpdate() { + LakeSoulRowLevelModificationScanContext context = getModificationContext(); + return context != null && context.isUpdate(); + } + @Override public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java index d640e44d2..12afc3922 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java @@ -7,9 +7,13 @@ import com.dmetasoul.lakesoul.meta.DBConfig; import com.dmetasoul.lakesoul.meta.DBManager; import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.JniWrapper; import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import io.substrait.expression.Expression; import io.substrait.proto.Plan; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.lakesoul.source.LakeSoulSource; import org.apache.flink.lakesoul.substrait.SubstraitFlinkUtil; @@ -21,10 +25,10 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.runtime.arrow.ArrowUtils; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -37,8 +41,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil.*; + public class LakeSoulTableSource - implements SupportsFilterPushDown, SupportsPartitionPushDown, SupportsProjectionPushDown, ScanTableSource, + implements SupportsFilterPushDown, SupportsProjectionPushDown, ScanTableSource, SupportsRowLevelModificationScan { private static final Logger LOG = LoggerFactory.getLogger(LakeSoulTableSource.class); @@ -48,45 +54,52 @@ public class LakeSoulTableSource protected TableId tableId; - protected RowType rowType; + protected RowType tableRowType; - protected boolean isStreaming; + protected boolean isBounded; protected List pkColumns; + protected List partitionColumns; + protected int[][] projectedFields; protected Map optionParams; protected List> remainingPartitions; - protected Plan filter; + protected Plan pushedFilters; protected LakeSoulRowLevelModificationScanContext modificationContext; + protected Plan partitionFilters; + public LakeSoulTableSource(TableId tableId, RowType rowType, boolean isStreaming, List pkColumns, + List partitionColumns, Map optionParams) { this.tableId = tableId; - this.rowType = rowType; - this.isStreaming = isStreaming; + this.tableRowType = rowType; + this.isBounded = isStreaming; this.pkColumns = pkColumns; + this.partitionColumns = partitionColumns; this.optionParams = optionParams; this.modificationContext = null; } @Override public DynamicTableSource copy() { - LakeSoulTableSource lsts = new LakeSoulTableSource(this.tableId, - this.rowType, - this.isStreaming, + LakeSoulTableSource newInstance = new LakeSoulTableSource(this.tableId, + this.tableRowType, + this.isBounded, this.pkColumns, + this.partitionColumns, this.optionParams); - lsts.projectedFields = this.projectedFields; - lsts.remainingPartitions = this.remainingPartitions; - lsts.filter = this.filter; - return lsts; + newInstance.projectedFields = this.projectedFields; + newInstance.remainingPartitions = this.remainingPartitions; + newInstance.pushedFilters = this.pushedFilters; + return newInstance; } @Override @@ -98,57 +111,98 @@ public String asSummaryString() { public Result applyFilters(List filters) { // first we filter out partition filter conditions LOG.info("Applying filters to native io: {}", filters); - List remainingFilters = new ArrayList<>(); + List completePartitionFilters = new ArrayList<>(); + List partialPartitionFilters = new ArrayList<>(); List nonPartitionFilters = new ArrayList<>(); DBManager dbManager = new DBManager(); TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableId.table(), tableId.schema()); + List allPartitionInfo = dbManager.getAllPartitionInfo(tableInfo.getTableId()); + DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); Set partitionCols = new HashSet<>(partitionKeys.rangeKeys); for (ResolvedExpression filter : filters) { - if (SubstraitFlinkUtil.filterContainsPartitionColumn(filter, partitionCols)) { - remainingFilters.add(filter); + if 
(SubstraitFlinkUtil.filterAllPartitionColumn(filter, partitionCols)) { + completePartitionFilters.add(filter); +// } else if (SubstraitFlinkUtil.filterContainsPartitionColumn(filter, partitionCols)) { +// partialPartitionFilters.add(filter); } else { nonPartitionFilters.add(filter); } } + LOG.info("completePartitionFilters: {}", completePartitionFilters); + LOG.info("partialPartitionFilters: {}", partialPartitionFilters); + LOG.info("nonPartitionFilters: {}", nonPartitionFilters); + + // find acceptable non partition filters - Tuple2 filterPushDownResult = SubstraitFlinkUtil.flinkExprToSubStraitPlan(nonPartitionFilters, - remainingFilters, tableInfo.getTableName()); - this.filter = filterPushDownResult.f1; - LOG.info("Applied filters to native io: {}, accepted {}, remaining {}", this.filter, - filterPushDownResult.f0.getAcceptedFilters(), - filterPushDownResult.f0.getRemainingFilters()); - LOG.info("FilterPlan: {}", this.filter); - return filterPushDownResult.f0; - } + Tuple2 pushDownResultAndSubstraitExpr = SubstraitFlinkUtil.flinkExprToSubStraitExpr( + nonPartitionFilters + ); - @Override - public Optional>> listPartitions() { - List allPartitionInfo = listPartitionInfo(); - List> partitions = new ArrayList<>(); - for (PartitionInfo info : allPartitionInfo) { - if (!info.getPartitionDesc().equals(DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC)) { - partitions.add(DBUtil.parsePartitionDesc(info.getPartitionDesc())); + LOG.info("Applied filters to native io: {}, accepted {}, remaining {}", this.pushedFilters, + pushDownResultAndSubstraitExpr.f0.getAcceptedFilters(), + pushDownResultAndSubstraitExpr.f0.getRemainingFilters()); + + this.pushedFilters = substraitExprToProto(pushDownResultAndSubstraitExpr.f1, tableInfo.getTableName()); + setModificationContextNonPartitionFilter(this.pushedFilters); + + if (!completePartitionFilters.isEmpty()) { + + Tuple2 substraitPartitionExpr = SubstraitFlinkUtil.flinkExprToSubStraitExpr( + completePartitionFilters + ); + Expression partitionFilter = substraitPartitionExpr.f1; + if (isDelete()) { + partitionFilter = not(partitionFilter); } - } - return Optional.of(partitions); - } + this.partitionFilters = substraitExprToProto(partitionFilter, tableInfo.getTableName()); + Schema tableSchema = ArrowUtils.toArrowSchema(tableRowType); + List partitionFields = partitionColumns.stream().map(tableSchema::findField).collect(Collectors.toList()); + + Schema partitionSchema = new Schema(partitionFields); + List remainingPartitionInfo = applyPartitionFilters(allPartitionInfo, partitionSchema, this.partitionFilters); + remainingPartitions = partitionInfoToPartitionMap(remainingPartitionInfo); + + setModificationContextSourcePartitions(JniWrapper.newBuilder().addAllPartitionInfo(remainingPartitionInfo).build()); + setModificationContextPartitionFilter(this.partitionFilters); + - @Override - public void applyPartitions(List> remainingPartitions) { - if (isDelete()) { - this.remainingPartitions = complementPartition(remainingPartitions); - getModificationContext().setRemainingPartitions(this.remainingPartitions); - } else { - this.remainingPartitions = remainingPartitions; } - LOG.info("Applied partitions to native io: {}", this.remainingPartitions); + + return pushDownResultAndSubstraitExpr.f0; } private boolean isDelete() { LakeSoulRowLevelModificationScanContext context = getModificationContext(); - return context != null && context.getType() == RowLevelModificationType.DELETE; + return context != null && context.isDelete(); + } + + private void 
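To make the filter routing in applyFilters concrete: a predicate participates in partition pruning only if every leaf it references is a range-partition column or a literal; everything else is handed to the native reader as a non-partition filter (and, for DELETE, the pruning predicate is additionally wrapped in not(), as in the hunk above). The sketch below is a self-contained stand-in using a toy expression model, not Flink's ResolvedExpression classes; it only illustrates the classification rule implemented by filterAllPartitionColumn.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class FilterRoutingSketch {
        interface Node {}
        static class Column implements Node { final String name; Column(String n) { name = n; } }
        static class Literal implements Node {}
        static class Call implements Node { final List<Node> children; Call(Node... c) { children = Arrays.asList(c); } }

        // Mirrors the rule: every referenced column must be a partition column.
        static boolean onlyPartitionColumns(Node n, Set<String> partitionCols) {
            if (n instanceof Column) return partitionCols.contains(((Column) n).name);
            if (n instanceof Literal) return true;
            return ((Call) n).children.stream().allMatch(c -> onlyPartitionColumns(c, partitionCols));
        }

        public static void main(String[] args) {
            Set<String> partitionCols = new HashSet<>(Arrays.asList("region", "date"));
            Node regionEqUk = new Call(new Column("region"), new Literal());     // region = 'UK'
            Node scoreGt80  = new Call(new Column("score"), new Literal());      // score > 80
            Node orOfBoth   = new Call(regionEqUk, scoreGt80);                   // region = 'UK' OR score > 80

            System.out.println(onlyPartitionColumns(regionEqUk, partitionCols)); // true  -> partition pruning
            System.out.println(onlyPartitionColumns(scoreGt80, partitionCols));  // false -> native non-partition filter
            System.out.println(onlyPartitionColumns(orOfBoth, partitionCols));   // false -> native filter, no pruning
        }
    }

Because Flink splits top-level conjunctions before calling applyFilters, a query like `region = 'UK' AND score > 80` normally arrives as two separate filters, so the region predicate can still be pruned while the score predicate is pushed to the native reader.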
setModificationContextPartitionFilter(Plan filter) { + if (modificationContext != null) { + modificationContext.setPartitionFilters(filter); + } + } + + private void setModificationContextNonPartitionFilter(Plan filter) { + if (modificationContext != null) { + modificationContext.setNonPartitionFilters(filter); + } + } + + private void setModificationContextSourcePartitions(JniWrapper sourcePartitions) { + if (modificationContext != null) { + modificationContext.setSourcePartitionInfo(sourcePartitions); + } + } + + private List> partitionInfoToPartitionMap(List partitionInfoList) { + List> partitions = new ArrayList<>(); + for (PartitionInfo info : partitionInfoList) { + String partitionDesc = info.getPartitionDesc(); + partitions.add(DBUtil.parsePartitionDesc(partitionDesc)); + } + return partitions; } private List> complementPartition(List> remainingPartitions) { @@ -183,28 +237,28 @@ private List listPartitionInfo() { private int[] getFieldIndexs() { return (projectedFields == null || projectedFields.length == 0) ? - IntStream.range(0, this.rowType.getFieldCount()).toArray() : + IntStream.range(0, this.tableRowType.getFieldCount()).toArray() : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray(); } protected RowType readFields() { int[] fieldIndexs = getFieldIndexs(); - return RowType.of(Arrays.stream(fieldIndexs).mapToObj(this.rowType::getTypeAt).toArray(LogicalType[]::new), - Arrays.stream(fieldIndexs).mapToObj(this.rowType.getFieldNames()::get).toArray(String[]::new)); + return RowType.of(Arrays.stream(fieldIndexs).mapToObj(this.tableRowType::getTypeAt).toArray(LogicalType[]::new), + Arrays.stream(fieldIndexs).mapToObj(this.tableRowType.getFieldNames()::get).toArray(String[]::new)); } private RowType readFieldsAddPk(String cdcColumn) { int[] fieldIndexs = getFieldIndexs(); List projectTypes = - Arrays.stream(fieldIndexs).mapToObj(this.rowType::getTypeAt).collect(Collectors.toList()); + Arrays.stream(fieldIndexs).mapToObj(this.tableRowType::getTypeAt).collect(Collectors.toList()); List projectNames = - Arrays.stream(fieldIndexs).mapToObj(this.rowType.getFieldNames()::get).collect(Collectors.toList()); + Arrays.stream(fieldIndexs).mapToObj(this.tableRowType.getFieldNames()::get).collect(Collectors.toList()); List pkNamesNotExistInReadFields = new ArrayList<>(); List pkTypesNotExistInReadFields = new ArrayList<>(); for (String pk : pkColumns) { if (!projectNames.contains(pk)) { pkNamesNotExistInReadFields.add(pk); - pkTypesNotExistInReadFields.add(this.rowType.getTypeAt(rowType.getFieldIndex(pk))); + pkTypesNotExistInReadFields.add(this.tableRowType.getTypeAt(tableRowType.getFieldIndex(pk))); } } projectNames.addAll(pkNamesNotExistInReadFields); @@ -220,9 +274,9 @@ private RowType readFieldsAddPk(String cdcColumn) { @Override public ChangelogMode getChangelogMode() { boolean isCdc = !optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, "").isEmpty(); - if (this.isStreaming && isCdc) { + if (!this.isBounded && isCdc) { return ChangelogMode.upsert(); - } else if (this.isStreaming && !this.pkColumns.isEmpty()) { + } else if (!this.isBounded && !this.pkColumns.isEmpty()) { return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.UPDATE_AFTER) @@ -234,31 +288,39 @@ public ChangelogMode getChangelogMode() { } @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { String cdcColumn = 
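The remaining partitions handed to the source are plain column-to-value maps derived from each PartitionInfo's partition descriptor via partitionInfoToPartitionMap above. A minimal sketch of that conversion follows; it reuses DBUtil.parsePartitionDesc from the hunk, while the sample descriptor format (comma-separated col=value pairs) is an assumption for illustration only.

    import com.dmetasoul.lakesoul.meta.DBUtil;
    import java.util.Map;

    public class PartitionDescSketch {
        public static void main(String[] args) {
            // Hypothetical descriptor; the real value comes from PartitionInfo.getPartitionDesc().
            String partitionDesc = "region=UK,date=1995-10-10";
            Map<String, String> partitionValues = DBUtil.parsePartitionDesc(partitionDesc);
            partitionValues.forEach((col, value) -> System.out.println(col + " -> " + value));
        }
    }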
optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""); + return SourceProvider.of( new LakeSoulSource(this.tableId, + this.tableRowType, readFields(), readFieldsAddPk(cdcColumn), - this.isStreaming, + this.isBounded, this.pkColumns, + this.partitionColumns, this.optionParams, this.remainingPartitions, - this.filter)); + this.pushedFilters, + this.partitionFilters + )); } @Override public String toString() { return "LakeSoulTableSource{" + "tableId=" + tableId + - ", rowType=" + rowType + - ", isStreaming=" + isStreaming + + ", tableRowType=" + tableRowType + + ", isBounded=" + isBounded + ", pkColumns=" + pkColumns + + ", partitionColumns=" + partitionColumns + ", projectedFields=" + Arrays.toString(projectedFields) + ", optionParams=" + optionParams + ", remainingPartitions=" + remainingPartitions + - ", filter=" + filter + + ", pushedFilters=" + pushedFilters + + ", modificationContext=" + modificationContext + + ", partitionFilters=" + partitionFilters + '}'; } @@ -268,8 +330,8 @@ public RowLevelModificationScanContext applyRowLevelModificationScan( @Nullable RowLevelModificationScanContext previousContext) { if (previousContext == null || previousContext instanceof LakeSoulRowLevelModificationScanContext) { - // TODO: 2024/3/22 partiontion pruning should be handled this.modificationContext = new LakeSoulRowLevelModificationScanContext(rowLevelModificationType, listPartitionInfo()); + return modificationContext; } throw new RuntimeException("LakeSoulTableSource.applyRowLevelModificationScan only supports LakeSoulRowLevelModificationScanContext"); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java index 36129058c..9cc4e9718 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java @@ -456,19 +456,18 @@ public static DataFileInfo[] getTargetDataFileInfo(TableInfo tif, List>> splitDataInfosToRangeAndHashPartition(String tid, - DataFileInfo[] dfinfos) { + public static Map>> splitDataInfosToRangeAndHashPartition(TableInfo tableInfo, + DataFileInfo[] dataFileInfoArray) { Map>> splitByRangeAndHashPartition = new LinkedHashMap<>(); - TableInfo tif = DataOperation.dbManager().getTableInfoByTableId(tid); - for (DataFileInfo pif : dfinfos) { - if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) - .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>()) - .add(new Path(pif.path())); + for (DataFileInfo dataFileInfo : dataFileInfoArray) { + if (isExistHashPartition(tableInfo) && dataFileInfo.file_bucket_id() != -1) { + splitByRangeAndHashPartition.computeIfAbsent(dataFileInfo.range_partitions(), k -> new LinkedHashMap<>()) + .computeIfAbsent(dataFileInfo.file_bucket_id(), v -> new ArrayList<>()) + .add(new Path(dataFileInfo.path())); } else { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) + splitByRangeAndHashPartition.computeIfAbsent(dataFileInfo.range_partitions(), k -> new LinkedHashMap<>()) .computeIfAbsent(-1, v -> new ArrayList<>()) - .add(new Path(pif.path())); + .add(new Path(dataFileInfo.path())); } } return splitByRangeAndHashPartition; diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java 
b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java index cc4b614d2..0e8c6d0e8 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java @@ -231,11 +231,11 @@ public void testLakeSoulTableSinkDeleteWithParallelismInBatch() { "\n" + "== Optimized Physical Plan ==\n" + "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col, part])\n" + - "+- TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\n" + + "+- TableSourceScan(table=[[lakesoul, db1, test_table, filter=[]]], fields=[id, real_col, part])\n" + "\n" + "== Optimized Execution Plan ==\n" + "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col, part])\n" + - "+- TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\n" + + "+- TableSourceScan(table=[[lakesoul, db1, test_table, filter=[]]], fields=[id, real_col, part])\n" + "\n" + "== Physical Execution Plan ==\n" + "{\n" + @@ -243,7 +243,7 @@ public void testLakeSoulTableSinkDeleteWithParallelismInBatch() { " \"id\" : ,\n" + " \"type\" : \"Source: test_table[]\",\n" + " \"pact\" : \"Data Source\",\n" + - " \"contents\" : \"[]:TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\",\n" + + " \"contents\" : \"[]:TableSourceScan(table=[[lakesoul, db1, test_table, filter=[]]], fields=[id, real_col, part])\",\n" + " \"parallelism\" : 2\n" + " }, {\n" + " \"id\" : ,\n" + diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java index 0ddac6aa8..fef137379 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java @@ -236,6 +236,42 @@ public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedE TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"}); } + @Test + public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException { + TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE); + createLakeSoulSourceTableUserWithRange(tEnv); + tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await(); + try { + // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called + tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and score > 60").await(); + } catch (Throwable e) { + System.out.println("Unsupported DELETE SQL"); + } + StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE); + String testSelect = "select * from user_info_1"; + TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); + List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"}); + } + + @Test + public void testDeletePartitionedCdcTable() throws ExecutionException, InterruptedException { + TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE); + createLakeSoulPartitionedCDCSourceTableUser(tEnv); + tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 
80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await(); + try { + // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called + tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await(); + } catch (Throwable e) { + System.out.println("Unsupported DELETE SQL"); + } + StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE); + String testSelect = "select * from user_info_1"; + TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); + List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"}); + } + private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { String createUserSql = "create table user_info_2 (" + " order_id INT," + @@ -294,4 +330,20 @@ private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws Exe tEnvs.executeSql(createUserSql); } + private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { + String createUserSql = "create table user_info (" + + " order_id INT," + + " name STRING PRIMARY KEY NOT ENFORCED," + + " score DECIMAL" + + ") PARTITIONED BY ( order_id )" + + " WITH (" + + " 'format'='lakesoul'," + + " 'hashBucketNum'='2'," + + " 'use_cdc'='true'," + + " 'path'='" + getTempDirUri("/lakeSource/user") + + "' )"; + tEnvs.executeSql("DROP TABLE if exists user_info"); + tEnvs.executeSql(createUserSql); + } + } diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/StreamReadSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/StreamReadSuite.java index d25cbdfc6..a3d55b6bb 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/StreamReadSuite.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/StreamReadSuite.java @@ -83,7 +83,7 @@ public void testLakesoulSourceIncrementalStream() { } @Test - public void testLakesoulSourceSelectMultiRangeAndHash() { + public void testLakesoulSourceSelectMultiRangeAndHashConditionAnd() { for (Tuple2 tup : BUCKET_NUM_AND_PARALLELISM) { int hashBucketNum = tup.f0; int parallelism = tup.f1; @@ -145,7 +145,78 @@ public void testLakesoulSourceSelectMultiRangeAndHash() { } }, - "[+I[1, Bob, 90, 1995-10-01, UK], +I[3, Amy, 95, 1995-10-10, UK]]", + "[+I[3, Amy, 95, 1995-10-10, UK]]", + 20L + ); + } + } + + @Test + public void testLakesoulSourceSelectMultiRangeAndHashConditionOr() { + for (Tuple2 tup : BUCKET_NUM_AND_PARALLELISM) { + int hashBucketNum = tup.f0; + int parallelism = tup.f1; + System.out.println( + "testLakesoulSourceSelectMultiRangeAndHash with hashBucketNum=" + hashBucketNum + ", parallelism=" + + parallelism); + TableEnvironment createTableEnv = LakeSoulTestUtils.createTableEnvInBatchMode(); + LakeSoulCatalog lakeSoulCatalog = LakeSoulTestUtils.createLakeSoulCatalog(true); + LakeSoulTestUtils.registerLakeSoulCatalog(createTableEnv, lakeSoulCatalog); + + String createSql = "create table user_multi (" + + " `id` INT," + + " name STRING," + + " score INT," + + " `date` DATE," + + " region STRING," + + "PRIMARY KEY (`id`,`name`) NOT ENFORCED" + + ") " + + "PARTITIONED BY (`region`,`date`)" + + "WITH (" + + " 'connector'='lakesoul'," + + String.format(" 'hashBucketNum'='%d',", hashBucketNum) + + " 'path'='" + getTempDirUri("/lakeSource/multi_range_hash") + + 
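The new DMLSuite cases above exercise this path end to end. As a hedged usage sketch (catalog registration and the table are assumed to exist, mirroring the test setup), a DELETE whose predicate references only range-partition columns is expected to be handled as a partition-level delete in the sink rather than a row-by-row rewrite:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PartitionDeleteExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // 'user_info_1' is partitioned by order_id in the tests above; a predicate that only
            // references partition columns lets the planner push the whole filter into pruning.
            tEnv.executeSql("DELETE FROM user_info_1 WHERE order_id = 3").await();
        }
    }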
"' )"; + createTableEnv.executeSql("DROP TABLE if exists user_multi"); + createTableEnv.executeSql(createSql); + + String testMultiRangeSelect = "select id, name from user_multi" + + " /*+ OPTIONS('discoveryinterval'='1000')*/ " + + "where (`region`='UK' ) or score > 80"; + + + StreamTableEnvironment tEnvs = LakeSoulTestUtils.createTableEnvInStreamingMode( + LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 1000L, 1000L), parallelism); + LakeSoulTestUtils.registerLakeSoulCatalog(tEnvs, lakeSoulCatalog); + LakeSoulTestUtils.checkStreamingQueryAnswer( + tEnvs, + "user_multi", + testMultiRangeSelect, + " `id` INT," + + " name STRING", + +// " name STRING," + +// " score INT," + +// " `date` DATE," + +// " region STRING," + +// "PRIMARY KEY (`id`,`name`) NOT ENFORCED", + (s) -> { + try { + createTableEnv.executeSql( + "INSERT INTO user_multi VALUES (1, 'Bob', 90, TO_DATE('1995-10-01'), 'China'), (2, 'Alice', 80, TO_DATE('1995-10-10'), 'China')") + .await(); + createTableEnv.executeSql( + "INSERT INTO user_multi VALUES (3, 'Jack', 75, TO_DATE('1995-10-15'), 'China')") + .await(); + createTableEnv.executeSql( + "INSERT INTO user_multi VALUES (3, 'Amy', 95, TO_DATE('1995-10-10'),'UK'), (4, 'Mike', 70, TO_DATE('1995-10-15'), 'UK')") + .await(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + + }, + "[+I[1, Bob], +I[3, Amy], +I[4, Mike]]", 20L ); } diff --git a/native-io/lakesoul-io-java/pom.xml b/native-io/lakesoul-io-java/pom.xml index 8bd0f8bd5..dbcb7221d 100644 --- a/native-io/lakesoul-io-java/pom.xml +++ b/native-io/lakesoul-io-java/pom.xml @@ -29,10 +29,16 @@ SPDX-License-Identifier: Apache-2.0 3.1.0 0.28.0 3.22.0 + 0.6.1 + + com.dmetasoul + lakesoul-common + ${revision} + org.apache.arrow arrow-vector diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java index f2005251b..ecd958397 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java @@ -65,6 +65,14 @@ public NativeIOBase(String allocatorName) { libLakeSoulIO.rust_logger_init(); } + public ObjectReferenceManager getIntReferenceManager() { + return intReferenceManager; + } + + public ObjectReferenceManager getBoolReferenceManager() { + return boolReferenceManager; + } + public void setExternalAllocator(BufferAllocator allocator) { this.allocator = allocator; } @@ -104,6 +112,16 @@ public void setSchema(Schema schema) { ffiSchema.close(); } + public void setPartitionSchema(Schema schema) { + assert ioConfigBuilder != null; + ArrowSchema ffiSchema = ArrowSchema.allocateNew(allocator); + CDataDictionaryProvider tmpProvider = new CDataDictionaryProvider(); + Data.exportSchema(allocator, schema, tmpProvider, ffiSchema); + ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_partition_schema(ioConfigBuilder, ffiSchema.memoryAddress()); + tmpProvider.close(); + ffiSchema.close(); + } + public void setThreadNum(int threadNum) { assert ioConfigBuilder != null; ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_thread_num(ioConfigBuilder, threadNum); diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java index 
60c2feba7..c3b1b6a39 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java @@ -45,6 +45,8 @@ public interface LibLakeSoulIO { Pointer lakesoul_config_builder_set_schema(Pointer builder, @LongLong long schemaAddr); + Pointer lakesoul_config_builder_set_partition_schema(Pointer builder, @LongLong long schemaAddr); + Pointer lakesoul_config_builder_set_object_store_option(Pointer builder, String key, String value); Pointer lakesoul_config_builder_set_thread_num(Pointer builder, int thread_num); @@ -99,8 +101,12 @@ interface IntegerCallback { // type representing callback void free_tokio_runtime(Pointer runtime); + Pointer apply_partition_filter(IntegerCallback callback, int pbLen, long jniWrapperAddr, long schemaAddr, int filterLen, long filterAddr); + void export_bytes_result(BooleanCallback booleanCallback, Pointer bytes, Integer len, @LongLong long addr); + void free_bytes_result(Pointer bytes); + void rust_logger_init(); } diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java index 9a7a80495..dbf2718b7 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java @@ -1,18 +1,41 @@ package com.dmetasoul.lakesoul.lakesoul.io.substrait; +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOBase; +import com.dmetasoul.lakesoul.lakesoul.io.jnr.JnrLoader; +import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO; +import com.dmetasoul.lakesoul.lakesoul.memory.ArrowMemoryUtils; +import com.dmetasoul.lakesoul.meta.entity.JniWrapper; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient; +import com.google.protobuf.InvalidProtocolBufferException; import io.substrait.dsl.SubstraitBuilder; import io.substrait.expression.Expression; +import io.substrait.expression.ExpressionCreator; import io.substrait.expression.proto.ExpressionProtoConverter; import io.substrait.extension.SimpleExtension; import io.substrait.plan.Plan; import io.substrait.plan.PlanProtoConverter; import io.substrait.relation.NamedScan; import io.substrait.type.Type; +import io.substrait.type.TypeCreator; +import jnr.ffi.Pointer; +import jnr.ffi.Runtime; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.CDataDictionaryProvider; +import org.apache.arrow.c.Data; +import org.apache.arrow.vector.types.pojo.Schema; import java.io.IOException; +import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -23,16 +46,43 @@ public class SubstraitUtil { public static final String CompNamespace = "/functions_comparison.yaml"; public static final String BooleanNamespace = "/functions_boolean.yaml"; + private static final LibLakeSoulIO LIB; + + private static final Pointer BUFFER1; + private static final Pointer 
BUFFER2; + + private static final NativeIOBase NATIVE_IO_BASE; + + private static final long TIMEOUT = 2000; + private static final ReentrantReadWriteLock LOCK; + static { try { EXTENSIONS = SimpleExtension.loadDefaults(); BUILDER = new SubstraitBuilder(EXTENSIONS); + LIB = JnrLoader.get(); + BUFFER1 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(4096); + BUFFER2 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(4096); + LOCK = new ReentrantReadWriteLock(); + NATIVE_IO_BASE = new NativeIOBase("Substrait"); } catch (IOException e) { throw new RuntimeException("load simple extension failed"); } } + public static Expression and(Expression left, Expression right) { + SimpleExtension.FunctionAnchor fa = SimpleExtension.FunctionAnchor.of(BooleanNamespace, "and:bool"); + return ExpressionCreator.scalarFunction(EXTENSIONS.getScalarFunction(fa), TypeCreator.NULLABLE.BOOLEAN, left, right); + } + + public static Expression not(Expression expression) { + SimpleExtension.FunctionAnchor fa = SimpleExtension.FunctionAnchor.of(BooleanNamespace, "not:bool"); + return ExpressionCreator.scalarFunction(EXTENSIONS.getScalarFunction(fa), TypeCreator.NULLABLE.BOOLEAN, expression); + } + public static io.substrait.proto.Plan substraitExprToProto(Expression e, String tableName) { + return planToProto(exprToFilter(e, tableName)); + } public static Plan exprToFilter(Expression e, String tableName) { if (e == null) { @@ -66,5 +116,84 @@ public static io.substrait.proto.Plan planToProto(Plan plan) { } return new PlanProtoConverter().toProto(plan); } + + public static List applyPartitionFilters(List allPartitionInfo, Schema schema, io.substrait.proto.Plan partitionFilter) { + if (allPartitionInfo.isEmpty()) { + return Collections.emptyList(); + } + if (partitionFilter == null) { + return allPartitionInfo; + } + List resultPartitionInfo = allPartitionInfo; + ArrowSchema ffiSchema = ArrowSchema.allocateNew(ArrowMemoryUtils.rootAllocator); + CDataDictionaryProvider tmpProvider = new CDataDictionaryProvider(); + Data.exportSchema(ArrowMemoryUtils.rootAllocator, schema, tmpProvider, ffiSchema); + + + JniWrapper jniWrapper = JniWrapper.newBuilder().addAllPartitionInfo(allPartitionInfo).build(); + + byte[] jniBytes = jniWrapper.toByteArray(); + BUFFER1.put(0, jniBytes, 0, jniBytes.length); + BUFFER1.putByte(jniBytes.length, (byte) 0); + + byte[] filterBytes = partitionFilter.toByteArray(); + BUFFER2.put(0, filterBytes, 0, filterBytes.length); + BUFFER2.putByte(filterBytes.length, (byte) 0); + + try { + final CompletableFuture filterFuture = new CompletableFuture<>(); + Pointer filterResult = LIB.apply_partition_filter( + new NativeIOBase.IntegerCallback((resultLen, msg) -> { + if (msg == null || msg.isEmpty()) { + filterFuture.complete(resultLen); + } else { + filterFuture.completeExceptionally(new SQLException(msg)); + } + }, NATIVE_IO_BASE.getIntReferenceManager()), + jniBytes.length, BUFFER1.address(), + ffiSchema.memoryAddress(), + filterBytes.length, + BUFFER2.address() + ); + Integer len = null; + len = filterFuture.get(TIMEOUT, TimeUnit.MILLISECONDS); + if (len < 0) return null; + Integer lenWithTail = len + 1; + + final CompletableFuture importFuture = new CompletableFuture<>(); + LIB.export_bytes_result( + new NativeIOBase.BooleanCallback((result, msg) -> { + if (msg == null || msg.isEmpty()) { + importFuture.complete(result); + } else { + importFuture.completeExceptionally(new SQLException(msg)); + } + }, NATIVE_IO_BASE.getBoolReferenceManager()), + filterResult, + len, + BUFFER1.address() + 
); + Boolean b = importFuture.get(TIMEOUT, TimeUnit.MILLISECONDS); + if (!b) return null; + + byte[] bytes = new byte[len]; + BUFFER1.get(0, bytes, 0, len); + resultPartitionInfo = JniWrapper.parseFrom(bytes).getPartitionInfoList(); + LIB.free_bytes_result(filterResult); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } finally { + tmpProvider.close(); + ffiSchema.close(); + } + + return resultPartitionInfo; + } } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 4c1c0cfa7..8ec2719e1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1831,6 +1831,7 @@ dependencies = [ "lakesoul-io", "log", "prost", + "proto", "serde", "serde_json", "tokio", diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs index 435cedce4..f8e3c2115 100644 --- a/rust/lakesoul-datafusion/src/catalog/mod.rs +++ b/rust/lakesoul-datafusion/src/catalog/mod.rs @@ -46,7 +46,7 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co .ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?, table_name ), - table_schema: serde_json::to_string::(&config.schema().into())?, + table_schema: serde_json::to_string::(&config.target_schema().into())?, table_namespace: "default".to_string(), properties: serde_json::to_string(&LakeSoulTableProperty { hash_bucket_num: Some(4), diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index 76a11f435..12a49611a 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -147,7 +147,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat { let mut column_nullable = HashSet::::new(); for config in &flatten_conf { - let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(&config)?; + let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(config)?; let partition_columnar_value = Arc::new(partition_columnar_value); let parquet_exec = Arc::new(ParquetExec::new(config.clone(), predicate.clone(), self.parquet_format.metadata_size_hint(state.config_options()))); diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 5456f37ce..80ea8bbd9 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -180,15 +180,15 @@ impl LakeSoulTableProvider { }, }) } else { - return Err(DataFusionError::Plan( + Err(DataFusionError::Plan( // Return an error if schema of the input query does not match with the table schema. 
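Putting the JNI pieces together, SubstraitUtil.applyPartitionFilters added above is the single entry point the table source uses for partition pruning: all metadata partitions go in, only the partitions surviving the Substrait predicate come out. The compile-level sketch below shows the intended call shape; the filter Plan is assumed to come from substraitExprToProto, and the table/namespace names are placeholders.

    import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
    import com.dmetasoul.lakesoul.meta.DBManager;
    import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
    import com.dmetasoul.lakesoul.meta.entity.TableInfo;
    import io.substrait.proto.Plan;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.Schema;

    import java.util.Collections;
    import java.util.List;

    public class PartitionPruningSketch {
        static List<PartitionInfo> prune(Plan partitionFilterPlan) {
            DBManager dbManager = new DBManager();
            // "user_multi" / "default" are placeholder identifiers for illustration.
            TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace("user_multi", "default");
            List<PartitionInfo> allPartitions = dbManager.getAllPartitionInfo(tableInfo.getTableId());
            // Only the range-partition columns are described to the native filter evaluator.
            Schema partitionSchema = new Schema(Collections.singletonList(
                    Field.nullable("region", new ArrowType.Utf8())));
            return SubstraitUtil.applyPartitionFilters(allPartitions, partitionSchema, partitionFilterPlan);
        }
    }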
format!("Expected single column references in output_ordering, got {}", expr) - )); + )) } } else { - return Err(DataFusionError::Plan( + Err(DataFusionError::Plan( format!("Expected Expr::Sort in output_ordering, but got {}", expr) - )); + )) } }) .collect::>>()?; @@ -314,7 +314,7 @@ impl TableProvider for LakeSoulTableProvider { None }; - let object_store_url = if let Some(url) = self.listing_table.table_paths().get(0) { + let object_store_url = if let Some(url) = self.listing_table.table_paths().first() { url.object_store() } else { return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))); diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index c1d2707fc..9ebfe27a6 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -136,7 +136,7 @@ where I: IntoIterator, { let mut part_values = vec![]; - for (part, pn) in partition_desc.split(",").zip(table_partition_cols) { + for (part, pn) in partition_desc.split(',').zip(table_partition_cols) { match part.split_once('=') { Some((name, val)) if name == pn => part_values.push(val), _ => { diff --git a/rust/lakesoul-datafusion/src/planner/physical_planner.rs b/rust/lakesoul-datafusion/src/planner/physical_planner.rs index a5a830574..9d5e008a1 100644 --- a/rust/lakesoul-datafusion/src/planner/physical_planner.rs +++ b/rust/lakesoul-datafusion/src/planner/physical_planner.rs @@ -62,17 +62,17 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner { Ok(provider) => { let physical_input = self.create_physical_plan(input, session_state).await?; - if lakesoul_table.primary_keys().is_empty() { - if !lakesoul_table + if lakesoul_table.primary_keys().is_empty() + && !lakesoul_table .schema() .logically_equivalent_names_and_types(&Schema::from(input.schema().as_ref())) - { - return Err(DataFusionError::Plan( - // Return an error if schema of the input query does not match with the table schema. - "Inserting query must have the same schema with the table.".to_string(), - )); - } - } + { + return Err(DataFusionError::Plan( + // Return an error if schema of the input query does not match with the table schema. 
+ "Inserting query must have the same schema with the table.".to_string(), + )); + } + let physical_input = if !lakesoul_table.primary_keys().is_empty() || !lakesoul_table.range_partitions().is_empty() { let input_schema = physical_input.schema(); let input_dfschema = input.as_ref().schema(); diff --git a/rust/lakesoul-io-c/Cargo.toml b/rust/lakesoul-io-c/Cargo.toml index 3c71d088e..5f935b498 100644 --- a/rust/lakesoul-io-c/Cargo.toml +++ b/rust/lakesoul-io-c/Cargo.toml @@ -17,7 +17,8 @@ tokio = { version = "1", features = ["full"] } serde_json = "1.0" serde = { version = "1.0", default-features = false, features = ["derive", "std"], optional = true } bytes = { workspace = true } -prost = "0.12.3" +proto = { path = "../proto" } +prost = { workspace = true } log = "0.4.20" env_logger = "0.11" diff --git a/rust/lakesoul-io-c/lakesoul_c_bindings.h b/rust/lakesoul-io-c/lakesoul_c_bindings.h index 043dd2f96..9af348694 100644 --- a/rust/lakesoul-io-c/lakesoul_c_bindings.h +++ b/rust/lakesoul-io-c/lakesoul_c_bindings.h @@ -81,6 +81,9 @@ IOConfigBuilder *lakesoul_config_builder_add_filter_proto(IOConfigBuilder *build IOConfigBuilder *lakesoul_config_builder_set_schema(IOConfigBuilder *builder, c_ptrdiff_t schema_addr); +IOConfigBuilder *lakesoul_config_builder_set_partition_schema(IOConfigBuilder *builder, + c_ptrdiff_t schema_addr); + IOConfigBuilder *lakesoul_config_builder_set_thread_num(IOConfigBuilder *builder, c_size_t thread_num); @@ -183,6 +186,15 @@ TokioRuntime *create_tokio_runtime_from_builder(TokioRuntimeBuilder *builder); void free_tokio_runtime(CResult *runtime); +CResult *apply_partition_filter(void (*callback)(int32_t, const char*), + int32_t len, + c_ptrdiff_t jni_wrapper_addr, + FFI_ArrowSchema *schema_addr, + int32_t filter_len, + c_ptrdiff_t filter_addr); + +void free_bytes_result(CResult *bytes); + void rust_logger_init(); } // extern "C" diff --git a/rust/lakesoul-io-c/src/lib.rs b/rust/lakesoul-io-c/src/lib.rs index 562f3b5a5..6a67c28db 100644 --- a/rust/lakesoul-io-c/src/lib.rs +++ b/rust/lakesoul-io-c/src/lib.rs @@ -17,16 +17,18 @@ use bytes::BufMut; use arrow::array::Array; pub use arrow::array::StructArray; -use arrow::datatypes::Schema; +use arrow::datatypes::{Schema, SchemaRef}; use arrow::ffi::from_ffi; pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use datafusion_substrait::substrait::proto::Plan; use prost::Message; use tokio::runtime::{Builder, Runtime}; +use proto::proto::entity; use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder}; use lakesoul_io::lakesoul_reader::{LakeSoulReader, RecordBatch, Result, SyncSendableMutableLakeSoulReader}; use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; +use lakesoul_io::helpers; use log::debug; #[repr(C)] @@ -201,6 +203,21 @@ pub extern "C" fn lakesoul_config_builder_set_schema( } } +#[no_mangle] +pub extern "C" fn lakesoul_config_builder_set_partition_schema( + builder: NonNull, + schema_addr: c_ptrdiff_t, +) -> NonNull { + unsafe { + let ffi_schema = schema_addr as *mut FFI_ArrowSchema; + let schema_data = std::ptr::replace(ffi_schema, FFI_ArrowSchema::empty()); + let schema = Schema::try_from(&schema_data).unwrap(); + convert_to_opaque( + from_opaque::(builder).with_partition_schema(Arc::new(schema)), + ) + } +} + #[no_mangle] pub extern "C" fn lakesoul_config_builder_set_thread_num( builder: NonNull, @@ -843,6 +860,52 @@ pub extern "C" fn free_tokio_runtime(runtime: NonNull>) { from_nonnull(runtime).free::(); } +#[no_mangle] +pub extern "C" fn apply_partition_filter( + 
callback: extern "C" fn(i32, *const c_char), + len: i32, + jni_wrapper_addr: c_ptrdiff_t, + schema_addr: *mut FFI_ArrowSchema, + filter_len:i32, + filter_addr: c_ptrdiff_t +) -> NonNull> { + let raw_parts = unsafe { + std::slice::from_raw_parts(jni_wrapper_addr as *const u8, len as usize) + }; + let wrapper = entity::JniWrapper::decode(prost::bytes::Bytes::from(raw_parts)).unwrap(); + + let dst = unsafe { + slice::from_raw_parts(filter_addr as *const u8, filter_len as usize) + }; + let filter = Plan::decode(dst).unwrap(); + + let ffi_schema = schema_addr; + let schema_data = unsafe { + std::ptr::replace(ffi_schema, FFI_ArrowSchema::empty()) + }; + let schema = SchemaRef::from(Schema::try_from(&schema_data).unwrap()); + + let filtered_partition = helpers::apply_partition_filter(wrapper, schema, filter); + + match filtered_partition { + Ok(wrapper) => { + let u8_vec = wrapper.encode_to_vec(); + let len = u8_vec.len(); + call_i32_result_callback(callback, len as i32, std::ptr::null()); + convert_to_nonnull(CResult::::new::>(u8_vec)) + } + Err(e) => { + call_i32_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()); + convert_to_nonnull(CResult::::new::>(vec![])) + } + } +} + +#[no_mangle] +pub extern "C" fn free_bytes_result(bytes: NonNull>) { + from_nonnull(bytes).free::>(); +} + #[cfg(test)] mod tests { use core::ffi::c_ptrdiff_t; diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index d0e4f2dca..d0b315d9b 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -41,7 +41,7 @@ rand = { workspace = true } half = { workspace = true } log = "0.4.20" anyhow = { workspace = true, features = [] } -prost = "0.12.3" +prost = { workspace = true } env_logger = "0.11" @@ -63,4 +63,4 @@ datafusion-substrait = { workspace = true, features = ["protoc"] } tempfile = "3.3.0" comfy-table = "6.0" whoami = "1.5" -rand = "0.8" + diff --git a/rust/lakesoul-io/src/datasource/file_format.rs b/rust/lakesoul-io/src/datasource/file_format.rs index 662ba3181..fc441d387 100644 --- a/rust/lakesoul-io/src/datasource/file_format.rs +++ b/rust/lakesoul-io/src/datasource/file_format.rs @@ -97,7 +97,7 @@ impl FileFormat for LakeSoulParquetFormat { .then(|| filters.cloned()) .flatten(); - let table_schema = LakeSoulListingTable::compute_table_schema(conf.file_schema.clone(), self.conf.schema()); + let table_schema = LakeSoulListingTable::compute_table_schema(conf.file_schema.clone(), &self.conf); let projection = conf.projection.clone(); let target_schema = project_schema(&table_schema, projection.as_ref())?; diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index 796c863e4..927b82daa 100644 --- a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -49,7 +49,7 @@ impl LakeSoulListingTable { let (file_schema, listing_table) = listing_table_from_lakesoul_io_config(session_state, lakesoul_io_config.clone(), file_format, as_sink).await?; let file_schema = file_schema .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?; - let table_schema = Self::compute_table_schema(file_schema, lakesoul_io_config.schema()); + let table_schema = Self::compute_table_schema(file_schema, &lakesoul_io_config); Ok(Self { listing_table, @@ -66,14 +66,19 @@ impl LakeSoulListingTable { self.listing_table.table_paths() } - pub fn compute_table_schema(file_schema: SchemaRef, target_schema: SchemaRef) -> SchemaRef { - let target_schema = uniform_schema(target_schema); 
+ pub fn compute_table_schema(file_schema: SchemaRef, config: &LakeSoulIOConfig) -> SchemaRef { + let target_schema = uniform_schema(config.target_schema()); let mut builder = SchemaBuilder::from(target_schema.fields()); - for field in file_schema.fields() { - if target_schema.field_with_name(field.name()).is_err() { - builder.push(field.clone()); - } + for field in file_schema.fields() { + if target_schema.field_with_name(field.name()).is_err() { + builder.push(field.clone()); } + } + for field in config.partition_schema().fields() { + if target_schema.field_with_name(field.name()).is_err() { + builder.push(field.clone()); + } + } Arc::new(builder.finish()) } diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index d50818c07..e7e6a370b 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -4,25 +4,24 @@ use std::{collections::HashMap, sync::Arc}; -use arrow_array::RecordBatch; -use arrow_schema::{DataType, Schema, SchemaBuilder, SchemaRef}; +use arrow::datatypes::UInt32Type; +use arrow_array::{RecordBatch, UInt32Array}; +use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use datafusion::{ datasource::{ file_format::FileFormat, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, - physical_plan::FileScanConfig}, - execution::context::SessionState, - logical_expr::col, - physical_expr::{create_physical_expr, PhysicalSortExpr}, - physical_plan::PhysicalExpr, - physical_planner::create_physical_sort_expr + physical_plan::FileScanConfig}, execution::context::{SessionContext, SessionState}, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr }; -use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue}; +use datafusion_common::{cast::as_primitive_array, DFSchema, DataFusionError, Result, ScalarValue}; +use datafusion_substrait::substrait::proto::Plan; use object_store::path::Path; +use proto::proto::entity::JniWrapper; +use rand::distributions::DistString; use url::Url; -use crate::{constant::{LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; +use crate::{constant::{LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}, filter::parser::Parser, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; pub fn column_names_to_physical_sort_expr( columns: &[String], @@ -77,13 +76,13 @@ pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc Ok((range_col.clone(), scalar)), Err(e) => Err(e) } } else { - Err(datafusion::error::DataFusionError::External(format!("").into())) + Err(datafusion::error::DataFusionError::External(format!("Invalid partition desc of {}", range_col).into())) } }) .collect::>>() @@ -128,6 +127,35 @@ pub fn columnar_values_to_partition_desc(columnar_values: &Vec<(String, ScalarVa } } +pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String) -> Result> { + if partition_desc == "-5" { + Ok(vec![]) + } else { + let mut part_values = Vec::with_capacity(schema.fields().len()); + for part in partition_desc.split(',') { + match part.split_once('=') { + Some((name, val)) => { + part_values.push((name, val)); + } + _ => { + return Err(datafusion::error::DataFusionError::External(format!("Invalid partition_desc: {}", partition_desc).into())) + } + } + }; + let mut scalar_values = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + for (name, val) in 
part_values.iter() { + if field.name() == name { + let scalar = ScalarValue::try_from_string(val.to_string(), field.data_type())?; + scalar_values.push(scalar); + break; + } + } + } + Ok(scalar_values) + } +} + pub fn partition_desc_from_file_scan_config( conf: &FileScanConfig ) -> Result<(String, HashMap)> { @@ -141,7 +169,7 @@ pub fn partition_desc_from_file_scan_config( .iter() .enumerate() .map(|(idx, col)| { - format!("{}={}", col.name().clone(), file.partition_values[idx].to_string()) + format!("{}={}", col.name().clone(), file.partition_values[idx]) }) .collect::>() .join(","), @@ -177,7 +205,7 @@ pub async fn listing_table_from_lakesoul_io_config( // Resolve the schema let resolved_schema = infer_schema(session_state, &table_paths, Arc::clone(&file_format)).await?; - let target_schema = uniform_schema(lakesoul_io_config.schema()); + let target_schema = uniform_schema(lakesoul_io_config.target_schema()); let table_partition_cols = range_partition_to_partition_cols(target_schema.clone(), lakesoul_io_config.range_partitions_slice())?; let listing_options = ListingOptions::new(file_format.clone()) @@ -197,7 +225,7 @@ pub async fn listing_table_from_lakesoul_io_config( .with_schema(resolved_schema) } true => { - let target_schema = uniform_schema(lakesoul_io_config.schema()); + let target_schema = uniform_schema(lakesoul_io_config.target_schema()); let table_partition_cols = range_partition_to_partition_cols(target_schema.clone(), lakesoul_io_config.range_partitions_slice())?; let listing_options = ListingOptions::new(file_format.clone()) @@ -233,3 +261,77 @@ pub async fn infer_schema(sc: &SessionState, table_paths: &[ListingTableUrl], fi file_format.infer_schema(sc, &store, &objects).await } + +pub fn apply_partition_filter(wrapper: JniWrapper, schema: SchemaRef, filter: Plan) -> Result { + tokio::runtime::Runtime::new()?.block_on(async { + let context = SessionContext::default(); + let index_filed_name = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8); + let index_filed = Field::new(index_filed_name, DataType::UInt32, false); + let schema_len = schema.fields().len(); + let batch = batch_from_partition(&wrapper, schema, index_filed)?; + + let dataframe = context.read_batch(batch)?; + let df_filter = Parser::parse_proto(&filter, dataframe.schema())?; + + let results = dataframe + .filter(df_filter)? 
+ .collect() + .await?; + + let mut partition_info = vec![]; + for result_batch in results { + for index in as_primitive_array::(result_batch.column(schema_len))?.values().iter() { + partition_info.push(wrapper.partition_info[*index as usize].clone()); + } + } + + + Ok( + JniWrapper { + partition_info, + ..Default::default() + } + ) + }) +} + +fn batch_from_partition(wrapper: &JniWrapper, schema: SchemaRef, index_field: Field) -> Result { + let scalar_values = wrapper + .partition_info + .iter() + .map(|partition_info| + partition_desc_to_scalar_values(schema.clone(), partition_info.partition_desc.clone()) + ) + .collect::>>()?; + + let mut columns = vec![vec![]; schema.fields().len()]; + + for values in scalar_values.iter() { + values + .iter() + .enumerate() + .for_each(|(index, value)| { + columns[index].push(value.clone()); + }) + } + let mut columns = columns + .iter() + .map(|values| { + ScalarValue::iter_to_array(values.clone()) + }) + .collect::>>()?; + + // Add index column + let mut fields_with_index = schema + .all_fields() + .into_iter() + .cloned() + .collect::>(); + fields_with_index.push(index_field); + let schema_with_index = SchemaRef::new(Schema::new(fields_with_index)); + columns.push( + Arc::new(UInt32Array::from((0..wrapper.partition_info.len() as u32).collect::>())) + ); + + Ok(RecordBatch::try_new(schema_with_index, columns)?) +} \ No newline at end of file diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index eb0756c1a..67d0e4984 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -76,7 +76,10 @@ pub struct LakeSoulIOConfig { pub(crate) parquet_filter_pushdown: bool, // arrow schema - pub(crate) schema: IOSchema, + pub(crate) target_schema: IOSchema, + + // arrow schema of partition columns + pub(crate) partition_schema: IOSchema, // object store related configs pub(crate) object_store_options: HashMap, @@ -100,8 +103,12 @@ pub struct LakeSoulIOConfig { } impl LakeSoulIOConfig { - pub fn schema(&self) -> SchemaRef { - self.schema.0.clone() + pub fn target_schema(&self) -> SchemaRef { + self.target_schema.0.clone() + } + + pub fn partition_schema(&self) -> SchemaRef { + self.partition_schema.0.clone() } pub fn primary_keys_slice(&self) -> &[String] { @@ -211,7 +218,12 @@ impl LakeSoulIOConfigBuilder { } pub fn with_schema(mut self, schema: SchemaRef) -> Self { - self.config.schema = IOSchema(schema); + self.config.target_schema = IOSchema(schema); + self + } + + pub fn with_partition_schema(mut self, schema: SchemaRef) -> Self { + self.config.partition_schema = IOSchema(schema); self } @@ -260,7 +272,7 @@ impl LakeSoulIOConfigBuilder { } pub fn schema(&self) -> SchemaRef { - self.config.schema() + self.config.target_schema() } pub fn primary_keys_slice(&self) -> &[String] { diff --git a/rust/lakesoul-io/src/lakesoul_reader.rs b/rust/lakesoul-io/src/lakesoul_reader.rs index 71561b7e8..96e2a02d9 100644 --- a/rust/lakesoul-io/src/lakesoul_reader.rs +++ b/rust/lakesoul-io/src/lakesoul_reader.rs @@ -47,7 +47,7 @@ impl LakeSoulReader { } pub async fn start(&mut self) -> Result<()> { - let schema: SchemaRef = self.config.schema.0.clone(); + let target_schema: SchemaRef = self.config.target_schema.0.clone(); if self.config.files.is_empty() { Err(DataFusionError::Internal( "LakeSoulReader has wrong number of file".to_string(), @@ -69,7 +69,7 @@ impl LakeSoulReader { let filters = convert_filter(&dataframe, self.config.filter_strs.clone(), 
self.config.filter_protos.clone())?; let stream = prune_filter_and_execute( dataframe, - schema.clone(), + target_schema.clone(), filters, self.config.batch_size, ) diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 92c6320dd..e5d85d804 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -212,7 +212,7 @@ impl MultiPartAsyncWriter { let in_mem_buf = InMemBuf(Arc::new(AtomicRefCell::new(VecDeque::::with_capacity( 16 * 1024 * 1024, // 16kb )))); - let schema = uniform_schema(config.schema.0.clone()); + let schema = uniform_schema(config.target_schema.0.clone()); let schema_projection_excluding_range = schema @@ -351,7 +351,7 @@ impl SortAsyncWriter { runtime: Arc, ) -> Result { let _ = runtime.enter(); - let schema = config.schema.0.clone(); + let schema = config.target_schema.0.clone(); let receiver_stream_builder = RecordBatchReceiverStream::builder(schema.clone(), 8); let tx = receiver_stream_builder.tx(); let recv_exec = ReceiverStreamExec::new(receiver_stream_builder, schema.clone()); @@ -362,7 +362,7 @@ impl SortAsyncWriter { // add aux sort cols to sort expr .chain(config.aux_sort_cols.iter()) .map(|pk| { - let col = Column::new_with_schema(pk.as_str(), &config.schema.0)?; + let col = Column::new_with_schema(pk.as_str(), &config.target_schema.0)?; Ok(PhysicalSortExpr { expr: Arc::new(col), options: SortOptions::default(), @@ -376,7 +376,7 @@ impl SortAsyncWriter { sort_exec } else { let proj_expr: Vec<(Arc, String)> = config - .schema + .target_schema .0 .fields .iter() @@ -385,7 +385,7 @@ impl SortAsyncWriter { // exclude aux sort cols None } else { - Some(col(f.name().as_str(), &config.schema.0).map(|e| (e, f.name().clone()))) + Some(col(f.name().as_str(), &config.target_schema.0).map(|e| (e, f.name().clone()))) } }) .collect::, String)>>>()?; @@ -491,6 +491,7 @@ impl AsyncBatchWriter for SortAsyncWriter { } } +type PartitionedWriterInfo = Arc, u64)>>>; impl PartitioningAsyncWriter { pub fn try_new( @@ -499,7 +500,7 @@ impl PartitioningAsyncWriter { runtime: Arc, ) -> Result { let _ = runtime.enter(); - let schema = config.schema.0.clone(); + let schema = config.target_schema.0.clone(); let receiver_stream_builder = RecordBatchReceiverStream::builder(schema.clone(), 8); let tx = receiver_stream_builder.tx(); let recv_exec = ReceiverStreamExec::new(receiver_stream_builder, schema.clone()); @@ -554,7 +555,7 @@ impl PartitioningAsyncWriter { // add aux sort cols to sort expr .chain(config.aux_sort_cols.iter()) .map(|pk| { - let col = Column::new_with_schema(pk.as_str(), &config.schema.0)?; + let col = Column::new_with_schema(pk.as_str(), &config.target_schema.0)?; Ok(PhysicalSortExpr { expr: Arc::new(col), options: SortOptions::default(), @@ -572,7 +573,7 @@ impl PartitioningAsyncWriter { sort_exec } else { let proj_expr: Vec<(Arc, String)> = config - .schema + .target_schema .0 .fields .iter() @@ -581,7 +582,7 @@ impl PartitioningAsyncWriter { // exclude aux sort cols None } else { - Some(col(f.name().as_str(), &config.schema.0).map(|e| (e, f.name().clone()))) + Some(col(f.name().as_str(), &config.target_schema.0).map(|e| (e, f.name().clone()))) } }) .collect::, String)>>>()?; @@ -625,7 +626,7 @@ impl PartitioningAsyncWriter { config_builder: LakeSoulIOConfigBuilder, range_partitions: Arc>, write_id: String, - partitioned_file_path_and_row_count: Arc, u64)>>>, + partitioned_file_path_and_row_count: PartitionedWriterInfo, ) -> Result { let mut data = input.execute(partition, 
context.clone())?; let schema_projection_excluding_range = @@ -715,7 +716,7 @@ impl PartitioningAsyncWriter { async fn await_and_summary( join_handles: Vec>>, - partitioned_file_path_and_row_count: Arc, u64)>>>, + partitioned_file_path_and_row_count: PartitionedWriterInfo, ) -> Result> { let _ = futures::future::join_all(join_handles) @@ -811,7 +812,7 @@ impl SyncSendableMutableLakeSoulWriter { // if aux sort cols exist, we need to adjust the schema of final writer // to exclude all aux sort cols let writer_schema: SchemaRef = if !config.aux_sort_cols.is_empty() { - let schema = config.schema.0.clone(); + let schema = config.target_schema.0.clone(); let proj_indices = schema .fields .iter() @@ -820,7 +821,7 @@ impl SyncSendableMutableLakeSoulWriter { .collect::>>()?; Arc::new(schema.project(proj_indices.borrow())?) } else { - config.schema.0.clone() + config.target_schema.0.clone() }; @@ -831,12 +832,12 @@ impl SyncSendableMutableLakeSoulWriter { } else if !config.primary_keys.is_empty() { // sort primary key table - writer_config.schema = IOSchema(uniform_schema(writer_schema)); + writer_config.target_schema = IOSchema(uniform_schema(writer_schema)); let writer = MultiPartAsyncWriter::try_new(writer_config).await?; Box::new(SortAsyncWriter::try_new(writer, config, runtime.clone())?) } else { // else multipart - writer_config.schema = IOSchema(uniform_schema(writer_schema)); + writer_config.target_schema = IOSchema(uniform_schema(writer_schema)); let writer = MultiPartAsyncWriter::try_new(writer_config).await?; Box::new(writer) }; diff --git a/rust/lakesoul-io/src/repartition/mod.rs b/rust/lakesoul-io/src/repartition/mod.rs index 5ad0864dc..498b64e78 100644 --- a/rust/lakesoul-io/src/repartition/mod.rs +++ b/rust/lakesoul-io/src/repartition/mod.rs @@ -152,8 +152,7 @@ impl BatchPartitioner { hash_buffer.clear(); hash_buffer.resize(batch.num_rows(), 0); - let mut range_buffer = Vec::::new(); - range_buffer.resize(batch.num_rows(), 0); + let mut range_buffer = vec![0; batch.num_rows()]; create_hashes(&hash_arrays, hash_buffer)?; create_hashes(&range_arrays, &mut range_buffer)?; @@ -164,9 +163,9 @@ impl BatchPartitioner { .collect(); for (index, (hash, range_hash)) in hash_buffer.iter().zip(range_buffer).enumerate() { - if !indices[(*hash % *partitions as u32) as usize].contains_key(&range_hash) { - indices[(*hash % *partitions as u32) as usize].insert(range_hash, UInt64Builder::with_capacity(batch.num_rows())); - } + indices[(*hash % *partitions as u32) as usize] + .entry(range_hash) + .or_insert_with(|| UInt64Builder::with_capacity(batch.num_rows())); if let Some(entry) = indices[(*hash % *partitions as u32) as usize].get_mut(&range_hash) { entry.append_value(index as u64); }
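
Note on the new apply_partition_filter entry point in rust/lakesoul-io-c/src/lib.rs: the Java side (SubstraitUtil) serializes the JniWrapper holding every partition_info as protobuf and passes only an (address, length) pair across JNR, together with an FFI_ArrowSchema for the partition columns and a Substrait filter plan encoded the same way. A minimal sketch of the decode step on the Rust side, assuming the generated proto::proto::entity::JniWrapper type from this repository; decode_wrapper is a hypothetical helper name, not part of the patch:

    use prost::Message;
    use proto::proto::entity::JniWrapper;

    /// Rebuild the JniWrapper from the (address, length) pair handed over by the JVM.
    /// Safety: `addr` must point to `len` readable bytes for the duration of the call.
    unsafe fn decode_wrapper(addr: *const u8, len: usize) -> Result<JniWrapper, prost::DecodeError> {
        let bytes = std::slice::from_raw_parts(addr, len);
        // prost::Message::decode accepts any `Buf`, including a plain byte slice.
        JniWrapper::decode(bytes)
    }

The exported function in the patch additionally reports success or failure through the (i32, const char*) callback and returns the re-encoded result as a CResult that the JVM releases with free_bytes_result.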
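
Note on LakeSoulListingTable::compute_table_schema in rust/lakesoul-io/src/datasource/listing.rs: the table schema now starts from the target schema and appends, in order, any file-schema field and then any partition-schema field that the target does not already contain. A standalone restatement of that merging rule using only arrow-schema; merge_table_schema is an illustrative free function, not an API of the crate, and the uniform_schema normalization applied in the patch is omitted here:

    use std::sync::Arc;
    use arrow_schema::{SchemaBuilder, SchemaRef};

    fn merge_table_schema(target: SchemaRef, file: SchemaRef, partition: SchemaRef) -> SchemaRef {
        // Target columns keep their order; extra columns are appended file-first, then partition.
        let mut builder = SchemaBuilder::from(target.fields());
        for field in file.fields().iter().chain(partition.fields().iter()) {
            if target.field_with_name(field.name()).is_err() {
                builder.push(field.clone());
            }
        }
        Arc::new(builder.finish())
    }

As in the patch, presence is checked only against the target schema, so the partition columns surface in the table schema even when the data files do not carry them.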
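
Note on partition_desc_to_scalar_values in rust/lakesoul-io/src/helpers.rs: a partition descriptor is the comma-joined form "col1=val1,col2=val2", with the literal "-5" standing for a table without range partitions, and each value is coerced to its column's Arrow type through ScalarValue::try_from_string. The sketch below restates that parsing rule; it returns String errors instead of DataFusionError to stay dependency-light, and parse_partition_desc is a hypothetical name:

    use arrow_schema::Schema;
    use datafusion_common::ScalarValue;

    fn parse_partition_desc(schema: &Schema, desc: &str) -> Result<Vec<ScalarValue>, String> {
        if desc == "-5" {
            // Sentinel used by LakeSoul for "no range partitions".
            return Ok(vec![]);
        }
        let pairs = desc
            .split(',')
            .map(|kv| kv.split_once('=').ok_or_else(|| format!("invalid partition desc: {desc}")))
            .collect::<Result<Vec<(&str, &str)>, String>>()?;
        schema
            .fields()
            .iter()
            .map(|field| {
                let (_, raw) = pairs
                    .iter()
                    .find(|(name, _)| *name == field.name().as_str())
                    .ok_or_else(|| format!("missing value for partition column {}", field.name()))?;
                // Coerce the textual value to the column's Arrow type.
                ScalarValue::try_from_string(raw.to_string(), field.data_type()).map_err(|e| e.to_string())
            })
            .collect()
    }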
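
Note on apply_partition_filter and batch_from_partition in rust/lakesoul-io/src/helpers.rs: all partition descriptors are materialized into a single RecordBatch with an extra UInt32 index column, the Substrait filter is evaluated over that batch through DataFusion, and the surviving index values pick the matching partition_info entries out of the original JniWrapper. The sketch below reproduces only the index-column trick with plain arrow, substituting a hand-written boolean mask for the Substrait/DataFusion evaluation; column and variable names are illustrative:

    use std::sync::Arc;
    use arrow::array::{ArrayRef, BooleanArray, StringArray, UInt32Array};
    use arrow::compute::filter_record_batch;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Three partition descriptors, e.g. taken from JniWrapper.partition_info.
        let descs = ["date=2024-05-01", "date=2024-05-02", "date=2024-05-03"];

        // One column per partition key plus a synthetic row-index column.
        let date_col: StringArray = descs
            .iter()
            .map(|d| d.split_once('=').map(|(_, v)| v))
            .collect();
        let index_col = UInt32Array::from((0..descs.len() as u32).collect::<Vec<u32>>());
        let schema = Arc::new(Schema::new(vec![
            Field::new("date", DataType::Utf8, true),
            Field::new("__partition_index", DataType::UInt32, false),
        ]));
        let columns: Vec<ArrayRef> = vec![Arc::new(date_col), Arc::new(index_col)];
        let batch = RecordBatch::try_new(schema, columns)?;

        // The real code evaluates a Substrait plan via DataFusion; the mask here is hand-written.
        let keep = BooleanArray::from(vec![true, false, true]);
        let filtered = filter_record_batch(&batch, &keep)?;

        // Surviving index values select the matching partition_info entries.
        let kept = filtered
            .column(1)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .expect("index column is UInt32");
        assert_eq!(kept.values().to_vec(), vec![0u32, 2]);
        Ok(())
    }

Carrying the index column through the filter means the result never has to re-parse or re-match partition descriptors: position in the original partition_info list survives the predicate.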
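
Note on the rust/lakesoul-io/src/repartition/mod.rs hunk: the contains_key-then-insert probe is replaced by the HashMap entry API, which performs a single lookup and creates the per-range builder lazily. A reduced illustration, with Vec<u64> standing in for the UInt64Builder used in the real code:

    use std::collections::HashMap;

    fn main() {
        // indices[partition] maps a range-hash to the row indices routed to it.
        let mut indices: Vec<HashMap<u32, Vec<u64>>> = vec![HashMap::new(); 4];
        let partitions = indices.len() as u32;

        for (row, (hash, range_hash)) in [(7u32, 42u32), (9, 42), (8, 13)].into_iter().enumerate() {
            indices[(hash % partitions) as usize]
                .entry(range_hash)
                .or_insert_with(Vec::new) // single lookup; allocate the bucket only on first use
                .push(row as u64);
        }

        assert_eq!(indices[(7 % partitions) as usize][&42], vec![0u64]);
    }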