In [1]:
!pip install pyspark
!pip install geopy
!pip install -U kaleido

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
#importing necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Circle
import random
import folium
from folium import plugins
from time import time
import glob
import datetime

#setting plot style to seaborn
plt.style.use('seaborn')

import warnings
warnings.filterwarnings('ignore')

#Initialize geocoding package
from geopy.geocoders import Nominatim

geolocator = Nominatim(user_agent="my_request")

import calendar
import holidays

us_holidays = holidays.US()

# plotly
import plotly.express as px
import plotly.graph_objects as go
import scipy
from plotly.subplots import make_subplots
from scipy.stats import norm

# ML Packages
from scipy.stats import norm
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import PolynomialFeatures
from sklearn import preprocessing
from sklearn.linear_model import RidgeCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import make_pipeline
from sklearn.pipeline import make_pipeline

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import os
import pyspark
conf = pyspark.SparkConf()

conf.set('spark.ui.proxyBase', '/user/' + 'proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', False)
conf.set('spark.driver.memory','4g')
sc= pyspark.SparkContext(conf=conf)

spark = pyspark.SQLContext.getOrCreate(sc)

In [5]:
if not os.path.exists('/content/data/2021_22/'):
  os.makedirs('/content/data/2021_22/')

if not os.path.exists('/content/data/2013_20/'):
  os.makedirs('/content/data/2013_20/')

#Unzip data from 2021-22
!unzip /content/drive/MyDrive/BigDataProject/citibike_trip_2021-22.zip -d /content/data/2021_22/
#unzip data from 2012-20
!unzip /content/drive/MyDrive/BigDataProject/citibike-trip_2013-20.zip -d /content/data/2013_20/

Archive:  /content/drive/MyDrive/BigDataProject/citibike_trip_2021-22.zip
replace /content/data/2021_22/202210-citibike-tripdata.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: N
Archive:  /content/drive/MyDrive/BigDataProject/citibike-trip_2013-20.zip
replace /content/data/2013_20/__MACOSX/._citibike-trip_2013-20? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


#Load and Process files from 2021-22

In [6]:
#get list of files
path = '/content/data/2021_22/'
dir_list = os.listdir(path)
dir_list.sort()
if dir_list[-1] == '__MACOSX':
  dir_list = dir_list[:-1]

filePaths =[path + fileName for fileName in dir_list]
filePaths

['/content/data/2021_22/202102-citibike-tripdata.csv',
 '/content/data/2021_22/202103-citibike-tripdata.csv',
 '/content/data/2021_22/202104-citibike-tripdata.csv',
 '/content/data/2021_22/202105-citibike-tripdata.csv',
 '/content/data/2021_22/202106-citibike-tripdata.csv',
 '/content/data/2021_22/202107-citibike-tripdata.csv',
 '/content/data/2021_22/202108-citibike-tripdata.csv',
 '/content/data/2021_22/202109-citibike-tripdata.csv',
 '/content/data/2021_22/202110-citibike-tripdata.csv',
 '/content/data/2021_22/202111-citibike-tripdata.csv',
 '/content/data/2021_22/202112-citibike-tripdata.csv',
 '/content/data/2021_22/202201-citibike-tripdata.csv',
 '/content/data/2021_22/202202-citibike-tripdata.csv',
 '/content/data/2021_22/202203-citibike-tripdata.csv',
 '/content/data/2021_22/202204-citibike-tripdata.csv',
 '/content/data/2021_22/202205-citibike-tripdata.csv',
 '/content/data/2021_22/202206-citbike-tripdata.csv',
 '/content/data/2021_22/202207-citbike-tripdata.csv',
 '/content/d

In [7]:
#read data into spark

start = time()

df = spark.read\
     .option('inferSchema', 'true')\
     .option('header', 'true')\
     .csv(filePaths)

load_end = time()

print(f'''2021-22 Data loaded to spark in {(load_end - start)/60.0} mins''')

#remove nulls
df = df.na.drop("any")

from pyspark.sql.types import IntegerType,BooleanType,DateType, TimestampType, DecimalType, StringType
from pyspark.sql.functions import udf, col, dayofweek, dayofmonth, dayofyear, hour, month, quarter, year, to_date, weekofyear
from pyspark.sql.functions import sum, count, mean

#define udf to reverse geolocate buroughs, counties, neighbourhoods
manhattan_neighbourhood_udf = udf(lambda lat, lng, b: geolocator.reverse([lat, lng])[0].split(', ')[-7] if b=='Manhattan'  else '', StringType())
burough_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-6], StringType())
county_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-5], StringType())
city_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-4], StringType())

