# Set Up

In [0]:
blob_container = "blobcontainer" # The name of your container created in https://portal.azure.com
storage_account = "w261section05group03" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261section05group04" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "houseofthedragon" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
from pyspark.sql.functions import col, when, to_timestamp

##SMOTE

In [0]:
dataset = spark.read.parquet(f"{blob_url}/dev_rank_set_1")

In [0]:
#get current count of undelayed and delayed flights
dataset.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|  0.0|700180|
|  1.0|128046|
+-----+------+



In [0]:
#to use in SMOTE
#continuous flight features, continuous graph features (page rank), categorical flight features
flight_features_cont = ['DISTANCE', 'PREV_DEP_DELAY', 'CRS_DEP_TIME', 'CRS_ARR_TIME','PageRank_origin','PageRank_dest']
flight_features_cat = ['DAY_OF_MONTH']
target_col = ['label']

#dataset_sample = dataset.select('DISTANCE', 'PREV_DEP_DELAY', 'CRS_DEP_TIME', 'CRS_ARR_TIME','PageRank_origin','PageRank_dest','DAY_OF_MONTH','label').limit(10)
dataset_sample = dataset.select('DISTANCE', 'PREV_DEP_DELAY', 'CRS_DEP_TIME', 'CRS_ARR_TIME','PageRank_origin','PageRank_dest','DAY_OF_MONTH','label')
#dataset_sample = dataset.withColumns(features_to_keep,target_col).limit(10)
display(dataset_sample)
#display(dataset)

DISTANCE,PREV_DEP_DELAY,CRS_DEP_TIME,CRS_ARR_TIME,PageRank_origin,PageRank_dest,DAY_OF_MONTH,label
406.0,1.0,1905.0,2031.0,0.0660454647468901,0.0105710820679648,31,1.0
508.0,0.0,1905.0,2051.0,0.0660454647468901,0.0010472549080134,31,0.0
551.0,0.0,1905.0,2002.0,0.0660454647468901,0.0011619875019345,31,1.0
270.0,15.0,1905.0,2015.0,0.0660454647468901,0.0032863159328023,31,0.0
606.0,0.0,1905.0,2015.0,0.0660454647468901,0.0545458420388175,31,1.0
563.0,0.0,1905.0,2000.0,0.0660454647468901,0.000992030384349354,31,0.0
577.0,0.0,1905.0,2053.0,0.0660454647468901,0.0163048185755017,31,0.0
270.0,6.0,1905.0,2011.0,0.0660454647468901,0.0032863159328023,31,0.0
332.0,0.0,1905.0,1929.0,0.0660454647468901,0.0026759716363659,31,0.0
581.0,1.0,1905.0,2056.0,0.0660454647468901,0.0118723501099944,31,0.0


In [0]:
#Making sure label columns has only two classes
dataset_sample.select('label').distinct().count()

Out[71]: 2

In [0]:
import random
import numpy as np
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.functions import rand,col,when,concat,substring,lit,udf,lower,sum as ps_sum,count as ps_count,row_number
from pyspark.sql.window import *
from pyspark.sql import DataFrame
from pyspark.ml.feature import VectorAssembler,BucketedRandomProjectionLSH,VectorSlicer
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.sql.functions import array, create_map, struct
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline 

############################## spark smote oversampling ##########################
#for categorical columns, must take its stringIndexed form (smote should be after string indexing, default by frequency)

def pre_smote_df_process(df,num_cols,cat_cols,target_col,index_suffix="_index"):
    '''
    string indexer and vector assembler
    inputs:
    * df: spark df, original
    * num_cols: numerical cols to be assembled
    * cat_cols: categorical cols to be stringindexed
    * target_col: prediction target
    * index_suffix: will be the suffix after string indexing
    output:
    * vectorized: spark df, after stringindex and vector assemble, ready for smote
    '''
    #if(df.select(target_col).distinct().count() != 2):
        #raise ValueError("Target col must have exactly 2 classes")
        
    #num_cols_list = num_cols.columns
        
    if target_col in num_cols:
    #if array_contains()
        num_cols.remove(target_col)

    # only assembled numeric columns into features
    assembler = VectorAssembler(inputCols = num_cols, outputCol = 'features')
    #assembler.setInputCols(num_cols)
    # index the string cols, except possibly for the label col
    assemble_stages = [StringIndexer(inputCol=column, outputCol=column+index_suffix).fit(df).setHandleInvalid("keep") for column in list(set(cat_cols)-set([target_col]))]
    # add the stage of numerical vector assembler
    assemble_stages.append(assembler)
    pipeline = Pipeline(stages=assemble_stages)
    pos_vectorized = pipeline.fit(df).transform(df)
    
    # drop original num cols and cat cols
    drop_cols = num_cols+cat_cols
    
    keep_cols = [a for a in pos_vectorized.columns if a not in drop_cols]
    
    vectorized = pos_vectorized.select(*keep_cols).withColumn('label',pos_vectorized[target_col]).drop(target_col)
    
    return vectorized

