In [None]:
"""TinyImageNetClassification.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1VmBUEItsd22iMcTXxjGpKzPoaEnXlW5X
"""

In [41]:
# CSE 666 Assignment 1 : Tiny Imagenet Classification
# Name : Mitul Modi
# Person No : 50288649
# UBIT Name : mitulraj
# Contact : mitulraj@buffalo.edu

In [None]:
import os
import os.path
import csv
import time
import PIL
import math
from sklearn.model_selection import StratifiedShuffleSplit
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms, models
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

In [None]:
class TinyImageNetDataset(Dataset):
    """
        Customized Dataset class to load training and validation dataset.
        
        Code is adapted from code of pytorch sourcecode of DatasetFolder class.
        https://pytorch.org/docs/stable/_modules/torchvision/datasets/folder.html#DatasetFolder.
    """
    def __init__(self, data_dir, transform):      

        self.transform = transform
                        
        data_dir = os.path.expanduser(data_dir)
        classes = [d.name for d in os.scandir(data_dir) if d.is_dir()]
        classes.sort()
        class_to_idx = {classes[i]: i for i in range(len(classes))}        
        images=[]
        labels=[]
        
        for target in sorted(class_to_idx.keys()):
            d = os.path.join(data_dir, target, 'images')
            if not os.path.isdir(d):
                continue

            for root, _, fnames in sorted(os.walk(d)):
                for fname in sorted(fnames):
                    path = os.path.join(root, fname)
                    images.append(path)
                    labels.append(class_to_idx[target])

        self.images = images
        self.labels = labels
        self.class_to_idx = class_to_idx

    def __len__(self):
        # return size of dataset
        return len(self.images)
      
    def __getitem__(self, idx):
        # open image, apply transforms and return with label
        image = Image.open(self.images[idx])  # PIL image
        image = image.convert('RGB')
        image = self.transform(image)
        label = self.labels[idx]       
        return image, label
    
    def get_labels(self):
        return self.labels

    def get_class_to_idx(self):
        return self.class_to_idx

In [None]:
class TinyImageNetTestDataset(Dataset):
    """
       Customized Dataset class to load test dataset.
       
       Code is adapted from code of pytorch sourcecode of DatasetFolder class 
       https://pytorch.org/docs/stable/_modules/torchvision/datasets/folder.html#DatasetFolder.
    """
    def __init__(self, data_dir, annot_filename, class_to_idx, transform):      

        self.transform = transform
                        
        data_dir = os.path.expanduser(data_dir)
        
        images = []
        labels = []
        with open(os.path.join(data_dir, annot_filename),'r') as f:
            reader=csv.reader(f,delimiter='\t')
            for imagename, classname, _, _, _, _ in reader:
                images.append(os.path.join(data_dir, 'images', imagename))
                labels.append(class_to_idx[classname])
                
        self.images = images
        self.labels = labels
          
    def __len__(self):
        # return size of dataset
        return len(self.images)
      
    def __getitem__(self, idx):
        # open image, apply transforms and return with label
        image = Image.open(self.images[idx])  # PIL image
        image = image.convert('RGB')
        image = self.transform(image)
        label = self.labels[idx]
        return image, label
    
    def get_labels(self):
        return self.labels


In [35]:

