## Set up

In [1]:
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf
import keras as tfk
import matplotlib.pyplot as plt
from keras import layers as tfkl

2024-12-01 15:49:22.821717: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-01 15:49:22.821967: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-01 15:49:22.824390: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-01 15:49:22.855795: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Single seed shared by every stochastic step in the notebook.
SEED = 42

In [3]:
# Seed the random number generators through Keras so repeated runs are comparable.
tfk.utils.set_random_seed(SEED)

In [4]:

# Record the library versions and GPU availability this run was executed with.
print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

TensorFlow version: 2.16.1
Keras version: 3.3.3
GPU devices: 0


## Data

In [5]:
# Prefer the Kaggle dataset mount; otherwise fall back to the repository's
# ../data/clean directory, resolved relative to the current working directory.
kaggle_root = Path("/kaggle/input/an2dl-hw2-clean")
local_root = Path().absolute().parent / "data" / "clean"
DATA_ROOT = kaggle_root if kaggle_root.exists() else local_root

DATA_ROOT

PosixPath('/home/tomaz/git/Politecnico/Subjects/deep-learning/an2dl/homework-2/data/clean')

In [6]:
# The training archive holds images ("x") and their masks ("y");
# the test archive only holds images.
with np.load(DATA_ROOT / "train.npz") as train_npz:
    X_train, y_train = train_npz["x"], train_npz["y"]
with np.load(DATA_ROOT / "test.npz") as test_npz:
    X_test = test_npz["x"]

print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}")

Training X shape: (2505, 64, 128)
Training y shape: (2505, 64, 128)
Test X shape: (10022, 64, 128)


In [7]:
# Add a trailing channel axis and divide by 255 so pixels fall between 0 and 1
# (assumes the raw images use the 0-255 range).
# NOTE: this cell rebinds X_train / X_test, so it is not idempotent. Re-running it
# without reloading the data adds another axis and rescales a second time.
X_train = X_train[..., np.newaxis] / 255.0
X_test = X_test[..., np.newaxis] / 255.0

input_shape = X_train.shape[1:]
# Labels are later used as indices (np.bincount, tf.one_hot), so the class count
# has to cover the largest label id. Counting distinct values would undercount
# whenever some id is absent from the training masks.
num_classes = int(np.max(y_train)) + 1

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")

Input shape: (64, 128, 1)
Number of classes: 5


In [8]:
# Make sure images and masks both carry a trailing channel axis.
# Only a missing axis is added. A blanket np.squeeze is avoided on purpose because
# it would also drop a batch or spatial axis that happens to have size 1.
if X_train.ndim == 3:
    X_train = X_train[..., np.newaxis]

if y_train.ndim == 3:
    y_train = y_train[..., np.newaxis]

print(f"Shape of X_train after reshaping: {X_train.shape}")
print(f"Shape of y_train after reshaping: {y_train.shape}")

Shape of X_train after reshaping: (2505, 64, 128, 1)
Shape of y_train after reshaping: (2505, 64, 128, 1)


In [9]:
# Shape check of the arrays that are fed to the class statistics and the model.
print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")

X_train shape: (2505, 64, 128, 1)
y_train shape: (2505, 64, 128, 1)


In [10]:
# Per-class weights: inverse pixel frequency, normalised so the weights sum to 1.
y_train_int = y_train.astype(np.int32)  # np.bincount needs integer labels

class_pixel_counts = np.bincount(y_train_int.ravel(), minlength=num_classes)
total_pixels = class_pixel_counts.sum()

# The small constant keeps the division finite for a class with zero pixels.
class_weights = total_pixels / (class_pixel_counts + 1e-6)
class_weights = class_weights / class_weights.sum()

print(f"Class pixel counts: {class_pixel_counts}")
print(f"Calculated class weights: {class_weights}")

Class pixel counts: [4988826 6957538 4776810 3770823   26963]
Calculated class weights: [0.00528795 0.00379166 0.00552265 0.00699599 0.97840175]


## Model

