In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
# Collect the full path of every annotation file found (recursively) under the
# annotations folder, in sorted order.
annotations_path = sorted(
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input/face-mask-detection/annotations')
    for name in names
)

# Same for the image files under the images folder.
images_path = sorted(
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input/face-mask-detection/images')
    for name in names
)

import torch

# Print the CUDA allocator report only when a GPU is actually present:
# memory_summary(device=None) has to resolve the current CUDA device, which is
# not possible on a CPU-only session (the notebook otherwise falls back to CPU).
if torch.cuda.is_available():
    print(torch.cuda.memory_summary(device=None, abbreviated=False))
else:
    print("CUDA is not available; skipping GPU memory summary.")
#torch.cuda.empty_cache()

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import torchvision
from torchvision import transforms, datasets, models
import torch
from PIL import Image
import xml.etree.ElementTree as et

class MaskDataset(object):
    """Detection dataset pairing each face-mask image with its XML annotation.

    Sample ``idx`` is read from ``<root>/images/maksssksksss<idx>.png`` and
    ``<root>/annotations/maksssksksss<idx>.xml``.

    ``dataset[idx]`` returns ``(img, target)`` where ``target`` is a dict with:

    * ``"boxes"``    - float32 tensor of shape ``[N, 4]`` (xmin, ymin, xmax, ymax)
    * ``"labels"``   - int64 tensor of shape ``[N]``
    * ``"image_id"`` - tensor ``[idx]``

    :param transforms: callable applied to the RGB PIL image, or ``None``
    :param root:       dataset folder containing ``images/`` and ``annotations/``
    """

    # Class name -> integer label; every other class name maps to OTHER_LABEL.
    LABELS = {"with_mask": 1, "without_mask": 2}
    OTHER_LABEL = 3

    def __init__(self, transforms, root="/kaggle/input/face-mask-detection"):
        self.transforms = transforms
        self.root = root
        # Only used for __len__; samples themselves are addressed by index.
        self.imgs = list(sorted(os.listdir(os.path.join(root, "images"))))

    def __getitem__(self, idx):
        """Load image ``idx`` and its annotation as an ``(img, target)`` pair."""
        stem = "maksssksksss" + str(idx)
        img_path = os.path.join(self.root, "images", stem + ".png")
        annotation_path = os.path.join(self.root, "annotations", stem + ".xml")

        img = Image.open(img_path).convert("RGB")
        boxes, labels = self._parse_annotation(annotation_path)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = torch.tensor([idx])

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    @classmethod
    def _parse_annotation(cls, annotation_path):
        """Read every ``<object>`` of an annotation file into box/label tensors.

        Elements are looked up by tag name rather than by position, so the
        result does not depend on how many metadata elements precede the
        objects or on the order of the children inside an object.

        :return: ``(boxes, labels)`` - float32 ``[N, 4]`` and int64 ``[N]`` tensors
        """
        root = et.parse(annotation_path).getroot()
        bounding_boxes = []
        labels = []
        for obj in root.findall("object"):
            bndbox = obj.find("bndbox")
            bounding_boxes.append(
                [int(bndbox.find(tag).text) for tag in ("xmin", "ymin", "xmax", "ymax")]
            )
            labels.append(cls.LABELS.get(obj.find("name").text, cls.OTHER_LABEL))

        # reshape keeps the [N, 4] layout even when the image has no objects
        boxes = torch.as_tensor(bounding_boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        return boxes, labels

    def __len__(self):
        """Number of files found in the images folder."""
        return len(self.imgs)

In [None]:
import random

def collate_fn(batch):
    """Regroup a batch of samples into one tuple per sample field.

    A list of ``(image, target)`` pairs becomes
    ``((image, ...), (target, ...))``; nothing is stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Per-sample preprocessing: only convert the PIL image to a tensor.
data_transform = transforms.Compose([
        transforms.ToTensor(), 
])

dataset = MaskDataset(data_transform)

# Every 5th sample is held out for validation, the rest is used for training.
valid_indices = [i for i in range(len(dataset)) if i % 5 == 0]
train_indices = [i for i in range(len(dataset)) if i % 5 != 0]

# Subset views load samples lazily through the DataLoader instead of decoding
# every image up front and keeping all of them in Python lists.
train_dataset = torch.utils.data.Subset(dataset, train_indices)
valid_dataset = torch.utils.data.Subset(dataset, valid_indices)

# Training batches are reshuffled every epoch; validation order stays fixed.
train_data_loader = torch.utils.data.DataLoader(
  train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

valid_data_loader = torch.utils.data.DataLoader(
  valid_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

In [None]:
# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Sanity check: move the first training batch to the device, print its targets
# and stop.
for imgs, annotations in train_data_loader:
    imgs = [img.to(device) for img in imgs]
    annotations = [
        {key: value.to(device) for key, value in target.items()}
        for target in annotations
    ]
    print(annotations)
    break

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Faster R-CNN object detector (ResNet-50 FPN backbone), pre-trained on COCO.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
# Width of the features fed to the existing box classifier.
in_features = model.roi_heads.box_predictor.cls_score.in_features
# Swap the pre-trained box head for a fresh one with 4 outputs
# (labels 1-3 produced by MaskDataset; index 0 is presumably background).
model.roi_heads.box_predictor = FastRCNNPredictor(in_channels=in_features, num_classes=4)

In [None]:
num_epochs = 10
model.to(device)

# Only parameters that require gradients are handed to the optimizer.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

for epoch in range(num_epochs):
    model.train()
    i = 0           # batches seen so far in this epoch
    loss_total = 0  # sum of the per-batch losses of this epoch
    for i, (imgs, annotations) in enumerate(train_data_loader, start=1):
        imgs = [img.to(device) for img in imgs]
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]

        # The model returns a mapping of loss terms; their sum is optimised.
        loss_dict = model(imgs, annotations)
        losses = sum(loss_dict.values())
        loss_value = losses.item()
        loss_total += loss_value

        # Report the running mean loss every 50 batches.
        if i % 50 == 0:
            print(f"Iteration #{i} loss: {loss_total/i}")

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
    print(f"Epoch #{epoch} loss: {loss_total/i}")

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def plot_image(img_tensor, annotation):
    """Show an image with its bounding boxes and mask labels drawn on top.

    :param img_tensor: image tensor; it is detached, moved to the CPU and
                       permuted with ``(1, 2, 0)`` before being displayed
    :param annotation: mapping with ``"boxes"`` (each ``xmin, ymin, xmax, ymax``)
                       and ``"labels"`` (1 = with mask, 2 = without mask,
                       anything else = incorrect mask), aligned by position
    """
    label_names = {1: "With Mask", 2: "Without Mask"}

    fig, ax = plt.subplots(1)
    img = img_tensor.detach().cpu()

    # Display the image
    ax.imshow(img.permute(1, 2, 0))

    for box, label in zip(annotation["boxes"], annotation["labels"]):
        # Plain Python numbers work with matplotlib whatever device (CPU/GPU)
        # the annotation tensors live on.
        xmin, ymin, xmax, ymax = (float(value) for value in box)
        label = int(label)

        # Outline the box
        rect = patches.Rectangle((xmin, ymin), (xmax - xmin), (ymax - ymin),
                                 linewidth=1, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

        # Caption just below the box
        toWrite = label_names.get(label, "Incorrect Mask")
        ax.text(xmin, ymax + 15, toWrite, fontsize=10, color="green")

    plt.show()
  
# Detections whose confidence is not above this value are dropped before evaluation.
SCORE_THRESHOLD = 0.01

# One entry per validation image:
# (pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels) as numpy arrays.
frames = []

# Switch to inference mode once; the model is then called with images only.
model.eval()
for images, annotations in valid_data_loader:
    imgs = list(img.to(device) for img in images)
    targets = [{k: v.to(device) for k, v in t.items()} for t in annotations]
    with torch.no_grad():
        preds = model(imgs)

    for i in range(len(imgs)):
        # Keep only sufficiently confident detections, selected with a single
        # boolean mask instead of one device-to-host copy per detection.
        keep = preds[i]["scores"] > SCORE_THRESHOLD
        goodPred = {key: preds[i][key][keep] for key in ("boxes", "labels", "scores")}

        #print("Prediction")
        #plot_image(imgs[i], goodPred)
        #print(goodPred["boxes"])
        print("Target")
        plot_image(imgs[i], targets[i])
        print(targets[i]["boxes"])
        print(targets[i]["labels"])
        frames.append((
            goodPred["boxes"].detach().cpu().numpy(),
            goodPred["labels"].detach().cpu().numpy(),
            goodPred["scores"].detach().cpu().numpy(),
            targets[i]["boxes"].detach().cpu().numpy(),
            targets[i]["labels"].detach().cpu().numpy(),
        ))
             

In [None]:
# Explicit %pip magic: does not depend on IPython's automagic being enabled.
%pip install --upgrade git+https://github.com/MathGaron/mean_average_precision.git

In [None]:
import numpy as np
from mean_average_precision.ap_accumulator import APAccumulator
from mean_average_precision.utils.bbox import jaccard
import math
import matplotlib.pyplot as plt

DEBUG = False


class DetectionMAP:
    """Running mean-average-precision evaluation for box detection + classification.

    One accumulator is kept per (confidence threshold, class) pair. Call
    :meth:`evaluate` once per image, then :meth:`plot` to draw the
    precision/recall curves.
    """

    def __init__(self, n_class, pr_samples=11, overlap_threshold=0.5):
        """
        Running computation of average precision of n_class in a bounding box + classification task
        :param n_class:             number of classes
        :param pr_samples:          number of confidence thresholds (evenly spaced in [0, 1]) sampled for the pr curve
        :param overlap_threshold:   minimum IoU for a prediction to match a ground truth box
        """
        self.n_class = n_class
        self.overlap_threshold = overlap_threshold
        self.pr_scale = np.linspace(0, 1, pr_samples)
        self.total_accumulators = []
        self.reset_accumulators()

    def reset_accumulators(self):
        """
        Reset the accumulators state
        total_accumulators : list (one entry per pr_scale threshold) of lists (one accumulator per class)
        :return:
        """
        self.total_accumulators = [
            [APAccumulator() for _ in range(self.n_class)]
            for _ in range(len(self.pr_scale))
        ]

    def evaluate(self, pred_bb, pred_classes, pred_conf, gt_bb, gt_classes):
        """
        Update the accumulator for the running mAP evaluation.
        For example, this can be called for each image
        :param pred_bb: (np.array)      Predicted Bounding Boxes [x1, y1, x2, y2] :     Shape [n_pred, 4]
        :param pred_classes: (np.array) Predicted Classes :                             Shape [n_pred]
        :param pred_conf: (np.array)    Predicted Confidences [0.-1.] :                 Shape [n_pred]
        :param gt_bb: (np.array)        Ground Truth Bounding Boxes [x1, y1, x2, y2] :  Shape [n_gt, 4]
        :param gt_classes: (np.array)   Ground Truth Classes :                          Shape [n_gt]
        :return:
        """

        # A 1-D array (e.g. np.array([]) when nothing was predicted) is expanded to 2-D.
        if pred_bb.ndim == 1:
            pred_bb = np.repeat(pred_bb[:, np.newaxis], 4, axis=1)
        IoUmask = None
        if len(pred_bb) > 0:
            IoUmask = self.compute_IoU_mask(pred_bb, gt_bb, self.overlap_threshold)
        for accumulators, r in zip(self.total_accumulators, self.pr_scale):
            if DEBUG:
                print("Evaluate pr_scale {}".format(r))
            self.evaluate_(IoUmask, accumulators, pred_classes, pred_conf, gt_classes, r)

    @staticmethod
    def evaluate_(IoUmask, accumulators, pred_classes, pred_conf, gt_classes, confidence_threshold):
        """
        Update the per-class accumulators of one confidence threshold.
        :param IoUmask:               boolean [n_pred, n_gt] match mask (may be None when there is no prediction)
        :param accumulators:          one accumulator per class, indexed by class id
        :param pred_classes:          predicted classes, shape [n_pred]
        :param pred_conf:             predicted confidences, shape [n_pred]
        :param gt_classes:            ground truth classes, shape [n_gt]
        :param confidence_threshold:  predictions below this confidence are ignored
        """
        # Built-in int replaces the np.int alias, which NumPy 1.24 removed.
        pred_classes = pred_classes.astype(int)
        gt_classes = gt_classes.astype(int)

        for i, acc in enumerate(accumulators):
            gt_number = np.sum(gt_classes == i)
            pred_mask = np.logical_and(pred_classes == i, pred_conf >= confidence_threshold)
            pred_number = np.sum(pred_mask)
            if pred_number == 0:
                # nothing predicted for this class: every ground truth is missed
                acc.inc_not_predicted(gt_number)
                continue

            # rows: kept predictions of class i, columns: ground truths of class i
            IoU1 = IoUmask[pred_mask, :]
            mask = IoU1[:, gt_classes == i]

            tp = DetectionMAP.compute_true_positive(mask)
            fp = pred_number - tp
            fn = gt_number - tp
            acc.inc_good_prediction(tp)
            acc.inc_not_predicted(fn)
            acc.inc_bad_prediction(fp)

    @staticmethod
    def compute_IoU_mask(prediction, gt, overlap_threshold):
        """
        Boolean [n_pred, n_gt] mask that is True where a prediction's best
        overlapping ground truth reaches overlap_threshold.
        """
        # Without any ground truth box there is nothing to match (and argmax
        # over an empty row would raise).
        if len(gt) == 0:
            return np.zeros((len(prediction), 0), dtype=bool)
        IoU = jaccard(prediction, gt)
        # for each prediction select gt with the largest IoU and ignore the others
        for i in range(len(prediction)):
            maxj = IoU[i, :].argmax()
            IoU[i, :maxj] = 0
            IoU[i, (maxj + 1):] = 0
        # make a mask of all "matched" predictions vs gt
        return IoU >= overlap_threshold

    @staticmethod
    def compute_true_positive(mask):
        """Number of ground truths (columns) matched by at least one prediction."""
        # sum all gt with prediction of its class
        return np.sum(mask.any(axis=0))

    def compute_ap(self, precisions, recalls):
        """
        Compute the average precision from a sampled precision/recall curve.
        The samples are walked in reverse order and each precision is weighted
        by the recall difference to the previously visited sample.
        :param precisions: precision at each sampled threshold
        :param recalls:    recall at each sampled threshold
        :return: the average precision
        """
        previous_recall = 0
        average_precision = 0
        for precision, recall in zip(precisions[::-1], recalls[::-1]):
            average_precision += precision * (recall - previous_recall)
            previous_recall = recall
        return average_precision

    def compute_precision_recall_(self, class_index, interpolated=True):
        """
        Collect precision and recall of one class at every sampled threshold.
        :param class_index:  class to read from the accumulators
        :param interpolated: replace each precision by the running maximum seen so far
        :return: (precisions, recalls) lists, one value per threshold
        """
        precisions = [acc[class_index].precision for acc in self.total_accumulators]
        recalls = [acc[class_index].recall for acc in self.total_accumulators]

        if interpolated:
            running_max = 0
            interpolated_precision = []
            for precision in precisions:
                running_max = max(precision, running_max)
                interpolated_precision.append(running_max)
            precisions = interpolated_precision
        return precisions, recalls

    def plot_pr(self, ax, class_name, precisions, recalls, average_precision):
        """Draw one precision/recall step curve on ax, titled with the class name and its AP."""
        ax.step(recalls, precisions, color='b', alpha=0.2,
                where='post')
        ax.fill_between(recalls, precisions, step='post', alpha=0.2,
                        color='b')
        ax.set_ylim([0.0, 1.05])
        ax.set_xlim([0.0, 1.0])
        ax.set_xlabel('Recall')
        ax.set_ylabel('Precision')
        ax.set_title('{0:} : AUC={1:0.2f}'.format(class_name, average_precision))

    def plot(self, interpolated=True, class_names=None):
        """
        Plot the pr-curve of every class except class 0, which is skipped and
        left out of the mean.
        :param interpolated: will compute the interpolated curve
        :param class_names:  optional names indexed by class id
        :return:
        """
        grid = int(math.ceil(math.sqrt(self.n_class)))
        # squeeze=False always returns a 2-D array of axes, even for a 1x1 grid
        fig, axes = plt.subplots(nrows=grid, ncols=grid, squeeze=False)
        mean_average_precision = []
        for i, ax in enumerate(axes.flat):
            if i > self.n_class - 1:
                break
            if i == 0:
                continue
            precisions, recalls = self.compute_precision_recall_(i, interpolated)
            print(i, ' ', precisions , ' ', recalls[0])
            average_precision = self.compute_ap(precisions, recalls)
            class_name = class_names[i] if class_names else "Class {}".format(i)
            self.plot_pr(ax, class_name, precisions, recalls, average_precision)
            mean_average_precision.append(average_precision)

        # No class plotted (n_class <= 1): report NaN instead of dividing by zero.
        if mean_average_precision:
            mean_value = sum(mean_average_precision)/len(mean_average_precision)
        else:
            mean_value = float("nan")
        plt.suptitle("Mean average precision : {:0.2f}".format(mean_value))
        fig.tight_layout()

In [None]:
n_class = 4

# Feed every collected validation frame into the running mAP evaluation.
mAP = DetectionMAP(n_class)
for pred_bb, pred_classes, pred_conf, gt_bb, gt_classes in frames:
    mAP.evaluate(pred_bb, pred_classes, pred_conf, gt_bb, gt_classes)

# Draw the per-class precision/recall curves.
mAP.plot()
plt.show()