In [1]:
import deepchem as dc
import rdkit
from rdkit import Chem
import numpy as np
import scipy.sparse as sp
import pickle
import numpy as np
import pandas as pd

from constants import *

2024-08-05 17:37:21.080794: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Skipped loading some Jax models, missing a dependency. No module named 'jax'


In [2]:
# `global` at module (cell) level has no effect on its own; it only signals that
# `suffix` is meant to be a module-wide name. The save helpers defined in this
# notebook read the module-level `suffix` when building output file names, so it
# has to be assigned before any of them is called with save=True.
global suffix

In [3]:
def get_csv(path, smipos):
    """Load a pickled DataFrame and expose its SMILES column as ``SMILES_NS``.

    Despite the function name, the file at ``path`` is read with
    ``pd.read_pickle`` rather than as CSV.

    Args:
        path: Location of the pickled DataFrame.
        smipos: Name of the column to rename to ``"SMILES_NS"``. A name that is
            not present in the frame leaves the columns unchanged.

    Returns:
        The loaded DataFrame with the renamed column.
    """
    # NOTE(review): unpickling executes arbitrary code; only load trusted files.
    return pd.read_pickle(path).rename(columns={smipos: "SMILES_NS"})


def preprocess_graph(data):
    """Normalise an adjacency matrix for feeding into the model.

    The identity is added to ``data`` (a self-loop on every node), the row sums
    of that matrix are raised to the power -0.5 and placed on a diagonal matrix
    ``D``, and the result is ``(A_hat . D)^T . D`` converted to a plain numpy
    array.

    Args:
        data: Square adjacency matrix (anything supporting ``+`` with a scipy
            sparse identity and exposing ``.shape``).

    Returns:
        numpy.ndarray with the normalised adjacency matrix.
    """
    with_self_loops = data + sp.eye(data.shape[0])
    row_totals = np.array(with_self_loops.sum(1))
    inv_sqrt_degree = np.diag(np.power(row_totals, -0.5).flatten())
    # Scale columns first, transpose, then scale columns again.
    column_scaled = with_self_loops.dot(inv_sqrt_degree)
    fully_scaled = column_scaled.transpose().dot(inv_sqrt_degree)
    return np.array(fully_scaled)


def smiles_get_features(a):
    """Turn a SMILES string into its per-atom feature matrix.

    Args:
        a: SMILES string. A ``float`` (presumably the NaN pandas uses for a
            missing value - confirm against the input data) is treated as
            "no molecule".

    Returns:
        The array returned by ``get_atom_features()`` of the featurized
        molecule, or ``np.nan`` when the input is a float, when the featurizer
        hands back a bare ndarray instead of a molecule object (assumed to be
        its failure signal - TODO confirm), or when the molecule has more than
        60 atoms.
    """
    if isinstance(a, float):
        return np.nan
    molecule = rdkit.Chem.MolFromSmiles(a)
    conv_mol = dc.feat.ConvMolFeaturizer().featurize([molecule])[0]
    if isinstance(conv_mol, np.ndarray):
        return np.nan
    per_atom = conv_mol.get_atom_features()  # initial atom feature vectors
    # Molecules above the 60-atom cap are dropped rather than truncated.
    return per_atom if per_atom.shape[0] <= 60 else np.nan

