# Refinement step

This notebook applies the refinement step to an already trained model in order to obtain better bounding boxes. It also benchmarks the best models with and without the refinement step, and with and without deblurGAN.

Download the models from: https://zenodo.org/records/10231845/files/models.zip?download=1.

## Imports

In [1]:
from torch.utils.data import Dataset
import os, random, time
import cv2 as cv2
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import numpy as np

import torch
import torchvision
from torchvision import transforms
from torchvision import io
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN, fasterrcnn_resnet50_fpn_v2, fasterrcnn_resnet50_fpn, fasterrcnn_mobilenet_v3_large_320_fpn
from torchvision.models.detection.rpn import AnchorGenerator
import torchvision.transforms.functional as F_vision
from torchvision.utils import make_grid
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SequentialSampler

from tqdm.notebook import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
import albumentations.pytorch
import pandas as pd
from PIL import Image
import rawpy
import imageio
import math
from pathlib import Path
from os.path import isfile, join
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

## Parameters

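Select here the model to evaluate, together with the parameters that must match it (crop size, confidence threshold and whether the deblurGAN images are used).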

In [2]:
# Bounding-box settings handed to every albumentations crop transform.
bbox_params = A.BboxParams(format = 'pascal_voc',
         min_visibility = 0.6,
         label_fields = ['labels'])

# Folder holding one XML annotation file per image.
BOXES_FOLDER = "dataset/labels/"

# One should adjust the following parameters according to the chosen model.
# Ideally, they should have been obtained automatically, but it is not the case.

CROP_X, CROP_Y = 224, 224       # size (in pixels) of the tiles fed to the detector
THRESHOLD_CONFIDENCE = 0.50     # detections scoring at or below this value are discarded
DEBLURGAN = True                # evaluate on the images stored under dataset/images_deblurGAN/


device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# Choose the corresponding model:
model = fasterrcnn_resnet50_fpn(num_classes=2)
# backbone = resnet_fpn_backbone(backbone_name="resnet18", weights="DEFAULT")
# model = FasterRCNN(backbone, num_classes=2)

# Load the state of the model.
# map_location makes the checkpoint loadable on the CPU fallback selected above;
# without it, a checkpoint saved from a GPU fails to load on a machine without CUDA.
model.to(device)
model.load_state_dict(torch.load("models/0094_fasterrcnn_resnet50_fpn_deblurGAN/modelo.bin", map_location=device))
# model.load_state_dict(torch.load("models/0070_fasterrcnn_resnet50_fpn_th50/modelo.bin", map_location=device))
# model.load_state_dict(torch.load("models/0105_resnet18_fpn_th50_deblurGAN/modelo_7752.bin", map_location=device))
# model.load_state_dict(torch.load("models/0096_resnet18_fpn_th50/modelo.bin", map_location=device))
# model.load_state_dict(torch.load("models/0103_resnet152_fpn_th50/modelo.bin", map_location=device))
# model.load_state_dict(torch.load("models/0104_resnet152_fpn_th50_deblurGAN/modelo.bin", map_location=device))


cuda:0


<All keys matched successfully>

In [3]:
# To parse annotations
import xml.etree.ElementTree as ET
import torchvision.transforms.functional as FT

# Class names used in the annotation files; id 0 is reserved for the background.
voc_labels = ('varroa', 'pupe')
label_map = dict(zip(voc_labels, range(1, len(voc_labels) + 1)))
label_map['background'] = 0
# Reverse lookup: numeric id -> class name.
rev_label_map = dict((class_id, class_name) for class_name, class_id in label_map.items())

def parse_annotation(annotation_path):
    """Read the objects of an XML annotation file (`object` / `name` / `bndbox` layout).

    Objects whose lower-cased, stripped name is not a key of `label_map` are skipped.

    Args:
        annotation_path: XML source, passed straight to `ET.parse`.

    Returns:
        dict with three parallel lists:
          'boxes'        -- [xmin, ymin, xmax, ymax] per object, each coordinate minus 1,
          'labels'       -- the `label_map` id of each object,
          'difficulties' -- 1 when the object's <difficult> text is '1', else 0.
    """
    root = ET.parse(annotation_path).getroot()

    boxes = []
    labels = []
    difficulties = []
    # `obj` rather than `object`, to avoid shadowing the builtin.
    for obj in root.iter('object'):
        label = obj.find('name').text.lower().strip()
        if label not in label_map:
            continue

        # <difficult> may be absent; a missing tag is treated as "not difficult"
        # instead of crashing on `None.text`.
        difficult = int(obj.findtext('difficult', default='0') == '1')

        bbox = obj.find('bndbox')
        # Every coordinate is shifted by one
        # (presumably 1-based annotation pixels -> 0-based; confirm against the labelling tool).
        xmin, ymin, xmax, ymax = (int(bbox.find(tag).text) - 1
                                  for tag in ('xmin', 'ymin', 'xmax', 'ymax'))

        boxes.append([xmin, ymin, xmax, ymax])
        labels.append(label_map[label])
        difficulties.append(difficult)
    return {'boxes': boxes, 'labels': labels, 'difficulties': difficulties}


def predict_image_batch(images, threshold=THRESHOLD_CONFIDENCE):
    """Run the module-level `model` on a batch of crops and keep the confident detections.

    Args:
        images: list of channel-first image tensors, already moved to the model's device.
        threshold: a detection is kept only when its score is strictly greater than this value.

    Returns:
        Tuple `(output, final_boxes, scores)` with one entry per input image:
          output      -- float32 height x width x channel array, multiplied by 255,
                         with the kept boxes drawn on it,
          final_boxes -- list of (x_min, y_min, x_max, y_max) tuples in crop coordinates,
                         truncated to int32,
          scores      -- numpy array with the scores of the kept boxes (same order).
    """
    output = []
    final_boxes = []
    scores = []
    with torch.no_grad():
        preds = model(images)
    for image, pred in zip(images, preds):
        keep = (pred['scores'] > threshold).detach().cpu().numpy()
        boxes = pred['boxes'].detach().cpu().numpy().astype(np.int32)
        canvas = np.ascontiguousarray(image.permute(1, 2, 0).detach().cpu().numpy(), dtype=np.float32)

        kept_boxes = []
        for box in boxes[keep]:
            # predict_image feeds crops scaled to [0, 1] and the canvas is multiplied by 255
            # below, so the rectangle is drawn with 1.0 (-> 255 after rescaling). Drawing it
            # with 255 produced out-of-range pixel values (255 * 255).
            cv2.rectangle(canvas, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (1.0, 0, 0), 2)
            kept_boxes.append((box[0], box[1], box[2], box[3]))

        final_boxes.append(kept_boxes)
        scores.append(pred['scores'].detach().cpu().numpy()[keep])
        output.append(canvas * 255)
    return output, final_boxes, scores



def predict_image(input_image_name, model):
    """Tile an image into CROP_X x CROP_Y crops and detect objects on every crop.

    Args:
        input_image_name: path of the image. Names ending in ".dng" (any case) are decoded
            with rawpy; everything else is read with `cv2.imread`.
        model: detector, switched to eval mode here.
            NOTE: inference goes through `predict_image_batch`, which uses the module-level
            `model`, so passing a different detector here does not change the predictions.

    Returns:
        Tuple `(positions, bboxes, scores)` with one entry per crop, in row-major order:
          positions -- (x, y) of the crop's top-left corner in the full image,
          bboxes    -- list of (x_min, y_min, x_max, y_max) in full-image coordinates,
          scores    -- the per-crop scores returned by `predict_image_batch`.

    Raises:
        FileNotFoundError: if `cv2.imread` cannot read the file.
    """
    model.eval()

    # Load the picture as float32 BGR in [0, 1].
    if input_image_name.lower().endswith(".dng"):
        with rawpy.imread(input_image_name) as raw:
            input_image = raw.postprocess(use_camera_wb=True, use_auto_wb=False, output_color=rawpy.ColorSpace.sRGB)
            input_image = input_image.astype(np.float32)
            input_image /= 255.0
            # Converted to the channel order cv2.imread yields in the other branch.
            input_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)
    else:
        input_image = cv2.imread(input_image_name)
        if input_image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {input_image_name}")
        input_image = input_image.astype(np.float32)
        input_image /= 255.0

    size_img_y, size_img_x, _ = input_image.shape

    # The transform is configured with bbox_params, so it is fed an empty box/label set:
    # it is used only to crop the image and convert it to a tensor.
    boxes_orig = []
    labels = torch.ones(0, dtype=torch.int64)

    valid_imagenes = []
    positions = []
    for pos_y in range(0, size_img_y, CROP_Y):
        for pos_x in range(0, size_img_x, CROP_X):
            # Tiles on the right/bottom edge are clipped to the image and may be smaller.
            transform = A.Compose([A.Crop(x_min=pos_x, y_min=pos_y,
                                          x_max=min(pos_x + CROP_X, size_img_x),
                                          y_max=min(pos_y + CROP_Y, size_img_y)),
                                   ToTensorV2()],
                                  bbox_params=bbox_params)
            result_transform = transform(image=input_image, bboxes=boxes_orig, labels=labels)
            valid_imagenes.append(result_transform['image'])
            positions.append((pos_x, pos_y))

    # Split the crops into at most `n_batch` batches (fewer batches use more VRAM each).
    # The batch size is rounded up so that no crop is dropped when the number of crops is
    # not a multiple of `n_batch`, and no empty batch is sent to the model.
    n_batch = 18
    batch_size = max(1, math.ceil(len(valid_imagenes) / n_batch))
    final_boxes = []
    scores = []
    for start in range(0, len(valid_imagenes), batch_size):
        images = [image.to(device) for image in valid_imagenes[start:start + batch_size]]
        _, batch_boxes, batch_scores = predict_image_batch(images, threshold=THRESHOLD_CONFIDENCE)
        final_boxes.extend(batch_boxes)
        scores.extend(batch_scores)

    # Translate every box from crop coordinates to full-image coordinates.
    bboxes = [
        [(x_min + x, y_min + y, x_max + x, y_max + y) for (x_min, y_min, x_max, y_max) in crop_boxes]
        for (x, y), crop_boxes in zip(positions, final_boxes)
    ]

    return positions, bboxes, scores


def predict_small_image(input_image, model, threshold=None):
    """Detect objects on a single, already cropped image.

    Args:
        input_image: height x width x channel numpy image.
        model: detector to run; it is switched to eval mode.
        threshold: a detection is kept only when its score is strictly greater than this
            value. Defaults to THRESHOLD_CONFIDENCE, so this function and
            `predict_image_batch` filter with the same configured value instead of a
            hard-coded 0.50.

    Returns:
        Tuple `(boxes, scores)`: int32 array of [x_min, y_min, x_max, y_max] rows in the
        coordinates of `input_image`, and the matching scores.
    """
    if threshold is None:
        threshold = THRESHOLD_CONFIDENCE

    model.eval()
    to_tensor = A.Compose([ToTensorV2()])
    image_tensor = to_tensor(image=input_image)['image']
    with torch.no_grad():
        # A one-image batch, hence exactly one prediction dict.
        pred = model([image_tensor.to(device)])[0]

    keep = (pred['scores'] > threshold).detach().cpu().numpy()
    boxes = pred['boxes'].cpu().numpy().astype(np.int32)
    scores = pred['scores'].cpu().numpy()
    return boxes[keep], scores[keep]


def bounding_boxes_intersection_area(bb1, bb2):
    """Return the area shared by two (x_min, y_min, x_max, y_max) boxes; 0 when they do not overlap."""
    # Corners of the candidate intersection rectangle.
    left, top = max(bb1[0], bb2[0]), max(bb1[1], bb2[1])
    right, bottom = min(bb1[2], bb2[2]), min(bb1[3], bb2[3])

    # A negative extent on either axis means the boxes are disjoint on that axis.
    overlap_width = max(right - left, 0)
    overlap_height = max(bottom - top, 0)
    return overlap_width * overlap_height

def improve_bboxes(input_image, positions, bboxes, scores, border_margin=10, min_overlap=0.5):
    """Refinement step: re-detect the boxes that lie close to the border of their crop.

    A box that is not within `border_margin` pixels of its crop's border is kept as is
    (a box edge lying exactly on the image border does not count as "near a border").
    Any other box is replaced by the detections obtained on a new CROP_X x CROP_Y window
    centred on it (clipped to the image), provided that a new detection
      * covers at least `min_overlap` of the original box's area, and
      * does not cover at least `min_overlap` of the area of a refined box accepted earlier.
    If no new detection qualifies, the original box is dropped.

    Args:
        input_image: full image as a height x width x channel array.
        positions: (x, y) top-left corner of every crop.
        bboxes: per crop, the detected boxes in full-image coordinates.
        scores: per crop, the scores aligned with `bboxes`.
        border_margin: distance in pixels from a crop border under which a box is refined.
        min_overlap: area fraction used by both overlap tests described above.

    Returns:
        Tuple `(bboxes2, new_scores)`: the per-crop refined boxes, and a flat list of scores
        in the same order as the boxes of `bboxes2` once flattened.
    """
    size_img_y, size_img_x, _ = input_image.shape
    bboxes2 = []
    new_boxes = []    # refined boxes accepted so far over the whole image (duplicate check)
    new_scores = []
    for i, ((pos_x, pos_y), pos_bbs) in enumerate(zip(positions, bboxes)):
        # Right/bottom limits of this crop, clipped to the image.
        x_max = min(pos_x + CROP_X, size_img_x)
        y_max = min(pos_y + CROP_Y, size_img_y)

        pos_bbs2 = []
        for j, bb in enumerate(pos_bbs):
            bbx_min, bby_min, bbx_max, bby_max = bb
            near_left = (bbx_min <= pos_x + border_margin) and (bbx_min != 0)
            near_top = (bby_min <= pos_y + border_margin) and (bby_min != 0)
            near_right = (bbx_max >= x_max - border_margin) and (bbx_max != size_img_x)
            near_bottom = (bby_max >= y_max - border_margin) and (bby_max != size_img_y)

            if not (near_left or near_top or near_right or near_bottom):
                pos_bbs2.append(bb)
                new_scores.append(scores[i][j])
                continue

            # Window centred on the box, clipped to the image.
            center_x = (bbx_min + bbx_max) // 2
            center_y = (bby_min + bby_max) // 2
            new_x_min = max(center_x - (CROP_X // 2), 0)
            new_y_min = max(center_y - (CROP_Y // 2), 0)
            new_x_max = min(center_x + (CROP_X // 2), size_img_x)
            new_y_max = min(center_y + (CROP_Y // 2), size_img_y)
            new_crop = input_image[new_y_min:new_y_max, new_x_min:new_x_max]
            new_boxes_predict, new_scores_predict = predict_small_image(new_crop, model)

            bbArea = abs((bb[2] - bb[0]) * (bb[3] - bb[1]))
            for box, new_score in zip(new_boxes_predict, new_scores_predict):
                # Back to full-image coordinates.
                new_box = (box[0] + new_x_min, box[1] + new_y_min, box[2] + new_x_min, box[3] + new_y_min)

                if bounding_boxes_intersection_area(new_box, bb) < bbArea * min_overlap:
                    continue  # does not correspond to the box being refined

                # The same object is usually cut by two neighbouring crops; keep it once.
                is_duplicate = any(
                    bounding_boxes_intersection_area(bb2, new_box)
                    >= min_overlap * abs((bb2[2] - bb2[0]) * (bb2[3] - bb2[1]))
                    for bb2 in new_boxes
                )
                if is_duplicate:
                    continue

                pos_bbs2.append(new_box)
                new_boxes.append(new_box)
                new_scores.append(new_score)

        bboxes2.append(pos_bbs2)
    return bboxes2, new_scores


In [4]:
from torchmetrics.detection import MeanAveragePrecision

def compute_metric(images):
    """Evaluate the detector on `images` with and without the refinement step.

    For every image the detections of `predict_image` (raw) and of `improve_bboxes`
    (refined) are compared against the annotation `BOXES_FOLDER + <image stem> + ".xml"`
    with `MeanAveragePrecision` at an IoU threshold of 0.50. Every prediction is given
    label 1.

    Args:
        images: iterable of image paths.

    Returns:
        Tuple `(metric_new_boxes, metric_old_boxes)`: the results of `compute()` for the
        refined and for the raw predictions, in that order.

    Raises:
        FileNotFoundError: if an image cannot be read with `cv2.imread`.
    """
    def _detections(box_list, score_list):
        # Explicit shapes/dtypes so that an image without detections still yields
        # well-formed (0, 4) boxes and integer labels.
        return dict(
            boxes=torch.as_tensor(box_list, dtype=torch.float32).reshape(len(box_list), 4),
            scores=torch.as_tensor(score_list, dtype=torch.float32),
            labels=torch.ones(len(box_list), dtype=torch.int64),
        )

    all_images_preds_new = []
    all_images_preds = []
    all_targets = []

    for input_image_name in images:
        positions, bboxes, scores = predict_image(input_image_name, model)

        # NOTE: only cv2.imread is used here; DNG inputs are decoded by predict_image only.
        input_image = cv2.imread(input_image_name)
        if input_image is None:
            raise FileNotFoundError(f"Could not read image: {input_image_name}")
        input_image = input_image.astype(np.float32)
        input_image /= 255.0

        # Boxes without improvement
        bboxes_old = [box for crop_boxes in bboxes for box in crop_boxes]
        scores_old = [score for crop_scores in scores for score in crop_scores]
        all_images_preds.append(_detections(bboxes_old, scores_old))

        # Boxes after the refinement step
        bboxes_new, scores_new = improve_bboxes(input_image, positions, bboxes, scores)
        bboxes_new = [box for crop_boxes in bboxes_new for box in crop_boxes]
        all_images_preds_new.append(_detections(bboxes_new, scores_new))

        # Original boxes
        input_labels_name = BOXES_FOLDER + Path(input_image_name).stem + ".xml"
        annotation = parse_annotation(input_labels_name)
        all_targets.append(dict(
            boxes=torch.as_tensor(annotation['boxes'], dtype=torch.float32).reshape(len(annotation['boxes']), 4),
            labels=torch.as_tensor(annotation['labels'], dtype=torch.int64),
        ))

    evaluator_new = MeanAveragePrecision(iou_type="bbox", iou_thresholds=[0.50])
    evaluator_new.update(all_images_preds_new, all_targets)
    evaluator_old = MeanAveragePrecision(iou_type="bbox", iou_thresholds=[0.50])
    evaluator_old.update(all_images_preds, all_targets)

    # Results are kept in separate names instead of rebinding the metric objects.
    metric_new_boxes = evaluator_new.compute()
    metric_old_boxes = evaluator_old.compute()
    return metric_new_boxes, metric_old_boxes

In [5]:
df = pd.read_csv('df_dataset.csv')

# One row per image, ordered by the position stored in the CSV.
df_posic = (
    df[['file_image', 'name', 'file_boxes', 'pos_img', 'is_train', 'size_img_x', 'size_img_y']]
    .drop_duplicates()
    .sort_values('pos_img')
    .reset_index(drop=True)
)

# Renumber the images consecutively (0, 1, 2, ...) in both tables.
lista_names_imagenes = df_posic['file_image'].values
numero_imagen = dict(zip(lista_names_imagenes, np.arange(lista_names_imagenes.size)))
df_posic['pos_img'] = df_posic['file_image'].map(numero_imagen)
df['pos_img'] = df['file_image'].map(numero_imagen)

# Benchmark on the images that were not used for training.
input_images_names = df_posic.loc[df_posic['is_train'] == False, 'file_image'].tolist()
if DEBLURGAN:
    # Same file stems, taken from the deblurGAN output folder.
    input_images_names = [
        "dataset/images_deblurGAN/" + Path(original_name).stem + ".jpg"
        for original_name in input_images_names
    ]

In [6]:
# Time the whole evaluation (detection + refinement + metric computation).
init = time.time()
metric_new, metric_old = compute_metric(input_images_names)
final_time = time.time() - init

print("Time:", final_time, "seconds")
for _title, _metrics in (("Without prediction refinement", metric_old),
                         ("With prediction refinement", metric_new)):
    print(_title + ": map50 =", _metrics["map_50"], "mar100 =", _metrics["mar_100"])

Time: 313.0843665599823 seconds
Without prediction refinement: map50 = tensor(0.7764) mar100 = tensor(0.8476)
With prediction refinement: map50 = tensor(0.9073) mar100 = tensor(0.9703)


**0096_resnet18_fpn_th50**
- Without prediction refinement: map50 = tensor(0.7797) mar100 = tensor(0.8476)
- With prediction refinement: map50 = tensor(0.8494) mar100 = tensor(0.9480)

**0105_resnet18_fpn_th50_deblurGAN**
- Time: 224.897 seconds
- Without prediction refinement: map50 = tensor(0.7752) mar100 = tensor(0.8513)
- With prediction refinement: map50 = tensor(0.8832) mar100 = tensor(0.9554)

**0070_fasterrcnn_resnet50_fpn_th50**
- Without prediction refinement: map50 = tensor(0.7765) mar100 = tensor(0.8439)
- With prediction refinement: map50 = tensor(0.8578) mar100 = tensor(0.9628)

**0094_fasterrcnn_resnet50_fpn_deblurGAN**
- Time: 316.895 seconds
- Without prediction refinement: map50 = tensor(0.7798) mar100 = tensor(0.8439)
- With prediction refinement: map50 = tensor(0.9073) mar100 = tensor(0.9665)

**0103_resnet152_fpn_th50**
- Time: 637.326 seconds
- Without prediction refinement: map50 = tensor(0.7939) mar100 = tensor(0.8513)
- With prediction refinement: map50 = tensor(0.8944) mar100 = tensor(0.9628)

**0104_resnet152_fpn_th50_deblurGAN**
- Time: 595.263 seconds
- Without prediction refinement: map50 = tensor(0.7086) mar100 = tensor(0.8067)
- With prediction refinement: map50 = tensor(0.8514) mar100 = tensor(0.9405)