Marcin Wardyński  
wtorek, 9:45

## Laboratorium 7
### 7.4 AE


#### Funkcje pomocnicze

In [1]:
import warnings
warnings.filterwarnings("ignore")

import importlib
import lab7_utils as utils
importlib.reload(utils)

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

seed = 42

In [2]:
from sklearn.base import BaseEstimator, TransformerMixin
import matplotlib.pyplot as plt

class FrozenEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, encoder):
        self.encoder = encoder

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return self.encoder.predict(X)
    
    
def viz_accuracy_to_latent_dim(latent_dims, scores, score_name):

    plt.figure(figsize=(8, 5))
    plt.plot(latent_dims, scores)
    plt.title(f"{score_name} - Lattent Layer Dimension")
    plt.xlabel("Lattent Layer Dimension")
    plt.ylabel(f"{score_name}")
    plt.show()

    for i in range(len(latent_dims)):
        print(f"[{latent_dims[i]}: {scores[i]:.4f}]", end=" ")
    print()

In [12]:
import keras

from scikeras.wrappers import KerasRegressor
from sklearn.model_selection import GridSearchCV

def evaluate_autoencoder(dataset_name, params, create_ae_fun):

    X_train, X_test, y_train, y_test = utils.get_dataset_for_ae(dataset_name=dataset_name, with_val=False)

    keras_reg = KerasRegressor(
        model=create_ae_fun,
        loss="binary_crossentropy",
        optimizer="adam",
        optimizer__learning_rate=0.001,
        model__latent_dim=128,
        verbose=False,
    )

    grid_search = GridSearchCV(keras_reg, params, cv=7, n_jobs=-1)
    grid_search.fit(X_train, X_train)

    r2_scores = np.array(grid_search.cv_results_['mean_test_score'])
    latent_dims = np.array(list(map(lambda t: t['model__latent_dim'], grid_search.cv_results_['params'])))
    
    
    viz_accuracy_to_latent_dim(latent_dims, r2_scores, "R2")
    

In [5]:
from sklearn.metrics import accuracy_score, classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

def create_model_instance(class_name, *args, **kwargs):
    return class_name(*args, **kwargs)

def find_best_encoder(dataset_name, model_name, latent_dims):
    X_train, X_val, X_test, y_train, y_val, y_test = utils.get_dataset_for_ae(dataset_name, with_val=True)

    max_accuracy = 0
    best_latent_dim = 0
    best_model = None
    accuracies = []
    for latent_dim in latent_dims:
        model = create_model_instance(model_name, latent_dim)
        model.compile(optimizer=tf.keras.optimizers.Adam(), loss=keras.losses.BinaryCrossentropy())
        model.fit(X_train, X_train,
                epochs=40,
                shuffle=True,
                batch_size=64,
                verbose=0,
                validation_data=(X_val, X_val))
        
        frozen_model = FrozenEncoder(encoder=model.encoder)

        pipeline_log_reg = Pipeline([
            ('frozen_model', frozen_model),
            ('log_reg', LogisticRegression(max_iter=1000, solver="newton-cg"))
        ])

        pipeline_log_reg.fit(X_train, y_train)
        y_pred = pipeline_log_reg.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        accuracies.append(accuracy)

        if max_accuracy < accuracy:
            max_accuracy = accuracy
            best_latent_dim = latent_dim
            best_model = frozen_model
    
    viz_accuracy_to_latent_dim(latent_dims, accuracies, "Accuracy")
    return best_model, best_latent_dim


def classify_with_encoder(dataset_name, encoder_model, head_model):
    X_train, X_test, y_train, y_test = utils.get_dataset_for_ae(dataset_name, with_val=False)
    
    pipeline_log_reg = Pipeline([
        ('encoder', encoder_model),
        ('head', head_model)
    ])

    pipeline_log_reg.fit(X_train, y_train)

    y_pred = pipeline_log_reg.predict(X_test)

    print(classification_report(y_test, y_pred))

### Simple Autoencoder

In [6]:
class SimpleAutoencoder(tf.keras.Model):
  def __init__(self, latent_dim):
    super(SimpleAutoencoder, self).__init__()
    self.latent_dim = latent_dim
    self.mid_dim = 384
    self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(shape=(784, )),
            tf.keras.layers.Dense(self.mid_dim, activation=tf.nn.relu),
            tf.keras.layers.Dense(latent_dim, activation=tf.nn.relu),
        ]
    )
    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(shape=(latent_dim,)),
            tf.keras.layers.Dense(self.mid_dim, activation=tf.nn.relu),
            tf.keras.layers.Dense(units=784, activation=tf.nn.sigmoid),
        ]
    )

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded    

