In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import torch
import torch.nn as nn
from timm.models import create_model
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset, Subset
from torchvision import transforms

from tqdm import tqdm
from sklearn.metrics import roc_curve, auc, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score
from PIL import Image
import random
import matplotlib.pyplot as plt
import numpy as np
import sys
import json

### Modelos

In [3]:
# Modelos de detección de anomalías con Swin Transformer
# Modelo supervisado (clasificación normal vs. anómalo)
class SwinTransformerAnomalyDetector(nn.Module):
    def __init__(self, pretrained=True, num_classes=1):
        super(SwinTransformerAnomalyDetector, self).__init__()
        # Cargar modelo Swin Transformer preentrenado
        self.swin = create_model(
            'swin_tiny_patch4_window7_224',
            pretrained=pretrained,
            num_classes=0  # Sin cabeza de clasificación
        )

        # Agregar cabeza de detección de anomalías
        self.anomaly_head = nn.Sequential(
            nn.Linear(768, 256),  # 768 es la dimensión de salida del Swin-T
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
            nn.Sigmoid()
        )

    def forward(self, x):
        features = self.swin(x)
        return self.anomaly_head(features)

In [4]:
# Modelo no supervisado (autoencoder para reconstrucción)
class SwinTransformerAutoencoder(nn.Module):
    def __init__(self, pretrained=True):
        super(SwinTransformerAutoencoder, self).__init__()
        # Codificador: Swin Transformer preentrenado
        self.encoder = create_model(
            'swin_tiny_patch4_window7_224',
            pretrained=pretrained,
            num_classes=0
        )

        # Decodificador para reconstruir la imagen
        self.decoder = nn.Sequential(
            nn.Linear(768, 1024),
            nn.ReLU(),
            nn.Linear(1024, 224*224*3),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Obtener representación latente
        latent = self.encoder(x)
        # Reconstruir la imagen
        batch_size = x.size(0)
        reconstructed = self.decoder(latent).view(batch_size, 3, 224, 224)
        return reconstructed, latent

    def detect_anomaly(self, x):
        reconstructed, _ = self.forward(x)
        # Calcular error de reconstrucción como MSE
        reconstruction_error = torch.mean((x - reconstructed) ** 2, dim=[1, 2, 3])
        return reconstruction_error  # Mayor error = mayor probabilidad de anomalía


### Funciones y clases auxiliares


In [5]:
# Dataset y DataLoader para MVTec AD
class MVTecDataset(Dataset):
    def __init__(self, root_path, category, is_train=True, transform=None, mask_transform=None):
        """
        Args:
            root_path: Ruta al directorio raíz de MVTec AD
            category: Categoría de objetos ('bottle', 'cable', 'carpet', etc.)
            is_train: Si es True, carga imágenes de entrenamiento (normales)
                      Si es False, carga imágenes de prueba (normales y anómalas)
            transform: Transformaciones opcionales a aplicar a las imágenes
            mask_transform: Transformaciones opcionales a aplicar a las máscaras
        """
        self.root_path = root_path
        self.category = category
        self.is_train = is_train
        self.transform = transform
        self.mask_transform = mask_transform

        # Definir directorios
        if self.is_train:
            self.image_dir = os.path.join(root_path, category, 'train', 'good')
            self.image_paths = [os.path.join(self.image_dir, f) for f in os.listdir(self.image_dir)
                               if f.endswith('.png')]
            self.labels = np.zeros(len(self.image_paths), dtype=np.float32)  # 0 = normal
            self.mask_paths = None

        else:  # Test set
            self.image_dir = os.path.join(root_path, category, 'test')
            self.image_paths = []
            self.labels = []
            self.mask_paths = []

            # Imágenes normales (buenas)
            good_dir = os.path.join(self.image_dir, 'good')
            if os.path.exists(good_dir):
                good_images = [os.path.join(good_dir, f) for f in os.listdir(good_dir)
                              if f.endswith('.png')]
                self.image_paths.extend(good_images)
                self.labels.extend([0] * len(good_images))  # 0 = normal
                self.mask_paths.extend([None] * len(good_images))

            # Imágenes anómalas (con defectos)
            defect_types = [d for d in os.listdir(self.image_dir)
                           if os.path.isdir(os.path.join(self.image_dir, d)) and d != 'good']

            for defect in defect_types:
                defect_dir = os.path.join(self.image_dir, defect)
                defect_images = [os.path.join(defect_dir, f) for f in os.listdir(defect_dir)
                                if f.endswith('.png')]
                self.image_paths.extend(defect_images)
                self.labels.extend([1] * len(defect_images))  # 1 = anomalía

                # Añadir máscaras de ground truth (si existen)
                gt_dir = os.path.join(root_path, category, 'ground_truth', defect)
                if os.path.exists(gt_dir):
                    for img_path in defect_images:
                        img_name = os.path.basename(img_path)
                        mask_name = img_name.replace('.png', '_mask.png')
                        mask_path = os.path.join(gt_dir, mask_name)
                        if os.path.exists(mask_path):
                            self.mask_paths.append(mask_path)
                        else:
                            self.mask_paths.append(None)
                else:
                    self.mask_paths.extend([None] * len(defect_images))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Cargar imagen
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB') # Convertir a RGB
        label = self.labels[idx]

        # Cargar máscara si existe (solo para test y anomalías)
        mask = None
        if not self.is_train and self.mask_paths[idx] is not None: # Si es test y hay máscara
            mask_path = self.mask_paths[idx]
            mask = Image.open(mask_path).convert('L') # Convertir a escala de grises
            if self.mask_transform:
                mask = self.mask_transform(mask) # Aplicar transformaciones a la máscara
            elif self.transform:
                mask = transforms.Compose([
                    transforms.Resize((224, 224)),
                    transforms.ToTensor(),
                ])(mask) # Aplicar transformaciones por defecto a la máscara
        else:
            # Crear una máscara vacía si no existe
            mask = torch.zeros((1, 224, 224))

        # Aplicar transformaciones a la imagen
        if self.transform:
            image = self.transform(image)

        # Siempre devolver tres elementos
        return image, label, mask

In [6]:
def split_test_datasets(test_datasets, categories, test_size=0.5, random_state=42):
    """
    Divide cada dataset de test en validation y test final

    Args:
        test_datasets: Lista de datasets de test
        categories: Lista de nombres de categorías
        test_size: Proporción para test final (0.5 = 50% val, 50% test)
        random_state: Semilla para reproducibilidad

    Returns:
        val_datasets, final_test_datasets: Listas de datasets divididos
    """
    val_datasets = []
    final_test_datasets = []

    print("\n" + "="*60)
    print("DIVISIÓN DE DATASETS DE TEST")
    print("="*60)

    for i, test_dataset in enumerate(test_datasets):
        # Obtener índices del dataset
        indices = list(range(len(test_dataset)))

        # Dividir índices manteniendo la proporción de anomalías
        labels = [test_dataset.labels[j] for j in indices]

        # Contar normales y anomalías
        normal_count = sum(1 for l in labels if l == 0)
        anomaly_count = sum(1 for l in labels if l == 1)

        try:
            val_indices, test_indices = train_test_split(
                indices,
                test_size=test_size,
                stratify=labels,  # Mantener proporción de normales/anómalas
                random_state=random_state
            )
        except ValueError:
            # Si no se puede estratificar (ej: solo una clase), hacer split simple
            val_indices, test_indices = train_test_split(
                indices,
                test_size=test_size,
                random_state=random_state
            )

        # Crear subsets
        val_subset = Subset(test_dataset, val_indices)
        test_subset = Subset(test_dataset, test_indices)

        val_datasets.append(val_subset)
        final_test_datasets.append(test_subset)

        print(f"{categories[i]:12} | Total: {len(indices):3} | "
              f"Normal: {normal_count:3} | Anomalía: {anomaly_count:3} | "
              f"Val: {len(val_subset):3} | Test: {len(test_subset):3}")

    print("="*60)
    return val_datasets, final_test_datasets

### Funciones de entrenamiento

In [7]:
# Entrenamiento del modelo supervisado
def train_supervised_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, output_dir):
    """Entrena un modelo supervisado de detección de anomalías"""
    os.makedirs(output_dir, exist_ok=True)
    best_auc = 0.0

    for epoch in range(num_epochs):
        # Entrenamiento
        model.train()
        train_loss = 0.0

        for images, labels, _ in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} (train)"):
            images = images.to(device)
            labels = labels.to(device).float().view(-1, 1)

            # Forward
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validación
        model.eval()
        val_loss = 0.0
        all_scores = []
        all_labels = []

        with torch.no_grad():
            for images, labels, _ in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} (val)"):
                images = images.to(device)
                labels = labels.to(device).float().view(-1, 1)

                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                all_scores.extend(outputs.cpu().numpy().flatten())
                all_labels.extend(labels.cpu().numpy().flatten())

        # Calcular ROC AUC
        fpr, tpr, _ = roc_curve(all_labels, all_scores)
        roc_auc = auc(fpr, tpr)

        # Guardar el mejor modelo
        if roc_auc > best_auc:
            best_auc = roc_auc
            torch.save(model.state_dict(), os.path.join(output_dir, 'mejor_modelo_supervisado.pth'))
            print(f"     Nuevo mejor modelo guardado! AUC: {roc_auc:.4f}")

        print(f"Epoch {epoch+1}/{num_epochs}, "
              f"Train Loss: {train_loss/len(train_loader):.4f}, "
              f"Val Loss: {val_loss/len(val_loader):.4f}, "
              f"ROC AUC: {roc_auc:.4f}")

        scheduler.step()

    return model

