<a href="https://colab.research.google.com/github/kohathyli/Autoencoders_Census/blob/main/autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install keras-tuner

In [None]:
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from keras.layers import Input, Dense, BatchNormalization, Concatenate, Dropout, Lambda
from keras.models import Model
from keras.regularizers import l2
from keras_tuner.tuners import RandomSearch, BayesianOptimization
from keras.callbacks import EarlyStopping

@keras.saving.register_keras_serializable()
class CustomCategoricalCrossentropy(tf.keras.losses.Loss):
    """Categorical cross-entropy over concatenated one-hot attributes.

    ``y_true`` and ``y_pred`` are split along axis 1 into one slice per
    attribute (slice widths are ``attribute_cardinalities``).  The categorical
    cross-entropy of every slice is divided by ``log(cardinality)`` and the
    normalised values are averaged over the attributes.

    Args:
        attribute_cardinalities: Number of categories of each attribute, in
            column order.  Every entry must be at least 2 because the loss
            divides by ``log(cardinality)``.
        name: Name of the loss instance.
        **kwargs: Forwarded to ``tf.keras.losses.Loss`` (for example
            ``reduction``) so the dictionary returned by ``get_config`` can be
            passed straight back to the constructor.

    Raises:
        ValueError: If no cardinalities are given or one of them is below 2.
    """

    def __init__(self, attribute_cardinalities, name="custom_categorical_crossentropy", **kwargs):
        # Zero-argument super(): the two-argument form looks the class up by
        # its global name, which breaks once that name is rebound.
        super().__init__(name=name, **kwargs)

        cardinalities = [int(cardinality) for cardinality in attribute_cardinalities]
        if not cardinalities:
            raise ValueError("attribute_cardinalities must contain at least one entry")
        if min(cardinalities) < 2:
            # log(1) == 0 would turn the normalisation into a division by zero.
            raise ValueError(
                f"every attribute cardinality must be >= 2, got {cardinalities}"
            )

        self.attribute_cardinalities = cardinalities
        log_cardinalities = [np.log(cardinality) for cardinality in cardinalities]
        log_cardinalities_tensor = tf.constant(log_cardinalities, dtype=tf.float32)
        # Shape (n_attributes, 1) so it broadcasts against (n_attributes, batch).
        self.log_cardinalities_expanded = tf.expand_dims(log_cardinalities_tensor, axis=-1)

    def _split_and_pad(self, values):
        """Split ``values`` per attribute and zero-pad every slice to the widest one."""
        widest = max(self.attribute_cardinalities)
        pieces = tf.split(values, self.attribute_cardinalities, axis=1)
        padded = [
            tf.pad(piece, [[0, 0], [0, widest - width]])
            for piece, width in zip(pieces, self.attribute_cardinalities)
        ]
        # Equal widths are required to stack the slices into one tensor.
        return tf.stack(padded, axis=0)

    def call(self, y_true, y_pred):
        """Return the normalised cross-entropy averaged over attributes.

        Args:
            y_true: Concatenated one-hot targets; split along axis 1.
            y_pred: Concatenated per-attribute probabilities, same layout.

        Returns:
            The cross-entropy of each attribute divided by its
            log-cardinality, averaged over the attribute axis (axis 0 of the
            stacked slices).
        """
        xent_losses = tf.keras.losses.categorical_crossentropy(
            self._split_and_pad(y_true), self._split_and_pad(y_pred)
        )
        # Match the divisor to the loss dtype so non-float32 predictions do
        # not depend on implicit type promotion.
        log_cardinalities = tf.cast(self.log_cardinalities_expanded, xent_losses.dtype)
        return tf.reduce_mean(xent_losses / log_cardinalities, axis=0)

    def get_config(self):
        """Return the base loss configuration plus ``attribute_cardinalities``."""
        config = super().get_config()
        config['attribute_cardinalities'] = list(self.attribute_cardinalities)
        return config

