In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for input_path in input_paths:
    print(input_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/atomic-new/model_300dim.pkl
/kaggle/input/atomic-new/large_data.csv
/kaggle/input/atomic-new/small_data.csv


In [3]:
!pip install rdkit mol2vec gensim -q




In [13]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import pandas as pd
import torch
import torch.nn as nn

import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import RDKFingerprint
from mol2vec.features import mol2alt_sentence, MolSentence, DfVec, sentences2vec
from gensim.models import word2vec
from sklearn.linear_model import LogisticRegression

In [14]:
class MolDataset(Dataset):
    """
    Map-style dataset that pairs molecular features with integer labels.
    """
    def __init__(self, X, y, X_func=lambda a: a.to_numpy(dtype=np.int32), X_get_func=lambda a: a):
        """
        Stores the features and labels of the dataset.

        :param X: all features of this dataset
        :param y: all ground truth labels; must provide ``to_numpy`` (it is converted to an int32 array)
        :param X_func: applied once to X to obtain the indexable container that is stored
        :param X_get_func: applied to a single stored element on every access to produce the model input
        """
        stored_features = X_func(X)
        labels = y.to_numpy(dtype=np.int32)

        self.X = stored_features
        self.y = labels
        self.f = X_get_func

    def __len__(self):
        # The number of samples is the number of labels.
        return len(self.y)

    def __getitem__(self, index):
        # Features are converted lazily, one element per access.
        features = self.f(self.X[index])
        label = self.y[index]
        return features, label

    
def torch_accuracy(loader, model, device, conv=False):
    """
    Computes the classification accuracy of a pytorch model.

    The model is switched to evaluation mode and gradients are disabled
    while the batches are processed.

    :param loader: a DataLoader instance
    :param model: a torch.nn.Module returning one row of class scores per sample
    :param device: 'cuda' or 'cpu'
    :param conv: whether this model is a convolution model (a channel dimension is inserted at position 1)
    :return: number of correct predictions divided by the size of the loader's dataset
    """
    total = len(loader.dataset)
    model.eval()
    hits = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.unsqueeze(1) if conv else inputs
            inputs = inputs.to(device).float()
            targets = targets.to(device).long()
            predicted_class = model(inputs).argmax(1)
            hits += predicted_class.eq(targets).float().sum().item()

    return hits / total


def train(dataloader, model, loss_fn, optimizer, device, print_loss=True, conv=False):
    """
    Runs one pass over a dataloader, updating the model after every batch.

    :param dataloader: a DataLoader instance with the data to be trained on
    :param model: a torch.nn.Module
    :param loss_fn: the loss function, called as loss_fn(predictions, labels)
    :param optimizer: the optimizer used for training
    :param device: 'cuda' or 'cpu'
    :param print_loss: whether to print the loss value (every 100th batch) during training
    :param conv: whether this model is a convolution model (a channel dimension is inserted at position 1)
    :return: a list with the loss value of every batch, in order
    """
    model.train()
    history = []
    for step, (inputs, targets) in enumerate(dataloader):
        if conv:
            inputs = inputs.unsqueeze(1)
        inputs = inputs.to(device).float()
        targets = targets.to(device).long()

        # forward pass
        batch_loss = loss_fn(model(inputs), targets)
        loss_value = batch_loss.item()
        history.append(loss_value)

        # backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if print_loss and step % 100 == 0:
            print(f'loss: {loss_value}')
    return history


def init_weights(m):
    """
    He initialization for linear layers in a model.

    Intended to be passed to ``torch.nn.Module.apply``. Layers that are not
    ``nn.Linear`` are left untouched.

    :param m: a layer in a neural network
    """
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        # A linear layer built with bias=False has bias set to None.
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.01)

         
            
