In [None]:
# ============================================================
# CLASIFICACIÓN BINARIA
# Entradas:
#   1) artifacts_preprocesamiento.zip   (contiene las tablas procesadas)
# Salida:
#   resultados.zip (historial de entrenamiento, metadatos y modelo final)
# ============================================================

# ============================================================
# Diseñador de red para clasificación binaria basado en heurísticas
# ============================================================

import math
from dataclasses import dataclass
from typing import List


@dataclass
class DisenoRedBinaria:
    """Network design produced by ``disenar_red_binaria`` for a binary classifier.

    Bundles the hidden-layer layout with the regularization and
    early-stopping settings chosen by the heuristic.
    """

    capas: List[int]        # hidden-layer widths, first hidden layer first
    P: int                  # parameter count of the dense net (biases and 1-unit output included)
    rho: float              # budget occupancy: P divided by the parameter budget P_max
    l2: float               # L2 regularization strength
    dropouts: List[float]   # dropout rate per hidden layer, aligned with ``capas``
    patience: int           # early-stopping patience
    min_delta: float        # early-stopping minimum improvement
    max_epochs: int         # upper bound on training epochs


def clip(x: float, lo: float, hi: float) -> float:
    """Clamp ``x`` to the interval ``[lo, hi]``: cap at ``hi``, then floor at ``lo``."""
    capped_above = min(hi, x)
    return max(lo, capped_above)


def estimar_parametros(n0: int, capas: List[int]) -> int:
    """
    Count the trainable parameters of a dense network.

    The total includes one bias per neuron in every hidden layer and a
    final single-neuron output layer. An empty ``capas`` yields 0.
    """
    if not capas:
        return 0

    # Layer sizes from input to output; each consecutive (fan_in, fan_out)
    # pair contributes (fan_in + 1) * fan_out weights, the +1 being the bias.
    sizes = [n0, *capas, 1]
    total = sum(
        (fan_in + 1) * fan_out
        for fan_in, fan_out in zip(sizes, sizes[1:])
    )
    return int(total)


def _nivel_muestras(d: int) -> int:
    """Return the sample-size tier: 0 if d < 2000, 1 if d < 20000, else 2."""
    if d < 2000:
        return 0
    if d < 20000:
        return 1
    return 2


def _nivel_ocupacion(rho: float) -> int:
    """Return the budget-occupancy tier: 0 if rho >= 0.8, 1 if rho >= 0.4, else 2."""
    if rho >= 0.8:
        return 0
    if rho >= 0.4:
        return 1
    return 2


def _construir_embudo(
    d: int,
    n0: int,
    P_max: int,
    *,
    c1: float,
    r: float,
    n_min: int,
    L_max: int,
) -> List[int]:
    """Build the funnel of hidden-layer widths, before any budget trimming."""
    # Adaptive width ceiling driven by sample size, kept within [64, 1024].
    n_max = min(1024, max(64, math.floor(0.25 * d)))

    # First hidden layer: c1 * n0, capped by the budget and by n_max.
    n1_objetivo = math.floor(c1 * n0)
    n1_cap_presupuesto = math.floor(P_max / (n0 + 1))
    n1 = min(n1_objetivo, n1_cap_presupuesto, n_max)

    if n1 < n_min:
        if n1_cap_presupuesto >= n_min:
            # The budget would allow a layer of n_min units; the width target
            # itself is what falls short, so say so instead of blaming the budget.
            raise ValueError(
                "Primera capa demasiado estrecha: "
                f"min(floor(c1 * n0), n_max) = {n1} < n_min = {n_min}"
            )
        raise ValueError("Presupuesto insuficiente: no se puede ni una capa >= n_min")

    capas = [int(n1)]

    # Each further layer is r times the previous one; stop at L_max layers or
    # as soon as the next layer would be narrower than n_min.
    while len(capas) < L_max:
        n_new = math.floor(r * capas[-1])
        if n_new < n_min:
            break
        capas.append(int(min(n_new, n_max)))

    return capas


