# Trip, Station Data Analysis in Spark

In [25]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from datetime import datetime

import sys
import operator
import re, string
assert sys.version_info >= (3, 5)  # make sure we have Python 3.5+
DATA_PATH='/user/chowkec/capitalbikeshare/data/'
OUTPUT_PATH='/user/chowkec/capitalbikeshare/output/'
spark = SparkSession.builder.appName('analysis').getOrCreate()
#assert spark.version >= '2.4' # make sure we have Spark 2.4+
spark.sparkContext.setLogLevel('WARN')

## Load trip and station data

In [26]:
# create trip data schema
tripdata_schema = types.StructType([
    types.StructField('duration', types.IntegerType()),
    types.StructField('start_date', types.TimestampType()),
    types.StructField('end_date', types.TimestampType()),
    types.StructField('start_station_number', types.StringType()),
    types.StructField('start_station', types.StringType()),
    types.StructField('end_station_number', types.StringType()),
    types.StructField('end_station', types.StringType()),
    types.StructField('bike_number', types.StringType()),
    types.StructField('member_type', types.StringType()),  
])

In [27]:
# read trip info data
trip_data = spark.read.csv(DATA_PATH+'trip', header=True, schema=tripdata_schema)
trip_data.createOrReplaceTempView('tripdata_table')

In [27]:
# read trip info data
df=

In [28]:
# read station info data
station_data = spark.read.csv(DATA_PATH+"stationinfo", header=True)
station_data.createOrReplaceTempView('stationdata_table')

In [29]:
# Create station info dataframe with useful info 
station_info_df = spark.sql("""
    SELECT station_id, external_id, name, short_name,
    lat, lon, region_id, capacity FROM stationdata_table
""")
station_info_df.createOrReplaceTempView('station_info_table')
station_info_df.show(10)

+----------+--------------------+--------------------+----------+-----------------+------------------+---------+--------+
|station_id|         external_id|                name|short_name|              lat|               lon|region_id|capacity|
+----------+--------------------+--------------------+----------+-----------------+------------------+---------+--------+
|         1|082469cc-1f3f-11e...| Eads St & 15th St S|     31000|        38.858971|         -77.05323|       41|      15|
|         2|08246b69-1f3f-11e...| 18th St & S Eads St|     31001|         38.85725|         -77.05332|       41|      11|
|         3|08246c35-1f3f-11e...|Crystal Dr & 20th...|     31002|        38.856425|        -77.049232|       41|      17|
|         4|08246cd5-1f3f-11e...|Crystal Dr & 15th...|     31003|         38.86017|        -77.049593|       41|      11|
|         5|08246d68-1f3f-11e...|Aurora Hills Cmty...|     31004|        38.857866|         -77.05949|       41|      11|
|         6|08246df5-1f3

In [30]:
station_info_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"station_info", header=True)

## Find popular start stations by total trip counts / percentage

In [31]:
# select useful info and sum station count group by start station name 
pop_start_station_df = spark.sql("""
    SELECT start_station, start_station_number, 
    COUNT(start_station) AS station_count FROM 
    tripdata_table GROUP BY start_station, 
    start_station_number ORDER BY 3 DESC
""")
pop_start_station_df.createOrReplaceTempView('pop_start_station_table')

# join lontitude and latitude info from station data to trip data
pop_start_station_df = spark.sql("""
    SELECT p.start_station, p.start_station_number, p.station_count, 
    s.short_name, s.lat, s.lon FROM pop_start_station_table as p, 
    station_info_table as s WHERE p.start_station_number = s. short_name
""")
pop_start_station_df.createOrReplaceTempView('pop_start_station_table')

# calculate the trip percentage of each station
total_df = spark.sql("""
    SELECT SUM(station_count) AS total_count FROM pop_start_station_table
""")
total_count = total_df.head()[0]
pop_start_station_df = pop_start_station_df.withColumn \
    ('percentage', round(pop_start_station_df['station_count']/total_count, 8))
pop_start_station_df.createOrReplaceTempView('pop_start_station_table')

pop_start_station_df = spark.sql("""
    SELECT * FROM pop_start_station_table ORDER BY station_count DESC
""")
pop_start_station_df.show(10)

