# Semantic Segmentation of Mars terrain images

In [7]:
import numpy as np
import tensorflow as tf
import keras

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.regularizers import l2
from tensorflow.keras import backend as K
import matplotlib as mpl

import matplotlib as mpl
import matplotlib.pyplot as plt
from tensorflow.keras import layers as tfkl

from tensorflow.keras import layers, Model
from tensorflow.keras.applications import ConvNeXtBase

# Importing custom libraries tailored for this project
import sys
sys.path.append('/kaggle/input/augment4')
import AugmentationHelper

# Setting the seeds for reproducibility
seed = 42
np.random.seed(42)
tf.random.set_seed(42)

## ⏳ Load the Data

In [8]:
# Load the NpzFile
data = np.load("/kaggle/input/cleandata/Dataset_clean.npz")

# Inspect content of the NpzFile: contains two keys "training_set", "test_set"
print(data.keys())

# Extract training_set and test_set
training_set = data.get("training_set")
test_set = data.get("test_set")

# Print the shapes of both sets
print(f"The shape of the training set: {training_set.shape}")
print(f"The shape of the test set: {test_set.shape}")

# Split training set into input images and masks
images = training_set[:, 0, :, :]
masks = training_set[:, 1, :, :]

# Analyse the values of the grayscale images in the training set
print("---------------------------------------------------")
print("TRAINING SET INPUT IMAGES ANALYSIS")
print(f"The shape of the input images: {images.shape}")
print(f"Max pixel value: {images.max()}")
print(f"Min pixel value: {images.min()}")
print(f"Data type to encode pixel values: {images.dtype}")

# Analyse the values of the masks in the training set
print("---------------------------------------------------")
print("TRAINING SET OUTPUT MASKS ANALYSIS")
print(f"The shape of the output masks: {masks.shape}")
print(f"Max pixel value: {masks.max()}")
print(f"Min pixel value: {masks.min()}")
print(f"Data type to encode pixel values: {masks.dtype}")

# Analyse the values of the grayscale images in the test set
print("---------------------------------------------------")
print("TEST SET ANALYSIS")
print(f"Max pixel value: {test_set.max()}")
print(f"Min pixel value: {test_set.min()}")
print(f"Data type to encode pixel values: {test_set.dtype}")


KeysView(NpzFile '/kaggle/input/cleandata/Dataset_clean.npz' with keys: training_set, test_set)
The shape of the training set: (2102, 2, 64, 128)
The shape of the test set: (10022, 64, 128)
---------------------------------------------------
TRAINING SET INPUT IMAGES ANALYSIS
The shape of the input images: (2102, 64, 128)
Max pixel value: 254
Min pixel value: 2
Data type to encode pixel values: uint8
---------------------------------------------------
TRAINING SET OUTPUT MASKS ANALYSIS
The shape of the output masks: (2102, 64, 128)
Max pixel value: 4
Min pixel value: 0
Data type to encode pixel values: uint8
---------------------------------------------------
TEST SET ANALYSIS
Max pixel value: 255
Min pixel value: 1
Data type to encode pixel values: uint8


In [9]:
# Cast images to uint
images = images.astype(np.uint8)
masks = masks.astype(np.uint8)

# 10% of input data to both test and validation sets
test_ratio = 0.05 
validation_ratio = 0.05 / 0.9 

train_val_img, test_img, train_val_lbl, test_lbl = train_test_split(
    images, masks, test_size=test_ratio, random_state=seed
)
train_img, val_img, train_lbl, val_lbl = train_test_split(
    train_val_img, train_val_lbl, test_size=validation_ratio, random_state=seed
)

print(f"Number of images: {len(images)}")
print(f"Train: {len(train_img)}")
print(f"Validation: {len(val_img)}")
print(f"Test: {len(test_img)}")

Number of images: 2102
Train: 1885
Validation: 111
Test: 106


In [10]:
# Set number of classes
NUM_CLASSES = 5

# Set batch size for training
BATCH_SIZE = 32

# Set learning rate for the optimiser
LEARNING_RATE = 1e-3

# Set early stopping patience threshold
PATIENCE = 30

# Set maximum number of training epochs
EPOCHS = 200

## Preprocessing

