# Traitements des fichiers des organismes


## Pipeline


1. Chargement des fichiers CNOUS, CAF et MSA
2. Nettoyage des données et premier mapping au bon format de données attendu dans la BDD
3. Application des critères sur les données du CNOUS
4. Merge des 3 dataframes
5. Cleanup (numéro de téléphone, date de naissance + 4 heures)
7. Dédoublonage : Supprimer les bénéficiaires en doubles chez plusieurs organismes nom, prenom, date_naissance, genre, commune de résidence
5. Mapping des json
6. Ajout des valeurs pour les colonnes par défault
8. Output to CSV

## Encoding
CNAF -> ISO-8859-1

CNOUS -> windows-1252

MSA -> utf-8-sig

In [None]:
import csv
import os
from dotenv import load_dotenv
import pandas as pd
import json
from datetime import datetime
import numpy as np

load_dotenv()

cnous_filepath = os.environ['CNOUS_PATHFILE']
msa_filepath = os.environ['MSA_PATHFILE']
cnaf_filepath = os.environ['CNAF_PATHFILE']
base_output_filepath = os.environ['DB_EXPORT']

MEMORY_OPTIMIZATION = True

In [None]:
cnous_df = pd.read_csv(cnous_filepath, encoding='windows-1252', on_bad_lines='skip', sep=';', engine="c")

In [None]:
cnaf_column_type = {
    'code_postal': 'str',
    'tel': 'str',
    'matricul': 'str'
}

cnaf_df = pd.read_csv(cnaf_filepath, encoding='iso-8859-1', on_bad_lines='skip', sep=';', quoting=csv.QUOTE_NONE, dtype=cnaf_column_type, engine="c")

In [None]:
msa_column_type = {
    'code_postal': 'str',
    'numero_tel_portable': 'str',
    'code_insee_commune': 'str',
    'date_de_naissance_alloc': 'str',
    'numero_allocataire': 'str',
    'organisme': 'str'
}

msa_df = pd.read_csv(msa_filepath, encoding='utf-8-sig', on_bad_lines='skip', sep=';', quoting=csv.QUOTE_NONE, dtype=msa_column_type, engine="c")

In [None]:
# map MSA
msa_column_mapping = {
    # infos allocataire
    'numero_allocataire': 'allocataire-matricule',
    'organisme': 'allocataire-code_organisme',
    'qualite_allocataire': 'allocataire-qualite',
    'nom_allocataire': 'allocataire-nom',
    'prenom_allocataire': 'allocataire-prenom',
    'date_de_naissance_alloc': 'allocataire-date_naissance',
    'pays_de_naissance': 'allocataire-pays_naissance',
    'code_insee_commune_alloc': 'allocataire-code_insee_commune_naissance',
    'commune_de_naissance': 'allocataire-commune_naissance',
    'code_iso_pays': 'allocataire-code_iso_pays_naissance',
    'adresse_de_messagerie': 'allocataire-courriel',
    'numero_tel_portable': 'allocataire-telephone',

    # adresse allocataire
    # pas de voie
    'nom_commune': 'adresse_allocataire-commune',
    'code_postal': 'adresse_allocataire-code_postal',
    'code_insee_commune': 'adresse_allocataire-code_insee',
    'complement_adresse': 'adresse_allocataire-cplt_adresse',

    # infos bénéficiaires
    'nom_beneficiaire': 'nom',
    'prenom_beneficiaire': 'prenom',
    'genre_beneficiaire': 'genre',
    'date_de_naissance_beneficiaire': 'date_naissance'
}


df_psp_mapped_msa = msa_df.copy()
df_psp_mapped_msa = df_psp_mapped_msa.rename(columns=msa_column_mapping) 

# situation
df_psp_mapped_msa['situation'] = 'Jeune'
mask_nom_equal = df_psp_mapped_msa['allocataire-nom'] == df_psp_mapped_msa['nom'] 
mask_prenom_equal = df_psp_mapped_msa['allocataire-prenom'] == df_psp_mapped_msa['prenom']
df_psp_mapped_msa.loc[mask_nom_equal & mask_prenom_equal, 'situation'] =  'AAH'

