In [None]:
# Mount Google Drive under /content/drive; the dataset and checkpoint paths
# used by the run cells of this notebook all live below that mount point.
from google.colab import drive
drive.mount('/content/drive')

network folders:

In [None]:
# architecture.py
import torch.nn as nn
def get_batchnorm_layer(opts):
    """Return the normalization layer class selected by ``opts.norm_layer``.

    Args:
        opts: options object exposing a ``norm_layer`` string attribute.

    Returns:
        ``nn.BatchNorm2d`` for ``"batch"`` or ``nn.InstanceNorm2d`` for
        ``"spectral_instance"``. The class itself is returned, not an instance.

    Raises:
        NotImplementedError: if ``opts.norm_layer`` is any other value.
    """
    if opts.norm_layer == "batch":
        return nn.BatchNorm2d
    # Both branches must inspect the same attribute; reading a different one
    # here would make the instance-norm option unreachable.
    if opts.norm_layer == "spectral_instance":
        return nn.InstanceNorm2d
    # Raise instead of calling exit(): terminating the interpreter from a
    # helper would tear down the whole notebook kernel.
    raise NotImplementedError(
        "norm layer %r is not implemented" % (opts.norm_layer,))

def get_conv2d_layer(in_c, out_c, k, s, p=0, dilation=1, groups=1):
    """Build an ``nn.Conv2d`` from short-hand arguments.

    Args:
        in_c: number of input channels.
        out_c: number of output channels.
        k: kernel size.
        s: stride.
        p: padding (default 0).
        dilation: dilation factor (default 1).
        groups: number of channel groups (default 1).

    Returns:
        The configured ``nn.Conv2d`` module.
    """
    conv_kwargs = dict(
        in_channels=in_c,
        out_channels=out_c,
        kernel_size=k,
        stride=s,
        padding=p,
        dilation=dilation,
        groups=groups,
    )
    return nn.Conv2d(**conv_kwargs)

def get_deconv2d_layer(in_c, out_c, k=1, s=1, p=1):
    """Build an upsample-then-convolve block.

    The block first doubles the spatial size with bilinear interpolation and
    then applies a single ``nn.Conv2d``.

    Args:
        in_c: number of input channels of the convolution.
        out_c: number of output channels of the convolution.
        k: kernel size (default 1).
        s: stride (default 1).
        p: padding (default 1).

    Returns:
        ``nn.Sequential`` holding the upsampler (index 0) and the conv (index 1).
    """
    upsampler = nn.Upsample(scale_factor=2, mode="bilinear")
    projection = nn.Conv2d(
        in_channels=in_c,
        out_channels=out_c,
        kernel_size=k,
        stride=s,
        padding=p,
    )
    return nn.Sequential(upsampler, projection)

