# Integrating GraphFrames package

In [1]:
from pyspark import StorageLevel
from pyspark.sql import functions as F, SQLContext, SparkSession, Window
from pyspark.sql.types import *
from random import randint
import time
import datetime

spark = (SparkSession.builder
         .appName("graphframes")
         .master("spark://spark-master:7077")
         .config("spark.sql.legacy.allowUntypedScalaUDF", True)
         .config("spark.jars.packages", "graphframes:graphframes:0.8.1-spark3.0-s_2.12")
         .enableHiveSupport()
         .getOrCreate()
         )

In [2]:
import sys
sys.path

['/opt/workspace',
 '/tmp/spark-9dc29354-102a-4c2c-8246-be3366f9f568/userFiles-ea629c38-9f63-4d21-bcd0-67cbd4b52561/org.slf4j_slf4j-api-1.7.16.jar',
 '/tmp/spark-9dc29354-102a-4c2c-8246-be3366f9f568/userFiles-ea629c38-9f63-4d21-bcd0-67cbd4b52561/graphframes_graphframes-0.8.1-spark3.0-s_2.12.jar',
 '/tmp/spark-9dc29354-102a-4c2c-8246-be3366f9f568/userFiles-ea629c38-9f63-4d21-bcd0-67cbd4b52561',
 '/usr/lib/python37.zip',
 '/usr/lib/python3.7',
 '/usr/lib/python3.7/lib-dynload',
 '',
 '/usr/local/lib/python3.7/dist-packages',
 '/usr/lib/python3/dist-packages',
 '/usr/local/lib/python3.7/dist-packages/IPython/extensions',
 '/root/.ipython']

In [3]:
import graphframes

dir(graphframes.graphframe)

['Column',
 'DataFrame',
 'GraphFrame',
 'Pregel',
 'SQLContext',
 'SparkContext',
 'StorageLevel',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 '_from_java_gf',
 '_java_api',
 '_test',
 'basestring',
 'sys']

In [4]:
from pyspark.sql.functions import *

# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])

In [5]:
from graphframes import *
g = GraphFrame(v, e)

In [6]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+



In [7]:
g.edges.filter("relationship = 'follow'").count()

2

In [None]:
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()

## Bike Rides

