# 3小时入门Spark之Graphx图计算

In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx._
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

val spark = SparkSession.builder()
   .master("local[4]").appName("graph")
   .getOrCreate()

import spark.implicits._

val sc = spark.sparkContext

//将DataFrame写入单个本地文件
def writeCsvOnly_t(df: DataFrame, path: String, sep: String = "\t"): Unit = {
    val header = df.columns.reduce(_+sep+_)
    val values = df.rdd.map(_.toSeq.reduce(_+sep+_)).collect()
    val writer = new java.io.PrintWriter(path)
    writer.println(header)
    for(v<-values){
      writer.println(v)
    }
    writer.close
  }

spark = org.apache.spark.sql.SparkSession@25e787ad
sc = org.apache.spark.SparkContext@1609045a


writeCsvOnly_t: (df: org.apache.spark.sql.DataFrame, path: String, sep: String)Unit


org.apache.spark.SparkContext@1609045a

### 一，图的基本概念

图(graph)有时候又被称为网络(network), 是一种适合表现事物之间关联关系的数据结构。


1，图的组成

图的基本组成是顶点(vertex)和边(edge).

2，图的分类

有向图和无向图：根据边是否有方向，图可以分成为有向图和无向图。有向图的边从源顶点出发，指向目标顶点。在无向图中，一个顶点上的边的数量叫做这个顶点的度。在有向图中，一个顶点上出发的边的数量叫做这个顶点的出度，汇集到一个顶点上的边的数量叫做这个顶点的入度。

有环图和无环图：如果有向图中存在一些边构成闭合的环，称为有环图，反之为无环图。有环图上设计算法需要考虑终止条件，否则算法可能会沿着环永远循环下去。

多重图和伪图：如果两个顶点之间可以有多条平行边，称为多重图。如果存在自环，即由一个顶点指向自己的边，则称为伪图。Graphx的图都是伪图。

属性图和非属性图：如果顶点和边是包括属性的，称为属性图，否则是非属性图。非属性图作用不大。通常顶点和边至少有一个是包括属性的，Graphx的图都是属性图。

二分图：如果图的顶点被分成两个不同的子集，边的源顶点始终来自其中一个子集，目标顶点始终来自另外一个子集。这种图称为二分图。二分图可用于交友网站，源顶点来自男性集合，目标顶点来自女性集合。二分图也可以用于推荐系统，源顶点来自用户，目标顶点来自商品。


3，图的表示

如果图的边是没有属性的，可以用稀疏的邻接矩阵进行表示。

在Graphx中，用顶点属性表VertexRDD和边属性表EdgeRDD联合来表示图。

4，图的算法

图的著名算法包括：用于衡量顶点重要性的PageRank算法，用于计算顶点之间距离的最短路径算法，用于社区发现的标签传播算法，用于路径规划的Dijkstra算法和Kruscal算法……

5，图的应用

图的应用主要包括网站排名，社交网络分析，金融欺诈检测，推荐系统等等。


### 二，图的创建

有3类常用的创建图的方法。

第一种是通过Graph的构造函数进行创建。

第二种是通过GraphLoader.edgeListFile从文件读入EdgeRDD进行创建。

第三种是使用util.GraphGenerators生成一些简单的图用于测试算法。

1，通过Graph构造函数创建

Graph类有3个不同的构造函数，它们的签名如下，用法也是一目了然。

```scala
object Graph {
  def apply[C, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
```

In [3]:
// 创建VertexRDD，注意VertexId必须是Long类型
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// 创建EdgeRDD
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// 设置缺失顶点
val defaultUser = ("John Doe", "Missing")

// 使用apply构造函数创建图
val graph_user:Graph[(String, String), String] = Graph(users, relationships, defaultUser)

//查看图的部分数据，triplets同时存储了边属性信息和对应顶点属性信息。
graph_user.triplets.take(5).foreach(println)


((3,(rxin,student)),(7,(jgonzal,postdoc)),collab)
((5,(franklin,prof)),(3,(rxin,student)),advisor)
((2,(istoica,prof)),(5,(franklin,prof)),colleague)
((5,(franklin,prof)),(7,(jgonzal,postdoc)),pi)


users = ParallelCollectionRDD[0] at parallelize at <console>:39
relationships = ParallelCollectionRDD[1] at parallelize at <console>:44
defaultUser = (John Doe,Missing)
graph_user = org.apache.spark.graphx.impl.GraphImpl@2b009dd6


org.apache.spark.graphx.impl.GraphImpl@2b009dd6

2，从文件读入EdgeRDD进行创建

data/paperCite.edges是一些论文之间的引用关系，其格式如下所示。

#FromNodeId ToNodeId
1 2
1 3
1 4
1 5
2 6
2 7
2 8
5 9
5 10
5 11
5 12
6 13
7 14


In [7]:
val graph_paper = GraphLoader.edgeListFile(sc,"data/paperCite.edges")

//找到引用最高的文章
graph_paper.outDegrees.reduce((a,b) =>if (a._2 > b._2) a else b)

graph_paper = org.apache.spark.graphx.impl.GraphImpl@752cfd2


(559,11119)

3，使用util.GraphGenerators生成测试用图

In [5]:
//生成 4*4的网格图

//网格图的顶点和边符合特定的模式，就像是在一个二维的网格或矩阵中
val graph_grid =  util.GraphGenerators.gridGraph(sc, 4, 4)

graph_grid.edges.take(5).foreach(println)

Edge(0,1,1.0)
Edge(0,4,1.0)
Edge(1,2,1.0)
Edge(1,5,1.0)
Edge(2,3,1.0)


graph_grid = org.apache.spark.graphx.impl.GraphImpl@2feea40


org.apache.spark.graphx.impl.GraphImpl@2feea40

In [8]:
//生成星形图

//星形图指的是有一个顶点通过边与所有其他顶点相连，除此之外图中不存在其他边
val graph_star = util.GraphGenerators.starGraph(sc, 10)
graph_star.outDegrees.take(5).foreach(println)
graph_star.inDegrees.take(1).foreach(println)

(4,1)
(8,1)
(1,1)
(9,1)
(5,1)
(0,9)


graph_star = org.apache.spark.graphx.impl.GraphImpl@66b706c4


org.apache.spark.graphx.impl.GraphImpl@66b706c4

In [9]:
//生成对数正态图

//对数正态图通过随机算法生成，每个顶点的出度值分布符合对数正态分布
val graph_lognormal = util.GraphGenerators.logNormalGraph(sc,16)
graph_lognormal.outDegrees.map(_._2).collect.sorted


graph_lognormal = org.apache.spark.graphx.impl.GraphImpl@4281dafa


Array(1, 2, 2, 2, 2, 2, 3, 4, 6, 7, 7, 8, 11, 13, 15, 15)

In [10]:
//生成递归矩阵图

//递归矩阵图通过随机算法生成，递归矩阵图可以用于模拟典型的社交网络架构，会呈现社区趋势

val graph_rmat = util.GraphGenerators.rmatGraph(sc,16,40) //参数为顶点数，独立的边数量

graph_rmat.inDegrees.map(_._2).collect.sorted

graph_rmat = org.apache.spark.graphx.impl.GraphImpl@279c13de


Array(4, 4, 4, 5, 5, 6, 6, 6)

### 三，图的可视化

可以使用Python中的Networkx库，或者Gephi开源软件对图进行可视化，此外使用Zepplin也可以对Graphx的图进行可视化。

此处我们演示通过调用Networkx库中对Graphx图的可视化。

plot_graph.py 文件中的代码如下。

