## Machine Learning Project Using Apache Spark ML and SparkSQL
* This project uses PySpark 3.4.0 and was run in Jupyter Notebook.

## 1. Data Ingestion and Description

#### Loading Data with a Manually Defined Schema
* This notebook uses a dataset containing details of individual flights.
* In the first steps, the team explores the data after loading it into a DataFrame.


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkMLExample').getOrCreate()

In [2]:
# import the DataFrame SQL data types
from pyspark.sql.types import *
#
# flightSchema describes the structure of the data in data/raw-flight-data.csv.
# Fields are declared nullable: Spark treats columns read from files as nullable
# anyway, and DepDelay/ArrDelay do contain missing values.
#
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), True),
  StructField("DayOfWeek", IntegerType(), True),
  StructField("Carrier", StringType(), True),
  StructField("OriginAirportID", IntegerType(), True),
  StructField("DestAirportID", IntegerType(), True),
  StructField("DepDelay", IntegerType(), True),
  StructField("ArrDelay", IntegerType(), True),
])
#
# Use the DataFrame reader to load the CSV file with the schema above;
# header=True skips the header row instead of reading it as data
#
flights = spark.read.csv('data/raw-flight-data.csv', schema=flightSchema, header=True)
flights.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

### Loading Data with Automatic Schema Inference
* If no schema is defined in advance, Spark can scan the file and infer the schema automatically.
* To illustrate, the team uses the `airports.csv` dataset because of its simplicity.
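Spark's inference logic is more involved than what is shown here. It also considers types such as long, decimal, timestamp, and boolean, and it costs an extra pass over the data. As a rough illustration only, the plain-Python sketch below picks, for each column, the narrowest of three types that parses every non-empty value. The helper `infer_type` and the sample values are hypothetical.

```python
def infer_type(values):
    """Return 'integer', 'double' or 'string' for a column of raw CSV strings."""

    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # nothing to go on: fall back to string
    if all(parses(int, v) for v in non_empty):
        return "integer"
    if all(parses(float, v) for v in non_empty):
        return "double"
    return "string"


# Hypothetical raw values, as read from a CSV file before any typing
columns = {
    "airport_id": ["10165", "10299", "10304"],
    "city": ["Adak Island", "Anchorage", "Aniak"],
}
schema = {name: infer_type(values) for name, values in columns.items()}
print(schema)  # {'airport_id': 'integer', 'city': 'string'}
```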

In [4]:
airports = spark.read.csv('data/airports.csv', header=True, inferSchema=True)
airports.show(10)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
+----------+-----------+-----+--------------------+
only showing top 10 rows



The schema inferred by Spark:

In [5]:
# Show the inferred schema for the airports dataframe
airports.printSchema()

root
 |-- airport_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)



### Using Built-in DataFrame Methods
Spark DataFrames provide many built-in functions for extracting and manipulating data.  
The example below displays the first 5 cities (and their airport names) in the airports dataset.

In [6]:
cities = airports.select("city", "name")
cities.limit(5).show()

+-----------+--------------------+
|       city|                name|
+-----------+--------------------+
|Adak Island|                Adak|
|  Anchorage|Ted Stevens Ancho...|
|      Aniak|       Aniak Airport|
|     Barrow|Wiley Post/Will R...|
|     Bethel|      Bethel Airport|
+-----------+--------------------+



### Combine Operations
You can combine functions in a single statement to perform multiple operations on a DataFrame. 

In this case, we will use the **join** function to combine the **flights** and **airports** DataFrames, and then use **groupBy** with the **count** aggregate to return the number of flights departing from each city (a city can have several airports).

We then sort by count in descending order to get the top 5 origin cities by number of flights.
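To make the chained call concrete, here is a plain-Python sketch of the same steps on a few hypothetical rows: an inner join, grouping by city, counting, and sorting in descending order. Spark runs the query distributed and optimized, so this is not how it executes, but the result has the same shape. Because the grouping key is the city, flights from two airports in the same city are counted together.

```python
from collections import Counter

# Hypothetical miniature versions of the two DataFrames
airport_rows = [
    {"airport_id": 13930, "city": "Chicago"},
    {"airport_id": 13232, "city": "Chicago"},
    {"airport_id": 10397, "city": "Atlanta"},
]
flight_rows = [
    {"OriginAirportID": 13930},
    {"OriginAirportID": 13232},
    {"OriginAirportID": 10397},
    {"OriginAirportID": 13930},
    {"OriginAirportID": 99999},  # no matching airport: dropped by the inner join
]

# Inner join on OriginAirportID == airport_id (airport_id assumed unique), keeping the city
city_by_id = {a["airport_id"]: a["city"] for a in airport_rows}
origin_cities = [
    city_by_id[f["OriginAirportID"]]
    for f in flight_rows
    if f["OriginAirportID"] in city_by_id
]

# Group by city, count, and sort by count in descending order
counts_by_city = Counter(origin_cities).most_common()
print(counts_by_city[:5])  # [('Chicago', 3), ('Atlanta', 1)]
```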

In [7]:
from pyspark.sql import functions as F

flightsByOrigin = flights\
.join(airports, flights.OriginAirportID == airports.airport_id)\
.groupBy("city")\
.agg(F.count(F.lit(1)).alias("Count"))\
.orderBy("Count", ascending=False)

flightsByOrigin.limit(5).show()

+-----------------+------+
|             city| Count|
+-----------------+------+
|          Chicago|177845|
|          Atlanta|149970|
|      Los Angeles|118684|
|         New York|118540|
|Dallas/Fort Worth|105024|
+-----------------+------+



### Determine Summary Statistics

Predictive modeling is based on statistics and probability, so you will often start by looking at summary statistics. 

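The **describe** function returns a DataFrame containing the **count**, **mean**, **standard deviation**, **minimum**, and **maximum** values for each numeric and string column. For string columns such as **Carrier**, the mean and standard deviation are null. The count excludes missing (null) values.

Let's look at the statistics for the flights DataFrame.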
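The statistics that **describe** reports can be reproduced by hand. The sketch below uses Python's `statistics` module on a few hypothetical DepDelay values. As in `describe`, missing values are skipped, and the standard deviation is the sample version (dividing by n - 1).

```python
import statistics

# Hypothetical DepDelay values; None stands for a missing (null) value
dep_delay = [-3, 0, -4, 28, -6, None]

values = [v for v in dep_delay if v is not None]  # nulls are not counted
summary = {
    "count": len(values),
    "mean": statistics.mean(values),
    "stddev": statistics.stdev(values),  # sample standard deviation (n - 1)
    "min": min(values),
    "max": max(values),
}
print(summary)
```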

In [8]:
flights.describe().show()

+-------+-----------------+------------------+-------+------------------+------------------+------------------+-----------------+
|summary|       DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|          DepDelay|         ArrDelay|
+-------+-----------------+------------------+-------+------------------+------------------+------------------+-----------------+
|  count|          2719418|           2719418|2719418|           2719418|           2719418|           2691974|          2690385|
|   mean|15.79747468024408|3.8983907586108497|   null| 12742.26441172339|12742.455345592329| 10.53686662649788| 6.63768791455498|
| stddev| 8.79986016898541|1.9859881390373326|   null|1501.9729397025696|1501.9692528927906|36.099528066431446|38.64881489390091|
|    min|                1|                 1|     9E|             10140|             10140|               -63|              -94|
|    max|               31|                 7|     YV|             15376|             1537

## 2. Data Cleaning and Exploration

### Determine the Presence of Duplicates
The data you have to work with won't always be perfect, so you will often need to *clean* it, for example by detecting and removing duplicates that might affect your model.

You can use the **dropDuplicates** function to create a new DataFrame with the duplicates removed, enabling you to determine how many rows are duplicates of other rows.
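The count works because removing duplicates leaves one copy of each distinct row. The difference from the original count is therefore the number of extra copies. Here is a plain-Python sketch of the same arithmetic on hypothetical rows in the column order of the flights data:

```python
rows = [
    (19, 5, "DL", 11433, 13303, -3, 1),
    (19, 5, "DL", 14869, 12478, 0, -8),
    (19, 5, "DL", 11433, 13303, -3, 1),  # exact copy of the first row
    (19, 5, "DL", 11433, 13303, -3, 1),  # a second copy
]

total = len(rows)
# dict.fromkeys keeps one instance of each distinct row (comparing all columns)
unique_rows = list(dict.fromkeys(rows))
print("Number of duplicate rows =", total - len(unique_rows))  # Number of duplicate rows = 2
```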

In [10]:
total_flights = flights.count()
unique_flights = flights.dropDuplicates().count()

print("Number of duplicate rows = ",total_flights - unique_flights)

Number of duplicate rows =  22435


### Identify Missing Values using `dropna`

As well as determining whether duplicates exist in your data, you should detect missing values. You can then either remove rows containing missing data or replace the missing values with a suitable replacement.

The **dropna** function creates a DataFrame with the rows containing missing data removed. You can specify a subset of columns, and whether a row should be removed if *any* or only if *all* of those values are missing. You can then use this new DataFrame to determine how many rows contain missing values.

Below, we remove duplicates and then remove rows with a missing ArrDelay or DepDelay value. Because the difference is taken from `total_flights`, the printed number includes the 22,435 duplicate rows as well as the rows with missing delays. Subtracting from `unique_flights` instead isolates the rows with missing values (2,696,983 - 2,673,185 = 23,798).
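The difference between `how="any"` and `how="all"` is easiest to see on a small example. The helper `drop_missing` below is a hypothetical plain-Python imitation of that behaviour for rows stored as dictionaries, where `None` marks a missing value. It is not the PySpark implementation.

```python
def drop_missing(rows, how="any", subset=None):
    """Drop rows whose checked columns are missing: any of them ('any') or all of them ('all')."""

    def should_drop(row):
        columns = subset if subset is not None else list(row)
        missing = [row[c] is None for c in columns]
        return any(missing) if how == "any" else all(missing)

    return [row for row in rows if not should_drop(row)]


rows = [
    {"DepDelay": -3, "ArrDelay": 1},
    {"DepDelay": None, "ArrDelay": 5},
    {"DepDelay": None, "ArrDelay": None},
]
kept_any = drop_missing(rows, how="any", subset=["ArrDelay", "DepDelay"])
kept_all = drop_missing(rows, how="all", subset=["ArrDelay", "DepDelay"])
print(len(kept_any), len(kept_all))  # 1 2
```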

In [11]:
unique_flights_withoutNA =  flights.dropDuplicates()\
.dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()

print("Missing values (excluding dups) = ", total_flights - unique_flights_withoutNA)

Missing values (excluding dups) =  46233


### Cleaning the Data

Now that you've identified that there are duplicates and missing values, you can clean the data by removing the duplicates and replacing the missing values. 

The **fillna** function replaces missing values with a specified replacement value. 

In this case, we'll remove all duplicate rows and replace missing **ArrDelay** and **DepDelay** values with **0**.
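Here is a plain-Python sketch of the same cleaning recipe, using hypothetical rows and a hypothetical helper `fill_missing`: keep one copy of each distinct row, then replace missing delay values with 0. The order matters. Deduplicating first means that a row with a missing delay and an otherwise identical row with a 0 delay are both kept, because they are not yet identical when duplicates are removed.

```python
def fill_missing(rows, value, subset):
    """Replace None with value in the listed columns only."""
    return [
        {col: (value if col in subset and v is None else v) for col, v in row.items()}
        for row in rows
    ]


rows = [
    {"Carrier": "DL", "DepDelay": None, "ArrDelay": 4},
    {"Carrier": "DL", "DepDelay": None, "ArrDelay": 4},  # duplicate of the first row
    {"Carrier": "DL", "DepDelay": 0, "ArrDelay": 4},
]

# Deduplicate (dicts are unhashable, so compare them as tuples of items)
deduped = [dict(t) for t in dict.fromkeys(tuple(r.items()) for r in rows)]
cleaned = fill_missing(deduped, 0, subset=["ArrDelay", "DepDelay"])
print(cleaned)  # two rows remain, and they are now identical
```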

In [12]:
data = flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"]).repartition(32)

# Let's cache this for efficient future use
data.cache()

print("Number of rows in cleaned data set = ", data.count(), "Number of partitions = ", data.rdd.getNumPartitions())

Number of rows in cleaned data set =  2696983 Number of partitions =  32


### Check Summary Statistics

After cleaning the data, we should re-check the statistics - removing rows and changing values may affect the distribution of the data, which in turn could affect any predictive models you might create.

In [13]:
data.describe().show()

+-------+------------------+------------------+-------+------------------+-----------------+------------------+------------------+
|summary|        DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|    DestAirportID|          DepDelay|          ArrDelay|
+-------+------------------+------------------+-------+------------------+-----------------+------------------+------------------+
|  count|           2696983|           2696983|2696983|           2696983|          2696983|           2696983|           2696983|
|   mean|15.798996508320593| 3.900369412784582|   null|12742.459424846207|12742.85937657004|10.531134234068217|6.6679285705545785|
| stddev| 8.801267199135454|1.9864582421701988|   null|1502.0359941370607|1501.993958981797| 36.06172819056576|38.583861473580725|
|    min|                 1|                 1|     9E|             10140|            10140|               -63|               -94|
|    max|                31|                 7|     YV|             15376|         

### Explore Relationships in the Data

Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another and identify any apparent correlation. 

The **corr** function calculates the Pearson correlation coefficient, a value between -1 and 1 indicating the strength of the linear relationship between two fields. A strong positive correlation (near 1) indicates that high values for one column are often found with high values for the other. A strong negative correlation (near -1) indicates that *low* values for one column are often found with *high* values for the other. A correlation near 0 indicates little apparent linear relationship between the fields.

Let's look at the correlation between *DepDelay* and *ArrDelay*

In [14]:
data.corr("DepDelay", "ArrDelay")

0.9392630367706979
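For reference, the Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations. Below is a minimal plain-Python version, applied to the first five DepDelay/ArrDelay pairs shown in the `flights.show()` output. Five rows are far too few to say anything about the full dataset.

```python
import math


def pearson(xs, ys):
    """Pearson correlation; the 1/n factors of covariance and variance cancel out."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    std_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    std_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (std_x * std_y)


dep_delay = [-3, 0, -4, 28, -6]
arr_delay = [1, -8, -15, 24, -11]
print(round(pearson(dep_delay, arr_delay), 2))  # 0.93
```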

### Using Spark SQL

In addition to using the DataFrame API directly, you can register a DataFrame as a temporary view and query it with SQL through Spark SQL. SQL is often more intuitive for querying tabular data.

Here we'll create a view of the cleaned data and run a SQL query to get the average arrival delay by day of week.

In [15]:
data.createOrReplaceTempView("flightData")
spark.sql(""" 
SELECT DayOfWeek, CAST(AVG(ArrDelay) as DECIMAL(6,2)) AS `Avg Delay(min)` 
FROM flightData 
GROUP BY DayOfWeek 
ORDER BY DayOfWeek 
""").show()

+---------+--------------+
|DayOfWeek|Avg Delay(min)|
+---------+--------------+
|        1|          7.08|
|        2|          4.39|
|        3|          7.23|
|        4|         10.78|
|        5|          8.71|
|        6|          2.14|
|        7|          5.25|
+---------+--------------+
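The same kind of aggregate query can be tried outside Spark. The sketch below swaps in Python's built-in `sqlite3` module as a stand-in SQL engine, with a few hypothetical rows. `ROUND` replaces the `CAST ... AS DECIMAL(6,2)` used above, because SQLite does not apply DECIMAL precision that way. Spark SQL's dialect differs from SQLite's in many details, so treat this only as an illustration of the GROUP BY logic.

```python
import sqlite3

# Hypothetical (DayOfWeek, ArrDelay) rows, not taken from the real dataset
rows = [(1, 10), (1, 4), (2, -3), (2, 9), (3, 0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flightData (DayOfWeek INTEGER, ArrDelay INTEGER)")
conn.executemany("INSERT INTO flightData VALUES (?, ?)", rows)

query = """
SELECT DayOfWeek, ROUND(AVG(ArrDelay), 2) AS avg_delay
FROM flightData
GROUP BY DayOfWeek
ORDER BY DayOfWeek
"""
result = conn.execute(query).fetchall()
conn.close()
print(result)  # [(1, 7.0), (2, 3.0), (3, 0.0)]
```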



## 3. Data Preparation, Pipeline Construction, and Model Training



#### Preparing the data for machine learning

In this example, the data has already been cleaned. A subset of columns is selected as features. The target variable is an integer field named **label**, derived from ArrDelay: **1 for flights that arrived 15 minutes or more after the scheduled arrival time, and 0 otherwise** (early, on time, or less than 15 minutes late).
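The labeling rule fits in a one-line function. This plain-Python sketch follows the definition above literally, so a delay of exactly 15 minutes counts as late. The expression actually used to build `label` does not appear among the cells shown here; a strict `> 15` comparison would label a 15-minute delay as 0. `late_label` is a hypothetical helper.

```python
LATE_THRESHOLD_MINUTES = 15


def late_label(arr_delay):
    """1 if the flight arrived 15 or more minutes late, otherwise 0."""
    return 1 if arr_delay >= LATE_THRESHOLD_MINUTES else 0


arr_delays = [1, -8, -15, 24, -11, 15, 14]
print([late_label(d) for d in arr_delays])  # [0, 0, 0, 1, 0, 1, 0]
```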

In [16]:
# Import sql functions and ML libraries
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

In [17]:
data.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



### Split the data into training and test sets

It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. 

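In this exercise, 70% of the data is used for training and 30% is reserved for testing. Because `randomSplit` assigns rows randomly, the split sizes are only approximately 70/30, and they change between runs unless a `seed` is passed.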

In [19]:
splits = data.randomSplit([0.7, 0.3])

train = splits[0]
# rename the target variable in the test set to trueLabel
test = splits[1].withColumnRenamed("label", "trueLabel")

train_rows = train.count()
test_rows = test.count()

print ("Training rows count:", train_rows, " Testing rows count:", test_rows)

Training rows count: 1886184  Testing rows count: 810799
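The split sizes (1,886,184 and 810,799 rows, a 69.9/30.1 ratio) are close to, but not exactly, 70/30, because each row is assigned independently at random. The plain-Python sketch below imitates that idea with a hypothetical `random_split` helper. Spark's own implementation differs in the details (it samples partition by partition), so the exact rows chosen would not match.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # cumulative bounds, e.g. [0.7, 1.0]
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guards against rounding in the last bound
    return splits


train_ids, test_ids = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_ids), len(test_ids))  # close to a 7000/3000 split
```

Passing the same `seed` reproduces the same split, which is also the purpose of the `seed` argument of `randomSplit`.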


In [61]:
train.show(1, vertical = True)

-RECORD 0----------------
 DayofMonth      | 1     
 DayOfWeek       | 1     
 Carrier         | 9E    
 OriginAirportID | 10423 
 DestAirportID   | 11433 
 DepDelay        | -5    
 label           | 0     
only showing top 1 row



### Prepare the training data for SparkML

A predictive model often requires multiple stages of feature preparation. For example, with some algorithms it is common to distinguish between *continuous features* (which have a calculable numeric value) and *categorical features* (which are numeric representations of discrete categories). It is also common to normalize continuous numeric features to a common scale (for example, by scaling all numbers to a proportional decimal value between 0 and 1).

A pipeline consists of a series of **transformer** and **estimator** stages that typically prepare a DataFrame for modeling and then train a predictive model. 

In this case, we will create a pipeline with seven stages:
1. **StringIndexer** estimator that converts string values to indexes for categorical features
2. **VectorAssembler** that combines categorical features into a single vector
3. **VectorIndexer** that creates indexes for a vector of categorical features
4. **VectorAssembler** that creates a vector of continuous numeric features
5. **MinMaxScaler** that normalizes continuous numeric features
6. **VectorAssembler** that creates a vector of categorical and continuous features
7. **LogisticRegression** classifier that trains a classification model.
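MinMaxScaler rescales each value with (x - min) / (max - min), using the minimum and maximum learned from the training data, and maps the result into the target range, [0, 1] by default. The sketch below writes that formula in plain Python. The bounds -63 and 1620 are an assumption. -63 is the DepDelay minimum reported by `describe`. 1620 is back-calculated so that DepDelay = -2 reproduces the normFeatures value 0.036244800950683304 in the pipeline output. The actual training-set maximum is not printed in this notebook.

```python
def min_max_scale(x, col_min, col_max, new_min=0.0, new_max=1.0):
    """Rescale x from [col_min, col_max] into [new_min, new_max]."""
    if col_max == col_min:
        # A constant column is mapped to the middle of the target range
        return 0.5 * (new_max + new_min)
    return (x - col_min) / (col_max - col_min) * (new_max - new_min) + new_min


# Assumed training-set bounds for DepDelay (see the lead-in)
dep_min, dep_max = -63, 1620
print(min_max_scale(-2, dep_min, dep_max))  # about 0.0362448
```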

In [20]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml import Pipeline

#Stage 1. convert string values to indexes for categorical features
strIdx = StringIndexer(inputCol = "Carrier", outputCol = "CarrierIdx")

#Stage 2. combine categorical features into a single vector
catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")

#Stage 3. create indexes for a vector of categorical features
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

#Stage 4. create a vector of continuous numeric features
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")

#Stage 5. normalize continuous numeric features
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

#Stage 6. creates a vector of categorical and continuous features
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

#Stage 7. LogisticRegression classifier that trains a classification model
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)

# Now define the pipeline
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [72]:
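# Inspect the intermediate columns added by each stage. piplineModel is created by the
# training cell further below (In [21]), which was run before this cell (In [72]).
piplineModel.transform(train).filter("DayOfWeek == 7").show(2, vertical=True, truncate = 100)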

-RECORD 0--------------------------------------------------------------
 DayofMonth      | 1                                                   
 DayOfWeek       | 7                                                   
 Carrier         | 9E                                                  
 OriginAirportID | 10423                                               
 DestAirportID   | 11433                                               
 DepDelay        | -2                                                  
 label           | 0                                                   
 CarrierIdx      | 10.0                                                
 catFeatures     | [10.0,1.0,7.0,10423.0,11433.0]                      
 idxCatFeatures  | [10.0,1.0,6.0,10423.0,11433.0]                      
 numFeatures     | [-2.0]                                              
 normFeatures    | [0.036244800950683304]                              
 features        | [10.0,1.0,6.0,10423.0,11433.0,0.0362448009506
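The record above shows VectorIndexer at work: DayOfWeek 7 became 6.0, while DayofMonth and the airport IDs were passed through unchanged. VectorIndexer treats a feature as categorical when it has at most `maxCategories` distinct values (20 by default) and re-indexes those values from 0. Features with more distinct values stay continuous. The plain-Python sketch below imitates that rule under a simplifying assumption: distinct values are sorted and numbered in order, which matches the output above for these non-negative features. The helper names are hypothetical.

```python
def fit_category_maps(columns, max_categories=20):
    """For each column with at most max_categories distinct values, map sorted values to 0.0, 1.0, ..."""
    maps = {}
    for i, values in enumerate(columns):
        distinct = sorted(set(values))
        if len(distinct) <= max_categories:
            maps[i] = {v: float(idx) for idx, v in enumerate(distinct)}
    return maps


def index_row(row, maps):
    """Replace categorical values by their index; pass continuous values through as floats."""
    return [maps[i][v] if i in maps else float(v) for i, v in enumerate(row)]


day_of_week = list(range(1, 8))    # 7 distinct values -> categorical
day_of_month = list(range(1, 32))  # 31 distinct values -> stays continuous
category_maps = fit_category_maps([day_of_week, day_of_month])
print(index_row([7, 1], category_maps))  # [6.0, 1.0]
```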

### Train the Classification Model

Next, you need to train the classification model. 

The pipeline itself is an estimator, and so it has a fit method that you can call to run the pipeline on a specified DataFrame. 

In this case, we will run the pipeline on the training data to train a model.
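To see why a pipeline behaves like an estimator, here is a toy plain-Python imitation with hypothetical classes. `fit` walks through the stages in order, fits each stage that is an estimator on the data produced by the previous stages, and returns a model made only of transformers. This mirrors the idea behind `Pipeline.fit` and `PipelineModel.transform`, not their actual code.

```python
class ToyScaler:
    """Toy estimator: fit() learns min and max and returns a fitted ToyScalerModel."""

    def fit(self, data):
        return ToyScalerModel(min(data), max(data))


class ToyScalerModel:
    """Toy transformer produced by ToyScaler.fit()."""

    def __init__(self, lo, hi):
        self.lo, self.hi = lo, hi

    def transform(self, data):
        span = self.hi - self.lo
        return [(x - self.lo) / span for x in data]


class AddOne:
    """Toy transformer that needs no fitting."""

    def transform(self, data):
        return [x + 1 for x in data]


class ToyPipelineModel:
    """Applies fitted stages (all transformers) in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline:
    """Fits estimators in order, feeding each stage the output of the previous ones."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):  # estimator: fit it to get a transformer
                stage = stage.fit(data)
            data = stage.transform(data)  # pass transformed data to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


toy_model = ToyPipeline([AddOne(), ToyScaler()]).fit([0, 5, 10])
print(toy_model.transform([0, 5, 10, 20]))  # [0.0, 0.5, 1.0, 2.0]
```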

In [21]:
import timeit
start_time = timeit.default_timer()

piplineModel = pipeline.fit(train)

elapsed = timeit.default_timer() - start_time

print ("Model training complete in:", elapsed, "secs")

Model training complete in: 12.12018579999949 secs


#### Test the Pipeline Model

The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified DataFrame and apply the trained model to generate predictions. We can use this approach to predict delay status for flights where the label is unknown; but in this case we are using the test data which includes a known true label value, so you can compare the predicted status to the actual status.

Let's transform the **test** DataFrame using the pipeline to generate label predictions.

In [22]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(10, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,11433.0,12339.0,0.033273915626856804]|0.0       |0        |
|[10.0,1.0,0.0,12478.0,11278.0,0.035056446821152706]|0.0       |0        |
|[10.0,1.0,0.0,12478.0,14100.0,0.06833036244800951] |0.0       |1        |
|[10.0,1.0,0.0,13244.0,11193.0,0.035056446821152706]|0.0       |1        |
|[10.0,1.0,0.0,13487.0,11193.0,0.033273915626856804]|0.0       |0        |
|[10.0,1.0,0.0,13487.0,11193.0,0.03683897801544861] |0.0       |1        |
|[10.0,1.0,0.0,13487.0,14730.0,0.04278074866310161] |0.0       |0        |
|[10.0,1.0,0.0,14122.0,11433.0,0.0677361853832442]  |0.0       |1        |
|[2.0,1.0,0.0,10397.0,11298.0,0.035056446821152706] |0.0       |0        |
|[2.0,1.0,0.0,10821.0,11298.0,0.03149138443256091]  |0.0       |0        |
+------------------------

Looking at the result, the prediction column contains the predicted value for the label, and the trueLabel column contains the actual known value from the testing data. The first ten rows show a mix of correct and incorrect predictions: several late flights (trueLabel 1) are predicted as on time (0).

## 4. Evaluating Results and Fine-Tuning the Model

#### Evaluating the classifier: Compute Confusion Matrix Metrics

Classifiers are typically evaluated by creating a confusion matrix, which indicates the number of:

* True Positives
* True Negatives
* False Positives
* False Negatives

From these core measures, other evaluation metrics such as precision and recall can be calculated.
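Counting the four outcomes is straightforward once predictions and true labels are paired up. Here is a plain-Python sketch on hypothetical labels, using the same precision and recall formulas as `show_metrics` below:

```python
def confusion_counts(predictions, labels):
    """Count TP, FP, TN and FN for binary 0/1 predictions."""
    counts = {"TP": 0, "FP": 0, "TN": 0, "FN": 0}
    for pred, label in zip(predictions, labels):
        if pred == 1:
            counts["TP" if label == 1 else "FP"] += 1
        else:
            counts["FN" if label == 1 else "TN"] += 1
    return counts


example_preds = [0, 0, 1, 1, 0, 1, 0]
example_labels = [0, 1, 1, 0, 0, 1, 1]
example_counts = confusion_counts(example_preds, example_labels)
# Precision: share of predicted late flights that really were late
precision = example_counts["TP"] / (example_counts["TP"] + example_counts["FP"])
# Recall: share of actually late flights that the model caught
recall = example_counts["TP"] / (example_counts["TP"] + example_counts["FN"])
print(example_counts, precision, recall)
```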

In [23]:
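def show_metrics(tp, fp, tn, fn):
    """Print the confusion-matrix counts plus precision and recall."""
    print(f"TP = {tp}")
    print(f"FP = {fp}")
    print(f"TN = {tn}")
    print(f"FN = {fn}")
    # Guard against division by zero when there are no predicted or no actual positives
    print(f"Precision = {tp / (tp + fp) if tp + fp else float('nan')}")
    print(f"Recall = {tp / (tp + fn) if tp + fn else float('nan')}")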

In [24]:
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
show_metrics(tp, fp, tn, fn)

TP = 19439.0
FP = 77.0
TN = 648611.0
FN = 142672.0
Precision = 0.9960545193687231
Recall = 0.11991166546378716


#### View the Raw Prediction and Probability

For logistic regression, the raw prediction is the model's margin (the weighted sum of the features plus the intercept). For this binary model, the rawPrediction vector holds the margin with opposite signs for labels 0 and 1. The logistic (sigmoid) function converts the margin into a probability vector that indicates the confidence for each possible label value (in this case, 0 and 1). With the default threshold of 0.5, the label with the highest probability is selected as the prediction.
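The conversion can be checked by hand. This sketch assumes the binary rawPrediction vector has the form [-m, m] for a margin m, as the symmetric values in the output below suggest. Under that assumption, the probability of label 1 is the logistic function of m, and the label is 1 only when that probability exceeds the threshold. The example uses the first rawPrediction value shown below.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def to_probability(raw):
    """raw = [-m, m] for a binary logistic regression margin m."""
    p1 = sigmoid(raw[1])
    return [1.0 - p1, p1]


def to_prediction(probability, threshold=0.5):
    return 1.0 if probability[1] > threshold else 0.0


raw = [1.6608635911522716, -1.6608635911522716]
probability = to_probability(raw)
print(probability)                      # close to [0.8404, 0.1596]
print(to_prediction(probability))       # 0.0
print(to_prediction(probability, 0.1))  # 1.0
```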


In [25]:
prediction.select("rawPrediction", "probability", "prediction", "trueLabel")\
.show(10, truncate=False)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.6608635911522716,-1.6608635911522716]|[0.8403538958333158,0.15964610416668423]|0.0       |0        |
|[1.6271287650280646,-1.6271287650280646]|[0.8357759286253585,0.16422407137464146]|0.0       |0        |
|[0.846317950913954,-0.846317950913954]  |[0.6997941786898367,0.30020582131016327]|0.0       |1        |
|[1.6337358540744185,-1.6337358540744185]|[0.8366807708136917,0.16331922918630826]|0.0       |1        |
|[1.6778103740905967,-1.6778103740905967]|[0.8426143708076425,0.15738562919235755]|0.0       |0        |
|[1.5938985231552538,-1.5938985231552538]|[0.8311638912087153,0.16883610879128474]|0.0       |1        |
|[1.4570114813374075,-1.4570114813374075]|[0.8110751630

Note that the results can include rows where the probability for 0 (the first value in the probability vector) is only modestly higher than the probability for 1 (the second value). The third row is an example: 0.70 versus 0.30. The default discrimination threshold (the boundary that decides whether a probability is predicted as a 1 or a 0) is 0.5, so the label with the higher probability is always predicted, no matter how close the probabilities are.

#### Review the Area Under ROC

Another way to assess the performance of a classification model is to measure the area under the ROC curve (AUC). The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute this.

The ROC curve plots the true positive rate against the false positive rate as the discrimination threshold varies.
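The AUC equals the probability that a randomly chosen positive row gets a higher score than a randomly chosen negative row, with ties counting half. The sketch below computes AUC that way by comparing every pair. This is only practical for tiny hypothetical inputs; Spark derives the value from points on the ROC curve instead.

```python
def auc_pairwise(scores, labels):
    """Fraction of (positive, negative) pairs where the positive row scores higher; ties count 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


scores = [0.1, 0.4, 0.35, 0.8]  # hypothetical probabilities of label 1
labels = [0, 0, 1, 1]
print(auc_pairwise(scores, labels))  # 0.75
```

Only the ordering of the scores matters, so any threshold applied afterwards leaves this value unchanged.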

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("Area under the ROC curve = ", aur)

Area under the ROC curve =  0.9233582145600991


#### Change the Discrimination Threshold

The AUC score seems to indicate a reasonably good model, but the performance metrics seem to indicate that it predicts a high number of False Negative labels (i.e. it predicts 0 when the true label is 1), leading to a low Recall. 

You can affect the way a model performs by changing its parameters. For example, as noted previously, the default discrimination threshold is set to 0.5 - so if there are a lot of False Positives, you may want to consider raising this; or conversely, you may want to address a large number of False Negatives by lowering the threshold.
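The trade-off is visible on a handful of hypothetical probabilities of label 1. Lowering the threshold turns more rows into positive predictions, which raises recall and can lower precision. `metrics_at_threshold` is a hypothetical helper that predicts 1 when the probability exceeds the threshold.

```python
def metrics_at_threshold(probs, labels, threshold):
    """Return (precision, recall) when a row is predicted 1 if its probability exceeds threshold."""
    preds = [1 if p > threshold else 0 for p in probs]
    tp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 1)
    fp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 0)
    fn = sum(1 for p, y in zip(preds, labels) if p == 0 and y == 1)
    precision = tp / (tp + fp) if tp + fp else float("nan")
    recall = tp / (tp + fn) if tp + fn else float("nan")
    return precision, recall


example_probs = [0.16, 0.30, 0.40, 0.62, 0.33, 0.90]
example_labels = [0, 1, 1, 1, 0, 1]
for t in (0.5, 0.35, 0.25):
    print(t, metrics_at_threshold(example_probs, example_labels, t))
```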

In [27]:
# Lower the threshold to 0.35 and create a new LogisticRegression model
lr2 = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3, threshold=0.35)

#Set up new pipeline
pipeline2 = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr2])
model2 = pipeline2.fit(train)

#Make new predictions
newPrediction = model2.transform(test)
newPrediction.select("rawPrediction", "probability", "prediction", "trueLabel")\
.show(10, truncate=False)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.660863591152272,-1.660863591152272]  |[0.8403538958333158,0.15964610416668423]|0.0       |0        |
|[1.627128765028065,-1.627128765028065]  |[0.8357759286253587,0.16422407137464134]|0.0       |0        |
|[0.8463179509139545,-0.8463179509139545]|[0.6997941786898368,0.30020582131016316]|0.0       |1        |
|[1.633735854074419,-1.633735854074419]  |[0.8366807708136917,0.16331922918630826]|0.0       |1        |
|[1.6778103740905972,-1.6778103740905972]|[0.8426143708076426,0.15738562919235743]|0.0       |0        |
|[1.5938985231552543,-1.5938985231552543]|[0.8311638912087153,0.16883610879128474]|0.0       |1        |
|[1.4570114813374078,-1.4570114813374078]|[0.8110751630

In [28]:
# Recalculate confusion matrix, using the new predictions
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())

show_metrics(tp2, fp2, tn2, fn2)

TP = 42033.0
FP = 120.0
TN = 648568.0
FN = 120078.0
Precision = 0.9971532275282898
Recall = 0.259285304513574


Note that there are now more True Positives and fewer False Negatives, and Recall has improved (0.259 versus 0.120). By lowering the discrimination threshold, the model now gets more predictions correct, though the number of False Positives has also increased (from 77 to 120).

#### Tune Parameters using cross validation with grid search

You can tune parameters to find the best model for your data. To do this, we can use the **CrossValidator** class. It evaluates each combination of parameters in a parameter grid (built with **ParamGridBuilder**) against multiple folds of the data, each split into training and validation sets, to find the best-performing parameters.

* Note that this can take a long time to run because every parameter combination is tried multiple times.
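The cost grows quickly: every parameter combination is trained once per fold, and CrossValidator then refits the best combination on the full training set. The plain-Python sketch below builds the same three-combination grid as the next cell with `itertools.product` and counts the fits for 5 folds. The modulo-based fold assignment in `k_fold_indices` is a hypothetical simplification; by default, Spark assigns rows to folds randomly.

```python
from itertools import product

param_grid = {
    "regParam": [0.3],
    "maxIter": [10],
    "threshold": [0.25, 0.3, 0.35],
}
# Every combination of the listed values, like ParamGridBuilder().build()
combos = [dict(zip(param_grid, values)) for values in product(*param_grid.values())]


def k_fold_indices(n_rows, k):
    """Yield (train, validation) index lists; fold i validates on rows where index % k == i."""
    for i in range(k):
        validation = [r for r in range(n_rows) if r % k == i]
        train_part = [r for r in range(n_rows) if r % k != i]
        yield train_part, validation


num_folds = 5
total_fits = len(combos) * num_folds + 1  # +1 for the final refit of the best combination
print(len(combos), total_fits)  # 3 16
```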

In [29]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.3])\
.addGrid(lr.maxIter, [10])\
.addGrid(lr.threshold, [0.25, 0.3, 0.35])\
.build()

cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(),\
                    estimatorParamMaps=paramGrid, numFolds=5)

modelCV = cv.fit(train)

#### Test the Model

Now we're ready to apply the model to the test data.

In [30]:
predictionCV = modelCV.transform(test)
predictedCV = predictionCV.select("features", "prediction", "trueLabel")
predictedCV.show(10, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,11433.0,12339.0,0.033273915626856804]|0.0       |0        |
|[10.0,1.0,0.0,12478.0,11278.0,0.035056446821152706]|0.0       |0        |
|[10.0,1.0,0.0,12478.0,14100.0,0.06833036244800951] |1.0       |1        |
|[10.0,1.0,0.0,13244.0,11193.0,0.035056446821152706]|0.0       |1        |
|[10.0,1.0,0.0,13487.0,11193.0,0.033273915626856804]|0.0       |0        |
|[10.0,1.0,0.0,13487.0,11193.0,0.03683897801544861] |0.0       |1        |
|[10.0,1.0,0.0,13487.0,14730.0,0.04278074866310161] |0.0       |0        |
|[10.0,1.0,0.0,14122.0,11433.0,0.0677361853832442]  |1.0       |1        |
|[2.0,1.0,0.0,10397.0,11298.0,0.035056446821152706] |0.0       |0        |
|[2.0,1.0,0.0,10821.0,11298.0,0.03149138443256091]  |0.0       |0        |
+------------------------

In [31]:
# Recalculate confusion matrix, using the new predictions
tp3 = float(predictionCV.filter("prediction == 1.0 AND truelabel == 1").count())
fp3 = float(predictionCV.filter("prediction == 1.0 AND truelabel == 0").count())
tn3 = float(predictionCV.filter("prediction == 0.0 AND truelabel == 0").count())
fn3 = float(predictionCV.filter("prediction == 0.0 AND truelabel == 1").count())

show_metrics(tp3, fp3, tn3, fn3)

TP = 86614.0
FP = 1472.0
TN = 647216.0
FN = 75497.0
Precision = 0.9832890584201803
Recall = 0.5342882346046844


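Note that recall has improved further (0.534, compared with 0.259 for the model with threshold 0.35), while precision has dropped slightly (0.983 versus 0.997).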

In [32]:
bestPipeline = modelCV.bestModel
bestLRModel = bestPipeline.stages[6]
bestParams = bestLRModel.extractParamMap()

In [33]:
# Print every parameter of the best LogisticRegression stage
for k,v in bestParams.items():
    print("Key: ", k, " ---> Value = ", v)

Key:  LogisticRegression_2fe46111e484__aggregationDepth  ---> Value =  2
Key:  LogisticRegression_2fe46111e484__elasticNetParam  ---> Value =  0.0
Key:  LogisticRegression_2fe46111e484__family  ---> Value =  auto
Key:  LogisticRegression_2fe46111e484__featuresCol  ---> Value =  features
Key:  LogisticRegression_2fe46111e484__fitIntercept  ---> Value =  True
Key:  LogisticRegression_2fe46111e484__labelCol  ---> Value =  label
Key:  LogisticRegression_2fe46111e484__maxBlockSizeInMB  ---> Value =  0.0
Key:  LogisticRegression_2fe46111e484__maxIter  ---> Value =  10
Key:  LogisticRegression_2fe46111e484__predictionCol  ---> Value =  prediction
Key:  LogisticRegression_2fe46111e484__probabilityCol  ---> Value =  probability
Key:  LogisticRegression_2fe46111e484__rawPredictionCol  ---> Value =  rawPrediction
Key:  LogisticRegression_2fe46111e484__regParam  ---> Value =  0.3
Key:  LogisticRegression_2fe46111e484__standardization  ---> Value =  True
Key:  LogisticRegression_2fe46111e484__thres

The improvement in recall comes from the lower threshold selected by cross validation (0.35 in the previous model versus 0.30 in the tuned model).

In [34]:
eval2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur2 = eval2.evaluate(predictionCV)
print ("Area under the ROC curve = ", aur2)

Area under the ROC curve =  0.9233577619046763


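There is no significant change in the area under the ROC curve. This is expected: the threshold only affects how probabilities are turned into 0/1 predictions, not the raw scores from which the ROC curve is computed. For the same reason, the areaUnderROC metric used by the CrossValidator above scores all three threshold values identically, so on its own it cannot tell them apart.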