From e9068462dbebd6b15a49b0299fc7564f0c37f777 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Tue, 23 Apr 2024 22:00:54 -0700 Subject: [PATCH] Batch write test case passed --- java/core/pom.xml | 5 - .../main/java/com/lancedb/lance/Dataset.java | 1 + .../com/lancedb/lance/FragmentMetadata.java | 4 +- .../java/com/lancedb/lance/TestUtils.java | 1 - .../com/lancedb/lance/spark/SparkCatalog.java | 24 ++-- .../lancedb/lance/spark/SparkSchemaUtils.java | 110 ------------------ .../lance/spark/source/SparkTable.java | 10 +- .../lance/spark/source/SparkWrite.java | 26 +++-- .../lance/spark/source/SparkWriteBuilder.java | 10 +- .../lancedb/lance/spark/SparkCatalogTest.java | 107 +++++++++++------ 10 files changed, 109 insertions(+), 189 deletions(-) delete mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java diff --git a/java/core/pom.xml b/java/core/pom.xml index b18511aa55..dcd02cd05b 100644 --- a/java/core/pom.xml +++ b/java/core/pom.xml @@ -48,9 +48,6 @@ - - - build-jni @@ -85,6 +82,4 @@ - - diff --git a/java/core/src/main/java/com/lancedb/lance/Dataset.java b/java/core/src/main/java/com/lancedb/lance/Dataset.java index 6f2552d656..347d10e7eb 100644 --- a/java/core/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/core/src/main/java/com/lancedb/lance/Dataset.java @@ -24,6 +24,7 @@ import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Schema; /** diff --git a/java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java b/java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java index 94107fc530..36a96ff9f4 100644 --- a/java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java +++ b/java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java @@ -14,13 +14,15 @@ package com.lancedb.lance; +import java.io.Serializable; import org.json.JSONObject; /** * Metadata of a Fragment in the dataset. * Matching to lance Fragment. 
* */ -public class FragmentMetadata { +public class FragmentMetadata implements Serializable { + private static final long serialVersionUID = -5886811251944130460L; private static final String ID_KEY = "id"; private static final String PHYSICAL_ROWS_KEY = "physical_rows"; private final String jsonMetadata; diff --git a/java/core/src/test/java/com/lancedb/lance/TestUtils.java b/java/core/src/test/java/com/lancedb/lance/TestUtils.java index cc24a635a7..5182ba50bb 100644 --- a/java/core/src/test/java/com/lancedb/lance/TestUtils.java +++ b/java/core/src/test/java/com/lancedb/lance/TestUtils.java @@ -76,7 +76,6 @@ public FragmentMetadata createNewFragment(int fragmentId, int rowCount) { FragmentMetadata fragmentMeta; try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { root.allocateNew(); - // Fill data VarCharVector nameVector = (VarCharVector) root.getVector("name"); IntVector ageVector = (IntVector) root.getVector("age"); for (int i = 0; i < rowCount; i++) { diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java index d9aade02b2..b9d7794506 100644 --- a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java +++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java @@ -21,14 +21,10 @@ import java.nio.file.Path; import java.time.ZoneId; import java.util.Map; -import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -51,8 +47,8 @@ static BufferAllocator newChildAllocator(String name, long initialReservation, } @Override - public Identifier[] listTables(String[] strings) throws NoSuchNamespaceException { - return new Identifier[0]; + public Identifier[] listTables(String[] strings) { + throw new UnsupportedOperationException("Lance spark listTables"); } @Override @@ -62,7 +58,8 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException { "load table reader for Lance", 0, Long.MAX_VALUE); Dataset dataset = Dataset.open(datasetUri, allocator)) { Schema schema = dataset.getSchema(); - return new SparkTable(datasetUri, schema, identifier.name(), ArrowUtils.fromArrowSchema( + // TODO(lu) Support type e.g. FixedSizeListArray + return new SparkTable(datasetUri, identifier.name(), ArrowUtils.fromArrowSchema( schema)); } catch (RuntimeException | IOException e) { throw new NoSuchTableException(identifier); @@ -79,24 +76,23 @@ public Table createTable(Identifier identifier, StructType structType, "create table loader for Lance", 0, Long.MAX_VALUE)) { Dataset.create(allocator, datasetUri, arrowSchema, new WriteParams.Builder().build()).close(); - return new SparkTable(datasetUri, arrowSchema, identifier.name(), structType); + return new SparkTable(datasetUri, identifier.name(), structType); } } @Override - public Table alterTable(Identifier identifier, TableChange... tableChanges) - throws NoSuchTableException { - return null; + public Table alterTable(Identifier identifier, TableChange... 
tableChanges) { + throw new UnsupportedOperationException("Lance spark alterTable"); } @Override public boolean dropTable(Identifier identifier) { - return false; + throw new UnsupportedOperationException("Lance spark dropTable"); } @Override - public void renameTable(Identifier identifier, Identifier identifier1) - throws NoSuchTableException, TableAlreadyExistsException { + public void renameTable(Identifier identifier, Identifier identifier1) { + throw new UnsupportedOperationException("Lance spark renameTable"); } @Override diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java deleted file mode 100644 index 6fa84795c2..0000000000 --- a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.lancedb.lance.spark; - -import java.util.ArrayList; -import java.util.List; -import org.apache.arrow.vector.types.FloatingPointPrecision; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.FieldType; -import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.FloatType; -import org.apache.spark.sql.types.IntegerType; -import org.apache.spark.sql.types.LongType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -/** - * Spark schema utils. - */ -public class SparkSchemaUtils { - /** - * Convert Arrow Schema to Spark struct type. - * - * @param arrowSchema arrow schema - * @return Spark struct type - */ - public static StructType convert(Schema arrowSchema) { - List sparkFields = new ArrayList<>(); - for (Field field : arrowSchema.getFields()) { - StructField sparkField = new StructField(field.getName(), - convert(field.getFieldType()), field.isNullable(), Metadata.empty()); - sparkFields.add(sparkField); - } - return new StructType(sparkFields.toArray(new StructField[0])); - } - - /** - * Convert Spark struct type to Arrow schema. 
- * - * @param structType spark struct type - * @return Arrow schema - */ - public static Schema convert(StructType structType) { - List arrowFields = new ArrayList<>(); - for (StructField field : structType.fields()) { - arrowFields.add(new Field(field.name(), - new FieldType(field.nullable(), convert(field.dataType()), null, null), - null)); - } - return new Schema(arrowFields); - } - - private static ArrowType convert(DataType dataType) { - if (dataType instanceof IntegerType) { - return new ArrowType.Int(32, true); - } else if (dataType instanceof LongType) { - return new ArrowType.Int(64, true); - } else if (dataType instanceof StringType) { - return new ArrowType.Utf8(); - } else if (dataType instanceof DoubleType) { - return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); - } else if (dataType instanceof FloatType) { - return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); - } else { - throw new UnsupportedOperationException("Unsupported Spark type: " + dataType); - } - } - - private static DataType convert(org.apache.arrow.vector.types.pojo.FieldType fieldType) { - ArrowType arrowType = fieldType.getType(); - if (arrowType instanceof ArrowType.Int) { - ArrowType.Int intType = (ArrowType.Int) arrowType; - if (intType.getBitWidth() == 32) { - return DataTypes.IntegerType; - } else if (intType.getBitWidth() == 64) { - return DataTypes.LongType; - } - } else if (arrowType instanceof ArrowType.Utf8) { - return DataTypes.StringType; - } else if (arrowType instanceof ArrowType.FloatingPoint) { - ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType; - if (fpType.getPrecision() == FloatingPointPrecision.SINGLE) { - return DataTypes.FloatType; - } else if (fpType.getPrecision() == FloatingPointPrecision.DOUBLE) { - return DataTypes.DoubleType; - } - } - throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType); - } - - private SparkSchemaUtils() {} -} diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java index ff179cd326..15de26e515 100644 --- a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java +++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableSet; import java.util.Set; -import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.connector.catalog.SupportsRead; import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.catalog.TableCapability; @@ -36,7 +35,6 @@ public class SparkTable implements SupportsRead, SupportsWrite { // Lance parameters private final String datasetUri; - private final Schema arrowSchema; // Spark parameters private final String tableName; private final StructType sparkSchema; @@ -45,26 +43,24 @@ public class SparkTable implements SupportsRead, SupportsWrite { * Creates a spark table. 
* * @param datasetUri the lance dataset uri - * @param arrowSchema arrow schema * @param tableName table name * @param sparkSchema spark struct type */ - public SparkTable(String datasetUri, Schema arrowSchema, + public SparkTable(String datasetUri, String tableName, StructType sparkSchema) { this.datasetUri = datasetUri; - this.arrowSchema = arrowSchema; this.tableName = tableName; this.sparkSchema = sparkSchema; } @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveStringMap) { - return null; + throw new UnsupportedOperationException("Lance Spark scan"); } @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return new SparkWriteBuilder(datasetUri, arrowSchema, info); + return new SparkWriteBuilder(datasetUri, sparkSchema, info); } @Override diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java index 58b1de847f..0b306ea312 100644 --- a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java +++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.stream.Collector; import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -38,18 +37,20 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; import org.apache.spark.sql.execution.arrow.ArrowWriter; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.ArrowUtils; /** * Spark write. */ public class SparkWrite implements Write { private final String datasetUri; - private final Schema arrowSchema; + private final StructType sparkSchema; private final LogicalWriteInfo info; - SparkWrite(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) { + SparkWrite(String datasetUri, StructType sparkSchema, LogicalWriteInfo info) { this.datasetUri = datasetUri; - this.arrowSchema = arrowSchema; + this.sparkSchema = sparkSchema; this.info = info; } @@ -64,7 +65,7 @@ public StreamingWrite toStreaming() { } private WriterFactory createWriterFactory() { - return new WriterFactory(datasetUri, arrowSchema); + return new WriterFactory(datasetUri, sparkSchema); } private class BatchAppend extends BaseBatchWrite { @@ -106,17 +107,17 @@ public String toString() { private static class WriterFactory implements DataWriterFactory { private final String datasetUri; - private final Schema arrowSchema; + private final StructType sparkSchema; - protected WriterFactory(String datasetUri, Schema arrowSchema) { - // Execute at Spark executor + protected WriterFactory(String datasetUri, StructType sparkSchema) { + // Everything passed to writer factory should be serializable this.datasetUri = datasetUri; - this.arrowSchema = arrowSchema; + this.sparkSchema = sparkSchema; } @Override public DataWriter createWriter(int partitionId, long taskId) { - return new UnpartitionedDataWriter(datasetUri, arrowSchema); + return new UnpartitionedDataWriter(datasetUri, sparkSchema); } } @@ -126,10 +127,11 @@ private static class UnpartitionedDataWriter implements DataWriter private final VectorSchemaRoot root; private final ArrowWriter writer; - private UnpartitionedDataWriter(String datasetUri, Schema arrowSchema) { - // TODO(lu) maxRecordPerBatch, turn to batch write + private 
UnpartitionedDataWriter(String datasetUri, StructType sparkSchema) { + // TODO(lu) add Lance Spark configuration of maxRowsPerFragment? this.datasetUri = datasetUri; this.allocator = new RootAllocator(Long.MAX_VALUE); + Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, datasetUri, true, false); root = VectorSchemaRoot.create(arrowSchema, allocator); writer = ArrowWriter.create(root); } diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java index 7c619efdc7..619f3483b4 100644 --- a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java +++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java @@ -14,27 +14,27 @@ package com.lancedb.lance.spark.source; -import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.connector.write.Write; import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; /** * Spark write builder. */ public class SparkWriteBuilder implements WriteBuilder { private final String datasetUri; - private final Schema arrowSchema; + private final StructType sparkSchema; private final LogicalWriteInfo info; - SparkWriteBuilder(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) { + SparkWriteBuilder(String datasetUri, StructType sparkSchema, LogicalWriteInfo info) { this.datasetUri = datasetUri; - this.arrowSchema = arrowSchema; + this.sparkSchema = sparkSchema; this.info = info; } @Override public Write build() { - return new SparkWrite(datasetUri, arrowSchema, info); + return new SparkWrite(datasetUri, sparkSchema, info); } } diff --git a/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java index df4880b247..6c6272098e 100644 --- a/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java +++ b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java @@ -14,6 +14,8 @@ package com.lancedb.lance.spark; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -28,22 +30,28 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.TimeoutException; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class SparkCatalogTest { @TempDir - static Path tempDir; + static Path warehouse; private static SparkSession spark; + private static String catalog = "my_catalog"; + private static String table_prefix = catalog + ".default."; @BeforeAll static void setup() { spark = SparkSession.builder() .appName("SparkCatalogTest") .master("local") - .config("spark.sql.catalog.dev", "com.lancedb.lance.spark.SparkCatalog") - .config("spark.sql.catalog.dev.warehouse", tempDir.toString()) + .config("spark.sql.catalog." + catalog, "com.lancedb.lance.spark.SparkCatalog") + .config("spark.sql.catalog." 
+ catalog + ".warehouse", warehouse.toString()) .getOrCreate(); } @@ -55,48 +63,79 @@ static void tearDown() { } @Test - public void testCreate() { - String tableName = "dev.db.lance_create_table"; - // Workflow: loadTable with NoSuchTable -> createTable -> loadTable - createTable(tableName); - // Workflow: [Gap] dropTable - // spark.sql("DROP TABLE " + tableName); + public void testCreate() throws IOException { + String tableName = "lance_create_table"; + createTable(table_prefix + tableName); + String datasetPath = warehouse.resolve(tableName).toString(); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + com.lancedb.lance.Dataset dataset = com.lancedb.lance.Dataset.open(datasetPath, allocator)) { + assertEquals(1, dataset.version()); + assertEquals(0, dataset.countRows()); + } } @Test - public void testInsert() { - String tableName = "dev.db.lance_insert_table"; - createTable(tableName); - // Workflow: loadTable - // -> [Gap] SparkTable.newWriteBuilder -> BatchWrite -> lance append - spark.sql("INSERT INTO " + tableName + public void testInsert() throws IOException { + String tableName = "lance_insert_table"; + createTable(table_prefix + tableName); + spark.sql("INSERT INTO " + table_prefix + tableName + " VALUES ('100', '2015-01-01', '2015-01-01T13:51:39.340396Z'), ('101', '2015-01-01', '2015-01-01T12:14:58.597216Z')"); - // Workflow: loadTable - // -> [Gap] SparkScanBuilder.pushAggregation().build() - // -> [Gap] LocalScan.readSchema() -> [Gap] LocalScan.rows[] - spark.sql("SELECT * FROM " + tableName).show(); - spark.sql("SELECT COUNT(*) FROM " + tableName).show(); + String datasetPath = warehouse.resolve(tableName).toString(); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + com.lancedb.lance.Dataset dataset = com.lancedb.lance.Dataset.open(datasetPath, allocator)) { + assertEquals(2, dataset.version()); + assertEquals(2, dataset.countRows()); + } } @Test - @Disabled - public void testBatchWriteTable() throws TableAlreadyExistsException { + public void testBatchWriteTable() throws TableAlreadyExistsException, IOException { Dataset data = createSparkDataFrame(); - String tableName = "dev.db.lance_df_table"; - // Same as create + insert - data.writeTo(tableName).using("lance").create(); - spark.table(tableName).show(); - // Add check manifest created + String tableName = "lance_bath_write_table"; + data.writeTo(table_prefix + tableName).using("lance").create(); + String datasetPath = warehouse.resolve(tableName).toString(); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + com.lancedb.lance.Dataset dataset = com.lancedb.lance.Dataset.open(datasetPath, allocator)) { + assertEquals(2, dataset.version()); + assertEquals(4, dataset.countRows()); + } } @Test - @Disabled - public void testBatchAppendTable() throws NoSuchTableException { + public void testBatchAppendTable() throws NoSuchTableException, IOException { Dataset data = createSparkDataFrame(); - String tableName = "dev.db.lance_df_append_table"; + String tableName = "lance_batch_append_table"; + String fullName = table_prefix + "lance_batch_append_table"; + createTable(fullName); + data.writeTo(fullName).append(); + String datasetPath = warehouse.resolve(tableName).toString(); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + com.lancedb.lance.Dataset dataset = com.lancedb.lance.Dataset.open(datasetPath, allocator)) { + assertEquals(2, dataset.version()); + assertEquals(4, dataset.countRows()); + } + } + + @Test + @Disabled + public void 
testDrop() { + String tableName = table_prefix + "lance_drop_table"; createTable(tableName); - // Same as insert - data.writeTo(tableName).append(); + spark.sql("DROP TABLE " + tableName); + } + + @Test + @Disabled + public void testScan() { + String tableName = table_prefix + "lance_scan_table"; + createTable(tableName); + spark.sql("INSERT INTO " + tableName + + " VALUES ('100', '2015-01-01', '2015-01-01T13:51:39.340396Z'), ('101', '2015-01-01', '2015-01-01T12:14:58.597216Z')"); + // Workflow: loadTable + // -> [Gap] SparkScanBuilder.pushAggregation().build() + // -> [Gap] LocalScan.readSchema() -> [Gap] LocalScan.rows[] + spark.sql("SELECT * FROM " + tableName).show(); + spark.sql("SELECT COUNT(*) FROM " + tableName).show(); spark.table(tableName).show(); } @@ -104,7 +143,7 @@ public void testBatchAppendTable() throws NoSuchTableException { @Disabled public void testStreamingWriteTable() throws NoSuchTableException, TimeoutException { Dataset data = createSparkDataFrame(); - String tableName = "dev.db.lance_streaming_table"; + String tableName = table_prefix + "lance_streaming_table"; data.writeStream().format("lance").outputMode("append").toTable(tableName); spark.table(tableName).show(); } @@ -113,7 +152,7 @@ public void testStreamingWriteTable() throws NoSuchTableException, TimeoutExcept @Disabled public void testStreamingAppendTable() throws NoSuchTableException, TimeoutException { Dataset data = createSparkDataFrame(); - String tableName = "dev.db.lance_streaming_append_table"; + String tableName = table_prefix + "lance_streaming_append_table"; createTable(tableName); data.writeStream().format("lance").outputMode("append").toTable(tableName); spark.table(tableName).show();
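Usage note (not part of the diff): the tests above drive the new write path end to end -- SparkCatalog.createTable, then SparkTable.newWriteBuilder -> SparkWriteBuilder -> SparkWrite -> UnpartitionedDataWriter. A minimal standalone sketch of the same flow follows. The catalog name and the writeTo()/append() calls mirror SparkCatalogTest; the warehouse path and the three STRING column names are placeholders, since the test's createTable() and createSparkDataFrame() helpers fall outside this diff.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LanceSparkWriteExample {
  public static void main(String[] args) throws Exception {
    // Register the Lance catalog the same way SparkCatalogTest does.
    // "/tmp/lance-warehouse" is a placeholder warehouse directory.
    SparkSession spark = SparkSession.builder()
        .appName("LanceSparkWriteExample")
        .master("local")
        .config("spark.sql.catalog.my_catalog", "com.lancedb.lance.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.warehouse", "/tmp/lance-warehouse")
        .getOrCreate();

    String tableName = "my_catalog.default.lance_example_table";

    // CREATE TABLE routes to SparkCatalog.createTable and writes an empty Lance dataset (version 1).
    // Column names are assumptions; the test's INSERT supplies three string values per row.
    spark.sql("CREATE TABLE " + tableName
        + " (id STRING, creation_date STRING, last_update_time STRING)");

    // INSERT and DataFrame appends both go through SparkWrite/UnpartitionedDataWriter.
    spark.sql("INSERT INTO " + tableName
        + " VALUES ('100', '2015-01-01', '2015-01-01T13:51:39.340396Z')");
    Dataset<Row> more = spark.sql("SELECT '101' AS id, '2015-01-01' AS creation_date, "
        + "'2015-01-01T12:14:58.597216Z' AS last_update_time");
    more.writeTo(tableName).append();

    // Reads are not wired up yet (newScanBuilder throws), so inspect the result with the
    // Lance core API, exactly as the tests do.
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         com.lancedb.lance.Dataset dataset = com.lancedb.lance.Dataset.open(
             "/tmp/lance-warehouse/lance_example_table", allocator)) {
      System.out.println("version=" + dataset.version() + ", rows=" + dataset.countRows());
    }

    spark.stop();
  }
}

Each committed operation (CREATE, INSERT, append) should bump dataset.version() by one, which is what the assertEquals checks in the tests above rely on.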
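Schema note (not part of the diff): SparkSchemaUtils is deleted because Spark already bundles an Arrow/Spark converter, org.apache.spark.sql.util.ArrowUtils, which the patch now uses in SparkCatalog.loadTable (fromArrowSchema) and UnpartitionedDataWriter (toArrowSchema); the FixedSizeListArray TODO above still applies. A small round-trip sketch is below, assuming the Spark 3.5 signature where toArrowSchema takes the session time zone plus the duplicate-field-name and large-var-types flags -- worth re-checking against the Spark version on the classpath; the field names are illustrative.

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.ArrowUtils;

public class SchemaRoundTripExample {
  public static void main(String[] args) {
    // A Spark schema using the primitive types both converters handle.
    StructType sparkSchema = new StructType(new StructField[]{
        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("name", DataTypes.StringType, true, Metadata.empty())
    });

    // Spark -> Arrow, as in UnpartitionedDataWriter ("UTC" stands in for the time-zone argument).
    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, "UTC", true, false);

    // Arrow -> Spark, as in SparkCatalog.loadTable.
    StructType roundTripped = ArrowUtils.fromArrowSchema(arrowSchema);

    System.out.println(arrowSchema);
    System.out.println(roundTripped.treeString());
  }
}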