![GMV](https://www.gmv.com/export/system/modules/com.gmv.teresa.site/resources/theme/img/logo_gmv.svg)  ![Apache Spark](http://spark.apache.org/images/spark-logo.png)

# KDD99 Supervised Learning II

# Apache Spark Initialization

In [None]:
import findspark
findspark.init()  # locate the Spark installation before pyspark is imported
import pyspark

# getOrCreate reuses a context that is already running, so re-executing this
# cell does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("SecurityDataScience")
)

In [None]:
%matplotlib inline

## 0. Libraries

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import * 
sqlContext = SQLContext(sc)

## 1. Data Description

**Intrinsic attributes**

These attributes are extracted from the headers' area of the network packets.

Col|Feature name  | description |	type
---|--------------|-------------|------------
1  |duration 	  |length (number of seconds) of the connection |continuous
2  |protocol_type |type of the protocol, e.g. tcp, udp, etc. |discrete
3  |service 	  |network service on the destination, e.g., http, telnet, etc. |discrete
4  |flag 	      |normal or error status of the connection. The possible statuses are: SF, S0, S1, S2, S3, OTH, REJ, RSTO, RSTOS0, SH, RSTRH, SHR 	|discrete 
5  |src_bytes 	  |number of data bytes from source to destination 	|continuous
6  |dst_bytes 	  |number of data bytes from destination to source 	|continuous
7  |land 	      |1 if connection is from/to the same host/port; 0 otherwise 	|discrete
8  |wrong_fragment|sum of bad checksum packets in a connection 	|continuous
9  |urgent 	      |number of urgent packets. Urgent packets are packets with the urgent bit activated 	|continuous


**Class attribute**

The 42nd attribute is the ***class_attack*** attribute; it indicates the type of connection each instance represents: normal, or a specific attack. The values it can take are the following: *anomaly, dict, dict_simple, eject, eject-fail, ffb, ffb_clear, format, format_clear, format-fail, ftp-write, guest, imap, land, load_clear, loadmodule, multihop, perl_clear, perlmagic, phf, rootkit, spy, syslog, teardrop, warez, warezclient, warezmaster, pod, back, ipsweep, neptune, nmap, portsweep, satan, smurf and normal*.

**Categories of class attribute**


class_attack |Category
-------|--------------
smurf| dos
neptune| dos
back| dos
teardrop| dos
pod| dos
land| dos
normal|normal
satan|probe
ipsweep|probe
portsweep|probe
nmap|probe
warezclient|r2l
guess_passwd|r2l
warezmaster|r2l
imap|r2l
ftp_write|r2l
multihop|r2l
phf|r2l
spy|r2l
buffer_overflow|u2r
rootkit|u2r
loadmodule|u2r
perl|u2r

## 2. Load Data

In [None]:
textFileConn = sc.textFile('./data/KDD/KDDTrain+.txt', 4)


In [None]:
#Creating the schema

#we define the name of the columns

columnNames=["class_attack", "duration","protocol_type","service","flag","src_bytes","dst_bytes","land",
                 "wrong_fragment","urgent"]

In [None]:
#quick field initialization: every column starts as FloatType
connFields = [StructField(colName, FloatType(), True) for colName in columnNames]

In [None]:
#we proceed to modify the respective fields so that they reflect the correct data type:
connFields[0].dataType = StringType()
connFields[2].dataType = StringType()
connFields[3].dataType = StringType()
connFields[4].dataType = StringType()

In [None]:
# we can construct our schema, which we will use later below for building the data frame
connSchema = StructType(connFields)

In [None]:
#Parsing the file
def parseReg(p):
    """Convert one split KDD record into a tuple ordered like ``columnNames``.

    Args:
        p: Sequence of raw string fields from one comma-separated line. The
            attack label is read from index 41 and the intrinsic attributes
            from indices 0 to 8.

    Returns:
        tuple: ``(class_attack, duration, protocol_type, service, flag,
        src_bytes, dst_bytes, land, wrong_fragment, urgent)``. The label and
        the three categorical fields stay strings; the rest are cast to float.
    """
    label = p[41]
    duration = float(p[0])
    categorical = (p[1], p[2], p[3])
    # src_bytes, dst_bytes, land, wrong_fragment, urgent
    numeric = tuple(float(p[idx]) for idx in range(4, 9))
    return (label, duration) + categorical + numeric

In [None]:
connParsedFile = (textFileConn.map(lambda line: line.split(','))
                              .map(parseReg))

In [None]:
# We are now ready to build our data frame, using the connParsedFile RDD computed above and the schema 
# variable already calculated:
conn = sqlContext.createDataFrame(connParsedFile, connSchema)
conn.cache()

