# eICU Preprocess (standardized)

## 0) Setup

### 0.1 Libraries

In [None]:
import os
import gc
import json
from pathlib import Path
import duckdb
import pandas as pd
import numpy as np
from tqdm import tqdm
import time
from collections import Counter
import tables

### 0.2 Paths & environment

In [None]:

# Input location (raw eICU CSV files) and output root for everything this notebook writes.
BASE_PATH = '/scratch/leticia.ribeiro/dataset/eICU 2.0/'
OUTPUT_DIR = 'outputs/eicu'

# Raw CSV sources, all resolved against BASE_PATH.
_source_root = Path(BASE_PATH)
patient_path, vital_path, lab_path, customlab_path, inf_path = (
    _source_root / _csv_name
    for _csv_name in (
        'patient.csv',
        'vitalPeriodic.csv',
        'lab.csv',
        'customLab.csv',
        'infusionDrug.csv',
    )
)

# Directory holding the Parquet copies of the raw tables; created on first run.
out = Path(OUTPUT_DIR).joinpath('preprocess', 'raw_parquet')
out.mkdir(parents=True, exist_ok=True)

# One Parquet target per raw table.
parquet_vital, parquet_lab, parquet_customlab, parquet_inf, parquet_patient = (
    out / _target_name
    for _target_name in (
        'parquet_vital',
        'parquet_lab',
        'parquet_customlab',
        'parquet_inf',
        'parquet_patient',
    )
)

print('BASE_PATH:', BASE_PATH)
print('OUTPUT_DIR:', out.resolve())

### 0.3 Defaults / mappings

In [None]:

# Vital-sign columns used downstream, in output order.
vital_features = [
    'sao2',
    'heartrate',
    'respiration',
    'systemicsystolic',
    'systemicdiastolic',
    'systemicmean',
]

# Labs of interest and the value used when a stay has no measurement (adjust if needed).
_LAB_DEFAULT_VALUES = {
    'albumin': 4.4,
    'creatinine': 1.0,
    'BUN': 13.0,
    'sodium': 140.0,
    'potassium': 4.5,
    'bicarbonate': 26.0,
    'calcium': 9.4,
    'magnesium': 1.95,
    'phosphate': 3.65,
    'glucose': 90.0,
    'lactate': 1.0,
    'total protein': 7.3,
    'Hgb': 14.0,
    'Hct': 43.0,
    'platelets x 1000': 250.0,
    'MPV': 9.5,
    'AST (SGOT)': 20.0,
    'ALT (SGPT)': 20.0,
    'total bilirubin': 0.6,
    'direct bilirubin': 0.2,
    'PT': 12.0,
    'PTT': 32.0,
    'fibrinogen': 300.0,
    'CPK': 100.0,
    'CPK-MB': 2.5,
    'CPK-MB INDEX': 2.0,
    'Methemoglobin': 1.0,
    'ionized calcium': 5.0,
    'urinary creatinine': 100.0,
    'Ferritin': 100.0,
    'Carboxyhemoglobin': 1.0,
    'troponin': 0.01,
}

# Same information in the nested {'default': value} layout the rest of the notebook reads.
PARAMETROS_IMPUTACAO = {
    lab_name: {'default': default_value}
    for lab_name, default_value in _LAB_DEFAULT_VALUES.items()
}

lab_features = list(PARAMETROS_IMPUTACAO)
lab_features_lower = [lab_name.lower() for lab_name in lab_features]

# Fallback values for the vital signs.
NORMAL_VALUES = dict(
    sao2=97.0,
    heartrate=85.0,
    respiration=18.0,
    systemicsystolic=120.0,
    systemicdiastolic=70.0,
    systemicmean=85.0,
)

# Lookup tables keyed by the exact column names used after preprocessing
# (vitals as-is, labs lower-cased).
impute_defaults_vital = {
    vital_name: NORMAL_VALUES.get(vital_name, np.nan) for vital_name in vital_features
}
impute_defaults_lab = {
    lab_name.lower(): spec['default'] for lab_name, spec in PARAMETROS_IMPUTACAO.items()
}

print('Vitals:', vital_features)
print('Labs:', lab_features_lower)

In [None]:
# Make sure every requested lab has an entry in `PARAMETROS_IMPUTACAO`, then rebuild
# the derived lab lists (`lab_features`, `lab_features_lower`, `impute_defaults_lab`).
requested_labs = [
    'heartrate',
    'systemicsystolic',
    'systemicdiastolic',
    'systemicmean',
    'respiration',
    'sao2',
    'albumin',
    'creatinine',
    'BUN',
    'sodium',
    'potassium',
    'bicarbonate',
    'calcium',
    'magnesium',
    'phosphate',
    'glucose',
    'lactate',
    'total protein',
    'Hgb',
    'Hct',
    'platelets x 1000',
    'MPV',
    'AST (SGOT)',
    'ALT (SGPT)',
    'total bilirubin',
    'direct bilirubin',
    'PT',
    'PTT',
    'fibrinogen',
    'CPK',
    'CPK-MB',
    'CPK-MB INDEX',
    'Methemoglobin',
    'ionized calcium',
    'urinary creatinine',
    'Ferritin',
    'Carboxyhemoglobin',
    'troponin',
]

# The request list also names the vitals (already covered by `vital_features`);
# keep only the lab entries.
_vital_names = set(vital_features)
requested_lab_only = [x for x in requested_labs if x not in _vital_names]

# Register labs that have no configured default yet (default = NaN) and remember
# which ones were actually added so the summary below reports only those.
added_labs = []
for lab in requested_lab_only:
    if lab not in PARAMETROS_IMPUTACAO:
        PARAMETROS_IMPUTACAO[lab] = {'default': np.nan}
        added_labs.append(lab)