#Convert Data types of locations and timestamps
# df = df.withColumn("started_at",df.started_at.cast(TimestampType())) \
#   .withColumn("ended_at",df.ended_at.cast(TimestampType())) \
#   .withColumn("start_lat",df.start_lat.cast(DecimalType(20, 10))) \
#   .withColumn("start_lng",df.start_lng.cast(DecimalType(20, 10))) \
#   .withColumn("end_lat",df.end_lat.cast(DecimalType(20, 10))) \
#   .withColumn("end_lng",df.end_lng.cast(DecimalType(20, 10))) \
#   .withColumn("trip_duration",col('ended_at').cast('long') - col('started_at').cast('long')) \
#   .withColumn("startBurough", burough_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("startManhattanNeighbourhood", manhattan_neighbourhood_udf(col('start_lat'), col('start_lng'), col('startBurough'))) \
#   .withColumn("startCounty", county_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("startCity", city_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("endBurough", burough_udf(col('end_lat'), col('end_lng'))) \
#   .withColumn("endManhattanNeighbourhood", manhattan_neighbourhood_udf(col('end_lat'), col('end_lng'), col('endBurough'))) \
#   .withColumn("endCounty", county_udf(col('end_lat'), col('end_lng'))) \
#   .withColumn("endCity", city_udf(col('end_lat'), col('end_lng'))) \

#Get informations from the start and end dates
df = df.withColumn("startDate", to_date(col('started_at'))) \
       .withColumn('startDOW', dayofweek(col('started_at'))) \
       .withColumn('startDOM', dayofmonth(col('started_at'))) \
       .withColumn('startDOY', dayofyear(col('started_at'))) \
       .withColumn('startHour', hour(col('started_at'))) \
       .withColumn('startMonth', month(col('started_at'))) \
       .withColumn('startQuarter', quarter(col('started_at'))) \
       .withColumn('startWOY', weekofyear(col('started_at'))) \
       .withColumn('startYear', year(col('started_at'))) \
       .groupby(col('startDate').alias('date'), \
                col('startYear').alias('year'), \
                col('startMonth').alias('month'), \
                col('startWOY').alias('week'), \
                col('startDOM').alias('Day'), \
                col('startDOW').alias('DOW'), \
                col('startDOY').alias('DOY')) \
       .agg(count(col('ride_id'))) \
       .withColumnRenamed('count(ride_id)', 'count') \
       .toPandas()
       
df.to_csv('/content/data/aggRidership.csv')

save_end = time()

print(f'''2021-22 Aggregated Data Processed and Saved in {(save_end - load_end)/60.0} mins''')
       
      #  .withColumn("startDate", to_date(col('started_at'))) \
      #  .withColumn("startDOW", dayofweek(col('started_at'))) \
      #  .withColumn('startDOM', dayofmonth(col('started_at'))) \
      #  .withColumn('startDOY', dayofyear(col('started_at'))) \
      #  .withColumn('startHour', hour(col('started_at'))) \
      #  .withColumn('startMonth', month(col('started_at'))) \
      #  .withColumn('startQuarter', quarter(col('started_at'))) \
      #  .withColumn('startWOY', weekofyear(col('started_at'))) \
      #  .withColumn('startYear', year(col('ended_at'))) \

