In [None]:
import os

import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from skimage import feature
from skimage import filters
from skimage.io import imread
from skimage.transform import resize
import scipy
from sklearn.model_selection import train_test_split

In [None]:
def upsample(filters, size, norm_type='batchnorm', apply_dropout=False):
    """Upsamples an input.

    Conv2DTranspose => Batchnorm => Dropout => Relu

    Args:
    filters: number of filters
    size: filter size
    norm_type: Normalization type; either 'batchnorm' or 'instancenorm'.
    apply_dropout: If True, adds the dropout layer

    Returns:
    Upsample Sequential Model
    """

    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
            tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                          padding='same',
                                          kernel_initializer=initializer,
                                          use_bias=False))

    if norm_type.lower() == 'batchnorm':
        result.add(tf.keras.layers.BatchNormalization())
    elif norm_type.lower() == 'instancenorm':
        result.add(InstanceNormalization())

    if apply_dropout:
        result.add(tf.keras.layers.Dropout(0.5))

    result.add(tf.keras.layers.ReLU())

    return result

In [None]:
try:
    from google.colab import drive
    drive.mount('/content/gdrive')
except:
    print(f'Google colab 환경이 아닙니다.')
    pass

In [None]:
!mkdir /tmp/dataset
!tar --directory /tmp/dataset -xvf 

In [None]:
path_root = os.path.abspath(os.path.expanduser('/tmp/dataset/SegmentationClass'))
queue = [path_root]
npy_items = list()
while queue:
    ptr = queue.pop()
    for entry in os.scandir(ptr):
        if entry.name.startswith('.') or not entry.is_file():
            queue.append(entry.path)
        elif entry.name.endswith('.npy'):
            npy_items.append(entry.path)
npy_items.sort()
print(f'Loaded: {len(npy_items)} files')

In [None]:
path_root = os.path.abspath(os.path.expanduser('/tmp/dataset/JPEGImages'))
queue = [path_root]
jpg_items = list()
while queue:
    ptr = queue.pop()
    for entry in os.scandir(ptr):
        if entry.name.startswith('.') or not entry.is_file():
            queue.append(entry.path)
        elif entry.name.endswith('.jpg'):
            jpg_items.append(entry.path)
jpg_items.sort()
print(f'Loaded: {len(jpg_items)} files')

In [None]:
def preprocess(path):
    data = np.load(path)
    data = resize(data, (128, 128), anti_aliasing=False)
    data = np.round(data/np.max(data))
    data[data > 0] = data[data > 0] + 1 # Class label + 1
    edge = filters.sobel(data) # Find edge
    data[edge > 0] = 1 # Set class 1 to edge
    return data

In [None]:
X = list()
y = list()
for train, label in zip(jpg_items, npy_items):
    t = imread(train)
    t = resize(t, (128, 128), anti_aliasing=True)
    X.append(t)
    l = np.expand_dims(preprocess(label), axis=-1)
    y.append(l)

In [None]:
test_size = 0.2
random_state = None
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))

In [None]:
BATCH_SIZE = 32
SHUFFLE_BUFFER_SIZE = 100
train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = test_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)

In [None]:
OUTPUT_CHANNELS = 3

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False)

# Use the activations of these layers
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
layers = [base_model.get_layer(name).output for name in layer_names]

# Create the feature extraction model
down_stack = tf.keras.Model(inputs=base_model.input, outputs=layers)

down_stack.trainable = False

In [None]:
up_stack = [
    upsample(512, 3),  # 4x4 -> 8x8
    upsample(256, 3),  # 8x8 -> 16x16
    upsample(128, 3),  # 16x16 -> 32x32
    upsample(64, 3),   # 32x32 -> 64x64
]

In [None]:
def display(display_list):
    plt.figure(figsize=(15, 15))

    title = ['Input Image', 'True Mask', 'Predicted Mask']

    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis('off')
    plt.show()

In [None]:
for image, mask in train_dataset.take(1):
    sample_image, sample_mask = image[0], mask[0]
display([sample_image, sample_mask])

In [None]:
def unet_model(output_channels):
    inputs = tf.keras.layers.Input(shape=[128, 128, 3])
    x = inputs

    # Downsampling through the model
    skips = down_stack(x)
    x = skips[-1]
    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        concat = tf.keras.layers.Concatenate()
        x = concat([x, skip])

    # This is the last layer of the model
    last = tf.keras.layers.Conv2DTranspose(
                   output_channels, 3, strides=2,
                   padding='same')  #64x64 -> 128x128

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

In [None]:
model = unet_model(OUTPUT_CHANNELS)
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
def create_mask(pred_mask):
    pred_mask = tf.argmax(pred_mask, axis=-1)
    pred_mask = pred_mask[..., tf.newaxis]
    return pred_mask[0]

In [None]:
def show_predictions(dataset=None, num=1):
    if dataset:
        for image, mask in dataset.take(num):
            pred_mask = model.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)])
    else:
        display([x[0], y[0],
                 create_mask(model.predict(x[0][tf.newaxis, ...]))])

In [None]:
show_predictions()

In [None]:
CALLBACKTERM = 10
class DisplayCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        if epoch % CALLBACKTERM == 0:
            show_predictions()
            print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
EPOCHS = 100

# model_history = model.fit(train_dataset, epochs=EPOCHS,
#                           callbacks=[DisplayCallback()])
model_history = model.fit(train_dataset, epochs=EPOCHS,
                          validation_data=test_dataset,
                          callbacks=[DisplayCallback()])

In [None]:
IDX = 15
r = model.predict(np.expand_dims(x[IDX], axis=0))
display([x[IDX], y[IDX], create_mask(r)])