In [222]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
import torch
import torch.nn as nn
import torch
from torch import nn
from itertools import product
from monk_helpers import CV,SEED
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, classification_report
import time




Seed the random number generators so that results are reproducible

In [223]:
# Seed PyTorch's global RNG and NumPy's legacy global RNG with the shared SEED.
torch.manual_seed(SEED)
np.random.seed(SEED)
# Dedicated generator; it is handed to the SubsetRandomSampler instances created further down.
gen = torch.Generator().manual_seed(SEED)
# Seeds 40..45, passed as the `seeds` argument of the grid-search / refit calls.
SEEDS = list(range(40,46))


In [224]:
# MONK-1 dataset paths (train / test)
TR_PATH_1 = "./monks/datasets/monks-1.train"
TS_PATH_1 = "./monks/datasets/monks-1.test"
# MONK-2 dataset paths (train / test)
TR_PATH_2 = "./monks/datasets/monks-2.train"
TS_PATH_2 = "./monks/datasets/monks-2.test"
# MONK-3 dataset paths (train / test)
TR_PATH_3 = "./monks/datasets/monks-3.train"
TS_PATH_3 = "./monks/datasets/monks-3.test"

In [225]:

def plot_graph(train_losses,losses,epochs,title):
    """
    Draw the training loss curve together with a second loss curve.

    train_losses: per-epoch training losses (solid line).
    losses: per-epoch losses of the second curve (dashed line).
    epochs: number of points on the x axis; both sequences must have this length.
    title: name of the second curve, used in its legend entry and in the figure title.
    """
    epoch_axis = list(range(epochs))

    # (values, legend label, line style) for each curve, in drawing order.
    curves = (
        (train_losses, ' Training', '-'),
        (losses, title+' MSE', '--'),
    )
    for values, legend_label, line_style in curves:
        plt.plot(epoch_axis, values, label=legend_label, linestyle=line_style)

    plt.title('Training and '+title+' Losses Across Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('MSE')
    plt.legend()
    plt.show()

In [226]:

def plot_graph_accuracy(train_accuracies, accuracies, epochs):
    """
    Print the last training accuracy and plot training vs. test accuracy per epoch.

    train_accuracies: per-epoch training accuracies (solid line).
    accuracies: per-epoch test accuracies (dashed line).
    epochs: number of points on the x axis; both sequences must have this length.
    """
    epoch_axis = list(range(epochs))

    print("TRAIN ACCURACY:", train_accuracies[-1])

    # (values, legend label, line style) for each curve, in drawing order.
    curves = (
        (train_accuracies, 'Training Accuracy', '-'),
        (accuracies, 'Test Accuracy', '--'),
    )
    for values, legend_label, line_style in curves:
        plt.plot(epoch_axis, values, label=legend_label, linestyle=line_style)

    plt.title('Training and Test Accuracies Across Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

In [227]:
class Net(nn.Module):
  """
  Feed-forward network with one tanh hidden layer and a sigmoid output layer.

  input_size: number of input features.
  units: number of hidden units (also stored as `self.units`).
  output_size: number of output units.
  """

  def __init__(self, input_size, units, output_size):
    super().__init__()
    self.units = units

    # Hidden layer first, output layer second.
    self.l1 = nn.Linear(in_features=input_size, out_features=units)
    self.l2 = nn.Linear(in_features=units, out_features=output_size)

  def forward(self, x):
    """Return sigmoid(l2(tanh(l1(x))))."""
    hidden = self.l1(x).tanh()
    return self.l2(hidden).sigmoid()


def reset_weights(net):
  """
  Re-draw every parameter of `net` in place from a uniform distribution on [-0.7, 0.7].

  All tensors yielded by `net.parameters()` are re-initialised, weights and biases alike.
  NOTE(review): when used as `module.apply(reset_weights)` this runs once per
  sub-module and once for the module itself, so parameters of nested layers are
  presumably drawn more than once - harmless, but it consumes extra random numbers.
  """
  low, high = -0.7, 0.7
  for tensor in net.parameters():
    nn.init.uniform_(tensor, a=low, b=high)

In [228]:
def read_ds(path):
  """
  Parse a whitespace-separated data file with the columns
  class, a1..a6, id and return it as a single 2-D float array.

  Column 0 of the result is the class label; the remaining columns are the
  one-hot encoding (pd.get_dummies) of attributes a1..a6. The id column is dropped.
  The one-hot columns are derived from the values present in this file only.
  """
  names = ['class', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'id']
  # sep=r"\s+" is the supported spelling of the deprecated delim_whitespace=True.
  data = pd.read_csv(path, dtype=object, sep=r"\s+", header=None, skipinitialspace=True, names=names)

  # Every attribute is read as a string (dtype=object), so get_dummies encodes all of them.
  attributes = data.drop(['class', 'id'], axis=1)
  X = pd.get_dummies(attributes).astype(float).to_numpy()
  y = data[['class']].astype(float).to_numpy()

  return np.concatenate((y, X), axis=1)

In [229]:
class EarlyStopper:
  """
  Patience-based early stopping on a monitored loss.

  epochs_to_wait: number of consecutive non-improving checks that triggers a stop.
  min_delta: a loss only counts as an improvement if it is not greater than
             (best loss so far - min_delta).
  """

  def __init__(self, epochs_to_wait=1, min_delta=0):
    self.epochs_to_wait = epochs_to_wait
    self.min_delta = min_delta
    self.min_training_loss = np.inf  # best (lowest) loss seen so far
    self.counter = 0                 # consecutive checks without improvement

  def check_early_stop(self, training_loss):
    """
    Record `training_loss` and return True when training should stop.

    On a stop signal the best loss is left untouched; otherwise it is
    lowered whenever the new loss is strictly smaller.
    """
    stalled = training_loss > (self.min_training_loss - self.min_delta)
    if not stalled:
      self.counter = 0
    else:
      self.counter += 1
      if self.counter >= self.epochs_to_wait:
        return True

    if training_loss < self.min_training_loss:
      self.min_training_loss = training_loss
    return False

In [230]:
def Padding(validation_losses_fold,train_losses_fold):
    """
    Pad every per-fold loss history, in place, by repeating its last value.

    The target length is the longest validation history; training histories
    are padded to that same length. Prints the number of validation histories
    and returns the two (mutated) input lists.
    """
    max_epochs = max(map(len, validation_losses_fold))

    # Validation histories first, then training histories, as in a single pass each.
    for histories in (validation_losses_fold, train_losses_fold):
        for history in histories:
            missing = max_epochs - len(history)
            if missing > 0:
                history.extend([history[-1]] * missing)

    print(len(validation_losses_fold))
    return validation_losses_fold,train_losses_fold

In [231]:
def Mean(validation_avg_loss_fold,train_losses_fold,n_folds):
    """
    Average the per-fold loss histories epoch by epoch.

    validation_avg_loss_fold: list of per-fold validation loss histories.
    train_losses_fold: list of per-fold training loss histories.
    n_folds: divisor used for every epoch's sum.

    The number of epochs is the length of the longest validation history, so
    every history must have at least that many entries.
    Returns (validation means, training means) as two lists.
    """
    max_epochs = max(map(len, validation_avg_loss_fold))

    def epoch_means(histories):
        # Plain left-to-right accumulation keeps the floating-point result stable.
        means = []
        for epoch in range(max_epochs):
            total = 0
            for history in histories:
                total += history[epoch]
            means.append(total/n_folds)
        return means

    validation_avg_loss = epoch_means(validation_avg_loss_fold)
    train_avg_loss = epoch_means(train_losses_fold)
    return validation_avg_loss,train_avg_loss
    

    

In [232]:
def _count_correct(outputs, targets):
    """Count samples whose output, thresholded at 0.5, equals the target label."""
    # ~(x < 0.5) mirrors the rule "0 if x < 0.5 else 1" exactly.
    predictions = (~(outputs < 0.5)).to(targets.dtype)
    return (predictions == targets).sum().item()


def fit_model(input_size,hidden_size,output_size,learning_rate,momentum,weight_decay,opt,epochs,trainloader,loss_function,testloader):
    """
    Train a freshly initialised Net and track loss/accuracy on both loaders.

    input_size, hidden_size, output_size: layer sizes handed to Net.
    learning_rate, momentum, weight_decay: keyword arguments for the optimizer.
    opt: optimizer class, called as opt(params, lr=..., momentum=..., weight_decay=...).
    epochs: maximum number of epochs (must be >= 1).
    trainloader, testloader: DataLoaders yielding 2-D batches whose column 0 is
        the target and whose remaining columns are the inputs.
    loss_function: callable loss(outputs, targets) returning a scalar tensor.

    Each epoch runs one optimisation pass over trainloader, then evaluates on
    testloader without gradients. Training stops early when the test loss has
    not improved for 40 consecutive epochs (min_delta=1e-5). Loss and accuracy
    curves are plotted at the end.

    NOTE(review): early stopping is driven by the loss on `testloader`; if that
    loader holds the final test set, the stopping point is tuned on test data.

    Returns (network, last training loss, last test loss).
    Raises ValueError if epochs < 1.
    """
    if epochs < 1:
        raise ValueError("epochs must be at least 1")

    # Init the neural network with random weights
    network = Net(input_size, hidden_size, output_size)
    network.apply(reset_weights)

    optimizer = opt(network.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

    n_train = len(trainloader.sampler)
    n_test = len(testloader.sampler)

    train_losses = []
    test_losses = []
    epoch_train_accuracies = []
    epoch_test_accuracies = []

    early_stopper = EarlyStopper(epochs_to_wait=40, min_delta=1e-5)

    for epoch in range(0, epochs):

        print(f'Starting epoch {epoch+1}')

        # ---- training pass ----
        train_loss = 0.0
        train_correct = 0.0
        for data in trainloader:
            inputs = data[:, 1:].to(torch.float32)
            targets = data[:, [0]].to(torch.float32)

            optimizer.zero_grad()
            outputs = network(inputs)
            loss = loss_function(outputs, targets)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch value is a per-sample mean.
            train_loss += loss.item() * inputs.size(0)
            train_correct += _count_correct(outputs, targets)

        train_losses.append(train_loss / n_train)
        epoch_train_accuracies.append(train_correct / n_train)

        # ---- evaluation pass ----
        test_loss = 0.0
        test_correct = 0.0
        with torch.no_grad():
            for data in testloader:
                inputs = data[:, 1:].to(torch.float32)
                targets = data[:, [0]].to(torch.float32)

                outputs = network(inputs)
                loss = loss_function(outputs, targets)

                test_loss += loss.item() * inputs.size(0)
                test_correct += _count_correct(outputs, targets)

        epoch_test_loss = test_loss / n_test
        test_losses.append(epoch_test_loss)
        epoch_test_accuracies.append(test_correct / n_test)

        print(epoch_test_loss)
        if early_stopper.check_early_stop(epoch_test_loss):
            print("Early stopping: ",epoch)
            break

    # One entry is recorded per executed epoch, so the history length is the x-axis length.
    epochs_run = len(train_losses)
    plot_graph(train_losses,test_losses,epochs_run,"test")
    plot_graph_accuracy(epoch_train_accuracies,epoch_test_accuracies,epochs_run)

    return network,train_losses[-1],test_losses[-1]


In [233]:
def k_fold_model(input_size,hidden_size,output_size,learning_rate,momentum,weight_decay,opt,epochs,trainloader,loss_function,validationloader):
    """
    Train a freshly initialised Net on one cross-validation fold.

    input_size, hidden_size, output_size: layer sizes handed to Net.
    learning_rate, momentum, weight_decay: keyword arguments for the optimizer.
    opt: optimizer class, called as opt(params, lr=..., momentum=..., weight_decay=...).
    epochs: maximum number of epochs (must be >= 1).
    trainloader, validationloader: DataLoaders yielding 2-D batches whose
        column 0 is the target and whose remaining columns are the inputs.
    loss_function: callable loss(outputs, targets) returning a scalar tensor.

    Training stops early when the validation loss has not improved for 40
    consecutive epochs (min_delta=1e-5).

    Returns (last validation loss, last training loss,
             per-epoch validation losses, per-epoch training losses).
    Raises ValueError if epochs < 1.
    """
    if epochs < 1:
        raise ValueError("epochs must be at least 1")

    # Init the neural network with random weights
    network = Net(input_size, hidden_size, output_size)
    network.apply(reset_weights)

    # Honour the optimizer class chosen by the caller instead of always using SGD.
    optimizer = opt(network.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)

    early_stopper = EarlyStopper(epochs_to_wait=40, min_delta=1e-5)

    n_train = len(trainloader.sampler)
    n_valid = len(validationloader.sampler)

    train_losses = []
    validation_losses = []

    for epoch in range(0, epochs):

        # ---- training pass ----
        train_loss = 0.0
        for data in trainloader:
            inputs = data[:, 1:].to(torch.float32)
            targets = data[:, [0]].to(torch.float32)

            optimizer.zero_grad()
            outputs = network(inputs)
            loss = loss_function(outputs, targets)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch value is a per-sample mean.
            train_loss += loss.item() * inputs.size(0)

        avg_train_loss = train_loss / n_train
        train_losses.append(avg_train_loss)

        # ---- validation pass ----
        valid_loss = 0.0
        with torch.no_grad():
            for data in validationloader:
                inputs = data[:, 1:].to(torch.float32)
                targets = data[:, [0]].to(torch.float32)

                outputs = network(inputs)
                loss = loss_function(outputs, targets)

                valid_loss += loss.item() * inputs.size(0)

        # Used by the caller to compare hyperparameter configurations.
        avg_valid_loss = valid_loss / n_valid
        validation_losses.append(avg_valid_loss)

        if early_stopper.check_early_stop(avg_valid_loss):
            print("Early stopping:", epoch)
            break

    return avg_valid_loss,avg_train_loss,validation_losses,train_losses


In [234]:
def execute_folds(kfold,dataset,batch_size,input_size, hidden_size, output_size, learning_rate, epochs,
    loss_function, momentum, opt, weight_decay):
    """
    Evaluate one hyperparameter configuration with k-fold cross-validation.

    kfold: splitter exposing split(X, y); it is given the labels in column 0
        of `dataset` as y.
    dataset: 2-D array whose column 0 is the target and whose remaining
        columns are the inputs.
    batch_size: batch size for both loaders of every fold.
    Remaining arguments are forwarded unchanged to k_fold_model.

    For every fold a model is trained via k_fold_model. The per-fold loss
    histories are padded to a common length, averaged per epoch and plotted.

    Returns (mean of the folds' last validation losses,
             mean of the folds' last training losses,
             dict describing the configuration).
    Raises ValueError if the splitter yields no folds.
    """
    current_config = {
        'input_size': input_size,
        'hidden_size': hidden_size,
        'output_size': output_size,
        'learning_rate': learning_rate,
        'epochs': epochs,
        'momentum': momentum,
        'opt': opt,
        'weight_decay': weight_decay,
        "batch_size":batch_size,
        "loss_function":loss_function
    }

    validation_loss_sum = 0
    train_loss_sum = 0
    validation_losses_fold = []
    train_losses_fold = []

    for train_ids, val_ids in kfold.split(np.zeros(len(dataset)),dataset[:, 0]):

        # Sample elements randomly from a given list of ids, no replacement.
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids, gen)
        validation_subsampler = torch.utils.data.SubsetRandomSampler(val_ids, gen)

        trainloader = torch.utils.data.DataLoader(
                        dataset,
                        batch_size=batch_size, sampler=train_subsampler)
        validationloader = torch.utils.data.DataLoader(
                        dataset,
                        batch_size=batch_size, sampler=validation_subsampler)

        validation_loss, train_loss, validation_losses, train_losses = k_fold_model(
            input_size=input_size, hidden_size=hidden_size, output_size=output_size,
            learning_rate=learning_rate, momentum=momentum, weight_decay=weight_decay,
            opt=opt, epochs=epochs, trainloader=trainloader,
            loss_function=loss_function, validationloader=validationloader)

        validation_loss_sum += validation_loss
        train_loss_sum += train_loss
        validation_losses_fold.append(validation_losses)
        train_losses_fold.append(train_losses)

    n_folds = len(validation_losses_fold)
    if n_folds == 0:
        raise ValueError("kfold.split produced no folds")

    # Validation and train average over all folds
    validation_avg_loss_fold = validation_loss_sum / n_folds
    train_avg_loss_fold = train_loss_sum / n_folds

    validation_losses_fold,train_losses_fold = Padding(validation_losses_fold,train_losses_fold)

    validation_losses_mean, train_losses_mean = Mean(validation_losses_fold, train_losses_fold,n_folds=n_folds)

    # Take the x-axis length from the averaged curve itself rather than from
    # whichever fold happened to run last.
    plot_graph(train_losses_mean,validation_losses_mean,len(validation_losses_mean),"validation")

    return (validation_avg_loss_fold,train_avg_loss_fold,current_config)

Function that runs the k-fold cross-validation for every combination of hyperparameters in the grid and returns the best one

In [235]:
def dogridsearch(dataset_train_part,params_grid,output_size,input_size,seeds):
    """
    Exhaustive grid search with k-fold cross-validation.

    dataset_train_part: training data handed to execute_folds.
    params_grid: dict with the keys "epochs", "optimizer", "hidden_size",
        "learning_rate", "batch_size", "weight_decay" and "momentum", each
        mapping to a list of candidate values.
    output_size, input_size: network dimensions.
    seeds: every configuration is evaluated once per seed; torch's global RNG
        is re-seeded before each evaluation.

    Every (configuration, seed) evaluation is one candidate; the candidate with
    the lowest mean validation loss wins (first one on ties). The printed
    mean/std figures are computed over all candidates.

    Returns a tuple
        (validation loss, training loss, learning rate, epochs, loss function,
         hidden size, momentum, optimizer, weight decay, batch size)
    or None when no candidate was evaluated.
    """
    dataset = dataset_train_part
    loss_function = nn.MSELoss()

    # Cross-validation splitter shared by all configurations
    kfold = CV

    # Grid axes in the order they are combined by product().
    grid_keys = ("epochs", "optimizer", "hidden_size", "learning_rate",
                 "batch_size", "weight_decay", "momentum")

    total_iterations = 1
    for key in grid_keys:
        total_iterations *= len(params_grid[key])

    configurations = []
    grid_axes = [params_grid[key] for key in grid_keys]

    for actual_it, combo in enumerate(product(*grid_axes)):
        epochs, opt, hidden_size, learning_rate, batch_size, weight_decay, momentum = combo

        # Progress through the grid, as a percentage
        print(f'Actual iter {(actual_it/total_iterations)*100}%')

        for seed in seeds:
            print("Working with seed:",seed)
            torch.manual_seed(seed)
            validation_avg_loss_fold, train_avg_loss_fold, current_config = execute_folds(
                kfold=kfold, dataset=dataset, batch_size=batch_size,
                input_size=input_size, hidden_size=hidden_size, output_size=output_size,
                learning_rate=learning_rate, epochs=epochs, loss_function=loss_function,
                momentum=momentum, opt=opt, weight_decay=weight_decay)
            configurations.append((validation_avg_loss_fold, train_avg_loss_fold, current_config))

    val_mse = [candidate[0] for candidate in configurations]
    train_mse = [candidate[1] for candidate in configurations]

    best_params = None
    if configurations:
        # min() keeps the first candidate among equals, like a strict "<" scan.
        best_val, best_train, best_cfg = min(configurations, key=lambda candidate: candidate[0])
        best_params = (
            best_val,
            best_train,
            best_cfg['learning_rate'],
            best_cfg['epochs'],
            best_cfg["loss_function"],
            best_cfg['hidden_size'],
            best_cfg['momentum'],
            best_cfg['opt'],
            best_cfg['weight_decay'],
            best_cfg['batch_size'],
        )

    print("TRAIN MEAN MSE",np.mean(train_mse))
    print("TRAIN STD",np.std(train_mse))
    print("VALIDATION MEAN MSE",np.mean(val_mse))
    print("VALIDATION STD",np.std(val_mse))

    return best_params

Retrain the model with the best hyperparameters, evaluate it on the test set, and plot the training/test error

In [236]:
def train_test_model(dataset_train_part,dataset_test_part,best_params,seeds):
    """
    Refit the selected configuration once per seed and report train/test error.

    dataset_train_part, dataset_test_part: 2-D arrays whose column 0 is the
        target and whose remaining columns are the inputs.
    best_params: tuple laid out as
        (val loss, train loss, learning rate, epochs, loss function,
         hidden size, momentum, optimizer, weight decay, batch size).
    seeds: non-empty iterable of seeds; torch's global RNG is re-seeded
        before each fit.

    Prints the fit time per seed and the mean/std of the final training and
    test losses over all seeds.

    Returns the network fitted with the last seed.
    Raises ValueError if `seeds` is empty.
    """
    seeds = list(seeds)
    if not seeds:
        raise ValueError("seeds must contain at least one seed")

    # Name the fields once instead of indexing the tuple at every use.
    (learning_rate, epochs, loss_function, hidden_size,
     momentum, opt, weight_decay, batch_size) = best_params[2:10]

    # Every column except the label column is an input feature.
    input_size = np.shape(dataset_train_part)[1] - 1

    train_subsampler = torch.utils.data.SubsetRandomSampler(range(len(dataset_train_part)), gen)
    trainloader = torch.utils.data.DataLoader(
                        dataset_train_part,
                        batch_size=batch_size, sampler=train_subsampler)

    test_subsampler = torch.utils.data.SubsetRandomSampler(range(len(dataset_test_part)), gen)
    testloader = torch.utils.data.DataLoader(
                        dataset_test_part,
                        batch_size=batch_size, sampler=test_subsampler)

    train_error_seed = []
    test_error_seed = []
    for seed in seeds:
        start = time.time()
        print("Working with seed:",seed)
        torch.manual_seed(seed)
        best_net, train_error, test_error = fit_model(
            input_size=input_size, hidden_size=hidden_size, output_size=1,
            learning_rate=learning_rate, momentum=momentum, weight_decay=weight_decay,
            opt=opt, epochs=epochs, trainloader=trainloader,
            loss_function=loss_function, testloader=testloader)
        train_error_seed.append(train_error)
        test_error_seed.append(test_error)
        end = time.time()
        print("Refit Time: {:.2f} seconds".format(end - start))

    print("TRAIN MEAN MSE",np.mean(train_error_seed))
    print("TRAIN STD",np.std(train_error_seed))
    print("TEST MEAN MSE",np.mean(test_error_seed))
    print("TEST STD",np.std(test_error_seed))

    return best_net

In [None]:
# Network dimensions handed to the grid search.
# NOTE(review): 17 is presumably the number of one-hot columns read_ds produces for a1..a6 - confirm.
input_size = 17  
output_size = 1

# The triple-quoted string below is a bare expression (never assigned or used);
# it appears to keep earlier, wider search grids for reference.
# NOTE(review): 0.08 in the "precise" grid may have been meant as 0.8 - confirm.
'''params_grid_wide = {
    "hidden_size": [3, 4, 5],
    "learning_rate": [0.1,0.4,0.7,0.8],
    "batch_size": [4,8,12,32],
    "weight_decay": [0],
    "momentum": [0,0.1,0.4,0.7,0.8],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}


params_grid_precise = {
    "hidden_size": [4,5],
    "learning_rate": [0.7, 0.08, 0.9],
    "batch_size": [4],
    "weight_decay": [0],
    "momentum": [0.4,0.5,0.6],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}
'''


# Grid actually searched: one value per hyperparameter, i.e. a single configuration.
params_grid = {
    "hidden_size": [4],
    "learning_rate": [0.8],
    "batch_size": [4],
    "weight_decay": [0],
    "momentum": [0.5],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],

}
    

# MONK-1 train/test arrays as returned by read_ds.
dataset_train_part = read_ds(TR_PATH_1)
dataset_test_part = read_ds(TS_PATH_1)

# Alias of the training array; not referenced by the call below.
dataset = dataset_train_part



# Cross-validated search on the training split, repeated for every seed in SEEDS.
best_params = dogridsearch(dataset_train_part=dataset_train_part,params_grid=params_grid,output_size=output_size,input_size=input_size,seeds = SEEDS)
        




In [None]:
# Report the selected hyperparameters. best_params is the tuple built by dogridsearch:
# (val loss, train loss, learning rate, epochs, loss function,
#  hidden size, momentum, optimizer, weight decay, batch size).
print(f"Best hidden size: {best_params[5]} \nBest learning rate: {best_params[2]} \nBest batch size: {best_params[9]} \nBest weight decay: {best_params[8]} \nBest momentum: {best_params[6]}")

# Full tuple, including the validation/training losses of the selected run.
print(best_params)

In [None]:
# Refit on the full MONK-1 training set once per seed in SEEDS; best_net is the network from the last seed.
best_net = train_test_model(dataset_train_part=dataset_train_part,dataset_test_part=dataset_test_part,best_params=best_params,seeds=SEEDS)  



In [None]:
# Column 0 of the test array is the label, the remaining columns are the inputs.
test_data = torch.from_numpy(dataset_test_part[:, 1:]).to(torch.float32)
val_labels = torch.from_numpy(dataset_test_part[:, [0]]).to(torch.float32)

# One forward pass, without autograd bookkeeping, rounded to hard 0/1 predictions.
with torch.no_grad():
    y_pred = best_net(test_data).round().int().view(-1)
# Same predictions under the name used for the confusion matrix.
test_outputs = y_pred

#Print accuracy on test set
print("accuracy on test set {:.3f}".format(accuracy_score( val_labels,y_pred)))
print(classification_report(val_labels, 
                            y_pred, 
                            target_names=['0', '1']))

#print the confusion matrix
cm = confusion_matrix(val_labels, test_outputs)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0,1])
disp.plot()
plt.show()
print("------------------------------------------------------------------------------------------------------")


