In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.patches as patches

from collections import defaultdict

import matplotlib.pyplot as plt
plt.style.use('ggplot')
plt.rcParams["figure.figsize"] = (15, 10)
block_plot = False

In [None]:
# Compute intersection over union of detected boxes with GTs.
def get_iou(predicted_dbox, ground_truth_dboxes):
    """
        Calculates IoU (Jaccard index) of two detection boxes:
            predicted_dbox ∩ ground_truth_dbox / (area(predicted_dbox) +
            area(ground_truth_dbox) - predicted_dbox ∩ ground_truth_dbox)

        Parameters:
            Coordinates of detection boxes are supposed to be in the following form: [x1, y1, x2, y2]
            predicted_dbox: [tensor] predicted detection boxe
            ground_truth_dboxes: [tensor] ground truth detection boxes

        Return value:
            overlap area
    """
    # Get coordinates of boxes to prepare overlap data.
    xmin = tf.math.maximum(ground_truth_dboxes[:, 0], predicted_dbox[0])
    ymin = tf.math.maximum(ground_truth_dboxes[:, 1], predicted_dbox[1])
    xmax = tf.math.minimum(ground_truth_dboxes[:, 2], predicted_dbox[2])
    ymax = tf.math.minimum(ground_truth_dboxes[:, 3], predicted_dbox[3])

    width = tf.math.maximum(xmax - xmin + 1., 0.0)
    height = tf.math.maximum(ymax - ymin + 1., 0.0)
    intersection_area = width * height

    # Calculate the union value.
    union = ((predicted_dbox[2] - predicted_dbox[0] + 1.) * (predicted_dbox[3] - predicted_dbox[1] + 1.) +
             (ground_truth_dboxes[:, 2] - ground_truth_dboxes[:, 0] + 1.) *
             (ground_truth_dboxes[:, 3] - ground_truth_dboxes[:, 1] + 1.) - intersection_area)

    return intersection_area / union

In [None]:
# Check if intersections of predicted and ground-truth boxes are greater than predefined threshold.
def check_iou_threshold(predicted_dbox, ground_truth_dboxes, intersection_threshold):
    """
        Get the predictions with an appropriate IoU area for further true positives computations

        Parameters:
        predicted_dboxes: predicted by the detector detection boxes
        ground_truth_dboxes: ground truth
        intersection_threshold: IoU threshold

        Return value:
            tensor with the following values:
                    True - if the IoU passed defined threshold
                    False - if the IoU did not pass defined threshold
            index of the maximum IoU value
    """
    intersection_over_union = get_iou(predicted_dbox, ground_truth_dboxes)
    return  (
                tf.math.reduce_max(intersection_over_union) >= intersection_threshold,
                tf.math.argmax(intersection_over_union)
            )