In [8]:
def train_autoencoder_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, output_dir):
    """Entrena un modelo de autoencoder para detección de anomalías no supervisada"""
    os.makedirs(output_dir, exist_ok=True)
    best_auc = 0.0
    best_val_loss = float('inf')  # Para casos donde no se puede calcular AUC

    for epoch in range(num_epochs):
        # Entrenamiento (solo con imágenes normales)
        model.train()
        train_loss = 0.0

        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} (train)"):
            images, _, _ = batch  # Desempacar correctamente (imagen, etiqueta, máscara)
            images = images.to(device)

            # Forward
            reconstructed, _ = model(images)
            loss = criterion(reconstructed, images)

            # Backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validación - Calcular val_loss y ROC AUC
        model.eval()
        val_loss = 0.0
        all_scores = []
        all_labels = []

        with torch.no_grad():
            for images, labels, masks in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} (val)"):
                images = images.to(device)

                reconstructed, _ = model(images)

                # Calcular validation loss (solo en imágenes normales para ser consistente)
                normal_mask = labels == 0
                if normal_mask.sum() > 0:
                    normal_images = images[normal_mask]
                    normal_reconstructed = reconstructed[normal_mask]
                    val_loss += criterion(normal_reconstructed, normal_images).item()

                # Calcular error de reconstrucción por imagen (para ROC AUC)
                error = torch.mean((images - reconstructed) ** 2, dim=[1, 2, 3])

                all_scores.extend(error.cpu().numpy())
                all_labels.extend(labels.numpy())

        # Calcular métricas
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader) if val_loss > 0 else 0.0

        # ROC AUC solo si tenemos ambas clases (normal y anomalía)
        if len(set(all_labels)) > 1:
            fpr, tpr, _ = roc_curve(all_labels, all_scores)
            roc_auc = auc(fpr, tpr)

            # Guardar el mejor modelo basado en AUC
            if roc_auc > best_auc:
                best_auc = roc_auc
                torch.save(model.state_dict(), os.path.join(output_dir, 'mejor_modelo_multiclase_autoencoder.pth'))
                print(f"     Nuevo mejor modelo guardado! AUC: {roc_auc:.4f}")

            print(f"Epoch {epoch+1}/{num_epochs}, "
                  f"Train Loss: {avg_train_loss:.4f}, "
                  f"Val Loss: {avg_val_loss:.4f}, "
                  f"ROC AUC: {roc_auc:.4f}")
        else:
            # Si solo tenemos una clase, guardar basado en val_loss
            if epoch == 0 or avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), os.path.join(output_dir, 'mejor_modelo_autoencoder.pth'))
                print(f"     Nuevo mejor modelo guardado! Val Loss: {avg_val_loss:.4f}")

            print(f"Epoch {epoch+1}/{num_epochs}, "
                  f"Train Loss: {avg_train_loss:.4f}, "
                  f"Val Loss: {avg_val_loss:.4f}")

        scheduler.step()

    return model

