# CatBoost

In [1]:
# ============================================
# CATBOOST — PRECISION-CONTROLLED v7.2
# (SSD/HDD) — Big-mode en TRAIN y TEST (streaming)
#   • TRAIN big: escanea fallos y carga subset (pos/near/ sample far)
#   • TEST 2024 streaming (sin cargar todo a RAM)
#   • HDD big-mode: sin rolling; VRAM guardrails (RTX 2060 6GB)
# ============================================

import os, gc, json, warnings
from datetime import datetime
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    precision_recall_curve, average_precision_score, precision_score,
    recall_score, f1_score, confusion_matrix
)
from catboost import CatBoostClassifier, Pool

warnings.filterwarnings("ignore")
pd.options.mode.chained_assignment = None


# ===================== UTILS =====================

def cleanup():
    gc.collect()

def downcast_df(df: pd.DataFrame) -> pd.DataFrame:
    for c in df.columns:
        if pd.api.types.is_float_dtype(df[c]):
            df[c] = pd.to_numeric(df[c], downcast="float")
        elif pd.api.types.is_integer_dtype(df[c]):
            df[c] = pd.to_numeric(df[c], downcast="integer")
    return df


# ===================== IO Helpers (streaming) =====================

def _list_parquet_files(path: str) -> List[str]:
    if os.path.isdir(path):
        return [os.path.join(path, f) for f in os.listdir(path) if f.endswith(".parquet")]
    return [path]

def _peek_row_group(path: str) -> pd.DataFrame:
    files = _list_parquet_files(path)
    for f in files:
        pf = pq.ParquetFile(f)
        if pf.num_row_groups > 0:
            tb = pf.read_row_group(0, columns=None)
            return tb.to_pandas().head(100)
    return pd.DataFrame()

def discover_smart_columns(path: str) -> List[str]:
    peek = _peek_row_group(path)
    if peek.empty:
        return []
    cols = peek.columns.tolist()
    return sorted([c for c in cols if ("smart" in c.lower()) and c.endswith("_raw")])


def _iter_parquet_row_groups(path: str, years: List[int], columns: Optional[List[str]] = None):
    files = _list_parquet_files(path)
    for f in files:
        pf = pq.ParquetFile(f)
        for i in range(pf.num_row_groups):
            tb = pf.read_row_group(i, columns=columns) if columns else pf.read_row_group(i)
            df = tb.to_pandas()
            if "date" not in df.columns:
                continue
            df["date"] = pd.to_datetime(df["date"], errors="coerce")
            mask = df["date"].dt.year.isin(years)
            if mask.any():
                yield downcast_df(df.loc[mask].reset_index(drop=True))
            del df, tb
            cleanup()


def load_data(path: str, years: List[int], columns: Optional[List[str]] = None) -> pd.DataFrame:
    print(f"Loading {years} from {path}...")
    chunks = []
    for df in _iter_parquet_row_groups(path, years, columns=columns):
        chunks.append(df)
        if len(chunks) >= 16:
            chunks = [pd.concat(chunks, ignore_index=True)]
            cleanup()
    out = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
    print(f"Loaded {len(out):,} rows")
    return out


# ===== BIG MODE: pasada 1 – mapa de fallos =====

def scan_fail_dates(path: str, years: List[int]) -> Dict[str, pd.Timestamp]:
    print("Scanning earliest failure dates (streaming)...")
    fail_map: Dict[str, pd.Timestamp] = {}
    for df in _iter_parquet_row_groups(path, years, columns=["serial_number", "date", "failure"]):
        df = df[df["failure"] == 1]
        if df.empty:
            continue
        sns = df["serial_number"].astype(str).values
        dts = pd.to_datetime(df["date"], errors="coerce")
        for sn, dt in zip(sns, dts):
            if pd.isna(dt):
                continue
            if sn not in fail_map or dt < fail_map[sn]:
                fail_map[sn] = dt
    print(f"  Found {len(fail_map):,} failed serials")
    return fail_map


# ===== BIG MODE: pasada 2 – filtrado de filas =====

def load_data_big_filtered(
    path: str,
    years: List[int],
    fail_map: Dict[str, pd.Timestamp],
    lookahead_days: int,
    hard_window: int,
    neg_random_keep_rate: float = 0.0025,
    columns: Optional[List[str]] = None,
) -> pd.DataFrame:
    """
    TRAIN big: conserva todos positivos, negativos cercanos y una muestra de lejanos.
    NaT-safe y causal por disco (pero sin rolling), como en tu RF v5s4.
    """
    print(f"Loading BIG-FILTERED {years} from {path} (keep_rate={neg_random_keep_rate})...")
    rng = np.random.default_rng(42)
    kept = []
    total_rows = 0
    kept_rows = 0

    for df in _iter_parquet_row_groups(path, years, columns=columns):
        total_rows += len(df)
        sn_ser = df["serial_number"].astype(str)
        dt = pd.to_datetime(df["date"], errors="coerce").to_numpy(dtype="datetime64[D]")
        fail_series = sn_ser.map(fail_map)
        fdt = pd.to_datetime(fail_series, errors="coerce").to_numpy(dtype="datetime64[D]")

        dtf = np.full(len(df), 10**9, dtype=np.int64)
        valid = ~np.isnat(fdt)
        if valid.any():
            dd = (fdt[valid] - dt[valid]).astype("timedelta64[D]").astype("int64")
            dtf[valid] = dd

        failure = (df["failure"].values == 1)
        pos_mask = failure | ((dtf >= 0) & (dtf <= lookahead_days))
        near_mask = (dtf > lookahead_days) & (dtf <= hard_window)

        keep = pos_mask | near_mask
        far_neg = (~keep) & (~failure)
        if far_neg.any():
            sample = rng.random(far_neg.sum()) < neg_random_keep_rate
            sel = np.zeros_like(far_neg, dtype=bool)
            sel[np.where(far_neg)[0]] = sample
            keep = keep | sel

        kept.append(df.loc[keep].reset_index(drop=True))
        kept_rows += int(keep.sum())

        if sum(len(x) for x in kept) > 3_000_000:
            kept = [pd.concat(kept, ignore_index=True)]
            cleanup()

    out = pd.concat(kept, ignore_index=True) if kept else pd.DataFrame()
    rate = 100 * kept_rows / max(1, total_rows)
    print(f"  BIG-FILTERED kept {kept_rows:,}/{total_rows:,} rows (~{rate:.2f}%)")
    return out


# ===================== PREP =====================

def prepare_df(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["serial_number"] = df["serial_number"].astype(str)
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["serial_number", "date"])
    df = df.sort_values(["serial_number", "date"]).reset_index(drop=True)
    return df


# ===================== LABELS =====================

def compute_days_to_failure(dfs: pd.DataFrame) -> np.ndarray:
    fail_map: Dict[str, pd.Timestamp] = {}
    fails = dfs[dfs["failure"] == 1]
    for sn, dt in zip(fails["serial_number"], fails["date"]):
        fail_map[sn] = min(dt, fail_map.get(sn, dt))
    dtf = np.full(len(dfs), 10**9, dtype=np.int64)
    for i, (sn, dt) in enumerate(zip(dfs["serial_number"], dfs["date"])):
        if sn in fail_map:
            dtf[i] = (fail_map[sn] - dt).days
    return dtf

def create_labels_from_dtf(dtf: np.ndarray, lookahead: int = 7) -> np.ndarray:
    return ((dtf >= 0) & (dtf <= lookahead)).astype(np.int8)


# ===================== CATEGÓRICOS & NORMALIZACIÓN =====================

def extract_vendor(series: pd.Series) -> pd.Series:
    s = series.astype(str).str.strip()
    return s.str.extract(r"^([A-Za-z]+)", expand=False).fillna("UNK")

def fit_model_stats(df_train: pd.DataFrame, smart_cols: List[str]) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict[str, Tuple[float, float]]]:
    """
    Stats robustas SOLO en TRAIN:
      - Por modelo: median e IQR (p50, q3-q1) => stats_model[model][col] = (med, iqr)
      - Fallback global: median e IQR global por col => stats_global[col] = (med, iqr)
    """
    stats_model: Dict[str, Dict[str, Tuple[float, float]]] = {}
    stats_global: Dict[str, Tuple[float, float]] = {}

    if 'model' in df_train.columns and not df_train.empty:
        for c in smart_cols:
            xg = pd.to_numeric(df_train[c], errors='coerce').dropna().values
            if xg.size:
                med = float(np.median(xg))
                q1, q3 = np.percentile(xg, [25, 75])
                stats_global[c] = (med, float(max(q3 - q1, 1e-6)))
            else:
                stats_global[c] = (0.0, 1.0)

        for m, g in df_train.groupby(df_train['model'].astype(str)):
            d = {}
            for c in smart_cols:
                x = pd.to_numeric(g[c], errors='coerce').dropna().values
                if x.size:
                    med = float(np.median(x)); q1, q3 = np.percentile(x, [25, 75])
                    d[c] = (med, float(max(q3 - q1, 1e-6)))
            stats_model[str(m)] = d
    return stats_model, stats_global

def apply_model_normalization_inplace(df_all: pd.DataFrame, smart_cols: List[str],
                                     stats_model: Dict[str, Dict[str, Tuple[float, float]]],
                                     stats_global: Dict[str, Tuple[float, float]]):
    model_series = df_all['model'].astype(str)
    for c in smart_cols:
        med_map = {m: v[c][0] for m, v in stats_model.items() if c in v}
        iqr_map = {m: v[c][1] for m, v in stats_model.items() if c in v}
        g_med, g_iqr = stats_global.get(c, (0.0, 1.0))
        med_s = model_series.map(med_map).fillna(g_med).astype(np.float32)
        iqr_s = model_series.map(iqr_map).fillna(g_iqr).astype(np.float32)

        col = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0).astype(np.float32)
        z = (col - med_s) / (iqr_s + 1e-6)
        df_all[f'z_{c}'] = z.values
        df_all[f'log1p_{c}'] = np.log1p(np.maximum(col.values, 0.0)).astype(np.float32)


# ===================== FEATURES (JOIN TRAIN∪DEV∪TEST, RAM) =====================

