# Monk 1

## About the dataset
Monk datasets are made up of 6 discrete attributes with binary targets.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import random
import torch
from torch import nn

from sklearn.model_selection import train_test_split

In [None]:

import models_list as models_importer

from sklearn.metrics import accuracy_score
import time
import warnings

# Silence every warning so the notebook output stays readable.
warnings.filterwarnings('ignore')
# Wall-clock reference; the last cell prints the time elapsed since this point.
start = time.time()


Preprocessing consists simply of one-hot encoding the input attributes; the binary targets are kept as they are.

In [None]:
def unison_shuffled_copies(a, b):
    """Return shuffled copies of ``a`` and ``b`` that share one permutation.

    Both inputs must have the same length and support indexing with an
    integer index array. Row ``i`` of the first result still corresponds to
    row ``i`` of the second one.
    """
    n_items = len(a)
    assert n_items == len(b)
    # One permutation drawn from NumPy's global RNG is applied to both inputs.
    order = np.random.permutation(n_items)
    shuffled_a = a[order]
    shuffled_b = b[order]
    return shuffled_a, shuffled_b

In [None]:
def preprocessing(path, tensor=False):
    """Load a MONK file, one-hot encode its attributes and shuffle the rows.

    Args:
        path: Path of the whitespace-separated file. Every row is read as the
            columns ``class, a1, ..., a6, ID``.
        tensor: If True, return ``torch`` tensors of dtype float32; otherwise
            return NumPy arrays.

    Returns:
        A pair ``(X, y)``: the one-hot encoded attributes ``a1..a6`` and the
        ``class`` column (kept as a single column, not one-hot encoded), both
        shuffled with the same random permutation.

    Side effects:
        Prints the shapes of ``X`` and ``y``.
    """
    column_names = ["class", 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'ID']
    # Raw string for the regex separator: '\s' inside a plain string literal is
    # an invalid escape sequence and triggers a warning on recent Pythons.
    df = pd.read_csv(path, sep=r'\s+', skip_blank_lines=False,
                     skipinitialspace=False, names=column_names)

    # The row identifier carries no information; the class is the target.
    X_df = df.drop(['class', 'ID'], axis=1)
    y_df = df[['class']]

    # one-hot encoding of every input attribute
    X_df = pd.get_dummies(X_df, columns=X_df.columns)

    if tensor:
        X = torch.from_numpy(X_df.to_numpy(dtype=np.float32))
        y = torch.from_numpy(y_df.to_numpy(dtype=np.float32))
    else:
        X = X_df.to_numpy()
        y = y_df.to_numpy()

    X, y = unison_shuffled_copies(X, y)
    print(X.shape)
    print(y.shape)
    return X, y


In [None]:
# Load the MONK-1 training split as one-hot encoded NumPy arrays.
path = 'monk/monks-1.train'
X_train, y_train = preprocessing(path, tensor=False)

# Same treatment for the pre-made test split; `path` ends up pointing at it.
path = 'monk/monks-1.test'
X_test, y_test = preprocessing(path, tensor=False)


# Workflow
The process is analogous to the one used for regression. The main difference here is the model selection and model assessment approach.  
Now we select the best model with a hold-out on the train set (75% of the data used for training and 25% used for validation).  
Since in this case we are given train and test sets separately, we perform model assessment with the pre-made hold-out.  


In this analysis the following models will be compared using the aforementioned procedure:
- ridge classification (sklearn)
- KNN classification
- SVM 
- deep model for classification (keras)
- RandNN for classification
- CNN for classification
- Cascade Correlation
- linear classification (pytorch)

In [None]:
#import classes and auxiliary functions
from SKLEARN_module import SklNet
from KERAS_module import KerasNet
import tools_for_classes as tools
import models_list as models_importer
from keras.utils.layer_utils import count_params
from sklearn.metrics import  make_scorer

In [None]:
#initialize lists for final comparison
# Accumulators for the final comparison: one entry per trained model holding
# its test accuracy, its effective free parameters and its name.
final_acc, final_dimension, final_names = [], [], []

# KNN