class Identity(nn.Module):
    """Pass-through module whose ``forward`` returns its input unchanged."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # No computation: hand back the very same object.
        return x

In [None]:
# illumination_adjustment
import torch.nn as nn
class Adjust_naive(nn.Module):
    """Illumination adjustment network made of four 5x5 convolutions.

    ``forward`` concatenates ``l`` and ``alpha`` along the channel axis (the
    first convolution expects 2 input channels in total), applies the
    convolutions with LeakyReLU(0.2) in between, and finishes with a ReLU so
    the single-channel output is non-negative.

    Args:
        opt: accepted for interface compatibility; not read by this class.
    """

    def __init__(self, opt):
        super().__init__()
        shared = dict(k=5, s=1, p=2)  # same kernel/stride/padding everywhere
        self.conv1 = get_conv2d_layer(in_c=2, out_c=32, **shared)
        self.conv2 = get_conv2d_layer(in_c=32, out_c=32, **shared)
        self.conv3 = get_conv2d_layer(in_c=32, out_c=32, **shared)
        self.conv4 = get_conv2d_layer(in_c=32, out_c=1, **shared)
        self.leaky_relu = nn.LeakyReLU(0.2)
        self.relu = nn.ReLU()

    def forward(self, l, alpha):
        """Return the adjusted map computed from ``l`` and ``alpha``."""
        features = self.conv1(torch.cat([l, alpha], dim=1))
        # Each remaining conv is preceded by the shared LeakyReLU.
        for conv in (self.conv2, self.conv3, self.conv4):
            features = conv(self.leaky_relu(features))
        return self.relu(features)

In [None]:
#illumination_enhance
class Illumination_Alone(nn.Module):
    """Illumination network operating on a single-channel map.

    Four 5x5 convolutions, each followed by an in-place LeakyReLU(0.2), are
    followed by a 1x1 convolution and a ReLU that yield one output channel.

    Args:
        opts: options object; stored on the instance as ``self.opts``.
    """

    def __init__(self, opts):
        super().__init__()
        self.opts = opts
        wide = dict(k=5, s=1, p=2)
        self.conv1 = get_conv2d_layer(in_c=1, out_c=32, **wide)
        self.conv2 = get_conv2d_layer(in_c=32, out_c=32, **wide)
        self.conv3 = get_conv2d_layer(in_c=32, out_c=32, **wide)
        self.conv4 = get_conv2d_layer(in_c=32, out_c=32, **wide)
        self.conv5 = get_conv2d_layer(in_c=32, out_c=1, k=1, s=1, p=0)

        self.leaky_relu_1 = nn.LeakyReLU(0.2, inplace=True)
        self.leaky_relu_2 = nn.LeakyReLU(0.2, inplace=True)
        self.leaky_relu_3 = nn.LeakyReLU(0.2, inplace=True)
        self.leaky_relu_4 = nn.LeakyReLU(0.2, inplace=True)
        self.relu = nn.ReLU()

    def forward(self, l):
        """Run ``l`` through the conv stack and return the ReLU-clamped result."""
        stages = (
            (self.conv1, self.leaky_relu_1),
            (self.conv2, self.leaky_relu_2),
            (self.conv3, self.leaky_relu_3),
            (self.conv4, self.leaky_relu_4),
        )
        hidden = l
        for conv, activation in stages:
            hidden = activation(conv(hidden))
        return self.relu(self.conv5(hidden))


In [None]:
#math_module

class P(nn.Module):
    """Closed-form update for P.

    Solves the least-squares problem
        min_P ||I - P*Q||^2 + gamma * ||P - R||^2
    element-wise, giving
        P* = (I*Q + gamma*R) / (gamma + Q*Q)
    """

    def __init__(self):
        super().__init__()

    def forward(self, I, Q, R, gamma):
        """Return ``(I*Q + gamma*R) / (gamma + Q*Q)`` computed element-wise."""
        numerator = I * Q + gamma * R
        denominator = gamma + Q * Q
        return numerator / denominator
        
class Q(nn.Module):
    """Closed-form update for Q.

    Solves
        min_Q ||I - P*Q||^2 + lamda * ||Q - L||^2
    where the data term is summed over the first three channels of ``I`` and
    ``P``, giving
        Q* = (sum_c I_c*P_c + lamda*L) / (sum_c P_c*P_c + lamda)
    """

    def __init__(self):
        super().__init__()

    def forward(self, I, P, L, lamda):
        """Return the updated Q; channel slices keep a size-1 channel axis."""
        # Split channels 0, 1, 2 (indexed along dim 1) of both inputs.
        i_r, i_g, i_b = (I[:, c:c + 1, :, :] for c in range(3))
        p_r, p_g, p_b = (P[:, c:c + 1, :, :] for c in range(3))

        numerator = i_r * p_r + i_g * p_g + i_b * p_b + lamda * L
        denominator = (p_r * p_r + p_g * p_g + p_b * p_b) + lamda
        return numerator / denominator

In [None]:
# restoration
class HalfDnCNNSE(nn.Module):
    """Residual restoration network with a squeeze-and-excitation front end.

    When ``opts.concat_L`` is truthy, ``r`` and ``l`` are each embedded into
    32 feature channels and concatenated to 64; otherwise ``r`` alone is
    embedded into 64 channels. The features pass through an ``SELayer``,
    five 3x3 conv + ReLU stages, and a final 3x3 conv whose 3-channel output
    is added to ``r``.

    Args:
        opts: options object; ``opts.concat_L`` is read in ``__init__`` and
            ``forward``.
    """

    def __init__(self, opts):
        super().__init__()
        self.opts = opts

        if self.opts.concat_L:
            self.conv1 = get_conv2d_layer(in_c=3, out_c=32, k=3, s=1, p=1)
            self.relu1 = nn.ReLU(inplace=True)
            self.conv2 = get_conv2d_layer(in_c=1, out_c=32, k=3, s=1, p=1)
            self.relu2 = nn.ReLU(inplace=True)
        else:
            self.conv1 = get_conv2d_layer(in_c=3, out_c=64, k=3, s=1, p=1)
            self.relu1 = nn.ReLU(inplace=True)
        self.se_layer = SELayer(channel=64)
        # conv3..conv7 / relu3..relu7 are registered under their original
        # attribute names so checkpoint keys keep matching.
        for idx in range(3, 8):
            setattr(self, f"conv{idx}",
                    get_conv2d_layer(in_c=64, out_c=64, k=3, s=1, p=1))
            setattr(self, f"relu{idx}", nn.ReLU(inplace=True))

        self.conv8 = get_conv2d_layer(in_c=64, out_c=3, k=3, s=1, p=1)

    def forward(self, r, l):
        """Return ``r`` plus the predicted 3-channel residual."""
        if self.opts.concat_L:
            r_feats = self.relu1(self.conv1(r))
            l_feats = self.relu2(self.conv2(l))
            hidden = self.se_layer(torch.cat([r_feats, l_feats], dim=1))
        else:
            hidden = self.se_layer(self.relu1(self.conv1(r)))
        for idx in range(3, 8):
            conv = getattr(self, f"conv{idx}")
            act = getattr(self, f"relu{idx}")
            hidden = act(conv(hidden))
        residual = self.conv8(hidden)
        return r + residual

class SELayer(nn.Module):
    """Squeeze-and-excitation channel gate.

    Each channel is globally average-pooled to a scalar, passed through a
    bias-free two-layer bottleneck (``channel -> channel // reduction ->
    channel``) ending in a sigmoid, and the resulting per-channel weights
    rescale the input.

    Args:
        channel: number of channels of the input feature map.
        reduction: bottleneck reduction factor (default 16).
    """

    def __init__(self, channel, reduction=16):
        super().__init__()
        bottleneck = channel // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, bottleneck, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, channel, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned gate."""
        batch, channels, _height, _width = x.size()
        pooled = self.avg_pool(x).view(batch, channels)
        gate = self.fc(pooled).view(batch, channels, 1, 1)
        return x * gate.expand_as(x)

