# Allenamento in Google Colab di CustomResNet50

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
PATH_FOLDER = '/content/drive/MyDrive/Colab Notebooks/Pneumonia Detection/'

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import datasets
import os
import matplotlib.pyplot as plt
from torchvision import models
from torchvision.models import ResNet50_Weights

# Parametri dell'allenamento
BATCH_SIZE = 128
LEARNING_RATE = 0.0002
EPOCHS = 10
OPTIMIZER = 'Adam'
LOSS_FUNCTION = 'Categorical Cross-Entropy'
H, W = 224, 244
MODEL = 'CustomResNet50'

# Transformation for Data Augmentation
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.9, 1.1)),
    transforms.Resize((H, W)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: add_gaussian_noise(x) if torch.rand(1).item() > 0.5 else x),
])

# Dataset e DataLoader
path_to_train = os.path.join(PATH_FOLDER, "Data", "train")
path_to_test = os.path.join(PATH_FOLDER, "Data", "test")
train_dataset = datasets.ImageFolder(path_to_train, transform=transform)
test_dataset = datasets.ImageFolder(path_to_test, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


class CustomResNet50(nn.Module):
    def __init__(self, num_classes=3):
        super(CustomResNet50, self).__init__()
        # Carica ResNet-50 pre-addestrato su ImageNet
        self.base_model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        self.name = 'CustomResNet50'

        # Congela i primi 50 layer (tutti tranne gli ultimi blocchi)
        layers_to_freeze = 50
        for i, child in enumerate(self.base_model.children()):
            if i < layers_to_freeze:
                for param in child.parameters():
                    param.requires_grad = False

        # Modifica della parte finale della rete
        num_features = self.base_model.fc.in_features  # Dimensione dell'output dell'ultimo layer convoluzionale

        # Definisci il nuovo classificatore
        self.base_model.fc = nn.Sequential(
            nn.Linear(num_features, 512),  # Primo Fully Connected
            nn.ReLU(),  # Funzione di attivazione
            nn.Dropout(0.5),  # Dropout per ridurre l'overfitting
            nn.Linear(512, num_classes),  # Strato Fully Connected finale
            nn.Softmax(dim=1)  # Strato softmax per probabilità
        )

    def forward(self, x):
        return self.base_model(x)


# Funzione per aggiungere rumore gaussiano
def add_gaussian_noise(img, mean=0, std=0.25):
    noise = torch.randn_like(img) * std + mean
    noisy_img = img + noise
    noisy_img = torch.clamp(noisy_img, 0., 1.)  # Assicuriamoci che i valori siano tra 0 e 1
    return noisy_img


# Funzione di allenamento
def train_net(model, train_loader, optimizer, criterion, epochs, save_dir='./training_results'):
    # Creazione della cartella per salvare i modelli e grafici
    os.makedirs(save_dir, exist_ok=True)

    history = {'loss': [], 'accuracy': []}
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct, total = 0, 0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total
        history['loss'].append(epoch_loss)
        history['accuracy'].append(epoch_acc)

        # Stampa e salvataggio ogni epoca
        message = f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}"
        print(message)

        # Salvataggio del messaggio nel file di log
        log_file_path = os.path.join(save_dir, f"{model.name}_training_log.txt")
        save_log_to_file(log_file_path, message)

        # Salvataggio del modello ad ogni epoca
        torch.save(model.state_dict(), os.path.join(save_dir, f"{model.name}_model_epoch_{epoch + 1}.pth"))

        # Salvataggio dei grafici alla fine di ogni epoca
        plot_training_curves(history, save_dir, epoch + 1)

    return history


def save_log_to_file(file_path, message):
    """
    Salva un messaggio in un file di testo. Crea il file se non esiste.

    Args:
        file_path (str): Il percorso del file in cui salvare il messaggio.
        message (str): Il messaggio da salvare.
    """
    # Verifica se il file esiste, altrimenti crealo
    if not os.path.exists(file_path):
        with open(file_path, 'w') as file:  # 'w' crea il file vuoto
            pass  # File creato vuoto, pronto per l'aggiunta di messaggi

    # Aggiunge il messaggio al file
    with open(file_path, 'a') as file:
        file.write(message + '\n')


# Funzione per i grafici
def plot_training_curves(history, save_dir, epoch, name = MODEL):
    epochs = range(1, len(history['loss']) + 1)  # Epoche da 1 a N

    # Creazione dei grafici
    plt.figure(figsize=(12, 6))

    # Diagramma della perdita
    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['loss'], label='Training Loss', color='red', marker='o')
    plt.title('Loss Diagram')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.grid(True)
    plt.legend()

    # Diagramma dell'accuratezza
    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['accuracy'], label='Training Accuracy', color='blue', marker='o')
    plt.title('Accuracy Diagram')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()

    # Salvataggio dei grafici
    plt.savefig(os.path.join(save_dir, f"{name}_training_curves_epoch_{epoch}.png"))
    plt.close()

In [None]:
# Definizione dell'ottimizzatore e della loss function
model = CustomResNet50()
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# Allenamento della rete
train_net(model, train_loader, optimizer, criterion, epochs=EPOCHS, save_dir=os.path.join(PATH_FOLDER, f'{MODEL}_training_results'))

