# Face Mask Detection

Dataset Link: https://drive.google.com/file/d/1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp/view

In [2]:
# !gdown --id 1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp

Downloading...
From: https://drive.google.com/uc?id=1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp
To: /content/RMFD.zip
640MB [00:06, 105MB/s]


In [1]:
""" Add images into a pandas Dataframe
"""
from pathlib import Path

import pandas as pd
from google_drive_downloader import GoogleDriveDownloader as gdd
from tqdm import tqdm

# download dataset from link provided by
# https://github.com/X-zhangyang/Real-World-Masked-Face-Dataset
datasetPath = Path('covid-mask-detector/data/mask.zip')
gdd.download_file_from_google_drive(file_id='1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp',
                                    dest_path=str(datasetPath),
                                    unzip=True)
# delete zip file
datasetPath.unlink()

datasetPath = Path('covid-mask-detector/data/self-built-masked-face-recognition-dataset')
maskPath = datasetPath/'AFDB_masked_face_dataset'
nonMaskPath = datasetPath/'AFDB_face_dataset'
maskDF = pd.DataFrame()

for subject in tqdm(list(maskPath.iterdir()), desc='mask photos'):
    for imgPath in subject.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 1
        }, ignore_index=True)

for subject in tqdm(list(nonMaskPath.iterdir()), desc='non mask photos'):
    for imgPath in subject.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 0
        }, ignore_index=True)

dfName = 'covid-mask-detector/data/mask_df.csv'
print(f'saving Dataframe to: {dfName}')
maskDF.to_csv(dfName)

Downloading 1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp into covid-mask-detector/data/mask.zip... Done.
Unzipping...Done.


mask photos: 100%|██████████| 525/525 [00:04<00:00, 112.39it/s]
non mask photos: 100%|██████████| 460/460 [03:53<00:00,  1.97it/s]


saving Dataframe to: covid-mask-detector/data/mask_df.csv


In [2]:
""" Dataset module
"""
import cv2
import numpy as np
from torch import long, tensor
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, Resize, ToPILImage, ToTensor


class MaskDataset(Dataset):
    """ Masked faces dataset
        0 = 'no mask'
        1 = 'mask'
    """
    def __init__(self, dataFrame):
        self.dataFrame = dataFrame
        
        self.transformations = Compose([
            ToPILImage(),
            Resize((100, 100)),
            ToTensor(), # [0, 1]
        ])
    
    def __getitem__(self, key):
        if isinstance(key, slice):
            raise NotImplementedError('slicing is not supported')
        
        row = self.dataFrame.iloc[key]
        image = cv2.imdecode(np.fromfile(row['image'], dtype=np.uint8),
                             cv2.IMREAD_UNCHANGED)
        return {
            'image': self.transformations(image),
            'mask': tensor([row['mask']], dtype=long), # pylint: disable=not-callable
        }
    
    def __len__(self):
        return len(self.dataFrame.index)

In [3]:
""" Face detection using neural network
"""
from pathlib import Path

import numpy as np
from cv2 import resize
from cv2.dnn import blobFromImage, readNetFromCaffe


class FaceDetectorException(Exception):
    """ generic default exception
    """


class FaceDetector:
    """ Face Detector class
    """
    def __init__(self, prototype: Path=None, model: Path=None,
                 confidenceThreshold: float=0.6):
        self.prototype = prototype
        self.model = model
        self.confidenceThreshold = confidenceThreshold
        if self.prototype is None:
            raise FaceDetectorException("must specify prototype '.prototxt.txt' file "
                                        "path")
        if self.model is None:
            raise FaceDetectorException("must specify model '.caffemodel' file path")
        self.classifier = readNetFromCaffe(str(prototype), str(model))
    
    def detect(self, image):
        """ detect faces in image
        """
        net = self.classifier
        height, width = image.shape[:2]
        blob = blobFromImage(resize(image, (300, 300)), 1.0,
                             (300, 300), (104.0, 177.0, 123.0))
        net.setInput(blob)
        detections = net.forward()
        faces = []
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence < self.confidenceThreshold:
                continue
            box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
            startX, startY, endX, endY = box.astype("int")
            faces.append(np.array([startX, startY, endX-startX, endY-startY]))
        return faces

Install Pytorch Lightning

In [4]:
!pip install pytorch-lightning