MONK 2

In [None]:
# Network dimensions handed to the grid search.
# NOTE(review): 17 is presumably the number of one-hot columns read_ds produces for a1..a6 - confirm.
input_size = 17  
output_size = 1

# The triple-quoted string below is a bare expression (never assigned or used);
# it appears to keep earlier, wider search grids for reference.
# NOTE(review): 0.08 in the "precise" grid may have been meant as 0.8 - confirm.
'''params_grid_wide = {
    "hidden_size": [3, 4, 5],
    "learning_rate": [0.1,0.4,0.7,0.8],
    "batch_size": [4,8,12,32],
    "weight_decay": [0],
    "momentum": [0,0.1,0.4,0.7,0.8],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}


params_grid_precise = {
    "hidden_size": [4,5],
    "learning_rate": [0.7, 0.08, 0.9],
    "batch_size": [4],
    "weight_decay": [0],
    "momentum": [0.4,0.5,0.6],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}
'''

# Grid actually searched: one value per hyperparameter, i.e. a single configuration.
params_grid = {
    "hidden_size": [4],
    "learning_rate": [0.8],
    "batch_size": [4],
    "weight_decay": [0],
    "momentum": [0.5],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],


}
    

# MONK-2 train/test arrays as returned by read_ds.
dataset_train_part = read_ds(TR_PATH_2)
dataset_test_part = read_ds(TS_PATH_2)

