<a href="https://colab.research.google.com/github/liranbd1/FaceMaskDetectionLab/blob/main/Training_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing needed libraries

In [None]:
!pip install comet_ml

# Connecting to GDrive
The dataset is read from GDrive, and the trained models are saved there as well.

In [None]:
from google.colab import drive
# Mount Google Drive under /content/gdrive/; the dataset and model paths used
# later in this notebook are all located below this mount point.
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


# Initializing Comet

In [None]:
import os
from getpass import getpass

# Import comet_ml at the top of your file (this cell must stay above the cell
# that imports torch and the other ML libraries).
from comet_ml import Experiment

# Never hard-code the API key in a shared notebook: read it from the
# COMET_API_KEY environment variable and fall back to an interactive prompt.
# NOTE(review): the key that used to be committed in this cell is public and
# should be revoked in the Comet account settings.
comet_api_key = os.environ.get("COMET_API_KEY") or getpass("Comet API key: ")

# Create the experiment that the rest of the notebook logs parameters and
# metrics to.
experiment = Experiment(
    api_key=comet_api_key,
    project_name="mask-detection",
    workspace="liranbd1",
)

# Imports

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
import torchvision
from torchvision import transforms
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split

# Custom Dataset - FaceMaskDetectionDataset

In [None]:
class FaceMaskDetectionDataset(Dataset):
    """Dataset of cropped faces labelled as "mask" or "no mask".

    Each item corresponds to one annotated face: the image file is opened,
    cropped to the annotation's bounding box and returned together with the
    index of its class in ``self.classes``.

    Args:
        annotations_file: Path to the annotations CSV. The columns read by this
            class are ``name``, ``x1``, ``x2``, ``y1``, ``y2`` and ``classname``.
        img_dir: Directory containing the image files referenced by ``name``.
        transform: Optional callable applied to the cropped PIL image.
        target_transform: Optional callable applied to the class-name string
            before it is converted to its index in ``self.classes``.
    """

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.dataframe = self.process_df(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

        # The position of a name in this list is the integer label that
        # __getitem__ returns (0 -> face_with_mask, 1 -> face_no_mask).
        self.classes = ["face_with_mask",
                        "face_no_mask"]

    def __len__(self):
        """Return the number of annotated faces kept after filtering."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the annotation at position ``idx``.

        ``image`` is the cropped face (after ``transform`` if one was given,
        otherwise a PIL image) and ``label`` is an index into ``self.classes``.
        """
        # Row of the requested annotation and the path of its image file.
        img_row = self.dataframe.iloc[idx]
        img_path = os.path.join(self.img_dir, img_row['name'])

        # The CSV columns are deliberately remapped here: (x1, x2) is used as
        # the first corner and (y1, y2) as the opposite corner of the box.
        # NOTE(review): this follows the original author's comment about the
        # column naming of the CSV; confirm against the dataset description.
        left = img_row['x1']
        upper = img_row['x2']
        right = img_row['y1']
        lower = img_row['y2']

        # Open inside a context manager so the file handle is released, and
        # convert to RGB so grayscale/RGBA files still yield three channels.
        with Image.open(img_path) as full_img:
            image = full_img.convert("RGB").crop((left, upper, right, lower))

        label = img_row['classname']

        # `image` is always bound, even when no transform was supplied.
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        label = self.classes.index(label)
        return image, label

    @staticmethod
    def process_df(csv_path):
        """Load the annotations CSV and reduce it to the two classes we train on.

        The CSV holds several rows per face with classes that are irrelevant
        for a mask / no-mask classifier. Only the four face classes below are
        kept, and they are merged into ``face_with_mask`` / ``face_no_mask``.

        Declared as a static method because it does not use instance state; it
        is called as ``self.process_df(path)`` from ``__init__``.

        Args:
            csv_path: Path to the annotations CSV file.

        Returns:
            A DataFrame (with a fresh 0..n-1 index) whose ``classname`` column
            only contains ``face_with_mask`` or ``face_no_mask``.
        """
        # Original class name -> class name used by the classifier.
        relabel = {
            'face_with_mask': 'face_with_mask',
            'face_with_mask_incorrect': 'face_with_mask',
            'face_no_mask': 'face_no_mask',
            'face_other_covering': 'face_no_mask',
        }

        df = pd.read_csv(csv_path)
        # Drop every row whose class is not one of the four face classes.
        df = df[df['classname'].isin(list(relabel))].copy()
        df['classname'] = df['classname'].map(relabel)
        return df.reset_index(drop=True)

# Util functions

## Confusion matrix

In [None]:
def confusion(prediction, truth):
    """Count the confusion-matrix entries for binary predictions.

    `prediction` and `truth` are tensors of the same shape holding 0/1 values.
    The counts are the number of positions where `prediction` and `truth` are
    - 1 and 1 (True Positive)
    - 1 and 0 (False Positive)
    - 0 and 0 (True Negative)
    - 0 and 1 (False Negative)

    The masks are built with explicit comparisons instead of the
    `prediction / truth` division trick, so the function does not depend on
    how the installed PyTorch version divides integer tensors by zero.

    Returns:
        A tuple of Python ints
        `(true_positives, true_negatives, false_positives, false_negatives)`.
    """
    predicted_positive = prediction == 1
    predicted_negative = prediction == 0
    actual_positive = truth == 1
    actual_negative = truth == 0

    true_positives = torch.sum(predicted_positive & actual_positive).item()
    true_negatives = torch.sum(predicted_negative & actual_negative).item()
    false_positives = torch.sum(predicted_positive & actual_negative).item()
    false_negatives = torch.sum(predicted_negative & actual_positive).item()

    return true_positives, true_negatives, false_positives, false_negatives


## Log metrics

In [None]:
def log_data(accuracy, precision, recall, f1, loss, epoch, state = "train"):
  """Send one epoch's metrics to the global Comet `experiment`.

  Every metric is logged under the name `<state>_<metric>` (for example
  `train_loss` or `test_f1`), with `epoch` passed as the third positional
  argument of `experiment.log_metric`.
  """
  # Kept in the same order in which the metrics are sent to Comet.
  metrics = (
      ("loss", loss),
      ("accuracy", accuracy),
      ("recall", recall),
      ("precision", precision),
      ("f1", f1),
  )
  for metric_name, metric_value in metrics:
    experiment.log_metric(f'{state}_{metric_name}', metric_value, epoch)
  

# Global Parameters


## Hyper Parameters

In [None]:
# Paths: everything lives under one folder of the mounted Google Drive.
root = "/content/gdrive/MyDrive/MaskDetection"
model_save_path = os.path.join(root, "model.pth")
csv_path = os.path.join(root, "train.csv")
images_path = os.path.join(root, "images")

# Train on the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Fixed seed so torch's random number generator starts from the same state.
torch.manual_seed(0)

# Resize to the 224x224 input used here, convert to a tensor, apply random
# augmentations and normalize with the given per-channel mean/std.
# NOTE(review): the dataset below is created once with this pipeline and only
# split afterwards, so the held-out samples are augmented as well.
transform = transforms.Compose(
  [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomGrayscale(p=0.1),
    transforms.ColorJitter(brightness=0.1,saturation=0.1,hue=0.25),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
  ])

# Defined for completeness; it is not passed to the dataset below.
target_transform = transforms.ToTensor()


dataset = FaceMaskDetectionDataset(csv_path, images_path, transform)


# Pretrained ResNet-18 whose final layer is replaced by a fresh linear head
# with one output per dataset class.
classifier = torchvision.models.resnet18(pretrained=True)
classifier.fc = nn.Linear(classifier.fc.in_features, len(dataset.classes))
classifier.to(device)


criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
epochs = 25
batch_size = 4
# The optimizer must receive the parameters of `classifier`, the model defined
# above (the previous `net` was never defined and raised a NameError).
optimizer = optim.SGD(classifier.parameters(), lr=learning_rate, momentum=0.9)


# Record the hyper-parameters of this run in Comet.
experiment.log_parameters({
    "learning_rate" : learning_rate,
    "epochs" : epochs,
    "batch_size": batch_size
})

## Trainset and Testset dataloaders

In [None]:
# Hold out 20% of the samples for evaluation.
# random_split only stores indices into `dataset`, so images are still loaded
# and augmented lazily on every access. sklearn's train_test_split instead
# indexes every item up front, which loads the whole dataset into memory and
# freezes the random augmentations.
test_fraction = 0.2
test_count = int(round(len(dataset) * test_fraction))
train_count = len(dataset) - test_count

# A dedicated, seeded generator makes the split reproducible across runs.
trainset, testset = torch.utils.data.random_split(
    dataset, [train_count, test_count],
    generator=torch.Generator().manual_seed(0))

trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=4)
# The evaluation metrics do not depend on sample order, so no shuffling here.
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=4)