@keras.saving.register_keras_serializable()
class AutoencoderModel:
    """Builds and tunes a dense autoencoder for concatenated one-hot attributes.

    The class is a factory around KerasTuner: ``build_autoencoder`` is the
    model-building callback, ``define_tuner`` wraps it in a
    ``BayesianOptimization`` tuner, and ``split_train_test`` prepares the data
    and records the input shape used by the builders.

    Args:
        attribute_cardinalities: Number of categories of each attribute, in
            column order.  One softmax head of that width is created per entry.
    """

    def __init__(self, attribute_cardinalities):
        # Filled in by split_train_test(); the builders read it.
        self.INPUT_SHAPE = None
        self.TEST_SIZE = 0.2
        self.MAX_TRIALS = 20
        self.EXECUTIONS_PER_TRIAL = 1
        self.BATCH_SIZE = 32
        self.attribute_cardinalities = attribute_cardinalities

        log_cardinalities = [np.log(cardinality) for cardinality in self.attribute_cardinalities]
        log_cardinalities_tensor = tf.constant(log_cardinalities, dtype=tf.float32)
        # Not read by any method of this class; kept as a public attribute.
        self.log_cardinalities_expanded = tf.expand_dims(log_cardinalities_tensor, axis=-1)

    def get_config(self):
        """Return the constructor arguments as a dictionary."""
        return {'attribute_cardinalities': self.attribute_cardinalities}

    @classmethod
    def from_config(cls, config):
        """Create an instance from a dictionary produced by ``get_config``."""
        return cls(**config)

    def split_train_test(self, df):
        """Split ``df`` into train and test parts and record the input shape.

        Args:
            df: Data frame to split; it is copied before splitting.

        Returns:
            ``(X_train, X_test)`` with rows containing missing values removed.
            ``self.INPUT_SHAPE`` is set from the training columns before the
            rows are dropped.
        """
        X_train, X_test = train_test_split(df.copy(), test_size=self.TEST_SIZE)
        self.INPUT_SHAPE = X_train.shape[1:]
        return X_train.dropna(), X_test.dropna()

    @staticmethod
    def masked_mse(y_true, y_pred):
        """Mean squared error that ignores non-finite targets.

        Positions where ``y_true`` is not finite are zeroed in both tensors,
        so they add nothing to the squared error.  They are still counted in
        the mean's denominator.

        Args:
            y_true: Target values; may contain NaN or infinity.
            y_pred: Predicted values; non-finite entries are treated as 0.

        Returns:
            Scalar tensor with the mean of the per-sample squared errors.
        """
        mask = tf.math.is_finite(y_true)
        y_t = tf.where(mask, y_true, 0.0)
        y_p = tf.where(tf.math.is_finite(y_pred), y_pred, 0.0)
        # The string form of the reduction is used instead of the enum.
        mse = tf.keras.losses.MeanSquaredError(reduction="none")
        return tf.reduce_mean(mse(y_t * tf.cast(mask, y_t.dtype), y_p * tf.cast(mask, y_p.dtype)))

    @staticmethod
    def _latent_dim(hp):
        """Return the latent width hyperparameter shared by encoder and decoder."""
        return hp.Int('latent_space_dim', min_value=2, max_value=50, step=1)

    def build_encoder(self, hp):
        """Build the encoder: two Dense/Dropout/BatchNorm stages and a latent layer.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A Keras ``Model`` mapping the input to the ReLU latent layer.
        """
        inputs = Input(shape=self.INPUT_SHAPE)
        x = Dense(units=hp.Int('encoder_units_1', min_value=160, max_value=160, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('encoder_l2_1', [0.0, 0.001])))(inputs)
        x = Dropout(hp.Float('encoder_dropout_1', 0.1, 0.1, step=0.1))(x)
        x = BatchNormalization()(x)
        x = Dense(units=hp.Int('encoder_units_2', min_value=16, max_value=16, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('encoder_l2_2', [0.0, 0.001])))(x)
        x = Dropout(hp.Float('encoder_dropout_2', 0, 0.5, step=0.1))(x)
        x = BatchNormalization()(x)
        latent_space = Dense(units=self._latent_dim(hp), activation='relu')(x)
        return Model(inputs, latent_space)

    def build_decoder(self, hp):
        """Build the decoder ending in one softmax head per attribute.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A Keras ``Model`` mapping a latent vector to the concatenation of
            the per-attribute softmax outputs.
        """
        decoder_inputs = Input(shape=(self._latent_dim(hp),))
        x = Dense(units=hp.Int('decoder_units_1', min_value=16, max_value=256, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('decoder_l2_1', [0.0, 0.001, 0.01])))(decoder_inputs)
        x = Dropout(hp.Float('decoder_dropout_1', 0.0, 0.0, step=0.1))(x)
        x = BatchNormalization()(x)
        x = Dense(units=hp.Int('decoder_units_2', min_value=160, max_value=256, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('decoder_l2_2', [0.0, 0.001])))(x)
        x = Dropout(hp.Float('decoder_dropout_2', 0, 0.5, step=0.1))(x)
        x = BatchNormalization()(x)

        # One softmax head per attribute, concatenated in attribute order so
        # the output layout matches the one-hot input layout.
        decoded_attrs = [
            Dense(categories, activation='softmax')(x)
            for categories in self.attribute_cardinalities
        ]
        outputs = Concatenate()(decoded_attrs)

        return Model(decoder_inputs, outputs)

    def build_autoencoder(self, hp):
        """Assemble and compile the encoder/decoder pair for one tuner trial.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A compiled Keras ``Model`` using Adam and
            ``CustomCategoricalCrossentropy``.
        """
        learning_rate = hp.Choice('learning_rate', values=[1e-3])

        autoencoder_input = Input(shape=self.INPUT_SHAPE)
        encoder_output = self.build_encoder(hp)(autoencoder_input)
        decoder_output = self.build_decoder(hp)(encoder_output)
        autoencoder = Model(autoencoder_input, decoder_output)
        # NOTE(review): the loss class is looked up by its global name when
        # this method runs, and this file defines a second class with the
        # same name further down. After that definition executes, the lookup
        # resolves to it - confirm that is intended.
        autoencoder.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss=CustomCategoricalCrossentropy(attribute_cardinalities=self.attribute_cardinalities)
        )

        return autoencoder

    def define_tuner(self, seed_hps=None):
        """Create the Bayesian-optimisation tuner that minimises ``val_loss``.

        Args:
            seed_hps: Optional hyperparameters object passed to the tuner as
                ``hyperparameters``.

        Returns:
            A ``BayesianOptimization`` tuner built around ``build_autoencoder``.
        """
        tuner = BayesianOptimization(
            self.build_autoencoder,
            objective='val_loss',
            max_trials=self.MAX_TRIALS,
            executions_per_trial=self.EXECUTIONS_PER_TRIAL,
            hyperparameters=seed_hps
            )
        return tuner


