# NF26/AI07 - TD3 : Gestion des valeurs Atypiques/Manquantes


In [None]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [None]:
# !pip install numpy==1.23.5
# autre solution pour le problème de dépendances

In [None]:
import pandas as pd
import numpy as np
import sklearn
from datetime import datetime, date

np.NaN = np.nan # problème de dépendances
np.string_ = np.bytes_
np.float_ = np.float64
np.unicode_ = np.str_

import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
spark.conf.set("spark.sql.session.timeZone", "CET")

## Introduction

L'objectif de ce TD est d'implémenter un ETL visant à gérer les données atypiques ou manquantes afin d'alimenter le modèle Étoile étudié lors du TD2. Pour rappel, les 6 diffrérentes bases de données transactionnelles données sont :
- `data_administration`: Ensemble des données administratives liées à chaque consultation.
- `data_medecins`: Ensemble des données liées aux informations associées à chaque Médecin.
- `data_diagnostics`: Ensemble des données liées aux diagnostic de chaque patient lors de chaque consultation.
- `data_treatments`: Ensemble des données liées aux descriptions de chaque traitement.
- `data_medicaments`: Ensemble des informations caractérisant chaque Médicament.
- `data_chambres`: Ensemble des informations caractérisant chaque Chambre de l'Hôpital.

On souhaite notamment pouvoir répondre aux requêtes suivantes :
- "*Quel a été l'âge moyen des patients qui ont eu une certaine Pathology durant une certaine période ?*".
- "*Quel Médicament a été le plus prescrit (en terme de quantité) pour une certaine Pathology durant une certaine période ?*".
- "*Combien de chambres ont accueilli des patients diagnostiqués d'une certaine Pathology durant une certaine période ?*".
- "*Quelle est la proportion de medecins (par spécialité) qui ont dignostiqué une certaine `Pathology` durant une certaine période ?*".


## Lecture des données

Proposition de Code par Arman S-M afin de charger un dossier facilement sur Google Colab :

In [None]:
# Proposition de Code par Arman S-M
import zipfile

def unzip_data(filename):
    #Unzips filename into the current working directory.
    #Args:
    #filename (str): a filepath to a target zip folder to be unzipped.
    zip_ref = zipfile.ZipFile(filename, "r")
    zip_ref.extractall()
    zip_ref.close()

# Usage : unzip_data("nf26-td2.zip")

**Les cellules ci-dessous permettent de lire les différentes bases de données transactionnelles en format `pyspark.pandas.frame.DataFrame`.**

In [None]:
psdf_administration = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_administration.csv', index_col='KeyConsult')
psdf_administration.head(5)

Unnamed: 0_level_0,KeyPatient,Name,FirstName,NumSecu,Date_In,Pathology,KeyChambre,ChambreNumber,Date_Out
KeyConsult,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
KeyConsult45056,KeyPatient38017,Name38017,FistName38017,NS45057,2023-11-22 23:00:00,Pathology27,KeyChambre37,C037,2023-11-23 23:00:00
KeyConsult45057,KeyPatient75131,Name75131,FistName75131,NS45058,2023-11-22 23:00:00,Pathology57,,,2023-11-22 23:00:00
KeyConsult45058,KeyPatient27733,Name27733,FistName27733,NS45059,2023-11-22 23:00:00,Pathology60,KeyChambre5,C005,2023-11-23 23:00:00
KeyConsult45059,KeyPatient76784,Name76784,FistName76784,NS45060,2023-11-22 23:00:00,Pathology30,KeyChambre43,C043,2023-11-24 23:00:00
KeyConsult45060,KeyPatient10645,Name10645,FistName10645,NS45061,2023-11-22 23:00:00,Pathology99,KeyChambre43,C043,2023-12-06 23:00:00


In [None]:
psdf_medecins = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_medecins.csv', index_col='KeyMedecin')
psdf_medecins.head(5)

Unnamed: 0_level_0,NameMedecin,FirstNameMedecin,SpecialityMedecin
KeyMedecin,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
keyMedecin16,NameMedecin16,FirstNameMedecin16,Generaliste
keyMedecin17,NameMedecin17,FirstNameMedecin17,Generaliste
keyMedecin18,NameMedecin18,FirstNameMedecin18,Generaliste
keyMedecin19,NameMedecin19,FirstNameMedecin19,Chirurgien
keyMedecin6,NameMedecin6,FirstNameMedecin6,Dermatologiste


