In [22]:
!pip install blitz-bayesian-pytorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as dsets
import torchvision.transforms as transforms

from blitz.modules import BayesianLinear, BayesianConv2d
from blitz.losses import kl_divergence_from_nn
from blitz.utils import variational_estimator

In [24]:
import os
import typing

import numpy as np
import torch
import torch.optim
from matplotlib import pyplot as plt
from sklearn.metrics import roc_auc_score, average_precision_score
from torch import nn
from torch.nn import functional as F
from tqdm import trange
import tqdm
from torch.distributions import Poisson
from collections import deque
import copy
from enum import Enum
# @title

import abc
import warnings

import numpy as np
import torch

import os
import matplotlib.pyplot as plt
from torch.optim import SGD


In [25]:
# TODO: Reliability_diagram_1. Set `EXTENDED_EVALUATION` to `True` in order to visualize your predictions.
EXTENDED_EVALUATION = False

class Approach(Enum):
    Dummy_Trainer = 0
    MCDropout = 1
    Ensemble = 2
    Backprop = 3
    SGLD = 4
    SelfMade = 5 


def run_solution(dataset_train: torch.utils.data.Dataset, data_dir: str = os.curdir, output_dir: str = '/results/'):
    """
    Run your task 2 solution.
    This method should train your model, evaluate it, and return the trained model at the end.
    Make sure to preserve the method signature and to return your trained model,
    else the checker will fail!

    :param dataset_train: Training dataset
    :param data_dir: Directory containing the datasets
    :return: Your trained model
    """

    # TODO: Combined model_1: Choose if you want to combine with multiple methods or not
    combined_model = False

    if not combined_model:
        
        # TODO General_1: Choose your approach here
        approach = Approach.Dummy_Trainer

        # if approach == Approach.Dummy_Trainer:
        #     trainer = DummyTrainer(dataset_train=dataset_train)
        # if approach == Approach.MCDropout:
        #     trainer = DropoutTrainer(dataset_train=dataset_train)
        # if approach == Approach.Ensemble:
        #     trainer = EnsembleTrainer(dataset_train=dataset_train)
        # if approach == Approach.Backprop:
        #     trainer = BackpropTrainer(dataset_train=dataset_train)
        # if approach == Approach.SGLD:
        #     trainer = SGLDTrainer(dataset_train=dataset_train)
        # if approach == Approach.SelfMade:
        #     trainer = SelfTrainer(dataset_train=dataset_train)
        # trainer = BayesTrainer(dataset_train=dataset_train)
        trainer = BayesTrainerD(dataset_train=dataset_train)
        # Train the model
        print('Training model', approach.name)
        trainer.train()

        # Predict using the trained model
        print('Evaluating model on training data')
        eval_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=64, shuffle=False, drop_last=False
        )
        evaluate(trainer, eval_loader, data_dir, output_dir)

        return trainer
        
    elif combined_model:
        # using combined model

        # TODO: Combined model_2: If you want to use combined methods 
        # you can set the trainers you want to use like below
        trainer1 = DummyTrainer(dataset_train=dataset_train)
        trainer2 = DummyTrainer(dataset_train=dataset_train)
        trainer3 = DummyTrainer(dataset_train=dataset_train)
        trainer_list = [trainer1,trainer2,trainer3]

        # Train the combined model
        for trainer_i in trainer_list:
            trainer_i.train()

        # Evaulate each of the combined models
        print('Evaluating model on training data')
        eval_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=64, shuffle=False, drop_last=False
        )
        for trainer_i in trainer_list:
            evaluate(trainer_i, eval_loader, data_dir, output_dir)

    # IMPORTANT: return your combined model here!
        return trainer_list

In [26]:
def calc_calibration_curve(predicted_probabilities: np.ndarray, labels: np.ndarray, num_bins=30)-> dict:
    """
    Calculate ece and understand what is good calibration. This task is part of the 
    extended evaluation and not required for passing. 
    """

    num_samples, num_classes = predicted_probabilities.shape
    predicted_classes = np.argmax(predicted_probabilities, axis=1) 
    confidences = predicted_probabilities[range(num_samples), predicted_classes]
    bins = np.linspace(start=0,stop=1,num=num_bins+1)

    bin_lowers = bins[:-1]
    bin_uppers = bins[1:]
    accuracies = predicted_classes == labels

    calib_confidence = np.zeros(num_bins,dtype=np.float)
    calib_accuracy = np.zeros(num_bins,dtype=np.float)
    ratios = np.zeros(num_bins,dtype=np.float)

    for bin_i, (bin_lower, bin_upper) in enumerate(zip(bin_lowers, bin_uppers)):
        # TODO: Reliability_diagram_2. Calculate calibration confidence, accuracy in every bin
        calib_confidence[bin_i] = None
        calib_accuracy[bin_i] = None
        ratios[bin_i] = None
    
    return {"calib_confidence": calib_confidence, "calib_accuracy": calib_accuracy, "p": ratios, "bins": bins}


class Framework(object):
    def __init__(self, dataset_train:torch.utils.data.Dataset, *args, **kwargs):
        """
        Basic Framework for your bayesian neural network.
        Other solutions like MC Dropout, Ensemble learning will based upon this.
        """
        self.train_set = dataset_train
        self.print_interval = 100 # number of batches until updated metrics are displayed during training

    def train(self):
        raise NotImplementedError()

    def predict(self, data_loader: torch.utils.data.DataLoader) -> np.ndarray:
        """
        Predict the class probabilities using your trained model.
        This method should return an (num_samples, 10) NumPy float array
        such that the second dimension sums up to 1 for each row.

        :param data_loader: Data loader yielding the samples to predict on
        :return: (num_samples, 10) NumPy float array where the second dimension sums up to 1 for each row
        """
        probability_batches = []
        
        for batch_x, _ in tqdm.tqdm(data_loader):
            current_probabilities = self.predict_probabilities(batch_x).detach().cpu().numpy()
            probability_batches.append(current_probabilities)

        output = np.concatenate(probability_batches, axis=0)
        assert isinstance(output, np.ndarray)
        assert output.ndim == 2 and output.shape[1] == 10
        assert np.allclose(np.sum(output, axis=1), 1.0)
        return output

    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        raise NotImplementedError()

