## Run cells 1 and 2 only the first time you execute this notebook

In [2]:
%sh
# Fetch the gzipped KDD Cup 99 data set into /tmp and unpack it to /tmp/kddcup.data.
# Stop at the first failing command so a failed download is never followed by gunzip.
set -e
# -O always writes to the same path, so a leftover archive from an interrupted run
# is overwritten instead of wget saving the new download as kddcup.data.gz.1.
wget -O /tmp/kddcup.data.gz "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"
# -f lets a re-run overwrite an already-extracted /tmp/kddcup.data instead of failing.
gunzip -f /tmp/kddcup.data.gz

In [3]:
# Copy the extracted file from the driver's local disk into dbfs:/datasets
# and show the directory listing to confirm it arrived.
localkddData = "file:/tmp/kddcup.data"
dbutils.fs.mkdirs("dbfs:/datasets")
dbutils.fs.cp(localkddData, "dbfs:/datasets/")
datasetListing = dbutils.fs.ls("dbfs:/datasets")
display(datasetListing)

In [4]:
import numpy
import datetime
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.ml.clustering import KMeans
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

Creating the schema for the kddcup data

In [6]:
# Schema for the data, written as (column name, Spark type class) pairs in the
# order the columns are declared. The type is instantiated per field below.
kddColumnTypes = [
    ('Duration', IntegerType),
    ('ProtocolType', StringType),
    ('Service', StringType),
    ('Flag', StringType),
    ('SrcBytes', IntegerType),
    ('DstBytes', IntegerType),
    ('Land', IntegerType),
    ('WrongFragment', IntegerType),
    ('Urgent', IntegerType),
    ('Hot', IntegerType),
    ('NumFailedLogins', IntegerType),
    ('LoggedIn', IntegerType),
    ('NumCompromised', IntegerType),
    ('RootShell', IntegerType),
    ('SuAttempted', IntegerType),
    ('NumRoot', IntegerType),
    ('NumFileCreations', IntegerType),
    ('NumShells', IntegerType),
    ('NumAccessFiles', IntegerType),
    ('NumOutboundCmds', IntegerType),
    ('IsHostLogin', IntegerType),
    ('IsGuestLogin', IntegerType),
    ('Count', IntegerType),
    ('SrvCount', IntegerType),
    ('SerrorRate', DoubleType),
    ('SrvSerrorRate', DoubleType),
    ('RerrorRate', DoubleType),
    ('SrvRerrorRate', DoubleType),
    ('SameSrvRate', DoubleType),
    ('DiffSrvRate', DoubleType),
    ('SrvDiffHostRate', DoubleType),
    ('DstHostCount', IntegerType),
    ('DstHostSrvCount', IntegerType),
    ('DstHostSameSrvRate', DoubleType),
    ('DstHostDiffSrvRate', DoubleType),
    ('DstHostSameSrcPortRate', DoubleType),
    ('DstHostSrvDiffHostRate', DoubleType),
    ('DstHostSerrorRate', DoubleType),
    ('DstHostSrvSerrorRate', DoubleType),
    ('DstHostRerrorRate', DoubleType),
    ('DstHostSrvRerrorRate', DoubleType),
    ('AttackType', StringType),
]

# Every field is declared nullable (third StructField argument is True).
kddSchema = StructType(
    [StructField(fieldName, fieldType(), True) for fieldName, fieldType in kddColumnTypes]
)

Load the kddcup data from Databricks file system into a dataframe by specifying the schema

In [8]:
# Read the headerless CSV from DBFS with the explicit schema and expose it
# as the temporary view "kddVIEW".
kddDF = spark.read.csv(path='dbfs:/datasets/kddcup.data', schema=kddSchema, header=False)
kddDF.createOrReplaceTempView("kddVIEW")

# Re-bind kddDF to the view, then split it 70/30 into train and test sets
# using a fixed seed (24).
kddDF = spark.table("kddVIEW")
trainData, testData = kddDF.randomSplit(weights=[0.7, 0.3], seed=24)

Transform the set of numerical features to a log scale