lab_features = list(PARAMETROS_IMPUTACAO.keys())
lab_features_lower = [l.lower() for l in lab_features]
impute_defaults_lab = {k.lower(): v.get('default', np.nan) for k, v in PARAMETROS_IMPUTACAO.items()}

print('Updated lab_features length:', len(lab_features))
# Previously this printed every requested lab (the membership test was always truthy).
print('Sample labs added (if any):', added_labs)

## 1) Preprocess pipeline

# Preprocess eICU (DuckDB)

Este notebook gera os parquets base, cria batches de features e gera labels de falência.
Execute as células em ordem.

## 0) Setup


### 0.1 Libraries


In [None]:

# Core
import os
import gc
import json
from pathlib import Path

import duckdb
import pandas as pd
import numpy as np
from tqdm import tqdm
import time

### 0.2 Paths


### 0.3 Defaults / lab mapping


In [None]:

# Lab-name utilities: alias table, exact lookup and a tolerant name normaliser.
import re

# Canonical lab name -> accepted spellings (all lower-case).
LAB_ALIASES = {
    'albumin': ['albumin', 'pre-albumin', 'prealbumin'],
    'alt (sgpt)': ['alt (sgpt)', 'alanine aminotransferase', 'alt'],
    'ast (sgot)': ['ast (sgot)', 'aspartate aminotransferase', 'ast'],
    'bicarbonate': ['bicarbonate', 'co2', 'total co2', 'tco2', 'hco3'],
    'bun': ['bun', 'urea nitrogen', 'blood urea nitrogen'],
    'calcium': ['calcium', 'total calcium'],
    'carboxyhemoglobin': ['carboxyhemoglobin', 'carboxy hb', 'cohb'],
    'cpk': ['cpk', 'creatine kinase', 'ck'],
    'cpk-mb': ['cpk-mb', 'ck-mb', 'cpk mb', 'ck mb'],
    'cpk-mb index': ['cpk-mb index', 'ck-mb index', 'cpk mb index', 'ck mb index'],
    'creatinine': ['creatinine', 'creatinine w gfr', 'creatinine w est gfr', 'creatinine w/ est gfr', 'creatinine w/ gfr'],
    'direct bilirubin': ['direct bilirubin'],
    'ferritin': ['ferritin'],
    'fibrinogen': ['fibrinogen'],
    'glucose': ['glucose', 'blood glucose', 'bedside glucose'],
    'hct': ['hct', 'hematocrit'],
    'hgb': ['hgb', 'hemoglobin'],
    'ionized calcium': ['ionized calcium', 'free calcium', 'ica', 'i-ca'],
    'lactate': ['lactate', 'lactic acid'],
    'magnesium': ['magnesium'],
    'methemoglobin': ['methemoglobin', 'met-hb', 'met hb'],
    'mpv': ['mpv', 'mean platelet volume'],
    'phosphate': ['phosphate', 'phosphorus'],
    'alkaline phosphatase': ['alkaline phosphatase', 'alkaline phos', 'alkaline phos.', 'alk phos', 'alk phos.'],
    'platelets x 1000': ['platelets x 1000', 'platelets', 'platelet'],
    'potassium': ['potassium', 'k'],
    'pt': ['pt', 'prothrombin time'],
    'ptt': ['ptt', 'ptt ratio', 'partial thromboplastin time', 'aptt'],
    'sodium': ['sodium', 'na'],
    'total bilirubin': ['total bilirubin', 'bilirubin'],
    'total protein': ['total protein'],
    'urinary creatinine': ['urinary creatinine', 'urine creatinine'],
    'troponin': ['troponin', 'troponin i', 'troponin - i', 'trop i', 'troponin t', 'troponin - t', 'trop t'],
}

# Fast exact lookup (alias -> canonical lab name).
alias_lookup = {}
for lab_canonical, variants in LAB_ALIASES.items():
    for variant in variants:
        alias_lookup[variant] = lab_canonical

# Patterns for the partial-name fallback. Each alias must appear as a whole token
# (not glued to other letters/digits), and longer aliases are tried first so the
# most specific one wins, e.g. 'ionized calcium' before 'calcium'.
_ALIAS_PATTERNS = [
    (re.compile(r"(?<![a-z0-9])" + re.escape(_alias) + r"(?![a-z0-9])"), _canonical)
    for _alias, _canonical in sorted(
        alias_lookup.items(), key=lambda item: len(item[0]), reverse=True
    )
]


def normalize_labname(x):
    """Map a raw lab name to its canonical lower-case name, or None if unknown.

    Resolution order:
      1. missing value -> None;
      2. the hard-coded 'phos' / alkaline-phosphatase special cases;
      3. exact alias match (after strip + lower-case);
      4. partial match: the longest alias that occurs in the name as a whole token.

    The partial match used to be a raw substring test, which let short aliases
    hit unrelated names ('pt' inside 'haptoglobin', 'co2' inside 'paco2',
    'hemoglobin' inside 'methemoglobin %') and let a generic alias win over a
    specific one depending on dictionary order.
    """
    if pd.isna(x):
        return None
    s = str(x).strip().lower()
    if s == 'phos':
        return 'phosphate'
    if 'alk' in s and 'phos' in s:
        return 'alkaline phosphatase'
    if s in alias_lookup:
        return alias_lookup[s]
    # Partial names: token-bounded, most specific alias first.
    for pattern, canonical in _ALIAS_PATTERNS:
        if pattern.search(s):
            return canonical
    return None

In [None]:
# CSV -> Parquet export via DuckDB. Existing Parquet targets are skipped on re-run.
con = duckdb.connect()

