In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Create (or reuse) the Spark session for this notebook, named "KDD".
session_builder = SparkSession.builder.appName("KDD")
spark = session_builder.getOrCreate()

# Use 2 partitions for Spark SQL shuffles in this session.
spark.conf.set("spark.sql.shuffle.partitions", 2)

In [0]:
import urllib.request
# Download the 10% KDD Cup 99 sample to a local /tmp file, move it into DBFS,
# then list the target DBFS directory.
kdd_source_url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
local_download_path = "/tmp/kddcup_data.gz"
urllib.request.urlretrieve(kdd_source_url, local_download_path)
dbutils.fs.mv("file:" + local_download_path, "dbfs:/kdd/kddcup_data.gz")
kdd_dir_listing = dbutils.fs.ls("dbfs:/kdd")
display(kdd_dir_listing)

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
# Read the file from DBFS as an RDD of text lines and peek at the first ten.
kdd_path = "dbfs:/kdd/kddcup_data.gz"
data = spark.sparkContext.textFile(kdd_path)
data.take(10)

In [0]:
def _split_fields(line):
    """Split one comma-separated text line into a list of string fields."""
    return line.split(",")


data2 = data.map(_split_fields)

# Report how many fields the first record has, then show that record.
first_record = data2.take(1)
print("number of features-")
print(len(first_record[0]))
first_record

In [0]:
# Keep the seven fields used downstream, in this order:
# duration, protocol_type, service, src_bytes, dst_bytes, flag, label.
# Note that flag (field 3) is placed after the two byte counters (fields 4, 5).
#
# duration is cast to int just like src_bytes and dst_bytes. Left as the raw
# string it would produce a string column, and VectorAssembler only accepts
# numeric, boolean and vector inputs, so assembling "duration" as a numeric
# feature would fail.
data3 = data2.map(
    lambda x: [int(x[0]), x[1], x[2], int(x[4]), int(x[5]), x[3], x[41]]
)
data3.take(10)


In [0]:
# Column names for the seven selected fields, in the same order as the values
# in each data3 row; the DataFrame is built from that RDD with these names.
columns = [
    "duration",
    "protocol_type",
    "service",
    "src_bytes",
    "dst_bytes",
    "flag",
    "label",
]
kdd = spark.createDataFrame(data3, schema=columns)
kdd.printSchema()

In [0]:
# Preview ten rows of the DataFrame.
kdd_preview = kdd.limit(10)
display(kdd_preview)

duration,protocol_type,service,src_bytes,dst_bytes,flag,label
0,tcp,http,181,5450,SF,normal.
0,tcp,http,239,486,SF,normal.
0,tcp,http,235,1337,SF,normal.
0,tcp,http,219,1337,SF,normal.
0,tcp,http,217,2032,SF,normal.
0,tcp,http,217,2032,SF,normal.
0,tcp,http,212,1940,SF,normal.
0,tcp,http,159,4087,SF,normal.
0,tcp,http,210,151,SF,normal.
0,tcp,http,212,786,SF,normal.


In [0]:
# Count the null values in every column; the result has one row, with one
# output column per input column.
null_counts = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in kdd.columns
]
kdd.select(null_counts).show()

In [0]:
# Number of distinct protocol_type values.
protocol_type_values = kdd.select("protocol_type").distinct()
protocol_type_values.count()

In [0]:
# Row count per protocol_type, smallest group first.
protocol_type_counts = kdd.groupBy("protocol_type").count()
display(protocol_type_counts.orderBy(F.col("count").asc()))

protocol_type,count
udp,20354
tcp,190065
icmp,283602


In [0]:
# Number of distinct service values.
service_values = kdd.select("service").distinct()
service_values.count()

In [0]:
# Row count per service, rarest service first.
service_counts = kdd.groupBy("service").count()
display(service_counts.orderBy(F.col("count").asc()))

service,count
pm_dump,1
red_i,1
tftp_u,1
tim_i,7
X11,11
urh_i,14
IRC,43
Z39_50,92
netstat,95
ctf,97


In [0]:
# Number of distinct duration values.
duration_values = kdd.select("duration").distinct()
duration_values.count()

In [0]:
# Mean of the duration column.
duration_avg = kdd.select("duration").agg({"duration": "avg"})
display(duration_avg)

avg(duration)
47.97930249928647


In [0]:
# Row count per duration value, most frequent first.
duration_counts = kdd.groupBy("duration").count()
display(duration_counts.orderBy(F.desc("count")))