In [27]:
def combined_predict(data_loader: torch.utils.data.DataLoader, models_list: list) -> np.ndarray:
    """
    Predict the class probabilities using a combination of your trained model.
    This method should return an (num_samples, 10) NumPy float array - the same as 
    predict()- such that the second dimension sums up to 1 for each row.
    :param data_loader: Data loader yielding the samples to predict on
    :return: (num_samples, 10) NumPy float array where the second dimension sums up to 1 for each row
    """

    probability_batches = []

    for batch_x, _ in tqdm.tqdm(data_loader):
        # TODO: Combined model_3. Predict with your combined model
        current_probabilities = None
        probability_batches.append(current_probabilities)
        
    output = np.concatenate(probability_batches, axis=0)
    assert isinstance(output, np.ndarray)
    assert output.ndim == 2 and output.shape[1] == 10
    assert np.allclose(np.sum(output, axis=1), 1.0)
    return output

In [28]:
class DummyTrainer(Framework):
    """
    Trainer implementing a simple feedforward neural network.
    You can learn how to build your own trainer and use this model as a reference/baseline for 
    the calibration of a standard Neural Network. 
    """
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        self.batch_size = 128
        self.learning_rate = 1e-3
        self.num_epochs = 100


        self.network = MNISTNet(in_features=28*28,out_features=10)
        self.train_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=self.batch_size, shuffle=True, drop_last=True
            )
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=self.learning_rate) 
        
    def train(self):
        self.network.train()
        progress_bar = trange(self.num_epochs)
        for _ in progress_bar:
            for batch_idx, (batch_x, batch_y) in enumerate(self.train_loader):
                # batch_x are of shape (batch_size, 784), batch_y are of shape (batch_size,)

                self.network.zero_grad()

                # Perform forward pass
                current_logits = self.network(batch_x)

                # Calculate the loss
                # We use the negative log likelihood as the loss
                # Combining nll_loss with a log_softmax is better for numeric stability
                loss = F.nll_loss(F.log_softmax(current_logits, dim=1), batch_y, reduction='sum')

                # Backpropagate to get the gradients
                loss.backward()

                self.optimizer.step()

                # Update progress bar with accuracy occasionally
                if batch_idx % self.print_interval == 0:
                    current_logits = self.network(batch_x)

                    current_accuracy = (current_logits.argmax(axis=1) == batch_y).float().mean()
                    progress_bar.set_postfix(loss=loss.item(), acc=current_accuracy.item())

    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        # using the confidence as estimated probaility, 
        self.network.eval()
        # assert x.shape[1] == 28 ** 2
        with torch.no_grad():
          estimated_probability = F.softmax(self.network(x), dim=1)
          assert estimated_probability.shape == (x.shape[0], 10)
        return estimated_probability

In [29]:
@variational_estimator
class BayesianCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = BayesianConv2d(1, 6, (5,5))
        self.conv2 = BayesianConv2d(6, 16, (5,5))
        self.fc1   = BayesianLinear(256, 120)
        self.fc2   = BayesianLinear(120, 84)
        self.fc3   = BayesianLinear(84, 10)

    def forward(self, x):
        out = F.relu(self.conv1(x))
        out = F.max_pool2d(out, 2)
        out = F.relu(self.conv2(out))
        out = F.max_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = F.relu(self.fc1(out))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out

In [30]:
@variational_estimator
class BayesianCNND(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = BayesianConv2d(1, 6, (5,5))
        self.conv2 = BayesianConv2d(6, 16, (5,5))
        self.fc1   = BayesianLinear(256, 120)
        self.fc2   = BayesianLinear(120, 84)
        self.fc3   = BayesianLinear(84, 10)
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        out = F.relu(self.conv1(x))
        out = F.max_pool2d(out, 2)
        out = F.relu(self.conv2(out))
        out = F.max_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = F.relu(self.dropout(self.fc1(out)))
        out = F.relu(self.dropout(self.fc2(out)))
        out = self.fc3(out)
        return out

In [31]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [32]:
class BayesTrainer(Framework):
    """
    Trainer implementing a simple feedforward neural network.
    You can learn how to build your own trainer and use this model as a reference/baseline for 
    the calibration of a standard Neural Network. 
    """
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        self.batch_size = 64
        self.learning_rate = 1e-3
        self.num_epochs = 100


        self.network = BayesianCNN().to(device)
        self.train_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=self.batch_size, shuffle=True, drop_last=True
            )
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=self.learning_rate) 
        
    def train(self):
        self.network.train()
        iteration = 0
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # classifier = BayesianCNN().to(device)
        # optimizer = optim.Adam(classifier.parameters(), lr=0.001)
        # criterion = F.nll_loss
        criterion = torch.nn.CrossEntropyLoss()
        for epoch in trange(self.num_epochs):
            for i, (datapoints, labels) in enumerate(self.train_loader):
                datapoints = datapoints.unsqueeze(1).to(device)
                self.optimizer.zero_grad()
                loss = self.network.sample_elbo(inputs=datapoints.to(device),
                                  labels=labels.to(device),
                                  criterion=criterion,
                                  sample_nbr=3,
                                  complexity_cost_weight=1/50000)
                #print(loss)
                loss.backward()
                self.optimizer.step()
                
                iteration += 1
                if iteration%250==0:
                    print(loss)
                    correct = 0
                    total = 0
                    print('Evaluating model on training data')
                    eval_loader = torch.utils.data.DataLoader(
                        dataset_train, batch_size=64, shuffle=False, drop_last=False
                    )
                    evaluate(self, eval_loader, data_dir, output_dir)
                    torch.save(self.network.state_dict(), f'bayesnn{iteration}.pth')
        torch.save(self.network.state_dict(), 'bayesnn.pth')
    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        # using the confidence as estimated probaility, 
        # print(f"x shape: {x.shape}")
        self.network.eval()
        # assert x.shape[1] == 28 ** 2
        x = x.squeeze(0).unsqueeze(1)
        estimated_probability = F.softmax(self.network(x), dim=1)
        # assert estimated_probability.shape == (x.shape[0], 10)
        return estimated_probability

