# 마스크 유넷 학습시키기
![image](https://miro.medium.com/v2/resize:fit:1400/1*qNdglJ1ORP3Gq77MmBLhHQ.png)

In [None]:
# !tar -xf /content/drive/MyDrive/LaPa.tar

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, CSVLogger

import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf

In [None]:
# 컨볼루션 함수
def conv_block(inputs, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

# 맥스풀링 함수
def encoder_block(inputs, num_filters):
    x = conv_block(inputs, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p

# 업컨볼루션 함수
def decoder_block(inputs, skip, num_filters):
    # 일반적인 컨볼루션의 반대 방향으로 진행 up-conv
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(
        inputs)  # 스트라이드가 2라서 크기가 2배로 증가 32->64
    x = Concatenate()([x, skip])  # [64x64 512, 64x64 512] -> 64x64 1024
    x = conv_block(x, num_filters)  # 64x64, 1024 -> 64x64, 512
    return x


def build_unet(input_shape, num_classes):
    inputs = Input(input_shape)  # 512x512 3를 받아서 케라스텐서로 변경해줌

    s1, p1 = encoder_block(inputs, 64)  # 512, 256
    s2, p2 = encoder_block(p1, 128)  # 256, 128
    s3, p3 = encoder_block(p2, 256)  # 128,64
    s4, p4 = encoder_block(p3, 512)  # 64, 32

    # 플래튼 시키면 위치정보를 잃어버리니까 컨블럭에 입력
    b1 = conv_block(p4, 1024)  # 32x32 1024

    # 이전형태, 저장해둔 위치정보, 채널수
    d1 = decoder_block(b1, s4, 512)  # 64x64 512
    d2 = decoder_block(d1, s3, 256)  # 128x128 256
    d3 = decoder_block(d2, s2, 128)  # 256x256 128
    d4 = decoder_block(d3, s1, 64)  # 512x512 64

    # 마지막은 클래스 수, 커널은 1, 멀티클래스니까 softmax
    outputs = Conv2D(num_classes, 1, padding="same", activation="softmax")(d4)

    model = Model(inputs, outputs)  # 모델 인스턴스 생성
    return model


In [None]:
global image_h
global image_w
global num_classes
global classes
global rgb_codes

def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def load_dataset(path):
    train_x = sorted(glob(os.path.join(path, "train", "images", "*.jpg")))
    train_y = sorted(glob(os.path.join(path, "train", "labels", "*.png")))

    valid_x = sorted(glob(os.path.join(path, "val", "images", "*.jpg")))
    valid_y = sorted(glob(os.path.join(path, "val", "labels", "*.png")))

    test_x = sorted(glob(os.path.join(path, "test", "images", "*.jpg")))
    test_y = sorted(glob(os.path.join(path, "test", "labels", "*.png")))

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

def read_image_mask(x, y):

    x = cv2.imread(x, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (image_w, image_h))
    x = x/255.0
    x = x.astype(np.float32)

    y = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
    y = cv2.resize(y, (image_w, image_h))
    y = y.astype(np.int32) # 마스크는 이미 정규화되어있음

    return x, y

def preprocess(x, y):
    def f(x, y):
        x = x.decode() # 바이트에서 스트링으로 디코딩
        y = y.decode()
        return read_image_mask(x, y)

    # 파이썬 함수를 텐서플로우 함수의 연산으로 래핑, 텐서 x, y를 받아서 넘파이 작업하고 텐서로 돌려줌
    image, mask = tf.numpy_function(f, [x, y], [tf.float32, tf.int32])
    mask = tf.one_hot(mask, num_classes)

    image.set_shape([image_h, image_w, 3])
    mask.set_shape([image_h, image_w, num_classes])

    return image, mask

def tf_dataset(X, Y, batch=8):
    ds = tf.data.Dataset.from_tensor_slices((X, Y))
    ds = ds.shuffle(buffer_size=5000).map(preprocess) # 첫 5000개부터 가져와서 전처리처리하고 셔플함
    ds = ds.batch(batch).prefetch(2) # 데이터 로딩을 줄이기 위해서 배치 2개를 미리 가져옴
    return ds

In [None]:
# 시드
np.random.seed(42)
tf.random.set_seed(42)

# 폴더 만들어서 모델 저장
create_dir("files")

# 파라미터
image_h = 512
image_w = 512
num_classes = 11
input_shape = (image_h, image_w, 3)
batch_size = 8
lr = 1e-4 # 0.0001
num_epochs = 1

# 경로 지정
dataset_path = "/content/LaPa"
model_path = os.path.join("files", "model.h5")
csv_path = os.path.join("files", "data.csv")

# RGB 코드와 그에 맞는 클래스
rgb_codes = [
    [0, 0, 0], [0, 153, 255], [102, 255, 153], [0, 204, 153],
    [255, 255, 102], [255, 255, 204], [255, 153, 0], [255, 102, 255],
    [102, 0, 51], [255, 204, 255], [255, 0, 102]
]

# 11개 클래스
classes = [
    "background", "skin", "left eyebrow", "right eyebrow",
    "left eye", "right eye", "nose", "upper lip", "inner mouth",
    "lower lip", "hair"
]

# 데이터셋 로딩하기
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)
print(f"Train: {len(train_x)}/{len(train_y)} - Valid: {len(valid_x)}/{len(valid_y)} - Test: {len(test_x)}/{len(test_x)}")
print("")
train_ds = tf_dataset(train_x, train_y, batch=batch_size)
valid_ds = tf_dataset(valid_x, valid_y, batch=batch_size)

Train: 18168/18168 - Valid: 2000/2000 - Test: 2000/2000



In [None]:
# 모델 빌드하기
model = build_unet(input_shape, num_classes)
model.compile(
    loss="categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(lr)
)
model.summary()

Model: "model_5"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_6 (InputLayer)           [(None, 512, 512, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_95 (Conv2D)             (None, 512, 512, 64  1792        ['input_6[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization_90 (BatchN  (None, 512, 512, 64  256        ['conv2d_95[0][0]']              
 ormalization)                  )                                                           

In [None]:
callbacks = [
    ModelCheckpoint(model_path, verbose=1, save_best_only=True, monitor='val_loss'),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
    CSVLogger(csv_path, append=True),
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)
]

model.fit(train_ds,
    validation_data=valid_ds,
    epochs=num_epochs,
    callbacks=callbacks
)

Epoch 1: val_loss improved from inf to 0.32670, saving model to files/model.h5


<keras.callbacks.History at 0x7b435162f070>

- 시간이 오래걸려서 에포크1만 돌렸습니다.
- 실제로 학습된 모델은 5시간정도 학습을 시켰습니다. 에포크는 50정도에서 얼리스탑하였습니다.
