<a href="https://colab.research.google.com/github/mancinimassimiliano/DeepLearningLab/blob/master/Lab3/batch_normalization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

### Batch Normalization from scratch

In this lab session we are going to use *Batch Normalization* (BN) layers in our network. BN standardizes each feature over a mini-batch to have zero mean and unit variance, and then scales and shifts the standardized activations with learnable parameters. BN is known to speed up convergence and often improves final performance. More details about BN can be found in the original [paper](https://arxiv.org/abs/1502.03167).

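$BN(x_{i, k}) = \gamma_{k} \frac{x_{i, k} - \mu_{B, k}}{\sqrt{\sigma^{2}_{B,k} + \epsilon}} + \beta_{k}$

Here $x_{i, k}$ is feature $k$ of sample $i$, $\mu_{B, k}$ and $\sigma^{2}_{B,k}$ are the mean and variance of feature $k$ over the mini-batch $B$, $\gamma_{k}$ and $\beta_{k}$ are the learnable scale and shift, and $\epsilon$ is a small constant added for numerical stability.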

The intuitive idea behind BN is as follows: a neural network is trained using mini-batches, and the distribution of the inputs varies from one batch to the next. These differences between mini-batches can make training unstable and heavily dependent on the initial weights of the network. Standardizing the inputs of each layer to zero mean and unit variance keeps the mean and variance of each layer's inputs stable across mini-batches.
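
To make the formula concrete, the following minimal sketch applies it by hand to a toy mini-batch. The tensor sizes, the random input and the value `eps = 1e-5` are illustrative assumptions, and `gamma` and `beta` are fixed to 1 and 0 here rather than learned.

```python
import torch

torch.manual_seed(0)

# A toy mini-batch: N = 8 samples, C = 3 features, deliberately off-center.
x = torch.randn(8, 3) * 5.0 + 2.0

eps = 1e-5               # small constant for numerical stability (assumed value)
gamma = torch.ones(3)    # scale, one per feature (learnable in a real layer)
beta = torch.zeros(3)    # shift, one per feature (learnable in a real layer)

# Per-feature statistics over the batch dimension.
mu = x.mean(dim=0)
var = x.var(dim=0, unbiased=False)   # biased variance, as in the formula

# Standardize, then scale and shift.
x_hat = (x - mu) / torch.sqrt(var + eps)
y = gamma * x_hat + beta

print(y.mean(dim=0))                  # close to 0 for every feature
print(y.var(dim=0, unbiased=False))   # close to 1 for every feature
```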

More interestingly, we will learn how to code a BN layer from scratch using PyTorch.

In [0]:
# import libraries
import torch
import torchvision
import torchvision.transforms as T
import torch.nn.functional as F

# Library needed for visualization purposes
from tensorboardcolab import TensorBoardColab

# Instantiate visualizer
tb = TensorBoardColab(graph_path='./log')

### BatchNorm1d

BN module for fully-connected hidden layers.

In [0]:
'''
Applies Batch Normalization over a mini-batch of 1D inputs (a 2D tensor)

Shape:
  Input: (N, C)
  Output: (N, C)

Input Parameters:
  in_features: number of features of the input activations
  track_running_stats: whether to keep track of running mean and std. (default: True)
  affine: whether to scale and shift the normalized activations. (default: True)
  momentum: the momentum value for the moving average. (default: 0.9)

Usage:
  >>> # with learnable parameters
  >>> bn = BatchNorm1d(4)
  >>> # without learnable parameters
  >>> bn = BatchNorm1d(4, affine=False)
  >>> input = torch.rand(10, 4)
  >>> out = bn(input)
'''

class BatchNorm1d(torch.nn.Module):
  def __init__(self, in_features, track_running_stats=True, affine=True, momentum=0.9):
    super(BatchNorm1d, self).__init__()
    
    self.in_features = in_features
    self.track_running_stats = track_running_stats
    self.affine = affine
    self.momentum = momentum
    
    if self.affine:
      # TODO declare scale param
      # TODO declare shift param
    
    if self.track_running_stats:
      # TODO declare running mean
      # TODO declare running std
  
  def forward(self, x):
    
    # transpose (N, C) to (C, N)
    x = x.transpose(0, 1).contiguous().view(x.shape[1], -1)
    
    # calculate batch mean
    # TODO
    
    # calculate batch std
    # TODO
    
    # keep running statistics (moving average of mean and std)
    if self.track_running_stats:
      # TODO
      # TODO
    
    # during inference time
    if not self.training and self.track_running_stats:
      # TODO
      # TODO
    
    # normalize the input activations
    # TODO
    
    # scale and shift the normalized activations
    if self.affine:
      # TODO
    
    return x.transpose(0, 1)
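
Once the TODOs above are filled in, one way to sanity-check the layer is to compare it with PyTorch's built-in `torch.nn.BatchNorm1d`. The sketch below only shows how the built-in layer behaves in training and evaluation mode; it is not the solution to the exercise. Two conventions differ from the skeleton above and are worth keeping in mind: the built-in layer stores a running variance rather than a running std, and its `momentum` argument weights the new batch statistic, so its default of 0.1 corresponds to a weight of 0.9 on the old running value (assuming the skeleton's `momentum=0.9` is meant as the weight on the old value).

```python
import torch

torch.manual_seed(0)

# Built-in layer used as a reference. Its running mean starts at 0.
ref = torch.nn.BatchNorm1d(4, momentum=0.1)
x = torch.rand(10, 4)

# Training mode: normalize with batch statistics and update the running buffers.
ref.train()
out_train = ref(x)

# Expected running mean after one update: 0.9 * old value + 0.1 * batch mean.
batch_mean = x.mean(dim=0)
expected_running_mean = 0.9 * torch.zeros(4) + 0.1 * batch_mean

# Evaluation mode: normalize with the stored running statistics instead.
ref.eval()
out_eval = ref(x)

# Manual version of the evaluation-mode computation (scale = 1, shift = 0 at init).
manual_eval = (x - ref.running_mean) / torch.sqrt(ref.running_var + ref.eps)

print(out_train.mean(dim=0))   # close to 0: batch statistics were used
print(out_eval.mean(dim=0))    # generally not 0: running statistics were used
```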
    

### BatchNorm2d

BN module for convolutional layers.

In [0]:
'''
Applies Batch Normalization over a mini-batch of 2D inputs with a channel dimension (a 4D tensor)

Shape:
  Input: (N, C, H, W)
  Output: (N, C, H, W)

Input Parameters:
  in_features: number of features of the input activations
  track_running_stats: whether to keep track of running mean and std. (default: True)
  affine: whether to scale and shift the normalized activations. (default: True)
  momentum: the momentum value for the moving average. (default: 0.9)

Usage:
  >>> # with learnable parameters
  >>> bn = BatchNorm2d(4)
  >>> # without learnable parameters
  >>> bn = BatchNorm2d(4, affine=False)
  >>> input = torch.rand(10, 4, 5, 5)
  >>> out = bn(input)
'''

class BatchNorm2d(torch.nn.Module):
  def __init__(self, in_features, track_running_stats=True, affine=True, momentum=0.9):
    super(BatchNorm2d, self).__init__()
    
    self.in_features = in_features
    self.track_running_stats = track_running_stats
    self.affine = affine
    self.momentum = momentum
    
    if self.affine:
      # TODO declare scale param
      # TODO declare shift param
    
    if self.track_running_stats:
      # TODO declare running mean
      # TODO declare running std
  
  def forward(self, x):
    
    # transpose (N, C, H, W) to (C, N, H, W)
    x = x.transpose(0, 1)
    
    # store the shape
    c, bs, h, w = x.shape
    
    # collapse all dimensions except the 'channel' dimension
    # TODO
    
    # calculate batch mean
    # TODO
    
    # calculate batch std
    # TODO
    
    # keep running statistics (moving average of mean and std)
    if self.track_running_stats:
      # TODO
      # TODO
    
    # during inference time
    if not self.training and self.track_running_stats:
      # TODO
      # TODO
    
    # normalize the input activations
    # TODO
    
    # scale and shift the normalized activations
    if self.affine:
      # TODO
    
    return x.view(c, bs, h, w).transpose(0, 1)
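
The key step for the 2D case is that each channel is normalized with statistics computed over the batch and both spatial dimensions. The sketch below illustrates one possible way to do this by flattening to `(C, N*H*W)`, without running statistics or affine parameters, and checks the result against the built-in `torch.nn.BatchNorm2d`. The value `eps = 1e-5` is an assumption chosen to match the built-in default.

```python
import torch

torch.manual_seed(0)

x = torch.rand(10, 4, 5, 5)   # (N, C, H, W)
n, c, h, w = x.shape
eps = 1e-5                    # assumed value, same as the built-in default

# Move channels first and flatten everything else: (C, N*H*W).
flat = x.transpose(0, 1).contiguous().view(c, -1)

# One mean and one (biased) variance per channel.
mean = flat.mean(dim=1, keepdim=True)                 # shape (C, 1)
var = flat.var(dim=1, unbiased=False, keepdim=True)   # shape (C, 1)
flat_hat = (flat - mean) / torch.sqrt(var + eps)

# Restore the original layout: (C, N, H, W) -> (N, C, H, W).
out = flat_hat.view(c, n, h, w).transpose(0, 1)

# Reference: built-in layer without learnable parameters, in training mode.
ref = torch.nn.BatchNorm2d(c, affine=False)
ref.train()
ref_out = ref(x)

print(torch.allclose(out, ref_out, atol=1e-5))
```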
    

### LeNet-5 with Batch Norm

We will be using BN layers in LeNet-5. BN layers are added right after every convolutional and fully-connected layer except the output layer.

In [0]:
class LeNet(torch.nn.Module):
  def __init__(self, norm=False):
    super(LeNet, self).__init__()
    self.norm = norm
    
    # input channel = 3, output channels = 6, kernel size = 5
    # input image size = (32, 32), image output size = (28, 28)
    self.conv1 = torch.nn.Conv2d(in_channels=3, out_channels=6, kernel_size=(5, 5))
    if self.norm:
      # TODO
    
    # input channel = 6, output channels = 16, kernel size = 5
    # input image size = (14, 14), output image size = (10, 10)
    self.conv2 = torch.nn.Conv2d(in_channels=6, out_channels=16, kernel_size=(5, 5))
    if self.norm:
      # TODO
    
    # input dim = 5 * 5 * 16 ( H x W x C), output dim = 120
    self.fc3 = torch.nn.Linear(in_features=5 * 5 * 16, out_features=120)
    if self.norm:
      # TODO
    
    # input dim = 120, output dim = 84
    self.fc4 = torch.nn.Linear(in_features=120, out_features=84)
    if self.norm:
      # TODO
    
    # input dim = 84, output dim = 10
    self.fc5 = torch.nn.Linear(in_features=84, out_features=10)
    
  def forward(self, x):
    
    x = self.conv1(x)
    if self.norm:
      # TODO
    x = F.relu(x)
    # Max Pooling with kernel size = 2
    # output size = (14, 14)
    x = F.max_pool2d(x, kernel_size=2)
    
    x = self.conv2(x)
    if self.norm:
      # TODO
    x = F.relu(x)
    # Max Pooling with kernel size = 2
    # output size = (5, 5)
    x = F.max_pool2d(x, kernel_size=2)
    
    # flatten the feature maps into a long vector
    x = x.view(x.shape[0], -1)
    
    x = self.fc3(x)
    if self.norm:
      # TODO
    x = F.relu(x)
    
    x = self.fc4(x)
    if self.norm:
      # TODO
    x = F.relu(x)
    
    x = self.fc5(x)
    
    return x
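
The sizes in the comments above follow from the usual output-size arithmetic for unpadded, stride-1 convolutions and 2 x 2 max pooling. The helper names `conv_out` and `pool_out` below are hypothetical and only serve to retrace the numbers.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Spatial size after a convolution (no dilation)."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size, kernel):
    """Spatial size after max pooling with stride equal to the kernel size."""
    return size // kernel


size = 32
trace = [size]
size = conv_out(size, kernel=5)   # conv1
trace.append(size)
size = pool_out(size, kernel=2)   # first max pooling
trace.append(size)
size = conv_out(size, kernel=5)   # conv2
trace.append(size)
size = pool_out(size, kernel=2)   # second max pooling
trace.append(size)

# Number of inputs to fc3: H x W x C with 16 output channels from conv2.
flattened = size * size * 16

print(trace)       # [32, 28, 14, 10, 5]
print(flattened)   # 400
```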

### Define cost function

In [0]:
def get_cost_function():
  cost_function = torch.nn.CrossEntropyLoss()
  return cost_function
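
As a small illustration of what this cost function computes, the sketch below compares `torch.nn.CrossEntropyLoss` with a manual computation on made-up logits. Note that the inputs are raw scores (the network applies no softmax) and that the default reduction averages over the batch.

```python
import torch
import torch.nn.functional as F

# Made-up raw scores, shape (N=2, classes=3), and their target class indices.
logits = torch.tensor([[2.0, 0.5, -1.0],
                       [0.1, 0.2, 3.0]])
targets = torch.tensor([0, 2])

loss = torch.nn.CrossEntropyLoss()(logits, targets)

# Same value by hand: log-softmax, pick the target entry, negate, average.
log_probs = F.log_softmax(logits, dim=1)
manual = -log_probs[torch.arange(2), targets].mean()

print(loss.item(), manual.item())
```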

### Define the optimizer

In [0]:
def get_optimizer(net, lr, wd, momentum):
  optimizer = torch.optim.SGD(net.parameters(), lr=lr, weight_decay=wd, momentum=momentum)
  return optimizer
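
For reference, the sketch below retraces the update that `torch.optim.SGD` performs with momentum and weight decay on a toy parameter. The hyperparameter values and the quadratic loss are illustrative assumptions, not the ones used in this lab.

```python
import torch

lr, wd, momentum = 0.1, 0.01, 0.9   # illustrative values only

w = torch.nn.Parameter(torch.tensor([1.0, -2.0]))
opt = torch.optim.SGD([w], lr=lr, weight_decay=wd, momentum=momentum)

# Manual copy of the same parameter and its momentum buffer.
w_manual = w.detach().clone()
buf = torch.zeros_like(w_manual)

for step in range(3):
    # Optimizer path.
    opt.zero_grad()
    loss = (w ** 2).sum()   # toy loss with gradient 2 * w
    loss.backward()
    opt.step()

    # Manual path: weight decay adds wd * w to the gradient,
    # the buffer accumulates gradients, and the step uses the buffer.
    grad = 2 * w_manual + wd * w_manual
    buf = momentum * buf + grad
    w_manual = w_manual - lr * buf

print(w.detach(), w_manual)
```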

### Train and test functions

In [0]:
def test(net, data_loader, cost_function, device='cuda:0'):
  samples = 0.
  cumulative_loss = 0.
  cumulative_accuracy = 0.

  # Strictly needed if the network contains layers that behave differently at train and test time (e.g. BN)
  net.eval()
  with torch.no_grad():
    for batch_idx, (inputs, targets) in enumerate(data_loader):
      # Move the data to the target device
      inputs = inputs.to(device)
      targets = targets.to(device)

      # Forward pass
      outputs = net(inputs)

      # Compute the loss
      loss = cost_function(outputs, targets)

      # Accumulate statistics
      samples += inputs.shape[0]
      # The loss is a batch mean, so weight it by the batch size to get a per-sample average at the end
      cumulative_loss += loss.item() * inputs.shape[0] # Note: the .item() is needed to extract scalars from tensors
      _, predicted = outputs.max(1)
      cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss/samples, cumulative_accuracy/samples*100


def train(net, data_loader, optimizer, cost_function, device='cuda:0'):
  samples = 0.
  cumulative_loss = 0.
  cumulative_accuracy = 0.

  # Strictly needed if the network contains layers that behave differently at train and test time (e.g. BN)
  net.train()
  for batch_idx, (inputs, targets) in enumerate(data_loader):
    # Move the data to the target device
    inputs = inputs.to(device)
    targets = targets.to(device)

    # Forward pass
    outputs = net(inputs)

    # Compute the loss
    loss = cost_function(outputs, targets)

    # Backward pass
    loss.backward()

    # Update parameters
    optimizer.step()

    # Reset the gradients before the next iteration
    optimizer.zero_grad()

    # Accumulate statistics
    samples += inputs.shape[0]
    # The loss is a batch mean, so weight it by the batch size to get a per-sample average at the end
    cumulative_loss += loss.item() * inputs.shape[0]
    _, predicted = outputs.max(1)
    cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss/samples, cumulative_accuracy/samples*100
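
The accuracy bookkeeping used in both functions can be tried in isolation. The logits and targets below are made up for illustration.

```python
import torch

# Made-up logits, shape (N=4, classes=3), and their target class indices.
outputs = torch.tensor([[0.1, 2.0, -1.0],
                        [1.5, 0.3, 0.2],
                        [0.0, 0.1, 0.9],
                        [0.4, 0.2, 0.1]])
targets = torch.tensor([1, 0, 1, 0])

_, predicted = outputs.max(1)                  # index of the largest logit per row
correct = predicted.eq(targets).sum().item()   # number of matching predictions
accuracy = correct / outputs.shape[0] * 100    # percentage of correct predictions

print(predicted.tolist())   # [1, 0, 2, 0]
print(accuracy)             # 75.0
```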

### Define the function that builds the data loaders used during training

In [0]:
def get_data(batch_size, test_batch_size=256, dataset='mnist'):
  
  # Prepare data transformations and then combine them sequentially
  if dataset == 'mnist':
    transform = list()
    transform.append(T.ToTensor())                                              # converts the PIL image to a PyTorch tensor in [0, 1]
    # TODO                                                                      # pad zeros to make MNIST 32 x 32
    transform.append(T.Lambda(lambda x: x.repeat(3, 1, 1)))                     # to make MNIST RGB instead of grayscale
    # TODO                                                                      # Normalizes the Tensors between [-1, 1]
    transform = T.Compose(transform)                                            # Composes the above transformations into one.
  elif dataset == 'svhn':
    transform = list()
    transform.append(T.ToTensor())                                              # converts the PIL image to a PyTorch tensor in [0, 1]
    # TODO                                                                      # Normalizes the Tensors between [-1, 1]
    transform = T.Compose(transform)                                            # Composes the above transformations into one.
  
  # Prepare dataset
  if dataset == 'mnist':  
    # Load data
    full_training_data = torchvision.datasets.MNIST('./data/mnist', train=True, transform=transform, download=True) 
    test_data = torchvision.datasets.MNIST('./data/mnist', train=False, transform=transform, download=True)
  elif dataset == 'svhn':
    full_training_data = torchvision.datasets.SVHN('./data/svhn', split='train', transform=transform, download=True) 
    test_data = torchvision.datasets.SVHN('./data/svhn', split='test', transform=transform, download=True)
  

  # Create train and validation splits
  num_samples = len(full_training_data)
  training_samples = int(num_samples * 0.8 + 1)
  validation_samples = num_samples - training_samples

  training_data, validation_data = torch.utils.data.random_split(full_training_data, [training_samples, validation_samples])

  # Initialize dataloaders
  train_loader = torch.utils.data.DataLoader(training_data, batch_size, shuffle=True, drop_last=True)
  val_loader = torch.utils.data.DataLoader(validation_data, test_batch_size, shuffle=False)
  test_loader = torch.utils.data.DataLoader(test_data, test_batch_size, shuffle=False)
  
  return train_loader, val_loader, test_loader
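
The two TODO steps for MNIST amount to zero-padding the 28 x 28 images to 32 x 32 and rescaling pixel values from [0, 1] to [-1, 1]. The sketch below shows the effect of these operations on a random stand-in image using plain tensor operations; inside the transform list they would presumably be expressed with `T.Pad` and `T.Normalize`, which is an assumption about the intended solution.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

# Stand-in for one MNIST image after ToTensor(): 1 channel, values in [0, 1).
img = torch.rand(1, 28, 28)

padded = F.pad(img, (2, 2, 2, 2))   # zero-pad 2 pixels per side: 28 x 28 -> 32 x 32
rgb = padded.repeat(3, 1, 1)        # replicate the single channel three times
normalized = (rgb - 0.5) / 0.5      # map [0, 1] to [-1, 1]

print(normalized.shape)             # torch.Size([3, 32, 32])
print(normalized[0, 0, 0].item())   # -1.0, a padded (zero) pixel after rescaling
```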

### Wrapping everything up

Finally, we need a main function that sets the hyperparameters, initializes the data loaders, network, optimizer and cost function, and loops over multiple epochs, printing the results after each one.

In [0]:
'''
Input arguments
  batch_size: Size of a mini-batch
  device: Device on which to train the network (e.g. 'cuda:0')
  learning_rate: Learning rate for the SGD optimizer
  weight_decay: Weight decay coefficient for regularization of the weights
  momentum: Momentum for the SGD optimizer
  epochs: Number of epochs for training the network
  visualization_name: name of the tensorboard folder
  dataset: which dataset to train
  norm: whether to use batch normalization
'''

def main(batch_size=128, 
         device='cuda:0', 
         learning_rate=0.01, 
         weight_decay=0.000001, 
         momentum=0.9, 
         epochs=50, 
         visualization_name='mnist',
         dataset='mnist', 
         norm=False):
  
  train_loader, val_loader, test_loader = get_data(batch_size=batch_size, 
                                                   test_batch_size=batch_size, 
                                                   dataset=dataset)
  
  net = LeNet(norm=norm).to(device)
  
  optimizer = get_optimizer(net, learning_rate, weight_decay, momentum)
  
  cost_function = get_cost_function()

  print('Before training:')
  train_loss, train_accuracy = test(net, train_loader, cost_function, device)
  val_loss, val_accuracy = test(net, val_loader, cost_function, device)
  test_loss, test_accuracy = test(net, test_loader, cost_function, device)

  print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
  print('\t Validation loss {:.5f}, Validation accuracy {:.2f}'.format(val_loss, val_accuracy))
  print('\t Test loss {:.5f}, Test accuracy {:.2f}'.format(test_loss, test_accuracy))
  print('-----------------------------------------------------')
  
  # Add values to plots
  tb.save_value('Loss/train_loss', visualization_name, 0, train_loss)
  tb.save_value('Loss/val_loss', visualization_name, 0, val_loss)
  tb.save_value('Accuracy/train_accuracy', visualization_name, 0, train_accuracy)
  tb.save_value('Accuracy/val_accuracy', visualization_name, 0, val_accuracy)
  
  # Update plots 
  tb.flush_line(visualization_name)

  for e in range(epochs):
    train_loss, train_accuracy = train(net, train_loader, optimizer, cost_function, device)
    val_loss, val_accuracy = test(net, val_loader, cost_function, device)
    print('Epoch: {:d}'.format(e+1))
    print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
    print('\t Validation loss {:.5f}, Validation accuracy {:.2f}'.format(val_loss, val_accuracy))
    print('-----------------------------------------------------')
    
    # Add values to plots
    tb.save_value('Loss/train_loss', visualization_name, e + 1, train_loss)
    tb.save_value('Loss/val_loss', visualization_name, e + 1, val_loss)
    tb.save_value('Accuracy/train_accuracy', visualization_name, e + 1, train_accuracy)
    tb.save_value('Accuracy/val_accuracy', visualization_name, e + 1, val_accuracy)
    
    # Update plots 
    tb.flush_line(visualization_name)

  print('After training:')
  train_loss, train_accuracy = test(net, train_loader, cost_function, device)
  val_loss, val_accuracy = test(net, val_loader, cost_function, device)
  test_loss, test_accuracy = test(net, test_loader, cost_function, device)

  print('\t Training loss {:.5f}, Training accuracy {:.2f}'.format(train_loss, train_accuracy))
  print('\t Validation loss {:.5f}, Validation accuracy {:.2f}'.format(val_loss, val_accuracy))
  print('\t Test loss {:.5f}, Test accuracy {:.2f}'.format(test_loss, test_accuracy))
  print('-----------------------------------------------------')

Train on MNIST without BN layers

In [0]:
main(visualization_name='mnist', dataset='mnist')

Train on MNIST with BN layers

In [0]:
main(visualization_name='mnist_bn', dataset='mnist', norm=True)

Train on SVHN without BN layers

In [0]:
main(visualization_name='svhn', dataset='svhn')

Train on SVHN with BN layers

In [0]:
main(visualization_name='svhn_bn', dataset='svhn', norm=True)