In [1]:
import os
os.environ["PYTHONHASHSEED"] = "42"
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

import pandas as pd
import numpy as np
import random
import time
import torch
import torch.nn.functional as F
from torchvision import datasets
from torch.autograd.functional import hessian
import torch.nn as nn
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [2]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.use_deterministic_algorithms(True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
def get_cropped_images(X, y, n):
    """Returns images that can be cropped by n pixels on all sides without
    losing any information (all cropped pixels = 0).

    Args:
        X np.ndarray: Input images.
        y np.ndarray: Corresponding labels.
        n (int): Number of pixels to crop from each side.

    Returns:
        np.ndarray: Cropped images.
    """

    mask = np.zeros((X.shape[1], X.shape[2]), dtype=bool)
    mask[:n, :] = True
    mask[-n:, :] = True
    mask[:, :n] = True
    mask[:, -n:] = True

    border_pixels = X[:, mask]
    croppable_mask = (border_pixels.sum(axis=1) == 0)
    return X[croppable_mask, n:-n, n:-n], y[croppable_mask]

In [4]:
# Preprocessing

# Load MNIST dataset
train = datasets.MNIST(root="./data", train=True,  download=True)
test  = datasets.MNIST(root="./data", train=False, download=True)
X_full = torch.cat([train.data, test.data], dim=0).numpy()
y_full = torch.cat([train.targets, test.targets], dim=0).numpy()

# Crop images by n pixels on each side
X_cropped, y_cropped = get_cropped_images(X_full, y_full, 4)
y_cropped = pd.Series(y_cropped)

# Select most frequent classes
num_labels = 7
classes = y_cropped.value_counts().index[:num_labels]

# Create train/test split with 800 train / 200 test samples per class
X_train_list = []
y_train_list = []
X_test_list = []
y_test_list = []

for clss in classes:
    indices = y_cropped[y_cropped == clss].sample(1000, random_state=SEED).index
    X_train_list.append(X_cropped[indices[:800]])
    y_train_list.append(y_cropped[indices[:800]])
    X_test_list.append(X_cropped[indices[800:]])
    y_test_list.append(y_cropped[indices[800:]])

X = np.concatenate(X_train_list, axis=0)
y = pd.concat(y_train_list, axis=0).reset_index(drop=True)
X_test = np.concatenate(X_test_list, axis=0)
y_test = pd.concat(y_test_list, axis=0).reset_index(drop=True)

# 0 mean normalize
X_mean, X_std = X.mean(), X.std()
X = (X - X_mean) / X_std
X_test = (X_test - X_mean) / X_std

# Convert y from image label to class label (see classes variable)
class_map = {clss: idx for idx, clss in enumerate(classes)}
y = y.map(class_map).to_numpy()
y_test = y_test.map(class_map).to_numpy()

# Convert to torch
X = torch.tensor(X, dtype=torch.float32, device=device)
y = torch.tensor(y, dtype=torch.long, device=device)
X_test = torch.tensor(X_test, dtype=torch.float32, device=device)
y_test = torch.tensor(y_test, dtype=torch.long, device=device)

In [11]:
class FullyConnectedNet(nn.Module):
    def __init__(self, input_size, num_hidden_layers, hidden_layer_size, 
                 num_labels, activation=nn.ReLU):
        super(FullyConnectedNet, self).__init__()

        self.input_size = input_size
        self.num_hidden_layers = num_hidden_layers
        self.hidden_layers_size = hidden_layer_size
        self.num_labels = num_labels
        self.activation = activation

        layers = [nn.Flatten()]

        in_size = input_size
        for _ in range(num_hidden_layers):
            layers += [nn.Linear(in_size, hidden_layer_size), activation()]
            in_size = hidden_layer_size

        layers.append(nn.Linear(in_size, num_labels))
        self.network = nn.Sequential(*layers)
        self.param_list = list(self.parameters())

        for m in self.network:
            if isinstance(m, nn.Linear):
                if activation == nn.ReLU:
                    nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
                elif activation == nn.Tanh:
                    nn.init.xavier_uniform_(m.weight)
                else:
                    nn.init.kaiming_normal_(m.weight, nonlinearity='relu')

        nn.init.zeros_(m.bias)

    def forward(self, x):
        return self.network(x)

In [6]:
def setup_output_files(output_dir="output"): 

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    metadata_path = os.path.join(output_dir, "metadata_rmsprop.csv")
    output_data_path = os.path.join(output_dir, "output_rmsprop.csv")

    if os.path.exists(metadata_path):
            metadata = pd.read_csv(metadata_path)
    else:
        metadata = pd.DataFrame({
            "model_id": pd.Series(dtype="int"),
            "model_type": pd.Series(dtype="str"),
            "num_hidden_layers": pd.Series(dtype="int"),
            "hidden_layers_size": pd.Series(dtype="int"),
            "activation_function": pd.Series(dtype="str"),
            "optimizer": pd.Series(dtype="str"),
            "learning_rate": pd.Series(dtype="float"),
            "num_epochs": pd.Series(dtype="int"),
            "time_minutes": pd.Series(dtype="float"),
        })

    if os.path.exists(output_data_path):
        output_data = pd.read_csv(output_data_path)
    else:
        output_data = pd.DataFrame({
            "model_id": pd.Series(dtype="int"),
            "epoch": pd.Series(dtype="int"),
            "train_loss": pd.Series(dtype="float"),
            "train_accuracy": pd.Series(dtype="float"),
            "test_accuracy": pd.Series(dtype="float"),
            "sharpness_H": pd.Series(dtype="float"),
            "sharpness_A": pd.Series(dtype="float"),
            "trace_H": pd.Series(dtype="float"),
            "alpha_t": pd.Series(dtype="float")
        })

    return metadata, output_data

def load_output_files(output_dir="output"):
    metadata_path = os.path.join(output_dir, "metadata_rmsprop.csv")
    output_data_path = os.path.join(output_dir, "output_rmsprop.csv")

    metadata = pd.read_csv(metadata_path)
    output_data = pd.read_csv(output_data_path)

    return metadata, output_data

def save_output_files(metadata, output_data, output_dir="output"):

    metadata_path = os.path.join(output_dir, "metadata_rmsprop.csv")
    output_data_path = os.path.join(output_dir, "output_rmsprop.csv")

    metadata.to_csv(metadata_path, index=False)
    output_data.to_csv(output_data_path, index=False)

In [7]:
def get_hessian_metrics(model, optimizer, criterion, iters=30):

    # Build graph for gradient
    outputs = model(X)
    loss = criterion(outputs, y)

    grads = torch.autograd.grad(
        loss, model.param_list,
        create_graph=True
    )
    g_flat = torch.cat([g.reshape(-1) for g in grads])
    dim    = g_flat.numel()
    device = g_flat.device

    # Computes Hessian-vector product with Pearlmutter trick
    def Hv(v):
        Hv_list = torch.autograd.grad(
            g_flat @ v,
            model.param_list,
            retain_graph=True
        )
        return torch.cat([h.reshape(-1) for h in Hv_list])


    # Compute adaptive scaling matrix D (sqrt) for effective Hessian
    v_t = torch.cat([state['square_avg'].reshape(-1)
                     for state in optimizer.state.values()]
                    ).detach()
    eta = optimizer.param_groups[0]['lr']
    eps = optimizer.param_groups[0]['eps']
    D_sqrt = torch.sqrt(eta / torch.sqrt(v_t + eps))

    # Compute effective Hessian-vector product
    def Av(v):
        return D_sqrt * Hv(D_sqrt * v)
    
    # Performs power iteration to estimate largest eigenvalue
    def power_iteration(matvec):
        v = torch.randn(dim, device=device)
        v /= v.norm()
        for _ in range(iters):
            v = matvec(v)
            v /= v.norm()

        max_eig_val = (v @ matvec(v)).item()
        max_eig_vec = v
        return max_eig_val, max_eig_vec

    lambda_H , eig_vec_H = power_iteration(Hv)
    lambda_A , eig_vec_A = power_iteration(Av)

    alpha_t = (v_t @ (eig_vec_A**2)).item()

    # Estimate trace with Hutchinson's method
    trace_H = 0.0
    for _ in range(iters):
        z = torch.randint(0, 2, (dim,), device=device).float() * 2 - 1
        Hz = Hv(z)
        trace_H += (Hz * z).sum().item()
    trace_H /= iters

    return lambda_H, lambda_A, trace_H, alpha_t

In [8]:
def train_model(model, optimizer, criterion, epochs, X, y, X_test, y_test):
    learning_rate = optimizer.param_groups[0]['lr']

    model.to(device)
    model.train()

    train_losses = np.empty(epochs)
    train_accuracies = np.empty(epochs)
    test_accuracies = np.empty(epochs)
    H_sharps = np.empty(epochs)
    A_sharps = np.empty(epochs)
    H_traces = np.empty(epochs)
    alpha_ts = np.empty(epochs)

    start = time.time()

    for epoch in range(epochs):

        optimizer.zero_grad()
        outputs = model(X)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        train_losses[epoch] = loss.item()

        H_sharps[epoch], A_sharps[epoch], H_traces[epoch], alpha_ts[epoch] = get_hessian_metrics(
            model, optimizer, criterion
        )

        with torch.no_grad():
            model.eval()
            train_preds = outputs.argmax(dim=1)
            test_preds = model(X_test).argmax(dim=1)
            train_accuracies[epoch] = (train_preds == y).float().mean().item()
            test_accuracies[epoch] = (test_preds == y_test).float().mean().item()
        model.train()

        if (epoch+1) % 20 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}, " +
                  f"Time: {round(((time.time() - start) / 60), 2)}")

    metadata, output_data = setup_output_files("output")
    model_id = metadata.shape[0] + 1

    metadata.loc[metadata.shape[0]] ={
        "model_id": model_id,
        "model_type": model.__class__.__name__,
        "num_hidden_layers": model.num_hidden_layers,
        "hidden_layers_size": model.hidden_layers_size,
        "activation_function": model.activation.__name__,
        "optimizer": optimizer.__class__.__name__,
        "learning_rate": learning_rate,
        "num_epochs": epochs,
        "time_minutes": round((time.time() - start) / 60, 2),
    }

    output_data = pd.concat([output_data, pd.DataFrame({
        "model_id": np.ones_like(train_losses) * model_id,
        "epoch": np.arange(1, epochs + 1),
        "loss": train_losses,
        "sharpness_H": H_sharps,
        "sharpness_A": A_sharps,
        "test_accuracy": test_accuracies,
        "train_accuracy": train_accuracies,
        "trace_H": H_traces,
        "alpha_t": alpha_ts
    })], ignore_index=True)

    save_output_files(metadata, output_data)

