In [1]:
import numpy as np
import pandas as pd
import torch

import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib widget

# Reproducibility

In [2]:
import os
import random

def reset_seed():
    seed = 42
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    torch.manual_seed(seed)

reset_seed()

# Data and transformations

## Data

I am using the description over at http://yann.lecun.com/exdb/mnist/, and assume that it's the same for Fashion MNIST.

In [4]:
from pathlib import Path
from torch.utils.data import Dataset
from PIL import Image

class FashionMNIST:
    def load_images(self, fpath):
        file = open(fpath, 'rb')
        
        file.read(4) # magic number
        nimages = int.from_bytes(file.read(4), 'big')
        rows = int.from_bytes(file.read(4), 'big')
        cols = int.from_bytes(file.read(4),'big')
        
        return np.array([Image.frombytes('L', (rows, cols), file.read(rows * cols))
                         for n in range(nimages)], dtype=object)
    
    def load_labels(self, fpath):
        file = open(fpath, 'rb')
        
        file.read(4) # magic number
        nlabels = int.from_bytes(file.read(4),'big')
        labels = file.read(nlabels)
        
        return np.frombuffer(labels, dtype=np.uint8).astype(np.int64)
    
    def __init__(self, images, labels=None):
        self.images = self.load_images(images)
        if labels is not None:
            self.labels = self.load_labels(labels)

class TransDataset(Dataset):
    def __init__(self, X, y=None, transform=None):
        self.X = X
        self.y = y
        self.transform = transform
    
    def __getitem__(self, ix):
        x = self.X[ix]
        if self.transform is not None:
            x = self.transform(x)
        
        if self.y is not None:
            return x, self.y[ix]
        else:
            return x
    
    def __len__(self):
        return len(self.X)

def load_mnist():
    root = Path('data', 'FashionMNIST', 'raw')
    train_images = Path(root, 'train-images-idx3-ubyte')
    train_labels = Path(root, 'train-labels-idx1-ubyte')
    test_images = Path(root, 't10k-images-idx3-ubyte')
    
    train_mnist = FashionMNIST(train_images, train_labels)
    test_mnist = FashionMNIST(test_images)
    
    return {'train': train_mnist, 'test': test_mnist}

mnist = load_mnist()    
    
