In [1]:
# =========================
# Standard library
# =========================
import os
import ast

# =========================
# Third-party libraries
# =========================
import numpy as np
import pandas as pd
import joblib

from sklearn.metrics import (
    confusion_matrix,
    accuracy_score,
    roc_auc_score
)

# =========================
# RDKit
# =========================
from rdkit import Chem
from rdkit.Chem import (
    AllChem,
    MACCSkeys,
    rdMolDescriptors,
    RDKFingerprint,
    Descriptors
)
from rdkit.DataStructs import (
    TanimotoSimilarity,
    ExplicitBitVect
)

# =========================
# CDK
# =========================
from CDK_pywrapper import CDK


# mfCoQ-RASAR Evaluation

# Weight Optimzation

In [None]:
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem.Scaffolds import MurckoScaffold
from rdkit.DataStructs.cDataStructs import ExplicitBitVect
from rdkit import DataStructs
from sklearn.metrics import roc_auc_score, confusion_matrix
from tqdm import tqdm
import joblib
import ast
import os

# ===============================================================
# 0. Paths & config
# ===============================================================
train_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Train_set_Dermal_with_fingerprints_RDKit_CDK.xlsx"
qsar_model_dir = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\QSAR Model"
ra_train_file = r"C:\Fauzan\Manuscripts QSAR-RA 2\Old Endpoints\Acute Dermal Toxicity (manual split)\Read Across\Train_set_Dermal_balanced_with_fingerprints.xlsx"
sf_ra_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Read Across\Sf_per_fingerprint.csv"
output_dir = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Evaluation"
os.makedirs(output_dir, exist_ok=True)

# kandidat w
W_GRID = np.arange(0.01, 1.00, 0.01)

# ===============================================================
# 1. Load TRAIN set (untuk mfCoQ-RASAR CV)
# ===============================================================
train_df_qsar = pd.read_excel(train_path)
y_all = train_df_qsar['Outcome'].astype(int).values
smiles_all = train_df_qsar['SMILES'].astype(str).values

# helper: parse fingerprints → np.array
def convert_to_array(series):
    return np.array(series.apply(ast.literal_eval).tolist())

X_qsar_by_desc = {
    "Morgan":   convert_to_array(train_df_qsar["Morgan_Descriptors"]),
    "MACCS":    convert_to_array(train_df_qsar["MACCS_Descriptors"]),
    "APF":      convert_to_array(train_df_qsar["APF_Descriptors"]),
    "Physchem": train_df_qsar.drop(columns=[
        'SMILES', 'Morgan_Descriptors', 'MACCS_Descriptors',
        'APF_Descriptors', 'RDK_Descriptors', 'Outcome'
    ]).values
}

descriptors = ["Morgan", "MACCS", "APF", "Physchem"]
alg_best = {
    "Morgan":   "XGB",
    "MACCS":    "SVM",
    "APF":      "RF",
    "Physchem": "XGB",
}

# QSAR model paths (best per descriptor)
qsar_model_paths = {
    "Morgan":   os.path.join(qsar_model_dir, "Dermal_xgb_morgan.pkl"),
    "MACCS":    os.path.join(qsar_model_dir, "Dermal_SVM_MACCS.pkl"),
    "APF":      os.path.join(qsar_model_dir, "Dermal_rf_apf.pkl"),
    "Physchem": os.path.join(qsar_model_dir, "Dermal_xgb_rdkitcdk.pkl"),
}
qsar_models = {d: joblib.load(p) for d, p in qsar_model_paths.items()}

# Load Sm (QSAR) – hasil 10-fold scaffold-CV QSAR
Sm_files = {
    "Morgan":   os.path.join(qsar_model_dir, "Sm_Morgan_XGB.csv"),
    "MACCS":    os.path.join(qsar_model_dir, "Sm_MACCS_SVM.csv"),
    "APF":      os.path.join(qsar_model_dir, "Sm_APF_RF.csv"),
    "Physchem": os.path.join(qsar_model_dir, "Sm_Physchem_XGB.csv"),
}
Sm_qsar = {}
for desc, path in Sm_files.items():
    sm_df = pd.read_csv(path)
    Sm_qsar[desc] = float(sm_df["Sm"].values[0])

