# Thesis code

Notice that there are a lot of repitions within the notebook - this was in order for the different parts to be mostly independant from one another.

In [None]:
# All libraries used for our work

import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from scipy.spatial.distance import cdist
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import explained_variance_score, accuracy_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import torch.nn.utils.prune as prune
from torch.utils.data import DataLoader, TensorDataset, Dataset
import matplotlib.ticker as ticker
import tqdm
import copy
import optuna
import scipy.stats as stats

In [None]:
# Seaborn 'Set2' colors - these colors were used for plotting

good_colors = ['cornflowerblue', 'darkorange', 'forestgreen', 'hotpink', 'dimgray', 'gold', 'rebeccapurple', 'mediumseagreen']

# Clean Noise

The following is a noise removal procedure that corresponds to the discussion in section 3.3.1. Inspired by the Boruta algorithm presented by Kursa and Rudnicki in 2010 (https://www.jstatsoft.org/article/view/v036i11) we use Random Forest together with a feature created using Gaussian noise to detect which of features in the data are less important than noise.

In [None]:
# Data prep

df = pd.read_csv('Breast_cancer.csv') # Reading the data
df['diagnosis'] = df['diagnosis'].apply(lambda x: 1 if x == 'M' else 0) # Encoding the labels as 0s and 1s
df = df.drop(['id','Unnamed: 32'], axis = 1) # Removing non-feature columns present in the data
df['rand'] = np.random.normal(0,1,len(df.iloc[:,0])) # Adding a noise "feature"

# Normalizing
columns = list(df.columns)
columns.remove('diagnosis')

for column in columns:
    mean = df[column].mean()
    std = df[column].std()
    df[column] = (df[column]-mean)/std

In [None]:
rf = RandomForestClassifier(n_estimators=100) # Creatuing a Random Forest model

# Calculating importance scores using a Bootstapping approach for the construction of pseudo-confidence intervals
n_iterations = 100

importance_scores = []
for _ in range(n_iterations):
    bootstrap_indices = np.random.choice(len(df[columns]), size=len(df[columns]), replace=True)
    X_bootstrap = df[columns].iloc[bootstrap_indices]
    y_bootstrap = df['diagnosis'].iloc[bootstrap_indices]
    
    rf.fit(X_bootstrap, y_bootstrap)
    importance_scores.append(rf.feature_importances_)

In [None]:
# Calculating the pseudo-confidence intervals for all features
HW = []
mean_importance = []
n = len(importance_scores)
for i in range(len(columns)):
    temp = []
    for j in range(n):
        temp.append(importance_scores[j][i])
    mean_importance.append(np.mean(temp))
    std = np.std(temp, ddof=1)
    HW.append(stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n)))

In [None]:
# Presenting our results
plt.figure(figsize=(8,6))
x = [i for i in range(len(columns))]
plt.bar(x,mean_importance, color = good_colors[0])
plt.errorbar(x,mean_importance, yerr=HW, fmt=".", color=good_colors[1])
plt.xlabel('Features', fontsize = '18')
plt.xticks(x, columns)
plt.xticks(fontsize = '10', rotation = 90)
plt.ylabel('Importance Score', fontsize = '18')
plt.yticks(fontsize = '14')
plt.grid()
plt.savefig('Importance.png', bbox_inches='tight')
plt.show()

In [None]:
# Preparing a list of features less important than the noise feature that should be removed
remove = []
threshold = mean_importance[-1] + HW[-1]
for i in range(len(columns)-1):
    if mean_importance[i] - HW[i] < threshold:
        remove.append(columns[i])
remove.append(columns[-1])

In the case of the Wisconsin Breast Cancer data set `remove` was empty, and there was no need for data augmentation.

# Mutual Information & Harness Metric

The following is the calculations of the hardness metric discussed in section 3.3.2. The first cell corresponds to the calculation of the mutual information and the secind to the calculation of the hardness metric itself. In both cases the implementation is via a approximation of the distributions using histograms.

In [None]:
# Data prep
df = pd.read_csv('Breast_cancer.csv') # Reading the data
df = df.drop(['id','Unnamed: 32'], axis = 1) # Removing non-feature columns present in the data
df['diagnosis'] = df['diagnosis'].apply(lambda x: 1 if x == 'M' else 0) # Encoding the labels as 0s and 1s
columns = list(df.columns) # Prepare a list of the features for normalization
columns.remove('diagnosis') # Remove the response feature which shouldn't go through normalization

# Normalization
for column in columns:
    mean = df[column].mean()
    std = df[column].std()
    df[column] = (df[column]-mean)/std
    
# Preparing the distribution of the labels
class_probs = {}
total_samples = len(df)
for label in set(df['diagnosis']):
    class_probs[label] = np.sum(df['diagnosis'] == label) / total_samples
    
# Set the number of bins to the either 30 or the square root of the number of samples, which ever one's smaller
bin_num = min(30,int(np.sqrt(total_samples))) 
                                              
# Preparing the features' distributions
feature_probs = {}
features = df[columns].values
for i in range(features.shape[1]):
    # Preparing the 'pure' distribution P(X=x)
    feature_values = features[:,i]
    bins = np.linspace(min(feature_values),max(feature_values),bin_num+1)
    hist = np.histogram(feature_values, bins=bins, density=False)[0]
    feature_probs['pure', i] = [hist[i]/sum(hist) for i in range(len(hist))]
    # Preparing the dependant distribution P(X=x|Y=y)
    for label in set(df['diagnosis']):
        mask = (df['diagnosis'] == label)
        feature_values = features[mask, i]
        hist = np.histogram(feature_values, bins=bins, density=False)[0]
        feature_probs[label, i] = [hist[i]/sum(hist) for i in range(len(hist))]

# Calculating the mutual information between each feature and the labels
ind_mutual_information = []
for i in range(features.shape[1]):
    temp = 0
    for j in range(bin_num):
        for label in set(df['diagnosis']):
            p_x = feature_probs['pure', i][j]
            p_y = class_probs[label]
            p_xy = feature_probs[label, i][j]*p_y
            if p_xy != 0:
                temp = temp + p_xy*np.log(p_xy/(p_x*p_y))
    ind_mutual_information.append(temp)

# Presenting the sum of mutual information over all features
print("Mutual Information:", np.sum(ind_mutual_information))

In [None]:
# Data prep
df = pd.read_csv('Breast_cancer.csv') # Reading the data
df = df.drop(['id','Unnamed: 32'], axis = 1) # Removing non-feature columns present in the data
df['diagnosis'] = df['diagnosis'].apply(lambda x: 1 if x == 'M' else 0) # Encoding the labels as 0s and 1s
columns = list(df.columns) # Prepare a list of the features for normalization
columns.remove('diagnosis') # Remove the response feature which shouldn't go through normalization

# Normalization
for column in columns:
    mean = df[column].mean()
    std = df[column].std()
    df[column] = (df[column]-mean)/std
    
# Preparing the distribution of the labels
class_probs = {}
total_samples = len(df)
for label in set(df['diagnosis']):
    class_probs[label] = np.sum(df['diagnosis'] == label) / total_samples
    
# Calculating the entropy of the labels
class_entropy = sum([-class_probs[label]*np.log(class_probs[label]) for label in set(df['diagnosis'])])

# Set the number of bins to the either 30 or the square root of the number of samples, which ever one's smaller
bin_num = min(30,int(np.sqrt(total_samples)))

