# Modelo FT-Transformer 


## Instalación de dependencias e imports

In [None]:
# Required dependencies (run once per Colab T4 session)
!pip install -q torch optuna seaborn kaggle category_encoders

In [None]:
import gc
import os
import json
import random
from pathlib import Path

import pandas as pd
import numpy as np

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

from category_encoders import TargetEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import VarianceThreshold

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import seaborn as sns

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner

# Only show Optuna messages at WARNING level or above
optuna.logging.set_verbosity(optuna.logging.WARNING)
# Seed the Python, NumPy and PyTorch (CPU and, when present, CUDA) RNGs with the same value
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

In [None]:
# === PREPROCESSING VARIABLES ===
# Name of the label column
TARGET_COL = "RENDIMIENTO_GLOBAL"

# Columns removed from the feature matrix when they are present in the data
DROP_CANDIDATES = [
    "ID",
    "F_TIENEINTERNET.1",
    "E_PRIVADO_LIBERTAD",
]

# Categorical columns routed to the target encoder in make_preprocessor
HIGH_CARDINALITY = [
    "E_PRGM_ACADEMICO",
    "E_PRGM_DEPARTAMENTO",
]

# Categorical columns routed to the one-hot encoder in make_preprocessor
LOW_CARDINALITY = [
    "F_TIENEINTERNET",
    "F_TIENECOMPUTADOR",
    "F_TIENEAUTOMOVIL",
    "F_TIENELAVADORA",
    "E_PAGOMATRICULAPROPIO",
]

# Ordered category lists per ordinal column. They are handed to OrdinalEncoder
# as `categories`, so a value's position in its list is its encoded number.
# "Desconocido" is first in every list; it is also the fill value the ordinal
# imputer writes for missing cells.
# NOTE(review): in the two parental-education lists, "No Aplica" and "No sabe"
# come after "Postgrado" and therefore get the largest codes — confirm this
# ordering is intended.
ORDINAL_MAP = {
    "F_ESTRATOVIVIENDA": [
        "Desconocido",
        "Sin Estrato",
        "Estrato 1",
        "Estrato 2",
        "Estrato 3",
        "Estrato 4",
        "Estrato 5",
        "Estrato 6",
    ],
    "E_HORASSEMANATRABAJA": [
        "Desconocido",
        "0",
        "Menos de 10 horas",
        "Entre 11 y 20 horas",
        "Entre 21 y 30 horas",
        "Más de 30 horas",
    ],
    "E_VALORMATRICULAUNIVERSIDAD": [
        "Desconocido",
        "No pagó matrícula",
        "Menos de 500 mil",
        "Entre 500 mil y menos de 1 millón",
        "Entre 1 millón y menos de 2.5 millones",
        "Entre 2.5 millones y menos de 4 millones",
        "Entre 4 millones y menos de 5.5 millones",
        "Entre 5.5 millones y menos de 7 millones",
        "Más de 7 millones",
    ],
    "F_EDUCACIONPADRE": [
        "Desconocido",
        "Ninguno",
        "Primaria incompleta",
        "Primaria completa",
        "Secundaria (Bachillerato) incompleta",
        "Secundaria (Bachillerato) completa",
        "Técnica o tecnológica incompleta",
        "Técnica o tecnológica completa",
        "Educación profesional incompleta",
        "Educación profesional completa",
        "Postgrado",
        "No Aplica",
        "No sabe",
    ],
    "F_EDUCACIONMADRE": [
        "Desconocido",
        "Ninguno",
        "Primaria incompleta",
        "Primaria completa",
        "Secundaria (Bachillerato) incompleta",
        "Secundaria (Bachillerato) completa",
        "Técnica o tecnológica incompleta",
        "Técnica o tecnológica completa",
        "Educación profesional incompleta",
        "Educación profesional completa",
        "Postgrado",
        "No Aplica",
        "No sabe",
    ],
}

