# MIE1512H Project

### Topic: Predicting Station-Level Demand in Bike-Sharing Systems

Name: Tanya Tang<br>
Version: V1<br>
Date: March 22, 2020

___
### Project Overview

The goal of the paper this project is based on is to improve the effectiveness of station-level rebalancing activites within bike-sharing systems by obtaining more accurate minimum inventory levels per station. 

The authors used historical bike trip data along with hourly weather data to train several machine learning models. The model features fell within two categories, time-related features from the bike data (i.e. year, month, etc.) and weather-related features (i.e. precipitation, temperature, etc.). To reduce overfitting, the data is first transformed onto a reduced space using four different reduction techniques:
1. No reduction, baseline
2. Group all stations into one cluster
3. Kmeans - cluster stations into k clusters
4. Singular value decomposition

After reducing the problem, four different prediction methods were tested:
1. Linear regression
2. Multi-layer perceptron
3. Gradient boosted tree
4. Random forest

Once a prediction was made on the reduced problem, the reduction needed to be inverted to obtain the full problem predictions. Each reduction technique has its own specific inversion process. 

The solution techniques were tested and scored based on the performance of its generated minimum station inventory levels. 

In this project, a similar methodology will be followed, with modifications and additions on the feature construction aspect. Focusing on the city of San Francisco, historical bike data and weather data will be used in conjunction with crime data. Features from all three data sets will be constructed, and several prediction techniques will be tested. As will be further explained in later sections, the reduction/inversion step was omitted from this project due to its more theoretical nature. Thus, we tested three of the four prediction methods from the paper, linear regression, gradient-boosted tree, and random forest. As per feedback from V1, before working on the full dataset, this notebook first applies the three prediction methods we tested to the toy dataset prepared from V1. Then, we will prepare the full dataset in the same manner as V1 and test the three prediction models. 

___
### Project Versions/Timelines

#### V1

CRISP-DM Tasks:
* Data Understanding (collecting, describing, exploring, verifying)
* Data Preparation on Test Scale (selecting, cleaning, constructing, integrating, formatting)

Actual Timeline:
* Looking up potential datasets and collecting from online sources/deciding which datasets to use (0.5 hr)
* Exploring entries/schema of each chosen dataset to understand discrepancies and connections between datasets (1 hr)
* Loading/cleaning raw data within each dataset, taking a closer look at which entries are relevant (2 hr)
* Calculating connections between different datasets and joining tables to eventually form a single aggregated table containing all relevant information for a single entry (4 hrs)
* Verifying data preparation steps (i.e. tables joined correctly, entries were not lost throughout the preparation process, no null values exist) (1 hr)

#### V2

CRISP-DM Tasks:
* Data Preparation on Full Scale (selecting, cleaning, constructing, integrating, formatting)
* Modeling (selecting techniques, generating test design, building, assessing)

Planned Timeline:
* Recreate data preparation tasks from V1 on full dataset (1 hr)
* Research reduction/prediction techniques and how to apply them using python packages (1 hr)
* *Implement reduction techniques (4 hr) (did not complete)*
* Implement regression techniques (4 hr)
* *Implement inversion techniques (4 hr) (did not complete)*
* Test implemented models (1 hr)

Actual Timeline:
* Research prediction techniques and how to apply them using python package (0.5 hr)
* Apply ML models to dataset prepared in V1 (should have been included in V1) (3 hr)
* Recreate data preparation tasks from V1 on full dataset (2 hr)
* Implement regression techniques (3 hr)
* Apply implemented models to test data to make predictions (4 hr)
    * NOTE: Linear regression on all stations takes ~10 min to complete, gradient-boosted tree on all stations takes ~1 hr to complete, random forest on all stations takes ~30 min to complete

#### V3

CRISP-DM Tasks:
* Modeling (assessing)
* Evaluation (evaluting results, reviewing process, determining next steps)

Planned Timeline (Week 3: March 26 to April 2):
* Transform predictions into inventory model for test data time range (4 hr)
* Create visualizations and compare performance between models (3 hr)
* Review project activites to verify correctness (1 hr)
* Summarize project and list some potential next steps (1 hr)

#### F

Planned Timeline (Week 4: April 2 to April 9):
* Write report (6 hrs)

___
### Date Preparation

Data preparation will be completed on data from January 2018 to February 2020 in the same manner as was completed for the smaller dataset in V1. Following a similar partitioning as the paper, training will be done on data from January 2018 to July 2019, validation will be done on data from August to Octoher 2019, and testing will be done on data from November 2019 to January 2020. 

___
### Modeling

#### Linear Regression
* The linear regression model will be chosen from 6 different models via cross validations, the three regularization parameters are chosen from $(0.3, 0,5, 0.7)$ and the two elastic net parameters are chosen from $(0.5, 0.8)$
* These parameters were selected by taking the default value given by spark, and slightly varying that value
* The models will be evaluated by the built in RegressionEvaluator class in spark, giving an $R^2$ value

#### Gradient-Boosted Tree
* The gradient-boosted tree model will be chosen from 2 different models via cross validations, the two maximum depth parameters are chosen from $(3, 5)$
* These parameters were selected by taking the value from the paper (5) and giving a lower parameter option as we have not applied any reduction methods so our model is more prone to overfitting
* The models will be evaluated by the built in RegressionEvaluator class in spark, giving an $R^2$ value

