# Bayesian LeNet

In this notebook I follow Kumar Shridhar's approach to building a Bayesian Version of LeNet (https://github.com/kumar-shridhar/PyTorch-BayesianCNN/)

The goal here is to understand how he generates the estiamtes for aleatoric and epistemic uncertainty from the trained model

### Importing Dependencies:

In [1]:
#import sys
#!{sys.executable} -m pip install --user torch

In [2]:
# from lenet.py / for the models
from torch._C import InferredType
from torch.nn import Module # Base class for all nn modules in PyTorch
from torch.nn import Conv2d
from torch.nn import Linear
from torch.nn import MaxPool2d
from torch.nn import ReLU
from torch.nn import LogSoftmax
from torch import flatten

In [23]:
# from train.py / for training
import matplotlib
from torch._C import device
from torch.functional import broadcast_shapes
matplotlib.use("Agg")
from sklearn.metrics import classification_report
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from torch.utils.data import Subset
from torchvision.transforms import ToTensor
from torchvision.datasets import MNIST
from torch.optim import Adam
from torch import nn
import matplotlib.pyplot as plt
import numpy as np
import argparse
import torch
import time

In [24]:
# for prediction / testing
import imutils

In [4]:
#import sys  
#!{sys.executable} -m pip install --user torchvision

In [5]:
from layers import BBBConv2d, BBBLinear, FlattenLayer

## Architecture

### Architecture for LeNet

In [6]:
# LeNet architecture 
class LeNet(Module):
    def __init__(self, n_channels, n_classes):
        '''
        n_channels: the number of channels in the input images
        n_classes: the number of unique labels/outputs represented in the dataset

        '''
        # calling the constructor for the Model class
        super(LeNet, self).__init__()

        # A set of Conv, ReLU and Pool layers
        self.conv1 = Conv2d(in_channels=n_channels, out_channels=20, kernel_size=(5,5))
        self.relu1 = ReLU()
        self.maxpool1 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # A second set of Conv, ReLU and Pool layers
        self.conv2 = Conv2d(in_channels=20, out_channels=50, kernel_size=(5,5))
        self.relu2 = ReLU()
        self.maxpool2 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # A fully connected layer followed by a ReLU
        self.fc1 = Linear(in_features=800, out_features=500)
        self.relu3 = ReLU()

        # Initializing our softmax classifier
        self.fc2 = Linear(in_features=500, out_features=n_classes)
        self.logSoftmax = LogSoftmax(dim=1)

    def forward(self, x):
        '''
        Accepts a batch of input data to feed to the network, 
        returns the network predictions

        '''
        # pass the input through our first set of CONV => RELU =>
		# POOL layers 
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
		# pass the output from the previous layer through the second
		# set of CONV => RELU => POOL layers
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
		# flatten the output from the previous layer and pass it
		# through our only set of FC => RELU layers
        x = flatten(x, 1)
        x = self.fc1(x)
        x = self.relu3(x)
		# pass the output to our softmax classifier to get our output
		# predictions
        x = self.fc2(x)
        output = self.logSoftmax(x)
		# return the output predictions
        return output

### Bayesian Version

For the bayesian version, we need to: 

* replace the Conv2d layers with 'BBBConv2d' layers (BBB = Bayes by Backprop)
* replace the fully connected layers with 'BBBLinear' layers
* replace the default pytorch flatten layer with 'FlattenLayer'