+--------------------+--------------------+-------------+----------+---------+----------+----------+
|       start_station|start_station_number|station_count|short_name|      lat|       lon|percentage|
+--------------------+--------------------+-------------+----------+---------+----------+----------+
|Columbus Circle /...|               31623|       527055|     31623| 38.89696| -77.00493|0.02055683|
|Massachusetts Ave...|               31200|       458046|     31200|  38.9101|  -77.0444|0.01786526|
|    Lincoln Memorial|               31258|       388464|     31258|38.888251|-77.049426|0.01515134|
|      15th & P St NW|               31201|       361890|     31201| 38.90985|-77.034438|0.01411487|
|Jefferson Dr & 14...|               31247|       354454|     31247|38.888553|-77.032429|0.01382484|
|New Hampshire Ave...|               31229|       293267|     31229| 38.91554| -77.03818|0.01143835|
|       Thomas Circle|               31241|       283118|     31241|  38.9059|  -77.0325|0.

In [32]:
pop_start_station_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"top_n_start_stations", header=True)

## Find popular end stations by total trip counts and percentage

In [33]:
# select useful info and sum station count group by end station name 
pop_end_station_df = spark.sql("""
    SELECT end_station, end_station_number, COUNT(*) AS 
    station_count FROM tripdata_table GROUP BY end_station, 
    end_station_number ORDER BY 3 DESC
""")
pop_end_station_df.createOrReplaceTempView('pop_end_station_table')

# join lontitude and latitude info from station data to trip data
pop_end_station_df = spark.sql("""
    SELECT p.end_station, p.end_station_number, p.station_count, 
    s.short_name, s.lat, s.lon FROM pop_end_station_table as p, 
    station_info_table as s WHERE p.end_station_number = s. short_name
""")
pop_end_station_df.createOrReplaceTempView('pop_end_station_table')

# calculate the trip percentage of each station
total_df = spark.sql("""
    SELECT SUM(station_count) AS total_count FROM pop_end_station_table
""")
total_count = total_df.head()[0]
pop_end_station_df = pop_end_station_df.withColumn \
    ('percentage', round(pop_end_station_df['station_count']/total_count, 8))
pop_end_station_df.createOrReplaceTempView('pop_end_station_table')

pop_end_station_df = spark.sql("""
    SELECT * FROM pop_end_station_table ORDER BY station_count DESC
""")
pop_end_station_df.show()

+--------------------+------------------+-------------+----------+------------------+------------------+----------+
|         end_station|end_station_number|station_count|short_name|               lat|               lon|percentage|
+--------------------+------------------+-------------+----------+------------------+------------------+----------+
|Columbus Circle /...|             31623|       547690|     31623|          38.89696|         -77.00493|0.02136259|
|Massachusetts Ave...|             31200|       515022|     31200|           38.9101|          -77.0444|0.02008837|
|      15th & P St NW|             31201|       396198|     31201|          38.90985|        -77.034438|0.01545366|
|    Lincoln Memorial|             31258|       387459|     31258|         38.888251|        -77.049426|0.01511279|
|Jefferson Dr & 14...|             31247|       365544|     31247|         38.888553|        -77.032429|  0.014258|
|      14th & V St NW|             31101|       305834|     31101|      

In [34]:
pop_end_station_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"top_n_end_stations", header=True)

## Find popular paths by total trip counts and percentage

In [35]:
# select useful info and sum station count group by end station name and end station name
pop_path_df = spark.sql("""
    SELECT duration, start_station, start_station_number, end_station, 
    end_station_number FROM tripdata_table
""")
pop_path_df = pop_path_df.createOrReplaceTempView('path_table')
pop_path_df = spark.sql("""
    SELECT start_station, start_station_number, end_station,  
    end_station_number, COUNT(*) AS path_count 
    FROM path_table GROUP BY start_station,start_station_number, 
    end_station, end_station_number ORDER BY 3 DESC
""")
pop_path_df.createOrReplaceTempView('pop_path_table')

# calculate the trip percentage of each path
total_df = spark.sql("""
    SELECT SUM(path_count) AS total_count FROM pop_path_table
""")
total_count = total_df.head()[0]
pop_path_df = pop_path_df.withColumn \
    ('percentage', round(pop_path_df['path_count']/total_count, 8))
pop_path_df.createOrReplaceTempView('pop_path_table')

