In [1]:
# 외부 데이터 불러오기 및 관리 
import os
from glob import glob

# 이미지 데이터 처리 
import numpy as np  # 행렬 및 수학
from PIL import Image  # 이미지 처리
import cv2
import matplotlib.pyplot as plt  # 시각화

# Framework
import tensorflow as tf
from tensorflow.keras import layers  # layer 구현
from tensorflow.keras.preprocessing.image import ImageDataGenerator  # 전처리
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K

%matplotlib inline

In [2]:
label_paths = glob('VOC2007/SegmentationClass/*.png')

lbl_path = label_paths[0]

label = np.array(Image.open(lbl_path))
label = np.where(label==255, 0, label)

heights = []
widths = []

unique_classes = []
for path in label_paths:
    label = np.array(Image.open(path))
    for lbl in np.unique(label):
        if lbl not in unique_classes:
            unique_classes.append(lbl)
    if label.shape[0] not in heights:
        heights.append(label.shape[0])
    if label.shape[1] not in widths:
        widths.append(label.shape[1])

In [3]:
unique_classes.sort()
unique_classes

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 255]

In [4]:
np.mean(heights), np.mean(widths)

(343.8333333333333, 381.9230769230769)

# Hyperparameter

In [5]:
batch_size = 8
num_epochs = 10
learning_rate = 0.001
drop_rate = 0.7

num_classes = 21
input_shape = (32, 32, 3)

# Data Preprocess

In [6]:
image_paths = glob('VOC2007/JPEGImages/*.jpg')
label_paths = glob('VOC2007/SegmentationClass/*.png')

test_rate = 0.8
train_images = image_paths[:int(test_rate * len(image_paths))]
test_images = image_paths[int(test_rate * len(image_paths)):]

train_labels = label_paths[:int(test_rate * len(label_paths))]
test_labels = label_paths[int(test_rate * len(label_paths)):]

In [7]:
len(train_images), len(train_labels)  # 파일 갯수가 달라서 없는 건 제거 할 것임

(4008, 337)

In [8]:
label_path = train_labels[0]

image_path = label_path.replace("SegmentationClass", "JPEGImages").replace("png", "jpg")
if image_path not in train_images:
    print(image_path)  # 없음이 확인

In [9]:
# Function to Load Data (image, label)
def get_label(label_path):
    label_pil = Image.open(label_path)
    label = np.array(label_pil)
    label = np.where(label==255, 0, label)  # 255는 필요없어서 제거 
    
    # Resize Label
    label = cv2.resize(label, input_shape[:2])

    label_onehot = to_categorical(label, num_classes)

    return label_onehot  # shape: (281, 500, 21)

def get_data(label_path):
    image_path = label_path.replace("SegmentationClass", "JPEGImages").replace("png", "jpg")
    image_pil = Image.open(image_path)
    image = np.array(image_pil)
    
    label = get_label(path)
    
    # Resize image
    image = cv2.resize(image, input_shape[:2])

    return image.astype(np.float32)/255, label.astype(np.float32)

In [10]:
# batch dataset
def make_batch(batch_paths):
    batch_images = []
    batch_labels = []

    for path in batch_paths:
        image, label = get_data(path)
        batch_images.append(image)
        batch_labels.append(label)

    batch_images = np.array(batch_images)
    batch_labels = np.array(batch_labels)
    
    return batch_images, batch_labels

In [11]:
def data_gen(data_paths, is_training=True):
    global_step = 0
    steps_per_epoch = len(data_paths) // batch_size
    while True:
        step = global_step % steps_per_epoch
        if step == 0:
            np.random.shuffle(data_paths)        
        images, labels = make_batch(data_paths[step*batch_size: (step+1)*batch_size])
        global_step += 1
        yield images, labels

# Modeling

In [12]:
# Build UNet
inputs = layers.Input(input_shape)
conv1 = layers.Conv2D(64, 3, padding = 'same', kernel_initializer = 'he_normal')(inputs)
conv1 = layers.BatchNormalization()(conv1)
conv1 = layers.Conv2D(64, 3, padding = 'same', kernel_initializer = 'he_normal')(conv1)
conv1 = layers.BatchNormalization()(conv1)
conv1 = layers.Activation("relu")(conv1)
pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)
conv2 = layers.Conv2D(128, 3, padding = 'same', kernel_initializer = 'he_normal')(pool1)
conv2 = layers.BatchNormalization()(conv2)
conv2 = layers.Activation("relu")(conv2)
conv2 = layers.Conv2D(128, 3, padding = 'same', kernel_initializer = 'he_normal')(conv2)
conv2 = layers.BatchNormalization()(conv2)
conv2 = layers.Activation("relu")(conv2)
pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)
conv3 = layers.Conv2D(256, 3, padding = 'same', kernel_initializer = 'he_normal')(pool2)
conv3 = layers.BatchNormalization()(conv3)
conv3 = layers.Activation("relu")(conv3)
conv3 = layers.Conv2D(256, 3, padding = 'same', kernel_initializer = 'he_normal')(conv3)
conv3 = layers.BatchNormalization()(conv3)
conv3 = layers.Activation("relu")(conv3)
pool3 = layers.MaxPooling2D(pool_size=(2, 2))(conv3)
conv4 = layers.Conv2D(512, 3, padding = 'same', kernel_initializer = 'he_normal')(pool3)
conv4 = layers.BatchNormalization()(conv4)
conv4 = layers.Activation("relu")(conv4)
conv4 = layers.Conv2D(512, 3, padding = 'same', kernel_initializer = 'he_normal')(conv4)
conv4 = layers.BatchNormalization()(conv4)
conv4 = layers.Activation("relu")(conv4)
drop4 = layers.Dropout(drop_rate)(conv4)
pool4 = layers.MaxPooling2D(pool_size=(2, 2))(drop4)

