### UNET을 이용한 컬러 복원 처리

### 학습 내용

In [1]:
from keras import models, backend

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
from keras.layers import Input, Conv2D, MaxPooling2D, Dropout, Activation

In [3]:
from keras.layers import UpSampling2D, BatchNormalization, Concatenate

## 1-2 클래스 선언 및 초기화 함수

* ic 는 이미지 행렬에서 어떤 차원에 채널 수가 기록되었는지 저장
* keras는 고차원 라이브러리이다. 
* keras는 실행하는 기본 엔진을 Tensorflow, Theano, CNTK 를 사용한다.
* 기본값은 다음의 값을 갖는다.
```
{
    "image_data_format": "channels_last",
    "epsilon": 1e-07,
    "floatx": "float32",
    "backend": "tensorflow"
}
```

### (QA) channels_last, channels_first의 차이는 무엇일까?

In [12]:
print( backend.image_data_format() ) # channels_last, channels_first 존재
print( backend.epsilon() )
print( backend.floatx() )
print( backend.backend() )

channels_last
1e-07
float32
tensorflow


### (QA) ic = 3 과 ic = 1의 차이는?

### conv 
 * LeNet 필터 이용 (5,5)
 * AlexNet 필터 이용 (11,11), (7,7), (3,3)
 * VGG, ResNet, Xception 필터 이용 (3,3)
 * 배치 정규화와 드롭아웃 계층은 어떻게 하는가에 따라 신경망을 구성하는 형태와 입력 데이터의 종류에 따라 달라짐. 최적의 조합은 경험을 통해 찾는다.

In [17]:
# (QA) ic = 3 과 ic = 1의 차이는?
class UNET(models.Model):   # models.Model을 상속받음.
    def __init__(self, org_shape, n_ch):
        # ic = 3 if backend.image_data_format() == 'channels_last' else 1
        if backend.image_data_format() == 'channels_last':
            ic = 3
        else:
            ic = 1
            
        # UNET 용 합성곱 계층 블록
        # MaxPooling 정의
        # 활성함수 및 Dropout 정의
        def conv(x, n_f, mp_flag=True):
            # 입력 이미지를 (2,2) 단위의 작은 이미지로 나누고 가장 큰 값을 출력
            # mp_flag가 True일때만 동작
            x = MaxPooling2D( (2,2), padding='same')(x) if mp_flag else x # 1/4로 줄게됨.
            
            # Conv2D 합성곱 필터(3,3), 개수 n_f로 지정
            # 활성화 함수 : tanh 로 설정
            # 초기에는 (5,5), (7,7), (11,11)사용했으나, 이후 (3,3)이 성능이 좋아, 
            # 보편적으로 (3,3) 사용됨.
            x = Conv2D(n_f, (3,3), padding='same')(x)
            x = BatchNormalization()(x)
            x = Activation('tanh')(x)
            x = Dropout(0.05)(x)  # 과적합되지 않도록 정규화와 드롭 확률을 5%로 함.
            
            x = Conv2D(n_f, (3,3), padding='same')(x)
            x = BatchNormalization()(x)
            x = Activation('tanh')(x)
            return x
        
        # 역합성곱 계층 블록
        def deconv_unet(x, e, n_f):
            # 들어온 이미지를 지정된 배수만큼 늘린다.
            x = UpSampling2D((2,2))(x)   # 좌우 두배씩 늘리기 
            x = Concatenate(axis=ic)([x,e])  # 두 입력을 결합, 합쳐지는 차원은 ic로 결정. ic는 이미지 채널의 차원
            
            # 첫 번째 합성곱, 드롭아웃이 없다.
            # 이미지 확장단계에서 Dropout이 잘 사용되지 않는 경향이 있다.
            x = Conv2D(n_f, (3,3), padding='same')(x) # 역합성곱 계산
            x = BatchNormalization()(x)
            x = Activation('tanh')(x)
            
            # 두 번째 합성곱
            x = Conv2D(n_f, (3,3), padding='same')(x)
            x = BatchNormalization()(x)
            x = Activation('tanh')(x)
            
            return x
        
        # Input(입력)
        original = Input(shape=org_shape)
        
        #################################
        # Encoding(부호화)
        # 각 계층에 사용된 필터 수 16개, 32개, 64개
        # 반복 횟수, 단계별 사용 필터 수는 하이퍼 파리미터이다.
        # 이미지 데이터는 c1, c2, c3에 저장됨.
        c1 = conv(original, 16, mp_flag=False) # 이미지가 줄지 않았다. mp_flag=True만 동작
        c2 = conv(c1, 32)                      # 1/4로 줄어듬
        
        # Encoder 
        encoded = conv(c2, 64)                 # 1/4로 줄어듬
        #################################
        
        ## ==========================
        ## 복호화 단계
        x = deconv_unet(encoded, c2, 32)     # deconv_unet 호출
        x = deconv_unet(x, c1, 16)           # deconv_unet 호출 - 역합성곱
        
        decoded = Conv2D(n_ch, (3,3), activation='sigmoid', padding='same')(x)
        ## ==========================
        
        super().__init__(original, decoded)  # 부모 클래스의 초기화 함수 호출
        self.compile(optimizer='adadelta', loss='mse')  #  최적화 함수 : Adadelta(), 출력입력오차비교 : mse

