<a href="https://colab.research.google.com/github/kohathyli/Chicago-Crime-Prediction/blob/main/autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.layers import Input, Dense, BatchNormalization
from keras.models import Model
from kerastuner.tuners import RandomSearch, BayesianOptimization
import torch
import torch.nn as nn
from torch.autograd import Variable

class AutoencoderModel:
    """Builds a dense Keras autoencoder and a Keras Tuner search over it.

    Layer widths and the Adam learning rate are exposed as hyperparameters;
    the latent width is fixed by ``self.D``. ``split_train_test`` must run
    before any ``build_*`` method, because it is the only place where
    ``self.INPUT_SHAPE`` is populated.
    """

    def __init__(self):
        # Per-sample input shape; filled in by split_train_test().
        self.INPUT_SHAPE = None
        # Width of the latent (bottleneck) layer.
        self.D = 2

        # Train/test split settings.
        self.TEST_SIZE = 0.2
        self.RANDOM_STATE = 42

        # Tuner settings.
        self.SEED = 42
        self.MAX_TRIALS = 20
        self.EXECUTIONS_PER_TRIAL = 1

        # Not read by any method of this class.
        self.EPOCHS = 50

    def split_train_test(self, df):
        """Split ``df`` into train and test parts and record the input shape.

        Args:
            df: Tabular data to split; a copy is passed to
                ``train_test_split``. Presumably a pandas DataFrame, since
                ``dropna()`` is called on both parts -- confirm with callers.

        Returns:
            ``(train, test)``, each with ``dropna()`` applied after the split.
        """
        train_part, test_part = train_test_split(
            df.copy(),
            test_size=self.TEST_SIZE,
            random_state=self.RANDOM_STATE,
        )
        # Everything after the leading (row) axis is the per-sample shape.
        self.INPUT_SHAPE = train_part.shape[1:]
        return train_part.dropna(), test_part.dropna()

    @staticmethod
    def masked_mse(y_true, y_pred):
        """Mean squared error that ignores positions with non-finite targets.

        Non-finite entries of either tensor are replaced by zero, then both
        tensors are multiplied by the finite-target mask, so masked positions
        contribute zero squared error.

        Args:
            y_true: Target tensor, possibly containing NaN/inf.
            y_pred: Prediction tensor.

        Returns:
            Scalar tensor: the mean of the unreduced Keras MSE values.
        """
        finite_targets = tf.math.is_finite(y_true)
        safe_true = tf.where(finite_targets, y_true, 0.0)
        safe_pred = tf.where(tf.math.is_finite(y_pred), y_pred, 0.0)

        unreduced_mse = tf.keras.losses.MeanSquaredError(
            reduction=tf.keras.losses.Reduction.NONE
        )
        masked_true = safe_true * tf.cast(finite_targets, safe_true.dtype)
        masked_pred = safe_pred * tf.cast(finite_targets, safe_pred.dtype)
        return tf.reduce_mean(unreduced_mse(masked_true, masked_pred))

    def build_encoder(self, hp):
        """Return the encoder: two tunable Dense+BatchNorm stages, then the latent layer.

        Args:
            hp: Keras Tuner hyperparameter container; registers
                ``encoder_units_1`` and ``encoder_units_2``.

        Returns:
            A Keras ``Model`` mapping ``INPUT_SHAPE`` inputs to ``D`` latent units.
        """
        encoder_in = Input(shape=self.INPUT_SHAPE)

        width_1 = hp.Int('encoder_units_1', min_value=16, max_value=256, step=16)
        hidden = Dense(units=width_1, activation='relu')(encoder_in)
        hidden = BatchNormalization()(hidden)

        width_2 = hp.Int('encoder_units_2', min_value=16, max_value=128, step=16)
        hidden = Dense(units=width_2, activation='relu')(hidden)
        hidden = BatchNormalization()(hidden)

        code = Dense(units=self.D, activation='relu')(hidden)
        return Model(encoder_in, code)

    def build_decoder(self, hp):
        """Return the decoder: two tunable Dense+BatchNorm stages, then a linear output.

        Args:
            hp: Keras Tuner hyperparameter container; registers
                ``decoder_units_1`` and ``decoder_units_2``.

        Returns:
            A Keras ``Model`` mapping ``D`` latent units to
            ``INPUT_SHAPE[0]`` outputs.
        """
        latent_in = Input(shape=(self.D,))

        width_1 = hp.Int('decoder_units_1', min_value=16, max_value=256, step=16)
        hidden = Dense(units=width_1, activation='relu')(latent_in)
        hidden = BatchNormalization()(hidden)

        width_2 = hp.Int('decoder_units_2', min_value=16, max_value=256, step=16)
        hidden = Dense(units=width_2, activation='relu')(hidden)
        hidden = BatchNormalization()(hidden)

        reconstruction = Dense(units=self.INPUT_SHAPE[0], activation='linear')(hidden)
        return Model(latent_in, reconstruction)

    def build_autoencoder(self, hp):
        """Assemble and compile the encoder -> decoder model for one tuner trial.

        Args:
            hp: Keras Tuner hyperparameter container; registers
                ``learning_rate`` and ``batch_size`` in addition to the
                encoder/decoder widths.

        Returns:
            The compiled Keras ``Model`` (Adam optimizer, ``'mse'`` loss).
        """
        step_size = hp.Choice('learning_rate', values=[1.0, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5])
        # Registered with the tuner, but the value is not used in this method.
        batch_size = hp.Int('batch_size', min_value=16, max_value=256, step=16)

        model_in = Input(shape=self.INPUT_SHAPE)
        encoder = self.build_encoder(hp)
        encoded = encoder(model_in)
        decoder = self.build_decoder(hp)
        decoded = decoder(encoded)

        model = Model(model_in, decoded)
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate_or(step_size)) if False else keras.optimizers.Adam(learning_rate=step_size)
        model.compile(optimizer=optimizer, loss='mse')
        return model

    def define_tuner(self):
        """Create the Bayesian-optimization tuner that minimizes ``val_loss``.

        ``directory``, ``project_name`` and ``overwrite`` are not passed, so
        the tuner's own defaults apply.

        Returns:
            A ``BayesianOptimization`` tuner wrapping ``build_autoencoder``.
        """
        return BayesianOptimization(
            self.build_autoencoder,
            objective='val_loss',
            max_trials=self.MAX_TRIALS,
            executions_per_trial=self.EXECUTIONS_PER_TRIAL,
            seed=self.SEED,
        )


