# 使用 U-Net 进行语义分割

# 程式参考来源：
https://keras.io/examples/vision/oxford_pets_image_segmentation/

## 下载资料
- 原图：http://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz
- 注解：http://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz

## 载入相关套件

In [1]:
# 载入相关套件
from tensorflow import keras
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras import layers
import PIL
from PIL import ImageOps
import numpy as np
import os
from IPython.display import Image, display

## 取得原图及目标图遮罩(Mask)的档案路径

In [2]:
# Dataset locations
root_path = "F:/0_DataMining/0_MY/Keras/ImageSegmentData/"
input_dir = f"{root_path}images/"                # directory of the original images
target_dir = f"{root_path}annotations/trimaps/"  # directory of the mask images

# Hyperparameters
img_size = (160, 160)  # size every image is resized to when loaded
num_classes = 4        # number of output classes
batch_size = 32        # samples per training batch

# Collect the path of every .jpg file in the image directory, in sorted order
input_img_paths = [
    os.path.join(input_dir, name)
    for name in os.listdir(input_dir)
    if name.endswith(".jpg")
]
input_img_paths.sort()

# Collect the path of every non-hidden .png file in the mask directory, in sorted order
target_img_paths = [
    os.path.join(target_dir, name)
    for name in os.listdir(target_dir)
    if name.endswith(".png") and not name.startswith(".")
]
target_img_paths.sort()

print("样本数:", len(input_img_paths))

FileNotFoundError: [WinError 3] 系統找不到指定的路徑。: 'F:/0_DataMining/0_MY/Keras/ImageSegmentData/images/'

In [3]:
# Print the first 10 image/mask path pairs side by side
first_pairs = zip(input_img_paths[:10], target_img_paths[:10])
for image_file, mask_file in first_pairs:
    print(image_file, "|", mask_file)

NameError: name 'input_img_paths' is not defined

## 显示其中一张图

In [None]:
# Show the 10th image (index 9)
sample_index = 9
print(input_img_paths[sample_index])
display(Image(filename=input_img_paths[sample_index]))

# Show its mask with contrast stretched so the darkest value becomes
# black (0) and the lightest becomes white (255)
print(target_img_paths[sample_index])
mask_preview = load_img(target_img_paths[sample_index])
img = PIL.ImageOps.autocontrast(mask_preview)
display(img)

## 建立图像的 Iterator，一次传回一批影像

In [None]:
# 建立图像的 Iterator
class OxfordPets(keras.utils.Sequence):
    """Batch iterator over image/mask file pairs, yielding NumPy arrays.

    Args:
        batch_size: Number of samples in each batch.
        img_size: Size tuple passed as ``target_size`` to ``load_img``; also
            used as the spatial dimensions of the returned arrays.
        input_img_paths: Paths of the input images.
        target_img_paths: Paths of the mask images, in the same order as
            ``input_img_paths``.
    """

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths):
        # Let the Keras base class initialise its own state first.
        super().__init__()
        self.batch_size = batch_size
        self.img_size = img_size
        self.input_img_paths = input_img_paths
        self.target_img_paths = target_img_paths

    def __len__(self):
        """Return the number of full batches; a trailing partial batch is dropped."""
        return len(self.target_img_paths) // self.batch_size

    def __getitem__(self, idx):
        """Return the tuple ``(input, target)`` for batch number ``idx``.

        ``input`` is a float32 array of shape
        ``(batch_size,) + img_size + (3,)`` and ``target`` is a uint8 array
        of shape ``(batch_size,) + img_size + (1,)``.
        """
        i = idx * self.batch_size
        batch_input_img_paths = self.input_img_paths[i : i + self.batch_size]
        batch_target_img_paths = self.target_img_paths[i : i + self.batch_size]

        # Size the arrays from this instance's batch size rather than the
        # module-level ``batch_size`` global, so the two cannot disagree.
        x = np.zeros((self.batch_size,) + self.img_size + (3,), dtype="float32")
        for j, path in enumerate(batch_input_img_paths):
            img = load_img(path, target_size=self.img_size)
            x[j] = img

        y = np.zeros((self.batch_size,) + self.img_size + (1,), dtype="uint8")
        for j, path in enumerate(batch_target_img_paths):
            img = load_img(path, target_size=self.img_size, color_mode="grayscale")
            # Masks load as 2-D images; add the trailing channel axis.
            y[j] = np.expand_dims(img, 2)
        return x, y

## 建立 U-Net 模型