# Preparing the features' distributions
feature_probs = {}
features = df[columns].values
for i in range(features.shape[1]):
    # Preparing the 'pure' distribution P(X=x)
    feature_values = features[:,i]
    bins = np.linspace(min(feature_values),max(feature_values),bin_num+1)
    hist = np.histogram(feature_values, bins=bins, density=False)[0]
    feature_probs['pure', i] = [hist[i]/sum(hist) for i in range(len(hist))]
    # Preparing the dependant distribution P(X=x|Y=y)
    for label in set(df['diagnosis']):
        mask = (df['diagnosis'] == label)
        feature_values = features[mask, i]
        hist = np.histogram(feature_values, bins=bins, density=False)[0]
        feature_probs[label, i] = [hist[i]/sum(hist) for i in range(len(hist))]

# Calculating the mutual information between each feature and the labels
ind_mutual_information = []
for i in range(features.shape[1]):
    temp = 0
    for j in range(bin_num):
        for label in set(df['diagnosis']):
            p_x = feature_probs['pure', i][j]
            p_y = class_probs[label]
            p_xy = feature_probs[label, i][j]*p_y
            if p_xy != 0:
                temp = temp + p_xy*np.log(p_xy/(p_x*p_y))
    ind_mutual_information.append(temp)

# Presenting the resulting hardness metric as it was defined in section 3.3.2.
print("Hardness metric:", np.sum(ind_mutual_information)/(len(set(df['diagnosis']))*class_entropy))

# PCA

The following is the implementation of PCA using standard tools from sklearn as one of our measurements of data ID as was described under section 3.3.3.

In [None]:
# Data prep
df = pd.read_csv('Breast_cancer.csv') # Reading the data
df2 = df.drop(['id','diagnosis','Unnamed: 32'], axis = 1) # Removing non-feature columns present in the data as
                                                         # well as the labels which are not used for PCA
# Normalizing
transform = StandardScaler()
scaled = transform.fit_transform(df2)
scaled_df2 = pd.DataFrame(scaled, index=df2.index, columns=df2.columns)

In [None]:
# Performing PCA
pca = PCA(n_components=len(df2.columns))
components = pca.fit_transform(scaled)

# Calculating the cumulative explained variance
explained = []
for i in range(len(df2.columns)):
    explained.append(sum(pca.explained_variance_ratio_[0:i+1]))

# Plotting the results    
num = [i+1 for i in range(len(df2.columns))]
threshold90 = 0.9*np.ones(len(df2.columns))
threshold95 = 0.95*np.ones(len(df2.columns))

plt.figure(figsize=(8,6))
plt.bar(num,explained, color = good_colors[0])
plt.plot(num,threshold90,'--', color='red')
plt.plot(num,threshold95,'--', color='darkred')
plt.xlabel('Number of Components', fontsize = '18')
plt.xticks(fontsize = '14')
plt.ylabel('Accumulated Explained Variance', fontsize = '18')
plt.yticks(fontsize = '14')
plt.grid()
plt.savefig('PCA.png', bbox_inches='tight')
plt.show()

# GP+CV

The following is the implementation of the GP+CV algorithm as was described under section 3.3.3 as a second method of data ID estimation based on the papers by Grassberger and Procaccia from 1983 (https://www.sciencedirect.com/science/article/abs/pii/0167278983902981) and by Camastra and Vinciarelli from 2002 (https://ieeexplore.ieee.org/document/1039212).

Notice that below we use `scaled` which was created in the PCA part above. 

In [None]:
def GP(array):
    # A function that creates the data required for the curve to calculate the
    # correlation dimension using the GP algorithm - i.e., it calculates the
    # natural logarithm of the correlation integral for various values of r.
    # 'array' is a numpy array that has samples as rows and columns as features
    # (the usual way...).
    dist_matrix = cdist(array, array, metric='euclidean') # Calculating a distance matrix for the data
    dist_matrix1 = dist_matrix.copy() # Creating a second matrix with 'inf' values on the diagonal to eliminate zeros
    dist_matrix1[np.diag_indices_from(dist_matrix1)] = np.inf
    N = array.shape[0]
    
    # Creating a distances vector that covers the relevant distances (those that exist between data points)
    if np.min(dist_matrix1) == 0:
        r = np.logspace(-5,np.log10(np.max(dist_matrix)),1000)
    else:
        r = np.logspace(np.log10(np.min(dist_matrix1)),np.log10(np.max(dist_matrix)),1000)
    
    # Calculating the correlation integral
    logC = []
    for i in range(len(r)):
        logC.append(np.log((sum(sum(dist_matrix <= r[i]))-N)/(N*(N-1))))
    
    # Saving the calculating result ignoring senless data that's coming from empty distance bins
    infnum = sum(np.isinf(logC))
    logC = np.array(logC)
    r[0:infnum] = r[infnum]
    logC[0:infnum] = logC[infnum]
    return [np.log(r),logC]

In [None]:
# Creating all required data for the GP+CV algorithm reference curve using random values in hypercubes

# A dataframe to include all of the relevant data
df = dict()

# Performing with the GP algorithm on random hypercube data
for i in range(scaled.shape[1]):
    res = GP(np.random.rand(scaled.shape[0],i+1))
    df['logr ' + str(i+1)] = res[0]
    df['logC ' + str(i+1)] = res[1]

# Performing with the GP algorithm on the data of interest
res = GP(scaled)
df['logr data'] = res[0]
df['logC data'] = res[1]

# Saving all the data to be further analyzed using MATLAB
df = pd.DataFrame.from_dict(df)
df.to_csv('GPCV.csv',index=False)

In [None]:
# Plotting the results of our MATLAB analysis

# Main graph
logr = df['logr data']
logC = df['logC data']
x = np.linspace(np.min(logr),1.4,1000)
y = 7.095*x - 10.36

# Reference curve
ref_d = [1,2,3,4,5,6,7,8,9,10]
ref_D = [0.9833,1.974,2.897,3.683,4.575,5.22,5.659,6.43,6.987,7.893]

# Plotting the main graph
plt.figure(figsize=(8, 6))
plt.plot(logr, logC, '.', color=good_colors[0])
plt.plot(x,y,'--',color=good_colors[1])
plt.xlabel(r'$\log{\left(r\right)}$',size='18')
plt.ylabel(r'$\log{\left(C_{m}\left(r\right)\right)}$',size='18')
plt.xticks(fontsize='14')
plt.yticks(fontsize='14')

# Plotting the reference curve in a desired location
ax_inset = plt.axes([0.6, 0.2, 0.25, 0.25])  # [left, bottom, width, height]
ax_inset.plot(ref_d, ref_D, 'o-', color=good_colors[2], label='Reference Curve')
ax_inset.set_xlabel('d',size='14')
ax_inset.set_ylabel('D',size='14')

# Saving and presenting figure
plt.savefig('GPCV.png', bbox_inches='tight')
plt.show()

# Autoencoders

Implementation of autoencoders as a third method of data ID estimation which was described under section 3.3.3. The first part was used for hyperparameter tuning of both the single hidden layer and three hidden layer autoencoders. The following parts present the results for the two different autoencoders using values obtained through the optimization process.

## Hyperparameter Tuning

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features):
        self.features = features

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        return feature

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer,device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct = 0, 0
    for batch, X in enumerate(dataloader):
        X = X.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, X)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X in dataloader:
            X = X.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, X).item()
    test_loss /= num_batches
    return test_loss

