# GraphFrames Exercise

In [15]:
import org.graphframes.{examples, GraphFrame}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{functions => F}

import org.apache.spark.sql.{functions=>F}


In [38]:
// Set the Spark checkpoint directory (relative path "tmp").
// NOTE: per the GraphFrames docs, the default connectedComponents algorithm
// checkpoints intermediate results, so this must be set before running it.
sc.setCheckpointDir("tmp")

lastException: Throwable = null


## 1. Creating GraphFrames

In [5]:
// Vertex DataFrame: one row per user, given as (id, name, age).
// GraphFrame expects the vertex key column to be called "id".
val vertexRows = Seq(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)

val v = spark
  .createDataFrame(vertexRows)
  .toDF("id", "name", "age")

v = [id: string, name: string ... 1 more field]


[id: string, name: string ... 1 more field]

In [6]:
// Edge DataFrame: one row per directed relationship, given as (src, dst, relationship).
// "src" and "dst" hold vertex ids from the vertex DataFrame.
val edgeRows = Seq(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)

val e = spark
  .createDataFrame(edgeRows)
  .toDF("src", "dst", "relationship")

e = [src: string, dst: string ... 1 more field]


[src: string, dst: string ... 1 more field]

In [7]:
// Create a GraphFrame from the vertex DataFrame `v` (keyed by "id")
// and the edge DataFrame `e` (with "src" and "dst" columns).

val g = GraphFrame(v, e)

g = GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

## 2. Basic Graph and DataFrame queries

In [22]:
// Load the ready-made "friends" example graph from org.graphframes.examples.
// This rebinds the notebook-level name `g` used by all following cells.

val g: GraphFrame = examples.Graphs.friends

g = GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [11]:
// Display the vertex DataFrame (columns: id, name, age).

g.vertices.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
|  g|  Gabby| 60|
+---+-------+---+



In [12]:
// Display the edge DataFrame (columns: src, dst, relationship).

g.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+



In [14]:
// Get in-degree information: a DataFrame of (id, inDegree),
// i.e. the number of incoming edges per vertex.

val vertexInDegrees: DataFrame = g.inDegrees

vertexInDegrees = [id: string, inDegree: int]


[id: string, inDegree: int]

In [16]:
// Find the youngest user's age: aggregate the whole vertex DataFrame with min("age").

g.vertices.agg(F.min("age")).show()

+--------+
|min(age)|
+--------+
|      29|
+--------+



In [19]:
// Count the number of "follow" edges in the graph.
// `===` is Spark's Column equality operator (not Scala's `==`).

g.edges.filter(F.col("relationship") === "follow").count()

4

## 3. Motif finding

In [24]:
// Search for pairs of vertices with edges in both directions between them.
// Motif syntax: "(a)-[e]->(b)" is an edge `e` from vertex `a` to vertex `b`;
// ";" joins patterns that must all hold. Each named element becomes a struct
// column (a, e, b, e2) in the result.

val motifs: DataFrame = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|
+----------------+--------------+----------------+--------------+



motifs = [a: struct<id: string, name: string ... 1 more field>, e: struct<src: string, dst: string ... 1 more field> ... 2 more fields]


[a: struct<id: string, name: string ... 1 more field>, e: struct<src: string, dst: string ... 1 more field> ... 2 more fields]

In [25]:
// Print the nested schema of the motif result: one struct column per named vertex/edge.
motifs.printSchema()

root
 |-- a: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)
 |-- e: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- relationship: string (nullable = true)
 |-- b: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)
 |-- e2: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- relationship: string (nullable = true)



In [28]:
// More complex query: narrow the motif matches with a SQL expression on a
// nested field, keeping only rows where vertex `b` is older than 30.
motifs.filter("b.age > 30").show()

+----------------+--------------+------------+--------------+
|               a|             e|           b|            e2|
+----------------+--------------+------------+--------------+
|[c, Charlie, 30]|[c, b, follow]|[b, Bob, 36]|[b, c, follow]|
+----------------+--------------+------------+--------------+



