In [87]:
import os
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset


class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Recorrer directorios
        for class_name in sorted(os.listdir(root_dir)):
            class_path = os.path.join(root_dir, class_name)
            if os.path.isdir(class_path):
                label = class_name  # Obtener etiqueta de la carpeta
                for img_file in os.listdir(class_path):
                    self.image_paths.append(os.path.join(class_path, img_file))
                    self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image,label

In [89]:
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

# Entrenamiento (solo 'good')
dir='mvtec_anomaly_detection/bottle/train'
train_dataset = CustomImageDataset(dir,transform=transform)

print(f"Total training images: {len(train_dataset)}")
print(train_dataset.image_paths[:5])  # Muestra las primeras 5 imágenes

# Prueba (con defectos y 'good')
test_dataset = CustomImageDataset('mvtec_anomaly_detection/bottle/test', transform=transform)

print(f"Total test images: {len(test_dataset)}")
print(test_dataset.image_paths[70:])  # Muestra las primeras 5 imágenes

Total training images: 209
['mvtec_anomaly_detection/bottle/train/good/204.png', 'mvtec_anomaly_detection/bottle/train/good/091.png', 'mvtec_anomaly_detection/bottle/train/good/078.png', 'mvtec_anomaly_detection/bottle/train/good/147.png', 'mvtec_anomaly_detection/bottle/train/good/135.png']
Total test images: 83
['mvtec_anomaly_detection/bottle/test/good/003.png', 'mvtec_anomaly_detection/bottle/test/good/009.png', 'mvtec_anomaly_detection/bottle/test/good/006.png', 'mvtec_anomaly_detection/bottle/test/good/013.png', 'mvtec_anomaly_detection/bottle/test/good/016.png', 'mvtec_anomaly_detection/bottle/test/good/012.png', 'mvtec_anomaly_detection/bottle/test/good/002.png', 'mvtec_anomaly_detection/bottle/test/good/018.png', 'mvtec_anomaly_detection/bottle/test/good/000.png', 'mvtec_anomaly_detection/bottle/test/good/005.png', 'mvtec_anomaly_detection/bottle/test/good/001.png', 'mvtec_anomaly_detection/bottle/test/good/008.png', 'mvtec_anomaly_detection/bottle/test/good/015.png']


In [97]:
import os
import csv
from torchvision.utils import save_image
from tqdm import tqdm

# Dataset a exportar
dataset = test_dataset  # o train_dataset

# Directorios de salida
output_dir = "export_test"
img_dir = os.path.join(output_dir, "imagenes")
csv_path = os.path.join(output_dir, "etiquetas.csv")