In [11]:
def one_hot_encoding(image, mask):
    num_classes = 5
    tf.ensure_shape(mask, (64, 128))
    mask = tf.one_hot(mask, depth=num_classes, dtype=tf.uint8)
    print(mask.shape)
    return image, mask

# Reshape the input images to (64, 128, 1) and normalise to [0,1]
def reshape_input(image, mask):
    scaled_image = tf.cast(image, tf.float32) / 255
    reshaped_image = tf.expand_dims(scaled_image, axis=-1)
    return reshaped_image, mask


def data_flow(images, masks, batch_size, shuffle=True, augment=False, seed=None):

    dataset = tf.data.Dataset.from_tensor_slices((images, masks))

    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 2, seed=seed)

    if augment:
        aug_geometric = dataset.map(AugmentationHelper.map_geometric_transform, num_parallel_calls=tf.data.AUTOTUNE)
        aug_intensity = dataset.map(AugmentationHelper.map_intensity_transform, num_parallel_calls=tf.data.AUTOTUNE)
        aug_total = dataset.map(AugmentationHelper.map_total_transform, num_parallel_calls=tf.data.AUTOTUNE)
        dataset = dataset.concatenate(aug_geometric).concatenate(aug_intensity).concatenate(aug_total)


    dataset = dataset.map(reshape_input, num_parallel_calls=tf.data.AUTOTUNE)
    
    dataset = dataset.map(one_hot_encoding, num_parallel_calls=tf.data.AUTOTUNE)

    print(dataset.take(2))
    
    # Batch the dat
    dataset = dataset.batch(batch_size, drop_remainder=True)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    return dataset


In [12]:
# Create the datasets
print("Creating datasets...")

train_dataset = data_flow(
    train_img, train_lbl,
    batch_size=BATCH_SIZE,
    shuffle=True,
    augment=True,
    seed=seed
)

val_dataset = data_flow(
    val_img, val_lbl,
    batch_size=BATCH_SIZE,
    shuffle=False
)

test_dataset = data_flow(
    test_img, test_lbl,
    batch_size=BATCH_SIZE,
    shuffle=False
)
print("Datasets created!")


# Check the shape of the data
for images, labels in train_dataset.take(2):
    input_shape = images.shape[1:]
    print(f"\nInput shape: {input_shape}")
    print("Images shape:", images.shape)
    print("Labels shape:", labels.shape)
    print("Labels dtype:", labels.dtype)
    break

Creating datasets...
(64, 128, 5)
<_TakeDataset element_spec=(TensorSpec(shape=(64, 128, 1), dtype=tf.float32, name=None), TensorSpec(shape=(64, 128, 5), dtype=tf.uint8, name=None))>
(64, 128, 5)
<_TakeDataset element_spec=(TensorSpec(shape=(64, 128, 1), dtype=tf.float32, name=None), TensorSpec(shape=(64, 128, 5), dtype=tf.uint8, name=None))>
(64, 128, 5)
<_TakeDataset element_spec=(TensorSpec(shape=(64, 128, 1), dtype=tf.float32, name=None), TensorSpec(shape=(64, 128, 5), dtype=tf.uint8, name=None))>
Datasets created!

Input shape: (64, 128, 1)
Images shape: (32, 64, 128, 1)
Labels shape: (32, 64, 128, 5)
Labels dtype: <dtype: 'uint8'>


## Build the Model

In [13]:
input_shape=(64,128,1)

