<a href="https://colab.research.google.com/github/linlih/CovidFaceMaskDetector/blob/master/Covid_Face_Mask_Detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 下载数据

数据来源：https://github.com/X-zhangyang/Real-World-Masked-Face-Dataset


In [0]:
from pathlib import Path

import pandas as pd
from google_drive_downloader import GoogleDriveDownloader as gdd
from tqdm import tqdm

In [0]:
# Download the training archive from the shared Google Drive file and unzip it.
datasetPath = Path('./data/mask.zip')
downloadOptions = {
    'file_id': '1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp',
    'dest_path': str(datasetPath),
    'unzip': True,
}
gdd.download_file_from_google_drive(**downloadOptions)

Downloading 1UlOk6EtiaXTHylRUx2mySgvJX9ycoeBp into data/mask.zip... Done.
Unzipping...Done.


In [0]:
# Delete the downloaded zip file (it was already unzipped by the download step).
# Guarded with is_file() so that re-running this cell does not raise once the
# zip is gone, or after `datasetPath` has been rebound to the dataset directory.
if datasetPath.is_file():
  datasetPath.unlink()

In [0]:
# Build the image/label DataFrame and pickle it. If the pickle already exists,
# this cell does not need to be run: load the pickled file directly instead.
#
# The rows are collected in a plain list and converted to a DataFrame once.
# Growing the frame with `maskDF = maskDF.append(...)` inside the loop copies the
# whole frame on every iteration (quadratic time), and `DataFrame.append` no
# longer exists in pandas 2.0.
datasetPath = Path('./data/self-built-masked-face-recognition-dataset')
maskPath = datasetPath/'AFDB_masked_face_dataset'
nonMaskPath = datasetPath/'AFDB_face_dataset'


def _collectImageRecords(rootPath, label, desc):
  """ Return one {'image': <path>, 'mask': label} record for every entry found
  inside the sub-directories of rootPath; desc is the progress-bar caption.
  """
  records = []
  for subject in tqdm(list(rootPath.iterdir()), desc=desc):
    if not subject.is_dir():
      # a stray file next to the subject folders would make iterdir() raise
      continue
    for imgPath in subject.iterdir():
      records.append({'image': str(imgPath), 'mask': label})
  return records


maskDF = pd.DataFrame(
    _collectImageRecords(maskPath, 1, 'mask photos')
    + _collectImageRecords(nonMaskPath, 0, 'no mask photos'),
    columns=['image', 'mask'])

dfName = './data/mask_df.pickle'
print(f'saving DataFrame to {dfName}')
maskDF.to_pickle(dfName) # write the pickle; read it back with pd.read_pickle

mask photos: 100%|██████████| 525/525 [00:05<00:00, 92.19it/s]
no mask photos: 100%|██████████| 460/460 [04:33<00:00,  1.68it/s]

saving DataFrame to ./data/mask_df.pickle





In [0]:
# If the DataFrame has already been pickled, running this cell alone is enough
# to recreate it.
pickledFramePath = './data/mask_df.pickle'
maskDF = pd.read_pickle(pickledFramePath)

In [0]:
# Class balance. The count came out as 2203 masked-face images and 90468 normal
# face images, which differs somewhat from the 5k masked / 90k normal faces
# stated on the dataset's GitHub page.
maskCounts = maskDF['mask'].value_counts()
maskCounts

In [0]:
# 构建Dataset，这里是为了能够让PyTorch进行读取
import cv2
from torch import long, tensor
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, Resize, ToPILImage, ToTensor

In [0]:
class MaskDataset(Dataset):
  """ Masked / unmasked face dataset, wrapping a DataFrame so PyTorch can read it.

  Every row is accessed through its 'image' column (path of the image file)
  and its 'mask' column (label: 1 = masked, 0 = not masked).
  """
  def __init__(self, dataFrame):
    """ dataFrame: DataFrame with the 'image' and 'mask' columns described above.
    """
    self.dataFrame = dataFrame
    self.transformations = Compose([
        ToPILImage(),
        Resize((100, 100)), # every face is resized to 100x100
        ToTensor(),
    ])
    
  def __getitem__(self, key):
    """ Return {'image': transformed image tensor, 'mask': long tensor [label]}
    for the row at integer position `key`.

    Raises NotImplementedError for slices and OSError when the image file
    cannot be read.
    """
    if isinstance(key, slice):
      raise NotImplementedError('slicing is not supported')
    row = self.dataFrame.iloc[key]
    image = cv2.imread(row['image'])
    if image is None:
      # cv2.imread does not raise on a missing/corrupt file, it returns None;
      # fail here with the offending path instead of deep inside ToPILImage.
      raise OSError(f"cannot read image file: {row['image']}")
    return {
        'image': self.transformations(image),
        'mask': tensor([row['mask']], dtype=long)
    }

  def __len__(self):
    """ Number of rows in the wrapped DataFrame.
    """
    return len(self.dataFrame.index)

