<a href="https://colab.research.google.com/github/jirapat-tho/my-work/blob/main/U_net.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset paths used below live under it.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import torch
# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = 'cpu'
device

'cpu'

# Split Data

In [3]:
import os
from sklearn.model_selection import train_test_split

# กำหนดเส้นทางไปยังโฟลเดอร์ที่เก็บภาพและ label
train_img_dir = '/content/drive/MyDrive/images/train'
train_mask_dir = '/content/drive/MyDrive/masks/train'

# os.listdir() returns entries in arbitrary order. Sort them so that the seeded
# split below selects the same files on every run.
all_image_files = sorted(os.listdir(train_img_dir))

# Keep only the images that have a mask with the same file name; a missing mask
# would otherwise surface much later as a file-read error during training.
available_masks = set(os.listdir(train_mask_dir))
image_files = [file for file in all_image_files if file in available_masks]
missing_mask_count = len(all_image_files) - len(image_files)
if missing_mask_count:
    print(f'Skipped {missing_mask_count} image(s) without a matching mask')

# Build X_train (image paths) and y_train (mask paths), index-aligned.
X_train = [os.path.join(train_img_dir, file) for file in image_files]
y_train = [os.path.join(train_mask_dir, file) for file in image_files]

# Split into 90% train and 10% validation.
train, val, train_mask, val_mask = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Report the number of files in each subset.
print(f'Train set size: {len(train)}')
print(f'Validation set size: {len(val)}')
print(f'Mask train set size: {len(train_mask)}')
print(f'Mask validation set size: {len(val_mask)}')

Train set size: 2141
Validation set size: 238
Mask train set size: 2141
Mask validation set size: 238


In [4]:
import os

# Define paths
test_images_path = '/content/drive/MyDrive/images/test'
test_masks_path = '/content/drive/MyDrive/masks/test'

# Map "file name without extension" -> full path for every PNG in each folder.
test_image = {os.path.splitext(filename)[0]: os.path.join(test_images_path, filename)
              for filename in os.listdir(test_images_path) if filename.endswith('.png')}

test_mask_by_name = {os.path.splitext(filename)[0]: os.path.join(test_masks_path, filename)
                     for filename in os.listdir(test_masks_path) if filename.endswith('.png')}

# Only names present in both folders form a usable image/mask pair; sorting
# keeps the two lists below index-aligned and in a stable order.
common_keys = sorted(test_image.keys() & test_mask_by_name.keys())

# Index-aligned lists of matched image and mask paths.
test = [test_image[key] for key in common_keys]
test_mask = [test_mask_by_name[key] for key in common_keys]

# Report the matched lists. `test_image` holds every PNG found, matched or not,
# so its length is not the number of matched images.
print("Number of matched test images:", len(test))
print("Number of matched test masks:", len(test_mask))

Number of matched test images: 290
Number of matched test masks: 290


## ทำ Data Augmentation

In [6]:
import tensorflow as tf
import numpy as np
import cv2 # Import cv2

# Shared input-pipeline settings: samples per batch and the size every
# image and mask is resized to.
batch_size = 16
target_size = (512, 512)

# ฟังก์ชันเพิ่ม Data Augmentation
def augment_image(image, mask):
    """Apply random photometric augmentation to an image; the mask is untouched.

    Args:
        image: Float image tensor. The clipping below assumes it is already
            scaled to [0, 1], as `load_and_resize` produces.
        mask: Mask tensor paired with `image`. It is returned unchanged because
            brightness/contrast changes do not move any pixels.

    Returns:
        Tuple `(augmented_image, mask)`.
    """
    # Random brightness: adds a delta drawn from [-0.2, 0.2].
    image = tf.image.random_brightness(image, max_delta=0.2)

    # Random contrast: scales contrast by a factor drawn from [0.8, 1.2].
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)

    # Both ops can push values below 0 or above 1; clip so augmented images stay
    # in the same range as the un-augmented validation/test images.
    image = tf.clip_by_value(image, 0.0, 1.0)

    return image, mask

# ฟังก์ชันโหลด + Resize
def load_and_resize(image_path, mask_path, target_size=(256, 256)):
    """Read a PNG image/mask pair from disk, resize both and scale them to [0, 1].

    Args:
        image_path: Path of the PNG image (decoded with 3 channels).
        mask_path: Path of the PNG mask (decoded with 1 channel).
        target_size: Size passed to `tf.image.resize` for both tensors.

    Returns:
        Tuple `(image, mask)` of float32 tensors divided by 255.
    """
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, target_size)  # bilinear; returns float32
    image = image / 255.0  # Normalize

    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=1)
    # Nearest-neighbour keeps the original label values. Bilinear interpolation
    # would blend foreground and background into fractional values along edges.
    mask = tf.image.resize(mask, target_size, method='nearest')
    # Nearest-neighbour resize keeps the uint8 dtype, so cast before dividing.
    mask = tf.cast(mask, tf.float32) / 255.0  # Normalize

    return image, mask