def u_net(input_shape=input_shape, depth=4, 
          d_conv_count = 3, b_conv_count = 3, u_conv_count = 3, 
          start_filter=32, skip_connections=True, dropout=0.2):

    # Input Layer
    inputs = tfkl.Input(shape=input_shape, name='input_layer')

    x = inputs
    skipped = []
    # Downsampling
    for i in range(depth):
        for j in range(d_conv_count):
            x = tfkl.Conv2D(filters=start_filter,
                            kernel_size=(3,3),
                            strides=(1, 1),
                            padding='same')(x)
            x = tfkl.BatchNormalization()(x) 
            x = tfkl.ReLU()(x)

        if skip_connections:
            # Save the layer for skip connections
            skipped.append(x)

        x = tfkl.MaxPooling2D(pool_size=(2,2))(x)
        x = tfkl.Dropout(dropout)(x)
        
        start_filter = start_filter * 2
    
    # Bottleneck
    for j in range(b_conv_count):
        x = tfkl.Conv2D(filters=start_filter,
                            kernel_size=(3,3),
                            strides=(1, 1),
                            padding='same')(x)
        x = tfkl.BatchNormalization()(x) 
        x = tfkl.ReLU()(x)
    
    start_filter = start_filter // 2

    # Upsampling
    for i in range(depth):
        x = tfkl.UpSampling2D(2, interpolation='bilinear')(x)

        if skip_connections:
            x = tfkl.Concatenate()([x, skipped[depth - i - 1]])

        x = tfkl.Dropout(dropout)(x)

        for j in range(u_conv_count):
            x = tfkl.Conv2D(filters=start_filter,
                            kernel_size=(3,3),
                            strides=(1, 1),
                            padding='same')(x)
            x = tfkl.BatchNormalization()(x) ## remove?
            x = tfkl.ReLU()(x)
        start_filter = start_filter // 2
    

    # Output Layer
    outputs = tfkl.Conv2D(filters=NUM_CLASSES,
                        kernel_size=(1,1),
                        strides=(1, 1),
                        padding='same',
                        activation='softmax',
                        name="output_layer")(x)
    
    model = tf.keras.Model(inputs, outputs, name='UNet')
    return model

In [14]:
model = u_net()

# Print a detailed summary of the model with expanded nested layers and trainable parameters.
model.summary(expand_nested=True, show_trainable=True)

### Metrics

In [15]:
# Define custom Mean Intersection Over Union metric
class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    def __init__(self, num_classes, labels_to_exclude=None, name="mean_iou", dtype=None):
        super(MeanIntersectionOverUnion, self).__init__(num_classes=num_classes, name=name, dtype=dtype)
        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0
        self.labels_to_exclude = labels_to_exclude

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)
        y_true = tf.math.argmax(y_true, axis=-1)

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Apply mask to exclude specified labels
        for label in self.labels_to_exclude:
            mask = tf.not_equal(y_true, label)
            y_true = tf.boolean_mask(y_true, mask)
            y_pred = tf.boolean_mask(y_pred, mask)

        # Update the state
        return super().update_state(y_true, y_pred, sample_weight)