2021-22 Data loaded to spark in 2.177092456817627 mins
2021-22 Aggregated Data Processed and Saved in 1.5878947814305624 mins


#Load and process 2013-20 Data

In [8]:
#get list of files
path = '/content/data/2013_20/'
# dir_list = os.listdir(path)
# dir_list.sort()
# if dir_list[-1] == '__MACOSX':
#   dir_list = dir_list[:-1]

filePaths = glob.glob(path + '*/*.csv', recursive=False)

#filePaths =[path + fileName for fileName in dir_list]
filePaths.sort()

filePaths

['/content/data/2013_20/citibike-trip_2013-20/2013-07 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2013-08 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2013-09 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2013-10 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2013-11 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2013-12 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/201306-citibike-tripdata.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2014-01 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2014-02 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2014-03 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2014-04 - Citi Bike trip data.csv',
 '/content/data/2013_20/citibike-trip_2013-20/2014-05 - Citi Bike trip data.csv',
 '/content/data/2013_

In [9]:
#read data into spark

start = time()

df = spark.read\
     .option('inferSchema', 'true')\
     .option('header', 'true')\
     .csv(filePaths)

load_end = time()

print(f'''2013-20 Data loaded to spark in {(load_end - start)/60.0} mins''')

#remove null rows
#df = df.na.drop("all")

from pyspark.sql.types import IntegerType,BooleanType,DateType, TimestampType, DecimalType, StringType
from pyspark.sql.functions import udf, col, dayofweek, dayofmonth, dayofyear, hour, month, quarter, year, to_date, weekofyear
from pyspark.sql.functions import sum, count, mean

#define udf to reverse geolocate buroughs, counties, neighbourhoods
manhattan_neighbourhood_udf = udf(lambda lat, lng, b: geolocator.reverse([lat, lng])[0].split(', ')[-7] if b=='Manhattan'  else '', StringType())
burough_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-6], StringType())
county_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-5], StringType())
city_udf = udf(lambda lat, lng: geolocator.reverse([lat, lng])[0].split(', ')[-4], StringType())

#Convert Data types of locations and timestamps
# df = df.withColumn("started_at",df.started_at.cast(TimestampType())) \
#   .withColumn("ended_at",df.ended_at.cast(TimestampType())) \
#   .withColumn("start_lat",df.start_lat.cast(DecimalType(20, 10))) \
#   .withColumn("start_lng",df.start_lng.cast(DecimalType(20, 10))) \
#   .withColumn("end_lat",df.end_lat.cast(DecimalType(20, 10))) \
#   .withColumn("end_lng",df.end_lng.cast(DecimalType(20, 10))) \
#   .withColumn("trip_duration",col('ended_at').cast('long') - col('started_at').cast('long')) \
#   .withColumn("startBurough", burough_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("startManhattanNeighbourhood", manhattan_neighbourhood_udf(col('start_lat'), col('start_lng'), col('startBurough'))) \
#   .withColumn("startCounty", county_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("startCity", city_udf(col('start_lat'), col('start_lng'))) \
#   .withColumn("endBurough", burough_udf(col('end_lat'), col('end_lng'))) \
#   .withColumn("endManhattanNeighbourhood", manhattan_neighbourhood_udf(col('end_lat'), col('end_lng'), col('endBurough'))) \
#   .withColumn("endCounty", county_udf(col('end_lat'), col('end_lng'))) \
#   .withColumn("endCity", city_udf(col('end_lat'), col('end_lng'))) \

