In [None]:
# Some basic setup:
# Setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# import some common libraries
import numpy as np
from PIL import Image
from datetime import datetime
import matplotlib.pyplot as plt
from skimage import measure
import os, json, cv2, random, pathlib, shutil

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.structures import BoxMode
from detectron2.utils.visualizer import ColorMode

In [None]:
# Label Studio class names, in the order used to build the colour table below
CATEGORIES = ["Open", "close", "Unknown"]
# Drawing colour per class name, paired with CATEGORIES by position
COLORS = dict(zip(
    CATEGORIES,
    [
        (0, 255, 0),  # Green
        (255, 0, 0),  # Red
        (0, 0, 255),  # Blue
    ],
))

def random_color():
    r""" Return a random colour as a list of three integers, each drawn from 0..255 """
    return [random.randint(0, 255) for _ in range(3)]

def filter_annotations(img_dir, label_filename):
    r""" Filter annotations by existing image files in `img_dir` folder

    For every task of the Label Studio JSON export the image name is mapped to a TIFF
    file name: the first 9 characters are dropped, the extension is replaced by ".tif"
    and a leading "2020" is removed. Tasks whose TIFF file does not exist in `img_dir`,
    or that carry no annotation entry, are reported on stdout and skipped.

    Args:
        img_dir: folder that holds the TIFF images
        label_filename: path of the Label Studio JSON export

    Returns a list of dictionaries (one per kept task, in export order) with fields:
        "image": image file path
        "annotations": list of annotation JSON dicts with Label studio format
            (the "result" of the first annotation of the task). E.g.,
            {
                "original_width": 1920,
                "original_height": 1280,
                "image_rotation": 0,
                "value": {
                    "x": 3.1,
                    "y": 8.2,
                    "radiusX": 20,
                    "radiusY": 16,
                    "ellipselabels": ["Car"]
                }
            }
    """
    # JSON text is UTF-8; do not depend on the platform default encoding
    with open(label_filename, "r", encoding="utf-8") as f:
        orig_annotations = json.load(f)

    annotations = []
    for annotation in orig_annotations:
        # Process the label file, remove the first 9 random characters
        img_filename = os.path.basename(annotation["data"]["image"])[9:]

        # Check whether images exist with TIFF format
        filename, _ = os.path.splitext(img_filename)
        tif_filename = filename + '.tif'
        # if TIFF image file name starts with "2020", removes it
        if tif_filename.startswith("2020"):
            tif_filename = tif_filename[4:]
        tif_img_filepath = os.path.join(img_dir, tif_filename)

        if not os.path.exists(tif_img_filepath):
            print("Not exist: {}".format(tif_img_filepath))
            continue

        # A task that has not been labelled yet has no annotation entry; skip it
        # instead of failing with an IndexError on `[0]`
        task_annotations = annotation.get("annotations") or []
        if not task_annotations:
            print("No annotation: {}".format(tif_img_filepath))
            continue

        annotations.append({
            "image": tif_img_filepath,
            "annotations": task_annotations[0]["result"]
        })
    return annotations

def gen_ellipse_from_annotation(label):
    r"""
    Convert one Label Studio ellipse annotation to pixel units
    Warning: only annotations with "image_rotation" = 0 are handled; any other value gives a failure tuple.

    The centre and the radii are stored as percentages of the image size, so they are scaled
    with "original_width" / "original_height" of the annotation.

    Returns a tuple of
        - A bool flag to indicate whether the operation is successful
        - ellipse center (x, y)
        - ellipse axis (horizontal axis, vertical axis)
        - ellipse angle ("rotation" of the annotation, clockwise degree)
        - category of the annotation (first entry of "ellipselabels")

    """
    if label["image_rotation"] != 0:
        return False, (0, 0), (0, 0), 0, ""

    width, height = label["original_width"], label["original_height"]
    value = label["value"]

    def to_pixels(percent, size):
        # percentage of the image size -> pixels
        return percent * size / 100

    radii = (to_pixels(value["radiusX"], width), to_pixels(value["radiusY"], height))
    # Label Studio documents (x, y) as the top left corner before rotation, but for
    # ellipses the value is used as the centre here.
    center = (to_pixels(value["x"], width), to_pixels(value["y"], height))
    return True, center, radii, value["rotation"], value["ellipselabels"][0]

