In [1]:
import os
import numpy as np

import torch
torch.manual_seed(29)
from torch import nn
from torch.autograd import Variable
import torch.backends.cudnn as cudnn
import torch.nn.parallel
cudnn.benchmark = True
from torch.utils.data import DataLoader

from torchstat import stat
from utils import  models
from utils.data import dataset_1
from utils.trainer_utils import parfilter

from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

from skimage.measure import compare_psnr, compare_ssim

from matplotlib import pyplot as plt

from glob import glob
from tqdm import tqdm

In [15]:
w, h = I.size

p = 256
s = 240

s_w = np.arange(0, w, 240)
if s_w[-1] + p >= w:
    s_w[-1] = w - p
s_h = np.arange(0, h, 240)
if s_h[-1] + p >= h:
    s_h[-1] = h - p

In [10]:
s_w

array([   0,  240,  480,  720,  960, 1200, 1440, 1680, 1792])

True

In [14]:
np.arange(0, 2048, 240)

array([   0,  240,  480,  720,  960, 1200, 1440, 1680, 1920])

In [15]:
np.arange(0, 1365, 240)

array([   0,  240,  480,  720,  960, 1200])

In [2]:
def image_quality_comp(I, Iout, Enout):
    
    for x in ['test.png', 'test_35.jpg', 'test.flif', 'test1.png', 'test1.flif', 'test_comp.png']: 
        if os.path.exists(x):os.remove(x)
    
    I.save('test.png')
    Iout.save('test_comp.png')
    I.save('test_35.jpg', quality = 35)
    Enout.save('test1.png')
    
    os.system("flif -e test.png test.flif")
    os.system("flif -e test1.png test1.flif")
    
    print('Original Image :: ')
    print('PNG     :: ' + str(os.path.getsize('test.png')/(I.size[0] * I.size[1])))
    print('JPG-35% :: ' + str(os.path.getsize('test_35.jpg')/(I.size[0] * I.size[1])))
    print('FLIF    :: ' + str(os.path.getsize('test.flif')/(I.size[0] * I.size[1])))
    print('Encoded Image :: ')
    print('PNG     :: ' + str(os.path.getsize('test1.png')/(I.size[0] * I.size[1])))
    print('FLIF    :: ' + str(os.path.getsize('test1.flif')/(I.size[0] * I.size[1])))

In [28]:
from numpy import dot, ndarray, array

def yuv2rgb(im, version='SDTV'):
    """
    Convert array-like YUV image to RGB colourspace

    version:
      - 'SDTV':  ITU-R BT.601 version  (default)
      - 'HDTV':  ITU-R BT.709 version
    """
    if not im.dtype == 'uint8':
        raise TypeError('yuv2rgb only implemented for uint8 arrays')

    # clip input to the valid range
    yuv = ndarray(im.shape)  # float64
    yuv[:,:, 0] = im[:,:, 0].clip(16, 235).astype(yuv.dtype) - 16
    yuv[:,:,1:] = im[:,:,1:].clip(16, 240).astype(yuv.dtype) - 128

    if version.upper() == 'SDTV':
        A = array([[1.,                 0.,  0.701            ],
                   [1., -0.886*0.114/0.587, -0.701*0.299/0.587],
                   [1.,  0.886,                             0.]])
        A[:,0]  *= 255./219.
        A[:,1:] *= 255./112.
    elif version.upper() == 'HDTV':
        A = array([[1.164,     0.,  1.793],
                   [1.164, -0.213, -0.533],
                   [1.164,  2.112,     0.]])
    else:
        raise Exception("Unrecognised version (choose 'SDTV' or 'HDTV')")

    rgb = dot(yuv, A.T)
    result = rgb.clip(0, 255).astype('uint8')

    return result


