# Primitive Unet on ditches

In [3]:
import os
import imageio # For images
from skimage.transform import resize # For preprocessing
import tensorflow as tf

2023-05-05 10:09:46.187151: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Read data from disk

In [8]:
%%time

IMG_WIDTH = 512 # the original images are 500x500. is this a ok way to change size?
IMG_HEIGHT = 512
IMG_CHANNELS = 1 # a grey scale image only has one band for color.
NUM_CLASSES = 1 # 0 = no crater and 1 = crater

#### Training data ####
TRAIN_PATH = '/workspace/data/ditches/training/'
IMG_DIR = 'hpmf'
GT_DIR = '3labels'
X_train_ditches = []
Y_train_ditches = []

# load from disk
img_path = os.path.join(TRAIN_PATH, IMG_DIR)
gt_path = os.path.join(TRAIN_PATH, GT_DIR)
for image in (os.listdir(img_path)):
    if image.endswith('.tif'):
        img = imageio.imread(os.path.join(img_path, image))

        img = resize(img, (IMG_WIDTH, IMG_HEIGHT,1), mode='constant', preserve_range=True)

        mask = imageio.imread(os.path.join(gt_path, image))
        mask = resize(mask, (IMG_WIDTH, IMG_HEIGHT, 1), preserve_range=True, order=0).astype(int)

        X_train_ditches.append(img)
        Y_train_ditches.append(mask)

#### Test data ####
TEST_PATH = '/workspace/data/ditches/testing/'
IMG_DIR = 'hpmf'
GT_DIR = '3labels'
X_test_ditches = []
Y_test_ditches = []

# load from disk
img_path = os.path.join(TEST_PATH, IMG_DIR)
gt_path = os.path.join(TEST_PATH, GT_DIR)
for image in (os.listdir(img_path)):
    if image.endswith('.tif'):
        img = imageio.imread(os.path.join(img_path, image))
        img = resize(img, (IMG_WIDTH, IMG_HEIGHT,1), mode='constant', preserve_range=True)
        mask = imageio.imread(os.path.join(gt_path, image))
        mask = resize(mask, (IMG_WIDTH, IMG_HEIGHT, 1), preserve_range=True, order=0).astype(int)

        X_test_ditches.append(img)
        Y_test_ditches.append(mask)

# convert list of numpy arrays into tensorflow dataset for further processing
train_images_ditches = tf.data.Dataset.from_tensor_slices((X_train_ditches, Y_train_ditches))
test_images_ditches = tf.data.Dataset.from_tensor_slices((X_test_ditches, Y_test_ditches))



ValueError: Could not find a backend to open `/workspace/data/ditches/training/hpmf/Thumbs.db`` with iomode `ri`.

## Set up pipeline

In [1]:
BATCH_SIZE = 64
BUFFER_SIZE = 128

# training
train_batches_ditches  = (train_images_ditches
                    .cache() # cache data
                    .shuffle(BUFFER_SIZE) # fill buffer, sample from it and replace with new items (buffer size > training set for perfect shuffling)
                    .batch(BATCH_SIZE)  
                    .repeat() 
                    .prefetch(buffer_size=128))
# testing
test_batches_ditches  = (test_images_ditches 
                    .cache() # cache data
                    .shuffle(BUFFER_SIZE) # fill buffer, sample from it and replace with new items (buffer size > training set for perfect shuffling)
                    .batch(BATCH_SIZE)  
                    .repeat(1)  # repeat dataset idefinetely
                    .prefetch(buffer_size=128)) 


NameError: name 'train_images_ditches' is not defined

## Add f1-score as a metric to monitor

In [None]:
from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

## Basic U-net

In [None]:
#Build the model
inputs = tf.keras.layers.Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
NUM_CLASSES = 1
#Contraction path
c1 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(inputs)
c1 = tf.keras.layers.Dropout(0.1)(c1) # to prevent overfitting
c1 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
p1 = tf.keras.layers.MaxPooling2D((2, 2))(c1)

c2 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
c2 = tf.keras.layers.Dropout(0.1)(c2)
c2 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
p2 = tf.keras.layers.MaxPooling2D((2, 2))(c2)
 
c3 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
c3 = tf.keras.layers.Dropout(0.2)(c3)
c3 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
p3 = tf.keras.layers.MaxPooling2D((2, 2))(c3)
 
c4 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
c4 = tf.keras.layers.Dropout(0.2)(c4)
c4 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
p4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c4)
 
c5 = tf.keras.layers.Conv2D(512, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
c5 = tf.keras.layers.Dropout(0.3)(c5)
c5 = tf.keras.layers.Conv2D(512, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)

#Expansive path 
u6 = tf.keras.layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(c5)
u6 = tf.keras.layers.concatenate([u6, c4])
c6 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
c6 = tf.keras.layers.Dropout(0.2)(c6)
c6 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
 
u7 = tf.keras.layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c6)
u7 = tf.keras.layers.concatenate([u7, c3])
c7 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
c7 = tf.keras.layers.Dropout(0.2)(c7)
c7 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
 
u8 = tf.keras.layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c7)
u8 = tf.keras.layers.concatenate([u8, c2])
c8 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
c8 = tf.keras.layers.Dropout(0.1)(c8)
c8 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
 
u9 = tf.keras.layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c8)
u9 = tf.keras.layers.concatenate([u9, c1], axis=3)
c9 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
c9 = tf.keras.layers.Dropout(0.1)(c9)
c9 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
 
outputs = tf.keras.layers.Conv2D(NUM_CLASSES, (1, 1))(c9)

model_ditches = tf.keras.Model(inputs=[inputs], outputs=[outputs])

model_ditches.compile(optimizer='adam', loss=tf.keras.losses.BinaryFocalCrossentropy(gamma=2.0, from_logits=True), metrics=['acc', f1_m, recall_m])

## Set Weigths

In [None]:
def add_sample_weights(image, label):
    # The weights for each class, with the constraint that:
    #     sum(class_weights) == 1.0
    class_weights = tf.constant([0.01, 1]) # the first weight is for the background and the second for the craters.
    class_weights = class_weights/tf.reduce_sum(class_weights)

    # Create an image of `sample_weights` by using the label at each pixel as an 
    # index into the `class weights` .
    sample_weights = tf.gather(class_weights, indices=tf.cast(label, tf.int32))

    return image, label, sample_weights

## Train the model

In [None]:
%%time
result_ditches = model_ditches.fit(train_batches_ditches.map(add_sample_weights), epochs=200, steps_per_epoch=20)


## Plot history

In [None]:
# summarize history for accuracy
import matplotlib.pyplot as plt
plt.plot(result_ditches.history['loss'])
plt.title('Model accuracy using f1 score')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

## Evaluate model on test data

In [None]:
model__ditches.evaluate(test_batches_ditches)