# Autoencoder

In [None]:
# Mount Google Drive into the Colab runtime at /content/gdrive; the path
# constants defined further down (LOCAL_REGISTRY_PATH,
# LOCAL_DATA_PATH_OUTPUT_IMG) point into this mount.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


params.py

In [None]:
# Root folder under which models, params and metrics are saved / looked up.
LOCAL_REGISTRY_PATH = '/content/gdrive/MyDrive/Bootcamp_ENAP_2022/new_output/model'
# Root folder of the preprocessed images used for training.
LOCAL_DATA_PATH_OUTPUT_IMG = '/content/gdrive/MyDrive/Bootcamp_ENAP_2022/new_output/processed_img'

# Size the images are loaded at (passed as image_size=(HEIGHT, WIDTH)).
AUTOENCODER_WIDTH = 160
AUTOENCODER_HEIGHT = 192
# Learning rate given to the Adam optimizer.
AUTOENCODER_LEARNING_RATE = 0.0005
# Multiplier applied to the reconstruction loss inside total_loss.
AUTOENCODER_LOSS_FACTOR = 10000
# Early-stopping patience, in epochs.
AUTOENCODER_PATIENCE = 20
# Fraction of each training call's data held out by Keras for validation.
AUTOENCODER_VALIDATION_SPLIT = 0.3
# Mini-batch size used by fit() and evaluate().
AUTOENCODER_BATCHSIZE = 256
# Size of the latent vector produced by the encoder.
AUTOENCODER_LATENT_DIMENSION = 500
# Maximum number of epochs per fit() call.
AUTOENCODER_N_EPOCHS = 400
# Not referenced by the code visible in this notebook.
DATA_SOURCE = 'local'
# Number of images loaded (and trained on) per chunk.
CHUNK_SIZE = 10000

registry.py

In [None]:
import time
import glob
import pickle
import os
from tensorflow.keras import Model, models

def load_autoencoder(elected: bool, bw: bool, custom_objects: dict, save_copy_locally=False) -> Model:
    """
    Load the most recently saved autoencoder from the local registry.

    The registry folder is selected by `bw` ('bw' or 'color') and `elected`
    ('elected' or 'not_elected'). Its entries are sorted by name and the last
    one is loaded; save_autoencoder names entries with a timestamp, so the
    last name corresponds to the newest save.

    `custom_objects` is forwarded to `models.load_model`.
    `save_copy_locally` is accepted but not used by this function.

    Returns the loaded model, or None when the folder holds no saved entry.
    """
    print("\nLoad autoencoder from local disk...")

    # Folder holding every saved version of this model variant.
    color_mode = 'bw' if bw else 'color'
    group = 'elected' if elected else 'not_elected'
    registry_dir = os.path.join(LOCAL_REGISTRY_PATH, color_mode, group, 'models', 'autoencoder')

    candidates = sorted(glob.glob(f"{registry_dir}/*"))
    if len(candidates) == 0:
        return None

    latest_path = candidates[-1]
    print(f"- path: {latest_path}")

    loaded = models.load_model(latest_path, custom_objects=custom_objects)
    print("\n✅ autoencoder loaded from disk")
    return loaded

def _autoencoder_registry_dir(kind: str, elected: bool, bw: bool) -> str:
    """Return the registry folder for `kind` artifacts, creating it if missing."""
    directory = os.path.join(LOCAL_REGISTRY_PATH,
                             'bw' if bw else 'color',
                             'elected' if elected else 'not_elected',
                             kind, 'autoencoder')
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(directory, exist_ok=True)
    return directory