In [29]:
def compress_and_decompress(img, model, src_fldr, dst_fldr):
    I_org = Image.open(img).convert('RGB')
    
    w, h = I_org.size
    I = np.uint8(I_org).copy()
    I = np.pad(I, ((0, int(256*np.ceil(I.shape[0]/256) - I.shape[0])), 
                   (0, int(256*np.ceil(I.shape[1]/256) - I.shape[1])), 
                   (0, 0)), mode = "reflect")
    I = Image.fromarray(I).convert('YCbCr')
    
    
    s_ = 256
    d_ = 64
    
    I1 = np.float32(I)#[:256, :256, :]
    I1 = np.transpose(I1, [2, 0, 1])
    Iout = np.zeros_like(I1)
    for i in range(0, I1.shape[1], s_):
        for j in range(0, I1.shape[2], s_):
            It = torch.from_numpy(np.expand_dims(I1[:, i:i+s_, j:j+s_], 0))/255.0
            X = model(It.cuda())
            X = X.data.squeeze().cpu().numpy()
            Iout[:, i:i+s_, j:j+s_] = np.clip(X, 0, 1)
            
    Iout = np.uint8(255 * Iout.transpose([1, 2, 0]))
    
    Iout = yuv2rgb(Iout, version='SDTV')
    
    Iout = Image.fromarray(Iout).crop((0, 0, w, h))
    
    Iout.save(img.replace(src_fldr, dst_fldr))
    return 0

