
<h1><div style="text-align: center;"> COVID-19 Detection and Instance semantic
segmentation</div></h1>

<h3><div style="text-align: center;"> A Ascencio-Cabral
</div></h3>


## 1. Introduction



### 1.1 Dataset - Merged Dataset


The 3D data was ingested from four public sources:

- COVID-19 Lung CT Lesion Segmentation - Grand Challenge 2020, 199 patients
https://covid-segmentation.grand-challenge.org/COVID-19-20/

- COVID-19 CT Lung and Infection Segmentation Dataset Zenodo - 20 patients (only infection mask
used)
https://zenodo.org/record/3757476#.YTdEx55Kg1h

- Italian Society of Medical and Interventional Radiology(ISIRM), a volume with images from 48 patients -  Dataset available on SegMed website https://medicalsegmentation.com/covid19/

-  COVID-19 CT scan lesion segmentation dataset – Only the sliced CT and Masks from the MosMedData Dataset COVID19_1110 sliced were used.
https://www.kaggle.com/datasets/maedemaftouni/covid19-ct-scan-lesion-segmentation-dataset

Except for the images from the Morozov dataset, all the other datasets were compressed NIfTi.
Volumes (nii.gz) were sliced on plane z and converted to 2D png images. The images were
process using

The images were merged into one dataset.


### 1.2 Models

- **Mask-RCNN-50-FPN and customised versions**
- **Mask-RCNN-MobileNet-v3-large-FPN**

The model used to segment COVID-19 lesions is Mask-RCNN. This model is the base to build our classification models. Our model consists of Mask-RCNN with two feature extractors, ResNet-50-FPN and MobileNet-v3-large-FPN. Mask-RCNN-ResNet-50-FPN is an off-the-box model from the PyTorch Framework. The anchor generator of the network will be modified, and the kernel size of the first convolutional layer of the models' backbone will also be hacked. Mask-RCNN-MobileNet-v3-large-FPN is not an off-the-box network. We will also create this model with custom anchor generators in this work.

<center>

| Exp |     Model Backbone     | Resized<br/> kernel | Custom <br/>Anchor Generator |
|-----|:----------------------:|:-------------------:|:----------------------------:|
| 1   |    ResNet-50-FPN-v2    |          -          |              -               |
| 2   |    ResNet-50-FPN-v2    |          -          |             Yes              |
| 3   |    ResNet-50-FPN-v2    |        3 x 3        |              -               |
| 4   |    ResNet-50-FPN-v2    |        3 x 3        |             Yes              |
| 5   |    ResNet-50-FPN-v2    |        5 x 5        |              -               |
| 6   |    ResNet-50-FPN-v2    |        5 x 5        |             Yes              |
| 7   | MobileNet-v3-large-FPN |         N/A         |              -               |

</center>

### **1.3 Approach to training - Transfer Learning**

The approach to training the models is transfer learning by fine-tuning all layers of the deep neural networks.


###  **1.4 Evaluation Metrics - Coco style**
Evaluation Metrics - COCO style mean average precision mAP at different intersections over union (IoU) threshold

### 1.5 Code Guidance

