### Chamada de bibliotecas

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
import pandas as pd

# ROTEIRO PARA LINKAGE

### Lendo e visualizando as bases

In [2]:
datasetA = spark.read.csv('hdfs://node5:9000/raw/csv/base_sintetica_ascii_a.csv', header=True, sep=',')
datasetB = spark.read.csv('hdfs://node5:9000/raw/csv/base_sintetica_ascii_b.csv', header=True, sep=',')

In [3]:
print(datasetA.count())
print(datasetB.count())
datasetA.limit(5).show()
datasetB.limit(5).show()

540
143
+---+--------------+---------+---------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+---------+--------------+----+
| id|        bairro|      cep|   cidade|           cpf| data_nasc|               email|            endereco|estado|idade|                 mae|                nome|numero|    signo|tipo_sanguineo|_c15|
+---+--------------+---------+---------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+---------+--------------+----+
|  0|   Vila Sarney|65090-677| Sao Luis|097.627.958-49|10/09/1948|vicenteolivermont...|2a Travessa do Ri...|    MA|   71|     Louise Beatriz |Vicente Oliver Mo...|   738|   Virgem|           AB-|null|
|  1|Vila Sao Jorge|75044-220| Anapolis|685.096.335-09|24/04/1995|miguelbernardogon...|  Praca Joao Salomao|    GO|   24|     Raquel Barbara |Miguel Bernardo G...|   805|    Touro|        

### Criando colunas com codigos foneticos

In [4]:
#!pip install jellyfish

In [5]:
import jellyfish

In [6]:
def criaMetaphone(col):
    return jellyfish.metaphone(col)
udf_criaMetaphone = F.udf(criaMetaphone, StringType())

In [7]:
datasetA = datasetA.withColumn('phonetic_nome_a', udf_criaMetaphone(F.col('nome')))
datasetA = datasetA.withColumn('phonetic_mae_a', udf_criaMetaphone(F.col('mae')))

datasetA.limit(3).show()

+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+
| id|        bairro|      cep|  cidade|           cpf| data_nasc|               email|            endereco|estado|idade|                 mae|                nome|numero| signo|tipo_sanguineo|_c15|    phonetic_nome_a|phonetic_mae_a|
+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+
|  0|   Vila Sarney|65090-677|Sao Luis|097.627.958-49|10/09/1948|vicenteolivermont...|2a Travessa do Ri...|    MA|   71|     Louise Beatriz |Vicente Oliver Mo...|   738|Virgem|           AB-|null|     FSNT OLFR MNTR|      LS BTRS |
|  1|Vila Sao Jorge|75044-220|Anapolis|685.096.335-09|24/04/1995|miguelb

In [8]:
datasetB = datasetB.fillna({'nome':' '})

In [9]:
datasetB = datasetB.withColumn('phonetic_nome_b', udf_criaMetaphone(F.col('nome')))
datasetB = datasetB.withColumn('phonetic_mae_b', udf_criaMetaphone(F.col('mae')))


datasetB.limit(3).show()

+---+---------+-------------+--------------+----------+--------------------+------+--------------------+--------------------+--------------+---------------+---------------+
| id|      cep|       cidade|           cpf| data_nasc|            endereco|estado|                 mae|                nome|tipo_sanguineo|phonetic_nome_b| phonetic_mae_b|
+---+---------+-------------+--------------+----------+--------------------+------+--------------------+--------------------+--------------+---------------+---------------+
| 01|69304-350|    Boa Vista|506.865.202-46|22/11/1965|Avenida Mario Hom...|    RR|Isabelle Marcia A...|Felipe Bruno dos ...|           AB-|FLP BRN TS SNTS|ISBL MRX ANTNL |
| 02|78132-360|Varzea Grande|325.828.867-42|      null|      Rua Rio Cuiaba|    MT|       Alana Isadora|Marcio Heitor Sil...|           AB-|   MRS HTR SLFR|       ALN ISTR|
| 03|88810-336|     Criciuma|284.536.218-88|23/09/1961|Rua Romeu Lopes d...|    SC|  Caroline Gabriela |Theo Mario Victor...|          

