<a href="https://colab.research.google.com/github/mku813/CAU_paper/blob/main/Parameter_Uncertainty.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://xuwd11.github.io/Dropout_Tutorial_in_PyTorch/

## 3.Dropout Implementation

In [None]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import time
import h5py
from scipy.ndimage.interpolation import rotate

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import matplotlib.gridspec as gridspec

import seaborn as sns
%matplotlib inline

import torch
import torchvision
from torchvision import datasets
from torchvision import transforms
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.sampler import SubsetRandomSampler

import pymc3 as pm

## 2.Dropout Implementation

*   drop된 뉴런에 대해 보상하기 위해서 multiplier를 곱해준다.
*   첫 번째 example에서만 `MyDropout`을 사용할 것이고, 그 이후부터는 `nn.Dropout`을 사용한다.

In [None]:
class MyDropout(nn.Module):
  def __init__(self, p=0.5):
    super(MyDropout, self).__init__()
    self.p = p

    # multiplier is 1/(1-p)
    # Set multiplier to 0 when p = 1 to avoid error
    if self.p < 1:
      self.multiplier_ = 1.0 / (1.0 - p)
    else:
      self.multiplier_ = 0.0
    
  def forward(self, input):
    # if model.eval(), don't apply dropout
    if not self.training:
      return input
    
    # So that we have `input.shape` numbers of Bernoulli(1-p) samples
    # --> input의 데이터 사이즈를 고려하여 베르누이(1-p)의 샘플을 만들기 위한 과정
    selected_ = torch.Tensor(input.shape).uniform_(0, 1) > self.p
    
    # To suppert both CPU and GPU
    if input.is_cuda:
      selected_ = Variable(selected_.type(torch.cuda.FloatTensor), requires_grad = False)
    else:
      selected_ = Variable(selected_.type(torch.FloatTensor), requires_grad = False)

    # Multiply output by multiplier as described in the paper [1]
    return torch.mul(selected_, input)*self.multiplier_

## 3.Dropout as Regularization

* 딥뉴럴넷에서 regularization으로 작동하는 dropout을 보려고 한다.(L1, L2와 같은)
* Dropout이 작동하는 것을 확인하기 위해 fully connected network인 multilayer perceptron을 만들고, 그 후에 dropout이 다른 네트워크 아키텍쳐에서도 잘 작동하는지 확인하기 위해 LeNet(CNN 계열)을 만들 것이다.
* MNIST를 사용할 것이다.



In [None]:
# Normalize data with mean=(0,0,0), std=(1,1,1)
transform = transforms.Compose([
    transforms.ToTensor(),
    # transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    transforms.Normalize((0,), (1,))
])

path = './drive/MyDrive/paper/data/'

In [None]:
trainset = datasets.MNIST(root=path,   # root directory of dataset where `MNIST/processed/[training.pt | test.pt]`
                          train=True,     # if True, creates dataset from training.pt, otherwise from test.pt
                          download=True,  # if True, downloads from the internet and puts it in root directory. If dataset is already downloaded, it is not downloaded again.
                          transform=transform   # A function/transform that takes in an PIL image and returns a tranformed version. E.g, `transforms.RandomCrop`
                          # target_tranform=    # A function/transform that takes in the target and transforms it.
                          )
testset = datasets.MNIST(root=path, train=False, download=True, transform=transform)

In [None]:
print(trainset)
print(testset)

In [None]:
# Visualize 10 images samples in MNIST dataset
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)
data_iter = iter(trainloader)
images, labels = data_iter.next()

# plot 10 sample images
_, ax = plt.subplots(1, 10)
ax = ax.flatten()
iml = images[0].numpy().shape[1]
[ax[i].imshow(np.transpose(images[i].numpy(), (1,2,0)).reshape(iml, -1), cmap='Greys') for i in range(10)]
[ax[i].set_axis_off() for i in range(10)]
plt.show()

print('lable:', labels[:10].numpy())
print('image data shape:', images[0].numpy().shape)

### 4.1 Multilayer Perceptron

* 기본적인 MLP(MultiLayer Perceptron)을 만들고자 한다.
  * hidden layer: 2개
  * hidden unit: 800개

<br>

* sklearn과 비슷한 classifier를 만들어보고자 한다.

<br>

* 논문에 보면 3가지 실험이 있다.
  * no dropout
  * dropout(0.5)
  * dropout(0.5), input(0.2)
