![CMCC](http://cmcc.ufabc.edu.br/images/logo_site.jpg)
# **Spark + Python = PySpark**

#### Esse notebook introduz os conceitos básicos do Spark através de sua interface com a linguagem Python. Como aplicação inicial faremos o clássico examplo de contador de palavras . Com esse exemplo é possível entender a lógica de programação funcional para as diversas tarefas de exploração de dados distribuídos.

#### Para isso utilizaremos o livro texto [Trabalhos completos de William Shakespeare](http://www.gutenberg.org/ebooks/100) obtidos do  [Projeto Gutenberg](http://www.gutenberg.org/wiki/Main_Page).  Veremos que esse mesmo algoritmo pode ser empregado em textos de qualquer tamanho.

#### ** Esse notebook contém:  **
#### *Parte 1:* Criando uma base RDD e RDDs de tuplas
#### *Parte 2:* Manipulando RDDs de tuplas
#### *Parte 3:* Encontrando palavras únicas e calculando médias
#### *Parte 4:* Aplicar contagem de palavras em um arquivo
#### *Parte 5:* Similaridade entre Objetos
#### Para os exercícios é aconselhável consultar a documentação da [API do PySpark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

### ** Part 1: Criando e Manipulando RDDs **

#### Nessa parte do notebook vamos criar uma base RDD a partir de uma lista com o comando `parallelize`.

#### ** (1a) Criando uma base RDD **
#### Podemos criar uma base RDD de diversos tipos e fonte do Python com o comando `sc.parallelize(fonte, particoes)`, sendo fonte uma variável contendo os dados (ex.: uma lista) e particoes o número de partições para trabalhar em paralelo.

In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
sc = SparkContext.getOrCreate()

ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
palavrasRDD = sc.parallelize(ListaPalavras, 4)
print(type(palavrasRDD))

<class 'pyspark.rdd.RDD'>


#### ** (1b) Plural **

#### Vamos criar uma função que transforma uma palavra no plural adicionando uma letra 's' ao final da string. Em seguida vamos utilizar a função `map()` para aplicar a transformação em cada palavra do RDD.

#### Em Python (e muitas outras linguagens) a concatenação de strings é custosa. Uma alternativa melhor é criar uma nova string utilizando [`str.format()`](https://docs.python.org/2/library/string.html#format-string-syntax).

#### Nota: a string entre os conjuntos de três aspas representa a documentação da função. Essa documentação é exibida com o comando `help()`. Vamos utilizar a padronização de documentação sugerida para o Python, manteremos essa documentação em inglês.

In [2]:
# EXERCICIO
def Plural(palavra):
    """Adds an 's' to `palavra`.

    Args:
        palavra (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return str.format(palavra + "s")

print(Plural('gato'))

gatos


In [3]:
help(Plural)

Help on function Plural in module __main__:

Plural(palavra)
    Adds an 's' to `palavra`.
    
    Args:
        palavra (str): A string.
    
    Returns:
        str: A string with 's' added to it.



In [4]:
assert Plural('rato')=='ratos', 'resultado incorreto!'
print ('OK')

OK


#### ** (1c) Aplicando a função ao RDD **
#### Transforme cada palavra do nosso RDD em plural usando [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) 

#### Em seguida, utilizaremos o comando  [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) que retorna a RDD como uma lista do Python.

In [5]:
# EXERCICIO
pluralRDD = palavrasRDD.map(Plural)
print (pluralRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [6]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print ('OK')

OK


#### ** Nota: ** utilize o comando `collect()` apenas quando tiver certeza de que a lista caberá na memória. Para gravar os resultados de volta em arquivo texto ou base de dados utilizaremos outro comando.

#### ** (1d) Utilizando uma função `lambda` **
#### Repita a criação de um RDD de plurais, porém utilizando uma função lambda.

In [7]:
# EXERCICIO
pluralLambdaRDD = palavrasRDD.map(lambda palavra : palavra + "s")
print (pluralLambdaRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [8]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print ('OK')

OK


#### ** (1e) Tamanho de cada palavra **
#### Agora use `map()` e uma função `lambda` para retornar o número de caracteres em cada palavra. Utilize `collect()` para armazenar o resultado em forma de listas na variável destino.

In [9]:
# EXERCICIO
pluralTamanho = (pluralRDD
                 .map(lambda palavra : len(palavra))
                 .collect()
                 )
print (pluralTamanho)

[5, 9, 5, 5, 5]


In [10]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print ("OK")

OK


#### ** (1f) RDDs de pares e tuplas **

#### Para contar a frequência de cada palavra de maneira distribuída, primeiro devemos atribuir um valor para cada palavra do RDD. Isso irá gerar um base de dados (chave, valor). Desse modo podemos agrupar a base através da chave, calculando a soma dos valores atribuídos. No nosso caso, vamos atribuir o valor `1` para cada palavra.

#### Um RDD contendo a estrutura de tupla chave-valor `(k,v) ` é chamada de RDD de tuplas ou *pair RDD*.

#### Vamos criar nosso RDD de pares usando a transformação  `map()` com uma função `lambda()`.

In [11]:
# EXERCICIO
palavraPar = palavrasRDD.map(lambda palavra : (palavra, 1))
print (palavraPar.collect())

[('gato', 1), ('elefante', 1), ('rato', 1), ('rato', 1), ('gato', 1)]


In [12]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print ("Ok")

Ok


### ** Parte 2: Manipulando RDD de tuplas **

#### Vamos manipular nossa RDD para contar as palavras do texto.

#### ** (2a) Função `groupByKey()`  **

#### A função [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) agrupa todos os valores de um RDD através da chave (primeiro elemento da tupla) agregando os valores em uma lista.

#### Essa abordagem tem um ponto fraco pois:
  + #### A operação requer que os dados distribuídos sejam movidos em massa para que permaneçam na partição correta.
  + #### As listas podem se tornar muito grandes. Imagine contar todas as palavras do Wikipedia: termos comuns como "a", "e" formarão uma lista enorme de valores que pode não caber na memória do processo escravo.
 

In [13]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey()
for chave, valor in palavrasGrupo.collect():
    valores = list(valor)
    print(f'{chave}: {valores}')

elefante: [1]
rato: [1, 1]
gato: [1, 1]


In [14]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) == [('elefante', [1]), ('gato',[1, 1]), ('rato',[1, 1])], 'Valores incorretos!'
print ("OK")

OK


#### ** (2b) Calculando as contagens **
#### Após o  `groupByKey()` nossa RDD contém elementos compostos da palavra, como chave, e um iterador contendo todos os valores correspondentes aquela chave.
#### Utilizando a transformação `map()` e a função `sum()`, contrua um novo RDD que consiste de tuplas (chave, soma).

In [15]:
# EXERCICIO
contagemGroup = palavrasGrupo.mapValues(lambda x : sum(x))
print (contagemGroup.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [16]:
assert sorted(contagemGroup.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### ** (2c) `reduceByKey` **
#### Um comando mais interessante para a contagem é o  [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) que cria uma nova RDD de tuplas.

#### Essa transformação aplica a transformação `reduce()` vista na aula anterior para os valores de cada chave. Dessa forma, a função de transformação pode ser aplicada em cada partição local para depois ser enviada para redistribuição de partições, reduzindo o total de dados sendo movidos e não mantendo listas grandes na memória.

In [17]:
# EXERCICIO
contagem = palavraPar.reduceByKey(lambda x,y:x+y)
print( contagem.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [18]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### ** (2d) Agrupando os comandos **

#### A forma mais usual de realizar essa tarefa, partindo do nosso RDD palavrasRDD, é encadear os comandos map e reduceByKey em uma linha de comando.

In [19]:
# EXERCICIO
contagemFinal = (palavrasRDD
                 .map(lambda palavra : (palavra, 1))
                 .reduceByKey(lambda x,y:x+y)                
                 )
print (contagemFinal.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [20]:
assert sorted(contagemFinal.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


### ** Parte 3: Encontrando as palavras únicas e calculando a média de contagem **

#### ** (3a) Palavras Únicas **

#### Calcule a quantidade de palavras únicas do RDD. Utilize o mesmo pipeline anterior porém substituindo `.collect()` por outro método do API da RDD.

In [21]:
# EXERCICIO
palavrasUnicas = (contagemFinal
                  .count()
                 )
print (palavrasUnicas)

3


In [22]:
assert palavrasUnicas==3, 'valor incorreto!'
print ("OK")

OK


#### ** (3b) Calculando a Média de contagem de palavras **

#### Encontre a média de frequência das palavras utilizando o RDD `contagem`.

#### Note que a função do comando `reduce()` é aplicada em cada tupla do RDD. Para realizar a soma das contagens, primeiro é necessário mapear o RDD para um RDD contendo apenas os valores das frequências (sem as chaves).

In [23]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
from operator import add
total = (contagemFinal
         .map(lambda x: x[1])
         .reduce(add)         
         )
media = total / float(palavrasUnicas)
print (total)
print (round(media, 2))

5
1.67


In [24]:
assert round(media, 2)==1.67, 'valores incorretos!'
print ("OK")

OK


### ** Parte 4: Aplicar nosso algoritmo em um arquivo **

#### ** (4a) Função `contaPalavras` **

#### Para podermos aplicar nosso algoritmo genéricamente em diversos RDDs, vamos primeiro criar uma função para aplicá-lo em qualquer fonte de dados. Essa função recebe de entrada um RDD contendo uma lista de chaves (palavras) e retorna um RDD de tuplas com as chaves e a contagem delas nessa RDD

In [25]:
# EXERCICIO
def contaPalavras(chavesRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        chavesRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (chavesRDD
            .map(lambda palavra : (palavra, 1))
            .reduceByKey(add)
           )

print (contaPalavras(palavrasRDD).collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [26]:
assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### ** (4b) Normalizando o texto **

#### Quando trabalhamos com dados reais, geralmente precisamos padronizar os atributos de tal forma que diferenças sutis por conta de erro de medição ou diferença de normatização, sejam desconsideradas. Para o próximo passo vamos padronizar o texto para:
  + #### Padronizar a capitalização das palavras (tudo maiúsculo ou tudo minúsculo).
  + #### Remover pontuação.
  + #### Remover espaços no início e no final da palavra.
 
#### Crie uma função `removerPontuacao` que converte todo o texto para minúscula, remove qualquer pontuação e espaços em branco no início ou final da palavra. Para isso, utilize a biblioteca [re](https://docs.python.org/2/library/re.html) para remover todo texto que não seja letra, número ou espaço, encadeando com as funções de string para remover espaços em branco e converter para minúscula (veja [Strings](https://docs.python.org/2/library/stdtypes.html?highlight=str.lower#string-methods)).

In [27]:
# EXERCICIO
import re
def removerPontuacao(texto):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        texto (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub(r'[^A-Za-z0-9 ]', '', texto).strip().lower()
print (removerPontuacao('Ola, quem esta ai??!'))
print (removerPontuacao(' Sem espaco e_sublinhado!'))

ola quem esta ai
sem espaco esublinhado


In [28]:
assert removerPontuacao(' O uso de virgulas, embora permitido, nao deve contar. ')=='o uso de virgulas embora permitido nao deve contar', 'string incorreta!'
print ("OK")

OK


#### ** (4c) Carregando arquivo texto  **

#### Para a próxima parte vamos utilizar o livro [Trabalhos completos de William Shakespeare](http://www.gutenberg.org/ebooks/100) do [Projeto Gutenberg](http://www.gutenberg.org/wiki/Main_Page). 

#### Para converter um texto em uma RDD, utilizamos a função `textFile()` que recebe como entrada o nome do arquivo texto que queremos utilizar e o número de partições.

#### O nome do arquivo texto pode se referir a um arquivo local ou uma URI de arquivo distribuído (ex.: hdfs://).

#### Vamos também aplicar a função `removerPontuacao()` para normalizar o texto e verificar as 15 primeiras linhas com o comando `take()`.

In [29]:
# Apenas execute a célula
import os.path

arquivo = os.path.join('Data', 'pg100.txt') 

# lê o arquivo com textFile e aplica a função removerPontuacao        
shakesRDD = (sc
             .textFile(arquivo, 8)
             .map(removerPontuacao)
             )

# zipWithIndex gera tuplas (conteudo, indice) onde indice é a posição do conteudo na lista sequencial
# Ex.: sc.parallelize(['gato','cachorro','boi']).zipWithIndex() ==> [('gato',0), ('cachorro',1), ('boi',2)]
# sep.join() junta as strings de uma lista através do separador sep. Ex.: ','.join(['a','b','c']) ==> 'a,b,c'
print ('\n'.join(shakesRDD
                .zipWithIndex()
                .map(lambda linha: '{0}: {1}'.format(linha[0],linha[1]))
                .take(15)
               ))

: 0
project gutenbergs the complete works of william shakespeare by william: 1
shakespeare: 2
: 3
this ebook is for the use of anyone anywhere in the united states and: 4
most other parts of the world at no cost and with almost no restrictions: 5
whatsoever  you may copy it give it away or reuse it under the terms: 6
of the project gutenberg license included with this ebook or online at: 7
wwwgutenbergorg  if you are not located in the united states youll: 8
have to check the laws of the country where you are located before using: 9
this ebook: 10
: 11
see at the end of this file  content note added in 2017: 12
: 13
: 14


#### ** (4d) Extraindo as palavras **
#### Antes de poder usar nossa função  `contaPalavras()`, temos ainda que trabalhar em cima da nossa RDD:
  + #### Precisamos gerar listas de palavras ao invés de listas de sentenças.
  + #### Eliminar linhas vazias.
 
#### As strings em Python tem o método [split()](https://docs.python.org/2/library/string.html#string.split) que faz a separação de uma string por separador. No nosso caso, queremos separar as strings por espaço. 

#### Utilize a função `map()` para gerar um novo RDD como uma lista de palavras.

In [30]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.map(lambda x: x.split())
total = shakesPalavrasRDD.count()
print (shakesPalavrasRDD.take(5))
print (total)

[[], ['project', 'gutenbergs', 'the', 'complete', 'works', 'of', 'william', 'shakespeare', 'by', 'william'], ['shakespeare'], [], ['this', 'ebook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'in', 'the', 'united', 'states', 'and']]
147929


#### Conforme deve ter percebido, o uso da função `map()` gera uma lista para cada linha, criando um RDD contendo uma lista de listas.

#### Para resolver esse problema, o Spark possui uma função análoga chamada `flatMap()` que aplica a transformação do `map()`, porém *achatando* o retorno em forma de lista para uma lista unidimensional.

In [31]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.flatMap(lambda x: x.split())
total = shakesPalavrasRDD.count()
print (shakesPalavrasRDD.take(5))
print (total)

['project', 'gutenbergs', 'the', 'complete', 'works']
959359


In [32]:
assert total==959359, "valor incorreto de palavras!"
print ("OK")
assert shakesPalavrasRDD.take(5)==['project', 'gutenbergs', 'the', 'complete', 'works'],'lista incorreta de palavras'
print ("OK")

OK
OK


#### Reparem que o `flatMap` já eliminou as entradas vazias.

#### ** (4f) Contagem de palavras **
#### Agora que nossa RDD contém uma lista de palavras, podemos aplicar nossa função `contaPalavras()`.

#### Aplique a função em nossa RDD e utilize a função `takeOrdered` para imprimir as 15 palavras mais frequentes.

#### `takeOrdered()` pode receber um segundo parâmetro que instrui o Spark em como ordenar os elementos. Ex.:

#### `takeOrdered(15, key=lambda x: -x)`: ordem decrescente dos valores de x

In [33]:
# EXERCICIO
top15 = contaPalavras(shakesPalavrasRDD).takeOrdered(15, key=lambda x: -x[1])
print ('\n'.join(map(lambda x: f'{x[0]}: {x[1]}', top15)))

the: 29996
and: 28353
i: 21860
to: 20885
of: 18811
a: 15992
you: 14439
my: 13191
in: 12027
that: 11782
is: 9711
not: 9068
with: 8521
me: 8271
for: 8184


In [34]:
assert top15 == [('the', 29996), ('and', 28353), ('i', 21860), ('to', 20885), ('of', 18811), ('a', 15992), ('you', 14439), ('my', 13191), ('in', 12027), ('that', 11782), ('is', 9711), ('not', 9068), ('with', 8521), ('me', 8271), ('for', 8184)],'valores incorretos!'
print ("OK")

OK


### ** Parte 5: Similaridade entre Objetos **

### Nessa parte do laboratório vamos aprender a calcular a distância entre atributos numéricos, categóricos e textuais.

#### ** (5a) Vetores no espaço Euclidiano **

#### Quando nossos objetos são representados no espaço Euclidiano, medimos a similaridade entre eles através da *p-Norma* definida por:

#### $$d(x,y,p) = (\sum_{i=1}^{n}{|x_i - y_i|^p})^{1/p}$$

#### As normas mais utilizadas são $p=1,2,\infty$ que se reduzem em distância absoluta, Euclidiana e máxima distância:

#### $$d(x,y,1) = \sum_{i=1}^{n}{|x_i - y_i|}$$

#### $$d(x,y,2) = (\sum_{i=1}^{n}{|x_i - y_i|^2})^{1/2}$$

#### $$d(x,y,\infty) = \max(|x_1 - y_1|,|x_2 - y_2|, ..., |x_n - y_n|)$$

In [35]:
import numpy as np

# Vamos criar uma função pNorm que recebe como parâmetro p e retorna uma função que calcula a pNorma
def pNorm(p):
    """Generates a function to calculate the p-Norm between two points.

    Args:
        p (int): The integer p.

    Returns:
        Dist: A function that calculates the p-Norm.
    """

    def Dist(x,y):
        return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
    return Dist

In [36]:
# Vamos criar uma RDD com valores numéricos
np.random.seed(42)
numPointsRDD = sc.parallelize(enumerate(np.random.random(size=(10,100))))

In [37]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartPointsRDD = numPointsRDD.cartesian(numPointsRDD)

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
#print(cartPointsRDD.take(1))
cartPointsParesRDD = cartPointsRDD.map(lambda x: ((x[0][0], x[1][0]), (x[0][1], x[1][1])))

# Aplique um mapa para calcular a Distância Euclidiana entre os pares
Euclid = pNorm(2)
distRDD = cartPointsParesRDD.map(lambda x: (x[0], Euclid(x[1][0], x[1][1])))

# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statRDD = distRDD.map(lambda x: x[1])

minv, maxv, meanv = statRDD.min(), statRDD.max(), statRDD.mean()
print (minv, maxv, meanv)

0.0 4.709048183663605 3.75119168897537


In [38]:
assert (minv.round(2), maxv.round(2), meanv.round(2))==(0.0, 4.71, 3.75), 'Valores incorretos'
print ("OK")

OK


#### ** (5b) Valores Categóricos **

#### Quando nossos objetos são representados por atributos categóricos, eles não possuem uma similaridade espacial. Para calcularmos a similaridade entre eles podemos primeiro transformar nosso vetor de atrbutos em um vetor binário indicando, para cada possível valor de cada atributo, se ele possui esse atributo ou não.

#### Com o vetor binário podemos utilizar a distância de Hamming  definida por:

#### $$ H(x,y) = \sum_{i=1}^{n}{x_i != y_i} $$

#### Também é possível definir a distância de Jaccard como:

#### $$ J(x,y) = \frac{\sum_{i=1}^{n}{x_i == y_i} }{\sum_{i=1}^{n}{\max(x_i, y_i}) } $$

In [39]:
# Vamos criar uma função para calcular a distância de Hamming
def Hamming(x,y):
    """Calculates the Hamming distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        H (int): The Hamming distance between x and y.
    """
    return (x!=y).sum()

# Vamos criar uma função para calcular a distância de Jaccard
def Jaccard(x,y):
    """Calculates the Jaccard distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        J (int): The Jaccard distance between x and y.
    """
    return (x==y).sum()/float( np.maximum(x,y).sum() )

In [40]:
# Vamos criar uma RDD com valores categóricos
catPointsRDD = sc.parallelize(enumerate([['alto', 'caro', 'azul'],
                             ['medio', 'caro', 'verde'],
                             ['alto', 'barato', 'azul'],
                             ['medio', 'caro', 'vermelho'],
                             ['baixo', 'barato', 'verde'],
                            ]))

In [41]:
# EXERCICIO
# Crie um RDD de chaves únicas utilizando flatMap
chavesRDD = (catPointsRDD
             .flatMap(lambda palavra : palavra[1])
             .map(lambda palavra : (palavra,1))
             .reduceByKey(add) 
             .map(lambda palavra: palavra[0])
            )

print(chavesRDD.collect())             

chaves = dict((v,k) for k,v in enumerate(chavesRDD.collect()))
nchaves = len(chaves)
print(chaves, nchaves)

['baixo', 'barato', 'alto', 'medio', 'verde', 'vermelho', 'caro', 'azul']
{'baixo': 0, 'barato': 1, 'alto': 2, 'medio': 3, 'verde': 4, 'vermelho': 5, 'caro': 6, 'azul': 7} 8


In [42]:
assert chaves=={'baixo': 0, 'barato': 1, 'alto': 2, 'medio': 3, 'verde': 4, 'vermelho': 5, 'caro': 6, 'azul': 7}, 'valores incorretos!'
print ("OK")

assert nchaves==8, 'número de chaves incorreta'
print ("OK")

OK
OK


In [43]:
def CreateNP(atributos,chaves):  
    """Binarize the categorical vector using a dictionary of keys.

    Args:
        atributos (list): List of attributes of a given object.
        chaves (dict): dictionary with the relation attribute -> index

    Returns:
        array (np.array): Binary array of attributes.
    """
    
    array = np.zeros(len(chaves))
    for atr in atributos:
        array[ chaves[atr] ] = 1
    return array

# Converte o RDD para o formato binário, utilizando o dict chaves
binRDD = catPointsRDD.map(lambda rec: (rec[0],CreateNP(rec[1], chaves)))
binRDD.collect()

[(0, array([0., 0., 1., 0., 0., 0., 1., 1.])),
 (1, array([0., 0., 0., 1., 1., 0., 1., 0.])),
 (2, array([0., 1., 1., 0., 0., 0., 0., 1.])),
 (3, array([0., 0., 0., 1., 0., 1., 1., 0.])),
 (4, array([1., 1., 0., 0., 1., 0., 0., 0.]))]

In [44]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartBinRDD = binRDD.cartesian(binRDD)

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
#print(cartBinRDD.take(1))
cartBinParesRDD = cartBinRDD.map(lambda x: ((x[0][0], x[1][0]), (x[0][1], x[1][1])))

# Aplique um mapa para calcular a Distância de Hamming e Jaccard entre os pares
hamRDD = cartBinParesRDD.map(lambda x: Hamming(x[1][0], x[1][1]))
jacRDD = cartBinParesRDD.map(lambda x: Jaccard(x[1][0], x[1][1]))

# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statHRDD = hamRDD.map(lambda x : x)
statJRDD = jacRDD.map(lambda x : x)

Hmin, Hmax, Hmean = statHRDD.min(), statHRDD.max(), statHRDD.mean()
Jmin, Jmax, Jmean = statJRDD.min(), statJRDD.max(), statJRDD.mean()

print ("\t\tMin\tMax\tMean")
print ("Hamming:\t{:.2f}\t{:.2f}\t{:.2f}".format(Hmin, Hmax, Hmean ))
print ("Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean ))

		Min	Max	Mean
Hamming:	0.00	6.00	3.52
Jaccard:	0.33	2.67	1.14


In [45]:
assert (Hmin.round(2), Hmax.round(2), Hmean.round(2)) == (0.00,6.00,3.52), 'valores incorretos'
print ("OK")
assert (Jmin.round(2), Jmax.round(2), Jmean.round(2)) == (0.33,2.67,1.14), 'valores incorretos'
print ("OK")

OK
OK
