In [11]:
import os
import pandas as pd
import sys
sys.path.append('../')
from src.utils.util_funcs import load_dataset


In [12]:
# Load the 'ecoli' dataset through the project helper. The call is the cell's
# last expression, so the returned object's repr is shown as the cell output.
load_dataset('ecoli')

<DiskDataset X.shape: (2335,), y.shape: (2335, 1), w.shape: (2335, 1), task_names: ['ecoli']>

In [73]:
import random
import numpy as np

from torch.utils.data.sampler import Sampler


class MultilabelBalancedRandomSampler(Sampler):
    """
    MultilabelBalancedRandomSampler: Given a multilabel dataset of length n_samples and
    number of classes n_classes, samples from the data with equal probability per class
    effectively oversampling minority classes and undersampling majority classes at the
    same time. Note that using this sampler does not guarantee that the distribution of
    classes in the output samples will be uniform, since the dataset is multilabel and
    sampling is based on a single class. This does however guarantee that all classes
    will have at least batch_size / n_classes samples as batch_size approaches infinity

    One pass over the sampler yields ``len(indices)`` example indices, drawn with
    replacement: a class is picked first (see ``class_choice``), then one example
    carrying that class is drawn uniformly at random.
    """

    def __init__(self, labels, indices=None, class_choice="least_sampled"):
        """
        Parameters:
        -----------
            labels: a multi-hot encoding numpy array of shape (n_samples, n_classes)
            indices: an arbitrary-length 1-dimensional numpy array representing a list
            of indices to sample only from
            class_choice: a string indicating how class will be selected for every
            sample:
                "least_sampled": class with the least number of sampled labels so far
                "random": class is chosen uniformly at random
                "cycle": the sampler cycles through the classes sequentially
        """
        self.labels = labels
        self.indices = indices
        if self.indices is None:
            # Default to sampling from every row of the label matrix.
            self.indices = range(len(labels))

        self.num_classes = self.labels.shape[1]

        # List of lists of example indices per class, restricted to `indices`.
        self.class_indices = []
        for class_ in range(self.num_classes):
            lst = np.where(self.labels[:, class_] == 1)[0]
            lst = lst[np.isin(lst, self.indices)]
            self.class_indices.append(lst)

        # Running number of sampled positives per class ("least_sampled" mode only).
        self.counts = [0] * self.num_classes

        assert class_choice in ["least_sampled", "random", "cycle"]
        self.class_choice = class_choice
        # Next class to serve in "cycle" mode.
        self.current_class = 0

    def __iter__(self):
        # Restart the per-pass draw counter; the per-class counts are kept across
        # passes on purpose so that balancing continues over epochs.
        self.count = 0
        return self

    def __next__(self):
        if self.count >= len(self.indices):
            raise StopIteration
        self.count += 1
        return self.sample()

    def sample(self):
        """Draw one example index for the class returned by ``get_class``."""
        class_ = self.get_class()
        class_indices = self.class_indices[class_]
        chosen_index = np.random.choice(class_indices)
        if self.class_choice == "least_sampled":
            # The drawn example may carry several labels; credit every one of them.
            for label_class, indicator in enumerate(self.labels[chosen_index]):
                if indicator == 1:
                    self.counts[label_class] += 1
        return chosen_index

    def get_class(self):
        """Pick the class to sample from according to ``class_choice``."""
        if self.class_choice == "random":
            class_ = random.randint(0, self.num_classes - 1)
        elif self.class_choice == "cycle":
            class_ = self.current_class
            self.current_class = (self.current_class + 1) % self.num_classes
        elif self.class_choice == "least_sampled":
            # Every class tied for the minimum must appear exactly once in the
            # candidate list, otherwise the uniform tie-break below is biased.
            min_count = min(self.counts)
            min_classes = [
                candidate
                for candidate, count in enumerate(self.counts)
                if count == min_count
            ]
            class_ = np.random.choice(min_classes)
        return class_

    def __len__(self):
        return len(self.indices)

In [74]:
import deepchem.molnet as dcm

In [75]:
# Keep only element [1][0] of the loader's return value -- presumably the first
# (unsplit, since splitter=None) dataset of a (tasks, datasets, transformers)
# tuple; TODO confirm against the deepchem version in use.
data = dcm.load_bbbp(featurizer='raw',splitter=None)[1][0]