conv5 = layers.Conv2D(1024, 3, padding = 'same', kernel_initializer = 'he_normal')(pool4)
conv5 = layers.BatchNormalization()(conv5)
conv5 = layers.Activation("relu")(conv5)
conv5 = layers.Conv2D(1024, 3, padding = 'same', kernel_initializer = 'he_normal')(conv5)
conv5 = layers.BatchNormalization()(conv5)
conv5 = layers.Activation("relu")(conv5)
drop5 = layers.Dropout(drop_rate)(conv5)

up6 = layers.Conv2DTranspose(1024, 2, padding='same', strides=(2, 2))(drop5)
merge6 = layers.concatenate([drop4, up6], axis=3)
conv6 = layers.Conv2D(512, 3, padding = 'same', kernel_initializer = 'he_normal')(merge6)
conv6 = layers.BatchNormalization()(conv6)
conv6 = layers.Activation("relu")(conv6)
conv6 = layers.Conv2D(512, 3, padding = 'same', kernel_initializer = 'he_normal')(conv6)
conv6 = layers.BatchNormalization()(conv6)
conv6 = layers.Activation("relu")(conv6)

up7 = layers.Conv2DTranspose(512, 2, padding='same', strides=(2, 2))(conv6)
merge7 = layers.concatenate([conv3, up7], axis=3)
conv7 = layers.Conv2D(256, 3, padding = 'same', kernel_initializer = 'he_normal')(merge7)
conv7 = layers.BatchNormalization()(conv7)
conv7 = layers.Activation("relu")(conv7)
conv7 = layers.Conv2D(256, 3, padding = 'same', kernel_initializer = 'he_normal')(conv7)
conv7 = layers.BatchNormalization()(conv7)
conv7 = layers.Activation("relu")(conv7)

up8 = layers.Conv2DTranspose(256, 2, padding='same', strides=(2, 2))(conv7)
merge8 = layers.concatenate([conv2, up8], axis=3)
conv8 = layers.Conv2D(128, 3, padding = 'same', kernel_initializer = 'he_normal')(merge8)
conv8 = layers.BatchNormalization()(conv8)
conv8 = layers.Activation("relu")(conv8)
conv8 = layers.Conv2D(128, 3, padding = 'same', kernel_initializer = 'he_normal')(conv8)
conv8 = layers.BatchNormalization()(conv8)
conv8 = layers.Activation("relu")(conv8)

up9 = layers.Conv2DTranspose(128, 2, padding='same', strides=(2, 2))(conv8)
merge9 = layers.concatenate([conv1, up9], axis=3)
conv9 = layers.Conv2D(64, 3, padding = 'same', kernel_initializer = 'he_normal')(merge9)
conv9 = layers.BatchNormalization()(conv9)
conv9 = layers.Activation("relu")(conv9)
conv9 = layers.Conv2D(64, 3, padding = 'same', kernel_initializer = 'he_normal')(conv9)
conv9 = layers.BatchNormalization()(conv9)
conv9 = layers.Activation("relu")(conv9)
conv9 = layers.Conv2D(2, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
conv10 = layers.Conv2D(1, 1, activation = 'sigmoid')(conv9)

model = tf.keras.Model(inputs=inputs, outputs=conv10)

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


# Optimization

In [20]:
# Dice Coefficient
def precision(y_true, y_pred):
    axes = tuple(range(1, len(y_pred.shape)-1))
    
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)), axes)
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)), axes)
    precision = true_positives / (predicted_positives + K.epsilon())
    
    return K.mean(precision)


def recall(y_true, y_pred):
    axes = tuple(range(1, len(y_pred.shape)-1))
    
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)), axes)
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)), axes)
    recall = true_positives / (possible_positives + K.epsilon())
    
    return K.mean(recall)

def dice(y_true, y_pred):
    epsilon=1e-6
    axes = tuple(range(1, len(y_pred.shape)-1)) 
    numerator = 2. * K.sum(y_pred * y_true, axes)
    denominator = K.sum(K.square(y_pred) + K.square(y_true), axes)
    
    return K.mean(numerator / (denominator + epsilon))

def soft_dice_loss(y_true, y_pred):
    epsilon=1e-6
    axes = tuple(range(1, len(y_pred.shape)-1)) 
    numerator = 2. * K.sum(y_pred * y_true, axes)
    denominator = K.sum(K.square(y_pred) + K.square(y_true), axes)
    
    return 1 - K.mean(numerator / (denominator + epsilon))

In [21]:
log_dir = os.path.join(os.getcwd(), 'logs')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir)
]

In [22]:
model.compile(loss=soft_dice_loss, 
              optimizer=tf.keras.optimizers.Adam(learning_rate),
              metrics=['accuracy'])

# Train

In [23]:
steps_per_epoch = len(train_labels) // batch_size

model.fit_generator(generator=data_gen(train_labels), 
                   steps_per_epoch=steps_per_epoch,
                   epochs=num_epochs,
                   callbacks=callbacks,
                   verbose=1)

Instructions for updating:
Use tf.cast instead.
Epoch 1/10

KeyboardInterrupt: 