# MLlib: Basic Statistics and Exploratory Data Analysis

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

So far we have used different map and aggregation functions, on simple and key/value pair RDD's, in order to get simple statistics that help us understand our datasets. In this notebook we will introduce Spark's machine learning library [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html) through its basic statistics functionality in order to better understand our dataset. We will use the reduced 10-percent [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) datasets through the notebook.   

## Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.  

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
data_file = "../kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

## Local vectors

A [local vector](https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector) is often used as a base type for RDDs in Spark MLlib. A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. 

For dense vectors, MLlib uses either Python *lists* or the *NumPy* `array` type. The later is recommended, so you can simply pass NumPy arrays around.  

For sparse vectors, users can construct a `SparseVector` object from MLlib or pass *SciPy* `scipy.sparse` column vectors if SciPy is available in their environment. The easiest way to create sparse vectors is to use the factory methods implemented in `Vectors`.  

### An RDD of dense vectors

Let's represent each network interaction in our dataset as a dense vector. For that we will use the *NumPy* `array` type.  

In [3]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

## Summary statistics

Spark's MLlib provides column summary statistics for `RDD[Vector]` through the function [`colStats`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.Statistics.colStats) available in [`Statistics`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.Statistics). The method returns an instance of [`MultivariateStatisticalSummary`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.MultivariateStatisticalSummary), which contains the column-wise *max*, *min*, *mean*, *variance*, and *number of nonzeros*, as well as the *total count*.  

In [4]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 

# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

print ("Duration Statistics:")
print (" Mean: {}".format(round(summary.mean()[0],3)))
print (" St. deviation: {}".format(round(sqrt(summary.variance()[0]),3)))
print (" Max value: {}".format(round(summary.max()[0],3)))
print (" Min value: {}".format(round(summary.min()[0],3)))
print (" Total value count: {}".format(summary.count()))
print (" Number of non-zero values: {}".format(summary.numNonzeros()[0]))

Duration Statistics:
 Mean: 47.979
 St. deviation: 707.746
 Max value: 58329.0
 Min value: 0.0
 Total value count: 494021
 Number of non-zero values: 12350.0


### Summary statistics by label  

The interesting part of summary statistics, in our case, comes from being able to obtain them by the type of network attack or 'label' in our dataset. By doing so we will be able to better characterise our dataset dependent variable in terms of the independent variables range of values.  

If we want to do such a thing we could filter our RDD containing labels as keys and vectors as values. For that we just need to adapt our `parse_interaction` function to return a tuple with both elements.     

In [5]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)

The next step is not very sophisticated. We use `filter` on the RDD to leave out other labels but the one we want to gather statistics from.    

In [6]:
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")

Now we can use the new RDD to call `colStats` on the values.  

In [7]:
normal_summary = Statistics.colStats(normal_label_data.values())

And collect the results as we did before.  

In [8]:
print ("Duration Statistics for label: {}".format("normal"))
print (" Mean: {}".format(normal_summary.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3)))
print (" Max value: {}".format(round(normal_summary.max()[0],3)))
print (" Min value: {}".format(round(normal_summary.min()[0],3)))
print (" Total value count: {}".format(normal_summary.count()))
print (" Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


Instead of working with a key/value pair we could have just filter our raw data split using the label in column 41. Then we can parse the results as we did before. This will work as well. However having our data organised as key/value pairs will open the door to better manipulations. Since `values()` is a transformation on an RDD, and not an action, we don't perform any computation until we call `colStats` anyway.  

But lets wrap this within a function so we can reuse it with any label.

In [9]:
def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())

Let's give it a try with the "normal." label again.  

In [10]:
normal_sum = summary_by_label(raw_data, "normal.")