In [76]:
# Balanced sampler over the label matrix; with class_choice="random" every draw
# first picks a class uniformly at random, then an example carrying that class.
sampler = MultilabelBalancedRandomSampler(data.y,class_choice="random")

In [77]:
from typing import List, Tuple
import numpy as np
from rdkit import Chem
from rdkit.Chem.rdmolfiles import MolFromSmiles
import pandas as pd
from deepchem.data import DiskDataset
from rdkit import RDLogger
import tokenizers

from typing import Callable, Dict, List
from rdkit import RDLogger
import numpy as np
from deepchem.data.datasets import DiskDataset
from tokenizers import Tokenizer
from torch.utils.data import Dataset

# Set the RDKit logger threshold to CRITICAL -- presumably to keep parser
# warnings out of the cell output while SMILES strings are converted.
lg = RDLogger.logger()
lg.setLevel(RDLogger.CRITICAL)

def smiles2index(s_1: str, tokenizer: tokenizers.Tokenizer) -> List[int]:
    """Encode a SMILES string into token ids.

    Args:
        s_1 (str): SMILES string to encode
        tokenizer (tokenizers.Tokenizer): Pretrained tokenizer; its ``encode``
            method is called on the string

    Returns:
        List[int]: The ``ids`` attribute of the resulting encoding
    """
    encoding = tokenizer.encode(s_1)
    return encoding.ids


def index2multi_hot_fg(molecule: Chem.rdchem.Mol, fgroups_list: List[str]) -> np.ndarray:
    """Build the multi-hot functional-group vector of a molecule.

    Args:
        molecule (Chem.rdchem.Mol): RDKit molecule to inspect
        fgroups_list (List[str]): Functional-group patterns; each entry is handed
            straight to ``molecule.HasSubstructMatch``, so it must be whatever that
            method accepts (this notebook passes ``MolFromSmarts`` results)

    Returns:
        np.ndarray: Float vector with one entry per pattern, 1 where the molecule
        matches the pattern and 0 elsewhere
    """
    hits = [1.0 if molecule.HasSubstructMatch(pattern) else 0.0 for pattern in fgroups_list]
    # Explicit dtype keeps the empty-list case a float vector of length zero.
    return np.array(hits, dtype=float)


def smiles2vector_fgr(
    s_1: str, tokenizer: tokenizers.Tokenizer, fgroups_list: List[str]
) -> Tuple[np.ndarray, np.ndarray]:
    """Encode a SMILES string as Functional Groups (FG) and Mined Functional Groups (MFG)

    Args:
        s_1 (str): SMILES string
        tokenizer (tokenizers.Tokenizer): Pretrained tokenizer supplying the token
            ids and the vocabulary size
        fgroups_list (List[str]): Functional-group patterns forwarded to
            ``index2multi_hot_fg``

    Returns:
        Tuple[np.ndarray, np.ndarray]: ``(FG, MFG)`` where FG is the functional-group
        vector and MFG is a vocabulary-sized vector with 1 at every token id
        produced for the string
    """
    token_ids = smiles2index(s_1, tokenizer)
    mined_groups = np.zeros(tokenizer.get_vocab_size())
    mined_groups[token_ids] = 1
    functional_groups = index2multi_hot_fg(MolFromSmiles(s_1), fgroups_list)
    return functional_groups, mined_groups


In [78]:
class FGRDataset(Dataset):
    """Torch dataset yielding functional-group encodings, descriptors and labels"""

    def __init__(
        self,
        data: DiskDataset,
        fgroups_list: List[str],
        tokenizer: Tokenizer,
        descriptor_funcs: Dict[str, Callable],
    ) -> None:
        """Initialize the dataset from a loaded dataset and the encoding tools

        Args:
            data (DiskDataset): Deepchem dataset; its ``X``, ``y`` and ``ids``
                attributes are read once here
            fgroups_list (List[str]): List of functional groups
            tokenizer (Tokenizer): Pretrained tokenizer
            descriptor_funcs (Dict[str, Callable]): Descriptor name -> function
                applied to each item of ``data.X``
        """
        self.mols, self.labels, self.smiles = data.X, data.y, data.ids
        self.fgroups_list = fgroups_list
        self.tokenizer = tokenizer
        self.descriptor_funcs = descriptor_funcs

    def __len__(self):
        """Number of examples, taken from the label array."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(FG, MFG, descriptor values, target)`` for example ``idx``."""
        mol, smile, target = self.mols[idx], self.smiles[idx], self.labels[idx]
        f_g, mfg = smiles2vector_fgr(smile, self.tokenizer, self.fgroups_list)
        # Descriptors are evaluated in the dictionary's insertion order.
        descriptor_values = [func(mol) for func in self.descriptor_funcs.values()]
        return f_g, mfg, np.asarray(descriptor_values), target