def gen_polygon_from_annotation(label, delta=10):
    r"""
    Approximate a Label Studio ellipse annotation by a polygon
    Warning: only annotations with "image_rotation" = 0 are handled; any other value gives a failure tuple.

    Args:
        label: Label Studio ellipse annotation dict
        delta: forwarded to `cv2.ellipse2Poly` as its `delta` argument

    Returns a tuple of
        - A bool flag to indicate whether the operation is successful
        - a closed polygon if successful, otherwise an empty list. E.g, [[x1, y1], [x2, y2], ...]
        - category of the annotation

    """
    ok, center, axes, angle, category = gen_ellipse_from_annotation(label)
    if not ok:
        return False, [], ""

    # centre, axes and angle are truncated to integers before they go to cv2.ellipse2Poly
    poly = cv2.ellipse2Poly(
        center=(int(center[0]), int(center[1])),
        axes=(int(axes[0]), int(axes[1])),
        angle=int(angle),
        arcStart=0,
        arcEnd=360,
        delta=delta,
    )
    return True, poly, category

def gen_polygon_w_boundingbox_from_annotation(label, delta=10):
    r"""
    Approximate a Label Studio ellipse annotation by a polygon and its non-rotated bounding box
    Warning: only annotations with "image_rotation" = 0 are handled; any other value gives a failure tuple.

    Args:
        label: Label Studio ellipse annotation dict
        delta: forwarded to `gen_polygon_from_annotation`

    Returns a tuple of
        - A bool flag to indicate whether the operation is successful
        - a closed polygon if successful, otherwise an empty list. E.g, [[x1, y1], [x2, y2], ...]
        - a bounding box in the format of (top_left_x, top_left_y, width, height), (0, 0, 0, 0) on failure
        - category of the annotation

    """
    ok, poly, category = gen_polygon_from_annotation(label, delta)
    if not ok:
        return False, [], (0, 0, 0, 0), ""
    # cv2.boundingRect result is used as (tlx, tly, w, h)
    return True, poly, cv2.boundingRect(poly), category

def draw_ellipses(img_filename, ellipses, categories, thickness=1):
    r"""
    Draw ellipses, each in a random colour, on the image read from `img_filename`

    Args:
        img_filename: path of the image file
        ellipses: sequence of (center, axes, angle) entries; center and axes are truncated to
            integers and handed to `cv2.ellipse` together with the angle
        categories: one entry per ellipse; it only pairs up with `ellipses`, no text is drawn
        thickness: line thickness of the ellipses

    Returns the image (converted with cv2.COLOR_BGR2RGB after reading) with the ellipses drawn

    NOTE(review): `cv2.ellipse` expects half axis lengths while `cv2.fitEllipse` reports full
    axis lengths -- confirm which convention the callers pass in.
    """
    canvas = cv2.cvtColor(cv2.imread(img_filename), cv2.COLOR_BGR2RGB)
    for ellipse, _category in zip(ellipses, categories):
        colour = random_color()
        center = (int(ellipse[0][0]), int(ellipse[0][1]))
        axes = (int(ellipse[1][0]), int(ellipse[1][1]))
        cv2.ellipse(canvas, center, axes, ellipse[2], startAngle=0, endAngle=360, color=colour, thickness=thickness)
    return canvas

def draw_rotated_bboxes(img_filename, rboxes, texts, thickness=1, color=None):
    r"""
    Read the image at `img_filename`, convert it with cv2.COLOR_BGR2RGB and draw labelled
    rotated boxes on it through `draw_rotated_bboxes_on_image`

    Returns the image with the boxes and texts drawn
    """
    rgb = cv2.cvtColor(cv2.imread(img_filename), cv2.COLOR_BGR2RGB)
    return draw_rotated_bboxes_on_image(rgb, rboxes, texts, thickness, color)

