From da7ac2c4d554768fcd6ed25915f6f5041ecdbc2e Mon Sep 17 00:00:00 2001
From: Lu Qiu
Date: Tue, 9 Apr 2024 20:43:45 -0700
Subject: [PATCH] Add spark create, load, part of batch write

---
 java/pom.xml                                  |   1 +
 java/spark/pom.xml                            |  22 +++
 java/spark/v3.5/pom.xml                       |  36 ++++
 .../com/lancedb/lance/spark/SparkCatalog.java | 111 ++++++++++++
 .../lancedb/lance/spark/SparkSchemaUtils.java | 110 ++++++++++++
 .../lance/spark/source/SparkTable.java        |  84 +++++++++
 .../lance/spark/source/SparkWrite.java        | 164 ++++++++++++++++++
 .../lance/spark/source/SparkWriteBuilder.java |  40 +++++
 .../lancedb/lance/spark/SparkCatalogTest.java | 144 +++++++++++++++
 9 files changed, 712 insertions(+)
 create mode 100644 java/spark/pom.xml
 create mode 100644 java/spark/v3.5/pom.xml
 create mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
 create mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
 create mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
 create mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java
 create mode 100644 java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java
 create mode 100644 java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java

diff --git a/java/pom.xml b/java/pom.xml
index 2493c1eba2..ffd665ec55 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -20,6 +20,7 @@
 
   <modules>
     <module>core</module>
+    <module>spark</module>
   </modules>
 
diff --git a/java/spark/pom.xml b/java/spark/pom.xml
new file mode 100644
index 0000000000..a403914923
--- /dev/null
+++ b/java/spark/pom.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.lancedb</groupId>
+    <artifactId>lance-parent</artifactId>
+    <version>0.1-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>lance-spark</artifactId>
+  <name>Lance Spark Connector</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>v3.5</module>
+  </modules>
+</project>

diff --git a/java/spark/v3.5/pom.xml b/java/spark/v3.5/pom.xml
new file mode 100644
index 0000000000..79ad5e12f3
--- /dev/null
+++ b/java/spark/v3.5/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.lancedb</groupId>
+    <artifactId>lance-spark</artifactId>
+    <version>0.1-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>lance-spark-3.5_2.12</artifactId>
+  <name>Lance Connector with Spark v3.5 Scala 2.12</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <!-- Scala 2.12 build, to match the lance-spark-3.5_2.12 artifact -->
+      <artifactId>spark-sql_2.12</artifactId>
+      <version>3.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.lancedb</groupId>
+      <artifactId>lance-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
new file mode 100644
index 0000000000..d9aade02b2
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.spark.source.SparkTable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.Map;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Lance Spark Catalog.
+ */
+public class SparkCatalog implements TableCatalog {
+  private static final BufferAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+  private Path warehouse = null;
+
+  static BufferAllocator newChildAllocator(String name, long initialReservation,
+      long maxAllocation) {
+    return rootAllocator.newChildAllocator(name, initialReservation, maxAllocation);
+  }
+
+  @Override
+  public Identifier[] listTables(String[] strings) throws NoSuchNamespaceException {
+    return new Identifier[0];
+  }
+
+  @Override
+  public Table loadTable(Identifier identifier) throws NoSuchTableException {
+    String datasetUri = warehouse.resolve(identifier.name()).toString();
+    try (BufferAllocator allocator = newChildAllocator(
+        "load table reader for Lance", 0, Long.MAX_VALUE);
+        Dataset dataset = Dataset.open(datasetUri, allocator)) {
+      Schema schema = dataset.getSchema();
+      return new SparkTable(datasetUri, schema, identifier.name(), ArrowUtils.fromArrowSchema(
+          schema));
+    } catch (RuntimeException | IOException e) {
+      throw new NoSuchTableException(identifier);
+    }
+  }
+
+  @Override
+  public Table createTable(Identifier identifier, StructType structType,
+      Transform[] transforms, Map<String, String> map) {
+    String datasetUri = warehouse.resolve(identifier.name()).toString();
+    Schema arrowSchema = ArrowUtils.toArrowSchema(
+        structType, ZoneId.systemDefault().getId(), true, false);
+    try (BufferAllocator allocator = newChildAllocator(
+        "create table loader for Lance", 0, Long.MAX_VALUE)) {
+      Dataset.create(allocator, datasetUri, arrowSchema,
+          new WriteParams.Builder().build()).close();
+      return new SparkTable(datasetUri, arrowSchema, identifier.name(), structType);
+    }
+  }
+
+  @Override
+  public Table alterTable(Identifier identifier, TableChange... tableChanges)
+      throws NoSuchTableException {
+    return null;
+  }
+
+  @Override
+  public boolean dropTable(Identifier identifier) {
+    return false;
+  }
+
+  @Override
+  public void renameTable(Identifier identifier, Identifier identifier1)
+      throws NoSuchTableException, TableAlreadyExistsException {
+  }
+
+  @Override
+  public void initialize(String s, CaseInsensitiveStringMap caseInsensitiveStringMap) {
+    this.warehouse = Path.of(caseInsensitiveStringMap.get("warehouse"));
+  }
+
+  @Override
+  public String name() {
+    return "lance";
+  }
+}

diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
new file mode 100644
index 0000000000..6fa84795c2
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Spark schema utils.
+ */
+public class SparkSchemaUtils {
+  /**
+   * Convert Arrow Schema to Spark struct type.
+   *
+   * @param arrowSchema arrow schema
+   * @return Spark struct type
+   */
+  public static StructType convert(Schema arrowSchema) {
+    List<StructField> sparkFields = new ArrayList<>();
+    for (Field field : arrowSchema.getFields()) {
+      StructField sparkField = new StructField(field.getName(),
+          convert(field.getFieldType()), field.isNullable(), Metadata.empty());
+      sparkFields.add(sparkField);
+    }
+    return new StructType(sparkFields.toArray(new StructField[0]));
+  }
+
+  /**
+   * Convert Spark struct type to Arrow schema.
+   *
+   * @param structType spark struct type
+   * @return Arrow schema
+   */
+  public static Schema convert(StructType structType) {
+    List<Field> arrowFields = new ArrayList<>();
+    for (StructField field : structType.fields()) {
+      arrowFields.add(new Field(field.name(),
+          new FieldType(field.nullable(), convert(field.dataType()), null, null),
+          null));
+    }
+    return new Schema(arrowFields);
+  }
+
+  private static ArrowType convert(DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return new ArrowType.Int(32, true);
+    } else if (dataType instanceof LongType) {
+      return new ArrowType.Int(64, true);
+    } else if (dataType instanceof StringType) {
+      return new ArrowType.Utf8();
+    } else if (dataType instanceof DoubleType) {
+      return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+    } else if (dataType instanceof FloatType) {
+      return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark type: " + dataType);
+    }
+  }
+
+  private static DataType convert(org.apache.arrow.vector.types.pojo.FieldType fieldType) {
+    ArrowType arrowType = fieldType.getType();
+    if (arrowType instanceof ArrowType.Int) {
+      ArrowType.Int intType = (ArrowType.Int) arrowType;
+      if (intType.getBitWidth() == 32) {
+        return DataTypes.IntegerType;
+      } else if (intType.getBitWidth() == 64) {
+        return DataTypes.LongType;
+      }
+    } else if (arrowType instanceof ArrowType.Utf8) {
+      return DataTypes.StringType;
+    } else if (arrowType instanceof ArrowType.FloatingPoint) {
+      ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
+      if (fpType.getPrecision() == FloatingPointPrecision.SINGLE) {
+        return DataTypes.FloatType;
+      } else if (fpType.getPrecision() == FloatingPointPrecision.DOUBLE) {
+        return DataTypes.DoubleType;
+      }
+    }
+    throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType);
+  }
+
+  private SparkSchemaUtils() {}
+}

diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
new file mode 100644
index 0000000000..ff179cd326
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Lance Spark Table.
+ */
+public class SparkTable implements SupportsRead, SupportsWrite {
+  private static final Set<TableCapability> CAPABILITIES =
+      ImmutableSet.of(
+          TableCapability.BATCH_WRITE);
+
+  // Lance parameters
+  private final String datasetUri;
+  private final Schema arrowSchema;
+  // Spark parameters
+  private final String tableName;
+  private final StructType sparkSchema;
+
+  /**
+   * Creates a Spark table.
+   *
+   * @param datasetUri the lance dataset uri
+   * @param arrowSchema arrow schema
+   * @param tableName table name
+   * @param sparkSchema spark struct type
+   */
+  public SparkTable(String datasetUri, Schema arrowSchema,
+      String tableName, StructType sparkSchema) {
+    this.datasetUri = datasetUri;
+    this.arrowSchema = arrowSchema;
+    this.tableName = tableName;
+    this.sparkSchema = sparkSchema;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveStringMap) {
+    return null;
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return new SparkWriteBuilder(datasetUri, arrowSchema, info);
+  }
+
+  @Override
+  public String name() {
+    return this.tableName;
+  }
+
+  @Override
+  public StructType schema() {
+    return this.sparkSchema;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return CAPABILITIES;
+  }
+}

diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java
new file mode 100644
index 0000000000..67222b9af7
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.WriteParams;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.execution.arrow.ArrowWriter;
+
+/**
+ * Spark write.
+ */
+public class SparkWrite implements Write {
+  private final String datasetUri;
+  private final Schema arrowSchema;
+  private final LogicalWriteInfo info;
+
+  SparkWrite(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
+    this.datasetUri = datasetUri;
+    this.arrowSchema = arrowSchema;
+    this.info = info;
+  }
+
+  @Override
+  public BatchWrite toBatch() {
+    return new BatchAppend();
+  }
+
+  @Override
+  public StreamingWrite toStreaming() {
+    throw new UnsupportedOperationException();
+  }
+
+  private WriterFactory createWriterFactory() {
+    return new WriterFactory(datasetUri, arrowSchema);
+  }
+
+  private class BatchAppend extends BaseBatchWrite {
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      // TODO(lu) LanceOperation.Append
+      // TODO(lu) commit
+    }
+  }
+
+  private abstract class BaseBatchWrite implements BatchWrite {
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      return createWriterFactory();
+    }
+
+    @Override
+    public boolean useCommitCoordinator() {
+      return false;
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("LanceBatchWrite(datasetUri=%s)", datasetUri);
+    }
+  }
+
+  private static class WriterFactory implements DataWriterFactory {
+    private final String datasetUri;
+    private final Schema arrowSchema;
+
+    protected WriterFactory(String datasetUri, Schema arrowSchema) {
+      // Runs on the Spark executor
+      this.datasetUri = datasetUri;
+      this.arrowSchema = arrowSchema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      return new UnpartitionedDataWriter(datasetUri, arrowSchema);
+    }
+  }
+
+  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+    private final String datasetUri;
+    private final BufferAllocator allocator;
+    private final VectorSchemaRoot root;
+    private final ArrowWriter writer;
+
+    private UnpartitionedDataWriter(String datasetUri, Schema arrowSchema) {
+      // TODO(lu) maxRecordPerBatch, turn to batch write
+      this.datasetUri = datasetUri;
+      this.allocator = new RootAllocator(Long.MAX_VALUE);
+      root = VectorSchemaRoot.create(arrowSchema, allocator);
+      writer = ArrowWriter.create(root);
+    }
+
+    @Override
+    public void write(InternalRow record) {
+      writer.write(record);
+    }
+
+    @Override
+    public WriterCommitMessage commit() {
+      writer.finish();
+      return new TaskCommit(Arrays.asList(
+          Fragment.create(datasetUri, allocator, root,
+              Optional.empty(), new WriteParams.Builder().build())
+              .getFragementId()));
+    }
+
+    @Override
+    public void abort() {
+      close();
+    }
+
+    @Override
+    public void close() {
+      writer.reset();
+      root.close();
+      allocator.close();
+    }
+  }
+
+  /** Task commit. */
+  public static class TaskCommit implements WriterCommitMessage {
+    private final List<Integer> fragments;
+
+    TaskCommit(List<Integer> fragments) {
+      this.fragments = fragments;
+    }
+
+    List<Integer> getFragments() {
+      return fragments;
+    }
+  }
+}

diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java
new file mode 100644
index 0000000000..7c619efdc7
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+
+/**
+ * Spark write builder.
+ */
+public class SparkWriteBuilder implements WriteBuilder {
+  private final String datasetUri;
+  private final Schema arrowSchema;
+  private final LogicalWriteInfo info;
+
+  SparkWriteBuilder(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
+    this.datasetUri = datasetUri;
+    this.arrowSchema = arrowSchema;
+    this.info = info;
+  }
+
+  @Override
+  public Write build() {
+    return new SparkWrite(datasetUri, arrowSchema, info);
+  }
+}

diff --git a/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java
new file mode 100644
index 0000000000..8ddebb3526
--- /dev/null
+++ b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import java.nio.file.Path;
+import java.util.concurrent.TimeoutException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class SparkCatalogTest {
+  @TempDir
+  static Path tempDir;
+
+  private static SparkSession spark;
+
+  @BeforeAll
+  static void setup() {
+    spark = SparkSession.builder()
+        .appName("SparkCatalogTest")
+        .master("local")
+        .config("spark.sql.catalog.dev", "com.lancedb.lance.spark.SparkCatalog")
+        .config("spark.sql.catalog.dev.warehouse", tempDir.toString())
+        .getOrCreate();
+  }
+
+  @AfterAll
+  static void tearDown() {
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+  @Test
+  public void testCreate() {
+    String tableName = "dev.db.lance_create_table";
+    // Workflow: loadTable with NoSuchTable -> createTable -> loadTable
+    createTable(tableName);
+    // Workflow: [Gap] dropTable
+    // spark.sql("DROP TABLE " + tableName);
+  }
+
+  @Test
+  @Disabled
+  public void testInsert() {
+    String tableName = "dev.db.lance_insert_table";
+    createTable(tableName);
+    // Workflow: loadTable
+    // -> [Gap] SparkTable.newWriteBuilder -> BatchWrite -> lance append
+    spark.sql("INSERT INTO " + tableName
+        + " VALUES ('100', '2015-01-01', '2015-01-01T13:51:39.340396Z'),"
+        + " ('101', '2015-01-01', '2015-01-01T12:14:58.597216Z')");
+    // Workflow: loadTable
+    // -> [Gap] SparkScanBuilder.pushAggregation().build()
+    // -> [Gap] LocalScan.readSchema() -> [Gap] LocalScan.rows[]
+    spark.sql("SELECT * FROM " + tableName).show();
+    spark.sql("SELECT COUNT(*) FROM " + tableName).show();
+  }
+
+  @Test
+  @Disabled
+  public void testBatchWriteTable() throws TableAlreadyExistsException {
+    Dataset<Row> data = createSparkDataFrame();
+    String tableName = "dev.db.lance_df_table";
+    // Same as create + insert
+    data.writeTo(tableName).using("lance").create();
+    spark.table(tableName).show();
+    // Add check manifest created
+  }
+
+  @Test
+  @Disabled
+  public void testBatchAppendTable() throws NoSuchTableException {
+    Dataset<Row> data = createSparkDataFrame();
+    String tableName = "dev.db.lance_df_append_table";
+    createTable(tableName);
+    // Same as insert
+    data.writeTo(tableName).append();
+    spark.table(tableName).show();
+  }
+
+  @Test
+  @Disabled
+  public void testStreamingWriteTable() throws NoSuchTableException, TimeoutException {
+    Dataset<Row> data = createSparkDataFrame();
+    String tableName = "dev.db.lance_streaming_table";
+    data.writeStream().format("lance").outputMode("append").toTable(tableName);
+    spark.table(tableName).show();
+  }
+
+  @Test
+  @Disabled
+  public void testStreamingAppendTable() throws NoSuchTableException, TimeoutException {
+    Dataset<Row> data = createSparkDataFrame();
+    String tableName = "dev.db.lance_streaming_append_table";
+    createTable(tableName);
+    data.writeStream().format("lance").outputMode("append").toTable(tableName);
+    spark.table(tableName).show();
+  }
+
+  private Dataset<Row> createSparkDataFrame() {
+    StructType schema = new StructType(new StructField[]{
+        DataTypes.createStructField("id", DataTypes.StringType, false),
+        DataTypes.createStructField("creation_date", DataTypes.StringType, false),
+        DataTypes.createStructField("last_update_time", DataTypes.StringType, false)
+    });
+    return spark.createDataFrame(java.util.Arrays.asList(
+        RowFactory.create("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
+        RowFactory.create("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
+        RowFactory.create("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
+        RowFactory.create("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
+    ), schema);
+  }
+
+  private void createTable(String tableName) {
+    spark.sql("CREATE TABLE IF NOT EXISTS " + tableName
+        + "(id STRING, "
+        + "creation_date STRING, "
+        + "last_update_time STRING) "
+        + "USING lance");
+  }
+}
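
Usage sketch of the workflow this patch enables (create via SparkCatalog.createTable, load via loadTable), mirroring the configuration in SparkCatalogTest above. The class name LanceQuickstart, the catalog name "lance_dev", and the warehouse path are illustrative assumptions, not names from this patch:

    import org.apache.spark.sql.SparkSession;

    public class LanceQuickstart {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("LanceQuickstart")  // hypothetical driver app
            .master("local")
            // Register the Lance catalog; a table "lance_dev.db.t" resolves to the
            // Lance dataset at <warehouse>/t (namespaces are not used yet).
            .config("spark.sql.catalog.lance_dev", "com.lancedb.lance.spark.SparkCatalog")
            .config("spark.sql.catalog.lance_dev.warehouse", "/tmp/lance-warehouse")
            .getOrCreate();

        // createTable: converts the Spark schema to Arrow and writes an empty dataset.
        spark.sql("CREATE TABLE IF NOT EXISTS lance_dev.db.quickstart "
            + "(id STRING, creation_date STRING, last_update_time STRING) USING lance");

        // loadTable: opens the dataset and converts its Arrow schema back to Spark.
        spark.table("lance_dev.db.quickstart").printSchema();

        spark.stop();
      }
    }

INSERT INTO and DataFrame appends run through SparkWrite, where each task writes one Lance fragment; committing those fragments to the dataset is still a TODO in BatchAppend.commit(), which is why the write and scan tests above are @Disabled.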