# Regularization

# Data Loading

In [5]:
import numpy as np
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
import torchsummary
import torchvision
import matplotlib.pyplot as plt

In [6]:
import os
import urllib.request
from sh import gunzip
import numpy as np
from sklearn.model_selection import train_test_split
from mlxtend.data import loadlocal_mnist

class FashionMnistLoader:
    """Download, unpack and load the Fashion-MNIST IDX files.

    Typical use is ``FashionMnistLoader().get_all_data()`` followed by
    ``standard_split()`` or ``train_split(test_size)``.  The ``*_fn``
    attributes hold the local paths of the unpacked files and stay ``None``
    until ``get_all_data`` has been called.
    """

    dir_name = "data/fashion"
    url_train_imgs = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz"
    url_train_labels = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz"
    url_test_imgs = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz"
    url_test_labels = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz"

    def __init__(self):
        self.train_imgs_fn = None
        self.train_labels_fn = None
        self.test_imgs_fn = None
        self.test_labels_fn = None

    def get_data(self, url):
        """Fetch one gzip archive and return the path of its unpacked file.

        The archive is downloaded into ``dir_name`` and decompressed next to
        it; the archive itself is removed afterwards.  Nothing is downloaded
        when the unpacked file already exists.

        Args:
            url: Location of the ``.gz`` archive; the last path component
                determines the local file names.

        Returns:
            Path of the unpacked file inside ``dir_name``.
        """
        # Imported locally so this method only needs the standard library
        # (no external ``gunzip`` executable), which also works on Windows.
        import gzip
        import shutil

        gz_file_name = url.split("/")[-1]
        gz_file_path = os.path.join(self.dir_name, gz_file_name)
        if gz_file_name.endswith(".gz"):
            # Strip only the archive suffix so names with other dots survive.
            file_name = gz_file_name[:-len(".gz")]
        else:
            file_name = gz_file_name.split(".")[0]
        file_path = os.path.join(self.dir_name, file_name)
        os.makedirs(self.dir_name, exist_ok=True)
        if not os.path.exists(file_path):
            urllib.request.urlretrieve(url, gz_file_path)
            # Unpack into a scratch file first: an interrupted run must not
            # leave a truncated file that later passes the existence check.
            part_path = file_path + ".part"
            with gzip.open(gz_file_path, "rb") as src, open(part_path, "wb") as dst:
                shutil.copyfileobj(src, dst)
            os.replace(part_path, file_path)
            if gz_file_path != file_path:
                os.remove(gz_file_path)
        return file_path

    def get_all_data(self):
        """Fetch all four files and remember their paths; returns ``self``."""
        self.train_imgs_fn = self.get_data(self.url_train_imgs)
        self.train_labels_fn = self.get_data(self.url_train_labels)
        self.test_imgs_fn = self.get_data(self.url_test_imgs)
        self.test_labels_fn = self.get_data(self.url_test_labels)
        return self

    def load_train(self):
        """Return ``(X, y)`` read from the training files by ``loadlocal_mnist``."""
        X, y = loadlocal_mnist(
            images_path=self.train_imgs_fn,
            labels_path=self.train_labels_fn)
        return X, y

    def load_test(self):
        """Return ``(X, y)`` read from the test files by ``loadlocal_mnist``."""
        X, y = loadlocal_mnist(
            images_path=self.test_imgs_fn,
            labels_path=self.test_labels_fn)
        return X, y

    def _split(self, X, y, test_size):
        """Split ``X``/``y`` with a fixed seed so the partition is repeatable."""
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=666)
        return X_train, X_test, y_train, y_test

    def train_split(self, test_size):
        """Split the training files only; ``test_size`` goes to ``train_test_split``.

        Returns:
            ``X_train, X_test, y_train, y_test``.
        """
        X, y = self.load_train()
        return self._split(X, y, test_size)

    def standard_split(self):
        """Return the training and test files as ``X_train, X_test, y_train, y_test``."""
        X_train, y_train = self.load_train()
        X_test, y_test = self.load_test()
        return X_train, X_test, y_train, y_test
    
    
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Per-sample preprocessing pipeline handed to the datasets below:
# convert to a PIL image, convert back to a tensor, then normalize with
# mean 0 and std 1 (i.e. subtract 0 and divide by 1).
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0], std=[1]),
])

