In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import numpy as np
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
from collections import Counter
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from sklearn.model_selection import ParameterGrid
from models import MRIResNetClassifier, MRI_VGG16_Classifier
from multiprocessing import cpu_count

In [3]:
# Preprocessing shared by every split: resize to 224x224, convert to a tensor,
# then normalise each channel with the ImageNet mean / std.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet stats
])

train_data_full = datasets.ImageFolder(root="cleaned/Training", transform=transform)
class_names = train_data_full.classes

train_size = int(0.8 * len(train_data_full))  # 80% for training
val_size = len(train_data_full) - train_size  # remaining 20% for validation

# Seed the split so the train/validation partition is identical after every
# kernel restart; an unseeded split makes runs impossible to compare.
split_generator = torch.Generator().manual_seed(42)
train_data, val_data = random_split(
    train_data_full, [train_size, val_size], generator=split_generator
)
test_data = datasets.ImageFolder(root="cleaned/Testing", transform=transform)

In [None]:
def plot_class_distribution(data, title):
    """Draw a pie chart showing the share of each class in ``data``.

    Args:
        data: One of
            * a dataset exposing a ``targets`` sequence,
            * a subset-like wrapper exposing ``indices`` and a ``dataset``
              that has ``targets`` (what ``random_split`` returns), or
            * any iterable of ``(sample, label)`` pairs (slow fallback).
        title: Title displayed above the chart.

    Slice labels come from the notebook-level ``class_names`` list.
    """
    parent = getattr(data, 'dataset', None)
    if hasattr(data, 'targets'):
        targets = data.targets
    elif hasattr(data, 'indices') and hasattr(parent, 'targets'):
        # Subsets carry no ``targets`` of their own. Look the labels up in the
        # parent's list instead of iterating the subset, which would load and
        # transform every image only to throw it away.
        parent_targets = parent.targets
        targets = [int(parent_targets[i]) for i in data.indices]
    else:
        targets = [x[1] for x in data]

    counts = Counter(targets)
    labels = list(class_names)
    plt.figure(figsize=(4, 4))
    plt.pie([counts[i] for i in range(len(class_names))], labels=labels, autopct='%1.1f%%', startangle=90)
    plt.title(title)
    plt.axis('equal')
    plt.show()

# Show the class balance of each split, one chart per split.
for split, split_title in (
    (train_data, "Train Data Distribution"),
    (val_data, "Validation Data Distribution"),
    (test_data, "Test Data Distribution"),
):
    plot_class_distribution(split, split_title)

In [None]:
# Read the labels from ImageFolder's ``targets`` list. Indexing the dataset
# (``train_data_full[i][1]``) or iterating it would load and transform every
# image from disk just to obtain its label.
train_targets = [train_data_full.targets[i] for i in train_data.indices]
val_targets = [train_data_full.targets[i] for i in val_data.indices]
test_targets = list(test_data.targets)

def count_classes(targets, num_classes):
    """Tally how many times each class index occurs in ``targets``.

    Args:
        targets: Iterable of integer class indices.
        num_classes: Length of the returned list.

    Returns:
        A list of ``num_classes`` integers where position ``k`` holds the
        number of occurrences of class ``k``.
    """
    tally = [0 for _ in range(num_classes)]
    for class_idx in targets:
        tally[class_idx] = tally[class_idx] + 1
    return tally

# Per-class sample counts for each of the three splits.
train_counts, val_counts, test_counts = (
    count_classes(split_targets, len(class_names))
    for split_targets in (train_targets, val_targets, test_targets)
)

# Report, for every class, how its samples are divided between the splits.
print("Train-Val-Test Split per Class:")
for i, class_name in enumerate(class_names):
    train_count, val_count, test_count = train_counts[i], val_counts[i], test_counts[i]
    total_count = train_count + val_count + test_count

    if total_count > 0:
        train_percentage = (train_count / total_count) * 100
        val_percentage = (val_count / total_count) * 100
        test_percentage = (test_count / total_count) * 100
    else:
        # A class with no samples at all: report 0% everywhere.
        train_percentage = val_percentage = test_percentage = 0

    print(f"{class_name}: Train = {train_count} ({train_percentage:.2f}%), "
          f"Val = {val_count} ({val_percentage:.2f}%), "
          f"Test = {test_count} ({test_percentage:.2f}%)")