def draw_rotated_bboxes_on_image(img, rboxes, texts, thickness=1, color=None):
    r"""
    Draw rotated boxes with a text label on a copy of `img`

    Args:
        img: image array; it is copied, the input is not modified
        rboxes: rotated rectangles accepted by `cv2.boxPoints`, e.g. ((cx, cy), (w, h), angle)
        texts: one label per box, drawn on a filled background at the box's minimum (x, y) corner
        thickness: line thickness of the boxes; also scales the label font
        color: colour for every box and label background; a random colour per box if None

    Returns the copy of `img` with boxes and labels drawn
    """
    img_draw = img.copy()
    font_scale = thickness / 3
    text_thickness = max(thickness - 1, 1)
    for rb, text in zip(rboxes, texts):
        c = random_color() if color is None else color
        # np.int0 was removed in NumPy 2.0; np.intp is the type it aliased
        box = cv2.boxPoints(rb).astype(np.intp)
        cv2.drawContours(img_draw, [box], 0, color=c, thickness=thickness)
        t_size = cv2.getTextSize(text, 0, fontScale=font_scale, thickness=thickness)[0]
        # label anchor: smallest x and smallest y over the four corners, as plain ints
        x, y = (int(coord) for coord in np.amin(box, axis=0))
        c1 = (x, y)
        c2 = (x + t_size[0], y - t_size[1] - 3)
        # use the colour chosen for this box (`color` itself may be None)
        cv2.rectangle(img_draw, c1, c2, color=c, thickness=-1, lineType=cv2.LINE_AA)  # filled
        cv2.putText(img_draw, text, (x, y - 2), 0, font_scale, [255, 255, 255], thickness=text_thickness, lineType=cv2.LINE_AA)
    return img_draw
    

def draw_polygons(img_filename, polygons, categories, thickness=1):
    r"""
    Draw polygons with their bounding box and a category label on the image read from `img_filename`

    Args:
        img_filename: path of the image file
        polygons: flat coordinate lists [x1, y1, x2, y2, ...]; values are cast to int32
        categories: one label text per polygon
        thickness: line thickness; also scales the label font

    Returns the image (converted with cv2.COLOR_BGR2RGB after reading) with the drawings;
    every polygon gets its own random colour
    """
    canvas = cv2.cvtColor(cv2.imread(img_filename), cv2.COLOR_BGR2RGB)
    font_scale = thickness / 3
    text_thickness = max(thickness - 1, 1)
    for flat, label in zip(polygons, categories):
        colour = random_color()
        # pair up the interleaved x / y values
        vertices = np.array(list(zip(flat[::2], flat[1::2])), np.int32)
        cv2.polylines(canvas, [vertices], isClosed=True, thickness=thickness, color=colour)
        # bounding box format: (tlx, tly, w, h)
        left, top, box_w, box_h = cv2.boundingRect(vertices)
        cv2.rectangle(canvas, pt1=(left, top), pt2=(left + box_w, top + box_h), color=colour, thickness=thickness, lineType=cv2.LINE_AA)
        text_w, text_h = cv2.getTextSize(label, 0, fontScale=font_scale, thickness=thickness)[0]
        # filled label background above the top left corner of the box
        cv2.rectangle(canvas, (left, top), (left + text_w, top - text_h - 3), color=colour, thickness=-1, lineType=cv2.LINE_AA)
        cv2.putText(canvas, label, (left, top - 2), 0, font_scale, [255, 255, 255], thickness=text_thickness, lineType=cv2.LINE_AA)
    return canvas
    

def draw_labelstudio_annotations(img_filename, annotations, draw_polygon=True, draw_boundingbox=True, delta=10, thickness=1):
    r"""
    Draw Label Studio ellipse annotations on the image read from `img_filename`

    Args:
        img_filename: path of the image file
        annotations: list of Label Studio ellipse annotation dicts
        draw_polygon: draw the polygon approximation of each ellipse
        draw_boundingbox: draw the non-rotated bounding box with a category label
        delta: polygon resolution, used for both the polygon and the bounding box
        thickness: line thickness; also scales the label font

    Returns the image (converted with cv2.COLOR_BGR2RGB after reading) with the drawings.
    Annotations that cannot be converted (e.g. "image_rotation" != 0) are left out.
    Colours are looked up in COLORS by category name.
    """
    img = cv2.imread(img_filename)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    if not (draw_polygon or draw_boundingbox):
        return img

    font_scale = thickness / 3
    text_thickness = max(thickness - 1, 1)
    for v in annotations:
        # One conversion per annotation, with the same delta for polygon and box
        success, poly, (x, y, w, h), category = gen_polygon_w_boundingbox_from_annotation(v, delta=delta)
        if not success:
            continue
        color = COLORS[category]

        if draw_polygon:
            cv2.polylines(img, [poly], isClosed=True, thickness=thickness, color=color)

        if draw_boundingbox:
            cv2.rectangle(img, pt1=(x, y), pt2=(x + w, y + h), color=color, thickness=thickness, lineType=cv2.LINE_AA)
            t_size = cv2.getTextSize(category, 0, fontScale=font_scale, thickness=thickness)[0]
            c1 = (x, y)
            c2 = (x + t_size[0], y - t_size[1] - 3)
            cv2.rectangle(img, c1, c2, color=color, thickness=-1, lineType=cv2.LINE_AA)  # filled
            cv2.putText(img, category, (x, y - 2), 0, font_scale, [255, 255, 255], thickness=text_thickness, lineType=cv2.LINE_AA)
    return img