# Fail early, naming the first raw CSV that is not on disk.
_first_missing = next(
    (
        _src
        for _src in (patient_path, vital_path, lab_path, customlab_path, inf_path)
        if not Path(_src).exists()
    ),
    None,
)
if _first_missing is not None:
    raise FileNotFoundError(f"Missing source file: {_first_missing}")


def export_if_missing(path_parquet, sql):
    """Materialise `sql` as a Parquet file at `path_parquet` unless it already exists.

    The query result is first written to a sibling ``<name>.tmp`` file and only
    renamed to the final name once DuckDB's COPY has finished. Without this, an
    interrupted export would leave a partial file that every later run skips
    as "already done".

    Parameters:
        path_parquet: destination path (str or Path).
        sql: SELECT statement (no trailing semicolon) to export.
    """
    target = Path(path_parquet)
    if target.exists():
        print('OK (skip):', path_parquet)
        return
    print('Exportando:', path_parquet)

    tmp_target = target.with_name(target.name + '.tmp')
    # Single quotes must be doubled inside a SQL string literal.
    tmp_literal = str(tmp_target).replace("'", "''")
    try:
        con.execute(f"COPY ({sql}) TO '{tmp_literal}' (FORMAT PARQUET);")
        tmp_target.replace(target)
    finally:
        # Only present here when the COPY or the rename failed.
        if tmp_target.exists():
            tmp_target.unlink()
    print('Done:', path_parquet)

# (target, query) pairs for every raw table, exported in this order.
_export_jobs = [
    # vitalPeriodic
    (parquet_vital, f"""
    SELECT patientunitstayid, observationoffset, temperature, sao2, heartrate,
           respiration, systemicsystolic, systemicdiastolic, systemicmean
    FROM read_csv_auto('{vital_path}')
    """),
    # lab.csv
    (parquet_lab, f"""
    SELECT patientunitstayid, labresultoffset, labname, labresult
    FROM read_csv_auto(
        '{lab_path}',
        sample_size=10000,
        ignore_errors=true,
        quote='"',
        escape='"'
    )
    """),
    # customLab.csv, renamed to the same column names as lab.csv
    (parquet_customlab, f"""
    SELECT patientunitstayid,
           labotheroffset AS labresultoffset,
           labothername   AS labname,
           labotherresult AS labresult
    FROM read_csv_auto(
        '{customlab_path}',
        sample_size=10000,
        ignore_errors=true,
        quote='"',
        escape='"'
    )
    """),
    # infusionDrug.csv
    (parquet_inf, f"""
    SELECT patientunitstayid, infusionoffset, drugname
    FROM read_csv_auto('{inf_path}')
    """),
    # patient.csv
    (parquet_patient, f"""
    SELECT patientunitstayid, age, unitdischargeoffset, unitdischargestatus, hospitaldischargestatus
    FROM read_csv_auto('{patient_path}')
    """),
]

for _parquet_target, _select_sql in _export_jobs:
    export_if_missing(_parquet_target, _select_sql)

In [None]:

# Load the stays kept for processing: age >= 18 and unit discharge offset >= 60.
_patient_sql = (
    f"SELECT patientunitstayid, age, unitdischargeoffset, unitdischargestatus, hospitaldischargestatus FROM read_parquet('{parquet_patient.as_posix()}')"
)
patient = con.execute(_patient_sql).fetch_df()

# Age may arrive as text (e.g. '> 89'); rewrite that marker to '90' before parsing.
_age_text = patient['age'].astype(str).str.replace('> 89', '90')
patient['age'] = pd.to_numeric(_age_text, errors='coerce')
patient['unitdischargeoffset'] = pd.to_numeric(patient['unitdischargeoffset'], errors='coerce')

# Rows with unparseable age/offset are NaN and therefore fail both comparisons.
adult = patient['age'].ge(18)
one_hour = patient['unitdischargeoffset'].ge(60)

valid = patient[adult & one_hour].copy()
_valid_ids = valid['patientunitstayid'].dropna().astype(int)
patient_ids = _valid_ids.unique().tolist()

print('Total stays (all):', patient['patientunitstayid'].nunique())
print('Total stays >=1h and age>=18:', len(patient_ids))

In [None]:

# Time grid e imputacao

def generate_timegrid_eicu(df_vitals, df_labs, step_min=5):
    """Return the regular offset grid covering all vital and lab observations.

    The grid starts at the earliest observed offset rounded down to a multiple
    of `step_min` and ends at the latest observed offset rounded up to a
    multiple of `step_min` (both ends included).

    Parameters:
        df_vitals: frame with an 'observationoffset' column (may be empty).
        df_labs: frame with a 'labresultoffset' column (may be empty).
        step_min: grid spacing; must be positive.

    Returns:
        1-D integer array of offsets; empty when neither frame has a usable
        (numeric, non-missing) offset.

    Raises:
        ValueError: if `step_min` is not positive.
    """
    if step_min <= 0:
        raise ValueError(f"step_min must be positive, got {step_min}")

    ranges = []
    for frame, offset_col in ((df_vitals, 'observationoffset'), (df_labs, 'labresultoffset')):
        if frame.empty:
            continue
        # Missing / non-numeric offsets would otherwise turn the min/max into NaN
        # and make the int() conversion below fail.
        offsets = pd.to_numeric(frame[offset_col], errors='coerce').dropna()
        if not offsets.empty:
            ranges.append((offsets.min(), offsets.max()))

    if not ranges:
        return np.array([], dtype=int)

    t0_raw = min(low for low, _ in ranges)
    tmax_raw = max(high for _, high in ranges)

    t0 = int(np.floor(t0_raw / step_min) * step_min)
    tmax = int(np.ceil(tmax_raw / step_min) * step_min)

    return np.arange(t0, tmax + 1, step_min, dtype=int)