## 02. 데이터 준비 

### 사용할 데이터 셋 : CIFAR-10

In [21]:
from keras import datasets, utils

In [25]:
## 확인
(x_train, y_train), (x_test, y_test) = datasets.cifar10.load_data()
print( x_train.shape, x_train.ndim)

(50000, 32, 32, 3) 4


In [28]:
class DATA():
    def __init__(self, in_ch=None):
        (x_train, y_train), (x_test, y_test) = datasets.cifar10.load_data()
        
        if x_train.ndim == 4:  # 4이면 컬러 이미지 
            # n_ch : 채널 수, img_rows: 행 크기, img_cols: 열 크기 
            if backend.image_data_format() == 'channels_first':
                n_ch, img_rows, img_cols = x_train.shape[1:]  
            else:
                img_rows, img_cols, n_ch = x_train.shape[1:]  
        else:  # 흑백 이미지 처리
            img_rows, img_cols = x_train.shape[1:]
            n_ch = 1
        
        in_ch = n_ch if in_ch is None else in_ch  # in_ch가 빈값이면 n_ch, 아니면 in_ch로 넣기
        
        # 인공신경망에 적합한 0~1 사이의 실수로 변환
        x_train = x_train.astype('float32')
        x_test = x_test.astype('float32')
        
        x_train /= 255
        x_test /=255
        
        ## 컬러를 흑백으로 만드는 함수 정의 
        def RGB2Gray(X, fmt):
            if fmt == 'channels_first':
                R = X[:, 0:1]
                G = X[:, 1:2]
                B = X[:, 2:3]
            else:    # 'channels_last'
                R = X[..., 0:1]
                G = X[..., 1:2]
                B = X[..., 2:3]
            return 0.299 * R + 0.587 * G + 0.114 * B  # 컬러 이미지를 흑백으로 변경
        
        def RGB2RG(x_train_out, x_test_out, fmt):
            if fmt=='channels_first':
                x_train_in = x_train_out[: , 0:2]
                x_test_in = x_test_out[:, 0:2]
            else:
                x_train_in = x_train_out[...,0:2]
                x_test_in = x_test_out[..., 0:2]
            return x_train_in, x_test_in
        
        ## 흑백이미지의 차원을 3차원에서 4차원으로 변경
        if backend.image_data_format() == 'channels_first':
            x_train_out = x_train.reshape(x_train.shape[0], n_ch, img_rows, img_cols)
            x_test_out = x_test.reshape(x_test.shape[0], n_ch, img_rows, img_cols)
            input_shape = (in_ch, img_rows, img_cols)
        else:
            x_train_out = x_train.reshape(x_train.shape[0], img_rows, img_cols, n_ch)
            x_test_out = x_test.reshape(x_test.shape[0], img_rows, img_cols, n_ch)
            input_shape = (img_rows, img_cols, in_ch)
                
        ## RGB2Gray() 함수 데이터에 적용
        if in_ch ==1 and in_ch==3:
            x_train_in = RGB2Gray(x_train_out, backend.image_data_format())
            x_test_in = RGB2Gray(x_test_out, backend.image_data_format())
        elif in_ch == 2 and n_ch == 3:
            x_train_in, x_test_in = RGB2RG(x_train_out, x_test_out, 
                                          backend.image_data_format())
        else:
            x_train_in = x_train_out
            x_test_in = x_test_out
            
        self.input_shape = input_shape
        self.x_train_in, self.x_train_out = x_train_in, x_train_out
        self.x_test_in, self.x_test_out = x_test_in, x_test_out
        self.n_ch = n_ch
        self.in_ch = in_ch

