## Table of contents
* [Define parameters](#parameters)
* [Class definitions](#class)
* [Function definitions](#function)
  * [Optimize KNN neighbours](#optimizeknn)
  * [KNN](#knn)
  * [Calculate accuracy](#accuracy)
  * [MLP](#mlp)
  * [Plot](#plot)
  * [Main](#main)
* [Run](#run)


In [None]:
import re
import pickle
import glob
from pathlib import Path

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

import faiss
from collections import Counter

from pycm import ConfusionMatrix
from tqdm import tqdm

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

import torch
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ExponentialLR
from torch import FloatTensor, LongTensor

from torch import nn
from torch.utils.data import Dataset

from sklearn.neighbors import KNeighborsClassifier

from warnings import simplefilter
simplefilter(action='ignore', category=FutureWarning)

## Define parameters <a class="anchor" id="parameters"></a>

Change EC_LEVEL, FP_TYPE and MODEL_TYPE as desired.

We suggest to train 4 models for ec123 to compare the different approaches:
drfp+knn, drfp+mlp, rxnfp+knn and rxnfp+mlp

In [None]:
EC_LEVEL   = 'ec123' # 'ec1' or 'ec12' or 'ec123'
FP_TYPE    = 'drfp'  # 'drfp' or 'rxnfp'
MODEL_TYPE = 'knn'   # 'knn' or 'mlp'
INFO       = f'Training {MODEL_TYPE.upper()} model using {FP_TYPE.upper()} fingerprints to predict {EC_LEVEL.upper()}.'

DATA_PATH  = 'experiments/data/'
MODEL_PATH = 'models'
trainset = f'{DATA_PATH}{FP_TYPE}-0-{EC_LEVEL}-train.csv' # file with training dataset
validset = f'{DATA_PATH}{FP_TYPE}-0-{EC_LEVEL}-valid.csv' # file with validation dataset
testset  = f'{DATA_PATH}{FP_TYPE}-0-{EC_LEVEL}-test.csv'  # file with test dataset
output   = f'{MODEL_PATH}/rheadb-{EC_LEVEL}_{FP_TYPE}_{MODEL_TYPE}' # file prefix for output

modeltype = MODEL_TYPE

OPTIMIZE_KNN = False # Set this to True to run function 'optimizeNumNeighbors'

In [None]:
Path(MODEL_PATH).mkdir(exist_ok=True)
Path('figures').mkdir(exist_ok=True)

## Class definitions <a class="anchor" id="class"></a>

In [None]:
class ReactionDataset(Dataset):
    def __init__(self, df: pd.DataFrame, label: str = "label"):
        self.size = len(df)
        self.label = label
        self.X = FloatTensor(
            np.array([x.astype(np.float32) for x in df.fps], dtype=np.float32)
        )
        self.y = LongTensor(df[self.label].to_numpy(dtype=np.int32))
        self.fps = df["fps"]
        self.rxn_smiles = df["rxn_smiles"]

    def __getitem__(self, i):
        return (
            self.X[i],
            self.y[i],
        )

    def __len__(self):
        return self.size

class MLPClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLPClassifier, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.fc1 = nn.Linear(self.input_size, self.hidden_size)
        self.tanh = nn.Tanh()
        self.fc2 = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        hidden = self.fc1(x)
        tanh = self.tanh(hidden)
        output = self.fc2(tanh)
        return output


## Function definitions <a class="anchor" id="function"></a>

In [None]:
def get_device():
    if torch.cuda.is_available():
        dev = "cuda:0"
    else:
        dev = "cpu"

    return torch.device(dev)

## Optimize KNN neighbours <a class="anchor" id="optimizeknn"></a>

Optional function to find the optimal number of neighbors for the k-nearest neighbors algorithm.

If you run this by <a href=#parameters>setting the parameter <code>OPTIMIZE_KNN = True</code></a>, you will see that the optimal number of neighbors is 2, which is the default value of the function <code>get_nearest_neighbours_prediction</code>.

In [None]:
def optimizeNumNeighbors(df_train, df_test):
    print('Testing different numbers of neighbors for KNN algorithm.')
    accuracies = []
    X_train, y_train = np.array(df_train['fps'].to_list()), df_train['label']
    X_test, y_test = np.array(df_test['fps'].to_list()), df_test['label']
    for i in range(1,21):
        y_pred = get_nearest_neighbours_prediction(X_train, y_train, X_test, i)
        accuracy = getSklearnAccuracy(y_test, y_pred)
        accuracies.append(accuracy)
    
    plt.scatter(range(1,21), accuracies)
    plt.savefig(f'figures/knn_testing.png')
    plt.show()

### KNN <a class="anchor" id="knn"></a>

In [None]:
def get_nearest_neighbours_prediction(
    train_X: np.array, train_y: np.array, eval_X: np.array, n_neighbours: int = 2
) -> list:
    """
    Use faiss to make a K-nearest neighbour prediction
    """
    # Indexing
    print("\nKNN number of neighbours is", n_neighbours)
    index = faiss.IndexFlatL2(len(train_X[0]))
    index.add(train_X.astype(np.float32))

    # Querying
    _, results = index.search(eval_X.astype(np.float32), n_neighbours)

    # Scoring
    y_pred = get_pred(train_y, results)

    return y_pred

def get_pred(y: list, results: list) -> list:
    """
    Get most common label from nearest neighbour list.
    """
    y_pred = []
    for i, r in enumerate(results):
        y_pred.append(Counter(y[r]).most_common(1)[0][0])
        
    return y_pred

def trainKNN(X_train, y_train):
    """
    The scikit-learn version of knn. Deprecated since it does not give as good results as faiss.
    """
    knn_model = KNeighborsClassifier(n_neighbors=5)
    knn_model.fit(X_train, y_train)

    return knn_model

## Calculate accuracy <a class="anchor" id="accuracy"></a>

In [None]:
# Used for KNN.
def getSklearnAccuracy(y_test, y_pred):
    """
    The default function to calculate accuracy with scikit learn.
    """
    accuracy = accuracy_score(y_test, y_pred)
    print(f"\n=> Accuracy according to scikit learn: {accuracy:.2f}")
    
    return accuracy

# Used for MLP.
def get_accuracy(model, device, data_loader):
    correct = 0.0
    total = 0.0

    y = []
    y_pred = []

    for data, labels in data_loader:
        data, labels = data.to(device), labels.to(device)

        target = model(data)
        pred = target.max(1, keepdim=True)[1] # Get the index of the max logit.
        correct += pred.eq(labels.view_as(pred)).sum().item()
        total += int(labels.shape[0])

        y.extend(torch.flatten(labels.view_as(pred)).tolist())
        y_pred.extend(torch.flatten(pred).tolist())

    return correct / total, y, y_pred

def confusionMatrixCalculate(y, y_pred):
    cm = ConfusionMatrix(actual_vector=y, predict_vector=y_pred)
    print('\n=> Accuracy according to confusion matrix:')
    #print(cm.overall_stat)
    print(f"   Accuracy: {cm.overall_stat['Overall ACC']:.2f}")
    print(f"   MCC:      {cm.overall_stat['Overall MCC']:.2f}")
    print(f"   CEN:      {cm.overall_stat['Overall CEN']:.2f}")
    return cm

## MLP <a class="anchor" id="mlp"></a>

In [None]:
def train(model, device, train_loader, valid_loader, optimizer, criterion, epoch):
    model.train()

    train_loss = 0.0
    for _, (data, labels) in tqdm(enumerate(train_loader), total=len(train_loader)):
        data, labels = data.to(device), labels.to(device)

        # Set gradients to zero
        optimizer.zero_grad()

        # Forward pass
        target = model(data)

        # Calculate the loss
        loss = criterion(target, labels)

        # Calculate the gradients
        loss.backward()

        # Update weights
        optimizer.step()

        # Calculate the training loss
        train_loss += loss.item()

    # Validation
    valid_loss = 0.0
    model.eval()
    for data, labels in valid_loader:
        if torch.cuda.is_available():
            data, labels = data.to(device), labels.to(device)

        target = model(data)
        loss = criterion(target, labels)
        valid_loss += loss.item()

    train_loss = train_loss / len(train_loader)
    valid_loss = valid_loss / len(valid_loader)

    train_acc, _, _ = get_accuracy(model, device, train_loader)
    valid_acc, _, _ = get_accuracy(model, device, valid_loader)
    print(f"Epoch {epoch}: Training loss:     {train_loss}, Validation loss:     {valid_loss}.")
    print(f"Epoch {epoch}: Training accuracy: {train_acc}, Validation accuracy: {valid_acc}.")

    return (train_loss, valid_loss, train_acc, valid_acc)

def train_test_model(
    data_set_train,
    data_set_valid,
    data_set_test,
    label_encoder,
    device,
    input_dim: int = 10,
    hidden_dim: int = 1664,
    output_dim: int = 2,
    epochs: int = 10,
    patience: int = 5,
) -> ConfusionMatrix:
    
    model = MLPClassifier(input_dim, hidden_dim, output_dim)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = ExponentialLR(optimizer, gamma=0.9)
    model.to(device)

    training_matrics = {
        "train_loss": [],
        "valid_loss": [],
        "train_acc": [],
        "valid_acc": [],
    }
    for epoch in range(epochs):
        train_loss, valid_loss, train_acc, valid_acc = train(
            model,
            device,
            DataLoader(data_set_train, batch_size=64),
            DataLoader(data_set_valid, batch_size=64),
            optimizer,
            criterion,
            epoch,
        )

        scheduler.step()

        # Early stopping based on past mean.
        if len(training_matrics["valid_loss"]) >= patience:
           mean_past = np.mean(np.array(training_matrics["valid_loss"][-patience:]))
           if valid_loss - mean_past > -0.001:
               break

        training_matrics["train_loss"].append(train_loss)
        training_matrics["valid_loss"].append(valid_loss)
        training_matrics["train_acc"].append(train_acc)
        training_matrics["valid_acc"].append(valid_acc)

    plot(training_matrics["train_loss"], training_matrics["valid_loss"])

    test_acc, y, y_pred = get_accuracy(
        model, device, DataLoader(data_set_test, batch_size=64)
    )
    y = label_encoder.inverse_transform(y)
    y_pred = label_encoder.inverse_transform(y_pred)

    print(f"\n=> Accuracy of test: {test_acc:.2f}")

    cm = confusionMatrixCalculate(y, y_pred)

    return model, cm, training_matrics, y_pred

## Plot <a class="anchor" id="plot"></a>

In [None]:
def plot(train_loss, valid_loss):
    """
    Plotting the progress of training the MLP
    """

    plt.plot(train_loss, label="train_loss")
    plt.plot(valid_loss, label="valid_loss")
    plt.legend(loc='upper right')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.title(f'{FP_TYPE}-{EC_LEVEL}')
    plt.ylim(bottom=0)
    
    plt.savefig(f'figures/{FP_TYPE}-{EC_LEVEL}.png', format='png')

## Main <a class="anchor" id="main"></a>

In [None]:
def main(trainset, validset, testset, output, modeltype):

    print(INFO)
    device = get_device()

    model_file = Path(f"{output}.pt")
    cm_file = Path(f"{output}.cm")
    training_metrics_file = Path(f"{output}.metrics.pkl")
    label_encoder_file = Path(f"{output}-le.pkl")

    df_train = pd.read_csv(trainset)
    df_valid = pd.read_csv(validset)
    df_test  = pd.read_csv(testset)

    df_train["fps"] = df_train.fps.apply(
        #lambda x: np.array(list(map(int, x.split(";"))))
        lambda x: np.array(list(map(float, x.split(";"))))
    )
    df_valid["fps"] = df_valid.fps.apply(
        #lambda x: np.array(list(map(int, x.split(";"))))
        lambda x: np.array(list(map(float, x.split(";"))))
    )
    df_test["fps"] = df_test.fps.apply(
        #lambda x: np.array(list(map(int, x.split(";")))))
        lambda x: np.array(list(map(float, x.split(";")))))

    df_train.label = df_train.label.astype(str)
    df_valid.label = df_valid.label.astype(str)
    df_test.label  = df_test.label.astype(str)

    # Encode categorical EC class labels
    le = LabelEncoder()
    le.fit(pd.concat([df_train.label, df_valid.label, df_test.label]))

    with open(label_encoder_file, "wb") as f:
        pickle.dump(le, f)
        
    df_train["label"] = le.transform(df_train.label)
    df_valid["label"] = le.transform(df_valid.label)
    df_test["label"]  = le.transform(df_test.label)

    data_set_train = ReactionDataset(df_train)
    data_set_valid = ReactionDataset(df_valid)
    data_set_test  = ReactionDataset(df_test)
    
    # This just shows the accuracy for different KNN neighbour values,
    # it does not modify the default used in get_nearest_neighbours_prediction.
    if OPTIMIZE_KNN:
        optimizeNumNeighbors(df_train, df_test)

    if modeltype=='knn':
        X_train, y_train = np.array(df_train['fps'].to_list()), df_train['label']
        X_test, y_test = np.array(df_test['fps'].to_list()), df_test['label']
        y_pred = get_nearest_neighbours_prediction(X_train, y_train, X_test)
        getSklearnAccuracy(y_test, y_pred)
        confusionMatrixCalculate(y_test.tolist(), y_pred)
        df_test['predicted EC'] = le.inverse_transform(y_pred)

    elif modeltype=='mlp':
        n_classes = len(le.classes_)
        input_dim = len(df_train["fps"].iloc[0])
        
        model, cm, training_matrics, y_pred = train_test_model(
            data_set_train,
            data_set_valid,
            data_set_test,
            le,
            device,
            input_dim=input_dim,
            hidden_dim=1664,
            output_dim=n_classes,
            epochs=100,
        )

        with open(cm_file, "wb") as f:
            pickle.dump(cm, f)

        with open(training_metrics_file, "wb") as f:
            pickle.dump(training_matrics, f)

        torch.save(model.state_dict(), model_file)

        df_test['predicted EC'] = y_pred

    df_test.drop(axis=1, columns='fps', inplace=True)
    
    csv_file = testset.replace('data/', '').replace('.csv', '') + '_with_prediction_' + modeltype + '.csv'
    df_test.to_csv(csv_file)
    print('\n=> Created CSV file', csv_file)
    print('=> Created model files:', glob.glob(f"{output}*"))
    print('=> Finished')

## Run <a class="anchor" id="run"></a>

In [None]:
main(trainset, validset, testset, output, modeltype)