def impute_forward_fill_eicu(obs_offsets, obs_values, time_grid, global_fill=np.nan):
    """Carry the last non-missing observation forward onto `time_grid`.

    For each grid time, the result holds the most recent non-missing value whose
    offset is <= that grid time; grid points with no such value get `global_fill`.
    Observations are consumed in the order given, so `obs_offsets` is expected
    to be ascending.

    Returns a float32 array with one entry per grid point.
    """
    filled = np.full(len(time_grid), global_fill, dtype=np.float32)
    n_obs = len(obs_offsets)

    cursor = 0          # next observation not yet consumed
    carried = np.nan    # last non-missing value seen so far

    for slot, grid_time in enumerate(time_grid):
        # Consume every observation that happened up to this grid time.
        while cursor < n_obs and obs_offsets[cursor] <= grid_time:
            candidate = obs_values[cursor]
            cursor += 1
            if pd.notna(candidate):
                carried = candidate

        if np.isnan(carried):
            filled[slot] = global_fill
        else:
            filled[slot] = carried

    return filled


def impute_batch_eicu(df, variables, impute_defaults, time_grid, offset_col, stay_id_if_empty=None):
    """Resample each stay in `df` onto `time_grid`, forward-filling every variable.

    Output columns: 'patientunitstayid', 'offset_min', then for each variable the
    filled values plus a '<var>_imputed' flag (0 at grid points that received a
    real observation, 1 elsewhere). A variable with no usable data for a stay is
    filled entirely with its default from `impute_defaults` (NaN if absent) and
    flagged as imputed everywhere.

    When `df` is empty, an all-default frame is produced for `stay_id_if_empty`,
    or an empty DataFrame if that id is None.
    """
    n_steps = len(time_grid)

    def _start_columns(stay):
        # Identifier + time axis shared by every output frame.
        return {
            'patientunitstayid': [stay] * n_steps,
            'offset_min': time_grid
        }

    def _fill_with_default(columns, name):
        # Constant default value, flagged as imputed at every grid point.
        columns[name] = np.full(n_steps, impute_defaults.get(name, np.nan), dtype=np.float32)
        columns[f'{name}_imputed'] = np.ones(n_steps, dtype=int)

    if df.empty:
        if stay_id_if_empty is None:
            return pd.DataFrame()
        columns = _start_columns(stay_id_if_empty)
        for name in variables:
            _fill_with_default(columns, name)
        return pd.DataFrame(columns)

    frames = []
    for stay, rows in df.groupby('patientunitstayid'):
        rows = rows.sort_values(offset_col).reset_index(drop=True)
        columns = _start_columns(stay)

        for name in variables:
            has_data = name in rows.columns and not rows[name].dropna().empty
            if not has_data:
                _fill_with_default(columns, name)
                continue

            values = pd.to_numeric(rows[name], errors='coerce').astype(float).to_numpy()
            offsets = pd.to_numeric(rows[offset_col], errors='coerce').to_numpy()

            columns[name] = impute_forward_fill_eicu(
                offsets, values, time_grid, impute_defaults.get(name, np.nan)
            )

            # A real observation marks the first grid point at or after its offset.
            observed = np.zeros(n_steps, dtype=int)
            hits = np.searchsorted(time_grid, offsets[~pd.isna(values)], side="left")
            hits = hits[(hits >= 0) & (hits < n_steps)]
            observed[hits] = 1
            columns[f'{name}_imputed'] = 1 - observed

        frames.append(pd.DataFrame(columns))

    return pd.concat(frames, ignore_index=True)

In [None]:
# Batch processing (DuckDB): vasopressor flag, 'falencia' label, mortality and
# separately stored engineered features.

# Drug-name fragments treated as vasoactive infusions (matched as a regex alternation).
VASO_DRUGS = [
    'dopamine',
    'epinephrine',
    'adrenalin',
    'norepinephrine',
    'norepi',
    'levophed',
    'phenylephrine',
    'neosynephrine',
    'vasopressin',
    'dobutamine',
    'milrinone',
    'isoproterenol',
    'isuprel',
]

vaso_regex = '|'.join(VASO_DRUGS)

# Mortality per stay: 1 when either the unit or the hospital discharge status is
# 'expired' (case-insensitive), otherwise 0.
_died_in_unit = patient['unitdischargestatus'].astype(str).str.lower() == 'expired'
_died_in_hospital = patient['hospitaldischargestatus'].astype(str).str.lower() == 'expired'
_with_mortality = patient.assign(mortality=(_died_in_unit | _died_in_hospital).astype(int))
mortality_map = _with_mortality.set_index('patientunitstayid')['mortality'].to_dict()


def build_vaso_start(batch_ids):
    """Return {patientunitstayid: earliest infusion offset of a vasoactive drug}.

    Only stays in `batch_ids` are queried. Stays without a matching infusion
    (drug name matching `vaso_regex`) or without a numeric offset are absent
    from the result.
    """
    id_list = ','.join(map(str, batch_ids))
    query = (
        f"SELECT patientunitstayid, infusionoffset, lower(drugname) AS drugname "
        f"FROM read_parquet('{parquet_inf.as_posix()}') "
        f"WHERE patientunitstayid IN ({id_list})"
    )
    infusions = con.execute(query).fetch_df()
    if infusions.empty:
        return {}

    # Keep only vasoactive drugs; missing drug names never match.
    is_vaso = infusions['drugname'].str.contains(vaso_regex, regex=True, na=False)
    infusions = infusions[is_vaso].copy()
    infusions['infusionoffset'] = pd.to_numeric(infusions['infusionoffset'], errors='coerce')
    infusions = infusions.dropna(subset=['patientunitstayid', 'infusionoffset'])
    if infusions.empty:
        return {}

    first_offset = infusions.groupby('patientunitstayid')['infusionoffset'].min()
    return first_offset.to_dict()


