In [1]:
import matplotlib.pyplot as plt
import numpy as np
import random
import pandas as pd
import scipy.stats, scipy.special

from collections import Counter

from conformal_utils import *

%load_ext autoreload
%autoreload 2

This notebook is a copy of `synthetic_experiments1.ipynb`, but the data generation process is modified to output 1000 classes

**Goal of this notebook**: Run some simulations to sanity check my code to see if class-balanced conformal inference will outperform standard conformal inference as we increase the amount of calibration data

## Setup

### Data generation procedure: Mixture of Gaussians

#### Phase I:
1. Randomly select a class $Y_i$ with probability determined by `P_Y`
2. Select the corresponding mean $\mu$ given by `means`
3. Sample $X_i \sim N(\mu, 1)$
4. Repeat steps 1-3 `num_samples` number of times

#### Phase II:
1. Compute $P(Y = k |X_i) = \frac{P(X_i | Y = k)P(Y_i = k)}{P(X_i)}$
2. Define score = $P(Y = \mathsf{truelabel} |X_i)$

In [21]:
def generate_synthetic_data(num_samples, means, std_dev, P_Y, return_log=False):
    '''
    Inputs:
        - num_samples: Number of samples to generate
        - means: array of the means of each Gaussian in the mixture of Gaussians 
        - std_dev: Scalar representing standard deviation of each Gaussian (all Gaussians have the same SD)
        - P_Y: array of same length as means. ith element is probability that true label is class i
        - return_log: if True, class_probs are returned as log of class probabilities 
    Outputs:
        - true_labels: num_samples length vector
        - class_probs: num_samples x 5 array containing P(Y_i = k | X_i) for each class k 
          or log(P(Y_i = k | X_i)) if return_log is set to True
        
    '''

    # --- Phase I ---
    # Generate mixture of Gaussians
    means = np.array(means) 
    std_devs = std_dev * np.ones((len(means),))
    P_Y = np.array(P_Y) 

    num_classes = len(means)

    # Select true labels randomly according to P_Y
    true_labels = np.random.choice(np.arange(num_classes), num_samples, p=P_Y)
    # Simulate values from corresponding Gaussians
    X = np.random.normal(loc=means[true_labels], scale=std_devs[true_labels], size=(num_samples,))
    
    # --- Phase II ---
    # Compute vector P(Y|X_i) for each X_i
    class_probs = np.zeros((num_samples, num_classes))
    if return_log: # Use log scale
        for k in range(num_classes):
            log_P_X_given_Y = scipy.stats.norm(loc=means[k], scale=std_devs[k]).logpdf(X)
            class_probs[:,k] = log_P_X_given_Y + np.log(P_Y[k]) # P(Y and X_i)

        # Normalize each row to get P(Y and X_i) / P(X_i) = P(Y|X_i)
        class_probs = class_probs - np.expand_dims(scipy.special.logsumexp(class_probs, axis=1), axis=1)
    else: 
        for k in range(num_classes):
            P_X_given_Y = scipy.stats.norm(loc=means[k], scale=std_devs[k]).pdf(X)
            class_probs[:,k] = P_X_given_Y * P_Y[k] # P(Y and X_i)

        # Normalize each row to get P(Y and X_i) / P(X_i) = P(Y|X_i)
        class_probs = class_probs / np.expand_dims(np.sum(class_probs, axis=1), axis=1)
    
    class_scores = 1 - class_probs
    
    return true_labels, class_scores

# For debugging purposes, generate uniform data with uniform scores
def generate_uniform_data(num_samples, num_classes):
    class_scores = np.random.rand(num_samples, num_classes)
    true_labels = np.random.choice(np.arange(num_classes), size=(num_samples,))
    
    return true_labels, class_scores

In [22]:
np.array([[1, 2, 3], [4, 5, 6], [2, 2, 2]]) - np.array([[0], [5], [10]])

array([[ 1,  2,  3],
       [-1,  0,  1],
       [-8, -8, -8]])

# What is the effect of increasing the size of the calibration dataset? 

In [77]:
calibration_data_size = 1000 
test_data_size = 10000
num_trials = 1 # Number of synthetic datasets to generate for each parameter setting

