# Trabalhando com RDDs de pares chave/valor

#### [Baseado em "Introduction to Spark with Python, by Jose A. Dianes"](https://github.com/jadianes/spark-py-notebooks)

O Spark fornece funções específicas para lidar com RDDs cujos elementos são pares de chave/valor. Eles geralmente são usados para realizar agregações e outros processamentos por chave.

Neste caderno, mostraremos como, ao trabalhar com pares de chave/valor, podemos processar nosso conjunto de dados de interações de rede de uma maneira mais prática e poderosa do que a usada em cadernos anteriores. As agregações de pares chave/valor se mostrarão particularmente eficazes ao tentar explorar cada tipo de tag em nossos ataques de rede, de maneira individual.

## Obtendo os dados e criando o RDD

Como fizemos em nosso primeiro notebook, usaremos o conjunto de dados reduzido (10 por cento) fornecido para a [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), contendo quase meio milhão de interações de rede. O arquivo é fornecido como um arquivo Gzip que será baixado localmente.  

In [2]:
from urllib.request import urlretrieve

f = urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [3]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

## Criando um par RDD para tipos de interação

Neste notebook, queremos fazer algumas análises exploratórias de dados em nosso conjunto de dados de interações de rede. Mais concretamente, queremos analisar cada tipo de interação de rede em termos de algumas de suas variáveis, como duração. Para fazer isso, primeiro precisamos criar o RDD adequado para isso, em que cada interação é analisada como uma linha CSV representando o valor e é colocada junto com sua tag correspondente como uma chave.

Normalmente, criamos RDDs de pares de chave/valor aplicando uma função usando `map` aos dados originais. Esta função retorna o par correspondente para um dado elemento RDD. Nós podemos proceder da seguinte forma.

In [4]:
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

Agora temos nossos dados de par chave/valor prontos para serem usados. Vamos pegar o primeiro elemento para ver como é.

In [5]:
key_value_data.take(1)

[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '181',
   '5450',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '8',
   '8',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '9',
   '9',
   '1.00',
   '0.00',
   '0.11',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

## Agregações de dados com RDDs de par chave/valor

Podemos usar todas as transformações e ações disponíveis para RDDs normais com RDDs de par chave / valor. Nós só precisamos fazer as funções trabalharem com elementos de par. Além disso, o Spark fornece funções específicas para trabalhar com RDDs contendo elementos de par. Eles são muito semelhantes aos disponíveis para RDDs gerais. 

Por exemplo, temos uma transformação `reduceByKey` que podemos usar da seguinte maneira para calcular a duração total de cada tipo de interação de rede.

In [6]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

[('normal.', 21075991.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 1991911.0),
 ('ipsweep.', 43.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 64.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

Temos uma ação de contagem específica para pares chave/valor. 

In [7]:
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(int,
            {'normal.': 97278,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 107201,
             'smurf.': 280790,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 1040,
             'ipsweep.': 1247,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 1589,
             'phf.': 4,
             'nmap.': 231,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

### Utilizando o `combineByKey`

Essa é a mais geral das funções de agregação por chave. A maioria dos outros combinadores por chave é implementada usando-o. Podemos pensar nisso como o equivalente a `aggregate`, pois permite que o usuário retorne valores que não são do mesmo tipo que nossos dados de entrada.

Por exemplo, podemos usá-lo para calcular durações médias por tipo, como segue.

In [8]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{'normal.': (21075991.0, 97278),
 'buffer_overflow.': (2751.0, 30),
 'loadmodule.': (326.0, 9),
 'perl.': (124.0, 3),
 'neptune.': (0.0, 107201),
 'smurf.': (0.0, 280790),
 'guess_passwd.': (144.0, 53),
 'pod.': (0.0, 264),
 'teardrop.': (0.0, 979),
 'portsweep.': (1991911.0, 1040),
 'ipsweep.': (43.0, 1247),
 'land.': (0.0, 21),
 'ftp_write.': (259.0, 8),
 'back.': (284.0, 2203),
 'imap.': (72.0, 12),
 'satan.': (64.0, 1589),
 'phf.': (18.0, 4),
 'nmap.': (0.0, 231),
 'multihop.': (1288.0, 7),
 'warezmaster.': (301.0, 20),
 'warezclient.': (627563.0, 1020),
 'spy.': (636.0, 2),
 'rootkit.': (1008.0, 10)}