# Original Function

In [28]:
#
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from rdkit.Chem.Draw import SimilarityMaps
from scipy.spatial.distance import cdist
import numpy as np
from scipy.spatial.distance import pdist, cdist

"""import glob
import gzip
import bz2"""
import os
#import _pickle as cPickle

import io
import matplotlib.pyplot as plt

# god hates me so in my version of python I cannot supress these damn user warning so I do this nuclear option instead
"""import warnings
def warn(*args, **kwargs):
    pass
warnings.warn = warn"""

# for setting confidence-based AD:
AD_THRESH = 0.6


import joblib  # Ensure this import is at the beginning of your script


MODEL_DIR = os.path.join(os.getcwd(), "models")  # Directory where models are stored

MODEL_DICT = {
    'Binary Sensitization': [joblib.load(os.path.join(MODEL_DIR, 'ss_DSA05_binary_morgan_r2_2048_svm_calibrated_with_threshold.joblib'))['model']],
    'Multiclass Sensitization Potency': [joblib.load(os.path.join(MODEL_DIR, 'DSA05_mordred_rf_multiclass.joblib'))]
}

THRES_DICT = {
    'Binary Sensitization': [joblib.load(os.path.join(MODEL_DIR, 'ss_DSA05_binary_morgan_r2_2048_svm_calibrated_with_threshold.joblib'))['threshold']],
}

# CHECK THIS
AD_DICT = {
    'Binary Sensitization': [joblib.load(os.path.join(MODEL_DIR, 'binary_AD.pkl'))],
    'Multiclass Sensitization Potency': [joblib.load(os.path.join(MODEL_DIR, 'multiclass_AD.pkl'))],
}

FEATURES_DICT = {
    'Binary Sensitization': [np.load(os.path.join(MODEL_DIR, 'selected_features_svm_binary.npy'), allow_pickle=True)],
    'Multiclass Sensitization Potency': [np.load(os.path.join(MODEL_DIR, 'selected_features_rf_multiclass.npy'), allow_pickle=True)],
}

# lol I'm just like screw code readability sorry
MODEL_DICT_INVERT = {str(v): key for key, val in MODEL_DICT.items() for v in val}

CLASSIFICATION_DICT = {
    'Binary Sensitization Call': {
        0: "Non-sensitizer",
        1: "Sensitizer"
    },
    'Multiclass Sensitization Potency': {
        0: "Non-sensitizer",
        1: "Weak sensitizer",
        2: "Strong sensitizer",
    }
}

AD_DICT_BOOL = {
    True: "Inside",
    False: "Outside"
}


def _get_AD_thresh(training_smiles, file_name):
    fps = np.array([list(AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), radius=2, nBits=2048, useFeatures=False))
                    for smi in training_smiles])

    dists = pdist(fps)
    mean_1 = dists.mean()
    dists_2 = dists[np.where(dists < mean_1)]
    mean_2 = dists_2.mean()
    std_2 = dists_2.std()

    threshold = mean_2 + (0.5 * std_2)

    import pickle
    pickle.dump((threshold, fps),  open(file_name, "wb"))

    with open(file_name, "wb") as f:
        pickle.dump((threshold, fps), f)


def calc_ad(query_fp, ad_tuple):
    dist = cdist(query_fp.reshape(1, -1), ad_tuple[1], "euclidean")
    return (dist < ad_tuple[0]).any()



def run_prediction_binary(model, smi, calculate_ad=True, ad_tup=None, threshold=0.5):
    """_summary_

    Args:
        model (_type_): _description_
        model_data (_type_): _description_
        smi (_type_): _description_
        calculate_ad (bool, optional): _description_. Defaults to True.

    Returns:
        _type_: _description_
    """
    fp = np.zeros((2048, 1))
    # sub in your FP function
    _fp = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), radius=2, nBits=2048, useFeatures=False)
    DataStructs.ConvertToNumpyArray(_fp, fp)

    selected_features = np.load(os.path.join(MODEL_DIR, 'selected_features_svm_binary.npy'))
    fp_selected = fp[selected_features]

    pred_proba = model.predict_proba(fp_selected.reshape(1, -1))[:, 1]
    pred = 1 if pred_proba > threshold else 0

    if pred == 0:
        pred_proba = 1 - float(pred_proba)

    # used to get proba of the inactive class if deemed inactive
    # if pred == 0:
    #     pred_proba = 1-pred_proba

    if calculate_ad:
        ad = calc_ad(fp, ad_tup)
        return pred, pred_proba, ad
    
    else:
        ad = False  # Set to True or False depending on how you want to interpret "no AD calculation"

    return pred, float(pred_proba), ""

