# Machine Learning with Spark

## Introduction

You've now explored how to perform operations on Spark RDDs for simple Map-Reduce tasks. Luckily, Spark supports far more advanced use cases, and many of them are found in the `ml` library, which we are going to explore in this lesson.


## Objectives
* Describe the use case for Machine Learning with Spark
* Load data with Spark DataFrames
* Train a machine learning model with Spark


## A Tale of Two Libraries

If you look at the pyspark documentation, you'll notice that there are two different libraries for machine learning: [mllib](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html) and [ml](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html). These libraries are very similar to one another. The main difference is that the mllib library is built on the RDDs you just practiced using, whereas the ml library is built on higher-level Spark DataFrames, which have methods and attributes similar to pandas. Since Spark 2.0, the RDD-based mllib API has been in maintenance mode, and the DataFrame-based ml library is the primary machine learning API. It's important to note that these libraries are much younger than pandas, and many of the kinks are still being worked out.

## Spark DataFrames

In the previous lessons, you were introduced to SparkContext as the primary way to connect to a Spark application. Here, we will use SparkSession, which comes from the [sql](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html) component of pyspark and is the entry point for working with DataFrames. Let's go through the process of manipulating some data. For this example, we're going to use the [Forest Fire dataset](https://archive.ics.uci.edu/ml/datasets/Forest+Fires) from UCI, which contains data about the area burned by wildfires in the Northeast region of Portugal in relation to numerous other factors.


```python
from pyspark import SparkContext
from pyspark.sql import SparkSession
```

```python
spark = SparkSession.builder.master("local").appName("machine learning").getOrCreate()
```


```python
spark_df = spark.read.csv('./forestfires.csv', header=True, inferSchema=True)
```

```python
# observing the datatype of spark_df
type(spark_df)
```

```
pyspark.sql.dataframe.DataFrame
```

You'll notice that some of the methods are very similar or identical to those found in pandas:


```python
spark_df.head()
```

```
Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)
```

```python
spark_df.columns
```

```
['X',
 'Y',
 'month',
 'day',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'area']
```

Selecting columns works the same way:

```python
spark_df[['month','day','rain']].head()
```

```
Row(month='mar', day='fri', rain=0.0)
```

But others not so much...

```python
spark_df.info()
```

```
AttributeError: 'DataFrame' object has no attribute 'info'
```

```python
# this works, but describe() returns a new (lazy) DataFrame rather than printing a summary
spark_df.describe()
```

```
DataFrame[summary: string, X: string, Y: string, month: string, day: string, FFMC: string, DMC: string, DC: string, ISI: string, temp: string, RH: string, wind: string, rain: string, area: string]
```

## Let's try some aggregations with our DataFrame

```python
spark_df_months = spark_df.groupBy('month').agg({'area':'mean'})
spark_df_months
```

```
DataFrame[month: string, avg(area): double]
```

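Notice that calling the aggregation method does not return the grouped results; it returns a new DataFrame that describes the computation. Remember, this is still Spark! Transformations such as `groupBy` and `agg` are evaluated lazily, and nothing is computed until you call an action. Keeping transformations and actions separate lets Spark plan the work over large quantities of data. You can trigger the computation and retrieve the results with the `collect` action: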

```python
spark_df_months.collect()
```

```
[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]
```

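As you can see, the average burned area tends to be larger in the summer and early autumn months (July through September). Some other months, such as May, also show high averages, but months with only a few recorded fires can have noisy averages, so interpret them with caution. On your own, practice more aggregations and manipulations that you might be able to perform on this dataset. Now, we'll move on to the machine learning applications of pyspark.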

### ML

The Spark ML documentation notes that its Pipelines API was inspired by scikit-learn. As a result, many of the methods and functionalities look similar, but there are some crucial distinctions. There are three main concepts found within the ML library:

`Transformer`: An algorithm that transforms one pyspark DataFrame into another DataFrame, typically by appending one or more columns.

`Estimator`: An algorithm that is fit on a pyspark DataFrame. Calling its `fit` method produces a fitted model, which is itself a Transformer.

`Pipeline`: Very similar to an sklearn pipeline, it chains multiple Transformers and Estimators together into a single workflow.

Fitting and transforming are separate steps because they do different kinds of work. Calling `fit` on an Estimator generally runs Spark jobs over the data right away and returns a new model object. Calling `transform` on that model is lazy, like any other DataFrame transformation: it only adds the new columns to the query plan, and the actual computation happens when an action such as `head` or `collect` is called.

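```python
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
# OneHotEncoderEstimator exists in Spark 2.3/2.4; in Spark 3.0+ the same estimator is named OneHotEncoder
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator
```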


```python
si = StringIndexer(inputCol='month',outputCol='month_num')
model = si.fit(spark_df)
new_df = model.transform(spark_df)
```

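Note the small but critical distinction between sklearn's implementation of a transformer and pyspark's. In sklearn, `fit` updates the estimator object in place and returns it. In pyspark, `fit` leaves the `StringIndexer` unchanged and returns a new `StringIndexerModel`, which holds the learned labels and performs the transformation. Checking the types of the two objects makes this visible: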

```python
type(si)
```

```
pyspark.ml.feature.StringIndexer
```

```python
type(model)
```

```
pyspark.ml.feature.StringIndexerModel
```

```python
model.labels
```

```
['aug',
 'sep',
 'mar',
 'jul',
 'feb',
 'jun',
 'oct',
 'apr',
 'dec',
 'jan',
 'may',
 'nov']
```

```python
new_df.head(4)
```

```
[Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0),
 Row(X=7, Y=4, month='oct', day='tue', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0),
 Row(X=7, Y=4, month='oct', day='sat', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0),
 Row(X=8, Y=6, month='mar', day='fri', FFMC=91.7, DMC=33.3, DC=77.5, ISI=9.0, temp=8.3, RH=97, wind=4.0, rain=0.2, area=0.0, month_num=2.0)]
```

Let's go ahead and remove the `day` column, since the day of the week is unlikely to be related to the area burned by forest fires. We'll also drop the original `month` column, because it is now represented by `month_num`.

```python
new_df = new_df.drop('day','month')
new_df.head()
```

```
Row(X=7, Y=5, FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0)
```

As you can see, the month is now represented by a number in the `month_num` column. These numbers would imply an ordering between months that the indexer chose by frequency, so next we'll use Spark's version of OneHotEncoder to turn each month into its own indicator. First, let's make sure we have an accurate representation of the months:

```python
new_df.select('month_num').distinct().collect()
```

```
[Row(month_num=8.0),
 Row(month_num=0.0),
 Row(month_num=7.0),
 Row(month_num=1.0),
 Row(month_num=4.0),
 Row(month_num=11.0),
 Row(month_num=3.0),
 Row(month_num=2.0),
 Row(month_num=10.0),
 Row(month_num=6.0),
 Row(month_num=5.0),
 Row(month_num=9.0)]
```

```python
ohe = feature.OneHotEncoderEstimator(inputCols=['month_num'],outputCols=['month_vec'])
```

```python
one_hot_encoded = ohe.fit(new_df).transform(new_df).drop('month_num')
one_hot_encoded.head()
```

```
Row(X=7, Y=5, FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_vec=SparseVector(11, {2: 1.0}))
```

```python
features = ['X',
 'Y',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'month_vec']

target = 'area'

vector = VectorAssembler(inputCols=features,outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)
```

```python
vectorized_df.head()
```

```
Row(X=7, Y=5, FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_vec=SparseVector(11, {2: 1.0}), features=SparseVector(21, {0: 7.0, 1: 5.0, 2: 86.2, 3: 26.2, 4: 94.3, 5: 5.1, 6: 8.2, 7: 51.0, 8: 6.7, 12: 1.0}))
```

Great! Our data is now in a format that seems acceptable for the last step. Now it's time for us to actually fit a model! Let's fit a Random Forest Regression model to our data.

```python
rf_model = RandomForestRegressor(featuresCol='features',labelCol='area',predictionCol="prediction").fit(vectorized_df)
```

```python
predictions = rf_model.transform(vectorized_df).select("area","prediction")
```

```python
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area')
```

```python
evaluator.evaluate(predictions,{evaluator.metricName:"r2"})
```

```
0.671401928309378
```

```python
evaluator.evaluate(predictions,{evaluator.metricName:"mae"})
```

```
13.710954049069102
```

### Putting it all in a Pipeline

We just performed a whole lot of separate transformations on our data. We can streamline this process by combining those steps into a pipeline. Let's look at the stages we used to create this model:

* StringIndexer (Estimator)
* OneHotEncoderEstimator (Estimator)
* VectorAssembler (Transformer)
* RandomForestRegressor (Estimator)

Once we've fit the pipeline, we'll want to evaluate it to determine how well it performs. We can do this with:

* RegressionEvaluator

```python
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
```


```python
# handleInvalid='keep' puts months unseen during fitting into an extra index instead of raising an error
string_indexer = StringIndexer(inputCol='month', outputCol='month_num', handleInvalid='keep')
one_hot_encoder = OneHotEncoderEstimator(inputCols=['month_num'], outputCols=['month_vec'])
vector_assembler = VectorAssembler(inputCols=features, outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features', labelCol='area')
stages = [string_indexer, one_hot_encoder, vector_assembler, random_forest]

pipeline = Pipeline(stages=stages)
```

```python
params = ParamGridBuilder()\
.addGrid(random_forest.maxDepth, [5,10,15])\
.addGrid(random_forest.numTrees, [20,50,100])\
.build()
```

Let's check how many parameter combinations we just built (three values of `maxDepth` times three values of `numTrees`):

```python
print('total combinations of parameters: ',len(params))
```

```
total combinations of parameters:  9
```


```python
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area', metricName='mae')

# CrossValidator uses 3 folds unless numFolds is set
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=reg_evaluator)
```

```python
cross_validated_model = cv.fit(spark_df)
```

Now, let's see how well the model performed! `avgMetrics` contains the MAE for each of our 9 parameter combinations, averaged over the cross-validation folds. The best (lowest) value in the list is an MAE of about 22.52. Note that this is worse than our original model's MAE of 13.71, but that's because we evaluated the original model on the same data it was trained on. We didn't do a train-test split!

```python
cross_validated_model.avgMetrics
```

```
[24.283507394467403,
 23.035417293882116,
 22.516006104739546,
 25.172824616927077,
 24.076323818873483,
 23.15615371550553,
 25.249687913709245,
 24.141126374493428,
 23.229345283019505]
```

The `cross_validated_model` variable holds a CrossValidatorModel, and calling `transform` on it uses the best-performing pipeline from the grid search, refit on the full dataset. Let's see how its predictions compare with the actual values. As you can see, many fires in this dataset have a burned area of 0.0, and the model predicts positive values for them. Perhaps it would be better to investigate this problem as a classification task.

```python
predictions = cross_validated_model.transform(spark_df)
predictions.select('prediction','area').show(300)
```

```
+------------------+-------+
|        prediction|   area|
+------------------+-------+
| 6.586531997403031|    0.0|
| 4.919740844889813|    0.0|
| 5.547914541367747|    0.0|
| 6.893712206015393|    0.0|
| 5.179697933378074|    0.0|
| 9.218160801134067|    0.0|
| 22.01227935091017|    0.0|
| 7.357427417579902|    0.0|
| 8.955984236069005|    0.0|
|21.322545727623893|    0.0|
| 6.731756502788954|    0.0|
|18.223391492201202|    0.0|
| 7.025265763112522|    0.0|
| 10.43514507469672|    0.0|
|62.953059671916726|    0.0|
| 8.678236550869634|    0.0|
| 4.961681338171639|    0.0|
|  7.88099846133502|    0.0|
| 4.502810492724764|    0.0|
|  5.14657175330601|    0.0|
| 10.93142862217377|    0.0|
| 4.615792936762086|    0.0|
|  5.65826956897323|    0.0|
| 9.938450792664792|    0.0|
| 7.678003823745614|    0.0|
| 6.572181564331163|    0.0|
| 8.052658244970239|    0.0|
| 9.852608941724712|    0.0|
|17.723862819276402|    0.0|
|11.038881533381282|    0.0|
|  5.82439771599864|    0.0|
...
```

Now let's go ahead and take a look at the feature importances of our Random Forest model. In order to do this, we need to unroll our pipeline to access the Random Forest Model. Let's start by first checking out the "bestModel" attribute of our cross_validated_model.

```python
type(cross_validated_model.bestModel)
```

```
pyspark.ml.pipeline.PipelineModel
```

So ml treats the entire pipeline as the best-performing model. Let's see if we can go deeper into the pipeline to access the Random Forest model within it. Above, we put the Random Forest model as the final stage in the `stages` list, so let's look at the `stages` attribute of the `bestModel`.

```python
cross_validated_model.bestModel.stages
```

```
[StringIndexer_47e4acf62f704a21366f,
 OneHotEncoderEstimator_43a7a11529ff9df60e6b,
 VectorAssembler_41f781ef51b853baa063,
 RandomForestRegressionModel (uid=RandomForestRegressor_4a2488f1d756e41813da) with 100 trees]
```

Perfect! There's the RandomForestRegressionModel, represented by the last item in the stages list. Now, we should be able to access all the attributes of the Random Forest Regressor.

```python
optimal_rf_model = cross_validated_model.bestModel.stages[3]
```

```python
optimal_rf_model.featuresCol
```

```
Param(parent='RandomForestRegressor_4a2488f1d756e41813da', name='featuresCol', doc='features column name')
```

```python
optimal_rf_model.featureImportances
```

```
SparseVector(22, {0: 0.099, 1: 0.0777, 2: 0.0028, 3: 0.0318, 4: 0.0, 5: 0.006, 6: 0.0, 7: 0.0006, 8: 0.0, 9: 0.0002, 10: 0.0005, 12: 0.0006, 14: 0.1209, 15: 0.0913, 16: 0.1111, 17: 0.0713, 18: 0.1247, 19: 0.1423, 20: 0.119, 21: 0.0001})
```

## Summary

Hopefully by now you have seen the power of pyspark and its pipelines. With a pipeline and a parameter grid, you can train and compare many models with just a few lines of code, saving you a substantial amount of time and effort. Up next, you will have a chance to build an ML pipeline of your own for a classification problem!