In [None]:
# !pip install tensorflow tensorlayerx numpy easydict tqdm scikit-image

In [None]:
import math
import os
from decimal import Decimal, InvalidOperation

import numpy

In [None]:
# Mount Google Drive at /content/drive so the dataset, checkpoint and log
# folders configured below can live on Drive.
# NOTE(review): `google.colab` is presumably only importable inside Colab —
# skip this cell when running elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Dataset locations (folders of high-resolution images).
train_image_path = '/path/to/train/folder'
val_image_path = '/path/to/val/folder'

# Free-form code name for this experiment.
model_name = "define-your-own-model-code-name-here"

# Folder that receives the per-epoch generator checkpoints.
checkpoint_path = '/path/to/your/model/training/backup/folder'

# Optional .npz file to start from; None means training from scratch.
# existing_model = "/path/to/npz/file/for/transfer-learning/model.npz"
existing_model = None

# Folder that receives the daily training log files.
log_path = "/path/to/your/log/folder"

# exist_ok=True makes the cell safe to re-run and avoids the
# check-then-create race of `if not exists: makedirs`.
os.makedirs(checkpoint_path, exist_ok=True)
os.makedirs(log_path, exist_ok=True)

# Helper Functions

In [None]:
import requests
from datetime import datetime
from pathlib import Path

def write_log(log: str, type: str, namespace: str):
    """
    write_log
        Appends one formatted line to the daily log file inside `log_path`.

        File name  : <YYYYMMDD>.init_training.log (one file per calendar day)
        Line format: [YYYY-MM-DD HH:MM:SS][TYPE] [NAMESPACE] message

        Example line:
        [2023-10-01 00:00:00][INFO] [INIT TRAINING] Some message

        @params str log: The message to record.
        @params str type: Severity label, written verbatim (e.g. "INFO").
        @params str namespace: Origin of the message; upper-cased before writing.

        @returns None
    """
    now = datetime.now()
    log_location = Path(log_path) / (now.strftime("%Y%m%d") + ".init_training.log")

    message = f"[{now.strftime('%Y-%m-%d %H:%M:%S')}][{type}] [{namespace.upper()}] {log}\n"

    # Append mode creates the file when it is missing, so no existence check
    # is needed; the context manager closes the handle even if write() fails.
    with open(log_location, "a") as log_file:
        log_file.write(message)

# Custom Metrics Functions

In [None]:
from skimage.metrics import peak_signal_noise_ratio, structural_similarity
from skimage.color import rgb2gray
from tensorlayerx import convert_to_tensor
import re

In [None]:
def hard_round(number, decimal_places = 0):
    """
    hard_round
        Truncates (never rounds up) a number to `decimal_places` digits behind
        the decimal point. The value is expanded into plain positional notation
        first, so scientific notation such as 1.5e-05 is cut correctly, and the
        sign and any number of integer digits are preserved.

        NaN and infinity are returned unchanged. Integers are accepted.

        @params float32 number
        @params int decimal_places: digits to keep; values below 0 are treated as 0

        @returns float
    """
    value = float(number)

    # Nothing to truncate on NaN / +-inf.
    if not math.isfinite(value):
        return value

    # str() yields the shortest representation for the value's own precision
    # (e.g. '0.29' for a NumPy float32), which keeps the digits a reader sees.
    try:
        exact = Decimal(str(number))
    except InvalidOperation:
        # str(number) was not a plain numeric literal (e.g. a tensor repr).
        exact = Decimal(repr(value))

    # 'f' formatting removes any exponent: 1.5E-5 -> '0.000015'.
    whole, _, fraction = format(exact, "f").partition(".")
    kept = fraction[:max(int(decimal_places), 0)]

    return float(f"{whole}.{kept}" if kept else whole)

