In [1]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.apache.spark::spark-mllib:2.1.0`
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Build the session through JupyterSparkSession rather than SparkSession.builder(), so that
// it is aware of the jupyter-scala kernel. Master is "local[*]", app name is "notebook".
val sparkSession =
  JupyterSparkSession
    .builder()
    .jupyter()
    .master("local[*]")
    .appName("notebook")
    .getOrCreate()

// SparkContext of the session above; the later cells use it to create RDDs and load graphs.
val sc = sparkSession.sparkContext

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


[32mimport [39m[36m$exclude.$                        , $ivy.$                            // for cleaner logs
[39m
[32mimport [39m[36m$profile.$           
[39m
[32mimport [39m[36m$ivy.$                                   // adjust spark version - spark >= 2.0
[39m
[32mimport [39m[36m$ivy.$                                    
[39m
[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

[39m
[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36mjupyter.spark.session._

[39m
[32mimport [39m[36morg.apache.spark.graphx._
[39m
[32mimport [39m[36morg.apache.spark.rdd.RDD

[39m
[32mimport [39m[36morg.apache.spark.graphx.{Graph, VertexRDD}
[39m
[32mimport [39m[36morg.apache.spark.graphx.util.GraphGenerators

[39m
[36msparkSession[39m: [32morg[39

# Basics

In [2]:
// Vertex RDD: one (vertex id, (username, role)) entry per user
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")),
  (2L, ("istoica", "prof"))
))
// Edge RDD: Edge(source id, destination id, relationship label)
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")
))
// Default attribute for any vertex that an edge refers to but that has no entry in `users`
val defaultUser = ("John Doe", "Missing")
// Assemble the property graph from vertices, edges and the default vertex attribute
val graph = Graph(users, relationships, defaultUser)

[36musers[39m: [32mRDD[39m[([32mVertexId[39m, ([32mString[39m, [32mString[39m))] = ParallelCollectionRDD[0] at parallelize at cmd1.sc:2
[36mrelationships[39m: [32mRDD[39m[[32mEdge[39m[[32mString[39m]] = ParallelCollectionRDD[1] at parallelize at cmd1.sc:6
[36mdefaultUser[39m: ([32mString[39m, [32mString[39m) = ([32m"John Doe"[39m, [32m"Missing"[39m)
[36mgraph[39m: [32mGraph[39m[([32mString[39m, [32mString[39m), [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@5e97b350

In [3]:
// Number of vertices whose role (second field of the attribute) is "postdoc"
graph.vertices.filter(vertex => vertex._2._2 == "postdoc").count()
// Number of edges whose source id is greater than the destination id,
// once via the Edge fields and once via pattern matching on Edge
graph.edges.filter(edge => edge.dstId < edge.srcId).count()
graph.edges.filter { case Edge(srcId, dstId, _) => srcId > dstId }.count()

                                                                                

[36mres2_0[39m: [32mLong[39m = [32m1L[39m
[36mres2_1[39m: [32mLong[39m = [32m1L[39m
[36mres2_2[39m: [32mLong[39m = [32m1L[39m

In [4]:
// Turn every triplet (source vertex, edge, destination vertex) into a sentence
// built from the source username, the edge label and the destination username.
val facts: RDD[String] = graph.triplets.map { t =>
  s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}"
}
// Collect the sentences to the driver and print one per line
facts.collect().foreach(fact => println(fact))

rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal


[36mfacts[39m: [32mRDD[39m[[32mString[39m] = MapPartitionsRDD[24] at map at cmd3.sc:1

In [5]:
// Remove vertices whose role is "Missing", as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (_, userAttr) => userAttr._2 != "Missing")
// In the graph built above every edge endpoint (2, 3, 5, 7) has a user record, so no vertex
// carries the ("John Doe", "Missing") default and nothing is actually filtered out here.
validGraph.vertices.collect().foreach(vertex => println(vertex))
validGraph.triplets
  .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
  .collect()
  .foreach(sentence => println(sentence))

(2,(istoica,prof))
(3,(rxin,student))
(5,(franklin,prof))
(7,(jgonzal,postdoc))
rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal


[36mvalidGraph[39m: [32mGraph[39m[([32mString[39m, [32mString[39m), [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@40103d2c

In [6]:
// Run connected components. The resulting vertex attribute is a VertexId, so the
// original (name, role) attribute -- including the "Missing" marker -- is no longer there.
val ccGraph = graph.connectedComponents()
// Subgraph without the "Missing" vertices and the edges connected to them
val validGraph = graph.subgraph(vpred = (_, userAttr) => userAttr._2 != "Missing")
// Restrict the connected-components answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

[36mccGraph[39m: [32mGraph[39m[[32mVertexId[39m, [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@6926d1a4
[36mvalidGraph[39m: [32mGraph[39m[([32mString[39m, [32mString[39m), [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@1e1d9ea1
[36mvalidCCGraph[39m: [32mGraph[39m[[32mVertexId[39m, [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@11df10ee

In [7]:
// Out-degree of each vertex, as reported by the graph
val outDegrees: VertexRDD[Int] = graph.outDegrees
// Replace every vertex attribute with its out-degree; a vertex for which the join
// finds no out-degree entry is given 0.
val degreeGraph = graph.outerJoinVertices(outDegrees) { (_, _, outDegOpt) =>
  outDegOpt.getOrElse(0)
}

[36moutDegrees[39m: [32mVertexRDD[39m[[32mInt[39m] = VertexRDDImpl[97] at RDD at VertexRDD.scala:57
[36mdegreeGraph[39m: [32mGraph[39m[[32mInt[39m, [32mString[39m] = org.apache.spark.graphx.impl.GraphImpl@45d17877

In [8]:
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity; each vertex's "age" is its own id as a Double.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
// Compute, per vertex, the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send the destination vertex a (counter, age) message. The message is a single
      // tuple value, so it is passed as an explicit tuple instead of relying on the
      // compiler adapting two separate arguments into one.
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Reduce function: add the counters and add the ages
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
// Divide total age by number of older followers to get the average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

(96,99.0)
(56,76.0)
(16,60.48148148148148)
(80,92.5)
(48,70.38888888888889)
(32,56.666666666666664)
(0,48.42307692307692)
(24,55.7)
(64,84.6)
(40,71.65217391304348)
(72,89.0)
(8,50.51724137931034)
(88,93.33333333333333)
(41,67.76470588235294)
(81,91.5)
(25,67.66666666666667)
(65,76.625)
(73,78.71428571428571)
(57,79.27272727272727)
(33,57.294117647058826)
(1,46.30434782608695)
(89,96.2)
(17,55.72727272727273)
(9,56.0)
(49,71.92307692307692)
(34,60.65384615384615)
(82,92.6)
(66,84.125)
(98,99.0)
(50,79.17647058823529)
(42,66.04761904761905)
(74,91.125)
(90,96.25)
(18,59.08)
(58,76.83333333333333)
(26,65.34782608695652)
(10,55.916666666666664)
(2,55.18518518518518)
(19,65.34782608695652)
(59,77.28571428571429)
(11,51.19047619047619)
(35,64.9375)
(27,60.51851851851852)
(75,82.57142857142857)
(51,64.76923076923077)
(83,94.33333333333333)
(67,82.375)
(3,53.04545454545455)
(91,93.8)
(43,71.0625)
(84,92.25)
(52,69.92857142857143)
(4,42.035714285714285)
(76,90.875)
(28,58.04545454545455)
(36,7

[36mgraph[39m: [32mGraph[39m[[32mDouble[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@62d164c
[36molderFollowers[39m: [32mVertexRDD[39m[([32mInt[39m, [32mDouble[39m)] = VertexRDDImpl[128] at RDD at VertexRDD.scala:57
[36mavgAgeOfOlderFollowers[39m: [32mVertexRDD[39m[[32mDouble[39m] = VertexRDDImpl[130] at RDD at VertexRDD.scala:57

In [9]:
// Reduce operation for finding the highest-degree vertex: of two (vertex id, degree)
// pairs, keep the one with the larger degree. On equal degrees `b` is returned.
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (b._2 >= a._2) b else a
// Vertex with the highest in-degree, out-degree and total degree, each as (vertex id, degree)
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce((a, b) => max(a, b))
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce((a, b) => max(a, b))
val maxDegrees: (VertexId, Int) = graph.degrees.reduce((a, b) => max(a, b))

defined [32mfunction[39m [36mmax[39m
[36mmaxInDegree[39m: ([32mVertexId[39m, [32mInt[39m) = ([32m27L[39m, [32m43[39m)
[36mmaxOutDegree[39m: ([32mVertexId[39m, [32mInt[39m) = ([32m31L[39m, [32m54[39m)
[36mmaxDegrees[39m: ([32mVertexId[39m, [32mInt[39m) = ([32m94L[39m, [32m88[39m)

# Algorithms

In [10]:
// Build a graph from the follower edge-list file
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank and keep only the resulting per-vertex values
val ranks = graph.pageRank(0.0001).vertices
// Parse the users file: the first comma-separated field is the id, the second the username
val users = sc.textFile("data/graphx/users.txt").map { row =>
  val cols = row.split(",")
  (cols(0).toLong, cols(1))
}
// Join on the id, then drop it to leave (username, rank)
val ranksByUsername = users.join(ranks).map { case (_, userAndRank) => userAndRank }
// Print the result, one pair per line
println(ranksByUsername.collect().mkString("\n"))

(justinbieber,0.15)
(matei_zaharia,0.7013599933629602)
(ladygaga,1.390049198216498)
(BarackObama,1.4588814096664682)
(jeresig,0.9993442038507723)
(odersky,1.2973176314422592)


[36mgraph[39m: [32mGraph[39m[[32mInt[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@18e81e30
[36mranks[39m: [32mVertexRDD[39m[[32mDouble[39m] = VertexRDDImpl[1040] at RDD at VertexRDD.scala:57
[36musers[39m: [32mRDD[39m[([32mLong[39m, [32mString[39m)] = MapPartitionsRDD[1045] at map at cmd9.sc:5
[36mranksByUsername[39m: [32mRDD[39m[([32mString[39m, [32mDouble[39m)] = MapPartitionsRDD[1049] at map at cmd9.sc:9

In [11]:
// Build the same follower graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run connected components and keep the per-vertex component ids
val cc = graph.connectedComponents().vertices
// Parse the users file: the first comma-separated field is the id, the second the username
val users = sc.textFile("data/graphx/users.txt").map { row =>
  val cols = row.split(",")
  (cols(0).toLong, cols(1))
}
// Join on the id, then drop it to leave (username, component id)
val ccByUsername = users.join(cc).map { case (_, userAndComponent) => userAndComponent }
// Print the result, one pair per line
println(ccByUsername.collect().mkString("\n"))

(justinbieber,1)
(matei_zaharia,3)
(ladygaga,1)
(BarackObama,1)
(jeresig,3)
(odersky,3)


[36mgraph[39m: [32mGraph[39m[[32mInt[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@70c81086
[36mcc[39m: [32mVertexRDD[39m[[32mVertexId[39m] = VertexRDDImpl[1083] at RDD at VertexRDD.scala:57
[36musers[39m: [32mRDD[39m[([32mLong[39m, [32mString[39m)] = MapPartitionsRDD[1101] at map at cmd10.sc:5
[36mccByUsername[39m: [32mRDD[39m[([32mString[39m, [32mVertexId[39m)] = MapPartitionsRDD[1105] at map at cmd10.sc:9

In [12]:
// Load the edges in canonical orientation and partition the graph for triangle counting
val graph = GraphLoader
  .edgeListFile(sc, "data/graphx/followers.txt", canonicalOrientation = true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Parse the users file: the first comma-separated field is the id, the second the username
val users = sc.textFile("data/graphx/users.txt").map { row =>
  val cols = row.split(",")
  (cols(0).toLong, cols(1))
}
// Join on the id, then drop it to leave (username, triangle count)
val triCountByUsername = users.join(triCounts).map { case (_, userAndCount) => userAndCount }
// Print the result, one pair per line
println(triCountByUsername.collect().mkString("\n"))

(justinbieber,0)
(matei_zaharia,1)
(ladygaga,0)
(BarackObama,0)
(jeresig,1)
(odersky,1)


[36mgraph[39m: [32mGraph[39m[[32mInt[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@3a634233
[36mtriCounts[39m: [32mVertexRDD[39m[[32mInt[39m] = VertexRDDImpl[1175] at RDD at VertexRDD.scala:57
[36musers[39m: [32mRDD[39m[([32mLong[39m, [32mString[39m)] = MapPartitionsRDD[1180] at map at cmd11.sc:6
[36mtriCountByUsername[39m: [32mRDD[39m[([32mString[39m, [32mInt[39m)] = MapPartitionsRDD[1184] at map at cmd11.sc:10

In [13]:
// Load the user data and parse each line into (user id, remaining comma-separated fields)
val users = sc.textFile("data/graphx/users.txt")
  .map(_.split(","))
  .map(fields => (fields.head.toLong, fields.tail))

// Parse the edge data, which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes; a vertex with no entry in `users` gets an empty array
val graph = followerGraph.outerJoinVertices(users) { (_, _, attrsOpt) =>
  attrsOpt.getOrElse(Array.empty[String])
}

// Restrict the graph to users that have exactly two attributes (username and name)
val subgraph = graph.subgraph(vpred = (_, attrs) => attrs.length == 2)

// Compute the PageRank on the restricted graph
val pagerankGraph = subgraph.pageRank(0.001)

// Pair each user's attributes with its PageRank, using 0.0 when the join finds no rank
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  (_, attrs, rankOpt) => (rankOpt.getOrElse(0.0), attrs.toList)
}

// Print the five vertices with the highest PageRank
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

(1,(1.453834747463902,List(BarackObama, Barack Obama)))
(2,(1.3857595353443166,List(ladygaga, Goddess of Love)))
(7,(1.2892158818481694,List(odersky, Martin Odersky)))
(3,(0.9936187772892124,List(jeresig, John Resig)))
(6,(0.697916749785472,List(matei_zaharia, Matei Zaharia)))


[36musers[39m: [32mRDD[39m[([32mLong[39m, [32mArray[39m[[32mString[39m])] = MapPartitionsRDD[1188] at map at cmd12.sc:2
[36mfollowerGraph[39m: [32mGraph[39m[[32mInt[39m, [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@770788bf
[36mgraph[39m: [32mGraph[39m[[32mArray[39m[[32mString[39m], [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@4d3ed67e
[36msubgraph[39m: [32mGraph[39m[[32mArray[39m[[32mString[39m], [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@8fffe7a
[36mpagerankGraph[39m: [32mGraph[39m[[32mDouble[39m, [32mDouble[39m] = org.apache.spark.graphx.impl.GraphImpl@5c8f6174
[36muserInfoWithPageRank[39m: [32mGraph[39m[([32mDouble[39m, [32mList[39m[[32mString[39m]), [32mInt[39m] = org.apache.spark.graphx.impl.GraphImpl@37edbb1