In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from bs4 import BeautifulSoup
import torchvision
from torchvision import transforms, datasets, models
import torch
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image
import matplotlib.pyplot as plt
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
import matplotlib.patches as patches
import os
import json

import cv2 

import pickle as pkl
import shutil
%matplotlib inline

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
# Run mode switch: True runs the training and checkpoint-saving cells,
# False runs the evaluation cells (metric install, checkpoint load, inference).
is_training = False
# During evaluation, collect up to 8 test images that contain a label-2 box
# (with their annotations and predictions) for plotting.
save_tensors_to_plot = True
# Read training data from "training_dataset_with_aug" instead of "training_dataset".
use_aug_data = False

In [3]:
import random

# Seed every RNG the notebook can touch. Seeding only Python's `random`
# leaves torch (new detection-head initialisation, any sampling) and numpy
# unseeded, so runs would not be reproducible.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
# Kept last: it returns None, so the cell displays no output.
random.seed(SEED)

In [4]:
# Evaluation-only dependencies. The `podm` imports below are presumably
# provided by the `object_detection_metrics` package installed here — confirm.
if not is_training:
    # NOTE(review): presumably removes the PyPI `typing` backport because it
    # conflicts with the install below — confirm the reason.
    !pip uninstall typing -y
    # NOTE(review): unpinned install; pin a version for reproducible runs.
    !pip install object_detection_metrics
    # Only `get_pascal_voc_metrics` and `BoundingBox` are referenced by the
    # cells visible in this notebook; the other names are imported but unused.
    from podm import coco_decoder
    from podm.metrics import get_pascal_voc_metrics, MetricPerClass, get_bounding_boxes, BoundingBox
    from podm.box import Box, intersection_over_union

In [5]:
# Pick the GPU when one is visible to torch, otherwise fall back to the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device('cuda' if cuda_available else 'cpu')
# Last expression: shows the availability flag as the cell output.
cuda_available

# Retrieve all the image and label files

In [6]:
# Test split: annotation and image directories, listed in sorted order so that
# the i-th label file is paired with the i-th image file.
test_label_path = "../input/facemask-yangxu-dataset/testing_dataset/annotations"
test_img_path = "../input/facemask-yangxu-dataset/testing_dataset/images"

test_label_files = [
    os.path.join(test_label_path, entry)
    for entry in sorted(os.listdir(test_label_path))
]
test_img_files = [
    os.path.join(test_img_path, entry)
    for entry in sorted(os.listdir(test_img_path))
]

In [7]:
# Training split: `use_aug_data` selects the augmented or the plain directory.
if use_aug_data:
    train_label_path, train_img_path = (
        "../input/facemask-yangxu-dataset/training_dataset_with_aug/annotations",
        "../input/facemask-yangxu-dataset/training_dataset_with_aug/images",
    )
else:
    train_label_path, train_img_path = (
        "../input/facemask-yangxu-dataset/training_dataset/annotations",
        "../input/facemask-yangxu-dataset/training_dataset/images",
    )

# Sorted listings so labels and images line up by position.
train_label_files = [
    os.path.join(train_label_path, entry)
    for entry in sorted(os.listdir(train_label_path))
]
train_img_files = [
    os.path.join(train_img_path, entry)
    for entry in sorted(os.listdir(train_img_path))
]

In [8]:

def generate_box(obj):
    """Return the integer box ``[xmin, ymin, xmax, ymax]`` read from ``obj``.

    ``obj`` must expose ``find(tag)`` returning a node whose ``.text`` holds
    an integer string for each of the four corner tags.
    """
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    return [int(obj.find(tag).text) for tag in corner_tags]


def generate_label(obj):
    """Map the text of the ``name`` node of ``obj`` to an integer class id.

    ``with_mask`` -> 1, ``mask_weared_incorrect`` -> 2, ``without_mask`` -> 3;
    any other name maps to 0.
    """
    class_ids = {
        "with_mask": 1,
        "mask_weared_incorrect": 2,
        "without_mask": 3,
    }
    return class_ids.get(obj.find('name').text, 0)