The code implementation of this is adapted from Torch Vision Object Detection
Tutorials:
- [torchvision_tutorial-object-detection-finetuning-tutorial](https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html)
- [oneoffcoder.com/object-detection](https://learn-pytorch.oneoffcoder.com/object-detection.html)
- [pytorch/vision/tree/master/references/detection](https://github.com/pytorch/vision/tree/main/references/detection)


In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Comment out if pycocotools is installed
# %%shell
!pip install cython
!pip install -U 'git+https://github.com/cocodataset/cocoapi.git#subdirectory=PythonAPI'

In [None]:
%cd '/content/drive/MyDrive/covid_instance_seg'

## __Libraries__

In [None]:
import os
import time
import datetime
import numpy as np
import pandas as pd
import cv2
from collections import defaultdict

import torch, gc
import torch.nn as nn
import torchvision
from detection import utils
from detection import transforms as T
from detection.engine import train_one_epoch, evaluate
from utility.plotting import training_stats

from torch.utils.data import DataLoader
from torchvision.models.detection.mask_rcnn import MaskRCNN, MaskRCNNPredictor, MaskRCNNHeads
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.mobilenetv3 import mobilenet_v3_large, MobileNet_V3_Large_Weights
from torchvision.models.detection.backbone_utils import _validate_trainable_layers, \
    _mobilenet_extractor
from torchvision.ops import misc as misc_nn_ops
from torchvision.models.detection.rpn import AnchorGenerator, RPNHead
from torchvision.models.detection.faster_rcnn import FastRCNNConvFCHead
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR
from covid_dataset import CovidMerged

import matplotlib.pyplot as plt
%matplotlib inline
np.random.seed(180)
torch.manual_seed(180)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
print(torch.__version__)

In [None]:
!nvidia-smi -L

In [None]:
!nvidia-smi

## 1. Accessing the dataset merged dataset

Here we will unzip the merged dataset. Comment out to decompress the images file.

In [None]:
# Uncomment to unzip
# !unzip './merged_dataset' -d './'

In [None]:
print('Images and masks in dataset')
print(len(os.listdir('merged_dataset/CT')),'&',
      len(os.listdir('merged_dataset/GT')))

In [None]:
imgs = sorted(os.listdir('./merged_dataset/CT'))
msks = sorted(os.listdir('./merged_dataset/GT'))

## 2. Dataset Visualisation

In [None]:
covid_img = cv2.imread('merged_dataset/CT/ct_gz15z093.png')
covid_img = cv2.cvtColor(covid_img, cv2.COLOR_RGBA2RGB)
covid_mask =cv2.imread('merged_dataset/GT/gt_gz15z093.png')
covid_mask = cv2.cvtColor(covid_mask, cv2.COLOR_RGBA2RGB)
plt.subplot(1, 2, 1)
plt.imshow(covid_img)
plt.subplot(1, 2, 2)
plt.imshow(covid_mask)

In [None]:
image_gt= cv2.addWeighted(covid_img, 0.5, covid_mask, 0.5, 0)
plt.imshow(image_gt)

## 3. Building Mask-RCNN-50FPN  and Mask-RCNN-MobileNet- v3-fpn fuctions

Here we will build the function to create the instance segmentation model with ResNet-50-FPN and MobileNet-v3-large-FPN backbones.
PyTorch does not have an off-the-shelf method for Mask-RCNN with MobileNet-v3-FPN; we will use it as a base maskrcnn_resnet50_fpn function to buildmaskrcnn_mobilenet_v3_large_fpn.
Additionally, we will build a function that utilises the Mask-RCNN-ResNet-50-fpn-v2 as the based
model with the capability to customise the anchors and the kernel size of the first convolutional
layer of the backbone.


### 3.1 __Build a customised anchor generator__

In [None]:
def anchorgen_rcnn():
    """
    :return: AnchorGenerator(anchor_sizes, aspect_ratios)
    """
    anchor_sizes = (8, 16, 32, 64, 128, 256)
    aspect_ratios = (0.25, 0.5, 1.0, 1.5, 2.0)
    return AnchorGenerator(sizes=tuple([anchor_sizes for _ in range(5)]),
                           aspect_ratios=tuple([aspect_ratios for _ in range(5)]))

### 3.2 Building Mask-RCNN-50-FPN-V2 with customised anchors  and resized filters

In [None]:
def customise_maskrcnn_50_fpn(num_classes=2, resized_conv=True, size=(5,5), custom_anchors=True):
    """
    :param num_classes: number of classes to classify covid + 1 for the background
    :param resized_conv: a boolean
    :param size: a tuple with the dimesion to resize the kernel, the default is 5 x 5
    :param custom_anchors: if customised anchors will be built
    :return: maskrcnn-resnet-50-fpn with or without customed features
    """

    assert size in [(3,3), (5,5), None], 'resize kernel to (3 x 3) or (5 x 5)'

    model = torchvision.models.detection.maskrcnn_resnet50_fpn_v2(weights='DEFAULT')

    if resized_conv:
        # The mask-rcnn standard kernel size is 7 x 7
        model.backbone.body.conv1 = nn.Conv2d(3, 64, size, (2, 2), (3, 3), bias=False)

    if custom_anchors:
        new_anchors = anchorgen_rcnn()
        model.rpn.anchor_generator = new_anchors

        # Features that FPN returns
        model.rpn.head = RPNHead(256, new_anchors.num_anchors_per_location()[0], conv_depth=2)

    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    
    # Input features for the mask classifier
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256

    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                        hidden_layer,
                                                        num_classes)
    return model

### 3.3 Building Mask-RCNN-MobileNet-v3-FPN

In [None]:
def maskrcnn_mobilenet_v3_large_fpn(*,
    num_classes=None, progress=True, weights_backbone=MobileNet_V3_Large_Weights.IMAGENET1K_V2,
    trainable_backbone_layers=6):
    """ 
    Class adapted from pytorch torchvision.models.detection.maskrcnn_resnet50_fpn` for more details.
    https://github.com/pytorch/vision/blob/main/torchvision/models/detection/mask_rcnn.py
    https://github.com/pytorch/vision/blob/70faba91d1c79f689ac6ebd30ad0c4be62690196
    /torchvision/models/detection/faster_rcnn.py#L653

    Args:
        num_classes (int, optional): number of output classes of the model (including the background)
        progress: boolean to display the weights downloading progress
        weights_backbone (:class:`~torchvision.models.MobileNet_V3_Large_Weights.IMAGENET1K_V1`,): The
            pretrained weights for the backbone.
        trainable_backbone_layers (int, optional): number of trainable (not frozen) layers starting from
            final block. Valid values are between 0 and 6, with 6 meaning all backbone layers are
            trainable. If ``None`` is passed (the default) this value is set to 3.
    :returns Mask-RCNN model with MobileNet-v3-large-FPN backbone
    """

    weights_backbone =  MobileNet_V3_Large_Weights.verify(weights_backbone)

    if num_classes is None:
        num_classes = 91

    is_trained = weights_backbone is not None
    trainable_backbone_layers = _validate_trainable_layers(is_trained, trainable_backbone_layers, 6, 3)
    norm_layer = misc_nn_ops.FrozenBatchNorm2d if is_trained else nn.BatchNorm2d

    backbone = mobilenet_v3_large(weights=weights_backbone, progress=progress, norm_layer=norm_layer)
    backbone = _mobilenet_extractor(backbone, True, trainable_backbone_layers)
    anchor_sizes = ((8, 16, 32, 64, 128, 256,),) * 3
    aspect_ratios = ((0.25, 0.5, 1.0, 2.0),) * len(anchor_sizes)
    rpn_anchor_generator = AnchorGenerator(anchor_sizes, aspect_ratios)   
    rpn_head = RPNHead(backbone.out_channels, rpn_anchor_generator.num_anchors_per_location()[0], conv_depth=2)
    box_head = FastRCNNConvFCHead(
        (backbone.out_channels, 7, 7), [256, 256, 256, 256], [1024], norm_layer=nn.BatchNorm2d)
    # Normalisation can be 'None'
    mask_head = MaskRCNNHeads(backbone.out_channels, [256, 256, 256, 256], 1, norm_layer=nn.BatchNorm2d)
    model = MaskRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=rpn_anchor_generator,
        rpn_score_thresh=0.05, # threshold from fasterrcnn_mobilenet_v3_large_fpn
        rpn_head=rpn_head,
        box_head=box_head,
        mask_head=mask_head)

    return model