```python
import networkx as nx
import pandas as pd 
import sys
import matplotlib.pyplot as plt

def plot(directed = True):
    dfedges = pd.read_csv('data/dfedges.csv',sep = '\t')
    if directed:
        graph = nx.DiGraph(dfedges[['srcId','dstId']].values.tolist())
    else:
        graph = nx.Graph(dfedges[['srcId','dstId']].values.tolist())
    nx.draw_networkx(graph)
    plt.show()

if __name__ == "__main__":
    if sys.argv[1]=="true": 
        plot(directed = True)
    else:
        plot(directed = False)

```

In [11]:
def visualEdges(dfedges:DataFrame, ifdirected:Boolean = true):Unit = {
    writeCsvOnly_t(dfedges,"data/dfedges.csv")
    Runtime.getRuntime().exec(s"python plot_graph.py ${ifdirected}")
}

visualEdges: (dfedges: org.apache.spark.sql.DataFrame, ifdirected: Boolean)Unit


In [12]:
//网格图的可视化
val graph_grid =  util.GraphGenerators.gridGraph(sc, 4, 4)

visualEdges(graph_grid.edges.toDS.toDF,false)

graph_grid = org.apache.spark.graphx.impl.GraphImpl@1d34c62b


org.apache.spark.graphx.impl.GraphImpl@1d34c62b

![](data/网格图可视化.png)

In [13]:
//星形图的可视化
val graph_star = util.GraphGenerators.starGraph(sc, 10)

visualEdges(graph_star.edges.toDS.toDF,true)

graph_star = org.apache.spark.graphx.impl.GraphImpl@355d4813


org.apache.spark.graphx.impl.GraphImpl@355d4813

![](data/星形图可视化.png)

In [14]:
//对数正态图可视化
val graph_lognormal = util.GraphGenerators.logNormalGraph(sc,16)
visualEdges(graph_lognormal.edges.toDS.toDF,true)

graph_lognormal = org.apache.spark.graphx.impl.GraphImpl@7293d26f


org.apache.spark.graphx.impl.GraphImpl@7293d26f

![](data/对数正态图可视化.png)

### 四，Graph类的常用方法

Graph的各种接口方法的签名如下所示，大概有9组30多个方法。

其中pregel迭代接口和aggregateMessages合并消息接口是较为重要而灵活的方法。

使用pregel和aggregateMessages方法的精妙之处在于只需要考虑每个顶点的更新函数即可，让框架在遍历顶点时进行调用，

而无需考虑并行分布计算的细节。这种图计算编程模式叫做"像顶点一样思考"(Think Like A Vertex)。


In [None]:
class Graph[VD, ED] {
    
  // 1，图的信息 
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // 2，图的视图 
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // 3，图的缓存和分区
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // 4，修改属性创建新图 
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

  // 5，修改图结构创建新图 
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
    
  // 6，连接其它RDD
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
    
  // 7，收集邻居消息
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
    
  // 8，pregel迭代接口 
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
    
