In [None]:
## Inštalácia potrebných komponentov ##
## Nutné inštalovať každú zvlášť ##

pip install pydot
pip install git+https://github.com/tensorflow/examples.git
pip install graphviz
pip install opencv-python

In [69]:
## Načítanie potrebných knižníc ##

from IPython.display import clear_output
import matplotlib.pyplot as plt
import numpy as np
import cv2
from tensorflow_examples.models.pix2pix import pix2pix
import tensorflow as tf
import os
from tensorflow.keras.layers import Dropout
from sklearn.model_selection import train_test_split

In [70]:
## Paths to the training/testing images and their masks ##
## To train on different data, change DATA_ROOT (or set the SCSS_DATA_ROOT environment variable) ##
## and, if needed, the sub-folder names below. The rest of the code stays unchanged. ##

# Single root for all data folders; the default is the location used originally.
DATA_ROOT = os.environ.get("SCSS_DATA_ROOT", "/home/jovyan/work/bakalar_praca/scss-net/data1")

# The trailing "" component keeps the trailing slash of the original path strings.
train_images_path = os.path.join(DATA_ROOT, "171_train", "171(spoca)", "imgs", "")
train_masks_path = os.path.join(DATA_ROOT, "171_train", "171(spoca)", "masks", "")
test_images_path = os.path.join(DATA_ROOT, "171_test", "imgs", "")
test_masks_path = os.path.join(DATA_ROOT, "171_test", "masks(spoca)", "")

In [71]:
## Map the raw mask labels onto OUTPUT_CLASSES categories ##


# Number of classes the network predicts; also drives the label mapping below.
OUTPUT_CLASSES = 3

# Raw pixel values that occur in the mask files.
unique_labels = [
     0, 18, 54, 66, 43, 9, 40, 114, 185, 88, 2, 38, 177, 232, 254, 255, 248, 216,
     242, 237, 17, 11, 33, 178, 174, 133, 168, 63, 164, 241, 28, 8, 152, 214, 49, 16,
     192, 219, 14, 50, 244, 202, 68, 10, 153, 235, 190, 65, 64, 36, 52, 239, 176, 62,
     116, 209, 228, 106, 24, 167, 166, 238, 213, 208, 23, 51, 187, 15, 31, 206, 84, 224,
     245, 200, 247, 196, 105, 57, 173, 21, 136, 227, 108, 6, 205, 253, 160, 243, 218, 87,
     123, 59, 37, 55, 91, 29, 56, 128, 77, 231, 229, 110, 41, 230, 93, 79, 58
]

# Raw label 0 is pinned to class 0.
label_map = {
    0: 0}

# Every other raw label gets a class index in round-robin order
# (0, 1, ..., OUTPUT_CLASSES - 1, 0, ...), following the order of `unique_labels`.
# NOTE(review): the class a label receives depends only on its position in the
# list, and class 0 is shared between raw label 0 and every OUTPUT_CLASSES-th
# other label - confirm this grouping is intended.
class_counter = 0
for label in unique_labels:
    if label not in label_map:
        label_map[label] = class_counter
        # Wrap using OUTPUT_CLASSES so the mapping always matches the model's output size.
        class_counter = (class_counter + 1) % OUTPUT_CLASSES

In [72]:
## Load and preprocess one image / mask pair ##

