
# Notebook Base — Predicción de Estado Glucémico (DM2)
**Objetivo.** Construir un pipeline reproducible (scikit-learn) para predecir **Normal / Prediabetes / Diabetes** a partir de variables antropométricas, demográficas y clínicas.  
**Dataset.** `output-glucosa_labeled.csv` (incluye `Resultado` en mg/dL y `Clase_DM` derivada por umbrales ADA).


In [12]:

# !pip install -q scikit-learn pandas matplotlib joblib
import os, json, math, itertools, warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pathlib import Path
from typing import List, Tuple, Dict

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (f1_score, balanced_accuracy_score, confusion_matrix,
                             classification_report, brier_score_loss, roc_auc_score)
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.inspection import permutation_importance
import joblib

RANDOM_STATE = 42
DATA_PATH = Path("/content/mnt/data/output-glucosa_labeled.csv")
assert DATA_PATH.exists(), f"No se encuentra el dataset en {DATA_PATH}"
print("Usando dataset:", DATA_PATH)


Usando dataset: /content/mnt/data/output-glucosa_labeled.csv


In [13]:

df = pd.read_csv(DATA_PATH)

print("Shape:", df.shape)
print("Columnas:", list(df.columns))

# Meta: target y clases
TARGET = "Clase_DM"   # {Normal, Prediabetes, Diabetes}
GLUCOSE = "Resultado" # mg/dL (no usar como feature si TARGET es Clase_DM)

# Conteos de clase
print("Distribución de clases:\n", df[TARGET].value_counts())


Shape: (100, 51)
Columnas: ['identificacion', 'Fecha_Fin_Registro', 'ips_codigo', 'nombres', 'apellidos', 'tipo_identificacion', 'fecha_nacimiento', 'edad', 'sexo', 'genero', 'Etniats', 'Poblacion_Victima', 'Condicion_Discapacidad', 'Zona_Residencia', 'direccion', 'telefono', 'Municipio', 'ocupacion', 'regimen', 'EPS', 'talla', 'peso', 'imc', 'imc_interpretacion', 'Obesidad_Grado', 'tas', 'tad', 'perimetro_abdominal', 'realiza_ejercicio', 'frecuencia_frutas', 'medicamentos_hta', 'Niveles_Altos_Glucosa', 'Dx_Diabetes_Tipo2_Familia', 'puntaje_total', 'Dm', 'tipo_dm', 'Consumo_Cigarrillo', 'Dx Enfermedad Cardiovascular', 'riesgo_dm', 'interpretacion', 'responsable_registro', 'servicio', 'Nombre_Completo', 'Edad_Años', 'Examen', 'Fecha_Examen', 'Grupo_Analito', 'Analito', 'Resultado', 'Regimen', 'Clase_DM']
Distribución de clases:
 Clase_DM
Prediabetes    47
Normal         46
Diabetes        7
Name: count, dtype: int64


In [14]:

# Columnas potencialmente con fuga (derivadas del examen de glucosa o del target)
BLACKLIST = [
    TARGET, GLUCOSE, "interpretacion", "Niveles_Altos_Glucosa",
    "Examen", "Analito", "Grupo_Analito", "Dm", "tipo_dm"
]

# Identificadores / PII que no deben entrar al modelo
PII = [
    "identificacion", "Nombre_Completo", "nombres", "apellidos",
    "telefono", "direccion", "responsable_registro", "Fecha_Fin_Registro",
    "fecha_nacimiento"
]

# Unificar listas según columnas presentes
BLACKLIST = [c for c in BLACKLIST if c in df.columns]
PII = [c for c in PII if c in df.columns]

# Selección automática de variables:
num_cols = df.select_dtypes(include=[np.number]).columns.difference(BLACKLIST + PII).tolist()
cat_cols = df.select_dtypes(include=["object"]).columns.difference(BLACKLIST + PII).tolist()

print("Numéricas:", num_cols[:15], ("..." if len(num_cols)>15 else ""))
print("Categóricas:", cat_cols[:15], ("..." if len(cat_cols)>15 else ""))

# Remover filas con target faltante (no debería)
df = df.dropna(subset=[TARGET]).copy()

# Train matrix
X = df[num_cols + cat_cols].copy()
y = df[TARGET].astype("category")
class_order = ["Normal", "Prediabetes", "Diabetes"]
y = y.cat.set_categories(class_order)  # asegura orden


Numéricas: ['Consumo_Cigarrillo', 'Edad_Años', 'edad', 'imc', 'ips_codigo', 'perimetro_abdominal', 'peso', 'puntaje_total', 'riesgo_dm', 'tad', 'talla', 'tas'] 
Categóricas: ['Condicion_Discapacidad', 'Dx Enfermedad Cardiovascular', 'Dx_Diabetes_Tipo2_Familia', 'EPS', 'Etniats', 'Fecha_Examen', 'Municipio', 'Obesidad_Grado', 'Poblacion_Victima', 'Regimen', 'Zona_Residencia', 'frecuencia_frutas', 'genero', 'imc_interpretacion', 'medicamentos_hta'] ...


