# M115 - Image Analysis and Processing, Assignment 2 (Notebook 2)

---

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import random_split
from torch.utils.data import WeightedRandomSampler
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
import time
import random
import os
import matplotlib.pyplot as plt
import numpy as np
import pprint

# Notebook-wide Matplotlib styling, so that every figure shares the same
# fonts, font sizes and default canvas size.
plt.rcParams.update(
    {
        # Render all text through LaTeX
        "text.usetex": True,
        # Font sizes
        "font.size": 15,
        "mathtext.default": "regular",
        "axes.titlesize": 16,
        "axes.labelsize": 16,
        "legend.fontsize": 15,
        "xtick.labelsize": 15,
        "ytick.labelsize": 15,
        "figure.titlesize": 16,
        # Default figure size in inches (width, height)
        "figure.figsize": (12, 7),
        # Extra LaTeX packages and the serif font family
        "text.latex.preamble": r"\usepackage{amsmath,amssymb}",
        "font.family": "serif",
        "font.serif": "computer modern roman",
    }
)

Starting off, check the available resources.

In [2]:
# Default to the CPU and switch to the GPU only when CUDA is usable
device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")
print(f"Device: {device}")

# Report how many CPU cores the machine exposes
num_cpu_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cpu_cores}")

# Report the name of the first GPU, if there is one
if not torch.cuda.is_available():
    print("No GPU available")
else:
    gpu_name = torch.cuda.get_device_name(0)
    print(f"GPU name: {gpu_name}")

Device: cuda
Number of CPU cores: 2
GPU name: Tesla P100-PCIE-16GB


Kaggle's Tesla P100 will be used for training and evaluating the neural model. Also 2 workers (2 CPUs) will be employed for the loading and batching of the data.

## Balance dataset

As was observed in Notebook 1, there is a class imbalance, with more pneumonia images than normal images. This can lead to a biased model that may not perform well on the under-represented class. To address this issue, the dataset will be balanced by applying oversampling, undersampling, or a combination of both to make the number of samples in each class equal. This can potentially help the CNN's performance on the under-represented class.

In [3]:
def _collect_targets(dataset):
    """Return the integer class labels of *dataset* as a NumPy array.

    Labels are read from a ``targets`` attribute when the dataset (or the
    dataset wrapped by a ``Subset``-like object exposing ``dataset`` and
    ``indices``) provides one, so no sample has to be loaded. Only as a last
    resort is the dataset iterated, which loads and transforms every sample.
    """
    targets = getattr(dataset, "targets", None)
    if targets is not None:
        return np.asarray(targets, dtype=np.int64)

    parent = getattr(dataset, "dataset", None)
    indices = getattr(dataset, "indices", None)
    parent_targets = getattr(parent, "targets", None)
    if indices is not None and parent_targets is not None:
        all_targets = np.asarray(parent_targets, dtype=np.int64)
        return all_targets[np.asarray(indices, dtype=np.int64)]

    # Fallback: every sample is loaded just to read its label (slow)
    return np.asarray([label for _, label in dataset], dtype=np.int64)


def create_weighted_sampler(h, dataset):
    """Build a sampler that draws every class with equal probability.

    Each sample is weighted by the inverse of its class frequency and the
    sampler draws ``len(dataset)`` indices with replacement, so minority
    classes are over-sampled and majority classes under-sampled.

    Args:
        h: Hyperparameter dictionary. Unused; kept for a uniform signature.
        dataset: Dataset yielding ``(sample, label)`` pairs with non-negative
            integer labels. A ``targets`` attribute is used when available.

    Returns:
        A ``WeightedRandomSampler`` over the indices of *dataset*.

    Raises:
        ValueError: If the dataset contains no samples.
    """
    targets = _collect_targets(dataset)
    if targets.size == 0:
        raise ValueError("Cannot build a weighted sampler for an empty dataset")

    class_counts = np.bincount(targets)
    # Look the count up per sample: only classes that actually occur are
    # touched, so an unused label id never causes a division by zero.
    weights = 1.0 / class_counts[targets]
    return WeightedRandomSampler(weights.tolist(), num_samples=len(targets),
                                 replacement=True)

