In [None]:
import numpy as np
import pandas as pd
from itertools import product
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, random_split

import matplotlib.pyplot as plt

import seaborn as sns
sns.set_style('white')
sns.set_context('talk')

In [None]:
def compute_wht_spectrum(model, H, all_inputs, device="cuda"):
    '''
    Compute the WH spectrum of the DNN model
    Used for EN-regularization
    '''
    all_inputs = all_inputs.to(device)
    all_inputs = all_inputs.float()
    H = H.to(device)
    # get all outputs of model (aka landscape)
    landscape = model(all_inputs).reshape(-1)
    # compute spectrum by matrix-vector multiplication with WHT basis
    landscape = landscape.double()
    spectrum = torch.matmul(H, landscape)
    # clear up gpu memory
    H.to("cpu")
    all_inputs.to("cpu")
    return spectrum

def walsh_hadamard_matrix(L=13, normalize=False):
    '''
    Compute the WHT matrix for domain of dimension L
    '''
    H1 = np.asarray([[1.,1.], [1.,-1.]])
    H = np.asarray([1.])
    for i in range(L):
        H = np.kron(H, H1)
    if normalize:
        H = (1 / np.sqrt(2**L)) * H
    return H
ratio_matrices = []

## Defining model architectures

In [None]:
class Net_fc_2(nn.Module):
    def __init__(self, n, multiplier):
        super(Net_fc_2, self).__init__()
        self.fc1 = nn.Linear(n, multiplier*n)
        self.bn1 = nn.BatchNorm1d(multiplier*n)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        self.fc2 = nn.Linear(multiplier*n, 1)
        torch.nn.init.xavier_uniform_(self.fc2.weight)
    
    def forward(self, x):
        x = self.bn1(F.leaky_relu(self.fc1(x)))
        x = self.fc2(x)
        return x.reshape(-1)
    
class ConvNeuralNet2(nn.Module):
    def __init__(self):
        super(ConvNeuralNet2, self).__init__()
        self.conv1 = torch.nn.Conv1d(in_channels=13, out_channels=13, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(1, 1)
        self.batchnorm1 = nn.BatchNorm1d(13)
        self.conv2 = torch.nn.Conv1d(in_channels=13, out_channels=1, kernel_size=3, padding=1)
        self.fc2 = nn.Linear(1, 1)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        torch.nn.init.xavier_uniform_(self.fc2.weight)
        
    def forward(self, x):
        x = self.batchnorm1(F.leaky_relu(self.fc1(self.conv1(x))))
        x = self.fc2(F.leaky_relu(self.conv2(x)))
        return x.reshape(-1)
    
class ConvNeuralNet4(nn.Module):
    def __init__(self):
        super(ConvNeuralNet4, self).__init__()
        self.conv1 = torch.nn.Conv1d(in_channels=13, out_channels=13, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(1, 1)
        self.batchnorm1 = nn.BatchNorm1d(13)
        self.conv2 = torch.nn.Conv1d(in_channels=13, out_channels=13, kernel_size=3, padding=1)
        self.fc2 = nn.Linear(1, 1)
        self.batchnorm2 = nn.BatchNorm1d(13)
        self.conv3 = torch.nn.Conv1d(in_channels=13, out_channels=10, kernel_size=3, padding=1)
        self.fc3 = nn.Linear(1, 1)
        self.batchnorm3 = nn.BatchNorm1d(10)
        self.conv4 = torch.nn.Conv1d(in_channels=10, out_channels=1, kernel_size=3, padding=1)
        self.fc4 = nn.Linear(1, 1)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        torch.nn.init.xavier_uniform_(self.fc2.weight)
        torch.nn.init.xavier_uniform_(self.fc3.weight)
        torch.nn.init.xavier_uniform_(self.fc4.weight)
        torch.nn.init.xavier_uniform_(self.conv1.weight)
        torch.nn.init.xavier_uniform_(self.conv2.weight)
        torch.nn.init.xavier_uniform_(self.conv3.weight)
        torch.nn.init.xavier_uniform_(self.conv4.weight)
        
    def forward(self, x):
        x = self.batchnorm1(F.leaky_relu(self.fc1(self.conv1(x))))
        x = self.batchnorm2(F.leaky_relu(self.fc2(self.conv2(x))))
        x = self.batchnorm3(F.leaky_relu(self.fc3(self.conv3(x))))
        x = self.fc4(F.leaky_relu(self.conv4(x)))
        return x.reshape(-1)
    
class Net_fc_4(nn.Module):
    def __init__(self, n, multiplier):
        super(Net_fc_4, self).__init__()
        self.fc1 = nn.Linear(n, multiplier*n)
        self.bn1 = nn.BatchNorm1d(multiplier*n)
        self.fc2 = nn.Linear(multiplier*n, multiplier*n)
        self.bn2 = nn.BatchNorm1d(multiplier*n)
        self.fc3 = nn.Linear(multiplier*n, n)
        self.bn3 = nn.BatchNorm1d(n)
        self.fc4 = nn.Linear(n, 1)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        torch.nn.init.xavier_uniform_(self.fc2.weight)
        torch.nn.init.xavier_uniform_(self.fc3.weight)
        torch.nn.init.xavier_uniform_(self.fc4.weight)

    def forward(self, x):
        x = self.bn1(F.leaky_relu(self.fc1(x)))
        x = self.bn2(F.leaky_relu(self.fc2(x)))
        x = self.bn3(F.leaky_relu(self.fc3(x)))
        x = self.fc4(x)
        return x.reshape(-1)

In [None]:
def train_model(model, train_dl, val_dl, H, all_inputs, lr=1e-1, weight_decay=0.0, reg_lambda=1e-3, num_epochs=200, device="cuda"):
    model.to(device)
    optim = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    for epoch in range(num_epochs):
        ###############################
        ########## TRAIN LOOP #########
        ###############################
        print('training epoch ' + str(epoch))
        if epoch >= 0:
            #modify model path
            path = "testing/models/fc_net_reg_xavier/" + "2_layer_fc_net_epoch_" + str(epoch) + ".pt"
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_State_dict': optim.state_dict(),
            }, path)
        model.train()
        for X, y in train_dl:
            optim.zero_grad()
            X, y = X.to(device), y.to(device)
            X = X.float()
            y = y.float()
            y_hat = model(X)
            loss = F.mse_loss(y, y_hat)
            #Apply EN-regularization
            spectrum = compute_wht_spectrum(model, H, all_inputs, device=device)
            reg_loss = F.l1_loss(spectrum, torch.zeros_like(spectrum))
            loss += reg_lambda * reg_loss
            loss.backward()
            optim.step()

        ###############################
        ########## VAL LOOP #########
        ###############################
        model.eval()
        with torch.no_grad():
            X, y = next(iter(train_dl))
            X, y = X.to(device), y.to(device)
            X = X.float()
            y = y.float()
            #add these lines for CNN model training