In [15]:

# Preprocesamiento: imputación + One-Hot para categóricas
pre = ColumnTransformer([
    ("num", SimpleImputer(strategy="median"), num_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore", drop=None), cat_cols),
], remainder="drop")


In [16]:

# Baseline interpretable
logreg = LogisticRegression(max_iter=1000, multi_class="multinomial", random_state=RANDOM_STATE)

# Ensemble potente (sklearn puro para evitar dependencias)
gbc = GradientBoostingClassifier(random_state=RANDOM_STATE)

# Pipelines
pipe_logreg = Pipeline([("pre", pre), ("clf", logreg)])
pipe_gbc = Pipeline([("pre", pre), ("clf", gbc)])

# Versiones calibradas (probabilidades mejor calibradas)
cal_logreg = CalibratedClassifierCV(estimator=pipe_logreg, method="isotonic", cv=3)
cal_gbc   = CalibratedClassifierCV(estimator=pipe_gbc,   method="isotonic", cv=3)
models = {
    "logreg_cal": cal_logreg,
    "gbc_cal": cal_gbc
}


In [17]:

def dm_metrics(y_true, y_pred, labels):
    # Métricas foco: Recall(Diabetes) y Specificity(Diabetes)
    # Convertimos a binario: DM vs No-DM
    dm_label = "Diabetes"
    y_true_bin = (y_true == dm_label).astype(int)
    y_pred_bin = (y_pred == dm_label).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true_bin, y_pred_bin, labels=[0,1]).ravel()
    recall_dm = tp / (tp + fn + 1e-12)
    specificity_dm = tn / (tn + fp + 1e-12)
    return recall_dm, specificity_dm

def evaluate_cv(model, X, y, labels, n_splits=5, random_state=RANDOM_STATE):
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    f1s, baccs, recalls_dm, specs_dm = [], [], [], []
    reports = []
    fold = 0
    for tr, va in skf.split(X, y):
        fold += 1
        X_tr, X_va = X.iloc[tr], X.iloc[va]
        y_tr, y_va = y.iloc[tr], y.iloc[va]
        model.fit(X_tr, y_tr)
        y_pred = model.predict(X_va)

        f1s.append(f1_score(y_va, y_pred, average="macro"))
        baccs.append(balanced_accuracy_score(y_va, y_pred))
        r_dm, s_dm = dm_metrics(y_va, y_pred, labels)
        recalls_dm.append(r_dm); specs_dm.append(s_dm)

        rep = classification_report(y_va, y_pred, labels=labels, output_dict=False, zero_division=0)
        reports.append(f"FOLD {fold}\n{rep}")
    results = {
        "f1_macro_mean": float(np.mean(f1s)),
        "bacc_mean": float(np.mean(baccs)),
        "recall_dm_mean": float(np.mean(recalls_dm)),
        "specificity_dm_mean": float(np.mean(specs_dm)),
        "f1_macro_std": float(np.std(f1s)),
        "bacc_std": float(np.std(baccs)),
        "recall_dm_std": float(np.std(recalls_dm)),
        "specificity_dm_std": float(np.std(specs_dm)),
        "reports": reports
    }
    return results

labels = class_order
all_results = {}
for name, mdl in models.items():
    print(f"Entrenando/Evaluando: {name}")
    res = evaluate_cv(mdl, X, y, labels)
    all_results[name] = res

pd.DataFrame([{**{"model":k}, **{m:v for m,v in v.items() if m.endswith("_mean")}} for k,v in all_results.items()])


Entrenando/Evaluando: logreg_cal
Entrenando/Evaluando: gbc_cal


Unnamed: 0,model,f1_macro_mean,bacc_mean,recall_dm_mean,specificity_dm_mean
0,logreg_cal,0.206897,0.333333,0.0,1.0
1,gbc_cal,0.418938,0.437037,0.0,1.0


In [20]:

# Elegir el modelo con mejor F1 macro
best_name = max(all_results.keys(), key=lambda k: all_results[k]["f1_macro_mean"])
best_model = models[best_name]
print("Mejor modelo según F1 macro:", best_name, all_results[best_name]["f1_macro_mean"])

# Entrenar en todo el dataset
best_model.fit(X, y)

# Guardar artefacto
OUT_DIR = Path("/mnt/data/artifacts_dm2")
OUT_DIR.mkdir(parents=True, exist_ok=True)
joblib.dump(best_model, OUT_DIR / f"{best_name}.joblib")
print("Modelo guardado en:", OUT_DIR / f"{best_name}.joblib")

