In [2]:
import os, json, random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import DataLoader, SubsetRandomSampler, Dataset

import optuna
from metrics import metrics_from_preds
from preprocess import load_data

In [3]:
def set_seed(seed: int = 42):
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.use_deterministic_algorithms(True, warn_only=True)
    torch.set_num_threads(1)

In [4]:
set_seed(42)

device = torch.device("cpu")  # para máxima reproducibilidad
print("Usando dispositivo:", device)

Usando dispositivo: cpu


In [5]:
vids_training, labels_training, vids_val, labels_val, vids_test, labels_test = load_data()

In [6]:
class VideoDataset(Dataset):
    def __init__(self, videos, labels):
        self.videos = videos
        self.labels = labels
    def __len__(self):
        return len(self.videos)
    def __getitem__(self, idx):
        x = torch.tensor(np.array(self.videos[idx]), dtype=torch.float32)  # (T, F)
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

In [7]:
def collate_pad(batch):
    xs, ys = zip(*batch)
    x = torch.stack(xs, dim=0)  # (B, T, F)
    y = torch.stack(ys, dim=0)  # (B,)
    return x, y

In [8]:
set_seed(42)

In [9]:
train_dataset = VideoDataset(vids_training, labels_training)
val_dataset   = VideoDataset(vids_val, labels_val)

g = torch.Generator().manual_seed(42)
perm = torch.randperm(len(train_dataset), generator=g).tolist()
train_sampler = SubsetRandomSampler(perm)

train_loader_seq = DataLoader(
    train_dataset, batch_size=32, shuffle=False, sampler=train_sampler,
    num_workers=0, collate_fn=collate_pad
)
val_loader_seq = DataLoader(
    val_dataset, batch_size=32, shuffle=False,
    num_workers=0, collate_fn=collate_pad
)

In [10]:
class LSTMClassifier(nn.Module):
    """Clasificador secuencial basado en LSTM.
       Entrada:  (B, T, F)
       LSTM:     salida (B, T, hidden)
       Pooling:  último paso temporal
       Salida:   (B, num_classes)
    """
    def __init__(self, input_size=258, hidden_size=128, num_layers=2, dropout=0.1, num_classes=4, bidirectional=False):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
            bidirectional=bidirectional
        )
        out_dim = hidden_size * (2 if bidirectional else 1)
        self.fc = nn.Linear(out_dim, num_classes)

    def forward(self, x):  # x: (B, T, F)
        out, _ = self.lstm(x)
        out = out[:, -1, :]  # último paso temporal
        return self.fc(out)

In [11]:
def optimize_lstm(train_loader_seq, val_loader_seq, n_trials=10, max_epochs=50, save_prefix="hpo_lstm"):
    set_seed(42)

    sampler = optuna.samplers.TPESampler(seed=42)
    study = optuna.create_study(direction="maximize", sampler=sampler,
                                pruner=optuna.pruners.MedianPruner())
    all_trials = []

    sample_x, _ = next(iter(train_loader_seq))
    input_size = sample_x.shape[-1]

    def objective(trial):
        hidden  = trial.suggest_categorical("hidden_size", [64, 96, 128, 160, 192])
        layers  = trial.suggest_int("num_layers", 1, 4)
        dropout = trial.suggest_float("dropout", 0.0, 0.5)
        bi      = trial.suggest_categorical("bidirectional", [False, True])
        lr      = trial.suggest_float("lr", 5e-5, 5e-3, log=True)
        wd      = trial.suggest_float("weight_decay", 1e-10, 1e-3, log=True)

        model = LSTMClassifier(
            input_size=input_size, hidden_size=hidden, num_layers=layers,
            dropout=dropout, num_classes=4, bidirectional=bi
        ).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)

        best_val = 0.0
        for epoch in range(max_epochs):
            model.train()
            for xb, yb in train_loader_seq:
                xb = xb.to(device); yb = yb.to(device)
                optimizer.zero_grad()
                out = model(xb)
                loss = criterion(out, yb)
                loss.backward()
                optimizer.step()

            model.eval()
            y_true, y_pred = [], []
            with torch.no_grad():
                for xb, yb in val_loader_seq:
                    xb = xb.to(device)
                    preds = model(xb).argmax(dim=1)
                    y_true.extend(yb.numpy())
                    y_pred.extend(preds.cpu().numpy())
            val_acc = metrics_from_preds(y_true, y_pred)["accuracy"]

            if val_acc > best_val:
                best_val = val_acc

            trial.report(val_acc, epoch)
            if trial.should_prune():
                raise optuna.TrialPruned()

        all_trials.append({"trial": trial.number, "params": trial.params, "best_val_acc": best_val})
        return best_val

    study.optimize(objective, n_trials=n_trials)
    with open(f"{save_prefix}_all.json", "w") as f:
        json.dump(all_trials, f, indent=2)
    with open(f"{save_prefix}_best.json", "w") as f:
        json.dump({"best_params": study.best_params, "best_value": study.best_value}, f, indent=2)
    return study

