# Appariement Splink sur données de décès

## Environnement 

In [None]:
### Installation des packages splink et recordlinkage
!pip install splink
!pip install recordlinkage

In [None]:
### Import des librairies nécessaires
import pandas as pd
from splink.duckdb.linker import DuckDBLinker
import splink.duckdb.comparison_library as cl
import splink.duckdb.comparison_template_library as ctl
from splink.duckdb.blocking_rule_library import block_on

# Pour S3
import os
import s3fs

## Chargement des données

In [None]:


# Build the S3 filesystem client; the endpoint host comes from the environment
S3_ENDPOINT_URL = f"https://{os.environ['AWS_S3_ENDPOINT']}"
s3_client_options = {'endpoint_url': S3_ENDPOINT_URL}
fs = s3fs.S3FileSystem(client_kwargs=s3_client_options)
# List the bucket content (last expression, so the notebook renders it)
fs.ls("projet-ssplab")

In [None]:
BUCKET = "projet-ssplab"

# Death records table
FILE_KEY_S3 = "appariements/deces.parquet"
FILE_PATH_S3 = f"{BUCKET}/{FILE_KEY_S3}"
with fs.open(FILE_PATH_S3, mode="rb") as parquet_stream:
    deces = pd.read_parquet(parquet_stream)

# Perturbed death records table (same two path variables are reused)
FILE_KEY_S3 = "appariements/deces_perturb.parquet"
FILE_PATH_S3 = f"{BUCKET}/{FILE_KEY_S3}"
with fs.open(FILE_PATH_S3, mode="rb") as parquet_stream:
    deces_perturb = pd.read_parquet(parquet_stream)

In [None]:
# True when both tables have the same number of rows
deces.shape[0] == deces_perturb.shape[0]

Besoin de passer les colonnes de noms/prénoms en minuscules dans la table de gauche.

In [None]:
# Lower-case the surname and given-names columns of the original table
for name_column in ('nom_etat_civil', 'prenoms_etat_civil'):
    deces[name_column] = deces[name_column].str.lower()

## Appariement 

Les individus ont les mêmes identifiants ligne à ligne (la base perturbée contient les mêmes individus, triés dans le même ordre)

In [None]:
# Keep only the first `nb_lignes` rows of each table for the linkage
nb_lignes = 10_000
df_gauche, df_droite = (table.iloc[:nb_lignes] for table in (deces, deces_perturb))

In [None]:
# Check that both samples carry the same identifiers, row by row
ids_gauche = df_gauche['ident_deces']
ids_droite = df_droite['ident_deces']
ids_gauche.equals(ids_droite)

Part de lignes ayant subi une "perturbation" lors de la création de la table de droite

In [None]:
# Sum of the `perturbation` column divided by the number of rows of the right-hand table
perturbation_total = df_droite.agg(part=('perturbation', 'sum'))
perturbation_total / len(df_droite)

In [None]:
# Remove these four columns from the left-hand table.
# NOTE: the cell rebinds `df_gauche`, so running it a second time raises a KeyError
# (the columns are already gone).
colonnes_a_retirer = ['datenaiss', 'datedeces', 'lieudeces', 'adeces']
df_gauche = df_gauche.drop(columns=colonnes_a_retirer)

Initialisation de l'objet Linker

In [None]:
# Minimal settings (no comparisons): this linker is used by the blocking-rule cells below
settings_exploration = {
    "link_type": "link_only",
    "unique_id_column_name": "ident_deces",
}
linker = DuckDBLinker([df_gauche, df_droite], settings_exploration)

Règle de blocage

In [None]:
# Elementary SQL conditions shared by the candidate blocking rules
cond_lieunaiss = "l.lieunaiss = r.lieunaiss"
cond_anais = "l.anais_etat_civil = r.anais_etat_civil"

# Same birthplace code
blocking_rules_lieunaissance = [cond_lieunaiss]

# Same birthplace code AND same year of birth
blocking_rules_postcode_and_yearofbirth = [f"{cond_lieunaiss} and {cond_anais}"]

