In [1]:
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = SparkSession \
    .builder \
    .appName("Pysparkexample") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
df1 = spark.read.csv('/data/tripdata/sb.csv', header='true', inferSchema = True)

In [7]:
df2 = spark.read.csv('/data/tripdata/sc.csv', header='true', inferSchema = True)

In [8]:
df1.createOrReplaceTempView('sb')

In [9]:
df2.createOrReplaceTempView('sc')

In [10]:
split_col = F.split(df1['starttime'],' ')

In [11]:
df1_date = df1.withColumn('StartDate',split_col.getItem(0))

In [12]:
df1_new = df1_date.withColumn('StartTime',split_col.getItem(1))

In [13]:
split_col1 = F.split(df1_new['stoptime'],' ')

In [14]:
df1_date1 = df1_new.withColumn('StopDate',split_col1.getItem(0))

In [15]:
df1_latest = df1_date1.withColumn('StopTime',split_col1.getItem(1))

In [33]:
df1_latest.createOrReplaceTempView('Bike_status')

In [34]:
# QUERY 1

# This query shows all the records where women passengers, who are subscribers, made trips between 12am - 7am

# This attributes displayed are bikeid, start date, start time, source, stop time, destination and trip duration 



spark.sql('''

select bikeid, StartDate, StartTime,`start station name` as source, 
StopTime, `end station name` as destination, tripduration as duration
from Bike_status 
where StartTime in (
select StartTime 
from (select startDate, bikeid, StartTime
    from Bike_status 
    where StartTime > "00:00:00.000" and StartTime < "07:00:00.000"
    and gender=2 
    and usertype="Subscriber"
    group by StartDate, bikeid, StartTime 
    order by StartDate))


''').show()

+------+----------+------------+--------------------+------------+--------------------+--------+
|bikeid| StartDate|   StartTime|              source|    StopTime|         destination|duration|
+------+----------+------------+--------------------+------------+--------------------+--------+
|  5240|2020-01-01|00:15:45.968|Boston Public Market|00:24:47.259|Warren St at Chel...|     541|
|  4109|2020-01-01|00:17:31.572|Cross St at Hanov...|00:27:44.063|Child St at North St|     612|
|  2848|2020-01-01|00:22:47.738|Harvard Square at...|00:29:32.671|       191 Beacon St|     404|
|  3490|2020-01-01|00:23:17.814|            Fan Pier|00:34:14.818|South Station - 7...|     657|
|  2296|2020-01-01|00:38:57.431|        Davis Square|00:42:06.455|Powder House Circ...|     189|
|  4373|2020-01-01|00:48:54.138|S Huntington Ave ...|00:59:05.749|Curtis Hall - Sou...|     611|
|  5425|2020-01-01|00:50:36.266|Sennott Park  Bro...|00:56:29.429|Cambridge Main Li...|     353|
|  3892|2020-01-01|00:51:25.26

In [18]:
# QUERY 2

# top 5 riders making maximum number of trips in a day in Bluebikes
spark.sql('''select (count(*)) as number_of_trips, bikeid as id 
from sb 
group by bikeid 
order by number_of_trips desc 
limit 5''').show()

+---------------+----+
|number_of_trips|  id|
+---------------+----+
|            111|4099|
|            106|5055|
|            105|5847|
|            105|3716|
|            104|6006|
+---------------+----+



In [19]:
# QUERY 3

# this query shows the locations where max roundtrips were by the biker who took the maximum roundtrips
spark.sql("select `start station name`, `end station name` from sb where bikeid in (select bikeid from (select * from (select count(*) as monthly_roundtrip, bikeid from(select from_unixtime(CAST(unix_timestamp(starttime)/3600 AS int)*3600) as start,bikeid from sb where ((starttime > '2020-01-01 00:00:00' and starttime < '2020-01-31 11:59:59') and `start station name` = `end station name`)group by CAST(unix_timestamp(starttime)/3600 AS int), bikeid order by bikeid asc) group by bikeid order by monthly_roundtrip desc) limit 1)) and `start station name`=`end station name`  ").show()



+--------------------+--------------------+
|  start station name|    end station name|
+--------------------+--------------------+
|Maverick Square -...|Maverick Square -...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
|Airport T Stop - ...|Airport T Stop - ...|
+--------------------+--------------------+



In [20]:
# QUERY 4

# shows the number of subscribers 
spark.sql('''select count(*) as Bsub from sb where usertype="Subscriber"''').show()

+------+
|  Bsub|
+------+
|114229|
+------+



In [23]:
spark.sql('''
select `start station name`, count(*) as num_trips, avg(tripduration
) as avg_duration_seconds, min(tripduration
) as min_duration_seconds, max(tripduration
) as max_duration_seconds from sb 
group by `start station name`
''').show()