In [None]:
#decom
class Decom(nn.Module):
    """Decomposition network producing a 3-channel and a 1-channel map.

    Three 3x3 conv + LeakyReLU(0.2) stages (32 channels) are followed by a
    3x3 conv to 4 channels and a ReLU. ``forward`` returns channels 0-2 and
    channel 3 of that output as two separate tensors.
    """

    def __init__(self):
        super().__init__()
        layers = []
        in_channels = 3
        for _ in range(3):
            layers.append(get_conv2d_layer(in_c=in_channels, out_c=32, k=3, s=1, p=1))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            in_channels = 32
        layers.append(get_conv2d_layer(in_c=32, out_c=4, k=3, s=1, p=1))
        layers.append(nn.ReLU())
        # Same layer order as a hand-written Sequential, so indices (and
        # therefore state_dict keys) are unchanged.
        self.decom = nn.Sequential(*layers)

    def forward(self, input):
        """Return ``(R, L)``: the first three channels and the fourth channel."""
        decomposed = self.decom(input)
        reflectance = decomposed[:, 0:3, :, :]
        illumination = decomposed[:, 3:4, :, :]
        return reflectance, illumination

utils.py

In [None]:
import torchvision

import numpy as np
import os

import torch
from PIL import Image



def save_TensorImg(img_tensor, path, nrow=1):
    """Write ``img_tensor`` to ``path`` using ``torchvision.utils.save_image``.

    Args:
        img_tensor: tensor forwarded unchanged to ``save_image``.
        path: destination forwarded unchanged to ``save_image``.
        nrow: forwarded as the ``nrow`` keyword (default 1).
    """
    writer = torchvision.utils.save_image
    writer(img_tensor, path, nrow=nrow)


def np_save_TensorImg(img_tensor, path):
    """Save a tensor image to ``path`` as PNG via NumPy and PIL.

    The tensor is moved to the CPU, permuted from (0, 1, 2, 3) to
    (0, 2, 3, 1) axis order, squeezed, scaled by 255, clipped to [0, 255],
    cast to ``uint8`` and written with the explicit ``'png'`` format, so the
    encoding does not depend on the extension of ``path``.

    Args:
        img_tensor: 4-D tensor (the permute call requires four axes).
        path: destination file path.
    """
    channels_last = img_tensor.cpu().permute(0, 2, 3, 1)
    pixels = np.squeeze(channels_last.numpy())
    pixels_u8 = np.clip(pixels * 255, 0, 255.0).astype('uint8')
    Image.fromarray(pixels_u8).save(path, 'png')