# Columns that go through median imputation and standard scaling
NUMERIC_COLUMNS = [
    "PERIODO_ACADEMICO",
    "INDICADOR_1",
    "INDICADOR_2",
    "INDICADOR_3",
    "INDICADOR_4",
]

# Target classes; a class's position in CLASS_NAMES is its integer label
CLASS_NAMES = ["alto", "medio-alto", "medio-bajo", "bajo"]
CLASS2IDX = {cls: idx for idx, cls in enumerate(CLASS_NAMES)}
IDX2CLASS = {idx: cls for cls, idx in CLASS2IDX.items()}

# Hold-out fraction and seed used by the train/validation split
VAL_SIZE = 0.2
RANDOM_STATE = 42
# Output directory, created (with parents) if it does not exist yet
ARTIFACT_DIR = Path("./artifacts_ft_transformer")
ARTIFACT_DIR.mkdir(exist_ok=True, parents=True)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device hint: {device} | Artifacts: {ARTIFACT_DIR.resolve()}")

## Creación del Dataset

In [None]:
# Point the Kaggle CLI at the current directory for its configuration
# (presumably where kaggle.json was uploaded — verify before running)
os.environ["KAGGLE_CONFIG_DIR"] = "."
# Download the competition archive and unpack it quietly into the working directory
!kaggle competitions download -c udea-ai-4-eng-20252-pruebas-saber-pro-colombia
!unzip -q udea-ai-4-eng-20252-pruebas-saber-pro-colombia.zip

In [None]:
def load_dataset(path: Path) -> pd.DataFrame:
    """Read the CSV file at ``path`` into a DataFrame, decoding it as latin-1."""
    frame = pd.read_csv(path, encoding="latin-1")
    return frame

## Pipeline de Preprocesamiento

### Clases Helpers