# append lontitude and latitude info to table
# which may useful in visualization
# add start station lat. and lon. info
pop_path_df = spark.sql("""
    SELECT p.*, s.lon AS start_lon, s.lat As start_lat 
    FROM pop_path_table p JOIN station_info_table s ON
    p.start_station_number = s.short_name
""")
pop_path_df.createOrReplaceTempView('pop_path_table')

# add end station lat. and lon. info
pop_path_df = spark.sql("""
    SELECT p.*, s.lon AS end_lon, s.lat As end_lat 
    FROM pop_path_table p JOIN station_info_table s ON
    p.end_station_number = s.short_name
""")
pop_path_df.createOrReplaceTempView('pop_path_table')

pop_path_df.show()
pop_path_df = spark.sql("""
    SELECT * FROM pop_path_table ORDER BY path_count DESC 
""")
pop_path_df.show()

+--------------------+--------------------+--------------------+------------------+----------+----------+------------------+------------------+---------+---------+
|       start_station|start_station_number|         end_station|end_station_number|path_count|percentage|         start_lon|         start_lat|  end_lon|  end_lat|
+--------------------+--------------------+--------------------+------------------+----------+----------+------------------+------------------+---------+---------+
|Georgia Ave & Eme...|               31405|Yuma St & Tenley ...|             31316|         2|    8.0E-8|        -77.027333|         38.949662|-77.08059|38.946182|
|Calvert & Biltmor...|               31106|Yuma St & Tenley ...|             31316|         3|    1.2E-7|        -77.047637|         38.923203|-77.08059|38.946182|
|Wisconsin Ave & N...|               31302|Yuma St & Tenley ...|             31316|       214|   8.34E-6|        -77.072755|         38.934881|-77.08059|38.946182|
|Friendship Hts 

In [36]:
pop_path_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"top_n_paths", header=True)

## Generate the number of weekday trip 

In [37]:
# extract only date from the date-time
weekday_df = spark.sql("""
    SELECT start_date, CAST(start_date AS DATE) 
    AS only_date, end_date FROM tripdata_table
""")
weekday_df.createOrReplaceTempView('date_table')

# cast date to particular weekday 
weekday_df = spark.sql("""
    SELECT start_date, end_date, WEEKDAY(only_date) 
    As date_description FROM date_table
""")
weekday_df.createOrReplaceTempView('weekday_table')

# sum the trips of each weekday
weekday_df = spark.sql("""
    SELECT date_description, COUNT(*) AS number_of_trip FROM 
    weekday_table GROUP BY date_description 
    ORDER BY date_description
""")
weekday_df.show(10)

+----------------+--------------+
|date_description|number_of_trip|
+----------------+--------------+
|               0|       3551732|
|               1|       3711240|
|               2|       3809666|
|               3|       3776861|
|               4|       3795319|
|               5|       3668264|
|               6|       3360272|
+----------------+--------------+



In [None]:
weekday_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"number_of_weekday_trip", header=True)

## Generate the number of hourly trip 

In [None]:
# extract only hour from the date-time
hourly_df = spark.sql("""
    SELECT start_date, EXTRACT(HOUR FROM start_date)
    AS start_hour, end_date, EXTRACT(HOUR FROM end_date) 
    AS end_hour FROM tripdata_table
""")
hourly_df.createOrReplaceTempView('hourly_table')

# sum the trips of each hour
hourly_df = spark.sql("""
    SELECT start_hour, COUNT(*) AS number_of_trip FROM 
    hourly_table GROUP BY start_hour ORDER BY start_hour
""")
hourly_df.show(10)

In [None]:
hourly_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"number_of_hourly_trip", header=True)

## Generate trip duration by minutes

In [None]:
# cast each trip duration from seconds to minutes
duration_by_minutes_df = spark.sql("""SELECT duration FROM tripdata_table""")
duration_by_minutes_df.createOrReplaceTempView("duration_table")
duration_by_minutes_df = spark.sql("""
    SELECT duration, floor(duration/60) AS 
    duration_by_minutes FROM duration_table
""")
duration_by_minutes_df.createOrReplaceTempView("duration_by_minutes_table")

