In [None]:
import torch
import pickle
import argparse
import numpy as np
import torch.nn as nn
from torchmetrics import Accuracy
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from fastai.vision.all import untar_data, URLs
from pytorch_lightning import Trainer
from dime.utils import MaskLayer2d
from dime import CMIEstimator, MaskingPretrainer
from dime.resnet_imagenet import resnet18, Predictor, ValueNetwork, ResNet18Backbone
from dime.vit import PredictorViT, ValueNetworkViT
import timm
import matplotlib.pyplot as plt

# Load Dataset

In [None]:
# Evaluation configuration: image geometry, masking layer, device and the
# held-out data pipeline.
image_size = 224
mask_width = 14
mask_type = 'zero'
# Floor division keeps the patch size an int (224 // 14 == 16) instead of the
# float 16.0 that true division produced.
mask_layer = MaskLayer2d(append=False, mask_width=mask_width, patch_size=image_size // mask_width)

# The GPU index defaults to the original value (7). Set DIME_GPU_INDEX to run on
# a machine with a different GPU layout. The device is always built with an
# explicit index because the Trainer cell reads `device.index`.
gpu_index = int(os.environ.get("DIME_GPU_INDEX", 7))
device = torch.device(f"cuda:{gpu_index}")

# NOTE(review): these per-channel mean/std values look like the commonly used
# CIFAR-10 statistics rather than ImageNet ones. Presumably they match what the
# checkpoint was trained with, so confirm before changing them.
norm_constants = ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
# Constants that undo the normalisation above: mean' = -mean/std, std' = 1/std.
unnorm_constants = ((-0.4914/0.2023, -0.4822/0.1994, -0.4465/0.2010), (1/0.2023, 1/0.1994, 1/0.2010))
mbsize = 32
# The dataset root defaults to the original location. Set IMAGENETTE_PATH to
# point at a local copy of imagenette2-320.
dataset_path = os.environ.get("IMAGENETTE_PATH", "/homes/gws/sgadgil/.fastai/data/imagenette2-320")
transforms_test = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize(*norm_constants),
    ])

# The 'val' split is evaluated in a fixed order (shuffle=False).
test_dataset = ImageFolder(os.path.join(dataset_path, 'val'), transforms_test)
test_dataloader = DataLoader(test_dataset, batch_size=mbsize, pin_memory=True, num_workers=4, shuffle=False)

# Load Pretrained Model

In [None]:
# Build the predictor / value networks for the chosen architecture, restore the
# trained CMI estimator from a checkpoint and create the Trainer used for inference.

# Accuracy metric: handed to the estimator as `val_loss_fn` and reused by the
# evaluation cells to score predictions.
acc_metric = Accuracy(task='multiclass', num_classes=10)

arch = "vit"
use_entropy = False  # not referenced anywhere in this cell
if arch == 'resnet':
    backbone = ResNet18Backbone(resnet18(pretrained=True))
    predictor = Predictor(backbone)
    # A 14-wide mask grid uses a stride of 0.5; any other width uses 1.
    block_layer_stride = 0.5 if mask_width == 14 else 1
    value_network = ValueNetwork(backbone, block_layer_stride=block_layer_stride)
elif arch == 'vit':
    backbone = timm.create_model('vit_small_patch16_224', pretrained=True)
    predictor = PredictorViT(backbone)
    value_network = ValueNetworkViT(backbone)
else:
    # Previously any unrecognised value silently fell through to the ViT branch.
    raise ValueError(f"Unsupported arch {arch!r}; expected 'resnet' or 'vit'")

# Placeholder: replace with the path to the trained CMIEstimator checkpoint.
trained_model_path = "<path_to_trained_model>"

# The networks, mask layer and hyperparameters are supplied as keyword arguments
# alongside the checkpoint path.
greedy_cmi_estimator = CMIEstimator.load_from_checkpoint(trained_model_path,
                                                         value_network=value_network,
                                                         predictor=predictor,
                                                         mask_layer=mask_layer,
                                                         lr=1e-5,
                                                         min_lr=1e-8,
                                                         max_features=50,
                                                         eps=0.05,
                                                         loss_fn=nn.CrossEntropyLoss(reduction='none'),
                                                         val_loss_fn=acc_metric,
                                                         eps_decay=0.2,
                                                         eps_steps=10,
                                                         patience=3,
                                                         feature_costs=None)
