In [1]:
# Mute warnings
import warnings
warnings.filterwarnings("ignore")
# from sklearn.exceptions import ConvergenceWarning
# warnings.filterwarnings("ignore", category=ConvergenceWarning)

In [2]:
## ---- configuration: input file, key column names and output folder ----
fileNameIn = './results/data_input_4_ModelBuilding.csv'
colName_mid = 'Compound Name'
colName_split = 'Split'
colName_y = 'ADME MDCK(WT) Permeability;Mean;A to B Papp (10^-6 cm/s);(Num)'
sep = ','

import os
folderPathOut = './results'    ## './results'
os.makedirs(folderPathOut, exist_ok=True)


import pandas as pd
dataTable_raw = pd.read_csv(fileNameIn, sep=sep)
## every column that is not the identifier, the split label or the target is used as a feature
colName_X = [col for col in dataTable_raw.columns if col not in [colName_mid, colName_split, colName_y]]

## NOTE: each split is taken with .copy() so that it is an independent DataFrame.
## Prediction columns are added to these tables later in the notebook; on a plain
## boolean-mask slice that would be a chained assignment (SettingWithCopyWarning,
## which the first cell silences).

## training
dataTable_train = dataTable_raw[dataTable_raw[colName_split]=='Training'].copy()
X_train, y_train = dataTable_train[colName_X], dataTable_train[colName_y]
print(f"\tTraining_X: {X_train.shape}; Training_y: {y_train.shape}")

## validation
dataTable_val = dataTable_raw[dataTable_raw[colName_split]=='Validation'].copy()
X_val, y_val = dataTable_val[colName_X], dataTable_val[colName_y]
print(f"\tValidation_X: {X_val.shape}; Validation_y: {y_val.shape}")

## test
dataTable_test = dataTable_raw[dataTable_raw[colName_split]=='Test'].copy()
X_test, y_test = dataTable_test[colName_X], dataTable_test[colName_y]
print(f"\tTest_X: {X_test.shape}; Test_y: {y_test.shape}")

	Training_X: (24, 23); Training_y: (24,)
	Validation_X: (3, 23); Validation_y: (3,)
	Test_X: (3, 23); Test_y: (3,)


In [3]:
## <===================== model initiate =====================>
def step_1_model_init(ml_methed, n_jobs=-1, rng=666666):
    """Create an un-fitted scikit-learn regressor and its hyper-parameter grid.

    Parameters
    ----------
    ml_methed : str
        Method name, matched case-insensitively. Recognised aliases:
        'rf' / 'random forest' / 'randomforest', 'svm' / 'support vector machine' /
        'supportvectormachine', 'mlp' / 'ann', 'knn' / 'k-nn' / 'nearest neighbor' /
        'nearestneighbor', and 'linear'. Any other value falls back to the linear
        model after printing a message.
    n_jobs : int
        Forwarded to the estimators that accept ``n_jobs`` (RF, KNN, linear).
    rng : int
        ``random_state`` for the stochastic estimators (RF, MLP).

    Returns
    -------
    tuple
        ``(sk_model, search_space)``. ``search_space`` is a dict mapping parameter
        names to candidate lists, or ``None`` for the linear model (nothing to tune).
    """
    ml_methed = ml_methed.lower()
    ## -------------------- random forest --------------------
    if ml_methed in ['rf', 'random forest', 'randomforest']:
        from sklearn.ensemble import RandomForestRegressor
        sk_model = RandomForestRegressor(random_state=rng, oob_score=True, n_jobs=n_jobs)
        ## 'auto' is no longer a valid max_features value in recent scikit-learn releases;
        ## None (= use all features) is what 'auto' meant for regressors.
        search_space = {'n_estimators': [50, 200, 500], 'max_depth': [2, 4, 6], 'max_features': [None, 'sqrt', 'log2'], 'min_samples_leaf': [5, 10, 25, 50], 'min_samples_split': [2, 5, 8, 10]}

    ## -------------------- SVM --------------------
    elif ml_methed in ['svm', 'support vector machine', 'supportvectormachine']:
        from sklearn.svm import SVR
        sk_model = SVR(kernel="rbf", gamma=0.1)
        search_space = {'kernel': ['poly', 'rbf', 'sigmoid'], 'gamma': ['scale', 'auto'], 'C': [0.1, 1, 10, 100]}

    ## -------------------- MLP --------------------
    elif ml_methed in ['mlp', 'ann']:
        from sklearn.neural_network import MLPRegressor
        sk_model = MLPRegressor(random_state=rng, max_iter=100, early_stopping=True)
        search_space = {'hidden_layer_sizes': [(128,), (128, 128), (128, 128, 128)], 'activation': ['logistic', 'tanh', 'relu'], 'solver': ['sgd', 'adam'], 'alpha': [0.1, 0.01, 0.001, 0.0001]}

    ## -------------------- KNN --------------------
    elif ml_methed in ['knn', 'k-nn', 'nearest neighbor', 'nearestneighbor']:
        from sklearn.neighbors import KNeighborsRegressor
        sk_model = KNeighborsRegressor(n_neighbors=3, n_jobs=n_jobs)
        search_space = {'n_neighbors': [1, 3, 5, 10]}

    ## -------------------- Linear --------------------
    else:
        ## unknown names are not fatal: report and fall back to the linear model
        if ml_methed != 'linear':
            print(f"Error! no proper ML methods were selected, using Linear method instead")
        from sklearn.linear_model import LinearRegression
        sk_model = LinearRegression(n_jobs=n_jobs)
        search_space = None

    return sk_model, search_space

