# U-Net notebook

Authors:
- Szymon Siemieniuk 151947
- Nikita Makarevich 153989

The whole notebook was executed using **Google Colab T4 GPU**

## Setup of environment

## Define paths

In [None]:
BASE_FOLDER = '/content/drive/MyDrive/cv_project3/'
DATASET_PATH = BASE_FOLDER + 'dataset/small_data.zip'

UNET_FOLDER = BASE_FOLDER + 'unet/'

UHYPER_FOLDER = UNET_FOLDER + 'hyperparameters/'
UHIST_FOLDER = UNET_FOLDER + "histories/"
UMODELS_FOLDER = UNET_FOLDER + "models/"

### Drive mounting

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Cloning dataset

In [None]:
! cp DATASET_PATH .

cp: cannot stat 'DATASET_PATH': No such file or directory


In [None]:
! unzip ./small_data.zip -d ./small_data

Archive:  ./small_data.zip
replace ./small_data/test_ground-truth/9995.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

### Cloning source code

In [None]:
! rm ./*.py
! rm -r ./fingerprint-inpainting
! git clone https://github.com/AmevinLS/fingerprint-inpainting.git
! cp ./fingerprint-inpainting/*.py ./
! ls

Cloning into 'fingerprint-inpainting'...
remote: Enumerating objects: 159, done.[K
remote: Counting objects: 100% (159/159), done.[K
remote: Compressing objects: 100% (105/105), done.[K
remote: Total 159 (delta 83), reused 123 (delta 49), pack-reused 0[K
Receiving objects: 100% (159/159), 15.59 MiB | 15.14 MiB/s, done.
Resolving deltas: 100% (83/83), done.
create_small_dataset_new.py  data_postprocessing.py  metrics.py   sample_data  small_data.zip
create_small_dataset.py      data_preprocessing.py   model.png	  schemas.py   tuners.py
data_augmentation.py	     drive		     models.py	  server.py
data_loading.py		     fingerprint-inpainting  __pycache__  small_data


### Import libraries

In [50]:
import glob
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import seaborn as sns
import tensorflow as tf

import src.data_scripts.data_augmentation as aug
import src.data_scripts.data_preprocessing as prep
import src.data_scripts.data_loading as load

from src.ml.models import get_unet, get_custom_conv_and_dense, get_custom_autoencoder, UnetGAN
from src.ml.metrics import MSSSIM_L1_Loss, MSSSIM_Loss, PSNRMetric, SSIMMetric

PATH_TO_INPUT_TRAIN = "./small_data/training_input"
PATH_TO_TRUTH_TRAIN = "./small_data/training_ground-truth"

PATH_TO_INPUT_VALID = "./small_data/validation_input"
PATH_TO_TRUTH_VALID = "./small_data/validation_ground-truth"

PATH_TO_INPUT_TEST = "./small_data/test_input"
PATH_TO_TRUTH_TEST = "./small_data/test_ground-truth"

### Loading dataset

In [None]:
input_func_list = [prep.resize,
                   aug.Augment(aug.random_brightness, prob=0.3),
                   aug.Augment(aug.random_saturation, prob=0.3),
                   prep.normalize_pixels,]

truth_func_list = [prep.resize,
                   prep.invert,
                   prep.to_grayscale,
                   prep.normalize_pixels]

ds_factory = load.FingerprintDatasetFactory(input_func_list,
                                            truth_func_list,
                                            batch_size=16,
                                            seed=42)

train_ds = ds_factory.create_dataset(PATH_TO_INPUT_TRAIN, PATH_TO_TRUTH_TRAIN)
valid_ds = ds_factory.create_dataset(PATH_TO_INPUT_VALID, PATH_TO_TRUTH_VALID)
test_ds = ds_factory.create_dataset(PATH_TO_INPUT_TEST, PATH_TO_TRUTH_TEST)

Found 4000 files belonging to 1 classes.
Found 4000 files belonging to 1 classes.
Found 1000 files belonging to 1 classes.
Found 1000 files belonging to 1 classes.
Found 10000 files belonging to 1 classes.
Found 10000 files belonging to 1 classes.


## UNet

### Example usage

In [None]:
unet_model = get_unet(depth=3)
unet_model.compile(optimizer='adam', loss=['mse'])
unet_model.summary()

Model: "unet"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 512, 384, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 512, 384, 16)         448       ['input_1[0][0]']             
                                                                                                  
 dropout (Dropout)           (None, 512, 384, 16)         0         ['conv2d[0][0]']              
                                                                                                  
 conv2d_1 (Conv2D)           (None, 512, 384, 16)         2320      ['dropout[0][0]']             
                                                                                               

### Hyperparameters

In [None]:
! pip install -q -U keras-tuner

In [None]:
import keras_tuner as kt

In [None]:
def tune_model_mse(hp):
    hp_act = hp.Choice('activation', values=['relu', 'elu', 'tanh'])
    hp_depth = hp.Choice('depth', values=[2, 3, 4, 5])
    hp_dropout = hp.Float('dropout', min_value=0.05, max_value=0.2)
    hp_optimizer = hp.Choice('optimizer', values=['adam', 'SGD', 'rmsprop'])

    model = get_unet(depth=hp_depth, activation=hp_act, dropout=hp_dropout)
    match hp_optimizer:
        case "adam":
            optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        case "SGD":
            optimizer = optimizer=tf.keras.optimizers.SGD(learning_rate=0.001)
        case "rmsprop":
            optimizer = optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.001)

    model.compile(optimizer=optimizer, loss='mse')
    return model

def tune_model_ms_ssim(hp):
    hp_act = hp.Choice('activation', values=['relu', 'elu', 'tanh'])
    hp_depth = hp.Choice('depth', values=[2, 3, 4, 5])
    hp_dropout = hp.Float('dropout', min_value=0.05, max_value=0.2)
    hp_optimizer = hp.Choice('optimizer', values=['adam', 'SGD', 'rmsprop'])

    model = get_unet(depth=hp_depth, activation=hp_act, dropout=hp_dropout)
    match hp_optimizer:
        case "adam":
            optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        case "SGD":
            optimizer = optimizer=tf.keras.optimizers.SGD(learning_rate=0.001)
        case "rmsprop":
            optimizer = optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.001)

    model.compile(optimizer=optimizer, loss=MSSSIM_Loss())
    return model

def tune_model_mae(hp):
    hp_act = hp.Choice('activation', values=['relu', 'elu', 'tanh'])
    # hp_depth = hp.Choice('depth', values=[2, 3, 4, 5])
    hp_depth = hp.Choice('depth', values=[3])
    hp_dropout = hp.Float('dropout', min_value=0.05, max_value=0.2)
    hp_optimizer = hp.Choice('optimizer', values=['adam', 'SGD', 'rmsprop'])

    model = get_unet(depth=hp_depth, activation=hp_act, dropout=hp_dropout)
    match hp_optimizer:
        case "adam":
            optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        case "SGD":
            optimizer = optimizer=tf.keras.optimizers.SGD(learning_rate=0.001)
        case "rmsprop":
            optimizer = optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.001)

    model.compile(optimizer=optimizer, loss='mae')
    return model

#### Print best found hyperparameters

In [None]:
losses = [
    ('mse', 'mse'),
    ('ms_ssim', MSSSIM_Loss()),
    # ('mae', 'mae')
]

for s, loss in losses:
    func = None
    if s == 'mse':
        func = tune_model_mse
    elif s == 'ms_ssim':
        func = tune_model_ms_ssim
    elif s == 'mae':
        func = tune_model_mae
    else:
        raise ValueError("Invalid parameter")

    tuner = kt.Hyperband(func,
                         objective='val_loss',
                         max_epochs=5,
                         factor=3,
                         directory=UHYPER_FOLDER,
                         project_name=s)

    params = tuner.get_best_hyperparameters()[0]
    print(params.values)

Reloading Tuner from /content/drive/MyDrive/cv_project3/unet/hyperparameters/mse/tuner0.json
{'activation': 'elu', 'depth': 3, 'dropout': 0.07745662128302169, 'optimizer': 'adam', 'tuner/epochs': 5, 'tuner/initial_epoch': 2, 'tuner/bracket': 1, 'tuner/round': 1, 'tuner/trial_id': '0003'}
Reloading Tuner from /content/drive/MyDrive/cv_project3/unet/hyperparameters/ms_ssim/tuner0.json
{'activation': 'relu', 'depth': 5, 'dropout': 0.059535321530272856, 'optimizer': 'rmsprop', 'tuner/epochs': 2, 'tuner/initial_epoch': 0, 'tuner/bracket': 1, 'tuner/round': 0}


# U-Net Training

In [None]:
# losses = [
#     ('mse', 'mse'),
#     ('ms_ssim', MSSSIM_Loss()),
#     ('mae', 'mae')
# ]

# metrics = [
#     SSIMMetric(),
#     PSNRMetric(),
# ]

# EPOCHS=15

# for s, loss in losses:
#     func = None
#     if s == 'mse':
#         func = tune_model_mse
#     elif s == 'ms_ssim':
#         func = tune_model_ms_ssim
#     elif s == 'mae':
#         func = tune_model_mae
#     else:
#         raise ValueError("Invalid parameter")

#     tuner = kt.Hyperband(func,
#                          objective='val_loss',
#                          max_epochs=5,
#                          factor=3,
#                          directory='/content/drive/MyDrive/',
#                          project_name='cv_params_'+s)

#     tuner.search(train_ds, validation_data=valid_ds, epochs=5)

#     params = tuner.get_best_hyperparameters()[0]
#     print(params.values)

#     activation = params.values['activation']
#     depth = params.values['depth']
#     dropout = params.values['dropout']
#     optimizer = params.values['optimizer']

#     unet_model = get_unet(depth, dropout, activation)
#     unet_model.compile(optimizer=optimizer,
#                        loss=loss,
#                        metrics=metrics)

#     MC_PATH = UMODELS_FOLDER + 'model_' + s + '.{epoch:02d}.h5'
#     HIS_PATH = UHIST_FOLDER + 'history_' + s + '.csv'

#     y_callbacks = [
#         tf.keras.callbacks.EarlyStopping(patience=3),
#         tf.keras.callbacks.ModelCheckpoint(filepath=MC_PATH,
#                                            monitor='val_loss',
#                                            save_best_only=False,
#                                            save_weights_only=False),
#         tf.keras.callbacks.CSVLogger(HIS_PATH)
#     ]

#     print(f"Now executes {loss}")

#     history = unet_model.fit(train_ds,
#                              epochs=EPOCHS,
#                              validation_data=valid_ds,
#                              callbacks=y_callbacks)


Trial 1 Complete [00h 08m 40s]
val_loss: 0.20386290550231934

Best val_loss So Far: 0.20386290550231934
Total elapsed time: 00h 08m 40s

Search: Running Trial #2

Value             |Best Value So Far |Hyperparameter
tanh              |tanh              |activation
3                 |3                 |depth
0.10575           |0.080806          |dropout
adam              |SGD               |optimizer
2                 |2                 |tuner/epochs
0                 |0                 |tuner/initial_epoch
1                 |1                 |tuner/bracket
0                 |0                 |tuner/round

Epoch 1/2
  6/250 [..............................] - ETA: 3:40 - loss: 0.2432





KeyboardInterrupt: 

### Train U-Net with MAE (no hiperparameter tuning)

In [None]:
metrics = [
    SSIMMetric(),
    PSNRMetric(),
]

EPOCHS=20

unet_model = get_unet(depth=4, dropout=0.15, activation='relu')
unet_model.compile(optimizer='adam',
                    loss='mae',
                    metrics=metrics)

MC_PATH = UMODELS_FOLDER + 'model_mae.{epoch:02d}.h5'
HIS_PATH = UHIST_FOLDER + 'history_mae.csv'

y_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=3),
    tf.keras.callbacks.ModelCheckpoint(filepath=MC_PATH,
                                        monitor='val_loss',
                                        save_best_only=False,
                                        save_weights_only=False),
    tf.keras.callbacks.CSVLogger(HIS_PATH)
]

history = unet_model.fit(train_ds,
                         epochs=EPOCHS,
                         validation_data=valid_ds,
                         callbacks=y_callbacks)


# Evaluation (all models)

We evaluated our model on 10K images (batch size=16)

In [57]:
ENCODER_PATH = BASE_FOLDER + 'custom_autoencoder/custom_autoencoder.h5'
CONV_PATH = BASE_FOLDER + 'custom_conv_and_dense/custom_conv_and_dense.h5'

MAE_PATH = UMODELS_FOLDER + 'mae/model_mae.04.h5'
G_DISC_PATH = BASE_FOLDER + 'unetgan/unetgan_disc.h5'
G_GEN_PATH = BASE_FOLDER + 'unetgan/unetgan_gen.h5'

metrics = [
    SSIMMetric(),
    PSNRMetric(),
]

### U-Net

In [55]:
comb = (
    ('mse', 'mse', 'model_mse.15.h5', tune_model_mse),
    ('ms_ssim', MSSSIM_Loss(), 'model_ms_ssim.15.h5', tune_model_ms_ssim),
)

metrics = [
    SSIMMetric(),
    PSNRMetric(),
]

for loss_name, loss, weights, fun in comb:
    tuner = kt.Hyperband(fun,
                         objective='val_loss',
                         max_epochs=5,
                         factor=3,
                         directory=UHYPER_FOLDER,
                         project_name=loss_name)

    params = tuner.get_best_hyperparameters()[0]

    activation = params.values['activation']
    depth = params.values['depth']
    dropout = params.values['dropout']
    optimizer = params.values['optimizer']

    PATH_WEIGHTS = f"{UMODELS_FOLDER}{loss_name}/{weights}"

    unet_model = get_unet(depth, dropout, activation)
    unet_model.compile(optimizer, loss=loss, metrics=metrics)
    unet_model.load_weights(PATH_WEIGHTS)
    unet_model.evaluate(test_ds)

Reloading Tuner from /content/drive/MyDrive/cv_project3/unet/hyperparameters/mse/tuner0.json
Reloading Tuner from /content/drive/MyDrive/cv_project3/unet/hyperparameters/ms_ssim/tuner0.json


### GAN+U-Net

In [59]:
from tqdm import tqdm

unet_gan = UnetGAN()
unet_gan.generator.load_weights(G_GEN_PATH)

ssim = SSIMMetric()
psnr = PSNRMetric()

for test_input, test_truth in tqdm(test_ds):
    test_pred = unet_gan.generator(test_input, training=False)
    ssim.update_state(test_truth, test_pred)
    psnr.update_state(test_truth, test_pred)

print("SSIM:", ssim.result().numpy())
print("PSNR:", psnr.result().numpy())

100%|██████████| 625/625 [02:45<00:00,  3.78it/s]

SSIM: 0.6298725
PSNR: 14.37348





### Custom Conv and Dense

In [None]:
model = get_custom_conv_and_dense()
model.compile(loss='mse', metrics=metrics)
model.load_weights(CONV_PATH)
model.evaluate(test_ds)



[0.06660273671150208, 0.09966105222702026, 11.785283088684082]

### Autoencoder

In [None]:
model = get_custom_autoencoder()
model.compile(loss='mse', metrics=metrics)
model.load_weights(ENCODER_PATH)
model.evaluate(test_ds)



[0.02721424773335457, 0.35628002882003784, 13.965827941894531]