###Video Link: https://drive.google.com/file/d/1dS4zRURBqvEZTEdkfhJAhlpZd8zI1GAV/view?usp=sharing

#Q1

In [1]:
import sys
sys.path.insert(0, '../')

# from data_utils import *
# from utils import *
# from combination_methods import *
from tqdm.auto import tqdm
import torch
from sklearn.model_selection import train_test_split
# from metrics import *
import csv
import numpy as np
import os

In [2]:
import numpy as np
import scipy; import scipy.stats; import scipy.integrate as integrate
# import imax_calib.utils as # import imax_calib.as 
##################
# Binning #################
def nolearn_bin_boundaries(num_bins, binning_scheme, x=None):
    """
    Get the bin boundaries (in logit space) of the <num_bins> bins. This function returns only the bin boundaries which do not include any type of learning. 
    For example: equal mass bins, equal size bins or overlap bins.
    
    Parameters
    ----------
    num_bins: int
        Number of bins
    binning_scheme: string
        The way the bins should be placed. 
            'eqmass': each bin has the same portion of samples assigned to it. Requires that `x is not None`.
            'eqsize': equal spaced bins in `probability` space. Will get equal spaced bins in range [0,1] and then convert to logodds.
            'custom_range[min_lambda,max_lambda]': equal spaced bins in `logit` space given some custom range.
    x: numpy array (1D,)
        array with the 1D data to determine the eqmass bins. 
    Returns
    -------
    bins: numpy array (num_bins-1,)
        Returns the bin boundaries. It will return num_bins-1 bin boundaries in logit space. Open ended range on both sides.
    """
    if binning_scheme=="eqmass": 
        assert x is not None and len(x.shape)==1
        bins = np.linspace(1.0/num_bins, 1 - 1.0 / num_bins, num_bins-1) # num_bins-1 boundaries for open ended sides
        bins = np.percentile(x, bins * 100, interpolation='lower') # data will ensure its in Logit space
    elif binning_scheme=="eqsize":  # equal spacing in logit space is not the same in prob space because of sigmoid non-linear transformation
        bins = to_logodds(   np.linspace(1.0/num_bins, 1 - 1.0 / num_bins, num_bins-1)   ) # num_bins-1 boundaries for open ended sides
    elif "custom_range" in binning_scheme: # used for example when you want bins at overlap regions. then custom range should be [ min p(y=1), max p(y=0)  ]. e.g. custom_range[-5,8]
        custom_range = eval(binning_scheme.replace("custom_range", ""))
        assert type(custom_range)==list and (custom_range[0] <= custom_range[1])
        bins = np.linspace(custom_range[0], custom_range[1], num_bins-1) # num_bins-1 boundaries for open ended sides
    return bins

def bin_data(x, bins):
    """
    Given bin boundaries quantize the data (x). When ndims(x)>1 it will flatten the data, quantize and then reshape back to orig shape. 
    Returns the following quantized values for num_bins=10 and bins = [2.5, 5.0, 7.5, 1.0]\n
    quantize: \n
              (-inf, 2.5) -> 0\n
              [2.5, 5.0) -> 1\n
              [5.0, 7.5) -> 2\n
              [7.5, 1.0) -> 3\n
              [1.0, inf) -> 4\n
    Parameters
    ----------
    x: numpy ndarray 
       Network logits as numpy array 
    bins: numpy ndarray
        location of the (num_bins-1) bin boundaries
    Returns
    -------
    assigned: int numpy ndarray 
        For each sample, this contains the bin id (0-indexed) to which the sample belongs.
    """
    orig_shape = x.shape
    # if not 1D data. so need to reshape data, then quantize, then reshape back
    if len(orig_shape)>1 or orig_shape[-1]!=1:  x = x.flatten()
    assigned = np.digitize(x, bins) # bin each input in data. np.digitize will always return a valid index between 0 and num_bins-1 whenever bins has length (num_bins-1) to cater for the open range on both sides
    if len(orig_shape)>1 or orig_shape[-1]!=1:  assigned = np.reshape(assigned, orig_shape)
    return assigned.astype(np.int)



######### Quantize data
def quantize_logodds(x, bins, bin_reprs, return_probs=True):
    """ 
    Quantize logodds (x) using bin boundaries (bins) and reprs in logit space and then convert to prob space if `return_probs=True`.
    Parameters
    ----------
    x: numpy ndarray
       Network logits as numpy array 
    bins: numpy ndarray
        Location of the (num_bins-1) bin boundaries
    bin_reprs: numpy ndarray
        Bin representations in logodds space. Contains (num_bins-1)=len(bins)+1 entries.
    return_probs: boolean (default: True)
        All operations take place in logodds space. Setting this to true will ensure that the values returned are in probability space (i.e. it will convert the quantized values from logodds to sigmoid before returning them)
    Returns
    -------
    quant_output: numpy ndarray
        The output of the quantization based on the bins and bin_reprs. Either the output will be in logodds space (i.e. return_probs=False) or in probability space.
    assigned: int numpy ndarray
        The bin assignment integers for each sample.
    """
    assigned = bin_data(x, bins) # log space
    quant_output = bin_reprs[assigned] # fill up representations based on assignments
    if return_probs:    quant_output = to_sigmoid(quant_output) # prob space 
    return pred_probs, assigned


########### Bin boundary update
def bin_boundary_update_closed_form(representations):
    """
    Closed form update of boundaries. stationary point when log(p(y=1|lambda)) - log(p(y=0|lambda)) = log(log(xxx)/log(xxx)) term. LHS side is logodds/boundaries when p(y|lambda) modelled with sigmoid (e.g. PPB )
    """
    temp_log = 1. + np.exp(-1*np.abs(representations))
    temp_log[temp_log==0] = EPS
    logphi_a = np.maximum(0., representations)  + np.log(temp_log)
    logphi_b = np.maximum(0., -1*representations) + np.log(temp_log)
    assert np.any(np.sign(logphi_a[1:]-logphi_a[:-1])*np.sign(logphi_b[:-1]-logphi_b[1:])>=0.)
    temp_log1 = np.abs( logphi_a[1:] - logphi_a[:-1] )
    temp_log2 = np.abs( logphi_b[:-1] - logphi_b[1:] )
    temp_log1[temp_log1==0] = EPS
    temp_log2[temp_log2==0] = EPS
    bin_boundaries = np.log(temp_log1) - np.log(temp_log2)               
    bin_boundaries = np.sort(bin_boundaries)
    return bin_boundaries




######### Bin representation code
def bin_representation_calculation(x, y, num_bins, bin_repr_scheme="sample_based", bin_boundaries=None, assigned=None, return_probs=False):
    """
    Bin representations: frequency based: num_positive_samples/num_total_samples in each bin.
        or pred_prob based: average of the sigmoid of lambda
    Function gets the bin representation which can be used during the MI maximization.
    Parameters
    ----------
    x: numpy ndarray
        logodds data which needs to be binned using bin_boundaries. Only needed if assigned not given.
    y: numpy ndarray
        Binary label for each sample
    bin_repr_scheme: strig
        scheme to use to determine bin reprs. options: 'sample_based' and 'pred_prob_based'
    bin_boundaries: numpy array
        logodds bin boundaries. Only needed when assigned is not given.
    assigned: int numpy array
        bin id assignments for each sample
    Returns
    -------
    quant_reprs: numpy array
        quantized bin reprs for each sample
    """
    assert (bin_boundaries is None) != (assigned is None), "Cant have or not have both arguments. Need exactly one of them."
    if assigned is None:    assigned = bin_data(x, bin_boundaries)

    if bin_repr_scheme=="sample_based":
        quant_reprs = bin_repr_unknown_LLR(y, assigned, num_bins, return_probs) # frequency estimate of correct/incorrect
    elif bin_repr_scheme=="pred_prob_based":
        quant_reprs = bin_repr_unknown_LLR(to_sigmoid(x), assigned, num_bins, return_probs) # softmax probability for bin reprs
    else:
        raise Exception("bin_repr_scheme=%s is not valid."%(bin_repr_scheme))
    return quant_reprs   

def bin_repr_unknown_LLR(sample_weights, assigned, num_bins, return_probs=False):
    """
    Unknown Bin reprs. Will take the average of the either the pred_probs or the binary labels.
    Determines the bin reprs by taking average of sample weights in each bin.
    For example for sample-based repr: sample_weights should be 0 or 1 indicating correctly classified or not.
    or for pred-probs-based repr: sample_weights should be the softmax output probabilities.
    Handles reshaping if sample_weights or assigned has more than 1 dim.
    
    Parameters
    ----------
    sample_weights: numpy ndarray
        array with the weight of each sample. These weights are used to calculate the bin representation by taking the averages of samples grouped together.
    assigned: int numpy array
        array with the bin ids of each sample
    return_probs: boolean (default: True)
        All operations take place in logodds space. Setting this to true will ensure that the values returned are in probability space (i.e. it will convert the quantized values from logodds to sigmoid before returning them)
    Returns
    -------
    representations: numpy ndarray
        representations of each sample based on the bin it was assigned to 
    """
    orig_shape = sample_weights.shape
    assert np.all(orig_shape==assigned.shape)
    assert sample_weights.max()<=1.0 and sample_weights.min()>=0.0, "make sure sample weights are probabilities"
    if len(orig_shape)>1:
        sample_weights = sample_weights.flatten()
        assigned = assigned.flatten()

    bin_sums_pos = np.bincount(assigned, weights=sample_weights, minlength=num_bins) # sum up all positive samples
    counts = np.bincount(assigned, minlength=num_bins) # sum up all samples in bin
    filt = counts>0
    prob_pos = np.ones(num_bins)*sample_weights.mean() # NOTE: important change: when no samples at all fall into any bin then default should be the prior
    prob_pos[filt] = bin_sums_pos[filt] / counts[filt] # get safe prob of pos samples over all samples
    representations = prob_pos 
    if return_probs==False:    representations = to_logodds( representations)#NOTE: converting to logit domain again
    return representations 

