In [1]:
import time
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
device

device(type='cpu')

In [4]:
df = pd.read_csv("../../data/HIV.csv")
df = df.iloc[:2000,:]
print(f"size: {df.shape[0]}")
df.tail(5)

size: 2000


Unnamed: 0,smiles,activity,HIV_active
1995,COC(C(=O)C(O)C(C)O)C1Cc2cc3cc(OC4CC(OC5CC(O)C(...,CI,0
1996,N=c1[nH][nH]c(=N)[nH]1.O=C(O)C(=O)O,CI,0
1997,CC12COP(OC1)OC2,CI,0
1998,C1C2CC3CC1OP(O2)O3,CI,0
1999,CC1(c2ccc(Cl)cc2)OCC(CCl)O1,CI,0


In [5]:
np.savetxt("../../data/HIV_smiles.npy", np.array(df.smiles), fmt='%s')
np.savetxt("../../data/HIV_labels.npy", np.array(df.HIV_active), fmt='%s')

In [6]:
def tokenizer(smiles):
    ## https://github.com/topazape/LSTM_Chem/blob/master/lstm_chem/utils/smiles_tokenizer2.py

    atoms = [
        'Al', 'As', 'B', 'Br', 'C', 'Cl', 'F', 'H', 'I', 'K', 'Li', 'N',
        'Na', 'O', 'P', 'S', 'Se', 'Si', 'Te'
    ]
    special = [
        '(', ')', '[', ']', '=', '#', '%', '0', '1', '2', '3', '4', '5',
        '6', '7', '8', '9', '+', '-', 'se', 'te', 'c', 'n', 'o', 's'
    ]
    padding = ['<eos>', '<sos>', '<pad>']

    vocab = sorted(atoms, key=len, reverse=True) + special + padding

    table_2_chars = list(filter(lambda x: len(x) == 2, vocab))
    table_1_chars = list(filter(lambda x: len(x) == 1, vocab))

    smiles = smiles + ' '
    N = len(smiles)
    token = []
    i = 0
    while (i < N):
        c1 = smiles[i]
        c2 = smiles[i:i + 2]

        if c2 in table_2_chars:
            token.append(c2)
            i += 2
            continue

        if c1 in table_1_chars:
            token.append(c1)
            i += 1
            continue

        i += 1
    
    vocab_dict = {}
    for i, tok in enumerate(vocab):
        vocab_dict[tok] = i

    encoded = [vocab_dict[t] for t in token]
    return encoded

In [7]:
if __name__ == "__main__":
    input_ids = []
    for line in tqdm(df.smiles):
        ids = tokenizer(line)
        input_ids.append(ids)

  0%|          | 0/2000 [00:00<?, ?it/s]

In [8]:
len(input_ids)

2000

In [9]:
## dirs
smiles_dir = "../../data/HIV_smiles.npy"
labels_dir = "../../data/HIV_labels.npy"

In [10]:
class CustomDataset(Dataset):
    def __init__(self, data, label, tokenizer):
        self.data = data
        self.label = label
        self.tokenizer = tokenizer
        
        self.data = np.loadtxt(data, dtype=object)    
        self.label = np.loadtxt(label, dtype=float)
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        text = self.data[idx]
        label = self.label[idx]
        input_id = tokenizer(text)
        return torch.tensor(input_id, dtype=torch.long), torch.tensor(label, dtype=torch.float)

In [11]:
if __name__ == "__main__":
    dataset = CustomDataset(smiles_dir, labels_dir, tokenizer)

In [12]:
from torch.utils.data import random_split

[0.8 *len(dataset)], [0.1 *len(dataset)], [0.1 *len(dataset)]

([1600.0], [200.0], [200.0])

In [13]:
train_dataset, test_dataset, val_dataset = random_split(dataset, [1600, 200, 200]) # 80, 10, 10
len(train_dataset + test_dataset + val_dataset)

2000

In [14]:
for smiles, labels in train_dataset:
    print("Input ID:\n " ,smiles)
    print("Label:\n" ,labels)
    break

Input ID:
  tensor([10, 16, 40, 27, 40, 40, 28, 40, 19, 41, 41, 27, 20, 10, 10, 10, 10, 28])
Label:
 tensor(0.)


In [15]:
def collate_batch(batch):
    label_list, text_list= [], []
    for data, label in batch:
        ## smiles (copy smiles without...)
        processed_text = data.clone().detach() 
        text_list.append(processed_text)
        ## label
        label_list.append(label)
    label_list = torch.tensor(label_list)
        ## padding
    padded_text_list = nn.utils.rnn.pad_sequence(text_list, batch_first=True)
                       
    return padded_text_list, label_list, 

In [16]:
collate_batch(val_dataset)

(tensor([[10, 10, 16,  ...,  0,  0,  0],
         [16, 23, 10,  ...,  0,  0,  0],
         [16, 23, 40,  ...,  0,  0,  0],
         ...,
         [18, 23, 17,  ...,  0,  0,  0],
         [16, 23, 10,  ...,  0,  0,  0],
         [16, 10, 40,  ...,  0,  0,  0]]),
 tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 1., 0., 0., 0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         

In [17]:
train_dataloader = DataLoader(train_dataset, batch_size=20,shuffle=False, collate_fn=collate_batch)
val_dataloader = DataLoader(val_dataset, batch_size=20,shuffle=False, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=20,shuffle=False, collate_fn=collate_batch)

In [18]:
text_batch, label_batch = next(iter(train_dataloader))
print(text_batch)

tensor([[10, 16, 40,  ...,  0,  0,  0],
        [10, 10, 19,  ...,  0,  0,  0],
        [16, 23, 40,  ...,  0,  0,  0],
        ...,
        [10, 10, 15,  ...,  0,  0,  0],
        [16, 23, 15,  ...,  0,  0,  0],
        [10, 16, 40,  ..., 40, 27, 16]])


In [19]:
print(label_batch)

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])


In [20]:
print(text_batch.shape)

torch.Size([20, 100])


In [21]:
class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, 
                 dropout, pad_idx):
        
        super().__init__()
                
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        
        # in_channels; num of channels, we use 1 since its text only
        # out_channels; output feature map
        # kernel_size; the size of filters i.e. [n-grams size x emb_dim]
        # if 3 filters are provided, we get 3 convolution layers
        self.convs = nn.ModuleList([
                                    nn.Conv2d(in_channels = 1, 
                                              out_channels = n_filters, 
                                              kernel_size = (filt, embedding_dim)) 
                                    for filt in filter_sizes
                                    ])
        
        # dropout for regularization
        self.dropout = nn.Dropout(dropout)
        
        # if we have 3 filter sizes, we get e linear layers
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        
        #text = [batch size, sent len]
    def forward(self, text):
        
        # add embedding dimension to text
        # [batch size, sent len] ==> [batch size, sent len, emb dim]
        embedded = self.embedding(text)
        
        # add 1 dimensional 
        # [batch size, sent len, emb dim]==> [batch size, 1, sent len, emb dim]
        embedded = embedded.unsqueeze(1)
        
        # application of Relu activation function
        # foward pass of embedd text to convolution layers
        # squeeze() to drop superficial 3 dimensional from a tensor
        # conved dim ==> [batch size, n_filters, sent len * len(filter_sizes)]
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        
        # max pooling to decrease num of features (sub-sampling)
        # squeeze() to drop superficial 2 dimensional from a tensor
        # pooled dim ==> [batch size, n_filters * len(filter_sizes)]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        
        # concatenate feature outputs
        # cat dim ==> [batch size, n_filters * len(filter_sizes)]
        cat = torch.cat(pooled, dim = 1)
        
        # dropout on concatenated output filters
        # out dim ==> [batch size, n_filters * len(filter_sizes)]
        out = self.dropout(cat)
        
        # pass filter outputs to linear layer for prediction
        # pred dim ==> [batch size, n_filters * len(filter_sizes)]
        pred = self.fc(out)
        return pred

In [22]:
INPUT_DIM = 60 ## len of vocab size
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3,4,5]
OUTPUT_DIM = 1 ## since the output is between 0 and 1, thus 1 dimension
DROPOUT = 0.5
PAD_IDX = 0