### Criando coluna com último e primeiro nome

In [10]:
def criaUltimoNome(col):
    return col.split(' ')[-1]
udf_criaUltimoNome = F.udf(criaUltimoNome, StringType())

def criaPrimeiroNome(col):
    return col.split(' ')[0]
udf_criaUltimoNome = F.udf(criaUltimoNome, StringType())

In [11]:
datasetA = datasetA.withColumn('ultimo_nome_a', udf_criaUltimoNome(F.col('nome')))
datasetB = datasetB.withColumn('ultimo_nome_b', udf_criaUltimoNome(F.col('nome')))

datasetA = datasetA.withColumn('primeiro_nome_a', udf_criaUltimoNome(F.col('nome')))
datasetB = datasetB.withColumn('primeiro_nome_b', udf_criaUltimoNome(F.col('nome')))

datasetA.limit(3).show()
datasetB.limit(3).show()

+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+-------------+---------------+
| id|        bairro|      cep|  cidade|           cpf| data_nasc|               email|            endereco|estado|idade|                 mae|                nome|numero| signo|tipo_sanguineo|_c15|    phonetic_nome_a|phonetic_mae_a|ultimo_nome_a|primeiro_nome_a|
+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+-------------+---------------+
|  0|   Vila Sarney|65090-677|Sao Luis|097.627.958-49|10/09/1948|vicenteolivermont...|2a Travessa do Ri...|    MA|   71|     Louise Beatriz |Vicente Oliver Mo...|   738|Virgem|           AB-|null|     FSNT OLFR MNT

In [12]:
datasetA.limit(3).show()

+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+-------------+---------------+
| id|        bairro|      cep|  cidade|           cpf| data_nasc|               email|            endereco|estado|idade|                 mae|                nome|numero| signo|tipo_sanguineo|_c15|    phonetic_nome_a|phonetic_mae_a|ultimo_nome_a|primeiro_nome_a|
+---+--------------+---------+--------+--------------+----------+--------------------+--------------------+------+-----+--------------------+--------------------+------+------+--------------+----+-------------------+--------------+-------------+---------------+
|  0|   Vila Sarney|65090-677|Sao Luis|097.627.958-49|10/09/1948|vicenteolivermont...|2a Travessa do Ri...|    MA|   71|     Louise Beatriz |Vicente Oliver Mo...|   738|Virgem|           AB-|null|     FSNT OLFR MNT

In [13]:
datasetA = datasetA.withColumnRenamed('id','id_a')
datasetA = datasetA.withColumnRenamed('cep','cep_a')
datasetA = datasetA.withColumnRenamed('cidade','cidade_a')
datasetA = datasetA.withColumnRenamed('cpf','cpf_a')
datasetA = datasetA.withColumnRenamed('data_nasc','data_nasc_a')
datasetA = datasetA.withColumnRenamed('endereco','endereco_a')
datasetA = datasetA.withColumnRenamed('estado','estado_a')
datasetA = datasetA.withColumnRenamed('mae','mae_a')
datasetA = datasetA.withColumnRenamed('nome','nome_a')
datasetA = datasetA.withColumnRenamed('tipo_sanguineo','tipo_sanguineo_a')
datasetA = datasetA.withColumnRenamed('phonetic_nome','phonetic_nome_a')
datasetA = datasetA.withColumnRenamed('phonetic_mae','phonetic_mae_a')
datasetA = datasetA.withColumnRenamed('ultimo_nome','ultimo_nome_a')
datasetA = datasetA.withColumnRenamed('primeiro_nome','primeiro_nome_a')


