In [None]:
!pip install tensorflow_addons

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow_addons
  Downloading tensorflow_addons-0.17.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 15.5 MB/s 
Installing collected packages: tensorflow-addons
Successfully installed tensorflow-addons-0.17.1


In [None]:
import os
import cv2
import numpy as np
from glob import glob
from scipy.io import loadmat
import matplotlib.pyplot as plt
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from google.colab import drive
drive.mount('/content/drive')

import os
os.chdir('/content/drive/MyDrive/Colab Notebooks/collage/segmentation')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
rs = 1234
# 기존 deeplab v3 + 에서 사용했던 이미지 사이즈를 그대로 사용
IMAGE_SIZE = 512
# colab pro + 에서 구동할 수 있는 최대의 배치 사이즈 선택
BATCH_SIZE = 8
# human level 만을 binary로 예측하기때문에 1
NUM_CLASSES = 1

# 여러 실험을 적용하기 위한 랜덤시드 고정
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
seed_everything(rs)

In [None]:
DATA_DIR = "/content/drive/MyDrive/Colab Notebooks/collage/segmentation/data/rawdata/"

# 50000여개의 이미지 + 마스크쌍을 가지고옴
all_images = sorted(glob(os.path.join(DATA_DIR, "humanparsing/val/Images/*"))+glob(os.path.join(DATA_DIR, "humanparsing/train/Images/*"))+glob(os.path.join(DATA_DIR, "fashion/JPEGImages/*")))
all_masks = sorted(glob(os.path.join(DATA_DIR, "humanparsing/val/Human/*"))+glob(os.path.join(DATA_DIR, "humanparsing/train/Human/*"))+glob(os.path.join(DATA_DIR, "fashion/SegmentationClassAug/*")))

# validation size 5000으로 설정 실험 재현읋 위한 random_state 고정
from sklearn.model_selection import train_test_split
val_size = 5000
train_images, val_images, train_masks, val_masks  = train_test_split(all_images, all_masks, test_size=val_size, random_state=rs)

In [None]:
import tensorflow_addons as tfa

def read_image(image_path, mask=False):
  '''
  하나의 이미지 경로에 대한 이미지를 불러오기위한 함수
  image_path : str / 불러올 이미지 경로
  mask : bool / 일반이미지의 경우 정규화를 하고 마스크 이미지의 경우 정교화를 하지않음
  '''
  image = tf.io.read_file(image_path)
  image = tf.image.decode_png(image, channels=3)
  image.set_shape([None, None, 3])
  image = tf.image.resize(images=image, size=[IMAGE_SIZE, IMAGE_SIZE])
  if not mask:
      image = image / 255
  return image

def load_data(image_list, mask_list):
  '''
  image : str / 불러올 이미지 경로
  mask : str / 불러올 마스크 경로
  '''
  image = read_image(image_list)
  mask = read_image(mask_list, mask=True)
  return image, mask

def random_crop(image, mask, size):
  '''
  이미지와 마스크 쌍을 인풋하면 각각 같은 크기와 위치를 크롭하여 리턴해주는 함수

  image : array / 이미지
  mask : array / 마스크
  '''
  stacked_image = tf.stack([image, mask], axis=0)
  cropped_image = tf.image.random_crop(
      stacked_image, size=[2, size, size, 3])
  img,msk = cropped_image[0], cropped_image[1]
  return img,msk

def augment(input_image, input_mask):
  '''
  확률적으로 여러 augmentation을 두 쌍으로 적용하는 함수

  input_image : array / 이미지
  input_mask : array / 마스크
  '''
  # 랜덤 크로핑 후 다시 512x512로 리사이즈
  if tf.random.uniform(()) > 0.3:
      ratio = tf.random.uniform(shape=[],minval=400, maxval=440, dtype=tf.int32)
      input_image, input_mask = random_crop(input_image, input_mask, ratio)
      input_image = tf.image.resize(input_image, (IMAGE_SIZE, IMAGE_SIZE))
      input_mask = tf.image.resize(input_mask, (IMAGE_SIZE, IMAGE_SIZE))
  # 이미지에 대해 밝기 조절
  input_image = tf.image.random_brightness(input_image, 0.2)
  # 색상 대비 조절
  saturation_factor = tf.random.uniform((),0,2)
  input_image = tf.image.adjust_saturation(input_image, saturation_factor)
  # 좌우반전
  if tf.random.uniform(()) > 0.5:
      input_image = tf.image.flip_left_right(input_image)
      input_mask = tf.image.flip_left_right(input_mask)
  # 회전
  rot_factor = tf.cast(tf.random.uniform(shape=[],minval=-6, maxval=6, dtype=tf.int32), tf.float32)
  angle = np.pi/(12*6) *rot_factor
  input_image = tfa.image.rotate(input_image, angle)
  input_mask = tfa.image.rotate(input_mask, angle)

  return input_image, input_mask

def cast_mask(input_image, input_mask):
  '''
  rgb 채널로 구성되어있는 마스킹 정보를 1채널의 정보로 압축하는 함수 인물에 해당하는 픽셀은 1 아닌 픽셀은 0
  이미지는 그대로 리턴

  input_image : array / 이미지
  input_mask : array / 마스크
  '''
  input_mask = tf.math.reduce_sum(input_mask, axis=2)
  input_mask = input_mask[..., tf.newaxis]
  input_mask = tf.cast(input_mask>0, dtype=tf.float32)
  return input_image,input_mask

