In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/fashionmnist/t10k-labels-idx1-ubyte
/kaggle/input/fashionmnist/t10k-images-idx3-ubyte
/kaggle/input/fashionmnist/fashion-mnist_test.csv
/kaggle/input/fashionmnist/fashion-mnist_train.csv
/kaggle/input/fashionmnist/train-labels-idx1-ubyte
/kaggle/input/fashionmnist/train-images-idx3-ubyte


In [4]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical


import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
# Copied directly from Question 1

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return (x > 0).astype(float)

def leaky_relu(x, alpha=0.01):
    return np.where(x > 0, x, alpha * x)

def leaky_relu_derivative(x, alpha=0.01):
    return np.where(x > 0, 1, alpha)

def tanh(x):
    return np.tanh(x)

def tanh_derivative(x):
    return 1 - np.tanh(x)**2

def gelu(x):
    return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))

def gelu_derivative(x):
    s = np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)
    tanh_s = np.tanh(s)
    sech2_s = 1 - tanh_s**2

    return 0.5 * (1 + tanh_s) + 0.5 * x * sech2_s * np.sqrt(2 / np.pi) * (1 + 0.134145 * x**2)

def softmax(x):
    shifted = x - np.max(x, axis=0, keepdims=True)
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=0, keepdims=True)


def cross_entropy_loss(y_pred, y_true):
    m = y_true.shape[0]
    loss = -np.sum(y_true * np.log(y_pred + 1e-15)) / m
    return loss

def accuracy(y_pred, y_true):
    y_pred_labels = np.argmax(y_pred, axis=1)
    y_true_labels = np.argmax(y_true, axis=1)
    return np.mean(y_pred_labels == y_true_labels)



class MLP:
    def __init__(self, layer_sizes, activations, dropout_rate=0.0):
        self.layers = []
        self.dropout_rate = dropout_rate
        for i in range(len(layer_sizes) - 1):
            # He initialization for ReLU, Xavier for Tanh
            if activations[i] == 'softmax':
                scale = 0.01  # Smaller scale for softmax layer
            elif activations[i] in ['relu', 'leaky_relu', 'gelu']:
                scale = np.sqrt(2.0 / layer_sizes[i])
            else:
                scale = np.sqrt(1.0 / layer_sizes[i])  # For Tanh/Sigmoid
            W = np.random.randn(layer_sizes[i+1], layer_sizes[i]) * scale
            b = np.zeros((layer_sizes[i+1], 1))
            self.layers.append({
                'W': W,
                'b': b,
                'activation': activations[i]
            })

    def forward(self, X, training=True):
        A = X.T  # (input_size, batch_size)
        cache = []
        for layer in self.layers:
            Z = layer['W'] @ A + layer['b']
            activation = layer['activation']
            if activation == 'relu':
                A = relu(Z)
            elif activation == 'leaky_relu':
                A = leaky_relu(Z)
            elif activation == 'tanh':
                A = tanh(Z)
            elif activation == 'gelu':
                A = gelu(Z)
            elif activation == 'softmax':
                A = softmax(Z)
            # Applying dropout (skip for output layer)
            if training and activation != 'softmax' and self.dropout_rate > 0:
                mask = (np.random.rand(*A.shape) < (1 - self.dropout_rate)) / (1 - self.dropout_rate)
                A *= mask
                cache.append({'Z': Z, 'A': A, 'mask': mask})
            else:
                cache.append({'Z': Z, 'A': A})
        return A.T, cache  # (batch_size, output_size), cache

    def backward(self, X, y_true, learning_rate, cache):
        y_pred = cache[-1]['A'].T
        m = y_true.shape[0]
        dZ = (y_pred - y_true).T / m

        for i in reversed(range(len(self.layers))):
            layer = self.layers[i]
            layer_cache = cache[i]
            Z = layer_cache['Z']
            A_prev = X.T if i == 0 else cache[i-1]['A']

            # gradients for weights/biases
            dW = dZ @ A_prev.T
            db = np.sum(dZ, axis=1, keepdims=True)

            # Update parameters
            layer['W'] -= learning_rate * dW
            layer['b'] -= learning_rate * db

            # Propagate gradient to previous layer
            if i > 0:
                dA = self.layers[i]['W'].T @ dZ
                # Apply dropout mask if present
                if 'mask' in cache[i-1]:
                    dA *= cache[i-1]['mask']
                # Get Z from the PREVIOUS layer's cache
                Z_prev = cache[i-1]['Z']
                # Compute dZ using activation derivative of PREVIOUS layer
                activation = self.layers[i-1]['activation']
                if activation == 'relu':
                    dZ = dA * relu_derivative(Z_prev)  # Use Z_prev
                elif activation == 'leaky_relu':
                    dZ = dA * leaky_relu_derivative(Z_prev)
                elif activation == 'tanh':
                    dZ = dA * tanh_derivative(Z_prev)
                elif activation == 'gelu':
                    dZ = dA * gelu_derivative(Z_prev)


