In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName('Intrusion Detection').getOrCreate()

# Limit the number of partitions Spark SQL uses for shuffles.
# The key was previously misspelled as 'sparl.sql.shuffle.partitions', so the
# shuffle-partition setting never took effect.
spark.conf.set('spark.sql.shuffle.partitions', 2)

In [0]:
import urllib.request

# Download the gzipped 10-percent KDD Cup 99 file to local /tmp ...
kdd_source_url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
kdd_local_path = "/tmp/kddcup_data.gz"
urllib.request.urlretrieve(kdd_source_url, kdd_local_path)

# ... then move it from the local filesystem into DBFS
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")

# List the target directory to confirm the file arrived
kdd_dir_listing = dbutils.fs.ls("dbfs:/kdd")
display(kdd_dir_listing)

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
# Read the gzipped file from DBFS as an RDD of text lines
kdd_dbfs_path = 'dbfs:/kdd/kddcup_data.gz'
rdd = sc.textFile(kdd_dbfs_path)

In [0]:
# Split every comma-separated line into its fields and turn the result into a
# DataFrame. No schema is supplied, so the columns get positional names
# (_1, _2, ...), which the later cells rely on.
split_rows = rdd.map(lambda record: record.split(','))
df = split_rows.toDF()

# Preview the first 10 rows
df.show(n=10)

In [0]:
# Names of the categorical columns, given by their 1-based position in the file.
# NOTE(review): these positions presumably correspond to protocol_type, service,
# flag, land, logged_in, is_host_login and is_guest_login in the KDD Cup 99
# schema — confirm against kddcup.names.
categorical_positions = (2, 3, 4, 7, 12, 21, 22)
cat_cols = ['_{}'.format(position) for position in categorical_positions]

In [0]:
# Numerical columns: everything except the last column (the label) and the
# categorical columns listed in cat_cols. Column order is preserved.
candidate_cols = df.columns[:-1]
num_cols = [name for name in candidate_cols if name not in cat_cols]

In [0]:
# Binarize the label held in the last column (_42): 'normal.' becomes 0 and
# every other (attack) label becomes 1. The column is then renamed to 'class'.
is_normal = df['_42'] == 'normal.'
df = (
    df
    .withColumn('_42', when(is_normal, 0).otherwise(1))
    .withColumnRenamed('_42', 'class')
)

In [0]:
# The split() above produced string fields; cast each numerical column to float
float_type = FloatType()
for num_col in num_cols:
  df = df.withColumn(num_col, df[num_col].cast(float_type))

In [0]:
from pyspark.ml.feature import StringIndexer

# Index every categorical column: fit a StringIndexer on the column and append
# the result as '<column>_encoded'.
for cat_col in cat_cols:
  encoded_col = cat_col + '_encoded'
  string_indexer = StringIndexer(inputCol=cat_col, outputCol=encoded_col)
  indexer_model = string_indexer.fit(df)
  df = indexer_model.transform(df)

# The raw string columns are no longer needed once their encoded versions exist
df = df.drop(*cat_cols)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack all numerical columns into one vector column ('num_features') so that
# they can be handed to the scaler as a single input.
num_vector_col = 'num_features'
assembler = VectorAssembler(inputCols=num_cols, outputCol=num_vector_col)
df = assembler.transform(df)

In [0]:
from pyspark.ml.feature import StandardScaler

# Standardize the numerical feature vector; the result is written to
# 'scaled_num_features' and the unscaled 'num_features' column is kept.
# NOTE(review): the scaler is fitted on the full DataFrame, i.e. before the
# train/test split that happens in a later cell.
standardScaler = StandardScaler(inputCol='num_features', outputCol='scaled_num_features')
scaler_model = standardScaler.fit(df)
df = scaler_model.transform(df)

In [0]:
# Assemble the encoded categorical columns and the standardized numerical
# vector into a single feature vector ('all_features').
# From here on cat_cols holds the names of the encoded columns.
cat_cols = [x + '_encoded' for x in cat_cols]
feat_cols = cat_cols.copy()
# Use the standardized vector produced by the StandardScaler. The unscaled
# 'num_features' column was appended here before, which left
# 'scaled_num_features' unused and the scaling step without any effect.
feat_cols.append('scaled_num_features')

assembler = VectorAssembler(inputCols=feat_cols, outputCol='all_features')
df = assembler.transform(df)

In [0]:
# Preview the first 10 rows of the preprocessed data
df.show(n=10)

In [0]:
# Randomly split the data 80/20 into training and test sets; the fixed seed
# keeps the split reproducible.
split_weights = [0.8, 0.2]
train, test = df.randomSplit(split_weights, seed=12345)

In [0]:
# Measure how balanced the two classes are in the training set
labels = train.select('class')
dataset_size = float(labels.count())
num_positives = labels.where('class == 1').count()

# Share of class-1 rows in percent, and the number of class-0 rows
# (dataset_size is a float, so both results are floats)
per_ones = (num_positives / dataset_size) * 100
num_negatives = dataset_size - num_positives

print('The number of ones are {}'.format(num_positives))
print('Percentage of ones are {}'.format(per_ones))

In [0]:
# Counter the class imbalance with per-row weights: class-1 rows are weighted
# by the share of class-0 rows and vice versa, so the rarer class gets the
# larger weight.
balancing_ratio = num_negatives/dataset_size
print('Balancing ratio = {}'.format(balancing_ratio))

positive_weight = balancing_ratio
negative_weight = 1-balancing_ratio
is_positive = train['class'] == 1
train = train.withColumn('class_weights', when(is_positive, positive_weight).otherwise(negative_weight))

# Preview the weights of the first 5 rows
train.select('class_weights').show(5)

In [0]:
from pyspark.ml.feature import ChiSqSelector

# Feature selection using chi-square: keep the 17 top-ranked features of
# 'all_features' with respect to 'class' and write them to 'aspect'.
# NOTE(review): fpr is presumably only honored when selectorType='fpr'; with
# the default selector type numTopFeatures decides the selection — confirm
# against the ChiSqSelector documentation.
selector = ChiSqSelector(featuresCol='all_features', outputCol='aspect', labelCol='class', fpr=0.05, numTopFeatures=17)

# Fit the selector on the training data only and reuse the fitted model for
# both splits. Re-fitting on the test set (as was done before) lets test labels
# influence the selection and may pick different feature indices than the ones
# the classifier is trained on.
selector_model = selector.fit(train)
train = selector_model.transform(train)
test = selector_model.transform(test)
test.select('aspect').show(5, truncate=False)

In [0]:
from pyspark.ml.classification import LinearSVC

# Train a linear support vector classifier on the selected features, using the
# per-row class weights, then score both the training and the test set.
svc = LinearSVC(
  featuresCol='aspect',
  labelCol='class',
  weightCol='class_weights',
  maxIter=10,
)
model = svc.fit(train)
predict_train, predict_test = model.transform(train), model.transform(test)

# Compare true and predicted labels for the first 5 test rows
predict_test.select('class', 'prediction').show(5)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the model with a binary-classification evaluator that reads the raw
# SVM scores ('rawPrediction') and the true labels ('class').
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='class')

# Preview labels, raw scores and predicted labels for the first 5 test rows
predict_test.select('class', 'rawPrediction', 'prediction').show(5)

# Evaluate both splits, then report the results
train_auc = evaluator.evaluate(predict_train)
test_auc = evaluator.evaluate(predict_test)
print('The area under ROC for train set is {}'.format(train_auc))
print('The area under ROC for test set is {}'.format(test_auc))