### 3.4 __Creating models__



In [None]:
def get_model(net_name, num_classes=2, resized_kernel=True, size=(5, 5), custom_anchors=True):
    """
    :param net_name: a string with the name of the model to be built
    :param num_classes: an integer with the number of classes + background
    :param resized_kernel: a boolean indicating if the kermnel is resized or not, only 
    for models with resnet backbone
    :param size: a tuple with the new kernel dimensions
    :param custom_anchors: a boolean indicating whether to use the standard anchors or custom
    anchors
    :return: model
    """
    assert net_name in ['maskrcnn-resnet50-fpn', 'maskrcnn-mobilenet-v3'], 'Input maskrcnn-resnet50-fpn or maskrcnn-mobilenet-v3'

    if net_name == 'maskrcnn-resnet50-fpn':
        model = customise_maskrcnn_50_fpn(num_classes=num_classes,
                                          resized_conv=resized_kernel, size=size,
                                          custom_anchors=custom_anchors)
        return model

    elif net_name == 'maskrcnn-mobilenet-v3':
        model = maskrcnn_mobilenet_v3_large_fpn(num_classes=num_classes)
        # print('......Maskrcnn-mobilenet-v3-large-fpn pretrained on ImageNet.....')
        return model

### __4.1 Transformations__

Mask-RCNN class resizes the images. We will apply random horizontal flips and scale jitter
augmentation during the training of the models.