from sklearn.preprocessing import MinMaxScaler
from mordred import Calculator, descriptors

def run_prediction_multiclass(model, smi, calculate_ad=True, ad_tup=None, threshold=0.5):
    """_summary_

    Args:
        model (_type_): _description_
        model_data (_type_): _description_
        smi (_type_): _description_
        calculate_ad (bool, optional): _description_. Defaults to True.

    Returns:
        _type_: _description_
    """
    fp_ad_arr = np.zeros((2048, 1))
    # sub in your FP function
    fp_ad = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), radius=2, nBits=2048, useFeatures=False)
    DataStructs.ConvertToNumpyArray(fp_ad, fp_ad_arr)

    calc = Calculator(descriptors, ignore_3D=True)
    mol = Chem.MolFromSmiles(smi)  # Convert SMILES to RDKit Mol object
    if mol is None:
        raise ValueError(f"Invalid SMILES string: {smi}")
    
    _fp = calc.pandas([mol])  # Pass the mol as a list

    # Min-Max scaling
    scaler = MinMaxScaler()
    fp = scaler.fit_transform(_fp)

    # Load selected features with allow_pickle=True
    selected_features = np.load(os.path.join(MODEL_DIR, 'selected_features_rf_multiclass.npy'), allow_pickle=True)

    # Ensure selected_features are valid column names
    # Filter the fp DataFrame to retain only the selected columns (features)
    fp_selected = _fp[selected_features]

    # Scale the selected features
    fp_selected_scaled = scaler.fit_transform(fp_selected)

    # Get probabilities for all classes (not just class 1)
    pred_proba = model.predict_proba(fp_selected_scaled.reshape(1, -1))

    # Get the class with the highest probability
    pred = np.argmax(pred_proba)

    # Probability of the predicted class
    predicted_class_proba = pred_proba[0][pred]

    if calculate_ad:
        ad = calc_ad(fp_ad_arr, ad_tup)
        return pred, predicted_class_proba, ad
    
    return pred, predicted_class_proba, ""





def get_prob_map(model, smi):
    def get_fp(mol, idx):
        fps = np.zeros((2048, 1))
        _fps = SimilarityMaps.GetMorganFingerprint(mol, idx, radius=2, nBits=2048)
        DataStructs.ConvertToNumpyArray(_fps, fps)
        return fps

    def get_proba(fps):
        return float(model.predict_proba(fps.reshape(1, -1))[:, 1])

    mol = Chem.MolFromSmiles(smi)
    fig, _ = SimilarityMaps.GetSimilarityMapForModel(mol, get_fp, get_proba)
    imgdata = io.StringIO()
    fig.savefig(imgdata, format='svg')
    imgdata.seek(0)  # rewind the data
    plt.savefig(imgdata, format="svg", bbox_inches="tight")

    return imgdata.getvalue()


def main(smi, calculate_ad=True, make_prop_img=False, **kwargs):
    values = {}

    for key, val in kwargs.items():
        if key in MODEL_DICT.keys():  # check if this kwarg is for a model
            if val:  # check if model is turned on
                model = MODEL_DICT[key][0]  # Get the model
                ad_tup = AD_DICT[key][0]

                # Choose the appropriate prediction function
                if key == 'Binary Sensitization':
                    pred, pred_proba, ad = run_prediction_binary(model, smi, calculate_ad=calculate_ad, ad_tup=ad_tup)
                elif key == 'Multiclass Sensitization Potency':
                    pred, pred_proba, ad = run_prediction_multiclass(model, smi, calculate_ad=calculate_ad, ad_tup=ad_tup)

                contrib_svg_str = ""
                if make_prop_img:
                    contrib_svg_str = get_prob_map(model, smi)

                values[key] = [pred, float(pred_proba), AD_DICT_BOOL[ad], contrib_svg_str]

    processed_results = []
    for key, val in values.items():
        # Use 'Binary Sensitization Call' for the classification dict key
        classification_key = 'Binary Sensitization Call' if key == 'Binary Sensitization' else key
        processed_results.append([key, CLASSIFICATION_DICT[classification_key][val[0]], val[1], val[2], val[3]])

    return processed_results






