In [1]:
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.preprocessing import image as keras_image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
import numpy as np
import os
import joblib

In [2]:
from IPython.display import display, clear_output, HTML
# Inject a CSS rule so that elements with class "container" use 90% of the page width.
display(HTML("<style>.container { width:90% !important; }</style>"))

In [5]:
import tensorflow as tf

# Ask TensorFlow to allocate GPU memory on demand for the first listed GPU.
physical_devices = tf.config.list_physical_devices('GPU')
try:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
except (IndexError, ValueError, RuntimeError):
    # IndexError: no GPU was listed.
    # ValueError / RuntimeError: invalid device or cannot modify virtual devices once initialized.
    # Only these expected failures are tolerated; anything else (including
    # KeyboardInterrupt) is allowed to propagate instead of being silently swallowed.
    print("No GPU?")
    clear_output()

In [6]:
# Instantiate ResNet50 with the 'imagenet' weights; the progress messages
# printed here are wiped by clear_output() once the model is ready.
print("Setting up pre-trained keras ResNet50 model")
model = ResNet50(weights='imagenet')
print("Model ready")
clear_output()

In [7]:
import h5py

In [8]:
import urllib.request
# Fetch the calibration predictions once; later runs reuse the local copy.
if not os.path.exists('val_preds.h5'):
    print("Downloading MICP calibration data (190MB) - be patient!")
    # Download under a temporary name and rename only on success, so that an
    # interrupted transfer never leaves a truncated 'val_preds.h5' that the
    # existence check above would accept on the next run.
    partial_download = 'val_preds.h5.part'
    try:
        urllib.request.urlretrieve("https://cml.rhul.ac.uk/people/ptocca/ILSVRC2012-CP/val_preds.h5",
                                   partial_download)
        os.replace(partial_download, 'val_preds.h5')
    finally:
        # After a successful rename the temporary file no longer exists;
        # after a failure this removes the incomplete download.
        if os.path.exists(partial_download):
            os.remove(partial_download)
    clear_output()

In [9]:
# Read the whole 'preds' dataset from the HDF5 file into memory.
# NOTE(review): presumably shape (calibration objects, labels), which is what
# micp_pValues expects for scores_cal — confirm against the file.
with h5py.File('val_preds.h5','r') as f:
    preds_cal = f['preds'][:]

In [10]:
def pValues(calibrationAlphas,testAlphas,randomized=False):
    """Compute conformal p-values of test nonconformity scores.

    calibrationAlphas: 1-D sequence of calibration nonconformity scores
    testAlphas: scalar or array-like of test nonconformity scores (any shape)
    randomized: if True, ties (calibration scores equal to the test score,
                plus the test score itself) are weighted by a uniform random
                number drawn independently for every test score

    Returns p-values with the same shape as testAlphas.
    Non-randomized: (#{calibration >= test} + 1) / (n + 1).
    Randomized:     (#{calibration >  test} + U * ties) / (n + 1).
    """
    testAlphas = np.array(testAlphas)
    sortedCalAlphas = np.sort(calibrationAlphas)
    n_cal = len(calibrationAlphas)

    # Number of calibration scores strictly smaller than each test score.
    leftPositions = np.searchsorted(sortedCalAlphas,testAlphas)

    if randomized:
        # Number of calibration scores smaller than or equal to each test score.
        rightPositions = np.searchsorted(sortedCalAlphas,testAlphas,side='right')
        ties  = rightPositions-leftPositions+1   # ties in cal set plus the test alpha itself
        # np.shape (rather than len) handles scalar input and gives every
        # element of an N-D input its own random draw.
        randomizedTies = ties * np.random.uniform(size=np.shape(ties))
        return  (n_cal - rightPositions + randomizedTies)/(n_cal+1)
    else:
        return  (n_cal - leftPositions + 1)/(n_cal+1)


In [11]:
def rev_score(scores,label):
    """Nonconformity measure: the negated score of the candidate label.

    scores: array of shape (objects,labels)
    label: column index of the candidate label
    Returns an array of shape (objects,) holding minus the `label` column."""
    own_score = scores[:, label]
    return np.negative(own_score)


