In [1]:
# Auto-format code cells with black inside JupyterLab.
%load_ext lab_black

## Setup

### Set up Intel extensions

In [2]:
from sklearnex import patch_sklearn

In [3]:
# Replace supported scikit-learn estimators with the Intel-accelerated
# sklearnex implementations. Per the sklearnex documentation, this must run
# before the estimators to be patched are imported.
patch_sklearn()

Intel(R) Extension for Scikit-learn* enabled (https://github.com/intel/scikit-learn-intelex)


### Configure on-machine parallelism

Using dask for on-machine parallelism seems to decrease performance, because even small pieces of work get scattered across the whole cluster.
We therefore use joblib's recommended `loky` backend instead.
**TODO:** find out whether the backend has to be set when the model is created, or only while fitting.

In [4]:
# import joblib

In [5]:
# joblib.parallel_config("loky", n_jobs=-1)

### Predictable randomness

In [6]:
import numpy as np

seed = 0


def rng():
    """Return a brand-new ``RandomState`` seeded with the notebook-wide ``seed``.

    Every call hands out an independent generator object that starts in the
    same state, so each estimator receiving one sees an identical stream.
    """
    generator = np.random.RandomState(seed)
    return generator

### Shared parameters

In [7]:
# Candidate values for the "scaler" step in every search grid below.
# A lone ``None`` entry means the step is skipped (no scaling is applied).
param_scalers = [
    None,
]

### Preprocessing and parameter search

In [8]:
def rdkit_ecfp(mol, radius=2, fp_size=1024):
    """Compute a Morgan (ECFP-style) bit-vector fingerprint for one molecule.

    The defaults (radius 2, 1024 bits) are the values this notebook has always
    used, so callers passing only ``mol`` get exactly the same features.

    Parameters
    ----------
    mol : rdkit.Chem.Mol
        Molecule to fingerprint.
    radius : int, optional
        Morgan radius (radius 2 corresponds to ECFP4).
    fp_size : int, optional
        Number of bits in the folded fingerprint.

    Returns
    -------
    numpy.ndarray
        1-D ``int64`` array of length ``fp_size`` holding 0/1 bits.
    """
    # Imports are function-local, presumably so the function stays
    # self-contained when it is pickled and shipped to dask workers.
    from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator
    import numpy as np

    gen = GetMorganGenerator(radius=radius, fpSize=fp_size)
    # The dedicated NumPy accessor avoids converting the ExplicitBitVect bit
    # by bit through the Python sequence protocol; cast to keep the int64 dtype.
    return gen.GetFingerprintAsNumPy(mol).astype(np.int64)

In [9]:
from sklearn.model_selection import KFold
from sklearn.pipeline import Pipeline as PipelineBase
from lib.failable import ElementFunctionTransformer, DropErrors

# from dask_ml.model_selection import HyperbandSearchCV
# from dask_ml.wrappers import Incremental

from sklearn.model_selection import RandomizedSearchCV


class Pipeline(PipelineBase):
    """scikit-learn ``Pipeline`` extended with a step-by-step ``partial_fit``.

    Otherwise identical to ``sklearn.pipeline.Pipeline``. ``partial_fit``
    lets the pipeline be trained incrementally (e.g. by dask-ml's
    ``Incremental`` wrapper).
    """

    def partial_fit(self, X, y=None, **partial_fit_params):
        """Incrementally fit every step on one batch of data.

        Each intermediate step is ``partial_fit`` on the current data and then
        used to ``transform`` it for the next step. The final step is only
        ``partial_fit``: it is normally a classifier without ``transform``.
        Steps set to ``None`` or ``"passthrough"`` are skipped.

        Parameters
        ----------
        X : array-like
            Batch of input samples.
        y : array-like, optional
            Batch of targets, forwarded to every step's ``partial_fit``.
        **partial_fit_params
            Extra keyword arguments (e.g. ``classes=``) forwarded to the final
            step's ``partial_fit`` only.

        Returns
        -------
        self
        """
        Xt = X
        last_index = len(self.steps) - 1
        for index, (_, est) in enumerate(self.steps):
            if est is None or (isinstance(est, str) and est == "passthrough"):
                continue
            if index == last_index:
                # Final estimator: no transform needed (and usually none exists).
                est.partial_fit(Xt, y, **partial_fit_params)
            else:
                est.partial_fit(Xt, y)
                Xt = est.transform(Xt)
        return self


def make_parameter_search(model, cv_params, n_iter=10):
    """Wrap ``model`` in a reproducible randomized hyper-parameter search.

    Parameters
    ----------
    model : estimator
        Unfitted scikit-learn compatible estimator (here: a ``Pipeline``).
    cv_params : dict
        Candidate values keyed by ``<step>__<param>``.
    n_iter : int, optional
        Number of parameter settings sampled; 10 is scikit-learn's default.

    Returns
    -------
    RandomizedSearchCV
        Unfitted search object. With ``refit=True``, the best configuration
        is retrained on the full data once the search finishes.
    """
    return RandomizedSearchCV(
        model,
        cv_params,
        n_iter=n_iter,
        scoring="balanced_accuracy",
        refit=True,
        cv=KFold(n_splits=10, shuffle=True, random_state=seed),
        verbose=3,
        error_score="raise",
        n_jobs=-1,
        # Without this, the sampled parameter combinations differ between
        # runs, defeating the fixed ``seed`` used everywhere else.
        random_state=seed,
    )

### Cross-validation

In [10]:
from sklearn.model_selection import cross_validate
from sklearn.metrics import make_scorer, recall_score, precision_score


def cross_validation(model, X, y):
    """Evaluate ``model`` with shuffled 10-fold cross-validation on ``(X, y)``.

    Returns the dict produced by ``sklearn.model_selection.cross_validate``:
    one ``test_<name>`` array per scorer below, plus fit/score timings.
    Specificity is computed as the recall of the negative class (label 0).
    """
    scorers = dict(
        accuracy="accuracy",
        sensitivity="recall",
        specificity=make_scorer(recall_score, pos_label=0),
        balanced_accuracy="balanced_accuracy",
        f1="f1",
        roc_auc="roc_auc",
        precision=make_scorer(precision_score, zero_division=0),
        matthews_corrcoef="matthews_corrcoef",
    )
    folds = KFold(n_splits=10, shuffle=True, random_state=seed)
    return cross_validate(model, X, y, scoring=scorers, cv=folds, n_jobs=-1)

## Define Models

In [11]:
# from sklearn.pipeline import Pipeline

In [12]:
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import FunctionTransformer

In [13]:
# Elastic-net logistic regression on an 8-component PCA projection of the
# fingerprints. The search tunes the regularisation strength C and the L1/L2
# mix; "saga" is the scikit-learn solver that supports the elasticnet penalty.
clf_logr = make_parameter_search(
    Pipeline(
        steps=[
            ("scaler", None),
            ("pca", PCA(n_components=8, random_state=rng())),
            (
                "logr",
                LogisticRegression(
                    solver="saga",
                    max_iter=10000,
                    random_state=rng(),
                ),
            ),
        ]
    ),
    dict(
        scaler=param_scalers,
        logr__penalty=["elasticnet"],
        logr__C=[0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000],
        logr__l1_ratio=[0, 0.25, 0.5, 0.75, 1],
    ),
)

NameError: name 'RandomizedSearchCV' is not defined

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Random forest with balanced class weights. max_features=1.0 makes every
# split consider all features. Larger settings (100/250 trees, depth 32/64)
# were commented out of the grid.
clf_rf = make_parameter_search(
    Pipeline(
        steps=[
            ("scaler", None),
            (
                "rf",
                RandomForestClassifier(
                    max_features=1.0,
                    random_state=rng(),
                    n_jobs=-1,
                ),
            ),
        ]
    ),
    dict(
        scaler=param_scalers,
        rf__class_weight=["balanced"],
        rf__n_estimators=[5, 10, 25, 50],
        rf__max_depth=[2, 4, 8, 16],
    ),
)

In [None]:
from sklearn.neighbors import KNeighborsClassifier

In [None]:
# k-nearest neighbours, tuned over k, vote weighting and distance metric.
# NOTE(review): k=7 and k=15 are absent from the list; confirm this is intended.
clf_knn = make_parameter_search(
    Pipeline(
        steps=[
            ("scaler", None),
            ("knn", KNeighborsClassifier()),
        ]
    ),
    dict(
        scaler=param_scalers,
        knn__n_neighbors=[3, 5, 9, 11, 13, 17, 19],
        knn__weights=["uniform", "distance"],
        knn__metric=["euclidean", "manhattan"],
    ),
)

In [None]:
from sklearn.svm import SVC

In [None]:
# RBF-kernel SVM with balanced class weights, tuned over C and gamma.
# probability=True enables predict_proba; scikit-learn fits it with an
# internal cross-validation, which makes training slower.
clf_svc = make_parameter_search(
    Pipeline(
        steps=[
            ("scaler", None),
            (
                "svc",
                SVC(
                    class_weight="balanced",
                    probability=True,
                    random_state=rng(),
                ),
            ),
        ]
    ),
    dict(
        scaler=param_scalers,
        svc__kernel=["rbf"],
        svc__C=[0.1, 1.0, 10.0, 100.0, 1000.0],
        svc__gamma=[0.0001, 0.001, 0.01, 0.1, 1.0, 10.0],
    ),
)

In [None]:
from xgboost import XGBClassifier

In [None]:
# Gradient-boosted trees, tuned over positive-class re-weighting
# (scale_pos_weight), ensemble size and tree depth.
# NOTE(review): confirm that objective=None resolves to the binary logistic
# objective in the installed xgboost version. Otherwise half of the sampled
# configurations train with a different objective.
clf_xgb = make_parameter_search(
    Pipeline(
        steps=[
            ("scaler", None),
            (
                "xgb",
                XGBClassifier(
                    random_state=rng(),
                    n_jobs=-1,
                ),
            ),
        ]
    ),
    dict(
        scaler=param_scalers,
        xgb__scale_pos_weight=[0.1, 0.5, 1, 5, 10],
        xgb__objective=[None, "binary:logistic"],
        xgb__n_estimators=[5, 10, 25, 50, 100, 250],
        xgb__max_depth=[2, 4, 8, 16, 32, 64],
    ),
)

In [None]:
# Every (search, target) pair becomes one training job further below.
models = [
    clf_logr,
    clf_rf,
    clf_knn,
    clf_svc,
    clf_xgb,
]
# Target names; each selects the training file "<target>.sdf" read by load_data.
targets = (
    "BCRP BCRP-S BSEP MATE1 MDR1 MDR1-S MRP2-S MRP3 MRP3-S "
    "OATP1B1 OATP1B3 OCT1 OCT2"
).split()

## Training

### Training routine

In [None]:
from rdkit.Chem.PandasTools import LoadSDF
from dask.distributed import worker_client
import numpy as np
from molvs import Standardizer, MolVSError

In [None]:
standardizer = Standardizer()

# Directory that holds one training SDF file per target, named "<target>.sdf".
TRAINING_DATA_DIR = (
    "ba_assets/data_for_models/data_threshold_all_filled_0.5_all_masters/"
    "training_chembl+manual"
)


def load_data(target, data_dir=TRAINING_DATA_DIR):
    """Load the training set for ``target`` and attach fingerprint features.

    Parameters
    ----------
    target : str
        Target name; selects the file ``<data_dir>/<target>.sdf``.
    data_dir : str, optional
        Directory containing the per-target SDF files.

    Returns
    -------
    pandas.DataFrame
        The frame returned by RDKit's ``LoadSDF`` (molecules in ``ROMol``),
        with an added ``Descriptors`` column of ECFP arrays from ``rdkit_ecfp``.
    """
    data = LoadSDF(f"{data_dir}/{target}.sdf")

    featurize = Pipeline(
        [
            # Standardize with MolVS. MolVSError is caught per molecule
            # (presumably turned into an error marker) instead of aborting.
            (
                "standardize",
                ElementFunctionTransformer(
                    standardizer.standardize, catch_exception=MolVSError
                ),
            ),
            # NOTE(review): if DropErrors removes entries, its output is
            # shorter than ``data``, and the column assignment below relies
            # on index alignment. Confirm DropErrors' output type.
            ("drop_errors", DropErrors()),
            ("extract_features", ElementFunctionTransformer(rdkit_ecfp)),
        ]
    )
    # NOTE(review): ``transform`` is called on an unfitted Pipeline. Recent
    # scikit-learn versions warn about this and announce it will become an error.
    data["Descriptors"] = featurize.transform(data.ROMol)

    return data

In [None]:
def train_model(model, target, with_cross_validation=False):
    """Fit a copy of ``model`` on the training data for ``target``.

    Submitted to the cluster as one dask task per (model, target) pair.
    Parallelism inside the task goes through joblib's ``loky`` backend.

    Parameters
    ----------
    model : estimator
        Unfitted search object from ``make_parameter_search``. It is cloned
        before fitting, so the caller's object is never mutated.
    target : str
        Target name passed to ``load_data``.
    with_cross_validation : bool, optional
        Also run ``cross_validation`` (10 additional full searches). Off by
        default because it is expensive.

    Returns
    -------
    tuple
        ``(fitted_model, cross)``. ``cross`` is the ``cross_validate`` result
        dict, or ``None`` when cross-validation was not requested.
    """
    # Function-local imports: joblib is not imported at notebook level, and
    # local imports keep the task self-contained on dask workers.
    import joblib
    from sklearn.base import clone

    data = load_data(target)
    descs = np.stack(data.Descriptors)
    labels = data.Classification.astype(int)

    # Keep parallelism on this worker's CPUs (loky) rather than scattering
    # small pieces of work across the cluster through a dask Client.
    with joblib.parallel_config(backend="loky", n_jobs=-1):
        fitted = clone(model).fit(descs, labels)
        cross = (
            cross_validation(model, descs, labels) if with_cross_validation else None
        )

    return fitted, cross

### Configure task runner

In [None]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import atexit
import shutil

In [None]:
# Close a cluster left over from a previous run of this cell, so its SLURM
# jobs are released before new ones are requested.
if "cluster" in globals():
    cluster.close()
# Start with an empty log directory. ignore_errors makes this a no-op on a
# fresh checkout, where "logs" does not exist yet.
shutil.rmtree("logs", ignore_errors=True)

cluster = SLURMCluster(
    cores=1,  # dask runs one task at a time per job ...
    job_cpu=16,  # ... on 16 booked CPUs, presumably left for joblib/loky
    memory="128 GB",
    scheduler_options={"dashboard_address": ":8787"},
    log_directory="logs",
)

# Release the SLURM jobs when the kernel exits.
atexit.register(lambda: cluster.close())

In [None]:
# Request one worker per (model, target) training task, so every job
# submitted below can run at the same time.
cluster.scale(len(models) * len(targets))

In [None]:
# Connect the notebook to the SLURM cluster; ``client.submit`` below sends
# training tasks through this client.
client = Client(cluster)

### Start jobs

In [None]:
# Exploratory: load a single target locally to inspect it before submitting
# the full set of cluster jobs.
data = load_data("BCRP")

In [None]:
# Exploratory: show the raw Classification labels of the loaded target.
np.stack(data.Classification)

In [None]:
# Exploratory: fit the random-forest search (models[1] is clf_rf) locally.
# NOTE(review): this fits the shared clf_rf object in place.
models[1].fit(np.stack(data.Descriptors), np.stack(data.Classification.astype(int)))

In [None]:
# Submit one training task per (model, target) pair to the cluster.
# Keys read "train-<final pipeline step>-<target>-1". Dask treats equal keys as
# the same task. NOTE(review): the trailing "-1" looks like a manual run
# counter; bump it to force recomputation when resubmitting.
jobs = [
    client.submit(
        train_model,
        model,
        target,
        key=f"train-{model.estimator.steps[-1][0]}-{target}-1",
        # resources={"singleton": 1},
    )
    for model in models
    for target in targets
]

In [None]:
# Smoke test: block until the first task finishes and display its result.
jobs[0].result()

In [None]:
# Wait for every task; each entry is the (model, cross) tuple returned by train_model.
results = [job.result() for job in jobs]

In [None]:
# Display the collected training results.
results