def define_modelR(opts):
    """Instantiate the R model named by ``opts.R_model``.

    Args:
        opts: options object; ``opts.R_model`` selects the architecture and
            ``opts`` itself is forwarded to the model constructor.

    Returns:
        A ``HalfDnCNNSE`` instance when ``opts.R_model == "HalfDnCNNSE"``.

    Raises:
        ValueError: for any other model name (the only supported value is
            ``"HalfDnCNNSE"``).
    """
    if opts.R_model == "HalfDnCNNSE":
        return HalfDnCNNSE(opts)
    # Fail with a clear message instead of falling through to an unbound
    # local variable.
    raise ValueError(
        f"unsupported R_model {opts.R_model!r}; expected 'HalfDnCNNSE'")


def define_modelL(opts):
    """Instantiate the L model named by ``opts.L_model``.

    Args:
        opts: options object; ``opts.L_model`` selects the architecture and
            ``opts`` itself is forwarded to the model constructor.

    Returns:
        An ``Illumination_Alone`` instance when
        ``opts.L_model == "Illumination_Alone"``.

    Raises:
        ValueError: for any other model name (the only supported value is
            ``"Illumination_Alone"``).
    """
    if opts.L_model == "Illumination_Alone":
        return Illumination_Alone(opts)
    # Fail with a clear message instead of falling through to an unbound
    # local variable.
    raise ValueError(
        f"unsupported L_model {opts.L_model!r}; expected 'Illumination_Alone'")


def define_modelA(opts):
    """Instantiate the adjustment model named by ``opts.A_model``.

    Args:
        opts: options object; ``opts.A_model`` selects the architecture and
            ``opts`` itself is forwarded to the model constructor.

    Returns:
        An ``Adjust_naive`` instance when ``opts.A_model == "naive"``.

    Raises:
        ValueError: for any other model name (the only supported value is
            ``"naive"``).
    """
    if opts.A_model == "naive":
        return Adjust_naive(opts)
    # Fail with a clear message instead of falling through to an unbound
    # local variable.
    raise ValueError(
        f"unsupported A_model {opts.A_model!r}; expected 'naive'")


def load_initialize(model, decom_model_path):
    """Load pretrained weights into ``model`` and freeze its parameters.

    Args:
        model: module whose ``load_state_dict`` receives the weights stored
            under ``checkpoint['state_dict']['model_R']``.
        decom_model_path: path of the checkpoint file.

    Returns:
        The same ``model`` object, with every parameter's ``requires_grad``
        set to ``False``.

    Raises:
        FileNotFoundError: if ``decom_model_path`` does not exist.
    """
    if not os.path.exists(decom_model_path):
        # Raise instead of exit(): terminating the interpreter from a helper
        # would tear down the whole notebook kernel.
        raise FileNotFoundError(
            "pretrained Initialize Model does not exist, check ---> %s " % decom_model_path)

    # Map tensors to the CPU so a checkpoint saved on a GPU also loads on a
    # host without CUDA; the caller moves the model to its target device later.
    # NOTE: torch.load unpickles the file - only load trusted checkpoints.
    checkpoint_Decom_low = torch.load(decom_model_path,
                                      map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint_Decom_low['state_dict']['model_R'])
    # Freeze the loaded model: it is used for inference only.
    for param in model.parameters():
        param.requires_grad = False
    return model