# Training Loop

In [None]:
def _classification_metrics(tp, tn, fp, fn):
  """Derive (accuracy, precision, recall, f1) from confusion-matrix counts.

  A ratio whose denominator is zero (for example precision when nothing was
  predicted positive) is reported as 0.0 instead of raising ZeroDivisionError.
  """
  total = tp + tn + fp + fn
  accuracy = (tp + tn) / total if total else 0.0
  precision = tp / (tp + fp) if (tp + fp) else 0.0
  recall = tp / (tp + fn) if (tp + fn) else 0.0
  # Harmonic mean of precision and recall: 2 / (1/p + 1/r) == 2pr / (p + r).
  f1 = (2 * precision * recall / (precision + recall)
        if (precision + recall) else 0.0)
  return accuracy, precision, recall, f1


def _run_epoch(loader, training):
  """Run one pass over `loader`; update the weights only when `training` is True.

  Returns (mean_loss, accuracy, precision, recall, f1), where `mean_loss` is
  the average loss per sample.
  """
  # train() enables dropout/batch-norm updates, eval() freezes them.
  classifier.train(training)

  loss_sum = 0.0
  sample_count = 0
  tp = tn = fp = fn = 0

  # Gradients are only tracked during the training pass.
  with torch.set_grad_enabled(training):
    for inputs, labels in loader:
      inputs = inputs.to(device)
      labels = labels.to(device)

      if training:
        # zero the parameter gradients
        optimizer.zero_grad()

      outputs = classifier(inputs)
      loss = criterion(outputs, labels)

      if training:
        loss.backward()
        # Clip the gradient norm to 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(classifier.parameters(), 1.0)
        optimizer.step()

      # Predicted class = index of the largest logit.
      # confusion() counts label 1 as the positive class.
      predicted = outputs.argmax(dim=1)
      batch_tp, batch_tn, batch_fp, batch_fn = confusion(predicted, labels)
      tp += batch_tp
      tn += batch_tn
      fp += batch_fp
      fn += batch_fn

      # criterion returns the batch mean; weight it by the batch size so the
      # value reported at the end is a true per-sample average.
      batch_len = labels.size(0)
      loss_sum += loss.item() * batch_len
      sample_count += batch_len

  mean_loss = loss_sum / sample_count if sample_count else 0.0
  return (mean_loss,) + _classification_metrics(tp, tn, fp, fn)


