In [1]:
# Import modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

In [2]:
# Read .csv files into individual dataframes
ds1=spark.read.csv('202006-divvy-tripdata.csv', header=True)
ds2=spark.read.csv('202007-divvy-tripdata.csv', header=True)
ds3=spark.read.csv('202008-divvy-tripdata.csv', header=True)
ds4=spark.read.csv('202009-divvy-tripdata.csv', header=True)
ds5=spark.read.csv('202010-divvy-tripdata.csv', header=True)
ds6=spark.read.csv('202011-divvy-tripdata.csv', header=True)
ds7=spark.read.csv('202012-divvy-tripdata.csv', header=True)
ds8=spark.read.csv('202101-divvy-tripdata.csv', header=True)
ds9=spark.read.csv('202102-divvy-tripdata.csv', header=True)
ds10=spark.read.csv('202103-divvy-tripdata.csv', header=True)
ds11=spark.read.csv('202104-divvy-tripdata.csv', header=True)
ds12=spark.read.csv('202105-divvy-tripdata.csv', header=True)

# Merge all datasets to one dataframe
ds=ds1.union(ds2).union(ds3).union(ds4).union(ds5).union(ds6).union(ds7).union(ds8).union(ds9).union(ds10).union(ds11).union(ds12)

# Print number of records and columns
print((ds.count(), len(ds.columns)))

(4073561, 13)


In [3]:
# Show the first row of data
ds.first()

Row(ride_id='8CD5DE2C2B6C4CFC', rideable_type='docked_bike', started_at='2020-06-13 23:24:48', ended_at='2020-06-13 23:36:55', start_station_name='Wilton Ave & Belmont Ave', start_station_id='117', end_station_name='Damen Ave & Clybourn Ave', end_station_id='163', start_lat='41.94018', start_lng='-87.65304', end_lat='41.931931', end_lng='-87.677856', member_casual='casual')

In [4]:
# Show the first five rows of data
ds.show(5)

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|8CD5DE2C2B6C4CFC|  docked_bike|2020-06-13 23:24:48|2020-06-13 23:36:55|Wilton Ave & Belm...|             117|Damen Ave & Clybo...|           163| 41.94018| -87.65304|41.931931|-87.677856|       casual|
|9A191EB2C751D85D|  docked_bike|2020-06-26 07:26:10|2020-06-26 07:31:58|Federal St & Polk St|              41|  Daley Center Plaza|            81|41.872077|-87.629543|41.884241|-87.629634|

In [5]:
# Remove duplicate rows and count all rows again
ds.dropDuplicates().count()

4073561

In [6]:
# Clean date fields
ds.filter("started_at IS NOT NULL and ended_at IS NOT NULL").show(5)
print(ds.count())

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|8CD5DE2C2B6C4CFC|  docked_bike|2020-06-13 23:24:48|2020-06-13 23:36:55|Wilton Ave & Belm...|             117|Damen Ave & Clybo...|           163| 41.94018| -87.65304|41.931931|-87.677856|       casual|
|9A191EB2C751D85D|  docked_bike|2020-06-26 07:26:10|2020-06-26 07:31:58|Federal St & Polk St|              41|  Daley Center Plaza|            81|41.872077|-87.629543|41.884241|-87.629634|

In [7]:
# Create a column for distance traveled in meters using start and end lat-lng
import pyspark.sql.functions as F
ds = ds.withColumn("a", (
        F.pow(F.sin(F.radians(F.col("end_lat") - F.col("start_lat")) / 2), 2) +
        F.cos(F.radians(F.col("start_lat"))) * F.cos(F.radians(F.col("end_lat"))) *
        F.pow(F.sin(F.radians(F.col("end_lng") - F.col("start_lng")) / 2), 2)
    )).withColumn("distance_traveled", F.atan2(F.sqrt(F.col("a")), F.sqrt(-F.col("a") + 1)) * 12742000)

# View the new column to verify it was successfully created
ds.select('start_lat', 'start_lng', 'end_lat', 'end_lng', 'distance_traveled').show(5)

