# DSCI 617 Final Project
## Lauren Schmiedeler

In [None]:
import time

# create a function that returns the time elapsed given an initial time
def find_time_elapsed(t0):
    # find the time elapsed in minutes
    time_elapsed = (time.time() - t0) / 60
    units = "minutes"
    # convert the time elapsed to hours if time elapsed >= 1 hour
    if time_elapsed >= 60:
        time_elapsed = time_elapsed / 60
        units = "hours"
    # return a string that tells how much time has elapsed since the initial time
    return "time elapsed = " + str(round(time_elapsed, 5)) + " " + units

In [None]:
from google.colab import drive

# mount Google Drive to import the data files using Google Colab
# Google Colab handles hyperparameter tuning on a large dataset better than my laptop
drive.mount("/content/drive")

Mounted at /content/drive


### 1. Read 12 monthly datasets, `chicago_taxi_trips_2016_01`, `chicago_taxi_trips_2016_02`, …, `chicago_taxi_trips_2016_12`, into a Spark DataFrame (Hint: You can use `df = spark.read.csv('C:\\Users\\yliu3\\Documents\\Data Banks\\Regression\\chicago-taxi-rides-2016\\*.csv', header = True, inferSchema = True`) to read several csv files and combine them into a single DataFrame).

In [None]:
# install findspark and pyspark in Google Colab
!pip install findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=c4b7d48ca72c4cca22b9b404a9b2895d321724251d89902e6466e9ce88a19c2c
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e407

In [None]:
import findspark
from pyspark.sql import SparkSession

findspark.init()

spark = SparkSession\
    .builder\
    .appName("Final Project")\
    .getOrCreate()

seed = 100

In [None]:
import os
import re

t0_read_data = time.time()

# first, create a list of paths to the monthly csv files
paths = []
for file in os.listdir("drive/MyDrive/data"):
    if re.search("chicago_taxi_trips_2016_[0-9]{2}.csv", file):
        paths.append("drive/MyDrive/data/" + file)

# next, read in all the monthly csv files
df = spark.read.csv(paths, header = True, inferSchema = True)
print(df.show(10))
print("\nnumber of rows =", df.count(), "\n")

print(find_time_elapsed(t0_read_data))

+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+-------+---------------+----------------+----------------+-----------------+
|taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|pickup_census_tract|dropoff_census_tract|pickup_community_area|dropoff_community_area| fare|tips|tolls|extras|trip_total|payment_type|company|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|
+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+-------+---------------+----------------+----------------+-----------------+
|     85| 2016-01-13 06:15:00|2016-01-13 06:15:00|         180|       0.4|               null|                null|                   24|        

### 2. There are some missing values.  Please drop the NAs using `df = df.na.drop()`.

In [None]:
# first, select only the "fare", "trip_seconds", and "trip_miles" columns
df = df.select("fare", "trip_seconds", "trip_miles")
df.show(10)

+-----+------------+----------+
| fare|trip_seconds|trip_miles|
+-----+------------+----------+
|  4.5|         180|       0.4|
| 4.45|         240|       0.7|
|42.75|           0|       0.0|
|  7.0|         480|       1.1|
|10.25|         480|      2.71|
|17.75|        1080|       6.2|
| 45.0|        1500|      18.4|
| 3.75|          60|       0.2|
|  5.0|         180|       0.0|
| 3.25|           0|       0.0|
+-----+------------+----------+
only showing top 10 rows



In [None]:
# next, drop the rows with NAs in the remaining columns
df = df.na.drop()
print("number of rows =", df.count())

number of rows = 19862606


In [None]:
# summarize the data
df.summary().show()

