# Training the ANN models

## Description

This notebook contains the code to train and optimize the ANN models on the CSD (and PDB) data.

In [None]:
# General imports.
import os
import sys

# Make the project's source directory importable from this notebook.
module_path = os.path.abspath("../src/")

# Register the directory only once, so re-running the cell is harmless.
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)
# _end_if_

import json
from datetime import datetime
from pathlib import Path
from time import perf_counter

import h5py
import joblib
import matplotlib.pyplot as plt
import numpy as np
from metal_auxiliaries import METAL_TARGETS, MetalAtom
from metal_pdb_data import MetalPdbData
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    balanced_accuracy_score,
    classification_report,
)
from sklearn.model_selection import cross_val_score
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler
from skopt import BayesSearchCV
from skopt.space import Categorical, Integer, Real

In [13]:
# Class targets, keyed by the (three letter) class name. Every entry is a pair:
#   [0] the numerical target of the class,
#   [1] its number of coordinates.
CLASS_TARGETS = dict(
    LIN=(0, 2),
    TRI=(1, 3),
    TET=(2, 4),
    SPL=(3, 4),
    SQP=(4, 5),
    TBP=(5, 5),
    OCT=(6, 6),
)

## Data locations

Here we set the directories where the data are located.

In [None]:
# Directory of the CSD data: size=(109001, 37)
csd_path = Path("..", "data", "CSD")

# Directory of the PDB data: size=(2960, 37)
pdb_path = Path("..", "data", "PDB")

# Directory where the (temporary) outputs are written.
output_path = Path("..", "results")

## Load the data for ML algorithms

Load the data and split them into train and test sets. The test set takes the leading rows of the data file; its size is usually set to 10% of the data.

In [None]:
# Set the training data type: {CSD or PDB}
train_type = "CSD"

# Test size ~ 10%.
test_split = 0.10

# HDF5 file that holds the dataset of each supported training type.
data_files = {
    "CSD": Path("../data/csd_metal_data.h5"),
    "PDB": Path("../data/pdb_metal_data.h5"),
}

# Reject unknown types explicitly: a typo (e.g. "csd") must not silently fall
# through and load the data of a different source.
if train_type not in data_files:
    raise ValueError(
        f"Unknown train_type: {train_type!r}. Expected one of {sorted(data_files)}."
    )
# _end_if_

# Load the selected data file.
with h5py.File(data_files[train_type], mode="r") as hdf_file:

    # Make sure they are numpy arrays.
    full_data = np.array(hdf_file["data"], dtype=float)
# _end_with_

# Total number of test points.
test_size = int(len(full_data) * test_split)

# Complete structure: the leading 'test_size' rows form the test set and all
# the remaining rows form the train set.
# NOTE(review): the split is positional (no shuffling / stratification here);
# this assumes the rows of the file are already in random order -- TODO confirm.
data_set = {
    "Train": full_data[test_size:, :],
    "Test": full_data[0:test_size, :],
}

# Split test/train data: the last column is used as the target (y) and all the
# preceding columns as the input features (x).
x_train = data_set["Train"][:, 0:-1].copy()
x_test = data_set["Test"][:, 0:-1].copy()

y_train = data_set["Train"][:, -1].copy()
y_test = data_set["Test"][:, -1].copy()

# Print the dimensions of the datasets.
print(f"{train_type} --> {train_type}")
print("Dataset sizes: ")
print(f"X-train: {x_train.shape}, y-train: {y_train.shape}")
print(f"X-test : {x_test.shape}, y-test: {y_test.shape}")
print("Done!\n")

## Bayesian Hyper-Parameter Optimization

Set up a search space and perform Bayesian hyper-parameter optimization (HPO), using Gaussian Processes.

In [None]:
# Seed shared by the search and by the network, so that a re-run of this cell
# on the same data is repeatable.
hpo_seed = 911

# File where the best model is stored. Its directory is created *before* the
# (long) search starts, so that the final 'dump' cannot fail on a missing
# directory after all the optimization work has been done.
model_file = Path(f"../models/HPO_{train_type}_{train_type}_CV.model")
model_file.parent.mkdir(parents=True, exist_ok=True)

# Create the search space for the HPO. The keys are prefixed with "mlp__" to
# address the parameters of the "mlp" step of the pipeline.
search_space = {
    "mlp__activation": Categorical(["relu", "tanh"]),
    "mlp__hidden_layer_sizes": Integer(10, 1024),
    "mlp__shuffle": Categorical([True, False]),
    "mlp__alpha": Real(1.0e-4, 1.0e-1, prior="log-uniform"),
    "mlp__learning_rate_init": Real(1.0e-3, 1.0e-1, prior="log-uniform"),
    "mlp__batch_size": Integer(16, 1024),
}

# Create the HPO pipeline: scale the input features and then fit the network.
hpo_pipeline = Pipeline(
    steps=[
        ("data_scaler", RobustScaler(copy=True)),
        (
            "mlp",
            MLPClassifier(
                solver="adam",
                max_iter=200,
                early_stopping=False,
                # Without a fixed seed the network training is stochastic and
                # the scores of the search are not reproducible.
                random_state=hpo_seed,
            ),
        ),
    ]
)

# Search across different combinations using Bayesian optimization.
hpo_search = BayesSearchCV(
    estimator=hpo_pipeline,
    search_spaces=search_space,
    refit=True,
    n_iter=50,
    cv=5,
    scoring="balanced_accuracy",
    verbose=5,
    n_jobs=5,
    return_train_score=True,
    random_state=hpo_seed,
    error_score=np.nan,
)

