In [None]:
import os
import matplotlib.pyplot as plt
import tensorflow as tf
from keras import layers
from keras.models import Model
from keras.layers import (Conv2D, MaxPooling2D, BatchNormalization, Input, Activation)
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, Callback
from keras.preprocessing.image import load_img, img_to_array
from keras.optimizers import Adam
from keras.losses import categorical_crossentropy
from tensorflow.keras.applications import ResNet50, VGG16

import numpy as np
from tqdm import tqdm

In [None]:
IMG_SIZE = (256, 256)
# ROOT_DIR = "datasets/dataset_split"
ROOT_DIR = "/kaggle/input/turbine-blades-2/datasets/dataset_split"

In [None]:
# Mengambil nama-nama kelas dari direktori
CLASS_NAMES = ['ablation', 'breakdown', 'fracture', 'groove']

# Membuat mapping dari nama kelas ke indeks
CLASS_TO_INDEX = {name: index for index, name in enumerate(CLASS_NAMES)}

# Menghitung jumlah kelas
NUM_CLASSES = len(CLASS_NAMES) + 1

# Load data

In [None]:
def load_image_mask(image_path, mask_path):
    # Gambar dibaca, di-decode, dan di-resize (format PNG).
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.resize(img, IMG_SIZE)
    # Standardisasi ke rentang 0.0 - 1.0
    img = tf.cast(img, tf.float32) / 255.0

    # Mask dibaca, di-decode, dan di-resize.
    msk = tf.io.read_file(mask_path)
    msk = tf.image.decode_png(msk, channels=1)
    msk = tf.image.resize(msk, IMG_SIZE, method='nearest')
    msk = tf.cast(msk, tf.int32)  # pastikan integer
    
    # Ubah mask ke one-hot sesuai NUM_CLASSES
    msk = tf.squeeze(msk, axis=-1)
    msk = tf.one_hot(msk, depth=NUM_CLASSES)
    return img, msk

In [None]:
def get_dataset(split_dir, batch_size=8, shuffle=True):
    img_dir = os.path.join(split_dir, 'images')
    mask_dir = os.path.join(split_dir, 'masks')
    img_files = set([
        f for f in os.listdir(img_dir)
        if not (f.startswith('.') or f.startswith('._')) and f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ])
    mask_files = set([
        f for f in os.listdir(mask_dir)
        if not (f.startswith('.') or f.startswith('._')) and f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ])
    common_files = sorted(list(img_files & mask_files))
    img_paths = [os.path.join(img_dir, f) for f in common_files]
    mask_paths = [os.path.join(mask_dir, f) for f in common_files]
    print(f"Jumlah data di {split_dir}: {len(img_paths)}")
    assert len(img_paths) == len(mask_paths), f"Jumlah gambar ({len(img_paths)}) dan mask ({len(mask_paths)}) tidak sama!"
    dataset = tf.data.Dataset.from_tensor_slices((img_paths, mask_paths))
    dataset = dataset.map(load_image_mask, num_parallel_calls=tf.data.AUTOTUNE)
    if shuffle:
        dataset = dataset.shuffle(100, seed=42)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Load dataset
train_ds = get_dataset(f'{ROOT_DIR}/train', batch_size=4,shuffle=True)
val_ds = get_dataset(f'{ROOT_DIR}/val', batch_size=4, shuffle=False)
test_ds = get_dataset(f'{ROOT_DIR}/test', batch_size=4, shuffle=False)

# Arsitektur

## U-Net

In [None]:
def conv_block(input_tensor, num_filters):
    """Blok konvolusi ganda yang menjadi dasar U-Net."""
    # Lapisan konvolusi pertama
    x = Conv2D(num_filters, (3, 3), padding="same")(input_tensor)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    # Lapisan konvolusi kedua
    x = Conv2D(num_filters, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def unet(input_size=(256, 256, 3), num_classes=NUM_CLASSES):
    inputs = Input(input_size)  

    # --- ENCODER (JALUR KONTRAKSI) ---
    # Blok 1
    s1 = conv_block(inputs, 64)
    p1 = layers.MaxPooling2D((2, 2))(s1)
    
    # Blok 2
    s2 = conv_block(p1, 128)
    p2 = layers.MaxPooling2D((2, 2))(s2)

    # Blok 3
    s3 = conv_block(p2, 256)
    p3 = layers.MaxPooling2D((2, 2))(s3)

    # Blok 4
    s4 = conv_block(p3, 512)
    p4 = layers.MaxPooling2D((2, 2))(s4)

    # --- BOTTLENECK ---
    b1 = conv_block(p4, 1024)

    # --- DECODER (JALUR EKSPANSI) ---
    # Blok 6
    u6 = layers.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding="same")(b1)
    # Menggabungkan dengan skip connection dari Blok 4 Encoder
    c6 = layers.concatenate([u6, s4])
    s6 = conv_block(c6, 512)
    
    # Blok 7
    u7 = layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding="same")(s6)
    # Menggabungkan dengan skip connection dari Blok 3 Encoder
    c7 = layers.concatenate([u7, s3])
    s7 = conv_block(c7, 256)

    # Blok 8
    u8 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding="same")(s7)
    # Menggabungkan dengan skip connection dari Blok 2 Encoder
    c8 = layers.concatenate([u8, s2])
    s8 = conv_block(c8, 128)
    
    # Blok 9
    u9 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding="same")(s8)
    # Menggabungkan dengan skip connection dari Blok 1 Encoder
    c9 = layers.concatenate([u9, s1])
    s9 = conv_block(c9, 64)

    conv10 = Conv2D(num_classes, (1, 1), activation='softmax')(s9)

    return Model(inputs=[inputs], outputs=[conv10], name='UNet')

