## HubMap Training Notebook

### Setup

In [None]:
import os
import gc
import sys
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.cuda import amp
from torch.utils.data import Dataset, DataLoader

### Config

In [None]:
class CFG:
    LR = 1e-4
    EPOCHS = 50
    BATCH_SIZE = 32
    N_TRAIN = 1400 # Take first N_TRAIN images for training, rest for validation

### Dataloaders

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import json
from PIL import Image
from skimage.draw import polygon
from albumentations import Compose, Resize, HorizontalFlip, VerticalFlip, BboxParams



class HubmapDataset(Dataset):
        
    def __init__(self, image_dir, labels_file, n_train, mode='train'):
        
        assert mode in ['train', 'val'], "mode must be one of ['train', 'val']"
        self.mode = mode
        
        with open(labels_file, 'r') as json_file:
            self.json_labels = [json.loads(line) for line in json_file]

        if mode == 'train':
            self.json_labels = self.json_labels[:n_train]
        else:
            self.json_labels = self.json_labels[n_train:]

        self.image_dir = image_dir
        
        
        if mode == 'train':
            initial_augm = [
            
                
                A.HorizontalFlip(p=0.5),
                A.VerticalFlip(p=0.5),
                
                A.RandomBrightnessContrast(p=0.75),
                # A.CoarseDropout(max_holes=1, max_width=int(512 * 0.1), max_height=int(512 * 0.1), 
                                # mask_fill_value=0, p=0.5),
                
                
                A.OneOf([
                        A.GaussNoise(var_limit=[0.01, 0.05]),
                        A.GaussianBlur(blur_limit=(3, 5), sigma_limit=0),
                        A.MotionBlur(blur_limit=3),
                        ], p=0.4),
                A.MultiplicativeNoise(per_channel=True, multiplier=(0.95, 1.05)),
                
                
            ]
        else:
            initial_augm = []
        
        self.aug_list = initial_augm + [
                A.Resize(512, 512),
                A.Normalize(
                    mean= [0, 0, 0],
                    std= [1, 1, 1],
                    max_pixel_value = 255
                ),
                ToTensorV2(transpose_mask=True),
            ]
        
        # Create the augmentation pipeline
        self.augmentations = A.Compose(self.aug_list, bbox_params=BboxParams(format='pascal_voc'))

    def __len__(self):
        return len(self.json_labels)
        
    def __getitem__(self, idx):
        
        image_path = os.path.join(self.image_dir, f"{self.json_labels[idx]['id']}.tif")
        image = Image.open(image_path)
        
        
        # Get the mask
        mask = np.zeros((512, 512), dtype=np.float32)
        bounding_boxes = []
        
        for annot in self.json_labels[idx]['annotations']:
            cords = annot['coordinates']
            if annot['type'] == "blood_vessel":
                for cord in cords:
                    rr, cc = polygon(np.array([i[1] for i in cord]), np.asarray([i[0] for i in cord]))
                    mask[rr, cc] = 1
                    
                    # compute bounding box
                    x_min = np.min(np.array([i[0] for i in cord]))
                    x_max = np.max(np.array([i[0] for i in cord]))
                    y_min = np.min(np.array([i[1] for i in cord]))
                    y_max = np.max(np.array([i[1] for i in cord]))
                    class_label = 0
                    bounding_boxes.append([x_min, y_min, x_max, y_max, class_label])
                    
        image = np.array(image)

        # image = torch.tensor(np.array(image), dtype=torch.float32).permute(2, 0, 1)  # Shape: [C, H, W]
        # mask = torch.tensor(mask, dtype=torch.float32)


        if self.mode == 'train':
            augmented = self.augmentations(image=image, mask=mask, bboxes=bounding_boxes)
            image, mask, bboxes = augmented["image"], augmented["mask"], augmented["bboxes"]
            
            bboxes = np.array(bboxes)

            return image, mask, bboxes
        else:
            augmented = self.augmentations(image=image)
            image = augmented["image"]

            return image

