I initially ran this notebook on Kaggle with a P100 GPU accelerator, then switched to Colab for training and testing the model because I ran out of GPU quota on Kaggle.

In [6]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import timm
from timm.loss import LabelSmoothingCrossEntropy
import glob
import torchvision
from torchvision import datasets, transforms
from torchvision.io import read_image
from torch.utils.data import Dataset, DataLoader
import pathlib
import matplotlib.pyplot as plt
import random
import pathlib
import csv
import pandas as pd
from PIL import Image
from operator import itemgetter
import argparse

In [7]:
%matplotlib inline

def seed_everything(seed):
    """Seed every random number generator this notebook touches.

    Exports ``PYTHONHASHSEED`` into ``os.environ`` and seeds NumPy's global
    RNG, Python's ``random`` module and PyTorch (CPU and current CUDA device)
    with the same value.

    Args:
        seed: Integer seed shared by all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    seeders = (np.random.seed, random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for apply_seed in seeders:
        apply_seed(seed)

In [21]:
batch_size = 256      # number of samples per mini-batch handed to the DataLoaders
num_classes = 10      # number of target classes
epochs = 60         # number of training epochs (iterations of the outer loop in train())

In [9]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cpu


In [29]:
# specify class for loading the training dataset

class TrainDataset(Dataset):
    """Map-style dataset pairing JPEG images on disk with labels from a CSV file.

    The CSV is loaded with ``pandas.read_csv``. Column 0 holds the image name
    without its ``.jpg`` extension and column 1 holds the label.

    Args:
        img_dir: Directory containing the ``<name>.jpg`` image files.
        label_file: Path to the CSV file with image names and labels.
        transform: Optional callable applied to each decoded image. When
            ``None`` the image is returned exactly as ``read_image`` decoded it.
        train_idxs: Optional sequence of row positions into the CSV. When
            given, the dataset exposes only those rows, in that order.
    """

    def __init__(self, img_dir, label_file, transform=None, train_idxs=None):
        self.img_labels = pd.read_csv(label_file)
        self.img_dir = img_dir
        self.transform = transform
        self.train_idxs = train_idxs

    def __len__(self):
        """Return the number of exposed samples (subset size when ``train_idxs`` is set)."""
        if self.train_idxs is not None:
            return len(self.train_idxs)
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for dataset index ``idx``."""
        # Map the dataset index onto a CSV row position when a subset is active.
        row = self.train_idxs[idx] if self.train_idxs is not None else idx
        img_name = str(self.img_labels.iloc[row, 0])
        label = self.img_labels.iloc[row, 1]
        image = read_image(os.path.join(self.img_dir, img_name) + '.jpg')

        # ``transform`` defaults to None, so it must be guarded before calling;
        # the unguarded call used to raise TypeError for the default argument.
        if self.transform is not None:
            image = self.transform(image)

        return image, label


In [14]:
# specify class for loading the test dataset