datasetB = datasetB.withColumnRenamed('id','id_b')
datasetB = datasetB.withColumnRenamed('cep','cep_b')
datasetB = datasetB.withColumnRenamed('cidade','cidade_b')
datasetB = datasetB.withColumnRenamed('cpf','cpf_b')
datasetB = datasetB.withColumnRenamed('data_nasc','data_nasc_b')
datasetB = datasetB.withColumnRenamed('endereco','endereco_b')
datasetB = datasetB.withColumnRenamed('estado','estado_b')
datasetB = datasetB.withColumnRenamed('mae','mae_b')
datasetB = datasetB.withColumnRenamed('nome','nome_b')
datasetB = datasetB.withColumnRenamed('tipo_sanguineo','tipo_sanguineo_b')
datasetB = datasetB.withColumnRenamed('phonetic_nome','phonetic_nome_b')
datasetB = datasetB.withColumnRenamed('phonetic_mae','phonetic_mae_b')
datasetB = datasetB.withColumnRenamed('ultimo_nome','ultimo_nome_b')
datasetB = datasetB.withColumnRenamed('primeiro_nome','primeiro_nome_b')

### Separando atributos para linkage

In [14]:
datasetA = datasetA.select(['id_a', 'cep_a', 'cidade_a','cpf_a', 'data_nasc_a','endereco_a', 'estado_a', 'mae_a','nome_a', 'tipo_sanguineo_a', 'phonetic_nome_a', 
'phonetic_mae_a', 'ultimo_nome_a' , 'primeiro_nome_a'])
datasetB = datasetB.select(['id_b', 'cep_b', 'cidade_b', 'cpf_b', 'data_nasc_b', 'endereco_b', 'estado_b', 'mae_b', 'nome_b', 'tipo_sanguineo_b' ,'phonetic_nome_b',
'phonetic_mae_b', 'ultimo_nome_b', 'primeiro_nome_b'])

### Criando dataset de comparação

In [15]:
dataset_linkage = datasetA.crossJoin(datasetB)

In [16]:
dataset_linkage.count()
dataset_linkage.limit(5).show()

+----+---------+--------+--------------+-----------+--------------------+--------+---------------+--------------------+----------------+---------------+--------------+-------------+---------------+----+---------+-------------+--------------+-----------+--------------------+--------+--------------------+--------------------+----------------+---------------+---------------+-------------+---------------+
|id_a|    cep_a|cidade_a|         cpf_a|data_nasc_a|          endereco_a|estado_a|          mae_a|              nome_a|tipo_sanguineo_a|phonetic_nome_a|phonetic_mae_a|ultimo_nome_a|primeiro_nome_a|id_b|    cep_b|     cidade_b|         cpf_b|data_nasc_b|          endereco_b|estado_b|               mae_b|              nome_b|tipo_sanguineo_b|phonetic_nome_b| phonetic_mae_b|ultimo_nome_b|primeiro_nome_b|
+----+---------+--------+--------------+-----------+--------------------+--------+---------------+--------------------+----------------+---------------+--------------+-------------+---------

### Criando função de comparação

In [17]:
import sys
if sys.version_info[0] >= 3:
    unicode = str

