
# Titanic — Denso para HGB, TE-OOF + GroupKFold

Correções principais:
- `OneHotEncoder(..., sparse_output=False)` para gerar **matriz densa** no caminho OHE.
- No caminho **TE-OOF**, usamos `np.hstack` (denso) em vez de `sparse.hstack`.
- Garantimos **matriz densa** também no treino final antes de `fit` e na predição.


In [9]:

import os
from dataclasses import dataclass
from typing import List, Optional, Dict

import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import StratifiedKFold, GroupKFold
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import f1_score, roc_auc_score

from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

SEED = 42
np.random.seed(SEED)

DATA_DIR = ""
train_path = os.path.join(DATA_DIR, "train.csv")
test_path  = os.path.join(DATA_DIR, "test.csv")

TARGET = "Survived"


## Funções utilitárias

In [10]:

def optimal_threshold_by_f1(y_true, y_prob):
    thresholds = np.linspace(0.05, 0.95, 181)
    best_thr, best_f1 = 0.5, -1.0
    for t in thresholds:
        f1 = f1_score(y_true, (y_prob >= t).astype(int))
        if f1 > best_f1:
            best_f1, best_thr = f1, t
    return best_thr, best_f1

def youden_j_threshold(y_true, y_prob):
    from sklearn.metrics import roc_curve
    fpr, tpr, thr = roc_curve(y_true, y_prob)
    j = tpr - fpr
    ix = j.argmax()
    return thr[ix], j[ix]


## Feature Engineering

In [11]:

class FeatureEngineering(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        df = X.copy()
        # Title
        df["Title"] = df["Name"].str.extract(r",\s*([^.]+)\.")
        title_map = {
            "Mlle": "Miss", "Ms": "Miss", "Mme": "Mrs",
            "Lady": "Royal", "Countess": "Royal", "Sir": "Royal", "Don": "Royal", "Dona": "Royal", "Jonkheer": "Royal",
            "Capt": "Officer", "Col": "Officer", "Major": "Officer", "Dr": "Officer", "Rev": "Officer",
        }
        df["Title"] = df["Title"].replace(title_map).fillna("Unknown")

        # Família
        df["FamilySize"] = df["SibSp"] + df["Parch"] + 1
        df["IsAlone"] = (df["FamilySize"] == 1).astype(int)

        # Frequência do ticket
        ticket_freq = df["Ticket"].value_counts()
        df["TicketFreq"] = df["Ticket"].map(ticket_freq).astype(int)

        # Deck da cabine
        df["CabinDeck"] = df["Cabin"].astype(str).str[0]
        df.loc[df["Cabin"].isna(), "CabinDeck"] = "U"

        # Interações
        df["Age*Class"] = df["Age"] * df["Pclass"]
        df["FarePerPerson"] = df["Fare"] / df["FamilySize"]

        # FamilyID p/ GroupKFold
        df["Surname"] = df["Name"].str.extract(r"^(.*?),")
        df["FamilyID"] = (df["Surname"].fillna("") + "_" + df["Ticket"].astype(str))
        return df


## Conjuntos de Features

In [12]:

BASE_NUM = ["Age", "SibSp", "Parch", "Fare"]
BASE_CAT = ["Pclass", "Sex", "Embarked"]

FE_NUM_EXTRA = ["FamilySize", "IsAlone", "TicketFreq", "Age*Class", "FarePerPerson"]
FE_CAT_EXTRA = ["Title", "CabinDeck"]

ALL_NUM = BASE_NUM + FE_NUM_EXTRA
ALL_CAT = BASE_CAT + FE_CAT_EXTRA


## Target Encoding OOF (sem vazamento)

In [13]:

class OOFTargetEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, cols: List[str], n_splits: int = 5, smoothing: float = 10.0, use_groups: bool = False, random_state: int = 42):
        self.cols = cols
        self.n_splits = n_splits
        self.smoothing = smoothing
        self.use_groups = use_groups
        self.random_state = random_state
        self.global_mean_ = None
        self.mapping_: Dict[str, Dict] = {}

    def fit(self, X, y=None, **fit_params):
        X = pd.DataFrame(X).copy()
        y = np.asarray(y)
        groups = fit_params.get("groups", None)

        self.global_mean_ = float(np.mean(y))
        self.mapping_.clear()

        # Mapeamento final para uso em transform (usa todo o treino)
        for col in self.cols:
            stats_full = X.groupby(col).size().to_frame("count")
            means_full = X.join(pd.Series(y, index=X.index, name="y")).groupby(col)["y"].mean().to_frame("mean")
            enc_full = stats_full.join(means_full, how="left")
            enc_full["te"] = (enc_full["count"] * enc_full["mean"] + self.smoothing * self.global_mean_) / (enc_full["count"] + self.smoothing)
            self.mapping_[col] = enc_full["te"].to_dict()
        return self

    def transform(self, X):
        df = pd.DataFrame(X).copy()
        for col in self.cols:
            df[f"TE_{col}"] = df[col].map(self.mapping_.get(col, {})).fillna(self.global_mean_)
        return df[[c for c in df.columns if c.startswith("TE_")]]


## Preparação (Imputação + OHE denso)

In [14]:

from sklearn.pipeline import Pipeline as SkPipeline

prep_ohe_dense = SkPipeline(steps=[
    ("fe", FeatureEngineering()),
    ("ct", ColumnTransformer(
        transformers=[
            ("num", SimpleImputer(strategy="median"), ALL_NUM),
            ("cat", SkPipeline(steps=[
                ("imp", SimpleImputer(strategy="most_frequent")),
                ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),  # denso!
            ]), ALL_CAT),
        ],
        remainder="drop",
    )),
])