# Same birthplace code OR same year of birth
blocking_rules_postcode_or_yearofbirth = [f"{cond_lieunaiss} or {cond_anais}"]


In [None]:
# Number of candidate pairs kept by the rule that is actually counted here:
# birthplace code AND year of birth. The printed label is aligned with that rule
# (it previously described a birthplace-only blocking).
nb_paires = linker.count_num_comparisons_from_blocking_rule(
    ' or '.join(blocking_rules_postcode_and_yearofbirth)
)
print(
    "Nombre de paires conservées - blocage sur le code commune et l'année de naissance : "
    f"{nb_paires}"
)

In [None]:
# Cumulative comparison-count chart for the birthplace-only rule list (rendered as cell output)
linker.cumulative_num_comparisons_from_blocking_rules_chart(blocking_rules_lieunaissance)

In [None]:
# Same chart for the "birthplace AND year of birth" rule list
linker.cumulative_num_comparisons_from_blocking_rules_chart(blocking_rules_postcode_and_yearofbirth)

In [None]:
# Same chart for the "birthplace OR year of birth" rule list
linker.cumulative_num_comparisons_from_blocking_rules_chart(blocking_rules_postcode_or_yearofbirth)

On commence le test en bloquant sur le lieu de naissance et l'année de naissance (pour limiter le nombre de paires).

### Règle de comparaison des champs

In [None]:
# Display the left-hand table (bare last expression -> rich notebook rendering)
df_gauche

In [None]:
# Display the right-hand table (bare last expression -> rich notebook rendering)
df_droite

A faire : voir si le "term_frequency_adjustment" a un impact sur les performances !

In [None]:

# Surname / given names: Jaro-Winkler levels at 0.95 and 0.88, with term-frequency adjustments
comparaisons_noms = [
    cl.jaro_winkler_at_thresholds(colonne, [0.95, 0.88], term_frequency_adjustments=True)
    for colonne in ("nom_etat_civil", "prenoms_etat_civil")
]
# Month / day of birth: exact match, with term-frequency adjustments
comparaisons_naissance = [
    cl.exact_match(colonne, term_frequency_adjustments=True)
    for colonne in ("mnais_etat_civil", "jnais_etat_civil")
]
comparisons_list = comparaisons_noms + comparaisons_naissance

### Définition du dictionnaire des paramètres

In [None]:
# Settings handed to the linker: link type, prediction blocking rule, comparisons, id column
linkage_settings = dict(
    link_type="link_only",
    blocking_rules_to_generate_predictions=blocking_rules_postcode_and_yearofbirth,
    comparisons=comparisons_list,
    unique_id_column_name="ident_deces",
)

## Estimation des paramètres

In [None]:
# Rebuild the linker with the full settings (blocking rule + comparisons);
# this rebinds `linker`, replacing the one created earlier with minimal settings.
linker = DuckDBLinker([df_gauche, df_droite], linkage_settings)

In [None]:
# Estimate the u probabilities from randomly sampled pairs.
# A fixed seed makes the sample, and therefore the estimated parameters, reproducible
# across "Restart & Run All" executions.
linker.estimate_u_using_random_sampling(max_pairs=1e6, seed=42)

In [None]:
# EM training pass run while blocking on the surname column
regle_blocage_nom = block_on("nom_etat_civil")
session_nom = linker.estimate_parameters_using_expectation_maximisation(regle_blocage_nom)

In [None]:
# EM training pass run while blocking on the given-names column
regle_blocage_prenom = block_on("prenoms_etat_civil")
session_prenom = linker.estimate_parameters_using_expectation_maximisation(regle_blocage_prenom)

### Classification des paires

Attention à vérifier l'impact du seuil **0.5**

In [None]:
# Alternative kept for reference: same call with a 0.5 match-probability threshold
#results = linker.predict(threshold_match_probability=0.5)
# Current choice: predict without passing any threshold
results = linker.predict()
results_pandas = results.as_pandas_dataframe()
# Last expression: (rows, columns) of the prediction table
results_pandas.shape


### Résolution des conflits

