In [1]:
%%configure -f
{
"kind": "pyspark",

"conf": {
   
    "spark.master": "yarn",
    "spark.yarn.dist.archives": "/use_case/hhmatching/Library_zip/extra_lib_3.zip#lcm_zip", 
    "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "./lcm_zip/bin/python",
    "spark.driver.cores": "2", 
    "spark.driver.memory": "6000m",  
    "spark.executor.memory": "6000m",
    "spark.executor.cores": "2", 
     "spark.dynamicAllocation.enabled": "true",
     "spark.dynamicAllocation.minExecutors": "2"
    
      
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1160,application_1620749185953_126168,pyspark,idle,Link,Link,
1161,application_1620749185953_126259,pyspark,idle,Link,Link,


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import QuantileDiscretizer
import pyspark.sql.functions as F
from pyspark.sql.functions import lit,log, when, regexp_replace, col
from pyspark.sql.types import DoubleType,IntegerType, StringType, StructField, StructType
import numpy as np
import pandas as pd
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import dataframe
def spark_shape(self):
    return (self.count(),len(self.columns))
dataframe.DataFrame.shape = spark_shape

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
from time import time
from datetime import timedelta

spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .config("spark.sql.crossJoin.enabled","true")\
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1162,application_1620749185953_126280,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Exploratory data analysis

## read the csv file, get the shape of the dataset, check data types

In [3]:
# 1)
# read the dataset
df=spark.read.csv("dataset_test.csv",header=True, inferSchema="true").withColumn("rowId", F.monotonically_increasing_id())
df=df.drop('_c0') 
# check the shape of the dataset
row_num=int(df.count())
col_num=len(df.columns)
print('number of rows = {}, number of columns = {}.'.format(row_num, col_num))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

number of rows = 2540047, number of columns = 50.

In [4]:
# 2) 
# check the data types

# select columns with distinct values less than 10 and convert them to string:
convert_to_cat = list(filter(None, [field if df.groupBy(field ).count().count() < 10 else None for field in df.columns ]))
convert_to_cat=['is_sm_ips_ports', 'ct_state_ttl', 'is_ftp_login', 'ct_ftp_cmd']
for field in convert_to_cat:
    df=df.withColumn(field,F.col(field).cast(StringType()))
    
## the function dtype(df) returns two lists populated with the column names of continuous (1st list) and categorical (2nd list) columns
## the function prints the values in two lists as well as their lenghts
## the input of the function is a spark dataframe
def dtype(df):
    categorical = [field for (field, dataType) in df.dtypes if dataType == 'string']
    numerical = [field for (field, dataType) in df.dtypes if dataType == 'double' or dataType == 'int' or dataType == 'bigint' ]
    #list of categorical type columns' names
    print('categorical => ', categorical, 'len = ', len(categorical))
    print()
    #list of continuous type columns' names
    print('numerical => ', numerical, 'len = ', len(numerical))
    return numerical, categorical
numerical, categorical = dtype(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

categorical =>  ['srcip', 'sport', 'dstip', 'dsport', 'proto', 'state', 'service', 'is_sm_ips_ports', 'ct_state_ttl', 'is_ftp_login', 'ct_ftp_cmd', 'attack_cat'] len =  12

numerical =>  ['dur', 'sbytes', 'dbytes', 'sttl', 'dttl', 'sloss', 'dloss', 'Sload', 'Dload', 'Spkts', 'Dpkts', 'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz', 'trans_depth', 'res_bdy_len', 'Sjit', 'Djit', 'Stime', 'Ltime', 'Sintpkt', 'Dintpkt', 'tcprtt', 'synack', 'ackdat', 'ct_flw_http_mthd', 'ct_srv_src', 'ct_srv_dst', 'ct_dst_ltm', 'ct_src_ ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'Label', 'rowId'] len =  38

In [5]:
# 3)
# check descriptive statistics
## the function statist
def statistics(df):
    statistics=df.describe().toPandas().transpose()
    statistics.columns=statistics.iloc[0,:].values
    statistics=statistics.iloc[1:,:]
    return statistics
st=statistics(df)
# remove string values and convert to integer
st_=st[~pd.to_numeric(st['max'], errors='coerce').isnull()]
st_=st_[~pd.to_numeric(st_['min'], errors='coerce').isnull()]
st_=st_.astype('float')
st_=st_.round(decimals=2)
st_.head(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

            count         mean        stddev  min           max
dur     2540047.0         0.66  1.392000e+01  0.0  8.786640e+03
sbytes  2540047.0      4339.60  5.640599e+04  0.0  1.435577e+07
dbytes  2540047.0     36427.59  1.610960e+05  0.0  1.465753e+07
sttl    2540047.0        62.78  7.462000e+01  0.0  2.550000e+02
dttl    2540047.0        30.77  4.285000e+01  0.0  2.540000e+02
sloss   2540047.0         5.16  2.252000e+01  0.0  5.319000e+03
dloss   2540047.0        16.33  5.659000e+01  0.0  5.507000e+03
Sload   2540047.0  36956447.51  1.186043e+08  0.0  5.988000e+09
Dload   2540047.0   2450861.22  4.224863e+06  0.0  1.287619e+08
Spkts   2540047.0        33.29  7.628000e+01  0.0  1.064600e+04

In [6]:
# 4) 
# check disticnt values of categorical columns and optimize categories
list_of_columns_to_transform=[]
for col in categorical:
    d=df.groupBy(col).count().count()
    print("Number of distinct values in column {} is {}".format(col,d))
    if d > 15: 
        list_of_columns_to_transform.append(col)
        
#list_of_columns_to_transform=['srcip', 'dstip', 'dsport', 'sport', 'proto', 'state']
print(' after optimization:')
for column in list_of_columns_to_transform:
    c=df.groupBy(column).count()
    c=c.withColumn('ratio', c['count']/df.count()) #calculate a normalized count of unique values
    c=c.where(c['ratio']>0.009).select(column).toPandas() # to create a list of values which occur more that 0.01 times 
    # if a value repeates rarely than in 1% of rows, change this value to a sting "other"
    df=df.withColumn(column, when(F.col(column).isin([i[0] for i in c.values]), F.col(column)).otherwise('other'))
    print("Number of distinct values in column {} is {}".format(column,df.groupBy(column).count().count()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of distinct values in column srcip is 44
Number of distinct values in column sport is 64600
Number of distinct values in column dstip is 47
Number of distinct values in column dsport is 64630
Number of distinct values in column proto is 135
Number of distinct values in column state is 16
Number of distinct values in column service is 13
Number of distinct values in column is_sm_ips_ports is 2
Number of distinct values in column ct_state_ttl is 7
Number of distinct values in column is_ftp_login is 5
Number of distinct values in column ct_ftp_cmd is 9
Number of distinct values in column attack_cat is 14
 after optimization:
Number of distinct values in column srcip is 19
Number of distinct values in column sport is 4
Number of distinct values in column dstip is 20
Number of distinct values in column dsport is 11
Number of distinct values in column proto is 3
Number of distinct values in column state is 4

In [7]:
# 5.1) 
#check lable attribute and the distribution of classes

# label attribute value counts
print(df.groupBy('Label').count().show())

#check the ratio of every attack class, select top 5
rows=df.count()
df.groupBy('attack_cat').count().orderBy('count', ascending=False).show()
c=df.groupBy('attack_cat').count().orderBy('count', ascending=False).collect()
[str(round(c[i][1]/rows*100, 1))+str(" %") for i in range(5)]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------+
|Label|  count|
+-----+-------+
|    1| 321283|
|    0|2218764|
+-----+-------+

None
+----------------+-------+
|      attack_cat|  count|
+----------------+-------+
|            null|2218764|
|         Generic| 215481|
|        Exploits|  44525|
|        Fuzzers |  19195|
|             DoS|  16353|
| Reconnaissance |  12228|
|         Fuzzers|   5051|
|        Analysis|   2677|
|        Backdoor|   1795|
|  Reconnaissance|   1759|
|      Shellcode |   1288|
|       Backdoors|    534|
|       Shellcode|    223|
|           Worms|    174|
+----------------+-------+

['87.4 %', '8.5 %', '1.8 %', '0.8 %', '0.6 %']

In [8]:
# 5.2) 
#create new target columns 
df=df.withColumnRenamed('Label', 'target') # 12.6 %
df=df.withColumn('target1', when(df['attack_cat']=='Generic', 1).otherwise(0)) # 8.5 %
df=df.withColumn('target2', when(df['attack_cat']=='Exploits', 1).otherwise(0)) # 1.8 %
df=df.withColumn('target3', when(df['attack_cat']==' Fuzzers ', 1).otherwise(0)) # 0.8 %
# drop attack_cat column with types of anomaly, and datetime columns 
# p.s. datetime columns are useful for feature enginerring and window functions however it is not in the scope of the presented use-case
df=df.drop(*['attack_cat','Stime','Ltime']) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# cast rowId column to string
df = df.withColumn('rowId',df['rowId'].cast(StringType()))
# run the function dtype() to get lists of numerical and categorical columns
numerical, categorical = dtype(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

categorical =>  ['srcip', 'sport', 'dstip', 'dsport', 'proto', 'state', 'service', 'is_sm_ips_ports', 'ct_state_ttl', 'is_ftp_login', 'ct_ftp_cmd', 'rowId'] len =  12

numerical =>  ['dur', 'sbytes', 'dbytes', 'sttl', 'dttl', 'sloss', 'dloss', 'Sload', 'Dload', 'Spkts', 'Dpkts', 'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz', 'trans_depth', 'res_bdy_len', 'Sjit', 'Djit', 'Sintpkt', 'Dintpkt', 'tcprtt', 'synack', 'ackdat', 'ct_flw_http_mthd', 'ct_srv_src', 'ct_srv_dst', 'ct_dst_ltm', 'ct_src_ ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'target', 'target1', 'target2', 'target3'] len =  38

In [10]:
# 6.1) 
# proportion of missing values in a column
null_fields=[]
null_fields_cat=[]
for field in df.columns:
    if field in categorical:
        null_num=int(df.where(F.col(field).isNull()).count())
        if null_num>0:
            proportion=round(null_num/row_num*100, 2)
            null_fields_cat.append(tuple([field, proportion]))
            print('{} % of missing values - {}, categorical'.format(proportion, field))
    elif field in numerical:
        null_num=int(df.where(F.col(field).isNull()).count())
        if null_num>0: 
            proportion=round(null_num/row_num*100, 2)
            null_fields.append(tuple([field, proportion]))
            print('{} % of missing values - {}, numerical'.format(proportion, field))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

53.08 % of missing values - ct_flw_http_mthd, numerical
56.29 % of missing values - is_ftp_login, categorical

In [11]:
# 6.2) 
#replace missing values with -99

#the functin check_nulls(df, value_to_fillin) checks if continuous columns that have null values inlude a value,
# which a user wants to replace null values with (value_to_fillin).
# the function prints out the result

def check_nulls(df, value_to_fillin):
    n=int(value_to_fillin)
    print("Nulls are going to be filled in with: {}  \n ".format(n))
    numerical = [field for (field, dataType) in df.dtypes if dataType == 'double' or dataType == 'int' or dataType == 'bigint' ]
    for c in numerical:
        if int(df.where(F.col(c).isNull()).count())>0:
            print('Column with null values:', c)
            n_count=int(df.where(F.col(c)==n).count())
            if n_count>0:
                print('Oops, the column has {} rows with value'.format(n_count) , n, '. Try another value \n ')
            else: 
                print("All clean!")

check_nulls(df, -99)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nulls are going to be filled in with: -99  
 
Column with null values: ct_flw_http_mthd
All clean!

In [12]:
#6.3)
# make a copy of the dataframe with null values (needed for Weight of Evidence calculation)
df_nulls=df
# replace null values for numerical columns
df=df.fillna(-99)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
#7)
# check outlier and which proportion of data they occupy

# create a dataframe with standard deviation values of each column
#in order to remove deviated values with more that 3 or 5 std from the mean
numerical = [field for (field, dataType) in df.dtypes if dataType == 'double' or dataType == 'int' or dataType == 'bigint']
stat=df.select(*numerical).describe()
std=stat.where(F.col('summary')=='stddev')
std=std.select(*numerical)
for i in std.columns:
    std = std.withColumn(i,F.col(i).cast(DoubleType()))
std=std.toPandas().transpose()
std.columns=['std']
std['col_name']=std.index.values
col_name_list=std.index.values
std['3std']=std['std'].apply(lambda x: x*3)
std['5std']=std['std'].apply(lambda x: x*5)
std=spark.createDataFrame(std)

def outliers(df, std, list_of_columns, column_std='3std'):
    start=df.count()
    for l in list_of_columns:
        try:
            before=df.count()
            scalar_std=std.where(F.col('col_name')==l).select(column_std).collect()[0][0]
            df=df.withColumn('{}_{}'.format(column_std, l),F.lit(scalar_std))
            b=column_std+'_'+l
            df=df.filter(F.abs(F.col(l))<F.col(b))
            now=df.count()
            print(l, 'rows kept =', now, '|| % of all set =', round(now/start*100, 2), '|| % from what is left' , round(now/before*100, 2) )
        except ZeroDivisionError as err:
            print(err)
outliers (df, std, numerical)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dur rows kept = 2533455 || % of all set = 99.74 || % from what is left 99.74
sbytes rows kept = 2532020 || % of all set = 99.68 || % from what is left 99.94
dbytes rows kept = 2461920 || % of all set = 96.92 || % from what is left 97.23
sttl rows kept = 2132942 || % of all set = 83.97 || % from what is left 86.64
dttl rows kept = 2104433 || % of all set = 82.85 || % from what is left 98.66
sloss rows kept = 2100179 || % of all set = 82.68 || % from what is left 99.8
dloss rows kept = 2100156 || % of all set = 82.68 || % from what is left 100.0
Sload rows kept = 2062626 || % of all set = 81.2 || % from what is left 98.21
Dload rows kept = 1905712 || % of all set = 75.03 || % from what is left 92.39
Spkts rows kept = 1904533 || % of all set = 74.98 || % from what is left 99.94
Dpkts rows kept = 1903790 || % of all set = 74.95 || % from what is left 99.96
swin rows kept = 1903790 || % of all set = 74.95 || % from what is left 100.0
dwin rows kept = 1903790 || % of all set = 74.95 || % fro

## Weight of Evidence and Information Value

In [14]:
# 8)
# cast target column to categorical
df_nulls = df_nulls.withColumn("target3",  when(df_nulls["target3"] == 1, "YES").otherwise("NO"))
df_nulls=df_nulls.drop('target', 'target1', 'target2')
# change the name of the primary key column to 'PK_column'
df_nulls=df_nulls.withColumnRenamed('rowId','PK_column')
df_nulls=df_nulls.withColumnRenamed('target3','target')
columns_to_remove=set(['Stime', 'Label', 'Ltime', 'dtcpb', 'rowId'])
n, c = dtype(df_nulls)
list_of_columns=list(set(n)-columns_to_remove)

s = StructType([StructField("PK_column", StringType(), True)])
d = spark.createDataFrame([], s)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

categorical =>  ['srcip', 'sport', 'dstip', 'dsport', 'proto', 'state', 'service', 'is_sm_ips_ports', 'ct_state_ttl', 'is_ftp_login', 'ct_ftp_cmd', 'PK_column', 'target'] len =  13

numerical =>  ['dur', 'sbytes', 'dbytes', 'sttl', 'dttl', 'sloss', 'dloss', 'Sload', 'Dload', 'Spkts', 'Dpkts', 'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz', 'trans_depth', 'res_bdy_len', 'Sjit', 'Djit', 'Sintpkt', 'Dintpkt', 'tcprtt', 'synack', 'ackdat', 'ct_flw_http_mthd', 'ct_srv_src', 'ct_srv_dst', 'ct_dst_ltm', 'ct_src_ ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm'] len =  34

In [15]:
print(list_of_columns)
print(df_nulls.groupBy('target').count().show())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['ct_dst_sport_ltm', 'dloss', 'Dintpkt', 'Dload', 'Spkts', 'synack', 'ct_srv_src', 'sttl', 'dttl', 'tcprtt', 'ct_srv_dst', 'ct_src_dport_ltm', 'ct_dst_src_ltm', 'dmeansz', 'Sload', 'Djit', 'dwin', 'ackdat', 'Sintpkt', 'ct_dst_ltm', 'sloss', 'ct_flw_http_mthd', 'Sjit', 'res_bdy_len', 'smeansz', 'sbytes', 'Dpkts', 'trans_depth', 'dur', 'dbytes', 'stcpb', 'ct_src_ ltm', 'swin']
+------+-------+
|target|  count|
+------+-------+
|   YES|  19195|
|    NO|2520852|
+------+-------+

None

## Woe and IV for continuous columns

In [16]:
# the function woe will print the results of WoE and IV calculations for each variable(predictor).
# each continuous column is transformed into categorical with QuantileDiscretizer function, 
# the output of which is 10 equaly distributed bins/value-ranges including "missing" category for null values 
# WoE and IV are calculated for each bin value

# the list total_iv_list will be populated with total IV (sum of bins IV) of a variable.
# the list variable_list will be populated with the names of variables

def woe(main_df, d, numbuck=10, limit=-99, list_of_columns=list):
    variable_list=[]
    total_iv_list=[]
    nan=int(limit)
    nan_bin=str(nan)+','+str(nan)
    n=int(main_df.groupBy('target').count().collect()[1][1])
    y=int(main_df.groupBy('target').count().collect()[0][1])
    for predictor in list_of_columns:
        predictor_range=predictor+'_range'
        predictor_bin=predictor+'_bin'
        # binning
        df = QuantileDiscretizer(numBuckets=numbuck, inputCol=predictor,outputCol=predictor_range).fit(main_df).transform(main_df)
        df=df.fillna(nan, subset=[predictor, predictor_range])
        df_gr = df.groupBy(predictor_range) \
            .agg(F.min(predictor).alias("min"),F.max(predictor).alias("max")) \
            .withColumn(predictor_bin,F.concat(F.col("min"),F.lit(","),F.col("max")))  \
            .withColumn(predictor_bin, regexp_replace(predictor_bin, nan_bin, 'missing'))
        df_gr=df_gr.drop('min', 'max')
        df=df.join(df_gr, predictor_range, 'left').drop(predictor_range)
        d = df.select('PK_column', predictor_bin).join(d, df.PK_column == d.PK_column, 'left').drop(d.PK_column)
        # WoE calculation
        df = df.crosstab(predictor_bin,'target')
        variable_list.append(predictor)
        df = df.withColumn("YES_%",(F.col("YES")/y))
        df = df.withColumn("NO_%",(F.col("NO")/n))
        df = df.withColumn("WoE", F.log(F.col("NO_%")/F.col("YES_%")))
        df = df.withColumn("IV", F.log(F.col("NO_%") / F.col("YES_%")) * (F.col("NO_%") - F.col("YES_%")))
        df = df.withColumnRenamed(df.columns[0],predictor_bin)
        df=df.withColumn('order', regexp_replace(predictor_bin, ",-", '.'))
        df= df.withColumn('order', regexp_replace("order", ",", '.'))
        df = df.withColumn("order", df["order"].cast(DoubleType()))
        df = df.orderBy("order")
        df=df.withColumn("WoE",  F.round(df["WoE"], 2)).withColumn("IV",  F.round(df["IV"], 2))
        df=df.withColumn("NO_%",  F.round(df["NO_%"]*100, 2)).withColumn("YES_%",  F.round(df["YES_%"]*100, 2))
        df=df.select(predictor_bin, 'NO_%', 'YES_%', 'WoE', 'IV')
        df.show()
        iv_sum = df.groupBy().sum().select("sum(IV)").collect()[0][0]
        print('Variable ', predictor, ' total Information Value => ', iv_sum)
        total_iv_list.append(iv_sum)
    return d, variable_list, total_iv_list


d, variable_list, total_iv_list = woe(df_nulls, d, list_of_columns=list_of_columns)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+-----+-----+----+
|ct_dst_sport_ltm_bin| NO_%|YES_%|  WoE|  IV|
+--------------------+-----+-----+-----+----+
|                 1,1| 77.5|82.37|-0.06| 0.0|
|                2,14|12.45|17.63|-0.35|0.02|
|               15,60|10.05|  0.0| null|null|
+--------------------+-----+-----+-----+----+

Variable  ct_dst_sport_ltm  total Information Value =>  0.02
+---------+-----+-----+-----+----+
|dloss_bin| NO_%|YES_%|  WoE|  IV|
+---------+-----+-----+-----+----+
|      0,3|48.17| 93.8|-0.67| 0.3|
|      4,4| 6.65| 5.14| 0.26| 0.0|
|      5,7|12.57| 0.79| 2.76|0.33|
|     8,15|11.85| 0.14| 4.43|0.52|
|    16,26|10.11| 0.04| 5.62|0.57|
|  27,5507|10.65| 0.09| 4.73| 0.5|
+---------+-----+-----+-----+----+

Variable  dloss  total Information Value =>  2.2199999999999998
+--------------------+-----+-----+-----+----+
|         Dintpkt_bin| NO_%|YES_%|  WoE|  IV|
+--------------------+-----+-----+-----+----+
|59.08359100000000...| 9.57|56.95|-1.78|0.84|
|0.415412,0.82754

In [17]:
# Information value filter

# create pandas dataframe with variable names and their IV values
tuples=list(zip(variable_list, total_iv_list))
IV_pandasDF=pd.DataFrame(tuples, columns=['variable_name', 'IV'])
IV_pandasDF.sort_values('IV', ascending=False).head(20)

# The function to print the columns with IV lower than 0.1, that is, attributes that have low predictive power
def iv_check(variable_list, total_iv_list):
    tuples=list(zip(variable_list, total_iv_list))
    IV_pandasDF=pd.DataFrame(tuples, columns=['variable_name', 'IV'])
    IV_pandasDF=IV_pandasDF.sort_values('IV', ascending=False)
    to_drop=[]
    top10=IV_pandasDF.variable_name.values[:10]
    for i in range(IV_pandasDF.count()[0]):
        if IV_pandasDF.iloc[i,1]<0.1:
            column_to_drop=IV_pandasDF.iloc[i,0]
            to_drop.append(column_to_drop)
    print("Columns to drop IV < 0.1 {}".format(sorted(to_drop)))
    print("Top 10 columns {}".format(top10))
    print()
    return list(top10)

top_10_list=iv_check(variable_list, total_iv_list)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columns to drop IV < 0.1 ['ct_dst_sport_ltm', 'dwin', 'res_bdy_len', 'stcpb', 'swin', 'trans_depth']
Top 10 columns ['Dload' 'Dpkts' 'dmeansz' 'dbytes' 'Dintpkt' 'Sjit' 'Sload' 'dloss'
 'sttl' 'smeansz']

## Woe and IV for categorical columns

In [18]:

def woe_cat(main_df, list_of_cat=list, var_list=list, iv_list=list):
    main_df=main_df.select(*['PK_column', 'target'], *list_of_cat).fillna('missing')
    n=int(main_df.groupBy('target').count().collect()[1][1])
    y=int(main_df.groupBy('target').count().collect()[0][1])
    for predictor in list_of_cat:
        predictor_bin=predictor+'_bin'
        df=main_df.crosstab(predictor,'target')
        df = df.withColumn("YES_%",(F.col("YES")/y))
        df = df.withColumn("NO_%",(F.col("NO")/n))
        df = df.withColumn("WoE", F.log(F.col("NO_%")/F.col("YES_%")))
        df = df.withColumn("IV", F.log(F.col("NO_%") / F.col("YES_%")) * (F.col("NO_%") - F.col("YES_%")))
        df = df.withColumnRenamed(df.columns[0],predictor_bin)
        df=df.withColumn("WoE",  F.round(df["WoE"], 2)).withColumn("IV",  F.round(df["IV"], 2))
        df=df.withColumn("NO_%",  F.round(df["NO_%"]*100, 2)).withColumn("YES_%",  F.round(df["YES_%"]*100, 2))
        df=df.select(predictor_bin, 'NO_%', 'YES_%', 'WoE', 'IV')
        df.show()
        iv_sum = df.groupBy().sum().select("sum(IV)").collect()[0][0]
        print('Variable ', predictor, ' total Information Value => ', iv_sum)
        var_list.append(predictor)
        iv_list.append(iv_sum)
    return var_list, iv_list
variable_list_cat=[]
total_iv_list_cat=[]
var_list, iv_list=woe_cat(df_nulls, list(set(categorical)-{'rowId'}), var_list=variable_list_cat, iv_list=total_iv_list_cat)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----+-----+-----+----+
|     dstip_bin|NO_%|YES_%|  WoE|  IV|
+--------------+----+-----+-----+----+
| 149.171.126.2|7.84|  0.0| null|null|
| 149.171.126.7|7.55|  0.0| null|null|
|149.171.126.12|1.15| 7.38|-1.86|0.12|
| 149.171.126.8|7.44|  0.0| null|null|
| 149.171.126.1|7.84|  0.0| null|null|
|149.171.126.15|1.94|10.39|-1.68|0.14|
| 149.171.126.4|7.84|  0.0| null|null|
|149.171.126.10|1.65|12.26| -2.0|0.21|
| 149.171.126.5|7.81|  0.0| null|null|
| 149.171.126.0|7.81|  0.0| null|null|
|149.171.126.14|1.72|11.45| -1.9|0.18|
|149.171.126.17|0.98|11.59|-2.47|0.26|
| 149.171.126.9|7.56|  0.0| null|null|
| 149.171.126.6|7.53|  0.0| null|null|
| 149.171.126.3|7.85|  0.0| null|null|
|  175.45.176.0|1.85|  0.0| null|null|
|149.171.126.18|4.04|  5.8|-0.36|0.01|
|  175.45.176.1|2.93|  0.0| null|null|
|         other|2.96|41.13|-2.63| 1.0|
|  175.45.176.3|3.72|  0.0| null|null|
+--------------+----+-----+-----+----+

Variable  dstip  total Information Value =>  1.92
+---------+--

In [19]:
top_10_categorical=iv_check(var_list, iv_list)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columns to drop IV < 0.1 ['is_sm_ips_ports']
Top 10 columns ['ct_state_ttl' 'dstip' 'srcip' 'state' 'service' 'dsport' 'is_ftp_login'
 'ct_ftp_cmd' 'proto' 'sport']

In [20]:
df=df.drop(*['ct_dst_sport_ltm', 'dwin', 'res_bdy_len', 'stcpb', 'swin', 'trans_depth', 'is_sm_ips_ports'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## One Hot Encoding

In [21]:
#
def encoding_only(df, enc, categorical):
    for predictor in categorical:
        print('encoding of categorical variable...', predictor)
        df_pivot = df.select('rowId', predictor).groupBy('rowId').pivot(predictor).count()
        col_names = list(set(df_pivot.columns) - {'rowId'})
        for c_name in col_names:
            df_pivot = df_pivot.withColumnRenamed(c_name, predictor + '_' + c_name)
        df_pivot = df_pivot.fillna(0)
        enc = df_pivot.join(enc, df_pivot.rowId == enc.rowId, 'left').drop(enc.rowId)
    return enc

categorical = [field for (field, dataType) in df.dtypes if dataType == 'string']
s = StructType([StructField("rowId", StringType(), True)])
enc = spark.createDataFrame([], s)
enc_categorical=encoding_only(df, enc, set(categorical)-{'rowId'})

#
df=df.drop(*list(set(categorical)-{'rowId'}))
df=df.join(enc_categorical, df.rowId == enc_categorical.rowId, 'inner').drop(enc_categorical.rowId)
print("Number of rows = {} and columns = {}".format(df.count(), len(df.columns)))

#
for name in df.columns:
      df = df.withColumnRenamed(name, name.replace('.', '_'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

encoding of categorical variable... dstip
encoding of categorical variable... sport
encoding of categorical variable... dsport
encoding of categorical variable... ct_state_ttl
encoding of categorical variable... srcip
encoding of categorical variable... state
encoding of categorical variable... is_ftp_login
encoding of categorical variable... proto
encoding of categorical variable... service
encoding of categorical variable... ct_ftp_cmd
Number of rows = 2540047 and columns = 128

## Vectorization and scaling

In [22]:
#
def vectorA(df, num_cols):
    assembler = VectorAssembler(inputCols=sorted(num_cols), outputCol='features')
    df = assembler.transform(df)
    return df
def scaling(df):
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
    scalerModel = scaler.fit(df)
    scaledDF = scalerModel.transform(df)
    return scaledDF

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
#
vector_cols=list(set(df.columns)-{'target', 'rowId', 'target1', 'target2', 'target3'})
df_v=vectorA(df, vector_cols)
scaledDF=scaling(df_v)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Correlation

In [31]:
result = Correlation.corr(scaledDF, 'scaledFeatures', method='pearson')
result = result.collect()[0]["pearson({})".format('scaledFeatures')].values
corr_pd_df=pd.DataFrame(result.reshape(-1, len(vector_cols)),
columns=vector_cols, index=vector_cols)
correlated_features = set()
for i in range(len(corr_pd_df.columns)):
    for j in range(i):
        if abs(corr_pd_df.iloc[i, j]) > 0.5 and corr_pd_df.columns[i] not in correlated_features:
            correlated_features.add(corr_pd_df.columns[i])
            print("correlated pair => ", corr_pd_df.columns[i], ' and ', corr_pd_df.index[j])
print(len(correlated_features))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

correlated pair =>  ct_srv_dst  and  Dintpkt
correlated pair =>  dstip_149_171_126_0  and  srcip_59_166_0_5
correlated pair =>  dmeansz  and  service_-
correlated pair =>  state_INT  and  srcip_59_166_0_0
correlated pair =>  srcip_59_166_0_2  and  srcip_59_166_0_0
correlated pair =>  srcip_149_171_126_18  and  dtcpb
correlated pair =>  sport_0  and  dtcpb
correlated pair =>  state_FIN  and  dtcpb
correlated pair =>  dsport_21  and  dtcpb
correlated pair =>  smeansz  and  dmeansz
correlated pair =>  service_ftp-data  and  srcip_59_166_0_5
correlated pair =>  srcip_175_45_176_1  and  srcip_59_166_0_5
correlated pair =>  srcip_other  and  dstip_149_171_126_18
correlated pair =>  synack  and  dstip_149_171_126_8
correlated pair =>  ct_src_dport_ltm  and  dmeansz
correlated pair =>  service_smtp  and  dstip_149_171_126_18
correlated pair =>  Sintpkt  and  srcip_149_171_126_10
correlated pair =>  dstip_149_171_126_3  and  ct_src_dport_ltm
correlated pair =>  service_ssh  and  srcip_59_166_0_

## PCA

In [24]:
# the function to calculate PCA with known k
def pca(df, features, k):
    pca = PCA(k=k, inputCol=features, outputCol="pcaFeatures")
    model = pca.fit(df)
    return model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# the function to find best k 
def pca_k(df, features="features"):
    for k in range (50, 130, 2):
        model=pca(df, features, k)
        if sum(list(model.explainedVariance)) >= 0.95: 
            print(k) 
            break
    return model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [127]:
m = pca_k(scaledDF, features='scaledFeatures')
df_pca = m.transform(scaledDF)
print(m.explainedVariance)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

72
[0.13401840939250606,0.0476674091486717,0.04434563372952574,0.03521484881358813,0.031055141809113857,0.02956061878661711,0.027837626542030782,0.02489161470123653,0.023277946764674425,0.022113424231441994,0.016973849673313134,0.01600351185556135,0.014745767147059836,0.013993543108932043,0.013697614222293212,0.013297982762827003,0.011888382818516221,0.011606460751716794,0.011279224534084932,0.010778533838407931,0.010686283164410363,0.010043937308605071,0.009503888878700576,0.009024509545724672,0.008803863477505357,0.008620174295089123,0.008392567991203184,0.008309549661629934,0.008305101627248698,0.008302685151070638,0.008297680050625065,0.008289722355920338,0.008286826986571147,0.008281915684474365,0.008276372401179809,0.008271429230509003,0.008262968806936225,0.008260056837375353,0.008253837074072469,0.008251777506601287,0.008242560007890271,0.008237869959577643,0.008228091267015809,0.008225182111560241,0.008217336480679436,0.007954657095020837,0.007868541062487862,0.007816334989080

In [25]:
#
m = pca(scaledDF, "scaledFeatures", 72)
df_pca = m.transform(scaledDF)
print(m.explainedVariance)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.12402900234057278,0.04821415446759538,0.04156625460131342,0.03506381496206099,0.03246178537025871,0.02985057714863424,0.0269544552206585,0.0250339161562357,0.023171296529383163,0.0219265378305739,0.016961468348385276,0.01594321383326257,0.014830460489077463,0.014425101981016862,0.014324242303574791,0.013187478219048622,0.01245844876685683,0.012093782511061373,0.011765999374369037,0.011391050531808769,0.010639801672139786,0.010018734336994949,0.00947955955795116,0.009367050754522474,0.00914536118341031,0.008897483523359768,0.008849798139620517,0.008845437895517332,0.008841607283304425,0.008837440994613819,0.008828817908153526,0.008825595540267992,0.008820673068458256,0.008814780035488088,0.008809914292757065,0.008800381837114384,0.008797251617101983,0.008790670017196026,0.008788274164148773,0.008778448222682164,0.00877314255621359,0.008762638744519992,0.0087602050481973,0.008751796227951136,0.008434108634929669,0.008366555922543772,0.008323543513731674,0.00829149549166395,0.008163013

In [26]:
#
df_final=df_pca
df_final=df_final.randomSplit([0.8,0.2],24)
train=df_final[0]
test=df_final[1]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Evaluation functions

In [27]:
def area_under(result, labelCol="target" ):
    evaluator=BinaryClassificationEvaluator(rawPredictionCol= "probability",labelCol=labelCol)
    print("The area under ROC curve = {}".format(round(evaluator.evaluate(result), 5)))
    evaluator=BinaryClassificationEvaluator(rawPredictionCol= "probability",labelCol=labelCol, metricName="areaUnderPR")
    print("The area under Precision-Recall curve = {}".format(round(evaluator.evaluate(result), 5)))
def conf_matrix(result, target):
    tp=result[(result[target]==1)&(result.prediction==1)].shape()[0]
    fn=result[(result[target]==1)&(result.prediction==0)].shape()[0]
    fp=result[(result[target]==0)&(result.prediction==1)].shape()[0]
    tn=result[(result[target]==0)&(result.prediction==0)].shape()[0]
    print("TP = ", tp)
    print("FN = ",fn)
    print("FP = ", fp)
    print("TN = ", tn)
    if ((tp+fn)>0)&((tp+fp)>0):
        print('recall = ', round(tp/(tp+fn), 5) )
        print('precision = ', round(tp/(tp+fp), 5))
    else: print('recall and precision is 0')
    print('accuracy =', round((tp+tn)/(tp+tn+fn+fp),5))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Random Forest

In [43]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol= "probability",labelCol="target3", metricName="areaUnderPR")
RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target3" )

paramGrid = (ParamGridBuilder()
             .addGrid(RandomForest.maxDepth, [2, 5, 10, 15, 20])
             .addGrid(RandomForest.minInfoGain, [0.0, 0.03, 0.1])
             .addGrid(RandomForest.numTrees, [5, 20, 50, 100, 128])
             .addGrid(RandomForest.impurity, ['entropy', 'gini'])
             .build())

cv = CrossValidator(estimator = RandomForest,
                      estimatorParamMaps = paramGrid,
                      evaluator = evaluator,
                      numFolds = 5)

cvModel = cv.fit(train)
result=cvModel.transform(test)
print("The area under Precision-Recall for test set after CV  is {}".format(evaluator.evaluate(result)))

RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target3", numTrees = 128, maxDepth=15 )
RF_model=RandomForest.fit(train)
result=RF_model.transform(test)
evaluator=BinaryClassificationEvaluator(rawPredictionCol= "probability",labelCol="target3", metricName="areaUnderPR")
print("The area under Precision-Recall curve = {}".format(round(evaluator.evaluate(result), 2)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under Precision-Recall curve = 0.59

In [44]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='RandomForestClassifier_40ce83b394027d5bf0ee', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False, Param(parent='RandomForestClassifier_40ce83b394027d5bf0ee', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10, Param(parent='RandomForestClassifier_40ce83b394027d5bf0ee', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'auto', Param(parent='RandomForestClassifier_40ce83b394027d5bf0ee', name='featuresCol', doc='features column name'): 'pcaFeatures', Param(parent='RandomF

In [172]:
RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target", numTrees = 128, maxDepth=15 )
RF_model=RandomForest.fit(train)
result=RF_model.transform(test)
area_under(result, labelCol="target" )
conf_matrix(result, "target")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99968
The area under Precision-Recall curve = 0.99781
TP =  62184
FN =  2113
FP =  1612
TN =  442139
recall =  0.96714
precision =  0.97473
accuracy = 0.99267

In [173]:
RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target1", numTrees = 128, maxDepth=15 )
RF_model=RandomForest.fit(train)
result=RF_model.transform(test)
area_under(result, labelCol="target1" )
conf_matrix(result, "target1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.9997
The area under Precision-Recall curve = 0.99739
TP =  42044
FN =  1059
FP =  39
TN =  464906
recall =  0.97543
precision =  0.99907
accuracy = 0.99784

In [174]:
RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target2", numTrees = 128, maxDepth=15 )
RF_model=RandomForest.fit(train)
result=RF_model.transform(test)
area_under(result, labelCol="target2" )
conf_matrix(result, "target2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99517
The area under Precision-Recall curve = 0.77001
TP =  5047
FN =  3848
FP =  2006
TN =  497147
recall =  0.5674
precision =  0.71558
accuracy = 0.98848

In [31]:
RandomForest= RandomForestClassifier(featuresCol='pcaFeatures', labelCol="target3", numTrees = 128, maxDepth=15 )
RF_model=RandomForest.fit(train)
result=RF_model.transform(test)
area_under(result, labelCol="target3" )
conf_matrix(result, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99472
The area under Precision-Recall curve = 0.58478
TP =  1048
FN =  2785
FP =  501
TN =  503756
recall =  0.27342
precision =  0.67657
accuracy = 0.99353

## GBT Classifier

In [30]:
from pyspark.ml.classification import  GBTClassifier

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol= "probability",labelCol="target3", metricName="areaUnderPR")
GBT= GBTClassifier(featuresCol='pcaFeatures', labelCol="target3")

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder().addGrid(GBT.maxDepth, [5, 10, 15, 20]).build())
# Cross validate
cv = CrossValidator(estimator = GBT,estimatorParamMaps = paramGrid,evaluator = evaluator, numFolds = 5)
# Fit the training set and transform the testing set
cvModel = cv.fit(train)
result=cvModel.transform(test)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
#list of best model parameters
bestModelGBT = cvModel.bestModel
bestModelGBT.explainParams()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (undefined)\nfeaturesCol: features column name (default: features, current: pcaFeatures)\nimpurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (undefined)\nlabelCol: label column name (default: label, current: target3)\nlossType: Loss function which GBT tries to minimize (case-insensitive). Suppor

In [28]:
from pyspark.ml.classification import  GBTClassifier
GBT= GBTClassifier(featuresCol='pcaFeatures', labelCol="target", maxDepth=15 )
GBT_model=GBT.fit(train)
result=GBT_model.transform(test)
area_under(result, labelCol="target" )
conf_matrix(result, "target")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99986
The area under Precision-Recall curve = 0.99901
TP =  63188
FN =  1066
FP =  684
TN =  443152
recall =  0.98341
precision =  0.98929
accuracy = 0.99656

In [29]:
GBT= GBTClassifier(featuresCol='pcaFeatures', labelCol="target1", maxDepth=15)
GBT_model=GBT.fit(train)
result=GBT_model.transform(test)
area_under(result, labelCol="target1" )
conf_matrix(result, "target1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99986
The area under Precision-Recall curve = 0.99871
TP =  42648
FN =  579
FP =  55
TN =  464808
recall =  0.98661
precision =  0.99871
accuracy = 0.99875

In [30]:
GBT= GBTClassifier(featuresCol='pcaFeatures', labelCol="target2", maxDepth=15)
GBT_model=GBT.fit(train)
result=GBT_model.transform(test)
area_under(result, labelCol="target2" )
conf_matrix(result, "target2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99279
The area under Precision-Recall curve = 0.75387
TP =  5024
FN =  3802
FP =  2053
TN =  497211
recall =  0.56923
precision =  0.70991
accuracy = 0.98848

In [31]:
GBT= GBTClassifier(featuresCol='pcaFeatures', labelCol="target2", maxDepth=15)
GBT_model=GBT.fit(train)
result=GBT_model.transform(test)
area_under(result, labelCol="target3" )
conf_matrix(result, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.9211
The area under Precision-Recall curve = 0.08168
TP =  135
FN =  3714
FP =  6935
TN =  497306
recall =  0.03507
precision =  0.01909
accuracy = 0.97904

## Logistic Regression

In [38]:
from pyspark.ml.classification import LogisticRegression

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
LR= LogisticRegression(featuresCol='pcaFeatures', labelCol="target")
LR_model=LR.fit(train)
result=LR_model.transform(test)
area_under(result, labelCol="target" )
conf_matrix(result, "target")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99924
The area under Precision-Recall curve = 0.99458
TP =  61968
FN =  2286
FP =  3701
TN =  440135
recall =  0.96442
precision =  0.94364
accuracy = 0.98822

In [40]:
LR= LogisticRegression(featuresCol='pcaFeatures', labelCol="target1")
LR_model=LR.fit(train)
result=LR_model.transform(test)
area_under(result, labelCol="target1" )
conf_matrix(result, "target1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99931
The area under Precision-Recall curve = 0.99577
TP =  42141
FN =  1094
FP =  122
TN =  464733
recall =  0.9747
precision =  0.99711
accuracy = 0.99761

In [41]:
LR= LogisticRegression(featuresCol='pcaFeatures', labelCol="target2") 
LR_model=LR.fit(train)
result=LR_model.transform(test)
area_under(result, labelCol="target2" )
conf_matrix(result, "target2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99178
The area under Precision-Recall curve = 0.65982
TP =  4286
FN =  4536
FP =  2080
TN =  497188
recall =  0.48583
precision =  0.67326
accuracy = 0.98698

In [42]:
LR= LogisticRegression(featuresCol='pcaFeatures', labelCol="target3")
LR_model=LR.fit(train)
result=LR_model.transform(test)
area_under(result, labelCol="target3" )
conf_matrix(result, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The area under ROC curve = 0.99173
The area under Precision-Recall curve = 0.34504
TP =  189
FN =  3644
FP =  327
TN =  503930
recall =  0.04931
precision =  0.36628
accuracy = 0.99218

## Linear Support Vector Classifier

In [25]:
from pyspark.ml.classification import LinearSVC

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [164]:
SVM= LinearSVC(featuresCol='pcaFeatures', labelCol="target")
SVM_model=SVM.fit(train)
result=SVM_model.transform(test)
result=result.withColumn('prediction',F.col('prediction').cast(IntegerType()))
#area_under(result, labelCol="target" )
#conf_matrix(result, "target")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [170]:
#area_under(result, labelCol="target" )
conf_matrix(result, "target")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

TP =  64025
FN =  272
FP =  5947
TN =  437804
recall =  0.99577
precision =  0.91501
accuracy = 0.98776

In [26]:
#LinearSVC
from pyspark.ml.classification import LinearSVC
SVM= LinearSVC(featuresCol='pcaFeatures', labelCol="target1")
SVM_model=SVM.fit(train)
result=SVM_model.transform(test)
result=result.withColumn('prediction',F.col('prediction').cast(IntegerType()))
#area_under(result, labelCol="target1" )
conf_matrix(result, "target1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

TP =  42112
FN =  1123
FP =  78
TN =  464777
recall =  0.97403
precision =  0.99815
accuracy = 0.99764

In [27]:
#LinearSVC
from pyspark.ml.classification import LinearSVC
SVM= LinearSVC(featuresCol='pcaFeatures', labelCol="target2")
SVM_model=SVM.fit(train)
result=SVM_model.transform(test)
result=result.withColumn('prediction',F.col('prediction').cast(IntegerType()))
#area_under(result, labelCol="target2" )
conf_matrix(result, "target2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

TP =  4111
FN =  4711
FP =  2219
TN =  497049
recall =  0.46599
precision =  0.64945
accuracy = 0.98636

In [28]:
LinearSVC
from pyspark.ml.classification import LinearSVC
SVM= LinearSVC(featuresCol='pcaFeatures', labelCol="target3")
SVM_model=SVM.fit(train)
result=SVM_model.transform(test)
result=result.withColumn('prediction',F.col('prediction').cast(IntegerType()))
#area_under(result, labelCol="target3" )
conf_matrix(result, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

TP =  0
FN =  3833
FP =  1
TN =  504256
recall =  0.0
precision =  0.0
accuracy = 0.99245

In [168]:
result.groupBy('prediction').count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+
|prediction| count|
+----------+------+
|         1| 69972|
|         0|438076|
+----------+------+

In [167]:
result=result.withColumn('prediction',F.col('prediction').cast(IntegerType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Bisecting K-means

In [25]:
from pyspark.ml.clustering import BisectingKMeans
BM=BisectingKMeans(featuresCol='pcaFeatures', k=2, seed=24)
BM_model=BM.fit(df_pca)
resultBM=BM_model.transform(df_pca)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Evaluate clustering.
cost = BM_model.computeCost(df_pca)
print("Within Set Sum of Squared Errors = " + str(cost))

resultBM.groupBy('prediction').count().show()
conf_matrix(resultBM, "target")
conf_matrix(resultBM, "target1")
conf_matrix(resultBM, "target2")
conf_matrix(resultBM, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Within Set Sum of Squared Errors = 284064978.194632
+----------+-------+
|prediction|  count|
+----------+-------+
|         1|1492269|
|         0|1047778|
+----------+-------+

TP =  58190
FN =  263093
FP =  1434079
TN =  784685
recall =  0.18112
precision =  0.03899
accuracy = 0.33183
TP =  3118
FN =  212363
FP =  1489151
TN =  835415
recall =  0.01447
precision =  0.00209
accuracy = 0.33012
TP =  27443
FN =  17082
FP =  1464826
TN =  1030696
recall =  0.61635
precision =  0.01839
accuracy = 0.41658
TP =  12374
FN =  6821
FP =  1479895
TN =  1040957
recall =  0.64465
precision =  0.00829
accuracy = 0.41469

In [45]:
from pyspark.ml.clustering import KMeans
KM=KMeans(featuresCol='pcaFeatures', k=2, seed=24)
KM_model=KM.fit(df_pca)
resultKM=KM_model.transform(df_pca)
conf_matrix(resultKM, "target1")
conf_matrix(resultKM, "target2")
conf_matrix(resultKM, "target3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

TP =  3454
FN =  212027
FP =  2057041
TN =  267525
recall =  0.01603
precision =  0.00168
accuracy = 0.10668
TP =  28367
FN =  16158
FP =  2032128
TN =  463394
recall =  0.6371
precision =  0.01377
accuracy = 0.1936
TP =  13651
FN =  5544
FP =  2046844
TN =  474008
recall =  0.71117
precision =  0.00663
accuracy = 0.19199