In [None]:
import torch
from torchvision import models, datasets, transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import utils_a
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt

utils_a.set_seed(42)
device = utils_a.set_device()

### PREPROCESSING

In [None]:
dataset_folder = "mri_brain_tumor"

In [None]:
#mean, std = utils_a.compute_mean_std(train_loader)
mean, std = torch.tensor([0.1855]*3), torch.tensor([0.1813]*3) # precomputed, to save time

print(f"Mean: {mean}")
print(f"Std: {std}")

In [None]:
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),              # Flip orizzontale
    transforms.RandomVerticalFlip(p=0.5),                # Flip verticale
    transforms.RandomRotation(degrees=15),               # Rotazione casuale di 15 gradi
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Variazione casuale dei colori
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),  # Crop e ridimensionamento casuale
    transforms.ToTensor(),                               # Converti in tensore
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3)),  # Erasing casuale
    transforms.Normalize(mean=mean, std=std)             # Normalizzazione
])

# Transformation for testing dataset without data augmentation
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),         # Resize the image to 224x224
    transforms.ToTensor(),                 # Convert the image to a tensor
    transforms.Normalize(mean=mean, std=std)  # Normalize the image
])

# Apply the transformations to the datasets
train_dataset = datasets.ImageFolder(root=f'{dataset_folder}/Training', transform=train_transform)
test_dataset = datasets.ImageFolder(root=f'{dataset_folder}/Testing', transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=1)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=True, num_workers=1)

### PRETRAINED CNN IMPORT

In [None]:
import torch.optim.lr_scheduler as lr_scheduler

model = utils_a.CNN(num_classes=4).to(device)
state_dict = torch.load('weights/cnn_10epochs.pth', map_location=torch.device('cpu'), weights_only=True)

try:
    # Carica lo stato del modello
    model.load_state_dict(state_dict)
    model.to(device)
    print("Model loaded successfully.")
except RuntimeError as e:
    print(f"RuntimeError during model loading: {e}")
except Exception as e:
    print(f"Exception: {e}")
    
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params/(10**6):.3f} M")
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)

In [None]:
classes = train_dataset.classes
all_labels, all_preds = utils_a.test(model, device, test_loader, 'test set')
print(classification_report(all_labels, all_preds, target_names=classes))

### POISONING

In [None]:
### SYSTEMATIC POISONING AND EVALUATION
target_classes = [0, 1, 2, 3]
num_epochs = 10
results = []
results_resnet = []
batch_size = 16
rate = 0.3

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=1)

In [None]:
for target in target_classes:
    # Avvelenamento del dataset di addestramento
    poisoned_train_dataset = utils_a.targeted_poison_dataset(train_dataset, target_label=target)
    poisoned_train_loader = DataLoader(poisoned_train_dataset, batch_size=batch_size, shuffle=True)
    print(f"Train set poisoned targeting class {train_dataset.classes[target]}!")
    
    # Avvelenamento del dataset di test
    poisoned_test_dataset = utils_a.random_poison_dataset(test_dataset, poison_rate=rate, exclude_labels=[target])
    poisoned_test_loader = DataLoader(poisoned_test_dataset, batch_size=batch_size, shuffle=True)
    print(f"Test set poisoned with rate {rate}, excluding class {train_dataset.classes[target]}!")

    # Definizione del modello e dell'ottimizzatore
    model = utils_a.CNN(num_classes=4).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    # Addestramento del modello avvelenato
    for epoch in range(num_epochs):
        utils_a.train(poisoned_train_loader, model, optimizer, criterion, device, epoch, num_epochs)
        val_loss = utils_a.validate(model, poisoned_test_loader, criterion, device)
        scheduler.step(val_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}')

    # Test del modello sui dati puliti
    all_labels_clean, all_preds_clean = utils_a.test(model, device, test_loader, 'test clean')
    report_clean = classification_report(all_labels_clean, all_preds_clean, target_names=train_dataset.classes, output_dict=True, zero_division=0)

    # Test del modello sui dati avvelenati
    all_labels_poisoned, all_preds_poisoned = utils_a.test(model, device, poisoned_test_loader, 'test poisoned')
    report_poisoned = classification_report(all_labels_poisoned, all_preds_poisoned, target_names=train_dataset.classes, output_dict=True, zero_division=0)

    # Raccolta dei risultati
    results.append({
        "target_class": train_dataset.classes[target],
        "clean_report": report_clean,
        "poisoned_report": report_poisoned,
    })

