In [1]:
datasetA = spark.read.csv('base_sintetica_ascii_a.csv', header=True, sep=',')
datasetB = spark.read.csv('base_sintetica_ascii_b.csv', header=True, sep=',')

In [2]:
print datasetA.count()
print datasetB.count()


540
143


In [3]:
import jellyfish
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType

In [4]:
def criaMetaphone(col):
    if (col!=None) : 
        return jellyfish.metaphone(col.strip())
    else:
        return ""
udf_criaMetaphone = F.udf(criaMetaphone, StringType())

In [5]:
datasetA = datasetA.withColumn('dia_nasc', F.split(datasetA['data_nasc'], '/')[0].cast(FloatType()))
datasetA = datasetA.withColumn('mes_nasc', F.split(datasetA['data_nasc'], '/')[1].cast(FloatType()))
datasetA = datasetA.withColumn('ano_nasc', F.split(datasetA['data_nasc'], '/')[2].cast(FloatType()))

datasetA = datasetA.fillna(0, subset=['dia_nasc', 'mes_nasc', 'ano_nasc'])

datasetB = datasetB.withColumn('dia_nasc', F.split(datasetB['data_nasc'], '/')[0].cast(FloatType()))
datasetB = datasetB.withColumn('mes_nasc', F.split(datasetB['data_nasc'], '/')[1].cast(FloatType()))
datasetB = datasetB.withColumn('ano_nasc', F.split(datasetB['data_nasc'], '/')[2].cast(FloatType()))

datasetB = datasetB.fillna(0, subset=['dia_nasc', 'mes_nasc', 'ano_nasc'])

In [6]:
datasetA = datasetA.select([F.col(c).alias(c+'_a') for c in datasetA.columns])
datasetB = datasetB.select([F.col(c).alias(c+'_b') for c in datasetB.columns])

In [7]:
datasetA = datasetA.withColumn('phonetic_nome_a', udf_criaMetaphone(F.col('nome_a')))
datasetA = datasetA.withColumn('phonetic_mae_a', udf_criaMetaphone(F.col('mae_a')))
datasetA = datasetA.withColumn('phonetic_endereco_a', udf_criaMetaphone(F.col('endereco_a')))

datasetB = datasetB.withColumn('phonetic_nome_b', udf_criaMetaphone(F.col('nome_b')))
datasetB = datasetB.withColumn('phonetic_mae_b', udf_criaMetaphone(F.col('mae_b')))
datasetB = datasetB.withColumn('phonetic_endereco_b', udf_criaMetaphone(F.col('endereco_b')))

### Separando os atributos para o linkage

In [8]:
datasetA = datasetA.select(['cpf_a','cep_a', 'cidade_a', 'dia_nasc_a', 'mes_nasc_a', 'ano_nasc_a',
                            'endereco_a', 'estado_a', 
                            'phonetic_mae_a', 'nome_a', 'phonetic_endereco_a',
                            'phonetic_nome_a', 'tipo_sanguineo_a'])

datasetB = datasetB.select(['cpf_b','cep_b', 'cidade_b', 'dia_nasc_b', 'mes_nasc_b', 'ano_nasc_b',
                            'endereco_b', 'estado_b', 
                            'phonetic_mae_b', 'nome_b', 'phonetic_endereco_b',
                            'phonetic_nome_b', 'tipo_sanguineo_b']) 

### Fazendo o linkage

In [9]:
dataset_linkage = datasetA.crossJoin(datasetB)

In [10]:
dataset_linkage.toPandas().head()

Unnamed: 0,cpf_a,cep_a,cidade_a,dia_nasc_a,mes_nasc_a,ano_nasc_a,endereco_a,estado_a,phonetic_mae_a,nome_a,...,dia_nasc_b,mes_nasc_b,ano_nasc_b,endereco_b,estado_b,phonetic_mae_b,nome_b,phonetic_endereco_b,phonetic_nome_b,tipo_sanguineo_b
0,097.627.958-49,65090-677,Sao Luis,10.0,9.0,1948.0,2a Travessa do Ribeirao,MA,LS BTRS,Vicente Oliver Monteiro,...,22.0,11.0,1965.0,Avenida Mario Homem de Melo,RR,ISBL MRX ANTNL,Felipe Bruno dos Santos,AFNT MR HMM T ML,FLP BRN TS SNTS,AB-
1,097.627.958-49,65090-677,Sao Luis,10.0,9.0,1948.0,2a Travessa do Ribeirao,MA,LS BTRS,Vicente Oliver Monteiro,...,0.0,0.0,0.0,Rua Rio Cuiaba,MT,ALN ISTR,Marcio Heitor Silveira,R R KB,MRS HTR SLFR,AB-
2,097.627.958-49,65090-677,Sao Luis,10.0,9.0,1948.0,2a Travessa do Ribeirao,MA,LS BTRS,Vicente Oliver Monteiro,...,23.0,9.0,1961.0,Rua Romeu Lopes de Carvalho,SC,KRLN KBRL,Theo Mario Victor Barbosa,R RM LPS T KRFLH,0 MR FKTR BRBS,A-
3,097.627.958-49,65090-677,Sao Luis,10.0,9.0,1948.0,2a Travessa do Ribeirao,MA,LS BTRS,Vicente Oliver Monteiro,...,11.0,5.0,1981.0,,CE,TRS BRN,Pietro Henrique de Paula,,PTR HNRK T PL,AB-
4,097.627.958-49,65090-677,Sao Luis,10.0,9.0,1948.0,2a Travessa do Ribeirao,MA,LS BTRS,Vicente Oliver Monteiro,...,14.0,12.0,1941.0,Rua Pato Branco,MS,ALS LTX,Kaique Thiago de Paula,R PT BRNK,KK 0K T PL,A-