### Funciones de Visualización

In [9]:
def visualize_examples(images, labels, scores, masks, threshold, output_dir, model_type):
    """Visualiza ejemplos de predicciones correctas e incorrectas"""
    # Crear directorio principal si no existe
    os.makedirs(output_dir, exist_ok=True)

    # Crear directorio de examples
    examples_dir = os.path.join(output_dir, 'examples')
    os.makedirs(examples_dir, exist_ok=True)

    print(f"Creando visualizaciones en: {examples_dir}")

    # Convertir a numpy arrays si no lo son
    images = np.array(images)
    labels = np.array(labels)
    scores = np.array(scores)
    predictions = (scores >= threshold).astype(int)

    print(f"Total de imágenes: {len(images)}")
    print(f"Distribución de etiquetas - Normal: {np.sum(labels == 0)}, Anomalía: {np.sum(labels == 1)}")
    print(f"Distribución de predicciones - Normal: {np.sum(predictions == 0)}, Anomalía: {np.sum(predictions == 1)}")

    # Índices para cada categoría (TP, TN, FP, FN)
    true_positive = np.where((predictions == 1) & (labels == 1))[0]
    true_negative = np.where((predictions == 0) & (labels == 0))[0]
    false_positive = np.where((predictions == 1) & (labels == 0))[0]
    false_negative = np.where((predictions == 0) & (labels == 1))[0]

    print(f"True Positives: {len(true_positive)}")
    print(f"True Negatives: {len(true_negative)}")
    print(f"False Positives: {len(false_positive)}")
    print(f"False Negatives: {len(false_negative)}")

    # Limitar número de ejemplos
    max_examples = 5
    categories = [
        ('true_positive', true_positive[:max_examples]),
        ('true_negative', true_negative[:max_examples]),
        ('false_positive', false_positive[:max_examples]),
        ('false_negative', false_negative[:max_examples])
    ]

    # Determinar si tenemos máscaras disponibles
    has_masks = len(masks) > 0 and any(mask is not None for mask in masks)
    print(f"Máscaras disponibles: {has_masks}")

    # Visualizar ejemplos por categoría
    for category_name, indices in categories:
        if len(indices) == 0:
            print(f"No hay ejemplos para {category_name}")
            continue

        print(f"Creando visualización para {category_name} con {len(indices)} ejemplos")

        # Determinar el número de subplots (2 o 3 columnas por muestra)
        n_cols = 3 if has_masks else 2
        fig_width = 4 * len(indices) * n_cols
        fig_height = 4

        plt.figure(figsize=(fig_width, fig_height))

        for i, idx in enumerate(indices):
            try:
                # Imagen original
                subplot_idx = i * n_cols + 1
                plt.subplot(1, len(indices) * n_cols, subplot_idx)

                img = images[idx]
                if img.shape[0] == 3:  # Si está en formato CHW
                    img = img.transpose(1, 2, 0)  # Convertir a HWC

                # Desnormalizar la imagen
                img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
                img = np.clip(img, 0, 1)

                plt.imshow(img)
                plt.title(f"Original\nScore: {scores[idx]:.4f}\nLabel: {int(labels[idx])}")
                plt.axis('off')

                # Máscara de ground truth (si está disponible)
                if has_masks and idx < len(masks) and masks[idx] is not None:
                    plt.subplot(1, len(indices) * n_cols, subplot_idx + 1)
                    mask = masks[idx]
                    if hasattr(mask, 'squeeze'):
                        mask = mask.squeeze()
                    elif isinstance(mask, np.ndarray) and mask.ndim > 2:
                        mask = np.squeeze(mask)

                    plt.imshow(mask, cmap='gray')
                    plt.title("Ground Truth")
                    plt.axis('off')

                    # Predicción
                    plt.subplot(1, len(indices) * n_cols, subplot_idx + 2)
                else:
                    # Sin máscara, solo predicción
                    plt.subplot(1, len(indices) * n_cols, subplot_idx + 1)

                # Mostrar predicción
                if predictions[idx] == 1:
                    plt.text(0.5, 0.5, "ANOMALY", ha='center', va='center',
                             fontsize=16, color='red', weight='bold',
                             transform=plt.gca().transAxes)
                else:
                    plt.text(0.5, 0.5, "NORMAL", ha='center', va='center',
                             fontsize=16, color='green', weight='bold',
                             transform=plt.gca().transAxes)

                plt.title(f"Prediction\nThreshold: {threshold:.4f}")
                plt.axis('off')

            except Exception as e:
                print(f"Error procesando imagen {idx} en categoría {category_name}: {e}")
                continue

        plt.suptitle(f"{category_name.replace('_', ' ').title()}", fontsize=16, y=0.98)
        plt.tight_layout()

        # Guardar figura
        filename = os.path.join(examples_dir, f'{category_name}.png')
        try:
            plt.savefig(filename, dpi=150, bbox_inches='tight')
            print(f"Guardado: {filename}")
        except Exception as e:
            print(f"Error guardando {filename}: {e}")
        finally:
            plt.close()  # Cerrar figura para liberar memoria

    # Matriz de confusión
    try:
        from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
        cm = confusion_matrix(labels, predictions)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Normal', 'Anomalía'])

        plt.figure(figsize=(8, 6))
        disp.plot(cmap='Blues')
        plt.title('Matriz de Confusión')

        cm_filename = os.path.join(output_dir, 'confusion_matrix.png')
        plt.savefig(cm_filename, dpi=150, bbox_inches='tight')
        print(f"Matriz de confusión guardada: {cm_filename}")
        plt.close()

    except Exception as e:
        print(f"Error creando matriz de confusión: {e}")

    print(f"Visualizaciones completadas en: {examples_dir}")