class TestDataset(Dataset):
    """Map-style dataset over every ``*.jpg`` file directly inside a directory.

    Paths are collected once at construction time and sorted as strings
    (so ``10.jpg`` sorts before ``2.jpg``).

    Args:
        img_dir: Directory that is globbed for ``*.jpg`` files.
        transform: Optional callable applied to each decoded image.
    """

    def __init__(self, img_dir, transform=None):
        self.transform = transform
        self.input_paths = sorted(glob.glob(os.path.join(img_dir, '*.jpg')))

    def __len__(self):
        """Return the number of image files found."""
        return len(self.input_paths)

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for the ``idx``-th sorted path.

        The path is returned alongside the image so callers can recover the
        image id from the file name.
        """
        img_path = self.input_paths[idx]
        image = read_image(img_path)
        # The per-sample debug prints were removed: they fired twice for every
        # image and flooded the notebook output during inference.
        if self.transform:
            image = self.transform(image)
        return image, img_path

In [15]:
# function for running the test dataset after training

def test(model, test_loader, device):
    model.eval()
    
    preds_idxs = []
    preds = []
    sel_test_data = []
    sel_test_labels =[]
    F = nn.Softmax(dim=1)
    with torch.no_grad():
        for idx, (inputs, IDS) in enumerate(test_loader):
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            preds.append(predicted.cpu().detach().numpy())
            preds_idxs.append(IDS)
            scores, indices = F(outputs).max(dim=1)

    L = []
    for P, I in zip(preds, preds_idxs):
        for num, file in enumerate(I):
            img_id = int(os.path.split(file)[-1].split('.')[0])
            img_label = P[num].item()
            L.append([img_id, img_label])
    L = sorted(L, key=itemgetter(0))
    
    # write the test predictions into a csv file
    with open('test_prediction.csv','w+') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'label'])
        writer.writerows(L)

    return preds, preds_idxs, sel_test_data, sel_test_labels

In [2]:
# function for training the model

def train(model, train_loader, val_loader, test_loader, device, optimizer, loss_function, scheduler, train_transform = None, num_epochs=None):
    """Train ``model`` and print the mean loss and accuracy after every epoch.

    Args:
        model: Network to optimise; it is put into training mode each epoch.
        train_loader: Iterable of ``(inputs, targets)`` batches.
        val_loader: Accepted for interface compatibility; not used.
        test_loader: Accepted for interface compatibility; not used.
        device: Device every batch is moved to.
        optimizer: Optimizer stepped once per batch.
        loss_function: Callable ``(outputs, targets) -> loss`` tensor.
        scheduler: Learning-rate scheduler stepped once per epoch.
        train_transform: Accepted for interface compatibility; not used.
        num_epochs: Number of epochs to run. ``None`` (default) falls back to
            the module-level ``epochs`` setting.
    """
    if num_epochs is None:
        num_epochs = epochs

    for epoch in range(num_epochs):

        model.train()
        train_loss = 0
        correct = 0
        seen = 0
        num_batches = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_function(outputs, targets)
            loss.backward()

            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            # Count samples rather than averaging per-batch fractions, so a
            # short final batch is not over-weighted in the accuracy.
            correct += predicted.eq(targets).sum().item()
            seen += len(targets)
            num_batches += 1

        # max(..., 1) keeps the report well-defined for an empty loader.
        print('epoch: ', epoch, 'Running loss: %.4f | Train accuracy: %.4f'% (train_loss/max(num_batches, 1), 100.*correct/max(seen, 1)))

        # Advance the schedule once per epoch. This call used to sit outside
        # the epoch loop, so the learning rate changed only once per run.
        scheduler.step()

In [20]:
'''I tried to use EfficientNet B0 for training my model (no pre-trained weights)
"EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks" https://doi.org/10.48550/arXiv.1905.11946
The code was adapted from https://github.com/lukemelas/EfficientNet-PyTorch/blob/master/efficientnet_pytorch/model.py
'''

def swish(x):
    """Apply the Swish activation, ``x * sigmoid(x)``, element-wise.

    Args:
        x: Input tensor.

    Returns:
        Tensor with the same shape as ``x``.
    """
    gate = x.sigmoid()
    return x * gate


def drop_connect(x, drop_ratio):
    """Randomly zero whole samples of ``x`` and rescale the survivors.

    One Bernoulli draw is made per entry along dimension 0, using a mask of
    shape ``(x.shape[0], 1, 1, 1)`` that broadcasts over the remaining
    dimensions. Kept samples are divided by the keep probability.

    The input tensor is left untouched; a new tensor is returned. (The
    earlier version scaled and masked ``x`` in place, silently modifying the
    caller's tensor.)

    Args:
        x: Input tensor; the mask shape implies a 4-D layout.
        drop_ratio: Probability of dropping each sample.

    Returns:
        A new tensor with the same shape as ``x``.
    """
    keep_ratio = 1.0 - drop_ratio
    if keep_ratio <= 0:
        # Everything is dropped; avoid the division by zero (inf * 0 -> NaN).
        return torch.zeros_like(x)
    mask = torch.empty([x.shape[0], 1, 1, 1], dtype=x.dtype, device=x.device)
    mask.bernoulli_(keep_ratio)
    return x / keep_ratio * mask


class SE(nn.Module):
    """Squeeze-and-Excitation channel gate with a Swish bottleneck.

    The input is average-pooled to 1x1, passed through two 1x1 convolutions
    (``in_channels -> se_channels -> in_channels``) with Swish in between and
    a sigmoid at the end, and the result multiplies the input.

    Args:
        in_channels: Number of channels of the tensor being gated.
        se_channels: Number of channels in the bottleneck.
    """

    def __init__(self, in_channels, se_channels):
        super().__init__()
        self.se1 = nn.Conv2d(in_channels, se_channels, kernel_size=1, bias=True)
        self.se2 = nn.Conv2d(se_channels, in_channels, kernel_size=1, bias=True)

    def forward(self, x):
        """Return ``x`` scaled by its per-channel gate."""
        squeezed = F.adaptive_avg_pool2d(x, (1, 1))
        bottleneck = swish(self.se1(squeezed))
        gate = self.se2(bottleneck).sigmoid()
        return x * gate


class Block(nn.Module):
    """Expansion -> depthwise conv -> squeeze-excitation -> pointwise projection.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels of the output tensor.
        kernel_size: Depthwise kernel size (padding is 1 for 3, otherwise 2).
        stride: Stride of the depthwise convolution.
        expand_ratio: Channel multiplier of the expansion; when it is 1 the
            expansion layers are skipped in ``forward``.
        se_ratio: Bottleneck size of the SE block as a fraction of
            ``in_channels``.
        drop_rate: Drop-connect probability applied to the residual branch
            while training.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 expand_ratio=1,
                 se_ratio=0.,
                 drop_rate=0.):
        super().__init__()
        self.stride = stride
        self.drop_rate = drop_rate
        self.expand_ratio = expand_ratio

        channels = expand_ratio * in_channels
        if kernel_size == 3:
            depthwise_padding = 1
        else:
            depthwise_padding = 2

        # 1x1 expansion (created even when expand_ratio == 1, where forward() bypasses it).
        self.conv1 = nn.Conv2d(in_channels, channels,
                               kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(channels)

        # Depthwise convolution: one group per channel.
        self.conv2 = nn.Conv2d(channels, channels,
                               kernel_size=kernel_size, stride=stride,
                               padding=depthwise_padding, groups=channels, bias=False)
        self.bn2 = nn.BatchNorm2d(channels)

        # Squeeze-and-excitation; the bottleneck is sized from the block input.
        self.se = SE(channels, int(in_channels * se_ratio))

        # 1x1 projection to the output width (no activation afterwards).
        self.conv3 = nn.Conv2d(channels, out_channels,
                               kernel_size=1, stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)

        # A residual connection is only possible when the shapes are unchanged.
        self.has_skip = (stride == 1) and (in_channels == out_channels)

    def forward(self, x):
        """Run the block; adds the input back when ``has_skip`` is set."""
        if self.expand_ratio == 1:
            out = x
        else:
            out = swish(self.bn1(self.conv1(x)))
        out = self.se(swish(self.bn2(self.conv2(out))))
        out = self.bn3(self.conv3(out))
        if not self.has_skip:
            return out
        if self.training and self.drop_rate > 0:
            out = drop_connect(out, self.drop_rate)
        return out + x


class EfficientNet(nn.Module):
    """EfficientNet-style classifier assembled from a stage configuration.

    Architecture: a 3x3 stem convolution (3 -> 32 channels, stride 1), a
    sequence of ``Block`` modules described by ``cfg``, global average
    pooling, optional dropout and a linear classifier.

    Args:
        cfg: Dict with per-stage lists ``'expansion'``, ``'out_channels'``,
            ``'num_blocks'``, ``'kernel_size'`` and ``'stride'``, plus the
            scalars ``'dropout_rate'`` and ``'drop_connect_rate'``.
        num_classes: Size of the classifier output.
    """

    def __init__(self, cfg, num_classes=10):
        super(EfficientNet, self).__init__()
        self.cfg = cfg
        self.conv1 = nn.Conv2d(3,
                               32,
                               kernel_size=3,
                               stride=1,
                               padding=1,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.layers = self._make_layers(in_channels=32)
        self.linear = nn.Linear(cfg['out_channels'][-1], num_classes)

    def _make_layers(self, in_channels):
        """Build the ``nn.Sequential`` of blocks described by ``self.cfg``.

        Only the first block of a stage uses the stage's stride; the rest use
        stride 1. The drop-connect rate grows linearly with the block index,
        from 0 for the first block up to
        ``drop_connect_rate * (blocks - 1) / blocks`` for the last one.
        """
        layers = []
        cfg = [self.cfg[k] for k in ['expansion', 'out_channels', 'num_blocks', 'kernel_size',
                                     'stride']]
        b = 0  # running index of the block being created, across all stages
        blocks = sum(self.cfg['num_blocks'])
        for expansion, out_channels, num_blocks, kernel_size, stride in zip(*cfg):
            strides = [stride] + [1] * (num_blocks - 1)
            for stride in strides:
                drop_rate = self.cfg['drop_connect_rate'] * b / blocks
                layers.append(
                    Block(in_channels,
                          out_channels,
                          kernel_size,
                          stride,
                          expansion,
                          se_ratio=0.25,
                          drop_rate=drop_rate))
                in_channels = out_channels
                # The counter was never advanced before, which pinned every
                # block's drop rate to 0 and disabled drop-connect entirely.
                b += 1
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        out = swish(self.bn1(self.conv1(x)))
        out = self.layers(out)
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        dropout_rate = self.cfg['dropout_rate']
        # Dropout is applied only in training mode.
        if self.training and dropout_rate > 0:
            out = F.dropout(out, p=dropout_rate)
        out = self.linear(out)
        return out


def EfficientNetB0(num_classes=10):
    """Build an ``EfficientNet`` with the B0 stage configuration.

    Args:
        num_classes: Size of the classifier output. Defaults to 10, which is
            what the zero-argument call produced before this parameter existed.

    Returns:
        A freshly initialised ``EfficientNet`` instance.
    """
    config = {
        # One entry per stage, seven stages in total.
        'num_blocks': [1, 2, 2, 3, 3, 4, 1],
        'expansion': [1, 6, 6, 6, 6, 6, 6],
        'out_channels': [16, 24, 40, 80, 112, 192, 320],
        'kernel_size': [3, 3, 5, 3, 5, 5, 3],
        'stride': [1, 2, 2, 2, 1, 2, 1],
        # Regularisation strengths read by EfficientNet.
        'dropout_rate': 0.2,
        'drop_connect_rate': 0.2,
    }
    return EfficientNet(config, num_classes=num_classes)

torch.Size([2, 10])


In [33]:
# main function for running the training and test datasets with parameters

def main():
    """Train EfficientNet-B0 on the training set, then write test predictions.

    Steps: select the device, seed the RNGs, build the transforms and data
    loaders, create the model, loss, optimizer and LR scheduler, call
    ``train`` and finally ``test``.

    NOTE(review): ``train_set`` and ``test_set`` are read from module scope
    but are not created in any visible cell, so on a fresh kernel this
    function raises ``NameError`` unless another cell defines them first.
    The transforms built below are presumably meant to be handed to those
    datasets; here they are only forwarded to ``train`` -- confirm.
    """

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print('Device is: ', device)
    seed_everything(940)


    # data augmentation
    train_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.RandomCrop(32, padding=4),
        transforms.RandomRotation(degrees=10),
        transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 5)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # no augmentation at test time: conversion and normalisation only
    test_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # data loaders
    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=batch_size, shuffle=True, num_workers=2, persistent_workers=True)
    val_loader = None

    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=batch_size, shuffle=False, num_workers=2, persistent_workers=True)

    # human-readable class names (kept for reference; not used below)
    classes = ('Household Items', 'Vehicles', 'Flowers', 'Big mammals', 'Medium mammals',
               'Small mammals', 'Aquatic mammals', 'Airplane', 'Food', 'Birds')


    model = EfficientNetB0()

    model = model.to(device)

    if device == 'cuda':
        model = torch.nn.DataParallel(model)
        cudnn.benchmark = True

    #loss_function = nn.CrossEntropyLoss()
    loss_function = LabelSmoothingCrossEntropy() #ref: https://timm.fast.ai/loss.cross_entropy

    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
    #optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)

    # Anneal over exactly the number of epochs that will be run. T_max used to
    # be hard-coded to 200 while training lasts `epochs` epochs, so the cosine
    # schedule could never reach its minimum.
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    print("Starting training...")

    train(model, train_loader, val_loader, test_loader, device, optimizer, loss_function, scheduler, train_transform=train_transform)

    print('Finished training...')
    
    print('Start testing...')

    test(model, test_loader, device)

    print("Finished testing...")


# Run the training and testing pipeline when this cell/script is executed directly.
if __name__=="__main__":
    main()

Device is:  cpu
Starting training...


KeyboardInterrupt: 