https://github.com/karolzak/keras-unet/blob/master/notebooks/kz-whale-tails.ipynb

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import glob
import os
import sys
from pathlib import Path
from PIL import Image

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.callbacks import ModelCheckpoint


In [None]:
DATA_DIR = Path("./data/tiles/ls78_ae")
BATCH_SIZE = 2

MAX_X = 255
MAX_Y = 65535  # TODO: Tilegen outputs rgb image, not grayscale

BAND_DIRS = sorted(list(DATA_DIR.glob("B*"))) # [::-1]  # reverse for rgb
BASE_DIR = DATA_DIR / "B1"

display(BAND_DIRS)
display(list())

In [None]:
base_paths = list(BASE_DIR.glob("*.png"))

In [None]:
def read_fname(fname):
    # div by maxy, but mul by maxx in anticipation of preprocess_transform
    bands = np.asarray([np.array(Image.open(band_dir / fname)) for band_dir in BAND_DIRS])
#     bands = bands[1:4] # [::-1]
    mchannel = np.dstack(bands).astype('uint8')
    return mchannel

def preprocess_transform(ds):
    return (ds / MAX_X).astype('float32')

def rgb_transform(ds):
#     return ds # bands 4, 3, 2, -> rgb
#     return np.flip(ds, 3)  # Bands 2, 3, 4 -> rgb
    return np.flip(ds[:,:,:,1:4], 3)  # Bands 1, 2, 3, 4, 5, 7 -> rgb

In [None]:
display(base_paths[0].name)

x0 = read_fname(base_paths[0].name)

display(x0.shape)
# display(y0.shape)


In [None]:
img_lst = []

for base_path in base_paths:
    img = read_fname(base_path.name)
    img_lst.append(img)

img_np = np.asarray(img_lst)

del img_lst

display(f'{img_np.shape=}')

In [None]:
from keras_unet.utils import plot_imgs, plot_aeimgs

display(img_np.min())
display(img_np.max())

plot_aeimgs(org_imgs=rgb_transform(img_np) / 255, nm_img_to_plot=5, figsize=6)

In [None]:
from sklearn.model_selection import train_test_split

x_train, x_val, _, _ = train_test_split(img_np, img_np, test_size=0.30, random_state=0)

print("x_train: ", x_train.shape)
# print("y_train: ", y_train.shape)
print("x_val: ", x_val.shape)
# print("y_val: ", y_val.shape)

input_shape = x_train[0].shape
display(f'{input_shape=}')

train_len = len(x_train)
validation_len = len(x_val)

In [None]:
x_val = preprocess_transform(x_val)

x_val_tf = tf.convert_to_tensor(x_val)

In [None]:
from keras_unet.utils import get_augmented

train_gen = get_augmented(
    x_train, x_train, 
#     x_val_tf, y_val_tf,
    batch_size=BATCH_SIZE,
    data_gen_args = dict(
#         rescale=1 / 255,
        preprocessing_function=preprocess_transform,
        rotation_range=360.,
        width_shift_range=0.05,
        height_shift_range=0.05,
        shear_range=40,
        zoom_range=0.2,
#         brightness_range=[0.6,1.4],  # only works on 1 or 3 channel images
        channel_shift_range=10.0,
        horizontal_flip=True,
        vertical_flip=True,
        fill_mode='reflect'
    ))

# train_gen = get_augmented(
#     x_train, y_train, batch_size=2,
#     data_gen_args=dict(
#         rotation_range=0.,
#         width_shift_range=0.00,
#         height_shift_range=0.00,
#         shear_range=0,
#         zoom_range=0.0,
#         horizontal_flip=True,
#         vertical_flip=True,
#         fill_mode='constant'
#     )
# )

del x_train

In [None]:
sample_batch = next(train_gen)

xx, yy = sample_batch
print(xx.shape, yy.shape)
from keras_unet.utils import plot_imgs

plot_aeimgs(org_imgs=rgb_transform(xx), nm_img_to_plot=2, figsize=6)

In [None]:
from bcdunet import BCDU_net_D3, BCDU_net_D1

# input_shape = (512, 512, 3)
# model = BCDU_net_D3(input_size=input_shape)


In [None]:
from keras_unet.models import custom_unet

# # input_shape = (512, 512, 6)

def create_model_unet():
    model = custom_unet(
        input_shape,
        filters=32,
        use_batch_norm=True,
        dropout=0.5,  # 0.3
        dropout_change_per_layer=0.0,
        use_attention=False,
        use_squeeze_on_enc=True,
        use_squeeze_on_dec=True,
        num_layers=5,
        skip_connections=False,  # auto encoder
        num_classes=6,  # 6 output channels
    )
    return model