In [None]:
# Conflict resolution: for each left-hand identifier (ident_deces_l), keep only the
# row with the highest match_weight (row_number() = 1 within the partition).
# The query reads the prediction table through `results.physical_name`.
sql = f"""
with ranked as

(
select *,
row_number() OVER (
    PARTITION BY ident_deces_l order by match_weight desc
    ) as row_number
from {results.physical_name}
)

select *
from ranked
where row_number = 1


"""
# NOTE(review): `results` is rebound to the query output here. If that output does not
# expose `physical_name`, re-running this cell fails until the predict cell is run again
# — TODO confirm the return type of `query_sql`.
results = linker.query_sql(sql)

In [None]:
# Display the de-duplicated pairs produced by the conflict-resolution query
results

### Evaluation de la qualité

In [None]:
def compute_performance_metrics_FEBRL(results, dataset_size):
    """
    Compute confusion-matrix counts and precision / recall / F-measure of a linkage output.

    The assumption is that the size of the two datasets is the same and every record
    from dataset A has exactly one match in dataset B. A predicted pair counts as a
    true match when the ``Deces_2021_<digits>`` part of both identifiers is identical.

            Parameters:
                    results (pandas DataFrame): Output from the linkage process, one row per
                        predicted pair, with string columns ``ident_deces_l`` and
                        ``ident_deces_r``. The frame is left unmodified.
                    dataset_size (int): Length of both datasets to be linked

            Returns:
                    performance_metrics (tuple): Tuple of metrics
                        (TP, TN, FP, FN, precision, recall, F-measure). A ratio whose
                        denominator is zero is reported as 0.0 instead of raising.
    """
    id_pattern = r'(Deces_2021_\d+)'
    # expand=False returns a Series, so the comparison below is a plain boolean Series
    left_ids = results['ident_deces_l'].str.extract(id_pattern, expand=False)
    right_ids = results['ident_deces_r'].str.extract(id_pattern, expand=False)
    # Identifiers that do not match the pattern are missing and never compare equal
    is_true_match = left_ids == right_ids

    TP = int(is_true_match.sum())
    FP = len(is_true_match) - TP
    # Pairs that were removed in the indexing phase must be taken into account
    # to compute true and false negatives
    FN = dataset_size - TP
    TN = dataset_size * dataset_size - TP - FN - FP

    # Guard every ratio: no predicted pair, an empty dataset or zero true positives
    # would otherwise raise ZeroDivisionError
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall = TP / (TP + FN) if (TP + FN) else 0.0
    if precision + recall:
        Fscore = 2 * precision * recall / (precision + recall)
    else:
        Fscore = 0.0

    performance_metrics = (TP, TN, FP, FN, precision, recall, Fscore)
    return performance_metrics

def print_performance_metrics(linkage_output, dataset_size):
    """
    Prints performance metrics of a record linkage process on synthetic data.
    The assumption is that the size of the two datasets is the same and every record
    from dataset A has exactly one match in dataset B.

            Parameters:
                    linkage_output (pandas DataFrame): Output from the linkage process
                    dataset_size (int): Length of both datasets to be linked

            Returns:
                    None
    """
    # Use the argument that was passed in, not the notebook-level `results` variable
    TP, TN, FP, FN, precision, recall, Fscore = compute_performance_metrics_FEBRL(
        linkage_output, dataset_size
    )
    # Counts use a space as thousands separator
    print(f"Vrais positifs : {TP:,}".replace(',', ' '))
    print(f"Vrais négatifs : {TN:,}".replace(',', ' '))
    print(f"Faux positifs : {FP:,}".replace(',', ' '))
    print(f"Faux négatifs : {FN:,}".replace(',', ' '))
    # Ratios are printed with 4 significant digits
    print(f"Précision : {precision:.4}")
    print(f"Rappel : {recall:.4}")
    print(f"F-mesure : {Fscore:.4}")

# `n` is never defined in this notebook; the dataset size is the number of rows linked
print_performance_metrics(results, len(df_gauche))



In [None]:
# Ask the linker for its missingness chart (rendered as the cell output)
linker.missingness_chart()