In [None]:
import pandas as pd
from tqdm import tqdm
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pathlib import Path
!pip install pytorch_lightning

Collecting pytorch_lightning
[?25l  Downloading https://files.pythonhosted.org/packages/17/f6/bfe4676f3577063045e9d19f176163d9367a3e93fc999a7f72ced85287e7/pytorch_lightning-1.3.7.post0-py3-none-any.whl (810kB)
[K     |████████████████████████████████| 819kB 4.0MB/s 
[?25hCollecting fsspec[http]!=2021.06.0,>=2021.05.0
[?25l  Downloading https://files.pythonhosted.org/packages/0e/3a/666e63625a19883ae8e1674099e631f9737bd5478c4790e5ad49c5ac5261/fsspec-2021.6.1-py3-none-any.whl (115kB)
[K     |████████████████████████████████| 122kB 8.6MB/s 
Collecting torchmetrics>=0.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/3b/e8/513cd9d0b1c83dc14cd8f788d05cd6a34758d4fd7e4f9e5ecd5d7d599c95/torchmetrics-0.3.2-py3-none-any.whl (274kB)
[K     |████████████████████████████████| 276kB 12.6MB/s 
Collecting tensorboard!=2.5.0,>=2.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/64/21/eebd23060763fedeefb78bc2b286e00fa1d8abda6f70efa2ee08c28af0d4/tensorboard-2.4.1-py3-none

In [None]:
import cv2
import numpy as np
from torch import long, tensor
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, Resize, ToPILImage, ToTensor

In [None]:
class MaskDataset(Dataset):
    
    def __init__(self, dataFrame):
        self.dataFrame = dataFrame
        
        self.transformations = Compose([
            ToPILImage(),
            Resize((100, 100)),
            ToTensor(), # [0, 1]
        ])
    
    def __getitem__(self, key):
        if isinstance(key, slice):
            raise NotImplementedError('slicing is not supported')
        
        row = self.dataFrame.iloc[key]
        image = cv2.imdecode(np.fromfile(row['image'], dtype=np.uint8),
                             cv2.IMREAD_UNCHANGED)
        return {
            'image': self.transformations(image),
            'mask': tensor([row['mask']], dtype=long), 
        }
    
    def __len__(self):
        return len(self.dataFrame.index)

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from typing import Dict, List
val_loss=[]

import pytorch_lightning as pl
import torch
import torch.nn.init as init
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.metrics import Accuracy
from sklearn.model_selection import train_test_split
from torch import Tensor
from torch.nn import (Conv2d, CrossEntropyLoss, Linear, MaxPool2d, ReLU,
                      Sequential)
from torch.optim import Adam
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader


class MaskDetector(pl.LightningModule):
    """ MaskDetector PyTorch Lightning class
    """
    def __init__(self, maskDFPath: Path=None):
        super(MaskDetector, self).__init__()
        self.maskDFPath = maskDFPath
        
        self.maskDF = None
        self.trainDF = None
        self.validateDF = None
        self.crossEntropyLoss = None
        self.learningRate = 0.00001
        
        self.trainAcc = Accuracy()
        self.valAcc = Accuracy()
        
        self.convLayer1 = convLayer1 = Sequential(
            Conv2d(3, 32, kernel_size=(3, 3), padding=(1, 1)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.convLayer2 = convLayer2 = Sequential(
            Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.convLayer3 = convLayer3 = Sequential(
            Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1), stride=(3,3)),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2))
        )
        
        self.linearLayers = linearLayers = Sequential(
            Linear(in_features=2048, out_features=1024),
            ReLU(),
            Linear(in_features=1024, out_features=2),
        )
        
        # Initialize layers' weights
        for sequential in [convLayer1, convLayer2, convLayer3, linearLayers]:
            for layer in sequential.children():
                if isinstance(layer, (Linear, Conv2d)):
                    init.xavier_uniform_(layer.weight)
    
    def forward(self, x: Tensor): # pylint: disable=arguments-differ
        """ forward pass
        """
        out = self.convLayer1(x)
        out = self.convLayer2(out)
        out = self.convLayer3(out)
        out = out.view(-1, 2048)
        out = self.linearLayers(out)
        return out
    
    def prepare_data(self) -> None:
        self.maskDF = maskDF = pd.read_csv(self.maskDFPath)
        train, validate = train_test_split(maskDF, test_size=0.3, random_state=0,
                                           stratify=maskDF['mask'])
        self.trainDF = MaskDataset(train)
        self.validateDF = MaskDataset(validate)
        
        # Create weight vector for CrossEntropyLoss
        maskNum = maskDF[maskDF['mask']==1].shape[0]
        nonMaskNum = maskDF[maskDF['mask']==0].shape[0]
        nSamples = [nonMaskNum, maskNum]
        normedWeights = [1 - (x / sum(nSamples)) for x in nSamples]
        self.crossEntropyLoss = CrossEntropyLoss(weight=torch.tensor(normedWeights))
    
    def train_dataloader(self) -> DataLoader:
        return DataLoader(self.trainDF, batch_size=32, shuffle=True, num_workers=4)
    
    def val_dataloader(self) -> DataLoader:
        return DataLoader(self.validateDF, batch_size=32, num_workers=4)
    
    def configure_optimizers(self) -> Optimizer:
        return Adam(self.parameters(), lr=self.learningRate)
    
    
    def training_step(self, batch: dict, _batch_idx: int) -> Tensor:
        inputs, labels = batch['image'], batch['mask']
        labels = labels.flatten()
        outputs = self.forward(inputs)
        loss = self.crossEntropyLoss(outputs, labels)
        self.trainAcc(outputs.argmax(dim=1), labels)
        self.log('train_loss', loss, on_step=False, on_epoch=True, prog_bar=False)
        return loss
    
    def training_epoch_end(self, _trainingStepOutputs):
        self.log('train_acc', self.trainAcc.compute() * 100, prog_bar=True)
        self.trainAcc.reset()
    
    def validation_step(self, batch: dict, _batch_idx: int): #-> Dict[str, Tensor]:
        inputs, labels = batch['image'], batch['mask']
        labels = labels.flatten()
        outputs = self.forward(inputs)
        loss = self.crossEntropyLoss(outputs, labels)
        
        self.valAcc(outputs.argmax(dim=1), labels)
        val_loss.append(loss)

        return {'val_loss': loss}
    
    def validation_epoch_end(self, validationStepOutputs: List[Dict[str, Tensor]]):
        avgLoss = torch.stack([x['val_loss'] for x in validationStepOutputs]).mean()
        valAcc = self.valAcc.compute() * 100
        self.valAcc.reset()
        self.log('val_loss', avgLoss, prog_bar=True)
        self.log('val_acc', valAcc, prog_bar=True)


model = MaskDetector(Path('/content/drive/MyDrive/face_detection/Train/mask_df.csv'))
checkpointCallback = ModelCheckpoint(
    filename='{epoch}-{val_loss:.2f}-{val_acc:.2f}',
    verbose=True,
    monitor='val_acc',
    mode='max'
)
trainer = Trainer(gpus=1 if torch.cuda.is_available() else 0,
                      max_epochs=40,
                      checkpoint_callback=checkpointCallback)
trainer.fit(model)

  "`pytorch_lightning.metrics.*` module has been renamed to `torchmetrics.*` and split off to its own package"
  stream(template_mgs % msg_args)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name             | Type             | Params
------------------------------------------------------
0 | trainAcc         | Accuracy         | 0     
1 | valAcc           | Accuracy         | 0     
2 | convLayer1       | Sequential       | 896   
3 | convLayer2       | Sequential       | 18.5 K
4 | convLayer3       | Sequential       | 73.9 K
5 | linearLayers     | Sequential       | 2.1 M 
6 | crossEntropyLoss | CrossEntropyLoss | 0     
------------------------------------------------------
2.2 M     Trainable params
0         Non-trainable params
2.2 M     Total params
8.774     Total estimated model params size (MB)


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validation sanity check', layout=Layout…



HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Training', layout=Layout(flex='2'), max…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validating', layout=Layout(flex='2'), m…

In [None]:
from pathlib import Path

import numpy as np
from cv2 import resize
from cv2.dnn import blobFromImage, readNetFromCaffe


class FaceDetectorException(Exception):
    """ generic default exception
    """


class FaceDetector:
    """ Face Detector class
    """
    def __init__(self, prototype: Path=None, model: Path=None,
                 confidenceThreshold: float=0.6):
        self.prototype = prototype
        self.model = model
        self.confidenceThreshold = confidenceThreshold
        if self.prototype is None:
            raise FaceDetectorException("must specify prototype '.prototxt.txt' file "
                                        "path")
        if self.model is None:
            raise FaceDetectorException("must specify model '.caffemodel' file path")
        self.classifier = readNetFromCaffe(str(prototype), str(model))
    
    def detect(self, image):
        """ detect faces in image
        """
        net = self.classifier
        height, width = image.shape[:2]
        blob = blobFromImage(resize(image, (300, 300)), 1.0,
                             (300, 300), (104.0, 177.0, 123.0))
        net.setInput(blob)
        detections = net.forward()
        faces = []
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence < self.confidenceThreshold:
                continue
            box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
            startX, startY, endX, endY = box.astype("int")
            faces.append(np.array([startX, startY, endX-startX, endY-startY]))
        return faces

In [None]:
import cv2
import torch
from torchvision.transforms import Compose, Resize, ToPILImage, ToTensor
from google.colab.patches import cv2_imshow


def testimage(model):
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    model = model.to(device)
    model.eval()
    
    faceDetector = FaceDetector(
        prototype='/content/drive/MyDrive/face_detection/models/deploy.prototxt.txt',
        model='/content/drive/MyDrive/face_detection/models/res10_300x300_ssd_iter_140000.caffemodel',
    )
    
    transformations = Compose([
        ToPILImage(),
        Resize((100, 100)),
        ToTensor(),
    ])
    
    font = cv2.FONT_HERSHEY_SIMPLEX
    labels = ['Mask','No Mask']
    labelColor = [(10, 0, 255), (10, 255, 0)]
    frame = cv2.imread('b.jpg')
                            
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    faces = faceDetector.detect(frame)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    for face in faces:
        xStart, yStart, width, height = face
        
        xStart, yStart = max(xStart, 0), max(yStart, 0)
            
            # predict mask label on extracted face
        
        faceImg = frame[yStart:yStart+height, xStart:xStart+width]
        output = model(transformations(faceImg).unsqueeze(0).to(device))
        _, predicted = torch.max(output.data, 1)
            
            
        cv2.rectangle(frame,
                          (xStart, yStart),
                          (xStart + width, yStart + height),
                          (126, 65, 64),
                          thickness=2)
            
            # center text according to the face frame
        textSize = cv2.getTextSize(labels[predicted], font, 1, 2)[0]
        textX = xStart + width // 2 - textSize[0] // 2
            
            # draw prediction label
        cv2.putText(frame,
                        labels[predicted],
                        (textX, yStart+20),
                        font, 1, labelColor[predicted], 2)
    
    cv2_imshow(frame)

testimage(model)

In [None]:
datasetPath = Path('/content/drive/MyDrive/face_detection/Train')
maskPath = datasetPath/'Mask'
nonMaskPath = datasetPath/'Non Mask'
maskPath1 = datasetPath/'Mask1'
nonMaskPath1 = datasetPath/'Non Mask1'
maskDF = pd.DataFrame()

for imgPath in nonMaskPath.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 1
        }, ignore_index=True)

print(maskDF.shape)

for imgPath in maskPath.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 0
        }, ignore_index=True)

print(maskDF.shape)

for imgPath in nonMaskPath1.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 1
        }, ignore_index=True)

print(maskDF.shape)

for imgPath in maskPath1.iterdir():
        maskDF = maskDF.append({
            'image': str(imgPath),
            'mask': 0
        }, ignore_index=True)

print(maskDF.shape)

dfName = '/content/drive/MyDrive/face_detection/Train/mask_df.csv'
maskDF.to_csv(dfName)

(300, 2)
(600, 2)
(753, 2)
(906, 2)