duration,count
0,481671
1,2476
2,870
3,625
5,554
2630,496
4,413
14,322
10,194
7,169


In [0]:
# Number of distinct src_bytes values.
src_bytes_values = kdd.select("src_bytes").distinct()
src_bytes_values.count()

In [0]:
# Mean of the src_bytes column.
src_bytes_avg = kdd.select("src_bytes").agg({"src_bytes": "avg"})
display(src_bytes_avg)

avg(src_bytes)
3025.6102959185946


In [0]:
# Row count per src_bytes value, most frequent first.
src_bytes_counts = kdd.groupBy("src_bytes").count()
display(src_bytes_counts.orderBy(F.desc("count")))

src_bytes,count
1032,228035
0,115342
520,52774
105,7370
147,2725
54540,2143
146,2033
42,1069
8,1045
28,984


In [0]:
# Number of distinct dst_bytes values.
dst_bytes_values = kdd.select("dst_bytes").distinct()
dst_bytes_values.count()

In [0]:
# Mean of the dst_bytes column.
dst_bytes_avg = kdd.select("dst_bytes").agg({"dst_bytes": "avg"})
display(dst_bytes_avg)

avg(dst_bytes)
868.5324247349809


In [0]:
# Row count per dst_bytes value, most frequent first.
dst_bytes_counts = kdd.groupBy("dst_bytes").count()
display(dst_bytes_counts.orderBy(F.desc("count")))

dst_bytes,count
0,408258
105,4451
147,2501
146,2289
8314,2133
145,985
42,921
330,854
329,804
331,793


In [0]:
# Number of distinct flag values.
flag_values = kdd.select("flag").distinct()
flag_values.count()

In [0]:
# Row count per flag value, most frequent first.
flag_counts = kdd.groupBy("flag").count()
display(flag_counts.orderBy(F.desc("count")))

flag,count
SF,378440
S0,87007
REJ,26875
RSTR,903
RSTO,579
SH,107
S1,57
S2,24
RSTOS0,11
S3,10


In [0]:
# Number of distinct label values.
label_values = kdd.select("label").distinct()
label_values.count()

In [0]:
# Row count per original label, most frequent first.
label_counts = kdd.groupBy("label").count()
display(label_counts.orderBy(F.desc("count")))

label,count
smurf.,280790
neptune.,107201
normal.,97278
back.,2203
satan.,1589
ipsweep.,1247
portsweep.,1040
warezclient.,1020
teardrop.,979
pod.,264


In [0]:
from pyspark.sql.functions import when
# Collapse the label to two classes: "normal." becomes "normal" and every
# other label becomes "attack".
is_normal = F.col("label") == "normal."
binary_label = F.when(is_normal, "normal").otherwise("attack")
kdd_final = kdd.withColumn("label", binary_label)

In [0]:
# Row count per binary label, larger class first.
binary_label_counts = kdd_final.groupBy("label").count()
display(binary_label_counts.orderBy(F.desc("count")))

label,count
attack,396743
normal,97278


In [0]:
cols = kdd_final.columns
categoricalColumns = ["protocol_type", "service", "flag"]

# Pipeline stages, in execution order. Each categorical column contributes
# two stages:
#   1. a StringIndexer writing "<col>Index"
#   2. a OneHotEncoder turning "<col>Index" into "<col>classVec"
# Nothing is fitted here; the stages are only collected for a Pipeline.
stages = []
for name in categoricalColumns:
    index_col = name + "Index"
    indexer = StringIndexer(inputCol=name, outputCol=index_col)
    one_hot = OneHotEncoder(inputCols=[index_col], outputCols=[name + "classVec"])
    stages.extend([indexer, one_hot])


In [0]:
# Index the string "label" column into a numeric "labels" column and add the
# indexer to the end of the stage list.
label_stringIdx = StringIndexer(inputCol="label", outputCol="labels")
stages.append(label_stringIdx)

In [0]:
numericCols = ["duration", "src_bytes", "dst_bytes"]

# The "features" vector is the one-hot vectors of the categorical columns
# followed by the numeric columns, in that order.
assemblerInputs = [name + "classVec" for name in categoricalColumns]
assemblerInputs.extend(numericCols)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)


In [0]:
from pyspark.ml.classification import LogisticRegression
  
# Fit every collected stage on kdd_final, then apply the fitted pipeline to
# that same frame to add the index, vector, labels and features columns.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(kdd_final)
preppedDataDF = pipelineModel.transform(kdd_final)

