# Configuration

In [1]:
# Initialise findspark before pyspark is imported in the following cell.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession 
# Create (or reuse) the Spark session for this notebook:
# local master with 4 threads, application name "bikerental".
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("bikerental")
    .getOrCreate()
)

# Read and Preprocess Data

In [3]:
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import month
from pyspark.sql.functions import year
from pyspark.sql.functions import expr 

## Stations

In [4]:
# Load the station CSV; the header row supplies the column names.
# No schema is given, so every column arrives as string (see printed schema).
stationDF = (
    spark.read
    .option("header", True)
    .csv("file:/home/spark/Develop/data/201508_station_data.csv")
)
stationDF.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [5]:
# Convert station_id from string to long.
# The column is overwritten in place so that this cell can be executed more
# than once: going through a temporary "station_id_old" rename would create a
# second column of that name on a re-run and make the reference ambiguous.
stationDF = stationDF.withColumn("station_id", col("station_id").cast("long"))

In [6]:
# Convert dockcount from string to long.
# The column is overwritten in place so that this cell can be executed more
# than once: going through a temporary "dockcount_old" rename would create a
# second column of that name on a re-run and make the reference ambiguous.
stationDF = stationDF.withColumn("dockcount", col("dockcount").cast("long"))

In [7]:
# Keep only the columns the analysis needs (drops lat, long, installation and
# any helper columns left over from the type conversions), then show the schema.
stationDF = stationDF.select(
    "station_id",
    "name",
    "dockcount",
    "landmark",
)
stationDF.printSchema()

root
 |-- station_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dockcount: long (nullable = true)
 |-- landmark: string (nullable = true)



In [8]:
# Register the station DataFrame as temp view "station" so the spark.sql()
# queries below can refer to it by name.
stationDF.createOrReplaceTempView("station")

## Trips

In [10]:
# Load the trip CSV; the header row supplies the column names.
# No schema is given, so every column arrives as string (see printed schema).
tripDF = (
    spark.read
    .option("header", True)
    .csv("file:/home/spark/Develop/data/201508_trip_data.csv")
)
tripDF.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Bike #: string (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [11]:
# Convert the trip id from string to long.
# The new column is derived directly from the raw "Trip ID" column instead of
# renaming it to "trip_id_old" first, so re-running this cell simply recomputes
# trip_id rather than failing on a duplicated helper column. The raw column is
# dropped later by the column selection.
tripDF = tripDF.withColumn("trip_id", col("Trip ID").cast("long"))

In [12]:
# Parse the start date string into a timestamp.
# The new column is derived directly from the raw "Start Date" column instead
# of renaming it to "start_date_old" first, so re-running this cell simply
# recomputes start_date rather than failing on a duplicated helper column.
# NOTE(review): the pattern uses two-letter MM/dd/HH fields; newer Spark
# releases parse patterns more strictly - confirm the CSV values are
# zero-padded before upgrading Spark.
tripDF = tripDF.withColumn("start_date", to_timestamp(col("Start Date"), "MM/dd/yyyy HH:mm"))

In [13]:
# Split the start timestamp into day-of-month, month and year columns so the
# SQL queries below can group and filter on them directly.
tripDF = (
    tripDF
    .withColumn("start_date_day", dayofmonth(col("start_date")))
    .withColumn("start_date_month", month(col("start_date")))
    .withColumn("start_date_year", year(col("start_date")))
)

In [14]:
# Convert the start terminal (station id) from string to long.
# The new column is derived directly from the raw "Start Terminal" column
# instead of renaming it to "start_station_id_old" first, so re-running this
# cell simply recomputes start_station_id rather than failing on a duplicated
# helper column.
tripDF = tripDF.withColumn("start_station_id", col("Start Terminal").cast("long"))

