In [1]:
# Declaring the model
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from rdkit import Chem
import tqdm

class CharRNN(nn.Module):
    """Character-level LSTM language model.

    The network consumes one-hot encoded characters and emits, for every
    time step, unnormalised scores (logits) over the same vocabulary.

    Arguments
    ---------
    tokens: indexable collection of vocabulary characters; the position of a
            character is its integer code
    n_hidden: number of hidden units per LSTM layer
    n_layers: number of stacked LSTM layers
    drop_prob: dropout probability used between LSTM layers and before the
               output layer
    lr: learning rate, stored on the instance for reference only
    """

    def __init__(self, tokens, n_hidden=10, n_layers=2,
                 drop_prob=0.2, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # Character dictionaries: integer code -> char and char -> integer code
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # Recurrent core; batch_first=True means inputs are (batch, seq, features)
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        # Dropout applied to the LSTM outputs
        self.dropout = nn.Dropout(drop_prob)

        # Final fully-connected layer projecting back onto the vocabulary
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        """Run one forward pass.

        Arguments
        ---------
        x: one-hot input of shape (batch, seq, len(self.chars))
        hidden: tuple (h, c) of LSTM states, as returned by `init_hidden`

        Returns
        -------
        (out, hidden) where `out` has shape (batch * seq, len(self.chars))
        and `hidden` is the updated LSTM state tuple.
        """
        # Outputs for every time step plus the new hidden/cell state
        r_output, hidden = self.lstm(x, hidden)

        out = self.dropout(r_output)

        # Flatten (batch, seq, n_hidden) -> (batch * seq, n_hidden) so the
        # linear layer scores every time step independently
        out = out.contiguous().view(-1, self.n_hidden)

        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed (hidden, cell) states of shape
        (n_layers, batch_size, n_hidden).

        The states are created with the dtype and device of the model's own
        parameters, so they always match the module regardless of whether
        CUDA happens to be available on the machine.
        """
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)
        return (torch.zeros(shape, dtype=weight.dtype, device=weight.device),
                torch.zeros(shape, dtype=weight.dtype, device=weight.device))

In [2]:
# Read the whole SMILES corpus file into memory as a single string `text`
with open('../data/antidiabetic/ANTIDIABETIC_smiles_variants.txt','r') as f:
    text = f.read()

In [3]:
# Preview the first 100 characters of the loaded corpus as a quick sanity check
text[:100]

'c1(c2c(c(ccc2)C(=S)N(CC(=O)O)C)ccc1OC)C(F)(F)F\nC(c1c2c(c(C(F)(F)F)c(cc2)OC)ccc1)(N(CC(=O)O)C)=S\nFC(c'

In [4]:
# Defining method to encode one hot labels
def one_hot_encode(arr, n_labels):
    '''One-hot encode an array of integer labels.

       Arguments
       ---------
       arr: integer array (or array-like) of any shape; every value must lie
            in [0, n_labels)
       n_labels: size of the one-hot dimension

       Returns
       -------
       float32 array of shape (*arr.shape, n_labels) holding a single 1.0 per
       input element at the index given by that element.
    '''
    arr = np.asarray(arr)

    # One row per input element; arr.size works for any number of dimensions
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Set the column selected by each label to one
    one_hot[np.arange(arr.size), arr.ravel()] = 1.

    # Restore the original layout with the label axis appended
    return one_hot.reshape((*arr.shape, n_labels))

# Defining method to make mini-batches for training
def get_batches(arr, batch_size, seq_length):
    '''Yield (features, targets) mini-batches cut from a 1-D encoded array.

       Each yielded array has shape batch_size x seq_length. Targets are the
       features shifted one position to the left.

       Arguments
       ---------
       arr: 1-D array of encoded characters
       batch_size: number of sequences per batch
       seq_length: number of encoded chars in each sequence
    '''
    chars_per_batch = batch_size * seq_length
    full_batches = len(arr) // chars_per_batch

    # Drop the tail that cannot fill a complete batch, then lay the data out
    # as batch_size parallel rows
    rows = arr[:full_batches * chars_per_batch].reshape((batch_size, -1))
    width = rows.shape[1]

    # Slide a window of seq_length columns across the rows
    for start in range(0, width, seq_length):
        stop = start + seq_length
        x = rows[:, start:stop]

        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        # The last target is the character right after the window; for the
        # final window there is none, so wrap around to the row's first column
        y[:, -1] = rows[:, stop] if stop < width else rows[:, 0]

        yield x, y
    
# Declaring the train method
def train(net, data, epochs=1, batch_size=32, seq_length=100, lr=0.001, clip=5, val_frac=0.1, print_every=10000):
    ''' Training a network

        Arguments
        ---------

        net: CharRNN network
        data: encoded text data (1-D array) to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss

        Side effects
        ------------
        Moves `net` to the GPU when CUDA is available, leaves it in training
        mode, and writes the whole module with `torch.save` to the current
        working directory whenever a new best validation loss is reached
        after the warm-up period.
    '''
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # The last `val_frac` of the data is held out for validation
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    train_on_gpu = torch.cuda.is_available()
    if train_on_gpu:
        net.cuda()

    # A model is only saved once validation loss drops below this value ...
    v_l = 1.6
    # ... and only after this many optimisation steps have been taken
    min_save_step = 7000

    counter = 0
    n_chars = len(net.chars)
    for e in tqdm.tqdm(range(epochs)):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the hidden state, otherwise we'd backprop through the
            # entire training history
            h = tuple(each.detach() for each in h)

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()
            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                # No gradients are needed for evaluation; skipping the autograd
                # graph saves memory and time
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        # One-hot encode our data and make them Torch tensors
                        val_x = one_hot_encode(val_x, n_chars)
                        val_inputs, val_targets = torch.from_numpy(val_x), torch.from_numpy(val_y)

                        if train_on_gpu:
                            val_inputs, val_targets = val_inputs.cuda(), val_targets.cuda()

                        val_output, val_h = net(val_inputs, val_h)
                        val_loss = criterion(val_output, val_targets.view(batch_size*seq_length).long())

                        val_losses.append(val_loss.item())

                net.train() # reset to train mode after iterating through validation data
                Loss = loss.item()
                # With no validation batches report NaN explicitly; NaN never
                # beats the best loss, so nothing is saved in that case
                Val_Loss = np.mean(val_losses) if val_losses else float('nan')
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(Loss),
                      "Val Loss: {:.4f}".format(Val_Loss))

                if v_l > Val_Loss and counter >= min_save_step:
                    v_l = Val_Loss
                    # Save the whole module (not just a state dict): the
                    # generation cell reloads it with torch.load and uses the
                    # returned object directly as the network
                    torch.save(net, '{}_model_{}_{}.pt'.format(counter, Loss, Val_Loss))

# PREDICT

In [5]:
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.

        Arguments
        ---------
        net: trained CharRNN network
        char: current character; must be a key of `net.char2int`
        h: (hidden, cell) state tuple; a fresh zero state for batch size 1
           is created when omitted
        top_k: if given, sample only among the `top_k` most likely
               characters; otherwise sample over the whole vocabulary

        The next character is drawn with `np.random.choice`, so results
        depend on NumPy's global random state.
    '''
    # Run on whatever device the network lives on
    device = next(net.parameters()).device

    # tensor inputs: a single one-hot character with shape (1, 1, n_chars)
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x).to(device)

    if h is None:
        h = net.init_hidden(1)
    # detach hidden state from history and make sure it sits next to the model
    h = tuple(each.detach().to(device) for each in h)

    # inference only: no autograd graph needed
    with torch.no_grad():
        out, h = net(inputs, h)
        # character probabilities, moved to the CPU for NumPy
        p = F.softmax(out, dim=1).cpu()

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) keeps a 1-D array even when top_k == 1
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness
    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p / p.sum())

    # return the predicted char and the hidden state
    return net.int2char[int(char)], h

def sample(net, size, prime='B', top_k=None):
    ''' Generate text from a trained network.

        Arguments
        ---------
        net: trained CharRNN network
        size: number of prediction steps to run after the priming pass
        prime: non-empty seed string; each of its characters must be in the
               network's vocabulary
        top_k: forwarded to `predict` to restrict sampling to the `top_k`
               most likely characters

        Returns
        -------
        The seed followed by the generated characters, i.e. a string of
        len(prime) + size + 1 characters.

        Side effects
        ------------
        Moves `net` to the GPU when CUDA is available (to the CPU otherwise)
        and leaves it in eval mode.
    '''
    if not prime:
        # Without a seed there is no character to start predicting from
        raise ValueError("prime must contain at least one character")

    # Check if GPU is available
    if torch.cuda.is_available():
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    generated = list(prime)
    h = net.init_hidden(1)

    with torch.no_grad():
        # Warm up the hidden state on the seed; only the prediction that
        # follows the last seed character is kept
        for ch in prime:
            char, h = predict(net, ch, h, top_k=top_k)
        generated.append(char)

        # Now pass in the previous character and get a new one
        for _ in range(size):
            char, h = predict(net, generated[-1], h, top_k=top_k)
            generated.append(char)

    return ''.join(generated)

In [6]:
# NOTE(security): weights_only=False unpickles arbitrary Python objects, which
# can execute code on load. Only load model files from a trusted source.
# map_location='cpu' lets a model that was saved on a GPU load on a CPU-only
# machine; sample() moves the network to the GPU again when one is available.
net = torch.load('../models/RNN_models/model12.0.pt', map_location='cpu', weights_only=False)
# Generating new text
t = sample(net, 1000, prime='C', top_k=5)

In [7]:
# Split the generated text into candidate SMILES strings, one per line.
# Guarded so that re-running the cell after `t` is already a list is a no-op
# instead of an AttributeError.
if isinstance(t, str):
    t = t.split('\n')

In [8]:
def validate_smiles(smiles: str) -> str | None:
    """
    Parse `smiles` with RDKit and return its canonical SMILES form.

    Returns None when `Chem.MolFromSmiles` yields a falsy result, i.e. the
    input could not be turned into a molecule.
    """
    parsed = Chem.MolFromSmiles(smiles)
    if not parsed:
        return None
    return Chem.MolToSmiles(parsed, canonical=True)

In [9]:
# Canonicalise every generated line exactly once and keep only those RDKit
# could parse (validate_smiles returns None for the rest)
valid_smiles = [canonical for canonical in map(validate_smiles, t) if canonical is not None]

[07:13:37] SMILES Parse Error: extra open parentheses while parsing: C[C@@H]1N(CCCC[C@@H]1CC1
[07:13:37] SMILES Parse Error: check for mistakes around position 10:
[07:13:37] C[C@@H]1N(CCCC[C@@H]1CC1
[07:13:37] ~~~~~~~~~^
[07:13:37] SMILES Parse Error: Failed parsing SMILES 'C[C@@H]1N(CCCC[C@@H]1CC1' for input: 'C[C@@H]1N(CCCC[C@@H]1CC1'
[07:13:37] SMILES Parse Error: syntax error while parsing: c1ccc(C[C@H](C(N2CCNC([C@H]3COC(C)(C)C)=O)c2c(cc2)cccc2)C)CCCNC1==C)C
[07:13:37] SMILES Parse Error: check for mistakes around position 66:
[07:13:37] (cc2)cccc2)C)CCCNC1==C)C
[07:13:37] ~~~~~~~~~~~~~~~~~~~~^
[07:13:37] SMILES Parse Error: Failed parsing SMILES 'c1ccc(C[C@H](C(N2CCNC([C@H]3COC(C)(C)C)=O)c2c(cc2)cccc2)C)CCCNC1==C)C' for input: 'c1ccc(C[C@H](C(N2CCNC([C@H]3COC(C)(C)C)=O)c2c(cc2)cccc2)C)CCCNC1==C)C'
[07:13:37] SMILES Parse Error: unclosed ring for input: 'N1(c2n(nc(C)c2)-c2ccccc3)CCN(C2)C(=O)[C@@H]1CN[C@H](C(=O)N2CCSC2)C1'
[07:13:37] SMILES Parse Error: extra close parentheses whi

In [10]:
# Display the canonical SMILES that passed validation
valid_smiles

['CC(C)(C)OC[C@@H]1C(=O)NCCN1C1C[C@H](N)CC(=O)C(F)=C(F)C=C1F']

In [11]:
# Serialize the list of validated SMILES to disk with pickle.
# NOTE(review): this import would conventionally live with the other imports
# in the first cell.
import pickle
with open('../data/antidiabetic/GEN_valid_smiles1.pkl', 'wb') as f:
    pickle.dump(obj= valid_smiles, file=f)