# Clasificación Multimodal de Salud de Peces (En desarrollo)

Este notebook cubre:

1. División estratificada en entrenamiento y prueba  
2. Evaluación del modelo (Accuracy, Precision, Recall, F1, Matriz de Confusión, Reporte de Clasificación)  
3. Serialización del mejor checkpoint y función de inferencia  
4. Entrenamiento con EarlyStopping y ReduceLROnPlateau  
5. Búsqueda de hiperparámetros (lr, tamaño de batch, arquitectura MLP)  
6. Despliegue con FastAPI  


In [9]:
import sys
import os
# Agrega la raíz del proyecto al path
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

In [None]:
# 1. Importaciones de librerías
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from PIL import Image
import numpy as np
import joblib
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
import uvicorn

### Cargue de datos preprocesados

In [11]:
# --- Carga y preprocesado de datos multimodales ---

# --- Configuración de rutas y nombres de columna ---
sensor_cols    = ['temperatura','pH','conductividad','TDS','DO_mgL']
label_col      = 'etiqueta_kmeans'
preproc_dir    = "../data/images_preproc"
label_csv      = "../data/labels2_kmeans_limpio.csv"
nombre_img_col = 'imagen'   

# 1. Lee el CSV y codifica etiquetas de texto a enteros
df = pd.read_csv(label_csv)
le = LabelEncoder()
df['label_enc'] = le.fit_transform(df[label_col])

In [None]:
# 1. DIVISIÓN ESTRATIFICADA
# 1. DIVISIÓN ESTRATIFICADA
def split_dataset(dataset, 
                  labels, 
                  test_size=0.2, 
                  val_size=0.1, 
                  random_state=42):
    """Divide el dataset en train/val/test estratificado."""
    # — sss1: primer splitter para separar test del resto (train+val)
    sss1 = StratifiedShuffleSplit(
        n_splits=1,              # una única división
        test_size=test_size,     # proporción destinada a test (p. ej. 0.2 → 20%)
        random_state=random_state
    )
    # next(...) devuelve los índices para train+val y para test
    #   .split(X, y) necesita un X cualquiera y las etiquetas y;
    #   aquí X es sólo un array de ceros con la longitud de labels
    idx_train_val, idx_test = next(
        sss1.split(
            np.zeros(len(labels)),  # dummy array, sólo importan las etiquetas
            labels                  # etiquetas originales para stratify
        )
    )

    # Extraemos las etiquetas de train+val para el siguiente split
    labels_train_val = labels[idx_train_val]

    # — sss2: segundo splitter para separar train y val dentro del resto
    #    La proporción de validación relativa es val_size/(1-test_size)
    sss2 = StratifiedShuffleSplit(
        n_splits=1,
        test_size=val_size/(1-test_size),
        random_state=random_state
    )
    # next(...) ahora divide indices de train_val en train vs val
    idx_train, idx_val = next(
        sss2.split(
            np.zeros(len(labels_train_val)),  # de nuevo, dummy X
            labels_train_val                  # etiquetas de train+val
        )
    )

    # Mapear de vuelta a los índices originales del dataset
    train_idx = idx_train_val[idx_train]
    val_idx   = idx_train_val[idx_val]

    # Devolver tres Subsets de PyTorch: train, val y test
    return Subset(dataset, train_idx), \
           Subset(dataset, val_idx), \
           Subset(dataset, idx_test)


In [None]:
# Ejemplo de uso:
# train_ds, val_ds, test_ds = split_dataset(full_dataset, full_labels)

In [None]:
# 2. DATALOADERS
batch_size = 32
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False)

In [None]:
# 3. MODELO: definimos MLP multimodal ejemplo
class MLPMultimodal(nn.Module):
    def __init__(self, input_img_dim, input_sensor_dim, n_classes):
        super().__init__()
        # encoder de imagen (CNN sencilla)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten()
        )
        # calculamos tamaño de salida de la CNN
        dummy = torch.zeros(1,3,*input_img_dim)
        img_feat_dim = self.cnn(dummy).shape[1]
        # encoder de sensores (MLP)
        self.mlp_sens = nn.Sequential(
            nn.Linear(input_sensor_dim, 32), nn.ReLU(),
            nn.Linear(32, 32), nn.ReLU()
        )
        # capa final combinada
        self.classifier = nn.Sequential(
            nn.Linear(img_feat_dim + 32, 128), nn.ReLU(),
            nn.Linear(128, n_classes)
        )

    def forward(self, img, sens):
        f1 = self.cnn(img)
        f2 = self.mlp_sens(sens)
        x  = torch.cat([f1, f2], dim=1)
        return self.classifier(x)

