In [None]:
"""
task3_autoencoders.py

Task 3 only: design & train autoencoders on CIFAR-10 (grayscale).
- Computes K using PCA (95% explained variance) on the 70% training subset.
- Trains:
    1) Single hidden-layer dense AE (sigmoid encoder, linear decoder) latent_dim=K
    2) Deep convolutional AE with bottleneck size = K
    3) 3-hidden-layer dense AE where total encoder hidden nodes sum approx K (distributed equally)
- Reports reconstruction MSE on test set and saves models.

Requirements:
    - Python 3.8+
    - numpy, scikit-learn, matplotlib
    - tensorflow (>=2.4)  (Keras included)
Run:
    python task3_autoencoders.py
"""

import os
import math
import numpy as np
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# --- try imports for TF/Keras ---
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
except Exception as e:
    raise RuntimeError("TensorFlow (tensorflow) is required to run this script. "
                       "Please run on the course VM or install tensorflow. Error: " + str(e))

# ----------------------------
# Config / hyperparameters
# ----------------------------
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

BATCH_SIZE = 256
EPOCHS_SINGLE = 25       # single-layer dense AE
EPOCHS_CONV = 20         # conv AE
EPOCHS_3LAYER = 30       # 3-layer dense AE

MODEL_SAVE_DIR = "./models"
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

# ----------------------------
# Helper functions
# ----------------------------
def rgb2gray(images):
    # images in [0,1], shape (N,H,W,3)
    r, g, b = images[..., 0], images[..., 1], images[..., 2]
    gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
    return gray[..., np.newaxis]

def recon_mse(orig, recon):
    # mean MSE per image averaged
    orig_flat = orig.reshape(orig.shape[0], -1)
    recon_flat = recon.reshape(recon.shape[0], -1)
    return np.mean(np.mean((orig_flat - recon_flat) ** 2, axis=1))

# ----------------------------
# Load CIFAR-10, convert to grayscale, split 70/30
# ----------------------------
(x_train0, y_train0), (x_test0, y_test0) = keras.datasets.cifar10.load_data()
x_all = np.vstack([x_train0, x_test0]).astype("float32") / 255.0
y_all = np.vstack([y_train0, y_test0]).squeeze()

x_gray = rgb2gray(x_all)  # shape (60000, 32, 32, 1)

X_train, X_test, y_train, y_test = train_test_split(
    x_gray, y_all, train_size=0.7, stratify=y_all, random_state=RANDOM_SEED
)
print("Train shape:", X_train.shape, "Test shape:", X_test.shape)

H, W = X_train.shape[1], X_train.shape[2]
input_dim = H * W

# Flattened standardized arrays for PCA & dense AEs
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)

# Standardize (zero mean, unit var) using training statistics
mean_train = X_train_flat.mean(axis=0)
std_train = X_train_flat.std(axis=0) + 1e-9
X_train_std = (X_train_flat - mean_train) / std_train
X_test_std = (X_test_flat - mean_train) / std_train

# ----------------------------
# PCA to find K for 95% explained variance
# ----------------------------
pca = PCA(n_components=0.95, svd_solver='full', random_state=RANDOM_SEED)
pca.fit(X_train_std)
K = pca.n_components_
print(f"PCA-picked latent dimension to retain 95% energy: K = {K}")

latent_dim = int(K)

# ----------------------------
# Model 1: Single hidden-layer dense AE (sigmoid encoder, linear decoder)
# ----------------------------
tf.keras.backend.clear_session()
inp = keras.Input(shape=(input_dim,), name="flat_input")
encoded = layers.Dense(latent_dim, activation='sigmoid', name="encoder_dense")(inp)
decoded = layers.Dense(input_dim, activation='linear', name="decoder_dense")(encoded)
ae_single = keras.Model(inp, decoded, name="ae_single")
ae_single.compile(optimizer='adam', loss='mse')
ae_single.summary()

# Train on standardized flattened data
history_single = ae_single.fit(
    X_train_std, X_train_std,
    epochs=EPOCHS_SINGLE, batch_size=BATCH_SIZE,
    validation_data=(X_test_std, X_test_std),
    verbose=2
)

# Reconstruct and invert standardization
recon_test_std = ae_single.predict(X_test_std)
recon_test_flat = recon_test_std * std_train + mean_train
recon_single = recon_test_flat.reshape(-1, H, W, 1)
mse_single = recon_mse(X_test, recon_single)
print("Single-layer AE test MSE:", mse_single)