+---------+----------+---------+----------+------------------+
|start_lat| start_lng|  end_lat|   end_lng| distance_traveled|
+---------+----------+---------+----------+------------------+
| 41.94018| -87.65304|41.931931|-87.677856| 2248.317583317838|
|41.872077|-87.629543|41.884241|-87.629634|1352.5960705541047|
|41.884241|-87.629634|41.874053|-87.627716|1143.9287821509456|
|41.945529|-87.646439|41.978353|-87.659753|3812.2639415619205|
| 41.92154|-87.653818| 41.94018| -87.65304|2073.6724042245105|
+---------+----------+---------+----------+------------------+
only showing top 5 rows



In [8]:
# Create a column that finds the date difference and show it in descending order
ds = ds.withColumn('date_diff', F.datediff(F.to_date(ds.ended_at), F.to_date(ds.started_at)))
ds.select('started_at', 'ended_at', 'member_casual', 'date_diff').sort(ds.date_diff.desc()).show(10)

+-------------------+-------------------+-------------+---------+
|         started_at|           ended_at|member_casual|date_diff|
+-------------------+-------------------+-------------+---------+
|2020-09-02 18:34:33|2020-10-10 11:17:54|       casual|       38|
|2021-05-02 02:56:07|2021-06-08 13:37:43|       casual|       37|
|2020-09-06 23:20:29|2020-10-12 11:46:25|       casual|       36|
|2020-07-05 14:25:39|2020-08-09 07:11:06|       casual|       35|
|2020-09-05 08:50:15|2020-10-10 13:43:02|       casual|       35|
|2020-07-05 01:51:06|2020-08-08 12:13:19|       casual|       34|
|2020-07-07 14:36:11|2020-08-09 19:13:11|       casual|       33|
|2020-07-02 19:49:10|2020-08-04 18:00:37|       casual|       33|
|2021-04-02 17:50:00|2021-05-05 22:06:42|       casual|       33|
|2020-07-02 17:26:55|2020-08-04 07:16:12|       casual|       33|
+-------------------+-------------------+-------------+---------+
only showing top 10 rows



In [9]:
# Order the date difference in ascending order
ds.select('started_at', 'ended_at', 'member_casual', 'date_diff').sort(ds.date_diff.asc()).show(10)

+-------------------+-------------------+-------------+---------+
|         started_at|           ended_at|member_casual|date_diff|
+-------------------+-------------------+-------------+---------+
|2020-12-15 11:39:19|2020-11-25 10:16:59|       member|      -20|
|2020-12-15 11:42:13|2020-11-25 10:50:56|       casual|      -20|
|2020-12-15 11:37:49|2020-11-25 09:04:08|       casual|      -20|
|2020-12-15 12:11:41|2020-11-25 20:03:07|       casual|      -20|
|2020-12-15 12:05:43|2020-11-25 20:20:05|       member|      -20|
|2020-12-15 12:15:37|2020-11-25 16:38:59|       casual|      -20|
|2020-12-15 11:51:59|2020-11-25 14:16:40|       casual|      -20|
|2020-12-15 12:12:41|2020-11-25 13:52:21|       member|      -20|
|2020-12-15 11:48:35|2020-11-25 13:21:43|       member|      -20|
|2020-12-15 11:50:32|2020-11-25 14:06:37|       member|      -20|
+-------------------+-------------------+-------------+---------+
only showing top 10 rows



In [10]:
# Remove records where end date comes before start date
ds = ds.filter(col('date_diff').cast(LongType()) >= 0)
ds.count()

4073182

In [11]:
# Order the date difference in ascending order
ds.select('started_at', 'ended_at', 'member_casual', 'date_diff').sort(ds.date_diff.asc()).show(10)