class MetricManager():
    """
    @since July, 1st 2024
    MetricManager

    Collects the image-quality metrics used by this experiment behind one
    chainable object. The tensors are converted back into displayable pixel
    arrays once, in the constructor, so every metric reuses the same arrays
    instead of repeating the conversion.

    Each metric method stores its result on the instance and returns the
    instance, which allows chaining:

    @example
    ```python
    metrics = MetricManager(ori_image, gen_image)
    metrics.ssim().psnr().cer_wer()

    print(f"ssim: {metrics.ssim_scores} - psnr: {metrics.psnr_scores} - cer: {metrics.cer_scores}")
    ```
    """

    # Upper bound reported for PSNR (identical images would otherwise be infinite).
    PSNR_CAP = 99

    def __init__(self, original_images, generated_images):
        """
        constructor
        Turns both image batches into NumPy float32 arrays in HWC layout with
        pixel values in the 0 - 255 range.

        Procedure, applied to each batch:
            1. Convert the Tensor into a NumPy array (NumPy input is used as is).
            2. Transpose [0, 2, 3, 1]: from (N, C, H, W) back to (N, H, W, C).
            3. Undo the (x - 127.5) / 127.5 normalization of the preprocessing step.

        The conversion stays in floating point: casting the normalized
        [-1, 1] values to uint8 before step 3 would destroy the image.

        @param Tensor.float32[] | NumPy<float32> original_images: batch of reference images.
        @param Tensor.float32[] | NumPy<float32> generated_images: batch of generated images.
        """
        self.original_images = self._to_hwc_pixels(original_images)
        self.generated_images = self._to_hwc_pixels(generated_images)

    @staticmethod
    def _to_hwc_pixels(images):
        """Convert one normalized NCHW batch into float32 NHWC pixels (0 - 255)."""
        if hasattr(images, "numpy"):
            images = images.numpy()

        batch = numpy.asarray(images, dtype='float32')
        batch = numpy.transpose(batch, [0, 2, 3, 1])

        return (batch * 127.5) + 127.5

    @staticmethod
    def _word_distance(reference_words, generated_words, distance):
        """
        Edit distance counted in whole words. Every distinct word is mapped to
        one unique character so a string edit-distance function can be reused.
        """
        symbols = {}

        def encode(words):
            # Offset by 1 so no word is ever encoded as the NUL character.
            return "".join(chr(symbols.setdefault(word, len(symbols) + 1)) for word in words)

        return distance(encode(reference_words), encode(generated_words))

    def ssim(self) -> 'MetricManager':
        """
        ssim
            Structural Similarity Index Measurement (SSIM), averaged over the batch.

            Presumes original_images and generated_images have the same shape.

            For every image pair:
            1. Convert both images to grayscale.
            2. Call structural_similarity by skimage (3 x 3 window, data range 255).

            The grayscale images are 2-D, so no channel argument is passed.
            The truncated mean is stored in `self.ssim_scores`.

            @requirements: skimage

            @return MetricManager
        """
        ssim_scores = []

        for original_image, generated_image in zip(self.original_images, self.generated_images):
            ssim_scores.append(
                structural_similarity(
                    rgb2gray(original_image),
                    rgb2gray(generated_image),
                    win_size = 3,
                    data_range = 255
                )
            )

        self.ssim_scores = hard_round(numpy.mean(ssim_scores), 4)

        return self

    def psnr(self) -> 'MetricManager':
        """
        psnr
            Peak Signal-Noise Ratio (PSNR), averaged over the batch.

            Identical images, and any score above PSNR_CAP, are reported as
            PSNR_CAP so an infinite value never enters the average.
            The truncated mean is stored in `self.psnr_scores`.

            @requirements: skimage

            @returns MetricManager
        """
        psnr_scores = []

        for original_image, generated_image in zip(self.original_images, self.generated_images):
            if numpy.array_equal(original_image, generated_image):
                # Zero error: skimage would divide by zero here.
                psnr_score = self.PSNR_CAP
            else:
                psnr_score = peak_signal_noise_ratio(
                    original_image,
                    generated_image,
                    data_range = 255
                )

                if psnr_score > self.PSNR_CAP:
                    psnr_score = self.PSNR_CAP

            psnr_scores.append(psnr_score)

        self.psnr_scores = hard_round(numpy.mean(psnr_scores), 4)

        return self

    def cer_wer(self) -> 'MetricManager':
        """
        cer_wer
            Characters Error Rate (CER) and Words Error Rate (WER). Both images
            of a pair are read with Tesseract OCR; the text of the original
            image is the reference.

            For every image pair:
            1. OCR both images.
            2. Strip backslash escape sequences from the text.
            3. CER = character edit distance / reference character count.
               WER = word edit distance / reference word count.

            An empty reference uses a denominator of 1 instead of dividing by zero.
            The truncated means are stored in `self.cer_scores` and `self.wer_scores`.

            @requirements: pytesseract, python-Levenshtein

            @returns MetricManager
        """
        # Imported here because these optional packages are only needed by
        # this metric and are not imported anywhere else in the notebook.
        import Levenshtein
        import pytesseract

        # NOTE(review): this removes a literal backslash plus the next character;
        # real newlines produced by the OCR are left in place — confirm intent.
        pattern = r"\\."

        cer_scores = []
        wer_scores = []

        for original_image, generated_image in zip(self.original_images, self.generated_images):
            reference_text = pytesseract.image_to_string(original_image.astype('uint8'))
            generated_text = pytesseract.image_to_string(generated_image.astype('uint8'))

            reference_text = re.sub(pattern, "", reference_text)
            generated_text = re.sub(pattern, "", generated_text)

            reference_words = reference_text.split()
            generated_words = generated_text.split()

            character_distance = Levenshtein.distance(reference_text, generated_text)
            word_distance = self._word_distance(reference_words, generated_words, Levenshtein.distance)

            cer_scores.append(character_distance / max(len(reference_text), 1))
            wer_scores.append(word_distance / max(len(reference_words), 1))

        self.cer_scores = hard_round(numpy.mean(cer_scores), 4)
        self.wer_scores = hard_round(numpy.mean(wer_scores), 4)

        return self

