<a href="https://colab.research.google.com/github/mich1803/AnomalyDetection-AE-LHC/blob/main/AD_AE_LHC_PhysAI2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Group Project 2024/25





## Task



Allenare un AE per Anomaly Detection in grado di riconoscere segnali prodotti da jeti adronici anomali in un reivelatore di fisica delle alte energie.

Alle energie estreme del Large Hadron Collider, particelle massive possono essere prodotte con un tale boost di Lorentz da far sì che i loro decadimenti in adroni (getti adronici) risultino così collimati che le particelle prodotte si sovrappongono. Determinare se la sottostruttura di un getto osservato sia dovuta a una singola particella di bassa massa oppure a molteplici prodotti di decadimento di una particella di massa elevata è un problema cruciale nell’analisi dei dati del LHC. Gli approcci tradizionali si basano su osservabili di alto livello costruite a partire da modelli teorici di deposizione di energia nei calorimetri e da parametri delle tracce cariche ricostruite nel tracciatore interno, ma la complessità dei dati rende questo compito un candidato ideale per l’applicazione di strumenti di deep learning. I costituenti dei getti possono infatti essere rappresentati come immagini 2D in cui ogni pixel rappresenta una delle celle sensibili del calorimetro, e il contenuto della cella una misura dell'energia o della quantità di moto depositata nella cella.

**Dataset:**

I dati del progetto sono nella forma di immagini 2D di dimensione (100,100), ogni cella rappresenta l'energia depositata in quella cella dalle particelle del jet adronico corrispondente. Ci sono due tipologie di jet adronici consider ati: *jet normali*, costituiti dalla adronizzazione di un quark leggero o gluone, e *jet anomali* (disponibili in una frazione incognita solo nel test set) costituiti dall'adronizzazione dei quark nel decadimento $t \to Wb \to qq'b$, in cui a causa del boost del quark top, i tre quark nello stato finale sono parzialmente sovrapposti.

* *Normal data dataset:* 12k jet rappresentati come histogrammi 2D della quantità di moto depositata in ciascuno dei 100x100 bin di una finestra quadrata nel piano ($\theta,\phi$) centrato intorno all'asse del jet.

* *Test dataset:*
due dataset costituiti ciascuno da 3k eventi, contenenti jet normali e jet anomali in una frazione relativa icognita da determinare. Nel primo dataset (*_high*) la frazione incognita di eventi anomali è $\ge 55\%$. Nel secondo dataset (*_low*) la frazione incognita di eventi anomali incognita è $\le 45\%$.
Potete utilizzare questa informazione per verificare che le vostre predizioni soddisfino la relazione $f_{high} > f_{low}$.

I dati sono forniti come array numpy in un file numpy compresso (.npz), leggibile con l'esempio di codice che segue:


```
import numpy as np

f_train = np.load('Normal_data.npz')
f_test_l = np.load('Test_data_low.npz')
f_test_h = np.load('Test_data_high.npz')

normal_data = f_train['normal_data']
test_data_l = f_test_l['test_data']
test_data_h = f_test_h['test_data']

print(normal_data.shape)
print(test_data_l.shape)
print(test_data_h.shape)
```

**Per scaricare i dataset:**
* dati normali:
```
!wget http://giagu.web.cern.ch/giagu/CERN/P2025/Normal_data.npz
```
* dati anomali:
```
!wget http://giagu.web.cern.ch/giagu/CERN/P2025/<Identificativo Dataset>/Test_data_low.npz
!wget http://giagu.web.cern.ch/giagu/CERN/P2025/<Identificativo Dataset>/Test_data_high.npz
```
```
# <Identificativo Dataset> dal foglio excel prenotazione gruppi
```