In [80]:
%%time 

np.random.seed(1)

# np.set_printoptions(suppress=True) # Suppress scientific notation when printing arrays
# np.set_printoptions(suppress=False)

num_classes = 1000
# num_classes = 200

# Specify calibration_data_size's to use
calibration_data_sizes = [5000, 10000, 25000, 50000, 100000]
test_data_size = 50000

# Fixed settings (held constant throughout experiment)
alpha = .05 # Specifies coverage level 
std_dev = .5
means = np.random.rand(num_classes,)*100
P_Y = np.ones((num_classes,)) / num_classes # Uniform class balance

# Run experiment
test_marginal_coverage = np.zeros((len(calibration_data_sizes), num_trials))
test_class_coverage = np.zeros((len(calibration_data_sizes), num_trials, num_classes))
for i, calibration_data_size in enumerate(calibration_data_sizes):
    print('calibration_data_size:', calibration_data_size)
    for j in range(num_trials):
        
# #         # DEBUGGING: Generate uniform data
#         cal_true_labels, cal_class_scores = generate_uniform_data(calibration_data_size, num_classes)
#         test_true_labels, test_class_scores = generate_uniform_data(test_data_size, num_classes)
        
        
        # Generate datasets of desired size
        cal_true_labels, cal_class_scores = generate_synthetic_data(calibration_data_size, means, std_dev, P_Y, return_log=False)
        test_true_labels, test_class_scores = generate_synthetic_data(test_data_size, means, std_dev, P_Y, return_log=False)
        
#         print('cal_class_probs', cal_class_probs[:3,:])

        # Compute standard score quantile
        standard_ci_qhat = compute_qhat(cal_class_scores, cal_true_labels, alpha=alpha)
        print('standard_ci_qhat', standard_ci_qhat)

        # Compute class-specific score quantiles
        qhats = compute_class_specific_qhats(cal_class_scores, cal_true_labels, alpha=alpha, default_qhat=np.inf)
#         qhats = compute_class_specific_qhats(cal_class_scores, cal_true_labels, alpha=alpha, default_qhat=standard_ci_qhat)

#         print('qhats', qhats)
        
        # Create prediction sets for test set
        set_preds = create_cb_prediction_sets(test_class_scores, qhats)
        avg_set_size = np.mean([len(x) for x in set_preds])
        print(f'Average set size {avg_set_size:.2f}')

        # Compute marginal coverage for test set
        test_marginal_coverage[i,j] = compute_coverage(test_true_labels, set_preds)
        
        # Compute class-specific coverage for test set
        class_specific_cov = compute_class_specific_coverage(test_true_labels, set_preds)
        test_class_coverage[i,j,:] = class_specific_cov
    print()

calibration_data_size: 5000
standard_ci_qhat 0.9881542894546381




Average set size 1000.00

calibration_data_size: 10000
standard_ci_qhat 0.9881368292743946






Average set size 995.14

calibration_data_size: 25000
standard_ci_qhat 0.9888513151404966


Average set size 122.16

calibration_data_size: 50000
standard_ci_qhat 0.9882594525600426
Average set size 22.17

calibration_data_size: 100000
standard_ci_qhat 0.9882206038034266
Average set size 21.11

CPU times: user 52.3 s, sys: 6.67 s, total: 59 s
Wall time: 1min 2s


In [79]:
avg_class_coverage = np.mean(test_class_coverage, axis=(1,2)) # Class coverage averaged across classes
sd_class_coverage = np.std(test_class_coverage, axis=(1,2))
test_marginal_coverage = np.squeeze(test_marginal_coverage)

df = pd.DataFrame({'calibration_data_size': calibration_data_sizes, 
                   'marginal_coverage': test_marginal_coverage, 
                   'avg_class_coverage': avg_class_coverage,
                   'sd_class_coverage': sd_class_coverage})

df

Unnamed: 0,calibration_data_size,marginal_coverage,avg_class_coverage,sd_class_coverage
0,5000,1.0,1.0,0.0
1,10000,0.99982,0.99982,0.005009
2,25000,0.96774,0.967918,0.044101
3,50000,0.96224,0.962638,0.037159
4,100000,0.9539,0.953603,0.037052