In [7]:
class Bayesian_LeNet(Module):
    def __init__(self, n_channels, n_classes):
        '''
        n_channels: the number of channels in the input images
        n_classes: the number of unique labels/outputs represented in the dataset

        '''
        # calling the constructor for the Model class
        super(Bayesian_LeNet, self).__init__()

        # A set of Conv, ReLU and Pool layers
        self.conv1 = BBBConv2d(in_channels=n_channels, out_channels=20, kernel_size=(5,5))
        self.relu1 = ReLU()
        self.maxpool1 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # A second set of Conv, ReLU and Pool layers
        self.conv2 = BBBConv2d(in_channels=20, out_channels=50, kernel_size=(5,5))
        self.relu2 = ReLU()
        self.maxpool2 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # A fully connected layer followed by a ReLU
        self.flatten = FlattenLayer(800)
        self.fc1 = BBBLinear(in_features=800, out_features=500)
        self.relu3 = ReLU()

        # Initializing our softmax classifier
        self.fc2 = BBBLinear(in_features=500, out_features=n_classes)
        self.logSoftmax = LogSoftmax(dim=1)

    def forward(self, x):
        '''
        Accepts a batch of input data to feed to the network, 
        returns the network predictions

        '''
        # pass the input through our first set of CONV => RELU =>
		# POOL layers 
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
		# pass the output from the previous layer through the second
		# set of CONV => RELU => POOL layers
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
		# flatten the output from the previous layer and pass it
		# through our only set of FC => RELU layers
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu3(x)
		# pass the output to our softmax classifier to get our output
		# predictions
        x = self.fc2(x)
        output = self.logSoftmax(x)
		# return the output predictions
        return output

## Training

In [8]:
def train(model_class, save_spot='LeNet_example/output/model.pth'):
    '''
    Takes a class of model to init and train,
    as well as a path to save the trained model to
    '''
    # setting hyperparameters
    learning_rate = 1e-3
    batch_size = 64
    epochs = 10
    # (1 - train_split) of the training data will be used for validation
    train_split = 0.75

    # setting training device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # loading the MNIST dataset
    print("Loading MNIST dataset...")
    train_data = MNIST(root="data", train=True, download=True, transform=ToTensor())
    test_data = MNIST(root="data", train=False, download=True, transform=ToTensor())

    # splitting the training data into training and validation data
    print("Making training and validation sets...")
    n_train = int(train_split * len(train_data))
    n_val = len(train_data) - n_train
    (train_data, val_data) = random_split(train_data, [n_train, n_val], generator=torch.Generator().manual_seed(84))

    # initializing data loaders
    train_data_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
    val_data_loader = DataLoader(val_data, shuffle=True, batch_size=batch_size)
    test_data_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)

    # calculating steps per epoch for training+val set
    training_steps = len(train_data_loader.dataset) // batch_size
    validation_steps = len(val_data_loader.dataset) // batch_size

    # initializing the LeNet model
    print("Initializing model...")
    model = model_class(n_channels=1, n_classes=len(train_data.dataset.classes)).to(device)
    optimizer = Adam(model.parameters(), lr=learning_rate)
    loss_fn = nn.NLLLoss()

    # tracking training history
    hist = {
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": []
    }

    # training
    print("Training the network...")
    t_start = time.time()

    for i in range(epochs):
        # set model to training mode
        model.train()
        # initializing total training, val loss
        total_train_loss = 0
        total_val_loss = 0
        # initializing the number of correct predictions
        train_n_correct = 0
        val_n_correct = 0

        # training step
        for (x, y) in train_data_loader:
            # send input to device
            (x, y) = (x.to(device), y.to(device))

            # perform a forward pass and calc the training loss
            prediction = model(x)
            loss = loss_fn(prediction, y)

            # setting gradients to zero, performing backprop and updating the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # tracking training loss and num correct
            total_train_loss += loss
            train_n_correct += (prediction.argmax(1) == y).type(torch.float).sum().item()

        # validation step
        with torch.no_grad(): # turning off autograd for evaluation
            # setting model to evaluation mode
            model.eval()

            for (x, y) in val_data_loader:
                # send input to device
                (x, y) = (x.to(device), y.to(device))

                # perform a forward pass and calc the training loss
                prediction = model(x)
                total_val_loss += loss_fn(prediction, y)
                val_n_correct += (prediction.argmax(1) == y).type(torch.float).sum().item()

        # adding stats to the history object
        
        avg_train_loss = total_train_loss / training_steps
        avg_val_loss = total_val_loss / validation_steps
        train_n_correct = train_n_correct / len(train_data_loader.dataset)
        val_n_correct = val_n_correct / len(val_data_loader.dataset)

        hist['train_loss'].append(avg_train_loss.cpu().detach().numpy())
        hist['train_acc'].append(train_n_correct)
        hist['val_loss'].append(avg_val_loss.cpu().detach().numpy())
        hist['val_acc'].append(val_n_correct)

        # printing
        print("Epoch {}/{}".format(i+1, epochs))
        print("Training Loss: {:.6f}, Training Accuracy: {:.4f}".format(avg_train_loss, train_n_correct))
        print("Validation Loss: {:.6f}, Validation Accuracy: {:.4f}".format(avg_val_loss, val_n_correct))


    t_end = time.time()
    print("Total time: {:.2f}s".format(t_end - t_start))

    print("Evaluating trained network...")

    # turn off autograd for testing
    with torch.no_grad():
        # set model to eval mode
        model.eval()
        # empty list to store predictions
        predictions = []

        # looping over testing data
        for (x, y) in test_data_loader:
            x = x.to(device)

            # making predictions
            prediction = model(x)
            predictions.extend(prediction.argmax(axis=1).cpu().numpy())


    # generate a classification report
    print(classification_report(test_data.targets.cpu().numpy(), np.array(predictions), target_names=test_data.classes))

    # plot the training loss and accuracy
    plt.style.use("ggplot")
    plt.figure()
    plt.plot(hist["train_loss"], label="train_loss")
    plt.plot(hist["val_loss"], label="val_loss")
    plt.plot(hist["train_acc"], label="train_acc")
    plt.plot(hist["val_acc"], label="val_acc")
    plt.title("Training Loss and Accuracy on Dataset")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/Accuracy")
    plt.legend(loc="lower left")
    #plt.savefig(args.plot)
    # serialize the model to disk
    torch.save(model, save_spot)

