<a href="https://colab.research.google.com/github/nortonvanz/PySpark-Basics/blob/main/notebooks/Case_1_PySpark_com_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

**Spark + Python = PySpark**

Esse notebook introduz os conceitos básicos do Spark através de sua interface com a linguagem Python. Como aplicação inicial faremos o clássico examplo de contador de palavras . Com esse exemplo é possível entender a lógica de programação funcional para as diversas tarefas de exploração de dados distribuídos.
Para isso utilizaremos o livro texto Trabalhos completos de William Shakespeare obtidos do Projeto Gutenberg. Veremos que esse mesmo algoritmo pode ser empregado em textos de qualquer tamanho.

**Esse notebook contém:**

Parte 1: Criando uma base RDD e RDDs de tuplas

Parte 2: Manipulando RDDs de tuplas

Parte 3: Encontrando palavras únicas e calculando médias

Parte 4: Aplicar contagem de palavras em um arquivo

Para os exercícios é aconselhável consultar a documentação da API do PySpark

** Part 1: Criando e Manipulando RDDs **

Nessa parte do notebook vamos criar uma base RDD a partir de uma lista com o comando parallelize.

** (1a) Criando uma base RDD **

Podemos criar uma base RDD de diversos tipos e fonte do Python com o comando sc.parallelize(fonte, particoes), sendo fonte uma variável contendo os dados (ex.: uma lista) e particoes o número de partições para trabalhar em paralelo.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark import SparkFiles
from pyspark import SparkContext

In [None]:
#sempre vou criar um contexto:
spark = SparkContext.getOrCreate()
spark

In [None]:
ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
#paralelizar em 6
palavrasRDD = spark.parallelize(ListaPalavras, 6)
#mostrar o tipo da estrutura de dados: RDD
print(type(palavrasRDD))

<class 'pyspark.rdd.RDD'>


** (1b) Plural **

Vamos criar uma função que transforma uma palavra no plural adicionando uma letra 's' ao final da string. Em seguida vamos utilizar a função map() para aplicar a transformação em cada palavra do RDD.

Em Python (e muitas outras linguagens) a concatenação de strings é custosa. Uma alternativa melhor é criar uma nova string utilizando str.format().

Nota: a string entre os conjuntos de três aspas representa a documentação da função. Essa documentação é exibida com o comando help(). Vamos utilizar a padronização de documentação sugerida para o Python, manteremos essa documentação em inglês.

