From 23658811700ce0ab6234d8b0a47ade4fbb4a4080 Mon Sep 17 00:00:00 2001
From: zakiweng
Date: Tue, 9 Nov 2021 14:53:07 +0800
Subject: [PATCH] format some files

---
 .../nebula/algorithm/lib/ClosenessAlgo.scala  |  34 +--
 .../nebula/algorithm/lib/HanpAlgo.scala       |  85 ++++---
 .../nebula/algorithm/lib/Node2vecAlgo.scala   | 229 +++++++++++-------
 .../algorithm/lib/ClosenessAlgoSuite.scala    |   9 +-
 .../nebula/algorithm/lib/HanpSuite.scala      |  10 +-
 .../algorithm/lib/Node2vecAlgoSuite.scala     |  35 ++-
 6 files changed, 230 insertions(+), 172 deletions(-)

diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
index 2b0f8c1..27a1bab 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala
@@ -15,27 +15,28 @@ import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object ClosenessAlgo {
-  private val LOGGER = Logger.getLogger(this.getClass)
+  private val LOGGER = Logger.getLogger(this.getClass)
   val ALGORITHM: String = "Closeness"
   type SPMap = Map[VertexId, Double]
 
   private def makeMap(x: (VertexId, Double)*) = Map(x: _*)
 
-  private def addMap(spmap: SPMap, weight: Double): SPMap = spmap.map { case (v, d) => v -> (d + weight) }
+  private def addMap(spmap: SPMap, weight: Double): SPMap = spmap.map {
+    case (v, d) => v -> (d + weight)
+  }
 
   private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
-    (spmap1.keySet ++ spmap2.keySet).map {
-      k => k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
+    (spmap1.keySet ++ spmap2.keySet).map { k =>
+      k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
     }(collection.breakOut)
   }
+
   /**
-   * run the Closeness algorithm for nebula graph
-   */
-  def apply(spark: SparkSession,
-            dataset: Dataset[Row],
-            hasWeight:Boolean):DataFrame={
+   * run the Closeness algorithm for nebula graph
+   */
+  def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
     val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
-    val closenessRDD = execute(graph)
+    val closenessRDD = execute(graph)
     val schema = StructType(
       List(
         StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
@@ -46,12 +47,11 @@ object ClosenessAlgo {
   }
 
   /**
-   * execute Closeness algorithm
-   */
-  def execute(graph: Graph[None.type, Double]):RDD[Row]={
+   * execute Closeness algorithm
+   */
+  def execute(graph: Graph[None.type, Double]): RDD[Row] = {
     val spGraph = graph.mapVertices((vid, _) => makeMap(vid -> 0.0))
-
     val initialMessage = makeMap()
 
     def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
@@ -63,15 +63,15 @@
       if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
       else Iterator.empty
     }
-    val spsGraph=Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+    val spsGraph = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
     val closenessRDD = spsGraph.vertices.map(vertex => {
-      var dstNum = 0
+      var dstNum = 0
       var dstDistanceSum = 0.0
       for (distance <- vertex._2.values) {
         dstNum += 1
         dstDistanceSum += distance
       }
-      Row(vertex._1,(dstNum - 1) / dstDistanceSum)
+      Row(vertex._1, (dstNum - 1) / dstDistanceSum)
     })
     closenessRDD
   }
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
index fa5b067..9246716 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala
@@ -14,19 +14,26 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
+/**
+  * The implementation of the algorithm refers to the paper `Towards real-time community detection in large networks`.
+  */
 object HanpAlgo {
   val ALGORITHM: String = "Hanp"
 
   /**
-   * run the Hanp algorithm for nebula graph
-   */
+   * run the Hanp algorithm for nebula graph
+   */
   def apply(spark: SparkSession,
             dataset: Dataset[Row],
             hanpConfig: HanpConfig,
-            hasWeight:Boolean,
-            preferences:RDD[(VertexId,Double)]=null):DataFrame={
+            hasWeight: Boolean,
+            preferences: RDD[(VertexId, Double)] = null): DataFrame = {
     val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
-    val hanpResultRDD = execute(graph,hanpConfig.hopAttenuation,hanpConfig.maxIter,hanpConfig.preference,preferences)
+    val hanpResultRDD = execute(graph,
+                                hanpConfig.hopAttenuation,
+                                hanpConfig.maxIter,
+                                hanpConfig.preference,
+                                preferences)
     val schema = StructType(
       List(
         StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
@@ -37,51 +44,59 @@ object HanpAlgo {
   }
 
   /**
-   * execute Hanp algorithm
-   */
+   * execute Hanp algorithm
+   */
   def execute(graph: Graph[None.type, Double],
-              hopAttenuation:Double,
+              hopAttenuation: Double,
               maxIter: Int,
-              preference:Double=1.0,
-              preferences:RDD[(VertexId,Double)]=null):RDD[Row]={
-    var hanpGraph: Graph[(VertexId, Double, Double), Double]=null
-    if(preferences==null){
-      hanpGraph=graph.mapVertices((vertexId,_)=>(vertexId,preference,1.0))
-    }else{
-      hanpGraph=graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) => {(vertexId,vertexPreference.getOrElse(preference),1.0)})
+              preference: Double = 1.0,
+              preferences: RDD[(VertexId, Double)] = null): RDD[Row] = {
+    var hanpGraph: Graph[(VertexId, Double, Double), Double] = null
+    if (preferences == null) {
+      hanpGraph = graph.mapVertices((vertexId, _) => (vertexId, preference, 1.0))
+    } else {
+      hanpGraph = graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) => {
+        (vertexId, vertexPreference.getOrElse(preference), 1.0)
+      })
     }
-    def sendMessage(e: EdgeTriplet[(VertexId,Double,Double), Double]): Iterator[(VertexId, Map[VertexId, (Double,Double)])] = {
-      if(e.srcAttr._3>0 && e.dstAttr._3>0){
+    def sendMessage(e: EdgeTriplet[(VertexId, Double, Double), Double])
+      : Iterator[(VertexId, Map[VertexId, (Double, Double)])] = {
+      if (e.srcAttr._3 > 0 && e.dstAttr._3 > 0) {
         Iterator(
-          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3,e.srcAttr._2*e.srcAttr._3*e.attr))),
-          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3,e.dstAttr._2*e.dstAttr._3*e.attr)))
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))),
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr)))
         )
-      }else if(e.srcAttr._3>0){
-        Iterator((e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3,e.srcAttr._2*e.srcAttr._3*e.attr))))
-      }else if(e.dstAttr._3>0){
-        Iterator((e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3,e.dstAttr._2*e.dstAttr._3*e.attr))))
-      }else{
+      } else if (e.srcAttr._3 > 0) {
+        Iterator(
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))))
+      } else if (e.dstAttr._3 > 0) {
+        Iterator(
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr))))
+      } else {
         Iterator.empty
       }
     }
-    def mergeMessage(count1: Map[VertexId, (Double,Double)], count2: Map[VertexId, (Double,Double)]) : Map[VertexId, (Double,Double)] = {
+    def mergeMessage(count1: Map[VertexId, (Double, Double)],
+                     count2: Map[VertexId, (Double, Double)]): Map[VertexId, (Double, Double)] = {
       (count1.keySet ++ count2.keySet).map { i =>
-        val count1Val = count1.getOrElse(i, (0.0,0.0))
-        val count2Val = count2.getOrElse(i, (0.0,0.0))
-        i -> (Math.max(count1Val._1,count2Val._1),count1Val._2+count2Val._2)
+        val count1Val = count1.getOrElse(i, (0.0, 0.0))
+        val count2Val = count2.getOrElse(i, (0.0, 0.0))
+        i -> (Math.max(count1Val._1, count2Val._1), count1Val._2 + count2Val._2)
       }(collection.breakOut)
     }
-    def vertexProgram(vid: VertexId, attr: (VertexId,Double,Double), message: Map[VertexId, (Double,Double)]): (VertexId,Double,Double) = {
+    def vertexProgram(vid: VertexId,
+                      attr: (VertexId, Double, Double),
+                      message: Map[VertexId, (Double, Double)]): (VertexId, Double, Double) = {
       if (message.isEmpty) {
         attr
       } else {
-        val maxMessage=message.maxBy(_._2._2)
-        (maxMessage._1,attr._2,maxMessage._2._1-hopAttenuation)
+        val maxMessage = message.maxBy(_._2._2)
+        (maxMessage._1, attr._2, maxMessage._2._1 - hopAttenuation)
       }
     }
-    val initialMessage = Map[VertexId, (Double,Double)]()
-    val hanpResultGraph=hanpGraph.pregel(initialMessage,maxIter)(vertexProgram,sendMessage,mergeMessage)
-    hanpResultGraph.vertices.map(vertex=>Row(vertex._1,vertex._2._1))
+    val initialMessage = Map[VertexId, (Double, Double)]()
+    val hanpResultGraph =
+      hanpGraph.pregel(initialMessage, maxIter)(vertexProgram, sendMessage, mergeMessage)
+    hanpResultGraph.vertices.map(vertex => Row(vertex._1, vertex._2._1))
   }
 }
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
index 05aca2d..1a26943 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgo.scala
@@ -21,21 +21,27 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)],
-                    var path: Array[Long] = Array.empty[Long]) extends Serializable
+                    var path: Array[Long] = Array.empty[Long])
+    extends Serializable
 
 case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long],
                     var J: Array[Int] = Array.empty[Int],
-                    var q: Array[Double] = Array.empty[Double]) extends Serializable
+                    var q: Array[Double] = Array.empty[Double])
+    extends Serializable
 
+/**
+  * The implementation of the algorithm follows the reference code published by the paper's author.
+  * The code is available at https://github.com/aditya-grover/node2vec.git.
+  */
 object Node2vecAlgo {
-  val ALGORITHM: String = "Node2vec"
-  var node2vecConfig:Node2vecConfig = _
-  var context:SparkContext = _
-  var indexedEdges: RDD[Edge[EdgeAttr]] = _
-  var indexedNodes: RDD[(VertexId, NodeAttr)] = _
-  var graph: Graph[NodeAttr, EdgeAttr] = _
+  val ALGORITHM: String = "Node2vec"
+  var node2vecConfig: Node2vecConfig = _
+  var context: SparkContext = _
+  var indexedEdges: RDD[Edge[EdgeAttr]] = _
+  var indexedNodes: RDD[(VertexId, NodeAttr)] = _
+  var graph: Graph[NodeAttr, EdgeAttr] = _
   var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null
-  var vectors:Map[String,Array[Float]] = _
+  var vectors: Map[String, Array[Float]] = _
 
   lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
     Array(
       (srcId, Array((dstId, weight))),
@@ -54,16 +60,17 @@ object Node2vecAlgo {
     val q = Array.fill(K)(0.0)
 
     val smaller = new ArrayBuffer[Int]()
-    val larger = new ArrayBuffer[Int]()
+    val larger = new ArrayBuffer[Int]()
 
     val sum = nodeWeights.map(_._2).sum
 
-    nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) =>
-      q(i) = K * weight / sum
-      if (q(i) < 1.0) {
-        smaller.append(i)
-      } else {
-        larger.append(i)
-      }
+    nodeWeights.zipWithIndex.foreach {
+      case ((nodeId, weight), i) =>
+        q(i) = K * weight / sum
+        if (q(i) < 1.0) {
+          smaller.append(i)
+        } else {
+          larger.append(i)
+        }
     }
 
     while (smaller.nonEmpty && larger.nonEmpty) {
@@ -78,47 +85,64 @@ object Node2vecAlgo {
     (J, q)
   }
 
-  def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(srcId: Long, srcNeighbors: Array[(Long, Double)], dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
-    val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) =>
-      var unnormProb = weight / q
-      if (srcId == dstNeighborId) unnormProb = weight / p
-      else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight
+  def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(
+      srcId: Long,
+      srcNeighbors: Array[(Long, Double)],
+      dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
+    val neighbors_ = dstNeighbors.map {
+      case (dstNeighborId, weight) =>
+        var unnormProb = weight / q
+        if (srcId == dstNeighborId) unnormProb = weight / p
+        else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight
 
-      (dstNeighborId, unnormProb)
+        (dstNeighborId, unnormProb)
     }
 
     setupAlias(neighbors_)
   }
 
   def drawAlias(J: Array[Int], q: Array[Double]): Int = {
-    val K = J.length
+    val K = J.length
     val kk = math.floor(math.random * K).toInt
 
     if (math.random < q(kk)) kk else J(kk)
   }
 
-  def load(graph:Graph[None.type, Double]):this.type ={
+  def load(graph: Graph[None.type, Double]): this.type = {
     val bcMaxDegree = context.broadcast(node2vecConfig.degree)
     val bcEdgeCreator = node2vecConfig.directed match {
-      case true => context.broadcast(createDirectedEdge)
+      case true  => context.broadcast(createDirectedEdge)
       case false => context.broadcast(createUndirectedEdge)
     }
 
-    indexedNodes = graph.edges.flatMap { row =>
-      bcEdgeCreator.value.apply(row.srcId, row.dstId,row.attr)
-    }.reduceByKey(_++_).map { case (nodeId, neighbors: Array[(VertexId, Double)]) =>
-      var neighbors_ = neighbors
-      if (neighbors_.length > bcMaxDegree.value) {
-        neighbors_ = neighbors.sortWith{ case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
+    indexedNodes = graph.edges
+      .flatMap { row =>
+        bcEdgeCreator.value.apply(row.srcId, row.dstId, row.attr)
+      }
+      .reduceByKey(_ ++ _)
+      .map {
+        case (nodeId, neighbors: Array[(VertexId, Double)]) =>
+          var neighbors_ = neighbors
+          if (neighbors_.length > bcMaxDegree.value) {
+            neighbors_ = neighbors
+              .sortWith { case (left, right) => left._2 > right._2 }
+              .slice(0, bcMaxDegree.value)
+          }
 
-      (nodeId, NodeAttr(neighbors = neighbors_.distinct))
-    }.repartition(node2vecConfig.dataNumPartition).cache
+          (nodeId, NodeAttr(neighbors = neighbors_.distinct))
+      }
+      .repartition(node2vecConfig.dataNumPartition)
+      .cache
 
-    indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) =>
-      clickNode.neighbors.map { case (dstId, weight) =>
-        Edge(srcId, dstId, EdgeAttr())
+    indexedEdges = indexedNodes
+      .flatMap {
+        case (srcId, clickNode) =>
+          clickNode.neighbors.map {
+            case (dstId, weight) =>
+              Edge(srcId, dstId, EdgeAttr())
+          }
       }
-    }.repartition(node2vecConfig.dataNumPartition).cache
+      .repartition(node2vecConfig.dataNumPartition)
+      .cache
 
     this
   }
 
   def initTransitionProb(): this.type = {
@@ -126,64 +150,77 @@ object Node2vecAlgo {
     val bcQ = context.broadcast(node2vecConfig.q)
 
     graph = Graph(indexedNodes, indexedEdges)
-      .mapVertices[NodeAttr] { case (vertexId, clickNode) =>
-        val (j, q) = this.setupAlias(clickNode.neighbors)
-        val nextNodeIndex = this.drawAlias(j, q)
-        clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1)
+      .mapVertices[NodeAttr] {
+        case (vertexId, clickNode) =>
+          val (j, q) = this.setupAlias(clickNode.neighbors)
+          val nextNodeIndex = this.drawAlias(j, q)
+          clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1)
 
-        clickNode
+          clickNode
       }
       .mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] =>
-        val (j, q) = this.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId, edgeTriplet.srcAttr.neighbors, edgeTriplet.dstAttr.neighbors)
+        val (j, q) = this.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId,
+                                                               edgeTriplet.srcAttr.neighbors,
+                                                               edgeTriplet.dstAttr.neighbors)
         edgeTriplet.attr.J = j
         edgeTriplet.attr.q = q
         edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1)
 
         edgeTriplet.attr
-      }.cache
+      }
+      .cache
 
     this
   }
 
   def randomWalk(): this.type = {
-    val edge2attr = graph.triplets.map { edgeTriplet =>
-      (s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr)
-    }.repartition(node2vecConfig.dataNumPartition).cache
+    val edge2attr = graph.triplets
+      .map { edgeTriplet =>
+        (s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr)
+      }
+      .repartition(node2vecConfig.dataNumPartition)
+      .cache
     edge2attr.first
 
     for (iter <- 0 until node2vecConfig.numWalks) {
       var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null
-      var randomWalk = graph.vertices.map { case (nodeId, clickNode) =>
-        val pathBuffer = new ArrayBuffer[Long]()
-        pathBuffer.append(clickNode.path:_*)
-        (nodeId, pathBuffer)
+      var randomWalk = graph.vertices.map {
+        case (nodeId, clickNode) =>
+          val pathBuffer = new ArrayBuffer[Long]()
+          pathBuffer.append(clickNode.path: _*)
+          (nodeId, pathBuffer)
       }.cache
       var activeWalks = randomWalk.first
       graph.unpersist(blocking = false)
       graph.edges.unpersist(blocking = false)
       for (walkCount <- 0 until node2vecConfig.walkLength) {
         prevWalk = randomWalk
-        randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) =>
-          val prevNodeId = pathBuffer(pathBuffer.length - 2)
-          val currentNodeId = pathBuffer.last
-
-          (s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer))
-        }.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) =>
-          try {
-            val nextNodeIndex = this.drawAlias(attr.J, attr.q)
-            val nextNodeId = attr.dstNeighbors(nextNodeIndex)
-            pathBuffer.append(nextNodeId)
-
-            (srcNodeId, pathBuffer)
-          } catch {
-            case e: Exception => throw new RuntimeException(e.getMessage)
+        randomWalk = randomWalk
+          .map {
+            case (srcNodeId, pathBuffer) =>
+              val prevNodeId = pathBuffer(pathBuffer.length - 2)
+              val currentNodeId = pathBuffer.last
+
+              (s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer))
+          }
+          .join(edge2attr)
+          .map {
+            case (edge, ((srcNodeId, pathBuffer), attr)) =>
+              try {
+                val nextNodeIndex = this.drawAlias(attr.J, attr.q)
+                val nextNodeId = attr.dstNeighbors(nextNodeIndex)
+                pathBuffer.append(nextNodeId)
+
+                (srcNodeId, pathBuffer)
+              } catch {
+                case e: Exception => throw new RuntimeException(e.getMessage)
+              }
          }
-        }.cache
+          .cache
         activeWalks = randomWalk.first()
-        prevWalk.unpersist(blocking=false)
+        prevWalk.unpersist(blocking = false)
       }
-
       if (randomWalkPaths != null) {
         val prevRandomWalkPaths = randomWalkPaths
         randomWalkPaths = randomWalkPaths.union(randomWalk).cache()
@@ -197,42 +234,50 @@ object Node2vecAlgo {
     this
   }
 
   def embedding(): this.type = {
-    val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) =>
-      Try(pathBuffer.map(_.toString).toIterable).getOrElse(null)
-    }.filter(_!=null)
+    val randomPaths = randomWalkPaths
+      .map {
+        case (vertexId, pathBuffer) =>
+          Try(pathBuffer.map(_.toString).toIterable).getOrElse(null)
+      }
+      .filter(_ != null)
 
     val word2vec = new Word2Vec()
-    word2vec.setLearningRate(node2vecConfig.lr)
-      .setNumIterations(node2vecConfig.maxIter)
-      .setNumPartitions(node2vecConfig.modelNumPartition)
-      .setVectorSize(node2vecConfig.dim)
-      .setWindowSize(node2vecConfig.window)
-    val model=word2vec.fit(randomPaths)
-    model.save(context,node2vecConfig.modelPath)// use Word2VecModel.load(context, path) to load model
-    this.vectors=model.getVectors
+    word2vec
+      .setLearningRate(node2vecConfig.lr)
+      .setNumIterations(node2vecConfig.maxIter)
+      .setNumPartitions(node2vecConfig.modelNumPartition)
+      .setVectorSize(node2vecConfig.dim)
+      .setWindowSize(node2vecConfig.window)
+    val model = word2vec.fit(randomPaths)
+    model.save(context, node2vecConfig.modelPath) // use Word2VecModel.load(context, path) to load model
+    this.vectors = model.getVectors
     this
   }
+
   /**
-   * run the Node2vec algorithm for nebula graph
-   */
+   * run the Node2vec algorithm for nebula graph
+   */
   def apply(spark: SparkSession,
             dataset: Dataset[Row],
             node2vecConfig: Node2vecConfig,
-            hasWeight:Boolean):DataFrame={
+            hasWeight: Boolean): DataFrame = {
     val inputGraph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
-    this.context=spark.sparkContext
-    this.node2vecConfig=node2vecConfig
-    val node2vecResult:Map[String,Array[Float]]=this.load(inputGraph)
-      .initTransitionProb()
-      .randomWalk()
-      .embedding()
-      .vectors
-    val node2vecRDD:RDD[Row]=this.context.parallelize(node2vecResult.toList).map(row=>Row(row._1,row._2.mkString(node2vecConfig.embSeparate)))
+    this.context = spark.sparkContext
+    this.node2vecConfig = node2vecConfig
+    val node2vecResult: Map[String, Array[Float]] = this
+      .load(inputGraph)
+      .initTransitionProb()
+      .randomWalk()
+      .embedding()
+      .vectors
+    val node2vecRDD: RDD[Row] = this.context
+      .parallelize(node2vecResult.toList)
+      .map(row => Row(row._1, row._2.mkString(node2vecConfig.embSeparate)))
     val schema = StructType(
       List(
         StructField(AlgoConstants.ALGO_ID_COL, StringType, nullable = false),
         StructField(AlgoConstants.NODE2VEC_RESULT_COL, StringType, nullable = true)
       ))
-    val algoResult=spark.sqlContext.createDataFrame(node2vecRDD,schema)
+    val algoResult = spark.sqlContext.createDataFrame(node2vecRDD, schema)
     algoResult
   }
 }
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
index f28e717..938726e 100644
--- a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgoSuite.scala
@@ -6,16 +6,15 @@ package com.vesoft.nebula.algorithm.lib
 
-
 import org.apache.spark.sql.SparkSession
 import org.junit.Test
 
 class ClosenessAlgoSuite {
   @Test
-  def closenessAlgoSuite()={
-    val spark = SparkSession.builder().master("local").getOrCreate()
-    val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
-    val result = ClosenessAlgo.apply(spark, data, true)
+  def closenessAlgoSuite() = {
+    val spark  = SparkSession.builder().master("local").getOrCreate()
+    val data   = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val result = ClosenessAlgo.apply(spark, data, true)
     assert(result.count() == 4)
   }
 }
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
index dfaf0c0..ed4bbfc 100644
--- a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/HanpSuite.scala
@@ -13,11 +13,11 @@ import org.junit.Test
 
 class HanpSuite {
   @Test
-  def hanpSuite()={
-    val spark = SparkSession.builder().master("local").getOrCreate()
-    val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
-    val hanpConfig = new HanpConfig(0.1,10,1.0)
-    val result = HanpAlgo.apply(spark, data, hanpConfig, false)
+  def hanpSuite() = {
+    val spark      = SparkSession.builder().master("local").getOrCreate()
+    val data       = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+    val hanpConfig = new HanpConfig(0.1, 10, 1.0)
+    val result     = HanpAlgo.apply(spark, data, hanpConfig, false)
     assert(result.count() == 4)
   }
 }
diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
index 4041eed..65670af 100644
--- a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
+++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/Node2vecAlgoSuite.scala
@@ -13,25 +13,24 @@ import org.junit.Test
 
 class Node2vecAlgoSuite {
   @Test
-  def node2vecAlgoSuite():Unit={
-    val spark = SparkSession.builder().master("local").getOrCreate()
-    val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
+  def node2vecAlgoSuite(): Unit = {
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    val data  = spark.read.option("header", true).csv("src/test/resources/edge.csv")
     val node2vecConfig = new Node2vecConfig(10,
-      0.025,
-      10,
-      10,
-      10,
-      3,
-      5,
-      3,
-      1.0,
-      1.0,
-      false,
-      10,
-      ",",
-      "src/test/resources/model"
-    )
-    val result = Node2vecAlgo.apply(spark, data, node2vecConfig, true)
+                                            0.025,
+                                            10,
+                                            10,
+                                            10,
+                                            3,
+                                            5,
+                                            3,
+                                            1.0,
+                                            1.0,
+                                            false,
+                                            10,
+                                            ",",
+                                            "src/test/resources/model")
+    val result = Node2vecAlgo.apply(spark, data, node2vecConfig, true)
    assert(result.count() == 4)
  }
 }
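
Note: the entry points touched by this patch are exercised by the three test suites above. For reference, the sketch below shows how the reformatted APIs are called end to end. It is a minimal, illustrative example, not part of the patch; it assumes a local SparkSession, the sample src/test/resources/edge.csv used by the tests, and that HanpConfig and Node2vecConfig are imported from the project's config package (the import path is an assumption, since the imports are outside the hunks shown here).

import org.apache.spark.sql.SparkSession
// Assumed package path for the config case classes; adjust to the actual project layout.
import com.vesoft.nebula.algorithm.config.{HanpConfig, Node2vecConfig}
import com.vesoft.nebula.algorithm.lib.{ClosenessAlgo, HanpAlgo, Node2vecAlgo}

object AlgoUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    // Edge list CSV with a header row, as in the test resources.
    val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")

    // Closeness centrality; the last argument enables edge weights.
    val closeness = ClosenessAlgo.apply(spark, edges, true)

    // HANP community detection, using the same HanpConfig values as HanpSuite;
    // the optional preferences RDD is left at its default (null).
    val hanp = HanpAlgo.apply(spark, edges, new HanpConfig(0.1, 10, 1.0), false)

    // Node2vec embedding, using the same configuration values as Node2vecAlgoSuite.
    val node2vecConfig = new Node2vecConfig(10, 0.025, 10, 10, 10, 3, 5, 3, 1.0, 1.0, false, 10,
      ",", "src/test/resources/model")
    val node2vec = Node2vecAlgo.apply(spark, edges, node2vecConfig, true)

    closeness.show()
    hanp.show()
    node2vec.show()
    spark.stop()
  }
}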