# Pre-Processing and Augmentation Functions

In [None]:
from tensorlayerx.dataflow import Dataset, DataLoader
from tensorlayerx.vision import load_images
from tensorlayerx.vision.transforms import Compose, RandomCrop, Normalize, Resize, HWC2CHW

In [None]:
def image_transformer_random_crop(image_hr):
    """
    image_transformer_random_crop
        Builds one (low-resolution, high-resolution) training pair from a
        single image:

        1. Randomly crop a 224 x 224 patch from the image.
        2. Resize that patch to 56 x 56 to obtain the low-resolution input.
        3. Normalize both with mean 127.5 / std 127.5 and transpose
           HWC -> CHW (Channel Height Width).

        For 0 - 255 pixel input the normalization maps values to about -1 .. 1.

        @param @NumPy<uint8>[] image_hr: source image in HWC layout

        @return
            (low-resolution patch, high-resolution patch), both normalized CHW
    """
    to_normalized_chw = Compose([
        Normalize(mean=(127.5), std=(127.5), data_format='HWC'),
        HWC2CHW()
    ])
    random_cropper = Compose([
        RandomCrop(size=(224, 224))
    ])

    hr_patch = random_cropper(image_hr)
    lr_patch = Resize(size = (56, 56))(hr_patch)

    return to_normalized_chw(lr_patch), to_normalized_chw(hr_patch)

In [None]:
# Data Loader Pattern
class DatasetLoader(Dataset):
    """
    Dataset wrapper around a collection of high-resolution images.

    Indexing an item passes the stored image through `image_transformer`
    and returns whatever the transformer returns; the length is the number
    of stored images.
    """

    def __init__(self, highres_image, image_transformer = image_transformer_random_crop):
        # Keep the transformer next to the data so every access goes through it.
        self.image_transformer = image_transformer
        self.hr_data = highres_image

    def __len__(self):
        return len(self.hr_data)

    def __getitem__(self, index):
        source_image = self.hr_data[index]
        return self.image_transformer(source_image)

In [None]:
# Load data from drive
train_hr_image = load_images(path = train_image_path, n_threads = 16)
val_hr_image = load_images(path = val_image_path, n_threads = 10)

# Force every image to uint8, one array per image. The images are not stacked
# into a single array: stacking fails when the source images differ in size,
# and the dataset wrapper only needs indexing and len().
train_hr_image = [numpy.asarray(image).astype('uint8') for image in train_hr_image]
val_hr_image = [numpy.asarray(image).astype('uint8') for image in val_hr_image]

