In [1]:
import time
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import OrderedDict

In [2]:
device = torch.device('cpu' if torch.cuda.is_available() else 'cuda')

In [27]:
df = pd.read_csv("../../data/HIV.csv")
df = df[:2000]
print(f"size: {df.shape[0]}")
df.head()

size: 2000


Unnamed: 0,smiles,activity,HIV_active
0,CCC1=[O+][Cu-3]2([O+]=C(CC)C1)[O+]=C(CC)CC(CC)...,CI,0
1,C(=Cc1ccccc1)C1=[O+][Cu-3]2([O+]=C(C=Cc3ccccc3...,CI,0
2,CC(=O)N1c2ccccc2Sc2c1ccc1ccccc21,CI,0
3,Nc1ccc(C=Cc2ccc(N)cc2S(=O)(=O)O)c(S(=O)(=O)O)c1,CI,0
4,O=S(=O)(O)CCS(=O)(=O)O,CI,0


In [28]:
class CustomDataset(Dataset):
    def __init__(self, data, label):
        self.data = data
        self.label = label
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]

In [29]:
if __name__ == "__main__":
    dataset = CustomDataset(df.smiles, df.HIV_active)

In [30]:
from torch.utils.data import random_split

[0.8 *len(dataset)], [0.1 *len(dataset)], [0.1 *len(dataset)]

([1600.0], [200.0], [200.0])

In [31]:
train_dataset, test_dataset, val_dataset = random_split(dataset, [int(0.8*len(dataset)), int(0.1*len(dataset)), int(0.1*len(dataset))]) # 80, 10, 10
len(train_dataset + test_dataset + val_dataset)

2000

In [32]:
train_dataset, test_dataset, val_dataset = random_split(dataset, [1600, 200, 200]) # 80, 10, 10
len(train_dataset + test_dataset + val_dataset)

2000

In [15]:
for smiles, labels in train_dataset:
    print("Input ID:\n " ,smiles)
    print("Label:\n" ,labels)
    break

Input ID:
  N#CC(=Cc1ccccc1)c1ccc(Cl)cc1
Label:
 0


In [16]:
## https://github.com/topazape/LSTM_Chem/blob/master/lstm_chem/utils/smiles_tokenizer2.py

class SmilesTokenizer(object):
    def __init__(self):
        atoms = [
            'Al', 'As', 'B', 'Br', 'C', 'Cl', 'F', 'H', 'I', 'K', 'Li', 'N',
            'Na', 'O', 'P', 'S', 'Se', 'Si', 'Te'
        ]
        special = [
            '(', ')', '[', ']', '=', '#', '%', '0', '1', '2', '3', '4', '5',
            '6', '7', '8', '9', '+', '-', 'se', 'te', 'c', 'n', 'o', 's'
        ]

        self.table = sorted(atoms, key=len, reverse=True) + special 

        self.table_2_chars = list(filter(lambda x: len(x) == 2, self.table))
        self.table_1_chars = list(filter(lambda x: len(x) == 1, self.table))
        self.vocab_dict = {}

    def tokenize(self, smiles):
        smiles = smiles + ' '
        N = len(smiles)
        token = []
        i = 0
        while (i < N):
            c1 = smiles[i]
            c2 = smiles[i:i + 2]

            if c2 in self.table_2_chars:
                token.append(c2)
                i += 2
                continue

            if c1 in self.table_1_chars:
                token.append(c1)
                i += 1
                continue

            i += 1

        return np.asarray(token, dtype=object)
        
    def vocaburaly(self):
        vocab_dict = {}
        for i, tok in enumerate(self.table):
            vocab_dict[tok] = i
        return vocab_dict
    
    def index_encode(self, tokenized_smiles):
        vocab_dict = {}
        for i, tok in enumerate(self.table):
            vocab_dict[tok] = i
        encoded = [vocab_dict[t] for t in tokenized_smiles ]
        return encoded

In [17]:
tokenizer = SmilesTokenizer()
tokens = [tokenizer.tokenize(x) for x in df.smiles]
vocabulary = tokenizer.vocaburaly()
indexed_smiles = [tokenizer.index_encode(x) for x in tokens]

In [18]:
from torchtext.vocab import vocab

sorted_by_freq_tuples = sorted(vocabulary.items(), key=lambda x: x[1], reverse=True)
ordered_dict = OrderedDict(sorted_by_freq_tuples)
vocab1 = vocab(ordered_dict)
vocab1.insert_token("<pad>", 0)
vocab1.insert_token("<unk>", 1)
vocab1.set_default_index(1)

In [19]:
vocab1.get_itos()[:10]

['<pad>', '<unk>', 's', 'o', 'n', 'c', 'te', 'se', '-', '+']

In [20]:
text_pipeline = lambda x: [vocab1[token] for token in tokenizer.tokenize(x)]
text_pipeline("O=S(=O)")

[29, 22, 27, 26, 22, 29, 25]

In [21]:
label_pipeline = lambda x: 1. if x == 0. else 1.
label_pipeline("1")

1.0

In [22]:
def collate_batch(batch):
    label_list, text_list, lengths = [], [], []
    for   _text, _label in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text),
                                      dtype=torch.long)
        text_list.append(processed_text)
        lengths.append(processed_text.size(0))
    label_list = torch.tensor(label_list,
                                 dtype=torch.float)
    lengths = torch.tensor(lengths)
    padded_text_list = nn.utils.rnn.pad_sequence(text_list,
                                                     batch_first=True)  
    return padded_text_list, label_list, lengths

