In [1]:
import sklearn
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd

# Seed para que las redes con iguales parametros no generen resultados aleatorios y tener repetibilidad
np.random.seed(42)
tf.random.set_seed(42)

# Para las graficas importamos matplotlib
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

#Función para plotear
def plot_image(image):
    plt.imshow(image, cmap="binary")
    plt.axis("off")

#Función Rounded Accuracy
def rounded_accuracy(y_true, y_pred):
    return keras.metrics.binary_accuracy(tf.round(y_true), tf.round(y_pred))

    
#Traemos los datos de Fashion MNIST
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
X_train_full_normalized = X_train_full.astype(np.float32) / 255
X_test = X_test.astype(np.float32) / 255
X_train, X_valid = X_train_full_normalized[:-5000], X_train_full_normalized[-5000:]
y_train, y_valid = y_train_full[:-5000], y_train_full[-5000:]

#Función para ver los resultados de las reconstrucciones
def show_reconstructions(model, images=X_test, n_images=5):
    reconstructions = model.predict(images[:n_images])
    fig = plt.figure(figsize=(n_images * 1.5, 3))
    for image_index in range(n_images):
        plt.subplot(2, n_images, 1 + image_index)
        plot_image(images[image_index])
        plt.subplot(2, n_images, 1 + n_images + image_index)
        plot_image(reconstructions[image_index])

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
[1m29515/29515[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
[1m26421880/26421880[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
[1m5148/5148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
[1m4422102/4422102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [2]:
#definimos la operacion kernel como un kernel gaussiano

def rbf_kernel(X, Y, sigma=1.0):
    X_expanded = tf.expand_dims(X, axis=1)  # Shape: (batch_size, 1, latent_dim)
    Y_expanded = tf.expand_dims(Y, axis=0)  # Shape: (1, batch_size, latent_dim)
    squared_distance = tf.reduce_sum(tf.square(X_expanded - Y_expanded), axis=2)
    return tf.exp(-squared_distance / (2 * sigma ** 2))

#definimos la operacion MMD
def compute_mmd(z, z_prior, kernel_fn):

    # Calcula los kernels
    K_zz = kernel_fn(z, z)
    K_zz_prior = kernel_fn(z, z_prior)
    K_prior_prior = kernel_fn(z_prior, z_prior)

    # Calcula MMD
    mmd = tf.reduce_mean(K_zz) + tf.reduce_mean(K_prior_prior) - 2 * tf.reduce_mean(K_zz_prior)
    return mmd


In [3]:
#Definimos una funcion para crear el encoder con la API funcional
def build_encoder(latent_dim):
    encoder_inputs = tf.keras.layers.Input(shape=(28, 28, 1))
    x = keras.layers.RandomFlip(mode="horizontal")(encoder_inputs)#Usamos unas capas de random flip y random contrast para aumentar artificialmente los datos de entrada
    x = keras.layers.RandomContrast(factor=0.2)(x)
    x = tf.keras.layers.Conv2D(32, 3, activation='relu', strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Conv2D(64, 3, activation='relu', strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = keras.layers.Dropout(0.5)(x)
    z_mean = tf.keras.layers.Dense(latent_dim)(x)
    z_log_var = tf.keras.layers.Dense(latent_dim)(x)
    encoder = tf.keras.Model(encoder_inputs, [z_mean, z_log_var], name='encoder')
    return encoder

#Definimos la función que crea el decoder con la API funcional
def build_decoder(latent_dim):
    decoder_inputs = tf.keras.layers.Input(shape=(latent_dim,))
    x = tf.keras.layers.Dense(7*7*64, activation='relu')(decoder_inputs)
    x = tf.keras.layers.Reshape((7, 7, 64))(x)
    x = tf.keras.layers.Conv2DTranspose(64, 3, activation='relu', strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Conv2DTranspose(32, 3, activation='relu', strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    decoder_outputs = tf.keras.layers.Conv2DTranspose(1, 3, activation='sigmoid', padding='same')(x)
    decoder = tf.keras.Model(decoder_inputs, decoder_outputs, name='decoder')
    return decoder

#Esta clase se crea para introducir las propiedades del autoencode variacional, en particular el loss probabilistico
class VAE(tf.keras.Model):
    def __init__(self, encoder, decoder, alpha=0.01,sigma=1.0, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.alpha = alpha #Este parametro se usara para darle peso a la regularizacion con el loss KL
        self.sigma = sigma  # Kernel bandwidth
        
#La funcion call de la clase VAE calcula el loss por MMD usando los datos de salida del encoder, y luego los añade
#usado el metrodo add_loss
    def call(self, inputs):
        z_mean, z_log_var = self.encoder(inputs)
        z = self.reparameterize(z_mean, z_log_var)
        reconstructed = self.decoder(z)
        z_prior = tf.random.normal(shape=tf.shape(z))
        mmd_loss = self.alpha * compute_mmd(z, z_prior, lambda x, y: rbf_kernel(x, y, sigma=self.sigma))
        self.add_loss(mmd_loss)
        return reconstructed
    
#Esta funcion se usa en la función call para convertir las muestras del espacio latente en un conjunto continuo y diferenciable, y asi 
#el espacio latente se convierte en la nube de probabilidad
    def reparameterize(self, z_mean, z_log_var):
        eps = tf.random.normal(shape=tf.shape(z_mean))
        return eps * tf.exp(z_log_var * .5) + z_mean

#Para poder usar el parametro de escalado y darle pesos a los diferentes loss creamos una clase para un loss custom escalado
class ScaledBinaryCrossentropy:
    #la funcion init para inicializar el factor de escalada
    def __init__(self, scale=1.0):
        self.scale = scale
        
    #En la funcion call se calcula la perdida escalada y la retorna
    def __call__(self, y_true, y_pred):
        # Calcula la binary crossentropy
        bce = tf.keras.backend.binary_crossentropy(y_true, y_pred)
        # Retorna la binary crossentropy escalada
        return self.scale * bce

In [4]:
#Luego de definir las funcions y la clase VAE, se crea y compila  el modelo
#Este parametro determina el tamaño del espacio latente, o lo que es lo mismo, el número de neuronas de las últimas capas del encoder
latent_dim = 8
#Llamamos las funciones para crear el encoder y decoder
encoder = build_encoder(latent_dim)
decoder = build_decoder(latent_dim)
#Definimos los pesos que llevaran los loss
alpha = 0.6
beta= 1-alpha
#Definimos el ancho de banda del kernel
sigma = 0.1 
#Usamos la clase VAE para definir el modelo
vae = VAE(encoder, decoder,alpha,sigma)

#Creamos el objeto loss que va a calcular la crossentropia escalada
loss2=ScaledBinaryCrossentropy(beta)
#Compilamos
vae.compile(optimizer=keras.optimizers.SGD(learning_rate=5e-1), loss=loss2,metrics=[rounded_accuracy])

In [5]:
#Clear session para no guardar datos de entrenamientos anteriores
keras.backend.clear_session()
history=vae.fit(X_train, X_train, epochs=2, batch_size=32, validation_data=(X_valid, X_valid))

Epoch 1/2


I0000 00:00:1727732215.357697     109 service.cc:145] XLA service 0x783170005840 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1727732215.357746     109 service.cc:153]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
I0000 00:00:1727732215.357750     109 service.cc:153]   StreamExecutor device (1): Tesla T4, Compute Capability 7.5


[1m  36/1719[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m7s[0m 4ms/step - loss: 0.2897 - rounded_accuracy: 0.6613

I0000 00:00:1727732221.769468     109 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m1719/1719[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 10ms/step - loss: 0.2153 - rounded_accuracy: 0.8054 - val_loss: 0.1683 - val_rounded_accuracy: 0.8966
Epoch 2/2
[1m1719/1719[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 3ms/step - loss: 0.1614 - rounded_accuracy: 0.9086 - val_loss: 0.1639 - val_rounded_accuracy: 0.9053


In [None]:
#Crear un custom loss para que la red tenga 2 salidas, y una sea el loss de regularización del espacio latente

In [None]:
#Graficamos el loss de entrenamiento y validación
plt.plot(history.history["loss"],label="Loss")
plt.plot(history.history["val_loss"],label="Val_loss")
plt.grid(True)
#plt.xlim(0,20)
#plt.ylim(0.49,0.52)
plt.legend()
plt.show()

In [None]:
#Graficamos el accuracy de entrenamiento y el de validación

plt.plot(history.history["rounded_accuracy"],label="Accuracy")
plt.plot(history.history["val_rounded_accuracy"],label="Val_Accuracy")
plt.grid(True)
#plt.xlim(0,20)
#plt.ylim(0.7,0.75)
plt.legend()
plt.show()

In [None]:
#Visualizamos las reconstrucciones usando la funcion ya definida
show_reconstructions(vae)
plt.show()

In [None]:
#Hacemos un mapa de calor para visualizar la distribucion de las medias definidas para cada dato en el encoder
mean, *_ = vae.encoder.predict(X_train)
fig = plt.figure(figsize=(11, 7))
plt.scatter(mean[:, 0], mean[:, 1], c=y_train, cmap="magma")
plt.colorbar()
plt.show()

In [None]:
#Para revisar el espacio latente usamos PCA y TSNE
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

# Extraer los valores latentes
z_mean, _ = vae.encoder.predict(X_test)

# Aplicar PCA
pca = PCA(n_components=2)
z_pca = pca.fit_transform(z_mean)

# Aplicar TSNE
tsne = TSNE(n_components=2)
z_tsne = tsne.fit_transform(z_mean)

# Graficar los resultados
plt.figure(figsize=(12, 6))

# PCA
plt.subplot(1, 2, 1)
plt.scatter(z_pca[:, 0], z_pca[:, 1], c='blue', s=2)
plt.title('PCA del espacio latente')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')

# TSNE
plt.subplot(1, 2, 2)
plt.scatter(z_tsne[:, 0], z_tsne[:, 1], c='red', s=2)
plt.title('TSNE del espacio latente')
plt.xlabel('TSNE 1')
plt.ylabel('TSNE 2')

plt.show()

**Por comparación vamos a revisar que pasa cuando no se añade el MMD, y cuando el ancho de banda es muy pequeño y muy grande**

In [None]:
#Luego de definir las funcions y la clase VAE, se crea y compila  el modelo
#Este parametro determina el tamaño del espacio latente, o lo que es lo mismo, el número de neuronas de las últimas capas del encoder
latent_dim = 8
#Llamamos las funciones para crear el encoder y decoder
encoder = build_encoder(latent_dim)
decoder = build_decoder(latent_dim)
#Definimos los pesos que llevaran los loss
alpha = 0
beta= 1-alpha
#Definimos el ancho de banda del kernel
sigma = 0.1 
#Usamos la clase VAE para definir el modelo
vae_0 = VAE(encoder, decoder,alpha,sigma)


#Creamos el objeto loss que va a calcular la crossentropia escalada
loss2=ScaledBinaryCrossentropy(beta)
#Compilamos
vae_0.compile(optimizer=keras.optimizers.SGD(learning_rate=5e-1), loss=loss2,metrics=[rounded_accuracy])

In [None]:
#Clear session para no guardar datos de entrenamientos anteriores
keras.backend.clear_session()
history=vae_0.fit(X_train, X_train, epochs=100, batch_size=32, validation_data=(X_valid, X_valid))

In [None]:
#Visualizamos las reconstrucciones usando la funcion ya definida
show_reconstructions(vae_0)
plt.show()

In [None]:
#Hacemos un mapa de calor para visualizar la distribucion de las medias definidas para cada dato en el encoder
mean, *_ = vae_0.encoder.predict(X_train)
fig = plt.figure(figsize=(11, 7))
plt.scatter(mean[:, 0], mean[:, 1], c=y_train, cmap="magma")
plt.colorbar()
plt.show()

**Verificamos que pasa si el sigma es muy pequeño**

In [None]:
#Luego de definir las funcions y la clase VAE, se crea y compila  el modelo
#Este parametro determina el tamaño del espacio latente, o lo que es lo mismo, el número de neuronas de las últimas capas del encoder
latent_dim = 8
#Llamamos las funciones para crear el encoder y decoder
encoder = build_encoder(latent_dim)
decoder = build_decoder(latent_dim)
#Definimos los pesos que llevaran los loss
alpha = 0.6
beta= 1-alpha
#Definimos el ancho de banda del kernel
sigma = 0.0001 
#Usamos la clase VAE para definir el modelo
vae_min = VAE(encoder, decoder,alpha,sigma)

#Creamos el objeto loss que va a calcular la crossentropia escalada
loss2=ScaledBinaryCrossentropy(beta)
#Compilamos
vae_min.compile(optimizer=keras.optimizers.SGD(learning_rate=5e-1), loss=loss2,metrics=[rounded_accuracy])

In [None]:
#Clear session para no guardar datos de entrenamientos anteriores
keras.backend.clear_session()
history=vae_min.fit(X_train, X_train, epochs=100, batch_size=32, validation_data=(X_valid, X_valid))

In [None]:
#Visualizamos las reconstrucciones usando la funcion ya definida
show_reconstructions(vae_min)
plt.show()

In [None]:
#Hacemos un mapa de calor para visualizar la distribucion de las medias definidas para cada dato en el encoder
mean, *_ = vae_min.encoder.predict(X_train)
fig = plt.figure(figsize=(11, 7))
plt.scatter(mean[:, 0], mean[:, 1], c=y_train, cmap="magma")
plt.colorbar()
plt.show()

**Ahora con un sigma muy grande**

In [None]:
#Luego de definir las funcions y la clase VAE, se crea y compila  el modelo
#Este parametro determina el tamaño del espacio latente, o lo que es lo mismo, el número de neuronas de las últimas capas del encoder
latent_dim = 8
#Llamamos las funciones para crear el encoder y decoder
encoder = build_encoder(latent_dim)
decoder = build_decoder(latent_dim)
#Definimos los pesos que llevaran los loss
alpha = 0.6
beta= 1-alpha
#Definimos el ancho de banda del kernel
sigma = 10000
#Usamos la clase VAE para definir el modelo
vae_max = VAE(encoder, decoder,alpha,sigma)

#Creamos el objeto loss que va a calcular la crossentropia escalada
loss2=ScaledBinaryCrossentropy(beta)
#Compilamos
vae_max.compile(optimizer=keras.optimizers.SGD(learning_rate=5e-1), loss=loss2,metrics=[rounded_accuracy])

In [None]:
#Clear session para no guardar datos de entrenamientos anteriores
keras.backend.clear_session()
history=vae_max.fit(X_train, X_train, epochs=100, batch_size=32, validation_data=(X_valid, X_valid))

In [None]:
#Visualizamos las reconstrucciones usando la funcion ya definida
show_reconstructions(vae_max)
plt.show()

In [None]:
#Hacemos un mapa de calor para visualizar la distribucion de las medias definidas para cada dato en el encoder
mean, *_ = vae_max.encoder.predict(X_train)
fig = plt.figure(figsize=(11, 7))
plt.scatter(mean[:, 0], mean[:, 1], c=y_train, cmap="magma")
plt.colorbar()
plt.show()

# Discusion

**Se puede ver que la MMD cumple bien el papel de regularizar el espacio latente como lo hacia la divergencia KL, el loss empeoró en general al eliminar esta regularizacion de la red.**

**El parametro sigma no tiene tanto impacto en la reconstrucción pero si en la regularización, el espacio latente aparenta ser mucho más disperso al poner valores muy extremos de sigma, tanto pequeños como grandes.**