## <===================== model training =====================>
def _HyperParamSearch(sk_model, X, y, search_space=None, search_method='grid', scoring='neg_mean_absolute_error', nFolds=5, n_jobs=-1):
    print(f"\t\tStart Hyper-Parameter Tunning ...")
    SearchResults = {'best_model': None, 'best_score':None, 'best_param':None}

    # if search_method == 'grid':
    from sklearn.model_selection import GridSearchCV
    optimizer = GridSearchCV(estimator=sk_model, param_grid=search_space, scoring=scoring, cv=nFolds, n_jobs=n_jobs)
    optimizer.fit(X, y)
    ## search results
    SearchResults['optimizer'] = optimizer    ## optimizer.best_estimator_, optimizer.best_score_
    SearchResults['best_param'] = optimizer.best_estimator_.get_params()

    ## export
    print(f"\t\tThe best {scoring}: {SearchResults['best_score']}\n\t\tThe best hp-params: {SearchResults['best_param']}")
    print(f"\t\tComplete Hyper-Parameter Tunning ...")
    return SearchResults

##
def step_2_model_training(sk_model, X, y, logy=False, doHPT=False, search_space=None, scoring='neg_mean_absolute_error', n_jobs=-1):
    """Fit ``sk_model`` on (X, y), optionally after a grid search and a log10 transform of y.

    Parameters
    ----------
    sk_model : estimator
        scikit-learn style estimator exposing ``fit`` and ``set_params``.
    X : array-like
        Training features, passed through unchanged.
    y : array-like
        Target values: a pandas Series, a single-column DataFrame, a numpy array
        or a plain sequence. It is flattened to shape ``(len(y),)``.
    logy : bool
        If True the model is trained on ``log10(y)``.
    doHPT : bool
        If True and ``search_space`` is not None, tune the hyper-parameters first.
    search_space, scoring, n_jobs
        Forwarded to ``_HyperParamSearch``.

    Returns
    -------
    estimator
        The fitted ``sk_model``.

    Raises
    ------
    ValueError
        If ``logy`` is True and ``y`` contains values <= 0, for which log10 is
        undefined.
    """
    import time
    import numpy as np
    beginTime = time.time()
    ## ----------------------------------------------------------------
    # X = X.to_numpy()
    ## np.asarray accepts pandas objects as well as lists / arrays
    y = np.asarray(y)
    y = y.reshape((len(y), ))
    if logy:
        ## fail early with a clear message instead of feeding -inf / NaN to the estimator
        if np.any(y <= 0):
            raise ValueError("Error! <logy> is True but y contains non-positive values, cannot apply log10")
        y = np.log10(y)

    ## ----- hyper parameter search ----------------
    if doHPT and search_space is not None:
        HPSearchResults = _HyperParamSearch(sk_model, X, y, search_space, search_method='grid', scoring=scoring, nFolds=5, n_jobs=n_jobs)
        sk_model = sk_model.set_params(**HPSearchResults['best_param'])    #optimizer.best_estimator_

    ## ----- fit the model -----
    sk_model.fit(X, y)

    ## ----------------------------------------------------------------
    print(f"\tThe model training costs time = {(time.time()-beginTime):.2f} s ................")
    return sk_model

## <===================== model predict =====================>
def step_3_make_prediction(sk_model, X, logy=False):
    """Predict with ``sk_model`` and, when ``logy`` is set, map the output back with 10**x.

    ``logy`` must match the flag used at training time so that predictions made on
    the log10 scale are returned on the original scale.
    """
    prediction = sk_model.predict(X)
    if logy:
        return 10**prediction
    return prediction

## <===================== model predict =====================>


In [4]:
##
## ---- run configuration ----
n_jobs, rng = -1, 666666
logy, doHPT = True, True
scoring = 'neg_mean_absolute_error'

ml_methed = 'rf'
ml_methed_list = ['linear', 'rf', 'svm', 'mlp', 'knn']

## container for everything that gets pickled later: data splits, settings, fitted models, predictions
model_dict = dict(
    data={'Training': [X_train, y_train], 'Validation': [X_val, y_val],  'Test': [X_test, y_test]},
    config={'rng': rng, 'n_jobs':n_jobs, 'logy': logy, 'doHPT': doHPT},
    model={},
    results=None,
)

## ---- train every method and add one prediction column per method to each split table ----
for ml_methed in ml_methed_list:
    ## training
    sk_model, search_space = step_1_model_init(ml_methed, n_jobs=n_jobs, rng=rng)
    sk_model = step_2_model_training(sk_model, X_train, y_train, logy=logy, doHPT=doHPT, search_space=search_space, scoring=scoring, n_jobs=n_jobs)
    model_dict['model'][ml_methed] = sk_model

    ## prediction (same order as before: training, validation, test)
    col_pred = f"Prediction_{ml_methed}_{colName_y}"
    for split_table, split_X in ((dataTable_train, X_train), (dataTable_val, X_val), (dataTable_test, X_test)):
        split_table[col_pred] = step_3_make_prediction(sk_model, split_X, logy=logy)