def data_generator(image_list, mask_list, augument=True):
  '''
  데이터 제네레이터 정의

  image_list : list / 이미지들의 경로가 담긴 리스트
  mask_list : list / 마스크들의 경로가 담긴 리스트
  '''
  dataset = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
  dataset = dataset.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
  if augument:
    dataset = dataset.map(augment)
  dataset = dataset.map(cast_mask)
  dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
  return dataset

# train set에만 agument 적용 
train_dataset = data_generator(train_images, train_masks)
print("Train Dataset:", train_dataset)

val_dataset = data_generator(val_images, val_masks, augument=False)
print("Validation Dataset:", val_dataset)

Train Dataset: <BatchDataset element_spec=(TensorSpec(shape=(8, 512, 512, 3), dtype=tf.float32, name=None), TensorSpec(shape=(8, 512, 512, 1), dtype=tf.float32, name=None))>
Validation Dataset: <BatchDataset element_spec=(TensorSpec(shape=(8, 512, 512, 3), dtype=tf.float32, name=None), TensorSpec(shape=(8, 512, 512, 1), dtype=tf.float32, name=None))>


![image](https://user-images.githubusercontent.com/102151612/188038684-cd2e6f89-f01c-4cc2-842e-b3aa3340dbf2.png)


In [None]:
def conv_block_2times(x, num_filters):
  '''
  conv -> batch nomalization -> relu 를 두번 반복
  x : input
  num_filters : 필터의 개수
  '''
  # 2번반복
  for _ in range(2):
    x = layers.Conv2D(num_filters, 3, strides=1, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)
  return x

def encoder_block(x, num_filters):
  '''
  conv -> batch nomalization -> relu 를 두번 반복 후 피처맵의 크기를 1/2로 줄임
  숏컷 구조를 위해 풀링이후의 피처맵과 이전의 피처맵 모두 리턴
  x : input
  num_filters : 필터의 개수
  '''
  x = conv_block_2times(x, num_filters)
  p = layers.MaxPooling2D((2,2))(x)
  return x, p 

def decoder_block(x, skip, num_filters):
  '''
  conv -> batch nomalization -> relu 를 두번 반복 후 피처맵의 크기를 1/2로 줄임
  숏컷 구조를 위해 풀링이후의 피처맵과 이전의 피처맵 모두 리턴
  x : input
  skip : concat할 encoder에서의 같은 사이즈의 피처맵
  num_filters : 필터의 개수
  '''
  x = layers.Conv2DTranspose(num_filters, (2,2), strides=2, padding='same')(x)
  x = layers.Concatenate(axis=-1)([x, skip])
  x = conv_block_2times(x, num_filters)
  return x

def Unet(image_size, num_classes):
  '''
  Unet의 구조를 본땀
  image_size : 이미지 사이즈
  num_classes : 픽셀별 예측할 클래스의 수
  '''

  inputs = keras.Input(shape=(image_size, image_size, 3))

  x1, p1 = encoder_block(inputs, 64)
  x2, p2 = encoder_block(p1, 128)
  x3, p3 = encoder_block(p2, 256)
  x4, p4 = encoder_block(p3, 512)

  center = conv_block_2times(p4,1024)

  d1 = decoder_block(center, x4, 512)
  d2 = decoder_block(d1, x3, 256)
  d3 = decoder_block(d2, x2, 128)
  d4 = decoder_block(d3, x1, 64)   

  model_output = layers.Conv2D(num_classes, kernel_size=(1, 1), padding="same", activation='sigmoid')(d4)
  # Define the model
  return keras.Model(inputs, model_output)

In [None]:
model = Unet(image_size=IMAGE_SIZE, num_classes=NUM_CLASSES)
loss = keras.losses.BinaryCrossentropy()
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, nesterov=True),
    loss=loss,
    metrics=["accuracy", tf.keras.metrics.BinaryIoU(),],
)

In [None]:
from keras import backend
from keras.utils import io_utils
from tensorflow.python.platform import tf_logging as logging


class CustomSaver(keras.callbacks.Callback):
  def __init__(self, save_path, save_name, frequency):
    self.save_path = save_path
    self.save_name = save_name
    self.frequency = frequency

  def on_epoch_end(self, epoch, logs={}):
      if (epoch+1) % self.frequency == 0 :
          name = self.save_path + self.save_name + f'{epoch+1:03d}.h5'
          self.model.save(name)
          io_utils.print_msg(
                    f'\nEpoch {epoch + 1}:'
                    f'saving model to {name}')
          
model_path, model_name = './model_weight/UNet/', 'UNet_epoch_'
model_path_best = './model_weight/UNet.h5'
csv_name = './model_weight/UNet.csv'

# 5 epoch 마다 모델을 저장하도록함 
custom_saver = CustomSaver(model_path, model_name ,frequency=5)
# 위에서 정의한 loss가 줄지않으면 backbone을 unfreeze 하고 lr을 0.01 -> 0.001로 바꾸어 계속 학습을 이어나감 / lr decay도 계속해서 적용
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, verbose=1, mode='min', min_delta=1e-4, cooldown=0, min_lr=0.0001,)
# 가장 좋았던 모델은 따로 저장
save_best = tf.keras.callbacks.ModelCheckpoint(model_path_best, monitor='val_loss', verbose=0, save_best_only=True,)
# 모델의 로그를 기록
write_log = tf.keras.callbacks.CSVLogger(csv_name, separator=',', append=True)

cb_list = [custom_saver, reduce_lr, save_best, write_log]

In [None]:
history = model.fit(train_dataset, validation_data=val_dataset,
                    callbacks=cb_list, epochs=100)