## U-Net 2

In [None]:
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPooling2D, Dropout, Conv2DTranspose, concatenate
from tensorflow.keras.models import Model

In [None]:
def conv2d_block(input_tensor, n_filters, kernel_size):
    # first layer
    x = Conv2D(n_filters, (kernel_size, kernel_size), padding = 'same', kernel_initializer = 'he_normal')(input_tensor)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # second layer
    x = Conv2D(n_filters, kernel_size, padding = 'same', kernel_initializer = 'he_normal')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    return x

In [None]:
def unet_2(n_filters=16, dropout_prob=0.5, kernel_size=3, num_classes=NUM_CLASSES):
    input_img = Input((256, 256, 3))

    # contracting path
    c1 = conv2d_block(input_img, n_filters, kernel_size)
    p1 = MaxPooling2D(pool_size = (2, 2))(c1)
    p1 = Dropout(0.5 * dropout_prob)(p1)

    c2 = conv2d_block(p1, 2 * n_filters, kernel_size)
    p2 = MaxPooling2D(pool_size = (2, 2))(c2)
    p2 = Dropout(dropout_prob)(p2)

    c3 = conv2d_block(p2, 4 * n_filters, kernel_size)
    p3 = MaxPooling2D(pool_size = (2, 2))(c3)
    p3 = Dropout(dropout_prob)(p3)

    c4 = conv2d_block(p3, 8 * n_filters, kernel_size)
    p4 = MaxPooling2D(pool_size = (2, 2))(c4)
    p4 = Dropout(dropout_prob)(p4)

    c5 = conv2d_block(p4, 16 * n_filters, kernel_size)

    # Expansive path
    u6 = Conv2DTranspose(8 * n_filters, (kernel_size, kernel_size), padding = 'same', strides = (2, 2))(c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(dropout_prob)(u6)
    c6 = conv2d_block(u6, 8 * n_filters, kernel_size)

    u7 = Conv2DTranspose(4 * n_filters, (kernel_size, kernel_size), padding = 'same', strides = (2, 2))(c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(dropout_prob)(u7)
    c7 = conv2d_block(u7, 4 * n_filters, kernel_size)

    u8 = Conv2DTranspose(2 * n_filters, (kernel_size, kernel_size), padding = 'same', strides = (2, 2))(c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(dropout_prob)(u8)
    c8 = conv2d_block(u8, 2 * n_filters, kernel_size)

    u9 = Conv2DTranspose(n_filters, (kernel_size, kernel_size), padding = 'same', strides = (2, 2))(c8)
    u9 = concatenate([u9, c1])
    u9 = Dropout(dropout_prob)(u9)
    c9 = conv2d_block(u9, n_filters, kernel_size)

    # Lapisan output yang dimodifikasi untuk 4 kelas
    outputs = Conv2D(num_classes, 1, activation = 'softmax')(c9)

    model = Model(inputs=[input_img], outputs=[outputs], name="UNet2")

    return model

## Recurrent U-Net

In [None]:
def recurrent_conv_block(inputs, filters, kernel_size=(3, 3), t=2):
    """
    Recurrent Convolution Block: applies recurrent convolution t times.
    """
    x = layers.Conv2D(filters, kernel_size, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    for i in range(t):
        x = layers.Conv2D(filters, kernel_size, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)

    return x

In [None]:
def rec_unet(input_size=(256, 256, 3), t=2, num_classes=NUM_CLASSES):
    inputs = Input(input_size)

    # Encoder
    conv1 = recurrent_conv_block(inputs, 32, t=t)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = recurrent_conv_block(pool1, 64, t=t)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = recurrent_conv_block(pool2, 128, t=t)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = recurrent_conv_block(pool3, 256, t=t)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    # Middle
    conv5 = recurrent_conv_block(pool4, 512, t=t)

    # Decoder
    up6 = layers.concatenate([layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5), conv4], axis=3)
    conv6 = recurrent_conv_block(up6, 256, t=t)

    up7 = layers.concatenate([layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=3)
    conv7 = recurrent_conv_block(up7, 128, t=t)

    up8 = layers.concatenate([layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=3)
    conv8 = recurrent_conv_block(up8, 64, t=t)

    up9 = layers.concatenate([layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=3)
    conv9 = recurrent_conv_block(up9, 32, t=t)

    conv10 = Conv2D(num_classes, (1, 1), activation='softmax')(conv9)

    return Model(inputs=[inputs], outputs=[conv10], name='rec_unet')

## ResNet50

In [None]:
def conv_block(x, filters):
    x = Conv2D(filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    return x

def resnet50_unet(input_size=(256, 256, 3), num_classes=NUM_CLASSES):
    inputs = Input(input_size)

    # Load ResNet50 encoder backbone
    base_model = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)

    # Ambil beberapa layer penting untuk skip connection
    skip1 = base_model.get_layer("conv1_relu").output       # 128x128
    skip2 = base_model.get_layer("conv2_block3_out").output # 64x64
    skip3 = base_model.get_layer("conv3_block4_out").output # 32x32
    skip4 = base_model.get_layer("conv4_block6_out").output # 16x16
    bridge = base_model.get_layer("conv5_block3_out").output # 8x8

    # Decoder
    up1 = layers.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same')(bridge)
    up1 = layers.concatenate([up1, skip4])
    up1 = conv_block(up1, 512)

    up2 = layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(up1)
    up2 = layers.concatenate([up2, skip3])
    up2 = conv_block(up2, 256)

    up3 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(up2)
    up3 = layers.concatenate([up3, skip2])
    up3 = conv_block(up3, 128)

    up4 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(up3)
    up4 = layers.concatenate([up4, skip1])
    up4 = conv_block(up4, 64)

    up5 = layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(up4)
    up5 = conv_block(up5, 32)

    outputs = Conv2D(num_classes, (1, 1), activation='softmax')(up5)

    return Model(inputs=inputs, outputs=outputs, name='ResNet50_UNet')

## VGG16-Unet

In [None]:
def conv_block(inputs,filterCount):
    x = Conv2D(filterCount,3,padding="same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    x = Conv2D(filterCount,3,padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    return x

def decoder_block(inputs,skip_features,filter_count):
    
    x = layers.Conv2DTranspose(filter_count, (2, 2), strides=2, padding="same")(inputs)
    x = layers.Concatenate()([x, skip_features])
    x = conv_block(x, filter_count)
    return x

In [None]:
def unet_vgg16(input_size=(256, 256, 3), num_classes=NUM_CLASSES):
    inputs = Input(input_size)
    print(inputs.shape)
    vgg16 = VGG16(include_top=False,weights='imagenet',input_tensor = inputs)
    #vgg16.summary()
    # the encoder 
    skip1 = vgg16.get_layer("block1_conv2").output
    print(skip1.shape)
    skip2 = vgg16.get_layer("block2_conv2").output
    skip3 = vgg16.get_layer("block3_conv3").output
    skip4 = vgg16.get_layer("block4_conv3").output
    # the center
    center = vgg16.get_layer("block5_conv3").output
    
    # the decoder 
    
    d1 = decoder_block(center,skip4,512)
    d2 = decoder_block(d1,skip3,256)
    d3 = decoder_block(d2,skip2,128)
    d4 = decoder_block(d3,skip1,64)
    #output
    #conv1 = Conv2D(32,3,padding="same")(d4)
    #conv2 = Conv2D(16,3,padding="same")(conv1)
    outputs = Conv2D(num_classes, 1, padding="same", activation="softmax")(d4)
    model = Model(inputs, outputs, name="VGG16_U-Net")
    return model

## Transformer Recurrent U-net

In [None]:
from tensorflow.keras.layers import (
    Input, Conv2D, MaxPooling2D, Conv2DTranspose, concatenate,
    BatchNormalization, Activation, Add, LayerNormalization,
    Dense, MultiHeadAttention, Reshape
)

from tensorflow.keras.models import Model

In [None]:
def recurrent_block(x, filters, t=2):
    for i in range(t):
        if i == 0:
            x1 = Conv2D(filters, kernel_size=3, padding='same', activation='relu')(x)
        else:
            x1 = Conv2D(filters, kernel_size=3, padding='same', activation='relu')(Add()([x, x1]))
            x1 = BatchNormalization()(x1)
    return x1

In [None]:
def rcnn_block(x, filters, t=2):
    x1 = recurrent_block(x, filters, t)
    x2 = recurrent_block(x1, filters, t)
    out = Add()([x, x2])
    return out

In [None]:
def transformer_block(x, num_heads=4, ff_dim=512):
    B, H, W, C = x.shape
    x_flat = Reshape((H * W, C))(x)

    # LayerNorm + MHA
    attn_input = LayerNormalization(epsilon=1e-6)(x_flat)
    attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=C // num_heads)(attn_input, attn_input)
    x = x_flat + attn_output  # Residual connection

    # FFN
    ffn_input = LayerNormalization(epsilon=1e-6)(x)
    ffn_output = Dense(ff_dim, activation='relu')(ffn_input)
    ffn_output = Dense(C)(ffn_output)
    x = x + ffn_output  # Residual connection

    return Reshape((H, W, C))(x)


In [None]:
def transunet_with_rcnn(input_size=(256, 256, 3), num_classes=NUM_CLASSES):
    inputs = Input(input_size)
    
    # Encoder
    conv1 = rcnn_block(inputs, 32)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = rcnn_block(pool1, 64)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = rcnn_block(pool2, 128)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = rcnn_block(pool3, 256)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    # Bottleneck with Transformer
    conv5 = rcnn_block(pool4, 512)
    trans = transformer_block(conv5)

    # Decoder
    up6 = concatenate([Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(trans), conv4], axis=-1)
    conv6 = rcnn_block(up6, 256)

    up7 = concatenate([Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=-1)
    conv7 = rcnn_block(up7, 128)

    up8 = concatenate([Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=-1)
    conv8 = rcnn_block(up8, 64)

    up9 = concatenate([Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=-1)
    conv9 = rcnn_block(up9, 32)

    outputs = Conv2D(num_classes, (1, 1), activation='softmax')(conv9)

    return Model(inputs=inputs, outputs=outputs, name="Transunet_RCL")


# Metrix dan Callback

## Callback

In [None]:
earlystopping = EarlyStopping(monitor='val_dsc', patience=10, mode='max',
                              verbose=1, restore_best_weights=True, min_delta=1e-6)
reduce_lr = ReduceLROnPlateau(monitor='val_dsc', factor=0.5, patience=3, mode='max', verbose=2)
# Callback untuk U-Net
unet_checkpoint = ModelCheckpoint(
    filepath='model/unet.keras',
    monitor='val_dsc',
    mode='max',
    save_best_only=True,
    verbose=1
)

# Callback untuk Recurrent U-Net
r_unet_checkpoint = ModelCheckpoint(
    filepath='model/recurrent_unet.keras',
    monitor='val_dsc',
    mode='max',
    save_best_only=True,
    verbose=1
)

resnet50_unet_checkpoint = ModelCheckpoint(
    filepath='model/resnet50_unet.keras',
    monitor='val_dsc',
    mode='max',
    save_best_only=True,
    verbose=1
)

class PrintValDsc(Callback):
    def on_epoch_end(self, epoch, logs=None):
        val_dsc = logs.get('val_dsc')
        print(f"Epoch {epoch+1}: val_dsc = {val_dsc} ({type(val_dsc)})")
        if val_dsc is None:
            print("⚠️ WARNING: val_dsc is None!")
        elif isinstance(val_dsc, float) and (val_dsc != val_dsc):  # NaN check
            print("⚠️ WARNING: val_dsc is NaN!")

## Metrix

In [None]:
EPOCHS = 100
IMG_SIZE = (256, 256)  # atau sesuai ukuran kamu

### Dice coef

In [None]:
def dsc(y_true, y_pred, smooth=1e-6):
    """
    Multi-class Dice Coefficient.
    y_true, y_pred: shape (batch, H, W, C), one-hot encoded.
    """
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    
    intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
    union = tf.reduce_sum(y_true + y_pred, axis=[0, 1, 2])
    
    dice = (2. * intersection + smooth) / (union + smooth)
    return tf.reduce_mean(dice)

# Dice Coefficient metric
def dice_coef(y_true, y_pred):
    smooth = 1
    y_true = tf.keras.backend.flatten(y_true)
    y_pred = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true * y_pred)
    return (2. * (intersection + smooth)) / (tf.keras.backend.sum(y_true) + tf.keras.backend.sum(y_pred) + smooth)

# def dice_coef_loss(y_true, y_pred):
#     return (1-dice_coef(y_true, y_pred))

### Dice loss

In [None]:
def dice_loss(y_true, y_pred):
    loss = 1 - dsc(y_true, y_pred)
    return loss

### IOU Score

In [None]:
def iou_score(y_true, y_pred, smooth=1e-7):
    y_true_f = tf.reshape(y_true, [-1, tf.shape(y_true)[-1]])
    y_pred_f = tf.reshape(y_pred, [-1, tf.shape(y_pred)[-1]])

    intersection = tf.reduce_sum(y_true_f * y_pred_f, axis=0)
    union = tf.reduce_sum(y_true_f + y_pred_f, axis=0) - intersection

    iou = (intersection + smooth) / (union + smooth)
    return tf.reduce_mean(iou)


### Categorical focal loss

#### Alpha calculate

In [None]:
def calculate_class_distribution(mask_dir, num_classes):
    class_counts = np.zeros(num_classes, dtype=np.int64)

    mask_files = [
        f for f in os.listdir(mask_dir)
        if not f.startswith('.') and f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]

    for fname in tqdm(mask_files, desc="Menghitung distribusi kelas"):
        path = os.path.join(mask_dir, fname)

        # Load mask
        mask = tf.io.read_file(path)
        mask = tf.image.decode_png(mask, channels=1)
        mask = tf.image.resize(mask, IMG_SIZE, method='nearest')
        mask = tf.cast(mask, tf.int32).numpy().squeeze()

        # Hitung pixel untuk tiap class
        for i in range(num_classes):
            class_counts[i] += np.sum(mask == i)

    return class_counts

def compute_alpha_weights(class_counts):
    total = np.sum(class_counts)
    class_frequencies = class_counts / total
    alpha = 1.0 / (class_frequencies + 1e-6)  # Hindari divide by zero
    alpha /= np.max(alpha)  # Normalisasi agar max=1
    return alpha.tolist()


In [None]:
mask_dir = f"{ROOT_DIR}/train/masks"

counts = calculate_class_distribution(mask_dir, NUM_CLASSES)
alpha = compute_alpha_weights(counts)

print("Distribusi kelas:", counts)
print("Alpha untuk focal loss:", alpha)

#### Focal loss

In [None]:
def categorical_focal_loss(gamma=2.0, alpha=0.25):
    """
    Focal Loss for multi-class semantic segmentation.
    gamma: focusing parameter (default 2.0)
    alpha: balance parameter (can be scalar or list per class)
    """
    def loss(y_true, y_pred):
        # Clip predictions to prevent log(0)
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)

        # Calculate cross-entropy
        cross_entropy = -y_true * tf.math.log(y_pred)

        # Calculate focal loss components
        weight = tf.pow(1 - y_pred, gamma)

        if isinstance(alpha, (list, tuple)):
            alpha_tensor = tf.constant(alpha, dtype=tf.float32)
            alpha_factor = y_true * alpha_tensor
        else:
            alpha_factor = y_true * alpha

        focal_loss = alpha_factor * weight * cross_entropy

        # Sum over classes, mean over batch
        return tf.reduce_mean(tf.reduce_sum(focal_loss, axis=-1))
    
    return loss

In [None]:
loss_fn = categorical_focal_loss(gamma=2.0, alpha=[0.1, 0.3, 0.3, 0.3, 0.3])

### Jaccard Disctance Loss

In [None]:
def jaccard_distance_loss(y_true, y_pred,smooth = 100):
    intersection = tf.keras.backend.sum(tf.keras.backend.abs(y_true * y_pred), axis=-1)
    sum_ = tf.keras.backend.sum(tf.keras.backend.abs(y_true) + tf.keras.backend.abs(y_pred), axis=-1)
    jac = (intersection + smooth) / (sum_ - intersection + smooth)
    return (1 - jac) * smooth

# Compile model

In [None]:
model_unet = unet()
model_unet.compile(
    optimizer=Adam(learning_rate=1e-4),
    # loss=categorical_crossentropy,
    # loss=bce_dice_loss,
    loss=loss_fn,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        dsc,
        tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, name='miou')
    ]
)
    
model_rec_unet = rec_unet()
model_rec_unet.compile(
    optimizer=Adam(learning_rate=1e-4),
    # loss=categorical_crossentropy,
    # loss=bce_dice_loss,
    loss=loss_fn,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        dsc,
        tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, name='miou')
    ]
)

In [None]:
model_resnet50_unet = resnet50_unet()
model_resnet50_unet.compile(
    optimizer=Adam(learning_rate=1e-4),
    # loss=categorical_crossentropy,
    # loss=bce_dice_loss,
    # loss=loss_fn,
    loss=jaccard_distance_loss,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
        dsc,
        tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, name='miou')
    ]
)

In [None]:
model_unet_2 = unet_2()
model_unet_2.compile(
    optimizer=Adam(learning_rate=1e-4),
    # optimizer=Adam(learning_rate=0.0055),
    # loss=categorical_crossentropy,
    # loss=bce_dice_loss,
    loss=loss_fn,
    # loss=jaccard_distance_loss,
    # loss=dice_loss,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
        dsc,
        tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, name='miou')
    ]
)

