## FOR GOOGLE COLAB USERS

In [None]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz
!tar xf spark-2.4.2-bin-hadoop2.7.tgzdf!pip install -q findspark

In [None]:
!java -version

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.11.0-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/content/spark-2.4.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Chamada da biblioteca

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

### Inicializando o Sessão do Spark - 
##### Para usuários jupyter desktop

In [2]:
spark = SparkSession.builder \
                    .appName("Tutorial PySpark SQL") \
                    .config("spark.some.config.option", "some-value") \
                    .getOrCreate()

### Lendo dataframe com spark DataFrame

In [3]:
df = spark.read.csv('DNPBA2017.csv', header=True)

### Identificando a estrutura da base

In [4]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- NUMERODN: string (nullable = true)
 |-- CODINST: string (nullable = true)
 |-- ORIGEM: string (nullable = true)
 |-- NUMERODV: string (nullable = true)
 |-- PREFIXODN: string (nullable = true)
 |-- CODESTAB: string (nullable = true)
 |-- CODMUNNASC: string (nullable = true)
 |-- LOCNASC: string (nullable = true)
 |-- IDADEMAE: string (nullable = true)
 |-- ESTCIVMAE: string (nullable = true)
 |-- ESCMAE: string (nullable = true)
 |-- CODOCUPMAE: string (nullable = true)
 |-- QTDFILVIVO: string (nullable = true)
 |-- QTDFILMORT: string (nullable = true)
 |-- CODMUNRES: string (nullable = true)
 |-- GESTACAO: string (nullable = true)
 |-- GRAVIDEZ: string (nullable = true)
 |-- PARTO: string (nullable = true)
 |-- CONSULTAS: string (nullable = true)
 |-- DTNASC: string (nullable = true)
 |-- HORANASC: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- APGAR1: string (nullable = true)
 |-- APGAR5: string (nullable = true)
 |-- RA

### Quantificando os registros da base

In [5]:
df.count()

203715

### Quantificando municipios distintos

In [44]:
df.select('CODMUNRES').distinct().count()

418

### Retornando sexo e data de nascimento dos nascidos

In [45]:
df.select('SEXO', 'DTNASC').show()

+----+--------+
|SEXO|  DTNASC|
+----+--------+
|   1|24012017|
|   1|27032017|
|   1|29052017|
|   1|19012017|
|   2|23012017|
|   1|02032017|
|   2|27032017|
|   1|11062017|
|   2|23022017|
|   1|24012017|
|   2|07022017|
|   2|27032017|
|   1|28032017|
|   2|29032017|
|   2|19052017|
|   1|23032017|
|   1|02012017|
|   1|23012017|
|   1|08012017|
|   2|25042017|
+----+--------+
only showing top 20 rows



### Retornando sexo e data de nascimento da 3 crianças mais velhas da base

In [46]:
df.select('SEXO', 'DTNASC').orderBy('DTNASC').take(3)

[Row(SEXO=u'2', DTNASC=u'01012017'),
 Row(SEXO=u'2', DTNASC=u'01012017'),
 Row(SEXO=u'2', DTNASC=u'01012017')]

In [47]:
df.select('SEXO', 'DTNASC').orderBy('DTNASC').show(3)

+----+--------+
|SEXO|  DTNASC|
+----+--------+
|   2|01012017|
|   1|01012017|
|   1|01012017|
+----+--------+
only showing top 3 rows



### Retornando so o primeiro registo entre as crianças mais velhas

In [48]:
df.select('SEXO', 'DTNASC').orderBy('DTNASC').first()

Row(SEXO=u'2', DTNASC=u'01012017')

### Contar os nascimentos que ocorreram em 01 de 01 de 2017

In [49]:
df.filter(F.col('DTNASC') == '01012017').count()

374

### Retornando os nascimentos que ocorreram em Julho

In [50]:
df.select('DTNASC', F.when(F.substring(F.col('DTNASC'), 2,4) == '07', 1).otherwise(0)).show()

+--------+----------------------------------------------------------+
|  DTNASC|CASE WHEN (substring(DTNASC, 2, 4) = 07) THEN 1 ELSE 0 END|
+--------+----------------------------------------------------------+
|24012017|                                                         0|
|27032017|                                                         0|
|29052017|                                                         0|
|19012017|                                                         0|
|23012017|                                                         0|
|02032017|                                                         0|
|27032017|                                                         0|
|11062017|                                                         0|
|23022017|                                                         0|
|24012017|                                                         0|
|07022017|                                                         0|
|27032017|          

### Criando coluna com o mês de nascimento 

