In [2]:
import shutil
import os
import pyspark
from pyspark.sql.functions import *

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

METASTORE_DB = '/Users/pran8670/Documents/workspace/spark-techtalk/metastore_db'
if os.path.exists(METASTORE_DB):
    shutil.rmtree(METASTORE_DB)

In [19]:
# graphframes shenanigans

sc.stop()

jars = [
    '/Users/pran8670/Documents/workspace/spark-techtalk/jars/graphframes-0.5.0-spark2.1-s_2.11.jar',
    '/Users/pran8670/Documents/workspace/spark-techtalk/jars/scala-logging-slf4j_2.11-2.1.2.jar',
    '/Users/pran8670/Documents/workspace/spark-techtalk/jars/scala-logging-api_2.11-2.1.2.jar'
]

sc = pyspark.SparkContext(pyFiles = jars)

from graphframes import *
sqlContext = pyspark.sql.SQLContext(sc)

In [20]:
from datetime import date, datetime

def calculate_age(born):
    born = datetime.strptime(born, '%Y-%m-%d')
    today = date.today()
    return today.year - born.year - ((today.month, today.day) < (born.month, born.day))

In [21]:
# Create a GraphFrame

edgesRDD = sc.textFile('facebook_edges.txt')
edgesRDD = edgesRDD.map(lambda row: row.split(' '))
e = sqlContext.createDataFrame(data = edgesRDD, schema = ['src', 'dst'])

verticesRDD = sc.textFile('facebook_vertices.txt')
verticesRDD = verticesRDD.map(lambda row: row.split(','))
verticesRDD = verticesRDD.map(lambda row: row + [calculate_age(row[-1])]) # add age field
v = sqlContext.createDataFrame(data = verticesRDD, schema = ['id', 'name', 'dob', 'age'])

g = GraphFrame(v, e)

In [22]:
g.edges.show()

+---+---+
|src|dst|
+---+---+
|  0|  1|
|  0|  2|
|  0|  3|
|  0|  4|
|  0|  5|
|  0|  6|
|  0|  7|
|  0|  8|
|  0|  9|
|  0| 10|
|  0| 11|
|  0| 12|
|  0| 13|
|  0| 14|
|  0| 15|
|  0| 16|
|  0| 17|
|  0| 18|
|  0| 19|
|  0| 20|
+---+---+
only showing top 20 rows



In [23]:
g.vertices.show()

+----+-------------------+----------+---+
|  id|               name|       dob|age|
+----+-------------------+----------+---+
|2265|     Kelly Anderson|2001-07-26| 17|
|3603|    Clayton Simpson|1943-02-03| 75|
|3076|      Melissa Dixon|1945-07-03| 73|
|3665|    Donald Fletcher|1925-01-22| 93|
|3690| Alexandra Williams|1912-05-27|106|
| 145|     Heather Suarez|2012-07-09|  6|
|1867|      Adam Faulkner|1912-06-24|106|
| 813|        Joe Stevens|1953-09-17| 64|
|  38|       Aaron Thomas|1981-03-24| 37|
|1281|Mrs. Amanda Ramirez|1904-10-26|113|
|2232|      Jose Campbell|2002-12-29| 15|
|3492|       Matthew Cruz|1991-04-30| 27|
|3701|      Michael Brown|1962-12-09| 55|
|1135|        Mike Parker|1985-11-22| 32|
| 118|       Daniel Moore|1911-07-09|107|
|2797|      Sarah Andrade|1977-07-14| 41|
|3449|     Pamela Edwards|1947-10-08| 70|
|1496|       Brett Barnes|1917-06-15|101|
|3244|         John Jones|1965-08-12| 53|
|3568|       Steven Wolfe|1974-12-15| 43|
+----+-------------------+--------

In [24]:
# Degrees of a node - Number of friends of a person
v = g.vertices
d = g.degrees
v.join(d, d.id == v.id).show() # performing join operation on dataframe just like SQL for you to see details like name, age, dob