In [4]:
def smiles_get_adj(a):
    """Build the dense adjacency matrix of the molecule described by a SMILES.

    Args:
        a: SMILES string. A ``float`` (presumably pandas' NaN for a missing
            value - confirm) is treated as "no molecule".

    Returns:
        ``(n_atoms, n_atoms)`` float array with 1 at ``[i, j]`` for every ``j``
        listed as a neighbour of atom ``i``, or ``np.nan`` when the input is a
        float, when the featurizer returns a bare ndarray (assumed to be its
        failure signal - TODO confirm), or when the molecule has more than 60
        atoms.
    """
    if isinstance(a, float):
        return np.nan
    molecule = rdkit.Chem.MolFromSmiles(a)
    conv_mol = dc.feat.ConvMolFeaturizer().featurize([molecule])[0]
    if isinstance(conv_mol, np.ndarray):
        return np.nan
    neighbours = conv_mol.get_adjacency_list()  # adjacency list (neighbor list)
    n_atoms = len(neighbours)
    if n_atoms > 60:
        return np.nan
    # Convert the adjacency list into the adjacency matrix "A".
    matrix = np.zeros((n_atoms, n_atoms))
    for atom, bonded in enumerate(neighbours):
        for other in bonded:
            matrix[atom][other] = 1
    return matrix


def smiles_get_edge(a):
    """Return the edge index of the molecule described by a SMILES string.

    Unlike ``smiles_get_adj`` this does not build a matrix: it flattens the
    featurizer's adjacency list into two parallel index rows.

    Args:
        a: SMILES string. A ``float`` (presumably pandas' NaN for a missing
            value - confirm) is treated as "no molecule".

    Returns:
        Integer array of shape ``(2, n_entries)``: row 0 holds the atom index
        ``i`` and row 1 the neighbour ``j`` for every ``j`` listed under ``i``
        in the adjacency list. ``np.nan`` is returned when the input is a
        float or when the featurizer returns a bare ndarray (assumed to be its
        failure signal - TODO confirm). No atom-count cap is applied here.
    """
    if isinstance(a, float):
        return np.nan
    m = rdkit.Chem.MolFromSmiles(a)
    featurizer = dc.feat.ConvMolFeaturizer()
    features = featurizer.featurize([m])[0]
    if isinstance(features, np.ndarray):
        return np.nan
    adj_list = features.get_adjacency_list()  # adjacency list (neighbor list)
    sources = []
    targets = []
    for atom, neighbours in enumerate(adj_list):
        sources.extend([atom] * len(neighbours))
        targets.extend(neighbours)
    # An explicit integer dtype keeps a bond-less molecule from producing a
    # float64 (2, 0) array, which cannot be used as an index array downstream.
    return np.stack((np.array(sources, dtype=int), np.array(targets, dtype=int)))


def sim_graph(smile):
    """Encode a molecule as one matrix: bonds off-diagonal, atoms on-diagonal.

    The molecule is kekulized, its adjacency matrix is requested from RDKit
    with ``useBO=True`` (bond orders instead of 0/1, per the RDKit API), and
    each diagonal entry ``[i, i]`` is overwritten with the atomic number of
    atom ``i``.

    Args:
        smile: SMILES string. A ``float`` (presumably pandas' NaN for a
            missing value - confirm) is treated as "no molecule".

    Returns:
        The annotated matrix, or ``np.nan`` when the input is a float, when
        RDKit cannot parse the SMILES, or when the molecule has more than 60
        atoms.
    """
    if isinstance(smile, float):
        return np.nan
    molecule = rdkit.Chem.MolFromSmiles(smile)
    if molecule is None:
        return np.nan
    Chem.Kekulize(molecule)
    atomic_numbers = [atom.GetAtomicNum() for atom in molecule.GetAtoms()]
    bond_matrix = Chem.GetAdjacencyMatrix(molecule, useBO=True)
    if len(atomic_numbers) > 60:
        return np.nan
    for position in range(len(atomic_numbers)):
        bond_matrix[position, position] = atomic_numbers[position]
    return bond_matrix

In [5]:
def get_max_dim(d):
    """Find the largest first-axis length among the items of ``d``.

    Args:
        d: Iterable of objects exposing ``.shape``.

    Returns:
        The maximum ``shape[0]`` seen, or 0 when ``d`` is empty.
    """
    return max((item.shape[0] for item in d), default=0)


