In [1]:
from graphframes import *
from pyspark import *
from pyspark.sql import *

In [2]:
# GraphFrames setup notes
#   * Troubleshooting a basic example:
#     https://stackoverflow.com/questions/39261370/unable-to-run-a-basic-graphframes-example
#   * Jar downloads (pick the build that matches your Spark version):
#     https://spark-packages.org/package/graphframes/graphframes
#   * The graphframes jar was copied from C:/Users/<user>/.ivy2/jars to C:/D/Spark-2.4.4/jars,
#     and pyspark was then started from C:\D\Spark-2.4.4\jars with:
#       pyspark --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 --jars graphframes_graphframes-0.7.0-spark2.4-s_2.11.jar

# Local Spark session for the notebook (reuses an existing session if one is running).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Graph_Test")
    .getOrCreate()
)

# People in the social graph; `id` is the key the edge endpoints refer to.
vertices = spark.createDataFrame(
    data=[
        ('1', 'Carter', 'Derrick', 50),
        ('2', 'May', 'Derrick', 26),
        ('3', 'Mills', 'Jeff', 80),
        ('4', 'Hood', 'Robert', 65),
        ('5', 'Banks', 'Mike', 93),
        ('98', 'Berg', 'Tim', 28),
        ('99', 'Page', 'Allan', 16),
    ],
    schema=['id', 'name', 'firstname', 'age'],
)

# Directed relationships between people: `src` -> `dst`, labelled with a `type`.
edges = spark.createDataFrame(
    data=[
        ('1', '2', 'friend'),
        ('2', '1', 'friend'),
        ('3', '1', 'friend'),
        ('1', '3', 'friend'),
        ('2', '3', 'follows'),
        ('3', '4', 'friend'),
        ('4', '3', 'friend'),
        ('5', '3', 'friend'),
        ('3', '5', 'friend'),
        ('4', '5', 'follows'),
        ('98', '99', 'friend'),
        ('99', '98', 'friend'),
    ],
    schema=['src', 'dst', 'type'],
)

In [3]:
# Build the social graph from the vertex and edge DataFrames defined above.
g = GraphFrame(v=vertices, e=edges)

In [4]:
# Display the vertices, then the edges, then the per-vertex degree
# (degree = in_degree + out_degree).
for frame in (g.vertices, g.edges, g.degrees):
    frame.show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  2|   May|  Derrick| 26|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
| 98|  Berg|      Tim| 28|
| 99|  Page|    Allan| 16|
+---+------+---------+---+

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  2|  1| friend|
|  3|  1| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  4|  3| friend|
|  5|  3| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
| 99| 98| friend|
+---+---+-------+

+---+------+
| id|degree|
+---+------+
|  3|     7|
| 98|     2|
| 99|     2|
|  5|     3|
|  1|     4|
|  4|     3|
|  2|     3|
+---+------+



In [5]:
# Keep only the people older than 30.
g.vertices.where("age > 30").show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
+---+------+---------+---+



In [6]:
# Vertices with at least two incoming edges, highest in-degree first.
(
    g.inDegrees
    .where("inDegree >= 2")
    .orderBy("inDegree", ascending=False)
    .show()
)

+---+--------+
| id|inDegree|
+---+--------+
|  3|       4|
|  1|       2|
|  5|       2|
+---+--------+



In [7]:
# Vertices with at least three outgoing edges, highest out-degree first.
(
    g.outDegrees
    .where("outDegree >= 3")
    .orderBy("outDegree", ascending=False)
    .show()
)

+---+---------+
| id|outDegree|
+---+---------+
|  3|        3|
+---+---------+



In [8]:
# Motif finding runs queries that discover structural patterns in a graph.
# Goal: for any pair of distinct users a and c, find every mutual friend b.
# To be a mutual friend, b must be a *friend* of both a and c in both directions
# (and not just followed by c, for example).
# https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5
# Motif syntax: () --> vertex, [] --> edge. Naming an edge (e.g. [ab]) exposes its
# columns, which is what lets the relationship type be checked below.

