This code computes the class relevance maps for individual images through different base learners and performs an ensemble of the localized ROI, computes the IOU with the ground truth and finally measures the mean average precision across different IOU thresholds.

In [None]:
#loading libraries
from __future__ import print_function

from keras.applications.vgg16 import VGG16
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.models import Sequential, Model, Input, load_model
from keras.layers import Conv2D, Dense, MaxPooling2D, SeparableConv2D, BatchNormalization, ZeroPadding2D, Flatten,Average, BatchNormalization, Dropout
from keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D

from keras import backend as K
from keras.models import *
import keras.backend as K

from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

from scipy import ndimage
from skimage.measure import regionprops
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image

import numpy as np
import math
import cv2
import os
import glob
import h5py
import csv
import json

from collections import namedtuple

In [None]:
#custom definition functions to create JSON files for computing mean average precision

# create json format
def serializeGT(ImgName, g_coords):
    return {ImgName:  g_coords,}

def serializeCRM(ImgName, b_coords, b_scores):
    return {ImgName: { 
            'boxes': b_coords,
            'scores': b_scores,}
        }

In [None]:
#computing IOU
def bb_intersection_over_union(boxA, boxB):
# determine the (x, y)-coordinates of the intersection rectangle
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
 
    # compute the area of intersection rectangle
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
 
    # compute the area of both the prediction and ground-truth
    # rectangles
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
 
    # compute the intersection over union by taking the intersection
    # area and dividing it by the sum of prediction + ground-truth
    # areas - the interesection area
    iou = interArea / float(boxAArea + boxBArea - interArea)
 
    # return the intersection over union value
    return iou

In [None]:
#get the output layer of the individual base-nearners

def get_output_layer(model, layer_name):
    try:
        # get the symbolic outputs of each "key" layer (we gave them unique names).
        layer_dict = dict([(layer.name, layer) for layer in model.layers])
        layer = layer_dict[layer_name]
        return layer
    except Exception as e:
        raise Exception('Error from get_output_layer(): ' + str(e))

In [None]:
#function to generate the CRM bounding box for a given CRM threshold
def generate_BoundingBox(aCRM, threshold):
    try:
        labeled_CRM, nr_objects = ndimage.label(aCRM > threshold)
        props = regionprops(labeled_CRM)
        return props
    except Exception as e:
        raise Exception('Error from generate_BoundingBox(): ' + str(e))

In [None]:
# Confidence score = highest heatmap value in box * classification score
def Calculate_Confidence_Score(aCRM, bboxes, outScores):
    try:
        c_scores = []
        for a_b in bboxes:
            a_bbox = aCRM[a_b[1]:a_b[3], a_b[0]:a_b[2]]       
            a_score = np.max(a_bbox)
            b_score = outScores[0][0]
            c_scores.append(np.max(a_bbox) * outScores[0][0])
        
        return np.array(c_scores)
        
    except Exception as e:
        raise Exception('Error from Calculate_Confidence_Score(): ' + str(e))

In [None]:
#function to compute the CRM

def Generate_CRM_2Class(thisModel, thisImg_path, Threshold):             
    try:
        # preprocess input image      
        width, height = thisModel.input.shape[1:3].as_list()
        original_img = cv2.imread(thisImg_path) 
        resized_original_image = cv2.resize(original_img, (width,height))        
    
        input_image = img_to_array(resized_original_image)
        input_image = np.array(input_image, dtype="float") /255.0       
        input_image = input_image.reshape((1,) + input_image.shape)
    
        class_weights = thisModel.layers[-1].get_weights()[0]
    
        get_output = K.function([thisModel.layers[0].input], [thisModel.layers[-4].output, thisModel.layers[-1].output])
        [conv_outputs, predictions] = get_output([input_image])
        conv_outputs = conv_outputs[0, :, :, :]     
        final_output = predictions   
        
        wf0 = np.zeros(dtype = np.float32, shape = conv_outputs.shape[0:2])    
        wf1 = np.zeros(dtype = np.float32, shape = conv_outputs.shape[0:2])    
    
        for i, w in enumerate(class_weights[:, 0]):     
            wf0 += w * conv_outputs[:, :, i]           
        S0 = np.sum(wf0)           # score at node 0 in the final output layer
        for i, w in enumerate(class_weights[:, 1]):     
            wf1 += w * conv_outputs[:, :, i]             
        S1 = np.sum(wf1)           # score at node 1 in the final output layer
    
        #Calculate incremental MSE
        iMSE0 = np.zeros(dtype = np.float32, shape = conv_outputs.shape[0:2])
        iMSE1 = np.zeros(dtype = np.float32, shape = conv_outputs.shape[0:2])
    
        row, col = wf0.shape
        for x in range (row):
                for y in range (col):
                        tmp0 = np.array(wf0)
                        tmp0[x,y] = 0.                   # remove activation at the spatial location (x,y)
                        iMSE0[x,y] = (S0 - np.sum(tmp0)) ** 2
    
                        tmp1 = np.array(wf1)
                        tmp1[x,y] = 0.                  
                        iMSE1[x,y] = (S1 - np.sum(tmp1)) ** 2
         
      
        Final_crm = iMSE0 + iMSE1                         # consider both positive and negative contribution
    
        Final_crm /= np.max(Final_crm)                                  # normalize
        resized_Final_crm = cv2.resize(Final_crm, (height, width))      # upscaling to original image size

        The_CRM = np.array(resized_Final_crm)
        The_CRM[np.where(resized_Final_crm < Threshold)] = 0.           # clean-up (remove data below threshold)

        return[resized_original_image, final_output, resized_Final_crm, The_CRM]
    except Exception as e:
        raise Exception('Error from Generate_CRM_2Class(): ' + str(e))