def pad_up_to(t, max_in_dims, constant_values=0):
    """Pad a 2-D array at the bottom and right until it reaches a target shape.

    Args:
        t: 2-D array to pad.
        max_in_dims: Target ``(rows, cols)``.
        constant_values: Fill value for the added cells.

    Returns:
        A new array of shape ``max_in_dims`` with ``t`` in the top-left corner.
    """
    missing = np.subtract(max_in_dims, t.shape)
    # Nothing is added before the data, only after it on each axis.
    widths = ((0, missing[0]), (0, missing[1]))
    return np.pad(t, widths, "constant", constant_values=constant_values)


def get_np_adj_label(
    df,
    path,
    smilepos,
    labelpos,
    fingerpos,
    fingername,
    pad_dim=None,
    save=True,
    Finger=False,
):
    """Build padded, normalised adjacency matrices plus labels and edge lists.

    Every SMILES in ``df[smilepos]`` is converted with ``smiles_get_adj`` and
    ``smiles_get_edge``. Rows whose adjacency is NaN are dropped from the
    labels, the edges and (when requested) the fingerprints, so all outputs
    stay aligned with the adjacency stack.

    Args:
        df: Source DataFrame.
        path: Output directory; file names are appended as ``"/name"``.
        smilepos: Name of the SMILES column.
        labelpos: Column selector for the labels (passed to ``df[...]``).
        fingerpos: Positional index of the first fingerprint column; only
            used when ``Finger`` is true.
        fingername: Prefix of the fingerprint file name.
        pad_dim: Side length every adjacency matrix is padded to. ``None``
            keeps the historical value of 60.
        save: When true, write ``label``, ``edge`` and ``adj`` files (and the
            fingerprint file when ``Finger`` is true). The module-level
            ``suffix`` is appended to those three names, so it must be
            defined beforehand.
        Finger: When true, also extract the fingerprint columns.

    Returns:
        ``[adj, edges, max_dim, labels]`` when ``Finger`` is false, or
        ``[adj, max_dim, labels, fingerprint]`` when it is true (note that the
        edge list is not part of the second layout).

    Raises:
        ValueError: If a molecule has more atoms than the padding dimension.
    """
    smiles = df[smilepos]
    labels_in = df[labelpos]
    # Series.apply keeps the Series name, so ``pre_adj`` is still addressable
    # as column ``smilepos`` after the concat below.
    pre_adj = smiles.apply(smiles_get_adj)
    edge = smiles.apply(smiles_get_edge).rename("edge")

    # Honour the caller's padding size; previously it was silently ignored.
    max_dim = 60 if pad_dim is None else pad_dim
    valid_adj = pre_adj.dropna()
    largest = max((matrix.shape[0] for matrix in valid_adj), default=0)
    if largest > max_dim:
        # Fail before anything is written to disk.
        raise ValueError(
            f"pad_dim={max_dim} is smaller than the largest molecule ({largest} atoms)"
        )

    if Finger:
        fing = df.iloc[:, fingerpos:]
        adj_label = pd.concat([pre_adj, labels_in, edge, fing], axis=1, sort=False)
    else:
        adj_label = pd.concat([pre_adj, edge, labels_in], axis=1, sort=False)
    # Keep only the rows for which an adjacency matrix could be built.
    adj_label = adj_label[adj_label[smilepos].notna()]
    Trueedge = list(adj_label["edge"].values)
    Truelabel = adj_label[labelpos]
    Fingerprint = adj_label.iloc[:, fingerpos:].values if Finger else None

    True_adj = valid_adj.apply(preprocess_graph)
    True_array_ADJ = np.stack(
        True_adj.apply(pad_up_to, args=((max_dim, max_dim),)).values
    )

    if save:
        if Finger:
            np.save(
                path + "/" + fingername + "_fingerprint", Fingerprint, fix_imports=False
            )
        np.save(path + f"/label{suffix}", Truelabel, fix_imports=False)
        with open(path + f"/edge{suffix}", "wb") as f:
            pickle.dump(Trueedge, f)
        np.save(path + f"/adj{suffix}", True_array_ADJ, fix_imports=False)

    if Finger:
        return [True_array_ADJ, max_dim, Truelabel, Fingerprint]
    return [True_array_ADJ, Trueedge, max_dim, Truelabel]