In [None]:
import unittest
import pandas as pd
import numpy as np

class TestAutoencoderModel(unittest.TestCase):
    """Unit tests for ``AutoencoderModel`` helpers and the cross-entropy loss."""

    def setUp(self):
        # Small synthetic frame: three categorical columns, five rows, no NaNs.
        self.df = pd.DataFrame({
            'col1': ['A', 'B', 'A', 'C', 'B'],
            'col2': [1, 2, 1, 2, 2],
            'col3': ['X', 'Y', 'X', 'Y', 'Z']
        })
        # Distinct values per column: col1 -> 3, col2 -> 2, col3 -> 3.
        self.attribute_cardinalities = [3, 2, 3]
        self.autoencoder = AutoencoderModel(self.attribute_cardinalities)

    def test_split_train_test(self):
        """No rows are lost (the frame has no NaNs) and the input shape is set."""
        X_train, X_test = self.autoencoder.split_train_test(self.df)
        self.assertEqual(len(X_train) + len(X_test), len(self.df))
        self.assertEqual(tuple(self.autoencoder.INPUT_SHAPE), (self.df.shape[1],))

    def test_masked_mse(self):
        """The NaN target is zeroed in both tensors and adds no error."""
        y_true = tf.constant([[1.0, np.nan], [2.0, 2.0]], dtype=tf.float32)
        y_pred = tf.constant([[1.0, 1.0], [3.0, 2.0]], dtype=tf.float32)
        result = self.autoencoder.masked_mse(y_true, y_pred)
        self.assertIsNotNone(result)
        # Squared errors after masking: [[0, 0], [1, 0]] -> row means
        # [0, 0.5] -> overall mean 0.25.
        self.assertAlmostEqual(float(result), 0.25, places=5)

    def test_custom_categorical_crossentropy(self):
        """The loss equals the mean of the cardinality-normalised cross-entropies."""
        y_true = tf.constant([[1, 0, 0, 1, 0, 1, 0, 0],
                              [0, 1, 0, 0, 1, 0, 0, 1]], dtype=tf.float32)
        y_pred = tf.constant([[0.8, 0.1, 0.1, 0.7, 0.3, 0.9, 0.05, 0.05],
                              [0.1, 0.8, 0.1, 0.3, 0.7, 0.05, 0.05, 0.9]], dtype=tf.float32)
        result = CustomCategoricalCrossentropy(attribute_cardinalities=self.attribute_cardinalities)(y_true, y_pred)
        self.assertIsNotNone(result)

        # Per-attribute terms (identical for both samples):
        #   attribute 1 (3 categories): -log(0.8) / log(3)
        #   attribute 2 (2 categories): -log(0.7) / log(2)
        #   attribute 3 (3 categories): -log(0.9) / log(3)
        # Averaging over the 3 attributes and the 2 samples therefore equals
        # the mean of these three values.
        expected = np.mean([
            -np.log(0.8) / np.log(3),
            -np.log(0.7) / np.log(2),
            -np.log(0.9) / np.log(3),
        ])
        # float() works on the scalar tensor without NumPy-behaviour mode.
        self.assertAlmostEqual(float(result), float(expected), places=4)



