<a href="https://colab.research.google.com/github/venu-analytics/Analytics-Projects/blob/Python-Dev/Session8_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Initialisation

In [None]:
# Refresh the apt package index before installing anything.
!apt-get update
# Install a headless Java 8 JDK; stdout is discarded to keep the cell quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 2.2.1 (Hadoop 2.7 build) tarball and unpack it in the working directory.
# NOTE(review): this mirror may no longer host a release this old — confirm the URL still resolves.
!wget -q http://apache.osuosl.org/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
!tar xf spark-2.2.1-bin-hadoop2.7.tgz
# findspark is imported and initialised in a later cell.
!pip install -q findspark

Hit:1 http://security.ubuntu.com/ubuntu artful-security InRelease
Hit:2 http://archive.ubuntu.com/ubuntu artful InRelease
Hit:3 http://archive.ubuntu.com/ubuntu artful-updates InRelease
Hit:4 http://archive.ubuntu.com/ubuntu artful-backports InRelease
Reading package lists... Done


In [None]:
import os

# Export JAVA_HOME and SPARK_HOME for this kernel process in a single call.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-2.2.1-bin-hadoop2.7",
)

In [None]:
import findspark
# Initialise findspark before pyspark is imported below.
# NOTE(review): presumably this relies on the SPARK_HOME set in the previous cell — confirm.
findspark.init()
from pyspark.sql import SparkSession
# Get (or create) a session; "local[*]" is the master URL handed to the builder.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Keep a handle on the session's SparkContext; later cells read the data file through it.
sc = spark.sparkContext
# Bare expression so the notebook displays the context.
sc

## Getting the data and creating the RDD

In [None]:
# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.request` has been loaded.
import urllib.request

# Download the 10% KDD Cup 1999 archive into the working directory.
# `f` keeps urlretrieve's return value.
f = urllib.request.urlretrieve(
    "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
    "kddcup.data_10_percent.gz",
)

In [None]:
# Relative path of the archive downloaded in the previous cell.
data_file = "./kddcup.data_10_percent.gz"
# Load the file through the SparkContext; each element is one line of text.
raw_data = sc.textFile(data_file)

In [None]:
# Peek at the first five raw lines.
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

## An RDD of dense vectors

In [None]:
import numpy as np

def parse_interaction(line):
    """Turn one comma-separated record into a NumPy array of floats.

    The fields at positions 1, 2, 3 and 41 are skipped (only the numeric and
    logical values are kept); every remaining field is converted with
    ``float``.

    Args:
        line: A single comma-separated record as text.

    Returns:
        A one-dimensional ``numpy.ndarray`` holding the retained fields.

    Raises:
        ValueError: If a retained field cannot be converted to ``float``.
    """
    # positions of the fields that are left out of the array
    skipped_positions = (1, 2, 3, 41)
    numeric_values = []
    for position, field in enumerate(line.split(",")):
        if position in skipped_positions:
            continue
        numeric_values.append(float(field))
    return np.array(numeric_values)

# Apply the parser to every raw line.
vector_data = raw_data.map(parse_interaction)

In [None]:
# Show the first five parsed arrays.
vector_data.take(5)

