## Split dataset into training and testing

In [4]:
import random

with open('data/data.txt') as f:
    s = f.readlines()
cl = [i for i in s if i[-2]=='0']
bl = [i for i in s if i[-2]=='1']

l = min(len(cl), len(bl))
r = int(l * .8)

strain = random.sample(cl, r) + random.sample(bl, r)
stest = [i for i in s if i not in strain]

with open('data/train.txt', 'w') as f:
    f.writelines(strain)
    
with open('data/test.txt', 'w') as f:
    f.writelines(stest)

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import Adam
from torch.utils.data import Dataset
from torchvision import transforms
from ignite.engine import *
from ignite.metrics import Accuracy, Loss
from PIL import Image

## Construct CNN network for Image Quality Assessment (IQA)

In [2]:
class CNNIQAnet(nn.Module):
    def __init__(self, ker_size=7, n_kers=50, n1_nodes=800, n2_nodes=800):
        super(CNNIQAnet, self).__init__()
        self.conv1  = nn.Conv2d(1, n_kers, ker_size)
        self.fc1    = nn.Linear(2 * n_kers, n1_nodes)
        self.fc2    = nn.Linear(n1_nodes, n2_nodes)
        self.fc3    = nn.Linear(n2_nodes, 2)

    def forward(self, x):
        x  = x.view(-1, x.size(-3), x.size(-2), x.size(-1))  #

        h  = self.conv1(x)

        h1 = F.max_pool2d(h, (h.size(-2), h.size(-1)))
        h2 = -F.max_pool2d(-h, (h.size(-2), h.size(-1)))
        h  = torch.cat((h1, h2), 1)  # max-min pooling
        h  = h.squeeze(3).squeeze(2)

        h  = F.relu(self.fc1(h))
        h  = F.dropout(h)
        h  = F.relu(self.fc2(h))

        q  = F.softmax(self.fc3(h), dim=1)
        return q

## Construct Training Data Loader

In [3]:
class IQADataset(Dataset):
    def __init__(self, purpose='train'):
        to_tensor = transforms.ToTensor()
        
        with open('data/%s.txt' % purpose) as f:
            s = f.readlines()
        s = [i[:-1].split() for i in s]
        path = ['data/' + i[0] for i in s]
        self.label = [int(i[1]) for i in s]
        self.img = [to_tensor(Image.open(i).convert('L')) for i in path] 
        
    def __len__(self):
        return len(self.img)

    def __getitem__(self, idx):
        return self.img[idx], torch.tensor(self.label[idx], dtype=torch.long)

## Start Training Process

In [4]:
model = CNNIQAnet()
device = torch.device("cuda")
    
model = model.to(device)
print(model)

def run(epochs=200, lr=1e-4, weight_decay=0.0):
    train_dataset = IQADataset('train')
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=4)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    
    global best_criterion, n_iter, a_loss
    best_criterion = 1000
    n_iter = 0
    a_loss = 0
    
    trainer = create_supervised_trainer(model, optimizer, loss_fn, device=device)
#     evaluator = create_supervised_evaluator(model, metrics={'accuracy': Accuracy(), 'nll': Loss(loss_fn)},
#                                             device=device)

    @trainer.on(Events.ITERATION_COMPLETED)
    def log_training_loss(engine):
        global n_iter, a_loss
        a_loss += engine.state.output
        n_iter += 1
        if (n_iter % 20 == 0):
            print('\titer: %03d - avg_loss: %.5f' % (engine.state.iteration, a_loss/n_iter))

    @trainer.on(Events.EPOCH_COMPLETED)
    def log_training_results(engine):
        global best_criterion, n_iter, a_loss
        global best_epoch
#         evaluator.run(train_loader)
#         metrics = evaluator.state.metrics
#         print("Training Results - Epoch: {}  Avg accuracy: {:.2f} Avg loss: {:.2f}"
#           .format(engine.state.epoch, metrics['accuracy'], metrics['nll']))
        loss = a_loss / n_iter
        print('Epoch: %d - Loss: %.5f' % (engine.state.epoch, loss))
        if loss < best_criterion:
            best_criterion = loss
            best_epoch = engine.state.epoch
            torch.save(model.state_dict(), 'CNNIQA-ep=%03d-loss=%.4f' % (best_epoch, loss))
        a_loss = 0
        n_iter = 0

    trainer.run(train_loader, max_epochs=epochs)
    
run()

CNNIQAnet(
  (conv1): Conv2d(1, 50, kernel_size=(7, 7), stride=(1, 1))
  (fc1): Linear(in_features=100, out_features=800, bias=True)
  (fc2): Linear(in_features=800, out_features=800, bias=True)
  (fc3): Linear(in_features=800, out_features=2, bias=True)
)


## Model Evaluation

In [4]:
device = torch.device("cuda")
model = CNNIQAnet().cuda()
model.load_state_dict(torch.load('CNNIQA-ep=086-loss=0.3690', map_location=device))

data = IQADataset('test')

In [5]:
Y = [i[1].numpy().tolist() for i in data]
y = []
y_score = []
with torch.no_grad():
    for i in range(0, len(data)):
        res = model(data[i][0].cuda())
        v, i = res.max(1)
        y.append(i.cpu().numpy()[0])
        y_score.append(res)

### Accuray (using 0.5 as threshold for this binary classification)

In [6]:
from sklearn.metrics import accuracy_score
accuracy_score(Y, y)

0.9260869565217391

### ROC Score

In [7]:
from sklearn.metrics import roc_auc_score
roc_auc_score(Y, [i[0][1].cpu().numpy().tolist() for i in y_score])

0.9516561331524437