In [2]:
from __future__ import print_function
import os, glob
import numpy as np
import skimage.io as io
import skimage.transform as trans
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Dropout, concatenate
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# 模型：
def unet(pretrained_weights=None, input_size=(256, 256, 1)):
    """Build and compile a U-Net that predicts one sigmoid value per pixel.

    The encoder has four stages (two 3x3 convolutions followed by 2x2
    max-pooling), then a 1024-filter bottleneck. Each of the four decoder
    stages upsamples by 2, applies a 2x2 convolution, concatenates the
    matching encoder feature map (skip connection) along the channel axis
    and applies two more 3x3 convolutions.

    Args:
        pretrained_weights: Optional path handed to ``model.load_weights``.
            Any falsy value (``None``, ``""``) skips loading.
        input_size: Shape of a single input sample, ``(height, width,
            channels)``. Because of the four 2x2 poolings, height and width
            should be divisible by 16 so the skip connections line up.

    Returns:
        A compiled ``tf.keras.Model`` (Adam with learning rate 1e-4,
        binary cross-entropy loss, accuracy metric).
    """
    # Settings shared by every hidden convolution in the network.
    conv_kwargs = dict(activation='relu', padding='same', kernel_initializer='he_normal')

    # Encoder
    inputs = tf.keras.Input(input_size)
    conv1 = Conv2D(64, 3, **conv_kwargs)(inputs)
    conv1 = Conv2D(64, 3, **conv_kwargs)(conv1)    # skip connection source
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, **conv_kwargs)(pool1)
    conv2 = Conv2D(128, 3, **conv_kwargs)(conv2)   # skip connection source
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, **conv_kwargs)(pool2)
    conv3 = Conv2D(256, 3, **conv_kwargs)(conv3)   # skip connection source
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, **conv_kwargs)(pool3)
    conv4 = Conv2D(512, 3, **conv_kwargs)(conv4)
    drop4 = Dropout(0.5)(conv4)                    # skip connection source
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # Bottleneck shared by encoder and decoder
    conv5 = Conv2D(1024, 3, **conv_kwargs)(pool4)
    conv5 = Conv2D(1024, 3, **conv_kwargs)(conv5)
    drop5 = Dropout(0.5)(conv5)

    # Decoder
    up6 = Conv2D(512, 2, **conv_kwargs)(UpSampling2D(size=(2, 2))(drop5))
    merge6 = concatenate([drop4, up6], axis=3)     # skip connection from encoder drop4
    conv6 = Conv2D(512, 3, **conv_kwargs)(merge6)
    conv6 = Conv2D(512, 3, **conv_kwargs)(conv6)

    up7 = Conv2D(256, 2, **conv_kwargs)(UpSampling2D(size=(2, 2))(conv6))
    merge7 = concatenate([conv3, up7], axis=3)     # skip connection from encoder conv3
    conv7 = Conv2D(256, 3, **conv_kwargs)(merge7)
    conv7 = Conv2D(256, 3, **conv_kwargs)(conv7)

    up8 = Conv2D(128, 2, **conv_kwargs)(UpSampling2D(size=(2, 2))(conv7))
    merge8 = concatenate([conv2, up8], axis=3)     # skip connection from encoder conv2
    conv8 = Conv2D(128, 3, **conv_kwargs)(merge8)
    conv8 = Conv2D(128, 3, **conv_kwargs)(conv8)

    up9 = Conv2D(64, 2, **conv_kwargs)(UpSampling2D(size=(2, 2))(conv8))
    merge9 = concatenate([conv1, up9], axis=3)     # skip connection from encoder conv1
    conv9 = Conv2D(64, 3, **conv_kwargs)(merge9)
    conv9 = Conv2D(64, 3, **conv_kwargs)(conv9)

    # Output head: squeeze to 2 feature maps, then a 1x1 sigmoid convolution.
    conv9 = Conv2D(2, 3, **conv_kwargs)(conv9)
    conv10 = Conv2D(1, 1, activation='sigmoid')(conv9)

    model = tf.keras.Model(inputs=inputs, outputs=conv10)
    # `learning_rate` is the supported keyword; `lr` is only a deprecated alias.
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                  loss='binary_crossentropy', metrics=['accuracy'])
    # Load weights when a path was supplied.
    if pretrained_weights:
        model.load_weights(pretrained_weights)
    return model