In [23]:
collate_batch(val_dataset)

(tensor([[35,  5, 18,  ...,  0,  0,  0],
         [35, 35, 35,  ...,  0,  0,  0],
         [35, 35, 26,  ...,  0,  0,  0],
         ...,
         [29, 22,  5,  ...,  0,  0,  0],
         [27,  5, 18,  ...,  0,  0,  0],
         [35, 35, 29,  ...,  0,  0,  0]]),
 tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         

In [24]:
train_dataloader = DataLoader(train_dataset,batch_size=10, shuffle=True, collate_fn=collate_batch)
val_dataloader = DataLoader(val_dataset,batch_size=10, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=True, collate_fn=collate_batch)

In [67]:
text_batch, label_batch, length_batch = next(iter(test_dataloader))
print(text_batch[:1])
print(label_batch)
print(length_batch)
print(text_batch.shape)

tensor([[35, 35, 26, 22, 29, 25, 35, 24, 28, 33, 23, 26,  5, 18,  5,  5,  5,  5,
          5, 18, 25, 26,  5, 18,  5,  5,  5,  5,  5, 18, 25,  5, 18,  5,  5,  5,
          5,  5, 18,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0]])
tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
tensor([39, 22, 34, 36, 39, 67, 14, 23, 25, 80])
torch.Size([10, 80])


In [26]:
## building an rnn model
## many to one classificaton 

In [27]:
class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size,
                 num_layers, fc_hidden_size, output_size):
        super(). __init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        # batch_first=True causes input/output tensors to be of shape
        # (batch_dim, seq_dim, input_dim (embedding))
        # batch_dim = number of samples per batch
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, num_layers,
                           batch_first=True, bidirectional=True) 
        # only use the hidden_size, since its a many to one
        self.fc1 = nn.Linear(rnn_hidden_size * num_layers, fc_hidden_size)  
        
        self.relu = nn.ReLU()
        # regularization to avoid overfitting
        self.dropout = nn.Dropout(p=0.5)
        # output_size, use 1 for binary classification
        self.fc2 = nn.Linear(fc_hidden_size, output_size) 
        
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, text, text_lengths):
        # text dim: [sentence length, batch size]
        # text_length = [batch size]
        
        # [sentence len, batch size] => [sentence len, batch size, embedding size]
        out = self.embedding(text)
        
        # pack sequence to avoid using <paddings> during computations (saves computations)
        # lengths needs to be on cpu
        out = nn.utils.rnn.pack_padded_sequence(out, text_lengths.to('cpu'),
                                                enforce_sorted=False, batch_first=True)
        # =>  [batch_size, sentence len,  embedding dim]
        
        # Propagate input through LSTM
        out, (hidden, cell) = self.rnn(out) ## lstm with input, hidden, and internal (cell) state
        # out dim: [batch size, sentence length, hidden dim]
        # cell dim: [num layers, batch size, hidden dim]
        # hidden dim: [num_layers, batch_size, hidden dim]
        
        # use final hidden state from the last layer as an input to fc1
        # Index hidden state of last time step
        # hidden.size() --> [num_layers, batch_size, hidden dim]
        
        # final layer foward hidden state     
        hidden_fwd = hidden[-2]
        # final layer backwaed hidden state 
        hidden_bck = hidden[-1]
        
        # concatenate the 2 layers to pass to linear layer
        # hidden_fwd/bck = [batch size, hid dim]
        out = torch.cat((hidden_fwd, hidden_bck), dim = 1)
        # out = [batch size, hid dim * 2]

        out = self.fc1(out) ## first dense layer
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out) ## final layer
        out = self.sigmoid(out) 
        return out