#### Random Forest
* The random forest model will be chosen from 2 different models via cross validations, the two forest size (number of trees) parameters are chosen from $(50, 100)$
* These parameters were selected by taking the value from the paper (100), and giving a lower parameter option as we have not applied any reduction methods so our model is more prone to overgitting
* The models will be evaluated by the built in RegressionEvaluator class in spark, giving an $R^2$ value

First, we need to make sure we can apply all of our modelling techniques using MLlib to the toy dataset consisting of two months of data prepared in V1. We will now reload and prepare the data in the same manner as V1. 

In [1]:
# Add Java locations
import os
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home/"
os.environ["JRE_HOME"] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home/"

# Initialize spark
import findspark
findspark.init("/usr/local/Cellar/apache-spark@2.3.2/2.3.2/libexec/")
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("appName").getOrCreate()

Load data from previously cleaned **aggregateTripWeatherCrimeData** data. 

In [7]:
v1data = spark.read.parquet('resources/v1/v1_aggregate_trip_weather_crime_data.parquet')
v1data.createOrReplaceTempView('v1data')

Sample loaded data to make sure it's correct. 

In [8]:
spark.sql("""
SELECT * FROM v1data
LIMIT 10
""").toPandas()

Unnamed: 0,month,day,hour,station_name,lat,long,departure_count,arrival_count,wind_speed_rate,visibility,temperature,precipitation,num_incidents
0,1,26,18,10th Ave at E 15th St,37.792714,-122.24878,0,1,0.0,16093.0,10.0,0.0,0
1,2,5,17,10th Ave at E 15th St,37.792714,-122.24878,2,1,0.0,16093.0,14.4,0.0,0
2,2,11,16,10th Ave at E 15th St,37.792714,-122.24878,0,1,46.0,16093.0,9.4,0.0,0
3,1,8,18,10th St at Fallon St,37.797673,-122.262997,1,0,15.0,3219.0,11.1,18.0,0
4,1,17,21,10th St at Fallon St,37.797673,-122.262997,0,1,31.0,8047.0,13.9,0.0,0
5,1,20,0,10th St at Fallon St,37.797673,-122.262997,1,0,67.0,16093.0,12.2,0.0,0
6,1,24,10,10th St at Fallon St,37.797673,-122.262997,0,2,0.0,16093.0,7.8,0.0,0
7,1,29,23,10th St at Fallon St,37.797673,-122.262997,1,0,21.0,16093.0,13.9,0.0,0
8,2,20,14,10th St at Fallon St,37.797673,-122.262997,1,3,15.0,16093.0,2.2,0.0,0
9,1,2,11,11th St at Bryant St,37.77003,-122.411726,1,1,0.0,16093.0,10.0,0.0,0


The paper uses four reduction techniques to transform the data from its original dimension to a reduced dimension. After further investigation, we will not be pursuing these reduction method because the reduction and reconstruction techniques presented in the paper are fairly theoretical. Thus, we will now apply the prediction techniques. The first step to applying any prediction model is to transform the dataset into the appropriate format. 

There are two additional time features we need to add to this data, weekday and holiday indicators. Those indicators will be categorical, 0 if the day is a weekday or a holiday, or 1 if it is not. This data will be loaded in from a csv file and joined with the table above. We can also prune the lat/long columns as they will not be used as features. 

In [24]:
timeData = spark.read.csv('resources/v1/v1_time.csv',
                          header=True,
                          inferSchema=True)
timeData.createOrReplaceTempView('temp')
spark.sql("""
SELECT MONTH(TO_TIMESTAMP(date, "yyyy/mm/dd")) AS month, DAY(TO_TIMESTAMP(date, "yyyy/mm/dd")) AS day, weekday, holiday
FROM temp
""").createOrReplaceTempView('timeData')

In [86]:
mlData = spark.sql("""
SELECT a.month, a.day, hour, station_name, departure_count, arrival_count, wind_speed_rate, visibility, temperature, precipitation, num_incidents, weekday, holiday FROM
v1data AS a
JOIN
timeData AS b
ON
a.month == b.month
AND a.day == b.day
""")
mlData.show()

+-----+---+----+--------------------+---------------+-------------+---------------+----------+-----------+-------------+-------------+-------+-------+
|month|day|hour|        station_name|departure_count|arrival_count|wind_speed_rate|visibility|temperature|precipitation|num_incidents|weekday|holiday|
+-----+---+----+--------------------+---------------+-------------+---------------+----------+-----------+-------------+-------------+-------+-------+
|    1| 26|  18|10th Ave at E 15t...|              0|            1|            0.0|   16093.0|       10.0|          0.0|            0|      0|      0|
|    2|  5|  17|10th Ave at E 15t...|              2|            1|            0.0|   16093.0|       14.4|          0.0|            0|      1|      0|
|    2| 11|  16|10th Ave at E 15t...|              0|            1|           46.0|   16093.0|        9.4|          0.0|            0|      1|      0|
|    1|  8|  18|10th St at Fallon St|              1|            0|           15.0|    3219.0|

Now, we can apply the first prediction model, linear regression, to each station by partitioning the dataset by station. For these initial application tests on the V1 dataset, we will not be tuning any hyperparameters, but when we model the full dataset, we will create cross validation models for model selection. 

