# In [1]:
# Colab only: mount the user's Google Drive at /gdrive. The dataset paths defined
# further down all start with "/gdrive/My Drive/", so this must run first.
from google.colab import drive
drive.mount('/gdrive')

# Output: Mounted at /gdrive


# In [None]:
# Imports here
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, ToTensor
import itertools


class CustomDataset(Dataset):
    """Wrap a sequence of ``[image, label]`` rows as a torch ``Dataset``.

    Label encoding used by the rows:
        0 = 'mask'
        1 = 'human'
        2 = 'non-human'
    """

    def __init__(self, dataset):
        # Rows are kept as given; the image is converted lazily in __getitem__.
        self.dataset = dataset
        self.transformations = Compose([ToTensor()])

    def __len__(self):
        """Number of rows in the wrapped sequence."""
        return len(self.dataset)

    def __getitem__(self, key):
        """Return ``[transformed image, label tensor]`` for row ``key``."""
        sample = self.dataset[key]
        imageTensor = self.transformations(sample[0])
        labelTensor = torch.tensor(sample[1])
        return [imageTensor, labelTensor]


# Root folder on the mounted Drive; every other path is derived from it.
datasetPath = "/gdrive/My Drive/Ai Project Run to Cloud/Dataset"

# One folder of raw images per class.
maskDatasetPath = f"{datasetPath}/masked dataset"
humanDatasetPath = f"{datasetPath}/human dataset"
nonHumanDatasetPath = f"{datasetPath}/non-human dataset"

# Cached, already-resized dataset and the folder that receives the plots.
preprocessedDataPath = f"{datasetPath}/data.npy"
resultsPath = f"{datasetPath}/Results"

# Checkpoint file name (saved directly under datasetPath) and mini-batch size.
modelName = "timepass.t7"
batchSize = 142


class Data:
    """Build, cache and split the image dataset, and expose the DataLoaders.

    Attributes set by ``buildDataLoader``:
        trainDataLoader / validationDataLoader / testDataLoader: torch DataLoaders.
        normalizedWeights: one loss weight per class, ``1 - class_share`` of the
            training split, ordered as label 0, 1, 2.
    """

    def __init__(self):
        self.data = []
        self.trainDataLoader = []
        self.validationDataLoader = []
        self.testDataLoader = []
        self.normalizedWeights = []
        self.labelsDict = {0: "Masked Human", 1: "Human", 2: "Non-Human"}

    @staticmethod
    def _packRows(rows):
        """Pack ``[image, label]`` rows into an ``(N, 2)`` object array.

        Handing the ragged nested list straight to ``np.save`` makes NumPy try to
        build a regular array, which raises ``ValueError`` on NumPy >= 1.24.
        Filling a pre-allocated object array element by element avoids that.
        """
        packed = np.empty((len(rows), 2), dtype=object)
        for index, (image, label) in enumerate(rows):
            packed[index, 0] = image
            packed[index, 1] = label
        return packed

    @staticmethod
    def _classWeights(labels):
        """Return ``[1 - share]`` for labels 0, 1 and "everything else" (label 2).

        Raises:
            ValueError: if ``labels`` is empty (no share can be computed).
        """
        labels = list(labels)
        total = len(labels)
        if total == 0:
            raise ValueError("Cannot compute class weights for an empty training split")
        maskCount = sum(1 for label in labels if label == 0)
        humanCount = sum(1 for label in labels if label == 1)
        # Anything that is neither 0 nor 1 is counted as class 2.
        numberOfImagesByCategory = [maskCount, humanCount, total - maskCount - humanCount]
        return [1 - (count / total) for count in numberOfImagesByCategory]

    def _appendFolder(self, folderPath, label):
        """Read every file in ``folderPath``, resize to 100x100 and append it with ``label``."""
        for fileName in os.listdir(folderPath):
            imagePath = folderPath + "/" + fileName
            print(imagePath)
            img = cv2.imread(imagePath, cv2.IMREAD_COLOR)
            if img is None:
                # cv2.imread signals unreadable / non-image files by returning None;
                # passing that to cv2.resize would abort the whole build.
                print("Skipping unreadable image: " + imagePath)
                continue
            img = cv2.resize(img, (100, 100))
            self.data.append([np.array(img), label])

    def buildData(self):
        """Load the three class folders, shuffle the rows and cache them on disk."""
        # Start from scratch so a second call does not duplicate every image.
        self.data = []
        labelledFolders = (
            (maskDatasetPath, 0),      # Mask Dataset
            (humanDatasetPath, 1),     # Human Dataset
            (nonHumanDatasetPath, 2),  # Non Human Dataset
        )
        for folderPath, label in labelledFolders:
            self._appendFolder(folderPath, label)

        np.random.shuffle(self.data)
        np.save(preprocessedDataPath, self._packRows(self.data))

    def loadPreprocessedData(self):
        """Load the cached ``(N, 2)`` object array written by ``buildData``."""
        # NOTE: allow_pickle=True unpickles arbitrary objects; only load a cache file
        # that this notebook produced itself.
        return np.load(preprocessedDataPath, allow_pickle=True)

    def buildDataLoader(self, build=False):
        """Split the cached data and create the three DataLoaders.

        Args:
            build: when True, rebuild the cache from the image folders first.
        """
        print("Build DataLoader")
        if build:
            self.buildData()
        # 5% held out for testing, then 15% of the remainder for validation.
        trainVal, test = train_test_split(self.loadPreprocessedData(), test_size=0.05, random_state=0)
        train, val = train_test_split(trainVal, test_size=0.15, random_state=0)

        # NOTE(review): the training loader is not reshuffled between epochs.
        self.trainDataLoader = DataLoader(CustomDataset(train), batch_size=batchSize)
        self.validationDataLoader = DataLoader(CustomDataset(val), batch_size=batchSize)
        self.testDataLoader = DataLoader(CustomDataset(test), batch_size=batchSize)

        self.normalizedWeights = self._classWeights(row[1] for row in train)
        print("Normalized Weights: ", self.normalizedWeights)

