Extending spark.graphx.lib.ShortestPaths to GraphXShortestWeightedPaths
=======================================================================

### 2016-2020, Ivan Sadikov and Raazesh Sainudiin

We extend the Shortest Paths algorithm in Spark's GraphX library to allow
for user-specified edge weights stored as an edge attribute.

This is part of *Project MEP: Meme Evolution Programme* and supported by
databricks academic partners program.

The analysis is available in the following databricks notebook:

- <http://lamastex.org/lmse/mep/src/GraphXShortestWeightedPaths.html>

```
Copyright 2016 Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

### Let's modify shortest paths algorithm to allow for user-specified edge-weights

Update the shortest paths algorithm to work over edge weights stored as a
`Double` edge attribute. Key concepts are:

- we increment the map with delta, which is `edge.attr`
- the edge attribute is anything numeric, tested on Double
- the infinity value is not infinity, but `Integer.MAX_VALUE`

Modifying the following code:

- https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala

Explained here:

- http://note.yuhc.me/2015/03/graphx-pregel-shortest-path/

In [None]:
import scala.reflect.ClassTag
import org.apache.spark.graphx._

/**
 * Computes shortest weighted paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map holding, for each reachable landmark, the shortest-path distance to it
 * together with two vertex ids describing the first step of that path (see [[SPMap]]).
 * Currently supports only Graph of [VD, Double], where VD is an arbitrary vertex type.
 */
object GraphXShortestWeightedPaths extends Serializable {

  /**
   * Stores a map from the vertex id of a landmark to a triple `(distance, via, holder)`:
   *  - `_1` is the accumulated edge weight of the best path found from this vertex to the landmark;
   *  - `_2` is the `_3` component of the entry held by the neighbour the path continues through,
   *    i.e. that neighbour's id, or `DEFAULT_ID` when the neighbour is the landmark itself;
   *  - `_3` is the id of the vertex that received the entry (`DEFAULT_ID` on a landmark's own entry).
   */
  type SPMap = Map[VertexId, Tuple3[Double, VertexId, VertexId]]

  // initial and infinity values, use to relax edges
  private val INITIAL = 0.0
  private val DEFAULT_ID = -1.toLong
  private val INFINITY = Int.MaxValue.toDouble

  /** Builds an [[SPMap]] from the given entries; `x: _*` expands the varargs sequence. */
  private def makeMap(x: (VertexId, Tuple3[Double, VertexId, VertexId])*) = Map(x: _*)

  /**
   * Extends every path in `spmap` by one edge.
   *
   * @param spmap entries of the vertex at the far end of the edge
   * @param delta weight of the edge being traversed
   * @param id    id of the vertex that will receive the extended entries
   * @return a map with the same keys, each distance increased by `delta`, the previous `_3`
   *         moved into `_2`, and `id` stored as the new `_3`
   */
  private def incrementMap(spmap: SPMap, delta: Double, id: VertexId): SPMap = {
    spmap.map { case (landmark, (distance, _, holder)) =>
      landmark -> Tuple3(distance + delta, holder, id)
    }
  }

  /**
   * Merges two maps, keeping for every landmark the entry with the smaller distance.
   *
   * On a tie the entry from `spmap2` wins. A landmark present in only one map keeps that map's
   * entry unchanged; it is never compared against the `INFINITY` sentinel, so a genuine distance
   * of `Int.MaxValue` or more is not overwritten by the sentinel.
   */
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).iterator.map { k =>
      val best = (spmap1.get(k), spmap2.get(k)) match {
        case (Some(first), Some(second)) => if (first._1 < second._1) first else second
        case (Some(first), None) => first
        case (None, Some(second)) => second
        // k comes from the union of both key sets, so this branch only keeps the match exhaustive
        case (None, None) => Tuple3(INFINITY, DEFAULT_ID, DEFAULT_ID)
      }
      k -> best
    }.toMap
  }

  /**
   * Runs the Pregel computation that fills in an [[SPMap]] for every vertex.
   *
   * Messages travel from an edge's destination to its source, so distances follow the edge
   * direction from each vertex towards the landmarks.
   *
   * @tparam VD       vertex attribute type of the input graph; its values are discarded
   * @param graph     input graph whose edge attribute is the edge weight
   * @param landmarks ids of the vertices to compute distances to
   * @return a graph with the same edges whose vertex attribute is the vertex's [[SPMap]]
   */
  def run[VD](graph: Graph[VD, Double], landmarks: Seq[VertexId]): Graph[SPMap, Double] = {
    // a Set makes the per-vertex membership test independent of the number of landmarks
    val landmarkSet = landmarks.toSet

    val spGraph = graph.mapVertices { (vid, _) =>
      // a landmark starts with distance 0.0 to itself; every other vertex starts empty
      if (landmarkSet.contains(vid)) makeMap(vid -> Tuple3(INITIAL, DEFAULT_ID, DEFAULT_ID))
      else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr, edge.attr, edge.srcId)
      // only notify the source when the destination offers it a strictly better entry
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

// Print a one-line usage example for the object defined above
println("Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))")

  

>     Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))
>     import scala.reflect.ClassTag
>     import org.apache.spark.graphx._
>     defined object GraphXShortestWeightedPaths

  

### Generate test graph

Generate simple graph with double weights for edges

In [None]:
import scala.util.Random

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances.
// The weights come from one seeded generator per edge partition rather than the shared, unseeded
// `Random`: `mapEdges` is evaluated lazily, so an unseeded draw would produce different weights
// every time Spark recomputes the (uncached) graph.
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 150, seed = 123L)
  .mapEdges[Double] { (partitionId: Int, edges: Iterator[org.apache.spark.graphx.Edge[Int]]) =>
    val rng = new Random(123L + partitionId)
    // to make things nicer we assign 0 distance to itself
    edges.map(e => if (e.srcId == e.dstId) 0.0 else rng.nextDouble())
  }





  

>     import scala.util.Random
>     import org.apache.spark.graphx.{Graph, VertexId}
>     import org.apache.spark.graphx.util.GraphGenerators
>     graph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@e2066f3

In [None]:
// Vertices 0..3, each labelled with its own id as a string
val nodes: RDD[(VertexId, String)] = sc.parallelize(
  Seq(0L -> "0", 1L -> "1", 2L -> "2", 3L -> "3"))
// A chain 0 -> 1 -> 2 -> 3 -> 4 in which every edge has weight 1.0
val connections: RDD[Edge[Double]] = sc.parallelize(
  Seq(0L -> 1L, 1L -> 2L, 2L -> 3L, 3L -> 4L).map { case (src, dst) => Edge(src, dst, 1.0) })
// Attribute given to vertices that appear in an edge but not in `nodes` (vertex 4 here)
val default_node = "default"
// Assemble the graph from the two RDDs
val graph = Graph(nodes, connections, default_node)



  

>     nodes: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = ParallelCollectionRDD[17] at parallelize at command-3289963953973041:3
>     connections: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[18] at parallelize at command-3289963953973041:6
>     default_node: String = default
>     graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@9637035

In [None]:
// Print up to 150 vertices, then up to 150 edges, of the graph
graph.vertices.take(150).foreach(println)
graph.edges.take(150).foreach(println)

  

>     (0,0)
>     (1,1)
>     (2,2)
>     (3,3)
>     (4,default)
>     Edge(0,1,1.0)
>     Edge(1,2,1.0)
>     Edge(2,3,1.0)
>     Edge(3,4,1.0)

In [None]:
// Use vertex 4 as the only landmark and compute every vertex's shortest weighted path to it
val landMarkVertexIds = Seq(4L)
val result = GraphXShortestWeightedPaths.run(graph, landMarkVertexIds)

  

>     landMarkVertexIds: Seq[Long] = List(4)
>     result: org.apache.spark.graphx.Graph[GraphXShortestWeightedPaths.SPMap,Double] = org.apache.spark.graphx.impl.GraphImpl@4af64c00

In [None]:
// Found shortest paths: print one (vertexId, SPMap) pair per line
println(result.vertices.collect.mkString("\n"))

  

>     (0,Map(4 -> (4.0,1,0)))
>     (1,Map(4 -> (3.0,2,1)))
>     (2,Map(4 -> (2.0,3,2)))
>     (3,Map(4 -> (1.0,-1,3)))
>     (4,Map(4 -> (0.0,-1,-1)))

In [None]:
// set all edges to zero; the vertex attributes (each vertex's SPMap) are kept
val path = result.mapEdges(e => 0)
println(path.edges.collect.mkString("\n"))
println(path.vertices.collect.mkString("\n"))
println(path.triplets.collect.mkString("\n"))

// For every triplet, emit -1 when the source vertex's entry for landmark 4 has -1 as its second
// component, and 1 otherwise. In the result printed above the second component is -1 for the
// landmark itself and for the vertex whose path reaches the landmark in a single step.
// A source vertex with no entry for landmark 4 (landmark unreachable) yields 1 instead of
// failing with NoSuchElementException.
val relations: RDD[Int] = path.triplets.map { triplet =>
  val secondIsDefault = triplet.srcAttr.get(4L).exists(_._2 == -1)
  if (secondIsDefault) -1 else 1
}

relations.collect.foreach(println)

  

>     Edge(0,1,0)
>     Edge(1,2,0)
>     Edge(2,3,0)
>     Edge(3,4,0)
>     (0,Map(4 -> (4.0,1,0)))
>     (1,Map(4 -> (3.0,2,1)))
>     (2,Map(4 -> (2.0,3,2)))
>     (3,Map(4 -> (1.0,-1,3)))
>     (4,Map(4 -> (0.0,-1,-1)))
>     ((0,Map(4 -> (4.0,1,0))),(1,Map(4 -> (3.0,2,1))),0)
>     ((1,Map(4 -> (3.0,2,1))),(2,Map(4 -> (2.0,3,2))),0)
>     ((2,Map(4 -> (2.0,3,2))),(3,Map(4 -> (1.0,-1,3))),0)
>     ((3,Map(4 -> (1.0,-1,3))),(4,Map(4 -> (0.0,-1,-1))),0)
>     1
>     1
>     1
>     -1
>     path: org.apache.spark.graphx.Graph[GraphXShortestWeightedPaths.SPMap,Int] = org.apache.spark.graphx.impl.GraphImpl@5f801981
>     relations: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[150] at map at command-3289963953973045:8

In [None]:
// Work-in-progress sketch of a second Pregel pass that walks the stored shortest path from
// startId towards goalId.
// NOTE(review): this cell does not compile as written; the individual problems are flagged inline.
// Note on message: the message consists of two variables.
// The first variable specifies the goal node, the second specifies the next node which should be executed in the path.
// Note on vprog
type SPMap = Map[VertexId, Tuple3[Double, VertexId, VertexId]]
val goalId = 4L
val startId = 0L

// Initial message: (goal vertex, vertex expected to act first)
val initMsg = (goalId, startId) 

// Combines two messages addressed to the same vertex by keeping the first one.
// NOTE(review): the `)` directly after the return type has no matching `(`.
def mergeMsg(msg1: Tuple2[VertexId,VertexId], msg2: Tuple2[VertexId,VertexId]): Tuple2[VertexId,VertexId]) = { // we should only get one msg
    msg1
}

// Vertex program (unfinished).
// NOTE(review): the body ends without producing a value of the declared return type.
def vprog(id: VertexId, attr: SPMap, msg: Tuple2[VertexId,VertexId]): Map[VertexId, Tuple3[Double, VertexId, VertexId]] = {
  // Check that the current node is the one addressed in the next step of the path, i.e. the second variable
  // NOTE(review): this compares against the name `VertexId` rather than the `id` parameter —
  // presumably `id` was intended; confirm.
  if(VertexId == msg._2)
    // Do a lookup in the map, attr(msg._1)._2 gives the next node on the path   
    // NOTE(review): a `val` definition is used as the un-braced body of the `if`.
    val nextStep = attr(msg._1)._2
      // Note: if attr(msg._1)._2 is -1 and attr(msg._1)._3 is id then the next node is the goal node, i.e. msg._1

    // Now, I need to find a way to send goalId, i.e. msg._1, to the sendMsg function

}
// Message sender (unfinished): reads the source vertex's entry for the top-level `goalId`.
def sendMsg(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, VertexId)] = {
  
  // How should this function receive the goalId variable??
  
  if (edge.srcAttr(goalId)._2 == -1) // If the edge between srcId and dstId is not used in the path
    Iterator.empty// Do nothing
  else
    // NOTE(review): `triplet` is not defined in this function; the parameter is named `edge`.
    Iterator((triplet.dstId, triplet.dstId))// Send next msg
}

// NOTE(review): `vertexProgram` is not defined at this level (the function defined in this cell is
// `vprog`), and GraphX appears to expose this operation on a graph as `pregel` (lower case) —
// confirm against the GraphOps API before running.
result.Pregel(initMsg)(vertexProgram, sendMsg, mergeMsg)






In [None]:
// edges with weights, make sure to check a couple of the shortest paths from above
// (the edge RDD is converted to a DataFrame so it can be shown as a table)
display(result.edges.toDF)

  

[TABLE]

Truncated to 30 rows

In [None]:
// Show the vertices of the input graph as a table
display(graph.vertices) 

  

[TABLE]

In [None]:
// Flatten every vertex's SPMap into one record per (vertex, landmark) pair.
// Working with Scala maps: http://docs.scala-lang.org/overviews/collections/maps.html
// Map to list of tuples: http://stackoverflow.com/questions/28769367/scala-map-a-map-to-list-of-tuples
val shortestDistsVertex2Landmark = result.vertices.flatMap { case (vertexId, spMap) =>
  // each record is (vertex, landmarkVertex, entry), where entry is the stored
  // (shortest_distance, id, id) triple for that landmark
  spMap.toSeq.map { case (landmarkId, entry) => (vertexId, landmarkId, entry) }
}

  

>     shortestDistsVertex2Landmark: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, (Double, org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId))] = MapPartitionsRDD[2345] at flatMap at command-4443336225095845:4

In [None]:
// Collect the (vertex, landmark, entry) records to the driver and render them one per line
shortestDistsVertex2Landmark.collect.mkString("\n")

  

>     res9: String =
>     (0,4,(4.0,1,0))
>     (1,4,(3.0,2,1))
>     (2,4,(2.0,3,2))
>     (3,4,(1.0,-1,3))
>     (4,4,(0.0,-1,-1))

  

#### Let's make a DataFrame for visualizing pairwise matrix plots

We want to make 4 columns in this example as follows (note actual values
change for each realisation of graph!):

```
landmark_Id1 ("0"),   landmarkID2 ("4"), landmarkId3 ("9"),  srcVertexId
------------------------------------------------------------------------
0.0,                  0.7425..,          0.8718,                0
0.924...,             1.2464..,          1.0472,                1
...
```

In [None]:
// http://alvinalexander.com/scala/how-to-sort-map-in-scala-key-value-sortby-sortwith
// we need this to make sure that the maps are ordered by the keys for ensuring unique column values
import scala.collection.immutable.ListMap
import sqlContext.implicits._

  

>     import scala.collection.immutable.ListMap
>     import sqlContext.implicits._

In [None]:
 // Name every landmark vertex after the string form of its id
val unorderedNamedLandmarkVertices = landMarkVertexIds.map(id => id -> id.toString)
// A ListMap keeps insertion order, so inserting in ascending id order fixes the column order
val orderedNamedLandmarkVertices =
  ListMap(unorderedNamedLandmarkVertices.sortBy { case (id, _) => id }: _*)
val orderedLandmarkVertexNames = orderedNamedLandmarkVertices.toSeq.map { case (_, name) => name }
orderedLandmarkVertexNames.mkString(", ")

  

>     unorderedNamedLandmarkVertices: Seq[(Long, String)] = List((4,4))
>     orderedNamedLandmarkVertices: scala.collection.immutable.ListMap[Long,String] = ListMap(4 -> 4)
>     orderedLandmarkVertexNames: Seq[String] = Vector(4)
>     res7: String = 4

In [None]:
// this is going to be our column names: one per landmark (ordered by id), then the source vertex id
val columnNames:Seq[String] = orderedLandmarkVertexNames :+ "srcVertexId"

  

>     columnNames: Seq[String] = Vector(4, srcVertexId)

In [None]:
// a case class to make a data-frame quickly from the result:
// one row per vertex, holding a sequence of distances and the id of that vertex
case class SeqOfDoublesAndsrcVertexId(shortestDistances: Seq[Double], srcVertexId: VertexId)

  

>     defined class SeqOfDoublesAndsrcVertexId

In [None]:
// One row per vertex: its distances to the landmarks, ordered by landmark id, plus its own id.
// Each SPMap value is a (distance, id, id) triple, so only the first component is kept; passing
// the whole triple does not type-check against the Seq[Double] field of the case class.
// NOTE(review): a vertex that cannot reach some landmark has no entry for it, so its sequence is
// shorter than the number of landmarks — confirm this cannot happen for the graph in use.
val shortestDistsSeqFromVertex2Landmark2DF = result.vertices.map { case (vertexId, spMap) =>
  val distancesByLandmark = spMap.toSeq
    .sortBy { case (landmarkId, _) => landmarkId }
    .map { case (_, entry) => entry._1 }
  SeqOfDoublesAndsrcVertexId(distancesByLandmark, vertexId)
}.toDF()

In [None]:
display(shortestDistsSeqFromVertex2Landmark2DF) // but this dataframe needs the first column exploded into separate columns

  

Now we want to make separate columns for each distance in the Sequence
in column 'shortestDistances'.

Let us use the following ideas for this:

- https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html

In [None]:
// this is from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, udf}

// UDF returning the element at the given position of a sequence-of-doubles column
// (the referenced notebook used Seq[Int]; this version works on Seq[Double])
val elem = udf((values: Seq[Double], index: Int) => values(index))

/**
 * Produces one column per position of a sequence column by applying `elem` at indices
 * 0 until `len`; each result is named "<col>(<index>)".
 * The length of the sequence has to be known in advance.
 */
def split(col: Column, len: Int): Seq[Column] =
  (0 until len).map(i => elem(col, lit(i)).as(s"$col($i)"))

/**
 * Adds a `select` that accepts a mix of single columns and sequences of columns, e.g.
 * select(Column, Seq[Column], Column) is flattened, in order, into select(Column*).
 *
 * @param df the DataFrame being extended
 */
implicit class DataFrameSupport(df: DataFrame) {
  /**
   * @param cols arguments that are each either a `Column` or a `Seq` of `Column`
   * @return `df` projected onto the flattened columns, in argument order
   * @throws IllegalArgumentException if an argument (or an element of a sequence argument)
   *                                  is not a `Column`
   */
  def select(cols: Any*): DataFrame = {
    // fail at the call site with a readable message instead of a late ClassCastException
    def asColumn(value: Any): Column = value match {
      case column: Column => column
      case other =>
        throw new IllegalArgumentException(s"select expects Column or Seq[Column], got: $other")
    }

    val flattened: Seq[Column] = cols.flatMap {
      case many: Seq[_] => many.map(asColumn)
      case single => Seq(asColumn(single))
    }
    df.select(flattened: _*)
  }
}

  

>     import org.apache.spark.sql.{Column, DataFrame}
>     import org.apache.spark.sql.functions.{lit, udf}
>     elem: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9124/2006068772@4ae2323a,DoubleType,List(Some(class[value[0]: array<double>]), Some(class[value[0]: int])),None,false,true)
>     split: (col: org.apache.spark.sql.Column, len: Int)Seq[org.apache.spark.sql.Column]
>     defined class DataFrameSupport

In [None]:
// One output column per landmark: the number of columns follows the landmark list instead of a
// hard-coded 3, which no longer matches the landmarks passed to `run`.
val shortestDistsFromVertex2Landmark2DF = shortestDistsSeqFromVertex2Landmark2DF.select(split($"shortestDistances", landMarkVertexIds.distinct.length), $"srcVertexId")

  

>     shortestDistsFromVertex2Landmark2DF: org.apache.spark.sql.DataFrame = [shortestDistances(0): double, shortestDistances(1): double ... 2 more fields]

In [None]:
// Show the DataFrame with one distance column per position of the sequence
display(shortestDistsFromVertex2Landmark2DF)

  

[TABLE]

In [None]:
// now let's give it our names based on the landmark vertex Ids
// (the number of distance columns is derived from the landmark list, so it always matches the
// number of names in `columnNames` minus the trailing "srcVertexId")
val shortestDistsFromVertex2Landmark2DF = shortestDistsSeqFromVertex2Landmark2DF.select(split($"shortestDistances", landMarkVertexIds.distinct.length), $"srcVertexId").toDF(columnNames:_*)

  

>     shortestDistsFromVertex2Landmark2DF: org.apache.spark.sql.DataFrame = [0: double, 4: double ... 2 more fields]

In [None]:
// Show the DataFrame after its columns have been renamed
display(shortestDistsFromVertex2Landmark2DF)

  

[TABLE]

In [None]:
// Show only the landmark distance columns. The column names are taken from
// `orderedLandmarkVertexNames` rather than hard-coded, so the cell keeps working when the
// landmark list changes.
display(shortestDistsFromVertex2Landmark2DF.select(orderedLandmarkVertexNames.map(name => shortestDistsFromVertex2Landmark2DF(name)): _*))

  

[TABLE]