# Network 구성 (network architecture)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import *
import torch
import math
from os.path import join
import torch.utils.data as data
import numpy as np
from os import listdir
from os.path import join
from PIL import Image, ImageOps
import random
from random import randrange
from math import sqrt



class FC4(nn.Module):
    """Convolutional illuminant estimator with confidence-weighted pooling.

    Seven bias-free convolutions, three of them followed by 2x2 max pooling,
    turn the input into an ``out_ch``-channel map. ``forward`` reads channels
    0-2 of that map as per-location illuminant estimates and channel 3 as a
    confidence map, then pools them into one unit-length vector per sample.

    Args:
        in_ch: number of channels of the input tensor.
        out_ch: number of channels produced by the last convolution;
            ``forward`` slices channels 0-2 and channel 3 out of it.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()

        # Attribute names and creation order are kept as-is: they define the
        # state_dict keys and the order in which weights are initialised.
        self.layer1 = nn.Conv2d(in_ch, 96, kernel_size=11, stride=4, padding=5, bias=False)
        self.layer2 = nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2, bias=False)
        self.layer3 = nn.Conv2d(256, 384, kernel_size=5, stride=1, padding=2, bias=False)
        self.layer4 = nn.Conv2d(384, 384, kernel_size=5, stride=1, padding=2, bias=False)
        self.layer5 = nn.Conv2d(384, 256, kernel_size=5, stride=1, padding=2, bias=False)
        self.layer6 = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1, bias=False)
        self.layer7 = nn.Conv2d(128, out_ch, kernel_size=3, stride=1, padding=1, bias=False)
        self.Maxpooling = nn.MaxPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU(inplace=True)
        # Registered but not applied anywhere in forward().
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Return one L2-normalised 3-vector per batch element, shape (B, 3)."""
        # Feature extractor: conv+ReLU stages with pooling after 1, 2 and 5.
        feat = self.Maxpooling(self.relu(self.layer1(x)))
        feat = self.Maxpooling(self.relu(self.layer2(feat)))
        for conv in (self.layer3, self.layer4, self.layer5):
            feat = self.relu(conv(feat))
        feat = self.Maxpooling(feat)
        feat = self.relu(self.layer6(feat))
        # Absolute value keeps both the estimates and the confidences >= 0.
        maps = torch.abs(self.layer7(feat))

        illum = maps[:, :3, :, :]
        confidence = maps[:, 3:4, :, :]
        batch, height, width = confidence.size(0), confidence.size(2), confidence.size(3)

        # Scale the confidence map so that it sums to (almost) one per sample.
        conf_total = torch.sum(confidence.reshape(batch, height * width), dim=1)
        confidence = confidence / (conf_total.reshape(batch, 1, 1, 1) + 0.00001)

        # Unit-normalise every per-location estimate across the channel axis.
        illum = illum / (torch.norm(illum, p=2, dim=1, keepdim=True) + 1e-04)

        # Broadcasting the single confidence channel over the three colour
        # channels, then summing over all spatial positions.
        weighted = illum * confidence
        weighted = weighted.reshape(weighted.size(0), weighted.size(1), height * width)
        weighted_sum = torch.sum(weighted, dim=2)

        return weighted_sum / (torch.norm(weighted_sum, dim=1, p=2, keepdim=True) + 0.000001)


# Dataset 구성 (dataset definition)

In [None]:
import os
import torch
import torch.utils.data as data
import numpy as np
from os import listdir
from os.path import join
from PIL import Image
import random
import scipy
from scipy import io


def is_image_file(filename):
    """Return True when ``filename`` ends with .png, .jpg, .jpeg or .bmp.

    The comparison is case-sensitive, so ``"a.PNG"`` is not accepted.
    """
    image_suffixes = (".png", ".jpg", ".jpeg", ".bmp")
    return filename.endswith(image_suffixes)

def is_mat_file(filename):
    """Return True when ``filename`` ends with ``.mat`` (case-sensitive)."""
    return filename.endswith(".mat")

