## Data Split

In [6]:
import os
import random
import shutil

# Source folders: one file per sample in each; images and masks are paired by
# their position in the sorted directory listings.
img_path = "Your Images Path"
mask_path = "Your masks Path"

# Fixed seed so that re-running this cell reproduces the same split. With an
# unseeded shuffle, a second run would copy a *different* split into the same
# target folders and mix samples between train/val/test.
SPLIT_SEED = 42

images = sorted(os.listdir(img_path))
masks = sorted(os.listdir(mask_path))

assert len(images) == len(masks), "The number of images and masks should be equal."

# Shuffle (image, mask) pairs together so the pairing survives the shuffle.
# A dedicated Random instance leaves the global `random` state untouched.
combined = list(zip(images, masks))
random.Random(SPLIT_SEED).shuffle(combined)
# Unzip with comprehensions: unlike `zip(*combined)`, this also works when the
# folders are empty.
images = [image_name for image_name, _ in combined]
masks = [mask_name for _, mask_name in combined]

# 80 % train / 10 % validation / remainder test. int() truncates, so any
# leftover samples end up in the test split.
total_images = len(images)
train_end = int(total_images * 0.8)
val_end = train_end + int(total_images * 0.1)

train_images = images[:train_end]
train_masks = masks[:train_end]

val_images = images[train_end:val_end]
val_masks = masks[train_end:val_end]

test_images = images[val_end:]
test_masks = masks[val_end:]

def copy_files(file_list, source_dir, target_dir):
    """Copy every file named in `file_list` from `source_dir` into `target_dir`.

    `target_dir` is created (including missing parents) if it does not exist.
    Each file keeps its name; an existing file of the same name in
    `target_dir` is overwritten by `shutil.copy`.
    """
    os.makedirs(target_dir, exist_ok=True)
    for name in file_list:
        origin = os.path.join(source_dir, name)
        destination = os.path.join(target_dir, name)
        shutil.copy(origin, destination)


# Destination folders. The extra trailing "img" level gives each folder a single
# sub-directory, which is the layout the generators below read with
# `flow_from_directory`.
train_img_dir = "Your path/train/img/img"
train_mask_dir = "Your path/train/mask/img"
val_img_dir = "Your path/val/img/img"
val_mask_dir = "Your path/val/mask/img"
test_img_dir = "Your path/test/img/img"
test_mask_dir = "Your path/test/mask/img"

# Copy files to target directories: (file names, source folder, destination folder)
for split_files, split_source, split_target in (
    (train_images, img_path, train_img_dir),
    (train_masks, mask_path, train_mask_dir),
    (val_images, img_path, val_img_dir),
    (val_masks, mask_path, val_mask_dir),
    (test_images, img_path, test_img_dir),
    (test_masks, mask_path, test_mask_dir),
):
    copy_files(split_files, split_source, split_target)

print("Files have been split and copied successfully.")

Files have been split and copied successfully.


# Start Preprocessing

In [76]:
import os
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator 
from tensorflow import keras

In [78]:
# Notebook-wide configuration: seed, batch sizes, image size and data folders.
SEED = 909
BATCH_SIZE_TRAIN = BATCH_SIZE_TEST = 32

IMAGE_HEIGHT = IMAGE_WIDTH = 512
IMG_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH)

# Root of the split data set; each split holds an 'img' and a 'mask' folder.
data_dir = 'Processed data directory with tran|val|test folders'

data_dir_train = os.path.join(data_dir, 'train')
data_dir_train_image, data_dir_train_mask = (
    os.path.join(data_dir_train, leaf) for leaf in ('img', 'mask')
)

# Note: the "test" names below point at the 'val' split.
data_dir_test = os.path.join(data_dir, 'val')
data_dir_test_image, data_dir_test_mask = (
    os.path.join(data_dir_test, leaf) for leaf in ('img', 'mask')
)

## Segmentation data generator with Augmentation

In [80]:
import tensorflow as tf
import numpy as np
import random
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from scipy.ndimage import gaussian_filter, map_coordinates

# NOTE(review): both names are already assigned in the "Define constants" cell
# (IMG_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH), i.e. the same (512, 512); SEED = 909).
# When the cells run top to bottom, these assignments win, so the generators
# below use SEED = 42, not 909 — confirm which seed is intended.
IMG_SIZE = (512, 512) 
SEED = 42 