In [None]:
model_unet_vgg16 = unet_vgg16()
model_unet_vgg16.compile(
    optimizer=Adam(learning_rate=1e-4),
    # loss=categorical_crossentropy,
    # loss=bce_dice_loss,
    loss=loss_fn,
    # loss=jaccard_distance_loss,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
        dsc,
        tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, name='miou')
    ]
)

# Training

In [None]:
CLASS_NAMES = ['Background', 'ablation', 'breakdown', 'fracture', 'groove']

In [None]:
class VisualizePredictionCallback(tf.keras.callbacks.Callback):
    def __init__(self, dataset, class_names=CLASS_NAMES, num_samples=3):
        super().__init__()
        self.dataset = dataset
        self.class_names = CLASS_NAMES
        self.num_samples = num_samples

    def on_epoch_end(self, epoch, logs=None):
        
        # Ambil satu batch dari dataset
        for imgs, masks in self.dataset.take(1):
            preds = self.model.predict(imgs)
            plt.figure(figsize=(self.num_samples * 4, 4))
            for i in range(self.num_samples):
                # Input image
                plt.subplot(3, self.num_samples, i+1)
                plt.imshow(imgs[i].numpy())
                plt.axis('off')
                plt.title('Input')
                # Ground truth mask
                plt.subplot(3, self.num_samples, self.num_samples + i + 1)
                plt.imshow(np.argmax(masks[i].numpy(), axis=-1), cmap='tab20')
                plt.axis('off')
                plt.title('Ground Truth')
                # Predicted mask
                plt.subplot(3, self.num_samples, 2*self.num_samples + i + 1)
                plt.imshow(np.argmax(preds[i], axis=-1), cmap='tab20')
                plt.axis('off')
                plt.title('Prediction')
            plt.suptitle(f'Epoch {epoch+1}')
            plt.tight_layout()
            plt.show()
            break