In [None]:
# 4. ENTRENAMIENTO CON EarlyStopping y ReduceLROnPlateau
class EarlyStopping:
    """Detiene el entrenamiento si la métrica de validación no mejora tras 'patience' épocas."""
    def __init__(self, patience=5, min_delta=1e-4):
        self.patience  = patience
        self.min_delta = min_delta
        self.best_loss = np.inf
        self.counter   = 0
        self.should_stop = False

    def step(self, val_loss):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter   = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True

def train_model(model, train_loader, val_loader, device):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5, verbose=True)
    earlystop = EarlyStopping(patience=7)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(1, 101):
        # --- entrenamiento ---
        model.train()
        for imgs, sens, labels in train_loader:
            imgs, sens, labels = imgs.to(device), sens.to(device), labels.to(device)
            optimizer.zero_grad()
            out = model(imgs, sens)
            loss = criterion(out, labels)
            loss.backward()
            optimizer.step()

        # --- validación ---
        model.eval()
        val_losses = []
        with torch.no_grad():
            for imgs, sens, labels in val_loader:
                imgs, sens, labels = imgs.to(device), sens.to(device), labels.to(device)
                out = model(imgs, sens)
                val_losses.append(criterion(out, labels).item())
        val_loss = np.mean(val_losses)
        print(f"Época {epoch}: val_loss = {val_loss:.4f}")

        scheduler.step(val_loss)
        earlystop.step(val_loss)
        if earlystop.should_stop:
            print("EarlyStopping: se detiene entrenamiento.")
            break

    # guardamos mejor checkpoint
    torch.save(model.state_dict(), "best_model.pt")
    return model

In [None]:
# 5. EVALUACIÓN EN TEST
def evaluate_model(model, test_loader, device):
    model.to(device).eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for imgs, sens, labels in test_loader:
            imgs, sens  = imgs.to(device), sens.to(device)
            out = model(imgs, sens)
            preds = out.argmax(dim=1).cpu().numpy()
            y_pred.extend(preds)
            y_true.extend(labels.numpy())
    # Métricas
    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    cm = confusion_matrix(y_true, y_pred)
    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}, Recall: {rec:.4f}, F1: {f1:.4f}")
    print("Matriz de confusión:\n", cm)
    print("Reporte de clasificación:\n", classification_report(y_true, y_pred))


In [None]:
# 6. FUNCIÓN DE INFERENCIA
class Inferencia:
    def __init__(self, model_path, input_img_dim, input_sensor_dim, n_classes, device):
        self.device = device
        self.model  = MLPMultimodal(input_img_dim, input_sensor_dim, n_classes)
        self.model.load_state_dict(torch.load(model_path, map_location=device))
        self.model.to(device).eval()
        # transformaciones para la imagen
        self.tx = transforms.Compose([
            transforms.Resize(input_img_dim),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485,0.456,0.406],
                                 std =[0.229,0.224,0.225])
        ])

    def __call__(self, img_bytes, sens_array):
        # procesar imagen
        img = Image.open(img_bytes).convert("RGB")
        img = self.tx(img).unsqueeze(0).to(self.device)
        # procesar sensores
        sens = torch.tensor(sens_array, dtype=torch.float32).unsqueeze(0).to(self.device)
        # inferencia
        with torch.no_grad():
            out = self.model(img, sens)
            probs = torch.softmax(out, dim=1).cpu().numpy()[0]
            cls   = int(probs.argmax())
        return {"clase": cls, "probabilidades": probs.tolist()}

In [None]:
# 7. BÚSQUEDA DE HIPERPARÁMETROS (ejemplo sencillo)
def hyperparameter_search(param_grid, dataset, labels, device):
    mejores = {}
    for lr in param_grid["lr"]:
        for bs in param_grid["batch_size"]:
            for hid in param_grid["hidden"]:
                # redefinir dataset y loaders
                train_ds, val_ds, _ = split_dataset(dataset, labels)
                tl = DataLoader(train_ds, batch_size=bs, shuffle=True)
                vl = DataLoader(val_ds,   batch_size=bs, shuffle=False)
                # crear modelo con hidden=h
                model = MLPMultimodal(input_img_dim=(64,64),
                                      input_sensor_dim=labels.shape[1],
                                      n_classes=3)
                optimizer = optim.Adam(model.parameters(), lr=lr)
                # corto entrenamiento por N épocas o con early stop
                # ...
                # evaluar en val, guardar mejores
                val_acc = ...  # tu lógica de evaluación
                llave = (lr, bs, hid)
                mejores[llave] = val_acc
    return max(mejores, key=mejores.get)

In [None]:
# 8. DESPLIEGUE CON FastAPI
app = FastAPI()
class Sensors(BaseModel):
    datos: list  # lista de floats

# instancia global de inferencia
infer = Inferencia("best_model.pt", (64,64), input_sensor_dim=10, n_classes=3, device="cpu")

@app.post("/predict/")
async def predict(file: UploadFile = File(...), sensors: Sensors = None):
    resultado = infer(file.file, sensors.datos)
    return resultado

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)