  // 9，内置常用图算法
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

**1，图的信息**

degrees既包括inDegrees和outDegrees之和。

In [15]:
val graph_star = util.GraphGenerators.starGraph(sc, 10)

graph_star.degrees.collect

graph_star = org.apache.spark.graphx.impl.GraphImpl@323f2c57


Array((4,1), (0,9), (8,1), (1,1), (9,1), (5,1), (6,1), (2,1), (3,1), (7,1))

In [16]:
graph_star.inDegrees.collect

Array((0,9))

In [17]:
graph_star.outDegrees.collect

Array((4,1), (8,1), (1,1), (9,1), (5,1), (6,1), (2,1), (3,1), (7,1))

**2，图的视图**

edges和vertices必须至少包括1个属性，如果没有，一般给每个顶点和边填充一个1作为属性。

可以从triplets中同时获取边的属性，以及与之关联的顶点属性。

In [18]:
val graph_star = util.GraphGenerators.starGraph(sc, 5)
graph_star.vertices.collect.foreach(println)

(4,1)
(0,1)
(1,1)
(2,1)
(3,1)


graph_star = org.apache.spark.graphx.impl.GraphImpl@64cfc576


org.apache.spark.graphx.impl.GraphImpl@64cfc576

In [19]:
graph_star.edges.collect.foreach(println)

Edge(1,0,1)
Edge(2,0,1)
Edge(3,0,1)
Edge(4,0,1)


In [20]:
graph_star.triplets.collect.foreach(println)

((1,1),(0,1),1)
((2,1),(0,1),1)
((3,1),(0,1),1)
((4,1),(0,1),1)


**3，图的缓存和分区**

如果图要多次被使用，应当使用persist缓存进行。如果确认图不再用到，推荐使用unpersist清理缓存以减轻内存压力。

如果设计迭代算法，推荐使用pregel迭代接口，它能够正确地释放不再使用的中间计算结果。

graphx对图的默认分区策略是切割Vertex而非切割Edge，这种设计更有利于减少存储和分区间的通信压力。

在切割Vertex策略下，可以保证不同的分区是不同的边，但是有些Vertex可能会在多个分区存在。

graphx提供了4种按照Vertex进行切割的具体策略。

RandomVertexCut：以边的srcId和dstId来作Hash，这样两个顶点之间相同方向的边会分配到同一个分区。
 
CanonicalRandomVertexCut：对srcId和dstId的排序结果来作Hash，这样两个顶点之间所有的边都会分配到同一个分区，而不管方向如何。
 
EdgePartition1D：对srcId来作Hash, 同一个vertex出来的edge会被切到同一个分区, supernode问题得不到任何缓解, 仅仅适用于比较稀疏的图.
 
EdgePartition2D：把整个图看成一个稀疏的矩阵, 然后对这个矩阵的行和列进行切分，从而保证顶点的备份数不大于2*sqrt(numParts)的限制。

In [21]:
import org.apache.spark.storage.StorageLevel
val graph_lognormal = util.GraphGenerators.logNormalGraph(sc,16).cache()

//找到最大出度的顶点
println(graph_lognormal.inDegrees.reduce((a,b)=>if(a._2>b._2) a else b))

//找到最大入度的顶点
println(graph_lognormal.outDegrees.reduce((a,b)=>if(a._2>b._2) a else b))

//清空缓存
graph_lognormal.unpersist()

(8,21)
(9,13)


graph_lognormal = org.apache.spark.graphx.impl.GraphImpl@79c04b2e


org.apache.spark.graphx.impl.GraphImpl@79c04b2e

In [27]:
val graph_lognormal = util.GraphGenerators.logNormalGraph(sc,6,4).cache()

//对数正态图默认的分区策略是EdgePartition1D
graph_lognormal.edges.mapPartitions(iter=>Array(iter.toArray).iterator)
.collect.map(arr=>arr.mkString(" ")).foreach(println)


Edge(0,0,1) Edge(0,1,1) Edge(0,1,1) Edge(0,2,1) Edge(0,4,1)
Edge(1,2,1) Edge(1,3,1) Edge(1,3,1) Edge(1,5,1) Edge(2,3,1)
Edge(3,0,1) Edge(3,2,1) Edge(3,4,1) Edge(3,5,1)
Edge(4,0,1) Edge(4,4,1) Edge(5,1,1) Edge(5,1,1) Edge(5,2,1) Edge(5,3,1)


graph_lognormal = org.apache.spark.graphx.impl.GraphImpl@56f8fdf4


org.apache.spark.graphx.impl.GraphImpl@56f8fdf4

In [28]:
//修改分区策略为CanonicalRandomVertexCut
val graph_repartition = graph_lognormal.partitionBy(PartitionStrategy.CanonicalRandomVertexCut,4)
graph_repartition.edges.mapPartitions(iter=>Array(iter.toArray).iterator)
.collect.map(arr=>arr.mkString(" ")).foreach(println)

Edge(0,4,1) Edge(3,0,1) Edge(4,0,1)
Edge(0,2,1) Edge(1,3,1) Edge(1,3,1) Edge(3,5,1) Edge(5,3,1)
Edge(0,1,1) Edge(0,1,1) Edge(1,2,1) Edge(2,3,1) Edge(3,2,1)
Edge(0,0,1) Edge(1,5,1) Edge(3,4,1) Edge(4,4,1) Edge(5,1,1) Edge(5,1,1) Edge(5,2,1)


graph_repartition = org.apache.spark.graphx.impl.GraphImpl@20b6f243


org.apache.spark.graphx.impl.GraphImpl@20b6f243

**4，修改属性创建新图**

这几个方法的使用相对简单，都是进行一次map操作后生成新的VertexRDD或EdgeRDD替换掉已有Graph的对应部分，得到新的Graph。

```scala
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
```

我们首先构造如下一个社交关系图，然后利用mapTriplets给边添加属性值。

如果边属性为"is_friends_with"，并且其源顶点属性中包含字母"a"，则添加属性值 true,否则添加属性值false。

![](data/社交图范例.png)

In [29]:
val myVertices = sc.makeRDD(Array((1L,"Ann"),(2L,"Bill"),(3L,"Charles"),
                                  (4L,"Diane"),(5L,"Went to gym this morning")))

val myEdges = sc.makeRDD(Array(Edge(1L,2L,"is-friends-with"),Edge(2L,3L,"is-friends-with"),
                               Edge(3L,4L,"is-friends-with"),Edge(3L,5L,"wrote-status"),
                               Edge(4L,5L,"like-status")))

val myGraph = Graph(myVertices,myEdges)

myVertices = ParallelCollectionRDD[369] at makeRDD at <console>:38
myEdges = ParallelCollectionRDD[370] at makeRDD at <console>:41
myGraph = org.apache.spark.graphx.impl.GraphImpl@63ec5d99


org.apache.spark.graphx.impl.GraphImpl@63ec5d99

In [30]:
//如果边属性为"is_friends_with"，并且其源顶点属性中包含字母"a"，则添加属性值 true,否则添加属性值false。
val newGraph = myGraph.mapTriplets(t => 
    (t.attr, t.attr=="is-friends-with"&&t.srcAttr.toLowerCase.contains("a")))

newGraph.triplets.collect.foreach(println)

((1,Ann),(2,Bill),(is-friends-with,true))
((2,Bill),(3,Charles),(is-friends-with,false))
((3,Charles),(4,Diane),(is-friends-with,true))
((3,Charles),(5,Went to gym this morning),(wrote-status,false))
((4,Diane),(5,Went to gym this morning),(like-status,false))


newGraph = org.apache.spark.graphx.impl.GraphImpl@36d04a5c


org.apache.spark.graphx.impl.GraphImpl@36d04a5c

**5，修改图结构创建新图**

这4个方法的作用简单总结如下：

reverse最简单，将每条边的方向反向。

subgraph过滤一些符合条件的边和顶点构造子图。

mask返回和另外一个graph的公共子图

groupEdges可以对平行边进行merge，但要求平行边位于相同的分区。

```scala
def reverse: Graph[VD, ED]
def subgraph(
  epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
  vpred: (VertexId, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
```

In [31]:
//reverse可以翻转边
val myGraph_reverse = myGraph.reverse
myGraph_reverse.triplets.collect.foreach(println)

((2,Bill),(1,Ann),is-friends-with)
((3,Charles),(2,Bill),is-friends-with)
((4,Diane),(3,Charles),is-friends-with)
((5,Went to gym this morning),(3,Charles),wrote-status)
((5,Went to gym this morning),(4,Diane),like-status)


myGraph_reverse = org.apache.spark.graphx.impl.GraphImpl@2540c83a


org.apache.spark.graphx.impl.GraphImpl@2540c83a

In [32]:
//subgraph可以通过过滤边和顶点创建子图
val myGraph_charles = myGraph.subgraph(t=> (t.srcAttr=="Charles"||t.dstAttr=="Charles"))
myGraph_charles.triplets.collect.foreach(println)

((2,Bill),(3,Charles),is-friends-with)
((3,Charles),(4,Diane),is-friends-with)
((3,Charles),(5,Went to gym this morning),wrote-status)


myGraph_charles = org.apache.spark.graphx.impl.GraphImpl@114e769d


org.apache.spark.graphx.impl.GraphImpl@114e769d

In [33]:
//mask可以返回和另外一个图的公共子图
val otherVertices = sc.makeRDD(Array((1L,"Ann"),(2L,"Bill"),(3L,"Charles"),
                                  (4L,"Diane"),(6L,"David")))

val otherEdges = sc.makeRDD(Array(Edge(1L,2L,"is-friends-with"),Edge(2L,3L,"is-friends-with"),
                               Edge(3L,4L,"is-friends-with"),Edge(3L,6L,"is-friends-with"),
                               Edge(4L,6L,"is-friends-with")))

val otherGraph = Graph(otherVertices,otherEdges)
val graph_mask = myGraph.mask(otherGraph)

graph_mask.triplets.collect.foreach(println)

((1,Ann),(2,Bill),is-friends-with)
((2,Bill),(3,Charles),is-friends-with)
((3,Charles),(4,Diane),is-friends-with)


otherVertices = ParallelCollectionRDD[400] at makeRDD at <console>:41
otherEdges = ParallelCollectionRDD[401] at makeRDD at <console>:44
otherGraph = org.apache.spark.graphx.impl.GraphImpl@4a8001d8
graph_mask = org.apache.spark.graphx.impl.GraphImpl@2d645610


org.apache.spark.graphx.impl.GraphImpl@2d645610

In [34]:
//groupEdges可以对平行边进行merge

val graph_distance = Graph.fromEdges(sc.makeRDD(
    Array(Edge(1L,2L,10.0),Edge(1L,2L,3.0),
         Edge(2L,3L,5.0),Edge(2L,3L,7.0),
         Edge(1L,4L,2.0))),1).partitionBy(PartitionStrategy.RandomVertexCut,4)

graph_distance.triplets.collect.foreach(println)

((1,1),(2,1),10.0)
((1,1),(2,1),3.0)
((1,1),(4,1),2.0)
((2,1),(3,1),5.0)
((2,1),(3,1),7.0)


graph_distance = org.apache.spark.graphx.impl.GraphImpl@47ba81e8


org.apache.spark.graphx.impl.GraphImpl@47ba81e8

In [249]:
val graph_grouped = graph_distance.groupEdges((a,b)=> math.min(a,b))
graph_grouped.triplets.collect.foreach(println)

((1,1),(2,1),3.0)
((1,1),(4,1),2.0)
((2,1),(3,1),5.0)


graph_grouped = org.apache.spark.graphx.impl.GraphImpl@6f7bbf56


org.apache.spark.graphx.impl.GraphImpl@6f7bbf56

**6，连接其它RDD**


```scala
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
  (mapFunc: (VertexId, VD, Option[U]) => VD2)
  : Graph[VD2, ED]
```

In [35]:
// joinVertices 不会修改点属性的类型
val graph_distance = Graph.fromEdges(sc.makeRDD(
    Array(Edge(1L,2L,10.0),Edge(1L,2L,3.0),
         Edge(2L,3L,5.0),Edge(2L,3L,7.0),
         Edge(1L,4L,2.0))),"").partitionBy(PartitionStrategy.RandomVertexCut,4)

val rdd_city = sc.makeRDD(Array((1L,"Beijing"),(2L,"Nanjing"),(3L,"Shanghai"),(4L,"Tianjing")))

val graph_join = graph_distance.joinVertices[String](rdd_city)((id,v,u) => u)
graph_join.triplets.collect.foreach(println)

((1,Beijing),(2,Nanjing),10.0)
((1,Beijing),(2,Nanjing),3.0)
((1,Beijing),(4,Tianjing),2.0)
((2,Nanjing),(3,Shanghai),5.0)
((2,Nanjing),(3,Shanghai),7.0)


graph_distance = org.apache.spark.graphx.impl.GraphImpl@5dae6db3
rdd_city = ParallelCollectionRDD[462] at makeRDD at <console>:46
graph_join = org.apache.spark.graphx.impl.GraphImpl@101da12f


org.apache.spark.graphx.impl.GraphImpl@101da12f

In [36]:
//outerJoinVertices 可以修改点属性的类型
val rdd_gender = sc.makeRDD(Array((1L,"female"),(2L,"male"),(3L,"male"),(4L,"female")))
val graph_outjoin = 
   myGraph.outerJoinVertices[String,(String,String)](rdd_gender)((id,v,opt)=>(v,opt.getOrElse(" ")))
graph_outjoin.triplets.collect.foreach(println)

((1,(Ann,female)),(2,(Bill,male)),is-friends-with)
((2,(Bill,male)),(3,(Charles,male)),is-friends-with)
((3,(Charles,male)),(4,(Diane,female)),is-friends-with)
((3,(Charles,male)),(5,(Went to gym this morning, )),wrote-status)
((4,(Diane,female)),(5,(Went to gym this morning, )),like-status)


rdd_gender = ParallelCollectionRDD[477] at makeRDD at <console>:41
graph_outjoin = org.apache.spark.graphx.impl.GraphImpl@1d6c4309


org.apache.spark.graphx.impl.GraphImpl@1d6c4309

**7，收集邻居消息**

```scala
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
```

In [37]:
//collectNeighborIds可以获取每个节点附近的邻居节点id
myGraph.collectNeighborIds(EdgeDirection.In).collect

Array((4,Array(3)), (1,Array()), (5,Array(3, 4)), (2,Array(1)), (3,Array(2)))

In [43]:
//collectNeighbors可以获取每个节点附近的邻居节点id和attr
myGraph.collectNeighbors(EdgeDirection.Either).take(2)

Array((4,Array((3,Charles), (5,Went to gym this morning))), (1,Array((2,Bill))))

aggregateMessages在图结构中实现了一个基本的map/reduce编程模型。

sendMsg是map过程，每条边向其src或dst发送一个消息。其输入参数为EdgeContext类型。
EdgeContext类型比Triplet类型多了sendToSrc和sendToDst两个方法，用于发送消息。

mergeMsg是reduce过程，每个顶点收集其收到的消息，并做合并处理。

aggregateMessages的返回值是一个VetexRDD。

In [44]:
//使用 aggregateMessages实现每个顶点的度统计
val degrees = myGraph.aggregateMessages[Int](edge => {edge.sendToDst(1);edge.sendToSrc(1)}, _+_)
degrees.collect.foreach(println)

(4,2)
(1,1)
(5,2)
(2,2)
(3,3)


degrees = VertexRDDImpl[533] at RDD at VertexRDD.scala:57


VertexRDDImpl[533] at RDD at VertexRDD.scala:57

In [45]:
//使用 aggregateMessages实现每个顶点的入度统计
val inDegrees = myGraph.aggregateMessages[Int](_.sendToDst(1), _+_)
.rightOuterJoin(myGraph.vertices).map(t=>(t._2._2,t._2._1.getOrElse(0))).collect

inDegrees = Array((Diane,1), (Ann,0), (Went to gym this morning,2), (Bill,1), (Charles,1))


Array((Diane,1), (Ann,0), (Went to gym this morning,2), (Bill,1), (Charles,1))

大部分算法中都包括多轮迭代，我们可以通过构建循环反复调用aggregateMessages方法进行实现。

我们考虑使用迭代算法计算每个顶点和离它最远的源顶点的距离。假设图是无环图。

算法基本过程如下：

1，给每个顶点赋初始属性值0.

2，每条边向其目标顶点发送消息，消息值为该边源顶点的属性值+1.

3，每个顶点收集所有消息，取消息中的最大值。

4，重复执行第2，3步骤，直到图中每个顶点的属性值都不再发生改变。


In [46]:
var g = myGraph.mapVertices((_,_)=>0)
var diff = 1
while(diff>0){
    val vertes = g.aggregateMessages[Int](ec =>ec.sendToDst(ec.srcAttr+1), math.max(_,_))
    val g2 = Graph(vertes,g.edges)
    diff = g2.vertices.join(g.vertices).map(t => t._2._1 - t._2._2).reduce(_+_)
    g = g2
}

g.vertices.collect.foreach(println)

(4,3)
(1,0)
(5,4)
(2,1)
(3,2)


g = org.apache.spark.graphx.impl.GraphImpl@59c46001
diff = 0


0

In [47]:
//以上代码也可以使用递归函数进行实现
def propagateEdgeCount(g:Graph[Int,String]):Graph[Int,String] = {
    val vertes = g.aggregateMessages[Int](ec =>ec.sendToDst(ec.srcAttr+1), math.max(_,_))
    val g2 = Graph(vertes,g.edges)
    val diff = g2.vertices.join(g.vertices).map(t => t._2._1 - t._2._2).reduce(_+_)
    if(diff>0)
        propagateEdgeCount(g2)
    else
       g
}

val g = propagateEdgeCount(myGraph.mapVertices((_,_)=>0))
g.vertices.collect.foreach(println) 

(4,3)
(1,0)
(5,4)
(2,1)
(3,2)


g = org.apache.spark.graphx.impl.GraphImpl@60df0418


propagateEdgeCount: (g: org.apache.spark.graphx.Graph[Int,String])org.apache.spark.graphx.Graph[Int,String]


org.apache.spark.graphx.impl.GraphImpl@60df0418

**8，pregel迭代接口**

上述使用aggregateMessages进行迭代的方法尽管已经非常简短了，但是其迭代过程中中间结果的缓存问题可能会给程序的性能造成影响。

使用pregel迭代接口能够很好地进行缓存优化。

```scala
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
  vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
  mergeMsg: (A, A) => A)
  : Graph[VD, ED]
```

pregel迭代接口基于Graphx的基础API实现，实现方式相当简洁，其代码不过20多行。

它主要使用了mapVertices，GraphXUtils.mapReduceTriplets，以及joinVertices这3个基础API进行实现。

其中mapReduceTriplets和aggregateMessages作用非常相似，都是一个map/reduce操作，

主要差异是其参数方法sendMsg的输入输出类型不太一样。

```scala
class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message.
      //We must cache messages so it can be materialized on the next line, 
      //allowing us to uncache the previous iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
```

pregel迭代接口有2个参数列表。

第一个参数列表完成了一些配置工作，三个参数分别是initialMsg、maxIter和activeDirection。

分别设置了初始消息，最大迭代次数和边激活的条件。


第二个参数列表有三个函数参数：vprog、sendMsg和mergeMsg.

vprog是顶点更新函数，它在每轮迭代的最后一步用mergeMsg的结果更新顶点属性，并在初始化时用initialMsg初始化图。

sengMsg是消息发送函数。其输入参数类型为EdgeTriplet，输出参数类型为一个Iterator，与aggregateMessages中的sengMsg有所不同。

需要注意的是，为了让算法结束迭代，需要在合适的时候让其返回一个空的Iterator

mergeMsg是消息合并函数。与aggregateMessages中的mergeMsg一样。

pregel在迭代的每一步都会生成一个新的图，直到没有新的消息产生或达到最大迭代次数退出。

重点讲解一下activeDirection，它是边的活跃状态的控制参数。在一轮迭代后，所有收到消息的顶点都是活跃的顶点。

活跃顶点将状态传递给边的方式由activeDirection控制，activeDirection有4个候选取值。

EdgeDirection.Out: 只有边的srcId的顶点在上一轮收到了消息，这条边才允许发送消息。即顶点活跃状态传递给它的出边。

EdgeDirection.In: 只有边的dstId的顶点在上一轮收到了消息，这条边才允许发送消息。即顶点活跃状态传递给它的入边。

EdgeDirection.Both: 只有边的srcId和desId两个顶点在上一轮都收到了消息，这条边才允许发送消息。即只有两个顶点都活跃，边才活跃。

EdgeDirection.Either: 只要边的srcId或desId顶点在上一轮收到了消息，这条边就可以发送消息。即顶点活跃状态传递给它的入边和出边。这是默认值。

下面我们基于pregel接口来重新实现：计算每个顶点和离它最远的源顶点的距离。

In [49]:
val init_graph = myGraph.mapVertices((vid,vd) => 0) 
val g = init_graph.pregel[Int](initialMsg = 0)(
    (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
    (et:EdgeTriplet[Int,String])=> if(et.srcAttr+1>et.dstAttr) Iterator((et.dstId, et.srcAttr+1)) 
      else Iterator.empty,
    (a:Int,b:Int) => math.max(a,b)
)

g.vertices.collect.foreach(println)

(4,3)
(1,0)
(5,4)
(2,1)
(3,2)


init_graph = org.apache.spark.graphx.impl.GraphImpl@5633fcb4
g = org.apache.spark.graphx.impl.GraphImpl@63a6a282


org.apache.spark.graphx.impl.GraphImpl@63a6a282

### 五，VertexRDD和EdgeRDD类的补充方法

除了Graph类定义的这些方法，VertexRDD类和EdgeRDD类在继承了RDD类的方法的基础上，还有一些补充方法。

**1，VertexRDD类的补充方法**

```scala
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
    
  // 1，过滤顶点
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  def minus(other: RDD[(VertexId, VD)])
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
    
  // 2，修改属性
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
    
  // 3,连接操作
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
    
  // 4,使用当前VertexRDD索引加速对其它VertexRDD的reduceByKey操作
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
```

In [50]:
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 5L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 10L)
    .flatMap(id => List((id, 1.0), (id, 2.0)))
rddB.count

setA = VertexRDDImpl[962] at RDD at VertexRDD.scala:57
rddB = MapPartitionsRDD[964] at flatMap at <console>:40


20

In [51]:
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
println(setB.count)
setB.collect.foreach(println)

5
(4,3.0)
(0,3.0)
(1,3.0)
(2,3.0)
(3,3.0)


setB = VertexRDDImpl[967] at RDD at VertexRDD.scala:57


VertexRDDImpl[967] at RDD at VertexRDD.scala:57

In [52]:
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
setC.collect.foreach(println)

(4,4.0)
(0,4.0)
(1,4.0)
(2,4.0)
(3,4.0)


setC = VertexRDDImpl[970] at RDD at VertexRDD.scala:57


VertexRDDImpl[970] at RDD at VertexRDD.scala:57

**2，EdgeRDD类的补充方法**

```scala
class EdgeRDD[ED] extends RDD[Edge[ED]] {
    // 1,修改属性
    def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
    // 2,改变方向
    def reverse: EdgeRDD[ED]
    // 3,连接操作
    def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]   
}

```

### 六，Graphx内置常用图算法

Graphx内置的图算法一些作为GraphOps类的方法存在，另外一些在graphx.lib中。

Graph类和GraphOps类的关系就像RDD和PairRDD的关系，必要时候Graph对象可以通过隐式转换变成GraphOps对象。

这些内置图算法主要包括：

PageRank: 可以由PageRank值衡量节点的重要程度，常用于网页排名，社区关键人物分析。

personalizedPageRank: 个性化的PageRank值，可用于社交网站中推荐"你可能认识的人"。 

triangleCount: 三角形个数，可以衡量周围的节点的连通性，也可以用于衡量网络总体的联通性。

ShortestPaths: 最小跳跃数，可以找到图中全部顶点和给定顶点的最小跳跃数。

connectedComponents: 联通组件，可以在社交网络中找到社交圈子。

stronglyConnectedComponents: 增强联通组件，针对有向图，可以找到社交圈子

LabelPropagation: 标签传播算法，可以用于社区发现。但往往不收敛，不是特别推荐使用。

**1，PageRank**


PageRank网页排名算法由谷歌创始人拉里·佩奇和谢尔盖·布林在1998年提出。

可用于搜索引擎页面排名，或者在论文引用关系网中找到最有影响力的论文。

总之，它可以衡量一个顶点在一个网络中的"重要性"程度。

PageRank的基本思想是被其它网页通过超链接引用越多的网页就越重要，并且被越重要网页引用的网页也越重要。

PageRank值相当于在inDegrees的基础上，增加了引用来源网页的重要性作为权重因子。

PageRank值通过迭代方法进行计算，其物理含义可以用这样一个思想实验来说明。

假定有许许多多的用户在各个网页之间随机地通过超链接进行跳转，那么当达到动态均衡时，

停留在某网页的用户数量占全部用户的比例就可以衡量为该网页的PageRank值。实际中的PageRank值还会做一些线性缩放。

PageRank的迭代公式如下：

$$PR'_i = resetProb + (1-resetProb) \sum _{(j: j->i)} {\frac{PR_j}{OutDegree_j}}$$

其中resetProb 为重置概率，即用户不通过超链接，而是直接访问某个页面的概率，默认值为0.15。

重置概率可以防止某些只有出边没有入边的PageRank值衰减到零。

求和项为将所有有超链接指向$i$的网页的PageRank值根据这项网页的出度均分后转移到$i$。

在经过许多轮迭代后，各个网页的PageRank值基本稳定不变时，算法即宣告收敛。  

In [53]:
//pageRank方法会生成一个新的图，图的顶点属性即PR值
val graph_paper = GraphLoader.edgeListFile(sc,"data/paperCite.edges")
val graph_PR = graph_paper.pageRank(0.01) //参数0.01是前后两次迭代的PR差异阈值
//查看最重要的几篇论文
graph_PR.vertices.sortBy(k=>k._2,false).take(10).foreach(println)

(21517,37.56088066394292)
(21487,29.675728164383607)
(21348,26.87597352177093)
(21566,25.62027530706551)
(21572,25.074895000031738)
(21318,24.321173574264403)
(21319,23.958485566968843)
(21575,23.14642376495923)
(21254,23.133255380450336)
(21618,21.903525791885322)


graph_paper = org.apache.spark.graphx.impl.GraphImpl@19040f3c
graph_PR = org.apache.spark.graphx.impl.GraphImpl@5f0d6b08


org.apache.spark.graphx.impl.GraphImpl@5f0d6b08

In [54]:
//调用graphx.lib中的PageRank方法来跑PageRank
val graph_PageRank = lib.PageRank.run(graph_paper,numIter = 20,resetProb = 0.1) 
graph_PageRank.vertices.sortBy(k=>k._2,false).take(10).foreach(println)

(21517,50.63490034773536)
(21487,37.95316837182138)
(21572,36.29677001702333)
(21566,33.93200610848722)
(21348,32.78003434149494)
(21318,30.639868206727012)
(21319,30.63541621071055)
(21575,30.104113971137526)
(21618,29.71268516135662)
(21254,28.823289062162544)


graph_PageRank = org.apache.spark.graphx.impl.GraphImpl@44790a53


org.apache.spark.graphx.impl.GraphImpl@44790a53

**2，personalizedPageRank**

个性化PageRank是 PageRank的一个变种，可以用于在社交网站中给用户推荐"你可能认识的人"。

personalizedPageRank除了要设定一个迭代终止的条件，还要指定一个源顶点的srcId.

在PageRank原理中，有一个重置概率，即用户不通过超链接直接进入某个页面。

而在个性化的PageRank中，除了指定源顶点的重置概率不为零，其余顶点的重置概率都为0.


$$PR'_i = resetProb\;\delta _{is} + (1-resetProb) \sum _{(j: j->i)} {\frac{PR_j}{OutDegree_j}}$$

In [55]:
//找到和21517论文关联最高的论文
val graph_paper = GraphLoader.edgeListFile(sc,"data/paperCite.edges")
val graph_PR_21517 = graph_paper.personalizedPageRank(21517L,0.01)
graph_PR_21517.vertices.filter(_._1!=21517L).reduce((a,b)=> if(a._2>b._2) a else b)

graph_paper = org.apache.spark.graphx.impl.GraphImpl@660de127
graph_PR_21517 = org.apache.spark.graphx.impl.GraphImpl@58868925


(21572,0.23637585271284656)

**3，triangleCount**

三角形数量可以衡量顶点周围网络连通性。graphx在计算三角形数量时，会忽略边的方向。

微信朋友圈的互动规则就是基于三角形关系的。如果A和B是好友，A和C也是好友，B和C却不是好友，那么如果A发了一个状态，B点了一个赞，C能看到A的状态，却看不到B的点赞。只有A和B是好友，A和C是好友，并且B和C也是好友，三个人构成了三角形关系的前提下，B和C才能在A的状态下看到彼此的点赞。


In [56]:
val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(1L,3L),(2L,3L),
            (1L,4L),(1L,5L),(3L,5L),(3L,4L))),1)