+-------+------------------+-----------------+------------------+
|summary|              fare|     trip_seconds|        trip_miles|
+-------+------------------+-----------------+------------------+
|  count|          19862606|         19862606|          19862606|
|   mean|13.892086664760871|767.0163579240307| 3.394684370219944|
| stddev|25.385934033731534|1060.416563035996|22.597176756854346|
|    min|               0.0|                0|               0.0|
|    25%|              6.25|              300|               0.1|
|    50%|               8.5|              540|               1.1|
|    75%|             14.25|              900|               2.7|
|    max|            9999.0|            86399|            3353.1|
+-------+------------------+-----------------+------------------+



### 3. You are asked to forecast `fare` using `trip_seconds`, `trip_miles` and build a linear regression with elastic net regularizers.

In [None]:
from pyspark.ml.feature import VectorAssembler

# first, assemble the features ("trip_seconds" and "trip_miles") into a "features" column using VectorAssembler
assembler = VectorAssembler(inputCols = ["trip_seconds", "trip_miles"], outputCol = "features")
df = assembler.transform(df)

# second, rename the "fare" column to "label"
df = df.withColumnRenamed("fare", "label")

df.show(10)

+-----+------------+----------+-------------+
|label|trip_seconds|trip_miles|     features|
+-----+------------+----------+-------------+
|  4.5|         180|       0.4|  [180.0,0.4]|
| 4.45|         240|       0.7|  [240.0,0.7]|
|42.75|           0|       0.0|    (2,[],[])|
|  7.0|         480|       1.1|  [480.0,1.1]|
|10.25|         480|      2.71| [480.0,2.71]|
|17.75|        1080|       6.2| [1080.0,6.2]|
| 45.0|        1500|      18.4|[1500.0,18.4]|
| 3.75|          60|       0.2|   [60.0,0.2]|
|  5.0|         180|       0.0|  [180.0,0.0]|
| 3.25|           0|       0.0|    (2,[],[])|
+-----+------------+----------+-------------+
only showing top 10 rows



In [None]:
t0_split = time.time()

# next, split the data into training and test sets
(train, test) = df.randomSplit([0.7, 0.3], seed = seed)
print("number of observations in the training set =", train.count())
print("number of observations in the test set =", test.count(), "\n")

print(find_time_elapsed(t0_split))

number of observations in the training set = 13902282
number of observations in the test set = 5960324 

time elapsed = 3.13004 minutes


In [None]:
# create a function that fits the given model on the training data and returns the predictions made on the test data
def fit_and_predict(model, train, test):
    # find the initial time
    t0 = time.time()
    
    # fit the model on the training data
    model_fit = model.fit(train)
    
    # make predictions on the test data
    model_predict = model_fit.transform(test).select("prediction", "label")
    
    # print the time elapsed
    print(find_time_elapsed(t0))
    
    # return the predictions made on the test data
    return model_predict

In [None]:
from pyspark.ml.regression import LinearRegression

lrPredictions = fit_and_predict(LinearRegression(elasticNetParam = 0.5), train, test)

time elapsed = 3.34559 minutes


### 4. You are asked to forecast `fare` using `trip_seconds`, `trip_miles` and build a simple tree model.

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dtPredictions = fit_and_predict(DecisionTreeRegressor(seed = seed), train, test)

time elapsed = 6.8952 minutes


### 5. You are asked to forecast `fare` using `trip_seconds`, `trip_miles` and build a random forest model.

In [None]:
from pyspark.ml.regression import RandomForestRegressor

rfPredictions = fit_and_predict(RandomForestRegressor(seed = seed), train, test)

time elapsed = 10.76069 minutes


### 6. You are asked to forecast `fare` using `trip_seconds`, `trip_miles` and build a gradient-boosted tree.

In [None]:
from pyspark.ml.regression import GBTRegressor

gbtPredictions = fit_and_predict(GBTRegressor(seed = seed), train, test)

time elapsed = 1.1932 hours


### 7. Which model do you recommend? Justify your answer.

**I recommend the decision tree model.**

The linear regression model is slightly worse than the other models based on RMSE and quite a bit worse based on $R^2$.  The other three models perform similarly according to both RMSE and $R^2$ with the gradient-boosted tree model performing slightly better than the decision tree and random forest models according to both of these metrics.  

