# Imports

In [1]:
# General imports
import os
import sys
import random 
import functools

import cv2
import numpy as np
import torch
import torch.nn as nn

import torchvision
import torchvision.transforms as transforms
from tqdm.notebook import tqdm

import matplotlib.pyplot as plt 
import albumentations as albu
import albumentations.pytorch as albu_pt
import photosynthesis_metrics as pm

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container {width:100% !important;}</style>"))

# Fix to be able to import python modules inside a notebook
os.chdir('..')

# Useful extensions
%load_ext watermark
%watermark -v -n -m -p numpy,torch,albumentations,photosynthesis_metrics

# %load_ext autoreload
# %autoreload 2

# Nice plot formating
%matplotlib inline

Mon Jun 08 2020 

CPython 3.6.9
IPython 7.8.0

numpy 1.17.0
torch 1.5.0
albumentations 0.4.5
photosynthesis_metrics 0.3.0

compiler   : GCC 8.4.0
system     : Linux
release    : 4.15.0-99-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 16
interpreter: 64bit


# TID2013 dataset

1. Парные сравнения изображений с оригинальными версиями для попиксельных метрик
2. Дробить на множество маленьких кусочков для distribution метрик
3. Выдавать только distorted изображения для no-reference метрик

In [3]:
# Task specific imports
from src.utils import walk_files
import imageio

In [4]:
class TID2013(torch.utils.data.Dataset):
    """
    Total length = 120 (3000 / 25)
    Args:
        root (str) – Root directory path.
        train (bool): Flag to return train if True and validation if False
        transform (callable) – A function/transform that takes in the input and transforms it.
        
    Returns:
        distorted: 25 images with fixed distortion type and level
        reference: 25 original images
    """
    _filename = "/mos_with_names.txt"
    
    def __init__(
        self, root="data/raw/tid2013", transform=None):
        
        reference_walker = walk_files(
            root + "/reference_images", suffix=(".bmp", ".BMP"), prefix=True, remove_suffix=False
        )
        self.reference_files = sorted(list(reference_walker))    
    
        # Read file mith MOS and names
        with open(root + self._filename) as f:
            lines = f.readlines()

        scores, self.distorted_files = [], []
        
        for line in lines:
            score, name = line.split(' ')
            scores.append(float(score))
            self.distorted_files.append(root + "/distorted_images/" + name[:-1])
        
        self.scores = np.array(scores)
        
        if transform is None:
            self.transform = albu_pt.ToTensorV2()
        else:
            self.transform = transform

    def __getitem__(self, index):
        step = int(len(self.distorted_files) / len(self.reference_files)) # 120
        distorted_files = self.distorted_files[index::step]
        assert len(distorted_files) == len(self.reference_files)
        
        distorted_images, reference_images = [], []
        for i in range(len(distorted_files)):
            # Load image and ref
            distorted = imageio.imread(distorted_files[i])
            distorted = self.transform(image=distorted)["image"]
            
            reference = imageio.imread(self.reference_files[i])
            reference = self.transform(image=reference)["image"]
        
            distorted_images.append(distorted.unsqueeze(0))
            reference_images.append(reference.unsqueeze(0))
            
        distorted_images = torch.cat(distorted_images, dim=0)
        reference_images = torch.cat(reference_images, dim=0)

        scores = self.scores[index::step]
        return distorted_images, reference_images, scores

    def __len__(self):
        return int(len(self.distorted_files) / len(self.reference_files))


In [5]:
# Test dataset

transform = albu.Compose([
#     albu.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]), # to [-1, 1]
    albu.Normalize(mean=[0., 0., 0.], std=[1., 1., 1.]), # to [0, 1]
    albu_pt.ToTensorV2(),
])

dataset = TID2013(transform=transform)
distorted_images, reference_images, scores = dataset[0]

# Predict on images and compute score

1. Prediction in form 1, 121, 241, ...
2. want to be 1, 2, 3, 4, 5, 6, 

In [7]:
# MOS scores ordered by image name
mos_path = "data/raw/tid2013/mos.txt"
# Read file mith MOS
with open(mos_path) as f:
    mos_scores = f.readlines()
mos_scores = [float(score) for score in mos_scores]

In [18]:
r = [torch.rand(4), torch.rand(4), torch.rand(4)]
r

