In [None]:
import keras
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
import tensorflow_datasets as tfds
import pickle
from collections import namedtuple
from ctypes import ArgumentError
from dataclasses import dataclass
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.layers import RandomZoom, RandomFlip, RandomRotation, Input, Dense, Dropout, GlobalAveragePooling2D, Flatten
from keras.models import Model
from keras.optimizers import RMSprop
from textwrap import dedent
from tqdm import tqdm
from keras.callbacks import (
    ModelCheckpoint,
    EarlyStopping,
    Callback,
)

# Global Keras dtype policy for every layer created after this point.
keras.mixed_precision.set_global_policy("mixed_float16")

# Single seed shared by every random number generator seeded below.
SEED = 42


def set_global_determinism():
    """Seed the random number generators and restrict TensorFlow threading.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    TensorFlow's global generator with ``SEED``, exports the hash-seed and
    determinism environment variables, and limits TensorFlow to one inter-op
    and one intra-op thread.
    """
    # Local import: the notebook's import cell does not bring in `random`.
    import random

    # NOTE: PYTHONHASHSEED is read at interpreter start-up, so setting it here
    # only affects child processes, not the hashing of the running kernel.
    os.environ['PYTHONHASHSEED'] = str(SEED)
    random.seed(SEED)
    tf.random.set_seed(SEED)
    np.random.seed(SEED)

    # Presumably read by TensorFlow to prefer deterministic kernels; verify
    # against the installed TensorFlow version.
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'

    # A single thread per pool removes scheduling-order nondeterminism.
    tf.config.threading.set_inter_op_parallelism_threads(1)
    tf.config.threading.set_intra_op_parallelism_threads(1)


set_global_determinism()

In [None]:
@tf.function
def compress_image_with_energy(image, energy_factor=0.9):
    """Return a low-rank SVD approximation of ``image``.

    The last axis of ``image`` is moved to the front and each resulting 2-D
    slice is decomposed independently (assumes a rank-3
    ``(height, width, channels)`` image; TODO confirm against callers).
    One rank ``k`` is shared by all slices: the position at which the
    slice-averaged cumulative share of the singular-value sum is closest to
    ``energy_factor``.

    Args:
        image: Rank-3 array-like or tensor.
        energy_factor: Target cumulative share of the singular-value sum.

    Returns:
        The rank-``k`` reconstruction, in the same axis order as ``image``.
    """

    def _cumulative_share(singular_values):
        # Fraction of the singular-value sum captured by the first i values.
        return tf.cumsum(singular_values) / tf.reduce_sum(singular_values)

    channels_first = tf.transpose(tf.convert_to_tensor(image), [2, 0, 1])
    sing_vals, left_vecs, right_vecs = tf.linalg.svd(
        channels_first, compute_uv=True, full_matrices=False
    )

    share_per_channel = tf.map_fn(_cumulative_share, sing_vals)
    mean_share = tf.reduce_mean(share_per_channel, axis=0)

    # argmin is zero-based; +1 turns the position into a count of components.
    rank = tf.argmin(tf.abs(mean_share - energy_factor)) + 1

    # Rebuild U_k diag(s_k) V_k^T for every slice in a single contraction.
    approximation = tf.einsum(
        "...s,...us,...vs->...uv",
        sing_vals[..., :rank],
        left_vecs[..., :, :rank],
        right_vecs[..., :, :rank],
    )

    return tf.transpose(approximation, [1, 2, 0])