def bin_repr_known_LLR(bin_boundaries, prior_y_pos, distr_kde_dict):
    """
    Known Bin reprs (i.e. density based representation). Will get the bin representations based on the density estimated by KDE. 
    Much slower than unknown LLR. so only used when calculating the MI.
    Parameters
    ----------
    logodds: numpy ndarray
       data which will be used to estimate the KDE 
    y: numpy ndarray
        labels of the samples also used to get the positive and negative KDEs
    assigned: int numpy array
        array with the bin ids of each sample
    return_probs: boolean (default: True)
        All operations take place in logodds space. Setting this to true will ensure that the values returned are in probability space (i.e. it will convert the quantized values from logodds to sigmoid before returning them)
    Returns
    -------
    representations: numpy ndarray
        representations of each sample based on the bin it was assigned to 
    """
    distr_pos = distr_kde_dict["pos"] # scipy.stats.gaussian_kde(logodds[y==1])
    distr_neg = distr_kde_dict["neg"] # scipy.stats.gaussian_kde(logodds[y==0])
    prior_y_neg = 1 - prior_y_pos
    new_boundaries = np.hstack([-100, bin_boundaries , 100])
    new_reprs = np.zeros(len(bin_boundaries)+1)

    p_ypos_given_lam = np.zeros( len(bin_boundaries)+1 )
    p_yneg_given_lam = np.zeros( len(bin_boundaries)+1 )
    for idx in range( len(bin_boundaries) + 1):
        numer = prior_y_pos*distr_pos.integrate_box_1d(new_boundaries[idx], new_boundaries[idx+1]) # p(lam|y=1)*p(y=1)
        denom = prior_y_neg*distr_neg.integrate_box_1d(new_boundaries[idx], new_boundaries[idx+1]) # p(lam|y=0)*p(y=0)
        new_reprs[idx] = safe_log_diff(numer, denom, np.log)
        p_ypos_given_lam[idx] = numer
        p_yneg_given_lam[idx] = denom
    new_reprs[~np.isfinite(new_reprs)] = EPS
    new_reprs[new_reprs==0] = EPS
    return new_reprs, p_ypos_given_lam, p_yneg_given_lam






def MI_unknown_LLR(p_y_pos, logodds, bin_boundaries, representations):
    """logodds => the logodds which were used to bin. rewrote MI loss: sum_Y sum_B p(y'|lambda)p(lambda) for term outside log. Before it was p(lambda|y')p(y') """
    # NOTE: checked and matches impl of Dan: -1*MI_eval(**kwargs) => all good
    pred_probs = to_sigmoid(logodds)
    prior_y = AttrDict( dict(pos=p_y_pos, neg=1-p_y_pos)  )
    num_bins = len(bin_boundaries)+1
    # get p(y|lambda)p(lambda).... first get mean pred. prob. per bin
    assigned = bin_data(logodds, bin_boundaries)
    bin_sums_pred_probs_pos = np.bincount( assigned, weights=pred_probs, minlength=num_bins) # get the reprs in prob space because of mean.
    p_y_pos_given_lambda_per_bin = bin_sums_pred_probs_pos / logodds.shape[0]
    bin_sums_pred_probs_neg = np.bincount( assigned, weights=1-pred_probs, minlength=num_bins) # get the reprs in prob space because of mean.
    p_y_neg_given_lambda_per_bin = bin_sums_pred_probs_neg / logodds.shape[0]
    p_y_given_lambda_dict = AttrDict(dict(pos=p_y_pos_given_lambda_per_bin, neg=p_y_neg_given_lambda_per_bin))
    mi_loss = 0.0
    for binary_class_str, binary_class in zip( ["neg","pos"], [0,1] ):
        terms_in_log = (   1 + np.exp((1-2*binary_class) * representations)  )       *    prior_y[binary_class_str]   # part 3
        bin_summation_term =  np.sum(  p_y_given_lambda_dict[binary_class_str] * np.log(  terms_in_log ) )
        mi_loss += bin_summation_term
    return -1*mi_loss




def MI_known_LLR(bin_boundaries, p_y_pos, distr_kde_dict):
    """
    Calculate the MI(lambda_hat, y)(using the known LLR), where lambda_hat is the quantized lambdas.
    This will compute the MI in bits (log2).
    It uses a KDE to estimate the density of the positive and negative samples.
    At the end it will perform some basic checks to see if the computations were correct.
    In addition to the MI it will compute the bit rate (R) (i.e. MI(z, lambda) where z is quantized lambda)
    
    Parameters
    ----------
    bin_boundaries: numpy array
        bin boundaries
    p_y_pos: float
        p(y=1) prior
    distr_kde_dict: dict
        dictionary containing the KDE objects used to estimate the density in each bin with keys 'pos' and 'neg'.
    Returns
    -------
    MI: float
        MI(z, y) where z is quantized lambda. This is the mutual information between the quantizer output to the label.
    R: float
        bin rate. This is MI(z, lambda). Mutual Information between lambda and quantized lambda.
    """
    distr_pos, distr_neg = distr_kde_dict["pos"], distr_kde_dict["neg"]        
    p_y_neg = 1 - p_y_pos


    new_boundaries = np.hstack([-100, bin_boundaries, 100])
    # lists for checks afterwards
    all_vs, all_intpos, all_intneg = [], [], []
    MI, R = 0.0, 0.0
    for idx in range( len(bin_boundaries) + 1):
        integral_pos = p_y_pos*distr_pos.integrate_box_1d(new_boundaries[idx], new_boundaries[idx+1]) # p(lam|y=1)*p(y=1) = p(lam|y=1)
        integral_neg = p_y_neg*distr_neg.integrate_box_1d(new_boundaries[idx], new_boundaries[idx+1]) # p(lam|y=1)*p(y=1) = p(lam|y=0)
        repr = safe_log_diff(integral_pos, integral_neg, np.log)

        p_ypos_given_z = max( EPS, to_sigmoid(   repr) )
        p_yneg_given_z = max( EPS, to_sigmoid(-1*repr) )

        curr_MI_pos = integral_pos * ( safe_log_diff( p_ypos_given_z, p_y_pos, np.log2    ) )
        curr_MI_neg = integral_neg * ( safe_log_diff( p_yneg_given_z, p_y_neg, np.log2    ) )
        MI += curr_MI_pos + curr_MI_neg 

        v = max( EPS, (integral_pos + integral_neg)     )
        curr_R = -1 * v * np.log2(v) # entropy of p(z) = p(z|y=1)p(y=1) + p(z|y=0)p(y=0)
        R += curr_R
        # gather for checks
        all_vs.append(v)
        all_intpos.append(integral_pos); all_intneg.append(integral_neg)
    np.testing.assert_almost_equal( np.sum(all_vs), 1.0 , decimal=1)
    np.testing.assert_almost_equal( np.sum(all_intpos), p_y_pos, decimal=1)
    np.testing.assert_almost_equal( np.sum(all_intneg), p_y_neg, decimal=1)
    return MI, R


def MI_upper_bounds(p_y_pos, distr_kde_dict):
    """
    Calculate the MI upper bound of MI(z, y) <= MI(lambda, y). As z is the quantized version of lambda, MI(z, y) is upper bounded by MI(lambda, y).
    This is a tigther bound than H(y). This function will return both upper bounds.
    Bound 1: MI(z, y) <= H(y) - H(y|z) <= H(y)
    Bound 2: MI(z, y) <= MI(lambda, y)
    Parameters
    ----------
    p_y_pos: float
        p(y=1) prior
    distr_kde_dict: dict
        dictionary containing the KDE objects used to estimate the density in each bin with keys 'pos' and 'neg'.
    Returns
    -------
    H_y: float
        Loose upper bound which is H(y)        
    MI_y_lambda: float
        Upper bound of MI(z, y) which is upper bounded by MI(lambda, y). Tigther bound than H(y)
    """
    tic = time.time()
    p_y_neg = 1 - p_y_pos

    # Bound 1
    H_y = -1*p_y_pos*np.log2(p_y_pos) + -1*p_y_neg*np.log2(p_y_neg)

    # Bound 2
    distr_pos, distr_neg = distr_kde_dict["pos"], distr_kde_dict["neg"]        
    def get_logodd_lambda(lam):
        log_term_1 = p_y_pos * distr_pos.pdf(lam)
        log_term_2 = p_y_neg * distr_neg.pdf(lam) 
        logodd_lambda = safe_log_diff(log_term_1, log_term_2, np.log)
        return logodd_lambda

    def integral_pos(lam):
        logodd_lambda = get_logodd_lambda(lam)
        p_ypos_lambda = to_sigmoid(    logodd_lambda  )
        return p_y_pos * distr_pos.pdf(lam) * safe_log_diff( p_ypos_lambda, p_y_pos, np.log2)  #np.log2(  p_ypos_lambda    /   p_y_pos  )
	
    def integral_neg(lam):
        logodd_lambda = get_logodd_lambda(lam)
        p_yneg_lambda = to_sigmoid(  -1*  logodd_lambda  )
        return p_y_neg * distr_neg.pdf(lam) * safe_log_diff(p_yneg_lambda, p_y_neg, np.log2) #np.log2(  p_yneg_lambda    /   p_y_neg  )    
	
    term_pos = integrate.quad(integral_pos, -100, 100, limit=100)[0]
    term_neg = integrate.quad(integral_neg, -100, 100, limit=100)[0]
    MI_y_lambda =  term_pos + term_neg


    toc = time.time()
    print("Time elapsed: upper bound computation: ", (toc-tic), " seconds!")
    return H_y, MI_y_lambda


In [3]:
!pip install attrdict

Collecting attrdict
  Downloading attrdict-2.0.1-py2.py3-none-any.whl (9.9 kB)
Installing collected packages: attrdict
Successfully installed attrdict-2.0.1


In [4]:
!pip install deepdish

Collecting deepdish
  Downloading deepdish-0.3.7-py2.py3-none-any.whl (37 kB)
Installing collected packages: deepdish
Successfully installed deepdish-0.3.7


In [5]:
import os
import numpy as np
from attrdict import AttrDict
import deepdish
import time

def deepdish_read(fpath, group=None):
    ''' Read all data inside the hdf5 file '''
    data = deepdish.io.load(fpath, group=group)
    if isinstance(data, dict):
        data = AttrDict(data)
    return data

def deepdish_write(fpath, data):
    ''' Save a dictionary as a hdf5 file! '''
    create_dir_for_fpath(fpath)
    deepdish.io.save(fpath, data, compression="None")
    



class Logger:
    def __init__(self, fpath):
        self.fpath = fpath
        self.logdata = AttrDict({})        

    def log(self, key, value):
        if key not in self.logdata:  self.logdata[key] = []
        self.logdata[key].append(value)

    def last(self, key):
        return self.logdata[key][-1]

    def log_dict(self, dictionary, suffix=""):
        # logging each element in the dictionary
        suffix = "_%s"%(suffix) if (suffix != "" and suffix[0]!="_") else suffix
        for k,v in dictionary.items():
            self.log(k+suffix,v)


    def end_log(self):
        for k,v in self.logdata.items():
            self.logdata[k] = np.array(v) if isinstance(v, list) else v

    def save_log(self):
        deepdish_write(self.fpath, self.logdata)

In [6]:
import numpy as np
import sklearn.model_selection

#EPS = np.finfo(float).eps # used to avoid division by zero
EPS = 1e-50

def is_sorted(a):
    for i in range(a.size-1):
        if a[i+1] < a[i] :  return False
    return True


