# Conditional Conformal Prediction with Custom Function Class [WIP]

This notebook allows you to perform conditional conformal prediction using any custom function class you define. The function class should take model outputs as input and return features for conditional conformal prediction.

In [None]:
# %load_ext autoreload
# %autoreload 2
import os, sys
import numpy as np
import pandas as pd
import torch
from scipy.special import softmax
from sklearn.preprocessing import PolynomialFeatures

# Locate the project root by walking upward from the working directory until a
# directory holding both marker folders is found, or the filesystem root is hit.
current_dir = os.getcwd()
project_root = current_dir

while True:
    has_experiments = os.path.exists(os.path.join(project_root, 'experiments'))
    has_package = has_experiments and os.path.exists(os.path.join(project_root, 'conditionalconformal'))
    if has_package:
        break
    parent = os.path.dirname(project_root)
    if parent == project_root:
        # dirname() no longer changes the path: nothing above us left to try.
        break
    project_root = parent

# Make the experiments folder and the project root importable (appended in that
# order, and only when not already present on sys.path).
experiments_path = os.path.join(project_root, 'experiments')
for import_path in (experiments_path, project_root):
    if import_path not in sys.path:
        sys.path.append(import_path)


from utils.conformal import compute_conformity_score, compute_conformity_score_softmax, compute_conformity_score_aps, compute_conformity_score_raps, compute_sets_split, compute_sets_cond
from utils.model import get_image_classifier, split_test
from utils.data import get_image_dataset
from utils.evaluation import aggregate_results_over_seeds

Matplotlib is building the font cache; this may take a moment.


### Define Your Custom Function Class

Create your custom function class that will generate features for conditional conformal prediction. The class should have a `compute_features` method that takes the model logits (and, optionally, the model features) and returns a 2-D array of features of shape `(n_samples, n_features)`.

In [None]:
class CustomFunctionClass:
    """Example feature map for conditional conformal prediction.

    Replace the body of `compute_features` with your own logic; the example
    implementation uses the maximum softmax probability as a single feature.
    """

    def __init__(self):
        """Set up the function class (the example needs no state)."""

    def compute_features(self, logits, features=None):
        """Turn model outputs into conditioning features.

        Args:
            logits: Model output logits of shape (n_samples, n_classes)
            features: Optional model features of shape (n_samples, feature_dim);
                accepted for interface compatibility, not used by this example

        Returns:
            phi: Array of shape (n_samples, 1) holding, for every sample, the
                largest softmax probability across classes
        """
        # Softmax over the class axis, then keep the top probability per row.
        # keepdims=True preserves the 2-D (n_samples, 1) layout.
        return softmax(logits, axis=1).max(axis=1, keepdims=True)

### Configuration

In [None]:
# ---- Filesystem locations (placeholders: edit before running) ----
OUTPUT_DIR = './outputs'                        # result arrays are written under this folder
FEATURES_DIR = '/path/to/precomputed/features'  # passed to the model-output caching and aggregation helpers
DATA_DIR = '/path/to/data'                      # not referenced by the cells in this notebook
CACHE_DIR = '/path/to/pretrained_model'         # not referenced by the cells in this notebook

# ---- Dataset / model selection ----
dataset_name = 'imagenet'  # one of: imagenet, places, imagenet_lt, places_lt
model_name = 'resnet50'    # pick the model that matches the dataset
batch_size = 64            # batch size used when running the model over a dataset

# ---- Conformal prediction settings ----
alpha = 0.1                # significance level
score_fn = 'aps'           # conformity score function: softmax, aps, or raps
temp_scaling = True        # whether to use temperature scaling
scores_randomize = False   # whether to randomize scores
degree = 5                 # polynomial feature degree (expansion applied only when > 1)
seed = 1                   # random seed

### Load Model and Data

In [None]:
# Select the compute device: CUDA when torch can see a GPU, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# Build the classifier together with its input preprocessing
model, preprocess = get_image_classifier(model_name, device=device)

# Obtain calibration and test outputs (features, logits, labels)
if dataset_name not in ("imagenet", "places"):
    # These datasets come with their own validation split, which serves as the
    # calibration set; the test split is used as-is.
    train_dataset, val_dataset, test_dataset = get_image_dataset(dataset_name, preprocess=preprocess)
    calib_features, calib_logits, calib_labels = model.run_and_cache_outputs(val_dataset, batch_size, FEATURES_DIR)
    test_features, test_logits, test_labels = model.run_and_cache_outputs(test_dataset, batch_size, FEATURES_DIR)
else:
    # Only train/test splits are returned here, so the test outputs are divided
    # into a test part and a calibration part by split_test.
    train_dataset, test_dataset = get_image_dataset(dataset_name, preprocess=preprocess)
    test_features, test_logits, test_labels = model.run_and_cache_outputs(test_dataset, batch_size, FEATURES_DIR)
    split_outputs = split_test(test_labels, test_features, test_logits, split=0.5, seed=seed)
    # The 4th and 8th returned items are discarded.
    (test_labels, test_features, test_logits, _,
     calib_labels, calib_features, calib_logits, _) = split_outputs

### Compute Features using Custom Function Class

In [None]:
# Instantiate the user-defined feature map
custom_function = CustomFunctionClass()

# Evaluate it on the calibration outputs first, then on the test outputs
calib_phi, test_phi = (
    custom_function.compute_features(split_logits, split_features)
    for split_logits, split_features in (
        (calib_logits, calib_features),
        (test_logits, test_features),
    )
)

