In [None]:
import math
import numpy
import os
import pathlib
import skimage.io
import sklearn.model_selection
import tensorflow

from metrics import dice_coef, jaccard_distance
from model import unet_model, get_loss_function

In [None]:
# Experiment configuration: every tunable value used by the notebook lives here.
cfg = dict(
    channel=3,
    batch_size=4,
    fold=5,
    epochs=75,
    image_size=256,
    learning_rate=0.001,
    random_state=1234,
    test_size=0.2,
    val_size=0.05,
    path_dataset="dataset",
    path_out="out",
    loss_function="dice",
)

# Input images and their masks live in parallel sub-trees of the dataset root.
images_folder = os.path.join(
    cfg["path_dataset"],
    *("IMAGEM_ORIGINAL", "CONVERTIDAS", "RGB", "256", "OUT"),
)
masks_folder = os.path.join(
    cfg["path_dataset"],
    *("MASK", "BITMAP", "256", "OUT"),
)

In [None]:
# Recursively collect the files under each folder. rglob("*") also yields
# directories, so keep regular files only; otherwise a sub-directory would be
# treated as an image and shift the image/mask pairing.
list_images = sorted(path for path in pathlib.Path(images_folder).rglob("*") if path.is_file())
list_masks = sorted(path for path in pathlib.Path(masks_folder).rglob("*") if path.is_file())

# Images and masks are paired purely by position in the two sorted listings.
# NOTE(review): this assumes both folders use matching file names — confirm.
x_train, x_val, y_train, y_val = sklearn.model_selection.train_test_split(
    list_images,
    list_masks,
    test_size=cfg["test_size"],
    random_state=cfg["random_state"],
)
len(x_train), len(x_val)

In [None]:
class CreateSequence(tensorflow.keras.utils.Sequence):
    """Keras ``Sequence`` that loads image/mask pairs from disk, batch by batch.

    Each file is read with ``skimage.io.imread``, divided by 255 and stored as
    ``float32``.  (Assumes 8-bit pixel data so values land in [0, 1] — TODO confirm.)

    Args:
        batch_size: Number of samples per batch.
        img_size: ``(height, width)`` of every image and mask.
        input_img_paths: Paths of the input images.
        target_img_paths: Paths of the masks, in the same order as the inputs.
        channels: Channel count of the input images. Defaults to 3, the value
            that used to be hard-coded.
        **kwargs: Forwarded to the Keras base-class constructor.
    """

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths, channels=3, **kwargs):
        # Newer Keras versions expect the base constructor to be called.
        super().__init__(**kwargs)
        self.batch_size = batch_size
        # Stored as a tuple so it can be concatenated into array shapes below.
        self.img_size = tuple(img_size)
        self.input_img_paths = input_img_paths
        self.target_img_paths = target_img_paths
        self.channels = channels

    def __len__(self):
        """Number of complete batches; a trailing partial batch is dropped."""
        return len(self.target_img_paths) // self.batch_size

    def __getitem__(self, idx):
        """Return the ``(input, target)`` arrays for batch number ``idx``.

        ``input`` has shape ``(n, height, width, channels)`` and ``target`` has
        shape ``(n, height, width)``, where ``n`` is the number of paths in the
        requested slice (``batch_size`` for every index below ``len(self)``).
        """
        start = idx * self.batch_size
        stop = start + self.batch_size
        batch_input_img_paths = self.input_img_paths[start:stop]
        batch_target_img_paths = self.target_img_paths[start:stop]

        # Size the arrays from the actual slice so a short slice is never
        # padded with all-zero fake samples.
        x = numpy.zeros(
            (len(batch_input_img_paths),) + self.img_size + (self.channels,),
            dtype="float32",
        )
        for j, path in enumerate(batch_input_img_paths):
            x[j] = numpy.float32(skimage.io.imread(path) / 255)

        y = numpy.zeros((len(batch_target_img_paths),) + self.img_size, dtype="float32")
        for j, path in enumerate(batch_target_img_paths):
            y[j] = numpy.float32(skimage.io.imread(path) / 255)

        return x, y

In [None]:
# Train only on the training split. Building this from the full
# list_images/list_masks would feed the validation samples to the optimizer
# and make the validation metrics meaningless.
train_data = CreateSequence(cfg["batch_size"], (cfg["image_size"], cfg["image_size"]), x_train, y_train)
val_data = CreateSequence(cfg["batch_size"], (cfg["image_size"], cfg["image_size"]), x_val, y_val)

In [None]:
path_model = os.path.join(cfg["path_out"], "train")
pathlib.Path(path_model).mkdir(parents=True, exist_ok=True)

# Reset Keras global state left over from earlier runs in this kernel. This
# must happen before the model is built, not between compile() and fit().
tensorflow.keras.backend.clear_session()

# One epoch is one full pass over the training Sequence. The batch count comes
# from the Sequence itself; len(images_folder) would be the character length
# of a path string, not a number of samples.
steps_per_epoch = len(train_data)

# Halve the learning rate when the training loss stops improving for 3 epochs.
reduce_learning_rate = tensorflow.keras.callbacks.ReduceLROnPlateau(monitor="loss", factor=0.5, patience=3, verbose=1)
filename_model = os.path.join(path_model, "unet.h5")
# save_best_only=True keeps only the best checkpoint for the callback's monitored quantity.
checkpointer = tensorflow.keras.callbacks.ModelCheckpoint(filename_model, verbose=1, save_best_only=True)
strategy = tensorflow.distribute.MirroredStrategy()

# The model and optimizer are created and compiled inside the strategy scope.
with strategy.scope():
    model = unet_model(cfg)
    adam_opt = tensorflow.keras.optimizers.Adam(learning_rate=cfg["learning_rate"])
    model.compile(
        optimizer=adam_opt,
        loss=get_loss_function(cfg["loss_function"]),
        metrics=[
            dice_coef,
            jaccard_distance,
            tensorflow.keras.metrics.Precision(),
            tensorflow.keras.metrics.Recall(),
        ],
    )

fit = model.fit(
    train_data,
    steps_per_epoch=steps_per_epoch,
    epochs=cfg["epochs"],
    validation_data=val_data,
    callbacks=[checkpointer, reduce_learning_rate],
)