In [1]:
from pyspark import SparkContext

In [1]:
data = "file:///home/lygbug666/workdir/spark-py-notebooks/kddcup.data_10_percent.gz"

In [2]:
raw_data = sc.textFile(data)

# Local vectors

## An RDD of dense vectors

In [3]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

In [9]:
vector_data.first()

array([  0.00000000e+00,   1.81000000e+02,   5.45000000e+03,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   8.00000000e+00,   8.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   1.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   9.00000000e+00,   9.00000000e+00,
         1.00000000e+00,   0.00000000e+00,   1.10000000e-01,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00])

# Summary statistics

Spark's MLlib provides column summary statistics for RDD[Vector] through the function colStats available in Statistics. The method returns an instance of MultivariateStatisticalSummary, which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

In [20]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 


# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

print ("Duration Statistics:")
print (" Mean: {}".format(round(summary.mean()[0],3)))
print (" St. deviation: {}".format(round(sqrt(summary.variance()[0]),3)))
print (" Max value: {}".format(round(summary.max()[0],3)))
print (" Min value: {}".format(round(summary.min()[0],3)))
print (" Total value count: {}".format(summary.count()))
print (" Number of non-zero values: {}".format(summary.numNonzeros()[0]))

Duration Statistics:
 Mean: 47.979
 St. deviation: 707.746
 Max value: 58329.0
 Min value: 0.0
 Total value count: 494021
 Number of non-zero values: 12350.0


In [15]:
import numpy as np
from pyspark.mllib.stat import Statistics

# an RDD of Vectors
mat = sc.parallelize([np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])])

# Compute column summary statistics.
summary = Statistics.colStats(mat)
print(summary.mean())  # a dense vector containing the mean value for each column
print(summary.mean()[0])
print(summary.variance())  # column-wise variance
print(sqrt(summary.variance()[1]))
print(summary.numNonzeros())  # number of nonzeros in each column
print(summary.numNonzeros()[0])


[   2.   20.  200.]
2.0
[  1.00000000e+00   1.00000000e+02   1.00000000e+04]
10.0
[ 3.  3.  3.]
3.0


## Summary statistics by label

In [18]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)


The next step is not very sophisticated. We use filter on the RDD to leave out other labels but the one we want to gather statistics from.

In [21]:
normal_label_data = label_vector_data.filter(lambda x: x[0] == "normal.")
normal_summary = Statistics.colStats(normal_label_data.values())

In [22]:
print ("Duration Statistics for label: {}".format("normal"))
print (" Mean: {}".format(normal_summary.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3)))
print (" Max value: {}".format(round(normal_summary.max()[0],3)))
print (" Min value: {}".format(round(normal_summary.min()[0],3)))
print (" Total value count: {}".format(normal_summary.count()))
print (" Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))

Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


But lets wrap this within a function so we can reuse it with any label.

In [23]:
def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())

normal_sum = summary_by_label(raw_data, "normal.")