def save_autoencoder(autoencoder: Model = None,
               params: dict = None,
               metrics: dict = None,
               elected: bool = None,
               bw: bool = None) -> None:

    """
    Persist the trained autoencoder, its params and its metrics.

    Each argument that is not None is written under
    LOCAL_REGISTRY_PATH/<bw|color>/<elected|not_elected>/<kind>/autoencoder,
    where <kind> is 'params', 'metrics' or 'models'. All files written by one
    call share the same timestamp-based name.

    - params / metrics: pickled to "<timestamp>.pickle"
    - autoencoder: saved with `autoencoder.save` to "<timestamp>.model"

    A falsy `bw` (including the default None) selects 'color'; a falsy
    `elected` selects 'not_elected'.
    """

    timestamp = time.strftime("%Y%m%d-%H%M%S")

    print("\nSave autoencoder to local disk...")

    # save params
    if params is not None:
        params_path = _autoencoder_registry_dir('params', elected, bw)
        print(f"- params path: {params_path}")
        with open(os.path.join(params_path, timestamp + ".pickle"), "wb") as file:
            pickle.dump(params, file)

    # save metrics
    if metrics is not None:
        metrics_path = _autoencoder_registry_dir('metrics', elected, bw)
        print(f"- metrics path: {metrics_path}")
        with open(os.path.join(metrics_path, timestamp + ".pickle"), "wb") as file:
            pickle.dump(metrics, file)

    # save autoencoder
    if autoencoder is not None:
        autoencoder_path = _autoencoder_registry_dir('models', elected, bw)
        print(f"- autoencoder path: {autoencoder_path}")
        autoencoder.save(os.path.join(autoencoder_path, timestamp + ".model"))

    print("\n✅ data saved locally")

    return None


model.py

In [None]:
from typing import Optional, Tuple

import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense, Reshape, Conv2DTranspose, BatchNormalization, Dropout, Lambda, Masking
from tensorflow.keras import Model, backend as K
from tensorflow.keras.layers.experimental.preprocessing import Rescaling
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

def r_loss(y_true, y_pred):
    """
    Reconstruction loss: squared error between target and prediction,
    averaged over axes 1, 2 and 3 (presumably height, width and channels),
    which leaves one value per entry along axis 0.
    """
    squared_error = K.square(y_true - y_pred)
    return K.mean(squared_error, axis=[1, 2, 3])

def kl_loss(y_true, y_pred):
    """
    KL-divergence style term computed from the latent mean and log-variance.

    `y_true` and `y_pred` are not used; they are only accepted so the
    function has the (y_true, y_pred) signature of a Keras loss/metric.

    NOTE: `mean_mu` and `log_var` are module-level globals. They are assigned
    by the `sampling` function defined inside build_suffix_encoder, so an
    encoder must have been built before this function is evaluated.
    """
    return -0.5 * K.sum(1 + log_var - K.square(mean_mu) - K.exp(log_var), axis=1)

def total_loss(y_true, y_pred):
    """
    Training loss: the reconstruction loss scaled by AUTOENCODER_LOSS_FACTOR,
    plus the KL term.
    """
    reconstruction_term = AUTOENCODER_LOSS_FACTOR * r_loss(y_true, y_pred)
    divergence_term = kl_loss(y_true, y_pred)
    return reconstruction_term + divergence_term

def build_prefix_encoder(input_shape, use_batch_norm = False, use_dropout = False):
    """
    Build the convolutional front part of the encoder.

    The Keras backend session is cleared first. The returned Sequential model
    is made of:
      - a Masking layer (mask_value=255) that also declares `input_shape`,
      - a Rescaling layer (factor 1/255),
      - four 3x3 Conv2D layers with stride 2 and 'same' padding
        (32, 64, 64 and 64 filters), each optionally followed by
        BatchNormalization (`use_batch_norm`) and Dropout(0.25) (`use_dropout`),
      - a Flatten layer.

    Returns:
        (encoder, shape_before_flattening) where `shape_before_flattening` is
        the output shape of the last layer before Flatten, without its first
        (batch) dimension. build_decoder uses it to mirror the encoder.
    """
    K.clear_session()

    encoder = Sequential()

    encoder.add(Masking(mask_value=255, input_shape=input_shape))
    encoder.add(Rescaling(1./255))

    # Four identical down-sampling stages that differ only in filter count.
    for n_filters in (32, 64, 64, 64):
        encoder.add(Conv2D(n_filters, (3,3), (2, 2), padding='same', activation='LeakyReLU'))
        if use_batch_norm:
            encoder.add(BatchNormalization())
        if use_dropout:
            encoder.add(Dropout(0.25))

    # Captured before Flatten so the decoder can reshape back to it.
    shape_before_flattening = K.int_shape(encoder.layers[-1].output)[1:]

    encoder.add(Flatten())

    return encoder, shape_before_flattening