* 시간이 오래 걸리므로 GPU로 하는 것을 권장함.

<br>

* plotting


In [None]:
class MLP(nn.Module):
  def __init__(self, hidden_layers=[800, 800], dropRates=[0,0]):
    super(MLP, self).__init__()
    self.model = nn.Sequential()
    self.model.add_module("dropout0", MyDropout(p=dropRates[0]))
    self.model.add_module("input", nn.Linear(28*28, hidden_layers[0]))
    self.model.add_module("tanh", nn.Tanh())

    # Add hidden layer
    for i, d in enumerate(hidden_layers[:-1]):
      self.model.add_module("dropout_hidden"+str(i+1), MyDropout(p=dropRates[1]))
      self.model.add_module("hidden"+str(i+1), nn.Linear(hidden_layers[i], hidden_layers[i+1]))
      self.model.add_module("tanh_hidden"+str(i+1), nn.Tanh())
    self.model.add_module("final", nn.Linear(hidden_layers[-1], 10))

  def forward(self, x):
    # Trun to 1-Dimension
    x = x.view(x.shape[0], 28*28)
    x = self.model(x)
    return x

In [None]:
class MLPClassifier:
  def __init__(self, hidden_layer=[800, 800], dropRates=[0,0], batch_size=128, max_epoch=10, lr=0.1, momentum=0):
    # Wrap MLP model
    self.hidden_layer = hidden_layer
    self.dropRates = dropRates
    self.batch_size = batch_size
    self.max_epoch = max_epoch

    self.model = MLP(hidden_layers=hidden_layers, dropRates=dropRates)
    self.model.cuda()
    self.criterion = nn.CrossEntropyLoss().cuda()
    self.optimizer = optim.SGD(self.model.parameters(), lr=lr, momentum=momentum)

    self.loss_ = []
    self.test_acc = []
    self.test_err = []
  
  def fit(self, trainset, testset, verbose=True):
    # GPU 확인하고 진행하길,, 안그러면 매우 느릴수 있음
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=self.batch_size, shuffle=True)
    testloader = torch.utils.data.DataLoader(testset, batch_size=len(testset), shuffle=False)
    X_test, y_test = iter(testloader).next()
    X_test = X_test.cuda()

    for epoch in range(self.max_epoch):
      running_loss = 0
      for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs, labels = Variable(inputs).cuda(), Variable(labels).cuda()
        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = self.criterion(outputs, labels)
        loss.backward()
        self.optimizer.step()
        running_loss += loss.data.cpu().numpy()
      self.loss_.append(running_loss / len(trainloader))

      if verbose:
        print('Epoch {} loss: {}'.format(epoch+1, self.loss_[-1]))
      
      y_test_pred = self.predict(X_test).cpu()
      tmp_test_acc = y_test == y_test_pred
      self.test_acc.append(np.mean(tmp_test_acc.data.cpu().numpy()))
      # self.test_acc.append(np.mean(y_test == y_test_pred))
      self.test_err.append(int(len(testset)*(1-self.test_acc[-1])))

      if verbose:
        print('Test error: {}; test acc: {}'.format(self.test_err[-1], self.test_acc[-1]))
    
    return self

  def predict(self, x):
    # Used to keep all test errors after each epoch
    model = self.model.eval()
    outputs = model(Variable(x))
    _, pred = torch.max(outputs.data, 1)
    model = self.model.train()
    return pred
  
  def __str__(self):
    return 'Hidden layers: {}; dropout rate: {}'.format(self.hidden_layers, self.dropRates)

In [None]:
hidden_layers = [800, 800]

### Below is training code, uncomment to train your own model... ###
### Note: You need GPU to run this section ###

# Define networks
mlp1 = [MLPClassifier(hidden_layers, dropRates=[0, 0], max_epoch=300), 
        MLPClassifier(hidden_layers, dropRates=[0, 0.5], max_epoch=300),
        MLPClassifier(hidden_layers, dropRates=[0.2, 0.5], max_epoch=300)]
        
# Training, set verbose=True to see loss after each epoch.
[mlp.fit(trainset, testset,verbose=True) for mlp in mlp1]

In [None]:
# Save torch models
for ind, mlp in enumerate(mlp1):
    torch.save(mlp.model, './drive/MyDrive/paper/mnist_mlp1_'+str(ind)+'.pth')
    # Prepare to save errors
    mlp.test_error = list(map(str, mlp.test_err))