In [79]:
from typing import Callable, Dict, List
from rdkit.Chem.rdmolfiles import MolFromSmarts
from rdkit.Chem import Descriptors
import numpy as np
import pandas as pd
from deepchem.splits.splitters import RandomStratifiedSplitter, ScaffoldSplitter
import deepchem.molnet as dcm
from torch.utils.data import DataLoader, Subset
from pytorch_lightning import LightningDataModule
from tokenizers import Tokenizer
from tokenizers.models import BPE


# Descriptor name -> calculator function, built from RDKit's (name, func) pairs.
descriptor_funcs = dict(Descriptors.descList)
# Functional-group SMARTS strings, compiled once into RDKit query molecules.
fgroups = pd.read_csv("../datasets/processed/fg.csv")["SMARTS"].tolist()
fgroups_list = [MolFromSmarts(x) for x in fgroups]
# from_file is a static constructor: everything, including the unknown token,
# comes from the JSON file, so no throwaway Tokenizer(BPE(...)) is needed first.
tokenizer = Tokenizer.from_file("tokenizer_bpe.json")

In [80]:
# Wrap the loaded data together with the functional-group patterns, the
# tokenizer and the descriptor table.
dataset = FGRDataset(data, fgroups_list,tokenizer, descriptor_funcs)

In [91]:
# Plain loader in batches of 64. NOTE(review): no sampler is passed here, so
# neither `sampler` object built elsewhere in this notebook affects these batches.
lo = DataLoader(dataset, batch_size=64)

In [92]:
# Label value counts in the first batch; the target is the last element of
# each item returned by the dataset.
np.unique(next(iter(lo))[-1],return_counts=True)

(array([0., 1.]), array([ 6, 58]))

In [34]:
# Label value counts over the whole dataset, for comparison with a single batch.
np.unique(data.y,return_counts=True)

(array([0., 1.]), array([ 479, 1560]))

In [88]:
from torch.utils.data.sampler import WeightedRandomSampler
# WeightedRandomSampler needs a flat sequence with one weight per sample, while
# the dataset's weight array is presumably (n_samples, n_tasks) -- TODO confirm.
# Pass the single task column rather than the 2-D array.
sampler = WeightedRandomSampler(data.w[:, 0], 64)

In [1]:
import dask.dataframe as dd

Reading SMILES


In [3]:
# NOTE(review): `train` is not defined in any cell visible in this export, so
# this cell fails on a fresh kernel. The output below reports 100903545 rows;
# consider inspecting with train.head() rather than computing the whole collection.
train.compute()

0                          CC(=O)OC(CC(=O)[O-])C[N+](C)(C)C
1                             CC(=O)OC(CC(=O)O)C[N+](C)(C)C
2                                  C1=CC(C(C(=C1)C(=O)O)O)O
3                                                   CC(CN)O
4                                      C(C(=O)COP(=O)(O)O)N
                                ...                        
441734    C[C@@H]1[C@H]([C@@]2([C@@H](C2(C)C)[C@H]3[C@]1...
441735    C[C@@H]1[C@H]([C@@]2([C@@H](C2(C)C)[C@H]3[C@]1...
441736    CC[C@H](C)C(=O)O[C@@]12[C@@H](C1(C)C)[C@@H]3C=...
441737    CC[C@H](C)C(=O)O[C@@]12[C@@H](C1(C)C)[C@@H]3C=...
441738    CC[C@H](C)C(=O)O[C@@]12[C@@H](C1(C)C)[C@@H]3C=...
Name: SMILES, Length: 100903545, dtype: object