In [8]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from tensorflow.keras.layers import (
    Conv2D, BatchNormalization, Activation, MaxPool2D,
    Conv2DTranspose, Concatenate, Input
)
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import (
    EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger
)
from tensorflow.keras.optimizers import Adam

In [11]:
# Set seeds for reproducibility
os.environ["PYTHONHASHSEED"] = str(42)
np.random.seed(42)
tf.random.set_seed(42)

# Hyperparameters
batch_size = 1
epochs = 50
height, width = 640, 640  # Ensure height and width are defined
input_shape = (height, width, 3)
lr = 1e-4  # Learning rate

# Define convolution block
def conv_block(inputs, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

# Define encoder block
def encoder_block(inputs, num_filters):
    x = conv_block(inputs, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p

# Define decoder block
def decoder_block(inputs, skip, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs)
    x = Concatenate()([x, skip])
    x = conv_block(x, num_filters)
    return x

# Build U-Net model for segmentation
def build_unet(input_shape):
    inputs = Input(input_shape)

    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    bl = conv_block(p4, 1024)

    dl = decoder_block(bl, s4, 512)
    d2 = decoder_block(dl, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    segmentation_output = Conv2D(1, 1, padding="same", activation="sigmoid", name="segmentation")(d4)
    model = Model(inputs, segmentation_output, name="UNET")
    return model

# Define paths and create directories
data_dir = r"/home/user/unet/data"
files_dir = data_dir

def create_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Directory {directory} created.")
    else:
        print(f"Directory {directory} already exists.")

create_dir(files_dir)
model_file = os.path.join(files_dir, "unet-model.keras")
log_file = os.path.join(files_dir, "log.csv")

# Function to load and split data
def load_data(data_dir):
    def get_data_paths(subset):
        images = sorted(glob(os.path.join(data_dir, subset, "*.png")))
        masks = sorted(glob(os.path.join(data_dir, subset, "masks", "*.png")))
        return images, masks
    
    train_x, train_y = get_data_paths("train")
    valid_x, valid_y = get_data_paths("val")
    test_x, test_y = get_data_paths("test")

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

# Functions to read images and masks
def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (width, height))
    x = x / 255.0
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (width, height))
    x = x / 255.0
    x = np.expand_dims(x, axis=-1)
    return x

# TensorFlow parsing and dataset functions
def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float64, tf.float64])
    x.set_shape([height, width, 3])
    y.set_shape([height, width, 1])
    return x, y

def tf_dataset(x, y, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

# Load the dataset
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(data_dir)
print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")
print(f"Test: {len(test_x)} - {len(test_y)}")

# Create datasets
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)
test_dataset = tf_dataset(test_x, test_y, batch=batch_size)

for x, y in valid_dataset:
    print(x.shape, y.shape)
    break

# Build and compile the model
model = build_unet(input_shape)
opt = Adam(learning_rate=lr)
model.compile(loss="binary_crossentropy", optimizer=opt, metrics=["accuracy"])

model.summary()

# Callbacks for training
callbacks = [
    ModelCheckpoint(model_file, verbose=1, save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4),
    CSVLogger(log_file),
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)
]

# Train the model
model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=callbacks
)

# Evaluate the model on the test set
model.evaluate(test_dataset)

Directory /home/user/unet/data already exists.
Train: 401 - 401
Valid: 86 - 86
Test: 86 - 86
(1, 640, 640, 3) (1, 640, 640, 1)


Epoch 1/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 217ms/step - accuracy: 0.7561 - loss: 0.5261
Epoch 1: val_loss improved from inf to 0.48134, saving model to /home/user/unet/data/unet-model.keras
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m157s[0m 299ms/step - accuracy: 0.7561 - loss: 0.5260 - val_accuracy: 0.8378 - val_loss: 0.4813 - learning_rate: 1.0000e-04
Epoch 2/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 223ms/step - accuracy: 0.7883 - loss: 0.4300
Epoch 2: val_loss improved from 0.48134 to 0.38120, saving model to /home/user/unet/data/unet-model.keras
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m99s[0m 247ms/step - accuracy: 0.7883 - loss: 0.4300 - val_accuracy: 0.8469 - val_loss: 0.3812 - learning_rate: 1.0000e-04
Epoch 3/50
[1m401/401[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 256ms/step - accuracy: 0.7902 - loss: 0.4259
Epoch 3: val_loss improved from 0.38120 to 0.36615, saving mo

[0.33091747760772705, 0.8394681215286255]

In [7]:
# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")


[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 84ms/step - accuracy: 0.8263 - loss: 0.3772
Test Loss: 0.372972697019577
Test Accuracy: 0.8298653960227966