# Data Loading and Transformation
train_dataset = DatasetLoader(train_hr_image, image_transformer = image_transformer_random_crop)
val_dataset = DatasetLoader(val_hr_image, image_transformer = image_transformer_random_crop)
print(f"Dataset for this batch - Train: {len(train_dataset)} - Val: {len(val_dataset)}")

# Data Loader
# NOTE(review): drop_last = True on the validation loader discards the final
# partial batch (and yields nothing for fewer than 16 images) — confirm intent.
train_dataset = DataLoader(train_dataset, batch_size = 16, shuffle = True, drop_last = True)
val_dataset = DataLoader(val_dataset, batch_size = 16, shuffle = True, drop_last = True)

# Model part

In [None]:
# Backend selection and TensorFlow log level are assigned before the import
# below, since they are presumably read while the packages are being imported.
# NOTE(review): earlier cells already import from tensorlayerx, so on a
# top-to-bottom run these assignments still come after the first import —
# move them to the first import cell if the backend must really be forced.
os.environ['TL_BACKEND'] = 'tensorflow'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import tensorlayerx as tlx

from tensorlayerx.nn import Module
from tensorlayerx.nn import Conv2d, BatchNorm2d, Elementwise, SubpixelConv2d, Flatten, Sequential, BatchNorm
from tensorlayerx.nn import Linear
from tensorlayerx import LeakyReLU, ReLU

In [None]:
class ResidualBlock(Module):
    """
    One residual unit of the generator trunk:
    conv(3x3, ReLU) -> batch norm -> conv(3x3, ReLU) -> batch norm,
    with the block input added back onto the result.

    All layers use 128 channels in channels-first layout. `resblock_number`
    is only used to build the layer names.
    """

    def __init__(self, resblock_number: int = 1):
        super(ResidualBlock, self).__init__()

        # Arguments shared by both convolutions / both batch norms.
        conv_settings = dict(
            out_channels=128, kernel_size=(3, 3), stride=(1, 1),
            act=ReLU, padding='SAME',
            data_format='channels_first', b_init=None
        )
        norm_settings = dict(num_features=128, act=None, data_format='channels_first')

        self.conv1 = Conv2d(name = f"conv2d_resblock_{resblock_number}_1", **conv_settings)
        self.bn1 = BatchNorm2d(name = f"batchnorm2d_resblock_{resblock_number}_1", **norm_settings)
        self.conv2 = Conv2d(name = f"conv2d_resblock_{resblock_number}_2", **conv_settings)
        self.bn2 = BatchNorm2d(name = f"batchnorm2d_resblock_{resblock_number}_2", **norm_settings)

    def forward(self, x):
        residual = self.bn1(self.conv1(x))
        residual = self.bn2(self.conv2(residual))

        # Identity shortcut around the two conv/norm stages.
        return x + residual