#Get informations from the start and end dates
df = df.withColumn("startDate", to_date(col('starttime'))) \
       .withColumn('startDOW', dayofweek(col('starttime'))) \
       .withColumn('startDOM', dayofmonth(col('starttime'))) \
       .withColumn('startDOY', dayofyear(col('starttime'))) \
       .withColumn('startHour', hour(col('starttime'))) \
       .withColumn('startMonth', month(col('starttime'))) \
       .withColumn('startQuarter', quarter(col('starttime'))) \
       .withColumn('startWOY', weekofyear(col('starttime'))) \
       .withColumn('startYear', year(col('starttime'))) \
       .groupby(col('startDate').alias('date'), \
                col('startYear').alias('year'), \
                col('startMonth').alias('month'), \
                col('startWOY').alias('week'), \
                col('startDOM').alias('day'), \
                col('startDOW').alias('DOW'), \
                col('startDOY').alias('DOY')) \
       .agg(count('bikeid')) \
       .withColumnRenamed('count(bikeid)', 'count') \
       .toPandas() \

print(df.columns)

df.to_csv('/content/data/aggRidership.csv', mode = 'a', header = False)

save_end = time()

print(f'''2013-20 Aggregated Data Processed and Saved in {(save_end - load_end)/60.0} mins''')       

2013-20 Data loaded to spark in 4.836398681004842 mins
Index(['date', 'year', 'month', 'week', 'day', 'DOW', 'DOY', 'count'], dtype='object')
2013-20 Aggregated Data Processed and Saved in 2.534510691960653 mins


#Load Weather and Bikestation Data

In [10]:
#read the weather and bike station data
bikeStation_df = pd.read_csv('/content/drive/MyDrive/BigDataProject/bikes_stations_data_21_22.csv')
weatherNYC_df = pd.read_csv('/content/drive/MyDrive/BigDataProject/NYC_weather_2013-22.csv')

#convert date to str
weatherNYC_df['date'] = weatherNYC_df['DATE'].astype(str)

#create new columns for bikestations
bikeStation_df['month'] = bikeStation_df['Month'].apply(lambda x: int(x.split('-')[1]))
bikeStation_df['year'] = bikeStation_df['Month'].apply(lambda x: int(x.split('-')[0]))

In [11]:
weatherNYC_df

Unnamed: 0,STATION,NAME,LATITUDE,LONGITUDE,ELEVATION,DATE,AWND,AWND_ATTRIBUTES,PGTM,PGTM_ATTRIBUTES,...,WT13_ATTRIBUTES,WT16,WT16_ATTRIBUTES,WT18,WT18_ATTRIBUTES,WT19,WT19_ATTRIBUTES,WT22,WT22_ATTRIBUTES,date
0,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2013-01-01,6.93,",,X",,,...,,,,,,,,,,2013-01-01
1,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2013-01-02,5.82,",,X",,,...,,,,,,,,,,2013-01-02
2,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2013-01-03,4.47,",,X",,,...,,,,,,,,,,2013-01-03
3,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2013-01-04,8.05,",,X",,,...,,,,,,,,,,2013-01-04
4,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2013-01-05,6.71,",,X",,,...,,,,,,,,,,2013-01-05
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3629,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2022-12-09,6.71,",,W",,,...,,,,,,,,,,2022-12-09
3630,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2022-12-10,7.16,",,W",,,...,,,,,,,,,,2022-12-10
3631,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2022-12-11,8.05,",,W",,,...,,,,,,,,,,2022-12-11
3632,USW00094728,"NY CITY CENTRAL PARK, NY US",40.77898,-73.96925,42.7,2022-12-12,5.82,",,W",1319.0,",,W",...,,,,,,,,,,2022-12-12


In [12]:
bikeStation_df

Unnamed: 0,Month,Stations,Avg active bikes,EoM fleet,Membership,month,year
0,2013-06,322.0,5130.0,6000.0,52130.0,6,2013
1,2013-07,328.0,5531.0,6000.0,66315.0,7,2013
2,2013-08,332.0,5681.0,6000.0,77138.0,8,2013
3,2013-09,332.0,5513.0,6000.0,85241.0,9,2013
4,2013-10,332.0,5623.0,6000.0,93124.0,10,2013
...,...,...,...,...,...,...,...
110,2022-08,1595.0,25080.0,25033.0,146910.0,8,2022
111,2022-09,1625.0,24514.0,24327.0,145188.0,9,2022
112,2022-10,1637.0,24703.0,24951.0,143537.0,10,2022
113,2022-11,1166.0,15442.0,14813.0,167556.0,11,2022