def to_softmax(x, axis=-1):
    """
    Stable softmax in numpy. Will be applied across last dimension by default.
    Takes care of numerical instabilities like division by zero or log(0).
    Parameters
    ----------
    x : numpy ndarray 
       Logits of the network as numpy array. 
    axis: int
       Dimension along which to apply the operation (default: last one) 
    Returns
    -------
    softmax: numpy ndarray 
       Softmax output 
    """
    z = x - np.max(x, axis=axis, keepdims=True)
    numerator = np.exp(z)
    denominator = np.sum(numerator, axis=axis, keepdims=True)
    softmax = numerator / denominator
    assert np.all( np.isfinite(softmax) ) == True , "Softmax output contains NaNs. Handle this."
    return softmax

def to_sigmoid(x): 
    """
    Stable sigmoid in numpy. Uses tanh for a more stable sigmoid function. 
    
    Parameters
    ----------
    x : numpy ndarray 
       Logits of the network as numpy array. 
    Returns
    -------
    sigmoid: numpy ndarray 
       Sigmoid output 
    """
    sigmoid = 0.5 + 0.5 * np.tanh(x/2)
    assert np.all( np.isfinite(sigmoid) ) == True , "Sigmoid output contains NaNs. Handle this."
    return sigmoid 

def to_logodds(x):
    """
    Convert probabilities to logodds using:
    .. math:: 
        \\log\\frac{p}{1-p} ~ \\text{where} ~ p \\in [0,1]
    Natural log.
    Parameters
    ----------
    x : numpy ndarray 
       Class probabilties as numpy array. 
    Returns
    -------
    logodds : numpy ndarray 
       Logodds output 
    """
    x = np.clip(x, 1e-10, 1 - 1e-10)
    assert x.max() <= 1 and x.min() >= 0
    numerator = x
    denominator = 1-x
    #numerator[numerator==0] = EPS
    #denominator[denominator==0] = EPS # 1-EPS is basically 1 so not stable!
    logodds = safe_log_diff(numerator, denominator, np.log) # logodds = np.log( numerator/denominator   )
    assert np.all(np.isfinite(logodds))==True, "Logodds output contains NaNs. Handle this."
    return logodds

def safe_log_diff(A, B, log_func=np.log):
    """
    Numerically stable log difference function. Avoids log(0). Will compute log(A/B) safely where the log is determined by the log_func
    """
    if np.isscalar(A):
        if A==0 and B==0:
            return log_func(EPS)
        elif A==0:
            return log_func(  EPS   )  - log_func(B)
        elif B==0:
            return log_func(  A   )  - log_func( EPS )
        else:
            return log_func(A) - log_func(B)
    else:
        # log(A) - log(B)
        with np.errstate(divide='ignore'):
            output = np.where(A==0, log_func(EPS), log_func(A) ) - np.where(B==0, log_func(EPS), log_func(B))
            output[ np.logical_or(A==0, B==0)] = log_func(EPS)
            assert np.all(np.isfinite(output))
        return output




def quick_logits_to_logodds(logits, probs=None):
    """
    Using the log-sum-exp trick can be slow to convert from logits to logodds. This function will use the faster prob_to_logodds if n_classes is large.
    """
    n_classes = logits.shape[-1]
    if n_classes <=100:   # n_classes are reasonable as use this slow way to get marginal
        logodds = logits_to_logodds(logits) 
    else: # imagenet case will always come here! 
        if probs is None:   probs = to_softmax(logits)
        logodds = probs_to_logodds(probs)
    return logodds

def probs_to_logodds(x):
    """
    Use probabilities to convert to logodds. Faster than logits_to_logodds.
    """
    assert x.max() <= 1 and x.min() >= 0
    logodds = to_logodds(x)
    assert np.all(np.isfinite(logodds))
    return logodds

def logits_to_logodds(x):
    """
    Convert network logits directly to logodds (without conversion to probabilities and then back to logodds) using:
    .. math:: 
        \\lambda_k=z_k-\\log\\sum\\nolimits_{k'\\not = k}e^{z_{k'}}
    Parameters
    ----------
    x: numpy ndarray 
       Network logits as numpy array 
    axis: int
        Dimension with classes
    Returns
    -------
    logodds : numpy ndarray 
       Logodds output 
    """
    n_classes = x.shape[1]
    all_logodds = []
    for class_id in range(n_classes):
        logodds_c = x[...,class_id][..., np.newaxis] - custom_logsumexp(  np.delete(x, class_id, axis=-1) , axis=-1)
        all_logodds.append(logodds_c.reshape(-1))
    logodds = np.stack( all_logodds, axis=1 )
    assert np.all(np.isfinite(logodds))
    return logodds

def custom_logsumexp(x, axis=-1):
    """
    Uses the log-sum-exp trick.
    Parameters
    ----------
    x: numpy ndarray 
       Network logits as numpy array 
    
    axis: int (default -1)
        axis along which to take the sum
    Returns
    -------
    out: numpy ndarray
        log-sum-exp of x along some axis
    """
    x_max = np.amax(x, axis=axis, keepdims=True)
    x_max[~np.isfinite(x_max)] = 0
    tmp = np.exp(x - x_max)
    s = np.sum(tmp, axis=axis, keepdims=True)
    s[s<=0] = EPS # only add epsilon when argument is zero
    out = np.log(s)
    out += x_max
    return out






def to_onehot(y, num_classes):
    """
    Convert 1D targets to one-hot repr.
    Parameters
    ----------
    y: numpy 1D-array 
        Array with sample target ids (i.e. 0 to <num_classes>-1)
    num_classes: int
        Number of classes
    Returns
    -------
    y_onehot: numpy ndarray
        One-hot representation of target labels
    """
    assert len(y.shape)==1
    y_onehot = np.eye(num_classes)[y]
    return y_onehot


def binary_convertor(logodds, y, cal_setting, class_idx):
    """
    Function to convert the logodds data (in multi-class setting) to binary setting. The following options are available:
    1) CW - slice out some class: cal_setting="CW" and class_idx is not None (int)
    2) top1 - max class for each sample: get the top1 prediction: cal_setting="top1" and class_idx is None
    3) sCW - merge marginal setting where data is combined: cal_setting="sCW" and class_idx is None
    """

    if cal_setting=="CW":
        assert class_idx is not None, "class_idx needs to be an integer to slice out class needed for CW calibration setting"
        logodds_c = logodds[..., class_idx]
        y_c = y[..., class_idx] if y is not None else None
    elif cal_setting=="top1":
        assert class_idx is None, "class_idx needs to be None - check"
        top1_indices = logodds.argmax(axis=-1)
        logodds_c = logodds[np.arange(top1_indices.shape[0]), top1_indices]
        y_c = y.argmax(axis=-1) == top1_indices if y is not None else None 
    elif cal_setting=="sCW":
        assert class_idx is None, "class_idx needs to be None - check"
        logodds_c = np.concatenate(logodds.T)
        y_c = np.concatenate(y.T) if y is not None else None
    else:
        raise Exception("Calibration setting (%s) not recognized!"%(cal_setting))
    
    return logodds_c, y_c




In [7]:
import numpy as np
# import imax_calib.as # import imax_calib.utils as # import imax_calib.as from scipy.cluster.vq import kmeans,vq
import scipy.cluster.vq
import os
import contextlib

# from imax_calib.calibrators.binners import run_imax



def compute_top_1_and_CW_ECEs(multi_cls_probs, multi_cls_labels, list_approximators=["dECE", "mECE", "iECE", "kECE"], num_bins=100, threshold_mode='class'):
    """
    Given the multi-class predictions and labels, this function computes the top1 and CW ECEs. Will compute it by calling the other functions in this script.
    Parameters:
    -----------
    multi_cls_probs: 2D ndarray
        predicted probabilities
    multi_cls_labels: 1D or 2D ndarray
        label indices or one-hot labels. Will be converted to one-hot
    Return:
    -------
    ece_dict: dict
        Dictionary with all the ECE estimates
    """
    assert len(multi_cls_probs.shape)==2
    if len(multi_cls_labels.shape)==1: # not one-hot. so convert to one-hot
        multi_cls_labels = np.eye(multi_cls_probs.shape[1])[multi_cls_labels]

    ece_evals_dict = AttrDict({})
    
    n_classes = multi_cls_probs.shape[1]
    for ece_approx in list_approximators:
        top_1_preds  = multi_cls_probs.max(axis=-1)
        top_1_correct=multi_cls_probs.argmax(axis=-1) == multi_cls_labels.argmax(axis=-1)

        top_1_ECE = eval("measure_%s_calibration"%(ece_approx))(pred_probs=top_1_preds, correct=top_1_correct, num_bins=num_bins)["ece"]
    
        cw_ECEs = []
        if threshold_mode == 'class':
            threshold = 1.0/n_classes
        elif threshold_mode is None:
            threshold = 0.
        for class_idx in range(n_classes):
            cw_ECE = eval("measure_%s_calibration"%(ece_approx))(pred_probs=multi_cls_probs[:, class_idx],
                                                                 correct=multi_cls_labels[:, class_idx],
                                                                 num_bins=num_bins, threshold=threshold)["ece"]
            cw_ECEs.append(cw_ECE)
        mean_cw_ECE = np.mean(cw_ECEs)
        
        ece_evals_dict["top_1_%s"%(ece_approx)] = top_1_ECE
        ece_evals_dict["cw_%s"%(ece_approx)] = mean_cw_ECE 

    return ece_evals_dict


def _ece(avg_confs, avg_accs, counts):
    """
    Helper function to compute the Expected Calibration Error.
    Parameters
    ----------
    avg_confs: Averaged probability of predictions per bin (confidence)
    avg_accs: Averaged true accuracy of predictions per bin
    counts: Number of predictions per bin
    Returns
    -------
    ece: float - calibration error
    """
    return np.sum((counts / counts.sum()) * np.absolute(avg_confs- avg_accs))


def measure_iECE_calibration(pred_probs, correct, num_bins, threshold=-1):
    """
    Compute the calibration curve using I-Max binning scheme. This will run the I-Max algorithm on the TEST set and get the bin boundaries.
    Parameters
    ----------
    y: numpy binary array
        label indicating if sample is positive or negative
    
    for rest see calibration_error_and_curve()
    Returns
    -------
        see calibration_error_and_curve()
    """
    #print("Running iECE calc.: calling I-Max now!")
    logodds = to_logodds(pred_probs)
    with open(os.devnull, "w") as f, contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
        logdata = run_imax(logodds, correct, num_bins, log_every_steps=None, logfpath=None )
    bin_boundaries = logdata.bin_boundaries[-1]
    assigned = bin_data(logodds, bin_boundaries)
    return calibration_error_and_curve(pred_probs, correct, assigned, num_bins, threshold)

def measure_dECE_calibration(pred_probs, correct, num_bins=100, threshold=-1):
    """
    Compute the calibration curve using the equal size binning scheme (i.e. equal size bins)and computes the calibration error given this binning scheme (i.e. dECE).
    Parameters
    ----------
        see calibration_error_and_curve()
    Returns
    -------
        see calibration_error_and_curve()
    """
    assert len(pred_probs.shape)==1
    bin_boundaries_prob = to_sigmoid( nolearn_bin_boundaries(num_bins, binning_scheme="eqsize") )
    assigned = bin_data(pred_probs, bin_boundaries_prob)
    return calibration_error_and_curve(pred_probs, correct, assigned, num_bins, threshold)