In [None]:
train_dataset = HubmapDataset(image_dir="/home/viktor/Documents/kaggle/hubmap-2023/kaggle-data/train", 
                              labels_file="/home/viktor/Documents/kaggle/hubmap-2023/kaggle-data/polygons.jsonl", 
                              n_train=CFG.N_TRAIN,
                              mode='train')

In [None]:
import cv2
import numpy as np

def seg_to_det(
    seg: np.ndarray, 
    seg_thresh: float
):
    num_outputs, labels, stats, centroids = cv2.connectedComponentsWithStats((seg > seg_thresh).astype(np.uint8)*255, 8)
    boxes = stats[:, [cv2.CC_STAT_LEFT, cv2.CC_STAT_TOP, cv2.CC_STAT_WIDTH, cv2.CC_STAT_HEIGHT]]
    label_masks = [labels == i for i in range(num_outputs)]
    dets = {
        "boxes": np.stack([
            boxes[:, 0],
            boxes[:, 1],
            boxes[:, 0] + boxes[:, 2],
            boxes[:, 1] + boxes[:, 3],
        ], axis=1),
        "masks": [seg * m for m in label_masks],
    }
    dets["scores"] = [np.mean(seg[m]) for m in label_masks]
    
    # remove dets element where 'boxes' = [0, 0, 512, 512]
    boxes_to_remove = [0, 0, 512, 512]
    indices_to_remove = np.where(np.all(dets["boxes"] == boxes_to_remove, axis=1))
    
    dets["boxes"] = np.delete(dets["boxes"], indices_to_remove, axis=0)
    dets["masks"] = [i for j, i in enumerate(dets["masks"]) if j not in indices_to_remove]
    dets["scores"] = np.delete(dets["scores"], indices_to_remove)
    
    return dets


In [None]:
# visualize the data
image, mask, bbox = train_dataset[2]

plt.figure(figsize=(16, 8))
plt.subplot(1, 2, 1)
plt.imshow(image.permute(1, 2, 0))

plt.subplot(1, 2, 2)
plt.imshow(mask)


plt.subplot(1, 2, 2)
plt.imshow(0.5 * image.permute(1, 2, 0)[:, :, 0] + 0.6 * mask)


# draw bounding boxes
for box in bbox:
    x_min, y_min, x_max, y_max, class_label = box
    # plt.plot([x_min, x_max, x_max, x_min, x_min], [y_min, y_min, y_max, y_max, y_min], linewidth=2, color='r')



dets = seg_to_det(mask.numpy(), 0.5)
bboxes = dets['boxes']

# draw bounding boxes
for box in bboxes:
    x_min, y_min, x_max, y_max = box
    plt.plot([x_min, x_max, x_max, x_min, x_min], [y_min, y_min, y_max, y_max, y_min], linewidth=2, color='r')



# plot together with the original image

# Competition metric

In [None]:
def calculate_iou(pred_box, gt_box):
    """
    Calculate the intersection over union (IoU) between a predicted bounding box and a ground truth bounding box.

    Args:
        pred_box (list or np.array): Predicted bounding box of format [x1, y1, x2, y2].
        gt_box (list or np.array): Ground truth bounding box of format [x1, y1, x2, y2].

    Returns:
        float: The intersection over union (IoU) between the predicted bounding box and the ground truth bounding box.
    """
    x1_pred, y1_pred, x2_pred, y2_pred = pred_box
    x1_gt, y1_gt, x2_gt, y2_gt = gt_box

    xi1 = max(x1_pred, x1_gt)
    yi1 = max(y1_pred, y1_gt)
    xi2 = min(x2_pred, x2_gt)
    yi2 = min(y2_pred, y2_gt)

    inter_area = max(xi2 - xi1, 0) * max(yi2 - yi1, 0)

    box1_area = (x2_pred - x1_pred) * (y2_pred - y1_pred)
    box2_area = (x2_gt - x1_gt) * (y2_gt - y1_gt)

    union_area = box1_area + box2_area - inter_area

    iou = inter_area / union_area

    return iou

