In [None]:
#implementation is based on https://www.nitarshan.com/bayes-by-backprop/

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from torch.optim import lr_scheduler
import math
import random
import os
import sys

In [15]:
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname('__file__'))))
import dataloader
USE_GPU = 1
if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [25]:
train_loader, valid_loader, test_loader = dataloader.load_MNIST(100, 60000)

In [17]:
class Gaussian(object):
    def __init__(self, mu, rho):
        self.mu = mu
        self.rho = rho
    @property 
    def sigma(self):
        return torch.log1p(torch.exp(self.rho))
    def sample(self):
        epsilon = torch.distributions.Normal(0,1).sample(self.rho.size()).to(device)
        return self.mu + self.sigma* epsilon
    #lnN(x|mu,sigma)
    def prob(self, input):
        p = torch.exp(-((input - self.mu)**2)/(2 * self.sigma ** 2))/(torch.sqrt(2*torch.tensor(math.pi))*self.sigma)
        return p

    #lnP(w)
    def log_prob(self, input):
        return torch.log(self.prob(input))

In [18]:
mu = torch.tensor(3, dtype = torch.float)
rho = torch.tensor(2, dtype = torch.float)
a = Gaussian(mu, rho)
print(a.sample())
print(a.log_prob(3))

tensor(7.2020)
tensor(-1.6736)


In [19]:
class Scale_Mixture_Prior(object):
    def __init__(self, sigma1, sigma2, pi):
        self.sigma1 = sigma1
        self.sigma2 = sigma2
        self.gaussian1 = torch.distributions.Normal(0,sigma1)
        self.gaussian2 = torch.distributions.Normal(0,sigma2)
        self.pi = pi
    def log_prob(self, input):
        prob1 = torch.exp(self.gaussian1.log_prob(input))
        prob2 = torch.exp(self.gaussian2.log_prob(input))
        return torch.log(self.pi * prob1 + (1-self.pi) * prob2)
    

In [20]:
a= [3,3]
print(torch.tensor(a))

tensor([3, 3])


In [21]:
class BayesianLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super(BayesianLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        
        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-0.2, 0.2))
        self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-5, -4))
        self.weight = Gaussian(self.weight_mu, self.weight_rho)
        
        self.bias_mu = nn.Parameter(torch.Tensor(out_features,).uniform_(-0.2, 0.2))
        self.bias_rho = nn.Parameter(torch.Tensor(out_features,).uniform_(-5, -4))
        self.bias = Gaussian(self.bias_mu, self.bias_rho)
                   
        sigma1=torch.exp(torch.tensor(0.0))
        sigma2=torch.exp(torch.tensor(-6.0))
        pi = 0.5
            
        self.scale_mixture_prior = Scale_Mixture_Prior(sigma1, sigma2, pi)
        
        self.log_prior = 0
        self.log_posterior = 0
        
        
    def forward(self, input, sampling = False):
        if sampling == True:
            weight = self.weight.sample()
            bias = self.bias.sample()
        else:
            weight = self.weight_mu
            bias = self.bias_mu
        
        
        self.log_prior = torch.sum(self.scale_mixture_prior.log_prob(weight)) + torch.sum(self.scale_mixture_prior.log_prob(bias))
        self.log_posterior = torch.sum(self.weight.log_prob(weight)) + torch.sum(self.bias.log_prob(bias))

            
        return F.linear(input, weight, bias)
        
        
        
        
            

In [28]:
class BayesianNet(nn.Module):
    def __init__(self, input_size=28*28, hidden_unit=400,num_classes=10):
        super(BayesianNet, self).__init__()
        self.fc1 = BayesianLinear(input_size, hidden_unit)
        self.fc2 = BayesianLinear(hidden_unit, hidden_unit)
        self.fc3 = BayesianLinear(hidden_unit, num_classes)
        
        
    def get_log_prior(self):
        log_prior = self.fc1.log_prior + self.fc2.log_prior + self.fc3.log_prior
        #print('log_prior: {:.4f}'.format(self.fc1.log_prior))
        return log_prior
    
    def get_log_posterior(self):
        log_posterior = self.fc1.log_posterior + self.fc2.log_posterior + self.fc3.log_posterior
        #print('log_posterior: {:.4f}'.format(self.fc1.log_posterior))
        return log_posterior
    
    def forward(self, input, sampling = False):
        x = input.view(input.size(0), -1)
        x = F.relu(self.fc1(x, sampling))
        x = F.relu(self.fc2(x, sampling))
        x = F.log_softmax(self.fc3(x, sampling), dim=1)
        return x
    
    def get_loss(self, input, target, batch_num, num_sampling = 3):
        
        likelihood_loss = 0
        complexity_loss = 0
        for i in range(num_sampling):
            output = self.forward(input, sampling = True)
            log_prior = self.get_log_prior()
            log_posterior = self.get_log_posterior()
            complexity_loss += log_posterior - log_prior
            if USE_GPU:
                target = target.type(torch.cuda.LongTensor)
            else:
                target = target.type(torch.LongTensor)
                
            
            likelihood_loss += F.nll_loss(output, target, size_average=False)
        
        #print(complexity_loss)
        
        #print('likelihood_loss:{:.4f}'.format(likelihood_loss))
        loss = complexity_loss/batch_num + likelihood_loss
            
            
        return loss/num_sampling, output
        

In [29]:
print(train_loader.batch_size)

100


In [None]:
model = BayesianNet()
model.to(device)
params = [p.device for p in model.parameters() if p.device]

optimizer = optim.Adam(model.parameters(),lr=0.001)

num_train = 50000
num_val = 10000

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.3)
for epoch in range(50):
    model.train()
    
    train_correct, running_num = 0.0, 0.0
    for i, data in enumerate(train_loader):
        input, target = data
        input = input.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        
        loss, output = model.get_loss(input, target, len(train_loader))
        loss.backward()
        optimizer.step()
        
        
        pred = torch.argmax(output, dim = 1)
        train_correct += torch.sum(pred == target)
        running_num += input.size(0)    
        
        for param_group in optimizer.param_groups:
            lr = param_group['lr']

        if i % 200 == 0:
            print('training_loss: {:.4f}, likelihood_loss = {:.4f} training_acc = {:.2f}, lr = {:.4f}'.format(loss, likelihood_loss, (train_correct*100/running_num), lr))
            
    #scheduler.step()
    
    model.eval()
    correct =0.0
    for i, data in enumerate(valid_loader):
        input, target = data
        input = input.to(device)
        target = target.to(device)
        output = model.forward(input)
        pred = torch.argmax(output, dim = 1)
        correct += torch.sum(pred == target)
    
    #correct = (float)correct
    #num_val = (float)num_val
    print('valid acc: {:.2f}'.format(correct*100/num_val))
    
    
    

training_loss: 4133.0742, likelihood_loss = 398.4716 training_acc = 4.00, lr = 0.0010
training_loss: 3329.0732, likelihood_loss = 398.4716 training_acc = 85.00, lr = 0.0010
training_loss: 3154.6277, likelihood_loss = 398.4716 training_acc = 89.00, lr = 0.0010
valid acc: 95.00
training_loss: 2991.3130, likelihood_loss = 398.4716 training_acc = 96.00, lr = 0.0010


In [None]:
a = [3,3]
print(torch.tensor(a, dtype = torch.long))