def measure_mECE_calibration(pred_probs, correct, num_bins=100, threshold=-1):
    """
    Compute the calibration curve using the equal mass binning scheme (i.e. equal mass bins)and computes the calibration error given this binning scheme (i.e. mECE).
    Parameters
    ----------
        see calibration_error_and_curve()
    Returns
    -------
        see calibration_error_and_curve()
    """
    assert len(pred_probs.shape)==1
    logodds = to_logodds(pred_probs)
    #if logodds.max()<=1 and logodds.min()>=0:
    bin_boundaries_prob = to_sigmoid( nolearn_bin_boundaries(num_bins, binning_scheme="eqmass", x=logodds) )
    assigned = bin_data(pred_probs, bin_boundaries_prob)
    return calibration_error_and_curve(pred_probs, correct, assigned, num_bins, threshold)

def measure_kECE_calibration(pred_probs, correct, num_bins=100, threshold=-1):
    """
    Compute the calibration curve using the kmeans binning scheme (i.e. use kmeans to cluster the data and then determine the bin assignments) and computes the calibration error given this binning scheme (i.e. kECE).
    Parameters
    ----------
        see calibration_error_and_curve()
    Returns
    -------
        see calibration_error_and_curve()
    """

    assert len(pred_probs.shape)==1
    centroids,_ = scipy.cluster.vq.kmeans(pred_probs, num_bins)
    cluster_ids, _ = scipy.cluster.vq.vq(pred_probs, centroids)
    cluster_ids = cluster_ids.astype(np.int)
    return calibration_error_and_curve(pred_probs, correct, cluster_ids, num_bins, threshold)

 
def measure_quantized_calibration(pred_probs, correct, assigned, num_bins=100, threshold=-1):
    """
    Compute the calibration curve given the bin assignments (i.e. quantized values). 
    """
    assert len(pred_probs.shape)==1
    return calibration_error_and_curve(pred_probs, correct, assigned, num_bins, threshold)


def calibration_error_and_curve(pred_probs, correct, assigned, num_bins=100, threshold=-1):
    """
    Compute the calibration curve and calibration error. The threshold float will determine which samples to ignore because its confidence is very low.
    Parameters
    ----------
        see calibration_curve_quantized()
    Returns
    -------
    results: dict
        dictionary with calibration information
    """
    assert len(pred_probs.shape)==1
    mask = pred_probs>threshold
    pred_probs, correct, assigned = pred_probs[mask], correct[mask], assigned[mask]
    cov = mask.mean()
    prob_pred, prob_true, counts, counts_unfilt = calibration_curve_quantized(pred_probs, correct, assigned=assigned, num_bins=num_bins)
    ece = _ece(prob_pred, prob_true, counts)
    return {"ece": ece, "prob_pred":prob_pred, "prob_true":prob_true, "counts":counts, "counts_unfilt":counts_unfilt, "threshold":threshold, "cov":cov}


def calibration_curve_quantized(pred_probs, correct, assigned, num_bins=100):
    """
    Get the calibration curve given the bin assignments, samples and sample-correctness. 
    Parameters
    ----------
    pred_probs: numpy ndarray
        numpy array with predicted probabilities (i.e. confidences)
    correct: numpy ndarray
        0/1 indicating if the sample was correctly classified or not
    num_bins: int
        number of bins for quantization
    Returns
    -------
    prob_pred: for each bin the avg. confidence 
    prob_true: for each bin the avg. accuracy 
    counts: number of samples in each bin 
    counts_unfilt: same as `counts` but also including zero bins
    """
    assert len(pred_probs.shape)==1
    bin_sums_pred = np.bincount(assigned, weights=pred_probs,  minlength=num_bins)
    bin_sums_true = np.bincount(assigned, weights=correct, minlength=num_bins)
    counts = np.bincount(assigned, minlength=num_bins)
    filt = counts > 0
    prob_pred = (bin_sums_pred[filt] / counts[filt])
    prob_true = (bin_sums_true[filt] / counts[filt])
    counts_unfilt = counts
    counts = counts[filt]
    return prob_pred, prob_true, counts, counts_unfilt

In [8]:
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV, StratifiedKFold
import warnings
from torch import nn, optim


def get_ECE(probs, y_true, mode='mass', num_bins=15):
    """ Estimates the top-label ECE by binning.
    Args:
        probs: shape (n_samples, n_classes)
        y_true: shape (n_samples, )
        mode: Either 'mass' or 'width' -- determines binning scheme
        num_bins: Number of bins used in estimation
    """
    if mode == 'mass':
        _mode = 'mECE'
    elif mode == 'width':
        _mode = 'dECE'

    evals = compute_top_1_and_CW_ECEs(probs, y_true, list_approximators=[_mode], num_bins=num_bins)
    return evals[f'top_1_{_mode}']

def get_acc(y_pred, y_true):
    """ Computes the accuracy of predictions.
    If y_pred is 2D, it is assumed that it is a matrix of scores (e.g. probabilities) of shape (n_samples, n_classes)
    """
    if y_pred.ndim == 1:
        return np.mean(y_pred == y_true)
    elif y_pred.ndim == 2:
        return np.mean(np.argmax(y_pred, axis=1), y_true)

class DoubleConfusionCombiner:
    """ Implements the double-confusion matrix combiner ("L+L") using maximum likelihood inference
    """
    def __init__(self, calibration_method='temperature scaling'):
        self.confusion_matrix_h = None  # entry [i, j] is P(h = i | Y = j)
        self.confusion_matrix_m = None  # entry [i, j] is P(Y = j | m = i)

        self.n_train_u = None  # Amount of unlabeled training data
        self.n_train_l = None  # Amount of labeled training data
        self.n_cls = None  # Number of classes

        self.eps = 1e-50

    def fit(self, model_probs, y_h, y_true):
        self.n_cls = model_probs.shape[1]
        self.n_train_l = y_true.size

        # Estimate human confusion matrix
        # Entry [i, j]  is #(Y = i and h = j)
        conf_h = 1. * confusion_matrix(y_true, y_h, labels=np.arange(self.n_cls))
        # Swap so entry [i, j] is #(h = i and Y = j)
        conf_h = conf_h.T
        conf_h = np.clip(conf_h, self.eps, None)
        normalizer = np.sum(conf_h, axis=0, keepdims=True)
        # Normalize columns so entry [i, j] is P(h = i | Y = j)
        conf_h /= normalizer
        self.confusion_matrix_h = conf_h

        # Estimate model confusion matrix
        y_m = np.argmax(model_probs, axis=1)
        # [i, j] = #(Y = i and m = j)
        conf_m = 1. * confusion_matrix(y_true, y_m, labels=np.arange(self.n_cls))
        conf_m = conf_m.T  # [i, j] = #(m = i and Y = j)
        conf_m = np.clip(conf_m, self.eps, None)
        normalizer = np.sum(conf_m, axis=1, keepdims=True)  # NB: normalize rows here, not columns!
        conf_m /= normalizer
        self.confusion_matrix_m = conf_m


class OracleCombiner:
    """ Implements the P+L combination method, fit using maximum likelihood
    """
    def __init__(self, calibration_method='temperature scaling', **kwargs):
        self.calibrator = None
        self.confusion_matrix = None  # conf[i, j] is assumed to be P(h = i | Y = j)

        self.n_train_u = None  # Amount of unlabeled training data
        self.n_train_l = None  # Amount of labeled training data
        self.n_cls = None  # Number of classes

        self.eps = 1e-50

        self.use_cv = False
        self.calibration_method = calibration_method
        if self.calibration_method == 'temperature scaling':
            self.calibrator = TSCalibrator()
        # elif self.calibration_method == 'dirichlet':
        #     # reg_norm : bool, true if regularization is used
        #     # reg_mu : None or float, if None regular L2 regularization is used
        #     # reg_lambda : 0 or float, l2 regularization term
        #     from dirichlet_python.dirichletcal.calib.fulldirichlet import FullDirichletCalibrator
        #     self.calibrator = FullDirichletCalibrator(reg_norm=True, reg_lambda=0.0, reg_mu=None)
        #     self.use_cv = True
        # elif self.calibration_method == 'ensemble temperature scaling':
        #     self.calibrator = EnsembleTSCalibrator()
        # elif self.calibration_method == 'imax binning':
        #     mode = kwargs.pop('mode', 'sCW')
        #     num_bins = kwargs.pop('num_bins', 15)
        #     self.calibrator = IMaxCalibrator(mode=mode, num_bins=num_bins)
        elif self.calibration_method == 'none':
            self.calibrator = IdentityCalibrator()

    def calibrate(self, model_probs):
        return self.calibrator.calibrate(model_probs)

    def fit(self, model_probs, y_h, y_true):
        self.n_cls = model_probs.shape[1]

        # Estimate human confusion matrix
        # Entry [i, j]  is #(Y = i and h = j)
        conf_h = 1. * confusion_matrix(y_true, y_h, labels=np.arange(self.n_cls))
        # Swap so entry [i, j] is #(h = i and Y = j)
        conf_h = conf_h.T
        conf_h = np.clip(conf_h, self.eps, None)
        normalizer = np.sum(conf_h, axis=0, keepdims=True)
        # Normalize columns so entry [i, j] is P(h = i | Y = j)
        conf_h /= normalizer
        self.confusion_matrix = conf_h

        # Calibrate model probabilities
        if self.use_cv:
            self.fit_calibrator_cv(model_probs, y_true)
        else:
            self.fit_calibrator(model_probs, y_true)

    def fit_bayesian(self, model_probs, y_h, y_true, alpha=0.1, beta=0.1):
        """ This is the "plus one" parameterization, i.e. alpha,beta just need to be > 0
        Really corresponds to a Dirichlet(alpha+1, beta+1, beta+1, . . . ,beta+1) distribution
        """
        self.n_cls = model_probs.shape[1]

        prior_matr = np.eye(self.n_cls) * alpha + (np.ones(self.n_cls) - np.eye(self.n_cls)) * beta

        conf_h = 1. * confusion_matrix(y_true, y_h, labels=np.arange(self.n_cls))
        conf_h += prior_matr
        # Swap so entry [i, j] is #(h = i and Y = j)
        conf_h = conf_h.T
        #conf_h = np.clip(conf_h, self.eps, None)
        normalizer = np.sum(conf_h, axis=0, keepdims=True)
        # Normalize columns so entry [i, j] is P(h = i | Y = j)
        conf_h = conf_h / normalizer
        self.confusion_matrix = conf_h

        # Calibrate model probabilities
        if self.use_cv:
            self.fit_calibrator_cv(model_probs, y_true)
        else:
            self.fit_calibrator(model_probs, y_true)

    def fit_calibrator(self, model_probs, y_true):
        clipped_model_probs = np.clip(model_probs, self.eps, 1)
        model_logits = np.log(clipped_model_probs)
        self.calibrator.fit(model_logits, y_true)

    def fit_calibrator_cv(self, model_probs, y_true):
        # Fits calibration maps that require hyperparameters, using cross-validation
        if self.calibration_method == 'dirichlet':
            reg_lambda_vals = [10., 1., 0., 5e-1, 1e-1, 1e-2, 1e-3]
            skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=0)
            gscv = GridSearchCV(self.calibrator, param_grid={'reg_lambda': reg_lambda_vals,
                                                             'reg_mu': [None]},
                                cv=skf, scoring='neg_log_loss', refit=True)
            gscv.fit(model_probs, y_true)
            self.calibrator = gscv.best_estimator_
        else:
            raise NotImplementedError

    def combine_proba(self, model_probs, y_h):
        """ Combines model probabilities with hard labels via the calibrate-confuse equation given the confusion matrix.
        Args:
            p_m: Array of model probabilities ; shape (n_samples, n_classes)
            y_h: List of hard labels ; shape (n_samples,)
        Returns:
            Normalized posterior probabilities P(Y | m, h). Entry [i, j] is P(Y = j | h_i, m_i)
        """
        assert model_probs.shape[0] == y_h.size, 'Size mismatch between model probs and human labels'
        assert model_probs.shape[1] == self.n_cls, 'Size mismatch between model probs and number of classes'

        n_samples = model_probs.shape[0]
        calibrated_model_probs = self.calibrate(model_probs)

        y_comb = np.empty((n_samples, self.n_cls))
        for i in range(n_samples):
            y_comb[i] = calibrated_model_probs[i] * self.confusion_matrix[y_h[i]]
            if np.allclose(y_comb[i], 0):  # Handle zero rows
                y_comb[i] = np.ones(self.n_cls) * (1./self.n_cls)

        # Don't forget to normalize :)
        assert np.all(np.isfinite(np.sum(y_comb, axis=1)))
        assert np.all(np.sum(y_comb, axis=1) > 0)
        y_comb /= np.sum(y_comb, axis=1, keepdims=True)
        return y_comb

    def combine(self, model_probs, y_h):
        """ Combines model probs and y_h to return hard labels
        """
        y_comb_soft = self.combine_proba(model_probs, y_h)
        return np.argmax(y_comb_soft, axis=1)