def _recortar_a_presupuesto(n0: int, capas: List[int], P_max: int, n_min: int) -> int:
    """Shrink ``capas`` in place until it fits ``P_max``; return the parameter count."""
    P = estimar_parametros(n0, capas)
    while P > P_max:
        if len(capas) > 1:
            # Drop the deepest layer first.
            capas.pop()
        else:
            # Single layer left: shrink it by ~10%, always by at least one unit
            # so the loop is guaranteed to make progress.
            n_old = capas[0]
            capas[0] = min(math.floor(0.9 * n_old), n_old - 1)
            if capas[0] < n_min:
                raise ValueError("Presupuesto insuficiente: no cabe una capa >= n_min")
        P = estimar_parametros(n0, capas)
    return P


def _regularizacion(d: int, rho: float, n_capas: int):
    """Choose (dropouts, l2, patience, max_epochs) from sample size and occupancy."""
    nivel_d = _nivel_muestras(d)
    nivel_rho = _nivel_ocupacion(rho)

    # Dropout: base rate by sample size, shifted by +/-0.10 according to rho.
    drop_base = (0.35, 0.25, 0.15)[nivel_d]
    drop = clip(drop_base + (0.10, 0.0, -0.10)[nivel_rho], 0.05, 0.50)

    # Per-layer dropout decays by 15% of the base rate per layer (highest first).
    dropouts = [
        float(clip(drop * (1.0 - 0.15 * i), 0.05, 0.50))
        for i in range(n_capas)
    ]

    # L2: base strength by sample size, scaled x3 / x1 / x0.3 according to rho.
    l2_base = (1e-3, 3e-4, 1e-4)[nivel_d]
    l2 = clip((3.0, 1.0, 0.3)[nivel_rho] * l2_base, 1e-6, 3e-3)

    # Early stopping: smaller datasets get more patience and more epochs.
    patience, max_epochs = ((20, 400), (15, 200), (10, 100))[nivel_d]

    return dropouts, l2, patience, max_epochs


def disenar_red_binaria(
    d: int,
    n0: int,
    *,
    k: int = 10,
    c1: float = 2.0,
    r: float = 0.5,
    n_min: int = 8,
    L_max: int = 4,
) -> DisenoRedBinaria:
    """
    Design a dense binary-classification network from heuristics.

    Args:
        d: number of training samples.
        n0: number of input features.
        k: parameter budget per sample; the budget is ``floor(k * d)``.
        c1: target width of the first hidden layer as a multiple of ``n0``.
        r: width ratio between consecutive hidden layers (funnel shape).
        n_min: minimum width allowed for any hidden layer.
        L_max: maximum number of hidden layers.

    Returns:
        A ``DisenoRedBinaria`` with the layer widths, the parameter count,
        the budget occupancy ``rho`` and the regularization / early-stopping
        hyperparameters.

    Raises:
        ValueError: if ``n0 < 1``, if ``d < 2 * n0``, if the first hidden
            layer cannot reach ``n_min`` units, or if no single layer of at
            least ``n_min`` units fits in the parameter budget.
    """
    # Reject a non-positive feature count up front; n0 == -1 would otherwise
    # surface as a ZeroDivisionError in the budget cap.
    if n0 < 1:
        raise ValueError("n0 debe ser >= 1")

    # 1. Minimal viability check (soft).
    if d < 2 * n0:
        raise ValueError("Dataset muy pequeño: alto riesgo de sobreajuste")

    # 2. Total parameter budget.
    P_max = math.floor(k * d)

    # 3-4. First layer and funnel of hidden layers.
    capas = _construir_embudo(
        d, n0, P_max, c1=c1, r=r, n_min=n_min, L_max=L_max
    )

    # 5-6. Parameter estimate (biases and output included) and budget trimming.
    P = _recortar_a_presupuesto(n0, capas, P_max, n_min)

    # 7. Regularization hyperparameters (L2, dropout, early stopping).
    rho = P / P_max if P_max > 0 else 1.0
    dropouts, l2, patience, max_epochs = _regularizacion(d, rho, len(capas))

    # Fixed minimum improvement for early stopping.
    min_delta = 1e-4

    return DisenoRedBinaria(
        capas=capas,
        P=int(P),
        rho=float(rho),
        l2=float(l2),
        dropouts=dropouts,
        patience=int(patience),
        min_delta=float(min_delta),
        max_epochs=int(max_epochs),
    )


