In [1]:

import os
import urllib

from pyspark import SparkContext, SparkConf

# Fetch the 10% KDD Cup 1999 sample only when it is not already on disk, so
# re-running the notebook does not download the archive again.
DATA_URL = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
LOCAL_ARCHIVE = "kddcup.data_10_percent.gz"

if os.path.exists(LOCAL_ARCHIVE):
    # Keep the (filename, headers) shape that urlretrieve returns; there are
    # no HTTP headers when the local copy is reused.
    f = (LOCAL_ARCHIVE, None)
else:
    # Download under a temporary name and rename on success, so an interrupted
    # transfer never leaves a truncated file that a later run would trust.
    partial_archive = LOCAL_ARCHIVE + ".part"
    _, response_headers = urllib.urlretrieve(DATA_URL, partial_archive)
    os.rename(partial_archive, LOCAL_ARCHIVE)
    f = (LOCAL_ARCHIVE, response_headers)

In [2]:
# Prepare the RDD from the downloaded archive.
data_file = "./kddcup.data_10_percent.gz"

conf = SparkConf().setAppName("KDD_Aggregations")
# getOrCreate reuses a SparkContext that is already running in this kernel, so
# the cell can be executed again without the error that constructing a second
# SparkContext(conf=conf) would raise.
sc = SparkContext.getOrCreate(conf=conf)

# Load the file as an RDD of text lines (each line is split on ',' downstream).
raw_data = sc.textFile(data_file)

In [3]:
# Break every text line into its comma-separated fields (a list of strings).
csv = raw_data.map(lambda line: line.split(','))

# Partition the records on field 41: rows whose value there is exactly
# 'normal.' go to normal_csv, every other row goes to attack_csv.
attack_csv = csv.filter(lambda fields: fields[41] != 'normal.')
normal_csv = csv.filter(lambda fields: fields[41] == 'normal.')

In [10]:
# Extract the duration of every record for both classes.
def _duration_of(fields):
    """Return field 0 of a split record converted to an int."""
    return int(fields[0])

normal_duration = normal_csv.map(_duration_of)
attack_duration = attack_csv.map(_duration_of)

In [11]:
# REDUCE step: collapse each duration RDD to a single total by pairwise addition.
add_pair = lambda left, right: left + right

total_normal_duration = normal_duration.reduce(add_pair)
total_attack_duration = attack_duration.reduce(add_pair)

print("Total duration for 'normal' interactions is {}".format(total_normal_duration))
print("Total duration for 'attack' interactions is {}".format(total_attack_duration))

Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792


In [13]:
# Mean duration per class = summed duration / number of records in that class.
count_attack = attack_duration.count()
count_normal = normal_duration.count()

# Convert to float before dividing so the quotient is not an integer division.
mean_attack = float(total_attack_duration) / count_attack
mean_normal = float(total_normal_duration) / count_normal

print("Mean duration of attack is = {}".format(mean_attack))
print("Mean duration of normal interaction is = {}".format(mean_normal))

Mean duration of attack is = 6.62089060172
Mean duration of normal interaction is = 216.657322313


In [14]:
# Compute the mean with aggregate(): one call yields a (sum, count) pair.
def _add_value(acc, value):
    """Fold one duration into a (sum, count) accumulator."""
    return (acc[0] + value, acc[1] + 1)

def _merge_accumulators(acc1, acc2):
    """Combine two (sum, count) accumulators element-wise."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

# (0, 0) is the starting accumulator: zero total, zero records.
normal_sum_count = normal_duration.aggregate((0, 0), _add_value, _merge_accumulators)

normal_mean_rounded = round(normal_sum_count[0] / float(normal_sum_count[1]), 3)
print("Mean duration for 'normal' interactions is {}".format(normal_mean_rounded))

Mean duration for 'normal' interactions is 216.657


In [16]:
# Attack records: build a (sum, count) pair with aggregate(), then divide.
attack_sum_count = attack_duration.aggregate(
    zeroValue=(0, 0),  # starting accumulator: zero total, zero records
    seqOp=lambda running, duration: (running[0] + duration, running[1] + 1),  # add one value
    combOp=lambda left, right: (left[0] + right[0], left[1] + right[1]),  # merge two accumulators
)

attack_total, attack_count = attack_sum_count[0], attack_sum_count[1]
print("Mean duration for 'attack' interactions is {}".format(
    round(attack_total / float(attack_count), 3)))

Mean duration for 'attack' interactions is 6.621
