# Image Segmentation Model
## Testing out the MobileNet model

In [None]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.layers import Conv2D, Activation, BatchNormalization, Flatten
from keras.layers import UpSampling2D, Input, Concatenate, MaxPooling2D, SeparableConv2D, Resizing, Dropout, Conv2DTranspose,DepthwiseConv2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
from tensorflow.keras import layers, models
from keras.models import Model
from keras.applications import MobileNetV2
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from keras.metrics import Recall, Precision
from keras import backend as K
from tensorflow.keras.layers import Lambda
from sklearn.model_selection import train_test_split
from keras.models import load_model
print(tf.__version__)

Check for GPU and set up GPU

In [None]:
# Query the visible GPUs once and report how many TensorFlow can see.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available", len(gpu_devices))

In [None]:
# Disabled cell: the GPU memory-growth setup below is wrapped in a string
# literal, so none of it is executed.
"""gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Restrict TensorFlow to only allocate a subset of the available memory on each GPU
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)"""

Data Preprocessing

In [None]:
# Seed the NumPy and TensorFlow random number generators with a fixed value.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
IMAGE_SIZE = 224  # side length (pixels) that read_image/read_mask resize to
EPOCHS = 80  # NOTE(review): the model.fit calls in this file pass epochs=100 directly instead of this constant
BATCH = 8  # batch size for the datasets and the steps-per-epoch computation
LR = 1e-4  # learning rate passed to the Nadam optimizer
PATH = "final_data"  # root directory handed to load_data_with_matching_pairs
# PATH = "all_data-20240807T114638Z-001/all_data"
# PATH = "25 Landmarks-20240609T154533Z-001/25 Landmarks"

In [None]:
class CustomActivation(tf.keras.layers.Layer):
    """Activation layer with fixed (non-trainable) parameters, chosen by name.

    Supported ``fun_name`` values, with ``x = relu(inputs)``:

    * ``None``            -> ``x``
    * ``"activation_a"``  -> ``1 - exp(-a * x)``
    * ``"activation_b"``  -> ``igamma(b, a * x)``
    * ``"activation_c"``  -> ``1 / (1 + (inputs / a) ** (-b))`` (uses the raw inputs)
    * ``"activation_d"``  -> ``(2 / pi) * atan(exp(inputs * pi / 2))`` (uses the raw inputs)

    Args:
        a: Scalar parameter used by activations a, b and c.
        b: Scalar parameter used by activations b and c.
        fun_name: Name of the activation to apply, or None for plain ReLU.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.

    Raises:
        ValueError: If ``fun_name`` is not one of the supported values.
    """

    _SUPPORTED_NAMES = (None, "activation_a", "activation_b", "activation_c", "activation_d")

    def __init__(self, a=1.0, b=2.0,  fun_name = None, **kwargs):
        super(CustomActivation, self).__init__(**kwargs)
        # Fail fast: previously an unknown name only surfaced at call time as
        # an UnboundLocalError on ``outputs``.
        if fun_name not in self._SUPPORTED_NAMES:
            raise ValueError(
                f"Unsupported fun_name {fun_name!r}; expected one of {self._SUPPORTED_NAMES}"
            )
        self.a = a
        self.b = b
        self.fun_name = fun_name

    def call(self, inputs):
        """Apply the configured activation to ``inputs``."""
        # tf.nn.relu computes the same values as an Activation('relu') layer
        # without creating a new layer object on every call.
        x = tf.nn.relu(inputs)
        if self.fun_name is None:
            return x
        if self.fun_name == "activation_a":
            return 1 - tf.math.exp(-self.a * x)
        if self.fun_name == "activation_b":
            return tf.math.igamma(self.b, self.a * x)
        if self.fun_name == "activation_c":
            return 1 / (1 + (inputs / self.a) ** (-self.b))
        if self.fun_name == "activation_d":
            half_pi = tf.constant(np.pi / 2, dtype=tf.float32)
            return (2 / tf.constant(np.pi, dtype=tf.float32)) * tf.atan(tf.exp(inputs * half_pi))
        # Only reachable if fun_name was reassigned after construction.
        raise ValueError(f"Unsupported fun_name {self.fun_name!r}")

    def get_config(self):
        """Return the layer config, including the constructor arguments."""
        config = super(CustomActivation, self).get_config()
        config.update({
            'a': self.a,
            'b': self.b,
            'fun_name': self.fun_name,
        })
        return config


class CustomBatchScaling(tf.keras.layers.Layer):
    """Activation layer with per-channel weights ``a`` (and ``b``).

    With ``x = relu(inputs)`` the layer computes:

    * ``None``            -> ``x``
    * ``"activation_a"``  -> ``1 - exp(-a * x)``
    * ``"activation_b"``  -> ``igamma(b, a * x)``
    * ``"activation_c"``  -> ``1 / (1 + (inputs / a) ** (-b))`` (uses the raw inputs)

    ``a`` and ``b`` have one entry per element of the last input axis and are
    initialised to ones.

    Args:
        fun_name: Name of the activation to apply, or None for plain ReLU.
        trainable: Whether the ``a``/``b`` weights are trainable.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``,
            ``dtype``); required so ``from_config(get_config())`` works.

    Raises:
        ValueError: If ``fun_name`` is not one of the supported values.
    """

    _SUPPORTED_NAMES = (None, "activation_a", "activation_b", "activation_c")

    def __init__(self,fun_name = None, trainable=True, **kwargs):
        super(CustomBatchScaling, self).__init__(trainable=trainable, **kwargs)
        # Fail fast: previously an unsupported name only surfaced at call time
        # as an UnboundLocalError on ``outputs``.
        if fun_name not in self._SUPPORTED_NAMES:
            raise ValueError(
                f"Unsupported fun_name {fun_name!r}; expected one of {self._SUPPORTED_NAMES}"
            )
        self.fun_name = fun_name

    def build(self, input_shape):
        """Create the per-channel weights for the configured activation."""
        channels = input_shape[-1]
        self.a = self.add_weight(name='a', shape=(channels,),
                                 initializer='ones', trainable=self.trainable)
        # Every mode except activation_a also creates ``b``. This includes
        # fun_name=None, kept so the weight layout of existing models is unchanged.
        if self.fun_name != "activation_a":
            self.b = self.add_weight(name='b', shape=(channels,),
                                     initializer='ones', trainable=self.trainable)

    def call(self, inputs):
        """Apply the configured activation to ``inputs``."""
        x = tf.nn.relu(inputs)
        if self.fun_name is None:
            return x
        if self.fun_name == "activation_a":
            return 1 - tf.math.exp(-self.a * x)
        if self.fun_name == "activation_b":
            return tf.math.igamma(self.b, self.a * x)
        if self.fun_name == "activation_c":
            return 1 / (1 + (inputs / self.a) ** (-self.b))
        # Only reachable if fun_name was reassigned after construction.
        raise ValueError(f"Unsupported fun_name {self.fun_name!r}")

    def get_config(self):
        """Return the layer config, including ``fun_name`` and ``trainable``."""
        config = super(CustomBatchScaling, self).get_config()
        config.update({
            'fun_name': self.fun_name,
            'trainable': self.trainable
        })
        return config