val graph_triangle = graph.triangleCount()
graph_triangle.vertices.collect()

graph = org.apache.spark.graphx.impl.GraphImpl@7ecc443c
graph_triangle = org.apache.spark.graphx.impl.GraphImpl@62e29954


Array((4,1), (1,3), (5,1), (2,1), (3,3))

**4,ShortestPaths**

ShortestPaths虽然命名上是最短路径，但其实际含义是计算各个顶点到给定顶点的最小跳跃数。

In [61]:
val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(2L,3L),(3L,4L),
    (4L,5L),(1L,5L))),1)

lib.ShortestPaths.run(graph,Array(4L)).vertices.collect()

graph = org.apache.spark.graphx.impl.GraphImpl@75340945


Array((4,Map(4 -> 0)), (1,Map(4 -> 3)), (5,Map()), (2,Map(4 -> 2)), (3,Map(4 -> 1)))

**5,connectedComponents**

connectedComponents连通组件会将图划分成几个连通区域，每个顶点的属性值为其所在连通区域中顶点编号的最小值。
connectedComponents的一种巧妙用法是用来在spark上实现DBSCAN算法，可以用它来对临时聚类簇进行合并。

连通组件不关心边的方向。

In [62]:
val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(2L,3L),(3L,1L),(5L,5L),(6L,7L))),1)
val graph_connected = graph.connectedComponents()
graph_connected.vertices.collect