In [None]:
conn.take(3)

In [None]:
conn.limit(4).toPandas()

In [None]:
#get all the distinct values of class_attack
conn.select("class_attack").distinct().toPandas()

## 3. Data Preparation

### 3.1 Encoding categorical features

In [None]:
from pyspark.sql import functions as F

In [None]:
def encodeCategorical(df, catName):
    """One-hot encode a categorical column of a Spark DataFrame.

    For every distinct value of ``catName`` a new integer column named after
    that value is appended, holding 1 where the row's ``catName`` equals the
    value and 0 otherwise. The original ``catName`` column is dropped.

    Args:
        df: Spark DataFrame containing the column ``catName``.
        catName (str): Name of the categorical column to encode.

    Returns:
        A new DataFrame with the remaining original columns followed by one
        indicator column per category, in sorted category order.

    Raises:
        ValueError: If a category value equals the name of another column,
            because the indicator column would clash with it.
    """
    # Sorting makes the output schema reproducible; distinct() gives no
    # ordering guarantee.
    categories = sorted(row[0] for row in df.select(catName).distinct().collect())

    keptColumns = [c for c in df.columns if c != catName]
    clashes = set(categories).intersection(keptColumns)
    if clashes:
        raise ValueError(
            "category values clash with existing columns: %s" % sorted(clashes))

    # Build every indicator in a single projection: chaining withColumn once
    # per category grows the query plan with each call.
    indicators = [F.when(df[catName] == c, 1).otherwise(0).alias(c)
                  for c in categories]
    return df.select(keptColumns + indicators)

### Encoding *protocol_type*

In [None]:
conn.select("protocol_type").distinct().toPandas()

In [None]:
connEncoded = encodeCategorical(conn, "protocol_type")

In [None]:
connEncoded.limit(10).toPandas()

### Encoding *service*

In [None]:
connEncoded.select("service").distinct().toPandas()

In [None]:
connEncoded = encodeCategorical(connEncoded, "service")

In [None]:
connEncoded.limit(10).toPandas()

### Encoding *flag*

In [None]:
connEncoded.select("flag").distinct().toPandas()

In [None]:
connEncoded = encodeCategorical(connEncoded, "flag")

In [None]:
connEncoded.limit(10).toPandas()

###  Encoding *class_attack* (**label**) as integers

In [None]:
connEncoded.select("class_attack").distinct().toPandas()

In [None]:
categories = connEncoded.select("class_attack").distinct().toPandas()["class_attack"]

In [None]:
# Map each attack name to the integer position it holds in `categories`.
dictCategories = {name: int(position) for position, name in categories.to_dict().items()}

In [None]:
from pyspark.sql.functions import udf

In [None]:
def categoriesToInt(cat):
    """Return the integer code stored in ``dictCategories`` for ``cat``.

    Raises:
        KeyError: If ``cat`` is not a key of ``dictCategories``.
    """
    code = dictCategories[cat]
    return code

# Spark UDF wrapper so the lookup can be applied to a DataFrame column.
udfCategoriesToInt = udf(categoriesToInt, IntegerType())

In [None]:
connEncoded = connEncoded.withColumn("class_attack", udfCategoriesToInt("class_attack") )

In [None]:
connEncoded.limit(10).toPandas()

### 3.2 Input Normalization

http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import DenseVector

In [None]:
connEncoded.limit(5).toPandas()

In [None]:
features = connEncoded.drop("class_attack")

In [None]:
features.limit(10).toPandas()

In [None]:
stats = features.describe().toPandas()

In [None]:
minValue = np.array(stats[stats.summary=="min"].values[0][1:], float)

In [None]:
maxValue = np.array(stats[stats.summary=="max"].values[0][1:], float)

In [None]:
def minMaxScaler(minV, maxV, row):
    """Min-max rescale the values of ``row`` feature by feature.

    Args:
        minV: Per-feature minimum values, indexed in the same order as ``row``.
        maxV: Per-feature maximum values, indexed in the same order as ``row``.
        row: Sequence of numeric feature values.

    Returns:
        DenseVector: ``(value - min) / (max - min)`` for each feature. A
        feature whose min and max coincide is mapped to 0.0 instead of being
        divided by zero.
    """
    scaled = []
    for i, value in enumerate(row):
        low = minV[i]
        span = maxV[i] - low
        # A constant feature has zero span; dividing would produce NaN/inf
        # and contaminate everything computed from the vector afterwards.
        scaled.append((value - low) / span if span != 0 else 0.0)
    return DenseVector(scaled)
    

In [None]:
labeledData = connEncoded.rdd.map(lambda x: (x[0], minMaxScaler(minValue, maxValue, x[1:])))

