In [1]:
import os
import sys
from constants import SPARK_HOME
# Export the Spark install location and the directory holding its bundled
# Python archives, then make those archives importable from this process.
os.environ["SPARK_HOME"] = SPARK_HOME
pylib_dir = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYLIB"] = pylib_dir
# Every insert goes to the front of sys.path, so the archive inserted last
# (pyspark.zip) ends up ahead of the py4j archive.
for archive_name in ("/py4j-0.10.1-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + archive_name)

Note: Please download kddcup.data.gz from http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz and place it in the `spark-mllib-datasets` folder.
Also download http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz into the same folder; it is used later as the test data.

For our demo we will use the 10 percent sample: http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz

In [2]:
import urllib

# Location of the 10 percent sample of the training data (gzip-compressed text).
data_file = "../spark-mllib-datasets/kddcup.data_10_percent.gz"

# NOTE(review): `sc` is not created in any visible cell — presumably the
# SparkContext supplied by the kernel/startup script; confirm.
raw_data = sc.textFile(data_file)

# Call print as a function: with a single argument it prints the same text on
# Python 2 and is also valid syntax on Python 3.
print("Train data size is {}".format(raw_data.count()))

Train data size is 494021


**Input Data Format**

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.

In [3]:
# Load the test data (gzip-compressed text, one record per line).
test_data_file = "../spark-mllib-datasets/corrected.gz"
test_data_raw = sc.textFile(test_data_file)

# Call print as a function: with a single argument it prints the same text on
# Python 2 and is also valid syntax on Python 3.
print("Test data size is {}".format(test_data_raw.count()))

Test data size is 311029


In [4]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    """Turn one comma-separated record into a binary-labelled LabeledPoint.

    Args:
        line: A single text record whose field 41 holds the class name
            (e.g. ``normal.``).

    Returns:
        LabeledPoint whose label is 0.0 when field 41 equals ``'normal.'`` and
        1.0 otherwise, and whose features are field 0 followed by fields 4-40
        converted to float.

    Raises:
        IndexError: if the record has fewer than 42 fields.
        ValueError: if a kept field cannot be converted to float.
    """
    fields = line.split(",")
    # Read the label first so a short record fails on the missing field
    # before any float conversion is attempted.
    label = 0.0 if fields[41] == 'normal.' else 1.0
    # Keep field 0 and fields 4..40; fields 1-3 are text values and field 41
    # is the class name, so none of them belong in the numeric feature vector.
    numeric_fields = fields[:1] + fields[4:41]
    features = array([float(token) for token in numeric_fields])
    return LabeledPoint(label, features)


# Parse every raw training line into a LabeledPoint.
training_data = raw_data.map(parse_interaction)
# Show the first five parsed records as a sanity check of the parsing.
training_data.take(5)

[LabeledPoint(0.0, [0.0,181.0,5450.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,9.0,9.0,1.0,0.0,0.11,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,239.0,486.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,19.0,19.0,1.0,0.0,0.05,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,235.0,1337.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,29.0,29.0,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,219.0,1337.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,6.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,39.0,39.0,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,217.0,2032.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,6.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,49.0,49.0,1.0,0.0,0.02,0.0,0.0,0.0,0.0,0.0])]

In [5]:
# Prepare the test data: parse it with the same function used for the
# training data so both sets share one feature layout and label encoding.
test_data = test_data_raw.map(parse_interaction)

In [7]:
# train classifier
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the logistic regression model and measure how long training takes.
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

# Call print as a function: with a single argument it prints the same text on
# Python 2 and is also valid syntax on Python 3.
print("Classifier trained in {} seconds".format(round(tt, 3)))

Classifier trained in 61.119 seconds


In [8]:
# Evaluate on new data: pair each test record's true label with the label
# the trained model predicts for its features.
def _label_and_prediction(point):
    """Return (true label, predicted label) for one LabeledPoint."""
    return (point.label, logit_model.predict(point.features))


labels_preds = test_data.map(_label_and_prediction)

In [9]:
# Calculate the classification accuracy (fraction of correct predictions)
# on the test data and time the whole evaluation.
t0 = time()
# Each element is a (true label, predicted label) pair. Index into the pair
# instead of unpacking it in the lambda signature: tuple parameters were
# removed in Python 3 (PEP 3113), while indexing works on Python 2 and 3.
correct_count = labels_preds.filter(lambda pair: pair[0] == pair[1]).count()
# float() keeps the division from truncating to an integer on Python 2.
test_accuracy = correct_count / float(test_data.count())
tt = time() - t0

# Call print as a function: with a single argument it prints the same text on
# Python 2 and is also valid syntax on Python 3.
print("Prediction made in {} seconds. Test accuracy is {}".format(round(tt, 3), round(test_accuracy, 4)))

Prediction made in 14.118 seconds. Test accuracy is 0.9019