+-------------------+-------------------+-------------+---------+
|         started_at|           ended_at|member_casual|date_diff|
+-------------------+-------------------+-------------+---------+
|2020-06-28 11:40:46|2020-06-28 11:54:49|       casual|        0|
|2020-06-06 14:53:58|2020-06-06 15:18:05|       casual|        0|
|2020-06-17 09:55:08|2020-06-17 10:05:52|       casual|        0|
|2020-06-14 18:48:05|2020-06-14 20:18:44|       casual|        0|
|2020-06-04 17:23:09|2020-06-04 17:50:05|       casual|        0|
|2020-06-06 17:44:41|2020-06-06 17:53:51|       casual|        0|
|2020-06-17 18:36:11|2020-06-17 18:42:58|       member|        0|
|2020-06-28 07:19:49|2020-06-28 08:18:24|       casual|        0|
|2020-06-24 21:29:59|2020-06-24 21:48:24|       member|        0|
|2020-06-24 21:29:09|2020-06-24 21:29:15|       member|        0|
+-------------------+-------------------+-------------+---------+
only showing top 10 rows



In [12]:
# Calculate time difference to find the duration in minutes for each trip using .minute, .second and .hour functions
ds = ds.withColumn('duration_in_min', (ds.date_diff*24*60) + F.hour(ds.ended_at)*60 + F.minute(ds.ended_at) +
                  F.second(ds.ended_at)/60 - F.hour(ds.started_at)*60 - F.minute(ds.started_at) - F.second(ds.started_at)/60)

# Confirm the calculation worked
ds.select('started_at', 'ended_at', 'member_casual', 'date_diff', 'distance_traveled', 'duration_in_min').show(10)

# We can check if the duration of each trip in minutes is correct using started_at and ended_at columns

+-------------------+-------------------+-------------+---------+------------------+------------------+
|         started_at|           ended_at|member_casual|date_diff| distance_traveled|   duration_in_min|
+-------------------+-------------------+-------------+---------+------------------+------------------+
|2020-06-13 23:24:48|2020-06-13 23:36:55|       casual|        0| 2248.317583317838|12.116666666666742|
|2020-06-26 07:26:10|2020-06-26 07:31:58|       member|        0|1352.5960705541047| 5.799999999999973|
|2020-06-23 17:12:41|2020-06-23 17:21:14|       member|        0|1143.9287821509456| 8.550000000000015|
|2020-06-20 01:09:35|2020-06-20 01:28:24|       casual|        0|3812.2639415619205|18.816666666666674|
|2020-06-25 16:59:25|2020-06-25 17:08:48|       casual|        0|2073.6724042245105| 9.383333333333288|
|2020-06-17 18:07:18|2020-06-17 18:18:14|       casual|        0|2073.6724042245105|10.933333333333348|
|2020-06-25 07:24:33|2020-06-25 07:31:11|       member|        0

In [13]:
# View the summary of the calculated fields distance and duration
ds.select('distance_traveled', 'duration_in_min').summary().show()

+-------+------------------+-------------------+
|summary| distance_traveled|    duration_in_min|
+-------+------------------+-------------------+
|  count|           4068146|            4073182|
|   mean|2221.8403058682056| 26.882429780942754|
| stddev|2025.9532818899672| 236.67822964689256|
|    min|               0.0|-120.30000000000004|
|    25%| 865.4421767937478|  7.666666666666614|
|    50%|1674.9411822309112| 14.016666666666575|
|    75%|3018.4110032739354|              25.85|
|    max| 48370.80097108494|           54283.35|
+-------+------------------+-------------------+



In [14]:
# Duration of a trip can't be negative as seen above, further check by sorting
ds.select('started_at', 'ended_at', 'date_diff', 'duration_in_min').sort(ds.duration_in_min.asc()).show(10)