**Obiettivi minimi del progetto (potete a vostro piacimento aggiungere ulteriori analisi/studi:**

1. Plot della rappresentazione latente delle immagini di test fatto con riduzione dimensionale.
2. Stima della frazione di eventi anomali presente nei due Test dataset, tenendo conto che la di procedura di stima deve garantire che la rate di falsi postivi sia inferiore a circa il $10\%$ (FPR $\le \sim 10\%$).
3. Clustering dello spazio (per esempio usando un algoritmo GMM).
4. Misura della purezza dei cluster rispetto alle label assegnate in anomaly score.


**Nota Importante:**

Il notebook deve essere compilato come una relazione scientifica del progetto, quindi deve contenere sia il codice (leggibile e riproducibile), i risultati in termini di grafici e tabelle numeriche, e il testo che illustra la strategia ottenuta, le scelte compiute, e i risultati ottenuti.

## Introduzione

In questo progetto proponiamo un approccio basato su Autoencoder convolutivi per l'identificazione di anomalie nei segnali prodotti da jet adronici in un rivelatore del Large Hadron Collider. Le immagini utilizzate rappresentano mappe bidimensionali della quantità di moto depositata nei calorimetri, e vengono trattate come input per reti neurali convolutive.

L'obiettivo principale è confrontare due strategie per la rilevazione di anomalie:
1. Una basata sull'errore di ricostruzione (Mean Squared Error, MSE) tra immagine originale e immagine ricostruita;
2. Una basata su un'analisi di clustering dello spazio latente (ottenuto tramite codifica dell'Autoencoder), in particolare utilizzando un algoritmo Gaussian Mixture Model (GMM).

Per svolgere tale confronto sono stati addestrati due modelli di Autoencoder con architettura convolutiva:

- **Modello S**: spazio latente a 3 dimensioni, direttamente visualizzabile in 3D.
- **Modello L**: spazio latente a 32 dimensioni, successivamente ridotto a 3D tramite PCA per scopi di visualizzazione e clustering.

Per entrambi i modelli, l'addestramento è stato effettuato unicamente su eventi normali, mentre il test è stato condotto su due dataset contenenti una frazione ignota di eventi anomali.

**Autori**  
Michele Magrini (2066963)  
Julian Hendrix (Matricola)

## Codice

In [None]:
#@title Import librerie, settaggio seed per la riproducibilità
!pip install -q pytorch-lightning

import numpy as np
import torch
import random
import os
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
import pytorch_lightning as pl
import torch.nn.functional as F
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping
from sklearn.mixture import GaussianMixture
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Impostazioni per la riproducibilità
SEED = 42
pl.seed_everything(SEED, workers=True)
random.seed(SEED)
np.random.seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)

# Device (CPU/GPU)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {DEVICE} device")

# Download dei dataset
ID = "G15"
!wget -nc http://giagu.web.cern.ch/giagu/CERN/P2025/Normal_data.npz
!wget -nc http://giagu.web.cern.ch/giagu/CERN/P2025/{ID}/Test_data_low.npz
!wget -nc http://giagu.web.cern.ch/giagu/CERN/P2025/{ID}/Test_data_high.npz

### Caricamento dei Dataset/Dataloader e visualizzazione preliminare

In [None]:
#@title Creazione Datasets e DataLoader PyTorch

f_train = np.load('Normal_data.npz')
f_test_l = np.load('Test_data_low.npz')
f_test_h = np.load('Test_data_high.npz')

normal_data = f_train['normal_data']
test_data_l = f_test_l['test_data']
test_data_h = f_test_h['test_data']

# Normalizzazione [0,1] usando solo il training set
train_min = np.min(normal_data)
train_max = np.max(normal_data)

def normalize(data, data_min, data_max):
    return (data - data_min) / (data_max - data_min + 1e-8)

normal_data = normalize(normal_data, train_min, train_max)
test_data_l = normalize(test_data_l, train_min, train_max)
test_data_h = normalize(test_data_h, train_min, train_max)

# Conversione in tensori torch
normal_tensor = torch.tensor(normal_data, dtype=torch.float32).unsqueeze(1)
test_tensor_l = torch.tensor(test_data_l, dtype=torch.float32).unsqueeze(1)
test_tensor_h = torch.tensor(test_data_h, dtype=torch.float32).unsqueeze(1)

# Split: 90% train, 10% val
train_len = int(0.9 * len(normal_tensor))
val_len = len(normal_tensor) - train_len
full_dataset = TensorDataset(normal_tensor)
train_dataset, val_dataset = random_split(full_dataset, [train_len, val_len])

# Dataset
test_dataset_l = TensorDataset(test_tensor_l)
test_dataset_h = TensorDataset(test_tensor_h)

# DataLoader
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader_l = DataLoader(test_dataset_l, batch_size=BATCH_SIZE, shuffle=False)
test_loader_h = DataLoader(test_dataset_h, batch_size=BATCH_SIZE, shuffle=False)

# Info
print(f"Train: {len(train_dataset)} | Val: {len(val_dataset)} | Test Low: {len(test_dataset_l)} | Test High: {len(test_dataset_h)}")
# Print Sample Shape
print(f"Train Sample Shape: {train_dataset[0][0].shape}")