def generate_target_xml(image_id, file):
    """Build a detection target dict from an XML annotation file.

    Args:
        image_id: integer identifier stored in the target as a 1-element tensor.
        file: path to an XML file containing zero or more ``<object>`` nodes.

    Returns:
        dict with
          * ``"boxes"``: float32 tensor of shape (N, 4) in
            ``[xmin, ymin, xmax, ymax]`` order (shape (0, 4) when N == 0),
          * ``"labels"``: int64 tensor of shape (N,) using the ids produced by
            ``generate_label``,
          * ``"image_id"``: tensor ``[image_id]``.
    """
    with open(file) as f:
        soup = BeautifulSoup(f.read(), 'xml')
    objects = soup.find_all('object')

    boxes = [generate_box(obj) for obj in objects]
    labels = [generate_label(obj) for obj in objects]

    return {
        # reshape(-1, 4) keeps the (N, 4) layout even when there are no
        # objects; a bare as_tensor([]) would have shape (0,), which
        # torchvision detection models reject as a target.
        "boxes": torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
        "labels": torch.as_tensor(labels, dtype=torch.int64),
        "image_id": torch.tensor([image_id]),
    }
    
def generate_target_json(image_id, file):
    """Build a detection target dict from a JSON annotation file.

    The JSON document must contain a ``"boxes"`` list of
    ``[xmin, ymin, xmax, ymax]`` entries and a parallel ``"labels"`` list.

    Args:
        image_id: integer identifier stored in the target as a 1-element tensor.
        file: path to the JSON annotation file.

    Returns:
        dict with ``"boxes"`` (float32, shape (N, 4); (0, 4) when empty),
        ``"labels"`` (int64, shape (N,)) and ``"image_id"`` (tensor ``[image_id]``).
    """
    with open(file) as f:
        data = json.load(f)
    return {
        # reshape(-1, 4) keeps the (N, 4) layout for an empty box list, which
        # torchvision detection models require of their targets.
        "boxes": torch.as_tensor(data["boxes"], dtype=torch.float32).reshape(-1, 4),
        "labels": torch.as_tensor(data["labels"], dtype=torch.int64),
        "image_id": torch.tensor([image_id]),
    }

    
def generate_target(image_id, file):
    """Dispatch to the XML or JSON target builder based on the file suffix.

    Raises:
        ValueError: if ``file`` ends with neither ``.xml`` nor ``.json``.
    """
    # The two suffixes are mutually exclusive, so the check order is irrelevant.
    if file.endswith(".json"):
        return generate_target_json(image_id, file)
    if file.endswith(".xml"):
        return generate_target_xml(image_id, file)
    raise ValueError(f"Unrecognized file: {file}")


# Build weight sampler

In [9]:
# # def generate_label(obj):
# #     if obj.find('name').text == "with_mask":
# #         return 1
# #     elif obj.find('name').text == "mask_weared_incorrect":
# #         return 2
# #     elif obj.find('name').text == "without_mask":
# #         return 3
# #     return 0
# if is_training:
#     nclasses = 4
#     count = [0] * nclasses                                                      
#     training_labels = []
#     def find_least_count_label(labels):
#         if 2 in labels:
#             return 2  
#         if 3 in labels:
#             return 3              
#         if 1 in labels:
#             return 1  
#         return 0        

#     for training_label_file in train_label_files:    
#         target = generate_target(0, training_label_file)           
#         for label in target["labels"]:
#             count[int(label)] += 1
#         training_labels.append(target["labels"])    
#     weight_per_class = [0.] * 4      
#     print(f"count = {count}")
#     N = float(sum(count))                                                   
#     for i in range(nclasses):  
#         if count[i] == 0:
#             weight_per_class[i] = 1e-10
#         else:
#             weight_per_class[i] = N/float(count[i])                                 
#     weights = [0] * len(training_labels)                                              
#     for idx, labels in enumerate(training_labels):           
#         weights[idx] = weight_per_class[find_least_count_label(labels.detach().tolist())]                                  

#     assert len(weights) == len(training_label_files)
#     weights = torch.DoubleTensor(weights)    
#     sampler = torch.utils.data.sampler.WeightedRandomSampler(weights, len(weights))                     
    

