# Dependencies

In [1]:
!pip install torch_uncertainty

Collecting torch_uncertainty
  Downloading torch_uncertainty-0.3.1-py3-none-any.whl.metadata (9.1 kB)
Collecting lightning>=2.0 (from lightning[pytorch-extra]>=2.0->torch_uncertainty)
  Downloading lightning-2.4.0-py3-none-any.whl.metadata (38 kB)
Collecting lightning-utilities<2.0,>=0.10.0 (from lightning>=2.0->lightning[pytorch-extra]>=2.0->torch_uncertainty)
  Downloading lightning_utilities-0.11.9-py3-none-any.whl.metadata (5.2 kB)
Collecting torchmetrics<3.0,>=0.7.0 (from lightning>=2.0->lightning[pytorch-extra]>=2.0->torch_uncertainty)
  Downloading torchmetrics-1.6.0-py3-none-any.whl.metadata (20 kB)
Collecting pytorch-lightning (from lightning>=2.0->lightning[pytorch-extra]>=2.0->torch_uncertainty)
  Downloading pytorch_lightning-2.4.0-py3-none-any.whl.metadata (21 kB)
Collecting bitsandbytes<1.0,>=0.42.0 (from lightning[pytorch-extra]>=2.0->torch_uncertainty)
  Downloading bitsandbytes-0.45.0-py3-none-manylinux_2_24_x86_64.whl.metadata (2.9 kB)
Collecting hydra-core<2.0,>=1.2.

In [1]:
!pip install blitz-bayesian-pytorch

Collecting blitz-bayesian-pytorch
  Downloading blitz_bayesian_pytorch-0.2.8-py3-none-any.whl.metadata (19 kB)
Downloading blitz_bayesian_pytorch-0.2.8-py3-none-any.whl (48 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.4/48.4 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: blitz-bayesian-pytorch
Successfully installed blitz-bayesian-pytorch-0.2.8


In [2]:
from pathlib import Path

import torch
from torch import nn, optim
from torch.optim.lr_scheduler import MultiStepLR

from torch_uncertainty import TUTrainer
from torch_uncertainty.datamodules import MNISTDataModule
from torch_uncertainty.losses import ELBOLoss
from torch_uncertainty.models.lenet import bayesian_lenet
from torch_uncertainty.models import mc_dropout
from torch_uncertainty.routines import ClassificationRoutine

from blitz.modules import BayesianLinear
from blitz.utils import variational_estimator
import scipy.stats as st

from pathlib import Path
from safetensors.torch import load_file
from torchvision.datasets import MNIST

from google.colab import drive
drive.flush_and_unmount()  # Unmount Google Drive
drive.mount('/content/drive')  # Remount Google Drive
import os


Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


# OptuNet Posterior Approximation

In [None]:
# Constants
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DATA_PATH = "data"

# Parameters from paper
EPOCHS = 60
BATCH_SIZE = 64
LEARNING_RATE = 0.04
#WEIGHT_DECAY = 2e-4

NUM_WORKERS = 4
## OptuNet params
DROPOUT_RATE = 0.2 # last layer dropout rate

## Load Data

In [None]:
# Load MNIST data
root = Path(DATA_PATH)
datamodule = MNISTDataModule(root=root, batch_size=BATCH_SIZE, eval_ood=False, num_workers=NUM_WORKERS)

TypeError: MNISTDataModule.__init__() got an unexpected keyword argument 'download'

## OptuNet Model

In [3]:
# the variational_estimator decorator adjusts the model to compute and optimize
# the Evidence Lower Bound (ELBO)
@variational_estimator
class OptuNet(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # Add layers for OptuNet (use Section C.2.1 from the paper for details)
        # Layers: Conv2D (out_ch=2, ks=4, groups=1) -> Max Pooling (ks=3, stride=3) -> ReLU -> Conv2D (out_ch=10, ks=5, groups=2) -> Average Pooling -> ReLU -> Linear 10x10
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=2, kernel_size=4, groups=1, bias=False)
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=3)
        self.conv2 = nn.Conv2d(in_channels=2, out_channels=10, kernel_size=5, groups=2, bias=False)
        self.pool2 = nn.AvgPool2d(kernel_size=2)
        self.fc1 = nn.Linear(in_features=10, out_features=10)
        #self.fc1 = nn.Linear(in_features=10 * 2 * 2, out_features=10)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.pool1(self.conv1(x)))  # First conv, max pooling, ReLU
        x = self.relu(self.pool2(self.conv2(x)))  # Second conv, avg pooling, ReLU
        x = x.mean(dim=[2, 3])
        x = self.fc1(x)  # Linear layer
        return x