In [None]:
def get_transform(train):
    trans = [T.PILToTensor(), T.ConvertImageDtype(dtype=torch.float32)]
    # converts the image, a PIL image, into a PyTorch Tensor
    if train:
       # data augmentation during training
        trans.extend([T.RandomHorizontalFlip(0.5), T.ScaleJitter((800, 800))])
        # trans.append(T.FixedSizeCrop())
    return T.Compose(trans)


### __4.2 Tracking time taken for training__

In [None]:
def time_minutes(s):
    """
    :param s:  time in seconds
    :return: time in hh:mm:ss format
    """
    return time.strftime('%H:%M:%S', time.gmtime(s))

### __4.3  Building the training function__

The dataset will be split  randomly in training, validation and test subsets. First a list of
random indices will be generated by using a random permutation. Then split of rations are
80/10/10 for training, validation and test respectively.

To build the main training function we will use train_one_epoch
and evaluate functions from ```engine.py```.

In [None]:
def main(**train_params):
    """
    :param train_params:  aregument containing the training hyperparameters
    :return: the training  metrics, indices to  prepare the testdataset, the name of the model
    trained, and the time taken for training.
    """

    assert train_params['scheduler'] in ['plateau', 'steps'], 'Only ReduceLROnPlateau ' \
                                                              'and StepLR schedulers available'

    train_path = os.path.join(train_params['data_dir'])

    dataset = CovidMerged(train_path, min_area=train_params['min_area'],
                          trans=get_transform(train=True))
    val_dataset = CovidMerged(train_path, min_area=train_params['min_area'],
                              trans=get_transform(train=False))

    indices = torch.randperm(len(dataset)).tolist()
    indx = round(len(dataset) * 0.80)
    indx_ = round(len(dataset) * 0.90)
    train_dataset = torch.utils.data.Subset(dataset, indices[0:indx])
    val_dataset = torch.utils.data.Subset(val_dataset, indices[indx:indx_])
    test_idx = indices[indx_:]

    # Training and validation data loaders
    train_loader = DataLoader(train_dataset, batch_size=train_params['batch_size'], shuffle=True,
                              num_workers=train_params['workers'], collate_fn=utils.collate_fn)

    val_loader = DataLoader(val_dataset, batch_size=train_params['batch_size'], shuffle=False,
                            num_workers=train_params['workers'], collate_fn=utils.collate_fn)

    # Make the model using the helper function
    model = get_model(train_params['model'], train_params['num_classes'],
                      resized_kernel=train_params['resize'], size=train_params['size'],
                      custom_anchors=train_params['anchors'])

    # send model to the available device GPU or CPU
    model = model.to(device)

    # construct the optimizer
    param = [p for p in model.parameters() if p.requires_grad]

    if train_params['optimizer'] == 'SGD':
        optimizer = torch.optim.SGD(param, lr=train_params['lr'], momentum=train_params['momentum'],
                                    weight_decay=train_params['w_decay'],
                                    nesterov=train_params['nesterov'])

    elif train_params['optimizer'] == 'Adam':
        optimizer = torch.optim.Adam(param, lr=train_params['lr'],
                                     weight_decay=train_params['w_decay'])

    else:
        optimizer = torch.optim.AdamW(param, lr=train_params['lr'])

    # Learning rate scheduler
    if train_params['scheduler'] == 'steps':
        assert train_params['steps'] is not None,  'Input the number of steps'
        print('steps scheduler')
        lr_scheduler = StepLR(optimizer, step_size=train_params['steps'],
                              gamma=train_params['gamma'])

    else:
        print('plateau scheduler')
        lr_scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=train_params['gamma'],
                                         patience=1)

    net_name = train_params['model']
    ex = train_params['exp']

    if train_params['resize'] and train_params['anchors']:
        net_name = f'exp_{ex}_{net_name}_ra'
    elif train_params['resize'] and not train_params['anchors']:
        net_name = f'exp_{ex}_{net_name}_r'
    elif not train_params['resize'] and train_params['anchors']:
        net_name = f'exp_{ex}_{net_name}_a'
    else:
        net_name = f'exp_{ex}_{net_name}'

    if train_params['resume_training']:
        weights_dir = f"{train_params['output_dir']}/{net_name}_resume.pth"
        check_point = torch.load(weights_dir, map_location='cpu')
        model.load_state_dict(check_point['model'])
        optimizer.load_state_dict(check_point['optimizer_dict'])
        s_epoch = check_point['epoch']
        lr_scheduler.load_state_dict(check_point['scheduler'])
        stats_log = check_point['stats']
        best_mAPs = np.max(stats_log['mAPseg'])
        print('resume')

    else:
        s_epoch = 0
        best_mAPs = 0.
        stats_log = defaultdict(list)

    print(f'Training {net_name}')
    epoch_start_time = time.time()

    for epoch in range(s_epoch, train_params['num_epochs']):

        # train one epoch
        loss, _ = train_one_epoch(model, optimizer, train_loader, device, epoch,
                                  print_freq=200)

        # evaluate on the validation dataset
        _, stats = evaluate(model, val_loader, device)

        # AP @ IoU=0.5 and IoU= 0:50:0:95 for bboxes
        mAPb, mAPb50 = round(stats['bbox'][0], 4), round(stats['bbox'][1], 4)

        # AP @ IoU=0.5 and IoU= 0:50:0:95 for segmentation
        mAPseg, mAP50seg = round(stats['segm'][0], 4), round(stats['segm'][1], 4)

        # AP @ IoU=0.75 for detections and segmentations
        mAPb75, mAP75seg = round(stats['bbox'][2], 4), round(stats['segm'][2], 4)

        # update the learning rate
        if train_params['scheduler'] == 'plateau':
            lr_scheduler.step(mAPseg)

        else:
            lr_scheduler.step()

        stats_log['mAPb'].append(mAPb)
        stats_log['mAPseg'].append(mAPseg)
        stats_log['mAPb50'].append(mAPb50)
        stats_log['mAP50seg'].append(mAP50seg)
        stats_log['mAPb75'].append(mAPb75)
        stats_log['mAP75seg'].append(mAP75seg)
        stats_log['train_loss'].append(loss)

        if mAPseg > best_mAPs:
            best_mAPs = mAPseg
            # Save the best weights for test
            check_point = {'exp': ex, 'model': model.state_dict(),
                           'epoch': epoch + 1,
                           'optimizer_dict': optimizer.state_dict(),
                           'lr': train_params['lr'], 'scheduler': lr_scheduler.state_dict(),
                           'test_idx': test_idx}

            torch.save(check_point, os.path.join(train_params['output_dir'],
                                                 f'{net_name}.pth'))

        # Save weights to continue training if interrupted
        torch.save({'exp': ex, 'model': model.state_dict(), 'epoch': epoch + 1,
                    'optimizer_dict': optimizer.state_dict(),
                    'lr': train_params['lr'], 'scheduler': lr_scheduler.state_dict(),
                    'stats': stats_log, 'test_idx': test_idx},
                   os.path.join(train_params['output_dir'], f'{net_name}_resume.pth'))

    total_time = time.time() - epoch_start_time
    print('Training time {}'.format(time_minutes(total_time)))
    print('Best val_mAPseg: {0:.4f}'.format(best_mAPs))

    return stats_log, net_name, total_time