### 03. UNET 처리 그래프 그리기

In [32]:
###########################
# UNET 검증
###########################
from keraspp.skeras import plot_loss
import matplotlib.pyplot as plt

In [33]:
###########################
# UNET 동작 확인
###########################
import numpy as np
from sklearn.preprocessing import minmax_scale


def show_images(data, unet):
    x_test_in = data.x_test_in
    x_test_out = data.x_test_out
    decoded_imgs_org = unet.predict(x_test_in)
    decoded_imgs = decoded_imgs_org

    if backend.image_data_format() == 'channels_first':
        print(x_test_out.shape)
        x_test_out = x_test_out.swapaxes(1, 3).swapaxes(1, 2)
        print(x_test_out.shape)
        decoded_imgs = decoded_imgs.swapaxes(1, 3).swapaxes(1, 2)
        if data.in_ch == 1:
            x_test_in = x_test_in[:, 0, ...]
        elif data.in_ch == 2:
            print(x_test_out.shape)
            x_test_in_tmp = np.zeros_like(x_test_out)
            x_test_in = x_test_in.swapaxes(1, 3).swapaxes(1, 2)
            x_test_in_tmp[..., :2] = x_test_in
            x_test_in = x_test_in_tmp
        else:
            x_test_in = x_test_in.swapaxes(1, 3).swapaxes(1, 2)
    else:
        # x_test_in = x_test_in[..., 0]
        if data.in_ch == 1:
            x_test_in = x_test_in[..., 0]
        elif data.in_ch == 2:
            x_test_in_tmp = np.zeros_like(x_test_out)
            x_test_in_tmp[..., :2] = x_test_in
            x_test_in = x_test_in_tmp

    n = 10
    plt.figure(figsize=(20, 6))
    for i in range(n):

        ax = plt.subplot(3, n, i + 1)
        if x_test_in.ndim < 4:
            plt.imshow(x_test_in[i], cmap='gray')
        else:
            plt.imshow(x_test_in[i])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        ax = plt.subplot(3, n, i + 1 + n)
        plt.imshow(decoded_imgs[i])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        ax = plt.subplot(3, n, i + 1 + n * 2)
        plt.imshow(x_test_out[i])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    plt.show()


In [34]:

def main(in_ch=1, epochs=10, batch_size=512, fig=True):
    ###########################
    # 학습 및 확인
    ###########################

    data = DATA(in_ch=in_ch)
    print(data.input_shape, data.x_train_in.shape)
    unet = UNET(data.input_shape, data.n_ch)

    history = unet.fit(data.x_train_in, data.x_train_out,
                       epochs=epochs,
                       batch_size=batch_size,
                       shuffle=True,
                       validation_split=0.2)

    if fig:
        plot_loss(history)
        show_images(data, unet)

In [39]:
__name__ = '__main__'
if __name__ == '__main__':
    import argparse
    from distutils import util

    parser = argparse.ArgumentParser(description='UNET for Cifar-10: Gray to RGB')
    parser.add_argument('--input_channels', type=int, default=1,
                        help='input channels (default: 1)')
    parser.add_argument('--epochs', type=int, default=10,
                        help='training epochs (default: 10)')
    parser.add_argument('--batch_size', type=int, default=512,
                        help='batch size (default: 1000)')
    parser.add_argument('--fig', type=lambda x: bool(util.strtobool(x)),
                        default=True, help='flag to show figures (default: True)')
    args = parser.parse_args()

    print("Aargs:", args)

    print(args.fig)
    main(args.input_channels, args.epochs, args.batch_size, args.fig)

usage: ipykernel_launcher.py [-h] [--input_channels INPUT_CHANNELS]
                             [--epochs EPOCHS] [--batch_size BATCH_SIZE]
                             [--fig FIG]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\WITHJS\AppData\Roaming\jupyter\runtime\kernel-7e1ba0e2-1708-47fd-8714-dafd5d66ea1c.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