+--------------------+---------+--------------------+--------------------+--------------------+
|  start station name|num_trips|avg_duration_seconds|min_duration_seconds|max_duration_seconds|
+--------------------+---------+--------------------+--------------------+--------------------+
|Soldiers Field Pa...|      314|   895.0987261146497|                 122|                9174|
|Lechmere Station ...|     1068|  1156.3773408239701|                 117|              417880|
|The Eddy - New St...|        9|   319.8888888888889|                  98|                 997|
|Cross St at Hanov...|      792|   956.2323232323232|                  66|               57474|
|       The Lawn on D|      295|   2831.491525423729|                 157|              566439|
|Prudential Center...|      895|  1622.7195530726258|                  74|              363225|
|Williams St at Wa...|      111|    710.927927927928|                 116|                2640|
|Egleston Square -...|       52|  1181.8

In [53]:
#5. Display Bikeids at different start locations.
spark.sql('''select bikeid from Bike_status where `start station name` in (select distinct(`start station name`) from Bike_status)''').show()

+------+
|bikeid|
+------+
|  6005|
|  3168|
|  3985|
|  2692|
|  4978|
|  5538|
|  2900|
|  4001|
|  2431|
|  6128|
|  2712|
|  3369|
|  4770|
|  5240|
|  3062|
|  4483|
|  4109|
|  3326|
|  4687|
|  4978|
+------+
only showing top 20 rows



In [35]:
#6. Display bikeids with trip duration greater than the threshold.
spark.sql('''select bikeid from Bike_status 
where tripduration > (select avg(tripduration) as duration from sb1) 
group by bikeid''').show()



+------+
|bikeid|
+------+
|  4101|
|  3997|
|  3749|
|  5803|
|  2142|
|  3175|
|  5518|
|  4935|
|  3794|
|  5156|
|  2122|
|  3918|
|  3226|
|  3179|
|  4219|
|  3698|
|  2387|
|  2580|
|  3475|
|  4929|
+------+
only showing top 20 rows



In [38]:
#7. Display Peak hours of a day
spark.sql('''select from_unixtime(CAST(unix_timestamp(starttime)/3600 AS int)*3600), count(*) 
from sb group by CAST(unix_timestamp(starttime)/3600 AS int) 
order by count(*) desc''').show(5)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|from_unixtime(CAST((CAST((CAST(unix_timestamp(starttime, yyyy-MM-dd HH:mm:ss) AS DOUBLE) / CAST(3600 AS DOUBLE)) AS INT) * 3600) AS BIGINT), yyyy-MM-dd HH:mm:ss)|count(1)|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|                                                                                                                                              2020-01-28 08:00:00|     858|
|                                                                                                                                              2020-01-27 08:00:00|     842|
|                                                                                                                                      

In [39]:
# 8. Display the average, Minimum and maximum trip duration per start station.
spark.sql('''
select `start station name`, count(*) as num_trips, avg(tripduration
) as avg_duration_seconds, min(tripduration
) as min_duration_seconds, max(tripduration
) as max_duration_seconds from sb 
group by `start station name`
''').show()


+--------------------+---------+--------------------+--------------------+--------------------+
|  start station name|num_trips|avg_duration_seconds|min_duration_seconds|max_duration_seconds|
+--------------------+---------+--------------------+--------------------+--------------------+
|Soldiers Field Pa...|      314|   895.0987261146497|                 122|                9174|
|Lechmere Station ...|     1068|  1156.3773408239701|                 117|              417880|
|The Eddy - New St...|        9|   319.8888888888889|                  98|                 997|
|Cross St at Hanov...|      792|   956.2323232323232|                  66|               57474|
|       The Lawn on D|      295|   2831.491525423729|                 157|              566439|
|Prudential Center...|      895|  1622.7195530726258|                  74|              363225|
|Williams St at Wa...|      111|    710.927927927928|                 116|                2640|
|Egleston Square -...|       52|  1181.8

In [43]:
pip install ipyleaflet

Note: you may need to restart the kernel to use updated packages.


In [47]:
from ipyleaflet import Map, Heatmap
from random import uniform
m = Map(center=(42.3733121258, -71.0410200806), zoom=13)

bike_lat_lng_df = spark.sql('''select `start station latitude`, `start station longitude` from Bike_status where bikeid = 30326''')
bike_locations = list()
for row in bike_lat_lng_df.rdd.collect():
    bike_locations.append((row["start station latitude"], row["start station longitude"], 89)) # lat, lng, intensity
    
heatmap = Heatmap(
    locations=bike_locations,
    radius=20
)
m.add_layer(heatmap);
m

Map(center=[42.3733121258, -71.0410200806], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_i…