In this notebook we will use Spark's machine learning library, MLlib, to build a Logistic Regression classifier for network attack detection. We will use the complete KDD Cup 1999 dataset in order to test Spark's capabilities with large datasets.

Additionally, we will introduce two ways of performing model selection, that is, of choosing which predictors to keep: by using a correlation matrix and by using hypothesis testing.

In [1]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print("Train data size is {}".format(raw_data.count()))

Train data size is 4898431


In [2]:
test_data_file = './corrected.gz'
test_raw_data = sc.textFile(test_data_file)
print("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


### Labeled Points
A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms, and the label is stored as a double. For binary classification, a label should be either 0 (negative) or 1 (positive).

### Preparing the training data
In our case, we are interested in detecting network attacks in general; we don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as either non-attack (i.e. the 'normal' tag) or attack (i.e. any tag other than 'normal').

In [3]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

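def parse_interaction(line):
    line_split = line.split(',')
    # keep column 0 and columns 4-40; columns 1-3 (protocol_type, service, flag) are non-numeric
    clean_line_split = line_split[0:1] + line_split[4:41]
    # label: 0.0 for 'normal.' interactions, 1.0 for any kind of attack
    attack = 1.0
    if line_split[41] == 'normal.':
        attack = 0.0
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))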

training_data = raw_data.map(parse_interaction)

In [11]:
training_data.take(1)[0].label

0.0
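
As a quick sanity check that needs no Spark cluster, the column selection can be exercised on a single made-up record. The sketch below is an illustration only: `split_record` is a hypothetical plain-Python stand-in for `parse_interaction` that returns a `(label, features)` tuple instead of a `LabeledPoint`, and the sample line is synthetic, not taken from the dataset.

```python
def split_record(line):
    """Plain-Python stand-in for parse_interaction (hypothetical helper)."""
    fields = line.split(',')
    # same slicing as parse_interaction: keep column 0 and columns 4..40
    numeric = fields[0:1] + fields[4:41]
    label = 0.0 if fields[41] == 'normal.' else 1.0
    return label, [float(x) for x in numeric]

# synthetic 42-field record: 41 feature columns plus the tag (not real data)
sample = ','.join(['0', 'tcp', 'http', 'SF'] + ['0'] * 37 + ['normal.'])

label, features = split_record(sample)
print(label, len(features))  # 0.0 38
```

Dropping the three non-numeric columns leaves 38 of the 41 feature columns, which is the length of the feature vector the classifier is trained on.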

### Preparing test data

In [5]:
test_data = test_raw_data.map(parse_interaction)

### Detecting network attacks using Logistic Regression
Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.

In [6]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from timeit import default_timer as timer

# build the model
T0 = timer()
mlr = LogisticRegressionWithLBFGS.train(training_data)
T1 = timer() - T0

print('Classifier trained in {} seconds'.format(round(T1,3)))

Classifier trained in 1192.259 seconds
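
For intuition about what a trained model does at prediction time, here is a minimal plain-Python sketch of a binary logistic classifier's decision rule: a weighted sum of the features is passed through the logistic function and compared with a threshold. The weights, intercept, and inputs are made-up values, `predict_one` is a hypothetical helper, and the 0.5 threshold is an assumption; this is not MLlib's actual code.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict_one(weights, intercept, features, threshold=0.5):
    """Return 1 (attack) if the estimated probability exceeds the threshold, else 0."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1 if sigmoid(margin) > threshold else 0

# made-up two-feature model, for illustration only
weights, intercept = [1.0, -2.0], 0.5
print(predict_one(weights, intercept, [1.0, 0.0]))  # margin 1.5 -> 1
print(predict_one(weights, intercept, [0.0, 1.0]))  # margin -1.5 -> 0
```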


In [12]:
training_data.filter(lambda x: x.label == 1.0).count()

3925650

In [13]:
training_data.filter(lambda x: x.label == 0.0).count()

972781

In [14]:
3925650/972781

4.035492058335843
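
These counts give a useful yardstick: attacks outnumber normal interactions by about four to one, so a classifier that always answers "attack" would already be right on roughly 80% of the training set. The arithmetic below uses only the counts printed above. The test set may have a different class balance, so treat this as a reference point for the training data only.

```python
attacks = 3925650   # training points with label 1.0
normals = 972781    # training points with label 0.0
total = attacks + normals

# accuracy of a trivial classifier that labels every training point as an attack
majority_baseline = attacks / float(total)
print(total)                         # 4898431
print(round(majority_baseline, 4))   # 0.8014
```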

### Evaluating the model on new data
In order to measure the classification error on our test data, we map over the test_data RDD and pair each point's true label with the class that the model predicts for it.

In [15]:
labels_and_preds = test_data.map(lambda p:(p.label,mlr.predict(p.features)))

In [17]:
labels_and_preds.take(1)

[(0.0, 0)]

In [18]:
t0 = timer()
test_accuracy = labels_and_preds.filter(lambda x:x[0] == x[1]).count() / float(test_data.count())
tt = timer() - t0

print("Prediction made in {:.3f} seconds. Test accuracy is {:.4f}".format(tt,test_accuracy))

Prediction made in 8.274 seconds. Test accuracy is 0.8626
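
Because the classes are imbalanced, accuracy alone does not say which kind of error the model makes. One way to break it down is to count each (label, prediction) combination. The sketch below does this in plain Python on a small made-up list of pairs; on the real `labels_and_preds` RDD, the RDD method `countByValue()` should return the same kind of tally.

```python
from collections import Counter

# made-up (label, prediction) pairs, for illustration only
pairs = [(0.0, 0), (1.0, 1), (1.0, 0), (1.0, 1), (0.0, 1), (1.0, 1)]

counts = Counter(pairs)
tp = counts[(1.0, 1)]  # attacks predicted as attacks
tn = counts[(0.0, 0)]  # normal traffic predicted as normal
fp = counts[(0.0, 1)]  # normal traffic flagged as attack
fn = counts[(1.0, 0)]  # attacks that were missed

accuracy = (tp + tn) / float(len(pairs))
recall = tp / float(tp + fn)   # share of attacks that were detected
print(tp, tn, fp, fn)          # 3 1 1 1
print(round(accuracy, 4), round(recall, 4))  # 0.6667 0.75
```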


### Model selection

#### Using a correlation matrix

In [22]:
def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

In [26]:
# Build the model
t0 = timer()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = timer() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 1180.0 seconds


In [28]:
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = timer()
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(corr_reduced_test_data.count())
tt = timer() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 8.137 seconds. Test accuracy is 0.8134
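
The cells above do not show how the left-out columns were picked. A common approach, sketched here under assumptions, is to compute the Pearson correlation between every pair of columns and drop one column from each pair whose absolute correlation exceeds a cutoff. The 0.8 cutoff, the helper names, and the tiny data set are all hypothetical; on an RDD of feature vectors, `Statistics.corr` from `pyspark.mllib.stat` can produce the full correlation matrix.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (assumes neither is constant)."""
    n = float(len(xs))
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

def redundant_columns(columns, cutoff=0.8):
    """Return indices of columns highly correlated with an earlier kept column."""
    dropped = set()
    for j in range(len(columns)):
        for i in range(j):
            if i not in dropped and abs(pearson(columns[i], columns[j])) > cutoff:
                dropped.add(j)
                break
    return sorted(dropped)

# made-up columns: column 1 is exactly 2x column 0, column 2 is uncorrelated with it
columns = [
    [1.0, 2.0, 3.0, 4.0],
    [2.0, 4.0, 6.0, 8.0],
    [1.0, -1.0, -1.0, 1.0],
]
print(redundant_columns(columns))  # [1]
```

Removing correlated predictors reduces redundancy but can also discard signal, which is one possible reading of the lower test accuracy obtained with the reduced feature set.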


#### Using hypothesis testing

In [27]:
feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_host_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

In [29]:
def parse_interaction_categorical(line):
    line_split = line.split(",")
    # keep columns 6-40, the 35 predictors listed in feature_names
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))

training_data_categorical = raw_data.map(parse_interaction_categorical)

In [31]:
from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)

In [38]:
import pandas as pd
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])

chi_df

| Feature | Statistic | p-value |
|---|---|---|
| land | 0.4649835 | 0.4953041 |
| wrong_fragment | 306.8555 | 0.0 |
| urgent | 38.71844 | 2.705761e-07 |
| hot | 19463.31 | 0.0 |
| num_failed_logins | 127.7691 | 0.0 |
| logged_in | 3273098.0 | 0.0 |
| num_compromised | 2011.863 | 0.0 |
| root_shell | 1044.918 | 0.0 |
| su_attempted | 434.0 | 0.0 |
| num_root | 22871.68 | 0.0 |


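A high p-value means the test finds no evidence that a predictor and the label are dependent. On that basis we conclude that the predictors land and num_outbound_cmds could be removed from our model without affecting our accuracy dramatically. Let's try this.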
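
To make the test concrete: for each feature, Pearson's chi-squared test compares the observed counts of (feature value, label) combinations with the counts expected if the two were independent. The sketch below computes the statistic for a made-up 2x2 table and then, assuming `land` is a binary feature so that the test has one degree of freedom, recomputes the p-value from the statistic reported for `land` in the table above. The function names are hypothetical, and the closed-form p-value holds only for one degree of freedom.

```python
import math

def chi_squared(observed):
    """Pearson chi-squared statistic for a contingency table (list of rows)."""
    row_totals = [sum(row) for row in observed]
    col_totals = [sum(col) for col in zip(*observed)]
    total = float(sum(row_totals))
    stat = 0.0
    for i, row in enumerate(observed):
        for j, obs in enumerate(row):
            expected = row_totals[i] * col_totals[j] / total
            stat += (obs - expected) ** 2 / expected
    return stat

def p_value_1dof(stat):
    """Upper-tail probability of a chi-squared variable with 1 degree of freedom."""
    return math.erfc(math.sqrt(stat / 2.0))

# made-up tables: rows = feature value (0/1), columns = label (normal/attack)
print(round(chi_squared([[10, 20], [20, 10]]), 4))  # 6.6667
print(chi_squared([[10, 20], [20, 40]]))            # 0.0 (rows are proportional)

# statistic reported for `land` in the table above
print(round(p_value_1dof(0.4649835), 4))            # 0.4953
```

The recomputed value agrees with the p-value listed for `land`, which is consistent with the one-degree-of-freedom assumption.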

#### Evaluating the new model
So the only modification to our first parse_interaction function will be to remove columns 6 and 19, which correspond to the two predictors that we want to exclude from our model.

In [39]:
def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))

training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)

In [40]:
# Build the model
t0 = timer()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = timer() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 1189.98 seconds


In [41]:
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = timer()
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data_chi.count())
tt = timer() - t0

print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Prediction made in 8.284 seconds. Test accuracy is 0.872