def elastic_transform(image, mask, alpha, sigma, random_state=None):
    """Apply the same random elastic deformation to an image and its mask.

    A random displacement field in [-1, 1] is smoothed with a Gaussian filter
    of width `sigma` and scaled by `alpha`; image and mask are then resampled
    at the displaced pixel positions.

    Args:
        image: array of shape (H, W) or channel-last (H, W, C).
        mask: array of shape (H, W) or (H, W, C); its first two dimensions are
            expected to match those of `image`.
        alpha: displacement scale in pixels (0 leaves the data unchanged).
        sigma: standard deviation of the Gaussian smoothing of the field.
        random_state: None (non-deterministic), an int seed, or a
            `np.random.RandomState` instance, for reproducible deformations.

    Returns:
        (distorted_image, distorted_mask) with the same shapes as the inputs.
        The image is interpolated linearly; the mask uses nearest-neighbour
        sampling so that label values are never blended into new values.
    """
    image = np.asarray(image)
    mask = np.asarray(mask)
    if not isinstance(random_state, np.random.RandomState):
        random_state = np.random.RandomState(random_state)

    shape = image.shape[:2]
    dx = gaussian_filter((random_state.rand(*shape) * 2 - 1), sigma) * alpha
    dy = gaussian_filter((random_state.rand(*shape) * 2 - 1), sigma) * alpha

    x, y = np.meshgrid(np.arange(shape[1]), np.arange(shape[0]))
    # Row coordinates first, then column coordinates, one entry per pixel.
    coords = np.array([(y + dy).ravel(), (x + dx).ravel()])

    def _warp(array, order):
        """Resample a 2-D array, or each channel of a channel-last 3-D array."""
        if array.ndim == 2:
            return map_coordinates(array, coords, order=order, mode='reflect').reshape(shape)
        warped_channels = [
            map_coordinates(array[..., channel], coords, order=order, mode='reflect').reshape(shape)
            for channel in range(array.shape[-1])
        ]
        return np.stack(warped_channels, axis=-1)

    distorted_image = _warp(image, order=1)
    # order=0 (nearest neighbour): linear interpolation would create fractional
    # values along the edges of a binary mask.
    distorted_mask = _warp(mask, order=0)

    return distorted_image, distorted_mask

def apply_augmentation(img, msk):
    """Randomly flip an image and its mask together.

    A horizontal flip and then a vertical flip are each applied with
    probability 0.5 (one `random.random()` draw per flip); whenever a flip is
    chosen it is applied to both `img` and `msk` so they stay aligned.
    No other augmentation is performed here.

    Returns:
        The (possibly flipped) `(img, msk)` pair.
    """
    for flip in (tf.image.flip_left_right, tf.image.flip_up_down):
        if random.random() > 0.5:
            img, msk = flip(img), flip(msk)
    return img, msk

def preprocess_data(img, msk):
    """Resize an image/mask pair to IMG_SIZE, augment it and scale it by 1/255.

    Args:
        img: image tensor accepted by `tf.image.resize`.
        msk: mask tensor accepted by `tf.image.resize`.

    Returns:
        `(img, msk)` after resizing, `apply_augmentation` and division by 255.
    """
    img = tf.image.resize(img, IMG_SIZE)
    # Nearest-neighbour keeps the mask's label values intact; the default
    # bilinear method would blend foreground and background along edges.
    msk = tf.image.resize(msk, IMG_SIZE, method='nearest')
    # 'nearest' returns the input dtype (e.g. uint8), so cast before the float
    # division below.
    msk = tf.cast(msk, tf.float32)

    img, msk = apply_augmentation(img, msk)

    # NOTE(review): assumes pixel values in [0, 255] — confirm; data that was
    # already rescaled (e.g. by ImageDataGenerator(rescale=1./255)) must not
    # be passed through this function again.
    img = img / 255.0
    msk = msk / 255.0

    return img, msk

def create_segmentation_generator(img_path, msk_path, BATCH_SIZE):
    """Yield `(image_batch, mask_batch)` pairs indefinitely.

    Images under `img_path` and masks under `msk_path` are read as grayscale,
    resized to IMG_SIZE and multiplied by 1/255. Both directory iterators are
    built with identical options, including the same SEED.

    NOTE(review): the shared seed presumably keeps the shuffled order of images
    and masks aligned; this relies on both folders listing their files in the
    same order — confirm.
    """
    flow_options = dict(
        target_size=IMG_SIZE,
        class_mode=None,
        color_mode='grayscale',
        batch_size=BATCH_SIZE,
        seed=SEED,
    )
    rescaler = ImageDataGenerator(rescale=1./255)

    image_batches = rescaler.flow_from_directory(img_path, **flow_options)
    mask_batches = rescaler.flow_from_directory(msk_path, **flow_options)

    while True:
        # Tuple elements are evaluated left to right: image batch first, then mask batch.
        yield next(image_batches), next(mask_batches)


