support spark3.x for connector (#71)
* add connector for spark3.0

* add connector for spark3 & extract common code

* fix incompatibility for spark

* update maven command

* sleep after creating schema for test

* update scalatest plugin

* update default scala version

* update scala version

* fix options for writer

* update pom

* add spark version validation for spark3

* update action

* update match statement
Nicole00 committed Feb 27, 2023
1 parent 305f980 commit f904dee
Showing 57 changed files with 3,743 additions and 751 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/pull_request.yml
@@ -45,6 +45,9 @@ jobs:
- name: Build with Maven
run: |
mvn -B package
mvn clean package -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2
mvn clean package -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4
mvn clean package -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
- uses: codecov/codecov-action@v2
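For reference, a minimal sbt sketch of the three artifacts this build matrix produces; the group id and version placeholder are assumptions, so check the published coordinates before use:

// build.sbt sketch -- pick the artifact matching your Spark and Scala build (coordinates are illustrative)
libraryDependencies += "com.vesoft" % "nebula-spark-connector_2.2" % "x.y.z" // Spark 2.2, Scala 2.11
libraryDependencies += "com.vesoft" % "nebula-spark-connector"     % "x.y.z" // Spark 2.4, Scala 2.11
libraryDependencies += "com.vesoft" % "nebula-spark-connector_3.0" % "x.y.z" // Spark 3.x, Scala 2.12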
21 changes: 20 additions & 1 deletion .github/workflows/release.yml
@@ -38,10 +38,29 @@ jobs:
popd
popd
- name: Deploy release to Maven
- name: Deploy release for spark2.4 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4

- name: Deploy release for spark2.2 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2

- name: Deploy release for spark3.0 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
21 changes: 20 additions & 1 deletion .github/workflows/snapshot.yml
@@ -40,10 +40,29 @@ jobs:
popd
popd
- name: Deploy SNAPSHOT to Sonatype
- name: Deploy SNAPSHOT for spark2.4 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4

- name: Deploy SNAPSHOT for spark2.2 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2

- name: Deploy SNAPSHOT for spark3.0 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
@@ -40,19 +40,18 @@ object NebulaSparkReaderExample {
}

def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withMetaAddress("192.168.8.171:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withReturnCols(List())
.withLimit(10)
.withPartitionNum(10)
.build()
@@ -63,12 +62,10 @@ object NebulaSparkReaderExample {
}

def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")

val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withMetaAddress("192.168.8.171:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
Expand All @@ -77,7 +74,7 @@ object NebulaSparkReaderExample {
.withSpace("test")
.withLabel("knows")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withReturnCols(List())
.withLimit(10)
.withPartitionNum(10)
.build()
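For context, a minimal read sketch using the configs shown above. The meta address and space/label names are taken from the example; the implicit NebulaDataFrameReader that adds spark.read.nebula(...) is assumed to be available as in the example's surrounding code:

import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.sql.SparkSession

object ReadVertexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress("192.168.8.171:9559") // assumption: reachable meta service
      .withConenctionRetry(2)                // note: the builder method is spelled this way in the connector API
      .build()

    val readVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test")
      .withLabel("person")
      .withNoColumn(false)
      .withReturnCols(List()) // empty list: read all properties of the tag
      .withLimit(10)
      .withPartitionNum(10)
      .build()

    // loadVerticesToDF is provided by the implicit NebulaDataFrameReader on spark.read
    val vertexDf = spark.read.nebula(config, readVertexConfig).loadVerticesToDF()
    vertexDf.show()

    spark.stop()
  }
}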
@@ -94,16 +94,22 @@ object NebulaSparkWriterExample {
* if your withVidAsProp is true, then tag schema also should have property name: id
*/
def writeVertex(spark: SparkSession): Unit = {
LOG.info("start to write nebula vertices")
val df = spark.read.json("example/src/main/resources/vertex")
val df = spark.read.json("vertex")
df.show()

val config = getNebulaConnectionConfig()
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("192.168.8.171:9559")
.withGraphAddress("192.168.8.171:9669")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withWriteMode(WriteMode.DELETE)
.withVidAsProp(false)
.withBatch(1000)
.build()
@@ -117,8 +123,7 @@
* if your withRankAsProperty is true, then edge schema also should have property name: degree
*/
def writeEdge(spark: SparkSession): Unit = {
LOG.info("start to write nebula edges")
val df = spark.read.json("example/src/main/resources/edge")
val df = spark.read.json("edge")
df.show()
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

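For context, a minimal write sketch based on the WriteNebulaVertexConfig shown above. The data path, addresses, and the implicit NebulaDataFrameWriter that adds df.write.nebula(...) are assumptions taken from the example's surrounding code; note the example above switches to WriteMode.DELETE, which removes the listed vertices rather than inserting them:

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaVertexConfig}
import org.apache.spark.sql.SparkSession

object WriteVertexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.read.json("vertex") // assumption: JSON files with an "id" column plus the tag's properties

    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress("192.168.8.171:9559")
      .withGraphAddress("192.168.8.171:9669")
      .withConenctionRetry(2)
      .build()

    val writeVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("test")
      .withTag("person")
      .withVidField("id")
      .withWriteMode(WriteMode.INSERT) // use WriteMode.DELETE, as in the example above, to remove vertices instead
      .withVidAsProp(false)
      .withBatch(1000)
      .build()

    // writeVertices is provided by the implicit NebulaDataFrameWriter on df.write
    df.write.nebula(config, writeVertexConfig).writeVertices()

    spark.stop()
  }
}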
6 changes: 1 addition & 5 deletions nebula-spark-common/pom.xml
@@ -19,10 +19,6 @@
<scalatest.version>3.2.3</scalatest.version>
<junit.version>4.13.1</junit.version>
<codec.version>1.13</codec.version>

<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.4</spark.version>
<scala.version>2.11.12</scala.version>
</properties>

<dependencies>
@@ -57,7 +53,7 @@
<!-- scalatest -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-funsuite_2.11</artifactId>
<artifactId>scalatest-funsuite_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
@@ -15,22 +15,18 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

import scala.collection.mutable.ListBuffer

class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
operaType: OperaType.Value)
extends Serializable
with Logging {
class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) extends Serializable {

import NebulaOptions._

def this(parameters: Map[String, String], operaType: OperaType.Value) =
this(CaseInsensitiveMap(parameters))(operaType)
this(CaseInsensitiveMap(parameters))

def this(hostAndPorts: String,
spaceName: String,
dataType: String,
label: String,
parameters: Map[String, String],
operaType: OperaType.Value) = {
parameters: Map[String, String]) = {
this(
CaseInsensitiveMap(
parameters ++ Map(
@@ -39,8 +35,9 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
NebulaOptions.TYPE -> dataType,
NebulaOptions.LABEL -> label
))
)(operaType)
)
}
val operaType = OperaType.withName(parameters(OPERATE_TYPE))

/**
* Return property with all options
@@ -104,21 +101,24 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
val label: String = parameters(LABEL)

/** read parameters */
var returnCols: String = _
var partitionNums: String = _
var noColumn: Boolean = _
var limit: Int = _
var ngql: String = _
var returnCols: String = _
var partitionNums: String = _
var noColumn: Boolean = _
var limit: Int = _
var pushDownFiltersEnabled: Boolean = _
var ngql: String = _
if (operaType == OperaType.READ) {
returnCols = parameters(RETURN_COLS)
noColumn = parameters.getOrElse(NO_COLUMN, false).toString.toBoolean
partitionNums = parameters(PARTITION_NUMBER)
limit = parameters.getOrElse(LIMIT, DEFAULT_LIMIT).toString.toInt
ngql = parameters.getOrElse(NGQL,EMPTY_STRING)
ngql = parameters.getOrElse(NGQL,EMPTY_STRING)
if(ngql!=EMPTY_STRING){
// TODO explore the pushDownFiltersEnabled parameter to users
pushDownFiltersEnabled = parameters.getOrElse(PUSHDOWN_FILTERS_ENABLE, false).toString.toBoolean
ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
if (ngql != EMPTY_STRING) {
require(parameters.isDefinedAt(GRAPH_ADDRESS),
s"option $GRAPH_ADDRESS is required for ngql and can not be blank")
s"option $GRAPH_ADDRESS is required for ngql and can not be blank")
graphAddress = parameters(GRAPH_ADDRESS)
}
}
@@ -187,13 +187,11 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(

def getMetaAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
metaAddress
.split(",")
.foreach(hostPort => {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
})
for (hostPort <- metaAddress.split(",")) {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
}
hostPorts.toList
}

@@ -211,9 +209,6 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(

}

class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMap[String])
extends NebulaOptions(parameters)(OperaType.WRITE) {}

object NebulaOptions {

/** nebula common config */
@@ -237,14 +232,17 @@ object NebulaOptions {
val CA_SIGN_PARAM: String = "caSignParam"
val SELF_SIGN_PARAM: String = "selfSignParam"

val OPERATE_TYPE: String = "operateType"

/** read config */
val RETURN_COLS: String = "returnCols"
val NO_COLUMN: String = "noColumn"
val PARTITION_NUMBER: String = "partitionNumber"
val LIMIT: String = "limit"
val RETURN_COLS: String = "returnCols"
val NO_COLUMN: String = "noColumn"
val PARTITION_NUMBER: String = "partitionNumber"
val LIMIT: String = "limit"
val PUSHDOWN_FILTERS_ENABLE: String = "pushDownFiltersEnable"

/** read by ngql **/
val NGQL: String = "ngql"
val NGQL: String = "ngql"

/** write config */
val RATE_LIMIT: String = "rateLimit"
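For context, with this refactor the operation type travels inside the options map (key "operateType") instead of being a separate constructor argument. A minimal sketch of constructing read options under that assumption; the values are placeholders, and OperaType is assumed to define values named READ and WRITE:

import com.vesoft.nebula.connector.NebulaOptions

object NebulaOptionsSketch {
  // hypothetical values; the keys mirror the constants in object NebulaOptions
  val readOptions = new NebulaOptions(
    "192.168.8.171:9559", // hostAndPorts
    "test",               // spaceName
    "vertex",             // dataType
    "person",             // label
    Map(
      "operateType"     -> "READ", // replaces the old (parameters)(OperaType.READ) constructor argument
      "returnCols"      -> "",
      "noColumn"        -> "false",
      "partitionNumber" -> "10"
    )
  )
}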
@@ -7,22 +7,27 @@ package com.vesoft.nebula.connector

import com.vesoft.nebula.PropertyType
import com.vesoft.nebula.client.graph.data.{DateTimeWrapper, DurationWrapper, TimeWrapper}
import com.vesoft.nebula.connector.nebula.MetaProvider
import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{
BooleanType,
DataType,
DataTypes,
DoubleType,
FloatType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
TimestampType
}
import org.apache.spark.unsafe.types.UTF8String
import org.slf4j.LoggerFactory

import scala.collection.mutable.ListBuffer

object NebulaUtils {
private val LOG = LoggerFactory.getLogger(this.getClass)

@@ -156,4 +161,70 @@ object NebulaUtils {
s
}

/**
* return the dataset's schema. Schema includes configured cols in returnCols or includes all properties in nebula.
*/
def getSchema(nebulaOptions: NebulaOptions): StructType = {
val returnCols = nebulaOptions.getReturnCols
val noColumn = nebulaOptions.noColumn
val fields: ListBuffer[StructField] = new ListBuffer[StructField]
val metaProvider = new MetaProvider(
nebulaOptions.getMetaAddress,
nebulaOptions.timeout,
nebulaOptions.connectionRetry,
nebulaOptions.executionRetry,
nebulaOptions.enableMetaSSL,
nebulaOptions.sslSignType,
nebulaOptions.caSignParam,
nebulaOptions.selfSignParam
)

import scala.collection.JavaConverters._
var schemaCols: Seq[ColumnDef] = Seq()
val isVertex = DataTypeEnum.VERTEX.toString.equalsIgnoreCase(nebulaOptions.dataType)

// construct vertex or edge default prop
if (isVertex) {
fields.append(DataTypes.createStructField("_vertexId", DataTypes.StringType, false))
} else {
fields.append(DataTypes.createStructField("_srcId", DataTypes.StringType, false))
fields.append(DataTypes.createStructField("_dstId", DataTypes.StringType, false))
fields.append(DataTypes.createStructField("_rank", DataTypes.LongType, false))
}

var dataSchema: StructType = null
// read no column
if (noColumn) {
dataSchema = new StructType(fields.toArray)
return dataSchema
}
// get tag schema or edge schema
val schema = if (isVertex) {
metaProvider.getTag(nebulaOptions.spaceName, nebulaOptions.label)
} else {
metaProvider.getEdge(nebulaOptions.spaceName, nebulaOptions.label)
}

schemaCols = schema.columns.asScala

// read all columns
if (returnCols.isEmpty) {
schemaCols.foreach(columnDef => {
LOG.info(s"prop name ${new String(columnDef.getName)}, type ${columnDef.getType.getType} ")
fields.append(
DataTypes.createStructField(new String(columnDef.getName),
NebulaUtils.convertDataType(columnDef.getType),
true))
})
} else {
for (col: String <- returnCols) {
fields.append(
DataTypes
.createStructField(col, NebulaUtils.getColDataType(schemaCols.toList, col), true))
}
}
dataSchema = new StructType(fields.toArray)
dataSchema
}

}
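For context, a minimal sketch of calling the new getSchema helper; it reuses the hypothetical options from the NebulaOptions sketch above and assumes a reachable meta service, since getSchema opens a MetaProvider:

import com.vesoft.nebula.connector.{NebulaOptions, NebulaUtils}
import org.apache.spark.sql.types.StructType

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical read options for the "person" tag; see the NebulaOptions sketch above
    val options = new NebulaOptions(
      "192.168.8.171:9559",
      "test",
      "vertex",
      "person",
      Map("operateType" -> "READ", "returnCols" -> "", "noColumn" -> "false", "partitionNumber" -> "10")
    )
    // builds _vertexId plus one nullable field per returned property (all properties when returnCols is empty)
    val schema: StructType = NebulaUtils.getSchema(options)
    schema.printTreeString()
  }
}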