In [28]:
vocab_size = len(vocab1) ## len of vocab size
embed_dim = 70 ## input size, usually around 50-500 dimensions
num_layers = 2 ## number of recurrent layers, 2 would mean stacking 2 layers to form stacked LSTM
rnn_hidden_size = 100 ## usually around 100-500 dimensions
fc_hidden_size = 100 ## usually around 100-500 dimensions
output_size = 1 ## since the output is between 0 and 1, thus 1 dimension

In [29]:
## Now we will instantiate the class LSTM1 object.

torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size,
            num_layers, fc_hidden_size, output_size).to(device)

In [30]:
model

RNN(
  (embedding): Embedding(45, 70, padding_idx=0)
  (rnn): LSTM(70, 100, num_layers=2, batch_first=True, bidirectional=True)
  (fc1): Linear(in_features=200, out_features=100, bias=True)
  (relu): ReLU()
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=100, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [35]:
def train(dataloader):
    # model training mode (gradient computation)
    model.train()
    # initailiz acc, and loss at zero 
    total_acc, total_loss = 0, 0
    for idx, (text, label, length) in enumerate (dataloader):
        # reset gradients to zero before each instance
        optimizer.zero_grad()
        # label predictions (forward papagation)
        # squeeze(1) => drop superficial one dimensional from a tensor
        predicted_label = model(text, length).squeeze(1) # or [:,0]
        # loss calculation
        loss = loss_fn(predicted_label, label)
        # compute gradients (backward propagation) 
        # to minimize loss functions with gradient descent
        loss.backward()
        # update parameters based on the computed gradients
        optimizer.step()
        ## logging
        if not idx % 50:
            print('| Epoch: %03d/%03d | Batch %04d/%04d | Loss: %.4f'
                  %(epoch +1, num_epochs, idx, len(dataloader), loss))
        # compute total accuracy
        # if pred label is >=0.5 to probability of true truth accuracy count increases
        # summation of the largest value which yields predicted class label
        total_acc += ((predicted_label >= 0.5).float() == label).float().sum().item()
        # compute total loss after back prop and parameter update
        total_loss += loss.item() * label.size(0)
    # compare true labels with the predicted labels to compute accuracy
    return total_acc/len(dataloader.dataset), \
            total_loss/len(dataloader.dataset)

In [36]:
def evaluate(dataloader):
    # model evaluation mode (no gradient computation)
    model.eval()
    # initailize acc, and loss at zero 
    total_acc, total_loss = 0, 0
    # disabling gradient calculation
    with torch.no_grad():
        for text, label, length in dataloader:
            # label predictions (forward papagation)
            # squeeze(1) => drop superficial one dimensional from a tensor
            predicted_label = model(text, length).squeeze(1) # reshape
            # loss calculation
            loss = loss_fn(predicted_label, label)
            # compute total accuracy
            # if pred label is >=0.5 to probability of true truth accuracy count increases
            # summation of the largest value which yields predicted class label
            total_acc += ((predicted_label >= 0.5).float() == label).float().sum().item()
            # compute total loss after back prop and parameter update
            total_loss += loss.item() * label.size(0)
            # compare true labels with the predicted labels to compute accuracy
        # compare true labels with the predicted labels to compute accuracy
        return total_acc/len(dataloader.dataset), \
                total_loss/len(dataloader.dataset)

In [42]:
# for binnary classification we use
# single class membership probability output (binary cross entropy loss)
# although BCELoss
loss_fn = nn.BCEWithLogitsLoss().to(device)
# Adam Optimizer to update parameters based on the computed gradients
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [43]:
torch.manual_seed(1)
num_epochs = 5

start_time = time.time()
for epoch in range(num_epochs):
    acc_train, loss_train = train(train_dataloader)
    acc_val , loss_val = evaluate(val_dataloader)
    print('| Epoch: %03d/%03d | Train Acc: %.2f%% | Valid. Acc: %.2f%%'
          %(epoch + 1, num_epochs, 100 * acc_train, 100 * acc_val))
    print(f'| Time elapsed: {(time.time() - start_time) / 60:.2f} min')

| Epoch: 001/005 | Batch 0000/0160 | Loss: 0.3133
| Epoch: 001/005 | Batch 0050/0160 | Loss: 0.3133
| Epoch: 001/005 | Batch 0100/0160 | Loss: 0.3133
| Epoch: 001/005 | Batch 0150/0160 | Loss: 0.3133
| Epoch: 001/005 | Train Acc: 100.00% | Valid. Acc: 100.00%
| Time elapsed: 0.53 min
| Epoch: 002/005 | Batch 0000/0160 | Loss: 0.3133
| Epoch: 002/005 | Batch 0050/0160 | Loss: 0.3133
| Epoch: 002/005 | Batch 0100/0160 | Loss: 0.3133
| Epoch: 002/005 | Batch 0150/0160 | Loss: 0.3133
| Epoch: 002/005 | Train Acc: 100.00% | Valid. Acc: 100.00%
| Time elapsed: 1.12 min
| Epoch: 003/005 | Batch 0000/0160 | Loss: 0.3133
| Epoch: 003/005 | Batch 0050/0160 | Loss: 0.3133
| Epoch: 003/005 | Batch 0100/0160 | Loss: 0.3133
| Epoch: 003/005 | Batch 0150/0160 | Loss: 0.3133
| Epoch: 003/005 | Train Acc: 100.00% | Valid. Acc: 100.00%
| Time elapsed: 1.72 min
| Epoch: 004/005 | Batch 0000/0160 | Loss: 0.3133
| Epoch: 004/005 | Batch 0050/0160 | Loss: 0.3133
| Epoch: 004/005 | Batch 0100/0160 | Loss: 0.

In [44]:
acc_test, _ = evaluate(test_dataloader)
print('Test Acc: %.2f%%'
      %(100 * acc_test))

Test Acc: 100.00%


In [45]:
## https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/2%20-%20Upgraded%20Sentiment%20Analysis.ipynb
## https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/2_lstm.ipynb
sentiment_label = {1: "high",
                   0: "low"}

def predict(text, text_pipeline):
    # evaluation mode (no gradients)
    with torch.no_grad():
        # tokenize and index the tokens
        processed_text = torch.tensor(text_pipeline(text))
        # add a batch dimension
        processed_text = processed_text.unsqueeze(0).to(device)
        # compute sequence length
        text_length = processed_text.size(0)
        # convert to tensor and add a batch dimension
        text_length = torch.tensor(text_length).unsqueeze(0)
        # prediction
        prediction = model(processed_text, text_length)
        # reduction real numbers to values between 0 and 1
        probability = torch.sigmoid(prediction)
        # get the max value of all elements
        predicted_probability, predicted_class, = torch.max(probability, dim=1)
        # convert tensor holding a single value into an integer
        return predicted_class.item(), predicted_probability.item()

In [41]:
text = "CC(=O)N1c2ccccc2Sc2c1ccc1ccccc21"
pred_class, pred_proba = predict(text, text_pipeline)

print(f'Predicted Class: {pred_class} = {sentiment_label[pred_class]}')
print(f'Probability: {pred_proba}')

Predicted Class: 0 = low
Probability: 0.7310488224029541


In [50]:
text = "O=C(C#CCCN1CCOCC1)Nc1cc2c(Nc3ccc(F)c(Cl)c3)ncn"
pred_class, pred_proba = predict(text, text_pipeline)

print(f'Predicted Class: {pred_class} = {sentiment_label[pred_class]}')
print(f'Probability: {pred_proba}')

Predicted Class: 0 = low
Probability: 0.7307953834533691