In [None]:
# A PyTorch based class for a three hidden layer autoencoder with no encoding to and decoding from a lower dimension
class AE(nn.Module):
    def __init__(self):
        super().__init__()
        # Structure and activation functions
        self.hidden1 = nn.Linear(30, 30)
        self.act1 = nn.ReLU()
        self.hidden2 = nn.Linear(30, 30)
        self.act2 = nn.ReLU()
        self.hidden3 = nn.Linear(30, 30)
        self.act3 = nn.ReLU()
        self.output = nn.Linear(30, 30)
        self.act_output = nn.Identity()


    def forward(self, x):
        x = self.act1(self.hidden1(x))
        x = self.act2(self.hidden2(x))
        x = self.act3(self.hidden3(x))
        x = self.act_output(self.output(x))
        return x

In [None]:
# A PyTorch based class for a single hidden layer autoencoder with no encoding to and decoding from a lower dimension
class AE(nn.Module):
    def __init__(self):
        super().__init__()
        # Structure and activation functions
        self.hidden = nn.Linear(30, 30)
        self.act = nn.ReLU()
        self.output = nn.Linear(30, 30)


    def forward(self, x):
        x = self.act(self.hidden(x))
        x = self.output(x)
        return x

In [None]:
# Function for training over 200 epochs with an early-stopping condition over the validation loss
def train_and_evaluate(model, learning_rate, batch_size, train_dataset, val_dataset, device):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))
    best_loss = + np.inf   # init to positive infinity
    loss_fn = nn.MSELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer, device)
        test_loss = test(val_loader, model, loss_fn, device)
        if test_loss < best_loss:
              best_loss = test_loss

    return best_loss

In [None]:
# Function to read data into objects that PyTorch can handle - it is required to be in function form for
# compatibility with the optuna library
def load_data():
    # Reading
    df3 = pd.read_csv('Breast_cancer.csv')
    df4 = df3.drop(['id','diagnosis','Unnamed: 32'], axis = 1)

    # Splitting test and validation
    X_train,X_val = train_test_split(df4,test_size=0.3,random_state=42)

    # Normalizing
    for column in df4.columns:
        mean = X_train[column].mean()
        std = X_train[column].std()
        X_train[column] = (X_train[column]-mean)/std
        X_val[column] = (X_val[column]-mean)/std

    # Transforming data to a Pytorch form
    X_train = torch.tensor(X_train.values, dtype=torch.float32)
    X_val = torch.tensor(X_val.values, dtype=torch.float32)

    train_dataset = CustomDataset(X_train)
    val_dataset = CustomDataset(X_val)
    return train_dataset, val_dataset

In [None]:
# Defining the objective function used by optuna to optimize the hyperparameters
def objective(trial):
    train_data, val_data = load_data()
    
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_int('batch_size', 4, 64, log=True)
    # batch_size = trial.suggest_categorical('batch_size', [1, 2, 3, 4])

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AE().to(device)
    loss = train_and_evaluate(model, learning_rate, batch_size, train_data, val_data, device)

    return loss

In [None]:
# Performing optimization using optuna
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

best_params = study.best_params
print(best_params)

## ID Estimation

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features):
        self.features = features

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        return feature

In [None]:
# Data prep

# Reading
df3 = pd.read_csv('Breast_cancer.csv')
df4 = df3.drop(['id','diagnosis','Unnamed: 32'], axis = 1)

# Splitting test and validation
X_train,X_val = train_test_split(df4,test_size=0.3,random_state=42)

# Normalizing
for column in df4.columns:
    mean = X_train[column].mean()
    std = X_train[column].std()
    X_train[column] = (X_train[column]-mean)/std
    X_val[column] = (X_val[column]-mean)/std

# Transforming data to a Pytorch form
X_train = torch.tensor(X_train.values, dtype=torch.float32)
X_val = torch.tensor(X_val.values, dtype=torch.float32)

# batch_size = 4 # For autoencoder-3
batch_size = 3 # For autoencoder-1
train_dataset = CustomDataset(X_train)
val_dataset = CustomDataset(X_val)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct = 0, 0
    for batch, X in enumerate(dataloader):
        X = X.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, X)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, EV = 0, 0
    with torch.no_grad():
        for X in dataloader:
            X = X.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, X).item()
            EV += explained_variance_score(pred.cpu(), X.cpu()) # Explained variance calculation
    test_loss /= num_batches
    EV /= num_batches
    return test_loss, EV

### Autoencoder-3

Estimating ID using a three hidden layer autoencoder.

In [None]:
# A PyTorch based class for a three hidden layer autoencoder, with `hid1` and `hid2` controlling the level of
# encoding and decoding
class AE(nn.Module):
    def __init__(self,hid1,hid2):
        super().__init__()
        # Structure and activation functions
        self.hidden1 = nn.Linear(30, hid1)
        self.act1 = nn.ReLU()
        self.hidden2 = nn.Linear(hid1, hid2)
        self.act2 = nn.ReLU()
        self.hidden3 = nn.Linear(hid2, hid1)
        self.act3 = nn.ReLU()
        self.output = nn.Linear(hid1, 30)
        self.act_output = nn.Identity()


    def forward(self, x):
        x = self.act1(self.hidden1(x))
        x = self.act2(self.hidden2(x))
        x = self.act3(self.hidden3(x))
        x = self.act_output(self.output(x))
        return x

In [None]:
# Calculating the explained variance as a function of the narrowest layer dimension

EV = [] # A list to hold explained variance values
dim = [] # A list to hold the narrowest layer dimensions
learning_rate = 0.0014283641459297775
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))
n_epochs = 200


is_not_done = 1
hid1 = 30 # Starting with no encoding to or decoding from a lower dimension
hid2 = 30
cond1 = 1

while is_not_done:
    # Training an Autoencoder and storing the explained variance achieved
    model = AE(hid1,hid2).to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_loss = + np.inf
    best_EV = + np.inf
    for epoch in range(n_epochs):
        train(train_loader, model, loss_fn, optimizer)
        epoch_loss, EV_temp = test(val_loader, model, loss_fn)
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            best_EV = EV_temp
    dim_temp = hid2
    print('EV = ' + str(best_EV) + ', dim = ' + str(hid2))

    # When we cross the 0.95 threshold, we check if hid2+1 allows more than 0.95 explained variance
    if best_EV < 0.95 and cond1:
        cond1 = 0
        model = AE(hid1,hid2+1).to(device)
        loss_fn = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        best_loss = + np.inf
        new_EV = + np.inf
        for epoch in range(n_epochs):
            train(train_loader, model, loss_fn, optimizer)
            epoch_loss, EV_temp = test(val_loader, model, loss_fn)
            if epoch_loss < best_loss:
                best_loss = epoch_loss
                new_EV = EV_temp
        EV.append(new_EV)
        dim.append(hid2+1)
        print('EV = ' + str(EV[-1]) + ', dim = ' + str(dim[-1]))

    # When we cross the 0.90 threshold, we check if hid2+1 allows more than 0.9 explained variance
    if best_EV < 0.90:
        is_not_done = 0 # We terminate the while loop after this one last calculation
        model = AE(hid1,hid2+1).to(device)
        loss_fn = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        best_loss = + np.inf
        new_EV = + np.inf
        for epoch in range(n_epochs):
            train(train_loader, model, loss_fn, optimizer)
            epoch_loss, EV_temp = test(val_loader, model, loss_fn)
            if epoch_loss < best_loss:
                best_loss = epoch_loss
                new_EV = EV_temp
        EV.append(new_EV)
        dim.append(hid2+1)
        print('EV = ' + str(EV[-1]) + ', dim = ' + str(dim[-1]))
    
    # Saving the values from *before* the if loops so we get the values in the correct order 
    EV.append(best_EV)
    dim.append(dim_temp)
    
    # Lowering the dimension
    hid1 = hid1 - 1
    hid2 = hid2 - 2

