1. 區域分割 （工具）
2. 區域 -> 類別

In [None]:
# UNET model

from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

input_size = (1024, 1024, 3)

def unet(input_size):
    
    inputs = Input(input_size)

    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)   # 64: filters, 3: kernel size
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=2)(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)   # 64: filters, 3: kernel size
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=2)(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)   # 64: filters, 3: kernel size
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=2)(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same')(pool3)   # 64: filters, 3: kernel size
    conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
    pool4 = MaxPooling2D(pool_size=2)(conv4)

    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(pool4)   # 64: filters, 3: kernel size
    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(conv5)

    # up-scaling

    up6 = UpSampling2D(size=2)(conv5)
    up6 = Conv2D(512, 2, activation='relu', padding='same')(up6)
    merge6 = concatenate([conv4 , up6], axis=3)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(merge6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(conv6)
    # output_shape = (128, 128, 512)

    up7 = UpSampling2D(size=2)(conv6)
    up7 = Conv2D(256, 2, activation='relu', padding='same')(up7)
    merge7 = concatenate([conv3 , up7], axis=3)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(merge7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(conv7)
    # output_shape = (256, 256, 256)

    up8 = UpSampling2D(size=2)(conv7)
    up8 = Conv2D(128, 2, activation='relu', padding='same')(up8)
    merge8 = concatenate([conv2 , up8], axis=3)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(merge8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(conv8)
    # output_shape = (512, 512, 128)

    up9 = UpSampling2D(size=2)(conv8)
    up9 = Conv2D(64, 2, activation='relu', padding='same')(up9)
    merge9 = concatenate([conv1 , up9], axis=3)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(merge9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(conv9)
    # output_shape = (1024, 1024, 128)

    outputs = Conv2D(1, 3, activation='sigmoid', padding='same')(conv9)

    model = Model(inputs=inputs, outputs=outputs)
    
    return model

In [None]:
model = unet(input_size=input_size)
model.compile(optimizer = Adam(lr = 1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# Data generator

## small example

In [None]:
from glob import glob

import numpy as np
from PIL import Image

mask = np.array(Image.open("../data/train/104_mask.png"))

In [None]:
mask.shape

In [None]:
data_files = glob(f"../data/train/*.jpg")

In [None]:
data_files

In [None]:
mask_files = [f.replace('sat', 'mask').replace('jpg', 'png') for f in data_files]

In [None]:
mask_files

In [None]:
import pandas as pd

pd.DataFrame(data=np.array([data_files, mask_files]).T)

In [None]:
from typing import Optional, Tuple, List

import pandas as pd
import numpy as np
from PIL import Image
from tensorflow.keras.utils import Sequence


class DataGenerator(Sequence):

    def __init__(self, dir: str, img_col: str, mask_col: str, img_augmentation, sample_size: Optional[int] = None, batch_size: int = 32, shuffle: bool = True):
        
        """
        Args:
            dir: directory in which images are stored
            sample_size: Optional; number of images will be sampled in each of sub_directory,
            from tying import Union
            sample_size: Union[int, None] -> Optional[int]
            if not provided all images in the dir are taken into account.
            batch_size: number of images in each of batch
            shuffle: if shuffle the order of the data
        """
        
        self.dir = dir
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.sample_size = sample_size
        self.img_col = img_col
        self.mask_col = mask_col
        self.img_augmentation = img_augmentation

        self.on_epoch_end()

        self.max = self.__len__()
        self.n = 0

    def __transform_to_dataframe(self) -> pd.DataFrame:
        
        """
        transform the data into a pandas dataframe to track the image files and the corresponding masks
        """
        
        dir_ = f"../data/{self.dir}"

        data = []

        data_files = glob(f"{dir_}/*.jpg")
        
        if self.sample_size:
            sampled_files = random.sample(data_files, min(self.sample_size, len(data_files)))
        else:
            sampled_files = data_files
        
        mask_files = [f.replace('sat', 'mask').replace('jpg', 'png') for f in sampled_files]
    
        df = pd.DataFrame(data=np.array([data_files, mask_files]).T, columns=[self.img_col, self.mask_col], dtype=object)

        return df

    def on_epoch_end(self):
        
        self.df = self.__transform_to_dataframe()
        self.indices = self.df.index.tolist()

        self.index = np.arange(len(self.indices))
        if self.shuffle:
            np.random.shuffle(self.index)

    def __len__(self):
        #  Denotes the number of batches per epoch
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, index) -> Tuple[np.ndarray, np.ndarray]:
        # Generate one batch of data
        # Generate indices of the batch
        index = self.index[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        batch = [self.indices[k] for k in index]
        # Generate data
        X, y = self.__get_data(batch)

        return X, y

    def __get_data(self, batch: List) -> Tuple[np.ndarray, np.ndarray]:

        df_batch = self.df.loc[batch]

        sat_dataset = []
        mask_dataset = []

        for _, row in df_batch.iterrows():
            # lock the image augmentation
            seq_det = self.img_augmentation.to_deterministic()
            
            # input image
            f = row[self.img_col]
            sat_image = seq_det.augment_image(np.array(Image.open(f)))
            sat_dataset.append(sat_image/255.0)
            
            
            # mask image
            f = row[self.mask_col]
            mask_image = seq_det.augment_image(np.array(Image.open(f).convert('L')), hooks=ia.HooksImages(activator=self.activator))
            mask_dataset.append(mask_image//255)
            
            # for multiclasses: one-hot encoding
            # new_mask = np.zeros(mask.shape + (num_classes, ))
            # for i in range(num_classes):
            #   new_mask[mask==i, i] = 1

        return np.array(sat_dataset), np.array(mask_dataset)

    def __next__(self):
        
        """
        generate data of size batch_size
        """
        
        if self.n >= self.max:
            self.n = 0

        result = self.__getitem__(self.n)
        self.n += 1
        return result
    
    def activator(self, images, augmenter, parents, default):
        return False if augmenter.name in ["GaussianBlur"] else default

## let us see how image augmentation works

In [None]:
import imgaug as ia
import matplotlib.pyplot as plt
from imgaug import augmenters as iaa

seq = iaa.Sequential([
    iaa.Crop(px=(0, 16)), # crop images from each side by 0 to 16px (randomly chosen)
    iaa.Fliplr(0.5), # horizontally flip 50% of the images
    iaa.Flipud(0.5),
    iaa.GaussianBlur(sigma=(0, 3.0)) # blur images with a sigma of 0 to 3.0
])

In [None]:
sat_image = np.array(Image.open("../data/train/104_sat.jpg"))
mask_image = np.array(Image.open("../data/train/104_mask.png").convert('L'))

In [None]:
seq_det_1 = seq.to_deterministic()
sat_image_1 = seq_det_1.augment_image(sat_image)
mask_image_1 = seq_det_1.augment_image(mask_image)

In [None]:
plt.imshow(sat_image_1)

In [None]:
plt.imshow(mask_image_1)

In [None]:
seq_det_2 = seq.to_deterministic()
sat_image_2 = seq_det_2.augment_image(sat_image)
mask_image_2 = seq_det_2.augment_image(mask_image)

In [None]:
plt.imshow(sat_image_2)

In [None]:
plt.imshow(mask_image_2)

In [None]:
def activator(images, augmenter, parents, default):
    return False if augmenter.name in ["GaussianBlur"] else default

seq_det_2.augment_image(mask_image, hooks=ia.HooksImages(activator=activator)).shape

# Fit model test

In [None]:
from glob import glob

datagen = DataGenerator(dir='train', img_col='sat', mask_col='mask', img_augmentation=seq, shuffle=True, batch_size=1)

In [None]:
pair = next(datagen)  # sat image, mask image

In [None]:
pair[0].shape

In [None]:
pair[1].shape

In [None]:
model.fit_generator(generator=datagen, steps_per_epoch=4, epochs=1)