In [120]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Get station names
station_names = spark.sql("""
SELECT DISTINCT station_name
FROM v1data
""")

# Predict departure and arrival counts
for row in station_names.rdd.collect()[0:5]:
    
    # Assemble features into a single vector
    vectorA = VectorAssembler(inputCols=['month',
                                        'day',
                                        'hour',
                                        'wind_speed_rate',
                                        'visibility',
                                        'temperature',
                                        'precipitation',
                                        'num_incidents',
                                        'weekday',
                                        'holiday'],
                              outputCol='features')
    filteredStation = mlData.filter(mlData.station_name == row.__getattr__('station_name'))
    stationDF = vectorA.transform(filteredStation)
    stationDF = stationDF.select('features', 'departure_count', 'arrival_count')
    
    # Split into training and test sets
    splits = stationDF.randomSplit([0.7, 0.3])
    trainDF = splits[0]
    testDF = splits[1]
    
    # Train model and evaluate on test set (departure count)
    lr = LinearRegression(featuresCol='features',
                          labelCol='departure_count',
                          maxIter=100,
                          regParam=0.3,
                          elasticNetParam=0.8)
    lr_model = lr.fit(trainDF)
    lr_pred = lr_model.transform(testDF)
    lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                      labelCol='departure_count',
                                      metricName='r2')
    print(f'R^2 value on predicting departure counts for test data on station {row}: ', lr_evaluator.evaluate(lr_pred))
    
    # Train model and evaluate on test set (arrival count)
    lr = LinearRegression(featuresCol='features',
                          labelCol='arrival_count',
                          maxIter=100,
                          regParam=0.3,
                          elasticNetParam=0.8)
    lr_model = lr.fit(trainDF)
    lr_pred = lr_model.transform(testDF)
    lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                      labelCol='arrival_count',
                                      metricName='r2')
    print(f'R^2 value on predicting arrival counts for test data on station {row}: ', lr_evaluator.evaluate(lr_pred))

R^2 value on predicting departure counts for test data on station Row(station_name='19th St at Florida St'):  0.026830841201329392
R^2 value on predicting arrival counts for test data on station Row(station_name='19th St at Florida St'):  -0.0012861847934118043
R^2 value on predicting departure counts for test data on station Row(station_name='5th St at Brannan St'):  0.08627169183408467
R^2 value on predicting arrival counts for test data on station Row(station_name='5th St at Brannan St'):  0.04445117794212072
R^2 value on predicting departure counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.01602284041674107
R^2 value on predicting arrival counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.0026434217235493662
R^2 value on predicting departure counts for test data on station Row(station_name='Townsend St at 7th St'):  -1.480573444466608
R^2 value on predicting arrival counts for test data on station Row(station_name=

The next prediction method explored in the paper is a multi-layer perceptron model. Spark's ML library only appears to contain a multi-layer perceptron classifier model. Thus, we will not be able to implement this model as this project is dealing with a regression problem. Thus, we will skip to the third prediction method, a gradient-boosted tree. We will follow the same steps as above. 

In [126]:
from pyspark.ml.regression import GBTRegressor

# Predict departure and arrival counts
for row in station_names.rdd.collect()[0:5]:
    
    # Assemble features into a single vector
    vectorA = VectorAssembler(inputCols=['month',
                                        'day',
                                        'hour',
                                        'wind_speed_rate',
                                        'visibility',
                                        'temperature',
                                        'precipitation',
                                        'num_incidents',
                                        'weekday',
                                        'holiday'],
                              outputCol='features')
    filteredStation = mlData.filter(mlData.station_name == row.__getattr__('station_name'))
    stationDF = vectorA.transform(filteredStation)
    stationDF = stationDF.select('features', 'departure_count', 'arrival_count')
    
    # Split into training and test sets
    splits = stationDF.randomSplit([0.7, 0.3])
    trainDF = splits[0]
    testDF = splits[1]
    
    # Train model and evaluate on test set (departure count)
    gbtr = GBTRegressor(labelCol='departure_count',
                        featuresCol="features",
                        maxIter=10)
    gbtr_model = gbtr.fit(trainDF)
    gbtr_pred = gbtr_model.transform(testDF)
    gbtr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                         labelCol='departure_count',
                                         metricName='r2')
    print(f'R^2 value on predicting departure counts for test data on station {row}: ',
          gbtr_evaluator.evaluate(gbtr_pred))
    
    # Train model and evaluate on test set (arrival count)
    gbtr = GBTRegressor(labelCol='arrival_count',
                        featuresCol="features",
                        maxIter=10)
    gbtr_model = gbtr.fit(trainDF)
    gbtr_pred = gbtr_model.transform(testDF)
    gbtr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                         labelCol='arrival_count',
                                         metricName='r2')
    print(f'R^2 value on predicting arrival counts for test data on station {row}: ',
          gbtr_evaluator.evaluate(gbtr_pred))