## Data preprocessing

As opposed to Notebook 1, the preprocessing here will be significantly different given that a CNN will be used.

The images' center will now be cropped, as opposed to resizing them, which could potentially distort their aspect ratio. The original proportions of the image are preserved by this alteration, aiding the CNN in better learning from the dataset and comprehending the inherent characteristics of the images.

Data augmentation techniques will also be leveraged to enhance the diversity of the training dataset. New training examples are generated through subtle alterations to the original images, such as rotation, flipping, and translation. Overfitting is effectively prevented by this strategy, enabling the CNN to become more resilient and proficient in generalising to unseen data.

To correctly implement these techniques, a separate transformation for the training data has been designed, ensuring the test set remains unaffected by these augmentations. The same cautious approach is taken with the validation set, maintaining its authenticity by ensuring it does not undergo any data augmentation.

Normalisation will still be performed on the whole dataset of course.

Here's a brief overview of the aforementioned pipeline:

<div style="text-align:center">
<img src="https://i.imgur.com/PHkp7ms.png" alt="workflow3" width="500" height="600"/>
</div>

With the augmented volume of data, the CNN can be trained in more epochs without the risk of overfitting. This presents an opportunity to further refine the CNN's learning through additional training iterations, which should result in more accurate predictions.

With these enhancements and considerations in place, an enhanced ability of the CNN to perform better on unseen data while maintaining robustness against overfitting can reasonably be expected.

In [4]:
def prepare_data(h):
    """Build the train, validation and test data loaders.

    The images of the training folder are split into two disjoint index sets:
    the training part is read through the augmenting pipeline, the validation
    part through the plain (resize + normalise) pipeline. The test folder is
    always read through the plain pipeline.

    Args:
        h: Hyperparameter dictionary. Reads ``image_size``, ``batch_size`` and
            ``balance``; the optional ``val_split`` (default ``0.2``) is the
            fraction of training images held out for validation.

    Returns:
        The tuple ``(train_loader, val_loader, test_loader)``.
    """
    # NOTE(review): the accompanying text describes center-cropping, but both
    # pipelines resize every image to a square - confirm which is intended.
    resize = transforms.Resize(size=(h["image_size"], h["image_size"]))
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    # Plain pipeline for validation and test data (no augmentation)
    data_transforms = transforms.Compose([
        resize,
        transforms.ToTensor(),
        normalize,
    ])

    # Augmenting pipeline for training data only
    data_transforms_train = transforms.Compose([
        # Randomly rotate the image by up to 20 degrees
        transforms.RandomRotation(20),
        # Randomly flip the image horizontally
        transforms.RandomHorizontalFlip(p=0.5),
        # Randomly change brightness, contrast, saturation and hue
        transforms.ColorJitter(brightness=0.1, contrast=0.1,
                               saturation=0.1, hue=0.1),
        # Randomly translate the image by up to 10% in each direction
        transforms.RandomApply(
            [transforms.RandomAffine(0, translate=(0.1, 0.1))], p=0.5),
        # Randomly apply a perspective distortion
        transforms.RandomApply(
            [transforms.RandomPerspective(distortion_scale=0.2)], p=0.5),
        resize,
        transforms.ToTensor(),
        normalize,
    ])

    # Fraction of the training folder held out for validation
    val_split = h.get("val_split", 0.2)

    train_dir = "/kaggle/input/chest-xray-pneumonia/chest_xray/train/"
    test_dir = "/kaggle/input/chest-xray-pneumonia/chest_xray/test/"

    # The training folder is opened twice so that the training and validation
    # parts can use different transforms over the same list of files.
    augmented_train = datasets.ImageFolder(train_dir,
                                           transform=data_transforms_train)
    plain_train = datasets.ImageFolder(train_dir, transform=data_transforms)
    test_dataset = datasets.ImageFolder(test_dir, transform=data_transforms)

    # Disjoint random split: validating on images the model was trained on
    # would make the validation loss meaningless.
    num_images = len(augmented_train)
    num_val = int(val_split * num_images)
    shuffled = torch.randperm(num_images).tolist()
    val_dataset = torch.utils.data.Subset(plain_train, shuffled[:num_val])
    train_dataset = torch.utils.data.Subset(augmented_train,
                                            shuffled[num_val:])

    # Create the data loaders for train, validation, and test sets
    if h["balance"]:
        sampler = create_weighted_sampler(h, train_dataset)
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=h["batch_size"],
            sampler=sampler, num_workers=2)
    else:
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=h["batch_size"],
            shuffle=True, num_workers=2)

    # Evaluation does not benefit from shuffling
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=h["batch_size"],
        shuffle=False, num_workers=2)
    test_loader = torch.utils.data.DataLoader(
        test_dataset, batch_size=h["batch_size"],
        shuffle=False, num_workers=2)

    return train_loader, val_loader, test_loader