### RESNET PRETRAINED

In [None]:
mean = torch.tensor([0.485, 0.456, 0.406])
std = torch.tensor([0.229, 0.224, 0.225])

model_pre = models.resnet50(weights='IMAGENET1K_V1')
model_pre.fc = nn.Linear(model_pre.fc.in_features, 4)
model_pre = model.to(device)

total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params/(10**6):.3f} M")

In [None]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=1)

In [None]:
all_labels_clean, all_preds_clean = utils_a.test(model_pre, device, test_loader, 'test clean')
report_clean = classification_report(all_labels_clean, all_preds_clean, target_names=classes, output_dict=True, zero_division=0)

all_labels_poisoned, all_preds_poisoned = utils_a.test(model_pre, device, poisoned_test_loader, 'test poisoned')
report_poisoned = classification_report(all_labels_poisoned, all_preds_poisoned, target_names=classes, output_dict=True, zero_division=0)

In [None]:
for target in target_classes:
    # Avvelenamento del dataset di addestramento
    poisoned_train_dataset = utils_a.targeted_poison_dataset(train_dataset, target_label=target)
    poisoned_train_loader = DataLoader(poisoned_train_dataset, batch_size=8, shuffle=True)
    print(f"Train set poisoned targeting class {train_dataset.classes[target]}!")

    # Avvelenamento del dataset di test
    poisoned_test_dataset = utils_a.random_poison_dataset(test_dataset, poison_rate=rate, exclude_labels=[target])
    poisoned_test_loader = DataLoader(poisoned_test_dataset, batch_size=8, shuffle=True)
    print(f"Test set poisoned with rate {rate}, excluding class {train_dataset.classes[target]}!")

    # Definizione del modello e ottimizzazione
    model = models.resnet50(weights='IMAGENET1K_V1')
    model.fc = nn.Linear(model.fc.in_features, 4)
    model = model.to(device)

    # Blocca i layer di base e allena solo l'ultimo blocco e il fully connected
    for param in model.parameters():
        param.requires_grad = False
    for param in model.layer4.parameters():
        param.requires_grad = True
    for param in model.fc.parameters():
        param.requires_grad = True

    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.00001, weight_decay=1e-5)

    # Addestramento del modello
    for epoch in range(num_epochs):
        utils_a.train(poisoned_train_loader, model, optimizer, criterion, device, epoch, num_epochs)
        val_loss = utils_a.validate(model, test_loader, criterion, device)
        scheduler.step(val_loss)
        print(f'Epoch {epoch + 1}/{num_epochs}, Validation Loss: {val_loss:.4f}')

    # Test del modello sui dati puliti
    all_labels_clean, all_preds_clean = utils_a.test(model, device, test_loader, 'test clean')
    report_clean = classification_report(all_labels_clean, all_preds_clean, target_names=train_dataset.classes,
                                         output_dict=True, zero_division=0)

    # Test del modello sui dati avvelenati
    all_labels_poisoned, all_preds_poisoned = utils_a.test(model, device, test_loader, 'test poisoned')
    report_poisoned = classification_report(all_labels_poisoned, all_preds_poisoned,
                                            target_names=train_dataset.classes, output_dict=True, zero_division=0)

    # Raccolta dei risultati
    results_resnet.append({
        "target_class": train_dataset.classes[target],
        "clean_report": report_clean,
        "poisoned_report": report_poisoned,
    })

In [None]:
# Funzione di visualizzazione dei risultati
def plot_metrics(results, metric='f1-score'):
    targets = [result['target_class'] for result in results]
    clean_metrics = [result['clean_report']['weighted avg'][metric] for result in results]
    poisoned_metrics = [result['poisoned_report']['weighted avg'][metric] for result in results]
    plt.figure(figsize=(12, 6))
    plt.plot(targets, clean_metrics, label='Clean', marker='o')
    plt.plot(targets, poisoned_metrics, label='Poisoned', marker='x')
    plt.xlabel('Target Class')
    plt.ylabel(f'{metric.capitalize()} Score')
    plt.title(f'Comparison of {metric.capitalize()} Scores')
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
plot_metrics(results, metric='f1-score')

In [None]:
plot_metrics(results_resnet, metric='f1-score')