https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


## OLD CODE - Testing

In [34]:
smiles_string = "NC(=O)C1=CC=CC=C1"
results = main(smi=smiles_string, calculate_ad=True, make_prop_img=False, **{"Binary Sensitization": True})
results


  values[key] = [pred, float(pred_proba), AD_DICT_BOOL[ad], contrib_svg_str]


[['Binary Sensitization', 'Sensitizer', 0.8769790857195798, 'Inside', '']]

In [31]:
model = joblib.load("models/ss_DSA05_binary_morgan_r2_2048_svm_calibrated_with_threshold.joblib")  # Load the model
print(type(model))  # Check the type of the loaded model
model

<class 'dict'>


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


{'model': SVC(C=453.40231050514495, gamma=0.0784644856439669, probability=True),
 'threshold': 0.7803329871102571}

In [22]:
sel_features_test = np.load("models/selected_features_svm_binary.npy")  # Load the selected features for the model
sel_features_test

array([   1,    2,   80,  222,  227,  283,  294,  389,  529,  573,  622,
        650,  679,  691,  693,  694,  695,  759,  769,  807,  890,  926,
        929, 1019, 1057, 1088, 1095, 1114, 1138, 1143, 1155, 1163, 1200,
       1249, 1281, 1365, 1380, 1421, 1444, 1485, 1599, 1629, 1673, 1750,
       1783, 1866, 1873, 1914, 1917, 1999], dtype=int64)

In [80]:
training_load = joblib.load("models/DSA05_BINARY_SVM_x_train.pkl")  # Load the training data
type(training_load)
len(training_load)
training_load

array([[0., 1., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [1., 0., 1., ..., 0., 0., 0.]])

# Testing AD Functionality

In [81]:
from rdkit import Chem
from rdkit.Chem import AllChem
import numpy as np
import os
import pickle
from scipy.spatial.distance import pdist
from scipy.spatial.distance import cdist


def _get_AD_thresh(fps, file_name):
    dists = pdist(fps)
    mean_1 = dists.mean()
    dists_2 = dists[np.where(dists < mean_1)]
    mean_2 = dists_2.mean()
    std_2 = dists_2.std()

    threshold = mean_2 + (0.5 * std_2)

    pickle.dump((threshold, fps),  open(file_name, "wb"))

def read_sdf_and_save_ad_file(sdf_file, output_file):
    suppl = Chem.SDMolSupplier(sdf_file)
    molecules = [mol for mol in suppl if mol is not None]

    fps = np.array([list(AllChem.GetMorganFingerprintAsBitVect(mol, radius=2, nBits=2048, useFeatures=False))
                    for mol in molecules])

    _get_AD_thresh(fps, output_file)




In [85]:
fname1 = "../models/dataset_ss_DSA05_WES_GHS_BIN_Binary.sdf"
fname2 = "../models/dataset_ss_DSA05_WES_GHS_SUB MC.sdf"



In [86]:
# Overall FPs
MODEL_DIR = os.path.join(os.path.dirname("../dev_tox"), "models")  # Directory where models are stored
sdf_file = fname1  # Replace with your SDF file path
output_file = os.path.join(MODEL_DIR, 'binary_AD.pkl')  # Replace 'your_model_AD.pkl' with your desired output file name

read_sdf_and_save_ad_file(sdf_file, output_file)

In [87]:
# First TM FPs
MODEL_DIR = os.path.join(os.path.dirname("../dev_tox"), "models")  # Directory where models are stored
sdf_file = fname2  # Replace with your SDF file path
output_file = os.path.join(MODEL_DIR, 'multiclass_AD.pkl')  # Replace 'your_model_AD.pkl' with your desired output file name

read_sdf_and_save_ad_file(sdf_file, output_file)

In [88]:
import pickle
import os

# Assuming your function 'read_sdf_and_save_ad_file' has already been run and the output file has been created

# Path to the output file
MODEL_DIR = os.path.join(os.path.dirname("c:/Users/ricar/Documents/GitHub/dev-tox/dev_tox"), "models")
output_file = os.path.join(MODEL_DIR, 'binary_AD.pkl')

# Load the output file
with open(output_file, 'rb') as file:
    data = pickle.load(file)

# Check and print the type of data
print(f"Type of the data in the file: {type(data)}")

# If you want to see more details about the content (like length if it's a list or tuple)
if isinstance(data, (list, tuple)):
    print(f"Length of the data: {len(data)}")
    # To show the type of the first element in the tuple/list
    print(f"Type of the first element: {type(data[0])}")


Type of the data in the file: <class 'tuple'>
Length of the data: 2
Type of the first element: <class 'numpy.float64'>


In [89]:


def _get_AD_thresh(training_smiles, file_name):
    fps = np.array([list(AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), radius=2, nBits=2048, useFeatures=False))
                    for smi in training_smiles])

    dists = pdist(fps)
    mean_1 = dists.mean()
    dists_2 = dists[np.where(dists < mean_1)]
    mean_2 = dists_2.mean()
    std_2 = dists_2.std()

    threshold = mean_2 + (0.5 * std_2)

    import pickle
    pickle.dump((threshold, fps),  open(file_name, "wb"))