In [9]:
#train(LeNet, save_spot='output/freq_model.pth')

Loading MNIST dataset...
Making training and validation sets...
Initializing model...
Training the network...
Epoch 1/10
Training Loss: 0.181599, Training Accuracy: 0.9445
Validation Loss: 0.086474, Validation Accuracy: 0.9726
Epoch 2/10
Training Loss: 0.054114, Training Accuracy: 0.9832
Validation Loss: 0.053691, Validation Accuracy: 0.9829
Epoch 3/10
Training Loss: 0.036515, Training Accuracy: 0.9882
Validation Loss: 0.047019, Validation Accuracy: 0.9855
Epoch 4/10
Training Loss: 0.026521, Training Accuracy: 0.9918
Validation Loss: 0.044159, Validation Accuracy: 0.9871
Epoch 5/10
Training Loss: 0.019976, Training Accuracy: 0.9938
Validation Loss: 0.042942, Validation Accuracy: 0.9881
Epoch 6/10
Training Loss: 0.014728, Training Accuracy: 0.9950
Validation Loss: 0.037901, Validation Accuracy: 0.9900
Epoch 7/10
Training Loss: 0.012730, Training Accuracy: 0.9959
Validation Loss: 0.042197, Validation Accuracy: 0.9891
Epoch 8/10
Training Loss: 0.011732, Training Accuracy: 0.9961
Validatio

In [10]:
train(Bayesian_LeNet, save_spot='output/bayesian_model.pth')

Loading MNIST dataset...
Making training and validation sets...
Initializing model...
Training the network...
Epoch 1/10
Training Loss: 0.471463, Training Accuracy: 0.8547
Validation Loss: 0.141901, Validation Accuracy: 0.9557
Epoch 2/10
Training Loss: 0.110058, Training Accuracy: 0.9666
Validation Loss: 0.080635, Validation Accuracy: 0.9746
Epoch 3/10
Training Loss: 0.071944, Training Accuracy: 0.9779
Validation Loss: 0.071615, Validation Accuracy: 0.9782
Epoch 4/10
Training Loss: 0.053005, Training Accuracy: 0.9836
Validation Loss: 0.060796, Validation Accuracy: 0.9820
Epoch 5/10
Training Loss: 0.042003, Training Accuracy: 0.9867
Validation Loss: 0.053335, Validation Accuracy: 0.9837
Epoch 6/10
Training Loss: 0.034632, Training Accuracy: 0.9888
Validation Loss: 0.052742, Validation Accuracy: 0.9851
Epoch 7/10
Training Loss: 0.030196, Training Accuracy: 0.9902
Validation Loss: 0.054316, Validation Accuracy: 0.9840
Epoch 8/10
Training Loss: 0.022126, Training Accuracy: 0.9930
Validatio

