In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Intrusion Detection').getOrCreate()

spark.conf.set('sparl.sql.shuffle.partitions', 2)

In [0]:
import urllib.request

# Retrieve our data
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
# Store our data into an RDD
rdd = sc.textFile('dbfs:/kdd/kddcup_data.gz')

In [0]:
# Convert the RDD into a DataFrame
df = rdd.map(lambda line: line.split(',')).toDF()
df.display()

_1,_2,_3,_4,_5,_6,_7,_8,_9,_10,_11,_12,_13,_14,_15,_16,_17,_18,_19,_20,_21,_22,_23,_24,_25,_26,_27,_28,_29,_30,_31,_32,_33,_34,_35,_36,_37,_38,_39,_40,_41,_42
0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,9,9,1.0,0.0,0.11,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,19,19,1.0,0.0,0.05,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,29,29,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.0,0.0,0.0,0.0,1.0,0.0,0.0,39,39,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.0,0.0,0.0,0.0,1.0,0.0,0.0,49,49,1.0,0.0,0.02,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.0,0.0,0.0,0.0,1.0,0.0,0.0,59,59,1.0,0.0,0.02,0.0,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,212,1940,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,2,0.0,0.0,0.0,0.0,1.0,0.0,1.0,1,69,1.0,0.0,1.0,0.04,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,159,4087,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,5,5,0.0,0.0,0.0,0.0,1.0,0.0,0.0,11,79,1.0,0.0,0.09,0.04,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,210,151,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,8,89,1.0,0.0,0.12,0.04,0.0,0.0,0.0,0.0,normal.
0,tcp,http,SF,212,786,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,8,99,1.0,0.0,0.12,0.05,0.0,0.0,0.0,0.0,normal.


In [0]:
# Extract titles of categorical columns
cat_cols = ['_2', '_3', '_4', '_7', '_12', '_21', '_22']

In [0]:
# Extract titles of numerical columns
num_cols = [x for x in df.columns[:-1] if x not in cat_cols]

In [0]:
# Encode the 'normal' class to 0 and all other 'attack' classes as 1
df = df.withColumn('_42', when(df._42 == 'normal.',0).otherwise(1))
df = df.withColumnRenamed('_42', 'class')

In [0]:
# Convert all variable types in numerical columns from string to floats
for x in num_cols:
  df = df.withColumn(x, df[x].cast(FloatType()))

In [0]:
from pyspark.ml.feature import StringIndexer

# Encode the categorical columns
for cat_col in cat_cols:
  string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col+'_encoded')
  df = string_indexer.fit(df).transform(df)

# Drop the original categorical columns
df = df.drop(*cat_cols)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Assemble the features into a single vector
assembler = VectorAssembler(inputCols=num_cols,outputCol='num_features')
df = assembler.transform(df)

In [0]:
from pyspark.ml.feature import StandardScaler

# Standardize the numerical features
standardScaler = StandardScaler(inputCol='num_features', outputCol='scaled_num_features')
df = standardScaler.fit(df).transform(df)

In [0]:
# Assemble numerical features and categorical features together into a single vector
cat_cols = [x + '_encoded' for x in cat_cols]
feat_cols = cat_cols.copy()
feat_cols.append('num_features')

assembler = VectorAssembler(inputCols=feat_cols,outputCol='all_features')
df = assembler.transform(df)

In [0]:
df.display()