In [None]:
# Saving the results
dict1 = dict()
dict1['EV'] = EV
dict1['dim'] = dim
df = pd.DataFrame.from_dict(dict1)
df.to_csv('Autoencoder-3.csv',index=False)

In [None]:
# Plotting the results

df = pd.read_csv('Autoencoder-3.csv') 

x = [min(df['dim']), 30]
threshold90 = [0.90,0.90]
threshold95 = [0.95,0.95]

plt.figure(figsize=(8,6))
plt.plot(df['dim'],df['EV'], 'o-', color = good_colors[0])
plt.plot(x,threshold90,'--', color='red')
plt.plot(x,threshold95,'--', color='darkred')
plt.xlabel('Narrow Layer\'s Width', fontsize = '18')
plt.xticks(fontsize = '14')
plt.ylabel('Explained Variance', fontsize = '18')
plt.yticks(fontsize = '14')
plt.grid()
plt.savefig('AE-3.png', bbox_inches='tight')
plt.show()

### Autoencoder-1

Estimating ID using a single hidden layer autoencoder.

In [None]:
# A PyTorch based class for a single hidden layer autoencoder, with `hid` controlling the level of encoding and
# decoding
class AE(nn.Module):
    def __init__(self,hid):
        super().__init__()
        # Structure and activation functions
        self.hidden = nn.Linear(30, hid)
        self.act = nn.ReLU()
        self.output = nn.Linear(hid, 30)


    def forward(self, x):
        x = self.act(self.hidden(x))
        x = self.output(x)
        return x

In [None]:
# Calculating the explained variance as a function of the narrowest layer dimension

EV = [] # A list to hold explained variance values
dim = [] # A list to hold the narrowest layer dimensions
learning_rate = 0.0020056404664158065
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))
n_epochs = 200


is_not_done = 1 # Starting with no encoding to or decoding from a lower dimension
hid = 30

while is_not_done:
    # Training an Autoencoder and storing the explained variance achieved
    model = AE(hid).to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_loss = + np.inf
    best_EV = + np.inf
    for epoch in range(n_epochs):
        train(train_loader, model, loss_fn, optimizer)
        epoch_loss, EV_temp = test(val_loader, model, loss_fn)
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            best_EV = EV_temp
    dim_temp = hid
    print('EV = ' + str(best_EV) + ', dim = ' + str(hid))

    # When we cross the 0.90 threshold, we stop the calculation
    if best_EV < 0.90:
        is_not_done = 0
        
    EV.append(best_EV)
    dim.append(dim_temp)
    # Lowering the dimension
    hid = hid - 1

In [None]:
# Saving the results
dict1 = dict()
dict1['EV'] = EV
dict1['dim'] = dim
df = pd.DataFrame.from_dict(dict1)
df.to_csv('Autoencoder-1.csv',index=False)

In [None]:
# Plotting the results

df = pd.read_csv('Autoencoder-1.csv') 

x = [min(df['dim']), 30]
threshold90 = [0.90,0.90]
threshold95 = [0.95,0.95]

plt.figure(figsize=(8,6))
plt.plot(df['dim'],df['EV'], 'o-', color = good_colors[0])
plt.plot(x,threshold90,'--', color='red')
plt.plot(x,threshold95,'--', color='darkred')
plt.xlabel('Narrow Layer\'s Width', fontsize = '18')
plt.xticks(fontsize = '14')
plt.ylabel('Explained Variance', fontsize = '18')
plt.yticks(fontsize = '14')
plt.grid()
plt.savefig('AE-1.png', bbox_inches='tight')
plt.show()

# Li et al. Method

In the following we present the implementation of the Li et al. method used for neural network ID estimation as was described in section 3.3.4 based on the original work by Li et al. from 2018 (https://arxiv.org/abs/1804.08838). The first part deals with hyperparameter tuning - including the dimensions of a fully connected network architecture that would be used as the basis for the employment of the Li et al. method. The following part deals with the ID estimation itself.

## Hyperparameter Tuning

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn,device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture. We do not use the "regular" means
# of creating a PyTorch model in order to be uniform with the manner in which we later define the class of 
# "Li models" where we found that this "manual" model construction is required for the sub-space training 
# instead of training using all model parameters
class RegModel(nn.Module):
    def __init__(self, hid1, hid2):
        super(RegModel, self).__init__()
        self.weight1 = nn.Parameter(torch.empty(hid1, 30), requires_grad = True)
        nn.init.xavier_uniform_(self.weight1)
        self.bias1 = nn.Parameter(torch.randn(hid1), requires_grad = True)
        self.weight2 = nn.Parameter(torch.empty(hid2, hid1), requires_grad = True)
        nn.init.xavier_uniform_(self.weight2)
        self.bias2 = nn.Parameter(torch.randn(hid2), requires_grad = True)
        self.weight3 = nn.Parameter(torch.empty(1, hid2), requires_grad = True)
        nn.init.xavier_uniform_(self.weight3)
        self.bias3 = nn.Parameter(torch.randn(1), requires_grad = True)

    def forward(self, x):
        x = torch.relu(torch.matmul(x, self.weight1.t()) + self.bias1)
        x = torch.relu(torch.matmul(x, self.weight2.t()) + self.bias2)
        x = torch.sigmoid(torch.matmul(x, self.weight3.t()) + self.bias3)
        return x

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, batch_size, train_dataset, val_dataset, device):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer, device)
        test_acc, loss = test(val_loader, model, loss_fn, device)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    print('Accuracy: ' + str(100*test_acc) + '%')
    return best_loss

In [None]:
# Function to read data into objects that PyTorch can handle - it is required to be in function form for
# compatibility with the optuna library
def load_data():
    # Reading
    df3 = pd.read_csv('Breast_cancer.csv')
    df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
    columns = list(df4.columns)
    columns.remove('diagnosis')

    # Encoding the labels
    class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
    df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

    # Splitting test and validation
    X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

    # Normalizing
    for column in columns:
        mean = X_train[column].mean()
        std = X_train[column].std()
        X_train[column] = (X_train[column]-mean)/std
        X_val[column] = (X_val[column]-mean)/std

    # Transforming data to a Pytorch form
    X_train = torch.tensor(X_train.values, dtype=torch.float32)
    X_val = torch.tensor(X_val.values, dtype=torch.float32)
    y_train = torch.tensor(y_train.values, dtype=torch.float32)
    y_val = torch.tensor(y_val.values, dtype=torch.float32)

    train_dataset = CustomDataset(X_train, y_train)
    val_dataset = CustomDataset(X_val, y_val)
    return train_dataset, val_dataset

