### Training the UNet model on the original dataset


In [8]:
import os
import numpy as np
import cv2
import tensorflow as tf
from glob import glob
from tensorflow.keras.layers import (
    Conv2D, BatchNormalization, MaxPooling2D, UpSampling2D, concatenate,
    Activation, Conv2DTranspose, Input
)
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger

### Seeding

In [9]:
# Seeding for reproducibility
os.environ["PYTHONHASHSEED"] = str(42)
np.random.seed(42)
tf.random.set_seed(42)

### Hyperparameters

In [10]:
# Hyperparameters
batch_size = 8
lr = 1e-4
epochs = 100
height = 768
width = 512
num_classes = 3  # "My Way", "Other Way", "Non-Drivable Area"

### Path

In [12]:
# Paths
dataset_path = "/home/ahsan/University/Thesis/UNet_Directory/Datasets/second_phase/processed_dataset/aug"
files_dir = "/home/ahsan/University/Thesis/UNet_Directory/Datasets/second_phase/files"
model_file = os.path.join(files_dir, "unet-multiclass.h5")
log_file = os.path.join(files_dir, "log-multiclass.csv")

### Creating folder

In [11]:
# Ensure directories exist
os.makedirs(files_dir, exist_ok=True)
if not os.path.exists(dataset_path):
    raise FileNotFoundError(f"Dataset path {dataset_path} does not exist.")

In [13]:
# create_dir(files_dir)

### UNET (same code as implementation notebook)


#### Convolutional Block

In [15]:
def conv_block(inputs, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

#### Encoder Block

In [14]:
def encoder_block(inputs, num_filters):
    x = conv_block(inputs, num_filters)
    p = MaxPooling2D((2, 2))(x)
    return x, p

### Decoder Block

In [16]:
def decoder_block(inputs, skip, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs)
    x = concatenate([x, skip])
    x = conv_block(x, num_filters)
    return x

### UNet Model

In [17]:
def build_unet(input_shape, num_classes):
    inputs = Input(input_shape)

    # Encoder
    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    # Bridge
    b1 = conv_block(p4, 1024)

    # Decoder
    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    outputs = Conv2D(num_classes, 1, padding="same", activation="softmax")(d4)
    model = Model(inputs, outputs, name="UNET")

### Dataset Pipeline

#### Loading the training and validation dataset

In [None]:
# Dataset Loading
def load_data(path):
    train_x = sorted(glob(os.path.join(path, "train", "images", "*")))
    train_y = sorted(glob(os.path.join(path, "train", "masks", "*")))
    valid_x = sorted(glob(os.path.join(path, "valid", "images", "*")))
    valid_y = sorted(glob(os.path.join(path, "valid", "masks", "*")))
    return (train_x, train_y), (valid_x, valid_y)

#### Reading the images

In [None]:
# Preprocessing Functions
def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (width, height))  # Ensure the correct image size
    x = x / 255.0
    return x.astype(np.float32)

#### Reading the masks

In [None]:
def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (width, height))
    x = np.clip(x, 0, num_classes - 1).astype(np.int32)  # Ensure valid class values
    x = tf.one_hot(x, num_classes)
    return x.numpy().astype(np.float32)

#### tf.data pipeline

In [None]:
def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([height, width, 3])
    y.set_shape([height, width, num_classes])
    return x, y


In [None]:
def tf_dataset(x, y, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch, drop_remainder=True)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

In [None]:
# IoU Metric
def iou_metric(y_true, y_pred):
    y_pred = tf.argmax(y_pred, axis=-1)
    y_true = tf.argmax(y_true, axis=-1)
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
    return intersection / (union + tf.keras.backend.epsilon())

### Training

In [None]:
# Load dataset
(train_x, train_y), (valid_x, valid_y) = load_data(dataset_path)

print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")

In [None]:
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

In [None]:
# Build model
input_shape = (height, width, 3)
model = build_unet(input_shape, num_classes)
# model.summary()

In [None]:
# Learning rate scheduling
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=lr, decay_steps=10000, decay_rate=0.9
)

In [None]:
model.compile(
    loss="categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    metrics=["accuracy", iou_metric]
)

In [None]:
# Callbacks
callbacks = [
    ModelCheckpoint(model_file, verbose=1, save_best_only=True),
    ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=4),
    CSVLogger(log_file),
    EarlyStopping(monitor="val_loss", patience=20, restore_best_weights=True),
]

In [None]:
# Train the model
model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=callbacks
)

print("Training complete. Model saved at:", model_file)