def get_detectron2_dicts(img_dir, json_filename, delta=5):
    r"""
    Build the list of Detectron2 dataset records for the images of `img_dir`

    Args:
        img_dir: folder with the images; passed to `filter_annotations`
        json_filename: Label Studio JSON export; passed to `filter_annotations`
        delta: polygon resolution used when the ellipses are turned into polygons

    Returns a list of records with the fields "file_name", "image_id", "height", "width"
    and "annotations". Every annotation has an XYXY_ABS "bbox" taken from the min / max of
    the polygon vertices, the flattened polygon as "segmentation" and "category_id" 0.
    If an annotation was made on a different image size, or cannot be converted, an error is
    printed and an empty list is returned for the whole dataset.
    """
    records = []
    for image_id, entry in enumerate(filter_annotations(img_dir, json_filename)):
        image_path = entry["image"]
        height, width = cv2.imread(image_path).shape[:2]

        instances = []
        for anno in entry["annotations"]:
            # the annotation must have been made on an image of exactly this size
            if not (anno["original_width"] == width and anno["original_height"] == height):
                print("Generate record error!")
                return []

            ok, polygon, _bbox, _category = gen_polygon_w_boundingbox_from_annotation(anno, delta=delta)
            if not ok:
                print("Generate record error!")
                return []

            xs = [x for x, _ in polygon]
            ys = [y for _, y in polygon]
            # Convert from [[x1, y1], [x2, y2], ...] to [x1, y1, x2, y2, ...]
            flat = [float(coord) for point in polygon for coord in point]
            if len(flat) <= 4:
                # fewer than three vertices: leave this annotation out
                continue

            instances.append({
                "bbox": [np.min(xs), np.min(ys), np.max(xs), np.max(ys)],
                "bbox_mode": BoxMode.XYXY_ABS,
                "segmentation": [flat],
                "category_id": 0,  # single category
            })

        records.append({
            "file_name": image_path,
            "image_id": image_id,
            "height": height,
            "width": width,
            "annotations": instances,
        })

    return records


def binary_mask_to_polygon(binary_mask, tolerance=0):
    r""" Converts a binary mask to COCO polygon representation
    Args:
        binary_mask: a 2D binary numpy array where '1's represent the object
        tolerance: Maximum distance from original points of polygon to approximated polygonal chain. If tolerance is 0, the original coordinate array is returned.

    Returns a list of polygons, one per contour found in the mask. Each polygon is a flat
    list [x1, y1, x2, y2, ...] in the coordinates of the (unpadded) input mask.
    """
    def close_contour(contour):
        # repeat the first point at the end if the contour is still open
        if not np.array_equal(contour[0], contour[-1]):
            contour = np.vstack((contour, contour[0]))
        return contour

    polygons = []
    # pad mask to close contours of shapes which start and end at an edge
    padded_binary_mask = np.pad(binary_mask, pad_width=1, mode='constant', constant_values=0)
    contours = measure.find_contours(padded_binary_mask, 0.5)
    for contour in contours:
        # the contour is in padded coordinates; remove the 1 pixel padding offset
        contour = contour - 1
        contour = close_contour(contour)
        contour = measure.approximate_polygon(contour, tolerance)
        if len(contour) < 3:
            continue
        # swap the two coordinate columns so that x comes first
        contour = np.flip(contour, axis=1)
        # after padding and subtracting 1 we may get -0.5 points in our segmentation
        segmentation = np.maximum(contour, 0).ravel().tolist()
        polygons.append(segmentation)
    return polygons