graph = org.apache.spark.graphx.impl.GraphImpl@20f2578d
graph_connected = org.apache.spark.graphx.impl.GraphImpl@c973c99


Array((1,1), (5,5), (6,6), (2,1), (3,1), (7,6))

In [63]:
graph_connected.vertices.map(t=>(t._2,Set(t._1))).reduceByKey((s1,s2)=>s1|s2).collect

Array((1,Set(1, 2, 3)), (5,Set(5)), (6,Set(6, 7)))

**6，stronglyConnectedComponents**

stronglyConnectedComponents强连通组件和连通组件作用类似，但是它还关心边的方向。

在强连通组件中，每个顶点都可以通过其它顶点到达。

强连通组件由于边有方向，为了避免环的存在，需要设置最大迭代次数。

In [64]:
val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(2L,3L),(3L,1L),(5L,5L),(6L,7L))),1)
val graph_stronglyconnected = graph.stronglyConnectedComponents(20) //设置最大迭代次数为20
graph_stronglyconnected.vertices.collect

graph = org.apache.spark.graphx.impl.GraphImpl@5b403805
graph_stronglyconnected = org.apache.spark.graphx.impl.GraphImpl@7353a186


Array((1,1), (5,5), (6,6), (2,1), (3,1), (7,7))

In [65]:
graph_stronglyconnected.vertices.map(t=>(t._2,Set(t._1))).reduceByKey((s1,s2)=>s1|s2).collect