def make_datasets(mnist, transforms=None, total_frac=1, val_frac=0.1):
    if transforms:
        train_trans = transforms['train']
        val_trans = transforms['val']
    else:
        train_trans = None
        val_trans = None
    
    nsamples = mnist['train'].images.shape[0]
    ntotal = int(np.floor(total_frac*nsamples))
    total_Ix = np.random.choice(nsamples, ntotal)
    nval = int(np.floor(val_frac*len(total_Ix)))
    val_Ix = total_Ix[np.random.choice(len(total_Ix), nval)]
    train_Ix = np.setdiff1d(total_Ix, val_Ix)
    
    train_images = mnist['train'].images[train_Ix]
    train_labels = mnist['train'].labels[train_Ix]
    val_images = mnist['train'].images[val_Ix]
    val_labels = mnist['train'].labels[val_Ix]
    test_images = mnist['test'].images
    
    train_ds = TransDataset(train_images, train_labels, transform=train_trans)
    val_ds = TransDataset(val_images, val_labels, transform=val_trans)
    test_ds = TransDataset(test_images, transform=val_trans)
    
    return {'train': train_ds, 'val': val_ds, 'test': test_ds}

  return np.array([Image.frombytes('L', (rows, cols), file.read(rows * cols))


Let's take a look at the unagumented data.

In [4]:
examples_ds = make_datasets(mnist)['train']

nexamples = 5
classes = np.unique(examples_ds.y)

fig, axes = plt.subplots(len(classes), nexamples)
fig.tight_layout()

for i, class_ in enumerate(classes):
    images_Ix = np.where(examples_ds.y == class_)[0]
    samples_Ix = images_Ix[np.random.choice(len(images_Ix), nexamples)]
    for j, sample_Ix in enumerate(samples_Ix):
        axes[i][j].imshow(examples_ds.X[sample_Ix], cmap='gray')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Transformations

Inasfar as data augmentation is concerned, we will upscale (crops of) images to 128x128, apply random flips and rotations, and also random erasing. This *should* make our models more robust (of course the best strategy is to manually check what works and what doesn't)

For validation images (and test images for final results), we will only resize the images.

In [5]:
from torchvision import transforms

datasets = make_datasets(mnist, transforms={
    'train': transforms.Compose([
#         transforms.RandomResizedCrop(64, scale=(0.75, 1)),
#         transforms.RandomHorizontalFlip(),
#         transforms.RandomRotation(90),
#         transforms.Resize(32),
        transforms.ToTensor(),
        transforms.RandomErasing(),
        transforms.Normalize((0.5), (0.5)),
    ]),
    'val': transforms.Compose([
#         transforms.Resize(32),
        transforms.ToTensor(),
        transforms.Normalize((0.5), (0.5))
    ])
})

With these augmentations, the images are as follows:

In [6]:
examples_Ix = np.random.choice(len(datasets['train']), 10)
fig, axes = plt.subplots(2, 5)
axes_Ix = [(i, j) for i in range(2) for j in range(5)]

for (i, j), example_ix in zip(axes_Ix, examples_Ix):
    axes[i][j].imshow(datasets['train'][example_ix][0][0], cmap='gray')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Downscaling from 64x64 to 32x32 should not (in my estimation) impact the accuracy that much.

# PyTorch models

Let's start without ensembling.

## ResNet18, reduced

For reference, I decided to look at ResNet18 implementation.

In [7]:
from torchvision.models import resnet18
resnet = resnet18()
resnet

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

We need to make a number of adjustments:
- the number of channels in input (`conv1`) layer should be 1 instead of 3;
- the output (`fc`) layer should have 10 outputs (as there are 10 classes in out dataset), as opposed to 1000 (from ImageNet).

I would also conjecture that we do not really need *that many* channels to get the accuracy we want.

In [8]:
from torch.nn import Conv2d, Linear
resnet.conv1 = Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
resnet.fc = Linear(in_features=512, out_features=10, bias=True)

## Simple (i.e. custom) CNNs

We will also test some custom-defined models, usually modeled on the first few layers of ResNet.

In [27]:
import torch.nn as nn
import torch.nn.functional as func

class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 5, 1, 2)
        self.pool = nn.MaxPool2d(2, 2)
        self.bn1 = nn.BatchNorm2d(16, eps=1e-05, momentum=0.1,
                                  affine=True, track_running_stats=True)
        
        self.conv2 = nn.Conv2d(16, 32, 5, 1, 2)
        self.bn2 = nn.BatchNorm2d(32, eps=1e-05, momentum=0.1,
                                  affine=True, track_running_stats=True)
        
        self.conv3 = nn.Conv2d(32, 64, 5, 1, 2)
        self.bn3 = nn.BatchNorm2d(64, eps=1e-05, momentum=0.1,
                                  affine=True, track_running_stats=True)
        
        self.fc1 = nn.Linear(64 * 14 * 14, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)
    
    def forward(self, x):
        x = self.bn1(self.conv1(x))
        x = self.pool(func.relu(x))
        
        x = self.bn2(self.conv2(x))
        x = func.relu(x)
        
        x = self.bn3(self.conv3(x))
        
        x = x.view(-1, 64 * 14 * 14)
        x = func.relu(self.fc1(x))
        x = func.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x

## Training

First, we need to define data loaders and some other things related to Pytorch.

In [28]:
from torch.utils.data import DataLoader

dataloaders = {
    'train': DataLoader(datasets['train'], batch_size=4,
                        shuffle=True, num_workers=4),
    'val': DataLoader(datasets['val'], batch_size=4,
                      shuffle=True, num_workers=4),
    'test': DataLoader(datasets['test'], batch_size=4,
                       num_workers=4),
}
dataset_sizes = {name: len(ds) for name, ds in datasets.items()}

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


Then, the training code. (Taken from https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html, I'm not sure whether that's "cheating", but I didn't exactly think of any reason to modify it)

In [29]:
import time
import copy

def report_time(since):
    time_elapsed = time.time() - since
    print('Elapsed: {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        report_time(since)
        print()

    report_time(since)
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

Now, we can train it.

In [30]:
from torch.nn import CrossEntropyLoss
from torch.optim import lr_scheduler, SGD, AdamW

model = SimpleNet()
model.to(device)

criterion = CrossEntropyLoss()
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9)
# optimizer = AdamW(model.parameters())
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

model = train_model(model, criterion, optimizer, scheduler, num_epochs=25)
torch.save(model.state_dict(), Path('models', 'simple.adam'))

Epoch 1/25
----------
train Loss: 0.5026 Acc: 0.8156
val Loss: 0.3495 Acc: 0.8703
Elapsed: 3m 29s

Epoch 2/25
----------
train Loss: 0.3441 Acc: 0.8734
val Loss: 0.3027 Acc: 0.8858
Elapsed: 6m 48s

Epoch 3/25
----------
train Loss: 0.2978 Acc: 0.8909
val Loss: 0.2886 Acc: 0.8927
Elapsed: 10m 14s

Epoch 4/25
----------
train Loss: 0.2646 Acc: 0.9022
val Loss: 0.2762 Acc: 0.9042
Elapsed: 13m 37s

Epoch 5/25
----------
train Loss: 0.2389 Acc: 0.9084
val Loss: 0.2818 Acc: 0.9035
Elapsed: 16m 50s

Epoch 6/25
----------
train Loss: 0.2188 Acc: 0.9180
val Loss: 0.2619 Acc: 0.9068
Elapsed: 20m 7s

Epoch 7/25
----------
train Loss: 0.2014 Acc: 0.9240
val Loss: 0.2629 Acc: 0.9128
Elapsed: 23m 33s

Epoch 8/25
----------
train Loss: 0.1351 Acc: 0.9510
val Loss: 0.2517 Acc: 0.9195
Elapsed: 26m 56s

Epoch 9/25
----------
train Loss: 0.1149 Acc: 0.9561
val Loss: 0.2460 Acc: 0.9232
Elapsed: 30m 8s

Epoch 10/25
----------
train Loss: 0.1102 Acc: 0.9592
val Loss: 0.2519 Acc: 0.9218
Elapsed: 33m 12s

Epo

Alternatively, we may reload a previously trained model.

In [None]:
# model = SimpleNet()
# model.load_state_dict(torch.load(Path('models', 'simple2')))
# model.eval()

# Predictions

In [31]:
preds = pd.DataFrame(columns=['Id', 'Class'])

idx = 0
with torch.no_grad():
    for images in dataloaders['test']:
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        for pred in predicted.numpy():
            preds = preds.append({'Id': idx, 'Class': pred},
                                 ignore_index=True)
            idx += 1

preds.to_csv(Path('preds', 'simple.sgd.csv'),
             index=False)