In [1]:
import torch
from torch.utils.data import Dataset, DataLoader

import numpy as np
import cv2
import os
from tqdm import tqdm
from PIL import Image

import torchvision
import torchvision.transforms as tfs
from torchvision.transforms import ToPILImage
from torchvision.transforms import functional as FF
from torch.autograd import Variable
import torch.nn as nn
import math
import random
import pandas as pd
from glob import glob
from sklearn.model_selection import train_test_split

In [2]:
# Hyper-parameters read by the data split, the data loaders and the training loop.
config = {
    'batch_size': 64,
    'lr': 1e-3,
    'epochs': 100,
    'test_size': 0.2,
}

In [3]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [4]:
# Dataset root; the cells below read its "hazy" and "trans" sub-folders.
_DATA_PATH = '../input/indoor-training-set-its-residestandard'

In [5]:
# Collect every hazy image. glob() returns files in arbitrary (filesystem)
# order, so sort them first: otherwise the seeded split below could differ
# from one machine to the next even though random_state is fixed.
list_hazy = sorted(glob(os.path.join(_DATA_PATH, "hazy", '*.png')))
df_ = pd.DataFrame({'hazy': list_hazy})
df_train, df_valid = train_test_split(df_, test_size = config['test_size'], random_state=42)
# NOTE(review): the split is per file. RESIDE_Dataset maps a hazy file to its
# target through the first two '_'-separated tokens of the name, so several
# hazy files may share one target and end up on both sides of the split --
# confirm whether a per-scene (grouped) split is wanted.
# assign() returns new frames instead of writing into the objects handed back
# by train_test_split, which can trigger pandas' SettingWithCopyWarning.
df_train = df_train.assign(is_train=True)
df_valid = df_valid.assign(is_train=False)
# reset_index() keeps the pre-split row position in an "index" column.
df_ = pd.concat([df_train, df_valid], axis=0).reset_index()
df_

In [6]:
class RESIDE_Dataset(Dataset):
    """Pairs the saturation/value channels of a hazy image with its target map.

    Every item is a tuple ``(s, v, depth)``:

    * ``s`` and ``v`` -- the saturation and value channels of the hazy image
      after a BGR->HSV conversion, divided by 255.
    * ``depth`` -- the image stored in ``<path>/trans`` for the same scene,
      converted with ``ToTensor`` and squeezed.

    Args:
        path: Dataset root; target maps are read from its ``trans`` sub-folder.
        df_haze_imgs: Table whose ``hazy`` column lists the hazy image paths.
        size: Stored on the instance; this class does not use it to resize.
        format: File extension (including the dot) of the maps in ``trans``.
    """

    def __init__(self, path, df_haze_imgs, size=240, format='.png'):
        super(RESIDE_Dataset, self).__init__()
        self.size = size
        self.format = format
        self.haze_imgs = list(df_haze_imgs['hazy'])
        self.depth_imgs_dir = os.path.join(path,'trans').replace('\\','/')

    def _depth_path(self, haze_fname):
        """Return the path of the target map that belongs to ``haze_fname``.

        The map is named after the first two ``_``-separated tokens of the
        hazy file's base name. Both ``/`` and ``\\`` are accepted as directory
        separators, matching the normalisation applied to ``depth_imgs_dir``.
        """
        base_name = haze_fname.replace('\\', '/').rsplit('/', 1)[-1]
        scene_id = '_'.join(base_name.split('_')[:2])
        return os.path.join(self.depth_imgs_dir, scene_id + self.format)

    def __getitem__(self, index):
        """Load item ``index`` and return ``(s, v, depth)``.

        Raises:
            FileNotFoundError: if the hazy image cannot be read.
        """
        haze_fname = self.haze_imgs[index]

        haze = cv2.imread(haze_fname)
        if haze is None:
            # cv2.imread reports a missing/unreadable file by returning None.
            raise FileNotFoundError(f'Could not read hazy image: {haze_fname}')
        hazeHSV = cv2.cvtColor(haze, cv2.COLOR_BGR2HSV)
        s = torch.tensor(hazeHSV[:,:,1] / 255.0)
        v = torch.tensor(hazeHSV[:,:,2] / 255.0)

        # The context manager closes the file handle once the pixels are copied.
        with Image.open(self._depth_path(haze_fname)) as depth_img:
            depth = tfs.ToTensor()(depth_img)
        depth = torch.squeeze(depth)
        return s, v, depth

    def __len__(self):
        """Number of hazy images in this dataset."""
        return len(self.haze_imgs)

In [7]:
# One dataset per split, selected through the `is_train` flag column.
train_ds = RESIDE_Dataset(_DATA_PATH, df_.loc[df_['is_train'] == True])
valid_ds = RESIDE_Dataset(_DATA_PATH, df_.loc[df_['is_train'] == False])