# Alias of the training array; not referenced by the call below.
dataset = dataset_train_part

# Cross-validated search on the training split with the single seed SEED.
# NOTE(review): the MONK-1 search passes SEEDS here - confirm the difference is intended.
best_params = dogridsearch(dataset_train_part=dataset_train_part,params_grid=params_grid,output_size=output_size,input_size=input_size,seeds = [SEED])
        




In [None]:
# Report the selected hyperparameters. best_params is the tuple built by dogridsearch:
# (val loss, train loss, learning rate, epochs, loss function,
#  hidden size, momentum, optimizer, weight decay, batch size).
print(f"Best hidden size: {best_params[5]} \nBest learning rate: {best_params[2]} \nBest batch size: {best_params[9]} \nBest weight decay: {best_params[8]} \nBest momentum: {best_params[6]}")

# Full tuple, including the validation/training losses of the selected run.
print(best_params)

In [None]:
# Refit on the full MONK-2 training set once per seed in SEEDS; best_net is the network from the last seed.
best_net = train_test_model(dataset_train_part=dataset_train_part,dataset_test_part=dataset_test_part,best_params=best_params,seeds=SEEDS)  


In [None]:
# Column 0 of the test array is the label, the remaining columns are the inputs.
test_data = torch.from_numpy(dataset_test_part[:, 1:]).to(torch.float32)
val_labels = torch.from_numpy(dataset_test_part[:, [0]]).to(torch.float32)

