In [29]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
import torchvision
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch
import torch.utils
from torch.utils.data import (
    Dataset,
    DataLoader
)
import numpy as np
from PIL import Image
import math
import sys
import os
import xml.etree.ElementTree as ET
import detection.utils  # Get this folder into the project directory https://github.com/pytorch/vision/tree/main/references/detection
import detection.engine  # In order to work in Colab, you need to change rows 7-9 of engine.py to:
                                                                  #7 import detection.utils
                                                                  #8 from detection.coco_eval import CocoEvaluator
                                                                  #9 from detection.coco_utils import get_coco_api_from_dataset
                                                                  # Also in coco_utils.py you need to change 7th row to: 'import torchvision.transforms as T'

Define a function that returns the pretrained detection model with its box-prediction head replaced

In [19]:
def get_model_detection(num_classes):
    """Build a pretrained Faster R-CNN (ResNet-50 FPN) detector with a new box head.

    The model is created with ``weights="DEFAULT"`` and its
    ``roi_heads.box_predictor`` is swapped for a freshly initialised
    ``FastRCNNPredictor``.

    Args:
        num_classes: Passed straight to ``FastRCNNPredictor`` as the number of
            output classes of the new head.

    Returns:
        The detection model with the replaced box predictor.
    """
    # This is an object-detection model (boxes + labels), not a segmentation one.
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # The new head must accept the same feature width the old classifier consumed.
    roi_heads = detector.roi_heads
    feature_width = roi_heads.box_predictor.cls_score.in_features

    # Swap in the new, untrained head sized for ``num_classes``.
    roi_heads.box_predictor = FastRCNNPredictor(feature_width, num_classes)
    return detector

Defining transforms for train and test sets

In [20]:
# Both names are bound to the ``transforms`` attribute of the COCO_V1 weights
# entry; the attribute is only referenced here, not called.
# NOTE(review): the datasets below are built with get_transform(...), so these
# two names do not appear to be used by the other cells - confirm before relying on them.
train_transform = test_transform = FasterRCNN_ResNet50_FPN_Weights.COCO_V1.transforms

In [21]:
def get_transform(train):
    """Build the image-only preprocessing pipeline handed to ``MasksDataset``.

    Args:
        train: Accepted so callers can keep asking for a train or test
            pipeline. It currently does not change the result (see the note
            in the body).

    Returns:
        A ``transforms.Compose`` that turns a PIL image into a float tensor.
    """
    # No random horizontal flip here, even for training: MasksDataset calls
    # ``transform(img)`` with the image alone, so a geometric augmentation in
    # this pipeline would mirror the pixels while the target boxes stay where
    # they were, silently corrupting the supervision for every flipped sample.
    # A flip has to be applied to image and boxes together to be valid.
    return transforms.Compose([
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(torch.float),
    ])

Creating a Dataset class 

