# Theory Approving Experiment

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data.datapipes.iter.grouping import UnBatcherIterDataPipe
from torchvision import transforms
import numpy as np
from tqdm.auto import tqdm

from src.models import MLP
from src.models import train
from src.utils import init_dataloader
from src.eigenvalues import HessianEigenvector
from src.models import get_loss
from src.visualize import DeltaVisualizer
from src.calc import DeltaCalculator
from src.directions import EigenDirection

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_loader = init_dataloader(
    dataset_name='MNIST',
    transform=transform,
    batch_size=64,
    dataset_load_path='data/',
    train_mode=True,
    size=64 * (10000 // 64)
)

test_loader = init_dataloader(
    dataset_name='MNIST',
    transform=transform,
    batch_size=64,
    dataset_load_path='data/',
    train_mode=False,
    size=64 * (10000 // 64)
)

In [3]:
model = MLP(layers_num=2, hidden=256, input_channels=1, input_sizes=(28, 28), classes=10).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

train(model, criterion, train_loader, optimizer, num_epochs=10)

loss = get_loss(model, criterion, train_loader, device=DEVICE)
func = torch.cumsum(loss, dim=0) / torch.arange(1, loss.size(0) + 1, device=model.device)

In [None]:
eigen = []
for i in tqdm(range(50, len(func))):
    eigen.append(HessianEigenvector(model.parameters(), func[i]).get(10)[0])
eigen = np.array(eigen)

  0%|          | 0/106 [00:00<?, ?it/s]

In [None]:
from numpy.lib.stride_tricks import sliding_window_view


def sliding_avg(eigen_matrix, k):
    pad = np.pad(eigen_matrix, k, mode='edge')
    windows = sliding_window_view(pad, window_shape=2 * k + 1)
    return windows.mean(axis=1)

In [None]:
def get_theoretical(eigen_matrix):
    diff = eigen_matrix[1:] - eigen_matrix[:-1]
    return (2 * np.square(diff).sum(axis=1) + np.square(diff.sum(axis=1))) / 4

In [None]:
core_e = EigenDirection(model, criterion, train_loader)
calc_e = DeltaCalculator(model, criterion, train_loader, core_e)
vis_e = DeltaVisualizer(calc_e)

In [None]:
import matplotlib as mpl

mpl.rcParams['xtick.labelsize'] = 26
mpl.rcParams['ytick.labelsize'] = 26
mpl.rcParams['axes.labelsize'] = 28
mpl.rcParams['legend.fontsize'] = 24

theory = sliding_avg(get_theoretical(eigen), 5)

params = {'dim': 10, 'sigma': 1}
num_samples = 1024
deltas = vis_e.visualize_border(theory,
                                params=params,
                                num_samples=num_samples,
                                begin=10, striding_func=lambda x: sliding_avg(x, 5), return_deltas=True)

In [None]:
empiric = deltas[50:]

loss_val = func[50:-1].cpu().detach().numpy()

_, axs = plt.subplots(figsize=(18, 14), nrows=2, ncols=1)

axs[0].scatter(loss_val, empiric)

k1, b1 = np.polyfit(loss_val, empiric, 1)
y1_trend = k1 * loss_val + b1
axs[0].plot(loss_val, y1_trend, color='red', linewidth=2, label=f'Trend line')
axs[0].set(xlabel=r'$\mathcal {L}_k$', ylabel=r'$\Delta_k$ Empirical')
axs[0].legend()

axs[1].scatter(loss_val, theory)

k2, b2 = np.polyfit(loss_val, theory, 1)
y2_trend = k2 * loss_val + b2
axs[1].plot(loss_val, y2_trend, color='green', linewidth=2, label=f'Trend line')
axs[1].set(xlabel=r'$\mathcal {L}_k$', ylabel=r'$\Delta_k$ Theoretical')
axs[1].legend()

plt.savefig("../paper/img/delta_loss" + f"_{params['sigma']}_{params['dim']}_{num_samples}" + ".pdf")
plt.show()