class FashionMnist(Dataset):
    """Map-style dataset over in-memory 28x28 images and their labels.

    Args:
        X: NumPy array holding 28*28 values per sample; it is scaled by
            1/255 and stored as a float tensor of shape ``(N, 28, 28)``.
        y: NumPy array of labels, stored as a ``long`` tensor.
        transform: Optional callable applied to each image on access.
    """

    def __init__(self, X, y, transform=None):
        # Reshape straight to (N, 28, 28).  A generic squeeze() is avoided on
        # purpose: it would also drop the sample axis when N == 1, making
        # indexing return image rows instead of whole images.
        self.data = (torch.from_numpy(X).float() / 255).reshape(-1, 28, 28)
        self.target = torch.from_numpy(y).long()
        self.transform = transform

    def __len__(self):
        return len(self.target)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``, transformed if configured."""
        img, tar = self.data[index], self.target[index]
        if self.transform:
            img = self.transform(img)
        return img, tar
    
## Windows users
## Select the block below and press Ctrl + / to uncomment it
# import os
#import urllib.request
#import gzip
#import numpy as np
#from sklearn.model_selection import train_test_split
#from mlxtend.data import loadlocal_mnist
#
# class FashionMnistLoader:
    
#     dir_name = "data/fashion"
#     url_train_imgs = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz"
#     url_train_labels = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz"
#     url_test_imgs = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz"
#     url_test_labels = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz"
    
#     def __init__(self):
#         self.train_imgs_fn = None
#         self.train_labels_fn = None
#         self.test_imgs_fn = None
#         self.test_labels_fn = None
        
#     def get_data(self, url):
#         gz_file_name = url.split("/")[-1]
#         gz_file_path = os.path.join(self.dir_name, gz_file_name)  
#         file_name = gz_file_name.split(".")[0]
#         file_path = os.path.join(self.dir_name, file_name)
#         os.makedirs(self.dir_name, exist_ok=True)
#         if not os.path.exists(file_path):
#             urllib.request.urlretrieve(url, gz_file_path) 
#         with gzip.open(gz_file_path) as data:
#             with open(file_path, 'wb') as out:
#                 out.write(data.read())
#         return file_path
        
#     def get_all_data(self):
#         self.train_imgs_fn = self.get_data(self.url_train_imgs)
#         self.train_labels_fn = self.get_data(self.url_train_labels)
#         self.test_imgs_fn = self.get_data(self.url_test_imgs)
#         self.test_labels_fn = self.get_data(self.url_test_labels)
#         return self
    
#     def load_train(self):
#         X, y = loadlocal_mnist(
#             images_path=self.train_imgs_fn, 
#             labels_path=self.train_labels_fn)
#         return X, y
    
#     def load_test(self):
#         X, y = loadlocal_mnist(
#             images_path=self.test_imgs_fn, 
#             labels_path=self.test_labels_fn)
#         return X, y
    
#     def _split(self, X, y, test_size):
#         X_train, X_test, y_train, y_test = train_test_split(
#             X, y, test_size=test_size, random_state=666)
#         return X_train, X_test, y_train, y_test
            
#     def train_split(self, test_size):
#         X, y = self.load_train()
#         X_train, X_test, y_train, y_test = self._split(X, y, test_size)
#         return X_train, X_test, y_train, y_test

#     def standard_split(self):
#         X_train, y_train = self.load_train()
#         X_test, y_test = self.load_test()
#         return X_train, X_test, y_train, y_test

    

In [16]:
# Human-readable label names, indexed by class id when reporting accuracy.
# NOTE(review): presumably in the dataset's label order — confirm.
classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 
           'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
num_classes = len(classes)

# Mini-batch size shared by both loaders below.
batch_size = 16

# Downloads/unpacks the four data files on first use.
data_loader = FashionMnistLoader().get_all_data()

# Training files vs. test files, as returned by the loader.
X_train_dev, X_test, y_train_dev, y_test = data_loader.standard_split()
# Bare expression: only has a visible effect as notebook cell output.
X_train_dev.shape, X_test.shape, len(y_train_dev), len(y_test)

# Hold out 1/6 of the training files as a development set.
X_train, X_dev, y_train, y_dev = data_loader.train_split(1/6)
# Bare expression: only has a visible effect as notebook cell output.
X_train.shape, X_dev.shape, len(y_train), len(y_dev)

train_dataset = FashionMnist(X_train, y_train, transform=transform)
dev_dataset = FashionMnist(X_dev, y_dev, transform=transform)


train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, 
                                           shuffle=True)
# NOTE(review): despite its name, `test_loader` iterates over the dev split
# (`dev_dataset`); `X_test` / `y_test` are not used in this cell.
test_loader = torch.utils.data.DataLoader(dev_dataset, batch_size=batch_size, shuffle=True)

In [12]:
# Number of values per flattened 28x28 image.
input_dim = 28 * 28


class LinearNN(nn.Module):
    """Two-layer fully connected classifier: Linear -> ReLU -> Linear.

    Args:
        in_features: Number of values per flattened input sample
            (defaults to ``input_dim``).
        hidden_features: Width of the hidden layer.
        out_features: Number of output scores per sample.
    """

    def __init__(self, in_features=input_dim, hidden_features=64, out_features=10):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.fc2 = nn.Linear(hidden_features, out_features)

    def forward(self, x):
        """Flatten every sample and return the raw (un-normalized) class scores."""
        # Keep the first axis as the batch axis and collapse everything else.
        x = x.reshape(x.size(0), -1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)
    
# Step size used by the optimizer below.
learning_rate = 0.01

# Network, objective and optimizer shared by the training and evaluation cells.
model = LinearNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [17]:
def train(n_epochs, print_every=1000):
    """Optimize the global ``model`` on ``train_loader`` and log the loss.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``train_loader``.  The function does not switch the model into training
    mode itself; call ``model.train()`` beforehand when that matters.

    Args:
        n_epochs: Number of full passes over ``train_loader``.
        print_every: Number of batches per reporting window.  After every
            ``print_every`` batches the window's mean loss is printed and
            recorded.  Batches left over at the end of an epoch that do not
            fill a window are not reported.

    Returns:
        List of the recorded window-average losses, in order.

    Raises:
        ValueError: If ``print_every`` is smaller than 1.
    """
    if print_every < 1:
        raise ValueError("print_every must be a positive integer")

    loss_over_time = []  # window-average losses collected during training

    for epoch in range(n_epochs):
        running_loss = 0.0  # restarts every epoch and after every report

        for batch_i, (inputs, labels) in enumerate(train_loader, start=1):
            # Gradients accumulate by default, so clear them before each step.
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            # .item() extracts the Python scalar from the loss tensor.
            running_loss += loss.item()

            if batch_i % print_every == 0:
                avg_loss = running_loss / print_every
                loss_over_time.append(avg_loss)
                print('Epoch: {}, Batch: {}, Avg. Loss: {}'.format(epoch + 1, batch_i, avg_loss))
                running_loss = 0.0

    print('Finished Training')
    return loss_over_time


In [18]:
# Short first run to confirm that the model trains at all.
n_epochs = 2  # number of passes over train_loader
model.train()  # training mode; matters for layers such as dropout or batch normalization

# `train` returns the list of average losses it printed along the way.
training_loss = train(n_epochs)

Epoch: 1, Batch: 1000, Avg. Loss: 0.44059368059411647
Epoch: 1, Batch: 2000, Avg. Loss: 0.46890907338634136
Epoch: 1, Batch: 3000, Avg. Loss: 0.4440670166835189
Epoch: 2, Batch: 1000, Avg. Loss: 0.4460339671969414
Epoch: 2, Batch: 2000, Avg. Loss: 0.44574086439237
Epoch: 2, Batch: 3000, Avg. Loss: 0.4526415270343423
Finished Training


In [19]:
# Evaluate `model` on `test_loader`: running mean of the per-batch loss plus
# per-class hit counts that the reporting cell turns into accuracies.
test_loss = torch.zeros(1)
class_correct = [0.0] * num_classes
class_total = [0.0] * num_classes

# Evaluation mode (affects layers such as dropout / batch normalization).
model.eval()

# No gradients are needed for evaluation, so skip building the autograd graph.
with torch.no_grad():
    for batch_i, (inputs, labels) in enumerate(test_loader):
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Incremental mean over batches: mean_k = mean_{k-1} + (x_k - mean_{k-1}) / k
        test_loss = test_loss + (loss - test_loss) / (batch_i + 1)

        # Predicted class = index of the highest score in each row.
        _, predicted = torch.max(outputs, 1)
        correct = predicted.eq(labels.view_as(predicted))

        # Walk over the samples actually present in this batch; the last
        # batch may hold fewer than `batch_size` items.
        for label, hit in zip(labels.tolist(), correct.tolist()):
            class_correct[label] += hit
            class_total[label] += 1

print('Test Loss: {:.6f}\n'.format(test_loss.numpy()[0]))

Test Loss: 0.501134



In [20]:
# Per-class accuracy: one line per label, or a placeholder when the class
# never occurred in the evaluated data.
for idx in range(10):
    seen = class_total[idx]
    if seen <= 0:
        print('Test Accuracy of %5s: N/A (no training examples)' % (classes[idx]))
        continue
    hits = class_correct[idx]
    print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
        classes[idx], 100 * hits / seen, hits, seen))

# Accuracy over all classes together.
overall_hits = np.sum(class_correct)
overall_seen = np.sum(class_total)
print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
    100. * overall_hits / overall_seen, overall_hits, overall_seen))

Test Accuracy of T-shirt/top: 82% (855/1034)
Test Accuracy of Trouser: 96% (966/1005)
Test Accuracy of Pullover: 73% (739/1001)
Test Accuracy of Dress: 80% (787/979)
Test Accuracy of  Coat: 90% (872/959)
Test Accuracy of Sandal: 85% (855/1004)
Test Accuracy of Shirt: 43% (443/1007)
Test Accuracy of Sneaker: 97% (983/1009)
Test Accuracy of   Bag: 89% (915/1027)
Test Accuracy of Ankle boot: 87% (856/975)

Test Accuracy (Overall): 82% (8271/10000)


# Regularization
The following base class defines the interface of a generic regularizer; concrete regularizers subclass it and implement both methods.

In [None]:
class _Regularizer:
    """
    Base class for regularizers attached to a model.

    Stores the model and declares the two hooks that concrete regularizers
    override; both raise ``NotImplementedError`` here.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def regularized_param(self, param_weights, reg_loss_function):
        """Hook: add the penalty for one parameter tensor to the given loss."""
        raise NotImplementedError()

    def regularized_all_param(self, reg_loss_function):
        """Hook: add the penalty for the model's parameters to the given loss."""
        raise NotImplementedError()