In [6]:
def get_feature(df, max_dim, smilepos, path, save=True):
    """Stack the padded atom-feature matrices of every usable molecule.

    Each SMILES in ``df[smilepos]`` goes through ``smiles_get_features``;
    NaN results are dropped and the remaining matrices are padded to
    ``(max_dim, 75)`` before being stacked along a new first axis.

    Args:
        df: Source DataFrame.
        max_dim: Number of rows every feature matrix is padded to.
        smilepos: Name of the SMILES column.
        path: Output directory, used only when ``save`` is true.
        save: When true, write the stack to ``path + "/feature" + suffix``
            (``suffix`` is the module-level global).

    Returns:
        The stacked feature array.
    """
    usable = df[smilepos].apply(smiles_get_features).dropna()
    padded = [pad_up_to(matrix, (max_dim, 75)) for matrix in usable]
    stacked = np.stack(padded)
    if save:
        np.save(path + f"/feature{suffix}", stacked, fix_imports=False)
    return stacked


def get_graph(df, max_dim, smilepos, path, save=True):
    """Stack the padded ``sim_graph`` matrices of every usable molecule.

    Each SMILES in ``df[smilepos]`` goes through ``sim_graph``; NaN results
    are dropped and the remaining matrices are padded to
    ``(max_dim, max_dim)`` before being stacked along a new first axis.

    Args:
        df: Source DataFrame.
        max_dim: Side length every graph matrix is padded to.
        smilepos: Name of the SMILES column.
        path: Output directory, used only when ``save`` is true.
        save: When true, write the stack to ``path + "/graph" + suffix``
            (``suffix`` is the module-level global).

    Returns:
        The stacked graph array.
    """
    smi = df[smilepos]
    pre_graph = smi.apply(sim_graph).dropna()
    true_graph = pre_graph.apply(pad_up_to, args=((max_dim, max_dim),))
    true_array_graph = np.stack(true_graph.values)
    # The stray debugging print that used to sit here was removed so the
    # function no longer writes to stdout.
    if save:
        np.save(path + f"/graph{suffix}", true_array_graph, fix_imports=False)
    return true_array_graph


def get_smiles(df, max_dim, smilepos, path, save=True):
    """Return the SMILES column, optionally saving it with ``np.save``.

    The column is returned as-is: rows are not filtered, and ``max_dim`` is
    accepted but not used.

    Args:
        df: Source DataFrame.
        max_dim: Unused.
        smilepos: Name of the SMILES column.
        path: Output directory, used only when ``save`` is true.
        save: When true, write the column to ``path + "/smiles" + suffix``
            (``suffix`` is the module-level global).

    Returns:
        ``df[smilepos]``.
    """
    column = df[smilepos]
    if not save:
        return column
    target = path + f"/smiles{suffix}"
    np.save(target, column, fix_imports=False)
    return column


def get_id(df, max_dim, idpos, path, save=True):
    """Return the identifier column, optionally saving it with ``np.save``.

    The column is returned as-is: rows are not filtered, and ``max_dim`` is
    accepted but not used.

    Args:
        df: Source DataFrame.
        max_dim: Unused.
        idpos: Name of the identifier column.
        path: Output directory, used only when ``save`` is true.
        save: When true, write the column to ``path + "/id" + suffix``
            (``suffix`` is the module-level global).

    Returns:
        ``df[idpos]``.
    """
    identifiers = df[idpos]
    if not save:
        return identifiers
    target = path + f"/id{suffix}"
    np.save(target, identifiers, fix_imports=False)
    return identifiers


