## UNet-Based Ship Segmentation Pipeline

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lorinctoldi/deep-learning/blob/main/model.ipynb)

This notebook covers an end-to-end pipeline for ship segmentation using a UNet model. 
It includes dataset preparation, model creation, training, evaluation, and visualization of predictions.

## Dataset Preparation

In [None]:
import numpy as np
from utils import load_masks

We load the masks into `masks_df` and replace empty strings with `NaN` values. 
This standardizes missing masks for images with no ships.

In [None]:
masks_df = load_masks()
masks_df['EncodedPixels'] = masks_df['EncodedPixels'].replace('', np.nan)

In [None]:
masks_df.head(3)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

Because the dataset is heavily imbalanced towards images with no ships (i.e., no masks), training directly on the full dataset would bias the model towards predicting empty masks. To mitigate this, we first assign a binary label to each image: images containing at least one ship mask get a label of 1 (positive examples), while images with no masks get a label of 0 (negative examples). This label simplifies both balancing the dataset and stratifying the training, validation, and test splits.

Next, we sample up to 5,000 images with ships and up to 5,000 empty images, for at most 10,000 images in total. This yields a balanced subset that remains representative but is more manageable for training.

In [None]:
image_labels = masks_df.groupby('ImageId')['EncodedPixels'] \
            .apply(lambda x: 1 if x.notna().any() else 0) \
            .reset_index()

image_labels = image_labels.rename(columns={'EncodedPixels': 'label'})

In [None]:
ship_images = image_labels[image_labels['label'] == 1]
empty_images = image_labels[image_labels['label'] == 0]

SAMPLE_SIZE = 5000

ship_sample = ship_images.sample(
    n=min(SAMPLE_SIZE, len(ship_images)), random_state=42
)
empty_sample = empty_images.sample(
    n=min(SAMPLE_SIZE, len(empty_images)), random_state=42
)

balanced_df = pd.concat([ship_sample, empty_sample], axis=0).reset_index(drop=True)

We split the balanced dataset into:
- `train_df` (80%)  
- `temp_df` (20%), which is further split into:  
  - `val_df` (10%) for validation  
  - `test_df` (10%) for final evaluation  

Stratification ensures the proportion of ship vs empty images is consistent across splits.

In [None]:
train_df, temp_df = train_test_split(
    balanced_df,
    test_size=0.2,
    random_state=42,
    stratify=balanced_df['label']
)

val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    random_state=42,
    stratify=temp_df['label']
)
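
To illustrate what stratification guarantees, here is a minimal NumPy sketch on toy labels (5,000 ones and 5,000 zeros). The helper `stratified_split` is a hypothetical stand-in written for this illustration, not scikit-learn's implementation; it only shows that splitting each class separately keeps the ship/empty ratio identical in every subset.

```python
import numpy as np

def stratified_split(labels, test_fraction, seed=42):
    """Split indices so that each class contributes the same fraction to the test part.

    Hypothetical helper for illustration only; returns (train_idx, test_idx).
    """
    rng = np.random.default_rng(seed)
    labels = np.asarray(labels)
    train_idx, test_idx = [], []
    for cls in np.unique(labels):
        # Shuffle the indices of this class, then cut off the test share.
        idx = rng.permutation(np.flatnonzero(labels == cls))
        n_test = int(round(len(idx) * test_fraction))
        test_idx.extend(idx[:n_test])
        train_idx.extend(idx[n_test:])
    return np.array(train_idx), np.array(test_idx)

# Toy stand-in for the balanced label column: 5,000 ship and 5,000 empty images.
labels = np.array([1] * 5000 + [0] * 5000)

# 80% train, 20% temp; then temp is halved into validation and test.
train_idx, temp_idx = stratified_split(labels, test_fraction=0.2)
val_rel, test_rel = stratified_split(labels[temp_idx], test_fraction=0.5)
val_idx, test_idx = temp_idx[val_rel], temp_idx[test_rel]

for name, idx in [("train", train_idx), ("val", val_idx), ("test", test_idx)]:
    print(f"{name}: {len(idx)} images, ship fraction {labels[idx].mean():.2f}")
```

With a perfectly balanced input, each subset keeps a ship fraction of 0.5, and the subset sizes follow the 80/10/10 proportions.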

To retrieve all masks for a given image efficiently, we create a dictionary mapping each `ImageId` to a list of RLE-encoded masks.

In [None]:
mask_dict = masks_df.groupby("ImageId")["EncodedPixels"].apply(list).to_dict()
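
For readers unfamiliar with run-length encoding, the sketch below shows how a list of RLE strings could be decoded and merged into one binary mask. It assumes the common convention of space-separated `start length` pairs with 1-based pixel indices counted column by column; the functions `rle_to_mask` and `combine_masks` are hypothetical illustrations, and the notebook's own `get_mask_from_rle` in `utils` may be implemented differently.

```python
import numpy as np

def rle_to_mask(rle: str, shape=(768, 768)) -> np.ndarray:
    """Decode one RLE string into a binary mask (assumed format: 1-based, column-major)."""
    tokens = [int(t) for t in rle.split()]
    starts = [s - 1 for s in tokens[0::2]]  # convert to 0-based indices
    lengths = tokens[1::2]
    flat = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for start, length in zip(starts, lengths):
        flat[start:start + length] = 1
    # order="F" fills the mask column by column, matching the assumed pixel numbering.
    return flat.reshape(shape, order="F")

def combine_masks(rles, shape=(768, 768)) -> np.ndarray:
    """Merge all per-ship masks of one image into a single binary mask."""
    mask = np.zeros(shape, dtype=np.uint8)
    for rle in rles:
        if isinstance(rle, str):  # NaN entries mark images without ships
            mask = np.maximum(mask, rle_to_mask(rle, shape))
    return mask

# Tiny 3x4 example: two runs that overlap in one pixel, plus a NaN entry.
toy = combine_masks(["1 2 6 1", float("nan"), "6 2"], shape=(3, 4))
print(toy)
```

Using `np.maximum` instead of addition keeps the result strictly binary even when two encoded masks overlap.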

In [None]:
import os
import tensorflow as tf
from utils import get_mask_from_rle
from constants import IMAGE_PATH

Since the dataset has been split into training, validation, and test sets, we need a pipeline to efficiently load each image along with its corresponding mask. For each ImageId in the DataFrame, we read the image from disk and retrieve all associated RLE masks, combining them into a single binary mask. The images are then resized to 256x256 and normalized to values between 0 and 1, while the masks are resized to the same resolution and converted into binary tensors. Finally, this process is wrapped into a `tf.data.Dataset` pipeline. The training dataset is shuffled and batched to improve training efficiency, whereas the validation and test datasets are batched but not shuffled to ensure consistent evaluation.

In [None]:
def load_image_and_mask(img_id: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
    """
    Load an image and its combined mask from disk.

    :param img_id: Tensor containing the image filename
    :return: Tuple of (image, mask) tensors with shapes (768,768,3) and (768,768,1)
    """
    img_id = img_id.numpy().decode("utf-8")

    path = os.path.join(IMAGE_PATH, img_id)
    img = tf.io.read_file(path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, [768, 768])
    img = img / 255.0

    rles = mask_dict.get(img_id, [])
    mask = np.zeros((768, 768), dtype=np.uint8)

    for rle in rles:
        if isinstance(rle, str):
            mask += get_mask_from_rle(rle)

    mask = tf.convert_to_tensor(mask[..., None], dtype=tf.float32)
    mask = tf.image.resize(mask, [768, 768])
    mask = tf.cast(mask > 0, tf.float32)

    return img, mask

In [None]:
def tf_load_image_and_mask(img_id: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
    """
    Load an image and its mask as TensorFlow tensors for use in a dataset.

    :param img_id: Image ID tensor (string)
    :return: Tuple of (image, mask) tensors with shapes (768,768,3) and (768,768,1)
    """
    img, mask = tf.py_function(
        load_image_and_mask,
        [img_id],
        [tf.float32, tf.float32]
    )
    img.set_shape([768, 768, 3])
    mask.set_shape([768, 768, 1])
    return img, mask

In [None]:
def make_dataset(df: pd.DataFrame, batch_size: int = 8, shuffle: bool = True) -> tf.data.Dataset:
    """
    Create a TensorFlow dataset of images and masks from a DataFrame.

    :param df: DataFrame containing 'ImageId' column
    :param batch_size: Number of samples per batch
    :param shuffle: Whether to shuffle the dataset
    :return: tf.data.Dataset yielding (image, mask) tuples
    """
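    dataset = tf.data.Dataset.from_tensor_slices(df["ImageId"].values)
    if shuffle:
        # Shuffle the lightweight filename strings before decoding, so the
        # shuffle buffer never has to hold decoded image tensors.
        dataset = dataset.shuffle(len(df))
    dataset = dataset.map(tf_load_image_and_mask, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)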
    return dataset

In [None]:
train_ds = make_dataset(train_df)
val_ds   = make_dataset(val_df, shuffle=False)
test_ds  = make_dataset(test_df, shuffle=False)

## Model Creation and Training

After some trial and error, we chose the UNet architecture, a widely used CNN for image segmentation that captures both the overall structure and the fine details of objects.

We resized all images and masks to 256x256 to reduce memory usage and speed up training, even though using the full 768x768 resolution would likely produce more accurate segmentations. Similarly, we limited the dataset to a balanced sample of 10,000 images to save computational resources while maintaining a representative set of images with and without ships. Hyperparameters were chosen after testing multiple configurations to optimize segmentation performance.

The model outputs a single-channel probability map for each image, indicating the likelihood that each pixel belongs to a ship. To train the model, we use a combined loss function that equally weights Binary Crossentropy and Dice loss. Binary Crossentropy encourages correct pixel-wise predictions, while Dice loss directly measures the overlap between the predicted and true masks, which is particularly useful for handling the class imbalance between ship and background pixels. We also monitor Intersection over Union (IoU) and F2 score as metrics, which provide a more holistic evaluation of segmentation quality by considering both precision and recall, rather than just pixel-wise accuracy.
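
To see why Dice loss is a useful complement to Binary Crossentropy under pixel imbalance, consider the following NumPy sketch. The functions `dice_loss_np` and `bce_np` are hypothetical NumPy mirrors written for this illustration, and the 64x64 toy mask is an assumption, not data from the notebook.

```python
import numpy as np

def dice_loss_np(y_true, y_pred, smooth=1e-6):
    """Dice loss: 1 - 2 * intersection / (sum of both masks)."""
    intersection = np.sum(y_true * y_pred)
    return 1.0 - (2.0 * intersection + smooth) / (np.sum(y_true) + np.sum(y_pred) + smooth)

def bce_np(y_true, y_pred, eps=1e-7):
    """Mean binary cross-entropy over all pixels."""
    p = np.clip(y_pred, eps, 1.0 - eps)
    return float(np.mean(-(y_true * np.log(p) + (1.0 - y_true) * np.log(1.0 - p))))

# Toy 64x64 mask with a 4x10 "ship": 40 of 4096 pixels (about 1%) are positive.
y_true = np.zeros((64, 64))
y_true[10:14, 20:30] = 1.0

# A lazy prediction that gives every pixel a 1% ship probability.
lazy_pred = np.full_like(y_true, 0.01)

lazy_bce = bce_np(y_true, lazy_pred)
lazy_dice = dice_loss_np(y_true, lazy_pred)
lazy_combined = 0.5 * lazy_bce + 0.5 * lazy_dice  # same equal weighting as described above
perfect_dice = dice_loss_np(y_true, y_true)

print(f"lazy prediction:    BCE={lazy_bce:.3f}  Dice={lazy_dice:.3f}  combined={lazy_combined:.3f}")
print(f"perfect prediction: Dice={perfect_dice:.3f}")
```

In this toy case the lazy prediction gets a small Binary Crossentropy because almost all pixels are background, while its Dice loss stays close to 1, so the combined loss still penalizes ignoring the ship.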

The Adam optimizer is used to update model parameters efficiently during training. Overall, this setup provides a balance between computational feasibility and segmentation performance, allowing us to experiment and evaluate the model within reasonable resource constraints.

### Metrics: IoU and F2 Score

Most of the images in this dataset are empty or mostly background. If we used simple accuracy as a metric, the model could just predict all zeros and still get a very high score. That wouldn't tell us anything about how well it actually detects ships. For this reason, we use Intersection over Union (IoU) and F2 score, which focus on how good the predicted masks are.

**Intersection over Union (IoU):**
IoU measures how much the predicted mask overlaps with the ground truth:

$IoU = \frac{\text{Intersection}}{\text{Union}} = \frac{TP}{TP + FP + FN}$

It basically counts how many pixels the prediction got right compared to the total pixels that should have been or were predicted as ships. A perfect mask gets 1, and no overlap gets 0. This helps us focus on actual ship pixels rather than the huge number of background pixels.

**F2 Score:**
The F-beta score combines precision and recall. We use F2, which weights recall more heavily because missing ship pixels is worse than predicting a few extra:

$F_2 = (1 + 2^2) \cdot \frac{Precision \cdot Recall}{(2^2 \cdot Precision) + Recall}$

- Precision: How many predicted ship pixels are actually ships.
- Recall: How many true ship pixels we actually detected.

F2 is high when the model finds most of the ship pixels, even if it makes a few extra predictions.

**Why not accuracy?**
Accuracy would mostly reflect background pixels and would make the model look better than it actually is. IoU and F2 give a more realistic picture of how well the model is actually detecting ships.
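
The formulas above can be checked with a small worked example. The pixel counts below are hypothetical and chosen only for illustration (a 768x768 image with 1,000 true ship pixels); `pixel_metrics` is likewise a helper written for this sketch rather than part of the notebook.

```python
def pixel_metrics(tp: int, fp: int, fn: int, total: int, beta: float = 2.0) -> dict:
    """Compute accuracy, IoU and F-beta from pixel counts of one image."""
    tn = total - tp - fp - fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    iou = tp / (tp + fp + fn) if tp + fp + fn else 1.0
    b2 = beta ** 2
    denom = b2 * precision + recall
    f_beta = (1 + b2) * precision * recall / denom if denom else 0.0
    return {
        "accuracy": (tp + tn) / total,
        "precision": precision,
        "recall": recall,
        "iou": iou,
        "f_beta": f_beta,
    }

total = 768 * 768  # 589,824 pixels

# A decent prediction: 800 of 1,000 ship pixels found, 400 false alarms.
decent = pixel_metrics(tp=800, fp=400, fn=200, total=total)

# A useless prediction: everything labelled as background.
all_background = pixel_metrics(tp=0, fp=0, fn=1000, total=total)

for name, m in [("decent", decent), ("all background", all_background)]:
    print(f"{name:>15}: acc={m['accuracy']:.4f}  IoU={m['iou']:.4f}  F2={m['f_beta']:.4f}")
```

Both predictions reach an accuracy above 0.99, but only the first has a non-zero IoU (800/1400) and F2 score (4000/5200), which is the behaviour the metrics section argues for.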


In [None]:
def dice_loss(y_true: tf.Tensor, y_pred: tf.Tensor, smooth: float = 1e-6) -> tf.Tensor:
    """
    Compute the Dice loss for binary segmentation.

    :param y_true: Ground truth mask tensor
    :param y_pred: Predicted mask tensor
    :param smooth: Small value to avoid division by zero
    :return: Dice loss value
    """
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return 1 - (2 * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)

In [None]:
bce = tf.keras.losses.BinaryCrossentropy()

def combined_loss(y_true: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
    """
    Combine Binary Crossentropy and Dice loss for binary segmentation.

    :param y_true: Ground truth mask tensor
    :param y_pred: Predicted mask tensor
    :return: Combined loss value
    """
    return 0.5 * bce(y_true, y_pred) + 0.5 * dice_loss(y_true, y_pred)

In [None]:
def f2_score_metric(y_true: tf.Tensor, y_pred: tf.Tensor, beta: float = 2, smooth: float = 1e-6) -> tf.Tensor:
    """
    Compute the F-beta score for binary segmentation.

    :param y_true: Ground truth mask tensor
    :param y_pred: Predicted mask tensor
    :param beta: Weight of recall relative to precision (default=2)
    :param smooth: Small value to avoid division by zero
    :return: F-beta score tensor
    """
    y_pred = tf.cast(y_pred > 0.5, tf.float32)
    y_true = tf.cast(y_true, tf.float32)

    tp = tf.reduce_sum(y_true * y_pred)
    fp = tf.reduce_sum(y_pred) - tp
    fn = tf.reduce_sum(y_true) - tp

    fbeta = (1 + beta**2) * tp / ((1 + beta**2)*tp + beta**2 * fn + fp + smooth)
    return fbeta

In [None]:
def iou_metric(y_true: tf.Tensor, y_pred: tf.Tensor, smooth: float = 1e-6) -> tf.Tensor:
    """
    Compute the Intersection over Union (IoU) for binary segmentation.

    :param y_true: Ground truth mask tensor
    :param y_pred: Predicted mask tensor
    :param smooth: Small value to avoid division by zero
    :return: IoU score tensor
    """
    y_pred = tf.cast(y_pred > 0.5, tf.float32)
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
    return (intersection + smooth) / (union + smooth)

In [None]:

from tensorflow.keras import layers, Model

In [None]:
def unet(input_shape: tuple[int, int, int] = (768, 768, 3)) -> Model:
    """
    Build a simple UNet model for binary image segmentation.

    :param input_shape: Shape of the input image (height, width, channels)
    :return: Uncompiled Keras Model with UNet architecture
    """
    inputs = layers.Input(shape=input_shape)

    c1 = layers.Conv2D(16, 3, activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(16, 3, activation='relu', padding='same')(c1)
    p1 = layers.MaxPool2D()(c1)

    c2 = layers.Conv2D(32, 3, activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(32, 3, activation='relu', padding='same')(c2)
    p2 = layers.MaxPool2D()(c2)

    b = layers.Conv2D(64, 3, activation='relu', padding='same')(p2)
    b = layers.Conv2D(64, 3, activation='relu', padding='same')(b)

    u2 = layers.UpSampling2D()(b)
    u2 = layers.Concatenate()([u2, c2])
    c3 = layers.Conv2D(32, 3, activation='relu', padding='same')(u2)
    c3 = layers.Conv2D(32, 3, activation='relu', padding='same')(c3)

    u1 = layers.UpSampling2D()(c3)
    u1 = layers.Concatenate()([u1, c1])
    c4 = layers.Conv2D(16, 3, activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(16, 3, activation='relu', padding='same')(c4)

    outputs = layers.Conv2D(1, 1, activation='sigmoid')(c4)
    return Model(inputs, outputs)

model = unet()

model.compile(optimizer="adam", loss=combined_loss, metrics=[iou_metric, f2_score_metric])
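
To make the encoder-decoder structure concrete, the following pure-Python sketch traces the spatial size and channel count through a network laid out like `unet` above. It assumes `'same'` padding, 2x pooling and 2x upsampling; `trace_unet_shapes` is a hypothetical helper for illustration and does not inspect the actual Keras model.

```python
def trace_unet_shapes(size: int = 768, filters=(16, 32, 64)) -> list:
    """Return (stage, spatial_size, channels) tuples for a small UNet layout."""
    shapes = [("input", size, 3)]
    skips = []
    s, ch = size, 3

    # Encoder: two convolutions keep the size, pooling halves it.
    for f in filters[:-1]:
        ch = f
        shapes.append(("encoder conv x2", s, ch))
        skips.append((s, ch))  # saved for the skip connection
        s //= 2
        shapes.append(("max pool", s, ch))

    ch = filters[-1]
    shapes.append(("bottleneck conv x2", s, ch))

    # Decoder: upsampling doubles the size, concatenation adds the skip channels.
    for f in reversed(filters[:-1]):
        s *= 2
        skip_size, skip_ch = skips.pop()
        assert skip_size == s, "input size must be divisible by 2 per pooling level"
        shapes.append(("upsample + concat", s, ch + skip_ch))
        ch = f
        shapes.append(("decoder conv x2", s, ch))

    shapes.append(("1x1 sigmoid conv", s, 1))
    return shapes

shapes = trace_unet_shapes()
for stage, s, ch in shapes:
    print(f"{stage:<20} {s}x{s}x{ch}")
```

The skip connections are what let the decoder reuse fine detail from the encoder: after each upsampling step the channel count temporarily grows (64 + 32 and 32 + 16 here) before the next pair of convolutions reduces it again.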

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import load_model

In [None]:
CHECKPOINT_PATH = "checkpoints/best_unet_model.h5"

checkpoint = tf.keras.callbacks.ModelCheckpoint(
    CHECKPOINT_PATH,
    monitor="val_loss",
    save_best_only=True,
    save_weights_only=False,
)

In [None]:
early_stop_iou = tf.keras.callbacks.EarlyStopping(
    monitor="val_iou_metric",
    patience=3,
    mode="max",
    restore_best_weights=True
)

early_stop_f2 = tf.keras.callbacks.EarlyStopping(
    monitor="val_f2_score_metric",
    patience=3,
    mode="max",
    restore_best_weights=True
)

In [None]:
if os.path.exists(CHECKPOINT_PATH):
    print(f"Checkpoint found at {CHECKPOINT_PATH}, skipping training.")
    model = load_model(
        CHECKPOINT_PATH,
        custom_objects={
            "iou_metric": iou_metric,
            "f2_score_metric": f2_score_metric,
            "combined_loss": combined_loss
        }
    )
else:
    print("Checkpoint not found, training the model.")
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=20,
        callbacks=[checkpoint, early_stop_iou, early_stop_f2],
    )

## Test / Evaluation / Inference

Once the UNet model is trained, we evaluate it on the held-out test set to see how well it generalizes. Since most images are mostly background, simple accuracy would be misleading, so we focus on metrics like IoU and F2 score, which better reflect how well the model detects ships and captures their shapes. The combined loss gives an overall sense of both pixel-level accuracy and mask overlap quality.

For inference, we take each image, resize it to 256x256 for the model, and predict a pixel-wise probability map. This map is then resized back to the original 768x768 resolution and thresholded to produce a binary mask highlighting the ships.

To understand how well the model performs, we visualize both curated examples and random test images. Curated picks illustrate strengths such as detecting low-contrast ships, handling multiple vessels, and correctly ignoring empty images, while also showing where minor noise or segmentation errors occur. Random samples give a broader view of performance across the dataset, helping us see both successes and typical failure modes in realistic scenarios.

In [None]:
results = model.evaluate(test_ds, return_dict=True)
print("Test Loss:", results["loss"])
print("Test IoU:", results["iou_metric"])
print("Test F2 Score:", results["f2_score_metric"])

In [None]:
from utils import get_image

In [None]:
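def predict_mask(model: tf.keras.Model, img_id: str, threshold: float = 0.5) -> tf.Tensor:
    """
    Predict a binary ship mask for a single image.

    :param model: Trained segmentation model
    :param img_id: Image ID without file extension, passed to `get_image`
    :param threshold: Minimum probability for a pixel to count as ship
    :return: 2D float tensor of 0s and 1s with shape (768, 768)
    """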
    img = get_image(img_id)

    img_resized = tf.image.resize(img, [768, 768])
    img_batch = tf.expand_dims(img_resized, 0)
    pred_mask = model.predict(img_batch)[0]

    pred_mask_full = tf.image.resize(pred_mask, [768, 768])
    pred_mask_2d = tf.squeeze(pred_mask_full, axis=-1)

    # Keep only pixels with probability >= threshold
    pred_mask_2d = tf.cast(pred_mask_2d >= threshold, tf.float32)
    
    return pred_mask_2d

In [None]:
from utils import visualize_image_with_mask

In [None]:
curated_picks = {
    "0a12e3118": "No ship detected, background correctly ignored.",
    "0a8d5d261": "Single ship with high contrast detected accurately.",
    "0a9fb0743": "Single ship partially out of frame still detected well.",
    "0a3b48a9c": "Multiple ships detected, minor noise in mask.",
    "0a9bc3e3a": "Connected ships detected as one, overall mask is good.",
    "0a814feb5": "Ship with low contrast detected, some mask noise.",
    "0a1174f25": "Two ships detected, major mask noise for partially covered ship.",
    "0a286fb15": "Segmentation errors present, but ships partially detected.",
}

for img_id, note in curated_picks.items():
    print(f"{img_id}.jpg: {note}")
    visualize_image_with_mask(img_id, predict_mask(model, img_id).numpy())

In [None]:
random_sample = test_df.sample(n=5)

for sample in random_sample.ImageId:
    img_id = sample.split('.')[0]
    visualize_image_with_mask(img_id, predict_mask(model, img_id).numpy())