model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)

In [23]:
model

CNN(
  (embedding): Embedding(60, 100, padding_idx=0)
  (convs): ModuleList(
    (0): Conv2d(1, 100, kernel_size=(3, 100), stride=(1, 1))
    (1): Conv2d(1, 100, kernel_size=(4, 100), stride=(1, 1))
    (2): Conv2d(1, 100, kernel_size=(5, 100), stride=(1, 1))
  )
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=300, out_features=1, bias=True)
)

In [43]:
def train(dataloader):
    # model training mode (gradient computation)
    model.train()
    # initailiz acc, and loss at zero 
    total_acc, total_loss = 0, 0
    for idx, (text, label) in enumerate (dataloader):
        # reset gradients to zero before each instance
        optimizer.zero_grad()
        # label predictions (forward papagation)
        # squeeze(1) => drop superficial one dimensional from a tensor
        predicted_label = model(text).squeeze(1) # or [:,0]
        # loss calculation
        loss = loss_fn(predicted_label, label)
        # compute gradients (backward propagation) 
        # to minimize loss functions with gradient descent
        loss.backward()
        # update parameters based on the computed gradients
        optimizer.step()
        
        ## logging
        if not idx % 50:
            print('| Epoch: %03d/%03d | Batch %03d/%03d | Loss: %.4f'
                  %(epoch +1, num_epochs, idx, len(dataloader), loss))
        # compute total accuracy
        # if pred label is >=0.5 to probability of true truth accuracy count increases
        # summation of the largest value which yields predicted class label
        total_acc += ((predicted_label >= 0.5).float() == label).float().sum().item()
        # compute total loss after back prop and parameter update
        total_loss += loss.item() * label.size(0)
    # compare true labels with the predicted labels to compute accuracy
    return total_acc/len(dataloader.dataset), \
            total_loss/len(dataloader.dataset)