In [None]:
# Defining the objective function used by optuna to optimize the hyperparameters
def objective(trial):
    train_data, val_data = load_data()
    
    learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-1, log=True)
    batch_size = trial.suggest_int('batch_size', 4, 64, log=True)
    hid1 = trial.suggest_int('hid1', 10, 100, log=True)
    hid2 = trial.suggest_int('hid2', 10, 100, log=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = RegModel(hid1,hid2).to(device)
    loss = train_and_evaluate(model, learning_rate, batch_size, train_data, val_data, device)

    return loss

In [None]:
# Performing optimization using optuna
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

best_params = study.best_params
print(best_params)

## ID Estimation

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Data prep

# Reading
df3 = pd.read_csv('Breast_cancer.csv')
df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
columns = list(df4.columns)
columns.remove('diagnosis')

# Encoding the labels
class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

# Splitting test and validation
X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

# Normalizing
for column in columns:
    mean = X_train[column].mean()
    std = X_train[column].std()
    X_train[column] = (X_train[column]-mean)/std
    X_val[column] = (X_val[column]-mean)/std

# Transforming data to a Pytorch form
X_train = torch.tensor(X_train.values, dtype=torch.float32)
X_val = torch.tensor(X_val.values, dtype=torch.float32)
y_train = torch.tensor(y_train.values, dtype=torch.float32)
y_val = torch.tensor(y_val.values, dtype=torch.float32)

batch_size = 7
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, train_loader, val_loader):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer)
        test_acc, loss = test(val_loader, model, loss_fn)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    return best_acc, best_loss

In [None]:
# A version of the previous function that matches a Li model - a model where the optimizer is only accessible 
# to the vector representing the sub-space
def train_and_evaluate_Li(model, learning_rate, train_loader, val_loader):
    optimizer = optim.SGD([model.u], lr=learning_rate)
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer)
        test_acc, loss = test(val_loader, model, loss_fn)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    return best_acc, best_loss

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture - same as the hyperparameter tuning part
class RegModel(nn.Module):
    def __init__(self):
        super(RegModel, self).__init__()
        self.weight1 = nn.Parameter(torch.empty(71, 30), requires_grad = True)
        nn.init.xavier_uniform_(self.weight1)
        self.bias1 = nn.Parameter(torch.randn(71), requires_grad = True)
        self.weight2 = nn.Parameter(torch.empty(30, 71), requires_grad = True)
        nn.init.xavier_uniform_(self.weight2)
        self.bias2 = nn.Parameter(torch.randn(30), requires_grad = True)
        self.weight3 = nn.Parameter(torch.empty(1, 30), requires_grad = True)
        nn.init.xavier_uniform_(self.weight3)
        self.bias3 = nn.Parameter(torch.randn(1), requires_grad = True)

    def forward(self, x):
        x = torch.relu(torch.matmul(x, self.weight1.t()) + self.bias1)
        x = torch.relu(torch.matmul(x, self.weight2.t()) + self.bias2)
        x = torch.sigmoid(torch.matmul(x, self.weight3.t()) + self.bias3)
        return x

In [None]:
# A PyTorch based class for a Li model - two hidden layer fully connected architecture but with additional 
# parameters: one that defines the sub-space where training takes place and 6 projection matrices that project
# the changes in the sub-space to the full architecture
class LiModel(nn.Module):
    def __init__(self, reduced_dim):
        super(LiModel, self).__init__()
        
        # The sub-space is initialized as a vector of zeros - it is important to do so since the sub-space
        # actually stands for *changes* from the network's initialization values and does not stand for the 
        # weights themselves
        self.reduced_dim = reduced_dim
        self.u = nn.Parameter(torch.zeros(reduced_dim), requires_grad = True)
        
        # The "regular" network
        self.weight1 = nn.Parameter(torch.empty(71, 30), requires_grad = False)
        nn.init.xavier_uniform_(self.weight1)
        self.bias1 = nn.Parameter(torch.randn(71), requires_grad = False)
        self.weight2 = nn.Parameter(torch.empty(30, 71), requires_grad = False)
        nn.init.xavier_uniform_(self.weight2)
        self.bias2 = nn.Parameter(torch.randn(30), requires_grad = False)
        self.weight3 = nn.Parameter(torch.empty(1, 30), requires_grad = False)
        nn.init.xavier_uniform_(self.weight3)
        self.bias3 = nn.Parameter(torch.randn(1), requires_grad = False)
        
        # Projection matrices from the sub-space to the full network
        self.P1 = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(71*30, reduced_dim)), requires_grad = False)
        self.P1B = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(71, reduced_dim)), requires_grad = False)
        self.P2 = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(30*71, reduced_dim)), requires_grad = False)
        self.P2B = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(30, reduced_dim)), requires_grad = False)
        self.P3 = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(30, reduced_dim)), requires_grad = False)
        self.P3B = nn.Parameter(torch.nn.init.orthogonal_(torch.empty(1, reduced_dim)), requires_grad = False)

    def forward(self, x):
        x = torch.relu(torch.matmul(x, self.weight1.t() + torch.matmul(self.u, self.P1.t()).view((30,-1))) + self.bias1 + torch.matmul(self.u, self.P1B.t()))
        x = torch.relu(torch.matmul(x, self.weight2.t() + torch.matmul(self.u, self.P2.t()).view((71,-1))) + self.bias2 + torch.matmul(self.u, self.P2B.t()))
        x = torch.sigmoid(torch.matmul(x, self.weight3.t() + torch.matmul(self.u, self.P3.t()).view((30,-1))) + self.bias3 + torch.matmul(self.u, self.P3B.t()))
        return x

In [None]:
# Generating results

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))

learning_rate = 0.006882672576552301
replicas = 30 # We use 30 replicas to have statistics of the results

# Training a "regular" network for defining a threshold for the Li et al. method
res = []
temp = [31*71 + 72*30 + 31]
for i in range(replicas):
    model = RegModel().to(device)
    acc, loss = train_and_evaluate(model, learning_rate, train_loader, val_loader)
    temp.append(acc)
    print("Replica " + str(i+1) + " of full model. Avg accuracy so far: " + str(100*sum(temp[1:])/len(temp[1:])) + "%.")
res.append(temp)

# Calculating the thresholds of ID definition and for stopping the computation
threshold = 0.9*sum(temp[1:])/len(temp[1:])
stopping = 0.95*sum(temp[1:])/len(temp[1:])

# Calculating the performance using the Li et al. method, incrementally enlarging the sub-space dimension
is_not_done = 1
current_parameters = 1
while is_not_done:
    temp = [current_parameters]
    for i in range(replicas):
        model = LiModel(current_parameters).to(device)
        acc, loss = train_and_evaluate_Li(model, learning_rate, train_loader, val_loader)
        temp.append(acc)
        print("Replica " + str(i+1) + " of a " + str(current_parameters) + " parameter model. Avg accuracy so far: " + str(100*sum(temp[1:])/len(temp[1:])) + "%.")
    res.append(temp)
    
    current_parameters = current_parameters + 1
    current_acc = sum(temp[1:])/len(temp[1:])
    if current_acc > stopping:
        is_not_done = 0

In [None]:
# Saving the results
df_Li = pd.DataFrame(np.array(res))
df_Li.to_csv("LiEtAl.csv")

In [None]:
# Plotting the results

df_Li = pd.read_csv('LiEtAl.csv', index_col=0)