In [50]:
bikeStations = spark.read.option("header",True).csv("data/station.csv")
bikeStations.printSchema()
tripData = spark.read.option("header",True).csv("data/trip.csv")
tripData.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dock_count: string (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



### Prepare Vertices

In [60]:
stationVertices = bikeStations.distinct()
stationVertices.show(truncate=False)

+---+---------------------------------+------------------+-------------------+----------+-------------+-----------------+
|id |name                             |lat               |long               |dock_count|city         |installation_date|
+---+---------------------------------+------------------+-------------------+----------+-------------+-----------------+
|67 |Market at 10th                   |37.776619000000004|-122.41738500000001|27        |San Francisco|8/23/2013        |
|10 |San Jose City Hall               |37.337391         |-121.886995        |15        |San Jose     |8/6/2013         |
|11 |MLK Library                      |37.335885         |-121.88566000000002|19        |San Jose     |8/6/2013         |
|34 |Palo Alto Caltrain Station       |37.443988         |-122.164759        |23        |Palo Alto    |8/14/2013        |
|42 |Davis at Jackson                 |37.79728          |-122.398436        |15        |San Francisco|8/19/2013        |
|32 |Castro Street and E

### Prepare Edges

In [None]:
tripEdges = tripData\
    .withColumnRenamed("start_station_id", "src")\
    .withColumnRenamed("end_station_id", "dst")\
    .select('src', 'dst', 'duration', 'bike_id')
tripEdges.show(truncate=False)

### Initialize the GraphFrame

In [62]:
stationGraph = GraphFrame(stationVertices, tripEdges)

### Simple Graph computations

In [59]:
print("Total Number of Stations: " + str(stationGraph.vertices.count()))
print("Total Number of Trips in Graph: " + str(stationGraph.edges.count()))
print("Total Number of Trips in Original Data: " + str(tripData.count()))

Total Number of Stations: 70
Total Number of Trips in Graph: 669959
Total Number of Trips in Original Data: 669959


### Most popular trips

In [54]:
topTrips = stationGraph\
    .edges\
    .groupBy("src", "dst")\
    .count()\
    .orderBy(desc("count"))\
    .limit(10)

topTrips.show(truncate=False)

+----------------------------------------+----------------------------------------+-----+
|src                                     |dst                                     |count|
+----------------------------------------+----------------------------------------+-----+
|San Francisco Caltrain 2 (330 Townsend) |Townsend at 7th                         |6216 |
|Harry Bridges Plaza (Ferry Building)    |Embarcadero at Sansome                  |6164 |
|Townsend at 7th                         |San Francisco Caltrain (Townsend at 4th)|5041 |
|2nd at Townsend                         |Harry Bridges Plaza (Ferry Building)    |4839 |
|Harry Bridges Plaza (Ferry Building)    |2nd at Townsend                         |4357 |
|Embarcadero at Sansome                  |Steuart at Market                       |4269 |
|Embarcadero at Folsom                   |San Francisco Caltrain (Townsend at 4th)|3967 |
|Steuart at Market                       |2nd at Townsend                         |3903 |
|2nd at So

### In Degree

In [16]:
inDeg = stationGraph.inDegrees
inDeg\
    .orderBy(desc("inDegree"))\
    .limit(5)\
    .show(truncate=False)

+----------------------------------------+--------+
|id                                      |inDegree|
+----------------------------------------+--------+
|San Francisco Caltrain (Townsend at 4th)|63179   |
|San Francisco Caltrain 2 (330 Townsend) |35117   |
|Harry Bridges Plaza (Ferry Building)    |33193   |
|Embarcadero at Sansome                  |30796   |
|2nd at Townsend                         |28529   |
+----------------------------------------+--------+



### Out Degree

In [17]:
outDeg = stationGraph.outDegrees
outDeg\
    .orderBy(desc("outDegree"))\
    .limit(5)\
    .show(truncate=False)

+---------------------------------------------+---------+
|id                                           |outDegree|
+---------------------------------------------+---------+
|San Francisco Caltrain (Townsend at 4th)     |49092    |
|San Francisco Caltrain 2 (330 Townsend)      |33742    |
|Harry Bridges Plaza (Ferry Building)         |32934    |
|Embarcadero at Sansome                       |27713    |
|Temporary Transbay Terminal (Howard at Beale)|26089    |
+---------------------------------------------+---------+



### Degree Ratio

In [18]:
degreeRatio = inDeg\
    .join(outDeg, inDeg["id"] == outDeg["id"])\
    .drop(outDeg["id"])\
    .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")

degreeRatio.cache()

DataFrame[id: string, degreeRatio: double]

#### Descending

In [19]:
degreeRatio\
    .orderBy(desc("degreeRatio"))\
    .limit(10)\
    .show(truncate=False)

+----------------------------------------+------------------+
|id                                      |degreeRatio       |
+----------------------------------------+------------------+
|Redwood City Medical Center             |1.4533762057877813|
|Redwood City Public Library             |1.300469483568075 |
|San Francisco Caltrain (Townsend at 4th)|1.286951030717836 |
|Washington at Kearny                    |1.2723671947809878|
|MLK Library                             |1.233038348082596 |
|SJSU 4th at San Carlos                  |1.2282051282051283|
|San Mateo County Center                 |1.2195121951219512|
|Broadway at Main                        |1.208955223880597 |
|University and Emerson                  |1.2056878306878307|
|Washington at Kearney                   |1.203804347826087 |
+----------------------------------------+------------------+



#### Ascending

In [20]:
degreeRatio\
    .orderBy(asc("degreeRatio"))\
    .limit(10)\
    .show(truncate=False)

+-------------------------------+------------------+
|id                             |degreeRatio       |
+-------------------------------+------------------+
|Grant Avenue at Columbus Avenue|0.564700110388814 |
|2nd at Folsom                  |0.6056461731493099|
|Powell at Post (Union Square)  |0.6887003841229193|
|San Jose City Hall             |0.6928541579607188|
|San Francisco City Hall        |0.7497243660418964|
|Beale at Market                |0.774906104780699 |
|Redwood City Caltrain Station  |0.8075933075933076|
|Golden Gate at Polk            |0.8153091800599291|
|Evelyn Park and Ride           |0.8218356328734253|
|Ryland Park                    |0.8248425872925015|
+-------------------------------+------------------+



### Shortest Paths

In [65]:
stationGraph.shortestPaths(['10']).show(200)

+---+--------------------+------------------+-------------------+----------+-------------+-----------------+---------+
| id|                name|               lat|               long|dock_count|         city|installation_date|distances|
+---+--------------------+------------------+-------------------+----------+-------------+-----------------+---------+
| 25|Stanford in Redwo...|          37.48537|-122.20328799999999|        15| Redwood City|        8/12/2013|[10 -> 3]|
| 51|Embarcadero at Fo...|37.791464000000005|        -122.391034|        19|San Francisco|        8/20/2013|[10 -> 2]|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|     San Jose|         8/7/2013|[10 -> 1]|
| 54|Embarcadero at Br...|         37.787152|-122.38801299999999|        15|San Francisco|        8/20/2013|[10 -> 2]|
| 82|Broadway St at Ba...|         37.798541|-122.40086200000002|        15|San Francisco|        1/22/2014|[10 -> 2]|
| 11|         MLK Library|         37.335885|-12

## Explore the dataset

In [68]:
tripEdges.describe().show()

+-------+------------------+------------------+------------------+------------------+
|summary|               src|               dst|          duration|           bike_id|
+-------+------------------+------------------+------------------+------------------+
|  count|            669959|            669959|            669959|            669959|
|   mean| 57.85187601032302|57.837437813358726|1107.9498461846172|427.58761954089726|
| stddev|17.112473968397165|17.200141653222516|22255.437020085345|153.45098794073212|
|    min|                10|                10|               100|                10|
|    max|                 9|                 9|             99993|                99|
+-------+------------------+------------------+------------------+------------------+



## Recommend free route

1. Calculate average durations src->dst
2. Filter out durations > 30min
3. Map BFS output to the first occurance of the duration
4. Filter the min sum of durations

In [86]:
from pyspark.sql.functions import col, mean

w = Window.partitionBy([col("src"), col("dst")])
uniqueEdges = tripEdges.withColumn("mean_duration", mean('duration').over(w)).select('mean_duration', 'src', 'dst').dropDuplicates()

In [87]:
uniqueEdges.show()
uniqueEdges.cache()

+------------------+---+---+
|     mean_duration|src|dst|
+------------------+---+---+
| 749.3030303030303|  5| 10|
|2443.9397590361446| 50| 46|
|340.36209335219235| 55| 41|
|1012.9285714285714| 58| 62|
|1558.3411764705882| 65| 73|
| 823.8023076923076| 69| 77|
| 803.2891832229581| 71| 77|
| 692.3253588516747| 72| 66|
| 901.3134328358209| 73| 68|
|3666.3333333333335| 28| 37|
|            3006.0| 37| 24|
|1189.9722222222222| 56| 74|
|440.93218720152817| 68| 70|
| 4867.768518518518| 70| 69|
|1618.0833333333333| 76| 73|
| 610.9076923076923| 10|  5|
|1047.0588235294117| 11| 14|
|1651.1653543307086| 51| 56|
| 1550.764705882353| 54| 58|
| 829.9788732394367| 55| 73|
+------------------+---+---+
only showing top 20 rows



DataFrame[mean_duration: double, src: string, dst: string]

In [88]:
graph = GraphFrame(stationVertices, uniqueEdges)

In [89]:
bfs = graph.bfs(fromExpr='id=10', toExpr='id=66')

In [90]:
bfs = bfs.cache()

In [120]:
ecols = [col + ".mean_duration" for col in bfs.columns if col.startswith("e")]
ecols

['e0.mean_duration', 'e1.mean_duration', 'e2.mean_duration']

In [123]:
from functools import reduce
from operator import add
bfs.withColumn('total',reduce(add, [F.col(x) for x in ecols])).orderBy('total').show()

+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+
|                from|                  e0|                  v1|               e1|                  v2|                  e2|                  to|             total|
+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+
|[10, San Jose Cit...|[470.833333333333...|[7, Paseo de San ...|  [7798.0, 7, 76]|[76, Market at 4t...|[1063.47669172932...|[66, South Van Ne...| 9332.310025062658|
|[10, San Jose Cit...|[679.779661016949...|[3, San Jose Civi...| [12088.0, 3, 72]|[72, Civic Center...|[692.325358851674...|[66, South Van Ne...|13460.105019868624|
|[10, San Jose Cit...|[904.014084507042...|[11, MLK Library,...|[29942.0, 11, 76]|[76, Market at 4t...|[1063.47669172932...|[66, South Van Ne...|31909.490776236365|
|[10, San 

In [48]:
spark.stop()