Array((1,Set(1, 2, 3)), (5,Set(5)), (6,Set(6)), (7,Set(7)))

**7，LabelPropagation**

为了识别出图中紧密交织的群体，GraphX 提供了标签传播算法(LPA).
这个想法是让稠密连接的顶点组在一个唯一的标签上达成一致，所以这些顶点组被定义为一个社区。
不幸的是,LPA常常不是收敛的，所以需要指定一个最大迭代次数。

In [66]:
val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(2L,3L),(3L,4L),(4L,1L),
            (1L,3L),(2L,4L),(4L,5L),(5L,6L),(6L,7L),(7L,8L),(8L,9L),(5L,7L),(6L,8L))),1)
val graph_LPA = lib.LabelPropagation.run(graph,10).vertices.collect.sortWith(_._1<_._1)


graph = org.apache.spark.graphx.impl.GraphImpl@63880b20
graph_LPA = Array((1,1), (2,2), (3,1), (4,2), (5,1), (6,6), (7,5), (8,5), (9,6))


Array((1,1), (2,2), (3,1), (4,2), (5,1), (6,6), (7,5), (8,5), (9,6))

### 七，其它常用图算法

Graphx内置的一些图算法基本上是用pregel迭代API实现的。

还有一些非常经典的图算法不太适合使用pregel迭代API实现，因此它们在Graphx中没有对应的内置实现。这些算法本质上也是迭代算法，例如每次迭代添加一条边。本节我们将主要使用诸如mapVertices和函数outerJoinVertices函数来实现和并行化这些原本被设计为顺序执行的算法。

这些算法包括：

最短路径算法(Dijkstra)：找到图中各个顶点到给定顶点的最短路径。

旅行推销员问题(TSP)：在图中找到一条访问每个顶点一次并回到出发点的最短路径。

最小生成树算法(Kruskal)：在一棵树(无环图)中 ，找到一个生成树，其边权值之和小于任何其他生成树边权值之和。

**1，最短路径算法(Dijkstra)**

Dijkstra算法实际上是一种广度优先搜索算法，可以用pregel迭代API进行实现。

![](data/最短路径.png)

In [67]:
val verts = sc.makeRDD(Array((1L,"A"),(2L,"B"),(3L,"C"),
(4L,"D"),(5L,"E"),(6L,"F"),(7L,"G")))