The decision tree model is simpler than both the random forest and gradient-boosted tree models, so I would recommend this model.  A simpler model is particularly helpful when the dataset is as large as the one in this project.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

t0_evaluate = time.time()

# create a DataFrame with metrics (RMSE and R^2) for each model
metrics = []
models = ["LinearRegression", "DecisionTreeRegressor", "RandomForestRegressor", "GBTRegressor"]
predictions = [lrPredictions, dtPredictions, rfPredictions, gbtPredictions]
for i in range(len(models)):
    evaluator = RegressionEvaluator()
    model_predict = predictions[i]
    rmse = round(evaluator.evaluate(model_predict, {evaluator.metricName: "rmse"}), 5)
    r2 = round(evaluator.evaluate(model_predict, {evaluator.metricName: "r2"}), 5)
    metrics.append([models[i], rmse, r2])
metrics_df = spark.createDataFrame(metrics, ["model", "RMSE", "R2"])
# sort the metrics DataFrame by RMSE
metrics_df.sort(["RMSE"], ascending = True).show(truncate = False)

print(find_time_elapsed(t0_evaluate))

+---------------------+--------+-------+
|model                |RMSE    |R2     |
+---------------------+--------+-------+
|GBTRegressor         |23.25334|0.19844|
|RandomForestRegressor|23.35318|0.19154|
|DecisionTreeRegressor|23.35943|0.19111|
|LinearRegression     |24.86094|0.08378|
+---------------------+--------+-------+

time elapsed = 13.67705 minutes


### 8. Please perform hyper parameter tuning on the model you selected in step 7. (Since it is a huge dataset, you may use 1 parameter for each hyperparameter to save time.)

In [None]:
from pandas.io.formats.info import Dtype
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

t0_cv = time.time()

# use the model selected in step 7
dt = DecisionTreeRegressor(seed = seed)
# create a pipeline with one stage (the model)
pipeline = Pipeline(stages = [dt])

parameters = ParamGridBuilder() \
                .addGrid(dt.minInstancesPerNode, [1, 2, 4]) \
                .addGrid(dt.maxDepth, [5, 10]) \
                .build()

# perform cross validation to find the best parameters
cv = CrossValidator(estimator = pipeline, estimatorParamMaps = parameters, evaluator = RegressionEvaluator(), numFolds = 3, seed = seed)
cv_model = cv.fit(train)
print("best model:", cv_model.bestModel.stages[-1], "\n")
print("minInstancesPerNode =", cv_model.bestModel.stages[-1].getMinInstancesPerNode(), "\n")
print("maxDepth =", cv_model.bestModel.stages[-1].getMaxDepth(), "\n")

print(find_time_elapsed(t0_cv))

best model: DecisionTreeRegressionModel: uid=DecisionTreeRegressor_8551eaf5fbe3, depth=10, numNodes=631, numFeatures=2 

minInstancesPerNode = 4 

maxDepth = 10 

time elapsed = 56.94527 minutes


In [None]:
# evaluate the best model (find RMSE and R^2)
evaluator = RegressionEvaluator()
predictions = cv_model.bestModel.transform(test)
rmse = round(evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}), 5)
print("RMSE =", rmse)
r2 = round(evaluator.evaluate(predictions, {evaluator.metricName: "r2"}), 5)
print("R^2 =", r2)

RMSE = 23.24027
R^2 = 0.19934


In [None]:
# display the predictions for the 10 largest labels
# these predictions are very off and are increasing the RMSE
predictions.sort(["label"], ascending = False).select(["label", "prediction"]).show(10)