# ----------------------------
# Model 2: Deep convolutional autoencoder with bottleneck of size K
# ----------------------------
tf.keras.backend.clear_session()
input_img = keras.Input(shape=(H, W, 1), name="img_input")
x_enc = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(input_img)  # 16x16x32
x_enc = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x_enc)      # 8x8x64
x_enc = layers.Conv2D(128, 3, strides=2, padding='same', activation='relu')(x_enc)     # 4x4x128
shape_before_flat = tf.keras.backend.int_shape(x_enc)[1:]  # tuple (4,4,128)
x_flat = layers.Flatten()(x_enc)
bottleneck = layers.Dense(latent_dim, name="bottleneck")(x_flat)  # linear bottleneck
# decoder
x_dec = layers.Dense(np.prod(shape_before_flat), activation='relu')(bottleneck)
x_dec = layers.Reshape(shape_before_flat)(x_dec)
x_dec = layers.Conv2DTranspose(128, 3, strides=2, padding='same', activation='relu')(x_dec)  # 8x8
x_dec = layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu')(x_dec)   # 16x16
x_dec = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu')(x_dec)   # 32x32
decoded_img = layers.Conv2D(1, 3, padding='same', activation='linear')(x_dec)

ae_conv = keras.Model(input_img, decoded_img, name="ae_conv")
ae_conv.compile(optimizer=keras.optimizers.Adam(1e-3), loss='mse')
ae_conv.summary()

history_conv = ae_conv.fit(
    X_train, X_train,
    epochs=EPOCHS_CONV, batch_size=BATCH_SIZE,
    validation_data=(X_test, X_test),
    verbose=2
)

recon_conv = ae_conv.predict(X_test)
mse_conv = recon_mse(X_test, recon_conv)
print("Convolutional AE test MSE:", mse_conv)

# ----------------------------
# Model 3: 3-hidden-layer dense AE where total hidden nodes ≈ K and are split equally
# Interpretation: sum of encoder hidden nodes ≈ K and the last encoder layer acts as bottleneck.
# ----------------------------
tf.keras.backend.clear_session()
nodes_each = int(math.ceil(latent_dim / 3.0))
sizes = [nodes_each, nodes_each, max(1, latent_dim - 2 * nodes_each)]
print("3-layer encoder sizes (sum approx K):", sizes, "-> bottleneck size:", sizes[-1])

inp3 = keras.Input(shape=(input_dim,), name="flat_input_3")
h1 = layers.Dense(sizes[0], activation='sigmoid')(inp3)
h2 = layers.Dense(sizes[1], activation='sigmoid')(h1)
bott3 = layers.Dense(sizes[2], activation='sigmoid', name='bottleneck3')(h2)
# decoder (mirror)
d1 = layers.Dense(sizes[1], activation='sigmoid')(bott3)
d2 = layers.Dense(sizes[0], activation='sigmoid')(d1)
out3 = layers.Dense(input_dim, activation='linear')(d2)

ae_3layer = keras.Model(inp3, out3, name="ae_3layer")
ae_3layer.compile(optimizer='adam', loss='mse')
ae_3layer.summary()

history_3 = ae_3layer.fit(
    X_train_std, X_train_std,
    epochs=EPOCHS_3LAYER, batch_size=BATCH_SIZE,
    validation_data=(X_test_std, X_test_std),
    verbose=2
)

recon_3_std = ae_3layer.predict(X_test_std)
recon_3_flat = recon_3_std * std_train + mean_train
recon_3 = recon_3_flat.reshape(-1, H, W, 1)
mse_3layer = recon_mse(X_test, recon_3)
print("3-hidden-layer AE test MSE:", mse_3layer)

# ----------------------------
# Visualize a few reconstructions
# ----------------------------
n_show = 6
import matplotlib.pyplot as plt
fig, axs = plt.subplots(4, n_show, figsize=(n_show * 2, 8))
for i in range(n_show):
    axs[0, i].imshow(X_test[i].squeeze(), cmap='gray')
    axs[0, i].axis('off'); axs[0, i].set_title("orig")
    axs[1, i].imshow(recon_single[i].squeeze(), cmap='gray')
    axs[1, i].axis('off'); axs[1, i].set_title("single AE")
    axs[2, i].imshow(recon_conv[i].squeeze(), cmap='gray')
    axs[2, i].axis('off'); axs[2, i].set_title("conv AE")
    axs[3, i].imshow(recon_3[i].squeeze(), cmap='gray')
    axs[3, i].axis('off'); axs[3, i].set_title("3-layer AE")
