# Bike share system EDA: Trip Time Series Data 2017-Present

In [15]:
# intended to be submitted via pyspark but produce results on the command line.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from pyspark.sql.functions import expr, col, column, lit, to_date, coalesce, to_timestamp

import plotly.express as px

In [16]:
import os
memory = '20g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [17]:
def to_timestamp_(col, formats=("MM/dd/yyyy hh:mm:ss", "yyyy-MM-dd hh:mm:ss")):
    # Spark 2.2 or later syntax, for < 2.2 use unix_timestamp and cast
    return coalesce(*[to_timestamp(col, f) for f in formats])

In [18]:
spark = SparkSession.builder.appName("Sample bike data").getOrCreate()
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

df = spark.read.load("alldata.parquet")
df.createOrReplaceTempView("bikedata")

In [19]:
# # we only care about data before 2017
# bike_data_sql = """
# select TO_DATE(starttime) trip_date, city, (unix_timestamp(stoptime) - unix_timestamp(starttime))/60 as trip_duration_minutes 
# from bikedata
# WHERE starttime >= cast('2017-01-01 00:00:00' AS TIMESTAMP)
# """

# df = spark.sql(bike_data_sql)
# df.createOrReplaceTempView("bikedata")

In [20]:
bike_data_sql = """
select TO_DATE(starttime) trip_date, city, (unix_timestamp(stoptime) - unix_timestamp(starttime))/60 as trip_duration_minutes 
from bikedata
WHERE starttime >= cast('2017-01-01 00:00:00' AS TIMESTAMP)
"""

df = spark.sql(bike_data_sql)
df.createOrReplaceTempView("bikedata")

# filter out trips longer than 24 hours
bike_data_sql = """
select *
from bikedata
WHERE trip_duration_minutes < 1440
"""

df = spark.sql(bike_data_sql)
df.createOrReplaceTempView("bikedata")

df.show(20, False)

+----------+------+---------------------+
|trip_date |city  |trip_duration_minutes|
+----------+------+---------------------+
|2021-09-01|boston|10.85                |
|2021-09-01|boston|16.033333333333335   |
|2021-09-01|boston|13.35                |
|2021-09-01|boston|17.166666666666668   |
|2021-09-01|boston|18.55                |
|2021-09-01|boston|8.15                 |
|2021-09-01|boston|38.5                 |
|2021-09-01|boston|4.8                  |
|2021-09-01|boston|17.8                 |
|2021-09-01|boston|6.283333333333333    |
|2021-09-01|boston|5.416666666666667    |
|2021-09-01|boston|68.5                 |
|2021-09-01|boston|8.016666666666667    |
|2021-09-01|boston|8.15                 |
|2021-09-01|boston|9.866666666666667    |
|2021-09-01|boston|6.733333333333333    |
|2021-09-01|boston|2.0166666666666666   |
|2021-09-01|boston|6.066666666666666    |
|2021-09-01|boston|2.8333333333333335   |
|2021-09-01|boston|14.716666666666667   |
+----------+------+---------------

In [21]:
trip_data_group_by_date_sql = """
SELECT 
    trip_date,
    city,
    percentile(trip_duration_minutes, 0.5) median_trip_duration_minutes,
    count(*) as trip_count
FROM bikedata
GROUP BY trip_date, city
ORDER BY trip_date"""


trip_data_group_by_date_df = spark.sql(trip_data_group_by_date_sql)

trip_data_group_by_date_df.show(20, False)

+----------+------+----------------------------+----------+
|trip_date |city  |median_trip_duration_minutes|trip_count|
+----------+------+----------------------------+----------+
|2017-01-01|nyc   |11.1                        |16273     |
|2017-01-01|boston|9.216666666666667           |481       |
|2017-01-02|nyc   |7.45                        |9061      |
|2017-01-02|boston|8.7                         |801       |
|2017-01-03|nyc   |8.133333333333333           |14543     |
|2017-01-03|boston|8.633333333333333           |651       |
|2017-01-04|nyc   |8.966666666666667           |34669     |
|2017-01-04|boston|9.016666666666667           |1534      |
|2017-01-05|boston|8.7                         |1330      |
|2017-01-05|nyc   |8.316666666666666           |28942     |
|2017-01-06|nyc   |8.366666666666667           |24599     |
|2017-01-06|boston|8.083333333333334           |835       |
|2017-01-07|boston|7.175000000000001           |106       |
|2017-01-07|nyc   |7.816666666666666    

In [34]:
trip_data_group_by_date_pd_df = trip_data_group_by_date_df.toPandas()

# trip_data_group_by_date_pd_df[trip_data_group_by_date_pd_df['city'] == 'nyc']['trip_count_sma_7d'] = \
#     trip_data_group_by_date_pd_df[trip_data_group_by_date_pd_df['city'] == 'nyc']['trip_count'].rolling(window=7).mean()

# trip_data_group_by_date_pd_df['trip_count_sma_14d'] = trip_data_group_by_date_pd_df['trip_count'].rolling(window=14).mean()
trip_data_group_by_date_pd_df.head()
trip_data_group_by_date_pd_df.describe()



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



Unnamed: 0,median_trip_duration_minutes,trip_count
count,5017.0,5017.0
mean,11.04952,22880.181981
std,2.543224,28472.596177
min,6.033333,21.0
25%,9.333333,4218.0
50%,10.45,7545.0
75%,12.133333,37372.0
max,28.416667,137709.0


In [30]:
trip_data_group_by_date_pd_df.to_csv('2017_present_trip_data.csv')

In [31]:
fig = px.line(trip_data_group_by_date_pd_df, x="trip_date", y="median_trip_duration_minutes", color='city')
fig.show()

In [32]:
fig = px.line(trip_data_group_by_date_pd_df, x="trip_date", y="trip_count", color='city')
fig.show()

In [35]:
fig = px.line(trip_data_group_by_date_pd_df, x="trip_date", y="trip_count_ma_7d", color='city')
fig.show()

ValueError: Value of 'y' is not the name of a column in 'data_frame'. Expected one of ['trip_date', 'city', 'median_trip_duration_minutes', 'trip_count'] but received: trip_count_ma_7d

In [None]:
# fig = px.line(trip_data_group_by_date_pd_df, x="trip_date", y="trip_count_ma_14d", color='city')
# fig.show()