In [None]:
import pandas as pd
import numpy as np
import zipfile

import tensorflow as tf
from tensorflow import keras

# Reproducibility (optional): seed NumPy and TensorFlow with the same value.
SEED = 7
np.random.seed(SEED)
tf.random.set_seed(SEED)

# ============================================================
# 1) Open the ZIP and read train/val/test
# ============================================================

# Path to the ZIP holding the preprocessed tables
ZIP_PATH = "artifacts_preprocesamiento.zip"

def read_csv_from_zip(zip_path: str, csv_name: str) -> pd.DataFrame:
    """Read the member ``csv_name`` of the archive ``zip_path`` into a DataFrame."""
    with zipfile.ZipFile(zip_path, "r") as archive, archive.open(csv_name) as handle:
        frame = pd.read_csv(handle)
    return frame

# Load the three splits in a fixed order: train, validation, test.
train, val, test = (
    read_csv_from_zip(ZIP_PATH, csv_name)
    for csv_name in ("train_final.csv", "val_final.csv", "test_final.csv")
)

# ============================================================
# 2) Define the target and build X / y
# ============================================================
TARGET_COL = "target"

# Features are every column except the target; the target is cast to int.
X_train, y_train = train.drop(columns=[TARGET_COL]), train[TARGET_COL].astype(int)
X_val, y_val = val.drop(columns=[TARGET_COL]), val[TARGET_COL].astype(int)
X_test, y_test = test.drop(columns=[TARGET_COL]), test[TARGET_COL].astype(int)

# d = number of training rows, n0 = number of input columns
d, n0 = X_train.shape

print("d =", d)
print("n0 =", n0)


In [None]:
# ============================================================
# 3) Evaluation setup (symmetric classes vs. critical class)
# ============================================================
#
# True  -> symmetric classes -> the threshold is chosen to maximize Accuracy
# False -> class 1 critical  -> the threshold is chosen to maximize F1
#                               subject to a minimum Recall
#
# NOTE: both constants are read by the threshold search and by the
# metadata written at the end of the notebook.
# ============================================================

CLASES_SIMETRICAS = False
RECALL_MIN = 0.5  # only used when CLASES_SIMETRICAS = False


In [None]:
# Derive the architecture and regularization settings from the training-set
# size (d) and the number of input features (n0), using default heuristics.
diseno = disenar_red_binaria(d, n0)
print(diseno)


In [None]:
# Unpack the design into module-level names; they are read later when
# building the model, configuring training and writing the metadata.
capas     = diseno.capas
l2_value  = diseno.l2
dropouts  = diseno.dropouts
patience  = diseno.patience
min_delta = diseno.min_delta
max_epochs= diseno.max_epochs

def build_binary_mlp(n0: int, capas: list, l2_value: float, dropouts: list) -> keras.Model:
    """
    Build a sequential MLP for binary classification.

    Each hidden layer is a ReLU ``Dense`` layer with an L2 kernel
    regularizer, followed by a ``Dropout`` layer. The output is a single
    sigmoid unit (probability of class 1).

    Args:
        n0: number of input features.
        capas: width of each hidden layer, in order.
        l2_value: L2 regularization strength applied to every hidden layer.
        dropouts: dropout rate after each hidden layer, aligned with ``capas``.

    Returns:
        The uncompiled Keras model.

    Raises:
        ValueError: if ``capas`` and ``dropouts`` differ in length.
    """
    # Explicit check instead of `assert`: assertions are stripped under
    # `python -O`, and `zip` below would then silently drop the extra entries.
    if len(capas) != len(dropouts):
        raise ValueError("capas y dropouts deben tener la misma longitud")

    model = keras.Sequential()
    model.add(keras.layers.Input(shape=(n0,)))

    for units, dr in zip(capas, dropouts):
        model.add(
            keras.layers.Dense(
                units,
                activation="relu",
                kernel_regularizer=keras.regularizers.l2(l2_value),
            )
        )
        model.add(keras.layers.Dropout(dr))

    # Sigmoid output (probability of class 1)
    model.add(keras.layers.Dense(1, activation="sigmoid"))
    return model

