# Machine Learning with Spark

## Introduction

You've now explored how to perform operations on Spark RDDs for simple MapReduce tasks. Spark also supports far more advanced use cases, and many of them are found in the `ml` library, which we are going to explore in this lesson.


## Objectives

You will be able to: 

- Load and manipulate data using Spark DataFrames  
- Define estimators and transformers in Spark ML 
- Create a Spark ML pipeline that transforms data and runs over a grid of hyperparameters 



## A Tale of Two Libraries

If you look at the PySpark documentation, you'll notice that there are two different libraries for machine learning: [mllib](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html) and [ml](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html). They offer similar functionality, but `mllib` is built on the RDDs you just practiced using, whereas `ml` is built on the higher-level Spark DataFrame, which has methods and attributes similar to those of pandas. Since Spark 2.0, the RDD-based `mllib` API has been in maintenance mode, and the DataFrame-based `ml` API is the primary machine learning API going forward. It's important to note that both libraries are much younger than pandas and scikit-learn, and neither offers as many features.

## Spark DataFrames

In the previous lessons, you were introduced to SparkContext as the primary way to connect with a Spark application. Here, we will be using SparkSession, which is from the [sql](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html) component of PySpark. SparkSession plays the same role as SparkContext: it is a bridge between Python and the Spark application. It's just built on top of the Spark SQL API, which is higher level than the RDD API. In fact, every SparkSession wraps an underlying SparkContext. Let's go through the process of manipulating some data. For this example, we're going to use the [Forest Fire dataset](https://archive.ics.uci.edu/ml/datasets/Forest+Fires) from UCI, which relates the area burned by wildfires in the northeast region of Portugal to a number of spatial, temporal, and weather variables.

To begin with, let's import `SparkSession`, which we'll use to spin up our Spark application. 

In [1]:
# importing the necessary libraries
from pyspark.sql import SparkSession

To create a SparkSession, use the builder. `getOrCreate()` returns the currently active session if there is one, and the session's underlying SparkContext remains available as `spark.sparkContext`: 

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

Now, we'll load the data into a PySpark DataFrame: 

In [3]:
## reading in pyspark df
spark_df = spark.read.csv('./forestfires.csv', header=True, inferSchema=True)

## observing the datatype of df
type(spark_df)

pyspark.sql.dataframe.DataFrame

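You'll notice that some of the methods are similar or identical to those found in pandas, although their behavior can differ: called with no argument, `head()` returns a single `Row` rather than the first five rows.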

In [4]:
spark_df.head()

Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [5]:
spark_df.columns

['X',
 'Y',
 'month',
 'day',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'area']

Selecting multiple columns is similar as well: 

In [6]:
spark_df[['month','day','rain']]

DataFrame[month: string, day: string, rain: double]

Selecting a single column, however, works differently. `spark_df.select('rain')` returns a new one-column DataFrame, so all of the DataFrame methods remain available. Indexing with `spark_df['rain']`, as you would in pandas, instead returns a `Column` expression; this is what you use to build boolean masks and other column expressions. 

In [7]:
d = spark_df.select('rain')

In [8]:
d

DataFrame[rain: double]

In [24]:
d.show()

+----+
|rain|
+----+
| 0.0|
| 0.0|
| 0.0|
| 0.2|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
| 0.0|
+----+
only showing top 20 rows



In [9]:
spark_df['rain']

Column<b'rain'>

Let's take a look at the data types of all the columns in this DataFrame:

In [10]:
spark_df.dtypes

[('X', 'int'),
 ('Y', 'int'),
 ('month', 'string'),
 ('day', 'string'),
 ('FFMC', 'double'),
 ('DMC', 'double'),
 ('DC', 'double'),
 ('ISI', 'double'),
 ('temp', 'double'),
 ('RH', 'int'),
 ('wind', 'double'),
 ('rain', 'double'),
 ('area', 'double')]

## Aggregations with our DataFrame

Let's investigate whether the mean burned area differs from month to month: 

In [11]:
spark_df_months = spark_df.groupBy('month').agg({'area': 'mean'})
spark_df_months

DataFrame[month: string, avg(area): double]

In [15]:
spark_df_months.sort('avg(area)').show()

+-----+------------------+
|month|         avg(area)|
+-----+------------------+
|  jan|               0.0|
|  nov|               0.0|
|  mar| 4.356666666666667|
|  jun| 5.841176470588234|
|  feb|             6.275|
|  oct|             6.638|
|  apr| 8.891111111111112|
|  aug|12.489076086956521|
|  dec|             13.33|
|  jul|        14.3696875|
|  sep|17.942616279069753|
|  may|             19.24|
+-----+------------------+



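Notice that calling the aggregation method returns a new DataFrame that describes the result rather than the computed values. Remember, this is still Spark! `groupBy()` and `.agg()` are lazy transformations, and nothing is computed until an action, such as the `.show()` call above or `.collect()`, is called. Keeping transformations and actions separate lets Spark plan the whole computation before running it, which matters for large quantities of data. `.collect()` runs the computation and returns every row to the driver as a list of `Row` objects, so only use it on small results: 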

In [16]:
spark_df_months.collect()

[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]
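To make the transformation/action split concrete, here is a toy, plain-Python model of lazy evaluation. `LazyFrame` is a hypothetical class written for illustration, not part of PySpark, and it only mimics the idea: transformations are recorded in a plan, and the work happens only when an action (`collect()`) runs.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not PySpark)."""

    def __init__(self, rows, plan=()):
        self._rows = rows   # source data
        self._plan = plan   # recorded transformations, not yet executed

    def filter(self, predicate):
        # Transformation: returns a new LazyFrame with one more recorded step
        return LazyFrame(self._rows, self._plan + (('filter', predicate),))

    def map(self, func):
        # Transformation: also only recorded
        return LazyFrame(self._rows, self._plan + (('map', func),))

    def collect(self):
        # Action: the recorded plan is executed only now
        result = list(self._rows)
        for kind, func in self._plan:
            if kind == 'filter':
                result = [r for r in result if func(r)]
            else:
                result = [func(r) for r in result]
        return result


calls = []

def double(x):
    calls.append(x)  # records every time the function actually runs
    return 2 * x

lf = LazyFrame([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).map(double)
print(calls)         # [] (nothing has run yet)
print(lf.collect())  # [4, 8]
print(calls)         # [2, 4]
```

Spark's real planner does much more than this (it optimizes the whole plan before executing it), but the order of events is the same: building the query is cheap, and the action triggers the work.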

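The largest mean burned areas occur in May, September, July, December, and August, while January, November, March, and June are the lowest, so the pattern only loosely follows the summer months in Portugal. These means also hide how many records each month contains, and a month with very few records can produce an unstable average. On your own, practice more aggregations and manipulations that you might be able to perform on this dataset. 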

In [19]:
spark_df_rain = spark_df.groupBy('day').agg({'rain': 'mean'})

In [20]:
spark_df_rain.show()

+---+--------------------+
|day|           avg(rain)|
+---+--------------------+
|fri|0.018823529411764704|
|thu|                 0.0|
|tue|             0.13125|
|sat|                 0.0|
|wed|0.003703703703703704|
|mon|                 0.0|
|sun|0.010526315789473684|
+---+--------------------+
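As a plain-Python sketch of what `groupBy('month').agg({'area': 'mean'})` computes, the function below (a hypothetical helper, not a Spark API) averages a value per group and also reports each group's size, which the mean alone hides. The records are made-up toy values, not rows from the forest fire dataset.

```python
from collections import defaultdict

def group_mean(rows, key, value):
    """Per-group (mean, count), like groupBy(key).agg({value: 'mean'}) plus a count."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        sums[row[key]] += row[value]
        counts[row[key]] += 1
    return {k: (sums[k] / counts[k], counts[k]) for k in sums}

# Made-up toy records
rows = [
    {'month': 'aug', 'area': 0.0},
    {'month': 'aug', 'area': 10.0},
    {'month': 'may', 'area': 38.0},
]
print(group_mean(rows, 'month', 'area'))
# {'aug': (5.0, 2), 'may': (38.0, 1)}
```

In PySpark itself you can get both numbers in one pass with column expressions, for example `spark_df.groupBy('month').agg(F.mean('area'), F.count('area'))` after `from pyspark.sql import functions as F`.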



## Boolean Masking 

Boolean masking also works with PySpark DataFrames much as it does with pandas DataFrames; the difference is that the mask is passed to the `.filter()` method. To try this out, let's compare the burned area of the records with no rain at all to that of the records with some rain.

In [21]:
no_rain = spark_df.filter(spark_df['rain'] == 0.0)
some_rain = spark_df.filter(spark_df['rain'] > 0.0)

In [22]:
no_rain.show(5)

+---+---+-----+---+----+----+-----+----+----+---+----+----+----+
|  X|  Y|month|day|FFMC| DMC|   DC| ISI|temp| RH|wind|rain|area|
+---+---+-----+---+----+----+-----+----+----+---+----+----+----+
|  7|  5|  mar|fri|86.2|26.2| 94.3| 5.1| 8.2| 51| 6.7| 0.0| 0.0|
|  7|  4|  oct|tue|90.6|35.4|669.1| 6.7|18.0| 33| 0.9| 0.0| 0.0|
|  7|  4|  oct|sat|90.6|43.7|686.9| 6.7|14.6| 33| 1.3| 0.0| 0.0|
|  8|  6|  mar|sun|89.3|51.3|102.2| 9.6|11.4| 99| 1.8| 0.0| 0.0|
|  8|  6|  aug|sun|92.3|85.3|488.0|14.7|22.2| 29| 5.4| 0.0| 0.0|
+---+---+-----+---+----+----+-----+----+----+---+----+----+----+
only showing top 5 rows



In [23]:
some_rain.show()

+---+---+-----+---+----+-----+-----+----+----+---+----+----+-----+
|  X|  Y|month|day|FFMC|  DMC|   DC| ISI|temp| RH|wind|rain| area|
+---+---+-----+---+----+-----+-----+----+----+---+----+----+-----+
|  8|  6|  mar|fri|91.7| 33.3| 77.5| 9.0| 8.3| 97| 4.0| 0.2|  0.0|
|  7|  4|  aug|sun|91.8|175.1|700.7|13.8|21.9| 73| 7.6| 1.0|  0.0|
|  7|  6|  jul|wed|91.2|183.1|437.7|12.5|12.6| 90| 7.6| 0.2|  0.0|
|  7|  5|  aug|tue|96.1|181.1|671.2|14.3|27.3| 63| 4.9| 6.4|10.82|
|  8|  6|  aug|tue|96.1|181.1|671.2|14.3|21.6| 65| 4.9| 0.8|  0.0|
|  7|  5|  aug|tue|96.1|181.1|671.2|14.3|21.6| 65| 4.9| 0.8|  0.0|
|  4|  4|  aug|tue|96.1|181.1|671.2|14.3|20.7| 69| 4.9| 0.4|  0.0|
|  5|  4|  aug|fri|91.0|166.9|752.6| 7.1|21.1| 71| 7.6| 1.4| 2.17|
+---+---+-----+---+----+-----+-----+----+----+---+----+----+-----+



Now, to compute the mean of a column, we'll import aggregate functions from `pyspark.sql.functions`. As always, to read more about them, check out the [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions).

In [25]:
from pyspark.sql.functions import mean

# .show() prints the table itself and returns None, so print each label on its own line first
print('no rain fire area:')
no_rain.select(mean('area')).show()

print('some rain fire area:')
some_rain.select(mean('area')).show()

no rain fire area:
+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+

some rain fire area:
+---------+
|avg(area)|
+---------+
|  1.62375|
+---------+




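There does seem to be a difference: the mean burned area is about 13.0 for records with no rain and about 1.6 for records with some rain. Only eight records had any rain at all, though, so this comparison is suggestive rather than conclusive.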

Let's obtain data from only the summer months in Portugal (June, July, and August). We can also do the same for the winter months in Portugal (December, January, February).

In [26]:
summer_months = spark_df.filter(spark_df['month'].isin(['jun','jul','aug']))
winter_months = spark_df.filter(spark_df['month'].isin(['dec','jan','feb']))

# .show() prints the table and returns None, so print each label on its own line first
print('summer months fire area:')
summer_months.select(mean('area')).show()
print('winter months fire area:')
winter_months.select(mean('area')).show()

summer months fire area:
+------------------+
|         avg(area)|
+------------------+
|12.262317596566525|
+------------------+

winter months fire area:
+-----------------+
|        avg(area)|
+-----------------+
|7.918387096774193|
+-----------------+


## Machine Learning

Now that we've performed some data manipulation and aggregation, let's get to the really cool stuff: machine learning! Spark's documentation credits scikit-learn as the main inspiration for its ML pipeline design. As a result, many of the methods and functionalities look similar, but there are some crucial distinctions. There are three main concepts found within the `ml` library:

`Transformer`: An algorithm that transforms one PySpark DataFrame into another DataFrame, typically by appending one or more columns. 

`Estimator`: An algorithm that can be fit on a PySpark DataFrame; fitting it produces a model, which is itself a Transformer. 

`Pipeline`: Much like an `sklearn` pipeline, it chains together a sequence of Transformers and Estimators, called stages.

This separation of fitting and transforming mirrors scikit-learn, with a Spark-specific twist. Calling `.fit()` on an Estimator runs Spark jobs right away to learn from the data and returns a new, fitted model object. Calling `.transform()` on that model is lazy, like any other DataFrame transformation: it returns a new DataFrame describing the added columns, and nothing is computed until an action such as `.show()` or `.collect()` runs. Let's examine what this actually looks like by performing a regression on the Forest Fire dataset. To start off with, we'll import the necessary libraries for our tasks.

In [27]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator

Looking at our data, you can see that every column is numerical except for `day` and `month`. We saw that the mean burned area varies by month, so we will include month in our model. We'll assume that the day of the week has little to no effect on fire, so we will drop it from the DataFrame.

In [28]:
fire_df = spark_df.drop('day')
fire_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [30]:
fire_df.show(5)

+---+---+-----+----+----+-----+---+----+---+----+----+----+
|  X|  Y|month|FFMC| DMC|   DC|ISI|temp| RH|wind|rain|area|
+---+---+-----+----+----+-----+---+----+---+----+----+----+
|  7|  5|  mar|86.2|26.2| 94.3|5.1| 8.2| 51| 6.7| 0.0| 0.0|
|  7|  4|  oct|90.6|35.4|669.1|6.7|18.0| 33| 0.9| 0.0| 0.0|
|  7|  4|  oct|90.6|43.7|686.9|6.7|14.6| 33| 1.3| 0.0| 0.0|
|  8|  6|  mar|91.7|33.3| 77.5|9.0| 8.3| 97| 4.0| 0.2| 0.0|
|  8|  6|  mar|89.3|51.3|102.2|9.6|11.4| 99| 1.8| 0.0| 0.0|
+---+---+-----+----+----+-----+---+----+---+----+----+----+
only showing top 5 rows



In order to run our model, we need to turn the month variable into dummy variables. In `ml` this is a 2-step process. First, `StringIndexer` maps each category to a numerical index. Only once the variable is numeric can PySpark create a dummy variable for each category with `OneHotEncoderEstimator` (its name in Spark 2.3 and 2.4; in Spark 3.0 and later the same estimator is called `OneHotEncoder`). The key parameters for these `ml` estimators are `inputCol` (the column you want to change) and `outputCol` (the name of the new column that stores the result); `OneHotEncoderEstimator` takes lists instead, through `inputCols` and `outputCols`. Here it is in action: 

In [31]:
si = StringIndexer(inputCol='month', outputCol='month_num')
model = si.fit(fire_df)
new_df = model.transform(fire_df)

In [32]:
new_df.show(10)

+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+
|  X|  Y|month|FFMC|  DMC|   DC| ISI|temp| RH|wind|rain|area|month_num|
+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+
|  7|  5|  mar|86.2| 26.2| 94.3| 5.1| 8.2| 51| 6.7| 0.0| 0.0|      2.0|
|  7|  4|  oct|90.6| 35.4|669.1| 6.7|18.0| 33| 0.9| 0.0| 0.0|      6.0|
|  7|  4|  oct|90.6| 43.7|686.9| 6.7|14.6| 33| 1.3| 0.0| 0.0|      6.0|
|  8|  6|  mar|91.7| 33.3| 77.5| 9.0| 8.3| 97| 4.0| 0.2| 0.0|      2.0|
|  8|  6|  mar|89.3| 51.3|102.2| 9.6|11.4| 99| 1.8| 0.0| 0.0|      2.0|
|  8|  6|  aug|92.3| 85.3|488.0|14.7|22.2| 29| 5.4| 0.0| 0.0|      0.0|
|  8|  6|  aug|92.3| 88.9|495.6| 8.5|24.1| 27| 3.1| 0.0| 0.0|      0.0|
|  8|  6|  aug|91.5|145.4|608.2|10.7| 8.0| 86| 2.2| 0.0| 0.0|      0.0|
|  8|  6|  sep|91.0|129.5|692.6| 7.0|13.1| 63| 5.4| 0.0| 0.0|      1.0|
|  7|  5|  sep|92.5| 88.0|698.6| 7.1|22.8| 40| 4.0| 0.0| 0.0|      1.0|
+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+
only showing top 10 rows

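Note the small but critical difference between how `sklearn` and PySpark handle fitting. In `sklearn`, `.fit()` stores the learned state on the estimator itself and returns that same object, which you then use to transform. In PySpark, `.fit()` leaves the estimator unchanged and returns a new, separate model object of a different class, and that model is the Transformer.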

In [33]:
## this is an estimator (not yet fit to any data)
type(si)

pyspark.ml.feature.StringIndexer

In [35]:
## this is a transformer (the fitted model returned by si.fit())
type(model)

pyspark.ml.feature.StringIndexerModel
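The two fitting conventions can be contrasted with a pair of toy classes. These are hypothetical plain-Python stand-ins, not the real scikit-learn or PySpark classes, and for simplicity they index labels alphabetically rather than by frequency.

```python
class SklearnStyleIndexer:
    def fit(self, values):
        # scikit-learn convention: learned state is stored on self, and self is returned
        self.labels_ = sorted(set(values))
        return self

    def transform(self, values):
        return [self.labels_.index(v) for v in values]


class SparkStyleIndexerModel:
    def __init__(self, labels):
        self.labels = labels  # learned state lives on the separate model object

    def transform(self, values):
        return [self.labels.index(v) for v in values]


class SparkStyleIndexer:
    def fit(self, values):
        # Spark ML convention: the estimator is left untouched; a new model is returned
        return SparkStyleIndexerModel(sorted(set(values)))


sk = SklearnStyleIndexer()
print(sk.fit(['b', 'a']) is sk)               # True

est = SparkStyleIndexer()
fitted = est.fit(['b', 'a'])
print(fitted is est, type(fitted).__name__)   # False SparkStyleIndexerModel
print(fitted.transform(['a', 'b', 'a']))      # [0, 1, 0]
```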

In [40]:
model.labels

['aug',
 'sep',
 'mar',
 'jul',
 'feb',
 'jun',
 'oct',
 'apr',
 'dec',
 'jan',
 'may',
 'nov']
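The order of `model.labels` above comes from `StringIndexer`'s default `stringOrderType='frequencyDesc'`, under which the most frequent label gets index 0.0. A plain-Python sketch of that rule follows; the helper names are hypothetical, the toy months are made up, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def frequency_desc_labels(values):
    """Most frequent label first; ties broken alphabetically (assumed here)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

def index_values(values, labels):
    lookup = {label: float(i) for i, label in enumerate(labels)}  # indices as doubles
    return [lookup[v] for v in values]

toy_months = ['mar', 'aug', 'aug', 'sep', 'aug', 'sep', 'jan', 'dec']  # made-up sample
labels = frequency_desc_labels(toy_months)
print(labels)                                # ['aug', 'sep', 'dec', 'jan', 'mar']
print(index_values(['mar', 'aug'], labels))  # [4.0, 0.0]
```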

In [37]:
new_df.head(4)

[Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0),
 Row(X=8, Y=6, month='mar', FFMC=91.7, DMC=33.3, DC=77.5, ISI=9.0, temp=8.3, RH=97, wind=4.0, rain=0.2, area=0.0, month_num=2.0)]

As you can see, we have created a new column called `'month_num'` that represents each month by a number. By default, `StringIndexer` orders labels by descending frequency, so `'aug'`, the most common month, gets index 0.0. Now that we have performed this step, we can use `OneHotEncoderEstimator()`, Spark's counterpart to scikit-learn's `OneHotEncoder()`. First, let's make sure all twelve months are represented:

In [41]:
new_df.select('month_num').distinct().collect()

[Row(month_num=8.0),
 Row(month_num=0.0),
 Row(month_num=7.0),
 Row(month_num=1.0),
 Row(month_num=4.0),
 Row(month_num=11.0),
 Row(month_num=3.0),
 Row(month_num=2.0),
 Row(month_num=10.0),
 Row(month_num=6.0),
 Row(month_num=5.0),
 Row(month_num=9.0)]

In [42]:
## fitting and transforming the OneHotEncoderEstimator
ohe = feature.OneHotEncoderEstimator(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
one_hot_encoded = ohe.fit(new_df).transform(new_df)
one_hot_encoded.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}))

In [44]:
one_hot_encoded.show(8)

+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+--------------+
|  X|  Y|month|FFMC|  DMC|   DC| ISI|temp| RH|wind|rain|area|month_num|     month_vec|
+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+--------------+
|  7|  5|  mar|86.2| 26.2| 94.3| 5.1| 8.2| 51| 6.7| 0.0| 0.0|      2.0|(11,[2],[1.0])|
|  7|  4|  oct|90.6| 35.4|669.1| 6.7|18.0| 33| 0.9| 0.0| 0.0|      6.0|(11,[6],[1.0])|
|  7|  4|  oct|90.6| 43.7|686.9| 6.7|14.6| 33| 1.3| 0.0| 0.0|      6.0|(11,[6],[1.0])|
|  8|  6|  mar|91.7| 33.3| 77.5| 9.0| 8.3| 97| 4.0| 0.2| 0.0|      2.0|(11,[2],[1.0])|
|  8|  6|  mar|89.3| 51.3|102.2| 9.6|11.4| 99| 1.8| 0.0| 0.0|      2.0|(11,[2],[1.0])|
|  8|  6|  aug|92.3| 85.3|488.0|14.7|22.2| 29| 5.4| 0.0| 0.0|      0.0|(11,[0],[1.0])|
|  8|  6|  aug|92.3| 88.9|495.6| 8.5|24.1| 27| 3.1| 0.0| 0.0|      0.0|(11,[0],[1.0])|
|  8|  6|  aug|91.5|145.4|608.2|10.7| 8.0| 86| 2.2| 0.0| 0.0|      0.0|(11,[0],[1.0])|
+---+---+-----+----+-----+-----+----+----+---+----+----+----+---------+--------------+
only showing top 8 rows

Great, we now have a one-hot encoded sparse vector in the `'month_vec'` column! Because Spark is optimized for big data, it stores the dummy variables as a sparse vector rather than as many new columns, which is more space efficient. You can see this in the first row of the DataFrame:  

`month_vec=SparseVector(11, {2: 1.0})` indicates a sparse vector of size 11 with a 1.0 at index 2. There are 12 months but only 11 slots because `dropLast=True` in `OneHotEncoderEstimator()` drops the last category (`'nov'`, index 11), which is then represented by an all-zeros vector. Index 2 corresponds to the third label, `'mar'`, in the labels of the `model` StringIndexerModel.  
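The `dropLast` behavior can be sketched in plain Python. `one_hot_sparse` is a hypothetical helper, not a Spark function; it returns a `(size, {position: value})` pair that mimics how `SparseVector(11, {2: 1.0})` is printed.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, {position: 1.0}) for a category index, like a printed SparseVector."""
    index = int(index)  # StringIndexer emits doubles such as 2.0
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return size, {index: 1.0}
    return size, {}  # the dropped last category becomes the all-zeros vector

print(one_hot_sparse(2.0, 12))                   # (11, {2: 1.0})  'mar'
print(one_hot_sparse(11.0, 12))                  # (11, {})        'nov', dropped
print(one_hot_sparse(2.0, 12, drop_last=False))  # (12, {2: 1.0})
```

Dropping the last category keeps the dummy columns from being perfectly collinear, which matters for linear models; tree-based models such as random forests are less sensitive to it.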

The final requirement for all machine learning models in PySpark is to combine all of your model's features into a single vector column. This is once again for efficiency's sake. Here, we do that with `VectorAssembler()`, which is a Transformer rather than an Estimator: it has nothing to learn, so we call `.transform()` on it directly without fitting.

In [45]:
features = ['X',
 'Y',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'month_vec']

target = 'area'

vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

In [46]:
vectorized_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}), features=SparseVector(21, {0: 7.0, 1: 5.0, 2: 86.2, 3: 26.2, 4: 94.3, 5: 5.1, 6: 8.2, 7: 51.0, 8: 6.7, 12: 1.0}))
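The `features` vector above is the ten numeric columns followed by the 11 slots of `month_vec`, so `'mar'` (index 2 in `month_vec`) lands at position 10 + 2 = 12, and the zero-valued `rain` (position 9) is not stored. Here is a plain-Python sketch of that concatenation using the first row's values. `assemble` is a hypothetical helper; unlike this sketch, Spark's `VectorAssembler` may store the result densely when that takes less space.

```python
def assemble(numeric_values, sparse_part):
    """Concatenate numeric columns and one sparse (size, {pos: value}) vector."""
    sub_size, sub_entries = sparse_part
    offset = len(numeric_values)  # the sparse block starts after the numeric columns
    entries = {i: float(v) for i, v in enumerate(numeric_values) if v != 0}
    for pos, value in sub_entries.items():
        entries[offset + pos] = value
    return offset + sub_size, entries

# First row above: X, Y, FFMC, DMC, DC, ISI, temp, RH, wind, rain
first_row = [7, 5, 86.2, 26.2, 94.3, 5.1, 8.2, 51, 6.7, 0.0]
size, entries = assemble(first_row, (11, {2: 1.0}))
print(size)          # 21
print(entries[12])   # 1.0, the 'mar' slot at 10 + 2
print(9 in entries)  # False, rain = 0.0 is not stored
```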

In [47]:
vectorized_df.show(5)

+---+---+-----+----+----+-----+---+----+---+----+----+----+---------+--------------+--------------------+
|  X|  Y|month|FFMC| DMC|   DC|ISI|temp| RH|wind|rain|area|month_num|     month_vec|            features|
+---+---+-----+----+----+-----+---+----+---+----+----+----+---------+--------------+--------------------+
|  7|  5|  mar|86.2|26.2| 94.3|5.1| 8.2| 51| 6.7| 0.0| 0.0|      2.0|(11,[2],[1.0])|(21,[0,1,2,3,4,5,...|
|  7|  4|  oct|90.6|35.4|669.1|6.7|18.0| 33| 0.9| 0.0| 0.0|      6.0|(11,[6],[1.0])|(21,[0,1,2,3,4,5,...|
|  7|  4|  oct|90.6|43.7|686.9|6.7|14.6| 33| 1.3| 0.0| 0.0|      6.0|(11,[6],[1.0])|(21,[0,1,2,3,4,5,...|
|  8|  6|  mar|91.7|33.3| 77.5|9.0| 8.3| 97| 4.0| 0.2| 0.0|      2.0|(11,[2],[1.0])|(21,[0,1,2,3,4,5,...|
|  8|  6|  mar|89.3|51.3|102.2|9.6|11.4| 99| 1.8| 0.0| 0.0|      2.0|(11,[2],[1.0])|(21,[0,1,2,3,4,5,...|
+---+---+-----+----+----+-----+---+----+---+----+----+----+---------+--------------+--------------------+
only showing top 5 rows



In [52]:
vectorized_df.select('features').show(1)

+--------------------+
|            features|
+--------------------+
|(21,[0,1,2,3,4,5,...|
+--------------------+
only showing top 1 row



Great! Our data is now in the format the model needs. It's time for us to actually fit a model to our data: let's fit a Random Forest Regression model. The other columns still in the DataFrame don't matter to the machine learning model API; all that needs to be specified are the names of the features column and the label column. 

In [53]:
## instantiating and fitting the model
rf_model = RandomForestRegressor(featuresCol='features', 
                                 labelCol='area', predictionCol='prediction').fit(vectorized_df)

In [54]:
rf_model.featureImportances

SparseVector(21, {0: 0.1078, 1: 0.0738, 2: 0.1268, 3: 0.1302, 4: 0.0667, 5: 0.0809, 6: 0.1188, 7: 0.078, 8: 0.1735, 10: 0.0019, 11: 0.0345, 13: 0.0051, 14: 0.0, 15: 0.0, 16: 0.0001, 17: 0.0004, 20: 0.0013})
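The importances above are indexed by position in the assembled `features` vector. A plain-Python sketch that maps them back to column names follows. The names for positions 10 through 20 assume the `month_vec` slots follow `model.labels` minus the dropped last label (`'nov'`), and the `month_` prefix is just a naming choice for this sketch. In a live session you could read the pairs from `rf_model.featureImportances.indices` and `.values` instead of copying them by hand as done here.

```python
numeric = ['X', 'Y', 'FFMC', 'DMC', 'DC', 'ISI', 'temp', 'RH', 'wind', 'rain']
# model.labels without the dropped last label ('nov')
month_labels = ['aug', 'sep', 'mar', 'jul', 'feb', 'jun', 'oct', 'apr', 'dec', 'jan', 'may']
names = numeric + ['month_' + m for m in month_labels]

# Non-zero importances copied from the SparseVector printed above
importances = {0: 0.1078, 1: 0.0738, 2: 0.1268, 3: 0.1302, 4: 0.0667, 5: 0.0809,
               6: 0.1188, 7: 0.078, 8: 0.1735, 10: 0.0019, 11: 0.0345, 13: 0.0051,
               14: 0.0, 15: 0.0, 16: 0.0001, 17: 0.0004, 20: 0.0013}

# Rank features from most to least important
ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)
for idx, score in ranked[:3]:
    print(names[idx], score)
# wind 0.1735
# DMC 0.1302
# FFMC 0.1268
```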

In [55]:
## generating predictions
predictions = rf_model.transform(vectorized_df).select('area', 'prediction')
predictions.head(10)

[Row(area=0.0, prediction=7.034477831585538),
 Row(area=0.0, prediction=5.741407869028901),
 Row(area=0.0, prediction=5.899075598696159),
 Row(area=0.0, prediction=6.410605534328506),
 Row(area=0.0, prediction=5.987347138750447),
 Row(area=0.0, prediction=6.147514975840005),
 Row(area=0.0, prediction=4.846220517895806),
 Row(area=0.0, prediction=11.515385720211498),
 Row(area=0.0, prediction=10.57529604485081),
 Row(area=0.0, prediction=6.065056356539797)]

In [56]:
predictions.show(10)

+----+------------------+
|area|        prediction|
+----+------------------+
| 0.0| 7.034477831585538|
| 0.0| 5.741407869028901|
| 0.0| 5.899075598696159|
| 0.0| 6.410605534328506|
| 0.0| 5.987347138750447|
| 0.0| 6.147514975840005|
| 0.0| 4.846220517895806|
| 0.0|11.515385720211498|
| 0.0| 10.57529604485081|
| 0.0| 6.065056356539797|
+----+------------------+
only showing top 10 rows



Now we can evaluate how well the model performed using `RegressionEvaluator`.

In [57]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area')

In [58]:
## evaluating r^2
evaluator.evaluate(predictions,{evaluator.metricName: 'r2'})

0.7071823469855036

In [59]:
## evaluating mean absolute error
evaluator.evaluate(predictions,{evaluator.metricName: 'mae'})

13.099804205576223
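For reference, the two metrics used above have standard definitions: MAE is the mean of |y - y_hat|, and R² is 1 - SS_res / SS_tot. Below is a plain-Python sketch on made-up numbers; besides `'r2'` and `'mae'`, `RegressionEvaluator` also supports `'rmse'` (its default) and `'mse'`.

```python
def mae(y_true, y_pred):
    # Mean absolute error: the average of |y - y_hat|
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def r2(y_true, y_pred):
    # Coefficient of determination: 1 - SS_res / SS_tot
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot

y_true = [0.0, 2.0, 4.0]  # made-up values
y_pred = [1.0, 2.0, 3.0]
print(mae(y_true, y_pred))  # 0.6666666666666666
print(r2(y_true, y_pred))   # 0.75
```

MAE is in the same units as `area` (hectares in this dataset) and is smaller-is-better, while R² is unitless and larger-is-better.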

## Putting it all in a Pipeline

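We just performed a whole lot of transformations on our data. Let's take a look at all the stages we used to create this model:

* `StringIndexer()` (estimator)
* `OneHotEncoderEstimator()` (estimator)
* `VectorAssembler()` (transformer)
* `RandomForestRegressor()` (estimator)

Once we've fit our model in the Pipeline, we'll want to evaluate it to determine how well it performs. We can do this with:

* `RegressionEvaluator()` 

We can streamline all of these steps by chaining them together in a pipeline, which is more convenient and less error-prone than applying each stage by hand. The `Pipeline` object takes the stages, in order, as a list passed to its `stages` parameter. Notice also `handleInvalid='keep'` in the `StringIndexer` below: it reserves an extra index for labels that were not seen during fitting, which can happen during cross-validation when a rare month falls entirely in a validation fold. With the default (`'error'`), such a label would raise an error. As a side effect, the one-hot vector gains one slot, so the assembled feature vector has 22 entries instead of 21.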

In [60]:
# importing relevant libraries
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml import Pipeline

In [61]:
## instantiating all necessary estimator objects

string_indexer = StringIndexer(inputCol='month', outputCol='month_num', handleInvalid='keep')
one_hot_encoder = OneHotEncoderEstimator(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
vector_assembler = VectorAssembler(inputCols=features, outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features', labelCol='area')
stages = [string_indexer, one_hot_encoder, vector_assembler, random_forest]

# instantiating the pipeline with all of the stages
pipeline = Pipeline(stages=stages)
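Conceptually, `Pipeline.fit()` walks the stages in order: each Estimator is fit on the data as it looks at that point, each Transformer is applied as-is, and the fitted stages are collected into a `PipelineModel`. The toy version below uses hypothetical plain-Python classes on a list of dicts to show that flow; it is a sketch, not PySpark's implementation (Spark, for example, does not transform the data after the last Estimator during fitting).

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    def fit(self, rows):
        raise NotImplementedError

class AddColumn(Transformer):
    """Appends a column computed row by row (like a Spark feature transformer)."""
    def __init__(self, name, func):
        self.name, self.func = name, func

    def transform(self, rows):
        return [{**row, self.name: self.func(row)} for row in rows]

class MeanCenterer(Estimator):
    """Learns a column's mean in fit() and returns a Transformer that subtracts it."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        col = self.col
        mean = sum(row[col] for row in rows) / len(rows)
        return AddColumn(col + '_centered', lambda row: row[col] - mean)

def fit_pipeline(stages, rows):
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if isinstance(stage, Estimator) else stage
        rows = model.transform(rows)  # later stages see the transformed data
        fitted.append(model)
    return fitted  # analogue of a PipelineModel's .stages

def transform_pipeline(fitted, rows):
    for model in fitted:
        rows = model.transform(rows)
    return rows

toy_stages = [AddColumn('temp_f', lambda row: row['temp'] * 9 / 5 + 32),
              MeanCenterer('temp_f')]
fitted = fit_pipeline(toy_stages, [{'temp': 10.0}, {'temp': 20.0}])
print(transform_pipeline(fitted, [{'temp': 15.0}]))
# [{'temp': 15.0, 'temp_f': 59.0, 'temp_f_centered': 0.0}]
```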

### Cross-validation 

You may not have noticed that we skipped a critical step in the random forest regression above: we did not cross-validate or perform a train/test split! Now we're going to fix that by performing cross-validation while also trying out multiple combinations of hyperparameters, PySpark's equivalent of scikit-learn's `GridSearchCV`. To begin with, we will create a parameter grid containing the values we want to try for each hyperparameter.

In [64]:
# creating parameter grid

params = (ParamGridBuilder()
          .addGrid(random_forest.maxDepth, [5, 10, 15])
          .addGrid(random_forest.numTrees, [20, 50, 100])
          .build())
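`ParamGridBuilder().build()` returns one parameter map per combination of the listed values, that is, the Cartesian product of the grids. Below is a plain-Python sketch; `build_grid` is a hypothetical helper, and it uses parameter names as plain strings instead of `Param` objects. With `CrossValidator`'s default of three folds, a grid of this size means 9 × 3 = 27 pipeline fits, plus one final refit of the best combination.

```python
from itertools import product

def build_grid(grid):
    """Every combination of the listed values, one dict per combination."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

param_maps = build_grid({'maxDepth': [5, 10, 15], 'numTrees': [20, 50, 100]})
print(len(param_maps))  # 9
print(param_maps[0])    # {'maxDepth': 5, 'numTrees': 20}
print(param_maps[-1])   # {'maxDepth': 15, 'numTrees': 100}
```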

Let's take a look at the params variable we just built.

In [65]:
print('total combinations of parameters: ', len(params))

params[0]

total combinations of parameters:  9


{Param(parent='RandomForestRegressor_8028ab5af607', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
 Param(parent='RandomForestRegressor_8028ab5af607', name='numTrees', doc='Number of trees to train (>= 1).'): 20}

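Now it's time to combine all of these pieces with the `CrossValidator()` estimator. For each parameter combination, it fits the whole pipeline on k-1 folds of the data and scores it on the held-out fold with the evaluator (k is set by `numFolds`, which defaults to 3), then averages the scores. Finally, it refits the best combination on the full dataset. `parallelism=4` lets up to four models be trained at the same time.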

In [66]:
## instantiating the evaluator by which we will measure our model's performance
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area', metricName = 'mae')

## instantiating crossvalidator estimator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=reg_evaluator, parallelism=4)

In [67]:
## fitting crossvalidator
cross_validated_model = cv.fit(fire_df)
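The selection logic `CrossValidator` applies can be sketched in plain Python. Everything here is hypothetical: a toy "model" that predicts the training mean plus a tunable shift, made-up data, and helper names of our own. Rows are shuffled into k folds (Spark also assigns rows to folds at random, though its folds are only roughly equal in size), each candidate is scored on every held-out fold, the scores are averaged, and the lowest average wins because MAE is a smaller-is-better metric (Spark reads this from the evaluator's `isLargerBetter()`).

```python
import random

def k_fold_indices(n, k, seed=0):
    """Shuffle row indices 0..n-1 and deal them into k folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]

def mae(y_true, y_pred):
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def fit_mean_plus_shift(train_x, train_y, params):
    # Toy "model": always predicts the training mean plus a tunable shift
    m = sum(train_y) / len(train_y)
    return lambda x: m + params['shift']

def cross_validate(xs, ys, candidates, fit, metric, k=3, larger_is_better=False):
    """Return (best candidate, average metric per candidate)."""
    folds = k_fold_indices(len(xs), k)
    avg_metrics = []
    for params in candidates:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [i for i in range(len(xs)) if i not in held]
            predict = fit([xs[i] for i in train], [ys[i] for i in train], params)
            preds = [predict(xs[i]) for i in held_out]
            scores.append(metric([ys[i] for i in held_out], preds))
        avg_metrics.append(sum(scores) / len(scores))
    choose = max if larger_is_better else min
    best = choose(range(len(candidates)), key=lambda i: avg_metrics[i])
    return candidates[best], avg_metrics

xs = list(range(9))
ys = [5.0] * 9
best, avg = cross_validate(xs, ys, [{'shift': 0.0}, {'shift': 3.0}], fit_mean_plus_shift, mae)
print(best, avg)  # {'shift': 0.0} [0.0, 3.0]

# Like CrossValidator, refit the winning parameters on all of the data
final_model = fit_mean_plus_shift(xs, ys, best)
```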

In [68]:
cross_validated_model

CrossValidatorModel_ab2e303bebc6

Now, let's see how well the model performed! `avgMetrics` holds the average cross-validated MAE for each of our 9 parameter combinations, and the best of them is an MAE of about 21.5. This is worse than the 13.1 MAE of our original model, but only because the original model was scored on the same data it was trained on, which made it look better than it really is. We didn't do a train/test split!

In [72]:
cross_validated_model.avgMetrics

[21.546567885598343,
 21.700783994333744,
 21.71813240757526,
 22.32852941832796,
 22.460430571200067,
 22.71186702248246,
 22.786022559039647,
 22.84838924554876,
 22.97834940511436]

`cross_validated_model` keeps the best model found during the grid search (refit on the full dataset), and calling `.transform()` on it uses that best model. Let's look at its predictions, keeping in mind that they are made on data the model was trained on. As you can see, a large number of records have a burned area of 0.0. Perhaps it would be better to treat this problem as a classification task.

In [73]:
predictions = cross_validated_model.transform(spark_df)
predictions.select('prediction', 'area').show(30)

+------------------+----+
|        prediction|area|
+------------------+----+
|  6.21638032027937| 0.0|
| 5.689308392052814| 0.0|
| 6.287278576093263| 0.0|
|7.2303391348948916| 0.0|
| 4.893834494984671| 0.0|
|7.2776916492575365| 0.0|
| 6.405596897636968| 0.0|
| 7.578126198825025| 0.0|
| 7.938687942969916| 0.0|
| 6.299323466928851| 0.0|
|  7.16811037397764| 0.0|
| 6.406995913205646| 0.0|
|7.5923550770931625| 0.0|
| 9.224165482931967| 0.0|
| 48.82703158890588| 0.0|
| 8.184943906863463| 0.0|
|  4.81675815285105| 0.0|
| 7.417314140998465| 0.0|
|  5.01449999168616| 0.0|
| 4.894479532568757| 0.0|
| 9.801897997313683| 0.0|
| 4.991812691841854| 0.0|
| 5.784231376331861| 0.0|
| 7.793788787286428| 0.0|
| 7.492145101337208| 0.0|
| 6.476184075585179| 0.0|
| 8.322843023650924| 0.0|
| 9.203488531085192| 0.0|
| 33.63931723470933| 0.0|
|10.148866640975502| 0.0|
+------------------+----+
only showing top 30 rows

Now let's go ahead and take a look at the feature importances of our random forest model. In order to do this, we need to unroll our pipeline to access the random forest model. Let's start by first checking out the `.bestModel` attribute of our `cross_validated_model`. 

In [74]:
type(cross_validated_model.bestModel)

pyspark.ml.pipeline.PipelineModel

`ml` is treating the entire pipeline as the best performing model, so we need to go deeper into the pipeline to access the random forest model within it. Previously, we put the random forest model as the final "stage" in the stages variable list. Let's look at the `.stages` attribute of the `.bestModel`.

In [75]:
cross_validated_model.bestModel.stages

[StringIndexer_51c3e33022eb,
 OneHotEncoderEstimator_89d82973e660,
 VectorAssembler_e733005e5671,
 RandomForestRegressionModel (uid=RandomForestRegressor_8028ab5af607) with 100 trees]

Perfect! There's the RandomForestRegressionModel, represented by the last item in the stages list. Now, we should be able to access all the attributes of the random forest regressor.

In [76]:
optimal_rf_model = cross_validated_model.bestModel.stages[3]

In [77]:
optimal_rf_model.featureImportances

SparseVector(22, {0: 0.1014, 1: 0.0651, 2: 0.1497, 3: 0.1523, 4: 0.0762, 5: 0.0741, 6: 0.1139, 7: 0.0866, 8: 0.1444, 9: 0.0, 10: 0.0155, 11: 0.0087, 12: 0.0008, 13: 0.0094, 14: 0.0001, 15: 0.0004, 16: 0.0001, 17: 0.0006, 18: 0.0001, 20: 0.0004})

In [78]:
optimal_rf_model.getNumTrees

100

## Summary

In this lesson, you learned about PySpark's DataFrames, machine learning models, and pipelines. Combined with a parameter grid and `CrossValidator`, a pipeline lets you train and compare many models with a single call to `.fit()`, saving you a substantial amount of time and effort. Up next, you will have a chance to build a PySpark machine learning pipeline of your own with a classification problem!