In [None]:
class DataFrameImputer(BaseEstimator, TransformerMixin):
    """Fill missing values with a constant while keeping the DataFrame structure.

    Parameters
    ----------
    fill_value:
        Value written into every missing cell.

    Attributes
    ----------
    columns_:
        Column names recorded by ``fit``. For input without a ``columns``
        attribute, positional names ``x0, x1, ...`` are generated. The
        attribute only exists after ``fit`` has been called.
    """

    def __init__(self, fill_value: str = "Desconocido") -> None:
        # Only hyper-parameters are stored here; fitted state is created by fit
        self.fill_value = fill_value

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> "DataFrameImputer":
        """Record the column names of ``X``; ``y`` is ignored.

        Raises
        ------
        ValueError
            If ``X`` has no ``columns`` attribute and is not two-dimensional.
        """
        if hasattr(X, "columns"):
            self.columns_ = list(X.columns)
        else:
            # Array-like input: synthesize names so transform can rebuild a DataFrame
            as_array = np.asarray(X)
            if as_array.ndim != 2:
                raise ValueError(
                    f"Expected 2-dimensional input, got {as_array.ndim} dimension(s)."
                )
            self.columns_ = [f"x{i}" for i in range(as_array.shape[1])]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return ``X`` as a DataFrame with missing cells replaced by ``fill_value``.

        Raises
        ------
        RuntimeError
            If ``fit`` has not been called.
        """
        columns = getattr(self, "columns_", None)
        if columns is None:
            raise RuntimeError("Se debe llamar fit antes de transform.")
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X, columns=columns)
        return X.fillna(self.fill_value)

    def get_feature_names_out(self, input_features: Optional[List[str]] = None) -> np.ndarray:
        """Return output column names: ``input_features`` if given, else the fitted names.

        Raises
        ------
        RuntimeError
            If no ``input_features`` are given and ``fit`` has not been called.
        """
        if input_features is not None:
            return np.asarray(input_features, dtype=object)
        columns = getattr(self, "columns_", None)
        if columns is None:
            raise RuntimeError("Llamar fit antes de solicitar los nombres.")
        return np.asarray(columns, dtype=object)

In [None]:
@dataclass
class FeatureConfig:
    """Column groups that drive the preprocessing pipeline.

    Every list only contains columns that exist in the DataFrame the config
    was built from.
    """

    # Columns removed from the feature matrix before preprocessing
    drop: List[str]
    # Columns sent to the target encoder
    high_card: List[str]
    # Columns sent to the one-hot encoder
    low_card: List[str]
    # Columns sent to the ordinal encoder
    ordinal: List[str]
    # Columns imputed and scaled as numbers
    numeric: List[str]
    # Ordered category list for each entry of ``ordinal``, in the same order
    ordinal_categories: List[List[str]]

    @classmethod
    def from_dataframe(cls, df: pd.DataFrame) -> "FeatureConfig":
        """Build the config by intersecting the module-level column lists with ``df``.

        The target column and every column scheduled to be dropped are
        excluded from all feature groups.
        """
        available_columns = set(df.columns)
        drop = [c for c in DROP_CANDIDATES if c in available_columns]
        # Dropped columns are removed from X before the transformers run, so
        # selecting one as a feature would fail at fit time
        usable = available_columns - {TARGET_COL} - set(drop)
        high_card = [c for c in HIGH_CARDINALITY if c in usable]
        low_card = [c for c in LOW_CARDINALITY if c in usable]
        ordinal = [c for c in ORDINAL_MAP if c in usable]
        numeric = [c for c in NUMERIC_COLUMNS if c in usable]
        ordinal_categories = [ORDINAL_MAP[c] for c in ordinal]
        return cls(drop, high_card, low_card, ordinal, numeric, ordinal_categories)

### Construcción

In [None]:
def make_preprocessor(config: FeatureConfig, random_state: int = RANDOM_STATE) -> Pipeline:
    """Build the unfitted preprocessing pipeline for the column groups in ``config``.

    One branch is added per non-empty group, in this order: target encoding
    for high-cardinality columns, ordinal encoding, one-hot encoding for
    low-cardinality columns, and median imputation plus standard scaling for
    numeric columns. Columns outside every group are dropped, and a final
    variance filter (threshold 1e-5) removes low-variance outputs.

    ``random_state`` is accepted but not used by any step built here.
    """
    missing_label = "Desconocido"
    branches = []

    if config.high_card:
        target_encoder = TargetEncoder(
            cols=config.high_card,
            smoothing=0.5,
            handle_unknown="value",
            handle_missing="value",
        )
        high_card_steps = [
            ("imputer", DataFrameImputer(fill_value=missing_label)),
            ("encoder", target_encoder),
        ]
        branches.append(("high_card", Pipeline(steps=high_card_steps), config.high_card))

    if config.ordinal:
        # Categories outside the configured lists are encoded as -1
        ordinal_encoder = OrdinalEncoder(
            categories=config.ordinal_categories,
            dtype=float,
            handle_unknown="use_encoded_value",
            unknown_value=-1,
        )
        ordinal_steps = [
            ("imputer", SimpleImputer(strategy="constant", fill_value=missing_label)),
            ("encoder", ordinal_encoder),
        ]
        branches.append(("ordinal", Pipeline(steps=ordinal_steps), config.ordinal))

    if config.low_card:
        one_hot_steps = [
            ("imputer", SimpleImputer(strategy="constant", fill_value=missing_label)),
            ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]
        branches.append(("one_hot", Pipeline(steps=one_hot_steps), config.low_card))

    if config.numeric:
        numeric_steps = [
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
        branches.append(("numeric", Pipeline(steps=numeric_steps), config.numeric))

    combined = ColumnTransformer(
        transformers=branches,
        remainder="drop",
        verbose_feature_names_out=False,
    )
    return Pipeline(
        steps=[
            ("preprocess", combined),
            ("variance", VarianceThreshold(threshold=1e-5)),
        ]
    )

### Procesar Datasets

In [None]:
def process_dataset(
    df: pd.DataFrame,
    config: FeatureConfig,
    random_state: int = RANDOM_STATE,
) -> pd.DataFrame:
    """Fit the preprocessing pipeline on ``df`` and return the transformed table.

    The result keeps the index of ``df``, has one column per feature that
    survives the variance filter, and carries the untouched target values in
    the ``TARGET_COL`` column.

    NOTE(review): the pipeline (including the target encoder) is fitted on
    every row of ``df``; the train/validation split happens afterwards in this
    notebook, so validation rows influence the fitted encoders — confirm this
    is acceptable.
    """
    target = df[TARGET_COL]
    features = df.drop(columns=[TARGET_COL] + config.drop, errors="ignore")

    pipeline = make_preprocessor(config, random_state=random_state)
    pipeline.fit(features, target)

    # Names produced by the column transformer, narrowed to those the variance step keeps
    all_names = pipeline.named_steps["preprocess"].get_feature_names_out()
    keep_mask = pipeline.named_steps["variance"].get_support()
    kept_names = all_names[keep_mask]
    values = pipeline.transform(features)

    result = pd.DataFrame(values, columns=kept_names, index=df.index)
    result[TARGET_COL] = target.values
    return result

In [None]:
# Load the raw training CSV, derive the column groups it actually contains,
# and run the full preprocessing pipeline on it
train_raw = load_dataset(Path("train.csv"))
train_config = FeatureConfig.from_dataframe(train_raw)
train_df = process_dataset(train_raw, train_config)
print(f"train_df shape: {train_df.shape}")

## Modelo

### Utilidades

In [None]:
def collect_fold_metrics(
    name: str, fold_scores: List[dict]
) -> Tuple[pd.DataFrame, Dict[str, object]]:
    """Tabulate per-fold scores and summarize their accuracy.

    Parameters
    ----------
    name:
        Model label stored under ``"model"`` in the summary.
    fold_scores:
        One dict per fold; each must contain an ``"accuracy"`` entry.

    Returns
    -------
    A ``(df, summary)`` pair: ``df`` has one row per fold, and ``summary``
    holds the mean, population standard deviation (``ddof=0``), minimum and
    maximum of the ``"accuracy"`` column.
    """
    df = pd.DataFrame(fold_scores)
    accuracy = df["accuracy"]
    summary = {
        "model": name,
        "mean_acc": accuracy.mean(),
        "std_acc": accuracy.std(ddof=0),
        "min_acc": accuracy.min(),
        "max_acc": accuracy.max(),
    }
    return df, summary


def describe_class_balance(labels: np.ndarray):
    """Display how many samples each class has, ordered by class index.

    Integer labels are shown under their class names via ``IDX2CLASS``.
    """
    per_class = pd.Series(labels).value_counts().sort_index()
    named_counts = per_class.rename(index=IDX2CLASS)
    display(named_counts)

### Partición estratificada

In [None]:
# Report the size of the preprocessed table before converting it to arrays
print(f"Shape: {train_df.shape} | Memoria ~{train_df.memory_usage().sum() / 1e6:.1f} MB")

# Class names become integer indices (int64); every other column becomes a float32 feature
y = train_df[TARGET_COL].map(CLASS2IDX).to_numpy(dtype=np.int64)
X = train_df.drop(columns=[TARGET_COL]).to_numpy(dtype=np.float32)

describe_class_balance(y)
feature_dim = X.shape[1]
print(f"Dimensionalidad final: {feature_dim}")

# Shuffled hold-out split, stratified on the class labels
X_train, X_valid, y_train, y_valid = train_test_split(
    X,
    y,
    test_size=VAL_SIZE,
    random_state=RANDOM_STATE,
    stratify=y,
    shuffle=True,
 )
print(
    f"Split -> train: {X_train.shape[0]} muestras | valid: {X_valid.shape[0]} muestras",
 )

# The DataFrame is not used after this point; drop the reference and collect garbage
del train_df
_ = gc.collect()

### Configuración del Modelo

In [None]:
# Hyper-parameters for the FT-Transformer and its training loop:
#   d_token       - token width (transformer d_model)
#   n_blocks      - number of encoder layers
#   n_heads       - attention heads per layer
#   dropout       - rate used inside the encoder layers and on the input tokens
#   ffn_factor    - feed-forward width is int(d_token * ffn_factor * 2)
#   batch_size    - batch size of the train and validation loaders
#   epochs        - number of training epochs
#   lr            - AdamW learning rate
#   weight_decay  - AdamW weight decay
#   warmup_epochs - epochs of linear learning-rate warm-up (no scheduler when <= 0)
#   grad_clip     - max gradient norm (clipping is skipped when <= 0)
ft_config = {
    "d_token": 64,
    "n_blocks": 4,
    "n_heads": 8,
    "dropout": 0.2,
    "ffn_factor": 2.0,
    "batch_size": 512,
    "epochs": 20,
    "lr": 3e-4,
    "weight_decay": 1e-4,
    "warmup_epochs": 2,
    "grad_clip": 1.0,
}

ft_config

### Arquitectura FT-Transformer

In [None]:
class TabularDataset(Dataset):
    """Map-style dataset over a feature matrix and its label vector.

    Features are held as a float32 tensor and labels as an int64 tensor;
    indexing returns the ``(features, label)`` pair at that position.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        # copy=False avoids duplicating arrays that already have the right dtype
        feature_array = features.astype(np.float32, copy=False)
        label_array = labels.astype(np.int64, copy=False)
        self.features = torch.from_numpy(feature_array)
        self.labels = torch.from_numpy(label_array)

    def __len__(self) -> int:
        n_samples = len(self.features)
        return n_samples

    def __getitem__(self, idx: int):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target