R^2 value on predicting departure counts for test data on station Row(station_name='19th St at Florida St'):  -0.18645470031943523
R^2 value on predicting arrival counts for test data on station Row(station_name='19th St at Florida St'):  -0.42694145099486236
R^2 value on predicting departure counts for test data on station Row(station_name='5th St at Brannan St'):  -0.10472018786287962
R^2 value on predicting arrival counts for test data on station Row(station_name='5th St at Brannan St'):  0.3847710896270803
R^2 value on predicting departure counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.3035704844680538
R^2 value on predicting arrival counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.14558693311275883
R^2 value on predicting departure counts for test data on station Row(station_name='Townsend St at 7th St'):  0.31309933694012726
R^2 value on predicting arrival counts for test data on station Row(station_name='Tow

The last prediction method is a random forest model. Again, we will follow the same steps as above. 

In [127]:
from pyspark.ml.regression import RandomForestRegressor

# Predict departure and arrival counts
for row in station_names.rdd.collect()[0:5]:
    
    # Assemble features into a single vector
    vectorA = VectorAssembler(inputCols=['month',
                                        'day',
                                        'hour',
                                        'wind_speed_rate',
                                        'visibility',
                                        'temperature',
                                        'precipitation',
                                        'num_incidents',
                                        'weekday',
                                        'holiday'],
                              outputCol='features')
    filteredStation = mlData.filter(mlData.station_name == row.__getattr__('station_name'))
    stationDF = vectorA.transform(filteredStation)
    stationDF = stationDF.select('features', 'departure_count', 'arrival_count')
    
    # Split into training and test sets
    splits = stationDF.randomSplit([0.7, 0.3])
    trainDF = splits[0]
    testDF = splits[1]
    
    # Train model and evaluate on test set (departure count)
    rfr = GBTRegressor(labelCol='departure_count',
                       featuresCol="features")
    rfr_model = rfr.fit(trainDF)
    rfr_pred = rfr_model.transform(testDF)
    rfr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                        labelCol='departure_count',
                                        metricName='r2')
    print(f'R^2 value on predicting departure counts for test data on station {row}: ',
          rfr_evaluator.evaluate(rfr_pred))
    
    # Train model and evaluate on test set (arrival count)
    rfr = GBTRegressor(labelCol='arrival_count',
                       featuresCol="features")
    rfr_model = rfr.fit(trainDF)
    rfr_pred = rfr_model.transform(testDF)
    rfr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                        labelCol='arrival_count',
                                        metricName='r2')
    print(f'R^2 value on predicting arrival counts for test data on station {row}: ',
          rfr_evaluator.evaluate(rfr_pred))