def load_and_preprocess_image(image_path, mask_path):
    """Read an image and its mask from disk and prepare them for the model.

    Args:
        image_path: path of a PNG image; decoded with 3 channels.
        mask_path: path of a PNG mask; decoded with 1 channel.

    Returns:
        A tuple ``(image, new_mask)``: ``image`` is float32, resized to
        256x256 and divided by 255; ``new_mask`` is the mask resized to
        256x256 with nearest-neighbour interpolation and with every raw pixel
        value replaced by its class from ``label_map``. Raw values that are
        not keys of ``label_map`` become 0.
    """
    target_size = [256, 256]

    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, target_size)
    image = tf.cast(image, tf.float32) / 255.0

    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=1)
    # Nearest-neighbour keeps the mask values discrete (no blended labels).
    mask = tf.image.resize(mask, target_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # One lookup table indexed by the raw 8-bit pixel value replaces a chain of
    # tf.equal / tf.where ops (one pair per label); unmapped values default to 0.
    lookup = tf.constant([label_map.get(value, 0) for value in range(256)], dtype=tf.uint8)
    new_mask = tf.gather(lookup, tf.cast(mask, tf.int32))

    return image, new_mask

In [73]:
## Training-data augmentation: random horizontal flip, random rotation by a multiple of 90 degrees, random brightness and contrast ##


def augment(image, mask):
    """Apply the same random geometric change to image and mask, then jitter the image colours.

    The flip and the rotation are applied to both tensors so they stay
    aligned; brightness and contrast changes touch only the image.

    Returns:
        The augmented ``(image, mask)`` pair.
    """
    flip_draw = tf.random.uniform(())
    if flip_draw > 0.5:
        image, mask = tf.image.flip_left_right(image), tf.image.flip_left_right(mask)

    # Number of counter-clockwise quarter turns, drawn from {0, 1, 2, 3}.
    quarter_turns = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    image, mask = tf.image.rot90(image, quarter_turns), tf.image.rot90(mask, quarter_turns)

    brightened = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(brightened, lower=0.9, upper=1.1)

    return image, mask

In [74]:
## Dataset construction ##

def create_dataset(image_paths, mask_paths, use_augmentation=True):
    """Build a tf.data pipeline that yields preprocessed (image, mask) pairs.

    Args:
        image_paths: sequence of image file paths.
        mask_paths: sequence of mask file paths, aligned with ``image_paths``.
        use_augmentation: when true, ``augment`` is mapped after loading.

    Returns:
        An unbatched ``tf.data.Dataset``.
    """
    stages = [load_and_preprocess_image]
    if use_augmentation:
        stages.append(augment)

    dataset = tf.data.Dataset.from_tensor_slices((image_paths, mask_paths))
    for stage in stages:
        dataset = dataset.map(stage, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset

In [75]:
## Collect the file paths of the individual data sets ##
## create_dataset() then builds 3 datasets: training, testing and validation (20% of the training files) ##

def _list_files(directory):
    """Return the full paths of the regular files in `directory`, sorted by file name.

    os.listdir() returns entries in arbitrary order, so sorting is required
    for the listing to be reproducible between runs and machines.
    """
    return [
        os.path.join(directory, file_name)
        for file_name in sorted(os.listdir(directory))
        if os.path.isfile(os.path.join(directory, file_name))
    ]


def _paired_files(images_dir, masks_dir):
    """Return (image_files, mask_files) where position i of both lists belongs together.

    Pairing is positional after sorting.
    NOTE(review): this assumes an image and its mask sort to the same position
    (e.g. they share a file name) - confirm against the data folders.

    Raises:
        ValueError: if the two folders hold a different number of files.
    """
    image_files = _list_files(images_dir)
    mask_files = _list_files(masks_dir)
    if len(image_files) != len(mask_files):
        raise ValueError(
            f"Found {len(image_files)} images in {images_dir} "
            f"but {len(mask_files)} masks in {masks_dir}"
        )
    return image_files, mask_files


all_train_images, all_train_masks = _paired_files(train_images_path, train_masks_path)
train_image_files, val_image_files, train_mask_files, val_mask_files = train_test_split(
    all_train_images, all_train_masks, test_size=0.2, random_state=42)

train_dataset = create_dataset(train_image_files, train_mask_files, use_augmentation=True)
val_dataset = create_dataset(val_image_files, val_mask_files, use_augmentation=False)

test_image_files, test_mask_files = _paired_files(test_images_path, test_masks_path)
test_dataset = create_dataset(test_image_files, test_mask_files, use_augmentation=False)

In [None]:
## Print the number of records in the training and testing sets ##

def count_files(directory):
    """Return how many entries of `directory` are regular files (sub-directories are not counted)."""
    return sum(
        1
        for entry in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, entry))
    )

# Count the image files of both folders in one pass. The training count is
# taken from the whole training folder, i.e. before the validation split.
num_train, num_test = (
    count_files(folder) for folder in (train_images_path, test_images_path)
)

print(f"Number of images in training dataset: {num_train}")
print(f"Number of images in testing dataset: {num_test}")

In [77]:
## Key training hyper-parameters ##

LEARNING_RATE = 0.0002
BATCH_SIZE = 23

