# Working with key/value pair RDDs

## Getting the data and creating the RDD

In [1]:
from pyspark import SparkContext
# Create the SparkContext with default settings; it is the entry point used
# below to build RDDs.
sc = SparkContext()

In [2]:
# Path of the input file, relative to the current working directory.
data_file = "./kddcup.data_10_percent.gz"
# Load the file through the SparkContext; the cells below treat each element
# of the resulting RDD as one comma-separated line.
raw_data = sc.textFile(data_file)

## Creating a pair RDD for interaction types

In [3]:
# Break every raw text line into its comma-separated fields.
csv_data = raw_data.map(lambda line: line.split(","))
# Build (tag, fields) pairs: field 41 holds the network interaction tag and
# the complete field list is kept as the value.
key_value_data = csv_data.map(lambda fields: (fields[41], fields))

In [4]:
# Fetch a single (tag, fields) pair to inspect the structure.
key_value_data.take(1)
# Build (tag, value) pairs where the value is field 0 parsed as a float
# (the duration, going by the variable name).
key_value_duration = csv_data.map(
    lambda fields: (fields[41], float(fields[0]))
)

## Data aggregations with key/value pair RDDs

We have a specific counting action for key/value pairs.  

### Using `combineByKey`

In [5]:
# Aggregate, per tag, a (sum, count) accumulator with combineByKey.
sum_counts = key_value_duration.combineByKey(
    # Seed the accumulator from the first value seen for a key.
    lambda first: (first, 1),
    # Fold one more value into an accumulator: add it and bump the count.
    lambda totals, nxt: (totals[0] + nxt, totals[1] + 1),
    # Merge two accumulators for the same key field by field.
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

sum_counts.collectAsMap()

{'back.': (284.0, 2203),
 'buffer_overflow.': (2751.0, 30),
 'ftp_write.': (259.0, 8),
 'guess_passwd.': (144.0, 53),
 'imap.': (72.0, 12),
 'ipsweep.': (43.0, 1247),
 'land.': (0.0, 21),
 'loadmodule.': (326.0, 9),
 'multihop.': (1288.0, 7),
 'neptune.': (0.0, 107201),
 'nmap.': (0.0, 231),
 'normal.': (21075991.0, 97278),
 'perl.': (124.0, 3),
 'phf.': (18.0, 4),
 'pod.': (0.0, 264),
 'portsweep.': (1991911.0, 1040),
 'rootkit.': (1008.0, 10),
 'satan.': (64.0, 1589),
 'smurf.': (0.0, 280790),
 'spy.': (636.0, 2),
 'teardrop.': (0.0, 979),
 'warezclient.': (627563.0, 1020),
 'warezmaster.': (301.0, 20)}

In [6]:
# Inspect the (sum, count) accumulator of the first pair. first() fetches only
# that one element instead of collecting the whole RDD to the driver.
sum_counts.first()[1]

(1991911.0, 1040)

In [7]:
# Mean per tag: accumulated sum divided by count, rounded to 3 decimals,
# gathered on the driver as a {tag: mean} dict.
duration_means_by_type = sum_counts.map(
    lambda entry: (entry[0], round(entry[1][0] / entry[1][1], 3))
).collectAsMap()

# Print the tags from highest to lowest mean.
for tag, mean_duration in sorted(
    duration_means_by_type.items(), key=lambda item: item[1], reverse=True
):
    print(tag, mean_duration)

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
neptune. 0.0
land. 0.0
smurf. 0.0
pod. 0.0
teardrop. 0.0
nmap. 0.0


In [8]:
# Aggregate, per tag, the running totals needed for the variance:
# (sum of squares, sum, count).
sum_counts = key_value_duration.combineByKey(
    # Seed the accumulator from the first value: its square, itself, count 1.
    lambda first: (first * first, first, 1),
    # Fold one more value in: add its square, add the value, bump the count.
    lambda totals, nxt: (totals[0] + nxt * nxt, totals[1] + nxt, totals[2] + 1),
    # Merge two accumulators for the same key field by field.
    lambda left, right: (
        left[0] + right[0],
        left[1] + right[1],
        left[2] + right[2],
    ),
)

sum_counts.collectAsMap()

{'back.': (2750.0, 284.0, 2203),
 'buffer_overflow.': (528031.0, 2751.0, 30),
 'ftp_write.': (24145.0, 259.0, 8),
 'guess_passwd.': (7730.0, 144.0, 53),
 'imap.': (2642.0, 72.0, 12),
 'ipsweep.': (241.0, 43.0, 1247),
 'land.': (0.0, 0.0, 21),
 'loadmodule.': (25526.0, 326.0, 9),
 'multihop.': (623634.0, 1288.0, 7),
 'neptune.': (0.0, 0.0, 107201),
 'nmap.': (0.0, 0.0, 231),
 'normal.': (184281756189.0, 21075991.0, 97278),
 'perl.': (5566.0, 124.0, 3),
 'phf.': (180.0, 18.0, 4),
 'pod.': (0.0, 0.0, 264),
 'portsweep.': (58958002693.0, 1991911.0, 1040),
 'rootkit.': (522230.0, 1008.0, 10),
 'satan.': (436.0, 64.0, 1589),
 'smurf.': (0.0, 0.0, 280790),
 'spy.': (202970.0, 636.0, 2),
 'teardrop.': (0.0, 0.0, 979),
 'warezclient.': (5352634543.0, 627563.0, 1020),
 'warezmaster.': (25707.0, 301.0, 20)}

Como salida tenemos pares de la forma `(key, (suma_cuadrados, suma, n))`.

# Calculamos la desviación típica
La ecuación es $\sigma = \sqrt{\frac{\sum x^2}{n} - \left(\frac{\sum x}{n}\right)^2}$

In [9]:
import math

In [10]:
def _std_from_sums(row):
    """Return (key, standard deviation) from a (key, (sum_sq, sum, n)) pair.

    Uses sqrt(E[x^2] - E[x]^2), with both means rounded to 3 decimals as in
    the rest of this notebook.
    """
    key, (sum_squares, total, n) = row
    mean_squares = round(sum_squares / n, 3)
    mean = round(total / n, 3)
    # Rounding the means can push the difference slightly below zero, which
    # would make math.sqrt raise ValueError; a variance is never negative,
    # so clamp it at 0.
    return (key, math.sqrt(max(0.0, mean_squares - mean * mean)))


# NOTE: despite its name, this RDD holds the standard deviation per tag.
duration_means_by_type = sum_counts.map(_std_from_sums)

In [11]:
# Fetch a single (tag, value) pair to check the result.
duration_means_by_type.take(1)

[('portsweep.', 7281.621862305059)]

Python 3 ya no permite desempaquetar tuplas en los parámetros de una función o de una lambda (PEP 3113).
Nos gustaría escribir:
`lambda (key, (suma_cuadrados, suma, n)): ...`
Pero no es sintaxis válida, así que optamos por la alternativa:

In [12]:
#def f(row):
#    (key, (suma_cuadrados, suma, n)) = row
#    ... other operations on the tuple
#    return

In [13]:
def agregacion(row):
    """Convert per-key totals into per-key means.

    Takes a ``(key, (sum_of_squares, sum, n))`` pair and returns
    ``(key, (mean_of_squares, mean))``, each mean rounded to 3 decimals.
    """
    key, totals = row
    suma_cuadrados, suma, n = totals
    media_cuadrados = round(suma_cuadrados / n, 3)
    media = round(suma / n, 3)
    return (key, (media_cuadrados, media))

In [14]:
def calculo_final(row):
    """Compute mean, standard deviation and variance for one key.

    Takes a ``(key, (mean_of_squares, mean))`` pair and returns
    ``(key, (mean, std_dev, variance))``, with the variance computed as
    ``mean_of_squares - mean**2`` and both results rounded to 3 decimals.
    """
    (key, (media_cuadrados, media)) = row
    # When the incoming means are already rounded, the difference can come
    # out slightly negative, which would make math.sqrt raise ValueError.
    # A variance is never negative, so clamp it at 0.
    varianza = max(0.0, round(media_cuadrados - media * media, 3))
    desv = round(math.sqrt(varianza), 3)
    return (key, (media, desv, varianza))

In [15]:
# Step 1: per-tag totals -> (mean of squares, mean).
medias_por_tipo = sum_counts.map(agregacion)
# Step 2: means -> (mean, standard deviation, variance).
duration_means_by_type_2 = medias_por_tipo.map(calculo_final)

In [16]:
# Fetch a single (tag, (mean, std_dev, variance)) pair to check the result.
duration_means_by_type_2.take(1)

[('portsweep.', (1915.299, 7281.622, 53022016.946))]

In [17]:
# Shut down the SparkContext now that the analysis is finished.
sc.stop()