In [51]:
df = df.withColumn('MESNASC', F.substring(F.col('DTNASC'), 3,2))

### Criando coluna através de alias

In [52]:
df.select(F.substring(F.col('DTNASC'), 3, 2).alias('mes'), 'DTNASC').show()

+---+--------+
|mes|  DTNASC|
+---+--------+
| 01|24012017|
| 03|27032017|
| 05|29052017|
| 01|19012017|
| 01|23012017|
| 03|02032017|
| 03|27032017|
| 06|11062017|
| 02|23022017|
| 01|24012017|
| 02|07022017|
| 03|27032017|
| 03|28032017|
| 03|29032017|
| 05|19052017|
| 03|23032017|
| 01|02012017|
| 01|23012017|
| 01|08012017|
| 04|25042017|
+---+--------+
only showing top 20 rows



### Criando e usando minha primeira UDF
#### Note que User Defined Functions precisam ser registradas para funcionar. 
#### UDFs podem ser usadas em transformações "withColum" ou "select"

In [53]:
def mesnasc(col):
    return col[2:4]
udf_mesnasc = F.udf(mesnasc, StringType())

In [54]:
df = df.withColumn('MESNASC_2', udf_mesnasc(F.col('DTNASC')))

In [55]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- NUMERODN: string (nullable = true)
 |-- CODINST: string (nullable = true)
 |-- ORIGEM: string (nullable = true)
 |-- NUMERODV: string (nullable = true)
 |-- PREFIXODN: string (nullable = true)
 |-- CODESTAB: string (nullable = true)
 |-- CODMUNNASC: string (nullable = true)
 |-- LOCNASC: string (nullable = true)
 |-- IDADEMAE: string (nullable = true)
 |-- ESTCIVMAE: string (nullable = true)
 |-- ESCMAE: string (nullable = true)
 |-- CODOCUPMAE: string (nullable = true)
 |-- QTDFILVIVO: string (nullable = true)
 |-- QTDFILMORT: string (nullable = true)
 |-- CODMUNRES: string (nullable = true)
 |-- GESTACAO: string (nullable = true)
 |-- GRAVIDEZ: string (nullable = true)
 |-- PARTO: string (nullable = true)
 |-- CONSULTAS: string (nullable = true)
 |-- DTNASC: string (nullable = true)
 |-- HORANASC: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- APGAR1: string (nullable = true)
 |-- APGAR5: string (nullable = true)
 |-- RA

In [56]:
df.select('MESNASC', 'MESNASC_2').limit(10).toPandas()

Unnamed: 0,MESNASC,MESNASC_2
0,1,1
1,3,3
2,5,5
3,1,1
4,1,1
5,3,3
6,3,3
7,6,6
8,2,2
9,1,1


### Agrupando os dados em função do mês de nascimento

In [57]:
df.groupBy('MESNASC').count().orderBy('MESNASC').show()

+-------+-----+
|MESNASC|count|
+-------+-----+
|     01|15924|
|     02|15787|
|     03|18435|
|     04|17821|
|     05|18952|
|     06|18041|
|     07|17118|
|     08|16643|
|     09|15984|
|     10|16423|
|     11|16337|
|     12|16250|
+-------+-----+



### Usando o agrupamento para conhecer a proporção de nascidos por mês

In [58]:
tot = df.count()
df.groupBy('MESNASC').count().orderBy('MESNASC').withColumn('%', F.col('count')/tot*100).show()

+-------+-----+-----------------+
|MESNASC|count|                %|
+-------+-----+-----------------+
|     01|15924|7.816802886385392|
|     02|15787|7.749552070294284|
|     03|18435|9.049407260142846|
|     04|17821|8.748005792406058|
|     05|18952|9.303193186559655|
|     06|18041|8.855999803647252|
|     07|17118|8.402915838303512|
|     08|16643|8.169746950396387|
|     09|15984|7.846255798542081|
|     10|16423|8.061752939155193|
|     11|16337| 8.01953709839727|
|     12|16250|7.976830375770071|
+-------+-----+-----------------+



### Buscando ou contando registros cujo valor do atributo inicia, termina ou está entre... 

In [59]:
df.filter(F.col('CODMUNRES').startswith("29")).count()

203715

In [60]:
df.filter(F.col('CODMUNRES').startswith("29")).select('CODMUNRES').show()

+---------+
|CODMUNRES|
+---------+
|   291110|
|   291110|
|   291955|
|   291955|
|   291955|
|   290000|
|   291955|
|   290000|
|   292440|
|   292840|
|   291110|
|   291110|
|   292840|
|   291110|
|   292840|
|   291110|
|   290590|
|   290590|
|   292440|
|   291840|
+---------+
only showing top 20 rows