class MyModel:
    """
    Wrapper type for all neural network models used in this project.
    Specific models should subclass this class and specify parameters.

    An instance holds the train/test split, the datasets and dataloaders,
    the network (moved to the available device), the loss function and the optimizer.
    """
    def __init__(self, model, X, y, batch_size, partition, learning_rate, reg_strength=0, seed=None, X_func=lambda a: a.to_numpy(dtype=np.int32), X_get_func=lambda a: a, conv=False):
        """
        Builds a neural network model.

        :param model: the model to be trained; an instance of a torch.nn.Module
        :param X: all input features (train and test sets)
        :param y: all ground truth labels (train and test sets)
        :param batch_size: mini batch size
        :param partition: ratio of the input dataset to use as the test set
        :param learning_rate: learning rate
        :param reg_strength: strength of L2 regularization (passed to the optimizer as weight_decay)
        :param seed: for controlling how the dataset is split into train and test sets
        :param X_func: a function that converts X into the form it should be stored as
        :param X_get_func: a function that processes an element in X to the form that should be fed to a model as input
        :param conv: whether this model is a convolution model
        """

        # split into train and test sets
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=partition, random_state=seed)

        # initialize datasets and dataloaders
        self.train_dset = MolDataset(self.X_train, self.y_train, X_func, X_get_func)
        self.test_dset = MolDataset(self.X_test, self.y_test, X_func, X_get_func)
        self.train_loader = DataLoader(self.train_dset, batch_size=batch_size, shuffle=True)
        # Accuracy does not depend on sample order, so the test set is not shuffled.
        self.test_loader = DataLoader(self.test_dset, batch_size=batch_size, shuffle=False)

        # initialize model
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = model.to(self.device)
        self.model.apply(init_weights)

        # loss function and optimizer
        self.loss = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate, weight_decay=reg_strength)

        self.conv = conv

    def train(self, epochs, print_loss=True):
        """
        Trains the model

        :param epochs: how many epochs to train for
        :param print_loss: whether to print the loss values during training
        :return: list of per-batch loss values, and lists of per-epoch train and test accuracies
        """
        losses = []
        train_accuracies = []
        test_accuracies = []
        for t in range(epochs):
            if print_loss:
                print(f'Epoch {t + 1}\n--------------------')
            # `train` here is the module-level training function, not this method.
            losses += train(self.train_loader, self.model, self.loss, self.optimizer, self.device, print_loss, self.conv)
            train_accuracy, test_accuracy = self.evaluate()
            train_accuracies.append(train_accuracy)
            test_accuracies.append(test_accuracy)
        return losses, train_accuracies, test_accuracies

    def evaluate(self):
        """
        Evaluates the accuracy of the model

        :return: the current train and test accuracies
        """
        train_accuracy = torch_accuracy(self.train_loader, self.model, self.device, self.conv)
        test_accuracy = torch_accuracy(self.test_loader, self.model, self.device, self.conv)
        return train_accuracy, test_accuracy

    
def plot(data, xlabel, ylabel):
    """
    Draws a line plot of a sequence of values against their positions and shows it.

    :param data: a series of values to plot
    :param xlabel: label of x axis
    :param ylabel: label of y axis
    """
    positions = [i for i in range(len(data))]
    plt.plot(positions, data)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()
    
    
def evaluate_model(model, epochs, print_loss=True, plot_metrics=False):
    """
    Trains a neural network, then reports its final accuracies.

    :param model: the model to evaluate; an instance of MyModel
    :param epochs: how many epochs to train for
    :param print_loss: whether to print the loss values during training
    :param plot_metrics: whether to plot the loss and accuracy values
    :return: train and test accuracies of the model after training
    """
    losses, train_accuracies, test_accuracies = model.train(epochs, print_loss)
    train_accuracy, test_accuracy = model.evaluate()

    if plot_metrics:
        # one figure per recorded series: (values, x label, y label)
        figures = (
            (losses, '# of iterations', 'loss'),
            (train_accuracies, 'epoch', 'train accuracy'),
            (test_accuracies, 'epoch', 'test accuracy'),
        )
        for values, x_label, y_label in figures:
            plot(values, x_label, y_label)

    return train_accuracy, test_accuracy