# Single-GPU trainer pinned to the configured device index, with precision=16.
trainer = Trainer(
                    accelerator='gpu',
                    devices=[device.index],
                    precision=16
                )

# Evaluate Penalized Policy

In [None]:
# Penalized policy: run inference once per penalty weight `lam` and record the
# accuracy together with the mean number of selected features.
avg_num_features_lamda, accuracy_scores_lamda, all_masks_lamda = [], [], []
results_dict = {"acc": {}}

# Other sweeps tried previously: list(np.geomspace(0.01, 0.3, num=10)), [0.01],
# list(np.geomspace(0.001, 0.3, num=10))
lamda_values = [0.4]
for lamda in lamda_values:
    metric_dict = greedy_cmi_estimator.inference(trainer, test_dataloader, feature_costs=None, lam=lamda)

    y, pred = metric_dict['y'], metric_dict['pred']
    accuracy_score = acc_metric(pred, y)

    final_masks = np.array(metric_dict['mask'])
    # Sum each mask row, then average over rows.
    # assumes one mask row per test sample — TODO confirm
    mean_selected = final_masks.sum(axis=1).mean()

    accuracy_scores_lamda.append(accuracy_score)
    avg_num_features_lamda.append(mean_selected)
    all_masks_lamda.append(final_masks)
    # Results are keyed by the mean feature count, not by lambda.
    results_dict['acc'][mean_selected] = accuracy_score

    print(f"Lambda={lamda}, Acc={accuracy_score}, Avg. num features={mean_selected}")

# Saving is intentionally disabled for the penalized run; uncomment to persist.
# with open(f'results/imagenette_ours_trial_2_penalized.pkl', 'wb') as f:
#     pickle.dump(results_dict, f)

# Evaluate Budget Constrained Policy

In [None]:
# Budget-constrained policy: run inference once per feature budget, record the
# accuracy and the mean number of selected features, then pickle the
# accuracy-by-feature-count mapping.
results_dict = {"acc": {}}

avg_num_features_budget = []
accuracy_scores_budget = []
all_masks_budget = []

max_budget_values = list(range(1, 15, 1))  # budgets 1 through 14
for budget in max_budget_values:
    metric_dict_budget = greedy_cmi_estimator.inference(trainer, test_dataloader, feature_costs=None, budget=budget)

    y = metric_dict_budget['y']
    pred = metric_dict_budget['pred']
    accuracy_score = acc_metric(pred, y)
    final_masks = np.array(metric_dict_budget['mask'])
    # Sum each mask row, then average over rows; computed once and reused below.
    # assumes one mask row per test sample — TODO confirm
    avg_num_features = np.mean(np.sum(final_masks, axis=1))

    accuracy_scores_budget.append(accuracy_score)
    avg_num_features_budget.append(avg_num_features)
    # Results are keyed by the mean feature count, not by the budget.
    results_dict['acc'][avg_num_features] = accuracy_score
    print(f"Budget={budget}, Acc={accuracy_score}, Avg. num features={avg_num_features}")

    all_masks_budget.append(final_masks)

# Create the output directory first: open(..., 'wb') raises FileNotFoundError
# when 'results/' is missing, which would discard the whole sweep.
results_path = 'results/imagenette_ours_trial_2.pkl'
os.makedirs(os.path.dirname(results_path), exist_ok=True)
with open(results_path, 'wb') as f:
    pickle.dump(results_dict, f)

# Evaluate Confidence Constrained Policy

In [None]:
# Confidence-constrained policy: run inference once per confidence threshold and
# record the accuracy together with the mean number of selected features.
confidence_values = list(np.arange(0.1, 1, 0.1))
avg_num_features_confidence, accuracy_scores_confidence, all_masks_confidence = [], [], []

for confidence in confidence_values:
    metric_dict = greedy_cmi_estimator.inference(trainer, test_dataloader, feature_costs=None, confidence=confidence)
    y, pred = metric_dict['y'], metric_dict['pred']
    accuracy_score = acc_metric(pred, y)

    final_masks = np.array(metric_dict['mask'])
    # Sum each mask row, then average over rows.
    # assumes one mask row per test sample — TODO confirm
    mean_selected = final_masks.sum(axis=1).mean()

    accuracy_scores_confidence.append(accuracy_score)
    avg_num_features_confidence.append(mean_selected)
    print(f"Confidence={confidence}, Acc={accuracy_score}, Avg. num features={mean_selected}")
    all_masks_confidence.append(final_masks)