# Build the network with its default arguments and print the layer summary.
model = unet()
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 256, 256, 1) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 256, 256, 64) 640         input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 256, 256, 64) 36928       conv2d[0][0]                     
__________________________________________________________________________________________________
max_pooling2d (MaxPooling2D)    (None, 128, 128, 64) 0           conv2d_1[0][0]                   
______________________________________________________________________________________________

In [3]:
# 資料前處理：mask(多分類one hot encoding，二分類二值化)、標準化
def adjustData(img, mask, flag_multi_class, num_class):
    """Scale an image and turn its mask into training targets.

    Multi-class mode (``flag_multi_class`` truthy):
        ``img`` is divided by 255. The first channel of ``mask`` is taken as
        an integer label map, one-hot encoded over ``num_class`` classes and
        flattened over its spatial axes. Using this mode also requires the
        model to be adapted (``num_class`` output channels and a
        ``categorical_crossentropy`` loss).

    Binary mode (``flag_multi_class`` falsy):
        Only when ``np.max(img) > 1``, ``img`` and ``mask`` are divided by
        255 and ``mask`` is thresholded at 0.5 into {0, 1}. Otherwise both
        are returned untouched.

    Args:
        img: Image array; a single sample or a batch.
        mask: Mask array. In multi-class mode it must be
            ``(batch, height, width, channels)`` or
            ``(height, width, channels)``.
        flag_multi_class: Selects multi-class one-hot encoding.
        num_class: Number of classes used for the one-hot encoding.

    Returns:
        ``(img, mask)``. In multi-class mode the mask has shape
        ``(batch, height*width, num_class)`` for batched input and
        ``(height*width, num_class)`` for a single sample.
    """
    if flag_multi_class:
        img = img / 255
        # Keep only the first channel: (batch, H, W) or (H, W).
        mask = mask[:, :, :, 0] if (len(mask.shape) == 4) else mask[:, :, 0]
        # The innermost axis holds the one-hot encoding.
        new_mask = np.zeros(mask.shape + (num_class,))
        for i in range(num_class):
            new_mask[mask == i, i] = 1
        # Flatten the spatial axes. The reshape is chosen by rank: the previous
        # code re-tested flag_multi_class here (always true in this branch), so
        # an unbatched mask crashed on the missing fourth axis.
        if new_mask.ndim == 4:
            new_mask = np.reshape(
                new_mask,
                (new_mask.shape[0], new_mask.shape[1] * new_mask.shape[2], new_mask.shape[3]))
        else:
            new_mask = np.reshape(
                new_mask,
                (new_mask.shape[0] * new_mask.shape[1], new_mask.shape[2]))
        mask = new_mask
    # Binary mode: only rescale when the image is not already in [0, 1].
    elif np.max(img) > 1:
        img = img / 255
        mask = mask / 255
        # Binarise the mask.
        mask[mask > 0.5] = 1
        mask[mask <= 0.5] = 0
    return (img, mask)

# 訓練資料生成器 生成資料=(img, mask)
def trainGenerator(batch_size, train_path, image_folder, mask_folder, aug_dict, target_size=(256, 256), 
                   image_color_mode="grayscale", mask_color_mode="grayscale", 
                   save_to_dir=None, image_save_prefix="image", mask_save_prefix="mask", seed=1, 
                   flag_multi_class=False, num_class=2):
    '''
    Yield (image_batch, mask_batch) pairs for training.

    Two ImageDataGenerator streams are opened on sub-folders of train_path,
    one for images and one for masks. Both get the same augmentation
    settings (aug_dict) and the same seed so each image and its mask receive
    the same transformation. Every pair is passed through adjustData before
    being yielded.

    Set save_to_dir to a directory path to have the augmented samples written
    out for inspection.
    '''
    def _open_stream(folder, color_mode, save_prefix):
        # One augmenting directory reader restricted to a single sub-folder;
        # class_mode=None makes it yield arrays only (no labels).
        return ImageDataGenerator(**aug_dict).flow_from_directory(
            train_path,
            classes=[folder],
            class_mode=None,
            color_mode=color_mode,
            target_size=target_size,
            batch_size=batch_size,
            save_to_dir=save_to_dir,
            save_prefix=save_prefix,
            seed=seed)

    image_batches = _open_stream(image_folder, image_color_mode, image_save_prefix)
    mask_batches = _open_stream(mask_folder, mask_color_mode, mask_save_prefix)

    # Walk both streams in lock-step and preprocess each pair
    # (normalisation plus mask binarisation / one-hot encoding).
    for image_batch, mask_batch in zip(image_batches, mask_batches):
        yield adjustData(image_batch, mask_batch, flag_multi_class, num_class)