In [None]:
visualize_cb = VisualizePredictionCallback(val_ds, class_names=CLASS_NAMES)

## U-Net

In [None]:
# history_unet_2 = model_unet_2.fit(train_ds, validation_data=val_ds,
#                     epochs=EPOCHS, callbacks=[earlystopping, reduce_lr, visualize_cb], verbose=1)


In [None]:
history_unet = model_unet.fit(train_ds, validation_data=val_ds,
                    epochs=EPOCHS, callbacks=[earlystopping, reduce_lr, unet_checkpoint, visualize_cb], verbose=1)


## Recurrent U-Net

In [None]:
history_rec_unet = model_rec_unet.fit(train_ds, validation_data=val_ds,
                    epochs=EPOCHS, callbacks=[earlystopping, reduce_lr, visualize_cb], verbose=1)

## Resnet 50

In [None]:
history_resnet50_unet = model_resnet50_unet.fit(train_ds, validation_data=val_ds,
                    epochs=EPOCHS, callbacks=[earlystopping, reduce_lr], verbose=1)

In [None]:
# Simpan model ResNet50 U-Net ke format Keras (.keras)
model_resnet50_unet.save("model/resnet50_unet_final.keras")

# Grafik Training

In [None]:
def plot_training_history(history, title="Training History"):
    plt.figure(figsize=(12, 6))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['categorical_accuracy'], label='Train categorical_accuracy')
    plt.plot(history.history['val_categorical_accuracy'], label='Val categorical_accuracy')
    plt.title(f"{title} - categorical_accuracy")
    plt.xlabel('Epochs')
    plt.ylabel('categorical_accuracy')
    plt.legend()

    # Plot Dice Coefficient
    plt.subplot(1, 2, 2)
    plt.plot(history.history['dsc'], label='Train dsc')
    plt.plot(history.history['val_dsc'], label='Validation dsc')
    plt.title(f"{title} - Dice Coefficient")
    plt.xlabel('Epochs')
    plt.ylabel('DSC')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