In [None]:
# Switch on TensorFlow's NumPy-behaviour mode before the tests run.
# NOTE(review): presumably needed so test code can treat the returned loss
# tensors like NumPy arrays - confirm before removing.
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

def run_tests(test_class):
    """Collect the tests of ``test_class`` and run them with a text runner.

    The outcome is reported by ``unittest.TextTestRunner``; nothing is
    returned to the caller.
    """
    loader = unittest.TestLoader()
    collected = loader.loadTestsFromTestCase(test_class)
    unittest.TextTestRunner().run(collected)

# Run the AutoencoderModel test case when this cell/script is executed directly.
if __name__ == "__main__":
  run_tests(TestAutoencoderModel)

In [None]:
from kerastuner import HyperModel

@keras.saving.register_keras_serializable()
class CustomCategoricalCrossentropy(tf.keras.losses.Loss):
    """Normalised categorical cross-entropy plus a penalty computed from ``y_pred``.

    The reconstruction part splits ``y_true`` / ``y_pred`` along axis 1 into
    one slice per attribute, divides each slice's categorical cross-entropy by
    ``log(cardinality)`` and averages everything to a scalar.  A second term
    named ``kl_divergence`` is added to it.

    NOTE(review): the second term evaluates
    ``-0.5 * sum(1 + y_pred - y_pred**2 - exp(y_pred))``, i.e. it feeds
    ``y_pred`` into the expression in both places.  No latent mean or
    log-variance reaches this loss, so it does not measure the divergence of
    the latent distribution.  The value is kept unchanged here - confirm the
    intended regulariser.

    NOTE(review): this class shares its name with the plain cross-entropy
    loss defined earlier in this file; whichever definition runs last owns
    the global name.

    Args:
        attribute_cardinalities: Number of categories of each attribute, in
            column order.  Every entry must be at least 2 because the loss
            divides by ``log(cardinality)``.
        name: Name of the loss instance.
        **kwargs: Forwarded to ``tf.keras.losses.Loss`` (for example
            ``reduction``) so the dictionary returned by ``get_config`` can be
            passed straight back to the constructor.

    Raises:
        ValueError: If no cardinalities are given or one of them is below 2.
    """

    def __init__(self, attribute_cardinalities, name="custom_categorical_crossentropy", **kwargs):
        # Zero-argument super() does not depend on the class's global name.
        super().__init__(name=name, **kwargs)

        cardinalities = [int(cardinality) for cardinality in attribute_cardinalities]
        if not cardinalities:
            raise ValueError("attribute_cardinalities must contain at least one entry")
        if min(cardinalities) < 2:
            # log(1) == 0 would turn the normalisation into a division by zero.
            raise ValueError(
                f"every attribute cardinality must be >= 2, got {cardinalities}"
            )

        self.attribute_cardinalities = cardinalities
        log_cardinalities = [np.log(cardinality) for cardinality in cardinalities]
        log_cardinalities_tensor = tf.constant(log_cardinalities, dtype=tf.float32)
        # Shape (n_attributes, 1) so it broadcasts against (n_attributes, batch).
        self.log_cardinalities_expanded = tf.expand_dims(log_cardinalities_tensor, axis=-1)

    def _split_and_pad(self, values):
        """Split ``values`` per attribute and zero-pad every slice to the widest one."""
        widest = max(self.attribute_cardinalities)
        pieces = tf.split(values, self.attribute_cardinalities, axis=1)
        padded = [
            tf.pad(piece, [[0, 0], [0, widest - width]])
            for piece, width in zip(pieces, self.attribute_cardinalities)
        ]
        # Equal widths are required to stack the slices into one tensor.
        return tf.stack(padded, axis=0)

    def call(self, y_true, y_pred):
        """Return the scalar sum of the reconstruction term and the penalty term.

        Args:
            y_true: Concatenated one-hot targets; split along axis 1.
            y_pred: Concatenated per-attribute probabilities, same layout.

        Returns:
            Scalar tensor: mean normalised cross-entropy plus the batch mean
            of the ``y_pred``-based penalty.
        """
        # Work on a tensor so both terms share one dtype.
        y_pred = tf.convert_to_tensor(y_pred)

        xent_losses = tf.keras.losses.categorical_crossentropy(
            self._split_and_pad(y_true), self._split_and_pad(y_pred)
        )
        log_cardinalities = tf.cast(self.log_cardinalities_expanded, xent_losses.dtype)
        reconstruction_loss = tf.reduce_mean(xent_losses / log_cardinalities)

        # See the class-level NOTE(review): computed from y_pred only.
        kl_divergence = -0.5 * tf.reduce_sum(1 + y_pred - tf.square(y_pred) - tf.exp(y_pred), axis=-1)
        kl_divergence = tf.reduce_mean(kl_divergence)

        return reconstruction_loss + kl_divergence

    def get_config(self):
        """Return the base loss configuration plus ``attribute_cardinalities``."""
        config = super().get_config()
        config['attribute_cardinalities'] = list(self.attribute_cardinalities)
        return config