val edges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0) ,
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
Edge(3L, 5L, 5.0), Edge(5L, 6L, 8.0), Edge(4L, 5L,15.0), Edge (4L, 6L, 6.0),
Edge(5L, 7L, 9.0) , Edge(6L, 7L, 11.0))) 

val graph = Graph(verts, edges)

//使用pregel API 实现 有向图的Dijkstra，初始点为A
val init_graph = graph.mapVertices((vid,vd)=>if(vid==1L) 0.0 else Double.PositiveInfinity)
val g = init_graph.pregel[Double](initialMsg = Double.PositiveInfinity,
     activeDirection = EdgeDirection.Out)(
    (vid:VertexId,vd:Double,a:Double) => math.min(vd,a), //vprog
    (et:EdgeTriplet[Double,Double]) => ({
        val candidate = et.srcAttr+et.attr
        if(candidate<et.dstAttr) Iterator((et.dstId,candidate)) else Iterator.empty
    }),//sendMsg
    (a:Double,b:Double) => math.min(a,b) //mergeMsg
)

val output_graph = g.outerJoinVertices[String,(String,Double)](graph.vertices)(
    (vid:Long,vd:Double,opt:Option[String])=>(opt.getOrElse(" "),vd))

output_graph.vertices.map(t => t._2).collect

verts = ParallelCollectionRDD[2777] at makeRDD at <console>:43
edges = ParallelCollectionRDD[2778] at makeRDD at <console>:46
graph = org.apache.spark.graphx.impl.GraphImpl@34e2cbd4
init_graph = org.apache.spark.graphx.impl.GraphImpl@6531dd81
g = org.apache.spark.graphx.impl.GraphImpl@11419de
output_graph = org.apache.spark.graphx.impl.GraphImpl@39e1b8a


Array((D,5.0), (A,0.0), (E,14.0), (F,11.0), (B,7.0), (C,15.0), (G,22.0))

In [68]:
//使用pregel API 实现无向图的Dijkstra，初始点为B
val init_graph = graph.mapVertices((vid,vd)=>if(vid==2L) 0.0 else Double.PositiveInfinity)
val g = init_graph.pregel[Double](initialMsg = Double.PositiveInfinity,
    activeDirection = EdgeDirection.Either)(
    (vid:VertexId,vd:Double,a:Double) => math.min(vd,a), //vprog
    (et:EdgeTriplet[Double,Double]) => ({
        val candidate_dst = et.srcAttr+et.attr
        val candidate_src = et.dstAttr+et.attr
        val iter = if(candidate_dst<et.dstAttr){
            Iterator((et.dstId,candidate_dst))
        }else if(candidate_src<et.srcAttr){
            Iterator((et.srcId,candidate_src))
        }else{
            Iterator.empty
        }
        iter
    }),//sendMsg
    (a:Double,b:Double) => math.min(a,b) //mergeMsg
)
g.vertices.collect

init_graph = org.apache.spark.graphx.impl.GraphImpl@464b5293
g = org.apache.spark.graphx.impl.GraphImpl@38c811f2


Array((4,9.0), (1,7.0), (5,7.0), (6,15.0), (2,0.0), (3,8.0), (7,16.0))

In [4]:
//使用 pregel API 实现有向图的Dijkstra，初始点为A，并使用列表记录路径
val init_graph = graph.mapVertices((vid,vd)=>if(vid==1L) (0.0,List[String](vd)) 
                                   else (Double.PositiveInfinity,List[String](vd)))

//使用pregel API 实现 有向图的Dijkstra，初始点为A
val g = init_graph.pregel[(Double,List[String])](initialMsg=(Double.PositiveInfinity,List[String]()),
    activeDirection = EdgeDirection.Out)(
    (vid:VertexId,vd:(Double,List[String]),a:(Double,List[String]))=>if(a._1<vd._1) a else vd, //vprog
    (et:EdgeTriplet[(Double,List[String]),Double]) => ({
        val candidate = et.srcAttr._1 + et.attr
        if(candidate<et.dstAttr._1)Iterator((et.dstId,(candidate,et.srcAttr._2:+ et.dstAttr._2.last))) 
        else Iterator.empty
    }),//sendMsg
    (a:(Double,List[String]),b:(Double,List[String])) => if(a._1<b._1) a else b//mergeMsg
)

g.vertices.collect

init_graph = org.apache.spark.graphx.impl.GraphImpl@7b0e8a10
g = org.apache.spark.graphx.impl.GraphImpl@544d7f9b


Array((4,(5.0,List(A, D))), (1,(0.0,List(A))), (5,(14.0,List(A, B, E))), (6,(11.0,List(A, D, F))), (2,(7.0,List(A, B))), (3,(15.0,List(A, B, C))), (7,(22.0,List(A, D, F, G))))

In [70]:
//使用 pregel API 实现有向图的Dijkstra，初始点为A，并使用字符串记录路径, 
val init_graph = graph.mapVertices((vid,vd)=>
    if(vid==1L) (0.0,vd) else (Double.PositiveInfinity,vd))

//使用pregel API 实现 有向图的Dijkstra，初始点为A
val g = init_graph.pregel[(Double,String)](initialMsg = (Double.PositiveInfinity,""),
    activeDirection = EdgeDirection.Out)(
    (vid:VertexId,vd:(Double,String),a:(Double,String)) => if(a._1<vd._1) a else vd, //vprog
    (et:EdgeTriplet[(Double,String),Double]) => ({
        val candidate = et.srcAttr._1 + et.attr
        if(candidate<et.dstAttr._1) 
            Iterator((et.dstId,(candidate,et.srcAttr._2+"->"+et.dstAttr._2.last))) 
        else 
            Iterator.empty
    }),//sendMsg
    (a:(Double,String),b:(Double,String)) => if(a._1<b._1) a else b//mergeMsg
)

g.vertices.collect.foreach(println)

(4,(5.0,A->D))
(1,(0.0,A))
(5,(14.0,A->B->E))
(6,(11.0,A->D->F))
(2,(7.0,A->B))
(3,(15.0,A->B->C))
(7,(22.0,A->D->F->G))


init_graph = org.apache.spark.graphx.impl.GraphImpl@62c5ec47
g = org.apache.spark.graphx.impl.GraphImpl@39deb020


org.apache.spark.graphx.impl.GraphImpl@39deb020

**2，旅行推销员问题(TSP)**

旅行推销员问题(TSP)是在一个无向图中找到一个经过每一个顶点的最短路径。 假如有一个推销员，他要到某一地区的所有城市去推销，他想要走过的总路程最少。

旅行推销员问题是一个NP-Hard问题，没有一个有效的算法在多项式时间复杂度内得到确定的解。我们可以使用如下贪心算法得到近似解。

TSP问题的贪心算法：

* 1，从某些点开始
* 2，添加权重最小的邻边到路径中。
* 3，以该边的终点为新的起点，跳到第2步。

对于旅行推销员问题来说，贪心算法是最简单的，缺点是不会总是到达所有顶点。在这 个例子中，顶点 G 就没有到达。

贪心算法可在不用增加太多代码的情况下，用不同的起始顶点重新运行整个算法，不断迭代，挑选出一个到达所有顶点并且最短的解决方案。

![](data/TSP问题.png)

In [71]:
val verts = sc.makeRDD(Array((1L,"A"),(2L,"B"),(3L,"C"),
(4L,"D"),(5L,"E"),(6L,"F"),(7L,"G")))