R^2 value on predicting departure counts for test data on station Row(station_name='19th St at Florida St'):  0.07628345845233486
R^2 value on predicting arrival counts for test data on station Row(station_name='19th St at Florida St'):  -0.11294351616097265
R^2 value on predicting departure counts for test data on station Row(station_name='5th St at Brannan St'):  0.06837177822727059
R^2 value on predicting arrival counts for test data on station Row(station_name='5th St at Brannan St'):  0.32911018225041266
R^2 value on predicting departure counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.28320310797311876
R^2 value on predicting arrival counts for test data on station Row(station_name='College Ave at Harwood Ave'):  -0.49946007785063795
R^2 value on predicting departure counts for test data on station Row(station_name='Townsend St at 7th St'):  0.4926272783836336
R^2 value on predicting arrival counts for test data on station Row(station_name='Town

Now we have applied three prediction methods to the toy dataset, so we can move onto applying the entire data preparation/modelling framework developed in V1 and above to the full dataset. First, let's load all of the trip/crime/weather data from the beginning of January 2018 to the end of January 2020. 

In [2]:
data = []
for i in ['2018', '2019', '2020']:
    for j in ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']:
        if i == '2020' and j == '02':
            break
        filepath = f'resources/raw_data/{i}{j}-fordgobike-tripdata.csv'
        month_data = spark.read.csv(filepath,
                                    header=True,
                                    inferSchema=True,
                                    sep=',',
                                    mode='DROPMALFORMED')
        data.append(month_data)
for i in range(1, len(data)):
    data[0] = data[0].union(data[i])
data[0].createOrReplaceTempView('temp')
data[0].printSchema()

root
 |-- duration_sec: integer (nullable = true)
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: double (nullable = true)
 |-- start_station_longitude: double (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: double (nullable = true)
 |-- end_station_longitude: double (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- bike_share_for_all_trip: string (nullable = true)



In [8]:
spark.sql("""
SELECT * FROM
(SELECT CAST(start_time AS timestamp) AS start_time, CAST(end_time AS timestamp), start_station_name, start_station_latitude, start_station_longitude, end_station_name, end_station_latitude, end_station_longitude
FROM temp)
WHERE start_time IS NOT NULL
AND end_time IS NOT NULL
AND start_station_name IS NOT NULL
AND end_station_name IS NOT NULL
""").createOrReplaceTempView('tripData')

In [9]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import split

data = []
for i in ['2018', '2019', '2020']:
    filepath = f'resources/raw_data/sf_weather_{i}.csv'
    data.append(spark.read.csv(filepath,
                               header=True,
                               inferSchema=True,
                               sep=',',
                               mode='DROPMALFORMED'))
for i in range(1, len(data)):
    data[0] = data[0].union(data[i])
data[0].createOrReplaceTempView('temp')
weather_data = spark.sql("""
SELECT (DATE + INTERVAL '4' MINUTE) AS timestamp,
LATITUDE AS lat,
LONGITUDE AS long,
WND AS wind_speed_rate,
VIS AS visibility,
TMP AS temperature,
AA1 AS precipitation
FROM temp
WHERE SOURCE = 7 AND REPORT_TYPE = "FM-15"
""")
weather_data = weather_data.withColumn('wind_speed_rate', split('wind_speed_rate', '\,')[3])
weather_data = weather_data.withColumn('wind_speed_rate', weather_data['wind_speed_rate'].cast(IntegerType()))
weather_data = weather_data.withColumn('visibility', split('visibility', '\,')[0])
weather_data = weather_data.withColumn('visibility', weather_data['visibility'].cast(IntegerType()))
weather_data = weather_data.withColumn('temperature', split('temperature', '\,')[0])
weather_data = weather_data.withColumn('temperature', weather_data['temperature'].cast(IntegerType()))
weather_data = weather_data.withColumn('temperature', weather_data['temperature'] / 10)
weather_data = weather_data.withColumn('precipitation', split('precipitation', '\,')[1])
weather_data = weather_data.withColumn('precipitation', weather_data['precipitation'].cast(IntegerType()))
weather_data.createOrReplaceTempView('weatherData')
weather_data.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- wind_speed_rate: integer (nullable = true)
 |-- visibility: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- precipitation: integer (nullable = true)



In [10]:
filepath = 'resources/raw_data/sfpd_crime.csv'
crime_data = spark.read.csv(filepath,
                            header=True,
                            inferSchema=True,
                            sep=',',
                            mode='DROPMALFORMED')
crime_data.createOrReplaceTempView('temp')
crime_data = spark.sql("""
SELECT TO_TIMESTAMP(`Incident Datetime`, "yyyy/MM/dd hh:mm") AS incident_timestamp,
`Incident ID` AS incident_id,
`Report Type Code` AS report_code,
`Incident Code` AS incident_code,
`Incident Category` AS incident_category,
`Incident Subcategory` AS incident_subcategory,
`Resolution` AS resolution,
`Latitude` as lat,
`Longitude` as long
FROM temp
WHERE `Latitude` IS NOT NULL
AND `Incident Category` IS NOT NULL
""")
crime_data.createOrReplaceTempView('crimeData')
crime_data.printSchema()

root
 |-- incident_timestamp: timestamp (nullable = true)
 |-- incident_id: integer (nullable = true)
 |-- report_code: string (nullable = true)
 |-- incident_code: integer (nullable = true)
 |-- incident_category: string (nullable = true)
 |-- incident_subcategory: string (nullable = true)
 |-- resolution: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



Now, let's aggregate the loaded data into a single table. 

In [11]:
aggregate = spark.sql("""
SELECT
COALESCE(departure_count, 0) AS departure_count,
COALESCE(arrival_count, 0) AS arrival_count,
COALESCE(departure.year, arrival.year) AS year,
COALESCE(departure.hour, arrival.hour) AS hour,
COALESCE(departure.day, arrival.day) AS day,
COALESCE(departure.month, arrival.month) AS month,
COALESCE(departure.start_station_name, arrival.end_station_name) AS station_name,
COALESCE(departure.lat, arrival.lat) AS lat,
COALESCE(departure.long, arrival.long) AS long
FROM
    (SELECT COUNT(1) AS departure_count, hour, day, month, year, start_station_name, lat, long
    FROM
        (
        SELECT YEAR(start_time) AS year, MONTH(start_time) AS month, DAY(start_time) AS day, HOUR(start_time) AS hour, start_station_name, start_station_latitude AS lat, start_station_longitude AS long
        FROM tripData
        )
    GROUP BY start_station_name, lat, long, hour, day, month, year
    ORDER BY year, month, day, hour, start_station_name) AS departure
FULL OUTER JOIN
    (SELECT COUNT(1) AS arrival_count, hour, day, month, year, end_station_name, lat, long
    FROM
        (
        SELECT YEAR(end_time) AS year, MONTH(end_time) AS month, DAY(end_time) AS day, HOUR(end_time) AS hour, end_station_name, end_station_latitude AS lat, end_station_longitude AS long
        FROM tripData
        )
    GROUP BY year, month, day, hour, end_station_name, lat, long
    ORDER BY year, month, day, hour, end_station_name) AS arrival
ON
    departure.start_station_name = arrival.end_station_name AND
    departure.year = arrival.year AND
    departure.month = arrival.month AND
    departure.day = arrival.day AND
    departure.hour = arrival.hour
""")
aggregate.write.mode('overwrite').saveAsTable('aggregateTripData')

In [19]:
aggregate_trip_weather = spark.sql("""
SELECT *
FROM
    (
    SELECT b.year, b.month, b.day, b.hour, station_name, lat, long, departure_count, arrival_count, wind_speed_rate, visibility, temperature, precipitation
    FROM
        (SELECT YEAR(timestamp) AS year,
        MONTH(timestamp) AS month,
        DAY(timestamp) AS day,
        HOUR(timestamp) AS hour,
        wind_speed_rate,
        visibility,
        temperature,
        precipitation
        FROM weatherData) AS a
    FULL OUTER JOIN
        aggregateTripData AS b
    ON a.year = b.year
    AND a.month = b.month
    AND a.day = b.day
    AND a.hour = b.hour
    )
WHERE month IS NOT NULL
""")
aggregate_trip_weather.createOrReplaceTempView('aggregateTripWeatherData')

In [23]:
spark.sql("""
SELECT incident_category FROM crimeData
GROUP BY incident_category
""").show(60, False)

+--------------------------------------------+
|incident_category                           |
+--------------------------------------------+
|Vehicle Misplaced                           |
|Suspicious                                  |
|Forgery And Counterfeiting                  |
|Sex Offense                                 |
|Family Offense                              |
|Fire Report                                 |
|Assault                                     |
|Recovered Vehicle                           |
|Drug Violation                              |
|Robbery                                     |
|Motor Vehicle Theft?                        |
|Embezzlement                                |
|Vehicle Impounded                           |
|Missing Person                              |
|Rape                                        |
|Human Trafficking (B), Involuntary Servitude|
|Human Trafficking, Commercial Sex Acts      |
|Lost Property                               |
|Arson       

In [24]:
import pyspark.sql.functions as F

crime_data = spark.sql("""
SELECT * FROM crimeData
WHERE incident_category != "Vehicle Misplaced"
AND incident_category != "Forgery And Counterfeiting"
AND incident_category != "Fire Report"
AND incident_category != "Recovered Vehicle"
AND incident_category != "Vehicle Impounded"
AND incident_category != "Embezzlement"
AND incident_category != "Lost Property"
AND incident_category != "Suicide"
AND incident_category != "Other"
AND incident_category != "Gambling"
AND incident_category != "Warrant"
AND incident_category != "Courtesy Report"
AND incident_category != "Miscellaneous Investigation"
AND incident_category != "Non-Criminal"
AND incident_category != "Civil Sidewalks"
AND incident_category != "Case Closure"
AND incident_category != "Other Miscellaneous"
AND incident_category != "Missing Person"
AND incident_category != "Other Offenses"
""")
crime_data = crime_data.withColumn('incident_category', F.when(crime_data['incident_category'] == 'Motor Vehicle Theft?', 'Motor Vehicle Theft').otherwise(crime_data['incident_category']))
crime_data.createOrReplaceTempView('crimeData')
spark.sql("""
SELECT incident_category FROM crimeData
GROUP BY incident_category
""").show(50, False)

+--------------------------------------------+
|incident_category                           |
+--------------------------------------------+
|Suspicious                                  |
|Sex Offense                                 |
|Family Offense                              |
|Assault                                     |
|Drug Violation                              |
|Robbery                                     |
|Rape                                        |
|Human Trafficking (B), Involuntary Servitude|
|Human Trafficking, Commercial Sex Acts      |
|Arson                                       |
|Fraud                                       |
|Homicide                                    |
|Drug Offense                                |
|Weapons Offence                             |
|Burglary                                    |
|Human Trafficking (A), Commercial Sex Acts  |
|Traffic Violation Arrest                    |
|Traffic Collision                           |
|Weapons Carr

In [25]:
bike_stations = spark.sql("""
SELECT DISTINCT start_station_name AS name, start_station_latitude AS lat, start_station_longitude AS long
FROM tripData
""")
bike_stations.createOrReplaceTempView('bikeStations')

In [26]:
crime_to_station = spark.sql("""
SELECT *
FROM
    (
    SELECT name, incident_timestamp, incident_id, distance, DENSE_RANK() OVER (PARTITION BY incident_timestamp, incident_id ORDER BY distance) AS rank
    FROM
        (
        SELECT a.name, b.incident_timestamp, b.incident_id, 
        acos(sin(radians(a.lat)) * sin(radians(b.lat)) +
                cos(radians(a.lat)) * cos(radians(b.lat)) *
                cos(radians(a.long - b.long))) * 6372.8 AS distance
        FROM
            bikeStations AS a
        CROSS JOIN
            crimeData AS b
        )
    )
WHERE rank = 1
""")
crime_to_station.write.mode('overwrite').saveAsTable('crimeToStation')

In [32]:
aggregate_tripWeatherCrime = spark.sql("""
SELECT *
FROM
    (
    SELECT year, month, day, hour, station_name, lat, long, departure_count, arrival_count, wind_speed_rate, visibility, temperature, precipitation, num_incidents
    FROM
        (SELECT COUNT(1) AS num_incidents, name, DATE(incident_timestamp) AS date
        FROM crimeToStation
        GROUP BY date, name) AS a
    FULL OUTER JOIN
        aggregateTripWeatherData AS b
    ON CAST(CONCAT(year, "-", month, "-", day) AS date) = DATE_ADD(date, 1)
    AND a.name = b.station_name
    )
WHERE month IS NOT NULL
""")
aggregate_tripWeatherCrime = aggregate_tripWeatherCrime.withColumn('num_incidents', F.when(aggregate_tripWeatherCrime['num_incidents'].isNull(), 0).otherwise(aggregate_tripWeatherCrime['num_incidents']))
aggregate_tripWeatherCrime.coalesce(1).write.format('parquet').mode('overwrite').save('aggregateTripWeatherCrimeData.parquet')

In [35]:
print(f'There are now {aggregate_tripWeatherCrime.count()} points in the final aggregated dataset')

There are now 2055941 points in the final aggregated dataset


Next, we need to split the data into the training/validation/test sets and save as parquet files so we can easily access the sets during modelling. 

In [38]:
spark.read.parquet('resources/v2/v2_aggregate_trip_weather_crime_data.parquet').createOrReplaceTempView('v2data')

In [53]:
timeData = spark.read.csv('resources/v2/v2_time.csv',
                          header=True,
                          inferSchema=True)
timeData.createOrReplaceTempView('temp')
spark.sql("""
SELECT TO_TIMESTAMP(date, "yyyy/mm/dd") AS date, weekday, holiday
FROM temp
""").createOrReplaceTempView('timeData')
mlData = spark.sql("""
SELECT a.year, a.month, a.day, hour, station_name, departure_count, arrival_count, wind_speed_rate, visibility, temperature, precipitation, num_incidents, weekday, holiday
FROM
v2data AS a
JOIN
timeData AS b
ON a.year == YEAR(b.date)
AND a.month == MONTH(b.date)
AND a.day == DAY(b.date)
""")
mlData.createOrReplaceTempView('v2data_withtime')
mlData.show()

+----+-----+---+----+--------------------+---------------+-------------+---------------+----------+-----------+-------------+-------------+-------+-------+
|year|month|day|hour|        station_name|departure_count|arrival_count|wind_speed_rate|visibility|temperature|precipitation|num_incidents|weekday|holiday|
+----+-----+---+----+--------------------+---------------+-------------+---------------+----------+-----------+-------------+-------------+-------+-------+
|2018|    2|  8|  12|10th Ave at E 15t...|              0|            1|             15|     16093|       11.1|            0|            0|      1|      0|
|2018|    5| 15|  21|10th Ave at E 15t...|              0|            1|             77|     16093|       18.9|            0|            0|      1|      0|
|2018|    6|  4|   9|10th Ave at E 15t...|              1|            0|             77|     16093|       14.4|            0|            0|      1|      0|
|2018|    8| 21|   8|10th Ave at E 15t...|              1|      

In [58]:
spark.sql("""
SELECT * FROM v2data_withtime
WHERE (year = 2018 AND month = 1)
OR (year = 2018 AND month = 2)
OR (year = 2018 AND month = 3)
OR (year = 2018 AND month = 4)
OR (year = 2018 AND month = 5)
OR (year = 2018 AND month = 6)
OR (year = 2018 AND month = 7)
OR (year = 2018 AND month = 8)
OR (year = 2018 AND month = 9)
OR (year = 2018 AND month = 10)
OR (year = 2018 AND month = 11)
OR (year = 2018 AND month = 12)
OR (year = 2019 AND month = 1)
OR (year = 2019 AND month = 2)
OR (year = 2019 AND month = 3)
OR (year = 2019 AND month = 4)
OR (year = 2019 AND month = 5)
OR (year = 2019 AND month = 6)
OR (year = 2019 AND month = 7)
""").coalesce(1).write.format('parquet').mode('overwrite').save('training.parquet')
spark.sql("""
SELECT * FROM v2data_withtime
WHERE (year = 2019 AND month = 8)
OR (year = 2019 AND month = 9)
""").coalesce(1).write.format('parquet').mode('overwrite').save('validation.parquet')
spark.sql("""
SELECT * FROM v2data_withtime
WHERE (year = 2019 AND month = 10)
OR (year = 2019 AND month = 11)
OR (year = 2019 AND month = 12)
OR (year = 2020 AND month = 1)
""").coalesce(1).write.format('parquet').mode('overwrite').save('test.parquet')

All the datasets are prepared so we are ready to move onto the modelling step. First, we need to load all of the training/validation/testing sets. Any station we model needs to have data points present in all three sets. Also, since we are creating models for each station, we need to make sure to only model stations where we have at least 100 data points in the training set. Any station not matching these requirements will be given constant inventory levels; we can assume they are not very popular stations and a constant inventory level will be sufficient. 

In [2]:
training = spark.read.parquet('resources/v2/training.parquet')
training = training.dropna()
validation = spark.read.parquet('resources/v2/validation.parquet')
validation = validation.dropna()
testing = spark.read.parquet('resources/v2/test.parquet')
testing = testing.dropna()
training_station_names = training.select('station_name').distinct().collect()
valid_station_names = list(map(lambda x : x.__getattr__('station_name'),
                               validation.select('station_name').distinct().collect()))
test_station_names = list(map(lambda x : x.__getattr__('station_name'),
                              testing.select('station_name').distinct().collect()))
station_names = []
for row in training_station_names:
    rowname = row.__getattr__('station_name')
    if rowname in valid_station_names and rowname in test_station_names:
        if training.filter(training.station_name == rowname).count() >= 100:
            station_names.append(rowname)

Linear regression to predict departure counts. 

In [82]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

f = open('resources/validations/departure_linear_regression_validation.txt', 'w+')
for rowname in station_names:
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'departure_count').withColumnRenamed('departure_count',
                                                                      'label')
    lr = LinearRegression() 
    paramGrid = ParamGridBuilder().addGrid(lr.regParam,
                                           [0.3, 0.5, 0.7]).addGrid(lr.elasticNetParam,
                                                                    [0.5, 0.8]).build()
    cvModel = CrossValidator(estimator=lr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
             str(RegressionEvaluator(predictionCol='prediction',
                                     labelCol='departure_count',
                                     metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/lr/departure/{rowname}',
                                                      format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Linear regression to predict arrival counts. 

In [84]:
f = open('resources/validations/arrival_linear_regression_validation.txt', 'w+')
for rowname in station_names: 
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'arrival_count').withColumnRenamed('arrival_count',
                                                                    'label')
    lr = LinearRegression() 
    paramGrid = ParamGridBuilder().addGrid(lr.regParam,
                                           [0.3, 0.5, 0.7]).addGrid(lr.elasticNetParam,
                                                                    [0.5, 0.8]).build()
    cvModel = CrossValidator(estimator=lr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
             str(RegressionEvaluator(predictionCol='prediction',
                                     labelCol='arrival_count',
                                     metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/lr/arrival/{rowname}',
                                                      format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Gradient boosted forest to predict departure counts. 

In [9]:
f = open('resources/validations/departure_gradient_boosted_forest_validation.txt', 'w+')
for rowname in station_names:
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'departure_count').withColumnRenamed('departure_count',
                                                                      'label')
    gbtr = GBTRegressor()
    paramGrid = ParamGridBuilder().addGrid(gbtr.maxDepth,
                                           [3, 5]).build()
    cvModel = CrossValidator(estimator=gbtr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
             str(RegressionEvaluator(predictionCol='prediction',
                                     labelCol='departure_count',
                                     metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/gbtr/departure/{rowname}', format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Gradient boosted forest to predict arrival counts. 

In [3]:
f = open('resources/validations/arrival_gradient_boosted_forest_validation.txt', 'w+')
for rowname in station_names:
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'arrival_count').withColumnRenamed('arrival_count',
                                                                    'label')
    gbtr = GBTRegressor()
    paramGrid = ParamGridBuilder().addGrid(gbtr.maxDepth,
                                           [3, 5]).build()
    cvModel = CrossValidator(estimator=gbtr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
             str(RegressionEvaluator(predictionCol='prediction',
                                     labelCol='arrival_count',
                                     metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/gbtr/arrival/{rowname}', format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Random forest to predict departure counts. 

In [12]:
f = open('resources/validations/departure_random_forest_validation.txt', 'w+')
for rowname in station_names:
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'departure_count').withColumnRenamed('departure_count',
                                                                      'label')
    rfr = RandomForestRegressor()
    paramGrid = ParamGridBuilder().addGrid(rfr.numTrees,
                                           [50, 100]).build()
    cvModel = CrossValidator(estimator=rfr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
            str(RegressionEvaluator(predictionCol='prediction',
                                    labelCol='departure_count',
                                    metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/rfr/departure/{rowname}', format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Random forest to predict arrival counts. 

In [3]:
f = open('resources/validations/arrival_random_forest_validation.txt', 'w+')
for rowname in station_names:
    print('.', end='')
    filteredTraining = training.filter(training.station_name == rowname)
    filteredValid = validation.filter(validation.station_name == rowname)
    filteredTest = testing.filter(testing.station_name == rowname)
    vectorA = VectorAssembler(inputCols=['month',
                                         'day',
                                         'hour',
                                         'wind_speed_rate',
                                         'visibility',
                                         'temperature',
                                         'precipitation',
                                         'num_incidents',
                                         'weekday',
                                         'holiday'],
                              outputCol='features')
    stationDF = vectorA.transform(filteredTraining)
    stationDF = stationDF.select('features',
                                 'arrival_count').withColumnRenamed('arrival_count',
                                                                    'label')
    rfr = RandomForestRegressor()
    paramGrid = ParamGridBuilder().addGrid(rfr.numTrees,
                                           [50, 100]).build()
    cvModel = CrossValidator(estimator=rfr,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(),
                             numFolds=3).fit(stationDF)
    validDF = vectorA.transform(filteredValid)
    validPred = cvModel.transform(validDF)
    f.write('R^2 value of cvModel on validation set ' + 
            str(RegressionEvaluator(predictionCol='prediction',
                                    labelCol='arrival_count',
                                    metricName='r2').evaluate(validPred)) + '\n')
    testDF = vectorA.transform(filteredTest)
    testPred = cvModel.transform(testDF)
    testPred.coalesce(1).write.mode('overwrite').save(f'resources/rfr/arrival/{rowname}', format='json')
f.close()

.....................................................................................................................................................................................................................................................................................................................................................

Looking through the $R^2$ values of the validation tests, we can see that gradient-boosted tree and random forest are both performing far better than linear regression. However, we can also see that for all three methods, there is quite a large range of $R^2$ values across all stations. This could be due to a not insignificant subset of station being  used on a sporadic basis. The extremely popular stations may have high correlations to the selected features, but less popular stations may be used more randomly. In V3, we will analyze these results, create visualizations using the data we have generated from these models, and pose hypotheses about the data. 

___
### Bibliography

\[1\] P. Hulot, D. Aloise, and S.D. Jena, "Towards Station-Level Demand Prediction for Effective Rebalancing in Bike-Sharing Systems," In KDD'18: Proceedings of the 24th ACM SIGKDD International Conference on Knowledge Discover \& Data Mining, 2018, pp. 378-386. 