In [47]:
# NOTE(review): this install is unpinned, while the training code in this
# notebook relies on a specific Lightning API (e.g. `filepath=` on
# ModelCheckpoint and 'log' entries in the step results). Consider pinning the
# release the notebook was written against - TODO confirm which one.
!pip install pytorch-lightning -q

[K     |████████████████████████████████| 256kB 2.8MB/s 
[K     |████████████████████████████████| 829kB 8.8MB/s 
[?25h  Building wheel for future (setup.py) ... [?25l[?25hdone


In [0]:
# 构建模型
from pathlib import Path
from typing import Dict, List, Union

import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn.init as init

from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch import Tensor
from torch.nn import (Conv2d, CrossEntropyLoss, Linear, MaxPool2d, ReLU, Sequential)
from torch.optim import Adam
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader

In [0]:
class MaskDetector(pl.LightningModule):
  """ Convolutional mask / no-mask face classifier (Lightning module).

  Three convolution blocks (Conv2d + ReLU + MaxPool2d) feed two linear layers
  that output two logits: index 0 = no mask, index 1 = mask.
  """
  def __init__(self, maskDFPath: Path=None):
    """ maskDFPath: path of the pickled DataFrame read by prepare_data(); it can
    stay None when the module is only built to load weights for inference.
    """
    super(MaskDetector, self).__init__()
    self.maskDFPath = maskDFPath
    self.maskDF = None
    self.trainDF = None
    # Same attribute name that prepare_data() fills and val_dataloader() reads
    # (it used to be declared here under a different, never-used name).
    self.validateDF = None
    self.crossEntropyLoss = None
    self.learningRate = 0.00001

    self.convLayer1 = convLayer1 = Sequential(Conv2d(3, 32, kernel_size=(3, 3), padding=(1, 1)),
                           ReLU(),
                           MaxPool2d(kernel_size=(2, 2))
                           )
    self.convLayer2 = convLayer2 = Sequential(Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1)),
                           ReLU(),
                           MaxPool2d(kernel_size=(2, 2))
                           )
    self.convLayer3 = convLayer3 = Sequential(Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1), stride=(3, 3)),
                           ReLU(),
                           MaxPool2d(kernel_size=(2, 2))
                           )
    # 2048 = 128 channels * 4 * 4, the conv output size for a 100x100 input
    self.linearLayers = linearLayers = Sequential(Linear(in_features=2048, out_features=1024),
                            ReLU(),
                            Linear(in_features=1024, out_features=2))
    
    # Xavier-uniform initialisation for every conv / linear weight
    for sequential in [convLayer1, convLayer2, convLayer3, linearLayers]:
      for layer in sequential.children():
        if isinstance(layer, (Linear, Conv2d)):
          init.xavier_uniform_(layer.weight)
  
  def forward(self, x: Tensor):
    """ Run the network on a batch of images and return the two class logits
    per image.
    """
    out = self.convLayer1(x)
    out = self.convLayer2(out)
    out = self.convLayer3(out)
    out = out.view(-1, 2048)  # flatten the conv features for the linear layers
    out = self.linearLayers(out)
    return out
  
  def prepare_data(self) -> None:
    """ Load the pickled DataFrame, make a stratified 70/30 train/validation
    split and build the class-weighted loss.
    """
    self.maskDF = maskDF = pd.read_pickle(self.maskDFPath)
    train, validate = train_test_split(maskDF, test_size=0.3, random_state=0, stratify=maskDF['mask'])
    self.trainDF = MaskDataset(train)
    self.validateDF = MaskDataset(validate)

    # Each class is weighted by (1 - its share of the samples), so the rarer
    # class weighs more. The order [no mask, mask] matches label indices 0, 1.
    maskNum = maskDF[maskDF['mask'] == 1].shape[0]
    nonMaskNum = maskDF[maskDF['mask'] == 0].shape[0]
    nSamples = [nonMaskNum, maskNum]
    normedWeights = [1 - (x/sum(nSamples)) for x in nSamples]
    self.crossEntropyLoss = CrossEntropyLoss(weight=torch.tensor(normedWeights))

  def train_dataloader(self) -> DataLoader:
    """ Shuffled loader over the training split. """
    return DataLoader(self.trainDF, batch_size=32, shuffle=True, num_workers=4)

  def val_dataloader(self) -> DataLoader:
    """ Loader over the validation split. """
    return DataLoader(self.validateDF, batch_size=32, num_workers=4)
  
  def configure_optimizers(self) -> Optimizer:
    """ Adam over all parameters with the configured learning rate. """
    return Adam(self.parameters(), lr=self.learningRate)

  def training_step(self, batch: dict, _batch_idx: int) -> Dict[str, Tensor]:
    """ Compute the weighted cross-entropy loss of one training batch. """
    inputs, labels = batch['image'], batch['mask']
    labels = labels.flatten()  # (batch, 1) labels -> (batch,)
    outputs = self.forward(inputs)
    loss = self.crossEntropyLoss(outputs, labels)
    
    tensorboardLogs = {'train_loss': loss}
    return {'loss': loss, 'log': tensorboardLogs}

  def validation_step(self, batch:dict, _batch_idx: int) -> Dict[str, Tensor]:
    """ Compute loss and accuracy of one validation batch. """
    inputs, labels = batch['image'], batch['mask']
    labels = labels.flatten()
    outputs = self.forward(inputs)
    loss = self.crossEntropyLoss(outputs, labels)

    _, predictions = torch.max(outputs, dim=1)
    # accuracy_score takes (y_true, y_pred); the value is the same either way
    valAcc = accuracy_score(labels.cpu(), predictions.cpu())
    valAcc = torch.tensor(valAcc)
    return {'val_loss': loss, 'val_acc': valAcc}
  
  def validation_epoch_end(self, outputs: List[Dict[str, Tensor]]) \
     -> Dict[str, Union[Tensor, Dict[str, Tensor]]]:
    """ Average the per-batch validation loss and accuracy and log both. """
    avgLoss = torch.stack([x['val_loss'] for x in outputs]).mean()
    avgAcc = torch.stack([x['val_acc'] for x in outputs]).mean()
    tensorboardLogs = {'val_loss': avgLoss, 'val_acc': avgAcc}
    return {'val_loss':avgLoss, 'log': tensorboardLogs}

In [0]:
# Training on Colab hangs at this point: the browser reports that JavaScript
# is disabled or that the Google account login has expired.
# Oddly, as soon as training starts the local machine's memory is used up,
# even though training should run on the Colab backend and the browser only
# displays the output - the cause is unclear.
model = MaskDetector(Path('./data/mask_df.pickle'))

checkpointOptions = dict(
    filepath='./checkpoints/weights.ckpt',
    save_weights_only=True,
    verbose=True,
    monitor='val_acc',
    mode='max',
)
checkpoint_callback = ModelCheckpoint(**checkpointOptions)

trainer = Trainer(
    gpus=1,
    max_epochs=10,
    checkpoint_callback=checkpoint_callback,
    profiler=True,
)
trainer.fit(model)

In [0]:
# Download the pretrained models
!wget https://raw.githubusercontent.com/JadHADDAD92/covid-mask-detector/master/covid-mask-detector/models/face_mask.ckpt # face-mask detection model
!wget https://raw.githubusercontent.com/JadHADDAD92/covid-mask-detector/master/covid-mask-detector/models/deploy.prototxt.txt # OpenCV face detection
!wget https://raw.githubusercontent.com/JadHADDAD92/covid-mask-detector/master/covid-mask-detector/models/res10_300x300_ssd_iter_140000.caffemodel # OpenCV face detection

由于 Colab 无法实时处理 webcam 的图像，下面的实现方式是先拍照，再检测所得图片中的人脸是否佩戴口罩；如果在本地运行，则可以使用 OpenCV 进行实时检测。

In [0]:
# Reference code: show the webcam stream inside Colab. For testing only, it
# does not need to be run.
# The imports are part of this cell so that it also runs on a fresh kernel.
from IPython.display import display, Javascript
from google.colab.output import eval_js

def start_webcam():
  """ Inject a JavaScript snippet that adds a <video> element to the page and
  plays the webcam stream in it, then run it.
  """
  js = Javascript('''
    async function startWebcam() {

      const div = document.createElement('div');

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});


      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();
    }
    ''')
  
  display(js)
  # startWebcam() returns nothing useful, so the result is not kept
  eval_js('startWebcam()')
      
start_webcam()

In [0]:
# Google官方提供的webcam获取函数，支持拍照，测试用，不需要执行
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  """ Show the webcam stream with a 'Capture' button, wait for the click and
  save the captured frame as a JPEG.

  filename: file the picture is written to.
  quality:  JPEG quality passed to canvas.toDataURL.
  Returns the filename.
  """
  captureScript = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(captureScript)
  dataUrl = eval_js(f'takePhoto({quality})')
  # the data URL is "<header>,<base64 payload>": decode the payload only
  encodedImage = dataUrl.split(',')[1]
  imageBytes = b64decode(encodedImage)
  with open(filename, 'wb') as photoFile:
    photoFile.write(imageBytes)
  return filename

from IPython.display import Image
try:
  filename = take_photo()
  print(f'Saved to {filename}')

  # Display the picture that was just captured.
  display(Image(filename))
except Exception as err:
  # Raised when the user has no webcam, or did not grant the page permission
  # to access it.
  print(err)

In [0]:
from pathlib import Path

import numpy as np
from cv2 import resize
from cv2.dnn import blobFromImage, readNetFromCaffe

class FaceDetectorException(Exception):
  """ Error raised by FaceDetector, e.g. when a model file path is missing.
  """

# Face detection, using the face-detection model provided by OpenCV
class FaceDetector:
  """ Detects faces with a Caffe network loaded through OpenCV's dnn module.
  """
  def __init__(self, prototype: Path=None, model: Path=None,
                confidenceThreshold: float=0.6):
    """ prototype: path of the '.prototxt.txt' network description.
    model: path of the '.caffemodel' weights.
    confidenceThreshold: detections scoring below this value are dropped.

    Raises FaceDetectorException when prototype or model is not given.
    """
    self.prototype, self.model = prototype, model
    self.confidenceThreshold = confidenceThreshold
    if prototype is None:
      raise FaceDetectorException(
          "must specify prototype '.prototxt.txt' file path")
    if model is None:
      raise FaceDetectorException("must specify model '.caffemodel' file path")
    self.classifier = readNetFromCaffe(str(prototype), str(model))

  def detect(self, image):
    """ Return the faces found in image as a list of
    np.array([x, y, width, height]) boxes, in pixels of the input image.
    """
    network = self.classifier
    imageHeight, imageWidth = image.shape[:2]
    inputBlob = blobFromImage(resize(image, (300, 300)), 1.0,
                              (300, 300), (104.0, 177.0, 123.0))
    network.setInput(inputBlob)
    rawDetections = network.forward()

    boxes = []
    # every candidate row holds the confidence at index 2 and the relative
    # corner coordinates at indices 3..6
    for candidate in rawDetections[0, 0]:
      if candidate[2] < self.confidenceThreshold:
        continue
      corners = candidate[3:7] * np.array([imageWidth, imageHeight,
                                           imageWidth, imageHeight])
      left, top, right, bottom = corners.astype("int")
      boxes.append(np.array([left, top, right - left, bottom - top]))
    return boxes

In [0]:
from pathlib import Path

import click
import cv2
import torch
#from skvideo.io import FFmpegWriter, vreader
from torchvision.transforms import Compose, Resize, ToPILImage, ToTensor
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import numpy as np
from PIL import Image
import io
import cv2

def VideoCapture():
  """ Inject the JavaScript helpers used for webcam capture into the page:
  create() starts the webcam stream and prepares a canvas plus an output <img>,
  capture() returns the current frame as a JPEG data URL, and showimg()
  displays a base64-encoded image in the output <img>.
  """
  captureHelpers = Javascript('''
    async function create(){
      div = document.createElement('div');
      document.body.appendChild(div);

      video = document.createElement('video');
      video.setAttribute('playsinline', '');

      div.appendChild(video);

      stream = await navigator.mediaDevices.getUserMedia({video: {facingMode: "environment"}});
      video.srcObject = stream;

      await video.play();

      canvas =  document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);

      div_out = document.createElement('div');
      document.body.appendChild(div_out);
      img = document.createElement('img');
      div_out.appendChild(img);
    }

    async function capture(){
        return await new Promise(function(resolve, reject){
            pendingResolve = resolve;
            canvas.getContext('2d').drawImage(video, 0, 0);
            result = canvas.toDataURL('image/jpeg', 0.8);
            pendingResolve(result);
        })
    }

    function showimg(imgb64){
        img.src = "data:image/jpg;base64," + imgb64;
    }

  ''')
  display(captureHelpers)

def byte2image(byte):
  """ Decode a "<header>,<base64 payload>" data URL into a numpy image array.
  """
  encodedImage = byte.split(',')[1]
  imageStream = io.BytesIO(b64decode(encodedImage))
  return np.array(Image.open(imageStream))

def image2byte(image):
  """ Encode an image array as JPEG and return it as a base64 string
  (without any data-URL header).
  """
  jpegBuffer = io.BytesIO()
  Image.fromarray(image).save(jpegBuffer, 'jpeg')
  return b64encode(jpegBuffer.getvalue()).decode('utf-8')

# Webcam mask detection: grab a frame from the browser, detect the faces,
# classify every face as mask / no mask, draw the result and show the frame.
# The loop runs until the cell is interrupted.
VideoCapture()
eval_js('create()')

modelpath='/content/face_mask.ckpt'
outputPath=None

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# NOTE(security): torch.load unpickles the file, and this checkpoint was
# downloaded from the network - only load checkpoints from a trusted source.
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only runtime.
checkpoint = torch.load(modelpath, map_location=device)
# A Lightning checkpoint keeps the weights under 'state_dict'. Passing the whole
# checkpoint to load_state_dict(strict=False) matches no parameter and silently
# leaves the network randomly initialised. A bare state dict is accepted as-is.
stateDict = checkpoint['state_dict'] if 'state_dict' in checkpoint else checkpoint

model = MaskDetector()
model.load_state_dict(stateDict, strict=False)
model = model.to(device)
model.eval()

faceDetector = FaceDetector(
    prototype='./deploy.prototxt.txt',
    model='./res10_300x300_ssd_iter_140000.caffemodel',
)

transformations = Compose([
    ToPILImage(),
    Resize((100, 100)),
    ToTensor(),
])

# NOTE: FFmpegWriter comes from skvideo.io, whose import is commented out at
# the top of this cell; restore that import before setting outputPath.
writer = FFmpegWriter(str(outputPath)) if outputPath else None

font = cv2.FONT_HERSHEY_SIMPLEX
labels = ['No mask', 'Mask']
labelColor = [(10, 0, 255), (10, 255, 0)]

try:
  while True:
    byte = eval_js('capture()')
    # byte2image decodes through PIL and yields RGB, while MaskDataset feeds the
    # network images read with cv2.imread (BGR): convert so that inference sees
    # the same channel order as training.
    frame = cv2.cvtColor(byte2image(byte), cv2.COLOR_RGB2BGR)
    frameHeight, frameWidth = frame.shape[:2]
    faces = faceDetector.detect(frame)

    for face in faces:
      # plain ints: the boxes hold numpy integers
      xStart, yStart, width, height = (int(value) for value in face)
      xEnd, yEnd = xStart + width, yStart + height

      # clip the box to the image (both corners, so the box is not shifted)
      xStart, yStart = max(xStart, 0), max(yStart, 0)
      xEnd, yEnd = min(xEnd, frameWidth), min(yEnd, frameHeight)
      if xEnd <= xStart or yEnd <= yStart:
        # nothing left inside the image; an empty crop cannot be resized
        continue

      # predict mask label on extracted face
      faceImg = frame[yStart:yEnd, xStart:xEnd]
      with torch.no_grad():  # inference only, no gradients needed
        output = model(transformations(faceImg).unsqueeze(0).to(device))
      _, predicted = torch.max(output, 1)
      predicted = predicted.item()  # plain int index for the lists below

      # draw face frame
      cv2.rectangle(frame,
                    (xStart, yStart),
                    (xEnd, yEnd),
                    (126, 65, 64),
                    thickness=2)

      # center text according to the face frame
      textSize = cv2.getTextSize(labels[predicted], font, 1, 2)[0]
      textX = xStart + (xEnd - xStart) // 2 - textSize[0] // 2

      # draw prediction label
      cv2.putText(frame,
                  labels[predicted],
                  (textX, yStart-20),
                  font, 1, labelColor[predicted], 2)

    # Once per frame (not once per face), so frames without faces are shown too.
    rgbFrame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    if writer is not None:
      # presumably the writer expects RGB frames - TODO confirm
      writer.writeFrame(rgbFrame)
    eval_js('showimg("{}")'.format(image2byte(rgbFrame)))
finally:
  # close the output file once, when the loop is left (e.g. on interrupt)
  if writer is not None:
    writer.close()