# Graph Algorithms - Part 2 - Using Provided GraphX Algorithms

Using the algorithms already implemented in Apache Spark's GraphX library.

This notebook covers the following topics:

  1. Initialising the Spark environment
  1. Loading and constructing example graphs
  1. Using out-of-the-box algorithms
  1. Using `aggregateMessages` (send and merge messages) to write a simple algorithm
  1. Using the Pregel API to write a simple algorithm


---
## 1. Initialising the Apache Spark environment

### 1.1 Load the Libraries at Runtime

When running in a Jupyter notebook, the required libraries may not be on the classpath.

In that case, load the essential Apache Spark libraries from the public Maven repositories at runtime with the Ammonite-style `import $ivy` syntax (as supported by the Almond Scala kernel):

In [1]:
import $ivy.`org.apache.spark::spark-core:3.4.0`
import $ivy.`org.apache.spark::spark-mllib-local:3.4.0`
import $ivy.`org.apache.spark::spark-mllib:3.4.0`
import $ivy.`org.apache.spark::spark-graphx:3.4.0`
import $ivy.`org.apache.spark::spark-streaming:3.4.0`
import $ivy.`org.apache.spark::spark-tags:3.4.0`

[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                          
[39m
[32mimport [39m[36m$ivy.$                                    
[39m
[32mimport [39m[36m$ivy.$                                     
[39m
[32mimport [39m[36m$ivy.$                                        
[39m
[32mimport [39m[36m$ivy.$                                   [39m

In [2]:
import $ivy.`org.scalanlp::breeze-viz:1.2`
import $ivy.`org.jfree:jfreechart:1.5.4`
import $ivy.`org.creativescala::doodle-core:0.9.21`

[32mimport [39m[36m$ivy.$                             
[39m
[32mimport [39m[36m$ivy.$                           
[39m
[32mimport [39m[36m$ivy.$                                      [39m

In [3]:
import $ivy.`com.fasterxml.jackson.core:jackson-databind:2.15.1`

[32mimport [39m[36m$ivy.$                                                   [39m

---

### 1.2 Import the Spark Libraries

In [99]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

[32mimport [39m[36morg.apache.spark.SparkContext
[39m
[32mimport [39m[36morg.apache.spark.SparkConf
[39m
[32mimport [39m[36morg.apache.spark.sql.SparkSession[39m

In [100]:
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, udf, _}

[32mimport [39m[36morg.apache.spark.ml.linalg.{Matrix, Vectors}
[39m
[32mimport [39m[36morg.apache.spark.sql.Row
[39m
[32mimport [39m[36morg.apache.spark.sql.Dataset
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.{col, udf, _}[39m

In [101]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.graphx._
// To make some of the examples work we will also need RDD
[39m
[32mimport [39m[36morg.apache.spark.rdd.RDD
[39m
[32mimport [39m[36morg.apache.spark.storage.StorageLevel[39m

In [102]:
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import breeze.linalg._
import breeze.plot._

[32mimport [39m[36mcom.fasterxml.jackson.core.`type`.TypeReference
[39m
[32mimport [39m[36mcom.fasterxml.jackson.module.scala.DefaultScalaModule
[39m
[32mimport [39m[36mbreeze.linalg._
[39m
[32mimport [39m[36mbreeze.plot._[39m

In [103]:
val appName = "Spark_Graph_Algorithms2"

[36mappName[39m: [32mString[39m = [32m"Spark_Graph_Algorithms2"[39m

### 1.3 Set Up the Logger

To control the volume of log messages, change the log4j configuration programmatically:

In [104]:
import org.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

val logger: Logger = Logger.getLogger(appName)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
logger.setLevel(Level.INFO)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

[39m
[36mlogger[39m: [32mLogger[39m = org.apache.log4j.Logger@f4880fc

---
## 2. Create Spark session

### 2.1 Initialise Spark Session

In [105]:
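val sparkConf = new SparkConf()
             .setAppName(appName)
             .setMaster("local[*]")
             //.setMaster("spark://sparkmaster320:7077")
             // extraClassPath entries are separated by the OS path separator (':' on Linux), not commas.
             // In client/local mode the driver JVM is already running, so the driver setting only takes
             // effect when given at launch (e.g. spark-submit --driver-class-path or spark-defaults.conf).
             .set("spark.driver.extraClassPath", "/mnt/shared/lib/db2jcc4.jar:/mnt/shared/lib/breeze-viz_2.12-1.2.jar")
             .set("spark.executor.extraClassPath", "/mnt/shared/lib/db2jcc4.jar:/mnt/shared/lib/breeze-viz_2.12-1.2.jar")
             .set("spark.default.parallelism", "6")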

[36msparkConf[39m: [32mSparkConf[39m = org.apache.spark.SparkConf@69a5de46

In [106]:
// Apply the config to start a spark session:
val spark = org.apache.spark.sql.SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

23/06/02 19:03:02 INFO ResourceUtils: No custom resources configured for spark.driver.
23/06/02 19:03:02 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/06/02 19:03:02 INFO ResourceProfile: Limiting resource is cpu
23/06/02 19:03:02 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/06/02 19:03:02 INFO SecurityManager: Changing view acls to: notebooker
23/06/02 19:03:02 INFO SecurityManager: Changing modify acls to: notebooker
23/06/02 19:03:02 INFO SecurityManager: Changing view acls groups to: 
23/06/02 19:03:02 INFO SecurityManager: Changing modify acls groups to: 
23/06/02 19:03:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: notebooker; groups with view pe

[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@3006e12f

In [107]:
// Raise the noisiest Spark and Hadoop loggers to ERROR to keep the notebook output readable
Seq(
  "org.apache.spark.storage.BlockManager",
  "org.apache.spark.storage.BlockManagerMaster",
  "org.apache.spark.storage.BlockManagerMasterEndpoint",
  "org.apache.spark.storage.BlockManagerInfo",
  "org.apache.spark.storage.DiskBlockManager",
  "org.apache.spark.storage.memory.MemoryStore",
  "org.apache.spark.storage.ShuffleBlockFetcherIterator",
  "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint",
  "org.apache.spark.scheduler.DAGScheduler",
  "org.apache.spark.scheduler.TaskSchedulerImpl",
  "org.apache.spark.scheduler.TaskSetManager",
  "org.apache.spark.SparkContext",
  "org.apache.spark.executor.Executor",
  "org.apache.spark.ui.JettyUtils",
  "org.apache.spark.network.netty.NettyBlockTransferService",
  "org.apache.spark.SparkEnv",
  "org.apache.spark.util.Utils",
  "org.apache.spark.rdd.HadoopRDD",
  "org.apache.spark.MapOutputTrackerMasterEndpoint",
  "org.apache.hadoop.mapred.FileOutputCommitter",
  "org.apache.spark.mapred.SparkHadoopMapRedUtil",
  "org.apache.spark.internal.io.HadoopMapRedCommitProtocol",
  "org.apache.spark.internal.io.SparkHadoopWriter",
  "org.apache.spark.rdd.ZippedPartitionsRDD2",
  "org.apache.spark.rdd.MapPartitionsRDD"
).foreach(name => Logger.getLogger(name).setLevel(Level.ERROR))

In [108]:
val sc = spark.sparkContext

[36msc[39m: [32mSparkContext[39m = org.apache.spark.SparkContext@2d1e28bd

In [109]:
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

[32mimport [39m[36mspark.implicits._[39m

---

## Load sample graph data

A graph can be loaded by reading an edge-list file with `GraphLoader.edgeListFile`.
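For reference, `GraphLoader.edgeListFile` reads plain text with one edge per line, given as a source and a destination vertex ID separated by whitespace; empty lines and lines starting with `#` are skipped, and every edge and vertex gets the attribute `1`. The sketch below parses that format with plain Scala collections to show what the loader expects. It is a simplified stand-in, not GraphLoader's code, and the sample text is hypothetical rather than the contents of `graph1_edgelist.txt`.

```scala
// Parse edge-list text: "srcId dstId" per line; empty lines and '#' comment lines are skipped.
// A simplified stand-in for the format GraphLoader.edgeListFile reads, not its implementation.
def parseEdgeList(text: String): List[(Long, Long)] =
  text.split("\\r?\\n").toList
    .map(_.trim)
    .filter(line => line.nonEmpty && !line.startsWith("#"))
    .map { line =>
      val fields = line.split("\\s+")
      require(fields.length >= 2, s"Expected 'srcId dstId' but got: $line")
      (fields(0).toLong, fields(1).toLong)
    }

// Hypothetical file contents, for illustration only
val sampleEdgeList =
  """# src dst
    |10 20
    |20 30
    |
    |30 10""".stripMargin

val parsedEdges = parseEdgeList(sampleEdgeList)
// GraphLoader gives every edge the attribute 1; graph1 then converts it with mapEdges(e => e.attr.toDouble)
val weightedEdges = parsedEdges.map { case (src, dst) => (src, dst, 1.0) }
```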

In [110]:
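// Read an edge-list file; GraphLoader gives every edge and every vertex the attribute 1
val graph1 = GraphLoader
      .edgeListFile(sc,
                    "../src/test/resources/graph1_edgelist.txt",
                    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
                    vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
      .mapEdges(e => e.attr.toDouble)                                    // Double edge weights (used later as distances)
      .mapVertices[(Long, Double)]((vid, _) => (vid.toLong, 0.0));      // vertex attribute becomes (vertex ID, 0.0)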

23/06/02 19:03:05 INFO FileInputFormat: Total input files to process : 1
23/06/02 19:03:05 INFO GraphLoader: It took 109 ms to load the edges


[36mgraph1[39m: [32mGraph[39m[([32mLong[39m, [32mDouble[39m), [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@6cb3084e

A graph can also be built from an RDD of vertices and an RDD of edges:

In [111]:
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.makeRDD( Array(
        (3L, ("rxin", "student"))
      , (7L, ("jgonzal", "postdoc"))
      , (1L, ("somebody", "postdoc"))
      , (5L, ("franklin", "prof"))
      , (2L, ("istoica", "prof"))
      , (10L, ("hoityToity", "student"))
     )
   ).persist(StorageLevel.MEMORY_AND_DISK)

[36musers[39m: [32mRDD[39m[([32mVertexId[39m, ([32mString[39m, [32mString[39m))] = ParallelCollectionRDD[17] at makeRDD at cmd110.sc:2

In [112]:
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.makeRDD(
      Array(
      Edge(3L, 7L, "collab")
      , Edge(5L, 3L, "advisor")
      , Edge(2L, 5L, "colleague")
      , Edge(5L, 7L, "pi")
      , Edge(10L, 5L, "friend")
      , Edge(10L, 1L, "friend")
      )
    ).persist(StorageLevel.MEMORY_AND_DISK)

[36mrelationships[39m: [32mRDD[39m[[32mEdge[39m[[32mString[39m]] = ParallelCollectionRDD[18] at makeRDD at cmd111.sc:2

In [113]:
// Define a default user for vertices that appear in relationships but not in users
val defaultUser = ("John Doe", "Missing")

[36mdefaultUser[39m: ([32mString[39m, [32mString[39m) = ([32m"John Doe"[39m, [32m"Missing"[39m)

In [114]:
// Build the initial Graph
val graph2 = Graph(users, relationships, defaultUser)

[36mgraph2[39m: [32mGraph[39m[([32mString[39m, [32mString[39m), [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@3a6afacb

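`Graph(vertices, edges, defaultVertexAttr)` gives the default attribute to any vertex that appears in an edge but not in the vertex RDD; that is the role of `defaultUser`. In `graph2` every edge endpoint already has a record in `users`, so the default is never used. The plain-Scala model below (a sketch, not GraphX code; the function and the data are hypothetical) makes the rule concrete with an edge pointing at an unknown vertex 4.

```scala
// Model of Graph(vertices, edges, defaultVertexAttr): vertices mentioned only by edges
// receive the default attribute; vertices that already have a record keep it.
def withDefaultVertices[VD](
    vertices: Map[Long, VD],
    edges: Seq[(Long, Long)],
    defaultAttr: VD): Map[Long, VD] = {
  val referenced = edges.flatMap { case (src, dst) => Seq(src, dst) }.toSet
  val missing = referenced -- vertices.keySet
  vertices ++ missing.map(id => id -> defaultAttr)
}

// Hypothetical data: the edge 7 -> 4 refers to vertex 4, which has no vertex record
val knownUsers = Map(3L -> ("rxin", "student"), 7L -> ("jgonzal", "postdoc"))
val links = Seq((3L, 7L), (7L, 4L))
val allUsers = withDefaultVertices(knownUsers, links, ("John Doe", "Missing"))
```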
In [115]:
import org.apache.spark.graphx.util.GraphGenerators

[32mimport [39m[36morg.apache.spark.graphx.util.GraphGenerators[39m

In [116]:
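val graph3 = GraphGenerators.gridGraph(sc, 4, 4)      // 4 x 4 grid; vertex attribute = (row, column)
val graph4 = GraphGenerators.starGraph(sc, 8)         // 8 vertices; edges from 1..7 to the centre vertex 0
val graph5 = GraphGenerators.logNormalGraph(sc, 10)   // 10 vertices; vertex attribute = out-degree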

[36mgraph3[39m: [32mGraph[39m[([32mInt[39m, [32mInt[39m), [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@3acb48fe
[36mgraph4[39m: [32mGraph[39m[[32mInt[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@4420d662
[36mgraph5[39m: [32mGraph[39m[[32mLong[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@302eebf5

---

## Utility functions for viewing graphs


In [117]:
// define convenience function to print all edges of a graph:
def printEdges[V, E]( graph: Graph[V, E] ): Unit = {
    
    val facts: RDD[String] = graph.triplets.map(triplet => 
      " " + triplet.toTuple._1 + " --[" + triplet.toTuple._3 + "]--> " + triplet.toTuple._2 );

    facts.collect.foreach(println(_))
}

defined [32mfunction[39m [36mprintEdges[39m

In [118]:
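def printGraphProperties( graph: Graph[_,_] ): Unit = {
    // graph operators:
    println( "Num of edges = " + graph.numEdges )
    println( "Num of vertices = " + graph.numVertices )
    // inDegrees/outDegrees/degrees only contain vertices with a non-zero degree,
    // so these count vertices, they do not sum degrees
    println( "Num of vertices with in-degree > 0 = " + graph.inDegrees.count() )
    println( "Num of vertices with out-degree > 0 = " + graph.outDegrees.count() )
    println( "Num of vertices with degree > 0 = " + graph.degrees.count() )
}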

defined [32mfunction[39m [36mprintGraphProperties[39m

In [119]:
def printNeighbors(graph: Graph[_, _], edgeDirection: EdgeDirection): Unit = {
    graph.collectNeighborIds(edgeDirection).collect.foreach(
      x =>
        println("Neighbors of " + x._1 + " ("+ edgeDirection +") are: " + x._2.mkString(",") )
    );
}

defined [32mfunction[39m [36mprintNeighbors[39m

In [120]:
def printVertices(graph: Graph[_, _]): Unit = {
    graph.vertices.map(
      vd => "Vertex ID = " + vd._1 + ": " + vd._2
    ).collect.foreach(println(_))
}

defined [32mfunction[39m [36mprintVertices[39m

In [121]:
printEdges(graph3)

 (0,(0,0)) --[1.0]--> (1,(0,1))
 (0,(0,0)) --[1.0]--> (4,(1,0))
 (1,(0,1)) --[1.0]--> (2,(0,2))
 (1,(0,1)) --[1.0]--> (5,(1,1))
 (2,(0,2)) --[1.0]--> (3,(0,3))
 (2,(0,2)) --[1.0]--> (6,(1,2))
 (3,(0,3)) --[1.0]--> (7,(1,3))
 (4,(1,0)) --[1.0]--> (5,(1,1))
 (4,(1,0)) --[1.0]--> (8,(2,0))
 (5,(1,1)) --[1.0]--> (6,(1,2))
 (5,(1,1)) --[1.0]--> (9,(2,1))
 (6,(1,2)) --[1.0]--> (7,(1,3))
 (6,(1,2)) --[1.0]--> (10,(2,2))
 (7,(1,3)) --[1.0]--> (11,(2,3))
 (8,(2,0)) --[1.0]--> (9,(2,1))
 (8,(2,0)) --[1.0]--> (12,(3,0))
 (9,(2,1)) --[1.0]--> (10,(2,2))
 (9,(2,1)) --[1.0]--> (13,(3,1))
 (10,(2,2)) --[1.0]--> (11,(2,3))
 (10,(2,2)) --[1.0]--> (14,(3,2))
 (11,(2,3)) --[1.0]--> (15,(3,3))
 (12,(3,0)) --[1.0]--> (13,(3,1))
 (13,(3,1)) --[1.0]--> (14,(3,2))
 (14,(3,2)) --[1.0]--> (15,(3,3))
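The printed edges follow a simple pattern: the vertex in row `r`, column `c` has ID `r * cols + c` and attribute `(r, c)`, and each vertex has an edge with attribute `1.0` to its right-hand neighbour and to the vertex below it. The plain-Scala sketch below reproduces that pattern; it is inferred from the output above rather than taken from the `GraphGenerators` source, and `gridEdges` is a hypothetical helper.

```scala
// Sketch of the grid edges seen in printEdges(graph3): vertex (r, c) has ID r * cols + c,
// with an edge (attribute 1.0) to its right neighbour (r, c + 1) and to the vertex below (r + 1, c).
def gridEdges(rows: Int, cols: Int): Seq[(Long, Long, Double)] = {
  def id(r: Int, c: Int): Long = r.toLong * cols + c
  for {
    r <- 0 until rows
    c <- 0 until cols
    (nr, nc) <- Seq((r, c + 1), (r + 1, c))
    if nr < rows && nc < cols
  } yield (id(r, c), id(nr, nc), 1.0)
}

val grid = gridEdges(4, 4)
// rows * (cols - 1) horizontal edges plus (rows - 1) * cols vertical edges
println(s"${grid.size} edges") // 4 * 3 + 3 * 4 = 24, the number of lines printed above
```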


In [122]:
printEdges(graph4)

 (1,1) --[1]--> (0,1)
 (2,1) --[1]--> (0,1)
 (3,1) --[1]--> (0,1)
 (4,1) --[1]--> (0,1)
 (5,1) --[1]--> (0,1)
 (6,1) --[1]--> (0,1)
 (7,1) --[1]--> (0,1)


## Explore Out-of-the-box graph algorithms in GraphX

### PageRank Algorithm

In [123]:
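// Dynamic PageRank: iterate until the scores change by less than tol;
// resetProb is the random-reset (teleport) probability
val graph10 = graph1.pageRank(tol = 0.01, resetProb = 0.15)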

23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 0
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 1
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 2
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 3
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 4
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 5
23/06/02 19:03:10 INFO Pregel: Pregel finished iteration 6
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 7
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 8
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 9
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 10
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 11
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 12
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 13
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 14
23/06/02 19:03:11 INFO Pregel: Pregel finished iteration 15
23/06/02 19:03:11 INFO Pregel: Pregel finished ite

[36mgraph10[39m: [32mGraph[39m[[32mDouble[39m, [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@44faea2a

In [124]:
val pgrank_df = spark.createDataFrame(graph10.vertices)
    .toDF(Seq("Vertex_id", "pagerank_score"):_*)

23/06/02 19:03:12 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/06/02 19:03:12 INFO SharedState: Warehouse path is 'file:/mnt/src/spark_projs/graphx-algorithms/examples/spark-warehouse'.


[36mpgrank_df[39m: [32mDataFrame[39m = [Vertex_id: bigint, pagerank_score: double]

In [125]:
pgrank_df.show()

+---------+-------------------+
|Vertex_id|     pagerank_score|
+---------+-------------------+
|       80| 0.9822563471669581|
|       30| 1.7297158594552773|
|       50| 0.2880277933777645|
|       40|0.15569069912311598|
|       90| 0.9822563471669581|
|       70| 0.9822563471669581|
|       20|  1.862052953709926|
|       60| 0.4005143234942158|
|       10|  1.617229329338826|
+---------+-------------------+



In [126]:
printEdges(graph10)

 (10,1.617229329338826) --[1.0]--> (20,1.862052953709926)
 (20,1.862052953709926) --[1.0]--> (30,1.7297158594552773)
 (30,1.7297158594552773) --[1.0]--> (10,1.617229329338826)
 (70,0.9822563471669581) --[1.0]--> (80,0.9822563471669581)
 (40,0.15569069912311598) --[1.0]--> (50,0.2880277933777645)
 (50,0.2880277933777645) --[1.0]--> (60,0.4005143234942158)
 (60,0.4005143234942158) --[1.0]--> (20,1.862052953709926)
 (80,0.9822563471669581) --[1.0]--> (90,0.9822563471669581)
 (90,0.9822563471669581) --[1.0]--> (70,0.9822563471669581)
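The scores above can be related to the update rule given in the GraphX `PageRank` documentation, `PR(v) = resetProb + (1 - resetProb) * sum of PR(u) / outDegree(u)` over the edges `u -> v`. The plain-Scala sketch below iterates that rule a fixed number of times on the nine edges of `graph1` (taken from the output above). It is an illustration with a hypothetical function name, not GraphX's implementation: `pageRank(tol = 0.01)` stops as soon as changes fall below `tol`, so its scores (for example 1.862 for vertex 20) are close to, but not identical with, the fixed point computed here, while the ranking of the vertices is the same.

```scala
// Plain-Scala sketch of the PageRank update rule
//   PR(v) = resetProb + (1 - resetProb) * sum over edges u -> v of PR(u) / outDegree(u)
// run for a fixed number of iterations, starting from PR = 1.0 everywhere.
def pageRankSketch(edges: Seq[(Long, Long)], resetProb: Double, numIter: Int): Map[Long, Double] = {
  val vertexIds = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outDegree = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
  var ranks = vertexIds.map(v => v -> 1.0).toMap
  for (_ <- 1 to numIter) {
    // Sum of PR(u) / outDegree(u) over each vertex's incoming edges
    val incoming = edges.groupBy(_._2).map { case (dst, es) =>
      dst -> es.map { case (src, _) => ranks(src) / outDegree(src) }.sum
    }
    ranks = vertexIds.map(v => v -> (resetProb + (1 - resetProb) * incoming.getOrElse(v, 0.0))).toMap
  }
  ranks
}

// The nine edges of graph1, as listed by printEdges(graph10) above
val graph1Edges = Seq((10L, 20L), (20L, 30L), (30L, 10L), (70L, 80L), (40L, 50L),
                      (50L, 60L), (60L, 20L), (80L, 90L), (90L, 70L))
val ranks = pageRankSketch(graph1Edges, resetProb = 0.15, numIter = 200)
ranks.toSeq.sortBy(-_._2).foreach { case (v, r) => println(f"$v%3d  $r%.4f") }
```

Because every vertex of `graph1` has exactly one outgoing edge, the scores in this sketch always sum to the number of vertices (9).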


### Connected Components

In [127]:
// Run Connected Components: each vertex is labelled with the smallest vertex ID in its component
val ccGraph = graph1.connectedComponents()

// The GraphX guide pairs this step with subgraph(vpred = (id, attr) => attr._2 != "Missing") and
// ccGraph.mask(validGraph) to drop placeholder vertices. That pattern needs a vertex attribute that
// can hold a "Missing" placeholder (as with graph2's defaultUser). graph1's attribute is a
// (Long, Double) tuple, so attr._2 != "Missing" is always true and the mask would change nothing.

23/06/02 19:03:22 INFO Pregel: Pregel finished iteration 0
23/06/02 19:03:22 INFO Pregel: Pregel finished iteration 1
23/06/02 19:03:22 INFO Pregel: Pregel finished iteration 2
23/06/02 19:03:22 INFO Pregel: Pregel finished iteration 3


ccGraph: Graph[VertexId, Double] = org.apache.spark.graphx.impl.GraphImpl@a939132

In [128]:
printEdges(ccGraph)

 (10,10) --[1.0]--> (20,10)
 (20,10) --[1.0]--> (30,10)
 (30,10) --[1.0]--> (10,10)
 (70,70) --[1.0]--> (80,70)
 (40,10) --[1.0]--> (50,10)
 (50,10) --[1.0]--> (60,10)
 (60,10) --[1.0]--> (20,10)
 (80,70) --[1.0]--> (90,70)
 (90,70) --[1.0]--> (70,70)
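Connected components can be pictured as label propagation: each vertex starts with its own ID as its label and repeatedly adopts the smallest label among itself and its neighbours, ignoring edge direction, until nothing changes. The plain-Scala sketch below illustrates that idea on `graph1`'s edges. It is not GraphX's implementation (which runs the propagation with Pregel, as the log lines show), and the function name is hypothetical, but it yields the same labels as `ccGraph` above: 10 for vertices 10 to 60 and 70 for vertices 70 to 90.

```scala
// Sketch of connected components by min-label propagation on an undirected view of the edges.
def connectedComponentsSketch(edges: Seq[(Long, Long)]): Map[Long, Long] = {
  // Neighbours in both directions, since connected components ignore edge direction
  val neighbours: Map[Long, Seq[Long]] =
    edges.flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2) }
  var labels: Map[Long, Long] = neighbours.keys.map(v => v -> v).toMap
  var changed = true
  while (changed) {
    // Each vertex takes the smallest label among itself and its neighbours
    val next = labels.map { case (v, label) => v -> (label +: neighbours(v).map(labels)).min }
    changed = next != labels
    labels = next
  }
  labels
}

// The nine edges of graph1, as listed by printEdges(ccGraph) above
val graph1Edges = Seq((10L, 20L), (20L, 30L), (30L, 10L), (70L, 80L), (40L, 50L),
                      (50L, 60L), (60L, 20L), (80L, 90L), (90L, 70L))
val components = connectedComponentsSketch(graph1Edges)
```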


### Triangle counting

In [129]:
val triGraph = graph1.triangleCount()

[36mtriGraph[39m: [32mGraph[39m[[32mInt[39m, [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@109b1f5f

In [130]:
printEdges(triGraph)

23/06/02 19:03:24 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
23/06/02 19:03:24 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.


 (10,1) --[1.0]--> (20,1)
 (20,1) --[1.0]--> (30,1)
 (30,1) --[1.0]--> (10,1)
 (70,1) --[1.0]--> (80,1)
 (40,0) --[1.0]--> (50,0)
 (50,0) --[1.0]--> (60,0)
 (60,0) --[1.0]--> (20,1)
 (80,1) --[1.0]--> (90,1)
 (90,1) --[1.0]--> (70,1)


In [131]:
printVertices(triGraph)

Vertex ID = 80: 1
Vertex ID = 30: 1
Vertex ID = 50: 0
Vertex ID = 40: 0
Vertex ID = 90: 1
Vertex ID = 70: 1
Vertex ID = 20: 1
Vertex ID = 60: 0
Vertex ID = 10: 1
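A vertex's triangle count is the number of triangles it belongs to, that is, the number of pairs of its neighbours that are themselves connected; like GraphX's `triangleCount`, the sketch below ignores edge direction. It is a plain-Scala illustration with a hypothetical function name (dropping self-loops and duplicate edges is an assumption of this sketch). Applied to `graph1`'s edges it gives the counts printed above: 1 for the vertices of the triangles 10-20-30 and 70-80-90, and 0 for 40, 50 and 60.

```scala
// Sketch of per-vertex triangle counting on an undirected, de-duplicated view of the edges.
def triangleCountSketch(edges: Seq[(Long, Long)]): Map[Long, Int] = {
  // Each undirected edge stored once as (smaller ID, larger ID); self-loops dropped
  val undirected: Set[(Long, Long)] =
    edges.collect { case (s, d) if s != d => if (s < d) (s, d) else (d, s) }.toSet
  val neighbours: Map[Long, Seq[Long]] =
    undirected.toSeq
      .flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).sorted }
  neighbours.map { case (v, nbrs) =>
    // Count neighbour pairs (a, b) with a < b that are joined by an edge
    val closedPairs = for {
      i <- nbrs.indices
      j <- (i + 1) until nbrs.size
      if undirected.contains((nbrs(i), nbrs(j)))
    } yield 1
    v -> closedPairs.size
  }
}

// The nine edges of graph1, as listed by printEdges(triGraph) above
val graph1Edges = Seq((10L, 20L), (20L, 30L), (30L, 10L), (70L, 80L), (40L, 50L),
                      (50L, 60L), (60L, 20L), (80L, 90L), (90L, 70L))
val triangles = triangleCountSketch(graph1Edges)
```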


### Strongly Connected Components

In [132]:
val sccGraph = graph1.stronglyConnectedComponents(numIter=10)

23/06/02 19:03:32 INFO Pregel: Pregel finished iteration 0
23/06/02 19:03:32 INFO Pregel: Pregel finished iteration 1
23/06/02 19:03:32 INFO Pregel: Pregel finished iteration 0
23/06/02 19:03:32 INFO Pregel: Pregel finished iteration 1


[36msccGraph[39m: [32mGraph[39m[[32mVertexId[39m, [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@6fdf83c

In [133]:
val scc_df = spark.createDataFrame(sccGraph.vertices).toDF(Seq("Vertex_id", "strong_conn_comp"):_*)

[36mscc_df[39m: [32mDataFrame[39m = [Vertex_id: bigint, strong_conn_comp: bigint]

In [134]:
printEdges(sccGraph)

 (10,10) --[1.0]--> (20,10)
 (20,10) --[1.0]--> (30,10)
 (30,10) --[1.0]--> (10,10)
 (70,70) --[1.0]--> (80,70)
 (40,40) --[1.0]--> (50,50)
 (50,50) --[1.0]--> (60,60)
 (60,60) --[1.0]--> (20,10)
 (80,70) --[1.0]--> (90,70)
 (90,70) --[1.0]--> (70,70)
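A strongly connected component is a maximal set of vertices in which every vertex can reach every other one by following edge directions. In `graph1`, vertices 40, 50 and 60 lead into the cycle 10 → 20 → 30 → 10 but cannot be reached back from it, so each forms its own component. To check this independently, the sketch below uses Kosaraju's two-pass depth-first search in plain Scala. This is a different technique from GraphX's Pregel-based `stronglyConnectedComponents`, the function name is hypothetical, and labelling each component with its smallest vertex ID is a choice made here so the result lines up with `sccGraph`.

```scala
import scala.collection.mutable

// Kosaraju's algorithm: (1) DFS on the graph, recording vertices in order of completion;
// (2) DFS on the reversed graph in reverse completion order; each tree found is one SCC.
def sccSketch(edges: Seq[(Long, Long)]): Map[Long, Long] = {
  val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outgoing = edges.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }.withDefaultValue(Seq.empty[Long])
  val incoming = edges.groupBy(_._2).map { case (v, es) => v -> es.map(_._1) }.withDefaultValue(Seq.empty[Long])

  // Pass 1: finishing order on the original graph
  val visited = mutable.Set.empty[Long]
  val finishOrder = mutable.ListBuffer.empty[Long]
  def dfs1(v: Long): Unit =
    if (visited.add(v)) {
      outgoing(v).foreach(dfs1)
      finishOrder += v
    }
  vertices.foreach(dfs1)

  // Pass 2: assign component numbers on the reversed graph
  val component = mutable.Map.empty[Long, Int]
  def dfs2(v: Long, c: Int): Unit =
    if (!component.contains(v)) {
      component(v) = c
      incoming(v).foreach(u => dfs2(u, c))
    }
  var nextComponent = 0
  finishOrder.reverseIterator.foreach { v =>
    if (!component.contains(v)) {
      dfs2(v, nextComponent)
      nextComponent += 1
    }
  }

  // Label each component with its smallest vertex ID
  val smallestId = component.groupBy(_._2).map { case (c, members) => c -> members.keys.min }
  component.map { case (v, c) => v -> smallestId(c) }.toMap
}

// The nine edges of graph1, as listed by printEdges(sccGraph) above
val graph1Edges = Seq((10L, 20L), (20L, 30L), (30L, 10L), (70L, 80L), (40L, 50L),
                      (50L, 60L), (60L, 20L), (80L, 90L), (90L, 70L))
val scc = sccSketch(graph1Edges)
```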


## Extracting Graph Algorithm output from the Graphs

In [135]:
val cc_df = spark.createDataFrame(ccGraph.vertices).toDF(Seq("Vertex_id", "Connected_id"):_*)

[36mcc_df[39m: [32mDataFrame[39m = [Vertex_id: bigint, Connected_id: bigint]

In [136]:
cc_df.show()

+---------+------------+
|Vertex_id|Connected_id|
+---------+------------+
|       80|          70|
|       30|          10|
|       50|          10|
|       40|          10|
|       90|          70|
|       70|          70|
|       20|          10|
|       60|          10|
|       10|          10|
+---------+------------+



In [137]:
val tri_df = spark.createDataFrame(triGraph.vertices).toDF(Seq("Vertex_id", "triangle_count"):_*)

[36mtri_df[39m: [32mDataFrame[39m = [Vertex_id: bigint, triangle_count: int]

In [138]:
// join dataframes on Vertex_id : pgrank_df, cc_df, tri_df, scc_df
val graph1_vtx_data = cc_df.join(pgrank_df,cc_df("Vertex_id") === pgrank_df("Vertex_id"),"inner" )
.join(tri_df,cc_df("Vertex_id") === tri_df("Vertex_id"),"inner" )
.join(scc_df,cc_df("Vertex_id") === scc_df("Vertex_id"),"inner" )
.select(cc_df("Vertex_id"),cc_df("Connected_id"),pgrank_df("pagerank_score"), tri_df("triangle_count"), scc_df("strong_conn_comp"))

[36mgraph1_vtx_data[39m: [32mDataFrame[39m = [Vertex_id: bigint, Connected_id: bigint ... 3 more fields]

In [139]:
graph1_vtx_data.show()

23/06/02 19:03:39 INFO ShufflePartitionsUtil: For shuffle(152, 153), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576
23/06/02 19:03:39 INFO ShufflePartitionsUtil: For shuffle(154, 155), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576


+---------+------------+-------------------+--------------+----------------+
|Vertex_id|Connected_id|     pagerank_score|triangle_count|strong_conn_comp|
+---------+------------+-------------------+--------------+----------------+
|       10|          10|  1.617229329338826|             1|              10|
|       20|          10|  1.862052953709926|             1|              10|
|       30|          10| 1.7297158594552773|             1|              10|
|       40|          10|0.15569069912311598|             0|              40|
|       50|          10| 0.2880277933777645|             0|              50|
|       60|          10| 0.4005143234942158|             0|              60|
|       70|          70| 0.9822563471669581|             1|              70|
|       80|          70| 0.9822563471669581|             1|              70|
|       90|          70| 0.9822563471669581|             1|              70|
+---------+------------+-------------------+--------------+----------------+

In [140]:
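// Spark writes /tmp/graph1_properties.csv as a directory of part-*.csv files, not as a single file
graph1_vtx_data.write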
    .option("header",true)
    .mode(SaveMode.Overwrite)
    .csv("/tmp/graph1_properties.csv")

23/06/02 19:03:41 INFO ShufflePartitionsUtil: For shuffle(156, 157), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576
23/06/02 19:03:41 INFO ShufflePartitionsUtil: For shuffle(158, 159), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576
23/06/02 19:03:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/06/02 19:03:41 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/06/02 19:03:41 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23/06/02 19:03:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/06/02 19:03:41 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/06/02 19:03:41 INFO SQLHadoopMapReduce

## A simple graph algorithm using the Aggregate Messages functionality

In [140]:
// Signature of aggregateMessages (returns a VertexRDD with one merged message per vertex that received any):
// val vrdd = graph1.aggregateMessages[Msg: ClassTag](
//       sendMsg: EdgeContext[VD, ED, Msg] => Unit,
//       mergeMsg: (Msg, Msg) => Msg,
//       tripletFields: TripletFields = TripletFields.All)

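`aggregateMessages` performs a single round of message passing: each edge can send messages to its endpoints, and the messages arriving at each vertex are merged. This is enough to compute simple metrics such as vertex degrees: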

In [141]:
// Out-degree of each vertex: every edge sends the message 1 to its source vertex,
// and the messages arriving at a vertex are summed.
// collect brings the (vertex ID, out-degree) pairs back to the driver.
graph2.aggregateMessages[Int](_.sendToSrc(1), _ + _).collect

[36mres140[39m: [32mArray[39m[([32mVertexId[39m, [32mInt[39m)] = [33mArray[39m(([32m2L[39m, [32m1[39m), ([32m3L[39m, [32m1[39m), ([32m10L[39m, [32m2[39m), ([32m5L[39m, [32m2[39m))

In [142]:
// In-degrees: the same idea, but each edge sends its message to the destination vertex
graph2.aggregateMessages[Int](_.sendToDst(1), _ + _).collect

[36mres141[39m: [32mArray[39m[([32mVertexId[39m, [32mInt[39m)] = [33mArray[39m(([32m1L[39m, [32m1[39m), ([32m7L[39m, [32m2[39m), ([32m3L[39m, [32m1[39m), ([32m5L[39m, [32m2[39m))

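Conceptually, `aggregateMessages` makes a single pass over the edges: `sendMsg` may emit messages to either endpoint of each edge, `mergeMsg` combines the messages addressed to the same vertex, and only vertices that received at least one message appear in the result (which is why vertices 7 and 1 are missing from the out-degree array above). The plain-Scala model below is a sketch of those semantics, not GraphX's API: the function name is hypothetical, and it passes `(src, dst, attr)` to `sendMsg` instead of an `EdgeContext`.

```scala
// Model of one aggregateMessages round over a local edge list.
def aggregateMessagesSketch[ED, A](
    edges: Seq[(Long, Long, ED)],
    sendMsg: (Long, Long, ED) => Seq[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, A] =
  edges
    .flatMap { case (src, dst, attr) => sendMsg(src, dst, attr) } // messages emitted by every edge
    .groupBy(_._1)                                                 // group by receiving vertex
    .map { case (vertex, msgs) => vertex -> msgs.map(_._2).reduce(mergeMsg) }

// The six edges of graph2
val graph2Edges = Seq((3L, 7L, "collab"), (5L, 3L, "advisor"), (2L, 5L, "colleague"),
                      (5L, 7L, "pi"), (10L, 5L, "friend"), (10L, 1L, "friend"))

// Counterparts of _.sendToSrc(1) and _.sendToDst(1), merged with _ + _
val outDegrees = aggregateMessagesSketch[String, Int](graph2Edges, (src, _, _) => Seq(src -> 1), _ + _)
val inDegrees  = aggregateMessagesSketch[String, Int](graph2Edges, (_, dst, _) => Seq(dst -> 1), _ + _)
```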
The same aggregations can be written with named functions instead of lambdas:

In [143]:
def sendMsgDestination(e:EdgeContext[_, _, Int]):Unit = {
    e.sendToDst(1)
    // e.sendToSrc(1)
}

defined [32mfunction[39m [36msendMsgDestination[39m

In [144]:
def mergeMsgMethod(x:Int, y:Int):Int = {
    x + y
}

defined [32mfunction[39m [36mmergeMsgMethod[39m

In [145]:
// In-degrees for graph1, now using the named functions.
// aggregateMessages only returns vertices that received at least one message, so join back to
// all vertices and treat a missing count as 0 (vertex 40 has no incoming edges).
graph1.aggregateMessages[Int](
    sendMsgDestination(_), mergeMsgMethod(_, _)
).rightOuterJoin(graph1.vertices)
.map(x => (x._2._2._1, x._2._2._2, x._2._1.getOrElse(0)))
.collect

res144: Array[(Long, Double, Int)] = Array(
  (80L, 0.0, 1),
  (30L, 0.0, 1),
  (50L, 0.0, 1),
  (40L, 0.0, 0),
  (90L, 0.0, 1),
  (70L, 0.0, 1),
  (20L, 0.0, 2),
  (60L, 0.0, 1),
  (10L, 0.0, 1)
)

In [146]:
// In-degrees for graph3, as (row, column, in-degree); vertex (0, 0) has no incoming edges
graph3.aggregateMessages[Int](
    sendMsgDestination(_), mergeMsgMethod(_, _)
).rightOuterJoin(graph3.vertices)
.map(x => (x._2._2._1, x._2._2._2, x._2._1.getOrElse(0)))
.collect

res145: Array[(Int, Int, Int)] = Array(
  (0, 0, 0),
  (1, 2, 2),
  (3, 0, 1),
  (3, 1, 2),
  (0, 1, 1),
  (1, 3, 2),
  (3, 2, 2),
  (2, 0, 1),
  (0, 2, 1),
  (3, 3, 2),
  (0, 3, 1),
  (2, 1, 2),
  (1, 0, 1),
  (2, 2, 2),
  (2, 3, 2),
  (1, 1, 2)
)

In [147]:
graph4.aggregateMessages[Int](
    sendMsgDestination(_), mergeMsgMethod(_, _)
).rightOuterJoin(graph4.vertices)
.map(x => (x._1, x._2._1.getOrElse(0)))
.collect

[36mres146[39m: [32mArray[39m[([32mVertexId[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m0L[39m, [32m7[39m),
  ([32m6L[39m, [32m0[39m),
  ([32m1L[39m, [32m0[39m),
  ([32m7L[39m, [32m0[39m),
  ([32m2L[39m, [32m0[39m),
  ([32m3L[39m, [32m0[39m),
  ([32m4L[39m, [32m0[39m),
  ([32m5L[39m, [32m0[39m)
)

In [148]:
graph5.aggregateMessages[Int](
    sendMsgDestination(_), mergeMsgMethod(_, _)
).rightOuterJoin(graph5.vertices)
.map(x => (x._1, x._2._1.getOrElse(0), x._2._2))
.collect

[36mres147[39m: [32mArray[39m[([32mVertexId[39m, [32mInt[39m, [32mLong[39m)] = [33mArray[39m(
  ([32m0L[39m, [32m4[39m, [32m4L[39m),
  ([32m6L[39m, [32m7[39m, [32m1L[39m),
  ([32m1L[39m, [32m1[39m, [32m2L[39m),
  ([32m7L[39m, [32m1[39m, [32m6L[39m),
  ([32m8L[39m, [32m5[39m, [32m0L[39m),
  ([32m2L[39m, [32m4[39m, [32m3L[39m),
  ([32m3L[39m, [32m5[39m, [32m3L[39m),
  ([32m9L[39m, [32m5[39m, [32m6L[39m),
  ([32m4L[39m, [32m4[39m, [32m5L[39m),
  ([32m5L[39m, [32m2[39m, [32m8L[39m)
)

In [149]:
printEdges(graph5)

 (0,4) --[1]--> (0,4)
 (0,4) --[1]--> (0,4)
 (0,4) --[1]--> (5,8)
 (0,4) --[1]--> (9,6)
 (1,2) --[1]--> (1,2)
 (1,2) --[1]--> (8,0)
 (2,3) --[1]--> (6,1)
 (2,3) --[1]--> (6,1)
 (2,3) --[1]--> (7,6)
 (3,3) --[1]--> (2,3)
 (3,3) --[1]--> (4,5)
 (3,3) --[1]--> (6,1)
 (4,5) --[1]--> (3,3)
 (4,5) --[1]--> (3,3)
 (4,5) --[1]--> (3,3)
 (4,5) --[1]--> (6,1)
 (4,5) --[1]--> (8,0)
 (5,8) --[1]--> (2,3)
 (5,8) --[1]--> (2,3)
 (5,8) --[1]--> (4,5)
 (5,8) --[1]--> (4,5)
 (5,8) --[1]--> (6,1)
 (5,8) --[1]--> (8,0)
 (5,8) --[1]--> (8,0)
 (5,8) --[1]--> (9,6)
 (6,1) --[1]--> (4,5)
 (7,6) --[1]--> (0,4)
 (7,6) --[1]--> (3,3)
 (7,6) --[1]--> (6,1)
 (7,6) --[1]--> (8,0)
 (7,6) --[1]--> (9,6)
 (7,6) --[1]--> (9,6)
 (9,6) --[1]--> (0,4)
 (9,6) --[1]--> (2,3)
 (9,6) --[1]--> (3,3)
 (9,6) --[1]--> (5,8)
 (9,6) --[1]--> (6,1)
 (9,6) --[1]--> (9,6)


---
## Simple Example of using the Pregel API

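Pregel is an iterative, graph-parallel computation model that proceeds in a sequence of supersteps. In each superstep, the vertices that received messages update their values, and new messages are then sent along edges for the next superstep; the computation ends when no messages are sent or after `maxIterations` supersteps.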

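Besides an initial message (which every vertex receives in the first superstep), it requires three functions:

  1. `vprog`: computes a vertex's new value from its current value and its merged incoming message
  2. `sendMsg`: for each edge triplet, decides which messages, if any, to send to neighbouring vertices
  3. `mergeMsg`: combines two messages addressed to the same vertex (it must be commutative and associative)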
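The superstep loop can be sketched in plain Scala to make the roles of the three functions concrete. This is a simplified local model, not GraphX's implementation: the names are hypothetical, and `sendMsg` runs on every edge in every superstep, whereas GraphX only considers edges next to vertices that received a message in the previous superstep (controlled by `activeDirection`). The usage example propagates the largest vertex ID along `graph1`'s edges, so each vertex ends up with the largest ID among the vertices that can reach it, itself included.

```scala
// Local stand-in for an EdgeTriplet: both endpoint IDs and values plus the edge attribute
final case class SketchTriplet[VD, ED](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)

def pregelSketch[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[(Long, Long, ED)],
    initialMsg: A,
    maxIterations: Int)(
    vprog: (Long, VD, A) => VD,
    sendMsg: SketchTriplet[VD, ED] => Iterator[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, VD] = {

  // sendMsg on every edge, then mergeMsg per receiving vertex
  def messagesFor(values: Map[Long, VD]): Map[Long, A] =
    edges
      .flatMap { case (s, d, attr) => sendMsg(SketchTriplet(s, values(s), d, values(d), attr)) }
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

  // Superstep 0: every vertex runs vprog on the initial message
  var values: Map[Long, VD] = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
  var messages = messagesFor(values)
  var superstep = 0
  // Continue while messages are in flight, for at most maxIterations supersteps
  while (messages.nonEmpty && superstep < maxIterations) {
    // Only vertices that received a message run vprog; the others keep their value
    values = values.map { case (id, v) => id -> messages.get(id).map(m => vprog(id, v, m)).getOrElse(v) }
    messages = messagesFor(values)
    superstep += 1
  }
  values
}

// Usage: the edges of graph1 (weights unused); each vertex starts with its own ID as its value
val graph1WeightedEdges = Seq((10L, 20L, 1.0), (20L, 30L, 1.0), (30L, 10L, 1.0), (70L, 80L, 1.0), (40L, 50L, 1.0),
                              (50L, 60L, 1.0), (60L, 20L, 1.0), (80L, 90L, 1.0), (90L, 70L, 1.0))
val startValues = graph1WeightedEdges.flatMap { case (s, d, _) => Seq(s, d) }.distinct.map(id => id -> id).toMap

val maxReaching = pregelSketch(startValues, graph1WeightedEdges, Long.MinValue, maxIterations = 10)(
  (_, value, msg) => math.max(value, msg),                                           // vprog: keep the larger ID
  t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty, // sendMsg: push larger IDs forward
  (a, b) => math.max(a, b))                                                          // mergeMsg: keep the largest offer
```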

In [149]:
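// Signature of pregel (defaults: maxIterations = Int.MaxValue, activeDirection = EdgeDirection.Either):
// val graph11 = graph1.pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(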
//       vprog: (VertexId, VD, A) => VD,
//       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
//       mergeMsg: (A, A) => A)

Here are example definitions of the three functions for a simple Pregel algorithm, using `Int` values and messages:

In [150]:
// Assume the message type A is Int.
// vprog: add the incoming message value to the existing vertex value
def vertexProgram(vid: VertexId, existingVtxVal: Int, messageVal: Int): Int = {
    existingVtxVal + messageVal
}

defined function vertexProgram

In [151]:
// sendMsg: send a message to the destination vertex only if
// the source vertex value + edge value (distance) is less than the destination vertex value.
// The return type must be Iterator[(VertexId, A)] for use with pregel.
def sendMsgMethod(triplet: EdgeTriplet[Int, Int]): Iterator[(VertexId, Int)] = {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
}

defined function sendMsgMethod

In [152]:
def mergeMsgMethod(x:Int, y:Int):Int = {
    x + y
}

defined [32mfunction[39m [36mmergeMsgMethod[39m

In [153]:
val exampleVerticesResult = graph3.aggregateMessages[Int](
    sendMsgDestination(_), mergeMsgMethod(_, _)
)

[36mexampleVerticesResult[39m: [32mVertexRDD[39m[[32mInt[39m] = VertexRDDImpl[1147] at RDD at VertexRDD.scala:57

In [154]:
exampleVerticesResult.collect()

[36mres153[39m: [32mArray[39m[([32mVertexId[39m, [32mInt[39m)] = [33mArray[39m(
  ([32m6L[39m, [32m2[39m),
  ([32m12L[39m, [32m1[39m),
  ([32m13L[39m, [32m2[39m),
  ([32m1L[39m, [32m1[39m),
  ([32m7L[39m, [32m2[39m),
  ([32m14L[39m, [32m2[39m),
  ([32m8L[39m, [32m1[39m),
  ([32m2L[39m, [32m1[39m),
  ([32m15L[39m, [32m2[39m),
  ([32m3L[39m, [32m1[39m),
  ([32m9L[39m, [32m2[39m),
  ([32m4L[39m, [32m1[39m),
  ([32m10L[39m, [32m2[39m),
  ([32m11L[39m, [32m2[39m),
  ([32m5L[39m, [32m2[39m)
)

In [155]:
printEdges(graph3)

 (0,(0,0)) --[1.0]--> (1,(0,1))
 (0,(0,0)) --[1.0]--> (4,(1,0))
 (1,(0,1)) --[1.0]--> (2,(0,2))
 (1,(0,1)) --[1.0]--> (5,(1,1))
 (2,(0,2)) --[1.0]--> (3,(0,3))
 (2,(0,2)) --[1.0]--> (6,(1,2))
 (3,(0,3)) --[1.0]--> (7,(1,3))
 (4,(1,0)) --[1.0]--> (5,(1,1))
 (4,(1,0)) --[1.0]--> (8,(2,0))
 (5,(1,1)) --[1.0]--> (6,(1,2))
 (5,(1,1)) --[1.0]--> (9,(2,1))
 (6,(1,2)) --[1.0]--> (7,(1,3))
 (6,(1,2)) --[1.0]--> (10,(2,2))
 (7,(1,3)) --[1.0]--> (11,(2,3))
 (8,(2,0)) --[1.0]--> (9,(2,1))
 (8,(2,0)) --[1.0]--> (12,(3,0))
 (9,(2,1)) --[1.0]--> (10,(2,2))
 (9,(2,1)) --[1.0]--> (13,(3,1))
 (10,(2,2)) --[1.0]--> (11,(2,3))
 (10,(2,2)) --[1.0]--> (14,(3,2))
 (11,(2,3)) --[1.0]--> (15,(3,3))
 (12,(3,0)) --[1.0]--> (13,(3,1))
 (13,(3,1)) --[1.0]--> (14,(3,2))
 (14,(3,2)) --[1.0]--> (15,(3,3))


---
## Applying the Pregel API in a simple algorithm

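Here is an example of computing single-source shortest paths (SSSP) with Pregel, using the edge attributes of `graph1` as distances. Each vertex keeps the shortest distance found so far, a message is sent along an edge only when it offers a shorter path, and competing offers are merged by taking the minimum.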

In [156]:
def ShortestPath(graph: Graph[(Long, Double), Double], srcID: VertexId): RDD[Row] = {

    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph = graph.mapVertices(
        (id, _) =>
        if (id == srcID) 0.0 else Double.PositiveInfinity
    )

    // maxIterations caps the number of supersteps, and therefore how many hops the search can propagate
    val sssp = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 7)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program: keep the shorter distance
      triplet => {  // Send Message: offer a shorter path to the destination vertex, if there is one
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b) // Merge Message: keep the shortest offered distance
    )

    // (source, destination, distance) rows for every vertex reachable from the source
    sssp.vertices.filter(y => y._2 < Double.PositiveInfinity).map(x => Row(srcID, x._1, x._2))
}

defined function ShortestPath

In [157]:
val distances = ShortestPath(graph1, 10)

23/06/02 19:04:22 INFO Pregel: Pregel finished iteration 0
23/06/02 19:04:22 INFO Pregel: Pregel finished iteration 1


[36mdistances[39m: [32mRDD[39m[[32mRow[39m] = MapPartitionsRDD[1203] at map at cmd155.sc:21

In [160]:
val rowArray = distances.collect()
println("Distance from Src node (10) to Destination node =")
println(rowArray.mkString("\n"))   // printing the array itself would only show its JVM reference

Distance from Src node (10) to Destination node =
[10,30,2.0]
[10,20,1.0]
[10,10,0.0]


rowArray: Array[Row] = Array([10,30,2.0], [10,20,1.0], [10,10,0.0])

In [163]:
println("Distance from Src node (40) to Destination node =")
println(ShortestPath(graph1, 40).collect().mkString("\n"))

Distance from Src node (40) to Destination node =


23/06/02 19:06:01 INFO Pregel: Pregel finished iteration 0
23/06/02 19:06:01 INFO Pregel: Pregel finished iteration 1
23/06/02 19:06:01 INFO Pregel: Pregel finished iteration 2
23/06/02 19:06:01 INFO Pregel: Pregel finished iteration 3
23/06/02 19:06:02 INFO Pregel: Pregel finished iteration 4


[40,30,4.0]
[40,50,1.0]
[40,40,0.0]
[40,20,3.0]
[40,60,2.0]
[40,10,5.0]
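The supersteps behind `ShortestPath` can be sketched in plain Scala: the source starts at distance 0 and every other vertex at infinity, and in each superstep only the edges leaving vertices whose distance just improved are relaxed, with the minimum offer kept at each vertex. Restricting work to recently updated vertices is a simplification of GraphX's active-vertex rule, and the function name is hypothetical; this is an illustration, not the GraphX code path. On `graph1` it reproduces the distances above from vertices 10 and 40.

```scala
// Returns the finite distances from `source` and the number of supersteps that improved any distance.
def ssspSketch(edges: Seq[(Long, Long, Double)], source: Long): (Map[Long, Double], Int) = {
  val vertices = edges.flatMap { case (s, d, _) => Seq(s, d) }.distinct
  var dist: Map[Long, Double] =
    vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
  var active: Set[Long] = Set(source)
  var supersteps = 0
  while (active.nonEmpty) {
    // sendMsg: from each active source vertex, offer dist(src) + weight when it beats dist(dst);
    // mergeMsg: keep the smallest offer per destination
    val offers: Map[Long, Double] = edges
      .collect { case (s, d, w) if active(s) && dist(s) + w < dist(d) => d -> (dist(s) + w) }
      .groupBy(_._1)
      .map { case (d, candidates) => d -> candidates.map(_._2).min }
    // vprog: every offer is already shorter than the current distance, so it replaces it
    dist = dist ++ offers
    active = offers.keySet
    if (offers.nonEmpty) supersteps += 1
  }
  (dist.filter { case (_, d) => d < Double.PositiveInfinity }, supersteps)
}

// The edges of graph1 with weight 1.0, as loaded earlier
val graph1WeightedEdges = Seq((10L, 20L, 1.0), (20L, 30L, 1.0), (30L, 10L, 1.0), (70L, 80L, 1.0), (40L, 50L, 1.0),
                              (50L, 60L, 1.0), (60L, 20L, 1.0), (80L, 90L, 1.0), (90L, 70L, 1.0))
val (fromTen, stepsFromTen) = ssspSketch(graph1WeightedEdges, 10L)
val (fromForty, stepsFromForty) = ssspSketch(graph1WeightedEdges, 40L)
fromForty.toSeq.sortBy(_._2).foreach { case (v, d) => println(s"[40,$v,$d]") }
```

In this sketch, source 40 needs five improving supersteps, one per hop of the path 40 → 50 → 60 → 20 → 30 → 10, which is within the `maxIterations = 7` limit used by `ShortestPath`.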


---

### Stop the Spark Session

In [164]:
spark.stop()

23/06/02 19:06:06 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040


In [97]:
sc.stop()   // no-op: spark.stop() above already stopped the underlying SparkContext