In [15]:
def fingerprint(smiles):
    """
    Generates molecular fingerprint for a molecule.

    :param smiles: SMILES string of a molecule
    :return: the RDKit fingerprint bit vector as a np array
             (the models in this notebook expect 2048 entries, RDKFingerprint's default size)
    :raises ValueError: if the SMILES string cannot be parsed by RDKit
    """
    mol = Chem.MolFromSmiles(smiles)
    # MolFromSmiles signals a parse failure by returning None instead of raising.
    if mol is None:
        raise ValueError(f'Could not parse SMILES string: {smiles!r}')
    return np.array(RDKFingerprint(mol))


def generate_word_embedding(data_file, out_file, model_file='/kaggle/input/atomic-new/model_300dim.pkl', smiles_col='text'):
    """
    Uses a pre-trained model to generate word embeddings for a list of molecules.
    Prepends the embeddings as columns to the input data file
    and writes an output csv file.

    :param data_file: path to a csv data file containing a column of SMILES strings
    :param out_file: path to the output csv file
    :param model_file: path to a pre-trained model
    :param smiles_col: name of the column in the data file that holds the SMILES strings
    :return: data from the new csv file written to disk, as a pandas DataFrame
    :raises ValueError: if any SMILES string in the column cannot be parsed by RDKit
    """
    data = pd.read_csv(data_file)
    mol = [Chem.MolFromSmiles(i) for i in data[smiles_col]]

    # MolFromSmiles returns None for unparsable input; report those rows up front.
    invalid_rows = [row for row, m in enumerate(mol) if m is None]
    if invalid_rows:
        raise ValueError(f'Could not parse SMILES strings in rows: {invalid_rows}')

    sentence = [MolSentence(mol2alt_sentence(i, radius=1)) for i in mol]
    # NOTE(review): gensim model files are pickle-based; only load files from a trusted source.
    w2v_model = word2vec.Word2Vec.load(model_file)
    embedding = [DfVec(x) for x in sentences2vec(sentence, w2v_model)]
    data_mol2vec = np.array([x.vec for x in embedding])
    data_mol2vec = pd.DataFrame(data_mol2vec)
    # Embedding columns come first, followed by the original columns.
    new_data = pd.concat([data_mol2vec, data], axis=1)
    new_data.to_csv(out_file, index=False)
    return new_data


# Vocabulary for the one-hot SMILES encoding; the position in this list is the row index.
SMILES_CHARS = list(
    ' '              # blank, index 0
    '#%()+-./'
    '0123456789'
    '=@'
    'ABCFHIKLMNOP'
    'RSTVXZ'
    '[\\]'
    'abcegilnoprs'
    'tu'
)
# Lookup tables in both directions: character -> index and index -> character.
smi2index = {char: idx for idx, char in enumerate(SMILES_CHARS)}
index2smi = dict(enumerate(SMILES_CHARS))


def smiles_encoder(smiles, maxlen=500):
    """
    Builds the one-hot encoding of a SMILES string.
    Rows correspond to the entries of SMILES_CHARS, columns to positions in the string.
    Columns beyond the end of the string are left as zeros.

    :param smiles: SMILES string of a molecule
    :param maxlen: maximum length allowed for SMILES strings; shorter strings are padded to this length
    :return: the one-hot encoding of the SMILES string as a np array of shape (len(SMILES_CHARS), maxlen)
    """
    encoding = np.zeros((len(SMILES_CHARS), maxlen))
    # Row index of every character, then one vectorized assignment for all positions.
    rows = np.asarray([smi2index[char] for char in smiles], dtype=np.intp)
    cols = np.arange(len(rows))
    encoding[rows, cols] = 1
    return encoding


def smiles_decoder(X):
    """
    Recovers the SMILES string from its one-hot encoding.

    :param X: one-hot encoding of a molecule (one column per character position)
    :return: corresponding SMILES string
    """
    # All-zero padding columns have argmax 0, which maps to ' ' and is stripped at the end.
    char_indices = X.argmax(axis=0)
    return ''.join(index2smi[idx] for idx in char_indices).strip()


