# P05: Introdução ao Spark com Python


## Agregações de Dados em RDDs: `reduce`, `fold` e `aggregate`

## Como obter o `SparkContext`

In [None]:
import pyspark
sc = pyspark.SparkContext(appName="P5")
sc

## Obtendo o conjunto de dados de análise reduzido e criando o RDD

Usaremos um conjunto reduzido de dados (10%) da Copa KDD de 1999, que contém quase meio milhão de registros. O arquivo é fornecido como um *Gzip*.

In [2]:
import urllib.request as request
f = request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [3]:
nome_arquivo = "./kddcup.data_10_percent.gz"
rdd = sc.textFile(nome_arquivo)

## Inspecionando a duração da interação por tipo

Tanto `fold` quando o `reduce` aceitam uma função como argumento, que é aplicada a dois elementos do RDD.  Ao contrário da ação `reduce`, a ação `fold` recebe um valor zero inicial adicional para ser usado na chamada inicial.

Suponha que queiramos saber a duração total das interações normais e de ataque. Poderíamos usar o `reduce` da seguinte forma:

In [6]:
csv_rdd = rdd.map(lambda x: x.split(","))

# separa em dois RDDs
normal_csv_rdd = csv_rdd.filter(lambda x: x[41]=="normal.")
ataque_csv_rdd = csv_rdd.filter(lambda x: x[41]!="normal.")

A função passada para a 'redução' recebe e retorna elementos do mesmo tipo do RDD. Se quisermos a soma das durações, precisamos fazer a extração em um novo RDD.

In [10]:
soma_duracao_normal_rdd = normal_csv_rdd.map(lambda x: int(x[0]))
soma_duracao_ataque_rdd = ataque_csv_rdd.map(lambda x: int(x[0]))

Agora podemos fazer a redução em dois novos RDDs.

In [16]:
soma_duracao_normal = soma_duracao_normal_rdd.reduce(lambda x, y: x + y)
soma_duracao_ataque = soma_duracao_ataque_rdd.reduce(lambda x, y: x + y)

print("A soma das durações das interações normais é {} horas".format(round(soma_duracao_normal/3600,1)))
print("A soma das durações das interações de ataque é {} horas".format(round(soma_duracao_ataque/3600,1)))

A soma das durações das interações normais é 5854.4 horas
A soma das durações das interações de ataque é 729.7 horas


Indo além, podemos utilizar contagens para calcular a duração média.

In [20]:
contagem_normal = soma_duracao_normal_rdd.count()
contagem_ataque = soma_duracao_ataque_rdd.count()

print("A duração média das interações normais é {} segundos".format(round(soma_duracao_normal/float(contagem_normal),3)))
print("A duração média das interações de ataque é {} segundos".format(round(soma_duracao_ataque/float(contagem_ataque),3)))

A duração média das interações normais é 216.657 segundos
A duração média das interações de ataque é 6.621 segundos


Embora funcione, essa abordagem para detectar interações de ataque ainda é bem simplista

## Usando `aggregate`  

A ação `aggregate` nos livra da restrição de ter que retornar elementos do mesmo tipo do RDD. Assim como no `fold`, fornecemos um valor de zero inicial. Então, fornecemos duas funções. A primeira é utilizada para combinar os elementos do nosso RDD com um acumulador. A segunda função é necessária para mesclar dois acumuladores.

In [22]:
contagem_soma_normal = soma_duracao_normal_rdd.aggregate(
    (0,0), # o valor inicial
    (lambda acum, valor: (acum[0]+valor, acum[1]+1)), # combina valor com o acumulador
    (lambda acum1, acum2: (acum1[0] + acum2[0], acum1[1] + acum2[1])) # combina acumuladores
)

duracao_media_normal = round(contagem_soma_normal[0] / float(contagem_soma_normal[1]), 3)
print("A duração média das interações normais é {} segundos".format(duracao_media_normal))

A duração média das interações normais é 216.657 segundos


Na agregação anterior, o primeiro elemento do acumulador guarda a soma total ao passo que o segundo elemento guarda a contagem; Combinar o acumulador com o elemento do RDD consiste em somar os valores e incrementar a contagem. Combinar dois acumuladores requer apenas uma soma de dois fatores.

Podemos fazer o mesmo com as interações de ataque.

In [25]:
contagem_soma_ataque = soma_duracao_ataque_rdd.aggregate(
    (0,0), # o valor inicial
    (lambda acum, valor: (acum[0] + valor, acum[1] + 1)), # combina valor com o acumulador
    (lambda acum1, acum2: (acum1[0] + acum2[0], acum1[1] + acum2[1])) # combina acumuladores
)

duracao_media_ataque = round(contagem_soma_ataque[0]/float(contagem_soma_ataque[1]), 3)
print("A duração média para interações de ataque é {} segundos".format(duracao_media_ataque))

A duração média para interações de ataque é 6.621 segundos