# Instantiate the network from the designed layout and regularization values.
model = build_binary_mlp(n0=n0, capas=capas, l2_value=l2_value, dropouts=dropouts)


In [None]:
# Compile with Adam and binary cross-entropy. The accuracy reported during
# training uses a fixed 0.5 threshold.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss="binary_crossentropy",
    metrics=[
        keras.metrics.BinaryAccuracy(name="accuracy", threshold=0.5),
    ],
)

model.summary()


In [None]:
# ============================================================
# 5) Train + validate (EarlyStopping)
# ============================================================
# Stop when val_loss has not improved by at least min_delta for `patience`
# epochs, and restore the best weights seen.
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=patience,
        min_delta=min_delta,
        restore_best_weights=True,
        verbose=1
    )
]

BATCH_SIZE = 32  # adjustable; kept fixed here

# Features are cast to float32 before being passed to Keras; `history`
# is saved to CSV at the end of the notebook.
history = model.fit(
    X_train.astype(np.float32),
    y_train.values,
    validation_data=(X_val.astype(np.float32), y_val.values),
    epochs=max_epochs,
    batch_size=BATCH_SIZE,
    verbose=1,
    callbacks=callbacks
)


In [None]:
# ============================================================
# 6) Alpha óptima en validación + métricas finales en test
# ============================================================

def counts(y, yhat):
    """Return the confusion-matrix counts ``(TP, FP, TN, FN)`` as Python ints.

    ``y`` holds the true labels and ``yhat`` the predicted labels; both are
    compared element-wise against 0 and 1.
    """
    actual_pos, actual_neg = (y == 1), (y == 0)
    predicted_pos, predicted_neg = (yhat == 1), (yhat == 0)

    true_pos = (actual_pos & predicted_pos).sum()
    false_pos = (actual_neg & predicted_pos).sum()
    true_neg = (actual_neg & predicted_neg).sum()
    false_neg = (actual_pos & predicted_neg).sum()

    return int(true_pos), int(false_pos), int(true_neg), int(false_neg)

def metrics_from_counts(TP, FP, TN, FN):
    """Return ``(accuracy, recall, f1)`` as floats from confusion-matrix counts.

    Any metric whose denominator is zero is reported as 0.0.
    """
    total = TP + TN + FP + FN
    actual_positives = TP + FN
    f1_denominator = 2 * TP + FP + FN

    accuracy = 0.0
    if total:
        accuracy = (TP + TN) / total

    recall = 0.0
    if actual_positives:
        recall = TP / actual_positives

    f1_score = 0.0
    if f1_denominator:
        f1_score = (2 * TP) / f1_denominator

    return float(accuracy), float(recall), float(f1_score)

def print_confusion_matrix(TP, FP, TN, FN, *, title="Matriz de confusión"):
    """Print a 2x2 confusion matrix under ``title``.

    Layout (rows are true labels, columns are predictions):
                 Pred 0     Pred 1
      True 0       TN         FP
      True 1       FN         TP
    """
    print(f"\n{title}:")
    print("            Pred 0     Pred 1")
    rows = (("True 0", TN, FP), ("True 1", FN, TP))
    for label, pred_neg, pred_pos in rows:
        print(f"{label}     {pred_neg:8d}  {pred_pos:8d}")

# ---- 6.1) Search for the best alpha (decision threshold) on VALIDATION ----
p_val = model.predict(X_val.astype(np.float32), verbose=0).ravel()

ALPHA_GRID = np.linspace(0.0, 1.0, 2001)  # step 0.0005

# Best threshold for the main criterion and the score it achieved.
best_alpha = None
best_val = -1.0

# Fallback threshold: the one with the highest recall seen so far.
fallback_alpha = None
best_recall = -1.0

# The grid is ascending and every comparison is strict, so ties keep the
# smallest alpha that reached the best score.
for a in ALPHA_GRID:
    yhat_val = (p_val >= a).astype(int)

    TP, FP, TN, FN = counts(y_val.values, yhat_val)
    acc, rec, f1 = metrics_from_counts(TP, FP, TN, FN)

    if CLASES_SIMETRICAS:
        # symmetric classes -> maximize accuracy
        if acc > best_val:
            best_alpha, best_val = float(a), float(acc)
    else:
        # class 1 critical -> maximize F1 among thresholds meeting the minimum recall
        if rec >= RECALL_MIN and f1 > best_val:
            best_alpha, best_val = float(a), float(f1)

        # fallback: the threshold with the best recall even if below the minimum
        if rec > best_recall:
            fallback_alpha, best_recall = float(a), float(rec)

