# COMP4124 Lab 05: MLlib

The aim of this lab is to get some practice with Spark's MLlib library.

This notebook is split into two parts:

* Part 1 gives an example of cross-validation with MLlib
* In Part 2, you will put together a pipeline for predicting appliance energy consumption

## Set-Up

**Install pyspark so can use within the notebook:**

In [None]:
!pip install pyspark

**Initialise the `SparkSession`:**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Lab05") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
data_path = "/content/drive/My Drive/COMP4124_notebooks/24-25/data/"

In [None]:
import pyspark.sql.functions as F

## Part 1: Cross-Validation Example

For this example, we are going to use the student marks data from the MLlib lecture.

**Load the data, cast columns to the correct types and drop the columns we don't want to use:**

In [None]:
marks_df = spark.read.csv(data_path+'results.csv', header=True)
marks_df = marks_df \
  .select([F.col(c).cast('float').alias(c) for c in marks_df.columns]) \
  .drop(*[c for c in marks_df.columns if 'Question' in c])
marks_df.show(5)

**Split data into training and final test sets:**

In [None]:
seed = 202503
train_df, test_df = marks_df.randomSplit([0.7, 0.3], seed)

### MLlib `CrossValidator`

MLlib provides us with the `CrossValidator`. This lets us do model selection (hyperparameter tuning) to find the best model for a given task. The MLlib ML Tuning Guide is available [here](https://spark.apache.org/docs/latest/ml-tuning.html).

**Note:** MLlib gives us another model selection `TrainValidationSplit`. We won't cover that in this lab, but feel free to try it out as part of your project, or just in your own time.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

To use `CrossValidator`, we need:
* an Estimator
* a parameter grid
* an Evaluator

We will set up our Pipeline from the MLlib lecture, to use as the Estimator.

**Set up instances of VectorAssembler and LinearRegression, then define our Pipeline stages:**

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [None]:
feature_cols = marks_df.columns
feature_cols.remove('Total')
feature_cols

In [None]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lr = LinearRegression(featuresCol='features', labelCol='Total')
pipeline = Pipeline(stages=[assembler, lr])

**We also need to define a parameter grid, i.e. the parameters and their values that we want to investigate.** I am only using very few combinations for this example, so that it runs reasonably quickly.

In [None]:
paramGrid = ParamGridBuilder() \
  .addGrid(lr.regParam, [0.5, 0.1]) \
  .addGrid(lr.maxIter, [1, 5]) \
  .build()

**Set up an Evaluator:** We will use the RegressionEvaluator from the MLlib lecture. We can use `getLabelCol()` and `getPredictionCol()` to get the relevant columns directly from our LinearRegressor, rather than having to specify them ourselves.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator(metricName="mae", labelCol=lr.getLabelCol(), predictionCol=lr.getPredictionCol())

**Now we are ready to create our CrossValidator, using 3 folds:**

In [None]:
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)

**Run the cross-validation, fitting models to our training data:** This may take a while if you investigate many parameter combinations!

In [None]:
cv_model = cv.fit(train_df)

In [None]:
type(cv_model)

`cv_model` now contains the pipeline model with the highest average metric across the 3 folds.

**We can use this model to make predictions on the final test set.**

In [None]:
predictions = cv_model.transform(test_df)

Evaluate how well the model has done on the final test set:

In [None]:
evaluator.evaluate(predictions)

**Investigate the best pipeline and its stages:**

We can use the attribute `bestModel` to access the best pipeline. We can then get a list of stages in this best pipeline via the attribute `stages`.

In [None]:
best_pipeline = cv_model.bestModel
best_pipeline.stages

We can then look at the best parameters found from the combinations we defined in our parameter grid. What were the best LinearRegression parameters found?

In [None]:
best_lr = best_pipeline.stages[1]
best_lr_params = best_lr.extractParamMap()
best_lr_params

If we wanted to just look at the parameters we investigated (`regParam` and `maxIter`):