In [None]:
psdf_diagnostics = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_diagnostics.csv', index_col='KeyConsult')
psdf_diagnostics.head(5)

Unnamed: 0_level_0,KeyMedecin,KeyPatient,NamePatient,FirstNamePatient,NumSecu,Age,Weight,Temperature,Tension,Diabete,Pathology,KeyTreatment
KeyConsult,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
KeyConsult45056,keyMedecin5,KeyPatient38017,Name38017,FistName38017,NS45057,20.0,59.793081,37.569814,79.768608,0,Pathology27,KeyTreatment45056
KeyConsult45057,keyMedecin23,KeyPatient75131,Name75131,FistName75131,NS45058,15.0,61.467987,37.565551,164.781518,0,Pathology57,KeyTreatment45057
KeyConsult45058,keyMedecin10,KeyPatient27733,Name27733,FistName27733,NS45059,18.0,59.421285,37.481894,124.363853,0,Pathology60,KeyTreatment45058
KeyConsult45059,keyMedecin15,KeyPatient76784,Name76784,FistName76784,NS45060,20.0,57.974723,37.537167,98.881029,0,Pathology30,KeyTreatment45059
KeyConsult45060,keyMedecin29,KeyPatient10645,Name10645,FistName10645,NS45061,42.0,71.273588,40.379072,80.799002,1,Pathology99,KeyTreatment45060


In [None]:
psdf_treatments = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_treatments.csv', index_col='KeyTreatment')
psdf_treatments.head(5)

Unnamed: 0_level_0,KeyMedicament,QuantityMedicament
KeyTreatment,Unnamed: 1_level_1,Unnamed: 2_level_1
KeyTreatment20428,KeyMed2,2
KeyTreatment20429,KeyMed449,2
KeyTreatment20429,KeyMed321,3
KeyTreatment20429,KeyMed361,3
KeyTreatment20430,KeyMed505,1


In [None]:
psdf_medicaments = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_medicaments.csv', index_col='KeyMedicament')
psdf_medicaments.head(5)

Unnamed: 0_level_0,NameMedicament
KeyMedicament,Unnamed: 1_level_1
KeyMed913,Medicament913
KeyMed914,Medicament914
KeyMed915,Medicament915
KeyMed916,Medicament916
KeyMed917,Medicament917


In [None]:
psdf_chambres = ps.read_csv('/content/drive/MyDrive/Colab Notebooks/TD3/data_chambres.csv', index_col='KeyChambre')
psdf_chambres.head(5)

Unnamed: 0_level_0,ChambreNumber
KeyChambre,Unnamed: 1_level_1
KeyChambre89,C089
KeyChambre90,C090
KeyChambre91,C091
KeyChambre92,C092
KeyChambre93,C093


Afin de pouvoir réaliser notre objectif, nous pourrons convertir ces données en format `pyspark.sql.dataframe.DataFrame`.

In [None]:
sdf_administration = psdf_administration.to_spark(index_col='KeyConsult')
sdf_medecins = psdf_medecins.to_spark(index_col='KeyMedecin')
sdf_diagnostics = psdf_diagnostics.to_spark(index_col='KeyConsult')
sdf_treatments = psdf_treatments.to_spark(index_col='KeyTreatment')
sdf_medicaments = psdf_medicaments.to_spark(index_col='KeyMedicament')
sdf_chambres = psdf_chambres.to_spark(index_col='KeyChambre')

In [1]:
from pyspark.sql.functions import *

## Exercices

On souhaite créer un modèle en étoile permettant de procéder facilement les requêtes suivantes :
- "*Quel a été l'âge moyen des patients qui ont eu une certaine Pathology durant une certaine période ?*".
- "*Quel Médicament a été le plus prescrit (en terme de quantité) pour une certaine Pathology durant une certaine période ?*".
- "*Combien de chambres ont des patients diagnostiqués d'une certaine Pathology durant une certaine période ?*".
- "*Quelle est la proportion de medecins (par spécialité) qui ont dignostiqué une certaine Pathology durant une certaine période ?*".

Pour essayer de répondre à toutes ces questions, nous allons considérer le modèle étoile étudié lors du précédent TD.