## Load the data (images as x_train and masks as y_train)

In [None]:
def load_data_with_matching_pairs(path, num_masks=3):
    """Pair each input image with its mask files, in a reproducible order.

    Images are looked up in ``<path>/data_images`` and masks in
    ``<path>/2D Team 1/data_heatmaps_shifted30``. A mask belongs to an image
    when it is named ``<image basename>_<i><ext>`` for ``i`` in
    ``range(num_masks)``; for both images and masks the extensions ``.png``,
    ``.jpg``, ``.jpeg`` and ``.JPG`` are tried in that order.

    Basenames are processed in sorted order. Iterating the raw set made the
    sample order depend on string hashing, so it could change between runs
    and defeat the fixed ``random_state`` used for the later splits.

    Args:
        path: Root directory of the dataset.
        num_masks: Number of mask indices to look for per image.

    Returns:
        ``(images, mask_sets)``: a list of image paths and a parallel list in
        which each entry is the list of mask paths found for that image.
    """
    extensions = ('.png', '.jpg', '.jpeg', '.JPG')
    input_dir = os.path.join(path, "data_images")
    target_dir = os.path.join(path, "2D Team 1/data_heatmaps_shifted30")

    input_basenames = {os.path.splitext(fname)[0] for fname in os.listdir(input_dir)}
    # A mask "a_b_2.png" belongs to the basename "a_b": drop the last "_" field.
    target_basenames = {"_".join(fname.split('_')[:-1]) for fname in os.listdir(target_dir)}

    images = []
    mask_sets = []
    for fname in sorted(input_basenames & target_basenames):
        image_path = _first_existing_path(input_dir, fname, extensions)
        if image_path is None:
            print(f"Warning: No image found for {fname} in .png, .jpg, .jpeg, or .JPG format")
            continue

        mask_files = []
        for i in range(num_masks):
            mask_file = _first_existing_path(target_dir, f"{fname}_{i}", extensions)
            if mask_file is not None:
                mask_files.append(mask_file)
        if len(mask_files) != num_masks:
            # Kept in the result (as before), but flagged: a short mask set
            # yields a differently shaped array when the masks are stacked.
            print(f"Warning: expected {num_masks} masks for {fname}, found {len(mask_files)}")

        images.append(image_path)
        mask_sets.append(mask_files)

    print(f"Found {len(images)} images and {len(mask_sets)} mask sets")
    return images, mask_sets


def _first_existing_path(directory, stem, extensions):
    """Return the first existing ``directory/stem+ext`` path, or None."""
    for ext in extensions:
        candidate = os.path.join(directory, f"{stem}{ext}")
        if os.path.exists(candidate):
            return candidate
    return None

# Collect the matched image paths and, per image, its list of mask paths under PATH.
image_path, mask_path = load_data_with_matching_pairs(PATH)

In [None]:
def create_augmented_generator(X_train, y_train, batch, seed):
    """Endlessly yield ``(image_batch, mask_batch)`` pairs with random augmentation.

    Two ``ImageDataGenerator`` objects with identical settings are created,
    one for the images and one for the masks. Both receive the same ``seed``,
    presumably so that each image and its mask get the same random transform.

    Args:
        X_train: Array of input images.
        y_train: Array of masks, aligned with ``X_train``.
        batch: Batch size for both generators.
        seed: Seed passed to ``fit`` and ``flow`` of both generators.

    Yields:
        Tuples ``(img, mask)`` of augmented batches.
    """
    # One settings dict shared by both generators so they cannot drift apart.
    augmentation_settings = dict(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0,
        channel_shift_range=0,
        horizontal_flip=True,
        vertical_flip=False,
        fill_mode='constant',
        cval=0,
    )
    image_datagen = ImageDataGenerator(**augmentation_settings)
    mask_datagen = ImageDataGenerator(**augmentation_settings)

    # Fit both generators on their data, as the original cell did.
    image_datagen.fit(X_train, augment=True, seed=seed)
    mask_datagen.fit(y_train, augment=True, seed=seed)

    # Pair the two batch streams element by element.
    image_batches = image_datagen.flow(X_train, batch_size=batch, seed=seed)
    mask_batches = mask_datagen.flow(y_train, batch_size=batch, seed=seed)
    paired_batches = zip(image_batches, mask_batches)

    while True:
        for pair in paired_batches:
            img, mask = pair
            yield (img, mask)