In [0]:
# Show the transformed frame: the original columns plus the per-column
# "...Index" / "...classVec" outputs, the numeric "labels" and the assembled
# "features" vector.
display(preppedDataDF)

duration,protocol_type,service,src_bytes,dst_bytes,flag,label,protocol_typeIndex,protocol_typeclassVec,serviceIndex,serviceclassVec,flagIndex,flagclassVec,labels,features
0,tcp,http,181,5450,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 181.0, 5450.0))"
0,tcp,http,239,486,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 239.0, 486.0))"
0,tcp,http,235,1337,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 235.0, 1337.0))"
0,tcp,http,219,1337,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 219.0, 1337.0))"
0,tcp,http,217,2032,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 217.0, 2032.0))"
0,tcp,http,217,2032,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 217.0, 2032.0))"
0,tcp,http,212,1940,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 212.0, 1940.0))"
0,tcp,http,159,4087,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 159.0, 4087.0))"
0,tcp,http,210,151,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 210.0, 151.0))"
0,tcp,http,212,786,SF,normal,1.0,"List(0, 2, List(1), List(1.0))",2.0,"List(0, 65, List(2), List(1.0))",0.0,"List(0, 10, List(0), List(1.0))",1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 212.0, 786.0))"


In [0]:
# Print the column names and types of the transformed frame.
preppedDataDF.printSchema()

In [0]:
# Put the model inputs ("labels", "features") first, followed by the original
# kdd_final columns; the intermediate index/vector columns are dropped.
selectedcols = ["labels", "features", *cols]
dataset = preppedDataDF.select(*selectedcols)
display(dataset)

labels,features,duration,protocol_type,service,src_bytes,dst_bytes,flag,label
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 181.0, 5450.0))",0,tcp,http,181,5450,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 239.0, 486.0))",0,tcp,http,239,486,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 235.0, 1337.0))",0,tcp,http,235,1337,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 219.0, 1337.0))",0,tcp,http,219,1337,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 217.0, 2032.0))",0,tcp,http,217,2032,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 217.0, 2032.0))",0,tcp,http,217,2032,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 212.0, 1940.0))",0,tcp,http,212,1940,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 159.0, 4087.0))",0,tcp,http,159,4087,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 210.0, 151.0))",0,tcp,http,210,151,SF,normal
1.0,"List(0, 80, List(1, 4, 67, 78, 79), List(1.0, 1.0, 1.0, 212.0, 786.0))",0,tcp,http,212,786,SF,normal


In [0]:
# 80/20 train/test split with a fixed seed, then print both row counts
# (training first, test second).
split_weights = [0.8, 0.2]
trainingData, testData = dataset.randomSplit(split_weights, seed=100)
for part in (trainingData, testData):
    print(part.count())

In [0]:
# Logistic regression reading "features" and "labels", capped at 10
# iterations, fitted on the training split.
lr = LogisticRegression(maxIter=10)
lr.setFeaturesCol("features")
lr.setLabelCol("labels")
lrModel = lr.fit(trainingData)

In [0]:
# Score the held-out test split with the fitted model; the result keeps the
# input columns and adds rawPrediction, probability and prediction.
predictions = lrModel.transform(testData)

In [0]:
# String label next to the predicted class index. This frame is assigned but
# not displayed by this cell.
selected = predictions.select("label", "prediction")
# Show ten scored rows with every column.
display(predictions.limit(10))

labels,features,duration,protocol_type,service,src_bytes,dst_bytes,flag,label,rawPrediction,probability,prediction
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 18.0))",0,icmp,ecr_i,18,0,SF,attack,"List(1, 2, List(), List(8.55925032348192, -8.55925032348192))","List(1, 2, List(), List(0.9998082737660126, 1.9172623398741064E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0
0.0,"List(0, 80, List(0, 2, 67, 78), List(1.0, 1.0, 1.0, 520.0))",0,icmp,ecr_i,520,0,SF,attack,"List(1, 2, List(), List(8.55933323234503, -8.55933323234503))","List(1, 2, List(), List(0.9998082896581104, 1.9171034188951943E-4))",0.0


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluator for area under the ROC curve, computed from the "rawPrediction"
# column against the numeric "labels" column.
evaluator = BinaryClassificationEvaluator(
    labelCol="labels",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

In [0]:
# Area under ROC on the test-split predictions.
test_auc = evaluator.evaluate(predictions)
test_auc