class BaseCalibrator:
    """ Abstract calibrator class
    """
    def __init__(self):
        self.n_classes = None

    def fit(self, logits, y):
        raise NotImplementedError

    def calibrate(self, probs):
        raise NotImplementedError

class TSCalibrator(BaseCalibrator):
    """ Maximum likelihood temperature scaling (Guo et al., 2017)
    """

    def __init__(self, temperature=1.):
        super().__init__()
        self.temperature = temperature

        self.loss_trace = None

    def fit(self, logits, y):
        """ Fits temperature scaling using hard labels.
        """
        # Pre-processing
        self.n_classes = logits.shape[1]
        _model_logits = torch.from_numpy(logits)
        _y = torch.from_numpy(y)
        _temperature = torch.tensor(self.temperature, requires_grad=True)

        # Optimization parameters
        nll = nn.CrossEntropyLoss()  # Supervised hard-label loss
        num_steps = 7500
        learning_rate = 0.05
        grad_tol = 1e-3  # Gradient tolerance for early stopping
        min_temp, max_temp = 1e-2, 1e4  # Upper / lower bounds on temperature

        optimizer = optim.Adam([_temperature], lr=learning_rate)

        loss_trace = []  # Track loss over iterations
        step = 0
        converged = False
        while not converged:

            optimizer.zero_grad()
            loss = nll(_model_logits / _temperature, _y)
            loss.backward()
            optimizer.step()
            loss_trace.append(loss.item())

            with torch.no_grad():
                _temperature.clamp_(min=min_temp, max=max_temp)

            step += 1
            if step > num_steps:
                warnings.warn('Maximum number of steps reached -- may not have converged (TS)')
            converged = (step > num_steps) or (np.abs(_temperature.grad) < grad_tol)

        self.loss_trace = loss_trace
        self.temperature = _temperature.item()

    def calibrate(self, probs):
        calibrated_probs = probs ** (1. / self.temperature)  # Temper
        calibrated_probs /= np.sum(calibrated_probs, axis=1, keepdims=True)  # Normalize
        return calibrated_probs

class IdentityCalibrator(BaseCalibrator):
    """ A class that implements no recalibration.
    """

    def fit(self, probs, y):
        return

    def calibrate(self, probs):
        return probs

def load_CIFAR10H(model_name):
    """ Loads the CIFAR-10H predictions (human and model) and true labels.
    """
    dirname = os.path.dirname("")
    if model_name == 'r_low_acc':
        data_path = os.path.join(dirname, 'human_model_truth_cifar10h.csv')
        data = np.genfromtxt(data_path, delimiter=',')

        human_counts = data[:, :10]
        model_probs = data[:, 10:20]
        true_labels = data[:, -1]

        true_labels -= 1  # data has labels 1-10 -- shifting so that they are zero-indexed.
    else:
        data_path = os.path.join(dirname, f'{model_name}.csv')
        data = np.genfromtxt(data_path, delimiter=',')

        true_labels = data[:, 0]
        human_counts = data[:, 1:11]
        model_probs = data[:, 11:]

    true_labels = true_labels.astype(int)

    return human_counts, model_probs, true_labels


def simulate_single_human(human_counts, seed=0):
    rng = np.random.default_rng(seed)

    human_labels_per_input = np.sum(human_counts, axis=1)
    min_human_labels = int(min(human_labels_per_input))
    n_rows = human_counts.shape[0]
    n_classes = human_counts.shape[1]

    human_labels = np.empty(shape=(n_rows, min_human_labels))
    for row in range(n_rows):
        temp = []
        for i in range(n_classes):
            temp += [i] * int(human_counts[row, i])
        rng.shuffle(temp)
        human_labels[row, :] = temp[:min_human_labels]

    return human_labels[:, 0].astype(int)

##1.Caliberation_experiment

In [9]:
def get_NLL(probs, y_true):
    """ Computes the negative log likelihood.
    """
    nll = nn.NLLLoss()
    _probs = np.clip(probs, 1e-100, 1)
    logprobs = torch.from_numpy(np.log(_probs))
    return nll(logprobs, torch.from_numpy(y_true)).item()


# Generates the data for Appendix C in our paper.

def _run_experiment(y_h=None, model_probs=None, y_true=None, **kwargs):
    seed = kwargs.pop('seed', 0)
    n_runs = kwargs.pop('n_runs', 25)
    test_size = kwargs.pop('test_size', 0.3)
    calibration_methods = kwargs.pop('calibration_methods', ['none'])
    calibration_metrics = kwargs.pop('calibration_metrics', {'ECE': get_ECE})
    output_file_acc = kwargs.pop('output_file_acc', './acc.csv')
    output_file_calibration = kwargs.pop('output_file_calibration', './cal.csv')

    acc_data = []
    cal_data = []
    for i in tqdm(range(n_runs), leave=False, desc='Runs'):
        # Train/test split
        y_h_tr, y_h_te, model_probs_tr, model_probs_te, y_true_tr, y_true_te = train_test_split(
            y_h, model_probs, y_true, test_size=test_size, random_state=i * seed)

        acc_h = get_acc(y_h_te, y_true_te)
        acc_m = get_acc(np.argmax(model_probs_te, axis=1), y_true_te)

        _acc_data = [acc_h, acc_m]
        _cal_data = []
        for calibration_method in calibration_methods:
            if calibration_method == 'confusion':
                combiner = DoubleConfusionCombiner()
                combiner.fit(model_probs_tr, y_h_tr, y_true_tr)
            else:
                combiner = OracleCombiner(calibration_method=calibration_method)
                combiner.fit(model_probs_tr, y_h_tr, y_true_tr)

            y_comb_te = combiner.combine(model_probs_te, y_h_te)
            acc_comb = get_acc(y_comb_te, y_true_te)
            _acc_data.append(acc_comb)

            model_probs_calibrated_te = combiner.calibrate(model_probs_te)
            y_comb_prob_te = combiner.combine_proba(model_probs_te, y_h_te)
            for metric, fxn in calibration_metrics.items():
                cal_m = fxn(model_probs_calibrated_te, y_true_te)
                cal_comb = fxn(y_comb_prob_te, y_true_te)
                _cal_data.append([calibration_method, metric, cal_m, cal_comb])

        acc_data += [_acc_data]
        cal_data += _cal_data

    # Save data to CSV
    header_acc = ['human', 'model'] + [f'comb {cal_m}' for cal_m in calibration_methods]
    with open(output_file_acc, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_acc)
        writer.writerows(acc_data)
    header_cal = ['calibration method', 'metric', 'model', 'comb']
    with open(output_file_calibration, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_cal)
        writer.writerows(cal_data)


def run_experiment_cifar10(out_fpath=None, experiment_args=None, seed=0):
    model_names = ['r_low_acc', 'resnet-110', 'preresnet-110', 'densenet-bc-L190-k40']
    for model_name in tqdm(model_names, desc='Models', leave=True):
        # Specify output files
        output_file_acc = out_fpath + f'{model_name}_accuracy.csv'
        output_file_calibration = out_fpath + f'{model_name}_calibration.csv'
        assert not os.path.exists(output_file_acc), 'Output filepath already exists'
        assert not os.path.exists(output_file_calibration), 'Output filepath already exists'
        experiment_args['output_file_acc'] = output_file_acc
        experiment_args['output_file_calibration'] = output_file_calibration

        # Load data
        human_counts, model_probs, y_true = load_CIFAR10H(model_name)
        y_h = simulate_single_human(human_counts, seed=seed)

        _run_experiment(y_h=y_h, model_probs=model_probs, y_true=y_true, **experiment_args)





if __name__ == '__main__':
    seed = 9658
    torch.manual_seed(seed)
    np.random.seed(seed)

    calibration_methods = ['none',  'temperature scaling']
    """
    calibration_metrics = {'ECE width': lambda probs, y: get_ECE(probs, y, mode='width'),
                           'ECE mass': lambda probs, y: get_ECE(probs, y, mode='mass'),
                           'cwECE thresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width'),
                           'cwECE thresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass'),
                           'cwECE nothresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width',
                                                                               threshold_mode=None),
                           'cwECE nothresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass',
                                                                              threshold_mode=None),
                           'kumar MCE': get_MCE,
                           'kumar MCE (bin)': lambda probs, y: cal.get_binning_ce(probs, y,
                                                                                  p=1, debias=False, mode='marginal'),
                           'kumar MCE (scale)': lambda probs, y: cal.lower_bound_scaling_ce(probs, y,
                                                                                            p=1, debias=False,
                                                                                            mode='marginal'),
                           'kumar ECE': cal.get_ece}
    """
    calibration_metrics = {
                           'NLL': get_NLL}

    args = {'n_runs': 25,
            'test_size': 0.3,
            'calibration_methods': calibration_methods,
            'calibration_metrics': calibration_metrics,
            'seed': seed
            }

    out_fpath = ''
    run_experiment_cifar10(out_fpath=out_fpath, experiment_args=args, seed=seed)

    # out_fpath = './output/noisy_imagenet/final/fully_sup_CI/'
    # run_experiment_noisy_imagenet(out_fpath=out_fpath, experiment_args=args, seed=seed)

