In [None]:
from   ipywidgets import interactive, fixed
import json
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from   pathlib import Path
from   PIL import Image
from   utils.utils import list_labelled_slices, calculate_IoU, difference, intersection, union
%config Completer.use_jedi = False

In [None]:
# Directories and filenames
results_dir = 'test_results_v2'

### Slices with annotations (boundig box + class label)

In [None]:
df_labelled_slices = list_labelled_slices()
df_labelled_slices

### Predicted detections (bounding box + class label + classification score)

In [None]:
# List all predictions
results_dir = Path(results_dir)

retina_net_detections = []
study_ids             = []
for i in results_dir.iterdir():
    
    if str(i.name).startswith('ae_') and i.is_dir():
        for j in i.iterdir():
            
            study_id = j.stem
            
            if str(j.name).startswith('ae_') and str(j.name).endswith('.csv') and j.is_file():
                retina_net_detections.append(str(j))
                study_ids.append(str(study_id))

In [None]:
# Filter predictions for which there is a ground truth available
detections_data = np.array([retina_net_detections, study_ids]).swapaxes(0,1)
df_detections   = pd.DataFrame(detections_data, columns=['retina_net_detection', 'study_id'])

detection_with_groundtruth = np.where(df_detections['study_id'].isin(df_labelled_slices['study_id']))[0].tolist()
df_detections              = df_detections.iloc[detection_with_groundtruth]
df_detections              = df_detections.sort_values(by='study_id')
df_detections              = df_detections.reset_index(drop=True)
df_detections

### Detections (TP, TN, FP, FN)

In [None]:
def get_detections_from_ct_prediction(detections_scan):
    """
    Load bounding box predictions for an entire CT scan from a csv file and return the slice scores.
    """
    header = ['image_path','x1','y1','x2','y2','score']
    
    # Load prediction for a full CT scan
    df_detections = pd.read_csv(detections_scan, names=header)
    
    # Extract bounding box coordinates and slice scores
    x1     = df_detections['x1'].to_numpy()
    y1     = df_detections['y1'].to_numpy()
    x2     = df_detections['x2'].to_numpy()
    y2     = df_detections['y2'].to_numpy()
    scores = df_detections['score'].to_numpy()
    
    # Calculate bounding box parameters
    #w    = x2 - x1
    #h    = y2 - y1
    bbox = [x1, y1, x2, y2]
    bbox = np.asarray(bbox).swapaxes(0,1)
    
    return scores, bbox

In [None]:
def get_ground_truth_slices(scan, df_labelled_slices):
    
    # Load ground truth slice info
    df_ct_scan  = df_labelled_slices.iloc[np.where(df_labelled_slices['scan'].isin([scan]))[0].tolist()]
    true_slices = df_ct_scan['slice'].to_numpy(dtype=np.int64)
    true_slices = np.sort(true_slices)
    
    return true_slices

In [None]:
def get_ground_truth_bboxes(scan, df_labelled_slices):
    
    # Load ground truth slice info
    df_ct_scan  = df_labelled_slices.iloc[np.where(df_labelled_slices['scan'].isin([scan]))[0].tolist()]
    annot_files = df_ct_scan['annot_file'].to_numpy()
    
    bboxes = []
    for annot_file in annot_files:
        with open(annot_file) as json_file:
            annots = json.load(json_file)
        
        x1 = annots['x0']
        y1 = annots['y0']
        x2 = annots['x1']
        y2 = annots['y2']
    
        bbox = [x1, y1, x2, y2]
        bboxes.append(bbox)
    
    bboxes = np.asarray(bboxes)
    return bboxes

## ToDo

Check IoU calculation. What if BBox = 0,0,1,1?

Also, mean IoU is not the same as for evaluation script (0.74 vs. 0.88)

In [None]:
# Loop over predictions
print('Found {:d} predictions'.format(df_detections.shape[0]))

scans            = []
true_slices      = []
detected_slices  = []
IoUs             = []

n_slices = 0
n_annots = 0
n_tp     = 0
n_tn     = 0
n_fp     = 0
n_fn     = 0

scores_in_annots = []
scores_in_fps    = []
scores_in_tns    = []
IoUs_in_annots   = []