# Crear carpetas
os.makedirs(img_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

# Crear CSV
with open(csv_path, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['filename', 'label'])

    for idx, (img, label) in enumerate(tqdm(dataset)):
        filename = f"test_img_{idx:05d}.png"
        save_path = os.path.join(img_dir, filename)

        # Guardar imagen
        save_image(img, save_path)

        # Escribir en el CSV
        writer.writerow([filename, label])

print(f"\n✅ Imágenes exportadas en: {img_dir}")
print(f"✅ Etiquetas guardadas en: {csv_path}")


100%|██████████| 83/83 [00:04<00:00, 19.73it/s]


✅ Imágenes exportadas en: export_test/imagenes
✅ Etiquetas guardadas en: export_test/etiquetas.csv





In [98]:
import os
import csv
from torchvision.utils import save_image
from tqdm import tqdm

# Dataset a exportar
dataset = train_dataset  # o train_dataset

# Directorios de salida
output_dir = "export_train"
img_dir = os.path.join(output_dir, "imagenes")
csv_path = os.path.join(output_dir, "etiquetas.csv")

# Crear carpetas
os.makedirs(img_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

# Crear CSV
with open(csv_path, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['filename', 'label'])

    for idx, (img, label) in enumerate(tqdm(dataset)):
        filename = f"train_img_{idx:05d}.png"
        save_path = os.path.join(img_dir, filename)

        # Guardar imagen
        save_image(img, save_path)

        # Escribir en el CSV
        writer.writerow([filename, label])

print(f"\n✅ Imágenes exportadas en: {img_dir}")
print(f"✅ Etiquetas guardadas en: {csv_path}")


100%|██████████| 209/209 [00:10<00:00, 19.41it/s]


✅ Imágenes exportadas en: export_train/imagenes
✅ Etiquetas guardadas en: export_train/etiquetas.csv





In [99]:
import pandas as pd
import os

# Cargar etiquetas de train y test
df_train = pd.read_csv("export_train/etiquetas.csv")
df_test = pd.read_csv("export_test/etiquetas.csv")

# Unir ambos datasets
df = pd.concat([df_train, df_test], ignore_index=True)

# Codificar etiquetas: 'good' → 0, otros → 1
df["label"] = df["label"].apply(lambda x: 0 if x == "good" else 1)
df.to_csv("imagenes_reales/dataset_labels.csv", index=False)
print("✅ Archivo combinado guardado en: imagenes_reales/dataset_labels.csv")

# -------------------------
# Análisis del dataset
# -------------------------
# Cargar el dataset fusionado
df = pd.read_csv("imagenes_reales/dataset_labels.csv")

# Cálculos
normal_count = df[df["label"] == 0].shape[0]
anomalous_count = df[df["label"] == 1].shape[0]
total_count = len(df)

normal_percentage = (normal_count / total_count) * 100
anomalous_percentage = (anomalous_count / total_count) * 100

# Obtener nombres de imágenes anómalas
anomalous_filenames = df[df["label"] == 1]["filename"].tolist()

# Mostrar resultados
print(f"\nTotal de imágenes: {total_count}")
print(f"Imágenes normales (label=0): {normal_count} ({normal_percentage:.2f}%)")
print(f"Imágenes anómalas (label=1): {anomalous_count} ({anomalous_percentage:.2f}%)")
print("\nImágenes anómalas:")
print(anomalous_filenames)


✅ Archivo combinado guardado en: imagenes_reales/dataset_labels.csv

Total de imágenes: 292
Imágenes normales (label=0): 229 (78.42%)
Imágenes anómalas (label=1): 63 (21.58%)

Imágenes anómalas:
['test_img_00000.png', 'test_img_00001.png', 'test_img_00002.png', 'test_img_00003.png', 'test_img_00004.png', 'test_img_00005.png', 'test_img_00006.png', 'test_img_00007.png', 'test_img_00008.png', 'test_img_00009.png', 'test_img_00010.png', 'test_img_00011.png', 'test_img_00012.png', 'test_img_00013.png', 'test_img_00014.png', 'test_img_00015.png', 'test_img_00016.png', 'test_img_00017.png', 'test_img_00018.png', 'test_img_00019.png', 'test_img_00020.png', 'test_img_00021.png', 'test_img_00022.png', 'test_img_00023.png', 'test_img_00024.png', 'test_img_00025.png', 'test_img_00026.png', 'test_img_00027.png', 'test_img_00028.png', 'test_img_00029.png', 'test_img_00030.png', 'test_img_00031.png', 'test_img_00032.png', 'test_img_00033.png', 'test_img_00034.png', 'test_img_00035.png', 'test_img_00

FLATTEN

In [103]:
import numpy as np


def flatten_image(image_path):
   img = Image.open(image_path) 
   img_array = np.array(img)    # Convertir de una imaegn PIL a un array para luego hacer el flatten
   flattened_image = img_array.flatten()
   return flattened_image

In [104]:
import torch

flattened_images = []
labels = []

for _, row in df.iterrows():
    image_path = row['filename']
    flattened_image = flatten_image(f"imagenes_reales/{image_path}")  # debe devolver un tensor
    flattened_tensor = torch.from_numpy(flattened_image)
    flattened_images.append(flattened_tensor)
    labels.append(row['label'])

# Convertimos la lista de tensores a un solo tensor
image_tensor = torch.stack(flattened_images)  # shape: [N, ...]


print(image_tensor.shape)  # Verificar la forma del tensor de imágenes

torch.Size([292, 196608])


Modelo Preentrenado

In [106]:
import numpy as np
import pandas as pd
import torch
import torchvision.transforms as transforms
from torchvision import models
from PIL import Image


# Cargar modelo preentrenado ResNet-50
model = models.resnet50(pretrained=True)

# Eliminar la capa de clasificación (fc)
model = torch.nn.Sequential(*list(model.children())[:-1])

# Ver la arquitectura resultante
print(model)

model.eval()  # Poner en modo evaluación

# Transformaciones necesarias para ResNet-50
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Redimensionar a 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # se usan estos valores por que son los que se usaron
                                                                                 # el entrenamiento de la resnet
])

# Lista para almacenar los embeddings
embeddings = []

# Iterar sobre las imágenes del dataset
for index, row in df.iterrows():
    image_path = row['filename']
    image = Image.open(f"imagenes_reales/{image_path}")
    image = transform(image)
    image = image.unsqueeze(0)   #añadir una dimension mas para el batch

    with torch.no_grad():      #deshabilita el calculo del gradiente
        embedding = model(image)
    embedding = embedding.squeeze().cpu().numpy()   #squeeze: elimina dimensiones de tamaño 1 del tensor
                                                    #cpu: mueve el tensor embedding a la gpu
                                                    #numpy: convierte a un array de numpy
    embeddings.append(embedding)

#Crear un DataFrame con los embeddings
embeddings_df = pd.DataFrame(embeddings)
embeddings_df['label'] = df['label']  # Agregar las etiquetas al nuevo dataset

# Guardar el DataFrame con los embeddings
embeddings_df.to_csv("embeddings.csv", index=False)



Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)


Autoencoder

In [112]:
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import pandas as pd
import numpy as np
import ast
from tqdm import tqdm
from PIL import Image