In [None]:
@dataclass
class Hyperparameter:
    """Settings for the U-Net structure, the optimizer and the training callbacks."""

    # Structure
    # NOTE(review): `activation` is declared here but is not read by
    # unet_with_residual_blocks in this notebook; confirm whether it should be wired in.
    activation: str = "relu"
    # Side length of the square convolution kernel used in every residual block.
    filter_size: int = 3
    # Side length and stride of the transposed convolution used for upsampling.
    transpose_filter_size: int = 2
    transpose_stride: int = 2
    # Side length of the square max-pooling window used for downsampling.
    max_pool_size: int = 2
    # One encoder stage (residual block + pooling) per entry.
    encoder_filters: tuple[int, ...] = (64, 128, 256)
    # Filters of the residual block between encoder and decoder.
    latent_filters: int = 512
    # One decoder stage (upsampling + skip concatenation + residual block) per entry.
    decoder_filters: tuple[int, ...] = (256, 128, 64)
    # When False, the class weights are replaced by ones before building the loss.
    use_class_weights: bool = True
    # Training
    # Optimizer class (not an instance); it is called with `learning_rate=...` at compile time.
    optimizer: type[tfk.Optimizer] = tfk.optimizers.AdamW
    learning_rate: float = 1e-4
    epochs: int = 1000
    validation_split: float = 0.1
    # Callbacks
    # EarlyStopping patience / min_delta.
    es_patience: int = 30
    es_min_delta: float = 1e-4
    # ReduceLROnPlateau factor / patience / min_lr.
    lr_factor: float = 0.5
    lr_patience: int = 5
    lr_min_lr: float = 1e-8


hp = Hyperparameter()

In [12]:
class VisualizeSegmentationCallback(tfk.callbacks.Callback):
    """Plot input, ground-truth mask and predicted mask for a few images after each epoch.

    Up to ``num_images`` samples whose mask contains at least four distinct
    labels are picked once, at construction time, and re-plotted every epoch.

    Args:
        X_train: Array of input images, indexable along the first axis.
        y_train: Array of label masks aligned with ``X_train``.
        num_images: Maximum number of samples to visualize.
    """

    def __init__(self, X_train, y_train, num_images=2):
        super().__init__()
        self.X_train = X_train
        self.y_train = y_train
        self.num_images = num_images
        self.selected_indices = []

        # Identify images containing at least 4 classes. The size check comes
        # first so that no more than `num_images` samples are ever collected.
        for i in range(len(y_train)):
            if len(self.selected_indices) >= num_images:
                break
            if len(np.unique(y_train[i])) >= 4:
                self.selected_indices.append(i)

    @staticmethod
    def _to_2d(image):
        """Drop a trailing size-1 channel axis so the array can be shown with imshow."""
        image = np.asarray(image)
        if image.ndim == 3 and image.shape[-1] == 1:
            return image[..., 0]
        return image

    def on_epoch_end(self, epoch, logs=None):
        """Render one row (input, ground truth, prediction) per selected sample.

        Args:
            epoch: Zero-based epoch index; the plot title shows ``epoch + 1``.
            logs: Metric dictionary supplied by Keras (unused).
        """
        n_rows = len(self.selected_indices)
        if n_rows == 0:
            # No sample matched the selection rule, so there is nothing to draw.
            return

        # squeeze=False keeps `axes` two-dimensional even when only one row is drawn.
        fig, axes = plt.subplots(n_rows, 3, figsize=(15, n_rows * 5), squeeze=False)

        for row, i in enumerate(self.selected_indices):
            # Slicing (rather than indexing) keeps the batch dimension.
            X_sample = self.X_train[i : i + 1]
            y_sample = self._to_2d(self.y_train[i])

            # verbose=0 suppresses the per-call progress bar.
            class_scores = self.model.predict(X_sample, verbose=0)
            predicted_mask = np.argmax(class_scores, axis=-1)[0]

            # One shared colour scale, so a label has the same colour in both masks.
            max_label = max(class_scores.shape[-1] - 1, 1)

            axes[row, 0].imshow(self._to_2d(X_sample[0]), cmap="gray")
            axes[row, 0].set_title("Input Image")
            axes[row, 0].axis("off")

            axes[row, 1].imshow(y_sample, cmap="viridis", vmin=0, vmax=max_label)
            axes[row, 1].set_title("Ground Truth Mask")
            axes[row, 1].axis("off")

            axes[row, 2].imshow(predicted_mask, cmap="viridis", vmin=0, vmax=max_label)
            axes[row, 2].set_title(f"Predicted Mask (Epoch {epoch + 1})")
            axes[row, 2].axis("off")

        plt.tight_layout()
        plt.show()
        # Release the figure; otherwise one figure per epoch stays in memory.
        plt.close(fig)

