# **KDDCup Data Analytics with PySpark RDD: A structured case study**


### Author: Trilok Nath

##### data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

In [0]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [0]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [0]:
from pyspark import SparkContext, SparkConf

# Initializing Spark
conf = SparkConf().setAppName("KDDCup_PySpark").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)
print("Ready to go!")

In [0]:
# ##########REQUIRED ONLY IF YOU ARE WORKING WITH GOOGLE COLAB ###########
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Read and Load Data to Spark
# Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
rdd = spark.sparkContext.textFile("dbfs:/FileStore/kddcup_data.gz")

In [0]:
# Repartition and Cache Data:
print("the original Partitions:",rdd.getNumPartitions())
print("the orginal Parallelism:",spark.sparkContext.defaultParallelism)
rdd = rdd.repartition(8)
print("After Repartitioning", rdd.getNumPartitions())

the original Partitions: 1
the orginal Parallelism: 8
After Repartitioning 8


## Question 1: Get ten records randomly

In [0]:
rdd.takeSample(False,10)

Out[7]: ['0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,tcp,http,SF,240,265,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,10,10,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,249,0.98,0.01,0.00,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,510,510,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,138,15,1.00,1.00,0.00,0.00,0.11,0.06,0.00,255,15,0.06,0.06,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0

## Question 2: Count elements

In [0]:
rdd.count()

Out[8]: 4898431

## Question 3: Calculate the ratio of `normal` connections

In [0]:
rdd_normal = rdd.filter(lambda line: "normal." in line)

print(f"The ratio of Normal: {round((rdd_normal.count()/rdd.count())*100,2)}%")

The ratio of Normal: 19.86%


## Question 4: Get the list of labels

In [0]:
# rdd.takeSample(False,10)
rdd.map(lambda line: line.split(',')[-1]).distinct().collect()

Out[20]: ['guess_passwd.',
 'neptune.',
 'pod.',
 'loadmodule.',
 'smurf.',
 'ipsweep.',
 'multihop.',
 'warezmaster.',
 'rootkit.',
 'back.',
 'buffer_overflow.',
 'teardrop.',
 'warezclient.',
 'normal.',
 'land.',
 'nmap.',
 'phf.',
 'imap.',
 'perl.',
 'portsweep.',
 'spy.',
 'satan.',
 'ftp_write.']

## Question 5: Count the number of connections for each label

In [0]:
labels = rdd.map(lambda line: line.split(',')[-1]).distinct()
rdd_labels_map = rdd.map(lambda line: (line.split(',')[-1],1))
rdd_reducedkey = rdd_labels_map.reduceByKey(lambda x,y:x+y)
sorted_labels = rdd_reducedkey.sortByKey(ascending=False,keyfunc = lambda x: x[1])

In [0]:
sorted_labels.collect()

Out[38]: [('spy.', 2),
 ('smurf.', 2807886),
 ('satan.', 15892),
 ('ipsweep.', 12481),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('normal.', 972781),
 ('perl.', 3),
 ('portsweep.', 10413),
 ('neptune.', 1072017),
 ('ftp_write.', 8),
 ('rootkit.', 10),
 ('land.', 21),
 ('multihop.', 7),
 ('buffer_overflow.', 30),
 ('phf.', 4),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('back.', 2203),
 ('loadmodule.', 9),
 ('teardrop.', 979),
 ('nmap.', 2316),
 ('imap.', 12)]

## Question 6: Get the connection type with successful `root_shell` connections to servers, where the number of data bytes from source (`src_bytes`) is 500 times more than from server (`dst_bytes`)

In [0]:
sucess_root_shell = rdd.filter(lambda line: line.split(',')[13]=='1')\
                        .map(lambda line: line.split(',')[:14])\
                        .filter(lambda list_: int(list_[4]) > int(list_[5]))

In [0]:
sucess_root_shell.collect()

Out[60]: [['0',
  'tcp',
  'X11',
  'SF',
  '200068',
  '12976',
  '0',
  '0',
  '0',
  '1',
  '0',
  '1',
  '0',
  '1'],
 ['0',
  'tcp',
  'smtp',
  'SF',
  '538',
  '386',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '1']]

## Question 7:  Get the list of `Protocols`that are `normal` and `vulnerable to attacks`, where there is NOT `guest login` to the destination addresses

In [0]:
rdd_normal = rdd.filter(lambda line: 'normal.' in line and line.split(',')[21] != '1')\
                .map(lambda line: (line.split(',')[1],1))\
                .reduceByKey(lambda x,y:x+y)
rdd_attack = rdd.filter(lambda line: 'normal.' not in line and line.split(',')[21] != '1')\
                .map(lambda line: (line.split(',')[1],1))\
                .reduceByKey(lambda x,y:x+y)

In [0]:
print(rdd_normal.collect()+rdd_attack.collect())

[('icmp', 12763), ('udp', 191348), ('tcp', 764894), ('icmp', 2820782), ('udp', 2940), ('tcp', 1101613)]


## Question 8: Get a summary statistics for the sum of `tcp` connections to the same destination IP address (hint: `protocol_type` and `dst_host_count` features)

In [0]:
# Source: https://spark.apache.org/docs/latest/mllib-statistics.html
from pyspark.mllib.stat import Statistics 
from math import sqrt
Split_rdd = rdd.map(lambda line: line.split(','))
summary = Statistics.colStats(Split_rdd.filter(lambda line: line[1] == "tcp").map(lambda line: [int(line[31])])) # the input should be a "vector"

tcp_mean = round(float(summary.mean()),3)
tcp_std = round(float(sqrt(summary.variance())),3)
tcp_min = round(float(summary.min()),3)
tcp_max = round(float(summary.max()),3)

print([tcp_mean, tcp_std, tcp_min, tcp_max])

[201.752, 90.726, 0.0, 255.0]


## [challenge] Question 9: Filter the number of `icmp`-based attacks for each `service`