def smote(vectorized_sdf,smote_config):
    '''
    contains logic to perform smote oversampling, given a spark df with 2 classes
    inputs:
    * vectorized_sdf: cat cols are already stringindexed, num cols are assembled into 'features' vector
      df target col should be 'label'
    * smote_config: config obj containing smote parameters
    output:
    * oversampled_df: spark df after smote oversampling
    '''
    dataInput_min = vectorized_sdf[vectorized_sdf['label'] == 1]
    dataInput_maj = vectorized_sdf[vectorized_sdf['label'] == 0]
    
    # LSH, bucketed random projection
    brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=smote_config.seed, bucketLength=smote_config.bucketLength)
    # smote only applies on existing minority instances    
    model = brp.fit(dataInput_min)
    model.transform(dataInput_min)

    # here distance is calculated from brp's param inputCol
    self_join_w_distance = model.approxSimilarityJoin(dataInput_min, dataInput_min, float("inf"), distCol="EuclideanDistance")

    # remove self-comparison (distance 0)
    self_join_w_distance = self_join_w_distance.filter(self_join_w_distance.EuclideanDistance > 0)

    over_original_rows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")

    self_similarity_df = self_join_w_distance.withColumn("r_num", F.row_number().over(over_original_rows))

    self_similarity_df_selected = self_similarity_df.filter(self_similarity_df.r_num <= smote_config.k)

    over_original_rows_no_order = Window.partitionBy('datasetA')

    # list to store batches of synthetic data
    res = []
    
    # two udf for vector add and subtract, subtraction include a random factor [0,1]
    subtract_vector_udf = F.udf(lambda arr: random.uniform(0, 1)*(arr[0]-arr[1]), VectorUDT())
    add_vector_udf = F.udf(lambda arr: arr[0]+arr[1], VectorUDT())
    
    # retain original columns
    original_cols = dataInput_min.columns
    
    for i in range(smote_config.multiplier):
        print("generating batch %s of synthetic instances"%i)
        # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
        df_random_sel = self_similarity_df_selected.withColumn("rand", F.rand()).withColumn('max_rand', F.max('rand').over(over_original_rows_no_order))\
                            .where(F.col('rand') == F.col('max_rand')).drop(*['max_rand','rand','r_num'])
        # create synthetic feature numerical part
        df_vec_diff = df_random_sel.select('*', subtract_vector_udf(F.array('datasetA.features', 'datasetB.features')).alias('vec_diff'))
        df_vec_modified = df_vec_diff.select('*', add_vector_udf(F.array('datasetA.features', 'vec_diff')).alias('features'))
        
        # for categorical cols, either pick original or the neighbour's cat values
        for c in original_cols:
            # randomly select neighbour or original data
            col_sub = random.choice(['datasetA','datasetB'])
            val = "{0}.{1}".format(col_sub,c)
            if c != 'features':
                # do not unpack original numerical features
                df_vec_modified = df_vec_modified.withColumn(c,F.col(val))
        
        # this df_vec_modified is the synthetic minority instances,
        df_vec_modified = df_vec_modified.drop(*['datasetA','datasetB','vec_diff','EuclideanDistance'])
        
        res.append(df_vec_modified)
    
    dfunion = reduce(DataFrame.unionAll, res)
    # union synthetic instances with original full (both minority and majority) df
    oversampled_df = dfunion.union(vectorized_sdf.select(dfunion.columns))
    
    return oversampled_df