for history in [history_rec_unet]: 
    plot_training_history(history, title=history.model.name)

# Testing

In [None]:
def mean_iou_per_class(model, dataset, num_classes=NUM_CLASSES):
    """
    Menghitung Mean IoU per kelas pada seluruh dataset.
    """
    total_intersection = np.zeros(num_classes)
    total_union = np.zeros(num_classes)

    for images, masks in dataset:
        preds = model.predict(images)
        y_pred = np.argmax(preds, axis=-1).reshape(-1)
        y_true = np.argmax(masks.numpy(), axis=-1).reshape(-1)  # or masks.numpy().reshape(-1) if already in int format
        for c in range(num_classes):
            true_c = (y_true == c)
            pred_c = (y_pred == c)
            intersection = np.logical_and(true_c, pred_c).sum()
            union = np.logical_or(true_c, pred_c).sum()
            total_intersection[c] += intersection
            total_union[c] += union

    mean_iou = np.divide(
        total_intersection,
        total_union,
        out=np.zeros_like(total_intersection, dtype=np.float32),
        where=total_union != 0
    )
    return mean_iou


In [None]:
results = model_unet.evaluate(test_ds)
print("Test loss:", results[0])
print("Test categorical_accuracyt:", results[1])
print("Test dice_coef:", results[2])
print("Test miou:", results[3])