model = create_model_unet()

# model = custom_unet(
#     input_shape,
#     filters=32,
#     use_batch_norm=True,
#     dropout=0.5,  # 0.3
#     dropout_change_per_layer=0.0,
#     use_attention=False,
#     use_squeeze_on_enc=True,
#     use_squeeze_on_dec=True,
#     num_layers=5,
# )


In [None]:
tf.keras.utils.plot_model(model)

display(model.summary())

In [None]:
model_filename = 'model_trainchpt.h5'
callback_checkpoint = ModelCheckpoint(
    model_filename, 
    verbose=1, 
    monitor='val_loss', 
    save_best_only=True,
)


In [None]:
# https://github.com/ailias/Focal-Loss-implement-on-Tensorflow/blob/master/focal_loss.py

from tensorflow.python.ops import array_ops

def focal_loss(prediction_tensor, target_tensor, weights=None, alpha=0.25, gamma=2):
    r"""Compute focal loss for predictions.
        Multi-labels Focal loss formula:
            FL = -alpha * (z-p)^gamma * log(p) -(1-alpha) * p^gamma * log(1-p)
                 ,which alpha = 0.25, gamma = 2, p = sigmoid(x), z = target_tensor.
    Args:
     prediction_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing the predicted logits for each class
     target_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing one-hot encoded classification targets
     weights: A float tensor of shape [batch_size, num_anchors]
     alpha: A scalar tensor for focal loss alpha hyper-parameter
     gamma: A scalar tensor for focal loss gamma hyper-parameter
    Returns:
        loss: A (scalar) tensor representing the value of the loss function
    """
    sigmoid_p = tf.nn.sigmoid(prediction_tensor)
    zeros = array_ops.zeros_like(sigmoid_p, dtype=sigmoid_p.dtype)
    
    # For poitive prediction, only need consider front part loss, back part is 0;
    # target_tensor > zeros <=> z=1, so poitive coefficient = z - p.
    pos_p_sub = array_ops.where(target_tensor > zeros, target_tensor - sigmoid_p, zeros)
    
    # For negative prediction, only need consider back part loss, front part is 0;
    # target_tensor > zeros <=> z=1, so negative coefficient = 0.
    neg_p_sub = array_ops.where(target_tensor > zeros, zeros, sigmoid_p)
    per_entry_cross_ent = - alpha * (pos_p_sub ** gamma) * tf.math.log(tf.clip_by_value(sigmoid_p, 1e-8, 1.0)) \
                          - (1 - alpha) * (neg_p_sub ** gamma) * tf.math.log(tf.clip_by_value(1.0 - sigmoid_p, 1e-8, 1.0))
    return tf.reduce_sum(per_entry_cross_ent)


In [None]:
from tensorflow.keras.optimizers import Adam, SGD
from keras_unet.metrics import iou, iou_thresholded
from keras_unet.losses import jaccard_distance

def closs(y_true, y_pred):
    return 2 * jaccard_distance(y_true, y_pred) + tf.keras.losses.binary_crossentropy(y_true, y_pred)

model.compile(
    optimizer=Adam(),
#     optimizer="adam",
    #optimizer=SGD(lr=0.01, momentum=0.99),
    loss='binary_crossentropy',
#     loss=jaccard_distance,
#     loss=focal_loss,
#     loss=closs,
    metrics=[iou, iou_thresholded],
)


In [None]:
model.load_weights('autoencoder.h5')

In [None]:
history = model.fit(
    train_gen,
    steps_per_epoch=train_len // BATCH_SIZE,
    epochs=500,
    validation_data=(x_val_tf, x_val_tf),
    validation_batch_size=BATCH_SIZE,
    callbacks=[callback_checkpoint]
)


In [None]:
from keras_unet.utils import plot_segm_history

plot_segm_history(history)

In [None]:
model.save_weights('autoencoder.h5')

In [None]:
model.load_weights(model_filename)
# model.load_weights('model_trainchpt.h5')

In [None]:
pred_off = 1000
y_pred = model.predict(x_val[pred_off:pred_off + 10], batch_size=BATCH_SIZE)

In [None]:
from keras_unet.utils import plot_imgs, plot_aeimgs

display(len(x_val))
plot_aeimgs(
    org_imgs=rgb_transform(x_val[pred_off:pred_off + 10]),
    pred_imgs=rgb_transform(y_pred),
    nm_img_to_plot=10, figsize=4)
