In [41]:
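# Optional (assumption, confirm for your setup): uncomment to ship the GraphFrames jar
# to the cluster when the graphframes package is not already available to Spark.
# sc.addPyFile("../graphframes-0.8.0-spark3.0-s_2.12.jar")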

In [1]:
# GraphFrame comes from graphframes; column helpers such as col and when
# come from pyspark.sql.functions.
# NOTE(review): the wildcard import of pyspark.sql.functions rebinds Python
# builtins such as sum, min and max to their Spark column versions for the
# rest of the notebook.
from graphframes import *
from pyspark.sql.functions import *

In [2]:
# Vertex rows: (id, name, age). GraphFrame requires the "id" column.
vertex_rows = [
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 37),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 38),
    ("g", "Gabby", 60),
]
v = spark.createDataFrame(vertex_rows, ["id", "name", "age"])

# Edge rows: (src, dst, relationship). GraphFrame requires "src" and "dst".
edge_rows = [
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend"),
    ("g", "e", "follow"),
]
e = spark.createDataFrame(edge_rows, ["src", "dst", "relationship"])

# Assemble the graph, then display its two underlying DataFrames.
g = GraphFrame(v, e)

for frame in (g.vertices, g.edges):
    frame.show()

                                                                                

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 37|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 38|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  g|  e|      follow|
+---+---+------------+



In [3]:
# g.vertices and g.edges are plain Spark DataFrames,
# so the whole DataFrame API is available on them.

edges_from_a = g.edges.where("src = 'a'")
edges_from_a.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [4]:
# Number of edges whose source vertex is 'a'.
g.edges.filter("src = 'a'").count()

2

In [5]:
# How many users follow c?  Only the edge DataFrame is needed:
# keep the 'follow' edges that point at c, then count them.
followers_of_c = g.edges.filter("relationship = 'follow' and dst = 'c'")
print(followers_of_c.count())

2


In [8]:
# A GraphFrame has additional attributes

# outDegrees is itself a DataFrame with the columns (id, outDegree).
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  g|        1|
|  f|        1|
|  e|        2|
|  d|        1|
|  c|        1|
|  b|        1|
|  a|        2|
+---+---------+



In [9]:
# Like an RDD, a DataFrame is lazy: nothing is computed until an action
# such as show() is called.
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+



In [9]:
# Print the physical plan Spark uses to compute inDegrees.
g.inDegrees.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#171]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]




In [10]:
# Rebuild inDegrees by hand: count edges per destination vertex,
# then rename the columns to match g.inDegrees.
edge_counts_by_dst = g.edges.groupBy('dst').count()
myInDegrees = (
    edge_counts_by_dst
    .withColumnRenamed('dst', 'id')
    .withColumnRenamed('count', 'inDegree')
)
myInDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+



In [11]:
# The plan printed here matches the one printed for g.inDegrees.
myInDegrees.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#218]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]




In [14]:
# Storage level of inDegrees before cache() has been called on it.
print(g.inDegrees.storageLevel)

Serialized 1x Replicated


In [15]:
# Mark inDegrees for caching; cache() returns the DataFrame itself.
g.inDegrees.cache()

DataFrame[id: string, inDegree: int]

In [16]:
# After cache() the level reads "Disk Memory ...": the DataFrame is now marked
# to be kept in memory, with disk as the fallback.
# NOTE(review): cache() itself is lazy; the data is actually stored the next
# time an action runs on this DataFrame.
print(g.inDegrees.storageLevel)

Disk Memory Deserialized 1x Replicated


In [15]:
# The vertices DataFrame has not been cached, so no memory/disk level is reported.
print(g.vertices.storageLevel) 

Serialized 1x Replicated


In [17]:
# Cache the whole graph, i.e. both its vertices and its edges DataFrames.
g.cache()

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [18]:
# Both DataFrames of the graph now report a memory-and-disk storage level.
print(g.vertices.storageLevel) 
print(g.edges.storageLevel)

Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated


In [19]:
# A triplet view of the graph: one row per edge, carrying the full
# source-vertex record, the edge record and the destination-vertex record.

# Not efficient: building it joins the edges with the vertices twice
# (see the two joins in the plan printed by the next cell).
g.triplets.show()

                                                                                

+----------------+--------------+----------------+
|             src|          edge|             dst|
+----------------+--------------+----------------+
| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
|  [g, Gabby, 60]|[g, e, follow]| [e, Esther, 32]|
|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
|  [f, Fanny, 38]|[f, c, follow]|[c, Charlie, 37]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[c, Charlie, 37]|[c, b, follow]|    [b, Bob, 36]|
|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
+----------------+--------------+----------------+



In [20]:
# Print the physical plan behind the triplets view.
g.triplets.explain()

== Physical Plan ==
*(3) Project [src#225, edge#223, dst#227]
+- *(3) BroadcastHashJoin [edge#223.dst], [dst#227.id], Inner, BuildRight
   :- *(3) BroadcastHashJoin [edge#223.src], [src#225.id], Inner, BuildRight
   :  :- *(3) Project [struct(src, src#6, dst, dst#7, relationship, relationship#8) AS edge#223]
   :  :  +- InMemoryTableScan [dst#7, relationship#8, src#6]
   :  :        +- InMemoryRelation [src#6, dst#7, relationship#8], StorageLevel(disk, memory, deserialized, 1 replicas)
   :  :              +- *(1) Scan ExistingRDD[src#6,dst#7,relationship#8]
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#389]
   :     +- *(1) Project [struct(id, id#0, name, name#1, age, age#2L) AS src#225]
   :        +- InMemoryTableScan [age#2L, id#0, name#1]
   :              +- InMemoryRelation [id#0, name#1, age#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                    +- *(1) Scan ExistingRDD

### Motif Finding

In [23]:
# Pairs of vertices connected by edges in both directions.
# The pattern matches every such pair twice, as (a, b) and as (b, a);
# requiring a.id < b.id keeps a single row per pair (the mirrored c -> b row is dropped).
mutual_pattern = "(a)-[]->(b); (b)-[]->(a)"
motifs = g.find(mutual_pattern).filter('a.id < b.id')
motifs.show()

+------------+----------------+
|           a|               b|
+------------+----------------+
|[b, Bob, 36]|[c, Charlie, 37]|
+------------+----------------+



In [25]:
# Find triangles (directed 3-cycles).

# Each cycle is matched three times, once per starting vertex, so keep only the
# rotation whose `a` has the smallest id.  Also requiring b.id < c.id would be
# wrong: it would discard cycles such as a -> e -> d, where b's id exceeds c's.
triangles = (
    g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
     .filter("a.id < b.id AND a.id < c.id")
)
triangles.show()

+--------------+---------------+--------------+
|             a|              b|             c|
+--------------+---------------+--------------+
|[a, Alice, 34]|[e, Esther, 32]|[d, David, 29]|
+--------------+---------------+--------------+



In [26]:
# Print the physical plan of the triangle query, including the id filters.
triangles.explain()

== Physical Plan ==
*(6) Project [a#1611, b#1613, c#1638]
+- *(6) BroadcastHashJoin [c#1638.id, a#1611.id], [__tmp2506060614762666678#1668.src, __tmp2506060614762666678#1668.dst], Inner, BuildRight
   :- *(6) Project [a#1611, b#1613, c#1638]
   :  +- *(6) BroadcastHashJoin [__tmp-3851898762290097694#1636.dst], [c#1638.id], Inner, BuildRight, (a#1611.id < c#1638.id)
   :     :- *(6) BroadcastHashJoin [b#1613.id], [__tmp-3851898762290097694#1636.src], Inner, BuildRight
   :     :  :- *(6) Project [a#1611, b#1613]
   :     :  :  +- *(6) BroadcastHashJoin [__tmp-4480780508598698291#1609.dst], [b#1613.id], Inner, BuildRight, (a#1611.id < b#1613.id)
   :     :  :     :- *(6) BroadcastHashJoin [__tmp-4480780508598698291#1609.src], [a#1611.id], Inner, BuildRight
   :     :  :     :  :- *(6) Project [struct(src, src#6, dst, dst#7, relationship, relationship#8) AS __tmp-4480780508598698291#1609]
   :     :  :     :  :  +- InMemoryTableScan [dst#7, relationship#8, src#6]
   :     :  :     :  :   

In [28]:
# Negation: a "!" term requires that the edge does NOT exist.
# Here: edges a -> b for which there is no reverse edge b -> a.
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()

                                                                                

+---------------+----------------+
|              a|               b|
+---------------+----------------+
| [a, Alice, 34]| [e, Esther, 32]|
|[e, Esther, 32]|  [d, David, 29]|
| [a, Alice, 34]|    [b, Bob, 36]|
| [g, Gabby, 60]| [e, Esther, 32]|
|[e, Esther, 32]|  [f, Fanny, 38]|
| [f, Fanny, 38]|[c, Charlie, 37]|
| [d, David, 29]|  [a, Alice, 34]|
+---------------+----------------+



In [29]:
# Find vertices without incoming edges:
# the negated term matches vertices `a` that no edge points to.
g.find("!()-[]->(a)").show()

+--------------+
|             a|
+--------------+
|[g, Gabby, 60]|
+--------------+



In [33]:
# More meaningful queries can be expressed by applying filters.
# Question: where is this filter applied?

# Only the first edge is named `e`; the reverse edge is left anonymous.
# (Per the original note, the name `e` cannot be used for both edges.)
g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").show()

+------------+--------------+----------------+
|           a|             e|               b|
+------------+--------------+----------------+
|[b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
+------------+--------------+----------------+



In [38]:
# Print the physical plan of the mutual-edge motif combined with the age filter
# (same query as above, but without naming the edge).
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").explain()

== Physical Plan ==
*(4) Project [a#6606, b#6608]
+- *(4) BroadcastHashJoin [b#6608.id, a#6606.id], [__tmp-753465915533410812#6631.src, __tmp-753465915533410812#6631.dst], Inner, BuildRight
   :- *(4) Project [a#6606, b#6608]
   :  +- *(4) BroadcastHashJoin [__tmp4002724160828027973#6604.dst], [b#6608.id], Inner, BuildRight
   :     :- *(4) BroadcastHashJoin [__tmp4002724160828027973#6604.src], [a#6606.id], Inner, BuildRight
   :     :  :- *(4) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp4002724160828027973#6604]
   :     :  :  +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :  :        +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :              +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :     :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#2800]
   :     :     +- *(1

In [34]:
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships.

# Walk three consecutive edges; discard walks where a = d, a = c or b = d.
chain4 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)").where('a!=d AND a!=c AND b!=d')


def friendTo1(edge):
    """Return a column that is 1 for a 'friend' edge and 0 otherwise (like SQL CASE WHEN)."""
    return when(edge['relationship'] == 'friend', 1).otherwise(0)


# Add one 0/1 flag column per edge, then keep chains with at least two friend edges.
flagged_chains = chain4.select(
    '*',
    friendTo1(chain4['e1']).alias('f1'),
    friendTo1(chain4['e2']).alias('f2'),
    friendTo1(chain4['e3']).alias('f3'),
)
flagged_chains.where('f1 + f2 + f3 >= 2').select('a', 'b', 'c', 'd').show()

+---------------+---------------+---------------+----------------+
|              a|              b|              c|               d|
+---------------+---------------+---------------+----------------+
|[e, Esther, 32]| [d, David, 29]| [a, Alice, 34]|    [b, Bob, 36]|
| [d, David, 29]| [a, Alice, 34]|[e, Esther, 32]|  [f, Fanny, 38]|
| [d, David, 29]| [a, Alice, 34]|   [b, Bob, 36]|[c, Charlie, 37]|
| [g, Gabby, 60]|[e, Esther, 32]| [d, David, 29]|  [a, Alice, 34]|
+---------------+---------------+---------------+----------------+



### Subgraphs

In [48]:
# Subgraph of users older than 30 connected by "friend" relationships.
# Vertices that end up in no remaining edge are dropped.

older_than_30 = g.filterVertices("age > 30")
friends_only = older_than_30.filterEdges("relationship = 'friend'")
g1 = friends_only.dropIsolatedVertices()

g1.vertices.show()
g1.edges.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  a| Alice| 34|
|  b|   Bob| 36|
|  e|Esther| 32|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [40]:
# NOTE(review): in this notebook `paths` is first assigned in the following cell,
# so on a fresh kernel that cell has to run before this one.
paths.show()

+---------------+--------------+----------------+
|              a|             e|               b|
+---------------+--------------+----------------+
|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
+---------------+--------------+----------------+



In [37]:
# Subgraph made of "follow" edges `e` that point from a younger user `a`
# to an older user `b`.

paths = g.find("(a)-[e]->(b)").filter("e.relationship = 'follow' AND a.age < b.age")

# `paths` also carries the vertex records; keep only the edge columns.
e2 = paths.select("e.*")

# Combine the original vertex information with the selected edges,
# then drop vertices that no selected edge touches.
g2 = GraphFrame(g.vertices, e2).dropIsolatedVertices()

g2.vertices.show()
g2.edges.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  b|    Bob| 36|
|  c|Charlie| 37|
|  e| Esther| 32|
|  f|  Fanny| 38|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  e|  f|      follow|
+---+---+------------+



### BFS

In [41]:
# BFS from vertex 'a': layers[k] holds the ids first reached after k hops.
layers = [g.vertices.select('id').where("id = 'a'")]
visited = layers[0]

# Stop as soon as the most recent layer is empty.
while layers[-1].count() > 0:
    frontier = layers[-1]

    # Expand the frontier by one hop along outgoing edges.
    d1 = frontier.join(g.edges, frontier['id'] == g.edges['src'])
    d1.show()  # trace the edges explored in this round

    # The destinations become candidate ids; drop those already visited
    # and remove duplicates.
    reached = d1.select(d1['dst'].alias('id'))
    d2 = reached.subtract(visited).distinct().cache()

    layers.append(d2)
    visited = visited.union(d2).cache()

+---+---+---+------------+
| id|src|dst|relationship|
+---+---+---+------------+
|  a|  a|  b|      friend|
|  a|  a|  e|      friend|
+---+---+---+------------+



                                                                                

+---+---+---+------------+
| id|src|dst|relationship|
+---+---+---+------------+
|  b|  b|  c|      follow|
|  e|  e|  f|      follow|
|  e|  e|  d|      friend|
+---+---+---+------------+

+---+---+---+------------+
| id|src|dst|relationship|
+---+---+---+------------+
|  c|  c|  b|      follow|
|  f|  f|  c|      follow|
|  d|  d|  a|      friend|
+---+---+---+------------+



                                                                                

In [44]:
# Layer 0: the start vertex.
layers[0].show()

+---+
| id|
+---+
|  a|
+---+



In [45]:
# Layer 1: vertices first reached after one hop.
layers[1].show()

+---+
| id|
+---+
|  e|
|  b|
+---+



In [46]:
# Layer 2: vertices first reached after two hops.
layers[2].show()

+---+
| id|
+---+
|  f|
|  d|
|  c|
+---+



In [47]:
# Layer 3 is empty, which is what ended the BFS loop.
layers[3].show()

+---+
| id|
+---+
+---+



In [48]:
# GraphFrames ships its own BFS: paths from the vertices matching
# fromExpr to the vertices matching toExpr.

paths = g.bfs(fromExpr="id = 'a'", toExpr="age > 36")
paths.show()

+--------------+--------------+---------------+--------------+----------------+
|          from|            e0|             v1|            e1|              to|
+--------------+--------------+---------------+--------------+----------------+
|[a, Alice, 34]|[a, b, friend]|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
+--------------+--------------+---------------+--------------+----------------+



### List Ranking

In [64]:
# A linked list given as (node, pointer) pairs; a pointer of -1 denotes the end of the list.
# NOTE(review): this cell rebinds `e` and `v`, which earlier cells used for the
# social graph's edge and vertex DataFrames.
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6,1)]
e = spark.createDataFrame(data, ['src', 'dst'])
# Initial value d: 0 for the node whose pointer is -1, 1 for every other node.
v = e.select(col('src').alias('id'), when(e.dst == -1, 0).otherwise(1).alias('d'))
# Dummy vertex for the end marker -1 itself, with d = 0.
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)
v.show()
e.show()

+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  1|
|  3|  1|
|  4|  1|
|  5|  0|
|  6|  1|
| -1|  0|
+---+---+

+---+---+
|src|dst|
+---+---+
|  0|  5|
|  1|  0|
|  3|  4|
|  4|  6|
|  5| -1|
|  6|  1|
+---+---+



In [67]:
# List ranking by pointer jumping.  In every round each node adds the d value
# of the node it points to, then redirects its pointer to that node's pointer.
# The loop ends once every pointer is -1.
#
# The per-round graph is bound to a local name (`list_g`) instead of `g`, so
# the notebook-wide social graph `g` is not overwritten: later cells still
# read the `age` attribute from `g`.
while e.filter('dst != -1').count() > 0:
    list_g = GraphFrame(v, e)
    list_g.cache()

    # New d for every node with an outgoing pointer: own d + d of its target.
    # The dummy end vertex (-1) has no outgoing edge, so it is appended again.
    v = list_g.triplets.select(col('src.id').alias('id'),
                               (col('src.d') + col('dst.d')).alias('d')) \
              .union(v1)

    # Pointer jumping: a -> b -> c becomes a -> c.  Edges that already point
    # at -1 have no two-hop continuation, so they are added back unchanged.
    e = list_g.find('(a)-[]->(b); (b)-[]->(c)') \
              .select(col('a.id').alias('src'), col('c.id').alias('dst')) \
              .union(e.filter('dst = -1'))
v.show()

+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  2|
|  3|  5|
|  4|  4|
|  5|  0|
|  6|  3|
| -1|  0|
+---+---+



### Message passing via AggregateMessages

In [74]:
from pyspark.sql.functions import coalesce, col, lit, sum, when, min, max
from graphframes.lib import AggregateMessages as AM

# AggregateMessages exposes the columns src, dst, edge and msg.
# Every edge sends the destination's age to its source and the source's age
# to its destination; each vertex then adds up the ages it received.
age_total = sum(AM.msg).alias("summedAges")
agg = g.aggregateMessages(age_total, sendToSrc=AM.dst['age'], sendToDst=AM.src['age'])
agg.show()

                                                                                

+---+----------+
| id|summedAges|
+---+----------+
|  g|        32|
|  f|        69|
|  e|       161|
|  d|        66|
|  c|       110|
|  b|       108|
|  a|        97|
+---+----------+



### The Pregel Model for Graph Computation

In [75]:
# Pagerank in the Pregel model

from pyspark.sql.functions import coalesce, col, lit, sum, when, min
from graphframes.lib import Pregel

# Need to set up a directory for Pregel computation
sc.setCheckpointDir("checkpoint")

# Use builder pattern to describe the operations.
# Call run() to start a run. It returns a DataFrame of vertices from the last iteration.
#
# When a run starts, it expands the vertices DataFrame using column expressions
# defined by withVertexColumn(). Those additional vertex properties can be
# changed during Pregel iterations. In each Pregel iteration, there are three
# phases:
#
# * Given each edge triplet, generate messages and specify target vertices to
#   send, described by sendMsgToDst() and sendMsgToSrc().
# * Aggregate messages by target vertex IDs, described by aggMsgs().
# * Update additional vertex properties based on aggregated messages and states
#   from previous iteration, described by withVertexColumn().

# Vertices carry their out-degree so that each one can split its rank evenly.
# NOTE(review): presumably outDegrees omits vertices without outgoing edges;
# every vertex of this sample graph has at least one.
v = g.outDegrees

# Take the edges from the graph itself instead of the global `e`, which the
# List Ranking section rebinds to a different edge list.
g = GraphFrame(v, g.edges)

# Every vertex starts with rank 1.0.  In each of the 5 iterations a vertex sends
# rank / outDegree along each outgoing edge; the receiver sums its messages and
# sets rank = 0.85 * sum + 0.15 (a vertex with no message uses sum = 0).
ranks = (
    g.pregel
     .setMaxIter(5)
     .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
     .aggMsgs(sum(Pregel.msg()))
     .withVertexColumn("rank", lit(1.0),
                       coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15))
     .run()
)
ranks.show()

# pyspark.sql.functions.coalesce(*cols): Returns the first column that is not null.
# Not to be confused with DataFrame.coalesce(numPartitions), which reduces the number of partitions.


                                                                                

+---+---------+-------------------+
| id|outDegree|               rank|
+---+---------+-------------------+
|  g|        1|               0.15|
|  f|        1|0.41104330078124995|
|  e|        2| 0.5032932031249999|
|  d|        1|0.41104330078124995|
|  c|        1|  2.780783203124999|
|  b|        1| 2.2680220312499997|
|  a|        2|    0.4758149609375|
+---+---------+-------------------+



In [76]:
# BFS in the Pregel model

# Rebuild the graph from its own edges instead of the global `e`, which the
# List Ranking section rebinds to a different edge list.  `v` is the vertex
# DataFrame set by the PageRank cell.
g = GraphFrame(v, g.edges)

# Vertex state:
#   d      - best known distance from 'a' (0 for 'a', 99999 for all others at the start)
#   active - True when d was improved in the previous round (only 'a' at the start)
# Each round, active vertices send d + 1 to their out-neighbours; a receiver
# takes the smallest message and, if it beats its current d, adopts it and
# becomes active.  Both update expressions compare against the previous d.
dist = (
    g.pregel
     .sendMsgToDst(when(Pregel.src('active'), Pregel.src('d') + 1))
     .aggMsgs(min(Pregel.msg()))
     .withVertexColumn('d', when(v['id'] == 'a', 0).otherwise(99999),
                       when(Pregel.msg() < col('d'), Pregel.msg()).otherwise(col('d')))
     .withVertexColumn('active', when(v['id'] == 'a', True).otherwise(False),
                       when(Pregel.msg() < col('d'), True).otherwise(False))
     .run()
)
dist.show()


                                                                                

+---+---------+-----+------+
| id|outDegree|    d|active|
+---+---------+-----+------+
|  g|        1|99999| false|
|  f|        1|    2| false|
|  e|        2|    1| false|
|  d|        1|    2| false|
|  c|        1|    2| false|
|  b|        1|    1| false|
|  a|        2|    0| false|
+---+---------+-----+------+