def train_mlp(x_train, y_train, x_val, y_val, layer_sizes, activations,
              epochs=10, batch_size=64, lr=0.01, dropout_rate=0.0,
              patience=5, delta=0.001):

    print(layer_sizes,activations)

    mlp = MLP(layer_sizes, activations, dropout_rate)

    train_losses, train_accs = [], []
    val_losses, val_accs = [], []

    best_val_loss = np.inf
    best_epoch = 0
    wait = 0  # Counter for epochs without improvement for early stopping
    best_model = None  # Store the best model state

    for epoch in range(epochs):
        permutation = np.random.permutation(x_train.shape[0])
        x_shuffled = x_train[permutation]
        y_shuffled = y_train[permutation]

        epoch_train_loss, epoch_train_acc = 0, 0
        for i in range(0, x_train.shape[0], batch_size):
            X_batch = x_shuffled[i:i+batch_size]
            y_batch = y_shuffled[i:i+batch_size]
            y_pred, cache = mlp.forward(X_batch, training=True)
            batch_loss = cross_entropy_loss(y_pred, y_batch)
            batch_acc = accuracy(y_pred, y_batch)
            epoch_train_loss += batch_loss * len(X_batch)
            epoch_train_acc += batch_acc * len(X_batch)
            mlp.backward(X_batch, y_batch, lr, cache)

        # Compute training metrics
        epoch_train_loss /= len(x_train)
        epoch_train_acc /= len(x_train)
        train_losses.append(epoch_train_loss)
        train_accs.append(epoch_train_acc)

        # Compute validation metrics
        val_pred, _ = mlp.forward(x_val, training=False)
        val_loss = cross_entropy_loss(val_pred, y_val)
        val_acc = accuracy(val_pred, y_val)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        # Early stopping logic
        if val_loss < best_val_loss - delta:
            print(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}")
            best_val_loss = val_loss
            best_epoch = epoch
            wait = 0
            # Save the best model weights
            best_model = [
                {'W': np.copy(layer['W']), 'b': np.copy(layer['b'])}
                for layer in mlp.layers
            ]
        else:
            wait += 1
            if wait >= patience:
                print(f"\nEarly stopping at epoch {epoch+1} (best epoch: {best_epoch+1})")
                break

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {epoch_train_loss:.4f} | Train Acc: {epoch_train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | "
              f"Patience: {wait}/{patience}")

    # Restore the best model weights
    if best_model is not None:
        for i, layer in enumerate(mlp.layers):
            layer['W'] = best_model[i]['W']
            layer['b'] = best_model[i]['b']

    return mlp, train_losses, train_accs, val_losses, val_accs

In [None]:
# ------------------------------
# Data Loading and Preprocessing
# ------------------------------
train_df = pd.read_csv('/kaggle/input/fashionmnist/fashion-mnist_train.csv')
test_df  = pd.read_csv('/kaggle/input/fashionmnist/fashion-mnist_test.csv')

# Preprocessing with correct normalization
train_data = np.array(train_df, dtype='float32')
test_data  = np.array(test_df, dtype='float32')
X_train = (train_data[:, 1:] / 255.0 - 0.5) / 0.5  # Normalize to [-1, 1]
y_train = train_data[:, 0].astype(np.int64)
X_test  = (test_data[:, 1:] / 255.0 - 0.5) / 0.5
y_test  = test_data[:, 0].astype(np.int64)

x_train, x_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

print(x_train.shape)