In [56]:
 def create_segmentation_generator_test(img_path, msk_path, BATCH_SIZE):
     """Endless generator of `(image_batch, mask_batch)` pairs for evaluation.

     Both folders are read as grayscale, resized to IMG_SIZE, scaled by 1/255
     and iterated with the same SEED. No augmentation is applied.
     """
     datagen = ImageDataGenerator(**dict(rescale=1./255))

     def _open_stream(directory):
         """Build one directory iterator with the options shared by images and masks."""
         return datagen.flow_from_directory(
             directory,
             target_size=IMG_SIZE,
             class_mode=None,
             color_mode='grayscale',
             batch_size=BATCH_SIZE,
             seed=SEED,
         )

     img_generator = _open_stream(img_path)
     msk_generator = _open_stream(msk_path)

     while True:
         image_batch = next(img_generator)
         mask_batch = next(msk_generator)
         yield image_batch, mask_batch

def create_segmentation_generator_train(img_path, msk_path, BATCH_SIZE):
    """Endless generator of `(image_batch, mask_batch)` pairs for training.

    Images and masks are loaded as grayscale at IMG_SIZE and scaled by 1/255;
    the two directory iterators share every option, including SEED.
    As written, no augmentation is applied here.
    """
    datagen = ImageDataGenerator(rescale=1./255)

    # Generator expression: the image iterator is created first, then the mask iterator.
    image_flow, mask_flow = (
        datagen.flow_from_directory(
            folder,
            target_size=IMG_SIZE,
            class_mode=None,
            color_mode='grayscale',
            batch_size=BATCH_SIZE,
            seed=SEED,
        )
        for folder in (img_path, msk_path)
    )

    while True:
        batch_of_images = next(image_flow)
        batch_of_masks = next(mask_flow)
        yield batch_of_images, batch_of_masks


In [82]:
# Batch generators used by model.fit below. `test_generator` reads the folders
# held in data_dir_test_image / data_dir_test_mask.
train_generator = create_segmentation_generator(
    img_path=data_dir_train_image,
    msk_path=data_dir_train_mask,
    BATCH_SIZE=BATCH_SIZE_TRAIN,
)
test_generator = create_segmentation_generator(
    img_path=data_dir_test_image,
    msk_path=data_dir_test_mask,
    BATCH_SIZE=BATCH_SIZE_TEST,
)

In [84]:
def display(display_list):
    """Show up to three arrays side by side in one row.

    Panels are titled, in order, 'Input Image', 'True Mask' and
    'Predicted Mask'; passing more than three entries raises IndexError.
    Each entry is converted with `array_to_img` and drawn with a gray colormap.
    """
    plt.figure(figsize=(15,15))

    panel_titles = ['Input Image', 'True Mask', 'Predicted Mask']
    n_panels = len(display_list)

    for position, panel in enumerate(display_list, start=1):
        plt.subplot(1, n_panels, position)
        plt.title(panel_titles[position - 1])
        as_image = tf.keras.preprocessing.image.array_to_img(panel)
        plt.imshow(as_image, cmap='gray')
    plt.show()

In [None]:
def show_dataset(datagen, num=1):
    """Draw `num` batches from `datagen` and display the first image/mask pair of each."""
    for _ in range(num):
        image_batch, mask_batch = next(datagen)
        first_pair = [image_batch[0], mask_batch[0]]
        display(first_pair)


show_dataset(test_generator, num=2)

