# Fraudulent Transaction Classification

## Importing Libraries and Global Variable Definition

In [0]:
# Importing libraries
import numpy as np
import os
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

import pyspark
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf
import pyspark.pandas as ps

from typing import Dict, List, Optional, Union, \
                   Tuple, Generator, Callable, Any
import gc
import json
import pandas as pd

In [0]:
# Defining global variables
# Root DBFS folder; every dataset path below is derived from it.
DATA_PATH          = "dbfs:/big_data_project/dataset/"
# Raw CSV files (identity / transaction tables for the test and train splits).
TEST_IDENTITY      = os.path.join(DATA_PATH, "test_identity.csv")
TEST_TRANSACTION   = os.path.join(DATA_PATH, "test_transaction.csv")
TRAIN_IDENTITY     = os.path.join(DATA_PATH, "train_identity.csv")
TRAIN_TRANSACTION  = os.path.join(DATA_PATH, "train_transaction.csv")
# Presumably the feature-engineered ("fe") versions of the two splits — TODO confirm.
FE_TRAIN_DATA      = os.path.join(DATA_PATH, "fe_train.csv")
FE_TEST_DATA       = os.path.join(DATA_PATH, "fe_test.csv")

# Define paths for saving models
# Root DBFS folder for models; one sub-folder per model family below.
MODEL_PATH       = "dbfs:/big_data_project/models/"
NUM_IMPUTER_PATH = os.path.join(MODEL_PATH, "num_imputer/")
LR_CV_PATH       = os.path.join(MODEL_PATH, "logistic_regression/")
DT_CV_PATH       = os.path.join(MODEL_PATH, "decision_tree/")
RF_CV_PATH       = os.path.join(MODEL_PATH, "random_forest/")
GBT_CV_PATH      = os.path.join(MODEL_PATH, "gradient_boosted_tree/")

# One boolean flag per model artefact. Keys follow the pattern
# "<model>_cv_<std|nostd>[_over]_<feature set>" for the lr / dt / rf / gbt
# families, plus a single "num_imputer" entry.
# NOTE(review): presumably True means "a saved copy exists, load it instead of
# re-training" — the consumer of these flags is not visible here; confirm.
MODELS_JSON = {
    "num_imputer" : True,
    "lr_cv_nostd_only_numerical"         : True,
    "lr_cv_std_only_numerical"           : True,
    "lr_cv_nostd_all_features"           : True,
    "lr_cv_nostd_only_categorical"       : True,
    "lr_cv_nostd_over_only_numerical"    : True,
    "lr_cv_std_over_only_numerical"      : True,
    "lr_cv_nostd_over_all_features"      : True,
    "lr_cv_nostd_over_only_categorical"  : True,
    "dt_cv_nostd_only_numerical"         : True,
    "dt_cv_std_only_numerical"           : True,
    "dt_cv_nostd_all_features"           : True,
    "dt_cv_nostd_only_categorical"       : True,
    "dt_cv_nostd_over_only_numerical"    : True,
    "dt_cv_std_over_only_numerical"      : True,
    "dt_cv_nostd_over_all_features"      : True,
    "dt_cv_nostd_over_only_categorical"  : True,
    "rf_cv_nostd_only_numerical"         : True,
    "rf_cv_std_only_numerical"           : True,
    "rf_cv_nostd_all_features"           : True,
    "rf_cv_nostd_only_categorical"       : True,
    "rf_cv_nostd_over_only_numerical"    : True,
    "rf_cv_std_over_only_numerical"      : True,
    "rf_cv_nostd_over_all_features"      : True,
    "rf_cv_nostd_over_only_categorical"  : True,
    "gbt_cv_nostd_only_numerical"        : True,
    "gbt_cv_std_only_numerical"          : True,
    "gbt_cv_nostd_all_features"          : True,
    "gbt_cv_nostd_only_categorical"      : True,
    "gbt_cv_nostd_over_only_numerical"   : True,
    "gbt_cv_std_over_only_numerical"     : True,
    "gbt_cv_nostd_over_all_features"     : True,
    "gbt_cv_nostd_over_only_categorical" : True
}

RAND_SEED = 42  # For reproducibility

# Execution switches read by later cells.
SLOW_OPERATIONS      = False  # When True, the optional train_df.describe() cell runs
DOWNLOAD_DATA        = False  # When True, the raw CSVs are fetched with wget and moved to DBFS
# Consulted by the raw-data loading cell to decide whether the raw CSVs are read.
# Presumably True means the matching feature-engineered CSV is already saved — TODO confirm.
SAVED_FE_TRAIN       = True
SAVED_FE_TEST        = True

In [0]:
# Download datasets
if DOWNLOAD_DATA:
    !wget -P /tmp https://cloud-storage.eu-central-1.linodeobjects.com/test_identity.csv
    !wget -P /tmp https://cloud-storage.eu-central-1.linodeobjects.com/test_transaction.csv
    !wget -P /tmp https://cloud-storage.eu-central-1.linodeobjects.com/train_identity.csv
    !wget -P /tmp https://cloud-storage.eu-central-1.linodeobjects.com/train_transaction.csv
    
    dbutils.fs.mv(f"file:/tmp/test_identity.csv", "dbfs:/big_data_project/dataset/test_identity.csv")
    dbutils.fs.mv(f"file:/tmp/test_transaction.csv", "dbfs:/big_data_project/dataset/test_transaction.csv")
    dbutils.fs.mv(f"file:/tmp/train_identity.csv", "dbfs:/big_data_project/dataset/train_identity.csv")
    dbutils.fs.mv(f"file:/tmp/train_transaction.csv", "dbfs:/big_data_project/dataset/train_transaction.csv")

## Data Acquisition and Feature engineering

In [0]:
# Just a util time
from functools import wraps
import time

