# SLAP Inference

In [None]:
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import posixpath

import requests
from io import BytesIO
from PIL import Image, ImageOps
import numpy as np
import pandas as pd

import inm
import idf

In [None]:
# this makes our figures bigger
pylab.rcParams['figure.figsize'] = 20, 12

Those are the relevant imports for the detection model

In [None]:
from maskrcnn_benchmark.config import cfg
from predictor import COCODemo

We provide a helper class `COCODemo`, which loads a model from the config file, and performs pre-processing, model prediction and post-processing for us.

We can configure several model options by overriding the config options.
In here, we make the model run on the CPU

In [None]:
config_file = "../configs/e2e_mask_rcnn_R_50_FPN_1x.yaml"

# update the config options with the config file
cfg.merge_from_file(config_file)

# Switch between model Finger and Finger-Tips
#cfg.merge_from_list(["MODEL.DEVICE", "cpu", "MODEL.WEIGHT", "../models/fingers/model_final.pth"])
cfg.merge_from_list(["MODEL.DEVICE", "cuda", "MODEL.WEIGHT", "../models/finger-tips/model_final.pth"])

In [None]:
coco_demo = COCODemo(
    cfg,
    min_image_size=800,
    confidence_threshold=0.7,
)
slap_wsize = 800

#SLAP1
slap_idf = "../datasets/slaps/SLAP1_gt_20190408_idf.h5"
prefix = "../datasets/slaps/slap1"

#SLAP2
#slap_idf = "../datasets/slaps/SLAP2_gt_20190408_idf.h5"
#prefix = "../datasets/slaps/slap2"

In [None]:
def load_img(path, scale=0.5):
    pil_image = Image.open(path).convert("RGB")
    w, h = pil_image.size
    scaled_w, scaled_h = int(scale * w), int(scale * h)
    pil_image = pil_image.resize((scaled_w, scaled_h))
    # convert to BGR format
    image = np.array(pil_image)[:, :, [2, 1, 0]]
    w_pad = slap_wsize - scaled_w
    w_pad = w_pad if w_pad > 0 else 0
    h_pad = slap_wsize - scaled_h
    h_pad = h_pad if h_pad > 0 else 0
    image = np.pad(image, ((0, h_pad),(0, w_pad),(0,0)), 'constant', constant_values=255)
    image = image[:800, :800]
    return image, scale

def iterate_images():
    reader =  idf.reader.Reader(slap_idf)
    for i_img, _ in enumerate(reader.img_ids):
        ann = reader.annotations(i_img, idf.utils.EAnnotationType.BoundingBox)
        img = reader.image(i_img)
        yield img, ann

def get_image(i_img):
    reader =  idf.reader.Reader(slap_idf)
    ann = reader.annotations(i_img, idf.utils.EAnnotationType.BoundingBox)
    img = reader.image(i_img)
    return img, ann
        

def dataframe_gt(img_ids, ann_ids, bboxes) -> pd.DataFrame:
    data = [{
        "img_id": img_id, "ann_id": ann_id,
        "bbox_up": bbox[1], "bbox_left": bbox[0],
        "bbox_height": bbox[3], "bbox_width": bbox[2]
        }
        for img_id, ann_id, bbox in zip(img_ids, ann_ids,  bboxes)
    ]

    return pd.DataFrame(data)

def dataframe_det(img_ids, det_ids, bboxes, scores) -> pd.DataFrame:
    data = [{
        "img_id": img_id, "det_id": ann_id,
        "bbox_up": bbox[1], "bbox_left": bbox[0],
        "bbox_height": bbox[3], "bbox_width": bbox[2],
        "det_score": score,
        }
        for img_id, ann_id, bbox, score in zip(img_ids, det_ids,  bboxes, scores)
    ]

    return pd.DataFrame(data)    

In [None]:
img_ids = []
ann_ids = []
bboxes = []
img_ids_dt = []
bboxes_dt = []
scores_dt = []
i = 0
for iret, aret in iterate_images(): 
    img_ids += [iret.img_id] * len(aret.ann_id)
    ann_ids += aret.ann_id.tolist()
    bboxes += aret.ann
    
    path = posixpath.join(prefix, iret.img_path)
    img, ratio = load_img(path, 0.5)
    predictions = coco_demo.compute_prediction(img)
    top_predictions = coco_demo.select_top_predictions(predictions).convert("xywh")
    scores_dt += top_predictions.get_field("scores").tolist()
    img_ids_dt += [iret.img_id] * len(top_predictions.get_field("scores"))
    bboxes_dt += (top_predictions.bbox / ratio).tolist()

    

In [None]:
gt = dataframe_gt(img_ids, ann_ids, bboxes)
dt = dataframe_det(img_ids_dt, list(range(len(scores_dt))), bboxes_dt, scores_dt)

