# **Task 1 | Mask Recognition**

***Goal :*** *Detect human faces on videos and check whether or not they have a mask on*. 

In this notebook we implement two different models to perform the task :
- [Face Detection with Cascade Classifier](https://docs.opencv.org/3.4/db/d28/tutorial_cascade_classifier.html) + [ResNet18](https://pytorch.org/hub/pytorch_vision_resnet/)
- [Faster-RCNN (ResNet50)](https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html)

___
___

## **1. INITIALIZATION**

### *1.1 IMPORTS*

In [2]:
from IPython.display import display, clear_output

import cv2
import numpy as np
import os
import pandas as pd
import random as rd
import time

from tools import engine, utils

from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

import torch
import torch.nn as nn
import torchvision

In [3]:
# to fill the `requirement.txt` file we use the following line of code:
import session_info
session_info.show()

In [4]:
torch.cuda.empty_cache()

# setting device on GPU if available, else CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("device: {}".format(device))

device: cuda


### *1.3. DATA LOADING*

In [5]:
# data preprocessing
!python ./DataPreprocessing.py
clear_output()

In [6]:
data_dir_path = "data/FaceMaskDetection_Processed/" # path to the directory with the relevant data
images_dir_path = data_dir_path + "images/"         # path to the directory with the images
images_files = os.listdir(images_dir_path)           # list of files in the image directory

annotations = pd.read_csv(data_dir_path + "annotations.csv", index_col=None) # dataframe with information about the images and their bounding boxes
display(annotations)

Unnamed: 0,image_id,image_height,image_width,box_id,box_label,xmin,xmax,ymin,ymax
0,0,366,512,0,1,79,109,105,142
1,0,366,512,1,3,185,226,100,144
2,0,366,512,2,1,325,360,90,141
3,1,366,512,0,1,402,432,105,142
4,2,111,90,0,1,30,60,37,74
...,...,...,...,...,...,...,...,...,...
8910,5182,266,276,0,2,92,184,88,177
8911,5183,266,276,0,2,92,184,88,177
8912,5184,266,276,0,2,92,184,88,177
8913,5185,266,276,0,2,92,184,88,177


In [7]:
class FaceMaskDataset1(Dataset):

    def __init__(self, annotations, images_dir_path, images_files):
        self.annotations = annotations
        self.images_dir_path = images_dir_path
        self.images_files = images_files
    
    def __len__(self):
        return len(self.annotations)
    
    def __getitem__(self, idx):

        img_id = int(self.annotations.iloc[idx]["image_id"])
        img = cv2.imread(self.images_dir_path+str(img_id)+".png")
        xmin = self.annotations.iloc[idx]["xmin"]
        xmax = self.annotations.iloc[idx]["xmax"]
        ymin = self.annotations.iloc[idx]["ymin"]
        ymax = self.annotations.iloc[idx]["ymax"]
        img = transforms.Resize((256,256))(torch.Tensor(img[ymin:ymax,xmin:xmax]).permute(2,0,1))
        label = torch.zeros(3) 
        label[int(self.annotations.iloc[idx]["box_label"])-1] = 1
        return img, label

In [None]:
class FaceMaskDataset2(Dataset):

    def __init__(self, annotations, images_dir_path, images_files):
        self.annotations = annotations
        self.images_dir_path = images_dir_path
        self.images_files = images_files
    
    def __len__(self):
        return len(self.images_files)
    
    def __getitem__(self, idx):

        img = cv2.imread(self.images_dir_path+self.images_files[idx])
        img = transforms.Resize((256,256))(torch.Tensor(img).permute(2,0,1))

        img_id = int(self.images_files[idx][:-4])
        img_annotations = self.annotations[self.annotations["image_id"] == img_id]

        img_height = int(list(img_annotations["image_height"])[0])
        img_width = int(list(img_annotations["image_width"])[0])
        xmins = [256*xmin/img_width for xmin in list(img_annotations["xmin"])]
        ymins = [256*ymin/img_height for ymin in list(img_annotations["ymin"])]
        xmaxs = [256*xmax/img_width for xmax in list(img_annotations["xmax"])]
        ymaxs = [256*ymax/img_height for ymax in list(img_annotations["ymax"])]

        target =  {
            "boxes": torch.as_tensor([[xmins[i], ymins[i], xmaxs[i], ymaxs[i]] for i in range(len(img_annotations))], dtype=torch.float32),
            "labels": torch.as_tensor(list(img_annotations["box_label"]), dtype=torch.int64),
            "image_id": torch.as_tensor([img_id]),
            "area": torch.as_tensor([(xmaxs[i]-xmins[i])*(ymaxs[i]-ymins[i]) for i in range(len(img_annotations))], dtype=torch.float32),
            "iscrowd": torch.zeros((len(img_annotations),), dtype=torch.int64)
        }

        return img, target

90% of the whole dataset is dedicated to training and the 10% left is used as a test dataset.

In [79]:
FMD1 = FaceMaskDataset1(annotations, images_dir_path, images_files)
FMD2 = FaceMaskDataset2(annotations, images_dir_path, images_files)

train_ratio = 0.9
trainset1, testset1 = torch.utils.data.random_split(FMD1, [int(train_ratio*len(FMD1)), len(FMD1)-int(train_ratio*len(FMD1))])
trainset2, testset2 = torch.utils.data.random_split(FMD2, [int(train_ratio*len(FMD2)), len(FMD2)-int(train_ratio*len(FMD2))])

batch_size1 = 64
batch_size2 = 2

trainloader1 = DataLoader(trainset1, batch_size=batch_size1, shuffle=True)
testloader1 = DataLoader(testset1, batch_size=batch_size1, shuffle=True)

trainloader2 = DataLoader(trainset2, batch_size=batch_size2, shuffle=True, collate_fn=utils.collate_fn)
testloader2 = DataLoader(testset2, batch_size=batch_size2, shuffle=True, collate_fn=utils.collate_fn)

___

## **2. THE MODELS**

### *2.1. Defining the models*

In [80]:
FaceDetection_model_frontal = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
FaceDetection_model_profile = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_profileface.xml")

In [81]:
modelRSN18 = models.resnet18(pretrained=True)
for param in modelRSN18.parameters():
    param.requires_grad = False
modelRSN18.fc = nn.Sequential(
    nn.Linear(512,256),
    nn.ReLU(),
    nn.Linear(256,64),
    nn.ReLU(),
    nn.Linear(64,3),
    nn.Softmax(dim=1))
modelRSN18.to(device)
try:
    modelRSN18.load_state_dict(torch.load("./models/MaskRecognitionRSN18.pt"))
    print("model loaded")
except:
    print("new model")
    pass
modelRSN18.eval()
clear_output()

In [82]:
# load a model pre-trained on COCO
modelRCNN = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
# get number of input features for the classifier
in_features = modelRCNN.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
modelRCNN.roi_heads.box_predictor = FastRCNNPredictor(in_features,num_classes=4)
modelRCNN.to(device)
try:
    modelRCNN.load_state_dict(torch.load("./models/MaskRecognitionFasterRCNN.pt"))
    print("model loaded")
except:
    print("new model")
    pass
modelRCNN.eval()
clear_output()

### *2.2. Training the model*

In [83]:
optimizerRSN18 = torch.optim.Adam(modelRSN18.parameters(), lr=0.0005)
lr_schedulerRSN18 = torch.optim.lr_scheduler.StepLR(optimizerRSN18, step_size=10, gamma=0.1)
criterionRSN18 = nn.CrossEntropyLoss().cuda()

num_epochsRSN18 = 50
epoch_print_frequence = 5

In [86]:
s = time.time()

train_losses, test_losses, train_accuracies, test_accuracies = [], [], [], []

for epoch in range(num_epochsRSN18):

    running_loss_train, running_loss_test, running_acc_train, running_acc_test = 0,0,0,0

    for train in [True, False]:

        if train:
            dataloader = trainloader1
            modelRSN18.train()
        else:
            dataloader = testloader1
            modelRSN18.eval()

        for inputs,labels in dataloader:

            inputs = inputs.to(device)
            labels = labels.to(device)

            if train:
                optimizerRSN18.zero_grad()

            outputs = modelRSN18(inputs)
            loss = criterionRSN18(outputs, labels)

            if train:
                loss.backward()
                optimizerRSN18.step()
                running_loss_train += loss.item()
                running_acc_train += np.sum(torch.argmax(labels, dim=1).cpu().detach().numpy() == torch.argmax(outputs, dim=1).cpu().detach().numpy())
                
            else:
                running_loss_test += loss.item()
                running_acc_test += np.sum(torch.argmax(labels, dim=1).cpu().detach().numpy() == torch.argmax(outputs, dim=1).cpu().detach().numpy()) 

    running_loss_train /= len(trainloader1)
    running_loss_test /= len(testloader1)
    running_acc_train /= len(trainset1)
    running_acc_test /= len(testset1)

    train_losses.append(running_loss_train)
    test_losses.append(running_loss_test)
    train_accuracies.append(running_acc_train)
    test_accuracies.append(running_acc_test)

    if (epoch+1) % epoch_print_frequence == 0:
        print("epochs {} ({} s) | train loss : {} | test loss : {} | train acc : {} | test acc : {}".format(
            epoch+1,
            int(time.time()-s),
            int(1000000*running_loss_train)/1000000,
            int(1000000*running_loss_test)/1000000,
            int(1000000*running_acc_train)/1000000,
            int(1000000*running_acc_test)/1000000)
        )
    
    lr_schedulerRSN18.step()

resultsRSN18 = train_losses, test_losses, train_accuracies, test_accuracies

epochs 5 (319 s) | train loss : 0.623683 | test loss : 0.633484 | train acc : 0.931322 | test acc : 0.918161
epochs 10 (639 s) | train loss : 0.62172 | test loss : 0.630959 | train acc : 0.933565 | test acc : 0.918161
epochs 15 (957 s) | train loss : 0.618867 | test loss : 0.630644 | train acc : 0.936432 | test acc : 0.919282
epochs 20 (1274 s) | train loss : 0.621803 | test loss : 0.635121 | train acc : 0.93531 | test acc : 0.914798
epochs 25 (1595 s) | train loss : 0.620529 | test loss : 0.628856 | train acc : 0.934937 | test acc : 0.92713
epochs 30 (1923 s) | train loss : 0.620722 | test loss : 0.630009 | train acc : 0.933441 | test acc : 0.921524
epochs 35 (2255 s) | train loss : 0.619567 | test loss : 0.629727 | train acc : 0.935186 | test acc : 0.922645
epochs 40 (2601 s) | train loss : 0.620408 | test loss : 0.630005 | train acc : 0.934438 | test acc : 0.922645
epochs 45 (2953 s) | train loss : 0.621301 | test loss : 0.630365 | train acc : 0.932568 | test acc : 0.922645
epochs 5

In [None]:
paramsRCNN = [p for p in modelRCNN.parameters() if p.requires_grad]
optimizerRCNN = torch.optim.SGD(paramsRCNN, lr=0.0005, momentum=0.9, weight_decay=0.0005)
lr_schedulerRCNN = torch.optim.lr_scheduler.StepLR(optimizerRCNN, step_size=10, gamma=0.1)

num_epochsRCNN = 10

In [None]:
for epoch in range(num_epochsRCNN):
    # train for one epoch, printing every 10 iterations
    engine.train_one_epoch(modelRCNN, optimizerRCNN, trainloader2, device, epoch, print_freq=len(trainloader2)//3)
    # update the learning rate
    lr_schedulerRCNN.step()
    # evaluate on the test dataset
    engine.evaluate(modelRCNN, testloader2, device=device)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Epoch: [0]  [   0/2334]  eta: 0:22:45  lr: 0.000001  loss: 0.0330 (0.0330)  loss_classifier: 0.0094 (0.0094)  loss_box_reg: 0.0228 (0.0228)  loss_objectness: 0.0000 (0.0000)  loss_rpn_box_reg: 0.0009 (0.0009)  time: 0.5850  data: 0.0150  max mem: 2491
Epoch: [0]  [ 778/2334]  eta: 0:14:45  lr: 0.000390  loss: 0.0295 (0.0613)  loss_classifier: 0.0115 (0.0187)  loss_box_reg: 0.0168 (0.0376)  loss_objectness: 0.0000 (0.0011)  loss_rpn_box_reg: 0.0002 (0.0039)  time: 0.5692  data: 0.0090  max mem: 2491
Epoch: [0]  [1556/2334]  eta: 0:07:23  lr: 0.000500  loss: 0.0381 (0.0617)  loss_classifier: 0.0099 (0.0187)  loss_box_reg: 0.0234 (0.0383)  loss_objectness: 0.0001 (0.0011)  loss_rpn_box_reg: 0.0003 (0.0036)  time: 0.5663  data: 0.0095  max mem: 2491
Epoch: [0]  [2333/2334]  eta: 0:00:00  lr: 0.000500  loss: 0.0476 (0.0669)  loss_classifier: 0.0177 (0.0204)  loss_box_reg: 0.0283 (0.0404)  loss_objectness: 0.0000 (0.0018)  loss_rpn_box_reg: 0.0004 (0.0043)  time: 0.5667  data: 0.0095  max me

### *2.3. Saving the models*

In [85]:
torch.save(modelRSN18.state_dict(), "./models/MaskRecognitionRSN18.pt")
torch.save(modelRCNN.state_dict(), "./models/MaskRecognitionFasterRCNN.pt")

### *2.4. Testing the models*

In [70]:
modelRSN18 = models.resnet18(pretrained=True)
for param in modelRSN18.parameters():
    param.requires_grad = False
modelRSN18.fc = nn.Sequential(
    nn.Linear(512,256),
    nn.ReLU(),
    nn.Linear(256,64),
    nn.ReLU(),
    nn.Linear(64,3),
    nn.Softmax(dim=1))
modelRSN18.to(device)
modelRSN18.load_state_dict(torch.load("./models/MaskRecognitionRSN18.pt"))
modelRSN18.eval()
clear_output()

In [43]:
modelRCNN = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
in_features = modelRCNN.roi_heads.box_predictor.cls_score.in_features
modelRCNN.roi_heads.box_predictor = FastRCNNPredictor(in_features,num_classes=4)
modelRCNN.to(device)
modelRCNN.load_state_dict(torch.load("./models/MaskRecognitionFasterRCNN.pt"))
modelRCNN.eval()
clear_output()

In [64]:
def get_boxes_RSN18(img):
    height,width,_ = img.shape
    img_r = cv2.resize(img, (2*height,2*width))
    gray_img = cv2.cvtColor(img_r, cv2.COLOR_BGR2GRAY)
    boxes_frontal = FaceDetection_model_frontal.detectMultiScale(gray_img)
    boxes_profile = FaceDetection_model_profile.detectMultiScale(gray_img)
    results = []
    for (x,y,w,h) in boxes_frontal:
        model_input = transforms.Resize((256,256))(torch.Tensor(img_r[y:y+h,x:x+w]).permute(2,0,1))
        model_input = model_input.reshape((1,3,256,256)).to(device)
        label = torch.argmax(modelRSN18(model_input)).item()
        results.append((x,y,x+w,y+h,label))
    for (x,y,w,h) in boxes_profile:
        model_input = transforms.Resize((256,256))(torch.Tensor(img_r[y:y+h,x:x+w]).permute(2,0,1))
        model_input = model_input.reshape((1,3,256,256)).to(device)
        label = torch.argmax(modelRSN18(model_input)).item()
        results.append((x,y,x+w,y+h,label))
    return results

In [65]:
def get_boxes_RCNN(img):
    h,w,c = img.shape
    results = []
    model_input = transforms.Resize((256,256))(torch.Tensor(img).permute(2,0,1))
    model_input = model_input.reshape((1,3,256,256)).to(device)
    target = modelRCNN(model_input)[0]
    for i in range(len(target["boxes"])):
        box = target["boxes"][i]
        label = int(target["labels"][i])
        xmin = int(w*box[0]/256)
        ymin = int(h*box[1]/256)
        xmax = int(w*box[2]/256)
        ymax = int(h*box[3]/256)
        results.append((xmin,ymin,xmax,ymax,label))
    return results

In [66]:
def show_random_results(get_boxes, nb_images):

    img_ids = rd.sample(list(annotations["image_id"]), nb_images)

    for img_id in img_ids:

        img = cv2.imread(images_dir_path+"{}.png".format(img_id))
        cv2.imshow("before | {}.png".format(img_id), img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
        h,w,c = img.shape
        overlay = img.copy()
        output = img.copy()

        boxes = get_boxes(img)

        for (xmin,ymin,xmax,ymax,label) in boxes:
            
            if label == 1:
                cv2.rectangle(overlay, (xmin,ymin), (xmax,ymax), (0,0,255), 2)
            
            elif label == 2:
                cv2.rectangle(overlay, (xmin,ymin), (xmax,ymax), (0,255,255), 2)
            
            else:
                cv2.rectangle(overlay, (xmin,ymin), (xmax,ymax), (0,255,0), 2)

        output = cv2.addWeighted(overlay, 0.5, output, 0.5, 0, output)        
        cv2.imshow("after | maksssksksss{}.png".format(img_id), output)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

In [67]:
show_random_results(get_boxes_RSN18, 10)

In [68]:
show_random_results(get_boxes_RCNN, 10)

___
___