datasets = {
    "Train (Normal)": normal_data,
    "Test Low": test_data_l,
    "Test High": test_data_h
}
print("\nLa normalizzazione tra 0 e 1 è basata sui dati di training:")
for name, data in datasets.items():
    print(f"{name}: min = {np.min(data):.4f}, max = {np.max(data):.4f}")


In [None]:
#@title plot di alcuni eventi del dataset

fig, axes = plt.subplots(nrows=3, ncols=3, figsize=(10, 10))
fig.suptitle("Primi 3 esempi per dataset (100x100)\nScala dei colori locale per ciascuna immagine", fontsize=14)

for row_idx, (label, data) in enumerate(datasets.items()):
    for col_idx in range(3):
        ax = axes[row_idx, col_idx]
        img = data[col_idx]
        im = ax.imshow(img, cmap="inferno", origin="lower", vmin=np.min(img), vmax=np.max(img))
        ax.axis("off")
        if col_idx == 0:
            ax.set_title(label, fontsize=12, loc='left')

fig.subplots_adjust(left=0.05, right=0.95, top=0.9, bottom=0.05, wspace=0.1, hspace=0.2)
plt.show()

### Definizione del Modello e dei Parametri


In [None]:
#@title Modello ConvAE

class ConvAE(pl.LightningModule):
    def __init__(self, config):
        super().__init__()
        self.save_hyperparameters(config)
        self.lr = config.get("lr", 1e-3)
        self.dropout = config.get("dropout", 0.0)
        hidden_dim = config["hidden_dim"]

        # Encoder
        in_channels = 1
        encoder_layers = []
        for conv in config["in_conv"]:
            encoder_layers.append(nn.Conv2d(in_channels, **{k: v for k, v in conv.items() if k != "output_padding"}))
            encoder_layers.append(nn.ReLU())
            if self.dropout > 0:
                encoder_layers.append(nn.Dropout2d(self.dropout))
            in_channels = conv["out_channels"]
        self.encoder_conv = nn.Sequential(*encoder_layers)

        # Calcola shape del bottleneck convoluzionale
        dummy_input = torch.zeros(1, 1, 100, 100)
        with torch.no_grad():
            dummy_out = self.encoder_conv(dummy_input)
            self.conv_shape = dummy_out.shape[1:]  # (C, H, W)
            self.flat_dim = dummy_out.numel()


        # Bottleneck FC
        self.encoder_fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flat_dim, hidden_dim)
        )
        self.decoder_fc = nn.Sequential(
            nn.Linear(hidden_dim, self.flat_dim),
            nn.ReLU()
        )

        # Decoder
        decoder_layers = []
        in_channels = config["in_conv"][-1]["out_channels"]
        for conv in config["out_conv"]:
            decoder_layers.append(nn.ConvTranspose2d(in_channels, **conv))
            decoder_layers.append(nn.ReLU())
            if self.dropout > 0:
                decoder_layers.append(nn.Dropout2d(self.dropout))
            in_channels = conv["out_channels"]
        decoder_layers[-2] = nn.Sigmoid()  # ultima attivazione
        self.decoder_conv = nn.Sequential(*decoder_layers)

    def forward(self, x):
        x = self.encoder_conv(x)
        x = self.encoder_fc(x)
        x = self.decoder_fc(x)
        x = x.view(-1, *self.conv_shape)  # usa shape salvata
        x = self.decoder_conv(x)
        return x

    def training_step(self, batch, batch_idx):
        x = batch[0]
        x_hat = self(x)
        loss = F.mse_loss(x_hat, x)
        self.log("train_loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x = batch[0]
        x_hat = self(x)
        loss = F.mse_loss(x_hat, x)
        self.log("val_loss", loss, prog_bar=True)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)


In [None]:
#@title Parametri S e L

# Small (3D spazio latente)
params_S = {
    "hidden_dim": 3,
    "in_conv": [
        {"out_channels": 16, "kernel_size": 4, "stride": 2, "padding": 1},  # 100 → 50
        {"out_channels": 32, "kernel_size": 4, "stride": 2, "padding": 1},  # 50 → 25
        {"out_channels": 64, "kernel_size": 4, "stride": 2, "padding": 1}   # 25 → 13
    ],
    "out_conv": [
        {"out_channels": 32, "kernel_size": 4, "stride": 2, "padding": 1, "output_padding": 1},  # 13 → 25
        {"out_channels": 16, "kernel_size": 4, "stride": 2, "padding": 1, "output_padding": 0},  # 25 → 50
        {"out_channels": 1,  "kernel_size": 4, "stride": 2, "padding": 1, "output_padding": 0}   # 50 → 100
    ],
    "dropout": 0.2,
    "lr": 1e-3
}

