# Transformações RDD e ações

Nesta palestra, começaremos a aprofundar o uso de Spark e Python. Por favor, veja os videos para uma explicação completa.

## Termos importantes

Vamos rapidamente passar por alguns termos importantes:

Termo                  |Definição
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformação          |Operação Spark que produz um RDD
Ação                   |Operação Spark que produz um objeto local
Spark Job              |Sequência de transformações em dados com uma ação final

## Criando um RDD

Existem duas formas comuns de criar um RDD:

Método                                   |Resultado
----------                               |-------
`sc.parallelize(array)`                  |Cria um array de elementos RDD
`sc.textFile(path/to/file)`                      |Cria um RDD de linhas de um arquivo

## Transformação RDD

Podemos usar transformações para criar um conjunto de instruções que queremos executar no RDD (antes de chamar uma ação e realmente executá-las).

Exemplo de Transformação | Resultado
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Descarte elementos não-pares
`map(lambda x: x * 2)`                   |Multiplica cada elemento RDD por `2`
`map(lambda x: x.split())`               |Divida cada string em palavras
`flatMap(lambda x: x.split())`           |Divide cada string em palavras e aplique a seqüência
`sample(withReplacement=True,0.25)`      |Crie uma amostra de 25% dos elementos com substituição
`union(rdd)`                             |Anexar `rdd` ao RDD existente
`distinct()`                             |Remover duplicatas no RDD
`sortBy(lambda x: x, ascending=False)`   |Classificar elementos em ordem decrescente

## Ações RDD

Depois de ter sua "receita" de transformações pronta, o que você fará em seguida é executá-las chamando uma ação. Aqui estão algumas ações comuns:

Ação                             |Resultado
----------                             |-------
`collect()`                            |Converta o RDD em uma lista na memória
`take(3)`                              |Pega os 3 primeiros elementos do RDD
`top(3)`                               |Pega os top 3 elementos do RDD
`takeSample(withReplacement=True,3)`   |Crie uma amostra de 3 elementos com substituição
`sum()`                                |Soma (Assume elementos numéricos)
`mean()`                               |Média (Assume elementos numéricos)
`stdev()`                              |Desvio padrão (Assume elementos numéricos)

____
# Exemplos

Agora, a melhor maneira de mostrar tudo isso é através de exemplos! Primeiro, revisaremos um pouco criando e trabalhando com um arquivo de texto simples, então passaremos a dados mais realistas, como clientes e dados de vendas.

### Criando um RDD a partir de um arquivo de texto:

** Criando o arquivo de texto **

In [1]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


Agora vamos fazer algumas ações nesse arquivo

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# Mostre o RDD
sc.textFile('example2.txt')

MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [5]:
# Salve a referência desse RDD
text_rdd = sc.textFile('example2.txt')

In [7]:
# Mapeie a função para cada linha
# E então colete os resultados
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

## Map vs flatMap

In [8]:
# Colecione tudo como um único map
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

# RDDs e pares de valores-chave

Agora que já trabalhamos com RDDs e sabemos como agregar valores com eles, podemos começar a procurar trabalhar com Key Value Pairs. Para fazer isso, vamos criar alguns dados falsos como um novo arquivo de texto.

Estes dados representam alguns serviços vendidos aos clientes para alguns negócios SAAS.

In [9]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [10]:
services = sc.textFile('services.txt')

In [11]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [12]:
services.map(lambda x: x.split())

PythonRDD[10] at RDD at PythonRDD.scala:43

In [13]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

Vamos remover essa primeira hash-tag!

In [26]:
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [27]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

## Usando pares de valores-chave para operações

Vamos agora começar a usar métodos que combinam expressões lambda que usam um argumento ByKey. Esses métodos ByKey assumirão que seus dados estão em um formulário Chave, Valor.

Por exemplo, vamos descobrir as vendas totais por estado:

In [28]:
# Do anterior
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())

In [29]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [52]:
# Comecemos praticando apanhando campos
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [43]:
# Continue com reductionByKey
# Observe como ele assume que o primeiro item é a chave!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Uh oh! Parece que esquecemos que os montantes ainda são strings! Vamos consertar isso:

In [42]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

Podemos continuar nossa análise ordenando esse resultado:

In [69]:
# Puxa o estado e a quantidade
# Adicione-os
# Livre-se de ('Estado', 'Montante')
# Classifique-os pelo valor estimado
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

** Lembre-se de tentar usar o desmembramento para facilitar a leitura. Por exemplo: **

In [78]:
x = ['ID','State','Amount']

In [79]:
def func1(lst):
    return lst[-1]

In [83]:
def func2(id_st_amt):
    # Desmembre os valores
    (Id,st,amt) = id_st_amt
    return amt

In [84]:
func1(x)

'Amount'

In [85]:
func2(x)

'Amount'