In [12]:
if __name__ == "__main__":
    study_lstm = optimize_lstm(train_loader_seq, val_loader_seq, n_trials=40, max_epochs=50)
    print("Mejor resultado:", study_lstm.best_value)
    print("Mejores hiperparámetros:", study_lstm.best_params)

    # =============================================================================
    # [9] Reentrenar el mejor modelo de forma 100% reproducible
    # =============================================================================
    best = study_lstm.best_params
    set_seed(42)

    sample_x, _ = next(iter(train_loader_seq))
    input_size = sample_x.shape[-1]

    model = LSTMClassifier(
        input_size=input_size,
        hidden_size=best["hidden_size"],
        num_layers=best["num_layers"],
        dropout=best["dropout"],
        num_classes=4,
        bidirectional=best["bidirectional"]
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=best["lr"], weight_decay=best["weight_decay"])

    for epoch in range(50):
        model.train()
        for xb, yb in train_loader_seq:
            xb = xb.to(device); yb = yb.to(device)
            optimizer.zero_grad()
            out = model(xb)
            loss = criterion(out, yb)
            loss.backward()
            optimizer.step()

        model.eval()
        y_true, y_pred = [], []
        with torch.no_grad():
            for xb, yb in val_loader_seq:
                xb = xb.to(device)
                preds = model(xb).argmax(dim=1)
                y_true.extend(yb.numpy())
                y_pred.extend(preds.cpu().numpy())
        val_acc = metrics_from_preds(y_true, y_pred)["accuracy"]
        print(f"Epoch {epoch+1:03d} | Val Acc: {val_acc:.4f}")

[I 2025-11-30 21:59:09,475] A new study created in memory with name: no-name-b4ef4139-9d1e-42ba-ada2-ec0ece6d0f10
[I 2025-11-30 21:59:41,629] Trial 0 finished with value: 0.8611111111111112 and parameters: {'hidden_size': 96, 'num_layers': 1, 'dropout': 0.02904180608409973, 'bidirectional': False, 'lr': 0.0013035123791853842, 'weight_decay': 1.3934502251337584e-10}. Best is trial 0 with value: 0.8611111111111112.
[I 2025-11-30 22:00:19,922] Trial 1 finished with value: 0.85 and parameters: {'hidden_size': 64, 'num_layers': 2, 'dropout': 0.2623782158161189, 'bidirectional': False, 'lr': 0.0008369042894376068, 'weight_decay': 9.472334467618544e-10}. Best is trial 0 with value: 0.8611111111111112.
[I 2025-11-30 22:04:58,746] Trial 2 finished with value: 0.8611111111111112 and parameters: {'hidden_size': 160, 'num_layers': 3, 'dropout': 0.29620728443102123, 'bidirectional': True, 'lr': 0.0001096524277832185, 'weight_decay': 2.853390105240223e-10}. Best is trial 0 with value: 0.861111111111

Mejor resultado: 0.8777777777777778
Mejores hiperparámetros: {'hidden_size': 160, 'num_layers': 1, 'dropout': 0.0027610585618011996, 'bidirectional': False, 'lr': 0.001435437674097734, 'weight_decay': 2.5054885755573557e-05}
Epoch 001 | Val Acc: 0.6833
Epoch 002 | Val Acc: 0.6889
Epoch 003 | Val Acc: 0.7111
Epoch 004 | Val Acc: 0.7444
Epoch 005 | Val Acc: 0.8222
Epoch 006 | Val Acc: 0.7778
Epoch 007 | Val Acc: 0.7778
Epoch 008 | Val Acc: 0.7944
Epoch 009 | Val Acc: 0.6889
Epoch 010 | Val Acc: 0.8056
Epoch 011 | Val Acc: 0.8222
Epoch 012 | Val Acc: 0.7833
Epoch 013 | Val Acc: 0.8111
Epoch 014 | Val Acc: 0.7111
Epoch 015 | Val Acc: 0.8167
Epoch 016 | Val Acc: 0.7889
Epoch 017 | Val Acc: 0.7167
Epoch 018 | Val Acc: 0.8167
Epoch 019 | Val Acc: 0.7889
Epoch 020 | Val Acc: 0.7944
Epoch 021 | Val Acc: 0.8167
Epoch 022 | Val Acc: 0.7778
Epoch 023 | Val Acc: 0.7944
Epoch 024 | Val Acc: 0.7778
Epoch 025 | Val Acc: 0.7944
Epoch 026 | Val Acc: 0.8222
Epoch 027 | Val Acc: 0.7833
Epoch 028 | Val Acc