# Save test errors to plot figures
open("./drive/MyDrive/paper/mlp1_test_errors.txt","w").write('\n'.join([','.join(mlp.test_error) for mlp in mlp1])) 

In [None]:
# Load saved models to CPU
mlp1_models = [torch.load('./drive/MyDrive/paper/mnist_mlp1_'+str(ind)+'.pth',map_location={'cuda:0': 'cpu'}) for ind in [0,1,2]]

# Load saved test errors to plot figures.
mlp1_test_errors = [error_array.split(',') for error_array in open("./drive/MyDrive/paper/mlp1_test_errors.txt","r").read().split('\n')]
mlp1_test_errors = np.array(mlp1_test_errors,dtype='f')

In [None]:
label = ['MLP no dropout',
         'MLP 50% dropout in hidden layers',
         'MLP 50% dropout in hidden layers + 20% in input layer']

plt.figure(figsize=(8,7))
for i, r in enumerate(mlp1_test_errors):
  plt.plot(range(1, len(r)+1), r, '.-', label=labels[i], alpha=0.6)
# plt.ylim([50,250])
plt.legend(loc=1)
plt.xlabel('Epochs')
plt.ylabel('Number of errors in test set')
plt.title('Test Error on MNIST dataset for MLP')
plt.show()

### 4.2 CNN with LeNet

* LeNet에 대한 간략한 소개 후 Dropout이 test 성능에 더 좋은 결과를 가져온다는 것을 보여줄것이다.


- 일단은 conv layer와 pooling layer를 통과한 후 이미지의 dimension을 계산해야한다.
- pytorch의 linear layer 때문에 해야함

In [None]:
def caloutdim(hin, kernel_size, stride=1, padding=0, dilation=1):
    return int(np.floor((hin+2*padding-dilation*(kernel_size-1)-1)/stride+1))

d = [28]
d.append(caloutdim(d[-1], 5, padding=2))
d.append(caloutdim(d[-1], 2, 2))
d.append(caloutdim(d[-1], 5, padding=2))
d.append(caloutdim(d[-1], 2, 2))
print(d)

- 아래의 코드는 LeNet의 코드이다.
- 여기에 `nn.Dropout2d`를 사용했다.
- 이것은 `MyDropout`과 같은 것이다.
  - 2차원인 것 빼고
  - 효율은 더 좋다.

In [None]:
class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, x):
        x = x.view(x.size(0), -1)
        return x

class LeNet(nn.Module):
    def __init__(self, droprate=0.5):
        super(LeNet, self).__init__()
        self.model = nn.Sequential()
        self.model.add_module('conv1', nn.Conv2d(1, 20, kernel_size=5, padding=2))
        self.model.add_module('dropout1', nn.Dropout2d(p=droprate))
        self.model.add_module('maxpool1', nn.MaxPool2d(2, stride=2))
        self.model.add_module('conv2', nn.Conv2d(20, 50, kernel_size=5, padding=2))
        self.model.add_module('dropout2', nn.Dropout2d(p=droprate))
        self.model.add_module('maxpool2', nn.MaxPool2d(2, stride=2))
        self.model.add_module('flatten', Flatten())
        self.model.add_module('dense3', nn.Linear(50*7*7, 500))
        self.model.add_module('relu3', nn.ReLU())
        self.model.add_module('dropout3', nn.Dropout(p=droprate))
        self.model.add_module('final', nn.Linear(500, 10))
        
    def forward(self, x):
        return self.model(x)

- 위의 내용과 비슷하지만 sklearn을 만든 LeNet Classifier다.

