## **Tokenizer**

In [None]:
import numpy as np

# CHARSET is received from the preprocess.py module
CHARSET = [' ', '#', '(', ')', '+', '-', '/', '1', '2', '3', '4', '5', '6', '7',
           '8', '=', '@', 'B', 'C', 'F', 'H', 'I', 'N', 'O', 'P', 'S', '[', '\\',
           ']', 'c', 'l', 'n', 'o', 'r', 's']


class OneHotTokenizer():
    '''
    Convert SMILES strings to fixed-length one-hot matrices and back.

    Every SMILES is padded with spaces (or truncated) to ``fixed_length``
    characters, and every character becomes a one-hot row whose length is
    ``len(charset)``.
    '''

    def __init__(self, charset=CHARSET, fixed_length=120):
        '''
        :param charset: sequence of characters; the position of a character
            in it is the character's one-hot index
        :param fixed_length: number of characters every SMILES is padded or
            truncated to
        '''
        self.charset = charset
        self.fixed_length = fixed_length

    @staticmethod
    def get_one_hot_vector(idx, N):
        '''
        :param idx: index in a vector (None yields an all-zero vector)
        :param N: length of vector
        :return: one hot vector as a list of ints
            Eg. get_one_hot_vector(idx=2, N=6) -> [0, 0, 1, 0, 0, 0]
        '''
        return [int(idx == i) for i in range(N)]

    @staticmethod
    def get_one_hot_index(chars, char):
        '''
        :param chars: a list of characters or a string
        :param char: a character
        :return: index of char in chars, or None when it cannot be found
            Eg 1: get_one_hot_index(chars=CHARSET,   # CHARSET is a list above
                                    char='#')   -> 1
            Eg 2. get_one_hot_index(chars='Testing',  char='s') -> 2
        '''
        try:
            return chars.index(char)
        except (ValueError, TypeError, AttributeError):
            # ValueError: char is not in chars.
            # TypeError / AttributeError: chars cannot be searched with
            # .index(char). Such lookups also resolve to None so that
            # callers keep receiving an all-zero one-hot row.
            return None

    def pad_smiles(self, smiles):
        '''
        :param smiles: Molecular SMILES
        :return: smiles with exactly fixed_length characters: right-padded
            with spaces when shorter, truncated when longer
            Eg 1. with fixed_length=20: pad_smiles('banana') -> 'banana              '
            Eg 2. with fixed_length=2:  pad_smiles('banana') -> 'ba'
            Eg 3. with fixed_length=120: pad_smiles(smiles='COc(c1)cccc1C#N') ->
                  'COc(c1)cccc1C#N' followed by 105 spaces  # Total = 120 characters
        '''
        return smiles[: self.fixed_length].ljust(self.fixed_length)

    def encode_one_hot(self, smiles):
        '''
        :param smiles: a molecular SMILES
        :return: numpy array with one one-hot row per character of the
            padded SMILES
        Eg 1. smiles_encode = encode_one_hot(smiles='COc(c1)cccc1C#N')
              with the default charset and fixed_length=120:
              row 0 has its 1 at index 18 ('C'), row 1 at index 23 ('O'),
              row 2 at index 29 ('c'), row 3 at index 2 ('('), ...
              and every padding row has its 1 at index 0 (' ').
              smiles_encode.shape = (120, 35)
              120: length of the padded SMILES
              35: length of the charset
        A character that is not in the charset is encoded as an all-zero row.
        '''
        # Use the charset of this instance (not the module-level CHARSET) so
        # that encoding and decoding always agree on the alphabet.
        charset = self.charset
        width = len(charset)
        rows = [
            self.get_one_hot_vector(idx=self.get_one_hot_index(chars=charset, char=char), N=width)
            for char in self.pad_smiles(smiles)
        ]
        return np.array(rows)

    def tokenize(self, list_smiles):
        '''
        :param list_smiles: iterable of SMILES strings
        :return: numpy array stacking encode_one_hot() of every SMILES;
            for a non-empty list the shape is
            (number of smiles, fixed_length, len(charset))
        '''
        return np.array([self.encode_one_hot(smiles) for smiles in list_smiles])

    def decode_one_hot(self, list_encoded_smiles):
        '''
        :param list_encoded_smiles: list of encoded smiles getting from tokenize()
            Eg. an array of shape (3, 120, 35): 3 smiles, 120 rows (one per
            character) and 35 columns (one per charset entry).
        :return: list with one single-element list per smiles, holding the
            decoded string with surrounding whitespace stripped
            Eg. [['COc(c1)cccc1C#N'], ...]
        Each row is decoded to the charset entry at the position of its
        largest value (argmax).
        '''
        list_smiles_get_back = []
        for encoded_smiles in list_encoded_smiles:  # run for each smiles
            smiles_string = ''.join(self.charset[np.argmax(row)] for row in encoded_smiles)
            list_smiles_get_back.append([smiles_string.strip()])
        return list_smiles_get_back