# ฟังก์ชันโหลด + Resize + Augment
def load_resize_augment(image_path, mask_path, target_size=(256, 256)):
    """Load and resize an image/mask pair, then run it through `augment_image`.

    Args:
        image_path: Path of the image file.
        mask_path: Path of the mask file.
        target_size: Size forwarded to `load_and_resize`.

    Returns:
        The `(image, mask)` tuple produced by `augment_image`.
    """
    resized_pair = load_and_resize(image_path, mask_path, target_size)
    return augment_image(*resized_pair)

# ฟังก์ชันสร้าง Dataset จาก path ของ images และ masks พร้อม Augmentation
def create_dataset_with_augmentation(image_paths, mask_paths, batch_size=16, target_size=(256, 256), is_training=True):
    """Build a batched, prefetched `tf.data.Dataset` of (image, mask) pairs.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths, index-aligned with `image_paths`.
        batch_size: Number of samples per batch.
        target_size: Size every image and mask is resized to.
        is_training: When True the samples are shuffled and augmented; when
            False they are only loaded and resized, in the given order.

    Returns:
        A `tf.data.Dataset` yielding `(images, masks)` batches.
    """
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, mask_paths))

    if is_training:
        # Shuffle the (cheap) path strings before decoding so each epoch sees
        # the samples in a new order; Keras does not shuffle a Dataset itself.
        dataset = dataset.shuffle(buffer_size=max(len(image_paths), 1))
        load_fn = load_resize_augment  # augmentation only while training
    else:
        load_fn = load_and_resize

    dataset = dataset.map(lambda x, y: load_fn(x, y, target_size), num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

# สร้าง dataset สำหรับ train และ validation
# Both pipelines share the batch size and target size; only the training one
# is built with is_training=True.
_dataset_settings = dict(batch_size=batch_size, target_size=target_size)
train_dataset = create_dataset_with_augmentation(train, train_mask, is_training=True, **_dataset_settings)
val_dataset = create_dataset_with_augmentation(val, val_mask, is_training=False, **_dataset_settings)

In [7]:
#ดีสุดตอนนี้
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Conv2DTranspose, concatenate
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Dropout, BatchNormalization, concatenate, Input, Add, Activation, Multiply, AveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import UpSampling2D

# Attention Gate
def attention_gate(skip_connection, gating_signal, filters):
    """Re-weight `skip_connection` with a single-channel sigmoid attention map.

    Both inputs are projected to `filters` channels with 1x1 convolutions and
    batch-normalised, summed, passed through ReLU, and reduced to one sigmoid
    channel. That channel multiplies `skip_connection`.

    Args:
        skip_connection: Feature map to be gated (this is what gets returned,
            scaled by the attention map).
        gating_signal: Feature map that conditions the attention map. The `Add`
            below combines the two projections, so their spatial sizes must be
            compatible.
        filters: Channel count of the two intermediate projections.

    Returns:
        `skip_connection` multiplied by the attention map.
    """
    # 1x1 projection of the gating signal.
    gate_projection = BatchNormalization()(
        Conv2D(filters, kernel_size=1, padding='same')(gating_signal)
    )

    # 1x1 projection of the features being gated.
    skip_projection = BatchNormalization()(
        Conv2D(filters, kernel_size=1, padding='same')(skip_connection)
    )

    combined = Activation('relu')(Add()([gate_projection, skip_projection]))
    attention_map = Conv2D(1, kernel_size=1, padding='same', activation='sigmoid')(combined)

    return Multiply()([skip_connection, attention_map])

# Residual Block
def residual_block(input_tensor, filters, dropout_rate=0.3):
    """Two 3x3 conv + batch-norm layers with a projected residual shortcut.

    Args:
        input_tensor: Input feature map.
        filters: Number of output channels of the block.
        dropout_rate: Dropout applied between the two convolutions. Defaults to
            0.3, the value that was previously hard-coded.

    Returns:
        Feature map with `filters` channels: ReLU(main path + shortcut).
    """
    x = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal")(input_tensor)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Dropout(dropout_rate)(x)
    x = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal")(x)
    x = BatchNormalization()(x)

    # The shortcut is always projected with a 1x1 conv so its channel count
    # matches `filters`, whatever the input has.
    shortcut = Conv2D(filters, 1, padding="same")(input_tensor)
    x = Add()([x, shortcut])
    x = Activation("relu")(x)
    return x

# ASPP Block
def aspp_block(input_tensor, filters):
    """Concatenate four parallel views of `input_tensor` at different receptive fields.

    Branches, in concatenation order:
      1. 2x2 average pooling -> 1x1 conv -> 2x bilinear up-sampling,
      2. 1x1 conv,
      3. 3x3 conv with dilation rate 3,
      4. 3x3 conv with dilation rate 6.

    Args:
        input_tensor: Input feature map.
        filters: Output channels of every branch.

    Returns:
        Channel-wise concatenation of the four branches (4 * `filters` channels).
    """
    # Pooled-context branch: down-sample by 2, project, up-sample back by 2.
    pooled = AveragePooling2D(pool_size=(2, 2))(input_tensor)
    pooled = Conv2D(filters, kernel_size=1, padding="same", activation="relu")(pooled)
    pooled = UpSampling2D(size=(2, 2), interpolation="bilinear")(pooled)

    pointwise = Conv2D(filters, kernel_size=1, padding="same", activation="relu")(input_tensor)

    # Dilated 3x3 branches, created in increasing dilation order.
    dilated = [
        Conv2D(filters, kernel_size=3, padding="same", dilation_rate=rate, activation="relu")(input_tensor)
        for rate in (3, 6)
    ]

    return concatenate([pooled, pointwise, *dilated])

# Improved U-Net with ResNet50 and enhancements
def _decoder_stage(decoder_input, encoder_skip, filters, l2_value, dropout_rate):
    """Up-sample by 2, gate the encoder skip features, merge and refine."""
    up = Conv2DTranspose(filters, 2, strides=(2, 2), padding='same', kernel_regularizer=l2(l2_value))(decoder_input)
    up = BatchNormalization()(up)
    up = Dropout(dropout_rate)(up)
    # The encoder features are the skip connection to be filtered; the
    # up-sampled decoder features are the gating signal that decides what to
    # keep. (The arguments were previously passed the other way round.)
    gated_skip = attention_gate(encoder_skip, up, filters=filters)
    merged = concatenate([up, gated_skip])
    return residual_block(merged, filters=filters)


def improved_unet_with_enhancements(input_size=(512, 512, 3), l2_value=1e-5, dropout_rate=0.2):
    """Build a U-Net-style segmentation model on an ImageNet-pretrained ResNet50 encoder.

    The bottleneck is an ASPP block followed by a residual block. Each of the
    four decoder stages up-samples by 2, applies an attention gate to the
    matching encoder features, concatenates, and refines with a residual block.
    A last transposed convolution restores the input resolution and a 1x1
    sigmoid convolution produces the single-channel output.

    Args:
        input_size: `(height, width, channels)` of the input. The skip sizes
            noted below assume height and width divide evenly through every
            encoder stride; `aspp_block` additionally pools the 1/32 map by 2
            and up-samples it back, so that map needs even sides.
        l2_value: L2 regularisation factor for the decoder's transposed
            convolutions and the output convolution.
        dropout_rate: Dropout rate after each decoder up-sampling.

    Returns:
        A `tf.keras.Model` mapping the ResNet50 input to a sigmoid map with one
        channel.
    """
    resnet = ResNet50(weights='imagenet', include_top=False, input_shape=input_size)

    # Encoder feature maps, indexed by depth (sizes relative to the input).
    encoder_outputs = [
        resnet.input,                                  # 1/1  (kept so indices match depth)
        resnet.get_layer("conv1_relu").output,         # 1/2  (256x256 for a 512 input)
        resnet.get_layer("conv2_block3_out").output,   # 1/4  (128x128)
        resnet.get_layer("conv3_block4_out").output,   # 1/8  (64x64)
        resnet.get_layer("conv4_block6_out").output,   # 1/16 (32x32)
        resnet.get_layer("conv5_block3_out").output    # 1/32 (16x16)
    ]

    # Bottleneck with ASPP and Residual Block
    center = aspp_block(encoder_outputs[-1], filters=512)
    center = residual_block(center, filters=512)

    # Decoder: walk back up through the encoder skips, halving the filters.
    dec4 = _decoder_stage(center, encoder_outputs[4], 256, l2_value, dropout_rate)
    dec3 = _decoder_stage(dec4, encoder_outputs[3], 128, l2_value, dropout_rate)
    dec2 = _decoder_stage(dec3, encoder_outputs[2], 64, l2_value, dropout_rate)
    dec1 = _decoder_stage(dec2, encoder_outputs[1], 32, l2_value, dropout_rate)

    # Final up-sampling back to the input resolution and output layer
    outputs = Conv2DTranspose(16, 2, strides=(2, 2), padding='same', kernel_regularizer=l2(l2_value))(dec1)
    outputs = BatchNormalization()(outputs)
    outputs = Conv2D(1, 1, activation='sigmoid', kernel_regularizer=l2(l2_value))(outputs)

    model = Model(inputs=resnet.input, outputs=outputs)
    return model

# สร้างโมเดล
# Derive the model input shape from the data pipeline's target_size so the two
# cannot drift apart (3 = the number of channels the images are decoded with).
model = improved_unet_with_enhancements(input_size=(*target_size, 3))

In [8]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.optimizers import SGD

# AdamW optimizer; the learning rate and weight decay are set here.
optimizer = AdamW(
    learning_rate=1e-4,
    weight_decay=2e-4,
)

In [9]:
import tensorflow as tf
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras.backend import epsilon
import tensorflow.keras.backend as K

@register_keras_serializable()
class IoU(tf.keras.metrics.Metric):
    """Intersection-over-union accumulated over every batch since the last reset.

    Predictions are binarised at 0.5; the ground truth is only cast to float32
    and used as-is. `sample_weight` is accepted for API compatibility but is
    not used.
    """

    def __init__(self, name="iou", dtype=None, **kwargs):
        """Create the two running sums (intersection and union), starting at zero."""
        super().__init__(name=name, dtype=dtype, **kwargs)
        self.intersection = self.add_weight(name="intersection", initializer="zeros", dtype=tf.float32)
        self.union = self.add_weight(name="union", initializer="zeros", dtype=tf.float32)

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Add one batch's intersection and union to the running sums."""
        truth = tf.cast(y_true, dtype=tf.float32)
        predicted = tf.cast(y_pred > 0.5, dtype=tf.float32)

        overlap = tf.reduce_sum(truth * predicted)
        # |A ∪ B| = |A| + |B| - |A ∩ B|
        combined = tf.reduce_sum(truth) + tf.reduce_sum(predicted) - overlap

        self.intersection.assign_add(overlap)
        self.union.assign_add(combined)

    def result(self):
        """Return accumulated intersection / union (epsilon guards against 0/0)."""
        denominator = self.union + epsilon()
        return self.intersection / denominator

    def reset_state(self):
        """Zero both running sums."""
        for accumulator in (self.intersection, self.union):
            accumulator.assign(0.0)

@register_keras_serializable()
def dice_coef(y_true, y_pred):
    """Dice coefficient over all elements of the batch.

    Computes (2 * sum(y_true * y_pred) + eps) / (sum(y_true) + sum(y_pred) + eps)
    on the flattened tensors. Predictions are not thresholded.
    """
    truth_flat = tf.reshape(y_true, [-1])
    pred_flat = tf.reshape(y_pred, [-1])
    overlap = tf.reduce_sum(truth_flat * pred_flat)
    numerator = 2. * overlap + epsilon()
    denominator = tf.reduce_sum(truth_flat) + tf.reduce_sum(pred_flat) + epsilon()
    return numerator / denominator

@register_keras_serializable(package="CustomLoss")
def dice_loss(y_true, y_pred):
    """Dice loss: 1 - Dice coefficient, computed over the whole batch at once.

    A smoothing term of 1e-6 is added to numerator and denominator so the
    ratio stays defined when both tensors sum to zero.
    """
    smooth = 1e-6
    overlap = K.sum(y_true * y_pred)
    total = K.sum(y_true) + K.sum(y_pred)
    return 1 - (2. * overlap + smooth) / (total + smooth)

In [10]:
# Compile the model
# Optimise the Dice loss and track accuracy, IoU and the Dice coefficient.
# Alternative loss noted by the author: 'binary_crossentropy'.
tracked_metrics = ["accuracy", IoU(), dice_coef]
model.compile(optimizer=optimizer, loss=dice_loss, metrics=tracked_metrics)

In [11]:
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.callbacks import EarlyStopping

# Both callbacks watch the validation loss: the first multiplies the learning
# rate by 0.4 after 7 epochs without improvement (never below 1e-6), the second
# stops after 10 such epochs and restores the best weights.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.4,
    patience=7,
    min_lr=1e-6,
)
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)