In [None]:
class Autoencoder(nn.Module):
    """Fully connected variational autoencoder with batch normalization.

    The encoder produces two ``latent_dim``-wide vectors that ``forward``
    treats as the latent mean and log-variance. A latent sample is drawn
    with the reparameterization trick while training (the mean is used
    directly in eval mode) and decoded back to ``D_in`` features.

    Args:
        D_in: Number of input (and reconstructed) features.
        H: Width of the first encoder / last decoder hidden layer.
        H2: Width of the inner hidden layers.
        latent_dim: Size of the latent vector.
    """

    def __init__(self, D_in, H=50, H2=12, latent_dim=3):
        super().__init__()

        # Encoder
        self.linear1 = nn.Linear(D_in, H)
        self.lin_bn1 = nn.BatchNorm1d(num_features=H)
        self.linear2 = nn.Linear(H, H2)
        self.lin_bn2 = nn.BatchNorm1d(num_features=H2)
        self.linear3 = nn.Linear(H2, H2)
        self.lin_bn3 = nn.BatchNorm1d(num_features=H2)

        # Latent heads: fc21 -> mean, fc22 -> log-variance
        self.fc1 = nn.Linear(H2, latent_dim)
        self.bn1 = nn.BatchNorm1d(num_features=latent_dim)
        self.fc21 = nn.Linear(latent_dim, latent_dim)
        self.fc22 = nn.Linear(latent_dim, latent_dim)

        # Layers that expand the sampled latent vector back to H2
        self.fc3 = nn.Linear(latent_dim, latent_dim)
        self.fc_bn3 = nn.BatchNorm1d(latent_dim)
        self.fc4 = nn.Linear(latent_dim, H2)
        self.fc_bn4 = nn.BatchNorm1d(H2)

        # Decoder
        self.linear4 = nn.Linear(H2, H2)
        self.lin_bn4 = nn.BatchNorm1d(num_features=H2)
        self.linear5 = nn.Linear(H2, H)
        self.lin_bn5 = nn.BatchNorm1d(num_features=H)
        self.linear6 = nn.Linear(H, D_in)
        self.lin_bn6 = nn.BatchNorm1d(num_features=D_in)

        self.relu = nn.ReLU()

    def encode(self, x):
        """Map inputs to the latent mean and log-variance.

        Args:
            x: Input batch whose last dimension is ``D_in``.

        Returns:
            ``(mu, logvar)``, each with last dimension ``latent_dim``.
        """
        lin1 = self.relu(self.lin_bn1(self.linear1(x)))
        lin2 = self.relu(self.lin_bn2(self.linear2(lin1)))
        lin3 = self.relu(self.lin_bn3(self.linear3(lin2)))

        # Use the module's own ReLU so this method does not depend on
        # ``torch.nn.functional`` having been imported as ``F``.
        fc1 = self.relu(self.bn1(self.fc1(lin3)))

        return self.fc21(fc1), self.fc22(fc1)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` in training mode; return ``mu`` otherwise.

        Args:
            mu: Latent mean.
            logvar: Latent log-variance; ``std = exp(0.5 * logvar)``.

        Returns:
            A latent sample with the same shape as ``mu``.
        """
        if not self.training:
            return mu
        std = torch.exp(0.5 * logvar)
        # Standard-normal noise with std's shape, dtype and device.
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Reconstruct inputs from a latent sample.

        Args:
            z: Latent batch whose last dimension is ``latent_dim``.

        Returns:
            Reconstruction with last dimension ``D_in``; the final layer is
            batch-normalized and has no activation.
        """
        fc3 = self.relu(self.fc_bn3(self.fc3(z)))
        fc4 = self.relu(self.fc_bn4(self.fc4(fc3)))

        lin4 = self.relu(self.lin_bn4(self.linear4(fc4)))
        lin5 = self.relu(self.lin_bn5(self.linear5(lin4)))
        return self.lin_bn6(self.linear6(lin5))

    def forward(self, x):
        """Run encode -> reparameterize -> decode.

        Returns:
            ``(reconstruction, mu, logvar)``.
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar



In [None]:
class customLoss(nn.Module):
    """VAE objective: summed squared reconstruction error plus a KL term."""

    def __init__(self):
        super().__init__()
        # Sum (not mean) over all elements of the batch.
        self.mse_loss = nn.MSELoss(reduction="sum")

    def forward(self, x_recon, x, mu, logvar):
        """Return the total loss as a scalar tensor.

        Args:
            x_recon: Reconstructed batch.
            x: Original batch, same shape as ``x_recon``.
            mu: Latent mean.
            logvar: Latent log-variance.

        Returns:
            ``MSE_sum(x_recon, x) + KLD`` where
            ``KLD = -0.5 * sum(1 + logvar - mu^2 - exp(logvar))``.
        """
        reconstruction_term = self.mse_loss(x_recon, x)
        kl_elements = 1 + logvar - mu.pow(2) - logvar.exp()
        kl_term = -0.5 * torch.sum(kl_elements)
        return reconstruction_term + kl_term