plt.suptitle("Reconstructions (rows: orig, single dense AE, conv AE, 3-layer AE)")
plt.tight_layout()
plt.show()

# ----------------------------
# Results summary and save models
# ----------------------------
results = {
    "PCA_95_K": latent_dim,
    "MSE_single_dense_AE": float(mse_single),
    "MSE_conv_AE": float(mse_conv),
    "MSE_3layer_AE": float(mse_3layer)
}
print("Results summary:", results)

# Save models to MODEL_SAVE_DIR
ae_single.save(os.path.join(MODEL_SAVE_DIR, "ae_single.h5"))
ae_conv.save(os.path.join(MODEL_SAVE_DIR, "ae_conv.h5"))
ae_3layer.save(os.path.join(MODEL_SAVE_DIR, "ae_3layer.h5"))

# Also save training histories (optional)
import json
def history_to_dict(h):
    return {k: [float(v) for v in vals] for k, vals in h.history.items()}

with open(os.path.join(MODEL_SAVE_DIR, "hist_single.json"), "w") as f:
    json.dump(history_to_dict(history_single), f)
with open(os.path.join(MODEL_SAVE_DIR, "hist_conv.json"), "w") as f:
    json.dump(history_to_dict(history_conv), f)
with open(os.path.join(MODEL_SAVE_DIR, "hist_3layer.json"), "w") as f:
    json.dump(history_to_dict(history_3), f)

print(f"Models & histories saved to {MODEL_SAVE_DIR}")


Train shape: (42000, 32, 32, 1) Test shape: (18000, 32, 32, 1)
PCA-picked latent dimension to retain 95% energy: K = 163



Epoch 1/25
165/165 - 2s - 12ms/step - loss: 0.4357 - val_loss: 0.2774
Epoch 2/25
165/165 - 1s - 8ms/step - loss: 0.2314 - val_loss: 0.2057
Epoch 3/25
165/165 - 1s - 7ms/step - loss: 0.1795 - val_loss: 0.1672
Epoch 4/25
165/165 - 1s - 6ms/step - loss: 0.1516 - val_loss: 0.1451
Epoch 5/25
165/165 - 1s - 7ms/step - loss: 0.1332 - val_loss: 0.1300
Epoch 6/25
165/165 - 1s - 7ms/step - loss: 0.1201 - val_loss: 0.1193
Epoch 7/25
165/165 - 1s - 7ms/step - loss: 0.1101 - val_loss: 0.1097
Epoch 8/25
165/165 - 1s - 8ms/step - loss: 0.1023 - val_loss: 0.1026
Epoch 9/25
165/165 - 1s - 9ms/step - loss: 0.0962 - val_loss: 0.0970
Epoch 10/25
165/165 - 1s - 8ms/step - loss: 0.0911 - val_loss: 0.0923
Epoch 11/25
165/165 - 1s - 7ms/step - loss: 0.0870 - val_loss: 0.0884
Epoch 12/25
165/165 - 1s - 9ms/step - loss: 0.0835 - val_loss: 0.0852
Epoch 13/25
165/165 - 1s - 7ms/step - loss: 0.0806 - val_loss: 0.0824
Epoch 14/25
165/165 - 1s - 7ms/step - loss: 0.0782 - val_loss: 0.0801
Epoch 15/25
165/165 - 1s - 8

Epoch 1/20
165/165 - 33s - 201ms/step - loss: 0.0457 - val_loss: 0.0169
Epoch 2/20
165/165 - 31s - 186ms/step - loss: 0.0137 - val_loss: 0.0110
Epoch 3/20
165/165 - 31s - 187ms/step - loss: 0.0100 - val_loss: 0.0089
Epoch 4/20
165/165 - 30s - 182ms/step - loss: 0.0083 - val_loss: 0.0078
Epoch 5/20
165/165 - 30s - 181ms/step - loss: 0.0073 - val_loss: 0.0076
Epoch 6/20
165/165 - 41s - 246ms/step - loss: 0.0067 - val_loss: 0.0063
Epoch 7/20
165/165 - 30s - 181ms/step - loss: 0.0059 - val_loss: 0.0058
Epoch 8/20
165/165 - 43s - 260ms/step - loss: 0.0056 - val_loss: 0.0054
Epoch 9/20
165/165 - 32s - 192ms/step - loss: 0.0051 - val_loss: 0.0051
Epoch 10/20
