## Tuning Model Parameters

In this exercise, you will optimise the parameters for a classification model.

### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [1]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load the source data
csv = spark.read.csv('wasb:///data/flights.csv', inferSchema=True, header=True)

# Select features and label
data = csv.select("DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))

# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

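The cell above does two things: it derives a binary label (a flight counts as late when **ArrDelay** is more than 15 minutes) and splits the rows roughly 70/30. The plain-Python sketch below shows both ideas without Spark. The sample rows and the `delay_label` and `random_split` helpers are hypothetical. The split function assumes that `randomSplit` works by drawing one uniform random number per row and comparing it with the cumulative weight bounds, so the split sizes are only approximately 70/30.

```python
import random

# Hypothetical (DepDelay, ArrDelay) pairs, for illustration only.
flights = [(-3, -10), (20, 25), (5, 16), (0, 15), (40, 55), (-1, 2)]

def delay_label(arr_delay):
    """Plain-Python version of (col("ArrDelay") > 15).cast("Int")."""
    return int(arr_delay > 15)

labels = [delay_label(arr) for _, arr in flights]

def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a per-row random draw
    with the cumulative (normalised) weights. Sizes are approximate."""
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            if r < bound:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits
```

A row with an arrival delay of exactly 15 minutes gets label 0, because the comparison is strict. Because the notebook calls `randomSplit` without a seed, the split (and therefore the metrics later on) can change from run to run. Passing a seed, for example `data.randomSplit([0.7, 0.3], seed=42)`, makes it repeatable.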


### Define the Pipeline
Now define a pipeline that assembles the feature columns into a single vector and trains a logistic regression classification model.

In [2]:
# Define the pipeline
assembler = VectorAssembler(inputCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"], outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])
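As a rough sketch of what the two stages compute for a single row, the Python code below packs the five input columns into one vector, as **VectorAssembler** does. It then applies the logistic function to a weighted sum, which is how a fitted binary logistic regression model produces P(label = 1). The example row, `weights` and `intercept` are hypothetical values, not coefficients learned from the flight data.

```python
import math

# Same column order as inputCols in the VectorAssembler above.
FEATURE_COLS = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"]

def assemble(row):
    """Plain-Python stand-in for VectorAssembler: named columns -> numeric vector."""
    return [float(row[c]) for c in FEATURE_COLS]

def logistic_probability(features, weights, intercept):
    """P(label = 1) for a binary logistic regression model: sigmoid(w . x + b)."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 / (1.0 + math.exp(-margin))

# Hypothetical row and coefficients, for illustration only (not fitted values).
row = {"DayofMonth": 1, "DayOfWeek": 1, "OriginAirportID": 10140,
       "DestAirportID": 10397, "DepDelay": 30}
weights = [0.0, 0.0, 0.0, 0.0, 0.1]
intercept = -2.0

p1 = logistic_probability(assemble(row), weights, intercept)  # sigmoid(1.0)
```

The assembler passes **OriginAirportID** and **DestAirportID** through as plain numbers. As a result, the model treats the airport codes as magnitudes rather than as categories.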

### Tune Parameters
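You can tune parameters to find the best model for your data. A simple way to do this is to use **TrainValidationSplit**. It evaluates each combination of parameters in a parameter grid (built with **ParamGridBuilder**) by training a model on one portion of the training data and scoring that model on the rest. It then keeps the best-performing combination. Here, trainRatio=0.8 holds back 20% of the training data for validation.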

In [3]:
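# 3 regParam x 2 maxIter x 2 threshold values = 12 combinations
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, [0.35, 0.30]).build()
# Train each combination on 80% of train and score it on the remaining 20%
tvs = TrainValidationSplit(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)

# Fit every combination, then refit the best one on the full training set
model = tvs.fit(train)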
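The sketch below expands the same grid in plain Python and picks a winner. `ParamGridBuilder.build()` returns one parameter map for each element of the Cartesian product of the candidate values. **TrainValidationSplit** then keeps the map with the best validation score. The scoring function here is a hypothetical stand-in for "fit on 80%, evaluate AUC on the other 20%". It does not reflect real results.

```python
import itertools

# Same candidate values as the ParamGridBuilder call above.
grid_spec = {
    "regParam": [0.3, 0.1, 0.01],
    "maxIter": [10, 5],
    "threshold": [0.35, 0.30],
}

# One dict per combination: the Cartesian product of all candidate lists.
names = list(grid_spec)
param_maps = [dict(zip(names, values))
              for values in itertools.product(*grid_spec.values())]

def select_best(param_maps, score_fn):
    """Return the param map with the highest validation score (higher is better, as for AUC)."""
    return max(param_maps, key=score_fn)

def fake_validation_auc(params):
    """Hypothetical score, for illustration only; it ignores threshold."""
    return 0.9 - params["regParam"] + 0.001 * params["maxIter"]

best = select_best(param_maps, fake_validation_auc)
```

The fake scorer ignores **threshold**, and the real evaluator behaves in a similar way. By default, **BinaryClassificationEvaluator** scores the rawPrediction column, and the logistic regression threshold does not change that column. As a result, the two threshold values produce identical validation AUCs, and the grid does not meaningfully choose between them.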

### Test the Model
Now you're ready to apply the model to the test data.

In [4]:
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "probability", "trueLabel")
predicted.show(100)

+--------------------+----------+--------------------+---------+
|            features|prediction|         probability|trueLabel|
+--------------------+----------+--------------------+---------+
|[1.0,1.0,10140.0,...|       0.0|[0.91805525483914...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90191948208358...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.89493463140551...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90858998481218...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90202857538504...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.89505058980440...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.94830482226047...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.94830482226047...|        0|
|[1.0,1.0,10140.0,...|       1.0|[0.01079481033447...|        1|
|[1.0,1.0,10140.0,...|       0.0|[0.92841955411988...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.91342971646641...|        0|
|[1.0,1.0,10140.0,...|       1.0|[0.55291190910388...|        1|
...
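In the output, the **probability** column holds [P(label = 0), P(label = 1)]. The twelfth row is predicted 1.0 even though P(label = 0) is about 0.553. This is consistent with a decision threshold below 0.5: the grid only offered 0.35 and 0.30, and either one applied to P(label = 1) ≈ 0.447 gives a positive prediction. The helper below is a hypothetical sketch of that rule. It assumes that the model predicts 1 when P(label = 1) strictly exceeds the threshold.

```python
def predict_label(probability, threshold):
    """Return 1.0 when P(label = 1) exceeds the threshold, else 0.0.

    probability is [P(label = 0), P(label = 1)], the same layout as the
    probability column shown above.
    """
    return 1.0 if probability[1] > threshold else 0.0

# Twelfth row of the output (probability truncated as displayed).
p0 = 0.55291190910388
row12 = [p0, 1.0 - p0]
```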

### Compute Confusion Matrix Metrics
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision* and *recall* can be calculated.

In [5]:
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          113457.0|
|       FP|           11555.0|
|       TN|          638788.0|
|       FN|           48386.0|
|Precision|0.9075688733881547|
|   Recall|0.7010312463313211|
+---------+------------------+
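Each of the four filter-and-count queries above picks out one cell of the confusion matrix. In plain Python, the same classification can be done in a single pass over (prediction, trueLabel) pairs, with 1 treated as the positive class. The sample pairs are hypothetical. The helper names are illustrative and are not part of spark.ml.

```python
from collections import Counter

def confusion_counts(pairs):
    """Count TP, FP, TN, FN from (prediction, trueLabel) pairs; assumes 0/1 values."""
    counts = Counter()
    for prediction, label in pairs:
        if prediction == 1 and label == 1:
            counts["TP"] += 1
        elif prediction == 1 and label == 0:
            counts["FP"] += 1
        elif prediction == 0 and label == 0:
            counts["TN"] += 1
        else:
            counts["FN"] += 1
    return counts

def precision_recall(c):
    """Precision = TP / (TP + FP); recall = TP / (TP + FN)."""
    return c["TP"] / (c["TP"] + c["FP"]), c["TP"] / (c["TP"] + c["FN"])

# Hypothetical (prediction, trueLabel) pairs, for illustration only.
pairs = [(1.0, 1), (0.0, 0), (1.0, 0), (0.0, 1), (1.0, 1), (0.0, 0)]
```

When applied to the counts in the table, `precision_recall` reproduces the precision of about 0.9076 and the recall of about 0.7010. In Spark, `predicted.groupBy("prediction", "trueLabel").count()` would return all four counts in one job instead of four separate scans.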

### Review the Area Under ROC
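Another way to assess the performance of a classification model is to measure the area under its ROC (receiver operating characteristic) curve. The ROC curve plots the true positive rate against the false positive rate as the decision threshold varies. The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute this area.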
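One way to read the area under the ROC curve is as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative example, with ties counting as half. The brute-force sketch below computes it that way on hypothetical scores. It also shows what happens when the scores are hard 0/1 predictions rather than probabilities. This is an illustration only, not how **BinaryClassificationEvaluator** is implemented.

```python
def pairwise_auc(scores, labels):
    """AUC as P(score of a random positive > score of a random negative), ties = 0.5.

    O(P * N) comparisons: fine for a toy example, far too slow for the full data.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical scores (P(label = 1)) and true labels, for illustration only.
labels = [1, 1, 1, 0, 0, 0]
probs = [0.9, 0.6, 0.3, 0.4, 0.2, 0.1]
hard = [1.0 if p > 0.5 else 0.0 for p in probs]

auc_probs = pairwise_auc(probs, labels)  # 8 of 9 pairs ranked correctly
auc_hard = pairwise_auc(hard, labels)    # collapses to (TPR + TNR) / 2
```

Hard predictions discard the ranking information inside each class, so the AUC drops (here from 8/9 to 5/6) even though the underlying probabilities are the same.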

In [6]:
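# Note: rawPredictionCol="prediction" scores the hard 0/1 predictions, so the ROC curve has a single
# operating point; the default "rawPrediction" column would give a threshold-independent AUC
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print("AUR = ", aur)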

AUR =  0.841631849526
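The evaluator above is given the 0/1 **prediction** column as its score, so the ROC curve has only one interior point, (FPR, TPR). The trapezoidal area under that curve simplifies to (TPR + TNR) / 2. The arithmetic below reproduces the printed AUR from the confusion-matrix counts in the metrics table.

```python
# Confusion-matrix counts copied from the metrics table above.
tp, fp, tn, fn = 113457.0, 11555.0, 638788.0, 48386.0

tpr = tp / (tp + fn)  # true positive rate (recall)
tnr = tn / (tn + fp)  # true negative rate (specificity)

# Area of the piecewise-linear ROC curve (0,0) -> (FPR, TPR) -> (1,1)
# equals (TPR + TNR) / 2 when the scores are hard 0/1 predictions.
auc_from_counts = (tpr + tnr) / 2
```

Setting rawPredictionCol="rawPrediction" (the evaluator's default) would score the model's continuous outputs instead. The resulting AUC would generally differ from the value above, and that value is not shown here.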