In [None]:
#genrate bounding boxes
def generate_bBox(thisCAM_img, Threshold):             
    try:
        bboxes = []
        TheProps = generate_BoundingBox(thisCAM_img, Threshold)
        for b in TheProps:
            bbox = b.bbox
            xs = bbox[1]
            ys = bbox[0]
            w = bbox[3] - bbox[1]
            h = bbox[2] - bbox[0]

            bboxes.append([bbox[1], bbox[0], bbox[3], bbox[2]])         

        CRM_bboxes = np.vstack(bboxes)
        
        return CRM_bboxes

    except Exception as e:
        raise Exception('Error from generate_bBox(): ' + str(e))

In [None]:
#calculate IOU of the ground truth and precited boudning box
def calculate_IOU (GT_bbox, a_bbox):
    try:
        iou = 0.
        for c_b in a_bbox:
            for g_b in GT_bbox:
                iou += bb_intersection_over_union(g_b, c_b)
        iou /= float(GT_bbox.shape[0])           # get average 

        return iou

    except Exception as e:
        raise Exception('Error from calculate_IOU(): ' + str(e))


In [None]:
#load the model and change the layer names of the VGG16 model in the ensemble to avoid similar name issues

def VGG16_DL(model_input, num_classes):
    try:
        vgg16_cnn = VGG16(weights='imagenet', include_top=False, input_tensor=model_input)
        vgg16_cnn = Model(inputs=vgg16_cnn.input, outputs=vgg16_cnn.get_layer('block5_conv3').output)
        x = vgg16_cnn.output
        x = GlobalAveragePooling2D()(x)
        x = Dropout(0.5)(x)
        predictions = Dense(num_classes, activation='softmax')(x)
        model = Model(inputs=vgg16_cnn.input, outputs=predictions, name='vgg16_custom')
        model.get_layer(name='block1_conv1').name='block1_conv1VGG'  
        model.get_layer(name='block1_conv2').name='block1_conv2VGG' 
        model.get_layer(name='block2_conv1').name='block2_conv1VGG' 
        model.get_layer(name='block2_conv2').name='block2_conv2VGG' 
        model.get_layer(name='block3_conv1').name='block3_conv1VGG' 
        model.get_layer(name='block3_conv2').name='block3_conv2VGG' 
        model.get_layer(name='block3_conv3').name='block3_conv3VGG' 
        model.get_layer(name='block4_conv1').name='block4_conv1VGG' 
        model.get_layer(name='block4_conv2').name='block4_conv2VGG' 
        model.get_layer(name='block4_conv3').name='block4_conv3VGG' 
        model.get_layer(name='block5_conv1').name='block5_conv1VGG' 
        model.get_layer(name='block5_conv2').name='block5_conv2VGG' 
        model.get_layer(name='block5_conv3').name='block5_conv3VGG' 
        model.get_layer(name='block1_pool').name='block1_poolVGG' 
        model.get_layer(name='block2_pool').name='block2_poolVGG' 
        model.get_layer(name='block3_pool').name='block3_poolVGG' 
        model.get_layer(name='block4_pool').name='block4_poolVGG' 
        return model
    except Exception as e:
        raise Exception('Error from VGG16_DL(): ' + str(e))

In [None]:
#load models and compute CRM and create JSON files

def Load_Pretrained_Model(thisModel_path, thisModel_name):
    try:
        # load weights into new model
        loaded_model = load_model(os.path.join(thisModel_path, thisModel_name + '.h5'))
        return(loaded_model)
    except Exception as e:
        raise Exception('Error from Load_Pretrained_Model(): ' + str(e))