In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Assign detections to ground truth

The assignment is based on the Intersection over Union (IoU) and
the detection score. The definitions of True Positive (tp),
False Positive (tp) and False Negative (fn) follows the Pascal VOC and
later on the COCO.
https://www.robots.ox.ac.uk/~vgg/publications/2015/Everingham15/everingham15.pdf

The assignment is computed for the given range of IoU thresholds.

Result is represented as a pandas DataFrame with columns

 - img_id : image id
 - ann_id : ground truth annotation id
 - det_id : detection annotation id
 - iou : IoU between ann_id and det_id
 - det_score : detection score of the assigned detection
 - label : assigned label, 0 (fp), 1 (tp), or 2 (fn) - see DALabel enum

In a case the input is list of lists, ann_id and det_id correspond to
particular index in the list

Example:
-------

>>> gts = [[[10, 10, 64, 64], [200, 200, 64, 64], [110, 110, 64, 64], [170, 20, 64, 64]]]
>>> dts = [[[10, 40, 64, 64], [50, 10, 64, 64], [300, 300, 64, 64], [180, 50, 64, 64]]]
>>> dt_scores = [np.asarray([0.85, 0.78, 0.65, 0.7], dtype=np.float32)]
>>> # Get assignment for a IoU threshold of 0.2
>>> da = DetectionAssignment(gts, dts, dt_scores, (0.2, ))
>>> # Get the assignment for the first given IoU threshold
>>> assigned = da.assign()[0]; assigned
   img_id  ann_id  det_id       iou  det_score  label