def is_txt_file(filename):
    """Return True when ``filename`` ends with ``.txt`` (case-sensitive)."""
    return filename.endswith(".txt")


def load_img(filepath):
    """Open the image at ``filepath`` and return it converted to RGB mode.

    Args:
        filepath: path of the image file to read.

    Returns:
        A PIL image in ``'RGB'`` mode.
    """
    # convert() returns a new, fully loaded image, so the file handle opened
    # by Image.open can be released as soon as the conversion is done instead
    # of being left for the garbage collector.
    with Image.open(filepath) as source:
        return source.convert('RGB')

def get_patch_2tensor(img_in,wb_img_in, patch_size=256):
    """Cut the same randomly placed square window out of two tensors.

    Both tensors are indexed as (channel, row, column). The window position
    is drawn from the size of ``img_in`` only and then applied to both inputs.

    Args:
        img_in: 3-D tensor that defines the valid window positions.
        wb_img_in: tensor cropped with the same window.
        patch_size: side length of the square window.

    Returns:
        Tuple ``(img_patch, wb_patch)``.

    Raises:
        ValueError: from ``random.randrange`` when ``img_in`` is smaller than
            ``patch_size`` along either spatial axis.
    """
    _, height, width = img_in.size()

    # Column offset is drawn before the row offset; keep this order so that a
    # seeded run picks the same window.
    left = random.randrange(0, width - patch_size + 1)
    top = random.randrange(0, height - patch_size + 1)

    rows = slice(top, top + patch_size)
    cols = slice(left, left + patch_size)
    return img_in[:, rows, cols], wb_img_in[:, rows, cols]

class DatasetFromFolder(data.Dataset):
    """Dataset of (image, white-balanced image, 3-value label) samples.

    Files are read from three sub-directories of ``image_dir/folder``:
    ``rgb/`` and ``wb_rgb/`` (image files) and ``label/`` (``.txt`` files
    whose first three lines each hold one number).

    Sample ``i`` is built from the ``i``-th entry of each *sorted* directory
    listing, so the three directories are presumably expected to use matching
    file names - TODO confirm against the dataset on disk.

    Args:
        image_dir: dataset root directory.
        folder: sub-folder of ``image_dir`` to read (e.g. ``"train/"``).
        isTrain: when truthy ``__getitem__`` returns ``(rgb, wb_rgb, label)``;
            otherwise the image file name is appended as a fourth element.
        transform: optional callable applied to both images; when omitted the
            PIL images are returned unchanged.
    """

    def __init__(self, image_dir, folder, isTrain, transform=None):
        super(DatasetFromFolder, self).__init__()

        self.Train = isTrain
        # Historical attribute name: this holds the sub-folder, not a flag.
        self.istrain = folder

        self.im_dir = os.path.join(image_dir, self.istrain, 'rgb/')
        self.wb_im_dir = os.path.join(image_dir, self.istrain, 'wb_rgb/')
        self.gt_label_dir = os.path.join(image_dir, self.istrain, 'label/')

        # os.listdir returns entries in arbitrary order. The three lists are
        # indexed in parallel, so each listing is sorted to keep index i
        # pointing at corresponding files in all three directories.
        self.image_filenames = [join(self.im_dir, x) for x in sorted(os.listdir(self.im_dir)) if is_image_file(x)]
        self.wb_image_filenames = [join(self.wb_im_dir, x) for x in sorted(os.listdir(self.wb_im_dir)) if is_image_file(x)]
        self.gt_label_filenames = [join(self.gt_label_dir, x) for x in sorted(os.listdir(self.gt_label_dir)) if is_txt_file(x)]

        self.transform = transform

    def __getitem__(self, index):
        """Load sample ``index``; see the class docstring for the return shape."""
        input_im = load_img(self.image_filenames[index])
        wb_input_im = load_img(self.wb_image_filenames[index])

        # The label is the first three lines of the text file, one float each.
        gt_label = np.zeros(shape=(3))
        with open(self.gt_label_filenames[index]) as f:
            lines = f.readlines()[0:3]
        for i in range(3):
            gt_label[i] = float(lines[i])  # explicit str -> float conversion
        gt_label = torch.from_numpy(gt_label)

        if self.transform:
            rgb = self.transform(input_im)
            wb_rgb = self.transform(wb_input_im)
        else:
            # Without a transform the names used to stay unbound and the
            # return statement raised UnboundLocalError.
            rgb, wb_rgb = input_im, wb_input_im

        if self.Train:
            return rgb, wb_rgb, gt_label

        _, file = os.path.split(self.image_filenames[index])
        return rgb, wb_rgb, gt_label, file

    def __len__(self):
        """Number of samples, taken from the ``rgb/`` listing."""
        return len(self.image_filenames)

