In [22]:
#Imports here

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import time
from PIL import Image
import cv2
from torch import nn, optim
from torchvision import datasets, transforms, models
from zipfile import ZipFile
from cv2 import resize
from cv2.dnn import readNetFromCaffe, blobFromImage
from pathlib import Path

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [2]:
# Directory to save model checkpoint
model_dir = 'models'

# dataset root directory
data_dir = 'mask_dataset'

#directory to training dataset
train_dir = data_dir + '/train'

#directory to validation dataset
valid_dir = data_dir + '/valid'

#directory to testing dataset
test_dir = data_dir + '/test'

prototype_path = 'models/deploy.prototxt.txt'
face_detection_model_path = 'models/res10_300x300_ssd_iter_140000.caffemodel'
font = cv2.FONT_HERSHEY_DUPLEX

In [None]:
# Extracting data from zip file
with ZipFile('mask_dataset.zip', 'r') as zipped_file:
    zipped_file.extractall(data_dir)

In [3]:
# transforms to be applied to training and validation dataset
train_transforms = transforms.Compose([transforms.RandomPerspective(),
                                       transforms.Resize((100,100)),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.4437, 0.3848, 0.3613], [0.2972, 0.2702, 0.2581])
                                       ])

# transforms to be applied to testing dataset
test_transforms = transforms.Compose([transforms.Resize((100,100)),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.4437, 0.3848, 0.3613], [0.2972, 0.2702, 0.2581])
                                       ])

In [4]:
# Loading training dataset
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)

# Loading validation dataset
valid_dataset = datasets.ImageFolder(valid_dir, transform=train_transforms)

# Loading testing dataset
test_dataset = datasets.ImageFolder(test_dir, transform=test_transforms)

In [None]:
# calculating mean and standard deviation of dataset

#mean = 0
#std = 0
#for images, _ in train_dataloader:
#  batch_samples = images.size(0) # batch size (the last batch can have smaller size!)
#  images = images.view(batch_samples, images.size(1), -1)
#  mean += images.mean(2).sum(0)
#  std += images.std(2).sum(0)

#mean /= len(train_dataloader.dataset)
#std /= len(train_dataloader.dataset)

In [5]:
# Number of input data in a single batch
BATCH_SIZE = 100

# Dataloader for training set
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Dataloader for validation set
valid_dataloader = torch.utils.data.DataLoader(valid_dataset, batch_size=BATCH_SIZE)

# Dataloader for testing set
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE)


In [6]:
# Confirm that training with gpu is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [7]:
class ConvNet(nn.Module):
    """Convolutional Neural Network class"""

    def __init__(self):
        super(ConvNet, self).__init__()

        self.convLayer1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2))

        self.convLayer2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2))

        self.drop_out = nn.Dropout(p=0.5)

        self.classifier = nn.Sequential(
            nn.Linear(in_features=(25 * 25 * 64), out_features=1024),
            nn.ReLU(),
            nn.Linear(in_features=1024, out_features=2))

    def forward(self, x):
        """forward pass"""

        x = self.convLayer1(x)
        x = self.convLayer2(x)
        x = x.reshape(x.size(0), -1)
        x = self.classifier(x)

        return x


In [8]:
model = ConvNet()
model.class_to_idx = train_dataset.class_to_idx
model.to(device)

# learning rate
learning_rate = 0.0001

# loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [9]:
def validation():
  """function for validation of training results"""

  test_loss = []
  accuracy = []

  for images, labels in valid_dataloader:
    # moving tensors to gpu
    images, labels = images.to(device), labels.to(device)

    #forward pass
    output = model(images)
    loss = criterion(output, labels)
    test_loss.append(loss.item())

    # calculating accuracy
    total = labels.size(0)
    _, prediction = torch.max(output.data, dim=1)
    correct = (prediction == labels).sum().item()
    accuracy.append(correct / total)

  return test_loss, accuracy

In [None]:
def training():
  """function for training model"""
  epochs = 10
  steps = 0
  train_loss = []
  print_count = 5

  for epoch in range(epochs):
    for images, labels in train_dataloader:
      # forward pass
      steps += 1
      # moving tensors to gpu
      images, labels = images.to(device), labels.to(device)

      output = model(images)
      loss = criterion(output, labels)
      train_loss.append(loss.item())

      # Backpropagation and optimization
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # validation
      if steps % print_count == 0:
        test_loss, accuracy = validation()

        print('Epoch {}/{} | Training loss: {} | Test Loss: {} | Accuracy: {:.4f} %'
        .format(epoch + 1, epochs, sum(train_loss) / BATCH_SIZE, 
                sum(test_loss) / len(valid_dataloader),
                sum(accuracy) / len(valid_dataloader)))
        
        train_loss = []
  print("\nTraining process is now complete!!")

