# Trying different network architecture - DeepLabV3



In [15]:
import os
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from tqdm import tqdm
from PIL import Image

IMAGE_SIZE = 128
BATCH_SIZE = 1
NUM_CLASSES = 5
NUM_TRAIN_IMAGES = 1000
NUM_VAL_IMAGES = 50

train_sat_folder = "./../Data_filtered_80_resampled_128_LANCZOS/train/sat/"
train_gt_folder = "./../Data_filtered_80_resampled_128_LANCZOS/train/gt/"

val_sat_folder = "./../Data_filtered_80_resampled_128_LANCZOS/validate/sat/"
val_gt_folder = "./../Data_filtered_80_resampled_128_LANCZOS/validate/gt/"

# Function to load and preprocess the dataset
def load_data(sat_folder, gt_folder, num_classes, batch_size=1):
    input_images = []
    output_masks = []

    filenames = [filename for filename in os.listdir(sat_folder) if filename.endswith(".tif")]
    num_batches = len(filenames) // batch_size

    for batch_idx in tqdm(range(num_batches), desc="Loading batches", unit="batch"):
        batch_filenames = filenames[batch_idx * batch_size : (batch_idx + 1) * batch_size]
        
        batch_inputs = []
        batch_outputs = []
        for filename in batch_filenames:
            input_path = os.path.join(sat_folder, filename)
            output_path = os.path.join(gt_folder, filename.replace("sat_", "gt_"))

            # Load and preprocess input image
            input_image = np.array(Image.open(input_path)) / 255.0  # Normalize to [0, 1]
            batch_inputs.append(input_image)

            # Load and preprocess output mask
            output_mask = np.array(Image.open(output_path))

            # Ensure that class indices are within the range [0, num_classes-1]
            output_mask = np.clip(output_mask, 0, num_classes - 1)
            output_mask = keras.utils.to_categorical(output_mask, num_classes=num_classes)
            batch_outputs.append(output_mask)

        input_images.append(np.array(batch_inputs))
        output_masks.append(np.array(batch_outputs))

    return np.vstack(input_images), np.vstack(output_masks)

# Load and preprocess the training dataset
X_train, y_train = load_data(train_sat_folder, train_gt_folder, NUM_CLASSES, BATCH_SIZE)

# Load and preprocess the validation dataset
X_val, y_val = load_data(val_sat_folder, val_gt_folder, NUM_CLASSES, BATCH_SIZE)

Loading batches: 100%|██████████| 5999/5999 [00:08<00:00, 737.12batch/s]
Loading batches: 100%|██████████| 2214/2214 [00:02<00:00, 816.33batch/s]


In [16]:
# deeplabv3_plus architecture found on https://keras.io/examples/vision/deeplabv3_plus/

def convolution_block(
    block_input,
    num_filters=256,
    kernel_size=3,
    dilation_rate=1,
    use_bias=False,
):
    x = layers.Conv2D(
        num_filters,
        kernel_size=kernel_size,
        dilation_rate=dilation_rate,
        padding="same",
        use_bias=use_bias,
        kernel_initializer=keras.initializers.HeNormal(),
        activation="relu"
    )(block_input)
    x = layers.BatchNormalization()(x)
    #return ops.nn.relu(x)
    return x