# Calculating means and 95% confidence intervals for all sub-space sizes examined
mean = []
HW = [] # using a t-test to calculate half-width
parameters = []
for i in range(len(df_Li.iloc[1:,0])):
    parameters.append(df_Li.iloc[i+1,0])
    mean.append(np.mean(df_Li.iloc[i+1,1:]))
    
    # Half-width
    std = np.std(df_Li.iloc[i+1,1:], ddof=1)
    n = len(df_Li.iloc[i+1,1:])
    HW.append(stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n)))

x = [min(df_Li.iloc[1:,0]),max(df_Li.iloc[1:,0])]
y = [0.9*np.mean(df_Li.iloc[0,1:]),0.9*np.mean(df_Li.iloc[0,1:])]

# The actual plotting
plt.figure(figsize=(8, 6))
plt.plot(x, y, 'k--')
plt.errorbar(parameters, mean, yerr=[HW, HW], fmt='o-', capsize=4, color=good_colors[0])
plt.xlabel('Number of Tunable Parameters',size='18')
plt.ylabel('Validation Accuracy',size='18')
# plt.xscale('log')
plt.legend(['90% of Full Network Accuracy'], fontsize='14')
plt.xticks(fontsize='14')
plt.yticks(fontsize='14')
plt.grid()
plt.savefig('Breast_cancer_Lietal.png', bbox_inches="tight")
plt.show()

# Frankle & Carbin Method

In the following we present the implementation of the Frankle and Carbin method used for neural network ID estimation as was described in section 5.2 based on the original work by Frankle and Carbin from 2019 (https://arxiv.org/abs/1803.03635). The first part deals with hyperparameter tuning - including the dimensions of a fully connected network architecture that would be used as the basis for the employment of the Frankle and Carbin method. The following part deals with the ID estimation itself.

## Hyperparameter Tuning

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn,device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture
class MyModel(nn.Module):
    def __init__(self,hid1,hid2):
        super().__init__()
        self.fc1 = nn.Linear(30, hid1)
        self.fc2 = nn.Linear(hid1, hid2)
        self.fc3 = nn.Linear(hid2, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, batch_size, train_dataset, val_dataset, device):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer, device)
        test_acc, loss = test(val_loader, model, loss_fn, device)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    print('Accuracy: ' + str(100*test_acc) + '%')
    return best_loss

In [None]:
# Function to read data into objects that PyTorch can handle - it is required to be in function form for
# compatibility with the optuna library
def load_data():
    # Reading
    df3 = pd.read_csv('Breast_cancer.csv')
    df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
    columns = list(df4.columns)
    columns.remove('diagnosis')

    # Encoding the labels
    class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
    df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

    # Splitting test and validation
    X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

    # Normalizing
    for column in columns:
        mean = X_train[column].mean()
        std = X_train[column].std()
        X_train[column] = (X_train[column]-mean)/std
        X_val[column] = (X_val[column]-mean)/std

    # Transforming data to a Pytorch form
    X_train = torch.tensor(X_train.values, dtype=torch.float32)
    X_val = torch.tensor(X_val.values, dtype=torch.float32)
    y_train = torch.tensor(y_train.values, dtype=torch.float32)
    y_val = torch.tensor(y_val.values, dtype=torch.float32)

    train_dataset = CustomDataset(X_train, y_train)
    val_dataset = CustomDataset(X_val, y_val)
    return train_dataset, val_dataset

In [None]:
# Defining the objective function used by optuna to optimize the hyperparameters
def objective(trial):
    train_data, val_data = load_data()
    
    learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_int('batch_size', 4, 64, log=True)
    hid1 = trial.suggest_int('hid1', 10, 100, log=True)
    hid2 = trial.suggest_int('hid2', 10, 100, log=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = MyModel(hid1,hid2).to(device)
    loss = train_and_evaluate(model, learning_rate, batch_size, train_data, val_data, device)

    return loss

In [None]:
# Performing optimization using optuna
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

best_params = study.best_params
print(best_params)

## ID Estimation

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Data prep

# Reading
df3 = pd.read_csv('Breast_cancer.csv')
df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
columns = list(df4.columns)
columns.remove('diagnosis')

# Encoding the labels
class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

# Splitting test and validation
X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

# Normalizing
for column in columns:
    mean = X_train[column].mean()
    std = X_train[column].std()
    X_train[column] = (X_train[column]-mean)/std
    X_val[column] = (X_val[column]-mean)/std

# Transforming data to a Pytorch form
X_train = torch.tensor(X_train.values, dtype=torch.float32)
X_val = torch.tensor(X_val.values, dtype=torch.float32)
y_train = torch.tensor(y_train.values, dtype=torch.float32)
y_val = torch.tensor(y_val.values, dtype=torch.float32)

batch_size = 8
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))

In [None]:
# Function for training

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, train_loader, val_loader):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer)
        test_acc, loss = test(val_loader, model, loss_fn)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    return best_acc, best_loss

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(30, 51)
        self.fc2 = nn.Linear(51, 39)
        self.fc3 = nn.Linear(39, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
# A function that verifies the amount of weights pruned and helps to keep track on the pruning percentage
def count_zeros(model):
    return torch.sum(model.fc1.weight.data == 0.0).item() + torch.sum(model.fc1.bias.data == 0.0).item() + torch.sum(model.fc2.weight.data == 0.0).item() + torch.sum(model.fc2.bias.data == 0.0).item() + torch.sum(model.fc3.weight.data == 0.0).item() + torch.sum(model.fc3.bias.data == 0.0).item()

In [None]:
# Generating results

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))
all_weights = 31*51 + 52*39 + 40 # For calculation of pruning percentage
remaining = []
val_accuracy = []
prunings = 52 # 25 - moderate, 52 - slow, 11 - fast (how many pruning iterations should be performed)

learning_rate = 0.0037994898049290024
replicas = 30 # We use 30 replicas to have statistics of the results
remaining = np.zeros([replicas,prunings+1])
val_accuracy = np.zeros([replicas,prunings+1])

for rep in range(replicas):
    model = MyModel().to(device)
    initial_state_dict = copy.deepcopy(model.state_dict())

    for i in range(prunings+1):
        acc, loss = train_and_evaluate(model, learning_rate, train_loader, val_loader)
        remaining[rep][i] = 100*(all_weights - count_zeros(model))/all_weights
        val_accuracy[rep][i] = acc
        print('Replica: ' + str(rep+1) + '. Remaining weights are ' + str(remaining[rep][i]) + '%. Validation accuracy is ' + str(acc*100) + '%.')
        
        # Apply pruning to the specified layers, changing the pruning rates by commenting and uncommenting lines
        to_prune = [(model.fc1, 'weight', 0.1), (model.fc2, 'weight', 0.1), (model.fc3, 'weight', 0.05)] # slow
        # to_prune = [(model.fc1, 'weight', 0.2), (model.fc2, 'weight', 0.2), (model.fc3, 'weight', 0.1)] # moderate
        # to_prune = [(model.fc1, 'weight', 0.4), (model.fc2, 'weight', 0.4), (model.fc3, 'weight', 0.2)] # fast
        for layer, param_name, pruning_perc in to_prune:
            prune.l1_unstructured(layer, name=param_name, amount=pruning_perc)
        
        # Upload original parameter values
        model.fc1.weight.data = initial_state_dict['fc1.weight']
        model.fc1.bias.data = initial_state_dict['fc1.bias']
        model.fc2.weight.data = initial_state_dict['fc2.weight']
        model.fc2.bias.data = initial_state_dict['fc2.bias']
        model.fc3.weight.data = initial_state_dict['fc3.weight']
        model.fc3.bias.data = initial_state_dict['fc3.bias']