In [8]:
# Training batches are reshuffled every epoch so the gradient steps do not see
# the exact same batch composition each time; the explicitly seeded generator
# keeps the shuffling reproducible under Restart & Run All.
train_dl = DataLoader(
    train_ds,
    batch_size = config['batch_size'],
    shuffle = True,
    generator = torch.Generator().manual_seed(42),
)
# Validation order does not affect the averaged loss, so it stays sequential.
valid_dl = DataLoader(valid_ds, batch_size = config['batch_size'])

In [9]:
# Pull a single batch so the following cells can inspect shapes and value ranges.
# next(iter(...)) fails right here on an empty loader instead of leaving
# s / v / d undefined for the cells that use them.
s, v, d = next(iter(train_dl))

In [10]:
# Shapes of the saturation, value and target batches, in that order.
tuple(batch.shape for batch in (s, v, d))

In [11]:
# Smallest and largest value found in the target batch.
(torch.min(d), torch.max(d))

In [12]:
def init_model():
    """Create the three trainable coefficients of the linear model.

    Returns:
        tuple: ``(theta_0, theta_1, theta_2)`` -- one-element float tensors with
        ``requires_grad=True``, initialised to 0, 1 and -1 respectively.
    """
    # torch.autograd.Variable is a deprecated wrapper; plain tensors track
    # gradients themselves when created with requires_grad=True.
    initial_values = (0., 1., -1.)
    return tuple(torch.tensor([value], requires_grad=True) for value in initial_values)

def forward_model(model, v, s):
    """Evaluate ``theta_0 + theta_1 * v + theta_2 * s``.

    Args:
        model: Triple ``(theta_0, theta_1, theta_2)``.
        v: Operand multiplied by ``theta_1``.
        s: Operand multiplied by ``theta_2``.

    Returns:
        tuple: ``(prediction, model)``; ``model`` is handed back untouched.
    """
    bias, weight_v, weight_s = model
    prediction = bias + weight_v * v + weight_s * s
    return prediction, model

def mse(t1, t2):
    """Mean squared error: sum of squared differences divided by the element count."""
    residual = t1 - t2
    squared_error = residual * residual
    return torch.sum(squared_error) / residual.numel()

def optimizer(theta, lr = config['lr']):
    """Apply one in-place gradient-descent step to ``theta`` and clear its gradient.

    Args:
        theta: Tensor whose ``.grad`` holds the gradient of the current loss.
        lr: Step size. The default is read from ``config['lr']`` once, when
            this function is defined; later changes to ``config`` are not seen.

    A tensor that has not received a gradient yet (``grad is None``) is left
    unchanged.
    """
    if theta.grad is None:
        return
    # An in-place update of a leaf tensor that requires grad is only allowed
    # with autograd disabled; guarding it here makes the function safe to call
    # on its own (nesting inside an outer no_grad block is harmless).
    with torch.no_grad():
        theta -= theta.grad * lr
    theta.grad.zero_()
    
def evaluate(model, v, s, gt):
    """Score the linear model's prediction against ``gt`` with PSNR and SSIM.

    Args:
        model: Triple ``(theta_0, theta_1, theta_2)``; torch tensors or
            NumPy/scalar values.
        v: Operand multiplied by ``theta_1`` (tensor or array).
        s: Operand multiplied by ``theta_2`` (tensor or array).
        gt: Reference map. Like the prediction it is scaled by 255, so values
            are presumably expected in [0, 1] -- TODO confirm with callers.

    Returns:
        tuple: ``(psnr, ssim)`` as returned by ``compute_psnr`` and
        ``compute_ssim`` on the 8-bit versions of ``gt`` and the prediction.
    """
    def _as_numpy(x):
        # Torch tensors have no .astype(); detach them from autograd and move
        # them to host memory before handing them to the NumPy-based metrics.
        if torch.is_tensor(x):
            return x.detach().cpu().numpy()
        return np.asarray(x)

    def _as_uint8(x):
        # Clip before the cast: an unclipped uint8 conversion wraps values
        # outside [0, 255] instead of saturating them.
        return np.clip(x * 255, 0, 255).astype(np.uint8)

    theta_0, theta_1, theta_2 = (_as_numpy(theta) for theta in model)
    d_hat = theta_0 + theta_1 * _as_numpy(v) + theta_2 * _as_numpy(s)
    d_hat = _as_uint8(d_hat)
    gt = _as_uint8(_as_numpy(gt))
    psnr = compute_psnr(gt, d_hat)
    ssim_value = compute_ssim(gt, d_hat)
    return psnr, ssim_value

In [13]:
def compute_psnr(img_origin, img_denoise):
    """Peak signal-to-noise ratio (dB) between two images on a 0-255 scale.

    Args:
        img_origin: Reference image (array-like).
        img_denoise: Image to compare; same shape as ``img_origin``.

    Returns:
        ``100`` when the images are identical, otherwise
        ``20 * log10(255 / sqrt(MSE))``.
    """
    # Work in float64: with uint8 inputs both the subtraction and the square
    # wrap modulo 256, which yields a wrong (possibly zero) MSE.
    reference = np.asarray(img_origin, dtype=np.float64)
    estimate = np.asarray(img_denoise, dtype=np.float64)
    mse = np.mean((reference - estimate) ** 2)
    if mse == 0:
        return 100
    max_pixel = 255.0
    return 20 * np.log10(max_pixel / np.sqrt(mse))

