<a href="https://colab.research.google.com/github/amalvarezme/AprendizajeMaquina/blob/main/7_TopicosAvanzados/2_Autoencoders/3_Autoencoder_TwoOutputs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from typing import Optional, Sequence

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Reshape, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Model
from tensorflow.keras.datasets import fashion_mnist, mnist
from tensorflow.keras.losses import SparseCategoricalCrossentropy, MeanSquaredError
from tensorflow.keras.optimizers import Adam
import numpy as np
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error, f1_score
import matplotlib.pyplot as plt
from matplotlib.offsetbox import OffsetImage, AnnotationBbox

In [None]:
scale: float = 0.4 #variabilidad del ruido
(x_train, y_train), (x_test, y_test) = mnist.load_data() #cargo la base de datos mnist
x_train = x_train.astype('float32') / 255. + np.random.normal(scale=scale, size=x_train.shape) #normalizo la imagen y agrego ruido
x_test = x_test.astype('float32') / 255. + np.random.normal(scale=scale, size=x_test.shape) #normalizo la imagen y agrego ruido

# Creo conjuntos de entrenamiento, validación y prueba
x_val = x_train[50000:] # para validar, desde el dato 50.000 en adelante, es decir, 10.000 datos para validar
y_val = y_train[50000:] # para validar, desde el dato 50.000 en adelante, es decir, 10.000 datos para validar
x_train = x_train[:50000] # para entrenar los primeros 50.000 datos
x_train = x_train[..., tf.newaxis] # nueva dimensión para los filtros convolucionales
y_train = y_train[:50000] # entrenar los primeros 50.000 datos
x_val = x_val[..., tf.newaxis] # nueva dimensión para los filtros convolucionales
x_val = x_val[..., tf.newaxis] # nueva dimensión para los filtros convolucionales
x_test = x_test[..., tf.newaxis] # nueva dimensión para los filtros convolucionales


In [None]:
print(x_train.shape,x_val.shape,x_test.shape,y_train.shape,y_val.shape,y_test.shape)

In [None]:
def plot_mnist_autoencoder(
    x: Sequence[np.ndarray],
    x_: Sequence[np.ndarray],
    y: Optional[Sequence] = None,
    y_: Optional[Sequence] = None,
    cmap: str = 'gray',
    vmin: float = 0,
    vmax: float = 1
) -> None:

    plt.figure(figsize=(20, 2))
    for i, (train, predict) in enumerate(zip(x, x_), start=1):
        plt.subplot(2, len(x), i)
        plt.imshow(train.reshape(28, 28), cmap=cmap, vmin=vmin, vmax=vmax)
        plt.axis('off')

        plt.subplot(2, len(x), i + len(x))
        plt.imshow(predict.reshape(28, 28), cmap=cmap, vmin=vmin, vmax=vmax)
        plt.axis('off')

        if y is not None and y_ is not None:
            color = "red" if y[i - 1] != y_[i - 1] else 'green'
            plt.text(
                0, 28, str(y_[i - 1]), color=color, fontsize=15,
                verticalalignment='bottom', horizontalalignment='left'
            )

In [None]:
#plot images on latent space
def plot_mnist_2d(Z,y,images,img_w=28,img_h=28,zoom=0.5,cmap='jet'):
    fig, ax = plt.subplots(figsize=(5,5))
    plt.axis('off')
    for i in range(Z.shape[0]):
        #print('img',i+1,'/',Z.shape[0])
        image = images[i].reshape((img_w, img_h))
        im = OffsetImage(image, zoom=zoom,cmap=cmap)
        ab = AnnotationBbox(im, (Z[i,0], Z[i,1]), xycoords='data', frameon=False)
        ax.add_artist(ab)
        ax.update_datalim([(Z[i,0], Z[i,1])])
        ax.autoscale()
    plt.show()

In [None]:
x_train.shape

In [None]:
x_train.reshape(x_train.shape[0],-1).shape

In [None]:
#traditional PCA algorithm
red = PCA(n_components=2, random_state=123) #encontrar componentes principales que  ayuden a maximizar la variabilidad de los datos
Z = red.fit_transform(x_train.reshape(x_train.shape[0],-1)) #entrenar, cambiando las dimensiones
N = 500 #  500 imagenes  espacio latente
plot_mnist_2d(Z[:N],y_train[:N],x_train[:N],img_w=28,img_h=28,zoom=0.3,cmap='gray')

In [None]:
# Definir el objeto de pérdida y el optimizador
tf.keras.backend.clear_session() #limpiar la memoria

# Definir el modelo autoencoder
input_img = Input(shape=(28, 28, 1)) #defino el tamaño de la entrada

# Encoder

x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img) #convolución 2D (16 filtros, 3x3 es el tamaño del kernel, sobre la imagen de entrada)
xe = Conv2D(8, (3, 3), activation='relu', padding='same')(x) #convolución 2D (8 filtros, 3x3 es el tamaño del kernel, sobre la salida de la convolución anterior)

# Decoder