0       0       0       0  0.361702       0.85      1
1       0       3       3  0.288861       0.70      1
2       0      -1       1  0.000000       0.78      0
3       0      -1       2  0.000000       0.65      0
4       0       1      -1  0.000000       0.00      2
5       0       2      -1  0.000000       0.00      2
>>> # Get the labels and scores for scikit-learn related functions
>>> labels, scores = true_targets(assigned)
>>> labels
[0, 0, 1, 1, 1, 1]
>>> np.asarray(scores, dtype=np.float32) # Get rid of float64 in list
array([0.78, 0.65, 0.85, 0.7 , 0.  , 0.  ], dtype=float32)
"""

from collections import OrderedDict, namedtuple
from enum import IntEnum
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from inm.df import ANN, BBOX, DET, IMG, IOU, LABEL

__all__ = [
    "DALabel",
    "DetectionAssignment",
    "true_targets",
    "IOU_THRS",
    "per_image_label_counts",
    "per_iou_image_label_counts",
    "per_iou_totals",
]


class DALabel(IntEnum):
    undefined = -1
    fp = 0
    tp = 1
    fn = 2
    tn = 3


IOU_THRS = "iou_thrs" # IoU threshold values as named index


class DetectionAssignment:
    """DetectionAssignment constructor.

    Args:
        gts (Union[List, pd.Dataframe]): Ground truths
        dts (Union[List, pd.Dataframe]): Detections
        scores (Optional[List], optional): Detection scores. Defaults to None.
        iou_thrs (Union[List, Tuple], optional): IoU threshold. Defaults to (0.5,).

    Note:
        If dts is of type List, dt_scores has to be provided as well.
    """

    GtDt = namedtuple("GtDt", ["gt", "dt"])

    def __init__(
            self, gts: Union[List, pd.DataFrame],
            dts: Union[List, pd.DataFrame],
            scores: Optional[List] = None,
            iou_thrs: Union[List, Tuple] = (0.5,)):
        if isinstance(gts, list):
            gts = self._list_to_df(gts)
        if isinstance(dts, list):
            if scores is None:
                raise ValueError('Missing dt_scores')
            dts = self._list_to_df(dts)
            dts = dts.rename(columns={ANN.ID: DET.ID})
            dts[DET.SCORE] = [DET.SCORE.dtype(s) for sc in scores for s in sc]

        self.gts = gts.set_index(ANN.ID, drop=False)
        self.dts = dts.set_index(DET.ID, drop=False)  # fp score selection in _label_detections
        self.iou_thrs = iou_thrs

    def _list_to_df(self, data):
        n_img = len(data)
        ann_id = [i_ann for bboxes in data for i_ann, _ in enumerate(bboxes)] #list(range(sum([len(bboxes) for bboxes in data])))

        imgs = []
        for img_id, n_img_id in zip(list(range(n_img)), [len(d) for d in data]):
            imgs += [img_id] * n_img_id

        dataframe = {
            IMG.ID: imgs,
            ANN.ID: ann_id,
            BBOX.Y0: [box[0] for d in data for box in d],
            BBOX.X0: [box[1] for d in data for box in d],
            BBOX.HEIGHT: [box[2] for d in data for box in d],
            BBOX.WIDTH: [box[3] for d in data for box in d],
        }

        return pd.DataFrame(dataframe)

    def _iou_mat(self, gt: np.ndarray, dt: np.ndarray) -> np.ndarray:
        """Compute IoU of all gt and dt bboxes."""
        tiled_gt = np.tile(gt, (len(dt), 1))
        repeated_dt = np.repeat(dt, len(gt), axis=0)

        gt_y_end = tiled_gt[:, 0] + tiled_gt[:, 2]
        gt_x_end = tiled_gt[:, 1] + tiled_gt[:, 3]
        dt_x_end = repeated_dt[:, 1] + repeated_dt[:, 3]
        dt_y_end = repeated_dt[:, 0] + repeated_dt[:, 2]

        y_max = np.maximum(tiled_gt[:, 0], repeated_dt[:, 0])
        y_min = np.minimum(gt_y_end, dt_y_end)
        x_max = np.maximum(tiled_gt[:, 1], repeated_dt[:, 1])
        x_min = np.minimum(gt_x_end, dt_x_end)

        height = tiled_gt[:, 2]
        width = tiled_gt[:, 3]

        intersection_height = y_min - y_max
        intersection_width = x_min - x_max

        i_iou = (intersection_height > 0) & (intersection_width > 0)

        area_intersection = intersection_height * intersection_width
        area_gt = height * width
        area_dt = repeated_dt[:, 2] * repeated_dt[:, 3]

        denom = area_dt + area_gt - area_intersection

        iou = np.zeros(len(tiled_gt), dtype=np.float)
        iou[i_iou] = area_intersection[i_iou] / denom[i_iou]

        return iou.reshape(len(dt), len(gt)).T

    def _label_detections(self, iou_score, dfs, iou_thr=0.5) -> pd.DataFrame:
        """Label detections based on IoU.

        Note:
            One can not define empty dataframe with given column dtypes.
            The only way is to create a dataframe from
            structured numpy array.
        """
        # Sort by score and iou
        dt = iou_score.query(f'{IOU} >= @iou_thr').sort_values(
            [DET.SCORE, IOU], ascending=False)
        # Define dtype of possible empty dataframes
        dtypes = [
            (ANN.ID, ANN.ID.dtype), (DET.ID, DET.ID.dtype), (IOU, IOU.dtype),
            (DET.SCORE, DET.SCORE.dtype), (LABEL, LABEL.dtype)]
        # TP
        i_dt = dt.index[dt.duplicated(DET.ID).values == False]
        i_tp = dt.index[dt.loc[i_dt].duplicated(ANN.ID).values == False]
        tp = dt.loc[i_tp].assign(**{LABEL: DALabel.tp})
        # FP
        id_fp = list(set(dfs.dt[DET.ID]) - set(tp[DET.ID]))
        n_id_fp = len(id_fp)
        fpl = [
            [ANN.ID.undefined] * n_id_fp, id_fp, [0.] * n_id_fp,
            dfs.dt.loc[id_fp, DET.SCORE], [DALabel.fp] * n_id_fp]
        fp = np.array(list(zip(*fpl)), dtype=dtypes)
        fp = pd.DataFrame(fp)
        # FN
        id_fn = list(set(dfs.gt[ANN.ID]) - set(tp[ANN.ID]))
        n_id_fn = len(id_fn)
        fnl = [
            id_fn, [DET.ID.undefined] * n_id_fn, [0.] * n_id_fn,
            [DET.SCORE.dtype(0.)] * n_id_fn, [DALabel.fn] * n_id_fn]
        fn = np.array(list(zip(*fnl)), dtype=dtypes)
        fn = pd.DataFrame(fn)

        return pd.concat([tp, fp, fn], ignore_index=True)

    def _df_iou(self, gt, dt):
        if gt.shape[0] == 0:  # No ground truth
            iou = np.zeros((1, len(dt.index)), dtype=IOU.dtype)
            return pd.DataFrame(iou, index=[ANN.ID.undefined], columns=dt.index)
        elif dt.shape[0] == 0:  # No detections
            iou = np.zeros((len(gt.index), 1), dtype=IOU.dtype)
            return pd.DataFrame(iou, index=gt.index, columns=[DET.ID.undefined])

        array_gt = gt[BBOX.y_x_height_width].values
        array_dt = dt[BBOX.y_x_height_width].values

        iou = self._iou_mat(array_gt, array_dt)
        return pd.DataFrame(iou, index=gt.index, columns=dt.index)

    def _df_score(self, scores, iou):
        score = iou.copy()
        if len(scores) == 0:
            score.loc[iou.index, iou.columns] = 0.
            return score
        score.loc[iou.index, iou.columns] = np.asarray([scores.tolist()] * len(iou))
        return score

    def _df_melt_iou_score(self, iou, score):
        iou[ANN.ID] = iou.index
        df = iou.melt(id_vars=ANN.ID, var_name=DET.ID, value_name=IOU)
        # adding iou.index as a new column named ANN.ID changes column
        # index type from int to object type which need to be typed back
        # to DET.ID.dtype (int)
        df[DET.ID] = df[DET.ID].astype(DET.ID.dtype)

        score[ANN.ID] = score.index
        score = score.melt(id_vars=ANN.ID, var_name=DET.ID, value_name=DET.SCORE)

        df[DET.SCORE] = score[DET.SCORE]
        return df

    def assign(self):
        dt_ids = self.dts[IMG.ID]
        gt_ids = self.gts[IMG.ID]
        id_images = sorted(list(set(dt_ids).union(set(gt_ids))))

        tdfs = []
        dfs_iou = []
        dfs_score = []
        dfs_iou_score = []
        for id_img in id_images:
            gt = self.gts.query(f'{IMG.ID} == @id_img')
            dt = self.dts.query(f'{IMG.ID} == @id_img')
            tdfs.append(self.GtDt(gt, dt))  # Avoid repeating the query
            iou = self._df_iou(gt, dt)
            score = self._df_score(dt[DET.SCORE], iou)
            dfs_iou.append(iou)
            dfs_score.append(score)
            dfs_iou_score.append(self._df_melt_iou_score(iou, score))

        assigned = []
        for iou_thr in self.iou_thrs:
            labeled = []
            for id_img, iou_score, dfs in zip(id_images, dfs_iou_score, tdfs):
                df_label = self._label_detections(iou_score, dfs, iou_thr)
                df_label.insert(0, IMG.ID, id_img)
                labeled.append(df_label)

            labeled_iou_thr = pd.concat(labeled, ignore_index=True)
            assigned.append(labeled_iou_thr)

        return assigned


def true_targets(assigned: pd.DataFrame) -> Tuple[List, List]:
    """Filter out inputs for scikit-learn methods.

    Args:
        assigned (pd.DataFrame): labeled data of DetectionAssignment

    Returns:
        Tuple: labels and scores
    """
    grouped = assigned.groupby(LABEL)

    switch = {
        DALabel.tp: lambda x: [1] * len(x),
        DALabel.fn: lambda x: [1] * len(x),
        DALabel.fp: lambda x: [0] * len(x),
    }

    scores = []
    labels = []
    for label, group in grouped:
        if label in switch:
            scores += group[DET.SCORE].tolist()
            labels += switch[label](group)

    return labels, scores


def per_image_label_counts(assigned: pd.DataFrame) -> pd.DataFrame:
    """Get per image tp, fp, fn counts.

    Args:
        assigned (pd.DataFrame): labeled data of DetectionAssignment

    Returns:
        pd.DataFrame
    """
    image_assign = assigned[[LABEL, IMG.ID]].pivot_table(
        index=IMG.ID, columns=[LABEL], aggfunc=len, fill_value=0)
    image_assign.columns.name = None
    image_assign.reset_index(inplace=True)

    return image_assign


def per_iou_image_label_counts(das: list, ious: tuple) -> pd.DataFrame:
    """Get per IoU per image tp, fp, fn counts.

    Args:
        das (list): detection assignments
        ious (tuple): IoU thresholds

    Returns:
        pd.DataFrame
    """
    assert len(das) == len(ious), "das and ious must have same length"

    iou_assign = pd.concat(das, keys=ious)
    iou_assign.index.rename([IOU_THRS, None], inplace=True)

    iou_image_assign = iou_assign[[LABEL, IMG.ID]].pivot_table(
        index=(IOU_THRS, IMG.ID), columns=[LABEL], aggfunc=len, fill_value=0)
    iou_image_assign.columns.name = None

    return iou_image_assign


def per_iou_totals(iou_image_assign: pd.DataFrame) -> pd.DataFrame:
    """Get total counts of tp, fp, fn per IoU.

    Args:
        iou_image_assign (pd.DataFrame): per IoU per image assignment

    Returns:
        pd.DataFrame
    """
    return iou_image_assign.pivot_table(
        index=(IOU_THRS),
        values=(DALabel.tp, DALabel.fp, DALabel.fn),
        aggfunc=np.sum)


In [None]:
da = DetectionAssignment(gt, dt, iou_thrs=(0.5,))
assigned = da.assign()

In [None]:
iou_image_assign = per_iou_image_label_counts(assigned, (0.5, ))
iou_image_assign.sum(axis=0)

In [None]:
with pd.HDFStore("slap-detections.h5", "w") as hdf:
    hdf["data"] = dt