In [0]:
#from pyspark import  SparkContext
#sc = SparkContext( 'local', 'pyspark')

**Install, Initialise**

In [0]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://apache.osuosl.org/spark/spark-2.2.2/spark-2.2.2-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
#
# if the current version of Spark is not used, there may be errors
# check here for current versions http://apache.osuosl.org/spark
#
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
Hit:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:6 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease [15.4 kB]
Get:7 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:8 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ Packages [80.2 kB]
Get:9 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic/main Sources [1,745 kB]
Get:10 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [761 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 Packages [1,057 kB]
Get:12 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [795 kB]
Get:13 http://archive.ubuntu.com/ubuntu

In [0]:
!ls

sample_data  spark-2.4.4-bin-hadoop2.7	spark-2.4.4-bin-hadoop2.7.tgz


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
#
#findspark.init('/content/spark-2.2.2-bin-hadoop2.7')
#
# https://stackoverflow.com/questions/42223498/findspark-init-indexerror-list-index-out-of-range-error
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
sc = spark.sparkContext

In [0]:
sc

**Getting the data and creating the RDD**

In [0]:
import urllib
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [0]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

**An RDD of dense vectors**

In [0]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

**Summary statistics**

In [0]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 

# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

print("Duration Statistics:")
print(" Mean: {}".format(round(summary.mean()[0],3)))
print(" St. deviation: {}".format(round(sqrt(summary.variance()[0]),3)))
print(" Max value: {}".format(round(summary.max()[0],3)))
print(" Min value: {}".format(round(summary.min()[0],3)))
print(" Total value count: {}".format(summary.count()))
print(" Number of non-zero values: {}".format(summary.numNonzeros()[0]))

Duration Statistics:
 Mean: 47.979
 St. deviation: 707.746
 Max value: 58329.0
 Min value: 0.0
 Total value count: 494021
 Number of non-zero values: 12350.0


**Summary statistics by label**

In [0]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)

In [0]:
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")

In [0]:
normal_summary = Statistics.colStats(normal_label_data.values())

