Skip to content

Commit

Permalink
[Fix] Fix LakeSoulArrowSource serialization (#499)
Browse files Browse the repository at this point in the history
* fix_substrait_loading

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix_substrait_loading

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix tableInfo serialization

Signed-off-by: zenghua <huazeng@dmetasoul.com>

---------

Signed-off-by: zenghua <huazeng@dmetasoul.com>
Co-authored-by: zenghua <huazeng@dmetasoul.com>
  • Loading branch information
Ceng23333 and zenghua committed Jun 24, 2024
1 parent 09f67dc commit 0a3e6bc
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
import org.apache.flink.table.types.logical.RowType;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.INFERRING_SCHEMA;

public class LakeSoulArrowSource extends LakeSoulSource<LakeSoulArrowWrapper> {

private final TableInfo tableInfo;
private final byte[] encodedTableInfo;

public static LakeSoulArrowSource create(
String tableNamespace,
Expand Down Expand Up @@ -65,7 +62,7 @@ public static LakeSoulArrowSource create(
null,
null
);
this.tableInfo = tableInfo;
this.encodedTableInfo = tableInfo.toByteArray();
}


Expand All @@ -84,7 +81,7 @@ public SourceReader<LakeSoulArrowWrapper, LakeSoulPartitionSplit> createReader(S
conf.addAll(readerContext.getConfiguration());
return new LakeSoulSourceReader(
() -> new LakeSoulArrowSplitReader(
tableInfo,
encodedTableInfo,
conf,
this.tableRowType,
this.projectedRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class LakeSoulArrowSplitReader implements SplitReader<LakeSoulArrowWrappe
private final Queue<LakeSoulPartitionSplit> splits;
private final List<String> partitionColumns;
private final RowType tableRowType;
private final TableInfo tableInfo;
private final byte[] encodedTableInfo;
RowType projectedRowType;

RowType projectedRowTypeWithPk;
Expand All @@ -50,7 +50,7 @@ public class LakeSoulArrowSplitReader implements SplitReader<LakeSoulArrowWrappe
private LakeSoulArrowSplitRecordsReader lastSplitReader;

public LakeSoulArrowSplitReader(
TableInfo tableInfo,
byte[] encodedTableInfo,
Configuration conf,
RowType tableRowType,
RowType projectedRowType,
Expand All @@ -61,7 +61,7 @@ public LakeSoulArrowSplitReader(
List<String> partitionColumns,
Plan filter
) {
this.tableInfo = tableInfo;
this.encodedTableInfo = encodedTableInfo;
this.conf = conf;
this.splits = new ArrayDeque<>();
this.tableRowType = tableRowType;
Expand All @@ -80,7 +80,7 @@ public RecordsWithSplitIds<LakeSoulArrowWrapper> fetch() throws IOException {
close();
lastSplitReader =
new LakeSoulArrowSplitRecordsReader(
this.tableInfo,
this.encodedTableInfo,
this.conf,
Objects.requireNonNull(splits.poll()),
this.tableRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class LakeSoulArrowSplitRecordsReader implements RecordsWithSplitIds<Lake
private final Plan filter;

public LakeSoulArrowSplitRecordsReader(
TableInfo tableInfo,
byte[] encodedTableInfo,
Configuration conf,
LakeSoulPartitionSplit split,
RowType tableRowType,
Expand All @@ -96,7 +96,7 @@ public LakeSoulArrowSplitRecordsReader(
List<String> partitionColumns,
Plan filter
) throws Exception {
this.tableInfo = tableInfo;
this.tableInfo = TableInfo.parseFrom(encodedTableInfo);
this.split = split;
this.skipRecords = split.getSkipRecord();
this.conf = new Configuration(conf);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.apache.flink.lakesoul.test;

import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import io.substrait.extension.SimpleExtension;
import org.apache.flink.lakesoul.test.flinkSource.TestUtils;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.Test;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.TimeZone;
Expand All @@ -17,6 +20,11 @@

public class SubstraitTest extends AbstractTestBase {

@Test
// Smoke test: loading the bundled Substrait default extension definitions must
// succeed. NOTE(review): presumably this guards against the classpath/resource
// loading regression referenced by the "fix_substrait_loading" commits in this
// change — confirm against the original issue. The returned collection is
// intentionally unused; the test passes as long as loadDefaults() does not throw.
public void loadSubStrait() throws IOException {
SimpleExtension.ExtensionCollection extensionCollection = SimpleExtension.loadDefaults();
}

@Test
public void dateTypeTest() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class LakeSoulDataGenSourceTable {
* --write.time 5
*/
public static void main(String[] args) throws Exception {

ParameterTool parameter = ParameterTool.fromArgs(args);

String sinkDBName = parameter.get("sink.database.name", "flink_source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class LakeSoulSourceToSinkTable {
* --flink.checkpoint /tmp/chk
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {

ParameterTool parameter = ParameterTool.fromArgs(args);

String sourceDBName = parameter.get("source.database.name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

import java.io.IOException;
import java.util.*;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.INFERRING_SCHEMA;
Expand Down Expand Up @@ -167,9 +168,10 @@ public void testManualArrowBatch() throws Exception {
tEnv.executeSql("select * from `default`.`qar_table`").print();
}

// @Test
public void testLakeSoulArrowSource() throws Exception {
public static void main(String[] args) throws Exception {

int parallelism = 2;

StreamExecutionEnvironment execEnv = LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 2000L, 2000L);

Configuration conf = new Configuration();
Expand Down

0 comments on commit 0a3e6bc

Please sign in to comment.