In [None]:
def read_image(path_or_array):
    """Load an RGB image scaled to [0, 1], or pass an array through unchanged.

    Args:
        path_or_array: A file path (``str`` or UTF-8 ``bytes``) or an
            already-loaded array.

    Returns:
        For a path: the image read in colour, converted from BGR to RGB,
        resized to ``(IMAGE_SIZE, IMAGE_SIZE)`` and divided by 255.
        For any other input: the input object itself.

    Raises:
        OSError: If the path cannot be read as an image.
    """
    if not isinstance(path_or_array, (str, bytes)):
        # Already an array: use it directly.
        return path_or_array

    path = path_or_array.decode('utf-8') if isinstance(path_or_array, bytes) else path_or_array
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    if x is None:
        # cv2.imread signals failure by returning None rather than raising;
        # without this check the failure surfaces later as an opaque cv2 error.
        raise OSError(f"Could not read image file (missing or not decodable): {path}")
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = cv2.resize(x, (IMAGE_SIZE, IMAGE_SIZE))
    return x / 255.0  # normalize to [0, 1]

def read_mask(path_or_array):
    """Load a single-channel mask scaled to [0, 1], or pass an array through unchanged.

    Args:
        path_or_array: A file path (``str`` or UTF-8 ``bytes``) or an
            already-loaded array.

    Returns:
        For a path: the mask read as grayscale, resized to
        ``(IMAGE_SIZE, IMAGE_SIZE)``, divided by 255 and given a trailing
        channel axis. For any other input: the input object itself.

    Raises:
        OSError: If the path cannot be read as an image.
    """
    if not isinstance(path_or_array, (str, bytes)):
        # Already an array: use it directly.
        return path_or_array

    path = path_or_array.decode('utf-8') if isinstance(path_or_array, bytes) else path_or_array
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if x is None:
        # cv2.imread signals failure by returning None rather than raising;
        # without this check the failure surfaces later as an opaque cv2 error.
        raise OSError(f"Could not read mask file (missing or not decodable): {path}")
    x = cv2.resize(x, (IMAGE_SIZE, IMAGE_SIZE))
    x = x / 255.0  # normalize to [0, 1]
    return np.expand_dims(x, axis=-1)  # add a single channel axis

In [None]:
# Convert paths to images and masks
import concurrent.futures

def paths_to_images_and_masks(image_paths, mask_paths):
    """Load all images and their mask sets into two NumPy arrays.

    Args:
        image_paths: Iterable of image paths (or arrays) for ``read_image``.
        mask_paths: Iterable of per-image lists of mask paths (or arrays)
            for ``read_mask``.

    Returns:
        ``(images, masks)`` as NumPy arrays; every mask set is combined into
        one array whose last axis indexes the masks of that set.
    """
    combined_mask_sets = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        loaded_images = list(pool.map(read_image, image_paths))

        for paths_for_one_image in mask_paths:
            channels = list(pool.map(read_mask, paths_for_one_image))
            # read_mask gives each path-loaded mask a trailing channel axis.
            # Stacking on a new last axis and dropping the second-to-last
            # axis leaves one channel per mask.
            stacked = np.stack(channels, axis=-1)
            combined_mask_sets.append(stacked.squeeze(-2))

    return np.array(loaded_images), np.array(combined_mask_sets)

# Materialize the whole dataset in memory from the matched paths.
image_data, mask_data = paths_to_images_and_masks(image_path, mask_path)

In [None]:
def augment_data(X_train, y_train, num_augmented=5):
    """Append ``num_augmented`` randomly transformed copies of every sample.

    Args:
        X_train: Array of images, first axis = samples.
        y_train: Array of masks aligned with ``X_train``.
        num_augmented: Number of augmented versions generated per sample.

    Returns:
        ``(x_train_augmented, y_train_augmented)``: the originals followed by
        all augmented samples, stacked along the first axis.
    """
    datagen = ImageDataGenerator(
        rotation_range=20,
        width_shift_range=0.1,
        height_shift_range=0,
        shear_range=0.1,
        zoom_range=0,
        horizontal_flip=True,
        vertical_flip=False,
        fill_mode='constant',
        cval=0
    )

    # Output buffers sized up front: num_augmented slots per original sample.
    extra_images = np.empty((num_augmented * len(X_train), *X_train.shape[1:]))
    extra_masks = np.empty((num_augmented * len(y_train), *y_train.shape[1:]))

    for sample_idx in range(len(X_train)):
        # flow() expects a batch axis, so wrap each sample in a batch of one.
        single_image = np.expand_dims(X_train[sample_idx], axis=0)
        single_mask = np.expand_dims(y_train[sample_idx], axis=0)

        # Image and mask iterators get the same seed.
        image_it = datagen.flow(single_image, batch_size=1, seed=42)
        mask_it = datagen.flow(single_mask, batch_size=1, seed=42)

        first_slot = sample_idx * num_augmented
        for offset in range(num_augmented):
            image_batch = next(image_it)
            mask_batch = next(mask_it)
            extra_images[first_slot + offset] = image_batch[0]
            extra_masks[first_slot + offset] = mask_batch[0]

    # Originals first, augmented samples after them.
    x_train_augmented = np.vstack([X_train, extra_images])
    y_train_augmented = np.vstack([y_train, extra_masks])

    print(f"Original number of images: {len(X_train)}, Augmented combined number of images: {len(x_train_augmented)}")
    print(f"Original number of masks: {len(y_train)}, Augmented combined number of masks: {len(y_train_augmented)}")

    return x_train_augmented, y_train_augmented

# Example usage. With num_augmented=0 no augmented samples are generated;
# the call only returns stacked copies of the original arrays.
image_data, mask_data = augment_data(image_data, mask_data, num_augmented=0)
# Report the mask count from mask_data (it previously printed len(image_data) twice).
print(f"Final number of images: {len(image_data)}, Final number of masks: {len(mask_data)}")

In [None]:
# Split data for train, validation and test:
# first hold out 5% as the test set, then 20% of the remainder as validation.
train_x, test_x, train_y, test_y = train_test_split(image_data, mask_data, test_size=0.05, random_state=42)
train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, test_size=0.2, random_state=42)
print(f"Train: {len(train_x)}, Validation: {len(valid_x)}, Test: {len(test_x)}")

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import RandomRotation