In [None]:
# EXERCICIO
def Plural(palavra):
    """Adds an 's' to `palavra`.

    Args:
        palavra (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return f"{palavra}s"



print (Plural('gato'))

gatos


In [None]:
help(Plural)

Help on function Plural in module __main__:

Plural(palavra)
    Adds an 's' to `palavra`.
    
    Args:
        palavra (str): A string.
    
    Returns:
        str: A string with 's' added to it.



In [None]:
#teste unitário: testa o que tá a direita do assert. Se for falso, exibe o que tá no segudno parâmetro: 'resultado incorreto!'
assert Plural('rato')=='ratos', 'resultado incorreto!'
print ('OK')

OK


** (1c) Aplicando a função ao RDD **

Transforme cada palavra do nosso RDD em plural usando map()

Em seguida, utilizaremos o comando collect() que retorna a RDD como uma lista do Python.

In [None]:
# EXERCICIO
pluralRDD = palavrasRDD.map(lambda x : Plural(x))
print (pluralRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [None]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print('OK')

OK


** Nota: ** utilize o comando collect() apenas quando tiver certeza de que a lista caberá na memória. Para gravar os resultados de volta em arquivo texto ou base de dados utilizaremos outro comando.

** (1d) Utilizando uma função lambda **

Repita a criação de um RDD de plurais, porém utilizando uma função lambda.


In [None]:
# EXERCICIO
pluralLambdaRDD = palavrasRDD.map(lambda x : f"{x}s")
print (pluralLambdaRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [None]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print ('OK')

OK


** (1e) Tamanho de cada palavra **

Agora use map() e uma função lambda para retornar o número de caracteres em cada palavra. Utilize collect() para armazenar o resultado em forma de listas na variável destino.

In [None]:
# EXERCICIO
pluralTamanho = (pluralRDD.map(lambda x : len(x)).collect() )
print (pluralTamanho)

[5, 9, 5, 5, 5]


In [None]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print ("OK")

OK



** (1f) RDDs de pares e tuplas **

Para contar a frequência de cada palavra de maneira distribuída, primeiro devemos atribuir um valor para cada palavra do RDD. Isso irá gerar um base de dados (chave, valor). Desse modo podemos agrupar a base através da chave, calculando a soma dos valores atribuídos. No nosso caso, vamos atribuir o valor 1 para cada palavra.

Um RDD contendo a estrutura de tupla chave-valor (k,v) é chamada de RDD de tuplas ou pair RDD.

Vamos criar nosso RDD de pares usando a transformação map() com uma função lambda().

In [None]:

# EXERCICIO
palavraPar = palavrasRDD.map(lambda x : (x, 1))
print (palavraPar.collect())

[('gato', 1), ('elefante', 1), ('rato', 1), ('rato', 1), ('gato', 1)]


In [None]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print ("OK")


OK


# **Parte 2: Manipulando RDD de tuplas **

Vamos manipular nossa RDD para contar as palavras do texto.

** (2a) Função groupByKey() **

A função groupByKey() agrupa todos os valores de um RDD através da chave (primeiro elemento da tupla) agregando os valores em uma lista.
Essa abordagem tem um ponto fraco pois:

A operação requer que os dados distribuídos sejam movidos em massa para que permaneçam na partição correta.

As listas podem se tornar muito grandes. Imagine contar todas as palavras do Wikipedia: termos comuns como "a", "e" formarão uma lista enorme de valores que pode não caber na memória do processo escravo.


In [None]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey().mapValues(list)
print(palavrasGrupo.collect())


[('elefante', [1]), ('rato', [1, 1]), ('gato', [1, 1])]


In [None]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) == [('elefante', [1]), ('gato',[1, 1]), ('rato',[1, 1])], 'Valores incorretos!'
print ("OK")

OK


* (2b) Calculando as contagens **

Após o groupByKey() nossa RDD contém elementos compostos da palavra, como chave, e um iterador contendo todos os valores correspondentes aquela chave.

Utilizando a transformação mapValues() e a função sum(), contrua um novo RDD que consiste de tuplas (chave, soma).

In [None]:
# EXERCICIO
#contagemGroup = palavrasGrupo.map(lambda x : (x[0], sum(x[1])))
def f(x): return sum(x)
contagemGroup = palavrasGrupo.mapValues(f)


print (contagemGroup.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [None]:
 assert list(sorted(contagemGroup.collect()))==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


** (2c) reduceByKey **

Um comando mais interessante para a contagem é o reduceByKey() que cria uma nova RDD de tuplas.

Essa transformação aplica a transformação reduce() vista na aula anterior para os valores de cada chave. Dessa forma, a função de transformação pode ser aplicada em cada partição local para depois ser enviada para redistribuição de partições, reduzindo o total de dados sendo movidos e não mantendo listas grandes na memória.

In [None]:
from operator import add
# EXERCICIO
contagem = palavraPar.reduceByKey(add)
print (contagem.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [None]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


** (2d) Agrupando os comandos **

A forma mais usual de realizar essa tarefa, partindo do nosso RDD palavrasRDD, é encadear os comandos map e reduceByKey em uma linha de comando.

In [None]:
palavrasRDD.collect()

['gato', 'elefante', 'rato', 'rato', 'gato']

In [None]:
# EXERCICIO
contagemFinal = (palavrasRDD
                 .map(lambda x : (x, 1))
                 .reduceByKey(lambda x, y : x+y)
                 )
print (contagemFinal)

PythonRDD[77] at RDD at PythonRDD.scala:53


In [None]:
assert sorted(contagemFinal)==[('elefante', 1), ('gato', 2), ('rato', 2)], 'valores incorretos!'
print ("OK")

OK


 Parte 3: Encontrando as palavras únicas e calculando a média de contagem

** (3a) Palavras Únicas **

Calcule a quantidade de palavras únicas do RDD. Utilize comandos de RDD da API do PySpark e alguma das últimas RDDs geradas nos exercícios anteriores.

In [None]:
# EXERCICIO
palavrasUnicas = contagemGroup.count()
print (palavrasUnicas)

3


In [None]:
assert palavrasUnicas==3, 'valor incorreto!'
print ("OK")

OK


** (3b) Calculando a Média de contagem de palavras **

Encontre a média de frequência das palavras utilizando o RDD contagem.

Note que a função do comando reduce() é aplicada em cada tupla do RDD. Para realizar a soma das contagens, primeiro é necessário mapear o RDD para um RDD contendo apenas os valores das frequências (sem as chaves).

In [None]:
contagemFinal.collect()

[('elefante', 1), ('rato', 2), ('gato', 2)]

In [None]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
from operator import add
total = (contagemFinal
         .map(lambda x : x[1])
         .reduce(add)
         )
media = total / float(palavrasUnicas)
print (total)
print (round(media, 2))

5
1.67


In [None]:

assert round(media, 2)==1.67, 'valores incorretos!'
print ("OK")

OK


** Parte 4: Aplicar nosso algoritmo em um arquivo **

** (4a) Função contaPalavras **

Para podermos aplicar nosso algoritmo genéricamente em diversos RDDs, vamos primeiro criar uma função para aplicá-lo em qualquer fonte de dados. Essa função recebe de entrada um RDD contendo uma lista de chaves (palavras) e retorna um RDD de tuplas com as chaves e a contagem delas nessa RDD

In [None]:
# EXERCICIO
def contaPalavras(chavesRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        chavesRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (chavesRDD
            .map(lambda x : (x, 1))
            .reduceByKey(add)
           )

print (contaPalavras(palavrasRDD).collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [None]:

assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


** (4b) Normalizando o texto **
Quando trabalhamos com dados reais, geralmente precisamos padronizar os atributos de tal forma que diferenças sutis por conta de erro de medição ou diferença de normatização, sejam desconsideradas. Para o próximo passo vamos padronizar o texto para:

* Padronizar a capitalização das palavras (tudo maiúsculo ou tudo minúsculo).

* Remover pontuação.

* Remover espaços no início e no final da palavra.

Crie uma função removerPontuacao que converte todo o texto para minúscula, remove qualquer pontuação e espaços em branco no início ou final da palavra. Para isso, utilize a biblioteca re para remover todo texto que não seja letra, número ou espaço, encadeando com as funções de string para remover espaços em branco e converter para minúscula (veja Strings).

In [None]:
# EXERCICIO
import re
def removerPontuacao(texto):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        texto (str): A string.

    Returns:
        str: The cleaned up string.
    """