<h2>5. Training</h2>

In [None]:
exps =[i for i in range(1, 7)]
exp_models = sorted(['maskrcnn-resnet50-fpn'] * 6)
resized = [False, False] + [True] * 4
kernel_sizes = [None, None, (3, 3), (3,  3), (5, 5), (5,  5)]
custom_anchor = [False, True] * 3

In [None]:
setup = pd.DataFrame({'Exp':exps, 'Net': exp_models,  'resized': resized, 'sizes': kernel_sizes,
                      'custom_anchors': custom_anchor })

In [None]:
setup.loc[len(setup.index)] = [7, 'maskrcnn-mobilenet-v3', False , False, False]
setup.set_index('Exp', inplace=True)


<h3> <center> Experimental Design </center></h3>

<div align="center">

| Exp |               Net                | Resized </br>Kernel | Sizes | Customed </br>Anchors |
|-----|:--------------------------------:|:-------------------:|:-----:|:---------------------:|
| 1   |     Mask-RCNN-ResNet-50-FPN      |          -          |   -   |           -           |
| 2   |     Mask-RCNN-ResNet-50-FPN      |          -          |   -   |         True          |
| 3   |     Mask-RCNN-ResNet-50-FPN      |        True         | 3 x 3 |           -           |
| 4   |     Mask-RCNN-ResNet-50-FPN      |        True         | 3 x 3 |         True          |
| 5   |     Mask-RCNN-ResNet-50-FPN      |        True         | 5 x 5 |           -           |
| 6   |     Mask-RCNN-ResNet-50-FPN      |        True         | 5 x 5 |         True          |
| 7   | Mask-RCNN-MobileNet-v3-large-FPN |          -          |   -   |           -           |
</div>