def engineered_features(df, var, imputed_col):
    """Summarise one variable of a gridded stay as a 7-tuple.

    Returns (n_meas, min, max, mean, instab, intens, cumul) where
      - n_meas counts rows whose `imputed_col` flag is 0,
      - instab is the sum of absolute differences between consecutive rows,
      - intens is the mean absolute value,
      - cumul is the plain sum.
    All statistics except n_meas are computed over every row of `df[var]`,
    imputed rows included.
    """
    values = df[var].astype(float)
    observed_flags = 1 - df[imputed_col].astype(int)
    step_changes = values.diff().abs()

    return (
        int(observed_flags.sum()),
        float(values.min()),
        float(values.max()),
        float(values.mean()),
        float(step_changes.sum()),
        float(values.abs().mean()),
        float(values.sum()),
    )


def process_eicu_batch(batch_ids, step_min=5):
    """Build gridded time series and per-stay summary features for a batch of stays.

    For every stay in `batch_ids` that has at least one vital or lab row, the
    vitals and (alias-normalised) labs are resampled onto a shared `step_min`
    grid, then annotated with:
      - vasopressor_active: 1 from the first matching infusion offset onwards;
      - mortality: value from `mortality_map` (0 if the stay is not in the map);
      - falencia_point: systemicmean < 65, or vasopressor_active with lactate >= 2;
      - falencia: 1 when at least ceil(2/3) of the grid points in a full
        60-minute trailing window have falencia_point set.

    Parameters:
        batch_ids: sequence of patientunitstayid values.
        step_min: grid spacing passed to the time-grid builder.

    Returns:
        (result, features_df, engineered_report) where `result` is the
        concatenated gridded data, `features_df` has one row of engineered
        features per stay, and `engineered_report` is a list of per-stay dicts
        (included/excluded variables and row count). Both frames are empty when
        no stay produced data.
    """
    if len(batch_ids) == 0:
        # An empty id list would generate the invalid SQL clause `IN ()`.
        return pd.DataFrame(), pd.DataFrame(), []

    ids_csv = ','.join(str(i) for i in batch_ids)

    vit = con.execute(
        f"SELECT patientunitstayid, observationoffset, {', '.join(vital_features)} "
        f"FROM read_parquet('{parquet_vital.as_posix()}') "
        f"WHERE patientunitstayid IN ({ids_csv})"
    ).fetch_df()

    lab_raw = con.execute(
        f"SELECT patientunitstayid, labresultoffset, lower(trim(labname)) AS labname, labresult "
        f"FROM read_parquet('{parquet_lab.as_posix()}') "
        f"WHERE patientunitstayid IN ({ids_csv})"
    ).fetch_df()

    custom_raw = con.execute(
        f"SELECT patientunitstayid, labresultoffset, lower(trim(labname)) AS labname, labresult "
        f"FROM read_parquet('{parquet_customlab.as_posix()}') "
        f"WHERE patientunitstayid IN ({ids_csv})"
    ).fetch_df()

    lab = pd.concat([lab_raw, custom_raw], ignore_index=True)
    if not lab.empty:
        lab['labname'] = lab['labname'].map(normalize_labname)
        lab = lab[lab['labname'].isin(lab_features_lower)]
        # Pivot lab data from long format (labname, labresult) to wide format (one column per lab)
        if not lab.empty:
            lab = lab.pivot_table(
                index=['patientunitstayid', 'labresultoffset'],
                columns='labname',
                values='labresult',
                aggfunc='first'
            ).reset_index()
            lab.columns.name = None

    vaso_start = build_vaso_start(batch_ids)

    # Row positions of each stay, computed once. Filtering the whole batch frame
    # with a boolean mask for every stay costs O(stays * rows).
    vit_rows = vit.groupby('patientunitstayid').indices
    lab_rows = lab.groupby('patientunitstayid').indices
    no_rows = np.array([], dtype=np.intp)

    merged_data = []
    engineered_rows = []
    engineered_report = []
    for sid in batch_ids:
        df_v = vit.iloc[vit_rows.get(sid, no_rows)]
        df_l = lab.iloc[lab_rows.get(sid, no_rows)]
        if df_v.empty and df_l.empty:
            continue

        time_grid = generate_timegrid_eicu(df_v, df_l, step_min)
        if len(time_grid) == 0:
            continue

        df_v_filled = impute_batch_eicu(
            df=df_v,
            variables=vital_features,
            impute_defaults=impute_defaults_vital,
            time_grid=time_grid,
            offset_col='observationoffset',
            stay_id_if_empty=sid
        )

        df_l_filled = impute_batch_eicu(
            df=df_l,
            variables=lab_features_lower,
            impute_defaults=impute_defaults_lab,
            time_grid=time_grid,
            offset_col='labresultoffset',
            stay_id_if_empty=sid
        )

        df_merged = pd.merge(df_v_filled, df_l_filled, on=['patientunitstayid','offset_min'], how='outer')

        # vasopressor_active (from first infusion onwards)
        start = vaso_start.get(sid, None)
        if start is None:
            df_merged['vasopressor_active'] = 0
        else:
            df_merged['vasopressor_active'] = (df_merged['offset_min'] >= start).astype(int)

        # mortality as stored in mortality_map (unit OR hospital discharge status 'expired')
        df_merged['mortality'] = int(mortality_map.get(sid, 0))

        # falencia pointwise and windowed (2/3 in 60min)
        cond = (df_merged['systemicmean'] < 65)
        if 'lactate' in df_merged.columns:
            cond = cond | ((df_merged['vasopressor_active'] == 1) & (df_merged['lactate'] >= 2))
        df_merged['falencia_point'] = cond.astype(int)

        window = int(60/step_min)
        threshold = int(np.ceil(window * (2/3)))
        df_merged = df_merged.sort_values('offset_min')
        # Incomplete leading windows yield NaN sums; `ge` turns those into False,
        # so the label is 0 there and no fill step is needed.
        df_merged['falencia'] = (
            df_merged['falencia_point']
            .rolling(window=window, min_periods=window)
            .sum()
            .ge(threshold)
            .astype(int)
        )

        # engineered features, stored separately (one row per stay)
        included_vars = []
        excluded_vars = []
        row = {'patientunitstayid': int(sid)}
        for var in vital_features + lab_features_lower:
            imp_col = f"{var}_imputed"
            if imp_col in df_merged.columns:
                included_vars.append(var)
                n_meas, min_v, max_v, mean_v, instab, intens, cumul = engineered_features(df_merged, var, imp_col)
                row[f"n_meas_{var}"] = n_meas
                row[f"min_{var}"] = min_v
                row[f"max_{var}"] = max_v
                row[f"mean_{var}"] = mean_v
                row[f"{var}_instab"] = instab
                row[f"{var}_intens"] = intens
                row[f"{var}_cumul"] = cumul
            else:
                excluded_vars.append(var)

        row['mortality'] = int(mortality_map.get(sid, 0))
        engineered_rows.append(row)

        # store per-patient engineering report
        engineered_report.append({
            'patientunitstayid': int(sid),
            'included': included_vars,
            'excluded': excluded_vars,
            'n_rows': int(len(df_merged))
        })

        merged_data.append(df_merged)

    if not merged_data:
        return pd.DataFrame(), pd.DataFrame(), engineered_report

    result = pd.concat(merged_data, ignore_index=True)

    # Round numeric columns to 3 decimal places - vectorized
    numeric_cols = result.select_dtypes(include=[np.number]).columns
    result[numeric_cols] = result[numeric_cols].round(3)

    features_df = pd.DataFrame(engineered_rows)
    if not features_df.empty:
        num_cols = features_df.select_dtypes(include=[np.number]).columns
        features_df[num_cols] = features_df[num_cols].round(3)

    return result, features_df, engineered_report

