In [None]:
!pip install -r /content/drive/MyDrive/Dissertation/requirements.txt -qqq


from datetime import datetime
import json
import torch
import torchvision
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import os
import numpy as np
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from torch import argmax
from tqdm import tqdm
from PIL import Image
from matplotlib import pyplot as plt
import seaborn as sn
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score
)
from torchsampler import ImbalancedDatasetSampler
import wandb
import numpy as np
from os import listdir
from os.path import join, isdir
from glob import glob
import cv2
import timeit
import timm
from sklearn.metrics import confusion_matrix
from torchvision.datasets import ImageFolder

torch.manual_seed(17)


Several custom classes were made. The first, ImageResize(), resizes an image using the PIL image resizer set to the bilinear function. A custom class was generated even though Pytorch has its own resizer, as discussed in the following article, https://blog.zuru.tech/machine-learning/2021/08/09/the-dangers-behind-image-resizing there are resizing issues associated with certain libraries such as Pytorch which introduce artefacts or don't provide sufficient antialiasing in the resized images using libraries such as Pytorch.


In [None]:
class ImageResize(object):
        """
        PIL's resize performs better than pytorch
        https://blog.zuru.tech/machine-learning/2021/08/09/the-dangers-behind-image-resizing
        """

        def __init__(self, new_h, new_w):
            self.new_h = new_h
            self.new_w = new_w

        def __call__(self, image):
            image = image.resize((self.new_w, self.new_h), resample=Image.BILINEAR)
            return image

## Network generation

Five networks were created using several architectures loaded from the Pytroch library.  Within each model-specific function, the feature extraction layers have been frozen and the final classification layer unfrozen, to allow this final layer to train on our Lesion dataset.

In [None]:
def create_network_densenet():
        net = models.densenet161(pretrained=True)
        net.classifier = nn.Linear(in_features=2208, out_features=250, bias=True)

        for param in net.parameters():
                param.requires_grad = False
        for param in net.classifier.parameters():
                param.requires_grad = True

        return net


def create_network_efficientnet():
        net = models.efficientnet_b0(pretrained=True)
        net.classifier[1] = nn.Linear(in_features=1280, out_features=250, bias=True)

        for param in net.parameters():
                param.requires_grad = False
        for param in net.classifier.parameters():
                param.requires_grad = True

        return net

def create_network_mobilenet():
        net = models.mobilenet_v3_small(pretrained=True)
        net.classifier[3] = nn.Linear(in_features=1024, out_features=250, bias=True)

        for param in net.parameters():
            param.requires_grad = False
        for param in net.classifier.parameters():
            param.requires_grad = True

        return net


def create_network_resnet():
        net = models.resnet18(pretrained=True)
        net.fc = nn.Linear(in_features=512, out_features=250, bias=True)

        for param in net.parameters():
            param.requires_grad = False
        for param in net.fc.parameters():
            param.requires_grad = True

        return net


def create_network_regnet():
        net = models.regnet_y_400mf(pretrained=True)
        net.fc = nn.Linear(in_features=440, out_features=250, bias=True)

        for param in net.parameters():
                param.requires_grad = False
        for param in net.fc.parameters():
                param.requires_grad = True

        return net