Collecting pytorch-lightning
[?25l  Downloading https://files.pythonhosted.org/packages/48/5e/19c817ad2670c1d822642ed7bfc4d9d4c30c2f8eaefebcd575a3188d7319/pytorch_lightning-1.3.8-py3-none-any.whl (813kB)
[K     |▍                               | 10kB 23.6MB/s eta 0:00:01[K     |▉                               | 20kB 29.3MB/s eta 0:00:01[K     |█▏                              | 30kB 21.6MB/s eta 0:00:01[K     |█▋                              | 40kB 16.9MB/s eta 0:00:01[K     |██                              | 51kB 8.7MB/s eta 0:00:01[K     |██▍                             | 61kB 9.1MB/s eta 0:00:01[K     |██▉                             | 71kB 9.3MB/s eta 0:00:01[K     |███▎                            | 81kB 10.3MB/s eta 0:00:01[K     |███▋                            | 92kB 10.6MB/s eta 0:00:01[K     |████                            | 102kB 8.5MB/s eta 0:00:01[K     |████▍                           | 112kB 8.5MB/s eta 0:00:01[K     |████▉                        

Link: https://www.pytorchlightning.ai/

In [12]:
""" Training module
"""
from pathlib import Path
from typing import Dict, List

import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn.init as init
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.metrics import Accuracy
from sklearn.model_selection import train_test_split
from torch import Tensor
from torch.nn import (Conv2d, CrossEntropyLoss, Linear, MaxPool2d, ReLU,
                      Sequential)
from torch.optim import Adam
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader

# from .dataset import MaskDataset


# pylint: disable=not-callable
class MaskDetector(pl.LightningModule):
    """ MaskDetector PyTorch Lightning class
    """
    def __init__(self, maskDFPath: Path=None):
        super(MaskDetector, self).__init__()
        self.maskDFPath = maskDFPath
        
        self.maskDF = None
        self.trainDF = None
        self.validateDF = None
        self.crossEntropyLoss = None
        self.learningRate = 0.00001
        
        self.trainAcc = Accuracy()
        self.valAcc = Accuracy()
        
        self.convLayer1 = convLayer1 = Sequential(
            Conv2d(3, 32, kernel_size=(3, 3), padding=(1, 1)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.convLayer2 = convLayer2 = Sequential(
            Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.convLayer3 = convLayer3 = Sequential(
            Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1), stride=(3,3)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.linearLayers = linearLayers = Sequential(
            Linear(in_features=2048, out_features=1024),
            ReLU(),
            Linear(in_features=1024, out_features=2),
        )
        
        # Initialize layers' weights
        for sequential in [convLayer1, convLayer2, convLayer3, linearLayers]:
            for layer in sequential.children():
                if isinstance(layer, (Linear, Conv2d)):
                    init.xavier_uniform_(layer.weight)
    
    def forward(self, x: Tensor): # pylint: disable=arguments-differ
        """ forward pass
        """
        out = self.convLayer1(x)
        out = self.convLayer2(out)
        out = self.convLayer3(out)
        out = out.view(-1, 2048)
        out = self.linearLayers(out)
        return out
    
    def prepare_data(self) -> None:
        self.maskDF = maskDF = pd.read_csv(self.maskDFPath)
        train, validate = train_test_split(maskDF, test_size=0.3, random_state=0,
                                           stratify=maskDF['mask'])
        self.trainDF = MaskDataset(train)
        self.validateDF = MaskDataset(validate)
        
        # Create weight vector for CrossEntropyLoss
        maskNum = maskDF[maskDF['mask']==1].shape[0]
        nonMaskNum = maskDF[maskDF['mask']==0].shape[0]
        nSamples = [nonMaskNum, maskNum]
        normedWeights = [1 - (x / sum(nSamples)) for x in nSamples]
        self.crossEntropyLoss = CrossEntropyLoss(weight=torch.tensor(normedWeights))
    
    def train_dataloader(self) -> DataLoader:
        return DataLoader(self.trainDF, batch_size=32, shuffle=True, num_workers=4)
    
    def val_dataloader(self) -> DataLoader:
        return DataLoader(self.validateDF, batch_size=32, num_workers=4)
    
    def configure_optimizers(self) -> Optimizer:
        return Adam(self.parameters(), lr=self.learningRate)
    
    # pylint: disable=arguments-differ
    def training_step(self, batch: dict, _batch_idx: int) -> Tensor:
        inputs, labels = batch['image'], batch['mask']
        labels = labels.flatten()
        outputs = self.forward(inputs)
        loss = self.crossEntropyLoss(outputs, labels)
        self.trainAcc(outputs.argmax(dim=1), labels)
        self.log('train_loss', loss, on_step=False, on_epoch=True, prog_bar=False)
        return loss
    
    def training_epoch_end(self, _trainingStepOutputs):
        self.log('train_acc', self.trainAcc.compute() * 100, prog_bar=True)
        self.trainAcc.reset()
    
    def validation_step(self, batch: dict, _batch_idx: int) -> Dict[str, Tensor]:
        inputs, labels = batch['image'], batch['mask']
        labels = labels.flatten()
        outputs = self.forward(inputs)
        loss = self.crossEntropyLoss(outputs, labels)
        
        self.valAcc(outputs.argmax(dim=1), labels)
        
        return {'val_loss': loss}
    
    def validation_epoch_end(self, validationStepOutputs: List[Dict[str, Tensor]]):
        avgLoss = torch.stack([x['val_loss'] for x in validationStepOutputs]).mean()
        valAcc = self.valAcc.compute() * 100
        self.valAcc.reset()
        self.log('val_loss', avgLoss, prog_bar=True)
        self.log('val_acc', valAcc, prog_bar=True)

if __name__ == '__main__':
    model = MaskDetector(Path('covid-mask-detector/data/mask_df.csv'))
    logger = TensorBoardLogger("covid-mask-detector/tensorboard", name="mask-detector")
    checkpointCallback = ModelCheckpoint(
        dirpath="/content/covid-mask-detector/models/",
        filename='{epoch}-{val_loss:.2f}-{val_acc:.2f}',
        verbose=True,
        monitor='val_acc',
        mode='max'
    )
    trainer = Trainer(gpus=1 if torch.cuda.is_available() else 0,
                      max_epochs=10,
                      logger=logger,
                      checkpoint_callback=checkpointCallback)
    trainer.fit(model)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name             | Type             | Params
------------------------------------------------------
0 | trainAcc         | Accuracy         | 0     
1 | valAcc           | Accuracy         | 0     
2 | convLayer1       | Sequential       | 896   
3 | convLayer2       | Sequential       | 18.5 K
4 | convLayer3       | Sequential       | 73.9 K
5 | linearLayers     | Sequential       | 2.1 M 
6 | crossEntropyLoss | CrossEntropyLoss | 0     
------------------------------------------------------
2.2 M     Trainable params
0         Non-trainable params
2.2 M     Total params
8.774     Total estimated model params size (MB)


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validation sanity check', layout=Layout…

  cpuset_checked))




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Training', layout=Layout(flex='2'), max…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…




Load models from the checkpoints

In [13]:
checkpoint_callback = ModelCheckpoint(dirpath='/content/covid-mask-detector/models')
trainer = Trainer(callbacks=[checkpoint_callback])
trainer.fit(model)
checkpoint_callback.best_model_path

GPU available: True, used: False
TPU available: False, using: 0 TPU cores
  "GPU available but not used. Set the gpus flag in your trainer"

  | Name             | Type             | Params
------------------------------------------------------
0 | trainAcc         | Accuracy         | 0     
1 | valAcc           | Accuracy         | 0     
2 | convLayer1       | Sequential       | 896   
3 | convLayer2       | Sequential       | 18.5 K
4 | convLayer3       | Sequential       | 73.9 K
5 | linearLayers     | Sequential       | 2.1 M 
6 | crossEntropyLoss | CrossEntropyLoss | 0     
------------------------------------------------------
2.2 M     Trainable params
0         Non-trainable params
2.2 M     Total params
8.774     Total estimated model params size (MB)


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validation sanity check', layout=Layout…

  cpuset_checked))




HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Training', layout=Layout(flex='2'), max…

  rank_zero_warn('Detected KeyboardInterrupt, attempting graceful shutdown...')


''

In [14]:
filepath = "trained_model_pytorch_lightning.pt"
trainer.save_checkpoint("pl_model.ckpt")

!pip install sk-video

This library provides easy access to common as well as state-of-the-art video processing routines