In [None]:
# Generate the per-batch feature files (gridded data + engineered features).
from collections import Counter

FEATURE_DIR = Path(OUTPUT_DIR) / 'preprocess/batches/features_filtered'
FEATURE_DIR.mkdir(parents=True, exist_ok=True)

FEATURES_ONLY_DIR = Path(OUTPUT_DIR) / 'preprocess/engineered_features'
FEATURES_ONLY_DIR.mkdir(parents=True, exist_ok=True)

batch_size = 1000
print('Total stays:', len(patient_ids))


def _has_rows(frame):
    """True when `frame` is a non-empty DataFrame."""
    return frame is not None and not frame.empty


start_time = time.time()
pbar = tqdm(range(0, len(patient_ids), batch_size), desc='Batches', unit='batch')

for bidx, i in enumerate(pbar):
    batch_ids = patient_ids[i:i+batch_size]
    _tag = f'{bidx:04d}'

    out_path = FEATURE_DIR / f'batch_{_tag}_sub00.parquet'
    done_flag = FEATURE_DIR / f'batch_{_tag}_sub00.parquet.done'
    report_path = FEATURE_DIR / f'batch_{_tag}_sub00.report.json'

    feat_out_path = FEATURES_ONLY_DIR / f'features_{_tag}.parquet'
    feat_done_flag = FEATURES_ONLY_DIR / f'features_{_tag}.parquet.done'

    # A batch is (re)processed unless both completion markers are present.
    _already_done = done_flag.exists() and feat_done_flag.exists()
    if not _already_done:
        df_batch, df_features, report = process_eicu_batch(batch_ids, step_min=5)

        # Gridded data: write the parquet (if any rows), then the marker, then the report.
        if _has_rows(df_batch):
            df_batch.to_parquet(out_path, index=False)
            done_flag.write_text('ok')
        else:
            done_flag.write_text('empty')
        if report:
            report_path.write_text(json.dumps(report, ensure_ascii=False))

        # Engineered features: same parquet-then-marker order.
        if _has_rows(df_features):
            df_features.to_parquet(feat_out_path, index=False)
            feat_done_flag.write_text('ok')
        else:
            feat_done_flag.write_text('empty')

        # quick progress summary for this batch
        if report:
            n_pat = len(report)
            total_rows = sum(entry.get('n_rows', 0) for entry in report)
            exc = Counter()
            for entry in report:
                exc.update(entry.get('excluded', []))
            most_excluded = exc.most_common(5)
            pbar.set_postfix_str(f"proc={n_pat} stays, rows={total_rows}, top_exc={most_excluded[:3]}")

    # Every fifth batch the postfix shows the elapsed time instead.
    if (bidx + 1) % 5 == 0:
        elapsed_min = (time.time() - start_time) / 60.0
        pbar.set_postfix_str(f"elapsed={elapsed_min:.1f}m")

elapsed_min = (time.time() - start_time) / 60.0
print(f'Done. Features saved in: {FEATURE_DIR} (elapsed {elapsed_min:.1f} min)')
print(f'Done. Engineered features saved in: {FEATURES_ONLY_DIR} (elapsed {elapsed_min:.1f} min)')