class CNN(nn.Module):
    """ Convolutional Neural Network implementation """
    def __init__(self):
        super(CNN, self).__init__()
        
        self.conv0 = nn.Conv2d(3, 32, 3, stride = 1, padding = 1) #(64*64*3) -> (64*64*32)
        self.bn0 = nn.BatchNorm2d(32)
        self.conv1 = nn.Conv2d(32, 64, 3, stride = 1, padding = 1) #(64*64*32) -> (64*64*64)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 64, 3, stride = 2, padding = 0) #(64*64*64) -> (31*31*64)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, 1, stride = 1, padding = 0) #(31*31*64) -> (31*31*64)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 64, 3, stride = 1, padding = 1) #(31*31*64) -> (31*31*64)
        self.bn4 = nn.BatchNorm2d(64)
        self.conv5 = nn.Conv2d(64, 64, 3, stride = 2, padding = 0) #(31*31*64) -> (15*15*64)
        self.bn5 = nn.BatchNorm2d(64)
        self.conv6 = nn.Conv2d(64, 128, 1, stride = 1, padding = 0) #(15*15*64) -> (15*15*128)
        self.bn6 = nn.BatchNorm2d(128)
        self.conv7 = nn.Conv2d(128, 256, 3, stride = 2, padding = 0) #(15*15*128) -> (8*8*256)
        self.bn7 = nn.BatchNorm2d(256)
        self.conv8 = nn.Conv2d(256, 512, 3, stride = 2, padding = 0) #(8*8*256) -> (3*3*512)
        self.bn8 = nn.BatchNorm2d(512)
        self.fc1 = nn.Linear(512*3*3, 512) #(3*3*512 -> 512)
        self.bn9 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 200) #(512 -> 200)
        
        self.dropout = nn.Dropout(0.15)
                
    def forward(self, input):
        x = F.relu(self.bn0(self.dropout(self.conv0(input))))
        x = F.relu(self.bn1(self.dropout(self.conv1(x))))
        x = F.relu(self.bn2(self.dropout(self.conv2(x))))
        x = F.relu(self.bn3(self.dropout(self.conv3(x))))        
        x = F.relu(self.bn4(self.dropout(self.conv4(x))))
        x = F.relu(self.bn5(self.dropout(self.conv5(x))))
        x = F.relu(self.bn6(self.dropout(self.conv6(x))))
        x = F.relu(self.bn7(self.dropout(self.conv7(x))))
        x = F.relu(self.bn8(self.dropout(self.conv8(x))))
        x = x.view(-1, 3*3*512)
        x = F.relu(self.bn9(self.dropout(self.fc1(x))))
        x = self.fc2(x)
        return x

In [38]:
class ResNetLayer(nn.Module):
    """ Implementaion of One basic Block of Resnet model """
    def __init__(self, in_feature_maps, out_feature_maps, downsample = True):
        super(ResNetLayer, self).__init__()

        self.stride = 2 if downsample == True else 1
        self.conv0 = nn.Conv2d(in_feature_maps, out_feature_maps, 3, stride = self.stride, padding = 1)
        self.bn0 = nn.BatchNorm2d(out_feature_maps)
        self.conv1 = nn.Conv2d(out_feature_maps, out_feature_maps, 3, stride = 1, padding = 1)
        self.bn1 = nn.BatchNorm2d(out_feature_maps)

        self.skipconn_cnn = nn.Conv2d(in_feature_maps, out_feature_maps, kernel_size=1, stride=self.stride, padding = 0)
        self.skipconn_bn = nn.BatchNorm2d(out_feature_maps)

    def forward(self, input):
        
        x = F.relu(self.bn0(self.conv0(input)))
        x = F.relu(self.bn1(self.conv1(x)))
        skipconn = self.skipconn_bn(self.skipconn_cnn(input))
        return F.relu(x + skipconn)

class ResNet(nn.Module):
    """ Implementaion of ResNet like Neural Network Model """
    def __init__(self):
        super(ResNet, self).__init__()
        
        self.conv0 = nn.Conv2d(3, 64, 3, stride = 1, padding = 1) #(64*64*3) -> (64*64*64)
        self.bn0 = nn.BatchNorm2d(64)
        self.layer1 = ResNetLayer(64,128, downsample = False) #(64*64*64) -> (64*64*128)
        self.layer2 = ResNetLayer(128,128, downsample = True) #(64*64*128) -> (32*32*128)
        self.layer3 = ResNetLayer(128,256, downsample = False) #(32*32*128) -> (32*32*256)
        self.layer4 = ResNetLayer(256,256, downsample = True) #(32*32*256) -> (16*16*256)
        self.layer5 = ResNetLayer(256,512, downsample = False) #(16*16*256) -> (16*16*512)
        self.layer6 = ResNetLayer(512,512, downsample = True) #(16*16*512) -> (8*8*512)
        self.layer7 = ResNetLayer(512,512, downsample = True) #(8*8*512) -> (4*4*512)
        self.layer8 = ResNetLayer(512,512, downsample = True) #(4*4*512) -> (2*2*512)
        
        self.fc1 = nn.Linear(2048, 200) #(2048 -> 200)
        self.dropout = nn.Dropout(0.15)
        
    def forward(self, input):
        x = F.relu(self.bn0(self.dropout(self.conv0(input))))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.layer6(x)
        x = self.layer7(x)
        x = self.layer8(x)
        x = x.view(-1, 2*2*512)
        x = self.fc1(x)
        return x

