In [None]:
import numpy as np

import os
import sys
import torch
import torch.optim as optim
import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import cv2
import time
from collections import Counter
!pip install xmltodict
import xmltodict
from xml.etree import ElementTree as et

import matplotlib.pyplot as plt
import matplotlib.patches as patches
from bs4 import BeautifulSoup

# Dataloader

In [None]:
# Walk the dataset root and separate annotation XML files from everything else
# (treated as images), keeping bare file names only.
img_names = []
xml_names = []
for dirname, _, filenames in os.walk('/kaggle/input/face-mask-detection/'):
    for filename in filenames:
        if filename.endswith("xml"):
            xml_names.append(filename)
        else:
            img_names.append(filename)

# Collect the class name of every annotated object, one entry per bounding box.
path_annotations = "/kaggle/input/face-mask-detection/annotations/"
listing = []
for img_name in img_names:
    # splitext instead of a fixed-width slice so extensions of any length work.
    stem = os.path.splitext(img_name)[0]
    with open(os.path.join(path_annotations, stem + ".xml")) as fd:
        doc = xmltodict.parse(fd.read())
    objects = doc["annotation"]["object"]
    # The parsed value is a single mapping for one object and a list for several.
    if not isinstance(objects, list):
        objects = [objects]
    listing.extend(obj["name"] for obj in objects)

# Count once and reuse the same Counter so keys and values stay aligned.
label_counts = Counter(listing)
Items = label_counts.keys()
values = label_counts.values()

# Left: donut chart of class shares. Right: absolute counts per class.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
background_color = '#faf9f4'
ax1.set_facecolor(background_color)
ax2.set_facecolor(background_color)
ax1.pie(list(values), wedgeprops=dict(width=0.3, edgecolor='w'),
        labels=list(Items), radius=1, startangle=120, autopct='%1.2f%%')

# Draw on ax2 explicitly; `ax2` keeps referring to the Axes afterwards.
ax2.bar(list(Items), list(values),
        color='maroon', width=0.4)

plt.show()

In [None]:
# Dataset locations, relative to the notebook's working directory.
images_dir = '../input/face-mask-detection/images/'
annotations_dir = '../input/face-mask-detection/annotations/'

# Disabled shell commands, kept for reference only. They were previously held in
# a bare triple-quoted string, which the notebook echoed as this cell's output.
# !mkdir train_data
# !mkdir test_data
# !cp -r ../input/yolov5trainedep99/yolov5/data/train/images train_data
# !cp -r ../input/yolov5trainedep99/yolov5/data/train/labels train_data
# !cp -r ../input/yolov5trainedep99/yolov5/data/val/images/* train_data/images
# !cp -r ../input/yolov5trainedep99/yolov5/data/val/labels/* train_data/labels
# !cp -r ../input/yolov5trainedep99/yolov5/data/test/* test_data