print ("Duration Statistics for label: {}".format("normal"))
print (" Mean: {}".format(normal_summary.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3)))
print (" Max value: {}".format(round(normal_summary.max()[0],3)))
print (" Min value: {}".format(round(normal_summary.min()[0],3)))
print (" Total value count: {}".format(normal_summary.count()))
print (" Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))


Duration Statistics for label: normal
 Mean: 216.65732231336938
 St. deviation: 1359.213
 Max value: 58329.0
 Min value: 0.0
 Total value count: 97278
 Number of non-zero values: 11690.0


Let's try now with some network attack.

We can see that this type of attack is shorter in duration than a normal interaction.

In [25]:
guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print ("Duration Statistics for label: {}".format("guess_passwd"))
print (" Mean: {}".format(guess_passwd_summary.mean()[0],3))
print (" St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3)))
print (" Max value: {}".format(round(guess_passwd_summary.max()[0],3)))
print (" Min value: {}".format(round(guess_passwd_summary.min()[0],3)))
print (" Total value count: {}".format(guess_passwd_summary.count()))
print (" Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0]))

Duration Statistics for label: guess_passwd
 Mean: 2.7169811320754715
 St. deviation: 11.88
 Max value: 60.0
 Min value: 0.0
 Total value count: 53
 Number of non-zero values: 4.0


We could build a table with duration statistics for each type of interaction in our dataset. First we need to get a list of labels as described in the first line here.

In [27]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

Then we get a list of statistics for each label.

In [28]:
stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list]

Now we get the duration column, first in our dataset (i.e. index 0).

In [33]:
duration_by_label = [(stat[0], np.array([float(stat[1].mean()[0]), float(sqrt(stat[1].variance()[0])), 
                                float(stat[1].min()[0]), float(stat[1].max()[0]), int(stat[1].count())])) 
                                for stat in stats_by_label]

In [35]:
import pandas as pd
pd.set_option('display.max_columns', 50)

stats_by_label_df = pd.DataFrame.from_items(duration_by_label, 
                                            columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')



print ("Duration statistics, by label")
stats_by_label_df

Duration statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In order to reuse this code and get a dataframe from any variable in our dataset we will define a function.

In [36]:
def get_variable_stats_df(stats_by_label, column_i):
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[column_i]), float(sqrt(stat[1].variance()[column_i])), 
                            float(stat[1].min()[column_i]), float(stat[1].max()[column_i]), int(stat[1].count())])) 
                            for stat in stats_by_label
    ]
    return pd.DataFrame.from_items(column_stats_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

In [37]:
get_variable_stats_df(stats_by_label,0)

Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,0.128915,1.110062,0.0,14.0,2203.0
buffer_overflow.,91.7,97.514685,0.0,321.0,30.0
ftp_write.,32.375,47.449033,0.0,134.0,8.0
guess_passwd.,2.716981,11.879811,0.0,60.0,53.0
imap.,6.0,14.17424,0.0,41.0,12.0
ipsweep.,0.034483,0.438439,0.0,7.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,36.222222,41.408869,0.0,103.0,9.0
multihop.,184.0,253.851006,0.0,718.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


In [38]:
print ("src_bytes statistics, by label")
get_variable_stats_df(stats_by_label,1)


src_bytes statistics, by label


Unnamed: 0,Mean,Std Dev,Min,Max,Count
back.,54156.355878,3159.36,13140.0,54540.0,2203.0
buffer_overflow.,1400.433333,1337.133,0.0,6274.0,30.0
ftp_write.,220.75,267.7476,0.0,676.0,8.0
guess_passwd.,125.339623,3.03786,104.0,126.0,53.0
imap.,347.583333,629.926,0.0,1492.0,12.0
ipsweep.,10.0834,5.231658,0.0,18.0,1247.0
land.,0.0,0.0,0.0,0.0,21.0
loadmodule.,151.888889,127.7453,0.0,302.0,9.0
multihop.,435.142857,540.9604,0.0,1412.0,7.0
neptune.,0.0,0.0,0.0,0.0,107201.0


And so on. By reusing the summary_by_label and get_variable_stats_df functions we can perform some exploratory data analysis in large datasets with Spark.

# Correlations

In [4]:
from pyspark.mllib.stat import Statistics
import pandas as pd

correlation_matrix = Statistics.corr(vector_data, method="spearman")

pd.set_option('display.max_columns', 50)

# col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
#              "urgent","hot","num_failed_logins","logged_in","num_compromised",
#             "root_shell","su_attempted","num_root","num_file_creations",
#             "num_shells","num_access_files","num_outbound_cmds",
#             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
#             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
#             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
#             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
#             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
#             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]
col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
            "urgent","hot","num_failed_logins","logged_in","num_compromised",
            "root_shell"]

corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)

corr_df

Py4JJavaError: An error occurred while calling o30.corr.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.PartitionedPairBuffer.growArray(PartitionedPairBuffer.scala:67)
	at org.apache.spark.util.collection.PartitionedPairBuffer.insert(PartitionedPairBuffer.scala:48)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.ZippedWithIndexRDD.compute(ZippedWithIndexRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numCols(RowMatrix.scala:61)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:331)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelationMatrix(PearsonCorrelation.scala:49)
	at org.apache.spark.mllib.stat.correlation.SpearmanCorrelation$.computeCorrelationMatrix(SpearmanCorrelation.scala:90)
	at org.apache.spark.mllib.stat.correlation.Correlations$.corrMatrix(Correlation.scala:66)
	at org.apache.spark.mllib.stat.Statistics$.corr(Statistics.scala:74)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.corr(PythonMLLibAPI.scala:846)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.PartitionedPairBuffer.growArray(PartitionedPairBuffer.scala:67)
	at org.apache.spark.util.collection.PartitionedPairBuffer.insert(PartitionedPairBuffer.scala:48)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.ZippedWithIndexRDD.compute(ZippedWithIndexRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51138)
Traceback (most recent call last):
  File "/home/lygbug666/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/lygbug666/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/home/lygbug666/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/lygbug666/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/home/lygbug666/software/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/lygbug666/software/spark/python/pyspark/serializers.py", line 581, in read_int
    raise EOFError
EOFError
----------------------------------------


We have used a Pandas DataFrame here to render the correlation matrix in a more comprehensive way. Now we want those variables that are highly correlated. For that we do a bit of dataframe manipulation.

In [5]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# get the names of the variables so we can use them to slice the dataframe
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
# slice it
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

NameError: name 'corr_df' is not defined

# Conclusions and possible model selection hints

The previous dataframe showed us which variables are highly correlated. We have kept just those variables with at least one strong correlation. We can use as we please, but a good way could be to do some model selection. That is, if we have a group of variables that are highly correlated, we can keep just one of them to represent the group under the assumption that they convey similar information as predictors. Reducing the number of variables will not improve our model accuracy, but it will make it easier to understand and also more efficient to compute.

For example, from the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliar dataframes) we find that this one is highly and positively correlated to src_bytes and srv_count. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We might decide not to include dst_host_same_src_port_rate in our model if we include the other two, as a way to reduce the number of variables and later one better interpret our models.

Later on, in those notebooks dedicated to build predictive models, we will make use of this information to build more interpretable models.