[tensor([0.0306, 0.0226, 0.4322, 0.7475]),
 tensor([0.8428, 0.0030, 0.8655, 0.2265]),
 tensor([0.4773, 0.7080, 0.8092, 0.9406])]

AttributeError: 'list' object has no attribute 'flatten'

In [25]:
torch.stack(r).t().flatten()

tensor([0.0306, 0.8428, 0.4773, 0.0226, 0.0030, 0.7080, 0.4322, 0.8655, 0.8092,
        0.7475, 0.2265, 0.9406])

In [27]:
# Abstract iterator 
metric_scores = []
for i in tqdm(range(len(dataset))):
    distorted_images, reference_images, true_scores = dataset[0]
#     predicted_scores = metric(distorted_images, reference_images)
    predicted_scores = torch.tensor(true_scores)
    metric_scores.append(predicted_scores)

# Transpose to flatten in column-wise order
metric_scores = torch.stack(metric_scores, dim=0).t().flatten()

HBox(children=(FloatProgress(value=0.0, max=120.0), HTML(value='')))




In [None]:
def crop_patches(images: torch.Tensor, size=64, stride=32):
    """Crop input images into smaller patches
    Args:
        images: Tensor of images with shape (batch x 3 x H x W)
        size: size of a square patch
        stride: Step between patches
    """
    shape = images.shape
    patches = images.data.unfold(1, 3, 3).unfold(2, size, stride).unfold(3, size, stride)
    patches = patches.reshape(-1, 3, size, size)
    return patches

# Logistic regression

In [31]:
class Regression(nn.Module):
    def __init__(self):
        super().__init__()
        self.b1 = nn.Parameter(torch.ones(1))# * 1e-3)
        self.b2 = nn.Parameter(torch.ones(1)) #* 1e-3)
        self.b3 = nn.Parameter(torch.ones(1)) # * 1e-3)
        self.b4 = nn.Parameter(torch.ones(1)) # * 1e-3)
        self.b5 = nn.Parameter(torch.zeros(1))
    
    def forward(self, predicted_scores):
        adjusted = self.b1 * (0.5 - 1 / (1 + torch.exp(self.b2 * (predicted_scores - self.b3)))) + \
            predicted_scores * self.b4 + self.b5
        return adjusted

def pearson_correlation(x, y, invert=False):
    vx = x - torch.mean(x)
    vy = y - torch.mean(y)

    corr = torch.sum(vx * vy) / (torch.sqrt(torch.sum(vx ** 2)) * torch.sqrt(torch.sum(vy ** 2)))
    if invert: 
        return 1 - corr
    return corr

In [32]:
import numpy as np
import scipy.stats
from scipy.stats import pearsonr, spearmanr, kendalltau

x = np.arange(10, 20)
y = np.array([2, 1, 4, 5, 8, 12, 18, 25, 96, 48])
x_t = torch.tensor(x, dtype=torch.float32)
y_t = torch.tensor(y, dtype=torch.float32)

r = np.corrcoef(x, y)[0, 1]
r_t = pearson_correlation(x_t, y_t)
print(r, r_t)

print(scipy.stats.pearsonr(x, y)[0])    # Pearson's r

print(scipy.stats.spearmanr(x, y)[0])   # Spearman's rho

print(scipy.stats.kendalltau(x, y)[0])  # Kendall's tau



0.7586402890911867 tensor(0.7586)
0.758640289091187
0.9757575757575757
0.911111111111111


In [None]:
# 1. Load MOS score
# 2. Load score for FSIMc
# 3. Fit regression using MSE loss
# 4. Fit regression using PLCC loss
# 5. Compute PLCC, SRCC in both cases
# 6. Compare with values from the paper

learningRate = 0.01
epochs = 10000
    
metric_names = ["PSNRHVSM", "VIFP", "MSSIM", 
#                 "WSNR", "PSNRHMA", "FSIM", "SSIM", "PSNRc", "PSNR", "FSIMc", 
#                "PSNRHVS", "PSNRHA", "VSNR", "NQM"
               ]


for metric in metric_names:
    done = False
    metric_path = "data/raw/tid2013/metrics_values/" + metric + ".txt"
    # Read metric score
    with open(metric_path) as f:
        metric_scores = f.readlines()

    metric_scores = [float(score) for score in metric_scores]

    model = Regression()
#     criterion = torch.nn.MSELoss() 
    criterion = functools.partial(pearson_correlation, invert=True)
    optimizer = torch.optim.SGD(model.parameters(), lr=learningRate)