In [None]:
class MaskDataset(torch.utils.data.Dataset):
    """Detection dataset pairing face-mask images with their XML box annotations.

    Each item is ``(image, target)``. The image is read with OpenCV, converted
    to RGB, resized to ``(width, height)`` and scaled to ``[0, 1]``. ``target``
    is a dict with ``boxes`` (``[xmin, ymin, xmax, ymax]`` rescaled to the
    resized image), ``labels``, ``area``, ``iscrowd`` and ``image_id``.

    Args:
        images_dir: directory containing the image files.
        annotation_dir: directory containing one XML annotation per image.
        width: target image width in pixels.
        height: target image height in pixels.
        transforms: optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` that returns a
            mapping with ``'image'`` and ``'bboxes'``.
    """

    def __init__(self, images_dir, annotation_dir,width, height, transforms=None):
        self.transforms = transforms
        self.images_dir = images_dir
        self.annotation_dir = annotation_dir
        self.height = height
        self.width = width

        # Image i is paired with annotation i purely by sorted position, so
        # both directories must contain matching file stems.
        self.imgs = sorted(os.listdir(images_dir))
        self.annotate = sorted(os.listdir(annotation_dir))

        # Index 0 is an explicit placeholder so the real classes start at 1.
        # It used to be whatever the global `_` happened to hold at call time.
        self.classes = ['__background__', 'without_mask','with_mask','mask_weared_incorrect']

    def __getitem__(self, idx):
        """Return the resized image and its detection target for sample ``idx``.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
            ValueError: if an annotated class name is not in ``self.classes``.
        """
        img_name = self.imgs[idx]
        image_path = os.path.join(self.images_dir, img_name)

        # BGR -> RGB, resize and scale every image to [0, 1] before transforms.
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"could not read image: {image_path}")
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        # The interpolation flag must be passed by keyword: the third positional
        # parameter of cv2.resize is `dst`, not `interpolation`.
        img_res = cv2.resize(img_rgb, (self.width, self.height),
                             interpolation=cv2.INTER_AREA)
        img_res /= 255.0

        annot_filename = self.annotate[idx]
        annot_file_path = os.path.join(self.annotation_dir, annot_filename)

        boxes = []
        labels = []
        root = et.parse(annot_file_path).getroot()

        # Original image size, used to rescale boxes to the resized image.
        wt = img.shape[1]
        ht = img.shape[0]

        for member in root.findall('object'):
            labels.append(self.classes.index(member.find('name').text))

            bndbox = member.find('bndbox')
            xmin = int(bndbox.find('xmin').text)
            xmax = int(bndbox.find('xmax').text)
            ymin = int(bndbox.find('ymin').text)
            ymax = int(bndbox.find('ymax').text)

            xmin_corr = (xmin/wt)*self.width
            xmax_corr = (xmax/wt)*self.width
            ymin_corr = (ymin/ht)*self.height
            ymax_corr = (ymax/ht)*self.height

            boxes.append([xmin_corr, ymin_corr, xmax_corr, ymax_corr])

        # reshape keeps an (N, 4) layout even when the image has no objects,
        # so the column slicing below stays valid.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["area"] = area
        target["iscrowd"] = iscrowd

        image_id = torch.tensor([idx])
        target["image_id"] = image_id

        if self.transforms:
            sample = self.transforms(image = img_res,
                                     bboxes = target['boxes'],
                                     labels = labels)
            img_res = sample['image']
            target['boxes'] = torch.Tensor(sample['bboxes'])

        return img_res, target

    def __len__(self):
        """Number of images found in ``images_dir``."""
        return len(self.imgs)

In [None]:
def plot_img_bbox(img, target):
    """Display ``img`` with every box of ``target['boxes']`` outlined in white.

    Each box is read as ``[xmin, ymin, xmax, ymax]`` and drawn on a 5x5 inch
    figure as an unfilled rectangle.
    """
    figure, axis = plt.subplots(1,1)
    figure.set_size_inches(5,5)
    axis.imshow(img, cmap='gray')

    for bbox in target['boxes']:
        left, top = bbox[0], bbox[1]
        # matplotlib wants (corner, width, height) rather than two corners.
        outline = patches.Rectangle(
            (left, top),
            bbox[2] - left,
            bbox[3] - top,
            linewidth=2,
            edgecolor='white',
            facecolor='none',
        )
        axis.add_patch(outline)

    plt.show()

In [None]:
def get_transform(train):
    """Build the albumentations pipeline for training or evaluation.

    With ``train`` truthy the pipeline applies an optional blur (one of
    motion / median / plain blur), random brightness-contrast and random gamma
    before converting to a tensor. Otherwise it only converts to a tensor.
    Both pipelines declare ``pascal_voc`` boxes with a ``labels`` field.
    """
    bbox_config = {'format': 'pascal_voc', 'label_fields': ['labels']}

    if not train:
        return A.Compose([ToTensorV2(p=1.0)], bbox_params=bbox_config)

    # At most one of the three blur variants is applied per sample.
    blur_choice = A.OneOf([
        A.MotionBlur(p=0.2),
        A.MedianBlur(blur_limit=3, p=0.2),
        A.Blur(blur_limit=3, p=0.2),
    ], p=0.4)
    brightness = A.RandomBrightnessContrast(p=0.1)
    gamma = A.RandomGamma(gamma_limit=(80, 120), eps=None, always_apply=False, p=0.2)
    to_tensor = ToTensorV2(p=1.0)

    return A.Compose([blur_choice, brightness, gamma, to_tensor],
                     bbox_params=bbox_config)

In [None]:
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``.
    Nothing is stacked, so the per-sample items may differ in size.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [None]:
from sklearn.model_selection import train_test_split