In [88]:
def unet(n_levels, initial_features=32, n_blocks=2, kernel_size=3, pooling_size=2, in_channels=1, out_channels=1):
    """Build a U-Net style Keras model for IMAGE_HEIGHT x IMAGE_WIDTH inputs.

    Args:
        n_levels: number of resolution levels in the contracting path.
        initial_features: filters at the first level; level `k` uses
            `initial_features * 2 ** k` filters.
        n_blocks: Conv2D layers applied per level (in both paths).
        kernel_size: kernel size of the Conv2D / Conv2DTranspose layers.
        pooling_size: MaxPool2D size, also used as the transpose-conv stride.
        in_channels: channels of the input image.
        out_channels: channels of the output; 1 selects a sigmoid output,
            anything else a softmax.

    Returns:
        A `keras.Model` named 'UNET-L{n_levels}-F{initial_features}'.
    """
    model_input = keras.layers.Input(shape=(IMAGE_HEIGHT, IMAGE_WIDTH, in_channels))
    conv_kwargs = dict(kernel_size=kernel_size, activation='relu', padding='same')
    deepest = n_levels - 1

    # Contracting path: conv blocks, remembering each level's output (except
    # the deepest) for the skip connections before pooling.
    features = model_input
    encoder_outputs = []
    for depth in range(n_levels):
        width = initial_features * 2 ** depth
        for _ in range(n_blocks):
            features = keras.layers.Conv2D(width, **conv_kwargs)(features)
        if depth == deepest:
            continue
        encoder_outputs.append(features)
        features = keras.layers.MaxPool2D(pooling_size)(features)

    # Expanding path: upsample, concatenate the matching skip, conv blocks.
    for depth in range(deepest - 1, -1, -1):
        width = initial_features * 2 ** depth
        features = keras.layers.Conv2DTranspose(width, strides=pooling_size, **conv_kwargs)(features)
        features = keras.layers.Concatenate()([features, encoder_outputs[depth]])
        for _ in range(n_blocks):
            features = keras.layers.Conv2D(width, **conv_kwargs)(features)

    # 1x1 convolution producing the per-pixel output.
    if out_channels == 1:
        final_activation = 'sigmoid'
    else:
        final_activation = 'softmax'
    prediction = keras.layers.Conv2D(out_channels, kernel_size=1, activation=final_activation, padding='same')(features)
    return keras.Model(inputs=[model_input], outputs=[prediction], name=f'UNET-L{n_levels}-F{initial_features}')

In [90]:
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
# Training callbacks: keep only the best checkpoint on disk and lower the
# learning rate when the validation loss stops improving.
model_path = "model_checkpoint.h5"
callbacks = [
    ModelCheckpoint(
        filepath=model_path,
        save_best_only=True,
        verbose=1,
    ),
    ReduceLROnPlateau(
        monitor='val_loss',
        patience=2,
        factor=0.1,
        min_lr=0.0001,
        verbose=1,
    ),
]

In [92]:
# Tiny constant that keeps the Dice ratio defined when both masks are empty.
smooth = 1e-15
def dice_coef(y_true, y_pred):
    """Dice coefficient between a ground-truth mask and a predicted mask.

    Computes (2 * sum(y_true * y_pred) + smooth) /
             (sum(y_true) + sum(y_pred) + smooth)
    over all elements of the batch.

    Args:
        y_true: ground-truth mask tensor; cast to the dtype of `y_pred`, so
            integer or boolean masks are accepted as well.
        y_pred: predicted mask tensor with the same number of elements.

    Returns:
        A scalar tensor.
    """
    # Plain reshapes instead of building new Keras Flatten layers on every call.
    y_pred = tf.reshape(tf.convert_to_tensor(y_pred), [-1])
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

In [94]:
from tensorflow.keras.metrics import AUC

# Hard-coded sample counts; NOTE(review): presumably the number of files in the
# train and validation folders — confirm whenever the data set changes.
NUM_TRAIN, NUM_TEST = 7694, 961

# Whole batches per epoch (floor division drops a trailing partial batch).
EPOCH_STEP_TRAIN = NUM_TRAIN // BATCH_SIZE_TRAIN
EPOCH_STEP_TEST = NUM_TEST // BATCH_SIZE_TEST

model = unet(n_levels=4)
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=[dice_coef, 'accuracy'],
)

In [96]:
# Print the layer-by-layer summary of the compiled model (output follows below).
model.summary()

