### Classifier.py

In [1]:
from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, models, transforms
import time
import copy
import sklearn.metrics

class Classifier:
    """Train and evaluate a pretrained torchvision CNN on a new classification task.

    Attributes:
        model_name (str): Pretrained model being used; one of 'alexnet',
            'inception', 'vgg', 'densenet' or 'resnet'.
        output_classes (int): Number of output classes (2 for binary classification).
        batch_size (int): Batch size (stored for callers; the methods below do not read it).
        num_epochs (int): Number of epochs run by ``train_model``.
        feature_extract (bool): True to use the CNN as a feature extractor (pretrained
            weights frozen, only the replaced final layer trains); False to retrain
            the whole network for our problem.
        device (torch.device): CUDA when available, otherwise CPU. Batches are moved
            to this device; the model itself is NOT moved by this class.

    Adapted from the pytorch tutorials on using pre-trained models to train new classifiers.
    """

    model_name = None
    output_classes = 2

    def __init__(self, model_name, output_classes = 2, batch_size = 8, num_epochs=15, feature_extract=True):
        self.model_name = model_name
        self.output_classes = output_classes
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.feature_extract = feature_extract
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def train_model(self, model, criterion, optimizer, dataloaders, dataset_sizes, is_inception=False):
        """Run the train/validation loop and keep the weights with the best validation accuracy.

        Args:
            model: Network to train. It must already live on ``self.device``.
            criterion: Loss callable taking ``(outputs, labels)``.
            optimizer: Optimizer over the trainable parameters of ``model``.
            dataloaders: Mapping with 'train' and 'val' iterables of ``(inputs, labels)`` batches.
            dataset_sizes: Mapping with the number of samples in 'train' and 'val'.
            is_inception: True when ``model`` returns ``(outputs, aux_outputs)`` in
                training mode; the auxiliary loss is then added with weight 0.4.

        Returns:
            Tuple ``(model, val_acc_history, per_epoch_loss, per_epoch_accuracy)``.
            ``model`` carries the best validation weights; the three lists hold one
            plain Python float per epoch (the last two describe the training phase).
        """
        since = time.time()

        best_model_weights = copy.deepcopy(model.state_dict())
        best_accuracy = 0.0
        val_acc_history = []

        per_epoch_loss = []
        per_epoch_accuracy = []

        for epoch in range(self.num_epochs):
            print('Epoch {}/{}'.format(epoch, self.num_epochs - 1))
            print('-' * 10)

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                is_training = phase == 'train'
                if is_training:
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(self.device)
                    labels = labels.to(self.device)

                    optimizer.zero_grad()

                    # Only build the autograd graph while training.
                    with torch.set_grad_enabled(is_training):
                        if is_inception and is_training:
                            # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                            outputs, aux_outputs = model(inputs)
                            loss1 = criterion(outputs, labels)
                            loss2 = criterion(aux_outputs, labels)
                            loss = loss1 + 0.4*loss2
                        else:
                            outputs = model(inputs)
                            loss = criterion(outputs, labels)

                        _, preds = torch.max(outputs, 1)

                        if is_training:
                            loss.backward()
                            optimizer.step()

                    # Accumulate as Python numbers so the reported statistics are
                    # plain floats (device independent and safe to pickle).
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels).item()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]

                print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                    phase, epoch_loss, epoch_acc))

                if is_training:
                    per_epoch_accuracy.append(epoch_acc)
                    per_epoch_loss.append(epoch_loss)
                else:
                    val_acc_history.append(epoch_acc)
                    # Snapshot the weights whenever validation accuracy improves.
                    if epoch_acc > best_accuracy:
                        best_accuracy = epoch_acc
                        best_model_weights = copy.deepcopy(model.state_dict())

            print()

        time_elapsed = time.time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(
            time_elapsed // 60, time_elapsed % 60))
        print('Best val Acc: {:4f}'.format(best_accuracy))

        # load best model weights
        model.load_state_dict(best_model_weights)
        return model, val_acc_history, per_epoch_loss, per_epoch_accuracy


    def set_requires_grad(self, model):
        """Freeze every parameter of ``model`` when ``self.feature_extract`` is True.

        Does nothing when fine-tuning (``feature_extract`` False). Layers created
        after this call keep the default ``requires_grad=True``.
        """
        if self.feature_extract:
            for param in model.parameters():
                param.requires_grad = False

    def initPretrainedModel(self, inputSize):
        """Build the pretrained network named by ``self.model_name`` with a new output layer.

        The pretrained weights are frozen first when ``self.feature_extract`` is
        True (see ``set_requires_grad``); the final layer is then replaced by a
        fresh ``nn.Linear`` with ``self.output_classes`` outputs.

        Args:
            inputSize: Accepted for interface compatibility; it is not used when
                building the model.

        Returns:
            The configured model.

        Raises:
            ValueError: If ``self.model_name`` is not a supported architecture.
        """
        name = self.model_name

        if name == 'alexnet':
            model = models.alexnet(pretrained=True)
            self.set_requires_grad(model)
            num_ftrs = model.classifier[6].in_features
            model.classifier[6] = nn.Linear(num_ftrs, self.output_classes)
        elif name == 'inception':
            print("Initializing model: Inception_V3")
            model = models.inception_v3(pretrained=True)
            self.set_requires_grad(model)
            # Handle the auxiliary net
            num_ftrs = model.AuxLogits.fc.in_features
            model.AuxLogits.fc = nn.Linear(num_ftrs, self.output_classes)
            # Handle the primary net
            num_ftrs = model.fc.in_features
            model.fc = nn.Linear(num_ftrs, self.output_classes)
        elif name == 'vgg':
            model = models.vgg11_bn(pretrained=True)
            self.set_requires_grad(model)
            num_ftrs = model.classifier[6].in_features
            model.classifier[6] = nn.Linear(num_ftrs, self.output_classes)
        elif name == 'densenet':
            # Densenet 121
            print("Initializing to use pre-trained DenseNet 121 for feature extraction...")
            model = models.densenet121(pretrained=True)
            self.set_requires_grad(model)
            num_ftrs = model.classifier.in_features
            model.classifier = nn.Linear(num_ftrs, self.output_classes)
        elif name == 'resnet':
            # Resnet 18
            print("Initializing to use pre-trained Resnet 18 for feature extraction...")
            model = models.resnet18(pretrained=True)
            self.set_requires_grad(model)
            num_ftrs = model.fc.in_features
            model.fc = nn.Linear(num_ftrs, self.output_classes)
        else:
            # Fail here rather than handing None to the optimizer/training code.
            raise ValueError(
                "Unsupported model_name {!r}; expected one of 'alexnet', 'inception', "
                "'vgg', 'densenet', 'resnet'".format(name))

        return model

    def testModel(self, dataloaders, model, classes, dataset_sizes, batch_size):
        """Evaluate ``model`` on the 'test' split and report metrics.

        Args:
            dataloaders: Mapping whose 'test' loader iterates in dataset order
                (unshuffled) and whose ``dataset.samples`` entries start with the
                sample's file name.
            model: Trained network; it must already live on ``self.device``.
            classes: Sequence mapping a predicted class index to its class name.
            dataset_sizes: Mapping with the number of samples in 'test'.
            batch_size: Kept for backward compatibility. File names are matched to
                predictions by a running offset, so this value is no longer needed.

        Returns:
            List of ``(sample_name, predicted_class_name)`` tuples in dataset order.
        """
        correct = 0
        total = dataset_sizes['test']
        predictions = []

        y_actual = []
        y_pred = []

        test_loader = dataloaders['test']
        all_samples = test_loader.dataset.samples
        offset = 0  # index of the first sample of the current batch

        model.eval()
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()

                predicted_ids = predicted.cpu().tolist()
                batch_samples = all_samples[offset:offset + len(predicted_ids)]
                offset += len(predicted_ids)

                sample_names = [s[0] for s in batch_samples]
                predicted_classes = [classes[class_id] for class_id in predicted_ids]
                predictions.extend(zip(sample_names, predicted_classes))

                y_actual.extend(labels.cpu().tolist())
                y_pred.extend(predicted_ids)

        try:
            print(f"Accuracy (Sklearn): {sklearn.metrics.accuracy_score(y_actual, y_pred)}")
            print(f"F1-Score (Sklearn): {sklearn.metrics.f1_score(y_actual, y_pred)}")
            print(f"Precision Score: {sklearn.metrics.precision_score(y_actual, y_pred)}")
            print(f"Recall Score: {sklearn.metrics.recall_score(y_actual, y_pred)}")
            print(f"\nConfusion Matrix:\n{sklearn.metrics.confusion_matrix(y_actual, y_pred)}")
            print(f"\nClassification Report:\n{sklearn.metrics.classification_report(y_actual, y_pred)}")
        except (RuntimeError, ValueError) as err:
            # Metrics are best-effort (e.g. the binary scores reject non-binary
            # targets); report the actual error and still return the predictions.
            print("Error computing metrics: \n", err)

        print('\n\nAccuracy of the network on the test images: %d %%' % (100 * correct / total))


        return predictions

### Dataset.py

In [2]:
from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy

class DataSet:
    """Helpers that build the train/val/test data loaders and their transforms.

    Note: Ensure data directory is in ImageFolder format, i.e. ``data_dir``
    contains 'train', 'val' and 'test' sub-directories, each with one folder
    per class.
    """

    data_dir = None

    def __init__(self, data_dir):
        self.data_dir = data_dir

    @staticmethod
    def initDataLoaders(data_dir, batch_size, inputSize=224, num_workers=4):
        """Create the data loaders for the 'train', 'val' and 'test' splits.

        Args:
            data_dir: Root directory holding the 'train', 'val' and 'test' folders.
            batch_size: Batch size used by every loader.
            inputSize: Side length of the square crop fed to the network. Defaults
                to 224; pass the size the chosen architecture expects.
            num_workers: Number of loader worker processes per split (default 4).

        Returns:
            Tuple ``(dataloaders, dataset_sizes, class_names)``: two dicts keyed by
            split name, plus the class names found in the 'train' folder.
        """
        splits = ['train', 'val', 'test']
        data_transforms = DataSet.setUpDataLoaderTransformers(inputSize)

        image_datasets = {
            split: datasets.ImageFolder(os.path.join(data_dir, split), data_transforms[split])
            for split in splits
        }
        dataloaders = {
            split: torch.utils.data.DataLoader(
                image_datasets[split],
                batch_size=batch_size,
                # The test loader keeps dataset order so predictions can be
                # matched back to file names; train and val are shuffled.
                shuffle=(split != 'test'),
                num_workers=num_workers,
            )
            for split in splits
        }
        dataset_sizes = {split: len(image_datasets[split]) for split in splits}
        class_names = image_datasets['train'].classes

        return dataloaders, dataset_sizes, class_names

    @staticmethod
    def setUpDataLoaderTransformers(inputSize = 224):
        """Return the per-split torchvision transform pipelines.

        'train' uses a random resized crop plus a random horizontal flip; 'val'
        and 'test' use a deterministic resize followed by a center crop. Every
        split is then converted to a tensor and normalized per channel.

        Args:
            inputSize: Side length of the square crop produced for the network.

        Returns:
            Dict mapping 'train', 'val' and 'test' to a ``transforms.Compose``.
        """
        # Per-channel mean / std; these match the values torchvision documents
        # for its ImageNet-pretrained models.
        channel_mean = [0.485, 0.456, 0.406]
        channel_std = [0.229, 0.224, 0.225]

        def to_normalized_tensor():
            # Fresh transform objects per pipeline so no instance is shared.
            return [transforms.ToTensor(), transforms.Normalize(channel_mean, channel_std)]

        def evaluation_pipeline():
            return transforms.Compose(
                [transforms.Resize(inputSize), transforms.CenterCrop(inputSize)]
                + to_normalized_tensor()
            )

        data_transforms = {
            'train': transforms.Compose(
                [transforms.RandomResizedCrop(inputSize), transforms.RandomHorizontalFlip()]
                + to_normalized_tensor()
            ),
            'val': evaluation_pipeline(),
            'test': evaluation_pipeline(),
        }

        return data_transforms

### Densenet.py

In [None]:
from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
from torchvision import datasets

import pickle
import os
import numpy as np

from classifier import Classifier
from optimizer import Optimizer
from dataset import DataSet

if __name__ == '__main__':
    # Script: train a pretrained DenseNet as a feature extractor on the
    # ImageFolder data under `data_dir`, save the training curves and the
    # checkpoint, then evaluate on the test split and save the predictions.

    data_dir = 'data'
    model_name = 'densenet'
    output_classes = 2
    feature_extract = True
    batch_size = 8
    num_epochs = 20

    learningRate = 0.001
    momentum = 0.9

    # Tag every output file with the hyper-parameters of this run.
    run_id = 'l_' + str(learningRate) + '_m_' + str(momentum)

    densenetClassifier = Classifier(model_name, output_classes, batch_size, num_epochs, feature_extract)
    model = densenetClassifier.initPretrainedModel(224)

    dataloaders_dict, dataset_sizes, class_names = DataSet.initDataLoaders(data_dir, batch_size)

    # Detect if we have a GPU available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # NOTE(review): Optimizer comes from a project module not shown here;
    # presumably it also moves the model to `device` -- confirm.
    sgdOptimizer = Optimizer(device)
    optimizer_ft = sgdOptimizer.optimize(model, feature_extract, learningRate, momentum)

    criterion = nn.CrossEntropyLoss()

    model, val_acc_history, per_epoch_loss, per_epoch_accuracy = densenetClassifier.train_model(
        model,
        criterion,
        optimizer_ft,
        dataloaders_dict,
        dataset_sizes)

    # Store plain floats: the histories may hold 0-dim (possibly CUDA) tensors,
    # which would make the pickles below unreadable on a machine without a GPU.
    val_acc_history = [float(value) for value in val_acc_history]
    per_epoch_loss = [float(value) for value in per_epoch_loss]
    per_epoch_accuracy = [float(value) for value in per_epoch_accuracy]

    print(f"Validation Accuracy History:\n{val_acc_history}")
    print(f"\nPer epoch loss:\n{per_epoch_loss}")
    print(f"\nPer epoch accuracy:\n{per_epoch_accuracy}")

    # Save lists for plots:
    print("Saving per_epoch_losses, per_epoch_accuracy to disk for analysis...")
    curves = (
        ('epoch_losses_', per_epoch_loss),
        ('epoch_accuracies_', per_epoch_accuracy),
        ('val_acc_history_', val_acc_history),
    )
    for file_prefix, values in curves:
        with open(file_prefix + run_id + '.pickle', 'wb') as handle:
            pickle.dump(values, handle)

    print("Saving final model to disk...")
    save_as_name = 'densenetFeatureExtraction_' + run_id + '.pt'
    torch.save({
        'name': 'densenet_feature_extraction_' + run_id,
        'epoch': num_epochs,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer_ft.state_dict(),
    }, save_as_name)

    # testing
    print("Running independent test...")
    # Reload from disk so the test runs on exactly what was saved.
    state = torch.load(save_as_name)
    model.load_state_dict(state['model_state_dict'])
    # Use the same batch size the loaders were built with.
    predictions = densenetClassifier.testModel(dataloaders_dict, model, class_names, dataset_sizes,
                                               batch_size=batch_size)

    # save predicted values as comma-separated (file name, predicted class) rows
    save_as_name = 'predictedLabelsDensenet_' + run_id + '.csv'
    np.savetxt(save_as_name, predictions, fmt='%s', delimiter=',')