def create_simple_ae_as_sequence(latent_dim):
  model =  SimpleAutoencoder(latent_dim)
  return tf.keras.Sequential([model.encoder, model.decoder])

In [14]:
latent_dims = [10, 64, 128, 256, 512]

param_grid = {
    'model__latent_dim': latent_dims,
    'epochs': [40]
}

#### MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.MNIST.value, param_grid, create_simple_ae_as_sequence)

In [None]:
mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.MNIST.value, SimpleAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Simple Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Simple Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, mnist_encoder_model, head_model)

#### Fashion MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.F_MNIST.value, param_grid, create_simple_ae_as_sequence)

In [None]:
f_mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.F_MNIST.value, SimpleAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Simple Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, f_mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Simple Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, f_mnist_encoder_model, head_model)

#### Kuzushiji MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.K_MNIST.value, param_grid, create_simple_ae_as_sequence)

In [None]:
k_mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.K_MNIST.value, SimpleAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Simple Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, k_mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Simple Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, k_mnist_encoder_model, head_model)

### Convolutional Autoencoder

In [9]:
class ConvAutoencoder(tf.keras.Model):
  def __init__(self, latent_dim):
    super(ConvAutoencoder, self).__init__()
    self.latent_dim = latent_dim
    self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.Input(shape=(784,)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation=tf.nn.relu, padding='same'),
            tf.keras.layers.MaxPooling2D(2, padding='same'),
            tf.keras.layers.Conv2D(64, 3, activation=tf.nn.relu, padding='same'),
            tf.keras.layers.MaxPooling2D(2, padding='same'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim),
            
        ]
    )
    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(shape=(latent_dim,)),
            tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
            tf.keras.layers.Reshape((7, 7, 32)),
            tf.keras.layers.Conv2DTranspose(64, 3, 2, activation=tf.nn.relu, padding='same'),
            tf.keras.layers.Conv2DTranspose(32, 3, 2, activation=tf.nn.relu, padding='same'),
            tf.keras.layers.Conv2D(1, (3, 3), activation=tf.nn.sigmoid, padding='same'),
            tf.keras.layers.Reshape(target_shape=(784,))
        ]
    )

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded
  
  def create_conv_ae_as_sequence(latent_dim):
    model =  ConvAutoencoder(latent_dim)
    return tf.keras.Sequential([model.encoder, model.decoder])


#### MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.MNIST.value, param_grid, create_conv_ae_as_sequence)

In [None]:
mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.MNIST.value, ConvAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Convolutional Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Convolutional Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, mnist_encoder_model, head_model)

#### Fashion MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.F_MNIST.value, param_grid, create_conv_ae_as_sequence)

In [None]:
f_mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.F_MNIST.value, ConvAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Convolutional Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, f_mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Convolutional Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, f_mnist_encoder_model, head_model)

#### Kuzushiji MNIST

In [None]:
evaluate_autoencoder(utils.Dataset_Select.K_MNIST.value, param_grid, create_conv_ae_as_sequence)

In [None]:
k_mnist_encoder_model, latent_dim = find_best_encoder(utils.Dataset_Select.K_MNIST.value, ConvAutoencoder, latent_dims)