def get_smile_dict(df, max_dim, smilepos, path, save=True):
    """Return the SMILES column, optionally saving it with ``np.save``.

    NOTE(review): despite the name no dict is built; the body does the same
    work as ``get_smiles`` and writes to the same ``"/smiles" + suffix`` file.
    ``max_dim`` is accepted but not used.

    Args:
        df: Source DataFrame.
        max_dim: Unused.
        smilepos: Name of the SMILES column.
        path: Output directory, used only when ``save`` is true.
        save: When true, write the column to ``path + "/smiles" + suffix``
            (``suffix`` is the module-level global).

    Returns:
        ``df[smilepos]``.
    """
    column = df[smilepos]
    if not save:
        return column
    target = path + f"/smiles{suffix}"
    np.save(target, column, fix_imports=False)
    return column


class GraphGenerator:
    """Run the full featurisation pipeline for one pickled dataset.

    Construction loads the DataFrame, then computes (and, when ``save`` is
    true, writes under ``nppath``) the adjacency stack, edge lists, labels,
    atom features, graph matrices, SMILES and, optionally, an id column.

    Attributes set by ``__init__``: ``df``, ``save``, ``smilepos``,
    ``fingername``, ``suffix``, ``labelpos``, ``fingerpos``, ``idpos``,
    ``adj``, ``edge``, ``max_dim``, ``label``, ``feature``, ``graph``,
    ``smiles`` and ``id``.
    """

    def __init__(self, datapath, nppath, save=True, idpos=None):
        """Load ``datapath`` and build every array.

        Args:
            datapath: Pickled DataFrame; its ``molecule_structures_col``
                column is renamed to ``SMILES_NS`` on load.
            nppath: Directory the helper functions write their outputs to.
            save: Forwarded to every helper; false disables all writes.
            idpos: Name of the identifier column to export. ``None`` (the
                default) skips the id export and leaves ``self.id`` as
                ``None``. Previously ``self.idpos`` was read without ever
                being assigned, so construction always raised
                ``AttributeError``.
        """
        self.df = get_csv(datapath, "molecule_structures_col")
        self.save = save
        self.smilepos = "SMILES_NS"
        self.fingername = "MK"
        # NOTE(review): this attribute is not passed to any helper; they read
        # the module-level ``suffix`` instead.
        self.suffix = "row"
        self.labelpos = ["synergy_loewe"]
        self.fingerpos = 2
        self.idpos = idpos
        self.adj, self.edge, self.max_dim, self.label = get_np_adj_label(
            self.df,
            nppath,
            self.smilepos,
            self.labelpos,
            self.fingerpos,
            self.fingername,
            save=self.save,
            Finger=False,
        )

        self.feature = get_feature(
            self.df, self.max_dim, self.smilepos, nppath, save=self.save
        )
        self.graph = get_graph(
            self.df, self.max_dim, self.smilepos, nppath, save=self.save
        )
        self.smiles = get_smiles(
            self.df, self.max_dim, self.smilepos, nppath, save=self.save
        )
        if self.idpos is None:
            # No id column was named, so there is nothing to export.
            self.id = None
        else:
            self.id = get_id(
                self.df, self.max_dim, self.idpos, nppath, save=self.save
            )
        # Kept for compatibility: overwrites self.smiles with the result of
        # get_smile_dict, exactly as the original constructor did.
        self.smiles = get_smile_dict(
            self.df, self.max_dim, self.smilepos, nppath, save=self.save
        )

In [8]:
# File-name suffix read by the save helpers through the module-level `suffix`
# (they append it to names such as "/adj", "/label", "/edge"); it must be set
# before the generator below is constructed with save=True.
suffix = "_col"
# Runs the whole pipeline on the pickled dataset and, because save=True, writes
# the resulting arrays under the "nps" directory. DATA_PATH is not defined in
# this notebook - presumably it comes from `from constants import *`; verify.
x = GraphGenerator(f"{DATA_PATH}/data_drugcomb.pkl", f"{DATA_PATH}/nps/", save=True)

  return pd.np.nan