In [44]:
def evaluate(dataloader):
    # model evaluation mode (no gradient computation)
    model.eval()
    # initailize acc, and loss at zero 
    total_acc, total_loss = 0, 0
    # disabling gradient calculation
    with torch.no_grad():
        for text, label in dataloader:
            # label predictions (forward papagation)
            # squeeze(1) => drop superficial one dimensional from a tensor
            predicted_label = model(text).squeeze(1) # reshape
            # loss calculation
            loss = loss_fn(predicted_label, label)
            # compute total accuracy
            # if pred label is >=0.5 to probability of true truth accuracy count increases
            # summation of the largest value which yields predicted class label
            total_acc += ((predicted_label >= 0.5).float() == label).float().sum().item()
            # compute total loss after back prop and parameter update
            total_loss += loss.item() * label.size(0)
            # compare true labels with the predicted labels to compute accuracy
        # compare true labels with the predicted labels to compute accuracy
        return total_acc/len(dataloader.dataset), \
                total_loss/len(dataloader.dataset)

In [45]:
# for binnary classification we use binary cross entropy loss
# which provides logits as inputs to loss function
loss_fn = nn.BCEWithLogitsLoss()
# Adam Optimizer to update parameters based on the computed gradients
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [46]:
model = model.to(device)
loss_fn = loss_fn.to(device)

In [47]:
torch.manual_seed(1)
num_epochs = 3

start_time = time.time()
for epoch in range(num_epochs):
    acc_train, loss_train = train(train_dataloader)
    acc_val , loss_val = evaluate(val_dataloader)
    print('| Epoch: %03d/%03d | Train Acc: %.2f%% | Valid. Acc: %.2f%%'
          %(epoch + 1, num_epochs, 100 * acc_train, 100 * acc_val))
    print(f'| Time elapsed: {(time.time() - start_time) / 60:.2f} min')

| Epoch: 001/003 | Batch 000/080 | Loss: 0.0006
| Epoch: 001/003 | Batch 050/080 | Loss: 0.0098
| Epoch: 001/003 | Train Acc: 99.38% | Valid. Acc: 94.50%
| Time elapsed: 0.05 min
| Epoch: 002/003 | Batch 000/080 | Loss: 0.0033
| Epoch: 002/003 | Batch 050/080 | Loss: 0.0370
| Epoch: 002/003 | Train Acc: 99.06% | Valid. Acc: 94.50%
| Time elapsed: 0.11 min
| Epoch: 003/003 | Batch 000/080 | Loss: 0.0253
| Epoch: 003/003 | Batch 050/080 | Loss: 0.0193
| Epoch: 003/003 | Train Acc: 99.50% | Valid. Acc: 94.50%
| Time elapsed: 0.18 min


In [29]:
acc_test, _ = evaluate(test_dataloader)
print('Test Acc: %.2f'
      %(100 * acc_test))

Test Acc: 95.50


In [None]:
# in binary classification We use a sigmoid function to squash the input between 0 and 1
# unlike in multiclass classification where we use sofmax 
# or use argmax to get the highest value

In [30]:
## https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/2%20-%20Upgraded%20Sentiment%20Analysis.ipynb
## https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/2_lstm.ipynb

sentiment_label = {0: "low",
                   1: "medium"}

def predict(text, tokenizer):
    # evaluation mode (no gradients)
    with torch.no_grad():
        # tokenize and index the tokens
        processed_text = torch.tensor(tokenizer(text))
        # add a batch dimension
        processed_text = processed_text.unsqueeze(0).to(device)
        # prediction
        prediction = model(processed_text)
        # reduction real numbers to values between 0 and 1
        probability = torch.sigmoid(prediction)
        # get the max value of all elements
        predicted_probability, predicted_class, = torch.max(probability, dim=1)
        # convert tensor holding a single value into an integer
        return predicted_class.item(), predicted_probability.item()

In [31]:
text = "CC(=O)N1c2ccccc2Sc2c1ccc1ccccc21"
pred_class, pred_proba = predict(text, tokenizer)

print(f'Predicted Class: {pred_class} = {sentiment_label[pred_class]}')
print(f'Probability: {pred_proba}')

Predicted Class: 0 = low
Probability: 0.0043982649222016335


In [32]:
text = "COc1ccc(C)cc1S(=O)(=O)c1c(Cl)cccc1[N+](=O)[O-]"
pred_class, pred_proba = predict(text, tokenizer)

print(f'Predicted Class: {pred_class} = {sentiment_label[pred_class]}')
print(f'Probability: {pred_proba}')

Predicted Class: 0 = low
Probability: 0.5027364492416382