[array([0.00e+00, 1.81e+02, 5.45e+03, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 9.00e+00, 9.00e+00,
        1.00e+00, 0.00e+00, 1.10e-01, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00]),
 array([0.00e+00, 2.39e+02, 4.86e+02, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 1.90e+01, 1.90e+01,
        1.00e+00, 0.00e+00, 5.00e-02, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00]),
 array([0.000e+00, 2.350e+02, 1.337e+03, 0.000e+00, 0.000e+00, 0.000e+00,
        0.000e+00, 0.000e+00, 1.000e+00, 0.000e+00

## Summary Statistics

In [None]:
from pyspark.mllib.stat import Statistics
from math import sqrt

# Compute column summary statistics over every parsed record.
summary = Statistics.colStats(vector_data)

# Index 0 selects the first column, reported here as the duration.
print("Duration Statistics:")
print("Mean is {}".format(round(summary.mean()[0], 3)))
# variance() is the squared deviation, so take its square root before
# labelling the figure a standard deviation.
print("St. deviation is {}".format(round(sqrt(summary.variance()[0]), 3)))
print("Max value is {}".format(round(summary.max()[0], 3)))
print("Min value is {}".format(round(summary.min()[0], 3)))
print("Total Values Count: {}".format(summary.count()))
print("Number of non-zero values: {}".format(summary.numNonzeros()[0]))

Duration Statistics:
Mean is 47.979
St. deviation is 500905.069
Max value is 58329.0
Min value is 0.0
Total Values Count: 494021
Number of non-zero values: 12350.0


## Summary Statistics by Label

In [None]:
def parse_interaction_with_key(line):
    """Split one comma-separated record into a ``(label, features)`` pair.

    Field 41 becomes the key. The feature array holds every field except
    those at positions 1, 2, 3 and 41, each converted with ``float``.

    Args:
        line: A single comma-separated record as text.

    Returns:
        A tuple ``(label, features)`` where ``label`` is the text of field 41
        and ``features`` is a one-dimensional ``numpy.ndarray``.

    Raises:
        IndexError: If the record has fewer than 42 fields.
        ValueError: If a retained field cannot be converted to ``float``.
    """
    fields = line.split(",")
    # Read the key first so a short record fails on the missing label.
    label = fields[41]
    skipped_positions = (1, 2, 3, 41)
    features = np.array(
        [float(fields[i]) for i in range(len(fields)) if i not in skipped_positions]
    )
    return (label, features)

# Pair every raw line's numeric array with its label.
label_vector_data = raw_data.map(parse_interaction_with_key)

In [None]:
# Keep only the (label, array) pairs whose label is exactly "normal.".
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")


**Now we can use the new RDD to call colStats on the values.**

In [None]:
# values() drops the label key, so colStats receives only the numeric arrays.
normal_summary = Statistics.colStats(normal_label_data.values())

**And collect the results as we did before.**

In [None]:
# Report the first column (duration) of the "normal." summary.
print("Duration Statistics for label: {}".format("normal"))
# round() must wrap the mean; passing 3 to format() only supplied an unused
# argument and left the value unrounded.
print(" Mean: {}".format(round(normal_summary.mean()[0], 3)))
print(" St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]), 3)))
print(" Max value: {}".format(round(normal_summary.max()[0], 3)))
print(" Min value: {}".format(round(normal_summary.min()[0], 3)))
print(" Total value count: {}".format(normal_summary.count()))
print(" Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


In [None]:
def summary_by_label(raw_data, label):
    """Compute column statistics for the records carrying a given label.

    Args:
        raw_data: Collection of comma-separated text records; it must offer
            ``map`` and the result must offer ``filter`` and ``values``.
        label: Label text to match exactly against each record's key
            (for example ``"normal."``).

    Returns:
        Whatever ``Statistics.colStats`` returns for the matching arrays.
    """
    keyed_vectors = raw_data.map(parse_interaction_with_key)
    matching = keyed_vectors.filter(lambda pair: pair[0] == label)
    # Drop the keys: colStats is given only the numeric arrays.
    return Statistics.colStats(matching.values())

In [None]:
 normal_sum = summary_by_label(raw_data, "normal.")

print("Duration Statistics for label: {}".format("normal"))
print (" Mean: {}".format(normal_sum.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3)))
print (" Max value: {}".format(round(normal_sum.max()[0],3)))
print (" Min value: {}".format(round(normal_sum.min()[0],3)))
print (" Total value count: {}".format(normal_sum.count()))
print (" Number of non-zero values: {}".format(normal_sum.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


In [None]:
# Duration statistics for the records labelled "guess_passwd.".
guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print("Duration Statistics for label: {}".format("guess_password"))
# round() must wrap the mean; passing 3 to format() only supplied an unused
# argument and left the value unrounded.
print(" Mean: {}".format(round(guess_passwd_summary.mean()[0], 3)))
print(" St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]), 3)))
print(" Max value: {}".format(round(guess_passwd_summary.max()[0], 3)))
print(" Min value: {}".format(round(guess_passwd_summary.min()[0], 3)))
print(" Total value count: {}".format(guess_passwd_summary.count()))
print(" Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0]))

Duration Statistics for label: guess_password
 Mean: 2.7169811320754715
 St. deviation: 11.88
 Max value: 60.0
 Min value: 0.0
 Total value count: 53
 Number of non-zero values: 4.0


In [None]:
# Labels to summarise, one per line. Each keeps the trailing "." used by the
# raw records (e.g. "normal.").
label_list = [
    "back.",
    "buffer_overflow.",
    "ftp_write.",
    "guess_passwd.",
    "imap.",
    "ipsweep.",
    "land.",
    "loadmodule.",
    "multihop.",
    "neptune.",
    "nmap.",
    "normal.",
    "perl.",
    "phf.",
    "pod.",
    "portsweep.",
    "rootkit.",
    "satan.",
    "smurf.",
    "spy.",
    "teardrop.",
    "warezclient.",
    "warezmaster.",
]

In [None]:
# One (label, summary) pair per label. Each summary_by_label call maps and
# filters raw_data again, so this makes a separate pass for every label.
stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list]

In [None]:
# For every label: mean, standard deviation, min, max and record count of
# column 0 (the duration), packed into one NumPy array.
duration_by_label = [
    (
        label,
        np.array([
            float(label_summary.mean()[0]),
            float(sqrt(label_summary.variance()[0])),
            float(label_summary.min()[0]),
            float(label_summary.max()[0]),
            int(label_summary.count()),
        ]),
    )
    for label, label_summary in stats_by_label
]

In [None]:
import pandas as pd
pd.set_option('display.max_columns', 50)

# DataFrame.from_items was deprecated in pandas 0.23 and removed in 1.0, so
# build the frame with the plain constructor: one row per (label, stats) pair,
# indexed by label, in the original order.
stats_by_label_df = pd.DataFrame(
    [stats for _, stats in duration_by_label],
    index=[label for label, _ in duration_by_label],
    columns=["Mean", "Std Dev", "Min", "Max", "Count"],
)

In [None]:
print ("Duration statistics, by label")
# Bare expression so the notebook renders the frame as a table.
stats_by_label_df

Duration statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [None]:
def get_variable_stats_df(stats_by_label, column_i):
    """Tabulate one column's statistics for every label.

    Args:
        stats_by_label: Iterable of ``(label, summary)`` pairs, where each
            summary offers ``mean()``, ``variance()``, ``min()``, ``max()``
            (each indexable by column) and ``count()``.
        column_i: Index of the column to report.

    Returns:
        A ``pandas.DataFrame`` indexed by label with the columns
        ``Mean``, ``Std Dev``, ``Min``, ``Max`` and ``Count``. An empty
        input yields an empty frame with those columns.
    """
    labels = []
    rows = []
    for entry in stats_by_label:
        label, label_summary = entry[0], entry[1]
        labels.append(label)
        rows.append(np.array([
            float(label_summary.mean()[column_i]),
            # variance() is squared, so take the root to report a deviation
            float(sqrt(label_summary.variance()[column_i])),
            float(label_summary.min()[column_i]),
            float(label_summary.max()[column_i]),
            int(label_summary.count()),
        ]))
    # DataFrame.from_items was deprecated in pandas 0.23 and removed in 1.0;
    # the plain constructor builds the same label-indexed frame.
    return pd.DataFrame(rows, index=labels, columns=["Mean", "Std Dev", "Min", "Max", "Count"])

In [None]:
# Column 0 is the duration, the same column the table above was built from.
get_variable_stats_df(stats_by_label,0)

Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [None]:
print ("src_bytes statistics, by label")
# Column index 1 is the second value kept by the parser, reported here as src_bytes.
get_variable_stats_df(stats_by_label,1)

src_bytes statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,54156.355878,3159.36,13140.0,54540.0,2203.0
buffer_overflow.,1400.433333,1337.133,0.0,6274.0,30.0
ftp_write.,220.75,267.7476,0.0,676.0,8.0
guess_passwd.,125.339623,3.03786,104.0,126.0,53.0
imap.,347.583333,629.926,0.0,1492.0,12.0
ipsweep.,10.0834,5.231658,0.0,18.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,151.888889,127.7453,0.0,302.0,9.0
multihop.,435.142857,540.9604,0.0,1412.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [None]:
# Sample a 0.1 fraction of the raw lines without replacement, using a fixed seed.
raw_data_sample = raw_data.sample(withReplacement=False, fraction=0.1, seed=1234)
total_size = raw_data.count()
sample_size = raw_data_sample.count()
print("Sample size is ", sample_size, "of ", total_size)

Sample size is  49493 of  494021


In [None]:
import numpy as np

def parse_interaction(line):
    """Turn one comma-separated record into a NumPy array of floats.

    The fields at positions 1, 2, 3 and 41 are skipped (only the numeric and
    logical values are kept); every remaining field is converted with
    ``float``.

    Args:
        line: A single comma-separated record as text.

    Returns:
        A one-dimensional ``numpy.ndarray`` holding the retained fields.

    Raises:
        ValueError: If a retained field cannot be converted to ``float``.
    """
    # positions of the fields that are left out of the array
    skipped_positions = (1, 2, 3, 41)
    numeric_values = []
    for position, field in enumerate(line.split(",")):
        if position in skipped_positions:
            continue
        numeric_values.append(float(field))
    return np.array(numeric_values)

# Parse the sampled lines into numeric arrays, then show the first two.
vector_data_sample = raw_data_sample.map(parse_interaction)
vector_data_sample.take(2)

[array([0.00e+00, 1.81e+02, 5.45e+03, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 9.00e+00, 9.00e+00,
        1.00e+00, 0.00e+00, 1.10e-01, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00]),
 array([0.00e+00, 2.10e+02, 1.51e+02, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 8.00e+00, 8.90e+01,
        1.00e+00, 0.00e+00, 1.20e-01, 4.00e-02, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00])]

In [None]:
from pyspark.mllib.stat import Statistics 
# Spearman correlation computed on the sampled arrays rather than the full data.
correlation_matrix = Statistics.corr(vector_data_sample, method="spearman") # we have used a smaller sample here

In [None]:
import pandas as pd
# Allow up to 50 columns in rendered frames; col_names below has 38 entries.
pd.set_option('display.max_columns', 50)

# Names for the 38 values parse_interaction keeps from each record, in order.
col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

# Label both axes of the correlation matrix with the variable names.
corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)

# Bare expression so the notebook renders the frame as a table.
corr_df

Unnamed: 0,duration,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,num_failed_logins,logged_in,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,is_hot_login,is_guest_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
duration,1.0,0.014854,0.297922,-0.001017,-0.007621,0.028234,0.101065,-0.001439,0.15692,0.01203,0.033684,,0.015708,0.072401,0.008206,0.004576,,,0.196682,-0.258991,-0.250785,-0.073436,-0.073242,-0.022575,-0.023619,0.061804,-0.05015,0.129145,-0.161715,-0.220296,-0.215577,0.237118,-0.065232,0.104728,-0.058296,-0.056524,-0.007542,-0.014086
src_bytes,0.014854,1.0,-0.169473,-0.00895,-0.022158,-0.004041,0.106511,-0.007582,-0.093463,0.114493,0.002368,,-0.005549,0.023594,0.020213,-0.002262,,,0.023215,0.673891,0.72892,-0.657138,-0.651919,-0.344384,-0.335915,0.74651,-0.74259,-0.105738,0.134125,0.74601,0.732665,-0.717587,0.818764,-0.143084,-0.645579,-0.640117,-0.305603,-0.307658
dst_bytes,0.297922,-0.169473,1.0,-0.002882,-0.02159,0.011808,0.187336,0.017105,0.883522,0.163355,0.026866,,-0.004147,0.039026,-0.000986,0.061647,,,0.079851,-0.63739,-0.495782,-0.206125,-0.200039,-0.100882,-0.082943,0.228386,-0.221267,0.526787,-0.605922,0.026204,0.053968,-0.033624,-0.397466,0.574511,-0.164738,-0.156567,-0.01156,-0.006518
land,-0.001017,-0.00895,-0.002882,1.0,-0.000303,-2.9e-05,-0.000497,-5.7e-05,-0.00263,-0.00041,-7e-05,,-0.000231,-0.00014,-8.1e-05,-0.000181,,,-0.00022,-0.010684,-0.01009,0.013622,0.013664,-0.001607,-0.001625,0.003415,-0.003419,0.011359,-0.019338,-0.010545,0.004034,-0.00405,0.005183,0.020627,0.013307,0.010495,-0.001766,-0.00174
wrong_fragment,-0.007621,-0.022158,-0.02159,-0.000303,1.0,-0.000214,-0.003725,-0.000428,-0.019703,-0.003071,-0.000524,,-0.001727,-0.001049,-0.000606,-0.001354,,,-0.001645,-0.054439,-0.026845,-0.007755,-0.022209,-0.001948,-0.012171,0.00959,-0.008084,0.009094,-0.028674,-0.056104,-0.047199,0.055455,-0.015379,0.004921,0.015411,-0.022931,0.051155,-0.013032
urgent,0.028234,-0.004041,0.011808,-2.9e-05,-0.000214,1.0,-0.000351,-3.8e-05,0.010865,0.069687,-5.5e-05,,0.123856,-9.5e-05,-5.7e-05,0.158046,,,-0.000157,-0.007555,-0.007587,-0.002109,-0.002096,-0.001136,-0.001148,0.002414,-0.002418,-0.001233,-0.013534,-0.009236,-0.004762,0.009611,-0.001707,-0.001527,-0.00218,-0.002164,-0.001248,-0.001231
hot,0.101065,0.106511,0.187336,-0.000497,-0.003725,-0.000351,1.0,0.114307,0.184684,0.799647,0.140633,,0.004374,0.045623,0.019523,-0.002224,,,0.443815,-0.117548,-0.111706,-0.033359,-0.033131,0.017551,0.055817,0.040439,-0.039853,0.038803,-0.069,-0.013506,0.019205,-0.015684,-0.083343,-0.010144,0.002638,-0.002728,0.182773,0.178575
num_failed_logins,-0.001439,-0.007582,0.017105,-5.7e-05,-0.000428,-3.8e-05,0.114307,1.0,-0.003719,-0.000579,-9.8e-05,,-0.000328,-0.0002,-0.000114,-0.000253,,,-0.000309,-0.014698,-0.014722,-0.004218,-0.004192,0.035713,0.035298,0.004829,-0.004836,-0.002468,-0.024911,-0.010999,0.005705,-0.005728,-0.005109,-0.003056,0.014629,0.014833,0.031014,0.031362
logged_in,0.15692,-0.093463,0.883522,-0.00263,-0.019703,0.010865,0.184684,-0.003719,1.0,0.155888,0.026616,,0.087655,0.053241,0.030734,0.068745,,,0.083506,-0.575376,-0.435923,-0.187335,-0.180977,-0.091605,-0.073307,0.21657,-0.213788,0.510475,-0.674912,0.084897,0.11602,-0.0961,-0.360047,0.655313,-0.140352,-0.130404,-0.002099,0.005451
num_compromised,0.01203,0.114493,0.163355,-0.00041,-0.003071,0.069687,0.799647,-0.000579,0.155888,1.0,0.085353,,0.032517,0.041631,0.024032,0.009234,,,-0.002228,-0.093844,-0.08848,-0.028906,-0.028721,0.014927,0.060239,0.034067,-0.03379,0.041642,-0.032615,0.006644,0.037112,-0.037242,-0.075873,-0.020783,0.004655,0.004613,0.203364,0.205928


In [None]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# get the names of the variables so we can use them to slice the dataframe
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
# slice it
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

Unnamed: 0,src_bytes,dst_bytes,logged_in,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
src_bytes,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
dst_bytes,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
logged_in,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
count,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
srv_count,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
serror_rate,False,False,False,False,False,False,True,False,False,True,True,False,False,False,False,False,False,True,True,False,False
srv_serror_rate,False,False,False,False,False,True,False,False,False,True,True,False,False,False,False,False,False,True,True,False,False
rerror_rate,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,True,True
srv_rerror_rate,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,True,True
same_srv_rate,False,False,False,False,False,True,True,False,False,False,True,False,True,True,True,False,False,True,True,False,False