In [None]:
@dataclass
class TrialDetail:
    """Identifies one experimental trial.

    Supports integer indexing (and therefore tuple-style unpacking) in the
    order ``dataset_name, augmentation_method, percentage_data,
    energy_factor``. Note that this differs from the field declaration order.
    """

    dataset_name: str
    augmentation_method: str
    energy_factor: float
    percentage_data: float = 1

    def __getitem__(self, idx):
        """Return the ``idx``-th entry of the unpacking order described on the class."""
        return (
            self.dataset_name,
            self.augmentation_method,
            self.percentage_data,
            self.energy_factor,
        )[idx]

    def __repr__(self):
        """Return a multi-line, human-readable summary of the trial."""
        # The energy factor is only reported for the "all" and "svd" methods.
        if self.augmentation_method in {"all", "svd"}:
            energy_factor_repr = f"{self.energy_factor:.2%}"
        else:
            energy_factor_repr = "N/A"

        summary = f"""
        Model & Dataset Name: {self.dataset_name.upper()}
        Percentage of Data Used: {self.percentage_data:.2%}
        Augmentation Method: {self.augmentation_method.upper()}
        Energy Factor: {energy_factor_repr}
        """
        return dedent(summary)

    def file_str(self):
        """Return the underscore-joined field values used as a file-name stem."""
        parts = (
            self.dataset_name,
            self.augmentation_method,
            self.percentage_data,
            self.energy_factor,
        )
        return "_".join(f"{part}" for part in parts)

In [None]:
def get_kfold_callbacks(trial_detail: TrialDetail, fold_num):
    """Build the callbacks used while fine-tuning on one cross-validation fold.

    Args:
        trial_detail: Trial whose ``file_str()`` names the checkpoint.
        fold_num: Fold index, forwarded to the banner logger.

    Returns:
        ``[ModelCheckpoint, _KFoldTrainingDetailsLogger]``.
    """
    # NOTE: the path does not include the fold number, so every fold writes
    # its best model to the same location.
    checkpoint_path = f"{trial_detail.file_str()}_fine_tuning"

    return [
        ModelCheckpoint(
            checkpoint_path,
            save_best_only=True,
            monitor="val_accuracy",
        ),
        _KFoldTrainingDetailsLogger(trial_detail, fold_num),
    ]


def get_retrain_callbacks(trial_detail: TrialDetail, purpose="retrain"):
    """Build the callbacks used when retraining without a validation split.

    Args:
        trial_detail: Trial whose ``file_str()`` prefixes the checkpoint path.
        purpose: Suffix appended to the checkpoint path.

    Returns:
        ``[ModelCheckpoint, _RetrainDetailsLogger]``.
    """
    checkpoint_path = f"{trial_detail.file_str()}_{purpose}"

    return [
        # Monitors the training metric "accuracy" rather than a validation one.
        ModelCheckpoint(
            checkpoint_path,
            save_best_only=True,
            monitor="accuracy",
        ),
        _RetrainDetailsLogger(trial_detail),
    ]


class _KFoldTrainingDetailsLogger(Callback):
    """Prints a banner with the trial details when training on a fold starts and ends."""

    def __init__(self, trial_detail, fold_num):
        """Store the trial and fold to report.

        Args:
            trial_detail: Object whose ``repr`` is embedded in the banners.
            fold_num: Zero-based fold index; banners show ``fold_num + 1``.
        """
        # Let the Keras base class initialise its own state first.
        super().__init__()
        self.trial_detail = trial_detail
        self.fold_num = fold_num + 1

    def _print_banner(self, phase):
        """Print the start/end banner; ``phase`` is ``"START"`` or ``"END"``."""
        print(
            dedent(
                f"""
            \n\n{'*' * 80}\n\n{phase} OF TRAINING - FOLD #{self.fold_num}:\n{self.trial_detail!r}\n\n{'*' * 80}\n\n"""
            )
        )

    def on_train_begin(self, logs=None):
        """Announce the start of training on this fold."""
        self._print_banner("START")

    def on_train_end(self, logs=None):
        """Announce the end of training on this fold."""
        self._print_banner("END")


class _RetrainDetailsLogger(Callback):
    """Prints a banner with the trial details when a retraining run starts and ends."""

    def __init__(self, trial_detail):
        """Store the trial whose ``repr`` is embedded in the banners."""
        # Let the Keras base class initialise its own state first.
        super().__init__()
        self.trial_detail = trial_detail

    def _print_banner(self, headline):
        """Print ``headline`` and the trial details framed by rows of asterisks."""
        print(
            dedent(
                f"""
            \n\n{'*' * 80}\n{'*' * 80}\n{'*' * 80}\n\n{headline}:\n{self.trial_detail!r}\n\n{'*' * 80}\n{'*' * 80}\n{'*' * 80}\n\n"""
            )
        )

    def on_train_begin(self, logs=None):
        """Announce the start of the retraining run."""
        self._print_banner("RETRAINING ON ENTIRE DATASET")

    def on_train_end(self, logs=None):
        """Announce the end of the retraining run."""
        self._print_banner("END OF RETRAINING")