In [None]:
def plot_groundtruth_prediction_boxes(ground_truths, predictions):
    """
    "Plots ground truth and prediction bounding boxes"

    Parameters:

    gt_bboxes (list): a list of ground truths.
                      Format:
                      [
                          [Image_ID(str), [x_min, y_min, x_max, y_max] (tf.Variable)],
                          [Image_ID(str), [x_min, y_min, x_max, y_max] (tf.Variable)],
                                         :
                                         :
                          [Image_ID(str), [x_min, y_min, x_max, y_max] (tf.Variable)]

                      ]

    pred_bboxes (list): a list of detected bounding box data containing ImageID, boxes, conf. scores.
                        Format:
                        [
                          [Image_ID(str), [x_min, y_min, x_max, y_max](tf.Variable), prob_1(tf.Variable)],
                          [Image_ID(str), [x_min, y_min, x_max, y_max](tf.Variable), prob_2(tf.Variable)],
                                                     :
                                                     :
                          [Image_ID(str), [x_min, y_min, x_max, y_max](tf.Variable), prob_n(tf.Variable)]

                        ]
    """

    box_data = [ground_truths, predictions]

    sorted_ind = [i for i,_ in sorted(enumerate(predictions), key=lambda conf: conf[1][2], reverse=True)]


    colors = ['b', 'r']
    label_prefix = ['g-truth', 'pred']

    fig = plt.figure(figsize=(15, 10))
    ax = fig.add_subplot(111, aspect='equal')

    for boxtype_i, boxes in enumerate(box_data):

        dbox_data = tf.stack([box[1] for box in boxes])

        width = dbox_data[:, 2] - dbox_data[:, 0] + 1
        height = dbox_data[:, 3] - dbox_data[:, 1] + 1

        for i in range(tf.shape(dbox_data)[0]):

            # Ground Truths.
            if boxtype_i == 0:
                index = i
                label = '{}. {}'.format(index, label_prefix[0])

            # Detections.
            else:
                index = sorted_ind[i]
                label = '{0}. {1}({2:.2})'.format(index, label_prefix[1], predictions[index][2].numpy())


            x_label = dbox_data[index, 0] + 8
            y_label = dbox_data[index, 3] - 10

            ax.add_patch(
                patches.Rectangle(
                    (dbox_data[index, 0], dbox_data[index, 1]),
                    width[index],
                    height[index],
                    fill=False,      # remove background
                    color=colors[boxtype_i],
                    linewidth = 3,
            )
            )



            ax.text(x_label, y_label, label, color=colors[boxtype_i],
                    bbox=dict(facecolor='none', edgecolor=colors[boxtype_i]))


    plt.xticks(range(0, 1550, 50))
    plt.yticks(range(0, 1050, 50))

    plt.gca().invert_yaxis()
    plt.gca().axis('tight')
    plt.show(block=block_plot)

    return

In [None]:
# bounding boxes / detection
ground_truths = [
                 ['1', tf.Variable([ 100, 500,  300, 950], dtype=tf.float32)],
                 ['1', tf.Variable([ 350, 550,  570, 930], dtype=tf.float32)],
                 ['1', tf.Variable([ 600, 200,  850, 600], dtype=tf.float32)],
                 ['1', tf.Variable([ 900, 10,  1100, 380], dtype=tf.float32)],
                 ['1', tf.Variable([ 980, 500, 1200, 950], dtype=tf.float32)],
                 ['1', tf.Variable([1250, 300, 1470, 800], dtype=tf.float32)],
                ]

# bounding boxes / detection
detections = [
               ['1', tf.Variable([ 120, 450,  325, 900], dtype=tf.float32), tf.Variable(.67)],
               ['1', tf.Variable([ 340, 500,  580, 970], dtype=tf.float32), tf.Variable(.87)],
               ['1', tf.Variable([ 620, 300,  870, 800], dtype=tf.float32), tf.Variable(.65)],
               ['1', tf.Variable([ 820, 180, 1170, 440], dtype=tf.float32), tf.Variable(.56)],
               ['1', tf.Variable([1000, 550, 1220, 980], dtype=tf.float32), tf.Variable(.89)],
               ['1', tf.Variable([1300, 320, 1450, 750], dtype=tf.float32), tf.Variable(.99)],
               ['1', tf.Variable([ 350, 100,  580, 450], dtype=tf.float32), tf.Variable(.51)],
               ['1', tf.Variable([ 950, 490, 1180, 960], dtype=tf.float32), tf.Variable(.83)],
             ]

In [None]:
# Get the indices of sorted (decreasing order) prediction confidence.
# These indices will be used to prioritize detection.
s_ind = [i for i,_ in sorted(enumerate(detections), key=lambda conf: conf[1][2], reverse=True)]

all_gt_dboxes = tf.stack([gt[1] for gt in ground_truths])

# The IoU threshold.
iou_thres = 0.5

for i, s_index in enumerate(s_ind):
    pred_conf = detections[s_index][2].numpy()

    print('Predicted Bounding Box index: {0}, prob: {1:0.3}'.format(s_index, pred_conf))
    qualified_iou_thres, gt_index = check_iou_threshold(detections[s_index][1], all_gt_dboxes,
                                                        intersection_threshold=iou_thres)
    print('Is any g-truth box has IoU more than {0:.2}: {1}'.format(iou_thres, qualified_iou_thres))
    print('The g-truth that has maximum IoU with pred-box, {0}({1:.2}): {2}'.format(s_index, pred_conf,
                                                                                    gt_index))

    print('------'*10)