In [None]:
class LeNetClassifier:
    def __init__(self, droprate=0.5, batch_size=128, max_epoch=300, lr=0.01):
        self.batch_size = batch_size
        self.max_epoch = max_epoch
        self.lr = lr
        self.model = LeNet(droprate)
        self.model.cuda()
        self.criterion = nn.CrossEntropyLoss().cuda()
        self.optimizer = optim.SGD(self.model.parameters(), lr=lr)
        self.loss_ = []
        self.test_error = []
        self.test_accuracy = []
        
    def fit(self, trainset, testset, verbose=True):
        trainloader = torch.utils.data.DataLoader(trainset, batch_size=self.batch_size, shuffle=True)
        testloader = torch.utils.data.DataLoader(testset, batch_size=len(testset), shuffle=False)
        X_test, y_test = iter(testloader).next()
        X_test = X_test.cuda()
        print(self.model)
        for epoch in range(self.max_epoch):
            running_loss = 0
            for i, data in enumerate(trainloader, 0):
                inputs, labels = data
                inputs, labels = Variable(inputs).cuda(), Variable(labels).cuda()
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.data.cpu().numpy()
            self.loss_.append(running_loss / len(trainloader))
            if verbose:
                print('Epoch {} loss: {}'.format(epoch+1, self.loss_[-1]))
            y_test_pred = self.predict(X_test).cpu()

            tmp_test_accuracy = y_test == y_test_pred
            self.test_accuracy.append(np.mean(tmp_test_accuracy.data.cpu().numpy()))
            # self.test_accuracy.append(np.mean(y_test == y_test_pred))
            self.test_error.append(int(len(testset)*(1-self.test_accuracy[-1])))
            if verbose:
                print('Test error: {}; test accuracy: {}'.format(self.test_error[-1], self.test_accuracy[-1]))
        return self
    
    def predict(self, x):
        model = self.model.eval()
        outputs = model(Variable(x))
        _, pred = torch.max(outputs.data, 1)
        model = self.model.train()
        return pred

- 아래 코드는 training code이고 학습시킨 모델을 로딩할 것이다.
- training 시간이 오래걸린다.

In [None]:
### Below is training code, uncomment to train your own model... ###
### Note: You need GPU and CUDA to run this section ###

# Define networks
lenet1 = [LeNetClassifier(droprate=0, max_epoch=300),
          LeNetClassifier(droprate=0.5, max_epoch=300)]
        
# Training, set verbose=True to see loss after each epoch.
[lenet.fit(trainset, testset,verbose=True) for lenet in lenet1]

In [None]:
# Save torch models
for ind, lenet in enumerate(lenet1):
    torch.save(lenet.model, './drive/MyDrive/paper/mnist_lenet1_'+str(ind)+'.pth')
    # Prepare to save errors
    lenet.test_error = list(map(str, lenet.test_error))

# Save test errors to plot figures
open("./drive/MyDrive/paper/lenet1_test_errors.txt","w").write('\n'.join([','.join(lenet.test_error) for lenet in lenet1])) 

In [None]:
# Load saved models to CPU
lenet1_models = [torch.load('./drive/MyDrive/paper/mnist_lenet1_'+str(ind)+'.pth', map_location={'cuda:0': 'cpu'}) for ind in [0,1]]

# Load saved test errors to plot figures.
lenet1_test_errors = [error_array.split(',') for error_array in 
                      open("./drive/MyDrive/paper/lenet1_test_errors.txt","r").read().split('\n')]
lenet1_test_errors = np.array(lenet1_test_errors,dtype='f')

- 학습 후 

In [None]:
labels = ['MLP no dropout', 
          'MLP 50% dropout in hidden layers', 
          'MLP 50% dropout in hidden layers + 20% in input layer',
          'LeNet no dropout',
          'LeNet 50% dropout']

plt.figure(figsize=(8, 7))
for i, r in enumerate(mlp1_test_errors.tolist() + lenet1_test_errors.tolist()):
    plt.plot(range(1, len(r)+1), r, '.-', label=labels[i], alpha=0.6);
plt.ylim([50, 250]);
plt.legend(loc=1);
plt.xlabel('Epochs');
plt.ylabel('Number of errors in test set');
plt.title('Test Error on MNIST Dataset for All Networks')
plt.show()

#### get parameter

모델의 각 parameter들을 구할 수 있음

In [None]:
for name, param in lenet1_models[0].named_parameters():
  if param.requires_grad:
    print(name, param.data.shape)

In [None]:
for name, param in lenet1_models[1].named_parameters():
  if param.requires_grad:
    print(name, param.data.shape)

## 5.Dropout as Bayesian Approximation

### 5.1 Dropout as Bayesian Approximation in Classification Task

- 이제 두 모델(MLP와 LeNet)에서 dropout을 사용하여 모델 불확실성을 구하는지 진행해 볼 것이다.
- 일단 숫자 1을 조금씩 돌려가며 12개의 이미지를 만든다.

In [None]:
testloader = torch.utils.data.DataLoader(testset, batch_size=len(testset), shuffle=False)
X_test, y_test = iter(testloader).next()
X_test = X_test.numpy()
X1 = np.array([rotate(X_test[9978].squeeze(), i, reshape=False) for i in range(50, 130, 7)])
X1 = X1.reshape(X1.shape[0], 1, X1.shape[1], X1.shape[2])