In [33]:
class BayesTrainerD(Framework):
    """
    Trainer implementing a simple feedforward neural network.
    You can learn how to build your own trainer and use this model as a reference/baseline for 
    the calibration of a standard Neural Network. 
    """
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        self.batch_size = 64
        self.learning_rate = 0.0005
        self.num_epochs = 100


        self.network = BayesianCNND().to(device)
        self.train_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=self.batch_size, shuffle=True, drop_last=True
            )
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=self.learning_rate) 
        
    def train(self):
        self.network.train()
        iteration = 0
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # classifier = BayesianCNN().to(device)
        # optimizer = optim.Adam(classifier.parameters(), lr=0.001)
        # criterion = F.nll_loss
        criterion = torch.nn.CrossEntropyLoss()
        for epoch in trange(self.num_epochs):
            for i, (datapoints, labels) in enumerate(self.train_loader):
                datapoints = datapoints.unsqueeze(1).to(device)
                self.optimizer.zero_grad()
                loss = self.network.sample_elbo(inputs=datapoints.to(device),
                                  labels=labels.to(device),
                                  criterion=criterion,
                                  sample_nbr=3,
                                  complexity_cost_weight=1/50000)
                #print(loss)
                loss.backward()
                self.optimizer.step()
                
                iteration += 1
                if iteration%250==0:
                    print(loss)
                    correct = 0
                    total = 0
                    print('Evaluating model on training data')
                    eval_loader = torch.utils.data.DataLoader(
                        dataset_train, batch_size=64, shuffle=False, drop_last=False
                    )
                    evaluate(self, eval_loader, data_dir, output_dir)
                 
    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        # using the confidence as estimated probaility, 
        # print(f"x shape: {x.shape}")
        self.network.eval()
        # enable_dropout(self.network)
        # assert x.shape[1] == 28 ** 2
        with torch.no_grad():
          x = x.squeeze(0).unsqueeze(1)
          estimated_probability = F.softmax(self.network(x), dim=1)
        
        # estimated_probability = F.softmax(self.network(x), dim=1)
        # assert estimated_probability.shape == (x.shape[0], 10)
        return estimated_probability

def enable_dropout(model):
    """ Function to enable the dropout layers during test-time """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.train()

In [34]:
class SelfMadeNetwork(nn.Module):
    def __init__(self,
                in_features: int, 
                out_features: int,
                ):
        super().__init__()
        # TODO General_3: Play around with the network structure.
        # You can customize your own model here. 
        self.layer1 = None
        self.layer2 = None
        self.layer3 = None

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        class_probs = self.layer3(x)
        return class_probs

class DropoutTrainer(Framework):
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        # TODO: MC_Dropout_4. Do experiments and tune hyperparameters
        self.batch_size = 128
        self.learning_rate = 1e-3
        self.num_epochs = 100
        # torch.manual_seed(0) # set seed for reproducibility
        
        # TODO: MC_Dropout_1. Initialize the MC_Dropout network and optimizer here
        # You can check the Dummy Trainer above for intuition about what to do
        self.network = None
        self.optimizer = None
        

    def train(self):
        self.network.train()
        progress_bar = trange(self.num_epochs)
        for _ in progress_bar:
            for batch_idx, (batch_x, batch_y) in enumerate(self.train_loader):
                # batch_x are of shape (batch_size, 784), batch_y are of shape (batch_size,)

                self.network.zero_grad()
                # TODO: MC_Dropout_2. Implement MCDropout training here
                # You need to calculate the loss based on the literature
                loss = None

                # Backpropagate to get the gradients
                loss.backward()

                self.optimizer.step()
                # Update progress bar with accuracy occasionally
                if batch_idx % self.print_interval == 0:
                    current_logits = self.network(batch_x)
                    current_accuracy = (current_logits.argmax(axis=1) == batch_y).float().mean()
                    progress_bar.set_postfix(loss=loss.item(), acc=current_accuracy.item())
          

    def predict_probabilities(self, x: torch.Tensor, num_sample=100) -> torch.Tensor:
        assert x.shape[1] == 28 ** 2
        self.network.eval()

        # TODO: MC_Dropout_3. Implement your MC_dropout prediction here
        # You need to sample from your trained model here multiple times
        # in order to implement Monte Carlo integration
        estimated_probability = None
        
        assert estimated_probability.shape == (x.shape[0], 10)  
        return estimated_probability


class EnsembleTrainer(Framework):
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        # TODO: Ensemble_4. Do experiments and tune hyperparameters
        self.batch_size = 128
        self.learning_rate = 1e-3
        self.num_epochs = 100

        # TODO: Ensemble_1.  initialize the Ensemble network list and optimizer.
        # You can check the Dummy Trainer above for intution about what to do
        # You need to build an ensemble of initialized networks here
        self.EnsembleNetworks = [None]
        self.optimizer = None

    def train(self):

        for network in self.EnsembleNetworks:   
            network.train()
            progress_bar = trange(self.num_epochs)
            for _ in progress_bar:
                for batch_idx, (batch_x, batch_y) in enumerate(self.train_loader):
                    # batch_x are of shape (batch_size, 784), batch_y are of shape (batch_size,)

                    network.zero_grad()
                    # TODO: Ensemble_2. Implement Ensemble training here
                    # You need to calculate the loss based on the literature
                    loss = None

                    # Backpropagate to get the gradients
                    loss.backward()

                    self.optimizer.step()

                    # Update progress bar with accuracy occasionally
                    if batch_idx % self.print_interval == 0:
                        current_logits = self.network(batch_x)
                        current_accuracy = (current_logits.argmax(axis=1) == batch_y).float().mean()
                        progress_bar.set_postfix(loss=loss.item(), acc=current_accuracy.item())

    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        assert x.shape[1] == 28 ** 2
        for network in self.EnsembleNetworks:  
            network.eval() 

        # TODO: Ensemble_3. Implement Ensemble prediction here
        # You need obtain predictions from each ensemble member and think about 
        # how to combine the results from each of them
        estimated_probability = None
        
        assert estimated_probability.shape == (x.shape[0], 10)  
        return estimated_probability