for detection_ct in df_detections['retina_net_detection']:
    
    score_threshold = 0.05
    IoU_thresholds  = 0.5
    
    scan                    = detection_ct.split('/ae_')[-1].split('.')[0]
    true_slices_ct          = get_ground_truth_slices(scan, df_labelled_slices)
    true_bbox_ct            = get_ground_truth_bboxes(scan, df_labelled_slices)
    scores_ct, pred_bbox_ct = get_detections_from_ct_prediction(detection_ct)
    
    # Find slices with a detection
    slices_ct          = np.arange(scores_ct.shape[0])
    detected_slices_ct = np.empty([0], dtype=np.int64)
    IoUs_detection_ct  = np.empty([0], dtype=np.float64)
    IoUs_true_ct       = np.empty([0], dtype=np.float64)
    for s, score, pred_bbox in zip(slices_ct, scores_ct, pred_bbox_ct):
        
        # Check if detection score is above threshold
        if score > score_threshold:
            detected_slices_ct = np.append(detected_slices_ct, s)
            
            # Calculate IoU
            IoU = 0
            
            # Check if the detection is in an annotated slice
            if s in true_slices_ct:
                true_bbox = true_bbox_ct[np.where(true_slices_ct==s)][0]
                IoU = calculate_IoU(pred_bbox, true_bbox)
                
            IoUs_detection_ct = np.append(IoUs_detection_ct, IoU)
            
        # Check if the slice is in an annotated slice
        if s in true_slices_ct:
            
            # Calculate IoU
            IoU = 0
            
            # Check if detection score is above threshold
            if score > score_threshold:
                true_bbox = true_bbox_ct[np.where(true_slices_ct==s)][0]
                IoU = calculate_IoU(pred_bbox, true_bbox)
                
            IoUs_true_ct = np.append(IoUs_true_ct, IoU)
            
    scans.append(scan)
    true_slices.append(true_slices_ct)
    detected_slices.append(detected_slices_ct)
    IoUs.append(IoUs_true_ct)
    
    #print('\n************************************')
    #print('Scan: {:s}'.format(scan))
    #print('\tTrue slices:     {}'.format(true_slices_ct))    
    #print('\tDetected slices: {}'.format(detected_slices_ct))
    #print('\tIoUs:            {}'.format(IoUs_true_ct))
    
    # Confusion matrix: Calculate TP, TN, FP, FN
    n_slices_ct     = slices_ct.shape[0]
    n_annots_ct     = true_slices_ct.shape[0]
    n_detections_ct = detected_slices_ct.shape[0]
    tp_ct =  intersection(true_slices_ct, detected_slices_ct)
    tn_ct =  slices_ct[~np.isin(slices_ct, union(true_slices_ct, detected_slices_ct))]
    fp_ct =  difference(detected_slices_ct, true_slices_ct)
    fn_ct =  difference(true_slices_ct, detected_slices_ct)
    
    n_slices += n_slices_ct
    n_annots += n_annots_ct
    n_tp += tp_ct.shape[0]
    n_tn += tn_ct.shape[0]
    n_fp += fp_ct.shape[0]
    n_fn += fn_ct.shape[0]
    
    #print('\nConfusion matrix:')
    #print('\tn_slices     = {:d}'.format(n_slices_ct))
    #print('\tn_annots     = {:d}'.format(n_annots_ct))
    #print('\t# TP         = {:d}/{:d} ({:.2f} %)'.format(tp_ct.shape[0], n_annots_ct,             100*(float(tp_ct.shape[0])/float(n_annots_ct))))
    #print('\t# TN         = {:d}/{:d} ({:.2f} %)'.format(tn_ct.shape[0], n_slices_ct-n_annots_ct, 100*(float(tn_ct.shape[0])/float(n_slices_ct-n_annots_ct))))
    #print('\t# FP         = {:d}/{:d} ({:.2f} %)'.format(fp_ct.shape[0], n_slices_ct-n_annots_ct, 100*(float(fp_ct.shape[0])/float(n_slices_ct-n_annots_ct))))
    #print('\t# FN         = {:d}/{:d} ({:.2f} %)'.format(fn_ct.shape[0], n_annots_ct,             100*(float(fn_ct.shape[0])/float(n_annots_ct))))
    
    # Classification score
    scores_in_annots_ct  = scores_ct[true_slices_ct]
    scores_in_fp_ct      = scores_ct[fp_ct]
    scores_in_tn_ct      = scores_ct[tn_ct]
    
    scores_in_annots.extend(scores_in_annots_ct)
    scores_in_fps.extend(scores_in_fp_ct)
    scores_in_tns.extend(scores_in_tn_ct)
    
    mean_score_in_annots = np.mean(scores_in_annots_ct)
    #print('\nScore statistics:')
    #print('\tMean score in n={:d} annotated slices = {:.1f}'.format(n_annots_ct, mean_score_in_annots))

    # IoU: Average IoU in true slice
    IoUs_in_annots.extend(IoUs_true_ct)
    
    #print('\nAverage IoU in annotated slices:', np.mean(IoUs_true_ct))
    