# 測試資料生成器：逐一讀取 0.png到29.png 回傳
def testGenerator(test_path, num_image=30, target_size=(256, 256), 
                  flag_multi_class=False, as_gray=True):
    """Yield test images one at a time, each as a batch of size one.

    Reads ``0.png`` .. ``{num_image - 1}.png`` from ``test_path``, divides the
    pixel values by 255 and resizes to ``target_size``. Unless
    ``flag_multi_class`` is set, a trailing channel axis is appended. A
    leading batch axis is always added.
    """
    for index in range(num_image):
        frame = io.imread(os.path.join(test_path, "%d.png" % index), as_gray=as_gray)
        # NOTE(review): the division assumes imread returned values on a
        # 0-255 scale - confirm for the image files actually used.
        frame = trans.resize(frame / 255, target_size)
        if not flag_multi_class:
            # Append the channel axis, e.g. (256, 256) -> (256, 256, 1).
            frame = frame[..., np.newaxis]
        # Prepend the batch axis, e.g. (256, 256, 1) -> (1, 256, 256, 1).
        yield frame[np.newaxis, ...]

# 讀取指定路徑資料夾
def geneTrainNpy(image_path, mask_path, flag_multi_class=False, num_class=2, image_prefix="image", 
                 mask_prefix="mask", image_as_gray=True, mask_as_gray=True):
    """Load every image/mask pair from two folders into NumPy arrays.

    Images are the files in ``image_path`` matching ``<image_prefix>*.png``.
    The mask for each image is looked up in ``mask_path`` under the same file
    name with ``image_prefix`` replaced by ``mask_prefix``. Each pair is
    passed through ``adjustData``.

    Args:
        image_path: Directory containing the images.
        mask_path: Directory containing the masks.
        flag_multi_class: Forwarded to ``adjustData``.
        num_class: Forwarded to ``adjustData``.
        image_prefix: File-name prefix of the images.
        mask_prefix: File-name prefix of the masks.
        image_as_gray: Read images as grayscale and append a channel axis.
        mask_as_gray: Read masks as grayscale and append a channel axis.

    Returns:
        ``(image_arr, mask_arr)``: the stacked, preprocessed images and masks.
    """
    # Sort so the sample order does not depend on the filesystem listing order.
    image_name_arr = sorted(glob.glob(os.path.join(image_path, "%s*.png" % image_prefix)))
    image_arr = []
    mask_arr = []
    for image_file in image_name_arr:
        img = io.imread(image_file, as_gray=image_as_gray)
        img = np.reshape(img, img.shape + (1,)) if image_as_gray else img
        # Rewrite the file name only. Replacing on the full path would also
        # alter directory names that happen to contain the prefix.
        mask_name = os.path.basename(image_file).replace(image_prefix, mask_prefix)
        mask = io.imread(os.path.join(mask_path, mask_name), as_gray=mask_as_gray)
        mask = np.reshape(mask, mask.shape + (1,)) if mask_as_gray else mask
        img, mask = adjustData(img, mask, flag_multi_class, num_class)
        image_arr.append(img)
        mask_arr.append(mask)
    image_arr = np.array(image_arr)
    mask_arr = np.array(mask_arr)
    return image_arr, mask_arr

# 標示顏色
# One RGB triple (0-255 per channel) for each label class.
Sky        = [128, 128, 128]
Building   = [128, 0, 0]
Pole       = [192, 192, 128]
Road       = [128, 64, 128]
Pavement   = [60, 40, 222]
Tree       = [128, 128, 0]
SignSymbol = [192, 128, 128]
Fence      = [64, 64, 128]
Car        = [64, 0, 128]
Pedestrian = [64, 64, 0]
Bicyclist  = [0, 128, 192]
Unlabelled = [0, 0, 0]

