# Working with Graphs in Spark


In this lab, you will learn some of the functionality of Spark GraphFrames. GraphFrames is the next-generation library for working with graphs on Spark. 

First, you need to import the `graphframes` library. However, it is not installed by default. The post-startup script downloaded the library (packaged as a `jar` file), extracted it, and made it available for you to use.

There are still a couple of things you need to do to get this to work in this setup.

In [1]:
# import findspark and os and let findspark find all the environment variables
import findspark
import os
findspark.init()

In [2]:
# Before you create the SparkSession, you need to add a new environment variable to tell pyspark where the graphframes
# library is
SUBMIT_ARGS = "--packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
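The value of `PYSPARK_SUBMIT_ARGS` is a `spark-submit` command line. `--packages` takes a comma-separated list of Maven coordinates of the form `group:artifact:version`. As far as we know, the string must end with `pyspark-shell` when you set it by hand like this. The helpers below are hypothetical (`build_submit_args` and `split_coordinate` are not part of PySpark). They assemble the same string and split a coordinate into its parts. We assume the version suffix (`spark2.4`, `s_2.11`) names the Spark and Scala versions the jar was built for, based on the GraphFrames naming convention.

```python
def build_submit_args(packages):
    """Hypothetical helper: build a PYSPARK_SUBMIT_ARGS value from Maven coordinates."""
    # --packages expects one comma-separated list; pyspark-shell must come last
    return "--packages " + ",".join(packages) + " pyspark-shell"


def split_coordinate(coordinate):
    """Split a 'group:artifact:version' Maven coordinate into a dict."""
    group, artifact, version = coordinate.split(":")
    return {"group": group, "artifact": artifact, "version": version}


coord = "graphframes:graphframes:0.7.0-spark2.4-s_2.11"
print(build_submit_args([coord]))
# --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 pyspark-shell
print(split_coordinate(coord)["version"])  # 0.7.0-spark2.4-s_2.11
```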

In [3]:
# Since you added a new environment variable, you want to make sure that the Spark configuration sees it
import pyspark
conf = pyspark.SparkConf()

In [4]:
# Create the SparkSession using the configuration
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("graphx-lab").config(conf = conf).getOrCreate()


In [5]:
spark

In [6]:
# Import the graphframes library
from graphframes import *

You will be using data from the Bay Area Bike Share portal (a service similar to Capital Bikeshare in Washington, DC).

In the following two cells, read in two CSV files located in S3:
* `s3://bigdatateaching/bike-data/station_data.csv`
* `s3://bigdatateaching/bike-data/trip_data.csv`

The station file contains metadata for the bicycle stations, and the trip file contains all the bike trips.

Read in the two files:

In [7]:
bike_stations = spark.read.option("header", "true")\
  .csv("s3://bigdatateaching/bike-data/station_data.csv")

In [8]:
trip_data = spark.read.option("header", "true")\
   .csv("s3://bigdatateaching/bike-data/trip_data.csv")
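When `header` is `true`, Spark uses the first line of each file as column names. Schema inference is not turned on here (Spark's CSV reader has a separate `inferSchema` option for that), so every column is read as a string. That is why the graph schema later in this lab lists `station_id: string`. Python's standard `csv.DictReader` behaves the same way. The snippet below is a plain-Python analogy, not Spark code. It uses two station rows copied from the station table shown below.

```python
import csv
import io

# Two rows from the station table below, in CSV form with a header line
sample = """station_id,name,lat,long,dockcount,landmark,installation
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013
"""

# DictReader takes column names from the first line, like header=true
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["name"])             # Adobe on Almaden
print(type(rows[0]["dockcount"]))  # <class 'str'>: no type inference happens
```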

Explore the datasets:

In [9]:
bike_stations.show(10)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|San Jose|    8/5/2013|
|         9|           Japantown|37.348742|-121.894715|       15|San Jose|    8/5/2013|
|        10|  San Jose City Hall

In [10]:
trip_data.show(10)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

You will now transform the two DataFrames read in above into a vertex list and an edge list.

In the next cell, use the station data and rename the "name" column to "id" and get distinct records:

In [11]:
station_vertices = bike_stations.withColumnRenamed("name", "id").distinct()
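GraphFrames looks for vertex identifiers in a column named `id`, which is why `name` is renamed. `distinct()` then drops exact duplicate rows. The sketch below performs the same two steps in plain Python on a list of dicts. The `rename_and_distinct` helper and the toy rows are hypothetical, for illustration only.

```python
def rename_and_distinct(rows, old, new):
    """Rename key `old` to `new` in each row, then drop exact duplicate rows."""
    seen = set()
    result = []
    for row in rows:
        renamed = {(new if k == old else k): v for k, v in row.items()}
        key = tuple(sorted(renamed.items()))  # hashable fingerprint of the whole row
        if key not in seen:
            seen.add(key)
            result.append(renamed)  # first-seen order is kept; Spark's distinct() promises no order
    return result


stations = [
    {"station_id": "9", "name": "Japantown"},
    {"station_id": "9", "name": "Japantown"},  # exact duplicate, will be dropped
    {"station_id": "8", "name": "San Salvador at 1st"},
]
vertices = rename_and_distinct(stations, "name", "id")
print(vertices)
# [{'station_id': '9', 'id': 'Japantown'}, {'station_id': '8', 'id': 'San Salvador at 1st'}]
```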

In the next cell, use the trip data and rename the "Start Station" column to "src" and the "End Station" column to "dst".

In [12]:
trip_edges = trip_data\
   .withColumnRenamed("Start Station", "src")\
   .withColumnRenamed("End Station", "dst")

In the next cell, you will create a GraphFrame passing in a vertex list and an edge list. Which is which from your original datasets?

In [13]:
station_graph = GraphFrame(station_vertices, trip_edges)
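The stations are the vertices and the trips are the edges. GraphFrames expects the vertex DataFrame to have an `id` column and the edge DataFrame to have `src` and `dst` columns, which is what the renames above set up. Before you build a graph, it can be worth checking that every `src` and `dst` value matches a vertex `id`. For example, a station might be renamed in one file but not the other. The hypothetical `check_graph` function below is a plain-Python sketch of such a sanity check on toy data. It is not part of the GraphFrames API.

```python
def check_graph(vertices, edges):
    """Return the set of edge endpoints that do not match any vertex id."""
    # Column contract mirrored from GraphFrames: vertices need 'id', edges need 'src' and 'dst'
    if not all("id" in v for v in vertices):
        raise ValueError("every vertex needs an 'id'")
    if not all("src" in e and "dst" in e for e in edges):
        raise ValueError("every edge needs 'src' and 'dst'")
    ids = {v["id"] for v in vertices}
    endpoints = {e["src"] for e in edges} | {e["dst"] for e in edges}
    return endpoints - ids


vertices = [{"id": "Post at Kearny"}, {"id": "2nd at South Park"}]
edges = [
    {"src": "Post at Kearny", "dst": "2nd at South Park"},
    {"src": "Post at Kearny", "dst": "Townsend at 7th"},  # endpoint missing from vertices
]
print(check_graph(vertices, edges))  # {'Townsend at 7th'}
```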

Since you will be using the GraphFrame more than once, it is best to cache it.

In [14]:
station_graph.cache()

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])

### Graph metadata

Count the number of vertices in the graph:

In [15]:
station_graph.vertices.count()

70

Count the number of edges in the graph:

In [16]:
station_graph.edges.count()

354152

### Querying the Graph

The most basic way of interacting with the graph is querying it. Since the GraphFrame is based on DataFrames, you can perform the same type of operations you would on a DataFrame.

In the next cell, show the top 10 source and destination combinations, sorted by count in descending order:

In [17]:
from pyspark.sql.functions import desc
station_graph.edges.groupBy("src", "dst").count().orderBy(desc("count")).show(10)

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|Harry Bridges Pla...|Embarcadero at Sa...| 3145|
|     2nd at Townsend|Harry Bridges Pla...| 2973|
|     Townsend at 7th|San Francisco Cal...| 2734|
|Harry Bridges Pla...|     2nd at Townsend| 2640|
|Embarcadero at Fo...|San Francisco Cal...| 2439|
|   Steuart at Market|     2nd at Townsend| 2356|
|Embarcadero at Sa...|   Steuart at Market| 2330|
|     Townsend at 7th|San Francisco Cal...| 2192|
|Temporary Transba...|San Francisco Cal...| 2184|
+--------------------+--------------------+-----+
only showing top 10 rows
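Each trip is a separate edge, so the graph has many parallel edges between the same pair of stations: 354,152 edges but only 70 vertices. `groupBy("src", "dst").count()` collapses those parallel edges into one row per directed pair. `orderBy(desc("count"))` then sorts the pairs from most to least frequent. The same computation in plain Python with `collections.Counter`, on a few hypothetical trips, looks like this. Direction matters: A to B and B to A are counted separately.

```python
from collections import Counter

# Hypothetical trips, each one an edge (src, dst)
edges = [
    ("Townsend at 7th", "2nd at Townsend"),
    ("Townsend at 7th", "2nd at Townsend"),
    ("2nd at Townsend", "Townsend at 7th"),
    ("Steuart at Market", "2nd at Townsend"),
    ("Townsend at 7th", "2nd at Townsend"),
]

# Equivalent of groupBy("src", "dst").count(): one count per directed pair
pair_counts = Counter(edges)

# Equivalent of orderBy(desc("count")).show(10)
for (src, dst), count in pair_counts.most_common(10):
    print(src, "->", dst, count)
```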



In the next cell, show the top 10 source and destination combinations **where the source or destination station is 'Townsend at 7th'**, sorted by count in descending order:

In [18]:
station_graph.edges.where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")\
   .groupBy("src", "dst").count()\
   .orderBy(desc("count"))\
   .show(10)

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|     Townsend at 7th|San Francisco Cal...| 2734|
|     Townsend at 7th|San Francisco Cal...| 2192|
|     Townsend at 7th|Civic Center BART...| 1844|
|Civic Center BART...|     Townsend at 7th| 1765|
|San Francisco Cal...|     Townsend at 7th| 1198|
|Temporary Transba...|     Townsend at 7th|  834|
|     Townsend at 7th|Harry Bridges Pla...|  827|
|   Steuart at Market|     Townsend at 7th|  746|
|     Townsend at 7th|Temporary Transba...|  740|
+--------------------+--------------------+-----+
only showing top 10 rows
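Two rows in this output read `Townsend at 7th | San Francisco Cal...` with different counts. They are not duplicates. `show()` truncates strings longer than 20 characters by default, and the data contains two stations whose names start with `San Francisco Cal`. Their full names appear in the motif output later in this lab. The sketch below mimics that truncation rule in plain Python. The rule (keep the first 17 characters and append `...`) is our reading of Spark's default behavior, so treat it as an assumption. In PySpark, `show(10, truncate=False)` prints the full values.

```python
def truncate_like_show(value, truncate=20):
    """Shorten a string the way we assume DataFrame.show() does by default."""
    if len(value) <= truncate:
        return value
    if truncate < 4:
        return value[:truncate]
    return value[: truncate - 3] + "..."


# Full station names taken from the motif output in this lab
for name in ["San Francisco Caltrain (Townsend at 4th)",
             "San Francisco Caltrain 2 (330 Townsend)",
             "Townsend at 7th"]:
    print(repr(truncate_like_show(name)))
```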



### Subsetting a Graph

Sometimes you need to work with a subset of a graph. The easiest way to create a subset is to create a new graph from the vertices and edges of your subset.

In the next cell, subset the edges where the source or destination station is 'Townsend at 7th', and create a new graph called sg1 using the original vertices and the new edge list:

In [19]:
townsend_and_7th_edges = station_graph.edges\
    .where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")
sg1 = GraphFrame(station_vertices, townsend_and_7th_edges)

In [20]:
sg1.edges.count()

28999

In [21]:
sg1.vertices.count()

70
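The subgraph has far fewer edges but still 70 vertices, because `sg1` reuses the full vertex list. Stations that never exchange a trip with `Townsend at 7th` stay in the graph as isolated vertices with no edges. If you only want the stations that actually appear in the filtered edges, you can drop the isolated ones. The sketch below shows both versions in plain Python on hypothetical toy data. `subgraph` is an illustrative helper, not a GraphFrames method.

```python
def subgraph(vertices, edges, station, drop_isolated=False):
    """Keep edges touching `station`; optionally drop vertices left without edges."""
    kept_edges = [(s, d) for s, d in edges if s == station or d == station]
    if not drop_isolated:
        return list(vertices), kept_edges  # same approach as sg1: all vertices kept
    used = {s for s, _ in kept_edges} | {d for _, d in kept_edges}
    return [v for v in vertices if v in used], kept_edges


vertices = ["Townsend at 7th", "2nd at Townsend", "Steuart at Market", "Japantown"]
edges = [
    ("Townsend at 7th", "2nd at Townsend"),
    ("Steuart at Market", "2nd at Townsend"),
    ("2nd at Townsend", "Townsend at 7th"),
]

v1, e1 = subgraph(vertices, edges, "Townsend at 7th")
print(len(v1), len(e1))  # 4 2
v2, e2 = subgraph(vertices, edges, "Townsend at 7th", drop_isolated=True)
print(v2)  # ['Townsend at 7th', '2nd at Townsend']
```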

### Motifs

*Motifs* are a way of expressing structural patterns in a graph. The following cell defines a triangle motif. `(a)` is the starting station, and `[ab]` is an edge from `(a)` to the next station `(b)`. The pattern repeats with an edge from `(b)` to `(c)` and then an edge from `(c)` back to `(a)`.

In [22]:
motifs = station_graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")
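Conceptually, the pattern `(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)` asks for every combination of three edges in which each edge starts where the previous one ended and the last edge returns to `a`. GraphFrames evaluates this with DataFrame joins. The nested loops below are only a plain-Python sketch of the same matching semantics on hypothetical edges, not how the library runs it. Because every trip is its own edge, parallel edges multiply the number of matches. Each cycle also appears once per starting vertex. As far as we know, the pattern itself does not require a, b, and c to be distinct, which is why the next query filters on the station ids.

```python
def find_triangles(edges):
    """Return (ab, bc, ca) edge triples forming a directed cycle a->b->c->a."""
    matches = []
    for ab in edges:
        for bc in edges:
            if bc["src"] != ab["dst"]:
                continue  # b must be where ab ends
            for ca in edges:
                if ca["src"] == bc["dst"] and ca["dst"] == ab["src"]:
                    matches.append((ab, bc, ca))
    return matches


edges = [
    {"src": "A", "dst": "B", "trip": 1},
    {"src": "B", "dst": "C", "trip": 2},
    {"src": "C", "dst": "A", "trip": 3},
    {"src": "A", "dst": "B", "trip": 4},  # parallel edge: a second trip A->B
]
matches = find_triangles(edges)
for ab, bc, ca in matches:
    print(ab["trip"], bc["trip"], ca["trip"])
```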

The DataFrame returned by this query contains nested fields for vertices a, b, and c, as well as for the respective edges, so we can query it as we would any other DataFrame. For example, what is the shortest round trip that a single bike has taken from station a, to station b, to station c, and back to station a? The following logic parses the start dates into Spark timestamps. It then checks that all three trips use the same bike, that consecutive stations differ, and that the trips start in order.

In [23]:
from pyspark.sql.functions import expr
motifs.selectExpr("*",
    "to_timestamp(ab.`Start Date`, 'MM/dd/yyyy HH:mm') as abStart",
    "to_timestamp(bc.`Start Date`, 'MM/dd/yyyy HH:mm') as bcStart",
    "to_timestamp(ca.`Start Date`, 'MM/dd/yyyy HH:mm') as caStart")\
  .where("ca.`Bike #` = bc.`Bike #`").where("ab.`Bike #` = bc.`Bike #`")\
  .where("a.id != b.id").where("b.id != c.id")\
  .where("abStart < bcStart").where("bcStart < caStart")\
  .orderBy(expr("cast(caStart as long) - cast(abStart as long)"))\
  .selectExpr("a.id", "b.id", "c.id", "ab.`Start Date`", "ca.`End Date`")\
  .limit(1).show(1, False)

+---------------------------------------+---------------+----------------------------------------+---------------+---------------+
|id                                     |id             |id                                      |Start Date     |End Date       |
+---------------------------------------+---------------+----------------------------------------+---------------+---------------+
|San Francisco Caltrain 2 (330 Townsend)|Townsend at 7th|San Francisco Caltrain (Townsend at 4th)|5/19/2015 16:09|5/19/2015 16:33|
+---------------------------------------+---------------+----------------------------------------+---------------+---------------+
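The filters in the query above can be read as a plain-Python predicate over one motif match. The sketch below applies the same checks: the same bike on all three legs, distinct consecutive stations, and strictly increasing start times. It then picks the match with the smallest gap between the first and last start times, mirroring the `orderBy` expression. The trip records are hypothetical; only the month/day/year hour:minute date format is taken from the data shown earlier.

```python
from datetime import datetime

FMT = "%m/%d/%Y %H:%M"  # strptime also accepts non-padded values such as 5/19/2015


def parse(ts):
    return datetime.strptime(ts, FMT)


def valid(ab, bc, ca):
    """Same checks as the where() clauses in the Spark query."""
    same_bike = ab["bike"] == bc["bike"] == ca["bike"]
    distinct = ab["src"] != ab["dst"] and bc["src"] != bc["dst"]  # a != b and b != c
    ordered = parse(ab["start"]) < parse(bc["start"]) < parse(ca["start"])
    return same_bike and distinct and ordered


def span_seconds(match):
    """Gap between the start of leg ab and the start of leg ca, as in the orderBy."""
    ab, _, ca = match
    return (parse(ca["start"]) - parse(ab["start"])).total_seconds()


# Hypothetical motif matches: (ab, bc, ca) edge triples
matches = [
    ({"src": "A", "dst": "B", "bike": 1, "start": "5/19/2015 16:09"},
     {"src": "B", "dst": "C", "bike": 1, "start": "5/19/2015 16:15"},
     {"src": "C", "dst": "A", "bike": 1, "start": "5/19/2015 16:25"}),
    ({"src": "A", "dst": "B", "bike": 2, "start": "5/19/2015 09:00"},
     {"src": "B", "dst": "C", "bike": 2, "start": "5/19/2015 10:00"},
     {"src": "C", "dst": "A", "bike": 2, "start": "5/19/2015 12:00"}),
    ({"src": "A", "dst": "B", "bike": 3, "start": "5/19/2015 08:00"},
     {"src": "B", "dst": "C", "bike": 4, "start": "5/19/2015 08:05"},  # different bike
     {"src": "C", "dst": "A", "bike": 3, "start": "5/19/2015 08:10"}),
]

best = min((m for m in matches if valid(*m)), key=span_seconds)
print(best[0]["bike"], span_seconds(best))  # 1 960.0
```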



### Graph Algorithms

A graph is just a logical representation of data. Graph theory provides numerous algorithms for analyzing data in this format, and GraphFrames allows us to leverage many algorithms out of the box. Development continues as new algorithms are added to GraphFrames, so this list will most likely continue to grow.

### PageRank

One of the best-known graph algorithms is PageRank. Larry Page, cofounder of Google, created PageRank as a research project on how to rank web pages. Unfortunately, a complete explanation of how PageRank works is outside the scope of this lab. However, to quote Wikipedia, the high-level explanation is as follows:

```
PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.
```

PageRank generalizes quite well outside the web domain. We can apply it directly to our own data and get a sense of which bike stations are important (specifically, those that receive a lot of bike traffic). In this example, important bike stations will be assigned large PageRank values.
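One common way to write the iteration behind `pageRank(resetProbability=..., maxIter=...)` is the un-normalized form below. Here $\alpha$ is the reset probability, the sum runs over edges $u \to v$ with each trip counted separately, and $\deg^{+}(u)$ is the out-degree of $u$. As far as we know, this matches the non-personalized PageRank in Spark 2.x GraphX, which GraphFrames builds on, with every rank starting at 1.0. Treat that correspondence as an assumption rather than a specification. Under this form, the ranks average to roughly 1 rather than summing to 1, which is consistent with the values above 1 in the output below.

```latex
PR_{k+1}(v) = \alpha + (1 - \alpha) \sum_{u \to v} \frac{PR_k(u)}{\deg^{+}(u)},
\qquad PR_0(v) = 1, \qquad k = 0, 1, \dots, \text{maxIter} - 1
```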

In the next cell, you will run the PageRank algorithm on the station graph:
* Run the PageRank algorithm with a reset probability of 0.15 and a maximum of 10 iterations
* Show the top 10 vertices by PageRank

In [24]:
ranks = station_graph.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.orderBy(desc("pagerank")).select("id", "pagerank").show(10)

+--------------------+------------------+
|                  id|          pagerank|
+--------------------+------------------+
|San Jose Diridon ...| 4.051504835990018|
|San Francisco Cal...|3.3511832964286956|
|Mountain View Cal...| 2.514390771015543|
|Redwood City Calt...|2.3263087713711705|
|San Francisco Cal...|2.2311442913698825|
|Harry Bridges Pla...| 1.825112011888247|
|     2nd at Townsend|1.5821217785038686|
|Santa Clara at Al...| 1.573007408490758|
|     Townsend at 7th|1.5684565805340542|
|Embarcadero at Sa...|1.5414242087748584|
+--------------------+------------------+
only showing top 10 rows
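The sketch below implements a fixed-iteration, un-normalized PageRank update in plain Python. Every vertex starts at 1.0, and on each iteration a vertex's new rank is `reset + (1 - reset) * sum(rank(u) / out_degree(u))` over its incoming edges. The graph is a hypothetical four-station graph in which parallel edges count separately, as in the trip graph. This illustrates the idea under those assumptions and is not GraphFrames' implementation. In particular, it lets rank from vertices with no outgoing edges disappear, and the toy graph has none. With no such vertices, the ranks always sum to the number of vertices.

```python
from collections import defaultdict


def pagerank(edges, reset=0.15, max_iter=10):
    """Un-normalized PageRank with a fixed number of iterations.

    edges: list of (src, dst) pairs; parallel edges are counted separately.
    """
    vertices = {v for e in edges for v in e}
    out_degree = defaultdict(int)
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 for v in vertices}  # assumed initial value
    for _ in range(max_iter):
        incoming = defaultdict(float)
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]  # src splits its rank over its edges
        ranks = {v: reset + (1 - reset) * incoming[v] for v in vertices}
    return ranks


# Hypothetical trips: most of them end at "Hub"
trips = [("A", "Hub"), ("B", "Hub"), ("C", "Hub"), ("A", "Hub"),
         ("Hub", "A"), ("Hub", "B"), ("Hub", "C"), ("B", "A")]
ranks = pagerank(trips)
for station, rank in sorted(ranks.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{station:>4} {rank:.3f}")
```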