In [6]:
# Number of loader worker processes: half of the CPU count reported by
# multiprocessing (0 on a single-CPU machine).
num_workers = cpu_count() // 2

# Keyword arguments common to all three loaders.
loader_kwargs = {"batch_size": 16, "num_workers": num_workers}

# Only the training loader reshuffles its samples.
train_loader = DataLoader(train_data, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_data, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_data, shuffle=False, **loader_kwargs)

In [7]:
def train(model, epochs, train_loader, val_loader, optimizer, criterion, device):
    """Train ``model`` and keep the weights of its best validation epoch.

    Each epoch runs one optimisation pass over ``train_loader`` followed by a
    gradient-free pass over ``val_loader``, then prints the mean per-batch
    loss and the accuracy of both passes.

    Args:
        model: Module called as ``model(inputs)``; its output is reduced with
            ``argmax(1)`` to obtain predicted class indices.
        epochs: Number of epochs to run.
        train_loader: Sized iterable of ``(inputs, labels)`` batches.
        val_loader: Sized iterable of ``(inputs, labels)`` batches.
        optimizer: Object providing ``zero_grad()`` and ``step()``.
        criterion: Callable ``criterion(outputs, labels)`` returning a loss
            tensor.
        device: Device every batch is moved to before the forward pass.

    Returns:
        The highest validation accuracy seen over all epochs (0 when no epoch
        scored above 0).

    Side effects:
        On return the model holds the weights from the first epoch that
        reached the returned accuracy, not the last-epoch weights. Callers
        that select a model by the returned score therefore evaluate the
        weights that produced it.
    """
    import copy  # local import keeps this cell runnable on its own

    best_val_acc = 0
    best_state = None  # snapshot of the weights that achieved best_val_acc

    for epoch in range(epochs):
        model.train()
        total_train_loss = 0.0
        train_pred_correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()
            train_pred_correct += (outputs.argmax(1) == labels).sum().item()
            total += len(labels)

        # Guard the divisions so an empty loader reports 0 instead of raising
        # ZeroDivisionError.
        train_accuracy = train_pred_correct / total if total else 0.0
        mean_train_loss = total_train_loss / max(len(train_loader), 1)

        model.eval()
        total_val_loss = 0.0
        val_pred_correct = 0
        total_val = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                total_val_loss += loss.item()
                val_pred_correct += (outputs.argmax(1) == labels).sum().item()
                total_val += len(labels)

        val_accuracy = val_pred_correct / total_val if total_val else 0.0
        mean_val_loss = total_val_loss / max(len(val_loader), 1)

        if val_accuracy > best_val_acc:
            best_val_acc = val_accuracy
            # state_dict() tensors share storage with the live parameters, so
            # a deep copy is required to freeze this epoch's weights.
            best_state = copy.deepcopy(model.state_dict())

        print(f"Epoch {epoch+1}: Train Loss={mean_train_loss:.4f}, "
              f"Train Acc={train_accuracy:.4f}, Val Loss={mean_val_loss:.4f}, "
              f"Val Acc={val_accuracy:.4f}")

    if best_state is not None:
        model.load_state_dict(best_state)

    return best_val_acc


def evaluate(model, loader, device):
    """Score ``model`` on every batch of ``loader`` and print the metrics.

    The model is switched to eval mode and run without gradient tracking.
    Predicted classes are the ``argmax`` over dimension 1 of the model output.
    The confusion matrix, accuracy and macro-averaged F1 score are printed.

    Args:
        model: Module called as ``model(inputs)``.
        loader: Iterable of ``(inputs, labels)`` batches.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Tuple ``(accuracy, macro_f1)``.
    """
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            batch_preds = model(batch_inputs).argmax(dim=1)
            # Metrics are computed on the host, so bring both back from device.
            y_true.extend(batch_labels.cpu().numpy())
            y_pred.extend(batch_preds.cpu().numpy())

    conf_matrix = confusion_matrix(y_true, y_pred)
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")

    print("Confusion matrix:\n", conf_matrix)
    print('Accuracy:', acc)
    print('F1 score:', f1)
    return acc, f1
                