## merge the three split tables back together, restoring the original row order
dataTable_list = [dataTable_train, dataTable_val, dataTable_test]
dataTable_results = pd.concat(dataTable_list).sort_index(ascending=True)
model_dict['results'] = dataTable_results

	The model training costs time = 0.03 s ................
		Start Hyper-Parameter Tunning ...
		The best neg_mean_absolute_error: None
		The best hp-params: {'bootstrap': True, 'ccp_alpha': 0.0, 'criterion': 'squared_error', 'max_depth': 2, 'max_features': 'sqrt', 'max_leaf_nodes': None, 'max_samples': None, 'min_impurity_decrease': 0.0, 'min_samples_leaf': 5, 'min_samples_split': 2, 'min_weight_fraction_leaf': 0.0, 'monotonic_cst': None, 'n_estimators': 500, 'n_jobs': -1, 'oob_score': True, 'random_state': 666666, 'verbose': 0, 'warm_start': False}
		Complete Hyper-Parameter Tunning ...
	The model training costs time = 28.76 s ................
		Start Hyper-Parameter Tunning ...
		The best neg_mean_absolute_error: None
		The best hp-params: {'C': 10, 'cache_size': 200, 'coef0': 0.0, 'degree': 3, 'epsilon': 0.1, 'gamma': 'scale', 'kernel': 'rbf', 'max_iter': -1, 'shrinking': True, 'tol': 0.001, 'verbose': False}
		Complete Hyper-Parameter Tunning ...
	The model training costs time = 0.2



		The best neg_mean_absolute_error: None
		The best hp-params: {'activation': 'tanh', 'alpha': 0.1, 'batch_size': 'auto', 'beta_1': 0.9, 'beta_2': 0.999, 'early_stopping': True, 'epsilon': 1e-08, 'hidden_layer_sizes': (128, 128), 'learning_rate': 'constant', 'learning_rate_init': 0.001, 'max_fun': 15000, 'max_iter': 100, 'momentum': 0.9, 'n_iter_no_change': 10, 'nesterovs_momentum': True, 'power_t': 0.5, 'random_state': 666666, 'shuffle': True, 'solver': 'adam', 'tol': 0.0001, 'validation_fraction': 0.1, 'verbose': False, 'warm_start': False}
		Complete Hyper-Parameter Tunning ...
	The model training costs time = 0.89 s ................
		Start Hyper-Parameter Tunning ...
		The best neg_mean_absolute_error: None
		The best hp-params: {'algorithm': 'auto', 'leaf_size': 30, 'metric': 'minkowski', 'metric_params': None, 'n_jobs': -1, 'n_neighbors': 1, 'p': 2, 'weights': 'uniform'}
		Complete Hyper-Parameter Tunning ...
	The model training costs time = 0.03 s ................


In [4]:
## save results
fileNameOut_pred = f"{folderPathOut}/prediction_results.csv"
dataTable_results.to_csv(fileNameOut_pred, index=False)

## save model
## The file has to be opened for binary *writing* ('wb'). With 'rb' the open() call
## fails when the file does not exist yet, and pickle.dump cannot write to a
## read-only handle when it does.
import pickle
fileNameOut_model = f"{folderPathOut}/ml_models.pickle"
with open(fileNameOut_model, 'wb') as ofh_models:
    pickle.dump(model_dict, ofh_models)

In [None]:
## Add the predictions of the model currently bound to `sk_model` to the training table and display it.
## NOTE(review): `sk_model` is whatever the training loop assigned last (the final entry of
## ml_methed_list when the cells are run in order) -- confirm this is the intended model.
dataTable_train[f"Predict_{colName_y}"] = step_3_make_prediction(sk_model, X_train, logy=logy)
dataTable_train

In [None]:
## Add the predictions of the model currently bound to `sk_model` to the validation table and display it.
## NOTE(review): `sk_model` is whatever the training loop assigned last (the final entry of
## ml_methed_list when the cells are run in order) -- confirm this is the intended model.
dataTable_val[f"Predict_{colName_y}"] = step_3_make_prediction(sk_model, X_val, logy=logy)
dataTable_val

In [None]:
## Add the predictions of the model currently bound to `sk_model` to the test table and display it.
## NOTE(review): `sk_model` is whatever the training loop assigned last (the final entry of
## ml_methed_list when the cells are run in order) -- confirm this is the intended model.
dataTable_test[f"Predict_{colName_y}"] = step_3_make_prediction(sk_model, X_test, logy=logy)
dataTable_test

In [None]:
## Display the validation table in its current state (scratch cell).
dataTable_val

In [None]:
    def _CalcScores(self, y_pred, y_true, printLog=True):   
        dataDict_result = {}
        try:
            y_pred = y_pred.reshape((len(y_pred), ))
            y_true = y_true.reshape((len(y_true), ))
        except Exception as e:
            print(f"\tError! Cannot reformatting the y_pred and y_true when calculating the statistics")
        else:
            ## calculate the mean absolute error using Scikit learn
            try:
                dataDict_result['MAE'] = mean_absolute_error(y_true, y_pred)
            except:
                dataDict_result['MAE'] = np.nan
            
            ## calculate the PearsonCorrelationCoefficient
            try:
                pr_np = np.corrcoef(y_pred, y_true)[1, 0]
                dataDict_result['Pearson_R2'] = pr_np * pr_np
            except:
                dataDict_result['Pearson_R2'] = np.nan

            ## calculate the rank-order correlation (Spearman's rho)
            try:
                sr_sp, sp_sp = spearmanr(y_pred, y_true)[0], spearmanr(y_pred, y_true)[1]
                dataDict_result['Spearman_R2'] = sr_sp * sr_sp
            except:
                dataDict_result['Spearman_R2'], sp_sp = np.nan, np.nan
                        
            ## calculate the # Kendall's tau
            try:
                kr_sp, kp_sp = kendalltau(y_pred, y_true)[0] , kendalltau(y_pred, y_true)[1]
                dataDict_result['KendallTau_R2'] = kr_sp * kr_sp
            except:
                dataDict_result['KendallTau_R2'], kp_sp = np.nan, np.nan
             
            ## print out the results
            if printLog:
                print(f"\t\tData shape: y_pred {y_pred.shape}; y_true {y_true.shape}")
                print(f"\t\tMean absolute error: {dataDict_result['MAE']:.2f}")
                print(f"\t\tPearson-R2: {dataDict_result['Pearson_R2']:.2f}")
                print(f"\t\tSpearman-R2: {dataDict_result['Spearman_R2']:.2f} (p={sp_sp:.2f})")
                print(f"\t\tKendall-R2: {dataDict_result['KendallTau_R2']:.2f} (p={kp_sp:.2f})")
        return dataDict_result