_1,_5,_6,_8,_9,_10,_11,_13,_14,_15,_16,_17,_18,_19,_20,_23,_24,_25,_26,_27,_28,_29,_30,_31,_32,_33,_34,_35,_36,_37,_38,_39,_40,_41,class,_2_encoded,_3_encoded,_4_encoded,_7_encoded,_12_encoded,_21_encoded,_22_encoded,num_features,scaled_num_features,all_features
0.0,181.0,5450.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,9.0,9.0,1.0,0.0,0.11,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(181.0, 5450.0, 8.0, 8.0, 1.0, 9.0, 9.0, 1.0, 0.10999999940395355))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(1.8315795475795847E-4, 0.16495156759878016, 0.03753270996838467, 0.032477705818327186, 2.576061479419014, 0.1390060564670239, 0.08487328273976809, 2.434387312415174, 0.22854328921547365))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 181.0, 5450.0, 8.0, 8.0, 1.0, 9.0, 9.0, 1.0, 0.10999999940395355))"
0.0,239.0,486.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,19.0,19.0,1.0,0.0,0.05,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(239.0, 486.0, 8.0, 8.0, 1.0, 19.0, 19.0, 1.0, 0.05000000074505806))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(2.418494540726634E-4, 0.014709442541836176, 0.03753270996838467, 0.032477705818327186, 2.576061479419014, 0.29345723031927273, 0.1791769302283993, 2.434387312415174, 0.1038833153906453))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 239.0, 486.0, 8.0, 8.0, 1.0, 19.0, 19.0, 1.0, 0.05000000074505806))"
0.0,235.0,1337.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,29.0,29.0,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(235.0, 1337.0, 8.0, 8.0, 1.0, 29.0, 29.0, 1.0, 0.029999999329447746))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(2.378017644647527E-4, 0.040466100161388824, 0.03753270996838467, 0.032477705818327186, 2.576061479419014, 0.4479084041715215, 0.2734805777170305, 2.434387312415174, 0.06232998691241417))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 235.0, 1337.0, 8.0, 8.0, 1.0, 29.0, 29.0, 1.0, 0.029999999329447746))"
0.0,219.0,1337.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,6.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,39.0,39.0,1.0,0.0,0.03,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(219.0, 1337.0, 6.0, 6.0, 1.0, 39.0, 39.0, 1.0, 0.029999999329447746))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(2.2161100603310997E-4, 0.040466100161388824, 0.0281495324762885, 0.02435827936374539, 2.576061479419014, 0.6023595780237703, 0.36778422520566173, 2.434387312415174, 0.06232998691241417))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 219.0, 1337.0, 6.0, 6.0, 1.0, 39.0, 39.0, 1.0, 0.029999999329447746))"
0.0,217.0,2032.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,6.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,49.0,49.0,1.0,0.0,0.02,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(217.0, 2032.0, 6.0, 6.0, 1.0, 49.0, 49.0, 1.0, 0.019999999552965164))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(2.1958716122915463E-4, 0.06150120832306813, 0.0281495324762885, 0.02435827936374539, 2.576061479419014, 0.7568107518760191, 0.4620878726942929, 2.434387312415174, 0.04155332460827611))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 217.0, 2032.0, 6.0, 6.0, 1.0, 49.0, 49.0, 1.0, 0.019999999552965164))"
0.0,217.0,2032.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,6.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,59.0,59.0,1.0,0.0,0.02,0.0,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(217.0, 2032.0, 6.0, 6.0, 1.0, 59.0, 59.0, 1.0, 0.019999999552965164))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28), values -> List(2.1958716122915463E-4, 0.06150120832306813, 0.0281495324762885, 0.02435827936374539, 2.576061479419014, 0.9112619257282679, 0.5563915201829241, 2.434387312415174, 0.04155332460827611))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35), values -> List(1.0, 2.0, 1.0, 217.0, 2032.0, 6.0, 6.0, 1.0, 59.0, 59.0, 1.0, 0.019999999552965164))"
0.0,212.0,1940.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,1.0,69.0,1.0,0.0,1.0,0.04,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 23, 24, 25, 26, 28, 29), values -> List(212.0, 1940.0, 1.0, 2.0, 1.0, 1.0, 1.0, 69.0, 1.0, 1.0, 0.03999999910593033))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 23, 24, 25, 26, 28, 29), values -> List(2.1452754921926629E-4, 0.05871670479663, 0.0046915887460480836, 0.008119426454581797, 2.576061479419014, 7.022596807228778, 0.01544511738522488, 0.6506951676715553, 2.434387312415174, 2.077666276853266, 0.9493774082191798))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 30, 31, 32, 33, 35, 36), values -> List(1.0, 2.0, 1.0, 212.0, 1940.0, 1.0, 2.0, 1.0, 1.0, 1.0, 69.0, 1.0, 1.0, 0.03999999910593033))"
0.0,159.0,4087.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.0,5.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,11.0,79.0,1.0,0.0,0.09,0.04,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(159.0, 4087.0, 5.0, 5.0, 1.0, 11.0, 79.0, 1.0, 0.09000000357627869, 0.03999999910593033))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(1.608956619144497E-4, 0.12369854252774579, 0.02345794373024042, 0.02029856613645449, 2.576061479419014, 0.16989629123747368, 0.7449988151601865, 2.434387312415174, 0.18698997234710754, 0.9493774082191798))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35, 36), values -> List(1.0, 2.0, 1.0, 159.0, 4087.0, 5.0, 5.0, 1.0, 11.0, 79.0, 1.0, 0.09000000357627869, 0.03999999910593033))"
0.0,210.0,151.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,8.0,89.0,1.0,0.0,0.12,0.04,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(210.0, 151.0, 8.0, 8.0, 1.0, 8.0, 89.0, 1.0, 0.11999999731779099, 0.03999999910593033))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(2.1250370441531092E-4, 0.004570217744479964, 0.03753270996838467, 0.032477705818327186, 2.576061479419014, 0.12356093908179903, 0.8393024626488178, 2.434387312415174, 0.24931994764965668, 0.9493774082191798))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 22, 23, 28, 31, 32, 33, 35, 36), values -> List(1.0, 2.0, 1.0, 210.0, 151.0, 8.0, 8.0, 1.0, 8.0, 89.0, 1.0, 0.11999999731779099, 0.03999999910593033))"
0.0,212.0,786.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.0,8.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,8.0,99.0,1.0,0.0,0.12,0.05,0.0,0.0,0.0,0.0,0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,"Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 5, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(212.0, 786.0, 1.0, 8.0, 8.0, 1.0, 8.0, 99.0, 1.0, 0.11999999731779099, 0.05000000074505806))","Map(vectorType -> sparse, length -> 34, indices -> List(1, 2, 5, 15, 16, 21, 24, 25, 26, 28, 29), values -> List(2.1452754921926629E-4, 0.023789345345438754, 1.278604652529553, 0.03753270996838467, 0.032477705818327186, 2.576061479419014, 0.12356093908179903, 0.933606110137449, 2.434387312415174, 0.24931994764965668, 1.1867218044828063))","Map(vectorType -> sparse, length -> 41, indices -> List(0, 1, 4, 8, 9, 12, 22, 23, 28, 31, 32, 33, 35, 36), values -> List(1.0, 2.0, 1.0, 212.0, 786.0, 1.0, 8.0, 8.0, 1.0, 8.0, 99.0, 1.0, 0.11999999731779099, 0.05000000074505806))"