print('Test set:')
print('\t# slices = {:d}'.format(n_slices))
print('\t# annots = {:d}'.format(n_annots))
print('\t# TP     = {:d}/{:d} ({:.1f} %)'.format(n_tp, n_annots,          100*(float(n_tp)/float(n_annots))))
print('\t# TN     = {:d}/{:d} ({:.1f} %)'.format(n_tn, n_slices-n_annots, 100*(float(n_tn)/float(n_slices-n_annots))))
print('\t# FP     = {:d}/{:d} ({:.1f} %)'.format(n_fp, n_slices-n_annots, 100*(float(n_fp)/float(n_slices-n_annots))))
print('\t# FN     = {:d}/{:d} ({:.1f} %)'.format(n_fn, n_annots,          100*(float(n_fn)/float(n_annots))))
print('\n\tMean score in annots = {:.2f} [{:.2f},{:.2f}]'.format(np.median(scores_in_annots), np.quantile(scores_in_annots, q=0.25), np.quantile(scores_in_annots, q=0.75)))
print('\tMean score in fps    = {:.2f} [{:.2f},{:.2f}]'.format(np.median(scores_in_fps),    np.quantile(scores_in_fps, q=0.25),    np.quantile(scores_in_fps, q=0.75)))
print('\tMean score in tns    = {:.2f} [{:.2f},{:.2f}]'.format(np.median(scores_in_tns),    np.quantile(scores_in_tns, q=0.25),    np.quantile(scores_in_tns, q=0.75)))
print('\tMean IoU in annots   = {:.2f} [{:.2f},{:.2f}]'.format(np.median(IoUs_in_annots), np.quantile(IoUs_in_annots, q=0.25), np.quantile(IoUs_in_annots, q=0.75)))
print('\tMean IoU in detect   = {:.2f} [{:.2f},{:.2f}]'.format(np.median(IoUs_detection_ct), np.quantile(IoUs_detection_ct, q=0.25), np.quantile(IoUs_detection_ct, q=0.75)))

In [None]:
np.mean(IoUs_detection_ct)

### Localization

