In [1]:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph}

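### 5.1 Graph Properties
#### 1. Graph representation
(1) Each vertex is a key-value pair: the key is a 64-bit long (the vertexId), and the value is the vertex's user-defined attribute  
(2) Each edge has a source vertex ID (srcId) and a destination vertex ID (dstId), plus an attribute on the edge  
(3) **Graph[VD,ED]** is the generic type of the graph object: VD is the vertex attribute type and ED is the edge attribute type  
(4) In some cases the vertices need attributes of different types; this is done by having all of them extend a common base type:   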
    ```scala
    class VertexProperty()
    case class UserProperty(val name:String) extends VertexProperty
    case class ProductProperty(val name:String,val price:Double) extends VertexProperty
    var g:Graph[VertexProperty,String] = null
    ```
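#### 2. Graphs are immutable
(1) Like RDDs, graphs are immutable: any operation that changes a graph's values or structure produces a brand-new graph;   
(2) However, the parts of the source graph that the operation does not modify can be reused by the new graph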
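
A minimal sketch of this immutability, assuming `sc` is the notebook's SparkContext (the graph and the names `base` and `doubled` are made up for illustration): `mapVertices` returns a new graph and leaves the original untouched.

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: `sc` is the SparkContext provided by the notebook.
val base: Graph[Int, String] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(2L, 3L, "b"))), defaultValue = 1)

// mapVertices builds a new graph; `base` itself is not modified.
val doubled: Graph[Int, String] = base.mapVertices((_, attr) => attr * 2)

val before = base.vertices.values.collect().toSet     // still the default value 1
val after  = doubled.vertices.values.collect().toSet  // every attribute doubled
```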
   
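#### 3. Logical representation of a Graph
Logically, a graph is a pair of RDD collections (in the source code, VertexRDD and EdgeRDD both extend the RDD class), as shown below: 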
<img src="img/graphres.png" width="65%">

#### 4. Creating a Graph   
The cell below builds a graph from two RDDs; more ways to build a graph are covered later

In [2]:
import org.apache.spark.graphx.Graph
// rdd for vertex
val users:RDD[(VertexId,(String,String))] = sc.parallelize(Array((3L, ("rxin", "student")), 
                                                                 (7L, ("jgonzal", "postdoc")),
                                                                 (5L, ("franklin", "prof")), 
                                                                 (2L, ("istoica", "prof"))))
// rdd for edge
val relationships:RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"),
                                                           Edge(5L, 3L, "advisor"),
                                                           Edge(2L, 5L, "colleague"), 
                                                           Edge(5L, 7L, "pi")))

val defaultUser:(String,String) = ("Anonymous","Missing")
val g:Graph[(String,String),String] = Graph(users,relationships,defaultUser)

users = ParallelCollectionRDD[0] at parallelize at <console>:37
relationships = ParallelCollectionRDD[1] at parallelize at <console>:42
defaultUser = (Anonymous,Missing)
g = org.apache.spark.graphx.impl.GraphImpl@4cfbf02


org.apache.spark.graphx.impl.GraphImpl@4cfbf02

In [3]:
val cnt1 = g.vertices.filter({                            // VertexRDD[VD] extends RDD[(VertexId, VD)]
  case (id,(name,position)) => position == "postdoc"      // EdgeRDD[ED] extends RDD[Edge[ED]](sc, deps)
}).count

val cnt2 = g.edges.filter(e=>e.srcId>e.dstId).count

cnt1 = 1
cnt2 = 1


1

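#### 5. The triplet view of a Graph
(1) Besides viewing a graph as an RDD[(VertexId, VD)] of vertices and an RDD[Edge[ED]] of edges, Graph also offers a triplet view, in which each record contains:   
&nbsp;&nbsp;&nbsp;&nbsp;(1) (srcId,srcAttr),  
&nbsp;&nbsp;&nbsp;&nbsp;(2) (dstId,dstAttr),  
&nbsp;&nbsp;&nbsp;&nbsp;(3) attr    
(2) The triplet view can be thought of as the result of a join: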
```sql
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
```
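
As a small illustration of what the triplet view gives you, the sketch below renders each edge as a sentence that uses both endpoint attributes. It assumes `sc` is the notebook's SparkContext; the vertices, edges and the names `people`, `follows`, `social` and `facts` are invented for the example.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: `sc` is the SparkContext provided by the notebook.
val people: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val follows: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(3L, 2L, "mentors")))
val social: Graph[String, String] = Graph(people, follows)

// Each EdgeTriplet exposes srcId/srcAttr, dstId/dstAttr and the edge attr.
val facts: Array[String] = social.triplets
  .map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}")
  .collect()
  .sorted
facts.foreach(println)
```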

In [4]:
/**
 * class EdgeTriplet[VD, ED]{
 *    override def toString: String = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
 * } 
 */
g.triplets.collect().foreach(println)

((3,(rxin,student)),(7,(jgonzal,postdoc)),collab)
((5,(franklin,prof)),(3,(rxin,student)),advisor)
((2,(istoica,prof)),(5,(franklin,prof)),colleague)
((5,(franklin,prof)),(7,(jgonzal,postdoc)),pi)


### 5.2 Operators
#### 1. map operators
Each map operation produces a new graph, which nevertheless reuses part of the data of the graph it was derived from
```scala
class Graph[VD, ED] {
    def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
    def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
    def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
```
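
The three operators differ in what the map function can see. A minimal sketch, assuming `sc` is the notebook's SparkContext (the graph `roads` and its values are illustrative only):

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: `sc` is the SparkContext provided by the notebook.
val roads: Graph[Int, Double] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1.5), Edge(2L, 3L, 2.5))), defaultValue = 10)

// mapVertices: may change the vertex attribute type (Int to String here).
val named: Graph[String, Double] = roads.mapVertices((id, attr) => s"v$id:$attr")
// mapEdges: only the edge itself is visible to the function.
val scaled: Graph[Int, Double] = roads.mapEdges(e => e.attr * 2)
// mapTriplets: the function also sees both endpoint attributes.
val combined: Graph[Int, Double] = roads.mapTriplets(t => t.attr + t.srcAttr + t.dstAttr)

val namedAttrs    = named.vertices.values.collect().sorted.toSeq
val scaledAttrs   = scaled.edges.map(_.attr).collect().sorted.toSeq
val combinedAttrs = combined.edges.map(_.attr).collect().sorted.toSeq
```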

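#### 2. Structural operators
(1) reverse: Graph[VD, ED]   
Swaps the src and dst of every edge; because it changes neither the vertex or edge attributes nor the number of edges, it can be implemented very efficiently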
```scala
def reverse: Graph[VD, ED]
```
  
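(2) subgraph: keeps only the vertices that satisfy vpred, and only the edges that satisfy epred and whose two endpoints both satisfy vpred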
```scala
def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
```
(3) mask: builds a subgraph that keeps only the vertices and edges that also appear in another graph
```scala
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
```
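
A common use of `mask` is to run an algorithm on the full graph and then restrict its result to a subgraph. The sketch below assumes `sc` is the notebook's SparkContext; the graph and the names `full`, `cc`, `kept` and `ccKept` are illustrative.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Assumption: `sc` is the SparkContext provided by the notebook.
val full: Graph[String, Int] = Graph(
  sc.parallelize(Seq((1L, "keep"), (2L, "keep"), (3L, "drop"))),
  sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 8))))

// Run an algorithm on the full graph (here: connected components) ...
val cc: Graph[VertexId, Int] = full.connectedComponents()
// ... restrict the original graph with subgraph ...
val kept: Graph[String, Int] = full.subgraph(vpred = (_, attr) => attr == "keep")
// ... then mask the algorithm's result down to the restricted structure.
val ccKept: Graph[VertexId, Int] = cc.mask(kept)

val keptIds   = ccKept.vertices.keys.collect().sorted.toSeq
val keptEdges = ccKept.edges.map(e => (e.srcId, e.dstId)).collect().toSeq
```

The result keeps the attributes of `cc` (the component labels) but only the vertices and edges that survive in `kept`.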

In [5]:
import org.apache.spark.graphx.Graph
// rdd for vertex
val users:RDD[(VertexId,(String,String))] = sc.parallelize(Array((3L, ("rxin", "student")), 
                                                                 (7L, ("jgonzal", "postdoc")),
                                                                 (5L, ("franklin", "prof")), 
                                                                 (2L, ("istoica", "prof"))))
// rdd for edge
val relationships:RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"),
                                                           Edge(5L, 3L, "advisor"),
                                                           Edge(2L, 5L, "colleague"), 
                                                           Edge(5L, 7L, "pi")))

val defaultUser:(String,String) = ("Anonymous","Missing")
val g:Graph[(String,String),String] = Graph(users,relationships,defaultUser)
val validGraph = g.subgraph(vpred = (vertexId,attr)=> attr._2 != "prof")
validGraph.triplets.collect().foreach(println)

((3,(rxin,student)),(7,(jgonzal,postdoc)),collab)


users = ParallelCollectionRDD[23] at parallelize at <console>:43
relationships = ParallelCollectionRDD[24] at parallelize at <console>:48
defaultUser = (Anonymous,Missing)
g = org.apache.spark.graphx.impl.GraphImpl@42769676
validGraph = org.apache.spark.graphx.impl.GraphImpl@50fb7127


org.apache.spark.graphx.impl.GraphImpl@50fb7127

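#### 3. join operators
1. joinVertices: 
    1. Joins on the vertexId as the key, 
    2. Requires a map function whose output type is the same as the vertex attr type
        **map: (VertexId, VD, U) => VD** : VD is the attr type of the original VertexRDD, and U is the value type of the joined RDD; vertices with no matching entry in the RDD keep their original attr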
    ```scala
    class Graph[VD, ED] {
      def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED]
    }
    ```
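2. outerJoinVertices
   outerJoinVertices is largely the same as joinVertices, with only two differences: 
    1. **map: (VertexId, VD, Option[U]) => VD2** : the third input is an Option, which is None for vertices that have no matching entry in the RDD
    2. The map function may return any type; it does not have to match the original vertexAttr type
    ```scala
    class Graph[VD, ED] {
      def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED]
    }
    ```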
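
The difference between the two joins shows up on vertices that are missing from the joined RDD. A minimal sketch, assuming `sc` is the notebook's SparkContext (the graph `net` is made up; `outDegrees` leaves out vertices that have no outgoing edge):

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

// Assumption: `sc` is the SparkContext provided by the notebook.
val net: Graph[String, Int] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0))), defaultValue = "v")

// outDegrees has no entry for vertices 2 and 3 (no outgoing edges).
val outDeg: VertexRDD[Int] = net.outDegrees

// joinVertices: unmatched vertices keep their old attribute; the type stays String.
val joined: Graph[String, Int] = net.joinVertices(outDeg)((_, attr, deg) => s"$attr/$deg")
// outerJoinVertices: unmatched vertices see None; the attribute type may change.
val degrees: Graph[Int, Int] =
  net.outerJoinVertices(outDeg)((_, _, degOpt) => degOpt.getOrElse(0))

val joinedAttrs = joined.vertices.collect().sortBy(_._1).toSeq
val degreeAttrs = degrees.vertices.collect().sortBy(_._1).toSeq
```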

In [30]:
val outDegrees = g.outDegrees
val newG = g.joinVertices(outDegrees)({
    (vid,vattr,newAttr) => ("strings",s"${vattr}_out:${newAttr}") // the original vertexAttr is (String,String), so map must also return (String,String)
})
newG.triplets.collect.foreach(println)

((3,(strings,(rxin,student)_out:1)),(7,(jgonzal,postdoc)),collab)
((5,(strings,(franklin,prof)_out:2)),(3,(strings,(rxin,student)_out:1)),advisor)
((2,(strings,(istoica,prof)_out:1)),(5,(strings,(franklin,prof)_out:2)),colleague)
((5,(strings,(franklin,prof)_out:2)),(7,(jgonzal,postdoc)),pi)


outDegrees = VertexRDDImpl[49] at RDD at VertexRDD.scala:57
newG = org.apache.spark.graphx.impl.GraphImpl@291d0b55


org.apache.spark.graphx.impl.GraphImpl@291d0b55

In [35]:
val outDegrees = g.outDegrees
val newG = g.outerJoinVertices(outDegrees)({
    (vid,vattr,newAttrOpt) => 999   // the map function may return any type
})
newG.triplets.collect.foreach(println)

((3,999),(7,999),collab)
((5,999),(3,999),advisor)
((2,999),(5,999),colleague)
((5,999),(7,999),pi)


outDegrees = VertexRDDImpl[49] at RDD at VertexRDD.scala:57
newG = org.apache.spark.graphx.impl.GraphImpl@365cacbf


org.apache.spark.graphx.impl.GraphImpl@365cacbf

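#### 4. Neighborhood Aggregation
(1) Many iterative graph algorithms need to aggregate messages from neighboring vertices, so GraphX provides the aggregateMessages method for this  
(2) aggregateMessages takes a sendMsg() and a mergeMsg() function, and returns a VertexRDD    
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;sendMsg receives an EdgeContext, which exposes the edge's information (see the extractor below) together with the sendToSrc and sendToDst methods  
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;mergeMsg is a reduce function that combines two messages into one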
```scala
def aggregateMessages[Msg: ClassTag](
  sendMsg: EdgeContext[VD, ED, Msg] => Unit,
  mergeMsg: (Msg, Msg) => Msg,
  tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]

abstract class EdgeContext[VD, ED, A] {
  def sendToSrc(msg: A): Unit
  def sendToDst(msg: A): Unit
}
object EdgeContext {
    /** The fields exposed by an EdgeContext */
    def unapply[VD, ED, A](edge: EdgeContext[VD, ED, A]): Some[(VertexId, VertexId, VD, VD, ED)] =
        Some((edge.srcId, edge.dstId, edge.srcAttr, edge.dstAttr, edge.attr))
}
```
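(3) In the cell below every vertex's vertexAttr stands for an age, and aggregateMessages is used to compute, for each vertex, the average age of its followers that are older than the vertex itself. 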

In [39]:
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with 20 vertices whose vertexAttr is a Double (the vertex id is reused as the age)
val g:Graph[Double,Int] = GraphGenerators.logNormalGraph(sc,numVertices = 20).mapVertices((id,_)=>id.toDouble)
//g.triplets.collect.foreach(println)
val vertexs: VertexRDD[(Int, Double)] =  g.aggregateMessages[(Int,Double)](  // the message type is (Int,Double): (count, total age)
  context=>{  // sendMsg
    if(context.srcAttr > context.dstAttr)
      context.sendToDst((1,context.srcAttr))
  },
  (msg1,msg2)=>{  // mergeMsg
    (msg1._1+msg2._1,msg1._2+msg2._2)
  }
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  vertexs.mapValues( (vId, value) =>  // this mapValues overload also receives the vertexId
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

(0,6.8)
(12,15.0)
(13,16.5)
(1,9.4)
(14,17.0)
(2,11.0)
(15,18.0)
(3,9.857142857142858)
(4,10.666666666666666)
(16,19.0)
(5,13.166666666666666)
(6,12.11111111111111)
(7,13.0)
(8,9.0)
(9,14.857142857142858)
(10,15.285714285714286)
(11,15.166666666666666)


g = org.apache.spark.graphx.impl.GraphImpl@7394877d
vertexs = VertexRDDImpl[211] at RDD at VertexRDD.scala:57
avgAgeOfOlderFollowers = VertexRDDImpl[213] at RDD at VertexRDD.scala:57


VertexRDDImpl[213] at RDD at VertexRDD.scala:57
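
For comparison, a smaller use of the same sendMsg/mergeMsg pattern is counting in-degrees. This is only a sketch, assuming `sc` is the notebook's SparkContext; the graph `links` is invented. Passing `TripletFields.None` tells GraphX that sendMsg reads no vertex or edge attributes.

```scala
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexRDD}

// Assumption: `sc` is the SparkContext provided by the notebook.
val links: Graph[Int, Int] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 3L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0))), defaultValue = 0)

// sendMsg sends 1 to the destination of every edge; mergeMsg sums the messages.
val inDeg: VertexRDD[Int] = links.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),
  _ + _,
  TripletFields.None)

// Vertices that receive no message (vertex 2 here) are absent from the result.
val inDegSeq = inDeg.collect().sortBy(_._1).toSeq
```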

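### 5.3 Pregel
#### Preface
1. GraphX has many internal optimizations; for details see the [graphx paper](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-gonzalez.pdf)
2. Iterative graph algorithms are best implemented with the pregel api, because it already provides a correct persist, unpersist and checkpoint workflow  
    1. persist and unpersist happen in every iteration, caching in memory; each round persists its own result and then unpersists the result of the previous round  
    2. checkpoint runs automatically every fixed number of iterations, as given by a setting, e.g. set **spark.graphx.pregel.checkpointInterval** = 10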

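#### 1. Pregel api
1. Three kinds of user-defined function
    1. **Vertex Program**:   
       The Vertex Program runs on each vertex; its inputs are
        1. the incoming messages (in GraphX, already merged into a single message by the Merge Message Program)
        2. vertex attr state
        3. vertexId  
       Its output is the new state (the new attr) of the vertex
    2. **Send Message Program**:  
       Runs on the records of the triplet view; its input is an **EdgeTriplet**, and its output is an iterator of zero or more messages
    3. **Merge Message Program**:    
       Merges two messages addressed to the same vertex into one, and returns the combined message  
       Messages are key-value pairs: (vertexId as the key, vertex message as the value)
2. Three parameters
    1. Initial message: sent to every vertex and used for the first iteration  
    2. Max Iteration: the maximum number of iterations    
    3. Edge Direction: selects the edges on which the send message program runs, relative to the vertices that received a message in the previous round; for example, with EdgeDirection.Out it runs only on the out-edges of those vertices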
    
```scala
def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,   // default
      activeDirection: EdgeDirection = EdgeDirection.Either)  // default
      (
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
: Graph[VD, ED]
```
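
To show how the two parameter lists are filled in, here is a minimal sketch that propagates the smallest vertex id through each connected group of vertices. It assumes `sc` is the notebook's SparkContext; the graph and the names `parts` and `components` are illustrative, and the iteration cap of 10 is an arbitrary choice.

```scala
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

// Assumption: `sc` is the SparkContext provided by the notebook.
// Two groups of vertices: {1, 2, 3} and {4, 5}; each vertex starts with its own id.
val parts: Graph[VertexId, Int] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(4L, 5L, 0))),
  defaultValue = 0).mapVertices((id, _) => id)

val components: Graph[VertexId, Int] = parts.pregel(
    initialMsg = Long.MaxValue,
    maxIterations = 10,
    activeDirection = EdgeDirection.Either)(
  // Vertex Program: keep the smaller of the current label and the message.
  (_, label, msg) => math.min(label, msg),
  // Send Message: push the smaller label across the edge, in either direction.
  t =>
    if (t.srcAttr < t.dstAttr) Iterator((t.dstId, t.srcAttr))
    else if (t.dstAttr < t.srcAttr) Iterator((t.srcId, t.dstAttr))
    else Iterator.empty,
  // Merge Message: of two labels sent to the same vertex, keep the smaller.
  (a, b) => math.min(a, b))

val labels = components.vertices.collect().sortBy(_._1).toSeq
```

Because `initialMsg` is `Long.MaxValue`, the first run of the vertex program leaves every label unchanged, which is the usual way to pick a neutral initial message.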
    
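#### 2. Pregel performance tuning
1. Partition the VertexRDD manually   
    GraphX only repartitions the EdgeRDD, so the VertexRDD has to be partitioned by hand; empirically, performance is better when the VertexRDD and the EdgeRDD have the same number of partitions
2. Set up checkpointing    
&nbsp;&nbsp;&nbsp;&nbsp;Because GraphX algorithms are iterative, every iteration makes the lineage chains of the VertexRDD and EdgeRDD that make up the graph longer and longer, so caching is needed to keep each iteration from recomputing the whole RDD chain. This does not change one fact, though: the list of object references from child RDDs to parent RDDs keeps growing. To cut the RDD lineage, checkpoint once every few iterations.  
3. An iterative graph update with checkpointing (a simulation) is given in the appendix at the end of these notes:    
In Pregel, every iteration is persisted to memory, and a checkpoint is taken at a fixed interval;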
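
One way to follow the partitioning advice in item 1 is to create the vertex and edge RDDs with the same number of partitions before building the graph. The sketch below assumes `sc` is the notebook's SparkContext; `numParts = 4` and the data are illustrative, and the partition counts observed on the resulting graph reflect GraphX's current behavior rather than a documented guarantee.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: `sc` is the SparkContext provided by the notebook.
val numParts = 4  // illustrative value; tune for the cluster and the data size

// Give both input RDDs the same number of partitions.
val vs: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 0), (2L, 0), (3L, 0)), numParts)
val es: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)), numParts)

val tuned: Graph[Int, Int] = Graph(vs, es)

val vertexParts = tuned.vertices.getNumPartitions
val edgeParts   = tuned.edges.getNumPartitions
```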

#### 3. Single-source shortest paths with the pregel api

In [43]:
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances; every edge attribute defaults to 1.0
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 10).mapEdges(e => e.attr.toDouble)
graph.triplets.collect.foreach(println)
val sourceId: VertexId = 8 // the source vertex

// Initialize vertexAttr: every vertex except vertex 8 starts at infinity
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message: runs on edges, and sends only when the path through src would shorten dst's distance
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

((0,7),(1,4),1.0)
((0,7),(1,4),1.0)
((0,7),(6,6),1.0)
((0,7),(7,6),1.0)
((0,7),(7,6),1.0)
((0,7),(8,2),1.0)
((0,7),(8,2),1.0)
((1,4),(1,4),1.0)
((1,4),(2,7),1.0)
((1,4),(6,6),1.0)
((1,4),(9,8),1.0)
((2,7),(0,7),1.0)
((2,7),(0,7),1.0)
((2,7),(2,7),1.0)
((2,7),(6,6),1.0)
((2,7),(7,6),1.0)
((2,7),(7,6),1.0)
((2,7),(8,2),1.0)
((3,1),(3,1),1.0)
((4,2),(0,7),1.0)
((4,2),(5,3),1.0)
((5,3),(2,7),1.0)
((5,3),(2,7),1.0)
((5,3),(9,8),1.0)
((6,6),(0,7),1.0)
((6,6),(5,3),1.0)
((6,6),(6,6),1.0)
((6,6),(8,2),1.0)
((6,6),(8,2),1.0)
((6,6),(9,8),1.0)
((7,6),(0,7),1.0)
((7,6),(0,7),1.0)
((7,6),(1,4),1.0)
((7,6),(3,1),1.0)
((7,6),(5,3),1.0)
((7,6),(7,6),1.0)
((8,2),(3,1),1.0)
((8,2),(6,6),1.0)
((9,8),(0,7),1.0)
((9,8),(2,7),1.0)
((9,8),(4,2),1.0)
((9,8),(7,6),1.0)
((9,8),(7,6),1.0)
((9,8),(8,2),1.0)
((9,8),(8,2),1.0)
((9,8),(9,8),1.0)
(0,2.0)
(1,3.0)
(2,3.0)
(3,1.0)
(4,3.0)
(5,2.0)
(6,1.0)
(7,3.0)
(8,0.0)
(9,2.0)


graph = org.apache.spark.graphx.impl.GraphImpl@617118e6
sourceId = 8
initialGraph = org.apache.spark.graphx.impl.GraphImpl@ba5b223
sssp = org.apache.spark.graphx.impl.GraphImpl@29f3e54


org.apache.spark.graphx.impl.GraphImpl@29f3e54

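#### 4. How pregel works, and other graph algorithms
1. Basic idea
    1. First, pregel sends an initial message to every vertex in the graph; on receiving it, each vertex combines it with its own vertexAttr to produce its new attr  
    2. Then the iterative part begins:  
        each vertex sends messages to its neighbors, so each vertex receives a list of messages from its neighbors; the vertex merges this list into a single message and uses it to update its vertexAttr. This step is called a superstep
    3. A vertex that receives no message in a superstep is treated as stable: its vertex program is skipped and its vertexAttr is unchanged; whether its edges still run the send program is decided by the Edge Direction parameter
    4. The algorithm ends when no messages remain, that is, when no vertex's vertexAttr changes any more (or when the maximum number of iterations is reached)
    
2. PageRank on pregel  
    1. Initialize every vertex's vertexAttr to $1/N$, where $N$ is the number of vertices in the graph
    2. Each vertex propagates its vertexAttr to its neighbors (in standard PageRank, divided by its out-degree)
    3. Each neighbor updates its attr to $0.15\cdot\frac{1}{N}+0.85\sum(\text{values received from its neighbors})$

3. Single-source shortest paths
    1. Set the attr of every vertex except the source to infinity (the source is 0), and set every edge attr to 1
    2. Update a vertex's vertexAttr to (neighbor's vertexAttr + the edge's edgeAttr) if that is smaller than the current value
    3. Keep iterating until convergence
    
4. Semi-clustering  
    1. Semi-clustering is commonly used on social networks: the people within one group interact frequently with each other and infrequently with everyone else;  
        the edges of the graph carry different weights that express how strongly the vertices interact;   
        unlike ordinary clustering algorithms, a vertex may belong to several clusters at once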
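
The PageRank steps in item 2 can be sketched as follows. To keep it short, this version uses aggregateMessages and outerJoinVertices in a plain loop rather than the pregel api, divides each vertex's rank by its out-degree as standard PageRank does, and runs a fixed 10 iterations. It assumes `sc` is the notebook's SparkContext and a graph in which every vertex has at least one outgoing edge; the graph `web` is invented.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Assumption: `sc` is the SparkContext provided by the notebook.
val web: Graph[Int, Int] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0))),
  defaultValue = 0)
val n = web.numVertices.toDouble

// Vertex attr = (rank, outDegree); every vertex here has out-degree >= 1.
var ranked: Graph[(Double, Int), Int] =
  web.outerJoinVertices(web.outDegrees)((_, _, deg) => (1.0 / n, deg.getOrElse(0)))

for (_ <- 1 to 10) {
  // Each vertex sends rank / outDegree along its out-edges; contributions are summed.
  val contribs = ranked.aggregateMessages[Double](
    ctx => ctx.sendToDst(ctx.srcAttr._1 / ctx.srcAttr._2), _ + _)
  // Cache each round so later rounds do not recompute the whole chain.
  ranked = ranked.outerJoinVertices(contribs)((_, attr, sum) =>
    (0.15 / n + 0.85 * sum.getOrElse(0.0), attr._2)).cache()
}

val ranks: Map[VertexId, Double] =
  ranked.vertices.map { case (id, (rank, _)) => (id, rank) }.collect().toMap
```

With no dangling vertices the ranks keep summing to 1, which is a quick sanity check on the update rule.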

### 5.4 Graph Builders
#### 1. GraphLoader: reading from a file
(1) The file consists of (source vertex ID, destination vertex ID) pairs, one per line   
(2) Lines that begin with "#" are skipped as comments
```text
# This is a comment
2 1
4 1
1 2
```
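(3) The api for reading the file   
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; a. As shown below, every vertex and edge attr is set to the default value 1;  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; b. The canonicalOrientation parameter reorients every edge into the positive direction (srcId<dstId)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; c. A graph created this way does not repartition its edges: each edge stays in the partition of the block from which its line of the edge list was read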
```scala
object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}
```
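
A minimal usage sketch, assuming `sc` is the notebook's SparkContext and that Spark runs in local mode so that it can read a temporary file on the driver's filesystem (the file name and the names `loaded` and `canonical` are made up):

```scala
import java.nio.file.Files
import org.apache.spark.graphx.{Graph, GraphLoader}

// Assumption: `sc` is the notebook's SparkContext and can read local files (local mode).
val edgeFile = Files.createTempFile("edges", ".txt")
Files.write(edgeFile, "# This is a comment\n2 1\n4 1\n1 2\n".getBytes("UTF-8"))
val path = edgeFile.toUri.toString  // file:// URI, independent of the default filesystem

val loaded: Graph[Int, Int] = GraphLoader.edgeListFile(sc, path)
// With canonicalOrientation = true every edge is stored with srcId < dstId.
val canonical: Graph[Int, Int] =
  GraphLoader.edgeListFile(sc, path, canonicalOrientation = true)

val loadedEdges    = loaded.edges.map(e => (e.srcId, e.dstId)).collect().sorted.toSeq
val canonicalEdges = canonical.edges.map(e => (e.srcId, e.dstId)).collect().sorted.toSeq
```

Note that reorienting does not remove duplicates: `2 1` and `1 2` both end up as an edge from 1 to 2.
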
#### 2. Creating a graph from RDDs
```scala
object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}
```
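
The `uniqueEdges` parameter of `fromEdgeTuples` is easiest to see on an edge list that contains a duplicate. A sketch assuming `sc` is the notebook's SparkContext; the pairs and the default value `"n/a"` are illustrative.

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Assumption: `sc` is the SparkContext provided by the notebook.
val pairs = sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))

// Without uniqueEdges every tuple becomes its own edge with attribute 1.
val multi: Graph[String, Int] = Graph.fromEdgeTuples(pairs, defaultValue = "n/a")
// With a PartitionStrategy, duplicate edges are merged and the attribute counts them.
val merged: Graph[String, Int] = Graph.fromEdgeTuples(
  pairs, defaultValue = "n/a", uniqueEdges = Some(PartitionStrategy.RandomVertexCut))

val multiEdges  = multi.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().sorted.toSeq
val mergedEdges = merged.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().sorted.toSeq
```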

### **[Appendix]:**  A general pattern for caching during graph iteration

In [30]:
sc.setLogLevel("WARN")
import org.apache.spark.storage.StorageLevel

def fun() = {
    sc.setCheckpointDir("/tmp/test")
    var updateCount = 0
    val interval = 10

    def update(data:Graph[Int,Int]):Unit = {
      data.persist()  // persist in every iteration
      updateCount += 1
      if(updateCount%interval == 0)   // checkpoint every `interval` iterations
        data.checkpoint()
    }

    var g = Graph.fromEdges(sc.parallelize(Array(Edge(1L,3L,1),
      Edge(2L,4L,1),
      Edge(3L,4L,1))),1)

    g.persist()
    println(g.vertices.count())

    for(i <- 1 to 20){
      println(s"Iteration $i")
      val newGraph = g.mapVertices((vid,vattr) => (vattr*i)/17)
      g = g.outerJoinVertices(newGraph.vertices)({(vid,vAttr,newAttr) => newAttr.getOrElse(-99)})
      update(g)
      println(g.vertices.count)
    }

    g.triplets.collect.foreach(println)
}
fun()

4
Iteration 1
4
Iteration 2
4
Iteration 3
4
Iteration 4
4
Iteration 5
4
Iteration 6
4
Iteration 7
4
Iteration 8
4
Iteration 9
4
Iteration 10
4
Iteration 11
4
Iteration 12
4
Iteration 13
4
Iteration 14
4
Iteration 15
4
Iteration 16
4
Iteration 17
4
Iteration 18
4
Iteration 19
4
Iteration 20
4


fun: ()Unit


((1,0),(3,0),1)
((2,0),(4,0),1)
((3,0),(4,0),1)
