In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.utils import make_grid
import torchvision.utils as vutils
import matplotlib.animation as animation
from IPython.display import HTML

import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import pandas as pd
import copy
import time
import cv2 as cv
from tqdm import tqdm_notebook as tqdm
import matplotlib.image as mpimg

import torchvision.transforms.functional as TF

In [None]:
img_dir = r'/kaggle/input/kodak-data-jpg/kodak_data_jpg//'
img_list = os.listdir(img_dir)
print(len(img_list))
valid_ratio = 0.8

In [None]:
class ConvertToRGB: 
    def __call__(self, image): 
        if image.mode != 'RGB': 
            image = image.convert('RGB') 
        return image


class ImageData(Dataset):
    def __init__(self,is_train=True):
        self.is_train = is_train
        self.transform = transforms.Compose([ConvertToRGB(),
                                             transforms.Resize((218, 178)),
                                             #transforms.Grayscale(num_output_channels=3),
                                             transforms.ToTensor(),])
        self.train_index = int(valid_ratio * len(img_list))
        self.crop = transforms.CenterCrop((218,178))
    def __len__(self):
        if self.is_train:
            return self.train_index
        else:
            return len(img_list) - self.train_index -1
    def __getitem__(self, index):
        if not self.is_train:
            index = self.train_index + index
#         print("hey  "*4 + str(index))
        img = mpimg.imread(img_dir+img_list[index])
        img = self.crop(TF.to_pil_image(img))
        img = self.transform(img)
        img = (img-0.5) /0.5
#         img = (img - 255.0) / 255.0
        return img

In [None]:
batch_size=5
dataset = ImageData(is_train=False)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
device = 'cuda'

In [None]:
a = next(iter(dataloader))
print(a[0].shape)
img = a[3]
img = img *0.5 + 0.5
plt.imshow(img.permute(1,2,0))

In [None]:
# custom weights initialization called on netG and netD
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [None]:
IMG_WIDTH = 178
IMG_HEIGHT = 218
latent_size = 200

In [None]:
num_images_to_show = 24


valid_dataset = ImageData(is_train=False)
batch_size = num_images_to_show
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
valid_batch = next(iter(valid_dataloader)).to(device)
valid_batch_1 = next(iter(valid_dataloader)).to(device)


In [None]:
# Encoder Model
class Encoder(nn.Module):
    def __init__(self,num_channels_in_encoder):
        super(Encoder, self).__init__()
        
        # ENCODER

        # 64x64x64
        self.e_conv_1 = nn.Sequential(
            nn.ZeroPad2d((1, 2, 1, 2)),
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(5, 5), stride=(2, 2)),nn.LeakyReLU()
        )

        # 128x32x32
        self.e_conv_2 = nn.Sequential(
            nn.ZeroPad2d((1, 2, 1, 2)),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(5, 5), stride=(2, 2)),
            nn.LeakyReLU()
        )
        
        # 128x32x32
        self.e_block_1 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x32x32
        self.e_block_2 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x32x32
        self.e_block_3 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 32x32x32
        self.e_conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=num_channels_in_encoder, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2)),
            nn.Tanh()
        )
    def forward(self, x):
        ec1 = self.e_conv_1(x)
        ec2 = self.e_conv_2(ec1)
        eblock1 = self.e_block_1(ec2) + ec2
        eblock2 = self.e_block_2(eblock1) + eblock1
        eblock3 = self.e_block_3(eblock2) + eblock2
        ec3 = self.e_conv_3(eblock3)  # in [-1, 1] from tanh activation
        return ec3

In [None]:
device

In [None]:
# Generator / Decoder Model

class Generator(nn.Module):
    def __init__(self,num_channels_in_encoder):
        super(Generator, self).__init__()
        
        # DECODER
