In [None]:
# Expose Google Drive inside the Colab runtime so the dataset archive can be read.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import zipfile
import os

zip_path = '/content/drive/MyDrive/Dataset.zip'
extract_path = '/content/data/test'

# Unpack every member of the Drive archive into the local test-data directory.
with zipfile.ZipFile(zip_path) as archive:
    archive.extractall(path=extract_path)


In [None]:
# Remove any piq distribution already installed in this environment
# (-y skips the confirmation prompt).
!pip uninstall -y piq

# Delete a previous ./piq checkout so the clone below starts from an empty path.
!rm -rf piq

# Clone the PIQ repository's default branch.
# NOTE(review): the clone is not pinned to a tag or commit, so the installed
# version depends on when this cell is run.
!git clone https://github.com/photosynthesis-team/piq.git

# Move into the checkout so `pip install .` targets it
%cd piq

# Install from source
!pip install . --quiet

# Return to the parent (working) directory
%cd ..


[0mCloning into 'piq'...
remote: Enumerating objects: 2289, done.[K
remote: Counting objects: 100% (890/890), done.[K
remote: Compressing objects: 100% (502/502), done.[K
remote: Total 2289 (delta 585), reused 635 (delta 379), pack-reused 1399 (from 1)[K
Receiving objects: 100% (2289/2289), 3.80 MiB | 9.31 MiB/s, done.
Resolving deltas: 100% (1473/1473), done.
/content/piq
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m119.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m95.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m58.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m

In [None]:
from piq import LPIPS
# Only LPIPS is imported in this cell, so the confirmation must not claim NIQE too.
print("✅ LPIPS imported successfully.")

✅ NIQE and LPIPS imported successfully.


In [None]:
import os
from math import log10
from math import exp
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torchvision.utils as utils
from torchvision.transforms import Compose, RandomCrop, ToTensor, ToPILImage, CenterCrop, Resize
from torch.autograd import Variable
from torch.utils.data import DataLoader
from tqdm import tqdm
from torch import nn
from torch.utils.data.dataset import Dataset
from PIL import Image
import math
from piq import LPIPS

In [None]:
def is_image_file(filename):
    """Return True when ``filename`` ends with a PNG/JPG/JPEG extension.

    The comparison is case-insensitive, so mixed-case suffixes such as
    ``.Jpg`` are accepted in addition to the all-lower and all-upper forms.
    """
    return filename.lower().endswith(('.png', '.jpg', '.jpeg'))

def display_transform():
    """Build the preview pipeline used when saving comparison grids.

    The steps are: tensor -> PIL image, resize to 400, centre-crop to
    400, then back to a tensor.
    """
    steps = [ToPILImage(), Resize(400), CenterCrop(400), ToTensor()]
    return Compose(steps)



In [None]:
def gaussian(window_size, sigma):
    """Return a 1-D Gaussian kernel of length ``window_size`` that sums to 1.

    The kernel is centred on index ``window_size // 2`` and uses standard
    deviation ``sigma``.
    """
    centre = window_size // 2
    two_sigma_sq = float(2 * sigma ** 2)
    weights = [exp(-((idx - centre) ** 2) / two_sigma_sq) for idx in range(window_size)]
    kernel = torch.Tensor(weights)
    return kernel / kernel.sum()


def create_window(window_size, channel):
    """Build the 2-D Gaussian window used by the SSIM convolutions.

    The window is the outer product of a 1-D Gaussian (sigma fixed at 1.5)
    with itself, expanded to shape ``(channel, 1, window_size, window_size)``
    so it can be applied with ``groups=channel``.

    The deprecated ``Variable`` wrapper is no longer used: it returns a plain
    tensor, so the result is unchanged.
    """
    kernel_1d = gaussian(window_size, 1.5).unsqueeze(1)
    kernel_2d = kernel_1d.mm(kernel_1d.t()).float().unsqueeze(0).unsqueeze(0)
    return kernel_2d.expand(channel, 1, window_size, window_size).contiguous()


def _ssim(img1, img2, window, window_size, channel, size_average=True):
    mu1 = F.conv2d(img1, window, padding=window_size // 2, groups=channel)
    mu2 = F.conv2d(img2, window, padding=window_size // 2, groups=channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    sigma1_sq = F.conv2d(img1 * img1, window, padding=window_size // 2, groups=channel) - mu1_sq
    sigma2_sq = F.conv2d(img2 * img2, window, padding=window_size // 2, groups=channel) - mu2_sq
    sigma12 = F.conv2d(img1 * img2, window, padding=window_size // 2, groups=channel) - mu1_mu2

    C1 = 0.01 ** 2
    C2 = 0.03 ** 2

    ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / ((mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    else:
        return ssim_map.mean(1).mean(1).mean(1)


class SSIM(torch.nn.Module):
    """Module wrapper around ``_ssim`` that caches its Gaussian window.

    The cached window is rebuilt whenever the input's channel count or tensor
    type differs from the one it was created for.
    """

    def __init__(self, window_size=11, size_average=True):
        super().__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        """Return the SSIM of two 4-D image batches."""
        _, channel, _, _ = img1.size()

        cache_is_valid = (
            channel == self.channel
            and self.window.data.type() == img1.data.type()
        )
        if not cache_is_valid:
            # Rebuild the window for this channel count and match img1's device/type.
            fresh = create_window(self.window_size, channel)
            if img1.is_cuda:
                fresh = fresh.cuda(img1.get_device())
            self.window = fresh.type_as(img1)
            self.channel = channel

        return _ssim(img1, img2, self.window, self.window_size, channel, self.size_average)


def ssim(img1, img2, window_size=11, size_average=True):
    """Functional SSIM: build a window matching ``img1`` and delegate to ``_ssim``."""
    _, channel, _, _ = img1.size()

    kernel = create_window(window_size, channel)
    if img1.is_cuda:
        # Place the window on the same GPU as the input.
        kernel = kernel.cuda(img1.get_device())
    kernel = kernel.type_as(img1)

    return _ssim(img1, img2, kernel, window_size, channel, size_average)

In [None]:
class Generator(nn.Module):
    """Convolutional super-resolution generator.

    Layout: a 9x9 conv + PReLU stem (``block1``), five residual blocks
    (``block2``-``block6``), a 3x3 conv (``block7``), a skip connection from
    the stem, then ``log2(scale_factor)`` 2x upsampling blocks followed by a
    9x9 conv back to 3 channels (``block8``).

    Args:
        scale_factor: overall upscaling factor; the number of 2x upsampling
            stages is ``int(log2(scale_factor))``.
    """

    def __init__(self, scale_factor):
        # math.log2 is the more accurate way to take a base-2 logarithm;
        # math.log(x, 2) divides two rounded logarithms and int() could then
        # truncate a result that lands just below the true integer.
        upsample_block_num = int(math.log2(scale_factor))

        super(Generator, self).__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=9, padding=4),
            nn.PReLU()
        )
        self.block2 = ResidualBlock(64)
        self.block3 = ResidualBlock(64)
        self.block4 = ResidualBlock(64)
        self.block5 = ResidualBlock(64)
        self.block6 = ResidualBlock(64)
        self.block7 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
        )
        block8 = [UpsampleBLock(64, 2) for _ in range(upsample_block_num)]
        block8.append(nn.Conv2d(64, 3, kernel_size=9, padding=4))
        self.block8 = nn.Sequential(*block8)

    def forward(self, x):
        """Upscale ``x`` and return an image tensor with values in [0, 1]."""
        block1 = self.block1(x)

        # Run the residual trunk (block2..block7) sequentially on the stem output.
        trunk = block1
        for stage in (self.block2, self.block3, self.block4,
                      self.block5, self.block6, self.block7):
            trunk = stage(trunk)

        # Global skip connection around the trunk, then upsample.
        block8 = self.block8(block1 + trunk)

        # tanh yields [-1, 1]; shift and scale into [0, 1].
        return (torch.tanh(block8) + 1) / 2

class ResidualBlock(nn.Module):
    """Two 3x3 convolutions with a PReLU in between, added back onto the input."""

    def __init__(self, channels):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(channels, channels, **conv_kwargs)
        self.prelu = nn.PReLU()
        self.conv2 = nn.Conv2d(channels, channels, **conv_kwargs)

    def forward(self, x):
        """Return ``x`` plus the conv -> PReLU -> conv branch applied to ``x``."""
        return x + self.conv2(self.prelu(self.conv1(x)))

class UpsampleBLock(nn.Module):
    """Conv -> pixel-shuffle -> PReLU upsampling stage.

    The convolution expands the channels by ``up_scale ** 2`` and the pixel
    shuffle folds them back to ``in_channels`` while enlarging the spatial
    size by ``up_scale``.
    """

    def __init__(self, in_channels, up_scale):
        super().__init__()
        expanded_channels = in_channels * up_scale ** 2
        self.conv = nn.Conv2d(in_channels, expanded_channels, kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(up_scale)
        self.prelu = nn.PReLU()

    def forward(self, x):
        """Apply the three stages in order and return the upsampled tensor."""
        return self.prelu(self.pixel_shuffle(self.conv(x)))


In [None]:
class TestDatasetFromFolder(Dataset):
    """Paired low-/high-resolution test images read from ``<dir>/SRF_<factor>/``.

    Low-resolution inputs come from the ``data/`` subfolder and targets from
    ``target/``; only image files present in both folders are used.
    """

    def __init__(self, dataset_dir, upscale_factor):
        super().__init__()
        srf_root = dataset_dir + '/SRF_' + str(upscale_factor)
        self.lr_path = srf_root + '/data/'
        self.hr_path = srf_root + '/target/'
        self.upscale_factor = upscale_factor

        # Keep only image files that also have a same-named target image.
        paired = []
        for name in os.listdir(self.lr_path):
            if not is_image_file(name):
                continue
            if os.path.exists(os.path.join(self.hr_path, name)):
                paired.append(name)
        paired.sort()
        self.filenames = paired

    def __getitem__(self, index):
        """Return ``(filename, lr, bicubic-upscaled lr, hr)`` with images as tensors."""
        name = self.filenames[index]
        lr_image = Image.open(os.path.join(self.lr_path, name)).convert('RGB')
        hr_image = Image.open(os.path.join(self.hr_path, name)).convert('RGB')

        # PIL reports (width, height); Resize expects (height, width).
        lr_w, lr_h = lr_image.size
        target_size = (self.upscale_factor * lr_h, self.upscale_factor * lr_w)
        hr_restore_img = Resize(target_size, interpolation=Image.BICUBIC)(lr_image)

        to_tensor = ToTensor()
        return name, to_tensor(lr_image), to_tensor(hr_restore_img), to_tensor(hr_image)

    def __len__(self):
        """Number of usable image pairs."""
        return len(self.filenames)

In [None]:
UPSCALE_FACTOR = 4
MODEL_NAME = "netG_epoch_4_162.pth"
METRIC_NAMES = ('psnr', 'ssim', 'lpips')
BENCHMARK_NAMES = ('Set5', 'Set14', 'BSD100', 'Urban100', 'SunHays80')


def _empty_metrics():
    """Return a fresh ``{metric: []}`` accumulator."""
    return {metric: [] for metric in METRIC_NAMES}


def _psnr_from_mse(mse):
    """PSNR in dB for a peak value of 1; a zero MSE gives ``inf`` instead of dividing by zero."""
    return float('inf') if mse == 0 else 10 * log10(1 / mse)


def _summarise(per_dataset):
    """Average each metric per dataset; datasets missing any metric get 'No data' in every column."""
    summary = _empty_metrics()
    for scores in per_dataset.values():
        complete = all(len(scores[metric]) > 0 for metric in METRIC_NAMES)
        for metric in METRIC_NAMES:
            summary[metric].append(np.array(scores[metric]).mean() if complete else 'No data')
    return summary


# Per-image scores, keyed by the dataset prefix of each file name.
results = {name: _empty_metrics() for name in BENCHMARK_NAMES}
results_bicubic = {name: _empty_metrics() for name in BENCHMARK_NAMES}

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Generator(UPSCALE_FACTOR).eval()
model = model.to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
model.load_state_dict(torch.load('/content/epochs/' + MODEL_NAME, map_location=device))

lpips_model = LPIPS(reduction='mean')
lpips_model = lpips_model.to(device)

test_set = TestDatasetFromFolder('/content/data/test/Dataset', upscale_factor=UPSCALE_FACTOR)
test_loader = DataLoader(dataset=test_set, num_workers=2, batch_size=1, shuffle=False)
test_bar = tqdm(test_loader, desc='[testing benchmark datasets]')

out_path = 'benchmark_results/SRF_' + str(UPSCALE_FACTOR) + '/'
os.makedirs(out_path, exist_ok=True)

to_display = display_transform()

# Evaluation only: disable autograd for the whole loop. The former
# Variable(..., volatile=True) wrappers no longer have any effect, so the model
# and LPIPS forward passes were building autograd graphs.
with torch.no_grad():
    for image_name, lr_image, hr_restore_img, hr_image in test_bar:
        image_name = image_name[0]

        # Bicubic baseline, computed before moving tensors to the device (as before).
        # NOTE(review): bicubic interpolation can overshoot [0, 1]; values are left
        # unclamped so the baseline numbers stay comparable - confirm this is intended.
        bicubic_image = F.interpolate(lr_image, scale_factor=UPSCALE_FACTOR, mode='bicubic', align_corners=True)

        lr_image = lr_image.to(device)
        hr_image = hr_image.to(device)
        bicubic_image = bicubic_image.to(device)

        sr_image = model(lr_image)

        mse = ((hr_image - sr_image) ** 2).mean().item()
        psnr = _psnr_from_mse(mse)
        # Store plain floats rather than tensors so nothing device-side is retained.
        lpips_score = lpips_model(sr_image, hr_image).item()
        calculated_ssim = ssim(sr_image, hr_image).item()

        mse_bic = ((hr_image - bicubic_image) ** 2).mean().item()
        psnr_bic = _psnr_from_mse(mse_bic)
        lpips_bic = lpips_model(bicubic_image, hr_image).item()
        ssim_bic = ssim(bicubic_image, hr_image).item()

        test_images = torch.stack(
            [to_display(hr_restore_img.squeeze(0)), to_display(hr_image.cpu().squeeze(0)),
             to_display(sr_image.cpu().squeeze(0)), to_display(bicubic_image.cpu().squeeze(0))])
        image = utils.make_grid(test_images, nrow=4, padding=5)

        # splitext keeps dots inside the base name, so files such as "a.b.png"
        # no longer collapse onto the same output name.
        stem, ext = os.path.splitext(image_name)
        utils.save_image(image, out_path + stem + '_psnr_sr_%.4f_ssim_sr_%.4f_psnr_bic_%.4f_ssim_bic_%.4f.' % (psnr, calculated_ssim, psnr_bic, ssim_bic) +
                         ext[1:], padding=5)

        # save psnr\ssim\lpips under the dataset prefix; a prefix outside the
        # predefined benchmarks gets its own entry instead of raising KeyError.
        dataset_name = image_name.split('_')[0]
        sr_scores = results.setdefault(dataset_name, _empty_metrics())
        sr_scores['psnr'].append(psnr)
        sr_scores['ssim'].append(calculated_ssim)
        sr_scores['lpips'].append(lpips_score)

        bic_scores = results_bicubic.setdefault(dataset_name, _empty_metrics())
        bic_scores['psnr'].append(psnr_bic)
        bic_scores['ssim'].append(ssim_bic)
        bic_scores['lpips'].append(lpips_bic)

out_path = 'statistics/'
os.makedirs(out_path, exist_ok=True)

# Per-dataset averages for the model output.
saved_results = _summarise(results)
data_frame = pd.DataFrame(saved_results, list(results.keys()))
data_frame.to_csv(out_path + 'srf_' + str(UPSCALE_FACTOR) + '_test_results.csv', index_label='DataSet')

# Per-dataset averages for the bicubic baseline.
saved_results_bicubic = _summarise(results_bicubic)
data_frame_bicubic = pd.DataFrame(saved_results_bicubic, list(results_bicubic.keys()))
data_frame_bicubic.to_csv(out_path + f'srf_{UPSCALE_FACTOR}_bicubic_results.csv', index_label='DataSet')

  lr_image = Variable(lr_image, volatile=True)
  hr_image = Variable(hr_image, volatile=True)
[testing benchmark datasets]: 100%|██████████| 19/19 [00:08<00:00,  2.17it/s]


In [None]:
import shutil

# Zip the saved comparison images; the cell displays the path of the archive.
shutil.make_archive(
    base_name='/content/benchmark_results',
    format='zip',
    root_dir='/content/benchmark_results',
)

'/content/benchmark_results.zip'