class VariationalAutoencoderModel:
    """Builds and tunes a variational-style autoencoder for one-hot attributes.

    The encoder produces ``z_mean`` and ``z_log_var`` and draws a latent
    sample from them; the decoder maps that sample to one softmax head per
    attribute.  ``build_autoencoder`` is the KerasTuner model-building
    callback and ``define_tuner`` wraps it in a ``BayesianOptimization`` tuner.

    Args:
        attribute_cardinalities: Number of categories of each attribute, in
            column order.
    """

    def __init__(self, attribute_cardinalities):
        # Filled in by split_train_test(); the builders read it.
        self.INPUT_SHAPE = None
        self.TEST_SIZE = 0.2
        self.MAX_TRIALS = 20
        self.EXECUTIONS_PER_TRIAL = 1
        self.BATCH_SIZE = 32
        self.attribute_cardinalities = attribute_cardinalities

        log_cardinalities = [np.log(cardinality) for cardinality in self.attribute_cardinalities]
        log_cardinalities_tensor = tf.constant(log_cardinalities, dtype=tf.float32)
        # Not read by any method of this class; kept as a public attribute.
        self.log_cardinalities_expanded = tf.expand_dims(log_cardinalities_tensor, axis=-1)

    def get_config(self):
        """Return the constructor arguments as a dictionary."""
        return {'attribute_cardinalities': self.attribute_cardinalities}

    @classmethod
    def from_config(cls, config):
        """Create an instance from a dictionary produced by ``get_config``."""
        return cls(**config)

    def split_train_test(self, df):
        """Split ``df`` into train and test parts and record the input shape.

        Args:
            df: Data frame to split; it is copied before splitting.

        Returns:
            ``(X_train, X_test)`` with rows containing missing values removed.
            ``self.INPUT_SHAPE`` is set from the training columns before the
            rows are dropped.
        """
        X_train, X_test = train_test_split(df.copy(), test_size=self.TEST_SIZE)
        self.INPUT_SHAPE = X_train.shape[1:]
        return X_train.dropna(), X_test.dropna()

    @staticmethod
    def _latent_dim(hp):
        """Return the latent width hyperparameter shared by encoder and decoder."""
        return hp.Int('latent_space_dim', min_value=2, max_value=50, step=1)

    @staticmethod
    def _sample_latent(args):
        """Draw ``z_mean + exp(0.5 * z_log_var) * epsilon`` with normal ``epsilon``."""
        z_mean, z_log_var = args
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    def build_encoder(self, hp):
        """Build the encoder with mean, log-variance and sampled latent outputs.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A Keras ``Model`` whose outputs are
            ``[z_mean, z_log_var, latent_space]``.
        """
        inputs = Input(shape=self.INPUT_SHAPE)
        x = Dense(units=hp.Int('encoder_units_1', min_value=160, max_value=160, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('encoder_l2_1', [0.0, 0.001])))(inputs)
        x = Dropout(hp.Float('encoder_dropout_1', 0.1, 0.1, step=0.1))(x)
        x = BatchNormalization()(x)
        x = Dense(units=hp.Int('encoder_units_2', min_value=16, max_value=16, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('encoder_l2_2', [0.0, 0.001])))(x)
        x = Dropout(hp.Float('encoder_dropout_2', 0, 0.5, step=0.1))(x)
        x = BatchNormalization()(x)

        latent_dim = self._latent_dim(hp)
        z_mean = Dense(latent_dim, activation='linear')(x)
        z_log_var = Dense(latent_dim, activation='linear')(x)

        latent_space = tf.keras.layers.Lambda(
            self._sample_latent, output_shape=(latent_dim,)
        )([z_mean, z_log_var])

        return Model(inputs, [z_mean, z_log_var, latent_space])

    def build_decoder(self, hp):
        """Build the decoder ending in one softmax head per attribute.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A Keras ``Model`` mapping a latent vector to the concatenation of
            the per-attribute softmax outputs.
        """
        decoder_inputs = Input(shape=(self._latent_dim(hp),))
        x = Dense(units=hp.Int('decoder_units_1', min_value=16, max_value=256, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('decoder_l2_1', [0.0, 0.001, 0.01])))(decoder_inputs)
        x = Dropout(hp.Float('decoder_dropout_1', 0.0, 0.0, step=0.1))(x)
        x = BatchNormalization()(x)
        x = Dense(units=hp.Int('decoder_units_2', min_value=160, max_value=256, step=16),
                  activation='relu',
                  kernel_regularizer=l2(hp.Choice('decoder_l2_2', [0.0, 0.001])))(x)
        x = Dropout(hp.Float('decoder_dropout_2', 0, 0.5, step=0.1))(x)
        x = BatchNormalization()(x)

        # One softmax head per attribute, concatenated in attribute order so
        # the output layout matches the one-hot input layout.
        decoded_attrs = [
            Dense(categories, activation='softmax')(x)
            for categories in self.attribute_cardinalities
        ]
        outputs = Concatenate()(decoded_attrs)

        return Model(decoder_inputs, outputs)

    def build_autoencoder(self, hp):
        """Assemble and compile the encoder/decoder pair for one tuner trial.

        The decoder consumes the latent sample the encoder already draws, so
        no second sampling layer is built here.

        Args:
            hp: KerasTuner hyperparameter container.

        Returns:
            A compiled Keras ``Model`` using Adam and
            ``CustomCategoricalCrossentropy``.
        """
        learning_rate = hp.Choice('learning_rate', values=[1e-3])

        autoencoder_input = Input(shape=self.INPUT_SHAPE)

        # The encoder returns [z_mean, z_log_var, sample]; only the sample is
        # forwarded to the decoder.
        _z_mean, _z_log_var, latent_sample = self.build_encoder(hp)(autoencoder_input)
        decoder_output = self.build_decoder(hp)(latent_sample)

        autoencoder = Model(autoencoder_input, decoder_output)
        # NOTE(review): z_mean / z_log_var are not model outputs and the
        # compiled loss only receives (y_true, y_pred), so nothing here
        # regularises the latent distribution itself - confirm whether a
        # divergence term on z_mean / z_log_var was intended.
        autoencoder.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss=CustomCategoricalCrossentropy(attribute_cardinalities=self.attribute_cardinalities)
        )
        return autoencoder

    def define_tuner(self, seed_hps=None):
        """Create the Bayesian-optimisation tuner that minimises ``val_loss``.

        Args:
            seed_hps: Optional hyperparameters object passed to the tuner as
                ``hyperparameters``.

        Returns:
            A ``BayesianOptimization`` tuner built around ``build_autoencoder``.
        """
        tuner = BayesianOptimization(
            self.build_autoencoder,
            objective='val_loss',
            max_trials=self.MAX_TRIALS,
            executions_per_trial=self.EXECUTIONS_PER_TRIAL,
            hyperparameters=seed_hps
            )
        return tuner