In [None]:
class RandomCompression():
    """Randomly replaces an image with a low-rank SVD approximation of itself.

    Each call either skips the image (returning ``None``) or compresses it
    with an energy factor drawn from the configured distribution.
    """

    def __init__(
        self,
        max_energy_factor=0.975,
        min_energy_factor=0.95,
        distribution="normal",
        skip_threshold=0.80,
        **kwargs
    ):
        """Store the sampling configuration.

        Args:
            max_energy_factor: Upper bound of the uniform range, and the mean
                of the normal distribution.
            min_energy_factor: Lower bound of the uniform range (not used by
                the normal distribution).
            distribution: ``"normal"``/``"gaussian"`` or ``"uniform"``.
            skip_threshold: An image is skipped when a uniform draw from
                [0, 1) is below this value.
            **kwargs: Forwarded to the base-class initialiser.
        """
        super().__init__(**kwargs)
        self.max_energy_factor = max_energy_factor
        self.min_energy_factor = min_energy_factor
        self.distribution = distribution
        self.skip_threshold = skip_threshold

    def __call__(self, input, training=True):
        """Compress ``input`` when ``training`` is truthy; otherwise return it unchanged."""
        return self.compressed_input(input) if training else input

    def compressed_input(self, input):
        """Return the compressed image, or ``None`` when this image is skipped."""
        if self._will_skip():
            return None

        sampled_energy = self._sample_from_distribution()
        return compress_image_with_energy(input, sampled_energy)

    def _will_skip(self):
        """Draw from U[0, 1) and report whether it falls below ``skip_threshold``."""
        draw = tf.random.uniform(shape=(), minval=0.0, maxval=1.0)
        return draw < self.skip_threshold

    def _sample_from_distribution(self):
        """Sample an energy factor from the configured distribution.

        Raises:
            ArgumentError: If ``distribution`` is not a supported name.
        """
        if self.distribution in ["gaussian", "normal"]:
            # Centred on the maximum with a fixed spread; the draw is not
            # clipped to [min_energy_factor, max_energy_factor].
            return tf.random.normal(shape=(), mean=self.max_energy_factor, stddev=0.025)

        if self.distribution == "uniform":
            return tf.random.uniform(
                shape=(), minval=self.min_energy_factor, maxval=self.max_energy_factor
            )

        raise ArgumentError(
            "Random Compression layer only supports uniform and gaussian distributions"
        )



def preprocess(image, label):
    """Resize ``image`` to 180x180 and cast it to float32; ``label`` passes through unchanged."""
    target_hw = (180, 180)
    resized = tf.image.resize(image, target_hw)
    return tf.cast(resized, tf.float32), label


def extend_ds(ds):
    """Build a batched dataset of randomly SVD-compressed copies of ``ds``.

    ``ds`` is unbatched, every image is preprocessed and offered to a
    ``RandomCompression`` instance, and only the images it chose to compress
    are kept together with their labels.

    Args:
        ds: A batched ``tf.data.Dataset`` of ``(image, label)`` pairs.

    Returns:
        A dataset of the compressed ``(image, label)`` pairs, batched with
        ``BATCH_SZ``. Empty (but with the same element structure as the
        preprocessed input) when no image was compressed.
    """
    print(f"\n{'*'*50}Compressing Images{'*'*50}")

    ds = ds.unbatch()

    # Count by streaming so the whole dataset is never held in memory at once;
    # the count is only needed for the progress bar.
    total = sum(1 for _ in ds)
    compression = RandomCompression(max_energy_factor=0.975)

    images, labels = [], []
    for image, label in tqdm(ds.map(preprocess), total=total, ncols=110):
        compressed_image = compression(image)
        # RandomCompression returns None for the images it skips.
        if compressed_image is not None:
            images.append(compressed_image)
            labels.append(label)

    if images:
        # Create a TensorFlow dataset from the lists
        aug_ds = tf.data.Dataset.from_tensor_slices((images, labels)).batch(BATCH_SZ)
    else:
        # Empty lists would give scalar elements that cannot be concatenated
        # with the image dataset, so derive an empty dataset from the input.
        aug_ds = ds.map(preprocess).take(0).batch(BATCH_SZ)

    print("Total compressed images:", len(images))  # Print total compressed images

    print(f"{'*'*46}Finished Compressing Images{'*'*46}\n")

    return aug_ds