class SGLDTrainer(Framework):
    def __init__(self, dataset_train,
                 *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        # TODO: SGLD_4. Do experiments and tune hyperparameters
        self.batch_size = 128
        self.learning_rate = 1e-3
        self.num_epochs = 100
        self.burn_in = 2
        self.sample_interval = 3
        self.max_size = 10


        # TODO: SGLD_1.  initialize the SGLD network.
        # You can check the Dummy Trainer above for intution about what to do
        self.network = None
        
        # SGLD optimizer is provided
        self.optimizer = SGLD(self.network.parameters(),lr = self.learning_rate)

        # deques support bi-directional addition and deletion
        # You can add models in the right side of a deque by append()
        # You can delete models in the left side of a deque by popleft()
        self.SGLDSequence = deque() 

    def train(self):
        num_iter = 0
        print('Training model')

        self.network.train()
        progress_bar = trange(self.num_epochs)
        
        for _ in progress_bar:
            num_iter+=1

            for batch_idx, (batch_x, batch_y) in enumerate(self.train_loader):
                self.network.zero_grad()

                # Perform forward pass
                current_logits = self.network(batch_x)

                # Calculate the loss
                # TODO: SGLD_1. Implement SGLD training here
                # You need to calculate the loss based on the literature
                loss = None

                # Backpropagate to get the gradients
                loss.backward()

                self.optimizer.step()

                if batch_idx % self.print_interval == 0:
                    current_logits = self.network(batch_x)
                    current_accuracy = (current_logits.argmax(axis=1) == batch_y).float().mean()
                    progress_bar.set_postfix(loss=loss.item(), acc=current_accuracy.item())
  
            # TODO: SGLD_2. save the model samples if it satisfies the following conditions:
            # We are 1) past the burn-in epochs and 2) reached one of the regular sampling intervals we save the model at
            # If the self.SGLDSequence already exceeded the maximum length then we have to delete the oldest model
            if None:
                self.SGLDSequence # add model
            if None:
                self.SGLDSequence # remove model

    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        assert x.shape[1] == 28 ** 2
        self.network.eval()

        # TODO SGLD_3: Implement SGLD predictions here
        # You need to obtain the prediction from each network
        # in SGLDSequence and combine the predictions
        estimated_probability = None
        
        assert estimated_probability.shape == (x.shape[0], 10)  
        return estimated_probability


class BackpropTrainer(Framework):
    def __init__(self, dataset_train,  *args, **kwargs):
        super().__init__(dataset_train, *args, **kwargs)

        # Hyperparameters and general parameters
        # TODO: Backprop_7 Tune parameters and add more if necessary
        self.hidden_features=(100,100)
        self.batch_size = 128
        self.num_epochs = 10
        learning_rate = 1e-3

        self.network = BayesNet(in_features=28 * 28, hidden_features=self.hidden_features, out_features=10)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)
        self.train_loader = torch.utils.data.DataLoader(
            dataset_train, batch_size=self.batch_size, shuffle=True, drop_last=True
            )

    def train(self):

        self.network.train()

        progress_bar = trange(self.num_epochs)
        for _ in progress_bar:
            num_batches = len(self.train_loader)
            for batch_idx, (batch_x, batch_y) in enumerate(self.train_loader):
                # batch_x are of shape (batch_size, 784), batch_y are of shape (batch_size,)

                self.network.zero_grad()

                # TODO: Backprop_6: Implement Bayes by backprop training here
                loss = None
                self.optimizer.step()

                # Update progress bar with accuracy occasionally
                if batch_idx % self.print_interval == 0:
                    current_logits, _, _ = self.network(batch_x)
                    current_accuracy = (current_logits.argmax(axis=1) == batch_y).float().mean()
                    progress_bar.set_postfix(loss=loss.item(), acc=current_accuracy.item())


    def predict_probabilities(self, x: torch.Tensor, num_mc_samples: int = 10) -> torch.Tensor:
        """
        Predict class probabilities for the given features by sampling from this BNN.

        :param x: Features to predict on, float tensor of shape (batch_size, in_features)
        :param num_mc_samples: Number of MC samples to take for prediction
        :return: Predicted class probabilities, float tensor of shape (batch_size, 10)
            such that the last dimension sums up to 1 for each row
        """
        probability_samples = torch.stack([F.softmax(self.network(x)[0], dim=1) for _ in range(num_mc_samples)], dim=0)
        estimated_probability = torch.mean(probability_samples, dim=0)

        assert estimated_probability.shape == (x.shape[0], 10)
        assert torch.allclose(torch.sum(estimated_probability, dim=1), torch.tensor(1.0))
        return estimated_probability

