In [11]:
from graphframes import *
from pyspark import *
import pandas as pd
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import *
from functools import reduce
from pyspark.sql.functions import col, lit, when


In [12]:
#to inite new spark session instance
spark = SparkSession.builder.appName("graphProcessing").config("spark.master", "local").getOrCreate()

In [13]:
#Part(1)(1): To import the dataset as a csv file and create data frames directly on import 

station_data_RDD = spark.read.csv("input/201508_station_data.csv", header=True, inferSchema=True)
trip_data_RDD = spark.read.csv("input/201508_trip_data.csv", header=True, inferSchema=True)


#To Print RDDs Schema:
station_data_RDD.printSchema()
trip_data_RDD.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [14]:
#To show first 5 records of each RDD
station_data_RDD.show(5)


+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows



In [15]:
#To show first 5 records of each RDD
trip_data_RDD.show(5)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

In [16]:
#Part(1)(3): Remove duplicates
# Remove duplicate entries from stations RDD
station_data_RDD_NoDUP = station_data_RDD.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(station_data_RDD.count(), station_data_RDD_NoDUP.count()))


# Remove duplicate entries from trips RDD
trip_data_RDD_NoDUP = trip_data_RDD.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(trip_data_RDD.count(), trip_data_RDD_NoDUP.count()))


There were 70 rows before removing duplicates, and 70 rows after removing duplicates
There were 51927 rows before removing duplicates, and 51927 rows after removing duplicates


In [17]:
#Part(1)(4):Name Columns
station_data_RDD = station_data_RDD.withColumnRenamed("station_id", "id")
trip_data_RDD = trip_data_RDD.withColumnRenamed("Start Terminal", "src")
trip_data_RDD = trip_data_RDD.withColumnRenamed("End Terminal", "dst")
trip_data_RDD = trip_data_RDD.withColumnRenamed("Trip ID", "trip")


In [18]:
#Part(1)(5): Output Data Frame
station_data_RDD.show(5)
trip_data_RDD.show(5)

+---+--------------------+---------+-----------+---------+--------+------------+
| id|                name|      lat|       long|dockcount|landmark|installation|
+---+--------------------+---------+-----------+---------+--------+------------+
|  2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|  3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|  4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|  5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|  6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
+---+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

+------+--------+---------------+--------------------+---+---------------+--------------------+---+------+---------------+--------+
|  trip|Duration|     Start Date|       Start Station|src|       End Date|         End Station|dst|Bike #|Subscrib

In [19]:
#Part(1)(6):Create vertices
#To create temporary view of RDDs
station_data_RDD.createOrReplaceTempView("stations")
trip_data_RDD.createOrReplaceTempView("trips")
vertices = spark.sql("SELECT id, name FROM stations")
edges = spark.sql("SELECT src, dst, trip FROM trips")
vertices.show(5)
edges.show(5)

+---+--------------------+
| id|                name|
+---+--------------------+
|  2|San Jose Diridon ...|
|  3|San Jose Civic Ce...|
|  4|Santa Clara at Al...|
|  5|    Adobe on Almaden|
|  6|    San Pedro Square|
+---+--------------------+
only showing top 5 rows

+---+---+------+
|src|dst|  trip|
+---+---+------+
| 50| 70|913460|
| 31| 27|913459|
| 47| 64|913455|
| 10|  8|913454|
| 51| 60|913453|
+---+---+------+
only showing top 5 rows



In [24]:

graph = GraphFrame(vertices, edges)

Py4JJavaError: An error occurred while calling o196.createGraph.
: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
	at org.graphframes.GraphFrame$.apply(GraphFrame.scala:609)
	at org.graphframes.GraphFramePythonAPI.createGraph(GraphFramePythonAPI.scala:10)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
#Part(1)(7):Show some vertices


In [None]:
#Part(1)(8):Show some edges


In [None]:
#Part(1)(9):Vertex in-Degree


In [None]:
#Part(1)(10):Vertex out-Degree


In [None]:
#Part(1)(11):Apply the motif findings.