In [None]:
def residual_block(x, filters, size, activation="relu"):
    """Two conv + layer-norm stages with a projected skip connection.

    Args:
        x: Input tensor.
        filters: Number of filters for both convolutions and the shortcut projection.
        size: Kernel size passed to the two main-path convolutions.
        activation: Activation used by the two main-path convolutions.
            Defaults to "relu", which was previously hard-coded.

    Returns:
        The element-wise sum of the main path and the 1x1-projected shortcut.
    """
    # Layout follows
    # https://www.researchgate.net/figure/The-improved-U-Net-architecture-with-introduction-of-BN-BN-batch-normalization_fig2_355159616
    # but this implementation normalizes with LayerNormalization.
    shortcut = x
    x = tfkl.Conv2D(filters, size, activation=activation, padding="same")(x)
    x = tfkl.LayerNormalization()(x)
    x = tfkl.Conv2D(filters, size, activation=activation, padding="same")(x)
    x = tfkl.LayerNormalization()(x)
    # The 1x1 convolution matches the shortcut's channel count to `filters` so
    # the two branches can be added.
    shortcut = tfkl.Conv2D(filters, (1, 1), padding="same")(shortcut)
    return tfkl.add([x, shortcut])


def weighted_categorical_crossentropy(weights):
    """Build a per-pixel cross-entropy loss with one weight per class.

    Args:
        weights: Sequence of class weights; its length is used as the one-hot depth.

    Returns:
        A function ``loss(y_true, y_pred)``. ``y_true`` holds label ids with a
        trailing size-1 axis, which is squeezed away. The result has one value
        per pixel, because the class axis is summed out.
    """
    weights = tf.constant(weights, dtype=tf.float32)

    def loss(y_true, y_pred):
        # Label ids -> one-hot targets with the same depth as the weight vector.
        label_ids = tf.cast(tf.squeeze(y_true, axis=-1), tf.int32)
        targets = tf.one_hot(label_ids, depth=len(weights))

        # Clip the predictions so the logarithm stays finite.
        clipped = tf.clip_by_value(
            y_pred, tfk.backend.epsilon(), 1 - tfk.backend.epsilon()
        )
        weighted_log_likelihood = targets * tf.math.log(clipped) * weights
        return -tf.reduce_sum(weighted_log_likelihood, axis=-1)

    return loss


# Dice Loss
def dice_loss(y_true, y_pred):
    """Mean soft Dice loss over the batch.

    Args:
        y_true: Either one-hot targets with the same shape as ``y_pred``, or
            sparse label ids shaped like ``y_pred`` without the class axis
            (optionally with a trailing size-1 axis). Sparse ids are one-hot
            encoded first. Without that step, label ids would be broadcast
            against the class probabilities and the overlap would be meaningless.
        y_pred: Class probabilities with the class axis last; the reductions
            below assume a rank-4 tensor.

    Returns:
        Scalar tensor: one minus the Dice coefficient, averaged over the batch.
    """
    num_classes = y_pred.shape[-1]
    if num_classes is not None and num_classes > 1:
        # (..., 1) label ids -> (...) label ids
        if len(y_true.shape) == len(y_pred.shape) and y_true.shape[-1] == 1:
            y_true = tf.squeeze(y_true, axis=-1)
        # (...) label ids -> (..., num_classes) one-hot targets
        if len(y_true.shape) == len(y_pred.shape) - 1:
            y_true = tf.one_hot(
                tf.cast(y_true, tf.int32), depth=num_classes, dtype=y_pred.dtype
            )

    numerator = 2 * tf.reduce_sum(y_true * y_pred, axis=(1, 2, 3))
    denominator = tf.reduce_sum(y_true + y_pred, axis=(1, 2, 3))
    # Calculate Dice loss per image in the batch; epsilon guards against 0 / 0.
    dice_loss_per_image = 1 - numerator / (denominator + tfk.backend.epsilon())
    # Return the mean Dice loss across the batch
    return tf.reduce_mean(dice_loss_per_image)