# Two views of the same files: one with training augmentations, one without.
dataset = MaskDataset(images_dir, annotations_dir, 480, 480, transforms= get_transform(train=True))
dataset_test = MaskDataset(images_dir, annotations_dir, 480, 480, transforms= get_transform(train=False))

# Split sample *indices* rather than the datasets themselves. Handing a Dataset
# to train_test_split makes it index every sample up front, which loads all
# images into memory and freezes the random train augmentations to one draw.
# With the same lengths and random_state the index split selects the same samples.
all_indices = list(range(len(dataset)))
train_indices, held_out_indices = train_test_split(all_indices, test_size=0.1, random_state=22)
test_indices, val_indices = train_test_split(held_out_indices, test_size=0.07, random_state=22)

train_dataset = torch.utils.data.Subset(dataset, train_indices)
test_dataset = torch.utils.data.Subset(dataset_test, test_indices)
val_dataset = torch.utils.data.Subset(dataset_test, val_indices)

data_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=8, shuffle=True, num_workers=4,
    collate_fn=collate_fn)

# NOTE: despite its name this loader iterates `val_dataset`
# (7% of the 10% hold-out), not `test_dataset`.
data_loader_test = torch.utils.data.DataLoader(
    val_dataset, batch_size=4, shuffle=False, num_workers=4,
    collate_fn=collate_fn)

# Model