# Función auxiliar para verificar y crear directorios
def ensure_directory_exists(path):
    """Asegura que un directorio existe, lo crea si no existe"""
    try:
        os.makedirs(path, exist_ok=True)
        print(f"Directorio verificado/creado: {path}")
        return True
    except Exception as e:
        print(f"Error creando directorio {path}: {e}")
        return False


In [10]:
def visualize_anomaly_maps(model, test_loader, device, output_dir, threshold, model_type):
    """Genera y guarda mapas de anomalías para imágenes de test"""
    os.makedirs(os.path.join(output_dir, 'anomaly_maps'), exist_ok=True)
    model.eval()

    # Seleccionar algunas imágenes para visualización
    samples_seen = {'normal': 0, 'anomaly': 0}
    max_samples = 5

    with torch.no_grad():
        for batch in test_loader:
            # Manejar batch con o sin máscara
            if len(batch) == 3:
                images, labels, masks = batch
                has_masks = True
            else:
                images, labels = batch
                masks = None
                has_masks = False

            images = images.to(device)
            batch_size = images.size(0)

            # Modelo autoencoder
            if model_type == 'autoencoder':
                reconstructed, _ = model(images)
                # Calcular error de reconstrucción por píxel
                error_maps = torch.abs(images - reconstructed)
                # Normalizar mapas de error para visualización
                error_maps_mean = error_maps.mean(dim=1)  # Promediar a través de los canales

                # Convertir a numpy para visualización
                images_np = images.cpu().numpy()
                error_maps_np = error_maps_mean.cpu().numpy()
                reconstructed_np = reconstructed.cpu().numpy()

                for i in range(batch_size):
                    label = labels[i].item()
                    category = 'normal' if label == 0 else 'anomaly'

                    if samples_seen[category] < max_samples:
                        # Determinar número de subplots
                        n_cols = 4 if has_masks else 3
                        fig, axes = plt.subplots(1, n_cols, figsize=(5 * n_cols, 5))

                        # Imagen original
                        img = images_np[i].transpose(1, 2, 0)
                        img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
                        img = np.clip(img, 0, 1)
                        axes[0].imshow(img)
                        axes[0].set_title("Original")
                        axes[0].axis('off')

                        # Imagen reconstruida
                        rec_img = reconstructed_np[i].transpose(1, 2, 0)
                        rec_img = rec_img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
                        rec_img = np.clip(rec_img, 0, 1)
                        axes[1].imshow(rec_img)
                        axes[1].set_title("Reconstrucción")
                        axes[1].axis('off')

                        # Mapa de error
                        error_map = error_maps_np[i]
                        vmax = np.max(error_map) if np.max(error_map) > 0 else 1
                        im = axes[2].imshow(error_map, cmap='jet', vmin=0, vmax=vmax)
                        axes[2].set_title(f"Mapa de Anomalía\n(Score: {error_maps_mean[i].mean().item():.4f})")
                        axes[2].axis('off')

                        # Máscara de ground truth (si está disponible)
                        if has_masks and masks is not None:
                            mask_np = masks[i].squeeze().cpu().numpy()
                            axes[3].imshow(mask_np, cmap='gray')
                            axes[3].set_title("Ground Truth Mask")
                            axes[3].axis('off')

                        plt.colorbar(im, ax=axes[2])
                        plt.tight_layout()
                        plt.savefig(os.path.join(output_dir, 'anomaly_maps', f'{category}_{samples_seen[category]}.png'))
                        plt.close()

                        samples_seen[category] += 1

            # Modelo supervisado (como no genera mapas de error, guardamos las imágenes)
            elif model_type == 'supervisado':  # Cambiado de 'supervised' a 'supervisado'
                scores = model(images).cpu().numpy().flatten()
                images_np = images.cpu().numpy()

                for i in range(batch_size):
                    label = labels[i].item()
                    category = 'normal' if label == 0 else 'anomaly'

                    if samples_seen[category] < max_samples:
                        # Determinar número de subplots
                        n_cols = 2 if not has_masks else 3
                        fig, axes = plt.subplots(1, n_cols, figsize=(5 * n_cols, 5))

                        # Imagen original
                        img = images_np[i].transpose(1, 2, 0)
                        img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
                        img = np.clip(img, 0, 1)
                        axes[0].imshow(img)
                        axes[0].set_title(f"Original\nScore: {scores[i]:.4f}")
                        axes[0].axis('off')

                        # Etiqueta de predicción
                        predicted = "ANOMALY" if scores[i] >= threshold else "NORMAL"
                        color = "red" if predicted == "ANOMALY" else "green"
                        axes[1].text(0.5, 0.5, predicted, ha='center', va='center',
                                    fontsize=20, color=color)
                        axes[1].set_title(f"Prediction\nScore: {scores[i]:.4f}")
                        axes[1].axis('off')

                        # Máscara de ground truth (si está disponible)
                        if has_masks and masks is not None:
                            mask_np = masks[i].squeeze().cpu().numpy()
                            axes[2].imshow(mask_np, cmap='gray')
                            axes[2].set_title("Ground Truth Mask")
                            axes[2].axis('off')

                        plt.tight_layout()
                        plt.savefig(os.path.join(output_dir, 'anomaly_maps', f'{category}_{samples_seen[category]}.png'))
                        plt.close()

                        samples_seen[category] += 1

            if all(count >= max_samples for count in samples_seen.values()):
                break