## 4. Subgraphs

* Simple subgraph: `filterVertices`, `filterEdges`, `dropIsolatedVertices`
* Complex subgraph: triplet filters + motifs

In [31]:
// Simple subgraph built from independent vertex and edge predicates:
//   1. keep users older than 30,
//   2. keep relationships of type "friend",
//   3. drop users left without any relationship.

val g1 = (
  g.filterVertices("age > 30")
    .filterEdges("relationship = 'friend'")
    .dropIsolatedVertices()
)

g1 = GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


lastException: Throwable = null


GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [33]:
// Complex subgraph: triplet filters

// Match every (source, edge, destination) triplet, then keep only "follow"
// edges "e" that point from a younger user "a" to an older user "b".
val paths = (
  g.find("(a)-[e]->(b)")
    .filter("e.relationship = 'follow'")
    .filter("a.age < b.age")
)

// Flatten the edge struct back into plain src / dst / relationship columns.
val e2 = paths.select("e.src", "e.dst", "e.relationship")

paths = [a: struct<id: string, name: string ... 1 more field>, e: struct<src: string, dst: string ... 1 more field> ... 1 more field]
e2 = [src: string, dst: string ... 1 more field]


[src: string, dst: string ... 1 more field]

In [34]:
// Construct the subgraph: all original vertices, but only the filtered edges `e2`.
val g2 = GraphFrame(g.vertices, e2)

g2 = GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

## 5. Graph Algorithms

* Breadth-first search: `bfs`.
* Connected components: `connectedComponents`; Strongly connected components: `stronglyConnectedComponents`.
* Label propagation algorithm (LPA): `labelPropagation`.
* Page Rank: two implementations: graphX + aggregateMessages (setting .maxIter) and graphX + Pregel (setting .tol).
* Shortest paths: `shortestPaths` (you have to set the landmark vertices via `.landmarks`).
* Triangle count: `triangleCount`.

In [40]:
// Breadth-first search: bfs
// Search from "Esther" for users of age < 32.
// fromExpr / toExpr take SQL expressions over the vertex columns; the result
// has one struct column per hop (here: from, e0, to).

val paths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|
+---------------+--------------+--------------+



paths = [from: struct<id: string, name: string ... 1 more field>, e0: struct<src: string, dst: string ... 1 more field> ... 1 more field]


[from: struct<id: string, name: string ... 1 more field>, e0: struct<src: string, dst: string ... 1 more field> ... 1 more field]

In [39]:
// Connected components: connectedComponents
// The result adds a "component" column to the vertices; show ids grouped by component.

val result = g.connectedComponents.run()
result.select("id", "component").orderBy("component").show()

+---+------------+
| id|   component|
+---+------------+
|  g|146028888064|
|  b|412316860416|
|  e|412316860416|
|  a|412316860416|
|  d|412316860416|
|  f|412316860416|
|  c|412316860416|
+---+------------+



result = [id: string, name: string ... 2 more fields]


[id: string, name: string ... 2 more fields]

In [41]:
// Strongly connected components: stronglyConnectedComponents
// Runs for at most 10 iterations; the result adds a "component" column to the vertices.

val result = g.stronglyConnectedComponents.maxIter(10).run()
result.select("id", "component").orderBy("component").show()

+---+-------------+
| id|    component|
+---+-------------+
|  g| 146028888064|
|  f| 412316860416|
|  a| 670014898176|
|  d| 670014898176|
|  e| 670014898176|
|  b|1047972020224|
|  c|1047972020224|
+---+-------------+



result = [id: string, name: string ... 2 more fields]


[id: string, name: string ... 2 more fields]

In [42]:
// Label propagation algorithm (LPA): labelPropagation
// Runs 5 iterations; the result adds a "label" column to the vertices.

val result = g.labelPropagation.maxIter(5).run()
result.select("id", "label").show()