In [None]:
def testing():
  """function for testing model"""
  with torch.no_grad():
    steps = len(test_dataloader)
    test_loss = []
    accuracy = []
    
    for batch, (images, labels) in enumerate(test_dataloader):
      # moving tensors to gpu
      images, labels = images.to(device), labels.to(device)
      output = model(images)
      loss = criterion(output, labels)
      test_loss.append(loss.item())

      # calculating accuracy
      total = labels.size(0)
      _, prediction = torch.max(output.data, dim=1)
      correct = (prediction == labels).sum().item()
      accuracy.append(correct / total)

      print("batch {}".format(batch + 1))
      print("\nPrediction accuracy ={:.1f}% "
      .format((sum(accuracy) / len(test_dataloader) * 100)))

In [None]:
# begin training
training()

In [None]:
# begin testing
testing()

In [None]:
# Save the model state
torch.save(model.state_dict(), model_dir)

# Inference

In [10]:
PATH = 'detector_state.pth'

def load_model(file_path):
    """function to load saved state of model"""
    trained_model = ConvNet()
    trained_model.load_state_dict(torch.load(file_path, map_location=torch.device('cpu')))
    trained_model.eval()

    return trained_model

model  = load_model(PATH)
model.class_to_idx = train_dataset.class_to_idx
model

ConvNet(
  (convLayer1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (convLayer2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (drop_out): Dropout(p=0.5, inplace=False)
  (classifier): Sequential(
    (0): Linear(in_features=40000, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=2, bias=True)
  )
)

In [11]:
class FaceDetector:
    """ Class for the face detector model"""

    def __init__(self, prototype, model):
        self.prototype = prototype
        self.model = model
        self.confidence_threshold = 0.6
        self.classifier = readNetFromCaffe(prototype, model)

    def detect(self, image):
        """method to detect faces in input image"""
        classifier = self.classifier
        height, width = image.shape[:2]
        image_blob = blobFromImage(resize(image, (300, 300)), 1.0, (300, 300), (103.93, 116.77, 123.68))
        classifier.setInput(image_blob)
        detections = classifier.forward()
        faces = []
        
        # loop over the detections
        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by ensuring the 'confidence' is greater than the minimum confidence
            if confidence > self.confidence_threshold:
                # compute the coordinates of the bounding box for the object
                box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
                start_x, start_y, end_x, end_y = box.astype("int")
                # ensuring the bounding boxes fall within the dimensions of the frame
                faces.append(np.array([start_x, start_y, end_x - start_x, end_y - start_y]))

        return faces

In [12]:
def process_image(image):
    """apply normalization to image for the pytorch model"""
    try:
        img = Image.fromarray(image)
        transform = transforms.Compose([transforms.Resize((100, 100)),
                                        transforms.ToTensor(),
                                        transforms.Normalize([0.4437, 0.3848, 0.3613], [0.2972, 0.2702, 0.2581])
                                        ])
        image = transform(img)
        return image
    except IOError:
        pass

In [13]:
def inference(images):
    """method to obtain predictions on passed images"""
    with torch.no_grad():
        classification = []
        index_to_class = {value: key for key, value in model.class_to_idx.items()}
        
        #forward pass
        image = process_image(images)
        output = model(image[None])
        label = output.numpy().argmax()
        classification.append(index_to_class[label])
        
    return classification[-1]

In [53]:
def classification(frame, faces):
    for (start_x, start_y, width, height) in faces:
        # clamp coordinates that are outside of the image
        start_x, start_y = max(start_x, 0), max(start_y, 0)
        # obtain face coordinates
        face_img = frame[start_y:start_y + height, start_x:start_x + width]
        face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        # make prediction
        prediction = inference(face_img)
        # create bounding box on face and add text label
        label = prediction
        if label == 'with_mask':
            colour = (0, 255, 0) 
        elif label == 'without_mask':
            colour = (0, 0, 255)
            
        cv2.rectangle(frame, (start_x, start_y), (start_x + width, start_y + height), colour, 1)
        cv2.putText(frame, label, (start_x, start_y - 10), font, 0.5, colour, 2)
        
    return frame

In [54]:
def image_prediction(image):
    """function that detects human faces in a given image and makes prediction on it"""
    # load the model for face detection
    face_detector = FaceDetector(prototype_path, face_detection_model_path)
    # read input image
    image = cv2.imread(image)
    # detect faces in input image
    faces = face_detector.detect(image)
    # pass detected face for classification
    classification(image, faces)
    # display the output image
    cv2.imshow('image', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [76]:
image_prediction('trial images/test_image.jpg')

In [92]:
image_prediction('test/t7.jpg')

In [71]:
def video_prediction():
    face_detector = FaceDetector(prototype_path, face_detection_model_path)
    video_capture = cv2.VideoCapture(0)
    time.sleep(2.0)
    
    while True:
        # capture frame by frame
        ret, frame = video_capture.read()
        faces = face_detector.detect(frame)
        classification(frame, faces)
        
        # display the output image
        cv2.imshow('Video', frame)
        if cv2.waitKey(1) & 0xFF ==ord('q'):
            break
            
        video_capture.release()
        cv2.destroyAllWindows()


In [85]:
video_prediction()

AttributeError: 'NoneType' object has no attribute 'shape'