In [10]:
class MaskDataset:
    """Map-style dataset pairing image files with annotation files by index."""

    def __init__(self, transforms, img_files, label_files):
        """Store the transform and the two parallel, equally long file lists."""
        assert len(img_files) == len(label_files)
        self.transforms = transforms
        self.imgs = img_files
        self.labels = label_files

    def __len__(self):
        """Number of (image, annotation) pairs."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for position ``idx``.

        The image is loaded as RGB and passed through ``self.transforms`` when
        one is set; the target comes from ``generate_target`` with ``idx`` as
        the image id.
        """
        img_path, label_path = self.imgs[idx], self.labels[idx]
        assert os.path.exists(img_path) and os.path.exists(label_path)

        image = Image.open(img_path).convert("RGB")
        target = generate_target(idx, label_path)

        if self.transforms is None:
            return image, target
        return self.transforms(image), target

In [11]:
# Image preprocessing pipeline: a single ToTensor step.
data_transform = transforms.Compose([transforms.ToTensor()])
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``,
    leaving each field as a tuple instead of stacking it.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [12]:
# Training split: batches of 4, iterated in file order.
# NOTE(review): no shuffle/sampler is configured for training — confirm intended.
train_dataset = MaskDataset(
    transforms=data_transform,
    img_files=train_img_files,
    label_files=train_label_files,
)
train_data_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=4,
    collate_fn=collate_fn,
)

# Test split: one image per batch.
test_dataset = MaskDataset(
    transforms=data_transform,
    img_files=test_img_files,
    label_files=test_label_files,
)
test_data_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=1,
    collate_fn=collate_fn,
)

In [13]:
def compute_dataset_statics(data_loader):
    """Print how many boxes of each class label the loader yields.

    Args:
        data_loader: iterable of ``(images, annotations)`` batches, where each
            annotation is a dict whose ``"labels"`` entry is an integer tensor.

    Raises:
        ValueError: on the first label outside 0..3.
    """
    # Index = class id: 0 other, 1 with mask, 2 mask worn incorrectly, 3 no mask.
    counts = [0, 0, 0, 0]
    for _, annotations in data_loader:
        for annotation in annotations:
            # Labels are read straight from where they live; moving them to
            # the compute device just to call .tolist() would be a wasted copy.
            for label in annotation["labels"].detach().tolist():
                if label not in (0, 1, 2, 3):
                    raise ValueError(f"Unrecognized label: {label}")
                counts[label] += 1
    num_other, num_wear_mask, num_wear_mask_wrong, num_no_mask = counts
    print(f"DEBUG: num_no_mask_label = {num_no_mask}, num_wear_mask = {num_wear_mask}, num_wear_mask_wrong = {num_wear_mask_wrong}, num_other = {num_other}")

# Print the class distribution of the training split, then of the test split.
for _loader in (train_data_loader, test_data_loader):
    compute_dataset_statics(_loader)

# Model