In [22]:
class MasksDataset(Dataset):
    """Face-mask detection dataset read from Pascal-VOC-style XML annotations.

    ``root_dir`` must contain an ``images`` and an ``annotations`` folder.
    Both directory listings are sorted and paired by position, so the i-th
    image is assumed to belong to the i-th annotation file.

    Args:
        root_dir: Directory holding the ``images`` and ``annotations`` folders.
        transform: Optional callable applied to the RGB PIL image only; the
            target dictionary is never passed to it.
    """

    # Class name -> integer label; every other name falls back to _OTHER_LABEL.
    _LABEL_IDS = {"with_mask": 1, "without_mask": 2}
    _OTHER_LABEL = 3
    # Order in which the corner coordinates are stored in each box row.
    _BOX_TAGS = ("xmin", "ymin", "xmax", "ymax")

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = sorted(os.listdir(os.path.join(self.root_dir, "images")))
        self.annotations = sorted(os.listdir(os.path.join(self.root_dir, "annotations")))

    def __len__(self):
        """Return the number of files listed under ``images``."""
        return len(self.images)

    def _parse_annotation(self, annotation_path):
        """Read every ``object`` entry of one annotation file.

        Elements are looked up by tag name rather than by child position, so
        the result does not depend on how many header elements precede the
        objects or on the order of the children inside each object.

        Args:
            annotation_path: Path of the XML file to parse.

        Returns:
            ``(boxes, labels)`` where ``boxes`` is a float32 tensor of shape
            ``(N, 4)`` in xmin, ymin, xmax, ymax order and ``labels`` is an
            int64 tensor of shape ``(N,)``. ``N`` is 0 for a file without
            objects.

        Raises:
            ValueError: If an object has no ``bndbox`` or lacks a coordinate.
        """
        root = ET.parse(annotation_path).getroot()

        boxes = []
        labels = []
        for obj in root.findall("object"):
            bndbox = obj.find("bndbox")
            if bndbox is None:
                raise ValueError(f"object without <bndbox> in {annotation_path}")

            coords = []
            for tag in self._BOX_TAGS:
                text = bndbox.findtext(tag)
                if text is None:
                    raise ValueError(f"<bndbox> without <{tag}> in {annotation_path}")
                coords.append(float(text))
            boxes.append(coords)

            # A missing or unknown class name is mapped to the fallback label.
            name = (obj.findtext("name") or "").strip()
            labels.append(self._LABEL_IDS.get(name, self._OTHER_LABEL))

        # reshape keeps the (N, 4) layout even when the file has no objects,
        # so the column indexing used for the area below stays valid.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        return boxes, labels

    def __getitem__(self, index):
        """Return ``(image, target)`` for the sample at ``index``.

        ``target`` holds ``boxes``, ``labels``, ``image_id``, ``area`` and
        ``iscrowd`` tensors. ``image`` is the RGB PIL image, or whatever
        ``transform`` returns for it when a transform was given.
        """
        img_path = os.path.join(self.root_dir, "images", self.images[index])
        # convert() returns a separate, fully loaded image, so the underlying
        # file can be closed as soon as the conversion is done.
        with Image.open(img_path) as raw_image:
            img = raw_image.convert("RGB")

        annotation_path = os.path.join(self.root_dir, "annotations", self.annotations[index])
        boxes, labels = self._parse_annotation(annotation_path)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = torch.tensor([index])
        # height * width of every box
        target["area"] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # every object is marked as a non-crowd instance
        target["iscrowd"] = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        if self.transform is not None:
            img = self.transform(img)

        return img, target



In [23]:
# Folder that holds the "images" and "annotations" sub-directories.
dataset_path = r"/content/drive/MyDrive/FaceMaskDetection"

# Two views over the same folder; they differ only in the transform they get.
train_set = MasksDataset(dataset_path, get_transform(True))
test_set = MasksDataset(dataset_path, get_transform(False))

In [24]:
# MasksDataset emits the label ids 1, 2 and 3; a head with 4 outputs leaves one
# extra index (presumably 0 for background, as torchvision detection heads
# expect - confirm).
model = get_model_detection(num_classes=4)

In [25]:
# Loader over the full training view; it feeds the output-testing cell below.
# NOTE(review): detection.utils.collate_fn presumably keeps each batch as
# per-sample tuples instead of stacking tensors - confirm in detection/utils.py.
dataloader = DataLoader(
    dataset=train_set,
    batch_size=2,
    shuffle=True,
    num_workers=2,
    collate_fn=detection.utils.collate_fn,
)

In [None]:
# Cell for testing outputs

# Forward pass with targets on one real batch. Nothing above calls eval(), so
# the model is expected to still be in training mode here.
images, targets = next(iter(dataloader))
images = list(images)
targets = [dict(target) for target in targets]
output = model(images, targets)

# Switch to evaluation mode and run on two random images of different heights.
model.eval()
x = [torch.rand(3, height, 400) for height in (300, 500)]
predictions = model(x)
predictions

Training

In [None]:
# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Split the samples into a training part and a 50-sample held-out part.
# train_set and test_set list the same files (they differ only in their
# transform), so one permutation can index both of them.
# The permutation is drawn from a dedicated, seeded generator so that the split
# is identical on every run; otherwise a reloaded checkpoint could be evaluated
# on samples it was trained on.
split_generator = torch.Generator().manual_seed(0)
indices = torch.randperm(len(train_set), generator=split_generator).tolist()
dataset = torch.utils.data.Subset(train_set, indices[:-50])
dataset_test = torch.utils.data.Subset(test_set, indices[-50:])

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=2,
    collate_fn=detection.utils.collate_fn)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, num_workers=2,
    collate_fn=detection.utils.collate_fn)

# move model to the right device
model.to(device)

# construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler that multiplies the rate by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                step_size=3,
                                                gamma=0.1)

# number of passes over the training subset
num_epochs = 2

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    detection.engine.train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # keep one checkpoint per epoch next to the dataset
    checkpoint_path = os.path.join(dataset_path, f"model_weights-{epoch}.pth")
    torch.save(model.state_dict(), checkpoint_path)
    # evaluate on the held-out subset
    detection.engine.evaluate(model, data_loader_test, device=device)

print("Training is over")