In [None]:
def run(config):
   
    #changing the device to GPU rather than CPU if it is available,
    # this will decrease model training time.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("Using device {}".format(device))


   
    BASE_PATH = 'birds/part_2_models/'
    now = str(datetime.now()).replace(":", "-").replace(" ", "_")
    RESULTS_PATH = os.path.join(BASE_PATH, now)
    os.makedirs(RESULTS_PATH, exist_ok=False)

    with open(os.path.join(RESULTS_PATH, "config.json"), "w") as f:
        json.dump(config, f, indent=4)

    if config.get("use_wandb"):
        # this integrates the third-party platform, Weights and Biases,
        # with this notebook.
        run = wandb.init(
            project="part_2_bird",
            entity="sarahlouise",
            config=config,
        )


    # ## Train, validation and test set
    #
    # The train, validation and test sets were loaded and transformed using the Pytorch functions and amended classes previously discussed.

    batch_size = config["batch_size"]
    Imagenet_NV = ((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))

    print("Configuring datasets")
    trainset = ImageFolder(
        os.path.join(config.get("data_path"), 'train'),
        transform=transforms.Compose([
                            transforms.RandomVerticalFlip(0.5),
                            transforms.RandomHorizontalFlip(0.5),
                            transforms.RandomApply(nn.ModuleList([transforms.ColorJitter(),
                                                            transforms.GaussianBlur(3)]), p=0.1),
                            ImageResize(224,224),
                            transforms.PILToTensor(),
                            transforms.ConvertImageDtype(torch.float),
                            transforms.Normalize(*Imagenet_NV),

                        ]))


    trainloader = torch.utils.data.DataLoader(
        trainset,
        batch_size=batch_size,
        num_workers=5,
        sampler=ImbalancedDatasetSampler(trainset, callback_get_label=lambda x: x.targets),
    )

    valset = ImageFolder(
        os.path.join(config.get("data_path"), 'valid'),
        transform=transforms.Compose([
                            ImageResize(224,224),
                            transforms.PILToTensor(),
                            transforms.ConvertImageDtype(torch.float),
                            transforms.Normalize(*Imagenet_NV),
                        ]))

    valloader = torch.utils.data.DataLoader(
        valset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2
    )

    testset = ImageFolder(
        os.path.join(config.get("data_path"), 'test'),
        transform=transforms.Compose([
                                ImageResize(224,224),
                                transforms.PILToTensor(),
                                transforms.ConvertImageDtype(torch.float),
                                transforms.Normalize(*Imagenet_NV),
                            ]))

    testloader = torch.utils.data.DataLoader(
        testset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2
    )

    classes = sorted(os.listdir(os.path.join(config.get("data_path"), 'train')))


    #making logical arguments from the configuration dictionary definied earlier
    model_name = config["model"]

    if model_name == "densenet":
        net = create_network_densenet()
    elif model_name == "efficientnet":
        net = create_network_efficientnet()
    elif model_name == "mobilenet":
        net = create_network_mobilenet()
    elif model_name == "resnet":
        net = create_network_resnet()
    elif model_name == "regnet":
        net = create_network_regnet()
    else:
        raise ValueError("Model name not supported '{}'".format(model_name))

    print("Moving network to GPU")
    net = net.to(device)

    # ## Training configurations
    

    # defining the training configurations
    no_of_epochs = config["epochs"]
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(
        net.parameters(),
        lr=config["lr"],
        amsgrad=config["use_amsgrad"]
    )
    if config["use_warm_restarts"]:
        scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer,
            T_0=100,
        )
    else:
        scheduler = optim.lr_scheduler.CosineAnnealingLR(
            optimizer,
            round((len(trainset)/batch_size)*no_of_epochs)
        )


    # ##  Model training
    #
    # This section included the training for the model. The network was set to train, a mini-batch was passed through the model and then the loss was then backpropagated. Each trained model was also automatically saved.

    if config.get("use_wandb"):
        wandb.watch(net)


    if not config.get("use_pretrained"):
        print("Starting training")
        for epoch in range(no_of_epochs):  # loop over the dataset multiple times
            print("Epoch {} of {}".format(epoch, no_of_epochs))
            epoch_running_loss = []
            epoch_val_metric = []
            for i, data in tqdm(enumerate(trainloader, 1), total=len(trainloader)):
                # get the inputs; data is a list of [inputs, labels]
                net.train()
                inputs, labels = data
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward
                outputs = net(inputs)
                # outputs = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                scheduler.step()

                # saving loss statistics
                loss_val = loss.item()
                epoch_running_loss.append(loss_val)
                if config.get("use_wandb"):
                    wandb.log(
                        {
                            "train/bce_logits_loss": loss_val,
                            "train/lr": scheduler.get_last_lr()[0]
                        }
                    )

                # adding in valiation data testing
                if i % 65 == 0:
                    model_name = "{name}-epoch-{epoch}-step-{step}.pth".format(
                        name=config["name"],
                        epoch=epoch,
                        step=i,
                    )
                    model_path = os.path.join(RESULTS_PATH, model_name)
                    torch.save(net.state_dict(), model_path)

                    predicted = []
                    actual = []

                    print("Running validation")
                    net.eval()
                    with torch.no_grad():
                        for i, data in enumerate(tqdm(valloader, total=len(valloader))):
                            inputs, labels = data
                            inputs = inputs.to(device)
                            labels = labels.to(device)

                            outputs = net(inputs)
                            loss = criterion(outputs, labels)
                            if config.get("use_wandb"):
                                wandb.log({"val/loss": loss})

                            # collect the correct predictions for each class
                            actual.extend(labels.detach().cpu().numpy())
                            predicted.extend(torch.argmax(outputs, 1).detach().cpu().numpy())

                        mb_acc = accuracy_score(actual, predicted)

                        precision = precision_score(actual, predicted, average="macro")

                        recall = recall_score(actual, predicted, average="macro")
                        if config.get("use_wandb"):
                            wandb.log({"val/accuracy": mb_acc})
                            wandb.log({"val/precision": precision})
                            wandb.log({"val/recall": recall})


        print('Finished Training')
        # saving each trained model
        model_name = '{}-final.pth'.format(
            config["name"]
        )
        model_path = os.path.join(RESULTS_PATH, model_name)
        torch.save(net.state_dict(), model_path)

        print("Final model saved")

    # ##  Model testing
    #
    # The model was set to evaluate mode for testing. The predicted and actual results were saved, and used for generating a confusion matrix and metrics.
    #
    # A confusion matrix was generated, showing the number of true positives and negatives, and false positives and negatives. Additionally, the metrics were calculated, including the accuracy score, precision and recall. The corresponding graphs were created using Weights and Biases.


    if config.get("use_pretrained"):
        net.load_state_dict(
            torch.load(
                config.get("use_pretrained")
            )
        )

    predicted = []
    actual = []

    print("Testing model")
    # again no gradients needed
    net.eval()
    with torch.no_grad():
        for i, data in enumerate(tqdm(testloader, total=len(testloader))):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = net(inputs)
            actual.extend(labels.detach().cpu().numpy())
            predicted.extend(torch.argmax(outputs, 1).detach().cpu().numpy())

    print("Calculating performance metrics")
    model_accuracy = accuracy_score(actual, predicted)
    model_precision = precision = precision_score(actual, predicted,
                                                average="macro")
    model_recall = recall_score(actual, predicted,
                                average="macro")

    if config.get("use_wandb"):
        wandb.log({"test/accuracy": model_accuracy})
        wandb.log({"test/precision": model_precision})
        wandb.log({"test/recall": model_recall})
    else:
        print("test/accuracy", model_accuracy)
        print("test/precision", model_precision)
        print("test/recall", model_recall)

    plot_save_path = os.path.join(RESULTS_PATH, '{}_confusion_matrix.png'.format(
        config["name"]
    ))
    print("Plotting confusion matrix in {}".format(plot_save_path))

    print("Calculating")
    conf_matrix = confusion_matrix(y_true=actual, y_pred=predicted)
    print("Done")
    fig =plt.figure(figsize=(20, 20))
    ax = fig.add_subplot(1,1,1)
    ax.matshow(conf_matrix, cmap=plt.cm.Reds)


    plt.xlabel('Predictions')
    plt.ylabel('Actuals')
    plt.title('Confusion Matrix')
    plt.savefig(plot_save_path)
    # plt.show()

    if config.get("use_wandb"):
        run.finish()