def accuracy(truth, predicted):
    """
    Calculates the accuracy of the predictions of a classifier

    :param truth: ground truth labels (list, np array or pandas Series)
    :param predicted: predicted labels (list, np array or pandas Series), same length as truth
    :returns: accuracy of the prediction
    :raises ValueError: if truth and predicted do not have the same shape
    """
    # Convert first: comparing two plain Python lists with == yields a single bool,
    # not an element-wise comparison.
    truth = np.asarray(truth)
    predicted = np.asarray(predicted)
    if truth.shape != predicted.shape:
        raise ValueError(f'Shape mismatch: truth {truth.shape} vs predicted {predicted.shape}')
    return np.sum(predicted == truth) / len(predicted)


def test_log_reg(X, y, partition):
    """
    Fits logistic regression on a random train/test split and measures its accuracy.

    :param X: all input features (train and test sets)
    :param y: all ground truth labels (train and test sets)
    :param partition: percentage of the input dataset to be used as the test set
    :return: train and test accuracies of logistic regression
    """
    classifier = LogisticRegression(max_iter=2000)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=partition)
    classifier.fit(X_train, y_train)

    train_accuracy = accuracy(y_train, classifier.predict(X_train))
    test_accuracy = accuracy(y_test, classifier.predict(X_test))
    return train_accuracy, test_accuracy


def test_model(model, data, partition, runs, epochs, print_loss=True, plot=False, X=None, y=None):
    """
    Tests a classifier model on a dataset and evaluates its accuracy.
    Prints out the train and test accuracies.

    :param model: a subclass of MyModel; this object will be called as model(data, partition) to construct a DL model instance; or 'logreg' for logistic regression
    :param data: the entire dataset (train and test); not used if model is 'logreg'
    :param partition: percentage of the input dataset to be used as the test set
    :param runs: how many times the model is evaluated; the reported accuracies are averaged across all runs
    :param epochs: number of training epochs; not used if model is 'logreg'
    :param print_loss: whether to print loss values during training; not used if model is 'logreg'
    :param plot: whether to plot loss and accuracy values after each run; not used if model is 'logreg'
    :param X: all input features (train and test sets); only used if model is 'logreg'
    :param y: all ground truth labels (train and test sets); only used if model is 'logreg'
    :return: train and test accuracies of the model averaged across all runs
    """
    use_logreg = isinstance(model, str) and model == 'logreg'
    train_accuracies, test_accuracies = [], []

    for _ in range(runs):
        if use_logreg:
            train_accuracy, test_accuracy = test_log_reg(X, y, partition)
        else:
            # A fresh wrapper is built for every run. MyModel.__init__ already moves the
            # network to the available device, so no device transfer is needed here.
            m = model(data, partition)
            train_accuracy, test_accuracy = evaluate_model(m, epochs, print_loss, plot_metrics=plot)

        train_accuracies.append(train_accuracy)
        test_accuracies.append(test_accuracy)

    train_mean = np.mean(train_accuracies)
    test_mean = np.mean(test_accuracies)
    print(f'Train accuracy mean: {train_mean}')
    print(f'Test accuracy mean: {test_mean}')

    return train_mean, test_mean

In [None]:
class DNNModel(nn.Module):
    """
    Fully connected classifier: 2048 inputs -> 500 -> 200 -> 3 output logits,
    with batch normalization and ReLU after each hidden linear layer.
    """
    def __init__(self):
        super().__init__()
        hidden_blocks = []
        for n_in, n_out in ((2048, 500), (500, 200)):
            hidden_blocks += [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU()]
        self.layers = nn.Sequential(*hidden_blocks, nn.Linear(200, 3))

    def forward(self, x):
        # Returns unnormalized class scores (logits).
        return self.layers(x)

    