In [None]:
def find_nearest(array,value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return array[idx]

In [None]:
# Loop over predictions
print('Found {:d} predictions'.format(df_detections.shape[0]))

scans            = []
scores           = []
bbox             = []
localized_slices = []
true_slices      = []
hits             = []
xy_dists_tp      = []
xy_dists_fp      = []

tp_det = 0
fp_det = 0
fn_det = 0

for detection_ct in df_detections['retina_net_detection']:
    
    score_threshold = 0.05
    
    scan               = detection_ct.split('/ae_')[-1].split('.')[0]
    true_slices_ct     = get_ground_truth_slices(scan, df_labelled_slices)
    true_bbox_ct       = get_ground_truth_bboxes(scan, df_labelled_slices)
    scores_ct, bbox_ct = get_detections_from_ct_prediction(detection_ct)

    # Check if there is a detection with a classification score above the threshold
    if np.amax(scores_ct) > score_threshold:
        # Pick the slice(s) with the highest classification score. Typically, this will be a single slice
        loc_slice = np.where(scores_ct==np.amax(scores_ct))[0]
        
        if loc_slice.shape[0] > 1:
            print("WARNING: Localized more than 1 slice!")
            break
        
        # Check if one or more of the picked slices is in the list of true slices
        slice_overlap = set(loc_slice).intersection(set(true_slices_ct))
        slice_overlap = list(slice_overlap)

        # Store results
        if len(slice_overlap) > 0:
            hits.append(1)
            tp_det +=1
            
            # xy-plane distance
            true_bbox = true_bbox_ct[np.where(true_slices_ct==loc_slice)][0]
            loc_bbox  = bbox_ct[loc_slice][0]
            
            true_x = true_bbox[2] - true_bbox[0]
            true_y = true_bbox[3] - true_bbox[1]
            
            loc_x  = loc_bbox[2] - loc_bbox[0]
            loc_y  = loc_bbox[3] - loc_bbox[1]
            
            loc_xy_distance = np.sqrt( (loc_x-true_x)**2 + (loc_y-true_y)**2 )
            xy_dists_tp.append(loc_xy_distance)
            
        else:
            hits.append(0)
            fp_det += 1
            
            # Find nearest true slice
            nearest_true_slice = find_nearest(true_slices_ct, loc_slice)
            
            # xy-plane distance
            true_bbox = true_bbox_ct[np.where(true_slices_ct==nearest_true_slice)][0]
            loc_bbox  = bbox_ct[loc_slice][0]
            
            true_x = true_bbox[2] - true_bbox[0]
            true_y = true_bbox[3] - true_bbox[1]
            
            loc_x  = loc_bbox[2] - loc_bbox[0]
            loc_y  = loc_bbox[3] - loc_bbox[1]
            
            loc_xy_distance = np.sqrt( (loc_x-true_x)**2 + (loc_y-true_y)**2 )
            xy_dists_fp.append(loc_xy_distance)
    
    else:
        loc_slice = np.empty(shape=(0))
        hits.append(0)
        fn_det += 1
        
    print('\nScan: {:s}'.format(scan))
    print('\tTrue slices:      {}'.format(true_slices_ct))
    print('\tLocalized slices: {}'.format(loc_slice))
        
    scans.append(scan)
    scores.append(scores_ct)
    bbox.append(np.asarray(bbox_ct))
    localized_slices.append(loc_slice)
    true_slices.append(true_slices_ct)
    
hits = np.asarray(hits)

print('\nConfusion matrix:')
print('\t#TP = {:d}'.format(tp_det))
print('\t#FP = {:d}'.format(fp_det))
print('\t#FN = {:d}'.format(fn_det))

In [None]:
print(hits)

In [None]:
print('Average localization distance in xy-plane for TPs: {:.1f} +/- {:.1f} pxl (n={:d})'.format(np.mean(xy_dists_tp), np.std(xy_dists_tp), len(xy_dists_tp)))

In [None]:
print('Average localization distance in xy-plane for FPs: {:.1f} +/- {:.1f} pxl (n={:d})'.format(np.mean(xy_dists_fp), np.std(xy_dists_fp), len(xy_dists_fp)))

In [None]:
def plotter(scans, scores, bbox, predicted_slices, true_slices, scan_id, slice_id):
    
    scan    = scans[scan_id]
    patient = scan.split('_')[0]
    
    image_path = '/data/public/age_estimation/jpg/ae_'+patient+'/ae_'+scan+'/ae_'+scan+'_s_'+str(slice_id)+'.jpg'
    
    print('Scan: {:s}'.format(scans[scan_id]))    
    print('Image path: {:s}'.format(image_path))
    
    image = Image.open(image_path)
    image = np.asarray(image)
    
    x1, y1, w, h = bbox[scan_id][:,slice_id]
    
    fig, ax = plt.subplots(1, 2, figsize=(20,10))
    ax[0].plot(scores[scan_id], c='r', label='algorithm score')
    if predicted_slices[scan_id].size != 0:
        ax[0].plot(predicted_slices[scan_id], np.ones_like(predicted_slices[scan_id]), c='k')
        ax[0].axvline(x=np.mean(predicted_slices[scan_id]), ls='--', lw=1.5, c='k')
    ax[0].axvline(x=np.mean(true_slices[scan_id]), ls='-.', lw=1.5, c='g')
    ax[0].axvline(x=slice_id, ls='--', lw=1.5, c='b', label='current slice')
    ax[0].set_ylim(0,1)
    ax[0].tick_params(labelsize=14)
    ax[0].set_xlabel('slice', fontsize=16)
    ax[0].set_ylabel('score', fontsize=16)
    ax[0].legend(fontsize=14)
    
    ax[1].imshow(image[100:-100,100:-100])
    patch = plt.Rectangle([x1-100, y1-100], w, h, fill=False, edgecolor='r', linewidth=3)
    ax[1].add_patch(patch)
    
    plt.show()

In [None]:
interactive(plotter,
            scans            = fixed(scans),
            scores           = fixed(scores),
            bbox             = fixed(bbox),
            predicted_slices = fixed(localized_slices),
            true_slices      = fixed(true_slices),
            scan_id          = (0,len(scans)-1),
            slice_id         = (0,400))

In [None]:
n_tot = hits.shape[0]
n_0   = np.where(hits==0)[0].shape[0]
n_1   = np.where(hits==1)[0].shape[0]
accuracy = (float(n_tot-n_0) / float(n_tot)) * 100
print('Accuracy = {:d}/{:d} ({:.2f} %)'.format(n_1, n_tot, accuracy))

In [None]:
plt.figure(figsize=(8,8))
plt.hist(hits, bins=[0,1,2], color='cornflowerblue')
plt.xticks([0.5,1.5], [0,1])
plt.tick_params(labelsize=14)
plt.xlabel('result', fontsize=16)
plt.ylabel('count', fontsize=16)
plt.show()

In [None]:
print('Misses:')
for miss in np.where(hits==0)[0]:
    print('\tPredicted slice:', localized_slices[miss], 'Annotated slices:', true_slices[miss])