class CNN(nn.Module):
    """Two-stage convolutional classifier with three output logits.

    The first ``Linear`` layer has 320000 = 512 * 25 * 25 inputs, so the network
    expects 3 x 100 x 100 images (two 2x2 max-pools: 100 -> 50 -> 25).
    """

    def __init__(self):
        print("Building CNN")
        super().__init__()
        self.network = nn.Sequential(
            nn.Conv2d(3, 100, kernel_size=3, padding=1),
            nn.BatchNorm2d(100),
            nn.ReLU(inplace=True),
            nn.Conv2d(100, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),  # output: 128 x 50 x 50
            nn.Dropout2d(p=0.05),

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),  # output: 512 x 25 x 25
            nn.Dropout2d(p=0.05),

            nn.Flatten(),
            nn.Linear(320000, 512),  # 512 x 25 x 25 = 320000
            nn.ReLU(inplace=True),
            # Plain Dropout here: the activations are 2-D (batch, features) after
            # Flatten/Linear, and Dropout2d on 2-D input is deprecated. Dropout has
            # no parameters, so layer indices and state_dict keys are unchanged.
            nn.Dropout(p=0.2),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            nn.Linear(256, 3))

    def forward(self, xb):
        """Return the raw class logits for a batch of images ``xb``."""
        return self.network(xb)


