In [0]:
import urllib.request
# Download the KDD Cup '99 "10 percent" archive to local /tmp, then move it into DBFS.
KDD_URL = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
LOCAL_ARCHIVE = "/tmp/kddcup_data.gz"
urllib.request.urlretrieve(KDD_URL, LOCAL_ARCHIVE)
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
# List the target directory to confirm the archive is there.
display(dbutils.fs.ls("dbfs:/kdd"))

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
#Part A - 3
# Read the gzipped archive from DBFS as an RDD of text lines and preview the first ten.
kdd_path = 'dbfs:/kdd/kddcup_data.gz'
rdd1 = sc.textFile(kdd_path)
rdd1.take(10)

In [0]:
# Show the Python class of rdd1, the object returned by sc.textFile.
type(rdd1)

In [0]:
#Part A - 4
# Turn each comma-separated line into a list of string fields.
rdd2 = rdd1.map(lambda line: line.split(","))
# The length of the first parsed record is the total number of features (columns).
first_record = rdd2.take(1)[0]
print(len(first_record))

In [0]:
#Part A - 5
#keep the first six fields (duration, protocol_type, service, flag, src_bytes, dst_bytes) plus the trailing label
def a5(x):
  """Return a new list with the first six items of ``x`` followed by its last item.

  ``x`` itself is not modified.
  """
  return x[:6] + [x[-1]]
  
# Project every parsed record onto the seven kept fields, then cast each one:
# positions 0, 4 and 5 become ints; positions 1, 2, 3 and 6 stay strings.
rdd3 = rdd2.map(a5)\
           .map(lambda x: [int(x[0]),str(x[1]),str(x[2]),str(x[3]),int(x[4]),int(x[5]),str(x[6])])
# Preview a handful of rows only: collect() would ship every record to the driver
# and dump the whole dataset into the cell output.
rdd3.take(10)

In [0]:
#create the DataFrame and display its first 10 rows
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, DoubleType
# (column name, Spark type) pairs, in the same order as the fields of each rdd3 record.
column_types = [
    ('duration', IntegerType),
    ('protocol_type', StringType),
    ('service', StringType),
    ('flag', StringType),
    ('src_bytes', IntegerType),
    ('dst_bytes', IntegerType),
    ('label', StringType),
]
# Every column is declared nullable (third StructField argument is True).
deptSchema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in column_types
])

df = spark.createDataFrame(rdd3, schema=deptSchema)
df.printSchema()
df.show(10, truncate=False)

In [0]:
#Part A - 6
# Number of records per protocol type, sorted by that count.
df1 = (
    df.groupBy('protocol_type')
      .count()
      .sort('count')
)
df1.show()

In [0]:
# Render the per-protocol record counts held in df1.
display(df1)

protocol_type,count
udp,20354
tcp,190065
icmp,283602


In [0]:
# Number of records per service, sorted by that count; print up to 100 rows.
service_counts = df.groupBy('service').count()
df2 = service_counts.sort('count')
df2.show(100)

In [0]:
# Disabled alternative kept for reference: df2.toPandas().plot(kind='bar')
# Render the per-service record counts held in df2.
display(df2)

service,count
tftp_u,1
red_i,1
pm_dump,1
tim_i,7
X11,11
urh_i,14
IRC,43
Z39_50,92
netstat,95
ctf,97


In [0]:
#Part A - 7
#a. flag
# Number of records per flag value, sorted by that count; print up to 100 rows.
df_flag = (
    df.groupBy('flag')
      .count()
      .sort('count')
)
df_flag.show(100)

In [0]:
# Render the per-flag record counts held in df_flag.
display(df_flag)

flag,count
OTH,8
S3,10
RSTOS0,11
S2,24
S1,57
SH,107
RSTO,579
RSTR,903
REJ,26875
S0,87007


In [0]:
#b.label
# Number of records per label, sorted by that count; print up to 30 rows.
label_counts = df.groupBy('label').count()
df_label = label_counts.sort('count')
df_label.show(30)

In [0]:
# Render the per-label record counts held in df_label.
display(df_label)

label,count
spy.,2
perl.,3
phf.,4
multihop.,7
ftp_write.,8
loadmodule.,9
rootkit.,10
imap.,12
warezmaster.,20
land.,21


In [0]:
# Print the summary statistics that DataFrame.describe computes for the duration column.
df.describe('duration').show()

In [0]:
def duraiton_level(x):
  """Return the label of the duration bucket that ``x`` falls into.

  Every bucket includes its upper bound, so 5 maps to 'less than 5' and 100 maps
  to '70 to 100'. Any value that is not <= 100 maps to 'over 100'.
  """
  # (inclusive upper bound, label) pairs, checked in ascending order.
  buckets = (
    (5, 'less than 5'),
    (20, '5 to 20'),
    (50, '20 to 50'),
    (70, '50 to 70'),
    (100, '70 to 100'),
  )
  for upper_bound, label in buckets:
    if x <= upper_bound:
      return label
  return 'over 100'

# Bucket every duration value, pair each bucket label with a 1, and count rows per bucket.
df_duration = df.select('duration')
duration_classified = df_duration.rdd.map(lambda row: duraiton_level(row[0]))
duration_classified_kv = duration_classified.map(lambda level: (level, 1))
kv_columns = ["Duraton level", "1"]
df_duration_classified_kv = spark.createDataFrame(duration_classified_kv, kv_columns)
df_duration_plot = (
    df_duration_classified_kv
    .groupBy('Duraton level')
    .count()
    .sort('count')
)

In [0]:
# Render the per-bucket duration counts held in df_duration_plot.
display(df_duration_plot)

Duraton level,count
70 to 100,61
50 to 70,83
20 to 50,699
5 to 20,1661
over 100,4908
less than 5,486609


In [0]:
#Part 8:
#create new column
from pyspark.sql.functions import when
#encode the label and the protocol type as 0/1 indicator columns
# label_encoded is 0 for 'normal.' records and 1 for every other label.
df_8 = df.withColumn('label_encoded', when(df.label == 'normal.', 0).otherwise(1))
# One indicator column per protocol, added in the order icmp, tcp, udp.
for protocol in ('icmp', 'tcp', 'udp'):
  is_protocol = when(df.protocol_type == protocol, 1).otherwise(0)
  df_8 = df_8.withColumn(protocol + '_encoded', is_protocol)
#collect duration, bytes and encoded protocol as features; the encoded label goes last.
ml_columns = [
  'duration', 'src_bytes', 'dst_bytes',
  'icmp_encoded', 'tcp_encoded', 'udp_encoded',
  'label_encoded',
]
df_ml = df_8.select(*ml_columns)

In [0]:
#split data with 60% to 40%
# Pass a fixed seed so that re-running the notebook does not draw a different random
# split (and therefore different metrics) each time.
SPLIT_SEED = 42
train,test = df_ml.randomSplit([0.6, 0.4], seed=SPLIT_SEED)

In [0]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [0]:
from pyspark.mllib.regression import LabeledPoint
train_label = train.rdd.map(lambda line:LabeledPoint(line[-1],[line[:-1]]))
test_label = test.rdd.map(lambda line:LabeledPoint(line[-1],[line[:-1]]))

In [0]:
svm = SVMWithSGD.train(train_label,iterations=100)

In [0]:
predictionAndLabels = test_label.map(lambda x: (float(svm.predict(x.features[0])), x.label))
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under PR = %s" % metrics.areaUnderPR)
print("Area under ROC = %s" % metrics.areaUnderROC)