In [None]:
BUFFER_SZ = 1024  # shuffle buffer size
BATCH_SZ = 32  # batch size used for every split


def load_dataset():
    """Load ``cats_vs_dogs`` and split it 60/20/20 into train, validation and test.

    The split is made with ``take``/``skip`` on a shuffled dataset, so the
    shuffle is seeded with ``SEED`` and frozen
    (``reshuffle_each_iteration=False``) and file shuffling is disabled.
    Otherwise every new iteration would draw different examples into each
    split and let them overlap.

    Returns:
        ``(ds_train, ds_test, ds_val)`` in that order, each preprocessed and
        batched with ``BATCH_SZ``.
    """
    ds = tfds.load(
        "cats_vs_dogs", split="train", as_supervised=True, shuffle_files=False,
    )

    num_samples = len(ds)
    num_train_samples = int(num_samples * 0.6)  # 60% Train
    num_val_samples = int(num_samples * 0.2)  # 20% Validation

    ds = ds.shuffle(
        BUFFER_SZ, seed=SEED, reshuffle_each_iteration=False
    ).prefetch(tf.data.AUTOTUNE)

    ds_train = ds.take(num_train_samples).cache()
    # Bound the validation split so it stops where the test split begins.
    ds_val = ds.skip(num_train_samples).take(num_val_samples)
    ds_test = ds.skip(num_train_samples + num_val_samples)

    ds_train = ds_train.map(preprocess).batch(BATCH_SZ)
    ds_test = ds_test.map(preprocess).batch(BATCH_SZ)
    ds_val = ds_val.map(preprocess).batch(BATCH_SZ)

    return ds_train, ds_test, ds_val