class FingerprintDNNModel(MyModel):
    """
    DNNModel trained on fingerprints that are computed on the fly
    from the 'text' column, with the 'target' column as labels.
    """
    def __init__(self, data, partition):
        super().__init__(
            model=DNNModel(),
            X=data['text'],
            y=data['target'],
            batch_size=128,
            partition=partition,
            learning_rate=0.0001,
            # keep the raw strings in a list; fingerprints are generated per sample on access
            X_func=lambda a: a.tolist(),
            X_get_func=lambda a: fingerprint(a),
        )

        
class ConvModel(nn.Module):
    """
    1D convolutional classifier with a single input channel and 3 output logits.
    Two conv/ReLU/max-pool stages are followed by a three-layer fully connected head.
    """
    def __init__(self):
        super().__init__()
        feature_extractor = [
            nn.Conv1d(1, 10, 7),
            nn.ReLU(),
            nn.MaxPool1d(10),
            nn.Conv1d(10, 30, 7),
            nn.ReLU(),
            nn.MaxPool1d(10),
            nn.Flatten(),
        ]
        # 570 flattened features = 30 channels x 19 positions for an input of length 2048
        classifier_head = [
            nn.Linear(570, 50),
            nn.ReLU(),
            nn.Linear(50, 20),
            nn.ReLU(),
            nn.Linear(20, 3),
        ]
        self.layers = nn.Sequential(*feature_extractor, *classifier_head)

    def forward(self, x):
        # Returns unnormalized class scores (logits).
        return self.layers(x)

    
class FingerprintConvModel(MyModel):
    """
    ConvModel trained on fingerprints that are computed on the fly
    from the 'text' column, with the 'target' column as labels.
    """
    def __init__(self, data, partition):
        super().__init__(
            model=ConvModel(),
            X=data['text'],
            y=data['target'],
            batch_size=128,
            partition=partition,
            learning_rate=0.001,
            # keep the raw strings in a list; fingerprints are generated per sample on access
            X_func=lambda a: a.tolist(),
            X_get_func=lambda a: fingerprint(a),
            conv=True,
        )

        
if __name__ == '__main__':
    large_data = pd.read_csv('/kaggle/input/atomic-new/large_data.csv')
    small_data = pd.read_csv('/kaggle/input/atomic-new/small_data.csv')

    # Logistic regression baselines: (banner, dataset, test fraction, number of runs)
    logreg_jobs = [
        ('Testing logistic regression on large dataset', large_data, 0.06, 1),
        ('Testing logistic regression on small dataset', small_data, 0.2, 50),
    ]
    for banner, frame, test_fraction, n_runs in logreg_jobs:
        print(banner)
        features = np.array([fingerprint(i) for i in frame['text']])
        test_model('logreg', None, test_fraction, n_runs, 0, X=features, y=frame['target'])

    # Neural networks: (banner, model class, dataset, test fraction, number of runs)
    network_jobs = [
        ('Testing DNN model on large dataset', FingerprintDNNModel, large_data, 0.06, 1),
        ('Testing DNN model on small dataset', FingerprintDNNModel, small_data, 0.2, 50),
        ('Testing 1D CNN model on large dataset', FingerprintConvModel, large_data, 0.06, 1),
        ('Testing 1D CNN model on small dataset', FingerprintConvModel, small_data, 0.2, 50),
    ]
    for banner, model_cls, frame, test_fraction, n_runs in network_jobs:
        print(banner)
        test_model(model_cls, frame, test_fraction, n_runs, 10, print_loss=True, plot=False)


Testing logistic regression on large dataset


[10:47:31] Conflicting single bond directions around double bond at index 55.
[10:47:31]   BondStereo set to STEREONONE and single bond directions set to NONE.
[10:49:26] Conflicting single bond directions around double bond at index 7.
[10:49:26]   BondStereo set to STEREONONE and single bond directions set to NONE.


Train accuracy mean: 0.9496695326665496
Test accuracy mean: 0.9111314704535044
Testing logistic regression on small dataset


[10:51:46] Conflicting single bond directions around double bond at index 55.
[10:51:46]   BondStereo set to STEREONONE and single bond directions set to NONE.
