In [None]:
from IPython import get_ipython
from IPython.display import display
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import (
    BatchNormalization, ReLU, Dropout, Concatenate, Conv2D, DepthwiseConv2D, GlobalAveragePooling2D,
    Reshape, Dense, Multiply, Add, Activation, UpSampling2D
)
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model
from tensorflow.keras.applications import Xception, MobileNetV3Small, EfficientNetV2S
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint
import pandas as pd
import gc

def load_dataset(image_dir, mask_dir, size=512, max_images=None):
    """Load image/mask pairs from two flat directories into NumPy arrays.

    Files are paired positionally after sorting both directory listings by
    file name, so the i-th image (alphabetically) is matched with the i-th
    mask. ``os.listdir`` returns entries in arbitrary order, so without the
    sort the two listings could be zipped into mismatched pairs.

    Args:
        image_dir: Directory containing the input images (sub-directories are ignored).
        mask_dir: Directory containing the masks (sub-directories are ignored).
        size: Every image and mask is resized to ``(size, size)``.
        max_images: Optional cap on the number of pairs to load. A falsy value
            (``None`` or ``0``) means "no limit".

    Returns:
        A tuple ``(images, masks)``. ``images`` holds RGB data scaled by 1/255
        with shape ``(N, size, size, 3)``; ``masks`` holds single-channel data
        scaled by 1/255 with shape ``(N, size, size, 1)``. When nothing could
        be loaded the shapes are ``(0,)`` and ``(0, 1)``.

    Notes:
        Pairs that fail to load are reported on stdout and skipped, so ``N``
        may be smaller than the number of files found.
    """
    def _sorted_files(directory):
        # Sorting makes the listing deterministic and keeps images and masks aligned.
        return sorted(
            name for name in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, name))
        )

    image_files = _sorted_files(image_dir)
    mask_files = _sorted_files(mask_dir)

    # Only keep as many entries as both directories can provide.
    num_files = min(len(image_files), len(mask_files))
    image_files = image_files[:num_files]
    mask_files = mask_files[:num_files]

    # Limit the number of pairs loaded if max_images is specified.
    if max_images:
        image_files = image_files[:max_images]
        mask_files = mask_files[:max_images]

    image_dataset, mask_dataset = [], []
    for img_file, mask_file in zip(image_files, mask_files):
        img_path = os.path.join(image_dir, img_file)
        mask_path = os.path.join(mask_dir, mask_file)

        try:
            image = Image.open(img_path).convert('RGB').resize((size, size))
            mask = Image.open(mask_path).convert('L').resize((size, size))

            # Convert both before appending so a failure never leaves the
            # two lists with different lengths.
            image_array = np.array(image) / 255.0
            mask_array = np.array(mask) / 255.0
        except Exception as e:
            # Best-effort loading: report the broken pair and carry on.
            print(f"Error loading {img_path} or {mask_path}: {e}")
            continue

        image_dataset.append(image_array)
        mask_dataset.append(mask_array)

    return np.array(image_dataset), np.expand_dims(np.array(mask_dataset), axis=-1)

