# **Homework 2 - Semantic Segmentation**

Objective: Implement a U-Net Network for semantic segmentation.



Dataset:

<figure>
<center>
<img src= 'https://raw.githubusercontent.com/mabelortega/DL_Semantic_Segmentation/main/Figures/drawing1.png'/>
</figure>

You must train the network model by using these images [https://drive.google.com/file/d/1TU2nTVGS2932hRs1u-ma4r3vmgqHRbMO/view?usp=sharing]. Image_Train.tif and Reference_Train.tif images and it must be evaluated on Image_Test.tif and Reference_Test.tif images. You can use this notebook that contains some basic functions.

Experimental Protocol

Load the input data
1.     Load the images provided from 2D Semantic Labeling-Vaihingen dataset using the function load_tiff_image(image) and normalize the data into the range [0,1] using the function normalization (image)

Train the FCN model
2.     To train the FCN model you need patches as input. You must extract patches of size w-by-w-by-c pixels from Image_Train and patches with size w-by-w from Reference_Train. The number of patches and the w must be chosen based on the input size of network.

3.     Split randomly the training patches into two sets: Training (80%) and validation (20%).

4.     Convert the patches of the Reference image into one-hot encoding base on the number of classes. Hint: Use the function tf.keras.utils.to_categorical.

5.     Create the function of the U-Net model - Using skip connections: Hint: use tensorflow.keras.layers.concatenate

6.     For training, use the weighted_categorical_crossentropy as a loss function. Hint: To compute the weights you must count the number of pixels of each class and apply the formula: w_i = #total_pixels / #pixels_of_class_i



7.     Train the model using Train_model() function, which has as input the training and validation patches. You must the best model adding the early stop strategy with patience equal to 10.

8.     Extract patches from the test images and test the model using Test(model, patch_test).

9.     Reconstruct the prediction (whole test image)

The report must present the classification results as label images, and report accuracy metrics (overall and average class accuracies, F1-score) you also must change the size of the extracted patches to compare the results (32x32, 64x64, 128x128)

# **Import the libraries**

In [1]:
import keras.backend as K
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras import Model
from keras.layers import Conv2D, Input, MaxPool2D, UpSampling2D, Conv2DTranspose, concatenate
from keras.optimizers import Adam
from PIL import Image
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.utils import shuffle


2023-09-25 11:48:48.126843: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-25 11:48:48.435534: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-25 11:48:48.436776: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Load images

In [2]:
def load_tiff_image(path, grayscale=False):
    image = Image.open(path)
    image = image.convert("L") if grayscale else image

    return np.array(image)


train_image = load_tiff_image("images/Images_Train/Image_Train.tif")
train_reference = load_tiff_image(
    "images/Images_Train/Reference_Train.tif",
    grayscale=True,
)

test_image = load_tiff_image("images/Images_Test/Image_Test.tif")
test_reference = load_tiff_image(
    "images/Images_Test/Reference_Test.tif",
    grayscale=True,
)


In [3]:
def one_hot_encode(image: np.ndarray):
    W, H = image.shape
    
    colors = np.sort(np.unique(image))
    image_encoded = np.zeros((W, H, len(colors)))

    for i, color in enumerate(colors):
        image_encoded[:, :, i] = image == color

    return image_encoded

train_reference_encoded = one_hot_encode(train_reference)
test_reference_encoded = one_hot_encode(test_reference)


In [4]:
def normalization(image: np.ndarray):
    W, H, C = image.shape

    scaler = MinMaxScaler(feature_range=(0, 1))
    image_normalized = scaler.fit_transform(image.reshape((W * H), C))
    image_normalized = image_normalized.reshape(W, H, C)

    return image_normalized

train_image_normalized = normalization(train_image)
test_image_normalized = normalization(test_image)


# **Define the functions**

In [6]:
def extract_patches(image, size, stride):
    W, H, C = image.shape
    patches = []

    for i in range(0, W, stride):
        for j in range(0, H, stride):
            if i + size > W or j + size > H:
                continue

            patch = image[i : i + size, j : j + size]
            patches.append(patch)

    return np.array(patches).reshape(-1, size, size, C)


def weighted_categorical_crossentropy(weights):
    weights = K.variable(weights)

    def loss(y_true, y_pred):
        y_pred /= K.sum(y_pred, axis=-1, keepdims=True)
        y_pred = K.clip(y_pred, K.epsilon(), 1 - K.epsilon())

        loss = y_true * K.log(y_pred) + (1 - y_true) * K.log(1 - y_pred)
        loss = -K.mean(loss * weights, -1)

        return loss

    return loss


In [42]:
def convolution_layers(layer, n_filters):
    conv = Conv2D(n_filters, 3, padding="same", activation="relu")(layer)
    conv = Conv2D(n_filters, 3, padding="same", activation="relu")(conv)
    return conv


def downsample_block(layer, n_filters):
    conv = convolution_layers(layer, n_filters)
    pool = MaxPool2D(2)(conv)
    return conv, pool


def upsample_block(layer, conv_features, n_filters):
    layer = Conv2DTranspose(n_filters, 3, 2, padding="same")(layer)
    layer = concatenate([layer, conv_features])
    layer = convolution_layers(layer, n_filters)
    return layer


def unet(input_shape, n_classes):
    print(input_shape)
    in_layer = Input(input_shape)

    down_conv1, down_pool1 = downsample_block(in_layer, 64)
    down_conv2, down_pool2 = downsample_block(down_conv1, 128)
    down_conv3, down_pool3 = downsample_block(down_conv2, 256)
    down_conv4, down_pool4 = downsample_block(down_conv3, 512)

    middle_conv = convolution_layers(down_conv4, 1024)

    up_conv1 = upsample_block(middle_conv, down_conv4, 512)
    up_conv2 = upsample_block(up_conv1, down_conv3, 256)
    up_conv3 = upsample_block(up_conv2, down_conv2, 128)
    up_conv4 = upsample_block(up_conv3, down_conv1, 64)

    out_layer = Conv2D(3, 1, padding="same", activation="softmax")(up_conv4)

    return Model(in_layer, out_layer)


In [40]:
def train_model(
    model,
    patches_train,
    patches_tr_lb_h,
    patches_val,
    patches_val_lb_h,
    batch_size,
    epochs,
):
    print("Start training.. ")
    for epoch in range(epochs):
        loss_train = np.zeros((1, 2))
        loss_val = np.zeros((1, 2))

        # Computing the number of batchs
        n_batches_train = patches_train.shape[0] // batch_size

        # Random shuffle the data
        patches_train, patches_tr_lb_h = shuffle(
            patches_train, patches_tr_lb_h, random_state=0
        )

        # Training the network per batch
        for batch in range(n_batches_train):
            x_train_b = patches_train[
                batch * batch_size : (batch + 1) * batch_size, :, :, :
            ]
            y_train_h_b = patches_tr_lb_h[
                batch * batch_size : (batch + 1) * batch_size, :, :, :
            ]
            loss_train = loss_train + model.train_on_batch(x_train_b, y_train_h_b)

        # Training loss
        loss_train = loss_train / n_batches_train
        print(
            "%d [Training loss: %f , Train acc.: %.2f%%]"
            % (epoch, loss_train[0, 0], 100 * loss_train[0, 1])
        )

        # Computing the number of batchs
        n_batches_val = patches_val.shape[0] // batch_size

        # Evaluating the model in the validation set
        for batch in range(n_batches_val):
            x_val_b = patches_val[
                batch * batch_size : (batch + 1) * batch_size, :, :, :
            ]
            y_val_h_b = patches_val_lb_h[
                batch * batch_size : (batch + 1) * batch_size, :, :, :
            ]
            loss_val = loss_val + model.test_on_batch(x_val_b, y_val_h_b)

        # validation loss
        loss_val = loss_val / n_batches_val
        print(
            "%d [Validation loss: %f , Validation acc.: %.2f%%]"
            % (epoch, loss_val[0, 0], 100 * loss_val[0, 1])
        )
        # Add early stopping


def test(model, patch_test):
    result = model.predict(patch_test)
    predicted_class = np.argmax(result, axis=-1)
    return predicted_class


def compute_metrics(true_labels, predicted_labels):
    accuracy = 100 * accuracy_score(true_labels, predicted_labels)
    f1score = 100 * f1_score(true_labels, predicted_labels, average=None)
    recall = 100 * recall_score(true_labels, predicted_labels, average=None)
    precision = 100 * precision_score(true_labels, predicted_labels, average=None)
    return accuracy, f1score, recall, precision


In [37]:
PATCH_SIZES = [32, 64, 128]
VALIDATION_SPLIT = 0.2
N_CHANNELS = train_image_normalized.shape[-1]
N_CLASSES = train_reference_encoded.shape[-1]

# weights are computed as the inverse of the frequency of each class in the training set
WEIGHTS = np.array(
    [
        np.sum(train_reference_encoded[:, :, i] == 1)
        for i in range(train_reference_encoded.shape[-1])
    ]
) / np.sum(train_reference_encoded == 1)

print("Training set shape:  ", train_image_normalized.shape)
print("Test set shape:      ", test_image_normalized.shape)
print("Patch sizes:         ", PATCH_SIZES)
print("Validation split:    ", VALIDATION_SPLIT)
print("Number of classes:   ", N_CLASSES)
print("Number of channels:  ", N_CHANNELS)
print("Weights:             ", WEIGHTS)

Training set shape:   (2565, 1919, 3)
Test set shape:       (2558, 2818, 3)
Patch sizes:          [32, 64, 128]
Validation split:     0.2
Number of classes:    5
Number of channels:   3
Weights:              [0.17625794 0.3359289  0.25502094 0.00267358 0.23011864]


In [43]:



for size in PATCH_SIZES:
    # # Extract training patches
    train_image_patches = extract_patches(train_image_normalized, size, size)
    train_reference_patches = extract_patches(train_reference_encoded, size, size)

    # Split the training patches into training and validation sets
    train_image_patches, train_reference_patches = shuffle(
        train_image_patches, train_reference_patches, random_state=0
    )

    train_size = int(train_image_patches.shape[0] * (1 - VALIDATION_SPLIT))
    validation_size = int(train_image_patches.shape[0] * VALIDATION_SPLIT)

    train_image_patches, validation_image_patches = (
        train_image_patches[:train_size],
        train_image_patches[train_size:],
    )

    train_reference_patches, validation_reference_patches = (
        train_reference_patches[:train_size],
        train_reference_patches[train_size:],
    )

    # # Train the model
    adam = Adam(lr=0.0001, beta_1=0.9)
    model = unet((size, size, N_CHANNELS), N_CLASSES)
    loss = weighted_categorical_crossentropy(WEIGHTS)
    model.summary()
    model.compile(loss=loss, optimizer=adam, metrics=["accuracy"])

    train_model(
        model,
        train_image_patches,
        train_reference_patches,
        validation_image_patches,
        validation_reference_patches,
        batch_size=32,
        epochs=100,
    )

    # # load the model
    # model = load_model(name)

    # Train the model

    # # Test the model
    # predicted_labels = test(model, patch_test)

    # # Metrics
    # compute_metrics(true_labels, predicted_labels)

    # # Plot the prediction (whole test image)




(32, 32, 3)


ValueError: A `Concatenate` layer requires inputs with matching shapes except for the concatenation axis. Received: input_shape=[(None, 64, 64, 512), (None, 32, 32, 512)]