In [14]:
def get_model_instance_segmentation(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    Despite the function name, this is a box-detection model: the pretrained
    network is loaded and its box predictor is replaced by a new
    ``FastRCNNPredictor`` sized for ``num_classes`` outputs.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads
    # Input width of the existing classification layer, reused by the new head.
    head_in_features = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

In [15]:
# Four outputs: label 0 plus the three mask labels (1-3) produced by generate_label.
num_classes = 4
# Built unconditionally; the training and save cells use `model`, while the
# evaluation cells build and load a separate `model2`.
model = get_model_instance_segmentation(num_classes)

In [16]:
# Evaluation mode: load the trained weights into a fresh model (`model2`).
if not is_training:
    torch.cuda.empty_cache()
    model_path = "../input/models/model.random_set.pt"
    model2 = get_model_instance_segmentation(num_classes)
    # map_location=device handles both the CPU and the GPU case in one call and
    # also remaps checkpoints saved on a device that is not present here.
    # NOTE(security): torch.load unpickles the file — only load trusted checkpoints.
    model2.load_state_dict(torch.load(model_path, map_location=device))
    model2.to(device)
    model2.eval()

In [17]:
def apply_nms(orig_prediction, iou_thresh, score_thresh=0.5):
    """
    Drops low-score boxes, then applies non-max suppression to the rest.

    NOTE: ``orig_prediction`` is filtered IN PLACE and the same dict is
    returned; callers that read the input dict afterwards see the filtered
    tensors.

      Args:
        orig_prediction: the model output. A dictionary containing the
                      elements 'boxes', 'scores' and 'labels'.
        iou_thresh: Intersection over Union threshold. A box overlapping a
                      higher-scoring box by more than this value is removed.
                      Suppression ignores the labels (class-agnostic).
        score_thresh: Minimum score a box needs to be kept (default 0.5).

      Returns:
        final_prediction: the same dictionary object, filtered.
    """
    filtered_keys = ('boxes', 'scores', 'labels')
    final_prediction = orig_prediction  # alias, not a copy (see NOTE above)

    # Boolean mask built on the tensors' own device: no host round-trip.
    confident = final_prediction['scores'] >= score_thresh
    for key in filtered_keys:
        final_prediction[key] = final_prediction[key][confident]

    # torchvision returns the indices of the bboxes to keep
    keep = torchvision.ops.nms(final_prediction['boxes'], final_prediction['scores'], iou_thresh)
    for key in filtered_keys:
        final_prediction[key] = final_prediction[key][keep]

    return final_prediction

In [26]:
# Run the trained model over the test set and collect, per image, the
# predicted and ground-truth boxes for the Pascal VOC metrics. Optionally keep
# up to 8 images containing a label-2 box (plus their annotations and
# predictions) for plotting.
if not is_training:
    IOU_threshold = 0.4
    imgs_to_plot = []
    annotations_to_plot = []
    predictions_to_plot = []

    detection_boxes = []
    ground_truth_boxes = []
    num_imgs_processed = 0
    # Inference only. no_grad must be used as a context manager; calling it as
    # a bare statement has no effect and every forward pass keeps its graph.
    with torch.no_grad():
        for imgs, annotations in test_data_loader:
            num_imgs_processed += len(imgs)
            testing_img_tensors = [img.to(device) for img in imgs]
            preds = model2(testing_img_tensors)

            assert len(preds) == len(annotations)
            for img, annotation, pred in zip(imgs, annotations, preds):
                # Dataset-wide id set by MaskDataset. The position inside the
                # batch is always 0 with batch_size=1, which would make the
                # metric treat every box as belonging to the same image.
                image_id = int(annotation["image_id"].item())

                # Read labels/scores from the filtered result itself instead
                # of relying on apply_nms mutating `pred`.
                final_pred = apply_nms(pred, IOU_threshold)
                pred_boxes = final_pred["boxes"].detach().tolist()
                pred_labels = final_pred["labels"].detach().tolist()
                pred_scores = final_pred["scores"].detach().tolist()
                for (xmin, ymin, xmax, ymax), label, score in zip(pred_boxes, pred_labels, pred_scores):
                    detection_boxes.append(
                        BoundingBox.of_bbox(image_id, label, xmin, ymin, xmax, ymax, score)
                    )

                gt_boxes = annotation["boxes"].detach().tolist()
                gt_labels = annotation["labels"].detach().tolist()
                for (xmin, ymin, xmax, ymax), label in zip(gt_boxes, gt_labels):
                    ground_truth_boxes.append(
                        BoundingBox.of_bbox(image_id, label, xmin, ymin, xmax, ymax, 1.0)
                    )

                # Save each qualifying image once, even if it has several
                # label-2 boxes.
                if save_tensors_to_plot and 2 in gt_labels and len(imgs_to_plot) < 8:
                    print(f"Found label = 2, image_id = {image_id}")
                    imgs_to_plot.append(img)
                    annotations_to_plot.append(annotation)
                    print(f"DEBUG: scores = {pred_scores}")
                    predictions_to_plot.append(final_pred)
#         print(f"Num of imgs processed {num_imgs_processed}")
    

In [19]:
# Per-class Pascal VOC metrics; the third argument (0.5) is the IoU threshold
# passed to get_pascal_voc_metrics.
if not is_training:
    testing_results = get_pascal_voc_metrics(ground_truth_boxes, detection_boxes, 0.5)

In [20]:
# Print a short report for every class present in the metric results.
if not is_training:
    reported_fields = ('ap', 'tp', 'fp', 'num_groundtruth', 'num_detection')
    for metric in testing_results.values():
        print(f"**********{metric.label}***********")
        for field in reported_fields:
            print(field, getattr(metric, field))

# Train Model

In [21]:
# Fine-tune the detector with SGD and print the summed loss of every epoch.
if is_training:
    num_epochs = 5
    model.to(device)

    # Optimise only the parameters that require gradients.
    torch.cuda.empty_cache()
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        for i, (imgs, annotations) in enumerate(train_data_loader, start=1):
            imgs = [img.to(device) for img in imgs]
            annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
            # In train mode the model returns a dict of losses; the training
            # objective is the sum of all of them.
            loss_dict = model(imgs, annotations)
            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()
            # .item() accumulates a plain float; adding the tensor itself would
            # keep autograd history alive across the whole epoch.
            epoch_loss += losses.item()
            if i % 50 == 0:
                print('epoch = {}, i = {}'.format(epoch, i))
        print(epoch_loss)


# Save Model

In [22]:
# After training, persist the model weights (state_dict) to the working directory.
# NOTE(review): the evaluation cell loads "../input/models/model.random_set.pt";
# presumably this file has to be re-uploaded there as an input — confirm.
if is_training:
    torch.save(model.state_dict(),'/kaggle/working/model.random_set.pt')

# Function to plot image

In [23]:
def plot_image(img_tensor, annotation):
    """Show an image with its boxes drawn, coloured by class label.

    Args:
        img_tensor: image tensor; it is permuted with ``(1, 2, 0)`` before
            being handed to ``imshow``.
        annotation: dict with ``"boxes"`` (rows of
            ``[xmin, ymin, xmax, ymax]``) and ``"labels"`` (one id per box).
            Ground-truth targets and model predictions both fit.

    Raises:
        ValueError: if a label is not 1 (green), 2 (yellow) or 3 (red).
    """
    edge_colors = {1: "g", 2: "y", 3: "r"}

    fig, ax = plt.subplots(1)

    # Display the image
    img = img_tensor.detach().cpu()
    ax.imshow(img.permute(1, 2, 0))

    # Convert to plain Python numbers first: matplotlib cannot consume tensors
    # that live on the GPU or that still track gradients.
    boxes = torch.as_tensor(annotation["boxes"]).detach().cpu().tolist()
    labels = torch.as_tensor(annotation["labels"]).detach().cpu().tolist()

    for (xmin, ymin, xmax, ymax), label in zip(boxes, labels):
        if label not in edge_colors:
            raise ValueError(f"Annotation {label} incorrect!")
        # Create a Rectangle patch and add it to the Axes
        rect = patches.Rectangle(
            (xmin, ymin),
            xmax - xmin,
            ymax - ymin,
            linewidth=1,
            edgecolor=edge_colors[label],
            facecolor='none',
        )
        ax.add_patch(rect)

    plt.show()

In [27]:
# Plot the collected test images with their ground-truth boxes and save the
# collected tensors. The lists only exist after the evaluation cell has run,
# so skip this cell in training mode instead of failing with a NameError.
if not is_training:
    print(len(imgs_to_plot))
    for img, annotation in zip(imgs_to_plot, annotations_to_plot):
        plot_image(img, annotation)
    torch.save(imgs_to_plot, "/kaggle/working/imgs_to_plot.pt")
    torch.save(annotations_to_plot, "/kaggle/working/annotations_to_plot.pt")
    torch.save(predictions_to_plot, "/kaggle/working/predictions_to_plot.pt")

In [28]:
# Plot the same images with the model's filtered predictions. Guarded like the
# other evaluation cells: the lists are only defined when not training.
if not is_training:
    print("Prediction")
    for img, prediction in zip(imgs_to_plot, predictions_to_plot):
        plot_image(img, prediction)