### Funciones de Evaluación

In [11]:
def evaluar_modelo(model, final_test_loader, device, output_dir, model_type='autoencoder'):
    """
    Evaluación final del modelo en el conjunto de test independiente - versión mejorada
    """
    # Asegurar que el directorio de salida existe
    ensure_directory_exists(output_dir)

    model.eval()
    all_scores = []
    all_labels = []
    all_images = []
    all_masks = []

    print("\n" + "="*50)
    print("REALIZANDO EVALUACIÓN FINAL...")
    print("="*50)

    with torch.no_grad():
        for images, labels, masks in tqdm(final_test_loader, desc="Evaluación final"):
            images = images.to(device)

            if model_type == 'autoencoder':
                # Para autoencoder: calcular error de reconstrucción
                reconstructed, _ = model(images)
                error = torch.mean((images - reconstructed) ** 2, dim=[1, 2, 3])
                scores = error.cpu().numpy()
            else:
                # Para modelo supervisado
                outputs = model(images)
                scores = outputs.cpu().numpy().flatten()

            all_scores.extend(scores)
            all_labels.extend(labels.numpy())
            all_images.extend(images.cpu().numpy())
            all_masks.extend(masks.cpu().numpy() if hasattr(masks, 'cpu') else masks)

    # Resto de la función de evaluación...
    from sklearn.metrics import roc_curve, auc, average_precision_score, precision_recall_fscore_support, precision_recall_curve

    # Calcular métricas finales
    fpr, tpr, thresholds = roc_curve(all_labels, all_scores)
    avg_precision = average_precision_score(all_labels, all_scores)
    roc_auc = auc(fpr, tpr)

    # Encontrar mejor threshold
    optimal_idx = np.argmax(tpr - fpr)
    optimal_threshold = thresholds[optimal_idx]

    # Calcular accuracy con threshold óptimo
    predictions = (np.array(all_scores) > optimal_threshold).astype(int)
    accuracy = np.mean(predictions == all_labels)

    # Calcular precision, recall, F1
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, predictions, average='binary'
    )

    precision_curve, recall_curve, _ = precision_recall_curve(all_labels, all_scores)

    print("\n" + "="*50)
    print(" RESULTADOS FINALES DEL MODELO")
    print("="*50)
    print(f"ROC AUC:     {roc_auc:.4f}")
    print(f"Accuracy:    {accuracy:.4f}")
    print(f"Precision:   {precision:.4f}")
    print(f"Recall:      {recall:.4f}")
    print(f"F1-Score:    {f1:.4f}")
    print(f"Threshold:   {optimal_threshold:.6f}")
    print("="*50)

    # Visualizar curvas
    plt.figure(figsize=(12, 5))

    # ROC Curve
    plt.subplot(1, 2, 1)
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.3f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")

    # Precision-Recall Curve
    plt.subplot(1, 2, 2)
    plt.plot(recall_curve, precision_curve, color='blue', lw=2, label=f'PR curve (AP = {avg_precision:.3f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend(loc="upper right")

    plt.tight_layout()
    curves_filename = os.path.join(output_dir, 'curvas_de_evaluacion.png')
    plt.savefig(curves_filename, dpi=150, bbox_inches='tight')
    plt.close()

    print(f'Curvas de evaluación guardadas en: {curves_filename}')
    print(f'Directorio de salida: {output_dir}')

    # Visualizar algunos ejemplos de predicciones
    visualize_examples(all_images, all_labels, all_scores, all_masks, optimal_threshold, output_dir, model_type)

    return {
        'roc_auc': float(roc_auc),
        'accuracy': float(accuracy),
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'threshold': float(optimal_threshold),
        'scores': all_scores,
        'labels': all_labels
    }

### Cuerpo principal

In [18]:
# Variables de configuración
#data_path = 'data/'  # Ruta al dataset MVTec AD
data_path = '/content/drive/MyDrive/VpCIII/data'
#output_dir = 'models/'  # Ruta para guardar el modelo entrenado
output_dir = '/content/models'
reports_path = 'reports/'  # Ruta para guardar los reportes e imágenes

# Lista de todas las categorías en MVTec AD
categories = [
    "bottle",
    "cable",
    "capsule",
    "carpet",
    "grid",
    "hazelnut",
    "leather",
    "metal_nut",
    "pill",
    "screw",
    "tile",
    "toothbrush",
    "transistor",
    "wood",
    "zipper",
]

In [19]:
# Crear directorios de salida en caso de que no existan
os.makedirs(output_dir, exist_ok=True)
os.makedirs(reports_path, exist_ok=True)

In [20]:
# Semilla para reproducibilidad de los experimentos
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x7e1eaaf8ab10>

In [21]:
# Si tenemos disponible GPU, lo usamos
# Chequeamos si tenemos disponible GPU (CUDA)
if torch.cuda.is_available():
    device = "cuda"
# Chequeamos si tenemos disponible aceleración por hardware en un chip de Apple (MPS)
elif torch.backends.mps.is_available():
    device = "mps"
# Por defecto usamos CPU
else:
    device = "cpu"

print(f" Usando dispositivo: {device}")

 Usando dispositivo: cuda


In [22]:
# Definir transformaciones
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Transformación para máscaras (sin normalización)
mask_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

In [23]:
# Crear Datasets
print("\n" + "="*60)
print("CARGANDO DATASETS MVTec AD")
print("="*60)

# Crear datasets para todas las categorías
train_datasets = []
test_datasets = []

for category in categories:
    try:
        train_dataset = MVTecDataset(
            root_path=data_path,
            category=category,
            is_train=True,
            transform=train_transform,
            mask_transform=mask_transform
        )

        test_dataset = MVTecDataset(
            root_path=data_path,
            category=category,
            is_train=False,
            transform=test_transform,
            mask_transform=mask_transform
        )

        train_datasets.append(train_dataset)
        test_datasets.append(test_dataset)
        print(f"{category:12} | Train: {len(train_dataset):3} | Test: {len(test_dataset):3}")
    except Exception as e:
        print(f" Error al cargar la categoría {category}: {e}")


CARGANDO DATASETS MVTec AD
bottle       | Train: 209 | Test:  83
cable        | Train: 224 | Test: 150
capsule      | Train: 219 | Test: 132
carpet       | Train: 280 | Test: 117
grid         | Train: 264 | Test:  78
hazelnut     | Train: 391 | Test: 110
leather      | Train: 245 | Test: 124
metal_nut    | Train: 220 | Test: 115
pill         | Train: 267 | Test: 167
screw        | Train: 320 | Test: 160
tile         | Train: 230 | Test: 117
toothbrush   | Train:  60 | Test:  42
transistor   | Train: 213 | Test: 100
wood         | Train: 247 | Test:  79
zipper       | Train: 240 | Test: 151


In [24]:
# Dividir datasets de test en validación y test final
val_datasets, final_test_datasets = split_test_datasets(test_datasets, categories, test_size=0.5)

# Combinar datasets
train_dataset = ConcatDataset(train_datasets)  # Unir todos los datasets de entrenamiento
val_dataset = ConcatDataset(val_datasets)      # Unir todos los datasets de validación
final_test_dataset = ConcatDataset(final_test_datasets)  # Unir todos los datasets de test final

print(f"\n RESUMEN DE DATASETS:")
print(f"   Entrenamiento: {len(train_dataset):4} imágenes")
print(f"   Validación:    {len(val_dataset):4} imágenes")
print(f"   Test final:    {len(final_test_dataset):4} imágenes")
print(f"   Total:         {len(train_dataset) + len(val_dataset) + len(final_test_dataset):4} imágenes")



DIVISIÓN DE DATASETS DE TEST
bottle       | Total:  83 | Normal:  20 | Anomalía:  63 | Val:  41 | Test:  42
cable        | Total: 150 | Normal:  58 | Anomalía:  92 | Val:  75 | Test:  75
capsule      | Total: 132 | Normal:  23 | Anomalía: 109 | Val:  66 | Test:  66
carpet       | Total: 117 | Normal:  28 | Anomalía:  89 | Val:  58 | Test:  59
grid         | Total:  78 | Normal:  21 | Anomalía:  57 | Val:  39 | Test:  39
hazelnut     | Total: 110 | Normal:  40 | Anomalía:  70 | Val:  55 | Test:  55
leather      | Total: 124 | Normal:  32 | Anomalía:  92 | Val:  62 | Test:  62
metal_nut    | Total: 115 | Normal:  22 | Anomalía:  93 | Val:  57 | Test:  58
pill         | Total: 167 | Normal:  26 | Anomalía: 141 | Val:  83 | Test:  84
screw        | Total: 160 | Normal:  41 | Anomalía: 119 | Val:  80 | Test:  80
tile         | Total: 117 | Normal:  33 | Anomalía:  84 | Val:  58 | Test:  59
toothbrush   | Total:  42 | Normal:  12 | Anomalía:  30 | Val:  21 | Test:  21
transistor   | Total: 

In [25]:
# Defino el tamaño de los lotes
batch_size = 8 # Tamaño de los lotes, mas grande muere

# Crear dataloaders
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4
)

