# Image Compression for Stylized Images

In [None]:
%pip install -q imageio pydot tensorflow-gpu==2.9.1 keras matplotlib graphviz moviepy scikit-image keras keras-tuner matplotlib kiwisolver scikit-learn tensorflow-io


In [None]:
import os
import keras_tuner as kt
from numpy import asarray
from IPython.display import Image
import numpy as np
import matplotlib.pyplot as plt
import pathlib
import os.path
import math
import imageio.v2 as imageio
import glob
from skimage.color import lab2rgb
import PIL
import PIL.Image


In [None]:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import mixed_precision
import tensorflow_io as tfio

# Detect GPUs; when at least one is present switch Keras to the
# 'mixed_float16' global policy and turn on memory growth for every GPU.
gpus = tf.config.experimental.list_physical_devices('GPU')
if len(gpus) > 0:
    mixed_precision.set_global_policy('mixed_float16')
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as error:
        # Report the problem instead of aborting the cell.
        print(error)


In [None]:
# Record the TensorFlow version used for this run.
print(f"TensorFlow version: {tf.__version__}")


## Load Training Data

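Images are loaded from disk, converted from RGB to CIELAB, scaled by 1/128 and paired with themselves as the reconstruction target.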

In [None]:
def load_from_directory(data_dir, image_size, batch_size, train_size=0.8):
    """Load a directory of images as (input, target) batches for an autoencoder.

    Every batch is rescaled by 1/255, cast to float16, converted from RGB to
    Lab with tensorflow-io and finally multiplied by 1/128. The input and the
    target of each element are the same tensor.

    Args:
        data_dir: Directory (str or path-like) that is searched for images.
        image_size: (height, width) the images are resized to.
        batch_size: Number of images per batch.
        train_size: Fraction of the batches used for training; all remaining
            batches form the validation split.

    Returns:
        Tuple (train_ds, validation_ds) of tf.data.Dataset objects yielding
        (lab_batch, lab_batch) pairs.
    """
    def configure_for_performance(ds, AUTOTUNE, shuffleSize):
        if shuffleSize > 0:
            ds = ds.shuffle(buffer_size=shuffleSize, reshuffle_each_iteration=False)
        return ds.prefetch(buffer_size=AUTOTUNE)

    AUTOTUNE = tf.data.AUTOTUNE

    image_ds = tf.keras.utils.image_dataset_from_directory(
        pathlib.Path(data_dir),
        interpolation='bilinear',
        color_mode='rgb',
        label_mode=None,
        follow_links=True,
        shuffle=False,
        image_size=image_size,
        batch_size=batch_size)

    normalization_layer = tf.keras.layers.Rescaling(1.0 / 255.0)

    def preprocess_lab2(img):
        image = tf.cast(img, tf.float16)
        return tfio.experimental.color.rgb_to_lab(image)

    def to_autoencoder_pair(batch):
        # RGB [0, 255] -> RGB [0, 1] -> Lab -> Lab / 128.
        lab = preprocess_lab2(normalization_layer(batch)) * (1.0 / 128.0)
        # An autoencoder reconstructs its own input, so the converted batch is
        # reused as the target instead of running the conversion twice.
        return lab, lab

    nrBatches = len(image_ds)
    paired_ds = image_ds.map(to_autoencoder_pair, num_parallel_calls=AUTOTUNE)

    # The validation split is "everything after the training batches".
    # Computing it as int((1 - train_size) * nrBatches) loses a batch whenever
    # the float product lands just below an integer (e.g. 0.2 * 10 -> 1.99...).
    train_batches = int(train_size * nrBatches)
    train_split = paired_ds.take(train_batches)
    validation_split = paired_ds.skip(train_batches)

    # Prefetch last so the mapped (converted) batches are what gets buffered.
    final_normalized_train_ds = configure_for_performance(train_split, AUTOTUNE, 0)
    normalized_test_ds = configure_for_performance(validation_split, AUTOTUNE, 0)

    return final_normalized_train_ds, normalized_test_ds


In [None]:
# Training configuration.
BATCH_SIZE = 8
IMAGE_SIZE = (256, 256)
EPOCHS = 24

# Location of the image collection.
data_directory_path = "data-stylized-compression/"
data_dir = pathlib.Path(data_directory_path)

# 80 % of the batches are requested for training.
train_images, validation_images = load_from_directory(
    data_dir, IMAGE_SIZE, BATCH_SIZE, 0.8)

print(f"Number of batches {len(train_images)} of {BATCH_SIZE} elements")


# Example Data

In [None]:
# Draw batches from ONE iterator: calling iter(train_images) again restarts the
# dataset, so the "next" batch would just be the first batch once more.
batch_iterator = iter(train_images)
image_batch, expected_batch = next(batch_iterator)

nrCol = 10
plt.figure(figsize=(10 * 2, 12))
sample_index = 0
for i in range(0, nrCol):
    if sample_index >= len(image_batch):
        # Current batch is used up: advance to the following one. When the
        # dataset has no further batch, wrap around inside the current batch.
        next_batch = next(batch_iterator, None)
        if next_batch is not None:
            image_batch, expected_batch = next_batch
        sample_index = 0

    # Undo the 1/128 scaling applied when the dataset was built.
    trainImage = image_batch[sample_index].numpy().astype(dtype='float32') * 128.0
    expected = expected_batch[sample_index].numpy().astype(dtype='float32') * 128.0
    sample_index += 1

    # Rows 1-3: the three Lab channels. imshow normalises scalar data to its
    # own min/max, so no manual remapping of the value range is needed.
    plt.subplot(4, nrCol, nrCol * 0 + i + 1)
    plt.imshow(trainImage[:, :, 0], cmap='gray')
    plt.axis("off")

    plt.subplot(4, nrCol, nrCol * 1 + i + 1)
    plt.imshow(trainImage[:, :, 1], cmap='Blues')
    plt.axis("off")

    plt.subplot(4, nrCol, nrCol * 2 + i + 1)
    plt.imshow(trainImage[:, :, 2], cmap='Greens')
    plt.axis("off")

    # Row 4: the target converted back from Lab to RGB.
    plt.subplot(4, nrCol, nrCol * 3 + i + 1)
    plt.imshow(lab2rgb(expected))
    plt.axis("off")

plt.subplots_adjust(wspace=0.025, hspace=0.025)
plt.show()


# Create Model

In [None]:
def generate_cnn_builder(hp, image_size):
    """Build and compile a convolutional autoencoder from tuner hyperparameters.

    Args:
        hp: keras-tuner hyperparameter container; the choices registered here
            are 'kernel_initializer', 'kernel_filter', 'kernel_size',
            'num_downscale', 'number_layers', 'optimizer' and 'learning_rate'.
        image_size: Input shape handed to the encoder's Input layer.

    Returns:
        A compiled keras.Model chaining AE_input -> Encoder -> Decoder, using
        'mse' as loss and reporting 'accuracy'.
    """

    class Autoencoder(keras.Model):
        """Container owning the `encoder` and `decoder` Sequential models."""

        def __init__(self, hp, image_size, **kwargs):
            super(Autoencoder, self).__init__(**kwargs)

            kernel_init = hp.Choice('kernel_initializer', ['uniform', 'lecun_uniform', 'normal',
                                                           'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform'])

            hp_kernel_filter_size_l1 = hp.Int('kernel_filter', min_value=5, max_value=8, step=1)
            hp_kernel_size = hp.Int('kernel_size', min_value=3, max_value=4, step=1)
            num_downscale = hp.Int('num_downscale', min_value=1, max_value=1, step=1)

            number_layers = hp.Int(
                'number_layers', min_value=2, max_value=3, step=1)

            self.encoder = tf.keras.Sequential(name="Encoder")
            self.encoder.add(layers.Input(image_size))

            encoder_last_conv2 = None
            for i in range(0, number_layers + 1):
                # The filter count is halved at every encoder stage.
                filter_size = 2 ** (hp_kernel_filter_size_l1 - i)

                # Each stage applies (num_downscale + 1) stride-2 convolutions.
                for _ in range(0, num_downscale + 1):
                    self.encoder.add(layers.Conv2D(filters=filter_size, kernel_size=hp_kernel_size,
                                                   padding='same', strides=(2, 2), kernel_initializer=kernel_init))
                    self.encoder.add(layers.ReLU(dtype='float32'))

                encoder_last_conv2 = layers.Conv2D(filters=filter_size, kernel_size=hp_kernel_size,
                                                   strides=(1, 1), padding='same', kernel_initializer=kernel_init)
                self.encoder.add(encoder_last_conv2)
                self.encoder.add(layers.ReLU(dtype='float32'))

            # Flatten so the encoder emits one vector per image.
            self.encoder.add(layers.Flatten())

            # Length of the flattened encoder output (returned by getLatentSpace).
            self.latentspace = self.encoder.output_shape[1]

            # Create decoder.
            self.decoder = tf.keras.Sequential(name="Decoder")

            # Restore the shape of the last encoder convolution before decoding.
            connect_conv_shape = encoder_last_conv2.output_shape
            self.decoder.add(layers.Reshape(target_shape=(
                connect_conv_shape[1], connect_conv_shape[2], connect_conv_shape[3])))

            for i in range(0, number_layers + 1):
                # Mirror of the encoder: the filter count doubles at every stage.
                filter_size = 2 ** ((hp_kernel_filter_size_l1 - number_layers) + i)

                self.decoder.add(layers.Conv2D(filters=filter_size, kernel_size=hp_kernel_size,
                                               strides=(1, 1), padding='same', kernel_initializer=kernel_init))
                self.decoder.add(layers.ReLU(dtype='float32'))

                # One 2x upsampling per stride-2 convolution of the encoder stage.
                for _ in range(0, num_downscale + 1):
                    self.decoder.add(layers.Conv2D(filters=filter_size, kernel_size=hp_kernel_size,
                                                   padding='same', strides=(1, 1), kernel_initializer=kernel_init))
                    self.decoder.add(layers.ReLU(dtype='float32'))
                    self.decoder.add(layers.UpSampling2D(size=(2, 2)))

            # Three output channels squashed to [-1, 1] by tanh.
            self.decoder.add(layers.Conv2D(filters=3, kernel_size=(3, 3), padding='same', kernel_initializer=kernel_init))
            self.decoder.add(layers.Activation(activation='tanh', dtype='float32'))

        def load_weights(self, filepath, by_name=False, skip_mismatch=False, options=None):
            """Load encoder and decoder weights from `filepath` + 'encode' / 'decode'."""
            self.encoder.load_weights(filepath + 'encode', by_name=by_name,
                                      skip_mismatch=skip_mismatch, options=options)
            self.decoder.load_weights(filepath + 'decode', by_name=by_name,
                                      skip_mismatch=skip_mismatch, options=options)

        def save_weights(self, filepath, overwrite=True, save_format=None, options=None):
            """Save encoder and decoder weights to `filepath` + 'encode' / 'decode'."""
            self.encoder.save_weights(filepath + 'encode', overwrite=overwrite,
                                      save_format=save_format, options=options)
            self.decoder.save_weights(filepath + 'decode', overwrite=overwrite,
                                      save_format=save_format, options=options)

        def getEncoder(self):
            """Return the encoder Sequential model."""
            return self.encoder

        def getLatentSpace(self):
            """Return the length of the flattened encoder output."""
            return self.latentspace

        def getDecoder(self):
            """Return the decoder Sequential model."""
            return self.decoder

        def compile(self, optimizer, **kwargs):
            """Compile with `optimizer`; remaining arguments go to keras.Model.compile."""
            super(Autoencoder, self).compile(optimizer=optimizer, **kwargs)

        def summary(self, **kwargs):
            """Print the summaries of the container (once built), encoder and decoder."""
            # The container defines no call(), so it may never be built and the
            # base summary would raise; the sub-model summaries are always printed.
            if self.built:
                super(Autoencoder, self).summary(**kwargs)
            self.encoder.summary()
            self.decoder.summary()

    autoencoder = Autoencoder(hp, image_size)

    hp_optimizer = hp.Choice('optimizer', ['sgd', 'adam', 'adadelta'])

    hp_learning_rate = hp.Choice('learning_rate', values=[2e-3, 2e-4, 5e-4])
    optimizer = tf.keras.optimizers.get(hp_optimizer)
    optimizer.learning_rate = hp_learning_rate

    # Wire encoder and decoder into one functional model.
    ae_input = layers.Input(shape=image_size, name="AE_input")
    ae_encoder_output = autoencoder.encoder(ae_input)
    ae_decoder_output = autoencoder.decoder(ae_encoder_output)

    conv_autoencoder = keras.Model(inputs=ae_input, outputs=ae_decoder_output)

    # 'accuracy' is reported next to the MSE loss ('val_accuracy' in fit logs).
    conv_autoencoder.compile(optimizer=optimizer, loss='mse', metrics=['accuracy'])

    # Present the model: layers[1] is the encoder, layers[2] the decoder.
    conv_autoencoder.summary()
    conv_autoencoder.layers[1].summary()
    conv_autoencoder.layers[2].summary()

    return conv_autoencoder


## Hyperparameter

In [None]:

# Model input shape: image height and width plus three colour channels.
image_shape = (*IMAGE_SIZE[:2], 3)
print(f'Train Image Size: {image_shape}')


def hyperparamter_model_builder(hp):
    """Tuner entry point: build the autoencoder for `hp` at the notebook's `image_shape`."""
    return generate_cnn_builder(hp, image_shape)


# Rank trials by validation reconstruction error. The model is trained with an
# MSE loss on continuous targets, so classification accuracy is not a
# meaningful objective; early stopping below monitors the same quantity.
tuner = kt.Hyperband(hyperparamter_model_builder,
                     objective=kt.Objective('val_loss', direction='min'),
                     max_epochs=20,
                     factor=4,
                     directory='cache',
                     project_name='Stylized AutoEncoder - Compression')

stop_early = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=2)

# NOTE(review): Hyperband schedules the epochs of each trial itself (bounded by
# max_epochs) - confirm whether `epochs=EPOCHS` has any effect here.
tuner.search(train_images, epochs=EPOCHS,
             validation_data=validation_images,
             callbacks=[stop_early, tf.keras.callbacks.TerminateOnNaN()], verbose=1)

# Rebuild a fresh (untrained) model from the best hyperparameter set.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
autoencoder_model = tuner.hypermodel.build(best_hps)
autoencoder_model.summary()
autoencoder_model.layers[1].summary()
autoencoder_model.layers[2].summary()


In [None]:
def showResult(model, batchImage, maxNumImages=6):
    """Plot inputs next to the model's reconstructions and return the figure.

    Rows of the figure: input converted to RGB, the three predicted channels
    (gray / Blues / Greens colormaps) and the prediction converted to RGB.
    Inputs and predictions are multiplied by 128 before `lab2rgb` is applied.

    Args:
        model: Model whose `predict` is called on `batchImage`.
        batchImage: Either a tf.data.Dataset yielding input batches (or
            (input, target) tuples) or an array-like batch of inputs.
        maxNumImages: Number of columns, i.e. the maximum number of samples.

    Returns:
        The matplotlib figure (already closed so it is not displayed twice).
    """
    output = model.predict(batchImage)
    output = output * 128

    # Show the samples that were actually fed to the model rather than an
    # unrelated batch pulled from a global dataset.
    if isinstance(batchImage, tf.data.Dataset):
        first_element = next(iter(batchImage))
        if isinstance(first_element, (tuple, list)):
            first_element = first_element[0]
        image_batch = first_element
    else:
        image_batch = batchImage
    # float32 copy of the inputs, scaled like the predictions.
    image_batch = np.asarray(image_batch, dtype='float32') * 128

    nrElements = min(len(output), len(image_batch), maxNumImages)

    fig = plt.figure(figsize=(maxNumImages * 2, 5 * 2))
    for i in range(nrElements):

        plt.subplot(5, maxNumImages, i + 1)
        plt.imshow(asarray(lab2rgb(image_batch[i])).astype(dtype='float32'))
        plt.axis("off")

        plt.subplot(5, maxNumImages, maxNumImages * 1 + i + 1)
        plt.imshow(output[i, :, :, 0], cmap='gray')
        plt.axis("off")

        plt.subplot(5, maxNumImages, maxNumImages * 2 + i + 1)
        plt.imshow(output[i, :, :, 1], cmap='Blues')
        plt.axis("off")

        plt.subplot(5, maxNumImages, maxNumImages * 3 + i + 1)
        plt.imshow(output[i, :, :, 2], cmap='Greens')
        plt.axis("off")

        plt.subplot(5, maxNumImages, maxNumImages * 4 + 1 + i)
        plt.imshow(asarray(lab2rgb(output[i])).astype(dtype='float32'))
        plt.axis("off")

    fig.subplots_adjust(wspace=0.05, hspace=0.05)
    plt.close(fig)
    return fig


class save_images(tf.keras.callbacks.Callback):
    """Keras callback that writes reconstruction previews after every epoch.

    Two PNG files are saved per epoch: 'StylizedCompression<epoch>.png' with
    the default number of columns and 'StylizedCompressionBig<epoch>.png'
    with up to 12 columns.
    """

    def __init__(self, trainData, **kwargs):
        """Keep the first element of `trainData` as the fixed preview input."""
        # super() must start at this class; starting the lookup at Callback
        # itself skips Callback.__init__ and leaves the base state unset.
        super().__init__(**kwargs)
        self.trainSet = trainData.take(1)

    def on_epoch_end(self, epoch, logs=None):
        """Render and save both preview figures for `epoch`."""
        fig = showResult(self.model, self.trainSet)
        fig.savefig("StylizedCompression{0}.png".format(epoch))
        fig = showResult(self.model, self.trainSet, 12)
        fig.savefig("StylizedCompressionBig{0}.png".format(epoch))


In [None]:

checkpoint_path = "checkpoints/training_stylized_compression/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Create a callback that saves the model's weights
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=0)

# Resume from the newest checkpoint, if any. latest_checkpoint() expects the
# checkpoint DIRECTORY; the prefix 'cp.ckpt' itself never exists on disk
# (only 'cp.ckpt.index' / 'cp.ckpt.data-*' do), so testing for it with
# os.path.exists would always skip the reload.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint is not None:
    # expect_partial(): values stored in the checkpoint but not used by this
    # model should not produce warnings.
    autoencoder_model.load_weights(latest_checkpoint).expect_partial()

# No `shuffle` argument: Keras ignores it when the input is a tf.data.Dataset.
autoencoder_history = autoencoder_model.fit(train_images,
                      epochs=EPOCHS,
                      validation_data=validation_images,
                      callbacks=[cp_callback, save_images(train_images)])
autoencoder_model.save_weights(checkpoint_path)

In [None]:
# Save the trained model to 'stylized-ae-compression.h5' in the working directory.
autoencoder_model.save("stylized-ae-compression.h5")


## Result
The result is judged on two things: how well the model reconstructs the images, and how large a compression ratio it would yield.

In [None]:
# Compute the evaluation metrics on the validation split. The dataset is
# already batched, so no batch_size is passed.
result = autoencoder_model.evaluate(validation_images)
print(result)

# layers[1] is the encoder; its output is the flattened latent vector.
latent_space_size = autoencoder_model.layers[1].output_shape[1]
print("Latent Space {0}".format(latent_space_size))

In [None]:
def plotCostHistory(history, loss_label="", val_label="", title="", x_label="", y_label=""):
    """Plot every series of `history` on one axes and save it as '<title>.png'.

    Args:
        history: Mapping from series name to a sequence of values.
        loss_label: Accepted but not used.
        val_label: Accepted but not used.
        title: Figure title; also the stem of the saved PNG file.
        x_label: Label of the x axis.
        y_label: Label of the y axis.
    """
    fig, ax = plt.subplots()
    for series_name in history:
        ax.plot(history[series_name], label=series_name)
    ax.set_title(title)
    ax.set_ylabel(y_label)
    ax.set_xlabel(x_label)
    ax.legend(loc="upper left")
    fig.show()
    fig.savefig(title + ".png")


# Plot all metrics recorded by fit(); the title doubles as the PNG file name.
plotCostHistory(
    autoencoder_history.history,
    title="{0} Performance History".format("Stylized Colorization"))



In [None]:

#plt.figure(1,1)
# Show the large preview image written after the last training epoch.
final_preview_path = "StylizedCompressionBig{0}.png".format(EPOCHS - 1)

fig, ax = plt.subplots(figsize=(18, 5))
ax.axis("off")
ax.imshow(plt.imread(final_preview_path))
ax.set_title('Final Result', fontweight="bold")
fig.show()

## Compression Ratio
The purpose of the dimension reduction is to create a lossy compression that, hopefully, yields a smaller size than either JPG or PNG at a comparable level of loss.

## Epoch Improvement Transition

In [None]:
anim_file = 'stylized-compression.gif'
frame_prefix = 'StylizedCompressionBig'


def _frame_epoch(path):
    """Return the text between the frame prefix and '.png' (the epoch number)."""
    stem = os.path.splitext(os.path.basename(path))[0]
    return stem[len(frame_prefix):]


# Order frames by epoch NUMBER: a plain string sort would put
# '...Big10.png' before '...Big2.png'. Files without a numeric suffix are skipped.
filenames = [path for path in glob.glob(frame_prefix + '*.png')
             if _frame_epoch(path).isdigit()]
filenames.sort(key=lambda path: int(_frame_epoch(path)))

with imageio.get_writer(anim_file, mode='I') as writer:
    for filename in filenames:
        image = imageio.imread(filename)
        writer.append_data(image)
    if filenames:
        # Repeat the last frame once more at the end of the animation.
        writer.append_data(image)

Image(url=anim_file)


In [None]:
def interpolate(zs, steps):
    """Linearly blend between consecutive entries of `zs`.

    For every neighbouring pair (start, end), `steps` values are produced with
    fractions 0, 1/steps, ..., (steps - 1)/steps, so each segment begins at
    `start` and stops one step short of `end`.

    Args:
        zs: Sequence of values supporting `*` with a float and `+`.
        steps: Number of values generated per neighbouring pair.

    Returns:
        List holding (len(zs) - 1) * steps blended values; empty when `zs`
        has fewer than two entries or `steps` is 0.
    """
    blended = []
    for start, end in zip(zs, zs[1:]):
        for index in range(steps):
            fraction = index / float(steps)
            blended.append(end * fraction + start * (1 - fraction))
    return blended


## Present Latent Space

In [None]:
import scipy
import moviepy.editor

In [None]:
def generate_transition(model, latent_space, grid_size):
    """Decode `latent_space` with `model` and lay the images out on a grid.

    The model output is multiplied by 128 and converted with `lab2rgb`, the
    same way the reconstructions are displayed elsewhere in this notebook.

    Args:
        model: Callable model; invoked as model(latent_space, training=False).
        latent_space: Batch of latent vectors fed to `model`.
        grid_size: (rows, columns) of the subplot grid; it also determines the
            figure size.

    Returns:
        The matplotlib figure (closed, so it is not displayed implicitly).
    """
    generated_images = model(latent_space, training=False)

    rows, columns = grid_size[0], grid_size[1]
    fig = plt.figure(figsize=(grid_size[0] * 2, grid_size[1] * 2))
    # Never address a subplot outside of the rows x columns grid.
    for i in range(min(generated_images.shape[0], rows * columns)):
        plt.subplot(rows, columns, i + 1)

        lab = np.asarray(generated_images[i, :, :, :], dtype='float32') * 128.0
        plt.imshow(lab2rgb(lab), aspect='auto')
        plt.axis("off")
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.close(fig)
    return fig



In [None]:
def generate_grid_image(model, latent_space, figsize=(8, 8), subplotsize=(3, 3)):
    """Decode `latent_space` with `model` and return a grid figure of the results.

    The first three output channels are multiplied by 128 and converted with
    `lab2rgb`, matching how reconstructions are displayed elsewhere in this
    notebook.

    Args:
        model: Callable model; invoked as model(latent_space, training=False).
        latent_space: Batch of latent vectors fed to `model`.
        figsize: Figure size passed to matplotlib.
        subplotsize: (rows, columns) of the subplot grid.

    Returns:
        The matplotlib figure (closed, so it is not displayed implicitly).
    """
    # Notice `training` is set to False.
    # This is so all layers run in inference mode (batchnorm).
    predictions = model(latent_space, training=False)

    rows, columns = subplotsize[0], subplotsize[1]
    fig = plt.figure(figsize=figsize)
    # Never address a subplot outside of the rows x columns grid.
    for i in range(min(predictions.shape[0], rows * columns)):
        plt.subplot(rows, columns, i + 1)
        lab = np.asarray(predictions[i, :, :, 0:3], dtype='float32') * 128.0
        plt.imshow(asarray(lab2rgb(lab)))
        plt.axis('off')
    plt.close(fig)
    return fig

In [None]:

# Animation timing.
fps = 15
duration_sec = 5
smoothing_sec = 1.0
num_frames = 4 * fps

shape = [num_frames, np.prod(latent_space_size)]

# Number of latent vectors (grid cells) per frame.
nr_elemenets = 16

# Two random end points in latent space, one per seed.
seeds = np.random.randint(10000, size=2)
zs = []
for s in seeds:
    zs.append(tf.random.normal([nr_elemenets, latent_space_size], seed=s))

# num_frames blended latent batches leading from zs[0] towards zs[1].
all_latents = interpolate(zs, num_frames)


def make_frame(t):
    """Render the animation frame for time `t` (seconds) as an RGB uint8 array.

    The frame index is round(t * fps), clamped to [0, num_frames - 1]; the
    matching latent batch is decoded with layers[2] of `autoencoder_model`.
    """
    frame_idx = int(np.clip(np.round(t * fps), 0, num_frames - 1))
    latents = all_latents[frame_idx]

    # Generate the figure for the selected latent batch.
    fig = generate_grid_image(autoencoder_model.layers[2], latents, (5, 5), (4, 4))

    # Convert figure to bitmap. buffer_rgba() replaces tostring_rgb(), which
    # is deprecated and removed in recent Matplotlib; the alpha channel is dropped.
    fig.canvas.draw()
    rgba = np.asarray(fig.canvas.buffer_rgba())
    data = np.ascontiguousarray(rgba[..., :3])

    return data


# Name the GIF after the first random seed, then render it frame by frame.
gif_filepath = f'autoencoder_stylized_compression_transition_grid_{seeds[0]}.gif'
video_clip = moviepy.editor.VideoClip(make_frame, duration=duration_sec)
video_clip.write_gif(gif_filepath, fps=fps)


In [None]:
# Display the generated GIF as the cell output.
Image(url=gif_filepath)