In [15]:
# Parse the end date string into a timestamp.
# The new column is derived directly from the raw "End Date" column instead of
# renaming it to "end_date_old" first, so re-running this cell simply
# recomputes end_date rather than failing on a duplicated helper column.
# NOTE(review): the pattern uses two-letter MM/dd/HH fields; newer Spark
# releases parse patterns more strictly - confirm the CSV values are
# zero-padded before upgrading Spark.
tripDF = tripDF.withColumn("end_date", to_timestamp(col("End Date"), "MM/dd/yyyy HH:mm"))

In [16]:
# Convert the end terminal (station id) from string to long.
# The new column is derived directly from the raw "End Terminal" column
# instead of renaming it to "end_station_id_old" first, so re-running this
# cell simply recomputes end_station_id rather than failing on a duplicated
# helper column.
tripDF = tripDF.withColumn("end_station_id", col("End Terminal").cast("long"))

In [17]:
# Convert duration from string to long.
# The column is cast in place instead of being renamed to "duration_old"
# first, so re-running this cell does not fail on a duplicated helper column.
# With Spark's default case-insensitive column resolution the raw "Duration"
# column is replaced by the long-typed "duration" column.
tripDF = tripDF.withColumn("duration", col("Duration").cast("long"))

In [18]:
# Convert the bike number from string to long.
# The new column is derived directly from the raw "Bike #" column instead of
# renaming it to "bike_old" first, so re-running this cell simply recomputes
# bike rather than failing on a duplicated helper column.
tripDF = tripDF.withColumn("bike", col("Bike #").cast("long"))

In [19]:
# Keep only the typed columns used by the analysis; the raw string columns and
# the unused subscriber/zip information are dropped. Then show the schema.
tripDF = tripDF.select(
    "trip_id",
    "duration",
    "start_date",
    "start_date_day",
    "start_date_month",
    "start_date_year",
    "start_station_id",
    "end_date",
    "end_station_id",
    "bike",
)
tripDF.printSchema()