class NumericalFeatureTokenizer(nn.Module):
    """Turn each scalar feature into a ``d_token``-sized token.

    Every feature owns one weight vector and one bias vector; its token is
    ``value * weight + bias``.
    """

    def __init__(self, n_features: int, d_token: int) -> None:
        super().__init__()
        shape = (n_features, d_token)
        # Small random weights (standard normal scaled by 0.02) and zero biases
        self.weight = nn.Parameter(0.02 * torch.randn(*shape))
        self.bias = nn.Parameter(torch.zeros(*shape))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # (batch, features) -> (batch, features, 1), broadcast against
        # (features, d_token) to give (batch, features, d_token)
        expanded = x[..., None]
        return expanded * self.weight + self.bias


class FTTransformer(nn.Module):
    """Transformer classifier over per-feature tokens with a learned [CLS] token.

    Parameters
    ----------
    n_features:
        Number of input features (one token each).
    n_classes:
        Number of output logits.
    config:
        Must provide ``d_token``, ``n_heads``, ``n_blocks``, ``ffn_factor``
        and ``dropout``.
    """

    def __init__(self, n_features: int, n_classes: int, config: Dict[str, float]):
        super().__init__()
        # Structural sizes must be integers even when the config stores them as floats
        d_token = int(config["d_token"])
        n_heads = int(config["n_heads"])
        n_blocks = int(config["n_blocks"])
        dropout = config["dropout"]

        self.tokenizer = NumericalFeatureTokenizer(n_features, d_token)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, d_token))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_token,
            nhead=n_heads,
            dim_feedforward=int(d_token * config["ffn_factor"] * 2),
            dropout=dropout,
            batch_first=True,
            activation="gelu",
            norm_first=True,
        )
        # With norm_first=True PyTorch never uses the nested-tensor path and
        # warns on construction if it was requested; opt out explicitly.
        self.encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=n_blocks,
            enable_nested_tensor=False,
        )
        self.dropout = nn.Dropout(dropout)
        self.head = nn.Sequential(nn.LayerNorm(d_token), nn.Linear(d_token, n_classes))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a ``(batch, n_features)`` input."""
        tokens = self.tokenizer(x)
        # Prepend the same learned [CLS] token to every sample
        cls = self.cls_token.expand(tokens.size(0), -1, -1)
        x = torch.cat([cls, tokens], dim=1)
        encoded = self.encoder(self.dropout(x))
        # Classify from the encoded [CLS] position only
        logits = self.head(encoded[:, 0])
        return logits

### Entrenamiento

In [None]:
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    grad_clip: float,
) -> Tuple[float, float]:
    """Run one optimization pass over ``loader``.

    Parameters
    ----------
    model:
        Network to train; it is switched to training mode.
    loader:
        Iterable of ``(features, labels)`` batches.
    criterion:
        Loss returning the batch mean.
    optimizer:
        Optimizer stepped once per batch.
    grad_clip:
        Maximum gradient norm; clipping is skipped when not positive.

    Returns
    -------
    ``(mean loss per sample, accuracy)`` over all batches seen.
    """
    model.train()
    # Follow the model's own device rather than a notebook-level global
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    total_correct = 0
    total = 0

    for xb, yb in loader:
        xb = xb.to(run_device)
        yb = yb.to(run_device)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        if grad_clip > 0:
            # Gentle clipping guards against exploding gradients in the first steps
            nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        optimizer.step()

        batch_size = xb.size(0)
        # The criterion returns a batch mean; re-weight it by the batch size
        total_loss += loss.item() * batch_size
        total_correct += (logits.argmax(dim=1) == yb).sum().item()
        total += batch_size

    return total_loss / total, total_correct / total


@torch.no_grad()
def evaluate(model: nn.Module, loader: DataLoader, criterion: nn.Module) -> Tuple[float, float]:
    """Score ``model`` on ``loader`` without computing gradients.

    The model is switched to evaluation mode. Returns
    ``(mean loss per sample, accuracy)`` over all batches seen.
    """
    model.eval()
    # Follow the model's own device rather than a notebook-level global
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    total_correct = 0
    total = 0

    for xb, yb in loader:
        xb = xb.to(run_device)
        yb = yb.to(run_device)
        logits = model(xb)
        loss = criterion(logits, yb)

        batch_size = xb.size(0)
        # The criterion returns a batch mean; re-weight it by the batch size
        total_loss += loss.item() * batch_size
        total_correct += (logits.argmax(dim=1) == yb).sum().item()
        total += batch_size

    return total_loss / total, total_correct / total


def build_scheduler(optimizer: torch.optim.Optimizer, warmup_epochs: int):
    """Create a linear warm-up scheduler, or return ``None`` when warm-up is disabled.

    The learning-rate multiplier at scheduler step ``k`` (starting at 0) is
    ``(k + 1) / warmup_epochs``, capped at 1.0.
    """
    if warmup_epochs > 0:

        def warmup_factor(step_index: int):
            # Ramp the LR up gradually to stabilize small transformers
            ramp = (step_index + 1) / warmup_epochs
            return ramp if ramp < 1.0 else 1.0

        return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=warmup_factor)

    return None


def train_ft_transformer(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_valid: np.ndarray,
    y_valid: np.ndarray,
    config: Dict[str, float],
) -> Tuple[nn.Module, List[dict]]:
    """Train an FT-Transformer and return it with its best validation weights loaded.

    Parameters
    ----------
    X_train, y_train:
        Training features ``(n_samples, n_features)`` and integer labels.
    X_valid, y_valid:
        Validation features and labels, used for model selection.
    config:
        Reads ``batch_size``, ``epochs``, ``lr``, ``weight_decay``,
        ``grad_clip`` and, optionally, ``warmup_epochs``; it is also passed
        to the model constructor.

    Returns
    -------
    ``(model, history)`` where ``history`` has one dict per epoch with the
    epoch number, train/validation loss and accuracy, and the learning rate
    that was in effect during that epoch.
    """
    # Plain loaders are enough because every feature is already numeric
    train_loader = DataLoader(
        TabularDataset(X_train, y_train),
        batch_size=config["batch_size"],
        shuffle=True,
        drop_last=False,
    )
    valid_loader = DataLoader(
        TabularDataset(X_valid, y_valid),
        batch_size=config["batch_size"],
        shuffle=False,
    )

    # Size the model from the data actually passed in, not from a notebook global
    n_features = X_train.shape[1]
    model = FTTransformer(n_features, len(CLASS_NAMES), config).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config["lr"],
        weight_decay=config["weight_decay"],
    )
    warmup_epochs = config.get("warmup_epochs", 0)
    scheduler = build_scheduler(optimizer, warmup_epochs)

    history = []
    best_state = None
    best_acc = 0.0

    for epoch in range(1, config["epochs"] + 1):
        # Read the LR before the scheduler advances so the log matches the
        # rate this epoch was actually trained with
        epoch_lr = optimizer.param_groups[0]["lr"]

        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, config["grad_clip"]
        )
        val_loss, val_acc = evaluate(model, valid_loader, criterion)
        if scheduler is not None and epoch <= warmup_epochs:
            scheduler.step()

        if val_acc > best_acc:
            # Keep the best checkpoint for inference. clone() is required:
            # on CPU, .cpu() returns the live tensor, and later optimizer
            # steps would silently overwrite the saved weights.
            best_acc = val_acc
            best_state = {
                k: v.detach().cpu().clone() for k, v in model.state_dict().items()
            }

        history.append(
            {
                "epoch": epoch,
                "train_loss": train_loss,
                "train_acc": train_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
                "lr": epoch_lr,
            }
        )
        # A quick print helps monitoring in Colab
        print(
            f"Epoch {epoch:02d} | train_loss={train_loss:.4f} acc={train_acc:.4f} | "
            f"val_loss={val_loss:.4f} acc={val_acc:.4f}",
        )

    if best_state is not None:
        model.load_state_dict(best_state)

    return model, history

### Ejecución del entrenamiento

In [None]:
# Train on the hold-out split with the hyper-parameters in ft_config
ft_model, training_history = train_ft_transformer(
    X_train,
    y_train,
    X_valid,
    y_valid,
    ft_config,
 )
# One row per epoch; the bare expression below renders the table in the notebook
history_df = pd.DataFrame(training_history)
history_df

### Evaluación en validación

In [None]:
ft_model.eval()

# Predict in batches: pushing the whole validation set through the transformer
# in a single forward pass can exhaust device memory
eval_batch_size = int(ft_config["batch_size"])
valid_tensor = torch.from_numpy(X_valid)
logit_chunks = []
with torch.no_grad():
    for start in range(0, valid_tensor.size(0), eval_batch_size):
        batch = valid_tensor[start:start + eval_batch_size].to(device)
        logit_chunks.append(ft_model(batch).cpu())
logits = torch.cat(logit_chunks).numpy()

y_pred = logits.argmax(axis=1)
val_acc = accuracy_score(y_valid, y_pred)
print(f"Accuracy validación: {val_acc:.4f}")

# Pin the label set so rows/columns always line up with CLASS_NAMES, even if a
# class is missing from both the validation labels and the predictions
class_labels = list(range(len(CLASS_NAMES)))

conf_mat = confusion_matrix(y_valid, y_pred, labels=class_labels)
plt.figure(figsize=(6, 5))
sns.heatmap(
    conf_mat,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES,
)
plt.xlabel("Predicción")
plt.ylabel("Real")
plt.title("Matriz de confusión FT-Transformer")
plt.tight_layout()
plt.show()

print(
    classification_report(
        y_valid,
        y_pred,
        labels=class_labels,
        target_names=CLASS_NAMES,
    )
)