In [None]:
class TestVariationalAutoencoderModel(unittest.TestCase):
    """Unit tests for ``VariationalAutoencoderModel`` and the loss it compiles with."""

    def setUp(self):
        # Small synthetic frame: three categorical columns, five rows, no NaNs.
        self.df = pd.DataFrame({
            'col1': ['A', 'B', 'A', 'C', 'B'],
            'col2': [1, 2, 1, 2, 2],
            'col3': ['X', 'Y', 'X', 'Y', 'Z']
        })
        # Distinct values per column: col1 -> 3, col2 -> 2, col3 -> 3.
        self.attribute_cardinalities = [3, 2, 3]
        self.vae_model = VariationalAutoencoderModel(self.attribute_cardinalities)

    def test_split_train_test(self):
        """No rows are lost by the split because the frame has no NaNs."""
        X_train, X_test = self.vae_model.split_train_test(self.df)
        self.assertEqual(len(X_train) + len(X_test), len(self.df))

    def test_custom_categorical_crossentropy(self):
        """The loss equals the reconstruction term plus the ``y_pred`` penalty."""
        y_true = np.array([[1, 0, 0, 1, 0, 1, 0, 0],
                           [0, 1, 0, 0, 1, 0, 0, 1]], dtype=np.float32)
        y_pred = np.array([[0.8, 0.1, 0.1, 0.7, 0.3, 0.9, 0.05, 0.05],
                           [0.1, 0.8, 0.1, 0.3, 0.7, 0.05, 0.05, 0.9]], dtype=np.float32)
        loss_fn = CustomCategoricalCrossentropy(attribute_cardinalities=self.attribute_cardinalities)
        result = loss_fn(tf.constant(y_true), tf.constant(y_pred))
        self.assertIsNotNone(result)

        # Reconstruction term: both samples give the same three normalised
        # cross-entropies, so the overall mean is the mean of these values.
        reconstruction = np.mean([
            -np.log(0.8) / np.log(3),
            -np.log(0.7) / np.log(2),
            -np.log(0.9) / np.log(3),
        ])

        # Penalty term, written exactly as the loss implementation computes it.
        # NOTE(review): this pins the current behaviour, which feeds y_pred
        # into the Gaussian-KL-style expression; revisit together with the
        # loss if that term changes.
        p = y_pred.astype(np.float64)
        penalty = np.mean(-0.5 * np.sum(1 + p - np.square(p) - np.exp(p), axis=-1))

        # float() works on the scalar tensor without NumPy-behaviour mode.
        self.assertAlmostEqual(float(result), float(reconstruction + penalty), places=4)



In [None]:
# Switch on TensorFlow's NumPy-behaviour mode before the tests run.
# NOTE(review): presumably needed so test code can treat the returned loss
# tensors like NumPy arrays - confirm before removing.
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

def run_tests(test_class):
    """Run all tests defined on ``test_class`` and print a text report.

    Nothing is returned; the result object produced by the runner is discarded.
    """
    test_suite = unittest.defaultTestLoader.__class__().loadTestsFromTestCase(test_class)
    text_runner = unittest.TextTestRunner()
    text_runner.run(test_suite)

# Run the VariationalAutoencoderModel test case when this cell/script is executed directly.
if __name__ == "__main__":
  run_tests(TestVariationalAutoencoderModel)