# Case Study

**Note**: This COLAB notebook is for the case study of REINDEER software.

**Reference**: Paper is under production.

**Package Version** (at the time of creating this notebook - 4/7/2024):

* umpy: 1.25.2

* pandas: 2.0.3

* xgboost: 2.0.3

* scipy: 1.11.4

* scikit-learn: 1.2.2

* optuna: 3.6.1

* matplotlib: 3.7.1

In [None]:
# @title Install dependencies
import os

print("Packages are installing...")
os.system("pip install -q optuna")
print("Optuna is installed.")

In [None]:
#@title Clone REINDEER Github
!git clone --quiet https://github.com/miladrayka/reindeer_software.git
print("Files are downloaded.")

In [None]:
# @title Load packages
import os
import json
from collections import defaultdict
from functools import partial


import scipy
import optuna
import sklearn
import numpy as np
import pandas as pd
import xgboost as xgb
import matplotlib.pyplot as plt
import matplotlib

print("Package version:")
print(f"numpy: {np.__version__}")
print(f"pandas: {pd.__version__}")
print(f"xgboost: {xgb.__version__}")
print(f"scipy: {scipy.__version__}")
print(f"scikit-learn: {sklearn.__version__}")
print(f"optuna: {optuna.__version__}")
print(f"matplotlib: {matplotlib.__version__}")

#1- Utility functions

In [None]:
def preprocessing(
    fv_path: str,
    ba_path: str,
    var_threshold: float = 0.01,
    corr_threshold: float = 0.95,
    val_size: int = 0,
) -> dict:
    """Preprocess features input pd.DataFrame and drop static, quasi-static and
    correlated features. Return standardized and processed data in
    pd.DataFrame, mean and std of data.

    Parameters
    ----------
    fv_path : str
        Path to a feature vector csv file.
    ba_path: str
        Path to the binding affinity csv file.
    var_threshold : float, optional
        Variance threshold. Features below this
        threshold are discarded, by default 0.01
    corr_threshold : float, optional
        Correlated features are discarded, by default 0.95
    val_size: int, optional
        Sample a random validation set, by default 0

    Returns
    -------
    set_split: dict
        Return a dictionary containing numpy arrays of train and test (validation) sets.
    """

    fv_df = pd.read_csv(fv_path, index_col="pdbid")
    ba_df = pd.read_csv(ba_path, index_col="pdbid").reindex(fv_df.index)

    x_train_df = fv_df.loc[ba_df["train"], :]
    x_test_df = fv_df.loc[ba_df["test"], :]
    y_train_df = ba_df.loc[ba_df["train"], :]
    y_test_df = ba_df.loc[ba_df["test"], :]

    x_train_df = x_train_df.loc[(x_train_df != 0).any(axis=1)]
    x_test_df = x_test_df.loc[(x_test_df != 0).any(axis=1)]
    y_train = y_train_df.loc[x_train_df.index, "binding_affinity"]
    y_test = y_test_df.loc[x_test_df.index, "binding_affinity"]

    x_train_df = x_train_df.loc[:, x_train_df.var(axis=0) > var_threshold]
    corr_matrix = x_train_df.corr().abs()
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    to_drop = [
        column for column in upper.columns if any(upper[column] > corr_threshold)
    ]
    x_train_df = x_train_df.drop(to_drop, axis=1)

    mean = x_train_df.mean()
    std = x_train_df.std()

    x_train_df = (x_train_df - mean) / std
    x_test_df = (x_test_df.loc[:, x_train_df.columns] - mean) / std

    train_test_split = {
        "x_train": x_train_df.to_numpy(),
        "x_test": x_test_df.to_numpy(),
        "y_train": y_train.to_numpy(),
        "y_test": y_test.to_numpy(),
    }

    if val_size:
        np.random.seed(42)
        indexes = np.random.choice(
            range(train_test_split["x_train"].shape[0]), val_size, replace=False
        )
        mask = np.zeros(train_test_split["x_train"].shape[0], dtype=bool)
        mask[indexes] = True

        train_test_split["x_val"] = train_test_split["x_train"][mask]
        train_test_split["y_val"] = train_test_split["y_train"][mask]
        train_test_split["x_train"] = train_test_split["x_train"][~mask]
        train_test_split["y_train"] = train_test_split["y_train"][~mask]

    print("Split shapes:")
    for key, value in train_test_split.items():
        print(f"{key}: {value.shape}")

    return train_test_split