def load_unfolding(unfolding_model_path):
    """Load the pretrained unfolding checkpoint and rebuild its R and L models.

    The checkpoint is expected to hold an ``"opts"`` entry (used to construct
    the models) and ``['state_dict']['model_R']`` / ``['state_dict']['model_L']``
    weight dictionaries.

    Args:
        unfolding_model_path: path of the checkpoint file.

    Returns:
        Tuple ``(old_opts, model_R, model_L)`` where both models have all
        parameters frozen (``requires_grad = False``).

    Raises:
        FileNotFoundError: if ``unfolding_model_path`` does not exist.
    """
    if not os.path.exists(unfolding_model_path):
        # Raise instead of exit(): terminating the interpreter from a helper
        # would tear down the whole notebook kernel.
        raise FileNotFoundError(
            "pretrained Unfolding Model does not exist, check ---> %s" % unfolding_model_path)

    # Map tensors to the CPU so a checkpoint saved on a GPU also loads on a
    # host without CUDA; the caller moves the models to their device later.
    # NOTE: torch.load unpickles the file - only load trusted checkpoints.
    # NOTE(review): the checkpoint stores a pickled ``opts`` object; PyTorch
    # releases that default to ``weights_only=True`` may refuse it - confirm
    # against the installed version.
    checkpoint = torch.load(unfolding_model_path,
                            map_location=torch.device('cpu'))
    old_opts = checkpoint["opts"]
    model_R = define_modelR(old_opts)
    model_L = define_modelL(old_opts)
    model_R.load_state_dict(checkpoint['state_dict']['model_R'])
    model_L.load_state_dict(checkpoint['state_dict']['model_L'])
    # Freeze both models: they are used for inference only.
    for frozen_model in (model_R, model_L):
        for param in frozen_model.parameters():
            param.requires_grad = False
    return old_opts, model_R, model_L


def load_adjustment(adjust_model_path):
    """Load the pretrained illumination adjustment model.

    The checkpoint is expected to hold an ``'opts'`` entry (used to construct
    the model) and a ``['state_dict']['model_A']`` weight dictionary.

    Args:
        adjust_model_path: path of the checkpoint file.

    Returns:
        The constructed model with all parameters frozen
        (``requires_grad = False``).

    Raises:
        FileNotFoundError: if ``adjust_model_path`` does not exist.
    """
    if not os.path.exists(adjust_model_path):
        # Raise instead of exit(): terminating the interpreter from a helper
        # would tear down the whole notebook kernel.
        raise FileNotFoundError(
            "pretrained Adjustment Model does not exist, check ---> %s" % adjust_model_path)

    # Tensors are mapped to the CPU so the checkpoint loads with or without
    # CUDA; the caller moves the model to its target device later.
    # NOTE: torch.load unpickles the file - only load trusted checkpoints.
    # NOTE(review): the checkpoint stores a pickled ``opts`` object; PyTorch
    # releases that default to ``weights_only=True`` may refuse it - confirm
    # against the installed version.
    checkpoint_Adjust = torch.load(adjust_model_path,
                                   map_location=torch.device('cpu'))
    model_A = define_modelA(checkpoint_Adjust['opts'])
    model_A.load_state_dict(checkpoint_Adjust['state_dict']['model_A'])
    print(
        " ===========>  loading pretrained Illumination Adjustment Model from: %s " % adjust_model_path)
    # Freeze the adjustment model: it is used for inference only.
    for param in model_A.parameters():
        param.requires_grad = False
    return model_A


def param_all(model, net_input):
    """Print a ``torchsummary`` report for ``model``.

    The summary input size is taken from axes 1, 2 and 3 of
    ``net_input.shape`` (axis 0 is left out).

    Args:
        model: module to summarize.
        net_input: object with a ``shape`` of at least four axes.
    """
    import torchsummary
    input_size = tuple(net_input.shape[axis] for axis in (1, 2, 3))
    torchsummary.summary(model, input_size)


def param_self_compute(model):
    """Return the total number of elements over ``model.parameters()``."""
    return sum(param.numel() for param in model.parameters())


test.py

In [None]:
import argparse
import torch.nn as nn
import torchvision.transforms as transforms
import time


def one2three(x):
    """Repeat ``x`` three times along dim 1 and return it cast ``.to(x)``."""
    tripled = torch.cat((x, x, x), dim=1)
    return tripled.to(x)