In [16]:
# https://stackoverflow.com/questions/65125670/implementing-multiclass-dice-loss-function
def dice_loss(y_true, y_pred, smooth=1e-7):
    '''
    Dice coefficient for X categories. Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    
    # Remove the background class (class 0)
    y_true = y_true[..., 1:]  # Shape: [batch_size, height, width, num_classes - 1]
    y_pred = y_pred[..., 1:]  # Shape: [batch_size, height, width, num_classes - 1]
    
    # Flatten y_true and y_pred
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)

    # Compute intersection and denominator
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    
    return K.mean((2. * intersect / (denom + smooth)))

def dice_loss_multiclass(y_true, y_pred):
    '''
    Dice loss to minimize. Pass to model as loss during compile statement
    '''
    return 1 - dice_loss(y_true, y_pred)

def combined_multiclass_loss(y_true, y_pred, alpha=0.5, beta=0.5):
    dice = dice_loss_multiclass(y_true, y_pred)
    categorical_ce = tf.keras.losses.SparseCategoricalCrossentropy()(y_true, y_pred)
    return alpha * dice + beta * categorical_ce

# TODO
# boundary loss

In [17]:
# Compile the model
print("Compiling model...")
model.compile(
    loss= dice_loss_multiclass, #combined_multiclass_loss,#keras.losses.CategoricalFocalCrossentropy(), #dice_loss_multiclass, #tf.keras.losses.Dice, #tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.AdamW(LEARNING_RATE),
    metrics=["accuracy", MeanIntersectionOverUnion(num_classes=NUM_CLASSES, labels_to_exclude=[0])]
)
print("Model compiled!")

Compiling model...
Model compiled!


## 🛠️ Train the Model

In [18]:
# Setup callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=PATIENCE,
    restore_best_weights=True
)

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.5,
    patience=3,
    mode="auto",
    min_lr=1e-5
)

In [None]:
# Train the model
history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
).history

# Calculate and print the final validation accuracy
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

Epoch 1/200


I0000 00:00:1733584655.754429      89 service.cc:145] XLA service 0x7973b800cc10 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1733584655.754493      89 service.cc:153]   StreamExecutor device (0): Tesla P100-PCIE-16GB, Compute Capability 6.0


[1m  2/235[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m20s[0m 87ms/step - accuracy: 0.2167 - loss: 0.7980 - mean_iou: 0.0819   

I0000 00:00:1733584683.233994      89 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m235/235[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m128s[0m 367ms/step - accuracy: 0.4137 - loss: 0.5805 - mean_iou: 0.2085 - val_accuracy: 0.1697 - val_loss: 0.8072 - val_mean_iou: 0.0548 - learning_rate: 0.0010
Epoch 2/200
[1m235/235[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 346ms/step - accuracy: 0.5032 - loss: 0.4426 - mean_iou: 0.3377 - val_accuracy: 0.2757 - val_loss: 0.6881 - val_mean_iou: 0.1703 - learning_rate: 0.0010
Epoch 3/200
[1m235/235[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 345ms/step - accuracy: 0.4751 - loss: 0.4715 - mean_iou: 0.3133 - val_accuracy: 0.4316 - val_loss: 0.5170 - val_mean_iou: 0.1506 - learning_rate: 0.0010
Epoch 4/200
[1m235/235[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 345ms/step - accuracy: 0.5288 - loss: 0.4109 - mean_iou: 0.3667 - val_accuracy: 0.4955 - val_loss: 0.4450 - val_mean_iou: 0.3425 - learning_rate: 0.0010
Epoch 5/200
[1m235/235[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 338m

In [None]:
# Save the trained model to a file with the accuracy included in the filename
model_filename = 'model.keras'
model.save(model_filename)

# Delete the model to free up resources
del model

In [None]:
# Plot and display training and validation loss
plt.figure(figsize=(18, 3))
plt.plot(history['loss'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=2)
plt.plot(history['val_loss'], label='Validation', alpha=0.9, color='#5a9aa5', linewidth=2)
plt.title('Cross Entropy')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

# Plot and display training and validation accuracy
plt.figure(figsize=(18, 3))
plt.plot(history['accuracy'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=2)
plt.plot(history['val_accuracy'], label='Validation', alpha=0.9, color='#5a9aa5', linewidth=2)
plt.title('Accuracy')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

# Plot and display training and validation mean IoU
plt.figure(figsize=(18, 3))
plt.plot(history['mean_iou'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=2)
plt.plot(history['val_mean_iou'], label='Validation', alpha=0.9, color='#5a9aa5', linewidth=2)
plt.title('Mean Intersection over Union')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

## 📊 Prepare Your Submission

In our Kaggle competition, submissions are made as `csv` files. To create a proper `csv` file, you need to flatten your predictions and include an `id` column as the first column of your dataframe. To maintain consistency between your results and our solution, please avoid shuffling the test set. The code below demonstrates how to prepare the `csv` file from your model predictions.

In [None]:
X_test_submission = data["test_set"]
print(f"Test X shape: {X_test_submission.shape}")
X_test_submission = X_test_submission[..., np.newaxis] / 255.0
X_test_submission = tf.cast(X_test_submission, tf.float32)
X_test_submission = tf.image.grayscale_to_rgb(X_test_submission)

In [None]:
# Load UNet model without compiling
model = tfk.models.load_model('/kaggle/working/model.keras', compile=False)

# Compile the model with specified loss, optimizer, and metrics
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tfk.optimizers.AdamW(LEARNING_RATE),
    metrics=["accuracy", MeanIntersectionOverUnion(num_classes=NUM_CLASSES, labels_to_exclude=[0])]
)

# Print a detailed summary of the model with expanded nested layers and trainable parameters.
model.summary(expand_nested=True, show_trainable=True)

In [None]:
preds = model.predict(X_test_submission)
preds = np.argmax(preds, axis=-1)
print(f"Predictions shape: {preds.shape}")

In [None]:
def y_to_df(y) -> pd.DataFrame:
    """Converts segmentation predictions into a DataFrame format for Kaggle."""
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

# Create and download the csv submission file
submission_filename = f"submission.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)