In [None]:
def config_file(params: str, filename: str) -> None:
    """Save hyperparameters to a .json file.

    Parameters
    ----------
    params : dict
        hyperparameters of xgboost.XGBREGRESSOR algorithm.
    filename : str
        Name of the json file.
    """

    with open(filename, "w") as file:
        json.dump(params, file)

In [None]:
class Model(object):
    """A class for training, validating, and testing of a scoring function.
    xgboost.XGBREGRESSOR is used as a learner.
    """

    def __init__(self, data_split: dict, params: dict, num_regressor: int) -> None:
        """
        Parameters
        ----------
        data_split : dict
            Train, validation, and test sets split from preprocessing function.
        params : dict
            Hyperparameters of xgboost.XGBREGRESSOR.
        num_regressor : int
            Number of desired regressors to train.
        """

        self.data_split = data_split
        self.random_numbers = np.random.randint(0, 10000, num_regressor)
        self.regressors = [
            xgb.XGBRegressor(random_state=self.random_numbers[num], **params)
            for num in range(num_regressor)
        ]
        self.results = {}

    def train(self) -> None:
        """Train a xgboost.XGBREGRESSOR."""
        for reg in self.regressors:
            reg.fit(self.data_split["x_train"], self.data_split["y_train"])

    def eval(self, mode: str) -> None:
        """Evaluate trained models.

        Parameters
        ----------
        mode : str
          Evaluate on validation (val) or test sets. Only val and test strings
          are allowed.
        """

        assert mode in ["val", "test"], "Mode should be val or test"

        self.results[mode] = [
            reg.predict(self.data_split[f"x_{mode}"]) for reg in self.regressors
        ]

    def metrics(self, mode: str) -> dict:
        """Calculate performance of a trained model on validation and test
        sets by root-mean-square error (RMSE) and Pearson's correlation
        coefficient (RP).

        Parameters
        ----------
        mode : str
          Calculate performance on validation (val) or test sets. Only val and
          test strings are allowed.

        Returns
        -------
        dict
            Calculated RMSE and RP for validation or test sets.
        """

        assert mode in ["val", "test"], "Mode should be val or test"

        metric_results = defaultdict(list)

        for result in self.results[mode]:
            rmse_value = sklearn.metrics.mean_squared_error(
                result, self.data_split[f"y_{mode}"], squared=False
            )
            rp_value = scipy.stats.pearsonr(result, self.data_split[f"y_{mode}"])[0]
            metric_results["rmse"].append(rmse_value)
            metric_results["rp"].append(rp_value)

        return metric_results

    def save_model(self, pathfile: str, name: str) -> None:
        """Save the trained models.

        Parameters
        ----------
        pathfile : str
            A path to save the models.
        name : str
            A name to save models, e.g., xgb_oic.
        """

        for num, reg in enumerate(self.regressors, start=1):
            reg.save_model(os.path.join(pathfile, f"{name}_{num}.json"))