In [None]:
def match(sorted_ind, detections, gts_per_image, intersection_threshold, debug=False):
    """
    match detected boxes with ground-truth boxes.

    Parameters:
        sorted_ind: Indicies corresponds to the confidence score in decreasing order.

        detections: Contains info on the detections predicted by the model.

        gts_per_image: A dictionary which maintains all ground truths based on ImageID.

        intersection_threshold: IoU threshold.

        debug: boolean, to print logs. default is false.

    Return value:
        true_positives: A tensor of boolean values
        false_positives: A tensor of boolean values



    """
    # Define true positives and false positives TF tensors.
    true_positives = tf.Variable(tf.zeros((len(sorted_ind)), dtype=tf.float64))
    false_positives = tf.Variable(tf.zeros((len(sorted_ind)), dtype=tf.float64))

    # Prepare the boolean list for further check whether the object for that particular image
    # has been already detected.
    # So we create a dictionary: 'is_obj_already_detected' that maintains this boolean list based on the ImageID.
    is_obj_already_detected = {key: np.full((len(gts_per_image[key]), ), False) for key in gts_per_image}

    # For each box decide if it is a true positive or a false positive.
    for i, box_num in enumerate(sorted_ind):

        # Get the detection (a list containing ImageID, BBox and conf. score) based on sorted_ind.
        det = detections[box_num]

        # Get the detected BBox coordinates.
        predicted_dbox = det[1]

        # Stack all ground truth bboxes based on the ImageID from det.
        all_gt_dboxes = tf.stack([gt[1] for gt in gts_per_image[det[0]]])

        is_pass_threshold, max_iou_index = check_iou_threshold(
            predicted_dbox, all_gt_dboxes, intersection_threshold
        )
        # Note that "true_positives" and "false_positives" are updated with "i" and not with sorted index.
        # This is being done so that higher confident detections should be prioritized.
        if is_pass_threshold and not is_obj_already_detected[det[0]][max_iou_index]:
            true_positives[i].assign(1.0)
            is_obj_already_detected[det[0]][max_iou_index] = True
            if debug:
                print('Predicted box no {} is true positive.'.format(box_num))
        else:
            false_positives[i].assign(1)
            if debug:
                print('Predicted box no {} is false positive.'.format(box_num))

    return true_positives, false_positives

In [None]:
gts_per_image = defaultdict(list)
for gt in ground_truths:
    gts_per_image[gt[0]].append(gt)

t_positive, f_positive = match(sorted_ind=s_ind,
                               detections=detections,
                               gts_per_image=gts_per_image,
                               intersection_threshold=iou_thres,
                               debug=True)

print('\nTrue positives: {}, \nFalse positives: {}'.format(t_positive.numpy(), f_positive.numpy()))

