<a href="https://colab.research.google.com/github/RuihangZhao/A-SR-preprocessing-module/blob/main/0820_lighter_network(control%20group%2C%20using%20bicubic).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Control group: Our custom classification network with bicubic interpolation as preprocessing SR module
==============================================

This Jupyter code trains our smaller classification network on Cifar-10 with bicubic interpolation (it's default method as used in image resizing, so we didn't write specific codes to achieve this function) as the control group of image super resolution.

We have split the original Cifar-10 dataset into train/val/test subsets and upload them in "Dataset" folder. You need to unzip it and put it in the same directory when you run this code.

In this code we comment out the codes for execution in Google Colab environment and assume desktop environment. Some modules work when you want to start the training from previous epochs, so you need to pay attention to which modules you should use for your own task.


In [None]:
%matplotlib inline

Dependencies
-------------

In [None]:
from torch.autograd import Variable
import numpy as np
import tensorflow as tf
import time, math, glob
import scipy.io as sio
import torch
import torch.nn as nn
from math import sqrt
import argparse, os
import torch
import random
import torch.backends.cudnn as cudnn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.utils.data as data
import h5py

from __future__ import print_function, division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy

import numpy as np
import torch
import torchvision.datasets as dsets
import torchvision.transforms as transforms
from PIL.ImageOps import colorize


Colab Drive mount
--------

The codes below for Colab drive use are commented. Use them when you want to run this in Colab Environment.

In [None]:
# from google.colab import drive
import sys
# drive.mount('/content/drive')
# sys.path.append('/content/drive/MyDrive/modules')

Mounted at /content/drive


Load Data
--------

In [None]:
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(64),
        transforms.RandomRotation(20),
        transforms.ColorJitter(),
        # transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(64),
        # transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(64),
        # transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

working_dir = os.getcwd()
data_dir = os.path.join(working_dir, 'Cifar10_split')
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val', 'test']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=16,
                                             shuffle=True, num_workers=8, drop_last=True)
              for x in ['train', 'val', 'test']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
class_names = image_datasets['train'].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

trainloaders = dataloaders['train']
valloaders = dataloaders['val']
testloaders = dataloaders['test']

  cpuset_checked))


In [None]:
from torch.utils.tensorboard import SummaryWriter

# default `log_dir` is "runs" - we'll be more specific here
model_name = '0820_pytorch_bicubic_lighter_network_epoch=50_lr=adjusted'
writer = SummaryWriter(os.path.join(working_dir, 'tensorboard', model_name))

Network Structure
---------

In [None]:
BATCH_SIZE = 10
LEARNING_RATE = 0.01
EPOCH = 50
N_CLASSES = 25

def conv_layer(chann_in, chann_out, k_size, p_size):
    layer = nn.Sequential(
        nn.Conv2d(chann_in, chann_out, kernel_size=k_size, padding=p_size),
        nn.BatchNorm2d(chann_out),
        nn.ReLU()
    )
    return layer


def vgg_conv_block(in_list, out_list, k_list, p_list, pooling_k, pooling_s):

    layers = [conv_layer(in_list[i], out_list[i], k_list[i], p_list[i]) for i in range(len(in_list))]
    layers += [nn.MaxPool2d(kernel_size=pooling_k, stride=pooling_s)]
    return nn.Sequential(*layers)


def vgg_fc_layer(size_in, size_out):
    layer = nn.Sequential(
        nn.Linear(size_in, size_out),
        nn.BatchNorm1d(size_out),
        nn.ReLU()
    )
    return layer


class SDCNN(nn.Module):
    def __init__(self, n_classes=10):
        super(SDCNN, self).__init__()

        # Conv blocks (BatchNorm + ReLU activation added in each block)
        self.layer1 = vgg_conv_block([3,32], [32,32], [3,3], [1,1], 2, 2)
        self.layer2 = vgg_conv_block([32,64], [64,64], [3,3], [1,1], 2, 2)

        # FC layers
        self.layer3 = vgg_fc_layer(16*16*64, 4096)  # 4096->smaller

        # Final layer
        self.layer4 = nn.Linear(4096, 10)

    def forward(self, x):
        # Start from here
        out = self.layer1(x)
        features = self.layer2(out)
        out = features.view(out.size(0), -1)
        out = self.layer3(out)
        out = self.layer4(out)

        return features, out

In [None]:
CN = SDCNN().to(device)
cost = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(CN.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)

Load Previous Training Stage (if interrupted)
---------------

In [None]:
CN.load_state_dict(torch.load(os.path.join(working_dir, 'trained_models', model_name , 'model_weights_epoch=46.pth')))

<All keys matched successfully>

Train the model
------

In [None]:
# the epoch will be declined to 1/10 every 10 epoch
def adjust_learning_rate(optimizer, epoch):
    """Sets the learning rate to the initial LR decayed by 10 every 10 epochs"""
    lr = 0.01 * (0.1 ** (epoch // 10))
    return lr


for epoch in range(EPOCH):
    print('Epoch: ', epoch)
    CN.train()
    running_loss = 0.0
    running_corrects = 0
    avg_loss = 0
    for images, labels in trainloaders:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        _, outputs = CN(images)
        _, preds = torch.max(outputs, 1)
        loss = cost(outputs, labels)
        avg_loss += float(loss.data)
        running_corrects += torch.sum(preds == labels.data)
        running_loss += loss.item() * images.size(0)
        loss.backward()
        optimizer.step()
    
    scheduler.step(avg_loss)
    epoch_loss = running_loss / dataset_sizes['train']
    epoch_acc = running_corrects.double() / dataset_sizes['train']
    print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                'train', epoch_loss, epoch_acc))
    writer.add_scalar('training loss',epoch_loss,epoch)
    writer.add_scalar('training accuracy',epoch_acc,epoch)
    
    # Save the model
    torch.save(CN.state_dict(), os.path.join(working_dir, 'trained_models', model_name , 'model_weights_epoch=' + str(epoch) + '.pth'))
    
    CN.eval()
    running_loss = 0.0
    running_corrects = 0
    with torch.no_grad():
      for images, labels in valloaders:        
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        _, outputs = CN(images)
        _, preds = torch.max(outputs, 1)
        loss = cost(outputs, labels)
        running_corrects += torch.sum(preds == labels.data)
        running_loss += loss.item() * images.size(0)

        
      # scheduler.step()
      epoch_loss = running_loss / dataset_sizes['val']
      epoch_acc = running_corrects.double() / dataset_sizes['val']
      print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                'val', epoch_loss, epoch_acc))
      writer.add_scalar('validation loss',epoch_loss,epoch)
      writer.add_scalar('validation accuracy',epoch_acc,epoch)
    
    
      lr = adjust_learning_rate(optimizer, epoch-1)

      for param_group in optimizer.param_groups:
          param_group["lr"] = lr

Test the model
---------

In [None]:
CN.eval()
correct = 0
total = 0
for images, labels in testloaders:
    _, outputs = CN(images)
    _, predicted = torch.max(outputs, 1)
    total += labels.size(0)
    correct += (predicted.cpu() == labels).sum()
    print(predicted, labels, correct, total)
    print("avg acc: %f" % (100* correct/total))