def step_3_model_evaluating(sk_model, X, y, logy=False):
    """Placeholder for model evaluation: predicts on ``X`` and returns the constant 1.

    The prediction is mapped back with 10**x when ``logy`` is True. ``y`` is accepted
    but not used yet, and no score is computed.
    """
    y_pred = sk_model.predict(X)
    ## no float(...) wrapper: float() raises TypeError for an array with more than one element
    y_pred = 10**y_pred if logy else y_pred


    return 1


In [None]:
def model_predict(sk_model, X, logy=False):
    """Predict with ``sk_model``; when ``logy`` is True map the output back with 10**x.

    Returns whatever ``sk_model.predict`` returns (element-wise transformed if ``logy``).
    """
    y_pred = sk_model.predict(X)
    ## no float(...) wrapper: float() raises TypeError for an array with more than one element
    y_pred = 10**y_pred if logy else y_pred
    return y_pred

In [None]:
## Load the merged feature-scoring table from the results folder and display it.
## NOTE(review): relies on `pd` having been imported by an earlier cell.
feature_scoring = pd.read_csv('./results/feature_scoring_merged.csv')
feature_scoring

In [None]:
## Load a compound table and compute descriptors for it with the project-local DescGen helpers.
import pandas as pd
dataTable_pred = pd.read_csv('./Data/DataView_MDCK_MDR1__Permeability_1__export_top30.csv')
print(dataTable_pred.shape)

## NOTE(review): DescGen is a project module not visible here; the comments below only
## restate the call arguments -- confirm the exact semantics against DescGen itself.
from DescGen import desc_calculator_chemaxon, calc_desc_from_table
cx_version, cx_desc = 'V22', 'all'
calculator_cx = desc_calculator_chemaxon(version=cx_version, desc_list=cx_desc)
## compound id is read from 'Compound Name', structure from 'Smiles'
result_dict_cx= calc_desc_from_table(dataTable_pred, colName_mid='Compound Name', colName_smi='Smiles', desc_calculator=calculator_cx)
## the result dict is turned into a frame and transposed (presumably one row per compound -- TODO confirm)
dataTable_pred_prop = pd.DataFrame.from_dict(result_dict_cx).T
dataTable_pred_prop

In [None]:
class Regression_Model(object):
    """Container for a scikit-learn regressor together with its settings and results.

    Only the constructor is defined in this cell.
    """
    ## <===================== model initiation =====================>
    def __init__(self,  myScikitModel=None, modelName='Regression_Model', log_y=True, rng=666666, n_jobs=-1):
        """Store the model and initialise empty result containers.

        Parameters
        ----------
        myScikitModel : estimator
            An initiated scikit-learn model; required (AssertionError if None).
        modelName : str
            Label stored in ``_name``.
        log_y : bool
            Stored in ``log_y``.
        rng : int
            Stored in ``_rng``.
        n_jobs : int
            Stored in ``_n_jobs``.
        """
        ## the message used to ask for an "RDKit" model, but the argument is a scikit-learn estimator
        assert myScikitModel is not None, f"\tError! Please provide an initiated scikit-learn ML model"
        self._name = modelName
        self._rng = rng
        self._n_jobs = n_jobs
        self.model = myScikitModel
        self.log_y = log_y
        self.HPT_Results = {}      # filled by hyper-parameter tuning
        self.predictions = None    # accumulated prediction table
        self.performance = {}      # statistics per dataset label
        self.plots = {}            # figures per dataset label

In [None]:
'''
This is the class of custom ML regression/classification models based on Scikit-learn. 

'''
import warnings
warnings.filterwarnings("ignore")

import time
import copy
import sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

##
from scipy.stats import linregress, t, pearsonr, spearmanr, kendalltau

# from skopt import BayesSearchCV
from sklearn.metrics import mean_absolute_error, roc_auc_score, auc, roc_curve, accuracy_score, RocCurveDisplay
from sklearn.model_selection import GridSearchCV

## custom modules


    def PostProcess_y(self, dataTable_y):
        """Return a copy of ``dataTable_y`` with an added 'y_postprocess' column.

        The new column depends on ``self.transformType``:
        'log10' -> 10**x; 'One-hot' -> ``dataDict_b2v(str(x))``; anything else -> x unchanged.

        Parameters
        ----------
        dataTable_y : pandas.DataFrame
            Must have exactly one column (AssertionError otherwise).

        Returns
        -------
        pandas.DataFrame
            Deep copy of the input plus the 'y_postprocess' column.
        """
        ## the message must refer to <dataTable_y>: the local `y` does not exist yet at this point,
        ## so referencing it turned a failed check into an UnboundLocalError
        assert len(dataTable_y.columns) == 1, f"Error! The <dataTable_y> has incorrect {len(dataTable_y.columns)} columns: {dataTable_y.columns}"
        colName_raw = dataTable_y.columns[0]
        y = copy.deepcopy(dataTable_y)

        if self.transformType=='log10':
            y['y_postprocess'] = y[colName_raw].apply(lambda x: 10**(x))

        elif self.transformType=='One-hot':
            ## NOTE(review): dataDict_b2v is defined outside this cell and is *called* here -- confirm it is a callable and not a dict
            y['y_postprocess'] = y[colName_raw].apply(lambda x: dataDict_b2v(str(x)))
        else:
            y['y_postprocess'] = y[colName_raw].apply(lambda x: x)
        return y
    ## ============================================================