# organisme
df_psp_mapped_msa['organisme'] = 'MSA'

# trim all values
df_psp_mapped_msa['qualite_destinataire'] = df_psp_mapped_msa['qualite_destinataire'].str.strip() 
df_psp_mapped_msa['nom_destinataire'] = df_psp_mapped_msa['nom_destinataire'].str.strip()
df_psp_mapped_msa['prenom_destinataire'] = df_psp_mapped_msa['prenom_destinataire'].str.strip()

df_psp_mapped_msa['adresse_allocataire-commune'] = df_psp_mapped_msa['adresse_allocataire-commune'].str.strip()
df_psp_mapped_msa['adresse_allocataire-code_postal'] = df_psp_mapped_msa['adresse_allocataire-code_postal'].str.strip()
df_psp_mapped_msa['adresse_allocataire-code_insee'] = df_psp_mapped_msa['adresse_allocataire-code_insee'].str.strip()
df_psp_mapped_msa['adresse_allocataire-cplt_adresse'] = df_psp_mapped_msa['adresse_allocataire-cplt_adresse'].str.strip()

## voie
df_psp_mapped_msa['numero_voie'] = df_psp_mapped_msa['numero_voie'].str.strip()
df_psp_mapped_msa['complement_numero_voie'] = df_psp_mapped_msa['complement_numero_voie'].str.strip()
df_psp_mapped_msa['type_voie'] = df_psp_mapped_msa['type_voie'].str.strip()
df_psp_mapped_msa['voie'] = df_psp_mapped_msa['voie'].str.strip()

df_psp_mapped_msa['nom'] = df_psp_mapped_msa['nom'].str.strip() 
df_psp_mapped_msa['prenom'] = df_psp_mapped_msa['prenom'].str.strip() 

# nom adresse postale de l'allocataire
df_psp_mapped_msa['adresse_allocataire-nom_adresse_postale'] = df_psp_mapped_msa['qualite_destinataire'] + ' ' + df_psp_mapped_msa['nom_destinataire'] + ' ' + df_psp_mapped_msa['prenom_destinataire']

# voie de l'adresse postale de l'allocataire
df_psp_mapped_msa['adresse_allocataire-voie'] = df_psp_mapped_msa['numero_voie'] + ' ' + df_psp_mapped_msa['complement_numero_voie'] + ' ' + df_psp_mapped_msa['type_voie'] + df_psp_mapped_msa['voie']
df_psp_mapped_msa['adresse_allocataire-voie'] = df_psp_mapped_msa['adresse_allocataire-voie'].str.strip()

# sexe
df_psp_mapped_msa['genre'] = df_psp_mapped_msa['genre'].replace(1, 'M')
df_psp_mapped_msa['genre'] = df_psp_mapped_msa['genre'].replace(2, 'F')

# qualite allocataire
df_psp_mapped_msa['allocataire-qualite'] = df_psp_mapped_msa['allocataire-qualite'].str.strip().replace('MME', 'Mme')
df_psp_mapped_msa['allocataire-qualite'] = df_psp_mapped_msa['allocataire-qualite'].str.strip().replace('MR', 'M')

# code commune de naissance de l'allocataire
df_psp_mapped_msa['allocataire-code_insee_commune_naissance'] = df_psp_mapped_msa['allocataire-code_insee_commune_naissance'].replace('0', '')

# date de naissance allocataire
df_psp_mapped_msa['allocataire-date_naissance'] = df_psp_mapped_msa['allocataire-date_naissance'].replace('0', np.NaN)
df_psp_mapped_msa['allocataire-date_naissance'] = df_psp_mapped_msa['allocataire-date_naissance'].replace('00000000', np.NaN)
df_psp_mapped_msa['allocataire-date_naissance'] = pd.to_datetime(df_psp_mapped_msa['allocataire-date_naissance'], format='%Y%m%d')
df_psp_mapped_msa['allocataire-date_naissance'] = df_psp_mapped_msa['allocataire-date_naissance'].dt.strftime('%d/%m/%Y')