import torch
import torch.nn as nn

# Definir el Autoencoder con espacio latente 16x1x1
class Autoencoder(nn.Module):
    def __init__(self, input_channels=3, output_channels=1):
        super(Autoencoder, self).__init__()

        # Encoder (Reducir hasta 16x1x1)
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=7, stride=7)
        )

        # Decoder (Expandir desde 16x1x1 hasta 1x28x28)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(16, 16, kernel_size=7, stride=7),
            nn.ReLU(),

            nn.ConvTranspose2d(16, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),

            nn.ConvTranspose2d(16, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),

            nn.ConvTranspose2d(32, output_channels, kernel_size=3, stride=1, padding=1),
            nn.Sigmoid()  # Normalizar salida [0,1]
        )

    def forward(self, x):
        encoded = self.encoder(x)

        decoded = self.decoder(encoded)
        return decoded

# Prueba con un tensor de entrada (batch_size=1, canal=1, 28x28)
x_test = torch.randn(1, 3, 28, 28)

# Inicializar modelo y obtener salida
model = Autoencoder()
output = model(x_test)
print(model.state_dict)
# Imprimir dimensiones de salida
print("Dimensión final de salida:", output.shape)  # Esperado: (1, 1, 28, 28)


<bound method Module.state_dict of Autoencoder(
  (encoder): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=7, stride=7, padding=0, dilation=1, ceil_mode=False)
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(16, 16, kernel_size=(7, 7), stride=(7, 7))
    (1): ReLU()
    (2): ConvTranspose2d(16, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (5): ReLU()
    (6): ConvTranspose2d(32, 1, kernel_size=

In [113]:
# Configurar dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hiperparámetros
batch_size = 128
learning_rate = 0.001
num_epochs = 10

# Transformaciones para MNIST
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((28,28)),
])

# Cargar MNIST
train_dataset = datasets.STL10(root='./data', split='train',        # Puedes usar 'train+unlabeled' también
    download=True,
    transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)   # se crea un iterador para entrenar

# Crear instancia del Autoencoder
autoencoder = Autoencoder().to(device)

# Definir función de pérdida y optimizador
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=learning_rate)

# Entrenar el Autoencoder
print("Entrenando Autoencoder en STL10...")
for epoch in range(num_epochs):
    running_loss = 0.0
    for images, _ in tqdm(train_loader, desc=f"Época {epoch+1}/{num_epochs}"):  #se itera sobre los batches utilizando el iterador antes mencionado
        images = images.to(device)

        # Forward
        outputs = autoencoder(images)      # se obtiene la salida del autonencoder
        loss = criterion(outputs, images)  # Comparar salida con entrada

        # Backpropagation
        optimizer.zero_grad()             #se ponen a cero los gradientes acumulados de los parametros
                                          # del modelo en el optimizador
        loss.backward()                   # se calcula el gradiente con respecto a los parámetros del modelo
        optimizer.step()                  # se actualizan los pesos del modelo utilizando el optimizador

        running_loss += loss.item()

    print(f"Época [{epoch+1}/{num_epochs}], Pérdida: {running_loss / len(train_loader):.6f}")

# Guardar modelo entrenado
model_path = "autoencoder_mnist.pth"
torch.save(autoencoder.state_dict(), model_path)
print(f"Modelo guardado en {model_path}")



Files already downloaded and verified
Entrenando Autoencoder en STL10...


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
Época 1/10: 100%|██████████| 40/40 [00:03<00:00, 11.96it/s]


Época [1/10], Pérdida: 0.056128


Época 2/10: 100%|██████████| 40/40 [00:03<00:00, 11.97it/s]


Época [2/10], Pérdida: 0.044193


Época 3/10: 100%|██████████| 40/40 [00:03<00:00, 12.15it/s]


Época [3/10], Pérdida: 0.036190


Época 4/10: 100%|██████████| 40/40 [00:03<00:00, 12.12it/s]


Época [4/10], Pérdida: 0.032786


Época 5/10: 100%|██████████| 40/40 [00:03<00:00, 12.18it/s]


Época [5/10], Pérdida: 0.031077


Época 6/10: 100%|██████████| 40/40 [00:03<00:00, 12.09it/s]


Época [6/10], Pérdida: 0.030069


Época 7/10: 100%|██████████| 40/40 [00:03<00:00, 12.00it/s]


Época [7/10], Pérdida: 0.029037


Época 8/10: 100%|██████████| 40/40 [00:03<00:00, 12.19it/s]


Época [8/10], Pérdida: 0.028558


Época 9/10: 100%|██████████| 40/40 [00:03<00:00, 12.43it/s]


Época [9/10], Pérdida: 0.028053


Época 10/10: 100%|██████████| 40/40 [00:03<00:00, 12.43it/s]

Época [10/10], Pérdida: 0.027759
Modelo guardado en autoencoder_mnist.pth





