In [98]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, log_loss
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import numpy as np
import os

In [99]:
def get_data_transforms():
    data_transforms = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(512),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.46079967214138484, 0.46104556926356555, 0.4559120253283609], std=[0.23117445026151823, 0.22748220382304327, 0.2637965208115187])
        ]),
        'val': transforms.Compose([
            transforms.Resize(512),
            transforms.CenterCrop(512),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.46079967214138484, 0.46104556926356555, 0.4559120253283609], std=[0.23117445026151823, 0.22748220382304327, 0.2637965208115187])
        ]),
    }
    return data_transforms

In [100]:
def load_data(data_dir, batch_size=32, use_fixed_training_set=False, training_set_size=5):
    data_transforms = get_data_transforms()
    image_dataset = datasets.ImageFolder(data_dir, data_transforms['train'])

    if use_fixed_training_set:
        # Use a fixed-size training set (e.g., 50 data points)
        train_size = training_set_size
        val_size = len(image_dataset) - train_size
        train_dataset, val_dataset = random_split(image_dataset, [train_size, val_size])
    else:
        # Split the data into training and validation sets
        train_size = int(0.8 * len(image_dataset))
        val_size = len(image_dataset) - train_size
        train_dataset, val_dataset = random_split(image_dataset, [train_size, val_size])

    dataloaders = {
        'train': DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0),
        'val': DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    }

    dataset_sizes = {
        'train': len(train_dataset),
        'val': len(val_dataset)
    }

    class_names = image_dataset.classes

    return dataloaders, dataset_sizes, class_names


In [101]:
def create_model(num_classes):
    # Load a pre-trained ResNet18 model
    model = models.resnet18(pretrained=True)

    # Get the input dimensions of the last layer (fully connected layer)
    num_ftrs = model.fc.in_features

    # Replace the last layer with a new one that has `num_classes` outputs
    model.fc = nn.Linear(num_ftrs, num_classes)

    return model

In [102]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=3):
    best_model_wts = model.state_dict()
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()

    print(f'Best val Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return model


In [103]:
def evaluate_model(model, dataloader, criterion, class_names):
    model.eval()  # Set model to evaluation mode
    all_labels = []
    all_preds = []
    all_probs = []  # Store probabilities (softmax outputs)
    running_loss = 0.0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Calculate the loss

            running_loss += loss.item() * inputs.size(0)

            # Get class predictions and probabilities
            _, preds = torch.max(outputs, 1)  # Get the class with the highest score
            probs = torch.nn.functional.softmax(outputs, dim=1)  # Get probabilities

            all_labels.extend(labels.cpu().numpy())  # Store true labels
            all_preds.extend(preds.cpu().numpy())    # Store predicted labels
            all_probs.extend(probs.cpu().numpy())    # Store probabilities

    avg_loss = running_loss / len(dataloader.dataset)  # Calculate average loss

    # Calculate overall metrics
    accuracy = accuracy_score(all_labels, all_preds)
    
    # Handle undefined metrics by setting zero_division=0
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='weighted', zero_division=0
    )
    
    # Make sure log_loss receives probabilities (already handled with softmax)
    logloss = log_loss(all_labels, all_probs)

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    print(f'Log Loss: {logloss:.4f}')

    return accuracy, precision, recall, f1, logloss

In [104]:

if __name__ == "__main__":
    #the dataset is in the same parent folder with the name campusVision
    data_dir = "campusVision"                 
    
    dataloaders, dataset_sizes, class_names = load_data(data_dir, use_fixed_training_set=False, training_set_size=10)

    device = torch.device("cpu")
    model = create_model(num_classes=len(class_names))
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # Train the model
    model = train_model(model, dataloaders, criterion, optimizer, num_epochs=15)

    # Evaluate the model on the validation set
    print("Validation Set Performance:")
    evaluate_model(model, dataloaders['val'], criterion, class_names)

    model_save_path = "models/resnet-pretrained-large.pth"

    # Save the best model
    torch.save(model.state_dict(), model_save_path)

Epoch 0/14
----------
train Loss: 0.9089 Acc: 0.7538
val Loss: 0.3250 Acc: 0.9153
Epoch 1/14
----------
train Loss: 0.2722 Acc: 0.9301
val Loss: 0.1745 Acc: 0.9565
Epoch 2/14
----------
train Loss: 0.1852 Acc: 0.9517
val Loss: 0.1414 Acc: 0.9637
Epoch 3/14
----------
train Loss: 0.1464 Acc: 0.9612
val Loss: 0.1081 Acc: 0.9713
Epoch 4/14
----------
train Loss: 0.1159 Acc: 0.9657
val Loss: 0.0990 Acc: 0.9720
Epoch 5/14
----------
train Loss: 0.0990 Acc: 0.9733
val Loss: 0.1027 Acc: 0.9720
Epoch 6/14
----------
train Loss: 0.0892 Acc: 0.9755
val Loss: 0.0820 Acc: 0.9762
Epoch 7/14
----------
train Loss: 0.0872 Acc: 0.9731
val Loss: 0.0815 Acc: 0.9784
Epoch 8/14
----------
train Loss: 0.0741 Acc: 0.9793
val Loss: 0.0711 Acc: 0.9803
Epoch 9/14
----------
train Loss: 0.0719 Acc: 0.9797
val Loss: 0.0710 Acc: 0.9800
Epoch 10/14
----------
train Loss: 0.0685 Acc: 0.9801
val Loss: 0.0730 Acc: 0.9803
Epoch 11/14
----------
train Loss: 0.0673 Acc: 0.9799
val Loss: 0.0574 Acc: 0.9837
Epoch 12/14
--

(0.9849, 0.985, 0.9849, 0.9849, 0.0545)