In [61]:
df.filter(F.col('CODMUNRES').endswith("2740")).count()

35366

In [62]:
df.filter(F.col('CODMUNRES').endswith("2740")).select('CODMUNRES').show()

+---------+
|CODMUNRES|
+---------+
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
|   292740|
+---------+
only showing top 20 rows



In [63]:
df.filter(F.col('PESO').between('0110', '6710' )).count()

203645

### Descrevendo uma variável

In [64]:
df.describe('PESO').show()

+-------+------------------+
|summary|              PESO|
+-------+------------------+
|  count|            203715|
|   mean|3198.3266173979227|
| stddev| 575.2462242684788|
|    min|              0110|
|    max|                NA|
+-------+------------------+



### Contando valores "NA" 

In [65]:
df.filter(F.col('PESO')=='NA').count()

70

### Contando valores Nulos

In [66]:
df.filter(F.col('PESO').isNull()).count()

0

### Buscando valores "NA" em "PESO" e criando uma coluna nova para guardar os resultados
#### As demais linhas são para validar os resultados

In [67]:
df = df.withColumn('NOVO_PESO', F.when(F.col('PESO') == 'NA', None).otherwise(F.col('PESO')))

In [68]:
df.filter(F.col('PESO')=='NA').select('PESO', 'NOVO_PESO').show()

+----+---------+
|PESO|NOVO_PESO|
+----+---------+
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
|  NA|     null|
+----+---------+
only showing top 20 rows



In [69]:
df.filter(F.col('PESO')!='NA').select('PESO', 'NOVO_PESO').show()

+----+---------+
|PESO|NOVO_PESO|
+----+---------+
|3150|     3150|
|3300|     3300|
|3330|     3330|
|3610|     3610|
|3770|     3770|
|3140|     3140|
|2775|     2775|
|2490|     2490|
|3450|     3450|
|3270|     3270|
|3000|     3000|
|3200|     3200|
|3250|     3250|
|3600|     3600|
|3650|     3650|
|3190|     3190|
|2820|     2820|
|1900|     1900|
|3800|     3800|
|3130|     3130|
+----+---------+
only showing top 20 rows



### Renomeando Coluna

In [70]:
df = df.withColumnRenamed('NOVO_PESO', 'PESO_PP') #PESO PRE PROCESSADO

### Descrevendo coluna nova 

In [71]:
df.describe('PESO_PP').show()

+-------+------------------+
|summary|           PESO_PP|
+-------+------------------+
|  count|            203645|
|   mean|3198.3266173979227|
| stddev| 575.2462242684788|
|    min|              0110|
|    max|              6710|
+-------+------------------+



# PARTE 2: ROTEIRO PARA LINKAGE

### Lendo e visualizando as bases

In [84]:
datasetA = spark.read.csv('dataset_a_v3.csv', header=True, sep=';')
datasetB = spark.read.csv('dataset_b_v3.csv', header=True, sep=';')

In [85]:
print datasetA.count()
print datasetB.count()
datasetA.limit(5).show()
datasetB.limit(5).show()

100
20
+-----+--------------------+---------+------+--------------------+--------+---------------+
|cod_a|              nome_a|     dn_a|sexo_a|               mae_a|cidade_a|primeiro_nome_a|
+-----+--------------------+---------+------+--------------------+--------+---------------+
|    1|EDSON GOMES    DO...|1-01-2007|     1|WEDILAINE VIEIRA ...|  280030|          EDSON|
|    2|ALESSANDRA KAUANÈ...|1-02-2008|     2|VITORIA LUCIA AMO...|  280740|     ALESSANDRA|
|    3|DAVI GONÇALVES DA...|1-04-2007|     1| VILMA GOMES MOREIRA|  280030|           DAVI|
|    4|ALISON DE JESUS T...|1-05-2007|     2|VERA LUCIA FRANCI...|  280030|         ALISON|
|    5|DANNYEL COSTA DE ...|1-12-2006|     1|VANILDA TRINDADE ...|  280030|        DANNYEL|
+-----+--------------------+---------+------+--------------------+--------+---------------+

+-----+--------------------+----------+------+--------------------+--------+---------------+
|cod_b|              nome_b|      dn_b|sexo_b|               mae_b|cida

### Criando colunas com codigos foneticos

In [86]:
import jellyfish

In [87]:
def criaMetaphone(col):
    return jellyfish.metaphone(col)
