<a href="https://colab.research.google.com/github/sachaRfd/Chemistry-Related-notebooks/blob/main/Genarating_Chemical_Compounds_using_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Project Brief: 
- I want to create and train RNNs to be able to generate theoretically pheasible chemical compounds. For this I will be trying to implement LSTMs as they have been shown to produce SMILES (String-form representations of chemicals) quite well. 



Let's do some classic Imports: 

In [1]:
%load_ext tensorboard
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sys, os
sys.version


!pip install rdkit -q

from rdkit import Chem
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import PandasTools
from rdkit.Chem import Descriptors

!pip -q install pytorch-lightning

import torch
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning import LightningDataModule, LightningModule
# Set Device to GPU is available - otherwise set to CPU: 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Your Current Device is {device}')  # Check the Colab Device we are using



from google.colab import drive
drive.mount('/content/drive')

[K     |████████████████████████████████| 29.3 MB 1.3 MB/s 
[K     |████████████████████████████████| 800 kB 8.2 MB/s 
[K     |████████████████████████████████| 512 kB 57.1 MB/s 
[K     |████████████████████████████████| 125 kB 54.2 MB/s 
[?25hYour Current Device is cpu
Mounted at /content/drive


Now we need to think about how we are going to be getting our data and preprocessing it: 

- Zinc is a great place to find chemical compound SMILES: 

http://files.docking.org/2D/

In [11]:
shard = "EEBD"
os.system(f"mkdir -pv {shard[0:2]} && wget -c http://files.docking.org/2D/{shard[0:2]}/{shard}.smi -O {shard[0:2]}/{shard}.smi")
DF = pd.read_csv('EE/EEBD.smi', sep = " ")  # In the File, the Smiles and Zinc_ids are seperated by a space
DF

Unnamed: 0,smiles,zinc_id
0,O=C(COc1ccccc1[N+](=O)[O-])OCc1ccc2c(c1)OCO2,6119166
1,CN(CCc1ccccn1)C(=O)CCNc1ccccc1[N+](=O)[O-],55206436
2,Cc1ocnc1C(=O)OCC1CCN(c2ccc([N+](=O)[O-])cn2)CC1,105766284
3,CSCC[C@@H]1NC(=S)N([C@@H](C)c2cccc([N+](=O)[O-...,55179745
4,Cc1ccc([N+](=O)[O-])cc1NC(=O)CN1CCC(NC(C)C)CC1,108371623
...,...,...
426480,Cc1ncn(C)c1C(=O)N1CCC(Nc2ccc([N+](=O)[O-])cc2)CC1,1057380417
426481,Cc1ncncc1C(=O)NC1CCN(c2ccc([N+](=O)[O-])cc2)CC1,1057839606
426482,O=C(N[C@H]1CC[C@H](Nc2ccccc2[N+](=O)[O-])CC1)c...,1063352532
426483,O=C(NCC1CC2(C1)OCCO2)c1cccc(Cl)c1[N+](=O)[O-],832201123


We can see that the dataset is quite large, let's check for ducplicates: 

In [15]:
print('There are',DF.duplicated().sum(), 'Duplicates in the Dataset!')

There are 0 Duplicates in the Dataset!


Now we have to think about Tokenising the SMILES in a way the algorithm can understand the molecules: Classic Encoding and possibly spatial Encoding: 

For now I will be using a Character based Tokenising algorithm adapted from the following blog: 

https://www.cheminformania.com/non-conditional-de-novo-molecular-generation-with-transformer-encoders/

In [42]:
# A simple tokenizer
class SimpleTokenizer():
  def __init__(self):
    self.start = "^"  # Starting Character 
    self.end = "$"  #  Ending Character
    self.unknown = "?"
    self.pad = " "  # Padding to all the SMILES are same length
  def create_vocabulary(self, smiles):
    charset = set("".join(list(smiles)))
    #Important that pad gets value 0
    self.tokenlist = [self.pad, self.unknown, self.start, self.end] + list(charset)
  @property
  def tokenlist(self):
    return self._tokenlist
  @tokenlist.setter
  def tokenlist(self, tokenlist):
    self._tokenlist = tokenlist
    #create the dictionaries      
    self.char_to_int = {c:i for i,c in enumerate(self._tokenlist)}
    self.int_to_char = {i:c for c,i in self.char_to_int.items()}
  def vectorize(self, smiles):
      return [self.char_to_int[self.start]] + [self.char_to_int.get(char, self.char_to_int[self.unknown]) for char in smiles] + [self.char_to_int[self.end]]
  def devectorize(self, tensor):
      return [self.int_to_char[i] for i in tensor]
  @property
  def n_tokens(self):
    return len(self.int_to_char)

In [19]:
Tokeniser = SimpleTokenizer()

Now lets create a dataset class that is able to turn smiles into Tensors that can be read through an RNN such as LSTMs:

The Class needs to include at least a len function and a getitem function.

Simple Dataset Class: 

In [45]:
# shard = "EEBD"
# os.system(f"mkdir -pv {shard[0:2]} && wget -c http://files.docking.org/2D/{shard[0:2]}/{shard}.smi -O {shard[0:2]}/{shard}.smi")
# DF = pd.read_csv('EE/EEBD.smi', sep = " ")  # In the File, the Smiles and Zinc_ids are seperated by a space
# DF

class Molecular_Dataset(Dataset):
  def __init__(self):
    self.shard = "EEBD"
    # Next line Imports the data to local drive:
    os.system(f"mkdir -pv {shard[0:2]} && wget -c http://files.docking.org/2D/{shard[0:2]}/{shard}.smi -O {shard[0:2]}/{shard}.smi")
    self.DF = pd.read_csv('EE/EEBD.smi', sep = " ")
    self.data = self.DF
  def __len__(self):
    return len(self.data)
  def __getitem__(self, idx):
    smiles = self.data.smiles.iloc[idx]
    return smiles

Mol_DS = Molecular_Dataset()
Mol_DS[100]

'C[C@@H](NC(=O)c1c[nH]c2ccncc2c1=O)c1cccc([N+](=O)[O-])c1'

Below is the dataset I am trying to make myself: 

- Will be working on it more later: 

In [44]:
# shard = "EEBD"
# os.system(f"mkdir -pv {shard[0:2]} && wget -c http://files.docking.org/2D/{shard[0:2]}/{shard}.smi -O {shard[0:2]}/{shard}.smi")
# DF = pd.read_csv('EE/EEBD.smi', sep = " ")  # In the File, the Smiles and Zinc_ids are seperated by a space
# DF

class Molecular_Dataset(Dataset):
  def __init__(self, tokenizer):
    self.shard = "EEBD"
    # Next line Imports the data to local drive:
    os.system(f"mkdir -pv {shard[0:2]} && wget -c http://files.docking.org/2D/{shard[0:2]}/{shard}.smi -O {shard[0:2]}/{shard}.smi")
    self.DF = pd.read_csv('EE/EEBD.smi', sep = " ")
    self.data = self.DF
    self.tokenizer = tokenizer
    
  def __len__(self):
    return len(self.data)
  def __getitem__(self, idx):
    smiles = self.data.smiles.iloc[idx]
    # Create Vocabulary for Tokens: 
    self.tokenizer.create_vocabulary(self.data.smiles.values)
    smiles = self.data.smiles.iloc[idx]
    self.tokenizer.tokenlist
    tensor = self.tokenizer.vectorize(smiles)
    return tensor

token = SimpleTokenizer()
Mol_DS = Molecular_Dataset(token)
Mol_DS[0]

AttributeError: ignored

Now To add the Tokeniser to the Dataset: 

In [61]:
# A simple tokenizer
class SimpleTokenizer():
  def __init__(self):
    self.start = "^"
    self.end = "$"
    self.unknown = "?"
    self.pad = " "
  def create_vocabulary(self, smiles):
    charset = set("".join(list(smiles)))
    #Important that pad gets value 0
    self.tokenlist = [self.pad, self.unknown, self.start, self.end] + list(charset)
  @property
  def tokenlist(self):
    return self._tokenlist
  @tokenlist.setter
  def tokenlist(self, tokenlist):
    self._tokenlist = tokenlist
    #create the dictionaries      
    self.char_to_int = {c:i for i,c in enumerate(self._tokenlist)}
    self.int_to_char = {i:c for c,i in self.char_to_int.items()}
  def vectorize(self, smiles):
      return [self.char_to_int[self.start]] + [self.char_to_int.get(char, self.char_to_int[self.unknown]) for char in smiles] + [self.char_to_int[self.end]]
  def devectorize(self, tensor):
      return [self.int_to_char[i] for i in tensor]
  @property
  def n_tokens(self):
    return len(self.int_to_char)

In [62]:
# Build a dataset, that returns tensors
class MolDataset(Dataset):
  def __init__(self, data, tokenizer):
    self.data = data
    self.tokenizer = tokenizer
  def __len__(self):
    return len(self.data)
  def __getitem__(self, idx):
    smiles = self.data.smiles.iloc[idx]
    tensor = self.tokenizer.vectorize(smiles)
    return tensor

In [63]:
class ZincShardDataModule(LightningDataModule):
    def __init__(self):
        super().__init__()
        self.tokenizer = SimpleTokenizer()
        self.val_size = 200  # Validation size
        self.train_size = 200000
        self.batch_size = 128
        self.shard = "EEBD" #Shard must be > self.val_size + self.train_size
    def prepare_data(self):
        os.system(f"mkdir -pv {self.shard[0:2]} && wget -c http://files.docking.org/2D/{self.shard[0:2]}/{self.shard}.smi -O {self.shard[0:2]}/{self.shard}.smi")
    def randomize_smiles_atom_order(self, smiles):
        mol = Chem.MolFromSmiles(smiles)
        atom_idxs = list(range(mol.GetNumAtoms()))
        np.random.shuffle(atom_idxs)
        mol = Chem.RenumberAtoms(mol,atom_idxs)
        return Chem.MolToSmiles(mol, canonical=False)
    def custom_collate_and_pad(self, batch):
        #Batch is a list of vectorized smiles
        tensors = [torch.tensor(l) for l in batch]
        #pad and transpose, pytorch RNNs  (and now transformers) expect (sequence,batch, features) batch) dimensions
        tensors = torch.nn.utils.rnn.pad_sequence(tensors)
        return tensors
    def setup(self):
        #Load data
        self.data = pd.read_csv(f"{self.shard[0:2]}/{self.shard}.smi", sep = " ", nrows = self.val_size + self.train_size)
        #Atom order randomize SMILES
        self.data["smiles"] = self.data["smiles"].apply(self.randomize_smiles_atom_order)  # Apply atom randomisation as this was shown to imporve the generation space of the models
         
        #Initialize Tokenizer
        self.tokenizer.create_vocabulary(self.data.smiles.values)
        #Create splits for train/val
        np.random.seed(seed=42)
        idxs = np.array(range(len(self.data)))
        np.random.shuffle(idxs)
        val_idxs, train_idxs = idxs[:self.val_size], idxs[self.val_size:self.val_size+self.train_size]
        self.train_data = self.data.iloc[train_idxs]
        self.val_data = self.data.iloc[val_idxs]
    def train_dataloader(self):
        dataset = MolDataset(self.train_data, self.tokenizer)
        return DataLoader(dataset, batch_size=self.batch_size,collate_fn=self.custom_collate_and_pad)
    def val_dataloader(self):
        dataset = MolDataset(self.val_data, self.tokenizer)
        return DataLoader(dataset, batch_size=self.batch_size,collate_fn=self.custom_collate_and_pad, shuffle=False)

Now let's Instanciate the Dataset: 

In [65]:
molecular_stuff = ZincShardDataModule()
molecular_stuff.prepare_data()
molecular_stuff.setup()

molecular_stuff.data.head()

Unnamed: 0,smiles,zinc_id
0,c1c2c(ccc1COC(COc1ccccc1[N+]([O-])=O)=O)OCO2,6119166
1,c1ccc([N+]([O-])=O)c(NCCC(=O)N(CCc2ncccc2)C)c1,55206436
2,O=C(OCC1CCN(c2ccc([N+]([O-])=O)cn2)CC1)c1c(C)ocn1,105766284
3,c1cc([C@H](C)N2C(=O)[C@H](CCSC)NC2=S)cc([N+](=...,55179745
4,[N+](c1cc(NC(=O)CN2CCC(NC(C)C)CC2)c(C)cc1)(=O)...,108371623


Let's Look at how the Dataloaders work: 

In [66]:
TL = molecular_stuff.val_dataloader()
test_batch = next(iter(TL))
test_batch

tensor([[ 2,  2,  2,  ...,  2,  2,  2],
        [ 8, 35, 23,  ..., 23, 35, 34],
        [13, 13, 34,  ..., 13, 29, 23],
        ...,
        [ 0,  0,  0,  ...,  0,  0,  0],
        [ 0,  0,  0,  ...,  0,  0,  0],
        [ 0,  0,  0,  ...,  0,  0,  0]])

Great so we have the Tokenised Dataset now. Let's now implement a simple RNN model: 
# Not LSTM Yet: 

To implement our RNN, we will start by creating an `nn.Module` that will represent a single RNN cell. This cell will update its hidden state by:

- Applying a linear, fully connected layer to the cell input.
- Applying a linear, fully connected layer to the previous hidden state.
- Applying a non-linear activation to the result.

In [68]:
import torch.nn as nn

class RNNCell(nn.Module):

    def __init__(self, input_size, hidden_size, bias=True, activation="tanh"):
        super(RNNCell, self).__init__()
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias

        # select appropriate activation function
        if activation == 'tanh':
          self.act = nn.Tanh()   
        elif activation == 'sigmoid':
          self.act = nn.Sigmoid()
        elif activation == 'ReLU':
          self.act == nn.ReLU()

        # create linear layer from input to hidden state
        self.i2h = nn.Linear(input_size, hidden_size, bias=self.bias)
        # create linear layer from previous to current hidden state
        self.h2h = nn.Linear(hidden_size, hidden_size, bias=self.bias)

        # initialise the parameters  --> we need to initialise the parameters in this case
        # Use the reset parameters function:

        self.reset_parameters()


    def reset_parameters(self):
        std = 1.0 / np.sqrt(self.hidden_size)

        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, input, h):
        # map from input to hidden state space
        input = self.i2h(input)

        # map from previous to current hidden state space
        h = self.h2h(h)

        # calculate new hidden state by applying activation
        h = self.act(input + h)



        return h

Once we have implemented a single RNN cell, we can write another `nn.Module` for our RNN network: it will contain one or more cells, concatenated forming multiple layers, and will apply the RNN cells to a given input sequence.

The final output of the RNN will be obtained by applying a final fully connected layer to the last hidden state. 

# Should Set bias to True Later

In [72]:
class RNN(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, output_size, bias=False, activation='tanh'):
        super(RNN, self).__init__()
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.bias = bias

        # create a list of modules --> concatenate many cells together
        self.rnn_cell_list = nn.ModuleList()



        # create each layer in the network
        # take care when defining the input size of the first vs later layers because after the first one, the size of the inputs will be of the hidden layers
        for l in range(self.num_layers):
          self.rnn_cell_list.append(RNNCell(self.input_size if l == 0 else self.hidden_size,
                                            self.hidden_size, self.bias, activation))

        

        # create a final linear layer from hidden state to network output
        self.h2o = nn.Linear(self.hidden_size, self.output_size)


    
    def init_hidden(self,  batch_size=1):
        # initialise the hidden state
        return torch.zeros(self.num_layers, batch_size, self.hidden_size, requires_grad=False).to(device)

    def forward(self, input, h0):
        # Input of shape (batch_size, seqence length, input_size)
        # Output of shape (batch_size, output_size)

        batch_size = input.size(0)
        step_size = input.size(1)

        # Initialise lists --> to be able to keep history of then
        outs = []
        hidden = []

        for layer in range(self.num_layers):  # initialise hidden
          hidden.append(h0[layer, :, :])

        # iterate over all elements in the sequence
        # first layer should be input size then the rest should be size of hidden
        for t in range(step_size):
            # iterate over each layer 
            for layer in range(self.num_layers):
                # apply each layer
                # take care to apply the layer to the input or the
                # previous hidden state depending on the layer number
                if layer == 0:
                  hidden_1 = self.rnn_cell_list[layer](input[:, t, :], hidden[layer])
                else: 
                  hidden_1 = self.rnn_cell_list[layer](hidden[layer-1], hidden)
                # store the hidden state of each layer

            hidden[layer] = hidden_1

            # the hidden state of the last layer needs to be recorded
            # to be used in the output
            outs.append(hidden_1)

        # calculate output for each element in the sequence
        out = torch.stack([self.h2o(out) for out in outs], dim=1)

        return out

In [93]:
class SmilesRNN(LightningModule):

    def __init__(self, n_tokens, input_size, hidden_size, num_layers, num_unique_words):
        super(SmilesRNN, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_unique_words = num_unique_words
        
        # add a nn.Embedding
        self.embedding = nn.Embedding(self.hparams.n_tokens, self.hparams.d_model) # give number of words and its size
        
        # add our RNN
        self.rnn = RNN(self.input_size, self.hidden_size, self.num_layers, self.num_unique_words, bias=False, activation='tanh')


    def forward(self, x):
      batch_size = x.size(0)

        # initialise hidden state
      
      hidden = self.rnn.init_hidden(batch_size)
        
        # store the word embeddings
      embedded = self.embedding(x)

        # apply the RNN
      output = self.rnn(embedded, hidden)

      return output

def count_trainable_parameters(model):
    return sum([p.numel() for p in model.parameters() if p.requires_grad])

For the Training you could implement that the first Input in is the Start Token

The train and evaluate functions:

In [None]:
def train_rnn_gen(model, optimizer, criterion, dataloader):
    model.train()    # set model to train mode
    train_loss = 0   # initialise the loss
    
    # loop over dataset
    for i, (x, y) in enumerate(dataloader):
        # send data to device
        x, y = x.to(device), y.to(device)
        # reset the gradients
        optimizer.zero_grad()
        # get output
        y_pred = model(x)
        
        # compute the loss (change shape as crossentropy takes input as batch_size, number of classes, d1, d2, ...)
        loss = criterion(y_pred.permute(0, 2, 1), y) # Flip dimentions so they make sense with y
        train_loss += loss

        # backpropagate
        loss.backward()

        # update weights
        optimizer.step()

    return train_loss/len(dataloader)


def predict_rnn_gen(dataset, model, text, next_words=100):
  # need to transform text so that its understandable by network
    model.eval()  # set model to evaluation mode

    words = text.split(' ')

    # loop over words
    for i in range(next_words):
        # take word from dataset and send to device
        x = torch.tensor([[dataset.word_to_idx[word]] for word in words[i:]]).to(device)

        # compute output and hidden state
        y_pred = model(x)
       
        # take last output
        last_word_logits = y_pred[0][-1]

        # obtain probability vector for last output # Not the exact thing --> we want to sample from it
        p = torch.nn.functional.softmax(last_word_logits, dim=0).detach().cpu().numpy()

        # sample probability vector to get index in dataset 
        word_idx = np.random.choice(len(last_word_logits), p = p)

        # get word corresponding to dataset
        words.append(dataset.idx_to_word[word_idx])

    return ' '.join(words)


Now for the Hyperparamters and Training Loop: 

In [None]:
device = device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 

input_size = 128
n_hidden = 128                         
n_layers = 1
embedding_dim = input_size
n_unique_words = len(words_dataset.unique_words)
batch_size = 256
sequence_length = 4

lr = 5e-3
momentum = 0.5
n_epochs = 20

rnn_gen = RNN_GEN(input_size, embedding_dim, n_hidden, n_layers, n_unique_words).to(device)
print(f'The model has {count_trainable_parameters(rnn_gen):,} trainable parameters')

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn_gen.parameters(), lr=lr)

words_dataset = WordsTensorDataset(data, sequence_length=sequence_length)
words_dataloader = DataLoader(words_dataset, batch_size=batch_size, shuffle=False)   

# Keep track of losses for plotting
liveloss = PlotLosses()
for epoch in range(n_epochs):
    logs = {}
    train_loss = train_rnn_gen(rnn_gen, optimizer, criterion, words_dataloader)

    print(epoch,train_loss)

    logs['' + 'log loss'] = train_loss.item()
    liveloss.update(logs)
    liveloss.draw()