研究网络中各个节点的重要程度；发现网络的聚类结构；

有环图上设计算法需要考虑终止条件，否则算法可能会沿着环永远循环下去；

Graphx的图都是属性图；

二分图可用于交友网站，源顶点来自男性集合，目标顶点来自女性集合。二分图也可以用于推荐系统，源顶点来自用户，目标顶点来自商品；

在Graphx中，用顶点属性表VertexRDD和边属性表EdgeRDD联合来表示图。


In [None]:
// 创建VertexRDD，注意VertexId必须是Long类型
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// 创建EdgeRDD
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// 设置缺失顶点
val defaultUser = ("John Doe", "Missing")

// 使用apply构造函数创建图
val graph_user:Graph[(String, String), String] = Graph(users, relationships, defaultUser)

//查看图的部分数据，triplets同时存储了边属性信息和对应顶点属性信息。
graph_user.triplets.take(5).foreach(println)

// 在用的时候不一定是点和边都构造好，也可以从边上进行构造
val graph_distance = Graph.fromEdges(sc.makeRDD(
    Array(Edge(1L,2L,10.0),Edge(1L,2L,3.0),
         Edge(2L,3L,5.0),Edge(2L,3L,7.0),
         Edge(1L,4L,2.0))),1).partitionBy(PartitionStrategy.RandomVertexCut,4)

#### Graph的各种接口方法的签名如下所示，大概有9组30多个方法。

In [None]:
class Graph[VD, ED] {
    
  // 1，图的信息 
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // 2，图的视图 
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // 3，图的缓存和分区
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // 4，修改属性创建新图 
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

  // 5，修改图结构创建新图 
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
    
  // 6，连接其它RDD
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
    
  // 7，收集邻居消息
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
    
  // 8，pregel迭代接口 
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
    