In [10]:
input_size = X.shape[1] * X.shape[2] # 400
num_hidden_layers = 2
hidden_layer_size = 21
learning_rates = [0.01, 0.003, 0.001, 0.0003, 0.0001]
epochs = 5000

for learning_rate in learning_rates:

    model = FullyConnectedNet(
        input_size=input_size,
        num_hidden_layers=num_hidden_layers,
        hidden_layer_size=hidden_layer_size,
        num_labels=num_labels,
        activation=nn.Tanh
    )

    optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)

    criterion = nn.CrossEntropyLoss()

    train_model(model, optimizer, criterion, epochs, X, y, X_test, y_test)


AttributeError: 'FullyConnectedNet' object has no attribute 'param_list'

In [14]:
md, out = load_output_files()
md

Unnamed: 0,model_id,model_type,num_hidden_layers,hidden_layers_size,activation_function,optimizer,learning_rate,num_epochs,time_minutes
0,1,FullyConnectedNet,2,21,ReLU,RMSprop,0.003,10000,35.9
1,2,FullyConnectedNet,2,21,ReLU,RMSprop,0.001,10000,35.57
2,3,FullyConnectedNet,2,21,ReLU,RMSprop,0.0003,10000,35.14
3,4,FullyConnectedNet,2,21,ReLU,RMSprop,0.0001,10000,35.02
4,5,FullyConnectedNet,2,21,ReLU,RMSprop,3e-05,10000,37.07
5,6,FullyConnectedNet,2,21,ReLU,RMSprop,0.01,10000,35.89
6,7,FullyConnectedNet,2,21,ReLU,RMSprop,0.003,10000,36.74
7,8,FullyConnectedNet,2,21,ReLU,RMSprop,0.001,10000,37.28
8,9,FullyConnectedNet,2,21,ReLU,RMSprop,0.0003,10000,36.41
9,10,FullyConnectedNet,2,21,ReLU,RMSprop,0.0001,10000,35.73