In [18]:
def compare( id_a, cep_a, cidade_a, cpf_a , data_nasc_a , endereco_a, estado_a, mae_a, nome_a, tipo_sanguineo_a, phonetic_nome_a, 
    phonetic_mae_a, ultimo_nome_a , primeiro_nome_a, 
            id_b, cep_b, cidade_b, cpf_b, data_nasc_b, endereco_b, estado_b, mae_b, nome_b, tipo_sanguineo_b ,phonetic_nome_b,
    phonetic_mae_b, ultimo_nome_b, primeiro_nome_b):
    sim = 0
    
    # Comparando atributos nominais
    sim_nominais = jellyfish.jaro_winkler(unicode(primeiro_nome_a), unicode(primeiro_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(ultimo_nome_a), unicode(ultimo_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_nome_a), unicode(phonetic_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_mae_a), unicode(phonetic_mae_b))
    
    sim_nominais += jellyfish.jaro_winkler(unicode(cidade_a), unicode(cidade_b))

    sim_nominais += jellyfish.jaro_winkler(unicode( endereco_a), unicode(endereco_b ))
    sim_nominais += jellyfish.jaro_winkler(unicode( estado_a), unicode( estado_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(mae_a ), unicode(mae_b ))
    sim_nominais += jellyfish.jaro_winkler(unicode( nome_a), unicode( nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode( tipo_sanguineo_a), unicode( tipo_sanguineo_b ))

    
    # Comparando categorias
    # Note que Hamming é uma distancia, então para saber a similiarade, precisamos
    # encontrar o complemento da medida. 
    sim_cat = 1 - (jellyfish.hamming_distance(unicode(data_nasc_a), unicode(data_nasc_b)))
       
    sim_cat += 1 - (jellyfish.hamming_distance(unicode( cpf_a), unicode(cpf_a )))

    sim_cat += 1 - (jellyfish.hamming_distance(unicode( cep_a ), unicode( cep_b )))
    
    # Media aritmetica simples
    sim = str(abs(float(sim_nominais + sim_cat)/7))
    
    return sim
udf_compare = F.udf(compare, StringType())

In [19]:
result_linkage = dataset_linkage.withColumn('similaridade', udf_compare( F.col('id_a'), F.col('cep_a'), F.col('cidade_a'), F.col('cpf_a'), F.col('data_nasc_a'), F.col('endereco_a'), F.col('estado_a'), F.col('mae_a'), F.col('nome_a'), F.col('tipo_sanguineo_a'), F.col('phonetic_nome_a'), F.col('phonetic_mae_a'), F.col('ultimo_nome_a'), F.col('primeiro_nome_a'), 
                                                                        F.col('id_b'),F.col('cep_b'), F.col('cidade_b'), F.col('cpf_b'), F.col('data_nasc_b'), F.col('endereco_b'), F.col('estado_b'), F.col('mae_b'), F.col('nome_b'), F.col('tipo_sanguineo_b'), F.col('phonetic_nome_b'), F.col('phonetic_mae_b'), F.col('ultimo_nome_b'), F.col('primeiro_nome_b')))

### Rodando comparação

In [20]:
result_linkage.select(['id_a', 'id_b', 'similaridade']).show()

+----+----+-------------------+
|id_a|id_b|       similaridade|
+----+----+-------------------+
|   0|  01| 0.6406754984912334|
|   0|  02|  1.261807481683258|
|   0|  03|0.49203985991770666|
|   0|  04| 0.5407525589202609|
|   0|  05| 0.6180216091706777|
|   0|  06| 0.4062373437838956|
|   0|  07| 0.4807294327480663|
|   0|  08|0.10634696566373579|
|   0|  09| 0.9409161117235652|
|   0|  10| 1.2809133166458706|
|   0|  11| 0.5070947918774006|
|   0|  12| 0.7047443814524561|
|   0|  13| 0.7301802006570001|
|   0|  14| 0.4757095279750895|
|   0|  15|0.38775698484747323|
|   0|  16|0.16254377552514207|
|   0|  17| 0.1833121311482362|
|   0|  18| 0.4297843403433465|
|   0|  19| 0.5555817957060194|
|   0|  20| 0.5135951993529507|
+----+----+-------------------+
only showing top 20 rows



In [21]:
datamart = result_linkage.orderBy(['similaridade'], ascending=False).dropDuplicates(['id_b'])

In [22]:
datamart.count()

143

In [23]:

datamart.select(['id_a', 'nome_a', 'id_b', 'nome_b', 'similaridade']).show()

+----+--------------------+----+--------------------+------------------+
|id_a|              nome_a|id_b|              nome_b|      similaridade|
+----+--------------------+----+--------------------+------------------+
|  20|Luan Caue Anthony...|  07|Luan Caue Anthony...|1.5714285714285714|
| 355|Danilo Thiago Yur...| 125|   Samuel Noah Lopes|1.1120690676865321|
| 381|Raul Miguel Ryan ...| 124|Danilo Severino d...|1.1169485822436134|
|  63|Alexandre Bruno M...|  51|Alexandre Bruno M...|1.8571428571428572|
|  28|Leandro Cesar Jor...|  15|Leandro Cesar Jor...|1.7817460317460316|
|  66| Ricardo Igor Barros|  54| Ricardo Igor Barros|1.7142857142857142|
| 294|Levi Nelson Noah ...| 132|Rodrigo Erick Rob...|1.2291639919499207|
| 298|Cesar Isaac Luis ...| 101| Isaac Luiz Aparicio|1.0867532761274468|
|  24|Gabriel Caua Marc...|  11|Gabriel Caua Marc...|1.7142857142857142|
|  99|Kaue Francisco Be...| 138|Heitor Leonardo C...| 1.061084779208718|
| 294|Levi Nelson Noah ...|  29|Calebe Osvaldo Ba..

# CALCULADORA DE ACURÁCIA

In [24]:
# Let us consider a cuttoff point set as 0.85
cutoff = 1.0

# sorting and deduplicating the resulting dataset
data = datamart.withColumn('similaridade', F.col('similaridade').cast(DoubleType()))
data = datamart.orderBy('similaridade').dropDuplicates(['id_b'])
data = datamart.withColumn('match', F.when(F.col('similaridade') >= cutoff, '1').otherwise('0'))

In [25]:
def inspect_pairs(cpf_a, cpf_b, match):
    if match == '1':
        if cpf_a == cpf_b:
            return "TP"
        else:
            return "FP"
    else:
        if cpf_a != cpf_b:
            return "TN"
        else: 
            return "FN"
udf_inspect_pairs = F.udf(inspect_pairs, StringType())

In [26]:
data = data.withColumn('perf', udf_inspect_pairs(F.col('cpf_a'), F.col('cpf_b'), F.col('match')))

In [27]:
data.select('perf','similaridade','match').show()

+----+------------------+-----+
|perf|      similaridade|match|
+----+------------------+-----+
|  TP|1.5714285714285714|    1|
|  FP|1.1120690676865321|    1|
|  FP|1.1169485822436134|    1|
|  TP|1.8571428571428572|    1|
|  TP|1.7817460317460316|    1|
|  TP|1.7142857142857142|    1|
|  FP|1.2291639919499207|    1|
|  FP|1.0867532761274468|    1|
|  TP|1.7142857142857142|    1|
|  FP| 1.061084779208718|    1|
|  FP|1.3609100319969885|    1|
|  TP|1.8571428571428572|    1|
|  FP|1.1935020239642087|    1|
|  TP|1.8571428571428572|    1|
|  TP|1.8571428571428572|    1|
|  TP| 1.846382189239332|    1|
|  TP|1.8571428571428572|    1|
|  FP| 1.235138036566608|    1|
|  FP| 1.413253482966552|    1|
|  TP|1.8015873015873016|    1|
+----+------------------+-----+
only showing top 20 rows



In [28]:
dic_results = {}
TP = data.filter(F.col('perf') == "TP").count()
TN = data.filter(F.col('perf') == "TN").count()
FP = data.filter(F.col('perf') == "FP").count()
FN = data.filter(F.col('perf') == "FN").count()

print(TP, TN, FP, FN)

81 1 61 0


In [29]:
dic_results['accuracy'] =  float(TP + TN) / (FP + TP + FN + TN)
dic_results['ppv'] = float(TP) / (TP + FP)
dic_results['npv'] = float(TN) / (TN + FN)
dic_results['sens'] = float(TP) / (TP + FN)
dic_results['spec'] = float(TN) / (TN + FP)

In [30]:
final_results = pd.DataFrame(dic_results, index=[0])

In [31]:
final_results

Unnamed: 0,accuracy,ppv,npv,sens,spec
0,0.573427,0.570423,1.0,1.0,0.016129