+---+-------------+
| id|        label|
+---+-------------+
|  b|1047972020224|
|  e|1460288880640|
|  a| 670014898176|
|  f| 670014898176|
|  g| 146028888064|
|  d|1460288880640|
|  c|1382979469312|
+---+-------------+



result = [id: string, name: string ... 2 more fields]


[id: string, name: string ... 2 more fields]

In [43]:
// Run PageRank until convergence to tolerance "tol" (0.01), with reset probability 0.15.
// The result is a new GraphFrame with extra vertex and edge columns.
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()

results = GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])


GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])

In [44]:
// Run PageRank for a fixed number of iterations (10), with reset probability 0.15.
// The result is a new GraphFrame with extra vertex and edge columns.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()

results2 = GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])


GraphFrame(v:[id: string, name: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])

In [45]:
// Shortest Paths: shortestPaths
// Compute shortest paths from each vertex to the landmark vertices "a" and "d".
// The result is a DataFrame of the vertices with one extra column.

val results = g.shortestPaths.landmarks(Seq("a", "d")).run()

results = [id: string, name: string ... 2 more fields]


[id: string, name: string ... 2 more fields]

In [46]:
// Triangle Count: triangleCount
// The result adds a "count" column: the number of triangles passing through each vertex.

val results = g.triangleCount.run()
results.select("id", "count").show()

+---+-----+
| id|count|
+---+-----+
|  g|    0|
|  f|    0|
|  e|    1|
|  d|    1|
|  c|    0|
|  b|    0|
|  a|    1|
+---+-----+



results = [count: bigint, id: string ... 2 more fields]


[count: bigint, id: string ... 2 more fields]

## 6. Saving and Loading GraphFrames

In [None]:
// Save vertices and edges as Parquet to some location.
// NOTE(review): "hdfs://myLocation/..." looks like a placeholder; replace it
// with a real, writable location before running this cell.
g.vertices.write.parquet("hdfs://myLocation/vertices")
g.edges.write.parquet("hdfs://myLocation/edges")

// Load the vertices and edges back.
// Read through the `spark` session, the same entry point this notebook uses to
// build its DataFrames, instead of the legacy `sqlContext` handle.
val sameV = spark.read.parquet("hdfs://myLocation/vertices")
val sameE = spark.read.parquet("hdfs://myLocation/edges")

// Create an identical GraphFrame.
val sameG = GraphFrame(sameV, sameE)

## 7. Message passing via AggregateMessages

In [48]:
import org.graphframes.lib.AggregateMessages


// Short alias for the AggregateMessages helper object.
val AM = AggregateMessages

// For each user, sum the ages of the adjacent users.
// Each edge sends two messages: the destination's age goes to the source,
// and the source's age goes to the destination.
val msgToSrc = AM.dst("age")
val msgToDst = AM.src("age")

val agg = (
  g.aggregateMessages
    .sendToSrc(msgToSrc)
    .sendToDst(msgToDst)
    // Incoming messages arrive in the AM.msg column; add them up per vertex.
    .agg(F.sum(AM.msg).as("summedAges"))
)

agg.show()

+---+----------+
| id|summedAges|
+---+----------+
|  f|        62|
|  e|        99|
|  d|        66|
|  c|       108|
|  b|        94|
|  a|        97|
+---+----------+



AM = org.graphframes.lib.AggregateMessages$@13f6408b
msgToSrc = dst[age]
msgToDst = src[age]
agg = [id: string, summedAges: bigint]


[id: string, summedAges: bigint]

## 8. GraphX - GraphFrames conversions

In [49]:
import org.apache.spark.graphx.Graph
import org.apache.spark.sql.Row

In [None]:
// Convert to GraphX.
// Vertex and edge attributes are both carried as Spark SQL Rows.
val gx: Graph[Row, Row] = g.toGraphX

// Convert back to GraphFrame.
// Note that the schema is changed because of constraints in the GraphX API.
// This also rebinds `g2`, which an earlier cell used for the "follow" subgraph.
val g2: GraphFrame = GraphFrame.fromGraphX(gx)