# date de naissance bénéficiaire
df_psp_mapped_msa['date_naissance'] = pd.to_datetime(df_psp_mapped_msa['date_naissance'], format='%Y%m%d')
# df_psp_mapped_msa['date_naissance'] = df_psp_mapped_msa['date_naissance'].dt.strftime('%d/%m/%Y')

# de manière certaine les bénéficiaires avant 2004 sont AAH
mask_dn_before_2004 = df_psp_mapped_msa['date_naissance'].dt.date < datetime(2004, 6, 1).date()
df_psp_mapped_msa.loc[mask_dn_before_2004, 'situation'] =  'AAH'

# remove unused 
df_psp_mapped_msa = df_psp_mapped_msa.drop(columns=[
  'nom_destinataire',
  'prenom_destinataire',
  'qualite_destinataire',
  'numero_voie',
  'complement_numero_voie',
  'type_voie',
  'voie'
])

if MEMORY_OPTIMIZATION:
  del msa_df


In [None]:
# trim sur tous les champs MSA (même dans les json)
df_psp_mapped_msa['allocataire-matricule'] = df_psp_mapped_msa['allocataire-matricule'].str.strip() 
df_psp_mapped_msa['allocataire-qualite'] = df_psp_mapped_msa['allocataire-qualite'].str.strip() 
df_psp_mapped_msa['allocataire-nom'] = df_psp_mapped_msa['allocataire-nom'].str.strip() 
df_psp_mapped_msa['allocataire-prenom'] = df_psp_mapped_msa['allocataire-prenom'].str.strip() 
df_psp_mapped_msa['allocataire-date_naissance'] = df_psp_mapped_msa['allocataire-date_naissance'].str.strip() 
df_psp_mapped_msa['allocataire-pays_naissance'] = df_psp_mapped_msa['allocataire-pays_naissance'].str.strip() 
df_psp_mapped_msa['allocataire-code_insee_commune_naissance'] = df_psp_mapped_msa['allocataire-code_insee_commune_naissance'].str.strip() 
df_psp_mapped_msa['allocataire-commune_naissance'] = df_psp_mapped_msa['allocataire-commune_naissance'].str.strip() 
df_psp_mapped_msa['allocataire-code_iso_pays_naissance'] = df_psp_mapped_msa['allocataire-code_iso_pays_naissance'].str.strip() 
df_psp_mapped_msa['allocataire-courriel'] = df_psp_mapped_msa['allocataire-courriel'].str.strip()
df_psp_mapped_msa['allocataire-telephone'] = df_psp_mapped_msa['allocataire-telephone'].str.strip()
df_psp_mapped_msa.to_csv('msa-test.csv')

In [None]:
# map CNAF
cnaf_column_mapping = {
    # infos allocataire
    'matricul': 'allocataire-matricule',
    'numorg': 'allocataire-code_organisme',
    'qualite': 'allocataire-qualite',
    'nom': 'allocataire-nom',
    'prenom': 'allocataire-prenom',
    
    # adresse allocataire
    'code_postal': 'adresse_allocataire-code_postal',
    'commune': 'adresse_allocataire-commune',
    'adresse': 'adresse_allocataire-voie',
    'code_insee': 'adresse_allocataire-code_insee',

    # infos allocataire
    'dt_naissance': 'allocataire-date_naissance',
    'mail': 'allocataire-courriel',
    'tel': 'allocataire-telephone',
    'nom_eligible': 'nom',
    'prenom_eligible': 'prenom',
    'sexe_eligible': 'genre',
    
    # infos bénéficiaires
    'dt_naissance_eligible': 'date_naissance',
}

df_psp_mapped_cnaf = cnaf_df.copy()
df_psp_mapped_cnaf = df_psp_mapped_cnaf.rename(columns=cnaf_column_mapping) 

# qualité allocataire
df_psp_mapped_cnaf['allocataire-qualite'] = df_psp_mapped_cnaf['allocataire-qualite'].replace('Madame', 'Mme')
df_psp_mapped_cnaf['allocataire-qualite'] = df_psp_mapped_cnaf['allocataire-qualite'].replace('Monsieur', 'M')