In [39]:
 def init_weights(module):
    """ Function to initialze weights """
    if isinstance(module, nn.Conv2d):
        init.xavier_uniform_(module.weight)
    if isinstance(module, nn.Linear):
        init.xavier_uniform_(module.weight)

In [None]:
def save_model(filename, model, optimizer, scheduler, epoch, loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist, early_stop_counter):
    """
        Function to save model.
        
        Function saves model and other training related information so that it can be loaded later to resume training or for inference.
        It is called by fit() function to save best model during training.
    """
    state_dict = {
        'epoch':epoch,
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'scheduler': scheduler.state_dict(),
        'loss_tr_hist': loss_tr_hist,
        'loss_val_hist': loss_val_hist,
        'accuracy_tr_hist': accuracy_tr_hist,
        'accuracy_val_hist': accuracy_val_hist,
        'early_stop_counter': early_stop_counter
    }
    torch.save(state_dict, filename)

In [None]:
def load_model(filename, model, optimizer = None, scheduler = None, mode = 'test'):
    """
        This function loads previously saved model and its related training details from file specified by filename.
        
        Parameters:
            filename : path of saved model file.
            model : Instance of model to be loaded.
            optimizer : Instance of optimizer to be loaded to previous saved state. Useful to resume training of model from saved state.
            scheduler : Instance of scheduler to be loaded to previous saved state. Useful to resume training of model from saved state.
            mode : Values should be 'train' or 'test'. If value is 'train', it returns model and all other information required to resume training from saved state.
                   If value is 'test', it loads and returns only model.
    """
    state_dict = torch.load(filename)

    model.load_state_dict(state_dict['model'])
    if mode == 'test':
        return model

    epoch = state_dict['epoch']
    optimizer.load_state_dict(state_dict['optimizer'])
    loss_tr_hist = state_dict['loss_tr_hist']
    loss_val_hist = state_dict['loss_val_hist']
    accuracy_tr_hist = state_dict['accuracy_tr_hist']
    accuracy_val_hist = state_dict['accuracy_val_hist']
    early_stop_counter = state_dict['early_stop_counter']
    if scheduler is not None:
        scheduler.load_state_dict(state_dict['scheduler'])

    return epoch, model, optimizer, scheduler, early_stop_counter, loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist

In [None]:
def train(dataloader, model, optimizer, criterion):
    """
        Function to perform training step.
        
        This function performs primary step of training. It performs forward and backward pass to update model parameters. It is called by fit() function during each epoch. 
        Returns loss and accuracy for current epoch.
    """
    batch = 0
    loss = 0.0
    correct = 0.0

    model.train()    
    
    for X, Y in dataloader:
        if gpu:
            X = X.cuda()
            Y = Y.cuda()
        optimizer.zero_grad()        
        logits = model(X)
        cur_loss = criterion(logits, Y)
        cur_loss.backward()
        optimizer.step()
        loss += cur_loss.item()
        pred = logits.argmax(dim = 1)
        correct += pred.eq(Y).sum()

        # Display Progres Bar. 
        # Reference - https://stackoverflow.com/questions/46141302/how-to-make-a-still-progress-in-python/46141777
        batch += 1
        completed = math.floor(batch * dataloader.batch_size / len(dataloader.dataset) * 50)
        print('\r' + 'Training: ' + '▮' * completed + '▯' * (50-completed) + str(completed*2) + '%', end='')
    
    print('\r', end='')
    
    loss = loss / float(len(dataloader.dataset))
    accuracy = float(correct) / float(len(dataloader.dataset)) * 100
    
    return loss, accuracy