def DilatedSpatialPyramidPooling(dspp_input):
    dims = dspp_input.shape
    x = layers.AveragePooling2D(pool_size=(dims[-3], dims[-2]))(dspp_input)
    x = convolution_block(x, kernel_size=1, use_bias=True)
    out_pool = layers.UpSampling2D(
        size=(dims[-3] // x.shape[1], dims[-2] // x.shape[2]),
        interpolation="bilinear",
        #interpolation="lanczos",
    )(x)

    out_1 = convolution_block(dspp_input, kernel_size=1, dilation_rate=1)
    out_6 = convolution_block(dspp_input, kernel_size=3, dilation_rate=6)
    out_12 = convolution_block(dspp_input, kernel_size=3, dilation_rate=12)
    out_18 = convolution_block(dspp_input, kernel_size=3, dilation_rate=18)

    x = layers.Concatenate(axis=-1)([out_pool, out_1, out_6, out_12, out_18])
    output = convolution_block(x, kernel_size=1)
    return output


In [17]:
# deeplabv3_plus architecture found on https://keras.io/examples/vision/deeplabv3_plus/

def DeeplabV3Plus(image_size, num_classes):
    model_input = keras.Input(shape=(image_size, image_size, 3))
    preprocessed = keras.applications.resnet50.preprocess_input(model_input)
    resnet50 = keras.applications.ResNet50(
        weights="imagenet", include_top=False, input_tensor=preprocessed
    )
    x = resnet50.get_layer("conv4_block6_2_relu").output
    x = DilatedSpatialPyramidPooling(x)

    input_a = layers.UpSampling2D(
        size=(image_size // 4 // x.shape[1], image_size // 4 // x.shape[2]),
        interpolation="bilinear",
    )(x)
    input_b = resnet50.get_layer("conv2_block3_2_relu").output
    input_b = convolution_block(input_b, num_filters=48, kernel_size=1)

    x = layers.Concatenate(axis=-1)([input_a, input_b])
    x = convolution_block(x)
    x = convolution_block(x)
    x = layers.UpSampling2D(
        size=(image_size // x.shape[1], image_size // x.shape[2]),
        interpolation="bilinear",
    )(x)
    model_output = layers.Conv2D(num_classes, kernel_size=(1, 1), padding="same")(x)
    return keras.Model(inputs=model_input, outputs=model_output)

In [18]:
model_deeplabv3 = DeeplabV3Plus(image_size=IMAGE_SIZE, num_classes=NUM_CLASSES)
model_deeplabv3.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 128, 128, 3)]        0         []                            
                                                                                                  
 tf.__operators__.getitem_2  (None, 128, 128, 3)          0         ['input_3[0][0]']             
  (SlicingOpLambda)                                                                               
                                                                                                  
 tf.nn.bias_add_2 (TFOpLamb  (None, 128, 128, 3)          0         ['tf.__operators__.getitem_2[0
 da)                                                                ][0]']                        
                                                                                            

In [19]:
# https://www.tensorflow.org/tutorials/structured_data/imbalanced_data
METRICS = [
      keras.metrics.BinaryCrossentropy(name='cross entropy'),  # same as model's loss
      keras.metrics.MeanSquaredError(name='Brier score'),
      keras.metrics.TruePositives(name='tp'),
      keras.metrics.FalsePositives(name='fp'),
      keras.metrics.TrueNegatives(name='tn'),
      keras.metrics.FalseNegatives(name='fn'), 
      keras.metrics.BinaryAccuracy(name='accuracy'),
      keras.metrics.Precision(name='precision'),
      keras.metrics.Recall(name='recall'),
      keras.metrics.AUC(name='auc'),
      keras.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
]

In [20]:
# https://www.tensorflow.org/tutorials/structured_data/imbalanced_data
import matplotlib.pyplot as plt

def plot_metrics(history):
  metrics = ['loss', 'prc', 'precision', 'recall']
  for n, metric in enumerate(metrics):
    name = metric.replace("_"," ").capitalize()
    plt.subplot(2,2,n+1)
    plt.plot(history.epoch, history.history[metric], label='Train')
    plt.plot(history.epoch, history.history['val_'+metric],
             linestyle="--", label='Val')
    plt.xlabel('Epoch')
    plt.ylabel(name)
    if metric == 'loss':
      plt.ylim([0, plt.ylim()[1]])
    elif metric == 'auc':
      plt.ylim([0.8,1])
    else:
      plt.ylim([0,1])

    plt.legend()

In [2]:
#import os
#os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'

In [21]:
# warning: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of ... exceeds 10% of free system memory.
# adding steps_per_epoch=STEPS_PER_EPOCH to model.fit method
#os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import os
import tensorflow as tf


import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint

#STEPS_PER_EPOCH = 3000

#loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
loss = "categorical_crossentropy"
model_deeplabv3.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=loss,
    #metrics=["accuracy"],
    metrics=METRICS,
)

#history = model_deeplabv3.fit(train_dataset, validation_data=val_dataset, epochs=25)

# Define the ModelCheckpoint callback
checkpoint = ModelCheckpoint(
    'deeplabv3_model_{epoch:02d}_{val_accuracy:.3f}.keras',
    monitor='val_accuracy',  # Monitor validation accuracy
    save_best_only=True,
    save_weights_only=False,  # Save entire model
    mode='max',  # Save the model with the highest validation accuracy
    verbose=1
)