Epoch 1/100, Loss: 0.7160, Accuracy: 0.7234
Epoch 2/100, Loss: 0.4571, Accuracy: 0.8542
Epoch 3/100, Loss: 0.3550, Accuracy: 0.8871
Epoch 4/100, Loss: 0.3108, Accuracy: 0.8946
Epoch 5/100, Loss: 0.2894, Accuracy: 0.9014
Epoch 6/100, Loss: 0.2694, Accuracy: 0.9114
Epoch 7/100, Loss: 0.2468, Accuracy: 0.9121
Epoch 8/100, Loss: 0.2274, Accuracy: 0.9182
Epoch 9/100, Loss: 0.2175, Accuracy: 0.9230
Epoch 10/100, Loss: 0.2114, Accuracy: 0.9285
Epoch 11/100, Loss: 0.2006, Accuracy: 0.9304
Epoch 12/100, Loss: 0.1952, Accuracy: 0.9318
Epoch 13/100, Loss: 0.2023, Accuracy: 0.9347
Epoch 14/100, Loss: 0.1862, Accuracy: 0.9360
Epoch 15/100, Loss: 0.1866, Accuracy: 0.9360
Epoch 16/100, Loss: 0.1817, Accuracy: 0.9357
Epoch 17/100, Loss: 0.1774, Accuracy: 0.9392


# Applicazione tecniche di OverSampling o DownSampling, per dati sbilanciati

In [None]:
from torch.utils.data import WeightedRandomSampler, Subset
import numpy as np

# Funzione per il downsampling
def apply_downsampling(dataset, class_counts):
    """
    Riduce il dataset downsampling le classi maggioritarie per bilanciare le proporzioni.
    Args:
        dataset (Dataset): Dataset completo.
        class_counts (dict): Dizionario con la distribuzione delle classi nel formato {classe: conteggio}.
    Returns:
        Subset: Dataset ridotto.
    """
    min_count = min(class_counts.values())  # Dimensione della classe minoritaria
    indices = []
    for class_idx, count in class_counts.items():
        class_indices = [i for i, (_, label) in enumerate(dataset) if label == class_idx]
        downsampled_indices = np.random.choice(class_indices, min_count, replace=False)
        indices.extend(downsampled_indices)
    return Subset(dataset, indices)

# Funzione per l'oversampling
def apply_oversampling(dataset, class_counts):
    """
    Applica l'oversampling per le classi minoritarie utilizzando WeightedRandomSampler.
    Args:
        dataset (Dataset): Dataset completo.
        class_counts (dict): Dizionario con la distribuzione delle classi nel formato {classe: conteggio}.
    Returns:
        WeightedRandomSampler: Sampler pesato per DataLoader.
    """
    class_weights = [1.0 / class_counts[label] for _, label in dataset]
    sampler = WeightedRandomSampler(weights=class_weights, num_samples=len(dataset), replacement=True)
    return sampler

# Calcolo della distribuzione delle classi
def compute_class_distribution(dataset):
    """
    Calcola la distribuzione delle classi in un dataset.
    Args:
        dataset (Dataset): Dataset completo.
    Returns:
        dict: Distribuzione delle classi {classe: conteggio}.
    """
    class_counts = {}
    for _, label in dataset:
        class_counts[label] = class_counts.get(label, 0) + 1
    return class_counts

# Applicazione delle strategie
class_counts = compute_class_distribution(train_dataset)
print("Distribuzione originale delle classi:", class_counts)

# Scegli una strategia
strategy = "downsampling"  # Cambia a "downsampling" per valutare il downsampling

if strategy == "downsampling":
    print("Applicazione del downsampling...")
    train_dataset = apply_downsampling(train_dataset, class_counts)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
elif strategy == "oversampling":
    print("Applicazione dell'oversampling...")
    sampler = apply_oversampling(train_dataset, class_counts)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler)

# Rete e allenamento
model = CustomResNet50()
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# Allenamento con la strategia scelta
train_net(model, train_loader, optimizer, criterion, epochs=EPOCHS, save_dir=os.path.join(PATH_FOLDER, f'{MODEL}_training_results_{strategy}'))


Distribuzione originale delle classi: {0: 460, 1: 1266, 2: 3418}
Applicazione del downsampling...
Epoch 1/100, Loss: 0.7785, Accuracy: 0.6377
Epoch 2/100, Loss: 0.6238, Accuracy: 0.7616
Epoch 3/100, Loss: 0.5129, Accuracy: 0.8065
Epoch 4/100, Loss: 0.4986, Accuracy: 0.8036
Epoch 5/100, Loss: 0.4454, Accuracy: 0.8304
Epoch 6/100, Loss: 0.4479, Accuracy: 0.8275
Epoch 7/100, Loss: 0.4109, Accuracy: 0.8551
Epoch 8/100, Loss: 0.3742, Accuracy: 0.8565
Epoch 9/100, Loss: 0.3358, Accuracy: 0.8826
Epoch 10/100, Loss: 0.3013, Accuracy: 0.8841
Epoch 11/100, Loss: 0.3101, Accuracy: 0.8935
Epoch 12/100, Loss: 0.3370, Accuracy: 0.8761
Epoch 13/100, Loss: 0.3006, Accuracy: 0.8964
Epoch 14/100, Loss: 0.3133, Accuracy: 0.8913
Epoch 15/100, Loss: 0.2931, Accuracy: 0.8964
Epoch 16/100, Loss: 0.2725, Accuracy: 0.9022
Epoch 17/100, Loss: 0.2854, Accuracy: 0.8986
Epoch 18/100, Loss: 0.2988, Accuracy: 0.8855
Epoch 19/100, Loss: 0.3193, Accuracy: 0.8906
Epoch 20/100, Loss: 0.3188, Accuracy: 0.8819
Epoch 21/10