# Tuning Machine Learning models in Spark

<a href = "http://yogen.io"><img src="http://yogen.io/assets/logo.svg" alt="yogen" style="width: 200px; float: right;"/></a>

### If you are running this notebook in Google Colab

Copy the following to a code cell and run it. It will install and set up Spark for you.

```python
!pip install pyspark==3.1.1

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.ui.port", "4050").getOrCreate()  ## For ngrok to tunnel to
```

## ML Pipelines in Spark

ML model training and tuning often represents running the same steps once and again. Often, we run the same steps with small variations in order to evaluate combinations of parameters. 

In order to make this use case a lot easier, Spark provides the [Pipeline](https://spark.apache.org/docs/3.1.1/ml-pipeline.html) abstraction.

A Pipeline represents a series of steps in the processing of a dataset. Each step is a Transformer or an Estimator. The whole Pipeline is an Estimator, so we can .fit the whole pipeline in one step. When we do that, the steps'  .fit and .transform methods will be called in turn.

![pipelineestimator](https://spark.apache.org/docs/3.1.1/img/ml-Pipeline.png)

![PipelineModel](https://spark.apache.org/docs/3.1.1/img/ml-PipelineModel.png)

## Example: predicting flight delays

We'll be using the same [Transtats'](https://www.transtats.bts.gov/) OTP performance data] from way back when. Remember it?

It's a table that contains all domestic departures by US air air carriers that represent at least one percent of domestic scheduled passenger revenues, with data on each individual departure including [Tail Number](https://en.wikipedia.org/wiki/Tail_number), departure delay, origin, destination and carrier.


### Load the data

In [None]:
catalog.list()

In [None]:
df = catalog.load('flights_features')

In [None]:
df.columns

In [None]:
df.show(10)

## Handle different fields in different ways

We have features of at least three kinds:

* Numeric continuous fields, which we can use as input to many algorithms as they are. In particular, decision trees can take continuous variables with any value as input, since they only look for the cutoff point that most increases the homogeneity of the resulting groups. In contrast, if we were using a logistic regression with regularization, for example, we would need to first scale the variables to have comparable magnitudes.

* There are fields which we will treat as categorical variables, but which are already integers. These need to be one-hot encoded.

* Finally, there are several categorical variables that are encoded as strings. These need to be one-hot encoded, but OneHotEncoder requires numeric input. Therefore, we will need to apply a StringIndexer to each of them before one-hot encoding.

```python
# Reminder:

categorical_fields = ['DepHour', 'DepMonth','DayOfWeek']

string_fields = ['Airline']

continuous_fields = ['Distance', 'Airtime']

target_field = 'DepDel15'
```

In [None]:
categorical_fields = ['DepHour', 'DepMonth','DayOfWeek','DepYear']

string_fields = ['Airline']

continuous_fields = ['Distance']

target_field = 'DepDel15'

In [None]:
df.printSchema()

In [None]:
df.schema.fields

In [None]:
one_field = df.schema.fields[0]

one_field.name

In [None]:
one_field.dataType

## Handling categorical fields

Let's do the processing of just one field first, as an example. Then we will process the rest.

### StringIndexer 

A [StringIndexer](https://spark.apache.org/docs/3.1.1/ml-features.html#stringindexer) is an estimator that takes a single string field, then produces a transformer that codifies said field as numeric labels that are fit for feeding to a one-hot encoding. 

We need to specify an input column, an output column, and a way to handle invalids. In this case, invalids are values that the indexer has not seen during fitting but that the transformer finds during processing. Its values are 'error' (the default), which is pretty self-explanatory, 'skip', which drops them, and 'keep', which is what we want. It will assign all unseen labels to a single category index.

In [None]:
from pyspark.ml.feature import StringIndexer

carrier_indexer = StringIndexer(
    inputCol='Airline', outputCol='AirlineIndex', handleInvalid='keep'
)

In [None]:
carriers = df.select('Airline')

In [None]:
carrier_indexer_transformer = carrier_indexer.fit(carriers)

In [None]:
carrier_indexer_transformer.transform(carriers).show(10)

### OneHotEncoder

A [OneHotEncoder](https://spark.apache.org/docs/latest/ml-features#onehotencoderestimator) generates a n-1 length vector column for an n-category column of category indices. 

We need to specify an input and an output column.

In [None]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
onehot_encoder = OneHotEncoder(inputCols=['AirlineIndex'], outputCols=['AirlineOneHot'])

In [None]:
# The whole process for a single field would be like this:

carrier_indexer_transformer = carrier_indexer.fit(carriers)
indexed = carrier_indexer_transformer.transform(carriers)

onehot_encoder_model = onehot_encoder.fit(indexed)
onehot_encoded = onehot_encoder_model.transform(indexed)

onehot_encoded

In [None]:
len(df.select('Airline').distinct().rdd.map(lambda x:x[0]).collect())

In [None]:
onehot_encoded.show(5)

### SparseVectors

The vectors produced by OneHotEncoder will each have only one non-zero value, but can potentially be very long. An efficient way to represent them is therefore a SparseVector, and that is what OneHotEncoder generates. 

A SparseVector is a data structure that only stores the length of the vector, a list of positions, and a list of values. All other values are assumed to be 0s.

This way, a vector like the following, with lenght 15 and non-zero values only on positions 3 and 9:

```python
[0.0, 0.0, 0.0, 6.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```

can be compactly expressed as

```python
(15, [3, 9], [6.0, 4.0])
```

In [None]:
from pyspark.ml.linalg import SparseVector

## Let's build our first Pipeline!

Our pipeline consists of a number of StringIndexers, followed by one OneHotEncoder, followed by a VectorAssembler, with a RandomForestClassifier at the end.

A Spark Pipeline is a single Estimator. We build it secifying the stages it comprises, and then we are ready to .fit it in one go. This will save us a lot of trouble, since we don't need to fit and transform each stage individually.

In [None]:
from pyspark.ml.pipeline import Pipeline

Pipeline

### StringIndexer stages

We only need to StringIndex some of the fields. We are going to build the input and output column names programatically.


In [None]:
string_fields

In [None]:
indexers = [StringIndexer(inputCol=field, outputCol=field + 'Index', handleInvalid='keep') for field in string_fields]

### OneHotEncoderEstimator

One OneHotEncoderEstimator can handle all categorical columns. We are also going to build it programatically

In [None]:
num_categoricals = categorical_fields

In [None]:
num_categoricals

In [None]:
num_categoricals_onehot_outputCols = [ field + 'OneHot' for field in num_categoricals]

In [None]:
string_fields

In [None]:
string_categoricals_onehot_inputCols = [ field + 'Index' for field in string_fields]
string_categoricals_onehot_outputCols = [ field + 'OneHot' for field in string_fields]

In [None]:
onehotencoder = OneHotEncoder(inputCols = string_categoricals_onehot_inputCols + num_categoricals, 
                              outputCols = string_categoricals_onehot_outputCols + num_categoricals_onehot_outputCols)

### VectorAssembler

Once we have generated our features, we can assemble them into a single features column, together with the continuous_fields.

In [None]:
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols= string_categoricals_onehot_outputCols 
                     + num_categoricals_onehot_outputCols 
                     + continuous_fields, 
                     outputCol='features')

### RandomForestClassifier

Aaaaand we are ready to do some Machine Learning! We'll use a RandomForestClassifier to try to predict delayed versus non delayed flights, a binary classification task.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf_classifier = RandomForestClassifier(featuresCol='features', labelCol=target_field)

### Pipeline!

Now that we have all the stages, we are finally ready to put them together into a single Estimator, our Pipeline.

In [None]:
indexers

In [None]:
onehotencoder

In [None]:
pipeline = Pipeline(stages = indexers + [onehotencoder, va, rf_classifier])

pipeline

Now that we have gone to the trouble of building our Pipeline, fitting it and using it to predict the probabilty of delay on unseen data is as easy as using a single Estimator:

In [None]:
%%time

pipeline_model = pipeline.fit(df)

In [None]:
catalog.list()

In [None]:
predicted = pipeline_model.transform(catalog.load('X_test'))
predicted

In [None]:
predicted.select('probability').take(5)

In [None]:
catalog.load('y_test').take(5)

## Evaluating and tuning our Pipeline

Probably the most interesting use of Spark Pipelines is quickly (in terms of coding time) evaluating many combinations of hyperparameters to feed our model and choosing the best ones. For that, we can use a TrainValidationSplit or a CrossValidator. The CrossValidator will generally perform better, but it will take several times as much. I'm using here the TrainValidationSplit because the API is the same.

In [None]:
from pyspark.ml.tuning import TrainValidationSplit, CrossValidator

help(TrainValidationSplit)

### Params and Evaluators

In order to evaluate different sets of parameters, we need a) the set of parameters to iterate through and b) a metric to compare the results. 

The first element is represented by ParamMaps, which we build with a ParamGridBuilder, and the second by an Evaluator that needs to be specific to the relevant task.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

builder = ParamGridBuilder()

our_param_map = builder.addGrid(rf_classifier.maxDepth, [5, 15])\
                       .addGrid(rf_classifier.numTrees, [10, 30])\
                       .build()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

our_evaluator = BinaryClassificationEvaluator(labelCol=target_field)

We now have all the elements in place to perform our fit:

In [None]:
split = TrainValidationSplit(estimator=pipeline, 
                             evaluator=our_evaluator, 
                             estimatorParamMaps=our_param_map)

In [None]:
parameters = catalog.load('parameters')

In [None]:
data_train, data_test = df.randomSplit(
    weights=[parameters["train_fraction"], 1 - parameters["train_fraction"]]
 )

In [None]:
%%time

split_model_chosen = split.fit(data_train)

And now we can predict on the rest of the flights and compare them with reality:

In [None]:
%%time

predictions = split_model_chosen.transform(data_test).select('features',
                                                        target_field,
                                                        'rawPrediction',
                                                        'probability',
                                                        'prediction')

In [None]:
predictions.show(5)

### Let's have a look

We are now ready to compare our predictions with reality. Do these features have any predictive power at all?

In [None]:
predictions.show(10)

In [None]:
from pyspark.sql.functions import col, udf

In [None]:
import pandas as pd

In [None]:
predicted_probs = predictions.select(target_field, 'probability').toPandas()

In [None]:
y_true = predicted_probs[target_field]
y_prob_predicted = predicted_probs['probability'].map(lambda vector: vector[1])

In [None]:
predicted_probs['prediction'] = predicted_probs['probability'].apply(lambda l: 0 if l[0]>0.90 else 1)

In [None]:
predicted_probs.value_counts(target_field)

In [None]:
predicted_probs.value_counts('prediction')

In [None]:

fpr, tpr, thresholds = roc_curve(y_true, y_prob_predicted)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

plt.plot(fpr, tpr)
plt.plot(np.linspace(0,1, 10), np.linspace(0,1, 10))

Not bad, considering we have not performed any feature engineering at all!

### Further Reading

https://spark.apache.org/docs/latest/ml-tuning.html

https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark

In [None]:
# Spark job for the cluster

from pyspark.sql import SparkSession, types, functions
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(csvname, header= True, inferSchema=True)

# Preprocessing
csvname = 'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_12.csv'
columns_of_interest = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                       'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                       'DepTime', 'DepDelay', 'AirTime', 'Distance']


# Feature extraction
flights = spark.read.csv(csvname, header=True, inferSchema=True)
flights = flights.select(columns_of_interest)

flights = flights.na.drop()
flights = flights.withColumn('DepHour', (flights['DepTime'] / 100).cast(types.IntegerType()))
flights = flights.withColumn('Delayed', (flights['DepDelay'] > 15).cast(types.IntegerType()))

# Train/test split
flights_sample, rest = flights.randomSplit([.8, .2])

# Build the Pipeline
categorical_fields = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 
                      'Origin', 'OriginCityName', 'OriginStateName', 
                      'Dest', 'DestCityName', 'DestStateName']

string_fields = [field.name for field in flights_sample.schema.fields if field.dataType == types.StringType()]

continuous_fields = ['Distance', 'DepHour']

target_field = 'Delayed'

indexers = [StringIndexer(inputCol=field, outputCol=field + 'Index', handleInvalid='keep') for field in string_fields]
non_string_categoricals = [field for field in categorical_fields if field not in string_fields]
non_string_categorical_onehotencoders = [OneHotEncoder(inputCol=field, outputCol=field + 'OneHot') for field in non_string_categoricals]
string_categorical_onehotencoders = [OneHotEncoder(inputCol=field+'Index', outputCol=field + 'OneHot') for field in string_fields]

input_cols_onehotencoded = [field + 'OneHot' for field in categorical_fields]

va = VectorAssembler(inputCols= input_cols_onehotencoded + continuous_fields, outputCol='features')

rf_classifier = RandomForestClassifier(featuresCol='features', labelCol='Delayed')

pipeline = Pipeline(stages=indexers + 
                    string_categorical_onehotencoders + 
                    non_string_categorical_onehotencoders + 
                    [va] + 
                    [rf_classifier])

# Tuning and Training
builder = ParamGridBuilder()

our_param_map = builder.addGrid(rf_classifier.maxDepth, [5, 15])\
                       .addGrid(rf_classifier.numTrees, [10, 30])\
                       .build()        

our_evaluator = BinaryClassificationEvaluator(labelCol='Delayed')

split = TrainValidationSplit(estimator=pipeline, 
                             evaluator=our_evaluator, 
                             estimatorParamMaps=our_param_map)

split_model_chosen = split.fit(flights_sample)
predicted = split_model_chosen.transform(rest)
predictions = predicted.select('features',
                               'Delayed',
                               'rawPrediction',
                               'probability',
                               'prediction')

predicted.write.json('out/predicted')
split_model_chosen.bestModel.save('out/split_model_chosen')

In [None]:
# Check output
predicted_probs = predictions.select('Delayed', 'probability').toPandas()
y_true = predicted_probs['Delayed']
y_prob_predicted = predicted_probs['probability'].map(lambda vector: vector[1])

from sklearn.metrics import roc_curve

fpr, tpr, thresholds = roc_curve(y_true, y_prob_predicted)
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

plt.plot(fpr, tpr)
plt.plot(np.linspace(0,1, 10), np.linspace(0,1, 10))