def calc_ad(query_fp, ad_tuple):
    dist = cdist(query_fp, ad_tuple[1], "euclidean")
    return dist < ad_tuple[0]


def run_prediction(model, smi, calculate_ad=True, ad_tup=None, threshold=0.5):
    """_summary_

    Args:
        model (_type_): _description_
        model_data (_type_): _description_
        smi (_type_): _description_
        calculate_ad (bool, optional): _description_. Defaults to True.

    Returns:
        _type_: _description_
    """
    fp = np.zeros((2048, 1))
    # sub in your FP function
    _fp = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), radius=2, nBits=2048, useFeatures=False)
    DataStructs.ConvertToNumpyArray(_fp, fp)

    pred_proba = model.predict_proba(fp.reshape(1, -1))[:, 1]
    pred = 1 if pred_proba > threshold else 0

    if pred == 0:
        pred_proba = 1 - float(pred_proba)

    # used to get proba of the inactive class if deemed inactive
    # if pred == 0:
    #     pred_proba = 1-pred_proba

    if calculate_ad:
        ad = calc_ad(fp, ad_tup)
        return pred, pred_proba, ad

    return pred, float(pred_proba), ""


def get_prob_map(model, smi):
    def get_fp(mol, idx):
        fps = np.zeros((2048, 1))
        _fps = SimilarityMaps.GetMorganFingerprint(mol, idx, radius=3, nBits=2048)
        DataStructs.ConvertToNumpyArray(_fps, fps)
        return fps

    def get_proba(fps):
        return float(model.predict_proba(fps.reshape(1, -1))[:, 1])

    mol = Chem.MolFromSmiles(smi)
    fig, _ = SimilarityMaps.GetSimilarityMapForModel(mol, get_fp, get_proba)
    imgdata = io.StringIO()
    fig.savefig(imgdata, format='svg')
    imgdata.seek(0)  # rewind the data
    plt.savefig(imgdata, format="svg", bbox_inches="tight")

    return imgdata.getvalue()


def main(smi, calculate_ad=True, make_prop_img=False, **kwargs):
    values = {}

    for key, val in kwargs.items():
        if key in MODEL_DICT.keys():  # check if this kwarg is for a model
            if val:  # check if model is turned on
                model = MODEL_DICT[key][0]  # Get the model file path
                ad_tup = AD_DICT[key][0]
                # print(f"Loading model from: {model_file}")
                # model = joblib.load(model_file)  # load the model

                pred, pred_proba, ad = run_prediction(model, smi, calculate_ad=calculate_ad, ad_tup=ad_tup)

                contrib_svg_str = ""
                if make_prop_img:
                    contrib_svg_str = get_prob_map(model, smi)

                values[key] = [pred, float(pred_proba), AD_DICT[ad > AD_THRESH], contrib_svg_str]

    processed_results = []
    for key, val in values.items():
        processed_results.append([key, CLASSIFICATION_DICT[key][val[0]], val[1], val[2], val[3]])

    return processed_results