In [12]:
from tensorflow.keras.callbacks import ModelCheckpoint

# สร้าง ModelCheckpoint

# Save the model to best_model.keras whenever validation IoU improves.
# mode='max' because a higher IoU is better.
checkpoint = ModelCheckpoint('best_model.keras', monitor='val_iou', save_best_only=True, mode='max', verbose=1)

# Callbacks handed to model.fit, checkpoint last.
callbacks = [reduce_lr, early_stopping]
callbacks.append(checkpoint)

# Train Model

In [None]:
# Train for 5 epochs, validating after each one; `history` keeps the per-epoch logs.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=5,
    callbacks=callbacks,
)

Epoch 1/5


# Test Model

In [None]:
import cv2
import numpy as np
# Size the test images and masks are resized to.
target_size = (512, 512)
# ฟังก์ชันโหลดภาพ
def load_images(image_paths, target_size=target_size):
    """Load colour images as an RGB float32 array scaled to [0, 1].

    Args:
        image_paths: Iterable of image file paths.
        target_size: Size passed to `cv2.resize` (OpenCV reads it as
            `(width, height)`).

    Returns:
        `np.ndarray` stacking one resized 3-channel float32 image per path.

    Raises:
        FileNotFoundError: If OpenCV cannot read one of the files.
    """
    images = []
    for path in image_paths:
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {path}")
        # OpenCV decodes to BGR; the training pipeline decodes to RGB, so
        # convert to give the model the channel order it was trained on.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, target_size)
        # float32 halves memory compared with NumPy's default float64.
        img = img.astype(np.float32) / 255.0  # scale pixel values to 0-1
        images.append(img)
    return np.array(images)