def split_dataset(image_dataset, mask_dataset, val_size=0.1, test_size=0.2, random_state=0):
    """Partition images and masks into train, validation and test subsets.

    Args:
        image_dataset: Images, indexed along the first axis.
        mask_dataset: Masks aligned one-to-one with ``image_dataset``.
        val_size: Validation share, expressed relative to the full dataset.
        test_size: Test share, passed straight to ``train_test_split``.
        random_state: Seed reused for both splits.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    # Stage 1: carve the test portion off the full dataset.
    images_rest, X_test, masks_rest, y_test = train_test_split(
        image_dataset, mask_dataset, test_size=test_size, random_state=random_state
    )

    # Stage 2: val_size refers to the full dataset, so rescale it to the
    # fraction of what is left after removing the test portion.
    val_fraction = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        images_rest, masks_rest, test_size=val_fraction, random_state=random_state
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

## Layers

def DenseDDSSPPLayer(inputs, growth_rate, dilation_rate, dropout_rate=0.0, efficient=True):
    """Build one dilated branch of the DenseDDSSPP module.

    Pipeline: BN -> ReLU -> 1x1 conv (bottleneck to ``growth_rate`` channels)
    -> dilated 3x3 stage -> BN -> ReLU -> optional dropout.

    Args:
        inputs: Input feature tensor.
        growth_rate: Number of output channels produced by this branch.
        dilation_rate: Dilation used by the 3x3 stage.
        dropout_rate: Dropout probability; dropout is skipped when not > 0.
        efficient: When true the 3x3 stage is a dilated depthwise convolution
            followed by a 1x1 convolution; otherwise it is a single dilated
            3x3 convolution with L2 regularisation.

    Returns:
        Tensor with ``growth_rate`` channels.
    """
    weight_decay = 1e-4

    # Pre-activation followed by the 1x1 bottleneck.
    features = BatchNormalization()(inputs)
    features = ReLU()(features)
    features = Conv2D(filters=growth_rate, kernel_size=1, use_bias=False,
                      kernel_regularizer=l2(weight_decay))(features)

    # Dilated 3x3 stage.
    if not efficient:
        features = Conv2D(filters=growth_rate, kernel_size=3, dilation_rate=dilation_rate,
                          padding='same', use_bias=False,
                          kernel_regularizer=l2(weight_decay))(features)
    else:
        features = DepthwiseConv2D(kernel_size=3, dilation_rate=dilation_rate,
                                   padding='same', use_bias=False)(features)
        features = Conv2D(filters=growth_rate, kernel_size=1, use_bias=False)(features)

    features = BatchNormalization()(features)
    features = ReLU()(features)

    if dropout_rate > 0:
        return Dropout(rate=dropout_rate)(features)
    return features

def DenseDDSSPP(inputs, growth_rate=48, dropout_rate=0.1):
    """Assemble the densely connected DenseDDSSPP module.

    A 1x1 conv/BN/ReLU stem is concatenated onto the input, then five
    ``DenseDDSSPPLayer`` branches with dilation rates 6, 12, 24, 36 and 48
    are applied in sequence. Each branch sees the concatenation of the input,
    the stem and every previous branch, and its output is appended in turn.

    Args:
        inputs: Backbone feature tensor.
        growth_rate: Channels added by the stem and by each branch.
        dropout_rate: Dropout probability forwarded to every branch.

    Returns:
        The input concatenated with the stem and all five branch outputs.
    """
    stem = Conv2D(filters=growth_rate, kernel_size=1, use_bias=False,
                  kernel_regularizer=l2(1e-4))(inputs)
    stem = BatchNormalization()(stem)
    stem = ReLU()(stem)
    dense_features = Concatenate()([inputs, stem])

    for rate in (6, 12, 24, 36, 48):
        branch = DenseDDSSPPLayer(dense_features, growth_rate, rate, dropout_rate)
        dense_features = Concatenate()([dense_features, branch])

    return dense_features

def se_block(input_tensor, num_filters, ratio=16):
    """Squeeze-and-Excitation channel gating.

    Args:
        input_tensor: Feature tensor to recalibrate.
        num_filters: Channel count of ``input_tensor``; used for the reshape
            and for the width of the final gating layer.
        ratio: Reduction factor of the bottleneck (``num_filters // ratio`` units).

    Returns:
        ``input_tensor`` multiplied by a per-channel sigmoid gate.
    """
    # Squeeze: one descriptor per channel, kept as (1, 1, C) so it multiplies
    # against the feature map.
    descriptor = GlobalAveragePooling2D()(input_tensor)
    descriptor = Reshape((1, 1, num_filters))(descriptor)

    # Excitation: bottleneck followed by a sigmoid gate.
    bottleneck = Dense(num_filters // ratio, activation='relu', use_bias=False,
                       kernel_regularizer=l2(1e-4))(descriptor)
    gate = Dense(num_filters, activation='sigmoid', use_bias=False,
                 kernel_regularizer=l2(1e-4))(bottleneck)

    return Multiply()([input_tensor, gate])

def scse_block(input_tensor, num_filters, ratio=16):
    """Concurrent spatial and channel Squeeze-and-Excitation (scSE).

    Two gated copies of the input are produced, one gated per channel and one
    gated per spatial position, and their element-wise sum is returned.

    Args:
        input_tensor: Feature tensor to recalibrate.
        num_filters: Channel count of ``input_tensor``.
        ratio: Reduction factor of the channel-gate bottleneck.

    Returns:
        Sum of the channel-gated and spatially-gated tensors.
    """
    # Channel path: global average pool -> bottleneck -> sigmoid gate per channel.
    channel_gate = GlobalAveragePooling2D()(input_tensor)
    channel_gate = Reshape((1, 1, num_filters))(channel_gate)
    channel_gate = Dense(num_filters // ratio, activation='relu', use_bias=False,
                         kernel_regularizer=l2(1e-4))(channel_gate)
    channel_gate = Dense(num_filters, activation='sigmoid', use_bias=False,
                         kernel_regularizer=l2(1e-4))(channel_gate)
    channel_branch = Multiply()([input_tensor, channel_gate])

    # Spatial path: a 1x1 conv yields a single sigmoid map over positions.
    spatial_gate = Conv2D(1, (1, 1), activation='sigmoid', use_bias=False,
                          kernel_regularizer=l2(1e-4))(input_tensor)
    spatial_branch = Multiply()([input_tensor, spatial_gate])

    # Fuse both paths additively.
    return Add()([channel_branch, spatial_branch])

def eca_block(input_tensor, b=1, gamma=2):
    """Efficient Channel Attention (ECA) block.

    The channel descriptor obtained by global average pooling is filtered
    with a small 1-D convolution that slides *along the channel axis*, so
    every channel receives its own sigmoid weight computed from its
    neighbouring channels.

    The kernel size follows the adaptive ECA rule
    ``k = |(log2(C) + b) / gamma|`` rounded up to the nearest odd integer.

    Args:
        input_tensor: Feature tensor with a statically known channel count.
        b: Offset of the adaptive kernel-size rule.
        gamma: Divisor of the adaptive kernel-size rule.

    Returns:
        ``input_tensor`` multiplied by a per-channel sigmoid gate of shape
        ``(1, 1, C)``.
    """
    num_channels = int(input_tensor.shape[-1])

    # Base-2 logarithm: tf.math.log has no base argument (its second
    # positional parameter is the op name), so it would yield the natural log.
    kernel_size = int(abs((np.log2(num_channels) + b) / gamma))
    if kernel_size % 2 == 0:
        kernel_size += 1  # the kernel must be odd so 'same' padding is symmetric

    x = GlobalAveragePooling2D()(input_tensor)  # (batch, C)

    # Lay the channels out along a spatial axis so that a (k, 1) convolution
    # acts as a 1-D convolution across neighbouring channels.
    x = Reshape((num_channels, 1, 1))(x)
    x = Conv2D(1, kernel_size=(kernel_size, 1), padding='same', use_bias=False,
               kernel_regularizer=l2(1e-4))(x)  # (batch, C, 1, 1)
    x = Activation('sigmoid')(x)

    # Back to (1, 1, C) so the gate multiplies each channel of the input.
    x = Reshape((1, 1, num_channels))(x)
    return Multiply()([input_tensor, x])

## Model

def DeepLabV3PlusDenseDDSSPP(input_shape=(512, 512, 3), backbone_network='EfficientNetV2S', attention_module='se_block'):
    """Build a DeepLabV3+-style binary segmentation model with a DenseDDSSPP head.

    Data flow:
        1. An ImageNet-pretrained backbone produces a deep feature map and a
           shallower skip feature map.
        2. The deep features go through ``DenseDDSSPP`` and are upsampled 4x.
        3. The skip features are projected to 48 channels and resized to the
           spatial size of the upsampled deep features.
        4. Both are concatenated, optionally passed through an attention
           block, refined by four 3x3 conv/BN/ReLU stages, upsampled 4x again
           and mapped to a single sigmoid channel.

    Args:
        input_shape: Shape of a single input image, ``(height, width, channels)``.
        backbone_network: ``'Xception'``, ``'MobileNetV3Small'`` or ``'EfficientNetV2S'``.
        attention_module: ``'se_block'``, ``'scse_block'`` or ``'eca_block'``.
            Any other value prints a warning and builds the model without attention.

    Returns:
        An uncompiled ``tf.keras.Model`` whose output is a one-channel sigmoid map.

    Raises:
        ValueError: If ``backbone_network`` is not one of the supported names.

    Notes:
        The output is 16x the spatial size of the deep feature map (two 4x
        upsamplings); it matches the input size only when the chosen deep
        layer has an output stride of 16 - TODO confirm for each backbone.
    """
    inputs = tf.keras.Input(input_shape)

    if backbone_network == 'Xception':
        base_model = Xception(weights='imagenet', include_top=False, input_tensor=inputs)
        image_features_layer_name = 'block13_sepconv2_bn' # A deeper layer
        skip_connection_layer_name = 'block3_sepconv2_bn' # A shallower layer
    elif backbone_network == 'MobileNetV3Small':
        base_model = MobileNetV3Small(weights='imagenet', include_top=False, input_tensor=inputs)
        image_features_layer_name = 'expanded_conv_7/depthwise/BatchNorm' # A deeper layer
        # Same '<prefix>depthwise/BatchNorm' naming scheme as the deep layer;
        # block 0 uses the bare 'expanded_conv/' prefix.
        skip_connection_layer_name = 'expanded_conv/depthwise/BatchNorm' # A shallower layer
    elif backbone_network == 'EfficientNetV2S':
        base_model = EfficientNetV2S(weights='imagenet', include_top=False, input_tensor=inputs)
        image_features_layer_name = 'block6a_expand_bn' # A deeper layer
        skip_connection_layer_name = 'block2d_add' # A shallower layer
    else:
        raise ValueError(f"Unsupported backbone network: {backbone_network}. Choose from 'Xception', 'MobileNetV3Small', 'EfficientNetV2S'.")

    # Deep branch: DenseDDSSPP on the deep backbone features, then 4x upsampling.
    image_features = base_model.get_layer(image_features_layer_name).output
    spatial_shape_bfr_ddsspp = K.int_shape(image_features)[1:3]
    print(f"Ukuran spasial sebelum DenseDDSSPP: {spatial_shape_bfr_ddsspp}")

    x_a = DenseDDSSPP(image_features)
    spatial_shape_after_ddsspp = K.int_shape(x_a)[1:3]
    print(f"Ukuran spasial setelah DenseDDSSPP: {spatial_shape_after_ddsspp}")
    x_a = UpSampling2D((4, 4), interpolation="bilinear")(x_a)
    spatial_shape_after_upsampling = K.int_shape(x_a)[1:3]
    print(f"Ukuran spasial setelah UpSampling2D: {spatial_shape_after_upsampling}")

    # Skip branch: project the shallow features down to 48 channels.
    x_b = base_model.get_layer(skip_connection_layer_name).output
    x_b = Conv2D(filters=48, kernel_size=1, padding='same', use_bias=False)(x_b)
    x_b = BatchNormalization()(x_b)
    x_b = Activation('relu')(x_b)

    # Ensure x_b is resized to match x_a's spatial dimensions after upsampling.
    # A Resizing layer (bilinear by default, like tf.image.resize) keeps the
    # operation inside the Keras graph instead of calling a raw TF function
    # on a symbolic Keras tensor.
    target_height, target_width = spatial_shape_after_upsampling
    x_b = tf.keras.layers.Resizing(target_height, target_width)(x_b)

    x = Concatenate()([x_a, x_b])
    num_channels_after_concat = x.shape[-1]
    print(f"num_channels_after_concat: {num_channels_after_concat}")

    if attention_module == 'se_block':
        x = se_block(x, num_filters=num_channels_after_concat)
    elif attention_module == 'scse_block':
        x = scse_block(x, num_filters=num_channels_after_concat)
    elif attention_module == 'eca_block':
        x = eca_block(x) # eca_block reads the channel count from the tensor itself
    else:
        # Deliberately non-fatal: the model is still built, just without attention.
        print(f"Warning: Unknown attention module '{attention_module}'. No attention block applied.")

    for _ in range(4):  # Repeating Conv2D-BatchNorm-Activation layers
        x = Conv2D(filters=256, kernel_size=3, padding='same', use_bias=False)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)

    # Segmentation head: 4x upsampling, then a 1x1 conv and sigmoid per pixel.
    x = UpSampling2D((4, 4), interpolation="bilinear")(x)
    x = Conv2D(1, (1, 1), name='output_layer')(x)
    x = Activation('sigmoid')(x)

    model = Model(inputs=inputs, outputs=x)
    return model

## Metrics

def precision_m(y_true, y_pred):
    """Precision of the rounded predictions, computed over the whole batch.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor; binarised with ``tf.round`` before counting.

    Returns:
        ``sum(y_true * round(y_pred)) / (sum(round(y_pred)) + epsilon)``.
    """
    hard_pred = tf.round(y_pred)
    tp = tf.reduce_sum(tf.cast(y_true * hard_pred, tf.float32))
    predicted_pos = tf.reduce_sum(tf.cast(hard_pred, tf.float32))
    # epsilon keeps the ratio finite when nothing is predicted positive.
    return tp / (predicted_pos + tf.keras.backend.epsilon())

def recall_m(y_true, y_pred):
    """Recall of the rounded predictions, computed over the whole batch.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor; binarised with ``tf.round`` before counting.

    Returns:
        ``sum(y_true * round(y_pred)) / (sum(y_true) + epsilon)``.
    """
    hard_pred = tf.round(y_pred)
    tp = tf.reduce_sum(tf.cast(y_true * hard_pred, tf.float32))
    actual_pos = tf.reduce_sum(tf.cast(y_true, tf.float32))
    # epsilon keeps the ratio finite when the ground truth has no positives.
    return tp / (actual_pos + tf.keras.backend.epsilon())

def f1_m(y_true, y_pred):
    """F1 score: harmonic mean of ``precision_m`` and ``recall_m``.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor (binarised inside the precision/recall helpers).

    Returns:
        ``2 * p * r / (p + r + epsilon)``.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    # epsilon guards against 0/0 when both precision and recall are zero.
    harmonic_half = (p * r) / (p + r + tf.keras.backend.epsilon())
    return 2 * harmonic_half

def iou_m(y_true, y_pred):
    """Intersection-over-Union of the rounded predictions over the whole batch.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor; binarised with ``tf.round`` before counting.

    Returns:
        ``intersection / (union + epsilon)`` where
        ``intersection = sum(y_true * round(y_pred))`` and
        ``union = sum(y_true + round(y_pred)) - intersection``.
    """
    hard_pred = tf.round(y_pred)
    overlap = tf.reduce_sum(tf.cast(y_true * hard_pred, tf.float32))
    # |A u B| = |A| + |B| - |A n B|
    combined = tf.reduce_sum(tf.cast(y_true + hard_pred, tf.float32)) - overlap
    return overlap / (combined + tf.keras.backend.epsilon())

In [None]:
## Training
# This cell loads the data, builds and compiles the model, and trains it while
# checkpointing the best weights. It relies on the functions defined in the
# first cell, and the testing cell below reuses X_test / y_test from here.

# Dataset locations and checkpoint destination (Google Drive mount paths).
IMAGE_DIR = "/content/drive/MyDrive/data/images"
MASK_DIR = "/content/drive/MyDrive/data/masks"
MODEL_SAVE_PATH = "/content/drive/MyDrive/model_checkpoint.h5"

# Load and split dataset
# At most 4216 image/mask pairs are read, each resized to 512x512.
# NOTE(review): load_dataset divides uint8 arrays by 255.0, which yields float64
# arrays held fully in memory - confirm the runtime has enough RAM for this.
image_dataset, mask_dataset = load_dataset(IMAGE_DIR, MASK_DIR, size=512, max_images=4216)
# split_dataset defaults apply here: val_size=0.1, test_size=0.2, random_state=0.
X_train, X_val, X_test, y_train, y_val, y_test = split_dataset(image_dataset, mask_dataset)

# Define and compile the model
# Example usage:
model = DeepLabV3PlusDenseDDSSPP(input_shape=(512, 512, 3), backbone_network='Xception', attention_module='scse_block')
# Uncomment below to try other configurations
# model = DeepLabV3PlusDenseDDSSPP(input_shape=(512, 512, 3), backbone_network='MobileNetV3Small', attention_module='eca_block')
# model = DeepLabV3PlusDenseDDSSPP(input_shape=(512, 512, 3), backbone_network='EfficientNetV2S', attention_module='se_block')

# Learning rate starts at 1e-3 and is multiplied by 0.96 every 10000 optimizer
# steps; staircase=True makes the decay happen in discrete jumps.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.001, decay_steps=10000, decay_rate=0.96, staircase=True
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Binary cross-entropy matches the single-channel sigmoid output of the model;
# iou_m and f1_m are the custom metrics defined in the first cell.
model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy', iou_m, f1_m]
)