final_test_loader = DataLoader(
    final_test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4
)

print(f"\n DataLoaders creados con batch_size={batch_size}")


 DataLoaders creados con batch_size=8




In [26]:
# Para nuestro enfoque model_type pueden ser dos: 'supervisado' o 'autoencoder'
model_type = 'autoencoder' # Habria que entrenar los dos
#model_type = 'supervisado'

pretrained = True # Usar pre-entrenado
lr = 1e-4
num_epochs = 1

print(f"\n CONFIGURACIÓN DEL MODELO:")
print(f"   Tipo:          {model_type}")
print(f"   Pre-entrenado: {pretrained}")
print(f"   Learning rate: {lr}")
print(f"   Épocas:        {num_epochs}")
print(f"   Categorías:    TODAS ({len(categories)} categorías combinadas)")
print(f"   Estrategia:    Multi-categoría para generalización")


 CONFIGURACIÓN DEL MODELO:
   Tipo:          autoencoder
   Pre-entrenado: True
   Learning rate: 0.0001
   Épocas:        1
   Categorías:    TODAS (15 categorías combinadas)
   Estrategia:    Multi-categoría para generalización


In [27]:
# Creamos el modelo
if model_type == 'supervisado':
    model = SwinTransformerAnomalyDetector(pretrained=pretrained).to(device)
    criterion = nn.BCELoss()