# voie de l'adresse postale de l'allocataire
df_psp_mapped_cnaf['adresse_allocataire-cplt_adresse'] = cnaf_df['compl_adresse1'] + ' ' + cnaf_df['compl_adresse2'] + ' ' + cnaf_df['compl_adresse3']

# organisme
df_psp_mapped_cnaf['organisme'] = 'CAF'

# sexe
df_psp_mapped_cnaf['genre'] = df_psp_mapped_cnaf['genre'].replace('Masculin', 'M')
df_psp_mapped_cnaf['genre'] = df_psp_mapped_cnaf['genre'].replace('Féminin', 'F')

# situation
df_psp_mapped_cnaf['situation'] = 'Jeune'
mask_nom_equal = df_psp_mapped_cnaf['allocataire-nom'] == df_psp_mapped_cnaf['nom'] 
mask_prenom_equal = df_psp_mapped_cnaf['allocataire-prenom'] == df_psp_mapped_cnaf['prenom']
df_psp_mapped_cnaf.loc[mask_nom_equal & mask_prenom_equal, 'situation'] =  'AAH'

# Format date_naissance to datetime python object for processing
df_psp_mapped_cnaf['date_naissance'] = pd.to_datetime(df_psp_mapped_cnaf['date_naissance'], format='%d/%m/%Y')
df_psp_mapped_cnaf['date_naissance'] = pd.to_datetime(df_psp_mapped_cnaf['date_naissance'], format='%Y-%m-%d')

# de manière certaine les bénéficiaires avant 2004 sont AAH
mask_dn_before_2004 = df_psp_mapped_cnaf['date_naissance'].dt.date < datetime(2004, 6, 1).date()
df_psp_mapped_cnaf.loc[mask_dn_before_2004, 'situation'] =  'AAH'

df_psp_mapped_cnaf['allocataire-qualite'] = df_psp_mapped_cnaf['allocataire-qualite'].replace('MR', 'M')

# remove unused 
df_psp_mapped_cnaf = df_psp_mapped_cnaf.drop(columns=[
'compl_adresse1',
'compl_adresse2',
'compl_adresse3',
'nb_eligibles'
])

if MEMORY_OPTIMIZATION:
  del cnaf_df


In [None]:
# map CNOUS
cnous_column_mapping = {
    # infos allocataire
    'ine': 'allocataire-matricule',
    # pas de code organisme
    'civiliteLibelleCourt': 'allocataire-qualite',
    'nom': 'allocataire-nom',
    'prenom': 'allocataire-prenom',
    'dateNaissance': 'allocataire-date_naissance',
    'mail': 'allocataire-courriel',
    'lieuNaissCodeCommuneInsee': 'allocataire-code_insee_commune_naissance',
    'lieuNaissLibelleCommune': 'allocataire-commune_naissance',
    'lieuNaissCodePays': 'allocataire-code_iso_pays_naissance',
    'lieuNaissLibellePays': 'allocataire-pays_naissance',

    # adresse allocataire
    'adresseVoie': 'adresse_allocataire-voie',
    'adresseCodePostal': 'adresse_allocataire-code_postal',
    'adresseLocalite': 'adresse_allocataire-commune',
    'adresseCodeLocalite': 'adresse_allocataire-code_insee',
    'adresseComplement1': 'adresse_allocataire-cplt_adresse',
    'adresseComplement2': '',
    'adresseCodePays': '-',

    # infos bénéficiaires
    'genre': 'civiliteLibelleCourt',
}


df_psp_mapped_cnous = cnous_df.copy()
df_psp_mapped_cnous.rename(columns=cnous_column_mapping, inplace=True) 

# organisme
df_psp_mapped_cnous['organisme'] = 'CNOUS'
df_psp_mapped_cnous['situation'] = 'Boursier'

# remove weird values
df_psp_mapped_cnous = df_psp_mapped_cnous[df_psp_mapped_cnous['allocataire-date_naissance'] != 'dateNaissance']