In [None]:
# The core class for running average precision pipeline.
class AveragePrecisionEvaluator:
    def __init__(self, intersection_threshold=0.5, use_07_metric=False, points_number=11, debug=False):

        # Keep track of all ground truths per ImageID.
        self.gts = defaultdict(list)
        # Create 11 recall points, on these points precesion will be interpolated.
        self.recall_levels = tf.cast(tf.linspace(0., 1., points_number), dtype=tf.float64)
        # Predefined intersection threshold.
        self.intersection_threshold = intersection_threshold
        # Init recalls and precisions to fill them with approprite values in run_ap_calculation method.
        self.recalls = None
        self.precisions = None
        # Init average precision for futher computation
        self.average_precision = 0.
        self.true_positives = None
        self.false_positives = None
        self.false_negatives = None
        self.use_07_metric = use_07_metric
        self.debug = debug

        self.precision_levels = tf.Variable(tf.zeros((points_number), dtype=tf.float64))


    # calculate ap
    def run_ap_calculation(self, detections, ground_truths):
        """
            Initiate AP calculation process

            Parameters:

                detections: Contains info on the detections predicted by the model.
                            Each detection is a list of ImageID(str), BBox(tf.Variable)
                            and conf. score(tf.Variable)

                ground_truths: Contains info on the ground truth objects.
                               Each ground truth is a list of ImageID(str), BBox(tf.Variable)

            Return value:
                VOC mean average precision
        """
        # Sort predicted_dboxes based on their confidence scores
        sorted_ind = [i for i,_ in sorted(enumerate(detections), key=lambda conf: conf[1][2], reverse=True)]

        # Initialize true positives and false positives TF tensors with 0 for each detected object.
        true_positives = tf.Variable(tf.zeros((len(sorted_ind)), dtype=tf.float64))
        false_positives = tf.Variable(tf.zeros((len(sorted_ind)), dtype=tf.float64))

        if len(ground_truths) == 0:
            false_positives = tf.Variable(tf.ones((len(sorted_ind)), dtype=tf.float64))

        else:
            for g in ground_truths:
                self.gts[g[0]].append(g)
            # Get true positives and false positives TF tensors.
            true_positives, false_positives = match(
                sorted_ind, detections, self.gts, self.intersection_threshold
            )

        # Get true positives cumulative sum to obtain its value.
        self.true_positives = tf.math.cumsum(true_positives, axis=0)

        # Get false positives cumulative sum to obtain its value.
        self.false_positives = tf.math.cumsum(false_positives, axis=0)


        # Calculate precisions.
        self.precisions = self.true_positives / (
            self.false_positives + self.true_positives + np.finfo(float).eps
        )

        # Calculate recalls.
        self.recalls = self.true_positives / len(ground_truths)

        if self.debug:
            print("\n{0}run_ap_calculation{0}".format('--'*20))
            print('true positives and false positives returned from the "match" method:'
                  '\ntrue_positives:\t\t{}\nfalse_positives:\t{}'.format(true_positives.numpy(),
                                                                       false_positives.numpy()))
            print('\ntrue positives and false positives after cumulative sum:'
                  '\nself.true_positives:\t{}\nself.false_positives:\t{}'.format(self.true_positives,
                                                                                 self.false_positives))

            print('\nprecision calculated using true positive and false positive:\n'
                  'self.precisions:\t{}'.format(self.precisions))
            print('\nrecall calculated using true positive and false negative:\n'
                  'self.recalls:\t\t{}'.format(self.recalls))

        # Return VOC mAP value.
        return self.get_voc_ap()

    # calculate VOC mAP
    def get_voc_ap(self):
        """
            Evaluates VOC Mean Average Precision

        """
        # Define average precision tensor.
        self.average_precision = tf.Variable(0, dtype=tf.float64)

        # Check whether recalls and precisions values were recalculated.
        if self.precisions is None or self.recalls is None:
            self.average_precision = tf.Variable(np.nan)
            return self.average_precision

        if self.use_07_metric:

            # Iterate over recall levels.
            for i, recall_level in enumerate(self.recall_levels):
                # Get tensor of boolean values where recalls are greater or equal to recall level.
                recalls_check = tf.math.greater_equal(self.recalls, recall_level)

                # Check the sum of True values from recalls_check tensor.
                if tf.math.reduce_sum(tf.cast(recalls_check, tf.float64)) == 0.:
                    val = tf.Variable(0., dtype=tf.float64)
                else:
                    # Precision needs to be interpolated at different points of recall.
                    # Precision at any point of recall will be equal to maximum precision
                    # among all precisions correspond to all recalls greater than or equal to the recall point.
                    val = tf.math.reduce_max(self.precisions[recalls_check])

                self.precision_levels[i].assign(val)

                # Update average_precision with value.
                self.average_precision = self.average_precision + val

            # Get mean average precision.
            self.average_precision = self.average_precision / float(self.recall_levels.shape[0])


            print("\n{0}get_voc_ap_11_points{0}".format('--'*21))
            print('\n{} recall points:\n{}'.format(len(self.recall_levels), self.recall_levels))
            print('\n{}-points interpolated precision:\n{}'.format(len(self.recall_levels),
                                                                   self.precision_levels.numpy()))
            print('\n{}-points average precision:\t{}'.format(len(self.recall_levels),
                                                              self.average_precision))


        # Compute AP using all-points interpolation.
        else:

            mean_recall = tf.concat([[0.], self.recalls, [1.]], axis=-1)
            mean_precision = tf.concat([[0.], self.precisions, [0.]], axis=-1)

            # Compute the precision envelope.
            for i in range(mean_precision.shape[0] - 1, 0, -1):
                changed_precision = tf.maximum(mean_precision[i - 1], mean_precision[i])
                mean_precision = tf.tensor_scatter_nd_update(mean_precision, [[i-1]], [changed_precision])

            # To calculate area under PR curve, look for points
            # where X axis (recall) changes value.
            idx = tf.squeeze(tf.where(mean_recall[1:] != mean_recall[:-1]), axis=-1)


            self.average_precision= tf.math.reduce_sum(
                                        tf.multiply((tf.gather(mean_recall,idx+1) - tf.gather(mean_recall,idx)),
                                            tf.gather(mean_precision, idx+1)))


            self.recall_levels = mean_recall[:-1]
            self.precision_levels = mean_precision[:-1]


            print("\n{0}get_voc_ap_all_points{0}".format('--'*21))
            print('\nRecall points:\n{}'.format(self.recall_levels))
            print('\nAll-points interpolated precision:\n{}'.format(self.precision_levels.numpy()))
            print('\nAll-points average precision:\t{}'.format(self.average_precision.numpy()))


        return self.average_precision

    # Plot precision recall curve.
    def precision_recall_curve(self):
        """
            Precision-recall curve visualization for specified class

            Parameters:
                experiment_name: title of the running experiment
        """
        # Plot the curve.
        fig, ax = plt.subplots(figsize=(15,10))
        ax.set_ylim([-0.05, 1.05])

        if self.use_07_metric:

            ax.plot(self.recalls, self.precisions, label='precision-recall')
            ax.plot(self.recall_levels.numpy(), self.precision_levels.numpy(), '*',
                     label='{}-points interpolation'.format(len(self.recall_levels)))
            ax.set_xlabel('recall')
            ax.set_ylabel('precision')
            ax.legend(loc='best')

            plt.show(block=block_plot)

        else:
            recalls = self.recall_levels.numpy()
            interp_prec = self.precision_levels.numpy()

            ids = np.where(recalls[1:]!=recalls[:-1])[0]
            unique_precs = np.unique(interp_prec[ids+1])[::-1]

            recall_st = []
            recall_end = []

            for un_pr in unique_precs:
                id_pr = np.where(np.isclose(interp_prec,un_pr))[0]
                recall_st.append(recalls[id_pr][0])
                recall_end.append(recalls[id_pr][-1])

            ax.plot(self.recalls, self.precisions, 'b', linewidth=3, label='precision-recall')
            ax.plot(self.recall_levels.numpy(), self.precision_levels.numpy(), 'r--', linewidth=3,
                     label='All-points interpolation')

            for i in range(len(unique_precs)):

                width   = recall_end[i]-recall_st[i]
                height  = unique_precs[i]

                label   = 'A{}'.format(i+1)
                x_label = recall_st[i] + width/2.5
                y_label = height/2

                ax.add_patch(
                        patches.Rectangle(
                            (recall_st[i], 0), width, height,
                            fill=False, color='g',linewidth = 2
                        )
                    )

                ax.text(x_label, y_label, label, color='g', fontsize=30,
                                bbox=dict(facecolor='none', edgecolor='g'))




            ax.set_xlabel('recall')
            ax.set_ylabel('precision')
            ax.legend(loc='best')

            plt.show(block=block_plot)