## Build HDF5

In [None]:
# Build HDF5 (eICU) with engineered static features
from pathlib import Path
import numpy as np
import pandas as pd
import tables

# All locations hang off OUTPUT_DIR, the output root defined in the setup cell.
# (These lines previously referenced `OUT_DIR`, a name this notebook never defines.)
BATCH_DIR = Path(OUTPUT_DIR) / "preprocess" / "batches" / "features_filtered"
ENGINEERED_DIR = Path(OUTPUT_DIR) / "preprocess" / "engineered_features"
H5_PATH = Path(OUTPUT_DIR) / "preprocess" / "h5" / "dataset_eicu.h5"
SPLIT_TSV = Path(OUTPUT_DIR) / "split_all.tsv"

# Column roles in the batch parquet files.
ID_COL = "patientunitstayid"
TIME_COL = "offset_min"
LABEL_COL = "falencia"

# Train/val/test split settings; the remainder after train and val goes to test.
SEED = 42
TRAIN_FRAC = 0.8
VAL_FRAC = 0.1

# --- helpers ---
def ensure_groups(h5, groups):
    """Create each root-level group named in `groups` that `h5` does not contain yet."""
    absent = (name for name in groups if f"/{name}" not in h5)
    for name in absent:
        h5.create_group("/", name, f"{name} group")


def create_earray(h5, path, atom, n_cols, expectedrows=10_000_000):
    """Create an initially empty, row-extendable array with `n_cols` columns at `path`.

    `path` is split at its last '/' into the parent node and the array name.
    """
    where, leaf = path.rsplit("/", maxsplit=1)
    return h5.create_earray(
        where,
        leaf,
        atom=atom,
        shape=(0, n_cols),
        expectedrows=expectedrows,
    )


def build_split(stay_ids, seed, train_frac, val_frac):
    """Randomly partition `stay_ids` into 'train', 'val' and 'test' sets.

    Ids are sorted before shuffling, so the result depends only on the id set
    and `seed`. Train gets int(n * train_frac) ids, val int(n * val_frac), and
    test everything that remains.
    """
    ordered = np.array(sorted(stay_ids), dtype=int)
    np.random.default_rng(seed).shuffle(ordered)

    total = len(ordered)
    train_end = int(total * train_frac)
    val_end = train_end + int(total * val_frac)

    parts = {
        "train": slice(None, train_end),
        "val": slice(train_end, val_end),
        "test": slice(val_end, None),
    }
    return {name: set(ordered[part].tolist()) for name, part in parts.items()}


def load_split_map(split_tsv, stay_ids, seed, train_frac, val_frac):
    """Return {stay id: split name}, reusing `split_tsv` when it is usable.

    If `split_tsv` exists and has a 'split' column together with an id column
    (ID_COL preferred, 'stay_id' accepted), the mapping is read from it.
    Otherwise a fresh split is drawn with `build_split`, written to `split_tsv`
    as a tab-separated file with header 'stay_id<TAB>split', and returned.
    """
    if split_tsv.exists():
        # Explicit "\t" escape: a raw tab character inside the literal is easily
        # converted to spaces by editors, which would break parsing of the file
        # written below.
        df = pd.read_csv(split_tsv, sep="\t")
        for id_column in (ID_COL, "stay_id"):
            if {id_column, "split"}.issubset(df.columns):
                return dict(zip(df[id_column].astype(int), df["split"]))

    split_sets = build_split(stay_ids, seed, train_frac, val_frac)
    split_tsv.parent.mkdir(parents=True, exist_ok=True)
    with split_tsv.open("w", encoding="utf-8") as f:
        f.write("stay_id\tsplit\n")
        f.writelines(
            f"{sid}\t{split}\n"
            for split, ids in split_sets.items()
            for sid in sorted(ids)
        )
    return {sid: split for split, ids in split_sets.items() for sid in ids}


paths = sorted(p for p in BATCH_DIR.glob("batch_*.parquet") if "_amostra" not in p.name)
if not paths:
    raise SystemExit(f"No batch_*.parquet found in {BATCH_DIR}")


def _batch_tag(path, fallback_idx):
    """Return the zero-padded batch number embedded in a 'batch_<n>...' file name.

    Falls back to `fallback_idx` when the name carries no numeric tag.
    """
    parts = path.stem.split("_")
    if len(parts) > 1 and parts[1].isdigit():
        return f"{int(parts[1]):04d}"
    return f"{fallback_idx:04d}"


# collect stay ids
stay_ids = set()
for p in paths:
    df_ids = pd.read_parquet(p, columns=[ID_COL])
    stay_ids.update(df_ids[ID_COL].dropna().astype(int).unique().tolist())

split_map = load_split_map(SPLIT_TSV, stay_ids, SEED, TRAIN_FRAC, VAL_FRAC)

# base columns
first_df = pd.read_parquet(paths[0])
drop_cols = {ID_COL, TIME_COL, "falencia", "falencia_point", "mortality"}
base_cols = [c for c in first_df.columns if c not in drop_cols]
if LABEL_COL not in first_df.columns:
    raise SystemExit(f"Label column '{LABEL_COL}' not found in {paths[0].name}")

# engineered (static) columns, taken from the first engineered file that exists.
# Looking only at features_0000 would drop all static features whenever the
# first batch happened to be empty (no parquet is written for empty batches).
engineered_cols = []
feature_files = sorted(ENGINEERED_DIR.glob("features_*.parquet"))
if feature_files:
    feat_df = pd.read_parquet(feature_files[0])
    engineered_cols = [
        c for c in feat_df.columns
        if c not in {ID_COL, "mortality", "falencia", "falencia_point"}
    ]

