diff --git a/java/pom.xml b/java/pom.xml
index 2493c1eba2..ffd665ec55 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -20,6 +20,7 @@
core
+ spark
diff --git a/java/spark/pom.xml b/java/spark/pom.xml
new file mode 100644
index 0000000000..a403914923
--- /dev/null
+++ b/java/spark/pom.xml
@@ -0,0 +1,22 @@
+
+
+
+ 4.0.0
+
+
+ com.lancedb
+ lance-parent
+ 0.1-SNAPSHOT
+ ../pom.xml
+
+
+ lance-spark
+ Lance Spark Connector
+ pom
+
+
+ v3.5
+
+
diff --git a/java/spark/v3.5/pom.xml b/java/spark/v3.5/pom.xml
new file mode 100644
index 0000000000..79ad5e12f3
--- /dev/null
+++ b/java/spark/v3.5/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+
+ com.lancedb
+ lance-spark
+ 0.1-SNAPSHOT
+ ../pom.xml
+
+
+ lance-spark-3.5_2.12
+ Lance Connector with Spark v3.5 scala 2.12
+ jar
+
+
+
+ org.apache.spark
+ spark-sql_2.13
+ 3.5.1
+
+
+ com.lancedb
+ lance-core
+ ${project.version}
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+
diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
new file mode 100644
index 0000000000..d9aade02b2
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.spark.source.SparkTable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.Map;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Lance Spark Catalog.
+ */
+public class SparkCatalog implements TableCatalog {
+ private static final BufferAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+ private Path warehouse = null;
+
+ static BufferAllocator newChildAllocator(String name, long initialReservation,
+ long maxAllocation) {
+ return rootAllocator.newChildAllocator(name, initialReservation, maxAllocation);
+ }
+
+ @Override
+ public Identifier[] listTables(String[] strings) throws NoSuchNamespaceException {
+ return new Identifier[0];
+ }
+
+ @Override
+ public Table loadTable(Identifier identifier) throws NoSuchTableException {
+ String datasetUri = warehouse.resolve(identifier.name()).toString();
+ try (BufferAllocator allocator = newChildAllocator(
+ "load table reader for Lance", 0, Long.MAX_VALUE);
+ Dataset dataset = Dataset.open(datasetUri, allocator)) {
+ Schema schema = dataset.getSchema();
+ return new SparkTable(datasetUri, schema, identifier.name(), ArrowUtils.fromArrowSchema(
+ schema));
+ } catch (RuntimeException | IOException e) {
+ throw new NoSuchTableException(identifier);
+ }
+ }
+
+ @Override
+ public Table createTable(Identifier identifier, StructType structType,
+ Transform[] transforms, Map map) {
+ String datasetUri = warehouse.resolve(identifier.name()).toString();
+ Schema arrowSchema = ArrowUtils.toArrowSchema(
+ structType, ZoneId.systemDefault().getId(), true, false);
+ try (BufferAllocator allocator = newChildAllocator(
+ "create table loader for Lance", 0, Long.MAX_VALUE)) {
+ Dataset.create(allocator, datasetUri, arrowSchema,
+ new WriteParams.Builder().build()).close();
+ return new SparkTable(datasetUri, arrowSchema, identifier.name(), structType);
+ }
+ }
+
+ @Override
+ public Table alterTable(Identifier identifier, TableChange... tableChanges)
+ throws NoSuchTableException {
+ return null;
+ }
+
+ @Override
+ public boolean dropTable(Identifier identifier) {
+ return false;
+ }
+
+ @Override
+ public void renameTable(Identifier identifier, Identifier identifier1)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ }
+
+ @Override
+ public void initialize(String s, CaseInsensitiveStringMap caseInsensitiveStringMap) {
+ this.warehouse = Path.of(caseInsensitiveStringMap.get("warehouse"));
+ }
+
+ @Override
+ public String name() {
+ return "lance";
+ }
+}
diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
new file mode 100644
index 0000000000..6fa84795c2
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Spark schema utils.
+ */
+public class SparkSchemaUtils {
+ /**
+ * Convert Arrow Schema to Spark struct type.
+ *
+ * @param arrowSchema arrow schema
+ * @return Spark struct type
+ */
+ public static StructType convert(Schema arrowSchema) {
+ List sparkFields = new ArrayList<>();
+ for (Field field : arrowSchema.getFields()) {
+ StructField sparkField = new StructField(field.getName(),
+ convert(field.getFieldType()), field.isNullable(), Metadata.empty());
+ sparkFields.add(sparkField);
+ }
+ return new StructType(sparkFields.toArray(new StructField[0]));
+ }
+
+ /**
+ * Convert Spark struct type to Arrow schema.
+ *
+ * @param structType spark struct type
+ * @return Arrow schema
+ */
+ public static Schema convert(StructType structType) {
+ List arrowFields = new ArrayList<>();
+ for (StructField field : structType.fields()) {
+ arrowFields.add(new Field(field.name(),
+ new FieldType(field.nullable(), convert(field.dataType()), null, null),
+ null));
+ }
+ return new Schema(arrowFields);
+ }
+
+ private static ArrowType convert(DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return new ArrowType.Int(32, true);
+ } else if (dataType instanceof LongType) {
+ return new ArrowType.Int(64, true);
+ } else if (dataType instanceof StringType) {
+ return new ArrowType.Utf8();
+ } else if (dataType instanceof DoubleType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ } else if (dataType instanceof FloatType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ } else {
+ throw new UnsupportedOperationException("Unsupported Spark type: " + dataType);
+ }
+ }
+
+ private static DataType convert(org.apache.arrow.vector.types.pojo.FieldType fieldType) {
+ ArrowType arrowType = fieldType.getType();
+ if (arrowType instanceof ArrowType.Int) {
+ ArrowType.Int intType = (ArrowType.Int) arrowType;
+ if (intType.getBitWidth() == 32) {
+ return DataTypes.IntegerType;
+ } else if (intType.getBitWidth() == 64) {
+ return DataTypes.LongType;
+ }
+ } else if (arrowType instanceof ArrowType.Utf8) {
+ return DataTypes.StringType;
+ } else if (arrowType instanceof ArrowType.FloatingPoint) {
+ ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
+ if (fpType.getPrecision() == FloatingPointPrecision.SINGLE) {
+ return DataTypes.FloatType;
+ } else if (fpType.getPrecision() == FloatingPointPrecision.DOUBLE) {
+ return DataTypes.DoubleType;
+ }
+ }
+ throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType);
+ }
+
+ private SparkSchemaUtils() {}
+}
diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
new file mode 100644
index 0000000000..ff179cd326
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Lance Spark Table.
+*/
+public class SparkTable implements SupportsRead, SupportsWrite {
+ private static final Set CAPABILITIES =
+ ImmutableSet.of(
+ TableCapability.BATCH_WRITE);
+
+ // Lance parameters
+ private final String datasetUri;
+ private final Schema arrowSchema;
+ // Spark parameters
+ private final String tableName;
+ private final StructType sparkSchema;
+
+ /**
+ * Creates a spark table.
+ *
+ * @param datasetUri the lance dataset uri
+ * @param arrowSchema arrow schema
+ * @param tableName table name
+ * @param sparkSchema spark struct type
+ */
+ public SparkTable(String datasetUri, Schema arrowSchema,
+ String tableName, StructType sparkSchema) {
+ this.datasetUri = datasetUri;
+ this.arrowSchema = arrowSchema;
+ this.tableName = tableName;
+ this.sparkSchema = sparkSchema;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveStringMap) {
+ return null;
+ }
+
+ @Override
+ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+ return new SparkWriteBuilder(datasetUri, arrowSchema, info);
+ }
+
+ @Override
+ public String name() {
+ return this.tableName;
+ }
+
+ @Override
+ public StructType schema() {
+ return this.sparkSchema;
+ }
+
+ @Override
+ public Set capabilities() {
+ return CAPABILITIES;
+ }
+}
diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java
new file mode 100644
index 0000000000..67222b9af7
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWrite.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.WriteParams;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.execution.arrow.ArrowWriter;
+
+/**
+ * Spark write.
+ */
+public class SparkWrite implements Write {
+ private final String datasetUri;
+ private final Schema arrowSchema;
+ private final LogicalWriteInfo info;
+
+ SparkWrite(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
+ this.datasetUri = datasetUri;
+ this.arrowSchema = arrowSchema;
+ this.info = info;
+ }
+
+ @Override
+ public BatchWrite toBatch() {
+ return new BatchAppend();
+ }
+
+ @Override
+ public StreamingWrite toStreaming() {
+ throw new UnsupportedOperationException();
+ }
+
+ private WriterFactory createWriterFactory() {
+ return new WriterFactory(datasetUri, arrowSchema);
+ }
+
+ private class BatchAppend extends BaseBatchWrite {
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ // TODO(lu) LanceOperation.Append
+ // TODO(lu) commit
+ }
+ }
+
+ private abstract class BaseBatchWrite implements BatchWrite {
+ @Override
+ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+ return createWriterFactory();
+ }
+
+ @Override
+ public boolean useCommitCoordinator() {
+ return false;
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("LanceBatchWrite(datasetUri=%s)", datasetUri);
+ }
+ }
+
+ private static class WriterFactory implements DataWriterFactory {
+ private final String datasetUri;
+ private final Schema arrowSchema;
+
+ protected WriterFactory(String datasetUri, Schema arrowSchema) {
+ // Execute at Spark executor
+ this.datasetUri = datasetUri;
+ this.arrowSchema = arrowSchema;
+ }
+
+ @Override
+ public DataWriter createWriter(int partitionId, long taskId) {
+ return new UnpartitionedDataWriter(datasetUri, arrowSchema);
+ }
+ }
+
+ private static class UnpartitionedDataWriter implements DataWriter {
+ private final String datasetUri;
+ private final BufferAllocator allocator;
+ private final VectorSchemaRoot root;
+ private final ArrowWriter writer;
+
+ private UnpartitionedDataWriter(String datasetUri, Schema arrowSchema) {
+ // TODO(lu) maxRecordPerBatch, turn to batch write
+ this.datasetUri = datasetUri;
+ this.allocator = new RootAllocator(Long.MAX_VALUE);
+ root = VectorSchemaRoot.create(arrowSchema, allocator);
+ writer = ArrowWriter.create(root);
+ }
+
+ @Override
+ public void write(InternalRow record) {
+ writer.write(record);
+ }
+
+ @Override
+ public WriterCommitMessage commit() {
+ writer.finish();
+ return new TaskCommit(Arrays.asList(
+ Fragment.create(datasetUri, allocator, root,
+ Optional.empty(), new WriteParams.Builder().build())
+ .getFragementId()));
+ }
+
+ @Override
+ public void abort() {
+ close();
+ }
+
+ @Override
+ public void close() {
+ writer.reset();
+ root.close();
+ allocator.close();
+ }
+ }
+
+ /** Task commit. */
+ public static class TaskCommit implements WriterCommitMessage {
+ private final List fragments;
+
+ TaskCommit(List fragments) {
+ this.fragments = fragments;
+ }
+
+ List getFragments() {
+ return fragments;
+ }
+ }
+}
diff --git a/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java
new file mode 100644
index 0000000000..7c619efdc7
--- /dev/null
+++ b/java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkWriteBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark.source;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+
+/**
+ * Spark write builder.
+ */
+public class SparkWriteBuilder implements WriteBuilder {
+ private final String datasetUri;
+ private final Schema arrowSchema;
+ private final LogicalWriteInfo info;
+
+ SparkWriteBuilder(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
+ this.datasetUri = datasetUri;
+ this.arrowSchema = arrowSchema;
+ this.info = info;
+ }
+
+ @Override
+ public Write build() {
+ return new SparkWrite(datasetUri, arrowSchema, info);
+ }
+}
diff --git a/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java
new file mode 100644
index 0000000000..8ddebb3526
--- /dev/null
+++ b/java/spark/v3.5/src/test/java/com/lancedb/lance/spark/SparkCatalogTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.lancedb.lance.spark;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.nio.file.Path;
+import java.util.concurrent.TimeoutException;
+
+public class SparkCatalogTest {
+ @TempDir
+ static Path tempDir;
+
+ private static SparkSession spark;
+
+ @BeforeAll
+ static void setup() {
+ spark = SparkSession.builder()
+ .appName("SparkCatalogTest")
+ .master("local")
+ .config("spark.sql.catalog.dev", "com.lancedb.lance.spark.SparkCatalog")
+ .config("spark.sql.catalog.dev.warehouse", tempDir.toString())
+ .getOrCreate();
+ }
+
+ @AfterAll
+ static void tearDown() {
+ if (spark != null) {
+ spark.stop();
+ }
+ }
+
+ @Test
+ public void testCreate() {
+ String tableName = "dev.db.lance_create_table";
+ // Workflow: loadTable with NoSuchTable -> createTable -> loadTable
+ createTable(tableName);
+ // Workflow: [Gap] dropTable
+ // spark.sql("DROP TABLE " + tableName);
+ }
+
+ @Test
+ @Disabled
+ public void testInsert() {
+ String tableName = "dev.db.lance_insert_table";
+ createTable(tableName);
+ // Workflow: loadTable
+ // -> [Gap] SparkTable.newWriteBuilder -> BatchWrite -> lance append
+ spark.sql("INSERT INTO " + tableName
+ + " VALUES ('100', '2015-01-01', '2015-01-01T13:51:39.340396Z'), ('101', '2015-01-01', '2015-01-01T12:14:58.597216Z')");
+ // Workflow: loadTable
+ // -> [Gap] SparkScanBuilder.pushAggregation().build()
+ // -> [Gap] LocalScan.readSchema() -> [Gap] LocalScan.rows[]
+ spark.sql("SELECT * FROM " + tableName).show();
+ spark.sql("SELECT COUNT(*) FROM " + tableName).show();
+ }
+
+ @Test
+ @Disabled
+ public void testBatchWriteTable() throws TableAlreadyExistsException {
+ Dataset data = createSparkDataFrame();
+ String tableName = "dev.db.lance_df_table";
+ // Same as create + insert
+ data.writeTo(tableName).using("lance").create();
+ spark.table(tableName).show();
+ // Add check manifest created
+ }
+
+ @Test
+ @Disabled
+ public void testBatchAppendTable() throws NoSuchTableException {
+ Dataset data = createSparkDataFrame();
+ String tableName = "dev.db.lance_df_append_table";
+ createTable(tableName);
+ // Same as insert
+ data.writeTo(tableName).append();
+ spark.table(tableName).show();
+ }
+
+ @Test
+ @Disabled
+ public void testStreamingWriteTable() throws NoSuchTableException, TimeoutException {
+ Dataset data = createSparkDataFrame();
+ String tableName = "dev.db.lance_streaming_table";
+ data.writeStream().format("lance").outputMode("append").toTable(tableName);
+ spark.table(tableName).show();
+ }
+
+ @Test
+ @Disabled
+ public void testStreamingAppendTable() throws NoSuchTableException, TimeoutException {
+ Dataset data = createSparkDataFrame();
+ String tableName = "dev.db.lance_streaming_append_table";
+ createTable(tableName);
+ data.writeStream().format("lance").outputMode("append").toTable(tableName);
+ spark.table(tableName).show();
+ }
+
+ private Dataset createSparkDataFrame() {
+ StructType schema = new StructType(new StructField[]{
+ DataTypes.createStructField("id", DataTypes.StringType, false),
+ DataTypes.createStructField("creation_date", DataTypes.StringType, false),
+ DataTypes.createStructField("last_update_time", DataTypes.StringType, false)
+ });
+ return spark.createDataFrame(java.util.Arrays.asList(
+ RowFactory.create("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
+ RowFactory.create("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
+ RowFactory.create("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
+ RowFactory.create("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
+ ), schema);
+ }
+
+ private void createTable(String tableName) {
+ spark.sql("CREATE TABLE IF NOT EXISTS " + tableName +
+ "(id STRING, " +
+ "creation_date STRING, " +
+ "last_update_time STRING) " +
+ "USING lance");
+ }
+}