# Importancias por permutación (puede tardar un poco pero n es pequeña)
# Obtener nombres de variables tras preprocesamiento
# Nota: CalibratedClassifierCV no expone get_feature_names; estimador base sí (pipe -> pre -> OHE)
from sklearn import set_config
set_config(transform_output="default")

base_pipe = best_model.estimator if hasattr(best_model, "estimator") else best_model
preproc = base_pipe.named_steps["pre"]
feature_names = []
# num
feature_names.extend(preproc.named_transformers_["num"].get_feature_names_out(num_cols).tolist())
# cat
cat_names = preproc.named_transformers_["cat"].get_feature_names_out(cat_cols).tolist()
feature_names.extend(cat_names)

r = permutation_importance(best_model, X, y, n_repeats=10, random_state=RANDOM_STATE, scoring="f1_macro")
imp = pd.DataFrame({"feature": feature_names, "importance_mean": r.importances_mean})
imp = imp.sort_values("importance_mean", ascending=False).head(20)
imp.to_csv(OUT_DIR / "top_features.csv", index=False)
imp.head(15)


Mejor modelo según F1 macro: gbc_cal 0.4189376966143691
Modelo guardado en: /mnt/data/artifacts_dm2/gbc_cal.joblib


AttributeError: 'ColumnTransformer' object has no attribute 'transformers_'

In [None]:

# Gráfico: Top features
plt.figure()
plt.barh(imp["feature"].iloc[::-1], imp["importance_mean"].iloc[::-1])
plt.title("Top 20 Features (Permutation Importance)")
plt.xlabel("Importancia media")
plt.tight_layout()
plt.show()

# Distribución de probabilidad de Diabetes
proba = best_model.predict_proba(X)
# Alinear índice de clase 'Diabetes'
cls_idx = list(best_model.classes_).index("Diabetes")
p_dm = proba[:, cls_idx]

plt.figure()
plt.hist(p_dm, bins=20)
plt.title("Distribución de Probabilidades — Clase 'Diabetes'")
plt.xlabel("P(Diabetes)")
plt.ylabel("Frecuencia")
plt.tight_layout()
plt.show()


In [None]:

# Ejemplo de inferencia (primeras 5 filas)
pred = best_model.predict(X.head(5))
proba = best_model.predict_proba(X.head(5))
print("Predicciones:", pred)
print("Probabilidades (por clase en orden):", list(best_model.classes_))
print(np.round(proba, 3))

# Política de umbrales (ejemplo): alertar si P(Diabetes) >= 0.5 o P(Prediabetes)+P(Diabetes) >= 0.7
def triage_policy(probs, classes):
    idx_dm = list(classes).index("Diabetes")
    idx_pre = list(classes).index("Prediabetes")
    flag = (probs[idx_dm] >= 0.5) or ((probs[idx_dm] + probs[idx_pre]) >= 0.7)
    return int(flag)

triage_flags = [triage_policy(p, best_model.classes_) for p in best_model.predict_proba(X.head(5))]
print("Flags de triage (1=priorizar):", triage_flags)


In [None]:

def slice_report(model, X, y, slice_col):
    if slice_col not in df.columns:
        print(f"Columna {slice_col} no está en el dataset.")
        return None
    data = X.copy()
    data[slice_col] = df[slice_col].values
    res = []
    for v in data[slice_col].dropna().unique():
        mask = data[slice_col] == v
        if mask.sum() < 10:
            continue
        y_true = y[mask]
        y_pred = model.predict(X[mask])
        f1m = f1_score(y_true, y_pred, average="macro", zero_division=0)
        bacc = balanced_accuracy_score(y_true, y_pred)
        r_dm, s_dm = dm_metrics(y_true, y_pred, class_order)
        res.append({"slice": f"{slice_col}={v}", "n": int(mask.sum()),
                    "f1_macro": f1m, "bacc": bacc, "recall_dm": r_dm, "specificity_dm": s_dm})
    return pd.DataFrame(res).sort_values("f1_macro", ascending=False)

for col in ["genero", "Municipio", "Zona_Residencia", "regimen"]:
    print(f"\nMétricas por cohorte: {col}")
    rep = slice_report(best_model, X, y, col)
    if rep is not None:
        display(rep)


In [None]:

summary = {
    "best_model": best_name,
    "metrics": {k:v for k,v in all_results[best_name].items() if k.endswith("_mean")},
    "artifact": str((OUT_DIR / f"{best_name}.joblib").resolve()),
    "top_features_csv": str((OUT_DIR / "top_features.csv").resolve())
}
with open(OUT_DIR / "summary.json", "w") as f:
    json.dump(summary, f, indent=2)
summary