+----+-------------------+----------+---+----+------+
|  id|               name|       dob|age|  id|degree|
+----+-------------------+----------+---+----+------+
|1090|     Robert Padilla|1980-08-29| 38|1090|    13|
|1159|     Valerie Miller|1920-01-14| 98|1159|    61|
|1436|       Amanda Miles|2016-12-28|  1|1436|    30|
|1512|      Jared Collins|1902-11-20|115|1512|    34|
|1572|        Karen Reyes|1991-08-21| 27|1572|    43|
|2069|     Robert Rodgers|1991-07-26| 27|2069|   168|
|2088| Benjamin Blackburn|1906-02-04|112|2088|   198|
|2136|        Joseph Hall|1953-10-03| 64|2136|    74|
|2162|      Dawn Mckinney|2016-12-20|  1|2162|    15|
|2294|     Alison Stewart|1913-07-04|105|2294|   122|
|2904|  Alexandra Hampton|1979-01-31| 39|2904|    72|
| 296|         Troy Hayes|1910-10-13|107| 296|     7|
|3210|       Joseph Hayes|2014-11-18|  3|3210|    14|
|3414|       Barry Weaver|1932-11-02| 85|3414|    18|
|3606|       Allen Cannon|2015-04-22|  3|3606|     5|
|3959|Jacqueline Thompson|19

In [25]:
# Find friends at a distance of 2 hops known as motif finding
# In other words, a and c will be shown each other as "You may also know... or B can introduce a and c to each other"
g.find("(a)-[e]->(b); (b)-[e2]->(c)").show(10, False)

+------------------------------------------+----------+-------------------------------------+-----------+--------------------------------------+
|a                                         |e         |b                                    |e2         |c                                     |
+------------------------------------------+----------+-------------------------------------+-----------+--------------------------------------+
|[0, Catherine Pacheco, 1908-09-05, 109]   |[0, 107]  |[107, Randy Pena, 1972-06-17, 46]    |[107, 1090]|[1090, Robert Padilla, 1980-08-29, 38]|
|[58, Ashley Lucas, 1975-02-08, 43]        |[58, 107] |[107, Randy Pena, 1972-06-17, 46]    |[107, 1090]|[1090, Robert Padilla, 1980-08-29, 38]|
|[107, Randy Pena, 1972-06-17, 46]         |[107, 985]|[985, Gabriel Brown, 1965-08-11, 53] |[985, 1090]|[1090, Robert Padilla, 1980-08-29, 38]|
|[917, John Quinn, 2002-09-25, 15]         |[917, 944]|[944, Mark Robertson, 1941-09-09, 76]|[944, 1159]|[1159, Valerie Miller, 19

In [27]:
# Find a path (trail of people that connects 2 people) between 2 nodes - find a path from one person to the other
g.bfs("id = '0'", "id = '1090'").show(10, False)

+---------------------------------------+--------+---------------------------------+-----------+--------------------------------------+
|from                                   |e0      |v1                               |e1         |to                                    |
+---------------------------------------+--------+---------------------------------+-----------+--------------------------------------+
|[0, Catherine Pacheco, 1908-09-05, 109]|[0, 107]|[107, Randy Pena, 1972-06-17, 46]|[107, 1090]|[1090, Robert Padilla, 1980-08-29, 38]|
+---------------------------------------+--------+---------------------------------+-----------+--------------------------------------+



In [38]:
# Community (cluster) detection in social network 
# - based on how densely nodes are connected to each other in a certain region of graph
communities = g.labelPropagation(maxIter = 10)
communities = communities.select("id", "label")

In [39]:
# Each node and its corresponding community that it belongs to
communities.show()

+----+-------------+
|  id|        label|
+----+-------------+
|2772| 575525617676|
|1584| 292057776129|
|3914| 335007449099|
|3148| 575525617676|
|3391| 575525617676|
|1593| 386547056642|
|3221| 575525617676|
|1533| 395136991236|
|3485| 326417514505|
|1603| 292057776129|
|1244| 377957122048|
|2919|1529008357385|
|3331| 575525617676|
| 165|1503238553605|
|3263| 575525617676|
|1687| 386547056642|
|1335| 292057776129|
|2539| 601295421446|
|2542| 601295421446|
|2698|1529008357385|
+----+-------------+
only showing top 20 rows



In [40]:
# Total number of unique communities found by the algorithm
communities.select('label').distinct().count()

66

In [43]:
# Number of nodes present in each community sorted by descending order
communities.groupby('label').count().orderBy('count', ascending=False).show()

+-------------+-----+
|        label|count|
+-------------+-----+
| 292057776129|  507|
| 575525617676|  433|
|1047972020237|  295|
|1632087572497|  243|
| 601295421446|  237|
|1503238553605|  235|
|1529008357385|  226|
| 884763262999|  164|
| 335007449099|  153|
| 377957122048|  133|
|1537598291979|  113|
|1494648619015|   99|
| 429496729601|   99|
|1082331758597|   73|
| 721554505730|   72|
| 326417514505|   67|
| 506806140943|   55|
|1065151889409|   54|
|  17179869198|   51|
|  85899345937|   50|
+-------------+-----+
only showing top 20 rows