In [None]:
head_model = LogisticRegression(max_iter=1000, solver="newton-cg")
print("\n--- Convolutional Autoencoder with Logistic Regression ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, k_mnist_encoder_model, head_model)

head_model = RandomForestClassifier(n_estimators=250, random_state=seed)
print("\n\n--- Convolutional Autoencoder with Random Forest ---\n")
classify_with_encoder(utils.Dataset_Select.F_MNIST.value, k_mnist_encoder_model, head_model)

Latent 256:

==== LogisticRegression (ConvAE) ====
              precision    recall  f1-score   support

           0       0.84      0.85      0.84      1000
           1       0.99      0.98      0.98      1000
           2       0.80      0.83      0.82      1000
           3       0.88      0.91      0.90      1000
           4       0.80      0.81      0.81      1000
           5       0.97      0.97      0.97      1000
           6       0.72      0.66      0.69      1000
           7       0.95      0.97      0.96      1000
           8       0.97      0.97      0.97      1000
           9       0.97      0.96      0.97      1000

    accuracy                           0.89     10000
   macro avg       0.89      0.89      0.89     10000
weighted avg       0.89      0.89      0.89     10000

Accuracy (ConvAE + LogisticRegression): 0.8900

Latent 10:

==== LogisticRegression (ConvAE) ====
              precision    recall  f1-score   support

           0       0.74      0.78      0.76      1000
           1       0.96      0.94      0.95      1000
           2       0.64      0.60      0.62      1000
           3       0.76      0.84      0.80      1000
           4       0.61      0.68      0.64      1000
           5       0.91      0.89      0.90      1000
           6       0.45      0.34      0.39      1000
           7       0.87      0.89      0.88      1000
           8       0.92      0.94      0.93      1000
           9       0.91      0.92      0.92      1000

    accuracy                           0.78     10000
   macro avg       0.78      0.78      0.78     10000
weighted avg       0.78      0.78      0.78     10000

Accuracy (ConvAE + LogisticRegression): 0.7834

Dense Latent 512:

```
==== LogisticRegression (ConvAE) ====
              precision    recall  f1-score   support

           0       0.83      0.85      0.84      1000
           1       0.99      0.98      0.98      1000
           2       0.83      0.85      0.84      1000
           3       0.89      0.91      0.90      1000
           4       0.82      0.83      0.83      1000
           5       0.97      0.97      0.97      1000
           6       0.74      0.68      0.71      1000
           7       0.95      0.97      0.96      1000
           8       0.97      0.98      0.98      1000
           9       0.98      0.96      0.97      1000

    accuracy                           0.90     10000
   macro avg       0.90      0.90      0.90     10000
weighted avg       0.90      0.90      0.90     10000

Accuracy (ConvAE + LogisticRegression): 0.8979
```


No mod:

```
==== LogisticRegression (ConvAE) ====
              precision    recall  f1-score   support

           0       0.84      0.87      0.85      1000
           1       0.99      0.98      0.99      1000
           2       0.85      0.85      0.85      1000
           3       0.90      0.91      0.91      1000
           4       0.84      0.86      0.85      1000
           5       0.98      0.98      0.98      1000
           6       0.77      0.72      0.74      1000
           7       0.96      0.98      0.97      1000
           8       0.98      0.97      0.98      1000
           9       0.98      0.97      0.97      1000

    accuracy                           0.91     10000
   macro avg       0.91      0.91      0.91     10000
weighted avg       0.91      0.91      0.91     10000
```

In [22]:
conv_ae = ConvAutoencoder(256)

optimizer = tf.keras.optimizers.Adam(1e-3)
conv_ae.compile(optimizer=optimizer, loss=keras.losses.BinaryCrossentropy())

X_train, X_val, X_test, y_train, y_val, y_test = utils.get_dataset_for_ae(utils.Dataset_Select.F_MNIST.value, with_val=True)

In [23]:
conv_ae.fit(X_train, X_train,
                epochs=40,
                shuffle=True,
                batch_size=64,
                validation_data=(X_val, X_val))

Epoch 1/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 38ms/step - loss: 0.3652 - val_loss: 0.2757
Epoch 2/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 39ms/step - loss: 0.2726 - val_loss: 0.2672
Epoch 3/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 38ms/step - loss: 0.2653 - val_loss: 0.2635
Epoch 4/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 37ms/step - loss: 0.2625 - val_loss: 0.2612
Epoch 5/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 37ms/step - loss: 0.2604 - val_loss: 0.2598
Epoch 6/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 37ms/step - loss: 0.2595 - val_loss: 0.2588
Epoch 7/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 37ms/step - loss: 0.2574 - val_loss: 0.2583
Epoch 8/40
[1m804/804[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 37ms/step - loss: 0.2572 - val_loss: 0.2571
Epoch 9/40
[1m804/804[

<keras.src.callbacks.history.History at 0x133d22ac0>

In [24]:
from sklearn.base import BaseEstimator, TransformerMixin

class FrozenEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, encoder):
        self.encoder = encoder

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return self.encoder.predict(X)


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report

frozen_conv_ae = FrozenEncoder(encoder=conv_ae.encoder)

X_train, X_test, y_train, y_test = utils.get_dataset_for_ae(dataset_name=utils.Dataset_Select.F_MNIST.value, with_val=False)

pipeline_log_reg = Pipeline([
    ('frozen_conv_ae', frozen_conv_ae),
    ('log_reg', LogisticRegression(max_iter=5000, solver="newton-cg"))
])

pipeline_log_reg.fit(X_train, y_train)

y_pred = pipeline_log_reg.predict(X_test)

print(classification_report(y_test, y_pred))

[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 4ms/step
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step
              precision    recall  f1-score   support

           0       0.84      0.86      0.85      1000
           1       0.99      0.97      0.98      1000
           2       0.82      0.83      0.82      1000
           3       0.88      0.91      0.90      1000
           4       0.80      0.82      0.81      1000
           5       0.97      0.97      0.97      1000
           6       0.70      0.64      0.67      1000
           7       0.94      0.96      0.95      1000
           8       0.97      0.97      0.97      1000
           9       0.97      0.95      0.96      1000

    accuracy                           0.89     10000
   macro avg       0.89      0.89      0.89     10000
weighted avg       0.89      0.89      0.89     10000



3(256)
```
              precision    recall  f1-score   support

           0       0.83      0.83      0.83      1000
           1       0.98      0.98      0.98      1000
           2       0.81      0.82      0.82      1000
           3       0.88      0.91      0.89      1000
           4       0.80      0.82      0.81      1000
           5       0.97      0.97      0.97      1000
           6       0.69      0.64      0.67      1000
           7       0.94      0.97      0.95      1000
           8       0.97      0.97      0.97      1000
           9       0.97      0.96      0.97      1000

    accuracy                           0.89     10000
   macro avg       0.89      0.89      0.89     10000
weighted avg       0.89      0.89      0.89     10000
```
1(256)
```
              precision    recall  f1-score   support

           0       0.82      0.82      0.82      1000
           1       0.98      0.96      0.97      1000
           2       0.75      0.77      0.76      1000
           3       0.85      0.89      0.87      1000
           4       0.76      0.77      0.76      1000
           5       0.96      0.96      0.96      1000
           6       0.65      0.59      0.62      1000
           7       0.93      0.95      0.94      1000
           8       0.96      0.96      0.96      1000
           9       0.96      0.96      0.96      1000

    accuracy                           0.86     10000
   macro avg       0.86      0.86      0.86     10000
weighted avg       0.86      0.86      0.86     10000
```

In [271]:
import keras

from scikeras.wrappers import KerasRegressor
from sklearn.model_selection import GridSearchCV

def evaluate_conv_autoencoder(dataset_name, params, create_plain_ae_fun, create_ae_from_params_fun):

    X_train, X_test, y_train, y_test = utils.get_dataset_for_ae(dataset_name=dataset_name, with_val=False)

    keras_reg = KerasRegressor(
        model=create_plain_ae_fun,
        loss="binary_crossentropy",
        optimizer="adam",
        optimizer__learning_rate=0.001,
        model__latent_dim=10,
        verbose=False,
    )

    grid_search = GridSearchCV(keras_reg, params, refit=False, cv=7, n_jobs=-1)
    grid_search.fit(X_train, X_train)

    print(grid_search.best_score_, grid_search.best_params_)

    X_train_s, X_val_s, _, _, _, _ = utils.get_dataset_for_ae(dataset_name=dataset_name, with_val=True)

    ae = create_ae_from_params_fun(grid_search.best_params_)
    ae.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=grid_search.best_params_["optimizer__learning_rate"]), loss=keras.losses.BinaryCrossentropy)
    ae.fit(X_train_s, X_train_s,
                epochs=grid_search.best_params_["epochs"],
                shuffle=True,
                validation_data=(X_val_s, X_val_s))
    
    return ae

In [None]:
import warnings

warnings.filterwarnings("ignore", category=UserWarning, message=".*urllib3.*LibreSSL.*")

param_grid = {
    'model__latent_dim': [10, 64, 128, 192],
    'optimizer__learning_rate': [0.0001, 0.005, 0.001],
    'epochs': [40]
}

evaluated_conv_ae = evaluate_conv_autoencoder(utils.Dataset_Select.F_MNIST.value, param_grid, create_conv_ae_as_sequence, create_conv_ae_from_params)

2025-01-03 07:21:01.541260: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: INVALID_ARGUMENT: Incompatible shapes: [0] vs. [6272,384]
	 [[{{function_node __inference_one_step_on_data_202248}}{{node adam/truediv_13}}]]


## VAE

Nieudane próby

In [None]:
class CVAE(tf.keras.Model):
  """Convolutional variational autoencoder."""

  def __init__(self, latent_dim):
    super(CVAE, self).__init__()
    self.latent_dim = latent_dim
    self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(shape=(784)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(
                filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
            tf.keras.layers.Conv2D(
                filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim + latent_dim),
        ]
    )

    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(shape=(latent_dim,)),
            tf.keras.layers.Dense(units=7*7*64, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(7, 7, 64)),
            tf.keras.layers.Conv2DTranspose(
                filters=64, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            tf.keras.layers.Conv2DTranspose(
                filters=32, kernel_size=3, strides=2, padding='same',
                activation='relu'),
            # No activation
            tf.keras.layers.Conv2DTranspose(
                filters=1, kernel_size=3, strides=1, padding='same'),
            tf.keras.layers.Reshape(target_shape=(784)),
        ]
    )

In [None]:
from tensorflow.keras import layers

class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

class VAE(tf.keras.Model):
    def __init__(self, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        self.encoder = self.prepare_encoder()
        self.decoder = self.prepare_decoder()

    def prepare_encoder(self):
        input_img = tf.keras.layers.Input(shape=(784,))
        x = tf.keras.layers.Reshape(target_shape=(28, 28, 1))(input_img)
        x = tf.keras.layers.Conv2D(filters=32, kernel_size=3, strides=(2, 2), activation='relu')(x)
        x = tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=(2, 2), activation='relu')(x)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(self.latent_dim + self.latent_dim)(x)
        z_mean = layers.Dense(self.latent_dim, name="z_mean")(x)
        z_log_var = layers.Dense(self.latent_dim, name="z_log_var")(x)
        z = Sampling()([z_mean, z_log_var])
        return  tf.keras.Model([input_img], [z_mean, z_log_var, z], name="encoder")

    def prepare_decoder(self):
        input = tf.keras.layers.Input(shape=(self.latent_dim,))
        x = tf.keras.layers.Dense(units=7*7*64, activation=tf.nn.relu)(input)
        x = tf.keras.layers.Reshape(target_shape=(7, 7, 64))(x)
        x = tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=3, strides=2, padding='same',activation='relu')(x)
        x = tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=3, strides=2, padding='same',activation='relu')(x)
        x = tf.keras.layers.Conv2DTranspose(filters=1, kernel_size=3, strides=1, padding='same', activation=tf.nn.sigmoid)(x) 
        x = tf.keras.layers.Reshape(target_shape=(784,))(x)
        return  tf.keras.Model([input], [x], name="decoder")
    
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        self.add_loss(self.kl_loss(z_mean, z_log_var))
        return reconstructed

    def kl_loss(self, z_mean, z_log_var):
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
        return tf.reduce_mean(kl_loss)