val edges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0) ,
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
Edge(3L, 5L, 5.0), Edge(5L, 6L, 8.0), Edge(4L, 5L,15.0), Edge (4L, 6L, 6.0),
Edge(5L, 7L, 9.0) , Edge(6L, 7L, 11.0))) 

val graph = Graph(verts, edges)

verts = ParallelCollectionRDD[3067] at makeRDD at <console>:41
edges = ParallelCollectionRDD[3068] at makeRDD at <console>:44
graph = org.apache.spark.graphx.impl.GraphImpl@76df072a


org.apache.spark.graphx.impl.GraphImpl@76df072a

In [72]:
//使用图的顶点属性来记录访问顺序，0表示没有被访问

//从A点开始
var graph_tsp = graph.mapVertices((vid,vd) => if(vd!="A") (vd,0L) else (vd,1L))

//找到连接当前顶点和未访问顶点的边
var i = 1L 
var near_triplets = graph_tsp.triplets.filter(t=>
    {(t.srcAttr._2==i && t.dstAttr._2 ==0L)||(t.dstAttr._2==i && t.srcAttr._2 ==0L)})

while(near_triplets.count > 0){
    //取其中最短的一条连向的另外一个顶点作为下一个访问顶点。
    val min_t = near_triplets.reduce((a,b)=> if(a.attr<b.attr) a else b)
    val next_id = if(min_t.srcAttr._2 ==0L) min_t.srcId else min_t.dstId
    i=i+1
    
    //使用outerJoinVertices修改图顶点属性
    graph_tsp = graph_tsp.outerJoinVertices[Long,(String,Long)](sc.makeRDD(List((next_id,i))))(
      (vid,vd,opt) => (vd._1, opt.getOrElse(vd._2)))
    
    near_triplets = graph_tsp.triplets.filter(t=>
     {(t.srcAttr._2==i && t.dstAttr._2 ==0L)||(t.dstAttr._2==i && t.srcAttr._2 ==0L)})
}

//输出遍历顺序 
graph_tsp.vertices.map(_._2).filter(_._2>0).sortBy(_._2).collect

graph_tsp = org.apache.spark.graphx.impl.GraphImpl@6eb97170
i = 6
near_triplets = MapPartitionsRDD[3150] at filter at <console>:60


Array((A,1), (D,2), (F,3), (E,4), (C,5), (B,6))

**3，最小生成树算法(Kruskal)**

最小生成树问题是为了寻找包含图的每一个顶点的总边长度最小的子图。

由于这样的子图包括了原始图中的每一个顶点，并且其边之和是最短的，所以可以叫做最小生成子图。

这样总边之和最短的图必定不会形成环，否则的话，去掉环中的一段，新得到的子图依然包括了图中的每一个顶点，但其边之和却可以变短。

所以最小生成子图实际上是一个树结构，一般称之为最小生成树。

解决最小生成树问题的解法是Kruskal算法，这是一种贪心算法。

但是和TSP算法不同，可以从数学上用反证法证明Kruskal算法的解一定是最优的。

最小生成树的最直接应用是在路径规划工具方面(道路、电力、水等)，用来确保这些基础设施资源能在最小消耗的前提下到达所有城市(例如最短距离，路径图的边权值表示城市间的距离)。 也有一些不太显著的应用，如在相似事物的集合上做分类，例如动物 (用于科学分类)或 报纸头条。

解决最小生成树的Kruskal算法可以表述如下：

* 1,初始化集合中的边，构建一个空的最小生成树。

* 2,找到图中最短的边，将其添加到结果集合中。其对应的两个顶点设置成已访问顶点。

* 3,找到连接已访问顶点和未访问顶点中的边的最短的那条，将其添加到结果集合中。对应的未访问顶点设置成已访问顶点。

* 4,重复步骤3，直到所有顶点都已经被访问。


![](data/最小生成树.png)

In [73]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx._
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

//完整范例，增加Serializable特质以解决无法序列化问题。

object TestKruskal extends Serializable{
    def main(args:Array[String]):Unit = {
        val spark = SparkSession.builder()
           .master("local[4]").appName("graph")
           .getOrCreate()
        
        import spark.implicits._
        val sc = spark.sparkContext
        
        //构建图
        val verts = sc.makeRDD(Array((1L,"A"),(2L,"B"),(3L,"C"),
            (4L,"D"),(5L,"E"),(6L,"F"),(7L,"G")))

        val edges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0) ,
            Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
            Edge(3L, 5L, 5.0), Edge(5L, 6L, 8.0), Edge(4L, 5L,15.0), Edge (4L, 6L, 6.0),
            Edge(5L, 7L, 9.0) , Edge(6L, 7L, 11.0))) 

        val graph = Graph(verts, edges)

        //使用图的边属性来记录该边是否在最小生成树中，0表示不在生成树中
        //使用图的顶点属性表示该顶点是否已经被生成树触达，0表示尚未触达
        
        //1，找到最短的1条边
        val min_t = graph.triplets.reduce((a,b)=> if(a.attr<b.attr) a else b)
        val init_graph = graph.mapVertices((vid,vd) => 
            if(vid!=min_t.srcId && vid!=min_t.dstId) (vd,0) else (vd,1))
        var graph_kruscal = init_graph.
          mapTriplets( t => if(t.srcId==min_t.srcId && t.dstId==min_t.dstId) 
                      (t.attr,1) else (t.attr,0))
        
        //2,依次找到连接已访问顶点和未访问顶点中的最短的边
        
        for(i<-3 to graph.numVertices.toInt){
            val min_t = graph_kruscal.triplets.filter(t =>t.srcAttr._2+t.dstAttr._2==1)
               .reduce((a,b)=> if(a.attr._1<b.attr._1) a else b)
            
            graph_kruscal = graph_kruscal.mapVertices((vid,vd) =>
               if(vid!=min_t.srcId && vid!=min_t.dstId) vd else (vd._1,1))
            graph_kruscal = graph_kruscal.mapTriplets( t => 
               if(t.srcId==min_t.srcId && t.dstId==min_t.dstId) (t.attr._1,1) else t.attr)
            
        }
        
        graph_kruscal.edges.filter(_.attr._2==1).toDS.show(100)
    }
}

TestKruskal.main(Array(""))


defined object TestKruskal


+-----+-----+--------+
|srcId|dstId|    attr|
+-----+-----+--------+
|    1|    2|[7.0, 1]|
|    1|    4|[5.0, 1]|
|    2|    5|[7.0, 1]|
|    3|    5|[5.0, 1]|
|    4|    6|[6.0, 1]|
|    5|    7|[9.0, 1]|
+-----+-----+--------+



### 八，图和机器学习

大部分机器学习算法使用矩阵或者张量作为其数据结构，但也有一些算法使用图作为其数据结构，此外还有相当一部分算法的实现中用到了图。

一些常用的和图相关的机器学习算法简单介绍如下。

1，监督学习

SVDPLUSPLUS算法：这是一个商品推荐算法，使用EdgeRDD作为输入，可以通过graphx.lib.SVDPLUSPLUS进行调用。

2，非监督学习

LDA文本主题算法：LDA文本主题模型可以将文本映射为主题向量，从而对文档进行聚类。它属于mllib库，但其最大期望算法EM的实现可以用图来进行加速。

PIC聚类算法：幂迭代聚类算法可以用于图像分割。它可以通过mllib.clustering.PoweriterationClustering进行调用。

3，半监督学习

基于K近邻图的标签传播算法：可以利用图结构，将少量顶点的标签传递到其近邻的未知标签顶点上（可以按照边的权重倒数作概率加权）。

得到较多的含有标签的顶点后，再利用K邻近的方式对未知顶点的标签进行预测。
