
Merge pull request #8 from wengzhenjie/master
format some files
Nicole00 committed Nov 9, 2021
2 parents 3cf85fb + 2365881 commit c8a9a6a
Showing 6 changed files with 230 additions and 172 deletions.
ClosenessAlgo.scala
@@ -15,27 +15,28 @@ import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ClosenessAlgo {
-  private val LOGGER = Logger.getLogger(this.getClass)
+  private val LOGGER = Logger.getLogger(this.getClass)
val ALGORITHM: String = "Closeness"
type SPMap = Map[VertexId, Double]

private def makeMap(x: (VertexId, Double)*) = Map(x: _*)

-  private def addMap(spmap: SPMap, weight: Double): SPMap = spmap.map { case (v, d) => v -> (d + weight) }
+  private def addMap(spmap: SPMap, weight: Double): SPMap = spmap.map {
+    case (v, d) => v -> (d + weight)
+  }

private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
-    (spmap1.keySet ++ spmap2.keySet).map {
-      k => k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
+    (spmap1.keySet ++ spmap2.keySet).map { k =>
+      k -> math.min(spmap1.getOrElse(k, Double.MaxValue), spmap2.getOrElse(k, Double.MaxValue))
}(collection.breakOut)
}

/**
-    * run the Closeness algorithm for nebula graph
-    */
-  def apply(spark: SparkSession,
-            dataset: Dataset[Row],
-            hasWeight:Boolean):DataFrame={
+    * run the Closeness algorithm for nebula graph
+    */
+  def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
-    val closenessRDD = execute(graph)
+    val closenessRDD = execute(graph)
val schema = StructType(
List(
StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
@@ -46,12 +47,11 @@ object ClosenessAlgo {
}

/**
-    * execute Closeness algorithm
-    */
-  def execute(graph: Graph[None.type, Double]):RDD[Row]={
+    * execute Closeness algorithm
+    */
+  def execute(graph: Graph[None.type, Double]): RDD[Row] = {
val spGraph = graph.mapVertices((vid, _) => makeMap(vid -> 0.0))


val initialMessage = makeMap()

def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
@@ -63,15 +63,15 @@
if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
else Iterator.empty
}
-    val spsGraph=Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+    val spsGraph = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
val closenessRDD = spsGraph.vertices.map(vertex => {
-      var dstNum = 0
+      var dstNum = 0
var dstDistanceSum = 0.0
for (distance <- vertex._2.values) {
dstNum += 1
dstDistanceSum += distance
}
-      Row(vertex._1,(dstNum - 1) / dstDistanceSum)
+      Row(vertex._1, (dstNum - 1) / dstDistanceSum)
})
closenessRDD
}
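For orientation only, and not part of this commit: a minimal sketch of how the reformatted ClosenessAlgo.apply entry point might be invoked. The SparkSession setup, the edge column names, and the assumption that the sketch sits in the same package as ClosenessAlgo (so the algorithm needs no import) are mine, not the repository's.

import org.apache.spark.sql.SparkSession

object ClosenessExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("closeness-example")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical edge list (srcId, dstId, weight); the real job loads its
    // edges from NebulaGraph before handing them to the algorithm.
    val edges = spark.createDataFrame(
      Seq((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 4.0))
    ).toDF("src", "dst", "weight")

    // hasWeight = true keeps the third column as the edge weight.
    val closeness = ClosenessAlgo.apply(spark, edges, hasWeight = true)
    closeness.show()
  }
}

The returned DataFrame carries one row per vertex, pairing the vertex id with its closeness score, as built from Row(vertex._1, (dstNum - 1) / dstDistanceSum) above.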
HanpAlgo.scala
@@ -14,19 +14,26 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

+/**
+  * The implementation of the algorithm refers to paper `Towards real-time community detection in large networks`.
+  */
object HanpAlgo {
val ALGORITHM: String = "Hanp"

/**
-    * run the Hanp algorithm for nebula graph
-    */
+    * run the Hanp algorithm for nebula graph
+    */
def apply(spark: SparkSession,
dataset: Dataset[Row],
hanpConfig: HanpConfig,
-            hasWeight:Boolean,
-            preferences:RDD[(VertexId,Double)]=null):DataFrame={
+            hasWeight: Boolean,
+            preferences: RDD[(VertexId, Double)] = null): DataFrame = {
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
-    val hanpResultRDD = execute(graph,hanpConfig.hopAttenuation,hanpConfig.maxIter,hanpConfig.preference,preferences)
+    val hanpResultRDD = execute(graph,
+                                hanpConfig.hopAttenuation,
+                                hanpConfig.maxIter,
+                                hanpConfig.preference,
+                                preferences)
val schema = StructType(
List(
StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
@@ -37,51 +44,59 @@ object HanpAlgo {
}

/**
-    * execute Hanp algorithm
-    */
+    * execute Hanp algorithm
+    */
def execute(graph: Graph[None.type, Double],
-              hopAttenuation:Double,
+              hopAttenuation: Double,
maxIter: Int,
-              preference:Double=1.0,
-              preferences:RDD[(VertexId,Double)]=null):RDD[Row]={
-    var hanpGraph: Graph[(VertexId, Double, Double), Double]=null
-    if(preferences==null){
-      hanpGraph=graph.mapVertices((vertexId,_)=>(vertexId,preference,1.0))
-    }else{
-      hanpGraph=graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) => {(vertexId,vertexPreference.getOrElse(preference),1.0)})
+              preference: Double = 1.0,
+              preferences: RDD[(VertexId, Double)] = null): RDD[Row] = {
+    var hanpGraph: Graph[(VertexId, Double, Double), Double] = null
+    if (preferences == null) {
+      hanpGraph = graph.mapVertices((vertexId, _) => (vertexId, preference, 1.0))
+    } else {
+      hanpGraph = graph.outerJoinVertices(preferences)((vertexId, _, vertexPreference) => {
+        (vertexId, vertexPreference.getOrElse(preference), 1.0)
+      })
}
-    def sendMessage(e: EdgeTriplet[(VertexId,Double,Double), Double]): Iterator[(VertexId, Map[VertexId, (Double,Double)])] = {
-      if(e.srcAttr._3>0 && e.dstAttr._3>0){
+    def sendMessage(e: EdgeTriplet[(VertexId, Double, Double), Double])
+      : Iterator[(VertexId, Map[VertexId, (Double, Double)])] = {
+      if (e.srcAttr._3 > 0 && e.dstAttr._3 > 0) {
Iterator(
-          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3,e.srcAttr._2*e.srcAttr._3*e.attr))),
-          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3,e.dstAttr._2*e.dstAttr._3*e.attr)))
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))),
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr)))
)
-      }else if(e.srcAttr._3>0){
-        Iterator((e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3,e.srcAttr._2*e.srcAttr._3*e.attr))))
-      }else if(e.dstAttr._3>0){
-        Iterator((e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3,e.dstAttr._2*e.dstAttr._3*e.attr))))
-      }else{
+      } else if (e.srcAttr._3 > 0) {
+        Iterator(
+          (e.dstId, Map(e.srcAttr._1 -> (e.srcAttr._3, e.srcAttr._2 * e.srcAttr._3 * e.attr))))
+      } else if (e.dstAttr._3 > 0) {
+        Iterator(
+          (e.srcId, Map(e.dstAttr._1 -> (e.dstAttr._3, e.dstAttr._2 * e.dstAttr._3 * e.attr))))
+      } else {
Iterator.empty
}
}
-    def mergeMessage(count1: Map[VertexId, (Double,Double)], count2: Map[VertexId, (Double,Double)])
-      : Map[VertexId, (Double,Double)] = {
+    def mergeMessage(count1: Map[VertexId, (Double, Double)],
+                     count2: Map[VertexId, (Double, Double)]): Map[VertexId, (Double, Double)] = {
(count1.keySet ++ count2.keySet).map { i =>
-      val count1Val = count1.getOrElse(i, (0.0,0.0))
-      val count2Val = count2.getOrElse(i, (0.0,0.0))
-      i -> (Math.max(count1Val._1,count2Val._1),count1Val._2+count2Val._2)
+      val count1Val = count1.getOrElse(i, (0.0, 0.0))
+      val count2Val = count2.getOrElse(i, (0.0, 0.0))
+      i -> (Math.max(count1Val._1, count2Val._1), count1Val._2 + count2Val._2)
}(collection.breakOut)
}
-    def vertexProgram(vid: VertexId, attr: (VertexId,Double,Double), message: Map[VertexId, (Double,Double)]): (VertexId,Double,Double) = {
+    def vertexProgram(vid: VertexId,
+                      attr: (VertexId, Double, Double),
+                      message: Map[VertexId, (Double, Double)]): (VertexId, Double, Double) = {
if (message.isEmpty) {
attr
} else {
-        val maxMessage=message.maxBy(_._2._2)
-        (maxMessage._1,attr._2,maxMessage._2._1-hopAttenuation)
+        val maxMessage = message.maxBy(_._2._2)
+        (maxMessage._1, attr._2, maxMessage._2._1 - hopAttenuation)
}
}
-    val initialMessage = Map[VertexId, (Double,Double)]()
-    val hanpResultGraph=hanpGraph.pregel(initialMessage,maxIter)(vertexProgram,sendMessage,mergeMessage)
-    hanpResultGraph.vertices.map(vertex=>Row(vertex._1,vertex._2._1))
+    val initialMessage = Map[VertexId, (Double, Double)]()
+    val hanpResultGraph =
+      hanpGraph.pregel(initialMessage, maxIter)(vertexProgram, sendMessage, mergeMessage)
+    hanpResultGraph.vertices.map(vertex => Row(vertex._1, vertex._2._1))
}
}
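Likewise for HanpAlgo, purely as a reading aid and not part of the commit: a minimal sketch of a call to the reformatted apply. The HanpConfig constructor shape used here (hop attenuation, max iterations, default preference), the edge column names, and the availability of HanpAlgo and HanpConfig without imports are assumptions; preferences is left at its null default, so every vertex starts from the uniform preference via the mapVertices branch above.

import org.apache.spark.sql.SparkSession

object HanpExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hanp-example")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical weighted edge list; in practice the edges come from NebulaGraph.
    val edges = spark.createDataFrame(
      Seq((1L, 2L, 1.0), (2L, 3L, 1.0), (3L, 4L, 0.5))
    ).toDF("src", "dst", "weight")

    // Assumed HanpConfig shape: (hopAttenuation, maxIter, preference).
    val hanpConfig = HanpConfig(0.1, 10, 1.0)

    // preferences is omitted, so the graph.mapVertices branch is taken.
    val labels = HanpAlgo.apply(spark, edges, hanpConfig, hasWeight = true)
    labels.show()
  }
}

Each output row pairs a vertex id with the label it has converged to, that is, vertex._2._1 of the Pregel result.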
The diffs of the remaining four changed files were not loaded and are not shown here.
