<img src="img/logocs.jpeg" width="200" align="left">
<img src="img/logops.jpg" width="200" align="right">

# <center>Learn More about GraphX and Explore Pregel API</center>

<img src="http://spark.apache.org/docs/latest/img/graphx_logo.png" width="300"/>
#### Family Name: 
#### First Name: 

## Exploring GraphX Pregel API
The purpose of this lab is to learn more about GraphX and to explore the GraphX Pregel API.

The zipped file franceMapData.zip contains three datasets: france_lite.map, france.map and toyData.map.
In each file, every line before the "--" separator contains the name of a city and its coordinates, separated by ":" (for example, A:100:100). Every line after "--" contains the name of a city, the name of a neighbouring city and the distance between them, separated by ":" (for example, A:B:140). The graphs corresponding to toyData.map and france.map are shown below.

<img src="img/toy.png" width="300" align="right">
<img src="img/france.png" width="400" align="left">

### A summary list of Graph class operators

In [None]:
// Summary of the GraphX Graph operators (signatures only). This listing is for reference:
// the members have no bodies, so it is not meant to be compiled as-is.
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  // Result holds one merged message per vertex that received at least one message.
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

### Example using the aggregateMessages method to implement Dijkstra's algorithm.

In [None]:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


/**
 * Single-source shortest paths (Dijkstra's algorithm) built on `aggregateMessages`.
 *
 * Every vertex carries a `(visited, distance)` pair. Each of the `numVertices - 1`
 * iterations selects the unvisited vertex with the smallest tentative distance, relaxes
 * its outgoing edges (src -> dst only) and marks it visited.
 *
 * @param g      graph whose `Int` edge attributes are the edge weights
 *               (assumed non-negative, as Dijkstra's algorithm requires)
 * @param origin id of the source vertex
 * @return `g` with each vertex attribute replaced by `(originalAttr, distance)`;
 *         vertices that are never reached keep distance `Double.MaxValue`
 */
def dijkstra[VD](g: Graph[VD, Int], origin: VertexId): Graph[(VD, Double), Int] = {
  // A new state graph is derived from the previous one on every iteration. Caching it lets
  // each Spark action start from the last iteration instead of recomputing the whole chain.
  // NOTE(review): the lineage still grows by one step per iteration; for large graphs
  // consider periodic checkpointing (requires a checkpoint directory).
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue)).cache()

  for (_ <- 1L to g.vertices.count - 1) {
    // Unvisited vertex with the smallest tentative distance. The fold's zero value
    // (id 0L, distance MaxValue) can only be picked when no unvisited vertex has a
    // finite distance.
    val currentVertexId =
      g2.vertices.filter(!_._2._1)
        .fold((0L, (false, Double.MaxValue)))((a, b) =>
          if (a._2._2 < b._2._2) a else b)
        ._1

    // Relax the outgoing edges of the selected vertex. Only the source attribute is read,
    // so only it needs to be shipped to the edges (TripletFields.Src).
    val newDistances = g2.aggregateMessages[Double](
      ctx => if (ctx.srcId == currentVertexId)
               ctx.sendToDst(ctx.srcAttr._2 + ctx.attr),
      (a, b) => math.min(a, b),
      TripletFields.Src)

    // Mark the selected vertex visited and keep the best known distance for every vertex.
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) =>
      (vd._1 || vid == currentVertexId,
       math.min(vd._2, newSum.getOrElse(Double.MaxValue)))).cache()
  }

  // Attach the final distance to each original vertex attribute.
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue))._2))
}

// Toy directed graph: seven vertices labelled A..G, integer edge weights.
val myVertices = sc.makeRDD(Array(
  (1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"),
  (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges = sc.makeRDD(Array(
  Edge(1L, 2L, 7), Edge(1L, 4L, 5),
  Edge(2L, 3L, 8), Edge(2L, 4L, 9), Edge(2L, 5L, 7),
  Edge(3L, 5L, 5), Edge(4L, 5L, 15), Edge(4L, 6L, 6),
  Edge(5L, 6L, 8), Edge(5L, 7L, 9), Edge(6L, 7L, 11)))
val myGraph = Graph(myVertices, myEdges)

// Time the aggregateMessages-based Dijkstra run from vertex 1 ("A"), then print each
// vertex's (name, distance) pair.
val tic = System.nanoTime()
val result = dijkstra(myGraph, 1L)
val tac = System.nanoTime()
println(s"Finding shortest path from A took ${(tac - tic) / 1e9} seconds.")
result.vertices.map { case (_, attr) => attr }.collect().foreach(println)

### Example using the Pregel API to implement Dijkstra's algorithm.

In [None]:
/**
 * Shortest distances from a set of start vertices, computed with the GraphX Pregel API.
 *
 * Vertices whose name appears in `startNodes` start at distance 0 and all others at
 * `Int.MaxValue` (not reached yet). Messages flow along edges in their stored direction
 * (`EdgeDirection.Out`). Pregel stops once an iteration sends no messages.
 *
 * @param startNodes names (vertex attributes) of the source vertices
 * @param graph      graph whose `Int` edge attributes are the edge weights
 *                   (assumed non-negative, as Dijkstra-style relaxation requires)
 * @return graph with the same structure whose vertex attributes are the shortest distances;
 *         unreachable vertices keep `Int.MaxValue`
 */
def findShortestPath(startNodes: List[String], graph: Graph[String, Int]): Graph[Int, Int] = {
  val initialMsg = Int.MaxValue
  val maxIterations = Int.MaxValue
  // Set membership instead of a List scan for every vertex.
  val sources = startNodes.toSet

  val shortestPathGraph =
    graph.mapVertices((_, name) => if (sources.contains(name)) 0 else Int.MaxValue)

  shortestPathGraph.pregel(initialMsg, maxIterations, EdgeDirection.Out)(
    // vprog: keep the smaller of the current distance and the best incoming offer.
    (_, dist, msg) => math.min(dist, msg),
    // sendMsg: offer dst a shorter path through src, if one exists.
    et =>
      if (et.srcAttr == Int.MaxValue) {
        // An unreached source has nothing to offer. Skipping it also avoids the Int overflow
        // of Int.MaxValue + weight: the wrapped negative sum would look like an improvement
        // and keep messages circulating forever around cycles of unreachable vertices.
        Iterator.empty
      } else {
        val candidate = et.srcAttr.toLong + et.attr
        // candidate < dstAttr <= Int.MaxValue, so narrowing back to Int is safe here.
        if (candidate < et.dstAttr) Iterator((et.dstId, candidate.toInt)) else Iterator.empty
      },
    // mergeMsg: several offers to the same vertex collapse to the smallest one.
    (a, b) => math.min(a, b))
}

// Time the Pregel-based shortest-path computation from the vertex named "A",
// then print every vertex's resulting distance.
val tic = System.nanoTime()
val result = findShortestPath(List("A"), myGraph)
val tac = System.nanoTime()
println(s"Finding shortest path from A took ${(tac - tic) / 1e9} seconds.")

result.vertices.map { case (_, dist) => dist }.collect().foreach(println)

Exercise 1: Write a method that reads these files line by line (in parallel) to create the vertex and edge RDDs and the Graph object.

In [None]:
//To Do

Exercise 2: Compute the diameter of the created graph using the methods above.

In [None]:
//To Do

Exercise 3: Implement the PageRank algorithm: first using the pageRank method of Graph, then using the Pregel API.

In [None]:
//To Do