In [114]:
# ==========================
# Cargar modelo y evaluar solo el Encoder
# ==========================

model = Autoencoder()
model.load_state_dict(torch.load(model_path))
model.eval()  # Modo evaluación

# Extraer solo el encoder
encoder = autoencoder.encoder
encoder.to(device)
encoder.eval()

# Transformaciones para imágenes externas
transform = transforms.Compose([  # Asegurar que tenga 1 canal
    transforms.ToTensor(),
    transforms.Resize((28,28)),
])

# Lista para almacenar los embeddings
embeddings = []

# Iterar sobre las imágenes del dataset
for index, row in df.iterrows():
    image_path = row['filename']
    image = Image.open(f"imagenes_reales/{image_path}")  # Convertir a escala de grises
    image = transform(image)
    image = image.unsqueeze(0).to(device)   #añadir una dimension mas para el batch

    with torch.no_grad():      #deshabilita el calculo del gradiente
        embedding = encoder(image)
    embedding = embedding.squeeze().cpu().numpy()   #squeeze: elimina dimensiones de tamaño 1 del tensor
                                                    #cpu: mueve el tensor embedding a la gpu
                                                    #numpy: convierte a un array de numpy
    embeddings.append(embedding)

# Procesar todas las imágenes en el dataset externo
print("Extrayendo embeddings...")


# Convertir a DataFrame y guardar
embeddings_df = pd.DataFrame(embeddings)
embeddings_df['label'] = df['label']  # Agregar las etiquetas al nuevo dataset)

# Guardar en archivo CSV
output_file = "dataset_embeddings_encoder.csv"
embeddings_df.to_csv(output_file, index=False)

print(f"Embeddings guardados en {output_file}")

  model.load_state_dict(torch.load(model_path))


Extrayendo embeddings...
Embeddings guardados en dataset_embeddings_encoder.csv


RESNET AUTOENCODER

In [116]:
import torch
import torch.nn as nn
import torchvision.models as models


class ViewLayer(nn.Module):
    def __init__(self, shape):
        super().__init__()
        self.shape = shape

    def forward(self, x):
        return x.view(x.size(0), *self.shape)

class ResNetAutoencoder(nn.Module):
    def __init__(self):
        super().__init__()
        resnet = models.resnet50(pretrained=True)

        # Compacto: todas las capas convolucionales hasta layer4
        self.encoder = nn.Sequential(*list(resnet.children())[:8],
                                     nn.Conv2d(2048, 2048, kernel_size=4),
                                     nn.Flatten(),
                                     nn.Linear(2048, 1024),
                                        nn.ReLU(),
                                        nn.Dropout(0.2),
                                     nn.Linear(1024, 256),
                                        nn.ReLU(),
                                        nn.Dropout(0.2),
                                        nn.Linear(256, 64))



        # Decoder: invertir el proceso usando conv transpuestas
        self.decoder = nn.Sequential(
            # Primero, expandimos [B, 64] → [B, 256]
            nn.Linear(64, 256),
            nn.ReLU(),

            # Expandimos más: [B, 256] → [B, 2048]
            nn.Linear(256, 2048),
            nn.ReLU(),

            # Ahora lo preparamos para reshape: [B, 2048] → [B, 2048, 1, 1]
            ViewLayer((2048, 1, 1)),  # capa auxiliar para reshaping
            nn.ConvTranspose2d(2048, 2048, kernel_size=4),  # [B, 2048, 4, 4]
            nn.ReLU(),
            nn.ConvTranspose2d(2048, 1024, kernel_size=4, stride=2, padding=1),  # -> [B, 1024, 8, 8]
            nn.ReLU(),
            nn.ConvTranspose2d(1024, 512, kernel_size=4, stride=2, padding=1),   # -> [B, 512, 16, 16]
            nn.ReLU(),
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),    # -> [B, 256, 32, 32]
            nn.ReLU(),
            nn.ConvTranspose2d(256, 64, kernel_size=4, stride=2, padding=1),     # -> [B, 64, 64, 64]
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),       # -> [B, 3, 128, 128]
            nn.Sigmoid()  # para imágenes normalizadas en [0,1]
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Prueba con un tensor de entrada (batch_size=1, canal=3, 28x28)
x_test = torch.randn(1, 3, 128, 128)

# Inicializar modelo y obtener salida
model = ResNetAutoencoder()
output = model(x_test)
print(model.state_dict)
output_encoder=model.encoder(x_test)
# Imprimir dimensiones de salida
print("Dimensión final de salida:", output.shape)  # Esperado: (1, 1, 28, 28)
print("Dimensión final de salida encoder:", output_encoder.shape)  # Esperado: (1, 1, 28, 28)



<bound method Module.state_dict of ResNetAutoencoder(
  (encoder): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsa

In [None]:
# Configurar dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hiperparámetros
batch_size = 128
learning_rate = 0.001
num_epochs = 10

# Transformaciones para MNIST
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((128,128)),
])

# Cargar MNIST
train_dataset = datasets.STL10(root='./data', split='train',        # Puedes usar 'train+unlabeled' también
    download=True,
    transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)   # se crea un iterador para entrenar

# Crear instancia del Autoencoder
autoencoder = ResNetAutoencoder().to(device)

# Definir función de pérdida y optimizador
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=learning_rate)

# Entrenar el Autoencoder
print("Entrenando Autoencoder en STL10...")
for epoch in range(num_epochs):
    running_loss = 0.0
    for images, _ in tqdm(train_loader, desc=f"Época {epoch+1}/{num_epochs}"):  #se itera sobre los batches utilizando el iterador antes mencionado
        images = images.to(device)

        # Forward
        outputs = autoencoder(images)      # se obtiene la salida del autonencoder
        loss = criterion(outputs, images)  # Comparar salida con entrada

        # Backpropagation
        optimizer.zero_grad()             #se ponen a cero los gradientes acumulados de los parametros
                                          # del modelo en el optimizador
        loss.backward()                   # se calcula el gradiente con respecto a los parámetros del modelo
        optimizer.step()                  # se actualizan los pesos del modelo utilizando el optimizador

        running_loss += loss.item()

    print(f"Época [{epoch+1}/{num_epochs}], Pérdida: {running_loss / len(train_loader):.6f}")

# Guardar modelo entrenado
model_path = "autoencoder_resnet.pth"
torch.save(autoencoder.state_dict(), model_path)
print(f"Modelo guardado en {model_path}")



Files already downloaded and verified


KeyboardInterrupt: 

In [118]:
# ==========================
# Cargar modelo y evaluar solo el Encoder
# ==========================

# Cargar el modelo entrenado
model = ResNetAutoencoder()
model.load_state_dict(torch.load("autoencoder_resnet.pth"))
model.eval()  # Modo evaluación

# Extraer solo el encoder
encoder = model.encoder
encoder.to(device)
encoder.eval()

# Transformaciones para imágenes externas
transform = transforms.Compose([  # Asegurar que tenga 1 canal
    transforms.ToTensor(),
    transforms.Resize((128,128)),
    transforms.Lambda(lambda x: x.expand(3, -1, -1))  # convertir 1 canal → 3 canales
])

# Lista para almacenar los embeddings
embeddings = []

# Iterar sobre las imágenes del dataset
for index, row in df.iterrows():
    image_path = row['filename']
    image = Image.open(f"imagenes_reales/{image_path}")
    image = transform(image)
    image = image.unsqueeze(0).to(device)   #añadir una dimension mas para el batch

    with torch.no_grad():      #deshabilita el calculo del gradiente
        embedding = encoder(image)
    embedding = embedding.squeeze().cpu().numpy()   #squeeze: elimina dimensiones de tamaño 1 del tensor
                                                    #cpu: mueve el tensor embedding a la gpu
                                                    #numpy: convierte a un array de numpy
    embeddings.append(embedding)

# Procesar todas las imágenes en el dataset externo
print("Extrayendo embeddings...")


# Convertir a DataFrame y guardar
embeddings_df = pd.DataFrame(embeddings)
embeddings_df['label'] = df['label']  # Agregar las etiquetas al nuevo dataset)

# Guardar en archivo CSV
output_file = "dataset_embeddings_encoder_resnet.csv"
embeddings_df.to_csv(output_file, index=False)

print(f"Embeddings guardados en {output_file}")

  model.load_state_dict(torch.load("autoencoder_resnet.pth"))


Extrayendo embeddings...
Embeddings guardados en dataset_embeddings_encoder_resnet.csv


DISTRIBUCION DE LOS DATOS CON CENTROIDE

In [119]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def AnomalyDetector(file_path, k=3):  # k controla qué tan lejos consideramos anómalo
    df = pd.read_csv(file_path)

    embedding_columns = df.columns[:-1]  # nombres de columnas excepto la última
    embeddings = df[embedding_columns].values

    # Calcular el centroide
    centroid = np.mean(embeddings, axis=0)
    distances = np.linalg.norm(embeddings - centroid, axis=1)

    # Calcular la media y desviación estándar de las distancias
    mean_dist = np.mean(distances)
    std_dist = np.std(distances)

    # Definir umbral basado en k desviaciones estándar
    threshold = mean_dist + k * std_dist

    # Identificar anomalías
    anomalous_examples = df.loc[distances > threshold]

    return anomalous_examples


In [125]:
from sklearn.metrics import accuracy_score, f1_score

# ==========================
# Cargar los Embeddings
# ==========================
file_path = "embeddings.csv"  # Cambia con la ruta correcta


print("Anomaly Detector")
anomalies = AnomalyDetector(file_path,0.2)

print(anomalies)


true_labels = df["label"]
predicted_labels=np.where(df.index.isin(anomalies.index), 1, 0)
acc= accuracy_score(true_labels, predicted_labels)
f1 = f1_score(true_labels, predicted_labels)
print("Accuracy:", acc)
print("F1 Score:", f1)

Anomaly Detector
            0         1         2         3         4         5         6  \
2    1.138372  0.474673  0.076546  0.025930  1.464019  0.825359  0.570955   
12   0.943661  0.864050  0.040693  0.001843  1.591970  0.630615  0.596591   
22   1.237791  0.727628  0.093860  0.046641  1.735232  0.540565  0.719597   
25   0.942185  0.699878  0.330590  0.016602  1.539579  0.389360  0.932401   
48   1.024678  0.540794  0.227426  0.005122  1.402829  0.419207  0.703321   
74   0.925889  0.602573  0.190677  0.016207  1.304960  0.423691  0.759735   
75   1.033656  0.597987  0.192873  0.006274  1.343840  0.358058  0.689633   
85   1.005027  0.630002  0.228996  0.011468  1.362785  0.407044  0.849415   
102  0.781428  0.724238  0.282848  0.018604  1.465149  0.528963  0.589808   
106  0.818515  0.536316  0.197660  0.000691  1.279285  0.502947  0.689971   
130  1.003127  0.634560  0.340014  0.018634  1.474930  0.353921  0.769650   
138  0.984462  0.983883  0.259868  0.003474  1.491165  0.63

In [126]:
from sklearn.metrics import accuracy_score, f1_score

# ==========================
# Cargar los Embeddings
# ==========================

df = pd.DataFrame(embeddings)
df['label'] = labels  # última columna = etiqueta

df.to_csv("tus_embeddings.csv", index=False)

file_path = "tus_embeddings.csv"  # Cambia con la ruta correcta


print("Anomaly Detector")
anomalies = AnomalyDetector(file_path,0.05)

print(anomalies)


true_labels = df["label"]
predicted_labels=np.where(df.index.isin(anomalies.index), 1, 0)
acc= accuracy_score(true_labels, predicted_labels)
f1 = f1_score(true_labels, predicted_labels)
print("Accuracy:", acc)
print("F1 Score:", f1)

Anomaly Detector
             0         1          2          3          4          5  \
1    85.229996  4.675968 -53.141420  -9.637234 -214.40220 -18.426817   
4    87.173060  6.110612 -55.606724  -5.331498 -215.95230 -18.256690   
9    89.009200  6.396141 -53.122093  -3.047558 -215.05301 -20.361510   
12   87.688190  5.572481 -48.674942  -3.559689 -213.26302 -22.216146   
21   86.384250  4.861780 -50.735940  -6.241511 -210.22803 -20.669264   
..         ...       ...        ...        ...        ...        ...   
273  87.821340  4.975814 -48.848260  -3.769339 -209.41348 -22.609660   
276  82.375370  3.913394 -49.141052  -9.592222 -212.40822 -19.732508   
280  90.488710  7.060682 -51.876213   0.334909 -212.11804 -21.653784   
282  87.273094  4.883232 -44.166435   0.810245 -201.20331 -25.145105   
287  79.988800  2.955720 -48.465008 -12.517033 -212.16797 -19.065527   

             6          7          8          9  ...         55         56  \
1    -9.321927 -60.085340  88.949210 -16

METODO ANTONIO

In [127]:
import torch

class GDAOneClassTorch:
    def fit(self, X):
        """
        Ajusta el modelo GDA para una sola clase usando PyTorch.
        :param X: Tensor de forma (n_samples, n_features)
        """
        self.mu = X.mean(dim=0)
        self.centered = X - self.mu
        self.sigma = torch.matmul(self.centered.T, self.centered) / X.shape[0]     #calcula la matriz de covarianzas

        # Regularización para evitar matriz singular
        epsilon = 1e-5
        self.sigma += epsilon * torch.eye(self.sigma.shape[0])
        self.inv_sigma = torch.inverse(self.sigma)
        self.det_sigma = torch.det(self.sigma)

    def score(self, X):
        centered = X - self.mu
        tmp = torch.matmul(centered, self.inv_sigma)
        quad_form = (tmp * centered).sum(dim=1)
        return -0.5 * quad_form  # Cuanto más positivo, más anómalo

    def predict(self, X, threshold):
        scores = self.score(X)
        return (scores < threshold).int()  # umbral ajustado según el score, no la densidad


    def print_parameters(self):
        """
        Imprime el vector de medias y la matriz de covarianza.
        """
        print("Vector de medias (mu):")
        print(self.mu)
        print("\nMatriz de covarianza (sigma):")
        print(self.sigma)

    def print_score(self, X):
      """
      Imprime y retorna la densidad de cada punto en X.
      :param X: Tensor (n_samples, n_features)
      :return: Diccionario {índice: densidad}
      """
      probs = self.score(X)
      scores = dict()

      for i, p in enumerate(probs):
          valor = p.item()
          print(f"Densidad del punto {i}: {valor:.6f}")
          scores[i] = valor

      return scores