An L1 regularizer can be implemented as follows:

In [None]:
class L1Regularizer(_Regularizer):
    """
    L1 regularized loss.

    Adds ``lambda_reg * sum(|w|)`` to a loss value.  With the default
    ``lambda_reg=0`` the returned loss equals the input loss.

    Args:
        model: Module whose parameters are penalized by
            ``regularized_all_param``.
        lambda_reg: Weight of the penalty term.
    """
    def __init__(self, model, lambda_reg=0):
        super().__init__(model=model)
        self.lambda_reg = lambda_reg

    def regularized_param(self, param_weights, reg_loss_function):
        """Return the loss plus the L1 penalty of a single tensor.

        The sum is built out-of-place, so the object passed in as
        ``reg_loss_function`` is left untouched.
        """
        return reg_loss_function + self.lambda_reg * self.__add_l1(var=param_weights)

    def regularized_all_param(self, reg_loss_function):
        """Return the loss plus the L1 penalty of every model weight.

        Only parameters whose name ends in ``'weight'`` are penalized; other
        parameters (e.g. biases) are skipped.  The sum is built out-of-place,
        so the object passed in as ``reg_loss_function`` is left untouched.
        """
        for model_param_name, model_param_value in self.model.named_parameters():
            if model_param_name.endswith('weight'):
                # `a = a + b` rather than `a += b`: the augmented form would
                # modify the caller's loss tensor in place.
                reg_loss_function = (
                    reg_loss_function
                    + self.lambda_reg * self.__add_l1(var=model_param_value)
                )
        return reg_loss_function

    @staticmethod
    def __add_l1(var):
        """Sum of absolute values of ``var``."""
        return var.abs().sum()

The regularizer adds the regularization penalty to the value of the loss function.

In [None]:
# Add the L1 penalty over the model's weight parameters to whatever `loss`
# currently holds.
# NOTE(review): no `lambda_reg` is passed, so the constructor default of 0
# applies and the penalty term is multiplied by zero — pass a positive value
# to actually regularize.
reg_loss = L1Regularizer(model).regularized_all_param(loss)

Let's train the network 