# 图像分割

本文的图像分割任务, 使用一个修改后的 <a href="https://lmb.informatik.uni-freiburg.de/people/ronneber/u-net/" class="external">U-Net</a>.

## 什么是图像分割？

在图像分类任务中，网络为每个输入图像分配一个标签（或类）。但是，假设您想知道该对象的形状，哪个像素属于哪个对象等。在这种情况下，你需要为图像的每个像素指定一个类，这项任务称为分割。分割模型返回关于图像的更详细信息。图像分割在医学成像、自动驾驶汽车和卫星成像等领域有许多应用。

本文使用 [Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/) ([Parkhi et al, 2012](https://www.robots.ox.ac.uk/~vgg/publications/2012/parkhi12a/parkhi12a.pdf))。该数据集由37个宠物品种的图像组成，每个品种有200张图像（训练和测试中每个约100张）。每个图像都包含相应的标签和像素级遮罩。遮罩是每个像素的类标签。每个像素都有三种类别：

- 类别1：属于宠物的像素。
- 类别2：宠物周围的像素。
- 类别3：没有宠物之上及周围的像素。

In [1]:
# !pip install git+https://github.com/tensorflow/examples.git

In [2]:
import tensorflow as tf
import tensorflow_datasets as tfds

In [3]:
from tensorflow_examples.models.pix2pix import pix2pix
from IPython.display import clear_output
import matplotlib.pyplot as plt

In [5]:
gpu = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(gpu[0], True)

## 下载 Oxford-IIIT Pets 数据集

这个数据集可以从 [TensorFlow 数据集](https://www.tensorflow.org/datasets/catalog/oxford_iiit_pet)中获得。分割掩码包含在版本3+中。

In [None]:
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True, data_dir='../data/', download=False)

此外，图像颜色值被规范化为`[0, 1]`范围。最后，如上所述，分割掩码中的像素标记为`{1, 2, 3}`。为了方便起见，从分段掩码中减去1，得到的标签为：`{0, 1, 2}`。

In [None]:
def normalize(input_image, input_mask):
    input_image = tf.cast(input_image, tf.float32) / 255.0
    input_mask -= 1
    return input_image, input_mask

In [None]:
def load_image(datapoint):
    input_image = tf.image.resize(datapoint['image'], (128, 128))
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))
    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask

数据集已包含所需的训练和测试拆分，因此请继续使用相同的拆分：

In [None]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

In [None]:
train_images = dataset['train'].map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_images = dataset['test'].map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

下面的类通过随机翻转图像来执行简单的增强。转到[图像增强](data_augmentation.ipynb)教程了解更多信息。

In [None]:
class Augment(tf.keras.layers.Layer):
    def __init__(self, seed=42):
        super().__init__()
        # 两者都使用相同的种子，因此它们将进行相同的随机更改。
        self.augment_inputs = tf.keras.layers.experimental.preprocessing.RandomFlip(mode="horizontal", seed=seed)
        self.augment_labels = tf.keras.layers.experimental.preprocessing.RandomFlip(mode="horizontal", seed=seed)
  
    def call(self, inputs, labels):
        inputs = self.augment_inputs(inputs)
        labels = self.augment_labels(labels)
        return inputs, labels

构建输入管道，在批处理输入后应用增强：

In [None]:
train_batches = (
    train_images
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    .map(Augment())
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE))

test_batches = test_images.batch(BATCH_SIZE)

可视化数据集中的图像示例及其对应的掩码：

In [None]:
def display(display_list):
    plt.figure(figsize=(15, 15))
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis('off')
    plt.show()

In [None]:
for images, masks in train_batches.take(2):
    sample_image, sample_mask = images[0], masks[0]
    display([sample_image, sample_mask])

## 定义模型
此处使用的模型是修改过的[U-Net](https://arxiv.org/abs/1505.04597)。U-Net由编码器（下采样器）和解码器（上采样器）组成。为了学习强大的功能并减少可训练参数的数量，使用预处理模型-[MobileNetV2](https://arxiv.org/abs/1801.04381)-作为编码器。对于解码器，你将使用upsample块，它已经在[pix2pix](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py)中实现。

如前所述，编码器是经过预处理的MobileNetV2型号。您将使用`tf.keras.applications`中的模型。编码器由模型中间层的特定输出组成。请注意，编码器在训练过程中不会被训练。

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False)

# 使用这些层的激活
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]

# 创建特征提取模型
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

解码器/上采样器只是在TensorFlow示例中实现的一系列上采样块：

In [None]:
up_stack = [
    pix2pix.upsample(512, 3),  # 4x4 -> 8x8
    pix2pix.upsample(256, 3),  # 8x8 -> 16x16
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),   # 32x32 -> 64x64
]