In [10]:
# Transforming the features on log scale.
# Columns listed here are replaced in place (same name, same position) by their
# log-scaled value in both the train and the test frame.
toTransformFeatures = ['SrcBytes','DstBytes','Duration','Hot','NumFailedLogins','NumCompromised','NumRoot','NumFileCreations','NumAccessFiles','Count','SrvCount','DstHostCount','DstHostSrvCount']


def _logScaleColumns(df, columns):
  """Return `df` with every column named in `columns` replaced by log(1 + value).

  log1p is used instead of "replace 0 with 1, then log": that older approach
  mapped both 0 and 1 to 0.0 and so merged two distinct values. log1p still
  sends 0 to 0.0 but keeps 1 distinct.
  """
  for name in columns:
    df = df.withColumn(name, log1p(df[name]))
  return df


# The "Replaced" names are kept because later cells reference them.
trainReplacedDF = _logScaleColumns(trainData, toTransformFeatures)
testReplacedDF = _logScaleColumns(testData, toTransformFeatures)

Index and encode categorical features

In [12]:
#Indexing and Encoding Categorical features
# Each categorical column gets a StringIndexer (<col> -> <col>_index) followed by
# a OneHotEncoder (<col>_index -> <col>_vec). All indexers are queued before any
# encoder so every *_index column exists by the time it is encoded.
pipeLineStages = []
toIndexColumns = ["ProtocolType", "Service", "Flag", "AttackType"]
# Independent copy: the two names previously aliased one list object, so mutating
# either would silently have changed the other.
toEncodeColumns = list(toIndexColumns)

for column in toIndexColumns:
  currentIndexer = StringIndexer(inputCol=column, outputCol=column+"_index")
  pipeLineStages.append(currentIndexer)

for column in toEncodeColumns:
  currentEncoder = OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec")
  pipeLineStages.append(currentEncoder)

Creating a list of all the columns that are unimportant for the predictions

In [14]:
# Raw categorical columns, their index columns, and the encoded label vector:
# none of these may be fed to the feature assembler.
indexedAndCategoricalFeatures = [
    'ProtocolType', 'Service', 'Flag', 'AttackType',
    'AttackType_vec',
    'ProtocolType_index', 'Service_index', 'Flag_index',
]

# Columns identified as highly correlated with other columns.
highlyCorrelatedFeatures = [
    'SrvRerrorRate', 'DstHostSrvRerrorRate', 'DstHostRerrorRate',
    'DstHostSameSrcPortRate', 'DstHostSameSrvRate', 'DstHostSerrorRate',
    'DstHostSrvSerrorRate', 'SrvSerrorRate', 'SameSrvRate',
]

# The prediction target (label) column.
target_column = ['AttackType_index']

# Columns reported as unimportant by a separate Python program run by team members.
unimportantFeatures = [
    'Urgent', 'NumFailedLogins', 'NumCompromised', 'SuAttempted',
    'NumAccessFiles', 'NumOutboundCmds', 'IsHostLogin',
    'SrvSerrorRate', 'SrvRerrorRate', 'SameSrvRate',
    'DstHostSameSrvRate', 'DstHostSameSrcPortRate', 'DstHostSerrorRate',
    'DstHostSrvSerrorRate', 'DstHostRerrorRate', 'DstHostSrvRerrorRate',
]

# Concatenation of the four groups, in order; names that appear in more than
# one group are kept as duplicates.
unwanted_columns = [
    name
    for group in (indexedAndCategoricalFeatures, highlyCorrelatedFeatures, target_column, unimportantFeatures)
    for name in group
]

Creating a Vector of all the features using Vector Assembler

In [16]:
#Creating an assembler of all the features
# Columns already present in the data frame that are not excluded.
numericFeatureColumns = [column for column in testReplacedDF.columns if column not in unwanted_columns]

# The one-hot vectors (<col>_vec) are created by the pipeline's OneHotEncoder
# stages, so they never appear in testReplacedDF.columns and used to be silently
# left out of the feature vector. Add them explicitly, still honouring
# unwanted_columns (which excludes the encoded label, AttackType_vec).
encodedFeatureColumns = [column + "_vec" for column in toEncodeColumns if column + "_vec" not in unwanted_columns]