### 5.2 Hyper-parameters selection and training



In [None]:
data_dir = os.path.join(os.getcwd(), 'merged_dataset')
out_dir = os.path.join(os.getcwd(), 'weights')

try:
    os.makedirs(out_dir, exist_ok=False)
    print('Directory successfully created')
except OSError as error:
    print('Directory already exist')

# Extract the model configuration from the experimental setup dataframe
exp = 7
set_model = setup.loc[exp]['Net']
setup_resize = setup.loc[exp]['resized']
setup_sizes = setup.loc[exp]['sizes']
setup_anchors = setup.loc[exp]['custom_anchors']

schedulers = ['steps', 'plateau']

resume = False

params= {'exp': exp, 'model': set_model, 'num_classes': 2, 'resize': setup_resize,
         'size': setup_sizes, 'anchors': setup_anchors, 'data_dir' : data_dir,
         'output_dir' : out_dir, 'min_area': 10, 'batch_size': 4, 'lr' : 0.000015,
         'optimizer': 'AdamW', 'momentum':None, 'w_decay' : None, 'nesterov': False,
         'scheduler': schedulers[1], 'steps': None, 'gamma': 0.8,
         'num_epochs': 50, 'resume_training': resume, 'workers': 2}

if __name__ == '__main__':
    training_metrics, model_name, train_time = main(**params)

In [None]:
# Uncomment the two lines below only if run out of CUDA memory and re-run the cell above,
# alternatyve re-start the whole runtime and run the notebook
# gc.collect()
# torch.cuda.empty_cache()

<h2> 6. Results Visualisation </h2>

In [None]:
# Create the directory for the plots and images
figs_dir = os.path.join(os.getcwd(), 'figures')
try:
    os.makedirs(figs_dir, exist_ok=False)
    print('Directory successfully created')
except OSError as error:
    print('Directory already exist')