# Large (32D spazio latente)
params_L = {**params_S, "hidden_dim": 32}

MAX_EPOCHS = 50
PATIENCE = MAX_EPOCHS // 5


### Modello S -> 3D

In [None]:
# Inizializza il modello
from torchsummary import summary
model_S = ConvAE(params_S).to(DEVICE)

summary(model_S, input_size=(1, 100, 100))

In [None]:
#@title Training del modello Small

model_S = ConvAE(params_S)

early_stop_cb = EarlyStopping(
    monitor="val_loss",
    patience=PATIENCE,
    mode="min"
)

# Trainer
trainer = Trainer(
    max_epochs=MAX_EPOCHS,
    accelerator="auto",
    callbacks=[early_stop_cb],
    log_every_n_steps=10
)

# Training
trainer.fit(model_S, train_loader, val_loader)


In [None]:
#@title Confronto degli errori di ricostruzione: Test LOW, Test HIGH

def compute_reconstruction_errors(model, dataloader):
    model.eval()
    errors = []
    with torch.no_grad():
        for batch in dataloader:
            x = batch[0].to(model.device)
            x_hat = model(x)
            mse = F.mse_loss(x_hat, x, reduction='none')
            mse_per_image = mse.view(mse.size(0), -1).mean(dim=1)
            errors.extend(mse_per_image.cpu().numpy())
    return np.array(errors)

# Calcolo errori
train_errors = compute_reconstruction_errors(model_S, train_loader)
val_errors = compute_reconstruction_errors(model_S, val_loader)
test_errors_l = compute_reconstruction_errors(model_S, test_loader_l)
test_errors_h = compute_reconstruction_errors(model_S, test_loader_h)

print(f"Train MSE: {np.mean(train_errors):.7f} | Val MSE: {np.mean(val_errors):.7f}")
print(f"Test LOW MSE: {np.mean(test_errors_l):.7f} | Test HIGH MSE: {np.mean(test_errors_h):.7f}")

In [None]:
#@title (TASK 1) Plot 3D delle rappresentazioni latenti di: Train+Val, Test LOW, Test HIGH

def extract_latents(dataloader, model):
    latents = []
    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            x = batch[0].to(model.device)
            z = model.encoder_fc(model.encoder_conv(x))
            latents.append(z.cpu().numpy())
    return np.concatenate(latents, axis=0)

# Estrazione
latents_train = extract_latents(train_loader, model_S)
latents_val = extract_latents(val_loader, model_S)
latents_low = extract_latents(test_loader_l, model_S)
latents_high = extract_latents(test_loader_h, model_S)

# Funzione per creare uno scatter3D Plotly
def plot_latents_plotly(latents, name, color):
    return go.Scatter3d(
        x=latents[:, 0], y=latents[:, 1], z=latents[:, 2],
        mode='markers',
        marker=dict(size=2, color=color, opacity=0.6),
        name=name
    )

# Subplots Plotly
fig = make_subplots(
    rows=1, cols=3,
    specs=[[{'type': 'scatter3d'}]*3],
    subplot_titles=["Train", "Test LOW", "Test HIGH"]
)

# Tracce
fig.add_trace(plot_latents_plotly(latents_train, "Train", "deepskyblue"), row=1, col=1)
fig.add_trace(plot_latents_plotly(latents_val, "Val", "steelblue"), row=1, col=1)
fig.add_trace(plot_latents_plotly(latents_low, "Test LOW", "orange"), row=1, col=2)
fig.add_trace(plot_latents_plotly(latents_high, "Test HIGH", "purple"), row=1, col=3)

# Layout
fig.update_layout(
    height=600, width=1800,
    title_text="Confronto Spazio Latente - Train+Val vs Test",
    showlegend=True
)

fig.show()


**Visualizzazione dello Spazio Latente**

Per visualizzare la rappresentazione appresa dallo spazio latente, abbiamo proiettato i dati del training set e dei due test set (LOW e HIGH) nello spazio latente di dimensione 3 utilizzando direttamente le coordinate apprese dall'autoencoder (modello `Small`).