# Lookup table: row i is the colour used for class index i.
COLOR_DICT = np.array([
    Sky,
    Building,
    Pole,
    Road,
    Pavement,
    Tree,
    SignSymbol,
    Fence,
    Car,
    Pedestrian,
    Bicyclist,
    Unlabelled,
])

# 多分類處理：參數(分類數目, 可供標示的顏色dict, 預測出的mask圖)
def labelVisualize(num_class, color_dict, img):
    """Paint a label map with one colour per class.

    Args:
        num_class: Number of class indices (``0 .. num_class - 1``) to paint.
        color_dict: Sequence/array whose row ``i`` is the colour for class
            ``i`` (e.g. an RGB triple with 0-255 components).
        img: Label map. If it is 3-D, only its first channel is used.

    Returns:
        Float array shaped ``img.shape + (channels,)`` with values divided by
        255. Pixels whose value matches no class index stay zero.
    """
    img = img[:, :, 0] if len(img.shape) == 3 else img
    palette = np.asarray(color_dict)
    # Size the colour axis from the palette (3 for RGB), not from num_class:
    # with the old num_class-sized axis the assignment below failed unless
    # num_class happened to equal the colour width.
    img_out = np.zeros(img.shape + palette.shape[1:])
    for i in range(num_class):
        img_out[img == i, :] = palette[i]
    return img_out / 255

# 預測圖片存檔
def saveResult(save_path, npyfile, flag_multi_class=False, num_class=2):
    """Write each predicted mask to ``<save_path>/<index>_predict.png``.

    Args:
        save_path: Output directory; created if it does not exist.
        npyfile: Iterable of per-image predictions, each indexed as
            ``item[:, :, 0]`` in binary mode.
        flag_multi_class: When truthy, predictions are coloured with
            ``labelVisualize`` and ``COLOR_DICT`` instead of being saved as a
            single-channel image.
        num_class: Number of classes forwarded to ``labelVisualize``.
    """
    if save_path:
        # The writer does not create missing directories itself.
        os.makedirs(save_path, exist_ok=True)
    for i, item in enumerate(npyfile):
        # Multi-class predictions are rendered as colour images.
        img = (labelVisualize(num_class, COLOR_DICT, item) if flag_multi_class else item[:, :, 0])
        # Both branches give floats meant to lie in [0, 1]. Convert to 8-bit
        # explicitly so the PNG writer does not have to guess how to scale
        # floating-point data.
        img = np.rint(np.clip(img, 0, 1) * 255).astype(np.uint8)
        io.imsave(os.path.join(save_path, "%d_predict.png" % i), img)

In [4]:
# 生成資料
# Augmentation settings shared by the image and mask generators.
# NOTE(review): Keras interprets rotation_range in degrees, so 0.2 is a
# near-zero rotation - confirm this is intended.
data_gen_args = dict(
    rotation_range=0.2, width_shift_range=0.05, height_shift_range=0.05, 
    shear_range=0.05, zoom_range=0.05, horizontal_flip=True, fill_mode='nearest')
# Arguments: (batch_size, data root, image sub-folder, mask sub-folder,
#             ImageDataGenerator settings, directory for saving augmented samples)
myGene = trainGenerator(
    2, '0.data/U-nets/keras/unet-master/data/membrane/train', 'image', 'label', 
    data_gen_args, save_to_dir=None)
# Model
model = unet()
# Training: keep only the checkpoint with the lowest training loss.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    '0.data/U-nets/keras/unet-master/unet_membrane.hdf5', 
    monitor='loss', save_best_only=True, verbose=1)
# Model.fit accepts Python generators directly; fit_generator is deprecated.
model.fit(myGene, steps_per_epoch=300, epochs=1, callbacks=[model_checkpoint])

# Prediction
# Test generator: yields 0.png .. 29.png one at a time.
testGene = testGenerator("0.data/U-nets/keras/unet-master/data/membrane/test")
# Model.predict accepts generators directly (predict_generator is deprecated);
# `steps` is the number of batches to draw from the generator.
results = model.predict(testGene, steps=30, verbose=1)
# Save the predicted masks as PNG files.
saveResult("0.data/U-nets/keras/unet-master/data/membrane/result", results)

Found 30 images belonging to 1 classes.
Found 30 images belonging to 1 classes.
Epoch 00001: loss improved from inf to 0.34077, saving model to 0.data/U-nets/keras/unet-master/unet_membrane.hdf5