else:  # autoencoder
    model = SwinTransformerAutoencoder(pretrained=pretrained).to(device)
    criterion = nn.MSELoss()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/114M [00:00<?, ?B/s]

In [28]:
# Optimizador y scheduler
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-5) # AdamW es una variante de Adam con decaimiento de peso
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
# Scheduler para reducir la tasa de aprendizaje CosineAnnealingLR sirve para reducir la tasa de aprendizaje de forma cíclica

In [29]:
# Entrenamos el modelo
print(f"\n INICIANDO ENTRENAMIENTO DEL MODELO {model_type.upper()}...")
print("="*60)

if model_type == 'supervisado':
    model = train_supervised_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,  #  Usando val_loader en lugar de test_loader
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=num_epochs,
        device=device,
        output_dir=output_dir
    )
else:
    model = train_autoencoder_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,  #  Usando val_loader en lugar de test_loader
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=num_epochs,
        device=device,
        output_dir=output_dir
    )

print("\n Entrenamiento finalizado!")


 INICIANDO ENTRENAMIENTO DEL MODELO AUTOENCODER...


Epoch 1/1 (train): 100%|██████████| 454/454 [05:24<00:00,  1.40it/s]
Epoch 1/1 (val): 100%|██████████| 108/108 [02:02<00:00,  1.14s/it]


     Nuevo mejor modelo guardado! AUC: 0.5582
