## Implementing Oversampling technique (SMOTE)

**This notebook demonstrates an oversampling technique (SMOTE) applied to handle an imbalanced dataset.**

The dataframe produced by the SMOTE implementation was then fed to two models - Logistic Regression and Random Forest. However, due to limited cluster memory, both model runs failed and aborted with this error message - "**Error: java.lang.OutOfMemoryError: GC overhead limit exceeded**"

We are attaching this notebook only as a reference, to showcase the implementation logic of the SMOTE technique.

### Section 1 - Setup Environment

In [0]:
import random
from functools import reduce
from pyspark.sql import Row
from pyspark.sql.functions import rand,udf,lower,sum as ps_sum,count as ps_count,row_number
from pyspark.sql.window import *
from pyspark.sql import DataFrame
from pyspark.ml.feature import VectorAssembler,BucketedRandomProjectionLSH,VectorSlicer
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.sql.functions import array, create_map, struct

from pyspark.sql.functions import col, concat, count, countDistinct, expr, lit, length, max as pyspark_max, min as pyspark_min, mean, substring, split, sum as pyspark_sum, when, to_utc_timestamp, to_timestamp, unix_timestamp, isnan
import pandas as pd
from html import escape
from IPython.display import HTML, display as ipython_display
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType, StringType, BooleanType, DateType, DoubleType
from pandas.tseries.holiday import USFederalHolidayCalendar
from pyspark.sql.functions import substring
from pyspark.sql import functions as F

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, max, substring
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Imputer
from pyspark.ml.classification import LinearSVC as svc
from pyspark.ml import Pipeline

import numpy as np
import itertools
import time

In [0]:
# Azure Blob Storage coordinates. The container and storage account are created in
# https://portal.azure.com; the secret scope and key are created locally with the
# Databricks CLI.
blob_container = "w261-container"
storage_account = "w261storageaccount"
secret_scope = "w261scope"
secret_key = "w261key"
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

# Hand the secret stored under (secret_scope, secret_key) to Spark under the
# container-specific "fs.azure.sas..." configuration key.
sas_conf_key = f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net"
sas_conf_value = dbutils.secrets.get(scope=secret_scope, key=secret_key)
spark.conf.set(sas_conf_key, sas_conf_value)

### Section 2 - Load Data

In [0]:
# Load the pre-split train and test sets from blob storage.
# These paths point at the full dataset; point them at the toy train/test
# folders instead when working with the toy dataset.

parquet_reader = spark.read.format("parquet")
train_df = parquet_reader.load(f"{blob_url}/original_train_data_full_v2/*")
test_df = parquet_reader.load(f"{blob_url}/original_test_data_full_v2/*")

In [0]:
# Preparing the final dataset for modeling.
# Split the columns by Spark dtype, using the train schema (the test data is
# expected to share it). The label column is kept out of the numeric features.

print("\n----Categorical columns train-----")
cat_cols = [name for name, dtype in train_df.dtypes if dtype == "string"]
print(cat_cols)

print("\n----Numeric columns train-----")
num_cols = [
    name
    for name, dtype in train_df.dtypes
    if dtype in ("double", "int") and name != "departure_delay_boolean"
]
print(num_cols)

In [0]:
# Feature-engineering stages (only defined here; they are fitted later as part
# of the pipeline).
from pyspark.ml.feature import Imputer, OneHotEncoder, StandardScaler, StringIndexer

# Categorical columns: index each string column, then one-hot encode the indices
# into binary sparse vectors. handleInvalid="skip" drops rows whose category was
# not seen when the indexer was fitted.
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in cat_cols
]

one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in cat_cols],
        outputCols=[f"{x}_OneHotEncoder" for x in cat_cols],
    )
]

# Numeric columns: median imputation. A single multi-column Imputer replaces the
# former one-stage-per-column list, mirroring the multi-column OneHotEncoder above
# and keeping the pipeline short. Output column names are unchanged
# ("<col>_Imputed"). `imputer` stays a list so `stages += imputer` keeps working,
# and it is empty when there are no numeric columns.
imputer = (
    [
        Imputer(
            inputCols=list(num_cols),
            outputCols=[f"{x}_Imputed" for x in num_cols],
        ).setStrategy("median")
    ]
    if num_cols
    else []
)

# Assemble the imputed numeric columns into one vector and standardise it.
scaler_vector_assembler = VectorAssembler(inputCols = [f"{x}_Imputed" for x in num_cols], outputCol="numeric_vec")
scaler_vector_assembler.setHandleInvalid('keep')

scaler = StandardScaler(inputCol="numeric_vec", outputCol="scaled_features")

