# Working with key/value pair RDDs

**NOTE: This notebook is worth 10% of the grade of project 2.**

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Spark provides specific functions to deal with RDDs whose elements are key/value pairs. They are usually used to perform aggregations and other processing by key.

In this notebook we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than in previous notebooks. Key/value pair aggregations will prove particularly effective when exploring each type of tag in our network attacks individually.

## Getting the data and creating the RDD

In this notebook we will use a reduced (1 percent) version of the KDD Cup 1999 dataset, containing just under 50,000 network interactions. The file is provided as a *Gzip* file in the parent of the current working directory (hence the `/../` in `data_file`), and `sc.textFile` decompresses it transparently.

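```python
import os
from pyspark import SparkContext

# The gzip file sits in the parent of the current working directory
data_file = "file://" + os.getcwd() + "/../kddcup.data_1_percent.gz"

# Reuse the running SparkContext if there is one, otherwise create it
sc = SparkContext.getOrCreate()

# Each element of raw_data is one (decompressed) CSV line of the file
raw_data = sc.textFile(data_file)
```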
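
`sc.textFile` hides the compression, but it can be handy to peek at the raw file without starting Spark. Note also that gzip is not a splittable format, so Spark reads a single `.gz` file into a single partition. A minimal sketch using Python's standard `gzip` module; `peek_lines` is a hypothetical helper, and the path in the usage line assumes the same layout as `data_file`:

```python
import gzip
from itertools import islice


def peek_lines(path, n=1):
    """Return the first n lines of a gzip-compressed text file (hypothetical helper)."""
    # "rt" opens the compressed stream in text mode, decoding bytes to str
    with gzip.open(path, "rt") as f:
        # Drop the trailing newline, as sc.textFile does for each element
        return [line.rstrip("\n") for line in islice(f, n)]
```

For example, `peek_lines("../kddcup.data_1_percent.gz")` would return the first raw CSV line as a one-element list.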

## Creating a pair RDD for interaction types

In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely, we want to profile each network interaction type in terms of some of its variables, such as duration. To do so, we first need a suitable RDD in which each interaction is parsed as a CSV row (the value) and paired with its corresponding tag (the key).

Normally we create key/value pair RDDs by applying a function using `map` to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.  

```python
csv_data = raw_data.map(lambda x: x.split(","))
# TODO: From each row of the data, generate a key-value pair. The key will be the tag and the value will be the CSV data.
# HINT: x[41] contains the network interaction tag
key_value_data = csv_data.map(lambda x: (x[41], x))
print(type(key_value_data))
```

```
<class 'pyspark.rdd.PipelinedRDD'>
```

Our key/value pair data is now ready to use. Let's take the first element to see what it looks like.

```python
key_value_data.take(1)
```

```
[('normal.',
  ['0',
   'udp',
   'private',
   'SF',
   '105',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '2',
   '2',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '255',
   '241',
   '0.95',
   '0.01',
   '0.01',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]
```
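
The function passed to `map` is ordinary Python, so it can be checked without a cluster. A minimal sketch, where `to_tag_pair` and `sample` are illustrative names (not part of the notebook) and `sample` is the first interaction above rebuilt as a raw CSV line:

```python
def to_tag_pair(line):
    """Split one CSV line and key it by its tag, the last field (index 41)."""
    fields = line.split(",")
    return (fields[41], fields)


# The first interaction shown above, rebuilt as a raw CSV line
sample = ("0,udp,private,SF,105," + "0," * 17 +
          "2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,241,"
          "0.95,0.01,0.01,0.00,0.00,0.00,0.00,0.00,normal.")

tag, fields = to_tag_pair(sample)
print(tag, len(fields))  # normal. 42
```

The tag keeps its trailing period (`normal.`), which is why all keys in the outputs below end with one.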

## Data aggregations with key/value pair RDDs

All the transformations and actions available for normal RDDs can also be used with key/value pair RDDs; we just need to write the functions so that they work with pair elements. In addition, Spark provides functions specifically for RDDs containing pair elements, and they are very similar to those available for general RDDs.
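
In PySpark each pair element reaches these functions as a single Python 2-tuple. Python 3 removed tuple parameter unpacking (PEP 3113), so a lambda written as `lambda (k, v): ...` is a syntax error; the function has to index into the tuple or unpack it in its body. A minimal pure-Python sketch using the built-in `map` on a hypothetical list of `(tag, duration)` pairs, converting durations from seconds to minutes purely for illustration:

```python
# Hypothetical (tag, duration in seconds) pairs
pairs = [("normal.", 0.0), ("portsweep.", 5.0)]

# Option 1: index into the tuple
as_minutes = list(map(lambda kv: (kv[0], kv[1] / 60), pairs))


# Option 2: a named function that unpacks the tuple in its body
def to_minutes(kv):
    key, seconds = kv
    return (key, seconds / 60)


# Both forms produce the same (key, new value) pairs
assert as_minutes == list(map(to_minutes, pairs))
```

When only the value changes, PySpark's `mapValues` avoids handling the key at all.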

For example, we have a `reduceByKey` transformation that we can use as follows to calculate the total duration of each network interaction type.  

```python
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0])))
# TODO: Aggregate the durations of network interactions which have the same key (i.e., tag)
# HINT: The argument of reduceByKey is a lambda which takes two values and returns the reduced result.
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()
```

```
[('land.', 0.0),
 ('portsweep.', 250025.0),
 ('loadmodule.', 103.0),
 ('neptune.', 0.0),
 ('buffer_overflow.', 237.0),
 ('satan.', 12.0),
 ('pod.', 0.0),
 ('ipsweep.', 0.0),
 ('back.', 48.0),
 ('teardrop.', 0.0),
 ('nmap.', 0.0),
 ('smurf.', 0.0),
 ('warezclient.', 88429.0),
 ('guess_passwd.', 0.0),
 ('normal.', 2127905.0),
 ('warezmaster.', 19.0),
 ('imap.', 0.0)]
```
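
Spark documents `reduceByKey` as merging values with an associative and commutative function, performing the merge locally on each partition before the results are shuffled, much like a combiner in MapReduce. The pure-Python sketch below mimics that two-stage behaviour, with in-memory lists standing in for partitions; `reduce_by_key` is a hypothetical helper, not Spark's implementation, and the durations are made up:

```python
def reduce_by_key(partitions, func):
    """Pure-Python sketch of reduceByKey's semantics (hypothetical helper, not Spark's code)."""

    def reduce_partition(partition):
        # Stage 1: combine values locally within one partition
        acc = {}
        for key, value in partition:
            acc[key] = func(acc[key], value) if key in acc else value
        return acc

    # Stage 2: merge the per-partition results key by key
    merged = {}
    for partial in map(reduce_partition, partitions):
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Hypothetical (tag, duration) pairs split across two partitions
partitions = [
    [("normal.", 10.0), ("back.", 1.0), ("normal.", 5.0)],
    [("normal.", 2.0), ("smurf.", 0.0)],
]
totals = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)  # {'normal.': 17.0, 'back.': 1.0, 'smurf.': 0.0}
```

Because partial results are merged in an order that depends on how the data is partitioned, a function that is not associative and commutative (subtraction, for instance) would give partition-dependent results.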

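There is also a specific counting action for key/value pairs, `countByKey`. Being an action, it returns the per-key counts to the driver as a dictionary (here a `defaultdict`).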

```python
# TODO: count the number of rows for each key
counts_by_key = key_value_data.countByKey()

counts_by_key
```

```
defaultdict(int,
            {'back.': 195,
             'buffer_overflow.': 3,
             'guess_passwd.': 5,
             'imap.': 1,
             'ipsweep.': 134,
             'land.': 3,
             'loadmodule.': 1,
             'neptune.': 10704,
             'nmap.': 24,
             'normal.': 9641,
             'pod.': 28,
             'portsweep.': 85,
             'satan.': 161,
             'smurf.': 28219,
             'teardrop.': 92,
             'warezclient.': 103,
             'warezmaster.': 3})
```
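
Since `countByKey` brings every count back to the driver, it suits a small number of distinct keys such as the 17 tags here. The same counts can be expressed as `(key, 1)` pairs summed per key, which is how one would write it with `reduceByKey` to keep the result distributed. A pure-Python sketch of both formulations on hypothetical pairs:

```python
from collections import Counter, defaultdict

# Hypothetical (tag, row) pairs
pairs = [("smurf.", "row1"), ("normal.", "row2"), ("smurf.", "row3")]

# What countByKey computes: the number of pairs per key
counts = Counter(key for key, _ in pairs)

# The same result from (key, 1) pairs summed per key, mirroring
# rdd.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
summed = defaultdict(int)
for key, one in ((k, 1) for k, _ in pairs):
    summed[key] += one

print(dict(counts))  # {'smurf.': 2, 'normal.': 1}
```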

### Using `combineByKey`

This is the most general of the per-key aggregation functions, and most of the other per-key combiners are implemented using it. We can think of it as the per-key equivalent of `aggregate`, since it allows the result to be of a different type from the input values.

`combineByKey` can be thought of as a per-key `map` step (turning a value into an accumulator, or *combiner*) followed by per-key `reduce` steps. It takes three functions as arguments:
  - **createCombiner**: called the first time a key is seen in a partition; given a value V, returns the initial combiner C (which may be a tuple or another type)
  - **mergeValue**: given a combiner C and another value V for the same key in the same partition, returns the updated combiner
  - **mergeCombiners**: given two combiners C1 and C2 for the same key built in different partitions, returns their merged combiner
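
To see when each of the three functions is called, here is a pure-Python sketch of the semantics, assuming partitions are plain lists of `(key, value)` pairs. `combine_by_key` is a hypothetical helper for illustration, not Spark's implementation, and the durations are made up:

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Pure-Python sketch of combineByKey's semantics (hypothetical helper)."""
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key not in combiners:
                # First time this key appears in this partition
                combiners[key] = create_combiner(value)
            else:
                # Key already seen in this partition: fold the value in
                combiners[key] = merge_value(combiners[key], value)
        per_partition.append(combiners)

    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            # Same key built in different partitions: merge the combiners
            result[key] = merge_combiners(result[key], combiner) if key in result else combiner
    return result


partitions = [
    [("normal.", 10.0), ("back.", 1.0), ("normal.", 5.0)],
    [("normal.", 2.0), ("back.", 0.0)],
]
sum_count = combine_by_key(
    partitions,
    lambda v: (v, 1),                               # createCombiner
    lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # mergeCombiners
)
print(sum_count)  # {'normal.': (17.0, 3), 'back.': (1.0, 2)}
```

With `create_combiner` set to the identity and the other two functions both set to the reduce function, the same helper behaves like `reduceByKey`.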

For example, we can use it to calculate per-type average durations as follows.  

```python
# TODO: generate `sum_counts` to sum up the elements of each key and, for each key, return a tuple of (sum, count).
# References:
#   http://www.hadoopexam.com/adi/index.php/spark-blog/90-how-combinebykey-works-in-spark-step-by-step-explaination
#   http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
sum_counts = key_value_duration.combineByKey(
    (lambda v: (v, 1)),                               # createCombiner: first value of a key -> (sum, count)
    (lambda c, v: (c[0] + v, c[1] + 1)),              # mergeValue: add the value to the sum, increment the count
    (lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1])),  # mergeCombiners: add sums and counts of two combiners
)

sum_counts.collectAsMap()
```

```
{'back.': (48.0, 195),
 'buffer_overflow.': (237.0, 3),
 'guess_passwd.': (0.0, 5),
 'imap.': (0.0, 1),
 'ipsweep.': (0.0, 134),
 'land.': (0.0, 3),
 'loadmodule.': (103.0, 1),
 'neptune.': (0.0, 10704),
 'nmap.': (0.0, 24),
 'normal.': (2127905.0, 9641),
 'pod.': (0.0, 28),
 'portsweep.': (250025.0, 85),
 'satan.': (12.0, 161),
 'smurf.': (0.0, 28219),
 'teardrop.': (0.0, 92),
 'warezclient.': (88429.0, 103),
 'warezmaster.': (19.0, 3)}
```

We can see that the arguments are very similar to those passed to `aggregate` in the previous notebook. The result associated with each type is a `(sum, count)` pair. To get the actual averages, we need to divide the sum by the count before collecting the results.

```python
# TODO: create an RDD 'duration_means_by_type' by mapping each key-value pair of `sum_counts` to the key and its duration mean
# HINT: duration mean = sum / count
# Each element of sum_counts is (tag, (sum, count)); divide on the cluster,
# then collect the result to the driver as a dict
duration_means_by_type = sum_counts.map(lambda x: (x[0], x[1][0] / x[1][1])).collectAsMap()

# Print them sorted by mean duration, largest first
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print(tag + ": " + str(duration_means_by_type[tag]))
```

```
portsweep.: 2941.470588235294
warezclient.: 858.5339805825242
normal.: 220.71413753759984
loadmodule.: 103.0
buffer_overflow.: 79.0
warezmaster.: 6.333333333333333
back.: 0.24615384615384617
satan.: 0.07453416149068323
imap.: 0.0
nmap.: 0.0
neptune.: 0.0
teardrop.: 0.0
ipsweep.: 0.0
land.: 0.0
pod.: 0.0
smurf.: 0.0
guess_passwd.: 0.0
```
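
The division can also be written with `mapValues`, which replaces only the value and keeps the key, e.g. `sum_counts.mapValues(lambda v: v[0] / v[1]).collectAsMap()`. The plain-Python sketch below applies the same step to a few `(sum, count)` pairs copied from the `sum_counts` output above; `sum_counts_local` is an illustrative name. Because every combiner starts with a count of 1, the count is never zero, so the division is safe.

```python
# A few (sum, count) pairs copied from the sum_counts output above
sum_counts_local = {
    "portsweep.": (250025.0, 85),
    "buffer_overflow.": (237.0, 3),
    "warezmaster.": (19.0, 3),
    "smurf.": (0.0, 28219),
}

# Same step as sum_counts.mapValues(lambda v: v[0] / v[1]), on a plain dict:
# the key is kept and the (sum, count) value is replaced by the mean
means = {tag: total / count for tag, (total, count) in sum_counts_local.items()}

# Print sorted by mean duration, largest first
for tag in sorted(means, key=means.get, reverse=True):
    print(f"{tag}: {means[tag]}")
```

The printed means should match the corresponding lines of the notebook output above.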