# infos bénéficiaires = allocataire
df_psp_mapped_cnous['date_naissance'] = pd.to_datetime(df_psp_mapped_cnous['allocataire-date_naissance'], format='%d/%m/%Y')
df_psp_mapped_cnous['allocataire-date_naissance'] = df_psp_mapped_cnous['date_naissance'].dt.strftime('%d/%m/%Y')
df_psp_mapped_cnous['nom'] = df_psp_mapped_cnous['allocataire-nom']
df_psp_mapped_cnous['prenom'] = df_psp_mapped_cnous['allocataire-prenom']
df_psp_mapped_cnous['genre'] = df_psp_mapped_cnous['allocataire-qualite']
df_psp_mapped_cnous['allocataire-qualite'] = df_psp_mapped_cnous['allocataire-qualite'].replace('F', 'Mme')

# TODO: Voir problème encoding sur allocaitre_adresse

if MEMORY_OPTIMIZATION:
  del cnous_df

In [None]:
# apply criterias on CNOUS datas
from datetime import timedelta
from dateutil.relativedelta import relativedelta

end_date = pd.to_datetime('2024-10-15').date()
start_date = end_date - relativedelta(years=28)

cnous_situation_mask = (df_psp_mapped_cnous['date_naissance'].dt.date >= start_date) & (df_psp_mapped_cnous['date_naissance'].dt.date <= end_date)
df_psp_mapped_cnous_filtered = df_psp_mapped_cnous[cnous_situation_mask]

print(f"{len(df_psp_mapped_cnous) - len(df_psp_mapped_cnous_filtered)} rows for CNOUS dataframe were removed based on criterias")

In [None]:
if MEMORY_OPTIMIZATION:
  del df_psp_mapped_cnous

# Merge dans un seul dataframe cible pour BDD Postgresql

In [None]:
# concat into a single dataframe
df_all = pd.concat([df_psp_mapped_cnous_filtered, df_psp_mapped_msa, df_psp_mapped_cnaf], axis=0, ignore_index=True)

# remove rows with missing necessary values (if one of those value are missing we cannot generate a code)
necessary_column = ['nom', 'prenom', 'date_naissance', 'genre']
df_all_valid_row = df_all.dropna(subset=necessary_column)

# remove columns with all null value
df_all_valid = df_all_valid_row.dropna(axis=1, how='all')

# test
assert len(df_all_valid[df_all['nom'].isnull() | df_all_valid['prenom'].isnull() | df_all_valid['date_naissance'].isnull()]) == 0

In [None]:
# Upper case these columns for the merge
df_all_valid['prenom'] = df_all_valid['prenom'].astype(str).apply(lambda x: x.upper())
df_all_valid['nom'] = df_all_valid['nom'].astype(str).apply(lambda x: x.upper())
df_all_valid['genre'] = df_all_valid['genre'].astype(str).apply(lambda x: x.upper())

In [None]:
# lower case on emails on all
df_all_valid['allocataire-courriel'] = df_all_valid['allocataire-courriel'].str.lower()

In [None]:
# remove rows when beneficiary is before september 1993
mask_before_1993 = pd.to_datetime(df_all_valid['date_naissance']) > datetime(1993, 9, 16)
df_all_valid_after93 = df_all_valid[mask_before_1993]

print(f"{len(df_all_valid) - len(df_all_valid_after93)} rows where removed because date_naissance was before 1993")

In [None]:
if MEMORY_OPTIMIZATION:
    del df_psp_mapped_cnous_filtered
    del df_psp_mapped_msa
    del df_psp_mapped_cnaf

In [None]:
# add missing 0 to phone numbers
mask_tel_not_null = df_all_valid_after93['allocataire-telephone'].notna()
mask_no_zero_phone_number = ~df_all_valid_after93.loc[mask_tel_not_null, 'allocataire-telephone'].str.startswith('0')
mask_9_char_phone = df_all_valid_after93.loc[mask_tel_not_null, 'allocataire-telephone'].str.len() == 9
df_all_valid_after93.loc[mask_tel_not_null & mask_no_zero_phone_number & mask_9_char_phone, 'allocataire-telephone'] = '0' + df_all_valid_after93['allocataire-telephone']

# set '0' phone values to None
mask_tel_eq_zero = df_all_valid_after93['allocataire-telephone'] == '0'
df_all_valid_after93.loc[mask_tel_eq_zero, 'allocataire-telephone'] = np.NaN