print ("Duration Statistics for label: {}".format("normal"))
print (" Mean: {}".format(normal_sum.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3)))
print (" Max value: {}".format(round(normal_sum.max()[0],3)))
print (" Min value: {}".format(round(normal_sum.min()[0],3)))
print (" Total value count: {}".format(normal_sum.count()))
print (" Number of non-zero values: {}".format(normal_sum.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


Let's try now with some network attack. We have all of them listed [here](http://kdd.ics.uci.edu/databases/kddcup99/training_attack_types).  

In [11]:
guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print ("Duration Statistics for label: {}".format("guess_password"))
print (" Mean: {}".format(guess_passwd_summary.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3)))
print (" Max value: {}".format(round(guess_passwd_summary.max()[0],3)))
print (" Min value: {}".format(round(guess_passwd_summary.min()[0],3)))
print (" Total value count: {}".format(guess_passwd_summary.count()))
print (" Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0]))

Duration Statistics for label: guess_password
 Mean: 2.7169811320754715
 St. deviation: 11.88
 Max value: 60.0
 Min value: 0.0
 Total value count: 53
 Number of non-zero values: 4.0


We can see that this type of attack is shorter in duration than a normal interaction. We could build a table with duration statistics for each type of interaction in our dataset. First we need to get a list of labels as described in the first line [here](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names).      

In [12]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

Then we get a list of statistics for each label.  

In [13]:
stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list]

Now we get the *duration* column, first in our dataset (i.e. index 0).  

In [14]:
duration_by_label = [ 
    (stat[0], np.array([float(stat[1].mean()[0]), float(sqrt(stat[1].variance()[0])), float(stat[1].min()[0]), float(stat[1].max()[0]), int(stat[1].count())])) 
    for stat in stats_by_label]

That we can put into a Pandas data frame.  

In [15]:
import pandas as pd
pd.set_option('display.max_columns', 50)

stats_by_label_df = pd.DataFrame.from_items(duration_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

And print it.

In [16]:
print ("Duration statistics, by label")
stats_by_label_df

Duration statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In order to reuse this code and get a dataframe from any variable in our dataset we will define a function.  

In [17]:
def get_variable_stats_df(stats_by_label, column_i):
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[column_i]), float(sqrt(stat[1].variance()[column_i])), float(stat[1].min()[column_i]), float(stat[1].max()[column_i]), int(stat[1].count())])) 
        for stat in stats_by_label
    ]
    return pd.DataFrame.from_items(column_stats_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

Let's try for *duration* again.   

In [18]:
get_variable_stats_df(stats_by_label,0)

Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


Now for the next numeric column in the dataset, *src_bytes*.  

In [None]:
print ("src_bytes statistics, by label")
get_variable_stats_df(stats_by_label,1)

src_bytes statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,54156.355878,3159.36,13140.0,54540.0,2203.0
buffer_overflow.,1400.433333,1337.133,0.0,6274.0,30.0
ftp_write.,220.75,267.7476,0.0,676.0,8.0
guess_passwd.,125.339623,3.03786,104.0,126.0,53.0
imap.,347.583333,629.926,0.0,1492.0,12.0
ipsweep.,10.0834,5.231658,0.0,18.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,151.888889,127.7453,0.0,302.0,9.0
multihop.,435.142857,540.9604,0.0,1412.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


And so on. By reusing the `summary_by_label` and `get_variable_stats_df` functions we can perform some exploratory data analysis in large datasets with Spark.  

## Correlations

Spark's MLlib supports [Pearson’s](http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient) and [Spearman’s](http://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient) to calculate pairwise correlation methods among many series. Both of them are provided by the `corr` method in the `Statistics` package.  

We have two options as input. Either two `RDD[Double]`s or an `RDD[Vector]`. In the first case the output will be a `Double` value, while in the second a whole correlation Matrix. Due to the nature of our data, we will obtain the second.    

In [None]:
from pyspark.mllib.stat import Statistics 
correlation_matrix = Statistics.corr(vector_data, method="spearman")

Once we have the correlations ready, we can start inspecting their values.  

In [None]:
import pandas as pd
pd.set_option('display.max_columns', 50)

col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)

corr_df

We have used a *Pandas* `DataFrame` here to render the correlation matrix in a more comprehensive way. Now we want those variables that are highly correlated. For that we do a bit of dataframe manipulation.  

In [None]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# get the names of the variables so we can use them to slice the dataframe
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
# slice it
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

### Conclusions and possible model selection hints

The previous dataframe showed us which variables are highly correlated. We have kept just those variables with at least one strong correlation. We can use as we please, but a good way could be to do some model selection. That is, if we have a group of variables that are highly correlated, we can keep just one of them to represent the group under the assumption that they convey similar information as predictors. Reducing the number of variables will not improve our model accuracy, but it will make it easier to understand and also more efficient to compute.  

For example, from the description of the [KDD Cup 99 task](http://kdd.ics.uci.edu/databases/kddcup99/task.html) we know that the variable `dst_host_same_src_port_rate` references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliar dataframes) we find that this one is highly and positively correlated to `src_bytes` and `srv_count`. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We might decide not to include `dst_host_same_src_port_rate` in our model if we include the other two, as a way to reduce the number of variables and later one better interpret our models.  

Later on, in those notebooks dedicated to build predictive models, we will make use of this information to build more interpretable models.   