In [None]:
class Generator(Module):
    """
    Super-resolution generator.

    Layout: an input convolution, a trunk of 16 residual blocks, a
    convolution + batch norm whose output is added to the input-convolution
    features, two (convolution -> sub-pixel convolution, scale 2) upsampling
    stages, and a 1 x 1 output convolution with Tanh producing 3 channels.
    All layers use the channels-first layout.
    """

    def __init__(self):
        super(Generator, self).__init__()

        # Arguments shared by every 3 x 3 convolution of the network.
        conv3x3 = dict(kernel_size=(3, 3), stride=(1, 1), data_format='channels_first')

        self.conv1 = Conv2d(out_channels=128, act=ReLU, name = "conv2d_G_1", **conv3x3)

        self.residual_block = self.make_layer()

        self.conv2 = Conv2d(out_channels=128, act=ReLU, b_init=None, name = "conv2d_G_2", **conv3x3)
        self.bn1 = BatchNorm2d(num_features=128, data_format='channels_first', name = "batchnorm2d_G_1")

        # First upsampling stage.
        self.conv3 = Conv2d(out_channels=512, name = "conv2d_G_3", **conv3x3)
        self.subpixelconv1 = SubpixelConv2d(
            scale=2, act=ReLU, data_format='channels_first', name = "subpixelconv2d_G_1"
        )

        # Second upsampling stage.
        self.conv4 = Conv2d(out_channels=512, name = "conv2d_G_4", **conv3x3)
        self.subpixelconv2 = SubpixelConv2d(
            scale=2, act=ReLU, data_format='channels_first', name = "subpixelconv2d_G_2"
        )

        self.output = Conv2d(
            out_channels = 3, kernel_size=(1, 1), stride=(1, 1),
            act=tlx.Tanh, data_format='channels_first', name = "conv2d_G_output"
        )

    def make_layer(self):
        """Build the trunk: 16 residual blocks numbered 0 - 15, run in sequence."""
        return Sequential([ResidualBlock(block_number) for block_number in range(16)])

    def forward(self, x):
        shallow_features = self.conv1(x)

        trunk_features = self.residual_block(shallow_features)
        trunk_features = self.bn1(self.conv2(trunk_features))

        # Long skip connection around the whole residual trunk.
        upsampled = trunk_features + shallow_features
        upsampled = self.subpixelconv1(self.conv3(upsampled))
        upsampled = self.subpixelconv2(self.conv4(upsampled))

        return self.output(upsampled)

# Training the model

In [None]:
from tqdm import tqdm
from tensorlayerx.model import TrainOneStep
from tensorlayerx.nn import Module
from tensorlayerx.losses import mean_squared_error

# Select the GPU as the TensorLayerX compute device for everything below.
# NOTE(review): behaviour on a machine without a GPU is not visible here — confirm.
tlx.set_device('GPU')

In [None]:
# Hyperparameters
# NOTE(review): the DataLoader cell hard-codes its own batch_size = 16;
# keep the two values in sync when changing this one.
batch_size = 16
epoch_total = 300
# Step learning-rate schedule: starts at 1e-4 and is configured with
# step_size = 200 and gamma = 0.1. The training loop calls decay.step()
# once per epoch.
decay = tlx.optimizers.lr.StepDecay(
    learning_rate = 1e-4,
    step_size = 200,
    gamma = 1e-1,
    last_epoch = -1,
    verbose = True
)
# NOTE(review): the second positional argument of Adam is presumably beta_1;
# 5e-2 is far from the usual 0.9 — confirm this value is intended.
optimizer = tlx.optimizers.Adam(decay, 5e-2)

generator_model = Generator()
# Build the network with a placeholder input of shape (batch, 3, 56, 56),
# i.e. channels-first 56 x 56 low-resolution patches.
generator_model.init_build(tlx.nn.Input(shape=(batch_size, 3, 56, 56)))
# Weights handed to the trainer for optimization.
g_weights = generator_model.trainable_weights

In [None]:
class NetWithLoss_init(Module):
    """
    Couples the generator with a loss function so that one forward call
    takes a (low-resolution, high-resolution) pair and returns the loss.
    """

    def __init__(self, generator_model, loss_fn):
        super(NetWithLoss_init, self).__init__()
        self.net = generator_model
        self.loss_fn = loss_fn

    def forward(self, lr, hr):
        # Loss between the generated image and the high-resolution target.
        return self.loss_fn(self.net(lr), hr)

In [None]:
# Generator wrapped with the MSE loss; also used directly for validation loss.
G_with_loss = NetWithLoss_init(
    generator_model = generator_model,
    loss_fn = mean_squared_error
)

# One optimization step per call, updating only the generator weights.
trainer = TrainOneStep(G_with_loss, optimizer = optimizer, train_weights = g_weights)

In [None]:
# Evaluation record for Generator training
progress_train_init_G_epoch = []

progress_train_init_G_loss = []
progress_train_init_G_ssim = []
progress_train_init_G_psnr = []

progress_val_init_G_loss = []
progress_val_init_G_ssim = []
progress_val_init_G_psnr = []