In [8]:
# Pick the first available backend: CUDA, then Apple MPS, then CPU.
# getattr guards PyTorch builds that do not ship ``torch.backends.mps``.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [None]:
# ResNet built with pretrained=True: grid search over batch size.
param_grid = {
    'lr': [0.001],
    'batch_size': [32, 64, 128],
}

best_params = None
# Reset explicitly so a model left over from a previously run cell can never
# be evaluated by mistake.
best_model = None
# Start below any reachable accuracy so the first configuration is always
# recorded, even if its validation accuracy is 0.
best_accuracy = float("-inf")

for params in ParameterGrid(param_grid):
    print(f"Testing parameters: {params}")
    model = MRIResNetClassifier(len(train_data_full.classes), pretrained=True).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
    criterion = torch.nn.CrossEntropyLoss()
    # Rebuild the loaders for this batch size, keeping the same worker count
    # as the notebook's default loaders.
    train_loader = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_data, batch_size=params['batch_size'], shuffle=False, num_workers=num_workers)

    best_val_accuracy = train(model, 10, train_loader, val_loader, optimizer, criterion, device)

    if best_val_accuracy > best_accuracy:
        best_accuracy = best_val_accuracy
        best_params = params
        best_model = model

print("Best Hyperparameters:", best_params)

print('Test evaluation:')
evaluate(best_model, test_loader, device)

In [None]:
# ResNet that is NOT pretrained (classifier built without pretrained=True).
param_grid = {
    'lr': [0.0001],
    'batch_size': [64],
}

best_params = None
# Reset explicitly so a model left over from a previously run cell can never
# be evaluated by mistake.
best_model = None
# Start below any reachable accuracy so the first configuration is always
# recorded, even if its validation accuracy is 0.
best_accuracy = float("-inf")

for params in ParameterGrid(param_grid):
    print(f"Testing parameters: {params}")
    model = MRIResNetClassifier(len(train_data_full.classes)).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
    criterion = torch.nn.CrossEntropyLoss()
    # Rebuild the loaders for this batch size, keeping the same worker count
    # as the notebook's default loaders.
    train_loader = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_data, batch_size=params['batch_size'], shuffle=False, num_workers=num_workers)

    best_val_accuracy = train(model, 20, train_loader, val_loader, optimizer, criterion, device)

    if best_val_accuracy > best_accuracy:
        best_accuracy = best_val_accuracy
        best_params = params
        best_model = model

print("Best Hyperparameters:", best_params)

print('Test evaluation:')
evaluate(best_model, test_loader, device)

In [None]:
# VGG16 classifier: grid search over learning rate and batch size.
param_grid = {
    'lr': [0.01, 0.001],
    'batch_size': [16, 32],
}

best_params = None
# Reset explicitly so a model left over from a previously run cell can never
# be evaluated by mistake.
best_model = None
# Start below any reachable accuracy so the first configuration is always
# recorded, even if its validation accuracy is 0.
best_accuracy = float("-inf")

for params in ParameterGrid(param_grid):
    print(f"Testing parameters: {params}")
    model = MRI_VGG16_Classifier(len(train_data_full.classes)).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
    criterion = torch.nn.CrossEntropyLoss()
    # Rebuild the loaders for this batch size, keeping the same worker count
    # as the notebook's default loaders.
    train_loader = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_data, batch_size=params['batch_size'], shuffle=False, num_workers=num_workers)

    best_val_accuracy = train(model, 10, train_loader, val_loader, optimizer, criterion, device)

    if best_val_accuracy > best_accuracy:
        best_accuracy = best_val_accuracy
        best_params = params
        best_model = model

print("Best Hyperparameters:", best_params)

print('Test evaluation:')
evaluate(best_model, test_loader, device)