In [None]:
# Get the best weights for testing
checkpoint_dir = os.path.join(out_dir, f'{model_name}.pth')
checkpoint = torch.load(checkpoint_dir)

In [None]:

training_stats(training_metrics, model_name, figs_dir=figs_dir)

<h2> 7. Evaluating the model </h2>

<h3> 7.1 Create and process the test dataset and create the dataloaders </h3>

In [None]:
dataset_path = params['data_dir']
test_indices = checkpoint['test_idx']
covid = CovidMerged(dataset_path, min_area=params['min_area'], trans=get_transform(train=False))
test_dataset = torch.utils.data.Subset(covid, test_indices)
test_loader = DataLoader(test_dataset, batch_size=4,
                    shuffle=False, num_workers=2, collate_fn=utils.collate_fn)

<h3> 7.2 Load the best training weights onto the model </h3>

In [None]:
model_ = get_model(params['model'], params['num_classes'], params['resize'], params['size'],
                   params['anchors'])

model_.load_state_dict(checkpoint['model'], strict=False)

In [None]:
_ , test_stats = evaluate(model_.to(device), test_loader, device=device)

<h3>7.3 Save metrics and  training setup </h3>

In [None]:
# Rounding the output of the metrics to 4 figures
for k,v in test_stats.items():
    test_stats[k] = np.round(v, 4)

In [None]:
pred_dir = os.path.join(os.getcwd(), 'predictions')
try:
    os.makedirs(pred_dir, exist_ok=False)
    print('Directory successfully created')
except OSError as error:
    print('Directory already exist')

In [None]:
# Extract metrics from coco evaluator 0 is the overall range, 1 indicates IoU @0.50 and 2 IoU @0.75
metrics = {'AP bbox': test_stats['bbox'][0], 'AP bbox @0.50': test_stats['bbox'][1],
           'AP bbox @0.75': test_stats['bbox'][2], 'AP seg': test_stats['segm'][0],
           'AP seg @ 0.50' : test_stats['segm'][1], 'AP seg @0.75': test_stats['segm'][2]}

In [None]:
df_metrics = pd.DataFrame(metrics, index =[0])
df_metrics.insert(0, 'Exp', params['exp'])
df_metrics.insert(1, 'Date', datetime.date.today())
df_metrics.insert(2, 'Model', model_name)
df_metrics.insert(3, 'Epochs', params['num_epochs'])
df_metrics.insert(4, 'Optimizer', params['optimizer'])
df_metrics.insert(5, 'Learning rate', params['lr'])
df_metrics.insert(6, 'Scheduler', params['scheduler'])
df_metrics.insert(7, 'Steps', params['steps'])
df_metrics.insert(8, 'Gamma', params['gamma'])
df_metrics.insert(9, 'Best epoch', checkpoint['epoch'])

In [None]:
epochs = params['num_epochs']
df_metrics.to_csv(os.path.join(pred_dir,  f'{model_name}_{epochs}.csv'))
df_metrics.set_index('Exp', inplace=True)
df_metrics

<h2>8. Inference </h2>

<h3>8.1 Inference functions </h3>

In [None]:
def get_coloured_mask(mask, colour='blue'):
    """
    :param colour: str one of the colors blue, cyan and pink
    :param mask: a numpy array of the masks
    :return: the colour mask and the colour selected
    """
    # One color for all COVID instances
    if colour == 'blue':
        colour = [0, 0, 255]
    elif colour == 'cyan':
        colour = [0, 255, 255]
    elif colour == 'green':
        colour = [0, 204, 0]
    else:
        colour = [255, 0, 255]

    r = np.zeros_like(mask).astype(np.uint8)
    g = np.zeros_like(mask).astype(np.uint8)
    b = np.zeros_like(mask).astype(np.uint8)
    r[mask == 1], g[mask == 1], b[mask == 1] = colour
    coloured_mask = np.stack([r, g, b], axis=2)
    return coloured_mask, colour