#         self.latent_fc1 = nn.Sequential(
#             nn.Linear(latent_size,1000),
#             nn.Sigmoid(),
#         )
#         self.latent_fc2 = nn.Sequential(
#             nn.Linear(1000,54*44),
#             nn.Sigmoid(),
#         )
        # 128x64x64
        self.d_up_conv_1 = nn.Sequential(
        nn.Conv2d(in_channels=num_channels_in_encoder, out_channels=64, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.ConvTranspose2d(in_channels=64, out_channels=128, kernel_size=(2, 2), stride=(2, 2))
        )

        # 128x64x64
        self.d_block_1 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x64x64
        self.d_block_2 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x64x64
        self.d_block_3 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 256x128x128
        self.d_up_conv_2 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=32, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.ConvTranspose2d(in_channels=32, out_channels=256, kernel_size=(2, 2), stride=(2, 2))
        )

        # 3x128x128
        self.d_up_conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=16, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ReflectionPad2d((3, 3, 3, 3)),
            nn.Conv2d(in_channels=16, out_channels=3, kernel_size=(3, 3), stride=(1, 1)),
            nn.Tanh()
        )

        
        
    def forward(self, x):
        uc1 = self.d_up_conv_1(x)
        dblock1 = self.d_block_1(uc1) + uc1
        dblock2 = self.d_block_2(dblock1) + dblock1
        dblock3 = self.d_block_3(dblock2) + dblock2
        uc2 = self.d_up_conv_2(dblock3)
        dec = self.d_up_conv_3(uc2)
        return dec

In [None]:
num_channels_in_encoder = 18
netE28 = Encoder(num_channels_in_encoder).to(device)
netE28.apply(weights_init)

num_channels_in_encoder = 18
netG28 = Generator(num_channels_in_encoder).to(device)
netG28.apply(weights_init)

netG28.load_state_dict(torch.load("/kaggle/input/gan-model/netG18.model"))
netE28.load_state_dict(torch.load("/kaggle/input/gan-model/netE18.model"))


netE28.eval()
netG28.eval()


reconstructed_img_28 = netG28(netE28(valid_batch))
reconstructed_img_28_1 = netG28(netE28(valid_batch_1))




del netE28
del netG28
torch.cuda.empty_cache()

In [None]:
num_channels_in_encoder = 16
netG16 = Generator(num_channels_in_encoder).to(device)
netG16.apply(weights_init)

num_channels_in_encoder = 16
netE16 = Encoder(num_channels_in_encoder).to(device)
netE16.apply(weights_init)

netG16.load_state_dict(torch.load("/kaggle/input/trained-image-compressionmodels/netG16.model"))
netE16.load_state_dict(torch.load("/kaggle/input/trained-image-compressionmodels/netE16.model"))


netE16.eval()
netG16.eval()
reconstructed_img_16 = netG16(netE16(valid_batch))
reconstructed_img_16_1 = netG16(netE16(valid_batch_1))

del netE16
del netG16
torch.cuda.empty_cache()

In [None]:
num_channels_in_encoder = 8
netE8 = Encoder(num_channels_in_encoder).to(device)
netE8.apply(weights_init)


netG8 = Generator(num_channels_in_encoder).to(device)
netG8.apply(weights_init)


netG8.load_state_dict(torch.load("/kaggle/input/trained-image-compressionmodels/netG8.model"))
netE8.load_state_dict(torch.load("/kaggle/input/trained-image-compressionmodels/netE8.model"))


netG8.eval()
netE8.eval()

reconstructed_img_8 = netG8(netE8(valid_batch))
reconstructed_img_8_1 = netG8(netE8(valid_batch_1))

del netE8
del netG8
torch.cuda.empty_cache()

In [None]:

f, axarr = plt.subplots(4,4)

axarr[0,0].title.set_text('Original \n Image')
axarr[0,1].title.set_text('Reconstructed Image with \n 43% Compression')
axarr[0,2].title.set_text('Reconstructed Image with \n 68% Compression')
axarr[0,3].title.set_text('Reconstructed Image with \n 84% Compression')

for i in range(4):
    axarr[0,i].title.set_fontsize(15)