def build_suffix_encoder(prefix_model, latent_dimension):
  """
  Append the variational head to `prefix_model` and return the full encoder.

  Two Dense layers of size `latent_dimension` are applied to the output of
  the last layer of `prefix_model`; one is used as the latent mean and the
  other as the latent log-variance. A Lambda layer then draws a sample
  from them. The returned Model maps the prefix model's input to that sample.

  NOTE: the nested `sampling` function stores its two inputs in the
  module-level globals `mean_mu` and `log_var`. `kl_loss` reads those
  globals, so this side effect must be kept.
  """
  prefix_model_input = prefix_model.layers[0].input
  prefix_model_output = prefix_model.layers[-1].output

  # Both heads read the same prefix output.
  mean_mu = Dense(latent_dimension)(prefix_model_output)
  log_var = Dense(latent_dimension)(prefix_model_output)

  # Sampling function wrapped by the Lambda layer below.
  def sampling(args):
    # Publish the tensors as module globals so kl_loss can access them.
    global mean_mu
    global log_var
    mean_mu, log_var = args
    # Standard-normal noise with the same shape as the mean.
    epsilon = K.random_normal(shape=K.shape(mean_mu), mean=0., stddev=1.)
    # exp(log_var / 2) is the standard deviation.
    return mean_mu + K.exp(log_var/2)*epsilon

  # Despite its name, `concatenate` holds the sampled latent vector.
  concatenate = Lambda(sampling)([mean_mu, log_var])
  return Model(prefix_model_input, concatenate)

def build_encoder(input_shape, use_batch_norm = False, use_dropout = False, latent_dimension=200):
    """
    Build the complete encoder: the convolutional prefix followed by the
    sampling head.

    Returns:
        (encoder, shape_before_flattening), the second item being the shape
        reported by build_prefix_encoder.
    """
    conv_stack, shape_before_flattening = build_prefix_encoder(
        input_shape, use_batch_norm, use_dropout)
    return build_suffix_encoder(conv_stack, latent_dimension), shape_before_flattening

def build_decoder(latent_dimension, shape_before_flattening):
    """
    Build the decoder as a Sequential model.

    A Dense layer expands a latent vector of size `latent_dimension` to
    prod(shape_before_flattening) values, which are reshaped to
    `shape_before_flattening` and passed through four stride-2 3x3
    Conv2DTranspose layers (64, 64, 32 and 3 filters). The last layer uses a
    sigmoid activation.
    """
    # Settings shared by every transposed convolution.
    upsampling = dict(kernel_size=(3,3), strides=(2,2), padding='same')

    stack = [
        Dense(np.prod(shape_before_flattening), input_shape=(latent_dimension, ), activation=None),
        Reshape(shape_before_flattening),
        Conv2DTranspose(64, activation='LeakyReLU', **upsampling),
        Conv2DTranspose(64, activation='LeakyReLU', **upsampling),
        Conv2DTranspose(32, activation='LeakyReLU', **upsampling),
        Conv2DTranspose(3, activation='sigmoid', **upsampling),
    ]

    decoder = Sequential()
    for layer in stack:
        decoder.add(layer)
    return decoder

def build_autoencoder(input_shape, latent_dimension):
    """
    Build the encoder (batch norm and dropout both enabled) and the decoder,
    and chain them into one model.

    Returns:
        (autoencoder, encoder, decoder)
    """
    encoder, shape_before_flattening = build_encoder(input_shape, True, True, latent_dimension)
    decoder = build_decoder(latent_dimension, shape_before_flattening)

    model_input = encoder.layers[0].input
    latent_sample = encoder.layers[-1].output
    reconstruction = decoder(latent_sample)

    return Model(model_input, reconstruction), encoder, decoder

def initialize_autoencoder(X: np.ndarray, latent_dimension: int) -> Model:
    """
    Create a new, untrained autoencoder whose input shape is X.shape[1:]
    (the shape of X without its first dimension).
    """
    sample_shape = X.shape[1:]
    autoencoder = build_autoencoder(sample_shape, latent_dimension)[0]
    print("\n✅ autoencoder initialized")

    return autoencoder

def compile_autoencoder(autoencoder: Model) -> Model:
    """
    Compile the autoencoder with an Adam optimizer (learning rate
    AUTOENCODER_LEARNING_RATE), `total_loss` as the loss, and `r_loss` and
    `kl_loss` as metrics. Returns the same model object.
    """
    autoencoder.compile(
        optimizer=Adam(learning_rate=AUTOENCODER_LEARNING_RATE),
        loss=total_loss,
        metrics=[r_loss, kl_loss],
    )

    print("\n✅ autoencoder compiled")
    return autoencoder