In [16]:
def plot_output_data(metadata, output, model_id):
    metadata = metadata[metadata['model_id']==model_id]
    output = output[output['model_id']==model_id]
    
    xs = np.arange(metadata['num_epochs'].iloc[0])
    losses = output['loss']
    sharpness_H = output['sharpness_H']
    sharpness_A = output['sharpness_A']
    train_accuracy = output['train_accuracy']
    test_accuracy = output['test_accuracy']
    trace_H = output['trace_H']

    fig = make_subplots(rows = 2, cols = 1, 
                        specs=[[{"secondary_y": True}],
                               [{"secondary_y": True}]],
                        vertical_spacing=0.1)
    
    fig.add_trace(
        go.Scatter(x=xs, y=losses, name="Training Loss",line=dict(width=2)),
        secondary_y=False, row=1, col=1
    )

    fig.add_trace(
        go.Scatter(x=xs, y=sharpness_A, name="Max Eigenvalue of A", line=dict(width=2)),
        secondary_y=True, row=1, col=1
    )

    fig.add_trace(
        go.Scatter(x=xs, y=test_accuracy, name="Test Accuracy", line=dict(width=2)),
        secondary_y=False, row=2, col=1
    )

    fig.add_trace(
        go.Scatter(x=xs, y=trace_H, name="Trace of H", line=dict(width=2)),
        secondary_y=True, row=2, col=1
    )

    fig.add_trace(
        go.Scatter(x=xs, y=sharpness_H, name="Max Eigenvalue of H", line=dict(width=2)),
        secondary_y=True, row=1, col=1
    )

    fig.update_yaxes(title_text="Training Loss", secondary_y=False, range = [0,2], showgrid=False,
                     row=1, col=1)
    fig.update_yaxes(title_text="Max Eigenvalue of A", secondary_y=True, range = [0,25],
                     row=1, col=1)
    
    fig.update_xaxes(title_text="epoch")
    fig.update_layout(height = 800, width = 1200)
    
    fig.show()

In [26]:
plot_output_data(md, out, 6)

In [27]:
plot_output_data(md, out, 7)

In [28]:
plot_output_data(md, out, 8)

In [29]:
plot_output_data(md, out, 9)

In [30]:
plot_output_data(md, out, 10)

In [65]:
model_id = 2

plot_output_data(md, out, model_id)

loss = out[out['model_id']==model_id]['loss'].values
((loss[:-1] - loss[1:]) < 0)

fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(len(loss)-1), y=((loss[:-1] - loss[1:]) < 0),
                        mode='lines', name='Stability',line=dict(width=.5)))
fig.update_layout(
    xaxis_title='Epoch',
    yaxis_title='Stability',
    height=600,
    width=1200,
    showlegend=True
)

In [30]:
model.eval()
with torch.no_grad():
    outputs = model(X_test.to(device))
    preds = outputs.argmax(dim=1)
    correct = (preds == y_test.to(device)).sum().item()
    accuracy = correct / len(y_test)

print(f"Test accuracy: {accuracy:.4f}")

NameError: name 'model' is not defined