In [None]:
ap_evaluator = AveragePrecisionEvaluator(use_07_metric=True,debug=True)
_ = ap_evaluator.run_ap_calculation(detections, ground_truths)
ap_evaluator.precision_recall_curve()

In [None]:
# All point interpolated precision

In [None]:
ap_evaluator = AveragePrecisionEvaluator(debug=True)
ap_evaluator.run_ap_calculation(detections, ground_truths)
ap_evaluator.precision_recall_curve()

In [None]:
groundTruths =  [
                 ['1', tf.Variable([25,  16,  63,  72],  dtype=tf.float32)],
                 ['1', tf.Variable([129, 123, 170, 185], dtype=tf.float32)],
                 ['2', tf.Variable([123,  11, 166,  66], dtype=tf.float32)],
                 ['2', tf.Variable([ 38, 132,  97, 177], dtype=tf.float32)],
                 ['3', tf.Variable([ 16,  14,  51,  62], dtype=tf.float32)],
                 ['3', tf.Variable([123,  30, 172,  74], dtype=tf.float32)],
                 ['3', tf.Variable([ 99, 139, 146, 186], dtype=tf.float32)],
                 ['4', tf.Variable([ 53,  42,  93,  94], dtype=tf.float32)],
                 ['4', tf.Variable([154,  43, 185,  77], dtype=tf.float32)],
                 ['5', tf.Variable([ 59,  31, 103,  82], dtype=tf.float32)],
                 ['5', tf.Variable([ 48, 128,  82, 180], dtype=tf.float32)],
                 ['6', tf.Variable([ 36,  89,  88, 165], dtype=tf.float32)],
                 ['6', tf.Variable([ 62,  58, 106, 125], dtype=tf.float32)],
                 ['7', tf.Variable([ 28,  31,  83,  94], dtype=tf.float32)],
                 ['7', tf.Variable([ 58,  67, 108, 125], dtype=tf.float32)]
                ]