def ratio_own_to_max(scores, label):
    """Nonconformity measure: largest score among the other labels divided by
    the score of the candidate label.

    scores: array of shape (objects,labels)
    label: column index of the candidate label
    Returns an array of shape (objects,)."""
    # Builtin bool instead of the np.bool alias, which NumPy no longer provides.
    mask = np.ones(scores.shape[1],dtype=bool)
    mask[label] = False   # exclude the candidate label from the maximum

    # `initial=0` is required by np.amax when `where` is given; it also acts
    # as a floor of 0 on the reported maximum.
    return np.amax(scores, axis=1, where=mask, initial=0) / scores[:,label]

In [12]:
def micp_pValues(scores_cal,scores_test,y_cal,ncm):
    """P-values of a Mondrian (label-conditional) Inductive Conformal Predictor.

    scores_cal, scores_test: arrays of shape (objects,labels) with the scores
                             of the calibration set and of the test set
    y_cal: array of shape (objects,) with the calibration labels
    ncm: callable (scores, label) -> nonconformity values, one per object

    Returns an array of shape (labels, test objects): row k holds the p-values
    of every test object for candidate label k, calibrated only on the
    calibration objects whose true label is k."""
    n_labels = scores_test.shape[1]

    per_label = [
        pValues(ncm(scores_cal[y_cal == candidate], candidate),
                ncm(scores_test, candidate))
        for candidate in range(n_labels)
    ]

    return np.array(per_label)

In [13]:
# Directory that holds the ground-truth and synset files read by the cells below.
# ilsrvc_dir = "/mnt/d/Research/ILSVRC2012/"
ilsrvc_dir = "."

In [14]:
# Paths of the label files inside ilsrvc_dir.
gt_cal_file = os.path.join(ilsrvc_dir,"cal_gt.txt")      # calibration-set ground truth
gt_test_file = os.path.join(ilsrvc_dir,"test_gt.txt")    # test-set ground truth
# NOTE(review): lbls_file is not referenced in the visible cells — confirm whether it is still needed.
lbls_file = os.path.join(ilsrvc_dir,"labels.txt")

In [15]:
# Build two lookup tables from 'synset_words.txt', one entry per line:
#   n_to_ki:      first whitespace-separated token of the line -> line index
#   ki_to_synset: line index -> text after the first 10 characters, up to the first comma
# NOTE(review): the [10:] slice presumably skips a 9-character WordNet id plus
# one separator — confirm against the file format.
n_to_ki = {}
ki_to_synset = {}
with open(os.path.join(ilsrvc_dir,'synset_words.txt')) as f:
    for i,l in enumerate(f):
        n_to_ki[l.split()[0].strip()]=i
        ki_to_synset[i]=l[10:].split(",")[0].strip()

In [16]:
# Load the integer class labels of the calibration and test images.
# Builtin int replaces the np.int alias, which NumPy no longer provides.
# NOTE(review): presumably one label per line, in image order — show_pic
# indexes the test array with (image number - 1).
ground_truth_ki_cal = np.loadtxt(gt_cal_file,dtype=int)
ground_truth_ki_test = np.loadtxt(gt_test_file,dtype=int)

In [17]:
import io

In [18]:
import PIL.Image
import joblib

In [19]:
# Function-level cache for fetched images, stored under /dev/shm/joblib.
mem = joblib.Memory('/dev/shm/joblib',verbose=0)

@mem.cache
def getImage(url):
    """Fetch the image at `url` and return it as a 224x224 RGB PIL image.

    url: address of the image to download
    Returns a PIL image converted to mode 'RGB' (if it was not already) and
    resized to 224x224 with nearest-neighbour resampling. Results are
    memoized through `mem`, so a given URL is fetched only once per cache.
    """
    # The context manager closes the HTTP response deterministically, also
    # when decoding fails, instead of leaving it to garbage collection.
    with urllib.request.urlopen(url) as response:
        img_data = PIL.Image.open(response)
        if img_data.mode != 'RGB':
            img_data = img_data.convert('RGB')
        # Resizing inside the `with` block keeps the response open while the
        # pixel data is decoded.
        img_data = img_data.resize((224,224),resample=PIL.Image.NEAREST)
    return img_data