In [None]:
{(k.name,v) for (k,v) in best_lr_params.items() if k.name in ['regParam','maxIter']}

## Part 2: Appliance Energy Prediction

The aim of this section is to use Spark MLlib to create a pipeline that preprocesses a dataset, trains a model, and makes predictions on a final test set. We also want to use hyperparameter tuning (via cross-validation) in order to fine-tune our model.

The content of this lab has been inspired by the sample provided in the [Databricks documentation](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2854662143668609/2084788691983918/6837869239396014/latest.html).

**Data Information**: This dataset contains energy consumption data from appliances at a 10-minute resolution for about 4.5 months. The house temperature and humidity conditions were monitored with a wireless sensor network. Each wireless node transmitted the temperature and humidity conditions around every 3.3 min. Then, the wireless data was averaged for 10-minute periods. The energy data was logged every 10 minutes. Weather from the nearest airport weather station (Chievres Airport, Belgium) was also logged. This dataset is from [Candanedo et al](http://dx.doi.org/10.1016/j.enbuild.2017.01.083) and is hosted by the UCI Machine Learning Repository. [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction).

**Goal**: We want to learn to predict appliances' energy consumption based on weather information. It would also be nice to know which input features are the most relevant to make predictions. This is a regression problme.

### Load and understand the dataset

In [None]:
energy_df = spark.read.csv(data_path+'energydata_complete.csv', header=True)
print(f'Number of rows: {energy_df.count()}')
energy_df.show(5)
energy_df.printSchema()

Cache the dataset:

In [None]:
energy_df.cache()

#### Data description

From the UCI ML Repository description, we know that the columns have the following meanings.

**Attribute information**:
```
date, year-month-day hour:minute:second
Appliances, energy use in Wh
lights, energy use of light fixtures in the house in Wh
T1, Temperature in kitchen area, in Celsius
RH_1, Humidity in kitchen area, in %
T2, Temperature in living room area, in Celsius
RH_2, Humidity in living room area, in %
T3, Temperature in laundry room area
RH_3, Humidity in laundry room area, in %
T4, Temperature in office room, in Celsius
RH_4, Humidity in office room, in %
T5, Temperature in bathroom, in Celsius
RH_5, Humidity in bathroom, in %
T6, Temperature outside the building (north side), in Celsius
RH_6, Humidity outside the building (north side), in %
T7, Temperature in ironing room , in Celsius
RH_7, Humidity in ironing room, in %
T8, Temperature in teenager room 2, in Celsius
RH_8, Humidity in teenager room 2, in %
T9, Temperature in parents room, in Celsius
RH_9, Humidity in parents room, in %
To, Temperature outside (from Chievres weather station), in Celsius
Pressure (from Chievres weather station), in mm Hg
RH_out, Humidity outside (from Chievres weather station), in %
Wind speed (from Chievres weather station), in m/s
Visibility (from Chievres weather station), in km
Tdewpoint (from Chievres weather station), Â°C
rv1, Random variable 1, nondimensional
rv2, Random variable 2, nondimensional
```

**The target variable is the energy use of the Appliances.**

For now, we will leave the two variables `rv1` and `rv2` in our dataset, to see if they are affecting much our methods, then we can try to remove them and see if we improve the results.

#### Cast the columns to appropriate types

Most columns in this dataset are numerical, so we can just cast them to floats. The exception is the 'date' column, which we instead want to cast to a timestamp.

<font color='blue'>**Task:** Cast the 'date' column to a timestamp. Cast all other columns to floats. Assign the resulting DataFrame back to `energy_df`.<br>
There are a lot of columns, so you should do this programmatically rather than manually casting every column individually!</font>

In [None]:
numerical_cols = energy_df.columns
numerical_cols.remove('date')
energy_df = energy_df \
  .select(['date']+[F.col(c).cast("float").alias(c) for c in numerical_cols]) \
  .withColumn('date',F.to_timestamp(F.col('date')))

energy_df.printSchema()

#### Data preprocessing

This dataset is nicely prepared for Machine Learning and requires very little preprocessing. However, rather than keeping the date as a single timestamp, we would like to have some additional columns, including 'day of the year', 'hour', and 'month of the year'.

<font color='blue'>**Task:** Add columns `dayofyear`, `hour` and `month` to the DataFrame. You will need to find the appropriate functions from the DataFrame API. Assign the resulting DataFrame back to `energy_df`.</font>

In [None]:
energy_df = energy_df \
  .withColumn("dayofyear", F.dayofyear('date')) \
  .withColumn("hour", F.hour('date')) \
  .withColumn("month", F.month('date'))

<font color='blue'>**Task:** Drop the 'date' column - we don't want to use this in our predictions now we have our three new time-related columns added. Assign the resulting DataFrame back to `energy_df`.</font>

In [None]:
energy_df = energy_df.drop('date')
energy_df.show(5)

In [None]:
energy_df.printSchema()

#### Data visualisation

Before applying any machine learning algorithm, it is good practice to try to visualise your data. For example, we could see how much energy is consumed by appliances depending on the month.

<font color='blue'>**Task:** Create a histogram of the total amount of energy used per month. Plot the histogram using a library of your choice (e.g. matplotlib).</font>

In [None]:
from matplotlib import pyplot as plt

In [None]:
hist = energy_df.groupBy('month').sum('Appliances').sort("month").toPandas()
hist

In [None]:
plt.bar(hist['month'], hist['sum(Appliances)'])
plt.title('Energy consumption per month')
plt.xlabel('Month')
plt.ylabel('Sum of Appliance Energy')
plt.show()

<font color='blue'>**Open task:** Do any other analysis of the data you think would be interesting or useful!</font>

#### Split into train and test sets

**Split the data into training and test sets:** We will train and tune our model on the training set, and then see how well we do on a final test set.

<font color='blue'>**Task:** Split `energy_df` into 70% for training and 30% for the test set.</font>

In [None]:
seed = 202503
train_df, test_df = energy_df.randomSplit([0.7, 0.3], seed)

### Create a Pipeline

We are going to create a simple Pipeline, using only the following stages:

* **VectorAssembler:** To combine all the input columns into a single vector column (i.e. all the columns apart from 'Appliances').
* **GBTRegressor:** The learning algorithm we will use is [Gradient-Boosted Trees](https://en.wikipedia.org/wiki/Gradient_boosting#Gradient_tree_boosting) (GBTs).

<font color='blue'>**Task:** Create a VectorAssembler. Remember not to include the label column 'Appliances' in the features!</font>

In [None]:
features_cols = energy_df.columns
features_cols.remove('Appliances')
vector_assembler = VectorAssembler(inputCols=features_cols, outputCol="features")

<font color='blue'>**Task:** Create a GBTRegressor. Don't indicate any parameters, other than specifying the class label (i.e. `labelCol`) to be `'Appliances'`.</font>

In [None]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="Appliances")

<font color='blue'>**Task:** Create a Pipeline, using your VectorAssembler and GBTRegressor as the two stages.</font>

In [None]:
pipeline = Pipeline(stages=[vector_assembler, gbt])

### Create a CrossValidator

<font color='blue'>**Task:** Define a parameter grid with the parameters you want to investigate. Don't attempt to do too many, otherwise it may take a very long time to run!<br>
You can look at the [GBTRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.GBTRegressor.html) docs for the parameters you can choose.</font>

In [None]:
paramGrid = ParamGridBuilder() \
  .addGrid(gbt.maxDepth, [5, 8]) \
  .addGrid(gbt.maxIter, [10, 20]) \
  .build()

<font color='blue'>**Task:** Create a RegressionEvaluator that uses the Root Mean Squared Error (RMSE) as the performance metric. Specify the `labelCol` and `predictionCol` by getting them from your GBTRegressor.</font>

In [None]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

<font color='blue'>**Task:** Create a CrossValidator that uses the Pipeline as an estimator, as well the parameter grid and evaluator that you defined above. Use 3 folds for the cross-validation.</font>

In [None]:
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

<font color='blue'>**Task:** Run the cross-validation using the training data, to create a CrossValidatorModel called `cv_model`.</font>

In [None]:
cv_model = cv.fit(train_df)

**Saving models:** MLlib lets you save models to disk. This means you can load those models directly from a file later on if needed, so you don't have to wait for them to train again. [See the docs for more information](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidatorModel.html#pyspark.ml.tuning.CrossValidatorModel.save)

### Evaluate the results

Let's see how the model does on the final test set!

<font color='blue'>**Task:** Transform the test set to add a column with the predictions, using your CrossValidatorModel. Assign the resulting DataFrame to a variable called `predictions`.</font>

In [None]:
predictions = cv_model.transform(test_df)

In [None]:
predictions.select('Appliances', 'prediction').show(5)

<font color='blue'>**Task:** Use your Evaluator that you defined above to calculate the RMSE.</font>

In [None]:
evaluator.evaluate(predictions)

The result probably seems quite high. You can see if you can improve it!

But first, let's see how we can find out the importance of the features from the GBT.

### Investigating the GBT

**Retrieve the best GBT from the CrossValidatorModel:**

In [None]:
best_pipeline = cv_model.bestModel
gbt_model = best_pipeline.stages[1]

**Check the feature importances from the GBT:**

In [None]:
gbt_model.featureImportances

A lot of these features are showing quite low importance. They won't be affecting the performance much as GBTs perform somewhat an implicit feature selection. But, we could make our model more interpretable if we remove the low importance features.

<font color='blue'>**Task:** Create a list of the features with less than 0.05 in the feature importances. You will need to get the names of the feature columns the DataFrame.</font>

In [None]:
importances = gbt_model.featureImportances

In [None]:
features_to_remove = []
for i in range(len(features_cols)):
  if importances[i] < 0.05:
    features_to_remove.append(features_cols[i])

In [None]:
features_to_remove

#### Remove the low importance features, and re-do the cross validation

<font color='blue'>**Task:** Create a new VectorAssembler, with only higher importance features included. You don't need to modify your training and test DataFrames at all - you simply need to specify the relevant columns as the `inputCols` when creating the VectorAssembler.</font>

In [None]:
features_cols2 = energy_df.columns
features_cols2.remove('Appliances')
for feature in features_to_remove:
    features_cols2.remove(feature)
vector_assembler2 = VectorAssembler(inputCols=features_cols2, outputCol="features")
features_cols2

<font color='blue'>**Task:** Create a new Pipeline with your new VectorAssembler and your GBTRegressor instance.</font>

In [None]:
pipeline2 = Pipeline(stages=[vector_assembler2, gbt])

<font color='blue'>**Task:** Re-do the cross-validation, using your new Pipeline as the estimator.</font>

In [None]:
cv2 = CrossValidator(estimator=pipeline2, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

In [None]:
cv_model2 = cv2.fit(train_df)

<font color='blue'>**Task:** Use your new model to make predictions, and then compute the RMSE.</font>

In [None]:
predictions2 = cv_model2.transform(test_df)
rmse = evaluator.evaluate(predictions2)
print(rmse)

In my case, I got about the same value for the RMSE, but this has achieved using far fewer features.

<font color='blue'>**Task:** Have a look at the feature importances of the new model.</font>

In [None]:
cv_model2.bestModel.stages[1].featureImportances

### Improving the model further

<font color='blue'>**Open task:** There might be many ways to improve the results here. Implement some alternatives below.<br>
Some ideas for you to consider:
* Hyperparameter tuning: You probably initially used quite a small set of parameters in the cross-validation. Try different combinations of parameters and values.
* Alternative models: Try with different regression models from MLlib.
* Additional preprocessing: Is there any normalisation that may be needed? Are there outliers or noise that may have an impact on results?
</font>

**It is good practice to stop the underlying SparkContext when we are done**

In [None]:
spark.stop()

In [None]:
spark