# def write_csv_file(smiles_list, calculate_ad=False):
#     headers = list(MODEL_DICT.keys())
#
#     if calculate_ad:
#         headers = headers + [_ + "_AD" for _ in headers]
#
#     string_file = StringIO()
#     writer = csv.DictWriter(string_file, fieldnames=['SMILES', *headers])
#     writer.writeheader()
#
#     for smiles in tqdm(smiles_list):
#         molecule = MolFromSmiles(smiles)
#
#         row = {'SMILES': smiles}
#
#         if molecule is None:
#             row['SMILES'] = f"(invalid){smiles}"
#             writer.writerow(row)
#             continue
#
#         data = main(smiles, calculate_ad=calculate_ad, **MODEL_DICT)
#
#         for model_name, pred, pred_proba, ad, _ in data:
#             try:
#                 pred_proba = float(pred_proba[:-1]) / 100  # covert back to 0-1 float
#                 row[
#                     model_name] = pred_proba if pred == 1 else 1 - pred_proba  # this is to make sure its proba for class 1
#             except ValueError:
#                 row[model_name] = "No prediction"  # if pred_proba is string skip
#             if calculate_ad:
#                 row[model_name + "_AD"] = ad
#
#         writer.writerow(row)
#
#     return string_file.getvalue()


# if __name__ == "__main__":
#     import argparse
#     import csv
#     from io import StringIO
#     from rdkit.Chem import MolFromSmiles
#     from tqdm import tqdm
#
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--infile", type=str, required=True,
#                         help="location to csv of SMILES")
#     parser.add_argument("--outfile", type=str, default=os.path.join(os.getcwd(), "phakin_output.csv"),
#                         help="location and file name for output")
#     parser.add_argument("--smiles_col", type=str, default="SMILES",
#                         help="column name containing SMILES of interest"),
#     parser.add_argument("--ad", action="store_true",
#                         help="calculate the AD")


In [99]:
import pickle

# Load the AD data from the .pkl file
ad_file_path = "../models/binary_AD.pkl"  # Replace with your .pkl file path
with open(ad_file_path, 'rb') as f:
    ad_data = pickle.load(f)

# Process the query molecule
query_smiles = "C1=NC(=NC(=O)N1C2C(C(C(O2)CO)O)O)N"  # Example query molecule (Benzene)
query_mol = Chem.MolFromSmiles(query_smiles)
query_fp = np.array(AllChem.GetMorganFingerprintAsBitVect(query_mol, radius=2, nBits=2048, useFeatures=False))

# Convert query_fp to 2D array for cdist
query_fp_2d = query_fp[np.newaxis, :]  # Makes it 2-dimensional

# Check if the query molecule is within the AD
is_within_ad = calc_ad(query_fp_2d, ad_data)
print(f"Molecule is within the AD: {is_within_ad.any()}")


Molecule is within the AD: False


['c:\\Users\\ricar\\Documents\\GitHub\\dev-tox\\dev_tox', 'C:\\Program Files\\PerkinElmerInformatics\\ChemOffice2021\\ChemScript\\Lib', 'c:\\Users\\ricar\\anaconda3\\envs\\rdkit_env\\python311.zip', 'c:\\Users\\ricar\\anaconda3\\envs\\rdkit_env\\DLLs', 'c:\\Users\\ricar\\anaconda3\\envs\\rdkit_env\\Lib', 'c:\\Users\\ricar\\anaconda3\\envs\\rdkit_env', '', 'C:\\Users\\ricar\\AppData\\Roaming\\Python\\Python311\\site-packages', 'C:\\Users\\ricar\\AppData\\Roaming\\Python\\Python311\\site-packages\\win32', 'C:\\Users\\ricar\\AppData\\Roaming\\Python\\Python311\\site-packages\\win32\\lib', 'C:\\Users\\ricar\\AppData\\Roaming\\Python\\Python311\\site-packages\\Pythonwin', 'c:\\Users\\ricar\\anaconda3\\envs\\rdkit_env\\Lib\\site-packages']
