Support running algos on subgraph #56

Merged
merged 3 commits on Oct 13, 2022
Changes from 2 commits
53 changes: 39 additions & 14 deletions example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala
@@ -12,12 +12,12 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
object ReadData {

  /**
   * read edge data from the LiveJournal dataset on HDFS and apply clustering coefficient
   * livejournal data: https://snap.stanford.edu/data/soc-LiveJournal1.txt.gz
   *
   * The livejournal data is put to hdfs, the path is
   * hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt
   */
def readLiveJournalData(spark: SparkSession): DataFrame = {
val df = spark.sparkContext.textFile(
"hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt")
@@ -30,14 +30,14 @@ object ReadData {

val schema = StructType(
List(StructField("src", StringType, nullable = false),
StructField("dst", StringType, nullable = true)))
StructField("dst", StringType, nullable = true)))
val edgeDF = spark.sqlContext.createDataFrame(dd, schema)
edgeDF
}

  /**
   * read edge data from csv
   */
def readCsvData(spark: SparkSession): DataFrame = {
val df = spark.read
.option("header", true)
@@ -47,9 +47,9 @@
}

  /**
   * read edge data from csv
   * the data has string type id
   */
def readStringCsvData(spark: SparkSession): DataFrame = {
val df = spark.read
.option("header", true)
@@ -59,8 +59,8 @@
}

  /**
   * read edge data from Nebula
   */
def readNebulaData(spark: SparkSession): DataFrame = {
val config =
NebulaConnectionConfig
@@ -80,4 +80,29 @@
val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
df
}

/**
* read edge data from Nebula by NGQL
*/
def readNebulaDataByNgql(spark: SparkSession): DataFrame = {
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("knows")
.withNoColumn(true)
.withLimit(2000)
.withNgql(" GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships ;")
.build()
val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDfByNgql()
df
}

}
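The statement passed to withNgql can presumably be any query that yields edges; the connector maps the YIELD EDGES AS relationships column into the edge DataFrame via loadEdgesToDfByNgql. As an assumption based on nGQL's GET SUBGRAPH syntax, widening the traversal to two steps would look like:

    GET SUBGRAPH WITH PROP 2 STEPS FROM "2" YIELD EDGES AS relationships;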
@@ -24,6 +24,7 @@ object TriangleCountExample {

// val csvDF = ReadData.readCsvData(spark)
// val nebulaDF = ReadData.readNebulaData(spark)
// val nebulaDFbyNgql = ReadData.readNebulaDataByNgql(spark)
val journalDF = ReadData.readLiveJournalData(spark)

graphTriangleCount(spark, journalDF)
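To run the algorithm on the NGQL subgraph instead of the LiveJournal file, the newly added commented line would be swapped in for the journalDF read; a minimal sketch:

    // read the subgraph via NGQL and run triangle count on it
    val nebulaDFbyNgql = ReadData.readNebulaDataByNgql(spark)
    graphTriangleCount(spark, nebulaDFbyNgql)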
28 changes: 15 additions & 13 deletions nebula-algorithm/src/main/resources/application.conf
@@ -33,20 +33,22 @@
     weightCols: ["start_year"]
   }

-  # algo result sink into Nebula. If data.sink is nebula, then this nebula.write config can be valid.
-  write: {
-    # Nebula graphd server address, multiple addresses are split by English comma
-    # Nebula metad server address, multiple addresses are split by English comma
-    metaAddress: "127.0.0.1:9559,127.0.0.1:9560"
-    user: root
-    pswd: nebula
-    # Nebula space name
-    space: nb
-    # Nebula tag name, the algorithm result will be written into this tag
-    tag: pagerank
-    # algorithm result is inserted into a new tag or updated to the original tag. type: insert/update
-    type: insert
+  # Nebula Graph reader by ngql
+  nebula-ngql: {
+    # algo's data source from Nebula by ngql
+    read: {
+      # Nebula metad server address, multiple addresses are split by English comma; graphAddress must be set
+      metaAddress: "127.0.0.1:9559"
+      graphAddress: "127.0.0.1:9669"
+      # Nebula space
+      space: nb
+      # Nebula edge types, multiple labels means that data from multiple edges will union together
+      labels: ["serve"]
+      # Nebula edge property name for each edge type, this property will be used as the weight col for the algorithm.
+      # Make sure the weightCols correspond to labels.
+      weightCols: ["start_year"]
+      # ngql for subgraph, returns edges
+      ngql: "GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships;"
+    }
   }

Review comments on nebula-ngql:

- The options of data.source should be nebula, nebula_ngql, csv, text.
- Is the nebula-ngql tag in an appropriate location? The nebula tag cannot be closed by a brace.
- Why delete the nebula.write? I think the nebula-ngql tag is an option of data.source.
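Taken together, the review comments point at a layout where the NGQL statement lives inside the existing nebula read block and is selected via data.source, leaving nebula.write untouched. A minimal sketch of that layout, assuming the key names already used in this application.conf (the data block itself is not shown in this diff):

    data: {
      # source options: nebula, nebula-ngql, csv, text
      source: nebula-ngql
      sink: nebula
    }
    nebula: {
      read: {
        metaAddress: "127.0.0.1:9559"
        graphAddress: "127.0.0.1:9669"
        space: nb
        labels: ["serve"]
        weightCols: ["start_year"]
        # only consulted when data.source is nebula-ngql
        ngql: "GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships;"
      }
      write: {
        graphAddress: "127.0.0.1:9669"
        metaAddress: "127.0.0.1:9559,127.0.0.1:9560"
        user: root
        pswd: nebula
        space: nb
        tag: pagerank
        type: insert
      }
    }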
@@ -49,14 +49,14 @@ import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * This object is the entry point of all graph algorithms.
 *
 * How to use this tool to run an algorithm:
 * 1. Configure the application.conf file.
 * 2. Make sure Spark is installed in your environment and the Spark service is started.
 * 3. Submit the nebula algorithm application using this command:
 *    spark-submit --class com.vesoft.nebula.tools.algorithm.Main /your-jar-path/nebula-algorithm-1.1.0.jar -p /your-application.conf-path/application.conf
 */
object Main {

private val LOGGER = Logger.getLogger(this.getClass)
@@ -102,12 +102,12 @@ object Main {
}

  /**
   * create data from datasource
   *
   * @param spark
   * @param configs
   * @param partitionNum
   * @return DataFrame
   */
private[this] def createDataSource(spark: SparkSession,
configs: Configs,
partitionNum: String): DataFrame = {
@@ -117,6 +117,10 @@
val reader = new NebulaReader(spark, configs, partitionNum)
reader.read()
}
case "nebula-ngql" => {
val reader = new NebulaReader(spark, configs, partitionNum)
reader.readByNqgl()
}
case "csv" => {
val reader = new CsvReader(spark, configs, partitionNum)
reader.read()
@@ -130,13 +134,13 @@
}

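The nebula-ngql branch above calls NebulaReader.readByNgql(), which is not part of this diff. A sketch of what it presumably does, mirroring the connector calls in ReadData.readNebulaDataByNgql; the config accessors here are assumptions:

    def readByNgql(): DataFrame = {
      // assumed accessor: the parsed read config carries address, space, labels, graphAddress and ngql
      val readConfig = configs.nebulaConfig.readConfigEntry
      val connConfig = NebulaConnectionConfig
        .builder()
        .withMetaAddress(readConfig.address)
        .withGraphAddress(readConfig.graphAddress)
        .withTimeout(6000)
        .withConenctionRetry(2) // note: this spelling is the connector's actual builder method
        .build()
      val readNebulaConfig = ReadNebulaConfig
        .builder()
        .withSpace(readConfig.space)
        .withLabel(readConfig.labels.head)
        .withNoColumn(true)
        .withLimit(2000)
        .withNgql(readConfig.ngql)
        .build()
      spark.read.nebula(connConfig, readNebulaConfig).loadEdgesToDfByNgql()
    }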
  /**
   * execute algorithms
   * @param spark
   * @param algoName
   * @param configs
   * @param dataSet
   * @return DataFrame
   */
private[this] def executeAlgorithm(spark: SparkSession,
algoName: String,
configs: Configs,
@@ -80,34 +80,47 @@ object NebulaConfigEntry {
if (!config.hasPath("nebula")) {
return NebulaConfigEntry(NebulaReadConfigEntry(), NebulaWriteConfigEntry())
}
val nebulaConfig = config.getConfig("nebula")
var nebulaConfig: Config = ConfigFactory.empty()
if(nebulaConfig.hasPath("nebula")){
nebulaConfig = config.getConfig("nebula")
}else if(nebulaConfig.hasPath("nebula-ngql")){
nebulaConfig = config.getConfig("nebula-ngql")
}


val readMetaAddress = nebulaConfig.getString("read.metaAddress")
    val readSpace = nebulaConfig.getString("read.space")
    val readLabels = nebulaConfig.getStringList("read.labels").asScala.toList
val readWeightCols = if (nebulaConfig.hasPath("read.weightCols")) {
nebulaConfig.getStringList("read.weightCols").asScala.toList
} else {
List()
}
    var readConfigEntry: NebulaReadConfigEntry = NebulaReadConfigEntry()
    if (nebulaConfig.hasPath("read.ngql")) {
      val ngql = nebulaConfig.getString("read.ngql")
      val graphAddress = nebulaConfig.getString("read.graphAddress")
      // NebulaReadConfigEntry's last two fields are (graphAddress, ngql), so pass them in that order
      readConfigEntry =
        NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols, graphAddress, ngql)
    } else {
      readConfigEntry = NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols)
    }


    val graphAddress = nebulaConfig.getString("write.graphAddress")
val writeMetaAddress = nebulaConfig.getString("write.metaAddress")
    val user = nebulaConfig.getString("write.user")
    val pswd = nebulaConfig.getString("write.pswd")
    val writeSpace = nebulaConfig.getString("write.space")
    val writeTag = nebulaConfig.getString("write.tag")
    val writeType = nebulaConfig.getString("write.type")
val writeConfigEntry =
      NebulaWriteConfigEntry(graphAddress,
                             writeMetaAddress,
                             user,
                             pswd,
                             writeSpace,
                             writeTag,
                             writeType)
NebulaConfigEntry(readConfigEntry, writeConfigEntry)
}
}
@@ -203,7 +216,9 @@ case class NebulaConfigEntry(readConfigEntry: NebulaReadConfigEntry,
case class NebulaReadConfigEntry(address: String = "",
space: String = "",
labels: List[String] = List(),
                                 weightCols: List[String] = List(),
                                 graphAddress: String = "",
                                 ngql: String = "") {
override def toString: String = {
s"NebulaReadConfigEntry: " +
s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " +