### Imports

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import os
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

### Load constants

In [24]:
from const import *

## Train data augmentation

In [25]:
TRAIN_TRANSFORM = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(3),
    transforms.RandomAffine(degrees=0, translate=(0.05, 0.05), scale=(0.95, 1.05)),
    transforms.ColorJitter(brightness=0.05, contrast=0.05, saturation=0.05, hue=0.05),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [26]:
class WaldoImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = [os.path.join(root_dir, fname) for fname in os.listdir(root_dir) if fname.endswith(IMAGE_EXTENSION)]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, image

def get_dataloaders(root_dir, transform, batch_size):
    dataset = WaldoImageDataset(root_dir=root_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [27]:
class ConvAutoencoder(nn.Module):
    def __init__(self, latent_dim: int):
        super().__init__()
        self.latent_dim = latent_dim
        self.flat_size = 64 * 4 * 4

        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(2, 2),
        )

        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2),
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 3, kernel_size=2, stride=2),
            nn.Tanh()
        )


        self.fc_encoder = nn.Linear(self.flat_size, latent_dim)
        self.fc_decoder = nn.Linear(latent_dim, self.flat_size)

    def forward(self, x):
        img_height, img_width = x.size(2), x.size(3)

        x = self.encoder(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc_encoder(x)
        x = self.fc_decoder(x)
        x = x.view(x.size(0), 64, 4, 4)
        x = self.decoder(x)
        x = F.interpolate(x, size=(img_height, img_width), mode='bilinear', align_corners=False)
        return x

In [28]:
BATCH_SIZE: int = 64
LEARNING_RATE: float = 1e-3
NUM_EPOCHS: int = 50
NOISE_FACTOR: float = 0.05
WEIGHT_DECAY: float = 1e-5

In [29]:
def add_gaussian_noise(images, noise_factor):
    noisy_images = images + noise_factor * torch.randn_like(images)
    noisy_images = torch.clamp(noisy_images, 0., 1.)
    return noisy_images

In [30]:
def train(model, data_root, transform, epochs, device):
    dataset = WaldoImageDataset(root_dir=data_root, transform=transform)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
    model.to(device)

    for epoch in range(epochs):
        for images in dataloader:
            if isinstance(images, (list, tuple)):
                images = images[0]
            images = add_gaussian_noise(images, NOISE_FACTOR)
            images = images.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

    print("Training complete.")
    return model

In [31]:
trained_model = train(ConvAutoencoder(latent_dim=IMAGE_SIZE), TRAIN_DATA_PATH, TRAIN_TRANSFORM, NUM_EPOCHS, DEVICE)

Epoch [1/50], Loss: 0.1039
Epoch [2/50], Loss: 0.0925
Epoch [3/50], Loss: 0.0887
Epoch [4/50], Loss: 0.0746
Epoch [5/50], Loss: 0.0771
Epoch [6/50], Loss: 0.0781
Epoch [7/50], Loss: 0.0813
Epoch [8/50], Loss: 0.0738
Epoch [9/50], Loss: 0.0717
Epoch [10/50], Loss: 0.0719
Epoch [11/50], Loss: 0.0686
Epoch [12/50], Loss: 0.0671
Epoch [13/50], Loss: 0.0675
Epoch [14/50], Loss: 0.0693
Epoch [15/50], Loss: 0.0701
Epoch [16/50], Loss: 0.0723
Epoch [17/50], Loss: 0.0647
Epoch [18/50], Loss: 0.0614
Epoch [19/50], Loss: 0.0632
Epoch [20/50], Loss: 0.0535
Epoch [21/50], Loss: 0.0628
Epoch [22/50], Loss: 0.0636
Epoch [23/50], Loss: 0.0743
Epoch [24/50], Loss: 0.0516
Epoch [25/50], Loss: 0.0678
Epoch [26/50], Loss: 0.0614
Epoch [27/50], Loss: 0.0571
Epoch [28/50], Loss: 0.0717
Epoch [29/50], Loss: 0.0653
Epoch [30/50], Loss: 0.0644
Epoch [31/50], Loss: 0.0505
Epoch [32/50], Loss: 0.0530
Epoch [33/50], Loss: 0.0634
Epoch [34/50], Loss: 0.0678
Epoch [35/50], Loss: 0.0572
Epoch [36/50], Loss: 0.0551
E

## Test data preparations

In [None]:
TEST_TRANSFORM = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [None]:
def get_test_dataloaders(normal_path, anomaly_path, transform, batch_size):
    normal_dataset = WaldoImageDataset(root_dir=normal_path, transform=transform)
    normal_loader = DataLoader(normal_dataset, batch_size=batch_size, shuffle=False)
    
    anomaly_dataset = WaldoImageDataset(root_dir=anomaly_path, transform=transform)
    anomaly_loader = DataLoader(anomaly_dataset, batch_size=batch_size, shuffle=False)
    
    return normal_loader, anomaly_loader

In [None]:
def calculate_reconstruction_errors(model, dataloader, device):
    model.eval()
    all_errors = []
    
    with torch.no_grad():
        for images, _ in dataloader:
            images = images.to(device)
            
            reconstructed = model(images)
            mse_loss = F.mse_loss(reconstructed, images, reduction='none')
            reconstruction_error = mse_loss.sum(dim=[1, 2, 3]) 
            
            all_errors.extend(reconstruction_error.cpu().numpy())
            
    return np.array(all_errors)

In [None]:
from scipy.stats import scoreatpercentile
from sklearn.metrics import roc_auc_score, f1_score, confusion_matrix

def evaluate_model(model, normal_loader, anomaly_loader, threshold, device):
    # Calculate errors for both classes
    normal_errors = calculate_reconstruction_errors(model, normal_loader, device)
    anomaly_errors = calculate_reconstruction_errors(model, anomaly_loader, device)
    
    # Concatenate all errors and create true labels
    all_errors = np.concatenate([normal_errors, anomaly_errors])
    # True labels: 0 for normal, 1 for anomaly
    true_labels = np.concatenate([np.zeros(len(normal_errors)), np.ones(len(anomaly_errors))])
    
    # Predict labels based on the threshold
    predicted_labels = (all_errors > threshold).astype(int)
    
    # --- Metrics ---
    
    # 1. AUC-ROC Score (best measure for imbalanced binary classification)
    auc_roc = roc_auc_score(true_labels, all_errors)
    
    # 2. Confusion Matrix (to see False Positives/Negatives)
    cm = confusion_matrix(true_labels, predicted_labels)
    tn, fp, fn, tp = cm.ravel()
    
    # 3. F1 Score (harmonic mean of precision and recall)
    f1 = f1_score(true_labels, predicted_labels)
    
    print("\n--- Model Performance ---")
    print(f"AUC-ROC Score: {auc_roc:.4f} (Closer to 1.0 is better)")
    print(f"F1 Score: {f1:.4f}")
    print(f"\nConfusion Matrix:\n{cm}")
    print(f"True Positives (Correct Waldo patches): {tp}")
    print(f"False Positives (NotWaldo flagged as Waldo): {fp}")
    print(f"False Negatives (Waldo missed): {fn}")
    print(f"True Negatives (Correct NotWaldo patches): {tn}")

normal_test_loader, anomaly_test_loader = get_test_dataloaders(
    TRAIN_DATA_PATH, 
    TEST_DATA_PATH, 
    TEST_TRANSFORM, 
    BATCH_SIZE
)


evaluate_model(trained_model, normal_test_loader, anomaly_test_loader, 0.9, DEVICE)