<a href="https://colab.research.google.com/github/mortgad/DLVR/blob/main/VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Encoder

In [17]:
import os
import tensorflow as tf
import keras
from keras import ops
from keras.layers import Layer, Conv2D, Flatten, Dense

os.environ["KERAS_BACKEND"] = "tensorflow"

class Sampling(Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon

latent_dim = 4 # <--------------------------------------------- IMPORTANT: How many latent dimensions?

encoder_inputs = keras.Input(shape=(224, 224, 3))
x = Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = Flatten()(x)
x = Dense(16, activation="relu")(x)
z_mean = Dense(latent_dim, name="z_mean")(x)
z_log_var = Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

# Decoder

In [28]:
from keras.layers import Input, Conv2DTranspose, Reshape
from keras import Model

latent_inputs = Input(shape=(latent_dim,))
x = Dense(7 * 7 * 64, activation="relu")(latent_inputs)
x = Reshape((7, 7, 64))(x)
x = Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = Conv2DTranspose(64, 3, activation="relu", strides=8, padding="same")(x)
x = Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = Conv2DTranspose(3, 3, activation="sigmoid", padding="same")(x)
decoder = Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

# VAE model

In [29]:
from keras.metrics import Mean
class VAE(Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = Mean(name="total_loss")
        self.reconstruction_loss_tracker = Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        # This function is called at every iteration of the model training.
        # The gradient tape just records operations for automatic gradient computation.
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = ops.mean(
                ops.sum(
                    keras.losses.binary_crossentropy(data, reconstruction),
                    axis=(1, 2),
                )
            )
            kl_loss = -0.5 * (1 + z_log_var/2 - ops.square(z_mean) - ops.exp(z_log_var))
            kl_loss = ops.mean(ops.sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())

KeyboardInterrupt: 

# Import data

In [7]:
# Mount drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
!git clone https://github.com/mortgad/DLVR.git
%cd DLVR

Cloning into 'DLVR'...
remote: Enumerating objects: 456, done.[K
remote: Counting objects: 100% (314/314), done.[K
remote: Compressing objects: 100% (212/212), done.[K
remote: Total 456 (delta 185), reused 176 (delta 101), pack-reused 142 (from 1)[K
Receiving objects: 100% (456/456), 56.36 MiB | 12.47 MiB/s, done.
Resolving deltas: 100% (247/247), done.
/content/DLVR


In [13]:
import os
base_dir = '/content/drive/MyDrive/Deep_Learning_Visual_Recognition/Project'
folder_name = 'UTKFace_Filtered'
metadata_file = 'UTKFace_Filtered_Metadata.csv'
filtered_images_dir = os.path.join(base_dir, folder_name)

# Load csv-file
import pandas as pd
df = pd.read_csv(os.path.join(base_dir, metadata_file))

# Preprocess df
df = df[['age_raw','gender_code','race_code','file']]
df = df[df['age_raw']<=100]
df.rename(columns={'age_raw': 'age_code'}, inplace=True)

# Fix paths
df['file'] = df['file'].apply(lambda x: os.path.join(filtered_images_dir, x))

# Create lists
from utils.preprocessing import create_lists
df_first_1000 = df.head(1000)
images, ages, races, genders = create_lists(df_first_1000)
print(f"Loaded {len(images)} images.")

# Get unique ages and their counts
import numpy as np
unique, counts = np.unique(ages, return_counts=True)

# Find ages with a count of 1 or lower
rare_ages = unique[counts <= 1]

# Print or return the result
print("Ages with count of 1 or lower:", rare_ages)

# Get indices of these ages and remove from all lists
indices_to_remove = np.where(np.isin(ages, rare_ages))[0]
images = np.delete(images, indices_to_remove, axis=0)
ages = np.delete(ages, indices_to_remove)
races = np.delete(races, indices_to_remove)
genders = np.delete(genders, indices_to_remove)

# Verify
unique, counts = np.unique(ages, return_counts=True)
rare_ages = unique[counts <= 1]
print("Ages with count of 1 or lower:", rare_ages)

# Preprocess images
from keras.applications.resnet_v2 import preprocess_input
images_preprocessed = preprocess_input(images)

# Split into train (80%) and temp (20%)
from sklearn.model_selection import train_test_split
X_train, X_temp, y_train_age, y_temp_age, y_train_races, y_temp_races, y_train_gender, y_temp_gender = train_test_split(images_preprocessed, ages, races, genders, test_size=0.20, random_state=42, stratify=ages)

# Split temp into validation (75% of temp, which is 15% of original data) and test (25% of temp, which is 5% of original data)
X_val, X_test, y_val_age, y_test_age, y_val_races, y_test_races, y_val_gender, y_test_gender = train_test_split(X_temp, y_temp_age, y_temp_races, y_temp_gender, test_size=0.25, random_state=42)

Processing images:   0%|          | 0/1000 [00:00<?, ?it/s]

Loaded 1000 images.
Ages with count of 1 or lower: [ 57  61  64  74  82  89 100]
Ages with count of 1 or lower: []


In [16]:
print('X_train shape: ' + str(X_train.shape))
print('y_train shape: ' + str(y_train_age.shape))
print('X_val shape: ' + str(X_val.shape))
print('y_val shape: ' + str(y_val_age.shape))

X_train shape: (794, 224, 224, 3)
y_train shape: (794,)
X_val shape: (149, 224, 224, 3)
y_val shape: (149,)


In [30]:
vae.fit(X_train, epochs=1, batch_size=128)

[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 2s/step - kl_loss: 21.6257 - loss: 34546.7344 - reconstruction_loss: 34525.1094


<keras.src.callbacks.history.History at 0x7c7ebe6c9480>