- Name: Jun Sung Park

# Project on Spark and Cloud Data Platform

###  What is an Intrusion Detection System? Is it possible to implement an Intrusion Detection System on this dataset? Explain the workflow as described in the paper for implementing Intrusion Detection System.

According to the paper provided, "Intrusion detection model using machine learning algorithm on Big Data environment", an Intrusion Detection System is a system that can monitor and analyze big data to catch any attacks attempted on the system/network.

Yes, it is possible to implement an Intrusion Detection System on this dataset. The following workflow is referenced from the paper:

1. First, the dataset is loaded and exported into Resilient Distributed Datasets (RDD) and DataFrames in Apache Spark.
2. Next, the original (raw) data is pre-processed so that any categorical data is converted into numerical values. The dataset is then standardized to reduce bias toward particular features, which improves the overall accuracy of the classification model.
3. The next step is feature selection. ChiSqSelector is applied to deal with the curse of dimensionality by reducing the dimension of the high-dimensional data. Doing so can improve both the classification accuracy and the computation time required by the model. Only the most relevant features, as ranked by the Chi-Squared test, are selected.
4. After feature selection, the Spark-Chi-SVM model is trained on the training dataset.
5. The final step of the workflow is to test and evaluate the model on the test dataset (kddcup). Metrics such as the Area Under the ROC Curve (AUROC) or the Area Under the Precision-Recall Curve (AUPR) can be used to measure the model's performance.
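
The feature-selection step in the workflow can be illustrated with a small plain-Python sketch of the Pearson chi-squared statistic, which is the quantity a chi-squared selector ranks features by. This is not the paper's or Spark's implementation; the function name `chi_squared` and the toy rows are hypothetical and only meant to show the idea.

```python
from collections import Counter

def chi_squared(feature_values, labels):
    """Pearson chi-squared statistic of a categorical feature against a label."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    statistic = 0.0
    for f, f_count in feature_totals.items():
        for l, l_count in label_totals.items():
            # Expected count if the feature and the label were independent
            expected = f_count * l_count / n
            statistic += (observed[(f, l)] - expected) ** 2 / expected
    return statistic

# Hypothetical toy data: "protocol" separates the classes, "noise" does not.
labels = ["attack", "attack", "normal", "normal"]
protocol = ["icmp", "icmp", "udp", "udp"]
noise = ["a", "b", "a", "b"]

scores = {
    "protocol": chi_squared(protocol, labels),
    "noise": chi_squared(noise, labels),
}
# Features with a higher statistic depend more strongly on the label.
ranked = sorted(scores, key=scores.get, reverse=True)
print(scores, ranked)
```

A selector would keep the top-ranked features and drop the rest, which is how the dimensionality is reduced before training.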

### Step 1. Use python urllib library to extract the KDD Cup 99 data from their web repository, store it in a temporary location and then move it to the Databricks filesystem which can enable easy access to this data for analysis. Use the following commands in Databricks to get your data.

In [0]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql import Row
import pyspark.pandas as ps

spark = SparkSession.builder.appName("A3").getOrCreate()

#set the shuffle partition same as number of cpu cores to improve performance 
spark.conf.set("spark.sql.shuffle.partitions", 4)

# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

In [0]:
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

path,name,size,modificationTime
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903,1667405207000


### Step 2. After storing the data in the Databricks filesystem, load your data from disk into Spark's RDD. Print 10 values of your RDD and verify the type of data structure of your data (RDD).

In [0]:
sp_rdd = sc.textFile("dbfs:/kdd/kddcup_data.gz")

sp_rdd.take(10)

Out[5]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,59,59,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,212,1940,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,2,0.00,0.00,0.00,0.00,1.

In [0]:
# verify the type of data structure of your data
type(sp_rdd)

Out[6]: pyspark.rdd.RDD

### Step 3.  Split the data. (Each entry in your RDD is a comma-separated line of data, which you first need to split before you can parse and build your dataframe.) Show the total number of features (columns) and print results. See this link for more details. 
http://kdd.ics.uci.edu/databases/kddcup99/task.html

In [0]:
# Create a comma-separated RDD
cs_rdd = sp_rdd.map(lambda x: x.split(","))

# Take a look at the first row.
print(cs_rdd.take(1))

[['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']]


In [0]:
# Show the total number of features (columns)
len(cs_rdd.take(1)[0])

Out[8]: 42
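
As a quick sanity check on the count above, the same split can be reproduced in plain Python on the first record printed in Step 2. This is only a sketch outside Spark; the variable names are my own. The 42 columns break down into 41 features plus the trailing label.

```python
# First record printed in Step 2, copied verbatim from the output above.
line = (
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,"
    "0.00,0.00,0.00,0.00,0.00,normal."
)

fields = line.split(",")
# Everything except the last field is a feature; the last field is the label.
features, label = fields[:-1], fields[-1]
print(len(fields), len(features), label)
```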

### Step 4. Now extract these 6 feature columns (duration, protocol_type, service, src_bytes, dst_bytes, flag) and the label column from your dataset. Build a new RDD and dataframe. Print schema and display 10 values.

In [0]:
# Extract the following features: 
    # duration, protocol_type, service, src_bytes, dst_bytes, flag and label

# Build new RDD
rdd_6 = cs_rdd.map(lambda x: Row(
    duration=int(x[0]), 
    protocol_type=x[1],
    service=x[2],
    flag=x[3],
    src_bytes=int(x[4]),
    dst_bytes=int(x[5]),
    label=x[-1]
    )
)

# Create DF based on rdd_6
df = sqlContext.createDataFrame(rdd_6)

# Print Schema
df.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- label: string (nullable = true)



In [0]:
# Display 10 values
df.show(10)

+--------+-------------+-------+----+---------+---------+-------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|  label|
+--------+-------------+-------+----+---------+---------+-------+
|       0|          tcp|   http|  SF|      181|     5450|normal.|
|       0|          tcp|   http|  SF|      239|      486|normal.|
|       0|          tcp|   http|  SF|      235|     1337|normal.|
|       0|          tcp|   http|  SF|      219|     1337|normal.|
|       0|          tcp|   http|  SF|      217|     2032|normal.|
|       0|          tcp|   http|  SF|      217|     2032|normal.|
|       0|          tcp|   http|  SF|      212|     1940|normal.|
|       0|          tcp|   http|  SF|      159|     4087|normal.|
|       0|          tcp|   http|  SF|      210|      151|normal.|
|       0|          tcp|   http|  SF|      212|      786|normal.|
+--------+-------------+-------+----+---------+---------+-------+
only showing top 10 rows



### Step 5. Get the total number of connections based on the protocol_type and based on the service. Show result in an ascending order. Plot the bar graph for both.

In [0]:
groupby_ptype = df.groupby('protocol_type').count().orderBy('count', ascending = True)
groupby_ptype.display()

protocol_type,count
udp,20354
tcp,190065
icmp,283602
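
To put the counts above in proportion, the share of each protocol can be worked out with a few lines of plain Python. The counts are copied from the table; the variable names are my own.

```python
# Counts copied from the protocol_type table above.
protocol_counts = {"udp": 20354, "tcp": 190065, "icmp": 283602}

total = sum(protocol_counts.values())
shares = {name: count / total for name, count in protocol_counts.items()}
for name, share in shares.items():
    print(f"{name}: {share:.2%}")
```

icmp accounts for more than half of all connections, which is worth keeping in mind when reading the bar chart.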


In [0]:
# Bar plot for the total number of connections based on the protocol_type
groupby_ptype_pd = ps.DataFrame(groupby_ptype)
groupby_ptype_pd = groupby_ptype_pd.set_index('protocol_type')
groupby_ptype_pd.plot.bar()

In [0]:
groupby_service = df.groupby('service').count().orderBy('count', ascending = True)
groupby_service.display()

service,count
tftp_u,1
pm_dump,1
red_i,1
tim_i,7
X11,11
urh_i,14
IRC,43
Z39_50,92
netstat,95
ctf,97


In [0]:
# Bar plot for the total number of connections based on the service
groupby_service_pd = ps.DataFrame(groupby_service)
groupby_service_pd = groupby_service_pd.set_index('service')
groupby_service_pd.plot.bar()

### Step 6. Do a further exploratory data analysis, including other columns of this dataset and plot graphs. Plot at least 3 different charts and explain them.

In [0]:
# More features will be explored by creating a new RDD.

rdd_7 = cs_rdd.map(lambda x: Row(
    duration=int(x[0]), 
    protocol_type=x[1],
    service=x[2],
    flag=x[3],
    src_bytes=int(x[4]),
    dst_bytes=int(x[5]),
    land=int(x[6]),
    wrong_fragment=int(x[7]),
    urgent=int(x[8]),
    hot=int(x[9]),
    num_failed_logins=int(x[10]),
    logged_in=int(x[11]),
    num_compromised=int(x[12]),
    root_shell=int(x[13]),
    su_attempted=int(x[14]),
    num_root=int(x[15]),
    num_file_creations=int(x[16]),
    num_shells=int(x[17]),
    num_access_files=int(x[18]),
    num_outbound_cmds=int(x[19]),
    is_host_login=int(x[20]),
    is_guest_login=int(x[21]),
    label=x[-1]
    )
)

# Create DF based on rdd_7
df7 = sqlContext.createDataFrame(rdd_7)

# Print Schema
df7.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- land: long (nullable = true)
 |-- wrong_fragment: long (nullable = true)
 |-- urgent: long (nullable = true)
 |-- hot: long (nullable = true)
 |-- num_failed_logins: long (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- num_compromised: long (nullable = true)
 |-- root_shell: long (nullable = true)
 |-- su_attempted: long (nullable = true)
 |-- num_root: long (nullable = true)
 |-- num_file_creations: long (nullable = true)
 |-- num_shells: long (nullable = true)
 |-- num_access_files: long (nullable = true)
 |-- num_outbound_cmds: long (nullable = true)
 |-- is_host_login: long (nullable = true)
 |-- is_guest_login: long (nullable = true)
 |-- label: string (nullable = true)



In [0]:
# I will select the "flag" and "label" columns, group by them, and count the occurrences of each (flag, label) pair.
# "flag" column indicates the normal or error status of the connection [1]
# "label" column indicates the class, which is located at the very end of the dataset.

groupby_flag = df7["flag", "label"].groupby("flag", "label").count().sort("count").orderBy( 'flag')
groupby_flag.display()

flag,label,count
OTH,normal.,1
OTH,portsweep.,7
REJ,neptune.,20002
REJ,portsweep.,220
REJ,ipsweep.,83
REJ,normal.,5341
REJ,satan.,1229
RSTO,buffer_overflow.,1
RSTO,warezclient.,1
RSTO,neptune.,455


In [0]:
# It can be observed that the flag "SF" has the highest count. Among the SF rows, the label "smurf" occurs 280,790 times and the label "normal" occurs 91,709 times. Several other labels exist for this flag, but they are minorities compared to "smurf" and "normal".

# It can also be noticed that, of the 97278 rows with the label "normal.", 91709 have the flag "SF". Hence, the majority of the "normal" cases are flagged "SF".

print("Number of total rows with label as ""normal."" :", df7.filter(df7.label=="normal.").count())

# The second most counted flag is S0, where the class "neptune" is the absolute majority, with 86744 rows.

# The third most counted flag is REJ, where two majority classes are "neptune" and "normal", which are counted 20002 and 5341 times respectively.

groupby_flag_pd = ps.DataFrame(groupby_flag)
groupby_flag_pd = groupby_flag_pd.set_index('flag')
# groupby_flag_pd.display()
groupby_flag_pd.plot.bar(color = "label")

Number of total rows with label as normal. : 97278


In [0]:
# Average src_bytes for each label was calculated and shown in the table below.
# src_bytes indicates the number of data bytes from source to destination [1].
# The average src_bytes for class "normal" is 1157, which is relatively low compared to the top 3 classes (portsweep, warezclient, and back)

groupby_src = df7.groupby("label").agg(avg("src_bytes").alias("avg_src_bytes")).orderBy("avg_src_bytes", ascending = False)
groupby_src.display()

label,avg_src_bytes
portsweep.,666707.4365384616
warezclient.,300219.562745098
back.,54156.355878347706
pod.,1462.6515151515152
buffer_overflow.,1400.4333333333334
normal.,1157.047523592179
smurf.,935.7722995833184
multihop.,435.1428571428572
imap.,347.5833333333333
rootkit.,294.7


In [0]:
groupby_src_pd = ps.DataFrame(groupby_src)
groupby_src_pd = groupby_src_pd.set_index('label')
groupby_src_pd.plot.bar()

In [0]:
# Similarly the average dst_bytes for each label was calculated and shown in the table below.
# dst_bytes indicates the number of data bytes from destination to source [1].
# The average dst_bytes for the label "normal" is about 3385. This is not too high compared to the labels with top 3 average dst_bytes (warezmaster, multihop, imap)

groupby_dst = df7.groupby("label").agg(avg("dst_bytes").alias("avg_dst_bytes")).orderBy("avg_dst_bytes", ascending = False)
groupby_dst.display()

label,avg_dst_bytes
warezmaster.,3922087.7
multihop.,213016.2857142857
imap.,54948.66666666666
back.,8232.649568769859
phf.,8127.0
buffer_overflow.,6339.833333333333
ftp_write.,5382.25
rootkit.,4276.6
normal.,3384.651000226156
loadmodule.,3009.8888888888887


In [0]:
groupby_dst_pd = ps.DataFrame(groupby_dst)
groupby_dst_pd = groupby_dst_pd.set_index('label')
groupby_dst_pd.plot.bar()

In [0]:
# This time, let's group by protocol_type and label to see which protocol_type is the most prevalent for normal class labels

# It can be observed that the large majority (76813) of the "normal" rows use the tcp protocol type. However, the tcp protocol type also contains a large number of "neptune" rows, with a count of 107201.

# One interesting thing to note is that the udp protocol type is mostly labeled as "normal". 

# The icmp protocol type is mostly labelled as not normal. Only 1288 icmp rows were found to be labelled normal.

groupby_ptype_label = df7.groupby('protocol_type', 'label').count().orderBy('count', ascending = False)
groupby_ptype_label.display()
groupby_ptype_label_pd = ps.DataFrame(groupby_ptype_label)
groupby_ptype_label_pd = groupby_ptype_label_pd.set_index('protocol_type')
groupby_ptype_label_pd.plot.bar(color = "label")

protocol_type,label,count
icmp,smurf.,280790
tcp,neptune.,107201
tcp,normal.,76813
udp,normal.,19177
tcp,back.,2203
tcp,satan.,1416
icmp,normal.,1288
icmp,ipsweep.,1153
tcp,portsweep.,1039
tcp,warezclient.,1020


In [0]:
# This time, let's group by service and label to see which service is the most prevalent for normal class labels

# It can be seen that a significantly large amount of "normal" classes are found to have the service type as "http". 

# Although their counts are smaller, services such as "ftp_data", "smtp", "other", and "domain_u" are also likely to be "normal".

# It is interesting to note that rows with the "ecr_i" service type are mostly not labelled "normal". The same holds for the "private" service type: although it still contains 7366 normal rows, 101317 of its rows are labelled "neptune".

groupby_service_label = df7.groupby('service', 'label').count().orderBy('count', ascending = False)
groupby_service_label.display()
groupby_ptype_service_pd = ps.DataFrame(groupby_service_label)
groupby_ptype_service_pd = groupby_ptype_service_pd.set_index('service')
groupby_ptype_service_pd.plot.bar(color = "label")

service,label,count
ecr_i,smurf.,280790
private,neptune.,101317
http,normal.,61886
smtp,normal.,9598
private,normal.,7366
domain_u,normal.,5862
other,normal.,5632
ftp_data,normal.,3798
http,back.,2203
other,satan.,1246


In [0]:
# This time let's check the average # of num_access_files for each label class.

# It can be seen that the average number of access files for the "normal" class is 0.005, which is very close to 0.

# Only labels such as phf, spy, ftp_write, multihop, loadmodule are found to have more than 0 average number of access files.

# Other 17 labels have 0 average number of access files. 

groupby_naf = df7.groupby("label").agg(avg("num_access_files").alias("avg_number_of_access_files")).orderBy("avg_number_of_access_files", ascending = False)
groupby_naf.display()

groupby_naf_pd = ps.DataFrame(groupby_naf)
groupby_naf_pd = groupby_naf_pd.set_index('label')
groupby_naf_pd.plot.bar()

label,avg_number_of_access_files
phf.,1.0
spy.,0.5
ftp_write.,0.375
multihop.,0.2857142857142857
loadmodule.,0.1111111111111111
normal.,0.0050062706881309
guess_passwd.,0.0
land.,0.0
ipsweep.,0.0
rootkit.,0.0


### Step 7.  Look at the label column where label == ‘normal’. Now create a new label column where you have a label == ‘normal’ and everything else is considered as an ‘attack’. Split your data (train/test) and based on your new label column now build a simple machine learning model for intrusion detection (you can use few selected columns for your model out of all). Explain which algorithm you have selected and why? Show the results with some success metrics.

##### Data Pre-Processing

In [0]:
# Continuing from Q5 DataFrame (df):
# Let's take a look at how many "normal" rows we have and how many non-"normal" rows we have.

print("Total counts of rows with the label ""normal"": ", df.filter(df.label=="normal.").count())

print("Total counts of rows with the label not equal to ""normal"": ", df.filter(df.label!="normal.").count())

Total counts of rows with the label normal:  97278
Total counts of rows with the label not equal to normal:  396743


In [0]:
# We have 97278 rows that are class "normal" and 396743 rows that are class non-"normal"
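
These two counts show that the classes are imbalanced, which matters when accuracy is read later on. A short plain-Python calculation (variable names are my own) gives the accuracy that a trivial model would reach by always predicting the larger class.

```python
normal_rows = 97278   # count of "normal." rows, from the output above
attack_rows = 396743  # count of all other rows, from the output above

total_rows = normal_rows + attack_rows
attack_share = attack_rows / total_rows
# A model that always predicts the larger class would reach this accuracy.
majority_baseline_accuracy = max(normal_rows, attack_rows) / total_rows
print(total_rows, round(attack_share, 4), round(majority_baseline_accuracy, 4))
```

Any model's accuracy is only informative to the extent that it exceeds this baseline of roughly 80%.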

In [0]:
# Build the df again from the Question 5 RDD (rdd_6)
df = sqlContext.createDataFrame(rdd_6)

# Create a copy of the df, called df_original
df_original = df.alias('df_original')

# create a new label column where you have a label == ‘normal’ and everything else is considered as an ‘attack’
# Let's define 0 == "normal" and 1 == "attack"
df = df.withColumn("new_label", \
   when((df.label == "normal."), lit(0)) \
     .otherwise(lit(1)) \
  )

# Let's take a brief look at the df again. The categorical features would have to be converted into numerical values. 
df.show()

+--------+-------------+-------+----+---------+---------+-------+---------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|  label|new_label|
+--------+-------------+-------+----+---------+---------+-------+---------+
|       0|          tcp|   http|  SF|      181|     5450|normal.|        0|
|       0|          tcp|   http|  SF|      239|      486|normal.|        0|
|       0|          tcp|   http|  SF|      235|     1337|normal.|        0|
|       0|          tcp|   http|  SF|      219|     1337|normal.|        0|
|       0|          tcp|   http|  SF|      217|     2032|normal.|        0|
|       0|          tcp|   http|  SF|      217|     2032|normal.|        0|
|       0|          tcp|   http|  SF|      212|     1940|normal.|        0|
|       0|          tcp|   http|  SF|      159|     4087|normal.|        0|
|       0|          tcp|   http|  SF|      210|      151|normal.|        0|
|       0|          tcp|   http|  SF|      212|      786|normal.|        0|
|       0|  

In [0]:
# Double Checking the updated df:

print("Total counts of rows with the new_label as ""normal"" (0): ", df.filter(df.new_label==0).count())

print("Total counts of rows with the new_label as ""attack"" (1): ", df.filter(df.new_label==1).count())

Total counts of rows with the new_label as normal (0):  97278
Total counts of rows with the new_label as attack (1):  396743
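
The relabelling rule used above can also be written as a plain-Python function, which makes one detail explicit: the raw labels end with a period, so the comparison must be against `"normal."` rather than `"normal"`. The helper name `to_binary_label` and the sample labels are hypothetical.

```python
def to_binary_label(label: str) -> int:
    """Return 0 for 'normal.' and 1 for any other (attack) label."""
    return 0 if label == "normal." else 1

# Hypothetical sample of raw labels, written as they appear in the dataset.
sample_labels = ["normal.", "smurf.", "neptune.", "normal."]
binary = [to_binary_label(raw) for raw in sample_labels]
print(binary)
```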


In [0]:
# Based on your new label column now build a simple machine learning model for intrusion detection (you can use few selected columns for your model out of all):

    # As per the instruction provided on Piazza, I will use the six features we have chosen from Question 5, which are duration, protocol_type, service, src_bytes, dst_bytes, flag.
    # The new class column I will be using will be named as new_label. In order to apply it to the model, instead of classifying as "normal" or "attack", I will classify them as either 0 or 1, 
    # where 0 indicates "normal" and 1 indicates "attack". 

# Explain which algorithm you have selected and why? Show the results with some success metrics.

    # I will use the LogisticRegression model from the pyspark machine learning library because I want to predict whether each row's class is "normal" (0) or "attack" (1). Because logistic
    # regression is a supervised ML model that classifies the output based on the probability of each class, I believe it is suitable for linearly separating the two label classes. 
    # Success metrics such as AUC, accuracy, precision, recall, and F1 score are computed to evaluate the implemented model. 
    # The results of these metrics are reported as the model is built.
    # Note that Demo2 Spark Tutorial was used as a reference to implement my Logistic Regression Model [10]. 

In [0]:
# Features to be used.

num_cols = [ 'duration', 'src_bytes', 'dst_bytes'] # Numerical values
cat_cols = ['protocol_type', 'service', 'flag'] # Categorical columns
label_col = 'new_label' # targets

In [0]:
# Copy the list so that appending the indexed columns later does not also modify num_cols
input_cols = list(num_cols)

In [0]:
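stages = []
# Use a loop name other than `col` so it does not shadow pyspark.sql.functions.col imported earlier
for cat_col in cat_cols:
    string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "Index")
    stages += [string_indexer]
    input_cols.append(cat_col + "Index")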

In [0]:
vect_assembler = VectorAssembler(inputCols= input_cols, outputCol="features")
stages += [vect_assembler]

In [0]:
pipeline = Pipeline().setStages(stages)
pipeline_model = pipeline.fit(df)
df_pipline = pipeline_model.transform(df)

In [0]:
df_pipline.display()

duration,protocol_type,service,flag,src_bytes,dst_bytes,label,new_label,protocol_typeIndex,serviceIndex,flagIndex,features
0,tcp,http,SF,181,5450,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 181.0, 5450.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,239,486,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 239.0, 486.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,235,1337,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 235.0, 1337.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,219,1337,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 219.0, 1337.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,217,2032,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 217.0, 2032.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,217,2032,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 217.0, 2032.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,212,1940,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 212.0, 1940.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,159,4087,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 159.0, 4087.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,210,151,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 210.0, 151.0, 1.0, 2.0, 0.0))"
0,tcp,http,SF,212,786,normal.,0,1.0,2.0,0.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 212.0, 786.0, 1.0, 2.0, 0.0))"
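
The index values in the table follow from StringIndexer's default ordering (`frequencyDesc`), where the most frequent category gets index 0. A plain-Python sketch of that rule, using the protocol counts from Step 5, shows why tcp maps to 1.0. The helper name `frequency_index` is hypothetical and the alphabetical tie-break is an assumption of this sketch.

```python
def frequency_index(counts):
    """Map each category to an index, most frequent first.

    Ties are broken alphabetically, which is an assumption of this sketch.
    """
    ordered = sorted(counts, key=lambda name: (-counts[name], name))
    return {name: float(i) for i, name in enumerate(ordered)}

# Counts copied from the protocol_type table in Step 5.
protocol_counts = {"udp": 20354, "tcp": 190065, "icmp": 283602}
protocol_index = frequency_index(protocol_counts)
print(protocol_index)
```

Because these indices are passed to the model as numbers, the model treats them as ordered magnitudes; one-hot encoding the indexed columns is a common alternative when that ordering is not meaningful.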


In [0]:
# Split the data (train/test)

# I have chosen to have 80% train data and 20% test data. Set seed so the numbers don't change.
train, test = df_pipline.randomSplit([0.8, 0.2], seed=40)
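
As far as I understand, `randomSplit` assigns rows to splits by random sampling, so the resulting sizes are close to, but not exactly, 80% and 20%, while the seed makes the assignment repeatable. The plain-Python sketch below illustrates that behaviour under these assumptions; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point edge cases at the upper boundary
            splits[-1].append(row)
    return splits

rows = list(range(10_000))
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=40)
print(len(train_rows), len(test_rows))
```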

In [0]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol= label_col, featuresCol="features", maxIter=10)

In [0]:
# Train model with train set
lr_model = lr.fit(train)

# Print Coefficients and Intercept of the model
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [3.899987817608827e-05,1.1097019733408875e-07,3.42031537336327e-07,-6.4256930017170095,-0.012115015481420171,3.5921118143781756]
Intercept: 4.759339596467636


In [0]:
# Make predictions using the initial lr_model. Display them to see the prediction column.
pred_initial = lr_model.transform(test)
pred_initial.display()

duration,protocol_type,service,flag,src_bytes,dst_bytes,label,new_label,protocol_typeIndex,serviceIndex,flagIndex,features,rawPrediction,probability,prediction
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-4.674535375859273, 4.674535375859273))","Map(vectorType -> dense, length -> 2, values -> List(0.009243617361849631, 0.9907563826381504))",1.0
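
The `rawPrediction` and `probability` values in these rows can be reproduced by hand from the printed coefficients and intercept, which is a useful check on how the logistic regression model scores a row. The sketch below uses plain Python; the feature order is the one produced by the VectorAssembler (duration, src_bytes, dst_bytes, protocol_typeIndex, serviceIndex, flagIndex).

```python
import math

# Coefficients and intercept copied from the model output above.
coefficients = [
    3.899987817608827e-05, 1.1097019733408875e-07, 3.42031537336327e-07,
    -6.4256930017170095, -0.012115015481420171, 3.5921118143781756,
]
intercept = 4.759339596467636

# Sparse vector from the displayed rows: index 1 (src_bytes) = 8, index 4 (serviceIndex) = 7.
features = [0.0, 8.0, 0.0, 0.0, 7.0, 0.0]

# Linear margin, then the logistic (sigmoid) function for the "attack" class
margin = intercept + sum(w * x for w, x in zip(coefficients, features))
probability_attack = 1.0 / (1.0 + math.exp(-margin))
print(margin, probability_attack)
```

The margin matches the second entry of `rawPrediction` and the probability matches the second entry of `probability`, up to floating-point rounding.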


In [0]:
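# Evaluate the Area Under the ROC curve.
# Note: rawPredictionCol is set to the hard 0/1 "prediction" column here, so this AUC is computed
# from thresholded labels; passing "rawPrediction" instead would score the full ROC curve.
evaluator_LR = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol='new_label')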

# Area Under ROC for the initial model:
area_under_curve_initial = evaluator_LR.evaluate(pred_initial)

#default evaluation is areaUnderROC
print("areaUnderROC for the initial LR model = %g" % area_under_curve_initial)

evaluator_LR.getMetricName()

areaUnderROC for the initial LR model = 0.95998
Out[54]: 'areaUnderROC'
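
The evaluator in this cell reads the hard 0/1 `prediction` column as its score (`rawPredictionCol="prediction"`). With only two distinct score values, the ROC curve has a single interior point, and the area under it should reduce to the average of the true-positive and true-negative rates. The plain-Python sketch below illustrates the difference on hypothetical toy scores; `pairwise_auc` is my own helper, not a Spark function.

```python
def pairwise_auc(scores, labels):
    """AUC as P(score_pos > score_neg) + 0.5 * P(tie), over all positive/negative pairs."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))

# Hypothetical toy data: four attacks (1) and four normal rows (0).
labels = [1, 1, 1, 1, 0, 0, 0, 0]
raw_scores = [0.9, 0.8, 0.7, 0.4, 0.6, 0.3, 0.2, 0.1]
hard_labels = [1.0 if s >= 0.5 else 0.0 for s in raw_scores]

auc_from_scores = pairwise_auc(raw_scores, labels)
auc_from_hard = pairwise_auc(hard_labels, labels)

# True-positive and true-negative rates of the thresholded predictions
tpr = sum(h == 1.0 for h, y in zip(hard_labels, labels) if y == 1) / 4
tnr = sum(h == 0.0 for h, y in zip(hard_labels, labels) if y == 0) / 4
print(auc_from_scores, auc_from_hard, (tpr + tnr) / 2)
```

In this toy case the AUC from continuous scores differs from the AUC from hard labels, so the two should not be compared directly.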

In [0]:
# Let's check accuracy, precision, recall, and f1 for our evaluation metrics.
# Note: "precisionByLabel" and "recallByLabel" report a single class, chosen by metricLabel (default 0.0),
# so the values below describe label 0 ("normal"); "f1" is the F1 score weighted across both labels.

evaluator_accuracy = MulticlassClassificationEvaluator(labelCol = "new_label", predictionCol = "prediction", metricName = "accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol = "new_label", predictionCol = "prediction", metricName = "precisionByLabel")
evaluator_recall = MulticlassClassificationEvaluator(labelCol = "new_label", predictionCol = "prediction", metricName = "recallByLabel")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol = "new_label", predictionCol = "prediction", metricName = "f1")

In [0]:
initial_accuracy = evaluator_accuracy.evaluate(pred_initial)
initial_precision = evaluator_precision.evaluate(pred_initial)
initial_recall = evaluator_recall.evaluate(pred_initial)
initial_f1score = evaluator_f1.evaluate(pred_initial)

print("Initial Test Accuracy = %g" % initial_accuracy)
print("Initial Precision = %g" % initial_precision)
print("Initial Recall = %g" % initial_recall)
print("Initial F1 Score = %g" % initial_f1score)

Initial Test Accuracy = 0.977468
Initial Precision = 0.954805
Initial Recall = 0.930917
Initial F1 Score = 0.97736


In [0]:
# Let's tune the model's hyperparameters using a cross validator so that we get the best model.

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20])
             .build())

# cross validator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator_LR, numFolds=5)

# Run cross validations
cv_model = cv.fit(train)
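
The cost of this grid search can be estimated before running it. The plain-Python sketch below enumerates the same parameter values and counts the model fits, assuming one fit per parameter combination per fold plus one final refit of the best setting on the full training set.

```python
from itertools import product

# Same values as the ParamGridBuilder grid above.
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [5, 10, 20]
num_folds = 5

grid = list(product(reg_params, elastic_net_params, max_iters))
fits_during_cv = len(grid) * num_folds
# Plus one final refit of the best setting on the full training set
total_fits = fits_during_cv + 1
print(len(grid), fits_during_cv, total_fits)
```

With 27 combinations and 5 folds, this cell trains well over a hundred models, which explains why it takes much longer than the single initial fit.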

In [0]:
# Using the best model, let's evaluate our prediction with the cross validated model.

# use best model to predict
pred_cv = cv_model.bestModel.transform(test)

# Display the cross-validated prediction result:
pred_cv.display()


# AUC for the prediction made on the cross validated model.
area_under_curve_cv = evaluator_LR.evaluate(pred_cv)

# Accuracy with crossvalidated model
cv_accuracy = evaluator_accuracy.evaluate(pred_cv)

# Precision
cv_precision = evaluator_precision.evaluate(pred_cv)

# Recall
cv_recall = evaluator_recall.evaluate(pred_cv)

# F1 Score
cv_f1score = evaluator_f1.evaluate(pred_cv)

#default evaluation is areaUnderROC
print("areaUnderROC for cross validated model = %g" % area_under_curve_cv)
print("Test Accuracy for cross validated model = %g" % cv_accuracy)
print("Precision for cross validated model = %g" % cv_precision)
print("Recall for cross validated model = %g" % cv_recall)
print("F1 Score for cross validated model = %g" % cv_f1score)

duration,protocol_type,service,flag,src_bytes,dst_bytes,label,new_label,protocol_typeIndex,serviceIndex,flagIndex,features,rawPrediction,probability,prediction
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0
0,icmp,eco_i,SF,8,0,ipsweep.,1,0.0,7.0,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1, 4), values -> List(8.0, 7.0))","Map(vectorType -> dense, length -> 2, values -> List(-3.517836231659109, 3.517836231659109))","Map(vectorType -> dense, length -> 2, values -> List(0.028808974354223398, 0.9711910256457766))",1.0


areaUnderROC for cross validated model = 0.959961
Test Accuracy for cross validated model = 0.977437
Precision for cross validated model = 0.954655
Recall for cross validated model = 0.930917
F1 Score Accuracy for cross validated model = 0.97733


# The initial model generally performed better: it has a higher AUC, accuracy, precision, and F1 score, while recall is the same for both models. I believe the cross-validated model may be slightly overfitting.

# Although the differences in the evaluation metrics between the initial and the cross-validated model are very small, it could be better to use the initial model.
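
When reading the metrics printed above, it can help to check how they relate to each other. The sketch below is a plain-Python sanity check, not part of the original pipeline: it takes the reported precision and recall of the cross-validated model and computes the F1 score they would imply if all three figures described the same single class. The function name `f1_from_precision_recall` and the `reported_*` variables are hypothetical helpers introduced only for this illustration; the numbers are copied from the printed output.

```python
def f1_from_precision_recall(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall (single-class F1 score)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


# Values copied from the printed output of the cross-validated model.
# The reported_* names are illustrative and do not exist in the notebook.
reported_precision = 0.954655
reported_recall = 0.930917
reported_f1 = 0.97733

# F1 that precision and recall would imply for one class.
implied_f1 = f1_from_precision_recall(reported_precision, reported_recall)

# A positive gap means the reported F1 is not the harmonic mean
# of the reported precision and recall.
gap = reported_f1 - implied_f1

print("F1 implied by precision and recall = %g" % implied_f1)
print("Reported F1 minus implied F1 = %g" % gap)
```

The implied value is lower than the reported F1 score, so the three printed metrics are probably not all computed in the same way. One possible explanation, which is only an assumption because the evaluator definitions are not shown in this part of the notebook, is that the F1 evaluator averages over both labels weighted by their frequency while precision and recall are computed for a single label. This is worth confirming before comparing the F1 score of the two models directly against their precision and recall.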

## References

[1] http://kdd.ics.uci.edu/databases/kddcup99/task.html

[2] https://azure.microsoft.com/en-us/overview/what-is-paas/

[3] https://www.microsoftpressstore.com/articles/article.aspx?p=3113586

[4] Lecture 5 - MIE1628, University of Toronto

[5] https://learn.microsoft.com/en-us/archive/msdn-magazine/2015/september/microsoft-azure-fault-tolerance-pitfalls-and-resolutions-in-the-cloud

[6] https://www.cloudflare.com/learning/performance/glossary/what-is-latency/

[7] https://azure.microsoft.com/en-us/resources/cloud-computing-dictionary/what-are-private-public-hybrid-clouds/#overview

[8] https://azure.microsoft.com/en-us/resources/cloud-computing-dictionary/what-are-private-public-hybrid-clouds/#benefits

[9] https://azure.microsoft.com/en-us/resources/cloud-computing-dictionary/what-is-a-public-cloud/

[10] Demo 2 - MIE1628 Databricks Spark Tutorial Session