Add Spark create, load, and part of batch write
LuQQiu committed Apr 10, 2024
1 parent 988d1c6 commit da7ac2c
Showing 9 changed files with 712 additions and 0 deletions.
1 change: 1 addition & 0 deletions java/pom.xml
@@ -20,6 +20,7 @@

  <modules>
    <module>core</module>
    <module>spark</module>
  </modules>

  <dependencyManagement>
22 changes: 22 additions & 0 deletions java/spark/pom.xml
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.lancedb</groupId>
    <artifactId>lance-parent</artifactId>
    <version>0.1-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>lance-spark</artifactId>
  <name>Lance Spark Connector</name>
  <packaging>pom</packaging>

  <modules>
    <module>v3.5</module>
  </modules>
</project>
36 changes: 36 additions & 0 deletions java/spark/v3.5/pom.xml
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.lancedb</groupId>
    <artifactId>lance-spark</artifactId>
    <version>0.1-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>lance-spark-3.5_2.12</artifactId>
  <name>Lance Connector with Spark v3.5 Scala 2.12</name>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
      <groupId>com.lancedb</groupId>
      <artifactId>lance-core</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
111 changes: 111 additions & 0 deletions java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkCatalog.java
@@ -0,0 +1,111 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.lancedb.lance.spark;

import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.spark.source.SparkTable;
import java.io.IOException;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.ArrowUtils;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * Lance Spark Catalog.
 */
public class SparkCatalog implements TableCatalog {
  private static final BufferAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
  private Path warehouse = null;

  static BufferAllocator newChildAllocator(String name, long initialReservation,
      long maxAllocation) {
    return rootAllocator.newChildAllocator(name, initialReservation, maxAllocation);
  }

  @Override
  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
    // Listing tables is not yet supported.
    return new Identifier[0];
  }

  @Override
  public Table loadTable(Identifier identifier) throws NoSuchTableException {
    String datasetUri = warehouse.resolve(identifier.name()).toString();
    try (BufferAllocator allocator = newChildAllocator(
        "load table reader for Lance", 0, Long.MAX_VALUE);
        Dataset dataset = Dataset.open(datasetUri, allocator)) {
      Schema schema = dataset.getSchema();
      return new SparkTable(datasetUri, schema, identifier.name(),
          ArrowUtils.fromArrowSchema(schema));
    } catch (RuntimeException | IOException e) {
      throw new NoSuchTableException(identifier);
    }
  }

  @Override
  public Table createTable(Identifier identifier, StructType structType,
      Transform[] transforms, Map<String, String> map) {
    String datasetUri = warehouse.resolve(identifier.name()).toString();
    Schema arrowSchema = ArrowUtils.toArrowSchema(
        structType, ZoneId.systemDefault().getId(), true, false);
    try (BufferAllocator allocator = newChildAllocator(
        "create table loader for Lance", 0, Long.MAX_VALUE)) {
      Dataset.create(allocator, datasetUri, arrowSchema,
          new WriteParams.Builder().build()).close();
      return new SparkTable(datasetUri, arrowSchema, identifier.name(), structType);
    }
  }

  @Override
  public Table alterTable(Identifier identifier, TableChange... tableChanges)
      throws NoSuchTableException {
    // Altering tables is not yet supported.
    return null;
  }

  @Override
  public boolean dropTable(Identifier identifier) {
    // Dropping tables is not yet supported.
    return false;
  }

  @Override
  public void renameTable(Identifier identifier, Identifier newIdentifier)
      throws NoSuchTableException, TableAlreadyExistsException {
    // Renaming tables is not yet supported.
  }

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.warehouse = Path.of(options.get("warehouse"));
  }

  @Override
  public String name() {
    return "lance";
  }
}
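
For orientation, here is a minimal usage sketch (not part of the commit) showing how this catalog can be registered with a Spark session. The catalog name "lance", the warehouse path, and the table name are illustrative; Spark forwards the spark.sql.catalog.<name>.* options (here, "warehouse") into SparkCatalog.initialize.

// Hypothetical usage sketch: register SparkCatalog and create a table through it.
import org.apache.spark.sql.SparkSession;

public class LanceCatalogExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lance-catalog-example")
        .master("local[*]")
        // Register this commit's catalog implementation under the name "lance".
        .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.SparkCatalog")
        // Passed through to SparkCatalog.initialize() as the "warehouse" option.
        .config("spark.sql.catalog.lance.warehouse", "/tmp/lance-warehouse")
        .getOrCreate();

    // Routes to SparkCatalog.createTable(), which converts the Spark schema
    // to Arrow and creates an empty Lance dataset at <warehouse>/my_table.
    spark.sql("CREATE TABLE lance.my_table (id INT, name STRING)");
    spark.stop();
  }
}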
110 changes: 110 additions & 0 deletions java/spark/v3.5/src/main/java/com/lancedb/lance/spark/SparkSchemaUtils.java
@@ -0,0 +1,110 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.lancedb.lance.spark;

import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Spark schema utils.
 */
public class SparkSchemaUtils {
  /**
   * Convert Arrow Schema to Spark struct type.
   *
   * @param arrowSchema arrow schema
   * @return Spark struct type
   */
  public static StructType convert(Schema arrowSchema) {
    List<StructField> sparkFields = new ArrayList<>();
    for (Field field : arrowSchema.getFields()) {
      StructField sparkField = new StructField(field.getName(),
          convert(field.getFieldType()), field.isNullable(), Metadata.empty());
      sparkFields.add(sparkField);
    }
    return new StructType(sparkFields.toArray(new StructField[0]));
  }

  /**
   * Convert Spark struct type to Arrow schema.
   *
   * @param structType spark struct type
   * @return Arrow schema
   */
  public static Schema convert(StructType structType) {
    List<Field> arrowFields = new ArrayList<>();
    for (StructField field : structType.fields()) {
      arrowFields.add(new Field(field.name(),
          new FieldType(field.nullable(), convert(field.dataType()), null, null),
          null));
    }
    return new Schema(arrowFields);
  }

  private static ArrowType convert(DataType dataType) {
    if (dataType instanceof IntegerType) {
      return new ArrowType.Int(32, true);
    } else if (dataType instanceof LongType) {
      return new ArrowType.Int(64, true);
    } else if (dataType instanceof StringType) {
      return new ArrowType.Utf8();
    } else if (dataType instanceof DoubleType) {
      return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
    } else if (dataType instanceof FloatType) {
      return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
    } else {
      throw new UnsupportedOperationException("Unsupported Spark type: " + dataType);
    }
  }

  private static DataType convert(FieldType fieldType) {
    ArrowType arrowType = fieldType.getType();
    if (arrowType instanceof ArrowType.Int) {
      ArrowType.Int intType = (ArrowType.Int) arrowType;
      if (intType.getBitWidth() == 32) {
        return DataTypes.IntegerType;
      } else if (intType.getBitWidth() == 64) {
        return DataTypes.LongType;
      }
    } else if (arrowType instanceof ArrowType.Utf8) {
      return DataTypes.StringType;
    } else if (arrowType instanceof ArrowType.FloatingPoint) {
      ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
      if (fpType.getPrecision() == FloatingPointPrecision.SINGLE) {
        return DataTypes.FloatType;
      } else if (fpType.getPrecision() == FloatingPointPrecision.DOUBLE) {
        return DataTypes.DoubleType;
      }
    }
    throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType);
  }

  private SparkSchemaUtils() {}
}
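
A small round-trip sketch (illustrative, not part of the commit) of the two public convert overloads; the class name and field names are made up for the example:

// Illustrative round trip: Spark StructType -> Arrow Schema -> Spark StructType.
import com.lancedb.lance.spark.SparkSchemaUtils;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaRoundTrip {
  public static void main(String[] args) {
    StructType sparkSchema = new StructType(new StructField[]{
        new StructField("id", DataTypes.LongType, false, Metadata.empty()),
        new StructField("name", DataTypes.StringType, true, Metadata.empty()),
        new StructField("score", DataTypes.DoubleType, true, Metadata.empty())
    });

    Schema arrowSchema = SparkSchemaUtils.convert(sparkSchema); // Spark -> Arrow
    StructType back = SparkSchemaUtils.convert(arrowSchema);    // Arrow -> Spark
    System.out.println(back.equals(sparkSchema));               // expected: true
  }
}

Note that only integer, long, string, float, and double fields survive the round trip; any other type throws UnsupportedOperationException.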
84 changes: 84 additions & 0 deletions java/spark/v3.5/src/main/java/com/lancedb/lance/spark/source/SparkTable.java
@@ -0,0 +1,84 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.lancedb.lance.spark.source;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * Lance Spark Table.
 */
public class SparkTable implements SupportsRead, SupportsWrite {
  private static final Set<TableCapability> CAPABILITIES =
      ImmutableSet.of(TableCapability.BATCH_WRITE);

  // Lance parameters
  private final String datasetUri;
  private final Schema arrowSchema;
  // Spark parameters
  private final String tableName;
  private final StructType sparkSchema;

  /**
   * Creates a spark table.
   *
   * @param datasetUri the lance dataset uri
   * @param arrowSchema arrow schema
   * @param tableName table name
   * @param sparkSchema spark struct type
   */
  public SparkTable(String datasetUri, Schema arrowSchema,
      String tableName, StructType sparkSchema) {
    this.datasetUri = datasetUri;
    this.arrowSchema = arrowSchema;
    this.tableName = tableName;
    this.sparkSchema = sparkSchema;
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    // Reads are not yet supported; only BATCH_WRITE is advertised in capabilities().
    return null;
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new SparkWriteBuilder(datasetUri, arrowSchema, info);
  }

  @Override
  public String name() {
    return this.tableName;
  }

  @Override
  public StructType schema() {
    return this.sparkSchema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return CAPABILITIES;
  }
}
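
SparkWriteBuilder is among the changed files not reproduced on this page. As a hedged sketch of how the batch write path would be driven, assuming the catalog registration from the earlier example and an existing lance.my_table:

// Hypothetical append through Spark's V2 writer; relies on the
// TableCapability.BATCH_WRITE advertised above.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LanceBatchWriteExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.SparkCatalog")
        .config("spark.sql.catalog.lance.warehouse", "/tmp/lance-warehouse")
        .getOrCreate();

    Dataset<Row> df = spark.sql("SELECT 1 AS id, 'a' AS name");
    // Resolves SparkTable via SparkCatalog.loadTable(), then plans the write
    // through SparkTable.newWriteBuilder() -> SparkWriteBuilder.
    df.writeTo("lance.my_table").append();
    spark.stop();
  }
}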