def create_features_joined_cat_v7(
    df_train: pd.DataFrame,
    df_dev: pd.DataFrame,
    df_test: pd.DataFrame,
    dataset_type: str,
    add_rolling: bool,
    smart_cols: Optional[List[str]] = None
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, np.ndarray, np.ndarray, np.ndarray, List[str], List[int], Dict, Dict]:
    print(f"Creating features (temporal join) for {dataset_type} with TRAIN∪DEV∪TEST...")

    df_train = df_train.copy(); df_train['__subset__'] = 'train'
    df_dev   = df_dev.copy();   df_dev['__subset__']   = 'dev'
    df_test  = df_test.copy();  df_test['__subset__']  = 'test'
    df_all = pd.concat([df_train, df_dev, df_test], ignore_index=True)
    df_all.sort_values(['serial_number','date'], inplace=True)

    if smart_cols is None:
        all_cols = df_all.columns.tolist()
        smart_cols = [c for c in all_cols if ('smart' in c.lower()) and ('_raw' in c.lower())]
    print(f"  Found {len(smart_cols)} SMART attributes (raw)")

    for c in smart_cols:
        df_all[c] = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0)

    # Delta / cummax
    for c in smart_cols:
        g = df_all.groupby('serial_number', sort=False)[c]
        df_all[f'delta_{c}'] = g.diff().fillna(0.0)
        df_all[f'max_{c}']   = g.cummax()

    # Rolling 7d (evitar en HDD big)
    if add_rolling:
        for c in smart_cols:
            r = df_all.groupby('serial_number', sort=False)[c]
            df_all[f'rmean7_{c}'] = r.rolling(window=7, min_periods=2).mean().reset_index(level=0, drop=True).fillna(0.0)
            df_all[f'rstd7_{c}']  = r.rolling(window=7, min_periods=2).std().reset_index(level=0, drop=True).fillna(0.0)

    # Edad / calendario
    df_all['age_days'] = df_all.groupby('serial_number', sort=False).cumcount()
    df_all['month'] = df_all['date'].dt.month
    df_all['day_of_week'] = df_all['date'].dt.dayofweek

    # Categóricas nativas
    if 'model' not in df_all.columns:
        df_all['model'] = 'UNK'
    df_all['model'] = df_all['model'].astype(str).fillna('UNK')
    df_all['vendor'] = extract_vendor(df_all['model']).astype(str).fillna('UNK')

    # Normalización por modelo (aprendida en TRAIN)
    stats_model, stats_global = fit_model_stats(df_train, smart_cols)
    apply_model_normalization_inplace(df_all, smart_cols, stats_model, stats_global)

    # Construcción de X
    drop_cols = ['serial_number', 'date', 'failure']
    X_all = df_all.drop(columns=[c for c in drop_cols if c in df_all.columns], errors='ignore')

    cat_cols = [c for c in ['model','vendor'] if c in X_all.columns]
    num_cols = [c for c in X_all.columns if c not in cat_cols]
    for c in num_cols:
        X_all[c] = pd.to_numeric(X_all[c], errors='coerce').fillna(0.0).astype(np.float32)

    # elimina constantes
    var = X_all[num_cols].var()
    keep_num = var[var > 0].index.tolist()
    X_all = pd.concat([X_all[cat_cols], X_all[keep_num]], axis=1)

    feature_names = list(X_all.columns)
    cat_indices = [X_all.columns.get_loc(c) for c in cat_cols]

    # Split
    tr_mask  = (df_all['__subset__']=='train').values
    dev_mask = (df_all['__subset__']=='dev').values
    te_mask  = (df_all['__subset__']=='test').values

    X_train = X_all.loc[tr_mask].reset_index(drop=True)
    X_dev   = X_all.loc[dev_mask].reset_index(drop=True)
    X_test  = X_all.loc[te_mask].reset_index(drop=True)

    print(f"  Final features: {X_train.shape[1]} (cat={len(cat_indices)}, num≈{len(keep_num)})")
    return X_train, X_dev, X_test, None, None, None, feature_names, cat_indices, {'stats_model': stats_model}, {'stats_global': stats_global, 'smart_cols': smart_cols}


# ===================== GROUPED CV =====================

def make_group_folds(serials: pd.Series, y: np.ndarray, n_splits: int = 5, random_state: int = 42):
    serials = serials.astype(str).values
    uniq_serials, inverse = np.unique(serials, return_inverse=True)
    y_disk = np.zeros(len(uniq_serials), dtype=np.int8)
    for idx_row, disk_idx in enumerate(inverse):
        if y[idx_row] == 1:
            y_disk[disk_idx] = 1
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    for tr_d, va_d in skf.split(uniq_serials, y_disk):
        tr_mask = np.isin(inverse, tr_d)
        va_mask = np.isin(inverse, va_d)
        yield np.where(tr_mask)[0], np.where(va_mask)[0]


# ===================== NEGATIVE SAMPLING =====================

def sample_negatives_hard(
    X: pd.DataFrame, y: np.ndarray, dtf: np.ndarray, lookahead: int,
    neg_pos_ratio: int = 3, hard_window: int = 60, hard_fraction: float = 0.7,
    seed: int = 42,
):
    rng = np.random.default_rng(seed)
    pos_idx = np.where(y == 1)[0]
    if len(pos_idx) == 0:
        raise ValueError("No positives in training fold for hard-negative sampling")

    n_pos = len(pos_idx)
    n_neg_needed = max(n_pos * neg_pos_ratio, 1)

    hard_mask = (dtf > lookahead) & (dtf <= hard_window)
    hard_idx = np.where((y == 0) & hard_mask)[0]
    easy_idx = np.where((y == 0) & (~hard_mask))[0]

    n_hard = min(int(n_neg_needed * hard_fraction), len(hard_idx))
    n_easy = min(n_neg_needed - n_hard, len(easy_idx))

    chosen_hard = rng.choice(hard_idx, size=n_hard, replace=False) if n_hard > 0 else np.empty(0, dtype=int)
    chosen_easy = rng.choice(easy_idx, size=n_easy, replace=False) if n_easy > 0 else np.empty(0, dtype=int)

    sel_idx = np.sort(np.concatenate([pos_idx, chosen_hard, chosen_easy]))
    Xb = X.iloc[sel_idx].reset_index(drop=True)
    yb = y[sel_idx]
    return Xb, yb, sel_idx


# ===================== CATBOOST (GPU) =====================

def get_catboost(
    depth: int = 8,
    iterations: int = 1500,
    learning_rate: float = 0.06,
    l2_leaf_reg: float = 4.0,
    border_count: int = 64,
    random_seed: int = 42,
    gpu_ram_part: float = 0.70,
    bootstrap_type: str = 'Bernoulli',
    subsample: float = 0.8,
):
    return CatBoostClassifier(
        loss_function='Logloss',
        eval_metric='AUC',
        iterations=iterations,
        depth=depth,
        learning_rate=learning_rate,
        l2_leaf_reg=l2_leaf_reg,
        border_count=border_count,
        random_seed=random_seed,
        task_type='GPU',
        devices='0',
        gpu_ram_part=gpu_ram_part,
        auto_class_weights=None,
        bootstrap_type=bootstrap_type,
        subsample=subsample,
        logging_level='Silent',
        allow_writing_files=False
    )


# ===================== THRESHOLDING & EVAL =====================

def pick_threshold_precision_first(
    y_true: np.ndarray, proba: np.ndarray,
    min_precision: float = 0.90, min_recall: float = 0.03,
    top_k_rate: float = 1e-4, min_alerts: int = 5
):
    precision, recall, thr = precision_recall_curve(y_true, proba)
    pr_auc = average_precision_score(y_true, proba)

    valid = (precision >= min_precision) & (recall >= min_recall)
    if valid.any():
        idxs = np.where(valid)[0]
        idx = idxs[-1] - 1 if idxs[-1] >= len(thr) else idxs[-1]
        idx = max(0, min(idx, len(thr)-1))
        chosen = thr[idx]
    else:
        k = max(max(1, min_alerts), int(len(proba) * max(top_k_rate, 1e-6)))
        chosen = float(np.partition(proba, -k)[-k])

    return float(chosen), float(pr_auc)