def decode_smiles_from_indexes(vec, charset=CHARSET, decode=True):
    '''
    Build a string by looking up every index of vec in charset.

    :param vec: iterable of integer indexes into charset
    :param charset: indexable collection of characters (list, string or
        numpy array). Its entries may be str or bytes.
        Eg 1. a list of str such as CHARSET
        Eg 2. an array of bytes such as
              [b' ' b'#' b')' b'(' b'+' b'-' b'/' b'1' b'3' b'2' b'5' ...]
    :param decode: when True, every selected entry that is a bytes object
        is decoded as UTF-8 before joining; str entries are used as they are.
        When False the entries are joined unchanged, so they must be str.
    :return: the joined string with surrounding whitespace stripped
        Eg 1.  decode_smiles_from_indexes(vec=np.array([0, 3, 1, 2, 4, 5]),
                                          charset='abcdef')
           Since 'abcdef' has indexes 012345,
           Then vec = [0, 3, 1, 2, 4, 5] will generate a string 'adbcef'
    '''
    selected = (charset[idx] for idx in vec)
    if decode:
        # Decode per entry instead of wrapping an all-or-nothing conversion
        # in a bare `except: pass`: str charsets pass through untouched,
        # bytes (including numpy bytes_) are decoded, and real errors are
        # no longer silently swallowed.
        selected = (
            entry.decode('utf-8') if isinstance(entry, bytes) else entry
            for entry in selected
        )
    return ''.join(selected).strip()


def test():
    '''
    Smoke test: run every tokenizer helper on a sample SMILES and print
    the results for manual inspection.
    '''
    sample = 'COc(c1)cccc1C#N'
    tokenizer = OneHotTokenizer(charset=CHARSET, fixed_length=120)

    # Low-level helpers
    one_hot_vector = tokenizer.get_one_hot_vector(idx=2, N=6)
    print(f'one_hot_vector = {one_hot_vector}')

    one_hot_index = tokenizer.get_one_hot_index(chars='Testing', char='s')
    print(f'one_hot_index = {one_hot_index}')

    # Padding to the fixed length
    smiles = tokenizer.pad_smiles(smiles=sample)
    print(f'smiles = {smiles} with length = {len(smiles)}')

    # Encoding a single SMILES; print the full matrix without truncation
    smiles_encoded = tokenizer.encode_one_hot(smiles=sample)
    np.set_printoptions(threshold=np.inf)
    print(f'smiles_encoded = {smiles_encoded}')
    print(f'smiles_encoded.shape = {smiles_encoded.shape}')

    # Encoding a list of SMILES and decoding it back
    list_encoded_smiles = tokenizer.tokenize(list_smiles=[sample])
    print(f'\ntokenizer for a list of Smiles = {list_encoded_smiles}')
    print(f'list_encoded_smiles.shape = {list_encoded_smiles.shape}')

    list_smiles_get_back = tokenizer.decode_one_hot(list_encoded_smiles)
    print(f'list_smiles_get_back = {list_smiles_get_back}')

    # Index-based decoding with a toy charset
    print('\ndecode smiles from indexes - testing with fake smiles')
    fake_indexes = np.array([0, 3, 1, 2, 4, 5])
    fake_smiles = decode_smiles_from_indexes(vec=fake_indexes, charset='abcdef')
    print(f'fake_smiles = {fake_smiles}')

## **Model**

In [None]:
import torch
import torch.utils.data
from torch import nn
import torch.nn.functional as F