Models:   0%|          | 0/4 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

##2.Calibration_experiment

In [13]:
def get_dirichlet_params(acc, strength, n_cls):
    # acc: desired off-diagonal accuracy
    # strength: strength of prior

    # Returns alpha,beta where the prior is Dir((beta, beta, . . . , alpha, . . . beta))
    # where the alpha appears for the correct class

    beta = 0.1
    alpha = beta * (n_cls - 1) * acc / (1. - acc)

    alpha *= strength
    beta *= strength

    alpha += 1
    beta += 1

    return alpha, beta

class MAPOracleCombiner(OracleCombiner):
    """ P+L combination method, fit using MAP estimates
    This is our preferred combination method.
    """
    def __init__(self, diag_acc=0.75, strength=1., mu_beta=0.5, sigma_beta=0.5, **kwargs):
        super().__init__()
        self.calibrator = None
        self.prior_params = {'mu_beta': mu_beta,
                             'sigma_beta': sigma_beta
        }
        #self.n_cls = None
        self.diag_acc = diag_acc
        self.strength = strength

    def fit(self, model_probs, y_h, y_true, model_logits=None):
        self.n_cls = model_probs.shape[1]

        # Get MAP estimate of confusion matrix
        alpha, beta = get_dirichlet_params(self.diag_acc, self.strength, self.n_cls)
        prior_matr = np.eye(self.n_cls) * alpha + (np.ones(self.n_cls) - np.eye(self.n_cls)) * beta
        posterior_matr = 1. * confusion_matrix(y_true, y_h, labels=np.arange(self.n_cls))
        posterior_matr += prior_matr
        posterior_matr = posterior_matr.T
        posterior_matr = (posterior_matr - np.ones(self.n_cls)) / (np.sum(posterior_matr, axis=0, keepdims=True) - self.n_cls)
        self.confusion_matrix = posterior_matr

        self.calibrator = TSCalibratorMAP()
        logits = np.log(np.clip(model_probs, 1e-50, 1))
        self.calibrator.fit(logits, y_true)

In [14]:
from torch.distributions.log_normal import LogNormal

class TSCalibratorMAP(BaseCalibrator):
    """ MAP Temperature Scaling
    """

    def __init__(self, temperature=1., prior_mu=0.5, prior_sigma=0.5):
        super().__init__()
        self.temperature = temperature
        self.loss_trace = None

        self.prior_mu = torch.tensor(prior_mu)
        self.prior_sigma = torch.tensor(prior_sigma)

    def fit(self, model_logits, y):
        """ Fits temperature scaling using hard labels.
        """
        # Pre-processing
        _model_logits = torch.from_numpy(model_logits)
        _y = torch.from_numpy(y)
        _temperature = torch.tensor(self.temperature, requires_grad=True)

        prior = LogNormal(self.prior_mu, self.prior_sigma)
        # Optimization parameters
        nll = nn.CrossEntropyLoss()  # Supervised hard-label loss
        num_steps = 7500
        learning_rate = 0.05
        grad_tol = 1e-3  # Gradient tolerance for early stopping
        min_temp, max_temp = 1e-2, 1e4  # Upper / lower bounds on temperature

        optimizer = optim.Adam([_temperature], lr=learning_rate)

        loss_trace = []  # Track loss over iterations
        step = 0
        converged = False
        while not converged:

            optimizer.zero_grad()
            loss = nll(_model_logits / _temperature, _y)
            loss += -1 * prior.log_prob(_temperature)  # This step adds the prior
            loss.backward()
            optimizer.step()
            loss_trace.append(loss.item())

            with torch.no_grad():
                _temperature.clamp_(min=min_temp, max=max_temp)

            step += 1
            if step > num_steps:
                warnings.warn('Maximum number of steps reached -- may not have converged (TS)')
            converged = (step > num_steps) or (np.abs(_temperature.grad) < grad_tol)

        self.loss_trace = loss_trace
        self.temperature = _temperature.item()

    def calibrate(self, probs):
        calibrated_probs = probs ** (1. / self.temperature)  # Temper
        calibrated_probs /= np.sum(calibrated_probs, axis=1, keepdims=True)  # Normalize
        return calibrated_probs


In [15]:
import sys
sys.path.insert(0, '../')

#from data_utils import *
#from utils import *
#from combination_methods import *
from tqdm.auto import tqdm
import torch
from sklearn.model_selection import train_test_split
#from metrics import *
import csv
import numpy as np
import os
#from calibrators import *

# Generates the data for Table 2 (and Appendix D) in our paper.


def _run_experiment(y_h=None, model_probs=None, y_true=None, **kwargs):
    seed = kwargs.pop('seed', 0)
    n_runs = kwargs.pop('n_runs', 25)
    test_size = kwargs.pop('test_size', 0.3)
    calibration_methods = kwargs.pop('calibration_methods', ['none'])
    calibration_metrics = kwargs.pop('calibration_metrics', {'ECE': get_ECE})
    output_file_acc = kwargs.pop('output_file_acc', './acc.csv')
    output_file_calibration = kwargs.pop('output_file_calibration', './cal.csv')

    acc_data = []
    cal_data = []
    for i in tqdm(range(n_runs), leave=False, desc='Runs'):
        # Train/test split
        y_h_tr, y_h_te, model_probs_tr, model_probs_te, y_true_tr, y_true_te = train_test_split(
            y_h, model_probs, y_true, test_size=test_size, random_state=i * seed)

        # Limit to 5k datapoints
        y_h_tr = y_h_tr[:5000]
        model_probs_tr = model_probs_tr[:5000, :]
        y_true_tr = y_true_tr[:5000]

        acc_h = get_acc(y_h_te, y_true_te)
        acc_m = get_acc(np.argmax(model_probs_te, axis=1), y_true_te)

        _acc_data = [acc_h, acc_m]
        _cal_data = []
        DIAG_ACC = 0.75
        MU_BETA = 0.5
        SIGMA_BETA = 0.5
        combiners = {'MAP_CI': MAPOracleCombiner(diag_acc=DIAG_ACC, mu_beta=MU_BETA, sigma_beta=SIGMA_BETA),
                     'uncal_MAP_CI': MAPOracleCombiner(diag_acc=DIAG_ACC, mu_beta=MU_BETA, sigma_beta=SIGMA_BETA)}
        for combiner_name, combiner in combiners.items():
            combiner.fit(model_probs_tr, y_h_tr, y_true_tr)
            if combiner_name == 'uncal_MAP_CI':
                combiner.calibrator.temperature = 1  # pretty hacky way to get uncalibrated temps.. but w/e

            y_comb_te = combiner.combine(model_probs_te, y_h_te)
            acc_comb = get_acc(y_comb_te, y_true_te)
            _acc_data.append(acc_comb)

            model_probs_calibrated_te = combiner.calibrate(model_probs_te)
            y_comb_prob_te = combiner.combine_proba(model_probs_te, y_h_te)

            # ----- Calibrate combination
            ts_calibrator = TSCalibratorMAP()
            comb_probs_tr = combiner.combine_proba(model_probs_tr, y_h_tr)
            comb_logits_tr = np.log(np.clip(comb_probs_tr, 1e-50, 1))
            ts_calibrator.fit(comb_logits_tr, y_true_tr)
            y_comb_prob_te_calibrated = ts_calibrator.calibrate(y_comb_prob_te)

            for metric, fxn in calibration_metrics.items():
                cal_m = fxn(model_probs_calibrated_te, y_true_te)
                cal_comb = fxn(y_comb_prob_te, y_true_te)
                cal_comb_calibrated = fxn(y_comb_prob_te_calibrated, y_true_te)
                _cal_data.append([combiner_name, metric, cal_m, cal_comb, cal_comb_calibrated])

            acc_data += [_acc_data]
            cal_data += _cal_data

    # Save data to CSV
    header_acc = ['human', 'model'] + [f'comb {cal_m}' for cal_m in calibration_methods]
    with open(output_file_acc, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_acc)
        writer.writerows(acc_data)
    header_cal = ['calibration method', 'metric', 'model', 'comb', 'comb (post cal)']
    with open(output_file_calibration, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_cal)
        writer.writerows(cal_data)


def run_experiment_cifar10(out_fpath=None, experiment_args=None, seed=0):
    model_names = ['r_low_acc', 'resnet-110', 'preresnet-110', 'densenet-bc-L190-k40']
    for model_name in tqdm(model_names, desc='Models', leave=True):
        # Specify output files
        output_file_acc = out_fpath + f'{model_name}_accuracy.csv'
        output_file_calibration = out_fpath + f'{model_name}_calibration.csv'
        assert not os.path.exists(output_file_acc), 'Output filepath already exists'
        assert not os.path.exists(output_file_calibration), 'Output filepath already exists'
        experiment_args['output_file_acc'] = output_file_acc
        experiment_args['output_file_calibration'] = output_file_calibration

        # Load data
        human_counts, model_probs, y_true = load_CIFAR10H(model_name)
        y_h = simulate_single_human(human_counts, seed=seed)

        _run_experiment(y_h=y_h, model_probs=model_probs, y_true=y_true, **experiment_args)



if __name__ == '__main__':
    seed = 9658
    torch.manual_seed(seed)
    np.random.seed(seed)

    calibration_methods = ['none', 'confusion', 'temperature scaling']
    """
    calibration_metrics = {'ECE width': lambda probs, y: get_ECE(probs, y, mode='width'),
                           'ECE mass': lambda probs, y: get_ECE(probs, y, mode='mass'),
                           'cwECE thresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width'),
                           'cwECE thresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass'),
                           'cwECE nothresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width',
                                                                               threshold_mode=None),
                           'cwECE nothresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass',
                                                                              threshold_mode=None),
                           'kumar MCE': get_MCE,
                           'kumar MCE (bin)': lambda probs, y: cal.get_binning_ce(probs, y,
                                                                                  p=1, debias=False, mode='marginal'),
                           'kumar MCE (scale)': lambda probs, y: cal.lower_bound_scaling_ce(probs, y,
                                                                                            p=1, debias=False,
                                                                                            mode='marginal'),
                           'kumar ECE': cal.get_ece}
    """
    calibration_metrics = {
                           'NLL': get_NLL}

    args = {'n_runs': 25,
            'test_size': 0.3,
            'calibration_methods': calibration_methods,
            'calibration_metrics': calibration_metrics,
            'seed': seed
            }

    out_fpath = 'combo_exp_'
    run_experiment_cifar10(out_fpath=out_fpath, experiment_args=args, seed=seed)

    #out_fpath = './output/noisy_imagenet/final/calibrate_comb_MAP/'
    #run_experiment_noisy_imagenet(out_fpath=out_fpath, experiment_args=args, seed=seed)

