# Reproducing 'Real-Time Single Image and Video Super-Resolution Using an Efficient Sub-Pixel Convolutional Neural Network'

In this notebook we reproduce the results of the model trained on the 91-image dataset (T91) from the paper **Real-Time Single Image and Video Super-Resolution 
Using an Efficient Sub-Pixel Convolutional Neural Network**. The code follows the paper, and we filled in the gaps 
ourselves.
The paper presents a convolutional neural network capable of real-time super-resolution (SR).
The authors designed a CNN architecture in which the feature maps are extracted in the low-resolution (LR) space, 
and introduced an efficient sub-pixel convolution layer that learns an array of upscaling filters to upscale
the final LR feature maps into the high-resolution (HR) output.
A researcher would like a reproduction of this work to test on their own high-resolution dataset.

---
### SRCNN
Recovers the SR output image from an upscaled, interpolated version of the LR image. 
It uses bicubic interpolation (a special case of the deconvolution layer) to upscale the image.
Deconvolution layer, a.k.a. transposed convolution layer: \\
_insert image_
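
A small PyTorch sketch (our own illustration, not code from either paper) of the two ways to bring an LR image to HR size that are mentioned above: fixed bicubic interpolation and a learnable transposed convolution. The $17 \times 17$ input and $r = 3$ are arbitrary choices.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

r = 3
lr_img = torch.rand(1, 3, 17, 17)  # random stand-in for an LR image, N x C x H x W

# SRCNN-style pre-processing: fixed bicubic upscaling to HR size before the CNN
bicubic = F.interpolate(lr_img, scale_factor=r, mode="bicubic", align_corners=False)

# a learnable transposed convolution ("deconvolution") with stride r also maps H to rH
deconv = nn.ConvTranspose2d(3, 3, kernel_size=r, stride=r)
learned = deconv(lr_img)

print(tuple(bicubic.shape))  # (1, 3, 51, 51)
print(tuple(learned.shape))  # (1, 3, 51, 51)
```

In SRCNN every following convolution then works on the $51 \times 51$ grid, which has $r^2 = 9$ times as many positions as the $17 \times 17$ LR grid. ESPCN avoids this by staying in LR space until the last layer.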


### ESPCN
First apply an $l$-layer CNN directly to the LR image, then apply a sub-pixel convolution layer that upscales the LR feature 
maps to produce the SR output image. This avoids having to upscale the LR image before feeding it into the network. \\
_insert image_ \\
For a network composed of $L$ layers, the first $L-1$ layers can be described as follows: \\
$f^1(\boldsymbol{I}^{LR};W_1, b_1)= \phi (W_1 \ast \boldsymbol{I}^{LR}+b_1)$ \\
$f^l(\boldsymbol{I}^{LR};W_{1:l}, b_{1:l})= \phi (W_l \ast f^{l-1}(\boldsymbol{I}^{LR})+b_l)$ \\
where $W_l, b_l, l \in (1, L - 1)$ are the learnable network weights and biases, respectively.  
$W_l$ is a 2D convolution tensor of size $n_{l-1} \times n_l \times k_l \times k_l$, where $n_l$ is the number of 
features at layer $l$, $n_0 = C$, and $k_l$ is the filter size at layer $l$. 
The biases $b_l$ are vectors of length $n_l$. The nonlinearity function (or activation function) $\phi$ is applied 
element-wise and is fixed. The last layer $f^L$ has to convert the LR feature maps to an HR image $\boldsymbol{I}^{SR}$.
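
The recursion above maps directly onto a stack of `nn.Conv2d` layers. This sketch uses the layer sizes of the network defined later in this notebook and a random input; it also shows one practical detail: PyTorch stores $W_l$ with the output channels first, as $n_l \times n_{l-1} \times k_l \times k_l$, whereas the paper writes $n_{l-1} \times n_l \times k_l \times k_l$.

```python
import torch
import torch.nn as nn

C = 3
n = [C, 64, 32]  # n_0 = C, n_1, n_2: feature counts of the network used later in this notebook
k = [5, 3]       # k_1, k_2: filter sizes of those layers
layers = [nn.Conv2d(n[l - 1], n[l], k[l - 1], padding=k[l - 1] // 2) for l in (1, 2)]

f = torch.rand(1, C, 17, 17)  # f^0 = I^LR, a random N x C x H x W stand-in
for l, conv in enumerate(layers, start=1):
    f = torch.tanh(conv(f))  # f^l = phi(W_l * f^{l-1} + b_l), with phi = tanh
    # PyTorch stores W_l as (n_l, n_{l-1}, k_l, k_l): output channels first
    print(f"layer {l}: W shape {tuple(conv.weight.shape)}, b shape {tuple(conv.bias.shape)}")

print("f^2 shape:", tuple(f.shape))  # padding k // 2 keeps the LR size 17 x 17
```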

---

## Experiment Setup 

* Input: $H \times W \times C$
* Output: $rH \times rW \times C$

Apply an $l$-layer CNN directly to the low-resolution (LR) image, then apply a sub-pixel convolution layer that upscales the LR feature maps.

Each layer except the last: \\
$f^1(\boldsymbol{I}^{LR};W_1, b_1)= \phi (W_1 \ast \boldsymbol{I}^{LR}+b_1)$ \\
$f^l(\boldsymbol{I}^{LR};W_{1:l}, b_{1:l})= \phi (W_l \ast f^{l-1}(\boldsymbol{I}^{LR})+b_l)$

* $W_l$ and $b_l$ are learnable parameters.
* $W_l$ is a 2D convolution tensor of size $n_{l-1} \times n_l \times k_l \times k_l$.
* $n_l$ is the number of features at layer $l$, $n_0 = C$, and $k_l$ is the filter size at layer $l$.
* The biases $b_l$ are vectors of length $n_l$.
* The activation function $\phi$ is fixed and applied element-wise.
* The last layer converts the LR feature maps into the HR image.

Efficient sub-pixel convolution layer (the paper's main contribution):

* A convolution with stride $1/r$ over the LR image, with a filter $W_s$ of size $k_s$ and weight spacing $1/r$.
* The filter weights that fall between LR pixels are not activated and do not need to be calculated.
* The number of activation patterns is exactly $r^2$.
* Each activation pattern has at most $\lceil k_s/r \rceil^2$ active weights.
* The patterns are activated periodically during the convolution of the filter, depending on the sub-pixel location $\mathrm{mod}(x, r)$, $\mathrm{mod}(y, r)$, where $x$ and $y$ are the output pixel coordinates in HR space.
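
The counting argument for the activation patterns can be checked with a few lines of Python. This is our own illustration under a modelling assumption: the stride-$1/r$ convolution is treated as a $k_s \times k_s$ filter in HR space, where, for a given output phase, only the taps in one residue class modulo $r$ land on real LR pixels and the others fall between them. The function name `activation_patterns` is hypothetical.

```python
import math


def activation_patterns(k_s, r):
    """Count the active weights of each of the r*r sub-pixel activation patterns.

    Modelling assumption (ours): for a given output phase, only the filter taps whose
    index falls in one residue class modulo r land on real LR pixels.
    """
    # number of taps in each residue class modulo r along one axis
    per_axis = [sum(1 for t in range(k_s) if t % r == phase) for phase in range(r)]
    # a 2D pattern combines one residue class per axis
    return {(px, py): per_axis[px] * per_axis[py] for px in range(r) for py in range(r)}


for k_s, r in [(9, 3), (7, 3)]:
    patterns = activation_patterns(k_s, r)
    print(f"k_s={k_s}, r={r}: {len(patterns)} patterns, "
          f"sizes {sorted(set(patterns.values()))}, bound ceil(k_s/r)^2 = {math.ceil(k_s / r) ** 2}")
```

For $k_s = 9$ and $r = 3$ all nine patterns have exactly $(k_s/r)^2 = 9$ weights, so they can be stored as $r^2$ ordinary $3 \times 3$ filters in LR space; for $k_s = 7$ the patterns have 9, 6 or 4 weights. This is one way to see why the efficient implementation assumes $\mathrm{mod}(k_s, r) = 0$.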
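
The paper proposes an efficient implementation for the case $\mathrm{mod}(k_s, r) = 0$. \\
Last layer: note that there is NO ACTIVATION: \\
 $I^{SR} = f^L(I^{LR}) = PS(W_L \ast f^{L-1}(I^{LR})+b_L)$ \\
$PS$ is a periodic shuffling operator that rearranges the elements of an $H \times W \times C \cdot r^2$ tensor into a tensor of shape $rH \times rW \times C$: \\
$PS(T)_{x,y,c} = T_{\lfloor x/r \rfloor,\, \lfloor y/r \rfloor,\, C \cdot r \cdot \mathrm{mod}(y,r) + C \cdot \mathrm{mod}(x,r) + c}$ \\
$W_L$ has shape $n_{L-1} \times r^2C \times k_L \times k_L$. \\
With $k_L = k_s/r$ and $\mathrm{mod}(k_s, r) = 0$, this is equivalent to a sub-pixel convolution in LR space with the filter $W_s$.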
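
As a small worked example of the PS index formula (our own, with arbitrarily chosen sizes), take $r = 2$, $C = 1$ and a single LR position ($H = W = 1$) whose four channels are $t_0, t_1, t_2, t_3$. The channel index reduces to $2\,\mathrm{mod}(y, 2) + \mathrm{mod}(x, 2)$, so with $x$ indexing rows and $y$ indexing columns:

$$PS(T)_{x,y,0} = T_{0,\,0,\,2\,\mathrm{mod}(y,2) + \mathrm{mod}(x,2)} \quad\Longrightarrow\quad PS(T) = \begin{pmatrix} t_0 & t_2 \\ t_1 & t_3 \end{pmatrix}$$

Each channel becomes one sub-pixel position: channel 1 lands at $(x, y) = (1, 0)$ and channel 2 at $(x, y) = (0, 1)$.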

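Loss function: pixel-wise mean squared error \\
$\ell(W_{1:L}, b_{1:L}) = \frac{1}{r^2HW} \sum_{x=1}^{rH} \sum_{y=1}^{rW} \left(I^{HR}_{x,y} - f^L_{x,y}(I^{LR})\right)^2$ \\
The training data is pre-shuffled: the HR targets are rearranged with the inverse of PS to match the output of the layer before PS, so PS does not have to be applied during training.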
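
Pre-shuffling works because PS only permutes entries, and the MSE does not change when the prediction and the target are permuted in the same way. The sketch below checks this with PyTorch's `pixel_shuffle` and `pixel_unshuffle` (available from PyTorch 1.8). These use a different channel order than the paper's PS, but any fixed permutation gives the same loss; the tensors are random stand-ins.

```python
import torch
import torch.nn.functional as F

r, C, H, W = 3, 3, 17, 17
torch.manual_seed(0)
hr = torch.rand(1, C, r * H, r * W)     # random stand-in for an HR target patch
pred = torch.rand(1, r * r * C, H, W)   # random stand-in for W_L * f^{L-1} + b_L

# option 1: shuffle the prediction into HR space and compare it with the HR target
loss_hr_space = F.mse_loss(F.pixel_shuffle(pred, r), hr)

# option 2: pre-shuffle the target once and compare directly in LR space
hr_preshuffled = F.pixel_unshuffle(hr, r)   # shape (1, r*r*C, H, W)
loss_lr_space = F.mse_loss(pred, hr_preshuffled)

print(torch.allclose(loss_hr_space, loss_lr_space))  # True
```

Option 2 is what this notebook does with `PS_inv`: the targets are rearranged once when the data is loaded, and no shuffle is needed in each training step.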

---
## Discussion
We did not manage to reproduce the results in Table 1 of the [original paper](https://arxiv.org/pdf/1609.05158v2.pdf), column ESPCN (91). We recreated the model as described in the paper; however, some hyperparameters and architectural decisions were not given. The following missing information made it harder to reproduce the results:
* **Optimizer**: The paper does not state which optimizer is used to train the network.
* **How the learning rate changes from high to low**: The paper does not state how the learning rate moves between its start and end value. It mentions a threshold $\mu$: the learning rate is decreased by an unspecified amount when the improvement of the cost function is smaller than $\mu$. However, the value of $\mu$ is not given.
* **Gaussian blur**: To simulate the downscaled images realistically, the training images are blurred before they are downscaled. However, the strength of this blur is not given.
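
The plateau rule from the second bullet, as we implemented it in the training loop of this notebook, roughly corresponds to PyTorch's built-in `ReduceLROnPlateau` scheduler with an absolute threshold of `mu`, a factor of 0.1 and no patience. This is a sketch under our own assumptions (a stand-in model and made-up test losses); it is not the paper's schedule, which is exactly what is unknown.

```python
import torch.nn as nn
import torch.optim as optim

# stand-in model and made-up test losses, only to show how the learning rate evolves
model = nn.Conv2d(3, 27, 3, padding=1)
optimizer = optim.SGD(model.parameters(), lr=0.01)  # lr_start
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",            # a lower test loss is better
    factor=0.1,            # divide the learning rate by 10
    patience=0,            # react in the first epoch without sufficient improvement
    threshold=1e-6,        # plays the role of mu
    threshold_mode="abs",  # improvement measured as an absolute difference in loss
    min_lr=1e-4,           # lr_end
)

lrs = []
for test_loss in [1.0, 0.5, 0.5, 0.5, 0.5]:
    optimizer.step()           # a real epoch of training would happen here
    scheduler.step(test_loss)  # compare this epoch's test loss with the best one so far
    lrs.append(optimizer.param_groups[0]["lr"])
print(lrs)
```

The learning rate stays at 0.01 while the loss improves and is then divided by 10 for every stagnant epoch until it reaches `lr_end`. One difference from our loop: the scheduler only updates its best loss when the improvement exceeds the threshold, while our loop updates it on any improvement.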

To find good values for these parameters, we exported the parameter values and the corresponding results to a *.csv* file after each training session. This allowed us to easily run the training multiple times with different parameter sets and compare the results. Although we were able to improve our network by finding better parameters, we were still unable to reproduce the results from the paper.

When implementing and training **ESPCN** we encountered a few other parameters that were not necessarily missing from the paper, but would have been beneficial to have more information on, namely:

* **Batch size**: The batch size was not given, nor was it clear from the paper whether batches were used at all. This information would have been useful to interpret their training times and compare them to ours.
* **Model validation**: The T91 dataset was used for training. However, it was not specified whether this dataset was split up into a train, test and validation set. Was (k-fold) cross validation used, was some other validation method used or did they train on the full dataset without validation?



---
# The model



Import everything we need.



In [0]:
import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
from skimage.filters import gaussian
from skimage.transform import resize
import os
import math
import csv

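Hyperparameters to set


In [0]:
r = 3  # upscaling ratio
blur = 0.25  # sigma of the Gaussian blur applied before downscaling
lr_start = 0.01  # initial learning rate
lr_end = 0.0001  # final (minimum) learning rate
mu = 1e-6  # threshold for lowering the lr: divide it by 10 when the test-loss improvement is below mu
no_learning_threshold = 1e-8  # threshold for stopping training if no improvement has been made for 'repeats' epochs
repeats = 100
batch_size = 1
train_test_fraction = 0.8  # fraction of the patches used for training; the rest is used for testing
C = 3  # number of colour channels
dataset = "T91"
epoch_save_interval = 100  # save a model checkpoint every this many epochs
minibatch_size = 100  # print the running loss every this many batches
use_gpu = torch.cuda.is_available()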

Periodic shuffle (PS) and its inverse, implemented directly from the paper's index formula


In [None]:
def PS(T, r):
    # T has shape (C*r*r, H, W) as produced by the network; move the channels last
    T = np.transpose(T, (1, 2, 0))
    rH = r * len(T)
    rW = r * len(T[0])
    C = len(T[0][0]) / (r * r)

    # make sure C is an integer and cast if this is the case
    assert (C == int(C))
    C = int(C)

    res = np.zeros((rH, rW, C))

    # PS(T)[x, y, c] = T[x // r, y // r, C*r*(y % r) + C*(x % r) + c]
    for x in range(len(res)):
        for y in range(len(res[x])):
            for c in range(len(res[x][y])):
                res[x][y][c] = \
                    T[x // r][y // r][C * r * (y % r) + C * (x % r) + c]
    return res


def PS_inv(img, r):
    # img has shape (rH, rW, C); the result has shape (H, W, C*r*r)
    r2 = r * r
    H = len(img) / r
    W = len(img[0]) / r
    C = len(img[0][0])
    Cr2 = C * r2

    # Make sure H and W are integers
    assert (int(H) == H and int(W) == W)
    H, W = int(H), int(W)

    res = np.zeros((H, W, Cr2))

    # inverse of PS: every HR pixel is written to its LR position and channel
    for x in range(len(img)):
        for y in range(len(img[x])):
            for c in range(len(img[x][y])):
                res[x // r][y // r][C * r * (y % r) + C * (x % r) + c] = img[x][y][c]
    return res
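
The triple loops in `PS` and `PS_inv` visit every pixel in Python, which is slow for full-size test images. Below is a vectorized NumPy sketch of the same index formula that uses only reshapes and transposes. The names `ps`, `ps_inv` and `check_ps` are ours; unlike the notebook's `PS`, `ps` takes a channels-last $H \times W \times Cr^2$ array, so the network output would first need `np.transpose(result[0], (1, 2, 0))`.

```python
import numpy as np


def ps(T, r):
    """Periodic shuffle of a channels-last array: (H, W, C*r*r) -> (rH, rW, C).

    Implements PS(T)[x, y, c] = T[x // r, y // r, C*r*(y % r) + C*(x % r) + c].
    """
    H, W, Cr2 = T.shape
    C = Cr2 // (r * r)
    # split the channel index C*r*j + C*i + c into (j, i, c), with i = x % r and j = y % r
    T5 = T.reshape(H, W, r, r, C)  # axes: (h, w, j, i, c)
    # reorder so that x = h*r + i and y = w*r + j, then merge those axes
    return T5.transpose(0, 3, 1, 2, 4).reshape(H * r, W * r, C)


def ps_inv(img, r):
    """Inverse periodic shuffle: (rH, rW, C) -> (H, W, C*r*r)."""
    rH, rW, C = img.shape
    H, W = rH // r, rW // r
    I5 = img.reshape(H, r, W, r, C)  # axes: (h, i, w, j, c)
    return I5.transpose(0, 2, 3, 1, 4).reshape(H, W, r * r * C)


def check_ps(r=3, C=3, H=4, W=5, seed=0):
    """Compare ps with the element-wise definition and check that ps_inv undoes it."""
    T = np.random.default_rng(seed).random((H, W, C * r * r))
    out = ps(T, r)
    matches_definition = all(
        out[x, y, c] == T[x // r, y // r, C * r * (y % r) + C * (x % r) + c]
        for x in range(r * H) for y in range(r * W) for c in range(C)
    )
    return matches_definition and np.array_equal(ps_inv(out, r), T)


print(check_ps())  # True
```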

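Data loading and sampling: every HR training image is cut into overlapping $17r \times 17r$ patches. Each patch is blurred and downscaled by $r$ to create the LR input, and the patch rearranged with `PS_inv` is used as the training target.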

In [0]:
"""
Create the low-resolution (LR) training data from the high-resolution (HR) images:
- apply a Gaussian blur
- resize by the downscale factor (using interpolation)
How to use:
    torchDataloader_from_path takes the path to a folder of HR png files and returns a torch DataLoader
    of (LR patch, pre-shuffled HR patch) pairs for training the model.
    lr_dataset_from_path returns the LR versions of the full images, without cutting them into patches.
"""

SUBSAMPLING_STRIDE_SIZE = 14
SUBSAMPLING_SAMPLE_SIZE = 17


# hr_dataset_path: directory containing the HR png files
# downscale: downscale factor, e.g. if the original image is 64*64 and downscale=2, the result is 32*32
# gaussian_sigma: standard deviation of the Gaussian blur applied before downscaling
# returns a list of numpy.ndarray representing the LR images
def lr_dataset_from_path(hr_dataset_path, downscale, gaussian_sigma):
    original_filenames = os.listdir(hr_dataset_path)
    original_images = []
    for file in original_filenames:
        original_images.append(plt.imread(os.path.join(hr_dataset_path, file)))
    return lr_images(original_images, downscale, gaussian_sigma)  # list of ndarrays


def torchDataloader_from_path(hr_dataset_path, downscale, gaussian_sigma, batch_size):
    original_filenames = os.listdir(hr_dataset_path)
    original_images = []
    for file in original_filenames:
        original_images.append(plt.imread(hr_dataset_path + '/' + file))

    # subsample
    subsamples_hr = []
    subsamples_hr_rev_shuff = []
    for i in range(len(original_images)):
        temp_subsamples = subsample(original_images[i], downscale)
        subsamples_hr += temp_subsamples
        for sample_indx in range(len(temp_subsamples)):
            subsamples_hr_rev_shuff.append(PS_inv(temp_subsamples[sample_indx], downscale))  # labels
    lr_dataset = lr_images(subsamples_hr, downscale, gaussian_sigma)  # ndarray of images
    return toDataloader(lr_dataset, subsamples_hr_rev_shuff, batch_size=batch_size)


# Takes a list of images and returns their blurred and downscaled (LR) versions as a list of numpy arrays
def lr_images(images_real, downscale, gaussianSigma):
    lr_images = []
    for img in range(len(images_real)):
        # multichannel=True treats the last axis as colour channels, so the image is only blurred
        # spatially and not across channels (newer scikit-image versions use channel_axis=-1 instead)
        img_blurred = gaussian(images_real[img], sigma=gaussianSigma, multichannel=True)
        lr_images.append(resize(img_blurred, (img_blurred.shape[0] // downscale, img_blurred.shape[1] // downscale)))
    return lr_images


# extract 17r*17r patches from the HR image with a stride of 14r, so neighbouring patches overlap by 3r pixels
def subsample(image_real, downscale):
    subsample_size = SUBSAMPLING_SAMPLE_SIZE * downscale
    subsample_stride = SUBSAMPLING_STRIDE_SIZE * downscale
    subsamples = []
    for y in range(math.floor((image_real.shape[0] - (subsample_size - subsample_stride)) / subsample_stride)):
        for x in range(math.floor((image_real.shape[1] - (subsample_size - subsample_stride)) / subsample_stride)):
            ss = image_real[(y * subsample_stride):(y * subsample_stride) + subsample_size,
                 (x * subsample_stride):(x * subsample_stride) + subsample_size]
            subsamples.append(ss)

    return subsamples


# returns a torch DataLoader (to iterate over the training data) built from the training samples and training labels
def toDataloader(train_data, train_labels, batch_size):
    labeled_data = []
    for i in range(len(train_data)):
        labeled_data.append([np.transpose(train_data[i], (2, 0, 1)), np.transpose(train_labels[i], (2, 0, 1))])
    trainDataloader = DataLoader(labeled_data, batch_size=batch_size, shuffle=True)
    return trainDataloader

# Load the data
dataloader = torchDataloader_from_path('./datasets/' + dataset, r, blur, batch_size)
train_size = int(train_test_fraction * len(dataloader.dataset))
test_size = len(dataloader.dataset) - train_size
train_set, test_set = torch.utils.data.random_split(dataloader.dataset, [train_size, test_size])
train_dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
print("Data loaded")
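
The overlap between the training patches is easier to see with a vectorized version of `subsample`. This sketch is ours and not part of the pipeline above: it uses `numpy.lib.stride_tricks.sliding_window_view` (NumPy 1.20 or newer) on a random $240 \times 192$ image, and its `demo_` names are chosen so that it does not overwrite the notebook's `r`. For this image size it yields the same $5 \times 4 = 20$ patches as `subsample`.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

demo_r = 3
size, stride = 17 * demo_r, 14 * demo_r  # 51 x 51 patches, one every 42 pixels
demo_img = np.random.default_rng(0).random((240, 192, 3))  # random stand-in HR image

# all size x size windows, keeping only every stride-th window along both spatial axes
windows = sliding_window_view(demo_img, (size, size), axis=(0, 1))[::stride, ::stride]
print(windows.shape)  # (5, 4, 3, 51, 51): 5 x 4 patch positions, C, patch height, patch width

# flatten to a list of H x W x C patches in the same order as subsample() (row by row)
patches = windows.transpose(0, 1, 3, 4, 2).reshape(-1, size, size, 3)

# horizontally neighbouring patches share size - stride = 3r columns
print(np.array_equal(patches[0][:, stride:], patches[1][:, :size - stride]))  # True
```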

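The network: three convolutional layers that all operate in LR space, with 64 filters of size 5 × 5, 32 filters of size 3 × 3 and $r^2C$ filters of size 3 × 3, and a tanh activation after the first two. The last layer has no activation; its $r^2C$ output channels are rearranged into the SR image by PS.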


In [0]:
class Net(nn.Module):
    def __init__(self, r, C):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(C, 64, 5, padding=2)
        self.conv2 = nn.Conv2d(64, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, r * r * C, 3, padding=1)

    def forward(self, x):
        x = torch.tanh(self.conv1(x))
        x = torch.tanh(self.conv2(x))
        x = self.conv3(x)
        return x
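
A quick shape check of the network, under our own assumptions: the class is repeated as `SketchNet` so the block runs on its own, the input is a random $17 \times 17$ patch, and `ps_torch` is a hypothetical batched PyTorch version of the notebook's `PS`. Note that `torch.nn.functional.pixel_shuffle` arranges the channels in a different order (`c*r*r + (x % r)*r + (y % r)`), so it cannot be swapped in for `PS` on a model trained against `PS_inv` targets.

```python
import torch
import torch.nn as nn


class SketchNet(nn.Module):
    """Same three layers as Net above, repeated so this block runs on its own."""

    def __init__(self, r, C):
        super().__init__()
        self.conv1 = nn.Conv2d(C, 64, 5, padding=2)
        self.conv2 = nn.Conv2d(64, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, r * r * C, 3, padding=1)

    def forward(self, x):
        x = torch.tanh(self.conv1(x))
        x = torch.tanh(self.conv2(x))
        return self.conv3(x)  # no activation on the last layer


def ps_torch(t, r):
    """Batched PS with the paper's channel order: (N, C*r*r, H, W) -> (N, C, rH, rW).

    Channel C*r*(y % r) + C*(x % r) + c goes to row x, column y, colour c,
    the same mapping as the notebook's PS.
    """
    N, Cr2, H, W = t.shape
    C = Cr2 // (r * r)
    t = t.reshape(N, r, r, C, H, W)  # axes: (n, y % r, x % r, c, h, w)
    t = t.permute(0, 3, 4, 2, 5, 1)  # axes: (n, c, h, x % r, w, y % r)
    return t.reshape(N, C, H * r, W * r)


demo_r, demo_C = 3, 3
demo_net = SketchNet(demo_r, demo_C).double()
demo_lr = torch.rand(1, demo_C, 17, 17, dtype=torch.float64)  # random LR patch

with torch.no_grad():
    demo_out = demo_net(demo_lr)
demo_sr = ps_torch(demo_out, demo_r)

print(tuple(demo_out.shape))  # (1, 27, 17, 17)
print(tuple(demo_sr.shape))   # (1, 3, 51, 51)
print(sum(p.numel() for p in demo_net.parameters()))  # 31131
```

For $r = C = 3$ the network has 31,131 parameters, and all of its convolutions run on the $17 \times 17$ LR grid.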


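Training: the network is trained with SGD on the MSE between its output and the pre-shuffled HR patches. After every epoch the loss on the test split is compared with the best test loss so far. If the improvement is smaller than `mu`, the learning rate is divided by 10 (down to `lr_end`); if it stays smaller than `no_learning_threshold` for `repeats` consecutive epochs, training stops. The model with the lowest test loss is saved as the best model.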


In [0]:
start_time = datetime.datetime.now()
print("starting training at: " + str(start_time))

net = Net(r, C)
net.double()

if use_gpu:
    net = net.cuda()

# define the loss function and the optimizer
criterion = nn.MSELoss()
# the paper does not name the optimizer; SGD with momentum 0.75 is our own choice
optimizer = optim.SGD(net.parameters(), lr=lr_start, momentum=0.75)

losses_train = []
losses_test = []

epoch = 0
last_epoch_loss_test = float("inf")
last_epoch_loss_train = float("inf")
ni_counter = 0  # counts the number of consecutive epochs in which no improvement has been made

now = datetime.datetime.now()
dt_string = now.strftime("%Y-%m-%d_%H-%M-%S")
models_folder = "models"
model_name = "{}_espcnn_r{}".format(dt_string, r)

# create the output folder (and the parent 'models' folder if it does not exist yet)
os.makedirs(models_folder + '/' + model_name, exist_ok=True)
model_dest = models_folder + '/' + model_name + "/model_epoch_"
best_model_dest = models_folder + '/' + model_name + "/best_model"
lr = lr_start

best_test_loss = float("inf")  # keep track of the best (lowest) loss on the test dataset
best_epoch = 0
while True:  # loop over the dataset multiple times
    epoch_loss_train = 0.0
    running_loss_train = 0.0
    for i, data in enumerate(train_dataloader, 0):
        # get the inputs
        inputs, labels = data
        if use_gpu:
            inputs = inputs.cuda()
            labels = labels.cuda()
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs.double())
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        epoch_loss_train += outputs.shape[0] * loss.item()
        running_loss_train += loss.item()
        if i % minibatch_size == minibatch_size - 1:  # print every minibatch_size batches
            print('[%d, %5d] train_loss: %.5f' %
                  (epoch + 1, i + 1, running_loss_train / minibatch_size))
            running_loss_train = 0.0
    epoch_loss_train = epoch_loss_train / len(train_dataloader.dataset)
    print(epoch + 1, epoch_loss_train)

    epoch_loss_test = 0.0
    running_loss_test = 0.0
    with torch.no_grad():  # evaluation only, so no gradients are needed
        for i, data in enumerate(test_dataloader, 0):  # get loss on test dataset
            # get the inputs
            inputs, labels = data
            if use_gpu:
                inputs = inputs.cuda()
                labels = labels.cuda()

            # forward pass only, no backward pass or optimizer step
            outputs = net(inputs.double())
            loss = criterion(outputs, labels)

            # print statistics
            epoch_loss_test += outputs.shape[0] * loss.item()
            running_loss_test += loss.item()
            if i % minibatch_size == minibatch_size - 1:  # print every minibatch_size batches
                print('[%d, %5d] test_loss: %.5f' %
                      (epoch + 1, i + 1, running_loss_test / minibatch_size))
                running_loss_test = 0.0
    epoch_loss_test = epoch_loss_test / len(test_dataloader.dataset)
    print(epoch + 1, epoch_loss_test)

    improvement = best_test_loss - epoch_loss_test

    if epoch_loss_test < best_test_loss:  # save best model, 'best' meaning lowest loss on test set
        best_test_loss = epoch_loss_test
        torch.save(net.state_dict(), best_model_dest)  # overwrite best model so the best model filename doesn't change
        best_epoch = epoch
        best_epoch_train_loss = epoch_loss_train

    print("epoch " + str(epoch + 1) + ": improvement = " + str(improvement))
    if improvement < no_learning_threshold:
        ni_counter += 1
    else:
        ni_counter = 0

    if ni_counter >= repeats:  # stop training if no improvement has been made for 'repeats' epochs
        break

    # If the improvement is too small, make the learning rate smaller (but never below lr_end)
    if improvement < mu and lr > lr_end:
        lr = max(lr / 10, lr_end)
        print("Learning rate decreased to:", lr)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

    losses_train.append(epoch_loss_train)
    losses_test.append(epoch_loss_test)
    last_epoch_loss_train = epoch_loss_train
    last_epoch_loss_test = epoch_loss_test

    if epoch % epoch_save_interval == 0:
        torch.save(net.state_dict(), model_dest + str(epoch + 1))
    epoch += 1

end_time = datetime.datetime.now()
print('Finished training at: ' + str(end_time))


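Metrics: the peak signal-to-noise ratio (PSNR) between the super-resolved and the original image, averaged over the images of a test set.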

In [None]:
def PSNR(original, compressed):
    mse = np.mean((original - compressed) ** 2)
    if mse == 0:  # an MSE of zero means the images are identical, so PSNR is undefined;
        return 100  # return a large dummy value instead
    max_pixel = 255.0
    psnr = 20 * math.log10(max_pixel / math.sqrt(mse))
    return psnr


def average_PSNR(folder, net, r, gaussianSigma):
    images = []
    for filename in os.listdir(folder):
        img = plt.imread(os.path.join(folder, filename))
        if img.ndim == 2:  # convert grayscale images to RGB before blurring and resizing
            img = np.stack((img, img, img), axis=2)
        # make both image dimensions divisible by r
        img = resize(img, ((img.shape[0] // r) * r, (img.shape[1] // r) * r))
        images.append(img)

    sumPSNR = 0
    for og_img in images:
        # multichannel=True: blur spatially only, not across the colour channels
        img_blurred = gaussian(og_img, sigma=gaussianSigma, multichannel=True)
        img = resize(img_blurred, (img_blurred.shape[0] // r, img_blurred.shape[1] // r))
        img = np.transpose(img, (2, 0, 1))  # H x W x C -> C x H x W
        img = torch.Tensor(img).unsqueeze(0).double()
        with torch.no_grad():
            result = net(img).numpy()
        sumPSNR += PSNR(PS(result[0], r) * 255, og_img * 255)

    return sumPSNR / len(images)
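
A small worked example of the PSNR formula, with made-up pixel values: if every pixel of an 8-bit image is off by 10 grey levels, the MSE is 100 and PSNR $= 10 \log_{10}(255^2/100) \approx 28.13$ dB. The sketch uses our own `psnr`, which returns infinity instead of the notebook's dummy value of 100 for identical images. It also shows that evaluating images in $[0, 1]$ with a peak of 1 gives the same value as scaling them by 255 first, as `average_PSNR` does.

```python
import math

import numpy as np


def psnr(original, reconstructed, max_pixel=255.0):
    """PSNR in dB: 10 * log10(max_pixel^2 / MSE), equal to 20 * log10(max_pixel / sqrt(MSE))."""
    diff = np.asarray(original, dtype=np.float64) - np.asarray(reconstructed, dtype=np.float64)
    mse = np.mean(diff ** 2)
    if mse == 0:
        return float("inf")  # identical images
    return 10 * math.log10(max_pixel ** 2 / mse)


a = np.full((4, 4), 100.0)  # made-up 8-bit image
b = a + 10                  # every pixel off by 10 grey levels, so MSE = 100
print(round(psnr(a, b), 2))                             # 28.13
print(round(psnr(a / 255, b / 255, max_pixel=1.0), 2))  # 28.13, same value on a [0, 1] scale
```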

Output the results and save them to a csv

In [None]:
print('Saving train and test loss')
np.save(models_folder + '/' + model_name + '/loss_train', losses_train)
np.save(models_folder + '/' + model_name + '/loss_test', losses_test)

net.load_state_dict(torch.load(best_model_dest))
net.eval()

net.cpu()
set5_PSNR = average_PSNR("./datasets/testing/Set5", net, r, blur)
set14_PSNR = average_PSNR("./datasets/testing/Set14", net, r, blur)

print("Finished validation \n")

print("dataset:               " + dataset)
print("psnr Set5:             " + str(set5_PSNR))
print("psnr Set14:            " + str(set14_PSNR))
print("best epoch:            " + str(best_epoch))  # epoch with the lowest loss on the test dataset
print("loss on training set:  " + str(best_epoch_train_loss))  # loss for the best epoch
print("loss on test set:      " + str(best_test_loss))  # loss for the best epoch
print("r:                     " + str(r))
print("blur:                  " + str(blur))
print("lr_start:              " + str(lr_start))
print("lr_end:                " + str(lr_end))
print("mu:                    " + str(mu))
print("no_learning_threshold: " + str(no_learning_threshold))
print("epochs:                " + str(epoch + 1))
print("training duration:     " + str(end_time - start_time))
print("batch_size:            " + str(batch_size))
print("train_test_fraction:   " + str(train_test_fraction))
print("model:                 " + model_name)

with open(models_folder + '/' + model_name + '/results.csv', mode='w') as csv_file:
    fieldnames = ['dataset', 'psnr_Set5', 'psnr_Set14', 'best_epoch', 'training_loss', 'test_loss', 'r', 'blur', 'lr_start', 'lr_end', 'mu', 'no_learning_threshold', 'epochs', 'training_duration', 'batch_size', 'train_test_fraction', 'model']
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({
        'dataset': dataset,
        'psnr_Set5': set5_PSNR,
        'psnr_Set14': set14_PSNR,
        'best_epoch': best_epoch,
        'training_loss': best_epoch_train_loss,
        'test_loss': best_test_loss,
        'r': r,
        'blur': blur,
        'lr_start': lr_start,
        'lr_end': lr_end,
        'mu': mu,
        'no_learning_threshold': no_learning_threshold,
        'epochs': (epoch + 1),
        'training_duration': (end_time - start_time),
        'batch_size': batch_size,
        'train_test_fraction': train_test_fraction,
        'model': model_name})
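
The Discussion describes comparing training runs through their *.csv* files. A minimal sketch of that comparison, assuming the folder layout used above (`models/<model_name>/results.csv`); `best_runs` is a hypothetical helper and not part of the original notebook.

```python
import csv
import glob
import os


def best_runs(models_folder, key="psnr_Set5", top=5):
    """Collect the results.csv of every training run and sort the runs by `key`, best first."""
    rows = []
    for path in glob.glob(os.path.join(models_folder, "*", "results.csv")):
        with open(path, newline="") as f:
            rows.extend(csv.DictReader(f))
    rows.sort(key=lambda row: float(row[key]), reverse=True)
    return rows[:top]


for row in best_runs("models"):
    print(row["model"], row["psnr_Set5"], row["psnr_Set14"], row["blur"], row["mu"], row["lr_start"])
```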

Results on an image


In [0]:
_results block_


Results on a video


In [0]:
_block with video results_


We succeeded in reproducing the 91 images and the high-resolution video supplied by the researcher who requested this reproduction.