In [None]:
labeledDataFrame = sqlContext.createDataFrame(labeledData, ["label", "features"])

In [None]:
labeledDataFrame.limit(5).toPandas()

### 3.3 Principal Component Analysis (PCA)

In [None]:
from pyspark.ml.feature import PCA

In [None]:
v = labeledDataFrame.limit(1).toPandas()["features"].values[0]
n_features = len(v.array)
print("Total number of features: %d" %n_features)

In [None]:
def estimateCovariance(df):
    """Estimate the covariance matrix of the 'features' column of a dataframe.

    The feature vectors are centred by subtracting their mean, the outer
    product of every centred vector with itself is summed, and the sum is
    divided by the number of rows of ``df``.

    Args:
        df: A Spark dataframe with a column named 'features' holding the
            feature vectors.

    Returns:
        np.ndarray: A square array whose side equals the length of the
            feature vectors.
    """
    vectors = df.select(df['features']).rdd.map(lambda row: row[0])
    mean = vectors.mean()
    centered = vectors.map(lambda vec: vec - mean)  # zero-mean vectors
    outerSum = centered.map(lambda vec: np.outer(vec, vec)).sum()

    return outerSum / df.count()

In [None]:
from numpy.linalg import eigh

def pca(df, k=2):
    """Compute the top ``k`` principal components, the scores and all eigenvalues.

    The covariance matrix from ``estimateCovariance`` is decomposed with
    ``eigh``. Eigenvalues and eigenvectors are reordered from largest to
    smallest eigenvalue, and every feature vector is projected onto the first
    ``k`` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column holding the feature vectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD, np.ndarray): ``(components, scores,
        eigenvalues)``. ``components`` has one row per feature and one column
        per retained component. ``scores`` is an RDD with the projection of
        each feature vector onto those components. ``eigenvalues`` holds every
        eigenvalue, sorted from largest to smallest.
    """
    covariance = estimateCovariance(df)
    eigVals, eigVecs = eigh(covariance)

    # Indices of the eigenvalues, from the largest to the smallest.
    descending = np.argsort(eigVals)[::-1]

    # eigh stores eigenvectors as columns; transpose to reorder them as rows.
    sortedVecs = eigVecs.T[descending]
    projection = sortedVecs[:k].T
    sortedVals = eigVals[descending]

    featureRdd = df.select(df['features']).rdd.map(lambda row: row[0])
    score = featureRdd.map(lambda vec: np.dot(vec, projection))

    return projection, score, sortedVals

In [None]:
%%time
comp, score, eigVals = pca(labeledDataFrame)

In [None]:
varianceExplained = eigVals.cumsum()/eigVals.sum()

In [None]:
varianceExplained

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.plot(1 - varianceExplained, drawstyle = 'steps-post')
plt.title('PCA Reconstruction Error');

In [None]:
# Entry i of varianceExplained covers the first i+1 components, so the number
# of entries still above 10% error, plus one, is the smallest number of
# components whose reconstruction error is at most 10% (and it is never 0).
n_factors = int(((1 - varianceExplained) > 0.10).sum()) + 1
print("Number of factors with 10% of reconstruction Error: ", n_factors)

#### 3.3.1 Apache Spark Implementation

In [None]:
#Apache Spark API
# NOTE: this assignment rebinds the name `pca`, which until this cell referred
# to the pca() function; calling that function again requires re-running the
# cell that defines it.
pca = PCA(k = n_factors, inputCol="features", outputCol="pca_features")

In [None]:
%%time
pcaModel = pca.fit(labeledDataFrame)

In [None]:
pcaDataFrame = pcaModel.transform(labeledDataFrame).drop("features")

In [None]:
pcaDataFrame.limit(10).toPandas()

## 4. Logistic Regression

### 4.1 Data Preparation for Logistic Regression

In [None]:
# Sampling with replacement with the 30% of data
connSample = pcaDataFrame.sample(withReplacement = True, fraction = 0.30).cache()

In [None]:
connSample.limit(10).toPandas()

In [None]:
# Collapse class_attack into a binary label: 0.0 for normal traffic, 1.0 for any attack.
# The integer code of "normal" is looked up in dictCategories instead of being
# hard-coded, because the codes follow the unordered result of distinct() and
# can differ from one run to the next.
connLR = connSample.withColumn('label', when(connSample['label'] == dictCategories["normal"], 0.0).otherwise(1.0))
connLR.cache()

### 4.2 Training Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingDataLR, testDataLR) = connLR.randomSplit([0.7, 0.3])

In [None]:
lr = LogisticRegression(featuresCol="pca_features", labelCol= "label",
                        maxIter = 5, regParam = 0.1)