# Training

In [None]:
from __future__ import print_function
import argparse
from math import log10

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
from torch.utils.data import DataLoader
import pdb
import socket
import time
import easydict

# Training configuration. Attribute-style access (opt.lr, opt.batchSize, ...)
# is what the functions in this cell rely on.
opt = easydict.EasyDict({
    "batchSize": 8,           # batch size - number of patches trained at once
    "lr": 1e-4,                # learning rate
    "start_iter": 1,
    "nEpochs": 100,            # number of training epochs
    "data_dir":"/content/drive/MyDrive/Colab Notebooks/dataset/", # dataset location
    "model_type": "FC4",         # model name
    "save_folder": "/content/drive/MyDrive/Colab Notebooks/weight/", # where weights are saved
    "pretrained_sr": "/content/drive/MyDrive/Colab Notebooks/weight/epoch_200.pth",
    "pretrained": False,
    "gpu_mode": True,
    "threads": 1,
    "seed": 123,
    "gpus": 1})

# gpus_list = range(opt.gpus)
device = "cuda:0"
hostname = str(socket.gethostname())
cudnn.benchmark = True
print(opt)

# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(opt.save_folder, exist_ok=True)

def train(epoch):
    """Run one optimisation pass over ``training_data_loader``.

    Uses the module-level ``model``, ``optimizer``, ``training_data_loader``
    and ``cuda`` flag, and ``angular_loss`` as the objective. Prints the loss
    of every iteration and the average loss of the epoch.

    Args:
        epoch: epoch number, used only in the printed messages.
    """
    epoch_loss = 0.0
    model.train()
    for iteration, batch in enumerate(training_data_loader, 1):
        # Tensors no longer need the deprecated Variable wrapper. batch[1]
        # (the white-balanced image) is not used by the loss, so it is not
        # copied to the GPU.
        rgb, gt_label = batch[0], batch[2]
        if cuda:
            rgb = rgb.cuda()
            gt_label = gt_label.cuda()

        optimizer.zero_grad()
        # NOTE(review): the timer covers the forward pass and the loss only;
        # with asynchronous CUDA execution it may under-report - confirm
        # before relying on these numbers.
        t0 = time.time()
        pred = model(rgb)

        loss = angular_loss(pred, gt_label)
        t1 = time.time()
        loss.backward()
        optimizer.step()

        # .item() yields a plain float, so the running sum does not keep
        # tensors alive and replaces the deprecated ``.data`` access.
        loss_value = loss.item()
        epoch_loss += loss_value

        print("===> Epoch[{}]({}/{}): Loss: {:.4f} || Timer: {:.4f} sec.".format(epoch, iteration, len(training_data_loader), loss_value, (t1 - t0)))

    print("===> Epoch {} Complete: Avg. Loss: {:.4f}".format(epoch, epoch_loss / len(training_data_loader)))