Models:   0%|          | 0/4 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

In [16]:
import pandas as pd

densenet_data = pd.read_csv('densenet-bc-L190-k40_calibration.csv') 
resnet_data = pd.read_csv('resnet-110_calibration.csv')

In [17]:
densenet_data = densenet_data[densenet_data['calibration method']=='none']

print('NLL:\n')
print('model: ',densenet_data['model'].mean(),'+-',densenet_data['model'].std())
print('comb: ',densenet_data['comb'].mean(),'+-',densenet_data['comb'].std())

NLL:

model:  0.1684602036918858 +- 0.014993866409259609
comb:  0.37190867133097905 +- 0.09985100197737258


In [18]:
resnet_data = resnet_data[resnet_data['calibration method']=='none']

print('NLL:\n')
print('model: ',resnet_data['model'].mean(),'+-',resnet_data['model'].std())
print('comb: ',resnet_data['comb'].mean(),'+-',resnet_data['comb'].std())

NLL:

model:  0.23901574634536804 +- 0.015875530397798508
comb:  0.32335299580621135 +- 0.08544993583783371


#Q2

In [19]:
def simulate_single_human_2(human_counts, seed=0):
    rng = np.random.default_rng(seed)

    human_labels_per_input = np.sum(human_counts, axis=1)
    min_human_labels = int(min(human_labels_per_input))
    n_rows = human_counts.shape[0]
    n_classes = human_counts.shape[1]

    human_labels = np.empty(shape=(n_rows, min_human_labels))
    for row in range(n_rows):
        temp = []
        for i in range(n_classes):
            temp += [int(human_counts[row, i])]
        #rng.shuffle(temp)
        max_ele = max(temp)
        count_max_ele = temp.count(max_ele)

        if count_max_ele == 1:
            human_labels[row, 0] = temp.index(max_ele)
        else:
            index_arr = []
            for i in range(len(temp)):
                if temp[i] == max_ele:
                    index_arr.append(i)
            human_labels[row, 0] = index_arr[np.random.randint(0, len(index_arr))]
        #human_labels[row, :] = temp[:min_human_labels]

    return human_labels[:, 0].astype(int)

In [20]:
def get_NLL(probs, y_true):
    """ Computes the negative log likelihood.
    """
    nll = nn.NLLLoss()
    _probs = np.clip(probs, 1e-100, 1)
    logprobs = torch.from_numpy(np.log(_probs))
    return nll(logprobs, torch.from_numpy(y_true)).item()


# Generates the data for Appendix C in our paper.

def _run_experiment(y_h=None, model_probs=None, y_true=None, **kwargs):
    seed = kwargs.pop('seed', 0)
    n_runs = kwargs.pop('n_runs', 25)
    test_size = kwargs.pop('test_size', 0.3)
    calibration_methods = kwargs.pop('calibration_methods', ['none'])
    calibration_metrics = kwargs.pop('calibration_metrics', {'ECE': get_ECE})
    output_file_acc = kwargs.pop('output_file_acc', './acc.csv')
    output_file_calibration = kwargs.pop('output_file_calibration', './cal.csv')

    acc_data = []
    cal_data = []
    for i in tqdm(range(n_runs), leave=False, desc='Runs'):
        # Train/test split
        y_h_tr, y_h_te, model_probs_tr, model_probs_te, y_true_tr, y_true_te = train_test_split(
            y_h, model_probs, y_true, test_size=test_size, random_state=i * seed)

        acc_h = get_acc(y_h_te, y_true_te)
        acc_m = get_acc(np.argmax(model_probs_te, axis=1), y_true_te)

        _acc_data = [acc_h, acc_m]
        _cal_data = []
        for calibration_method in calibration_methods:
            if calibration_method == 'confusion':
                combiner = DoubleConfusionCombiner()
                combiner.fit(model_probs_tr, y_h_tr, y_true_tr)
            else:
                combiner = OracleCombiner(calibration_method=calibration_method)
                combiner.fit(model_probs_tr, y_h_tr, y_true_tr)

            y_comb_te = combiner.combine(model_probs_te, y_h_te)
            acc_comb = get_acc(y_comb_te, y_true_te)
            _acc_data.append(acc_comb)

            model_probs_calibrated_te = combiner.calibrate(model_probs_te)
            y_comb_prob_te = combiner.combine_proba(model_probs_te, y_h_te)
            for metric, fxn in calibration_metrics.items():
                cal_m = fxn(model_probs_calibrated_te, y_true_te)
                cal_comb = fxn(y_comb_prob_te, y_true_te)
                _cal_data.append([calibration_method, metric, cal_m, cal_comb])

        acc_data += [_acc_data]
        cal_data += _cal_data

    # Save data to CSV
    header_acc = ['human', 'model'] + [f'comb {cal_m}' for cal_m in calibration_methods]
    with open(output_file_acc, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_acc)
        writer.writerows(acc_data)
    header_cal = ['calibration method', 'metric', 'model', 'comb']
    with open(output_file_calibration, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_cal)
        writer.writerows(cal_data)


def run_experiment_cifar10(out_fpath=None, experiment_args=None, seed=0):
    model_names = ['r_low_acc', 'resnet-110', 'preresnet-110', 'densenet-bc-L190-k40']
    for model_name in tqdm(model_names, desc='Models', leave=True):
        # Specify output files
        output_file_acc = out_fpath + f'{model_name}_accuracy.csv'
        output_file_calibration = out_fpath + f'{model_name}_calibration.csv'
        assert not os.path.exists(output_file_acc), 'Output filepath already exists'
        assert not os.path.exists(output_file_calibration), 'Output filepath already exists'
        experiment_args['output_file_acc'] = output_file_acc
        experiment_args['output_file_calibration'] = output_file_calibration

        # Load data
        human_counts, model_probs, y_true = load_CIFAR10H(model_name)
        y_h = simulate_single_human_2(human_counts, seed=seed)

        _run_experiment(y_h=y_h, model_probs=model_probs, y_true=y_true, **experiment_args)





if __name__ == '__main__':
    seed = 9658
    torch.manual_seed(seed)
    np.random.seed(seed)

    calibration_methods = ['none',  'temperature scaling']
    """
    calibration_metrics = {'ECE width': lambda probs, y: get_ECE(probs, y, mode='width'),
                           'ECE mass': lambda probs, y: get_ECE(probs, y, mode='mass'),
                           'cwECE thresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width'),
                           'cwECE thresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass'),
                           'cwECE nothresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width',
                                                                               threshold_mode=None),
                           'cwECE nothresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass',
                                                                              threshold_mode=None),
                           'kumar MCE': get_MCE,
                           'kumar MCE (bin)': lambda probs, y: cal.get_binning_ce(probs, y,
                                                                                  p=1, debias=False, mode='marginal'),
                           'kumar MCE (scale)': lambda probs, y: cal.lower_bound_scaling_ce(probs, y,
                                                                                            p=1, debias=False,
                                                                                            mode='marginal'),
                           'kumar ECE': cal.get_ece}
    """
    calibration_metrics = {
                           'NLL': get_NLL}

    args = {'n_runs': 25,
            'test_size': 0.3,
            'calibration_methods': calibration_methods,
            'calibration_metrics': calibration_metrics,
            'seed': seed
            }

    out_fpath = 'Q2_base_'
    run_experiment_cifar10(out_fpath=out_fpath, experiment_args=args, seed=seed)

    # out_fpath = './output/noisy_imagenet/final/fully_sup_CI/'
    # run_experiment_noisy_imagenet(out_fpath=out_fpath, experiment_args=args, seed=seed)

Models:   0%|          | 0/4 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

In [21]:
import sys
sys.path.insert(0, '../')

#from data_utils import *
#from utils import *
#from combination_methods import *
from tqdm.auto import tqdm
import torch
from sklearn.model_selection import train_test_split
#from metrics import *
import csv
import numpy as np
import os
#from calibrators import *

# Generates the data for Table 2 (and Appendix D) in our paper.


def _run_experiment(y_h=None, model_probs=None, y_true=None, **kwargs):
    seed = kwargs.pop('seed', 0)
    n_runs = kwargs.pop('n_runs', 25)
    test_size = kwargs.pop('test_size', 0.3)
    calibration_methods = kwargs.pop('calibration_methods', ['none'])
    calibration_metrics = kwargs.pop('calibration_metrics', {'ECE': get_ECE})
    output_file_acc = kwargs.pop('output_file_acc', './acc.csv')
    output_file_calibration = kwargs.pop('output_file_calibration', './cal.csv')

    acc_data = []
    cal_data = []
    for i in tqdm(range(n_runs), leave=False, desc='Runs'):
        # Train/test split
        y_h_tr, y_h_te, model_probs_tr, model_probs_te, y_true_tr, y_true_te = train_test_split(
            y_h, model_probs, y_true, test_size=test_size, random_state=i * seed)

        # Limit to 5k datapoints
        y_h_tr = y_h_tr[:5000]
        model_probs_tr = model_probs_tr[:5000, :]
        y_true_tr = y_true_tr[:5000]

        acc_h = get_acc(y_h_te, y_true_te)
        acc_m = get_acc(np.argmax(model_probs_te, axis=1), y_true_te)

        _acc_data = [acc_h, acc_m]
        _cal_data = []
        DIAG_ACC = 0.75
        MU_BETA = 0.5
        SIGMA_BETA = 0.5
        combiners = {'MAP_CI': MAPOracleCombiner(diag_acc=DIAG_ACC, mu_beta=MU_BETA, sigma_beta=SIGMA_BETA),
                     'uncal_MAP_CI': MAPOracleCombiner(diag_acc=DIAG_ACC, mu_beta=MU_BETA, sigma_beta=SIGMA_BETA)}
        for combiner_name, combiner in combiners.items():
            combiner.fit(model_probs_tr, y_h_tr, y_true_tr)
            if combiner_name == 'uncal_MAP_CI':
                combiner.calibrator.temperature = 1  # pretty hacky way to get uncalibrated temps.. but w/e

            y_comb_te = combiner.combine(model_probs_te, y_h_te)
            acc_comb = get_acc(y_comb_te, y_true_te)
            _acc_data.append(acc_comb)

            model_probs_calibrated_te = combiner.calibrate(model_probs_te)
            y_comb_prob_te = combiner.combine_proba(model_probs_te, y_h_te)

            # ----- Calibrate combination
            ts_calibrator = TSCalibratorMAP()
            comb_probs_tr = combiner.combine_proba(model_probs_tr, y_h_tr)
            comb_logits_tr = np.log(np.clip(comb_probs_tr, 1e-50, 1))
            ts_calibrator.fit(comb_logits_tr, y_true_tr)
            y_comb_prob_te_calibrated = ts_calibrator.calibrate(y_comb_prob_te)

            for metric, fxn in calibration_metrics.items():
                cal_m = fxn(model_probs_calibrated_te, y_true_te)
                cal_comb = fxn(y_comb_prob_te, y_true_te)
                cal_comb_calibrated = fxn(y_comb_prob_te_calibrated, y_true_te)
                _cal_data.append([combiner_name, metric, cal_m, cal_comb, cal_comb_calibrated])

            acc_data += [_acc_data]
            cal_data += _cal_data

    # Save data to CSV
    header_acc = ['human', 'model'] + [f'comb {cal_m}' for cal_m in calibration_methods]
    with open(output_file_acc, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_acc)
        writer.writerows(acc_data)
    header_cal = ['calibration method', 'metric', 'model', 'comb', 'comb (post cal)']
    with open(output_file_calibration, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header_cal)
        writer.writerows(cal_data)