#########################################################################################
############################### Classification model ####################################
#########################################################################################
class Classification_Model(object):
    """Wrapper around a scikit-learn style binary classifier.

    Bundles training (optionally with a grid search), prediction with an adjustable
    probability threshold, evaluation (ROC AUC, accuracy, confusion matrix) and ROC
    plotting. The wrapped model must expose ``fit``, ``predict_proba``,
    ``get_params`` and ``set_params``.

    Attributes
    ----------
    model : estimator
        The wrapped classifier.
    best_threshold : float
        Probability cut-off for class 1 (0.5 until re-estimated in evaluation).
    HPT_Results : dict
        Grid-search results keyed by search method.
    predictions : pandas.DataFrame or None
        Concatenated prediction tables of all evaluated datasets.
    performance : dict
        Statistics per dataset label.
    plots : dict
        ROC figures per dataset label.
    """
    ## <===================== model initiation =====================>
    def __init__(self, myScikitModel=None, modelName='Classification_Model', rng=666666, n_jobs=-1):
        """Store the model and initialise empty result containers (AssertionError if no model is given)."""
        ## the message used to ask for an "RDKit" model, but the argument is a scikit-learn estimator
        assert myScikitModel is not None, f"\tError! Please provide an initiated scikit-learn ML model"
        self._name = modelName
        self._rng = rng
        self._n_jobs = n_jobs
        self.model = myScikitModel
        self.HPT_Results = {}
        self.predictions = None
        self.performance = {}
        self.plots = {}
        self.best_threshold = 0.5
        ## both classes used to be labelled "0"
        self.class_label = {0: "0", 1: "1"}

    ## <===================== model training =====================>
    def Train(self, X, y, printLog=True, HPT=False, search_space=None):
        """Fit the wrapped model, optionally after hyper-parameter tuning.

        Tuning is skipped (with a notice) when ``HPT`` is True but no ``search_space``
        is given, e.g. for models that have nothing to tune.
        """
        ## count time
        beginTime = time.time()
        ## ----------------------------------------------------------------
        ## ------------ hyper parameter search ------------
        if HPT:
            if search_space is not None:
                self._HyperParamSearch(X, y, search_space=search_space, printLog=printLog)
            else:
                print(f"\tWarning! No search space is defined, skip the Hyper-Parameter Tunning")

        ## ------------ fit the model ------------
        self.model.fit(X, y)

        ## ----------------------------------------------------------------
        print(f"\tModel construction costs time = {(time.time()-beginTime):.2f} s ................")
        return None

    ## <===================== model evaluation =====================>
    def MakePrediction(self, X):
        """Return ``(y_pred, y_pred_prob)``: thresholded class labels and class-1 probabilities."""
        y_pred_prob = self.model.predict_proba(X)[:, 1]
        y_pred = np.where(y_pred_prob >= self.best_threshold, 1, 0)
        return y_pred, y_pred_prob

    def Evaluate(self, X, y, ds_label='TBD', estCutoff=False, printLog=True, plotResult=False):
        """Predict on ``X``, score against ``y`` and record statistics, predictions and (optionally) a ROC plot.

        Parameters
        ----------
        X : array-like
            Features.
        y : pandas.DataFrame
            Table whose first column holds the true labels.
        ds_label : str
            Dataset name used as key in ``performance`` / ``plots`` and in the 'DataSet' column.
        estCutoff : bool
            If True, ``best_threshold`` is re-estimated from the ROC curve of this dataset.
        printLog : bool
            Print the statistics.
        plotResult : bool
            Create and store the ROC figure.
        """
        ## make prediction
        ## the column slice is a 1-D probability vector; it must not be unpacked into two names
        y_pred_prob = self.model.predict_proba(X)[:, 1]

        ## calcualte statistics
        print(f"\tEvaluation results of the {ds_label} dataset:")
        self.performance[ds_label] = self._CalcScores(y_pred_prob=y_pred_prob, y_true=y.to_numpy(), estTrsd=estCutoff, printLog=printLog)

        ## save prediction
        df_predictions = copy.deepcopy(y)
        df_predictions['Experiment'] = df_predictions[y.columns[0]]
        df_predictions['DataSet'] = ds_label
        df_predictions['Prob_1'] = y_pred_prob
        df_predictions['Prediction'] = df_predictions['Prob_1'].apply(lambda x: self._Pred_Class(x))
        self.predictions = pd.concat([self.predictions, df_predictions]) if self.predictions is not None else df_predictions

        ## plotting
        if plotResult:
            self.plots[ds_label] = self._Plot_ROCAUC(ds_label)
        return None

    ## <===================== HPTunning =====================>
    def _HyperParamSearch(self, X, y, search_space=None, search_method='grid', scoring='roc_auc', nFolds=5, printLog=True):
        """Grid-search the hyper-parameters and copy the best ones onto ``self.model``.

        Only grid search is implemented; other ``search_method`` values print a notice
        and still run the grid search. Results are stored in
        ``self.HPT_Results[search_method]``.
        """
        ## count time
        beginTime = time.time()
        ## --------------------------------
        print(f"\tStart Hyper-Parameter Tunning ...")
        SearchResults = {'best_model': None, 'best_score':None, 'best_param':None}

        ## every branch of the former if/elif built the same GridSearchCV, so build it once
        if search_method != 'grid':
            print(f"\tWarning! search method <{search_method}> is not implemented, using grid search instead")
        optimizer = GridSearchCV(estimator=self.model, param_grid=search_space, scoring=scoring, cv=nFolds, n_jobs=self._n_jobs)

        ## fit the Optimizer to the Data
        y_reshaped = y.to_numpy().reshape((len(y), ))
        optimizer.fit(X, y_reshaped)

        ## search results
        SearchResults['best_model'] = optimizer.best_estimator_
        SearchResults['best_score'] = optimizer.best_score_
        SearchResults['best_param'] = SearchResults['best_model'].get_params()
        self.HPT_Results[search_method] = SearchResults

        ## apply the best parameters to the wrapped (still un-refitted) model
        # self.model = optimizer.best_estimator_
        if SearchResults['best_param'] is not None:
            self.model.set_params(**SearchResults['best_param'])

        if printLog:
            print(f"\tThis is the log info")
            print(f"\tThe best {scoring}: {SearchResults['best_score']}")
            print(f"\tThe optimized Params: {SearchResults['best_param']}")
            ## ----------------------------------------------------------------
            print(f"\tHyper-parameters Tunning costs time = {(time.time()-beginTime):.2f} s ................")
        return None

    ## <===================== tools =====================>
    def __CalcScore_ROCAUC(self, y_prob, y_true, estTrsd=False):
        """Return a dict with 'auc_score', 'fpr', 'tpr' and 'thresholds' (all NaN on failure).

        With ``estTrsd`` the threshold closest to the top-left ROC corner becomes ``best_threshold``.
        """
        try:
            ## Assuming y_true are the true labels and y_prob are the predicted probabilities
            fpr, tpr, thresholds = roc_curve(y_true, y_prob)
            auc_score = auc(fpr, tpr)

            ## determine the best threshold
            if estTrsd:
                ## Calculate the distance to the top-left corner (0,1)
                distances = np.sqrt(fpr**2 + (1-tpr)**2)
                self.best_threshold = thresholds[distances.argmin()]
                print(f"\tThe best threshold is changged to {self.best_threshold}")
                # ## Calculate Youden's J statistic
                # youden_j = tpr - fpr
                # self.best_threshold = thresholds[youden_j.argmax()]
        except Exception as e:
            print(f'Warning! Cannot calculate ROC AUC, error msg: {e}')
            ## same variable name as in the success path, otherwise building the result below raises NameError
            auc_score, fpr, tpr, thresholds = np.nan, np.nan, np.nan, np.nan
        results = {'auc_score': auc_score, 'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds}
        return results

    def __CalcScore_ACC(self, y_pred, y_true):
        """Accuracy of the binary predictions, NaN if it cannot be computed."""
        try:
            acc = accuracy_score(y_true, y_pred)
        except Exception as e:
            acc = np.nan
        return acc

    def __CalcScore_CM(self, y_pred, y_true):
        """Confusion matrix of the binary predictions, NaN if it cannot be computed."""
        try:
            ## imported locally because the module-level import list does not include it
            from sklearn.metrics import confusion_matrix
            cm = confusion_matrix(y_true, y_pred)
        except Exception as e:
            cm = np.nan
        return cm

    def _CalcScores(self, y_pred_prob, y_true, estTrsd=False, printLog=True):
        """Compute ROC AUC, accuracy and confusion matrix from class-1 probabilities.

        Returns a dict with keys 'ROC_AUC', 'Accuracy' and 'ConfusionMatrics'; the dict
        is empty if the inputs cannot be reshaped to 1-D arrays.
        """
        dataDict_result = {}
        try:
            ## the parameter is y_pred_prob; the former use of an undefined `y_pred` made this step always fail
            y_pred_prob = y_pred_prob.reshape((len(y_pred_prob), ))
            y_true = y_true.reshape((len(y_true), ))
        except Exception as e:
            print(f"\tError! Cannot reformatting the y_pred and y_true when calculating the statistics")
        else:
            ## calculate the ROC auc (may update self.best_threshold when estTrsd is True)
            dataDict_result['ROC_AUC'] = self.__CalcScore_ROCAUC(y_pred_prob, y_true, estTrsd=estTrsd)

            ## calculate the accuracy
            y_pred_binary = np.where(y_pred_prob >= self.best_threshold, 1, 0)
            dataDict_result['Accuracy'] = self.__CalcScore_ACC(y_pred=y_pred_binary, y_true=y_true)
            dataDict_result['ConfusionMatrics'] = self.__CalcScore_CM(y_pred=y_pred_binary, y_true=y_true)

            ## print out the results
            if printLog:
                print(f"\t\tData shape: y_pred {y_pred_prob.shape}; y_true {y_true.shape}")
                print(f"\t\tAUROC: {dataDict_result['ROC_AUC']['auc_score']:.2f}")
                print(f"\t\tAccuracy: {dataDict_result['Accuracy']:.2f}")
                print(f"\t\tConfusionMatrics: {dataDict_result['ConfusionMatrics']}")
        return dataDict_result

    def _Plot_ROCAUC(self, ds_label):
        """Draw the ROC curve stored for ``ds_label`` and return the figure (left empty if the curve is unavailable)."""
        ## initiate the figure axes
        fig, ax = plt.subplots(figsize=(6, 6))
        ## generate plot
        try:
            dataDict_roc = self.performance[ds_label]['ROC_AUC']
            fpr, tpr, roc_auc = dataDict_roc['fpr'], dataDict_roc['tpr'], dataDict_roc['auc_score']
            display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc, estimator_name=ds_label)
        except Exception as e:
            ## still best-effort, but say why the figure stays empty
            print(f"\tWarning! Cannot plot the ROC curve of the {ds_label} dataset, error msg: {e}")
        else:
            display.plot(ax=ax)
            ## set the figure config
            ax.set(xlim=[-0.05, 1.05], ylim=[-0.05, 1.05],
                xlabel="False Positive Rate", ylabel="True Positive Rate",
                title=f"ROC Curve ({ds_label})")
            ax.axis("square")
            ax.legend(loc="lower right")
            # plt.show()
        return fig

    def _Pred_Class(self, prob):
        """Map one probability to class 1 (>= best_threshold) or class 0."""
        if prob >= self.best_threshold:
            pred = 1
        else:
            pred = 0
        return pred

    @staticmethod
    def ___futureFunctionsTBA():
        """Placeholder for future functionality; a staticmethod because it takes no ``self``."""
        return None