In [204]:
from sklearn.metrics import accuracy_score,f1_score
file_path = "embeddings.csv"

df = pd.read_csv(file_path)

df_normal = df[df['label'] == 0]
embedding_columns = df.columns[:-1]  # nombres de columnas excepto la última
embeddings_train = df_normal[embedding_columns].values
model = GDAOneClassTorch()
X_train = torch.tensor(embeddings_train, dtype=torch.float32)

model.fit(X_train)


embeddings = df[embedding_columns].values
X_test = torch.tensor(embeddings, dtype=torch.float32)


preds = model.predict(X_test, threshold=-120)
print(preds.shape)

# Paso 3: imprimir por pantalla (predicción y etiqueta real)
for i, (p, real) in enumerate(zip(preds, df["label"])):
    print(f"Ejemplo {i}: Predicción = {p.item()}, Etiqueta real = {real}")

true_labels = df["label"]
acc= accuracy_score(true_labels, preds)
f1 = f1_score(true_labels, preds)
print("Accuracy:", acc)
print("F1 Score:", f1)



torch.Size([292])
Ejemplo 0: Predicción = 0, Etiqueta real = 0
Ejemplo 1: Predicción = 0, Etiqueta real = 0
Ejemplo 2: Predicción = 0, Etiqueta real = 0
Ejemplo 3: Predicción = 0, Etiqueta real = 0
Ejemplo 4: Predicción = 0, Etiqueta real = 0
Ejemplo 5: Predicción = 0, Etiqueta real = 0
Ejemplo 6: Predicción = 0, Etiqueta real = 0
Ejemplo 7: Predicción = 0, Etiqueta real = 0
Ejemplo 8: Predicción = 0, Etiqueta real = 0
Ejemplo 9: Predicción = 0, Etiqueta real = 0
Ejemplo 10: Predicción = 0, Etiqueta real = 0
Ejemplo 11: Predicción = 0, Etiqueta real = 0
Ejemplo 12: Predicción = 0, Etiqueta real = 0
Ejemplo 13: Predicción = 0, Etiqueta real = 0
Ejemplo 14: Predicción = 0, Etiqueta real = 0
Ejemplo 15: Predicción = 0, Etiqueta real = 0
Ejemplo 16: Predicción = 0, Etiqueta real = 0
Ejemplo 17: Predicción = 0, Etiqueta real = 0
Ejemplo 18: Predicción = 0, Etiqueta real = 0
Ejemplo 19: Predicción = 0, Etiqueta real = 0
Ejemplo 20: Predicción = 0, Etiqueta real = 0
Ejemplo 21: Predicción = 0

In [203]:
model.print_parameters()
model.print_score(X_train)
l=model.print_score(X_test)
score_ordenado = dict(sorted(l.items(), key=lambda item: item[1]))
print(score_ordenado)
print(preds)

Vector de medias (mu):
tensor([1.0490, 0.6470, 0.1405,  ..., 0.1763, 0.7875, 0.6380])

Matriz de covarianza (sigma):
tensor([[ 1.3818e-02, -1.8820e-03, -9.0921e-04,  ...,  4.7015e-04,
          5.0818e-03, -1.3453e-03],
        [-1.8820e-03,  8.7657e-03,  1.1247e-03,  ..., -1.1773e-04,
         -5.1083e-03,  2.6859e-07],
        [-9.0921e-04,  1.1247e-03,  4.4453e-03,  ...,  1.0937e-03,
         -3.8740e-03,  3.6731e-03],
        ...,
        [ 4.7015e-04, -1.1773e-04,  1.0937e-03,  ...,  2.0578e-03,
         -9.2642e-04,  2.5566e-03],
        [ 5.0818e-03, -5.1083e-03, -3.8740e-03,  ..., -9.2642e-04,
          1.8766e-02, -2.5094e-03],
        [-1.3453e-03,  2.6859e-07,  3.6731e-03,  ...,  2.5566e-03,
         -2.5094e-03,  1.8755e-02]])
Densidad del punto 0: -113.495369
Densidad del punto 1: -113.356911
Densidad del punto 2: -113.547157
Densidad del punto 3: -113.380867
Densidad del punto 4: -113.599045
Densidad del punto 5: -113.521675
Densidad del punto 6: -113.325233
Densidad del 

In [156]:
# Obtener todos los scores como diccionario {i: score}
score_dict = model.print_score(X_test)

# Filtrar solo los scores de los ejemplos anómalos (label == 1)
scores_anomalos = {
    i: score
    for i, score in score_dict.items()
    if true_labels[i] == 1
}

# Mostrar
print("🔍 SCORES de anomalías (label == 1):")
for i, score in scores_anomalos.items():
    print(f"Índice {i} | Score: {score:.4f} | Predicción: {preds[i].item()}")