plt.figure(figsize=(8, 1))

gs = gridspec.GridSpec(1, 12)
gs.update(wspace=0, hspace=0)

for i in range(len(X1)):
    plt.subplot(gs[i])
    plt.imshow(X1.squeeze()[i], cmap='gray');
    plt.axis('off');

- 각각의 모델들에 대해 1000번의 시뮬레이션을 진행했고, softmax의 input과 output의 분포를 가장 정답이라고 생각하는 클래스에 대해 시각화

In [None]:
def predict_class(model, X):
    model = model.eval()
    outputs = model(Variable(X))
    print('outputs.data: \n{}'.format(outputs.data))
    _, pred = torch.max(outputs.data, 1)
    print('_, pred: \n{}, {}'.format(_, pred))
    model = model.train()
    return pred.numpy()

def predict(model, X, T=1000):
    standard_pred = predict_class(model, X)
    y1 = []
    y2 = []
    for _ in range(T):
        _y1 = model(Variable(X))
        _y2 = F.softmax(_y1, dim=1)
        y1.append(_y1.data.numpy())
        y2.append(_y2.data.numpy())
    return standard_pred, np.array(y1), np.array(y2)    # return pred, softmax input, softmax output

#### 5.1.1 MLP 50% dropout in hidden layers + 20% in input layer

- dropout을 제목과 같이 적용하였다
  - 일단은 숫자 `1`만 보도록 하자(12개의 rotate된 숫자 1)

In [None]:
model = mlp1_models[2]

# Need to flatten X1 before feeding into MLP
y1_pred, y1_si, y1_so = predict(model, torch.from_numpy(X1.reshape(-1,784)))    # si: softmax input, so:softmax output
print('Predictions: {}'.format(y1_pred))

- 위에서 나온 class의 분포를 softmax input 값과 output값들에 대하여 그려놓은 것이다.
- 

In [None]:
plt.figure(figsize=(10, 3))
plt.subplot(1, 2, 1)
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 1].flatten(), \
            color='g', marker='_', linewidth=None, alpha=0.2, label='1');
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 7].flatten(), \
            color='r', marker='_', linewidth=None, alpha=0.2, label='7');
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 3].flatten(), \
            color='b', marker='_', linewidth=None, alpha=0.2, label='3');
plt.title('Softmax input scatter');
plt.legend(framealpha=0.7);

plt.subplot(1, 2, 2)
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 1].flatten(), \
            color='g', marker='_', linewidth=None, alpha=0.2, label='1');
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 7].flatten(), \
            color='r', marker='_', linewidth=None, alpha=0.2, label='7');
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 3].flatten(), \
            color='b', marker='_', linewidth=None, alpha=0.2, label='3');
plt.title('Softmax output scatter');
plt.legend(framealpha=0.7);

plt.tight_layout();

In [None]:
model = lenet1_models[1]
y1_pred, y1_si, y1_so = predict(model, torch.from_numpy(X1))
print('Predictions: {}'.format(y1_pred))

In [None]:
model = lenet1_models[1]
y1_pred, y1_si, y1_so = predict(model, torch.from_numpy(X1))
print('Predictions: {}'.format(y1_pred))

In [None]:
plt.figure(figsize=(10, 3))
plt.subplot(1, 2, 1)
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 1].flatten(), \
            color='g', marker='_', linewidth=None, alpha=0.2, label='1');
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 7].flatten(), \
            color='r', marker='_', linewidth=None, alpha=0.2, label='7');
plt.scatter(np.tile(np.arange(1, 13), y1_si.shape[0]), y1_si[:, :, 3].flatten(), \
            color='b', marker='_', linewidth=None, alpha=0.2, label='3');
plt.title('Softmax input scatter');
plt.legend(framealpha=0.7);

plt.subplot(1, 2, 2)
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 1].flatten(), \
            color='g', marker='_', linewidth=None, alpha=0.2, label='1');
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 7].flatten(), \
            color='r', marker='_', linewidth=None, alpha=0.2, label='7');
plt.scatter(np.tile(np.arange(1, 13), y1_so.shape[0]), y1_so[:, :, 3].flatten(), \
            color='b', marker='_', linewidth=None, alpha=0.2, label='3');
plt.title('Softmax output scatter');
plt.legend(framealpha=0.7);

plt.tight_layout();