def split_dataset_kfold(ds_train, k):
    """Split ``ds_train`` into ``k`` (train, validation) pairs for cross-validation.

    Folds are contiguous runs of ``len // k`` elements, where an element is
    whatever ``ds_train`` yields (a batch if the dataset is batched). Elements
    left over after the last full fold are never used for validation and stay
    in every training part. The dataset must yield elements in the same order
    on every iteration, otherwise the folds overlap.

    Args:
        ds_train: Dataset-like object supporting iteration, ``skip``, ``take``
            and ``concatenate``.
        k: Number of folds; must be at least 1 and at most the element count.

    Returns:
        A list of ``k`` tuples ``(ds_train_fold, ds_val_fold)``.

    Raises:
        ValueError: If ``k`` is less than 1 or exceeds the number of elements.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")

    # Stream through the dataset to count it instead of materializing every
    # element in a list. This still iterates it fully, as the original did.
    num_train_samples = sum(1 for _ in ds_train)
    fold_size = num_train_samples // k
    if fold_size == 0:
        raise ValueError(
            f"cannot split {num_train_samples} elements into {k} non-empty folds"
        )

    fold_datasets = []
    for fold in range(k):
        start_index = fold * fold_size
        end_index = start_index + fold_size

        ds_val_fold = ds_train.skip(start_index).take(fold_size)

        # Training part = everything before the validation window + everything after it.
        ds_train_fold = ds_train.take(start_index).concatenate(
            ds_train.skip(end_index)
        )

        fold_datasets.append((ds_train_fold, ds_val_fold))

    return fold_datasets

In [None]:
TrialResult = namedtuple("TrialResult", "histories generalization_performance")

def _save_trial_result(histories, trial_detail):
    dataset_name, augmentation_method, percentage_data, energy_factor = trial_detail

    if augmentation_method not in {"all", "svd"}:
        energy_factor = 0

    output_path = os.path.join(dataset_name)
    file_path = f"{augmentation_method}_{percentage_data}_{energy_factor}.pkl"
    os.makedirs(output_path, exist_ok=True)

    with open(os.path.join(output_path, file_path), "wb") as f:
        pickle.dump(trial_result, f)

In [None]:
def default_augmentations_layer():
    """Return a Sequential model applying random horizontal flip, rotation and zoom."""
    augmentation_steps = [
        RandomFlip("horizontal"),
        RandomRotation(0.1),
        RandomZoom(0.2),
    ]
    return keras.Sequential(augmentation_steps, name='hflip_rot_zoom')

In [None]:
def build_feat_ext_model(trial_detail=None):
    """Load the saved feature-extraction model and prepare it for fine-tuning.

    Only the last four layers of the ``"vgg16"`` sub-model are left trainable,
    and the model is recompiled with a small RMSprop learning rate.

    Args:
        trial_detail: Trial whose ``file_str()`` prefixes the saved model path.
            Defaults to the notebook-level ``trial_detail_1``.

    Returns:
        The compiled model.
    """
    if trial_detail is None:
        trial_detail = trial_detail_1

    feature_ext_model = keras.models.load_model(
        f"{trial_detail.file_str()}_feature_extraction"
    )
    conv_base = feature_ext_model.get_layer("vgg16")

    # Unfreeze the base, then re-freeze everything except its last four layers.
    conv_base.trainable = True
    for layer in conv_base.layers[:-4]:
        layer.trainable = False

    feature_ext_model.compile(
        optimizer=RMSprop(learning_rate=1e-5),
        loss="binary_crossentropy",
        # Passed as a list, matching the compile call used when the
        # feature-extraction model is first built.
        metrics=["accuracy"],
    )

    return feature_ext_model

In [None]:
# Frozen VGG16 convolutional base. Its default model name, "vgg16", is what
# build_feat_ext_model() looks up later via get_layer("vgg16").
conv_base = keras.applications.vgg16.VGG16(
    weights="imagenet",
    include_top=False)
conv_base.trainable = False

data_augmentation = default_augmentations_layer()

inputs = keras.Input(shape=(180, 180, 3))
x = data_augmentation(inputs)
x = keras.applications.vgg16.preprocess_input(x)
x = conv_base(x)
x = Flatten()(x)
x = Dense(256)(x)
x = Dropout(0.5)(x)
# The global policy is mixed_float16; keep the output layer in float32 so the
# probabilities and the loss are not computed in half precision.
outputs = Dense(1, activation="sigmoid", dtype="float32")(x)
model = keras.Model(inputs, outputs)
model.compile(loss="binary_crossentropy",
              optimizer="rmsprop",
              metrics=["accuracy"])


# Describe the trial; its file_str() prefixes every checkpoint path below.
trial_detail_1 = TrialDetail(
    dataset_name="cats_vs_dogs",
    augmentation_method="svd",
    energy_factor=0.975,
    percentage_data=1.0,
)
print(trial_detail_1)

# load_dataset() returns the splits in (train, test, val) order.
ds_train, ds_test, ds_val = load_dataset()
ds_train_augmented = extend_ds(ds_train)

# Original batches followed by the compressed ones. Shuffling happens before
# cache(), so the order is whatever the first full pass produced; the k-fold
# take/skip splits further down rely on that order staying fixed.
ds_train_ext = (
    ds_train.concatenate(ds_train_augmented)
    .shuffle(BUFFER_SZ)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

"""
*****************************Feature Extraction*****************************
"""

# Stop after 10 epochs without validation-accuracy improvement, and keep the
# best model on disk for the fine-tuning stage to reload.
feature_extraction_path = f"{trial_detail_1.file_str()}_feature_extraction"
early_stopping_cb = EarlyStopping(
    patience=10,
    monitor="val_accuracy",
    restore_best_weights=True,
)
best_model_cb = ModelCheckpoint(
    filepath=feature_extraction_path,
    monitor="val_accuracy",
    save_best_only=True,
)
callbacks = [early_stopping_cb, best_model_cb]

print(f"\n\n{'*'*50}\nFeature Extraction\n{'*'*50}\n\n")
history_1 = model.fit(
    ds_train_ext,
    validation_data=ds_val,
    epochs=50,
    callbacks=callbacks,
)

"""
*****************************Fine Tuning*****************************
"""


# Per-fold training histories and the zero-based index of each fold's best epoch.
all_histories, best_epochs_loss, best_epochs_acc = [], [], []

fold_datasets = split_dataset_kfold(ds_train_ext, 5)

print(f"\n\n{'*'*50}\nFine Tuning\n{'*'*50}\n\n")
for fold, (ds_train_fold, ds_val_fold) in enumerate(fold_datasets):
    callbacks = get_kfold_callbacks(trial_detail_1, fold)
    # Every fold starts again from the saved feature-extraction model.
    feature_ext_model = build_feat_ext_model()

    history = feature_ext_model.fit(
        ds_train_fold,
        validation_data=ds_val_fold,
        epochs=100,
        callbacks=callbacks,
    )

    fold_history = history.history
    all_histories.append(fold_history)
    best_epochs_loss.append(np.argmin(fold_history['val_loss']))
    best_epochs_acc.append(np.argmax(fold_history['val_accuracy']))


"""
*****************************Retraining on Entire Dataset*****************************
"""

# Fold the validation split into the training data for the final runs.
# NOTE: this rebinds ds_train_ext, so re-running the cell appends ds_val again.
ds_train_ext = ds_train_ext.concatenate(ds_val).shuffle(BUFFER_SZ).cache().prefetch(tf.data.AUTOTUNE)

print(f"\n\n{'*'*50}\nRetraining\n{'*'*50}\n\n")


def _retrain_epochs(best_epoch_indices):
    """Turn the folds' best-epoch indices into an epoch budget for retraining.

    The indices come from ``np.argmin``/``np.argmax`` and are zero-based, so
    one is added to get an epoch count before applying the same 1.2 factor as
    the naive run. At least one epoch is always returned, because a zero-epoch
    fit never writes the checkpoint that is loaded for evaluation later.
    """
    return max(1, int((np.mean(best_epoch_indices) + 1) * 1.2))


# (checkpoint suffix, number of epochs) for each retraining strategy.
retrain_plan = [
    ("naive", int(50 * 1.2)),
    ("argmin_loss", _retrain_epochs(best_epochs_loss)),
    ("argmax_acc", _retrain_epochs(best_epochs_acc)),
]

retrain_histories = {}
for purpose, best_epoch in retrain_plan:
    # Each strategy restarts from the saved feature-extraction model.
    feature_ext_model = build_feat_ext_model()
    callbacks = get_retrain_callbacks(trial_detail_1, purpose)
    retrain_histories[purpose] = feature_ext_model.fit(
        ds_train_ext,
        epochs=best_epoch,
        callbacks=callbacks,
        verbose=1,
    )

# Keep the original names for the plotting calls further down.
history_2 = retrain_histories["naive"]
history_3 = retrain_histories["argmin_loss"]
history_4 = retrain_histories["argmax_acc"]



# (checkpoint suffix, label printed next to the test accuracy), in evaluation order.
evaluation_plan = [
    ("feature_extraction", "Feature Extraction Test accuracy"),
    ("fine_tuning", "Fine Tuning Test accuracy"),
    ("naive", "Naive Retraining Test accuracy"),
    ("argmin_loss", "Argmin Retraining accuracy"),
    ("argmax_acc", "Argmax Retraining accuracy"),
]

for checkpoint_suffix, label in evaluation_plan:
    test_model = keras.models.load_model(f"{trial_detail_1.file_str()}_{checkpoint_suffix}")
    generalization_performance = test_model.evaluate(ds_test)
    _, test_acc = generalization_performance
    print(f"{label}: {test_acc:.3f}")

# After the loop, generalization_performance holds the result of the last
# model evaluated ("argmax_acc"), which is what was stored before, without
# evaluating that model a second time.
trial_result = TrialResult(all_histories, generalization_performance)
_save_trial_result(trial_result, trial_detail_1)

# NOTE(review): plot_history is not defined in the visible part of this
# notebook; confirm it is defined or imported before this cell runs.
plot_history(history_1)
plot_history(history_2)
plot_history(history_3)
plot_history(history_4)