# Hybrid Loss Function
def combined_loss(weights):
    """Build a loss that adds weighted cross-entropy and Dice loss.

    Args:
        weights: Class weights forwarded to ``weighted_categorical_crossentropy``.

    Returns:
        A function ``loss(y_true, y_pred)`` returning the sum of both terms.
    """
    weighted_ce = weighted_categorical_crossentropy(weights)

    def loss(y_true, y_pred):
        # NOTE(review): y_true is forwarded unchanged to both terms; confirm that
        # dice_loss accepts the same label encoding as the cross-entropy term.
        return weighted_ce(y_true, y_pred) + dice_loss(y_true, y_pred)

    return loss


# U-Net with Residual Connections
def unet_with_residual_blocks(hp: Hyperparameter, input_shape, num_classes):
    """Assemble a U-Net whose stages are residual blocks.

    Args:
        hp: Supplies kernel, pooling and transposed-convolution sizes, plus the
            filter counts of the encoder, bottleneck and decoder stages.
        input_shape: Shape of one input sample (without the batch axis).
        num_classes: Number of channels of the final softmax layer.

    Returns:
        A ``tfk.Model`` mapping the input to per-pixel class probabilities.
    """
    conv_kernel = (hp.filter_size, hp.filter_size)

    inputs = tfkl.Input(input_shape)
    features = inputs

    # Encoder: every stage stores its pre-pooling output for the matching decoder stage.
    skip_stack = []
    for n_filters in hp.encoder_filters:
        features = residual_block(features, n_filters, conv_kernel)
        skip_stack.append(features)
        features = tfkl.MaxPooling2D((hp.max_pool_size, hp.max_pool_size))(features)

    # Bottleneck
    features = residual_block(features, hp.latent_filters, conv_kernel)

    # Decoder: upsample, then concatenate with the most recent unused encoder output.
    for n_filters in hp.decoder_filters:
        upsampled = tfkl.Conv2DTranspose(
            n_filters,
            (hp.transpose_filter_size, hp.transpose_filter_size),
            strides=(hp.transpose_stride, hp.transpose_stride),
            padding="same",
        )(features)
        features = tfkl.concatenate([upsampled, skip_stack.pop()])
        features = residual_block(features, n_filters, conv_kernel)

    # A 1x1 convolution turns the features into class probabilities per pixel.
    outputs = tfkl.Conv2D(num_classes, (1, 1), activation="softmax")(features)

    return tfk.Model(inputs, outputs)


# Choose the weights for the loss without overwriting `class_weights`. That keeps
# the cell idempotent: switching hp.use_class_weights back on and re-running it
# picks up the computed weights again.
if hp.use_class_weights:
    loss_weights = class_weights
else:
    print("Not using class weights")
    loss_weights = np.ones_like(class_weights)

# Initialize the model
input_shape = X_train.shape[1:]
loss = combined_loss(loss_weights)
model = unet_with_residual_blocks(hp, input_shape, num_classes)
model.compile(
    # hp.optimizer is an optimizer class; it is instantiated here.
    optimizer=hp.optimizer(learning_rate=hp.learning_rate),
    loss=loss,
    metrics=["accuracy"],
)

model.summary()

In [14]:
# Stop once the validation loss has stopped improving, then restore the best weights.
early_stopping = tfk.callbacks.EarlyStopping(
    monitor="val_loss",
    min_delta=hp.es_min_delta,
    patience=hp.es_patience,
    restore_best_weights=True,
    verbose=1,
)

