In [1]:
# Create an RDD from the KDD99 10% dataset
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
15,application_1499674741924_0019,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Count the number of lines in the raw dataset
raw_data.count()

494021

In [3]:
# Print first few entries
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.', '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.', '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [4]:
# Filter `normal` data
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

In [5]:
# Count the normal data and measure time
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print ("There are {} 'normal' interactions".format(normal_count))
print ("Count completed in {} seconds".format(round(tt,3)))

There are 97278 'normal' interactions
Count completed in 1.225 seconds

In [6]:
# Sample data
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print ("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 49493 of 494021

In [7]:
# Measuring the normal interaction in the sample dataset
from time import time

# transformations to get normal data
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print ("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)))
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.195
Count done in 1.499 seconds

In [8]:
# Measuring the normal interaction in the entire dataset
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print ("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)))
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.195
Count done in 2.763 seconds

This shows that the normal interaction in the data set is 0.195 (from sampling and from the entire data set). The duration is about a second slower when operating on the entire data set.

In [9]:
# Subtract the normal data from the entire dataset to get attacks
attack_raw_data = raw_data.subtract(normal_raw_data)
print ("There are {} attack interactions".format(round(attack_raw_data.count(),3)))

There are 396743 attack interactions

In [10]:
# Extract protocols
# Isolate each collection of values in two separate RDDs. 
# For that we will use distinct on the CSV-parsed dataset. 
# From the dataset description we know that protocol is the 
# second column and service is the third (tag is the last 
# one and not the first as appears in the page).
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

['icmp', 'udp', 'tcp']

In [11]:
# Extract services
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

['finger', 'http', 'netbios_dgm', 'name', 'hostnames', 'vmnet', 'systat', 'shell', 'netbios_ssn', 'urh_i', 'pop_3', 'ctf', 'domain', 'mtp', 'remote_job', 'exec', 'supdup', 'http_443', 'sunrpc', 'urp_i', 'pop_2', 'csnet_ns', 'smtp', 'whois', 'ldap', 'daytime', 'imap4', 'nntp', 'klogin', 'rje', 'IRC', 'link', 'eco_i', 'tftp_u', 'iso_tsap', 'uucp_path', 'auth', 'ecr_i', 'other', 'domain_u', 'courier', 'discard', 'red_i', 'tim_i', 'time', 'login', 'ftp', 'telnet', 'ntp_u', 'sql_net', 'echo', 'private', 'gopher', 'efs', 'netbios_ns', 'ftp_data', 'nnsp', 'ssh', 'netstat', 'uucp', 'Z39_50', 'kshell', 'X11', 'bgp', 'pm_dump', 'printer']

In [12]:
# Print all possible combinations of protocols x services
product = protocols.cartesian(services).collect()
print ("There are {} combinations of protocol X service".format(len(product)))

There are 198 combinations of protocol X service

In [13]:
# separate normal and attack RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

In [14]:
# Duration of normal and attack
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

In [15]:
# Total duration of normal and attack
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print ("Total duration for 'normal' interactions is {}".format(total_normal_duration))
print ("Total duration for 'attack' interactions is {}".format(total_attack_duration))

Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792

In [16]:
# Mean duration of normal and attack
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print ("Mean duration for 'normal' interactions is {}".format(round(total_normal_duration/float(normal_count),3)))
print ("Mean duration for 'attack' interactions is {}".format(round(total_attack_duration/float(attack_count),3)))

Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621

In [17]:
# Using aggregate
# The aggregate action does not require the return type to be the same type as the RDD. 
# Like with fold, we supply an initial zero value of the type we want to return. Then we provide two functions.
# The first one is used to combine the elements from our RDD with the accumulator. 
# The second function is needed to merge two accumulators.

# Normal interactions
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("Mean duration for 'normal' interactions is {}".\
       format(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

# Attack interactions
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("Mean duration for 'attack' interactions is {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3)))

Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621

In [18]:
# Key-value data
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

In [19]:
# The `reduceByKey()` transformation is used to calculate the total duration of each network interaction type.
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)
durations_by_key.collect()

[('portsweep.', 1991911.0), ('neptune.', 0.0), ('satan.', 64.0), ('pod.', 0.0), ('multihop.', 1288.0), ('back.', 284.0), ('warezclient.', 627563.0), ('nmap.', 0.0), ('smurf.', 0.0), ('guess_passwd.', 144.0), ('ftp_write.', 259.0), ('imap.', 72.0), ('land.', 0.0), ('loadmodule.', 326.0), ('buffer_overflow.', 2751.0), ('perl.', 124.0), ('ipsweep.', 43.0), ('rootkit.', 1008.0), ('phf.', 18.0), ('teardrop.', 0.0), ('warezmaster.', 301.0), ('normal.', 21075991.0), ('spy.', 636.0)]

In [20]:
# counts by keys
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(<class 'int'>, {'neptune.': 107201, 'spy.': 2, 'normal.': 97278, 'guess_passwd.': 53, 'multihop.': 7, 'satan.': 1589, 'loadmodule.': 9, 'warezclient.': 1020, 'rootkit.': 10, 'ftp_write.': 8, 'back.': 2203, 'perl.': 3, 'smurf.': 280790, 'pod.': 264, 'land.': 21, 'warezmaster.': 20, 'nmap.': 231, 'portsweep.': 1040, 'buffer_overflow.': 30, 'teardrop.': 979, 'ipsweep.': 1247, 'imap.': 12, 'phf.': 4})

In [21]:
# calculate per-type durations using combineByKey

sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{'neptune.': (0.0, 107201), 'spy.': (636.0, 2), 'normal.': (21075991.0, 97278), 'guess_passwd.': (144.0, 53), 'multihop.': (1288.0, 7), 'satan.': (64.0, 1589), 'loadmodule.': (326.0, 9), 'warezclient.': (627563.0, 1020), 'rootkit.': (1008.0, 10), 'ftp_write.': (259.0, 8), 'back.': (284.0, 2203), 'perl.': (124.0, 3), 'smurf.': (0.0, 280790), 'pod.': (0.0, 264), 'land.': (0.0, 21), 'warezmaster.': (301.0, 20), 'nmap.': (0.0, 231), 'portsweep.': (1991911.0, 1040), 'buffer_overflow.': (2751.0, 30), 'teardrop.': (0.0, 979), 'ipsweep.': (43.0, 1247), 'imap.': (72.0, 12), 'phf.': (18.0, 4)}