class BayesianLayer(nn.Module):
    """
    Module implementing a single Bayesian feedforward layer.
    It maintains a prior and variational posterior for the weights (and biases)
    and uses sampling to approximate the gradients via Bayes by backprop.
    """
    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        """
        Create a BayesianLayer.

        :param in_features: Number of input features
        :param out_features: Number of output features
        :param bias: If true, use a bias term (i.e., affine instead of linear transformation)
        """
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.use_bias = bias

        # Background Pytorch will backpropogate gradients to an object initialized with
        # torch.Parameter(...) and the object will be updated when computing loss.backwards()
        # during training. This will not happen for a torch.Tensor(...) object, which is by default a constant. 

        # TODO: Backprop_1. Create a suitable prior for weights and biases as an instance of ParameterDistribution.
        #  You can use the same prior for both weights and biases, but are also free to experiment with other priors.
        #  You can create constants using torch.tensor(...).
        #  Do NOT use torch.Parameter(...) here since the prior should not be optimized!
        #  Example: self.prior = MyPrior(torch.tensor(0.0), torch.tensor(1.0))
        self.prior = None
        assert isinstance(self.prior, ParameterDistribution)
        assert not any(True for _ in self.prior.parameters()), 'Prior cannot have parameters'

        # TODO: Backprop_1. Create a suitable variational posterior for weights as an instance of ParameterDistribution.
        #  You need to create separate ParameterDistribution instances for weights and biases,
        #  but you can use the same family of distributions for each if you want.
        #  IMPORTANT: You need to create a nn.Parameter(...) for each parameter
        #  and add those parameters as an attribute in the ParameterDistribution instances.
        #  If you forget to do so, PyTorch will not be able to optimize your variational posterior.
        # The variational posterior for weights is created here. For the biases it is created further down. 
        #  Example: self.weights_var_posterior = MyPosterior(
        #      torch.nn.Parameter(torch.zeros((out_features, in_features))),
        #      torch.nn.Parameter(torch.ones((out_features, in_features)))
        #  )
        self.weights_var_posterior = None

        assert isinstance(self.weights_var_posterior, ParameterDistribution)
        assert any(True for _ in self.weights_var_posterior.parameters()), 'Weight posterior must have parameters'

        if self.use_bias:
            # TODO: Backprop_1. Similarly as you did above for the weights, create the bias variational posterior instance here.
            #  Make sure to follow the same rules as for the weight variational posterior.
            self.bias_var_posterior = None
            assert isinstance(self.bias_var_posterior, ParameterDistribution)
            assert any(True for _ in self.bias_var_posterior.parameters()), 'Bias posterior must have parameters'
        else:
            self.bias_var_posterior = None


    def forward(self, inputs: torch.Tensor):
        """
        Perform one forward pass through this layer.
        If you need to sample weights from the variational posterior, you can do it here during the forward pass.
        Just make sure that you use the same weights to approximate all quantities
        present in a single Bayes by backprop sampling step.

        :param inputs: Flattened input images as a (batch_size, in_features) float tensor
        :return: 3-tuple containing
            i) transformed features using stochastic weights from the variational posterior,
            ii) sample of the log-prior probability, and
            iii) sample of the log-variational-posterior probability
        """
        # TODO: Backprop_2. Perform a forward pass as described in this method's docstring.
        #  Make sure to check whether `self.use_bias` is True,
        #  and if yes, include the bias as well.
        log_prior = torch.tensor(0.0)
        log_variational_posterior = torch.tensor(0.0)
        weights = None
        bias = None

        return F.linear(inputs, weights, bias), log_prior, log_variational_posterior