+-------+------------------+
|  label|        prediction|
+-------+------------------+
|9500.45|  40.2066219178905|
|9476.21| 57.00532193803531|
|9300.45| 35.75285125770721|
|9276.62| 57.00532193803531|
|9026.31| 26.85222517245868|
|9002.29| 37.87905074626866|
|9001.52|15.904370564212362|
|9001.17| 9.123955842988858|
| 9001.0|12.069101031367095|
|9000.62|7.2128795830498325|
+-------+------------------+
only showing top 10 rows



### 9. Perform k-means clustering using features `trip_seconds`, `trip_miles`, and `fare`. Please recommend the optimal number of clusters and justify your answer.

**I would recommend 5 clusters.**

The best silhouette value results from k = 2, and the second-best silhouette value results from k = 5.

Considering that the training data contains almost 14 million observations, I would recommend 5 clusters.  Clustering a dataset this large into 5 clusters seems more reasonable than clustering it into only 2 clusters.

In [None]:
from pyspark.ml.feature import MinMaxScaler

# first, assemble the "trip_seconds", "trip_miles", and "label" ("fare") columns into a "k_means_features" column using VectorAssembler
assembler = VectorAssembler(inputCols = ["trip_seconds", "trip_miles", "label"], outputCol = "k_means_features")
train_k_means = assembler.transform(train)

# second, scale the "k_means_features" column so that each feature value is between 0 and 1
scaler = MinMaxScaler(inputCol = "k_means_features", outputCol = "k_means_features_scaled", min = 0, max = 1)
train_k_means = scaler.fit(train_k_means).transform(train_k_means)
train_k_means = train_k_means.select(["label", "trip_seconds", "trip_miles", "k_means_features", "k_means_features_scaled"])
train_k_means.sort(["label"], ascending = False).show(10, truncate = False)

+-------+------------+----------+-----------------------+---------------------------------------------------------------+
|label  |trip_seconds|trip_miles|k_means_features       |k_means_features_scaled                                        |
+-------+------------+----------+-----------------------+---------------------------------------------------------------+
|9999.0 |12          |0.0       |[12.0,0.0,9999.0]      |[1.388904964177826E-4,0.0,1.0]                                 |
|9890.12|11820       |161.6     |[11820.0,161.6,9890.12]|[0.13680713897151586,0.048194208344517014,0.9891109110911093]  |
|9800.45|1980        |0.0       |[1980.0,0.0,9800.45]   |[0.02291693190893413,0.0,0.9801430143014302]                   |
|9739.58|1140        |716.0     |[1140.0,716.0,9739.58] |[0.013194597159689347,0.2135337448927858,0.974055405540554]    |
|9600.48|3780        |0.0       |[3780.0,0.0,9600.48]   |[0.04375050637160152,0.0,0.9601440144014401]                   |
|9500.45|2220        |0.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

t0_k_means = time.time()

silhouettes = []
for k in range(2, 12):
    # create, fit, and transform the k-means model
    kmeans = KMeans(featuresCol = "k_means_features_scaled").setK(k).setSeed(seed)
    predictions = kmeans.fit(train_k_means).transform(train_k_means)
    
    # evaluate the k-means model
    evaluator = ClusteringEvaluator(featuresCol = "k_means_features_scaled")
    silhouette = evaluator.evaluate(predictions)
    silhouettes.append([k, silhouette])
    
# create a DataFrame with the silhouette values corresponding to different values of k
silhouettes_df = spark.createDataFrame(silhouettes, ["k", "silhouette"])
silhouettes_df.sort(["k"], ascending = True).show()

print(find_time_elapsed(t0_k_means))

+---+------------------+
|  k|        silhouette|
+---+------------------+
|  2| 0.998739393565863|
|  3|0.7994544107024001|
|  4|0.7236816173458225|
|  5|0.8400285858935831|
|  6|0.7468181144931019|
|  7|0.6423728508376305|
|  8|0.7398369725001089|
|  9|0.6827657265896455|
| 10|0.6249826754905856|
| 11|0.6504570983637393|
+---+------------------+

time elapsed = 3.30081 hours


In [None]:
spark.stop()