"""
def load_optunet_model(version: int):
    #Load the model corresponding to the given version.
    model = OptuNet(num_classes=datamodule.num_classes)
    #path = Path(f"models/mnist-optunet-0-8191/version_{version}.safetensors")
    #notebook_dir = Path("/content/drive/MyDrive/DL,\ adv/project")
    path = "/content/drive/MyDrive/DL, adv/project/mnist-optunet-0-8191/version_1000.safetensors"

    print(os.path.exists(path))

    if not os.path.exists(path):
        raise ValueError("File does not exist")

    state_dict = load_file(path)

    model.load_state_dict(state_dict=state_dict)
    return model
"""

'\ndef load_optunet_model(version: int):\n    #Load the model corresponding to the given version.\n    model = OptuNet(num_classes=datamodule.num_classes)\n    #path = Path(f"models/mnist-optunet-0-8191/version_{version}.safetensors")\n    #notebook_dir = Path("/content/drive/MyDrive/DL,\\ adv/project")\n    path = "/content/drive/MyDrive/DL, adv/project/mnist-optunet-0-8191/version_1000.safetensors"\n\n    print(os.path.exists(path))\n\n    if not os.path.exists(path):\n        raise ValueError("File does not exist")\n\n    state_dict = load_file(path)\n\n    model.load_state_dict(state_dict=state_dict)\n    return model\n'

In [None]:
#file_path = "/content/drive/MyDrive/DL, adv/project/mnist-optunet-0-8191/version_1000.safetensors"
#print("File exists:", os.path.exists(file_path))


File exists: True


## Train / Test

In [None]:
class CustomClassificationRoutine(ClassificationRoutine):
    def __init__(self, lr_scheduler, num_samples, kl_weight, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lr_scheduler = lr_scheduler
        #self.epoch_outputs = []  # Store outputs here if needed
        self.num_samples = num_samples
        self.kl_weight = kl_weight

    def training_step(self, batch, batch_idx):
        """
        output = super().training_step(batch, batch_idx)
        self.epoch_outputs.append(output)  # Collect outputs manually
        return output
        """

        inputs, targets = batch
        inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)

        # Calculate ELBO using sample_elbo
        elbo = self.model.sample_elbo(
            inputs=inputs,
            labels=targets,
            criterion=nn.CrossEntropyLoss(),
            sample_nbr=self.num_samples,
            complexity_cost_weight=self.kl_weight
        )

        self.log("train_elbo", elbo)
        return elbo

    def on_train_epoch_end(self):
        # Step the scheduler if it exists
        if self.lr_scheduler:
            self.lr_scheduler.step()

        # Optionally, process self.epoch_outputs here
        #self.epoch_outputs.clear()  # Clear outputs for the next epoch



In [None]:
def optim_lenet(model: nn.Module):
    optimizer = optim.SGD(
        model.parameters(),
        lr=0.04
    )
    return optimizer

#trainer = TUTrainer(accelerator="gpu", enable_progress_bar=False, max_epochs=60)
trainer = TUTrainer(accelerator="cpu", enable_progress_bar=False, max_epochs=60)

# model
#model = load_optunet_model(version=1000)
model = OptuNet(num_classes=datamodule.num_classes)