class CustomRotationLayer(tf.keras.layers.Layer):
    """Randomly rotate inputs during training; pass them through otherwise.

    Args:
        max_angle: Maximum rotation magnitude in degrees.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, max_angle, **kwargs):
        super(CustomRotationLayer, self).__init__(**kwargs)
        # Stored in degrees, exactly as passed in and as returned by get_config.
        self.max_angle = max_angle

    def build(self, input_shape):
        self.input_spec = tf.keras.layers.InputSpec(shape=input_shape)
        # RandomRotation takes its factor as a fraction of a full turn (2*pi),
        # so degrees are divided by 360. The previous division by 180 rotated
        # by up to twice the requested angle.
        self.rotation_layer = RandomRotation(self.max_angle / 360.0, fill_mode='reflect')

    def call(self, inputs, training=None):
        """Return rotated inputs when ``training`` is truthy, else ``inputs``."""
        if training:
            # Forward the flag explicitly so the inner layer is active
            # regardless of how the call context propagates it.
            return self.rotation_layer(inputs, training=training)
        return inputs

    def compute_output_shape(self, input_shape):
        """Rotation does not change the shape."""
        return input_shape

    def get_config(self):
        """Return the layer config with ``max_angle`` in degrees."""
        config = super(CustomRotationLayer, self).get_config()
        config.update({'max_angle': self.max_angle})
        return config

In [None]:
def dice_loss(y_true, y_pred):
    """Channel-averaged Dice-style loss using a square-root overlap term.

    For every channel (last axis) the score is

        (2 * sum(sqrt(y_true) * sqrt(y_pred)) + smooth)
        / (sum(y_true) + sum(y_pred) + smooth)

    with sums taken over all other axes, and the loss is the mean of
    ``1 - score`` over the channels.

    Changes from the previous version:

    * The channel count comes from ``y_pred`` instead of being fixed at 3.
    * ``y_pred`` is clamped to a small positive floor before the square root.
      ``sqrt`` has an infinite derivative at 0, which multiplied by a zero
      target produces NaN gradients.

    Args:
        y_true: Ground-truth tensor, channels last.
        y_pred: Prediction tensor with the same shape as ``y_true``.

    Returns:
        Scalar float32 tensor.
    """
    smooth = 1e-15

    y_true = tf.cast(y_true, tf.float32)  # Ensure y_true is float32
    y_pred = tf.cast(y_pred, tf.float32)  # Ensure y_pred is float32

    # Flatten everything except the channel axis: rows = pixels, cols = channels.
    num_channels = tf.shape(y_pred)[-1]
    true_flat = tf.reshape(y_true, [-1, num_channels])
    pred_flat = tf.reshape(y_pred, [-1, num_channels])

    # Floor only the prediction: y_true is a constant w.r.t. the gradient.
    safe_pred = tf.maximum(pred_flat, tf.keras.backend.epsilon())
    intersection = tf.reduce_sum(tf.math.sqrt(true_flat) * tf.math.sqrt(safe_pred), axis=0)
    denominator = tf.reduce_sum(true_flat, axis=0) + tf.reduce_sum(pred_flat, axis=0)

    dice_per_channel = (2. * intersection + smooth) / (denominator + smooth)
    return tf.reduce_mean(1 - dice_per_channel)

In [None]:
def create_dataset(train_x, train_y, valid_x, valid_y, batch):
    """Wrap the train/validation arrays in repeating, batched ``tf.data`` pipelines.

    Args:
        train_x, train_y: Training images and masks.
        valid_x, valid_y: Validation images and masks.
        batch: Batch size.

    Returns:
        ``(train_dataset, valid_dataset)``. The training dataset is shuffled
        with a buffer covering all samples; both repeat indefinitely.
    """
    # All four arrays are viewed as (N, IMAGE_SIZE, IMAGE_SIZE, 3).
    target_shape = (-1, IMAGE_SIZE, IMAGE_SIZE, 3)
    train_x_ = train_x.reshape(target_shape)
    train_y_ = train_y.reshape(target_shape)
    valid_x_ = valid_x.reshape(target_shape)
    valid_y_ = valid_y.reshape(target_shape)

    autotune = tf.data.experimental.AUTOTUNE

    train_dataset = (
        tf.data.Dataset.from_tensor_slices((train_x_, train_y_))
        .shuffle(len(train_x_))
        .batch(batch)
        .repeat()
        .prefetch(autotune)
    )

    valid_dataset = (
        tf.data.Dataset.from_tensor_slices((valid_x_, valid_y_))
        .batch(batch)
        .repeat()
        .prefetch(autotune)
    )

    print(f"Dataset Created Successfully\nData Shape: Train: {train_x_.shape}, {train_y_.shape}\nValidation: {valid_x_.shape}, {valid_y_.shape}")

    return train_dataset, valid_dataset

# Build the pipelines used by model.fit below.
train_dataset, valid_dataset = create_dataset(train_x, train_y, valid_x, valid_y, batch=BATCH)

In [None]:
# Name -> object mapping intended for the ``custom_objects`` argument of
# tf.keras.models.load_model, so saved models that reference dice_loss can load.
custom_layers = {
    #"CustomRotationLayer": CustomRotationLayer,
    "dice_loss": dice_loss
}

# Disabled code: the block below is a string literal and is not executed.
# It holds an alternative create_dataset that adds the thresholded predictions
# of a previously saved model to the inputs before building the datasets.
'''prev_model = tf.keras.models.load_model('SavedModels5/model_Feb_12_xyz_3cascade', custom_objects= custom_layers)
prev_model.summary()

def create_dataset(train_x, train_y, valid_x, valid_y, test_x, test_y, batch=BATCH):
    # Ensure the shape of inputs
    train_x = train_x.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3)
    train_y= train_y.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3)
    valid_x = valid_x.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3)
    valid_y = valid_y.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3)

    train_x_mask = (prev_model.predict(train_x) > 0.5)
    valid_x_mask = (prev_model.predict(valid_x) > 0.5)
    test_x_mask = (prev_model.predict(test_x) > 0.5)

    final_train_x = train_x + train_x_mask
    final_valid_x = valid_x + valid_x_mask
    final_test_x = test_x + test_x_mask

    train_dataset = create_augmented_generator(final_train_x, train_y, batch, seed=42)

    valid_dataset = tf.data.Dataset.from_tensor_slices((final_valid_x, valid_y))
    valid_dataset = valid_dataset.batch(batch)
    valid_dataset = valid_dataset.repeat()
    return valid_dataset, train_dataset, final_test_x

valid_dataset, train_dataset, new_test_x = create_dataset(train_x, train_y, valid_x, valid_y, test_x, test_y, batch=BATCH)'''

In [None]:
def display_batch(batch_images, batch_masks):
    """Show every image of a batch next to its mask, one pair per row.

    Args:
        batch_images: Array of images; the first axis is the batch axis.
        batch_masks: Array of masks aligned with ``batch_images``.
    """
    n_rows = batch_images.shape[0]
    plt.figure(figsize=(10, 30))
    for row in range(n_rows):
        # Subplot slots are 1-based: image in the left column, mask in the right.
        left_slot = 2 * row + 1
        right_slot = left_slot + 1

        plt.subplot(n_rows, 2, left_slot)
        plt.imshow(batch_images[row])
        plt.axis('off')

        plt.subplot(n_rows, 2, right_slot)
        plt.imshow(batch_masks[row].squeeze(), cmap='gray')
        plt.axis('off')
    plt.show()

# Get the first batch of images and masks
train_iterator = iter(train_dataset)
batch_images, batch_masks = next(train_iterator)
# Alternative for when train_dataset is a Python generator rather than a tf.data.Dataset:
#batch_images, batch_masks = next(train_dataset)

# Display the batch (tensors are converted to NumPy arrays for matplotlib)
display_batch(batch_images.numpy(), batch_masks.numpy())
#display_batch(batch_images, batch_masks)


Model Compile

In [None]:
def create_model(image_size):
    """Build a U-Net-style segmentation model on a MobileNetV2 encoder.

    The encoder (ImageNet weights, ``alpha=0.5``) is tapped at
    ``block_13_expand_relu``. Four decoder stages each upsample by 2,
    concatenate a skip connection and apply two Conv-BN-ReLU blocks. A final
    1x1 convolution with sigmoid produces 3 output channels.

    Args:
        image_size: Height and width of the square RGB input.

    Returns:
        The uncompiled Keras ``Model``.
    """
    inputs = Input(shape=(image_size, image_size, 3), name="input_image")

    # Pre-trained MobileNetV2 without its classification head.
    encoder = MobileNetV2(input_tensor=inputs, weights="imagenet", include_top=False, alpha=0.5)

    # Every encoder layer stays trainable (set to False here to freeze it).
    for layer in encoder.layers:
        layer.trainable = True

    skip_connection_names = ["input_image", "block_1_expand_relu", "block_3_expand_relu", "block_6_expand_relu"]
    decoder_filters = [16, 32, 64, 128]

    # Decoder: walk the skips from deepest to shallowest, pairing each with
    # its filter count.
    x = encoder.get_layer("block_13_expand_relu").output
    for skip_name, n_filters in zip(reversed(skip_connection_names), reversed(decoder_filters)):
        x_skip = encoder.get_layer(skip_name).output
        x = UpSampling2D((2, 2))(x)
        x = Concatenate()([x, x_skip])

        # Two identical Conv -> BatchNorm -> ReLU blocks per stage.
        for _ in range(2):
            x = Conv2D(n_filters, (3, 3), padding="same")(x)
            x = BatchNormalization()(x)
            x = Activation("relu")(x)

    # Per-pixel, per-channel probabilities.
    x = Conv2D(3, (1, 1), padding="same")(x)
    x = Activation("sigmoid")(x)

    return Model(inputs, x)

In [None]:
from tensorflow.keras.utils import plot_model
# Build and compile the segmentation model; dice_loss is both the loss and a metric.
model = create_model(IMAGE_SIZE)
opt = tf.keras.optimizers.Nadam(LR)
metrics = [dice_loss, Recall(), Precision()]
model.compile(loss=dice_loss, optimizer=opt, metrics=metrics)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()
plot_model(model, to_file='mobilenetv2_architecture.png', show_shapes=True, show_layer_names=True)

In [None]:
# Training callbacks shared by the model.fit calls in this file.
callbacks = [
    # Multiply the learning rate by 0.1 after 5 epochs without val_loss improvement (floor 1e-6).
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6, verbose=1),
    # Stop after 10 epochs without val_loss improvement and restore the best weights.
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    #ModelCheckpoint("./best_31.h5", save_weights_only=False, monitor='val_loss', mode='min')
]

In [None]:
# Steps per epoch = ceil(samples / BATCH), so a final partial batch is counted.
# The datasets repeat forever, which is why explicit step counts are needed.
train_steps = (len(train_x) + BATCH - 1) // BATCH
valid_steps = (len(valid_x) + BATCH - 1) // BATCH

print(f"Train steps: {train_steps}, Valid steps: {valid_steps}")

In [None]:
# Train the base model. Both datasets repeat indefinitely, so the number of
# batches per epoch is fixed through steps_per_epoch / validation_steps.
# NOTE(review): epochs is hard-coded to 100 here while the EPOCHS constant is 80 — confirm which is intended.
history =  model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=100,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks
)

In [None]:
# Save the trained base model in TensorFlow SavedModel format.
# NOTE(review): combined_model is later saved to this same path, which
# overwrites this export — confirm that is intended.
model.save('SavedModels5/model_Mar_18_xyz_3cascade', save_format='tf')
# model.save('SavedModels5/model_Sept_10_xyz', save_format='h5')

Combining trained model and new model

In [None]:
# Objects the loader needs to resolve by name when deserializing the saved model.
custom_layers = {
    #"CustomRotationLayer": CustomRotationLayer,
    "dice_loss": dice_loss
}

# Load the previously saved model that combine_models uses as its "model A".
trained_model = tf.keras.models.load_model('SavedModels5/model_Feb_12_xyz_3cascade', custom_objects=custom_layers)

In [None]:
from tensorflow.keras.utils import plot_model
import matplotlib.pyplot as plt

# Render the loaded model's layer graph (with shapes and layer names) to best_model.png.
plot_model(trained_model, to_file="best_model.png", show_shapes=True, show_layer_names=True)

In [None]:
from tensorflow.keras.layers import Add, Concatenate

def clone_layer(layer):
    """Create an independent copy of ``layer`` named ``new_<original name>``.

    The copy is rebuilt from the layer's config, built for the original
    layer's input shape and given all of the original layer's weights.

    The previous version copied weights only when the layer had exactly one
    or two weight arrays. Layers with more (BatchNormalization has four)
    silently kept their freshly initialised values.

    Args:
        layer: A built Keras layer.

    Returns:
        The new layer object.
    """
    layer_config = layer.get_config()
    layer_config["name"] = f"new_{layer.name}"
    new_layer = layer.__class__.from_config(layer_config)
    # Building creates the weight variables so they can be assigned below.
    new_layer.build(layer.input_shape)

    original_weights = layer.get_weights()
    if original_weights:
        new_layer.set_weights(original_weights)

    return new_layer

def combine_models(model_a, model_b):
    """Build one functional model that feeds a shared input through both models.

    ``model_a``'s own layer objects are re-applied to a new shared input and
    the output of every non-merge layer is recorded. ``model_b``'s layers are
    then copied with ``clone_layer`` and applied in ``model_b.layers`` order.
    Before a cloned layer that is neither a merge layer nor a
    BatchNormalization is applied, the first not-yet-consumed ``model_a``
    layer of the same class is looked up. If that layer's output shape equals
    the ``model_b`` layer's input shape (ignoring the batch axis), its output
    is added to the running tensor.

    Both passes walk ``.layers`` linearly and use the running tensor as the
    first input of every Add/Concatenate layer; the second input is resolved
    by name. NOTE(review): this presumably relies on the layer order and
    layer names of the MobileNetV2-based model from ``create_model`` —
    confirm before using it with other architectures.

    Args:
        model_a: Model whose layers are reused as-is ("best model" below).
        model_b: Model whose layers are cloned ("current model" below).

    Returns:
        A ``Model`` mapping the shared input to the last tensor produced
        while walking ``model_b``.

    Raises:
        ValueError: If the second input of an Add/Concatenate layer cannot be
            found among the tensors recorded so far.
    """
    input_tensor = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3), name="shared_input")

    # Process input through Model A (Best model)
    outputs_a = {}  # layer name -> (layer object, output tensor) for model A's non-merge layers
    current_used_layers = {}  # name -> tensor recorded while rebuilding model B
    best_used_layers = {}  # layer name -> output tensor recorded while re-running model A
    x_a = input_tensor

    for layer in model_a.layers:
        if isinstance(layer, tf.keras.layers.InputLayer):
            continue

        if isinstance(layer, (tf.keras.layers.Add, tf.keras.layers.Concatenate)):
            # The running tensor is used as the first input of the merge.
            x_1 = x_a
            if layer.input[1].name == "input_image":
                x_2 = input_tensor
            else:
                # Tensor names look like "<layer name>/<op>"; keep the layer name.
                real_name1 = layer.input[0].name.split("/")[0]
                real_name2 = layer.input[1].name.split("/")[0]
                x_2 = None
                # Take the first recorded tensor whose layer name matches either input.
                for layer_name, layer_output in best_used_layers.items():
                    if layer_name in {real_name1, real_name2}:
                        x_2 = layer_output
                        break
                if x_2 is None:
                    raise ValueError(f"Layer inputs {real_name1}, {real_name2} not found in best_used_layers.")

            x_a = layer([x_1, x_2])
            best_used_layers[layer.name.split("/")[0]] = x_a
            continue

        # The layer named "Conv1" is fed the shared input directly instead of the running tensor.
        x_a = layer(input_tensor if layer.name == "Conv1" else x_a)
        outputs_a[layer.name] = (layer, x_a)  # Store (layer object, output tensor) pairs
        best_used_layers[layer.name] = x_a

    x_b = input_tensor

    # Process input through Model B (Current Model), inserting Model A's outputs where types match
    for layer in model_b.layers:
        print(f"\nProcessing Layer: {layer.name}")

        if isinstance(layer, tf.keras.layers.InputLayer):
            continue

        new_layer = clone_layer(layer)

        # BatchNormalization clones are applied directly, never merged with model A.
        if isinstance(layer, tf.keras.layers.BatchNormalization):
            x_b = new_layer(x_b)
            print(f"Final Layer Output of {layer.name}: {x_b.shape}\n")
            current_used_layers[layer.name] = x_b
            continue

        if isinstance(layer, (tf.keras.layers.Add, tf.keras.layers.Concatenate)):
            # The running tensor is used as the first input of the merge.
            x_1 = x_b
            print(f"{layer.input[0].name}")
            print(f"{layer.input[1].name}")

            if layer.input[1].name == "input_image":
                x_2 = input_tensor

            # For the three expand_relu skip connections, the tensor stored
            # under the matching "block_N_pad" key is used instead.
            # NOTE(review): these lookups raise KeyError if that key was never recorded.
            elif layer.input[1].name.split("/")[0] == "block_1_expand_relu":
                x_2 = current_used_layers["block_1_pad"]

            elif layer.input[1].name.split("/")[0] == "block_3_expand_relu":
                x_2 = current_used_layers["block_3_pad"]

            elif layer.input[1].name.split("/")[0] == "block_6_expand_relu":
                x_2 = current_used_layers["block_6_pad"]

            else:
                real_name1 = layer.input[0].name.split("/")[0]
                real_name2 = layer.input[1].name.split("/")[0]
                print(f"Real name 2:{real_name2}")
                x_2 = None
                # Take the first recorded tensor whose name matches either input.
                for layer_name, layer_output in current_used_layers.items():
                    if layer_name in {real_name1, real_name2}:
                        x_2 = layer_output
                        break
                if x_2 is None:
                    raise ValueError(f"Layer inputs {real_name1}, {real_name2} not found in current_used_layers.")

            print(f"input 1: {x_1.name}")
            print(f"input 2: {x_2.name}")
            x_b = new_layer([x_1, x_2])
            current_used_layers[layer.name.split("/")[0]] = x_b
            continue

        else:
            # Find the first model-A layer of the same class that has not been consumed yet.
            for layer_a_name, (layer_a, output_a) in outputs_a.items():
                # Skip layers that have already been concatenated
                if layer_a_name in current_used_layers:
                    continue

                if isinstance(layer, type(layer_a)):
                    shape_a = output_a.shape
                    shape_b = layer.get_input_shape_at(0)
                    print(f"Shape a (from Model A): {shape_a}")
                    print(f"Shape b (from Model B input): {shape_b}")

                    # Mark the model-A layer as consumed (whether or not the
                    # shapes match) and remember the running tensor under its name.
                    current_used_layers[layer_a_name] = x_b

                    # Check that all non-batch dimensions match
                    if shape_a[1:] == shape_b[1:]:
                        print(f"Merging current model's {x_b.name} (shape: {shape_b}) with best model's {layer_a.name} (shape: {shape_a})")
                        # Element-wise sum; the layer is an Add despite the "concat_" name prefix.
                        x_b = Add(name=f"concat_{layer.name}")([x_b, output_a])
                    break

        # Apply the cloned layer to the (possibly merged) running tensor.
        x_b = new_layer(x_b)
        print(f"Final Layer Output of {layer.name}: {x_b.shape}\n")

    #x = Activation("sigmoid")(x_b)

    combined_model = Model(inputs=input_tensor, outputs=x_b)
    return combined_model

# Create the combined model
combined_model = combine_models(trained_model, model)

In [None]:
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
combined_model.summary()
plot_model(combined_model, to_file="combined_model.png", show_shapes=True, show_layer_names=True)

In [None]:
# Compile the combined model with a fresh optimizer and fresh metric objects.
opt = tf.keras.optimizers.Nadam(LR)
metrics = [dice_loss, Recall(), Precision()]
combined_model.compile(loss=dice_loss, optimizer=opt, metrics=metrics)

# List which layers of the combined model will be updated during training.
for layer in combined_model.layers:
    print(layer.name, "Trainable:", layer.trainable)

In [None]:
# Train the combined model on the same datasets, step counts and callbacks.
# This rebinds ``history``, replacing the history of the base-model run.
history =  combined_model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=100,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks
)

In [None]:
import matplotlib.pyplot as plt

def plot_model_history(history):
    """Plot training and validation loss per epoch.

    Args:
        history: Object with a ``history`` dict (as returned by ``model.fit``).
            If the dict lacks ``'loss'`` or ``'val_loss'`` a message is
            printed and nothing is plotted.
    """
    recorded = history.history
    if not ('loss' in recorded and 'val_loss' in recorded):
        print("The provided history object does not contain 'loss' and 'val_loss'.")
        return

    plt.figure(figsize=(10, 5))

    # One curve per recorded series: training first, then validation.
    curves = (
        ('loss', 'Training Loss'),
        ('val_loss', 'Validation Loss'),
    )
    for key, label in curves:
        plt.plot(recorded[key], label=label)

    plt.title('Model Loss Over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.show()


# Plot the loss curves of the most recent fit (``history`` was last assigned by combined_model.fit).
plot_model_history(history)


In [None]:
# Save the combined model in TensorFlow SavedModel format.
# NOTE(review): this is the same path the base model was saved to earlier in
# this file, so that export is overwritten — confirm that is intended.
combined_model.save('SavedModels5/model_Mar_18_xyz_3cascade', save_format='tf')
# combined_model.save('SavedModels5/model_Sept_10_xyz', save_format='h5')
















































































## Displaying the Result

In [None]:
# Objects the loader needs to resolve by name when deserializing the saved models.
custom_layers = {
    #"CustomRotationLayer": CustomRotationLayer,
    "dice_loss": dice_loss
}
# Reload both saved models from disk for the result visualizations below:
# the earlier "Feb_12" model and the "Mar_18" export written above.
load_trained_model = tf.keras.models.load_model('SavedModels5/model_Feb_12_xyz_3cascade', custom_objects= custom_layers)
load_combined_model = tf.keras.models.load_model('SavedModels5/model_Mar_18_xyz_3cascade', custom_objects= custom_layers)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def mask_parse(mask, channel):
    """Return channel ``channel`` of ``mask`` replicated into three channels.

    The selected channel is squeezed, copied three times along a new leading
    axis and that axis is moved to the end, so a 2-D plane becomes
    ``(rows, cols, 3)``.
    """
    plane = np.squeeze(mask[..., channel])
    tripled = np.array([plane, plane, plane])
    return tripled.transpose((1, 2, 0))

num_pred = 50

# (x position, title) for each panel of the concatenated strip.
# NOTE(review): channel 1 is titled "XZ" and channel 2 "YZ" here, while an
# earlier comment in this cell described them as yz and xz respectively —
# confirm the channel-to-plane mapping.
panel_titles = [
    (70, "Raw Image"),
    (300, "Modified Image"),
    (560, "GT XY"),
    (775, "Predicted XY"),
    (1025, "GT XZ"),
    (1240, "Predicted XZ"),
    (1490, "GT YZ"),
    (1700, "Predicted YZ"),
]

for i, (x, y) in enumerate(zip(test_x[:num_pred], test_y[:num_pred])):
    # read_image/read_mask return non-path inputs unchanged, so for array
    # inputs x and test_x[i] are the same image.
    x = read_image(x)
    y = read_mask(y)

    # Threshold the previously trained model's prediction at 0.5.
    y_pred = load_trained_model.predict(np.expand_dims(test_x[i], axis=0))[0] > 0.5

    h, w, _ = x.shape
    white_line = np.ones((h, 10, 3))  # 10-pixel white separator between panels

    # Panel order: input, input again, then (ground truth, prediction) per channel.
    panels = [x, test_x[i]]
    for channel in range(3):
        panels.append(mask_parse(y, channel))
        panels.append(mask_parse(y_pred, channel))

    # Interleave separators between panels (none after the last one).
    all_images = []
    for panel in panels:
        all_images.append(panel)
        all_images.append(white_line)
    all_images.pop()

    # Concatenate all the images horizontally
    image = np.concatenate(all_images, axis=1)

    fig = plt.figure(figsize=(25, 12))
    a = fig.add_subplot(1, 1, 1)

    for x_pos, title in panel_titles:
        a.text(x_pos, -5, title)

    imgplot = plt.imshow(image)
    plt.show()

In [None]:
import numpy as np

# Predict with the in-memory combined model (the one that was saved)
y_pred_before = combined_model.predict(np.expand_dims(test_x[0], axis=0))[0]

# Predict with the copy reloaded from disk
y_pred_after = load_combined_model.predict(np.expand_dims(test_x[0], axis=0))[0]

# Compare outputs. np.array_equal demands exact element-wise equality, so any
# floating-point difference at all is reported as "not the same".
print("Are the predictions the same?", np.array_equal(y_pred_before, y_pred_after))
print(f"Min-Max before: {y_pred_before.min()} - {y_pred_before.max()}")
print(f"Min-Max after: {y_pred_after.min()} - {y_pred_after.max()}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

def mask_parse(mask, channel):
    """Turn one channel of ``mask`` into a 3-channel image for display.

    The chosen channel is squeezed, stacked three times along a new first
    axis and the axes are reordered so the copies become the last axis.
    """
    single = np.squeeze(mask[..., channel])
    copies = np.stack((single, single, single))
    return np.transpose(copies, (1, 2, 0))

num_pred = 50

# Titles drawn above the strip, keyed by their x position in pixels.
# NOTE(review): channel 1 is titled "XZ" and channel 2 "YZ" here, while an
# earlier comment in this cell described them as yz and xz respectively —
# confirm the channel-to-plane mapping.
title_positions = {
    70: "Raw Image",
    300: "Modified Image",
    560: "GT XY",
    775: "Predicted XY",
    1025: "GT XZ",
    1240: "Predicted XZ",
    1490: "GT YZ",
    1700: "Predicted YZ",
}

for i, (x, y) in enumerate(zip(test_x[:num_pred], test_y[:num_pred])):
    # read_image/read_mask return non-path inputs unchanged, so for array
    # inputs x and test_x[i] are the same image.
    x = read_image(x)
    y = read_mask(y)

    # Threshold the reloaded combined model's prediction at 0.5.
    single_input = np.expand_dims(test_x[i], axis=0)
    y_pred = load_combined_model.predict(single_input)[0] > 0.5

    h, w, _ = x.shape
    white_line = np.ones((h, 10, 3))  # 10-pixel white separator between panels

    # Input twice, then ground truth followed by prediction for each channel,
    # with a separator after every panel except the last.
    all_images = [x, white_line, test_x[i]]
    for channel in (0, 1, 2):
        all_images += [white_line, mask_parse(y, channel)]
        all_images += [white_line, mask_parse(y_pred, channel)]

    # Concatenate all the images horizontally
    image = np.concatenate(all_images, axis=1)

    fig = plt.figure(figsize=(25, 12))
    a = fig.add_subplot(1, 1, 1)

    for x_pos, title in title_positions.items():
        a.text(x_pos, -5, title)

    imgplot = plt.imshow(image)
    plt.show()

In [None]:
# Fixed: this line used to be `import tensorflow as np`, which rebound ``np``
# to TensorFlow and broke every later use of NumPy through that name.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# Get a single batch from the dataset generator
x_batch, y_batch = next(iter(train_dataset))  # Fetch one batch

# Collect the symbolic output tensor(s) of every layer in the loaded model.
all_layers = []
layer_dict = {}  # display name -> output tensor, in layer order

for layer in load_combined_model.layers:
    try:
        if isinstance(layer.output, list):  # If a layer has multiple outputs
            for i, out in enumerate(layer.output):
                layer_dict[f"{layer.name}_{i}"] = out
                all_layers.append(out)
        else:
            layer_dict[layer.name] = layer.output
            all_layers.append(layer.output)
    except Exception as e:
        # Best effort: report layers whose output cannot be accessed and carry on.
        print(f"Skipping {layer.name} due to error: {e}")

# Debug: Print all detected layers
print("\nCaptured Layer Outputs:")
for i, (layer_name, output) in enumerate(layer_dict.items()):
    print(f"{i+1}: {layer_name} -> Output Shape: {output.shape}")

# Model that returns every captured tensor for a given input
visualization_model = tf.keras.models.Model(inputs=load_combined_model.input, outputs=list(layer_dict.values()))

# Run the batch through the visualization model
layer_outputs = visualization_model.predict(x_batch)

# Plot all layer outputs on a grid with 5 columns (rows = ceil(num_layers / cols))
num_layers = len(layer_outputs)
cols = 5
rows = (num_layers // cols) + (1 if num_layers % cols != 0 else 0)

plt.figure(figsize=(25, 5 * rows))

for i, (layer_name, output) in enumerate(zip(layer_dict.keys(), layer_outputs)):
    plt.subplot(rows, cols, i + 1)
    plt.title(f"{layer_name}\nShape: {output.shape}", fontsize=10)

    if len(output.shape) == 4:  # Feature maps
        # Show only channel 0 of the first sample in the batch.
        feature_map = output[0, :, :, 0]
        plt.imshow(feature_map, cmap='viridis')
        plt.axis("off")
    else:
        # Not a 4-D output: show its shape as text instead.
        plt.text(0.5, 0.5, f"Output Shape: {output.shape}", ha='center', fontsize=8)

plt.tight_layout()
plt.show()