class Inference(nn.Module):
    """End-to-end low-light enhancement pipeline.

    Combines a frozen decomposition model, the unfolding R/L models, the
    closed-form ``P``/``Q`` update modules and the illumination adjustment
    model. ``run`` enhances every image of a directory and writes the results
    to ``opts.output``.

    Args:
        opts: options object providing ``Decom_model_low_path``,
            ``unfolding_model_path``, ``adjust_model_path``, ``ratio`` and
            ``output``.
    """

    def __init__(self, opts):
        super().__init__()
        self.opts = opts
        # loading decomposition model
        self.model_Decom_low = Decom()
        self.model_Decom_low = load_initialize(self.model_Decom_low, self.opts.Decom_model_low_path)
        # loading R; old_model_opts; and L model
        self.unfolding_opts, self.model_R, self.model_L = load_unfolding(self.opts.unfolding_model_path)
        # loading adjustment model
        self.adjust_model = load_adjustment(self.opts.adjust_model_path)
        self.P = P()
        self.Q = Q()
        self.transform = transforms.Compose([transforms.ToTensor()])
        print(self.model_Decom_low)
        print(self.model_R)
        print(self.model_L)
        print(self.adjust_model)

    def unfolding(self, input_low_img):
        """Run ``unfolding_opts.round`` unfolding iterations.

        Iteration 0 initializes P and Q from the decomposition model; later
        iterations update them in closed form with weights that grow linearly
        with the iteration index. Every iteration refines R and L with the
        learned models.

        Returns:
            Tuple ``(R, L)`` from the last iteration.
        """
        for t in range(self.unfolding_opts.round):
            if t == 0:  # initialize R0, L0
                P, Q = self.model_Decom_low(input_low_img)
            else:  # update P and Q
                w_p = (self.unfolding_opts.gamma + self.unfolding_opts.Roffset * t)
                w_q = (self.unfolding_opts.lamda + self.unfolding_opts.Loffset * t)
                P = self.P(I=input_low_img, Q=Q, R=R, gamma=w_p)
                Q = self.Q(I=input_low_img, P=P, L=L, lamda=w_q)
            R = self.model_R(r=P, l=Q)
            L = self.model_L(l=Q)
        return R, L

    def lllumination_adjust(self, L, ratio):
        """Apply the adjustment model to ``L`` with a constant ``ratio`` map.

        (The method name keeps its historical spelling for compatibility.)

        Args:
            L: illumination tensor.
            ratio: scalar used to fill a map shaped like ``L``.

        Returns:
            The output of ``self.adjust_model``.
        """
        # ones_like keeps the map on L's device/dtype, so this also works
        # without CUDA; the ``ratio`` argument is honoured instead of being
        # overwritten by self.opts.ratio.
        ratio_map = torch.ones_like(L) * ratio
        return self.adjust_model(l=L, alpha=ratio_map)

    def forward(self, input_low_img):
        """Enhance one batch.

        Returns:
            Tuple ``(I_enhance, p_time)`` where ``p_time`` is the wall-clock
            duration in seconds of the unfolding + adjustment steps.
        """
        # Follow the device the weights actually live on, rather than moving
        # the input to CUDA merely because CUDA is available.
        first_param = next(self.parameters(), None)
        if first_param is not None:
            input_low_img = input_low_img.to(first_param.device)
        with torch.no_grad():
            start = time.time()
            R, L = self.unfolding(input_low_img)
            High_L = self.lllumination_adjust(L, self.opts.ratio)
            I_enhance = High_L * R
            p_time = (time.time() - start)
        return I_enhance, p_time

    def run(self, input_dir):
        """Enhance every .png/.jpg/.jpeg file in ``input_dir``.

        Results are written to ``self.opts.output`` under the input file's
        base name. Progress is printed for every 50th directory entry and the
        total time at the end. Returns ``None``; prints a message and returns
        early if ``input_dir`` is missing or not a directory.
        """
        if not os.path.exists(input_dir):
            print(f"Directory '{input_dir}' does not exist!")
            return
        if not os.path.isdir(input_dir):
            print(f"'{input_dir}' is not a directory!")
            return

        os.makedirs(self.opts.output, exist_ok=True)
        run_starttime = time.time()
        # Sorted for a deterministic processing order across runs.
        tmplst = sorted(os.listdir(input_dir))
        for i, file_name in enumerate(tmplst):
            if not file_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                continue

            input_path = os.path.join(input_dir, file_name)
            name = os.path.splitext(file_name)[0]
            # Close the file handle promptly and force three channels: the
            # decomposition network's first conv expects 3 input channels.
            with Image.open(input_path) as img:
                low_img = self.transform(img.convert('RGB')).unsqueeze(0)
            enhance, _p_time = self.forward(input_low_img=low_img)
            # NOTE(review): the output path has no file extension although
            # np_save_TensorImg always encodes PNG - confirm whether
            # downstream consumers rely on the extension-less names.
            save_path = os.path.join(self.opts.output, name)
            np_save_TensorImg(enhance, save_path)
            if i % 50 == 0:
                tmptime = time.time() - run_starttime
                print(f"{i}/{len(tmplst)}: {tmptime:.2f} seconds.")

        totaltime = time.time() - run_starttime
        print(f"Total time: {totaltime:.2f} seconds.")



    
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser(description='Configure')
#     # specify your data path here!
#     parser.add_argument('--img_path', type=str, default="/content/drive/MyDrive/Mobile Robotics Project/Full Dataset/Converted/image_0")
#     parser.add_argument('--output', type=str, default="/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/Datasets/Full Dataset/URetinex-Net/image_0")
#     # ratio are recommended to be 3-5, bigger ratio will lead to over-exposure 
#     parser.add_argument('--ratio', type=int, default=5)
#     # model path
#     parser.add_argument('--Decom_model_low_path', type=str, default="/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/init_low.pth")
#     parser.add_argument('--unfolding_model_path', type=str, default="/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/unfolding.pth")
#     parser.add_argument('--adjust_model_path', type=str, default="/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/L_adjust.pth")
#     parser.add_argument('--gpu_id', type=int, default=0)
    