**Risultati e Interpretazione**

- **Train**: I dati del training set (solo eventi normali) risultano fortemente concentrati su una regione molto compressa dello spazio latente.
- **Test LOW & HIGH**: Anche i dati di test (contenenti anomalie) si distribuiscono lungo la stessa direzione latente. Non si osserva una separazione chiara tra eventi normali e anomali nello spazio latente a 3 dimensioni.

Il fatto che i dati (train, test low e test high) risultino **allineati su una linea** suggerisce che:
- Lo spazio latente appreso è **troppo compresso** (il modello ha appreso solo una variabilità principale).
- Il modello riesce a **ricostruire bene anche gli eventi anomali**, rendendo difficile distinguere le anomalie solo dalla struttura latente.


In [None]:
#@title (TASK 2) Stima delle anomalie nei Test LOW e Test HIGH (tramite errore MSE)

threshold = np.percentile(val_errors, 90)
print(f"Soglia impostata (90° percentile val set): {threshold:.6f}")

# Stima della frazione anomala
frac_anom_low = np.mean(test_errors_l > threshold)
frac_anom_high = np.mean(test_errors_h > threshold)

# Plot
fig, axs = plt.subplots(1, 3, figsize=(20, 5), sharey=True, sharex=True)
datasets = [val_errors, test_errors_l, test_errors_h]
titles = ["Validation", "Test LOW", "Test HIGH"]
colors = ['steelblue', 'orange', 'purple']

for ax, data, title, color in zip(axs, datasets, titles, colors):
    n, bins, patches = ax.hist(data, bins=100, color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    for patch, left in zip(patches, bins[:-1]):
        if left >= threshold:
            patch.set_edgecolor('red')
            patch.set_linewidth(2)
    ax.set_title(title)
    ax.set_xlabel("MSE")
    ax.legend()

plt.suptitle("Distribuzione degli errori di ricostruzione (MSE) per immagine", fontsize=14)
plt.tight_layout()
plt.show()

print(f"\nFrazione stimata di anomalie:")
print(f"Test LOW : {frac_anom_low:.3f}")
print(f"Test HIGH: {frac_anom_high:.3f}")
print(f"Verifica relazione: f_high > f_low → {frac_anom_high > frac_anom_low}")


In [None]:
#@title (TASK 3) Clustering delle rappresentazioni latenti con GMM

"""# Funzione per rietichettare i cluster in base all'errore medio
def reorder_gmm_labels(errors, labels):
    unique_labels = np.unique(labels)
    cluster_means = [errors[labels == i].mean() for i in unique_labels]
    sorted_idx = np.argsort(cluster_means)
    label_map = {unique_labels[sorted_idx[0]]: 0, unique_labels[sorted_idx[1]]: 1}
    return np.vectorize(label_map.get)(labels)"""

# Standardizzazione
scaler = StandardScaler()
latents_train_scaled = scaler.fit(latents_train)
latents_low_scaled = scaler.transform(latents_low)
latents_high_scaled = scaler.transform(latents_high)

# Concatenazione dei dati
latents_combined = np.concatenate([latents_low_scaled, latents_high_scaled], axis=0)

# Fit del GMM su entrambi
gmm = GaussianMixture(n_components=2, random_state=SEED)
gmm.fit(latents_combined)

# Predizione separata mantenendo coerenza dei cluster
labels_low = gmm.predict(latents_low_scaled)
labels_high = gmm.predict(latents_high_scaled)

"""# Ordinamento delle etichette in base all'errore medio
labels_low = reorder_gmm_labels(test_errors_l, labels_low)
labels_high = reorder_gmm_labels(test_errors_h, labels_high)"""

# Plot interattivo con subplots
fig = make_subplots(
    rows=1, cols=2,
    specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]],
    subplot_titles=["GMM Clustering - Test LOW", "GMM Clustering - Test HIGH"]
)

fig.add_trace(go.Scatter3d(
    x=latents_low_scaled[:, 0], y=latents_low_scaled[:, 1], z=latents_low_scaled[:, 2],
    mode='markers', marker=dict(size=2, color=labels_low, colorscale='Viridis', opacity=0.6)
), row=1, col=1)

fig.add_trace(go.Scatter3d(
    x=latents_high_scaled[:, 0], y=latents_high_scaled[:, 1], z=latents_high_scaled[:, 2],
    mode='markers', marker=dict(size=2, color=labels_high, colorscale='Viridis', opacity=0.6)
), row=1, col=2)

