# Working with Graphs in Spark


In this lab, you will learn some of the functionality of Spark GraphFrames. GraphFrames is the next-generation library for working with graphs on Spark. 

In [1]:
sc

In [2]:
spark

To work with GraphFrames in Python, you need to import the `graphframes` library. **Note: this library is not installed by default with Spark on EMR. The post-startup script you ran today downloaded it and made it accessible to the Spark engine.**

In [3]:
from graphframes import *

You will be using data from the Bay Area Bike Share Portal (a service similar to Capital Bikeshare in DC).

In the following two cells, read in two CSV files located in S3:
* `s3://bigdatateaching/bike-data/station_data.csv`
* `s3://bigdatateaching/bike-data/trip_data.csv`

The station file contains the metadata of the bicycle stations, and the trip file contains all the bike trips.

In [4]:
bike_stations = spark.read.option("header", "true")\
  .csv("s3://bigdatateaching/bike-data/station_data.csv")

In [5]:
trip_data = spark.read.option("header", "true")\
   .csv("s3://bigdatateaching/bike-data/trip_data.csv")

Explore the datasets:

In [6]:
bike_stations.show(10)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|San Jose|    8/5/2013|
|         9|           Japantown|37.348742|-121.894715|       15|San Jose|    8/5/2013|
|        10|  San Jose City Hall

In [7]:
trip_data.show(10)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

You will now modify the two DataFrames read in above to create a vertex list and an edge list.

In the next cell, use the station data and rename the "name" column to "id" and get distinct records:

In [8]:
station_vertices = bike_stations.withColumnRenamed("name", "id").distinct()

In the next cell, use the trip data and rename the "Start Station" column to "src" and the "End Station" column to "dst".

In [9]:
trip_edges = trip_data\
   .withColumnRenamed("Start Station", "src")\
   .withColumnRenamed("End Station", "dst")

In the next cell, you will create a GraphFrame passing in a vertex list and an edge list. Which is which from your original datasets?

In [10]:
station_graph = GraphFrame(station_vertices, trip_edges)
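To make the vertex/edge distinction concrete, here is a minimal plain-Python sketch (no Spark required) of the two inputs a graph needs. The station names and trips are made-up toy data, and `dangling_endpoints` is a hypothetical helper, not part of GraphFrames. It checks whether every edge endpoint appears in the vertex list, which is a reasonable sanity check given that `src` and `dst` values are meant to refer to vertex `id` values.

```python
# Toy data (made up for illustration): vertices carry an "id", edges a "src" and "dst".
vertices = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
edges = [
    {"src": "A", "dst": "B"},
    {"src": "B", "dst": "C"},
    {"src": "C", "dst": "D"},  # "D" is not in the vertex list
]


def dangling_endpoints(vertices, edges):
    """Return edge endpoints that do not appear as a vertex id (hypothetical helper)."""
    ids = {v["id"] for v in vertices}
    endpoints = {e["src"] for e in edges} | {e["dst"] for e in edges}
    return sorted(endpoints - ids)


missing = dangling_endpoints(vertices, edges)
print(missing)
```

An empty result would mean that every trip starts and ends at a known station.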

Since you will be using the GraphFrame more than once, it is best to cache it.

In [11]:
station_graph.cache()

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])

### Graph metadata

Count the number of vertices in the graph:

In [12]:
station_graph.vertices.count()

70

Count the number of edges in the graph:

In [13]:
station_graph.edges.count()

354152

### Querying the Graph

The most basic way of interacting with the graph is querying it. Since the GraphFrame is based on DataFrames, you can perform the same type of operations you would on a DataFrame.

In the next cell, show the top 10 source and destination combinations, ordered in descending order by count:

In [None]:
from pyspark.sql.functions import desc
station_graph.edges.groupBy("src", "dst").count().orderBy(desc("count")).show(10)

In the next cell, show the top 10 source and destination combinations **where the source or destination station is 'Townsend at 7th'**, ordered in descending order by count:

In [None]:
station_graph.edges.where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")\
   .groupBy("src", "dst").count()\
   .orderBy(desc("count"))\
   .show(10)
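As a rough illustration of what the two queries above compute, the following plain-Python sketch counts (source, destination) pairs on a made-up list of trips. It is not how Spark executes the query; it only mirrors the logic of grouping, counting, and sorting, and the station names are placeholders.

```python
from collections import Counter

# Toy trips as (src, dst) pairs; made up for illustration.
trips = [("A", "B"), ("A", "B"), ("B", "A"), ("A", "C"), ("A", "B"), ("B", "A")]

# groupBy("src", "dst").count() corresponds to counting identical pairs.
pair_counts = Counter(trips)

# orderBy(desc("count")) followed by show(n) corresponds to most_common(n).
top_pairs = pair_counts.most_common(2)
print(top_pairs)

# The filtered variant keeps only trips that touch one station before counting.
station = "C"
touching = Counter(trip for trip in trips if station in trip)
print(touching)
```

Note that direction matters: trips from A to B and trips from B to A are counted as separate combinations.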

### Subsetting a Graph

Sometimes you need to work with a subset of a graph. The easiest way to create a subset is to create a new graph from the vertices and edges of your subset.

In the next cell, subset the edges where the source or destination station is 'Townsend at 7th', and create a new graph called sg1 using the original vertices and the new edge list:

In [45]:
townsend_and_7th_edges = station_graph.edges\
    .where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")
sg1 = GraphFrame(station_vertices, townsend_and_7th_edges)

In [46]:
sg1.edges.count()

28999

In [47]:
sg1.vertices.count()

70
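The vertex count is unchanged because the subgraph reuses the full vertex list; only the edges were filtered. The plain-Python sketch below, on made-up toy data, shows the difference between keeping all vertices and keeping only those still touched by an edge. The variable names are illustrative and not part of the lab.

```python
# Toy graph (made up for illustration).
vertex_ids = ["A", "B", "C", "D"]
edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "A")]

# Keep only edges that start or end at "A", mirroring the where() filter above.
sub_edges = [(s, d) for s, d in edges if s == "A" or d == "A"]

# Reusing the full vertex list leaves the vertex count unchanged...
sub_vertices_all = vertex_ids

# ...whereas keeping only vertices touched by a remaining edge shrinks it.
touched = {s for s, _ in sub_edges} | {d for _, d in sub_edges}
sub_vertices_connected = [v for v in vertex_ids if v in touched]

print(len(sub_vertices_all), len(sub_vertices_connected))
```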

### Motifs

*Motifs* are ways of expressing structural patterns in a graph. The following cell searches for a triangular pattern: an edge from `a` to `b`, an edge from `b` to `c`, and an edge from `c` back to `a`.

In [None]:
motifs = station_graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")
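To show what the triangle pattern asks for, here is a minimal plain-Python sketch that searches a made-up edge list for triples `a -> b -> c -> a`. `find_triangles` is a hypothetical helper written for this illustration; it ignores repeated edges between the same pair of stations, so it is a simplification and not a reimplementation of `find`.

```python
# Toy directed edges (made up for illustration).
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "D")]


def find_triangles(edges):
    """Return (a, b, c) triples with edges a->b, b->c and c->a (hypothetical helper)."""
    edge_set = set(edges)
    triangles = []
    for a, b in edges:
        for b2, c in edges:
            # The second edge must start where the first one ended,
            # and a third edge must close the loop back to a.
            if b2 == b and (c, a) in edge_set:
                triangles.append((a, b, c))
    return triangles


triangles = find_triangles(edges)
print(triangles)
```

In this sketch the same triangle is reported once per starting vertex, since each rotation is a separate match of the pattern.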

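The following cell takes the motifs, keeps only triangles ridden on the same bike in chronological order where consecutive stations differ, and shows the one with the shortest time between the start of the first leg and the start of the last leg.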

In [None]:
from pyspark.sql.functions import expr
# Parse each leg's start time. The pattern M/d/yyyy H:mm also matches
# single-digit months, days, and hours (for example 8/31/2015).
# Then keep same-bike triangles in time order and sort by elapsed seconds.
motifs.selectExpr("*",
    "to_timestamp(ab.`Start Date`, 'M/d/yyyy H:mm') as abStart",
    "to_timestamp(bc.`Start Date`, 'M/d/yyyy H:mm') as bcStart",
    "to_timestamp(ca.`Start Date`, 'M/d/yyyy H:mm') as caStart")\
  .where("ca.`Bike #` = bc.`Bike #`").where("ab.`Bike #` = bc.`Bike #`")\
  .where("a.id != b.id").where("b.id != c.id")\
  .where("abStart < bcStart").where("bcStart < caStart")\
  .orderBy(expr("cast(caStart as long) - cast(abStart as long)"))\
  .selectExpr("a.id", "b.id", "c.id", "ab.`Start Date`", "ca.`End Date`")\
  .limit(1).show(1, False)
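The ordering and sort key in the query above can be checked by hand. The sketch below uses Python's `datetime` on made-up start times; the `strptime` format string is only a rough counterpart of the Spark date pattern, and the times are not taken from the dataset.

```python
from datetime import datetime

# Python format that roughly corresponds to the date pattern used in the query.
FMT = "%m/%d/%Y %H:%M"

# Made-up start times for three legs ridden on the same bike.
ab_start = datetime.strptime("8/31/2015 08:05", FMT)
bc_start = datetime.strptime("8/31/2015 08:40", FMT)
ca_start = datetime.strptime("8/31/2015 09:10", FMT)

# The where() clauses require the legs to occur in this order.
in_order = ab_start < bc_start < ca_start

# Casting a timestamp to long yields epoch seconds, so the sort key is the
# number of seconds between the start of the first leg and the start of the last.
elapsed_seconds = int((ca_start - ab_start).total_seconds())
print(in_order, elapsed_seconds)
```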

### Graph Algorithms

### PageRank

In [49]:
ranks = station_graph.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.orderBy(desc("pagerank")).select("id", "pagerank").show(10)

+--------------------+------------------+
|                  id|          pagerank|
+--------------------+------------------+
|San Jose Diridon ...| 4.051504835990017|
|San Francisco Cal...|3.3511832964286947|
|Mountain View Cal...|2.5143907710155426|
|Redwood City Calt...|2.3263087713711736|
|San Francisco Cal...| 2.231144291369882|
|Harry Bridges Pla...|1.8251120118882465|
|     2nd at Townsend|1.5821217785038681|
|Santa Clara at Al...|1.5730074084907577|
|     Townsend at 7th|1.5684565805340536|
|Embarcadero at Sa...|1.5414242087748584|
+--------------------+------------------+
only showing top 10 rows
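For intuition about what PageRank measures, here is a textbook power-iteration sketch in plain Python on a made-up toy graph. It is an illustration under stated assumptions, not the GraphFrames implementation: the function name, the handling of vertices without outgoing edges, and the normalization (ranks summing to 1) are choices made for this example, so its numbers are not comparable to the output above, where ranks exceed 1.

```python
def pagerank(edges, reset_probability=0.15, max_iter=10):
    """Textbook power-iteration PageRank on a list of (src, dst) edges."""
    nodes = sorted({n for edge in edges for n in edge})
    out_degree = {node: 0 for node in nodes}
    for src, _ in edges:
        out_degree[src] += 1

    n = len(nodes)
    ranks = {node: 1.0 / n for node in nodes}
    for _ in range(max_iter):
        # Every node receives the reset share, plus shares along incoming edges.
        new_ranks = {node: reset_probability / n for node in nodes}
        for src, dst in edges:
            new_ranks[dst] += (1 - reset_probability) * ranks[src] / out_degree[src]
        # Rank held by nodes without outgoing edges is spread evenly.
        dangling = sum(ranks[node] for node in nodes if out_degree[node] == 0)
        for node in nodes:
            new_ranks[node] += (1 - reset_probability) * dangling / n
        ranks = new_ranks
    return ranks


# Toy graph (made up): B and C both send all their trips to A.
toy_edges = [("A", "B"), ("A", "C"), ("B", "A"), ("C", "A")]
toy_ranks = pagerank(toy_edges)
print(toy_ranks)
```

In the toy graph, A ends up with the highest rank because it receives every trip leaving B and C, which parallels how stations that attract many trips rank highly in the bike-share graph.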



### In-Degree and Out-Degree Metrics

In [48]:
inDeg = station_graph.inDegrees
inDeg.orderBy(desc("inDegree")).show(5, False)

+----------------------------------------+--------+
|id                                      |inDegree|
+----------------------------------------+--------+
|San Francisco Caltrain (Townsend at 4th)|34810   |
|San Francisco Caltrain 2 (330 Townsend) |22523   |
|Harry Bridges Plaza (Ferry Building)    |17810   |
|2nd at Townsend                         |15463   |
|Townsend at 7th                         |15422   |
+----------------------------------------+--------+
only showing top 5 rows



In [51]:
outDeg = station_graph.outDegrees
outDeg.orderBy(desc("outDegree")).show(5, False)

+---------------------------------------------+---------+
|id                                           |outDegree|
+---------------------------------------------+---------+
|San Francisco Caltrain (Townsend at 4th)     |26304    |
|San Francisco Caltrain 2 (330 Townsend)      |21758    |
|Harry Bridges Plaza (Ferry Building)         |17255    |
|Temporary Transbay Terminal (Howard at Beale)|14436    |
|Embarcadero at Sansome                       |14158    |
+---------------------------------------------+---------+
only showing top 5 rows
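The degree metrics above, and the in/out ratio computed in the next cell, can be reproduced by hand on a small example. The plain-Python sketch below uses made-up trips; in-degree counts trips ending at a station and out-degree counts trips starting there. Restricting the ratio to stations present in both counts mirrors the inner join used in the notebook.

```python
from collections import Counter

# Toy trips as (src, dst) pairs; made up for illustration.
trips = [("A", "B"), ("A", "B"), ("B", "A"), ("C", "A"), ("C", "B")]

# In-degree: trips ending at a station. Out-degree: trips starting there.
in_degree = Counter(dst for _, dst in trips)
out_degree = Counter(src for src, _ in trips)

# Ratio only for stations that have both an in-degree and an out-degree.
degree_ratio = {
    station: in_degree[station] / out_degree[station]
    for station in in_degree.keys() & out_degree.keys()
}
print(degree_ratio)
```

A ratio above 1 means more trips end at the station than start there; a ratio below 1 means the opposite.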



In [52]:
degreeRatio = inDeg.join(outDeg, "id")\
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
degreeRatio.orderBy(desc("degreeRatio")).show(10, False)
degreeRatio.orderBy("degreeRatio").show(10, False)

+----------------------------------------+------------------+
|id                                      |degreeRatio       |
+----------------------------------------+------------------+
|Redwood City Medical Center             |1.5333333333333334|
|San Mateo County Center                 |1.4724409448818898|
|SJSU 4th at San Carlos                  |1.3621052631578947|
|San Francisco Caltrain (Townsend at 4th)|1.3233728710462287|
|Washington at Kearny                    |1.3086466165413533|
|Paseo de San Antonio                    |1.2535046728971964|
|California Ave Caltrain Station         |1.24              |
|Franklin at Maple                       |1.2345679012345678|
|Embarcadero at Vallejo                  |1.2201707365495336|
|Market at Sansome                       |1.2173913043478262|
+----------------------------------------+------------------+
only showing top 10 rows

+-------------------------------+------------------+
|id                             |degreeRatio       |
