In [None]:
import numpy as np
import os
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from IPython.display import display, Image


In [None]:
class ReLU:
    """ReLU Activation"""
    def forward(self, x):
        self.out = np.maximum(0, x)
        return self.out

    def backward(self, grad_output):
        grad_input = grad_output * (self.out > 0)
        return grad_input

class Tanh:
    """Tanh Activation"""
    def forward(self, x):
        self.out = np.tanh(x)
        return self.out

    def backward(self, grad_output):
        grad_input = grad_output * (1 - self.out**2)
        return grad_input

class Sigmoid:
    """Sigmoid Activation"""
    def forward(self, x):
        self.out = 1 / (1 + np.exp(-x))
        return self.out

    def backward(self, grad_output):
        grad_input = grad_output * self.out * (1 - self.out)
        return grad_input

class Identity:
    def __init__(self):
        pass

    def forward(self, x):
        self.input = x
        return x

    def backward(self, grad_output):
        return grad_output

    def update(self, lr):
        pass

    def zero_grad(self):
        pass  # nothing to reset, but must exist for consistency

    def __repr__(self):
        return "Identity()"


In [None]:
class Linear:
    """Fully Connected Layer"""
    def __init__(self, in_features, out_features, activation):
        self.in_features = in_features
        self.out_features = out_features
        self.activation = activation

        # Initialize weights and biases
        self.W = np.random.randn(in_features, out_features) * np.sqrt(2.0 / in_features)
        self.b = np.zeros((1, out_features))

        # Cumulative gradients
        self.dW_cum = np.zeros_like(self.W)
        self.db_cum = np.zeros_like(self.b)

    def forward(self, x):
        self.input = x  # Save for backward
        self.linear_out = x @ self.W + self.b
        self.out = self.activation.forward(self.linear_out)
        return self.out

    def backward(self, grad_output):
        # Gradient w.r.t activation
        grad_activation = self.activation.backward(grad_output)
        # Gradients w.r.t weights and biases
        self.dW_cum += self.input.T @ grad_activation
        self.db_cum += np.sum(grad_activation, axis=0, keepdims=True)
        # Gradient w.r.t input for previous layer
        grad_input = grad_activation @ self.W.T
        return grad_input

    def zero_grad(self):
        self.dW_cum.fill(0)
        self.db_cum.fill(0)

    def update(self, lr=0.01):
        self.W -= lr * self.dW_cum
        self.b -= lr * self.db_cum
        self.zero_grad()


In [None]:
class Model:
    """Neural Network Model"""
    def __init__(self, layers, loss_type="MSE"):
        self.layers = layers
        self.loss_type = loss_type

    def forward(self, x):
        out = x
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backward(self, grad):
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def train(self, x, y):
        """Forward + backward pass, returns scalar loss"""
        y_pred = self.forward(x)
        # Compute loss
        if self.loss_type == "MSE":
            loss = np.mean((y_pred - y) ** 2)
            grad_loss = 2 * (y_pred - y) / y.shape[0]
        elif self.loss_type == "BCE":
            eps = 1e-9
            y_pred = np.clip(y_pred, eps, 1 - eps)
            loss = -np.mean(y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred))
            grad_loss = (y_pred - y) / (y_pred * (1 - y_pred)) / y.shape[0]
        else:
            raise ValueError("Unknown loss type")

        self.backward(grad_loss)
        return float(loss)

    def zero_grad(self):
        for layer in self.layers:
            layer.zero_grad()

    def update(self, lr=0.01):
        for layer in self.layers:
            layer.update(lr=lr)

    def predict(self, x):
        return self.forward(x)

    def save_to(self, path):
        data = {}
        for idx, layer in enumerate(self.layers):
            # only save layers that have weights
            if hasattr(layer, "W") and hasattr(layer, "b"):
                data[f"W_{idx}"] = layer.W
                data[f"b_{idx}"] = layer.b
        np.savez(path, **data)

    def load_from(self, path):
        loaded = np.load(path)
        for idx, layer in enumerate(self.layers):
            w_key = f"W_{idx}"
            b_key = f"b_{idx}"
            if w_key not in loaded or b_key not in loaded:
                raise ValueError("Architecture mismatch!")
            if layer.W.shape != loaded[w_key].shape or layer.b.shape != loaded[b_key].shape:
                raise ValueError("Shape mismatch!")
            layer.W = loaded[w_key]
            layer.b = loaded[b_key]


# 3.1

In [None]:
# Cell 1 - Import Libraries and Load MNIST Dataset
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import os
from datetime import datetime

# Set random seed for reproducibility
np.random.seed(42)

print("Loading MNIST dataset...")
# Load MNIST dataset
mnist = fetch_openml('mnist_784', version=1, parser='auto')
X = mnist.data.values if hasattr(mnist.data, 'values') else mnist.data
y = mnist.target.values if hasattr(mnist.target, 'values') else mnist.target

# Normalize pixel values to [0, 1]
X = X.astype(np.float32) / 255.0

# Convert labels to integers
y = y.astype(int)

print(f"Dataset loaded successfully!")
print(f"Total samples: {X.shape[0]}")
print(f"Feature dimension: {X.shape[1]}")
print(f"Image shape: 28x28")
print(f"Number of classes: {len(np.unique(y))}")

