In [None]:
import pandas as pd

# Read the twelve per-assay CSV files; the targets on the left line up
# one-to-one, in order, with the file names on the right.
(
    df_ar, df_ahr, df_ar_lbd, df_are,
    df_aromatase, df_atad5, df_er_lbd, df_er,
    df_hse, df_mmp, df_p53, df_ppar_gamma,
) = map(pd.read_csv, (
    'ar.csv', 'ahr.csv', 'ar-lbd.csv', 'are.csv',
    'aromatase.csv', 'atad5.csv', 'er-lbd.csv', 'er.csv',
    'hse.csv', 'mmp.csv', 'p53.csv', 'ppar-gamma.csv',
))

In [None]:
# Variable names of the assay frames, in the same order as `df_list` below.
df_name = [
    'df_ar', 'df_ahr', 'df_ar_lbd', 'df_are',
    'df_aromatase', 'df_atad5', 'df_er_lbd', 'df_er',
    'df_hse', 'df_mmp', 'df_p53', 'df_ppar_gamma',
]

# The assay frames themselves.
df_list = [
    df_ar, df_ahr, df_ar_lbd, df_are,
    df_aromatase, df_atad5, df_er_lbd, df_er,
    df_hse, df_mmp, df_p53, df_ppar_gamma,
]

# Report the number of rows in each assay frame.
for label, frame in zip(df_name, df_list):
    print(f'{label}:', len(frame))

In [None]:
# Data split, hyperparameter search, and per-assay model training

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve, roc_curve, f1_score, roc_auc_score, average_precision_score, accuracy_score
import numpy as np
import joblib

