# Project: Machine Learning with Spark
# By: Phuong NGUYEN & Majda EL MOUSSAOUI

#### Introduction

The objective of this mini-project is to use different Spark machine learning libraries to build a predictive model.

The provided dataset comes from [UCI Machine Learning repository](https://archive.ics.uci.edu/ml/datasets/bike+sharing+dataset). It contains the hourly and daily count of rental bikes between years 2011 and 2012 in Capital bikeshare system with the corresponding weather and seasonal information.

The task of this project is to predict hourly bike demand based on the provided features. Since the target variable is known, supervised learning is thus used.

As the target is continuous, we will build several regression models, tune them and compare the results.

We will proceed the project with the following steps:

1. Load and preprocess the data
2. Build a first linear regression model
3. Tune linear regression model
4. Result analysis
5. Feature engineering
6. Try other regression models

#### 1. Load and preprocess the data

The dataset is provided under csv format. The following code loads it by keeping the header and the original schema.

In [5]:
data = spark.read.csv('/FileStore/tables/ml/Bike_Rental_UCI_dataset.csv', header=True, inferSchema=True)

Let's have a look at 5 first rows of the dataset.

In [7]:
data.show(5)

According to [UCI Machine Learning repository](https://archive.ics.uci.edu/ml/datasets/bike+sharing+dataset), here is a short description of each feature in the dataset:

- season : season (1: winter, 2: spring, 3: summer, 4: fall)
- yr : year (0: 2011, 1: 2012)
- mnth : month (1 to 12)
- hr : hour (0 to 23)
- holiday : whether day is holiday or not (extracted from [Web Link](https://dchr.dc.gov/page/holiday-schedules))
- workingday : if day is neither weekend nor holiday is 1, otherwise is 0.
- weathersit :
  - 1: Clear, Few clouds, Partly cloudy, Partly cloudy
  - 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
  - 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
  - 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog
- temp : Normalized temperature in Celsius. The values are derived via (t-t_min)/(t_max-t_min), t_min=-8, t_max=+39 (only in hourly scale)
- hum: Normalized humidity. The values are divided to 100 (max)
- windspeed: Normalized wind speed. The values are divided to 67 (max)
- dayOfWeek: day of the week
- demand: count of total rental bikes (TARGET VARIABLE)

In [9]:
# print out the dataframe schema, to ensure that all variables are in correct data type
data.printSchema()

Almost all columns are of numerical values, except dayOfWeek. In order to get the data prepared for machine learning task, we need to encode days of week with numerical values. 
Let's check the different values in dayOfWeek column.

In [11]:
data.select("dayOfWeek").distinct().show()

Hereunder, we create a mapping dictionary to map each value in dayOfWeek column to a numerical value. Monday is encoded as 0, Tuesday is encoded as 1 and so on.

In [13]:
from itertools import chain
from pyspark.sql.functions import create_map, lit

# create a dictionary that maps each day of week to a numerical value
mapping = {'Mon': 0,
           'Tue': 1,
           'Wed': 2,
           'Thr': 3,
           'Fri': 4,
           'Sat': 5,
           'Sun': 6}

# create a new column named day_cat
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

data = data.withColumn('day_cat', mapping_expr[data['dayOfWeek']])

Let's have a look at the data after encoding dayOfWeek.

In [15]:
data.show(5)

In order to train machine learning models, we need a transformer that combines a list of columns into a single vector column. For this purpose, we use VectorAssembler, that takes all the features (except dayOfWeek) as input and outputs a new column named "features".

In [17]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

In [18]:
# create a feature vector
vectorAssembler = VectorAssembler(
  inputCols= [
    'season',
    'yr',
    'mnth',
    'hr',
    'holiday',
    'workingday',
    'weathersit',
    'temp',
    'hum',
    'windspeed',
    'days',
    'day_cat'
    ],
   outputCol = 'features'
 )

output = vectorAssembler.transform(data)

Have a look at the features and target after applying VectorAssembler.

In [20]:
output.select("features", "demand").show(10, truncate=False)

We split data into train and test set.

In [22]:
train, test = output.randomSplit([0.8, 0.2])
print ("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [23]:
train.cache()
test.cache()

#### 2. Build a first linear regression model

In this section, we build a first linear regression model with some pre-defined hyperparameters. After fitting the model into training set, we will display the evaluation metrics (Mean Absolute Error and R2) on training set and on test set.

In [26]:
# import Linear Regression module
from pyspark.ml.regression import LinearRegression

In [27]:
# initialize Linear Regression model with some hyperparameters
lr = LinearRegression(maxIter=100, elasticNetParam=0.3, featuresCol="features", labelCol="demand")

In [28]:
# fit to training set
lrm = lr.fit(train)

In [29]:
# print out MAE and R2 on train data
trainingSummary = lrm.summary
print("Mean Absolute Error (MAE) on train data = %f" % trainingSummary.meanAbsoluteError)
print("R Squared (R2) on train data = %f" % trainingSummary.r2)

MAE measures the absolute differences between predicted values by the model and the actual values. However, MAE alone is meaningless until we compare with the actual “demand” value, such as mean and standard deviation.

In [31]:
train.select("demand").describe().show()

It can be seen that the demand in training set has a mean of 188.9 and a standard deviation of 180.3. While our Mean Absolute Error is 105.7 which is lower than standard deviation. This is a not bad result.

However, R2 square is 0.387. This metric indicates that in our model, approximate 38.7% of the variability in “demand” can be explained using the model. This is an average result and there is still room for improvement.

Next, we will make predictions on the test data and print out MAE and R2.

In [33]:
# make predictions on test data
lr_preds = lrm.transform(test)

lr_preds.select("prediction", "demand", "features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction",\
                                   labelCol="demand")

# print out MAE and R2 on test data
print("Mean Absolute Error (MAE) on test data = %g" % lr_evaluator.evaluate(lr_preds, {lr_evaluator.metricName: "mae"}))
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_preds, {lr_evaluator.metricName: "r2"}))

While R2 is approximately same as on training set, MAE is a bit worse. 

In next section, we will tune this Linear Regression model by doing cross validation.

#### 3. Tune linear regression model

In this section, we tune the linear regression using cross-validation. We will build a pipeline and search for best parameters in a parameter grid.

In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

We print out the parameters explanation in order to build parameter grid.

In [39]:
lr.explainParams()

We choose to tune:
- number of iterations
- regularization parameter
- elastic net parameter
- solver algorithm for optimization

In [41]:
pipeLine = Pipeline()
pipeLine.setStages([lr])

gridBuilder = ParamGridBuilder().addGrid(lr.maxIter, [10, 50, 100, 150, 200])\
                                .addGrid(lr.regParam, [0, 10, 100, 1000])\
                                .addGrid(lr.elasticNetParam, [0, 0.1, 0.3, 0.7, 1.0])\
                                .addGrid(lr.solver, ["auto", "normal", "l-bfgs"])\
                                .build()

cv = CrossValidator(estimator = lr,
                    estimatorParamMaps = gridBuilder,
                    evaluator = lr_evaluator)

In [42]:
cvm = cv.fit(train)

After fitting the cross validation on training set, we print out the best model parameters.

In [44]:
best_lr_model = cvm.bestModel
best_lr_model.explainParams()

The best model has the following parameters:
- number of iterations: 10
- regularization parameter: 0.0
- elastic net parameter: 0.0
- solver algorithm for optimization: l-bfgs

Next, we print out MAE and R2 score on training and test set.

In [46]:
# print out MAE and R2 on train data
best_lr_summary = best_lr_model.summary

print("Mean Absolute Error (MAE) on train data = %f" % best_lr_summary.meanAbsoluteError)
print("R Squared (R2) on train data = %f" % best_lr_summary.r2)

In [47]:
# make predictions on test data
best_lr_preds = best_lr_model.transform(test)

best_lr_preds.select("prediction", "demand", "features").show(5)

# print out MAE and R2 on test data
print("Mean Absolute Error (MAE) on test data = %g" % lr_evaluator.evaluate(best_lr_preds, {lr_evaluator.metricName: "mae"}))
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(best_lr_preds, {lr_evaluator.metricName: "r2"}))

It can be seen that MAE and R2 score on test set are slighly compared to the model built in previous section.

#### 4. Result analysis

In this section, we will compare true demand with model prediction to understand where the model performs well and where it performs badly.

We will be using best_lr_preds dataframe from previous section, which contains prediction made by best Linear Regression model after cross validation.

Let's display 5 first rows of best_lr_preds to see how it looks like.

In [51]:
best_lr_preds.show(5)

In order to understand where the linear regression model performs well or badly, we will group by the average and standard deviation of real demand and prediction by hour, season and month. To visualize the gap easily, we will prioritize charts over tables.

In [53]:
import pyspark.sql.functions as F

Average and standard deviation of real demand and prediction by **hour**:

In [55]:
# average demand and average prediction by hour
best_lr_preds.groupBy('hr')\
             .agg(F.avg('demand'), F.avg('prediction'))\
             .orderBy('hr', ascending=True)\
             .toPandas()\
             .plot.line(x="hr", y=["avg(demand)", "avg(prediction)"])

In [56]:
# standard deviation of demand and prediction by hour
best_lr_preds.groupBy('hr')\
             .agg(F.stddev('demand'), F.stddev('prediction'))\
             .orderBy('hr', ascending=True)\
             .toPandas()\
             .plot.line(x="hr", y=["stddev_samp(demand)", "stddev_samp(prediction)"])

Linear regression model is not able to capture the demand pattern: average and standard deviation of prediction are pretty far from those of real demand. Especially on rush hours, when both average and standard deviation of real demand are at the peak, the model fails to make a decent prediction.

Average and standard deviation of real demand and prediction by **season**:

In [59]:
# average demand and average prediction by season
best_lr_preds.groupBy('season')\
             .agg(F.avg('demand'), F.avg('prediction'))\
             .orderBy('season', ascending=True)\
             .toPandas()\
             .plot.bar(x="season", y=["avg(demand)", "avg(prediction)"])

In [60]:
# standard deviation of demand and prediction by season
best_lr_preds.groupBy('season')\
             .agg(F.stddev('demand'), F.stddev('prediction'))\
             .orderBy('season', ascending=True)\
             .toPandas()\
             .plot.bar(x="season", y=["stddev_samp(demand)", "stddev_samp(prediction)"])

The model's average prediction per season is pretty near the average target value. This is also true with standard deviation in Spring (season 1).
However, for remaining seasons, there is still room for improvement since standard deviation of real demand and that of prediction are far from each other.

Average and standard deviation of real demand and prediction by **month**:

In [63]:
# average demand and average prediction by month
best_lr_preds.groupBy('mnth')\
             .agg(F.avg('demand'), F.avg('prediction'))\
             .orderBy('mnth', ascending=True)\
             .toPandas()\
             .plot.line(x="mnth", y=["avg(demand)", "avg(prediction)"])

In [64]:
# standard deviation of demand and prediction by month
best_lr_preds.groupBy('mnth')\
             .agg(F.stddev('demand'), F.stddev('prediction'))\
             .orderBy('mnth', ascending=True)\
             .toPandas()\
             .plot.line(x="mnth", y=["stddev_samp(demand)", "stddev_samp(prediction)"])

Same as season, the model captures pretty well monthly trend in average demand. In contrast, the model doesn't seem to understand the trend in standard deviation.

#### 5. Feature engineering

In previous sections, we built, trained and tuned linear regression model on the provided dataset, without taking care of categorical features.
Indeed, we treated categorical features in same manner as numerical features. For example, season can have value 1, 2, 3 and 4 but this represent four different seasons thus should be taken as categorical features.

In this section, we will create dummy variables from categorical features in order to improve the accuracy of the model. These categorical features include: season, mnth, hr, weathersit and day_cat. Yr, holiday and workingday take either 0 or 1 as value, thus are not included in the categorical features from which we'll create dummy variables.

In [68]:
# have a look at the data
data.show(5)

In [69]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import col

cat_cols = ['season', 'mnth', 'hr', 'weathersit', 'day_cat']

indexers = [
    StringIndexer(inputCol=col, outputCol="{0}_indexed".format(col))
    for col in cat_cols
]

encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders],
    outputCol="features"
)


pipeline = Pipeline(stages=indexers + encoders + [assembler])
data_transformed = pipeline.fit(data).transform(data)
data_transformed.show(5)

In [70]:
train, test = data_transformed.randomSplit([0.8, 0.2])
print ("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [71]:
train.cache()
test.cache()

In [72]:
# initialize Linear Regression model with some hyperparameters
lr = LinearRegression(maxIter=100, elasticNetParam=0.3, featuresCol="features", labelCol="demand")

In [73]:
# fit to training set
lrm = lr.fit(train)

In [74]:
# print out MAE and R2 on train data
trainingSummary = lrm.summary
print("Mean Absolute Error (MAE) on train data = %f" % trainingSummary.meanAbsoluteError)
print("R Squared (R2) on train data = %f" % trainingSummary.r2)

In [75]:
# make predictions on test data
lr_preds = lrm.transform(test)

lr_preds.select("prediction", "demand", "features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction",\
                                   labelCol="demand")

# print out MAE and R2 on test data
print("Mean Absolute Error (MAE) on test data = %g" % lr_evaluator.evaluate(lr_preds, {lr_evaluator.metricName: "mae"}))
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_preds, {lr_evaluator.metricName: "r2"}))

#### 6. Try other regression models