# Load aggregated Data from 2013 to 22

In [13]:
dailyRidership_df = pd.read_csv('/content/data/aggRidership.csv').sort_values(['year', 'month', 'Day']).dropna().reset_index(drop = True)

#Filter data from 2016 october and greater
dailyRidership_df = dailyRidership_df.loc[(dailyRidership_df.date >= '2016-10-01')].reset_index(drop = True)

dailyRidership_df['year'] = dailyRidership_df['year'].astype(int)
dailyRidership_df['month'] = dailyRidership_df['month'].astype(int)
dailyRidership_df['Day'] = dailyRidership_df['Day'].astype(int)
dailyRidership_df['DOW'] = dailyRidership_df['DOW'].astype(int)
dailyRidership_df['DOY'] = dailyRidership_df['DOY'].astype(int)
dailyRidership_df['week'] = dailyRidership_df['week'].astype(int)
dailyRidership_df

Unnamed: 0.1,Unnamed: 0,date,year,month,week,Day,DOW,DOY,count
0,191,2016-10-01,2016,10,39,1,7,275,39811
1,194,2016-10-02,2016,10,39,2,1,276,41023
2,188,2016-10-03,2016,10,40,3,2,277,56384
3,197,2016-10-04,2016,10,40,4,3,278,60379
4,192,2016-10-05,2016,10,40,5,4,279,65053
...,...,...,...,...,...,...,...,...,...
2212,582,2022-10-27,2022,10,43,27,5,300,118919
2213,580,2022-10-28,2022,10,43,28,6,301,109372
2214,579,2022-10-29,2022,10,43,29,7,302,107900
2215,599,2022-10-30,2022,10,43,30,1,303,98221


# Calendar Features

In [14]:
def add_calendar_features(df):
    df_new = df.copy()
    days_in_year = np.where(df["year"].apply(lambda y: calendar.isleap(int(y))), 366, 365)
    df_new['sin_doy'] = np.sin(2*np.pi*df_new["DOY"]/days_in_year)
    df_new['cos_doy'] = np.cos(2*np.pi*df_new["DOY"]/days_in_year)
    df_new['sin_dow'] = np.sin(2*np.pi*df_new["DOW"]/7)
    df_new['cos_dow'] = np.cos(2*np.pi*df_new["DOW"]/7)
    df_new = pd.concat([df_new, pd.get_dummies(df_new["DOW"], "dow")], axis=1)
    df_new = pd.concat([df_new, pd.get_dummies(df_new["month"], "month")], axis=1)
    df_new = pd.concat([df_new, pd.get_dummies(df_new["Day"], "day")], axis=1)
    df_new["holiday"] = [1 if d in us_holidays else 0 for d in df_new["date"]]
    df_new['date'] = df_new['date'].astype(str)
    return df_new

df_nyc = add_calendar_features(dailyRidership_df)

df_nyc

Unnamed: 0.1,Unnamed: 0,date,year,month,week,Day,DOW,DOY,count,sin_doy,...,day_23,day_24,day_25,day_26,day_27,day_28,day_29,day_30,day_31,holiday
0,191,2016-10-01,2016,10,39,1,7,275,39811,-0.999963,...,0,0,0,0,0,0,0,0,0,0
1,194,2016-10-02,2016,10,39,2,1,276,41023,-0.999668,...,0,0,0,0,0,0,0,0,0,0
2,188,2016-10-03,2016,10,40,3,2,277,56384,-0.999079,...,0,0,0,0,0,0,0,0,0,0
3,197,2016-10-04,2016,10,40,4,3,278,60379,-0.998195,...,0,0,0,0,0,0,0,0,0,0
4,192,2016-10-05,2016,10,40,5,4,279,65053,-0.997018,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2212,582,2022-10-27,2022,10,43,27,5,300,118919,-0.899631,...,0,0,0,0,1,0,0,0,0,0
2213,580,2022-10-28,2022,10,43,28,6,301,109372,-0.891981,...,0,0,0,0,0,1,0,0,0,0
2214,579,2022-10-29,2022,10,43,29,7,302,107900,-0.884068,...,0,0,0,0,0,0,1,0,0,0
2215,599,2022-10-30,2022,10,43,30,1,303,98221,-0.875892,...,0,0,0,0,0,0,0,1,0,0