#########################################################################################
################################# select_ML_methods #####################################
#########################################################################################

def select_ML_methods(modelType, ml_methed, rng=666666, knnk=3, n_jobs=-1):
    """Create an un-fitted scikit-learn model and its hyper-parameter grid.

    Parameters
    ----------
    modelType : str
        'regression' or 'classification'.
    ml_methed : str
        Method name, matched case-insensitively: 'RF', 'linear', 'SVM', 'MLP', 'KNN',
        and for classification also 'XGBoost' (which builds scikit-learn's
        GradientBoostingClassifier). An unknown name prints a message and falls back
        to Lasso (regression) or LogisticRegression (classification).
    rng : int
        ``random_state`` of the stochastic estimators.
    knnk : int
        Initial ``n_neighbors`` of the KNN models.
    n_jobs : int
        Forwarded to the estimators that accept ``n_jobs``.

    Returns
    -------
    tuple
        ``(sk_model, search_space)``; ``search_space`` is a dict of candidate lists
        or ``None`` when there is nothing to tune.

    Raises
    ------
    ValueError
        If ``modelType`` is neither 'regression' nor 'classification'.
    """
    ## case-insensitive matching, so that 'rf' and 'RF' select the same model
    method_key = str(ml_methed).lower()

    if modelType == 'regression':
        if method_key == 'rf':
            from sklearn.ensemble import RandomForestRegressor
            sk_model = RandomForestRegressor(random_state=rng, oob_score=True, n_jobs=n_jobs)
            ## 'max_features' used to appear twice in this dict; only the last entry was
            ## effective, so that is the one kept
            search_space = {
                'n_estimators': [50, 100, 250, 500],
                'max_depth': [2, 4, 6],
                'min_samples_leaf': [1, 5, 10, 25, 50],
                'max_features': ['sqrt', 'log2', None],
                'min_samples_split': [2, 5, 8, 10]}

        elif method_key == 'linear':
            from sklearn.linear_model import LinearRegression
            sk_model = LinearRegression(n_jobs=n_jobs)
            search_space = None
            # from sklearn.linear_model import Lasso
            # sk_model = Lasso(alpha=0.1)
            # search_space = {'alpha': [0, 0.1, 0.25, 0.5, 0.8]}

        elif method_key == 'svm':
            from sklearn.svm import SVR
            sk_model = SVR(kernel="rbf", gamma=0.1)
            search_space = {
                'kernel': ['poly', 'rbf', 'sigmoid'],
                'gamma': ['scale', 'auto'],
                'C': [0.1, 1, 10, 100]}

        elif method_key == 'mlp':
            from sklearn.neural_network import MLPRegressor
            sk_model = MLPRegressor(random_state=rng, max_iter=500, early_stopping=True)
            search_space = {
                'hidden_layer_sizes': [(128,), (128, 128), (128, 128, 128)],
                'activation': ['logistic', 'tanh', 'relu'],
                'solver': ['sgd', 'adam'],
                'alpha': [0.1, 0.01, 0.001, 0.0001]}

        elif method_key == 'knn':
            from sklearn.neighbors import KNeighborsRegressor
            sk_model = KNeighborsRegressor(n_neighbors=knnk, n_jobs=n_jobs)
            search_space = {'n_neighbors': [1, 3, 5, 10]}

        else:
            print(f"Error! no proper ML methods were selected, using Linear method instead")
            from sklearn.linear_model import Lasso
            sk_model = Lasso(alpha=0.1)
            search_space = {'alpha': [0, 0.1, 0.25, 0.5, 0.8]}

    elif modelType == 'classification':
        if method_key == 'rf':
            from sklearn.ensemble import RandomForestClassifier
            sk_model = RandomForestClassifier(random_state=rng, class_weight='balanced_subsample', oob_score=True, n_jobs=n_jobs)
            ## duplicate 'max_features' key removed, see the regression branch
            search_space = {
                'n_estimators': [50, 100, 250, 500],
                'max_depth': [2, 4, 6],
                'min_samples_leaf': [1, 5, 10, 25, 50],
                'max_features': ['sqrt', 'log2', None],
                'min_samples_split': [2, 5, 8, 10]}

        elif method_key == 'linear':
            from sklearn.linear_model import LogisticRegression
            sk_model = LogisticRegression(random_state=rng, n_jobs=n_jobs)
            search_space = None

        elif method_key == 'svm':
            from sklearn.svm import SVC
            sk_model = SVC(kernel="rbf", gamma=0.1, random_state=rng, probability=True)
            search_space = {
                'kernel': ['poly', 'rbf', 'sigmoid'],
                'gamma': ['scale', 'auto'],
                'C': [0.1, 1, 10, 100]}

        elif method_key == 'mlp':
            from sklearn.neural_network import MLPClassifier
            sk_model = MLPClassifier(random_state=rng, max_iter=500, early_stopping=True)
            search_space = {
                'hidden_layer_sizes': [(128,), (128, 128), (128, 128, 128)],
                'activation': ['logistic', 'tanh', 'relu'],
                'solver': ['sgd', 'adam'],
                'alpha': [0.1, 0.01, 0.001, 0.0001]}

        elif method_key == 'xgboost':
            ## despite the label this is scikit-learn's gradient boosting, not the xgboost package
            from sklearn.ensemble import GradientBoostingClassifier
            sk_model = GradientBoostingClassifier(n_estimators=100, random_state=rng)
            search_space = {
                'n_estimators': [50, 100, 250, 500],
                'loss': ["log_loss", "exponential"],
                'max_depth': [1, 3, 5],
                'learning_rate': [0.01, 0.1, 1],
                'min_samples_leaf': [1, 5, 10, 25, 50],
                'min_samples_split': [2, 5, 8, 10],
                'max_features': ['sqrt', 'log2', None]}

        elif method_key == 'knn':
            from sklearn.neighbors import KNeighborsClassifier
            sk_model = KNeighborsClassifier(n_neighbors=knnk, n_jobs=n_jobs)
            search_space = {'n_neighbors': [1, 3, 5, 10]}

        else:
            ## mirror the regression branch: an unknown name falls back to the linear model
            ## instead of leaving sk_model undefined
            print(f"Error! no proper ML methods were selected, using Linear method instead")
            from sklearn.linear_model import LogisticRegression
            sk_model = LogisticRegression(random_state=rng, n_jobs=n_jobs)
            search_space = None

    else:
        ## previously only printed, then failed at `return` with an UnboundLocalError
        raise ValueError(f"\tError! ML model type should be one of <regression> or <classification>, got <{modelType}>")
    return sk_model, search_space

