In [1]:
from pyspark import SparkContext,SparkConf

In [2]:
# Obtain the Spark entry point. getOrCreate() reuses an already-running
# SparkContext instead of raising "Cannot run multiple SparkContexts at once",
# so this cell can be re-executed (or the notebook re-run) safely.
sc = SparkContext.getOrCreate()

In [3]:
# Location of the KDD Cup 10% sample (gzip-compressed text, one CSV record per line).
data_file = "/data/kdd/kddcup.data_10_percent.gz"

# Lazily define an RDD of raw text lines; nothing is read until an action runs.
raw_data = sc.textFile(data_file)

In [4]:
# Break every line into its comma-separated fields.
csv_data = raw_data.map(lambda line: line.split(","))

# Pair each record with its label: field 41, the trailing tag such as u'normal.'
# (visible as both the key and the last field in the sample output below).
key_value_data = csv_data.map(lambda fields: (fields[41], fields))

In [5]:
# Inspect a single (label, fields) pair to confirm the parsing above.
key_value_data.take(num=1)

[(u'normal.',
  [u'0',
   u'tcp',
   u'http',
   u'SF',
   u'181',
   u'5450',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'1',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'8',
   u'8',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'1.00',
   u'0.00',
   u'0.00',
   u'9',
   u'9',
   u'1.00',
   u'0.00',
   u'0.11',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'normal.'])]

In [6]:
# (label, duration) pairs: field 0 holds the duration, parsed to float so it can be summed/averaged.
key_value_duration = csv_data.map(
    lambda fields: (fields[41], float(fields[0]))
)

In [7]:
# Total duration per label, added up within and across partitions.
durations_by_key = key_value_duration.reduceByKey(
    lambda running_total, duration: running_total + duration
)

In [8]:
# Bring the per-label totals back to the driver as a list of (label, total) pairs.
durations_by_key.collect()

[(u'guess_passwd.', 144.0),
 (u'nmap.', 0.0),
 (u'warezmaster.', 301.0),
 (u'rootkit.', 1008.0),
 (u'warezclient.', 627563.0),
 (u'smurf.', 0.0),
 (u'pod.', 0.0),
 (u'neptune.', 0.0),
 (u'normal.', 21075991.0),
 (u'spy.', 636.0),
 (u'ftp_write.', 259.0),
 (u'phf.', 18.0),
 (u'portsweep.', 1991911.0),
 (u'teardrop.', 0.0),
 (u'buffer_overflow.', 2751.0),
 (u'land.', 0.0),
 (u'imap.', 72.0),
 (u'loadmodule.', 326.0),
 (u'perl.', 124.0),
 (u'multihop.', 1288.0),
 (u'back.', 284.0),
 (u'ipsweep.', 43.0),
 (u'satan.', 64.0)]

In [9]:
# Number of records per label; countByKey is an action that returns a dict-like
# mapping on the driver (rendered below as a defaultdict).
counts_by_key = key_value_data.countByKey()

counts_by_key

defaultdict(int,
            {u'back.': 2203,
             u'buffer_overflow.': 30,
             u'ftp_write.': 8,
             u'guess_passwd.': 53,
             u'imap.': 12,
             u'ipsweep.': 1247,
             u'land.': 21,
             u'loadmodule.': 9,
             u'multihop.': 7,
             u'neptune.': 107201,
             u'nmap.': 231,
             u'normal.': 97278,
             u'perl.': 3,
             u'phf.': 4,
             u'pod.': 264,
             u'portsweep.': 1040,
             u'rootkit.': 10,
             u'satan.': 1589,
             u'smurf.': 280790,
             u'spy.': 2,
             u'teardrop.': 979,
             u'warezclient.': 1020,
             u'warezmaster.': 20})

In [10]:
# combineByKey: build a (total_duration, record_count) accumulator per label in one pass.
def start_acc(duration):
    # First value seen for a key: the running total is that value, the count is 1.
    return (duration, 1)


def add_value(acc, duration):
    # Fold one more value of the same key into an existing accumulator.
    total, count = acc
    return (total + duration, count + 1)


def merge_accs(left, right):
    # Combine two accumulators for the same key (e.g. from different partitions).
    return (left[0] + right[0], left[1] + right[1])


sum_counts = key_value_duration.combineByKey(start_acc, add_value, merge_accs)

sum_counts.collectAsMap()

{u'back.': (284.0, 2203),
 u'buffer_overflow.': (2751.0, 30),
 u'ftp_write.': (259.0, 8),
 u'guess_passwd.': (144.0, 53),
 u'imap.': (72.0, 12),
 u'ipsweep.': (43.0, 1247),
 u'land.': (0.0, 21),
 u'loadmodule.': (326.0, 9),
 u'multihop.': (1288.0, 7),
 u'neptune.': (0.0, 107201),
 u'nmap.': (0.0, 231),
 u'normal.': (21075991.0, 97278),
 u'perl.': (124.0, 3),
 u'phf.': (18.0, 4),
 u'pod.': (0.0, 264),
 u'portsweep.': (1991911.0, 1040),
 u'rootkit.': (1008.0, 10),
 u'satan.': (64.0, 1589),
 u'smurf.': (0.0, 280790),
 u'spy.': (636.0, 2),
 u'teardrop.': (0.0, 979),
 u'warezclient.': (627563.0, 1020),
 u'warezmaster.': (301.0, 20)}

In [11]:
# Mean duration per label, rounded to 3 decimals.
# mapValues leaves keys (and any partitioner) untouched and replaces the
# Python-2-only tuple-parameter lambda `lambda (key, value): ...` (removed by
# PEP 3113), which is a SyntaxError on Python 3. The total is a float (see the
# float(...) parse above), so `/` is true division on both Python 2 and 3;
# every key has count >= 1 because combineByKey starts each key at count 1.
duration_means_by_type = sum_counts.mapValues(
    lambda total_and_count: round(total_and_count[0] / total_and_count[1], 3)
).collectAsMap()

# Print them sorted, largest mean duration first. A single formatted string
# keeps the "tag value" output identical under the Python 2 print statement
# and the Python 3 print function.
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print("%s %s" % (tag, duration_means_by_type[tag]))

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
nmap. 0.0
smurf. 0.0
pod. 0.0
neptune. 0.0
teardrop. 0.0
land. 0.0
