In [4]:
# Set up Spark and load the gzipped KDD Cup data file (10% sample, per the file
# name) as an RDD whose elements are the file's text lines.
from pyspark import SparkContext

data_file = './kddcup.data_10_percent.gz'
sc = SparkContext.getOrCreate()  # reuse the kernel's existing SparkContext if one is already running
raw_data = sc.textFile(data_file)

In [5]:
# Split each raw line into its comma-separated fields and key every record by
# its label field (index 41 — values such as 'normal.' / 'smurf.' in the outputs).
# csv_data is the root of every later Spark action in this notebook, so cache it:
# otherwise each action re-reads, re-decompresses and re-splits the whole .gz file.
LABEL_COL = 41
csv_data = raw_data.map(lambda line: line.split(',')).cache()
key_value_data = csv_data.map(lambda fields: (fields[LABEL_COL], fields))

In [6]:
# Peek at a single (label, fields) pair to confirm the parsing looks right.
key_value_data.take(num=1)

[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '181',
   '5450',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '8',
   '8',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '9',
   '9',
   '1.00',
   '0.00',
   '0.11',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

In [7]:
# Pair each record's label with its duration (field 0, parsed as float). Reusing the
# already-keyed key_value_data keeps the label column index defined in one place
# instead of repeating the magic index here; the resulting pairs are identical.
key_value_duration = key_value_data.mapValues(lambda fields: float(fields[0]))
# Total duration per label.
duration_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

In [8]:
# Bring the per-label duration totals back to the driver as a list of (label, total) pairs.
duration_by_key.collect()

[('normal.', 21075991.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 1991911.0),
 ('ipsweep.', 43.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 64.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

In [9]:
# Number of records per label: project out the keys and count each distinct one
# (the same computation RDD.countByKey performs; result is a defaultdict(int)).
count_by_key = key_value_data.keys().countByValue()

In [10]:
# Display the per-label record counts (a defaultdict(int), as the output shows).
count_by_key

defaultdict(int,
            {'normal.': 97278,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 107201,
             'smurf.': 280790,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 1040,
             'ipsweep.': 1247,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 1589,
             'phf.': 4,
             'nmap.': 231,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

In [11]:
# For each label, fold its durations into a single (total_duration, count) pair.
sum_counts = key_value_duration.combineByKey(
    createCombiner=lambda duration: (duration, 1),
    mergeValue=lambda total_count, duration: (total_count[0] + duration, total_count[1] + 1),
    mergeCombiners=lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

In [20]:
# Show each label's (total_duration, count) pair as a driver-side dict.
sum_counts.collectAsMap()

{'normal.': (21075991.0, 97278),
 'buffer_overflow.': (2751.0, 30),
 'loadmodule.': (326.0, 9),
 'perl.': (124.0, 3),
 'neptune.': (0.0, 107201),
 'smurf.': (0.0, 280790),
 'guess_passwd.': (144.0, 53),
 'pod.': (0.0, 264),
 'teardrop.': (0.0, 979),
 'portsweep.': (1991911.0, 1040),
 'ipsweep.': (43.0, 1247),
 'land.': (0.0, 21),
 'ftp_write.': (259.0, 8),
 'back.': (284.0, 2203),
 'imap.': (72.0, 12),
 'satan.': (64.0, 1589),
 'phf.': (18.0, 4),
 'nmap.': (0.0, 231),
 'multihop.': (1288.0, 7),
 'warezmaster.': (301.0, 20),
 'warezclient.': (627563.0, 1020),
 'spy.': (636.0, 2),
 'rootkit.': (1008.0, 10)}

In [45]:
# Turn each label's (total_duration, count) pair into a mean duration rounded to
# 3 decimals, then collect the label -> mean mapping to the driver as a dict.
duration_means_by_type = (
    sum_counts
    .mapValues(lambda total_and_count: round(total_and_count[0] / total_and_count[1], 3))
    .collectAsMap()
)

In [47]:
# Mean duration per label, rounded to 3 decimal places.
duration_means_by_type

{'normal.': 216.657,
 'buffer_overflow.': 91.7,
 'loadmodule.': 36.222,
 'perl.': 41.333,
 'neptune.': 0.0,
 'smurf.': 0.0,
 'guess_passwd.': 2.717,
 'pod.': 0.0,
 'teardrop.': 0.0,
 'portsweep.': 1915.299,
 'ipsweep.': 0.034,
 'land.': 0.0,
 'ftp_write.': 32.375,
 'back.': 0.129,
 'imap.': 6.0,
 'satan.': 0.04,
 'phf.': 4.5,
 'nmap.': 0.0,
 'multihop.': 184.0,
 'warezmaster.': 15.05,
 'warezclient.': 615.258,
 'spy.': 318.0,
 'rootkit.': 100.8}

In [48]:
# Sanity check: collectAsMap() produced a plain Python dict (see output), so the
# following cell can sort it with ordinary dict operations.
type(duration_means_by_type)

dict

In [51]:
# Rank labels by mean duration, longest first. sorted() stays stable with
# reverse=True, so labels with equal means keep the dict's insertion order.
for tag, mean_duration in sorted(
    duration_means_by_type.items(), key=lambda item: item[1], reverse=True
):
    print(tag, mean_duration)

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
neptune. 0.0
smurf. 0.0
pod. 0.0
teardrop. 0.0
land. 0.0
nmap. 0.0