In [None]:
def main():
    """Placeholder entry point: reads the model-selection switches from ``args`` and does nothing with them yet.

    NOTE(review): neither ``args`` nor ``_str_2_bool`` is defined in this function or in
    the visible part of the notebook. Both must exist at module level before ``main()``
    is called, otherwise a NameError is raised -- confirm where they are meant to come from.
    """
    # fixed model type; every value below is parsed into a local that is not used yet
    modelType="regression"
    m_rf = _str_2_bool(args.model_rf)
    m_li = _str_2_bool(args.model_linear)
    m_svm = _str_2_bool(args.model_svm)
    m_mlp = _str_2_bool(args.model_mlp)
    m_knn = _str_2_bool(args.model_knn)
    m_knnk = int(args.model_knnk)
    m_xgb = _str_2_bool(args.model_xgb)
    HPT = _str_2_bool(args.HPT)
    pass

In [None]:
####################################################################
########################## Tools ###################################
####################################################################
## get the args
def Args_Prepation(parser_desc, argv=None):
    """Build the command-line parser of the descriptor workflow and parse the arguments.

    Parameters
    ----------
    parser_desc : str
        Description shown in the ``--help`` text.
    argv : list of str, optional
        Argument list to parse. ``None`` (default) makes argparse read
        ``sys.argv[1:]``, which is the previous behaviour; pass an explicit list
        when calling from a notebook or a test, where ``sys.argv`` holds unrelated
        arguments.

    Returns
    -------
    argparse.Namespace
        Attributes: input, delimiter, colId, colSmi, colPreCalcDesc, desc_fps,
        desc_rdkit, desc_cx, norm, imput, output. The on/off switches are kept as the
        strings given on the command line (default "True"), not converted to bool.
    """
    import argparse
    parser = argparse.ArgumentParser(description=parser_desc)

    ## ---- input table and its columns ----
    parser.add_argument('-i', '--input', action="store", default=None, help='The input csv file')
    parser.add_argument('-d', '--delimiter', action="store", default=',', help='The delimiter of input csv file for separate columns')
    # parser.add_argument('--detectEncoding', action="store_true", help='detect the encoding type of the csv file')
    parser.add_argument('--colId', action="store", default='Compound Name', help='The column name of the compound identifier')
    parser.add_argument('--colSmi', action="store", default='Structure', help='The column name of the compound smiles')
    parser.add_argument('--colPreCalcDesc', action="store", default=None, help='comma separated string e.g., <desc_1,desc_2,desc_3>')

    ## ---- descriptor families to calculate ----
    parser.add_argument('--desc_fps', action="store", default="True", help='calculate the molecular fingerprints')
    parser.add_argument('--desc_rdkit', action="store", default="True", help='calculate the molecular property using RDKit')
    parser.add_argument('--desc_cx', action="store", default="True", help='calculate the molecular property using ChemAxon')

    ## ---- post-processing and output ----
    parser.add_argument('--norm', action="store", default="True", help='normalize the descriptors (z-score)')
    parser.add_argument('--imput', action="store", default="True", help='impute the descriptors')
    parser.add_argument('-o', '--output', action="store", default="./results", help='the output folder')

    args = parser.parse_args(argv)
    return args