def ssim(img1, img2):
    """Mean structural similarity of two images on a 0-255 scale.

    Local statistics are taken with an 11x11 Gaussian window (sigma 1.5); a
    5-pixel border is dropped from every filtered map before averaging.
    """
    first = img1.astype(np.float64)
    second = img2.astype(np.float64)

    gauss_1d = cv2.getGaussianKernel(11, 1.5)
    window = np.outer(gauss_1d, gauss_1d.transpose())

    def local_mean(image):
        # Filter, then keep only the region where the window fits ("valid").
        return cv2.filter2D(image, -1, window)[5:-5, 5:-5]

    # Stabilising constants for a dynamic range of 255.
    C1 = (0.01 * 255)**2
    C2 = (0.03 * 255)**2

    mean_first = local_mean(first)
    mean_second = local_mean(second)
    mean_first_sq = mean_first**2
    mean_second_sq = mean_second**2
    mean_product = mean_first * mean_second

    var_first = local_mean(first**2) - mean_first_sq
    var_second = local_mean(second**2) - mean_second_sq
    covariance = local_mean(first * second) - mean_product

    numerator = (2 * mean_product + C1) * (2 * covariance + C2)
    denominator = (mean_first_sq + mean_second_sq + C1) * (var_first + var_second + C2)
    return (numerator / denominator).mean()


def compute_ssim(img1, img2):
    '''calculate SSIM
    the same outputs as MATLAB's
    img1, img2: [0, 255]

    Accepts 2-D images, or 3-D images whose last axis has 1 or 3 channels.
    For 3 channels the result is the mean of the per-channel SSIM values.

    Raises:
        ValueError: if the shapes differ, or the input is neither 2-D nor a
            3-D array with 1 or 3 channels on the last axis.
    '''
    if not img1.shape == img2.shape:
        raise ValueError('Input images must have the same dimensions.')
    if img1.ndim == 2:
        return ssim(img1, img2)
    if img1.ndim == 3:
        channels = img1.shape[2]
        if channels == 1:
            return ssim(np.squeeze(img1), np.squeeze(img2))
        if channels == 3:
            # Score each channel separately instead of repeating the same
            # full-image computation three times.
            per_channel = [ssim(img1[..., c], img2[..., c]) for c in range(3)]
            return np.array(per_channel).mean()
    # Reached for any other rank, and for 3-D inputs with an unsupported
    # channel count (which would otherwise fall through and yield None).
    raise ValueError('Wrong input image dimensions.')

In [14]:
def train_one(model, train_dl, device):
    """Run one pass over ``train_dl``, updating the parameters after every batch.

    Args:
        model: Triple of parameter tensors consumed by ``forward_model``.
        train_dl: Iterable yielding ``(s, v, depth)`` batches.
        device: Device the targets and predictions are moved to for the loss.

    Returns:
        tuple: ``(mean batch loss, model)``.
    """
    running_loss = 0
    for s, v, depth in tqdm(train_dl):
        target = depth.to(device)
        prediction, model = forward_model(model, v, s)
        batch_loss = mse(prediction.to(device), target)
        running_loss += batch_loss.item()
        batch_loss.backward()
        # Parameter updates must not be recorded by autograd.
        with torch.no_grad():
            for theta in model:
                optimizer(theta)
    return running_loss / len(train_dl), model

def eval_one(model, valid_dl, device):
    """Compute the mean batch loss of ``model`` over ``valid_dl``.

    Args:
        model: Triple of parameter tensors consumed by ``forward_model``.
        valid_dl: Iterable yielding ``(s, v, depth)`` batches.
        device: Device the targets and predictions are moved to for the loss.

    Returns:
        float: Sum of the per-batch MSE values divided by ``len(valid_dl)``.
    """
    losses = 0
    # Nothing is back-propagated here, so skip building the autograd graph.
    with torch.no_grad():
        for s, v, depth in tqdm(valid_dl):
            depth = depth.to(device)
            depth_hat, model = forward_model(model, v, s)
            depth_hat = depth_hat.to(device)
            losses += mse(depth_hat, depth).item()
    return losses/len(valid_dl)

In [None]:
# Fit the linear model from scratch, keeping the mean train/validation loss
# of every epoch for later inspection.
model = init_model()
train_losses, valid_losses = [], []
for epoch in range(config['epochs']):
    train_loss, model = train_one(model, train_dl, device)
    train_losses.append(train_loss)

    valid_loss = eval_one(model, valid_dl, device)
    valid_losses.append(valid_loss)

    print('Epoch-{0}'.format(epoch))
    print(f'\tTrain Loss: {train_loss:.3f} | Valid Loss: {valid_loss:.3f}')
    print('\tParameters')
    print(model)