class VAE(nn.Module):
    '''
    Variational autoencoder with a 1-D convolutional encoder and a GRU decoder.

    Input data:
        Shape = (batch, seq_len, charset_size), i.e. (batch, 120, 35) with
        the default arguments.

    The sequence axis is fed to Conv1d as the channel axis, so the
    convolutions slide along the charset axis.
    '''
    def __init__(self, seq_len=120, charset_size=35, latent_dim=292):
        '''
        :param seq_len: number of rows (characters) of every input sample
        :param charset_size: number of columns (one-hot width) of every sample
        :param latent_dim: size of the latent vector z
        :raises ValueError: if charset_size is too small for the three
            un-padded convolutions (it must be at least 27)
        '''
        super().__init__()

        # The three un-padded convolutions (kernel sizes 9, 9, 11) shrink
        # the last axis by 8 + 8 + 10 = 26 positions.
        conv_out_len = charset_size - 26
        if conv_out_len < 1:
            raise ValueError(
                f'charset_size must be at least 27, got {charset_size}')

        self.seq_len = seq_len
        self.charset_size = charset_size
        self.latent_dim = latent_dim

        # NOTE: layer names and creation order are kept as they were so that
        # saved state_dicts still load and seeded initialisation is unchanged.
        self.conv_1 = nn.Conv1d(in_channels=seq_len, out_channels=9, kernel_size=9, stride=1)
        self.conv_2 = nn.Conv1d(in_channels=9, out_channels=9, kernel_size=9, stride=1)
        self.conv_3 = nn.Conv1d(in_channels=9, out_channels=10, kernel_size=11, stride=1)

        # 10 output channels * conv_out_len positions (= 90 with the defaults)
        self.fc_0 = nn.Linear(in_features=10 * conv_out_len, out_features=435)
        self.fc_1 = nn.Linear(in_features=435, out_features=latent_dim)  # z_mean
        self.fc_2 = nn.Linear(in_features=435, out_features=latent_dim)  # z_logvar
        self.fc_3 = nn.Linear(in_features=latent_dim, out_features=latent_dim)

        self.gru = nn.GRU(input_size=latent_dim, hidden_size=501, num_layers=3, batch_first=True)
        self.fc_4 = nn.Linear(in_features=501, out_features=charset_size)

        self.relu = nn.ReLU()
        # Explicit dim: calling Softmax without one is deprecated.
        self.softmax = nn.Softmax(dim=-1)

    def encode(self, x):
        '''
        :param x: tensor of shape (batch, seq_len, charset_size)
        :return: (z_mean, z_logvar), each of shape (batch, latent_dim)

        Shapes with the default sizes and batch = 64:
            input                  (64, 120, 35)
            after conv_1 + relu    (64, 9, 27)
            after conv_2 + relu    (64, 9, 19)
            after conv_3 + relu    (64, 10, 9)
            after flatten          (64, 90)
            after fc_0 + selu      (64, 435)
            z_mean, z_logvar       (64, 292) each
        '''
        # Convolutional layers
        x = self.relu(self.conv_1(x))
        x = self.relu(self.conv_2(x))
        x = self.relu(self.conv_3(x))

        # Flatten the 2 last dimensions but keep the batch dimension
        x = x.flatten(start_dim=1)

        # Fully connected layer
        x = F.selu(self.fc_0(x))

        # z_mean and z_logvar (log-variance)
        return self.fc_1(x), self.fc_2(x)

    def sampling(self, z_mean, z_logvar):
        '''
        Reparameterization trick to sample the latent variable z.

        :param z_mean: tensor returned by encode(), shape (batch, latent_dim)
        :param z_logvar: tensor returned by encode(), shape (batch, latent_dim)
        :return: z (latent variable), same shape as z_mean
            z = z_mean + std * epsilon
            with std = exp(0.5 * z_logvar) and epsilon = 1e-2 * N(0, 1) noise

        Note. torch.randn_like(input) only uses its input to get the shape
              (and dtype/device) of the noise tensor.
        '''
        std = torch.exp(0.5 * z_logvar)
        epsilon = 1e-2 * torch.randn_like(input=std)  # multiply 1e-2 to make epsilon smaller
        return z_mean + std * epsilon

    def decode(self, z):
        '''
        :param z: latent tensor of shape (batch, latent_dim)
        :return: tensor of shape (batch, seq_len, charset_size) whose last
            axis is a softmax distribution (every row sums to 1)

        Shapes with the default sizes and batch = 64:
            after fc_3 + selu      (64, 292)
            repeated per position  (64, 120, 292)
            GRU output             (64, 120, 501)
            after fc_4 + softmax   (64, 120, 35)
        '''
        z = F.selu(self.fc_3(z))
        # Feed the same latent vector to the GRU at every sequence position
        z = z.unsqueeze(1).repeat(1, self.seq_len, 1)
        output, _ = self.gru(z)
        # Linear acts on the last axis, so no reshape to 2-D is needed
        return F.softmax(self.fc_4(output), dim=-1)

    def forward(self, x):
        '''
        :param x: tensor of shape (batch, seq_len, charset_size)
        :return: (y, z_mean, z_logvar) where y is the reconstruction with
            the same shape as x
        '''
        z_mean, z_logvar = self.encode(x)
        z = self.sampling(z_mean, z_logvar)
        y = self.decode(z)
        return y, z_mean, z_logvar