udf_criaMetaphone = F.udf(criaMetaphone, StringType())

In [89]:
datasetA = datasetA.withColumn('phonetic_nome_a', udf_criaMetaphone(F.col('nome_a')))
datasetA = datasetA.withColumn('phonetic_mae_a', udf_criaMetaphone(F.col('mae_a')))
datasetB = datasetB.withColumn('phonetic_nome_b', udf_criaMetaphone(F.col('nome_b')))
datasetB = datasetB.withColumn('phonetic_mae_b', udf_criaMetaphone(F.col('mae_b')))

datasetA.limit(3).show()
datasetB.limit(3).show()

+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+
|cod_a|              nome_a|     dn_a|sexo_a|               mae_a|cidade_a|primeiro_nome_a|     phonetic_nome_a|  phonetic_mae_a|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+
|    1|EDSON GOMES    DO...|1-01-2007|     1|WEDILAINE VIEIRA ...|  280030|          EDSON|    ETSN KMS TS SNTS|     WTLN FR BSR|
|    2|ALESSANDRA KAUANÈ...|1-02-2008|     2|VITORIA LUCIA AMO...|  280740|     ALESSANDRA|ALSNTR KN SS TS SNTS|FTR LX AMRM T SS|
|    3|DAVI GONÇALVES DA...|1-04-2007|     1| VILMA GOMES MOREIRA|  280030|           DAVI|     TF KNKLFS T  RX|     FLM KMS MRR|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+

+-----+--------------------+----------+------+--------------------+--------+-------------

### Criando coluna com último nome

In [92]:
def criaUltimoNome(col):
    return col.split(' ')[-1]
udf_criaUltimoNome = F.udf(criaUltimoNome, StringType())

In [93]:
datasetA = datasetA.withColumn('ultimo_nome_a', udf_criaUltimoNome(F.col('nome_a')))
datasetB = datasetB.withColumn('ultimo_nome_b', udf_criaUltimoNome(F.col('nome_b')))

datasetA.limit(3).show()
datasetB.limit(3).show()

+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+-------------+
|cod_a|              nome_a|     dn_a|sexo_a|               mae_a|cidade_a|primeiro_nome_a|     phonetic_nome_a|  phonetic_mae_a|ultimo_nome_a|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+-------------+
|    1|EDSON GOMES    DO...|1-01-2007|     1|WEDILAINE VIEIRA ...|  280030|          EDSON|    ETSN KMS TS SNTS|     WTLN FR BSR|       SANTOS|
|    2|ALESSANDRA KAUANÈ...|1-02-2008|     2|VITORIA LUCIA AMO...|  280740|     ALESSANDRA|ALSNTR KN SS TS SNTS|FTR LX AMRM T SS|       SANTOS|
|    3|DAVI GONÇALVES DA...|1-04-2007|     1| VILMA GOMES MOREIRA|  280030|           DAVI|     TF KNKLFS T  RX|     FLM KMS MRR|        ROCHA|
+-----+--------------------+---------+------+--------------------+--------+---------------+--------------------+----------------+-------

### Separando atributos para linkage

In [94]:
datasetA = datasetA.select(['cod_a', 'dn_a', 
                            'sexo_a', 'cidade_a', 
                            'primeiro_nome_a', 'ultimo_nome_a', 
                            'phonetic_nome_a', 'phonetic_mae_a'])

datasetB = datasetB.select(['cod_b', 'dn_b', 
                            'sexo_b', 'cidade_b', 
                            'primeiro_nome_b', 'ultimo_nome_b', 
                            'phonetic_nome_b', 'phonetic_mae_b']) 

### Criando dataset de comparação

In [96]:
dataset_linkage = datasetA.crossJoin(datasetB)

In [97]:
dataset_linkage.count()
dataset_linkage.limit(5).show()

+-----+---------+------+--------+---------------+-------------+----------------+--------------+-----+----------+------+--------+---------------+-------------+--------------------+------------------+
|cod_a|     dn_a|sexo_a|cidade_a|primeiro_nome_a|ultimo_nome_a| phonetic_nome_a|phonetic_mae_a|cod_b|      dn_b|sexo_b|cidade_b|primeiro_nome_b|ultimo_nome_b|     phonetic_nome_b|    phonetic_mae_b|
+-----+---------+------+--------+---------------+-------------+----------------+--------------+-----+----------+------+--------+---------------+-------------+--------------------+------------------+
|    1|1-01-2007|     1|  280030|          EDSON|       SANTOS|ETSN KMS TS SNTS|   WTLN FR BSR|    1|09/13/2007|     1|  280030|            EDU|      TAVACHO|          ET PRR TFX|    SMN ANTN RTRKS|
|    1|1-01-2007|     1|  280030|          EDSON|       SANTOS|ETSN KMS TS SNTS|   WTLN FR BSR|    2|05/17/2007|     1|  280030|           JOSE|       SANTOS| JS IKR SNTN TS SNTS|      NKLN FTM MXT|
|    