# ------------------------------
# Define the CNN Extractor in PyTorch
# ------------------------------
class CNNExtractor(nn.Module):
    def __init__(self, pool_method='max', weight_init='xavier',conv_dims=[32, 64, 128, 256, 512],n6=32):
        super(CNNExtractor, self).__init__()
        self.pool_method = pool_method
        self.relu = nn.ReLU(inplace=True)
        self.n6=n6

        # Convolutional layers with correct filter sizes
        self.conv1 = nn.Conv2d(1, conv_dims[0], kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(conv_dims[0], conv_dims[1], kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(conv_dims[1], conv_dims[2], kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(conv_dims[2], conv_dims[3], kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(conv_dims[3], conv_dims[4], kernel_size=3, padding=1)

        self.pool = self._get_pooling(pool_method)

        # Weight initialization
        self._initialize_weights(weight_init)


        # Final flattened size calculation
        with torch.no_grad():
            dummy_input = torch.zeros(1, 1, 28, 28)
            output = self.forward_features(dummy_input)
        self.flattened_size= output.view(-1).shape[0]

        print(self.flattened_size)

        # FCC
        # self.fc = nn.Linear(self.flattened_size, 10)
        self.fc = nn.Sequential(
            nn.Linear(self.flattened_size, self.n6),
            nn.ReLU(inplace=True),
            nn.Linear(self.n6, 10)
        )

    def _get_pooling(self, method):
        pool_dict = {
            'max': nn.MaxPool2d(kernel_size=2, stride=2),
            'avg': nn.AvgPool2d(kernel_size=2, stride=2),
            'global': nn.AdaptiveAvgPool2d((1, 1))
        }
        return pool_dict.get(method)

    def forward_features(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.relu(self.conv3(x))
        x = self.pool(x)
        x = self.relu(self.conv4(x))
        x = self.pool(x)
        x = self.relu(self.conv5(x))
        return torch.flatten(x, 1)

    def forward(self, x):
        x = x.view(-1, 1, 28, 28)
        x = self.forward_features(x)
        x = self.fc(x)
        return x

    def extract_features(self, x):
        x = x.view(-1, 1, 28, 28)
        with torch.no_grad():
            x = self.forward_features(x)
        return x

    def _initialize_weights(self, method):
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                if method == 'xavier':
                    nn.init.xavier_uniform_(layer.weight)
                elif method == 'he':
                    nn.init.kaiming_uniform_(layer.weight, nonlinearity='relu')
                elif method == 'random':
                    nn.init.normal_(layer.weight, mean=0, std=0.1)
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)

# ------------------------------
# Train Function with Device Handling
# ------------------------------
def train_cnn_extractor(epochs=10, batch_size=64, lr=0.001, pool_method='max',
                       weight_init='he', conv_dims=[32, 64, 128, 256, 512],n6=32):
    print(pool_method,weight_init,conv_dims,"MLP Hidden Layer: ",n6)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Convert data to tensors
    x_train_tensor = torch.tensor(x_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    x_val_tensor = torch.tensor(x_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.long)

    # Create torch datasets and loaders
    train_dataset = torch.utils.data.TensorDataset(x_train_tensor, y_train_tensor)
    val_dataset = torch.utils.data.TensorDataset(x_val_tensor, y_val_tensor)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model
    model = CNNExtractor(pool_method, weight_init,conv_dims,n6=32).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        epoch_loss, train_correct, train_total = 0, 0, 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item() * inputs.size(0)
            train_correct += (outputs.argmax(dim=1) == targets).sum().item()
            train_total += inputs.size(0)

        # Validation phase
        model.eval()
        val_loss, val_correct, val_total = 0, 0, 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                val_loss += loss_fn(outputs, targets).item() * inputs.size(0)
                val_correct += (outputs.argmax(dim=1) == targets).sum().item()
                val_total += inputs.size(0)

        # Calculate metrics
        epoch_loss /= len(train_loader.dataset)
        val_loss /= len(val_loader.dataset)
        train_acc = train_correct / train_total
        val_acc = val_correct / val_total

        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {epoch_loss:.4f} | Train Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()

    # Load best model weights
    model.load_state_dict(best_model_state)

    return model.to('cpu')  # Return to CPU for feature extraction


# ------------------------------
# Train the Model ............Change Parameters Here
# ------------------------------
print("Initializing Feature Extractor Training...")
cnn_model = train_cnn_extractor(epochs=10, pool_method='max', weight_init='xavier',conv_dims=[32, 64, 128, 256,512],n6=1024,)
print("Model Training Completed")

# ------------------------------
# Feature Extraction
# ------------------------------

cnn_model.eval()
x_train_tensor = torch.tensor(x_train, dtype=torch.float32)
x_val_tensor = torch.tensor(x_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
with torch.no_grad():
    features_train = cnn_model.extract_features(x_train_tensor).numpy()
    features_val = cnn_model.extract_features(x_val_tensor).numpy()
    features_test = cnn_model.extract_features(X_test_tensor).numpy()

print("Extracted feature shapes:", features_train.shape, features_val.shape, features_test.shape)

# Evaluate the Test Features
# Convert numpy features to PyTorch tensors
features_test_tensor = torch.tensor(features_test, dtype=torch.float32)

# Make predictions using the model's classifier (fc layer)
with torch.no_grad():
    test_outputs = cnn_model.fc(features_test_tensor)
    test_preds = torch.argmax(test_outputs, dim=1).numpy()

# print(test_preds.shape)
# Calculate test accuracy
test_accuracy = (test_preds == y_test).mean()
print(f"\nTest Accuracy: {test_accuracy:.4f}")


(48000, 784)
Initializing Feature Extractor Training...
max xavier [32, 64, 128, 256, 512] MLP Hidden Layer:  1024
512
Epoch 1/10 | Train Loss: 0.6960 | Train Acc: 0.7332 | Val Loss: 0.4207 | Val Acc: 0.8477
Epoch 2/10 | Train Loss: 0.3632 | Train Acc: 0.8696 | Val Loss: 0.3247 | Val Acc: 0.8832
Epoch 3/10 | Train Loss: 0.2888 | Train Acc: 0.8961 | Val Loss: 0.2785 | Val Acc: 0.8988
Epoch 4/10 | Train Loss: 0.2472 | Train Acc: 0.9104 | Val Loss: 0.2575 | Val Acc: 0.9087
Epoch 5/10 | Train Loss: 0.2152 | Train Acc: 0.9217 | Val Loss: 0.2663 | Val Acc: 0.9073
Epoch 6/10 | Train Loss: 0.1903 | Train Acc: 0.9305 | Val Loss: 0.2569 | Val Acc: 0.9090
Epoch 7/10 | Train Loss: 0.1673 | Train Acc: 0.9390 | Val Loss: 0.2778 | Val Acc: 0.9130
Epoch 8/10 | Train Loss: 0.1467 | Train Acc: 0.9460 | Val Loss: 0.2702 | Val Acc: 0.9141
Epoch 9/10 | Train Loss: 0.1278 | Train Acc: 0.9527 | Val Loss: 0.3052 | Val Acc: 0.9042
Epoch 10/10 | Train Loss: 0.1105 | Train Acc: 0.9593 | Val Loss: 0.2915 | Val Ac

In [57]:
from keras.utils import to_categorical
num_classes=10
y_train_oh = to_categorical(y_train, num_classes)
y_val_oh = to_categorical(y_val, num_classes)
y_test_oh = to_categorical(y_test, num_classes)

In [58]:
print(features_train.shape,y_train_oh.shape)
print(features_val.shape,y_val_oh.shape)
print(features_test.shape,y_test_oh.shape)
# ------------------------------
# Our Custom MLP (NumPy-based) on the extracted features
# ------------------------------
layer_sizes = [features_train.shape[1], 1024,10]
# Note: If you originally used more layers, adjust accordingly.
activations = ['relu', 'softmax']

print("Training custom MLP on CNN features...")
mlp_model, train_losses, train_accs, val_losses, val_accs = train_mlp(
    features_train, y_train_oh,
    features_val, y_val_oh,
    layer_sizes, activations,
    epochs=200, lr=0.001, dropout_rate=0.2, patience=5, batch_size=32
)
print("Custom MLP training complete.")


# Use your custom MLP for prediction.
test_pred, _ = mlp_model.forward(features_test, training=False)
test_acc = accuracy(test_pred, y_test_oh)
print(f"Test Accuracy (Custom MLP on CNN features): {test_acc:.4f}")


# import matplotlib.pyplot as plt

# # Plot loss curves
# plt.figure(figsize=(12, 5))
# plt.subplot(1, 2, 1)
# plt.plot(train_losses, label='Training Loss')
# plt.plot(val_losses, label='Validation Loss')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.legend()
# plt.title('Loss vs. Epochs')

# # Plot accuracy curves
# plt.subplot(1, 2, 2)
# plt.plot(train_accs, label='Training Accuracy')
# plt.plot(val_accs, label='Validation Accuracy')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.title('Accuracy vs. Epochs')

# plt.tight_layout()
# plt.show()


(48000, 512) (48000, 10)
(12000, 512) (12000, 10)
(10000, 512) (10000, 10)
Training custom MLP on CNN features...
[512, 1024, 10] ['relu', 'softmax']
Validation loss improved from inf to 0.2853
Epoch 1/200 | Train Loss: 0.1045 | Train Acc: 0.9697 | Val Loss: 0.2853 | Val Acc: 0.9192 | Patience: 0/5
Epoch 2/200 | Train Loss: 0.0796 | Train Acc: 0.9728 | Val Loss: 0.2960 | Val Acc: 0.9184 | Patience: 1/5
Epoch 3/200 | Train Loss: 0.0757 | Train Acc: 0.9740 | Val Loss: 0.2954 | Val Acc: 0.9204 | Patience: 2/5
Epoch 4/200 | Train Loss: 0.0749 | Train Acc: 0.9739 | Val Loss: 0.2938 | Val Acc: 0.9197 | Patience: 3/5
Epoch 5/200 | Train Loss: 0.0726 | Train Acc: 0.9740 | Val Loss: 0.2972 | Val Acc: 0.9197 | Patience: 4/5

Early stopping at epoch 6 (best epoch: 1)
Custom MLP training complete.
Test Accuracy (Custom MLP on CNN features): 0.9235