# Whole batches per epoch. Clamped to at least 1 so that a split holding fewer
# than BATCH_SIZE files does not produce a step count of 0.
STEPS_PER_EPOCH = max(1, len(train_image_files) // BATCH_SIZE)
VALIDATION_STEPS = max(1, len(val_image_files) // BATCH_SIZE)

In [78]:
## Batching of the training, validation and testing data ##

# `train_dataset` already contains the random `augment` step, so caching it
# would store one fixed augmentation per image and replay it every epoch.
# The training pipeline is therefore rebuilt here: the deterministic
# load/preprocess output is cached, and augmentation runs after the cache so
# that fresh random transformations are drawn on every pass.
train_batches = (
    create_dataset(train_image_files, train_mask_files, use_augmentation=False)
    .cache()
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .repeat()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation and test data are not augmented, so caching the batches is safe.
val_batches = val_dataset.batch(BATCH_SIZE).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
test_batches = test_dataset.batch(BATCH_SIZE)

In [79]:
## Image visualisation used during training / testing ##

def display(display_list, title_list):
    """Show the given images side by side in one row.

    Args:
        display_list: images (arrays or tensors) to plot, left to right.
        title_list: one title per entry of ``display_list``.
    """
    panel_count = len(display_list)
    plt.figure(figsize=(15, 5))
    for position, panel in enumerate(display_list):
        plt.subplot(1, panel_count, position + 1)
        plt.title(title_list[position])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
        plt.axis('off')
    plt.show()

In [None]:
## Build and configure the base (encoder) model ##

# MobileNetV2 without its classification head serves as the feature extractor.
# NOTE(review): images are fed in the [0, 1] range here; the Keras docs for
# MobileNetV2 describe inputs scaled to [-1, 1] - confirm this is intended.
base_model = tf.keras.applications.MobileNetV2(input_shape=[256, 256, 3], include_top=False)

# Backbone layers whose activations are exposed by the encoder.
layer_names = [
    'block_1_expand_relu',
    'block_3_expand_relu',
    'block_6_expand_relu',
    'block_13_expand_relu',
    'block_16_project',
]

base_model_outputs = list(
    map(lambda layer_name: base_model.get_layer(layer_name).output, layer_names)
)

# Encoder: one input, one output per tapped layer; its weights are frozen.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

In [81]:
## Up-sampling blocks that enlarge the spatial dimensions of the features during segmentation ##

# One pix2pix upsample block per decoder stage, each created with size 3 and
# with 512, 256, 128 and 64 filters respectively.
up_stack = [pix2pix.upsample(filters, 3) for filters in (512, 256, 128, 64)]

In [82]:
## Definition of the U-Net model ##

def unet_model(output_channels: int, dropout_rate=0.2):
    """Assemble the U-Net from the global `down_stack` encoder and `up_stack` decoder blocks.

    Args:
        output_channels: number of filters of the final transposed convolution.
        dropout_rate: rate of the Dropout layer applied after every skip concatenation.

    Returns:
        A ``tf.keras.Model`` taking 256x256x3 inputs.
    """
    inputs = tf.keras.layers.Input(shape=[256, 256, 3])

    # The last encoder output is the starting point of the decoder; the
    # remaining outputs are the skip connections, consumed last-to-first.
    *skip_features, x = down_stack(inputs)

    for up, skip in zip(up_stack, reversed(skip_features)):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])
        x = Dropout(dropout_rate)(x)

    # This is the last layer of the model
    outputs = tf.keras.layers.Conv2DTranspose(
        filters=output_channels, kernel_size=3, strides=2,
        padding='same')(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

# Build the network with one output channel per class and compile it.
# The loss is configured with from_logits=True, i.e. it is fed the raw model outputs.
model = unet_model(OUTPUT_CLASSES, dropout_rate=0.2)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [84]:
## Post-processing of predicted masks ##

def create_mask(pred_mask):
    """Turn a batch of per-class scores into the class-index mask of its first element.

    The arg-max over the last axis picks the class per pixel; the first batch
    entry is returned with a trailing axis of size 1 appended.
    """
    class_ids = tf.math.argmax(pred_mask, axis=-1)
    first_sample = class_ids[0]
    return first_sample[..., tf.newaxis]

In [85]:
## Display of prediction results ##

def show_predictions(dataset=None, num=4):
    """Plot input image, true mask and predicted mask.

    Args:
        dataset: batched dataset of ``(image, mask)``; the first element of
            each of the first ``num`` batches is shown. When ``None``, the
            module-level ``sample_image`` / ``sample_mask`` are shown instead.
        num: number of batches taken from ``dataset``.

    NOTE(review): a second ``show_predictions`` is defined further down in the
    notebook and replaces this one once its cell has run. The fallback branch
    needs ``sample_image`` and ``sample_mask`` to exist as globals - no cell
    shown here defines them at module level.
    """
    # display() requires one title per panel; calling it without titles raised TypeError.
    titles = ['Input Image', 'True Mask', 'Predicted Mask']
    if dataset is not None:
        for image, mask in dataset.take(num):
            pred_mask = model.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)], titles)
    else:
        display([sample_image, sample_mask,
                 create_mask(model.predict(sample_image[tf.newaxis, ...]))], titles)

