# MLlib: Clustering

[Introduction to Spark with Python, by Jose A. Dianes](http://jadianes.github.io/spark-py-notebooks)

In this notebook we will use Spark's machine learning library [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html) to perform **K-means Clustering** over our network attacks datasets. We will use the complete [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) datasets in order to test Spark capabilities with large datasets. 

Once we have our data clustered, we will use our knowledge of Spark SQL and data frames from previous notebooks in order to explore each of the obtained clusters in terms of its features.  

Finally we will see how a set of clusters can be used to detect network attacks. We will use our test data as new incoming data. We will assign each interaction to a cluster and, based on the most frequent tag on the cluster, we will decide if the new data is or not an attack. 

At the time of processing this notebook, our Spark cluster contains:  

- Eight nodes, with one of them acting as master and the rest as workers.  
- Each node contains 8Gb of RAM, with 6Gb being used for each node.  
- Each node has a 2.4Ghz Intel dual core processor.  
- Running Apache Spark 1.3.1.  

## Getting the data and creating the RDD

As we said, this time we will use the complete dataset provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half million nework interactions. The file is provided as a Gzip file that we will download locally. Remember that the file must be accessible to every worker in our Spark cluster (in our case we use [NFS](https://en.wikipedia.org/wiki/Network_File_System)).    

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

In [2]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())

Train data size is 4898431


The [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) also provide test data that we will load in a separate RDD.  

In [3]:
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")

In [4]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())

Test data size is 311029


## Unsupervised learning with K-Means Clustering

Firs of all, we prepare the data for clustering input. The data contains non-numeric features, and we want to exclude them since k-means works just with numeric features. These are the first three and the last column in each data row that is the label. 

In order to do that, we define a function that we apply to the *RDD* as a `Spark` **transformation** by using `map`. The **action** that actually retrieves the data is `values`. Remember that we can apply as many transofmrations as we want without making `Spark` start any processing. Is is when we trigger an action when all the transformations are applied.  

In [5]:
from numpy import array

def parse_interaction(line):
    """
    Parses a network data interaction.
    """
    line_split = line.split(",")
    clean_line_split = [line_split[0]]+line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))

parsed_data = raw_data.map(parse_interaction)
parsed_data_values = parsed_data.values().cache()

We will also standardise our data as we have done so far when performing distance-based clustering.

In [6]:
from time import time
from pyspark.mllib.feature import StandardScaler

standardizer = StandardScaler(True, True)

t0 = time()
standardizer_model = standardizer.fit(parsed_data_values)
tt = time() - t0

standardized_data_values = standardizer_model.transform(parsed_data_values)
print "Data standardized in {} seconds".format(round(tt,3))

Data standardized in 308.751 seconds


And now we are ready to perform k-means clustering. The call to `KMeans.train` triggers the clustering process and returns a list of cluster centroids as well as a model que can use to assign a point to a cluster by calling `predict`. This can be seen in the `error` function declared bellow. There are other parameters for the `train` function, that are explained in the [Spark reference](http://spark.apache.org/docs/latest/mllib-clustering.html#k-means). In our case we will set three of the, the maximum number of iterations for the algorithms, the number of runs, and how do we want to initialise the cluster centroids for the iterative clustering process. 

We also need to determine a good value for K (the number of clusters for k-means). We will do this visually. We will try with a range of values from 5 to 100, and plot their *Within Set Sum of Squared Errors* (the square root of the sum of squared ditances for each point in a cluster to its centroid) as follows. This process will take a long while (in the order of hours). Each time a new value of K has been tried you will see some output printed out. If you want to skip the whole process, you can change the list of values in the for loop to just the best value we have found after this block.

In [None]:
from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = {}
WSSSE = {}

# sample data so the process ends in a reasonable amount of time
# if you have a really powerful cluster, you can skip sampling
standardized_data_values_sample = standardized_data_values.sample(False, .2, 1234)

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point, clusters_broadcast):
    center = clusters_broadcast.value.centers[clusters_broadcast.value.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

    
for k_value in [5, 10, 25, 50, 80, 100]:    
    c_t0 = time()
    clusters[k_value] = KMeans.train(
            standardized_data_values_sample, 
            k_value, 
            maxIterations=10, 
            runs=4, 
            initializationMode="random"
    )
    c_tt = time() - c_t0

    # we need to broadcast this variable if we want to use it in a
    # Spark transformation (like the following map)
    w_t0 = time()
    current_clusters = sc.broadcast(clusters[k_value])
    WSSSE[k_value] = standardized_data_values_sample.map(lambda point: error(point, current_clusters)).reduce(lambda x, y: x + y)
    w_tt = time() - w_t0
    
    print "K={}, WSSSE={}, clustered in {} seconds, WSSSE calculated in {} seconds".format(
        k_value, WSSSE[k_value], round(c_tt,3), round(w_tt,3))


Now we build a [Pandas data frame](http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html) that we will use to plot the WSSSE values.

In [8]:
import pandas as pd

kmeans_results = pd.DataFrame(WSSSE, columns=['k','WSSSE'])

kmeans_results.plot()

TypeError: Empty 'DataFrame': no numeric data to plot

We know that WSSSE will decrease as far as we increase the number of clusters `k`, but at some point the benefit doesn't improve the clustering result regarding how well the centroids represent our sample distribution. In this case seems that after `k=80` the WSSSE value doesn't decrease at the same rate. We will use that k value for our further explorations.  

## Visualising clusters with the help of PCA

In this section we want to do two things. First we want to visualise the clusters in a two dimensional space. By doing so we will get a feeling of cluster sizes and proximity. The second thing we want to do is to find out the main discriminant variables in our dataset. [Principal Component Analysis](https://en.wikipedia.org/wiki/Principal_component_analysis) will help to get both things done.

## Explaining major clusters

We will end up this introduction to clustering with Spark by having a look at tag composition and feature values for the five bigger clusters.  