Epoch 1/1, Train Loss: 1.0191, Val Loss: 0.8213, ROC AUC: 0.5582

 Entrenamiento finalizado!


In [30]:
# Evaluación Final con el conjunto de test independiente
print(f"\n CARGANDO MEJOR MODELO PARA EVALUACIÓN FINAL...")
if model_type == 'supervisado':
    model.load_state_dict(torch.load(os.path.join(output_dir, 'mejor_modelo_supervisado.pth')))
else:
    # Cargar el mejor modelo autoencoder
    model.load_state_dict(torch.load(os.path.join(output_dir, 'mejor_modelo_multiclase_autoencoder.pth')))


# Evaluación final
final_results = evaluar_modelo(model, final_test_loader, device, reports_path, model_type='autoencoder', )



 CARGANDO MEJOR MODELO PARA EVALUACIÓN FINAL...
Directorio verificado/creado: reports/

REALIZANDO EVALUACIÓN FINAL...


Evaluación final: 100%|██████████| 109/109 [02:02<00:00,  1.12s/it]



 RESULTADOS FINALES DEL MODELO
ROC AUC:     0.5645
Accuracy:    0.6859
Precision:   0.7625
Recall:      0.8254
F1-Score:    0.7927
Threshold:   0.376019
Curvas de evaluación guardadas en: reports/curvas_de_evaluacion.png
Directorio de salida: reports/
Creando visualizaciones en: reports/examples
Total de imágenes: 866
Distribución de etiquetas - Normal: 236, Anomalía: 630
Distribución de predicciones - Normal: 183, Anomalía: 683
True Positives: 521
True Negatives: 74
False Positives: 162
False Negatives: 109
Máscaras disponibles: True
Creando visualización para true_positive con 5 ejemplos
Guardado: reports/examples/true_positive.png
Creando visualización para true_negative con 5 ejemplos
Guardado: reports/examples/true_negative.png
Creando visualización para false_positive con 5 ejemplos
Guardado: reports/examples/false_positive.png
Creando visualización para false_negative con 5 ejemplos
Guardado: reports/examples/false_negative.png
Matriz de confusión guardada: reports/confusion_ma

<Figure size 800x600 with 0 Axes>

In [31]:
#print(final_results)
threshold = final_results['threshold']

In [32]:
# Guardar resultados finales
def convert_numpy_types(obj):
    """Convierte tipos numpy a tipos nativos de Python para JSON"""
    if isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj

results_to_save = {k: v for k, v in final_results.items() if k not in ['scores', 'labels']}
results_to_save = convert_numpy_types(results_to_save)  # Convertir tipos numpy
results_file = os.path.join(reports_path, 'resultados_finales.json')

with open(results_file, 'w') as f:
    json.dump(results_to_save, f, indent=2)

print(f"\n Resultados guardados en: {results_file}")
print("\n ¡PROCESO COMPLETADO EXITOSAMENTE!")


 Resultados guardados en: reports/resultados_finales.json

 ¡PROCESO COMPLETADO EXITOSAMENTE!


In [33]:
# Visualizar mapas de anomalías
print("Generando mapas de anomalías...")
visualize_anomaly_maps(
    model=model,
    test_loader=final_test_loader,
    device=device,
    output_dir=reports_path,
    threshold=threshold,
    model_type=model_type
)

Generando mapas de anomalías...