![title](Schemas_Etoile_TD_new.pdf)

**Question 1.** Reprendre les algorithmes du TD2 pour construire le modèle Étoile ci-dessus.

In [None]:
t1 = sdf_administration.join(sdf_diagnostics, "KeyConsult")
facts = t1.select("*").withColumn("KeyDates", monotonically_increasing_id())
facts = facts.select("KeyConsult", sdf_administration.KeyPatient, "KeyMedecin", "KeyTreatment", "KeyDates", "KeyChambre")

dim_patients = sdf_diagnostics.select("KeyPatient", "KeyConsult", "NamePatient", "FirstNamePatient", "NumSecu", "Age", "Weight", "Temperature", "Tension", "Diabete", "Pathology")

dates = sdf_administration.select("Date_In", "Date_Out")
dim_dates = dates.select("*").withColumn("KeyDates", monotonically_increasing_id()).select("KeyDates", "Date_In", "Date_Out")

dim_medecin = sdf_medecins.select("*")

dim_traitement = sdf_treatments.join(sdf_medicaments, "KeyMedicament").select("KeyTreatment", "KeyMedicament", "QuantityMedicament", "NameMedicament")

dim_chambre = sdf_chambres.select("*")



**Question 2.** Calculer l'âge moyen des patients :
- Pour la `Pathology95` durant le mois de mars 2023 : avg(Age) = ?
- Pour la `Pathology18` durant le mois de mars 2023 : avg(Age) = ?
- Pour la `Pathology76` durant le mois de juillet 2023 : avg(Age) = ?