In [None]:
def objective(trial: object, data_split: dict) -> float:
    """Objective for hyperparameters optimization by Optuna.

    Parameters
    ----------
    trial : object
        A parameter for Optuna package.
    data_split : dict
        Train, validation, and test sets split from preprocessing function.

    Returns
    -------
    float
        Pearson's correlation coefficient (RP).
    """
    params = {
        "device": "cpu",#cuda:0 for GPU.
        "n_jobs": -1,
        "tree_method": "hist",
        "objective": "reg:squarederror",
        "verbosity": 0,
        "n_estimators": trial.suggest_int("n_estimators", 100, 20000, step=100),
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.1, log=True),
        "max_depth": trial.suggest_int("max_depth", 1, 10),
        "subsample": trial.suggest_float("subsample", 0.05, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.05, 1.0),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 20),
    }

    xgb_reg = Model(data_split, params, 1)
    xgb_reg.train()

    xgb_reg.eval("val")
    val_metrics = xgb_reg.metrics("val")
    rp_value = val_metrics["rp"][0]

    return rp_value

In [None]:
def hp_optimize(num_trials: int, data_split: dict) -> tuple:
    """Optimize hyperparameters by Optuna.

    Parameters
    ----------
    num_trials : int
        Number of trials to find hyperparameters.
    data_split : dict
        Train, validation, and test sets split from preprocessing function.

    Returns
    -------
    tuple
        Tuple of validation performance and best hyperparameters.
    """

    objective_partial = partial(objective, data_split=data_split)
    study = optuna.create_study(direction="maximize")
    study.optimize(objective_partial, n_trials=num_trials)

    return (study.best_value, study.best_params)

In [None]:
def save_metrics(val_metrics: dict, test_metrics: dict, filename: str) -> None:
    """Save validation and test metrics to a .csv file.

    Parameters
    ----------
    val_metrics : dict
        Metrics for a validation set.
    test_metrics: dict
        Metrics for a test set.
    filename : str
        Name of the csv file.
    """

    df1 = pd.DataFrame(val_metrics).rename({"rmse": "val_rmse", "rp": "val_rp"}, axis=1)
    df2 = pd.DataFrame(test_metrics).rename(
        {"rmse": "test_rmse", "rp": "test_rp"}, axis=1
    )
    pd.concat([df1, df2], axis=1).to_csv(f"{filename}.csv", index=False)

In [None]:
def comparison_plot(
    heights: list,
    labels: list,
    colors: list,
    title: str,

    y_label: str,

    y_lim: float,

    filename: str,
) -> None:

    """Plot a comparison bar chart.


    Parameters
    ----------
    heights: list

        A list of values of a metric for test or validation set.
    labels: list

        A list of labels for x-axis.
    colors: list

        A list of colors.
    title: str

        Title of diagram.

    y_label: str

        Label for y-axis,e.g., RMSE or RP.

    y_lim: float

      For RMSE set it 2.0 for RP set is 1.0.
    filename: str

      Name of file to save diagram in .png format with doi=600.

    """


    plt.figure(figsize=(8, 6))

    plt.bar(labels, heights, color=colors, width=0.6, edgecolor="black", linewidth=1.5)

    plt.ylim(0, y_lim)

    plt.xlabel("Scoring Function", fontsize=12)

    plt.ylabel(y_label, fontsize=12)

    plt.grid(True, color="grey", linestyle="--", linewidth=0.5)

    plt.title(title)

    plt.savefig(f"{filename}.png", dpi=600)

    plt.show()

#2- Scoring functions

In [None]:
# @title Assigning paths of all generated features, and bindning_affinity data.
oic_path = "/content/reindeer_software/files/casestudy_oic_fv.csv"  # @param
dwic_path = "/content/reindeer_software/files/casestudy_dwic_fv.csv"  # @param
ecif_path = "/content/reindeer_software/files/casestudy_ecif_fv.csv"  # @param
binding_affinity_path = (
    "/content/reindeer_software/files/train_test_labels.csv"  # @param
)

##2-1- OIC

Making scoring function using OIC representations.

In [None]:
oic_data_split = preprocessing(
    fv_path=oic_path,

    ba_path=binding_affinity_path,

    var_threshold=0.01,

    corr_threshold=0.95,

    val_size=300,
)

In [None]:
_, best_params = hp_optimize(50, oic_data_split)
config_file(best_params, "/content/xgb_oic_hp.json")

In [None]:
xgb_models = Model(oic_data_split, best_params, 5)
xgb_models.train()