In [13]:
import json
from metrics import metrics_from_preds
import torch
import torch.nn as nn
import torch.optim as optim

# --- Cargar los mejores hiperparámetros ---
print("Cargando los mejores hiperparámetros desde hpo_lstm_best.json...")
try:
    with open("hpo_lstm_best.json", "r") as f:
        best_hyperparams = json.load(f)["best_params"]
    print("Hiperparámetros cargados:")
    print(best_hyperparams)
except FileNotFoundError:
    print("Error: El archivo 'hpo_lstm_best.json' no fue encontrado.")
    print("Por favor, ejecuta primero la celda de optimización para generar este archivo.")
    raise

# --- Configurar y entrenar el mejor modelo ---
set_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Es necesario determinar el número de features de entrada desde los datos
try:
    sample_x, _ = next(iter(train_loader_seq))
    input_size = sample_x.shape[-1]
except NameError:
    print("Error: 'train_loader_seq' no está definido. Asegúrate de haber ejecutado las celdas de preparación de datos.")
    raise

# Instanciar el modelo con los mejores hiperparámetros
best_model = LSTMClassifier(
    input_size=input_size,
    hidden_size=best_hyperparams["hidden_size"],
    num_layers=best_hyperparams["num_layers"],
    dropout=best_hyperparams["dropout"],
    bidirectional=best_hyperparams["bidirectional"],
    num_classes=4
).to(device)

# Definir criterio de pérdida y optimizador
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    best_model.parameters(),
    lr=best_hyperparams["lr"],
    weight_decay=best_hyperparams["weight_decay"]
)

# Entrenamiento del modelo
epochs = 50  # Usamos las mismas épocas que en la búsqueda de hiperparámetros
print(f"\nEntrenando el mejor modelo LSTM durante {epochs} épocas...")

for epoch in range(epochs):
    best_model.train()
    total_loss = 0
    for xb, yb in train_loader_seq:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        out = best_model(xb)
        loss = criterion(out, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(train_loader_seq):.4f}")

print("Entrenamiento completado.")

# --- Evaluación del modelo y cálculo de métricas ---
best_model.eval()
all_metrics = {}

def evaluate_seq_model(loader, model_name):
    y_true, y_pred = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            predictions = best_model(xb).argmax(dim=1)
            y_true.extend(yb.numpy())
            y_pred.extend(predictions.cpu().numpy())
    
    metrics = metrics_from_preds(y_true, y_pred)
    all_metrics[model_name] = metrics

print("\n--- Métricas del Modelo Final (LSTM) ---")
evaluate_seq_model(train_loader_seq, "Train")
evaluate_seq_model(val_loader_seq, "Validation")

for split_name, metrics in all_metrics.items():
    print(f"\nResultados en el conjunto de {split_name}:")
    for metric_name, value in metrics.items():
        if isinstance(value, float):
            print(f"  {metric_name}: {value:.4f}")
        else:
            print(f"  {metric_name}: {value}")

# --- Guardar el modelo entrenado ---
model_save_path = "lstm_best_model.pth"
torch.save(best_model.state_dict(), model_save_path)
print(f"\nModelo guardado exitosamente en: {model_save_path}")


Cargando los mejores hiperparámetros desde hpo_lstm_best.json...
Hiperparámetros cargados:
{'hidden_size': 160, 'num_layers': 1, 'dropout': 0.0027610585618011996, 'bidirectional': False, 'lr': 0.001435437674097734, 'weight_decay': 2.5054885755573557e-05}

Entrenando el mejor modelo LSTM durante 50 épocas...
Epoch [10/50], Loss: 0.4386
Epoch [20/50], Loss: 0.2168
Epoch [30/50], Loss: 0.0894
Epoch [40/50], Loss: 0.0307
Epoch [50/50], Loss: 0.0146
Entrenamiento completado.

--- Métricas del Modelo Final (LSTM) ---

Resultados en el conjunto de Train:
  accuracy: 0.9980
  precision_macro: 0.9980
  recall_macro: 0.9980
  specificity_macro: 0.9993
  f1_macro: 0.9980
  balanced_accuracy: 0.9980
  confusion_matrix: [[254, 0, 0, 1], [0, 254, 1, 0], [0, 0, 255, 0], [0, 0, 0, 255]]

Resultados en el conjunto de Validation:
  accuracy: 0.7833
  precision_macro: 0.8199
  recall_macro: 0.7833
  specificity_macro: 0.9278
  f1_macro: 0.7768
  balanced_accuracy: 0.7833
  confusion_matrix: [[24, 3, 12, 