## from codementor

https://www.codementor.io/spark/tutorial/spark-python-data-aggregations

##Getting the data and creating the RDD

In this section we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally. 


MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

In [4]:

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [3]:
!hdfs dfs -ls /tmp/kdd*

-rw-r--r--   3 cloud-user hdfs   18115902 2017-01-17 03:35 /tmp/kddcup.data.gz
-rw-r--r--   3 cloud-user hdfs    2144903 2017-01-17 08:52 /tmp/kddcup.data_10_percent.gz


In [5]:
!hdfs dfs -put ./kddcup.data_10_percent.gz /tmp/


put: `/tmp/kddcup.data_10_percent.gz': File exists


In [5]:
data_file = "/tmp/kddcup.data_10_percent.gz"

raw_data = sc.textFile(data_file)

 ## Inspecting interaction duration by tag

Both fold and reduce take a function as an argument that is applied to two elements of the RDD. The fold action differs from reduce in that it gets and additional initial zero value to be used for the initial call. This value should be the identity element for the function provided.

As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use reduce as follows. 

In [6]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

In [7]:
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

In [9]:
type(normal_duration_data)

pyspark.rdd.PipelinedRDD

In [10]:
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print "Total duration for 'normal' interactions is {}".\
    format(total_normal_duration)
print "Total duration for 'attack' interactions is {}".\
    format(total_attack_duration)



Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792


In [11]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print "Mean duration for 'normal' interactions is {}".\
    format(round(total_normal_duration/float(normal_count),3))
print "Mean duration for 'attack' interactions is {}".\
    format(round(total_attack_duration/float(attack_count),3))


Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621


## A better way, using aggregate

The aggregate action frees us from the constraint of having the return be the same type as the RDD we are working on. Like with fold, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators. Let's see it in action calculating the mean we did before. 


In [12]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine val/acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)

In [13]:
print "Mean duration for 'normal' interactions is {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))


Mean duration for 'normal' interactions is 216.657


In [14]:
normal_sum_count

(21075991, 97278)

In [15]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print "Mean duration for 'attack' interactions is {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))


Mean duration for 'attack' interactions is 6.621


## Working with key/value pair RDDs

Spark provides specific functions to deal with RDDs which elements are key/value pairs. They are sually used to perform aggregations and other processings by key. 

In [16]:
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

In [17]:
key_value_data.take(1)

[(u'normal.',
  [u'0',
   u'tcp',
   u'http',
   u'SF',
   u'181',
   u'5450',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'1',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'8',
   u'8',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'1.00',
   u'0.00',
   u'0.00',
   u'9',
   u'9',
   u'1.00',
   u'0.00',
   u'0.11',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'normal.'])]

In [18]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()


[(u'guess_passwd.', 144.0),
 (u'nmap.', 0.0),
 (u'warezmaster.', 301.0),
 (u'rootkit.', 1008.0),
 (u'warezclient.', 627563.0),
 (u'smurf.', 0.0),
 (u'pod.', 0.0),
 (u'neptune.', 0.0),
 (u'normal.', 21075991.0),
 (u'spy.', 636.0),
 (u'ftp_write.', 259.0),
 (u'phf.', 18.0),
 (u'portsweep.', 1991911.0),
 (u'teardrop.', 0.0),
 (u'buffer_overflow.', 2751.0),
 (u'land.', 0.0),
 (u'imap.', 72.0),
 (u'loadmodule.', 326.0),
 (u'perl.', 124.0),
 (u'multihop.', 1288.0),
 (u'back.', 284.0),
 (u'ipsweep.', 43.0),
 (u'satan.', 64.0)]

In [19]:
counts_by_key = key_value_data.countByKey()
counts_by_key


defaultdict(int,
            {u'back.': 2203,
             u'buffer_overflow.': 30,
             u'ftp_write.': 8,
             u'guess_passwd.': 53,
             u'imap.': 12,
             u'ipsweep.': 1247,
             u'land.': 21,
             u'loadmodule.': 9,
             u'multihop.': 7,
             u'neptune.': 107201,
             u'nmap.': 231,
             u'normal.': 97278,
             u'perl.': 3,
             u'phf.': 4,
             u'pod.': 264,
             u'portsweep.': 1040,
             u'rootkit.': 10,
             u'satan.': 1589,
             u'smurf.': 280790,
             u'spy.': 2,
             u'teardrop.': 979,
             u'warezclient.': 1020,
             u'warezmaster.': 20})

## Using combineByKey

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think about it as the aggregate equivlent since it allows the user to return values that are not the same type as our input data.

For example, we can use it to calculate per-type average durations as follows. 

In [24]:
from time import time
t0 = time()
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

tt= time() - t0
print "Aggregate  in {} secs".format(round(tt,3))

sum_counts.collectAsMap()



Aggregate and collect in 0.042 secs


{u'back.': (284.0, 2203),
 u'buffer_overflow.': (2751.0, 30),
 u'ftp_write.': (259.0, 8),
 u'guess_passwd.': (144.0, 53),
 u'imap.': (72.0, 12),
 u'ipsweep.': (43.0, 1247),
 u'land.': (0.0, 21),
 u'loadmodule.': (326.0, 9),
 u'multihop.': (1288.0, 7),
 u'neptune.': (0.0, 107201),
 u'nmap.': (0.0, 231),
 u'normal.': (21075991.0, 97278),
 u'perl.': (124.0, 3),
 u'phf.': (18.0, 4),
 u'pod.': (0.0, 264),
 u'portsweep.': (1991911.0, 1040),
 u'rootkit.': (1008.0, 10),
 u'satan.': (64.0, 1589),
 u'smurf.': (0.0, 280790),
 u'spy.': (636.0, 2),
 u'teardrop.': (0.0, 979),
 u'warezclient.': (627563.0, 1020),
 u'warezmaster.': (301.0, 20)}

In [21]:
duration_means_by_type = sum_counts.map(lambda (key,value): (key, round(value[0]/value[1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print tag, duration_means_by_type[tag]

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
nmap. 0.0
smurf. 0.0
pod. 0.0
neptune. 0.0
teardrop. 0.0
land. 0.0