# Setup checkpointing
# Only the epoch with the lowest validation loss so far is written to MODEL_SAVE_PATH.
checkpoint = ModelCheckpoint(MODEL_SAVE_PATH, monitor='val_loss', save_best_only=True, verbose=1)

# Train the model
# NOTE(review): no TensorFlow random seed is set in the visible cells, so weight
# initialisation and batch shuffling presumably differ between runs - confirm
# whether reproducibility matters here.
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=5,
    callbacks=[checkpoint],
    verbose=1
)

In [None]:
## Testing
# Evaluates the best checkpoint from the training cell on the held-out test
# split and writes the metrics to an Excel file. Requires X_test, y_test and
# MODEL_SAVE_PATH from the training cell.

# Evaluate exactly the file the training cell's ModelCheckpoint wrote, so the
# notebook works end-to-end on a fresh kernel.
MODEL_PATH_BEST = MODEL_SAVE_PATH

# Architecture of the checkpoint being evaluated. These must match the
# configuration used in the training cell; they also label the results file
# so the spreadsheet cannot be attributed to the wrong model.
TEST_BACKBONE = 'Xception'
TEST_ATTENTION = 'scse_block'

# Free up memory: the full arrays are no longer needed once the splits exist.
# pop() instead of `del` keeps this cell re-runnable (no NameError on a second run).
for _name in ('image_dataset', 'mask_dataset'):
    globals().pop(_name, None)