In [None]:
results = model_rec_unet.evaluate(test_ds)
print("Test loss:", results[0])
print("Test categorical_accuracyt:", results[1])
print("Test dice_coef:", results[2])
print("Test miou:", results[3])

In [None]:
results = model_resnet50_unet.evaluate(test_ds)
print("Test loss:", results[0])
print("Test categorical_accuracyt:", results[1])
print("Test dice_coef:", results[2])
print("Test miou:", results[3])
# print("Test iou_score:", results[3])

In [None]:
CLASS_MAPPING = {
    0: 'Background',
    1: 'ablation',
    2: 'breakdown',
    3: 'fracture',
    4: 'groove'
}

In [None]:
# Contoh penggunaan:
miou_per_class = mean_iou_per_class(model_unet, test_ds, num_classes=NUM_CLASSES)
for idx, name in CLASS_MAPPING.items():
    print(f"Mean IoU for class '{name}': {miou_per_class[idx]:.4f}")

In [None]:
# Contoh penggunaan:
miou_per_class = mean_iou_per_class(model_rec_unet, test_ds, num_classes=NUM_CLASSES)
for idx, name in CLASS_MAPPING.items():
    print(f"Mean IoU for class '{name}': {miou_per_class[idx]:.4f}")

In [None]:
# Contoh penggunaan:
miou_per_class = mean_iou_per_class(model_resnet50_unet, test_ds, num_classes=NUM_CLASSES)
for idx, name in CLASS_MAPPING.items():
    print(f"Mean IoU for class '{name}': {miou_per_class[idx]:.4f}")

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_model_and_plot_confusion(model, dataset, class_names):
    """
    Evaluasi model segmentasi:
    - Menampilkan test loss, accuracy, dice coef, dan mIoU
    - Menampilkan confusion matrix pixel-wise
    """

    # Hitung confusion matrix pixel-wise
    y_true_all, y_pred_all = [], []

    for images, masks in dataset:
        preds = model.predict(images)
        y_pred = np.argmax(preds, axis=-1).flatten()
        y_true = np.argmax(masks.numpy(), axis=-1).flatten()
        y_true_all.extend(y_true)
        y_pred_all.extend(y_pred)

    cm = confusion_matrix(y_true_all, y_pred_all, labels=range(len(class_names)))

    # Visualisasi confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix (Pixel-wise)')
    plt.tight_layout()
    plt.show()
    
      # Evaluasi model
    print(classification_report(
        y_true_all, y_pred_all,
        target_names=class_names,
        zero_division=0
    ))