In [0]:
#separate columns into numerical and categorical features
num_cols_test = ['DISTANCE', 'PREV_DEP_DELAY', 'CRS_DEP_TIME', 'CRS_ARR_TIME','PageRank_origin','PageRank_dest']
cat_cols_test = ['DAY_OF_MONTH']
target_col_test = 'label' 

#pre-process dataset
sample_processed = pre_smote_df_process(dataset_sample,num_cols = num_cols_test, cat_cols = cat_cols_test,target_col = target_col_test)

#look to see if num features got vectorized
display(sample_processed)

label,DAY_OF_MONTH_index,features
1.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(406.0, 1.0, 1905.0, 2031.0, 0.06604546474689019, 0.01057108206796489))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(508.0, 0.0, 1905.0, 2051.0, 0.06604546474689019, 0.0010472549080134436))"
1.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(551.0, 0.0, 1905.0, 2002.0, 0.06604546474689019, 0.0011619875019345857))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(270.0, 15.0, 1905.0, 2015.0, 0.06604546474689019, 0.003286315932802374))"
1.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(606.0, 0.0, 1905.0, 2015.0, 0.06604546474689019, 0.054545842038817516))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(563.0, 0.0, 1905.0, 2000.0, 0.06604546474689019, 9.92030384349354E-4))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(577.0, 0.0, 1905.0, 2053.0, 0.06604546474689019, 0.016304818575501732))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(270.0, 6.0, 1905.0, 2011.0, 0.06604546474689019, 0.003286315932802374))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(332.0, 0.0, 1905.0, 1929.0, 0.06604546474689019, 0.0026759716363659604))"
0.0,29.0,"Map(vectorType -> dense, length -> 6, values -> List(581.0, 1.0, 1905.0, 2056.0, 0.06604546474689019, 0.011872350109994463))"


In [0]:
#create smote_config class

class smote_config():
    def __init__(self,seed,bucketLength,k,multiplier):
        self.seed = seed
        self.bucketLength = bucketLength
        self.k = k
        self.multiplier = multiplier
    

In [0]:
#create parameters for smote
#12345 for seed, 1 bucket length, k=1, 4 multipler since 20% is delays and 80% is undelayed to get 50/50
smote_params = smote_config(12345,1,1,4)

In [0]:
sample_smote = smote(sample_processed,smote_params)
display(sample_smote)

generating batch 0 of synthetic instances
generating batch 1 of synthetic instances
generating batch 2 of synthetic instances
generating batch 3 of synthetic instances


features,label,DAY_OF_MONTH_index
"Map(vectorType -> dense, length -> 6, values -> List(66.71041092564671, 0.0, 908.0694061709781, 952.2836068791328, 0.0553730355038737, 0.005686689057799806))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(94.86605603360361, -1.7094239816707562, 1931.5584639511221, 2036.122599923628, -0.0031722599112011165, 0.01644145345358648))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(143.0, 0.0, 1900.0, 2011.9341736933038, 4.8656889527088487E-4, 0.06615914234761744))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(147.73562279903288, 0.0, 1029.1697591074692, 1139.0, 1.8181363624707712E-4, 0.05452496623160936))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(218.28244598606676, 866.1217766101306, 271.235284539521, 449.5521783451886, 2.808524486275332E-4, 0.0740532865552374))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(170.0, 0.0, 1757.0, 2015.5543511334183, 2.712590076892598E-4, 0.06615914234761744))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(170.18176770481597, -0.8484960655964535, 1250.0, 1359.7070800546637, -0.003487803730060349, 0.024325950472598067))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(184.0, -1.609825153714242, 1500.0, 1617.3901748462858, 0.019790389897284495, 0.016588887517272822))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(220.01189821182186, -0.8111323841032932, 857.1669857615494, 952.9224562251362, 0.07537161671482673, -0.05153817114569043))",1.0,0.0
"Map(vectorType -> dense, length -> 6, values -> List(191.97414840318788, 4.461222604781834, 1339.4612226047818, 1446.3578162175334, 0.033682293197784825, 0.004012311539358739))",1.0,0.0


In [0]:
#See if SMOTE gets balances out the label counts (roughly 50/50 and could increase the multiplier)
sample_smote.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|  1.0|634762|
|  0.0|700180|
+-----+------+