In [86]:
## Visual output shown during training ##

class DisplayCallback(tf.keras.callbacks.Callback):
    """After every epoch, show a sample prediction and an augmentation example."""

    def on_epoch_end(self, epoch, logs=None):
        """Clear the cell output and plot a prediction for one training batch.

        Args:
            epoch: zero-based epoch index supplied by Keras.
            logs: metrics dictionary supplied by Keras (unused).
        """
        clear_output(wait=True)

        for images, masks in train_batches.take(1):
            # Use the model this callback is attached to rather than the
            # global `model`, which a later cell may rebind.
            pred_mask = self.model.predict(images)
            print("Shape of Predicted Mask:", pred_mask.shape)
            print("Shape of True Mask:", masks.shape)

            # Everything below indexes the first sample, so it is all kept
            # under the non-empty guard.
            if len(images) > 0:
                sample_image, sample_mask = images[0], masks[0]
                pred_mask_img = create_mask(pred_mask[0:1])
                display([sample_image, sample_mask, pred_mask_img], ['Input Image', 'True Mask', 'Predicted Mask'])

                augmented_image, augmented_mask = augment(sample_image, sample_mask)
                display([sample_image, augmented_image, augmented_mask], ['Original Image', 'Augmented Image', 'Augmented Mask'])

        print('\nSample Prediction after epoch {}\n'.format(epoch + 1))

In [None]:
## Print information about the model ##

# Prints the Keras summary of the compiled `model`.
model.summary()

In [None]:
## Model training ##

EPOCHS = 80

# Keyword arguments for fit(), grouped so the run configuration is visible in one place.
fit_options = {
    'epochs': EPOCHS,
    'steps_per_epoch': STEPS_PER_EPOCH,
    'validation_data': val_batches,
    'validation_steps': VALIDATION_STEPS,
    'callbacks': [DisplayCallback()],
}

model_history = model.fit(train_batches, **fit_options)

In [88]:
## Overlaying the predicted mask on the original image ##