class TrainTest:
    """Train, checkpoint, reload and evaluate the classifier.

    Uses a class-weighted cross-entropy loss and Adam (lr=1e-4). All tensors are
    moved to CUDA when it is available and to the CPU otherwise.
    """

    def __init__(self, normalizedWeights, model: CNN):
        """
        Args:
            normalizedWeights: one loss weight per class, in label order.
            model: network to train/evaluate; it is moved to the active device.
        """
        device = self.getDevice()
        self.model = model.to(device)
        # Force float32 so float64 inputs (e.g. NumPy scalars) do not produce a
        # weight tensor whose dtype differs from the float32 logits.
        classWeights = torch.tensor(normalizedWeights, dtype=torch.float32).to(device)
        self.crossEntropyLoss = nn.CrossEntropyLoss(weight=classWeights)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.0001)

    def moveToDevice(self, content, newDevice):
        """Recursively move a tensor, or a list/tuple of tensors, to ``newDevice``."""
        if isinstance(content, (list, tuple)):
            return [self.moveToDevice(x, newDevice) for x in content]
        return content.to(newDevice, non_blocking=True)

    def accuracy(self, outputs, labels):
        """Return the fraction of rows whose arg-max logit equals the label (0-d tensor)."""
        _, predictions = torch.max(outputs, dim=1)
        return torch.tensor(torch.sum(predictions == labels).item() / len(predictions))

    def validationPhase(self, batch):
        """Return ``[accuracy, loss]`` for one ``(images, labels)`` batch without gradients."""
        with torch.no_grad():
            images, labels = batch
            outputs = self.model(images)
            loss = self.crossEntropyLoss(outputs, labels.long())
            validationAccuracy = self.accuracy(outputs, labels)
            return [validationAccuracy, loss]

    def validationEnd(self, outputs):
        """Average per-batch ``[accuracy, loss]`` pairs into ``[epochLoss, epochAcc]``.

        NOTE(review): this is a plain mean over batches, so a smaller final batch
        carries the same weight as a full one.
        """
        batchLoss = [output[1] for output in outputs]
        batchAcc = [output[0] for output in outputs]
        epochLoss = torch.stack(batchLoss).mean()
        epochAcc = torch.stack(batchAcc).mean()
        return [epochLoss.item(), epochAcc.item()]

    def testModel(self, dataLoader):
        """Evaluate the model on ``dataLoader`` and return ``[loss, accuracy]``."""
        device = self.getDevice()
        self.model.eval()
        with torch.no_grad():
            outputs = []
            for batch in dataLoader:
                batch = self.moveToDevice(batch, device)
                outputs.append(self.validationPhase(batch))
            return self.validationEnd(outputs)

    def trainModel(self, epochs, data):
        """Train for ``epochs`` epochs, checkpointing whenever validation loss improves.

        Args:
            epochs: number of passes over ``data.trainDataLoader``.
            data: object exposing ``trainDataLoader`` and ``validationDataLoader``.

        Returns:
            One ``[trainAcc, trainLoss, valLoss, valAcc]`` list per epoch.
        """
        print("Training Model")
        device = self.getDevice()
        epochResults = []
        tolerance = 1e-4
        # Start at +inf so the first epoch always writes a checkpoint. A fixed
        # starting value of 1 would never save a model whose validation loss stays
        # above 1, and the later loadModel() call would then have nothing to load.
        prevValLoss = float("inf")
        for epoch in range(epochs):
            self.model.train()
            trainAccuracies = []
            trainLosses = []

            # TRAIN STEP
            for batch in data.trainDataLoader:
                images, labels = self.moveToDevice(batch, device)
                outputs = self.model(images)
                trainLoss = self.crossEntropyLoss(outputs, labels.long())
                trainAccuracies.append(self.accuracy(outputs, labels))
                # Keep only the value: storing the attached tensor would hold on to
                # every batch's autograd graph until the end of the epoch.
                trainLosses.append(trainLoss.detach())
                trainLoss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

            # VALIDATION STEP
            validationLoss, validationAcc = self.testModel(data.validationDataLoader)
            # RESULTS
            result = [torch.stack(trainAccuracies).mean().item(), torch.stack(trainLosses).mean().item(),
                      validationLoss, validationAcc]
            epochResults.append(result)

            # Print Result
            print("Epoch: {}, trainLoss: {:.4f}, trainAcc: {:.4f}, valLoss: {:.4f}, valAcc: {:.4f}"
                  .format(epoch, result[1], result[0], result[2], result[3]))

            # SAVE MODEL
            if prevValLoss - validationLoss > tolerance:
                prevValLoss = validationLoss
                print('==> Saving model ...')
                # Only the weights are stored. Pickling the whole module as well
                # doubles the file size and cannot be read back by torch.load with
                # its default weights_only=True (PyTorch >= 2.6).
                state = {
                    'epoch': epoch,
                    'state_dict': self.model.state_dict()
                }
                torch.save(state, datasetPath + "/" + modelName)
        return epochResults

    def getDevice(self):
        """Return the CUDA device when available, otherwise the CPU device."""
        isCudaAvailable = torch.cuda.is_available()
        device = torch.device('cuda') if isCudaAvailable else torch.device('cpu')
        return device

    def loadModel(self):
        """Restore the checkpointed weights into ``self.model``.

        The weights are loaded in place rather than into a new network so that
        ``self.optimizer`` keeps pointing at the parameters of the active model.
        """
        device = self.getDevice()
        checkpoint = torch.load(datasetPath + "/" + modelName, map_location=device)
        self.model.load_state_dict(checkpoint["state_dict"])
        self.model = self.model.to(device)

    def evaluate(self, dataLoader):
        """Return ``(predictedLabels, actualLabels)`` as flat lists for ``dataLoader``."""
        predictionLabels = []
        actualLabels = []
        device = self.getDevice()
        self.model.eval()
        with torch.no_grad():
            for batch in dataLoader:
                images, labels = self.moveToDevice(batch, device)
                outputs = self.model(images)
                _, predictions = torch.max(outputs, dim=1)
                predictionLabels.extend(predictions.detach().cpu().tolist())
                actualLabels.extend(labels.detach().cpu().tolist())
        return predictionLabels, actualLabels

    def printClassificationReportAndPlotConfusionMatrix(self, data):
        """Print a classification report and save a confusion matrix per split."""
        classes = ["Masked Human", "Human", "Non-Human"]
        # Pin the label set so the report rows and the matrix are always 3 classes
        # wide, even when a split happens to contain no sample of some class.
        classIds = list(range(len(classes)))
        splits = (
            ("Test", data.testDataLoader),
            ("Validation", data.validationDataLoader),
            ("Train", data.trainDataLoader),
        )
        for splitName, dataLoader in splits:
            print(splitName + " Classification Report")
            predictionLabels, actualLabels = self.evaluate(dataLoader)
            print(classification_report(actualLabels, predictionLabels,
                                        labels=classIds, target_names=classes))
            self.plot_confusion_matrix(confusion_matrix(actualLabels, predictionLabels, labels=classIds),
                                       classes, title=splitName + " Confusion Matrix")

    def plot_confusion_matrix(self, cm, classes, normalize=False, title='Confusion matrix',
                              cmap=plt.cm.Blues):
        """
        This function plots the confusion matrix and saves it under resultsPath.
        Normalization (per true-label row) can be applied by setting `normalize=True`.
        """
        if normalize:
            # Normalise before drawing so the colours and the printed numbers agree.
            rowTotals = cm.sum(axis=1)[:, np.newaxis]
            # Rows without any true sample stay at 0 instead of dividing by zero.
            cm = cm.astype('float') / np.where(rowTotals == 0, 1, rowTotals)

        plt.figure()
        plt.imshow(cm, interpolation='nearest', cmap=cmap)
        plt.title(title)
        plt.colorbar()
        tick_marks = np.arange(len(classes))
        plt.xticks(tick_marks, classes, rotation=45)
        plt.yticks(tick_marks, classes)

        thresh = cm.max() / 2.
        for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
            cellText = "{:.2f}".format(cm[i, j]) if normalize else str(cm[i, j])
            plt.text(j, i, cellText,
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

        plt.axis('scaled')
        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        # The results folder may not exist yet on a fresh Drive.
        os.makedirs(resultsPath, exist_ok=True)
        plt.savefig(resultsPath + "/" + title)

    def plotTestPredictions(self, data):
        """Show and save up to ten test images with predicted (PL) and actual (AL) labels."""
        testPredictionLabels, testActualLabels = self.evaluate(data.testDataLoader)
        firstBatch = next(iter(data.testDataLoader), None)
        if firstBatch is None:
            return
        images, _ = firstBatch
        # The first batch may hold fewer than ten images.
        shown = min(10, len(images))
        f, axarr = plt.subplots(1, 10, figsize=(100, 100))
        for i in range(10):
            axarr[i].set_xticks([])
            axarr[i].set_yticks([])
            if i >= shown:
                axarr[i].axis('off')
                continue
            # CHW tensor -> HWC array; the cached images are in OpenCV's BGR order.
            img = images[i].detach().cpu().numpy().transpose(1, 2, 0)
            axarr[i].imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            # Index i matches evaluate()'s output because the test loader is iterated
            # in the same (unshuffled) order both times.
            axarr[i].set_title("PL: {0}, AL: {1}".format(testPredictionLabels[i], testActualLabels[i]))
        # Save before show(): some backends clear the figure once it has been shown.
        os.makedirs(resultsPath, exist_ok=True)
        f.savefig(resultsPath + "/Test-Predictions.png")
        plt.show(block=True)

# Run configuration.
buildData = False               # True: rebuild the cached .npy file from the image folders
trainModel = True               # True: train for 10 epochs before evaluating
plotTestingPredictions = False  # True: also plot sample test predictions

# Data pipeline and model.
dataObject = Data()
dataObject.buildDataLoader(build=buildData)
cnn = CNN()
trainTest = TrainTest(normalizedWeights=dataObject.normalizedWeights, model=cnn)

if trainModel:
    trainTest.trainModel(epochs=10, data=dataObject)

# Evaluation always runs on the weights stored in the checkpoint file.
trainTest.loadModel()
trainTest.printClassificationReportAndPlotConfusionMatrix(data=dataObject)

if plotTestingPredictions:
    trainTest.plotTestPredictions(data=dataObject)