In [None]:
# Grid explored for the KNN classifier.
hyperParameters_SKLEARN = {
    "model__n_neighbors": np.arange(5, 35, 2),
    "model__weights": ["uniform", "distance"],
    "model__metric": ["euclidean", "cityblock"],
}

model = models_importer.build_KNN_Pipe_Clf()
mode = 'classification'
modelName = 'knn'
net = SklNet(
    modelName,
    mode,
    model,
    X_train,
    np.ravel(y_train),
    hyperParameters_SKLEARN,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_params_val, MSE_test, MSE_training, accuracy_training, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", MSE_test, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("best params", best_params_val, sep="\n")

# Effective-parameter estimate for KNN: number of patterns / k.
# Only 75% of the data is seen during training, hence 93 patterns.
knn_effective_params = 93 / best_params_val['model__n_neighbors']
print("Effective free parameters", knn_effective_params, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(knn_effective_params)
final_names.append(modelName)

# SVM

In [None]:
# Regularisation strengths explored for the SVM.
hyperParameters_SKLEARN = {
    # small C => high value of slack => underfit
    # high C  => small value of slack => overfit
    'C': [0.01, 0.1, 0.2, 0.3, 0.7, 1.0, 2.0],
    # 'degree' is left out of the search: not used if the kernel is linear
}

model = models_importer.build_SVM(500)
# NOTE(review): every other cell passes 'classification' here; confirm that
# SklNet treats 'clf' the same way.
mode = 'clf'
modelName = 'svm'
net = SklNet(
    modelName,
    mode,
    model,
    X_train,
    np.ravel(y_train),
    hyperParameters_SKLEARN,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_params_val, MSE_test, MSE_training, accuracy_training, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", MSE_test, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("best params", best_params_val, sep="\n")

# n+1 where n is the dimension of the pattern
svm_effective_params = 18
print("Effective free parameters", svm_effective_params, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(svm_effective_params)
final_names.append(modelName)

# Ridge

In [None]:
# Grid explored for the ridge classifier.
hyperParameters_SKLEARN = {
    'alpha': [0.001, 0.00001, 0.7, 0.9, 1, 2],
    'eta0': [0.01, 0.1, 0.001, 1],  # initial learning rate
}

model = models_importer.build_RidgeClassifier(max_iter=50)
mode = 'classification'
modelName = 'ridge'
net = SklNet(
    modelName,
    mode,
    model,
    X_train,
    np.ravel(y_train),
    hyperParameters_SKLEARN,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_params_val, MSE_test, MSE_training, accuracy_training, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", MSE_test, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("best params", best_params_val, sep="\n")

# Complexity figure reported for this model (kept as the product 17*2).
ridge_effective_params = 17 * 2
print("Effective free parameters", ridge_effective_params, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(ridge_effective_params)
final_names.append(modelName)

# Deep classification model

In [None]:
# Deep classifier trained with mini-batches of 32 patterns.
mode = 'classification'
# Search space handed to the model builder (how each entry is interpreted,
# e.g. the 'depth' triple, is decided inside models_importer).
hyperp = {
    'output_units': 1,
    'units': [2, 3, 4, 5, 6],
    'dropout': [1e-3, 0.0],
    'learning_rate': [1e-2, 1e-3],
    'decay': [0.0, 1e-4, 1e-3],
    'depth': [1, 3, 1],
    'activation_hidden': 'relu',
    'activation_output': 'sigmoid',
    'metric': 'accuracy',
}
models_importer.set_input_size(len(X_train[0]))
models_importer.set_hyperp(hyperp)
modelBuilder = models_importer.get_deepNN
tot_trials = tools.get_search_spaze_size(hyperp)
print(tot_trials)
modelName = 'deep_mb'
tunerParameters = {
    'directory': 'tuner',
    'project_name': 'deep_mb',
    'batch_size': 32,
    # A fifth of the search-space size; note that this value is a float.
    'max_trials': 0.2 * tot_trials,
}

net = KerasNet(
    modelName=modelName,
    mode=mode,
    X=X_train,
    y=y_train,
    tunerParameters=tunerParameters,
    modelBuilder=modelBuilder,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_hps, MSE_training, accuracy_training, best_model_MSE_TEST, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", best_model_MSE_TEST, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("Best params", best_hps.values, sep="\n")

# Number of trainable weights of the selected network.
n_trainable = count_params(best_model.trainable_weights)
print("Effective free parameters", n_trainable, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(n_trainable)
final_names.append(modelName)

In [None]:
# Deep classifier trained in batch mode: batch size 124 (presumably the whole
# training set -- confirm against the data).
mode = 'classification'
# Search space handed to the model builder (how each entry is interpreted,
# e.g. the 'depth' triple, is decided inside models_importer).
hyperp = {
    'output_units': 1,
    'units': [2, 3, 4, 5, 6],
    'dropout': [1e-3, 0.0],
    'learning_rate': [1e-2, 1e-3],
    'decay': [0.0, 1e-4, 1e-3],
    'depth': [1, 3, 1],
    'activation_hidden': 'relu',
    'activation_output': 'sigmoid',
    'metric': 'accuracy',
}
models_importer.set_input_size(len(X_train[0]))
models_importer.set_hyperp(hyperp)
modelBuilder = models_importer.get_deepNN
tot_trials = tools.get_search_spaze_size(hyperp)
print(tot_trials)
modelName = 'deep_b'
tunerParameters = {
    'directory': 'tuner',
    'project_name': 'deep_b',
    'batch_size': 124,
    # A fifth of the search-space size; note that this value is a float.
    'max_trials': 0.2 * tot_trials,
}

net = KerasNet(
    modelName=modelName,
    mode=mode,
    X=X_train,
    y=y_train,
    tunerParameters=tunerParameters,
    modelBuilder=modelBuilder,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_hps, MSE_training, accuracy_training, best_model_MSE_TEST, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", best_model_MSE_TEST, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("Best params", best_hps.values, sep="\n")

# Number of trainable weights of the selected network.
n_trainable = count_params(best_model.trainable_weights)
print("Effective free parameters", n_trainable, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(n_trainable)
final_names.append(modelName)

In [None]:
# RandNN

In [None]:
# RandNN classifier, tuned with the same procedure as the deep models.
mode = 'classification'
# Search space handed to the model builder (how each entry is interpreted,
# e.g. the 'depth' triple, is decided inside models_importer).
hyperp = {
    'output_units': 1,
    'units': [3, 4, 5, 6],
    'dropout': [1e-3, 0.0],
    'learning_rate': [1e-2, 1e-3],
    'decay': [0.0, 1e-4, 1e-3],
    'depth': [1, 4, 1],
    'activation_hidden': 'relu',
    'activation_output': 'sigmoid',
    'metric': 'accuracy',
}
models_importer.set_input_size(len(X_train[0]))
models_importer.set_hyperp(hyperp)
modelBuilder = models_importer.get_RandNN
tot_trials = tools.get_search_spaze_size(hyperp)
print(tot_trials)
modelName = 'randNN'
tunerParameters = {
    'directory': 'tuner',
    # Own project name: this used to be 'deep_b', i.e. the same tuner
    # directory/project as the deep batch model, so the two searches could
    # share (and reuse) each other's stored trials.
    'project_name': 'randNN',
    'batch_size': 124,
    # A fifth of the search-space size; note that this value is a float.
    'max_trials': 0.2 * tot_trials,
}

net = KerasNet(
    modelName=modelName,
    mode=mode,
    X=X_train,
    y=y_train,
    tunerParameters=tunerParameters,
    modelBuilder=modelBuilder,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_hps, MSE_training, accuracy_training, best_model_MSE_TEST, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", best_model_MSE_TEST, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("Best params", best_hps.values, sep="\n")

# Number of trainable weights of the selected network.
n_trainable = count_params(best_model.trainable_weights)
print("Effective free parameters", n_trainable, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(n_trainable)
final_names.append(modelName)

# CNN

In [None]:
# CNN classifier, tuned with the same procedure as the deep models.
mode = 'classification'
# Search space handed to the model builder (no dropout or depth entries here).
hyperp = {
    'output_units': 1,
    'units': [4, 6, 8],
    'learning_rate': [1e-2, 1e-3],
    'decay': [0.0, 1e-4, 1e-3],
    'activation_hidden': 'relu',
    'activation_output': 'sigmoid',
    'metric': 'accuracy',
}

models_importer.set_input_size(len(X_train[0]))
models_importer.set_hyperp(hyperp)
modelBuilder = models_importer.get_CNN
tot_trials = tools.get_search_spaze_size(hyperp)
print(tot_trials)
modelName = 'cnn'
tunerParameters = {
    'directory': 'tuner',
    'project_name': 'cnn',
    'batch_size': 124,
    # A fifth of the search-space size; note that this value is a float.
    'max_trials': 0.2 * tot_trials,
}

net = KerasNet(
    modelName=modelName,
    mode=mode,
    X=X_train,
    y=y_train,
    tunerParameters=tunerParameters,
    modelBuilder=modelBuilder,
    X_test=X_test,
    y_test=y_test,
)
best_model, best_hps, MSE_training, accuracy_training, best_model_MSE_TEST, accuracy_test = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", best_model_MSE_TEST, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("Best params", best_hps.values, sep="\n")

# Number of trainable weights of the selected network.
n_trainable = count_params(best_model.trainable_weights)
print("Effective free parameters", n_trainable, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(n_trainable)
final_names.append(modelName)

# Cascade Correlation

In [None]:
# Training settings for the Cascade Correlation model (online: batch size 1).
trainingParameters_KERAS = {
    'directory': 'tuner',
    'project_name': 'classification_monk',
    'batch_size': 1,
    'max_trials': 10,
}

# No hyper-parameter search space is passed for this model.
hyperp = {}

mode = 'classification'
models_importer.set_input_size(len(X_train[0]))
models_importer.set_hyperp(hyperp)
modelName = 'cc'

modelBuilder = models_importer.get_CC_units
net = KerasNet(
    modelName=modelName,
    mode=mode,
    X=X_train,
    y=y_train,
    tunerParameters=trainingParameters_KERAS,
    modelBuilder=modelBuilder,
    X_test=X_test,
    y_test=y_test,
)

# Here train() returns num_hidden in place of the best hyper-parameters.
model, MSE_training, accuracy_training, best_model_MSE_TEST, accuracy_test, num_hidden = net.train()

# Each label is followed by its value on the next line.
print("MSE train", MSE_training, sep="\n")
print("accuracy train", accuracy_training, sep="\n")
print("MSE test", best_model_MSE_TEST, sep="\n")
print("accuracy test", accuracy_test, sep="\n")
print("Best params", num_hidden, sep="\n")

# Parameter count for 17 inputs, 1 output and the summed hidden units.
cc_effective_params = tools.get_param_cc(n_in=17, n_out=1, n_hid=sum(num_hidden))
print("Effective free parameters", cc_effective_params, sep="\n")

final_acc.append(accuracy_test)
final_dimension.append(cc_effective_params)
final_names.append(modelName)

# Pytorch

In [None]:
%pip install torchmetrics

In [None]:
from tools_for_Pytorch import EarlyStopping, weights_init_uniform_fan_in, count_parameters
from tools_for_classes import save_plot
import torchmetrics

In [None]:
# Reload MONK-1 as float32 torch tensors for the PyTorch models.
# The training file becomes the development set, which is split further below.
path = 'monk/monks-1.train'
X_dev, y_dev = preprocessing(path, True)

# X_test / y_test are rebound here from NumPy arrays to tensors.
path = 'monk/monks-1.test'
X_test, y_test = preprocessing(path, True)

In [None]:
# Hold out a quarter of the development set for validation, with a fixed seed
# so the split is reproducible.
VAL_SPLIT = 0.25
X_train, X_val, y_train, y_val = train_test_split(
    X_dev,
    y_dev,
    test_size=VAL_SPLIT,
    shuffle=True,
    random_state=42,
)

In [None]:
# Module-level list that train() appends the final test accuracy to.
test_acc_list = []


def _plot_training_curves(name, epoch_count, train_loss_values, val_loss_values,
                          train_acc_values, val_acc_values):
    """Plot loss and accuracy curves for training/validation, save and show them."""
    plt.figure(figsize=(9, 6))

    plt.subplot(2, 2, 1)
    plt.plot(epoch_count, train_loss_values, label="Training Loss")
    plt.plot(epoch_count, val_loss_values, label="Validation Loss", linestyle='dashed')
    plt.title("TR and VS Loss")
    plt.ylabel("Loss")
    plt.xlabel("Epochs")
    plt.legend()

    plt.subplot(2, 2, 2)
    plt.plot(epoch_count, train_acc_values, label="Training Accuracy")
    plt.plot(epoch_count, val_acc_values, label="Validation Accuracy", linestyle='dashed')
    plt.title("TR and VS Accuracy")
    plt.ylabel("Accuracy")
    plt.xlabel("Epochs")
    plt.legend()

    plt.suptitle(name)
    plt.tight_layout()
    folder = 'Pytorch_Monk1-plots'
    save_plot(folder, name)
    plt.show()


def train(name, model, optimizer, X_train, y_train, X_val, y_val, X_test, y_test):
    """Train ``model`` full-batch with early stopping on the validation loss.

    Runs at most 500 epochs. Each epoch performs one optimizer step on the
    whole training set using an MSE loss, then evaluates loss and binary
    accuracy on the validation and test sets without gradient tracking. The
    test figures are only reported; stopping is driven by the validation loss.

    Args:
        name: Label used as plot title and passed to ``save_plot``.
        model: The network to train (called directly on the input tensors).
        optimizer: Optimizer already bound to ``model``'s parameters.
        X_train, y_train: Training inputs and targets.
        X_val, y_val: Validation inputs and targets.
        X_test, y_test: Test inputs and targets.

    Returns:
        None. The test accuracy of the last epoch run is appended to the
        module-level ``test_acc_list``; the learning curves are plotted, saved
        and shown.
    """
    loss_fn = nn.MSELoss()
    early_stopping = EarlyStopping(delta=0.01)
    train_accuracy = torchmetrics.Accuracy(task='binary')
    val_accuracy = torchmetrics.Accuracy(task='binary')
    test_accuracy = torchmetrics.Accuracy(task='binary')

    torch.manual_seed(42)

    epochs = 500

    # Histories hold plain Python floats (via .item()) rather than tensors, so
    # no autograd graph is kept alive and the lists can be plotted directly.
    epoch_count = []
    train_loss_values = []
    val_loss_values = []
    train_acc_values = []
    val_acc_values = []

    for epoch in range(epochs):

        # train mode
        model.train()

        # 1. Forward pass on train data
        train_pred = model(X_train)

        # 2. Calculate the loss and the accuracy
        train_loss = loss_fn(train_pred, y_train)
        train_acc = train_accuracy(train_pred, y_train)

        # 3. Zero grad of the optimizer
        optimizer.zero_grad()

        # 4. Backpropagation
        train_loss.backward()

        # 5. Progress the optimizer
        optimizer.step()

        # evaluation mode
        model.eval()

        # make predictions with model without gradient tracking
        with torch.inference_mode():

            # 1. Forward pass on validation and test data
            val_pred = model(X_val)
            test_pred = model(X_test)

            # 2. Calculate loss and accuracy on validation and test data
            val_loss = loss_fn(val_pred, y_val)
            test_loss = loss_fn(test_pred, y_test)
            val_acc = val_accuracy(val_pred, y_val)
            test_acc = test_accuracy(test_pred, y_test)

        # Metrics are reset so every epoch is scored on its own.
        train_accuracy.reset()
        val_accuracy.reset()
        test_accuracy.reset()

        epoch_count.append(epoch)
        train_loss_values.append(train_loss.item())
        val_loss_values.append(val_loss.item())
        train_acc_values.append(train_acc.item())
        val_acc_values.append(val_acc.item())

        # early_stopping receives the validation loss to check if it has
        # decreased. NOTE(review): no checkpoint is reloaded in this function,
        # so the caller keeps the weights of the last epoch run.
        early_stopping(val_loss, model)

        if early_stopping.early_stop:
            print("Early stopping")
            break

        if epoch % 10 == 0:
            print(f"Epoch is {epoch:<3} | Training loss: {train_loss:.3f} | Validation loss: {val_loss:.3f} | Training accuracy: {train_acc:.3f} | Validation accuracy: {val_acc:.3f}")

    print(f"Epoch is {epoch:<3} \nTraining loss: {train_loss:.3f} | Validation loss: {val_loss:.3f}| Test loss: {test_loss:.3f} \nTraining accuracy: {train_acc:.3f} | Validation accuracy: {val_acc:.3f}| Test accuracy: {test_acc:.3f}")

    # Same object the callers read back through test_acc_list[-1].
    test_acc_list.append(test_acc)

    _plot_training_curves(name, epoch_count, train_loss_values, val_loss_values,
                          train_acc_values, val_acc_values)

In [None]:
# Linear classifier: one affine layer on the 17 inputs followed by a sigmoid.
test_acc_list = []

name = 'pt_LC'

w_init = weights_init_uniform_fan_in
lr = 1

layers = [
    nn.Linear(in_features=17, out_features=1),
    nn.Sigmoid(),
]
model = nn.Sequential(*layers)

model.apply(w_init)

optimizer = torch.optim.SGD(model.parameters(), lr=lr)

train(name, model, optimizer, X_train, y_train, X_val, y_val, X_test, y_test)

parameters_count = count_parameters(model)
print(f'Effective free parameters: {parameters_count}')

# train() appended the final test accuracy to the freshly reset list.
test_acc = test_acc_list[-1]

final_acc.append(test_acc)
final_dimension.append(parameters_count)
final_names.append(name)

In [None]:
# Network with a 2-unit hidden layer followed by a sigmoid output.
test_acc_list = []

name = 'pt_2hid'

w_init = weights_init_uniform_fan_in
hid = 2
lr = 0.3

# NOTE(review): there is no activation between the two Linear layers, so the
# stack is still a linear map before the sigmoid -- confirm this is intended.
layers = [
    nn.Linear(in_features=17, out_features=hid),
    nn.Linear(in_features=hid, out_features=1),
    nn.Sigmoid(),
]
model = nn.Sequential(*layers)

model.apply(w_init)

optimizer = torch.optim.SGD(model.parameters(), lr=lr)

train(name, model, optimizer, X_train, y_train, X_val, y_val, X_test, y_test)

parameters_count = count_parameters(model)
print(f'Effective free parameters: {parameters_count}')

# train() appended the final test accuracy to the freshly reset list.
test_acc = test_acc_list[-1]

final_acc.append(test_acc)
final_dimension.append(parameters_count)
final_names.append(name)

In [None]:
# Network with a 4-unit hidden layer followed by a sigmoid output.
test_acc_list = []

name = 'pt_4hid'

w_init = weights_init_uniform_fan_in
hid = 4
lr = 0.4

# NOTE(review): there is no activation between the two Linear layers, so the
# stack is still a linear map before the sigmoid -- confirm this is intended.
layers = [
    nn.Linear(in_features=17, out_features=hid),
    nn.Linear(in_features=hid, out_features=1),
    nn.Sigmoid(),
]
model = nn.Sequential(*layers)

model.apply(w_init)

optimizer = torch.optim.SGD(model.parameters(), lr=lr)

train(name, model, optimizer, X_train, y_train, X_val, y_val, X_test, y_test)

parameters_count = count_parameters(model)
print(f'Effective free parameters: {parameters_count}')

# train() appended the final test accuracy to the freshly reset list.
test_acc = test_acc_list[-1]

final_acc.append(test_acc)
final_dimension.append(parameters_count)
final_names.append(name)

# Final comparison

In [None]:
# Final comparison: test accuracy against effective free parameters, with one
# labelled point per model.
final_dim = final_dimension
print(final_dim)

# plt.subplots() already creates the figure; the extra plt.figure() that used
# to precede it only produced a stray empty figure.
fig, ax = plt.subplots()
ax.errorbar(x=final_dim, y=final_acc, fmt='.')
ax.set_xlabel('effective_free_params')
ax.set_ylabel('Accuracy')
for model_label, dim, acc in zip(final_names, final_dim, final_acc):
    ax.annotate(model_label, (dim, acc))
folder = 'Monk-plots'
tools.save_plot(folder, 'final_plot')
plt.show()

# Total wall-clock time since the timer was started at the top of the notebook.
print(f'Elapsed time {time.time()-start}')