# ฟังก์ชันโหลดมาสก์
def load_masks(mask_paths, target_size=target_size):
    """Load grayscale masks as a float32 array scaled to [0, 1], with a channel axis.

    Args:
        mask_paths: Iterable of mask file paths.
        target_size: Size passed to `cv2.resize` (OpenCV reads it as
            `(width, height)`).

    Returns:
        `np.ndarray` stacking one resized float32 mask per path, each with a
        trailing channel axis of length 1.

    Raises:
        FileNotFoundError: If OpenCV cannot read one of the files.
    """
    masks = []
    for path in mask_paths:
        mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read mask: {path}")
        # Nearest-neighbour keeps the original label values; the default
        # bilinear interpolation would create fractional values along edges.
        mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)
        mask = mask.astype(np.float32) / 255.0
        mask = np.expand_dims(mask, axis=-1)  # add the channel axis: (H, W, 1)
        masks.append(mask)
    return np.array(masks)

# โหลด test images และ masks
# Read the matched test images and masks into memory, then show the array
# shapes as a quick sanity check.
test_images, test_masks = load_images(test), load_masks(test_mask)

for label, array in (("Test Images Shape:", test_images), ("Test Masks Shape:", test_masks)):
    print(label, array.shape)

In [None]:
from tensorflow.keras.models import load_model