In [None]:
class_names = ['Background', 'ablation', 'breakdown', 'fracture', 'groove']
evaluate_model_and_plot_confusion(model_rec_unet, test_ds, class_names)

# Visualisasi testing

In [None]:
def overlay_mask_on_image(image, mask, class_names, alpha=0.5):
    """
    image: numpy array (H, W, 3), range [0, 1] or [0, 255]
    mask: numpy array (H, W), berisi index kelas (0=background, dst)
    class_names: list of str
    alpha: transparansi overlay
    """
    # Pastikan image dalam range [0, 1]
    if image.max() > 1.0:
        image = image / 255.0

    # Buat colormap
    cmap = plt.get_cmap('tab20', len(class_names))
    mask_rgb = cmap(mask)[..., :3]  # Ambil RGB saja

    # Overlay: background (0) tidak diwarnai
    mask_bool = mask > 0
    overlay = image.copy()
    overlay[mask_bool] = (1 - alpha) * image[mask_bool] + alpha * mask_rgb[mask_bool]
    return overlay

# Contoh penggunaan di fungsi predict_and_visualize:
def predict_and_visualize(model, image_path, input_size=(256, 256), class_names=None):
    img = load_img(image_path, target_size=input_size)
    img_array = img_to_array(img) / 255.0
    img_input = np.expand_dims(img_array, axis=0)

    pred_mask = model.predict(img_input)[0]  # shape: (H, W, C)
    mask_argmax = np.argmax(pred_mask, axis=-1)  # shape: (H, W)

    plt.figure(figsize=(18, 6))

    # Gambar asli
    plt.subplot(1, 3, 1)
    plt.imshow(img)
    plt.title("Input Image")
    plt.axis("off")

    # Mask prediksi
    plt.subplot(1, 3, 2)
    if class_names is None:
        class_names = [f"Class {i}" for i in range(pred_mask.shape[-1])]
    cmap = plt.get_cmap('tab20', len(class_names))
    im = plt.imshow(mask_argmax, cmap=cmap, vmin=0, vmax=len(class_names)-1)
    plt.title("Predicted Mask")
    plt.axis("off")
    cbar = plt.colorbar(im, ticks=range(len(class_names)), fraction=0.046, pad=0.04)
    cbar.ax.set_yticklabels(class_names)
    plt.clim(-0.5, len(class_names)-0.5)

    # Overlay
    plt.subplot(1, 3, 3)
    overlay = overlay_mask_on_image(img_array, mask_argmax, class_names, alpha=0.5)
    plt.imshow(overlay)
    plt.title("Overlay")
    plt.axis("off")

    plt.show()

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import load_img, img_to_array