H5_PATH.parent.mkdir(parents=True, exist_ok=True)
if H5_PATH.exists():
    H5_PATH.unlink()

# Stays missing from the split map fall back to "train"; count them so the
# fallback is visible instead of silent.
unassigned_stays = 0

with tables.open_file(H5_PATH, mode="w") as h5:
    ensure_groups(h5, ["data", "labels", "patient_windows"])
    if engineered_cols:
        ensure_groups(h5, ["static"])

    data_arrays = {}
    label_arrays = {}
    window_arrays = {}
    stay_id_arrays = {}
    static_arrays = {}

    for split in ["train", "val", "test"]:
        data_arrays[split] = create_earray(h5, f"/data/{split}", tables.Float32Atom(), n_cols=len(base_cols))
        label_arrays[split] = create_earray(h5, f"/labels/{split}", tables.Float32Atom(), n_cols=1)
        window_arrays[split] = create_earray(h5, f"/patient_windows/{split}", tables.Int32Atom(), n_cols=3)
        stay_id_arrays[split] = h5.create_earray("/patient_windows", f"{split}_stay_ids", atom=tables.Int32Atom(), shape=(0,))
        if engineered_cols:
            static_arrays[split] = create_earray(h5, f"/static/{split}", tables.Float32Atom(), n_cols=len(engineered_cols))

    for idx, path in enumerate(paths):
        print(f"Reading {path}")
        df = pd.read_parquet(path)
        df[ID_COL] = df[ID_COL].astype(int)
        if TIME_COL in df.columns:
            df = df.sort_values([ID_COL, TIME_COL])

        engineered_map = {}
        if engineered_cols:
            # Pair by the batch number in the file name. The position in `paths`
            # drifts away from the batch number as soon as one batch has no parquet.
            feat_path = ENGINEERED_DIR / f"features_{_batch_tag(path, idx)}.parquet"
            if feat_path.exists():
                feat_df = pd.read_parquet(feat_path)
                feat_df[ID_COL] = feat_df[ID_COL].astype(int)
                feat_df = feat_df.set_index(ID_COL)[engineered_cols]
                engineered_map = {int(sid): row.to_numpy(dtype=np.float32, copy=False) for sid, row in feat_df.iterrows()}
            else:
                print(f"WARNING: missing engineered file {feat_path}, will fill NaNs")

        for stay_id, df_sid in df.groupby(ID_COL):
            if int(stay_id) not in split_map:
                unassigned_stays += 1
            split = split_map.get(int(stay_id), "train")
            d_arr = data_arrays[split]
            l_arr = label_arrays[split]
            w_arr = window_arrays[split]
            sid_arr = stay_id_arrays[split]

            # [start, stop) is the row range of this stay inside the split's data array.
            start = d_arr.nrows
            base_feat = df_sid[base_cols].astype(np.float32).to_numpy()
            lbl = df_sid[[LABEL_COL]].astype(np.float32).to_numpy()
            d_arr.append(base_feat)
            l_arr.append(lbl)
            stop = d_arr.nrows

            w_arr.append(np.array([[start, stop, int(stay_id)]], dtype=np.int32))
            sid_arr.append(np.array([int(stay_id)], dtype=np.int32))

            if engineered_cols:
                static = engineered_map.get(int(stay_id))
                if static is None:
                    static_vals = np.full((1, len(engineered_cols)), np.nan, dtype=np.float32)
                else:
                    static_vals = static[None, :]
                static_arrays[split].append(static_vals.astype(np.float32, copy=False))

    h5.set_node_attr("/", "feature_names", base_cols)
    if engineered_cols:
        h5.set_node_attr("/", "static_feature_names", engineered_cols)
    h5.set_node_attr("/", "label_column", LABEL_COL)

if unassigned_stays:
    print(f"WARNING: {unassigned_stays} stays were not in the split map and were written to 'train'")

print("H5 saved to", H5_PATH, "with", len(base_cols), "dynamic features and", len(engineered_cols), "static features")

In [None]:
# Build per-stay falencia summary (falencia_normal + 45/60min + mortality)
import sys
from pathlib import Path
import subprocess

PREPROCESS_DIR = Path(OUTPUT_DIR) / "preprocess"
INPUT_DIR = PREPROCESS_DIR / "batches" / "features_filtered"
OUT_CSV = PREPROCESS_DIR / "falencia_stay_summary.csv"
# NOTE(review): nothing visible in this notebook writes mortality_by_stay.csv;
# confirm it is produced elsewhere before running this cell.
MORTALITY_PATH = PREPROCESS_DIR / "mortality_by_stay.csv"

# Command line for the external summary script, run with the current interpreter.
cmd = [
    sys.executable,
    str(Path("src/clustering/minirocket/pipelines/build_falencia_stay_summary.py")),
    "--input_dir", str(INPUT_DIR),
    "--mortality_path", str(MORTALITY_PATH),
    "--out_csv", str(OUT_CSV),
    "--time_col", "offset_min",
    "--id_col", "patientunitstayid",
    "--mbp_col", "systemicmean",
    # Must match the column written into the batch files ('vasopressor_active');
    # the previous value 'vasopressor_ativo' does not exist in them.
    "--vaso_col", "vasopressor_active",
    "--lactate_col", "lactate",
    "--falencia_col", "falencia",
    "--step_min_45", "5", "--window_min_45", "45",
    # NOTE(review): step 60 for the 60-minute window while the batches use a
    # 5-minute grid; confirm this is intended.
    "--step_min_60", "60", "--window_min_60", "60",
]
print(" ".join(cmd))
# check=True raises CalledProcessError if the script exits with a non-zero status.
subprocess.run(cmd, check=True)