In [0]:
# Final feature vector: the scaled numeric block first, followed by one
# one-hot-encoded column per categorical feature.
# (VectorAssembler is already imported in the setup cell.)
assembler_input = ["scaled_features", *(f"{x}_OneHotEncoder" for x in cat_cols)]

vector_assembler = VectorAssembler(
    outputCol="VectorAssembler_features",
    inputCols=assembler_input,
)

### Section 3 - Imbalanced data handling

#### 3.0.  Setting up Pipeline

In [0]:
# Pipeline stage order: impute -> assemble + scale numerics -> index and one-hot
# encode categoricals -> assemble the final feature vector.
stages = [
    *imputer,
    scaler_vector_assembler,
    scaler,
    *string_indexer,
    *one_hot_encoder,
    vector_assembler,
]

In [0]:
# Fit the feature pipeline on the training data only, then apply the fitted
# model to both splits. (Pipeline is already imported in the setup cell.)
general_pipeline = Pipeline(stages=stages)
pipeline_model = general_pipeline.fit(train_df)

# Keep only the label and the assembled feature vector for modeling.
selectedcols = ["departure_delay_boolean", "VectorAssembler_features"]
model_data_train = pipeline_model.transform(train_df).select(selectedcols)
model_data_test = pipeline_model.transform(test_df).select(selectedcols)

In [0]:
# Persist the transformed train/test sets to blob storage ...
model_data_train.write.parquet(f"{blob_url}/train_data_pre_upsample", mode='overwrite')
model_data_test.write.parquet(f"{blob_url}/test_data_pre_upsample", mode='overwrite')

# ... and rebind train_df / test_df to the stored copies, so the cells below
# work from the saved features rather than from the raw input frames.
pre_upsample_reader = spark.read.format("parquet")
train_df = pre_upsample_reader.load(f"{blob_url}/train_data_pre_upsample/*")
test_df = pre_upsample_reader.load(f"{blob_url}/test_data_pre_upsample/*")

#### 3.1. Oversampling (SMOTE)
SMOTE (Synthetic Minority Over-sampling Technique) is a method for handling imbalanced data: it upsamples the minority class by creating synthetic samples.

In [0]:
# SMOTE Implementation for handling imbalanced data between majority and minority class

def smote(df, range_multiplier=10, k_neighbors=2, bucket_length=0.55, seed=12345):
    '''
    Perform SMOTE oversampling of the minority class of a two-class spark df.

    For every minority row the nearest minority neighbours are looked up with a
    bucketed-random-projection LSH self-join. Each batch then picks one of those
    neighbours at random and creates one synthetic row on the line segment between
    the row and that neighbour:
        synthetic = row + gap * (neighbour - row),  gap ~ U(0, 1)

    inputs:
    * df: spark df with the feature vector in 'VectorAssembler_features' and the
      target in 'departure_delay_boolean' (1 = minority class). Every other column
      is copied to the synthetic row from either the row or its neighbour.
    * range_multiplier: number of synthetic batches to generate; each batch yields
      at most one synthetic row per minority row that has a neighbour. Values < 1
      return df unchanged.
    * k_neighbors: how many nearest neighbours per row are candidates for the
      random pick.
    * bucket_length: bucketLength of the LSH model.
    * seed: seed of the LSH model (the random neighbour pick and the random gap
      are NOT seeded).
    output:
    * oversampled_df: synthetic minority rows unioned with the original df
      (both classes), with columns in the order of the synthetic rows.
    '''
    label_col = 'departure_delay_boolean'
    features_col = 'VectorAssembler_features'

    # nothing to generate: also avoids reduce() failing on an empty batch list
    if range_multiplier < 1:
        return df

    # smote only applies on existing minority instances
    dataInput_min = df.filter(F.col(label_col) == 1)

    # LSH, bucketed random projection
    brp = BucketedRandomProjectionLSH(inputCol=features_col, outputCol="hashes", seed=seed, bucketLength=bucket_length)
    model = brp.fit(dataInput_min)

    # here distance is calculated from brp's param inputCol
    # NOTE(review): the threshold is infinite, so every candidate pair produced by
    # the LSH join is kept before ranking - presumably the most memory-hungry step;
    # confirm on the cluster.
    self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")

    # remove self-comparison (distance 0)
    self_join_w_distance = self_join_w_distance.filter(F.col("EuclideanDistance") > 0)

    # rank the neighbours of each original row by distance, keep the closest ones
    over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")
    self_similarity_df_selected = (
        self_join_w_distance
        .withColumn("r_num", F.row_number().over(over_original_rows))
        .filter(F.col("r_num") <= k_neighbors)
    )

    over_original_rows_no_order = Window.partitionBy('datasetA')

    def _interpolate(sample, neighbour):
        # SMOTE step: move from the sample TOWARDS its neighbour by a random
        # fraction. toArray() makes this work for sparse and dense vectors alike
        # (SparseVector does not support arithmetic operators).
        gap = random.uniform(0, 1)
        base = sample.toArray()
        return Vectors.dense(base + gap * (neighbour.toArray() - base))

    # the result depends on a random draw, so tell Spark not to treat it as pure
    interpolate_udf = F.udf(_interpolate, VectorUDT()).asNondeterministic()

    # retain original columns
    original_cols = dataInput_min.columns

    # list to store batches of synthetic data
    res = []
    for i in range(range_multiplier):
        print("generating batch %s of synthetic instances"%i)
        # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
        df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                            .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])

        # create the synthetic feature vector
        df_vec_modified = df_random_sel.select(
            '*',
            interpolate_udf(
                F.col('datasetA.' + features_col),
                F.col('datasetB.' + features_col),
            ).alias(features_col),
        )

        # for all other cols, either pick the original's or the neighbour's value
        for c in original_cols:
            if c == features_col:
                # the feature vector was synthesised above; do not overwrite it
                continue
            col_sub = random.choice(['datasetA','datasetB'])
            df_vec_modified = df_vec_modified.withColumn(c, F.col("{0}.{1}".format(col_sub, c)))

        # this df_vec_modified is the synthetic minority instances
        df_vec_modified = df_vec_modified.drop(*['datasetA','datasetB','EuclideanDistance'])

        res.append(df_vec_modified)

    dfunion = reduce(DataFrame.union, res)
    # union synthetic instances with original full (both minority and majority) df
    oversampled_df = dfunion.union(df.select(dfunion.columns))

    return oversampled_df