In [None]:
%%time
# Train model with trainingData.
lrModel = lr.fit(trainingDataLR)

In [None]:
print("Logistic Regression coefficients:" , lrModel.coefficients)
print("Logistic Regression intercept:", lrModel.intercept)

In [None]:
# Make predictions with testData.
predictionsLR = lrModel.transform(testDataLR)

In [None]:
# Select example rows to display.
predictionsLR.select("label", "prediction", "probability").limit(10).toPandas()

In [None]:
# Collect the (prediction, label) pairs of the test set as an RDD
predictionAndLabelsLR = predictionsLR.select("prediction", "label").rdd

In [None]:
# Instantiate Basic Metrics object
basicMetricsLR = MulticlassMetrics(predictionAndLabelsLR)

In [None]:
print("Summary Stats")
print("Weighted Precision = %s" % basicMetricsLR.weightedPrecision)
print("Weigthed Recall = %s" % basicMetricsLR.weightedRecall)
print("Weighted F1 Score = %s" % basicMetricsLR.weightedFMeasure())
print("Confusion Matrix:")
pd.DataFrame(basicMetricsLR.confusionMatrix().toArray())

In [None]:
# Instantiate Advanced Metrics object.
# BinaryClassificationMetrics expects (score, label) pairs. Use the predicted
# probability of class 1 as the score: hard 0/1 predictions would reduce the
# ROC and PR curves to a single operating point.
scoreAndLabelsLR = predictionsLR.select("probability", "label") \
                                .rdd.map(lambda row: (float(row[0][1]), float(row[1])))
advMetricsLR = BinaryClassificationMetrics(scoreAndLabelsLR)

In [None]:
# Area under precision-recall curve
print("Area under PR = %s" % advMetricsLR.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % advMetricsLR.areaUnderROC)

### 4.3 Model Selection via Cross-Validation for Logistic Regression

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
lr = LogisticRegression(featuresCol="pca_features", labelCol= "label")

In [None]:
grid = ParamGridBuilder().addGrid(lr.regParam, np.power(10.0, range(-3, 4, 1))) \
                         .addGrid(lr.maxIter, range(5, 21, 5)) \
                         .build()

In [None]:
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

In [None]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

In [None]:
%%time 
cvModel = cv.fit(connLR)

In [None]:
%%time
evaluator.evaluate(cvModel.transform(connLR))

In [None]:
cvModel.bestModel

-----

## 5. Linear Support Vector Machines (SVM)

### 5.1 Data Preparation for SVM

In [None]:
# Sampling with replacement with the 30% of data
connSample = pcaDataFrame.sample(withReplacement = True, fraction = 0.30).cache()

In [None]:
connSample.limit(10).toPandas()

In [None]:
# Collapse class_attack into a binary label (0.0 for normal traffic, 1.0 for any
# attack) and wrap every row in a LabeledPoint.
# The integer code of "normal" is looked up in dictCategories instead of being
# hard-coded, because the codes follow the unordered result of distinct() and
# can differ from one run to the next.
# NOTE(review): row[1:] is a 1-tuple holding the pca_features vector - confirm
# that LabeledPoint receives the feature layout that is intended.
connSVN = connSample.withColumn('label', when(connSample['label'] == dictCategories["normal"], 0.0).otherwise(1.0))\
          .rdd.map(lambda row: LabeledPoint(row[0], row[1:]))
connSVN.cache()

### 5.2 Training SVM

In [None]:
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import Row

In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingDataSVM, testDataSVM) = connSVN.randomSplit([0.7, 0.3])

In [None]:
# Build the model
svm = SVMWithSGD.train(trainingDataSVM, iterations = 10)

In [None]:
# Predict on the test split and pair each prediction with its true label
labelsAndPredsSVM = testDataSVM.map(lambda p: Row(prediction = 1.0*svm.predict(p.features), label = p.label))

In [None]:
# Fraction of test rows whose prediction differs from the true label.
# The variable keeps its original name, but the value is computed on
# testDataSVM, so the printed message reports it as a test error.
trainErr = labelsAndPredsSVM.filter(lambda r: r.label != r.prediction).count() / float(testDataSVM.count())
print("Test Error = " + str(trainErr))

In [None]:
# Instantiate Basic Metrics object
basicMetricsSVM =  MulticlassMetrics(labelsAndPredsSVM)

In [None]:
print("Summary Stats")
print("Weighted Precision = %s" % basicMetricsSVM.weightedPrecision)
print("Weighted Recall = %s" % basicMetricsSVM.weightedRecall)
print("Weighted F1 Score = %s" % basicMetricsSVM.weightedFMeasure())
print("Confusion Matrix:")
pd.DataFrame(basicMetricsSVM.confusionMatrix().toArray())