def run_experiment_cifar10(out_fpath=None, experiment_args=None, seed=0):
    model_names = ['r_low_acc', 'resnet-110', 'preresnet-110', 'densenet-bc-L190-k40']
    for model_name in tqdm(model_names, desc='Models', leave=True):
        # Specify output files
        output_file_acc = out_fpath + f'{model_name}_accuracy.csv'
        output_file_calibration = out_fpath + f'{model_name}_calibration.csv'
        assert not os.path.exists(output_file_acc), 'Output filepath already exists'
        assert not os.path.exists(output_file_calibration), 'Output filepath already exists'
        experiment_args['output_file_acc'] = output_file_acc
        experiment_args['output_file_calibration'] = output_file_calibration

        # Load data
        human_counts, model_probs, y_true = load_CIFAR10H(model_name)
        y_h = simulate_single_human_2(human_counts, seed=seed)

        _run_experiment(y_h=y_h, model_probs=model_probs, y_true=y_true, **experiment_args)



if __name__ == '__main__':
    seed = 9658
    torch.manual_seed(seed)
    np.random.seed(seed)

    calibration_methods = ['none', 'confusion', 'temperature scaling']
    """
    calibration_metrics = {'ECE width': lambda probs, y: get_ECE(probs, y, mode='width'),
                           'ECE mass': lambda probs, y: get_ECE(probs, y, mode='mass'),
                           'cwECE thresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width'),
                           'cwECE thresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass'),
                           'cwECE nothresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width',
                                                                               threshold_mode=None),
                           'cwECE nothresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass',
                                                                              threshold_mode=None),
                           'kumar MCE': get_MCE,
                           'kumar MCE (bin)': lambda probs, y: cal.get_binning_ce(probs, y,
                                                                                  p=1, debias=False, mode='marginal'),
                           'kumar MCE (scale)': lambda probs, y: cal.lower_bound_scaling_ce(probs, y,
                                                                                            p=1, debias=False,
                                                                                            mode='marginal'),
                           'kumar ECE': cal.get_ece}
    """
    calibration_metrics = {
                           'NLL': get_NLL}

    args = {'n_runs': 25,
            'test_size': 0.3,
            'calibration_methods': calibration_methods,
            'calibration_metrics': calibration_metrics,
            'seed': seed
            }

    out_fpath = 'Q2_combo_exp_'
    run_experiment_cifar10(out_fpath=out_fpath, experiment_args=args, seed=seed)

    #out_fpath = './output/noisy_imagenet/final/calibrate_comb_MAP/'
    #run_experiment_noisy_imagenet(out_fpath=out_fpath, experiment_args=args, seed=seed)

Models:   0%|          | 0/4 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

Runs:   0%|          | 0/25 [00:00<?, ?it/s]

In [22]:
densenet_data = pd.read_csv('Q2_combo_exp_densenet-bc-L190-k40_calibration.csv') 
resnet_data = pd.read_csv('Q2_combo_exp_resnet-110_calibration.csv')

In [27]:
densenet_data = densenet_data[densenet_data['calibration method']=='MAP_CI']

#print('ECE :: ', '\n', densenet_data[densenet_data['metric'] == 'ECE (W)'].mean(), '\n', densenet_data[densenet_data['metric'] == 'ECE (W)'].std())
#print('\n\n')

#print('cwECE :: ', '\n', densenet_data[densenet_data['metric'] == 'cwECE (WT)'].mean(), '\n', densenet_data[densenet_data['metric'] == 'cwECE (WT)'].std())
#print('\n\n')


print('NLL:\n')
print('model: ',densenet_data['model'].mean(),'+-',densenet_data['model'].std())
print('comb: ',densenet_data['comb'].mean(),'+-',densenet_data['comb'].std())

NLL:

model:  0.13697279494110723 +- 0.011366149028872032
comb:  0.05090241420365429 +- 0.010708769104916218


In [24]:
resnet_data = resnet_data[resnet_data['calibration method']=='MAP_CI']

print('NLL:\n')
print('model: ',resnet_data['model'].mean(),'+-',resnet_data['model'].std())
print('comb: ',resnet_data['comb'].mean(),'+-',resnet_data['comb'].std())

NLL:

model:  0.20660398832531968 +- 0.012600371894193848
comb:  0.049413026404921664 +- 0.009684166279715882


#Q3

In [25]:
from keras.backend import learning_phase
import sys
sys.path.insert(0, '../')

#from data_utils import *
#from utils import *
#from combination_methods import *
from tqdm.auto import tqdm
import torch
from sklearn.model_selection import train_test_split
#from metrics import *
import csv
import numpy as np
import os
#from calibrators import *

from tensorflow import keras
from keras import layers
from keras.layers import Input, Dense, Activation
# Generates the data for Table 2 (and Appendix D) in our paper.


def _run_experiment(y_h=None, model_probs=None, y_true=None, **kwargs):
    seed = kwargs.pop('seed', 0)
    n_runs = kwargs.pop('n_runs', 25)
    test_size = kwargs.pop('test_size', 0.3)
    calibration_methods = kwargs.pop('calibration_methods', ['none'])
    calibration_metrics = kwargs.pop('calibration_metrics', {'ECE': get_ECE})
    output_file_acc = kwargs.pop('output_file_acc', './acc.csv')
    output_file_calibration = kwargs.pop('output_file_calibration', './cal.csv')

    acc_data = []
    cal_data = []
    # for i in tqdm(range(n_runs), leave=False, desc='Runs'):
    # Train/test split
    y_h_tr, y_h_te, model_probs_tr, model_probs_te, y_true_tr, y_true_te = train_test_split(
        y_h, model_probs, y_true, test_size=test_size, random_state=seed)

    # Limit to 5k datapoints
    y_h_tr = y_h_tr[:5000]
    model_probs_tr = model_probs_tr[:5000, :]
    y_true_tr = y_true_tr[:5000]

    te_arr = []
    for i in range(len(model_probs_te)):
        te_arr.append([y_h_te[i]])
    
    x_te = np.append(model_probs_te, te_arr ,1)

    tr_arr = []
    for i in range(len(model_probs_tr)):
        tr_arr.append([y_h_tr[i]])
    
    x_tr = np.append(model_probs_tr, tr_arr, 1)

    model = keras.Sequential([
                              Input(shape=(None, len(x_tr[0]))),
                              Dense(25, activation='relu'),
                              Dense(30, activation='relu'),
                              Dense(10, activation='softmax')
    ])

    adam_opt = keras.optimizers.Adam(learning_rate=0.01)

    model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'], optimizer=adam_opt)

    summary = model.fit(x_tr, y_true_tr, batch_size=64, epochs=15, validation_data=(x_te, y_true_te))

    return summary.history        

def run_experiment_cifar10(out_fpath=None, experiment_args=None, seed=0):
    model_names = ['r_low_acc', 'resnet-110', 'preresnet-110', 'densenet-bc-L190-k40']
    for model_name in tqdm(model_names, desc='Models', leave=True):
        # Specify output files
        output_file_acc = out_fpath + f'{model_name}_accuracy.csv'
        output_file_calibration = out_fpath + f'{model_name}_calibration.csv'
        assert not os.path.exists(output_file_acc), 'Output filepath already exists'
        assert not os.path.exists(output_file_calibration), 'Output filepath already exists'
        experiment_args['output_file_acc'] = output_file_acc
        experiment_args['output_file_calibration'] = output_file_calibration

        # Load data
        human_counts, model_probs, y_true = load_CIFAR10H(model_name)
        y_h = simulate_single_human_2(human_counts, seed=seed)

        _run_experiment(y_h=y_h, model_probs=model_probs, y_true=y_true, **experiment_args)



if __name__ == '__main__':
    seed = 9658
    torch.manual_seed(seed)
    np.random.seed(seed)

    calibration_methods = ['none', 'confusion', 'temperature scaling']
    """
    calibration_metrics = {'ECE width': lambda probs, y: get_ECE(probs, y, mode='width'),
                           'ECE mass': lambda probs, y: get_ECE(probs, y, mode='mass'),
                           'cwECE thresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width'),
                           'cwECE thresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass'),
                           'cwECE nothresh width': lambda probs, y: get_cw_ECE(probs, y, mode='width',
                                                                               threshold_mode=None),
                           'cwECE nothresh mass': lambda probs, y: get_cw_ECE(probs, y, mode='mass',
                                                                              threshold_mode=None),
                           'kumar MCE': get_MCE,
                           'kumar MCE (bin)': lambda probs, y: cal.get_binning_ce(probs, y,
                                                                                  p=1, debias=False, mode='marginal'),
                           'kumar MCE (scale)': lambda probs, y: cal.lower_bound_scaling_ce(probs, y,
                                                                                            p=1, debias=False,
                                                                                            mode='marginal'),
                           'kumar ECE': cal.get_ece}
    """
    calibration_metrics = {
                           'NLL': get_NLL}

    args = {'n_runs': 25,
            'test_size': 0.3,
            'calibration_methods': calibration_methods,
            'calibration_metrics': calibration_metrics,
            'seed': seed
            }

    out_fpath = 'Q3_combo_exp_'
    run_experiment_cifar10(out_fpath=out_fpath, experiment_args=args, seed=seed)

    #out_fpath = './output/noisy_imagenet/final/calibrate_comb_MAP/'
    #run_experiment_noisy_imagenet(out_fpath=out_fpath, experiment_args=args, seed=seed)

Models:   0%|          | 0/4 [00:00<?, ?it/s]

Epoch 1/15


  return dispatch_target(*args, **kwargs)


Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


###Observations:



1.   With the above run of neural networks, the accuracies are hitting 98.64% !!!
2.   In the neural network I have used Input->Dense(25,relu)->Dense(35,relu)->Dense(10, Softmax) and Adam optimizer