In [0]:
# Run SMOTE on the pre-upsample training features that were written to blob
# storage, not on model_data_train: the latter still carries the lineage of the
# whole feature pipeline, which would be re-evaluated for every synthetic batch.
# A dedicated name is used so that re-running this cell always starts from the
# same, non-oversampled data.
pre_upsample_train_df = spark.read.parquet(f"{blob_url}/train_data_pre_upsample/*")
oversampled_train_df = smote(pre_upsample_train_df)

In [0]:
# Save the oversampled training set, then rebind train_df to the stored copy so
# the model cells below train on the upsampled data.
oversampled_train_df.write.parquet(f"{blob_url}/train_data_upsample", mode='overwrite')
train_df = spark.read.format("parquet").load(f"{blob_url}/train_data_upsample/*")

### Section 4 - ML section

#### 4.0. Logistic regression

In [0]:
# Train logistic regression on the (upsampled) train_df, score test_df and save
# the predictions. (LogisticRegression is already imported in the setup cell.)

start_lr = time.time()

lr = LogisticRegression(featuresCol='VectorAssembler_features',
                        labelCol='departure_delay_boolean', 
                        regParam = 0.2, 
                        fitIntercept=True, 
                        elasticNetParam=0.0, 
                        maxIter=10, 
                        threshold = 0.6)

# Setup pipeline
stages_lr = [lr]
pipeline_lr = Pipeline(stages=stages_lr)
lr_model = pipeline_lr.fit(train_df)
lrPredictions = lr_model.transform(test_df)

lrPredictions.write.mode('overwrite').parquet(f"{blob_url}/lrPredictions_upsample")

# Measure the elapsed time once, so the printed value and lr_time_model agree.
lr_time_model = time.time() - start_lr
print(f'lr model completed job in {lr_time_model} seconds.')

In [0]:
# Logistic Regression Model Evaluation with scikit learn
from sklearn.metrics import classification_report, confusion_matrix

lrPredictions = spark.read.parquet(f"{blob_url}/lrPredictions_upsample/*")

# Collect label and prediction together in ONE pass: two separate collect() calls
# scan the data twice and give no guarantee that the rows come back in the same
# order, which would silently misalign y_true and y_pred.
lr_eval_rows = lrPredictions.select('departure_delay_boolean', 'prediction').collect()

# Flatten the Rows into plain integer label lists for scikit-learn.
y_true_lr = [int(row['departure_delay_boolean']) for row in lr_eval_rows]
y_pred_lr = [int(row['prediction']) for row in lr_eval_rows]

# Print metrics
print(classification_report(y_true_lr, y_pred_lr))
print(confusion_matrix(y_true_lr, y_pred_lr))