def _batch_quality(lr_patch, hr_patch):
    """Return (ssim, psnr) of the generator output for one batch, as floats."""
    metrics = MetricManager(hr_patch, generator_model(lr_patch))
    metrics.ssim().psnr()

    return float(metrics.ssim_scores), float(metrics.psnr_scores)


for epoch in range(epoch_total):
    print(f"Epoch {epoch + 1} / {epoch_total}", end = " ")

    # ---- Training pass ----
    generator_model.set_train()

    train_loss, train_ssim, train_psnr = [], [], []

    for lr_patch, hr_patch in tqdm(train_dataset):
        # One optimization step; returns the loss of this batch.
        train_loss.append(trainer(lr_patch, hr_patch))

        # Metrics are computed from a fresh forward pass after the update.
        batch_ssim, batch_psnr = _batch_quality(lr_patch, hr_patch)
        train_ssim.append(batch_ssim)
        train_psnr.append(batch_psnr)

    train_loss = hard_round(numpy.mean(train_loss), 4)
    train_ssim = hard_round(numpy.mean(train_ssim), 4)
    train_psnr = hard_round(numpy.mean(train_psnr), 4)

    progress_train_init_G_epoch.append(epoch + 1)
    progress_train_init_G_loss.append(train_loss)
    progress_train_init_G_ssim.append(train_ssim)
    progress_train_init_G_psnr.append(train_psnr)

    # ---- Validation pass (no weight update) ----
    generator_model.set_eval()

    val_loss, val_ssim, val_psnr = [], [], []

    for lr_patch, hr_patch in val_dataset:
        val_loss.append(G_with_loss(lr_patch, hr_patch))

        batch_ssim, batch_psnr = _batch_quality(lr_patch, hr_patch)
        val_ssim.append(batch_ssim)
        val_psnr.append(batch_psnr)

    val_loss = hard_round(numpy.mean(val_loss), 4)
    val_ssim = hard_round(numpy.mean(val_ssim), 4)
    val_psnr = hard_round(numpy.mean(val_psnr), 4)

    progress_val_init_G_loss.append(val_loss)
    progress_val_init_G_ssim.append(val_ssim)
    progress_val_init_G_psnr.append(val_psnr)

    # ---- Reporting ----
    train_progress_text = f"Epoch [{epoch+1} / {epoch_total}] - train loss: {train_loss} - train metrics [ssim | psnr]: [{train_ssim} | {train_psnr}]"
    val_progress_text = f" - val loss: {val_loss} - val metrics [ssim | psnr]: [{val_ssim} | {val_psnr}]"

    print(train_progress_text)
    print(f"\t\t\t {val_progress_text}")
    print("\n\n")

    # Advance the learning-rate schedule once per epoch.
    decay.step()

    write_log(f"{train_progress_text} {val_progress_text}", "INFO", "INIT TRAINING")

    # One checkpoint per epoch, named after the 1-based epoch number.
    generator_model.save_weights(os.path.join(checkpoint_path, f'g_init_{epoch + 1}.npz'), format='npz_dict')

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(20, 15))

# One entry per panel of the 3 x 2 grid:
# (grid position, training series, validation series,
#  training label, validation label, title, y-axis label)
curve_panels = [
    (
        (0, 0), progress_train_init_G_loss, progress_val_init_G_loss,
        'Training MSE Loss', 'Validation MSE Loss',
        'Generator Initial Training Loss', 'MSE'
    ),
    (
        (0, 1), progress_train_init_G_ssim, progress_val_init_G_ssim,
        'Training SSIM', 'Validation SSIM',
        'Generator Initial Training SSIM Score', 'SSIM'
    ),
    (
        (1, 0), progress_train_init_G_psnr, progress_val_init_G_psnr,
        'Training PSNR', 'Validation PSNR',
        'Generator Initial Training PSNR Score', 'PSNR'
    ),
]

for grid_position, train_series, val_series, train_label, val_label, panel_title, y_label in curve_panels:
    plt.subplot2grid((3, 2), grid_position)
    plt.plot(progress_train_init_G_epoch, train_series, label=train_label, marker='o')
    plt.plot(progress_train_init_G_epoch, val_series, label=val_label, marker='o')
    plt.title(panel_title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()