# ===============================================================
# 2. Scaffold-based 10-fold split (Bemis–Murcko)
# ===============================================================
def get_bemis_murcko_scaffold(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    scaffold = MurckoScaffold.GetScaffoldForMol(mol)
    return Chem.MolToSmiles(scaffold) if scaffold is not None else None

def scaffold_kfold_indices(smiles_list, n_splits=10, random_state=42):
    scaffolds = [get_bemis_murcko_scaffold(smi) for smi in smiles_list]
    scaffold_to_indices = {}
    for idx, scaf in enumerate(scaffolds):
        scaffold_to_indices.setdefault(scaf, []).append(idx)
    unique_scaffolds = list(scaffold_to_indices.keys())
    rng = np.random.RandomState(random_state)
    rng.shuffle(unique_scaffolds)
    # manual split scaffolds ≈ 10-fold
    fold_scaffolds = np.array_split(unique_scaffolds, n_splits)
    folds = []
    for i in range(n_splits):
        val_scaf = fold_scaffolds[i]
        train_scaf = [s for j, fs in enumerate(fold_scaffolds) if j != i for s in fs]
        train_idx = []
        val_idx = []
        for s in train_scaf:
            train_idx.extend(scaffold_to_indices[s])
        for s in val_scaf:
            val_idx.extend(scaffold_to_indices[s])
        folds.append((np.array(train_idx, dtype=int), np.array(val_idx, dtype=int)))
    return folds

folds = scaffold_kfold_indices(smiles_all, n_splits=10, random_state=42)

# ===============================================================
# 3. Read-Across: train data & Sf
# ===============================================================
ra_train_df = pd.read_excel(ra_train_file)
y_train_ra = ra_train_df['Outcome'].astype(int).values
ra_fps = ['Morgan_Descriptors', 'MACCS_Descriptors', 'RDK_Descriptors']

def convert_list_to_bitvect(fp_list):
    if isinstance(fp_list, str):
        fp_list = ast.literal_eval(fp_list)
    n_bits = len(fp_list)
    bv = ExplicitBitVect(n_bits)
    for i, bit in enumerate(fp_list):
        if int(bit):
            bv.SetBit(i)
    return bv

def prepare_fingerprints(df, fingerprint_cols):
    fps_dict = {}
    for col in fingerprint_cols:
        fps_dict[col] = [convert_list_to_bitvect(x) for x in df[col]]
    return fps_dict

train_fps_ra = prepare_fingerprints(ra_train_df, ra_fps)

def compute_tanimoto_similarity(target_fp, reference_fps):
    return np.array([DataStructs.TanimotoSimilarity(target_fp, ref_fp) for ref_fp in reference_fps])

def predict_ra_weighted(test_fps_list, train_fps_list, y_train, k=5, tanimoto_cutoff=0.0):
    y_train = np.asarray(y_train, dtype=float)
    probs = []
    for test_fp in test_fps_list:
        sims = compute_tanimoto_similarity(test_fp, train_fps_list)
        if tanimoto_cutoff > 0.0:
            valid_idx = np.where(sims >= tanimoto_cutoff)[0]
        else:
            valid_idx = np.arange(len(sims))
        if len(valid_idx) == 0:
            probs.append(y_train.mean())
            continue
        sims_valid = sims[valid_idx]
        if len(sims_valid) > k:
            nn_local = np.argsort(sims_valid)[-k:]
            nn_idx = valid_idx[nn_local]
        else:
            nn_idx = valid_idx
        nn_sims = sims[nn_idx]
        nn_y = y_train[nn_idx]
        w = nn_sims.copy()
        if w.sum() == 0:
            probs.append(nn_y.mean())
        else:
            probs.append(np.sum(w * nn_y) / np.sum(w))
    return np.array(probs)

# Sf untuk RA
sf_df = pd.read_csv(sf_ra_path)
Sf_ra = {}
for col in ra_fps:
    row = sf_df.loc[sf_df["Fingerprint"] == col]
    if row.empty:
        raise ValueError(f"Sf untuk {col} tidak ditemukan di {sf_ra_path}")
    Sf_ra[col] = float(row["Sf"].values[0])

# ---------------------------------------------------------------
# Helper metrik
# ---------------------------------------------------------------
def compute_bacc_from_preds(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    spec = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    bacc = 0.5 * (sens + spec)
    return bacc, sens, spec

# ===============================================================
# 4. 10-fold scaffold-CV mfCoQ-RASAR untuk cari w*
# ===============================================================
S_w_list = []  # simpan S_w per weight

for w in W_GRID:
    fold_scores = []  # S_{f,w}
    for fold_id, (idx_train, idx_val) in enumerate(folds, start=1):
        # subset train/val QSAR
        y_tr = y_all[idx_train]
        y_val = y_all[idx_val]

        # QSAR: consensus di fold ini (pakai model final; ini approx CV model di manuskrip)
        probs_qsar_val_desc = {}
        for desc in descriptors:
            X_tr_desc = X_qsar_by_desc[desc][idx_train]
            X_val_desc = X_qsar_by_desc[desc][idx_val]
            model = qsar_models[desc]
            # retrain di fold-train
            model.fit(X_tr_desc, y_tr)
            probs_qsar_val_desc[desc] = model.predict_proba(X_val_desc)[:, 1]

        # bobot QSAR descriptor-level = Sm (per descriptor), normalisasi
        Sm_vals_fold = np.array([Sm_qsar[d] for d in descriptors], dtype=float)
        w_desc = Sm_vals_fold / Sm_vals_fold.sum()
        P_qsar_val = np.average(
            np.vstack([probs_qsar_val_desc[d] for d in descriptors]),
            axis=0,
            weights=w_desc
        )

        # RA: di manuskrip RA dilatih di training set global; di sini pakai train RA tetap,
        # tapi prediksi hanya untuk subset val QSAR (pakai SMILES yang sama urutan train_df_qsar)
        # kita butuh RA prob untuk molekul val: gunakan fingerprint di qsar train_df
        val_df = train_df_qsar.iloc[idx_val].reset_index(drop=True)

        test_fps_val = prepare_fingerprints(val_df, ra_fps)

        ra_probs_fp_val = {}
        for col in ra_fps:
            ra_probs_fp_val[col] = predict_ra_weighted(
                test_fps_val[col], train_fps_ra[col], y_train_ra, k=5, tanimoto_cutoff=0.0
            )

        Sf_vals = np.array([Sf_ra[col] for col in ra_fps], dtype=float)
        w_fp = Sf_vals / Sf_vals.sum()
        P_ra_val = np.average(
            np.vstack([ra_probs_fp_val[col] for col in ra_fps]),
            axis=0,
            weights=w_fp
        )

        # mfCoQ-RASAR prob untuk fold ini & w
        P_mf_val = w * P_qsar_val + (1 - w) * P_ra_val
        y_pred_val = (P_mf_val >= 0.5).astype(int)

        # metrik fold
        try:
            auc = roc_auc_score(y_val, P_mf_val)
        except ValueError:
            auc = np.nan
        bacc, _, _ = compute_bacc_from_preds(y_val, y_pred_val)

        if not np.isnan(auc):
            S_fw = 0.5 * (auc + bacc)
        else:
            S_fw = bacc  # fallback jika AUC NaN

        fold_scores.append(S_fw)

    S_w = float(np.mean(fold_scores))
    S_w_list.append({"w": w, "S_w": S_w})
    print(f"w={w:.2f} -> S_w={S_w:.4f}")

S_w_df = pd.DataFrame(S_w_list).sort_values(by="S_w", ascending=False)
best_row = S_w_df.iloc[0]
best_w = float(best_row["w"])
best_Sw = float(best_row["S_w"])

print("\n=== Optimal mfCoQ-RASAR weight (train CV) ===")
print(f"w* (QSAR weight) = {best_w:.2f}, S_w* = {best_Sw:.4f}")

# simpan ke CSV
w_out_path = os.path.join(output_dir, "mfCoQ-RASAR_best_w_from_10fold_scaffoldCV.csv")
S_w_df.to_csv(w_out_path, index=False)
print("Grid S_w(w) dan w* disimpan ke:", w_out_path)


# Evaluation 

In [None]:
import numpy as np
import pandas as pd
from rdkit.DataStructs.cDataStructs import ExplicitBitVect
from rdkit import DataStructs
from sklearn.metrics import (
    confusion_matrix, accuracy_score, roc_auc_score
)
import joblib
import ast
import os

# ===============================================================
# 0. Paths & config
# ===============================================================
test_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Test_set_Dermal_with_fingerprints_RDKit_CDK.xlsx"
qsar_model_dir = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\QSAR Model"
ra_train_file = r"C:\Fauzan\Manuscripts QSAR-RA 2\Old Endpoints\Acute Dermal Toxicity (manual split)\Read Across\Train_set_Dermal_balanced_with_fingerprints.xlsx"
sf_ra_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Read Across\Sf_per_fingerprint.csv"
best_w_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Evaluation\mfCoQ-RASAR_best_w_from_10fold_scaffoldCV.csv"
output_path = r"C:\Fauzan\Manuscripts QSAR-RA 2\q-RASAR\Acute Dermal Toxicity\Evaluation\mfCoQ-RASAR_Test_Performance.xlsx"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# ===============================================================
# 1. Load w* dari training CV
# ===============================================================
w_df = pd.read_csv(best_w_path)
w_star = float(w_df.sort_values("S_w", ascending=False).iloc[0]["w"])
print(f"Using optimal w* (QSAR weight) from train CV: {w_star:.2f}")

# ===============================================================
# 2. Load TEST set
# ===============================================================
test_df = pd.read_excel(test_path)
y_true = test_df["Outcome"].astype(int).values

def convert_to_array(series):
    return np.array(series.apply(ast.literal_eval).tolist())

X_test_qsar = {
    "Morgan":   convert_to_array(test_df["Morgan_Descriptors"]),
    "MACCS":    convert_to_array(test_df["MACCS_Descriptors"]),
    "APF":      convert_to_array(test_df["APF_Descriptors"]),
    "Physchem": test_df.drop(columns=[
        "SMILES", "Morgan_Descriptors", "MACCS_Descriptors",
        "APF_Descriptors", "RDK_Descriptors", "Outcome"
    ]).values
}
descriptors = ["Morgan", "MACCS", "APF", "Physchem"]

# ===============================================================
# 3. Load QSAR models & Sm (best per descriptor)
# ===============================================================
qsar_models_info = {
    "Morgan":   os.path.join(qsar_model_dir, "Dermal_xgb_morgan.pkl"),
    "MACCS":    os.path.join(qsar_model_dir, "Dermal_SVM_MACCS.pkl"),
    "APF":      os.path.join(qsar_model_dir, "Dermal_rf_apf.pkl"),
    "Physchem": os.path.join(qsar_model_dir, "Dermal_xgb_rdkitcdk.pkl"),
}
qsar_models = {d: joblib.load(p) for d, p in qsar_models_info.items()}

Sm_files = {
    "Morgan":   os.path.join(qsar_model_dir, "Sm_Morgan_XGB.csv"),
    "MACCS":    os.path.join(qsar_model_dir, "Sm_MACCS_SVM.csv"),
    "APF":      os.path.join(qsar_model_dir, "Sm_APF_RF.csv"),
    "Physchem": os.path.join(qsar_model_dir, "Sm_Physchem_XGB.csv"),
}
Sm_qsar = {}
for desc, path in Sm_files.items():
    sm_df = pd.read_csv(path)
    Sm_qsar[desc] = float(sm_df["Sm"].values[0])

Sm_vals = np.array([Sm_qsar[d] for d in descriptors], dtype=float)
w_qsar_desc = Sm_vals / Sm_vals.sum()

# QSAR probability per descriptor
probs_qsar_desc = {}
for desc in descriptors:
    model = qsar_models[desc]
    X = X_test_qsar[desc]
    probs_qsar_desc[desc] = model.predict_proba(X)[:, 1]

qsar_consensus_probs = np.average(
    np.vstack([probs_qsar_desc[d] for d in descriptors]),
    axis=0,
    weights=w_qsar_desc
)

# ===============================================================
# 4. Read-Across consensus on TEST
# ===============================================================
ra_train_df = pd.read_excel(ra_train_file)
y_train_ra = ra_train_df["Outcome"].astype(int).values
ra_fps = ["Morgan_Descriptors", "MACCS_Descriptors", "RDK_Descriptors"]

def to_bitvect(fp_list):
    if isinstance(fp_list, str):
        fp_list = ast.literal_eval(fp_list)
    n_bits = len(fp_list)
    bv = ExplicitBitVect(n_bits)
    for i, bit in enumerate(fp_list):
        if int(bit):
            bv.SetBit(i)
    return bv

def prepare_fps(df, fp_cols):
    res = {}
    for col in fp_cols:
        res[col] = [to_bitvect(x) for x in df[col]]
    return res

train_fps_ra = prepare_fps(ra_train_df, ra_fps)
test_fps_ra  = prepare_fps(test_df, ra_fps)

def tanimoto_sim(target_fp, ref_fps):
    return np.array([DataStructs.TanimotoSimilarity(target_fp, r) for r in ref_fps])

def predict_ra_weighted(test_fps_list, train_fps_list, y_train, k=5, tanimoto_cutoff=0.0):
    y_train = np.asarray(y_train, dtype=float)
    probs = []
    for t_fp in test_fps_list:
        sims = tanimoto_sim(t_fp, train_fps_list)
        if tanimoto_cutoff > 0.0:
            valid_idx = np.where(sims >= tanimoto_cutoff)[0]
        else:
            valid_idx = np.arange(len(sims))
        if len(valid_idx) == 0:
            probs.append(y_train.mean())
            continue
        sims_valid = sims[valid_idx]
        if len(sims_valid) > k:
            nn_local = np.argsort(sims_valid)[-k:]
            nn_idx = valid_idx[nn_local]
        else:
            nn_idx = valid_idx
        nn_sims = sims[nn_idx]
        nn_y = y_train[nn_idx]
        w = nn_sims.copy()
        if w.sum() == 0:
            probs.append(nn_y.mean())
        else:
            probs.append(np.sum(w * nn_y) / np.sum(w))
    return np.array(probs)

# RA prob per fingerprint
ra_probs_fps = {}
for col in ra_fps:
    ra_probs_fps[col] = predict_ra_weighted(
        test_fps_ra[col], train_fps_ra[col], y_train_ra, k=5, tanimoto_cutoff=0.0
    )

# Load Sf untuk RA weighting
sf_df = pd.read_csv(sf_ra_path)
Sf_ra = {}
for col in ra_fps:
    row = sf_df.loc[sf_df["Fingerprint"] == col]
    if row.empty:
        raise ValueError(f"Sf untuk {col} tidak ditemukan di {sf_ra_path}")
    Sf_ra[col] = float(row["Sf"].values[0])

Sf_vals = np.array([Sf_ra[col] for col in ra_fps], dtype=float)
w_ra_fp = Sf_vals / Sf_vals.sum()

ra_consensus_probs = np.average(
    np.vstack([ra_probs_fps[col] for col in ra_fps]),
    axis=0,
    weights=w_ra_fp
)

# ===============================================================
# 5. mfCoQ-RASAR with fixed w*
# ===============================================================
mfcoq_probs = w_star * qsar_consensus_probs + (1 - w_star) * ra_consensus_probs
preds = (mfcoq_probs >= 0.5).astype(int)

tn, fp, fn, tp = confusion_matrix(y_true, preds).ravel()
acc = accuracy_score(y_true, preds)
bacc = 0.5 * (
    (tp / (tp + fn) if (tp + fn) > 0 else 0.0) +
    (tn / (tn + fp) if (tn + fp) > 0 else 0.0)
)
sen = tp / (tp + fn) if (tp + fn) > 0 else 0.0
spe = tn / (tn + fp) if (tn + fp) > 0 else 0.0
auc = roc_auc_score(y_true, mfcoq_probs)

# ===============================================================
# 6. Bootstrap CI 95% (sesuai manuskrip)
# ===============================================================
def bootstrap_ci(probs, y_true, n_bootstrap=1000, random_state=42):
    rng = np.random.default_rng(random_state)
    probs = np.asarray(probs)
    y_true = np.asarray(y_true)
    preds = (probs >= 0.5).astype(int)
    n = len(y_true)

    accs, baccs, sens, spes, aucs = [], [], [], [], []
    for _ in range(n_bootstrap):
        idx = rng.choice(n, size=n, replace=True)
        y_b = y_true[idx]
        p_b = probs[idx]
        pred_b = preds[idx]
        tn_b, fp_b, fn_b, tp_b = confusion_matrix(y_b, pred_b).ravel()
        accs.append(accuracy_score(y_b, pred_b))
        se = tp_b / (tp_b + fn_b) if (tp_b + fn_b) > 0 else 0.0
        sp = tn_b / (tn_b + fp_b) if (tn_b + fp_b) > 0 else 0.0
        sens.append(se)
        spes.append(sp)
        baccs.append(0.5 * (se + sp))
        try:
            aucs.append(roc_auc_score(y_b, p_b))
        except ValueError:
            aucs.append(np.nan)

    def summarize(vals):
        vals = np.array(vals, dtype=float)
        med = np.nanmedian(vals)
        lo = np.nanpercentile(vals, 2.5)
        hi = np.nanpercentile(vals, 97.5)
        half = (hi - lo) / 2.0
        return f"{med:.2f} ± {half:.2f}"

    return {
        "ACC_CI": summarize(accs),
        "BACC_CI": summarize(baccs),
        "SEN_CI": summarize(sens),
        "SPE_CI": summarize(spes),
        "AUC_CI": summarize(aucs),
    }

ci = bootstrap_ci(mfcoq_probs, y_true)

# ===============================================================
# 7. Save results
# ===============================================================
results = {
    "w_star": [w_star],
    "AUC": [auc],
    "BACC": [bacc],
    "ACC": [acc],
    "SEN": [sen],
    "SPE": [spe],
    "TN": [tn],
    "FP": [fp],
    "FN": [fn],
    "TP": [tp],
    "AUC_CI": [ci["AUC_CI"]],
    "BACC_CI": [ci["BACC_CI"]],
    "ACC_CI": [ci["ACC_CI"]],
    "SEN_CI": [ci["SEN_CI"]],
    "SPE_CI": [ci["SPE_CI"]],
}

df_out = pd.DataFrame(results)
df_out.to_excel(output_path, index=False)

print("mfCoQ-RASAR test performance saved to:", output_path)
print(df_out.T)