Densidad del punto 0: -12.179932
Densidad del punto 1: -2.937256
Densidad del punto 2: -5.107498
Densidad del punto 3: -8.025604
Densidad del punto 4: -8.244873
Densidad del punto 5: -10.072571
Densidad del punto 6: -3.276413
Densidad del punto 7: -1.492676
Densidad del punto 8: -10.793457
Densidad del punto 9: -15.905029
Densidad del punto 10: -4.435852
Densidad del punto 11: -2.109253
Densidad del punto 12: -43.795410
Densidad del punto 13: -6.423645
Densidad del punto 14: -10.168457
Densidad del punto 15: -14.024567
Densidad del punto 16: -3.153534
Densidad del punto 17: -0.692627
Densidad del punto 18: -5.493408
Densidad del punto 19: -4.665894
Densidad del punto 20: -9.551514
Densidad del punto 21: -7.324463
Densidad del punto 22: -2.110840
Densidad del punto 23: -2.855759
Densidad del punto 24: -9.329498
Densidad del punto 25: -17.702148
Densidad del punto 26: -1.195801
Densidad del punto 27: -5.386841
Densidad del punto 28: -43.843750
Densidad del punto 29: -16.066895
Densidad d

NUEVO METODO ANTONIO

In [157]:
class ZScoreThreshold:
    def fit(self, X):
        """
        Calcula media y desviación estándar por columna.
        X: tensor (n_samples, n_features)
        """
        self.mu = X.mean(dim=0)
        self.std = X.std(dim=0)
        self.std[self.std == 0] = 1e-6  # evita divisiones por cero

    def score(self, X):
        """
        Retorna el número de columnas fuera de lo normal por muestra.
        """
        z_scores = (X - self.mu) / self.std
        return (z_scores.abs() > 1.96).sum(dim=1)  # puedes parametrizar el umbral

    def predict(self, X, num_columns=1):
        """
        Marca como anomalía si más de x columnas estan fuera del estandar.
        """

        scores = self.score(X)
        return (scores > num_columns).int()
    # Puedes ajustar el número de columnas según lo que consideres anómalo

    


In [201]:
from sklearn.metrics import accuracy_score,f1_score
file_path = "dataset_embeddings_encoder_resnet.csv"

df = pd.read_csv(file_path)

df_normal = df[df['label'] == 0]
embedding_columns = df.columns[:-1]  # nombres de columnas excepto la última
embeddings_train = df_normal[embedding_columns].values
model = ZScoreThreshold()
X_train = torch.tensor(embeddings_train, dtype=torch.float32)

model.fit(X_train)


embeddings = df[embedding_columns].values
X_test = torch.tensor(embeddings, dtype=torch.float32)


preds = model.predict(X_test, num_columns=8)     #PARA LOS EMBEDDINGS DEL PREENTRENADO 300,350,400 ESTA BIEN, 200, 180 MUY BIEN
print(preds.shape)                               #PARA LOS EMBEDDINGS DEL AUTOENCODER 2,3
                                                 #PARA LOS EMBEDDINGS DEL AUTOENCODER RESNET 15,12,10,13,14,11

# Paso 3: imprimir por pantalla (predicción y etiqueta real)
for i, (p, real) in enumerate(zip(preds, df["label"])):
    print(f"Ejemplo {i}: Predicción = {p.item()}, Etiqueta real = {real}")

true_labels = df["label"]
acc= accuracy_score(true_labels, preds)
f1 = f1_score(true_labels, preds, average='macro')  
print("Accuracy:", acc)
print("F1 Score:", f1)



torch.Size([292])
Ejemplo 0: Predicción = 0, Etiqueta real = 0
Ejemplo 1: Predicción = 0, Etiqueta real = 0
Ejemplo 2: Predicción = 0, Etiqueta real = 0
Ejemplo 3: Predicción = 0, Etiqueta real = 0
Ejemplo 4: Predicción = 0, Etiqueta real = 0
Ejemplo 5: Predicción = 0, Etiqueta real = 0
Ejemplo 6: Predicción = 0, Etiqueta real = 0
Ejemplo 7: Predicción = 0, Etiqueta real = 0
Ejemplo 8: Predicción = 0, Etiqueta real = 0
Ejemplo 9: Predicción = 0, Etiqueta real = 0
Ejemplo 10: Predicción = 0, Etiqueta real = 0
Ejemplo 11: Predicción = 0, Etiqueta real = 0
Ejemplo 12: Predicción = 1, Etiqueta real = 0
Ejemplo 13: Predicción = 0, Etiqueta real = 0
Ejemplo 14: Predicción = 0, Etiqueta real = 0
Ejemplo 15: Predicción = 0, Etiqueta real = 0
Ejemplo 16: Predicción = 0, Etiqueta real = 0
Ejemplo 17: Predicción = 0, Etiqueta real = 0
Ejemplo 18: Predicción = 0, Etiqueta real = 0
Ejemplo 19: Predicción = 0, Etiqueta real = 0
Ejemplo 20: Predicción = 0, Etiqueta real = 0
Ejemplo 21: Predicción = 0