## Prediction

Pasted code from making predictions, getting uncertainties

In [None]:
# from predict.py
model_path = 'LeNet_example/output/freq_model.pth'

# setting the device 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# loading the MNIST dataset and grabbing 10 random data points
print("Loading data...")
test_data = MNIST(root='data', train=False, download=True, transform=ToTensor())
indxs = np.random.choice(range(len(test_data)), size=(10,))
test_data = Subset(test_data, indxs)
#initializing test data loader
test_data_loader = DataLoader(test_data, batch_size=1)
# loading the model
model = torch.load(model_path).to(device)
model.eval()

In [None]:
'''
from predict.py:

trial_no = 0
for (img, label) in test_data_loader:
    trail_no += 1
    # get original image and ground truth
    original_img = img.numpy().squeeze(axis=(0,1))
    ground_truth_label = test_data.dataset.classes[label.numpy()[0]]

    # send input to device and make prediction
    img = img.to(device)
    #prediction = model(img)
    prediction, epistemic_ucs, aleatoric_ucs = get_uncertainty_per_image(model, img)

'''

In [None]:
def get_uncertainty_per_image(model, input_image, T=15, normalized=False):
    
    '''
    original code:
    
    input_image = input_image.unsqueeze(0)
    input_images = input_image.repeat(T, 1, 1, 1)
    net_out, _ = model(input_images)
    '''

    #net_out = [model(img) for img in input_images]

    net_out = model(input_image)

    pred = torch.mean(net_out, dim=0).cpu().detach().numpy()
    if normalized:
        prediction = F.softplus(net_out)
        p_hat = prediction / torch.sum(prediction, dim=1).unsqueeze(1)
    else:
        p_hat = F.softmax(net_out, dim=1)
    p_hat = p_hat.detach().cpu().numpy()
    p_bar = np.mean(p_hat, axis=0)

    temp = p_hat - np.expand_dims(p_bar, 0)
    epistemic = np.dot(temp.T, temp) / T
    epistemic = np.diag(epistemic)

    aleatoric = np.diag(p_bar) - (np.dot(p_hat.T, p_hat) / T)
    aleatoric = np.diag(aleatoric)

    return pred, epistemic, aleatoric

Grabbing sample image, trying to get a number for uncertainty from the bayesian net

Later we'll need to figure out what number to use for uncertainty for the freq net

Then we can redo what's in predict.py, i.e. making a nice summary with some examples from MNIST

In [16]:
# loading the model and setting it to evaluation mode

model_path = 'output/bayesian_model.pth'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.load(model_path).to(device)
model.eval()

Bayesian_LeNet(
  (conv1): BBBConv2d()
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): BBBConv2d()
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (flatten): FlattenLayer()
  (fc1): BBBLinear()
  (relu3): ReLU()
  (fc2): BBBLinear()
  (logSoftmax): LogSoftmax(dim=1)
)

In [47]:
# grabbing some data

N_datapoints = 10
# loading MNIST
test_data = MNIST(root='data', train=False, download=True, transform=ToTensor())
# Getting N_datapoints random images
indxs = np.random.choice(range(len(test_data)), size=(N_datapoints,))
test_data = Subset(test_data, indxs)
#initializing test data loader
test_data_loader = DataLoader(test_data, batch_size=1)

In [61]:
images = []
labels = []

In [62]:
for (img, label) in test_data_loader:
    images.append(img)
    labels.append(label)

In [75]:
def make_displayable(img):
    displayable = img.numpy().squeeze(axis=(0,1))
    displayable = np.dstack([original_img] * 3)
    displayable = imutils.resize(original_img, width=128)
    return displayable

In [77]:
i = 0
for (img, label) in test_data_loader:
    plt.clf()
    plt.imshow(make_displayable(images[i]))
    plt.savefig('example_{}.png'.format(i))
    i += 1

All the images in the test_data_loader are the same?!

In [51]:
# working through the get_uncertainty per image method step by step
net_out = model(img)