#Change Date Format

In [15]:
df_nyc['date'] = df_nyc.apply(lambda x: str(datetime.datetime(x['year'], x['month'], x['Day']).strftime('%Y-%m-%d')), axis = 1)
df_nyc

Unnamed: 0.1,Unnamed: 0,date,year,month,week,Day,DOW,DOY,count,sin_doy,...,day_23,day_24,day_25,day_26,day_27,day_28,day_29,day_30,day_31,holiday
0,191,2016-10-01,2016,10,39,1,7,275,39811,-0.999963,...,0,0,0,0,0,0,0,0,0,0
1,194,2016-10-02,2016,10,39,2,1,276,41023,-0.999668,...,0,0,0,0,0,0,0,0,0,0
2,188,2016-10-03,2016,10,40,3,2,277,56384,-0.999079,...,0,0,0,0,0,0,0,0,0,0
3,197,2016-10-04,2016,10,40,4,3,278,60379,-0.998195,...,0,0,0,0,0,0,0,0,0,0
4,192,2016-10-05,2016,10,40,5,4,279,65053,-0.997018,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2212,582,2022-10-27,2022,10,43,27,5,300,118919,-0.899631,...,0,0,0,0,1,0,0,0,0,0
2213,580,2022-10-28,2022,10,43,28,6,301,109372,-0.891981,...,0,0,0,0,0,1,0,0,0,0
2214,579,2022-10-29,2022,10,43,29,7,302,107900,-0.884068,...,0,0,0,0,0,0,1,0,0,0
2215,599,2022-10-30,2022,10,43,30,1,303,98221,-0.875892,...,0,0,0,0,0,0,0,1,0,0


In [16]:
# Select Calendar features
calendar_features = ["year", "sin_doy", "cos_doy", "sin_dow", "cos_dow", "holiday"] \
    + [f"dow_{i+1}" for i in range(7)] \
    + [f"month_{i+1}" for i in range(12)]

calendar_features

['year',
 'sin_doy',
 'cos_doy',
 'sin_dow',
 'cos_dow',
 'holiday',
 'dow_1',
 'dow_2',
 'dow_3',
 'dow_4',
 'dow_5',
 'dow_6',
 'dow_7',
 'month_1',
 'month_2',
 'month_3',
 'month_4',
 'month_5',
 'month_6',
 'month_7',
 'month_8',
 'month_9',
 'month_10',
 'month_11',
 'month_12']

# Build Models