Model: "UNET-L4-F32"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 512, 512, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_45 (Conv2D)             (None, 512, 512, 32  320         ['input_4[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_46 (Conv2D)             (None, 512, 512, 32  9248        ['conv2d_45[0][0]']              
                                )                                                       

In [None]:
# Train for 40 epochs. The generators never end, so the step counts define how
# many batches make up one training / validation pass.
history = model.fit(
    train_generator,
    epochs=40,
    steps_per_epoch=EPOCH_STEP_TRAIN,
    validation_data=test_generator,
    validation_steps=EPOCH_STEP_TEST,
    callbacks=callbacks,
)

In [100]:
    def plot_metric(history, metric, save_path):
        """Plot the training and validation curves of one metric and save them.

        Args:
            history: object whose `.history` dict holds the per-epoch values
                under `metric` and `val_<metric>`.
            metric: key of the metric in `history.history`.
            save_path: file the figure is written to. The figure is closed
                after saving, so nothing is shown inline.
        """
        fig, ax = plt.subplots(figsize=(10, 6))

        train_values = history.history[metric]
        ax.plot(train_values, label=f"Train {metric}")
        validation_values = history.history[f"val_{metric}"]
        ax.plot(validation_values, label=f"Validation {metric}")

        pretty_name = metric.capitalize()
        ax.set_title(f"Training and Validation {pretty_name}")
        ax.set_xlabel("Epochs")
        ax.set_ylabel(pretty_name)
        ax.legend()
        ax.grid()

        fig.savefig(save_path)
        plt.close(fig)

In [102]:
# Output files for the training curves.
save_path_loss = "Path/loss.png"
save_path_accuracy = "Path/accuracy.png"
save_path_dice = "Path/dice.png"

# One figure per metric, in the order loss, Dice coefficient, accuracy.
for metric_key, figure_file in (
    ("loss", save_path_loss),
    ("dice_coef", save_path_dice),
    ("accuracy", save_path_accuracy),
):
    plot_metric(history, metric_key, figure_file)


In [104]:
# Persist the trained model to an HDF5 file. Despite the file name, this calls
# model.save (not save_weights).
model.save('Path/trained_weight.h5')

# Testing Your Predictions on the CPU

In [18]:
import h5py
# Strip every '"groups": 1,' entry from the model configuration stored in the
# HDF5 file, in place.
# NOTE(review): presumably needed so that a Keras version that does not know the
# `groups` argument can load the file — confirm.
savedModelPath = 'Weight_file.h5'
# The context manager closes the file even if the patch or the assertion fails.
with h5py.File(savedModelPath, mode="r+") as f:
    model_config_string = f.attrs.get("model_config")
    # A file without a "model_config" attribute has nothing to patch.
    if model_config_string is not None and '"groups": 1,' in model_config_string:
        model_config_string = model_config_string.replace('"groups": 1,', '')
        f.attrs.modify('model_config', model_config_string)
        f.flush()
        # Read the attribute back to verify that the rewrite took effect.
        model_config_string = f.attrs.get("model_config")
        assert '"groups": 1,' not in model_config_string

## Save predictions

In [260]:
from keras.models import load_model
# compile=False: the model was compiled with the custom metric `dice_coef`,
# which load_model cannot restore unless it is passed via custom_objects.
# Prediction does not need the compiled loss/metrics, so skip compilation.
model = load_model('Weight_file.h5', compile=False)

In [311]:
from PIL import Image
import numpy as np

def load_and_preprocess_image(image_path, target_size):
    """Load an image file and turn it into a single-sample model input.

    The image is converted to 8-bit grayscale ('L'), resized to `target_size`
    (passed straight to PIL's `resize`), divided by 255.0 and given a leading
    batch axis and a trailing channel axis.

    Returns:
        An array of shape (1, rows, cols, 1).
    """
    grayscale = Image.open(image_path).convert('L')
    resized = grayscale.resize(target_size)

    # Same scaling as the training generators (rescale=1./255).
    pixels = np.array(resized) / 255.0

    # (rows, cols) -> (1, rows, cols, 1): batch axis in front, channel axis last.
    return pixels[np.newaxis, ..., np.newaxis]
    
# Example usage: prepare one image for inference.
image_path = 'image.png'
target_size = (512, 512)  # Set this to the size your model expects
img_array = load_and_preprocess_image(image_path=image_path, target_size=target_size)

In [313]:
# Predict, drop the batch axis and threshold at 0.5 to get a 0/1 mask.
predicted_mask = model.predict(img_array)
predicted_mask = np.squeeze(predicted_mask, axis=0)
predicted_mask = (predicted_mask > 0.5).astype(np.uint8)

# Only the original size is needed; the context manager closes the file again.
with Image.open(image_path) as original_image:
    original_size = original_image.size

# Scale 0/1 to 0/255 for the saved file: a PNG holding only 0 and 1 looks
# completely black, and the training generators divide masks by 255.
mask_image = Image.fromarray(predicted_mask.squeeze() * np.uint8(255))
# Nearest-neighbour resizing keeps the mask strictly two-valued.
mask_image = mask_image.resize(original_size, Image.NEAREST)
mask_image.save('test.png')

# pred_mask = (model.predict(image_path)[0] > 0.5).astype(np.uint8)