def calculate_mAP(pred_boxes, gt_boxes, thresh=0.6, scores=None):
    """
    Calculate mAP.

    Args:
        pred_boxes (np.array): Array of predicted bounding boxes of shape (n_boxes, 4).
        gt_boxes (np.array): Array of ground truth bounding boxes of shape (n_boxes, 4).
        thresh (float, optional): Threshold value for IoU. Defaults to 0.6.
        scores (list or np.array, optional): Array of confidence scores associated with predicted bounding boxes.

    Returns:
        float: Average precision over all thresholds.
        list: List of precision values for each threshold.
        list: List of recall values for each threshold.
        np.array: Matrix of IoU scores for each pair of predicted and ground truth bounding boxes.
    """
    image_precision = 0.0

    # calculate IoUs
    ious = np.zeros((len(pred_boxes), len(gt_boxes)))
    for i, pred_box in enumerate(pred_boxes):
        for j, gt_box in enumerate(gt_boxes):
            ious[i, j] = calculate_iou(pred_box, gt_box)

    # sort pred_boxes and ious by scores in descending order
    indices = np.argsort(scores)[::-1]
    pred_boxes = pred_boxes[indices]
    ious = ious[indices]

    gt_match = []
    pred_match = []
    _gt_match = []
    _pred_match = []
    for i, pred_box in enumerate(pred_boxes):
        matched = False
        for j, gt_box in enumerate(gt_boxes):
            if ious[i, j] > thresh:
                if j not in _gt_match and i not in _pred_match:
                    _gt_match.append(j)
                    _pred_match.append(i)
                    matched = True
                    break

        if not matched:
            _pred_match.append(i)

    gt_match.append(_gt_match)
    pred_match.append(_pred_match)

    n_gt = len(gt_boxes)
    precisions = []
    recalls = []
    for i, _gt_match in enumerate(gt_match):
        tp = len(_gt_match)
        fp = len(pred_match[i]) - tp
        fn = n_gt - tp

        precision = tp / (tp + fp)
        recall = tp / (tp + fn)

        precisions.append(precision)
        recalls.append(recall)

        image_precision += precision 

    return image_precision, precisions, recalls, ious


In [None]:
# Example usage


# Your provided data
dets = {
    'boxes': np.array([[204, 0, 140, 16], [330, 283, 369, 374], [505, 452, 512, 484], [0, 0, 100, 100]], dtype=np.int32),
    'scores': np.array([1., 1., 1., 0.9], dtype=np.float32)
}


gt_boxes = np.array([[331., 284., 369., 374., 0.], [506., 453., 512., 484., 0.], [105., 1., 140., 16., 0.]])

# Convert the format of the ground truth boxes to match the format of the predicted boxes
gt_boxes = gt_boxes[:, :4]

# Calculate AP for a single IoU threshold
ap, precisions, recalls, overlaps = calculate_mAP(dets['boxes'], gt_boxes, thresh=0.6, scores=dets['scores'])

print(f"Average precision: {ap}")

# FBeta metric

In [None]:
# ref - https://www.kaggle.com/competitions/vesuvius-challenge-ink-detection/discussion/397288
def fbeta_score(preds, targets, threshold, beta=1.0, smooth=1e-5):
    preds_t = torch.where(preds > threshold, 1.0, 0.0).float()
    y_true_count = targets.sum()
    
    ctp = preds_t[targets==1].sum()
    cfp = preds_t[targets==0].sum()
    beta_squared = beta * beta

    c_precision = ctp / (ctp + cfp + smooth)
    c_recall = ctp / (y_true_count + smooth)
    dice = (1 + beta_squared) * (c_precision * c_recall) / (beta_squared * c_precision + c_recall + smooth)

    return dice