In [17]:
def mape(y_true, y_pred): 
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def fit_predict(
    df,
    features,
    model="linear",
    scale=True,
    log=False,
    train_year_to=2022,
    #train_month_to=8,
    test_year=2022,
    #test_month_from = 9,
    degree=2,
    alphas=np.logspace(-2,2)
):
    df_train = df.loc[(df["year"] < train_year_to)]
    df_test = df.loc[(df["year"] >= test_year)]
    
    print(df_train.shape)
    print(df_test.shape)

    X = df_train[features]
    y = df_train["count"]
    
    if scale:
        scaler = StandardScaler()
        scaler.fit(X)
        X = scaler.transform(X)
    if log: y = np.log(y)
  
    if model == "linear":
        regr = LinearRegression()
    elif model == "polynomial":
        regr = make_pipeline(
            PolynomialFeatures(degree=degree),
            LinearRegression()
            )
    elif model == "ridge":
        regr = make_pipeline(
            PolynomialFeatures(degree=degree),
            RidgeCV(alphas=alphas, cv=TimeSeriesSplit())
            )
    else:
        assert False, f"Unknown model {model}"
        
    regr.fit(X, y)
    
    if model == "ridge":
        print(f"Best alpha: {regr['ridgecv'].alpha_}")
    
    X_test = df_test[features]
    if scale: X_test = scaler.transform(X_test) 
    y_train = regr.predict(X)
    y_test = regr.predict(X_test)
    if log: y_test, y_train = np.exp(y_test), np.exp(y_train)
        
    # prevent model from predicting negative values
    y_test, y_train = np.clip(y_test, 0, np.max(y_test)), np.clip(y_train, 0, np.max(y_train))
    
    print(f"Training set R2 score: {r2_score(df_train['count'], y_train)}")
    print(f"Test set R2 core: {r2_score(df_test['count'], y_test)}")
    print(f"Test set MAPE: {mape(df_test['count'], y_test)}")
    
    return df_train, df_test, y_test, y_train, regr

In [18]:
# Execute model with calendar features
df_train, df_test, y_test, y_train, regr = fit_predict(df_nyc, calendar_features, model="linear", scale=True)

(1913, 64)
(304, 64)
Training set R2 score: 0.6378272522289589
Test set R2 core: 0.5330957716420548
Test set MAPE: 33.57434209141628


# Weather Features



In [19]:
#Join Weather details
df_nyc = pd.merge(left=df_nyc, right=weatherNYC_df, left_on='date', right_on='date', how= 'inner')
df_nyc["PRCP_log"] = np.log(df_nyc["PRCP"] + 1)
df_nyc["SNOW_log"] = np.log(df_nyc["SNOW"] + 1)
df_nyc["SNWD_log"] = np.log(df_nyc["SNWD"] + 1)

In [20]:
weather_features = ["TMAX", "TMIN", "PRCP_log", "SNOW_log", "SNWD_log"]
wt_columns = ["WT01", "WT08"]
features = [*calendar_features, *weather_features]

df_train, df_test, y_test, y_train, regr = fit_predict(df_nyc, features, model="ridge", degree=2, alphas=np.logspace(1,3,20))

(1913, 123)
(304, 123)
Best alpha: 20.6913808111479
Training set R2 score: 0.8742445507680018
Test set R2 core: 0.864312438840875
Test set MAPE: 17.770484780541686


# System Features

In [21]:
#Bikestations
df_nyc = pd.merge(left=df_nyc, right=bikeStation_df, on=['month', 'year'], how= 'left')
df_nyc

Unnamed: 0.1,Unnamed: 0,date,year,month,week,Day,DOW,DOY,count,sin_doy,...,WT22,WT22_ATTRIBUTES,PRCP_log,SNOW_log,SNWD_log,Month,Stations,Avg active bikes,EoM fleet,Membership
0,191,2016-10-01,2016,10,39,1,7,275,39811,-0.999963,...,,,0.00000,0.0,0.0,2016-10,590.0,9475.0,9557.0,118950.0
1,194,2016-10-02,2016,10,39,2,1,276,41023,-0.999668,...,,,0.00000,0.0,0.0,2016-10,590.0,9475.0,9557.0,118950.0
2,188,2016-10-03,2016,10,40,3,2,277,56384,-0.999079,...,,,0.00000,0.0,0.0,2016-10,590.0,9475.0,9557.0,118950.0
3,197,2016-10-04,2016,10,40,4,3,278,60379,-0.998195,...,,,0.00000,0.0,0.0,2016-10,590.0,9475.0,9557.0,118950.0
4,192,2016-10-05,2016,10,40,5,4,279,65053,-0.997018,...,,,0.00000,0.0,0.0,2016-10,590.0,9475.0,9557.0,118950.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2212,582,2022-10-27,2022,10,43,27,5,300,118919,-0.899631,...,,,0.00000,0.0,0.0,2022-10,1637.0,24703.0,24951.0,143537.0
2213,580,2022-10-28,2022,10,43,28,6,301,109372,-0.891981,...,,,0.00000,0.0,0.0,2022-10,1637.0,24703.0,24951.0,143537.0
2214,579,2022-10-29,2022,10,43,29,7,302,107900,-0.884068,...,,,0.00000,0.0,0.0,2022-10,1637.0,24703.0,24951.0,143537.0
2215,599,2022-10-30,2022,10,43,30,1,303,98221,-0.875892,...,,,0.00000,0.0,0.0,2022-10,1637.0,24703.0,24951.0,143537.0