  // 9，内置常用图算法
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

edges和vertices必须至少包括1个属性，如果没有，一般给每个顶点和边填充一个1作为属性；

在实际应用的时候，可以给顶点和边的属性封装成为对象；

triplets 代表的是三元组 （（起点，属性）， （终点，属性）， 边-属性）

如果图要多次被使用，应当使用persist缓存进行。如果确认图不再用到，推荐使用unpersist清理缓存以减轻内存压力。

如果设计迭代算法，推荐使用pregel迭代接口，它能够正确地释放不再使用的中间计算结果。

修改属性创建新图，都是进行一次map操作后生成新的VertexRDD或EdgeRDD替换掉已有Graph的对应部分，得到新的Graph。

In [None]:
//如果边属性为"is_friends_with"，并且其源顶点属性中包含字母"a"，则添加属性值 true,否则添加属性值false。
val newGraph = myGraph.mapTriplets(t => 
    (t.attr, t.attr=="is-friends-with"&&t.srcAttr.toLowerCase.contains("a")))

修改图结构创建新图

这4个方法的作用简单总结如下：

reverse最简单，将每条边的方向反向。

subgraph过滤一些符合条件的边和顶点构造子图。

mask返回和另外一个graph的公共子图

groupEdges可以对平行边进行merge，但要求平行边位于相同的分区。

In [None]:
//groupEdges可以对平行边进行merge

val graph_distance = Graph.fromEdges(sc.makeRDD(
    Array(Edge(1L,2L,10.0),Edge(1L,2L,3.0),
         Edge(2L,3L,5.0),Edge(2L,3L,7.0),
         Edge(1L,4L,2.0))),1).partitionBy(PartitionStrategy.RandomVertexCut,4)

graph_distance.triplets.collect.foreach(println)
((1,1),(2,1),10.0)
((1,1),(2,1),3.0)
((1,1),(4,1),2.0)
((2,1),(3,1),5.0)
((2,1),(3,1),7.0)
val graph_grouped = graph_distance.groupEdges((a,b)=> math.min(a,b))
graph_grouped.triplets.collect.foreach(println)
((1,1),(2,1),3.0)
((1,1),(4,1),2.0)
((2,1),(3,1),5.0)

In [None]:
连接其它RDD

// joinVertices 不会修改点属性的类型
val graph_distance = Graph.fromEdges(sc.makeRDD(
    Array(Edge(1L,2L,10.0),Edge(1L,2L,3.0),
         Edge(2L,3L,5.0),Edge(2L,3L,7.0),
         Edge(1L,4L,2.0))),"").partitionBy(PartitionStrategy.RandomVertexCut,4)

val rdd_city = sc.makeRDD(Array((1L,"Beijing"),(2L,"Nanjing"),(3L,"Shanghai"),(4L,"Tianjing")))

val graph_join = graph_distance.joinVertices[String](rdd_city)((id,v,u) => u)
graph_join.triplets.collect.foreach(println)

((1,Beijing),(2,Nanjing),10.0)
((1,Beijing),(2,Nanjing),3.0)
((1,Beijing),(4,Tianjing),2.0)
((2,Nanjing),(3,Shanghai),5.0)
((2,Nanjing),(3,Shanghai),7.0)

//outerJoinVertices 可以修改点属性的类型
val rdd_gender = sc.makeRDD(Array((1L,"female"),(2L,"male"),(3L,"male"),(4L,"female")))
val graph_outjoin = 
   myGraph.outerJoinVertices[String,(String,String)](rdd_gender)((id,v,opt)=>(v,opt.getOrElse(" ")))
graph_outjoin.triplets.collect.foreach(println)

((1,(Ann,female)),(2,(Bill,male)),is-friends-with)
((2,(Bill,male)),(3,(Charles,male)),is-friends-with)
((3,(Charles,male)),(4,(Diane,female)),is-friends-with)
((3,(Charles,male)),(5,(Went to gym this morning, )),wrote-status)
((4,(Diane,female)),(5,(Went to gym this morning, )),like-status)

aggregateMessages在图结构中实现了一个基本的map/reduce编程模型。

sendMsg是map过程，每条边向其src或dst发送一个消息。其输入参数为EdgeContext类型。 EdgeContext类型比Triplet类型多了sendToSrc和sendToDst两个方法，用于发送消息。

mergeMsg是reduce过程，每个顶点收集其收到的消息，并做合并处理。

aggregateMessages的返回值是一个VetexRDD。

使用aggregateMessages进行迭代的方法尽管已经非常简短了，但是其迭代过程中中间结果的缓存问题可能会给程序的性能造成影响。

使用pregel迭代接口能够很好地进行缓存优化。

内置图算法主要包括：

PageRank: 可以由PageRank值衡量节点的重要程度，常用于网页排名，社区关键人物分析。

personalizedPageRank: 个性化的PageRank值，可用于社交网站中推荐"你可能认识的人"。

triangleCount: 三角形个数，可以衡量周围的节点的连通性，也可以用于衡量网络总体的联通性。

ShortestPaths: 最小跳跃数，可以找到图中全部顶点和给定顶点的最小跳跃数。

connectedComponents: 联通组件，可以在社交网络中找到社交圈子。

stronglyConnectedComponents: 增强联通组件，针对有向图，可以找到社交圈子

LabelPropagation: 标签传播算法，可以用于社区发现。但往往不收敛，不是特别推荐使用。

个性化PageRank是 PageRank的一个变种，可以用于在社交网站中给用户推荐"你可能认识的人"。

personalizedPageRank除了要设定一个迭代终止的条件，还要指定一个源顶点的srcId.

如果是这样的话，在应用的时候有点傻逼，不可能遍历全部的源顶点

connectedComponents

connectedComponents连通组件会将图划分成几个连通区域，每个顶点的属性值为其所在连通区域中顶点编号的最小值。 connectedComponents的一种巧妙用法是用来在spark上实现DBSCAN算法，可以用它来对临时聚类簇进行合并。

连通组件不关心边的方向。


In [None]:

val graph = Graph.fromEdgeTuples(sc.makeRDD(Array((1L,2L),(2L,3L),(3L,1L),(5L,5L),(6L,7L))),1)
val graph_connected = graph.connectedComponents()
graph_connected.vertices.collect
Array((1,1), (5,5), (6,6), (2,1), (3,1), (7,6))
graph_connected.vertices.map(t=>(t._2,Set(t._1))).reduceByKey((s1,s2)=>s1|s2).collect
Array((1,Set(1, 2, 3)), (5,Set(5)), (6,Set(6, 7)))


在一张图中可以让每个顶点有不同的属性， 对于用户和物品就可以生成二部图：

In [None]:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null