x = Conv2DTranspose(8, (3, 3), activation='relu', padding='same')(xe) #desconvolución del espacio latente de la capa anterior (reflejo)
x = Conv2DTranspose(16, (3, 3), activation='relu', padding='same')(x) #desconvolución del espacio latente de la capa anterior (reflejo)
reconstructed_img = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x) #se reconstruye la imagen con activación sigmoide para normalizar

# Rama de clasificación

x = Flatten(name='fencoded')(xe) #aplico flatten al espacio latente
classification_output = Dense(10, activation='softmax')(x) #salida de clasificación con activación softmax para obtener probabilidades, 10 clases

# Definir el modelo con dos salidas

autoencoder = Model(inputs=input_img, outputs=[reconstructed_img, classification_output]) #describe las entradas y salidas del modelo, en la API Funcional de Keras


In [None]:
# Custom loss function

def custom_loss(lambda_=0.5):
    def custom_loss_autoencoder(y_true, y_pred):
        reconstruction_loss = MeanSquaredError()(y_true[0], y_pred[0]) #con mse (error cuadrático medio)para minimizar la distancia
        classification_loss = SparseCategoricalCrossentropy()(y_true[1], y_pred[1]) #pseudositancia que minimiza la distancia entre las función de densidad de probabilidad (softmax)
        return lambda_*reconstruction_loss + (1-lambda_)*classification_loss # va a retornar la suma de ambos loss (sopésada por lambda)
    return custom_loss_autoencoder #retorna la función

lam_ = 0.25 #peso del error de la reconstrucción
autoencoder.compile(optimizer=Adam(), loss=custom_loss(lambda_=lam_)) #compila el modelo con optimizador Adam y función de costo

In [None]:
# Custom training loop
batch_size = 64 #cantidad de imagenes de una iteración
epochs = 20
N = 500 #numero de imagenes que vamos a plotear
red = PCA(n_components=2, random_state=123) #definir PCA

for epoch in range(epochs):
    print(f'Epoch {epoch+1}/{epochs}')
    for x_batch, y_batch in tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(buffer_size=1024).batch(batch_size): #para cada lote ...
        with tf.GradientTape() as tape: # gradiente
            reconstruction, classification = autoencoder(x_batch, training=True) #sacar la resconstruccion y clasificación
            loss = autoencoder.loss([x_batch, y_batch], [reconstruction, classification]) #calcular el loss del lote
            gradients = tape.gradient(loss, autoencoder.trainable_variables) #calcular el gradiente
        autoencoder.optimizer.apply_gradients(zip(gradients, autoencoder.trainable_variables)) #aplicar el gradiente

    loss_ = [] #lista de métricas
    for x_val_batch, y_val_batch in tf.data.Dataset.from_tensor_slices((x_val, y_val)).shuffle(buffer_size=128).batch(batch_size): #para cada lote de validación
        val_reconstruction, val_classification = autoencoder(x_val_batch, training=False) #calcular la resconstrucción y la clasificación
        loss_.append(autoencoder.loss([x_val_batch, y_val_batch], [val_reconstruction, val_classification])) #calculamos el loss y agrgar a la lista de métricas
    print(f'Loss: {loss.numpy()} Val_loss: {np.array(loss_).mean()}') #imprimimos el loss
    if (epoch+1)%5 == 0: #si la epoca es multiplo de 5...

      encoder_ = tf.keras.Model(inputs=autoencoder.inputs,outputs=autoencoder.get_layer('fencoded').output)
      Z = red.fit_transform(encoder_(x_val))

      #plot_mnist_2d(Z[:N],y_val[:N],x_val[:N],img_w=28,img_h=28,zoom=0.3,cmap='gray')

print('done')

In [None]:
# Realizar predicciones con el modelo
reconstruccion_test, clasificacion_test = autoencoder.predict(x_test) #realizo la predicción del modelo en los datos de testeo

# Calcular el Error Cuadrático Medio para la reconstrucción
mse_reconstruction = MeanSquaredError()(x_test, reconstruccion_test) #calculo el error de reconstrucción de los datos de testeo

# Preparar las etiquetas verdaderas y predichas para calcular el puntaje F1
# Convertir etiquetas de formato categórico (one-hot) a formato de clase única

y_test_classes = np.argmax(y_test.reshape(-1, 1), axis=1) #convierto one-hot a sparse
y_pca_pred_classes = np.argmax(clasificacion_test, axis=1) #convierto one-hot a sparse

# Calcular el puntaje F1 para la clasificación
f1 = f1_score(y_test_classes, y_pca_pred_classes, average='weighted') #calculo el puntaje F1

print(f"Error Cuadrático Medio (Reconstrucción): {mse_reconstruction}") #imprimo el error de reconstrucción
print(f"Puntaje F1 (Clasificación): {f1}") #imprimo el puntaje F1 de clasificación


In [None]:
# Set the number of images to display
N = 20

plot_mnist_autoencoder(
    x_test[:N],  # First N original test images
    reconstruccion_test[:N],   # First N reconstructed images
    y_test[:N],  # True labels for the first N images
    clasificacion_test.argmax(axis=1)[:N]    # Predicted labels for the first N images
)

In [None]:
N = 300

In [None]:
plot_mnist_2d(Z[:N], y_val[:N], x_val[:N], img_w=28, img_h=28, zoom=0.3, cmap='gray')