def train_one_assay(df, n_trials=100, random_state=42):
    """Tune, fit and evaluate a random-forest classifier for a single assay.

    Steps:
      1. Split ``df`` into train/test with ``traintestsplit``.
      2. Carve a stratified 20% validation split out of the training part.
      3. Run an Optuna study (maximising ``objective``) for ``n_trials`` trials.
      4. Fit a model with the tuned parameters on the inner training split and
         pick the decision threshold that maximises F-beta (beta=2) on the
         validation split.
      5. Refit on the full training set and score on the held-out test set.

    NOTE(review): ``traintestsplit``, ``objective``, ``optuna`` and
    ``RandomForestClassifier`` are not defined in the visible cells; they must
    already exist in the kernel. ``objective`` receives no data from this
    function, so it presumably reads its training data from notebook globals --
    confirm that it evaluates the same assay that is being trained here.

    Parameters
    ----------
    df : assay data accepted by ``traintestsplit`` (presumably a DataFrame).
    n_trials : int
        Number of Optuna trials to run.
    random_state : int or None
        Seed used for the validation split, the Optuna sampler and the forests.

    Returns
    -------
    dict
        ``model`` (forest fitted on the full training set), ``best_thr``
        (validation-selected threshold), ``X_cols`` (training column names),
        ``test_auc`` (ROC AUC), ``test_pr`` (average precision) and
        ``best_params`` (the parameters the model was built with).
    """
    X_train, X_test, y_train, y_test = traintestsplit(df)

    # Inner validation split, used only for threshold selection.
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=random_state, stratify=y_train
    )

    # Seed the sampler so that `random_state` makes the search reproducible.
    study = optuna.create_study(
        direction='maximize',
        sampler=optuna.samplers.TPESampler(seed=random_state),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    best = {
        **study.best_params,
        'class_weight': 'balanced',
        'random_state': random_state,
        'n_jobs': -1,
    }

    # Tune the threshold on a model configured exactly like the final one;
    # otherwise the chosen threshold belongs to a different classifier.
    rfc = RandomForestClassifier(**best)
    rfc.fit(X_tr, y_tr)

    val_score = rfc.predict_proba(X_val)[:, 1]
    prec, rec, thr = precision_recall_curve(y_val, val_score)
    beta = 2.0  # weights recall more heavily than precision
    fb = (1 + beta**2) * prec * rec / (beta**2 * prec + rec + 1e-12)
    # precision/recall carry one more point than `thr`; drop the last entry
    # so that indices line up with the thresholds.
    best_idx = np.nanargmax(fb[:-1])
    best_thr = float(thr[best_idx])

    # Final model: same parameters, refit on the whole training set.
    final_model = RandomForestClassifier(**best).fit(X_train, y_train)
    test_score = final_model.predict_proba(X_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_score)
    test_pr = average_precision_score(y_test, test_score)

    return {
        'model': final_model,
        'best_thr': best_thr,
        'X_cols': list(X_train.columns),
        'test_auc': test_auc,
        'test_pr': test_pr,
        'best_params': best
    }
    
def train_all_assays(df_list, assay_names, n_trials=100):
    """Train one model per assay and persist the results as a single bundle.

    ``assay_names`` and ``df_list`` are paired positionally; each frame is
    passed to ``train_one_assay`` with the given ``n_trials``. The resulting
    ``{assay_name: result}`` mapping is written to
    ``tox21_maccs_bundle.joblib`` in the working directory and also returned.
    """
    trained = {
        assay: train_one_assay(frame, n_trials=n_trials)
        for assay, frame in zip(assay_names, df_list)
    }
    joblib.dump(trained, 'tox21_maccs_bundle.joblib')
    return trained

In [None]:
# Convert SMILES strings to MACCS key fingerprints

from rdkit import Chem
from rdkit.Chem import MACCSkeys, Descriptors
from rdkit.Chem.rdmolops import SanitizeFlags
from rdkit.Chem.MolStandardize import rdMolStandardize as std
from rdkit import DataStructs
import numpy as np
import pandas as pd
from rdkit import RDLogger
from tqdm import tqdm
 

def SmilesToMaccs(smiles_list, mw_range=(50,1000)):
    """Standardise SMILES strings and encode them as MACCS key fingerprints.

    Each SMILES is parsed without sanitisation, sanitised with every step
    except kekulisation and aromaticity perception, cleaned up, metal-
    disconnected, uncharged, reionised and reduced to its largest (preferably
    organic) fragment. Molecules whose molecular weight falls outside
    ``mw_range``, and molecules for which any step raises, are skipped.

    Side effects: disables RDKit's ``rdApp.warning`` log for the process and
    prints the number of eliminated entries.

    Parameters
    ----------
    smiles_list : iterable of str
        SMILES strings to convert.
    mw_range : (low, high)
        Inclusive molecular-weight window; molecules outside it are dropped.

    Returns
    -------
    (pandas.DataFrame, list of int)
        A float frame with one row per retained molecule and columns
        ``MACCS_000`` .. ``MACCS_166``, plus the positions (in input order) of
        the entries that were eliminated. The frame has zero rows when nothing
        is retained.
    """
    n_bits = 167  # length of the fingerprint array filled below
    RDLogger.DisableLog('rdApp.warning')

    # Loop-invariant helpers are built once instead of once per molecule.
    sanitize_ops = (SanitizeFlags.SANITIZE_ALL
                    ^ SanitizeFlags.SANITIZE_KEKULIZE
                    ^ SanitizeFlags.SANITIZE_SETAROMATICITY
                    )
    cleanup_params = std.CleanupParameters()
    metal_disconnector = std.MetalDisconnector()
    uncharger = std.Uncharger()
    reionizer = std.Reionizer()
    chooser = std.LargestFragmentChooser(preferOrganic=True)

    maccs_array = []
    eliminated_index = []
    for index, smiles in enumerate(smiles_list):
        try:
            mol = Chem.MolFromSmiles(smiles, sanitize=False)
            if mol is None:
                raise ValueError('Invalid SMILES')

            Chem.SanitizeMol(mol, sanitizeOps=sanitize_ops)

            mol = std.Cleanup(mol, cleanup_params)
            mol = metal_disconnector.Disconnect(mol)
            mol = uncharger.uncharge(mol)
            mol = reionizer.reionize(mol)
            mol = chooser.choose(mol)

            # Re-sanitise because standardisation may have altered the molecule.
            Chem.SanitizeMol(mol, sanitizeOps=sanitize_ops)

            mw = Descriptors.MolWt(mol)
            if mw < mw_range[0] or mw > mw_range[1]:
                eliminated_index.append(index)
                continue

            fp = MACCSkeys.GenMACCSKeys(mol)
            arr = np.zeros(n_bits, dtype=np.uint8)
            DataStructs.ConvertToNumpyArray(fp, arr)
            maccs_array.append(arr.astype(float))

        except Exception:
            # Deliberate best effort: any failure drops this entry and the
            # caller learns its position through `eliminated_index`.
            eliminated_index.append(index)

    print(f'{len(eliminated_index)} eliminated')

    # np.vstack rejects an empty sequence, so build the zero-row matrix
    # explicitly when nothing survived (or the input was empty).
    if maccs_array:
        X = np.vstack(maccs_array).astype(float)
    else:
        X = np.empty((0, n_bits), dtype=float)
    cols = [f'MACCS_{i:03d}' for i in range(X.shape[1])]

    return pd.DataFrame(X, columns=cols), eliminated_index

In [None]:
# Build the prediction DataFrame

def predict_multilabel(smiles_list, bundle, return_proba=False):
    """Predict every assay in ``bundle`` for the given SMILES strings.

    Parameters
    ----------
    smiles_list : str or iterable of str
        One SMILES string, or any iterable of them (list, Series, generator).
    bundle : dict
        Maps an assay key to a dict with at least ``model`` (exposes
        ``predict_proba``), ``best_thr`` (decision threshold) and ``X_cols``
        (feature column names the model expects).
    return_proba : bool
        When true, also return the positive-class probabilities.

    Returns
    -------
    pandas.DataFrame, or (pandas.DataFrame, pandas.DataFrame)
        ``outcome_df`` has a ``SMILES`` column holding the inputs that were
        successfully featurised, followed by one 0/1 column per assay (a
        leading ``df_`` is removed from the assay key for the column name).
        With ``return_proba=True`` a second frame of probabilities is
        returned, whose columns use the full assay keys.
    """
    if isinstance(smiles_list, str):
        smiles_list = [smiles_list]
    else:
        # Materialise once: the featuriser reports eliminated entries by
        # position, so a Series with a custom index (or a one-shot generator)
        # must not be used for the row bookkeeping below.
        smiles_list = list(smiles_list)

    X_raw, eliminated_index = SmilesToMaccs(smiles_list)

    yhat_cols = {}
    proba_cols = {}

    for assay, info in bundle.items():
        X = X_raw.reindex(columns=info['X_cols'], fill_value=0)
        if len(X) == 0:
            # Nothing to score; avoid calling the estimator with zero rows.
            p = np.empty(0, dtype=float)
        else:
            # Column 1 is taken as the positive class.
            p = info['model'].predict_proba(X)[:, 1]

        # Only strip the prefix when it is really there; blind slicing would
        # mangle keys that do not start with 'df_'.
        label = assay[3:] if assay.startswith('df_') else assay
        yhat_cols[label] = (p >= info['best_thr']).astype(int)
        if return_proba:
            proba_cols[assay] = p

    yhat_df = pd.DataFrame(yhat_cols, index=X_raw.index)

    # Keep the SMILES whose position was not eliminated, preserving order.
    dropped = set(map(int, eliminated_index))
    kept_smiles = [s for pos, s in enumerate(smiles_list) if pos not in dropped]
    outcome_df = pd.concat(
        [pd.DataFrame({'SMILES': kept_smiles}), yhat_df.reset_index(drop=True)],
        axis=1,
    )

    if return_proba:
        proba_df = pd.DataFrame(proba_cols, index=X_raw.index)
        return outcome_df, proba_df
    return outcome_df
    

In [None]:
# Load the trained per-assay bundle from the working directory.
# NOTE(review): joblib.load unpickles arbitrary objects -- only load bundle
# files that come from a trusted source.
bundle = joblib.load('tox21_maccs_bundle.joblib')
# NOTE(review): `new_smiles2` is not defined in the visible cells; presumably
# a collection of SMILES strings -- confirm it exists before this cell runs.
outcome_df, proba_df = predict_multilabel(new_smiles2, bundle, return_proba=True)
# Display the per-assay probability table as the cell output.
proba_df