xgb_models.eval("val")
val_metrics = xgb_models.metrics("val")
xgb_models.eval("test")
test_metrics = xgb_models.metrics("test")

xgb_models.save_model("/content/", "xgb_oic")
save_metrics(val_metrics, test_metrics, "/content/oic_metrics")

##2-2- DWIC

Making scoring function using DWIC representations.

In [None]:
dwic_data_split = preprocessing(
    fv_path=dwic_path,

    ba_path=binding_affinity_path,

    var_threshold=0.01,

    corr_threshold=0.95,

    val_size=300,
)

In [None]:
_, best_params = hp_optimize(50, dwic_data_split)
config_file(best_params, "/content/xgb_dwic_hp.json")

In [None]:
xgb_models = Model(dwic_data_split, best_params, 5)
xgb_models.train()

xgb_models.eval("val")
val_metrics = xgb_models.metrics("val")
xgb_models.eval("test")
test_metrics = xgb_models.metrics("test")

xgb_models.save_model("/content/", "xgb_dwic")
save_metrics(val_metrics, test_metrics, "/content/dwic_metrics")

##2-3- ECIF

Making scoring function using ECIF representations.

In [None]:
ecif_data_split = preprocessing(
    fv_path=ecif_path,

    ba_path=binding_affinity_path,

    var_threshold=0.01,

    corr_threshold=0.95,

    val_size=300,
)

In [None]:
_, best_params = hp_optimize(50, ecif_data_split)
config_file(best_params, "/content/xgb_ecif_hp.json")

In [None]:
xgb_models = Model(ecif_data_split, best_params, 5)
xgb_models.train()

xgb_models.eval("val")
val_metrics = xgb_models.metrics("val")
xgb_models.eval("test")
test_metrics = xgb_models.metrics("test")

xgb_models.save_model("/content/", "xgb_ecif")
save_metrics(val_metrics, test_metrics, "/content/ecif_metrics")

#3- Comparison

In [None]:
path = "/content"

df_list = []

for method in ["oic", "dwic", "ecif"]:
    df = pd.read_csv(f"{path}/{method}_metrics.csv")
    df.index = [f"{method}_{i}" for i in df.index]
    df.loc[f"{method}_mean", :] = df.mean()
    df.loc[f"{method}_std", :] = df.std()
    df = df.round(4)
    df_list.append(df)

pd.concat(df_list, axis=0).to_csv(f"{path}/all_results.csv", index=True)

In [None]:
colors = [
    "cyan",
    "darkturquoise",
    "cadetblue",
    "deepskyblue",
    "steelblue",
    "dodgerblue",
    "lighteagreen",
    "teal",
]

labels = ["OIC", "DWIC", "ECIF"]


results_df = pd.read_csv(f"{path}/all_results.csv", index_col=0)

mean_results_df = results_df.iloc[5:-1:7, :]


valid_rps = mean_results_df.loc[:, "val_rp"].values

valid_rmses = mean_results_df.loc[:, "val_rmse"].values
test_rps = mean_results_df.loc[:, "test_rp"].values
test_rmses = mean_results_df.loc[:, "test_rmse"].values

In [None]:
comparison_plot(
    valid_rps,
    labels,
    colors[:4],
    "Performance on Validation Set",
    "$R_{P}$",
    1.0,
    "valid_results_rp",
)

In [None]:
comparison_plot(
    valid_rmses,
    labels,
    colors[:4],
    "Performance on Validation Set",
    "RMSE",
    2.0,
    "valid_results_rmse",
)

In [None]:
comparison_plot(
    test_rps,
    labels,
    colors[:4],
    "Performance on Validation Set",
    "$R_{P}$",
    1.0,
    "test_results_rp",
)

In [None]:
comparison_plot(
    test_rmses,
    labels,
    colors[:4],
    "Performance on Validation Set",
    "RMSE",
    2.0,
    "test_results_rmse",
)