root
 |-- trip_id: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_date_day: integer (nullable = true)
 |-- start_date_month: integer (nullable = true)
 |-- start_date_year: integer (nullable = true)
 |-- start_station_id: long (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_id: long (nullable = true)
 |-- bike: long (nullable = true)



In [20]:
# Register the trip DataFrame as temp view "trip" so the spark.sql() queries
# below can refer to it by name.
tripDF.createOrReplaceTempView("trip")

# Analysis

## Number of Bikes

In [21]:
# Number of distinct bike ids that appear in the trip data.
tripDF.select("bike").distinct().count()

668

## Number of Rental Stations

In [22]:
# Total number of stations in the station table.
stationDF.count()

70

In [23]:
# Number of stations whose landmark is San Francisco.
stationDF.filter('landmark = "San Francisco"').count()

35

## Station with the Largest Number of Docks

In [24]:
# Station with the largest dock count over all landmarks.
# More than one station has the maximum dock count, so ordering by dockcount
# alone leaves it undefined which row LIMIT 1 returns. station_id is used as a
# secondary sort key so the result is deterministic (lowest id among the ties).
resultDF = spark.sql(
    "SELECT station_id, dockcount "
    "FROM station "
    "ORDER BY dockcount DESC, station_id ASC "
    "LIMIT 1"
)
resultDF.collect()

[Row(station_id=2, dockcount=27)]

In [25]:
# Station with the largest dock count within San Francisco.
# Ordering by dockcount alone leaves it undefined which row LIMIT 1 returns
# when several stations share the maximum; station_id is used as a secondary
# sort key so the result is deterministic (lowest id among any ties).
resultDF = spark.sql(
    "SELECT station_id, dockcount "
    "FROM station "
    "WHERE landmark = \"San Francisco\" "
    "ORDER BY dockcount DESC, station_id ASC "
    "LIMIT 1"
)
resultDF.collect()

[Row(station_id=61, dockcount=27)]

## Duration of Bike Trips

In [26]:
# Average trip duration over all trips.
resultDF = spark.sql(
    "SELECT avg(duration) as duration_avg "
    "FROM trip"
)
resultDF.collect()

[Row(duration_avg=1046.0326611172604)]

In [27]:
# Shortest trip duration over all trips.
resultDF = spark.sql(
    "SELECT min(duration) as duration_min "
    "FROM trip"
)
resultDF.collect()

[Row(duration_min=60)]

In [28]:
# Longest trip duration over all trips.
resultDF = spark.sql(
    "SELECT max(duration) as duration_max "
    "FROM trip"
)
resultDF.collect()

[Row(duration_max=17270400)]

## Number of Bike Trips

In [29]:
# overall: total number of trips in the data set
tripDF.count()

354152

In [30]:
# Trips per start station, busiest stations first; show the top ten.
resultDF = spark.sql(
    "SELECT start_station_id, count(trip_id) as nbr_trips "
    "FROM trip "
    "GROUP BY start_station_id "
    "ORDER BY nbr_trips DESC"
)
resultDF.take(10)

[Row(start_station_id=70, nbr_trips=26304),
 Row(start_station_id=69, nbr_trips=21758),
 Row(start_station_id=50, nbr_trips=17255),
 Row(start_station_id=55, nbr_trips=14436),
 Row(start_station_id=60, nbr_trips=14158),
 Row(start_station_id=61, nbr_trips=14026),
 Row(start_station_id=65, nbr_trips=13752),
 Row(start_station_id=74, nbr_trips=13687),
 Row(start_station_id=67, nbr_trips=11885),
 Row(start_station_id=77, nbr_trips=11431)]

In [31]:
# Trips per bike, most used bikes first; show the top ten.
resultDF = spark.sql(
    "SELECT bike, count(trip_id) as nbr_trips "
    "FROM trip "
    "GROUP BY bike "
    "ORDER BY nbr_trips DESC"
)
resultDF.take(10)

[Row(bike=878, nbr_trips=1121),
 Row(bike=392, nbr_trips=1102),
 Row(bike=489, nbr_trips=1101),
 Row(bike=463, nbr_trips=1085),
 Row(bike=532, nbr_trips=1074),
 Row(bike=558, nbr_trips=1071),
 Row(bike=306, nbr_trips=1060),
 Row(bike=29, nbr_trips=1057),
 Row(bike=66, nbr_trips=1053),
 Row(bike=589, nbr_trips=1052)]

## Roundtrips

In [32]:
# Overall share of roundtrips: trips that end at the station they started
# from, divided by the total number of trips (printed as a fraction).
resultDF = spark.sql(
    "SELECT trip_id "
    "FROM trip "
    "WHERE start_station_id = end_station_id"
)
print(resultDF.count() / float(tripDF.count()))

0.029015789830355326


In [33]:
# Roundtrip share per start station.

# Step 1: number of trips starting at each station.
tripsPerStationDF = spark.sql(
    "SELECT count(trip_id) as nbr_trips, start_station_id "
    "FROM trip "
    "GROUP BY start_station_id"
)
tripsPerStationDF.createOrReplaceTempView("trips_per_station")

# Step 2: number of those trips that also end at the same station.
roundtripsPerStationDF = spark.sql(
    "SELECT count(trip_id) as nbr_roundtrips, start_station_id "
    "FROM trip "
    "WHERE start_station_id = end_station_id "
    "GROUP BY start_station_id"
)
roundtripsPerStationDF.createOrReplaceTempView("roundtrips_per_station")

# Step 3: join both counts and compute the ratio, highest first.
# The inner join means stations without any roundtrip do not appear, and
# despite its name roundtrip_percentage is a fraction between 0 and 1.
resultDF = spark.sql(
    "SELECT trips_per_station.start_station_id, nbr_roundtrips / nbr_trips as roundtrip_percentage "
    "FROM trips_per_station JOIN roundtrips_per_station ON trips_per_station.start_station_id = roundtrips_per_station.start_station_id "
    "ORDER BY roundtrip_percentage DESC"
)
resultDF.take(10)

[Row(start_station_id=35, roundtrip_percentage=0.5752066115702479),
 Row(start_station_id=23, roundtrip_percentage=0.3700787401574803),
 Row(start_station_id=3, roundtrip_percentage=0.2441860465116279),
 Row(start_station_id=36, roundtrip_percentage=0.2125),
 Row(start_station_id=24, roundtrip_percentage=0.19491525423728814),
 Row(start_station_id=14, roundtrip_percentage=0.160741885625966),
 Row(start_station_id=21, roundtrip_percentage=0.14814814814814814),
 Row(start_station_id=34, roundtrip_percentage=0.14516129032258066),
 Row(start_station_id=37, roundtrip_percentage=0.11458333333333333),
 Row(start_station_id=25, roundtrip_percentage=0.11238532110091744)]

## Most Bike Trips Per Day

In [34]:
# Trips per calendar day over the whole data set, busiest days first;
# show the top ten.
resultDF = spark.sql(
    "SELECT start_date_year, start_date_month, start_date_day, count(trip_id) as nbr_trips "
    "FROM trip "
    "GROUP BY start_date_year, start_date_month, start_date_day "
    "ORDER BY nbr_trips DESC"
)
resultDF.take(10)

[Row(start_date_year=2014, start_date_month=9, start_date_day=15, nbr_trips=1516),
 Row(start_date_year=2014, start_date_month=10, start_date_day=29, nbr_trips=1496),
 Row(start_date_year=2014, start_date_month=10, start_date_day=14, nbr_trips=1496),
 Row(start_date_year=2015, start_date_month=8, start_date_day=26, nbr_trips=1465),
 Row(start_date_year=2014, start_date_month=10, start_date_day=16, nbr_trips=1462),
 Row(start_date_year=2014, start_date_month=10, start_date_day=2, nbr_trips=1452),
 Row(start_date_year=2015, start_date_month=7, start_date_day=28, nbr_trips=1451),
 Row(start_date_year=2015, start_date_month=8, start_date_day=27, nbr_trips=1443),
 Row(start_date_year=2014, start_date_month=9, start_date_day=16, nbr_trips=1438),
 Row(start_date_year=2015, start_date_month=6, start_date_day=11, nbr_trips=1437)]

In [35]:
# Busiest days for trips that start at a Palo Alto station.

# Ids of all stations whose landmark is Palo Alto, exposed as view "paloalto".
paloAltoDF = spark.sql(
    "SELECT station_id "
    "FROM station "
    "WHERE landmark = \"Palo Alto\""
)
paloAltoDF.createOrReplaceTempView("paloalto")

# Count trips per calendar day, restricted to Palo Alto start stations.
# NOTE(review): the WHERE clause additionally limits the count to roundtrips
# (start station equals end station), which the section heading does not
# mention - confirm this filter is intended.
resultDF = spark.sql(
    "SELECT start_date_year, start_date_month, start_date_day, count(trip_id) as nbr_trips "
    "FROM trip JOIN paloalto ON trip.start_station_id = paloalto.station_id "
    "WHERE start_station_id = end_station_id "
    "GROUP BY start_date_year, start_date_month, start_date_day "
    "ORDER BY nbr_trips DESC"
)
resultDF.take(10)

[Row(start_date_year=2014, start_date_month=10, start_date_day=5, nbr_trips=18),
 Row(start_date_year=2014, start_date_month=9, start_date_day=7, nbr_trips=12),
 Row(start_date_year=2015, start_date_month=7, start_date_day=11, nbr_trips=11),
 Row(start_date_year=2015, start_date_month=4, start_date_day=11, nbr_trips=11),
 Row(start_date_year=2015, start_date_month=1, start_date_day=4, nbr_trips=10),
 Row(start_date_year=2015, start_date_month=8, start_date_day=8, nbr_trips=9),
 Row(start_date_year=2015, start_date_month=7, start_date_day=21, nbr_trips=9),
 Row(start_date_year=2015, start_date_month=7, start_date_day=26, nbr_trips=9),
 Row(start_date_year=2014, start_date_month=9, start_date_day=13, nbr_trips=8),
 Row(start_date_year=2015, start_date_month=8, start_date_day=9, nbr_trips=8)]

In [36]:
# Busiest day of each month of 2014.

# Months of 2014 that actually occur in the trip data, in ascending order.
months = spark.sql(
    "SELECT DISTINCT start_date_month "
    "FROM trip "
    "WHERE start_date_year = 2014 "
    "ORDER BY start_date_month"
).collect()

# One query per month: group that month's trips by day and keep the top row.
for month_row in months:
    busiest_day = spark.sql(
        (
            "SELECT start_date_year, start_date_month, start_date_day, count(trip_id) as nbr_trips "
            "FROM trip "
            "WHERE start_date_year = 2014 AND start_date_month = {} "
            "GROUP BY start_date_year, start_date_month, start_date_day "
            "ORDER BY nbr_trips DESC "
            "LIMIT 1"
        ).format(month_row["start_date_month"])
    ).first()
    print(busiest_day)
    

Row(start_date_year=2014, start_date_month=9, start_date_day=15, nbr_trips=1516)
Row(start_date_year=2014, start_date_month=10, start_date_day=29, nbr_trips=1496)
Row(start_date_year=2014, start_date_month=11, start_date_day=6, nbr_trips=1410)
Row(start_date_year=2014, start_date_month=12, start_date_day=8, nbr_trips=1363)


In [37]:
# All days of 2014 with their trip counts, ordered by month and, within each
# month, by descending trip count; show the first ten rows.
resultDF = spark.sql(
    "SELECT start_date_day, start_date_month, count(trip_id) as nbr_trips "
    "FROM trip "
    "WHERE start_date_year = 2014 "
    "GROUP BY start_date_month, start_date_day "
    "ORDER BY start_date_month, nbr_trips DESC"
)
resultDF.take(10)

[Row(start_date_day=15, start_date_month=9, nbr_trips=1516),
 Row(start_date_day=16, start_date_month=9, nbr_trips=1438),
 Row(start_date_day=17, start_date_month=9, nbr_trips=1429),
 Row(start_date_day=3, start_date_month=9, nbr_trips=1404),
 Row(start_date_day=4, start_date_month=9, nbr_trips=1389),
 Row(start_date_day=11, start_date_month=9, nbr_trips=1381),
 Row(start_date_day=9, start_date_month=9, nbr_trips=1362),
 Row(start_date_day=23, start_date_month=9, nbr_trips=1362),
 Row(start_date_day=22, start_date_month=9, nbr_trips=1353),
 Row(start_date_day=10, start_date_month=9, nbr_trips=1351)]

In [38]:
# Print the physical execution plan of the query assigned to resultDF in the
# previous cell.
resultDF.explain()

== Physical Plan ==
*(3) Sort [start_date_month#163 ASC NULLS FIRST, nbr_trips#610L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(start_date_month#163 ASC NULLS FIRST, nbr_trips#610L DESC NULLS LAST, 200)
   +- *(2) HashAggregate(keys=[start_date_month#163, start_date_day#148], functions=[count(trip_id#108L)])
      +- Exchange hashpartitioning(start_date_month#163, start_date_day#148, 200)
         +- *(1) HashAggregate(keys=[start_date_month#163, start_date_day#148], functions=[partial_count(trip_id#108L)])
            +- *(1) Project [cast(Trip ID#74 as bigint) AS trip_id#108L, dayofmonth(cast(cast(unix_timestamp(Start Date#76, MM/dd/yyyy HH:mm, Some(Europe/Zurich)) as timestamp) as date)) AS start_date_day#148, month(cast(cast(unix_timestamp(Start Date#76, MM/dd/yyyy HH:mm, Some(Europe/Zurich)) as timestamp) as date)) AS start_date_month#163]
               +- *(1) Filter (year(cast(cast(unix_timestamp(Start Date#76, MM/dd/yyyy HH:mm, Some(Europe/Zurich)) as timestamp) as