#     optimizer = torch.optim.AdamW(model.parameters(), lr=learningRate)
    # scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=100, cooldown=50)
    scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=learningRate, max_lr=0.05)

    prediction = torch.tensor(metric_scores)
    target = torch.tensor(mos_scores)

    # Fit regression
    for epoch in range(epochs + 1):
        if done:
            continue
        # Clear gradient 
        optimizer.zero_grad()

        # get output from the model, given the inputs
        outputs = model(prediction)

        # Get loss for the predicted output
        loss = criterion(outputs, target)
        if torch.isnan(loss):
            print("NaN in loss!")
            done = True
            continue
            
        # get gradients w.r.t to parameters
        loss.backward()

        # update parameters
        optimizer.step()
        scheduler.step(loss)
        if epoch % 5000 == 0:
            print(f'{metric} ({epoch}): {loss.item()}')
    
    if done:
        continue
    # Compute metrics:
    x = outputs.detach().numpy()
    y = mos_scores
    print(f"{metric}: PLCC {pearsonr(x, y)[0]:0.3f}, SRCC {spearmanr(x, y)[0]:0.3f}", 
          f"KRCC {kendalltau(x, y)[0]:0.3f}")

PSNRHVSM (0): 1.0107723474502563
NaN in loss!
VIFP (0): 0.3845893144607544
VIFP (5000): 0.24854415655136108
VIFP (10000): 0.2481238842010498
VIFP: PLCC 0.752, SRCC 0.658 KRCC 0.481
MSSIM (0): 0.2195185422897339
MSSIM (5000): 0.16958576440811157


In [None]:
lines = """VIFP: PLCC 0.752 SRCC 0.658 KRCC 0.481
MSSIM PLCC 0.831 SRCC 0.787 KRCC 0.608
PSNRHMA PLCC 0.799 SRCC 0.813 KRCC 0.632
FSIM PLCC 0.854 SRCC 0.801 KRCC 0.630
SSIM PLCC 0.684 SRCC 0.637 KRCC 0.464
PSNRc PLCC 0.660 SRCC 0.687 KRCC 0.496
FSIMc PLCC 0.870 SRCC 0.851 KRCC 0.667
PSNRHA PLCC 0.796 SRCC 0.819 KRCC 0.643"""
result = {}
plcc = []
srcc = []
krcc = []
for line in lines.split("\n"):
    split = line.split(" ")
    plcc.append((split[0], float(split[2])))
    srcc.append((split[0], float(split[4])))
    krcc.append((split[0], float(split[6])))
    
    result[split[0]] = (float(split[2]), float(split[4]), float(split[6]))
    
result
idx = 1
print("Посчитанные мной")
for pair in sorted(plcc, key = lambda x: x[1], reverse=True):
    print(idx, pair[0], pair[1])
    idx += 1
    

In [None]:
Данные из статьи
1  FSIMc   0.851        
 2  PSNR-HA 0.819
 3  PSNR-HMA 0.813        
 4  FSIM 0.801     
 5  MSSIM   0.787         
 6  PSNRc   0.687        
 7 SSIM 0.637                                        
 8 VIFP 0.608                     

In [None]:
# Plot graph: Ox - metric value, Oy - MOS. Blue cross - image in dataset. Black line - fitted logregression
plt.plot(metric_scores, mos_scores, "+")
x = np.arange(0.0, 1.0, 0.005)
y = model(torch.tensor(x))
plt.plot(x, y.detach().numpy())

In [None]:
FSIMc
MSE
epoch 9000, loss 0.4084590971469879
tensor(0.8559) 

PLCC
epoch 10000, loss 0.14436256885528564
tensor(0.8556)

VIFP
PLCC
tensor(0.7530)

MSE
epoch1000 loss 0.6762036681175232
tensor(0.7519)

# Predict PM lib scores, fit regression, get ranking

# Garbage

In [None]:
class HDRLoss(nn.Module):
    """High dynamic range loss."""

    def __init__(self, eps=0.01):
        """Initializes loss with numerical stability epsilon."""

        super(HDRLoss, self).__init__()
        self._eps = eps


    def forward(self, denoised, target):
        """Computes loss by unpacking render buffer."""

        loss = ((denoised - target) ** 2) / (denoised + self._eps) ** 2
        return loss.mean(dim=[1,2,3])