In [4]:
def perform_compression(I_org, model):
    
    w, h = I_org.size
    I = np.uint8(I_org).copy()
    I = np.pad(I, ((0, int(256*np.ceil(I.shape[0]/256) - I.shape[0])), 
                   (0, int(256*np.ceil(I.shape[1]/256) - I.shape[1])), 
                   (0, 0)), mode = "reflect")
    I = Image.fromarray(I)
    
    
    s_ = 256
    d_ = 64
    
    I1 = np.float32(I)#[:256, :256, :]
    I1 = np.transpose(I1, [2, 0, 1])
    Iout = np.zeros_like(I1)
    Enout = np.zeros((3, I1.shape[1]//4, I1.shape[2]//4))
    for i in range(0, I1.shape[1], s_):
        for j in range(0, I1.shape[2], s_):
            It = torch.from_numpy(np.expand_dims(I1[:, i:i+s_, j:j+s_], 0))/255.0
            X = model(It)
            X = X.data.squeeze().cpu().numpy()
            Iout[:, i:i+s_, j:j+s_] = np.clip(X, 0, 1)
            Xe = model.encode(It)
            Xe = (Xe + 1)/2
            Enout[:, i//4:(i+s_)//4, j//4:(j+s_)//4] = Xe.data.squeeze().cpu().numpy()
    
    
    Iout = np.uint8(255 * Iout.transpose([1, 2, 0]))
    Enout = np.uint8(255 * Enout.transpose([1, 2, 0]))
    
    Iout = Image.fromarray(Iout).crop((0, 0, w, h))
    Enout = Image.fromarray(Enout)
    
    psnr = compare_psnr(np.uint8(I_org).copy(), np.uint8(Iout).copy())
#     ssim = compare_ssim(np.uint8(I_org).copy(), np.uint8(Iout).copy())
    
    print('PSNR-PROP :: ' + "{0:0.02f}".format(psnr))
#     print('SSIM-PROP :: ' + "{0:0.02f}".format(ssim), multichannel=True)
    
    image_quality_comp(I_org, Iout, Enout)
    psnr = compare_psnr(np.uint8(I_org).copy(), np.uint8(Image.open('test_35.jpg')).copy())
    print('PSNR-JPG35 :: ' + "{0:0.02f}".format(psnr))
    
    
    return Iout, Enout

In [5]:
def perform_compression2(I_org, model):
    
    w, h = I_org.size
    I = I_org.crop((0, 0, 256*(int(np.ceil(w/256))), 256*(np.ceil(h/256))))
    
    s_ = 256
    d_ = 64
    
    I1 = np.float32(I)#[:256, :256, :]
    I1 = np.transpose(I1, [2, 0, 1])
    Iout = np.zeros_like(I1)
    Enout = np.zeros((I1.shape[1]//2, I1.shape[2]//2))
    for i in range(0, I1.shape[1], s_):
        for j in range(0, I1.shape[2], s_):
            It = torch.from_numpy(np.expand_dims(I1[:, i:i+s_, j:j+s_], 0))/255.0
            X = model(It)
            Iout[:, i:i+s_, j:j+s_] = np.clip(X.data.squeeze().cpu().numpy(), 0, 1)
            Xe = model.encode(It)
            Xe = nn.functional.pixel_shuffle((Xe + 1)/2, 2)
            Enout[i//2:(i+s_)//2, j//2:(j+s_)//2] = Xe.data.squeeze().cpu().numpy()
    
    
    Iout = np.uint8(255 * Iout.transpose([1, 2, 0]))
    Enout = np.uint8(255 * Enout)
    
    Iout = Image.fromarray(Iout).crop((0, 0, w, h))
    Enout = Image.fromarray(Enout)
    
    psnr = compare_psnr(np.uint8(I_org).copy(), np.uint8(Iout).copy())
#     ssim = compare_ssim(np.uint8(I_org).copy(), np.uint8(Iout).copy(), multichannel=True)
    
    print('PSNR-PROP :: ' + "{0:0.02f}".format(psnr))
#     print('SSIM-PROP :: ' + "{0:0.02f}".format(ssim))
    image_quality_comp(I_org, Iout, Enout)
    
    psnr = compare_psnr(np.uint8(I_org).copy(), np.uint8(Image.open('test_35.jpg')).copy())
    print('PSNR-JPG35 :: ' + "{0:0.02f}".format(psnr))
    
    
    return Iout, Enout

In [2]:
img_file = "/media/narsi/LargeData/DATASETS/superrez/clic_2019/professional_valid/valid/jared-erondu-21325.png"

In [3]:
I = Image.open(img_file).convert("RGB")

In [30]:
model = models.QuantACTShuffleV6()
check_point_file = "/media/narsi/LargeData/SP_2020/compressACT/weights/"+\
"QuantACTShuffleV6_exp01_YCbCr/model_best.pth.tar"
checkpoint = torch.load(check_point_file)
model.load_state_dict(checkpoint['state_dict'], strict = False)

<All keys matched successfully>

In [None]:
M = models.QuantACTShuffleV3()
check_point_file = "/media/narsi/LargeData/SP_2020/compressACT/weights/QuantACTShuffleV3_exp02/model_best.pth.tar"
checkpoint = torch.load(check_point_file)
M.load_state_dict(checkpoint['state_dict'], strict = False)

model = models.CleanImg(M)
check_point_file = "/media/narsi/LargeData/SP_2020/compressACT/weights/CleanImg_exp01/model_best.pth.tar"
checkpoint = torch.load(check_point_file)
model.load_state_dict(checkpoint['state_dict'], strict = False)

In [None]:
Iout, Enout = perform_compression(I, model)

In [8]:
model = models.QuantACTShuffleV6()
model.cuda()
%timeit model.encode(torch.randn(8, 3, 256, 256).cuda())
%timeit model.decode(torch.randn(8, 3, 64, 64).cuda())

8.35 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
12.7 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [9]:
model = models.QuantACTShuffleV6()
%timeit model.encode(torch.randn(8, 3, 256, 256))
%timeit model.decode(torch.randn(8, 3, 64, 64))

82.6 ms ± 295 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
374 ms ± 2.21 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
src_fldr = "/media/narsi/LargeData/DATASETS/superrez/clic_2019/professional_valid/valid"
dst_fldr = "/media/narsi/LargeData/DATASETS/superrez/clic_2019/professional_valid/model_test"

imgs = glob(src_fldr + os.sep + "*.png")

In [31]:
model.cuda()
print('.')

.


In [33]:
for img in tqdm(imgs):
    compress_and_decompress(img, model, src_fldr, dst_fldr)

100%|██████████| 41/41 [00:49<00:00,  1.21s/it]


In [34]:
compressed_imgs = [img.replace(src_fldr, dst_fldr) for img in imgs]

In [35]:
from utils.competition_eval import evaluate2

In [36]:
results = evaluate2(compressed_imgs, imgs)

41it [01:32,  2.26s/it]


In [37]:
results

{'PSNR': 25.887533405937198, 'MSSSIM': 0.9490569508571586}

In [18]:
results

{'PSNR': 29.974277397745475, 'MSSSIM': 0.9699624670205289}