def metrics_at_threshold(y_true: np.ndarray, proba: np.ndarray, thr: float) -> Dict:
    y_pred = (proba >= thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return {
        'precision': float(precision_score(y_true, y_pred, zero_division=0)),
        'recall': float(recall_score(y_true, y_pred, zero_division=0)),
        'f1': float(f1_score(y_true, y_pred, zero_division=0)),
        'confusion_matrix': [[int(tn), int(fp)], [int(fn), int(tp)]],
        'fpr': float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0
    }


# ===================== MAIN PIPELINE (v7.2 con BIG switches) =====================

def train_catboost_precision_pipeline_v72(
    train_parquet: str,
    test_parquet: str | None,
    dataset_type: str,
    train_years: List[int] = [2020, 2021, 2022],
    dev_years: List[int]   = [2023],
    test_years: List[int]  = [2024],
    lookahead_days: int = 7,
    n_splits: int = 5,
    neg_pos_ratio: int = 3,        # SSD=3, HDD=5
    hard_window: int = 60,         # HDD=90
    hard_fraction: float = 0.7,
    cb_depth: int = 8,
    cb_iterations: int = 1500,
    cb_learning_rate: float = 0.06,
    cb_l2_leaf_reg: float = 4.0,
    cb_border_count: int = 64,
    cb_bootstrap_type: str = 'Bernoulli',
    cb_subsample: float = 0.8,
    cb_gpu_ram_part: float = 0.70,
    min_precision: float = 0.90,
    min_recall: float = 0.03,
    top_k_rate: float = 1e-4,
    min_alerts: int = 20,
    output_dir: str = './models_cb_precision_v72',
    random_state: int = 42,
    # BIG mode
    big_mode: bool = False,                 # TRAIN big
    dev_big_mode: bool = False,             # DEV big (útil en HDD)
    test_big_mode: bool = False,            # TEST streaming
    neg_random_keep_rate_train: float = 0.0025,
    neg_random_keep_rate_dev: float = 0.01,
    add_rolling: bool = True,               # desactiva rolling en HDD big
):
    print("="*100)
    print(f"CATBOOST (GPU, PRECISION-CONTROLLED v7.2) - {dataset_type.upper()}")
    print("="*100)
    os.makedirs(output_dir, exist_ok=True)

    # Descubrir SMART para leer solo lo necesario en BIG mode
    smart_cols_scan = discover_smart_columns(train_parquet)
    base_cols = ['serial_number', 'date', 'failure', 'model', 'capacity_bytes']
    read_cols = base_cols + smart_cols_scan

    # ---------- Load TRAIN ----------
    if big_mode:
        fail_map_train = scan_fail_dates(train_parquet, train_years)
        df_tr_raw = load_data_big_filtered(
            train_parquet, train_years, fail_map_train,
            lookahead_days=lookahead_days, hard_window=hard_window,
            neg_random_keep_rate=neg_random_keep_rate_train, columns=read_cols
        )
    else:
        df_tr_raw = load_data(train_parquet, train_years, columns=read_cols)

    # ---------- Load DEV ----------
    if dev_years:
        if dev_big_mode:
            fail_map_dev = scan_fail_dates(train_parquet, dev_years)
            df_dev_raw = load_data_big_filtered(
                train_parquet, dev_years, fail_map_dev,
                lookahead_days=lookahead_days, hard_window=hard_window,
                neg_random_keep_rate=neg_random_keep_rate_dev, columns=read_cols
            )
        else:
            df_dev_raw = load_data(train_parquet, dev_years, columns=read_cols)
    else:
        df_dev_raw = pd.DataFrame()

    # ---------- Load TEST (si no streaming) ----------
    if test_parquet and test_years and (not test_big_mode):
        df_te_raw = load_data(test_parquet, test_years, columns=read_cols)
    else:
        df_te_raw = pd.DataFrame()

    if df_tr_raw.empty:
        raise ValueError("Training data is empty!")

    # ---------- Prepare ----------
    df_tr  = prepare_df(df_tr_raw)
    df_dev = prepare_df(df_dev_raw) if not df_dev_raw.empty else pd.DataFrame()
    df_te  = prepare_df(df_te_raw) if not df_te_raw.empty else pd.DataFrame()

    # ---------- Labels ----------
    dtf_tr  = compute_days_to_failure(df_tr)
    y_tr    = create_labels_from_dtf(dtf_tr, lookahead_days)
    print(f"TRAIN labels: {int(y_tr.sum()):,} pos ({100*y_tr.mean():.4f}%)")
    if y_tr.sum() < 50:
        raise ValueError(f"Insufficient positive samples in TRAIN: {y_tr.sum()}")

    dtf_dev = compute_days_to_failure(df_dev) if not df_dev.empty else np.array([], dtype=np.int64)
    y_dev   = create_labels_from_dtf(dtf_dev, lookahead_days) if not df_dev.empty else np.array([], dtype=np.int8)

    # ---------- Features (RAM, sin TEST si streaming) ----------
    X_tr, X_dev, X_te, _, _, _, feature_names, cat_indices, stats_model_wrap, stats_global_wrap = create_features_joined_cat_v7(
        df_tr, df_dev, df_te, dataset_type, add_rolling=add_rolling, smart_cols=smart_cols_scan
    )
    stats_model = stats_model_wrap['stats_model']
    stats_global = stats_global_wrap['stats_global']
    smart_cols = stats_global_wrap['smart_cols']

    # ---------- CV por disco (OOF diagnóstico) ----------
    serials_tr = df_tr['serial_number']
    oof_proba = np.zeros(len(y_tr), dtype=np.float32)
    fold_metrics = []

    for fold, (tr_idx, va_idx) in enumerate(make_group_folds(serials_tr, y_tr, n_splits=n_splits, random_state=random_state), start=1):
        X_tr_fold, y_tr_fold = X_tr.iloc[tr_idx].reset_index(drop=True), y_tr[tr_idx]
        dtf_tr_fold = dtf_tr[tr_idx]

        X_va_fold, y_va_fold = X_tr.iloc[va_idx].reset_index(drop=True), y_tr[va_idx]

        print(f"\nFold {fold}/{n_splits}: train={len(y_tr_fold):,} (pos={int(y_tr_fold.sum()):,}) | val={len(y_va_fold):,} (pos={int(y_va_fold.sum()):,})")

        Xb, yb, _ = sample_negatives_hard(
            X_tr_fold, y_tr_fold, dtf_tr_fold,
            lookahead=lookahead_days,
            neg_pos_ratio=neg_pos_ratio,
            hard_window=hard_window,
            hard_fraction=hard_fraction,
            seed=random_state,
        )
        print(f"  After hard-neg sampling: {len(yb):,} (pos={int(yb.sum()):,}, neg={len(yb)-int(yb.sum()):,})")

        model = get_catboost(
            depth=cb_depth, iterations=cb_iterations, learning_rate=cb_learning_rate,
            l2_leaf_reg=cb_l2_leaf_reg, border_count=cb_border_count,
            bootstrap_type=cb_bootstrap_type, subsample=cb_subsample,
            gpu_ram_part=cb_gpu_ram_part, random_seed=random_state
        )
        train_pool = Pool(Xb, label=yb, cat_features=cat_indices)
        valid_pool = Pool(X_va_fold, label=y_va_fold, cat_features=cat_indices)
        model.fit(train_pool, eval_set=valid_pool, use_best_model=True, early_stopping_rounds=200)

        proba_va = model.predict_proba(valid_pool)[:, 1]
        thr_oof, pr_auc = pick_threshold_precision_first(
            y_va_fold, proba_va, min_precision=min_precision, min_recall=min_recall,
            top_k_rate=top_k_rate, min_alerts=min_alerts
        )
        oof_proba[va_idx] = proba_va
        m = metrics_at_threshold(y_va_fold, proba_va, thr_oof)
        m.update({'pr_auc': float(pr_auc), 'threshold': float(thr_oof), 'fold': int(fold),
                  'best_iteration': int(model.get_best_iteration())})
        fold_metrics.append(m)

        del X_tr_fold, y_tr_fold, X_va_fold, y_va_fold, Xb, yb, model, train_pool, valid_pool
        cleanup()

    # ---------- OOF (diag, no prod) ----------
    thr_oof_global, pr_auc_oof = pick_threshold_precision_first(
        y_tr, oof_proba, min_precision=min_precision, min_recall=min_recall,
        top_k_rate=top_k_rate, min_alerts=min_alerts
    )
    agg_metrics = metrics_at_threshold(y_tr, oof_proba, thr_oof_global)
    agg_metrics.update({'pr_auc': float(pr_auc_oof), 'threshold': float(thr_oof_global)})
    print("\nOOF (diag, not used for prod):")
    print(json.dumps(agg_metrics, indent=2))

    # ---------- Calibración UMBRAL en DEV ----------
    thr_prod = float(thr_oof_global)
    dev_metrics = None
    if not df_dev.empty:
        Xb_full, yb_full, _ = sample_negatives_hard(
            X_tr, y_tr, dtf_tr, lookahead=lookahead_days,
            neg_pos_ratio=neg_pos_ratio, hard_window=hard_window,
            hard_fraction=hard_fraction, seed=random_state
        )
        final_model_dev = get_catboost(
            depth=cb_depth, iterations=cb_iterations, learning_rate=cb_learning_rate,
            l2_leaf_reg=cb_l2_leaf_reg, border_count=cb_border_count,
            bootstrap_type=cb_bootstrap_type, subsample=cb_subsample,
            gpu_ram_part=cb_gpu_ram_part, random_seed=random_state
        )
        train_pool_full = Pool(Xb_full, label=yb_full, cat_features=cat_indices)
        final_model_dev.fit(train_pool_full, use_best_model=False)

        dev_pool = Pool(X_dev, label=y_dev, cat_features=cat_indices)
        proba_dev = final_model_dev.predict_proba(dev_pool)[:, 1]
        thr_prod, pr_auc_dev = pick_threshold_precision_first(
            y_dev, proba_dev, min_precision=min_precision, min_recall=min_recall,
            top_k_rate=top_k_rate, min_alerts=min_alerts
        )
        dev_metrics = metrics_at_threshold(y_dev, proba_dev, thr_prod)
        dev_metrics.update({'pr_auc': float(pr_auc_dev), 'threshold': float(thr_prod)})
        print("\nDEV calibration metrics (used for PROD threshold):")
        print(json.dumps(dev_metrics, indent=2))

        del final_model_dev, train_pool_full, dev_pool
        cleanup()

    # ---------- FINAL model ----------
    Xb_full, yb_full, _ = sample_negatives_hard(
        X_tr, y_tr, dtf_tr, lookahead=lookahead_days,
        neg_pos_ratio=neg_pos_ratio, hard_window=hard_window,
        hard_fraction=hard_fraction, seed=random_state
    )
    final_model = get_catboost(
        depth=cb_depth, iterations=cb_iterations, learning_rate=cb_learning_rate,
        l2_leaf_reg=cb_l2_leaf_reg, border_count=cb_border_count,
        bootstrap_type=cb_bootstrap_type, subsample=cb_subsample,
        gpu_ram_part=cb_gpu_ram_part, random_seed=random_state
    )
    train_pool_full = Pool(Xb_full, label=yb_full, cat_features=cat_indices)
    final_model.fit(train_pool_full, use_best_model=False)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_prefix = os.path.join(output_dir, f"{dataset_type}_cb_precv72_{timestamp}")
    final_model.save_model(f"{model_prefix}.cbm")
    features_meta = {
        'feature_names': feature_names,
        'cat_indices': cat_indices,
        'threshold': float(thr_prod),
        'calibration': 'dev_years' if not df_dev.empty else 'oof_global',
        'catboost_params': {
            'depth': cb_depth, 'iterations': cb_iterations, 'learning_rate': cb_learning_rate,
            'l2_leaf_reg': cb_l2_leaf_reg, 'border_count': cb_border_count,
            'bootstrap_type': cb_bootstrap_type, 'subsample': cb_subsample,
            'gpu_ram_part': cb_gpu_ram_part
        },
        'smart_cols': smart_cols,
        'stats_model': stats_model,   # para normalización streaming
        'stats_global': stats_global  # fallback global
    }
    with open(f"{model_prefix}_features.json", 'w') as f:
        json.dump(features_meta, f, indent=2)
    print(f"\n✓ Final model saved: {model_prefix}.cbm")

    # ---------- TEST (RAM normal) opcional ----------
    test_metrics = None
    if not df_te.empty and (not test_big_mode):
        y_test = create_labels_from_dtf(compute_days_to_failure(df_te), lookahead_days)
        test_pool = Pool(X_te, label=y_test, cat_features=cat_indices)
        proba_test = final_model.predict_proba(test_pool)[:, 1]
        test_metrics = metrics_at_threshold(y_test, proba_test, float(thr_prod))
        test_metrics.update({'pr_auc': float(average_precision_score(y_test, proba_test)), 'threshold_used': float(thr_prod)})

    # ---------- Metadata ----------
    metadata = {
        'dataset_type': dataset_type,
        'train_years': train_years, 'dev_years': dev_years, 'test_years': test_years,
        'lookahead_days': lookahead_days,
        'n_splits': n_splits,
        'neg_pos_ratio': neg_pos_ratio, 'hard_window': hard_window, 'hard_fraction': hard_fraction,
        'catboost_params': {
            'depth': cb_depth, 'iterations': cb_iterations, 'learning_rate': cb_learning_rate,
            'l2_leaf_reg': cb_l2_leaf_reg, 'border_count': cb_border_count,
            'bootstrap_type': cb_bootstrap_type, 'subsample': cb_subsample,
            'task_type': 'GPU', 'gpu_ram_part': cb_gpu_ram_part, 'logging_level': 'Silent'
        },
        'min_precision': min_precision, 'min_recall': min_recall,
        'top_k_rate': top_k_rate, 'min_alerts': min_alerts,
        'oof_metrics': {
            'precision': agg_metrics['precision'], 'recall': agg_metrics['recall'],
            'f1': agg_metrics['f1'], 'confusion_matrix': agg_metrics['confusion_matrix'],
            'fpr': agg_metrics['fpr'], 'pr_auc': agg_metrics['pr_auc'],
            'threshold': agg_metrics['threshold']
        },
        'dev_metrics': dev_metrics, 'test_metrics': test_metrics,
        'feature_names': feature_names, 'cat_indices': cat_indices,
        'smart_cols': smart_cols
    }
    meta_path = os.path.join(output_dir, f"{dataset_type}_cb_precv72_{timestamp}_metadata.json")
    with open(meta_path, 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"\n✓ Metadata saved: {meta_path}")
    print("="*100)
    return metadata


# ===================== TEST-ONLY 2024 (STREAMING, BIG) =====================

class StreamCatFeaturesBuilder:
    """
    Construye features de TEST en streaming (sin rolling) con:
      • delta / cummax / age_days por serial
      • calendario
      • model & vendor categóricas (strings)
      • normalización por modelo (z_*, log1p_*) usando stats_model + stats_global
    """
    def __init__(self, smart_cols: List[str], stats_model: Dict, stats_global: Dict):
        self.smart_cols = smart_cols
        self.stats_model = stats_model
        self.stats_global = stats_global
        self.last_vals: Dict[str, Dict[str, float]] = {}
        self.cummax_vals: Dict[str, Dict[str, float]] = {}
        self.age: Dict[str, int] = {}

    def _ensure_serial(self, sn: str):
        if sn not in self.last_vals:
            self.last_vals[sn] = {c: 0.0 for c in self.smart_cols}
            self.cummax_vals[sn] = {c: 0.0 for c in self.smart_cols}
            self.age[sn] = 0

    def transform_chunk(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.sort_values(["serial_number", "date"]).reset_index(drop=True)

        # Base types
        df["model"] = df.get("model", "UNK").astype(str).fillna("UNK")
        df["vendor"] = extract_vendor(df["model"]).astype(str).fillna("UNK")
        df["month"] = df["date"].dt.month
        df["day_of_week"] = df["date"].dt.dayofweek

        for c in self.smart_cols:
            df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)

        # delta/cummax/age
        deltas = {f"delta_{c}": [] for c in self.smart_cols}
        cummaxs = {f"max_{c}": [] for c in self.smart_cols}
        ages = []
        serials = df["serial_number"].astype(str).values

        for idx, sn in enumerate(serials):
            self._ensure_serial(sn)
            ages.append(self.age[sn])
            self.age[sn] += 1
            for c in self.smart_cols:
                v = float(df.at[idx, c])
                d = v - self.last_vals[sn][c]
                self.last_vals[sn][c] = v
                self.cummax_vals[sn][c] = max(self.cummax_vals[sn][c], v)
                deltas[f"delta_{c}"].append(d)
                cummaxs[f"max_{c}"].append(self.cummax_vals[sn][c])

        for k, arr in deltas.items():
            df[k] = arr
        for k, arr in cummaxs.items():
            df[k] = arr
        df["age_days"] = ages

        # normalización por modelo (vectorizada por chunk)
        model_series = df["model"].astype(str)
        for c in self.smart_cols:
            med_map = {m: v[c][0] for m, v in self.stats_model.items() if c in v}
            iqr_map = {m: v[c][1] for m, v in self.stats_model.items() if c in v}
            g_med, g_iqr = self.stats_global.get(c, (0.0, 1.0))
            med_s = model_series.map(med_map).fillna(g_med).astype(np.float32)
            iqr_s = model_series.map(iqr_map).fillna(g_iqr).astype(np.float32)
            col = pd.to_numeric(df[c], errors='coerce').fillna(0.0).astype(np.float32)
            z = (col - med_s) / (iqr_s + 1e-6)
            df[f'z_{c}'] = z.values
            df[f'log1p_{c}'] = np.log1p(np.maximum(col.values, 0.0)).astype(np.float32)

        # posibles features
        feat_cols = (
            ["model", "vendor", "capacity_bytes"]
            + self.smart_cols
            + [f"delta_{c}" for c in self.smart_cols]
            + [f"max_{c}" for c in self.smart_cols]
            + [f"z_{c}" for c in self.smart_cols]
            + [f"log1p_{c}" for c in self.smart_cols]
            + ["age_days", "month", "day_of_week"]
        )
        return df[feat_cols]


def evaluate_saved_catboost_2024_streaming(
    model_path: str,
    features_meta_path: str,  # *_features.json (feature_names, thr, cat_indices, stats)
    parquet_path: str,
    test_years: List[int] = [2024],
    lookahead_days: int = 7,
    chunk_limit_groups: int = 0  # 0 = todos los row-groups
):
    """
    Evalúa 2024 en streaming:
      • NO carga todo 2024 a RAM.
      • Labels con fail_map del TEST (lookahead).
      • Features sin rolling (delta, cummax, age_days) + normalización por modelo (stats entrenadas).
    """
    print("="*100)
    print("EVALUATE SAVED CATBOOST — TEST 2024 (STREAMING)")
    print("="*100)

    model = CatBoostClassifier()
    model.load_model(model_path)
    with open(features_meta_path, "r") as f:
        meta = json.load(f)
    feature_names = meta["feature_names"]
    thr = float(meta.get("threshold", 0.5))
    cat_indices = meta.get("cat_indices", [])
    smart_cols = meta.get("smart_cols", [])
    stats_model = meta.get("stats_model", {})
    stats_global = meta.get("stats_global", {})

    base_cols = ['serial_number', 'date', 'failure', 'model', 'capacity_bytes']
    read_cols = base_cols + smart_cols

    fail_map_test = scan_fail_dates(parquet_path, test_years)
    builder = StreamCatFeaturesBuilder(smart_cols=smart_cols, stats_model=stats_model, stats_global=stats_global)

    tn=fp=fn=tp=0
    disk_stat: Dict[str, Tuple[float, int]] = {}
    processed_groups = 0

    for df in _iter_parquet_row_groups(parquet_path, test_years, columns=read_cols):
        if chunk_limit_groups and processed_groups >= chunk_limit_groups:
            break

        # Labels
        sn_ser = df["serial_number"].astype(str)
        dt = pd.to_datetime(df["date"], errors="coerce").to_numpy(dtype="datetime64[D]")
        fdt = pd.to_datetime(sn_ser.map(fail_map_test), errors="coerce").to_numpy(dtype="datetime64[D]")
        dtf = np.full(len(df), 10**9, dtype=np.int64)
        valid = ~np.isnat(fdt)
        if valid.any():
            dd = (fdt[valid] - dt[valid]).astype("timedelta64[D]").astype("int64")
            dtf[valid] = dd
        y_chunk = ((dtf >= 0) & (dtf <= lookahead_days)).astype(np.int8)

        # Features streaming
        X_chunk = builder.transform_chunk(df)

        # Alinear columnas al orden del modelo (completar faltantes)
        for col in feature_names:
            if col not in X_chunk.columns:
                # categóricas por si acaso
                if col in ["model", "vendor"]:
                    X_chunk[col] = "UNK"
                else:
                    X_chunk[col] = 0.0
        X_chunk = X_chunk[feature_names]
        # Pool con cat_features
        pool = Pool(X_chunk, label=y_chunk, cat_features=cat_indices)
        proba = model.predict_proba(pool)[:, 1]
        y_pred = (proba >= thr).astype(np.int8)

        cm = confusion_matrix(y_chunk, y_pred, labels=[0,1])
        tn += int(cm[0,0]); fp += int(cm[0,1]); fn += int(cm[1,0]); tp += int(cm[1,1])

        for s, p, y in zip(sn_ser.values, proba, y_chunk):
            if s not in disk_stat:
                disk_stat[s] = (p, int(y))
            else:
                mp, my = disk_stat[s]
                disk_stat[s] = (max(mp, p), max(my, int(y)))

        processed_groups += 1
        cleanup()

    row_metrics = {
        'precision': float(tp / max(1, tp + fp)),
        'recall': float(tp / max(1, tp + fn)),
        'f1': float((2*tp) / max(1, 2*tp + fp + fn)),
        'confusion_matrix': [[tn, fp], [fn, tp]],
        'fpr': float(fp / max(1, fp + tn))
    }

    y_disk = np.array([v[1] for v in disk_stat.values()], dtype=np.int8)
    yhat_disk = np.array([1 if v[0] >= thr else 0 for v in disk_stat.values()], dtype=np.int8)
    cm_d = confusion_matrix(y_disk, yhat_disk, labels=[0,1])
    tn_d, fp_d, fn_d, tp_d = int(cm_d[0,0]), int(cm_d[0,1]), int(cm_d[1,0]), int(cm_d[1,1])
    disk_metrics = {
        'precision': float(tp_d / max(1, tp_d + fp_d)),
        'recall': float(tp_d / max(1, tp_d + fn_d)),
        'f1': float((2*tp_d) / max(1, 2*tp_d + fp_d + fn_d)),
        'confusion_matrix': [[tn_d, fp_d], [fn_d, tp_d]],
        'n_disks': int(len(disk_stat))
    }

    print("\nRow-level TEST metrics (streaming 2024):")
    print(json.dumps(row_metrics, indent=2))
    print("\nDisk-level TEST metrics (streaming 2024):")
    print(json.dumps(disk_metrics, indent=2))
    print("="*100)
    return {"row": row_metrics, "disk": disk_metrics}


# ===================== WRAPPERS =====================

def train_ssd_precision_v72_catboost():
    # SSD cabe en RAM → rolling ON; test normal; dev normal
    return train_catboost_precision_pipeline_v72(
        train_parquet='./Procesados/finales/SSD_FULL_CLEAN.parquet',
        test_parquet='./Procesados/finales/SSD_FULL_CLEAN.parquet',
        dataset_type='SSD',
        train_years=[2020, 2021, 2022],
        dev_years=[2023],
        test_years=[2024],
        lookahead_days=7,
        n_splits=5,
        neg_pos_ratio=3,
        hard_window=60,
        hard_fraction=0.7,
        cb_depth=8,
        cb_iterations=1500,
        cb_learning_rate=0.06,
        cb_l2_leaf_reg=4.0,
        cb_border_count=64,
        cb_bootstrap_type='Bernoulli',
        cb_subsample=0.8,
        cb_gpu_ram_part=0.70,
        min_precision=0.90,
        min_recall=0.03,
        top_k_rate=1e-4,
        min_alerts=20,
        output_dir='./models_cb_ssd_precv72',
        big_mode=False,
        dev_big_mode=False,
        test_big_mode=False,
        neg_random_keep_rate_train=0.0,
        neg_random_keep_rate_dev=0.0,
        add_rolling=True,
        random_state=42
    )

def train_hdd_precision_v72_catboost_big(neg_random_keep_rate_train: float = 0.0025):
    # HDD enorme → TRAIN big + DEV big + TEST streaming; rolling OFF
    return train_catboost_precision_pipeline_v72(
        train_parquet='./Procesados/finales/HDD_FULL_CLEAN.parquet',
        test_parquet='./Procesados/finales/HDD_FULL_CLEAN.parquet',
        dataset_type='HDD',
        train_years=[2020, 2021, 2022],
        dev_years=[2023],
        test_years=[2024],
        lookahead_days=7,
        n_splits=3,               # menos folds para RAM/tiempo
        neg_pos_ratio=5,
        hard_window=90,
        hard_fraction=0.7,
        cb_depth=6,
        cb_iterations=1000,
        cb_learning_rate=0.06,
        cb_l2_leaf_reg=4.0,
        cb_border_count=32,
        cb_bootstrap_type='Bernoulli',
        cb_subsample=0.7,
        cb_gpu_ram_part=0.60,
        min_precision=0.90,
        min_recall=0.03,
        top_k_rate=7.5e-5,
        min_alerts=30,
        output_dir='./models_cb_hdd_precv72',
        big_mode=True,
        dev_big_mode=True,
        test_big_mode=True,      # no evaluamos test aquí; se hace con la función streaming aparte
        neg_random_keep_rate_train=neg_random_keep_rate_train,
        neg_random_keep_rate_dev=0.01,
        add_rolling=False,
        random_state=42
    )


In [6]:
train_ssd_precision_v7_catboost()

CATBOOST (GPU, PRECISION-CONTROLLED v7.1) - SSD (Train=[2020, 2021, 2022] | Dev=[2023] | Test=[2024])
Loading [2020, 2021, 2022] from ./Procesados/finales/SSD_FULL_CLEAN.parquet...
Loaded 987,010 rows
Loading [2023] from ./Procesados/finales/SSD_FULL_CLEAN.parquet...
Loaded 1,137,101 rows
Loading [2024] from ./Procesados/finales/SSD_FULL_CLEAN.parquet...
Loaded 1,220,745 rows
TRAIN labels: 1,070 pos (0.1084%)
Creating features (temporal join) for SSD with TRAIN∪DEV∪TEST...
  Found 13 SMART attributes (raw)
  Final features: 97 (cat=2, num≈95)

Fold 1/5: train=791,918 (pos=854) | val=195,092 (pos=216)
  After hard-neg sampling: 3,416 (pos=854, neg=2,562)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 2/5: train=786,233 (pos=859) | val=200,777 (pos=211)
  After hard-neg sampling: 3,436 (pos=859, neg=2,577)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 3/5: train=800,508 (pos=855) | val=186,502 (pos=215)
  After hard-neg sampling: 3,420 (pos=855, neg=2,565)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 4/5: train=779,562 (pos=858) | val=207,448 (pos=212)
  After hard-neg sampling: 3,432 (pos=858, neg=2,574)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 5/5: train=789,819 (pos=854) | val=197,191 (pos=216)
  After hard-neg sampling: 3,416 (pos=854, neg=2,562)


Default metric period is 5 because AUC is/are not implemented for GPU



OOF (diag, not used for prod):
{
  "precision": 1.0,
  "recall": 0.0308411214953271,
  "f1": 0.05983680870353581,
  "confusion_matrix": [
    [
      985940,
      0
    ],
    [
      1037,
      33
    ]
  ],
  "fpr": 0.0,
  "pr_auc": 0.3743214108139969,
  "threshold": 0.8701784014701843
}


Default metric period is 5 because AUC is/are not implemented for GPU



DEV calibration metrics (used for PROD threshold):
{
  "precision": 0.0,
  "recall": 0.0,
  "f1": 0.0,
  "confusion_matrix": [
    [
      1136733,
      113
    ],
    [
      255,
      0
    ]
  ],
  "fpr": 9.939780761862204e-05,
  "pr_auc": 0.00039457193290159346,
  "threshold": 0.8728576110992888
}


Default metric period is 5 because AUC is/are not implemented for GPU



✓ Final model saved: ./models_cb_ssd_precv7/SSD_cb_precv7_20251105_204900.cbm

✓ Metadata saved: ./models_cb_ssd_precv7/SSD_cb_precv7_20251105_204900_metadata.json


{'dataset_type': 'SSD',
 'train_years': [2020, 2021, 2022],
 'dev_years': [2023],
 'test_years': [2024],
 'lookahead_days': 7,
 'n_splits': 5,
 'neg_pos_ratio': 3,
 'hard_window': 60,
 'hard_fraction': 0.7,
 'catboost_params': {'depth': 8,
  'iterations': 1500,
  'learning_rate': 0.06,
  'l2_leaf_reg': 4.0,
  'border_count': 64,
  'bootstrap_type': 'Bernoulli',
  'subsample': 0.8,
  'task_type': 'GPU',
  'logging_level': 'Silent'},
 'min_precision': 0.9,
 'min_recall': 0.03,
 'top_k_rate': 0.0001,
 'min_alerts': 20,
 'oof_metrics': {'precision': 1.0,
  'recall': 0.0308411214953271,
  'f1': 0.05983680870353581,
  'confusion_matrix': [[985940, 0], [1037, 33]],
  'fpr': 0.0,
  'pr_auc': 0.3743214108139969,
  'threshold': 0.8701784014701843},
 'dev_metrics': {'precision': 0.0,
  'recall': 0.0,
  'f1': 0.0,
  'confusion_matrix': [[1136733, 113], [255, 0]],
  'fpr': 9.939780761862204e-05,
  'pr_auc': 0.00039457193290159346,
  'threshold': 0.8728576110992888},
 'test_metrics': {'precision': 0

In [2]:
train_hdd_precision_v72_catboost_big()

CATBOOST (GPU, PRECISION-CONTROLLED v7.2) - HDD
Scanning earliest failure dates (streaming)...
  Found 3,725 failed serials
Loading BIG-FILTERED [2020, 2021, 2022] from ./Procesados/finales/HDD_FULL_CLEAN.parquet (keep_rate=0.0025)...
  BIG-FILTERED kept 613,954/125,216,564 rows (~0.49%)
Scanning earliest failure dates (streaming)...
  Found 4,321 failed serials
Loading BIG-FILTERED [2023] from ./Procesados/finales/HDD_FULL_CLEAN.parquet (keep_rate=0.01)...
  BIG-FILTERED kept 1,246,686/90,546,032 rows (~1.38%)
TRAIN labels: 29,043 pos (4.7305%)
Creating features (temporal join) for HDD with TRAIN∪DEV∪TEST...
  Found 5 SMART attributes (raw)
  Final features: 31 (cat=2, num≈29)

Fold 1/3: train=408,717 (pos=19,314) | val=205,237 (pos=9,729)
  After hard-neg sampling: 115,884 (pos=19,314, neg=96,570)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 2/3: train=410,379 (pos=19,398) | val=203,575 (pos=9,645)
  After hard-neg sampling: 116,388 (pos=19,398, neg=96,990)


Default metric period is 5 because AUC is/are not implemented for GPU



Fold 3/3: train=408,812 (pos=19,374) | val=205,142 (pos=9,669)
  After hard-neg sampling: 116,244 (pos=19,374, neg=96,870)


Default metric period is 5 because AUC is/are not implemented for GPU



OOF (diag, not used for prod):
{
  "precision": 0.9988545246277205,
  "recall": 0.030024446510346726,
  "f1": 0.05829656371172617,
  "confusion_matrix": [
    [
      584910,
      1
    ],
    [
      28171,
      872
    ]
  ],
  "fpr": 1.7096618117970084e-06,
  "pr_auc": 0.9128897872007613,
  "threshold": 0.9994227886199951
}


Default metric period is 5 because AUC is/are not implemented for GPU



DEV calibration metrics (used for PROD threshold):
{
  "precision": 0.9726027397260274,
  "recall": 0.03002658289028516,
  "f1": 0.05825470315888179,
  "confusion_matrix": [
    [
      1213554,
      28
    ],
    [
      32110,
      994
    ]
  ],
  "fpr": 2.30721945447444e-05,
  "pr_auc": 0.7992707484853779,
  "threshold": 0.9997780998603982
}


Default metric period is 5 because AUC is/are not implemented for GPU



✓ Final model saved: ./models_cb_hdd_precv72/HDD_cb_precv72_20251106_083714.cbm

✓ Metadata saved: ./models_cb_hdd_precv72/HDD_cb_precv72_20251106_083714_metadata.json


{'dataset_type': 'HDD',
 'train_years': [2020, 2021, 2022],
 'dev_years': [2023],
 'test_years': [2024],
 'lookahead_days': 7,
 'n_splits': 3,
 'neg_pos_ratio': 5,
 'hard_window': 90,
 'hard_fraction': 0.7,
 'catboost_params': {'depth': 6,
  'iterations': 1000,
  'learning_rate': 0.06,
  'l2_leaf_reg': 4.0,
  'border_count': 32,
  'bootstrap_type': 'Bernoulli',
  'subsample': 0.7,
  'task_type': 'GPU',
  'gpu_ram_part': 0.6,
  'logging_level': 'Silent'},
 'min_precision': 0.9,
 'min_recall': 0.03,
 'top_k_rate': 7.5e-05,
 'min_alerts': 30,
 'oof_metrics': {'precision': 0.9988545246277205,
  'recall': 0.030024446510346726,
  'f1': 0.05829656371172617,
  'confusion_matrix': [[584910, 1], [28171, 872]],
  'fpr': 1.7096618117970084e-06,
  'pr_auc': 0.9128897872007613,
  'threshold': 0.9994227886199951},
 'dev_metrics': {'precision': 0.9726027397260274,
  'recall': 0.03002658289028516,
  'f1': 0.05825470315888179,
  'confusion_matrix': [[1213554, 28], [32110, 994]],
  'fpr': 2.3072194544744

# Demás Técnicas

In [5]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CATBOOST — PRECISION CONTROLLED (GPU, v8) — Sólo técnicas compatibles CatBoost
-------------------------------------------------------------------------------
- Categóricas nativas + timestamp (ordered boosting)
- Ingeniería causal y normalización robusta por modelo (mediana/IQR)
- Hard-negative sampling (+ undersampling opcional)
- class_weights (auto o manual) — nunca mezclados con undersampling
- Umbral calibrado en DEV (precision-first con recall mínimo)
- Guardrails VRAM para RTX 2060 (6GB)

Requisitos: pandas, numpy, pyarrow, scikit-learn, imbalanced-learn, catboost
"""

import os, gc, json, warnings
from datetime import datetime
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    precision_recall_curve, average_precision_score, precision_score,
    recall_score, f1_score, confusion_matrix
)

from imblearn.under_sampling import RandomUnderSampler
from catboost import CatBoostClassifier, Pool

warnings.filterwarnings("ignore")

# ===================== Utils =====================

def cleanup():
    """GC agresivo para liberar RAM."""
    gc.collect()


def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)


# ===================== IO =====================

def load_data(path: str, years: List[int]) -> pd.DataFrame:
    """Carga parquet (archivo o carpeta) y filtra por años en 'date'."""
    print(f"Loading years={years} from {path} ...")
    if not path:
        return pd.DataFrame()
    files = [os.path.join(path, f) for f in os.listdir(path) if f.endswith('.parquet')] if os.path.isdir(path) else [path]
    chunks = []
    for f in files:
        pf = pq.ParquetFile(f)
        for rg in range(pf.num_row_groups):
            df = pf.read_row_group(rg).to_pandas()
            if 'date' not in df.columns:
                continue
            df['date'] = pd.to_datetime(df['date'], errors='coerce')
            df = df[df['date'].dt.year.isin(years)]
            if not df.empty:
                chunks.append(df)
            if len(chunks) >= 20:
                chunks = [pd.concat(chunks, ignore_index=True)]
                cleanup()
    out = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
    print(f"Loaded {len(out):,} rows")
    return out


# ===================== Prep =====================

def prepare_df(df: pd.DataFrame) -> pd.DataFrame:
    """Tipa y ordena de forma causal por (serial_number, date)."""
    df = df.copy()
    df['serial_number'] = df['serial_number'].astype(str)
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    df = df.dropna(subset=['serial_number', 'date']).sort_values(['serial_number', 'date']).reset_index(drop=True)
    return df


# ===================== Labels (Days-To-Failure) =====================

def compute_days_to_failure(df: pd.DataFrame) -> np.ndarray:
    """Días hasta el primer fallo por disco (>=0 antes del fallo)."""
    fail_map: Dict[str, pd.Timestamp] = {}
    fails = df[df['failure'] == 1]
    for sn, dt in zip(fails['serial_number'], fails['date']):
        fail_map[sn] = min(dt, fail_map.get(sn, dt))
    dtf = np.full(len(df), 1_000_000_000, dtype=np.int64)
    for i, (sn, dt) in enumerate(zip(df['serial_number'], df['date'])):
        if sn in fail_map:
            dtf[i] = (fail_map[sn] - dt).days
    return dtf


def create_labels_from_dtf(dtf: np.ndarray, lookahead: int = 7) -> np.ndarray:
    """Etiqueta positiva si está a [0, lookahead] días del fallo."""
    return ((dtf >= 0) & (dtf <= lookahead)).astype(np.int8)


# ===================== Categóricas =====================

def extract_vendor(series: pd.Series) -> pd.Series:
    s = series.astype(str).str.strip()
    return s.str.extract(r"^([A-Za-z]+)", expand=False).fillna("UNK")


# ===================== Normalización por modelo =====================

def fit_model_stats(df_train: pd.DataFrame, smart_cols: List[str]) -> Dict[str, Dict[str, Tuple[float, float]]]:
    """Mediana e IQR por (model, atributo) aprendidos en TRAIN."""
    stats: Dict[str, Dict[str, Tuple[float, float]]] = {}
    if 'model' not in df_train.columns or df_train.empty:
        return stats
    for m, g in df_train.groupby(df_train['model'].astype(str), sort=False):
        d = {}
        for c in smart_cols:
            x = pd.to_numeric(g[c], errors='coerce')
            x = x[np.isfinite(x)]
            if x.empty:
                continue
            med = float(np.median(x))
            q1, q3 = np.percentile(x, [25, 75])
            iqr = float(max(q3 - q1, 1e-6))
            d[c] = (med, iqr)
        stats[str(m)] = d
    return stats


def apply_model_normalization(df_all: pd.DataFrame, smart_cols: List[str], stats: Dict[str, Dict[str, Tuple[float, float]]]):
    """z-score robusto por modelo + log1p para atributos SMART *_raw."""
    global_median, global_iqr = {}, {}
    for c in smart_cols:
        x = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0).values
        global_median[c] = float(np.median(x))
        q1, q3 = np.percentile(x, [25, 75])
        global_iqr[c] = float(max(q3 - q1, 1e-6))
    model_series = df_all['model'].astype(str)
    for c in smart_cols:
        med_map = {m: v[c][0] for m, v in stats.items() if c in v}
        iqr_map = {m: v[c][1] for m, v in stats.items() if c in v}
        med_s = model_series.map(med_map).fillna(global_median[c]).astype(np.float32)
        iqr_s = model_series.map(iqr_map).fillna(global_iqr[c]).astype(np.float32)
        col = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0).astype(np.float32)
        z = (col - med_s) / (iqr_s + 1e-6)
        df_all[f'z_{c}'] = z.values
        df_all[f'log1p_{c}'] = np.log1p(np.maximum(col.values, 0.0)).astype(np.float32)


# ===================== Features (join temporal) =====================

def create_features_joined_cat(
    df_train: pd.DataFrame,
    df_dev: pd.DataFrame,
    df_test: pd.DataFrame,
    dataset_type: str,
    add_rolling: bool,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, np.ndarray, np.ndarray, np.ndarray, List[str], List[int]]:
    """Une TRAIN∪DEV∪TEST, crea features causales y marca categóricas nativas."""
    print(f"Creating features (temporal join) for {dataset_type} with TRAIN∪DEV∪TEST...")

    df_train = df_train.copy(); df_train['__subset__'] = 'train'
    df_dev   = df_dev.copy();   df_dev['__subset__']   = 'dev'
    df_test  = df_test.copy();  df_test['__subset__']  = 'test'
    df_all = pd.concat([df_train, df_dev, df_test], ignore_index=True)
    df_all.sort_values(['serial_number','date'], inplace=True)

    all_cols = df_all.columns.tolist()
    smart_cols = [c for c in all_cols if ('smart' in c.lower()) and ('_raw' in c.lower())]
    print(f"  Found {len(smart_cols)} SMART attributes (raw)")

    # Limpia numéricas
    for c in smart_cols:
        df_all[c] = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0)

    # Deltas y cummax por disco (causal)
    for c in smart_cols:
        g = df_all.groupby('serial_number', sort=False)[c]
        df_all[f'delta_{c}'] = g.diff().fillna(0.0)
        df_all[f'max_{c}']   = g.cummax()

    # Rolling 7d
    if add_rolling:
        for c in smart_cols:
            r = df_all.groupby('serial_number', sort=False)[c]
            df_all[f'rmean7_{c}'] = r.rolling(window=7, min_periods=2).mean().reset_index(level=0, drop=True).fillna(0.0)
            df_all[f'rstd7_{c}']  = r.rolling(window=7, min_periods=2).std().reset_index(level=0, drop=True).fillna(0.0)

    # Edad y calendario
    df_all['age_days'] = df_all.groupby('serial_number', sort=False).cumcount()
    d = df_all['date']
    df_all['month'] = d.dt.month
    df_all['day_of_week'] = d.dt.dayofweek

    # Categóricas nativas (+ vendor)
    if 'model' not in df_all.columns:
        df_all['model'] = 'UNK'
    df_all['model'] = df_all['model'].astype(str).fillna('UNK')
    df_all['vendor'] = extract_vendor(df_all['model']).astype(str).fillna('UNK')

    # Timestamp (CTR temporal)
    df_all['ts_sec'] = (df_all['date'].astype('int64') // 10**9).astype(np.int64)

    # Normalización por modelo aprendida en TRAIN
    stats = fit_model_stats(df_train, smart_cols)
    apply_model_normalization(df_all, smart_cols, stats)

    # Drop no predictoras
    drop_cols = ['serial_number', 'date', 'failure']
    X_all = df_all.drop(columns=[c for c in drop_cols if c in df_all.columns], errors='ignore')

    # Categóricas nativas
    cat_cols = [c for c in ['model','vendor'] if c in X_all.columns]
    num_cols = [c for c in X_all.columns if c not in cat_cols + ['ts_sec']]
    for c in num_cols:
        X_all[c] = pd.to_numeric(X_all[c], errors='coerce').fillna(0.0).astype(np.float32)

    # Elimina constantes
    var = X_all[num_cols].var()
    keep_num = var[var > 0].index.tolist()
    X_all = pd.concat([X_all[cat_cols], X_all[keep_num], df_all[['ts_sec']]], axis=1)

    feature_names = list(X_all.columns)
    cat_indices = [X_all.columns.get_loc(c) for c in cat_cols]
    ts_idx = X_all.columns.get_loc('ts_sec')

    # Split back
    tr_mask  = (df_all['__subset__']=='train').values
    dev_mask = (df_all['__subset__']=='dev').values
    te_mask  = (df_all['__subset__']=='test').values

    X_train = X_all.loc[tr_mask].reset_index(drop=True)
    X_dev   = X_all.loc[dev_mask].reset_index(drop=True)
    X_test  = X_all.loc[te_mask].reset_index(drop=True)

    ts_train = X_train.iloc[:, ts_idx].values.astype(np.int64)
    ts_dev   = X_dev.iloc[:, ts_idx].values.astype(np.int64)
    ts_test  = X_test.iloc[:, ts_idx].values.astype(np.int64)

    # Quitar ts_sec de features
    X_train = X_train.drop(columns=['ts_sec'])
    X_dev   = X_dev.drop(columns=['ts_sec'])
    X_test  = X_test.drop(columns=['ts_sec'])
    feature_names.remove('ts_sec')

    print(f"  Final features: {X_train.shape[1]} (cat={len(cat_indices)}, num≈{len(keep_num)})")
    return X_train, X_dev, X_test, ts_train, ts_dev, ts_test, feature_names, cat_indices


# ===================== Grouped CV (por disco) =====================

def make_group_folds(serials: pd.Series, y: np.ndarray, n_splits: int = 5, random_state: int = 42):
    """Folds estratificados por disco (cada fold recibe discos con/sin fallo)."""
    serials = serials.astype(str).values
    uniq_serials, inverse = np.unique(serials, return_inverse=True)
    y_disk = np.zeros(len(uniq_serials), dtype=np.int8)
    for i, d_idx in enumerate(inverse):
        if y[i] == 1:
            y_disk[d_idx] = 1
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    for tr_d, va_d in skf.split(uniq_serials, y_disk):
        tr_mask = np.isin(inverse, tr_d)
        va_mask = np.isin(inverse, va_d)
        yield np.where(tr_mask)[0], np.where(va_mask)[0]


# ===================== Hard negatives =====================

def sample_negatives_hard(
    X: pd.DataFrame, y: np.ndarray, dtf: np.ndarray, lookahead: int,
    neg_pos_ratio: int = 4, hard_window: int = 60, hard_fraction: float = 0.7,
    seed: int = 42,
):
    """
    Selecciona negativos cercanos al fallo (dtf ∈ (lookahead, hard_window]) y completa con fáciles.
    Devuelve subset (Xb, yb) + indices (sel_idx) para alinear timestamps.
    """
    rng = np.random.default_rng(seed)
    pos_idx = np.where(y == 1)[0]
    if len(pos_idx) == 0:
        raise ValueError("No positives in training fold for hard-negative sampling")
    n_pos = len(pos_idx)
    n_neg_needed = max(n_pos * neg_pos_ratio, 1)

    hard_mask = (dtf > lookahead) & (dtf <= hard_window)
    hard_idx = np.where((y == 0) & hard_mask)[0]
    easy_idx = np.where((y == 0) & (~hard_mask))[0]

    n_hard = min(int(n_neg_needed * hard_fraction), len(hard_idx))
    n_easy = min(n_neg_needed - n_hard, len(easy_idx))

    chosen_hard = rng.choice(hard_idx, size=n_hard, replace=False) if n_hard > 0 else np.empty(0, dtype=int)
    chosen_easy = rng.choice(easy_idx, size=n_easy, replace=False) if n_easy > 0 else np.empty(0, dtype=int)

    sel_idx = np.sort(np.concatenate([pos_idx, chosen_hard, chosen_easy]))
    Xb = X.iloc[sel_idx].reset_index(drop=True)
    yb = y[sel_idx]
    return Xb, yb, sel_idx


# ===================== Balancing (CatBoost-compatible) =====================

def normalize_balancing_name(name: str) -> str:
    """Normaliza alias de estrategia."""
    n = (name or 'none').strip().lower()
    aliases = {
        'none': 'none', 'no': 'none',
        'under': 'under', 'undersample': 'under',
        'class_auto': 'class_auto', 'auto_class_weights': 'class_auto', 'balanced': 'class_auto',
        'class_manual': 'class_manual', 'class_weight': 'class_manual'
    }
    return aliases.get(n, n)


def undersample_with_timestamps(X: pd.DataFrame, y: np.ndarray, timestamps: Optional[np.ndarray],
                                target_neg_pos: int, max_total: Optional[int], seed: int):
    """UNDER sample preservando timestamps exactos via _rowid_."""
    Xu = X.copy()
    Xu["_rowid_"] = np.arange(len(Xu), dtype=np.int64)
    n_pos = int((y == 1).sum()); n_neg = int((y == 0).sum())
    target_pos = n_pos
    target_neg = min(n_neg, target_pos * max(1, int(target_neg_pos)))
    if max_total:
        per_pos = max(1, max_total // (1 + target_neg_pos))
        target_pos = min(target_pos, per_pos)
        target_neg = min(n_neg, per_pos * target_neg_pos)

    rus = RandomUnderSampler(sampling_strategy={0:int(target_neg), 1:int(target_pos)}, random_state=seed)
    X_res, y_res = rus.fit_resample(Xu, y)
    ts_res = None
    if timestamps is not None:
        rowid = X_res.pop("_rowid_").astype(int).values
        ts_res = timestamps[rowid]
    else:
        X_res = X_res.drop(columns=["_rowid_"], errors='ignore')
    return X_res.reset_index(drop=True), y_res.astype(np.int8), ts_res


# ===================== CatBoost (GPU) =====================

def get_catboost(
    depth: int = 8,
    iterations: int = 1500,
    learning_rate: float = 0.06,
    l2_leaf_reg: float = 4.0,
    border_count: int = 64,
    rsm: float | None = None,          # <- dejamos el arg para compatibilidad, pero NO se pasa al modelo
    random_seed: int = 42,
    gpu_ram_part: float = 0.85,
    bootstrap_type: str = 'Bernoulli',
    subsample: float = 0.8,
    balancing_mode: str = 'none',      # 'none'|'class_auto'|'class_manual'
    class_weight_ratio: float | None = None,
):
    """
    CatBoost GPU (clasificación). NOTA: 'rsm' no está soportado en GPU salvo pairwise -> no se envía.
    """
    params = dict(
        loss_function='Logloss',
        eval_metric='AUC',
        iterations=iterations,
        depth=depth,
        learning_rate=learning_rate,
        l2_leaf_reg=l2_leaf_reg,
        border_count=border_count,
        # rsm   <-- NO AGREGAR AQUÍ EN GPU CLASIFICACIÓN
        random_seed=random_seed,
        task_type='GPU',
        devices='0',
        gpu_ram_part=gpu_ram_part,
        bootstrap_type=bootstrap_type,
        subsample=subsample,
        logging_level='Silent',
        allow_writing_files=False
    )
    if balancing_mode == 'class_auto':
        params['auto_class_weights'] = 'Balanced'
    elif balancing_mode == 'class_manual' and class_weight_ratio is not None:
        params['class_weights'] = [1.0, float(class_weight_ratio)]

    return CatBoostClassifier(**params)


# ===================== Thresholding & Eval =====================

def pick_threshold_precision_first(
    y_true: np.ndarray, proba: np.ndarray,
    min_precision: float = 0.90, min_recall: float = 0.03,
    top_k_rate: float = 1e-4, min_alerts: int = 5
):
    """Elige umbral que cumpla precisión y recall; fallback a top-k."""
    precision, recall, thr = precision_recall_curve(y_true, proba)
    pr_auc = average_precision_score(y_true, proba)

    valid = (precision >= min_precision) & (recall >= min_recall)
    if valid.any():
        idxs = np.where(valid)[0]
        idx = idxs[-1] - 1 if idxs[-1] >= len(thr) else idxs[-1]
        idx = max(0, min(idx, len(thr)-1))
        chosen = thr[idx]
    else:
        k = max(max(1, min_alerts), int(len(proba) * max(top_k_rate, 1e-6)))
        chosen = float(np.partition(proba, -k)[-k])

    return float(chosen), float(pr_auc)


def metrics_at_threshold(y_true: np.ndarray, proba: np.ndarray, thr: float) -> Dict:
    y_pred = (proba >= thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return {
        'precision': float(precision_score(y_true, y_pred, zero_division=0)),
        'recall': float(recall_score(y_true, y_pred, zero_division=0)),
        'f1': float(f1_score(y_true, y_pred, zero_division=0)),
        'confusion_matrix': [[int(tn), int(fp)], [int(fn), int(tp)]],
        'fpr': float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0
    }


# ===================== MAIN PIPELINE =====================

def train_catboost_precision_pipeline(
    train_parquet: str,
    test_parquet: Optional[str],
    dataset_type: str,
    train_years: List[int] = [2020, 2021, 2022, 2023],
    dev_years:   List[int] = [2024],
    test_years:  List[int] = [2025],
    lookahead_days: int = 7,
    n_splits: int = 5,

    # Hard-negatives
    hard_neg_ratio: int = 4,     # neg:pos en subset
    hard_window: int = 60,       # HDD=90
    hard_fraction: float = 0.7,

    # Balancing (sólo compatibles CatBoost)
    balancing: str = 'under',    # 'none'|'under'|'class_auto'|'class_manual'
    balancing_neg_pos_ratio: int = 4,  # para 'under'
    max_under_samples: Optional[int] = None,
    manual_pos_weight: Optional[float] = None,  # n_neg/n_pos si 'class_manual'

    # CatBoost
    cb_depth: int = 8,
    cb_iterations: int = 1500,
    cb_learning_rate: float = 0.06,
    cb_l2_leaf_reg: float = 4.0,
    cb_border_count: int = 64,
    cb_rsm: float = 0.9,
    cb_bootstrap_type: str = 'Bernoulli',
    cb_subsample: float = 0.8,

    # Umbral
    min_precision: float = 0.90,
    min_recall: float = 0.03,
    top_k_rate: float = 1e-4,
    min_alerts: int = 20,

    output_dir: str = './models_cb_prec_v8',
    random_state: int = 42,
):
    print("="*92)
    print(f"CATBOOST (GPU, PRECISION v8) - {dataset_type.upper()}  Train={train_years} | Dev={dev_years} | Test={test_years}")
    balancing = normalize_balancing_name(balancing)
    print(f"Balancing: {balancing}")
    print("="*92)
    ensure_dir(output_dir)

    # --------- Load ---------
    df_tr_raw = load_data(train_parquet, train_years)
    df_dev_raw = load_data(train_parquet, dev_years) if dev_years else pd.DataFrame()
    df_te_raw  = load_data(test_parquet, test_years) if (test_parquet and test_years) else pd.DataFrame()
    if df_tr_raw.empty:
        raise ValueError("Training data is empty!")

    # --------- Prepare ---------
    df_tr  = prepare_df(df_tr_raw)
    df_dev = prepare_df(df_dev_raw) if not df_dev_raw.empty else pd.DataFrame()
    df_te  = prepare_df(df_te_raw) if not df_te_raw.empty else pd.DataFrame()

    # --------- Labels ---------
    dtf_tr = compute_days_to_failure(df_tr)
    y_tr   = create_labels_from_dtf(dtf_tr, lookahead_days)
    print(f"TRAIN labels: pos={int(y_tr.sum()):,} ({100*y_tr.mean():.5f}%)")
    if y_tr.sum() < 50:
        raise ValueError(f"Insufficient positive samples in TRAIN: {y_tr.sum()}")

    dtf_dev = compute_days_to_failure(df_dev) if not df_dev.empty else np.array([], dtype=np.int64)
    y_dev   = create_labels_from_dtf(dtf_dev, lookahead_days) if not df_dev.empty else np.array([], dtype=np.int8)

    # --------- Features ---------
    X_tr, X_dev, X_te, ts_tr, ts_dev, ts_te, feature_names, cat_indices = create_features_joined_cat(
        df_tr, df_dev, df_te, dataset_type, add_rolling=True
    )
    cat_cols = [feature_names[i] for i in cat_indices]

    # --------- CV por disco (OOF diag) ---------
    serials_tr = df_tr['serial_number']
    oof_proba = np.zeros(len(y_tr), dtype=np.float32)
    fold_metrics = []

    for fold, (tr_idx, va_idx) in enumerate(make_group_folds(serials_tr, y_tr, n_splits=n_splits, random_state=random_state), start=1):
        X_tr_fold, y_tr_fold = X_tr.iloc[tr_idx].reset_index(drop=True), y_tr[tr_idx]
        dtf_tr_fold = dtf_tr[tr_idx]
        ts_tr_fold = ts_tr[tr_idx]

        X_va_fold, y_va_fold = X_tr.iloc[va_idx].reset_index(drop=True), y_tr[va_idx]
        ts_va_fold = ts_tr[va_idx]

        print(f"\nFold {fold}/{n_splits}: train={len(y_tr_fold):,} (pos={int(y_tr_fold.sum()):,}) | val={len(y_va_fold):,} (pos={int(y_va_fold.sum()):,})")

        # 1) Hard-negatives (control tamaño, enfoca frontera)
        Xb, yb, sel_idx = sample_negatives_hard(
            X_tr_fold, y_tr_fold, dtf_tr_fold,
            lookahead=lookahead_days,
            neg_pos_ratio=hard_neg_ratio,
            hard_window=hard_window,
            hard_fraction=hard_fraction,
            seed=random_state
        )
        ts_b = ts_tr_fold[sel_idx]
        print(f"  After hard-neg: {len(yb):,} (pos={int(yb.sum()):,}, neg={len(yb)-int(yb.sum()):,})")

        # 2) Balancing (sólo compatibles)
        ts_bal = ts_b
        balancing_mode_for_cb = 'none'
        class_weight_ratio = None

        if balancing == 'under':
            X_bal, y_bal, ts_bal = undersample_with_timestamps(
                Xb, yb, timestamps=ts_b,
                target_neg_pos=balancing_neg_pos_ratio,
                max_total=max_under_samples,
                seed=random_state
            )
        elif balancing == 'class_auto':
            X_bal, y_bal = Xb, yb
            balancing_mode_for_cb = 'class_auto'
        elif balancing == 'class_manual':
            X_bal, y_bal = Xb, yb
            # ratio = n_neg/n_pos sobre el subset corriente
            n_pos = max(1, int((y_bal == 1).sum()))
            n_neg = max(1, int((y_bal == 0).sum()))
            class_weight_ratio = float(n_neg / n_pos) if manual_pos_weight is None else float(manual_pos_weight)
            balancing_mode_for_cb = 'class_manual'
        else:  # 'none'
            X_bal, y_bal = Xb, yb

        print(f"  After balancing [{balancing}]: {len(y_bal):,} (pos={int(y_bal.sum()):,}, neg={len(y_bal)-int(y_bal.sum()):,})")

        # 3) CatBoost
        model = get_catboost(
            depth=cb_depth,
            iterations=cb_iterations,
            learning_rate=cb_learning_rate,
            l2_leaf_reg=cb_l2_leaf_reg,
            border_count=cb_border_count,
            rsm=cb_rsm,
            random_seed=random_state,
            bootstrap_type=cb_bootstrap_type,
            subsample=cb_subsample,
            balancing_mode=balancing_mode_for_cb,
            class_weight_ratio=class_weight_ratio
        )
        train_pool = Pool(X_bal, label=y_bal, cat_features=cat_indices, timestamp=ts_bal)
        valid_pool = Pool(X_va_fold, label=y_va_fold, cat_features=cat_indices, timestamp=ts_va_fold)
        model.fit(train_pool, eval_set=valid_pool, use_best_model=True, early_stopping_rounds=200)

        proba_va = model.predict_proba(valid_pool)[:, 1]
        thr_oof, pr_auc = pick_threshold_precision_first(
            y_va_fold, proba_va, min_precision=min_precision, min_recall=min_recall,
            top_k_rate=top_k_rate, min_alerts=min_alerts
        )
        oof_proba[va_idx] = proba_va
        m = metrics_at_threshold(y_va_fold, proba_va, thr_oof)
        m.update({'pr_auc': float(pr_auc), 'threshold': float(thr_oof), 'fold': int(fold),
                  'best_iteration': int(model.get_best_iteration())})
        fold_metrics.append(m)
        print(f"  Fold {fold} @thr={thr_oof:.4f} | P={m['precision']:.3f} R={m['recall']:.3f} F1={m['f1']:.3f} PR-AUC={pr_auc:.4f}")

        del X_tr_fold, y_tr_fold, X_va_fold, y_va_fold, Xb, yb, X_bal, y_bal, model, train_pool, valid_pool
        cleanup()

    # --------- OOF (diagnóstico) ---------
    thr_oof_global, pr_auc_oof = pick_threshold_precision_first(
        y_tr, oof_proba, min_precision=min_precision, min_recall=min_recall,
        top_k_rate=top_k_rate, min_alerts=min_alerts
    )
    agg = metrics_at_threshold(y_tr, oof_proba, thr_oof_global)
    agg.update({'pr_auc': float(pr_auc_oof), 'threshold': float(thr_oof_global)})
    print("\nOOF (diag):")
    print(json.dumps(agg, indent=2))

    # --------- Calibración en DEV ---------
    thr_prod = float(thr_oof_global)
    dev_metrics = None
    if not df_dev.empty:
        # Repite subset + balancing en TODO TRAIN para modelo final de calibración
        Xb_full, yb_full, sel_idx_full = sample_negatives_hard(
            X_tr, y_tr, dtf_tr, lookahead=lookahead_days,
            neg_pos_ratio=hard_neg_ratio, hard_window=hard_window,
            hard_fraction=hard_fraction, seed=random_state
        )
        ts_full = ts_tr[sel_idx_full]

        ts_bal_full = ts_full
        balancing_mode_for_cb = 'none'
        class_weight_ratio = None

        if balancing == 'under':
            X_bal_full, y_bal_full, ts_bal_full = undersample_with_timestamps(
                Xb_full, yb_full, timestamps=ts_full,
                target_neg_pos=balancing_neg_pos_ratio,
                max_total=max_under_samples,
                seed=random_state
            )
        elif balancing == 'class_auto':
            X_bal_full, y_bal_full = Xb_full, yb_full
            balancing_mode_for_cb = 'class_auto'
        elif balancing == 'class_manual':
            X_bal_full, y_bal_full = Xb_full, yb_full
            n_pos = max(1, int((y_bal_full == 1).sum()))
            n_neg = max(1, int((y_bal_full == 0).sum()))
            class_weight_ratio = float(n_neg / n_pos) if manual_pos_weight is None else float(manual_pos_weight)
            balancing_mode_for_cb = 'class_manual'
        else:
            X_bal_full, y_bal_full = Xb_full, yb_full

        final_model_dev = get_catboost(
            depth=cb_depth, iterations=cb_iterations, learning_rate=cb_learning_rate,
            l2_leaf_reg=cb_l2_leaf_reg, border_count=cb_border_count, rsm=cb_rsm,
            random_seed=random_state, bootstrap_type=cb_bootstrap_type, subsample=cb_subsample,
            balancing_mode=balancing_mode_for_cb, class_weight_ratio=class_weight_ratio
        )
        train_pool_full = Pool(X_bal_full, label=y_bal_full, cat_features=cat_indices, timestamp=ts_bal_full)
        final_model_dev.fit(train_pool_full, use_best_model=False)

        dev_pool = Pool(X_dev, label=y_dev, cat_features=cat_indices, timestamp=ts_dev)
        proba_dev = final_model_dev.predict_proba(dev_pool)[:, 1]

        thr_prod, pr_auc_dev = pick_threshold_precision_first(
            y_dev, proba_dev, min_precision=min_precision, min_recall=min_recall,
            top_k_rate=top_k_rate, min_alerts=min_alerts
        )
        dev_metrics = metrics_at_threshold(y_dev, proba_dev, thr_prod)
        dev_metrics.update({'pr_auc': float(pr_auc_dev), 'threshold': float(thr_prod)})
        print("\nDEV calibration (threshold for PROD):")
        print(json.dumps(dev_metrics, indent=2))

        del final_model_dev, train_pool_full, dev_pool
        cleanup()

    # --------- TEST ---------
    test_metrics = None
    if not df_te.empty:
        Xb_full, yb_full, sel_idx_full = sample_negatives_hard(
            X_tr, y_tr, dtf_tr, lookahead=lookahead_days,
            neg_pos_ratio=hard_neg_ratio, hard_window=hard_window,
            hard_fraction=hard_fraction, seed=random_state
        )
        ts_full = ts_tr[sel_idx_full]

        ts_bal_full = ts_full
        balancing_mode_for_cb = 'none'
        class_weight_ratio = None

        if balancing == 'under':
            X_bal_full, y_bal_full, ts_bal_full = undersample_with_timestamps(
                Xb_full, yb_full, timestamps=ts_full,
                target_neg_pos=balancing_neg_pos_ratio,
                max_total=max_under_samples,
                seed=random_state
            )
        elif balancing == 'class_auto':
            X_bal_full, y_bal_full = Xb_full, yb_full
            balancing_mode_for_cb = 'class_auto'
        elif balancing == 'class_manual':
            X_bal_full, y_bal_full = Xb_full, yb_full
            n_pos = max(1, int((y_bal_full == 1).sum()))
            n_neg = max(1, int((y_bal_full == 0).sum()))
            class_weight_ratio = float(n_neg / n_pos) if manual_pos_weight is None else float(manual_pos_weight)
            balancing_mode_for_cb = 'class_manual'
        else:
            X_bal_full, y_bal_full = Xb_full, yb_full

        final_model = get_catboost(
            depth=cb_depth, iterations=cb_iterations, learning_rate=cb_learning_rate,
            l2_leaf_reg=cb_l2_leaf_reg, border_count=cb_border_count, rsm=cb_rsm,
            random_seed=random_state, bootstrap_type=cb_bootstrap_type, subsample=cb_subsample,
            balancing_mode=balancing_mode_for_cb, class_weight_ratio=class_weight_ratio
        )
        train_pool_full = Pool(X_bal_full, label=y_bal_full, cat_features=cat_indices, timestamp=ts_bal_full)
        final_model.fit(train_pool_full, use_best_model=False)

        y_test = create_labels_from_dtf(compute_days_to_failure(df_te), lookahead_days)
        test_pool = Pool(X_te, label=y_test, cat_features=cat_indices, timestamp=ts_te)
        proba_test = final_model.predict_proba(test_pool)[:, 1]

        thr_used = float(thr_prod)
        test_metrics = metrics_at_threshold(y_test, proba_test, thr_used)
        test_metrics.update({'pr_auc': float(average_precision_score(y_test, proba_test)), 'threshold_used': thr_used})

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = os.path.join(output_dir, f"{dataset_type}_cb_prec_v8_{balancing}_{stamp}")
        final_model.save_model(f"{prefix}.cbm")
        with open(f"{prefix}_features.json", "w") as f:
            json.dump({
                'feature_names': feature_names,
                'cat_indices': cat_indices,
                'threshold': thr_used,
                'calibration': 'dev_years',
                'balancing': balancing
            }, f, indent=2)
        print(f"\n✓ Final model saved: {prefix}.cbm")
        del final_model, train_pool_full, test_pool
        cleanup()

    # --------- Metadata ---------
    meta = {
        'dataset_type': dataset_type,
        'train_years': train_years,
        'dev_years': dev_years,
        'test_years': test_years,
        'lookahead_days': lookahead_days,
        'n_splits': n_splits,
        'hard_negative_sampling': {
            'neg_pos_ratio': hard_neg_ratio,
            'hard_window': hard_window,
            'hard_fraction': hard_fraction
        },
        'balancing': balancing,
        'catboost_params': {
            'depth': cb_depth, 'iterations': cb_iterations, 'learning_rate': cb_learning_rate,
            'l2_leaf_reg': cb_l2_leaf_reg, 'border_count': cb_border_count, 'rsm': cb_rsm,
            'bootstrap_type': cb_bootstrap_type, 'subsample': cb_subsample,
            'task_type': 'GPU', 'logging_level': 'Silent'
        },
        'threshold_objective': {'min_precision': min_precision, 'min_recall': min_recall,
                                'top_k_rate': top_k_rate, 'min_alerts': min_alerts},
        'oof_metrics': {
            'precision': agg['precision'], 'recall': agg['recall'], 'f1': agg['f1'],
            'confusion_matrix': agg['confusion_matrix'], 'fpr': agg['fpr'],
            'pr_auc': agg['pr_auc'], 'threshold': agg['threshold']
        },
        'dev_metrics': dev_metrics,
        'test_metrics': test_metrics,
        'feature_names': feature_names,
        'cat_indices': cat_indices,
        'normalization': 'per-model robust z + log1p on *_raw'
    }
    meta_path = os.path.join(output_dir, f"{dataset_type}_cb_prec_v8_{balancing}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_metadata.json")
    with open(meta_path, 'w') as f:
        json.dump(meta, f, indent=2)
    print(f"\n✓ Metadata saved: {meta_path}")
    print("="*92)
    return meta


# ===================== WRAPPERS =====================

def train_ssd_cb_v8():
    """SSD: muy desbalanceado; empezar con UNDER o CLASS_AUTO, y calibrar en 2024."""
    return train_catboost_precision_pipeline(
        train_parquet='./Procesados/finales/SSD_FULL_CLEAN.parquet',
        test_parquet='./Procesados/finales/SSD_FULL_CLEAN.parquet',
        dataset_type='SSD',
        train_years=[2020, 2021, 2022, 2023],
        dev_years=[2024],
        test_years=[2025],
        lookahead_days=7,
        n_splits=5,

        hard_neg_ratio=3,
        hard_window=60,
        hard_fraction=0.7,

        balancing='under',                # 'under' o 'class_auto'
        balancing_neg_pos_ratio=3,
        max_under_samples=40_000,

        cb_depth=8,
        cb_iterations=1500,
        cb_learning_rate=0.06,
        cb_l2_leaf_reg=4.0,
        cb_border_count=64,
        cb_rsm=0.9,
        cb_bootstrap_type='Bernoulli',
        cb_subsample=0.8,

        min_precision=0.90,
        min_recall=0.03,
        top_k_rate=1e-4,
        min_alerts=20,
        output_dir='./models_cb_ssd_prec_v8',
        random_state=42
    )


def train_hdd_cb_v8():
    """HDD: dataset grande; suele ir mejor UNDER suave o CLASS_AUTO (no ambos)."""
    return train_catboost_precision_pipeline(
        train_parquet='./Procesados/finales/HDD_FULL_CLEAN.parquet',
        test_parquet='./Procesados/finales/HDD_FULL_CLEAN.parquet',
        dataset_type='HDD',
        train_years=[2020, 2021, 2022, 2023],
        dev_years=[2024],
        test_years=[2025],
        lookahead_days=7,
        n_splits=5,

        hard_neg_ratio=5,
        hard_window=90,
        hard_fraction=0.7,

        balancing='under',                # probar 'class_auto' si no haces under
        balancing_neg_pos_ratio=5,
        max_under_samples=60_000,

        cb_depth=8,
        cb_iterations=1800,
        cb_learning_rate=0.06,
        cb_l2_leaf_reg=6.0,
        cb_border_count=128,
        cb_rsm=0.9,
        cb_bootstrap_type='Bernoulli',
        cb_subsample=0.8,

        min_precision=0.90,
        min_recall=0.03,
        top_k_rate=7.5e-5,
        min_alerts=30,
        output_dir='./models_cb_hdd_prec_v8',
        random_state=42
    )


In [6]:
train_ssd_cb_v8()

CATBOOST (GPU, PRECISION v8) - SSD  Train=[2020, 2021, 2022, 2023] | Dev=[2024] | Test=[2025]
Balancing: under
Loading years=[2020, 2021, 2022, 2023] from ./Procesados/finales/SSD_FULL_CLEAN.parquet ...
Loaded 2,124,111 rows
Loading years=[2024] from ./Procesados/finales/SSD_FULL_CLEAN.parquet ...
Loaded 1,220,745 rows
Loading years=[2025] from ./Procesados/finales/SSD_FULL_CLEAN.parquet ...
Loaded 0 rows
TRAIN labels: pos=1,325 (0.06238%)
Creating features (temporal join) for SSD with TRAIN∪DEV∪TEST...
  Found 13 SMART attributes (raw)
  Final features: 97 (cat=2, num≈95)

Fold 1/5: train=1,709,653 (pos=1,079) | val=414,458 (pos=246)
  After hard-neg: 4,316 (pos=1,079, neg=3,237)
  After balancing [under]: 4,316 (pos=1,079, neg=3,237)


Default metric period is 5 because AUC is/are not implemented for GPU


  Fold 1 @thr=0.7843 | P=1.000 R=0.045 F1=0.086 PR-AUC=0.2639

Fold 2/5: train=1,707,701 (pos=1,054) | val=416,410 (pos=271)
  After hard-neg: 4,216 (pos=1,054, neg=3,162)
  After balancing [under]: 4,216 (pos=1,054, neg=3,162)


Default metric period is 5 because AUC is/are not implemented for GPU


  Fold 2 @thr=0.8877 | P=0.900 R=0.033 F1=0.064 PR-AUC=0.2755

Fold 3/5: train=1,682,081 (pos=1,061) | val=442,030 (pos=264)
  After hard-neg: 4,244 (pos=1,061, neg=3,183)
  After balancing [under]: 4,244 (pos=1,061, neg=3,183)


Default metric period is 5 because AUC is/are not implemented for GPU


  Fold 3 @thr=0.7458 | P=1.000 R=0.049 F1=0.094 PR-AUC=0.3433

Fold 4/5: train=1,691,757 (pos=1,061) | val=432,354 (pos=264)
  After hard-neg: 4,244 (pos=1,061, neg=3,183)
  After balancing [under]: 4,244 (pos=1,061, neg=3,183)


Default metric period is 5 because AUC is/are not implemented for GPU


  Fold 4 @thr=0.9003 | P=1.000 R=0.030 F1=0.059 PR-AUC=0.3314

Fold 5/5: train=1,705,252 (pos=1,045) | val=418,859 (pos=280)
  After hard-neg: 4,180 (pos=1,045, neg=3,135)
  After balancing [under]: 4,180 (pos=1,045, neg=3,135)


Default metric period is 5 because AUC is/are not implemented for GPU


  Fold 5 @thr=0.8969 | P=1.000 R=0.032 F1=0.062 PR-AUC=0.2810

OOF (diag):
{
  "precision": 0.975609756097561,
  "recall": 0.03018867924528302,
  "f1": 0.05856515373352855,
  "confusion_matrix": [
    [
      2122785,
      1
    ],
    [
      1285,
      40
    ]
  ],
  "fpr": 4.710790442371487e-07,
  "pr_auc": 0.2520734140833044,
  "threshold": 0.8857017755508423
}


Default metric period is 5 because AUC is/are not implemented for GPU



DEV calibration (threshold for PROD):
{
  "precision": 0.0,
  "recall": 0.0,
  "f1": 0.0,
  "confusion_matrix": [
    [
      1220396,
      122
    ],
    [
      227,
      0
    ]
  ],
  "fpr": 9.995755900363616e-05,
  "pr_auc": 0.0006149018328227931,
  "threshold": 0.9080645798309006
}

✓ Metadata saved: ./models_cb_ssd_prec_v8/SSD_cb_prec_v8_under_20251105_235058_metadata.json


{'dataset_type': 'SSD',
 'train_years': [2020, 2021, 2022, 2023],
 'dev_years': [2024],
 'test_years': [2025],
 'lookahead_days': 7,
 'n_splits': 5,
 'hard_negative_sampling': {'neg_pos_ratio': 3,
  'hard_window': 60,
  'hard_fraction': 0.7},
 'balancing': 'under',
 'catboost_params': {'depth': 8,
  'iterations': 1500,
  'learning_rate': 0.06,
  'l2_leaf_reg': 4.0,
  'border_count': 64,
  'rsm': 0.9,
  'bootstrap_type': 'Bernoulli',
  'subsample': 0.8,
  'task_type': 'GPU',
  'logging_level': 'Silent'},
 'threshold_objective': {'min_precision': 0.9,
  'min_recall': 0.03,
  'top_k_rate': 0.0001,
  'min_alerts': 20},
 'oof_metrics': {'precision': 0.975609756097561,
  'recall': 0.03018867924528302,
  'f1': 0.05856515373352855,
  'confusion_matrix': [[2122785, 1], [1285, 40]],
  'fpr': 4.710790442371487e-07,
  'pr_auc': 0.2520734140833044,
  'threshold': 0.8857017755508423},
 'dev_metrics': {'precision': 0.0,
  'recall': 0.0,
  'f1': 0.0,
  'confusion_matrix': [[1220396, 122], [227, 0]],
 