mutualFriends = (
    g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cb]->(b); (b)-[ba]->(a)")
    # Reciprocal edges alone are not enough: every one of them must be a friendship,
    # otherwise two users who merely follow each other would count as friends.
    .filter(
        "ab.type = 'friend' AND ba.type = 'friend' "
        "AND bc.type = 'friend' AND cb.type = 'friend'"
    )
    # A motif may bind the same vertex to several names; drop the degenerate a == c
    # matches so that a user is never reported as a "pair" with themselves.
    .filter("a.id != c.id")
    # Keep the original result shape: one column per vertex, no edge columns.
    .select("a", "b", "c")
    .dropDuplicates()
)

In [9]:
# Mutual friends shared by user 2 (as a) and user 3 (as c).
mutualFriends.where("a.id == 2 and c.id == 3").show()

+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|
+--------------------+--------------------+--------------------+
|[2, May, Derrick,...|[1, Carter, Derri...|[3, Mills, Jeff, 80]|
+--------------------+--------------------+--------------------+



In [10]:
# Triangle count: adds a `count` column with the number of triangles through each vertex.
triangle_counts = g.triangleCount()
triangle_counts.show()

+-----+---+------+---------+---+
|count| id|  name|firstname|age|
+-----+---+------+---------+---+
|    2|  3| Mills|     Jeff| 80|
|    0| 98|  Berg|      Tim| 28|
|    0| 99|  Page|    Allan| 16|
|    1|  5| Banks|     Mike| 93|
|    1|  1|Carter|  Derrick| 50|
|    1|  4|  Hood|   Robert| 65|
|    1|  2|   May|  Derrick| 26|
+-----+---+------+---------+---+



In [11]:
# PageRank with a 0.15 reset probability, iterated until the 0.01 tolerance is reached.
pr = g.pageRank(tol=0.01, resetProbability=0.15)

# PageRank score of every vertex (extra `pagerank` column).
pr.vertices.show()

# Weight of every edge (extra `weight` column).
pr.edges.show()

+---+------+---------+---+------------------+
| id|  name|firstname|age|          pagerank|
+---+------+---------+---+------------------+
|  1|Carter|  Derrick| 50|0.9055074972891308|
|  3| Mills|     Jeff| 80| 1.853919642738813|
|  2|   May|  Derrick| 26|0.5377967999474921|
|  4|  Hood|   Robert| 65|0.6873519241384106|
| 98|  Berg|      Tim| 28|1.0225331112091938|
|  5| Banks|     Mike| 93|0.9703579134677663|
| 99|  Page|    Allan| 16|1.0225331112091938|
+---+------+---------+---+------------------+

+---+---+-------+------------------+
|src|dst|   type|            weight|
+---+---+-------+------------------+
|  1|  2| friend|               0.5|
| 99| 98| friend|               1.0|
|  1|  3| friend|               0.5|
|  4|  5|follows|               0.5|
|  5|  3| friend|               1.0|
| 98| 99| friend|               1.0|
|  3|  5| friend|0.3333333333333333|
|  4|  3| friend|               0.5|
|  2|  1| friend|               0.5|
|  3|  4| friend|0.3333333333333333|
|  3|  1| fr

In [12]:
# Hand-built DataFrames used as a small test graph.
v = spark.createDataFrame(
    data=[
        ("A", "ARON", 350),
        ("B", "BILL", 360),
        ("C", "CLAIR", 195),
        ("D", "DANIEL", 90),
        ("E", "ERIC", 90),
        ("F", "FRANK", 215),
        ("G", "GRAHAM", 30),
        ("H", "HENRY", 25),
        ("I", "INNA", 25),
        ("J", "JEN", 20),
    ],
    schema=["id", "name", "total_seconds"],
)

e = spark.createDataFrame(
    data=[
        ("A", "B", 60),
        ("B", "A", 50),
        ("A", "C", 50),
        ("C", "A", 100),
        ("A", "D", 90),
        ("C", "I", 25),
        ("C", "J", 20),
        ("B", "F", 50),
        ("F", "B", 110),
        ("F", "G", 30),
        ("F", "H", 25),
        ("B", "E", 90),
    ],
    schema=["src", "dst", "relationship"],
)

# Assemble the second graph from the two frames above.
g1 = GraphFrame(v=v, e=e)

In [13]:
# Pull the vertex and edge frames back out of the graph and display them.
verticesDF, edgesDF = g1.vertices, g1.edges

verticesDF.show()
edgesDF.show()

+---+------+-------------+
| id|  name|total_seconds|
+---+------+-------------+
|  A|  ARON|          350|
|  B|  BILL|          360|
|  C| CLAIR|          195|
|  D|DANIEL|           90|
|  E|  ERIC|           90|
|  F| FRANK|          215|
|  G|GRAHAM|           30|
|  H| HENRY|           25|
|  I|  INNA|           25|
|  J|   JEN|           20|
+---+------+-------------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  A|  B|          60|
|  B|  A|          50|
|  A|  C|          50|
|  C|  A|         100|
|  A|  D|          90|
|  C|  I|          25|
|  C|  J|          20|
|  B|  F|          50|
|  F|  B|         110|
|  F|  G|          30|
|  F|  H|          25|
|  B|  E|          90|
+---+---+------------+



In [14]:
# In-degree, out-degree and total degree of every vertex in g1.
inDegreeDF, outDegreeDF, degreeDF = g1.inDegrees, g1.outDegrees, g1.degrees

# The directed degree tables are listed highest first; total degree is shown unsorted.
inDegreeDF.orderBy("inDegree", ascending=False).show()
outDegreeDF.orderBy("outDegree", ascending=False).show()
degreeDF.show()

+---+--------+
| id|inDegree|
+---+--------+
|  B|       2|
|  A|       2|
|  E|       1|
|  F|       1|
|  C|       1|
|  D|       1|
|  J|       1|
|  G|       1|
|  H|       1|
|  I|       1|
+---+--------+

+---+---------+
| id|outDegree|
+---+---------+
|  B|        3|
|  F|        3|
|  C|        3|
|  A|        3|
+---+---------+

+---+------+
| id|degree|
+---+------+
|  F|     4|
|  E|     1|
|  B|     5|
|  D|     1|
|  C|     4|
|  J|     1|
|  A|     5|
|  G|     1|
|  I|     1|
|  H|     1|
+---+------+



In [16]:
# Setting `tol` selects the PageRank variant that uses the
# org.apache.spark.graphx.Pregel interface and runs until convergence.
PageRankResults = g1.pageRank(tol=0.01, resetProbability=0.15)

# Highest-ranked vertices first, followed by the edge weights.
PageRankResults.vertices.orderBy("pagerank", ascending=False).show()
PageRankResults.edges.show()

+---+------+-------------+------------------+
| id|  name|total_seconds|          pagerank|
+---+------+-------------+------------------+
|  B|  BILL|          360|1.2457034959159363|
|  A|  ARON|          350|1.2457034959159363|
|  F| FRANK|          215|0.9751701987230708|
|  C| CLAIR|          195|0.9751701987230708|
|  D|DANIEL|           90|0.9751701987230708|
|  E|  ERIC|           90|0.9751701987230708|
|  H| HENRY|           25|0.9019780533189615|
|  I|  INNA|           25|0.9019780533189615|
|  G|GRAHAM|           30|0.9019780533189615|
|  J|   JEN|           20|0.9019780533189615|
+---+------+-------------+------------------+

+---+---+------------+------------------+
|src|dst|relationship|            weight|
+---+---+------------+------------------+
|  C|  J|          20|0.3333333333333333|
|  A|  B|          60|0.3333333333333333|
|  B|  F|          50|0.3333333333333333|
|  C|  I|          25|0.3333333333333333|
|  F|  H|          25|0.3333333333333333|
|  C|  A|         1

In [17]:
# Label propagation returns a DataFrame of the nodes with an added "label" column
# that contains the community label of each node.
result = g1.labelPropagation(maxIter=5)

# List the nodes by community label, largest label first.
result.orderBy("label", ascending=False).show()

+---+------+-------------+------------+
| id|  name|total_seconds|       label|
+---+------+-------------+------------+
|  C| CLAIR|          195|910533066752|
|  D|DANIEL|           90|910533066752|
|  I|  INNA|           25|420906795008|
|  F| FRANK|          215|420906795008|
|  J|   JEN|           20|420906795008|
|  E|  ERIC|           90|420906795008|
|  A|  ARON|          350|420906795008|
|  H| HENRY|           25|171798691840|
|  G|GRAHAM|           30|171798691840|
|  B|  BILL|          360|171798691840|
+---+------+-------------+------------+

