In [1]:
import json

import numpy as np
import torch
from shapely import Polygon
from torch.utils.data import DataLoader
from tqdm import tqdm

from datasets.SlideSeperatedDataset import SlideSeperatedDataset
from utils import divide


In [2]:
# Locations of the inputs read by this notebook (relative to the working directory).
_data_root = "data"
slides_root_dir = f"{_data_root}/whole-slides/gut"
annotations_root_dir = f"{_data_root}/annotations/json"

# Locations of artifacts written by earlier pipeline stages.
_output_root = "output"
candidates_dataset_dir = f"{_output_root}/candidates"
model_output_dir = f"{_output_root}/models"

In [3]:
# Restore the trained model together with the slide split stored alongside it.
#
# NOTE(security): torch.load unpickles the file, which can execute arbitrary code.
# Only load checkpoints that you produced yourself or otherwise trust.
# NOTE(review): newer PyTorch releases default to weights_only=True, which rejects
# fully pickled model objects; if loading fails there, pass weights_only=False
# explicitly - confirm against the installed version.
load_dict = torch.load(
    f"{model_output_dir}/model.pickle",
    # Remap stored tensors onto a device that exists on this machine, so that a
    # checkpoint saved from a GPU can still be restored on a CPU-only host.
    map_location="cuda:0" if torch.cuda.is_available() else "cpu",
)
model = load_dict["model"]
train_dict = load_dict["train_slides"]  # entry stored under "train_slides"
test_slides = load_dict["test_slides"]  # slides that the evaluation below iterates over

In [4]:

# Run on the first GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")

# Candidates that belong to the held-out test slides. The full candidate set is
# evaluated; no subsampling is applied here.
# with_index=True: the inference loop unpacks (input, label, index) per batch, so
# the dataset presumably also yields each item's index - verify against the class.
batch_size = 256
test_dataset = SlideSeperatedDataset(
    candidates_dataset_dir,
    test_slides,
    with_index=True,
)

# Iterate in dataset order (no shuffling).
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Candidates: {len(test_dataset):,}")

Device: cuda:0
Candidates: 29,265


In [5]:

from pathlib import Path

# Classify every test candidate and collect the dataset indexes predicted positive.
# The batches are moved to `device` below, so make sure the model lives there too.
model.to(device)
model.eval()
indexes = []
predictions = []
with torch.no_grad():
    # Only predictions are needed here, so the labels are ignored and no loss is
    # computed (its value was never used).
    for x_test, _y_test, index in tqdm(test_loader, desc="Testing"):
        test_logits = model.forward(x_test.to(device))
        test_preds = model.predict(test_logits)
        indexes.append(index)
        # Flatten with reshape(-1) rather than squeeze(): squeeze() would also drop
        # the batch dimension of a final batch holding a single sample, producing a
        # 0-d tensor that torch.cat cannot concatenate.
        # Assumes one prediction per sample, i.e. shape (B,) or (B, 1) - TODO confirm.
        predictions.append(test_preds.reshape(-1))
indexes = torch.cat(indexes).to("cpu")
predictions = torch.cat(predictions).to("cpu")
# Dataset indexes of the candidates the model labelled as class 1.
predicted_positives = indexes[predictions == 1]


Testing: 100%|██████████| 115/115 [00:39<00:00,  2.89it/s]


In [7]:
# Group the bounding boxes of all predicted-positive candidates by slide.
# The stem of each candidate file is split on "_" into exactly five fields:
# slide, x_min, y_min, width, height.
predicted_positive_bboxes_by_slide = {}
for item_index in predicted_positives:
    file_name = Path(test_dataset.get_item_file_path(item_index)).stem
    slide, x_min, y_min, width, height = file_name.split("_")
    x_min, y_min, width, height = map(int, (x_min, y_min, width, height))
    # setdefault creates the per-slide list the first time a slide is seen.
    predicted_positive_bboxes_by_slide.setdefault(slide, []).append((x_min, y_min, width, height))

In [12]:
from itertools import product


def calculate_iou(poly1, poly2):
    """Return the intersection-over-union of two geometries.

    Both arguments must provide ``intersection(other)`` and ``union(other)``
    methods whose results expose an ``area`` attribute (as shapely geometries do).

    Returns:
        ``intersection_area / union_area``, or ``0`` when the union has no
        positive area (which avoids a division by zero).
    """
    overlap_area = poly1.intersection(poly2).area
    combined_area = poly1.union(poly2).area
    if combined_area > 0:
        return overlap_area / combined_area
    return 0