model_history = model_deeplabv3.fit(X_train, y_train,
                          epochs=10,
                          batch_size=BATCH_SIZE,
                          validation_data=(X_val, y_val),
                          #class_weight=class_weight_dict,
                          #steps_per_epoch=STEPS_PER_EPOCH,
                          callbacks=[checkpoint]
)

plt.plot(model_history.history["loss"])
plt.title("Training Loss")
plt.ylabel("loss")
plt.xlabel("epoch")
plt.show()

plt.plot(model_history.history["accuracy"])
plt.title("Training Accuracy")
plt.ylabel("accuracy")
plt.xlabel("epoch")
plt.show()

plt.plot(model_history.history["val_loss"])
plt.title("Validation Loss")
plt.ylabel("val_loss")
plt.xlabel("epoch")
plt.show()

plt.plot(model_history.history["val_accuracy"])
plt.title("Validation Accuracy")
plt.ylabel("val_accuracy")
plt.xlabel("epoch")
plt.show()


2024-01-15 22:07:51.383858: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 1965752320 exceeds 10% of free system memory.


Epoch 1/10


In [None]:
plot_metrics(model_history)

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from PIL import Image
import matplotlib.pyplot as plt

from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# Function to load and preprocess a batch of the dataset
def load_batch(sat_folder, gt_folder, filenames, target_size=(128, 128), num_classes=5):
    input_images = []
    output_masks = []

    for filename in filenames:
        sat_filename = filename
        gt_filename = filename.replace("sat_", "gt_")

        input_path = os.path.join(sat_folder, sat_filename)
        output_path = os.path.join(gt_folder, gt_filename)

        # Load and preprocess input image
        input_image = np.array(Image.open(input_path).resize(target_size)) / 255.0  # Normalize to [0, 1]
        input_images.append(input_image)

        # Load and preprocess output mask
        output_mask = np.array(Image.open(output_path).resize(target_size))
        
        # Ensure that class indices start from 0
        output_mask -= 1

        # Exclude class labels that are not present in the training set
        output_mask[output_mask >= num_classes] = 0

        output_mask = keras.utils.to_categorical(output_mask, num_classes=num_classes)
        output_masks.append(output_mask)

    return np.array(input_images), np.array(output_masks)

# Function to evaluate the model on a folder of images
def evaluate_model_on_folder(model, sat_folder, gt_folder, target_size=(128, 128)):
    # Get a list of image files in the folders
    sat_files = os.listdir(sat_folder)
    gt_files = os.listdir(gt_folder)

    # Ensure the order of files is the same
    sat_files.sort()
    gt_files.sort()

    # Load and preprocess the validation dataset
    X_val, y_val = load_batch(sat_folder, gt_folder, sat_files, target_size=target_size)

    # Predict on validation data
    y_pred = model.predict(X_val)

    # Convert predictions and ground truth to class labels
    y_pred_labels = np.argmax(y_pred, axis=-1)
    y_val_labels = np.argmax(y_val, axis=-1)

    # Flatten the 2D arrays to 1D
    y_pred_flat = y_pred_labels.flatten()
    y_val_flat = y_val_labels.flatten()

    # Define class names
    class_names = ["Unlabeled", "Buildings", "Woodlands", "Water", "Road"]

    # Compute confusion matrix
    conf_mat = confusion_matrix(y_val_flat, y_pred_flat)

    # Display confusion matrix as a heatmap
    plt.figure(figsize=(8, 8))
    sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    # Display classification report
    print("Classification Report:")
    print(classification_report(y_val_flat, y_pred_flat, target_names=class_names))

In [None]:
num_classes = NUM_CLASSES
input_size = (IMAGE_SIZE, IMAGE_SIZE, 3)
model_path = "./../Model/unet_model_19_0.936.keras"
model = keras.models.load_model(model_path)
#visualize_images(val_sat_folder, val_gt_folder, model_path=model_path, num_samples=5)
evaluate_model_on_folder(model, val_sat_folder, val_gt_folder, (IMAGE_SIZE, IMAGE_SIZE))