detections = [
                 ['1', tf.Variable([  5,  67,  36, 115], dtype=tf.float32), tf.Variable(.88)],
                 ['1', tf.Variable([119, 111, 159, 178], dtype=tf.float32), tf.Variable(.70)],
                 ['1', tf.Variable([124,   9, 173,  76], dtype=tf.float32), tf.Variable(.80)],
                 ['2', tf.Variable([ 64, 111, 128, 169], dtype=tf.float32), tf.Variable(.71)],
                 ['2', tf.Variable([ 26, 140,  86, 187], dtype=tf.float32), tf.Variable(.54)],
                 ['2', tf.Variable([ 19,  18,  62,  53], dtype=tf.float32), tf.Variable(.74)],
                 ['3', tf.Variable([109,  15, 186,  54], dtype=tf.float32), tf.Variable(.18)],
                 ['3', tf.Variable([ 86,  63, 132, 108], dtype=tf.float32), tf.Variable(.67)],
                 ['3', tf.Variable([160,  62, 196, 115], dtype=tf.float32), tf.Variable(.38)],
                 ['3', tf.Variable([105, 131, 152, 178], dtype=tf.float32), tf.Variable(.91)],
                 ['3', tf.Variable([ 18, 148,  58, 192], dtype=tf.float32), tf.Variable(.44)],
                 ['4', tf.Variable([ 83,  28, 111,  54], dtype=tf.float32), tf.Variable(.35)],
                 ['4', tf.Variable([ 28,  68,  70, 135], dtype=tf.float32), tf.Variable(.78)],
                 ['4', tf.Variable([ 87,  89, 112, 128], dtype=tf.float32), tf.Variable(.45)],
                 ['4', tf.Variable([ 10, 155,  70, 181], dtype=tf.float32), tf.Variable(.14)],
                 ['5', tf.Variable([ 50,  38,  78,  84], dtype=tf.float32), tf.Variable(.62)],
                 ['5', tf.Variable([ 95,  11, 148,  39], dtype=tf.float32), tf.Variable(.44)],
                 ['5', tf.Variable([ 29, 131, 101, 160], dtype=tf.float32), tf.Variable(.95)],
                 ['5', tf.Variable([ 29, 163, 101, 192], dtype=tf.float32), tf.Variable(.23)],
                 ['6', tf.Variable([ 43,  48, 117,  86], dtype=tf.float32), tf.Variable(.45)],
                 ['6', tf.Variable([ 17, 155,  46, 190], dtype=tf.float32), tf.Variable(.84)],
                 ['6', tf.Variable([ 95, 110, 120, 152], dtype=tf.float32), tf.Variable(.43)],
                 ['7', tf.Variable([ 16,  20, 117, 108], dtype=tf.float32), tf.Variable(.48)],
                 ['7', tf.Variable([ 33, 116,  70, 165], dtype=tf.float32), tf.Variable(.95)]
                ]

In [None]:
ap_evaluator = AveragePrecisionEvaluator(intersection_threshold=0.3, debug=True)
ap_evaluator.run_ap_calculation(detections,groundTruths)
ap_evaluator.precision_recall_curve()