try:
    # Load Keras model
    print("Loading a total of 5 DL models...") #ensemble of five base-learners
    try:
        img_width, img_height = 256, 256 #input dimensions
        input_shape = (img_width, img_height, 3)
        model_input = Input(shape=input_shape)
        print(model_input)

        try:
            print("1) Loading VGG16-based model...") #VGG16
            VGG16_M = VGG16_DL(model_input, 2)
            VGG16_M.load_weights(os.path.join('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/corase models and codes/with lung segmented chexpert/top-7_finetuned_models', 'vgg16_abnormality with coarse model.10-0.8923' + '.h5'))
        except Exception as e:
            raise Exception('Error in loading VGG16-based model: ' + str(e))

        try:
            print("3) Loading nasnet-based model...") #NASNET
            nasnet_M = Load_Pretrained_Model('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/corase models and codes/with lung segmented chexpert/top-7_finetuned_models', 'nasnet_abnormality_with coarse model.04-0.8734')
        except Exception as e:
            raise Exception('Error in loading nasnet-based model: ' + str(e))
        
        try:
            print("4) Loading xception-based model...") #Xception
            xception_M = Load_Pretrained_Model('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/corase models and codes/with lung segmented chexpert/top-7_finetuned_models', 'xception_abnormality_with coarse model.12-0.8765')
        except Exception as e:
            raise Exception('Error in loading xception-based model: ' + str(e))

        try:
            print("5) Loading InceptionV3-based model...") #InceptionV-3
            InceptionV3_M = Load_Pretrained_Model('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/corase models and codes/with lung segmented chexpert/top-7_finetuned_models', 'inceptionv3_abnormality_with coarse model.03-0.8825')
        except Exception as e:
            raise Exception('Error in loading InceptionV3-based model: ' + str(e))

        try:
            print("6) Loading mobile-based model...") #MobileNet
            mobile_M = Load_Pretrained_Model('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/corase models and codes/with lung segmented chexpert/top-7_finetuned_models', 'mobile_abnormality_with coarse model.07-0.8799')
        except Exception as e:
            raise Exception('Error in loading mobile-based model: ' + str(e))

    except Exception as e:
        raise Exception('Error in loading CNN models: ' + str(e))
  
    try: #folder where the abnromal test data is stored in the PNG format
        TestFolder_path = 'C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/fine_data_kaggle/abnormality_classifier_binary/cropped/abnormality_aug/test/abnormal/'
        print("Input image folder: " + TestFolder_path)
        Test_image_path = TestFolder_path + '*.png'
        Test_image_Set = glob.glob(Test_image_path)
    except Exception as e:
        raise Exception('Error in retrieving images from dataset folder: ' + str(e))
    
    # Set-up output folders
    try:
        Visualization_path   = 'C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/visualization/'
        
        #create folders to store the results of CRM and corresponding bounding box computations
        #here we compute CRMs with no threshold and with different thresholds (0.1 to 0.8)
        #we see which CRM threshold gives the best values for IOU computation
        aCRM_output_path      = Visualization_path + 'aCRM'       
        aCRM_bbox_output_path = Visualization_path + 'aCRM_bbox'        
        tCRM_output_path      = Visualization_path + 'tCRM'      
        tCRM_bbox_output_path = Visualization_path + 'tCRM_bbox'
        
        #file that stores the classification results and IOU computation
        f1 = open(os.path.join(Visualization_path, 'y_ClassificationResult.txt'), 'w')

        print("CRM output folder: " + Visualization_path)
    except Exception as e:
        raise Exception('Error in setting up output folders: ' + str(e))

    # Load groundtruth information from csv file
    try:
        cf = open('C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/bounding_box_rsna_256.csv', 'r')
    except Exception as e:
        raise Exception('Error in loading csv file: ' + str(e))

    #CRM thresholds
    Th_set = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]

    avg_aIOU = np.zeros(len(Th_set))       
    avg_tIOU = np.zeros(len(Th_set))       

    bb_data = [dict() for x in range(len(Th_set))]
    tb_data = [dict() for x in range(len(Th_set))]
    gt_data = [dict() for x in range(len(Th_set))]

    i = 0
    err_cnt = 0
    for Img_filename in Test_image_Set:

        fname = os.path.basename(Img_filename)
        name_only, ext_only = os.path.splitext(fname)

        aCRM_ImgFile    = name_only           # for saving CRM heatmap image
        aBbox_file      = name_only           # for saving bbox image

        tCRM_ImgFile    = name_only           # for saving tCRM heatmap image
        tBbox_file      = name_only           # for saving bbox image

        # Check if a given test image has a ground-truth information (bbox) in the csv file. if not, skip the remaining process
        bbox_set = []
        FileFound = False
        BBoxFound = False
        k = 0
        cf.seek(0)
        CSVreader = csv.DictReader(cf)        
        for row in CSVreader:           # 'for loop' for dealing with the case that a given image has multiple bounding boxes (actually very common)
            if fname in row['patientId']:
                FileFound = True        # OK found image file name in the CSV, next check if it has ground-truth bbox info.

                if row['x_dis'] and row['y_dis'] and row['width_dis'] and row['height_dis']:
                    bbox_set.append([int(row['x_dis']), int(row['y_dis']), int(row['x_dis'])+int(row['width_dis']), int(row['y_dis'])+int(row['height_dis'])])
                    BBoxFound = True
                    continue            # continue 'for loop' to see if this image has multiple bboxes
                else:
                    break
            k += 1

                                  
        if FileFound:
            if BBoxFound:
                print('[' + str(i) + '] ' + fname + ': Found from ' + str(k) + 'th row in the dictionary')
            else:
                print('[' + str(i) + '] ' + fname + ': Found from ' + str(k) + 'th row in the dictionary (BBox info missing)')
                continue                                # we only care about the image having groundtrugh bbox information
        else:
            print('[' + str(i) + '] ' + fname + ': Not Found from the dictionary')
            continue

        groundtruth_bbox_set = np.vstack(bbox_set)      # a list of ground-truth bbox information
        

        # Generate CRM based on the given DL model (aCRM: CRM as is, tCRM: CRM with thresholding)
        InImage1, OutScores1, aCRM_Img1, tCRM_Img1 = Generate_CRM_2Class(VGG16_M,             Img_filename, 0.1) #CRM threshold is set to 0.1 here
        InImage3, OutScores3, aCRM_Img3, tCRM_Img3 = Generate_CRM_2Class(nasnet_M,            Img_filename, 0.1)
        InImage4, OutScores4, aCRM_Img4, tCRM_Img4 = Generate_CRM_2Class(xception_M,          Img_filename, 0.1)
        InImage5, OutScores5, aCRM_Img5, tCRM_Img5 = Generate_CRM_2Class(InceptionV3_M,       Img_filename, 0.1)
        InImage6, OutScores6, aCRM_Img6, tCRM_Img6 = Generate_CRM_2Class(mobile_M,            Img_filename, 0.1)

        OutScores_F = (OutScores1 + OutScores3 + OutScores4 + OutScores5 + OutScores6)/5.0 #score normalization

        eval_result = "{}:    [{:.3f},  {:.3f}]".format(fname, OutScores_F[0][0], OutScores_F[0][1])

        target = OutScores_F[0][0]
        TP = True                                                                                                                            
        for other_output in OutScores_F[0]:
            if target < other_output:
                TP = False
                break
                    
        if TP:          
            aCRM_ImgFile += "_aCRM.png"
            aBbox_file += "_aBbox.png"

            tCRM_ImgFile += "_tCRM.png"
            tBbox_file  += "_tBbox.png"
            eval_result += "      | "
        else:
            err_cnt += 1

            aCRM_ImgFile += "_aCRM_err.png"
            aBbox_file += "_aBbox_err.png"

            tCRM_ImgFile += "_tCRM_err.png"
            tBbox_file  += "_tBbox_err.png"
            eval_result += "  ERR\n"           

        # CRM image (does NOT apply thresholding to each individual CRM image)
        aCRM_Img_F = aCRM_Img1 + aCRM_Img3 + aCRM_Img4 + aCRM_Img5 + aCRM_Img6 
        aCRM_Img_F /= np.max(aCRM_Img_F)
        
        # CRM image (remove the area lower than a given threshold)
        tCRM_Img_F = tCRM_Img1 + tCRM_Img3 + tCRM_Img4 + tCRM_Img5 + tCRM_Img6
        tCRM_Img_F /= np.max(tCRM_Img_F)
        
        
        for idx in range(len(Th_set)):
        
            aBBox_coord = generate_bBox(aCRM_Img_F, Th_set[idx])
            aBBox_score = Calculate_Confidence_Score(aCRM_Img_F, aBBox_coord, OutScores_F)

            aHeatmap = cv2.applyColorMap(np.uint8(255*aCRM_Img_F), cv2.COLORMAP_JET)
            aHeatmap[np.where(aCRM_Img_F < Th_set[idx])] = 0            
            aImg = np.float32(aHeatmap) + np.float32(InImage1)          
            aImg = 255 * aImg / np.max(aImg)


            tBBox_coord = generate_bBox(tCRM_Img_F, Th_set[idx])
            tBBox_score = Calculate_Confidence_Score(tCRM_Img_F, tBBox_coord, OutScores_F)

            tHeatmap = cv2.applyColorMap(np.uint8(255*tCRM_Img_F), cv2.COLORMAP_JET)
            tHeatmap[np.where(tCRM_Img_F < Th_set[idx])] = 0            
            tImg = np.float32(tHeatmap) + np.float32(InImage1)          
            tImg = 255 * tImg / np.max(tImg)

           
            a_path      = aCRM_output_path + str(Th_set[idx]) + '/'
            if not os.path.exists(a_path):
                os.makedirs(a_path)              
            cv2.imwrite(os.path.join(a_path, aCRM_ImgFile), aImg)

            t_path      = tCRM_output_path + str(Th_set[idx]) + '/'
            if not os.path.exists(t_path):
                os.makedirs(t_path)              
            cv2.imwrite(os.path.join(t_path, tCRM_ImgFile), tImg)

            aIOU = calculate_IOU (groundtruth_bbox_set, aBBox_coord)
            tIOU = calculate_IOU (groundtruth_bbox_set, tBBox_coord)
            eval_result += "aIOU_" + str(Th_set[idx]) + ":  {:.4f} | ".format (aIOU)
            eval_result += "tIOU_" + str(Th_set[idx]) + ":  {:.4f} | ".format (tIOU)
 
            avg_aIOU[idx] += aIOU        
            avg_tIOU[idx] += tIOU        

            aOutImage = np.copy(aImg)
            for c_b in aBBox_coord:
                cv2.rectangle(aOutImage, (c_b[0], c_b[1]), (c_b[2], c_b[3]), (255,0,0), 2)
            for g_b in groundtruth_bbox_set:
                cv2.rectangle(aOutImage, (g_b[0], g_b[1]), (g_b[2], g_b[3]), (0,255,0), 2)

            ab_path      = aCRM_bbox_output_path + str(Th_set[idx]) + '/'
            if not os.path.exists(ab_path):
                os.makedirs(ab_path)              
            cv2.putText(aOutImage, "IoU: {:.4f}".format(aIOU), (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)    
            cv2.imwrite(os.path.join(ab_path, aBbox_file), aOutImage)

            tOutImage = np.copy(tImg)
            for c_b in tBBox_coord:
                cv2.rectangle(tOutImage, (c_b[0], c_b[1]), (c_b[2], c_b[3]), (255,0,0), 2)
            for g_b in groundtruth_bbox_set:
                cv2.rectangle(tOutImage, (g_b[0], g_b[1]), (g_b[2], g_b[3]), (0,255,0), 2)

            tb_path      = tCRM_bbox_output_path + str(Th_set[idx]) + '/'
            if not os.path.exists(tb_path):
                os.makedirs(tb_path)              
            cv2.putText(tOutImage, "IoU: {:.4f}".format(tIOU), (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)    
            cv2.imwrite(os.path.join(tb_path, tBbox_file), tOutImage)

        
            b  = serializeCRM(fname, aBBox_coord.tolist(), aBBox_score.tolist())
            tb = serializeCRM(fname, tBBox_coord.tolist(), tBBox_score.tolist())
            g  = serializeGT(fname, groundtruth_bbox_set.tolist())           
            
            if len(bb_data[idx]) > 0:
                bb_data[idx].update(b)
            else:
                bb_data[idx] = dict(b)           # create dictionary

            if len(tb_data[idx]) > 0:
                tb_data[idx].update(tb)
            else:
                tb_data[idx] = dict(tb)           # create dictionary

            if len(gt_data[idx]) > 0:
                gt_data[idx].update(g)
            else:
                gt_data[idx] = dict(g)           # create dictionary
     
        f1.write(eval_result + '\n')

        i += 1
        print (i)

    for Th in range(len(Th_set)):
        with open(os.path.join(Visualization_path, 'Ensemble5_BBox_predicted_' + str(Th_set[Th]) + '.json'), 'w') as outfile:
            json.dump(bb_data[Th], outfile)
        with open(os.path.join(Visualization_path, 'Ensemble5_tBBox_predicted_' + str(Th_set[Th]) + '.json'), 'w') as outfile:
            json.dump(tb_data[Th], outfile)
        with open(os.path.join(Visualization_path, 'Ensemble5_BBox_groundtruth_' + str(Th_set[Th]) + '.json'), 'w') as outfile:
            json.dump(gt_data[Th], outfile)


    avg_aIOU /= float(i-err_cnt)
    avg_tIOU /= float(i-err_cnt)
    f1.write("\n\nTotal Errors: {:d}\n".format(err_cnt))
    f1.write("No. of testing images (having groundtruth bbox): {:d}\n".format(i))
    f1.write("Accuracy: {:.5f}\n".format((float(i-err_cnt)/float(i))*100.))

    for ix in range(len(avg_aIOU)):
        f1.write("avrage aIOU_" + str(Th_set[ix]) + ": {:.4f}  |".format(avg_aIOU[ix]))                                            
        f1.write("avrage tIOU_" + str(Th_set[ix]) + ": {:.4f}\n".format(avg_tIOU[ix]))                                            
    f1.close()
     
    print('done')

except Exception as e:
    print (str(e))

The computed classification result, bounding boxes for different CRM thresholds, the JSON converted files are stored in the results folder


In [None]:
#computing mean average precision using the JSON files

In [None]:
#import libraries

from __future__ import absolute_import, division, print_function

from copy import deepcopy
import json
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

sns.set_style('white')
sns.set_context('poster')

COLORS = [
    '#1f77b4', '#aec7e8', '#ff7f0e', '#ffbb78', '#2ca02c',
    '#98df8a', '#d62728', '#ff9896', '#9467bd', '#c5b0d5',
    '#8c564b', '#c49c94', '#e377c2', '#f7b6d2', '#7f7f7f',
    '#c7c7c7', '#bcbd22', '#dbdb8d', '#17becf', '#9edae5']

In [None]:
#calculate individual IOUs using the ppredicted and ground truth BB for the CRM threshold of 0.1
#that gave the best IOU values

def calc_iou_individual(pred_box, gt_box):
    """Calculate IoU of single predicted and ground truth box

    Args:
        pred_box (list of floats): location of predicted object as [xmin, ymin, xmax, ymax]
        gt_box (list of floats): location of ground truth object as [xmin, ymin, xmax, ymax]

    Returns:
        float: value of the IoU for the two boxes.

    Raises:
        AssertionError: if the box is obviously malformed
    """
    x1_t, y1_t, x2_t, y2_t = gt_box
    x1_p, y1_p, x2_p, y2_p = pred_box

    if (x1_p > x2_p) or (y1_p > y2_p):
        raise AssertionError(
            "Prediction box is malformed? pred box: {}".format(pred_box))
    if (x1_t > x2_t) or (y1_t > y2_t):
        raise AssertionError(
            "Ground Truth box is malformed? true box: {}".format(gt_box))

    if (x2_t < x1_p or x2_p < x1_t or y2_t < y1_p or y2_p < y1_t):
        return 0.0

    far_x = np.min([x2_t, x2_p])
    near_x = np.max([x1_t, x1_p])
    far_y = np.min([y2_t, y2_p])
    near_y = np.max([y1_t, y1_p])

    inter_area = (far_x - near_x + 1) * (far_y - near_y + 1)
    true_box_area = (x2_t - x1_t + 1) * (y2_t - y1_t + 1)
    pred_box_area = (x2_p - x1_p + 1) * (y2_p - y1_p + 1)
    iou = inter_area / (true_box_area + pred_box_area - inter_area)
    return iou

In [None]:
#get single image results
def get_single_image_results(gt_boxes, pred_boxes, iou_thr):
    """Calculates number of true_pos, false_pos, false_neg from single batch of boxes.

    Args:
        gt_boxes (list of list of floats): list of locations of ground truth objects as [xmin, ymin, xmax, ymax]
        pred_boxes (dict): dict of dicts of 'boxes' (formatted like `gt_boxes`) and 'scores'
        iou_thr (float): value of IoU to consider as threshold for a true prediction.

    Returns:
        dict: true positives (int), false positives (int), false negatives (int)
    """

    all_pred_indices = range(len(pred_boxes))
    all_gt_indices = range(len(gt_boxes))
    if len(all_pred_indices) == 0:
        tp = 0
        fp = 0
        fn = len(gt_boxes)
        return {'true_pos': tp, 'false_pos': fp, 'false_neg': fn}
    if len(all_gt_indices) == 0:
        tp = 0
        fp = len(pred_boxes)
        fn = 0
        return {'true_pos': tp, 'false_pos': fp, 'false_neg': fn}

    gt_idx_thr = []
    pred_idx_thr = []
    ious = []
    for ipb, pred_box in enumerate(pred_boxes):
        for igb, gt_box in enumerate(gt_boxes):
            iou = calc_iou_individual(pred_box, gt_box)
            if iou > iou_thr:
                gt_idx_thr.append(igb)
                pred_idx_thr.append(ipb)
                ious.append(iou)

    args_desc = np.argsort(ious)[::-1]
    if len(args_desc) == 0:
        # No matches
        tp = 0
        fp = len(pred_boxes)
        fn = len(gt_boxes)
    else:
        gt_match_idx    = []
        pred_match_idx  = []
        for idx in args_desc:
            gt_idx = gt_idx_thr[idx]
            pr_idx = pred_idx_thr[idx]
            # If the boxes are unmatched, add them to matches
            if (gt_idx not in gt_match_idx) and (pr_idx not in pred_match_idx):
                gt_match_idx.append(gt_idx)
                pred_match_idx.append(pr_idx)
        tp = len(gt_match_idx)
        fp = len(pred_boxes) - len(pred_match_idx)
        fn = len(gt_boxes)   - len(gt_match_idx)

    return {'true_pos': tp, 'false_pos': fp, 'false_neg': fn}

In [None]:
#calculate precision recall curves from the data
def calc_precision_recall(img_results):
    """Calculates precision and recall from the set of images

    Args:
        img_results (dict): dictionary formatted like:
            {
                'img_id1': {'true_pos': int, 'false_pos': int, 'false_neg': int},
                'img_id2': ...
                ...
            }

    Returns:
        tuple: of floats of (precision, recall)
    """
    true_pos = 0; false_pos = 0; false_neg = 0
    for _, res in img_results.items():
        true_pos  += res['true_pos']
        false_pos += res['false_pos']
        false_neg += res['false_neg']

    try:
        precision = true_pos/(true_pos + false_pos)
    except ZeroDivisionError:
        precision = 0.0
    try:
        recall = true_pos/(true_pos + false_neg)
    except ZeroDivisionError:
        recall = 0.0

    return (precision, recall)

In [None]:
#compute model scores map
def get_model_scores_map(pred_boxes):
    """Creates a dictionary of from model_scores to image ids.

    Args:
        pred_boxes (dict): dict of dicts of 'boxes' and 'scores'

    Returns:
        dict: keys are model_scores and values are image ids (usually filenames)

    """
    model_scores_map = {}
    for img_id, val in pred_boxes.items():
        for score in val['scores']:
            if score not in model_scores_map.keys():
                model_scores_map[score] = [img_id]
            else:
                model_scores_map[score].append(img_id)
    return model_scores_map

In [None]:
#compute average precision values
def get_avg_precision_at_iou(gt_boxes, pred_boxes, iou_thr=0.5):
    """Calculates average precision at given IoU threshold.

    Args:
        gt_boxes (list of list of floats): list of locations of ground truth objects as [xmin, ymin, xmax, ymax]
        pred_boxes (list of list of floats): list of locations of predicted objects as [xmin, ymin, xmax, ymax]
        iou_thr (float): value of IoU to consider as threshold for a true prediction.

    Returns:
        dict: avg precision as well as summary info about the PR curve

        Keys:
            'avg_prec' (float): average precision for this IoU threshold
            'precisions' (list of floats): precision value for the given model_threshold
            'recall' (list of floats): recall value for given model_threshold
            'models_thrs' (list of floats): model threshold value that precision and recall were computed for.
    """
    model_scores_map = get_model_scores_map(pred_boxes)
    sorted_model_scores = sorted(model_scores_map.keys())

    # Sort the predicted boxes in descending order (lowest scoring boxes first):
    for img_id in pred_boxes.keys():
        arg_sort = np.argsort(pred_boxes[img_id]['scores'])
        pred_boxes[img_id]['scores'] = np.array(pred_boxes[img_id]['scores'])[arg_sort].tolist()
        pred_boxes[img_id]['boxes']  = np.array(pred_boxes[img_id]['boxes'])[arg_sort].tolist()

    pred_boxes_pruned = deepcopy(pred_boxes)

    precisions  = []
    recalls     = []
    model_thrs  = []
    img_results = {}
    # Loop over model score thresholds and calculate precision, recall
    for ithr, model_score_thr in enumerate(sorted_model_scores[:-1]):
        # On first iteration, define img_results for the first time:
        img_ids = gt_boxes.keys() if ithr == 0 else model_scores_map[model_score_thr]
        for img_id in img_ids:
            gt_boxes_img = gt_boxes[img_id]
            box_scores = pred_boxes_pruned[img_id]['scores']
            start_idx = 0
            for score in box_scores:
                if score <= model_score_thr:
                    pred_boxes_pruned[img_id]
                    start_idx += 1
                else:
                    break

            # Remove boxes, scores of lower than threshold scores:
            pred_boxes_pruned[img_id]['scores'] = pred_boxes_pruned[img_id]['scores'][start_idx:]
            pred_boxes_pruned[img_id]['boxes']  = pred_boxes_pruned[img_id]['boxes'][start_idx:]

            # Recalculate image results for this image
            img_results[img_id] = get_single_image_results(gt_boxes_img, pred_boxes_pruned[img_id]['boxes'], iou_thr)

        prec, rec = calc_precision_recall(img_results)
        precisions.append(prec)
        recalls.append(rec)
        model_thrs.append(model_score_thr)

    precisions = np.array(precisions)
    recalls = np.array(recalls)
    prec_at_rec = []
    for recall_level in np.linspace(0.0, 1.0, 11):
        try:
            args = np.argwhere(recalls >= recall_level).flatten()
            prec = max(precisions[args])
        except ValueError:
            prec = 0.0
        prec_at_rec.append(prec)
    avg_prec = np.mean(prec_at_rec)

    return {
        'avg_prec': avg_prec,
        'precisions': precisions,
        'recalls': recalls,
        'model_thrs': model_thrs}

In [None]:
#plot precision recall curve
def plot_pr_curve(precisions, recalls, category='Person', label=None, color=None, ax=None):    
 
    if ax is None:
        plt.figure(figsize=(10,8))
        ax = plt.gca()

    if color is None:
        color = COLORS[0]

    ax.scatter(recalls, precisions, label=label, s=5, color=color)
    ax.set_xlabel('recall', fontsize='medium')
    ax.set_ylabel('precision', fontsize='medium')
    ax.set_title('Precision-Recall curve\n')
    ax.set_xlim([0.0,1.15])
    ax.set_ylim([0.0,1.15])
    ax.tick_params(axis='x',which='major',direction='out',length=8,pad=-5,labelsize=20)
    ax.tick_params(axis='y',which='major',direction='out',length=8,pad=-5,labelsize=20)
    return ax

In [None]:
#give the location of the JSON files for the ground truth and predicted bounding boxes
if __name__ == "__main__":

    bBox_groundtruth_File = 'C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/visualization/Ensemble5_BBox_groundtruth_0.1.json'
    bBox_prediction_File = 'C:/Users/rajaramans2/codes/omsakthi_ensemble_visualization_kaggle/visualization/Ensemble5_tBBox_predicted_0.1.json'
    print("\n ****** Calculating mAP:  " + bBox_prediction_File + "****** \n")
    
    with open(bBox_groundtruth_File) as infile:
        gt_boxes = json.load(infile)

    with open(bBox_prediction_File) as infile:
        pred_boxes = json.load(infile)

    # Runs it for one IoU threshold
    iou_thr = 0.2
    start_time = time.time()
    data = get_avg_precision_at_iou(gt_boxes, pred_boxes, iou_thr=iou_thr)
    end_time = time.time()
    print('Single IoU calculation took {:.4f} secs'.format(end_time - start_time))
    print('avg precision: {:.4f}'.format(data['avg_prec']))

    start_time = time.time()
    ax = None
    avg_precs = []
    iou_thrs = []
    for idx, iou_thr in enumerate(np.linspace(0.1, 0.6, 10)):          #(0.5, 0.95, 10)
        data = get_avg_precision_at_iou(gt_boxes, pred_boxes, iou_thr=iou_thr)
        avg_precs.append(data['avg_prec'])
        iou_thrs.append(iou_thr)

        precisions = data['precisions']
        recalls = data['recalls']
        ax = plot_pr_curve(
            precisions, recalls, label='{:.2f}'.format(iou_thr), color=COLORS[idx*2], ax=ax)

    # prettify for printing:
    avg_precs = [float('{:.4f}'.format(ap)) for ap in avg_precs]
    iou_thrs = [float('{:.4f}'.format(thr)) for thr in iou_thrs]
    print('mAP: {:.2f}'.format(100*np.mean(avg_precs)))
    print('avg precs: ', avg_precs)
    print('iou_thrs:  ', iou_thrs)
    legend = plt.legend(loc='upper right', title='IOU Thr', fontsize='x-small', frameon=True)
    legend.get_title().set_fontsize('20')
    for xval in np.linspace(0.0, 1.0, 11):
        plt.vlines(xval, 0.0, 1.1, color='gray', alpha=0.3, linestyles='dashed')
    end_time = time.time()
    print('\nPlotting and calculating mAP takes {:.4f} secs'.format(end_time - start_time))
    plt.show()