In [32]:
import org.apache.spark.graphx._

val myVertices = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"),(6L,"Naruto"),
 (3L, "Charles"), (4L, "Diane"), (5L, "Went to gym this morning")))

val myEdges = sc.makeRDD(Array(Edge(1L, 2L, "is-friends-with"), Edge(6L,1L,"Likes-status"),
 Edge(2L, 6L, "is-friends-with"), Edge(3L, 4L, "is-friends-with"),Edge(2L,1L,"Likes-status"), Edge(3L, 1L, "Wrote-status")))

val myGraph = Graph(myVertices, myEdges)
myGraph.vertices.foreach(println(_))
// res1: Array[(org.apache.spark.graphx.VertexId, String)] = Array((4,Diane), (2,Bill), (1,Ann), (3,Charles), (5,Went to gym this morning))

// Try this by yourself:
myGraph.edges.foreach(println(_))
myGraph.degrees.foreach(println(_))
myGraph.inDegrees.foreach(println(_))
myGraph.outDegrees.foreach(println(_))

(4,Diane)
(2,Bill)
(5,Went to gym this morning)
(6,Naruto)
(1,Ann)
(3,Charles)
Edge(6,1,Likes-status)
Edge(3,1,Wrote-status)
Edge(3,4,is-friends-with)
Edge(2,1,Likes-status)
Edge(1,2,is-friends-with)
Edge(2,6,is-friends-with)
(4,1)
(3,2)
(6,2)
(1,4)
(2,3)
(1,3)
(6,1)
(2,1)
(4,1)
(2,2)
(1,1)
(6,1)
(3,2)


import org.apache.spark.graphx._
myVertices: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[2065] at makeRDD at <console>:56
myEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[2066] at makeRDD at <console>:59
myGraph: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@caec5e1


In [3]:
val tmp = myGraph.mapEdges(e => e.attr == "is-friends-with")
tmp.edges.foreach(println(_))

Edge(3,4,true)
Edge(4,5,false)
Edge(1,2,true)
Edge(2,3,true)
Edge(3,5,false)


tmp: org.apache.spark.graphx.Graph[String,Boolean] = org.apache.spark.graphx.impl.GraphImpl@6f7b757


In [5]:
myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).join(myGraph.vertices).foreach(println(_))

// Or better formatted variant:
myGraph.aggregateMessages[Int](_.sendToSrc(1),
 _ + _).rightOuterJoin(myGraph.vertices).map(
 x => (x._2._2, x._2._1.getOrElse(0))).foreach(println(_))

(2,(1,Bill))
(4,(1,Diane))
(3,(2,Charles))
(1,(1,Ann))
(Bill,1)
(Diane,1)
(Ann,1)
(Went to gym this morning,0)
(Charles,2)


In [7]:
 val g = Pregel(  
    graph = myGraph.mapVertices((vid,vd) => 0),
    initialMsg = 0,
    maxIterations = Int.MaxValue,
    activeDirection = EdgeDirection.Out 
)(
    vprog = (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
    sendMsg = (et:EdgeTriplet[Int,String]) =>
        Iterator((et.dstId, et.srcAttr+1)),
    mergeMsg = (a:Int,b:Int) => math.max(a,b) 
)

g.vertices.foreach(println(_))

(3,2)
(4,3)
(5,4)
(1,0)
(2,1)


g: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@2af47bc


In [9]:
myGraph.mapTriplets(t=>t.attr=="is-friends-with").edges.foreach(println(_))

Edge(3,5,false)
Edge(1,2,true)
Edge(3,4,true)
Edge(2,3,true)
Edge(4,5,false)


In [10]:
val friendlyVertices = myGraph.edges.filter(_.attr=="is-friends-with").map(_.srcId).collect.toList
myGraph.mapVertices((v,s) => friendlyVertices.contains(v)).vertices.foreach(println(_))

(3,true)
(4,false)
(2,true)
(5,false)
(1,true)


friendlyVertices: List[Long] = List(1, 2, 3)


In [11]:
myGraph.aggregateMessages[Boolean](c => c.sendToSrc(c.attr == "is-friends-with"), (a, b) => a || b)

res7: org.apache.spark.graphx.VertexRDD[Boolean] = VertexRDDImpl[173] at RDD at VertexRDD.scala:57


In [33]:
myGraph.pageRank(0.001).vertices.foreach(println)

(1,2.106715854753376)
(6,1.0876823998322398)
(2,2.0185689415880077)
(3,0.22979059965733606)
(4,0.32745160451170385)
(5,0.22979059965733606)


Pregel has 3 main parametres: pagerankGraph, initialMessage and numIter. PagerankGraph - graph input. initialMessage - initial message to start the computation. last one is number of iterations.

2 example: for parallel compuation and simplify graph analytics projects.

How I acheved highest value innode 1? I created new node ("Naruto"). Connected 3 to 1, 2 to 1, 6(new) to 1 and removed between 4 and 5, removed 2 and 3.