+-------------------+-------------------+---------+-------------------+
|         started_at|           ended_at|date_diff|    duration_in_min|
+-------------------+-------------------+---------+-------------------+
|2020-07-25 15:08:21|2020-07-25 13:08:03|        0|-120.30000000000004|
|2020-10-16 16:44:52|2020-10-16 15:09:51|        0| -95.01666666666664|
|2020-10-16 16:44:53|2020-10-16 15:10:54|        0| -93.98333333333336|
|2020-10-16 16:44:55|2020-10-16 15:11:27|        0| -93.46666666666663|
|2020-10-16 16:44:56|2020-10-16 15:43:14|        0| -61.69999999999998|
|2020-10-16 16:44:58|2020-10-16 15:45:03|        0|-59.916666666666714|
|2020-11-01 01:57:30|2020-11-01 01:00:39|        0|             -56.85|
|2020-11-01 01:56:26|2020-11-01 01:00:29|        0|-55.949999999999996|
|2020-11-01 01:55:57|2020-11-01 01:02:04|        0| -53.88333333333333|
|2020-11-01 01:54:40|2020-11-01 01:01:34|        0|-53.099999999999994|
+-------------------+-------------------+---------+-------------

In [15]:
# Remove negative observations from duration column
ds = ds.filter(col('duration_in_min') >= 0.0)
ds.count()

4063030

In [16]:
# Create a column for day of the week
ds = ds.withColumn("day_of_week", date_format(col('started_at'), 'EEEE'))

# Confirm the calculation worked
ds.select('started_at', 'ended_at', 'member_casual', 'date_diff', 'distance_traveled', 'duration_in_min', 'day_of_week').show(10)

+-------------------+-------------------+-------------+---------+------------------+------------------+-----------+
|         started_at|           ended_at|member_casual|date_diff| distance_traveled|   duration_in_min|day_of_week|
+-------------------+-------------------+-------------+---------+------------------+------------------+-----------+
|2020-06-13 23:24:48|2020-06-13 23:36:55|       casual|        0| 2248.317583317838|12.116666666666742|   Saturday|
|2020-06-26 07:26:10|2020-06-26 07:31:58|       member|        0|1352.5960705541047| 5.799999999999973|     Friday|
|2020-06-23 17:12:41|2020-06-23 17:21:14|       member|        0|1143.9287821509456| 8.550000000000015|    Tuesday|
|2020-06-20 01:09:35|2020-06-20 01:28:24|       casual|        0|3812.2639415619205|18.816666666666674|   Saturday|
|2020-06-25 16:59:25|2020-06-25 17:08:48|       casual|        0|2073.6724042245105| 9.383333333333288|   Thursday|
|2020-06-17 18:07:18|2020-06-17 18:18:14|       casual|        0|2073.67

In [17]:
# Casual vs. member distribution (null represents total number of records)
ds.cube('member_casual').count().show()

+-------------+-------+
|member_casual|  count|
+-------------+-------+
|       casual|1710107|
|       member|2352923|
|         null|4063030|
+-------------+-------+



In [18]:
# Frequency distribution of day of the week
ds.cube('day_of_week').count().show()

+-----------+-------+
|day_of_week|  count|
+-----------+-------+
|  Wednesday| 529720|
|   Thursday| 533708|
|    Tuesday| 503792|
|     Monday| 503884|
|     Friday| 597210|
|     Sunday| 637741|
|   Saturday| 756975|
|       null|4063030|
+-----------+-------+



In [19]:
# Frequency distribution of bike types
ds.cube('rideable_type').count().show()

+-------------+-------+
|rideable_type|  count|
+-------------+-------+
|  docked_bike|2331251|
|electric_bike| 888224|
| classic_bike| 843555|
|         null|4063030|
+-------------+-------+



In [21]:
# Fill null values with 'missing data'
ds = ds.na.fill('missing_data')
ds = ds.orderBy('started_at')
print((ds.count(), len(ds.columns)))

(4063030, 18)


In [22]:
# Drop unnecessary columns to reduce file size of export
ds = ds.drop('ride_id')
ds = ds.drop('start_station_id')
ds = ds.drop('end_station_id')
ds = ds.drop('start_lat')
ds = ds.drop('end_lat')
ds = ds.drop('start_lng')
ds = ds.drop('end_lng')
ds = ds.drop('a')
ds = ds.drop('date_diff')

# Export dataset as a .csv file
ds.repartition(1).write.csv("cyclistic_export.csv", header=True)