# Split into train and test sets (MNIST already has standard split)
# First 60000 samples are training, rest are test
X_train = X[:60000]
y_train = y[:60000]
X_test = X[60000:]
y_test = y[60000:]

print(f"\nTraining set size: {X_train.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")

In [None]:
# Cell 2 - Visualize Sample MNIST Images
def visualize_mnist_samples(X, y, n_samples=10, title="MNIST Sample Images"):
    """
    Visualize random samples from MNIST dataset.
    
    Parameters:
    -----------
    X : np.ndarray
        Image data (N, 784)
    y : np.ndarray
        Labels (N,)
    n_samples : int
        Number of samples to display
    title : str
        Plot title
    """
    fig, axes = plt.subplots(2, 5, figsize=(12, 6))
    axes = axes.flatten()
    
    # Randomly select samples
    indices = np.random.choice(len(X), n_samples, replace=False)
    
    for idx, ax in enumerate(axes):
        img = X[indices[idx]].reshape(28, 28)
        ax.imshow(img, cmap='gray')
        ax.set_title(f'Label: {y[indices[idx]]}', fontsize=11)
        ax.axis('off')
    
    plt.suptitle(title, fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

# Visualize training samples
visualize_mnist_samples(X_train, y_train, n_samples=10, 
                        title="Random Training Samples from MNIST")

In [None]:
# Cell 3 - MLPAutoencoder Class Implementation
class MLPAutoencoder:
    """
    Multi-Layer Perceptron Autoencoder for image reconstruction.
    
    Uses encoder to compress input to latent representation,
    and decoder to reconstruct input from latent representation.
    """
    
    def __init__(self, input_dim=784, hidden_dims=[256, 128], latent_dim=64):
        """
        Initialize MLPAutoencoder.
        
        Parameters:
        -----------
        input_dim : int
            Dimension of input (e.g., 784 for 28x28 images)
        hidden_dims : list of int
            Hidden layer dimensions for encoder
        latent_dim : int
            Dimension of latent bottleneck representation
        """
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.latent_dim = latent_dim
        
        # Build encoder layers
        self.encoder_layers = []
        prev_dim = input_dim
        
        # Hidden layers in encoder
        for hidden_dim in hidden_dims:
            self.encoder_layers.append(Linear(prev_dim, hidden_dim, ReLU()))
            prev_dim = hidden_dim
        
        # Bottleneck layer (encoder output)
        self.encoder_layers.append(Linear(prev_dim, latent_dim, ReLU()))
        
        # Build decoder layers (mirror of encoder)
        self.decoder_layers = []
        prev_dim = latent_dim
        
        # Hidden layers in decoder (reverse order)
        for hidden_dim in reversed(hidden_dims):
            self.decoder_layers.append(Linear(prev_dim, hidden_dim, ReLU()))
            prev_dim = hidden_dim
        
        # Output layer (decoder output) - use Sigmoid to constrain to [0, 1]
        self.decoder_layers.append(Linear(prev_dim, input_dim, Sigmoid()))
        
        # Combine all layers
        self.all_layers = self.encoder_layers + self.decoder_layers
        
    def encode(self, x):
        """
        Encode input to latent representation.
        
        Parameters:
        -----------
        x : np.ndarray
            Input data (N, input_dim)
            
        Returns:
        --------
        np.ndarray
            Latent representation (N, latent_dim)
        """
        out = x
        for layer in self.encoder_layers:
            out = layer.forward(out)
        return out
    
    def decode(self, z):
        """
        Decode latent representation to reconstructed input.
        
        Parameters:
        -----------
        z : np.ndarray
            Latent representation (N, latent_dim)
            
        Returns:
        --------
        np.ndarray
            Reconstructed input (N, input_dim)
        """
        out = z
        for layer in self.decoder_layers:
            out = layer.forward(out)
        return out
    
    def forward(self, x):
        """
        Full forward pass: encode then decode.
        
        Parameters:
        -----------
        x : np.ndarray
            Input data (N, input_dim)
            
        Returns:
        --------
        np.ndarray
            Reconstructed input (N, input_dim)
        """
        # Encode
        latent = self.encode(x)
        # Decode
        reconstructed = self.decode(latent)
        return reconstructed
    
    def backward(self, grad_output):
        """
        Backward pass through entire autoencoder.
        
        Parameters:
        -----------
        grad_output : np.ndarray
            Gradient of loss w.r.t. output
        """
        grad = grad_output
        for layer in reversed(self.all_layers):
            grad = layer.backward(grad)
    
    def zero_grad(self):
        """Reset all gradients to zero."""
        for layer in self.all_layers:
            layer.zero_grad()
    
    def update(self, lr=0.01):
        """
        Update all parameters using accumulated gradients.
        
        Parameters:
        -----------
        lr : float
            Learning rate
        """
        for layer in self.all_layers:
            layer.update(lr=lr)
    
    def train_step(self, x):
        """
        Single training step: forward, compute loss, backward.
        
        Parameters:
        -----------
        x : np.ndarray
            Input batch (N, input_dim)
            
        Returns:
        --------
        float
            Reconstruction loss (MSE)
        """
        # Forward pass
        x_reconstructed = self.forward(x)
        
        # Compute MSE loss
        loss = np.mean((x_reconstructed - x) ** 2)
        
        # Compute gradient of loss w.r.t. output
        grad_loss = 2 * (x_reconstructed - x) / x.shape[0]
        
        # Backward pass
        self.backward(grad_loss)
        
        return float(loss)
    
    def get_architecture_summary(self):
        """Return string summary of autoencoder architecture."""
        summary = "=" * 70 + "\n"
        summary += "MLPAutoencoder Architecture\n"
        summary += "=" * 70 + "\n"
        summary += f"Input Dimension: {self.input_dim}\n"
        summary += f"Latent Dimension: {self.latent_dim}\n"
        summary += f"Hidden Dimensions: {self.hidden_dims}\n\n"
        
        summary += "Encoder:\n"
        summary += "-" * 70 + "\n"
        prev_dim = self.input_dim
        for i, dim in enumerate(self.hidden_dims):
            summary += f"  Layer {i+1}: Linear({prev_dim} → {dim}) + ReLU\n"
            prev_dim = dim
        summary += f"  Bottleneck: Linear({prev_dim} → {self.latent_dim}) + ReLU\n\n"
        
        summary += "Decoder:\n"
        summary += "-" * 70 + "\n"
        prev_dim = self.latent_dim
        for i, dim in enumerate(reversed(self.hidden_dims)):
            summary += f"  Layer {i+1}: Linear({prev_dim} → {dim}) + ReLU\n"
            prev_dim = dim
        summary += f"  Output: Linear({prev_dim} → {self.input_dim}) + Sigmoid\n"
        
        summary += "=" * 70
        return summary

# Create autoencoder instance
autoencoder = MLPAutoencoder(
    input_dim=784,
    hidden_dims=[256, 128],
    latent_dim=64
)

# Print architecture summary
print(autoencoder.get_architecture_summary())

In [None]:
# Cell 4 - Training Function with Visualization
def train_autoencoder(autoencoder, X_train, X_test, batch_size=128, num_epochs=20, 
                      lr=0.001, patience=5, rel_loss_thresh=0.01):
    """
    Train the autoencoder on MNIST dataset.
    
    Parameters:
    -----------
    autoencoder : MLPAutoencoder
        The autoencoder model to train
    X_train : np.ndarray
        Training data (N, 784)
    X_test : np.ndarray
        Test data (M, 784)
    batch_size : int
        Batch size for training
    num_epochs : int
        Maximum number of epochs
    lr : float
        Learning rate
    patience : int
        Early stopping patience
    rel_loss_thresh : float
        Relative improvement threshold for early stopping
        
    Returns:
    --------
    dict
        Dictionary containing training history and metadata
    """
    num_samples = X_train.shape[0]
    train_loss_history = []
    test_loss_history = []
    epoch_list = []
    
    best_loss = np.inf
    epochs_no_improve = 0
    
    print("=" * 80)
    print("TRAINING AUTOENCODER")
    print("=" * 80)
    print(f"Training samples: {num_samples}")
    print(f"Batch size: {batch_size}")
    print(f"Learning rate: {lr}")
    print(f"Max epochs: {num_epochs}")
    print("=" * 80 + "\n")
    
    for epoch in range(num_epochs):
        # Shuffle training data
        indices = np.random.permutation(num_samples)
        X_shuffled = X_train[indices]
        
        epoch_loss = 0.0
        num_batches = int(np.ceil(num_samples / batch_size))
        
        # Training loop
        for i in range(0, num_samples, batch_size):
            X_batch = X_shuffled[i:i+batch_size]
            
            # Zero gradients
            autoencoder.zero_grad()
            
            # Forward pass and compute loss
            batch_loss = autoencoder.train_step(X_batch)
            
            # Update parameters
            autoencoder.update(lr=lr)
            
            epoch_loss += batch_loss
        
        # Average training loss for epoch
        avg_train_loss = epoch_loss / num_batches
        train_loss_history.append(avg_train_loss)
        
        # Compute test loss
        test_loss = 0.0
        num_test_batches = int(np.ceil(X_test.shape[0] / batch_size))
        for i in range(0, X_test.shape[0], batch_size):
            X_test_batch = X_test[i:i+batch_size]
            X_test_reconstructed = autoencoder.forward(X_test_batch)
            test_loss += np.mean((X_test_reconstructed - X_test_batch) ** 2)
        
        avg_test_loss = test_loss / num_test_batches
        test_loss_history.append(avg_test_loss)
        
        epoch_list.append(epoch + 1)
        
        # Print progress
        print(f"Epoch {epoch+1}/{num_epochs} - "
              f"Train Loss: {avg_train_loss:.6f} - "
              f"Test Loss: {avg_test_loss:.6f}")
        
        # Early stopping check
        if avg_train_loss < best_loss * (1 - rel_loss_thresh):
            best_loss = avg_train_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
        
        if epochs_no_improve >= patience:
            print(f"\nEarly stopping triggered at epoch {epoch+1}")
            break
    
    print("\n" + "=" * 80)
    print("TRAINING COMPLETED")
    print("=" * 80)
    print(f"Final Train Loss: {train_loss_history[-1]:.6f}")
    print(f"Final Test Loss: {test_loss_history[-1]:.6f}")
    print(f"Best Train Loss: {min(train_loss_history):.6f}")
    print(f"Best Test Loss: {min(test_loss_history):.6f}")
    print("=" * 80)
    
    return {
        'train_loss': train_loss_history,
        'test_loss': test_loss_history,
        'epochs': epoch_list,
        'best_train_loss': min(train_loss_history),
        'best_test_loss': min(test_loss_history)
    }

# Train the autoencoder
training_history = train_autoencoder(
    autoencoder=autoencoder,
    X_train=X_train,
    X_test=X_test,
    batch_size=128,
    num_epochs=20,
    lr=0.001,
    patience=5,
    rel_loss_thresh=0.01
)

In [None]:
# Cell 5 - Plot Training History
def plot_training_history(history, save_path=None):
    """
    Plot training and test loss over epochs.
    
    Parameters:
    -----------
    history : dict
        Dictionary containing training history
    save_path : str, optional
        Path to save the plot
    """
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    epochs = history['epochs']
    train_loss = history['train_loss']
    test_loss = history['test_loss']
    
    # Linear scale plot
    axes[0].plot(epochs, train_loss, 'o-', linewidth=2, markersize=6, 
                label='Training Loss', color='#FF6B6B')
    axes[0].plot(epochs, test_loss, 's-', linewidth=2, markersize=6, 
                label='Test Loss', color='#4ECDC4')
    axes[0].set_xlabel('Epoch', fontsize=12)
    axes[0].set_ylabel('Reconstruction Loss (MSE)', fontsize=12)
    axes[0].set_title('Training History - Linear Scale', fontsize=13, fontweight='bold')
    axes[0].legend(fontsize=11)
    axes[0].grid(True, alpha=0.3)
    
    # Log scale plot
    axes[1].plot(epochs, train_loss, 'o-', linewidth=2, markersize=6, 
                label='Training Loss', color='#FF6B6B')
    axes[1].plot(epochs, test_loss, 's-', linewidth=2, markersize=6, 
                label='Test Loss', color='#4ECDC4')
    axes[1].set_xlabel('Epoch', fontsize=12)
    axes[1].set_ylabel('Reconstruction Loss (MSE) - Log Scale', fontsize=12)
    axes[1].set_title('Training History - Log Scale', fontsize=13, fontweight='bold')
    axes[1].legend(fontsize=11)
    axes[1].grid(True, alpha=0.3, which='both')
    axes[1].set_yscale('log')
    
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Plot saved to: {save_path}")
    
    plt.show()

# Plot training history
plot_training_history(training_history)

In [None]:
# Cell 7 - Alternative Compact Visualization (One Sample Per Digit)
def visualize_single_reconstruction_per_digit(autoencoder, X_test, y_test, save_path=None):
    """
    Visualize one original and reconstructed image for each digit class (0-9).
    
    Parameters:
    -----------
    autoencoder : MLPAutoencoder
        Trained autoencoder model
    X_test : np.ndarray
        Test images (N, 784)
    y_test : np.ndarray
        Test labels (N,)
    save_path : str, optional
        Path to save the visualization
    """
    fig, axes = plt.subplots(2, 10, figsize=(20, 5))
    
    for digit in range(10):
        # Get first occurrence of this digit
        digit_idx = np.where(y_test == digit)[0][0]
        
        # Get original image
        original = X_test[digit_idx:digit_idx+1]
        
        # Get reconstruction
        reconstructed = autoencoder.forward(original)
        
        # Reshape for display
        original_img = original.reshape(28, 28)
        reconstructed_img = reconstructed.reshape(28, 28)
        
        # Plot original (top row)
        axes[0, digit].imshow(original_img, cmap='gray')
        axes[0, digit].set_title(f'Digit {digit}', fontsize=11, fontweight='bold')
        axes[0, digit].axis('off')
        
        # Plot reconstruction (bottom row)
        axes[1, digit].imshow(reconstructed_img, cmap='gray')
        axes[1, digit].axis('off')
    
    # Add row labels
    axes[0, 0].text(-0.5, 0.5, 'Original', fontsize=13, fontweight='bold',
                   rotation=90, transform=axes[0, 0].transAxes,
                   verticalalignment='center')
    axes[1, 0].text(-0.5, 0.5, 'Reconstructed', fontsize=13, fontweight='bold',
                   rotation=90, transform=axes[1, 0].transAxes,
                   verticalalignment='center')
    
    plt.suptitle('Autoencoder Performance: One Sample Per Digit Class', 
                fontsize=14, fontweight='bold')
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Visualization saved to: {save_path}")
    
    plt.show()

# Visualize one sample per digit
visualize_single_reconstruction_per_digit(
    autoencoder=autoencoder,
    X_test=X_test,
    y_test=y_test
)

In [None]:
# Cell 8 - Compute and Visualize Reconstruction Error by Digit
def compute_reconstruction_error_by_digit(autoencoder, X_test, y_test):
    """
    Compute average reconstruction error for each digit class.
    
    Parameters:
    -----------
    autoencoder : MLPAutoencoder
        Trained autoencoder model
    X_test : np.ndarray
        Test images (N, 784)
    y_test : np.ndarray
        Test labels (N,)
        
    Returns:
    --------
    dict
        Dictionary mapping digit to average reconstruction error
    """
    reconstruction_errors = {}
    
    for digit in range(10):
        # Get all samples for this digit
        digit_indices = np.where(y_test == digit)[0]
        X_digit = X_test[digit_indices]
        
        # Get reconstructions
        X_reconstructed = autoencoder.forward(X_digit)
        
        # Compute MSE for each sample
        mse_per_sample = np.mean((X_reconstructed - X_digit) ** 2, axis=1)
        
        # Average over all samples of this digit
        avg_error = np.mean(mse_per_sample)
        reconstruction_errors[digit] = avg_error
    
    return reconstruction_errors

def plot_reconstruction_error_by_digit(reconstruction_errors, save_path=None):
    """
    Plot average reconstruction error for each digit class.
    
    Parameters:
    -----------
    reconstruction_errors : dict
        Dictionary mapping digit to reconstruction error
    save_path : str, optional
        Path to save the plot
    """
    digits = list(reconstruction_errors.keys())
    errors = list(reconstruction_errors.values())
    
    plt.figure(figsize=(10, 6))
    
    bars = plt.bar(digits, errors, color='#4ECDC4', edgecolor='black', alpha=0.7)
    
    # Color the bar with highest error differently
    max_error_digit = max(reconstruction_errors, key=reconstruction_errors.get)
    bars[max_error_digit].set_color('#FF6B6B')
    
    plt.xlabel('Digit Class', fontsize=12)
    plt.ylabel('Average Reconstruction Error (MSE)', fontsize=12)
    plt.title('Reconstruction Error by Digit Class', fontsize=14, fontweight='bold')
    plt.xticks(digits)
    plt.grid(axis='y', alpha=0.3)
    
    # Add value labels on bars
    for i, (digit, error) in enumerate(reconstruction_errors.items()):
        plt.text(digit, error + 0.0001, f'{error:.5f}', 
                ha='center', va='bottom', fontsize=9)
    
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Plot saved to: {save_path}")
    
    plt.show()

# Compute reconstruction errors
reconstruction_errors = compute_reconstruction_error_by_digit(autoencoder, X_test, y_test)

# Print errors
print("\n" + "="*60)
print("RECONSTRUCTION ERROR BY DIGIT CLASS")
print("="*60)
for digit in range(10):
    print(f"Digit {digit}: {reconstruction_errors[digit]:.6f}")
print("="*60)

# Plot errors
plot_reconstruction_error_by_digit(reconstruction_errors)

In [None]:
# # ==========================================================
# # ✅ Cell 13 - Save Model and Results (Safe version)
# # ==========================================================

# import os
# import numpy as np

# def save_autoencoder_results(autoencoder, training_history=None, reconstruction_errors=None, 
#                              latent_stats=None, save_dir="autoencoder_results"):
#     """
#     Save all autoencoder results to disk.
    
#     Parameters
#     ----------
#     autoencoder : MLPAutoencoder
#         Trained autoencoder model
#     training_history : dict or None
#         Training history (optional)
#     reconstruction_errors : dict or None
#         Reconstruction errors by class (optional)
#     latent_stats : dict or None
#         Latent space statistics (optional)
#     save_dir : str
#         Directory to save results
#     """
#     os.makedirs(save_dir, exist_ok=True)
    
#     # ------------------------------------------------------
#     # ✅ 1. Save model weights
#     # ------------------------------------------------------
#     model_data = {}
#     for idx, layer in enumerate(autoencoder.all_layers):
#         if hasattr(layer, "W") and hasattr(layer, "b"):
#             model_data[f"W_{idx}"] = layer.W
#             model_data[f"b_{idx}"] = layer.b

#     model_path = os.path.join(save_dir, "autoencoder_weights.npz")
#     np.savez(model_path, **model_data)
#     print(f"✅ Model weights saved to: {model_path}")
    
#     # ------------------------------------------------------
#     # ✅ 2. Save training history (if available)
#     # ------------------------------------------------------
#     if training_history is not None:
#         history_path = os.path.join(save_dir, "training_history.npz")
#         np.savez(history_path, 
#                  epochs=training_history.get('epochs', np.arange(len(training_history.get('train_loss', [])))),
#                  train_loss=training_history.get('train_loss', []),
#                  test_loss=training_history.get('test_loss', []))
#         print(f"✅ Training history saved to: {history_path}")
#     else:
#         print("⚠️ Skipping: training_history not provided.")
    
#     # ------------------------------------------------------
#     # ✅ 3. Save reconstruction errors (if available)
#     # ------------------------------------------------------
#     if reconstruction_errors is not None:
#         errors_path = os.path.join(save_dir, "reconstruction_errors.npz")
#         np.savez(errors_path, **{f"class_{k}": v for k, v in reconstruction_errors.items()})
#         print(f"✅ Reconstruction errors saved to: {errors_path}")
#     else:
#         print("⚠️ Skipping: reconstruction_errors not provided.")
    
#     # ------------------------------------------------------
#     # ✅ 4. Save latent statistics (optional)
#     # ------------------------------------------------------
#     if latent_stats is not None:
#         latent_path = os.path.join(save_dir, "latent_stats.npz")
#         np.savez(latent_path, **latent_stats)
#         print(f"✅ Latent stats saved to: {latent_path}")
#     else:
#         print("⚠️ Skipping: latent_stats not provided.")
    
#     # ------------------------------------------------------
#     # ✅ 5. Save architecture info
#     # ------------------------------------------------------
#     arch_path = os.path.join(save_dir, "architecture.txt")
#     if hasattr(autoencoder, "get_architecture_summary"):
#         with open(arch_path, 'w') as f:
#             f.write(autoencoder.get_architecture_summary())
#         print(f"✅ Architecture summary saved to: {arch_path}")
#     else:
#         print("⚠️ Autoencoder missing get_architecture_summary() method.")
    
#     print(f"\n📁 All results saved to directory: {save_dir}")


# # ==========================================================
# # ✅ Dummy defaults if not defined earlier
# # ==========================================================
# if 'training_history' not in locals():
#     training_history = {"epochs": np.arange(10), "train_loss": np.random.rand(10), "test_loss": np.random.rand(10)}

# if 'reconstruction_errors' not in locals():
#     reconstruction_errors = {"0": np.random.rand(10), "1": np.random.rand(10)}

# if 'latent_stats' not in locals():
#     latent_stats = {"mean": np.random.rand(5), "std": np.random.rand(5)}

# # ==========================================================
# # ✅ Call the function safely
# # ==========================================================
# save_autoencoder_results(
#     autoencoder=autoencoder,
#     training_history=training_history,
#     reconstruction_errors=reconstruction_errors,
#     latent_stats=latent_stats,
#     save_dir="autoencoder_results"
# )


# 3.2

# 3.2.1

In [None]:
# Cell 1 - Load LFW Dataset and Prepare Data
import os
from PIL import Image
import numpy as np
from collections import defaultdict

# Set random seed
np.random.seed(42)

# Path to LFW dataset
lfw_path = "/home/rohitha/ass3/LFW_Dataset"

print("=" * 80)
print("LOADING LFW DATASET")
print("=" * 80)

# Dictionary to store images by person
person_images = defaultdict(list)

# Load all images
for person_name in os.listdir(lfw_path):
    person_dir = os.path.join(lfw_path, person_name)
    
    if not os.path.isdir(person_dir):
        continue
    
    for img_file in os.listdir(person_dir):
        if img_file.endswith(('.jpg', '.png', '.jpeg')):
            img_path = os.path.join(person_dir, img_file)
            try:
                # Load image and convert to grayscale
                img = Image.open(img_path).convert('L')
                img_array = np.array(img, dtype=np.float32)
                person_images[person_name].append(img_array)
            except Exception as e:
                print(f"Error loading {img_path}: {e}")

# Print dataset statistics
print(f"\nTotal people in dataset: {len(person_images)}")
for person, images in sorted(person_images.items(), key=lambda x: len(x[1]), reverse=True)[:10]:
    print(f"  {person}: {len(images)} images")

print("\n" + "=" * 80)


In [None]:
# Cell 2 - Prepare Train/Test Splits for Anomaly Detection

# Extract George W Bush images (normal class)
normal_class = 'George_W_Bush'
normal_images = person_images[normal_class]

print("=" * 80)
print("PREPARING TRAIN/TEST SPLITS")
print("=" * 80)
print(f"\nNormal class: {normal_class}")
print(f"Total normal images: {len(normal_images)}")

# Use 400 images for training, 100 for testing (from George W Bush)
np.random.shuffle(normal_images)
normal_train = normal_images[:400]
normal_test = normal_images[400:500]

print(f"Normal training images: {len(normal_train)}")
print(f"Normal test images: {len(normal_test)}")

# Collect anomaly images from other classes
anomaly_images = []
anomaly_count_per_class = {}

for person, images in person_images.items():
    if person == normal_class:
        continue
    
    # Take up to 10 images from each other person as anomalies
    n_images = min(10, len(images))
    anomaly_images.extend(images[:n_images])
    anomaly_count_per_class[person] = n_images

print(f"\nTotal anomaly images collected: {len(anomaly_images)}")
print(f"Number of anomaly classes: {len(anomaly_count_per_class)}")

# Create test set: mix of normal and anomaly
# Use 100 normal + anomalies
test_images = normal_test + anomaly_images
test_labels = [0] * len(normal_test) + [1] * len(anomaly_images)  # 0=normal, 1=anomaly

# Shuffle test set
test_indices = np.random.permutation(len(test_images))
test_images = [test_images[i] for i in test_indices]
test_labels = [test_labels[i] for i in test_indices]

print(f"\nTest set composition:")
print(f"  Normal images: {sum(1 for l in test_labels if l == 0)}")
print(f"  Anomaly images: {sum(1 for l in test_labels if l == 1)}")
print(f"  Total test images: {len(test_images)}")

# Get image dimensions
img_height, img_width = normal_images[0].shape
input_dim = img_height * img_width

print(f"\nImage dimensions: {img_height}x{img_width}")
print(f"Input dimension: {input_dim}")
print("=" * 80)


In [None]:
# Cell 3 - Preprocess and Normalize Data

print("=" * 80)
print("PREPROCESSING DATA")
print("=" * 80)

# Convert training images to flattened arrays and normalize
X_train_lfw = np.array([img.flatten() / 255.0 for img in normal_train], dtype=np.float32)

# Convert test images to flattened arrays and normalize
X_test_lfw = np.array([img.flatten() / 255.0 for img in test_images], dtype=np.float32)
y_test_lfw = np.array(test_labels, dtype=np.int32)

print(f"Training data shape: {X_train_lfw.shape}")
print(f"Test data shape: {X_test_lfw.shape}")
print(f"Test labels shape: {y_test_lfw.shape}")
print(f"\nData type: {X_train_lfw.dtype}")
print(f"Value range: [{X_train_lfw.min():.3f}, {X_train_lfw.max():.3f}]")

# Memory usage estimation
train_memory = X_train_lfw.nbytes / (1024**2)  # MB
test_memory = X_test_lfw.nbytes / (1024**2)  # MB
print(f"\nMemory usage:")
print(f"  Training data: {train_memory:.2f} MB")
print(f"  Test data: {test_memory:.2f} MB")
print("=" * 80)


In [None]:
# Cell 4 - Create Autoencoder for LFW Data

# Define autoencoder architecture optimized for 250x250 grayscale images
lfw_autoencoder = MLPAutoencoder(
    input_dim=input_dim,  # 62500 for 250x250 images
    hidden_dims=[2048, 512],  # Reduced complexity for faster training
    latent_dim=128
)

print(lfw_autoencoder.get_architecture_summary())

# Count total parameters
total_params = 0
for layer in lfw_autoencoder.all_layers:
    if hasattr(layer, 'W'):
        total_params += layer.W.size + layer.b.size

print(f"\nTotal parameters: {total_params:,}")


In [None]:
# Cell 5 - Train Autoencoder on Normal Data (George W Bush)

# Optimized training parameters for faster convergence
lfw_training_history = train_autoencoder(
    autoencoder=lfw_autoencoder,
    X_train=X_train_lfw,
    X_test=X_train_lfw[:50],  # Use small subset for validation during training
    batch_size=8,  # Smaller batch size for memory efficiency
    num_epochs=100,  # Reduced epochs
    lr=0.01,  # Lower learning rate for stability
    patience=15,
    rel_loss_thresh=0.0001
)


Epoch 3/100 - Train Loss: 0.333222 - Test Loss: 0.324495


In [None]:
# Cell 6 - Visualize Training Progress

plt.figure(figsize=(10, 6))
plt.plot(lfw_training_history['epochs'], lfw_training_history['train_loss'], 
         label='Train Loss', marker='o', linewidth=2)
plt.plot(lfw_training_history['epochs'], lfw_training_history['test_loss'], 
         label='Validation Loss', marker='s', linewidth=2)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss (MSE)', fontsize=12)
plt.title('LFW Autoencoder Training Progress', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\nFinal Training Loss: {lfw_training_history['train_loss'][-1]:.6f}")
print(f"Best Training Loss: {lfw_training_history['best_train_loss']:.6f}")


In [None]:
# Cell 7 - Compute Reconstruction Errors on Test Set

print("=" * 80)
print("COMPUTING RECONSTRUCTION ERRORS")
print("=" * 80)

# Compute reconstruction errors for all test images
reconstruction_errors = []
batch_size = 32

for i in range(0, len(X_test_lfw), batch_size):
    X_batch = X_test_lfw[i:i+batch_size]
    X_reconstructed = lfw_autoencoder.forward(X_batch)
    
    # Compute MSE for each image in batch
    batch_errors = np.mean((X_reconstructed - X_batch) ** 2, axis=1)
    reconstruction_errors.extend(batch_errors)

reconstruction_errors = np.array(reconstruction_errors)

print(f"Computed reconstruction errors for {len(reconstruction_errors)} test images")
print(f"\nReconstruction Error Statistics:")
print(f"  Mean: {reconstruction_errors.mean():.6f}")
print(f"  Std: {reconstruction_errors.std():.6f}")
print(f"  Min: {reconstruction_errors.min():.6f}")
print(f"  Max: {reconstruction_errors.max():.6f}")
print(f"  Median: {np.median(reconstruction_errors):.6f}")

# Separate errors by class
normal_errors = reconstruction_errors[y_test_lfw == 0]
anomaly_errors = reconstruction_errors[y_test_lfw == 1]

print(f"\nNormal Images (n={len(normal_errors)}):")
print(f"  Mean Error: {normal_errors.mean():.6f}")
print(f"  Std Error: {normal_errors.std():.6f}")

print(f"\nAnomaly Images (n={len(anomaly_errors)}):")
print(f"  Mean Error: {anomaly_errors.mean():.6f}")
print(f"  Std Error: {anomaly_errors.std():.6f}")

print("=" * 80)


In [None]:
# Cell 9 - Determine Optimal Threshold

print("=" * 80)
print("DETERMINING OPTIMAL THRESHOLD")
print("=" * 80)

# Strategy: Use percentile of normal reconstruction errors
# Common approaches: mean + k*std, or percentile-based

# Method 1: Mean + k*std of normal errors
threshold_1 = normal_errors.mean() + 2 * normal_errors.std()

# Method 2: 95th percentile of normal errors
threshold_2 = np.percentile(normal_errors, 95)

# Method 3: Mean of normal errors
threshold_3 = normal_errors.mean()

print(f"\nCandidate Thresholds:")
print(f"  Method 1 (Mean + 2*Std): {threshold_1:.6f}")
print(f"  Method 2 (95th percentile): {threshold_2:.6f}")
print(f"  Method 3 (Mean): {threshold_3:.6f}")

# Choose threshold that best separates normal and anomaly
# We'll use Method 2 (95th percentile) as it's commonly used
optimal_threshold = threshold_2

print(f"\nSelected Threshold: {optimal_threshold:.6f}")

# Classification: error > threshold => anomaly
y_pred = (reconstruction_errors > optimal_threshold).astype(int)

print(f"\nPredictions:")
print(f"  Classified as Normal: {np.sum(y_pred == 0)}")
print(f"  Classified as Anomaly: {np.sum(y_pred == 1)}")
print("=" * 80)


In [None]:
# Cell 10 - Calculate Evaluation Metrics

from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, roc_curve

print("=" * 80)
print("EVALUATION METRICS")
print("=" * 80)

# Calculate metrics
auc_score = roc_auc_score(y_test_lfw, reconstruction_errors)
precision = precision_score(y_test_lfw, y_pred)
recall = recall_score(y_test_lfw, y_pred)
f1 = f1_score(y_test_lfw, y_pred)

print(f"\nPerformance Metrics:")
print(f"  AUC Score:  {auc_score:.4f}")
print(f"  Precision:  {precision:.4f}")
print(f"  Recall:     {recall:.4f}")
print(f"  F1-Score:   {f1:.4f}")

# Confusion Matrix
cm = confusion_matrix(y_test_lfw, y_pred)
tn, fp, fn, tp = cm.ravel()

print(f"\nConfusion Matrix:")
print(f"  True Negatives:  {tn}")
print(f"  False Positives: {fp}")
print(f"  False Negatives: {fn}")
print(f"  True Positives:  {tp}")

print(f"\nClassification Breakdown:")
print(f"  Accuracy: {(tn + tp) / len(y_test_lfw):.4f}")
print(f"  Specificity: {tn / (tn + fp):.4f}" if (tn + fp) > 0 else "  Specificity: N/A")
print(f"  Sensitivity (Recall): {recall:.4f}")

print("=" * 80)


In [None]:
# Cell 12 - Visualize Sample Reconstructions

# Select samples to visualize
# 2 correctly classified normals, 2 correctly classified anomalies
# 1 misclassified normal, 1 misclassified anomaly (if any)

normal_indices = np.where(y_test_lfw == 0)[0]
anomaly_indices = np.where(y_test_lfw == 1)[0]

# Correctly classified
correct_normal_idx = normal_indices[(y_pred[normal_indices] == 0)][:2]
correct_anomaly_idx = anomaly_indices[(y_pred[anomaly_indices] == 1)][:2]

# Misclassified (if any exist)
misclass_normal_idx = normal_indices[(y_pred[normal_indices] == 1)][:1]
misclass_anomaly_idx = anomaly_indices[(y_pred[anomaly_indices] == 0)][:1]

# Combine all indices
sample_indices = np.concatenate([
    correct_normal_idx,
    correct_anomaly_idx,
    misclass_normal_idx,
    misclass_anomaly_idx
])

# Generate reconstructions
sample_original = X_test_lfw[sample_indices]
sample_reconstructed = lfw_autoencoder.forward(sample_original)

# Visualize
n_samples = len(sample_indices)
fig, axes = plt.subplots(2, n_samples, figsize=(3*n_samples, 6))

titles = (
    ['Correct Normal']*len(correct_normal_idx) +
    ['Correct Anomaly']*len(correct_anomaly_idx) +
    ['Misclass. Normal']*len(misclass_normal_idx) +
    ['Misclass. Anomaly']*len(misclass_anomaly_idx)
)

for i, idx in enumerate(sample_indices):
    # Original
    axes[0, i].imshow(sample_original[i].reshape(img_height, img_width), cmap='gray')
    axes[0, i].set_title(f'{titles[i]}\nError: {reconstruction_errors[idx]:.4f}', fontsize=9)
    axes[0, i].axis('off')
    
    # Reconstructed
    axes[1, i].imshow(sample_reconstructed[i].reshape(img_height, img_width), cmap='gray')
    axes[1, i].set_title('Reconstructed', fontsize=9)
    axes[1, i].axis('off')

axes[0, 0].set_ylabel('Original', fontsize=12, fontweight='bold')
axes[1, 0].set_ylabel('Reconstructed', fontsize=12, fontweight='bold')

plt.suptitle('Sample Reconstructions: Normal vs Anomaly', fontsize=14, fontweight='bold', y=1.02)
plt.tight_layout()
plt.show()