def train_autoencoder(autoencoder: Model,
                X: np.ndarray,
                y: np.ndarray,
                monitor: str = 'val_kl_loss') -> Tuple[Model, dict]:
    """
    Fit the autoencoder and return the tuple (fitted_autoencoder, history).

    Training uses AUTOENCODER_VALIDATION_SPLIT, AUTOENCODER_N_EPOCHS and
    AUTOENCODER_BATCHSIZE, with early stopping (patience AUTOENCODER_PATIENCE,
    best weights restored).

    Args:
        autoencoder: compiled model to fit.
        X: model inputs.
        y: reconstruction targets.
        monitor: name of the quantity watched by early stopping. Defaults to
            'val_kl_loss', the value that was previously hard-coded.

    Returns:
        (autoencoder, history) where `history` is the object returned by
        `autoencoder.fit`; callers read its `.history` attribute.

    NOTE(review): monitoring only the KL term ignores reconstruction quality,
    so `restore_best_weights` may bring back weights that reconstruct poorly.
    Consider passing monitor='val_loss' — confirm against the intended
    training objective.
    """

    print("\nTrain autoencoder...")

    es = EarlyStopping(patience=AUTOENCODER_PATIENCE, restore_best_weights=True, monitor=monitor)

    history = autoencoder.fit(X,
                        y,
                        validation_split=AUTOENCODER_VALIDATION_SPLIT,
                        epochs=AUTOENCODER_N_EPOCHS,
                        batch_size=AUTOENCODER_BATCHSIZE,
                        callbacks=[es],
                        verbose=1,
                        shuffle=True)

    print(f"\n✅ autoencoder trained ({len(X)} rows)")

    return autoencoder, history

def evaluate_autoencoder(autoencoder: Model,
                   X: np.ndarray,
                   y: np.ndarray) -> Optional[dict]:
    """
    Evaluate a trained autoencoder on (X, y).

    Returns:
        The dict produced by `autoencoder.evaluate(..., return_dict=True)`,
        from which the "r_loss" and "kl_loss" entries are printed, or None
        when `autoencoder` is None.
    """

    print(f"\nEvaluate autoencoder on {len(X)} rows...")

    if autoencoder is None:
        print("\n❌ no autoencoder to evaluate")
        return None

    metrics = autoencoder.evaluate(
        x=X,
        y=y,
        batch_size=AUTOENCODER_BATCHSIZE,
        verbose=1,
        return_dict=True)

    # Distinct local names: `r_loss` / `kl_loss` are the module-level loss
    # functions and should not be shadowed here.
    reconstruction_value = metrics["r_loss"]
    divergence_value = metrics["kl_loss"]

    # Labels match the metrics actually reported (previously "loss" / "mae").
    print(f"\n✅ autoencoder evaluated: r_loss {round(reconstruction_value, 2)} kl_loss {round(divergence_value, 2)}")

    return metrics


main.py

In [None]:
import time
import numpy as np
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.keras import Model
from tensorflow.errors import NotFoundError

def train():
    """
    Train and save one autoencoder per (elected, bw) combination: the two
    color variants first, then the two black-and-white ones, with
    elected=True before elected=False in each pair.
    """
    for bw in (False, True):
        for elected in (True, False):
            partial_train_autoencoder(elected, bw)