def calculate_metrics(confusion_matrix):
    """Derive precision, recall and F1 from a dict of detection counts.

    Args:
        confusion_matrix: mapping with the keys ``"TP"``, ``"FP"`` and ``"FN"``.

    Returns:
        A ``(precision, recall, f1)`` tuple where
        ``precision = TP / (TP + FP)``, ``recall = TP / (TP + FN)`` and
        ``f1 = 2 * precision * recall / (precision + recall)``.

    Every ratio is computed through the project's ``divide`` helper; presumably
    it guards against zero denominators - verify in ``utils``.
    """
    true_pos, false_pos, false_neg = (confusion_matrix[key] for key in ("TP", "FP", "FN"))
    precision = divide(true_pos, true_pos + false_pos)
    recall = divide(true_pos, true_pos + false_neg)
    harmonic_numerator = 2 * precision * recall
    f1 = divide(harmonic_numerator, precision + recall)
    return precision, recall, f1


def calculate_iou_confusion_matrix(ground_truth_polygons, predicted_bboxes, iou_threshold=0.5):
    """Count detection hits and misses for one slide using an IoU criterion.

    Args:
        ground_truth_polygons: iterable of point sequences, each passed to
            ``Polygon`` to build one annotated region.
        predicted_bboxes: iterable of ``(x, y, w, h)`` boxes; each is expanded
            into a rectangle with corners ``(x, y)`` and ``(x + w, y + h)``.
        iou_threshold: a pair counts as a match only when its IoU is strictly
            greater than this value.

    Returns:
        ``{"TP": ..., "FP": ..., "FN": ...}`` where
        TP is the number of ground-truth regions matched by at least one
        prediction, FP the number of predictions matching no region, and FN the
        number of regions matched by no prediction.

    Matching is many-to-many: several predictions overlapping the same region
    count as a single TP and none of them is counted as an FP.
    """
    # buffer(0) is applied to every geometry; it is commonly used in shapely to
    # clean up invalid (e.g. self-intersecting) rings before overlay operations.
    truth_shapes = [Polygon(vertices).buffer(0) for vertices in ground_truth_polygons]
    box_shapes = [
        Polygon([(left, top), (left + w, top), (left + w, top + h), (left, top + h)]).buffer(0)
        for left, top, w, h in predicted_bboxes
    ]

    # Rows index ground-truth regions, columns index predicted boxes.
    iou_matrix = np.zeros((len(truth_shapes), len(box_shapes)))
    for row, truth in enumerate(truth_shapes):
        for col, box in enumerate(box_shapes):
            iou_matrix[row, col] = calculate_iou(truth, box)

    overlaps = iou_matrix > iou_threshold
    truths_hit = int(overlaps.any(axis=1).sum())  # regions with >= 1 matching box
    boxes_hit = int(overlaps.any(axis=0).sum())   # boxes with >= 1 matching region

    return {
        "TP": truths_hit,
        "FP": len(box_shapes) - boxes_hit,
        "FN": len(truth_shapes) - truths_hit,
    }


# Score the detections slide by slide and pool the raw counts, so the overall
# metrics are computed from summed TP/FP/FN rather than from averaged ratios.
total_confusion_matrix = {
    "TP": 0,
    "FP": 0,
    "FN": 0
}
for slide in test_slides:
    # Read from the configured annotation directory instead of repeating the
    # path literal, and decode explicitly as UTF-8 rather than relying on the
    # platform's default encoding.
    with open(f"{annotations_root_dir}/{slide}.json", encoding="utf-8") as f:
        # Passed on as the ground-truth polygons; presumably a list of point
        # lists - verify against the annotation files.
        ground_truth_positive_annotations = json.load(f)
    # A slide without any predicted positive simply contributes no boxes.
    predicted_positive_bboxes = predicted_positive_bboxes_by_slide.get(slide, [])
    confusion_matrix = calculate_iou_confusion_matrix(ground_truth_positive_annotations, predicted_positive_bboxes)
    precision, recall, f1 = calculate_metrics(confusion_matrix)

    for count_name in total_confusion_matrix:
        total_confusion_matrix[count_name] += confusion_matrix[count_name]

    print(f"{slide}: precision: {precision:.6f}, recall: {recall:.6f}, f1: {f1:.6f}")
total_precision, total_recall, total_f1 = calculate_metrics(total_confusion_matrix)
print()
print(f"Overall: precision: {total_precision:.6f}, recall: {total_recall:.6f}, f1: {total_f1:.6f}")


593449: precision: 0.030120, recall: 0.022124, f1: 0.025510
593450: precision: 0.000000, recall: 0.000000, f1: 0.000000
593454: precision: 0.012579, recall: 0.100000, f1: 0.022346
593445: precision: 0.045752, recall: 0.070000, f1: 0.055336
593451: precision: 0.000000, recall: 0.000000, f1: 0.000000
593446: precision: 0.012195, recall: 0.200000, f1: 0.022989

Overall: precision: 0.017276, recall: 0.041262, f1: 0.024355
