In [1]:
"""
    Import package
"""
import os
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import pandas as pd
from torch.utils.data import DataLoader, Dataset
import time
import pandas
from torchsummary import summary
import torch.nn.functional as F
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
"""
    Read Image
"""
def readfile(path, label=None, imgRow=256, imgCol=256, datasize=10000):
    """Load training images from per-class sub-folders and split them into Lab planes.

    Every directory found in ``path`` is treated as a folder of images. From
    each folder at most ``datasize // 1000`` images are read (the quota assumes
    roughly 1000 folders), resized to ``(imgRow, imgCol)``, converted from BGR
    to 8-bit Lab and stored as an L plane (input) and an (a, b) pair (target).

    Args:
        path: directory whose sub-directories contain the images.
        label: unused; kept so existing callers keep working.
        imgRow: output image height in pixels.
        imgCol: output image width in pixels.
        datasize: maximum number of images to load.

    Returns:
        ``(x, y)`` uint8 arrays of shape ``(n, imgRow, imgCol, 1)`` and
        ``(n, imgRow, imgCol, 2)``. ``n`` equals ``datasize`` when enough
        readable images exist; otherwise the arrays are trimmed to the number
        actually loaded so no all-zero samples are returned.
    """
    # Per-folder quota. At least 1 so that datasize < 1000 still terminates
    # (a quota of 0 would never be reached and every file would be read).
    eachFolder = max(1, datasize // 1000)
    x = np.zeros((datasize, imgRow, imgCol, 1), dtype=np.uint8)
    y = np.zeros((datasize, imgRow, imgCol, 2), dtype=np.uint8)

    i = 0  # next free slot in x / y
    for folder in tqdm(sorted(os.listdir(path))):
        if i >= datasize:
            break
        imgPath = os.path.join(path, folder)
        if not os.path.isdir(imgPath):
            # Stray files next to the class folders are not image folders.
            continue

        count = 0  # images taken from this folder; restarted for every folder
        # Sorted so the same images are selected on every run.
        for file in sorted(os.listdir(imgPath)):
            if count == eachFolder or i >= datasize:
                break
            img = cv2.imread(os.path.join(imgPath, file))
            if img is None:
                # cv2.imread returns None for unreadable / non-image files.
                continue

            # 8-bit Lab as produced by OpenCV: L <- L*255/100, a <- a+128, b <- b+128
            img = cv2.resize(img, (imgCol, imgRow))
            lab_image = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
            l_channel, a_channel, b_channel = cv2.split(lab_image)
            x[i, :, :, 0] = l_channel
            y[i, :, :, 0] = a_channel
            y[i, :, :, 1] = b_channel
            i += 1
            count += 1

    return x[:i], y[:i]
    
def valAndTest(path, label=None, imgRow=256, imgCol=256, datasize=2000):
    """Load a flat image folder and split it into validation and test Lab arrays.

    Files in ``path`` are visited in sorted order. The first ``datasize``
    readable images form the validation split and the next ``datasize`` form
    the test split. Each image is resized to ``(imgRow, imgCol)`` and converted
    from BGR to 8-bit Lab; L is the input plane and (a, b) the target planes.

    Args:
        path: directory containing the image files directly.
        label: unused; kept so existing callers keep working.
        imgRow: output image height in pixels.
        imgCol: output image width in pixels.
        datasize: number of images per split.

    Returns:
        ``(xVal, yVal, xTest, yTest)`` uint8 arrays; x arrays have one channel
        (L) and y arrays two (a, b). A split is trimmed to the number of images
        actually loaded when the folder holds fewer than ``2 * datasize``
        readable images.
    """
    xVal = np.zeros((datasize, imgRow, imgCol, 1), dtype=np.uint8)
    yVal = np.zeros((datasize, imgRow, imgCol, 2), dtype=np.uint8)
    xTest = np.zeros((datasize, imgRow, imgCol, 1), dtype=np.uint8)
    yTest = np.zeros((datasize, imgRow, imgCol, 2), dtype=np.uint8)

    n_val = 0   # images stored in the validation split so far
    n_test = 0  # images stored in the test split so far
    for file in sorted(os.listdir(path)):
        if n_val + n_test == datasize * 2:
            break
        img = cv2.imread(os.path.join(path, file))
        if img is None:
            # cv2.imread returns None for unreadable / non-image files.
            continue

        img = cv2.resize(img, (imgCol, imgRow))
        lab_image = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
        l_channel, a_channel, b_channel = cv2.split(lab_image)

        if n_val < datasize:  # validation split is filled first
            xVal[n_val, :, :, 0] = l_channel
            yVal[n_val, :, :, 0] = a_channel
            yVal[n_val, :, :, 1] = b_channel
            n_val += 1
        else:  # then the test split
            xTest[n_test, :, :, 0] = l_channel
            yTest[n_test, :, :, 0] = a_channel
            yTest[n_test, :, :, 1] = b_channel
            n_test += 1

    return xVal[:n_val], yVal[:n_val], xTest[:n_test], yTest[:n_test]


In [None]:

# Read the training, validation and test sets with the loader functions above.
# Raw string: the backslashes of the Windows path are literal path separators.
# Without the r-prefix, "\D", "\I" and "\C" are invalid escape sequences
# (a warning today, an error in future Python versions).
workspace_dir = r'D:\Downloads\ILSVRC\Data\CLS-LOC'
print("Reading data")
train_x, train_y = readfile(os.path.join(workspace_dir, "train"), "train", imgRow=256, imgCol=256, datasize=100000)
print("Size of training data = {}".format(len(train_x)))

val_x, val_y, test_x, test_y = valAndTest(os.path.join(workspace_dir, "val"), "dev", imgRow=256, imgCol=256, datasize=5000)
print("Size of validation data = {}".format(len(val_x)))
print("Size of testing data = {}".format(len(test_x)))

In [None]:
# Sanity check: rebuild a few stored samples from their L and (a, b) planes
# and display them, to confirm the Lab split round-trips to a sensible image.
for L_source, ab_source, sample_idx in (
    (train_x, train_y, 101),
    (train_x, train_y, 99000),
    (val_x, val_y, 4999),
    (test_x, test_y, 4999),
):
    L = L_source[sample_idx]
    realAB = ab_source[sample_idx]
    # OpenCV picks the Lab value ranges from the array dtype, so keep it uint8.
    realImg = np.concatenate((L, realAB), axis=2).astype(np.uint8)
    realImg = cv2.cvtColor(realImg, cv2.COLOR_LAB2RGB)
    plt.imshow(realImg)
    plt.show()

In [None]:
import torchvision.transforms.functional as TF
import random

fineSize = 176  # side length fed to the network; chosen because 176 / 4 = 44


class ImgDataset(Dataset):
    """Dataset of (input, target) image pairs that receive identical augmentation.

    ``x`` and ``y`` are indexable collections of equal length; in this notebook
    they are the uint8 L-plane and (a, b)-plane arrays. Each item is returned
    as a pair of tensors produced by ``transform``.
    """

    def __init__(self, x, y):
        self.x, self.y = x, y

    def transform(self, image, mask):
        """Resize, crop and randomly flip ``image`` and ``mask`` the same way.

        The random decisions are drawn once and applied to both inputs so the
        target stays aligned with the input. NOTE: the same random flips are
        applied to every item, whichever split the dataset wraps.

        Returns:
            ``(image, mask)`` as tensors created by ``TF.to_tensor``.
        """
        to_pil = transforms.ToPILImage()
        resize = transforms.Resize(size=(fineSize, fineSize))

        # Convert to PIL and bring both images to fineSize x fineSize.
        image = resize(to_pil(image))
        mask = resize(to_pil(mask))

        # Crop window is sampled from `image` and reused for `mask`.
        # The images were just resized to the requested crop size.
        top, left, height, width = transforms.RandomCrop.get_params(
            image, output_size=(fineSize, fineSize))
        image = TF.crop(image, top, left, height, width)
        mask = TF.crop(mask, top, left, height, width)

        # Random horizontal flip (one draw shared by both images).
        if random.random() > 0.5:
            image, mask = TF.hflip(image), TF.hflip(mask)

        # Random vertical flip (one draw shared by both images).
        if random.random() > 0.5:
            image, mask = TF.vflip(image), TF.vflip(mask)

        return TF.to_tensor(image), TF.to_tensor(mask)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        return self.transform(self.x[index], self.y[index])

"""
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomChoice([transforms.RandomResizedCrop(fineSize, interpolation=1),
                            transforms.RandomResizedCrop(fineSize, interpolation=2),
                            transforms.RandomResizedCrop(fineSize, interpolation=3)]),
    transforms.RandomHorizontalFlip(), #隨機將圖片水平翻轉
    transforms.RandomVerticalFlip(), #隨機將圖片垂直翻轉
    transforms.ToTensor(), #將圖片轉成 Tensor，並把數值normalize到[0,1](data normalization)
])
#testing & val 時不需做 data augmentation
val_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomChoice([transforms.RandomResizedCrop(fineSize, interpolation=1),
                            transforms.RandomResizedCrop(fineSize, interpolation=2),
                            transforms.RandomResizedCrop(fineSize, interpolation=3)]),
    transforms.ToTensor(),
])
"""
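# The Compose pipelines above would draw independent random transforms for X and Y, so input and target would no longer line up (see the link below).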
#https://discuss.pytorch.org/t/torchvision-transfors-how-to-perform-identical-transform-on-both-image-and-target/10606/6

In [None]:
# Wrap the in-memory arrays in Dataset objects.
train_set, val_set = ImgDataset(train_x, train_y), ImgDataset(val_x, val_y)

batch_size = 24

# Training batches are reshuffled every epoch; validation order stays fixed.
# num_workers is left at its default: worker processes could parallelise the
# per-item preprocessing, but the original note says only 0 works on Windows.
# https://github.com/pytorch/pytorch/issues/2341
train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)
#https://github.com/pytorch/pytorch/issues/2341

In [35]:
"""
    Model
"""
# input shape: [1, 176, 176] (the models below are built with in_channels=1 and summarized with a (1, 176, 176) input)
# nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')
# nn.MaxPool2d(kernel_size, stride, padding)
# nn.ReLU(inplace=False) #true 可以些微降低記憶體使用量 (如果不會造成問題的話)
# nn.LeakyReLU
# nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride=1, padding=0, output_padding=0, groups=1, bias=True, dilation=1, padding_mode='zeros')
# nn.BatchNorm2d(channels)
# nn.functional.interpolate(input, size=None, scale_factor=None, mode='nearest', align_corners=None, recompute_scale_factor=None)

# Resnet, ResNeXt
#https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """Build a biased 3x3 convolution whose padding equals its dilation.

    With ``padding == dilation`` a stride-1 convolution keeps the spatial size.
    The bias is kept on; in a large model removing it reportedly makes very
    little difference:
    https://stackoverflow.com/questions/51959507/does-bias-in-the-convolutional-layer-really-make-a-difference-to-the-test-accura
    """
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=True,
    )
    return layer


def conv1x1(in_planes, out_planes, stride=1):
    """Build a biased 1x1 (pointwise) convolution with the given stride."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=True,
    )
    return layer
    
    
  
"""
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    ...
    之後需要增加 channel 的時候再對 input 做 1x1 conv 
    (0): BasicBlock(
      (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (downsample): Sequential(
        (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
        (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
"""
# pre-activation (resnet v2)
# ResnetBlock(64, 128, downsample=True) -> 128
# ResnetBlock(128, 128, isThree=True) -> 128
class ResnetBlock(nn.Module):
    """Pre-activation residual block: two or three (BN -> ReLU -> 3x3 conv) stages.

    Args:
        inplanes: number of input channels.
        planes: number of output channels.
        stride: stride of the first 3x3 conv and of the shortcut projection.
        norm_layer: normalization layer class; defaults to ``nn.BatchNorm2d``.
        downsample: when True the shortcut is a 1x1 projection of the input
            (needed whenever ``inplanes != planes`` or ``stride != 1``);
            when False the input is added unchanged.
        isThree: when True a third (BN -> ReLU -> conv) stage is applied.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, norm_layer=None, downsample=False, isThree=False):
        super(ResnetBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.bn1 = norm_layer(inplanes)
        self.conv1 = conv3x3(inplanes, planes, stride)  # e.g. 64 -> 128
        self.bn2 = norm_layer(planes)
        self.conv2 = conv3x3(planes, planes)  # e.g. 128 -> 128
        # The third stage is always registered, even when isThree is False,
        # so the set of parameters does not depend on the flag.
        self.bn3 = norm_layer(planes)
        self.conv3 = conv3x3(planes, planes)
        self.relu = nn.ReLU(inplace=True)
        # The shortcut projection uses the same stride as conv1; otherwise the
        # residual sum has mismatched spatial sizes whenever stride != 1.
        self.down = conv1x1(inplanes, planes, stride)  # e.g. 64 -> 128

        self.downsample = downsample
        self.isThree = isThree
        self.stride = stride

    def forward(self, x):
        """Return ``F(x) + shortcut(x)``; no activation is applied after the sum."""
        identity = x

        out = self.bn1(x)
        out = self.relu(out)
        out = self.conv1(out)

        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)

        if self.isThree:
            out = self.bn3(out)
            out = self.relu(out)
            out = self.conv3(out)

        if self.downsample:
            identity = self.down(x)

        out += identity

        return out

"""
(0)Bottleneck(
    (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
    (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
)
之後正常
(1): Bottleneck(
    (conv1): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
    (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
)

"""
#ResNextBasicBlock(64, downsample=True) -> 128
#ResNextBasicBlock(128) -> 128
class ResNextBasicBlock(nn.Module):
    """Two-conv residual block with a grouped, widened first convolution.

    Channel flow is ``inplanes -> 2*width -> width`` where ``width`` is
    ``inplanes``, or ``2*inplanes`` when ``downsample`` is True. For example
    ``ResNextBasicBlock(64, downsample=True)`` outputs 128 channels and
    ``ResNextBasicBlock(128)`` outputs 128 channels.

    Args:
        inplanes: number of input channels.
        planes: accepted for signature compatibility; not used.
        stride: stored on the module; not applied to any layer.
        groups: number of groups of the first 3x3 convolution.
        dilation: accepted for signature compatibility; not used.
        norm_layer: normalization layer class; defaults to ``nn.BatchNorm2d``.
        downsample: when True the output has twice the input channels and the
            shortcut is a 1x1 projection; when False the input is added as is.
    """
    expansion = 1

    def __init__(self, inplanes, planes=0, stride=1, groups=32, dilation=1, norm_layer=None, downsample=False):
        super(ResNextBasicBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        width = inplanes

        if downsample:
            width *= 2

        self.conv1 = conv3x3(inplanes, width*2, groups=groups)  # e.g. 64 -> 256
        self.bn1 = norm_layer(width*2)
        self.conv2 = conv3x3(width*2, width)  # e.g. 256 -> 128
        self.bn2 = norm_layer(width)

        self.relu = nn.ReLU(inplace=True)
        self.down = conv1x1(inplanes, width)  # e.g. 64 -> 128

        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(relu(bn2(conv2(relu(bn1(conv1(x)))))) + shortcut(x))``."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        if self.downsample:
            identity = self.down(x)

        # Out-of-place add on purpose: `out` is the result of an in-place ReLU,
        # which autograd keeps for the backward pass. Mutating it with `+=`
        # makes backward() fail with "modified by an inplace operation".
        out = out + identity
        out = self.relu(out)
        return out

#ResNextBlock(64, 64, downsample=True) -> 128
#ResNextBlock(128, 64) -> 128
class ResNextBlock(nn.Module):
    """Bottleneck residual block: 1x1 conv -> grouped 3x3 conv -> 1x1 conv.

    Channel flow is ``inplanes -> planes -> planes -> 2*planes``. For example
    ``ResNextBlock(64, 64, downsample=True)`` and ``ResNextBlock(128, 64)``
    both output 128 channels.

    Args:
        inplanes: number of input channels.
        planes: bottleneck width; the block outputs ``2 * planes`` channels.
        stride: stride of the 3x3 conv and of the shortcut projection.
        groups: number of groups of the 3x3 convolution.
        dilation: dilation of the 3x3 convolution.
        norm_layer: normalization layer class; defaults to ``nn.BatchNorm2d``.
        downsample: when True the shortcut is a 1x1 projection of the input
            (needed whenever ``inplanes != 2*planes`` or ``stride != 1``);
            when False the input is added unchanged.
    """
    # NOTE: kept for compatibility; the layers below expand by a factor of 2
    # (width -> width * 2), not by this value.
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, groups=32, dilation=1, norm_layer=None, downsample=False):
        super(ResNextBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        width = planes
        self.conv1 = conv1x1(inplanes, width)  # e.g. 64 -> 64
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)  # e.g. 64 -> 64
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, width * 2)  # e.g. 64 -> 128
        self.bn3 = norm_layer(width * 2)
        self.relu = nn.ReLU(inplace=True)
        # conv2 and the shortcut projection must reduce the resolution together
        # when stride != 1, otherwise the residual sum has mismatched sizes.
        self.down = conv1x1(inplanes, width * 2, stride)  # e.g. 64 -> 128

        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(bn3(conv3(...)) + shortcut(x))``."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample:
            identity = self.down(x)

        out += identity
        out = self.relu(out)
        return out
    


In [36]:
class resNeXtUnet(nn.Module):
    """U-Net style colorization network built from the ResNeXt-like blocks.

    Encoder: ``conv1`` runs at full resolution; the input of ``conv2``,
    ``conv3`` and ``conv4`` is subsampled with ``[:, :, ::2, ::2]``, so those
    stages run at 1/2, 1/4 and 1/8 resolution with 128, 256 and 512 channels.
    ``conv5``-``conv7`` stay at 1/8 resolution.
    Decoder: three stride-2 transposed convolutions return to full resolution,
    each followed by an additive skip connection from the encoder.

    Args:
        in_channels: number of input channels (1 in this notebook).
        out_channels: number of channels of the regression output
            (2 in this notebook).
        classification: selects which head trains the shared layers. When
            True, the regression decoder receives detached features, so only
            the classification loss reaches the encoder. When False, the
            classification head receives detached features instead.

    ``forward`` returns ``(out_class, out_reg)``: 529-channel logits at 1/4 of
    the input resolution, and ``out_channels`` sigmoid maps at input resolution.
    """

    def __init__(self, in_channels, out_channels, classification=True):
        super(resNeXtUnet, self).__init__()
        self.classification = classification

        self.conv1 = nn.Sequential(
            conv3x3(in_channels, 64),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            conv3x3(64, 64),
        )

        # Encoder stages; each one is fed a 2x-subsampled tensor in forward().
        self.conv2 = ResNextBasicBlock(64, 128, downsample=True)  # 64 -> 128
        self.conv3 = ResNextBlock(128, 128, downsample=True)      # 128 -> 256
        self.conv4 = ResNextBlock(256, 256, downsample=True)      # 256 -> 512

        # Bottleneck stages, 512 -> 512.
        self.conv5 = ResNextBlock(512, 256)
        self.conv6 = ResNextBlock(512, 256)
        self.conv7 = ResNextBlock(512, 256)

        # Decoder: upsample, then refine.
        self.up8 = self._upsample(512, 256)
        self.conv8 = ResNextBlock(256, 128)

        self.up9 = self._upsample(256, 128)
        self.conv9 = ResNextBasicBlock(128)

        self.up10 = self._upsample(128, 128)

        # Projects the full-resolution encoder features onto the decoder width.
        self.conv1to10 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            conv3x3(64, 128)
        )
        self.conv10 = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            conv3x3(128, 128)
        )

        # Classification output: one logit per quantized ab bin (23 * 23 = 529).
        self.model_class = nn.Conv2d(256, 529, kernel_size=1, padding=0, dilation=1, stride=1, bias=True)

        # Regression output in (0, 1). out_channels used to be ignored and the
        # head hard-coded to 2 channels; out_channels=2 builds the same layer.
        self.model_out = nn.Sequential(
            nn.Conv2d(128, out_channels, kernel_size=1, padding=0, dilation=1, stride=1, bias=True),
            nn.Sigmoid(),
        )

    @staticmethod
    def _upsample(in_planes, out_planes):
        """BN -> ReLU -> transposed conv that doubles the spatial resolution."""
        return nn.Sequential(
            nn.BatchNorm2d(in_planes),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_planes, out_planes, kernel_size=4, stride=2, padding=1, bias=True)
        )

    def forward(self, x):
        """Return ``(out_class, out_reg)`` for a batch ``x``."""
        conv1_2 = self.conv1(x)
        conv2_2 = self.conv2(conv1_2[:, :, ::2, ::2])
        conv3_3 = self.conv3(conv2_2[:, :, ::2, ::2])

        conv4_3 = self.conv4(conv3_3[:, :, ::2, ::2])
        conv5_3 = self.conv5(conv4_3)
        conv6_3 = self.conv6(conv5_3)
        conv7_3 = self.conv7(conv6_3)

        # The encoder blocks are residual already; the decoder stages get an
        # explicit residual connection around each refinement block.
        conv8_in = self.up8(conv7_3) + conv3_3
        conv8_3 = self.conv8(conv8_in) + conv8_in

        # Pick which branch may back-propagate into the shared layers.
        if self.classification:
            class_feat, reg_feat = conv8_3, conv8_3.detach()
            skip2, skip1 = conv2_2.detach(), conv1_2.detach()
        else:
            class_feat, reg_feat = conv8_3.detach(), conv8_3
            skip2, skip1 = conv2_2, conv1_2

        out_class = self.model_class(class_feat)

        conv9_in = self.up9(reg_feat) + skip2
        conv9_3 = self.conv9(conv9_in) + conv9_in

        conv10_up = self.up10(conv9_3) + self.conv1to10(skip1)
        conv10_2 = self.conv10(conv10_up)
        out_reg = self.model_out(conv10_2)

        return (out_class, out_reg)

In [37]:
# Build the ResNeXt U-Net on the GPU and print a per-layer summary for a
# single-channel 176x176 input.
model = resNeXtUnet(in_channels=1, out_channels=2)
model = model.cuda()
print(summary(model, (1, 176, 176)))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 176, 176]             640
       BatchNorm2d-2         [-1, 64, 176, 176]             128
              ReLU-3         [-1, 64, 176, 176]               0
            Conv2d-4         [-1, 64, 176, 176]          36,928
            Conv2d-5          [-1, 256, 88, 88]           4,864
       BatchNorm2d-6          [-1, 256, 88, 88]             512
              ReLU-7          [-1, 256, 88, 88]               0
            Conv2d-8          [-1, 128, 88, 88]         295,040
       BatchNorm2d-9          [-1, 128, 88, 88]             256
             ReLU-10          [-1, 128, 88, 88]               0
           Conv2d-11          [-1, 128, 88, 88]           8,320
             ReLU-12          [-1, 128, 88, 88]               0
ResNextBasicBlock-13          [-1, 128, 88, 88]               0
           Conv2d-14          [-1, 128,

In [9]:
class resUnet(nn.Module):
    """U-Net style colorization network built from pre-activation ResnetBlocks.

    Encoder: ``conv1`` runs at full resolution; the input of ``conv2``,
    ``conv3`` and ``conv4`` is subsampled with ``[:, :, ::2, ::2]``, so those
    stages run at 1/2, 1/4 and 1/8 resolution with 128, 256 and 512 channels.
    ``conv5``-``conv7`` stay at 1/8 resolution.
    Decoder: three stride-2 transposed convolutions return to full resolution;
    each is summed with a 3x3-projected skip connection from the encoder.

    Args:
        in_channels: number of input channels (1 in this notebook).
        out_channels: number of channels of the regression output
            (2 in this notebook).
        classification: selects which head trains the shared layers. When
            True, the regression decoder receives detached features, so only
            the classification loss reaches the encoder. When False, the
            classification head receives detached features instead.

    ``forward`` returns ``(out_class, out_reg)``: 529-channel logits at 1/4 of
    the input resolution, and ``out_channels`` sigmoid maps at input resolution.
    """

    def __init__(self, in_channels, out_channels, classification=True):
        super(resUnet, self).__init__()
        self.classification = classification

        self.conv1 = nn.Sequential(
            conv3x3(in_channels, 64),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            conv3x3(64, 64),
        )

        # Encoder stages; each one is fed a 2x-subsampled tensor in forward().
        self.conv2 = ResnetBlock(64, 128, downsample=True)
        self.conv3 = ResnetBlock(128, 256, downsample=True, isThree=True)
        self.conv4 = ResnetBlock(256, 512, downsample=True, isThree=True)

        # Bottleneck stages, 512 -> 512.
        self.conv5 = ResnetBlock(512, 512, isThree=True)
        self.conv6 = ResnetBlock(512, 512, isThree=True)
        self.conv7 = ResnetBlock(512, 512, isThree=True)

        # Decoder stage 8 (1/4 resolution, 256 channels).
        self.up8 = self._upsample(512, 256)
        self.conv3to8 = nn.Sequential(
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            conv3x3(256, 256)
        )
        self.conv8 = nn.Sequential(
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            conv3x3(256, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            conv3x3(256, 256),
        )

        # Decoder stage 9 (1/2 resolution, 128 channels).
        self.up9 = self._upsample(256, 128)
        self.conv2to9 = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            conv3x3(128, 128)
        )
        self.conv9 = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            conv3x3(128, 128),
        )

        # Decoder stage 10 (full resolution, 128 channels).
        self.up10 = self._upsample(128, 128)
        self.conv1to10 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            conv3x3(64, 128)
        )
        self.conv10 = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            conv3x3(128, 128),
            nn.LeakyReLU(negative_slope=.2),
        )

        # Classification output: one logit per quantized ab bin (23 * 23 = 529).
        self.model_class = nn.Conv2d(256, 529, kernel_size=1, padding=0, dilation=1, stride=1, bias=True)

        # Regression output in (0, 1). out_channels used to be ignored and the
        # head hard-coded to 2 channels; out_channels=2 builds the same layer.
        self.model_out = nn.Sequential(
            nn.Conv2d(128, out_channels, kernel_size=1, padding=0, dilation=1, stride=1, bias=True),
            nn.Sigmoid(),
        )

    @staticmethod
    def _upsample(in_planes, out_planes):
        """BN -> ReLU -> transposed conv that doubles the spatial resolution."""
        return nn.Sequential(
            nn.BatchNorm2d(in_planes),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_planes, out_planes, kernel_size=4, stride=2, padding=1, bias=True)
        )

    def forward(self, x):
        """Return ``(out_class, out_reg)`` for a batch ``x``."""
        conv1_2 = self.conv1(x)
        conv2_2 = self.conv2(conv1_2[:, :, ::2, ::2])
        conv3_3 = self.conv3(conv2_2[:, :, ::2, ::2])

        conv4_3 = self.conv4(conv3_3[:, :, ::2, ::2])
        conv5_3 = self.conv5(conv4_3)
        conv6_3 = self.conv6(conv5_3)
        conv7_3 = self.conv7(conv6_3)

        # The encoder blocks are residual already; the decoder stages get an
        # explicit residual connection around each refinement block.
        conv8_up = self.up8(conv7_3) + self.conv3to8(conv3_3)
        conv8_3 = self.conv8(conv8_up) + conv8_up

        # Pick which branch may back-propagate into the shared layers.
        if self.classification:
            class_feat, reg_feat = conv8_3, conv8_3.detach()
            skip2, skip1 = conv2_2.detach(), conv1_2.detach()
        else:
            class_feat, reg_feat = conv8_3.detach(), conv8_3
            skip2, skip1 = conv2_2, conv1_2

        out_class = self.model_class(class_feat)

        conv9_up = self.up9(reg_feat) + self.conv2to9(skip2)
        conv9_3 = self.conv9(conv9_up) + conv9_up

        conv10_up = self.up10(conv9_3) + self.conv1to10(skip1)
        conv10_2 = self.conv10(conv10_up)
        out_reg = self.model_out(conv10_2)

        return (out_class, out_reg)

In [11]:
"""
    Utility
"""
def calculate_psnr_np(img1, img2): # for uint8 image
    """Peak signal-to-noise ratio in dB between two images with peak value 255.

    Multiplying by 1. promotes the first operand to float before subtracting,
    so uint8 inputs do not wrap around.
    """
    difference = 1.*img1 - img2
    mse = np.mean(difference**2)
    rmse = np.sqrt(mse)
    return 20*np.log10(255./rmse)


def calculate_psnr_torch(img1, img2):
    """Peak signal-to-noise ratio in dB between two tensors with peak value 1."""
    difference = 1.*img1 - img2
    mse = torch.mean(difference**2)
    rmse = torch.sqrt(mse)
    return 20*torch.log10(1./rmse)

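# The ab plane is quantized with a bin size of ab_quant = 10 per axis; with the defaults below that gives 23 x 23 = 529 bins (Q = 529, matching the 529-channel classification head), not Q = 313.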
def encode_ab_ind(data_ab, ab_norm=220, ab_quant=10):
    """Encode (a, b) values into a single bin index per pixel.

    Each channel is scaled by ``ab_norm``, divided by the bin size ``ab_quant``
    and rounded, giving a per-axis bin number. With the defaults, inputs in
    [0, 1] map to bins 0..22 (23 per axis), hence 23 * 23 = 529 classes.

    INPUTS
        data_ab   Nx2xHxW, expected in [0, 1]
    OUTPUTS
        data_q    Nx1xHxW, ``a_bin * (ab_norm / ab_quant + 1) + b_bin``,
                  returned in the floating dtype of the input
    """
    bins_per_axis = ab_norm / ab_quant + 1  # 2*110/10 + 1 = 23 with the defaults
    bin_number = torch.round((data_ab*ab_norm)/ab_quant)
    a_bin = bin_number[:, [0], :, :]
    b_bin = bin_number[:, [1], :, :]
    return a_bin * bins_per_axis + b_bin


def decode_ind_ab(data_q, opt):
    """Decode bin indices back into (a, b) values.

    INPUTS
        data_q      Nx1xHxW bin indices in [0, Q), integer or float tensor
        opt         object providing ``A`` (bins per axis), ``ab_quant``
                    (bin size), ``ab_max`` and ``ab_norm``
    OUTPUTS
        data_ab     Nx2xHxW float tensor,
                    ``(bin * opt.ab_quant - opt.ab_max) / opt.ab_norm``
    """
    # Floor division recovers the a-bin. Plain `/` is true division on current
    # PyTorch (and on float indices), which would leave a fractional a-bin and
    # make the b-bin collapse to 0.
    data_a = data_q // opt.A
    data_b = data_q - data_a * opt.A
    data_ab = torch.cat((data_a, data_b), dim=1)

    # .float() converts to float32 on whatever device the input lives on.
    data_ab = ((data_ab.float() * opt.ab_quant) - opt.ab_max) / opt.ab_norm

    return data_ab


## Loss
# 自己定義 loss 
# https://discuss.pytorch.org/t/custom-loss-functions/29387
# loss = nn.CrossEntropyLoss() # 如果是 classification task
def my_loss(output, target, delta=.01):
    """Huber-style loss, summed over the channel dimension (dim=1).

    Per element, with ``d = |output - target|``:
      * ``d <  delta``: ``0.5 * d**2 / delta``  (quadratic region)
      * ``d >= delta``: ``d - 0.5 * delta``     (linear region)

    Args:
        output: predicted tensor.
        target: ground-truth tensor, broadcastable against ``output``.
        delta: threshold between the two regions. The function used to read
            ``self.delta`` although it has no ``self`` and raised NameError;
            the default matches ``HuberLoss``.

    Returns:
        Tensor with dim 1 reduced to size 1 (``keepdim=True``).
    """
    mann = torch.abs(output - target)
    eucl = .5 * (mann**2)
    mask = (mann < delta).type_as(mann)  # 1 where the error is below delta

    loss = eucl*mask/delta + (mann - .5*delta)*(1 - mask)
    return torch.sum(loss, dim=1, keepdim=True)


class HuberLoss(nn.Module):
    """Huber-style loss, summed over the channel dimension (dim=1).

    Per element, with ``d = |in0 - in1|``:
      * ``d <  delta``: ``0.5 * d**2 / delta``  (quadratic region)
      * ``d >= delta``: ``d - 0.5 * delta``     (linear region)
    """

    def __init__(self, delta=.01):
        super(HuberLoss, self).__init__()
        self.delta = delta

    # Implemented as forward() rather than by overriding __call__, so the
    # module still goes through nn.Module.__call__ (hooks etc.).
    # `HuberLoss()(a, b)` keeps working as before.
    def forward(self, in0, in1):
        """Return the loss with dim 1 reduced to size 1 (``keepdim=True``)."""
        mann = torch.abs(in0 - in1)
        eucl = .5 * (mann**2)
        mask = (mann < self.delta).type_as(mann)  # 1 where the error is below delta

        loss = eucl*mask/self.delta + (mann - .5*self.delta)*(1 - mask)
        return torch.sum(loss, dim=1, keepdim=True)
"""
class L1Loss(nn.Module):
    def __init__(self):
        super(L1Loss, self).__init__()

    def __call__(self, in0, in1):
        return torch.sum(torch.abs(in0-in1),dim=1,keepdim=True)
"""
    
# Loss functions used by the training loops:
#   lossL1 - mean absolute error for the regression (ab) output; use
#            reduction='sum' to sum instead of average
#   lossCE - cross entropy for the quantized-ab classification output
# Alternative tried: nn.SmoothL1Loss(reduction='sum')
lossL1 = nn.L1Loss(reduction='mean')
lossCE = nn.CrossEntropyLoss()

In [None]:
"""
    Training
"""
#training 時做 data augmentation
# 官方說明
# https://pytorch.org/docs/stable/torchvision/transforms.html#module-torchvision.transforms.functional
# 中文說明(有解釋)
# https://wizardforcel.gitbooks.io/learn-dl-with-pytorch-liaoxingyu/4.7.1.html

#model = Classifier().cuda()

localPath = "./stage1/"
# Create the output directory up front: cv2.imwrite fails silently and
# torch.save raises when the target directory does not exist.
os.makedirs(localPath, exist_ok=True)

## Optimizer
# The optimizer must be given the parameters of the model it will update.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
# SGD with momentum reportedly performs well at test time:
#optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

num_epoch = 100
best_val_loss = 1e+9

train_loss_history = []
val_loss_history = []

for epoch in range(num_epoch):
    epoch_start_time = time.time()
    iter_data_time = time.time()
    t_data = 0.0  # time spent waiting for the most recent training batch
    train_correct = 0  # correctly classified ab-bin pixels
    train_pixels = 0   # classified pixels seen
    train_loss = 0.0
    val_correct = 0
    val_pixels = 0
    val_loss = 0.0

    model.train()  # training mode (batch-norm statistics are updated)
    for i, data in enumerate(tqdm(train_loader)):
        iter_start_time = time.time()
        t_data = iter_start_time - iter_data_time  # time to load this batch

        input_L = data[0].cuda()
        real_ab = data[1].cuda().float()
        # The classification head runs at 1/4 resolution, so the target bins
        # are computed on every 4th pixel. CrossEntropyLoss takes class indices.
        real_B_enc = encode_ab_ind(real_ab[:, :, ::4, ::4])[:, 0, :, :].long()

        optimizer.zero_grad()  # gradients accumulate unless they are reset
        pred_class, train_pred = model(input_L)  # calls model.forward

        loss_L1_reg = lossL1(train_pred.float(), real_ab)
        loss_CE = lossCE(pred_class.float(), real_B_enc)
        batch_loss = loss_L1_reg + loss_CE
        batch_loss.backward()  # back-propagate to get every parameter's gradient
        optimizer.step()       # update the parameters from the gradients

        # Accuracy = share of pixels whose predicted ab bin matches the target.
        # (Comparing an argmax of the regression output with the float target,
        # as before, has mismatched shapes and no meaning.)
        train_correct += (pred_class.argmax(dim=1) == real_B_enc).sum().item()
        train_pixels += real_B_enc.numel()
        train_loss += batch_loss.item()
        iter_data_time = time.time()

    model.eval()
    with torch.no_grad():
        for i, data in enumerate(val_loader):
            input_L = data[0].cuda()
            real_ab = data[1].cuda().float()
            real_B_enc = encode_ab_ind(real_ab[:, :, ::4, ::4])[:, 0, :, :].long()

            pred_class, val_pred = model(input_L)
            loss_L1_reg = lossL1(val_pred.float(), real_ab)
            loss_CE = lossCE(pred_class.float(), real_B_enc)
            batch_loss = loss_L1_reg + loss_CE

            # Save one fixed validation sample per epoch as a visual check.
            if i == 1 and data[0].size(0) > 6:
                L = data[0][6].cpu().numpy().transpose(1, 2, 0)
                realAB = data[1][6].cpu().numpy().transpose(1, 2, 0)
                predAB = val_pred[6].cpu().numpy().transpose(1, 2, 0)
                # OpenCV picks the Lab value ranges from the dtype: rescale to uint8.
                realImg = (np.concatenate((L, realAB), axis=2) * 255).astype(np.uint8)
                fakeImg = (np.concatenate((L, predAB), axis=2) * 255).astype(np.uint8)
                realImg = cv2.cvtColor(realImg, cv2.COLOR_LAB2BGR)
                fakeImg = cv2.cvtColor(fakeImg, cv2.COLOR_LAB2BGR)
                cv2.imwrite(localPath + str(epoch + 1) + "_real" + ".png"  , realImg)
                cv2.imwrite(localPath + str(epoch + 1) + "_fake" + ".png"  , fakeImg)

            val_correct += (pred_class.argmax(dim=1) == real_B_enc).sum().item()
            val_pixels += real_B_enc.numel()
            val_loss += batch_loss.item()

    # Save only the parameters (the recommended way).
    torch.save(model.state_dict(), localPath + str(epoch + 1) +  "_stage3_res_model")

    # Each batch loss is already a mean over its batch, so the epoch loss is
    # averaged over the number of batches, not the number of samples.
    train_acc = train_correct / max(train_pixels, 1)
    val_acc = val_correct / max(val_pixels, 1)
    epoch_train_loss = train_loss / max(len(train_loader), 1)
    epoch_val_loss = val_loss / max(len(val_loader), 1)

    print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f | time to load: %2.2f' % \
        (epoch + 1, num_epoch, time.time()-epoch_start_time, \
         train_acc, epoch_train_loss, val_acc, epoch_val_loss, t_data))

    train_loss_history.append(epoch_train_loss)
    val_loss_history.append(epoch_val_loss)
        

In [None]:
# Training vs. validation loss per epoch. Labels and a legend make the two
# curves distinguishable when the notebook is skimmed.
fig, ax = plt.subplots()
ax.plot(range(len(train_loss_history)), train_loss_history, label="train")
ax.plot(range(len(val_loss_history)), val_loss_history, label="validation")
ax.set_xlabel("epoch")
ax.set_ylabel("loss")
ax.set_title("Loss history")
ax.legend()
plt.show()

In [None]:
# Persist both loss curves as .npy files next to the checkpoints.
train_loss_array = np.array(train_loss_history)
val_loss_array = np.array(val_loss_history)
np.save(localPath + "res_stage2_train_loss", train_loss_array)
np.save(localPath + "res_stage2_val_loss", val_loss_array)

In [None]:
# train regression
PATH = "./100_res_model"  # checkpoint to continue from

niter = 15  # NOTE(review): unused; the loop below runs for num_epoch epochs - confirm intent
model = resUnet(in_channels=1, out_channels=2, classification=False).cuda()
model.load_state_dict(torch.load(PATH))

# The optimizer must be created AFTER the model it optimizes. Built before
# `model` is re-assigned, it keeps updating the previous model's parameters
# and the freshly loaded network never changes.
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)


for epoch in range(num_epoch):
    epoch_start_time = time.time()
    iter_data_time = time.time()
    t_data = 0.0  # time spent waiting for the most recent training batch
    train_correct = 0  # correctly classified ab-bin pixels
    train_pixels = 0   # classified pixels seen
    train_loss = 0.0
    val_correct = 0
    val_pixels = 0
    val_loss = 0.0

    model.train()  # training mode (batch-norm statistics are updated)
    for i, data in enumerate(tqdm(train_loader)):
        iter_start_time = time.time()
        t_data = iter_start_time - iter_data_time  # time to load this batch

        input_L = data[0].cuda()
        real_ab = data[1].cuda().float()
        # The classification head runs at 1/4 resolution, so the target bins
        # are computed on every 4th pixel. CrossEntropyLoss takes class indices
        # (no explicit one-hot encoding needed).
        real_B_enc = encode_ab_ind(real_ab[:, :, ::4, ::4])[:, 0, :, :].long()

        optimizer.zero_grad()  # gradients accumulate unless they are reset
        pred_class, train_pred = model(input_L)  # calls model.forward

        loss_L1_reg = lossL1(train_pred.float(), real_ab)
        loss_CE = lossCE(pred_class.float(), real_B_enc)
        batch_loss = loss_L1_reg + loss_CE
        batch_loss.backward()  # back-propagate to get every parameter's gradient
        optimizer.step()       # update the parameters from the gradients

        # Accuracy = share of pixels whose predicted ab bin matches the target.
        train_correct += (pred_class.argmax(dim=1) == real_B_enc).sum().item()
        train_pixels += real_B_enc.numel()
        train_loss += batch_loss.item()
        iter_data_time = time.time()

    model.eval()  # evaluation mode
    with torch.no_grad():
        for i, data in enumerate(val_loader):
            input_L = data[0].cuda()
            real_ab = data[1].cuda().float()
            real_B_enc = encode_ab_ind(real_ab[:, :, ::4, ::4])[:, 0, :, :].long()

            # The model returns (class logits, regression output); both must
            # come from this validation batch, not from the last training step.
            pred_class, val_pred = model(input_L)
            loss_L1_reg = lossL1(val_pred.float(), real_ab)
            loss_CE = lossCE(pred_class.float(), real_B_enc)
            batch_loss = loss_L1_reg + loss_CE

            # Save one fixed validation sample per epoch as a visual check.
            if i == 1 and data[0].size(0) > 6:
                L = data[0][6].cpu().numpy().transpose(1, 2, 0)
                realAB = data[1][6].cpu().numpy().transpose(1, 2, 0)
                predAB = val_pred[6].cpu().numpy().transpose(1, 2, 0)
                # OpenCV picks the Lab value ranges from the dtype: rescale to uint8.
                realImg = (np.concatenate((L, realAB), axis=2) * 255).astype(np.uint8)
                fakeImg = (np.concatenate((L, predAB), axis=2) * 255).astype(np.uint8)
                realImg = cv2.cvtColor(realImg, cv2.COLOR_LAB2BGR)
                fakeImg = cv2.cvtColor(fakeImg, cv2.COLOR_LAB2BGR)
                cv2.imwrite(str(epoch + 1) + "_real" + ".png"  , realImg)
                cv2.imwrite(str(epoch + 1) + "_fake" + ".png"  , fakeImg)

            val_correct += (pred_class.argmax(dim=1) == real_B_enc).sum().item()
            val_pixels += real_B_enc.numel()
            val_loss += batch_loss.item()

    # Save only the parameters (the recommended way).
    torch.save(model.state_dict(), str(epoch + 1) +  "_stage2_res_model")

    # Each batch loss is already a mean over its batch, so the epoch loss is
    # averaged over the number of batches, not the number of samples.
    train_acc = train_correct / max(train_pixels, 1)
    val_acc = val_correct / max(val_pixels, 1)
    epoch_train_loss = train_loss / max(len(train_loader), 1)
    epoch_val_loss = val_loss / max(len(val_loader), 1)

    print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f | time to load: %2.2f' % \
        (epoch + 1, num_epoch, time.time()-epoch_start_time, \
         train_acc, epoch_train_loss, val_acc, epoch_val_loss, t_data))

    train_loss_history.append(epoch_train_loss)
    val_loss_history.append(epoch_val_loss)