In [1]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import csv
import sys
from datetime import datetime
from time import time
import numpy as np
import ast, csv, warnings, os, pickle, sys

# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
# Load the memory_profiler IPython extension; a later cell uses the %memit magic.
%load_ext memory_profiler

In [2]:
# Create the SparkContext, or reuse the one already running in this kernel.
# A bare SparkContext() raises "Cannot run multiple SparkContexts at once"
# when this cell is re-executed, so getOrCreate() keeps the cell idempotent.
sc = SparkContext.getOrCreate()
# SQLContext is the DataFrame entry point used by the rest of the notebook.
sqlcontext = SQLContext(sc)
#spark 2.1.0
#sqlcontext.setConf("spark.sql.shuffle.partitions", "10")

In [3]:
#Dummy Data
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

data_file = "/idn/home/lwang27/workspace/rNd/kddcup.data.gz"
raw_data = sc.textFile(data_file) #rdd

print "Train data size is {}".format(raw_data.count()) #4,898,431

ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "/idn/home/lwang27/workspace/rNd/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

%memit print "Test data size is {}".format(test_raw_data.count())#311,029

Train data size is 4898431
Test data size is 311029
peak memory: 52.30 MiB, increment: 0.23 MiB


# Data Introduction:
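The task is to detect network attacks: each record describes one network connection and carries a label that is either `normal.` or the name of an attack.
Features (41 columns, followed by the label column):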
duration	protocol_type	service	flag	src_bytes	dst_bytes	land	wrong_fragment	urgent	hot	num_failed_logins	logged_in	num_compromised	root_shell	su_attempted	num_root	num_file_creations	num_shells	num_access_files	num_outbound_cmds	is_host_login	is_guest_login	count	srv_count	serror_rate	srv_serror_rate	rerror_rate	srv_rerror_rate	same_srv_rate	diff_srv_rate	srv_diff_host_rate	dst_host_count	dst_host_srv_count	dst_host_same_srv_rate	dst_host_diff_srv_rate	dst_host_same_src_port_rate	dst_host_srv_diff_host_rate	dst_host_serror_rate	dst_host_srv_serror_rate	dst_host_rerror_rate	dst_host_srv_rerror_rate

In [4]:
# Peek at the first raw record to see the comma-separated layout.
raw_data.take(1) # 42 columns: 41 features followed by the label

[u'0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.']

In [5]:
# Prepare data
from pyspark.ml.linalg import Vectors
from numpy import array

LABEL_INDEX = 41          # the last of the 42 columns holds the connection label
NORMAL_LABEL = 'normal.'  # any other label value is treated as an attack


def parse_interaction(line):
    """Turn one comma-separated record into a (label, features) pair.

    Args:
        line: a single text line of the data set with 42 comma-separated
            columns, the last one being the label.

    Returns:
        A tuple (attack, features): attack is 0.0 when the label equals
        'normal.' and 1.0 otherwise; features is a dense vector built from
        column 0 and columns 4..40 (38 values).

    Raises:
        ValueError: if one of the selected columns cannot be parsed as a float.
        IndexError: if the line has fewer than 42 columns.
    """
    fields = line.split(",")
    # Columns 1-3 hold string values (e.g. tcp, http, SF) and are left out;
    # column 41 is the label, so only the continuous columns remain.
    continuous = fields[0:1] + fields[4:LABEL_INDEX]
    attack = 0.0 if fields[LABEL_INDEX] == NORMAL_LABEL else 1.0
    return (attack, Vectors.dense(array([float(x) for x in continuous])))

# RDD of (label, features) pairs for the training set.
training_data = raw_data.map(parse_interaction)

In [6]:
# The spark.ml API works on DataFrames and ML pipelines, and uses DataFrame
# metadata to tell continuous features from categorical ones.
from pyspark.ml.feature import StringIndexer

# Convert the whole (label, features) RDD into a DataFrame (no row limit is
# applied) and replace the default tuple column names _1/_2.
df = (training_data.toDF()
      .withColumnRenamed("_1", "label")
      .withColumnRenamed("_2", "features"))

# StringIndexer (imported above) encodes a string label column as label
# indices. It is not applied here: parse_interaction already emits a numeric
# 0.0/1.0 label.

In [7]:
# Redistribute the training DataFrame over 10 partitions; the GBT cell trains on df1.
df1 = df.repartition(10)

In [8]:
# Confirm the partition count of the repartitioned DataFrame.
df1.rdd.getNumPartitions()

10

In [9]:
# Build model using RandomForest
from pyspark.ml.regression import RandomForestRegressor

# Time construction plus fitting of a small forest (3 trees, depth 3, fixed seed).
# NOTE(review): this cell fits on df, whereas the GBT cell fits on the
# repartitioned df1 -- confirm that the difference is intended.
t0 = time()
rf = RandomForestRegressor(seed=42, maxDepth=3, numTrees=3)
model_rf = rf.fit(df)
tt = time() - t0

# Importance of each of the 38 entries of the feature vector.
print(model_rf.featureImportances)
print("RF training in {} seconds".format(np.round(tt, 3)))

In [10]:
# Build model using GBT
from pyspark.ml.regression import GBTRegressor

# Time construction plus fitting of a depth-3 gradient-boosted regressor on
# the repartitioned training DataFrame.
t0 = time()
gbt = GBTRegressor(seed=42, maxDepth=3)
model_gbt = gbt.fit(df1)
tt = time() - t0

# The recorded run of this cell on the whole data set took about 1400 seconds.
print("GBT training in {} seconds".format(np.round(tt, 3)))

GBT training in 1399.916 seconds


In [11]:
#Grid search & cross validation
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
t0 = time()
# Seed fixed to 42, matching the RF and GBT cells, so the run is repeatable.
gbt = GBTRegressor(seed = 42)
# 3 depths x 2 iteration counts = 6 parameter combinations.
paramGrid = ParamGridBuilder()\
            .addGrid(gbt.maxDepth, [1,2,3])\
            .addGrid(gbt.maxIter, [5, 20])\
            .build()
# Candidates are ranked by R^2 between the "label" and "prediction" columns.
evaluator = RegressionEvaluator(metricName = "r2")

# 3-fold cross validation; the seed pins the random assignment of rows to folds.
crossval = CrossValidator(estimator = gbt, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds = 3, seed = 42)
model_cv = crossval.fit(df)
tt = time() - t0
print("cross validation in {} seconds".format(np.round(tt,3)))

In [12]:
# Get the model metrics.
model_cv.avgMetrics
# Average cross-validation metric (r2, per the evaluator configured for the
# search) for each paramMap in CrossValidator.estimatorParamMaps, in the
# corresponding order.

In [13]:
# Prepare test data: parse the raw test lines with the same parser used for
# training, then convert to a DataFrame with label/features column names.
test_data = test_raw_data.map(parse_interaction)

df_test = (test_data.toDF()
           .withColumnRenamed("_1", "label")
           .withColumnRenamed("_2", "features"))


In [14]:
# Make prediction based on best model
# NOTE(review): only the "features" column is passed in, so the resulting
# DataFrame carries no "label" column to compare the predictions against --
# confirm this is intended.
predictions = model_cv.transform(df_test.select("features"))

In [15]:
# Print the first rows of the prediction DataFrame.
predictions.show()

In [16]:
# Shut down the SparkContext; a new context must be created before any Spark cell can run again.
sc.stop()