# Optional polynomial expansion: fitted on the calibration features, then the
# same fitted transformer is applied to the test features
if degree > 1:
    poly = PolynomialFeatures(degree)
    calib_phi, test_phi = poly.fit_transform(calib_phi), poly.transform(test_phi)

print('Feature shapes:', calib_phi.shape, test_phi.shape)

### Compute conformity scores and prediction sets

In [None]:
# Fail fast on an unsupported score function. Without this check no branch
# below would run, so `test_scores_all` would be undefined (NameError) on a
# fresh kernel, or silently reused from a previous run of this cell.
if score_fn not in ("softmax", "aps", "raps"):
    raise ValueError(
        f"Unknown score_fn {score_fn!r}; expected one of 'softmax', 'aps', 'raps'."
    )

# Compute base conformity scores
# NOTE(review): these two arrays are overwritten by the method-specific call
# below; kept in case the helper has side effects — confirm.
calib_scores, test_scores = compute_conformity_score(calib_logits, test_logits, calib_labels, test_labels, temp_scaling=temp_scaling)

# Compute method-specific conformity scores. Each helper returns the
# calibration scores, the test scores, and `test_scores_all`, which is passed
# on to both set-construction helpers below.
if score_fn == "softmax":
    calib_scores, test_scores, test_scores_all = compute_conformity_score_softmax(
        calib_logits, test_logits, calib_labels, test_labels, temp_scaling=temp_scaling
    )
elif score_fn == "aps":
    calib_scores, test_scores, test_scores_all = compute_conformity_score_aps(
        calib_logits, test_logits, calib_labels, test_labels, rand=scores_randomize, temp_scaling=temp_scaling
    )
elif score_fn == "raps":
    calib_scores, test_scores, test_scores_all = compute_conformity_score_raps(
        calib_logits, test_logits, calib_labels, test_labels, rand=scores_randomize, temp_scaling=temp_scaling
    )

# Compute prediction sets for split conformal
coverages_split, prediction_sets_split, set_sizes_split = compute_sets_split(
    calib_scores, test_scores, test_scores_all, alpha
)

# Compute prediction sets for conditional conformal, conditioning on the
# features (`calib_phi`, `test_phi`) produced by the custom function class
coverages_cond, prediction_sets_cond, set_sizes_cond = compute_sets_cond(
    calib_phi, calib_scores, test_phi, test_scores, test_scores_all, alpha, rand=scores_randomize
)

### Save results

In [None]:
# One output folder per dataset/model pair; created on demand
results_dir = os.path.join(OUTPUT_DIR, f"{dataset_name}_{model_name}")
os.makedirs(results_dir, exist_ok=True)

# Filename prefix encoding every parameter of this run
res_fname = f"alpha_{alpha}_score_fn_{score_fn}_scores_randomize_{scores_randomize}_temp_scale_{temp_scaling}_custom_fn_degree_{degree}_seed_{seed}"

# (filename, array) pairs, written in the order listed
artifacts = [
    (f"{res_fname}_calib_phi.npy", calib_phi),
    (f"{res_fname}_test_phi.npy", test_phi),
    (f"{res_fname}_coverages_split.npy", coverages_split),
    (f"{res_fname}_prediction_sets_split.npy", prediction_sets_split),
    (f"{res_fname}_set_sizes_split.npy", set_sizes_split),
    (f"{res_fname}_coverages_cond.npy", coverages_cond),
    (f"{res_fname}_prediction_sets_cond.npy", prediction_sets_cond),
    (f"{res_fname}_set_sizes_cond.npy", set_sizes_cond),
]
for artifact_fname, artifact in artifacts:
    np.save(os.path.join(results_dir, artifact_fname), artifact)

### Evaluate results

In [None]:
# Define evaluation parameters
# Display label -> dataset identifier used in result folder names
datasets = {
    'ImageNet': 'imagenet',
    'ImageNet-LT': 'imagenet_lt',
    'Places365': 'places',
    'Places365-LT': 'places_lt',
}

# Display label -> score function identifier used in result filenames
score_fns = {
    'APS': 'aps',
}

# Display label of each dataset -> model identifier used in result folder names
models = {
    'ImageNet': 'resnet50',
    'ImageNet-LT': 'resnext50_imagenet_lt',
    'Places365': 'resnet152_places',
    'Places365-LT': 'resnet152_places_lt',
}

# Method key -> LaTeX label (only the keys are passed to the aggregation helper)
methods = {
    'split': '$\\mathsf{split}$',
    'conditional': '$\\mathsf{conditional}$',
}

# Aggregate results
# The loop variables deliberately do NOT reuse the names `dataset_name` and
# `score_fn`: those are configuration globals read by earlier cells, and
# overwriting them here would make any re-run of those cells use the last
# values of this loop instead of the configured ones.
# NOTE(review): this iterates over every dataset listed above, not only the one
# configured for this run — presumably saved results must exist for each; verify.
for dataset_label, eval_dataset_name in datasets.items():
    for score_fn_name in score_fns.values():
        results_dir = os.path.join(OUTPUT_DIR, f"{eval_dataset_name}_{models[dataset_label]}")
        res_fname = f"alpha_{alpha}_score_fn_{score_fn_name}_scores_randomize_{scores_randomize}_temp_scale_{temp_scaling}_custom_fn"
        df = aggregate_results_over_seeds(eval_dataset_name, models[dataset_label], FEATURES_DIR, results_dir, res_fname, None, methods.keys(), score_fn_name, degree)
        display(df)