In [None]:
# Save results (change name based on pruning rate used)
df_fast = pd.DataFrame(val_accuracy)
df_fast.loc[len(df_fast.index)] = remaining[0]
df_fast.to_csv("Fast.csv")

In [None]:
# One shot pruning

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))
all_weights = 31*51 + 52*39 + 40 # For calculation of pruning percentage
remaining = []
val_accuracy = []
prunings = 11 # Number of pruning for One Shot always matched the one used for fast rate pruning

learning_rate = 0.0037994898049290024
replicas = 30 # We use 30 replicas to have statistics of the results
remaining = np.zeros([replicas,prunings+1])
val_accuracy = np.zeros([replicas,prunings+1])

for rep in range(replicas):
    model = MyModel().to(device)
    initial_state_dict = copy.deepcopy(model.state_dict())
    
    # Before pruining
    model_temp = MyModel().to(device)
    model_temp.load_state_dict(initial_state_dict)
    acc, loss = train_and_evaluate(model_temp, learning_rate, train_loader, val_loader, device)
    remaining[rep][0] = 100*(all_weights - count_zeros(model_temp))/all_weights # Should be 100, this is a sanity check
    val_accuracy[rep][0] = acc
    print('Replica: ' + str(rep+1) + '. Remaining weights are ' + str(remaining[rep][0]) + '%. Validation accuracy is ' + str(acc*100) + '%.')
    
    # Including prunings
    for i in range(prunings):
        # Training a new model
        model_temp = MyModel().to(device)
        model_temp.load_state_dict(initial_state_dict)
        acc, loss = train_and_evaluate(model_temp, learning_rate, train_loader, val_loader, device)
        
        # One Shot pruning
        mid_layer_prune = 1 - 0.6**(i+1)
        out_layer_prune = 1 - 0.8**(i+1)
        to_prune = [(model_temp.fc1, 'weight', mid_layer_prune), (model_temp.fc2, 'weight', mid_layer_prune), (model_temp.fc3, 'weight', out_layer_prune)]
        for layer, param_name, pruning_perc in to_prune:
            prune.l1_unstructured(layer, name=param_name, amount=pruning_perc)
            
        # Upload original parameter values
        model_temp.fc1.weight.data = initial_state_dict['fc1.weight']
        model_temp.fc1.bias.data = initial_state_dict['fc1.bias']
        model_temp.fc2.weight.data = initial_state_dict['fc2.weight']
        model_temp.fc2.bias.data = initial_state_dict['fc2.bias']
        model_temp.fc3.weight.data = initial_state_dict['fc3.weight']
        model_temp.fc3.bias.data = initial_state_dict['fc3.bias']
        
        # Training the post-pruning model
        acc, loss = train_and_evaluate(model_temp, learning_rate, train_loader, val_loader, device)
        remaining[rep][i+1] = 100*(all_weights - count_zeros(model_temp))/all_weights
        val_accuracy[rep][i+1] = acc
        print('Replica: ' + str(rep+1) + '. Remaining weights are ' + str(remaining[rep][i+1]) + '%. Validation accuracy is ' + str(acc*100) + '%.')

In [None]:
# Save results
df_one = pd.DataFrame(val_accuracy)
df_one.loc[len(df_one.index)] = remaining[0]
df_one.to_csv("OneShot.csv")

In [None]:
# A function used for plotting the x ticks as desired
def format_tick(x, pos):
    return "{:.2f}".format(x)

In [None]:
# Plotting

# Reading data
df_one = pd.read_csv('OneShot.csv', index_col=0)
df_slow = pd.read_csv('Slow.csv', index_col=0)
df_mod = pd.read_csv('Moderate.csv', index_col=0)
df_fast = pd.read_csv('Fast.csv', index_col=0)

all_weights = 31*51 + 52*39 + 40

# Calculating confidence intervals for all pruning options
weights_one = []
mean_one = []
std_one = []
for i in range(len(df_one.columns)):
    mean_one.append(np.mean(df_one.iloc[:30,i]))
    std_one.append(np.std(df_one.iloc[:30,i]))
    weights_one.append(df_one.iloc[df_one.index[-1],i])

LB_one = []
for i in range(len(df_one.columns)):
    # Half-width
    std = np.std(df_one.iloc[:30,i], ddof=1)
    n = len(df_one.iloc[:30,i])
    HW = stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n))
    LB_one.append(mean_one[i]-HW)
    
weights_slow = []
mean_slow = []
std_slow = []
for i in range(len(df_slow.columns)):
    mean_slow.append(np.mean(df_slow.iloc[:30,i]))
    std_slow.append(np.std(df_slow.iloc[:30,i]))
    weights_slow.append(df_slow.iloc[df_slow.index[-1],i])

LB_slow = []
for i in range(len(df_slow.columns)):
    # Half-width
    std = np.std(df_slow.iloc[:30,i], ddof=1)
    n = len(df_slow.iloc[:30,i])
    HW = stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n))
    LB_slow.append(mean_slow[i]-HW)
    
weights_mod = []
mean_mod = []
std_mod = []
for i in range(len(df_mod.columns)):
    mean_mod.append(np.mean(df_mod.iloc[:30,i]))
    std_mod.append(np.std(df_mod.iloc[:30,i]))
    weights_mod.append(df_mod.iloc[df_mod.index[-1],i])

LB_mod = []
for i in range(len(df_mod.columns)):
    # Half-width
    std = np.std(df_mod.iloc[:30,i], ddof=1)
    n = len(df_mod.iloc[:30,i])
    HW = stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n))
    LB_mod.append(mean_mod[i]-HW)

weights_fast = []
mean_fast = []
std_fast = []
for i in range(len(df_fast.columns)):
    mean_fast.append(np.mean(df_fast.iloc[:30,i]))
    std_fast.append(np.std(df_fast.iloc[:30,i]))
    weights_fast.append(df_fast.iloc[df_fast.index[-1],i])

LB_fast = []
for i in range(len(df_fast.columns)):
    # Half-width
    std = np.std(df_fast.iloc[:30,i], ddof=1)
    n = len(df_fast.iloc[:30,i])
    HW = stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n))
    LB_fast.append(mean_fast[i]-HW)

# The actual plotting
TH = 0.9*mean_slow[0]
plt.figure(figsize=(8, 6))
plt.errorbar(weights_slow, mean_slow, yerr=[std_slow, std_slow], fmt='o-', capsize=4, color=good_colors[0])
plt.errorbar(weights_mod, mean_mod, yerr=[std_mod, std_mod], fmt='o-', capsize=4, color=good_colors[1])
plt.errorbar(weights_fast, mean_fast, yerr=[std_fast, std_fast], fmt='o-', capsize=4, color=good_colors[2])
plt.errorbar(weights_one, mean_one, yerr=[std_one, std_one], fmt='o-', capsize=4, color=good_colors[3])
plt.plot([min(min(weights_slow),min(weights_mod),min(weights_fast),min(weights_one)),100],[TH,TH],'k--')
plt.xlabel('Remaining Weights [%]',size='18')
plt.ylabel('Validation Accuracy',size='18')
plt.xscale('log')
plt.legend(['90% of Full Network Accuracy','Iterative Pruining - Slow Rate', 'Iterative Pruining - Moderate Rate', 'Iterative Pruining - Fast Rate', 'One Shot Pruining'], fontsize='14')
ax = plt.gca()
ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_tick))
ax.invert_xaxis()
plt.xticks(fontsize='14')
plt.yticks(fontsize='14')
plt.grid()
plt.savefig('LTH_no_zoom.png', bbox_inches="tight")
plt.show()    

