Import & batch size

In [None]:
import os
import matplotlib.pyplot as plt # Needed when showing examples
import cv2                      # Needed when showing examples
import numpy as np

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"  # ignore TF unsupported NUMA warnings


# Filter out TFA warning
#import warnings
#warnings.filterwarnings("ignore", message="TFA has entered a minimal maintenance and release mode", category=Warning)


import tensorflow as tf
import tensorflow_addons as tfa


# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)



BATCH_SIZE = 4

Importing dataset

In [None]:
print("Started dataset loading...")

#input_path = "/mnt/c/Users/jacop/Desktop/DL_Project/processed_dataset/" #if in wsl
input_path = "/mnt/c/Users/vitto/Desktop/DL project/DL project github/processed_dataset/" #if in wsl
#input_path = r"C:\Users\vitto\Desktop\DL project\DL project github\processed_dataset"  # if in windows
#input_path = r"C:\Users\jacop\Desktop\DL_Project\processed_dataset"  # if in windows


# =========== Images =========== #


# Definizione della funzione per l'elaborazione delle immagini
def process_image(x):
    # Leggi il contenuto del file immagine come sequenza di byte
    byte_img = tf.io.read_file(x)
    # Decodifica l'immagine utilizzando il formato JPEG
    img = tf.io.decode_jpeg(byte_img)
    # Convert images to grayscale
    img = tf.image.rgb_to_grayscale(img)
    img = tf.squeeze(img, axis=-1)
    # Normalizza i valori dei pixel nell'intervallo [0, 1]
    img = img / 255
    return img


# Creazione di un dataset per le immagini di addestramento
train_images = tf.data.Dataset.list_files(
    input_path + "/images/train/*.jpg", shuffle=False
)
# Applica la funzione process_image a ciascun elemento del dataset
train_images = train_images.map(process_image)

# Creazione di un dataset per le immagini di test
test_images = tf.data.Dataset.list_files(
    input_path + "/images/test/*.jpg", shuffle=False
)
# Applica la funzione process_image a ciascun elemento del dataset
test_images = test_images.map(process_image)

# Creazione di un dataset per le immagini di validazione
val_images = tf.data.Dataset.list_files(input_path + "/images/val/*.jpg", shuffle=False)
# Applica la funzione process_image a ciascun elemento del dataset
val_images = val_images.map(process_image)


# =========== Labels =========== #


# Definizione di una funzione per caricare le etichette da un file di testo
def load_labels(path):
    # Inizializza una lista vuota per le etichette
    landmarks = []

    # Apre il file di etichette specificato da `path` in modalità di lettura con encoding "utf-8"
    with open(path.numpy(), "r", encoding="utf-8") as file:
        # Salta la prima riga del file
        next(file, None)

        # Itera attraverso le righe rimanenti nel file
        for line in file:
            # Divide ciascuna riga in colonne utilizzando il carattere di tabulazione '\t' come delimitatore
            columns = line.strip().split("\t")

            # Estrae i valori X e Y dalle colonne 1 e 2 (considerando gli indici 0-based)
            # e li converte in valori float. Inoltre, li normalizza dividendo per 256
            x_value = float(columns[1]) / 256
            y_value = float(columns[2]) / 256

            # Aggiunge i valori X e Y normalizzati alla lista `landmarks`
            landmarks.extend([x_value, y_value])

    # Convert `landmarks` to a NumPy array
    return np.array(landmarks)


# Create train, test and val label dataset and load their labels using the corresponding function
train_labels = tf.data.Dataset.list_files(input_path + "/labels/train/*.txt", shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))

test_labels = tf.data.Dataset.list_files(input_path + "/labels/test/*.txt", shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))

val_labels = tf.data.Dataset.list_files(input_path + "/labels/val/*.txt", shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))


# =========== Combine Images and Labels =========== #

# Creazione del dataset di addestramento combinando immagini ed etichette
train = tf.data.Dataset.zip((train_images, train_labels))
test = tf.data.Dataset.zip((test_images, test_labels))
val = tf.data.Dataset.zip((val_images, val_labels))

"""
# Mescola casualmente l'ordine degli elementi nel dataset di addestramento
train = train.shuffle(1500)

# Raggruppa gli elementi del dataset in batch di dimensione specificata da `BATCH_SIZE`
train = train.batch(BATCH_SIZE)

# Implementa il prefetching per caricare in anticipo i dati del prossimo batch
train = train.prefetch(4)

# Lo stesso procedimento viene ora applicato ai dataset di test e validazione


# Mescola casualmente l'ordine degli elementi nel dataset di test
test = test.shuffle(1000)

# Raggruppa gli elementi del dataset in batch di dimensione specificata da `BATCH_SIZE`
test = test.batch(BATCH_SIZE)

# Implementa il prefetching per caricare in anticipo i dati del prossimo batch
test = test.prefetch(4)

# Mescola casualmente l'ordine degli elementi nel dataset di validazione
val = val.shuffle(1000)

# Raggruppa gli elementi del dataset in batch di dimensione specificata da `BATCH_SIZE`
val = val.batch(BATCH_SIZE)

# Implementa il prefetching per caricare in anticipo i dati del prossimo batch
val = val.prefetch(4)
"""