In [20]:
def get_prob_sets(preds, eps):
    """Build, for each row of `preds`, the smallest top-ranked label set whose
    cumulative probability reaches 1-eps.

    preds: array of shape (objects,labels) with per-label probabilities
    eps: labels are added in decreasing order of probability while the
         cumulative probability of those already taken is below 1-eps;
         the top-ranked label is always included

    Returns a list with one (label_indices, probabilities) tuple per row,
    both ordered by decreasing probability."""
    order = np.argsort(-preds, axis=1)
    ranked = np.take_along_axis(preds, order, axis=1)
    still_short = np.cumsum(ranked, axis=1) < 1-eps

    # A label is kept when the labels ranked before it did not yet reach the
    # target, i.e. the "still short" flags shifted right by one position;
    # the first column stays True.
    keep = np.ones_like(still_short)
    keep[:, 1:] = still_short[:, :-1]

    prob_sets = []
    for row_order, row_keep, row_probs in zip(order, keep, preds):
        chosen = row_order[row_keep]
        prob_sets.append((chosen, row_probs[chosen]))
    return prob_sets

In [21]:
def show_pic(i,eps, ncm):
    """Load image number `i`, classify it and refresh the GUI widgets.

    i: image number, used in the image file name and (as i-1) as index into
       ground_truth_ki_test
    eps: significance level for the conformal predictor; the ResNet50
         probability set is built for an aggregate probability of 1-eps
    ncm: name of the nonconformity measure, 'NegProb' or 'Ratio'

    Updates the module-level widgets `img`, `desc`, `resnet50` and `CP`;
    returns None.
    Raises ValueError if `ncm` is not a known measure name.
    """
    # Resolve the NCM up front so that an unknown name fails with a clear
    # message instead of an UnboundLocalError further down.
    ncm_functions = {'NegProb': rev_score, 'Ratio': ratio_own_to_max}
    try:
        ncm_f = ncm_functions[ncm]
    except KeyError:
        raise ValueError("Unknown NCM %r; expected one of %s"
                         % (ncm, sorted(ncm_functions))) from None

    if 0:   # for development environment
        img_file = os.path.join(".","img","ILSVRC2012_valsub_%08d.JPEG"%i)
        img_data = keras_image.load_img(img_file, target_size=(224, 224))
    else:
        url="""https://cml.rhul.ac.uk/people/ptocca/ILSVRC2012-CP/img/ILSVRC2012_valsub_%08d.JPEG"""%i
        img_data = getImage(url)

    # The image widget takes encoded bytes, so serialize the picture as PNG.
    output = io.BytesIO()
    img_data.save(output,format="PNG")
    img.value = output.getvalue()

    # compute ResNet50 preds
    x = keras_image.img_to_array(img_data)
    x = np.expand_dims(x, axis=0)   # batch of one image
    x = preprocess_input(x)
    test_preds = model.predict(x)
    # Iterator of (label index, probability) pairs for the single test image.
    resNet50_set = zip(*(get_prob_sets(test_preds.reshape(1,-1), eps=eps)[0]))

    # compute CP
    p_vals = micp_pValues(preds_cal, test_preds, ground_truth_ki_cal, ncm=ncm_f)
    # Prediction set: every label whose p-value exceeds eps, listed by decreasing p-value.
    ps = np.argwhere(p_vals>eps)[:,0].T
    ps_p_vals = p_vals[ps].flatten()
    sorting_by_p_val = np.argsort(ps_p_vals)[::-1]
    ps_synset = [ki_to_synset[k]+": %0.3f"%p for k,p in zip(ps[sorting_by_p_val],ps_p_vals[sorting_by_p_val])]

    # Do all widget updates
    ## update ground truth widget
    lbl = ki_to_synset[ground_truth_ki_test[i-1]]
    desc.children[1].value = lbl

    ## update resNet50 widget
    resnet50.children[0].value = "ResNet50 (prob) at aggr prob %0.2f"%(1-eps)
    resnet50.children[1].value = "\n".join(["%s: %0.3f"%(ki_to_synset[k],pr) for k,pr in resNet50_set])

    ## update CP widget
    CP.children[0].value = "CP (p-value) at significance level %0.2f"%eps
    CP.children[1].value = "\n".join(ps_synset)