In [0]:
print("Duration Statistics for label: {}".format("normal"))
print(" Mean: {}".format(normal_summary.mean()[0],3))
print(" St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3)))
print(" Max value: {}".format(round(normal_summary.max()[0],3)))
print(" Min value: {}".format(round(normal_summary.min()[0],3)))
print(" Total value count: {}".format(normal_summary.count()))
print(" Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


In [0]:
def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())

In [0]:
normal_sum = summary_by_label(raw_data, "normal.")

print("Duration Statistics for label: {}".format("normal"))
print(" Mean: {}".format(normal_sum.mean()[0],3))
print(" St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3)))
print(" Max value: {}".format(round(normal_sum.max()[0],3)))
print(" Min value: {}".format(round(normal_sum.min()[0],3)))
print(" Total value count: {}".format(normal_sum.count()))
print(" Number of non-zero values: {}".format(normal_sum.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


In [0]:
guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print("Duration Statistics for label: {}".format("guess_password"))
print(" Mean: {}".format(guess_passwd_summary.mean()[0],3))
print(" St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3)))
print(" Max value: {}".format(round(guess_passwd_summary.max()[0],3)))
print(" Min value: {}".format(round(guess_passwd_summary.min()[0],3)))
print(" Total value count: {}".format(guess_passwd_summary.count()))
print(" Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0]))

Duration Statistics for label: guess_password
 Mean: 2.7169811320754715
 St. deviation: 11.88
 Max value: 60.0
 Min value: 0.0
 Total value count: 53
 Number of non-zero values: 4.0


In [0]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

In [0]:
stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list]

In [0]:
duration_by_label = [ 
    (stat[0], np.array([float(stat[1].mean()[0]), float(sqrt(stat[1].variance()[0])), float(stat[1].min()[0]), float(stat[1].max()[0]), int(stat[1].count())])) 
    for stat in stats_by_label]

In [0]:
import pandas as pd
pd.set_option('display.max_columns', 50)

stats_by_label_df = pd.DataFrame.from_items(duration_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

  after removing the cwd from sys.path.


In [0]:
print("Duration statistics, by label")
stats_by_label_df

Duration statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [0]:
def get_variable_stats_df(stats_by_label, column_i):
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[column_i]), float(sqrt(stat[1].variance()[column_i])), float(stat[1].min()[column_i]), float(stat[1].max()[column_i]), int(stat[1].count())])) 
        for stat in stats_by_label
    ]
    return pd.DataFrame.from_items(column_stats_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

In [0]:
get_variable_stats_df(stats_by_label,0)

  


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [0]:
print("src_bytes statistics, by label")
get_variable_stats_df(stats_by_label,1)

src_bytes statistics, by label


  


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,54156.355878,3159.36,13140.0,54540.0,2203.0
buffer_overflow.,1400.433333,1337.133,0.0,6274.0,30.0
ftp_write.,220.75,267.7476,0.0,676.0,8.0
guess_passwd.,125.339623,3.03786,104.0,126.0,53.0
imap.,347.583333,629.926,0.0,1492.0,12.0
ipsweep.,10.0834,5.231658,0.0,18.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,151.888889,127.7453,0.0,302.0,9.0
multihop.,435.142857,540.9604,0.0,1412.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


**Correlations**

In [0]:
from pyspark.mllib.stat import Statistics 
correlation_matrix = Statistics.corr(vector_data, method="spearman")

In [0]:
import pandas as pd
pd.set_option('display.max_columns', 50)

col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)

corr_df

Unnamed: 0,duration,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,num_failed_logins,logged_in,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,is_hot_login,is_guest_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
duration,1.0,0.014196,0.299189,-0.001068,-0.008025,0.017883,0.108639,0.014363,0.159564,0.010687,0.040425,0.026015,0.013401,0.061099,0.008632,0.019407,-2e-05,-1e-05,0.205606,-0.259032,-0.250139,-0.074211,-0.073663,-0.025936,-0.02642,0.062291,-0.050875,0.123621,-0.161107,-0.217167,-0.211979,0.231644,-0.065202,0.100692,-0.056753,-0.057298,-0.007759,-0.013891
src_bytes,0.014196,1.0,-0.167931,-0.009404,-0.019358,9.4e-05,0.11392,-0.008396,-0.089702,0.118562,0.003067,0.002282,-0.00205,0.02771,0.014403,-0.001497,1e-05,1.9e-05,0.027511,0.66623,0.722609,-0.65746,-0.652391,-0.34218,-0.332977,0.744046,-0.739988,-0.104042,0.130377,0.741979,0.729151,-0.712965,0.815039,-0.140231,-0.64592,-0.641792,-0.297338,-0.300581
dst_bytes,0.299189,-0.167931,1.0,-0.00304,-0.022659,0.007234,0.193156,0.021952,0.882185,0.169772,0.026054,0.012192,-0.003884,0.034154,-5.4e-05,0.065776,-3e-05,4.1e-05,0.085947,-0.639157,-0.497683,-0.205848,-0.198715,-0.100958,-0.081307,0.229677,-0.222572,0.521003,-0.611972,0.024124,0.055033,-0.035073,-0.396195,0.578557,-0.167047,-0.158378,-0.003042,0.001621
land,-0.001068,-0.009404,-0.00304,1.0,-0.000333,-6.5e-05,-0.000539,-7.6e-05,-0.002785,-0.000447,-9.3e-05,-4.9e-05,-0.00023,-0.000149,-7.6e-05,-0.000211,-0.002868,0.002099,-0.00025,-0.010939,-0.010128,0.01416,0.014342,-0.000451,-0.00169,0.002153,-0.001846,0.020678,-0.019923,-0.012341,0.002576,-0.001803,0.004265,0.016171,0.013566,0.012265,0.000389,-0.001816
wrong_fragment,-0.008025,-0.019358,-0.022659,-0.000333,1.0,-0.00015,-0.004042,-0.000568,-0.020911,-0.00337,-0.000528,-0.000248,-0.001727,-0.00116,-0.000507,-0.001519,-0.000146,0.000442,-0.001869,-0.057711,-0.029117,-0.008849,-0.023382,0.00043,-0.012676,0.010218,-0.009386,0.012117,-0.029149,-0.058225,-0.04956,0.055542,-0.015449,0.007306,0.010387,-0.024117,0.046656,-0.013666
urgent,0.017883,9.4e-05,0.007234,-6.5e-05,-0.00015,1.0,0.008594,0.063009,0.006821,0.031765,0.067437,2e-05,0.061994,0.061383,-6.5e-05,0.02338,0.012915,0.005191,-0.0001,-0.004778,-0.004799,-0.001338,-0.001327,-0.000705,-0.000726,0.001521,-0.001522,-0.000788,-0.005894,-0.005698,-0.004078,0.005208,-0.001939,-0.000976,-0.001381,-0.00137,-0.000786,-0.000782
hot,0.108639,0.11392,0.193156,-0.000539,-0.004042,0.008594,1.0,0.11256,0.189126,0.811529,0.101983,-0.0004,0.003096,0.028694,0.009146,0.004224,-0.000392,-0.000247,0.463706,-0.120847,-0.114735,-0.035487,-0.034934,0.013468,0.052003,0.041342,-0.040555,0.032141,-0.074178,-0.01796,0.018783,-0.017198,-0.086998,-0.014141,-0.004706,-0.010721,0.199019,0.189142
num_failed_logins,0.014363,-0.008396,0.021952,-7.6e-05,-0.000568,0.063009,0.11256,1.0,-0.00219,0.004619,0.016895,0.072748,0.01006,0.015211,-9.3e-05,0.005581,0.003439,-0.001554,-0.000428,-0.018024,-0.018027,-0.003674,-0.004027,0.035324,0.034877,0.005716,-0.005538,-0.003096,-0.028369,-0.015092,0.003004,-0.00296,-0.006617,-0.002588,0.014713,0.014914,0.032395,0.032151
logged_in,0.159564,-0.089702,0.882185,-0.002785,-0.020911,0.006821,0.189126,-0.00219,1.0,0.16119,0.025293,0.011813,0.082533,0.05553,0.024354,0.072698,7.9e-05,0.000127,0.089318,-0.578287,-0.438947,-0.187114,-0.180122,-0.091962,-0.072287,0.216969,-0.214019,0.503807,-0.682721,0.080352,0.114526,-0.093565,-0.359506,0.659078,-0.143283,-0.132474,0.007236,0.012979
num_compromised,0.010687,0.118562,0.169772,-0.000447,-0.00337,0.031765,0.811529,0.004619,0.16119,1.0,0.085558,0.048985,0.028557,0.031223,0.011256,0.006977,0.001042,-0.000443,-0.002504,-0.097212,-0.091154,-0.030516,-0.030264,0.008573,0.054006,0.035253,-0.034953,0.036497,-0.041615,0.003465,0.03898,-0.039091,-0.078843,-0.020979,-0.005019,-0.004504,0.214115,0.217858


In [0]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# get the names of the variables so we can use them to slice the dataframe
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
# slice it
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

Unnamed: 0,src_bytes,dst_bytes,hot,logged_in,num_compromised,num_outbound_cmds,is_hot_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
src_bytes,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
dst_bytes,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
hot,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
logged_in,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
num_compromised,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
num_outbound_cmds,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
is_hot_login,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
count,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
srv_count,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
serror_rate,False,False,False,False,False,False,False,False,False,False,True,False,False,True,True,False,False,False,False,False,False,True,True,False,False


**8. Classification with Logistic Regression**


In [0]:
print("Train data size is {}".format(raw_data.count()))

Train data size is 494021


In [0]:
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")

In [0]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


**Preparing the training data**

In [0]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

**Preparing the test data**

In [0]:
test_data = test_raw_data.map(parse_interaction)

**Detecting network attacks using Logistic Regression**

In [0]:
##Training a classifier

In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 171.486 seconds


**Evaluating the model on new data**

In [0]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

In [0]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda x : x[0]==x[1]).count() / float(test_data.count())
tt = time() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 11.658 seconds. Test accuracy is 0.9049