In [None]:
def get_prediction(pred, threshold, msk_threshold):
    """
    :param pred: a dictionary of tensors with the predictions scores, masks and
    detection boxes for one image
    :param threshold: a float to threshold the probability score
    :param msk_threshold: a float to threshold the segmentation masks
    :return: numpy arrays of the thresholded masks, boxes and scores
    """

    pred_score = list(pred['scores'].detach().cpu().numpy())
    
    # Get the score indices
    pred_th = [pred_score.index(s) for s in pred_score if s > threshold][-1]
    score = [s for s in pred_score if s > threshold]
    # Threshold the masks and squeeze the batch dimmension
    masks = (pred['masks']>msk_threshold).squeeze(1).detach().cpu().numpy()
    pred_boxes = [[(int(i[0]), int(i[1])), (int(i[2]), int(i[3]))] 
                  for i in list(pred['boxes'].detach().cpu().numpy())]
    masks = masks[:pred_th+1]
   pred_boxes = pred_boxes[:pred_th+1]

    return masks, pred_boxes, score

def inst_segment(img, pred, threshold=0.6, mask_thr=0.6, rect_th=2, text_size=2, text_th=2,
                 mask_out=True, colour='blue'):
    """
    :param img: a numpy array with image
    :param pred: a numpy array of the prediction
    :param threshold: a float to threshold the detection boxes
    :param mask_thr:  a float to threshold the mask predictions
    :param rect_th:  a float of the boxes boundaries
    :param text_size: a float or integer with the size of the font
    :param text_th:  a float indicating the ticknes of the font
    :param mask_out: a boolean  indicating wheteher only mask will be shown on the inference
    :param colour: a string of the mask and boxes colour
    :return: images with mask predictions
    """

    masks, boxes, scores = get_prediction(pred, threshold, msk_threshold=mask_thr)

    for i in range(len(masks)):
        rgb_mask, rgb_colour = get_coloured_mask(masks[i], colour=colour)
        # image, alpha, segmentation_map, beta, gamma)
        img = cv2.addWeighted(np.array(img), 1, rgb_mask, 1, 0.2)
        if not mask_out:
            cv2.rectangle(img, boxes[i][0], boxes[i][1], color=rgb_colour, thickness=rect_th)
            cv2.putText(img, str(np.round(scores[i],2)), boxes[i][0], cv2.FONT_HERSHEY_PLAIN,
                      text_size, rgb_colour, thickness=text_th , lineType=cv2.LINE_AA)
    return img

In [None]:
def show_predictions(model, loader, threshold=0.7, msk_thres=0.6, mask_out=True,
                     outdir=None, fname=None, colour='blue'):
    """
    :param model: model to evaluate
    :param loader:  constructor with tensors of the test images and targets
    :param threshold: float to select the boxes
    :param msk_thres: float to threshold the masks
    :param mask_out: boolean to show only mask or boxes and masks
    :param outdir: str directory to save the inference
    :param fname: str name to save the inference
    :param colour: a string of the mask and boxes colour
    :return: save and show predictions and probability scores on the test images
    """

    fig = plt.figure(figsize=(12, 6))
    model = model
    model.eval()

    images, targets = next(iter(loader))
    with torch.no_grad():
        predictions = model(images)
    for i, image in enumerate(images):
        fig.add_subplot(len(images) // 4, 4, i + 1, xticks=[], yticks=[])
        img = torchvision.transforms.ToPILImage()(image)
        img = inst_segment(np.array(img), predictions[i], threshold, mask_thr=msk_thres,
                           mask_out=mask_out,
                           colour=colour)
        img_id = targets[i]['image_id'].item()
        name = covid.get_name(img_id)
        plt.imshow(img)
        plt.xlabel(name, color='blue', fontsize=12)
    plt.tight_layout()

    if outdir is not None:
        fname = f'predictions_{fname}.png'
        return plt.savefig(os.path.join(outdir, fname), bbox_inches='tight',
                            format='png', dpi=300)

In [None]:
show_predictions(model_.to('cpu'), test_loader, threshold=0.70, msk_thres=0.6, mask_out=True,
                 outdir=figs_dir, fname=model_name, colour='blue')