# One forward pass, without autograd bookkeeping, rounded to hard 0/1 predictions.
with torch.no_grad():
    y_pred = best_net(test_data).round().int().view(-1)
# Same predictions under the name used for the confusion matrix.
test_outputs = y_pred

#Print accuracy on test set
print("accuracy on test set {:.3f}".format(accuracy_score( val_labels,y_pred)))
print(classification_report(val_labels, 
                            y_pred, 
                            target_names=['0', '1']))

#print the confusion matrix
cm = confusion_matrix(val_labels, test_outputs)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0,1])
disp.plot()
plt.show()
print("------------------------------------------------------------------------------------------------------")


MONK 3

In [None]:
# Network dimensions handed to the grid search.
# NOTE(review): 17 is presumably the number of one-hot columns read_ds produces for a1..a6 - confirm.
input_size = 17  
output_size = 1
# The triple-quoted string below is a bare expression (never assigned or used);
# it appears to keep earlier, wider search grids (here with non-zero weight decay) for reference.
'''params_grid_wide = {
    "hidden_size": [3, 4, 5],
    "learning_rate": [0.1,0.4,0.7,0.8],
    "batch_size": [4,8,12,32],
    "weight_decay": [0.0001,0.02,0.001,0.1,0.2],
    "momentum": [0,0.1,0.4,0.7,0.8],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}


params_grid_precise = {
    "hidden_size": [4,5],
    "learning_rate": [0.1, 0.05, 0.2],
    "batch_size": [16,32],
    "weight_decay": [0.01,0.1,0.2],
    "momentum": [0.4, 0.01, 0.05, 0.1],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}
'''
# Grid actually searched: one value per hyperparameter, i.e. a single configuration.
params_grid = {
    "hidden_size": [4],
    "learning_rate": [0.1],
    "batch_size": [32],
    "weight_decay": [0.01],
    "momentum": [0.4],
    "epochs":[600],
    "optimizer":[torch.optim.SGD],
}