def partial_train_autoencoder(elected=True, bw=True):
    """
    Train a new autoencoder on one image folder, chunk by chunk, then save it.

    Images are read from
    LOCAL_DATA_PATH_OUTPUT_IMG/<elected|not_elected>/<bw|color> in chunks of
    CHUNK_SIZE images. The model is created from the first chunk, then
    re-compiled and fitted on every chunk in turn; each fit call holds out its
    own validation split of that chunk.

    Args:
        elected: selects the 'elected' or 'not_elected' sub-folder.
        bw: selects the 'bw' or 'color' sub-folder.

    Returns:
        (r_loss, kl_loss) for the last chunk, each being the minimum over
        epochs of the corresponding *training* metric in the fit history.
        The same values are saved as metrics next to the model.
        None when the folder is not found or yields no image.
    """
    print("\n⭐️ use case: fit autoencoder")

    print("\nLoading preprocessed data...")

    folder = os.path.join(
        LOCAL_DATA_PATH_OUTPUT_IMG,
        'elected' if elected else 'not_elected',
        'bw' if bw else 'color')

    # Each dataset batch is one training chunk.
    try:
        images_dataset = image_dataset_from_directory(folder,
                                                      label_mode=None,
                                                      batch_size=CHUNK_SIZE,
                                                      image_size=(AUTOENCODER_HEIGHT,AUTOENCODER_WIDTH),
                                                      shuffle=True,
                                                      crop_to_aspect_ratio=True)
    except NotFoundError:
        print("\n✅ no data to train")
        return None

    # Always start from a fresh model. To resume from the latest saved one,
    # load it here instead:
    #   load_autoencoder(elected, bw, {'r_loss': r_loss, 'kl_loss': kl_loss, 'total_loss': total_loss})
    autoencoder = None

    row_count = 0
    last_r_loss = None
    last_kl_loss = None

    # iterate on the full dataset per chunks
    for chunk_id, image_batch in enumerate(images_dataset):

        print(f"\n✅ Loading and training on preprocessed chunk n°{chunk_id}...")

        # increment trained row count
        row_count += image_batch.shape[0]

        # initialize autoencoder on the first chunk only
        if autoencoder is None:
            autoencoder = initialize_autoencoder(image_batch, AUTOENCODER_LATENT_DIMENSION)

        # (re)compile and train the model incrementally; inputs are the raw
        # images, targets are the same images scaled by 1/255
        autoencoder = compile_autoencoder(autoencoder)
        autoencoder, history = train_autoencoder(autoencoder,
                                     image_batch,
                                     image_batch/255)

        # best (lowest) training metrics reached on this chunk
        last_r_loss = np.min(history.history['r_loss'])
        last_kl_loss = np.min(history.history['kl_loss'])
        print(f"chunk r_loss, kl_loss: {round(last_r_loss,2)}, {round(last_kl_loss,2)}")

    if row_count == 0:
        print("\n✅ no new data for the training 👌")
        return None

    print(f"\n✅ trained on {row_count} rows with r_loss, kl_loss: {round(last_r_loss, 2)}, {round(last_kl_loss, 2)}")

    params = dict(
        # model parameters
        learning_rate=AUTOENCODER_LEARNING_RATE,
        batch_size=AUTOENCODER_BATCHSIZE,
        patience=AUTOENCODER_PATIENCE,
        # package behavior
        context="train",
        chunk_size=CHUNK_SIZE,
        # data source
        training_set_size=round(row_count*(1-AUTOENCODER_VALIDATION_SPLIT)),
        val_set_size=round(row_count*AUTOENCODER_VALIDATION_SPLIT),
        row_count=row_count,
        model_version=None,
        dataset_timestamp=time.strftime("%Y%m%d-%H%M%S"),
    )

    # save autoencoder together with its params and the last chunk's metrics
    save_autoencoder(autoencoder=autoencoder,
                     params=params,
                     metrics=dict(r_loss=last_r_loss, kl_loss=last_kl_loss),
                     elected=elected,
                     bw=bw)

    return last_r_loss, last_kl_loss

def pred(autoencoder: Model, X_pred: np.ndarray = None) -> np.ndarray:
    """
    Run a prediction with the given autoencoder.

    - With `X_pred`: return `autoencoder.predict(X_pred)`.
    - Without `X_pred`: draw one random latent vector from a standard normal
      distribution and return what the autoencoder's last layer (the decoder)
      produces for it. The latent size is read from the output shape of the
      second-to-last layer.
    """

    print("\n⭐️ use case: predict")

    if X_pred is not None:
        y_pred = autoencoder.predict(X_pred)

        print("\n✅ prediction done: ", y_pred.shape)

        return y_pred

    # No input given: generate from a random point of the latent space.
    decoder_layer = autoencoder.layers[-1]
    latent_size = autoencoder.layers[-2].output.shape[1]
    decoder = Model(decoder_layer.input, decoder_layer.output)
    latent_vector = np.random.normal(0, 1, size=(1, latent_size))
    return decoder.predict(latent_vector)

In [None]:
# Launch the full training: train() fits one autoencoder per (elected, bw)
# combination. To train a single combination instead, call for example:
#partial_train_autoencoder(elected=True, bw=False)
train()

KeyboardInterrupt: ignored