def timeamt(f):
    """Decorator that prints how long each call to ``f`` takes.

    The wrapped function is called with the original arguments and its return
    value is passed through unchanged. After the call returns, a line of the
    form ``Elapsed time: 1.234s`` is printed. If ``f`` raises, the exception
    propagates and nothing is printed.

    Parameters
    ----------
    f : callable
        The function to time.

    Returns
    -------
    callable
        A wrapper carrying the metadata of ``f`` (via ``functools.wraps``).
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        # perf_counter() is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() can jump when the system clock is
        # adjusted and would then report a wrong (even negative) duration.
        start = time.perf_counter()
        result = f(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print("Elapsed time: {:.3f}s".format(elapsed))

        return result

    return wrap

def cols2numpy(df: pyspark.sql.dataframe.DataFrame, col: str) -> np.ndarray:
    """Return the values of column ``col`` as a flat (1-D) NumPy array.

    A PySpark DataFrame is first narrowed to ``col`` and converted to
    pandas-on-Spark; any other object is indexed directly with ``df[col]``.
    """
    if isinstance(df, pyspark.sql.dataframe.DataFrame):
        column = df.select(col).to_pandas_on_spark()
    else:
        column = df[col]

    flat_values = column.to_numpy().reshape(-1)
    return flat_values

@timeamt
def count_null_wperc(df: pyspark.sql.dataframe.DataFrame) -> ps.DataFrame:
    """Count the null values of every column, as absolute number and percentage.

    Parameters
    ----------
    df : pyspark.sql.dataframe.DataFrame
        The dataframe to inspect.

    Returns
    -------
    ps.DataFrame
        A two-row frame with one column per column of ``df``: the first row
        holds the null counts, the second the null percentages (0-100).
    """
    # Convert once and reuse the same pandas-on-Spark view for both the null
    # counts and the row count, instead of converting the dataframe twice.
    psdf = df.to_pandas_on_spark()

    ncount = psdf.isnull().sum()
    perc   = 100 * ncount / len(psdf)

    return ps.concat(
        [
         ncount.to_dataframe().transpose(),
         perc.to_dataframe().transpose()
         ]
    )

@timeamt
def spark_load_dataset(dataset: str) -> pyspark.sql.dataframe.DataFrame:
    """Read a comma-separated CSV file with a header row into a Spark DataFrame.

    The path is printed before loading and the column types are inferred by
    Spark (``inferSchema``).
    """
    print(f"Loading: {dataset}")

    csv_options = {"sep": ",", "inferSchema": "true", "header": "true"}
    reader = spark.read.format("csv").options(**csv_options)
    return reader.load(dataset)

### Load the dataset in PySpark

In [0]:
# Here we load each raw dataset separately; they are merged in a later cell.
# The raw CSVs can be skipped only when BOTH feature-engineered datasets are
# already saved: if either one is missing it has to be rebuilt from raw data.
# (The previous `not (A or B)` test skipped loading as soon as one was saved.)
if not (SAVED_FE_TRAIN and SAVED_FE_TEST):
    test_identity_df     = spark_load_dataset(TEST_IDENTITY)
    test_transact_df     = spark_load_dataset(TEST_TRANSACTION)
    train_identity_df    = spark_load_dataset(TRAIN_IDENTITY)
    train_transaction_df = spark_load_dataset(TRAIN_TRANSACTION)

Loading: dbfs:/big_data_project/dataset/test_identity.csv
Elapsed time: 15.582s
Loading: dbfs:/big_data_project/dataset/test_transaction.csv
Elapsed time: 52.027s
Loading: dbfs:/big_data_project/dataset/train_identity.csv
Elapsed time: 7.005s
Loading: dbfs:/big_data_project/dataset/train_transaction.csv
Elapsed time: 49.909s


---

### Exploring the Dataset

**Utility Functions**

In [0]:
@timeamt
def pie_plot(df: pyspark.sql.dataframe.DataFrame, col: str):
    """Show a pie chart of how often each value of ``col`` occurs in ``df``."""
    value_counts = df.groupBy([col]).count().to_pandas_on_spark()

    # Keep the grouped counts persisted in memory only while the two columns
    # are extracted and plotted.
    with value_counts.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as col_counts:
        slice_sizes = cols2numpy(col_counts, "count")
        slice_labels = cols2numpy(col_counts, col)

        # Plot
        pie = go.Pie(labels=slice_labels, values=slice_sizes)
        fig = go.Figure(data=[pie])
        fig.update_layout(width=800, height=400)
        fig.show()


@timeamt
def bar_plot(x_val : List[List[int]],
             y_val : List[List[int]],
             titles: List[str],
             rows  : int = 1,
             cols  : int = 1
) -> None:
    """Draw one bar chart per ``(x, y)`` pair on a ``rows`` x ``cols`` grid.

    The i-th pair of ``x_val`` / ``y_val`` is titled and named ``titles[i]``.
    Subplots are filled left to right, then top to bottom, and each bar is
    annotated with its y value.
    """
    fig = make_subplots(rows=rows, cols=cols, subplot_titles=titles)

    for idx, (xs, ys) in enumerate(zip(x_val, y_val)):
        # Zero-based position in the grid; plotly expects 1-based indices.
        grid_row, grid_col = divmod(idx, cols)
        bars = go.Bar(x=xs, y=ys, text=ys, textposition='auto', name=titles[idx])
        fig.add_trace(bars, row=grid_row + 1, col=grid_col + 1)

    fig.show()


@timeamt
def groupby_and_agg_values4bar_plot(df       : pyspark.sql.dataframe.DataFrame,
                                    grpby_col: str,
                                    agg_col  : str,
                                    agg_type : str = "sum"
) -> Tuple[str, np.ndarray, np.ndarray]:
    """Group ``df`` by one column and aggregate another one.

    Parameters
    ----------
    df : pyspark.sql.dataframe.DataFrame
        Input dataframe.
    grpby_col : str
        Column to group by.
    agg_col : str
        Column to aggregate inside every group.
    agg_type : str
        Aggregation to apply: one of "sum" (default), "avg", "mean", "min",
        "max" or "count".

    Returns
    -------
    Tuple[str, np.ndarray, np.ndarray]
        ``(title, values, index)``: the title ``"<grpby_col> x <agg_col>"``,
        the aggregated values and the matching group labels, in this order.

    Raises
    ------
    ValueError
        If ``agg_type`` is not one of the supported aggregations.
    """
    aggregations = {
        "sum": F.sum, "avg": F.avg, "mean": F.mean,
        "min": F.min, "max": F.max, "count": F.count,
    }
    if agg_type not in aggregations:
        raise ValueError(
            f"Unsupported agg_type {agg_type!r}; expected one of {sorted(aggregations)}"
        )

    # Alias the aggregate explicitly so the lookup below does not depend on
    # the column name Spark generates for each aggregation function.
    agg_name = f"{agg_type}({agg_col})"
    result = df.groupBy(grpby_col).agg(aggregations[agg_type](F.col(agg_col)).alias(agg_name))
    title  = f"{grpby_col} x {agg_col}"

    # Persist the small grouped result while both columns are pulled out of it.
    with result.to_pandas_on_spark().spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as grouped_agg:
        values = grouped_agg[agg_name].to_numpy()
        index  = grouped_agg[grpby_col].to_numpy()

        return title, values, index

**Merging dataset**

In [0]:
# Merge the transaction and identity tables of each split.
# According to the dataset description the two tables share the TransactionID
# column; a left join keeps every transaction row.
JOIN_KEY = "TransactionID"

print("Merging TEST DATASET ...")
test_df = test_transact_df.join(
    test_identity_df,
    on=JOIN_KEY,
    how="left",
)

print("Merging TRAIN DATASET ...")
train_df = train_transaction_df.join(
    train_identity_df,
    on=JOIN_KEY,
    how="left",
)

Merging TEST DATASET ...
Merging TRAIN DATASET ...


Print information about the *train dataset*

In [0]:
# Report the (rows, columns) shape of the two raw train tables.
print(f"Size of train_identity: {train_identity_df.to_pandas_on_spark().shape}")
print(f"Size of train_transaction: {train_transaction_df.to_pandas_on_spark().shape}")

Size of train_identity: (144233, 41)
Size of train_transaction: (590540, 394)


The train set is very large. The "identity" train file has 144,233 observations and 41 features, and the "transaction" train file has 590,540 observations and 394 features.

In [0]:
# The four raw tables are no longer needed once merged: drop the names and
# ask the garbage collector to reclaim what it can.
del test_identity_df, test_transact_df, train_identity_df, train_transaction_df

gc.collect()

Out[6]: 251

**Description of Transaction table features**

<ins>*Categorical Features*</ins> - Transaction

- `ProductCD`: product code, the product for each transaction
- `card1 - card6`: payment card information, such as card type, card category, issue bank, country, etc.
- `addr1,addr2`: billing region and billing country addresses
- `P_emaildomain`: purchaser email domain
- `R_emaildomain`: recipient email domain
- `M1-M9`: match, such as name card and addresses, etc.
- `isFraud`: 0 if it is okay, 1 otherwise

<ins>*Numerical Features*</ins> - Transaction

- `TransactionDT`: timedelta from a given reference datetime (not an actual timestamp)
- `TransactionAMT`: transaction payment amount in USD
- `C1-C14`: counting, such as how many addresses are found to be associated with the payment card, etc.
- `D1-D15`: timedelta, such as days between previous transactions, etc.
- `Vxxx`: Vesta engineered rich features, including ranking, counting, and other entity relations

**Description of Identity table features**

Variables in this table are identity information - network connection information (IP, ISP, Proxy, etc.) and digital signature (UA/browser/OS/version, etc.) associated with transactions. `id01` to `id11` are numerical features for identity, which are collected by Vesta and security partners, such as device rating, ip_domain rating, proxy rating, etc. These cannot be described in more detail for security reasons. `DeviceType` is the type of the device used to pay (`nan`, `mobile`, `desktop`), while `DeviceInfo` describes the specific device used, such as SAMSUNG, HUAWEILDN, LG, etc.

In [0]:
train_df.show()  # Take a closer look at the merged train dataset

+-------------+-------+-------------+--------------+---------+-----+-----+-----+----------+-----+------+-----+-----+-----+-----+--------------+-------------+-----+-----+---+---+-----+----+---+---+-----+---+----+---+-----+-----+-----+-----+-----+-----+-----+----+----+-----------------+------------------+-----+-----+----+----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+

In [0]:
# Number of rows and columns of the merged train dataset
rows, columns = train_df.to_pandas_on_spark().shape
print(f"Number of rows: {rows} x Number of columns: {columns}")

Number of rows: 590540 x Number of columns: 434


The new merged train dataset has 590,540 rows and 434 features

**Label Distribution**

Our target feature is "isFraud". It takes values in {0, 1}, with 0 indicating a valid transaction and 1 indicating a fraudulent transaction.

In [0]:
# Pie chart of the isFraud label distribution
pie_plot(train_df, 'isFraud')

Elapsed time: 35.458s


The pie chart shows that our label column "isFraud" is *heavily unbalanced*, with 569,877 zeros and 20,663 ones.

**Categorical features Pie Plot**

In [0]:
# Categorical features addressable by position: cat_cols[i] is the name of the
# i-th feature. The annotation is Dict[int, str] (integer index -> column
# name); it was previously declared the other way round.
cat_cols: Dict[int, str] = dict(enumerate([
    'ProductCD', 'card4', 'card6',
    'P_emaildomain', 'R_emaildomain', 'M1',
    'M2', 'M3', 'M4',
    'M5', 'M6', 'M7',
    'M8', 'M9', 'id_12',
    'id_15', 'id_16', 'id_23',
    'id_27', 'id_28', 'id_29',
    'id_30', 'id_31', 'id_33',
    'id_34', 'id_35', 'id_36',
    'id_37', 'id_38', 'DeviceType',
    'DeviceInfo',
]))

In [0]:
# Key into cat_cols (0-30) selecting which categorical feature is plotted.
feature: int = 0  # Change this to see other features
pie_plot(train_df, cat_cols[feature])

Elapsed time: 26.029s


**Relation features and isFraud**

In this section we will see the correlation of the features `card4`, `card6`, `ProductCD` and `DeviceType` with the label `isFraud`

In [0]:
# Only fraudulent rows are kept, so summing isFraud inside each group yields
# the number of frauds for that group.
fraud_df = train_df.filter(F.col("isFraud") == 1)

# The helper returns (title, aggregated values, group labels): the *_xval
# names therefore hold the aggregated values and the *_yval names the labels.
card4_title, card4_xval, card4_yval = groupby_and_agg_values4bar_plot(fraud_df, "card4", "isFraud")
card6_title, card6_xval, card6_yval = groupby_and_agg_values4bar_plot(fraud_df, "card6", "isFraud")
prod_title,  prod_xval,  prod_yval  = groupby_and_agg_values4bar_plot(fraud_df, "ProductCD", "isFraud")
devt_title,  devt_xval,  devt_yval  = groupby_and_agg_values4bar_plot(fraud_df, "DeviceType", "isFraud")

titles = [card4_title, card6_title, prod_title, devt_title]
# Group labels go on the x axis and aggregated values on the y axis, hence
# the apparent swap between the *_xval / *_yval names.
y_vals = [card4_xval, card6_xval, prod_xval, devt_xval]
x_vals = [card4_yval, card6_yval, prod_yval, devt_yval]

# One row with the four charts side by side.
rows, cols = 1, 4

bar_plot(x_val=x_vals, y_val=y_vals, titles=titles, rows=rows, cols=cols)

Elapsed time: 27.077s
Elapsed time: 25.794s
Elapsed time: 30.260s
Elapsed time: 32.046s


Elapsed time: 0.177s


In [0]:
# Drop the per-feature plotting intermediates.
# NOTE(review): card4_title / card4_xval / card4_yval are not deleted here, and
# the titles / x_vals / y_vals lists still reference all of these objects —
# confirm whether that is intended.
del card6_title, card6_xval, card6_yval
del prod_title,  prod_xval,  prod_yval
del devt_title,  devt_xval,  devt_yval

**Describing data: mean, count, std, max ...**

In [0]:
# describe() only returns a summary DataFrame, and an expression inside an
# `if` block is not echoed by the notebook: print the summary explicitly,
# otherwise the statistics are computed and then thrown away.
if SLOW_OPERATIONS:
    train_df.describe().show()

**Missing Values**

From the small sample of data printed previously, we can see that there are a lot of missing values.

In [0]:
# Compute, for each column, the number and the percentage of null values
# (two-row frame: counts first, percentages second).
ncount = count_null_wperc(train_df)

Elapsed time: 179.625s


In [0]:
# Display the null summary: first row = null counts, second row = null percentages.
ncount

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,ProductCD,card1,card2,card3,card4,card5,card6,addr1,addr2,dist1,dist2,P_emaildomain,R_emaildomain,C1,C2,C3,C4,C5,C6,C7,C8,C9,C10,C11,C12,C13,C14,D1,D2,D3,D4,D5,D6,D7,D8,D9,D10,D11,D12,D13,D14,D15,M1,M2,M3,M4,M5,M6,M7,M8,M9,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,V29,V30,V31,V32,V33,V34,V35,V36,V37,V38,V39,V40,V41,V42,V43,V44,V45,V46,V47,V48,V49,V50,V51,V52,V53,V54,V55,V56,V57,V58,V59,V60,V61,V62,V63,V64,V65,V66,V67,V68,V69,V70,V71,V72,V73,V74,V75,V76,V77,V78,V79,V80,V81,V82,V83,V84,V85,V86,V87,V88,V89,V90,V91,V92,V93,V94,V95,V96,V97,V98,V99,V100,V101,V102,V103,V104,V105,V106,V107,V108,V109,V110,V111,V112,V113,V114,V115,V116,V117,V118,V119,V120,V121,V122,V123,V124,V125,V126,V127,V128,V129,V130,V131,V132,V133,V134,V135,V136,V137,V138,V139,V140,V141,V142,V143,V144,V145,V146,V147,V148,V149,V150,V151,V152,V153,V154,V155,V156,V157,V158,V159,V160,V161,V162,V163,V164,V165,V166,V167,V168,V169,V170,V171,V172,V173,V174,V175,V176,V177,V178,V179,V180,V181,V182,V183,V184,V185,V186,V187,V188,V189,V190,V191,V192,V193,V194,V195,V196,V197,V198,V199,V200,V201,V202,V203,V204,V205,V206,V207,V208,V209,V210,V211,V212,V213,V214,V215,V216,V217,V218,V219,V220,V221,V222,V223,V224,V225,V226,V227,V228,V229,V230,V231,V232,V233,V234,V235,V236,V237,V238,V239,V240,V241,V242,V243,V244,V245,V246,V247,V248,V249,V250,V251,V252,V253,V254,V255,V256,V257,V258,V259,V260,V261,V262,V263,V264,V265,V266,V267,V268,V269,V270,V271,V272,V273,V274,V275,V276,V277,V278,V279,V280,V281,V282,V283,V284,V285,V286,V287,V288,V289,V290,V291,V292,V293,V294,V295,V296,V297,V298,V299,V300,V301,V302,V303,V304,V305,V306,V307,V308,V309,V310,V311,V312,V313,V314,V315,V316,V317,V318,V319,V320,V321,V322,V323,V324,V325,V326,V327,V328,V329,V330,V331,V332,V333,V334,V335,V336,V337,V338,V339,id_01,id_02,id_03,id_04,id_05,id_06,id_07,id_08,id_09,id_10,id_11,id_12,id_13,id_14,id_15,id_16,id_17,id_18,id_19,id_20,id_21,id
_22,id_23,id_24,id_25,id_26,id_27,id_28,id_29,id_30,id_31,id_32,id_33,id_34,id_35,id_36,id_37,id_38,DeviceType,DeviceInfo
,0.0,0.0,0.0,0.0,0.0,0.0,8933.0,1565.0,1577.0,4259.0,1571.0,65706.0,65706.0,352271.0,552913.0,94456.0,453249.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1269.0,280797.0,262878.0,168922.0,309841.0,517353.0,551623.0,515614.0,515614.0,76022.0,279287.0,525823.0,528588.0,528353.0,89113.0,271100.0,271100.0,271100.0,281444.0,350482.0,169360.0,346265.0,346252.0,346252.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,279287.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,76073.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,168969.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,77096.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,89164.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,314.0,508595.0,508595.0,508595.0,508595.0,508595.0,508589.0,508589.0,508589.0,508595.0,508595.0,508595.0,508595.0,508589.0,508589.0,508589.0,508595.0,508595.0,508595.0,508595.0,508595.0,508595.0,508589.0,508589.0,508595.0,508595.0,508595.0,508589.0,508589.0,508589.0,450909.0,450909.0,450721.0,450721.0,450721.0,450909.0,450909.0,450721.0,450721.0,450909.0,450909.0,450909.0,450909.0,450721.0,450909.0,450909.0,450909.0,450721.0,450721.0,450909.0,450909.0,450721.0,450721.0,450909.0,450909.0,450909.0,450909.0,450721.0,450721.0,450909.0,450721.0,450721.0,450909.0,450721.0,450721.0,450909
.0,450909.0,450909.0,450909.0,450909.0,450909.0,450721.0,450721.0,450721.0,450909.0,450909.0,450909.0,450909.0,450909.0,450909.0,460110.0,460110.0,460110.0,449124.0,449124.0,449124.0,460110.0,460110.0,460110.0,460110.0,449124.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,449124.0,460110.0,460110.0,460110.0,449124.0,449124.0,460110.0,460110.0,460110.0,460110.0,460110.0,449124.0,460110.0,460110.0,460110.0,460110.0,449124.0,449124.0,460110.0,460110.0,460110.0,449124.0,449124.0,460110.0,460110.0,449124.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,449124.0,449124.0,449124.0,460110.0,460110.0,460110.0,460110.0,460110.0,460110.0,12.0,12.0,1269.0,1269.0,1269.0,12.0,12.0,12.0,12.0,1269.0,1269.0,12.0,12.0,12.0,12.0,12.0,12.0,1269.0,12.0,12.0,12.0,1269.0,1269.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,1269.0,1269.0,1269.0,12.0,12.0,12.0,12.0,12.0,12.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,508189.0,446307.0,449668.0,524216.0,524216.0,453675.0,453675.0,585385.0,585385.0,515614.0,515614.0,449562.0,446307.0,463220.0,510496.0,449555.0,461200.0,451171.0,545427.0,451222.0,451279.0,585381.0,585371.0,585371.0,585793.0,585408.0,585377.0,585371.0,449562.0,449562.0,512975.0,450258.0,512954.0,517251.0,512735.0,449555.0,449555.0,449555.0,449555.0,449730.0,471874.0
,0.0,0.0,0.0,0.0,0.0,0.0,1.512683,0.265012,0.267044,0.721204,0.266028,11.126427,11.126427,59.652352,93.628374,15.994852,76.751617,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.214888,47.549192,44.514851,28.604667,52.467403,87.606767,93.40993,87.31229,87.31229,12.873302,47.293494,89.041047,89.509263,89.469469,15.090087,45.907136,45.907136,45.907136,47.658753,59.349409,28.678836,58.635317,58.633115,58.633115,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,47.293494,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,12.881939,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,28.612626,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,13.05517,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,15.098723,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,0.053172,86.123717,86.123717,86.123717,86.123717,86.123717,86.122701,86.122701,86.122701,86.123717,86.123717,86.123717,86.123717,86.122701,86.122701,86.122701,86.123717,86.123717,86.123717,86.123717,86.123717,86.123717,86.122701,86.122701,86.123717,86.123717,86.123717,86.122701,86.
122701,86.122701,76.35537,76.35537,76.323534,76.323534,76.323534,76.35537,76.35537,76.323534,76.323534,76.35537,76.35537,76.35537,76.35537,76.323534,76.35537,76.35537,76.35537,76.323534,76.323534,76.35537,76.35537,76.323534,76.323534,76.35537,76.35537,76.35537,76.35537,76.323534,76.323534,76.35537,76.323534,76.323534,76.35537,76.323534,76.323534,76.35537,76.35537,76.35537,76.35537,76.35537,76.35537,76.323534,76.323534,76.323534,76.35537,76.35537,76.35537,76.35537,76.35537,76.35537,77.913435,77.913435,77.913435,76.053104,76.053104,76.053104,77.913435,77.913435,77.913435,77.913435,76.053104,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,76.053104,77.913435,77.913435,77.913435,76.053104,76.053104,77.913435,77.913435,77.913435,77.913435,77.913435,76.053104,77.913435,77.913435,77.913435,77.913435,76.053104,76.053104,77.913435,77.913435,77.913435,76.053104,76.053104,77.913435,77.913435,76.053104,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,76.053104,76.053104,76.053104,77.913435,77.913435,77.913435,77.913435,77.913435,77.913435,0.002032,0.002032,0.214888,0.214888,0.214888,0.002032,0.002032,0.002032,0.002032,0.214888,0.214888,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.214888,0.002032,0.002032,0.002032,0.214888,0.214888,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,0.214888,0.214888,0.214888,0.002032,0.002032,0.002032,0.002032,0.002032,0.002032,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,86.054967,75.576083,76.145223,88.768923,88.768923,76.823755,76.823755,99.12707,99.12707,87.31229,87.31229,76.127273,75.576083,78.440072,86.445626,76.126088,78.098012,76.399736,92.360721,76.408372,76.418024,99.126393,99.124699,99.124699,99.196159,99.130965,99.125715,99.124699,76.127273,76.127273,86.865411,76.245132,86.861855,87.589494,8
6.824771,76.126088,76.126088,76.126088,76.126088,76.155722,79.90551


In [0]:
# Plot the second row of ncount (the null percentages) against the column names.
bar_plot(x_val=[ncount.columns], y_val=[list(ncount.to_numpy()[1])], titles=["% of Null value"])

Elapsed time: 0.043s


---

### Features Engineering

As we can see from the previous plot, some features have a lot of missing values, for this reason we have to manage them in some way, otherwise we could have some problems during training. To handle missing values I would like to setup a kind of Pipeline in order to drop unnecessary features. This is the complete Pipeline

1. Drop columns with null percentage greater than or equal to 80%
2. Standardize some categorical features

**(1)** First of all, I would like to drop all the features whose percentage of missing values is greater than or equal to 90%. Such a high number of missing values means that a particular feature does not give any kind of information that can help the predictions of the model. **(2)** The last thing that I would like to do is to *standardize* some features. What do I mean by standardizing? I mean giving a more general name to values of the "same class". For instance, let's take the feature `P_emaildomain`: we can see that there are values like `yahoo.co.jp`, `yahoo.co.uk` or `yahoo.net`; the final value for all of these would be just `yahoo`. In this way, when we apply One-Hot-Encoding to these categorical features, we will have fewer and more general values to handle. This standardization is applied to the following features: `P_emaildomain`, `R_emaildomain`, `id_30`, `DeviceInfo` and `id_31`. A better understanding of what this standardization does can be gained from the following code.

---

*EMAIL STANDARDIZATION*

In [0]:
# Lookup table from a raw e-mail domain to a coarser provider label.
# Several domains collapse onto the same label (e.g. the hotmail.*, live.* and
# outlook.* domains all map to "outlook"). How the table is applied to the
# data is not shown in this cell.
# NOTE(review): "ptd.net" maps to "pdt" (letters swapped?) and "web.de" keeps
# its full domain as label, unlike the other entries — confirm both are
# intended before changing them, since saved artefacts may rely on these labels.
EMAILS: Dict[str, str] = {
    "netzero.net"      : "netzero",    "yahoo.co.jp"     : "yahoo",
    "prodigy.net.mx"   : "prodigy",    "windstream.net"  : "windstream",
    "outlook.es"       : "outlook",    "embarqmail.com"  : "centurylink",
    "charter.net"      : "charter",    "gmx.de"          : "gmx",
    "mail.com"         : "mail",       "centurylink.net" : "centurylink",
    "cableone.net"     : "cableone",   "hotmail.fr"      : "outlook",
    "sbcglobal.net"    : "yahoo",      "frontier.com"    : "frontier",
    "anonymous.com"    : "anonymous",  "yahoo.fr"        : "yahoo",
    "outlook.com"      : "outlook",    "live.com.mx"     : "outlook",
    "ymail.com"        : "yahoo",      "frontiernet.net" : "frontiernet",
    "cfl.rr.com"       : "spectrum",   "live.fr"         : "outlook",
    "hotmail.com"      : "outlook",    "cox.net"         : "cox",
    "hotmail.es"       : "outlook",    "aol.com"         : "aol",
    "msn.com"          : "microsoft",  "suddenlink.net"  : "suddenlink",
    "gmail.com"        : "google",     "protonmail.com"  : "proton",
    "roadrunner.com"   : "roadrunner", "web.de"          : "web.de",
    "gmail"            : "google",     "netzero.com"     : "netzero",
    "live.com"         : "outlook",    "icloud.com"      : "apple",
    "comcast.net"      : "comcast",    "hotmail.co.uk"   : "outlook",
    "yahoo.co.uk"      : "yahoo",      "att.net"         : "yahoo",
    "optonline.net"    : "optimum",    "sc.rr.com"       : "spectrum",
    "yahoo.com"        : "yahoo",      "verizon.net"     : "verizon",
    "servicios-ta.com" : "ta",         "bellsouth.net"   : "bellsouth",
    "hotmail.de"       : "outlook",    "twc.com"         : "spectrum", 
    "q.com"            : "qcom",       "rocketmail.com"  : "rocketmail",
    "juno.com"         : "juno",       "mac.com"         : "apple",
    "yahoo.com.mx"     : "yahoo",      "earthlink.net"   : "earthlink",
    "aim.com"          : "aim",        "ptd.net"         : "pdt",
    "yahoo.de"         : "yahoo",      "yahoo.es"        : "yahoo", 
    "me.com"           : "apple",      "scranton.edu"    : "scranton"
}

*OS STANDARDIZATION*

In [0]:
# Raw operating-system strings handled by the OS standardization.
op_systems = ['iOS 11.2.0', 'Mac OS X 10_12_4', 'Android 6.0.1', 
              'Mac OS X 10_13_5', 'Mac', 'iOS 11.1.0', 
              'iOS 11.0.2', 'func', 'Linux', 'Windows 10', 
              'iOS 11.2.2', 'iOS 11.4.1', 'iOS 11.3.1', 
              'Android 5.0', 'Windows', 'Android 6.0', 
              'Mac OS X 10_12_5', 'Mac OS X 10.9', 
              'iOS 11.0.3', 'Android 7.1.1', 
              'Android 5.1.1', 'Mac OS X 10_13_4', 
              'Mac OS X 10.11', 'Android 7.0', 
              'Android 8.0.0', 'iOS 11.1.2', 
              'iOS 11.0.1', 'iOS 10.0.2', 
              'Mac OS X 10.10', 'Mac OS X 10.13', 
              'Windows 8.1', 'iOS 10.3.1', 'Mac OS X 10_7_5', 
              'Mac OS X 10_10_5', 'Android 5.0.2', 'Mac OS X 10_11_3', 
              'Android', 'Mac OS X 10_13_2', 'Android 8.1.0', 
              'Mac OS X 10_12_2', 'iOS 11.2.1', 'Android 4.4.2', 
              'iOS 11.0.0', 'iOS 10.1.1', 'Mac OS X 10_12_3', 
              'iOS 10.3.3', 'Windows Vista', 'Windows XP', 
              'Mac OS X 10_12', 'iOS 10.3.2', 'Mac OS X 10.12', 
              'iOS 10.2.1', 'iOS 11.4.0', 'other', 'Mac OS X 10_13_3', 
              'Android 7.1.2', 'Windows 7', 'Mac OS X 10_13_1', 
              'Mac OS X 10_12_1', 'iOS 11.3.0', 'Windows 8', 
              'iOS 11.1.1', 'Mac OS X 10_9_5', 'Mac OS X 10_6_8', 
              'Mac OS X 10_11_4', 'iOS 11.2.5', 'iOS 11.2.6', 
              'Mac OS X 10_11_6', 'iOS', 'Mac OS X 10_12_6', 
              'iOS 10.2.0', 'Mac OS X 10_8_5', 'Mac OS X 10_11_5', 
              'Mac OS X 10.6', 'iOS 9.3.5']

# Map every raw string to its OS family: the first whitespace-separated word,
# lower-cased ("Mac OS X 10_12_4" -> "mac"). Empty strings and the
# placeholders "func" / "other" are kept unchanged.
DEVICE_OS: Dict[str, str] = {
    os_name: (
        os_name
        if not os_name or os_name in ("func", "other")
        else os_name.split()[0].lower()
    )
    for os_name in op_systems
}

*BROWSER STANDARDIZATION*

In [0]:
def stand_browser(browser: str) -> str:
    """Return the generic browser family of a raw browser string.

    Matching is by substring and case-insensitive, so mixed-case values such
    as "Mozilla/Firefox" are recognised as well. The families are tested in a
    fixed order and the first match wins; "Other" is returned when nothing
    matches.

    Parameters
    ----------
    browser : str
        Raw browser description (must not be None).

    Returns
    -------
    str
        One of "Chrome", "Opera", "Edge", "Safari", "Internet Explorer",
        "Firefox", "Google", "Samsung Browser" or "Other".
    """
    # (substring, family) pairs in priority order.
    # NOTE(review): "ie" is a very short needle and matches any string that
    # merely contains those two letters — confirm this is acceptable.
    families = (
        ("chrome",  "Chrome"),
        ("opera",   "Opera"),
        ("edge",    "Edge"),
        ("safari",  "Safari"),
        ("ie",      "Internet Explorer"),
        ("firefox", "Firefox"),
        ("google",  "Google"),             # Google Search Application
        ("samsung", "Samsung Browser"),
    )

    # Needles are lower-case, so compare against a lower-cased input.
    name = browser.lower()
    for needle, family in families:
        if needle in name:
            return family

    return "Other"

# Lookup table raw id_31 value -> browser family.
# Only the distinct values are brought to the driver: the mapping is keyed by
# value, so collecting every row of the column would add nothing but transfer.
BROWSERS: Dict[str,str] = {
    row.id_31: stand_browser(row.id_31)
    for row in train_df.select('id_31').distinct().collect()
    if row.id_31 is not None
}

*DEVICE INFO STANDARDIZATION*

In [0]:
def stand_deviceinfo(dev_info: str) -> str:
    """ Map a raw device-info string to a coarse device family.

    The rules are tried from top to bottom; the first rule having at least
    one marker contained in `dev_info` (case-sensitive substring test)
    decides the family. "Other" is returned when no rule matches.
    """
    # (markers, family) pairs in priority order
    rules = (
        (("SM", "SAMSUNG", "GT-"),   "Samsung"),    # Samsung device
        (("Moto G", "Moto", "moto"), "Motorola"),   # Motorola device
        (("HUAWEI", "ALE-", "-L"),   "Huawei"),     # Huawei device
        (("iOS", "MacOS"),           "Apple"),      # iOS device
        (("Blade", "BLADE"),         "ZTE"),        # ZTE device
        (("Windows",),               "Microsoft"),  # Windows device
        (("Redmi",),                 "Xiaomi"),     # Xiaomi device
        (("LG-",),                   "LG"),         # LG device
        (("rv:",),                   "RV"),         # Rv device
        (("Linux",),                 "Linux"),      # Linux device
        (("XT",),                    "Sony"),       # Sony device
        (("HTC",),                   "HTC"),        # HTC device
        (("ASUS",),                  "Asus"),       # Asus device
    )

    for markers, family in rules:
        if any(marker in dev_info for marker in markers):
            return family

    return "Other"


# Lookup table used to standardize DeviceInfo: raw value -> device family.
# Nulls are dropped and values de-duplicated in Spark, so only the distinct
# values are collected to the driver instead of one Row per transaction.
DEVICE_INFOS: Dict[str, str] = {
    row.DeviceInfo: stand_deviceinfo(row.DeviceInfo)
    for row in train_df.select('DeviceInfo').dropna().distinct().collect()
}

---

**Define the features that must never be removed**

In [0]:
# Features that must never be dropped: this list is passed as `locked` both to
# FEPipeline (null-percentage filter) and to the Pearson-correlation filter.
# NOTE(review): the C*/D*/M* runs below skip some indices (e.g. "C12", "D7", "M1") — confirm this is intentional.
LOCKED_FEATURES: List[str] = ["TransactionAmt", "ProductCD", "card1", "card2",
                              "card3", "card4", "card5", "card6", "addr1", "addr2",
                              "P_emaildomain", "R_emaildomain", "C1", "C2", "C3",
                              "C4", "C5", "C6", "C7", "C8", "C9", "C10", "C11",
                              "C13", "D1", "D2", "D3", "D4", "D5", "D6", "D8", 
                              "D9", "D10", "D11", "D12", "D13", "D14", "D15", "M2", 
                              "M3", "M4", "M5", "M6", "M7", "M8", "M9", "id_30"]  # Mix of numerical and categorical columns (see FeatureSelector.CATEGORICAL_FEATURE)

**Defining Classes for Feature Engineering**

In [0]:
# Class for Feature Standardization
class StandardizeFeature:
    """ Callable that rewrites raw categorical values into their standardized form.

    Every step replaces the values of one or two columns through a
    module-level lookup table (EMAILS, DEVICE_OS, DEVICE_INFOS, BROWSERS)
    and can be switched on or off individually.

    Parameters
    ----------
    strategy : Dict[str, bool]
        Flags keyed by 'email', 'os', 'devinfo' and 'browser'; a step is
        applied only when its flag is truthy. All four keys are required.
    """
    def __init__(self, strategy: Dict[str, bool]) -> None:
        self.__strategy = strategy

    @staticmethod
    def _replace_values(df     : pyspark.sql.dataframe.DataFrame,
                        mapping: Dict[str, str],
                        columns: List[str]) -> pyspark.sql.dataframe.DataFrame:
        """ Replace each key of `mapping` with its value in `columns` of `df`.

        On KeyError the step is skipped and `df` is returned unchanged (not
        None), so that the following steps still receive a dataframe.
        """
        try:
            return df.replace(mapping, subset=columns)
        except KeyError as ke:
            print(f"Skipping standardization of {columns}: {ke!r}")
            return df

    @timeamt
    def _standardize_emails(self, do: bool, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Standardized by replacing each occurrence in EMAILS with its value """
        return self._replace_values(df, EMAILS, ["P_emaildomain", "R_emaildomain"]) if do else df

    @timeamt
    def _standardize_os(self, do: bool, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Standardized by replacing each occurrence in DEVICE_OS with its value """
        return self._replace_values(df, DEVICE_OS, ["id_30"]) if do else df

    @timeamt
    def _standardize_devinfo(self, do: bool, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Standardized by replacing each occurrence in DEVICE_INFOS with its value """
        return self._replace_values(df, DEVICE_INFOS, ["DeviceInfo"]) if do else df

    @timeamt
    def _standardize_browser(self, do: bool, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Standardized by replacing each occurrence in BROWSERS with its value """
        return self._replace_values(df, BROWSERS, ["id_31"]) if do else df

    def __call__(self, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Apply the enabled steps in order: emails, OS, device info, browser """
        df = self._standardize_emails(self.__strategy['email'], df)
        df = self._standardize_os(self.__strategy['os'], df)
        df = self._standardize_devinfo(self.__strategy['devinfo'], df)
        df = self._standardize_browser(self.__strategy['browser'], df)
        return df


# Class for Feature Engineering Pipeline
class FEPipeline:
    """ Feature engineering pipeline: drop sparse features, then standardize values.

    Parameters
    ----------
    np_threshold : float
        Null-percentage threshold; a feature strictly above it is dropped.
        It is compared against the values returned by `count_null_wperc`.
    std_strategy : Dict[str, bool]
        Flags forwarded to `StandardizeFeature`
    locked : Optional[List[str]]
        Features that are never dropped (none when omitted)
    """
    def __init__(self, np_threshold: float, 
                 std_strategy      : Dict[str, bool],
                 locked            : Optional[List[str]] = None
    ) -> None:
        self._null_perc_threshold  = np_threshold
        self._standardize_strategy = std_strategy
        # A None default plus a copy avoids sharing one mutable list between instances
        self._locked_features      = list(locked) if locked is not None else []
    
    def _get_features2drop(self, df: pyspark.sql.dataframe.DataFrame) -> List[str]:
        """ Return the list of non-locked features whose null percentage
        is strictly greater than the configured threshold """
        perc   = count_null_wperc(df)
        values = perc.to_numpy()
        locked = set(self._locked_features)

        # Row 1 of the table is compared with the threshold.
        # NOTE(review): presumably row 0 holds the raw null counts and row 1 the
        # percentages — confirm against count_null_wperc.
        return [
            feature for i, feature in enumerate(perc.columns)
            if values[1, i] > self._null_perc_threshold and feature not in locked
        ]
    
    def __call__(self, df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Apply the entire feature engineering pipeline and return the new dataframe """
        print("STARTING FEATURE ENGINEERING PIPELINE")
        # 1. Drop features with null percentage > given threshold
        print(f"1. Drop features with null percentage > {self._null_perc_threshold} ...", end=" ")
        todrop1 = self._get_features2drop(df)
        print(f"N. features to drop {len(todrop1)}")
        
        df = df.drop(*todrop1)

        # 2. Apply standardization
        # NOTE(review): the message below is numbered "3." although this is the
        # second step; the text is left untouched to match the recorded outputs.
        print("3. Apply Standardization to: %s" % (", ".join([f for f, v in self._standardize_strategy.items() if v])))
        stfeature = StandardizeFeature(self._standardize_strategy)
        df = stfeature(df)

        print("ENDING FEATURE ENGINEERING")

        return df

**Apply the Pipeline**

In [0]:
# One flag per standardization step: True applies it, False skips it
strategy = dict(email=True, os=True, devinfo=True, browser=True)

# Threshold of 80.0 on the null percentage; LOCKED_FEATURES are never dropped
fepipeline = FEPipeline(np_threshold=80.0, std_strategy=strategy, locked=LOCKED_FEATURES)
train_df   = fepipeline(train_df)

STARTING FEATURE ENGINEERING PIPELINE
1. Drop features with null percentage > 80.0 ... Elapsed time: 235.596s
N. features to drop 67
3. Apply Standardization to: email, os, devinfo, browser
Elapsed time: 0.136s
Elapsed time: 0.113s
Elapsed time: 0.951s
Elapsed time: 0.160s
ENDING FEATURE ENGINEERING


---

### Feature Selection

In this section we select the subsets of features of the training dataset that will be used to make predictions. Since the dataset has both categorical and numerical features, it is worth checking how different models behave depending on which features are kept. For this purpose we are going to create a class.

In [0]:
class FeatureSelector:
    """ Static helpers that select subsets of the dataset columns """

    # Columns treated as categorical throughout the notebook
    CATEGORICAL_FEATURE: List[str] = [
        'ProductCD', 'card4', 'card6', 
        'P_emaildomain', 'R_emaildomain',  'M1', 
        'M2', 'M3',  'M4', 'M5', 'M6', 'M7', 
        'M8', 'M9', 'id_12', 'id_15', 'id_16', 'id_23', 
        'id_27', 'id_28', 'id_29', 'id_30', 'id_31', 'id_33', 
        'id_34', 'id_35', 'id_36', 'id_37', 'id_38', 'DeviceType', 
        'DeviceInfo'
    ]
    
    @staticmethod
    def select_only_numerical_features(
        df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Return a dataframe without the columns listed in CATEGORICAL_FEATURE """
        return df.drop(*FeatureSelector.CATEGORICAL_FEATURE)

    @staticmethod
    def select_all(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Return a dataframe with all features """
        return df
      
    @staticmethod
    def select_only_categorical(df: pyspark.sql.dataframe.DataFrame, target_feature: str) -> pyspark.sql.dataframe.DataFrame:
        """ Return a dataframe with only the string-typed columns plus the target feature """
        keep = set(FeatureSelector.return_new_categorical(df))
        keep.add(target_feature)
        return df.drop(*[name for name in df.columns if name not in keep])
    
    @staticmethod
    def filter_df2df(from_df: pyspark.sql.dataframe.DataFrame,
                     to_df  : pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
        """ Return to_df without the columns that are not present in from_df """
        # Read the column list once: `columns` is rebuilt from the schema on every access
        allowed = set(from_df.columns)
        return to_df.drop(*[name for name in to_df.columns if name not in allowed])
      
    @staticmethod
    def return_new_categorical(df: pyspark.sql.dataframe.DataFrame) -> List[str]:
        """ Return the names of the columns of df whose type is "string" """
        # `dtypes` already yields (name, type) pairs in column order; read it a single time
        return [name for name, dtype in df.dtypes if dtype == "string"]

---

## Machine Learning Models

In this section we are going to explore different ML methods to predict whether a given transaction is fraudulent or not. The methods that I would like to experiment with are: *Logistic Regression*, *Decision Trees*, *Random Forest* and *Gradient Boosted Trees*.

### Preliminaries Operations

In [0]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel,             \
                                      DecisionTreeClassifier, DecisionTreeClassificationModel, \
                                      RandomForestClassifier, RandomForestClassificationModel, \
                                      GBTClassifier, GBTClassificationModel

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Transformer

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.stat import Correlation

from sklearn.metrics import confusion_matrix, precision_score, recall_score, accuracy_score, matthews_corrcoef, f1_score

**Defining Some Utility Functions**

Before applying the learning pipeline, I would like to define some utility functions. The most important one creates a feature engineering pipeline that can involve: StringIndexer, Imputer, OneHotEncoder and VectorAssembler. Here is a short explanation of what these operations do:

1. `StringIndexer`: encodes a string column of labels to a column of label indices
2. `Imputer`: replace null values according to a given strategy ("mean" or "mode")
3. `OneHotEncoder`: maps a categorical feature to a binary vector with at most a single one-value indicating the presence of a specific feature value
4. `VectorAssembler`: wraps multiple features into a single feature columns with vector values containing the old features
5. `StandardScaler`: Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the set

There is also a function used to do the random splitting for the training and the test set (a deeper explanation later). Finally, there is another function to apply pearson correlation

In [0]:
class UtilityFunctions:
    """ Static helpers shared by the learning pipelines: builders for the
    preprocessing stages, sampling, correlation-based feature pruning and
    reporting of training/test results. """

    @staticmethod
    def create_pipeline(df              : pyspark.sql.dataframe.DataFrame, 
                        target_feat     : str,
                        imputer_strategy: Optional[str] = "mean",
                        use_imputer     : bool = False,
                        use_va          : bool = False,
                        use_ohe         : bool = False,
                        use_stridx      : bool = False,
                        return_stages   : bool = False
    ) -> Union[pyspark.ml.Transformer, 
               List[Union[OneHotEncoder, Imputer, VectorAssembler, StringIndexer]], 
               pyspark.sql.dataframe.DataFrame]:
        """
        Create a Pipeline with the requested stages, in this order: Imputer,
        StringIndexers of the string columns, OneHotEncoder, StringIndexer of
        the target feature, VectorAssembler.
        
        Parameters
        ----------
        df : pyspark.sql.dataframe.DataFrame
            The input dataframe
        target_feat : str
            the target feature
        imputer_strategy : Optional[str]
            the strategy for the imputer (mean or mode)
        use_imputer : bool
            If true, add to the stages also the imputer
        use_va : bool
            If true, add to the stages also the Vector Assembler
        use_ohe : bool
            If true, add to the stages also the OneHotEncoding. Its inputs
            are the StringIndexer outputs, so they are empty when
            `use_stridx` is False
        use_stridx : bool
            If true, add to the stages also the StringIndexer
        return_stages : bool
            If True, does not perform fitting but 
            returns the list of stages
            
        Returns
        -------
        pyspark.sql.dataframe.DataFrame
            `df` itself, when no stage is requested and `return_stages` is False
        list
            the unfitted stages, when `return_stages` is True
        pyspark.ml.Transformer
            otherwise, the pipeline of transformation fit to `df`
        """
        # Nothing requested: hand the dataframe back untouched
        if not (use_imputer or use_va or use_stridx or use_ohe or return_stages):
            return df

        # Get the Imputer
        imputer = UtilityFunctions.return_imputer(use_imputer, df, imputer_strategy)

        # Get StringIndexers for categorical features and for the target
        indexers = UtilityFunctions.return_stringindexer(use_stridx, FeatureSelector.return_new_categorical(df))
        lindexer = UtilityFunctions.return_stringindexer(use_stridx, [target_feat])
            
        # Get OneHotEncoder for categorical features
        cat_features = [idx.getOutputCol() for idx in indexers] if indexers else []
        ohe = UtilityFunctions.return_onehotencoder(use_ohe, cat_features)
        
        # Get the VectorAssembler
        # Note: if we use OHE, this means that we have some categorical features
        # in the dataset. On the other hand, if use_ohe is False, it is very
        # likely that we are not considering categorical features at all, i.e.
        # we are considering only numerical features. This is useful later,
        # when defining the entire Learning pipeline.
        num_features = [feat for feat in df.columns 
                        if feat not in FeatureSelector.CATEGORICAL_FEATURE and feat != target_feat]
        ohe_features = ohe.getOutputCols() if use_ohe else (cat_features if use_stridx else [])
        vassblr = UtilityFunctions.return_vectorized(use_va, num_features + ohe_features)
        
        stages =   ([imputer] if use_imputer else []) \
                 + (indexers  if use_stridx  else []) \
                 + ([ohe]     if use_ohe     else []) \
                 + (lindexer  if use_stridx  else []) \
                 + ([vassblr] if use_va      else [])
        
        if return_stages:
            return stages
        
        return Pipeline(stages=stages).fit(df)
    
    @staticmethod
    def return_onehotencoder(do: bool, cols: List[str]) -> Optional[OneHotEncoder]:
        """ Return a OneHotEncoder for columns cols (outputs are named
        "<col>_encoded"), or None when `do` is False """
        if not do: return None
    
        print("        - Use OneHotEncoder ...")
        return OneHotEncoder(inputCols=cols, 
                             outputCols=[f"{c}_encoded" for c in cols],
                             handleInvalid="keep")
    
    @staticmethod
    def return_stringindexer(do: bool, cols: List[str]) -> Optional[List[StringIndexer]]:
        """ Return a StringIndexer for each input col (outputs are named
        "<col>_indexed"), or None when `do` is False """
        if not do: return None
        
        print(f"        - Use StringIndexer for cols: {cols} ...")
        return [StringIndexer(inputCol=c, outputCol=f"{c}_indexed", handleInvalid="keep") for c in cols]
    
    @staticmethod
    def return_imputer(do      : bool,
                       df      : pyspark.sql.dataframe.DataFrame, 
                       strategy: Optional[str] = "mean"
    ) -> Optional[Imputer]:
        """ Return an Imputer with the given strategy that overwrites its
        input columns in place, or None when `do` is False """
        if not do:
            return None
        
        print(f"        - Use {strategy} Imputer ...")
        cols = df.columns
        
        # The strategy decides which features are imputed: "mean" takes every
        # column of df that is not listed in CATEGORICAL_FEATURE (no other
        # column is excluded), any other strategy takes the listed ones
        if strategy == "mean":
            features = [col for col in cols if col not in FeatureSelector.CATEGORICAL_FEATURE]
        else:
            features = [col for col in cols if col in FeatureSelector.CATEGORICAL_FEATURE]

        # Input and output columns coincide, so the values are replaced in place
        imputer = Imputer()
        imputer.setStrategy(strategy)
        imputer.setInputCols(features)
        imputer.setOutputCols(features)

        return imputer

    @staticmethod
    def stratified_sampling(df    : pyspark.sql.dataframe.DataFrame, 
                            target: str, 
                            one   : float, 
                            zero  : float
    ) -> pyspark.sql.dataframe.DataFrame:
        """ A stratified sampling for unbalanced dataset and binary target
        feature: `zero` and `one` are the sampling fractions of class 0 and 1 """
        return df.sampleBy(target, fractions={0: zero, 1: one}, seed=RAND_SEED)

    @staticmethod
    def return_vectorized(do            : bool,
                          features      : List[str],
                          handle_inv    : str = "keep"
    ) -> Optional[VectorAssembler]:
        """ Return a VectorAssembler that packs `features` into the "features"
        column, or None when `do` is False """
        if not do:
            return None
        
        print("        - Use VectorAssembler ...")
        return VectorAssembler(inputCols=features, outputCol="features", handleInvalid=handle_inv)
    
    @staticmethod
    def pearson_correlation(df            : pyspark.sql.dataframe.DataFrame,
                            target_feature: str,
                            threshold     : float,
                            locked        : List[str]) -> List[str]:
        """ Returns the list of numerical features with an high (> threshold) correlation.

        A feature is returned when its Pearson correlation with any feature
        that precedes it in `df.columns` is strictly greater than `threshold`
        and it is not in `locked`. `target_feature` is currently not used.
        """
        features = [f for f in df.columns if f not in FeatureSelector.CATEGORICAL_FEATURE]
        
        # Vectorized rows for the Correlation computation
        vec_assembler = UtilityFunctions.return_vectorized(True, features, "keep")
        dense_df = vec_assembler.transform(df)
        
        # Compute the Pearson Correlation matrix as a numpy array
        pearson_corr = Correlation.corr(dense_df, "features", "pearson") \
                                  .collect()[0][0]                       \
                                  .toArray()
        
        # For every column only the entries above the diagonal are inspected,
        # so that of two correlated features the later one is dropped.
        # NOTE(review): no absolute value is taken, hence strongly negative
        # correlations are never flagged — confirm this is intended.
        todrop = []
        for col in range(pearson_corr.shape[1]):
            if any(pearson_corr[:col, col] > threshold) and \
                   features[col] not in locked:
                todrop.append(features[col])
        
        return todrop
    
    @staticmethod
    def plot_confusion_matrix(cm: np.ndarray, exp_name: str) -> None:
        """ Plot the confusion matrix as a heatmap (rows: true label, columns: predicted label) """
        class_labels = ["0.0", "1.0"]  # tick labels of the two classes
        fig = px.imshow(cm, width=600, height=400, 
                        color_continuous_scale="blues", 
                        x=class_labels, y=class_labels, text_auto=True,
                        title=f"<b>Confusion Matrix for {exp_name}</b>",
                        labels=dict(x="Predicted Label", y="True Label"))
        
        fig.show()
        
    @staticmethod
    def compute_metrics(y_true: pd.DataFrame, 
                        y_pred: pd.DataFrame) -> Tuple[float, float, float, float]:
        """ Compute: Precision, Recall, Accuracy and F1-score.

        Precision and recall are macro-averaged; the F1-score is the harmonic
        mean of those two values and is 0.0 when both of them are 0.
        """
        precision = precision_score(y_true, y_pred, average="macro")
        recall    = recall_score(y_true, y_pred, average="macro")
        accuracy  = accuracy_score(y_true, y_pred)

        # Guard the 0/0 case instead of producing NaN
        if precision + recall == 0:
            f1score = 0.0
        else:
            f1score = (2 * (precision * recall))/(precision + recall)
        
        return precision, recall, accuracy, f1score
      
    @staticmethod
    def summarize_train_test_result(cv_model   : PipelineModel,
                                    predictions: pyspark.sql.dataframe.DataFrame,
                                    exp_name   : str
                                    ) -> None:
        """ Gives an summarized overview of trained models during k-fold validation.

        Prints the hyper-parameters of the last stage of `cv_model` and all
        its stages, then plots the confusion matrix and prints precision,
        recall, accuracy, F1-score and AUC ROC computed from the "isFraud"
        and "prediction" columns of `predictions`.
        """
        header = f"============ SUMMARIZING FOR EXPERIMENT {exp_name} ============"
        print(header)
        print("***************** Training Summary *****************")
        
        # The classifier is the last stage of the fitted pipeline
        classifier = cv_model.stages[-1]
        
        if isinstance(classifier, LogisticRegressionModel):
            print("|----- Best model parameters according to K-Fold Cross Validation: lambda=[{:.3f}]; alfa=[{:.3f}]".format(
                classifier._java_obj.getRegParam(), classifier._java_obj.getElasticNetParam()
            ))
        elif isinstance(classifier, DecisionTreeClassificationModel):
            print("|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[{:d}]; impurity=[{:s}]".format(
                classifier._java_obj.getMaxDepth(), classifier._java_obj.getImpurity()
            ))
        elif isinstance(classifier, RandomForestClassificationModel):
            print("|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[{:d}]".format(
                classifier._java_obj.getMaxDepth()
            ))
        elif isinstance(classifier, GBTClassificationModel):
            print("|----- Model parameters: maxDepth=[{:d}]; maxIter=[{:d}]".format(
                classifier._java_obj.getMaxDepth(), classifier._java_obj.getMaxIter()
            ))
        
        print("|----- Best Model Stages")
        for stage in cv_model.stages:
            print(stage)
            
        print()
        
        print("***************** Test Summary *****************")
        # Labels are cast to float64 to be comparable with the labels 0.0/1.0 below
        y_true = predictions.select("isFraud").toPandas().astype('float64')
        y_pred = predictions.select("prediction").toPandas()
        
        cm = confusion_matrix(y_true, y_pred, labels=[0.0, 1.0])
        
        UtilityFunctions.plot_confusion_matrix(cm, exp_name)
        
        precision, recall, accuracy, f1score  = UtilityFunctions.compute_metrics(y_true, y_pred)
        
        print("|----- General Metrics")
        print(f"    + Precision {precision}")
        print(f"    + Recall {recall}")
        print(f"    + Accuracy {accuracy}")
        print(f"    + F1-score {f1score}")
        
        evaluator = BinaryClassificationEvaluator()
        evaluator.setLabelCol("isFraud")
        evaluator.setMetricName("areaUnderROC")
        auroc = evaluator.evaluate(predictions)
        
        print(f"    + AUC ROC {auroc}")
        print("=" * len(header))
        
    @staticmethod
    def summarize_batch(results: List[Tuple[PipelineModel, 
                                            pyspark.sql.dataframe.DataFrame, 
                                            str]]) -> None:
        """ Gives a summarization for each (model, predictions, experiment name) in results """
        for model, predictions, exp_name in results:
            UtilityFunctions.summarize_train_test_result(model, predictions, exp_name)
            
    @staticmethod
    def oversample(df       : pyspark.sql.dataframe.DataFrame, 
                   min_class: int, 
                   max_class: int,
                   fraction : float=0.7,
                   seed     : int=42) -> pyspark.sql.dataframe.DataFrame:
        """ Do oversampling of the minority class of the "isFraud" column.

        The minority rows are sampled with replacement by a factor equal to
        the integer majority/minority ratio; a random `fraction` of that
        sample is then appended to the majority rows. `seed` drives both the
        sampling and the split, so the result is reproducible.
        """
        # Take the dataset with minor and major class
        minor_df = df.filter(F.col('isFraud') == min_class)
        major_df = df.filter(F.col('isFraud') == max_class)
        
        # Compute the (integer) ratio between the two classes
        ratio = int(major_df.count() / minor_df.count())
        
        # Oversample, then keep only the requested share of the new rows
        minor_oversample    = minor_df.sample(withReplacement=True, fraction=float(ratio), seed=seed)
        minor_oversample, _ = minor_oversample.randomSplit([fraction, 1.0 - fraction], seed=seed)
        
        return major_df.unionAll(minor_oversample)

**Splitting the Dataset**

Before applying feature selection and further feature engineering pipelines, I want to split the entire training dataset into two smaller datasets: one to actually train the models and the other to test them. However, since the dataset is highly unbalanced, we cannot use a simple random split, which might lead to a poor partition where, for instance, the test set ends up containing only examples labeled with the most represented class (in this case *non-fraudulent transactions*). For this reason we use a **stratified random sampling**, in which we can choose what fraction of each class is sampled. In this way we ensure that the less frequent class is present both in the training and in the test set. For this particular case I sample about 60% of the non-fraudulent and 70% of the fraudulent transactions for the training set; the remaining rows form the test set.

In [0]:
# Reuse the saved feature-engineered training set when available,
# otherwise build it with a stratified sample of train_df
train = (
    spark_load_dataset(FE_TRAIN_DATA)
    if SAVED_FE_TRAIN
    else UtilityFunctions.stratified_sampling(train_df, target="isFraud", zero=.6, one=.7)
)

Loading: dbfs:/big_data_project/dataset/fe_train.csv
Elapsed time: 37.184s


In [0]:
# Class balance of the resulting training set (rows per isFraud value)
train.groupBy("isFraud").count().show()

+-------+------+
|isFraud| count|
+-------+------+
|      1| 14511|
|      0|342530|
+-------+------+



In [0]:
# The test set is either loaded from disk or made of the rows of
# train_df that were not sampled into the training set
if SAVED_FE_TEST:
    test = spark_load_dataset(FE_TEST_DATA)
else:
    test = train_df.subtract(train)

Loading: dbfs:/big_data_project/dataset/fe_test.csv
Elapsed time: 24.623s


**Uses the Imputer**

There are still a lot of null values both in the training and in the test dataset. To avoid feeding the machine learning pipelines with a very sparse dataset (sparse in terms of number of missing values), I would like to use the so-called *Imputer*. It is a model defined by the `Imputer` class that, given a strategy (here either *mean* or *mode*), fills missing values accordingly. The `mean` strategy replaces every missing value of a column with the mean of that column, while the `mode` strategy replaces each null value with the most frequent one. Since we have both numerical and categorical data with a very high number of null values, I will use the first strategy on the former (numerical) and replace the nulls of categorical data with an `"N"` value. For categorical features this is the only solution I found, since it seems that the Imputer model does not work with this type of data; moreover, using the `fillna` of `pyspark.pandas.DataFrame` leads to a huge degradation of performance.

*Run only if train and test data are not loaded*

In [0]:
# Get the Imputer for numerical features only: load the saved model
# when one exists, otherwise fit a new one on the training set
if MODELS_JSON["num_imputer"]:
    num_imputer_model = PipelineModel.load(os.path.join(NUM_IMPUTER_PATH, "imputer"))
else:
    num_imputer_model = UtilityFunctions.create_pipeline(train, 'isFraud', use_imputer=True)

In [0]:
# Impute the numerical features, then fill the remaining string nulls with "N"
train = num_imputer_model.transform(train).fillna("N")

In [0]:
# Persist the imputer only the first time, then flag it as saved
if not MODELS_JSON["num_imputer"]:
    num_imputer_model.save(os.path.join(NUM_IMPUTER_PATH, "imputer"))
    MODELS_JSON["num_imputer"] = True

Do the same for test data

In [0]:
# Same treatment as the training set: imputer first, then "N" for string nulls
test = num_imputer_model.transform(test).fillna("N")

**Using Pearson Correlation**

The last preliminary step, before stepping into the definition of the training models, is to drop some more numerical columns (there are too many of them and not all are necessary), this time using the Pearson Correlation. That is, we define a threshold (usually 0.90, but it must be between -1 and 1) and drop the features with a correlation coefficient greater than the given threshold. The main reasoning is the following: if two features are highly correlated (i.e., the correlation tends to, but is not equal to, 1) then they represent in some way the same feature, and this does not help during predictions; for this reason one of the two will be dropped. 

*Run only if train and test data are not loaded*

In [0]:
# Highly correlated numerical features; the locked ones and the target are never dropped
todrop = UtilityFunctions.pearson_correlation(
    df=train,
    target_feature='isFraud',
    threshold=.90,
    locked=[*LOCKED_FEATURES, 'isFraud'],
)

GETTING VECTOR ASSEMBLER ...


In [0]:
# Remove the correlated columns from both datasets so that their schemas stay aligned
train, test = train.drop(*todrop), test.drop(*todrop)

In [0]:
# Report how many columns survived in each dataset
print(
    f"Train dataset remaining features: {len(train.columns)}",
    f"Test dataset remaining features: {len(test.columns)}",
    sep="\n",
)

Train dataset remaining features: 232
Test dataset remaining features: 232


**Save Feature Engineerized Test and Train Datasets**

Here we save the resulting train and test dataset for easily usage

*Run only if train and test data are not loaded*

In [0]:
# Write both datasets as CSV with a header row, but only when neither was loaded from disk
if not SAVED_FE_TRAIN and not SAVED_FE_TEST:
    train.write.csv(FE_TRAIN_DATA, mode="overwrite", header=True)
    test.write.csv(FE_TEST_DATA, mode="overwrite", header=True)

**Features Selection**

There is still a problem with the unbalancing of the dataset. The stratified sampling ensures that both the train and the test dataset contain samples labeled with 0 as well as samples labeled with 1. However, in the end the class proportion of the train dataset is roughly the same as the original one. For this reason I decided to apply **oversampling**. By oversampling the minority class (in this case class 1), the final train dataset would contain about the same number of samples labeled with 1 and with 0. However, I noticed that having an equal number of samples for each class leads to a high number of False Positives, so I decided to keep only 60% of the oversampled 1's. Finally, the train dataset contains 63.3% of non-fraudulent and 36.7% of fraudulent transactions.

In [0]:
# Oversample the fraud class (1) against the non-fraud class (0), keeping 60% of the new rows
oversample_train = UtilityFunctions.oversample(df=train, min_class=1, max_class=0, fraction=.6)

In [0]:
# NOTE(review): presumably draws the isFraud class distribution of the
# oversampled training set as a pie chart — confirm against pie_plot,
# which is defined outside this section.
pie_plot(oversample_train, "isFraud")

Elapsed time: 48.016s


Let's define a Python dictionary containing different choices of features of the training dataset, selected using the `FeatureSelector`

In [0]:
# Feature subsets used by the experiments, for both the plain and the
# oversampled ("over_" prefix) training set
SELECTORS = dict(
    only_numerical        = FeatureSelector.select_only_numerical_features(train),
    over_only_numerical   = FeatureSelector.select_only_numerical_features(oversample_train),
    all_features          = FeatureSelector.select_all(train),
    over_all_features     = FeatureSelector.select_all(oversample_train),
    only_categorical      = FeatureSelector.select_only_categorical(train, "isFraud"),
    over_only_categorical = FeatureSelector.select_only_categorical(oversample_train, "isFraud"),
)

### Pipelines Definitions

In the following few sections I'm going to create and run several Machine Learning Pipelines mainly composed of: `StringIndexer` (optionally), `One Hot Encoding` (optionally), `VectorAssembler`, `StandardScaler` (optionally) and a `CrossValidator`. The Cross Validator will use `LogisticRegression`, `DecisionTree`, `RandomForest` and `Gradient Boosted Tree` classifiers as estimators and a `BinaryClassificationEvaluator` as evaluators. Each of these pipelines will be applied to the following experiments: 

- *Logistic Regression*: only numerical features (with and without standardization), only categorical features, all features, and the oversampled version of each of the previous
- *Decision Tree*: only numerical features (with and without standardization), only categorical features, all features, and the oversampled version of each of the previous
- *Random Forest*: only numerical features (with and without standardization), only categorical features, all features, and the oversampled version of each of the previous
- *Gradient Boosted Tree*: only numerical features (with and without standardization), only categorical features, all features, and the oversampled version of each of the previous

I decided to use random dropping to find out which features are better choices than others.

In [0]:
def _build_experiments() -> Dict[str, tuple]:
    """
    Build the experiment table consumed by `MLPipelines.run_experiments`.

    Every entry maps an experiment name, e.g. "lr_cv_nostd_only_numerical",
    to the tuple

        (train dataframe, target feature, model, with_std, with_mean,
         only_numerical, k_fold, model save path)

    Names follow the pattern "<model>_cv_<std tag>_<SELECTORS key>".
    A k_fold of 0 is used for the Gradient Boosted Tree experiments.
    """
    target = "isFraud"

    # (std tag, SELECTORS key, with_std, with_mean, only_numerical)
    variants = [
        ("nostd", "only_numerical",        False, False, True),
        ("std",   "only_numerical",        True,  True,  True),
        ("nostd", "all_features",          False, False, False),
        ("nostd", "only_categorical",      False, False, False),
        ("nostd", "over_only_numerical",   False, False, True),
        ("std",   "over_only_numerical",   True,  True,  True),
        ("nostd", "over_all_features",     False, False, False),
        ("nostd", "over_only_categorical", False, False, False),
    ]

    # model -> (save path, k_fold of each variant, in the order listed above)
    per_model = {
        "lr"  : (LR_CV_PATH,  [5, 5, 5, 5, 2, 2, 2, 2]),
        "dt"  : (DT_CV_PATH,  [5, 5, 2, 5, 2, 2, 2, 2]),
        "rf"  : (RF_CV_PATH,  [5, 5, 3, 3, 2, 2, 2, 2]),
        "gbt" : (GBT_CV_PATH, [0, 0, 0, 0, 0, 0, 0, 0]),
    }

    experiments = {}
    for model, (path, folds) in per_model.items():
        for (tag, selector, with_std, with_mean, only_num), k_fold in zip(variants, folds):
            experiments[f"{model}_cv_{tag}_{selector}"] = (
                SELECTORS[selector], target, model,
                with_std, with_mean, only_num, k_fold, path,
            )

    return experiments


EXPERIMENTS = _build_experiments()

**Utility Functions for Learning Pipeline**

In [0]:
class MLPipelines:
    """
    Static helpers that build, fit, load and evaluate the classification
    pipelines listed in EXPERIMENTS.
    """
    # Short names of the supported classifiers (see `run_crossvalidator`)
    MODELS = ["dt", "lr", "rf", "gbt"]

    @staticmethod
    @timeamt
    def run_crossvalidator(df            : pyspark.sql.dataframe.DataFrame,
                           target_feature: str,
                           model         : str,
                           with_std      : bool = False,
                           with_mean     : bool = False,
                           only_numerical: bool = True,
                           k_fold        : int  = 5,
    ) -> Union[Transformer, List[Transformer]]:
        """
        Builds the complete learning pipeline for the chosen classifier,
        fits it on the training dataset and returns the fitted model.

        When the classifier comes with a parameter grid the pipeline is
        tuned with a K-Fold CrossValidator (metric: area under ROC) and
        the best model is returned. Otherwise (Gradient Boosted Tree)
        the pipeline is fitted directly and `k_fold` is ignored.

        Parameters
        ----------
        df : pyspark.sql.dataframe.DataFrame
            The training dataset
        target_feature : str
            The target variable to be predicted
        model : str
            one between [
                dt  -> DecisionTreeClassifier,
                lr  -> LogisticRegression,
                rf  -> RandomForestClassifier,
                gbt -> GBTClassifier
                ]
        with_std : bool
            if True uses the StandardScaler with withStd
        with_mean : bool
            if True uses the StandardScaler with withMean
        only_numerical : bool
            if True the dataset contains only numerical features, hence
            neither string indexing nor one hot encoding is applied
        k_fold : int
            The number of folds to be used by the CrossValidator

        Returns
        -------
        Union[Transformer, List[Transformer]]
            The best model found by the CrossValidator, or the model
            returned by .fit() when no parameter grid is available

        Raises
        ------
        AssertionError
            If `model` is not one of MLPipelines.MODELS
        """
        # Check if the model name is correct
        assert model in MLPipelines.MODELS, f"Please select one of {MLPipelines.MODELS}"

        # Categorical features are string-indexed whenever they are present;
        # one hot encoding is never used for the tree-based models
        tree_models = ("dt", "rf", "gbt")
        use_stridx  = not only_numerical
        use_ohe     = (not only_numerical) and model not in tree_models

        # Create the base feature-engineering stages (no imputer, with assembler)
        stages = UtilityFunctions.create_pipeline(df=df,
                                                  target_feat=target_feature,
                                                  imputer_strategy=None,
                                                  use_imputer=False,
                                                  use_va=True,
                                                  use_ohe=use_ohe,
                                                  use_stridx=use_stridx,
                                                  return_stages=True)

        # Add the scaler if at least one of the two options is requested.
        # The last base stage is expected to be the VectorAssembler.
        if with_std or with_mean:
            assembler = stages[-1]
            scaler    = StandardScaler(inputCol=assembler.getOutputCol(),
                                       outputCol=f"std_{assembler.getOutputCol()}",
                                       withStd=with_std, withMean=with_mean)
            stages = [*stages, scaler]

        # The classifier reads the output column of the last stage
        feature_col = stages[-1].getOutputCol()

        # Complete the pipeline with the classifier and get its parameter grid
        build_pipeline = MLPipelines.get_cross_validator(model)
        pipeline, param_grid = build_pipeline(stages, feature_col, target_feature)

        # No grid to search: fit the pipeline as it is
        if param_grid is None:
            return pipeline.fit(df)

        # Sub-models are deliberately not collected: only the best model is
        # returned, so keeping every (fold, parameters) model in the driver
        # memory would be wasted.
        evaluator = BinaryClassificationEvaluator(
            metricName="areaUnderROC").setLabelCol(target_feature)
        cross_val = CrossValidator(estimator=pipeline,
                                   estimatorParamMaps=param_grid,
                                   evaluator=evaluator,
                                   numFolds=k_fold)

        return cross_val.fit(df).bestModel

    @staticmethod
    def logistic_regression_crossvalidator(stages     : list,
                                           feature_col: str,
                                           target     : str,
                                           max_iter   : int = 100) -> Tuple[Pipeline, List[Any]]:
        """
        Complete the Pipeline with the LogisticRegression classifier.

        Returns the (unfitted) Pipeline and the grid over `regParam` and
        `elasticNetParam`. The input `stages` list is left untouched.
        """
        log_reg = LogisticRegression(featuresCol=feature_col, labelCol=target, maxIter=max_iter)

        # Setup the complete Pipeline on a copy of the given stages
        pipeline = Pipeline(stages=[*stages, log_reg])

        # Construct the grid of parameter using ParamGridBuilder
        param_grid = ParamGridBuilder().addGrid(log_reg.regParam, [0.0, 0.05, 0.1]) \
                                       .addGrid(log_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
                                       .build()

        return pipeline, param_grid

    @staticmethod
    def decision_tree_crossvalidator(stages     : list,
                                     feature_col: str,
                                     target     : str) -> Tuple[Pipeline, List[Any]]:
        """
        Complete the Pipeline with the DecisionTree classifier.

        Returns the (unfitted) Pipeline and the grid over `maxDepth` and
        `impurity`. The input `stages` list is left untouched.
        """
        dec_tree = DecisionTreeClassifier(featuresCol=feature_col, labelCol=target, maxBins=100)

        # Setup the complete Pipeline on a copy of the given stages
        pipeline = Pipeline(stages=[*stages, dec_tree])

        # Construct the grid of parameter using ParamGridBuilder
        param_grid = ParamGridBuilder().addGrid(dec_tree.maxDepth, [3, 5, 8])           \
                                       .addGrid(dec_tree.impurity, ["gini", "entropy"]) \
                                       .build()

        return pipeline, param_grid

    @staticmethod
    def random_forest_crossvalidator(stages     : list,
                                     feature_col: str,
                                     target     : str) -> Tuple[Pipeline, List[Any]]:
        """
        Complete the Pipeline with the RandomForest classifier.

        Returns the (unfitted) Pipeline and the grid over `maxDepth` and
        `numTrees`. The input `stages` list is left untouched.
        """
        rand_forest = RandomForestClassifier(featuresCol=feature_col, labelCol=target, maxBins=100)

        # Setup the complete Pipeline on a copy of the given stages
        pipeline = Pipeline(stages=[*stages, rand_forest])

        # Construct the grid of parameter using ParamGridBuilder
        param_grid = ParamGridBuilder().addGrid(rand_forest.maxDepth, [3,   5,   8]) \
                                       .addGrid(rand_forest.numTrees, [10, 50, 100]) \
                                       .build()

        return pipeline, param_grid

    @staticmethod
    def gradient_boosted_tree_crossvalidator(stages     : list,
                                             feature_col: str,
                                             target_col : str) -> Tuple[Pipeline, Optional[List[Any]]]:
        """
        Complete the Pipeline with the GradientBoostedTree classifier.

        The hyper-parameters are fixed, so no parameter grid is returned
        (None) and the caller fits the Pipeline without cross validation.
        The input `stages` list is left untouched.
        """
        gbt = GBTClassifier(featuresCol=feature_col, labelCol=target_col, maxBins=38, maxDepth=8, maxIter=50)

        # Setup the complete Pipeline on a copy of the given stages
        pipeline = Pipeline(stages=[*stages, gbt])

        return pipeline, None

    @staticmethod
    def get_cross_validator(model: str) -> Callable[..., Tuple[Pipeline, Optional[List[Any]]]]:
        """
        Return the method that completes the Pipeline for the given model.

        Raises
        ------
        ValueError
            If `model` is not one of the supported model names
        """
        # Explicit name -> builder mapping, independent of the order of MODELS
        builders = {
            "dt"  : MLPipelines.decision_tree_crossvalidator,
            "lr"  : MLPipelines.logistic_regression_crossvalidator,
            "rf"  : MLPipelines.random_forest_crossvalidator,
            "gbt" : MLPipelines.gradient_boosted_tree_crossvalidator,
        }

        if model not in builders:
            raise ValueError(f"Please select one of {MLPipelines.MODELS}")

        return builders[model]

    @staticmethod
    def run_experiments(test_df: pyspark.sql.dataframe.DataFrame,
                        filter : str,
                        print_metrics: bool=True
    ) -> List[Tuple[PipelineModel, pyspark.sql.dataframe.DataFrame, str]]:
        """
        Runs the experiments selected in EXPERIMENTS.

        For each selected experiment the model is either trained or, when
        MODELS_JSON marks it as already available, loaded from its path.
        The model is then applied to the test set. Freshly trained models
        are saved and flagged as available in MODELS_JSON.

        Parameters
        ----------
        test_df : pyspark.sql.dataframe.DataFrame
            The test dataset
        filter : str
            Only the experiments whose name starts with this prefix are
            run (the name shadows the builtin and is kept for the callers)
        print_metrics : bool
            if True the summary of each experiment is printed right away
            and nothing is collected; if False the results are collected
            and returned

        Returns
        -------
        List[Tuple[PipelineModel, pyspark.sql.dataframe.DataFrame, str]]
            (model, predictions, experiment name) for each experiment run,
            empty when `print_metrics` is True
        """
        outputs = []
        for exp_name, experiment in EXPERIMENTS.items():
            if not exp_name.startswith(filter):
                continue

            df_train, tfeature, model, withstd, withmean, onum, kfold, path = experiment

            # Look the flag up once: it is only changed at the end of the iteration
            pretrained = MODELS_JSON[exp_name]
            model_path = os.path.join(path, exp_name)

            print(f"----------- Running Experiment {exp_name} -----------")
            print(f"    + StandardScaler: with std {withstd}, with mean {withmean}")
            print(f"    + Only Numerical: {onum}")
            print(f"    + Number KFold: {kfold}")
            print(f"    + Number of Features: {len(df_train.columns)}")

            model_name = "Cross Validator" if kfold > 0 else "Gradient Boosted Tree"
            print("    + %s ..." % ("Loading Pre-trained Model" if pretrained else f"Running {model_name}"))

            if pretrained:
                cv_model = PipelineModel.load(model_path)
            else:
                cv_model = MLPipelines.run_crossvalidator(df_train, target_feature=tfeature,
                                                          model=model, with_std=withstd,
                                                          with_mean=withmean,
                                                          only_numerical=onum, k_fold=kfold)

            # NOTE(review): presumably keeps in the test set only the columns
            # of the training selection - confirm against FeatureSelector
            df_test = FeatureSelector.filter_df2df(df_train, test_df)

            print("    + Running Best Model on Test set ...")
            predictions = cv_model.transform(df_test)

            if print_metrics:
                UtilityFunctions.summarize_batch([(cv_model, predictions, exp_name)])
            else:
                outputs.append((cv_model, predictions, exp_name))

            if not pretrained:
                print("    + Saving Resulting Model ...")
                cv_model.write().overwrite().save(model_path)
                MODELS_JSON[exp_name] = True

            print("\n")

        return outputs

---

### Logistic Regression

In [0]:
# Run every Logistic Regression experiment (names starting with "lr") and
# collect the results instead of printing the metrics right away.
results = MLPipelines.run_experiments(
    test_df=test,
    filter="lr",
    print_metrics=False,
)

----------- Running Experiment lr_cv_nostd_only_numerical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment lr_cv_std_only_numerical -----------
    + StandardScaler: with std True, with mean True
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment lr_cv_nostd_all_features -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    + Number KFold: 5
    + Number of Features: 232
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment lr_cv_nostd_only_categorical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    +

In [0]:
# Print the training and test summary of each Logistic Regression experiment
UtilityFunctions.summarize_batch(results)

***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[0.000]
|----- Best Model Stages
VectorAssembler_94f5415dd63b
LogisticRegressionModel: uid=LogisticRegression_ff08d75fa89f, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.8520780713917224
    + Recall 0.614397603992311
    + Accuracy 0.9772513348849945
    + F1-score 0.7139766915538753
    + AUC ROC 0.8340874611271605
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[0.000]
|----- Best Model Stages
VectorAssembler_007acb8170df
StandardScalerModel: uid=StandardScaler_585060084198, numFeatures=204, withMean=true, withStd=true
LogisticRegressionModel: uid=LogisticRegression_825a9ee41b85, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.8531322765308793
    + Recall 0.6135334836428971
    + Accuracy 0.977247056407448
    + F1-score 0.7137621015522558
    + AUC ROC 0.8329775297141897
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[1.000]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_502b1ab7542f, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4040f28fa8ca, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6f951be0d8c5, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_a545d2e020fc, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4f28697dddd4, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4b4202c3ab5b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_69717d58f0e0, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_9e9427afcd6d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_7474cbfcb724, handleInvalid=ke

|----- General Metrics
    + Precision 0.8583308867550888
    + Recall 0.6241204901707698
    + Accuracy 0.977721967415115
    + F1-score 0.7227244037928259
    + AUC ROC 0.8570424600983673
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[1.000]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_3943c40b06d9, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_bc88d1ffb683, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_dee6db64bc3c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_33c92ff24775, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_897c3e15a628, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_f7b9b9400835, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_1cd7df82af68, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c98a8ac2868b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_15c327350083, handleInvalid=ke

|----- General Metrics
    + Precision 0.5424799017597155
    + Recall 0.6991256097416756
    + Accuracy 0.8604446193866374
    + F1-score 0.6109212444325405
    + AUC ROC 0.8017546847547923
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[0.500]
|----- Best Model Stages
VectorAssembler_60b4cccaa6e5
LogisticRegressionModel: uid=LogisticRegression_f18ebf5560e8, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.5727320147135642
    + Recall 0.7373129741612232
    + Accuracy 0.9099252122124863
    + F1-score 0.6446843409988705
    + AUC ROC 0.8409709469882269
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[1.000]
|----- Best Model Stages
VectorAssembler_5c194b5a3f7a
StandardScalerModel: uid=StandardScaler_809c66dd9ca9, numFeatures=204, withMean=true, withStd=true
LogisticRegressionModel: uid=LogisticRegression_ea5aab9746c7, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.5727320147135642
    + Recall 0.7373129741612232
    + Accuracy 0.9099252122124863
    + F1-score 0.6446843409988705
    + AUC ROC 0.8409710149661672
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[0.500]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_3c0ef2cfc81e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6e22132333d3, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_03f3cbb6f2fe, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c84b4e0271d2, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_822ab60fcda3, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_76945b93e9c7, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_803ea13ec747, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ac2998a389a7, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_1786eb9683da, handleInvalid=k

|----- General Metrics
    + Precision 0.5740070507833459
    + Recall 0.7643222468698667
    + Accuracy 0.9033192428806134
    + F1-score 0.6556328992321817
    + AUC ROC 0.862136117955044
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: lambda=[0.000]; alfa=[0.000]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_d84db1472444, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4d0ad9b4923d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_871fc147d605, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6bfa480748cc, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_54f6507043d5, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_5a5e5b14447b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_92d87dad45ad, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_732dc42aff03, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_79b17ec7b214, handleInvalid=ke

|----- General Metrics
    + Precision 0.542475921593088
    + Recall 0.6991775951632087
    + Accuracy 0.8603932776560789
    + F1-score 0.610938567281237
    + AUC ROC 0.8017472680782937


### Decision Tree

In [0]:
# Run every Decision Tree experiment (names starting with "dt") and
# collect the results instead of printing the metrics right away.
dt_results = MLPipelines.run_experiments(
    test_df=test,
    filter="dt",
    print_metrics=False,
)

----------- Running Experiment dt_cv_nostd_only_numerical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment dt_cv_std_only_numerical -----------
    + StandardScaler: with std True, with mean True
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment dt_cv_nostd_all_features -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    + Number KFold: 2
    + Number of Features: 232
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment dt_cv_nostd_only_categorical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    +

In [0]:
# Print the training and test summary of each Decision Tree experiment
UtilityFunctions.summarize_batch(dt_results)

***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[5]; impurity=[entropy]
|----- Best Model Stages
VectorAssembler_d5f5d012734b
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_90ef744a67e5, depth=5, numNodes=29, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.8780786296069837
    + Recall 0.6013450135040904
    + Accuracy 0.9773155120481928
    + F1-score 0.7138296157931833
    + AUC ROC 0.4289720448772771
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[5]; impurity=[entropy]
|----- Best Model Stages
VectorAssembler_b2d043b044c4
StandardScalerModel: uid=StandardScaler_f87e38f6eabe, numFeatures=204, withMean=true, withStd=true
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_7a7eba5b5d59, depth=5, numNodes=29, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.8780786296069837
    + Recall 0.6013450135040904
    + Accuracy 0.9773155120481928
    + F1-score 0.7138296157931833
    + AUC ROC 0.4289720448772771
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]; impurity=[entropy]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_fd6da02d612d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c48965b6fec3, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4f1b3ebd223e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_643b64059e8c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_665dc61b7179, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_a943f0e249bf, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_dfb2ed3a9e9d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8b0051b2135f, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_088f59d41d5f, handleInval

|----- General Metrics
    + Precision 0.869923998363693
    + Recall 0.654836502584921
    + Accuracy 0.9790953587075575
    + F1-score 0.7472097922903491
    + AUC ROC 0.32499648690359983
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[5]; impurity=[gini]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_b656cd13ca3c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3b5980f39dc6, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_b310a6b5f724, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_bfced1688551, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_beab72050a26, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3b46e13cf1ea, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_a477d33b73c7, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3486eaa112b7, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3b80685960d8, handleInvalid=k

|----- General Metrics
    + Precision 0.732816245207917
    + Recall 0.5024268574235862
    + Accuracy 0.9734392113910186
    + F1-score 0.5961361975863677
    + AUC ROC 0.7074635101835314
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[3]; impurity=[entropy]
|----- Best Model Stages
VectorAssembler_c6cd42824f48
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c726835167f1, depth=3, numNodes=7, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.5418342244727902
    + Recall 0.6981960252198072
    + Accuracy 0.8586348233844469
    + F1-score 0.6101568924608747
    + AUC ROC 0.5358708989219462
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[3]; impurity=[entropy]
|----- Best Model Stages
VectorAssembler_2f7fc77a9e24
StandardScalerModel: uid=StandardScaler_8279da8ff4a7, numFeatures=204, withMean=true, withStd=true
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_273a49f052d8, depth=3, numNodes=7, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.5418342244727902
    + Recall 0.6981960252198072
    + Accuracy 0.8586348233844469
    + F1-score 0.6101568924608747
    + AUC ROC 0.5358708989219462
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[3]; impurity=[entropy]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_aefea88ec795, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_d4d662803cee, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_4886604b71e7, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_cf6c5f090a8a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_1d0617562c8e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8d5ed5009538, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_7051d65a6047, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_33a2823a96e1, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_9f2cf3bef427, handleInval

|----- General Metrics
    + Precision 0.5418342244727902
    + Recall 0.6981960252198072
    + Accuracy 0.8586348233844469
    + F1-score 0.6101568924608747
    + AUC ROC 0.5358708989219462
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[3]; impurity=[gini]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_1b13a59fca6a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_00fa768bae0c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_b183033c30df, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ea94a48eb9f4, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6a4abcbb2c6f, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ab8f8d94aecd, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_82a1902b5005, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_cd549af0e5ae, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_82ec0876d120, handleInvalid=

|----- General Metrics
    + Precision 0.5291488065813443
    + Recall 0.6927250968770087
    + Accuracy 0.7862001985213581
    + F1-score 0.5999877029273325
    + AUC ROC 0.6795285011417181


### Random Forest

In [0]:
# Run every Random Forest experiment (names starting with "rf") and
# collect the results instead of printing the metrics right away.
rf_results = MLPipelines.run_experiments(
    test_df=test,
    filter="rf",
    print_metrics=False,
)

----------- Running Experiment rf_cv_nostd_only_numerical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment rf_cv_std_only_numerical -----------
    + StandardScaler: with std True, with mean True
    + Only Numerical: True
    + Number KFold: 5
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment rf_cv_nostd_all_features -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    + Number KFold: 3
    + Number of Features: 232
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment rf_cv_nostd_only_categorical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    +

In [0]:
# Print the training and test summary of each Random Forest experiment
UtilityFunctions.summarize_batch(rf_results)

***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
VectorAssembler_d08426e39d28
RandomForestClassificationModel: uid=RandomForestClassifier_d4047a9dced2, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.9375942755043576
    + Recall 0.6165127687528482
    + Accuracy 0.978928498083242
    + F1-score 0.7438854934658496
    + AUC ROC 0.8459139391759181
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
VectorAssembler_76aaad38d380
StandardScalerModel: uid=StandardScaler_4754ec2ae0c0, numFeatures=204, withMean=true, withStd=true
RandomForestClassificationModel: uid=RandomForestClassifier_92dce755c9dd, numTrees=100, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.9352700786462955
    + Recall 0.6131228990094366
    + Accuracy 0.9787359665936474
    + F1-score 0.7406847102143052
    + AUC ROC 0.849082759528532
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_756b40d13dd5, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_e0483cda803e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_edd88f563c80, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_d0780898a5e1, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_575656600589, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_63a78c8b443f, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_de0c6d26451a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_08e3f4c84c6f, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8f30d913e204, handleInvalid=keep
StringIndexer

|----- General Metrics
    + Precision 0.9304897141147712
    + Recall 0.6130049856076292
    + Accuracy 0.9786589539978094
    + F1-score 0.7390952931831364
    + AUC ROC 0.8489835648438565
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_c852b556b32d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_089f3353a63d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_dc2abae149ad, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_0b0c02022f06, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_39083a11f843, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_cbd789706bca, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_fc343a3794e6, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_0879d598c59e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3d184f70111c, handleInvalid=keep
StringIndexe


Precision is ill-defined and being set to 0.0 in labels with no predicted samples. Use `zero_division` parameter to control this behavior.

|----- General Metrics
    + Precision 0.4867217449342826
    + Recall 0.5
    + Accuracy 0.9734434898685652
    + F1-score 0.4932715301279786
    + AUC ROC 0.7845890401460132
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
VectorAssembler_3f70174e0f8d
RandomForestClassificationModel: uid=RandomForestClassifier_10ccca7036e2, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.6124815088846494
    + Recall 0.7610122016387404
    + Accuracy 0.9397419222343921
    + F1-score 0.6787157421517545
    + AUC ROC 0.8661300212628554
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
VectorAssembler_faaf836a65cd
StandardScalerModel: uid=StandardScaler_ff853798acc1, numFeatures=204, withMean=true, withStd=true
RandomForestClassificationModel: uid=RandomForestClassifier_41f41a4f29cc, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.6116190623557974
    + Recall 0.7623734432473743
    + Accuracy 0.9390359734392114
    + F1-score 0.6787258716803928
    + AUC ROC 0.8649277396940455
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_79fd9bcb9609, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c1db21c75824, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6ce7166d2a35, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_9ef3af04a73e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_eff4bfd7c67c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ac8d2b1130eb, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_390ae10613e5, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ff3b3bd6e070, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8bf7f3d124e9, handleInvalid=keep
StringIndexe

|----- General Metrics
    + Precision 0.6166589708523571
    + Recall 0.7684994948251251
    + Accuracy 0.9410468578860898
    + F1-score 0.6842568837026629
    + AUC ROC 0.8729080896253194
***************** Training Summary *****************
|----- Best model parameters according to K-Fold Cross Validation: maxDepth=[8]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_8fdfca75c39c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_06ecf1d16507, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_12112299720c, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_e441b81efcba, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c9d1779ca19b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_281c227add5b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_dc969b9c81dc, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_68d0e632c0cb, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_fabf238941b6, handleInvalid=keep
StringIndexe

|----- General Metrics
    + Precision 0.5501643216938918
    + Recall 0.7012236104264074
    + Accuracy 0.8829879175794085
    + F1-score 0.6165765260854369
    + AUC ROC 0.8106930460246264


### Gradient Boosted Tree

In [0]:
# Evaluate the Gradient Boosted Tree ("gbt") experiments against `test`.
# Metric printing is switched off for this call; the collected results are
# kept in `gbt_results` and reported by the summarize_batch cell below.
gbt_results = MLPipelines.run_experiments(
    test,
    "gbt",
    print_metrics=False,
)

----------- Running Experiment gbt_cv_nostd_only_numerical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: True
    + Number KFold: 0
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment gbt_cv_std_only_numerical -----------
    + StandardScaler: with std True, with mean True
    + Only Numerical: True
    + Number KFold: 0
    + Number of Features: 205
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment gbt_cv_nostd_all_features -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
    + Number KFold: 0
    + Number of Features: 232
    + Loading Pre-trained Model ...
    + Running Best Model on Test set ...


----------- Running Experiment gbt_cv_nostd_only_categorical -----------
    + StandardScaler: with std False, with mean False
    + Only Numerical: False
 

In [0]:
# Print the training/test summary for every Gradient Boosted Tree
# experiment collected in `gbt_results`.
UtilityFunctions.summarize_batch(gbt_results)

***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
VectorAssembler_bbd68a87303a
GBTClassificationModel: uid = GBTClassifier_b05196297186, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.9328288911771181
    + Recall 0.7251172820743921
    + Accuracy 0.983814519441402
    + F1-score 0.8159617738183462
    + AUC ROC 0.9115445765054955
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
VectorAssembler_413709d774c0
StandardScalerModel: uid=StandardScaler_188b21694e41, numFeatures=204, withMean=true, withStd=true
GBTClassificationModel: uid = GBTClassifier_f14a853c5eb9, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.9325474392208113
    + Recall 0.7235083954140278
    + Accuracy 0.9837332283680176
    + F1-score 0.8148347263265827
    + AUC ROC 0.9139045956817043
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_2630fe5f4c49, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_50e9c0c91077, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c338ad82f8ef, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3d3cee8f957b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_059822e03064, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_66e8c01dad4e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_5c19e7d8a982, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8b424a612a15, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_012c390749ab, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_e7

|----- General Metrics
    + Precision 0.9374035210695222
    + Recall 0.7359423130052222
    + Accuracy 0.9844477341182913
    + F1-score 0.824545532031759
    + AUC ROC 0.9200487888721999
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_122e149e6196, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_5213fbdd964b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_7ec715ff3b02, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_fd3b5f193c52, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_ad7720e68ef9, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_6dbf7037518b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_f24dac94c1af, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_99d267dcc110, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_b24b3b14e876, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_a05

|----- General Metrics
    + Precision 0.8211805182853962
    + Recall 0.5433826556694986
    + Accuracy 0.9746115142387732
    + F1-score 0.6540045332114808
    + AUC ROC 0.845602512759801
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
VectorAssembler_32cc2ec0ab7e
GBTClassificationModel: uid = GBTClassifier_940598f24a77, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.6372501298653945
    + Recall 0.8456898428155795
    + Accuracy 0.9428951601861993
    + F1-score 0.7268210070374986
    + AUC ROC 0.9308226051522023
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
VectorAssembler_956db7063fe6
StandardScalerModel: uid=StandardScaler_8682dc2d7cec, numFeatures=204, withMean=true, withStd=true
GBTClassificationModel: uid = GBTClassifier_a69a913fae77, numTrees=50, numClasses=2, numFeatures=204

***************** Test Summary *****************


|----- General Metrics
    + Precision 0.6376730960284476
    + Recall 0.8447554725759228
    + Accuracy 0.9432117675246441
    + F1-score 0.7267504809241503
    + AUC ROC 0.9316931887166953
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_f6f4047394fc, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_646ff1099289, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_bd460ccb1263, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_fe03a4940332, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3e2c3107ad2a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_773bee4cd0da, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_5b626140b31e, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_04a01f873604, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_27459499ea7d, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_72

|----- General Metrics
    + Precision 0.6520696732515474
    + Recall 0.859711477249596
    + Accuracy 0.9486839403066812
    + F1-score 0.7416308661804886
    + AUC ROC 0.9408700594404137
***************** Training Summary *****************
|----- Model parameters: maxDepth=[8]; maxIter=[50]
|----- Best Model Stages
StringIndexerModel: uid=StringIndexer_bdd968aea43a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_b44389659aeb, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_0459f3ebad8b, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_8f99bb2c43db, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_add958999fe0, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_c3cb83e21ed9, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_30cec28e7673, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_addfe4b22a6a, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_d5ae39821ebc, handleInvalid=keep
StringIndexerModel: uid=StringIndexer_3dd

|----- General Metrics
    + Precision 0.559317720235051
    + Recall 0.748121032558256
    + Accuracy 0.8823033611719606
    + F1-score 0.6400871161213421
    + AUC ROC 0.8501134973494732