In [None]:
def unet_model(output_channels: int):
    inputs = tf.keras.layers.Input(shape=[128, 128, 3])

    # 通过模型进行下采样
    skips = down_stack(inputs)
    x = skips[-1]
    skips = reversed(skips[:-1])

    # 上采样和建立跳跃连接
    for up, skip in zip(up_stack, skips):
        x = up(x)
        concat = tf.keras.layers.Concatenate()
        x = concat([x, skip])

    # 这是模型的最后一层
    last = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same')  #64x64 -> 128x128

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

请注意，最后一层上的过滤器数量设置为`output_channels`的数量。这将是每类一个输出通道。

## 训练模型

现在，只剩下编译和训练模型了。

由于这是一个多类分类问题，请使用`tf.keras.losses.CategorialCrossentry`损失函数，`from_logits`参数设置为`True`，因为标签是标量整数，而不是每个类中每个像素的得分向量。

运行推断时，分配给像素的标签是具有最高值的通道。这就是`create_mask`函数的作用。

In [None]:
OUTPUT_CLASSES = 3

model = unet_model(output_channels=OUTPUT_CLASSES)
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

绘制生成的模型体系结构：

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True, to_file='../imgs/unet.png')

在训练之前，尝试检查该模型的预测的内容：

In [None]:
def create_mask(pred_mask):
    pred_mask = tf.math.argmax(pred_mask, axis=-1)
    pred_mask = pred_mask[..., tf.newaxis]
    return pred_mask[0]

In [None]:
def show_predictions(dataset=None, num=1):
    if dataset:
        for image, mask in dataset.take(num):
            pred_mask = model.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)])
    else:
        display([sample_image, sample_mask, create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

下面定义的回调用于观察模型在训练期间的改进情况：

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        clear_output(wait=True)
        show_predictions()
        print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples // BATCH_SIZE // VAL_SUBSPLITS

model_history = model.fit(train_batches, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_batches,
                          callbacks=[DisplayCallback()])

In [None]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

plt.figure()
plt.plot(model_history.epoch, loss, 'r-', label='Training loss')
plt.plot(model_history.epoch, val_loss, 'bo--', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

## 做出预测

现在，做一些预测。为了节省时间，样本数保持较小，但您可以将其设置得更高以获得更准确的结果。

In [None]:
show_predictions(test_batches, 3)

## 可选：不平衡类和类权重

语义分割数据集可能是高度不平衡的，这意味着特定类别的像素在图像内部的表现可能比其他类别的像素更多。由于分割问题可以按照像素分类问题来处理，因此可以通过加权损失函数来解决不平衡问题。这是一种简单而优雅的方法来处理这个问题。

为了[避免歧义](https://github.com/keras-team/keras/issues/3653#issuecomment-243939748), `model.fit`不支持3+维输入的`classweight`参数。

In [None]:
try:
    model_history = model.fit(train_batches, epochs=EPOCHS,
                            steps_per_epoch=STEPS_PER_EPOCH,
                            class_weight = {0:2.0, 1:2.0, 2:1.0})
    assert False
except Exception as e:
    print(f"Expected {type(e).__name__}: {e}")

因此，在这种情况下，您需要自己实现权重。您将使用样本权重：除了`(data, label)` pairs，`Model.fit`还接受`(data, label, sampleweight)`三元组。

Keras `model.fit`将`sample_weight`传播到loss和metrics，后者也接受`sample_weight`参数。样本重量在缩减步骤之前乘以样本值。例如：

In [None]:
label = [0,0]
prediction = [[-3., 0], [-3, 0]] 
sample_weight = [1, 10] 

loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
loss(label, prediction, sample_weight).numpy()

因此，要为本文制作样本权重，您需要一个函数，该函数接受一个`(data, label)`对并返回一个`(data, label, sample_weight)`三元组。其中，`sample_weight`是一个单通道图像，包含每个像素的类权重。

最简单的可能实现是将标签用作`class_weight`列表的索引：

In [None]:
def add_sample_weights(image, label):
    # 每个类的权重，具有以下约束：
    #     sum(class_weights) == 1.0
    class_weights = tf.constant([2.0, 2.0, 1.0])
    class_weights = class_weights/tf.reduce_sum(class_weights)

    # 使用每个像素处的标签作为“类权重”的索引，创建“sample_weights”图像。
    sample_weights = tf.gather(class_weights, indices=tf.cast(label, tf.int32))

    return image, label, sample_weights

生成的每个数据集元素包含3个图像：

In [None]:
train_batches.map(add_sample_weights).element_spec

现在，你可以在此加权数据集上训练模型：

In [None]:
weighted_model = unet_model(OUTPUT_CLASSES)
weighted_model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'])

In [None]:
weighted_model.fit(
    train_batches.map(add_sample_weights),
    epochs=1,
    steps_per_epoch=10)

## 保存模型

In [None]:
model.save('../models/unet.h5')