def test_class_VAE():
    '''
    Smoke test: push one random batch through a fresh VAE and print the
    shapes of the reconstruction and of both latent tensors.
    '''
    n_samples = 64
    inputs = torch.rand(n_samples, 120, 35)
    model = VAE()
    y, z_mean, z_logvar = model.forward(x=inputs)
    report = (
        ('output: y', y),
        ('latent space: z_mean', z_mean),
        ('latent space: z_logvar', z_logvar),
    )
    for label, tensor in report:
        print(f'{label}.shape = {tensor.shape}')
        

# Entry point of this cell: run the forward-pass smoke test of the VAE,
# which prints the shapes of the output and of both latent tensors.
if __name__ == '__main__':
    print('Run a test for forward VAE')
    test_class_VAE()

Run a test for forward VAE
output: y.shape = torch.Size([64, 120, 35])
latent space: z_mean.shape = torch.Size([64, 292])
latent space: z_logvar.shape = torch.Size([64, 292])


## **Google drive mount**

In [3]:
# Google Colab only: mount the user's Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# path: "metadata.csv" and "embeddings.csv" files are stored here.
# The commented-out training cells further down also use this folder for
# 'smiles_tokenized.npz' and for the checkpoint files.
path = '/content/drive/My Drive/Colab Notebooks'

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.activity.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fexperimentsandconfigs%20https%3a%2f%2fwww.googleapis.com%2fauth%2fphotos.native&response_type=code

Enter your authorization code:
4/1AY0e-g4bvnEx6NuEpNPB3qjfAXSQnRxEeNNRX5-weW7a51z96f-MIFE925E
Mounted at /content/drive


In [4]:
# Echo the Drive folder path as the cell output
path

'/content/drive/My Drive/Colab Notebooks'

## **Train model**

In [5]:
import numpy as np
import torch
from torch import optim
import torch.nn.functional as F
import os
from datetime import datetime

# from models import VAE
# from tokenizer import OneHotTokenizer


def vae_loss(x_reconstructed, x, z_mean, z_logvar):
    '''
    Loss of the VAE: summed reconstruction error plus KL divergence.

    :param x_reconstructed: output of the decoder, values in [0, 1]
    :param x: target tensor with the same shape as x_reconstructed
    :param z_mean: latent mean returned by the encoder
    :param z_logvar: latent log-variance returned by the encoder
    :return: scalar tensor = binary cross entropy (summed over all elements)
             - 0.5 * sum(1 + z_logvar - z_mean^2 - exp(z_logvar))
    '''
    reconstruction_term = F.binary_cross_entropy(x_reconstructed, x, reduction='sum')
    kl_elements = 1 + z_logvar - z_mean.pow(2) - z_logvar.exp()
    kl_term = -0.5 * torch.sum(kl_elements)
    return reconstruction_term + kl_term