def fit_polygens_to_ellipses(polygons):
    r"""
    Fit an ellipse to every polygon with `cv2.fitEllipse`

    Args:
        polygons: flat coordinate lists [x1, y1, x2, y2, ...]; values are cast to int32

    Returns the list of `cv2.fitEllipse` results, in the order of `polygons`
    """
    def as_points(flat):
        # [x1, y1, x2, y2, ...] -> int32 array of (x, y) rows
        return np.array(list(zip(flat[::2], flat[1::2])), np.int32)

    return [cv2.fitEllipse(as_points(flat)) for flat in polygons]

def fit_polygens_to_rotated_bboxes(polygons):
    r"""
    Compute the minimum-area rotated rectangle of every polygon with `cv2.minAreaRect`

    Args:
        polygons: flat coordinate lists [x1, y1, x2, y2, ...]; values are cast to float32

    Returns the list of rectangles ((cx, cy), (w, h), a), in the order of `polygons`
    """
    def as_points(flat):
        # [x1, y1, x2, y2, ...] -> float32 array of (x, y) rows
        return np.array(list(zip(flat[::2], flat[1::2])), np.float32)

    return [cv2.minAreaRect(as_points(flat)) for flat in polygons]
        

In [None]:
# Dataset summary: number of annotations per category for the full, train and val image folders
prj_path = pathlib.Path().absolute().parent

dataset_name = "stomata100"

json_filename = os.path.join(prj_path, "{}/labels/labels.json".format(dataset_name))

full_img_dir = os.path.join(prj_path, "{}/images".format(dataset_name))
train_img_dir = os.path.join(prj_path, "{}/train".format(dataset_name))
val_img_dir = os.path.join(prj_path, "{}/val".format(dataset_name))

# The same label file is filtered against each image folder
labelstudio_annotations = filter_annotations(full_img_dir, json_filename)
train_labelstudio_annotations = filter_annotations(train_img_dir, json_filename)
val_labelstudio_annotations = filter_annotations(val_img_dir, json_filename)

for ls_annotation in (labelstudio_annotations, train_labelstudio_annotations, val_labelstudio_annotations):
    summary = {}
    for v in ls_annotation:
        annotations = v["annotations"]
        for anno in annotations:
            # the first label of the ellipse is its category
            category = anno['value']['ellipselabels'][0]
            summary[category] = summary.get(category, 0) + 1
    print(summary)

In [None]:
# Draw ground truth on image data and save on local disk
# Output goes to <project>/<dataset_name>/images_w_gt, one file per annotated image,
# under the same base name as the source image.
prj_path = pathlib.Path().absolute().parent
dataset_name = "stomata100"
json_filename = os.path.join(prj_path, "{}/labels/labels.json".format(dataset_name))

full_img_dir = os.path.join(prj_path, "{}/images".format(dataset_name))
# NOTE(review): train_img_dir and val_img_dir are not used in this cell
train_img_dir = os.path.join(prj_path, "{}/train".format(dataset_name))
val_img_dir = os.path.join(prj_path, "{}/val".format(dataset_name))

labelstudio_annotations = filter_annotations(full_img_dir, json_filename)
save_dir = os.path.join(prj_path, "{}/images_w_gt".format(dataset_name))
os.makedirs(save_dir, exist_ok=True)