In [None]:
def validate(dataloader, model, criterion):
    """
        Function to perform validation step.
        
        This function is used to perform validation of a model. It is called by function fit() during each epoch.
        Returns validation loss and accuracy for current epoch.
    """

    batch = 0    
    loss = 0.0
    correct = 0.0
    
    model.eval()
    
    for X, Y in dataloader:
        if gpu:
            X = X.cuda()
            Y = Y.cuda()
        logits = model(X)
        loss += criterion(logits, Y).item()
        pred = logits.argmax(dim = 1)
        correct += pred.eq(Y).sum()

        # Display Progres Bar. 
        # Reference - https://stackoverflow.com/questions/46141302/how-to-make-a-still-progress-in-python/46141777        
        batch += 1
        completed = math.floor(batch * dataloader.batch_size / len(dataloader.dataset) * 50)
        print('\r' + 'Validation: ' + '▮' * completed + '▯' * (50-completed) + str(completed*2) + '%', end='')
    
    print('\r', end='')        
        
    loss = loss / float(len(dataloader.dataset))
    accuracy = float(correct) / float(len(dataloader.dataset)) * 100
    
    return loss, accuracy

In [None]:
def test(model, dataloader):
    """ Infers output of given trained model for given test data. """
    loss = 0.0
    correct = 0.0
    accuracy = 0.0

    model.eval()
    
    for X, Y in dataloader:
        if gpu:
            X = X.cuda()
            Y = Y.cuda()        
        logits = model(X)
        loss += criterion(logits, Y).item()
        pred = logits.argmax(dim = 1)
        correct += pred.eq(Y).sum()
        
    loss = loss / float(len(dataloader_test.dataset))
    accuracy = float(correct) / float(len(dataloader_test.dataset)) * 100
    return pred, loss, accuracy


In [None]:

def fit(dataloader_tr, dataloader_val, model, criterion, optimizer, max_epoch = 100, scheduler = None, filename = None, early_stop = True, patience = 10, resume = False):
    """
        Function to train and validate model for given epochs. It calls train and validate functions.
        
        Parameters: 
            dataloader_tr : data loader for training dataset.
            dataloader_val : dataloader for validation dataset.
            model : Instance of a Model. which is to be trained.
            criterion : criterion or loss function
            optimizer : Instance of Optimizer.
            max_epoch : Maximum number of epochs to train model
            scheduler : learning rate scheduler to change value of learning rate while model is trained.
            filename : Filename to save the best model during training. Function will save model with lowest validation loss, so that best model can be retrieved after training.
                       If resume = True, filename will be used to load previously saved model and resume the training.
            early_stop : If True, training will be stopped when validation_loss doesnt improve for epochs specified by patience. Recommended to prevent overfitting.
            patience : number of epochs to wait for early_stopping.
            resume : If True, model specified by filename will be loaded and training will be resumed for loaded model.
        Returns history of Training and Validation loss, and Training and Validation Accuracy.
    """
    start_epoch = 0
    early_stop_counter = 0
    min_loss_val = 1e10    
    loss_tr_hist = []
    loss_val_hist = []
    accuracy_tr_hist = []
    accuracy_val_hist = []

    if resume == True:
        if filename is None:
            print('Please Provide File Name to load model')
            return
        start_epoch, model, optimizer, scheduler, early_stop_counter, loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist = load_model(filename, model, optimizer, scheduler, mode = 'train')
        
        
    for epoch in range(start_epoch+1, max_epoch + 1):
        t0 = time.time()

        loss_tr, accuracy_tr = train(dataloader_tr, model, optimizer, criterion)
        loss_tr_hist.append(loss_tr)
        accuracy_tr_hist.append(accuracy_tr)

        loss_val, accuracy_val = validate(dataloader_val, model, criterion)
        loss_val_hist.append(loss_val)
        accuracy_val_hist.append(accuracy_val)

        if scheduler is not None:
            scheduler.step(loss_val)

        early_stop_counter += 1
        if loss_val < min_loss_val:
            if filename is not None:                
                save_model(filename, model, optimizer, scheduler, epoch, loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist, early_stop_counter)
            min_loss_val = loss_val
            early_stop_counter = 0
        
        print("[{0:3d} / {1:3d}]  |  Loss_Tr: {2:7.4f}  |  Loss_Val: {3:7.4f}  |  Acc_Tr: {4:7.4f}  |  Acc_Val: {5:7.4f}  |  Time taken: {6:7.4f}s  |  {7}".format(epoch, max_epoch, loss_tr, loss_val, accuracy_tr, accuracy_val, time.time() - t0, "Best Model" if early_stop_counter == 0 else ""))
        
        if early_stop == True and early_stop_counter > patience:
            print('\nEarly Stopping ... !')
            break
    return loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist

In [None]:
def plot(loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist):
    """ Plots training loss vs validation loss and training accuracy vs validation accuracy graphs. """
    fig, ax = plt.subplots()
    fig.set_figheight(5)
    fig.set_figwidth(10)

    plt.subplot(121)
    plt.plot(loss_tr_hist)
    plt.plot(loss_val_hist)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(('Training', 'Validation'))

    plt.subplot(122)
    plt.plot(accuracy_tr_hist)
    plt.plot(accuracy_val_hist)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(('Training', 'Validation'))
    plt.show()

In [40]:
# Initialize dataset related parameters

data_dir = 'tiny-imagenet-200'

transform1 = transforms.RandomApply([
    transforms.ColorJitter(hue=.05, saturation=.05),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20, resample=PIL.Image.BICUBIC),
    transforms.RandomAffine(degrees = 0, translate=(0.1,0.25), scale=(1,1.5), shear=5, resample=PIL.Image.BICUBIC, fillcolor=0)
])
transform2 = transforms.ToTensor()

workers = 1
batch_size = 64

# Check if GPU is available
gpu = torch.cuda.is_available()

if gpu:
    print('Training on GPU')
else:
    print('Training on CPU')

Training on GPU


In [None]:
# Load training dataset and split into training and validation sets by stratified shuffle split so that each class has equal training and validation samples.

dataset = TinyImageNetDataset(data_dir+'/train', transform=transforms.Compose([transform1, transform2]))
labels = dataset.get_labels()

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=0)
idx_tr, idx_val = next(sss.split(labels, labels))

dataset_tr = torch.utils.data.Subset(dataset, idx_tr)
dataset_val = torch.utils.data.Subset(dataset, idx_val)

dataloader_tr = torch.utils.data.DataLoader(dataset_tr, batch_size=batch_size, shuffle=True, num_workers=workers)
dataloader_val = torch.utils.data.DataLoader(dataset_val, batch_size=batch_size, shuffle=True, num_workers=workers)

print('Training Dataset Length: ' + str(len(dataset_tr)))
print('Validation Dataset Length: ' + str(len(dataset_val)))

In [None]:
# Load test dataset.

class_to_idx = dataset.get_class_to_idx()
dataset_test = TinyImageNetTestDataset(data_dir+'/val', annot_filename = 'val_annotations.txt', class_to_idx = class_to_idx, transform=transform2)
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=batch_size, shuffle=True, num_workers=workers)

print('Test Dataset Length: ' + str(len(dataset_test)))

In [None]:
# Initialize model and its weights
#model = ResNet()
model = CNN()
model.apply(init_weights)
if gpu:
    model.cuda()

In [None]:
# Initialize hyper parameters and functions required for training
max_epochs = 100
lr = 0.015 #0.015 with weight decay 0.05 for ResNet without Weight Decay

optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, dampening=0, weight_decay=0.001, nesterov=True)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=7, verbose=False, threshold=0.0005, threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)
criterion = nn.CrossEntropyLoss()
model_filename = 'best_model_cnn_47.pth'

In [None]:
# Train the model.
loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist = fit(dataloader_tr, dataloader_val, model, criterion, optimizer, scheduler = scheduler, filename = model_filename, resume = True)

In [None]:
plot(loss_tr_hist, loss_val_hist, accuracy_tr_hist, accuracy_val_hist)

In [None]:
#model = ResNet()
model = CNN()
model = load_model(model_filename, model, mode = 'test')
if gpu:
    model.cuda()
pred, loss_test, accuracy_test = test(model, dataloader_test)
print('Test Loss: {0:7.4f}  |  Test Accuracy: {1:7.4f}'.format(loss_test, accuracy_test))