#             X = X.view([X.shape[0], X.shape[1], 1, 1])
#             y = y.view([y.shape[0], 1, 1, 1])
            y_hat = model(X)
            val_loss = F.mse_loss(y, y_hat).item()
            print(f"Validation Loss: {val_loss:.3f}")
    return model

In [None]:
def evaluate(model_name):
    ratio_matrix = np.zeros((100,298))
    sigma_vec = np.linspace(0.03, 3, 298)
    for sigma_ind,sigma_val in enumerate(sigma_vec):
        print('sigma=',sigma_val)
        for repeat in range(100):
            random.seed(4)
            np.random.seed(4)
            torch.manual_seed(4)
            
            model = Net_fc_2(L, 1)
            optim = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.0)
            #load model here and run this function everytime we need to evaluate a new model
            PATH = "testing/models/fc_net_reg_xavier/" + model_name
            checkpoint = torch.load(PATH)
            model.load_state_dict(checkpoint['model_state_dict'])
            optim.load_state_dict(checkpoint['optimizer_State_dict'])
            epoch = checkpoint['epoch']

            model.eval()
            with torch.no_grad():
                X, y = next(iter(train_dl))
                X, y = X.to(device), y.to(device)
                X = X.float()
                y = y.float()
                y_hat_start = model(X)

            seed_multiplier = 0
            random.seed(repeat+1000000*seed_multiplier)
            np.random.seed(repeat+1000000*seed_multiplier)
            torch.manual_seed(repeat+1000000*seed_multiplier)

            with torch.no_grad():
                for param in model.parameters():
                    added_noise = torch.randn(param.size()) * sigma_val
                    param.add_(added_noise)

            with torch.no_grad():
                X, y = next(iter(train_dl))
                X, y = X.to(device), y.to(device)
                X = X.float()
                y = y.float()
                y_hat = model(X)
                test_loss = F.mse_loss(y_hat, y_hat_start).item()
                actual_test_loss = F.mse_loss(y, y_hat_start).item()
                ratio_matrix[repeat,sigma_ind]=test_loss
    return ratio_matrix

In [None]:
L = 13
N = 2**L
H = walsh_hadamard_matrix(L=L)
all_inputs = np.asarray(list((product((0,1), repeat=L))))
H = torch.tensor(H, dtype=torch.float)
all_inputs = torch.tensor(all_inputs, dtype=torch.float)