# Use the fallback only when no threshold satisfied the main criterion.
ALPHA = best_alpha if best_alpha is not None else fallback_alpha

# ---- Required summary messages ----
metric_principal = "Accuracy" if CLASES_SIMETRICAS else "F1"

print("\n=== Configuración de decisión (binaria) ===")
print("Métrica principal:", metric_principal)
print("¿Clase positiva (1) crítica?:", (not CLASES_SIMETRICAS))
if not CLASES_SIMETRICAS:
    print("Recall mínimo exigido:", RECALL_MIN)
print("Alpha empleado (umbral):", ALPHA)

# ---- 6.2) Final metrics on TEST using ALPHA ----
p_test = model.predict(X_test.astype(np.float32), verbose=0).ravel()
yhat_test = (p_test >= ALPHA).astype(int)

# TP/FP/TN/FN and acc/rec/f1 are rebound here to the TEST values.
TP, FP, TN, FN = counts(y_test.values, yhat_test)
acc, rec, f1 = metrics_from_counts(TP, FP, TN, FN)

test_metrics = {
    "TP": TP, "FP": FP, "TN": TN, "FN": FN,
    "accuracy": acc,
    "recall": rec,
    "f1": f1,
}

print_confusion_matrix(TP, FP, TN, FN, title="Matriz de confusión (TEST)")
# ---- Show the metrics as a table ----
print("\nMétricas en test:")
print("-----------------------------------")
print(f"{'Métrica':<15} | {'Valor':>10}")
print("-----------------------------------")
print(f"{'Accuracy':<15} | {acc:10.4f}")
print(f"{'Recall':<15} | {rec:10.4f}")
print(f"{'F1':<15} | {f1:10.4f}")
print("-----------------------------------")

# ============================================================
# 7) Guardar resultados y empaquetar ZIP final
# ============================================================

import os
import json
import zipfile
import pandas as pd

# Working directory for the individual artifacts and name of the final archive.
OUT_DIR = "salida_binaria"
ZIP_NAME = "resultados.zip"

os.makedirs(OUT_DIR, exist_ok=True)

# Everything describing the trained model: design, decision threshold,
# evaluation policy and test results.
metadata = {
    "n_samples_train": int(d),
    "n_features": int(n0),
    "architecture": capas,
    "l2": float(l2_value),
    "dropouts": dropouts,
    "patience": int(patience),
    "min_delta": float(min_delta),
    "max_epochs": int(max_epochs),

    # --- decision threshold chosen on validation ---
    "alpha": float(ALPHA),

    # --- decision / priority ---
    "clases_simetricas": bool(CLASES_SIMETRICAS),
    "clase_positiva_critica": bool(not CLASES_SIMETRICAS),
    "metrica_principal": metric_principal,
    "recall_min": None if CLASES_SIMETRICAS else float(RECALL_MIN),

    # --- final results ---
    "metrics_test": test_metrics,
    "confusion_matrix_test": {
        "TN": TN, "FP": FP,
        "FN": FN, "TP": TP
    },
}

with open(os.path.join(OUT_DIR, "metadata.json"), "w") as f:
    json.dump(metadata, f, indent=2)

# Save the (already trained) model
model_path = os.path.join(OUT_DIR, "modelo.keras")
model.save(model_path)

# Save the per-epoch training history as CSV
history_path = os.path.join(OUT_DIR, "historial_entrenamiento.csv")
pd.DataFrame(history.history).to_csv(history_path, index=False)

# Pack the three artifacts into the ZIP, stored flat (base names only)
with zipfile.ZipFile(ZIP_NAME, "w", zipfile.ZIP_DEFLATED) as zipf:
    for file in [model_path, history_path, os.path.join(OUT_DIR, "metadata.json")]:
        zipf.write(file, arcname=os.path.basename(file))

print(f"\n✔ ZIP generado correctamente: {ZIP_NAME}")