# Scale the learning rate by hp.lr_factor when the validation loss plateaus.
reduce_lr = tfk.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    patience=hp.lr_patience,
    factor=hp.lr_factor,
    min_lr=hp.lr_min_lr,
    verbose=1,
)

# Plots a few training samples with their predicted masks after every epoch.
visualize_callback = VisualizeSegmentationCallback(X_train, y_train)

# Training the model; a fraction hp.validation_split of the data is used for validation.
history = model.fit(
    x=X_train,
    y=y_train,
    validation_split=hp.validation_split,
    epochs=hp.epochs,
    callbacks=[early_stopping, reduce_lr, visualize_callback],
    verbose=2,
)

Epoch 1/1000


KeyboardInterrupt: 

In [None]:
# Persist the trained model to disk.
# NOTE(review): the model was compiled with a custom loss closure; reloading the
# file presumably needs compile=False or custom_objects. Confirm before relying on it.
model_filename = "model.keras"
model.save(model_filename)

print(f"Model saved to {model_filename}")

## Submission

In [None]:
# Shape check of the test images before they are passed to the model.
print(f"X_test shape before reshaping: {X_test.shape}")

In [None]:
# Predict class probabilities for every test image and keep only the most
# likely label per pixel.
preds = np.argmax(model.predict(X_test), axis=-1)
print(f"Predictions shape: {preds.shape}")

In [None]:
# Visual sanity check: show a random subset of test images next to their predicted masks.

# Reuse the notebook-wide seed so the same subset is drawn on every run.
np.random.seed(SEED)
# Randomly select up to 20 indices for prediction
num_images = min(20, X_test.shape[0])
random_indices = np.random.choice(X_test.shape[0], size=num_images, replace=False)
print(f"Randomly selected indices for prediction (seed={SEED}): {random_indices}")

# Index images AND predictions with the same indices. Indexing `preds` by row
# position would pair each sampled image with the mask of an unrelated test image.
X_sample = X_test[random_indices]
predicted_masks = preds[random_indices]

# Visualize the predictions; squeeze=False keeps `axes` two-dimensional even
# when a single image is shown.
fig, axes = plt.subplots(num_images, 2, figsize=(15, num_images * 5), squeeze=False)

for i in range(num_images):
    # Original Image
    ax_img = axes[i, 0]
    ax_img.imshow(X_sample[i].squeeze(), cmap="gray")
    ax_img.set_title("Input Image")
    ax_img.axis("off")

    # Predicted Mask, on a fixed colour scale so a label looks the same in every row.
    ax_pred = axes[i, 1]
    ax_pred.imshow(
        predicted_masks[i], cmap="viridis", vmin=0, vmax=max(num_classes - 1, 1)
    )
    ax_pred.set_title("Predicted Mask")
    ax_pred.axis("off")

plt.tight_layout()
plt.show()

In [None]:
def y_to_df(y) -> pd.DataFrame:
    """Flatten per-sample predictions into one row each, preceded by an ``id`` column.

    Args:
        y: Array whose first axis indexes samples; the remaining axes are flattened.

    Returns:
        A DataFrame with an ``id`` column (0..n-1) followed by one integer-named
        column per flattened element.
    """
    sample_count = len(y)
    frame = pd.DataFrame(y.reshape(sample_count, -1))
    value_columns = list(frame.columns)
    frame["id"] = np.arange(sample_count)
    # Select with "id" first, keeping the value columns in their original order.
    return frame[["id", *value_columns]]

In [None]:
# Write the predictions to the CSV submission file (one row per test image).
# The file is written to the current working directory; nothing is downloaded here.
submission_filename = "submission.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)

In [None]:
# Sanity check: the "id" column plus one column per flattened mask pixel
# (expected 1 + 64 * 128 = 8193 if predictions keep the input resolution).
len(submission_df.columns)

In [None]:
# Sanity check: one row per test image.
len(submission_df)