In [0]:
# Split our data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

In [0]:
# Check if the classes are balanced or not
dataset_size = float(train.select('class').count())
num_positives = train.select('class').where('class == 1').count()
per_ones = (float(num_positives)/float(dataset_size))*100
num_negatives = float(dataset_size-num_positives)
print('The number of ones are {}'.format(num_positives))
print('Percentage of ones are {}'.format(per_ones))

In [0]:
# Handle the imbalance by giving the minority classes a higher weight, and otherwise for the majority class
balancing_ratio = num_negatives/dataset_size
print('Balancing ratio = {}'.format(balancing_ratio))

train = train.withColumn('class_weights', when(train['class'] == 1,balancing_ratio).otherwise(1-balancing_ratio))
train.select('class_weights').show(5)

In [0]:
from pyspark.ml.feature import ChiSqSelector

# Feature selection using chi-square
selector = ChiSqSelector(featuresCol='all_features',outputCol='aspect',labelCol='class',fpr=0.05, numTopFeatures=17)
train = selector.fit(train).transform(train)
test = selector.fit(test).transform(test)
test.select('aspect').show(5,truncate=False)

In [0]:
from pyspark.ml.classification import LinearSVC

# Predict the class labels using Support Vector Machine
svc = LinearSVC(labelCol='class',featuresCol='aspect',weightCol='class_weights',maxIter=10)
model = svc.fit(train)
predict_train = model.transform(train)
predict_test = model.transform(test)
predict_test.select('class','prediction').show(10)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the performance of our model using ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='class')
predict_test.select('class','rawPrediction','prediction').show(5)
print('The area under ROC for train set is {}'.format(evaluator.evaluate(predict_train)))
print('The area under ROC for test set is {}'.format(evaluator.evaluate(predict_test)))