# Description

Pré-traitement des fichiers ".parquet" correspondant à des séries temporelles

Dataset de sortie :  
- une ligne par heure et par patient entre son entrée et sa sortie
- une colonne par feature
- NaN si pas de valeur pour cette heure et ce patient
- optionnel : 2e dataset censuré à 7 jours. Pour les patients sortis avant J7 : comblement avec NaN jusqu'à J7

1. import des séries  
2. Retrait des valeurs antérieures à l'admission (offset de -1h en cas de bug à l'admission)  
3. regroupement des données par heure 
    - pour les pressions artérielles : regroupement invasif et non invasif avec priorité sur l'invasif
4. Ajout de NaN sur les heures manquantes (entre l'heure 0 et la dernière heure renseignée)


# 1.Import

## 1.1 Librairies

In [None]:
import polars as pl
import pandas as pd
import numpy as np
import os
import json
from pathlib import Path
from functools import reduce
from tqdm import tqdm

## 1.2 Constantes

In [None]:
# Switch for the optional one-week dataset: the merge step only runs when True.
one_week_temporal = False

In [None]:
# Project-wide parameters (dataset name, version, data root) live in a JSON
# file one level above the notebook.
with open('../params.json', 'r') as params_file:
    params = json.load(params_file)

DATASET = params['dataset']
VERSION = params['version']
DATA_FOLD = params['data_folder']

In [None]:
# Constants
# Static encounters table, raw per-feature parquet folder, and destination
# folder for the cleaned hourly series.
CENSUS_FILE = f'{DATA_FOLD}/{VERSION}/2.clean_data/{DATASET}/static/clean_static_encounters.parquet'
INPUT_FOLDER = f'{DATA_FOLD}/{VERSION}/1.raw_data/{DATASET}/dynamic_features/'
OUTPUT_FOLDER = f'{DATA_FOLD}/{VERSION}/2.clean_data/{DATASET}/temporal/'
# Per-feature settings; cleared_df reads the 'range' entry (lower, upper) of each feature.
with open('temporal_range.json', 'r') as f:
    temporal_range = json.load(f)

In [None]:
# Static encounters table and the plain list of its encounter ids
# (used later to restrict every feature table to known encounters).
encounters = pl.read_parquet(CENSUS_FILE)
encounters_list = encounters.get_column('encounterId').to_list()
len(encounters_list)

## 1.3 Fonctions

### 1.3.1 Nettoyage du dataset

In [None]:
# Sample raw feature table for manual inspection.
# NOTE(review): test_df is not referenced anywhere else in the visible notebook.
test_df = pl.read_parquet(INPUT_FOLDER + 'pam_invasive.parquet')

In [None]:
def cleared_df(df, feature) :
    """Filter a raw feature table and aggregate it to one value per encounter and hour.

    Parameters
    ----------
    df : pl.DataFrame
        Raw measurements; must contain the columns ``encounterId``,
        ``delta_inTime_hours``, ``valueNumber`` and ``feature``.
    feature : str
        Feature name. A trailing ``_invasive`` / ``_non_invasive`` is stripped
        to find the entry in ``temporal_range``; the full name is used for the
        output column.

    Returns
    -------
    pl.DataFrame
        Columns ``encounterId``, ``intervalle`` (hour since admission, integer)
        and ``<feature>`` (median of the accepted values in that hour), sorted
        by ``encounterId`` then ``intervalle``.

    Notes
    -----
    For every feature except ``pep`` and ``fio2`` the accepted range is the
    configured range intersected with mean +/- 3 standard deviations, both
    statistics being computed on the values already inside the configured
    range. When those statistics are undefined (fewer than two in-range
    values) the configured range is used alone.
    """
    feature_range = feature.replace('_non_invasive', '').replace('_invasive', '')

    print(feature)
    set_lower_bound, set_upper_bound = temporal_range[feature_range]['range']

    # Default: configured physiological range only.
    lower_bound, upper_bound = set_lower_bound, set_upper_bound

    if feature not in ['pep', 'fio2'] :
        # TODO: compute the statistics per patient rather than globally.
        in_range_values = df.filter(
            pl.col('valueNumber').is_between(set_lower_bound, set_upper_bound)
        )['valueNumber']

        mean = in_range_values.mean()
        std = in_range_values.std()

        # mean()/std() return None on an empty (or single-value) series;
        # arithmetic on None would raise, so keep the configured range then.
        if mean is not None and std is not None :
            stat_lower_bound = mean - 3 * std
            stat_upper_bound = mean + 3 * std
            print(f'{feature} : [{stat_lower_bound}-{stat_upper_bound}]')

            lower_bound = max(stat_lower_bound, set_lower_bound)
            upper_bound = min(stat_upper_bound, set_upper_bound)

    print(f'{lower_bound} / {upper_bound}')
    df_cleared = (df
        .select(
                ['encounterId', 'delta_inTime_hours', 'valueNumber', 'feature']
        )
        # Drop measurements taken more than 1h before admission and out-of-range values
        .filter(
            pl.col('delta_inTime_hours') >= -1,
            pl.col('valueNumber').is_between(lower_bound, upper_bound)
        )
        # Integer hour since admission.
        # NOTE(review): presumably the cast truncates toward zero, so values in
        # (-1, 0) land in hour 0 and only exactly -1 yields hour -1 - confirm.
        .with_columns(
            (pl.col("delta_inTime_hours").cast(pl.Int64)).alias('intervalle')
        )
        .group_by(
            'encounterId', 'intervalle'
        )
        .agg(
            pl.col("valueNumber").median().alias(feature)
        )
        .sort(
            'encounterId', 'intervalle'
        )
    )
    print(df_cleared[feature].min())
    print(df_cleared[feature].max())
    return df_cleared

###  1.3.2 Traitements des valeurs de pression artérielle

In [None]:
# Arterial pressure features come in two raw variants (invasive / non invasive)
# that are merged into a single column per pressure.
pressures_features = ['pam', 'pas', 'pad']
non_invasive_pressures = [f'{pressure}_non_invasive' for pressure in pressures_features]
invasive_pressures = [f'{pressure}_invasive' for pressure in pressures_features]

# Merge the invasive and non-invasive variants of one pressure feature
def merged_pressure(invasive_df, non_invasive_df, feature_name):
    """Combine invasive and non-invasive hourly pressures, invasive first.

    Both inputs are expected to carry ``encounterId``, ``intervalle`` and a
    value column named ``<feature_name>_invasive`` / ``<feature_name>_non_invasive``.
    The result has one row per (encounterId, intervalle) present in either
    input and a single ``<feature_name>`` column holding the invasive value
    when available, otherwise the non-invasive one.
    """
    join_keys = ["encounterId", "intervalle"]
    invasive_col = f'{feature_name}_invasive'
    non_invasive_col = f'{feature_name}_non_invasive'

    # Full outer join: rows that exist on one side only must be kept.
    outer = invasive_df.join(non_invasive_df, on=join_keys, how="full")

    # Invasive value has priority; fall back to the non-invasive one.
    with_value = outer.with_columns(
        pl.coalesce([pl.col(invasive_col), pl.col(non_invasive_col)]).alias(feature_name)
    )

    # Rows coming only from the right side have null left keys: take the right keys.
    with_keys = with_value.with_columns(
        [pl.col(key).fill_null(pl.col(f"{key}_right")) for key in join_keys]
    )

    return with_keys.select(join_keys + [feature_name])

### 1.3.3 Comblement des intervalles manquants par Null

In [None]:
def fill_missing_intervalle(df, feature) :
    """Expand an hourly feature table so every hour from 0 to the last one exists.

    Parameters
    ----------
    df : pl.DataFrame
        Hourly values with columns ``encounterId``, ``intervalle`` and ``feature``.
    feature : str
        Name of the value column to keep.

    Returns
    -------
    pl.DataFrame
        Columns ``encounterId``, ``intervalle``, ``<feature>`` with one row per
        encounter and per hour in ``[0, max(intervalle)]``; hours without a
        measurement hold NaN. Rows of ``df`` with a negative ``intervalle`` are
        not part of the generated grid and therefore do not appear in the
        result. Sorted by ``encounterId`` then ``intervalle``.
    """
    # One row per encounter and per hour between 0 and its last observed hour.
    # pl.int_ranges builds the list natively instead of calling a Python
    # lambda once per encounter.
    hourly_grid = (
        df
        .group_by("encounterId")
        .agg(pl.col("intervalle").max().alias("max_inter"))
        .with_columns(
            pl.int_ranges(0, pl.col("max_inter") + 1).alias("all_hours")
        )
        .explode("all_hours")
    )

    df_with_null = (
        hourly_grid
        # Align the existing measurements on the complete grid
        .join(
            df,
            left_on=["encounterId", "all_hours"],
            right_on=["encounterId", "intervalle"],
            how="left"
        )
        # Hours without a measurement are marked with NaN
        .with_columns(
            pl.col(feature).fill_null(float('nan'))
        )
        .rename({"all_hours": "intervalle"})
        .select(["encounterId", "intervalle", feature])
        # group_by does not guarantee any row order: make the output deterministic
        .sort("encounterId", "intervalle")
    )
    return df_with_null

# Script

## Cleaning features

In [None]:
# Clean every raw feature file and write one hourly parquet per feature.
# Pressure features are written once per pressure, merged from both variants.
pressure_files = non_invasive_pressures + invasive_pressures

for filename in os.listdir(INPUT_FOLDER):

    if not filename.endswith('.parquet'):
        continue

    feature = os.path.splitext(filename)[0]

    if feature not in pressure_files:
        # Regular feature: clean, complete the hourly grid, write.
        raw_df = pl.read_parquet(os.path.join(INPUT_FOLDER, filename))
        cleared = cleared_df(raw_df, feature)
        with_missing_values = fill_missing_intervalle(cleared, feature)

        with_missing_values.write_parquet(os.path.join(OUTPUT_FOLDER, f'cleared_{filename}'))
        print(f'ok {feature}')
        continue

    if feature not in invasive_pressures:
        # Non-invasive files are consumed together with their invasive twin.
        # NOTE(review): a pressure with no invasive file is therefore never written.
        continue

    pressure_feature = feature.replace('_invasive', '')
    non_invasive_feature = f'{pressure_feature}_non_invasive'
    raw_invasive = pl.read_parquet(os.path.join(INPUT_FOLDER, filename))
    raw_non_invasive = pl.read_parquet(os.path.join(INPUT_FOLDER, f'{non_invasive_feature}.parquet'))

    cleared_invasive = cleared_df(raw_invasive, feature)
    cleared_non_invasive = cleared_df(raw_non_invasive, non_invasive_feature)
    merged_df = merged_pressure(cleared_invasive, cleared_non_invasive, pressure_feature)
    with_missing_values = fill_missing_intervalle(merged_df, pressure_feature)

    with_missing_values.write_parquet(os.path.join(OUTPUT_FOLDER, f'cleared_{pressure_feature}.parquet'))
    print(f'ok {pressure_feature}')
            

## Merge in 1 week dataset

In [None]:
def reset_first_value(df) :
    """Re-anchor each encounter's hourly series on its first hour with data.

    Parameters
    ----------
    df : pl.DataFrame
        Wide hourly table with ``encounterId``, ``intervalle`` and one column
        per feature. NaN and null both count as "no data".

    Returns
    -------
    pl.DataFrame
        Same columns as ``df`` with NaN replaced by null. For each encounter,
        the rows before the first hour holding at least one non-null feature
        are removed and ``intervalle`` is shifted so that this hour becomes 0.
        Encounters without any data are dropped. Input row order is preserved.

    Notes
    -----
    Rows are selected by ``intervalle`` value rather than by position, which is
    the same thing when each encounter's rows are ordered by ``intervalle``.
    """
    features_col = [col for col in df.columns if col not in ('encounterId', 'intervalle')]

    # Without feature columns no encounter can have data.
    if not features_col:
        return df.clear()

    # Hour of the first row with data, broadcast to every row of the encounter
    # (null when the encounter has no data at all).
    first_hour = (
        pl.when(pl.col("has_data"))
        .then(pl.col("intervalle"))
        .min()
        .over("encounterId")
    )

    cleaned_df = (
        df
        .fill_nan(None)
        # A row has data when at least one feature is non-null
        .with_columns(
            pl.any_horizontal(pl.col(features_col).is_not_null()).alias("has_data")
        )
        .with_columns(first_hour.alias("_first_hour"))
        # A null _first_hour makes the comparison null, which drops the
        # encounters that never have data.
        .filter(pl.col("intervalle") >= pl.col("_first_hour"))
        # Restart the hour counter at zero
        .with_columns(
            (pl.col("intervalle") - pl.col("_first_hour")).alias("intervalle")
        )
        .drop(["has_data", "_first_hour"])
    )
    return cleaned_df

In [None]:
def process_parquet_files(input_folder, static_df, max_hours=168, margin_hours=12):
    """Merge every hourly feature file of a folder into one fixed-length table.

    Parameters
    ----------
    input_folder : str
        Folder containing hourly parquet files keyed by ``encounterId`` and
        ``intervalle``. Parquet files lacking one of these columns are skipped.
    static_df : pl.DataFrame
        Static table providing the ``encounterId`` values to include.
    max_hours : int, default 168
        Number of hours kept per encounter after re-anchoring (7 days).
    margin_hours : int, default 12
        Extra hours loaded beyond ``max_hours`` so that rows removed by the
        re-anchoring in ``reset_first_value`` do not shorten the window.

    Returns
    -------
    pl.DataFrame
        One row per encounter and hour (``intervalle`` < ``max_hours``), one
        column per feature, without the ``pep`` and ``fio2`` columns.
    """
    join_keys = ["encounterId", "intervalle"]

    # Unique encounter ids of the static dataset
    encounter_ids = static_df["encounterId"].unique()

    # Standard grid: every encounter crossed with hours 0 .. max_hours + margin_hours - 1
    intervalle_series = pl.Series("intervalle", range(0, max_hours + margin_hours))
    intervalle_df = pl.DataFrame({"intervalle": intervalle_series})
    standard_intervals = (
        pl.DataFrame({"encounterId": encounter_ids})
        .join(intervalle_df, how="cross")
    )

    # Sanity check on the grid size
    print("Standard intervals shape:", standard_intervals.shape)

    merged_df = standard_intervals

    # Sorted for a reproducible column order
    for file in sorted(os.listdir(input_folder)):
        if not file.endswith(".parquet"):
            continue
        path = os.path.join(input_folder, file)

        # The folder may also hold parquet files that are not hourly feature
        # tables; joining them on the hourly keys would fail.
        if not set(join_keys) <= set(pl.read_parquet_schema(path)):
            continue

        merged_df = merged_df.join(
            pl.read_parquet(path),
            on=join_keys,
            how="left"
        )

    # pep and fio2 are excluded from the one-week table; drop them only if present
    merged_df = merged_df.drop([col for col in ('pep', 'fio2') if col in merged_df.columns])
    reset_intervalle_df = reset_first_value(merged_df)

    return reset_intervalle_df.filter(pl.col('intervalle') < max_hours)


In [None]:
# 168 hourly rows (7 * 24) per unique encounter.
print(f"Expected temporal dataset lenght : {encounters.unique('encounterId').shape[0]*168}")

In [None]:
OUTPUT_FOLDER

In [None]:
# Build the one-week table from the cleaned hourly files; skipped unless the flag is set.
if one_week_temporal :
    temporal_week = process_parquet_files(OUTPUT_FOLDER, encounters)

6813180

In [None]:
temporal_fold = f'{DATA_FOLD}/{VERSION}/3.analysis/times_series/{DATASET}/'
# temporal_week only exists when the one-week build was enabled.
if one_week_temporal :
    temporal_week.write_parquet(temporal_fold + 'one_week.parquet')

In [None]:
# Quick sanity check on one feature; temporal_week only exists when the
# one-week build was enabled.
if one_week_temporal :
    print(temporal_week['fr'].max())

## Cleaning Report

In [None]:
# Folder (inside the raw input folder) used to build the paths of the missing-encounter reports.
missing_encounters_folder = os.path.join(INPUT_FOLDER, 'missing_encounters')

In [None]:

# For every parquet file of the output folder, count the encounters of the
# static table that have no row in that file.
for filename in os.listdir(OUTPUT_FOLDER):
    print(f'--------{filename}----------')
    if not filename.endswith(".parquet"):
        continue

    # Distinct encounter ids present in the feature file
    feature_data = pl.read_parquet(os.path.join(OUTPUT_FOLDER, filename))
    feature_encounters = feature_data.select("encounterId").unique()

    # Anti join: encounters rows without any match in the feature file
    missing_encounters = encounters.join(feature_encounters, on="encounterId", how="anti")

    n_missing = missing_encounters.shape[0]
    n_total = encounters.unique('encounterId').shape[0]
    print(f"Encounters manquants: {n_missing}/{n_total}")

    # Report path; the write itself is currently disabled.
    missing_filename = os.path.join(missing_encounters_folder, filename.replace('cleared', 'missing'))
    #missing_encounters.write_parquet(missing_filename)

# Overall Tabular

## Define features

In [None]:
# Working directories and file names of the overall tabular build.
INPUT_DIR = Path(INPUT_FOLDER)
PARTITION_DIR = Path(OUTPUT_FOLDER) / "tmp_by_patient"
# parents=True: the output folder itself may not exist yet.
PARTITION_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_FILE = "tableau_fusionne.parquet"
SAMPLE_CSV = "echantillon.csv"
LOG_FILE = "log_erreurs.txt"
PIVOT_DIR = Path(OUTPUT_FOLDER) / "tmp_feature_pivot"
PIVOT_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
# Every dynamic feature handled by the overall tabular build.
ALL_FEATURES = [
    "nad_dose_poids",
    "tracheo",
    "ecmo_type",
    "poids_suivi",
    "pad_invasive",
    "endotracheal_tube",
    "plq",
    "pplat",
    "spo2",
    "temp",
    "pam_invasive",
    "fio2",
    "urine_output",
    "tidal_volume",
    "o2_flow",
    "pep",
    "fr",
    "admin_o2",
    "pad_non_invasive",
    "installation",
    "pas_invasive",
    "mode_vent",
    "pas_non_invasive",
    "iv_input",
    "db_sang_cvvhf",
    "heart_rate",
    "pfc",
    "pam_non_invasive",
    "cgr",
    "db_sang_hdi",
]

In [None]:
# Features are split by the aggregation applied to them:
# string features keep a single value, numeric ones the median, cumulative ones the sum.
STR_FEATURES = [
    "tracheo",
    "ecmo_type",
    "endotracheal_tube",
    "admin_o2",
    "installation",
    "mode_vent",
]

NUM_FEATURES = [
    "nad_dose_poids",
    "poids_suivi",
    "pad_invasive",
    "pplat",
    "spo2",
    "temp",
    "pam_invasive",
    "o2_flow",
    "fio2",
    "tidal_volume",
    "pep",
    "fr",
    "pad_non_invasive",
    "pas_invasive",
    "pas_non_invasive",
    "heart_rate",
    "pam_non_invasive",
    "db_sang_cvvhf",
    "db_sang_hdi",
]

SUM_FEATURES = [
    "urine_output",
    "iv_input",
    "plq",
    "pfc",
    "cgr",
]

In [None]:
pl.read_parquet(INPUT_FOLDER + 'mode_vent.parquet')['valueString'].value_counts().sort('count', descending=True).filter(pl.col('count') > 100)['valueString'].to_list()

## Prepare datasets

In [None]:
# Tracheostomy records: collapse the three value columns into one flag stored
# as a string in valueString ("true" when any of them was filled), and blank
# the other two value columns.
trach = (
    pl.read_parquet(INPUT_FOLDER + 'tracheo.parquet')
    .with_columns(
        # All expressions of one with_columns read the input frame, so the
        # flag is computed from the original values.
        pl.any_horizontal([
            pl.col('valueString').is_not_null(),
            pl.col('utcValueDateTime').is_not_null(),
            pl.col('valueNumber').is_not_null(),
        ]).cast(pl.String).alias('valueString'),
        pl.lit(None, dtype=pl.Datetime).alias('utcValueDateTime'),
        pl.lit(None, dtype=pl.Float64).alias('valueNumber'),
    )
    .filter(pl.col('valueString').is_not_null())
)

In [None]:
# Endotracheal tube records: same treatment as the tracheostomy table - one
# string flag in valueString ("true" when any value column was filled), the
# other two value columns blanked.
iot = pl.read_parquet(INPUT_FOLDER + 'endotracheal_tube.parquet')
print(iot.columns)
iot = (
    iot
    .with_columns(
        # All expressions of one with_columns read the input frame, so the
        # flag is computed from the original values.
        pl.any_horizontal([
            pl.col('valueString').is_not_null(),
            pl.col('utcValueDateTime').is_not_null(),
            pl.col('valueNumber').is_not_null(),
        ]).cast(pl.String).alias('valueString'),
        pl.lit(None, dtype=pl.Datetime).alias('utcValueDateTime'),
        pl.lit(None, dtype=pl.Float64).alias('valueNumber'),
    )
    .filter(pl.col('valueString').is_not_null())
)

In [None]:
# Patient positioning: keep only the rows whose label contains 'Ventral',
# normalised to the single value 'Ventral'.
ventral_only = (
    pl.when(pl.col('valueString').str.contains('Ventral', literal=True))
    .then(pl.lit('Ventral'))
    .otherwise(pl.lit(None, dtype=pl.String))
)

install = (
    pl.read_parquet(INPUT_FOLDER + 'installation.parquet')
    .with_columns(
        ventral_only.alias('valueString'),
        pl.lit(None, dtype=pl.Float64).alias('valueNumber'),
    )
    .filter(pl.col('valueString').is_not_null())
)

install['valueString'].value_counts()

In [None]:
def _load_blood_product(filename, keyword):
    """Load a blood-product table and turn each matching row into a count of 1.

    Rows whose ``valueString`` contains ``keyword`` (literal match) get
    ``valueNumber`` = 1.0; all other rows are removed. ``valueString`` is then
    blanked, keeping a string dtype so the column type matches the other
    feature tables.
    """
    return (
        pl.read_parquet(INPUT_FOLDER + filename)
        .with_columns(
            pl.when(pl.col('valueString').str.contains(keyword, literal=True))
            .then(pl.lit(1.0))
            .otherwise(pl.lit(None, dtype=pl.Float64))
            .alias('valueNumber'),
            pl.lit(None, dtype=pl.String).alias('valueString'),
        )
        .filter(pl.col('valueNumber').is_not_null())
    )


# One event per row; these features are summed per hour later on.
cgr = _load_blood_product('cgr.parquet', 'CGR')

plq = _load_blood_product('plq.parquet', 'PLAQUETTE')

pfc = _load_blood_product('pfc.parquet', 'PFC')

## Load Datasets

In [None]:
# Load every raw feature table, restricted to the known encounters and reduced
# to the columns needed for the pivot.
feature_files = sorted(INPUT_DIR.glob("*.parquet"))
col_order = ['encounterId', 'ptCensusId', 'feature', 'valueString', 'valueNumber', 'utcValueDateTime', 'utcChartTime', 'utcInTime', 'delta_inTime_hours']

# Features already prepared in the previous cells replace their raw file.
prepared_frames = {
    "endotracheal_tube.parquet": iot,
    "tracheo.parquet": trach,
    "installation.parquet": install,
    "cgr.parquet": cgr,
    "pfc.parquet": pfc,
    "plq.parquet": plq,
}
# Files for which valueString / utcValueDateTime are (re)created as null
# columns before enforcing col_order.
numeric_only_files = ("urine_output.parquet", "iv_input.parquet")

feature_dfs = {}
log_lines = []

for file in tqdm(feature_files, desc="Préchargement des features"):
    try:
        df = prepared_frames.get(file.name)
        if df is None:
            df = pl.read_parquet(file)
        df = df.filter(pl.col("encounterId").is_in(encounters_list))

        if file.name in numeric_only_files:
            df = df.with_columns([
                pl.lit(None, dtype=pl.Utf8).alias("valueString"),
                pl.lit(None, dtype=pl.Datetime).alias("utcValueDateTime")
            ]).select(col_order)

        feature_dfs[file.name] = df.drop('utcValueDateTime', 'ptCensusId', 'utcInTime', 'delta_inTime_hours')
    except Exception as e:
        # Best effort: one broken file must not stop the preload, but the
        # failure is reported once the loop is done.
        log_lines.append(f"Erreur dans le fichier {file.name}: {str(e)}")

# Surface the collected errors instead of leaving them silently in log_lines.
if log_lines:
    print("\n".join(log_lines))

## Pivot Datasets

In [None]:


def _pivot_feature(df, value_col, aggregate):
    """Pivot one long feature table to a column per ``feature`` value.

    The table is indexed by (encounterId, utcChartTime); duplicated index
    entries are reduced with ``aggregate`` applied to ``value_col``.
    """
    pivot_index = ['encounterId', 'utcChartTime']
    return (
        df
        .select(pivot_index + ['feature', value_col])
        .sort(pivot_index)
        .pivot(
            index=pivot_index,
            on='feature',
            values=value_col,
            aggregate_function=aggregate
        )
        .cast({'encounterId': pl.Int32})
    )


# feature -> (log label, value column, aggregation). Filled in reverse
# priority so that, should a name appear in several lists, the string rule
# wins over the numeric one, which wins over the sum one.
pivot_rules = {}
for name in SUM_FEATURES:
    pivot_rules[name] = ('sum', 'valueNumber', 'sum')
for name in NUM_FEATURES:
    pivot_rules[name] = ('num', 'valueNumber', 'median')
for name in STR_FEATURES:
    pivot_rules[name] = ('str', 'valueString', 'first')

pivoted_df = {}
for feature_name, df in feature_dfs.items():
    feat = feature_name.replace('.parquet', '')
    rule = pivot_rules.get(feat)
    if rule is None:
        # Without a rule there is nothing valid to write for this feature;
        # writing the previous iteration's pivot under its name would be wrong.
        print(f'skipped (no aggregation rule) : {feat}')
        continue

    label, value_col, aggregate = rule
    print(f'{label} : {feat}')
    df_pivot = _pivot_feature(df, value_col, aggregate)

    df_pivot.write_parquet(PIVOT_DIR / f'{feat}.parquet')
    pivoted_df[feat] = df_pivot

## Join datasets

In [None]:
# Full outer join of all pivoted features on encounterId and utcChartTime

# Files already merged, kept so progress can be inspected if the procedure fails midway
done_files = []

# heart_rate is the starting table; every other pivoted feature is joined onto it.
base_name = 'heart_rate.parquet'
joined_df = pl.read_parquet(PIVOT_DIR / base_name)

# Sorted for a reproducible join (and column) order
pivot_files = sorted(PIVOT_DIR.glob("*.parquet"))

for file in tqdm(pivot_files, desc="Scan des features"):
    print(f'Début : {file.name}')
    if file.name != base_name and file not in done_files:
        df = pl.read_parquet(file)
        joined_df = joined_df.join(
            df,
            on=['encounterId', 'utcChartTime'],
            how='full',
            coalesce=True
        )
        print('jointure ok')
    # Checkpoint of the current state after each file
    joined_df.write_parquet(OUTPUT_FILE)
    done_files.append(file)
    print('fichier terminé')

joined_df = joined_df.sort('encounterId', 'utcChartTime')

    

In [None]:
# Final copy of the merged table, stored in the output folder.
joined_df.write_parquet(OUTPUT_FOLDER + OUTPUT_FILE)

## Group by 1 hour dataset

In [None]:
# First chart time of each encounter: origin of the hourly grid
min_chart = (
    joined_df
    .select('encounterId', 'utcChartTime')
    .group_by('encounterId')
    .agg(pl.col('utcChartTime').min().name.suffix('_min'))
)

joined_delta = (
    joined_df
        .join(min_chart, on='encounterId', how='left')
        # Whole hours elapsed since the encounter's first chart time
        .with_columns(
            ((pl.col("utcChartTime") - pl.col("utcChartTime_min")).dt.total_minutes() / 60).cast(pl.Int32).alias("delta_hour")
        )
        .group_by(['encounterId', 'delta_hour'])
        .agg(
            [
                # The merged table comes from a full outer join, so most rows
                # are null for any given feature: take the first non-null
                # value of the hour, not the value of the first row.
                pl.col(STR_FEATURES).drop_nulls().first(),
                pl.col(NUM_FEATURES).median(),
                pl.col(SUM_FEATURES).sum()
            ]
        )
        .select(['encounterId', 'delta_hour'] + STR_FEATURES + NUM_FEATURES + SUM_FEATURES)
        .sort('encounterId', 'delta_hour')
)



In [None]:
# Preview of the hourly table. The original cell called pl.col() without a
# column name, which raises; add a predicate here when a filter is needed.
joined_delta.head()

In [None]:
# Persist the hourly table, ordered by encounter then hour.
joined_delta.sort('encounterId', 'delta_hour').write_parquet(OUTPUT_FOLDER + 'all_features_with_delta.parquet')

## Bonus : visualisation

In [None]:
# Reload the hourly table and list the encounters having at least one row after hour 24.
df = pl.read_parquet(OUTPUT_FOLDER + 'all_features_with_delta.parquet')
df_encounters = df.filter(pl.col('delta_hour') > 24 )['encounterId'].unique().to_list()

In [None]:
import random

# Pick one of those encounters at random (unseeded, so it changes on every run)
# and keep only its rows.
rand_encounter = random.choice(df_encounters)
df_encounter = df.filter(pl.col('encounterId') == rand_encounter)

In [None]:
# Display only: summed features that are not strictly positive are shown as
# null, then rows where every feature is null are hidden.
df_encounter.with_columns(
            pl.when(pl.col(SUM_FEATURES) > 0).then(pl.col(SUM_FEATURES)).otherwise(pl.lit(None, dtype=pl.Float64))
        ).filter(~pl.all_horizontal(pl.col(ALL_FEATURES).is_null()))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns


# Numeric features of the selected encounter, one stacked panel per feature.
variables = NUM_FEATURES
df_encounter = df_encounter.select(["delta_hour"] + variables).to_pandas()

n_panels = len(variables)
fig, axes = plt.subplots(n_panels, 1, figsize=(10, 5 * n_panels), sharex=True)

for ax, var in zip(axes, variables):
    # Panels of features without any positive total are left empty.
    if not df_encounter[var].sum() > 0:
        continue
    sns.lineplot(data=df_encounter, x="delta_hour", y=var, ax=ax)
    ax.set_title(f"Évolution de {var} en fonction de delta_hour")
    ax.set_ylabel(var)

# Shared x-axis label
plt.xlabel("delta_hour")
plt.tight_layout()
plt.show()