# Regression with gradient-boosted trees and MLlib pipelines

This notebook uses a bike-sharing dataset to illustrate MLlib pipelines and the gradient-boosted trees machine learning algorithm. The challenge is to predict the number of bicycle rentals per hour based on the features available in the dataset such as day of the week, weather, season, and so on. Demand prediction is a common problem across businesses; good predictions allow a business or service to optimize inventory and to match supply and demand to make customers happy and maximize profitability.

### Load the dataset

The dataset is from the [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset) and is provided with Databricks Runtime. The dataset includes information about bicycle rentals from the Capital bikeshare system in 2011 and 2012.

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('bikes').getOrCreate()

In [3]:
df = spark.read.csv("data/hour.csv", header="true", inferSchema="true")

# The following command caches the DataFrame in memory. 
# This improves performance since subsequent calls to the DataFrame can read from memory instead of re-reading the 
# data from disk.
df.cache()

DataFrame[instant: int, dteday: timestamp, season: int, yr: int, mnth: int, hr: int, holiday: int, weekday: int, workingday: int, weathersit: int, temp: double, atemp: double, hum: double, windspeed: double, casual: int, registered: int, cnt: int]

In [4]:
df.show(n=5)

+-------+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|             dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|      1|2011-01-01 00:00:00|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|        13| 16|
|      2|2011-01-01 00:00:00|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     8|        32| 40|
|      3|2011-01-01 00:00:00|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     5|        27| 32|
|      4|2011-01-01 00:00:00|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     3|        10| 13|
|      5|2011-01-01 00:00:00|     1|  0| 

In [5]:
print("The dataset has %d rows." % df.count())

The dataset has 17379 rows.


## Preprocess data

This dataset is well prepared for machine learning algorithms. The numeric input columns (temp, atemp, hum, and windspeed) are normalized, categorical values (season, yr, mnth, hr, holiday, weekday, workingday, weathersit) are converted to indices, and all of the columns except for the date (dteday) are numeric.

The goal is to predict the count of bike rentals (the cnt column). Reviewing the dataset, you can see that some columns contain duplicate information. For example, the cnt column equals the sum of the casual and registered columns. We should remove the casual and registered columns from the dataset. The index column instant is also not useful as a predictor.

We can also delete the column dteday, as this information is already included in the other date-related columns yr, mnth, and weekday.

In [6]:
df = df.drop("instant").drop("dteday").drop("casual").drop("registered")
df.show(n=5)

+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|cnt|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0| 16|
|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0| 40|
|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0| 32|
|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0| 13|
|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|  1|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+---+
only showing top 5 rows



Print the dataset schema to see the type of each column.

In [7]:
df.printSchema()

root
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: integer (nullable = true)



### Split data into training and test sets

Randomly split data into training and test sets. By doing this, we can train and tune the model using only the training subset, and then evaluate the model's performance on the test set to get a sense of how the model will perform on new data.

In [8]:
# Split the dataset randomly into 70% for training and 30% for testing. Passing a seed for deterministic behavior

train, test = df.randomSplit([0.7, 0.3], seed = 42)

print("There are %d training examples and %d test examples." % (train.count(), test.count()))

There are 12234 training examples and 5145 test examples.
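
The counts above are close to a 70/30 split but not exact. One way to see why is to assign each row to a subset with an independent random draw, so the weights act as probabilities rather than exact quotas. The sketch below is a plain-Python illustration of that idea, not Spark's implementation; the helper name `bernoulli_split` is hypothetical, and the row counts it prints will differ from the Spark output above.

```python
import random

def bernoulli_split(rows, train_fraction, seed):
    """Assign each row to train or test with one independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train_part, test_part = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train_part.append(row)
        else:
            test_part.append(row)
    return train_part, test_part

rows = list(range(17379))  # stand-in for the 17379 rows of the dataset
train_rows, test_rows = bernoulli_split(rows, train_fraction=0.7, seed=42)

print(len(train_rows), len(test_rows))   # close to, but not exactly, 70/30
print(len(train_rows) / len(rows))       # fraction of rows that landed in train
```

Running the function again with the same seed returns the same two lists, which is the behavior the seed argument is meant to provide.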


### Visualize the data

We can plot the data to explore it visually. The following plot shows the number of bicycle rentals during each hour of the day. As we might expect, rentals are low during the night, and peak at commute hours.

In [None]:
# pdf = (train.select("hr", "cnt")).toPandas()

### Train the machine learning pipeline

Now that we have reviewed the data and prepared it as a DataFrame with numeric values, we're ready to train a model to predict future bike sharing rentals.

Most MLlib algorithms require a single input column containing a vector of features and a single target column. The DataFrame currently has one column for each feature. MLlib provides functions to help us prepare the dataset in the required format.

MLlib pipelines combine multiple steps into a single workflow, making it easier to iterate as you develop the model.

In this example, we create a pipeline using the following functions:

* VectorAssembler: Assembles the feature columns into a feature vector.
* VectorIndexer: Identifies columns that should be treated as categorical. This is done heuristically, identifying any column with a small number of distinct values as categorical. In this example, the following columns are considered categorical: yr (2 values), season (4 values), holiday (2 values), workingday (2 values), and weathersit (4 values).
* GBTRegressor: Uses the [Gradient-Boosted Trees (GBT)](https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier) algorithm to learn how to predict rental counts from the feature vectors.
* CrossValidator: The GBT algorithm has several hyperparameters. This notebook illustrates how to use [hyperparameter tuning in Spark](https://spark.apache.org/docs/latest/ml-tuning.html). This capability automatically tests a grid of hyperparameters and chooses the best resulting model.

For more information:

* [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler)
* [VectorIndexer](https://spark.apache.org/docs/latest/ml-features.html#vectorindexer)
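
To make the VectorIndexer heuristic concrete, here is a minimal plain-Python sketch of the rule "treat a column as categorical if it has at most maxCategories distinct values". The five small counts are the ones quoted in the list above; the counts for mnth, hr, and weekday are assumed from their calendar meaning, and the continuous columns are left out. The function name `pick_categorical` is hypothetical and is not part of MLlib.

```python
# Distinct values per column. The counts for season, yr, holiday, workingday
# and weathersit come from the text; 12, 24 and 7 are assumptions based on
# the calendar meaning of mnth, hr and weekday.
distinct_counts = {
    "season": 4, "yr": 2, "mnth": 12, "hr": 24,
    "holiday": 2, "weekday": 7, "workingday": 2, "weathersit": 4,
}

def pick_categorical(counts, max_categories):
    """Return the columns whose number of distinct values is small enough."""
    return [name for name, n in counts.items() if n <= max_categories]

categorical = pick_categorical(distinct_counts, max_categories=4)
print(categorical)
```

With a threshold of 4, columns such as hr and mnth fall outside the categorical set, so the trees treat them as ordered numeric values.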

The first step is to create the VectorAssembler and VectorIndexer steps.

In [9]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

In [10]:
# Remove the target column from the input feature set.
featuresCols = df.columns
featuresCols.remove('cnt')
 
# vectorAssembler combines all feature columns into a single feature vector column, "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
 
# vectorIndexer identifies categorical features and indexes them, and creates a new column "features". 
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

Next, we define the model.

In [11]:
from pyspark.ml.regression import GBTRegressor

In [12]:
# The next step is to define the model training stage of the pipeline. 
# The following command defines a GBTRegressor model that takes an input column "features" by default and learns to predict the labels in the "cnt" column. 

gbt = GBTRegressor(labelCol="cnt")

The third step is to wrap the model you just defined in a CrossValidator stage. CrossValidator calls the GBT algorithm with different hyperparameter settings. It trains multiple models and selects the best one, based on minimizing a specified metric. In this example, the metric is root mean squared error (RMSE).

In [13]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [14]:
# Define a grid of hyperparameters to test:
#  - maxDepth: maximum depth of each decision tree 
#  - maxIter: iterations, or the total number of trees 
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()
 
# Define an evaluation metric. The CrossValidator compares the true labels with predicted values for each 
# combination of parameters, and calculates this value to determine the best model.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
 
# Declare the CrossValidator, which performs the model tuning.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)
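
It can help to estimate how much work this grid implies before training. The sketch below expands the same two parameter lists in plain Python and counts the model fits, assuming CrossValidator's default of 3 folds (the cell above does not set numFolds) and one final refit of the best setting on the full training set.

```python
from itertools import product

max_depth_values = [2, 5]
max_iter_values = [10, 100]
num_folds = 3  # assumption: CrossValidator's default, since numFolds is not set above

# Every combination of the two parameter lists is one candidate setting.
grid = [
    {"maxDepth": depth, "maxIter": iters}
    for depth, iters in product(max_depth_values, max_iter_values)
]

cv_fits = len(grid) * num_folds   # each candidate is trained once per fold
total_fits = cv_fits + 1          # plus the final refit of the best candidate

print(len(grid), cv_fits, total_fits)
```

Because the number of fits is the product of the list lengths and the fold count, adding values to either list increases training time quickly.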

We create the pipeline.

In [15]:
from pyspark.ml import Pipeline

In [16]:
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

We train the pipeline.

Now that we have set up the workflow, we can train the pipeline with a single call.

When we call fit(), the pipeline runs feature processing, model tuning, and training and returns a fitted pipeline with the best model it found. This step takes several minutes.

In [17]:
pipelineModel = pipeline.fit(train)

### Make predictions and evaluate results

The final step is to use the fitted model to make predictions on the test dataset and evaluate the model's performance. The model's performance on the test dataset provides an approximation of how it is likely to perform on new data. For example, if we had weather predictions for the next week, we could predict bike rentals expected during the next week.

Computing evaluation metrics is important for understanding the quality of predictions, as well as for comparing models and tuning parameters.

The transform() method of the pipeline model applies the full pipeline to the input dataset. The pipeline applies the feature processing steps to the dataset and then uses the fitted GBT model to make predictions. The pipeline returns a DataFrame with a new column named prediction.

In [18]:
predictions = pipelineModel.transform(test)

In [20]:
predictions.select("cnt", "prediction", *featuresCols).show(n=10)

+---+------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+
|cnt|        prediction|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|
+---+------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+
| 33|22.358993226174217|     1|  0|   1|  0|      0|      0|         0|         1|0.16|0.1818| 0.8|   0.1045|
|  5| 7.334803031501155|     1|  0|   1|  0|      0|      1|         1|         1|0.12|0.1212| 0.5|   0.2836|
|  7| 18.12532344601902|     1|  0|   1|  0|      0|      1|         1|         2|0.24|0.2273|0.65|   0.2239|
| 12| 5.966541811759386|     1|  0|   1|  0|      0|      2|         1|         1|0.14|0.1667|0.59|   0.1045|
|  7| 8.459601443664615|     1|  0|   1|  0|      0|      3|         1|         2|0.16| 0.197|0.86|   0.0896|
| 17|15.370219551935245|     1|  0|   1|  0|      0|      3|         1|         2|0.22|0.2273|0.69|   0.1343|
|  3| 9.94

A common way to evaluate the performance of a regression model is to calculate the root mean squared error (RMSE). The value is not very informative on its own, but we can use it to compare different models. CrossValidator determines the best model by selecting the one that minimizes RMSE.

In [21]:
rmse = evaluator.evaluate(predictions)

print("RMSE on our test set: %g" % rmse)

RMSE on our test set: 45.1486
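
For reference, RMSE is the square root of the mean of the squared differences between labels and predictions. The sketch below computes it in plain Python for the six complete rows visible in the prediction table above. It only illustrates the formula: six rows at hour 0 are not representative of the full test set, so the result is not comparable to the value reported by the evaluator.

```python
import math

# (cnt, prediction) pairs copied from the first six rows of the table above.
pairs = [
    (33, 22.358993226174217),
    (5, 7.334803031501155),
    (7, 18.12532344601902),
    (12, 5.966541811759386),
    (7, 8.459601443664615),
    (17, 15.370219551935245),
]

squared_errors = [(label - prediction) ** 2 for label, prediction in pairs]
rmse_six_rows = math.sqrt(sum(squared_errors) / len(squared_errors))

print("RMSE on the six sample rows: %g" % rmse_six_rows)
```

Squaring the errors means that a few large misses raise the RMSE more than many small ones.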


We can also plot the results. In this case, the predicted rentals per hour follow a shape similar to the actual hourly rentals plotted earlier.

In [22]:
# predictions.select("hr", "prediction")

DataFrame[hr: int, prediction: double]

It's also a good idea to examine the residuals, or the difference between the expected result and the predicted value. 

The residuals should be randomly distributed; if there are any patterns in the residuals, the model may not be capturing something important. In this case, the average residual is about 0.5, less than 0.5% of the average value of the cnt column.

In [23]:
import pyspark.sql.functions as F

In [24]:
predictions_with_residuals = predictions.withColumn("residual", (F.col("cnt") - F.col("prediction")))

predictions_with_residuals.agg({'residual': 'mean'}).show(n=1)

+------------------+
|     avg(residual)|
+------------------+
|0.4479244253106421|
+------------------+
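
A mean residual close to zero indicates that the model does not systematically over- or under-predict. It does not mean the individual errors are small, because positive and negative residuals cancel. The plain-Python sketch below shows this on the six complete rows from the prediction table shown earlier; it is an illustration on a tiny sample, not a replacement for the aggregate above.

```python
# (cnt, prediction) pairs copied from the first six rows of the prediction table.
pairs = [
    (33, 22.358993226174217),
    (5, 7.334803031501155),
    (7, 18.12532344601902),
    (12, 5.966541811759386),
    (7, 8.459601443664615),
    (17, 15.370219551935245),
]

residuals = [label - prediction for label, prediction in pairs]

mean_residual = sum(residuals) / len(residuals)                 # signed errors cancel
mean_abs_residual = sum(abs(r) for r in residuals) / len(residuals)

print("mean residual: %g" % mean_residual)
print("mean absolute residual: %g" % mean_abs_residual)
```

The gap between the two numbers is the reason to look at RMSE and the residual distribution together rather than at the mean residual alone.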



We can plot the residuals across the hours of the day to look for any patterns. In this example, there are no obvious correlations.

In [25]:
#predictions_with_residuals.select("hr", "residual")

### Improving the model

Here are some suggestions for improving this model:

* The count of rentals is the sum of registered and casual rentals. These two counts may have different behavior, as frequent cyclists and casual cyclists may rent bikes for different reasons. We can try training one GBT model for registered and one for casual, and then add their predictions together to get the full prediction.

* For efficiency, this notebook used only a few hyperparameter settings. We might be able to improve the model by testing more settings. A good start is to increase the number of trees by setting maxIter=200; this takes longer to train but might be more accurate.

* This notebook used the dataset features as-is, but we might be able to improve performance with some feature engineering. For example, the weather might have more of an impact on the number of rentals on weekends and holidays than on workdays. We could try creating a new feature by combining the weathersit and workingday columns. MLlib provides a suite of feature transformers; find out more in the ML guide.
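
As one possible reading of the feature engineering suggestion, the sketch below combines weathersit (values 1 to 4) and workingday (0 or 1) into a single index with one value per combination. The function name and the encoding are hypothetical choices for illustration; the notebook does not prescribe a particular way to combine the columns.

```python
def weather_workday_index(weathersit, workingday):
    """Map (weathersit in 1..4, workingday in 0..1) to one index in 0..7."""
    return (weathersit - 1) * 2 + workingday

# Enumerate every combination to check that each one gets its own index.
combined = {
    (w, d): weather_workday_index(w, d)
    for w in (1, 2, 3, 4)
    for d in (0, 1)
}

for (w, d), index in sorted(combined.items()):
    print("weathersit=%d workingday=%d -> %d" % (w, d, index))
```

The combined feature has eight distinct values, so with maxCategories=4 the VectorIndexer heuristic would treat it as numeric. Raising maxCategories to at least 8 would be needed for it to be handled as categorical.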