# add 4h on all birthdates
df_all_valid_after93.loc[:,'date_naissance'] = df_all_valid_after93['date_naissance'] + timedelta(hours=4)

In [None]:
# remove duplicate beneficiaries
df_all_valid_no_duplicate = df_all_valid_after93.drop_duplicates(subset=[
  'date_naissance',
  'nom', 
  'prenom', 
  'genre', 
  'organisme', 
  'situation',
  'allocataire-qualite', 
  'allocataire-matricule',
  'allocataire-code_organisme', 
  'allocataire-telephone',
  # 'allocataire-nom', 
  'allocataire-prenom', 
  'allocataire-date_naissance',
  'allocataire-courriel', 
  # we can remove this column additionaly
  'allocataire-code_insee_commune_naissance',
  'allocataire-commune_naissance', 
  'allocataire-code_iso_pays_naissance',
  'allocataire-pays_naissance']
)

print(f"{len(df_all_valid_after93) - len(df_all_valid_no_duplicate)} duplicate rows where removed")

In [None]:
# map to json values for target DB model 
## map allocataire json
def to_json_allocataire_without_null(row):
    allocataire_mapping = {
        'qualite': row['allocataire-qualite'],
        'matricule': row['allocataire-matricule'],
        'code_organisme': row['allocataire-code_organisme'],
        'telephone': row['allocataire-telephone'],
        'nom': row['allocataire-nom'],
        'prenom': row['allocataire-prenom'],
        'date_naissance': row['allocataire-date_naissance'],
        'courriel': row['allocataire-courriel'],
        'code_insee_commune_naissance': row['allocataire-code_insee_commune_naissance'],
        'commune_naissance': row['allocataire-commune_naissance'],
        'code_iso_pays_naissance': row['allocataire-code_iso_pays_naissance'],
        'pays_naissance': row['allocataire-pays_naissance']
    }
    filtered_NaN_allocataire = {k: v for k, v in allocataire_mapping.items() if pd.notnull(v)}
    return json.dumps(filtered_NaN_allocataire, ensure_ascii=False)

df_all_valid_no_duplicate['allocataire'] = df_all_valid_no_duplicate.apply(to_json_allocataire_without_null, axis=1)

In [None]:
## map adresse_allocataire json
def to_json_adresse_without_null(row):
    adresse_mapping = {
        'voie': row['adresse_allocataire-voie'],
        'code_postal': row['adresse_allocataire-code_postal'],
        'nom_adresse_postale': row['adresse_allocataire-nom_adresse_postale'],
        'commune': row['adresse_allocataire-commune'],
        'code_insee': row['adresse_allocataire-code_insee'],
        'cplt_adresse': row['adresse_allocataire-cplt_adresse'],
    }
    filtered_address = {k: v for k, v in adresse_mapping.items() if pd.notnull(v)}
    return json.dumps(filtered_address, ensure_ascii=False)

df_all_valid_no_duplicate['adresse_allocataire'] = df_all_valid_no_duplicate.apply(to_json_adresse_without_null, axis=1)

In [None]:

## drop null value
df_final = df_all_valid_no_duplicate.drop(columns=[
  'allocataire-qualite',
  'allocataire-matricule',
  'allocataire-code_organisme',
  'allocataire-nom',
  'allocataire-prenom',
  'allocataire-telephone',
  'allocataire-date_naissance',
  'allocataire-courriel',
  'allocataire-code_insee_commune_naissance',
  'allocataire-commune_naissance',
  'allocataire-code_iso_pays_naissance',
  'allocataire-pays_naissance',
  'adresse_allocataire-voie',
  'adresse_allocataire-nom_adresse_postale',
  'adresse_allocataire-code_postal',
  'adresse_allocataire-commune',
  'adresse_allocataire-code_insee',
  'adresse_allocataire-cplt_adresse',
])


In [None]:
# Add missing default column needed for target DB model
import datetime

df_final['updated_at'] = datetime.datetime.now()
df_final['exercice_id'] = 3
df_final['uuid_doc'] = np.NaN

In [None]:
# output to CSV
df_final.to_csv(base_output_filepath)