## Using a Convolutional Neural Network

As mentioned before, the model of choice will be a Convolutional Neural Network. CNNs are designed to work with image data and can capture local patterns more efficiently than fully connected layers. They can also handle larger input images, which can lead to better performance.

The custom CNN architecture will consist of multiple convolutional layers, followed by max-pooling layers, and finally a fully connected layer for classification. The image size will be set to 256x256, as CNNs can handle larger images better.

Here's a brief overview of the CNN's architecture:

<div style="text-align:center">
<img src="" alt="workflow" width="500" height="600"/>
</div>

In [5]:
def create_model(h, device):
    """Create the network selected by ``h["model"]`` and move it to *device*.

    The only supported model, ``"cnn"``, stacks three 3x3 convolutions
    (16, 32 and 64 channels), each followed by ReLU and 2x2 max-pooling, and
    ends with a dropout-regularised two-layer classifier with two outputs.

    Args:
        h: Hyperparameter dictionary. Reads ``model`` and ``image_size``
            (side length of the square 3-channel input images).
        device: Device the model is moved to.

    Returns:
        The ``nn.Sequential`` model, placed on *device*.

    Raises:
        ValueError: If ``h["model"]`` is not a supported model name.
    """
    if h["model"] != "cnn":
        # Fail loudly instead of silently returning None
        raise ValueError(
            f"Unsupported model type: {h['model']!r} (expected 'cnn')")

    # Three 2x2 poolings shrink each spatial side by a factor of 8
    feature_side = h["image_size"] // 8

    model = nn.Sequential(
        nn.Conv2d(3, 16, 3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(2, 2),
        nn.Conv2d(16, 32, 3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(2, 2),
        nn.Conv2d(32, 64, 3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(2, 2),
        nn.Flatten(),
        nn.Dropout(0.25),
        nn.Linear(64 * feature_side * feature_side, 512),
        nn.ReLU(),
        nn.Dropout(0.25),
        nn.Linear(512, 2),
    )
    return model.to(device)

### Training

Given the augmentation of the dataset, the model will be trained for 10 epochs, using a learning rate of 0.001.

In [6]:
# Hyperparameters shared by all the helper functions of this notebook
h = dict(
    balance=True,      # draw training batches through the weighted sampler
    num_epochs=10,     # number of passes over the training data
    batch_size=256,    # mini-batch size of every data loader
    image_size=256,    # side length the images are resized to
    lr=0.001,          # learning rate handed to the optimizer
    model="cnn",       # architecture selected in create_model
)


def train_model(h, model, train_loader, val_loader, optimizer, criterion, device):
    """Train *model* for ``h["num_epochs"]`` epochs and track the losses.

    After each epoch the model is evaluated on *val_loader*, and a summary
    line with both losses and an estimate of the remaining time is printed.

    Args:
        h: Hyperparameter dictionary. Reads ``num_epochs``.
        model: Network to optimise (updated in place).
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader passed to ``evaluate_model`` after each epoch.
        optimizer: Optimizer stepping the parameters of *model*.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device every batch is moved to.

    Returns:
        ``(train_loss_history, val_loss_history)``: two lists with one mean
        per-batch loss per epoch.
    """
    num_epochs = h["num_epochs"]
    train_loss_history, val_loss_history = [], []
    start_time = time.time()

    for epoch in range(num_epochs):
        model.train()
        progress_bar = tqdm(train_loader, desc=f"Training epoch {epoch + 1}/{num_epochs}", leave=False, unit="mini-batch")

        # Sum of the mini-batch losses of this epoch
        running_loss = 0.0
        for inputs, labels in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            progress_bar.set_postfix(loss=batch_loss)

        # Only the loss of the validation pass is needed here
        val_loss, *_ = evaluate_model(h, model, val_loader, criterion, device)

        train_loss_history.append(running_loss / len(train_loader))
        val_loss_history.append(val_loss)

        # Extrapolate the remaining time from the average epoch duration
        epochs_done = epoch + 1
        seconds_per_epoch = (time.time() - start_time) / epochs_done
        seconds_left = seconds_per_epoch * (num_epochs - epochs_done)
        remaining_time_min, remaining_time_sec = divmod(seconds_left, 60)

        print(f"Epoch [{epoch + 1}/{num_epochs}]: Train Loss: {running_loss / len(train_loader):.4f}, Val Loss: {val_loss:.4f}, Remaining Time: {remaining_time_min:.0f}m {remaining_time_sec:.0f}s")

    return train_loss_history, val_loss_history



### Evaluation

The F1 score will be used as a basic evaluation metric. It is useful for evaluating models when there is class imbalance, as it provides a more balanced measure of the model's performance. The F1 score ranges from 0 (worst) to 1 (best).

In [7]:
def evaluate_model(h, model, data_loader, criterion, device):
    """Run *model* over *data_loader* without gradients and collect metrics.

    Args:
        h: Hyperparameter dictionary. Unused; kept for a uniform signature.
        model: Network to evaluate; it is switched to evaluation mode.
        data_loader: Sized iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device every batch is moved to.

    Returns:
        ``(loss, accuracy, true_labels, predicted_labels)`` where ``loss`` is
        the mean of the per-batch losses, ``accuracy`` the fraction of
        correctly classified samples, and the two lists hold the label and
        the predicted class of every sample in iteration order.
    """
    model.eval()

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0
    true_labels, predicted_labels = [], []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)
            loss_sum += criterion(logits, labels).item()

            # The predicted class is the index of the largest output
            predicted = logits.max(dim=1).indices
            num_correct += (predicted == labels).sum().item()
            num_seen += labels.size(0)

            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())

    epoch_loss = loss_sum / len(data_loader)
    epoch_accuracy = num_correct / num_seen
    return epoch_loss, epoch_accuracy, true_labels, predicted_labels


def plot_metrics(h, train_loss_history, val_loss_history, test_loss, test_accuracy, true_labels, predicted_labels):
    """Print the test metrics and save/show the diagnostic figures.

    Prints accuracy, precision, recall and F1 score, then saves the confusion
    matrix and the loss curves as PDF files under ``/kaggle/working/`` and
    displays them.

    Args:
        h: Hyperparameter dictionary. Unused; kept for a uniform signature.
        train_loss_history: Training loss per epoch.
        val_loss_history: Validation loss per epoch.
        test_loss: Loss on the test set. Unused.
        test_accuracy: Accuracy on the test set, as a fraction.
        true_labels: Ground-truth class of every test sample.
        predicted_labels: Predicted class of every test sample.
    """
    print(f"Accuracy on the test set: {test_accuracy:.2%}")

    # Calculate precision, recall, and F1 score using the accumulated true labels and predictions
    precision = precision_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels)
    print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1 score: {f1:.2f}")

    # Pin the label set so the matrix is always 2x2 and matches the two
    # display labels, even if one class is missing from the predictions.
    cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

    # Draw the confusion matrix on an explicit axes BEFORE saving; saving
    # first would write an empty figure to disk.
    fig_cm, ax_cm = plt.subplots()
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Normal", "Pneumonia"])
    disp.plot(ax=ax_cm)
    fig_cm.savefig('/kaggle/working/confMatrix.pdf',
                   format='pdf', bbox_inches='tight', dpi=300, transparent=True)

    # Plot the learning curves
    plt.figure()
    plt.plot(train_loss_history, label='Train Loss')
    plt.plot(val_loss_history, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss history')
    plt.legend()
    plt.savefig('/kaggle/working/LossHistory.pdf',
                format='pdf', bbox_inches='tight', dpi=300, transparent=True)

    plt.show()

### Running the model and Statistical Significance

Since the training process is random, the training and testing will be repeated several times to measure the statistical significance of the results.

In [None]:
def check_solution(h, device, verbose):
    """Run one full experiment: load data, train, then score on the test set.

    Args:
        h: Hyperparameter dictionary passed on to every helper; ``lr`` is
            read here for the Adam optimizer.
        device: Device used for training and evaluation.
        verbose: When true, the metrics are printed and plotted as well.

    Returns:
        ``(f1, test_accuracy)`` measured on the test set.
    """
    train_loader, val_loader, test_loader = prepare_data(h)

    model = create_model(h, device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=h["lr"])

    histories = train_model(h, model, train_loader, val_loader,
                            optimizer, criterion, device)
    train_loss_history, val_loss_history = histories

    test_results = evaluate_model(h, model, test_loader, criterion, device)
    test_loss, test_accuracy, true_labels, predicted_labels = test_results

    if verbose:
        plot_metrics(h, train_loss_history, val_loss_history, test_loss,
                     test_accuracy, true_labels, predicted_labels)

    return f1_score(true_labels, predicted_labels), test_accuracy


# Keep the hyperparameters next to the results they produced
print("Hyperparameters:")
pprint.pprint(h)

repeats = 5
f1_scores = []
accuracies = []
start_time = time.time()

# Only the first repetition prints and plots its detailed metrics
for i in range(repeats):
    print(f"Running solution {i+1}/{repeats}")
    f1, accuracy = check_solution(h, device, verbose=(i==0))
    print(f"F1 = {f1:.2f}, accuracy = {accuracy:.2f} ")
    f1_scores.append(f1)
    accuracies.append(accuracy)

f1_array = np.array(f1_scores)
accuracy_array = np.array(accuracies)

# Average wall-clock time of a single repetition
repeat_time = (time.time() - start_time) / repeats
repeat_time_min, repeat_time_sec = divmod(repeat_time, 60)

# Final summary: raw values, then mean and standard deviation
print("Results")
print(f"F1 List: {f1_array}")
print(f"Accuracy List: {accuracy_array}")
print(f"F1: {np.mean(f1_array):.1%} (+-{np.std(f1_array):.1%})")
print(f"Accuracy: {np.mean(accuracy_array):.1%} (+-{np.std(accuracy_array):.1%})")
print(f"Time of one solution: {repeat_time_min:.0f}m {repeat_time_sec:.0f}s")

# The same summary as a single table row
print(f" | {np.mean(f1_array):.1%} (+-{np.std(f1_array):.1%}) | {np.mean(accuracy_array):.1%} (+-{np.std(accuracy_array):.1%}) | {repeat_time_min:.0f}m {repeat_time_sec:.0f}s")

Hyperparameters:
{'balance': True,
 'batch_size': 256,
 'image_size': 256,
 'lr': 0.001,
 'model': 'cnn',
 'num_epochs': 10}
Running solution 1/5
