In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint
from keras import callbacks

In [2]:
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available: 1


In [3]:
from data_loading import OxfordPetDataset
from augmentation import random_flip, augmentation_layers_geometric, augmentation_layers_color
from models import clip_segmentation_model # 2_c exercise
from experiment import plot_results

# Load the data
dataset = OxfordPetDataset()
dataset.load_data()
dataset.one_hot_encoding()

# Resize and normalize
reshape_size = (256, 256)
dataset.res_norm(reshape_size)

train = dataset.train_raw
val = dataset.val_raw
test = dataset.test_raw
ds_info = dataset.ds_info
get_label_name = dataset.get_label_name

print("\n")

# for i, example in enumerate(train.take(2)):  # Take two examples
#     print(f"Example {i+1} - Image shape:", example['image'].shape)
#     image = example['image']
#     plt.figure(figsize=(10, 5))
#     plt.subplot(1, 2, i+1)
#     plt.imshow(image)
#     label = example['label']
#     print(f"Example {i+1} - Label = ", label.numpy())
#     mask = example['segmentation_mask']  # This is the segmentation mask
#     plt.imshow(mask, cmap='gray', alpha=0.5)
#     print(f"Example {i+1} - Mask shape:", mask.numpy().shape)
#     print("\n")
# plt.show()

Number of classes: 37
Number of training samples: 3312
Number of validation samples: 368
Number of test samples: 3669




In [4]:
def extract_image_mask(data):
    """Extract image and mask from dictionary"""
    return data['image'], data['segmentation_mask']

# Apply to all datasets
train = train.map(extract_image_mask)
val = val.map(extract_image_mask)
test = test.map(extract_image_mask)

In [8]:
def augment_image_mask(image, mask):
    # Add a batch dimension to the image and mask for the augmentation layers
    batched_image = tf.expand_dims(image, axis=0)
    batched_mask = tf.expand_dims(mask, axis=0)
    
    # Cast the mask to float32 to match the image's data type after augmentation
    batched_mask = tf.cast(batched_mask, tf.float32)
    
    # Apply color-based augmentations to the image only
    aug_image = augmentation_color(batched_image, training=True)
    
    # Concatenate the augmented image and mask to apply geometric augmentations
    combined = tf.concat([aug_image, batched_mask], axis=-1)
    
    # Apply geometric augmentations to both image and mask
    combined = augmentation_geometric(combined, training=True)
    
    # Split the concatenated tensor back into image and mask
    aug_image, aug_mask = tf.split(combined, [3, tf.shape(batched_mask)[-1]], axis=-1)
    
    # Remove the batch dimension and cast the mask back to uint8 if necessary
    aug_image = tf.squeeze(aug_image, axis=0)
    aug_mask = tf.squeeze(aug_mask, axis=0)
    aug_mask = tf.cast(aug_mask, tf.uint8)  # Cast back to original type if needed
    
    return aug_image, aug_mask

In [5]:
BATCH_SIZE = 32

# Assume `raw_train` is your unaugmented dataset
cached_train = train.cache()  # Cache the raw images & masks