train_losses = []
test_losses = []
patience_counter = 0
patience_lvl = 5
# Lowest test loss seen so far; a checkpoint is written whenever it improves.
best_test_loss = float('inf')

for epoch in range(epochs):  # loop over the dataset multiple times
  (train_loss, train_accuracy, train_precision,
   train_recall, train_f1) = _run_epoch(trainloader, training=True)

  log_data(train_accuracy, train_precision,
           train_recall, train_f1, train_loss, epoch)
  train_losses.append(train_loss)

  (test_run_loss, test_accuracy, test_precision,
   test_recall, test_f1) = _run_epoch(testloader, training=False)

  log_data(test_accuracy, test_precision,
           test_recall, test_f1, test_run_loss, epoch, "test")
  test_losses.append(test_run_loss)

  # print statistics
  print('[epoch %d][train] loss: %.3f acc: %.3f f1: %.3f || [test] loss: %.3f acc: %.3f f1: %.3f' %
        (epoch + 1, train_loss, train_accuracy, train_f1,
                    test_run_loss, test_accuracy, test_f1))

  # Early stopping: compare against the best loss of the previous epochs.
  # Comparing against a list that already contains the current loss can never
  # register an improvement.
  if test_run_loss < best_test_loss:
    best_test_loss = test_run_loss
    patience_counter = 0
    torch.save(classifier.state_dict(), model_save_path)
  else:
    patience_counter += 1
    if patience_counter >= patience_lvl:
      print("Patience level maxed stopping training loop")
      break

print('Finished Training')