In [None]:
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

class VAE(tf.keras.Model):
    def __init__(self, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        self.sampling_layer = Sampling()
        self.encoder = tf.keras.Sequential(
            [
                tf.keras.layers.Input(shape=(784,)),
                tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
                tf.keras.layers.Conv2D(
                    filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(latent_dim + latent_dim),
            ]
        )

        self.decoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(shape=(latent_dim,)),
                tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
                tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
                tf.keras.layers.Conv2DTranspose(
                    filters=64, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=32, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                # No activation
                tf.keras.layers.Conv2DTranspose(
                    filters=1, kernel_size=3, strides=1, padding='same'),
                tf.keras.layers.Reshape(target_shape=(784,))
            ]
        )

    def call(self, inputs):
        z_mean, z_log_var = tf.split(self.encoder(inputs), num_or_size_splits=2, axis=1)
        z = self.sampling_layer([z_mean, z_log_var])
        reconstructed = self.decoder(z)
        self.add_loss(self.kl_loss(z_mean, z_log_var))
        return reconstructed


    def kl_loss(self, z_mean, z_log_var):
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
        return tf.reduce_mean(kl_loss)

In [None]:
vae = VAE(64)

optimizer = tf.keras.optimizers.Adam(1e-3)
vae.compile(optimizer=optimizer, loss=keras.losses.BinaryCrossentropy())

X_train, X_val, X_test, y_train, y_val, y_test = utils.get_dataset_for_ae(utils.Dataset_Select.MNIST.value, with_val=True)

In [None]:
# X_train = X_train.reshape(-1, 784)
# X_val = X_val.reshape(-1, 784)

vae.fit(X_train, X_train,
                epochs=3,
                shuffle=True,
                validation_data=(X_val, X_val))

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class FrozenVEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, encoder):
        self.encoder = encoder

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        z_mean, z_log_var = tf.split(self.encoder.predict(X), num_or_size_splits=2, axis=1)
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

frozen_vencoder = FrozenEncoder(encoder=vae.encoder)

X_train, X_test, y_train, y_test = utils.get_dataset_for_ae(dataset_name=utils.Dataset_Select.MNIST.value, with_val=False)

pipeline_log_reg = Pipeline([
    ('frozen_vencoder', frozen_vencoder),
    ('log_reg', LogisticRegression(max_iter=2000, solver="newton-cg"))
])

pipeline_log_reg.fit(X_train, y_train)

y_pred = pipeline_log_reg.predict(X_test)

print(classification_report(y_test, y_pred))