support spark3.x for connector #71

Merged · 13 commits · Feb 27, 2023
5 changes: 4 additions & 1 deletion .github/workflows/pull_request.yml
@@ -45,6 +45,9 @@ jobs:

- name: Build with Maven
run: |
mvn -B package
mvn clean package -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2
mvn clean package -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4
mvn clean package -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0


- uses: codecov/codecov-action@v2
21 changes: 20 additions & 1 deletion .github/workflows/release.yml
@@ -38,10 +38,29 @@ jobs:
popd
popd

- name: Deploy release to Maven
- name: Deploy release for spark2.4 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4

- name: Deploy release for spark2.2 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2

- name: Deploy release for spark3.0 to Maven
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
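Note for consumers of these artifacts: each module is published separately, so a job must depend on the variant matching its Spark and Scala versions. A minimal sbt sketch, assuming the group id stays com.vesoft, the artifact ids match the module names above, and a placeholder version:

// build.sbt -- hypothetical coordinates; substitute a version actually published to Maven Central
libraryDependencies += "com.vesoft" % "nebula-spark-connector" % "x.y.z"   // Spark 2.4 / Scala 2.11
// use "nebula-spark-connector_2.2" for Spark 2.2, "nebula-spark-connector_3.0" for Spark 3.x / Scala 2.12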
21 changes: 20 additions & 1 deletion .github/workflows/snapshot.yml
@@ -40,10 +40,29 @@ jobs:
popd
popd

- name: Deploy SNAPSHOT to Sonatype
- name: Deploy SNAPSHOT for spark2.4 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector -am -Pscala-2.11 -Pspark-2.4

- name: Deploy SNAPSHOT for spark2.2 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_2.2 -am -Pscala-2.11 -Pspark-2.2

- name: Deploy SNAPSHOT for spark3.0 to Sonatype
uses: samuelmeuli/action-maven-publish@v1
with:
gpg_private_key: ${{ secrets.JAVA_GPG_PRIVATE_KEY }}
gpg_passphrase: ${{ secrets.JAVA_GPG_PASSPHRASE }}
nexus_username: ${{ secrets.OSSRH_USERNAME }}
nexus_password: ${{ secrets.OSSRH_TOKEN }}
maven_args: -pl nebula-spark-connector_3.0 -am -Pscala-2.12 -Pspark-3.0
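Same note for the SNAPSHOT builds: they are deployed to the Sonatype snapshots repository rather than Maven Central, so a build that wants to try one needs an extra resolver. A one-line sbt sketch, assuming the standard OSSRH snapshots URL:

// build.sbt -- only needed when depending on a -SNAPSHOT version of the connector
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"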
@@ -40,19 +40,18 @@ object NebulaSparkReaderExample {
}

def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withMetaAddress("192.168.8.171:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withReturnCols(List())
.withLimit(10)
.withPartitionNum(10)
.build()
@@ -63,12 +62,10 @@ object NebulaSparkReaderExample {
}

def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")

val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withMetaAddress("192.168.8.171:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
@@ -77,7 +74,7 @@ object NebulaSparkReaderExample {
.withSpace("test")
.withLabel("knows")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withReturnCols(List())
.withLimit(10)
.withPartitionNum(10)
.build()
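One practical effect of the reader-example change: withReturnCols(List()) now asks for every property of the tag or edge (the new getSchema helper further down treats an empty return-column list as "read all columns"). A minimal sketch of the rest of the read, assuming the implicit NebulaDataFrameReader from com.vesoft.nebula.connector.connector and the config / nebulaReadVertexConfig values built above:

import com.vesoft.nebula.connector.connector.NebulaDataFrameReader

// Load the configured tag into a DataFrame; with an empty returnCols list the resulting
// schema is _vertexId plus every property defined on the "person" tag.
val vertices = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertices.printSchema()
vertices.show(20)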
@@ -94,16 +94,22 @@ object NebulaSparkWriterExample {
* if your withVidAsProp is true, then tag schema also should have property name: id
*/
def writeVertex(spark: SparkSession): Unit = {
LOG.info("start to write nebula vertices")
val df = spark.read.json("example/src/main/resources/vertex")
val df = spark.read.json("vertex")
df.show()

val config = getNebulaConnectionConfig()
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("192.168.8.171:9559")
.withGraphAddress("192.168.8.171:9669")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withWriteMode(WriteMode.DELETE)
.withVidAsProp(false)
.withBatch(1000)
.build()
@@ -117,8 +123,7 @@ object NebulaSparkWriterExample {
* if your withRankAsProperty is true, then edge schema also should have property name: degree
*/
def writeEdge(spark: SparkSession): Unit = {
LOG.info("start to write nebula edges")
val df = spark.read.json("example/src/main/resources/edge")
val df = spark.read.json("edge")
df.show()
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

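Worth flagging before running the writer example as-is: the vertex config above now sets withWriteMode(WriteMode.DELETE), so it deletes the vertices listed in the JSON source instead of inserting them. A minimal sketch of the write call, assuming the implicit NebulaDataFrameWriter from com.vesoft.nebula.connector.connector and the df / config / nebulaWriteVertexConfig values built above:

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter

// With WriteMode.DELETE this removes the vertices whose ids come from the "id" column;
// change the mode to WriteMode.INSERT to load data instead.
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()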
6 changes: 1 addition & 5 deletions nebula-spark-common/pom.xml
@@ -19,10 +19,6 @@
<scalatest.version>3.2.3</scalatest.version>
<junit.version>4.13.1</junit.version>
<codec.version>1.13</codec.version>

<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.4</spark.version>
<scala.version>2.11.12</scala.version>
</properties>

<dependencies>
@@ -57,7 +53,7 @@
<!-- scalatest -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-funsuite_2.11</artifactId>
<artifactId>scalatest-funsuite_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
@@ -15,22 +15,18 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

import scala.collection.mutable.ListBuffer

class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
operaType: OperaType.Value)
extends Serializable
with Logging {
class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) extends Serializable {

import NebulaOptions._

def this(parameters: Map[String, String], operaType: OperaType.Value) =
this(CaseInsensitiveMap(parameters))(operaType)
this(CaseInsensitiveMap(parameters))

def this(hostAndPorts: String,
spaceName: String,
dataType: String,
label: String,
parameters: Map[String, String],
operaType: OperaType.Value) = {
parameters: Map[String, String]) = {
this(
CaseInsensitiveMap(
parameters ++ Map(
@@ -39,8 +35,9 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
NebulaOptions.TYPE -> dataType,
NebulaOptions.LABEL -> label
))
)(operaType)
)
}
val operaType = OperaType.withName(parameters(OPERATE_TYPE))

/**
* Return property with all options
@@ -104,21 +101,24 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
val label: String = parameters(LABEL)

/** read parameters */
var returnCols: String = _
var partitionNums: String = _
var noColumn: Boolean = _
var limit: Int = _
var ngql: String = _
var returnCols: String = _
var partitionNums: String = _
var noColumn: Boolean = _
var limit: Int = _
var pushDownFiltersEnabled: Boolean = _
var ngql: String = _
if (operaType == OperaType.READ) {
returnCols = parameters(RETURN_COLS)
noColumn = parameters.getOrElse(NO_COLUMN, false).toString.toBoolean
partitionNums = parameters(PARTITION_NUMBER)
limit = parameters.getOrElse(LIMIT, DEFAULT_LIMIT).toString.toInt
ngql = parameters.getOrElse(NGQL,EMPTY_STRING)
ngql = parameters.getOrElse(NGQL,EMPTY_STRING)
if(ngql!=EMPTY_STRING){
// TODO explore the pushDownFiltersEnabled parameter to users
pushDownFiltersEnabled = parameters.getOrElse(PUSHDOWN_FILTERS_ENABLE, false).toString.toBoolean
ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
if (ngql != EMPTY_STRING) {
require(parameters.isDefinedAt(GRAPH_ADDRESS),
s"option $GRAPH_ADDRESS is required for ngql and can not be blank")
s"option $GRAPH_ADDRESS is required for ngql and can not be blank")
graphAddress = parameters(GRAPH_ADDRESS)
}
}
@@ -187,13 +187,11 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(

def getMetaAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
metaAddress
.split(",")
.foreach(hostPort => {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
})
for (hostPort <- metaAddress.split(",")) {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
}
hostPorts.toList
}

@@ -211,9 +209,6 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(

}

class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMap[String])
extends NebulaOptions(parameters)(OperaType.WRITE) {}

object NebulaOptions {

/** nebula common config */
@@ -237,14 +232,17 @@ object NebulaOptions {
val CA_SIGN_PARAM: String = "caSignParam"
val SELF_SIGN_PARAM: String = "selfSignParam"

val OPERATE_TYPE: String = "operateType"

/** read config */
val RETURN_COLS: String = "returnCols"
val NO_COLUMN: String = "noColumn"
val PARTITION_NUMBER: String = "partitionNumber"
val LIMIT: String = "limit"
val RETURN_COLS: String = "returnCols"
val NO_COLUMN: String = "noColumn"
val PARTITION_NUMBER: String = "partitionNumber"
val LIMIT: String = "limit"
val PUSHDOWN_FILTERS_ENABLE: String = "pushDownFiltersEnable"

/** read by ngql **/
val NGQL: String = "ngql"
val NGQL: String = "ngql"

/** write config */
val RATE_LIMIT: String = "rateLimit"
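The net effect of the NebulaOptions changes: the read/write mode is no longer a second constructor argument (which is why the NebulaOptionsInWrite subclass is removed) but travels inside the options map under the new "operateType" key and is resolved with OperaType.withName. A small self-contained sketch of that pattern, using a stand-in enumeration because only the lookup call appears in the diff; the real enum's value names may differ:

// Stand-in for the connector's OperaType enumeration.
object OperaType extends Enumeration {
  val READ, WRITE = Value
}

// The operation type is carried as a plain string in the options map
// (a CaseInsensitiveMap in the real code) ...
val parameters = Map("operateType" -> "READ")
// ... and resolved back to an enum value, as the new NebulaOptions code does.
val operaType = OperaType.withName(parameters("operateType")) // OperaType.READ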
@@ -7,22 +7,27 @@ package com.vesoft.nebula.connector

import com.vesoft.nebula.PropertyType
import com.vesoft.nebula.client.graph.data.{DateTimeWrapper, DurationWrapper, TimeWrapper}
import com.vesoft.nebula.connector.nebula.MetaProvider
import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{
BooleanType,
DataType,
DataTypes,
DoubleType,
FloatType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
TimestampType
}
import org.apache.spark.unsafe.types.UTF8String
import org.slf4j.LoggerFactory

import scala.collection.mutable.ListBuffer

object NebulaUtils {
private val LOG = LoggerFactory.getLogger(this.getClass)

@@ -156,4 +161,70 @@ object NebulaUtils {
s
}

/**
* return the dataset's schema. Schema includes configured cols in returnCols or includes all properties in nebula.
*/
def getSchema(nebulaOptions: NebulaOptions): StructType = {
val returnCols = nebulaOptions.getReturnCols
val noColumn = nebulaOptions.noColumn
val fields: ListBuffer[StructField] = new ListBuffer[StructField]
val metaProvider = new MetaProvider(
nebulaOptions.getMetaAddress,
nebulaOptions.timeout,
nebulaOptions.connectionRetry,
nebulaOptions.executionRetry,
nebulaOptions.enableMetaSSL,
nebulaOptions.sslSignType,
nebulaOptions.caSignParam,
nebulaOptions.selfSignParam
)

import scala.collection.JavaConverters._
var schemaCols: Seq[ColumnDef] = Seq()
val isVertex = DataTypeEnum.VERTEX.toString.equalsIgnoreCase(nebulaOptions.dataType)

// construct vertex or edge default prop
if (isVertex) {
fields.append(DataTypes.createStructField("_vertexId", DataTypes.StringType, false))
} else {
fields.append(DataTypes.createStructField("_srcId", DataTypes.StringType, false))
fields.append(DataTypes.createStructField("_dstId", DataTypes.StringType, false))
fields.append(DataTypes.createStructField("_rank", DataTypes.LongType, false))
}

var dataSchema: StructType = null
// read no column
if (noColumn) {
dataSchema = new StructType(fields.toArray)
return dataSchema
}
// get tag schema or edge schema
val schema = if (isVertex) {
metaProvider.getTag(nebulaOptions.spaceName, nebulaOptions.label)
} else {
metaProvider.getEdge(nebulaOptions.spaceName, nebulaOptions.label)
}

schemaCols = schema.columns.asScala

// read all columns
if (returnCols.isEmpty) {
schemaCols.foreach(columnDef => {
LOG.info(s"prop name ${new String(columnDef.getName)}, type ${columnDef.getType.getType} ")
fields.append(
DataTypes.createStructField(new String(columnDef.getName),
NebulaUtils.convertDataType(columnDef.getType),
true))
})
} else {
for (col: String <- returnCols) {
fields.append(
DataTypes
.createStructField(col, NebulaUtils.getColDataType(schemaCols.toList, col), true))
}
}
dataSchema = new StructType(fields.toArray)
dataSchema
}

}
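The new getSchema helper centralizes schema construction for reads: it always prepends the meta columns (_vertexId for tags; _srcId, _dstId and _rank for edges), returns only those when noColumn is set, and otherwise appends either every property fetched from the meta service or just the ones listed in returnCols. A small usage sketch, assuming both classes stay in the com.vesoft.nebula.connector package; the surrounding names (planInputSchema, readOptions) are illustrative only:

import org.apache.spark.sql.types.StructType
import com.vesoft.nebula.connector.{NebulaOptions, NebulaUtils}

// Sketch: a read path can ask NebulaUtils for the DataFrame schema up front.
def planInputSchema(readOptions: NebulaOptions): StructType = {
  val schema = NebulaUtils.getSchema(readOptions)
  // For a vertex read with returnCols = List() this is _vertexId plus one nullable
  // field per tag property; for an edge read it starts with _srcId, _dstId, _rank.
  schema
}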