In [None]:
tmp = dim_patients.join(facts, "KeyPatient").join(dim_dates, "KeyDates")
tmp1 = tmp.filter((col("Pathology") == "Pathology95") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp2 = tmp.filter((col("Pathology") == "Pathology18") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp3 = tmp.filter((col("Pathology") == "Pathology95") & ((month(col("Date_In")) == 7) & (year(col("Date_In")) == 2023)))

# tmp1.show()
print(tmp1.agg(avg(col("Age"))).collect()[0][0])
print(tmp2.agg(avg(col("Age"))).collect()[0][0])
print(tmp3.agg(avg(col("Age"))).collect()[0][0])

156.7948717948718
inf
inf


## Problématique :
Il semblerait qu'il y ait des erreurs au niveau de la variable `Âge`. Certaines données ont été non-répertoriées dans la base de données `diagnostic` et ont été remplacées par la valeur `Inf`. D'autres données sont anormalement elevées et apportent un biais très important.

**Objectifs :** Notre objectif sera de construire un ETL qui permette d'être robuste face à ces problèmes au niveau de la variable `Âge`. Nous allons pour cela faire un travail de pré-traitement de données par approche supervisée.
- Pour la suite, nous considèrerons qu'un âge est anormalement élevé s'il est superieur à 150 ans. En cas d'âge superieur à 150 ans, nous considèreons cette valeur comme atypique et nous devrons alors remplacer cette valeur par une valeur cohérentes.
- Pour la suite, Nous remplacerons les valeurs `Inf` par une estimation cohérente de l'âge en fonction des autres variables.

Comme nous l'avons vu en CM, cette phase de nettoyage de données doit se faire à partir des données source et juste avant le transfert dans le Data Warehouse, c'est à dire au cours de l'ETL.

## Questions :

**Question 3.** Construire un DataFrame contenant l'ensemble des observations 'non-problématiques' dans la base de données source `psdf_diagnostics`. Nous appellerons ce DataFrame  `psdf_diagnostics_clean`.

In [None]:
psdf_diagnostics_clean = psdf_diagnostics.loc[psdf_diagnostics["Age"] <= 150]
# print(len(psdf_diagnostics))
# print(psdf_diagnostics["Age"].max())
# print(len(psdf_diagnostics_clean))
# print(psdf_diagnostics_clean["Age"].max())

50000
inf
49582
100.0


**Question 4.** À partir du DataFrame `psdf_diagnostics_clean`, calibrer un modèle d'estimation de l'âge par qui permettra de remplacer les valeurs d'âge atypiques ou manquantes par la moyenne des âges issus du DataFrame `psdf_diagnostics_clean`.

In [None]:
def function_estim_age_mean(psdf_diagnostics_clean):
    estim_age_mean = psdf_diagnostics_clean["Age"].mean()
    return estim_age_mean

# function_estim_age_mean(psdf_diagnostics_clean)

31.96982776007422

**Question 5.** Construire une fonction qui permette de remplacer les valeurs d'âge atypiques ou manquantes d'un DataFrame source par la moyenne des âges calculée à partir de la fonction `function_estim_age_mean(psdf_diagnostics_clean)`. Nous appelerons cette nouvelle fonction `function_ETL_age_mean(psdf_diagnostics)`. Cette nouvelle fonction ressortira un nouveau DataFrame contenant l'ensemble des valeurs (celle cohérentes et celles incohérentes estimées et remplacées). Nous appelerons ce nouveau DataFrame `psdf_diagnostics_estim_mean`.

In [None]:
def function_ETL_age_mean(psdf_diagnostics):
    psdf_diagnostics_estim_mean = psdf_diagnostics.copy()
    psdf_diagnostics_estim_mean["Age"] = psdf_diagnostics_estim_mean["Age"].mask((psdf_diagnostics_estim_mean["Age"] > 150) | (psdf_diagnostics_estim_mean["Age"].isna()), function_estim_age_mean(psdf_diagnostics_clean))
    return psdf_diagnostics_estim_mean

psdf_diagnostics_estim_mean = function_ETL_age_mean(psdf_diagnostics)

**Question 5.bis** Reprendre les codes des questions 1 et 2 en remplaçant `psdf_diagnostics` par `psdf_diagnostics_estim_mean`.

In [None]:
sdf_diagnostics_estim_mean = psdf_diagnostics_estim_mean.to_spark(index_col='KeyConsult')

t1 = sdf_administration.join(sdf_diagnostics_estim_mean, "KeyConsult")
facts = t1.select("*").withColumn("KeyDates", monotonically_increasing_id())
facts = facts.select("KeyConsult", sdf_administration.KeyPatient, "KeyMedecin", "KeyTreatment", "KeyDates", "KeyChambre")

dim_patients = sdf_diagnostics_estim_mean.select("KeyPatient", "KeyConsult", "NamePatient", "FirstNamePatient", "NumSecu", "Age", "Weight", "Temperature", "Tension", "Diabete", "Pathology")

dates = sdf_administration.select("Date_In", "Date_Out")
dim_dates = dates.select("*").withColumn("KeyDates", monotonically_increasing_id()).select("KeyDates", "Date_In", "Date_Out")

dim_medecin = sdf_medecins.select("*")

dim_traitement = sdf_treatments.join(sdf_medicaments, "KeyMedicament").select("KeyTreatment", "KeyMedicament", "QuantityMedicament", "NameMedicament")

dim_chambre = sdf_chambres.select("*")



tmp = dim_patients.join(facts, "KeyPatient").join(dim_dates, "KeyDates")
tmp1 = tmp.filter((col("Pathology") == "Pathology95") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp2 = tmp.filter((col("Pathology") == "Pathology18") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp3 = tmp.filter((col("Pathology") == "Pathology76") & ((month(col("Date_In")) == 7) & (year(col("Date_In")) == 2023)))

# tmp1.show()
print(tmp1.agg(avg(col("Age"))).collect()[0][0])
print(tmp2.agg(avg(col("Age"))).collect()[0][0])
print(tmp3.agg(avg(col("Age"))).collect()[0][0])



40.48563219282432
20.179396555201482
69.34131125684405


**Question 6.** À partir du DataFrame `psdf_diagnostics_clean`, calibrer un modèle d'estimation de l'âge par qui permettra de remplacer les valeurs d'âge atypiques ou manquantes par apprentissage supervisé à partir du DataFrame `psdf_diagnostics_clean`. Nous pourrons pour cela utiliser les fonctions proposées par Scikit-Learn : https://scikit-learn.org/stable/

In [None]:
from sklearn.linear_model import LinearRegression

def function_estim_age_ML(psdf_diagnostics_clean):
    df = psdf_diagnostics_clean.to_pandas()
    X_train = df[["Weight", "Temperature", "Tension"]]
    Y_train = df[["Age"]]
    estim_age_ML = LinearRegression().fit(X_train, Y_train)
    return estim_age_ML

estim = function_estim_age_ML(psdf_diagnostics_clean)

**Question 7.** Construire une fonction qui permette de remplacer les valeurs d'âge atypiques ou manquantes d'un DataFrame source par la valeurs estimée des âges à partir de la fonction `function_estim_age_ML(psdf_diagnostics_clean)`. Nous appelerons cette nouvelle fonction `function_ETL_age_ML(psdf_diagnostics)`. Cette nouvelle fonction ressortira un nouveau DataFrame contenant l'ensemble des valeurs (celle cohérentes et celles incohérentes estimées puis remplacées). Nous appelerons ce nouveau DataFrame `psdf_diagnostics_estim_ML`.

In [None]:
def function_ETL_age_ML(psdf_diagnostics):
    psdf_diagnostics_estim_ML = psdf_diagnostics.copy()
    indexToPredict = psdf_diagnostics_estim_ML[psdf_diagnostics_estim_ML["Age"] > 150].index
    XToPredict = psdf_diagnostics_estim_ML.loc[indexToPredict, ["Weight", "Temperature", "Tension"]]
    YAgeEstim = function_estim_age_ML(psdf_diagnostics_estim_ML).predict(XToPredict)
    psdf_diagnostics_estim_ML.loc[indexToPredict, "Age"] = YAgeEstim
    return psdf_diagnostics_estim_ML

psdf_diagnostics_estim_ML = function_ETL_age_ML(psdf_diagnostics)

PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

**Question 8.** Reprendre les algorithmes de la Question 1 pour construire le modèle Étoile ci-dessus mais en considérant cette fois le DataFrame `psdf_diagnostics_estim_mean` puis le DataFrame `psdf_diagnostics_estim_ML`.

In [None]:
sdf_diagnostics_estim_ML = psdf_diagnostics_estim.to_spark(index_col='KeyConsult')

t1 = sdf_administration.join(sdf_diagnostics_estim_mean, "KeyConsult")
facts = t1.select("*").withColumn("KeyDates", monotonically_increasing_id())
facts = facts.select("KeyConsult", sdf_administration.KeyPatient, "KeyMedecin", "KeyTreatment", "KeyDates", "KeyChambre")

dim_patients = sdf_diagnostics_estim_mean.select("KeyPatient", "KeyConsult", "NamePatient", "FirstNamePatient", "NumSecu", "Age", "Weight", "Temperature", "Tension", "Diabete", "Pathology")

dates = sdf_administration.select("Date_In", "Date_Out")
dim_dates = dates.select("*").withColumn("KeyDates", monotonically_increasing_id()).select("KeyDates", "Date_In", "Date_Out")

dim_medecin = sdf_medecins.select("*")

dim_traitement = sdf_treatments.join(sdf_medicaments, "KeyMedicament").select("KeyTreatment", "KeyMedicament", "QuantityMedicament", "NameMedicament")

dim_chambre = sdf_chambres.select("*")



tmp = dim_patients.join(facts, "KeyPatient").join(dim_dates, "KeyDates")
tmp1 = tmp.filter((col("Pathology") == "Pathology95") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp2 = tmp.filter((col("Pathology") == "Pathology18") & ((month(col("Date_In")) == 3) & (year(col("Date_In")) == 2023)))
tmp3 = tmp.filter((col("Pathology") == "Pathology76") & ((month(col("Date_In")) == 7) & (year(col("Date_In")) == 2023)))

# tmp1.show()
print(tmp1.agg(avg(col("Age"))).collect()[0][0])
print(tmp2.agg(avg(col("Age"))).collect()[0][0])
print(tmp3.agg(avg(col("Age"))).collect()[0][0])



**Question 9.** Calculer l'âge moyen des patients qui ont eu la `Pathology95` puis la `Pathology18` durant le mois de mars 2023 en utilisant ces deux modèles Étoiles.

**Remarque.** Dans le but de pouvoir comparer les résultats pour chaque méthode, les réponses que nous aurions eu à partir des données initialement clean et réelles auraient été :
- Pour la `Pathology95` durant le mois de mars 2023 : avg(Age) = 41.205128205128204
- Pour la `Pathology18` durant le mois de mars 2023 : avg(Age) = 20.0
- Pour la `Pathology76` durant le mois de juillet 2023 : avg(Age) = 70.56756756756756