class BayesNet(nn.Module):
    """
    Module implementing a Bayesian feedforward neural network using BayesianLayer objects.
    """

    def __init__(self, in_features: int, hidden_features: typing.Tuple[int, ...], out_features: int):
        """
        Create a BNN.

        :param in_features: Number of input features
        :param hidden_features: Tuple where each entry corresponds to a (Bayesian) hidden layer with
            the corresponding number of features.
        :param out_features: Number of output features
        """

        super().__init__()

        feature_sizes = (in_features,) + hidden_features + (out_features,)
        num_affine_maps = len(feature_sizes) - 1
        self.layers = nn.ModuleList([
            BayesianLayer(feature_sizes[idx], feature_sizes[idx + 1], bias=True)
            for idx in range(num_affine_maps)
        ])
        self.activation = nn.ReLU()

    def forward(self, x: torch.Tensor) -> typing.Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Perform one forward pass through the BNN using a single set of weights
        sampled from the variational posterior.

        :param x: Input features, float tensor of shape (batch_size, in_features)
        :return: 3-tuple containing
            i) output features using stochastic weights from the variational posterior,
            ii) sample of the log-prior probability, and
            iii) sample of the log-variational-posterior probability
        """

        # TODO: Backprop_3. Perform a full pass through your BayesNet as described in this method's docstring.
        #  You can look at Dummy Trainer to get an idea how a forward pass might look like.
        #  Don't forget to apply your activation function in between BayesianLayers!
        log_prior = torch.tensor(0.0)
        log_variational_posterior = torch.tensor(0.0)
        output_features = None

        return output_features, log_prior, log_variational_posterior


    def predict_probabilities(self, x: torch.Tensor, num_mc_samples: int = 50) -> torch.Tensor:
        """
        Predict class probabilities for the given features by sampling from this BNN.

        :param x: Features to predict on, float tensor of shape (batch_size, in_features)
        :param num_mc_samples: Number of MC samples to take for prediction
        :return: Predicted class probabilities, float tensor of shape (batch_size, 10)
            such that the last dimension sums up to 1 for each row
        """
        probability_samples = torch.stack([F.softmax(self.forward(x)[0], dim=1) for _ in range(num_mc_samples)], dim=0)
        estimated_probability = torch.mean(probability_samples, dim=0)

        assert estimated_probability.shape == (x.shape[0], 10)
        assert torch.allclose(torch.sum(estimated_probability, dim=1), torch.tensor(1.0))
        return estimated_probability


class UnivariateGaussian(ParameterDistribution):
    """
    Univariate Gaussian distribution.
    For multivariate data, this assumes all elements to be i.i.d.
    """

    def __init__(self, mu: torch.Tensor, sigma: torch.Tensor):
        super(UnivariateGaussian, self).__init__()  # always make sure to include the super-class init call!
        assert mu.size() == () and sigma.size() == ()
        assert sigma > 0
        self.mu = mu
        self.sigma = sigma

    def log_likelihood(self, values: torch.Tensor) -> torch.Tensor:
        # TODO: Backprop_4. You need to complete the log likelihood function 
        # for the Univariate Gaussian distribution. 
        return 0.0

    def sample(self) -> torch.Tensor:
        # TODO: Backprop_4. You need to complete the sample function 
        # for the Univariate Gaussian distribution. 
        raise NotImplementedError()


class MultivariateDiagonalGaussian(ParameterDistribution):
    """
    Multivariate diagonal Gaussian distribution,
    i.e., assumes all elements to be independent Gaussians
    but with different means and standard deviations.
    This parameterizes the standard deviation via a parameter rho as
    sigma = softplus(rho).
    """

    def __init__(self, mu: torch.Tensor, rho: torch.Tensor):
        super(MultivariateDiagonalGaussian, self).__init__()  # always make sure to include the super-class init call!
        assert mu.size() == rho.size()
        self.mu = mu
        self.rho = rho

    def log_likelihood(self, values: torch.Tensor) -> torch.Tensor:
        # TODO: Backprop_5. You need to complete the log likelihood function 
        # for the Multivariate DiagonalGaussian Gaussian distribution. 
        return 0.0

    def sample(self) -> torch.Tensor:
        # TODO: Backprop_5. You need to complete the sample function 
        # for the Multivariate DiagonalGaussian Gaussian distribution. 
        raise NotImplementedError()



class SelfTrainer(Framework):
    def __init__(self, dataset_train, print_interval=50,*args, **kwargs):
        """
        Basic Framework for creating your own bayesian neural network trainer.
        """
        self.train_set = dataset_train
        self.print_interval = print_interval  # number of batches until updated metrics are displayed during training

    def train(self, num_epochs):
        raise NotImplementedError()

    def predict(self, data_loader: torch.utils.data.DataLoader) -> np.ndarray:
        """
        Predict the class probabilities using your trained model.
        This method should return an (num_samples, 10) NumPy float array
        such that the second dimension sums up to 1 for each row.

        :param data_loader: Data loader yielding the samples to predict on
        :return: (num_samples, 10) NumPy float array where the second dimension sums up to 1 for each row
        """
        probability_batches = []
        
        for batch_x, batch_y in tqdm.tqdm(data_loader):
            current_probabilities = self.predict_probabilities(batch_x).detach().cpu().numpy()
            probability_batches.append(current_probabilities)

        output = np.concatenate(probability_batches, axis=0)
        assert isinstance(output, np.ndarray)
        assert output.ndim == 2 and output.shape[1] == 10
        assert np.allclose(np.sum(output, axis=1), 1.0)
        return output

    def predict_probabilities(self, x: torch.Tensor) -> torch.Tensor:
        raise NotImplementedError()


def evaluate(model:Framework, eval_loader: torch.utils.data.DataLoader, data_dir: str, output_dir: str):
    """
    Evaluate your model.
    :param model: Trained model to evaluate
    :param eval_loader: Data loader containing the training set for evaluation
    :param data_dir: Data directory from which additional datasets are loaded
    :param output_dir: Directory into which plots are saved
    """
    print("evaulating")
    # Predict class probabilities on test data
    predicted_probabilities = model.predict(eval_loader)

    # Calculate evaluation metrics
    predicted_classes = np.argmax(predicted_probabilities, axis=1)
    actual_classes = eval_loader.dataset.tensors[1].detach().cpu().numpy()
    accuracy = np.mean((predicted_classes == actual_classes)) 
    ece_score = ece(predicted_probabilities, actual_classes)
    print(f'Accuracy: {accuracy.item():.3f}, ECE score: {ece_score:.3f}')

    # TODO: Reliability_diagram_3. draw reliability diagram on Test Data
    # You can uncomment the below code to make it run. You can learn from
    # the graph about how to improve your model. Remember first to complete 
    # the function of calc_calibration_curve.
  
    # print('Plotting reliability diagram on Test Dataset')
    # out = calc_calibration_curve(predicted_probabilities, actual_classes, num_bins = 30)
    # fig = draw_reliability_diagram(out)
    # fig.savefig(os.path.join(output_dir, 'reliability-diagram.pdf'))

    if EXTENDED_EVALUATION:
        if not os.path.isdir(output_dir):
            os.mkdir(output_dir)
        ### draw reliability diagram
        print('Plotting reliability diagram')
        reliability_diagram_datax = np.load(os.path.join(data_dir, 'reliability_diagram_data_x.npy'))
        reliability_diagram_datay = np.load(os.path.join(data_dir, 'reliability_diagram_data_y.npy'))
        for i in range(3):
            demo_prob_i = reliability_diagram_datax[i]
            demo_label_i = reliability_diagram_datay[i]
            out = calc_calibration_curve(demo_prob_i, demo_label_i, num_bins = 9)
            fig = draw_reliability_diagram(out)
            fig.savefig(os.path.join(output_dir, str(i)+'_reliability-diagram.pdf'))
        
        eval_samples = eval_loader.dataset.tensors[0].detach().cpu().numpy()

        # Determine confidence per sample and sort accordingly
        confidences = np.max(predicted_probabilities, axis=1)
        sorted_confidence_indices = np.argsort(confidences)

        # Plot samples your model is most confident about
        print('Plotting most confident MNIST predictions')
        most_confident_indices = sorted_confidence_indices[-10:]
        fig, ax = plt.subplots(4, 5, figsize=(13, 11))
        for row in range(0, 4, 2):
            for col in range(5):
                sample_idx = most_confident_indices[5 * row // 2 + col]
                ax[row, col].imshow(np.reshape(eval_samples[sample_idx], (28, 28)), cmap='gray')
                ax[row, col].set_axis_off()
                ax[row + 1, col].set_title(f'predicted {predicted_classes[sample_idx]}, actual {actual_classes[sample_idx]}')
                bar_colors = ['C0'] * 10
                bar_colors[actual_classes[sample_idx]] = 'C1'
                ax[row + 1, col].bar(
                    np.arange(10), predicted_probabilities[sample_idx], tick_label=np.arange(10), color=bar_colors
                )
        fig.suptitle('Most confident predictions', size=20)
        fig.savefig(os.path.join(output_dir, 'mnist_most_confident.pdf'))

        # Plot samples your model is least confident about
        print('Plotting least confident MNIST predictions')
        least_confident_indices = sorted_confidence_indices[:10]
        fig, ax = plt.subplots(4, 5, figsize=(13, 11))
        for row in range(0, 4, 2):
            for col in range(5):
                sample_idx = least_confident_indices[5 * row // 2 + col]
                ax[row, col].imshow(np.reshape(eval_samples[sample_idx], (28, 28)), cmap='gray')
                ax[row, col].set_axis_off()
                ax[row + 1, col].set_title(f'predicted {predicted_classes[sample_idx]}, actual {actual_classes[sample_idx]}')
                bar_colors = ['C0'] * 10
                bar_colors[actual_classes[sample_idx]] = 'C1'
                ax[row + 1, col].bar(
                    np.arange(10), predicted_probabilities[sample_idx], tick_label=np.arange(10), color=bar_colors
                )
        fig.suptitle('Least confident predictions', size=20)
        fig.savefig(os.path.join(output_dir, 'mnist_least_confident.pdf'))

        print('Plotting ambiguous and rotated MNIST confidences')
        ambiguous_samples = torch.from_numpy(np.load(os.path.join(data_dir, 'test_x.npz'))['test_x']).reshape([-1, 784])[:10]
        ambiguous_dataset = torch.utils.data.TensorDataset(ambiguous_samples, torch.zeros(10))
        ambiguous_loader = torch.utils.data.DataLoader(
            ambiguous_dataset, batch_size=10, shuffle=False, drop_last=False
        )
        ambiguous_predicted_probabilities = model.predict(ambiguous_loader)
        ambiguous_predicted_classes = np.argmax(ambiguous_predicted_probabilities, axis=1)
        fig, ax = plt.subplots(4, 5, figsize=(13, 11))
        for row in range(0, 4, 2):
            for col in range(5):
                sample_idx = 5 * row // 2 + col
                ax[row, col].imshow(np.reshape(ambiguous_samples[sample_idx], (28, 28)), cmap='gray')
                ax[row, col].set_axis_off()
                ax[row + 1, col].set_title(f'predicted {ambiguous_predicted_classes[sample_idx]}')
                ax[row + 1, col].bar(
                    np.arange(10), ambiguous_predicted_probabilities[sample_idx], tick_label=np.arange(10)
                )
        fig.suptitle('Predictions on ambiguous and rotated MNIST', size=20)
        fig.savefig(os.path.join(output_dir, 'ambiguous_rotated_mnist.pdf'))


        # Do the same evaluation as on MNIST also on FashionMNIST
        print('Predicting on FashionMNIST data')
        fmnist_samples = torch.from_numpy(np.load(os.path.join(data_dir, 'fmnist.npz'))['x_test'])#.reshape([-1, 784])
        fmnist_dataset = torch.utils.data.TensorDataset(fmnist_samples, torch.zeros(fmnist_samples.shape[0]))
        fmnist_loader = torch.utils.data.DataLoader(
            fmnist_dataset, batch_size=64, shuffle=False, drop_last=False
        )
        fmnist_predicted_probabilities = model.predict(fmnist_loader)
        fmnist_predicted_classes = np.argmax(fmnist_predicted_probabilities, axis=1)
        fmnist_confidences = np.max(fmnist_predicted_probabilities, axis=1)
        fmnist_sorted_confidence_indices = np.argsort(fmnist_confidences)

        # Plot FashionMNIST samples your model is most confident about
        print('Plotting most confident FashionMNIST predictions')
        most_confident_indices = fmnist_sorted_confidence_indices[-10:]
        fig, ax = plt.subplots(4, 5, figsize=(13, 11))
        for row in range(0, 4, 2):
            for col in range(5):
                sample_idx = most_confident_indices[5 * row // 2 + col]
                ax[row, col].imshow(np.reshape(fmnist_samples[sample_idx], (28, 28)), cmap='gray')
                ax[row, col].set_axis_off()
                ax[row + 1, col].set_title(f'predicted {fmnist_predicted_classes[sample_idx]}')
                ax[row + 1, col].bar(
                    np.arange(10), fmnist_predicted_probabilities[sample_idx], tick_label=np.arange(10)
                )
        fig.suptitle('Most confident predictions', size=20)
        fig.savefig(os.path.join(output_dir, 'fashionmnist_most_confident.pdf'))

        # Plot FashionMNIST samples your model is least confident about
        print('Plotting least confident FashionMNIST predictions')
        least_confident_indices = fmnist_sorted_confidence_indices[:10]
        fig, ax = plt.subplots(4, 5, figsize=(13, 11))
        for row in range(0, 4, 2):
            for col in range(5):
                sample_idx = least_confident_indices[5 * row // 2 + col]
                ax[row, col].imshow(np.reshape(fmnist_samples[sample_idx], (28, 28)), cmap='gray')
                ax[row, col].set_axis_off()
                ax[row + 1, col].set_title(f'predicted {fmnist_predicted_classes[sample_idx]}')
                ax[row + 1, col].bar(
                    np.arange(10), fmnist_predicted_probabilities[sample_idx], tick_label=np.arange(10)
                )
        fig.suptitle('Least confident predictions', size=20)
        fig.savefig(os.path.join(output_dir, 'fashionmnist_least_confident.pdf'))

        print('Determining suitability of your model for OOD detection')
        all_confidences = np.concatenate([confidences, fmnist_confidences])
        dataset_labels = np.concatenate([np.ones_like(confidences), np.zeros_like(fmnist_confidences)])
        print(
            'AUROC for MNIST vs. FashionMNIST OOD detection based on confidence: '
            f'{roc_auc_score(dataset_labels, all_confidences):.3f}'
        )
        print(
            'AUPRC for MNIST vs. FashionMNIST OOD detection based on confidence: '
            f'{average_precision_score(dataset_labels, all_confidences):.3f}'
        )

In [35]:
# train_loader2 = torch.utils.data.DataLoader(dataset_train, batch_size=64, shuffle=True, drop_last=True)
# for i, (datapoints, labels) in enumerate(train_loader2):
#   print(f"i: {i}, datapoints: {datapoints.unsqueeze(1).shape}, {labels.shape}")
#   break

In [None]:
# Load training data
data_dir = os.curdir
output_dir = os.curdir
raw_train_data = np.load(os.path.join(data_dir, 'train_data.npz'))
x_train = torch.from_numpy(raw_train_data['train_x']).to(device)
y_train = torch.from_numpy(raw_train_data['train_y']).long().to(device)
dataset_train = torch.utils.data.TensorDataset(x_train, y_train)

# Run actual solution
trainer = run_solution(dataset_train, data_dir=data_dir, output_dir=output_dir)


Training model Dummy_Trainer


  0%|          | 0/100 [00:00<?, ?it/s]

In [None]:


class SGLD(SGD):
    """Implementation of SGLD algorithm.
    References
    ----------
        
    """
    @torch.no_grad()
    def step(self, closure=None):
        """See `torch.optim.step’."""
        loss = super().step(closure)
        for group in self.param_groups:
            weight_decay = group['weight_decay']
            for p in group['params']:
                if p.grad is None:
                    continue
                grad_p = p.grad.data
                if weight_decay!=0:
                    grad_p.add_(alpha=weight_decay,other=p.data)
                langevin_noise = 0.1*torch.randn_like(p.data).mul_(group['lr']**0.5) #  use weight 0.1 to balance the noise
                p.data.add_(grad_p,alpha=-0.5*group['lr'])
                if torch.isnan(p.data).any(): 
                    exit('Exist NaN param after SGLD, Try to tune the parameter')
                if torch.isinf(p.data).any(): 
                    exit('Exist Inf param after SGLD, Try to tune the parameter')
                p.data.add_(langevin_noise)
        return loss

def ece(predicted_probabilities: np.ndarray, labels: np.ndarray, n_bins: int = 30) -> float:
    """
    Computes the Expected Calibration Error (ECE).
    Many options are possible; in this implementation, we provide a simple version.

    Using a uniform binning scheme on the full range of probabilities, zero
    to one, we bin the probabilities of the predicted label only (ignoring
    all other probabilities). For the ith bin, we compute the avg predicted
    probability, p_i, and the bin's total accuracy, a_i.
    We then compute the ith calibration error of the bin, |p_i - a_i|.
    The final returned value is the weighted average of calibration errors of each bin.

    :param predicted_probabilities: Predicted probabilities, float array of shape (num_samples, num_classes)
    :param labels: True labels, int tensor of shape (num_samples,) with each entry in {0, ..., num_classes - 1}
    :param n_bins: Number of bins for histogram binning
    :return: ECE score as a float
    """
    num_samples, num_classes = predicted_probabilities.shape

    # Predictions are the classes with highest probability
    predictions = np.argmax(predicted_probabilities, axis=1)
    prediction_confidences = predicted_probabilities[range(num_samples), predictions]

    # Use uniform bins on the range of probabilities, i.e. closed interval [0.,1.]
    bin_upper_edges = np.histogram_bin_edges([], bins=n_bins, range=(0., 1.))
    bin_upper_edges = bin_upper_edges[1:]  # bin_upper_edges[0] = 0.

    probs_as_bin_num = np.digitize(prediction_confidences, bin_upper_edges)
    sums_per_bin = np.bincount(probs_as_bin_num, minlength=n_bins, weights=prediction_confidences)
    sums_per_bin = sums_per_bin.astype(np.float32)

    total_per_bin = np.bincount(probs_as_bin_num, minlength=n_bins) \
        + np.finfo(sums_per_bin.dtype).eps  # division by zero
    avg_prob_per_bin = sums_per_bin / total_per_bin

    onehot_labels = np.eye(num_classes)[labels]
    accuracies = onehot_labels[range(num_samples), predictions]  # accuracies[i] is 0 or 1
    accuracies_per_bin = np.bincount(probs_as_bin_num, weights=accuracies, minlength=n_bins) / total_per_bin

    prob_of_being_in_a_bin = total_per_bin / float(num_samples)

    ece_ret = np.abs(accuracies_per_bin - avg_prob_per_bin) * prob_of_being_in_a_bin
    ece_ret = np.sum(ece_ret)
    return float(ece_ret)


class ParameterDistribution(torch.nn.Module, metaclass=abc.ABCMeta):
    """
    Abstract class that models a distribution over model parameters,
    usable for Bayes by backprop.
    You can implement this class using any distribution you want
    and try out different priors and variational posteriors.
    All torch.nn.Parameter that you add in the __init__ method of this class
    will automatically be registered and know to PyTorch.
    """

    def __init__(self):
        super().__init__()

    @abc.abstractmethod
    def log_likelihood(self, values: torch.Tensor) -> torch.Tensor:
        """
        Calculate the log-likelihood of the given values
        :param values: Values to calculate the log-likelihood on
        :return: Log-likelihood
        """
        pass

    @abc.abstractmethod
    def sample(self) -> torch.Tensor:
        """
        Sample from this distribution.
        Note that you only need to implement this method for variational posteriors, not priors.

        :return: Sample from this distribution. The sample shape depends on your semantics.
        """
        pass

    def forward(self, values: torch.Tensor) -> torch.Tensor:
        # DO NOT USE THIS METHOD
        # We only implement it since torch.nn.Module requires a forward method
        warnings.warn('ParameterDistribution should not be called! Use its explicit methods!')
        return self.log_likelihood(values)


def draw_reliability_diagram(out,
                            title="Reliability Diagram", 
                            xlabel="Confidence", 
                            ylabel="Accuracy"):
    """Draws a reliability diagram into a subplot."""
    fig,ax = plt.subplots()
    plt.tight_layout()
    accuracies =  out['calib_accuracy']
    confidences = out['calib_confidence']
    counts = out['p']
    bins = out['bins']

    bin_size = 1.0 / len(counts)
    positions = bins[:-1] + bin_size/2.0

    widths = bin_size
    alphas = 0.3

    colors = np.zeros((len(counts), 4))
    colors[:, 0] = 240 / 255.
    colors[:, 1] = 60 / 255.
    colors[:, 2] = 60 / 255.
    colors[:, 3] = alphas

    gap_plt = ax.bar(positions, np.abs(accuracies - confidences), 
                     bottom=np.minimum(accuracies, confidences), width=widths,
                     edgecolor=colors, color=colors, linewidth=1, label="Gap")

    acc_plt = ax.bar(positions, 0, bottom=accuracies, width=widths,
                     edgecolor="black", color="black", alpha=1.0, linewidth=3,
                     label="Accuracy")

    ax.set_aspect("equal")
    ax.plot([0,1], [0,1], linestyle = "--", color="gray")
    

    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)\
    
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.legend(handles=[gap_plt, acc_plt])
    return fig


def draw_confidence_histogram(out, 
                                draw_averages=True,
                                title="Confidence Diagram",
                                xlabel="Confidence",
                                ylabel="Count"):
    """Draws a confidence histogram into a subplot."""

    fig,ax = plt.subplots()
    zs = out['p']
    bins = out['bins']
    bin_lowers = bins[:-1]
    bin_uppers = bins[1:]
    bin_middles = (bin_lowers + bin_uppers)/2

    bin_size = 1.0 / len(zs)

    ax.bar(bin_middles, zs, width=bin_size * 0.9)
   
    ax.set_xlim(0, 1)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    return fig


    