In [None]:
def get_model(img_size, num_classes):
    """Build the U-Net-style segmentation model.

    Args:
        img_size: Spatial size tuple; the model input shape is
            ``img_size + (3,)``.
        num_classes: Number of filters in the final softmax convolution,
            i.e. the number of per-pixel classes.

    Returns:
        A ``keras.Model`` mapping the input tensor to the per-pixel softmax
        output.
    """
    inputs = keras.Input(shape=img_size + (3,))

    # Encoder entry block
    net = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    net = layers.BatchNormalization()(net)
    net = layers.Activation("relu")(net)
    skip = net  # kept aside as the source of the next residual

    # Encoder: three blocks that differ only in their filter count
    for filters in (64, 128, 256):
        for _ in range(2):
            net = layers.Activation("relu")(net)
            net = layers.SeparableConv2D(filters, 3, padding="same")(net)
            net = layers.BatchNormalization()(net)

        net = layers.MaxPooling2D(3, strides=2, padding="same")(net)

        # Project the residual with a strided 1x1 convolution, then add it back
        shortcut = layers.Conv2D(filters, 1, strides=2, padding="same")(skip)
        net = layers.add([net, shortcut])
        skip = net

    # Decoder: four upsampling blocks
    for filters in (256, 128, 64, 32):
        for _ in range(2):
            net = layers.Activation("relu")(net)
            net = layers.Conv2DTranspose(filters, 3, padding="same")(net)
            net = layers.BatchNormalization()(net)

        net = layers.UpSampling2D(2)(net)

        # Upsample and project the residual, then add it back
        shortcut = layers.UpSampling2D(2)(skip)
        shortcut = layers.Conv2D(filters, 1, padding="same")(shortcut)
        net = layers.add([net, shortcut])
        skip = net

    # Per-pixel classification layer
    outputs = layers.Conv2D(
        num_classes, 3, activation="softmax", padding="same"
    )(net)

    return keras.Model(inputs, outputs)

In [None]:
# Reset Keras' global state so that re-running this cell does not keep
# holding memory from earlier runs
keras.backend.clear_session()

# Build the model and print its layer summary
model = get_model(img_size=img_size, num_classes=num_classes)
model.summary()

## 绘制模型结构

In [None]:
# Plot the model structure and write it to Unet_model.png
import tensorflow as tf
tf.keras.utils.plot_model(model, to_file='Unet_model.png')

In [None]:
# Image source: https://arxiv.org/pdf/1505.04597.pdf
# Display the local file ./images/U-Net.png at a width of 500
from IPython.display import Image
Image('./images/U-Net.png', width=500)

## 将资料切割为训练及验证资料

In [None]:
import random

# Number of samples held out for validation
val_samples = 1000

# Shuffle both path lists in place with identically seeded generators, so
# equal-length lists receive the same permutation and pairs stay matched
for path_list in (input_img_paths, target_img_paths):
    random.Random(1337).shuffle(path_list)

# Everything except the last `val_samples` entries is used for training
train_input_img_paths, val_input_img_paths = (
    input_img_paths[:-val_samples],
    input_img_paths[-val_samples:],
)
train_target_img_paths, val_target_img_paths = (
    target_img_paths[:-val_samples],
    target_img_paths[-val_samples:],
)

# One data Sequence per split
train_gen = OxfordPets(
    batch_size=batch_size,
    img_size=img_size,
    input_img_paths=train_input_img_paths,
    target_img_paths=train_target_img_paths,
)
val_gen = OxfordPets(
    batch_size=batch_size,
    img_size=img_size,
    input_img_paths=val_input_img_paths,
    target_img_paths=val_target_img_paths,
)

## 训练模型

In [None]:
# Configure training: RMSprop optimizer with sparse categorical
# cross-entropy loss
model.compile(loss="sparse_categorical_crossentropy", optimizer="rmsprop")

# Checkpoint callback: save the model to oxford_segmentation.h5, keeping
# only the best one
callbacks = [
    keras.callbacks.ModelCheckpoint(
        filepath="oxford_segmentation.h5",
        save_best_only=True,
    )
]

# Train for 15 epochs, validating on the held-out split
epochs = 15
model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=epochs,
    callbacks=callbacks,
)

## 预测

In [None]:
# Run the model over the whole validation split
val_gen = OxfordPets(
    batch_size=batch_size,
    img_size=img_size,
    input_img_paths=val_input_img_paths,
    target_img_paths=val_target_img_paths,
)
val_preds = model.predict(val_gen)


def display_mask(i):
    """Display the predicted mask for validation sample ``i``.

    Takes the arg-max over the last axis of ``val_preds[i]``, converts the
    resulting class-index map to an image and shows it with auto-contrast.
    """
    class_ids = np.argmax(val_preds[i], axis=-1)[..., np.newaxis]
    mask_img = keras.preprocessing.image.array_to_img(class_ids)
    display(PIL.ImageOps.autocontrast(mask_img))


# Inspect the 11th validation sample (index 10)
i = 10

# Original image
print('原图：')
display(Image(filename=val_input_img_paths[i]))

# Ground-truth mask, contrast-stretched for visibility
print('原遮罩图：')
img = PIL.ImageOps.autocontrast(load_img(val_target_img_paths[i]))
display(img)

# Predicted mask; the model only sees inputs resized to img_size
# (160x160 in this notebook), so the mask has that resolution
print('预测结果：')
display_mask(i)