# MONK-3 train/test arrays as returned by read_ds.
dataset_train_part = read_ds(TR_PATH_3)
dataset_test_part = read_ds(TS_PATH_3)

# Alias of the training array; not referenced by the call below.
dataset = dataset_train_part
# Cross-validated search on the training split with the single seed SEED.
# NOTE(review): the MONK-1 search passes SEEDS here - confirm the difference is intended.
best_params = dogridsearch(dataset_train_part=dataset_train_part,params_grid=params_grid,output_size=output_size,input_size=input_size,seeds = [SEED])


In [None]:
# Report the selected hyperparameters. best_params is the tuple built by dogridsearch:
# (val loss, train loss, learning rate, epochs, loss function,
#  hidden size, momentum, optimizer, weight decay, batch size).
print(f"Best hidden size: {best_params[5]} \nBest learning rate: {best_params[2]} \nBest batch size: {best_params[9]} \nBest weight decay: {best_params[8]} \nBest momentum: {best_params[6]}")

# Full tuple, including the validation/training losses of the selected run.
print(best_params)

In [None]:
# Refit on the full MONK-3 training set once per seed in SEEDS; best_net is the network from the last seed.
best_net = train_test_model(dataset_train_part=dataset_train_part,dataset_test_part=dataset_test_part,best_params=best_params,seeds=SEEDS)  

In [None]:
# Column 0 of the test array is the label, the remaining columns are the inputs.
test_data = torch.from_numpy(dataset_test_part[:, 1:]).to(torch.float32)
val_labels = torch.from_numpy(dataset_test_part[:, [0]]).to(torch.float32)

# One forward pass, without autograd bookkeeping, rounded to hard 0/1 predictions.
with torch.no_grad():
    y_pred = best_net(test_data).round().int().view(-1)
# Same predictions under the name used for the confusion matrix.
test_outputs = y_pred

#Print accuracy on test set
print("accuracy on test set {:.3f}".format(accuracy_score( val_labels,y_pred)))
print(classification_report(val_labels, 
                            y_pred, 
                            target_names=['0', '1']))

#print the confusion matrix
cm = confusion_matrix(val_labels, test_outputs)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0,1])
disp.plot()
plt.show()
print("------------------------------------------------------------------------------------------------------")