def eval(epoch):
    """Evaluate ``model`` on ``testing_data_loader`` and log summary statistics.

    For every batch the mean angular error (degrees) is computed with
    ``angular_loss``. The per-batch values are then summarised as average,
    median, mean of the worst quarter and mean of the best quarter; the
    summary is printed and appended to ``logfile`` through ``log``.

    Relies on the module-level ``model``, ``testing_data_loader``, ``cuda``
    and ``logfile``. The name shadows the ``eval`` builtin; it is kept
    because the training loop calls it under this name.

    Args:
        epoch: epoch number written into the log line.

    Raises:
        ValueError: if the loader yields no batches.
    """
    angular_list = []
    model.eval()
    for i, batch in enumerate(testing_data_loader):
        rgb, gt_label = batch[0], batch[2]
        if cuda:
            rgb = rgb.cuda()
            gt_label = gt_label.cuda()

        with torch.no_grad():
            pred = model(rgb)
            # Angular error of this batch, as a Python float so that sorting
            # and averaging below run on numbers rather than device tensors.
            cc = angular_loss(pred, gt_label).item()

        angular_list.append(cc)
        print('{}번째 angular error = {:.4f}'.format(i, cc))

    if not angular_list:
        raise ValueError("testing_data_loader produced no batches to evaluate")

    # AE measures
    ranked = sorted(angular_list)
    count = len(ranked)
    # With fewer than four batches ``count // 4`` is 0, which used to end in
    # a ZeroDivisionError; fall back to a single sample in that case.
    onefourth = max(1, count // 4)
    median = ranked[count // 2]
    minerror = sum(ranked[:onefourth]) / onefourth   # best 25%
    maxerror = sum(ranked[-onefourth:]) / onefourth  # worst 25%
    avg_cc = sum(angular_list) / count

    print('average:{:.4f} | median:{:.4f} | worst:{:.4f} | Best:{:.4f}'.format(avg_cc,median,maxerror,minerror))
    print("===> Processing Done")
    log('Epoch[{:.4f}] : average = {:.4f}| median:{:.4f} | worst:{:.4f} | Best:{:.4f}\n'.format(epoch,avg_cc,median,maxerror,minerror) , logfile)

def log(text, LOGGER_FILE):
    """Append ``text`` verbatim to the file at ``LOGGER_FILE``.

    The file is opened in append mode (and therefore created when missing);
    no newline is added. The ``with`` block closes the file on exit.
    """
    with open(LOGGER_FILE, 'a') as handle:
        handle.write(text)

def angular_loss(pred, gt):
    """Mean angle, in degrees, between the rows of ``pred`` and ``gt``.

    Only columns 0-2 of each input are used. The computation runs on
    whatever device the inputs already live on.

    Args:
        pred: 2-D tensor with at least three columns.
        gt: 2-D tensor with at least three columns and as many rows as ``pred``.

    Returns:
        0-dim tensor holding the mean angular error in degrees.
    """
    # The former ``pred.cuda()`` / ``gt.cuda()`` calls threw their results
    # away and raised on machines without CUDA, so they are gone.
    dot = pred[:, 0] * gt[:, 0] + pred[:, 1] * gt[:, 1] + pred[:, 2] * gt[:, 2]
    pred_norm = torch.sqrt(pred[:, 0] * pred[:, 0] + pred[:, 1] * pred[:, 1] + pred[:, 2] * pred[:, 2] + 1e-06)
    gt_norm = torch.sqrt(gt[:, 0] * gt[:, 0] + gt[:, 1] * gt[:, 1] + gt[:, 2] * gt[:, 2] + 1e-06)

    cosine = dot / (pred_norm * gt_norm + 1e-05)
    # Guard against rounding pushing the cosine just outside acos' domain,
    # which would turn the whole loss into NaN.
    cosine = torch.clamp(cosine, -1.0, 1.0)

    return torch.mean(torch.acos(cosine)) * 180 / math.pi

def print_network(net):
    """Print ``net`` followed by the total element count of its parameters."""
    total = sum(weights.numel() for weights in net.parameters())
    print(net)
    print('Total number of parameters: %d' % total)

def checkpoint(epoch):
    """Save the state dict of the global ``model`` under ``opt.save_folder``.

    The file is named ``epoch_<epoch>.pth``; the name is appended to
    ``opt.save_folder`` by plain string concatenation, so the folder string
    has to end with a path separator.
    """
    file_name = "epoch_{}.pth".format(epoch)
    model_out_path = opt.save_folder + file_name
    weights = model.state_dict()
    torch.save(weights, model_out_path)
    print("Checkpoint saved to {}".format(model_out_path))

def transform():
    """Build the preprocessing pipeline: a ``Compose`` holding only ``ToTensor``."""
    steps = [ToTensor()]
    return Compose(steps)

if __name__ == '__main__':
    # Everything assigned below is a module-level global; train(), eval() and
    # checkpoint() read ``cuda``, ``model``, ``optimizer``, ``logfile`` and
    # the two data loaders from here.

    cuda = opt.gpu_mode
    if cuda and not torch.cuda.is_available():
        raise Exception("No GPU found, please run without --cuda")

    torch.manual_seed(opt.seed)
    if cuda:
        torch.cuda.manual_seed(opt.seed)

    print('===> Loading datasets')

    train_set = DatasetFromFolder(image_dir = opt.data_dir, folder = "train/" , isTrain = True, transform = transform())
    training_data_loader = DataLoader(dataset=train_set, num_workers=opt.threads, batch_size=opt.batchSize, shuffle=True)
    test_set = DatasetFromFolder(image_dir = opt.data_dir, folder = "test/" , isTrain = False, transform = transform())
    testing_data_loader = DataLoader(dataset=test_set, num_workers=opt.threads, batch_size=opt.batchSize, shuffle=False)

    print('===> Building model ', opt.model_type)
    model = FC4(in_ch=3,out_ch=4)

    model = torch.nn.DataParallel(model, device_ids=[0])

    if opt.pretrained:
        # os.path.join returns ``pretrained_sr`` unchanged when it is already
        # absolute and resolves it against ``save_folder`` otherwise. The old
        # string concatenation glued two absolute paths together, so the file
        # was never found and the weights were skipped without any message.
        model_name = os.path.join(opt.save_folder, opt.pretrained_sr)
        if os.path.exists(model_name):
            model.load_state_dict(torch.load(model_name, map_location=lambda storage, loc: storage))
            print('Pre-trained FC4 model is loaded.')
        else:
            print('Pre-trained weights not found at {}; training from scratch.'.format(model_name))

    if cuda:
        model = model.cuda()

    optimizer = optim.Adam(model.parameters(), lr=opt.lr, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-4)
    logfile = opt.save_folder + 'eval.txt'
    for epoch in range(opt.start_iter, opt.nEpochs + 1):

        train(epoch)

        # Save weights and evaluate on the test split every tenth epoch.
        if (epoch) % 10 == 0:
            checkpoint(epoch)
            eval(epoch)


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
===> Epoch[4](90/145): Loss: 7.7972 || Timer: 0.0103 sec.
===> Epoch[4](91/145): Loss: 11.6094 || Timer: 0.0053 sec.
===> Epoch[4](92/145): Loss: 5.9526 || Timer: 0.0068 sec.
===> Epoch[4](93/145): Loss: 7.2882 || Timer: 0.0220 sec.
===> Epoch[4](94/145): Loss: 5.9069 || Timer: 0.0110 sec.
===> Epoch[4](95/145): Loss: 7.7949 || Timer: 0.0089 sec.
===> Epoch[4](96/145): Loss: 5.3341 || Timer: 0.0034 sec.
===> Epoch[4](97/145): Loss: 9.9203 || Timer: 0.0101 sec.
===> Epoch[4](98/145): Loss: 10.9826 || Timer: 0.0041 sec.
===> Epoch[4](99/145): Loss: 7.2795 || Timer: 0.0036 sec.
===> Epoch[4](100/145): Loss: 9.9631 || Timer: 0.0048 sec.
===> Epoch[4](101/145): Loss: 8.1268 || Timer: 0.0032 sec.
===> Epoch[4](102/145): Loss: 4.7264 || Timer: 0.0038 sec.
===> Epoch[4](103/145): Loss: 5.6482 || Timer: 0.0046 sec.
===> Epoch[4](104/145): Loss: 5.0151 || Timer: 0.0034 sec.
===> Epoch[4](105/145): Loss: 3.7454 || Timer: 0.0037 sec.
===> Epoch[4](

# Test

In [None]:
from __future__ import print_function
import argparse

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from functools import reduce
from math import log10
import scipy.io as sio
import time
import cv2
import easydict
from google.colab import drive
drive.mount('/content/drive')

# Test-time configuration; replaces the training ``opt`` when this cell runs.
opt = easydict.EasyDict({
    "testBatchSize": 1,
    "self_ensemble": False,
    "chop_forward": False,
    "gpu_mode": True,
    "threads": 1,
    "seed": 123,
    "gpus": 1,
    "data_dir":"/content/drive/MyDrive/Colab Notebooks/dataset/",  # where the test dataset is read from
    "output": "/content/drive/MyDrive/Colab Notebooks/results/", # where result images are saved
    "model_type": "FC4",
    "residual": False,
    "model": "/content/drive/MyDrive/Colab Notebooks/weight/epoch_1.pth"})  # weights used for testing, e.g. ac040091dff2_epoch_59.pth


device = "cuda"
print(opt)

cuda = opt.gpu_mode
if cuda and not torch.cuda.is_available():
    raise Exception("No GPU found, please run without --cuda")
# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(opt.output, exist_ok=True)

torch.manual_seed(opt.seed)




print('===> Loading datasets')
test_set = DatasetFromFolder(image_dir = opt.data_dir, folder = "test/" , isTrain = False, transform = transform())
testing_data_loader = DataLoader(dataset=test_set, num_workers=opt.threads, batch_size=opt.testBatchSize, shuffle=False)


print('===> Building model')

# The network is wrapped in DataParallel before the weights are loaded, so
# the checkpoint is read with DataParallel's ``module.``-prefixed key names.
model = FC4(in_ch=3,out_ch=4)
model = torch.nn.DataParallel(model, device_ids= [0])
# map_location keeps the loaded tensors on the CPU; the model is moved below.
model.load_state_dict(torch.load(opt.model, map_location=lambda storage, loc: storage))
print('Pre-trained FC4 model is loaded.')

if cuda:
    model = model.cuda()


def angular_loss(pred, gt):
    """Mean angle, in degrees, between the rows of ``pred`` and ``gt``.

    Only columns 0-2 of each input are used. The computation runs on
    whatever device the inputs already live on.

    Args:
        pred: 2-D tensor with at least three columns.
        gt: 2-D tensor with at least three columns and as many rows as ``pred``.

    Returns:
        0-dim tensor holding the mean angular error in degrees.
    """
    # The former ``pred.cuda()`` / ``gt.cuda()`` calls threw their results
    # away and raised on machines without CUDA, so they are gone.
    dot = pred[:, 0] * gt[:, 0] + pred[:, 1] * gt[:, 1] + pred[:, 2] * gt[:, 2]
    pred_norm = torch.sqrt(pred[:, 0] * pred[:, 0] + pred[:, 1] * pred[:, 1] + pred[:, 2] * pred[:, 2] + 1e-06)
    gt_norm = torch.sqrt(gt[:, 0] * gt[:, 0] + gt[:, 1] * gt[:, 1] + gt[:, 2] * gt[:, 2] + 1e-06)

    cosine = dot / (pred_norm * gt_norm + 1e-05)
    # Guard against rounding pushing the cosine just outside acos' domain,
    # which would turn the whole loss into NaN.
    cosine = torch.clamp(cosine, -1.0, 1.0)

    return torch.mean(torch.acos(cosine)) * 180 / math.pi


def eval():
    """Evaluate ``model`` on ``testing_data_loader`` and print error statistics.

    For every batch the mean angular error (degrees) is computed with
    ``angular_loss`` and printed. Afterwards the median, the average and the
    means of the worst and best quarter of the per-batch errors are printed.

    Relies on the module-level ``model``, ``testing_data_loader`` and
    ``cuda``. The name shadows the ``eval`` builtin; it is kept because the
    cell's entry point calls it under this name.

    Raises:
        ValueError: if the loader yields no batches.
    """
    angular_list = []
    model.eval()

    for i, batch in enumerate(testing_data_loader):
        # batch[1] (white-balanced image) and batch[3] (file name) are not
        # needed to compute the error.
        rgb, gt_label = batch[0], batch[2]
        if cuda:
            rgb = rgb.cuda()
            gt_label = gt_label.cuda()

        with torch.no_grad():
            pred = model(rgb)
            # Python float instead of a 0-dim tensor: the values print as
            # plain numbers and the statistics below run on the host.
            cc_loss = angular_loss(pred, gt_label).item()

        print(f'{i} 번째 angular error : {cc_loss}')
        angular_list.append(cc_loss)

    if not angular_list:
        raise ValueError("testing_data_loader produced no batches to evaluate")

    # AE measures
    ranked = sorted(angular_list)
    count = len(ranked)
    # With fewer than four batches ``count // 4`` is 0, which used to end in
    # a ZeroDivisionError; fall back to a single sample in that case.
    onefourth = max(1, count // 4)

    median = ranked[count // 2]
    minerror = sum(ranked[:onefourth]) / onefourth   # best 25%
    maxerror = sum(ranked[-onefourth:]) / onefourth  # worst 25%
    avg_cc = sum(angular_list) / count

    print('median angular error = {}'.format(median))
    print('average angular error = {}'.format(avg_cc))
    print('max 25% angular error = {}'.format(maxerror))
    print('min 25% angular error = {}'.format(minerror))


def save_img(img, img_name):
    """Write an image tensor to disk below ``opt.output``.

    Args:
        img: tensor that squeezes to (channel, height, width); values are
            clamped to [0, 1] before being scaled to 8 bit. Presumably RGB
            channel order, as produced by this notebook's loader - confirm
            for other callers.
        img_name: file name (including extension) of the image to write.
    """
    # detach()/cpu() allow tensors that live on the GPU or carry autograd
    # history; the explicit 8-bit conversion hands the encoder the integer
    # data it expects instead of floats in [0, 255].
    pixels = img.detach().cpu().squeeze().clamp(0, 1).mul(255).round().byte()
    pixels = pixels.permute(1, 2, 0).contiguous().numpy()

    # The ``opt`` defined in this cell has no ``test_dataset`` entry, so
    # ``opt.test_dataset`` raised AttributeError; use it only when present.
    save_dir = os.path.join(opt.output, opt.get('test_dataset', ''))
    os.makedirs(save_dir, exist_ok=True)

    save_fn = os.path.join(save_dir, img_name)
    # OpenCV writes channels in BGR order, hence the channel swap.
    cv2.imwrite(save_fn, cv2.cvtColor(pixels, cv2.COLOR_RGB2BGR),  [cv2.IMWRITE_PNG_COMPRESSION, 0])



##Eval Start!!!!
# Entry point of the test cell: run the evaluation defined above when this
# code is executed as the main module.
if __name__ == '__main__':
    eval()

{'testBatchSize': 1, 'self_ensemble': False, 'chop_forward': False, 'gpu_mode': True, 'threads': 1, 'seed': 123, 'gpus': 1, 'data_dir': '/content/drive/MyDrive/cc_dataset/dataset/', 'output': '/content/drive/MyDrive/result/cc/', 'model_type': 'FC4', 'residual': False, 'model': '/content/drive/MyDrive/weights/cc/epoch_10.pth'}
===> Loading datasets
===> Building model
Pre-trained FC4 model is loaded.


KeyboardInterrupt: 