### Criando função de comparação

In [109]:
def compare(cod_a, dn_a, sexo_a, cidade_a, primeiro_nome_a, ultimo_nome_a, phonetic_nome_a, phonetic_mae_a,
           cod_b, dn_b, sexo_b, cidade_b, primeiro_nome_b, ultimo_nome_b, phonetic_nome_b, phonetic_mae_b):
    sim = 0
    
    # Comparando atributos nominais
    sim_nominais = jellyfish.jaro_winkler(unicode(primeiro_nome_a), unicode(primeiro_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(ultimo_nome_a), unicode(ultimo_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_nome_a), unicode(phonetic_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_mae_a), unicode(phonetic_mae_b))
    
    # Comparando categorias
    # Note que Hamming é uma distancia, então para saber a similiarade, precisamos
    # encontrar o complemento da medida. 
    sim_cat = 1 - (jellyfish.hamming_distance(unicode(sexo_a), unicode(sexo_b)))
    sim_cat += 1 - (jellyfish.hamming_distance(unicode(dn_a), unicode(sexo_b)))
    sim_cat += 1 - (jellyfish.hamming_distance(unicode(cidade_a), unicode(cidade_b)))
    
    # Media aritmetica simples
    sim = str(abs(float(sim_nominais + sim_cat)/7))
    
    return sim
udf_compare = F.udf(compare, StringType())

### Rodando comparação

In [110]:
result_linkage = dataset_linkage.withColumn('similaridade', udf_compare(F.col('cod_a'), F.col('dn_a'), F.col('sexo_a'), F.col('cidade_a'), F.col('primeiro_nome_a'), F.col('ultimo_nome_a'), F.col('phonetic_nome_a'), F.col('phonetic_mae_a'),
                                                                       F.col('cod_b'), F.col('dn_b'), F.col('sexo_b'), F.col('cidade_b'), F.col('primeiro_nome_b'), F.col('ultimo_nome_b'), F.col('phonetic_nome_b'), F.col('phonetic_mae_b')))

In [111]:
result_linkage.select(['cod_a', 'cod_b', 'similaridade']).show()

+-----+-----+--------------+
|cod_a|cod_b|  similaridade|
+-----+-----+--------------+
|    1|    1|0.371142547928|
|    1|    2|0.313484739988|
|    1|    3|0.311724386724|
|    1|    4|0.404577234934|
|    1|    5|0.378786160929|
|    1|    6|0.971446608947|
|    1|    7|0.431389311746|
|    1|    8| 0.34897443826|
|    1|    9|0.739332096475|
|    1|   10|0.575290582433|
|    1|   11|0.664727709266|
|    1|   12|0.697595856524|
|    1|   13|0.436555403661|
|    1|   14|0.846889221889|
|    1|   15|0.712662337662|
|    1|   16| 0.34705988456|
|    1|   17|0.487968975469|
|    1|   18|0.688083718336|
|    1|   19|0.407490128919|
|    1|   20|0.713810726311|
+-----+-----+--------------+
only showing top 20 rows



## DESAFIOS

##### Desafio 1: Deduplique a base de linkage para encontrar os melhores resultados para cada registro do datasetB
##### Desafio 2: Separe a data em três colunas diferentes (dia, mês e ano) e reimplemente a função 'compare' para que a distancia de hamming participe do calculo da similaridade
##### Desafio 3: Quais os atributos mais importantes a serem comparados? Ao invés de uma média aritmética simples, implemente uma média aritmética ponderada dando pesos diferentes para cada atributos.
##### Desafio 4: Explore novas similaridades. Faça uso de diferentes medidas para identidficar as que melhor se encaixam nesse par de datasets. ##### Desafio 5: Leia um pouco sobre penalidades, um decréscimo que se dá à similaridade quando um valor nulo participa da comparação. 
##### Desafio 6: Leia um pouco sobre avaliação de acurácia e faça uma planilha tabulando os resultados de sensibilidade, especificidade e precisão para todos os seus experimentos. 
##### Desafio 7: Tem como melhorar essa implementação? Tente otimizar essa implementação usando os conceitos discutidos em sala. 