#     opts = parser.parse_args()
#     for k, v in vars(opts).items():
#         print(k, v)
    
#     os.environ['CUDA_VISIBLE_DEVICES'] = str(opts.gpu_id)
#     model = Inference(opts).cuda()
#     model.run(opts.img_path)


In [None]:
import argparse, os
# Enhance the `image_0` folder: build the options namespace expected by
# Inference, load the three pretrained checkpoints and process the directory.
if __name__ == "__main__":
    # Input images and destination folder for the enhanced results.
    img_path = "/content/drive/MyDrive/Mobile Robotics Project/Full Dataset/Converted/image_0"
    output = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/Datasets/Full Dataset/URetinex-Net/image_0"
    # Ratios of 3-5 are recommended; a bigger ratio leads to over-exposure.
    ratio = 5
    # Pretrained checkpoints: decomposition, unfolding (R/L) and adjustment.
    decom_model_low_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/init_low.pth"
    unfolding_model_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/unfolding.pth"
    adjust_model_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/L_adjust.pth"
    gpu_id = 0
    
    opts = argparse.Namespace(img_path=img_path, output=output, ratio=ratio,
                              Decom_model_low_path=decom_model_low_path,
                              unfolding_model_path=unfolding_model_path,
                              adjust_model_path=adjust_model_path,
                              gpu_id=gpu_id)
    
    # NOTE(review): CUDA_VISIBLE_DEVICES presumably only takes effect if CUDA
    # has not been initialized yet in this kernel - confirm.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(opts.gpu_id)
    # .cuda() requires a CUDA-capable runtime.
    model = Inference(opts).cuda()
    model.run(opts.img_path)


In [None]:
# Enhance the `image_1` folder: same configuration as the previous run cell,
# only the input and output directories differ. `argparse` and `os` are
# imported by the previous cell.
if __name__ == "__main__":
    # Input images and destination folder for the enhanced results.
    img_path = "/content/drive/MyDrive/Mobile Robotics Project/Full Dataset/Converted/image_1"
    output = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/Datasets/Full Dataset/URetinex-Net/image_1"
    # Ratios of 3-5 are recommended; a bigger ratio leads to over-exposure.
    ratio = 5
    # Pretrained checkpoints: decomposition, unfolding (R/L) and adjustment.
    decom_model_low_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/init_low.pth"
    unfolding_model_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/unfolding.pth"
    adjust_model_path = "/content/drive/MyDrive/Mobile Robotics Project/Image Enhancement/ckpt/L_adjust.pth"
    gpu_id = 0
    
    opts = argparse.Namespace(img_path=img_path, output=output, ratio=ratio,
                              Decom_model_low_path=decom_model_low_path,
                              unfolding_model_path=unfolding_model_path,
                              adjust_model_path=adjust_model_path,
                              gpu_id=gpu_id)
    
    # NOTE(review): CUDA_VISIBLE_DEVICES presumably only takes effect if CUDA
    # has not been initialized yet in this kernel - confirm.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(opts.gpu_id)
    # A fresh Inference is built here, so all checkpoints are loaded again.
    model = Inference(opts).cuda()
    model.run(opts.img_path)