# Restore the checkpoint written by ModelCheckpoint; the custom metric class and
# function are passed explicitly so they can be resolved while loading.
custom_objects = {"IoU": IoU, "dice_coef": dice_coef}
model = load_model("best_model.keras", custom_objects=custom_objects)

In [None]:
from tensorflow.keras.models import load_model

# `model` was already restored from best_model.keras by the preceding cell, so
# it is reused here instead of reading the same checkpoint from disk a second
# time through a hard-coded absolute path.
predicted_masks = model.predict(test_images)
# Binarise the sigmoid output at 0.5 to get a 0/1 mask.
predicted_masks = (predicted_masks > 0.5).astype(np.uint8)

In [None]:
# ประเมินผลโมเดล
from tensorflow.keras.metrics import MeanIoU

# สร้างตัววัด Mean IoU
# model.evaluate returns the loss followed by the compiled metrics, in order:
# accuracy, IoU, Dice coefficient.
results = model.evaluate(test_images, test_masks, batch_size=1)
print(f"Test Loss: {results[0]:.4f}")
print(f"Test Accuracy: {results[1]*100:.2f}%")
print(f"Test IoU: {results[2]:.4f}")
print(f"Test Dice Coefficient: {results[3]:.4f}")

# Mean IoU over the two classes (background / foreground), computed from the
# thresholded predictions. The previous version printed results[1], which is
# the accuracy, under this label and never fed the metric any data.
mean_iou = MeanIoU(num_classes=2)
true_labels = (test_masks > 0.5).astype(np.uint8)
chunk_size = 16  # update in small chunks to keep peak memory low
for start in range(0, len(true_labels), chunk_size):
    stop = start + chunk_size
    mean_iou.update_state(true_labels[start:stop], predicted_masks[start:stop])
print(f"Mean IoU: {float(mean_iou.result()):.4f}")