## Cross-Validation com OOF e Thresholding 

In [15]:

@dataclass
class CVResult:
    oof_prob: np.ndarray
    oof_pred: np.ndarray
    best_threshold: float
    f1_at_best_thr: float
    auc: float
    folds_scores: List[float]

def cross_validate_with_threshold(model, X, y, cv, groups=None, encoder: str = "ohe") -> CVResult:
    X = X.copy()
    y = np.asarray(y)

    oof_prob = np.zeros(len(X), dtype=float)
    folds_scores = []

    for fold, (tr_idx, val_idx) in enumerate(cv.split(X, y, groups)):
        X_tr, X_val = X.iloc[tr_idx], X.iloc[val_idx]
        y_tr, y_val = y[tr_idx], y[val_idx]

        fe = FeatureEngineering()
        X_tr_fe = fe.fit_transform(X_tr)
        X_val_fe = fe.transform(X_val)

        if encoder == "ohe":
            prep = clone(prep_ohe_dense)
            X_tr_enc = prep.fit_transform(X_tr)   # já denso
            X_val_enc = prep.transform(X_val)
        elif encoder == "te":
            # Imputação antes do TE (mantendo denso)
            for col in ALL_NUM:
                med = X_tr_fe[col].median()
                X_tr_fe[col] = X_tr_fe[col].fillna(med)
                X_val_fe[col] = X_val_fe[col].fillna(med)
            for col in ALL_CAT:
                X_tr_fe[col] = X_tr_fe[col].fillna("UNK")
                X_val_fe[col] = X_val_fe[col].fillna("UNK")

            te = OOFTargetEncoder(cols=ALL_CAT, n_splits=5, smoothing=10.0, use_groups=isinstance(cv, GroupKFold))
            te.fit(X_tr_fe[ALL_CAT], y_tr, groups=(groups[tr_idx] if groups is not None else None))

            X_tr_enc = np.hstack([
                X_tr_fe[ALL_NUM].values,
                te.transform(X_tr_fe[ALL_CAT]).values
            ])
            X_val_enc = np.hstack([
                X_val_fe[ALL_NUM].values,
                te.transform(X_val_fe[ALL_CAT]).values
            ])
        else:
            raise ValueError("encoder must be 'ohe' or 'te'")

        clf = clone(model)
        clf.fit(X_tr_enc, y_tr)
        if hasattr(clf, "predict_proba"):
            prob = clf.predict_proba(X_val_enc)[:, 1]
        else:
            df = clf.decision_function(X_val_enc)
            df = (df - df.min()) / (df.max() - df.min() + 1e-9)
            prob = df

        oof_prob[val_idx] = prob
        f1 = f1_score(y_val, (prob >= 0.5).astype(int))
        folds_scores.append(f1)
        print(f"Fold {fold}: F1@0.5 = {f1:.4f}")

    best_thr, best_f1 = optimal_threshold_by_f1(y, oof_prob)
    auc = roc_auc_score(y, oof_prob)
    oof_pred = (oof_prob >= best_thr).astype(int)

    print(f"AUC (OOF): {auc:.4f}")
    print(f"Best threshold by F1: {best_thr:.3f} | F1@best = {best_f1:.4f}")

    return CVResult(oof_prob=oof_prob, oof_pred=oof_pred, best_threshold=best_thr, f1_at_best_thr=best_f1, auc=auc, folds_scores=folds_scores)


## Exemplo 1 — StratifiedKFold + OHE

In [16]:

train = pd.read_csv(train_path)
test  = pd.read_csv(test_path)

X_full = train.drop(columns=[TARGET])
y_full = train[TARGET].values

cv_strat = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
model = HistGradientBoostingClassifier(random_state=SEED)

print("=== StratifiedKFold + OHE (dense) ===")
res_ohe = cross_validate_with_threshold(model, X_full, y_full, cv=cv_strat, groups=None, encoder="ohe")


=== StratifiedKFold + OHE (dense) ===
Fold 0: F1@0.5 = 0.7879
Fold 1: F1@0.5 = 0.7910
Fold 2: F1@0.5 = 0.7101
Fold 3: F1@0.5 = 0.7556
Fold 4: F1@0.5 = 0.7883
AUC (OOF): 0.8734
Best threshold by F1: 0.470 | F1@best = 0.7681


## Exemplo 2 — GroupKFold (FamilyID) + TE-OOF 

In [17]:

fe_tmp = FeatureEngineering()
train_fe_tmp = fe_tmp.fit_transform(train)
family_groups = train_fe_tmp["FamilyID"].astype("category").cat.codes.values

cv_group = GroupKFold(n_splits=5)
print("\n=== GroupKFold (FamilyID) + TE-OOF (dense) ===")
res_te = cross_validate_with_threshold(model, X_full, y_full, cv=cv_group, groups=family_groups, encoder="te")



=== GroupKFold (FamilyID) + TE-OOF (dense) ===
Fold 0: F1@0.5 = 0.7385
Fold 1: F1@0.5 = 0.8088
Fold 2: F1@0.5 = 0.7424
Fold 3: F1@0.5 = 0.6435
Fold 4: F1@0.5 = 0.7770
AUC (OOF): 0.8525
Best threshold by F1: 0.460 | F1@best = 0.7485



### Notas
- **TE-OOF** tende a ajudar quando há muitas categorias e dados moderados.
- **GroupKFold** evita vazamento entre membros da mesma família.
- **Thresholding** otimiza F1 mas não substitui **calibração**; para produção, considere `CalibratedClassifierCV`.
- Você pode trocar o modelo por `RandomForestClassifier`, `GradientBoostingClassifier` ou `LogisticRegression`.