# Printing the ID estimates by comparing to the 90% threshold
for i in range(len(weights_one)):
    if LB_one[i] < TH:
        print('For One shot ID is at ' + str(weights_one[i]) + '-' + str(weights_one[i-1]) + ' with ' + str(weights_one[i]*all_weights/100) + '-' + str(weights_one[i-1]*all_weights/100) + ' parameters')
        break
for i in range(len(weights_fast)):
    if LB_fast[i] < TH:
        print('For fast ID is at ' + str(weights_fast[i]) + '-' + str(weights_fast[i-1]) + ' with ' + str(weights_fast[i]*all_weights/100) + '-' + str(weights_fast[i-1]*all_weights/100) + ' parameters')
        break
for i in range(len(weights_mod)):
    if LB_mod[i] < TH:
        print('For mod ID is at ' + str(weights_mod[i]) + '-' + str(weights_mod[i-1]) + ' with ' + str(weights_mod[i]*all_weights/100) + '-' + str(weights_mod[i-1]*all_weights/100) + ' parameters')
        break
for i in range(len(weights_slow)):
    if LB_slow[i] < TH:
        print('For slow ID is at ' + str(weights_slow[i]) + '-' + str(weights_slow[i-1]) + ' with ' + str(weights_slow[i]*all_weights/100) + '-' + str(weights_slow[i-1]*all_weights/100) + ' parameters')
        break

# Comparison to Fully Connected models

In the following we simple fully connected models to compare to the resulting models from emplying the Li et al. and Frankle and Carbin methods, as described in section 5.1. Per usual, the first part presents the hyperparameter tuning and the following part the results acquisition.

In [None]:
# Defining the dimensions of the fully connected model
hid1 = 4
hid2 = 3

## Hyperparameter Tuning

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn,device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, batch_size, train_dataset, val_dataset, device):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer, device)
        test_acc, loss = test(val_loader, model, loss_fn, device)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    print('Accuracy: ' + str(100*test_acc) + '%')
    return best_loss

In [None]:
# Function to read data into objects that PyTorch can handle - it is required to be in function form for
# compatibility with the optuna library
def load_data():
    # Reading
    df3 = pd.read_csv('Breast_cancer.csv')
    df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
    columns = list(df4.columns)
    columns.remove('diagnosis')

    # Encoding the labels
    class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
    df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

    # Splitting test and validation
    X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

    # Normalizing
    for column in columns:
        mean = X_train[column].mean()
        std = X_train[column].std()
        X_train[column] = (X_train[column]-mean)/std
        X_val[column] = (X_val[column]-mean)/std

    # Transforming data to a Pytorch form
    X_train = torch.tensor(X_train.values, dtype=torch.float32)
    X_val = torch.tensor(X_val.values, dtype=torch.float32)
    y_train = torch.tensor(y_train.values, dtype=torch.float32)
    y_val = torch.tensor(y_val.values, dtype=torch.float32)

    train_dataset = CustomDataset(X_train, y_train)
    val_dataset = CustomDataset(X_val, y_val)
    return train_dataset, val_dataset

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(30, hid1)
        self.fc2 = nn.Linear(hid1, hid2)
        self.fc3 = nn.Linear(hid2, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
# Defining the objective function used by optuna to optimize the hyperparameters
def objective(trial):
    train_data, val_data = load_data()
    
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_int('batch_size', 4, 64, log=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = MyModel().to(device)
    loss = train_and_evaluate(model, learning_rate, batch_size, train_data, val_data, device)

    return loss

In [None]:
# Performing optimization using optuna
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

best_params = study.best_params
print(best_params)

## Results

In [None]:
# Defining a class that assists with feeding data into PyTorch
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        feature = self.features[index]
        label = self.labels[index]
        return feature, label

In [None]:
# Data prep

# Reading
df3 = pd.read_csv('Breast_cancer.csv')
df4 = df3.drop(['id','Unnamed: 32'], axis = 1)
columns = list(df4.columns)
columns.remove('diagnosis')

# Encoding the labels
class_to_idx = {class_label: idx for idx, class_label in enumerate(df4['diagnosis'].unique())}
df4['label_idx'] = df4['diagnosis'].map(class_to_idx)

# Splitting test and validation
X_train, X_val, y_train, y_val = train_test_split(df4[columns],df4['label_idx'],test_size=0.3,random_state=42)

# Normalizing
for column in columns:
    mean = X_train[column].mean()
    std = X_train[column].std()
    X_train[column] = (X_train[column]-mean)/std
    X_val[column] = (X_val[column]-mean)/std

# Transforming data to a Pytorch form
X_train = torch.tensor(X_train.values, dtype=torch.float32)
X_val = torch.tensor(X_val.values, dtype=torch.float32)
y_train = torch.tensor(y_train.values, dtype=torch.float32)
y_val = torch.tensor(y_val.values, dtype=torch.float32)

batch_size = 7 
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))

In [None]:
# Function for training
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

In [None]:
# Function for testing
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.view(-1, 1))
            pred = pred.round()
            
            correct += (pred == y.view(-1, 1)).sum().item()
            test_loss += loss.item()
    correct /= size
    test_loss /= num_batches
    return correct, test_loss

In [None]:
# Function for 200 epoch training and evaluation with an early stopping condition of minimum validation loss
def train_and_evaluate(model, learning_rate, train_loader, val_loader):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_acc = - np.inf   # init to negative infinity
    best_loss = + np.inf # init to positive infinity
    loss_fn = nn.BCELoss()
    for epoch in range(200):
        train(train_loader, model, loss_fn, optimizer)
        test_acc, loss = test(val_loader, model, loss_fn)
        if loss < best_loss:
            best_loss = loss
            best_acc = test_acc
    return best_acc, best_loss

In [None]:
# A PyTorch based class for a two hidden layer fully connected architecture
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(30, hid1)
        self.fc2 = nn.Linear(hid1, hid2)
        self.fc3 = nn.Linear(hid2, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
# Generating results

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(device))

learning_rate = 0.005379133453785
replicas = 30 # We use 30 replicas to have statistics of the results
res = []

for rep in range(replicas):
    model = MyModel().to(device)
    acc, loss = train_and_evaluate(model, learning_rate, train_loader, val_loader)
    res.append(acc)
    print('Replica: ' + str(rep+1) + '. Validation accuracy is ' + str(acc*100) + '%.')

In [None]:
# Saving the results and printing the 95% confidence interval of the accuracy
df = pd.DataFrame(res)
df.to_csv("FC_143_weights.csv")
print(np.mean(res))
std = np.std(res, ddof=1)
n = len(res)
print(stats.t.ppf(1 - (1 - 0.95) / 2, n-1) * (std / np.sqrt(n)))