In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
from sklearn.metrics import classification_report

from pathlib import Path
import os
import sys

from class_imbalance import ImbalanceHandler

# Configurar semilla para reproducibilidad
torch.manual_seed(42)
np.random.seed(42)

# ==========================================
# 1. CARGA Y PREPROCESAMIENTO
# ==========================================

# Cargar datos
notebook_path = os.getcwd()
file_path = Path(notebook_path) / "../dataset/robot_dataset.csv"
df = pd.read_csv(file_path)
df = df.dropna()

In [3]:
# Definir Target: 1 si hay grip_lost O ProtectiveStop
df['target'] = ((df['grip_lost'] == True) | (df['Robot_ProtectiveStop'] == True)).astype(int)

# Seleccionar Features
feature_cols = [c for c in df.columns if c not in ['Num', 'Timestamp', 'cycle', 'grip_lost', 'Robot_ProtectiveStop', 'target']]
X = df[feature_cols].values
y = df['target'].values

# Split Train/Test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Escalar Datos (Muy importante para redes neuronales)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Convertir a Tensores de PyTorch
X_train_t = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1) # Shape (N, 1)
X_test_t = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Modelo Base Simple (Perceptrón Multicapa)
class SimpleNN(nn.Module):
    def __init__(self, input_dim):
        super(SimpleNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 64)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.layer2 = nn.Linear(64, 32)
        self.layer3 = nn.Linear(32, 1) # Salida lineal (Logits)
        
    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.dropout(x)
        x = self.relu(self.layer2(x))
        x = self.layer3(x) # No aplicamos Sigmoid aquí, lo hará la función de pérdida
        return x

# Función auxiliar de entrenamiento
def train_torch_model(model, criterion, optimizer, X, y, epochs=20):
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(X)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
    return model

# Función auxiliar de evaluación
def evaluate_torch_model(model, X_test, y_test):
    model.eval()
    with torch.no_grad():
        logits = model(X_test)
        probs = torch.sigmoid(logits) # Aplicamos Sigmoid para obtener probabilidad 0-1
        preds = (probs > 0.5).float()
    print(classification_report(y_test, preds))

input_dim = X_train.shape[1]

# ==========================================
# ESTRATEGIA 1: RESAMPLING (Oversampling)
# ==========================================
print("\n--- Estrategia 1: Resampling (Upsampling) ---")

## aqui llamamos a class_imbalance.py
imbalance_handler = ImbalanceHandler()
X_train_res, y_train_res = imbalance_handler.smote_data(X_train_scaled, y_train)


--- Estrategia 1: Resampling (Upsampling) ---


In [None]:

model_res = SimpleNN(input_dim)
# Usamos pérdida estándar porque los datos ya están balanceados artificialmente
criterion = nn.BCEWithLogitsLoss() 
optimizer = optim.Adam(model_res.parameters(), lr=0.001)

train_torch_model(model_res, criterion, optimizer, X_train_res, y_train_res)
evaluate_torch_model(model_res, X_test_t, y_test_t)


# ==========================================
# ESTRATEGIA 2: CLASS-WEIGHTED LOSS
# ==========================================
print("\n--- Estrategia 2: Class Weighted Loss ---")

# 1. Calcular el peso: (Num Negativos / Num Positivos)
num_neg = (y_train == 0).sum()
num_pos = (y_train == 1).sum()
pos_weight_val = num_neg / num_pos
pos_weight_tensor = torch.tensor([pos_weight_val], dtype=torch.float32)

print(f"Peso calculado para clase positiva: {pos_weight_val:.2f}")

model_cw = SimpleNN(input_dim)
# 2. Pasar el peso a la función de pérdida
criterion_cw = nn.BCEWithLogitsLoss(pos_weight=pos_weight_tensor)
optimizer_cw = optim.Adam(model_cw.parameters(), lr=0.001)

# Entrenamos con los datos ORIGINALES (desbalanceados), el peso hace el trabajo
train_torch_model(model_cw, criterion_cw, optimizer_cw, X_train_t, y_train_t)
evaluate_torch_model(model_cw, X_test_t, y_test_t)


# ==========================================
# ESTRATEGIA 3: FOCAL LOSS
# ==========================================
print("\n--- Estrategia 3: Focal Loss ---")

# Definición de la clase Focal Loss en PyTorch
class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        # inputs: Logits (salida directa de la capa lineal)
        # targets: Etiquetas verdaderas (0 o 1)
        
        # Calcular Binary Cross Entropy (sin reducción para poder operar)
        bce_loss = nn.functional.binary_cross_entropy_with_logits(inputs, targets, reduction='none')
        
        # Calcular p_t (probabilidad de la clase verdadera)
        # pt = exp(-BCE) es un truco numérico estable porque BCE = -log(pt)
        pt = torch.exp(-bce_loss)
        
        # Fórmula Focal Loss: alpha * (1-pt)^gamma * BCE
        focal_loss = self.alpha * (1-pt)**self.gamma * bce_loss
        
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

model_fl = SimpleNN(input_dim)
# alpha: Factor de balanceo. Si la clase 1 es minoritaria, usa un alpha alto (ej. 0.75 o 0.9)
criterion_fl = FocalLoss(alpha=0.75, gamma=2.0)
optimizer_fl = optim.Adam(model_fl.parameters(), lr=0.001)

train_torch_model(model_fl, criterion_fl, optimizer_fl, X_train_t, y_train_t)
evaluate_torch_model(model_fl, X_test_t, y_test_t)