In [22]:
features = [*calendar_features, *weather_features, "EoM fleet"]
df_train, df_test, y_test, y_train, regr = fit_predict(df_nyc, features, model="ridge", log=False, degree=2, alphas=np.logspace(0,2))

(1913, 128)
(304, 128)
Best alpha: 100.0
Training set R2 score: 0.9087981634630531
Test set R2 core: 0.8500143716039421
Test set MAPE: 19.280609298587024


In [23]:
def plot_markers(fig, x, y, color, name):
    fig.add_trace(go.Scatter(x=x,
                             y=y,
                             mode='markers',
                             marker=dict(size=3, color=color),
                             name=name))
    
def plot_lines(fig, x, y, color, window, name):
    fig.add_trace(go.Scatter(x=x,
                             y=y,
                             mode='lines',
                             marker_color=color,
                             name=f'{name}({window}-day average)'))
    

def plot_test(df_test,
              y_test,
              title=None,
              scatter=True,
              ma=True,
              comparison=None,
              comparison_title=None,
              show=True):
    fig = go.Figure()
    fig.update_layout(
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01
        )
    )

    df = df_test.copy()
    df["count_pred"] = y_test
    
    window = 14
    
    df['count_ma'] = df["count"].rolling(window).mean()    
    df['count_pred_ma'] = df["count_pred"].rolling(window).mean()
    
    if scatter:
        plot_markers(fig, df["date"], df["count"], "blue", "Actual")
        plot_markers(fig, df["date"], df["count_pred"], "red", "Predicted")
        
    if ma:
        plot_lines(fig, df["date"], df["count_ma"], "blue", window, "Actual")
        plot_lines(fig, df["date"], df["count_pred_ma"], "red", window, "Predicted")
    
    if comparison is not None:
        comparison = comparison.copy()[["Month", "Day", "count"]]
        comparison.rename(columns={"count": "comparison"}, inplace=True)
        df = df.merge(comparison, on=["Month", "Day"])        
        df["comparison_ma"] = df["comparison"].rolling(window).mean()
        if scatter: plot_markers(fig, df["date"], df["comparison"], "#fc8d59", comparison_title)
        if ma: plot_lines(fig, df["date"], df["comparison_ma"], "#fc8d59", window, comparison_title)

    fig.update_layout(
        xaxis_title="Date",
        yaxis_title="Trips",
        title_text=title
    )
    
    return fig
    
def plot_all(df_train, df_test, y_train, y_test, title = None):
    return plot_test(pd.concat([df_train, df_test]), np.append(y_train, y_test), title = title)

In [24]:
#Plot

fig = plot_all(df_train, df_test, y_train, y_test, 'Actual vs Predicted Demand Per Day')

fig.add_vrect(
    x0="2016-10-01", x1="2021-12-31",
    fillcolor="LightSalmon", opacity=0.5,
    layer="below", line_width=0,
    annotation_text="Training Data",
    annotation_position="bottom left"
)


fig.add_vrect(
    x0="2022-01-01", x1="2022-12-31",
    fillcolor="LightGrey", opacity=0.5,
    layer="below", line_width=0,
    annotation_text="Testing Data",
    annotation_position="bottom left"
)

#fig.write_image('/content/drive/MyDrive/BigDataProject/demand_forcast_plot.png')

fig.show()