# Overlay mask ke atas gambar asli
def overlay_mask_on_image(image, mask, class_names, alpha=0.5):
    if image.max() > 1.0:
        image = image / 255.0

    cmap = plt.get_cmap('tab20', len(class_names))
    mask_rgb = cmap(mask)[..., :3]

    mask_bool = mask > 0
    overlay = image.copy()
    overlay[mask_bool] = (1 - alpha) * image[mask_bool] + alpha * mask_rgb[mask_bool]
    return overlay

# Visualisasi image + mask (true), pred mask, image + pred mask
def visualize_segmentation(image_array, pred_mask, true_mask=None, class_names=None):
    if class_names is None:
        num_classes = pred_mask.max() + 1 if true_mask is None else max(pred_mask.max(), true_mask.max()) + 1
        class_names = [f"Class {i}" for i in range(num_classes)]

    cmap = plt.get_cmap('tab20', len(class_names))
    ncols = 3 if true_mask is not None else 2
    plt.figure(figsize=(6 * ncols, 6))

    if true_mask is not None:
        plt.subplot(1, ncols, 1)
        overlay_true = overlay_mask_on_image(image_array, true_mask, class_names, alpha=0.5)
        plt.imshow(overlay_true)
        plt.title("Image + True Mask")
        plt.axis("off")
        idx = 2
    else:
        idx = 1

    plt.subplot(1, ncols, idx)
    im = plt.imshow(pred_mask, cmap=cmap, vmin=0, vmax=len(class_names)-1)
    plt.title("Predicted Mask")
    plt.axis("off")
    cbar = plt.colorbar(im, ticks=range(len(class_names)), fraction=0.046, pad=0.04)
    cbar.ax.set_yticklabels(class_names)
    plt.clim(-0.5, len(class_names)-0.5)

    plt.subplot(1, ncols, idx + 1)
    overlay_pred = overlay_mask_on_image(image_array, pred_mask, class_names, alpha=0.5)
    plt.imshow(overlay_pred)
    plt.title("Image + Predicted Mask")
    plt.axis("off")

    plt.tight_layout()
    plt.show()

# Prediksi dan visualisasi
def predict_and_visualize(model, image_path, input_size=(256, 256), class_names=None, true_mask=None):
    img = load_img(image_path, target_size=input_size)
    img_array = img_to_array(img) / 255.0
    img_input = np.expand_dims(img_array, axis=0)

    pred_mask = model.predict(img_input)[0]  # (H, W, C)
    pred_classes = np.argmax(pred_mask, axis=-1)  # (H, W)

    visualize_segmentation(img_array, pred_classes, true_mask=true_mask, class_names=class_names)

In [None]:
def load_true_mask(mask_path, target_size):
    mask = load_img(mask_path, target_size=target_size, color_mode="grayscale")
    mask_array = img_to_array(mask).squeeze().astype(np.uint8)  # shape: (H, W)
    return mask_array

# Direktori image dan mask
image_dir = f"{ROOT_DIR}/test/images"
mask_dir = f"{ROOT_DIR}/test/masks"

for img_file in os.listdir(image_dir):
    if img_file.endswith(".jpg") or img_file.endswith(".png"):
        img_path = os.path.join(image_dir, img_file)

        # Ganti ekstensi ke .png jika mask disimpan dalam .png
        mask_file = os.path.splitext(img_file)[0] + ".png"
        mask_path = os.path.join(mask_dir, mask_file)

        # Cek apakah file mask ada
        if os.path.exists(mask_path):
            true_mask = load_true_mask(mask_path, target_size=(256, 256))
        else:
            print(f"WARNING: Mask not found for {img_file}, skipping true mask overlay.")
            true_mask = None

        print(f"Processing: {img_file}")
        predict_and_visualize(model_resnet50_unet, img_path, input_size=(256, 256),
                              class_names=CLASS_NAMES + ["Background"], true_mask=true_mask)