# Batch the data and use prefetching to optimize loading speed
# Apply augmentation dynamically during training, only on the training dataset
# train_ds = cached_train.map(
#     augment_image_mask, 
#     num_parallel_calls=tf.data.AUTOTUNE
# ).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
train_ds = train.batch(batch_size=BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
val_ds = val.batch(batch_size=BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
test_ds = test.batch(batch_size=BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
# Initialize and compile model
from metrics import MeanIoUWrapper,dice_coefficient
from models import clip_segmentation_model
from tensorflow.keras.models import Model
import os

model = clip_segmentation_model(input_shape=reshape_size + (3,))
miou_metric = MeanIoUWrapper(num_classes=3)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy',miou_metric, dice_coefficient])
top_dir = "task_2c/baseline"
best_model_folder = os.path.join(top_dir, "Clip_model")
# Define callback to save the best model
checkpoint = ModelCheckpoint(
    best_model_folder,
    monitor='val_loss',
    save_best_only=True,
    mode='min',
    verbose=1
)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=20, restore_best_weights=True
)

In [7]:
# Train the model
history = model.fit(
    train_ds,
    epochs=100,
    validation_data=val_ds,
    callbacks=[checkpoint,early_stopping]
)

Epoch 1/3
Epoch 1: val_loss improved from inf to 0.33330, saving model to task_2c/baseline\Clip_model




INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets


INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets


Epoch 2/3
Epoch 2: val_loss improved from 0.33330 to 0.32184, saving model to task_2c/baseline\Clip_model




INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets


INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets


Epoch 3/3
Epoch 3: val_loss improved from 0.32184 to 0.29928, saving model to task_2c/baseline\Clip_model




INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets


INFO:tensorflow:Assets written to: task_2c/baseline\Clip_model\assets




In [9]:
import os 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

# Define folder paths
top_dir = "task_2c/baseline"
best_model_folder = os.path.join(top_dir, "Clip_model")
results_folder = os.path.join(top_dir, "Clip_model_results")
os.makedirs(results_folder, exist_ok=True)

# Load the model without compiling to avoid custom object restoration issues.
best_model = tf.keras.models.load_model(best_model_folder, compile=False)

# Re-compile the model with the custom metrics.
best_model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy', MeanIoUWrapper(num_classes=3), dice_coefficient]
)

# Evaluate on validation set
val_metrics = best_model.evaluate(val_ds, return_dict=True)
val_loss = val_metrics['loss']
val_accuracy = val_metrics['accuracy']
val_iou = val_metrics['mean_iou']
val_dice = val_metrics['dice_coefficient']

# Evaluate on test set
test_metrics = best_model.evaluate(test_ds, return_dict=True)
print(test_metrics)
test_loss = test_metrics['loss']
test_accuracy = test_metrics['accuracy']
test_iou = test_metrics['mean_iou']
test_dice = test_metrics['dice_coefficient']


# Save test results in a separate CSV file
test_results = {
    "Loss": [test_loss],
    "Accuracy": [test_accuracy],
    "IoU": [test_iou],
    "Dice Coefficient": [test_dice]
}
test_results_df = pd.DataFrame(test_results)
test_csv_path = os.path.join(results_folder, "test_metrics.csv")
test_results_df.to_csv(test_csv_path, index=False)
print(f"Test metrics saved to {test_csv_path}")

# Plot training curves and save training history if available.
# 'history' is assumed to be a variable obtained from model.fit() during training.
if 'history' in globals():
    # Plot training and validation loss
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    loss_plot_path = os.path.join(results_folder, "loss_plot.png")
    plt.savefig(loss_plot_path)
    plt.close()
    print(f"Loss plot saved to {loss_plot_path}")

    # Plot training and validation accuracy
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    accuracy_plot_path = os.path.join(results_folder, "accuracy_plot.png")
    plt.savefig(accuracy_plot_path)
    plt.close()
    print(f"Accuracy plot saved to {accuracy_plot_path}")

    # Plot training and validation IoU
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['mean_iou'], label='Training IoU')
    plt.plot(history.history['val_mean_iou'], label='Validation IoU')
    plt.title('Training and Validation IoU')
    plt.xlabel('Epoch')
    plt.ylabel('IoU')
    plt.legend()
    iou_plot_path = os.path.join(results_folder, "iou_plot.png")
    plt.savefig(iou_plot_path)
    plt.close()
    print(f"IoU plot saved to {iou_plot_path}")

    # Plot training and validation Dice Coefficient
    plt.figure(figsize=(10, 6))
    plt.plot(history.history['dice_coefficient'], label='Training Dice Coefficient')
    plt.plot(history.history['val_dice_coefficient'], label='Validation Dice Coefficient')
    plt.title('Training and Validation Dice Coefficient')
    plt.xlabel('Epoch')
    plt.ylabel('Dice Coefficient')
    plt.legend()
    dice_plot_path = os.path.join(results_folder, "dice_plot.png")
    plt.savefig(dice_plot_path)
    plt.close()
    print(f"Dice Coefficient plot saved to {dice_plot_path}")

    # Save the training history to a CSV file
    history_df = pd.DataFrame(history.history)
    history_csv_path = os.path.join(results_folder, "training_history.csv")
    history_df.to_csv(history_csv_path, index=False)
    print(f"Training history saved to {history_csv_path}")
else:
    print("Training history is not available as 'history' variable.")











{'loss': 0.3214944005012512, 'accuracy': 0.8647025227546692, 'mean_iou': 0.6367175579071045, 'dice_coefficient': 0.864684522151947}
Test metrics saved to task_2c/baseline\Clip_model_results\test_metrics.csv
Loss plot saved to task_2c/baseline\Clip_model_results\loss_plot.png
Accuracy plot saved to task_2c/baseline\Clip_model_results\accuracy_plot.png
IoU plot saved to task_2c/baseline\Clip_model_results\iou_plot.png
Dice Coefficient plot saved to task_2c/baseline\Clip_model_results\dice_plot.png
Training history saved to task_2c/baseline\Clip_model_results\training_history.csv