def train(file_path_train_data=r'data\smiles_tokenized.npz',
          path_checkpoint=None,
          checkpoint_save='every',
          file_path_checkpoint_for_continue_learning=None,
          batch_size=1000,
          epochs=100):
    r'''
    Train the VAE on one-hot encoded SMILES and optionally save checkpoints.

    :param file_path_train_data: file path of data after preprocessing; an
        .npz archive whose array 'arr' holds the encoded SMILES
        Eg. r'data\smiles_tokenized_10000.npz' or r'data\smiles_tokenized.npz'
    :param path_checkpoint:
        if given, directory where model checkpoints are saved (created when
        missing).
        Eg. checkpoint = {
                'epoch': epoch,
                'model': model.state_dict(),
                'optimizer': optimizer.state_dict()
            }
    :param checkpoint_save: only used when path_checkpoint is given
        checkpoint_save = 'every': save every epoch
                        = 'last': save only final epoch
                        = a positive int n: save when epoch % n == 0
    :param file_path_checkpoint_for_continue_learning:
        if given, read the model and optimizer parameters as starting point
        for training (instead of training from the initial state); training
        then runs for `epochs` more epochs after the stored one
    :param batch_size: number of samples per batch
    :param epochs: number of epochs to run
    :return: average loss per sample of the last epoch as a Python float,
        or None when no epoch was run
    :raises ValueError: if checkpoint_save is not 'every', 'last' or a
        positive int
    '''
    # Validate with an exception: `assert` is stripped under `python -O`,
    # and an interval of 0 would only fail later with ZeroDivisionError.
    is_interval = (isinstance(checkpoint_save, int)
                   and not isinstance(checkpoint_save, bool)
                   and checkpoint_save > 0)
    if not (is_interval or checkpoint_save in ('every', 'last')):
        raise ValueError(
            f"checkpoint_save must be 'every', 'last' or a positive int, got {checkpoint_save!r}")

    # Get data; the context manager closes the .npz archive after reading
    with np.load(file_path_train_data) as archive:
        train_array = archive['arr'].astype(np.float32)  # Eg. shape = (249456, 120, 35)
    train_data = torch.utils.data.TensorDataset(torch.from_numpy(train_array))
    train_data_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    num_samples = len(train_data_loader.dataset)

    # Model
    torch.manual_seed(42)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'device = {device}')
    model = VAE()
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    start_epoch, stop_epoch = 0, epochs
    if file_path_checkpoint_for_continue_learning:
        checkpoint = torch.load(file_path_checkpoint_for_continue_learning, map_location=device)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        start_epoch, stop_epoch = checkpoint['epoch'] + 1, checkpoint['epoch'] + epochs + 1

    tokenizer = OneHotTokenizer()  # only used to print example SMILES
    train_loss = None  # stays None when the epoch range is empty

    for epoch in range(start_epoch, stop_epoch):
        model.train()
        epoch_loss = 0.0
        for batch_idx, data in enumerate(train_data_loader):
            data = data[0].to(device)  # Note: data is a list of one 'element'
            optimizer.zero_grad()  # reset - zero out gradient

            # Forward process: compute output (prediction) and loss
            output, z_mean, z_logvar = model(data)
            loss = vae_loss(output, data, z_mean, z_logvar)

            # Backward process: compute gradients
            loss.backward()

            # Update parameters
            optimizer.step()

            # Accumulate a plain float: adding the loss tensor itself would
            # keep autograd history alive across the whole epoch.
            loss_value = loss.item()
            epoch_loss += loss_value

            # Display some info
            if batch_idx % 100 == 0:
                print(f'\nepoch/batch_idx: {epoch}/{batch_idx}\t loss = {loss_value: .4f}')
                # Input data
                first_input = data[0].cpu().numpy()
                print(f'\tFor testing: The first input smiles of batch={batch_size} Smiles')
                print('\t', tokenizer.decode_one_hot(list_encoded_smiles=[first_input]))
                # Output data
                first_output = output[0].detach().cpu().numpy()
                print(f'\tFor testing: The first output smiles of {len(output)} generated Smiles')
                print('\t', tokenizer.decode_one_hot(list_encoded_smiles=[first_output]))
        # End of for batch_idx,...
        train_loss = epoch_loss / num_samples
        print(f'Average train loss of this epoch = {train_loss}')

        if path_checkpoint:
            if checkpoint_save == 'every':
                should_save = True
            elif checkpoint_save == 'last':
                should_save = epoch == stop_epoch - 1
            else:
                should_save = epoch % checkpoint_save == 0

            if should_save:
                checkpoint = {
                    'epoch': epoch,
                    'model': model.state_dict(),
                    'optimizer': optimizer.state_dict()
                }
                os.makedirs(path_checkpoint, exist_ok=True)
                torch.save(obj=checkpoint, f=os.path.join(path_checkpoint, f'checkpoint_{epoch}.pt'))
        # End of if path_checkpoint:
    # End of for epoch
    return train_loss

## **Training for the first 20 epochs**

In [6]:
# if __name__ == '__main__':
#     train_loss = train(file_path_train_data=os.path.join(path, 'smiles_tokenized.npz'),
#                        path_checkpoint=path,
#                        checkpoint_save='last',
#                        file_path_checkpoint_for_continue_learning=None,  # r'checkpoint_3.pt',
#                        batch_size=1000,
#                        epochs=20)

#     print(f'train_loss = ', train_loss)

## **Continue learning: Training for the next 20 epochs**
Runs epochs 20 to 39 (training for the first 20 epochs, 0 to 19, has been done above).

In [7]:
# if __name__ == '__main__':
#     time_start = datetime.now()
#     print(f'time_start = {time_start}')
#     train_loss = train(file_path_train_data=os.path.join(path, 'smiles_tokenized.npz'),
#                        path_checkpoint=path,
#                        checkpoint_save='last',
#                        file_path_checkpoint_for_continue_learning=os.path.join(path, 'checkpoint_19.pt'),
#                        batch_size=1000,
#                        epochs=20)
#     print(f'train_loss = ', train_loss)
#     time_end = datetime.now()
#     print(f'time_end = {time_end}')
#     print(f'Total running time = {time_end - time_start}')

## **Continue learning: Training for the next 60 epochs**
Just change params of the above code to run