# Task 4

We haven't discussed MLlib in detail in class, so treat it as just another Python package, like scikit-learn. The difference is that code you write with this package is run by PySpark on the Spark engine. I have added guidelines and helpful links (as comments) throughout this notebook to walk you through the task.

## Imports

In [21]:
import numpy as np

In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1618867634217_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Read

#### Read 100 data points to test the code. Once you reach the bottom of the notebook, read the entire dataset.

In [2]:
aws_credentials = {"key": "","secret": ""}
# Read only the first 100 rows while testing the code
pandas_df = pd.read_csv("s3://mds-s3-student49/output/ml_data_SYD.csv", storage_options=aws_credentials, index_col=0, parse_dates=True).iloc[:100].dropna()
feature_cols = list(pandas_df.drop(columns="Observed").columns)

In [3]:
pandas_df

            ACCESS-CM2  ACCESS-ESM1-5  ...   TaiESM1   Observed
time                                   ...                     
1889-01-01    0.040427       1.814552  ...  2.257933   0.006612
1889-01-02    0.073777       0.303965  ...  2.287381   0.090422
1889-01-03    0.232656       0.019976  ...  1.199909   1.401452
1889-01-04    0.911319      13.623777  ...  2.106737  14.869798
1889-01-05    0.698013       0.021048  ...  1.763335   0.467628
...                ...            ...  ...       ...        ...
1889-04-06    5.961386       0.142560  ...  1.836278   1.381577
1889-04-07    2.546004       0.015131  ...  3.678201   3.047011
1889-04-08    8.811083       1.143435  ...  4.575716   0.405841
1889-04-09    2.257756       0.083106  ...  2.876456   0.022048
1889-04-10    0.327372       0.015648  ...  1.726308   0.006612

[100 rows x 26 columns]

In [4]:
feature_cols

['ACCESS-CM2', 'ACCESS-ESM1-5', 'AWI-ESM-1-1-LR', 'BCC-CSM2-MR', 'BCC-ESM1', 'CMCC-CM2-HR4', 'CMCC-CM2-SR5', 'CMCC-ESM2', 'CanESM5', 'EC-Earth3-Veg-LR', 'FGOALS-g3', 'GFDL-CM4', 'INM-CM4-8', 'INM-CM5-0', 'KIOST-ESM', 'MIROC6', 'MPI-ESM-1-2-HAM', 'MPI-ESM1-2-HR', 'MPI-ESM1-2-LR', 'MRI-ESM2-0', 'NESM3', 'NorESM2-LM', 'NorESM2-MM', 'SAM0-UNICON', 'TaiESM1']

## Preparing dataset for ML

In [5]:
# Convert the pandas DataFrame to a Spark DataFrame, then combine the feature
# columns into a single vector column called "Features" (MLlib requires this).
# "spark" is the SparkSession; I will discuss it in our Wed class.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "Observed")
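
To make the assembling step above less abstract, here is a conceptual sketch in plain Python of what happens to each row: the feature columns are packed, in order, into one vector, and only that vector and the label are kept. The `assemble` function is a hypothetical helper written for illustration, not part of MLlib, and the two rows are taken from the 100-row preview above, restricted to two of the 25 model columns.

```python
# Conceptual stand-in for VectorAssembler followed by select("Features", "Observed").
# Plain Python only; this is NOT how Spark implements it.

def assemble(rows, input_cols, label_col):
    """Pack the listed columns of each row into one feature vector."""
    assembled = []
    for row in rows:
        # The order of values in the vector follows the order of input_cols.
        features = [row[col] for col in input_cols]
        assembled.append({"Features": features, label_col: row[label_col]})
    return assembled


# Two rows from the preview above (1889-01-01 and 1889-01-02), two columns only.
rows = [
    {"ACCESS-CM2": 0.040427, "TaiESM1": 2.257933, "Observed": 0.006612},
    {"ACCESS-CM2": 0.073777, "TaiESM1": 2.287381, "Observed": 0.090422},
]

result = assemble(rows, ["ACCESS-CM2", "TaiESM1"], "Observed")
for r in result:
    print(r)
```

In the real notebook the vector holds all 25 model columns listed in `feature_cols`, and Spark does this work in a distributed way.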

In [7]:
#initialize RandomForestRegressor
rf = RandomForestRegressor(labelCol = "Observed", featuresCol = "Features")

## Find best hyperparameter settings

You can refer to [here](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [here](https://www.silect.is/blog/random-forest-models-in-spark-ml/) as references. Everything you need to complete this task is covered there.

Some additional info [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)

The official MLlib documentation for random forest regression is [here](http://spark.apache.org/docs/3.0.1/ml-classification-regression.html#random-forest-regression). When using the Spark documentation, always keep in mind that the API sometimes changes between versions and that every release brings new updates and features, so make sure you read the documentation for the Spark version you are running. You can find the documentation for each version [here](http://spark.apache.org/docs/).

Use the following parameter values to search for the best settings. You could try more values, but unfortunately this single-node cluster doesn't have enough power for a larger search.

    - numTrees as [10, 50, 100]
    - maxDepth as [5, 10]
    - bootstrap as [False, True]
    - In the CrossValidator, set the evaluator to RegressionEvaluator(labelCol="Observed")

In [9]:
# Parameter grid for hyperparameter tuning
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [5, 10])
               .addGrid(rf.bootstrap, [False, True])
               .addGrid(rf.numTrees, [10, 50, 100])
               .build())
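
It helps to know how much work this grid implies before running it on a single node. The sketch below enumerates the same value lists in plain Python with `itertools.product` and counts the model fits for 5-fold cross-validation. The variable names are illustrative only, and the extra `+ 1` assumes that the cross-validator refits the best setting once on the full training set after the search.

```python
from itertools import product

# Same value lists as the parameter grid above.
grid_values = {
    "maxDepth": [5, 10],
    "bootstrap": [False, True],
    "numTrees": [10, 50, 100],
}

# Every combination of one value per parameter (the Cartesian product).
names = list(grid_values)
param_maps = [dict(zip(names, combo)) for combo in product(*grid_values.values())]

num_folds = 5
cv_fits = len(param_maps) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1               # plus the final refit of the best setting

print(f"combinations: {len(param_maps)}")
print(f"fits during cross-validation: {cv_fits}")
print(f"fits including the final refit: {total_fits}")
```

Adding one more value to any list multiplies the number of combinations, which is why the grid is kept small here.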

In [10]:
#define model evaluator
rfevaluator = RegressionEvaluator(predictionCol="prediction", 
                                  labelCol="Observed", 
                                  metricName="rmse")
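
For reference, the `rmse` metric chosen above is the square root of the mean squared difference between predictions and labels, so lower is better. A minimal plain-Python sketch follows; the `rmse` helper and the three observed/predicted values are hypothetical and only illustrate the formula.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and equally long")
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical values, not taken from the dataset.
observed = [1.0, 2.0, 3.0]
predicted = [1.0, 2.0, 5.0]

score = rmse(observed, predicted)
print(f"RMSE: {score:.4f}")
```

Because the errors are squared before averaging, a single large miss (such as a heavy-rain day) affects the score more than several small ones.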


In [11]:
#cross validate 
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)
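
As a rough picture of what `numFolds = 5` means, the sketch below splits 100 row indices into five folds and uses each fold once for validation while training on the other four. This is a plain-Python illustration with a hypothetical `k_fold_indices` helper. Spark assigns rows to folds randomly, so its folds are only approximately equal in size, whereas this sketch makes them exactly equal.

```python
import random


def k_fold_indices(n_rows, k, seed=0):
    """Return k (train, validation) index splits over range(n_rows)."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # reproducible shuffle
    folds = [indices[i::k] for i in range(k)]  # k disjoint folds
    splits = []
    for i in range(k):
        validation = folds[i]
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((train, validation))
    return splits


splits = k_fold_indices(n_rows=100, k=5)
for fold_number, (train, validation) in enumerate(splits, start=1):
    print(f"fold {fold_number}: train={len(train)} validation={len(validation)}")
```

Each parameter combination is scored on every validation fold, and the per-fold scores are averaged to give one cross-validation score per combination.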

In [13]:
#fit model
rfcvModel = rfcv.fit(training)
print(rfcvModel)

CrossValidatorModel_dd16f1aaed2e

In [24]:
print("\nBest model")
print("==========")
print(f"\nCV Score: {min(rfcvModel.avgMetrics):.2f}")
print(f"numTrees: {rfcvModel.bestModel.getNumTrees}")
print(f"MaxDepth: {rfcvModel.bestModel.getMaxDepth()}")


Best model
==========

CV Score: 3.54
numTrees: 50
MaxDepth: 5
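
The CV score above is the smallest entry of `avgMetrics`, which holds one averaged RMSE per parameter combination. The sketch below shows, in plain Python, how such a list can be paired with the parameter combinations to see which setting produced the best score. All numbers are hypothetical placeholders, not results from this notebook. In PySpark the two lists would, as far as I know, come from `rfcvModel.getEstimatorParamMaps()` and `rfcvModel.avgMetrics`, which are aligned by position.

```python
# Hypothetical placeholder values: these are NOT results from the notebook.
param_maps = [
    {"numTrees": 10, "maxDepth": 5},
    {"numTrees": 50, "maxDepth": 5},
    {"numTrees": 100, "maxDepth": 5},
]
avg_rmse = [4.1, 3.5, 3.8]  # one averaged CV score per parameter combination

# RMSE is an error, so the best combination is the one with the smallest score.
best_index = min(range(len(avg_rmse)), key=avg_rmse.__getitem__)
best_params = param_maps[best_index]
best_score = avg_rmse[best_index]

# The mean over all combinations describes the whole grid, not the best model.
mean_score = sum(avg_rmse) / len(avg_rmse)

print(f"best params: {best_params}")
print(f"best CV RMSE: {best_score:.2f}")
print(f"mean CV RMSE over the grid: {mean_score:.2f}")
```

The mean over the grid can never be lower than the best score, so the two numbers should not be compared as if they measured the same thing.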

In [11]:
# Once the code works on 100 data points, load the entire dataset and rerun.
# This could take ~15 min.


## Hyperparameter tuning using the entire dataset

In [26]:
# Load the full dataset (no 100-row limit this time)
pandas_df = pd.read_csv("s3://mds-s3-student49/output/ml_data_SYD.csv", 
                        storage_options=aws_credentials, 
                        index_col=0, parse_dates=True).dropna()

feature_cols = list(pandas_df.drop(columns="Observed").columns)

In [27]:
training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "Observed")

In [33]:
#initialize RandomForestRegressor
rf = RandomForestRegressor(labelCol = "Observed", featuresCol = "Features")

In [34]:
# Parameter grid for hyperparameter tuning
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [5, 10])
               .addGrid(rf.bootstrap, [False, True])
               .addGrid(rf.numTrees, [10, 50, 100])
               .build())

In [35]:
#define model evaluator
rfevaluator = RegressionEvaluator(predictionCol="prediction", 
                                  labelCol="Observed", 
                                  metricName="rmse")

In [36]:
#cross validate 
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

In [37]:
rfcvModel_full = rfcv.fit(training)
print(rfcvModel_full)

CrossValidatorModel_dd16f1aaed2e

In [46]:
# Print run info
print("\nBest model")
print("==========")
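# NOTE: np.mean averages the RMSE over every parameter combination in the grid;
# min(rfcvModel_full.avgMetrics) would give the best model's CV score, as in the 100-row run.
print(f"\nCV Score: {np.mean(rfcvModel_full.avgMetrics):.2f}")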
print(f"numTrees: {rfcvModel_full.bestModel.getNumTrees}")
print(f"MaxDepth: {rfcvModel_full.bestModel.getMaxDepth()}")
print(f"Bootstrap: {rfcvModel_full.bestModel.getBootstrap()}")


Best model
==========

CV Score: 8.22
numTrees: 100
MaxDepth: 5
Bootstrap: True