"""
# =========== Show some examples =========== #

data_samples = train.as_numpy_iterator()
res = data_samples.next()

fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = res[0][idx]
    sample_coords = res[1][0][idx]
    
    for i in range(0, len(sample_coords), 2):
        x, y = sample_coords[i], sample_coords[i + 1]
        x_pixel = int(x * 256)
        y_pixel = int(y * 256)
        cv2.circle(sample_image, (x_pixel, y_pixel), 2, (255, 0, 0), -1)
    
    ax[idx].imshow(sample_image,cmap='gray', vmin=0, vmax=1)
plt.show()
"""

U-net generators

In [None]:
print("Started U-Net training...")
 
# We only need images in this part
train_images_only = train.map(lambda x, y: x)
test_images_only = test.map(lambda x, y: x)
val_images_only = val.map(lambda x, y: x)


# Create a generator function to yield the combined image tensor
train_list = list(train_images_only)
def train_image_generator():
    for moving_image in train_list:
        # The first image in train_images_only is considered as "fixed_image"
        fixed_image = train_list[1]

        # Combine fixed and moving images into a single tensor
        combined_image = tf.stack([fixed_image,moving_image],-1)
        yield (combined_image, fixed_image)


# Create a TensorFlow dataset from the generator
train_images_dataset = tf.data.Dataset.from_generator(
    train_image_generator,
    output_signature=(tf.TensorSpec(shape=(256, 256, 2), dtype=tf.float32),
                      tf.TensorSpec(shape=(256, 256), dtype=tf.float32))
    )
train_images_dataset = train_images_dataset.shuffle(1000).batch(BATCH_SIZE)

val_list = list(val_images_only)



# Create a generator function to yield the combined image tensor for validation
def val_image_generator():
    for moving_image in val_list:
        fixed_image = train_list[1]
        combined_image = tf.stack([fixed_image, moving_image], -1)
        yield (combined_image, fixed_image)

# Create a TensorFlow dataset from the generator for validation
val_images_dataset = tf.data.Dataset.from_generator(
    val_image_generator,
    output_signature=(
        tf.TensorSpec(shape=(256, 256, 2), dtype=tf.float32),
        tf.TensorSpec(shape=(256, 256), dtype=tf.float32)
    )
)

# No need to shuffle for the validation set
val_images_dataset = val_images_dataset.batch(BATCH_SIZE)

U-net downsampling path

In [None]:
input_shape = (256, 256, 2)


# Due immagini, una fixed e una moving, prese come input
input = tf.keras.layers.Input(shape=input_shape)

### Downsampling path ###

# Applicazione della funzione di attivazione Leaky ReLU con coefficiente alpha
a1 = tf.keras.layers.LeakyReLU(alpha=0.01)(input)

# Applicazione di un layer di convoluzione 2D con 64 filtri di dimensione 3x3
c1 = tf.keras.layers.Conv2D(64, 3, padding="same", kernel_initializer="he_normal")(a1)

# Applicazione del layer di Dropout con un tasso di dropout del 10%
c1 = tf.keras.layers.Dropout(0.1)(c1)

# Applicazione del max-pooling con una finestra di pooling di dimensione 2x2
p1 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c1)


a2 = tf.keras.layers.LeakyReLU(alpha=0.01)(p1)
c2 = tf.keras.layers.Conv2D(128, 3, padding="same", kernel_initializer="he_normal")(a2)
c2 = tf.keras.layers.Dropout(0.1)(c2)
p2 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c2)

a3 = tf.keras.layers.LeakyReLU(alpha=0.01)(p2)
c3 = tf.keras.layers.Conv2D(256, 3, padding="same", kernel_initializer="he_normal")(a3)
c3 = tf.keras.layers.Dropout(0.2)(c3)
p3 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c3)

a4 = tf.keras.layers.LeakyReLU(alpha=0.01)(p3)
c4 = tf.keras.layers.Conv2D(512, 3, padding="same", kernel_initializer="he_normal")(a4)
c4 = tf.keras.layers.Dropout(0.2)(c4)
p4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c4)

a5 = tf.keras.layers.LeakyReLU(alpha=0.01)(p4)
c5 = tf.keras.layers.Conv2D(1024, 3, padding="same", kernel_initializer="he_normal")(a5)
c5 = tf.keras.layers.Dropout(0.3)(c5)