feature_assembler = VectorAssembler(inputCols=numericFeatureColumns + encodedFeatureColumns, outputCol="features")

Declare the Decision Tree classifier, Random Forest classifier and KMeans model

In [18]:
#Declaring a decision tree classifier
# Label is the indexed attack type; inputs are the assembled "features" vector.
decisionTree = DecisionTreeClassifier(labelCol='AttackType_index', featuresCol='features')
#Declaring a random forest classifier
# An explicit seed is set so the forest's randomness is pinned, matching the
# seeded train/test split and the seeded KMeans below.
# NOTE(review): the value 24 simply mirrors the split seed; any fixed value works.
randomForest = RandomForestClassifier(labelCol='AttackType_index', featuresCol='features', numTrees=5, seed=24)
#Creating a kmeans model
# 30 clusters, fixed seed 2.
kMeans = KMeans().setK(30).setSeed(2)

Create pipeline stages for all the prediction models

In [20]:
# Shared preprocessing stages: whatever pipeLineStages already holds, followed
# by exactly one VectorAssembler. Any assembler left over from an earlier run of
# this cell is removed first, so re-running the cell no longer stacks duplicate
# assemblers that all write the same "features" output column.
pipeLineStages[:] = [stage for stage in pipeLineStages if not isinstance(stage, VectorAssembler)]
pipeLineStages.append(feature_assembler)

# Each model gets its own stage list: the shared stages plus that model's
# estimator as the final stage.
decisionTreePipeLineStages = pipeLineStages + [decisionTree]
randomForestPipeLineStages = pipeLineStages + [randomForest]
kMeansPipeLineStages = pipeLineStages + [kMeans]

decisionTreePipeLine = Pipeline(stages = decisionTreePipeLineStages)
randomForestPipeLine = Pipeline(stages = randomForestPipeLineStages)
kMeansPipeLine = Pipeline(stages = kMeansPipeLineStages)

Create an evaluator with accuracy as the metric

In [22]:
# Multiclass evaluator that compares the "prediction" column against the
# "AttackType_index" label column and reports accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="AttackType_index",
    predictionCol="prediction",
)

Run the Decision Tree model

In [24]:

#Creating a model by fitting the traindata with the pipeline
decisionTreeModel = decisionTreePipeLine.fit(trainReplacedDF)
#Making predictions by transforming the testdata
decisionTreePredictions = decisionTreeModel.transform(testReplacedDF)

# Accuracy on the test split; error is reported as its complement.
decisionTreeAccuracy = evaluator.evaluate(decisionTreePredictions)
# print() with a pre-formatted string runs on Python 3 (the print statement is a
# SyntaxError there) and produces the same "label value" line on Python 2.
print("Decision Tree Accuracy %s" % decisionTreeAccuracy)
print("Decision Tree Error %s" % (1.0 - decisionTreeAccuracy))

Run the Random Forest model

In [26]:
#Creating a model by fitting the traindata with the pipeline
randomForestModel = randomForestPipeLine.fit(trainReplacedDF)
#Making predictions by transforming the testdata
randomForestTreePredictions = randomForestModel.transform(testReplacedDF)

# Accuracy on the test split; error is reported as its complement.
randomForestAccuracy = evaluator.evaluate(randomForestTreePredictions)
# print() with a pre-formatted string runs on Python 3 (the print statement is a
# SyntaxError there) and produces the same "label value" line on Python 2.
print("Random Forest Accuracy %s" % randomForestAccuracy)
print("Random Forest Error %s" % (1.0 - randomForestAccuracy))

Run the kMeans model

In [28]:
# Fit the KMeans pipeline on the training frame.
kMeansModel = kMeansPipeLine.fit(dataset=trainReplacedDF)
# Run the fitted pipeline over the test frame.
kMeansPredictions = kMeansModel.transform(dataset=testReplacedDF)

# Show the first row of the result as a quick sanity check.
kMeansPredictions.head()