In [None]:
################################################
###### Get Dataset and Split Train/Test ########
################################################
def generate_training_data(num_examples, extra=None):
    polynomial = '3*x_1 + 4*x_2*x_3 + 5*x_4*x_5 + x_12'
    all_inputs = np.asarray(list((product((-1,1), repeat=L))))
    all_outputs = []
    for i in range(N):
        x_1, x_2, x_3, x_4, x_5, x_6, x_7, x_8, x_9, x_10, x_11, x_12, x_13 = all_inputs[i]
        all_outputs.append(eval(polynomial))
    all_inputs = torch.tensor(all_inputs)
    all_outputs = torch.tensor(all_outputs)
    return all_inputs, all_outputs

X_all, y_all = generate_training_data(100, extra=None)
#reshape data if training cnn 
# X_all = X_all.reshape([X_all.shape[0], X_all.shape[1], 1])

In [None]:
################################################
###### Get Dataset and Split Train/Test ########
################################################

torch.manual_seed(4)
random.seed(4)
np.random.seed(4)

n_train = 1000
bs = 16
n = X_all.shape[0]
ds = TensorDataset(X_all,y_all)
# Create Datasets / DataLoaders
train_ds, val_ds, test_ds = random_split(ds, lengths=[n_train, n_train, n - 2*n_train])
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=len(val_ds), shuffle=False)
test_dl = DataLoader(test_ds, batch_size=len(test_ds), shuffle=False)

In [None]:
#hyperparameters for training
lr = 5e-3
weight_decay = 0.0
reg_lambda = 1e-3  ## with EN regularization
num_epochs = 100
device = "cpu"
H = torch.tensor(walsh_hadamard_matrix(L=L))

torch.manual_seed(4)
random.seed(4)
np.random.seed(4)
model = Net_fc_2(L, 1)
model = train_model(model, train_dl, val_dl, H, X_all, 
                    lr=lr, weight_decay=weight_decay, reg_lambda=reg_lambda, 
                    num_epochs=num_epochs, device=device)

model.eval()
with torch.no_grad():
    X, y = next(iter(train_dl))
    X, y = X.to(device), y.to(device)
    X = X.float()
    y = y.float()
    y_hat = model(X)
    test_loss = F.mse_loss(y_hat, y).item()
    print(f"Test Loss: {test_loss:.3f}")

In [None]:
################################################
###### Get Dataset and Split Train/Test ########
################################################
# (for evaluation)

torch.manual_seed(4)
random.seed(4)
np.random.seed(4)

n_train = 1000
bs = 1000
n = X_all.shape[0]
ds = TensorDataset(X_all,y_all)
# Create Datasets / DataLoaders
train_ds, val_ds, test_ds = random_split(ds, lengths=[n_train, n_train, n - 2*n_train])
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=len(val_ds), shuffle=False)
test_dl = DataLoader(test_ds, batch_size=len(test_ds), shuffle=False)

In [None]:
models = ["2_layer_fc_net_epoch_0.pt", "2_layer_fc_net_epoch_1.pt", "2_layer_fc_net_epoch_2.pt", "2_layer_fc_net_epoch_3.pt"]
for model in models:
    ratio_matrices.append(evaluate(model))

## plotting results

In [None]:
sigma_vec = np.linspace(0.03, 3, 298)
plt.loglog(np.multiply(sigma_vec,sigma_vec),np.divide(np.min(ratio_matrices[0],axis=0),np.multiply(sigma_vec,sigma_vec)),'o', markersize=3, label = '0')
plt.loglog(np.multiply(sigma_vec,sigma_vec),np.divide(np.min(ratio_matrices[1],axis=0),np.multiply(sigma_vec,sigma_vec)),'o', markersize=3, label = '1')
plt.loglog(np.multiply(sigma_vec,sigma_vec),np.divide(np.min(ratio_matrices[2],axis=0),np.multiply(sigma_vec,sigma_vec)),'o', markersize=3, label = '2')
plt.loglog(np.multiply(sigma_vec,sigma_vec),np.divide(np.min(ratio_matrices[3],axis=0),np.multiply(sigma_vec,sigma_vec)),'o', markersize=3, label = '3')
plt.ylabel(r'$\min \sum_i \|g_{\theta}(x_i)-g_{\theta^*}(x_i)\|_2^2 / \sigma^2$')
plt.legend(loc="lower left", prop={'size': 10})
plt.xlabel(r'$\sigma^2$')
plt.show()