# Initial time.
t0 = perf_counter()

# Display the model we are optimizing.
print("Optimizing ANN model ...\n")

# Fit the HPO search model.
hpo_search.fit(x_train, y_train)

# Final time.
tf = perf_counter()

# Display information.
print(f"HPO finished in {tf-t0:.2f} seconds.\n")

# Display best parameters.
print(f" Best score {hpo_search.best_score_}, with parameters are: \n")
for b_param, b_value in hpo_search.best_params_.items():
    print(f" {b_param} --> {b_value}")
# _end_for_

# Save the best model (available because the search runs with 'refit=True').
joblib.dump(
    hpo_search.best_estimator_,
    model_file,
    compress="zlib",
)

# Display final message.
print(" Done!")

## Cross-Validation Score

This step is not necessary, but it can help to verify the results of the HPO.

In [None]:
# Restore the model that was stored on disk by the HPO step.
clf_model = joblib.load(Path(f"../models/HPO_{train_type}_{train_type}_CV.model"))

# Settings of the K-Fold cross-validation, gathered in one place.
cv_settings = dict(scoring="balanced_accuracy", n_jobs=5, verbose=0, cv=5)

# Start the clock.
cv_start = perf_counter()

# Score the model on every fold of the train set.
cv_scores = cross_val_score(clf_model, x_train, y_train, **cv_settings)

# Stop the clock.
cv_elapsed = perf_counter() - cv_start

# Report the duration.
print(f"Finished K-Fold cross-validation in {cv_elapsed:.2f} seconds.\n")

# Report the score of each fold (the numbering starts at one).
print("Cross-validation Scores: ")
for fold_id, fold_score in enumerate(cv_scores, start=1):
    print(f"Fold: {fold_id}, Score= {fold_score}")
# _end_for_

# Summary statistics over all the folds.
score_mean, score_std = cv_scores.mean(), cv_scores.std()
print(f"Mean={score_mean:.4f}, STD= {score_std:.4f}")

## Validate the model on the test data

Once a model has been optimized we can validate it on the test set.

In [None]:
# Load the optimized model in this cell too, so that the validation does not
# depend on the (optional) cross-validation cell having been executed first.
clf_model = joblib.load(Path(f"../models/HPO_{train_type}_{train_type}_CV.model"))

# Explicit class names and numerical labels, in matching order. Passing both to
# the metrics keeps every name tied to its own target, even if one of the
# classes does not appear in the test set. The labels use the dtype of the
# targets, so they compare like the values found in 'y_test'.
class_names = list(CLASS_TARGETS.keys())
class_labels = np.array(
    [target for target, _ in CLASS_TARGETS.values()], dtype=y_test.dtype
)

# Make sure the output directories exist before anything is written to them.
plots_dir = output_path / "plots"
reports_dir = output_path / "reports"

for out_dir in (plots_dir, reports_dir):
    out_dir.mkdir(parents=True, exist_ok=True)
# _end_for_

# First time.
t0 = perf_counter()

# Get the predictions on the test set.
y_pred = clf_model.predict(x_test)

# Last time.
t1 = perf_counter()

# Display information.
print(f"Finished predicting test set: {x_test.shape}, in {t1-t0:.2f} seconds.\n")

# Get the final report (as a dictionary, so it can be stored as json).
clf_rep = classification_report(
    y_test,
    y_pred,
    output_dict=True,
    labels=class_labels,
    target_names=class_names,
)

# Create a large figure.
_, ax = plt.subplots(figsize=(12, 12))

# Create a Confusion matrix (normalized over the true labels, i.e. the rows).
matrix = ConfusionMatrixDisplay.from_predictions(
    y_test,
    y_pred,
    ax=ax,
    cmap=plt.cm.GnBu,
    xticks_rotation=45.0,
    normalize="true",
    labels=class_labels,
    display_labels=class_names,
    values_format=".2f",
    colorbar=False,
)

# Setup the title.
ax.set_title(
    "Balanced Accuracy = {0:.2f}%, F1-weighted = {1:.2f}%".format(
        100.0 * balanced_accuracy_score(y_test, y_pred),
        100.0 * clf_rep["weighted avg"]["f1-score"],
    ),
    fontsize=18,
)
ax.set_xlabel("Predicted label", fontsize=16)
ax.set_ylabel("True label", fontsize=16)

# Set the custom colorbar (the default one is disabled above).
matrix.im_.autoscale()
plt.colorbar(matrix.im_, fraction=0.046, pad=0.04)

# Save the figure.
plt.savefig(
    plots_dir / f"{train_type}_{train_type}_confusion_matrix.png",
    bbox_inches="tight",
    dpi=300,
    facecolor="w",
    edgecolor="w",
    orientation="landscape",
)

# Save the classification to a json file.
with open(
    reports_dir / f"{train_type}_{train_type}_classification_report.json",
    mode="w",
    encoding="utf-8",
) as json_file:
    json.dump(clf_rep, json_file, indent=4)
# _end_with_

# Get the loss-curve from the "mlp" step of the pipeline.
loss_curve = clf_model["mlp"].loss_curve_

# Plot the figure.
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(loss_curve)
ax.set_title("Multi Layer Perceptron Classifier")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.grid(True)

# Save the figure.
plt.savefig(
    plots_dir / f"{train_type}_{train_type}_loss_curve.png",
    bbox_inches="tight",
    facecolor="w",
    edgecolor="w",
    dpi=300,
    orientation="landscape",
)

### End of notebook