# loss
"""
loss = ELBOLoss(
    model=model,
    inner_loss=nn.CrossEntropyLoss(),
    kl_weight=1 / 10000,
    num_samples=3,
)
"""

# learning rate scheduler to  decay
#the learning rate twice during training, at epochs 15 and 30, dividing the learning rate by 2.
def scheduler_lenet(optimizer):
    scheduler = MultiStepLR(
        optimizer,
        milestones=[15, 30],  # Epochs at which to decay the learning rate
        gamma=0.5,            # Factor by which to multiply the learning rate
    )
    return scheduler

optimizer = optim_lenet(model)
scheduler = scheduler_lenet(optimizer)

routine = CustomClassificationRoutine(
    model=model,
    num_classes=datamodule.num_classes,
    loss=None, # computed by sample_elbo in training_step()
    optim_recipe=optimizer,
    lr_scheduler=scheduler,
    num_samples = 3,
    kl_weight=1/100000
    #is_ensemble=True
)

trainer.fit(model=routine, datamodule=datamodule)
results = trainer.test(model=routine, datamodule=datamodule)

INFO: GPU available: False, used: False
INFO:lightning.pytorch.utilities.rank_zero:GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: 
  | Name             | Type             | Params | Mode 
--------------------------------------------------------------
0 | model            | OptuNet          | 392    | train
1 | format_batch_fn  | Identity         | 0      | train
2 | val_cls_metrics  | MetricCollection | 0      | train
3 | test_cls_metrics | MetricCollection | 0      | train
4 | test_id_entropy  | Entropy          | 0      | train
5 | mixup            | Identity         | 0      | train
--------------------------------------------------------------
392       Trainable params
0         Non-trainable params
392       Total params
0.002     Total es

In [None]:
#save state dictionary on drive (model parameters)
model_path = "/content/drive/MyDrive/optunet_trained_model.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")


Model saved to /content/drive/MyDrive/optunet_trained_model.pth


## Confidence Interval Evaluation Function
* Sample predictions from your Bayesian model (OptuNet) 3 times for each input.
* Calculate the mean and standard deviation of these predictions.
* Use these statistics to construct a confidence interval, assuming a Gaussian distribution.




In [33]:
import torch.nn.functional as F

def evaluate_confidence_interval(model, dataloader, confidence=0.95):
    model.eval()

    all_preds = []
    all_targets = []
    lower_bounds = []
    upper_bounds = []

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE)

            # For Monte carlo estimation of the ELBO using 3 samples
            preds = torch.stack([model(inputs) for _ in range(3)], dim=0)

            # Calculate mean and standard deviation
            preds_mean = preds.mean(dim=0)
            preds_std = preds.std(dim=0)

            # Apply softmax to the mean predictions for probabilities
            preds_mean = F.softmax(preds_mean, dim=1)


            # Compute confidence intervals
            z_value = st.norm.ppf(1 - (1 - confidence) / 2)  #dynamically computing the z-score for a given confidence level (e.g., 95%, 99%).
            ci_lower = preds_mean - z_value * preds_std
            ci_upper = preds_mean + z_value * preds_std

            all_preds.append(preds_mean)
            all_targets.append(targets)
            lower_bounds.append(ci_lower)
            upper_bounds.append(ci_upper)

    # Concatenate results for all batches
    all_preds = torch.cat(all_preds)
    all_targets = torch.cat(all_targets)
    lower_bounds = torch.cat(lower_bounds)
    upper_bounds = torch.cat(upper_bounds)

    return all_preds, all_targets, lower_bounds, upper_bounds


In [19]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DATA_PATH = "data"
BATCH_SIZE = 64
NUM_WORKERS = 4

In [20]:
# Download dataset to the specified path
MNIST(root=DATA_PATH, train=True, download=True)
MNIST(root=DATA_PATH, train=False, download=True)

Dataset MNIST
    Number of datapoints: 10000
    Root location: data
    Split: Test

In [21]:
datamodule = MNISTDataModule(
    root=Path(DATA_PATH),
    batch_size=BATCH_SIZE,
    eval_ood=False,
    num_workers=NUM_WORKERS
)