gc.collect()

# Load the best model for testing
# Rebuild the model structure before loading weights
model = DeepLabV3PlusDenseDDSSPP(
    input_shape=(512, 512, 3),
    backbone_network=TEST_BACKBONE,
    attention_module=TEST_ATTENTION,
)
model.load_weights(MODEL_PATH_BEST)

# Compile model for evaluation (optimizer doesn't affect evaluation, but metrics are needed)
model.compile(
    optimizer='adam', # Optimizer is required, but its value doesn't impact evaluation results
    loss='binary_crossentropy',
    metrics=[precision_m, recall_m, f1_m, iou_m]
)

# Set smaller batch size to avoid memory issues
BATCH_SIZE = 2

# Evaluate the model on the test set with batching.
# evaluate() returns the loss followed by the metrics in the order given to compile().
test_loss, test_precision, test_recall, test_f1, test_iou = model.evaluate(
    X_test, y_test,
    batch_size=BATCH_SIZE,
    verbose=1
)

print(f"\nTest Loss: {test_loss}")
print(f"Test Precision: {test_precision}")
print(f"Test Recall: {test_recall}")
print(f"Test F1 Score: {test_f1}")
print(f"Test IoU: {test_iou}")

# Store results in a DataFrame and save to Excel
results = pd.DataFrame({
    'Metric': ['test_loss', 'test_precision', 'test_recall', 'test_f1', 'test_iou'],
    'Value': [test_loss, test_precision, test_recall, test_f1, test_iou]
})

# The file name is derived from the evaluated configuration.
excel_file = f'test_results_512_{TEST_BACKBONE}_{TEST_ATTENTION}.xlsx'
results.to_excel(excel_file, index=False)

print(f"\nHasil testing disimpan dalam file: {excel_file}")