def create_overlay(image, pred_mask, true_mask=None, alpha=0, contour_thickness = 1):
    """Draw the predicted regions on top of the input image.

    Args:
        image: float image array; it is multiplied by 255 and converted to uint8.
        pred_mask: predicted class mask; singleton axes are squeezed away and
            every non-zero pixel counts as foreground.
        true_mask: accepted for interface compatibility; not used.
        alpha: opacity of the red fill blended into the foreground pixels.
            With the default 0 no fill is visible.
        contour_thickness: line width of the foreground outlines; values <= 0
            disable the outlines.

    Returns:
        A uint8 image of the same shape as ``image``.
    """
    # Clip before the cast so out-of-range values saturate instead of wrapping around.
    image = np.clip(image * 255, 0, 255).astype(np.uint8)

    pred_mask = np.squeeze(pred_mask)
    foreground = pred_mask > 0

    # Previously the colour layer stayed all-zero, so `alpha` only darkened the
    # image. It is now filled where the mask is set and blended only there.
    color_mask = np.zeros_like(image, dtype=np.uint8)
    color_mask[foreground] = (255, 0, 0)

    blended = cv2.addWeighted(image, 1 - alpha, color_mask, alpha, 0)
    overlay = image.copy()
    overlay[foreground] = blended[foreground]

    if contour_thickness > 0:
        contours, _ = cv2.findContours(pred_mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        cv2.drawContours(overlay, contours, -1, (255, 0, 0), contour_thickness)

    return overlay

In [89]:
## IoU and Dice scores ##

def iou_score(true, pred):
    """Intersection-over-union between a true mask and a prediction.

    Both inputs are reduced to foreground/background by casting to bool, so
    every non-zero class counts as foreground.

    Args:
        true: ground-truth mask tensor.
        pred: per-class scores; the arg-max over the last axis is the
            predicted class and a trailing axis is appended to it.

    Returns:
        The IoU as a NumPy float32 scalar; 1.0 when neither mask contains any
        foreground (both empty masks agree), instead of the NaN that 0/0
        would produce.
    """
    pred = tf.math.argmax(pred, axis=-1)
    pred = tf.expand_dims(pred, axis=-1)
    true = tf.cast(true, tf.bool)
    pred = tf.cast(pred, tf.bool)

    intersection = tf.reduce_sum(tf.cast(tf.logical_and(true, pred), tf.float32))
    union = tf.reduce_sum(tf.cast(tf.logical_or(true, pred), tf.float32))

    # An empty union would give NaN and poison any running average of the scores.
    if union.numpy() == 0:
        return np.float32(1.0)
    return (intersection / union).numpy()

def dice_score(true, pred):
    """Dice coefficient between a true mask and a prediction.

    Both inputs are reduced to foreground/background by casting to bool, so
    every non-zero class counts as foreground.

    Args:
        true: ground-truth mask tensor.
        pred: per-class scores; the arg-max over the last axis is the
            predicted class and a trailing axis is appended to it.

    Returns:
        The Dice score as a NumPy float32 scalar; 1.0 when neither mask
        contains any foreground, instead of the NaN that 0/0 would produce.
    """
    pred = tf.math.argmax(pred, axis=-1)
    pred = tf.expand_dims(pred, axis=-1)
    true = tf.cast(true, tf.bool)
    pred = tf.cast(pred, tf.bool)

    intersection = tf.reduce_sum(tf.cast(tf.logical_and(true, pred), tf.float32))
    denominator = tf.reduce_sum(tf.cast(true, tf.float32)) + tf.reduce_sum(tf.cast(pred, tf.float32))

    # Both masks empty: treat as perfect agreement rather than dividing 0 by 0.
    if denominator.numpy() == 0:
        return np.float32(1.0)
    return ((2 * intersection) / denominator).numpy()

In [None]:
## Show up to `num` predictions from the test set ##

def show_predictions(dataset, num=360):
    """Plot predictions with their IoU / Dice scores and print the averages.

    Args:
        dataset: batched dataset yielding ``(images, masks)``.
        num: maximum number of individual samples to show.

    For every shown sample a figure with the input image, the true mask, the
    predicted mask, the overlay and a text panel with the scores is drawn.
    """
    panel_titles = ['Input Image', 'True Mask', 'Predicted Mask', 'Overlay Image']
    shown = 0
    iou_sum = 0
    dice_sum = 0

    for images, masks in dataset:
        # Stop before predicting another batch once enough samples were shown.
        if shown >= num:
            break

        batch_scores = model.predict(images)
        for idx in range(len(images)):
            if shown >= num:
                break

            # Single-sample slices keep the batch axis expected by the helpers.
            sample_scores = batch_scores[idx:idx + 1]
            sample_truth = masks[idx:idx + 1]

            pred_mask = create_mask(sample_scores)
            overlay = create_overlay(images[idx].numpy(), pred_mask.numpy(), masks[idx].numpy())

            iou = iou_score(sample_truth, sample_scores)
            dice = dice_score(sample_truth, sample_scores)
            iou_sum += iou
            dice_sum += dice

            panels = [images[idx], masks[idx], pred_mask, overlay]
            metrics_text = f'IoU: {iou:.4f}, Dice: {dice:.4f}'
            # One extra column is reserved for the metrics text.
            column_count = len(panels) + 1

            plt.figure(figsize=(15, 5))
            for column, (title, panel) in enumerate(zip(panel_titles, panels), start=1):
                plt.subplot(1, column_count, column)
                plt.title(title)
                plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
                plt.axis('off')

            plt.subplot(1, column_count, column_count).axis('off')
            plt.text(0.5, 0.5, metrics_text, ha='center', va='center', fontsize=12, transform=plt.gca().transAxes)

            plt.show()

            shown += 1

    if shown > 0:
        avg_iou = iou_sum / shown
        avg_dice = dice_sum / shown
        print(f'Average IoU over {shown} predictions: {avg_iou:.4f}')
        print(f'Average Dice over {shown} predictions: {avg_dice:.4f}')
    else:
        print("No predictions were made.")

show_predictions(test_batches)

In [None]:
## Save the trained model ##

# Writes the current `model` to the given location.
# NOTE(review): the path has no file extension, so the on-disk format depends
# on the installed TensorFlow/Keras version - confirm it matches the loader.
model.save('/home/jovyan/work/bakalar_praca/scss-net/data1/model_saved_')

In [87]:
## Load the trained model ##
# Rebinds the global `model` to the model stored at the path used by the save cell.
model = tf.keras.models.load_model('/home/jovyan/work/bakalar_praca/scss-net/data1/model_saved_')