for i in range(4):
    axarr[i,0].imshow((valid_batch[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5)
    axarr[i,1].imshow((reconstructed_img_28[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5)
    axarr[i,2].imshow((reconstructed_img_16[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5)
    axarr[i,3].imshow((reconstructed_img_8[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5)
    f.set_figheight(20)
    f.set_figwidth(20)
plt.show()

In [None]:
plt.savefig('results.png')
f.savefig('results.png')

In [None]:
f, axarr = plt.subplots(2,2)

axarr[0,0].title.set_text('Original \n Image')
axarr[0,1].title.set_text('Reconstructed Image with \n 43% Compression')
axarr[1,0].title.set_text('Reconstructed Image with \n 68% Compression')
axarr[1,1].title.set_text('Reconstructed Image with \n 84% Compression')

for i in range(2):
    for j in range(2):
        axarr[i,j].title.set_fontsize(40)
i = 0


reimg = (valid_batch_1[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5
reimg_28 = (reconstructed_img_28_1[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5
reimg_16 = (reconstructed_img_16_1[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5
reimg_8 = (reconstructed_img_8_1[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5



axarr[0,0].imshow(reimg)
axarr[0,1].imshow(reimg_28)
axarr[1,0].imshow(reimg_16)
axarr[1,1].imshow(reimg_8)
f.set_figheight(50)
f.set_figwidth(50)
plt.show()

In [None]:
plt.savefig('results1.png')
f.savefig('results1.png')

In [None]:
import torch
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
from math import exp

def gaussian(window_size, sigma):
    gauss = torch.Tensor([exp(-(x - window_size//2)**2/float(2*sigma**2)) for x in range(window_size)])
    return gauss/gauss.sum()

def create_window(window_size, channel):
    _1D_window = gaussian(window_size, 1.5).unsqueeze(1)
    _2D_window = _1D_window.mm(_1D_window.t()).float().unsqueeze(0).unsqueeze(0)
    window = Variable(_2D_window.expand(channel, 1, window_size, window_size).contiguous())
    return window

def _ssim(img1, img2, window, window_size, channel, size_average = True):
    mu1 = F.conv2d(img1, window, padding = window_size//2, groups = channel)
    mu2 = F.conv2d(img2, window, padding = window_size//2, groups = channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1*mu2

    sigma1_sq = F.conv2d(img1*img1, window, padding = window_size//2, groups = channel) - mu1_sq
    sigma2_sq = F.conv2d(img2*img2, window, padding = window_size//2, groups = channel) - mu2_sq
    sigma12 = F.conv2d(img1*img2, window, padding = window_size//2, groups = channel) - mu1_mu2

    C1 = 0.01**2
    C2 = 0.03**2

    ssim_map = ((2*mu1_mu2 + C1)*(2*sigma12 + C2))/((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    else:
        return ssim_map.mean(1).mean(1).mean(1)

class SSIM(torch.nn.Module):
    def __init__(self, window_size = 11, size_average = True):
        super(SSIM, self).__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        (_, channel, _, _) = img1.size()

        if channel == self.channel and self.window.data.type() == img1.data.type():
            window = self.window
        else:
            window = create_window(self.window_size, channel)
            
            if img1.is_cuda:
                window = window.cuda(img1.get_device())
            window = window.type_as(img1)
            
            self.window = window
            self.channel = channel


        return _ssim(img1, img2, window, self.window_size, channel, self.size_average)

def ssim(img1, img2, window_size = 11, size_average = True):
    (_, channel, _, _) = img1.size()
    window = create_window(window_size, channel)
    
    if img1.is_cuda:
        window = window.cuda(img1.get_device())
    window = window.type_as(img1)
    
    return _ssim(img1, img2, window, window_size, channel, size_average)

In [None]:
reimg = reimg.view(1,reimg.shape[0],reimg.shape[1],reimg.shape[2])
reimg_28 = reimg_28.view(1,reimg_28.shape[0],reimg_28.shape[1],reimg_28.shape[2])
reimg_16 = reimg_16.view(1,reimg_16.shape[0],reimg_16.shape[1],reimg_16.shape[2])
reimg_8 = reimg_8.view(1,reimg_8.shape[0],reimg_8.shape[1],reimg_8.shape[2])

In [None]:
# reimg = int(reimg.view(1,reimg.shape[0],reimg.shape[1],reimg.shape[2]) * 256)
# reimg_28 = int(reimg_28.view(1,reimg_28.shape[0],reimg_28.shape[1],reimg_28.shape[2]) *256)
# reimg_16 = int(reimg_16.view(1,reimg_16.shape[0],reimg_16.shape[1],reimg_16.shape[2]) *256)
# reimg_8 = int(reimg_8.view(1,reimg_8.shape[0],reimg_8.shape[1],reimg_8.shape[2]) *256)

In [None]:
# import torch
# from torch.autograd import Variable

# img1 = Variable(torch.rand(1, 1, 256, 256))
# img2 = Variable(torch.rand(1, 1, 256, 256))

# if torch.cuda.is_available():
#     img1 = img1.cuda()
#     img2 = img2.cuda()


# reimg_28 = reimg_28.view(1,reimg_28.shape[0],reimg_28.shape[1],reimg_28.shape[2])
# reimg = reimg.view(1,reimg.shape[0],reimg.shape[1],reimg.shape[2])
# reimg = reimg.view(1,reimg.shape[0],reimg.shape[1],reimg.shape[2])
print(ssim(reimg, reimg_28))
print(ssim(reimg, reimg_16))
print(ssim(reimg, reimg_8))

In [None]:
reimg.shape

In [None]:
compressed_sizes

In [None]:
def compute_psnr(a, b):
    mse = torch.mean((a - b)**2).item()
    return -10 * math.log10(mse)

def compute_bitrate(compressed_size, original_size):
    bit_rate = compressed_size * 8 / original_size # 乘以8将字节转换为比特
    return bit_rate



for i in range(num_images_to_show):

    original_images = (valid_batch[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5
    compressed_images = (reconstructed_img_28[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5
    #compressed_sizes = compressed_images.size()
    
    original_images = original_images.view(
        1,
        original_images.shape[0],
        original_images.shape[1],
        original_images.shape[2])
    compressed_images = compressed_images.view(
        1,
        compressed_images.shape[0],
        compressed_images.shape[1],
        compressed_images.shape[2])
    

    #original_images = original_images.to(device)
    #compressed_images = compressed_images.to(device)
    

    # 计算PSNR和SSIM
    psnr = compute_psnr(original_images, compressed_images)
    ms_ssim = -10 * math.log10(1-ssim(original_images, compressed_images))

    
    # 计算Bit-rate，这需要你知道压缩后的图像大小
    # 下面的示例假设`compressed_sizes`是一个包含每张图片压缩后大小的tensor
    original_size = original_images.nelement() * original_images.element_size()  # 总字节数
    compressed_sizes = compressed_images.nelement() * compressed_images.element_size()
    bit_rate = compute_bitrate(compressed_sizes, original_size)
    
    # 输出结果
    print(f'PSNR: {psnr:.2f}dB')
    print(f'MS-SSIM: {ms_ssim:.4f}dB')
    print(f'Bit-rate: {bit_rate:.3f}bpp')

In [None]:
import math

def compute_psnr(a, b):
    mse = torch.mean((a - b)**2).item()
    return -10 * math.log10(mse)


def compute_msssim(a, b):
    return -10 * math.log10(1-ssim(a, b).item())

def compute_bpp(a, b):
    size = b.size()
    num_pixels = size[0] * size[2] * size[3]
    return sum(torch.log(likelihoods).sum() / (-math.log(2) * num_pixels)
              for likelihoods in a.values()).item()


for i in range(num_images_to_show):

    original_images = (valid_batch[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5
    compressed_images = (reconstructed_img_28[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5
    #compressed_sizes = compressed_images.size()
    
    
    original_images = original_images.view(
        1,
        original_images.shape[0],
        original_images.shape[1],
        original_images.shape[2])
    compressed_images = compressed_images.view(
        1,
        compressed_images.shape[0],
        compressed_images.shape[1],
        compressed_images.shape[2])
    

    #original_images = original_images.to(device)
    #compressed_images = compressed_images.to(device)
    

    # 计算PSNR和SSIM
    psnr = compute_psnr(original_images, compressed_images)
    ms_ssim = compute_msssim(original_images, compressed_images)
    
    # 计算Bit-rate，这需要你知道压缩后的图像大小
    # 下面的示例假设`compressed_sizes`是一个包含每张图片压缩后大小的tensor
    original_size = original_images.nelement() * original_images.element_size()  # 总字节数
    compressed_sizes = compressed_images.nelement() * compressed_images.element_size()
    bit_rate = compute_bpp(original_images, compressed_images)
    
    # 输出结果
    print(f'PSNR: {psnr:.2f}dB')
    print(f'MS-SSIM: {ms_ssim:.4f}')
    print(f'Bit-rate: {bit_rate:.3f}bpp')

In [None]:
def compute_bpp(out_net):
    size = out_net['x_hat'].size()
    num_pixels = size[0] * size[2] * size[3]
    return sum(torch.log(likelihoods).sum() / (-math.log(2) * num_pixels)
              for likelihoods in out_net['likelihoods'].values()).item()

i=1
original_images = (valid_batch[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5
compressed_images = (reconstructed_img_28[i].cpu().detach().permute(1, 2, 0) *0.5) + 0.5
original_images = original_images.view(
    1,
    original_images.shape[0],
    original_images.shape[1],
    original_images.shape[2])
compressed_images = compressed_images.view(
    1,
    compressed_images.shape[0],
    compressed_images.shape[1],
    compressed_images.shape[2])

In [None]:
size = compressed_images.size()
num_pixels = size[0] * size[2] * size[3]
num_pixels

In [None]:
((valid_batch[i].cpu().detach().permute(1, 2, 0) * 0.5) + 0.5).values()