**Evaluating the new model**

In [0]:
def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

In [0]:
# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 159.27 seconds


In [0]:
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda x : x[0] == x[1]).count() / float(corr_reduced_test_data.count())
tt = time() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 11.479 seconds. Test accuracy is 0.8158


**9. Detecting network attacks using Decision Trees**

**Preparing the data**

In [0]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

In [0]:
def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

**Training a classifier**

In [0]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))


Classifier trained in 14.535 seconds


**Evaluating the model**

In [0]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

In [0]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda x : x[0] == x[1]).count() / float(test_data.count())
tt = time() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 17.393 seconds. Test accuracy is 0.9196


**Interpreting the model**

In [0]:
print("Learned classification tree model:")
print(tree_model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 59.0)
   If (feature 25 <= 0.5)
    If (feature 9 <= 0.5)
     If (feature 36 <= 0.46499999999999997)
      Predict: 0.0
     Else (feature 36 > 0.46499999999999997)
      Predict: 1.0
    Else (feature 9 > 0.5)
     If (feature 4 <= 1099.0)
      Predict: 0.0
     Else (feature 4 > 1099.0)
      Predict: 1.0
   Else (feature 25 > 0.5)
    If (feature 38 <= 0.105)
     If (feature 22 <= 6.5)
      Predict: 0.0
     Else (feature 22 > 6.5)
      Predict: 1.0
    Else (feature 38 > 0.105)
     If (feature 3 in {3.0,1.0})
      Predict: 0.0
     Else (feature 3 not in {3.0,1.0})
      Predict: 1.0
  Else (feature 22 > 59.0)
   If (feature 5 <= 2.0)
    If (feature 11 <= 0.5)
     Predict: 1.0
    Else (feature 11 > 0.5)
     Predict: 0.0
   Else (feature 5 > 2.0)
    If (feature 2 in {0.0,10.0,1.0,3.0,23.0})
     If (feature 4 <= 6.5)
      Predict: 1.0
     Else (feature 4 > 6.5)


In [0]:
print("Service 0 is {}".format(services[0]))
print("Service 52 is {}".format(services[52]))

Service 0 is http
Service 52 is netbios_dgm


**Building a minimal model using the three main splits**

In [0]:
def create_labeled_point_minimal(line_split):
    # leave_out = [41]
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except:
        clean_line_split[0] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)

In [0]:
# Build the model
t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(training_data_minimal, numClasses=2, 
                                          categoricalFeaturesInfo={0: len(flags)},
                                          impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 7.348 seconds


In [0]:
#Now we can predict on the testing data and calculate accuracy.
predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)

In [0]:
t0 = time()
test_accuracy = labels_and_preds_minimal.filter(lambda x : x[0] == x[1]).count() / float(test_data_minimal.count())
tt = time() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 9.38 seconds. Test accuracy is 0.9153