In [23]:
from torch.utils.data import DataLoader

In [24]:
datamodule.setup(stage='test')
test_dataset = datamodule.test  # Ensure `self.test` exists and is initialized correctly.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
)


In [25]:
for inputs, targets in test_dataloader:
    print(f"Inputs shape: {inputs.shape}")
    print(f"Targets shape: {targets.shape}")
    break


Inputs shape: torch.Size([64, 1, 28, 28])
Targets shape: torch.Size([64])


In [27]:
#(reinitializing the optunet architecture)
model = OptuNet(num_classes=datamodule.num_classes)
#load state dictionary
model_path = "/content/drive/MyDrive/optunet_trained_model.pth"
model.load_state_dict(torch.load(model_path))
model.to(DEVICE)  # Send model to appropriate device
print("Model loaded successfully!")


Model loaded successfully!


  model.load_state_dict(torch.load(model_path))


In [34]:


preds, targets, lower_bounds, upper_bounds = evaluate_confidence_interval(model, test_dataloader, confidence=0.95)

## Scoring

### AUPR

In [35]:
from sklearn.metrics import precision_recall_curve, auc
from sklearn.metrics import roc_curve

In [39]:

probs = torch.softmax(preds, dim=1)[:, 1].cpu().numpy()  # Adjust for binary classification
true_labels = targets.cpu().numpy()
# Extract probabilities and true labels
probs = torch.softmax(preds, dim=1)[:, positive_class].cpu().numpy()  # Probability of the positive class
binary_labels = (true_labels == positive_class).astype(int)  # Binarized labels

# Precision-recall curve
precision, recall, _ = precision_recall_curve(binary_labels, probs)
aupr = auc(recall, precision)
print(f"AUPR for class {positive_class}: {aupr}")


AUPR for class 1: 0.9828197600748013


### FPR95

In [41]:


fpr, tpr, thresholds = roc_curve(binary_labels, probs)
# Find the threshold where TPR is closest to 0.95
tpr_95_index = (tpr >= 0.95).argmax()
# FPR at TPR = 0.95
fpr_95 = fpr[tpr_95_index]
print(f"FPR at TPR=0.95: {fpr_95}")


FPR at TPR=0.95: 0.011844331641285956


### Accuracy

In [44]:
from sklearn.metrics import accuracy_score
import numpy as np

In [47]:
#For Multi-Class Classification
# Assuming `preds` are raw outputs from the model, and `targets` are the true labels
_, predicted_labels = torch.max(preds, 1)  # Get the index of the max log-probability
correct = (predicted_labels == targets).sum().item()  # Count correct predictions
accuracy = correct / targets.size(0)  # Calculate accuracy as a ratio
print(f'Accuracy: {accuracy * 100:.2f}%')



Accuracy: 82.38%


In [53]:
import numpy as np
from sklearn.calibration import calibration_curve

def calculate_ace(preds, targets, num_bins=10):
    # Convert predictions to probabilities
    probs = torch.softmax(preds, dim=1).cpu().numpy()  # For multi-class, probs[:, 1] for class 1, or probs[:, positive_class] for specific class
    true_labels = targets.cpu().numpy()

    # For binary classification, you can use probs[:, 1] for class 1 (positive_class)
    prob_true = probs[:, positive_class]

    binary_labels = (true_labels == positive_class).astype(int)

    # Get calibration curve: This will return the true fraction of positives and predicted probabilities for each bin
    fraction_of_positives, mean_predicted_value = calibration_curve(binary_labels, prob_true, n_bins=num_bins)

    # Calculate ACE: This is the average absolute difference between the true fraction of positives and the predicted probability
    ace = np.mean(np.abs(fraction_of_positives - mean_predicted_value))
    return ace

# Example usage
ace = calculate_ace(preds, targets)
print(f"Average Calibration Error (ACE): {ace:.4f}")


Average Calibration Error (ACE): 0.4780