In [22]:
import ipywidgets as ipw

In [23]:
# Read the HTML text that is shown in the "Notes" tab of the GUI.
with open("ILSRVC_CP_Notes.html") as f:
    notes = f.read()


In [24]:
%%html
<style>
    .widget-label {font-size: 16px;}
    .widget-htmlmath {font-size: 16px;}
    .widget-html {font-size: 16px;}
    .widget-textarea > textarea {font-size: 16px;}
    .widget-radio-box > label {font-size: 16px;}
    .widget-readout  {font-size: 16px;}
</style>

In [25]:
# Initial state of the demo: image number, significance level and NCM name.
initial_pic = 1000
initial_eps = 0.2
initial_ncm = "NegProb"

# Ground-truth row: a fixed caption plus an HTML field that show_pic fills in
# through desc.children[1].
desc = ipw.HBox([ipw.Label("ImageNet label:"),
             ipw.HTML("N/A")],layout=ipw.Layout(align='center',border="solid",padding="7px 27px 7px 27px"))
#desc.add_class("fontsize16px")

# Image widget; show_pic assigns PNG-encoded bytes to img.value.
img = ipw.Image(layout=ipw.Layout(height="400px",width="400px"))

# Result panels: children[0] is the heading label and children[1] the text
# area; show_pic rewrites both on every update.
resnet50 = ipw.VBox([ipw.Label("ResNet50 Probability (top 5)"),
                     ipw.Textarea(layout=ipw.Layout(height="100%"))])

CP = ipw.VBox([ipw.Label("Conformal Predictor at significance level 'eps'"),
               ipw.Textarea(layout=ipw.Layout(height="100%"))])

# Selector for the nonconformity measure; the option strings are the names
# that show_pic compares its `ncm` argument against.
NCM = ipw.RadioButtons(options=['NegProb','Ratio'],
                       value=initial_ncm,
                       description="NCM:")
NCM_frame = ipw.VBox([ipw.Label(""), NCM])  # Just to align nicely
               

labels = ipw.HBox([resnet50,CP,NCM_frame],layout=ipw.Layout(height="200px",align_content="stretch"))

# Controls; continuous_update=False is set on both sliders.
pic_idx = ipw.IntSlider(value=initial_pic, description="Image:",
                        min=1, max=2000, continuous_update=False,
                        layout=ipw.Layout(width="90%", align_items='center'))
eps_slider = ipw.FloatSlider(value=initial_eps, description="Epsilon",
                             min=1e-8, max=1.0, step=0.01,
                             continuous_update=False,
                             layout=ipw.Layout(width="90%", align_items='center'))

heading = ipw.HTML(value="<h1>Demo of Conformal Prediction using ResNet50 on ImageNet data</h1>")

# Two tabs: the interactive demo and the explanatory notes read from file.
gui_tab = ipw.VBox([heading, img, desc, pic_idx, eps_slider, labels], layout=ipw.Layout(align_items='center'))
help_tab = ipw.HTMLMath(value=notes)

gui = ipw.Tab(children=[gui_tab,help_tab])
gui.set_title(0,"Demo")
gui.set_title(1,"Notes")

# Populate the widgets once with the initial settings.
show_pic(initial_pic,initial_eps,initial_ncm)

#clear_output();


In [26]:
# Bind show_pic's arguments to the existing controls. The returned object is
# not kept or displayed.
# NOTE(review): presumably this is what re-runs show_pic whenever a slider or
# the NCM selector changes — confirm against the ipywidgets documentation.
ipw.interactive(show_pic,i = pic_idx, eps = eps_slider, ncm = NCM)

# Last expression of the cell: display the tabbed GUI.
gui

Tab(children=(VBox(children=(HTML(value='<h1>Demo of Conformal Prediction using ResNet50 on ImageNet data</h1>…