# KDD Cup 99 - PySpark

This is my attempt at the *KDD Cup 1999* using Python, Scikit-learn, and Spark.  
The dataset for this data mining competition can be found [here](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html).

## Task description summary

You can find the complete description of the task [here](http://kdd.ics.uci.edu/databases/kddcup99/task.html).  

Software to detect network intrusions protects a computer network from unauthorized users, including perhaps insiders.  The intrusion detector learning task is to build a predictive model (i.e. a classifier) capable of distinguishing between *bad connections*, called intrusions or attacks, and *good normal connections*.  

A connection is a sequence of TCP packets starting and ending at some well defined times, between which data flows to and from a source IP address to a target IP address under some well defined protocol.  Each connection is labeled as either normal, or as an attack, with exactly one specific attack type.  Each connection record consists of about 100 bytes.  

Attacks fall into four main categories:  

- DOS: denial-of-service, e.g. syn flood;  
- R2L: unauthorized access from a remote machine, e.g. guessing password;  
- U2R: unauthorized access to local superuser (root) privileges, e.g., various "buffer overflow" attacks;  
- probing: surveillance and other probing, e.g., port scanning.  

It is important to note that the test data is not from the same probability distribution as the training data, and it includes specific attack types not in the training data. This makes the task more realistic. The datasets contain a total of 24 training attack types, with an additional 14 types in the test data only.    

Some intrusion experts believe that most novel attacks are variants of known attacks and the "signature" of known attacks can be sufficient to catch novel variants. Based on this idea, we will experiment with different machine learning approaches.  

## Approach

We will start by working on a reduced dataset (the 10 percent dataset provided).  

There we will do some exploratory data analysis using `Pandas`. Then we will build a classifier using `Scikit-learn`. Our classifier will only classify entries as `normal` or `attack`. By doing so, we hope the model will generalise to new attack types.    

However, in our final approach we want to use clustering and anomaly detection. We want our model to work well with unknown attack types and also to give an approximation of the closest known attack type. Initially we will do clustering using `Scikit-learn` again and see if we can beat our previous classifier.  

Finally, we will use `Spark` to implement the clustering approach on the complete dataset containing around 5 million interactions.  

## Loading the data

```python
import pandas
import os
from time import time
col_names = ["duration","protocol_type","service","flag","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","label"]
# kdd_data_10percent = pandas.read_csv("/nfs/data/KDD99/kddcup.data_10_percent", header=None, names = col_names)
cwd = os.getcwd()
data_file = os.path.join(cwd, "data/kddcup.data_10_percent_corrected")
kdd_data_10percent = pandas.read_csv(data_file, header=None, names = col_names)
kdd_data_10percent.describe()
# kdd_data_10percent.head()
```

| | duration | src_bytes | dst_bytes | land | wrong_fragment | urgent | hot | num_failed_logins | logged_in | num_compromised | ... | dst_host_count | dst_host_srv_count | dst_host_same_srv_rate | dst_host_diff_srv_rate | dst_host_same_src_port_rate | dst_host_srv_diff_host_rate | dst_host_serror_rate | dst_host_srv_serror_rate | dst_host_rerror_rate | dst_host_srv_rerror_rate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | ... | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 |
| mean | 47.979302 | 3025.61 | 868.5324 | 4.5e-05 | 0.006433 | 1.4e-05 | 0.034519 | 0.000152 | 0.148247 | 0.010212 | ... | 232.470778 | 188.66567 | 0.75378 | 0.030906 | 0.601935 | 0.006684 | 0.176754 | 0.176443 | 0.058118 | 0.057412 |
| std | 707.746472 | 988218.1 | 33040.0 | 0.006673 | 0.134805 | 0.00551 | 0.782103 | 0.01552 | 0.355345 | 1.798326 | ... | 64.74538 | 106.040437 | 0.410781 | 0.109259 | 0.481309 | 0.042133 | 0.380593 | 0.380919 | 0.23059 | 0.23014 |
| min | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 0.0 | 45.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 46.0 | 0.41 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 50% | 0.0 | 520.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 255.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 75% | 0.0 | 1032.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 255.0 | 1.0 | 0.04 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| max | 58329.0 | 693375600.0 | 5155468.0 | 1.0 | 3.0 | 3.0 | 30.0 | 5.0 | 1.0 | 884.0 | ... | 255.0 | 255.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |


Now we have our data loaded into a `Pandas` data frame. In order to get familiar with our data, let's have a look at how the labels are distributed.  

```python
kdd_data_10percent['label'].value_counts()
```

```
smurf.              280790
neptune.            107201
normal.              97278
back.                 2203
satan.                1589
ipsweep.              1247
portsweep.            1040
warezclient.          1020
teardrop.              979
pod.                   264
nmap.                  231
guess_passwd.           53
buffer_overflow.        30
land.                   21
warezmaster.            20
imap.                   12
rootkit.                10
loadmodule.              9
ftp_write.               8
multihop.                7
phf.                     4
perl.                    3
spy.                     2
Name: label, dtype: int64
```

## Feature selection

Initially, we will use all the numeric features. The categorical variables (`protocol_type`, `service` and `flag`) need special treatment, so for now we will not include them in the training features.  

```python
num_features = [
    "duration","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate"
]
features = kdd_data_10percent[num_features].astype(float)
features.describe()
```

| | duration | src_bytes | dst_bytes | land | wrong_fragment | urgent | hot | num_failed_logins | logged_in | num_compromised | ... | dst_host_count | dst_host_srv_count | dst_host_same_srv_rate | dst_host_diff_srv_rate | dst_host_same_src_port_rate | dst_host_srv_diff_host_rate | dst_host_serror_rate | dst_host_srv_serror_rate | dst_host_rerror_rate | dst_host_srv_rerror_rate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | ... | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 |
| mean | 47.979302 | 3025.61 | 868.5324 | 4.5e-05 | 0.006433 | 1.4e-05 | 0.034519 | 0.000152 | 0.148247 | 0.010212 | ... | 232.470778 | 188.66567 | 0.75378 | 0.030906 | 0.601935 | 0.006684 | 0.176754 | 0.176443 | 0.058118 | 0.057412 |
| std | 707.746472 | 988218.1 | 33040.0 | 0.006673 | 0.134805 | 0.00551 | 0.782103 | 0.01552 | 0.355345 | 1.798326 | ... | 64.74538 | 106.040437 | 0.410781 | 0.109259 | 0.481309 | 0.042133 | 0.380593 | 0.380919 | 0.23059 | 0.23014 |
| min | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 0.0 | 45.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 46.0 | 0.41 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 50% | 0.0 | 520.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 255.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 75% | 0.0 | 1032.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 255.0 | 255.0 | 1.0 | 0.04 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| max | 58329.0 | 693375600.0 | 5155468.0 | 1.0 | 3.0 | 3.0 | 30.0 | 5.0 | 1.0 | 884.0 | ... | 255.0 | 255.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |


As we mentioned, we are going to reduce the outputs to `normal` and `attack`. 

```python
from sklearn.neighbors import KNeighborsClassifier
labels = kdd_data_10percent['label'].copy()
labels[labels!='normal.'] = 'attack.'
labels.value_counts()
```

```
attack.    396743
normal.     97278
Name: label, dtype: int64
```

## Feature scaling

We are going to use several distance-based methods here. To prevent features with large ranges (such as `src_bytes`, with a maximum above 693 million) from dominating the distances, we need to scale all of them.

```python
from sklearn.preprocessing import MinMaxScaler
# features.apply(lambda x: MinMaxScaler().fit_transform(x))
features = MinMaxScaler().fit_transform(features)
features = pandas.DataFrame(features, columns=num_features)
features.describe()
```

| | duration | src_bytes | dst_bytes | land | wrong_fragment | urgent | hot | num_failed_logins | logged_in | num_compromised | ... | dst_host_count | dst_host_srv_count | dst_host_same_srv_rate | dst_host_diff_srv_rate | dst_host_same_src_port_rate | dst_host_srv_diff_host_rate | dst_host_serror_rate | dst_host_srv_serror_rate | dst_host_rerror_rate | dst_host_srv_rerror_rate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | ... | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 | 494021.0 |
| mean | 0.000823 | 4.363595e-06 | 0.000168 | 4.5e-05 | 0.002144 | 5e-06 | 0.001151 | 3e-05 | 0.148247 | 1.2e-05 | ... | 0.91165 | 0.739865 | 0.75378 | 0.030906 | 0.601935 | 0.006684 | 0.176754 | 0.176443 | 0.058118 | 0.057412 |
| std | 0.012134 | 0.001425228 | 0.006409 | 0.006673 | 0.044935 | 0.001837 | 0.02607 | 0.003104 | 0.355345 | 0.002034 | ... | 0.253903 | 0.415845 | 0.410781 | 0.109259 | 0.481309 | 0.042133 | 0.380593 | 0.380919 | 0.23059 | 0.23014 |
| min | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 25% | 0.0 | 6.489989e-08 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 1.0 | 0.180392 | 0.41 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 50% | 0.0 | 7.499542e-07 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 1.0 | 1.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 75% | 0.0 | 1.488371e-06 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 1.0 | 1.0 | 1.0 | 0.04 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| max | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | ... | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |
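`MinMaxScaler` rescales each column independently as x' = (x - min) / (max - min). That is why every `min` above is 0.0 and every `max` is 1.0. The following is a minimal numpy sketch of the same computation, run on a made-up two-column array and checked against scikit-learn. The helper name `min_max_scale` is hypothetical.

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

def min_max_scale(X):
    """Scale each column of X to [0, 1] using that column's own min and max."""
    X = np.asarray(X, dtype=float)
    col_min = X.min(axis=0)
    col_range = X.max(axis=0) - col_min
    # A constant column has zero range: divide by 1 instead, so it maps to 0
    col_range[col_range == 0] = 1.0
    return (X - col_min) / col_range

# Made-up data: a byte-count-like column and a rate-like column
X = np.array([[0.0, 0.1], [500.0, 0.5], [1000.0, 1.0]])
print(min_max_scale(X))
print(np.allclose(min_max_scale(X), MinMaxScaler().fit_transform(X)))
```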


## Visualising data using Principal Components

By using Principal Component Analysis (PCA), we can reduce the dimensionality of our data and plot it in a two-dimensional space. PCA finds the orthogonal directions of maximum variance, so projecting onto the first components keeps as much of the variance as possible for that number of dimensions.  

```python
# TODO
```
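One way to fill in this TODO is sketched below with scikit-learn's `PCA`. The helper name `project_2d` and the random stand-in data are assumptions; the notebook itself has no PCA code yet.

```python
import numpy as np
from sklearn.decomposition import PCA

def project_2d(X, seed=0):
    """Project the rows of X onto their first two principal components.

    Returns the 2-D coordinates and the fraction of variance each component explains.
    """
    pca = PCA(n_components=2, random_state=seed)
    coords = pca.fit_transform(X)
    return coords, pca.explained_variance_ratio_

# Stand-in for the scaled `features` frame: random data with one dominant direction
rng = np.random.default_rng(0)
base = rng.normal(size=(200, 1))
X_demo = np.hstack([base * 5,
                    base * 3 + rng.normal(scale=0.1, size=(200, 1)),
                    rng.normal(scale=0.5, size=(200, 3))])
coords, ratio = project_2d(X_demo)
print(coords.shape, ratio)
```

On the notebook data, the call would be `coords, ratio = project_2d(features)`. You could then draw a scatter plot of `coords[:, 0]` against `coords[:, 1]`, coloured by `labels`, for example with matplotlib.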

## Training a classifier

Following the idea that new attack types will be similar to known types, let's start by trying a k-nearest neighbours classifier. We must avoid brute-force comparisons in the N×d space at all costs. Here N, the number of samples in our data, is above 400K, and d, the number of features, is 38. With brute force, every prediction would be compared against all N training points, which makes the process unfeasible. For this reason we pass `algorithm = 'ball_tree'`. For more on kNN performance, check [here](http://scikit-learn.org/stable/modules/neighbors.html#choice-of-nearest-neighbors-algorithm).      
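To put a number on "unfeasible", here is a rough count of brute-force work. It assumes one multiply-add per feature per training point for each query. The sizes come from this notebook: 494,021 training rows, 38 features, and the 31,103-row test sample drawn below (10 percent of the 311,029 test rows). The helper name `brute_force_ops` is hypothetical.

```python
def brute_force_ops(n_train, n_features, n_queries):
    """Approximate multiply-adds for brute-force kNN: every query against every training row."""
    per_query = n_train * n_features
    return per_query, per_query * n_queries

per_query, total = brute_force_ops(494021, 38, 31103)
print("{:,} operations per query, about {:.2e} in total".format(per_query, total))
```

A ball tree tries to avoid most of these comparisons by pruning whole groups of training points whose bounding ball cannot contain a closer neighbour.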

```python
clf = KNeighborsClassifier(n_neighbors = 5, algorithm = 'ball_tree', leaf_size=500)
t0 = time()
clf.fit(features,labels)
tt = time()-t0
print("Classifier trained in %.3f seconds" % tt)
```

```
Classifier trained in 3470.312 seconds
```


Now let's try the classifier on the test data. First we need to load the labeled test data. We will also sample 10 percent of the entries. For that, we will take advantage of the `train_test_split` function in `sklearn`.       

```python
test_data_file = os.path.join(cwd, "data/corrected")
kdd_data_corrected = pandas.read_csv(test_data_file, header=None, names = col_names)
kdd_data_corrected['label'].value_counts()
```

```
smurf.              164091
normal.              60593
neptune.             58001
snmpgetattack.        7741
mailbomb.             5000
guess_passwd.         4367
snmpguess.            2406
satan.                1633
warezmaster.          1602
back.                 1098
mscan.                1053
apache2.               794
processtable.          759
saint.                 736
portsweep.             354
ipsweep.               306
httptunnel.            158
pod.                    87
nmap.                   84
buffer_overflow.        22
multihop.               18
named.                  17
sendmail.               17
ps.                     16
xterm.                  13
rootkit.                13
teardrop.               12
xlock.                   9
land.                    9
xsnoop.                  4
ftp_write.               3
worm.                    2
perl.                    2
loadmodule.              2
phf.                     2
sqlattack.               2
udpstorm.                2
i
```

We can see that there are new attack labels. In any case, we will convert all of them to the `attack.` label.    

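```python
# Use .loc to set the values in place; chained indexing triggers pandas' SettingWithCopyWarning
kdd_data_corrected.loc[kdd_data_corrected['label'] != 'normal.', 'label'] = 'attack.'
kdd_data_corrected['label'].value_counts()
```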

```
attack.    250436
normal.     60593
Name: label, dtype: int64
```

Again, we select the numeric features and scale them. Note that a new `MinMaxScaler` is fitted here on the test data itself.  

```python
# train_test_split lives in sklearn.model_selection (sklearn.cross_validation was removed in scikit-learn 0.20)
from sklearn.model_selection import train_test_split
kdd_data_corrected[num_features] = kdd_data_corrected[num_features].astype(float)
kdd_data_corrected[num_features] = MinMaxScaler().fit_transform(kdd_data_corrected[num_features])
# kdd_data_corrected[num_features].apply(lambda x: MinMaxScaler().fit_transform(x))
```
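Because a separate scaler is fitted on the test data, the same raw value can map to different scaled values in training and test. A common alternative is to fit the scaler once on the training features and reuse it with `transform`. It is sketched below on made-up frames, with a hypothetical helper name. Switching to it would change the numbers reported further down, so treat it as an option rather than what was run here.

```python
import pandas
from sklearn.preprocessing import MinMaxScaler

def scale_train_test(train, test, columns):
    """Fit MinMaxScaler on the training columns only, then apply it to both frames."""
    scaler = MinMaxScaler().fit(train[columns].astype(float))
    train_scaled = pandas.DataFrame(scaler.transform(train[columns].astype(float)),
                                    columns=columns, index=train.index)
    test_scaled = pandas.DataFrame(scaler.transform(test[columns].astype(float)),
                                   columns=columns, index=test.index)
    return train_scaled, test_scaled

# Made-up example: the test frame contains a value above the training maximum
train = pandas.DataFrame({"src_bytes": [0, 50, 100]})
test = pandas.DataFrame({"src_bytes": [50, 200]})
train_s, test_s = scale_train_test(train, test, ["src_bytes"])
print(test_s)  # values outside the training range fall outside [0, 1]
```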

Now we can sample 10 percent of the (already scaled) test data. Although `train_test_split` also returns a training split, we don't need it in this case.  

```python
features_train, features_test, labels_train, labels_test = train_test_split(
    kdd_data_corrected[num_features], 
    kdd_data_corrected['label'], 
    test_size=0.1, 
    random_state=42)
```

Now we make predictions using our classifier and the test data. kNN classifiers are slow to predict compared to other methods, because of all the distance comparisons each prediction requires.  

```python
t0 = time()
pred = clf.predict(features_test)
tt = time() - t0
print("Predicted in %.3f seconds" % tt)
```

```
Predicted in 608.156 seconds
```


That took a lot of time. In fact, the more training data we use with a kNN classifier, the slower it gets to predict, since each new point has to be compared with the stored training points. We definitely want a centroid-based classifier if we plan to use it for real-time detection.  

And finally, calculate the accuracy against the test labels.  

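```python
from sklearn.metrics import accuracy_score
# accuracy_score expects (y_true, y_pred); this is the fraction of correct predictions, not R squared
acc = accuracy_score(labels_test, pred)
print("Accuracy is %.4f." % acc)
```

```
Accuracy is 0.9200.
```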
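Accuracy alone does not show which kind of mistake the classifier makes. That matters here because attacks outnumber normal connections roughly four to one in the test data (250,436 against 60,593). The following is a minimal sketch using scikit-learn's `confusion_matrix`, shown on made-up labels. The helper name `binary_report` is hypothetical; on the notebook data the call would be `binary_report(labels_test, pred)`.

```python
from sklearn.metrics import accuracy_score, confusion_matrix

def binary_report(y_true, y_pred, positive="attack.", negative="normal."):
    """Return accuracy, the confusion matrix, and the detection and false-alarm rates."""
    # Rows are true labels, columns are predicted labels, both in [positive, negative] order
    cm = confusion_matrix(y_true, y_pred, labels=[positive, negative])
    (tp, fn), (fp, tn) = cm
    detection_rate = tp / float(tp + fn)    # share of attacks correctly flagged
    false_alarm_rate = fp / float(fp + tn)  # share of normal connections flagged as attacks
    return accuracy_score(y_true, y_pred), cm, detection_rate, false_alarm_rate

# Made-up labels, only to show the output format
y_true = ["attack.", "attack.", "attack.", "normal.", "normal."]
y_pred = ["attack.", "attack.", "normal.", "normal.", "attack."]
acc, cm, dr, far = binary_report(y_true, y_pred)
print(acc, dr, far)
print(cm)
```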


## Clustering

So finally, let's try our anomaly detection approach on the reduced dataset. We will start by doing **k-means clustering**. Once we have the cluster centers, we will use them to determine the labels of the test data (treated as unlabeled).  

Based on the assumption that new attack types will resemble old ones, we hope to be able to detect them too. Moreover, anything that falls too far from every cluster will be considered anomalous and therefore a possible attack.  
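The "too far from every cluster" rule can be sketched with `KMeans.transform`, which returns each point's distance to every cluster center. Two parts of this sketch are assumptions for illustration: the helper name `flag_anomalies` and the threshold, which is the 99th percentile of nearest-centroid distances on the training data. The notebook does not fix a threshold.

```python
import numpy as np
from sklearn.cluster import KMeans

def flag_anomalies(km, X_train, X_new, percentile=99.0):
    """Flag rows of X_new whose nearest-centroid distance exceeds a training-based threshold.

    The threshold is the given percentile of nearest-centroid distances on X_train
    (a hypothetical choice; any other cut-off could be used).
    """
    train_dist = km.transform(X_train).min(axis=1)
    threshold = np.percentile(train_dist, percentile)
    new_dist = km.transform(X_new).min(axis=1)
    return new_dist > threshold, threshold

# Made-up data: two tight blobs for training, then one nearby point and one distant point
rng = np.random.default_rng(0)
X_train = np.vstack([rng.normal(0.0, 0.05, size=(100, 2)),
                     rng.normal(1.0, 0.05, size=(100, 2))])
km_demo = KMeans(n_clusters=2, n_init=10, random_state=0).fit(X_train)
flags, threshold = flag_anomalies(km_demo, X_train, np.array([[0.01, 0.0], [5.0, 5.0]]))
print(flags, threshold)
```

With the notebook's objects, this would be `flag_anomalies(km, features, kdd_data_corrected[num_features])`.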

### KMeans clustering

```python
from sklearn.cluster import KMeans
k = 30
km = KMeans(n_clusters = k)
```

```python
t0 = time()
km.fit(features)
tt = time()-t0
print("Clustered in {} seconds".format(round(tt,3)))
```

```
Clustered in 331.394 seconds
```


Now we can check cluster sizes.  

```python
pandas.Series(km.labels_).value_counts()
```

```
0     262807
1      48555
15     38319
5      24427
3      20528
12     19508
28     17879
8      11524
26      9162
7       4941
17      4272
10      4215
25      3996
13      3513
2       2845
29      1951
11      1686
4       1640
24      1557
9       1341
14      1239
16      1230
6       1201
21      1116
18      1020
19       970
23       949
27       775
20       680
22       175
dtype: int64
```

Get the labels for each cluster. Here we go back to using the complete set of labels.    

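```python
labels = kdd_data_10percent['label']
# One Series of original labels per cluster, selected with a boolean mask.
# A list (rather than map()) keeps label_names indexable in Python 3.
label_names = [pandas.Series(labels[km.labels_ == x].values) for x in range(k)]
```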

Print labels for each cluster.  

```python
for i in range(k):
    print("Cluster {} labels:".format(i))
    print(label_names[i].value_counts())
    print("")
```

```
Cluster 0 labels:
smurf.     262805
normal.         2
dtype: int64

Cluster 1 labels:
neptune.      48551
portsweep.        4
dtype: int64

Cluster 2 labels:
normal.    2845
dtype: int64

Cluster 3 labels:
neptune.      20456
portsweep.       58
satan.           10
normal.           4
dtype: int64

Cluster 4 labels:
normal.      1510
pod.           60
smurf.         36
satan.         22
teardrop.       7
rootkit.        3
spy.            1
nmap.           1
dtype: int64

Cluster 5 labels:
normal.    23165
back.       1258
phf.           3
satan.         1
dtype: int64

Cluster 6 labels:
normal.    1201
dtype: int64

Cluster 7 labels:
normal.         4873
warezclient.      52
rootkit.           4
satan.             4
perl.              3
ipsweep.           2
loadmodule.        1
spy.               1
imap.              1
dtype: int64

Cluster 8 labels:
normal.    11424
smurf.        98
nmap.          2
dtype: int64

Cluster 9 labels:
normal.       1230
satan.         109
teardrop.       
```

### Cluster description  

We can see that, in most clusters, there is a dominant label. It would be interesting to go cluster by cluster and analyse the majority labels, or how labels are split between different clusters (some with more dominance than others). All that would help us understand each type of attack! This is also a benefit of using a clustering-based approach.  

#### TODO:
- Get dominant labels
- Analyse cluster centers, especially for heterogeneous clusters containing `normal`. This may reveal conflicting interactions.  
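The first TODO item could start from a per-cluster summary built with `pandas.crosstab`. It gives each cluster's dominant label, its purity (the share of the dominant label) and its size. This is a sketch under the assumption that purity is a useful way to spot heterogeneous clusters; the helper name `cluster_summary` is hypothetical.

```python
import numpy as np
import pandas

def cluster_summary(cluster_ids, labels):
    """Return the dominant label, purity and size of each cluster, largest first."""
    df = pandas.DataFrame({"cluster": np.asarray(cluster_ids),
                           "label": np.asarray(labels)})
    counts = pandas.crosstab(df["cluster"], df["label"])  # clusters x labels
    sizes = counts.sum(axis=1)
    summary = pandas.DataFrame({
        "dominant_label": counts.idxmax(axis=1),
        "purity": counts.max(axis=1) / sizes,
        "size": sizes,
    })
    return summary.sort_values("size", ascending=False)

# Made-up assignments; on the notebook data: cluster_summary(km.labels_, kdd_data_10percent['label'])
summary = cluster_summary([0, 0, 0, 1, 1], ["smurf.", "smurf.", "normal.", "normal.", "normal."])
print(summary)
```

Clusters with low purity that contain `normal.` are natural candidates for the second TODO item.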

### Predictions

We can now predict using our test data.  

```python
t0 = time()
pred = km.predict(kdd_data_corrected[num_features])
tt = time() - t0
print("Assigned clusters in {} seconds".format(round(tt,3)))
```

```
Assigned clusters in 0.693 seconds
```


We can see that the assignment process is much faster than prediction with our kNN classifier: 0.693 seconds for the whole test set, against 608.156 seconds for a 10 percent sample. But we still need to assign labels.  

```python
# TODO: get majority label for each cluster assignment (we have labels from the previous step)
```

```python
# TODO: check these labels with those in the corrected test data in order to calculate accuracy
```
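A possible way to fill in both TODOs is sketched below. First, map each cluster to the most frequent training label among its points. Then reduce that label to `normal.`/`attack.`, because the test labels were already reduced that way. The helper names and the fallback for an unseen cluster id are assumptions.

```python
import numpy as np
import pandas

def cluster_majority_map(cluster_ids, labels):
    """Map each cluster id to the most frequent label among its training points."""
    df = pandas.DataFrame({"cluster": np.asarray(cluster_ids),
                           "label": np.asarray(labels)})
    majority = df.groupby("cluster")["label"].agg(lambda s: s.value_counts().idxmax())
    return majority.to_dict()

def predict_binary(cluster_map, test_clusters, default="attack."):
    """Turn test cluster assignments into 'normal.' / 'attack.' labels.

    A cluster whose majority label is any attack type becomes 'attack.'; a cluster id
    missing from cluster_map falls back to `default` (a hypothetical choice).
    """
    return ["normal." if cluster_map.get(c, default) == "normal." else "attack."
            for c in test_clusters]

# On the notebook data (kdd_data_corrected['label'] was already reduced to attack./normal.):
# cluster_map = cluster_majority_map(km.labels_, kdd_data_10percent['label'])
# pred_labels = predict_binary(cluster_map, pred)
# accuracy_score(kdd_data_corrected['label'], pred_labels)
cmap = cluster_majority_map([0, 0, 1, 1, 1], ["smurf.", "smurf.", "normal.", "normal.", "back."])
print(cmap, predict_binary(cmap, [0, 1, 2]))
```

Note that `pred` covers the whole test set, not the 10 percent sample used for kNN, so the two accuracies are not computed on exactly the same rows.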

## Using the complete dataset with Spark

The script [KDDCup99.py](KDDCup99.py) runs through a series of steps to perform k-means clustering over the complete dataset using `PySpark` with different K values in order to find the best one.  

The clustering results are stored in a `CSV` file. This file is very convenient for visualisation purposes. It would be very hard to cluster and visualise results of the complete dataset using `Scikit-learn`.  
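The script's criterion for picking the best K is not reproduced in this notebook. As a local scikit-learn analogue of such a scan, you can record the within-cluster sum of squared distances (`inertia_`) for each K and look for the point where adding clusters stops paying off. In `pyspark.mllib`, `KMeansModel.computeCost` reports the same kind of cost. The helper name and the K values below are assumptions.

```python
import numpy as np
from sklearn.cluster import KMeans

def kmeans_costs(X, k_values, seed=0):
    """Return {k: within-cluster sum of squared distances} for each k (hypothetical helper)."""
    costs = {}
    for k in k_values:
        model = KMeans(n_clusters=k, n_init=10, random_state=seed).fit(X)
        costs[k] = model.inertia_
    return costs

# Made-up data with three well-separated groups
rng = np.random.default_rng(0)
X = np.vstack([rng.normal(c, 0.1, size=(50, 2)) for c in (0.0, 3.0, 6.0)])
costs = kmeans_costs(X, [1, 2, 3, 4])
for k, cost in sorted(costs.items()):
    print(k, round(cost, 2))
```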

The following chart depicts the **first two principal components** for the clustering results.  

![](clusters.png)

Remember that we have 23 different labels (22 attack types plus `normal`) in our complete dataset. However, we have generated up to 80 different clusters. As a result, some of the clusters appear very close in the first principal component. This is likely due to the variability of interactions within a given type of attack (or label).   

```python
# TODO: follow the same approach for label assignment in the test data as before
```

### Clustering using Spark

In order to show how we use `Spark` to do k-means clustering on our dataset, let's perform a single clustering run here with the complete dataset, for a K value of 80, which turned out to work particularly well.

```python
# Some imports we will use
from collections import OrderedDict
from time import time
```

First we need to load the data, using the complete dataset file stored in NFS.

```python
# `sc` is the SparkContext provided by the PySpark shell or notebook
data_file = "/nfs/data/KDD99/kddcup.data"
raw_data = sc.textFile(data_file)
```

As a warm up, let's count the number of interactions by label.

```python
# count by all different labels and print them decreasingly
print("Counting all different labels")
labels = raw_data.map(lambda line: line.strip().split(",")[-1])

t0 = time()
label_counts = labels.countByValue()
tt = time()-t0

sorted_labels = OrderedDict(sorted(label_counts.items(), key=lambda t: t[1], reverse=True))
for label, count in sorted_labels.items():
    print("{} {}".format(label, count))

print("Counted in {} seconds".format(round(tt,3)))
```

```
Counting all different labels
smurf. 2807886
neptune. 1072017
normal. 972781
satan. 15892
ipsweep. 12481
portsweep. 10413
nmap. 2316
back. 2203
warezclient. 1020
teardrop. 979
pod. 264
guess_passwd. 53
buffer_overflow. 30
land. 21
warezmaster. 20
imap. 12
rootkit. 10
loadmodule. 9
ftp_write. 8
multihop. 7
phf. 4
perl. 3
spy. 2
Counted in 9.12 seconds
```


Now we prepare the data for clustering input. The data contains non-numeric features, and we want to exclude them since k-means works only with numeric features. These are the second to fourth columns (`protocol_type`, `service` and `flag`) and the last column in each data row, which is the label.  
In order to do that, we define a function that we apply to the *RDD* as a `Spark` **transformation** by using `map`. Remember that we can apply as many transformations as we want without making `Spark` start any processing. Only when we trigger an action are all the transformations actually applied.    

```python
from numpy import array

def parse_interaction(line):
    """
    Parses a network data interaction into a (label, feature vector) pair,
    dropping the categorical columns (protocol_type, service, flag) and the label.
    """
    line_split = line.split(",")
    clean_line_split = [line_split[0]]+line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))

parsed_data = raw_data.map(parse_interaction)
parsed_data_values = parsed_data.values().cache()
```

Additionally, we have used `cache` in order to keep the results at hand once they are calculated. 

We will also scale our data, as we have done so far for distance-based methods. This time we standardise it (zero mean and unit variance per feature) instead of using min-max scaling.
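Standardisation maps each feature to z = (x - mean) / std. According to the Spark MLlib documentation, `StandardScaler` computes the unit std with the corrected (n - 1) sample standard deviation. The following is a numpy sketch of that computation on a made-up array, for intuition only. The helper name `standardise` and its handling of constant columns are assumptions.

```python
import numpy as np

def standardise(X):
    """Centre each column and divide by its corrected (ddof=1) sample standard deviation."""
    X = np.asarray(X, dtype=float)
    mean = X.mean(axis=0)
    std = X.std(axis=0, ddof=1)
    std[std == 0] = 1.0  # constant columns end up as all zeros instead of dividing by zero
    return (X - mean) / std

# Made-up data: one varying column and one constant column
X = np.array([[0.0, 10.0], [1.0, 10.0], [2.0, 10.0], [3.0, 10.0]])
Z = standardise(X)
print(Z)
```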

```python
from pyspark.mllib.feature import StandardScaler
# StandardScaler(withMean=True, withStd=True): centre each feature and scale it to unit variance
standardizer = StandardScaler(True, True)
t0 = time()
standardizer_model = standardizer.fit(parsed_data_values)
tt = time() - t0
standardized_data_values = standardizer_model.transform(parsed_data_values)
print("Data standardized in {} seconds".format(round(tt,3)))
```

```
Data standardized in 9.54 seconds
```


We can now perform k-means clustering.  

```python
from pyspark.mllib.clustering import KMeans
t0 = time()
# Note: `runs` is deprecated and has no effect from Spark 2.0 on
clusters = KMeans.train(standardized_data_values, 80, 
                        maxIterations=10, runs=5, 
                        initializationMode="random")
tt = time() - t0
print("Data clustered in {} seconds".format(round(tt,3)))
```

```
Data clustered in 137.496 seconds
```


Once we have our clusters, we can use them to label test data and test accuracy.

```python
# TODO
```
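The following is a sketch of one way to fill in this TODO, under stated assumptions:

1. Pair each label with its standardised vector.
2. Assign the vector to a cluster with `clusters.predict`.
3. Count (cluster, label) pairs with `countByValue`.
4. Keep the most frequent label per cluster.

Only the last step is shown as runnable plain Python. The helper name `majority_from_counts` is hypothetical. The Spark side stays in comments because it needs the cluster. It also assumes that `parsed_data` and `standardized_data_values` keep the same partitioning, so that `zip` lines them up.

```python
def majority_from_counts(pair_counts):
    """Map each cluster id to its most frequent label.

    `pair_counts` is a dict {(cluster_id, label): count}, such as the result of
    RDD.countByValue() on (cluster_id, label) pairs. Ties keep the first label seen.
    """
    best = {}
    for (cluster, label), count in pair_counts.items():
        if cluster not in best or count > best[cluster][1]:
            best[cluster] = (label, count)
    return {cluster: label for cluster, (label, _) in best.items()}

# Spark side (assumed, not run here):
# labelled = parsed_data.keys().zip(standardized_data_values)
# counts = labelled.map(lambda lv: (clusters.predict(lv[1]), lv[0])).countByValue()
# cluster_labels = majority_from_counts(counts)
counts = {(0, "smurf."): 10, (0, "normal."): 2, (1, "normal."): 5, (1, "back."): 1}
print(majority_from_counts(counts))
```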