fig.update_layout(height=600, width=1000, title_text="GMM Clustering nello Spazio Latente", showlegend=False)
fig.show()

In [None]:
#@title Errore di Ricostruzione VS Clustering dei Test Set

fig, axs = plt.subplots(2, 2, figsize=(12, 8), sharex=True, sharey=True)

# Prima riga: errori complessivi
datasets = [test_errors_l, test_errors_h]
titles = ["Test LOW - Anomaly Score", "Test HIGH - Anomaly Score"]
colors = ['orange', 'purple']

for ax, data, title, color in zip(axs[0], datasets, titles, colors):
    n, bins, patches = ax.hist(data, bins=50, color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    for patch, left in zip(patches, bins[:-1]):
        if left >= threshold:
            patch.set_edgecolor('red')
            patch.set_linewidth(2)
    ax.set_title(title)
    ax.set_xlabel("MSE")
    ax.legend()

# Seconda riga: divisione per cluster
cluster_info = [
    (test_errors_l, labels_low, "Test LOW - Cluster", ['goldenrod', 'orangered']),
    (test_errors_h, labels_high, "Test HIGH - Cluster", ['indigo', 'mediumorchid'])
]

for ax, (errors, labels, title_prefix, cluster_colors) in zip(axs[1], cluster_info):
    for cluster_id, color in zip([0, 1], cluster_colors):
        cluster_errors = errors[labels == cluster_id]
        ax.hist(cluster_errors, bins=50, alpha=0.7, label=f"{title_prefix} {cluster_id}", color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    ax.set_xlabel("MSE")
    ax.legend()
    ax.set_title(f"{title_prefix} - Anomaly Score by Cluster")

plt.suptitle("Distribuzione degli errori di ricostruzione (MSE) e divisione per cluster", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

print()

# Percentuali di ciascun cluster
def print_cluster_stats(labels, name):
    unique, counts = np.unique(labels, return_counts=True)
    total = counts.sum()
    print(f"{name}:")
    for u, c in zip(unique, counts):
        print(f"  Cluster {u}: {c} ({c / total * 100:.2f}%)")
    print()

print_cluster_stats(labels_low, f"Test LOW ({frac_anom_low:.3f}% di anomalie MSE)")
print_cluster_stats(labels_high, f"Test HIGH ({frac_anom_high:.3f}% di anomalie MSE)")

In [None]:
#@title (TASK 4) Misura della purezza dei cluster rispetto alla classificazione fatta con l'MSE

def purity_score(cluster_labels, anomaly_labels):
    N = len(cluster_labels)
    purity_sum = 0
    for cl in np.unique(cluster_labels):
        idx = (cluster_labels == cl)
        # Quanti punti "anomali" e "normali" nel cluster
        counts = np.bincount(anomaly_labels[idx], minlength=2)
        purity_sum += counts.max()
    return purity_sum / N

# 1. Calcola anomaly labels
anomaly_labels_low = (test_errors_l < threshold).astype(int)   # 1 = normale
anomaly_labels_high = (test_errors_h < threshold).astype(int)

# 2. Calcola purity per LOW e HIGH
purity_low = purity_score(labels_low, anomaly_labels_low)
purity_high = purity_score(labels_high, anomaly_labels_high)

print(f"Purity nei cluster:")
print(f"  Test LOW : {purity_low:.3f}")
print(f"  Test HIGH: {purity_high:.3f}")


### Modello L -> 32D

In [None]:
model_L = ConvAE(params_L).to(DEVICE)

# Supponendo input (1, 100, 100)
summary(model_L, input_size=(1, 100, 100))

In [None]:
#@title Training del modello Large

# Inizializza il modello
model_L = ConvAE(params_L)

early_stop_cb = EarlyStopping(
    monitor="val_loss",
    patience=PATIENCE,
    mode="min"
)

# Trainer
trainer = Trainer(
    max_epochs=MAX_EPOCHS,
    accelerator="auto",
    callbacks=[early_stop_cb],
    log_every_n_steps=10
)

# Training
trainer.fit(model_L, train_loader, val_loader)


In [None]:
#@title Confronto degli errori di ricostruzione: Test LOW, Test HIGH

def compute_reconstruction_errors(model, dataloader):
    model.eval()
    errors = []
    with torch.no_grad():
        for batch in dataloader:
            x = batch[0].to(model.device)
            x_hat = model(x)
            mse = F.mse_loss(x_hat, x, reduction='none')
            mse_per_image = mse.view(mse.size(0), -1).mean(dim=1)
            errors.extend(mse_per_image.cpu().numpy())
    return np.array(errors)

# Calcolo errori
train_errors = compute_reconstruction_errors(model_L, train_loader)
val_errors = compute_reconstruction_errors(model_L, val_loader)
test_errors_l = compute_reconstruction_errors(model_L, test_loader_l)
test_errors_h = compute_reconstruction_errors(model_L, test_loader_h)

print(f"Train MSE: {np.mean(train_errors):.7f} | Val MSE: {np.mean(val_errors):.7f}")
print(f"Test LOW MSE: {np.mean(test_errors_l):.7f} | Test HIGH MSE: {np.mean(test_errors_h):.7f}")

In [None]:
#@title (TASK 1) Plot 3D (Riduzione da 32D) delle rappresentazioni latenti di: Train+Val, Test LOW, Test HIGH

# Estrazione
latents_train = extract_latents(train_loader, model_L)
latents_val = extract_latents(val_loader, model_L)
latents_low = extract_latents(test_loader_l, model_L)
latents_high = extract_latents(test_loader_h, model_L)

# Standardizzazione
scaler = StandardScaler()
latents_train_std = scaler.fit_transform(latents_train)
latents_val_std = scaler.transform(latents_val)
latents_low_std = scaler.transform(latents_low)
latents_high_std = scaler.transform(latents_high)

# PCA → 3D
pca = PCA(n_components=3, random_state=SEED)
latents_train_pca = pca.fit_transform(latents_train)
latents_val_pca = pca.transform(latents_val)
latents_low_pca = pca.transform(latents_low)
latents_high_pca = pca.transform(latents_high)

# Subplots Plotly
fig = make_subplots(
    rows=1, cols=3,
    specs=[[{'type': 'scatter3d'}]*3],
    subplot_titles=["Train", "Test LOW", "Test HIGH"]
)

# Tracce
fig.add_trace(plot_latents_plotly(latents_train_pca, "Train", "deepskyblue"), row=1, col=1)
fig.add_trace(plot_latents_plotly(latents_val_pca, "Val", "steelblue"), row=1, col=1)
fig.add_trace(plot_latents_plotly(latents_low_pca, "Test LOW", "orange"), row=1, col=2)
fig.add_trace(plot_latents_plotly(latents_high_pca, "Test HIGH", "purple"), row=1, col=3)

# Layout
fig.update_layout(
    height=600, width=1800,
    title_text="Spazio Latente (PCA) - Train+Val vs Test",
    showlegend=True
)

fig.show()


In [None]:
#@title (TASK 2) Stima delle anomalie nei Test LOW e Test HIGH (tramite errore MSE)

threshold = np.percentile(val_errors, 90)
print(f"Soglia impostata (90° percentile val set): {threshold:.6f}")

# Stima della frazione anomala
frac_anom_low = np.mean(test_errors_l > threshold)
frac_anom_high = np.mean(test_errors_h > threshold)

# Plot
fig, axs = plt.subplots(1, 3, figsize=(20, 5), sharey=True, sharex=True)
datasets = [val_errors, test_errors_l, test_errors_h]
titles = ["Validation", "Test LOW", "Test HIGH"]
colors = ['steelblue', 'orange', 'purple']

for ax, data, title, color in zip(axs, datasets, titles, colors):
    n, bins, patches = ax.hist(data, bins=100, color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    for patch, left in zip(patches, bins[:-1]):
        if left >= threshold:
            patch.set_edgecolor('red')
            patch.set_linewidth(2)
    ax.set_title(title)
    ax.set_xlabel("MSE")
    ax.legend()

plt.suptitle("Distribuzione degli errori di ricostruzione (MSE) per immagine", fontsize=14)
plt.tight_layout()
plt.show()

print(f"\nFrazione stimata di anomalie:")
print(f"Test LOW : {frac_anom_low:.3f}")
print(f"Test HIGH: {frac_anom_high:.3f}")
print(f"Verifica relazione: f_high > f_low → {frac_anom_high > frac_anom_low}")


In [None]:
#@title (TASK 3) Clustering delle rappresentazioni latenti con GMM

"""# Funzione per rietichettare i cluster in base all'errore medio
def reorder_gmm_labels(errors, labels):
    cluster_means = [errors[labels == i].mean() for i in np.unique(labels)]
    new_order = np.argsort(cluster_means)
    label_map = {old: new for new, old in enumerate(new_order)}
    return np.vectorize(label_map.get)(labels)"""

# GMM clustering
# Concatenazione dei dati
latents_combined = np.concatenate([latents_low_std, latents_high_std], axis=0)

# Fit del GMM su entrambi
gmm = GaussianMixture(n_components=2, random_state=SEED)
gmm.fit(latents_combined)

# Predizione separata mantenendo coerenza dei cluster
labels_low = gmm.predict(latents_low_std)
labels_high = gmm.predict(latents_high_std)


"""# Rietichettatura coerente
labels_low = reorder_gmm_labels(test_errors_l, labels_low)
labels_high = reorder_gmm_labels(test_errors_h, labels_high)"""

# Plot interattivo
fig = make_subplots(
    rows=1, cols=2,
    specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]],
    subplot_titles=["GMM Clustering - Test LOW", "GMM Clustering - Test HIGH"]
)

fig.add_trace(go.Scatter3d(
    x=latents_low_pca[:, 0], y=latents_low_pca[:, 1], z=latents_low_pca[:, 2],
    mode='markers',
    marker=dict(size=2, color=labels_low, colorscale='Viridis', opacity=0.6),
    name="LOW"
), row=1, col=1)

fig.add_trace(go.Scatter3d(
    x=latents_high_pca[:, 0], y=latents_high_pca[:, 1], z=latents_high_pca[:, 2],
    mode='markers',
    marker=dict(size=2, color=labels_high, colorscale='Viridis', opacity=0.6),
    name="HIGH"
), row=1, col=2)

fig.update_layout(
    height=600, width=1000,
    title_text="GMM Clustering nello Spazio Latente",
    showlegend=False
)
fig.show()

In [None]:
#@title Errore di Ricostruzione VS Clustering dei Test Set

fig, axs = plt.subplots(2, 2, figsize=(12, 8), sharex=True, sharey=True)

# Prima riga: errori complessivi
datasets = [test_errors_l, test_errors_h]
titles = ["Test LOW - Anomaly Score", "Test HIGH - Anomaly Score"]
colors = ['orange', 'purple']

for ax, data, title, color in zip(axs[0], datasets, titles, colors):
    n, bins, patches = ax.hist(data, bins=50, color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    for patch, left in zip(patches, bins[:-1]):
        if left >= threshold:
            patch.set_edgecolor('red')
            patch.set_linewidth(2)
    ax.set_title(title)
    ax.set_xlabel("MSE")
    ax.legend()

# Seconda riga: divisione per cluster
cluster_info = [
    (test_errors_l, labels_low, "Test LOW - Cluster", ['goldenrod', 'orangered']),
    (test_errors_h, labels_high, "Test HIGH - Cluster", ['indigo', 'mediumorchid'])
]

for ax, (errors, labels, title_prefix, cluster_colors) in zip(axs[1], cluster_info):
    for cluster_id, color in zip([0, 1], cluster_colors):
        cluster_errors = errors[labels == cluster_id]
        ax.hist(cluster_errors, bins=50, alpha=0.7, label=f"{title_prefix} {cluster_id}", color=color)
    ax.axvline(threshold, color='red', linestyle='--', label='Soglia 90° percentile')
    ax.set_xlabel("MSE")
    ax.legend()
    ax.set_title(f"{title_prefix} - Anomaly Score by Cluster")

plt.suptitle("Distribuzione degli errori di ricostruzione (MSE) e divisione per cluster", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

print()

print_cluster_stats(labels_low, f"Test LOW ({frac_anom_low:.3f}% di anomalie MSE)")
print_cluster_stats(labels_high, f"Test HIGH ({frac_anom_high:.3f}% di anomalie MSE)")

In [None]:
#@title (TASK 4) Misura della purezza dei cluster rispetto alla classificazione fatta con l'MSE

# 1. Calcola anomaly labels
anomaly_labels_low = (test_errors_l > threshold).astype(int)   # 1 = anomalia
anomaly_labels_high = (test_errors_h > threshold).astype(int)

# 2. Calcola purity per LOW e HIGH
purity_low = purity_score(labels_low, anomaly_labels_low)
purity_high = purity_score(labels_high, anomaly_labels_high)

print(f"Purity nei cluster:")
print(f"  Test LOW : {purity_low:.3f}")
print(f"  Test HIGH: {purity_high:.3f}")
