Skip to content

Commit

Permalink
Batch write test case passed
Browse files Browse the repository at this point in the history
  • Loading branch information
LuQQiu committed Apr 24, 2024
1 parent 6d2b28f commit e906846
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 189 deletions.
5 changes: 0 additions & 5 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@
</dependency>
</dependencies>

<profiles>
<profile>
<id>build-jni</id>
<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -85,6 +82,4 @@
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
1 change: 1 addition & 0 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

package com.lancedb.lance;

import java.io.Serializable;
import org.json.JSONObject;

/**
* Metadata of a Fragment in the dataset.
* Matching to lance Fragment.
* */
public class FragmentMetadata {
public class FragmentMetadata implements Serializable {
private static final long serialVersionUID = -5886811251944130460L;
private static final String ID_KEY = "id";
private static final String PHYSICAL_ROWS_KEY = "physical_rows";
private final String jsonMetadata;
Expand Down
1 change: 0 additions & 1 deletion java/core/src/test/java/com/lancedb/lance/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public FragmentMetadata createNewFragment(int fragmentId, int rowCount) {
FragmentMetadata fragmentMeta;
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
root.allocateNew();
// Fill data
VarCharVector nameVector = (VarCharVector) root.getVector("name");
IntVector ageVector = (IntVector) root.getVector("age");
for (int i = 0; i < rowCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.Map;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -51,8 +47,8 @@ static BufferAllocator newChildAllocator(String name, long initialReservation,
}

@Override
public Identifier[] listTables(String[] strings) throws NoSuchNamespaceException {
return new Identifier[0];
public Identifier[] listTables(String[] strings) {
throw new UnsupportedOperationException("Lance spark listTables");
}

@Override
Expand All @@ -62,7 +58,8 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
"load table reader for Lance", 0, Long.MAX_VALUE);
Dataset dataset = Dataset.open(datasetUri, allocator)) {
Schema schema = dataset.getSchema();
return new SparkTable(datasetUri, schema, identifier.name(), ArrowUtils.fromArrowSchema(
// TODO(lu) Support type e.g. FixedSizeListArray
return new SparkTable(datasetUri, identifier.name(), ArrowUtils.fromArrowSchema(
schema));
} catch (RuntimeException | IOException e) {
throw new NoSuchTableException(identifier);
Expand All @@ -79,24 +76,23 @@ public Table createTable(Identifier identifier, StructType structType,
"create table loader for Lance", 0, Long.MAX_VALUE)) {
Dataset.create(allocator, datasetUri, arrowSchema,
new WriteParams.Builder().build()).close();
return new SparkTable(datasetUri, arrowSchema, identifier.name(), structType);
return new SparkTable(datasetUri, identifier.name(), structType);
}
}

@Override
public Table alterTable(Identifier identifier, TableChange... tableChanges)
throws NoSuchTableException {
return null;
public Table alterTable(Identifier identifier, TableChange... tableChanges) {
throw new UnsupportedOperationException("Lance spark alterTable");
}

@Override
public boolean dropTable(Identifier identifier) {
return false;
throw new UnsupportedOperationException("Lance spark dropTable");
}

@Override
public void renameTable(Identifier identifier, Identifier identifier1)
throws NoSuchTableException, TableAlreadyExistsException {
public void renameTable(Identifier identifier, Identifier identifier1) {
throw new UnsupportedOperationException("Lance spark renameTable");
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
Expand All @@ -36,7 +35,6 @@ public class SparkTable implements SupportsRead, SupportsWrite {

// Lance parameters
private final String datasetUri;
private final Schema arrowSchema;
// Spark parameters
private final String tableName;
private final StructType sparkSchema;
Expand All @@ -45,26 +43,24 @@ public class SparkTable implements SupportsRead, SupportsWrite {
* Creates a spark table.
*
* @param datasetUri the lance dataset uri
* @param arrowSchema arrow schema
* @param tableName table name
* @param sparkSchema spark struct type
*/
public SparkTable(String datasetUri, Schema arrowSchema,
public SparkTable(String datasetUri,
String tableName, StructType sparkSchema) {
this.datasetUri = datasetUri;
this.arrowSchema = arrowSchema;
this.tableName = tableName;
this.sparkSchema = sparkSchema;
}

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveStringMap) {
return null;
throw new UnsupportedOperationException("Lance Spark scan");
}

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new SparkWriteBuilder(datasetUri, arrowSchema, info);
return new SparkWriteBuilder(datasetUri, sparkSchema, info);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand All @@ -38,18 +37,20 @@
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.ArrowUtils;

/**
* Spark write.
*/
public class SparkWrite implements Write {
private final String datasetUri;
private final Schema arrowSchema;
private final StructType sparkSchema;
private final LogicalWriteInfo info;

SparkWrite(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
SparkWrite(String datasetUri, StructType sparkSchema, LogicalWriteInfo info) {
this.datasetUri = datasetUri;
this.arrowSchema = arrowSchema;
this.sparkSchema = sparkSchema;
this.info = info;
}

Expand All @@ -64,7 +65,7 @@ public StreamingWrite toStreaming() {
}

private WriterFactory createWriterFactory() {
return new WriterFactory(datasetUri, arrowSchema);
return new WriterFactory(datasetUri, sparkSchema);
}

private class BatchAppend extends BaseBatchWrite {
Expand Down Expand Up @@ -106,17 +107,17 @@ public String toString() {

private static class WriterFactory implements DataWriterFactory {
private final String datasetUri;
private final Schema arrowSchema;
private final StructType sparkSchema;

protected WriterFactory(String datasetUri, Schema arrowSchema) {
// Execute at Spark executor
protected WriterFactory(String datasetUri, StructType sparkSchema) {
// Everything passed to writer factory should be serializable
this.datasetUri = datasetUri;
this.arrowSchema = arrowSchema;
this.sparkSchema = sparkSchema;
}

@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
return new UnpartitionedDataWriter(datasetUri, arrowSchema);
return new UnpartitionedDataWriter(datasetUri, sparkSchema);
}
}

Expand All @@ -126,10 +127,11 @@ private static class UnpartitionedDataWriter implements DataWriter<InternalRow>
private final VectorSchemaRoot root;
private final ArrowWriter writer;

private UnpartitionedDataWriter(String datasetUri, Schema arrowSchema) {
// TODO(lu) maxRecordPerBatch, turn to batch write
private UnpartitionedDataWriter(String datasetUri, StructType sparkSchema) {
// TODO(lu) add Lance Spark configuration of maxRowsPerFragment?
this.datasetUri = datasetUri;
this.allocator = new RootAllocator(Long.MAX_VALUE);
Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, datasetUri, true, false);
root = VectorSchemaRoot.create(arrowSchema, allocator);
writer = ArrowWriter.create(root);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@

package com.lancedb.lance.spark.source;

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;

/**
* Spark write builder.
*/
public class SparkWriteBuilder implements WriteBuilder {
private final String datasetUri;
private final Schema arrowSchema;
private final StructType sparkSchema;
private final LogicalWriteInfo info;

SparkWriteBuilder(String datasetUri, Schema arrowSchema, LogicalWriteInfo info) {
SparkWriteBuilder(String datasetUri, StructType sparkSchema, LogicalWriteInfo info) {
this.datasetUri = datasetUri;
this.arrowSchema = arrowSchema;
this.sparkSchema = sparkSchema;
this.info = info;
}

@Override
public Write build() {
return new SparkWrite(datasetUri, arrowSchema, info);
return new SparkWrite(datasetUri, sparkSchema, info);
}
}

0 comments on commit e906846

Please sign in to comment.