In [None]:
def get_model_instance_segmentation(num_classes):
    """Return a pretrained Faster R-CNN (ResNet-50 FPN) with a new box head.

    The box predictor is replaced by a ``FastRCNNPredictor`` that outputs
    ``num_classes`` classes and keeps the input feature size of the original head.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # Reuse the feature width the existing classification layer expects.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    new_head = FastRCNNPredictor(head_in_features, num_classes)
    detector.roi_heads.box_predictor = new_head

    return detector

# Train Model

In [None]:
def IOU(box1, box2):
    """Intersection-over-union of two ``[xmin, ymin, xmax, ymax]`` boxes.

    Widths and heights are computed as ``max - min + 1``.
    NOTE(review): that looks like the inclusive integer-pixel convention;
    confirm it is intended for the rescaled float boxes used in this notebook.
    """
    def _extent_area(box):
        return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)

    # Corners of the overlapping rectangle (may be inverted when disjoint).
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Clamp each side at zero so disjoint boxes yield an empty intersection.
    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    inter_area = overlap_w * overlap_h

    area1 = _extent_area(box1)
    area2 = _extent_area(box2)

    iou = inter_area / float(area1 + area2 - inter_area)
    assert iou >= 0
    return iou

In [None]:
def compute_AP(ground_truth, predictions, iou_thresh, n_classes=4):
    """Mean of the per-class average precision at a single IoU threshold.

    Both inputs are lists of entries laid out as
    ``[sequence, image_id, object_index, box, label, score]``. Only
    ``image_id`` (index 1), ``box`` (3), ``label`` (4) and ``score`` (5) are
    read here.

    For each class, predictions are visited in descending score order. A
    prediction is a true positive when its best-overlapping ground-truth box of
    the same class and image has IoU strictly above ``iou_thresh`` and has not
    been matched yet; otherwise it is a false positive. AP is the trapezoidal
    area under the precision-recall curve.

    Args:
        ground_truth: list of ground-truth entries.
        predictions: list of prediction entries.
        iou_thresh: IoU a prediction must exceed to count as a match.
        n_classes: number of label ids, including id 0.

    Returns:
        Sum of the per-class APs divided by ``n_classes - 1`` (3 for the
        default of 4). Label 0 is assumed to be a background placeholder
        without boxes, contributing an AP of 0 — verify against the dataset.
    """
    epsilon = 1e-6  # keeps precision/recall finite when a class has no boxes
    # np.trapz was renamed np.trapezoid in NumPy 2.0; use whichever exists.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    APs = []

    for c in range(n_classes):
        # Rebuilt for every class: accumulating these lists across iterations
        # would mix boxes of earlier classes into this class's AP.
        class_gt = [gt for gt in ground_truth if gt[4] == c]
        class_predictions = sorted(
            (predict for predict in predictions if predict[4] == c),
            key=lambda x: x[5], reverse=True)

        # Group ground truth by image once instead of rescanning per prediction.
        gt_by_image = {}
        for gt in class_gt:
            gt_by_image.setdefault(gt[1], []).append(gt)
        # One "already matched" flag per ground-truth box, per image.
        covered = {image_id: np.zeros(len(image_boxes))
                   for image_id, image_boxes in gt_by_image.items()}

        TP = np.zeros(len(class_predictions))
        FP = np.zeros(len(class_predictions))
        truth = len(class_gt)

        for predict_idx, prediction in enumerate(class_predictions):
            image_gt = gt_by_image.get(prediction[1], [])

            best_iou = -1
            best_gt_iou_idx = -1
            for gt_idx, gt in enumerate(image_gt):
                iou = IOU(prediction[3], gt[3])
                if iou > best_iou:
                    best_iou = iou
                    best_gt_iou_idx = gt_idx

            if best_iou > iou_thresh and best_gt_iou_idx > -1:
                # Each ground-truth box may be claimed by one prediction only.
                if covered[prediction[1]][best_gt_iou_idx] == 0:
                    covered[prediction[1]][best_gt_iou_idx] = 1
                    TP[predict_idx] = 1
                else:
                    FP[predict_idx] = 1
            else:
                FP[predict_idx] = 1

        # Precision-recall curve, anchored at (recall=0, precision=1).
        TP_cumsum = np.cumsum(TP)
        FP_cumsum = np.cumsum(FP)
        recall = np.append([0], TP_cumsum / (truth + epsilon))
        precision = np.append([1], np.divide(TP_cumsum, (TP_cumsum + FP_cumsum + epsilon)))
        AP = integrate(precision, recall)
        APs.append(AP)
        print(f"class = {c}, precision = {precision.mean()}, recall = {recall.mean()}, AP = {AP.mean()}")

    # Average over the non-background classes (3 when n_classes == 4).
    return sum(APs) / max(n_classes - 1, 1)

In [None]:
def compute_mAP(ground_truth, predictions, n_classes, iou_thresh=0.5):
    """Mean average precision at one IoU threshold.

    Thin wrapper around ``compute_AP``. ``iou_thresh`` defaults to 0.5, the
    value that used to be hard-coded, so existing callers are unaffected.

    Args:
        ground_truth: ground-truth entries, forwarded to ``compute_AP``.
        predictions: prediction entries, forwarded to ``compute_AP``.
        n_classes: number of label ids, forwarded to ``compute_AP``.
        iou_thresh: IoU threshold used to match predictions to ground truth.

    Returns:
        Whatever ``compute_AP`` returns for these arguments.
    """
    return compute_AP(ground_truth, predictions, iou_thresh, n_classes)

In [None]:
def evaluate(model, data_loader, device, sequences=1):
    """Run the detector over ``data_loader`` and return its mAP.

    The model is switched to eval mode, every output is filtered with
    ``apply_nms`` (IoU threshold 0.3), and ground truth and predictions are
    flattened into entries of the form
    ``[sequence, image_id, object_index, box, label, score]`` (ground-truth
    entries use a score of 1) before being passed to ``compute_mAP`` with
    ``n_classes=4``.

    Args:
        model: detection model; called with a list of image tensors.
        data_loader: iterable yielding ``(images, targets)`` batches.
        device: device the images are moved to before inference.
        sequences: how many times each batch is recorded, tagged 0..sequences-1.

    Returns:
        The value returned by ``compute_mAP``.
    """
    # Set evaluation mode flag
    model.eval()
    ground_truth = []
    predictions = []

    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        for image, targets in data_loader:
            image = [img.to(device) for img in image]
            outputs = model(image)
            outputs = [apply_nms(out, iou_thresh=0.3) for out in outputs]

            for s in range(sequences):
                target_counter = 0
                prediction_counter = 0
                # outputs and targets hold one entry per image of the batch
                for out, target in zip(outputs, targets):
                    # Convert once per image instead of once per box.
                    image_id = target['image_id'].detach().cpu().numpy()[0]
                    gt_boxes = target['boxes'].detach().cpu().numpy()
                    gt_labels = target['labels'].detach().cpu().numpy()
                    pred_boxes = out['boxes'].detach().cpu().numpy()
                    pred_labels = out['labels'].detach().cpu().numpy()
                    pred_scores = out['scores'].detach().cpu().numpy()

                    for i in range(len(gt_boxes)):
                        ground_truth.append([s, image_id, target_counter,
                                             gt_boxes[i], gt_labels[i], 1])
                        target_counter += 1

                    # The number of predicted boxes differs per image because
                    # the NMS step above removed overlapping detections.
                    for j in range(len(pred_boxes)):
                        predictions.append([s, image_id, prediction_counter,
                                            pred_boxes[j], pred_labels[j],
                                            pred_scores[j]])
                        prediction_counter += 1

    mAP = compute_mAP(ground_truth, predictions, n_classes=4)

    return mAP

In [None]:
def apply_nms(orig_prediction, iou_thresh=0.3):
    """Return a copy of a prediction with overlapping boxes suppressed.

    ``torchvision.ops.nms`` is run over all boxes together (labels are not
    taken into account) and the ``boxes``, ``scores`` and ``labels`` entries
    are reduced to the kept indices. Other keys are carried over unchanged.

    Args:
        orig_prediction: dict with ``'boxes'``, ``'scores'`` and ``'labels'``.
        iou_thresh: IoU threshold passed to ``torchvision.ops.nms``.

    Returns:
        A new dict; ``orig_prediction`` itself is left untouched.
    """
    keep = torchvision.ops.nms(orig_prediction['boxes'], orig_prediction['scores'], iou_thresh)

    # Shallow copy so the caller's dict is not modified through an alias.
    final_prediction = dict(orig_prediction)
    for key in ('boxes', 'scores', 'labels'):
        final_prediction[key] = orig_prediction[key][keep]

    return final_prediction

def torch_to_pil(img):
    """Convert ``img`` with ``transforms.ToPILImage`` and return it in RGB mode."""
    to_pil = transforms.ToPILImage()
    pil_image = to_pil(img)
    return pil_image.convert('RGB')

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 4
model = get_model_instance_segmentation(num_classes)
epochs = 3
b = 0.1  # flood level: the optimised objective is |loss - b|, not the raw loss
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,momentum=0.9, weight_decay=0.0005)

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3,gamma=0.1)
len_dataloader = len(data_loader)
len_dataloader_test = len(data_loader_test)
epoch_loss_min = float('inf')  # best (lowest) mean training loss seen so far
mAP_max = 0                    # best mAP on data_loader_test seen so far

# Per-epoch history, consumed by the plotting cell.
E = []    # epoch numbers (1-based)
L = []    # mean total loss on data_loader
LT = []   # mean total loss on data_loader_test
M = []    # mAP on data_loader
MT = []   # mAP on data_loader_test
loss_classifier_mean = []
loss_classifier_mean_test = []
loss_box_reg_mean = []
loss_box_reg_mean_test = []
loss_objectness_mean = []
loss_objectness_mean_test = []
loss_rpn_box_reg_mean = []
loss_rpn_box_reg_mean_test = []

for epoch in range(epochs):
    print('training:', epoch + 1, 'of', epochs)

    # ---- training pass -------------------------------------------------
    model.train()
    epoch_loss = 0
    epoch_loss_test = 0
    loss_classifier = []
    loss_box_reg = []
    loss_objectness = []
    loss_rpn_box_reg = []
    for imgs, annotations in data_loader:
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        loss_dict = model(imgs, annotations)

        loss_classifier.append(loss_dict['loss_classifier'].item())
        loss_box_reg.append(loss_dict['loss_box_reg'].item())
        loss_objectness.append(loss_dict['loss_objectness'].item())
        loss_rpn_box_reg.append(loss_dict['loss_rpn_box_reg'].item())

        losses = sum(loss for loss in loss_dict.values())
        # Back-propagate the flooded objective, but log the raw loss below.
        flood = abs(losses-b)

        optimizer.zero_grad()
        flood.backward()
        optimizer.step()
        epoch_loss += losses.item()
    epoch_loss /= len_dataloader
    lc = np.mean(loss_classifier)
    loss_classifier_mean.append(lc)
    lbr = np.mean(loss_box_reg)
    loss_box_reg_mean.append(lbr)
    lo = np.mean(loss_objectness)
    loss_objectness_mean.append(lo)
    lrbr = np.mean(loss_rpn_box_reg)
    loss_rpn_box_reg_mean.append(lrbr)

    # ---- loss on the held-out loader -----------------------------------
    # The model is deliberately still in train mode here (eval() is only
    # called further down): it is called with targets to obtain the loss
    # dict, while no_grad() prevents any gradient tracking.
    with torch.no_grad():
        loss_classifier = []
        loss_box_reg = []
        loss_objectness = []
        loss_rpn_box_reg = []
        for imgs, annotations in data_loader_test:
            imgs = list(img.to(device) for img in imgs)
            annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
            loss_dict = model(imgs, annotations)

            loss_classifier.append(loss_dict['loss_classifier'].item())
            loss_box_reg.append(loss_dict['loss_box_reg'].item())
            loss_objectness.append(loss_dict['loss_objectness'].item())
            loss_rpn_box_reg.append(loss_dict['loss_rpn_box_reg'].item())

            losses = sum(loss for loss in loss_dict.values())
            epoch_loss_test += losses.item()
        epoch_loss_test /= len_dataloader_test
        lct = np.mean(loss_classifier)
        loss_classifier_mean_test.append(lct)
        lbrt = np.mean(loss_box_reg)
        loss_box_reg_mean_test.append(lbrt)
        lot = np.mean(loss_objectness)
        loss_objectness_mean_test.append(lot)
        lrbrt = np.mean(loss_rpn_box_reg)
        loss_rpn_box_reg_mean_test.append(lrbrt)

    # ---- metrics and checkpoints ---------------------------------------
    E.append(epoch + 1)
    L.append(epoch_loss)
    LT.append(epoch_loss_test)
    model.eval()
    mAP = evaluate(model, data_loader, device=device)
    mAP_test = evaluate(model, data_loader_test, device=device)
    M.append(mAP)
    MT.append(mAP_test)
    print(f'Epoch={epoch + 1}, train_loss={epoch_loss}, test_loss={epoch_loss_test},mAP_train ={mAP}, mAP_test ={mAP_test}')
    # NOTE: the checkpoint names mention "10_epochs" regardless of `epochs`;
    # they are left unchanged in case other code loads them by name.
    if epoch_loss < epoch_loss_min:
        epoch_loss_min = epoch_loss
        torch.save(model.state_dict(), 'model_loss_best_10_epochs.pt')
    if mAP_test > mAP_max:
        mAP_max = mAP_test
        torch.save(model.state_dict(), 'model_mAP_best_10_epochs.pt')

    # Advance the StepLR schedule once per epoch. Without this call the
    # scheduler created above never changes the learning rate.
    lr_scheduler.step()

In [None]:
def _plot_epoch_curves(epoch_axis, title, ylabel, curves, legend_fontsize=None):
    """Draw one figure with a line per ``(legend_label, values)`` pair.

    Args:
        epoch_axis: x values shared by all curves.
        title: figure title.
        ylabel: y-axis label (the x-axis is always labelled ``'epoch'``).
        curves: sequence of ``(legend_label, values)`` pairs, drawn in order.
        legend_fontsize: optional legend font size; ``None`` keeps the default.
    """
    fig, ax = plt.subplots()
    ax.set_title(title)
    ax.set_xlabel('epoch')
    ax.set_ylabel(ylabel)
    for legend_label, series in curves:
        ax.plot(epoch_axis, series, label=legend_label)
    ax.legend(loc="upper right", fontsize=legend_fontsize)
    plt.show()


# Total loss and mAP, train vs. test.
_plot_epoch_curves(E, 'loss function', 'epoch-loss',
                   [("train", L), ("test", LT)])
_plot_epoch_curves(E, 'mean Average Precision', 'mAP',
                   [("train", M), ("test", MT)])

# Individual loss components, train vs. test.
_plot_epoch_curves(E, 'classification loss', 'loss',
                   [("train", loss_classifier_mean),
                    ("test", loss_classifier_mean_test)])
_plot_epoch_curves(E, 'regression loss', 'loss',
                   [("train", loss_box_reg_mean),
                    ("test", loss_box_reg_mean_test)],
                   legend_fontsize=6)
# The y-label and legend here previously read 'oss_objectness' and
# 'loss_box_reg_*', which mislabelled this objectness plot.
_plot_epoch_curves(E, 'loss_objectness', 'loss_objectness',
                   [("loss_objectness_train", loss_objectness_mean),
                    ("loss_objectness_test", loss_objectness_mean_test)],
                   legend_fontsize=6)
_plot_epoch_curves(E, 'loss_rpn_box_reg', 'loss_rpn_box_reg',
                   [("loss_rpn_box_reg_train", loss_rpn_box_reg_mean),
                    ("loss_rpn_box_reg_test", loss_rpn_box_reg_mean_test)],
                   legend_fontsize=6)

# All components together, once per split.
_plot_epoch_curves(E, 'losses of train', 'losses',
                   [("loss_train", L),
                    ("loss_classifier", loss_classifier_mean),
                    ("loss_box_reg", loss_box_reg_mean),
                    ("loss_objectness", loss_objectness_mean),
                    ("loss_rpn_box_reg", loss_rpn_box_reg_mean)],
                   legend_fontsize=6)
_plot_epoch_curves(E, 'losses of test', 'losses',
                   [("loss_test", LT),
                    ("loss_classifier", loss_classifier_mean_test),
                    ("loss_box_reg", loss_box_reg_mean_test),
                    ("loss_objectness", loss_objectness_mean_test),
                    ("loss_rpn_box_reg", loss_rpn_box_reg_mean_test)],
                   legend_fontsize=6)

In [None]:
# `img` and `prediction` are produced here explicitly; no earlier cell of this
# notebook defines them, so the cell would fail on a fresh kernel otherwise.
img, target = val_dataset[0]
model.eval()
with torch.no_grad():
    prediction = model([img.to(device)])[0]
# Bring the tensors back to the CPU so they can be handed to matplotlib.
prediction = {key: value.cpu() for key, value in prediction.items()}

nms_prediction = apply_nms(prediction, iou_thresh=0.3)
print('NMS APPLIED MODEL OUTPUT')
print ("Predicted NMS Labels: ",len(nms_prediction['labels']))
plot_img_bbox(torch_to_pil(img), nms_prediction)

# Function to plot image

In [None]:
def plot_image(img_tensor, annotation,predict=True):
    """Show an image with its boxes and a class caption per box.

    Args:
        img_tensor: image tensor; it is moved to the CPU and permuted with
            ``(1, 2, 0)`` before display.
        annotation: dict with ``'boxes'`` (``[xmin, ymin, xmax, ymax]`` each)
            and ``'labels'``; ``'scores'`` is also read when ``predict`` is true.
        predict: when true, captions include the score as a percentage.

    Captions are green for ``with_mask`` and red for every other label. Label
    ids missing from the lookup table are shown as their numeric id.
    """
    fig,ax = plt.subplots(1)
    fig.set_size_inches(18.5, 10.5)
    img = img_tensor.cpu().data
    mask_dic = {1:'without_mask', 2:'with_mask', 3:'mask_worn_incorrectly'}

    # Display the image
    ax.imshow(img.permute(1, 2, 0), cmap='gray')

    for i,box in enumerate(annotation["boxes"]):
        # Plain floats: matplotlib cannot consume tensors that live on a GPU.
        xmin, ymin, xmax, ymax = (float(coord) for coord in box)

        rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=2,edgecolor='white',facecolor='none')
        ax.add_patch(rect)

        class_id = int(annotation['labels'][i])
        label = mask_dic.get(class_id, str(class_id))
        if predict:
            score = int(annotation['scores'][i] * 100)
            caption = f"{label} : {score}%"
        else:
            caption = f"{label}"
        color = 'g' if label == 'with_mask' else 'r'
        ax.text(xmin, ymin, caption, horizontalalignment='center', verticalalignment='center',fontsize=20,color=color)
    plt.show()
    

In [None]:
# Fetch one batch and run the model here, so this cell does not depend on
# variables left over from the training loop or on an undefined `preds`.
imgs, annotations = next(iter(data_loader_test))
imgs = [img_tensor.to(device) for img_tensor in imgs]
model.eval()
with torch.no_grad():
    preds = model(imgs)

for idx, (image, pred, annotation) in enumerate(zip(imgs, preds, annotations)):
    nms_prediction = apply_nms(pred, iou_thresh=0.3)
    # Plotting happens on the CPU.
    nms_prediction = {key: value.cpu() for key, value in nms_prediction.items()}
    print(f'Prediction {idx+1}')
    plot_image(image, nms_prediction)
    print(f'Target {idx+1}')
    plot_image(image.to('cpu'), annotation,False)