In [11]:
max_porcentagem_diferenca_dias = abs(1.0 - 31.0) / ((1.0 + 31.0) / 2.0)
max_porcentagem_diferenca_meses = abs(1.0 - 12.0) / ((1.0 + 12.0) / 2.0)

max_ano_a = dataset_linkage.agg({"ano_nasc_a": "max"}).collect()[0][0]
max_ano_b = dataset_linkage.agg({"ano_nasc_b": "max"}).collect()[0][0]

max_ano = max(max_ano_a, max_ano_b)

min_ano_a = dataset_linkage.agg({"ano_nasc_a": "min"}).collect()[0][0]
min_ano_b = dataset_linkage.agg({"ano_nasc_b": "min"}).collect()[0][0]

min_ano = min(min_ano_a, min_ano_b)

max_porcentagem_diferenca_anos = abs(min_ano - max_ano) / ((min_ano + max_ano) / 2.0)

In [12]:
def compare( dia_nasc_a, mes_nasc_a, ano_nasc_a, cep_a, cidade_a, nome_a, endereco_a, estado_a, phonetic_nome_a, phonetic_mae_a, phonetic_endereco_a, ts_a,
            dia_nasc_b, mes_nasc_b, ano_nasc_b, cep_b, cidade_b, nome_b, endereco_b, estado_b, phonetic_nome_b, phonetic_mae_b, phonetic_endereco_b, ts_b):
    sim = 0
    
    sim_data = 0.0
    
    try:
        sim_data += max_porcentagem_diferenca_dias - (float(abs(dia_nasc_a - dia_nasc_b) / ((dia_nasc_a + dia_nasc_b) / 2)))
    except:
        pass
    
    try:
        sim_data += max_porcentagem_diferenca_meses - (float(abs(mes_nasc_a - mes_nasc_b) / ((mes_nasc_a + mes_nasc_b) / 2)))
    except:
        pass
    
    try:
        sim_data += max_porcentagem_diferenca_anos - (float(abs(ano_nasc_a - ano_nasc_b) / ((ano_nasc_a + ano_nasc_b) / 2)))
    except:
        pass
    
    sim_nominais = jellyfish.jaro_winkler(unicode(cidade_a), unicode(cidade_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(estado_a), unicode(estado_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(cep_a), unicode(cep_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(endereco_a), unicode(endereco_b))   
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_nome_a), unicode(phonetic_nome_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_mae_a), unicode(phonetic_mae_b))
    sim_nominais += jellyfish.jaro_winkler(unicode(phonetic_endereco_a), unicode(phonetic_endereco_b))
    
    
    # Comparando categorias
    # Note que Hamming é uma distancia, então para saber a similiarade, precisamos
    # encontrar o complemento da medida. 
    sim_cat = 1 - (jellyfish.hamming_distance(unicode(ts_a), unicode(ts_b)))
   
    
    # Media aritmetica simples
    sim = abs(float(sim_nominais + sim_cat + sim_data)/11)
    
    return sim
udf_compare = F.udf(compare, FloatType())

In [13]:
result_linkage = dataset_linkage.withColumn('similarity', udf_compare(F.col('dia_nasc_a'), F.col('mes_nasc_a'), F.col('ano_nasc_a'),
                                                                        F.col('cep_a'), F.col('cidade_a'), F.col('nome_a'), F.col('endereco_a'),F.col('estado_a'), F.col('phonetic_nome_a'), F.col('phonetic_mae_a'),  F.col('phonetic_endereco_a'),  F.col('tipo_sanguineo_a'), 
                                                                        F.col('dia_nasc_b'), F.col('mes_nasc_b'), F.col('ano_nasc_b'), F.col('cep_b'), F.col('cidade_b'), F.col('nome_b'), F.col('endereco_b'),F.col('estado_b'), F.col('phonetic_nome_b'), F.col('phonetic_mae_b'), F.col('phonetic_endereco_b'), F.col('tipo_sanguineo_b')))
    

In [14]:
df = result_linkage.select(['cpf_a', 'cpf_b', 'similarity']).orderBy('similarity', ascending=False).drop_duplicates(['cpf_b'])

In [15]:
df.show()

+--------------+--------------+----------+
|         cpf_a|         cpf_b|similarity|
+--------------+--------------+----------+
|081.783.955-09|081.783.955-09| 1.2333916|
|488.380.897-10|488.380.897-10| 1.2333916|
|475.612.908-02|475.612.908-02| 1.2333916|
|625.938.432-76|625.938.432-76| 1.2333916|
|420.868.827-00|420.868.827-00| 1.2333916|
|390.371.546-89|390.371.546-89| 1.2333916|
|249.678.124-57|249.678.124-57| 1.1424825|
|055.302.287-39|055.302.287-39| 1.2333916|
|506.865.202-46|506.865.202-46| 1.2333916|
|327.630.596-86|327.630.596-86| 1.1837283|
|782.467.545-99|535.871.894-04| 0.9658562|
|248.115.138-00|248.115.138-00| 1.2333916|
|662.831.369-74|442.694.244-60|0.92141384|
|856.757.448-00|455.643.269-38| 1.0816686|
|295.718.231-98|295.718.231-98| 1.1424825|
|134.836.403-39|134.836.403-39| 1.1841491|
|872.870.813-00|872.870.813-00| 1.2297553|
|506.359.495-63|506.359.495-63| 1.2333916|
|895.160.103-00|895.160.103-00| 1.2333916|
|429.056.400-30|429.056.400-30| 1.2333916|
+----------

In [16]:
df.toPandas().to_csv('results_for_accuracy_calc.csv', index = False)

In [17]:
df.filter(F.col('cpf_a') == F.col('cpf_b')).count()

96