# sum trip duration by each time range
duration_by_minutes_df = spark.sql("""
    SELECT duration_by_minutes, count(*) AS 
    duration_range_count FROM duration_by_minutes_table GROUP BY
    duration_by_minutes ORDER BY duration_by_minutes
""")
duration_by_minutes_df.createOrReplaceTempView("duration_by_minutes_table")

# use 120min as a criterion to split time
duration_less120_df = spark.sql("""
    SELECT duration_by_minutes, duration_range_count FROM 
    duration_by_minutes_table WHERE duration_by_minutes < 120
""")
duration_more120_df = spark.sql("""
    SELECT sum(duration_range_count) AS duration_count FROM 
    duration_by_minutes_table WHERE duration_by_minutes > 120
""")
duration_more120_df.createOrReplaceTempView("duration_more120_table")
duration_more120_df = duration_more120_df.withColumn('duration_by_minutes', lit(120))
col = duration_more120_df['duration_count']
duration_more120_df = duration_more120_df.withColumn('duration_range_count', col)
duration_more120_df = duration_more120_df.drop('duration_count')
duration_range_df = duration_less120_df.union(duration_more120_df)
duration_range_df.show(10)

In [None]:
duration_range_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"duration_by_minutes", header=True)

## Generate hourly station activities (Borrow, Return, Net = (Return - Borrow))

In [None]:
# extract only hour from start date
borrow_df = spark.sql("""
    SELECT start_station, start_station_number,
    EXTRACT(HOUR FROM start_date) AS start_hour FROM tripdata_table
""")
borrow_df.createOrReplaceTempView("borrow_activity_table")

# sum the borrow activity of each station
borrow_df = spark.sql("""
    SELECT start_station, start_station_number, 
    start_hour, count(*) AS borrow_activity FROM borrow_activity_table 
    GROUP BY start_station, start_station_number, start_hour 
    ORDER BY start_station, start_hour
""")
borrow_df.createOrReplaceTempView("borrow_activity_table")

# extract only hour from end date
return_df = spark.sql("""
    SELECT end_station, end_station_number, 
    EXTRACT(HOUR FROM end_date) AS end_hour FROM tripdata_table
""")

# sum the return activity of each station
return_df.createOrReplaceTempView("return_activity_table")
return_df = spark.sql("""
    SELECT end_station, end_station_number, end_hour, 
    count(*) AS return_activity FROM return_activity_table 
    GROUP BY end_station, end_station_number, end_hour 
    ORDER BY end_station, end_hour
""")
return_df.createOrReplaceTempView("return_activity_table")

# join borrow activity and return activity table together
activity_df = spark.sql("""
    SELECT b.*, r.return_activity FROM borrow_activity_table b 
    JOIN return_activity_table r ON 
    (b.start_station_number = r.end_station_number AND 
    b.start_hour = r.end_hour) ORDER BY end_station, end_hour
""")
activity_df.createOrReplaceTempView("activity_table")

# calcute net(return - borrow) activity of each station
activity_df = spark.sql("""
    SELECT start_station AS station_name, start_station_number
    AS station_number, start_hour AS hour, borrow_activity, 
    return_activity, (return_activity - borrow_activity) 
    AS net_activity FROM activity_table
""")
activity_df.createOrReplaceTempView("activity_table")

# append lontitude and latitude info to table
# which may useful in visualization
activity_df = spark.sql("""
    SELECT a.*, s.lon, s.lat FROM activity_table a 
    JOIN station_info_table s ON 
    a.station_number = s.short_name
""")
activity_df = spark.sql("""
    SELECT a.*, s.lon, s.lat FROM activity_table a 
    JOIN station_info_table s ON a.station_number = s.short_name
""")
activity_df.show(30)


In [None]:
activity_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"station_activity_by_hours", header=True)

## Show yearly membership changes

In [None]:
# extract only year from the date-time
membership_df = spark.sql("""
    SELECT EXTRACT(YEAR FROM start_date)AS year, 
    member_type FROM tripdata_table
""")
membership_df.createOrReplaceTempView("membership_table")

# sum differnt memebership by year
membership_df = spark.sql("""
    SELECT *, COUNT(*) AS count FROM membership_table GROUP 
    BY year, member_type ORDER BY year, member_type
""")
membership_df.show(30)

In [None]:
membership_df.coalesce(1).write.mode("overwrite").csv(OUTPUT_PATH+"memebership_changes", header=True)