for anno in labelstudio_annotations:
    img_filename = anno["image"]
    annotations = anno["annotations"]
    img = draw_labelstudio_annotations(img_filename, annotations, thickness=2)
    save_filename = os.path.join(save_dir, os.path.basename(img_filename))
    # draw_labelstudio_annotations returns an RGB image; the conversion swaps the R and B
    # channels again before the image is written with cv2.imwrite
    cv2.imwrite(save_filename, cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

In [None]:
# Register the train and val splits with Detectron2 as "<dataset_name>_train" / "<dataset_name>_val"
prj_path = pathlib.Path().absolute().parent
dataset_name = "stomata100"

json_filename = os.path.join(prj_path, "{}/labels/labels.json".format(dataset_name))

# Get Detectron2 ground truth
for d in ["train", "val"]:
    catelog_name = "{}_{}".format(dataset_name, d)
    # Drop an earlier registration so that the cell can be run again
    if catelog_name in DatasetCatalog:
        DatasetCatalog.remove(catelog_name)
    if catelog_name in MetadataCatalog:
        MetadataCatalog.remove(catelog_name)
    
    # NOTE(review): img_dir is not used below, and it points to <project>/<dataset_name>_<split>
    # while the registered loader reads <project>/<dataset_name>/<split> -- confirm and remove
    img_dir = os.path.join(prj_path, catelog_name)
    # `d=d` binds the current split name to the lambda; the records are built when it is called
    DatasetCatalog.register(catelog_name, 
        lambda d=d: get_detectron2_dicts(os.path.join(prj_path, "{}/{}".format(dataset_name, d)), json_filename))
    MetadataCatalog.get(catelog_name).set(thing_classes=["stomata"])

# Metadata of the train split; used by the visualizer in the inference cell
stomata_metadata = MetadataCatalog.get("{}_train".format(dataset_name))

In [None]:
# Inference should use the config with parameters that are used in training.
# The config is rebuilt here from the model zoo file with the training settings, and
# MODEL.WEIGHTS is then pointed at the trained checkpoint (see the end of this cell).

dataset_name = "stomata100"

cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
cfg.DATASETS.TRAIN = ("{}_train".format(dataset_name),)
cfg.DATASETS.TEST = ("{}_val".format(dataset_name), )
cfg.DATALOADER.NUM_WORKERS = 2
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")  # model zoo weights; replaced by the trained checkpoint at the end of this cell
cfg.SOLVER.IMS_PER_BATCH = 2  # This is the real "batch size" commonly known to deep learning people
cfg.SOLVER.BASE_LR = 0.00025  # pick a good LR
cfg.SOLVER.MAX_ITER = 1000    # you may need to train longer for a practical dataset
cfg.SOLVER.STEPS = []        # do not decay learning rate
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128   # The "RoIHead batch size". 128 is faster, and good enough for this toy dataset (default: 512)
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1  # only has one class (stomata). (see https://detectron2.readthedocs.io/tutorials/datasets.html#update-the-config-for-new-datasets)
# NOTE: this config means the number of classes, but a few popular unofficial tutorials incorrectly use num_classes+1 here.
cfg.MODEL.DEVICE = "cpu"    # run on CPU


# Folder of the training run whose final checkpoint is loaded
model_folder_name = "stomata100_output_ep8000_2022_10_06_04_36_29"
cfg.OUTPUT_DIR = "./{}".format(model_folder_name)
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")  # path to the trained model; overrides the model zoo weights set above


In [None]:
# Run inference on the validation images for the score thresholds 0.1 ... 0.9 and save, per image:
#   - the Detectron2 instance segmentation visualisation,
#   - the rotated bounding boxes fitted to the predicted masks,
#   - a three panel benchmark figure (prediction / rotated boxes / ground truth).
dataset_name = "stomata100"

# Paths and ground truth do not depend on the threshold: prepare them once, before first use
prj_path = pathlib.Path().absolute().parent
json_filename = os.path.join(prj_path, "{}/labels/labels.json".format(dataset_name))
full_img_dir = os.path.join(prj_path, "{}/images".format(dataset_name))
train_img_dir = os.path.join(prj_path, "{}/train".format(dataset_name))
val_img_dir = os.path.join(prj_path, "{}/val".format(dataset_name))

# Both lists come from filter_annotations on the same folder, so index idx refers to the same image
val_dataset_dicts = get_detectron2_dicts(val_img_dir, json_filename, delta=5)
labelstudio_annotations = filter_annotations(val_img_dir, json_filename)

for i in range(1,10):
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = i/10   # set a custom testing threshold

    # Start from an empty output folder for this threshold
    save_dir = os.path.join(
        prj_path,
        "{}/{}_val_outputs/thres_{}".format(model_folder_name, dataset_name, cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST))
    shutil.rmtree(save_dir, ignore_errors=True)
    os.makedirs(save_dir)

    # A new predictor is built for every threshold so that the changed cfg is picked up
    predictor = DefaultPredictor(cfg)

    for idx, d in enumerate(val_dataset_dicts):
        im = cv2.imread(d["file_name"])
        outputs = predictor(im)  # format is documented at https://detectron2.readthedocs.io/tutorials/models.html#model-output-format
        # Single CPU copy of the predictions, used for drawing and for the mask conversion
        instances = outputs["instances"].to("cpu")

        v = Visualizer(im[:, :, ::-1],
                        metadata=stomata_metadata,
                        scale=1,
    #                     instance_mode=ColorMode.IMAGE_BW   # remove the colors of unsegmented pixels. This option is only available for segmentation models
            )
        out = v.draw_instance_predictions(instances)

        fig = plt.figure(figsize=(12, 8), dpi=300)
        fig.add_subplot(1, 3, 1)
        plt.imshow(out.get_image())
        plt.axis('off')
        plt.title("Pred: {}".format(os.path.basename(d["file_name"])))

        filename, file_extension = os.path.splitext(os.path.basename(d["file_name"]))
        # Set file extension to JPEG
        file_extension = ".jpg"
        save_filename = os.path.join(
            save_dir,
            "{}_thres_{}_inst_seg{}".format(filename, cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST, file_extension))
        cv2.imwrite(save_filename, cv2.cvtColor(out.get_image(), cv2.COLOR_BGR2RGB))

        img_filename = labelstudio_annotations[idx]["image"]
        annotations = labelstudio_annotations[idx]["annotations"]

        # One polygon per mask contour; every polygon carries the score of its instance
        pred_masks = instances.pred_masks
        pred_categories = instances.pred_classes
        pred_scores = instances.scores
        polygons, categories = [], []
        for mask, category, score in zip(pred_masks, pred_categories, pred_scores):
            po = binary_mask_to_polygon(mask)
            polygons += po
            categories += ["stomata {:.0%}".format(score)] * len(po)

        # Draw fitted bounding boxes
        fitted_rbboxs = fit_polygens_to_rotated_bboxes(polygons)
        img_draw = draw_rotated_bboxes(img_filename, fitted_rbboxs, categories, thickness=2, color=None)
        fig.add_subplot(1, 3, 2)
        plt.imshow(img_draw)
        plt.axis('off')
        plt.title("Pred_rotated_bboxes: {}".format(os.path.basename(d["file_name"])))
        save_filename = os.path.join(
            save_dir,
            "{}_thres_{}_rotated_bbox{}".format(filename, cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST, file_extension))
        cv2.imwrite(save_filename, cv2.cvtColor(img_draw, cv2.COLOR_BGR2RGB))

        # Draw annotation ground truth
        img = draw_labelstudio_annotations(img_filename, annotations, thickness=2)
        fig.add_subplot(1, 3, 3)
        plt.imshow(img)
        plt.axis('off')
        plt.title("Ground truth: {}".format(os.path.basename(d["file_name"])))

        save_filename = os.path.join(
            save_dir,
            "benchmark_{}_thres_{}{}".format(filename, cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST, file_extension))
        fig.savefig(save_filename, bbox_inches='tight')
        # The figure is on disk now; close it so that open figures do not pile up in memory
        plt.close(fig)
        
        

We can also evaluate the model's performance using the AP metric implemented in the COCO API.

In [None]:
from detectron2.evaluation import COCOEvaluator, inference_on_dataset
from detectron2.data import build_detection_test_loader

# COCO style evaluation of the validation split for the score thresholds 0.1 ... 0.9
dataset_name = "stomata100"

split = "{}_val".format(dataset_name)

results = []
for i in range(1, 10):
    thres = i/10
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = thres   # set a custom testing threshold
    cfg.INPUT.MIN_SIZE_TEST = 0 # disable resize in testing

    predictor = DefaultPredictor(cfg)
    evaluator = COCOEvaluator(split, output_dir=cfg.OUTPUT_DIR)
    val_loader = build_detection_test_loader(cfg, split)
    dataset_result = inference_on_dataset(predictor.model, val_loader, evaluator)

    # Keep AP, AP50 and AP75 of the box and mask results, together with the threshold
    res = {"thres": thres}
    for task in ("bbox", "segm"):
        res[task] = {metric: dataset_result[task][metric] for metric in ("AP", "AP50", "AP75")}

    results.append(res)
    # another equivalent way to evaluate the model is to use `trainer.test`

In [None]:
# print(cfg)

# Print AP, AP50 and AP75 of the box and mask results for every evaluated threshold
for v in results:
    print("\n==========================================")
    print("\nThres = {}".format(v["thres"]))
    for header, task in (("\n=== bbox ===", "bbox"), ("=== segm ===", "segm")):
        print(header)
        for k in ["AP", "AP50", "AP75"]:
            print("{}: {}".format(k, v[task][k]))