In [None]:
import pandas as pd
import io
import requests
import re
from pathlib import Path, PosixPath
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.distributions.multinomial import Multinomial

# url="https://apps.ml.jku.at/challenge/data/datasets/mol_generation/smiles_train.txt"
# txt_file=requests.get(url).content

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.activity.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fexperimentsandconfigs%20https%3a%2f%2fwww.googleapis.com%2fauth%2fphotos.native&response_type=code

Enter your authorization code:
4/1AY0e-g7i60E3QFPOc0Np349J1FsxsiaFpy422RkpHZbH3razmTx-EgSVev0


In [None]:
# with open('somefile.txt', 'a') as the_file:
#     for line in lines[:20000]:
#       the_file.write(line)  

In [None]:
# val = pd.read_csv("/content/drive/MyDrive/AILS_mol_generation/smiles_val_lu.txt")["0"].to_list()
# print(val)

Preprocess



In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class TextDS(Dataset):
    def __init__(self, text_file: PosixPath, seq_length: int, batch_size: int, vocabs= None):
        self.vocab_input = vocabs
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.input_txt = open(text_file, encoding="utf-8").readlines()[:100000]
        print(len(self.input_txt))
        self.input_txt = ' '.join(self.input_txt)
        self.txt_data = self.cleanse()

        self.torch_sequences, self.vocabs, self.idx2char, self.char2idx = self.chars(seq_length)
        print(1)
        self.torch_sequences = self.yield_sequence_split()
        self.datagen = self.get_datagenerator()
        
        
    def __len__(self) -> int:
        return len(self.data_torch)
        
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.torch_sequences[idx]
    
    def cleanse(self):
        
        txt_data = self.input_txt.replace('\n', '').replace('\r', '').replace('\ufeff', '').lower()
        
        charset_to_delete = []
        for i in charset_to_delete:
            txt_data = txt_data.replace(i , '')  
                    
        return txt_data
    
    def chars(self, seq_length):
        if self.vocab_input != None:
            vocab = self.vocab_input
        else:
            vocab = sorted(set(self.txt_data))
        
        char2idx = {u:i for i, u in enumerate(vocab)}
        idx2char = np.array(vocab)

        text_as_int = np.array([char2idx[c] for c in self.txt_data])
        
        print('{} ---- characters mapped to int ---- > {}'.format(repr(self.txt_data[:13]), text_as_int[:13]))
        print('Vocab size: ' + str(len(vocab)))
        
        length = seq_length
        sequences = list()
        for i in range(length, len(text_as_int)):
            # select sequence of tokens
            seq = text_as_int[i-length:i+1]
            # store
            sequences.append(seq)
            
                
        sequences = torch.tensor(sequences).to(device)
        
        return sequences, vocab, idx2char, char2idx
    
    def yield_sequence_split(self):
        
        l = []
        
        for seq in self.torch_sequences:
            input_example, target_example = self.split_input_target(seq)
            l.append([input_example, target_example[-1]])

        print("Datasize: ", len(l))
        
        return l     
        
    def get_datagenerator(self):
        
        datagen = torch.utils.data.DataLoader(self.torch_sequences, batch_size=self.batch_size)
        return datagen

    def split_input_target(self, chunk):
        input_text = chunk[:-1]
        target_text = chunk[1:]
        return input_text, target_text
        

In [None]:
class Model(nn.Module):
    def __init__(self, input_size: int, hidden_size, output_size, n_of_classes, seq_length, LR=0.001, momentum= 0.9):
        super(Model, self).__init__()
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_of_classes = n_of_classes
        
        self.z = nn.Linear(input_size, hidden_size)
        self.z_rec = nn.Linear(hidden_size,hidden_size) 
        self.z_tanh = nn.Tanh()
        
        self.i = nn.Linear(input_size, hidden_size)
        self.i_rec = nn.Linear(hidden_size, hidden_size)
        self.i_sigmoid = nn.Sigmoid()
                
        self.o = nn.Linear(input_size, hidden_size)
        self.o_rec = nn.Linear(hidden_size, hidden_size)
        self.o_sigmoid = nn.Sigmoid()

        self.c_tanh = nn.Tanh()
        
        self.h0 = torch.zeros((hidden_size)).type(torch.float)
        self.c0 = torch.zeros((hidden_size)).type(torch.float)
        
        self.V = nn.Linear(hidden_size, n_of_classes)
        
        self.loss = nn.CrossEntropyLoss()
    
        self.optimizer = torch.optim.Adam(self.parameters(), lr=LR, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
        #self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=8, gamma=0.1)
        self.current_loss = []

        self.apply(self._weights_init) # not for forget gate (needs init with high bias)

        # initialize forget gate with high bias
        self.f = nn.Linear(input_size, hidden_size)
        self.f.bias.data.fill_(10)
        self.f_rec = nn.Linear(hidden_size, hidden_size)
        self.f_rec.bias.data.fill_(10)
        self.f_sigmoid = nn.Sigmoid()

    @staticmethod
    def _weights_init(m):
      if isinstance(m, nn.Linear):          
        nn.init.xavier_uniform_(m.weight, gain=nn.init.calculate_gain("relu")),
        nn.init.constant_(m.bias, 0)
        
        
    def forward(self, batch) -> torch.Tensor:
         
        batch_size = len(batch)
        seq_len = batch.shape[1]
        
        self.h0 = torch.zeros((batch_size, hidden_size)).type(torch.float).to(device)
        self.c0 = torch.zeros((batch_size, hidden_size)).type(torch.float).to(device)
        

        for i in range(seq_len):

            x= batch[:,i,:]
            
            y_z = self.z_tanh(self.z(x) + self.z_rec(self.h0))
            y_i = self.i_sigmoid(self.i(x) + self.i_rec(self.h0))
            y_f = self.f_sigmoid(self.f(x) + self.f_rec(self.h0))

            c = y_f * self.c0 + y_z * y_i
            self.c0 = c.detach()

            y_c = self.c_tanh(c)
            y_o = self.o_sigmoid(self.o(x) + self.o_rec(self.h0))

            y = y_c * y_o
            self.h0 = y.detach()  # rec. y; detach grad attribute
        
        y = self.V(y)  # puts y into the right shape for cross entropy
             
        return y
    
    def print_grads(self):
        print(sum(self.z.weight.grad))
    
    def reset_loss(self):
        self.current_loss = []
        
    def backward(self, y_hat, y):
        loss_ = self.loss(y_hat, y)
        loss_.backward(retain_graph=True)
  
        self.current_loss.append(loss_.detach())
        
    def update(self):
        self.optimizer.step()
    

In [None]:
from torch import nn
class Model_(nn.Module):
    def __init__(self, input_size: int, n_hidden, output_size, n_of_classes, seq_length, LR=0.001, momentum= 0.9, drop_prob=0, n_layers=1):
        super(Model_, self).__init__()
        
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr


        #LSTM Layers
        self.lstm = nn.LSTM(n_of_classes, self.n_hidden, self.n_layers,
                           dropout=self.drop_prob, batch_first=True)

        #Dropout Layer
        self.dropout = nn.Dropout(drop_prob)

        ##Fully-connected output layer
        self.fc = nn.Linear(n_hidden, output_size)
        
        self.loss = nn.CrossEntropyLoss()
    
        self.optimizer = torch.optim.Adam(self.parameters(), lr=LR, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
        #self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=8, gamma=0.1)
        self.current_loss = []

        self.apply(self._weights_init) # not for forget gate (needs init with high bias)

    @staticmethod
    def _weights_init(m):
      if isinstance(m, nn.Linear):          
        nn.init.xavier_uniform_(m.weight, gain=nn.init.calculate_gain("relu")),
        nn.init.constant_(m.bias, 0)
        
        
    def forward(self, x) -> torch.Tensor:

        batch_size = len(x)

        self.h0 = torch.zeros((self.n_layers, batch_size, self.n_hidden)).type(torch.float).to(device)
        self.c0 = torch.zeros((self.n_layers, batch_size, self.n_hidden)).type(torch.float).to(device)

        hidden = (self.h0, self.c0)
        r_output, hidden = self.lstm(x, hidden)

        out = self.dropout(r_output)
        out = out[:,-1,:]
        out = self.fc(out)

        return out 



    
    def print_grads(self):
        print(sum(self.z.weight.grad))
    
    def reset_loss(self):
        self.current_loss = []
        
    def backward(self, y_hat, y):
        loss_ = self.loss(y_hat, y)
        loss_.backward(retain_graph=True)
  
        self.current_loss.append(loss_.detach())
        
    def update(self):
        self.optimizer.step()
    

In [None]:
#Parameters
seq_length_ = 24
batch_size_ = 128
lr = 0.001
hidden_size = 1024
epochs = 10
n_layers= 2
drop_prob = 0.2

data = TextDS("/content/drive/MyDrive/AILS_mol_generation/smiles_train.txt", seq_length_, batch_size_, None)
#data_validation = TextDS('sample_data/trump_val.txt', seq_length_, batch_size_, data.vocabs) #use vocab idx of train

data_ = torch.load("/content/drive/MyDrive/AILS_mol_generation/torch_sequences_4859892")  
      
_train_data = data_[:4459892]
_val_data = data_[4459892:]

datagen = torch.utils.data.DataLoader(_train_data, batch_size=batch_size_)
datagen_validation = torch.utils.data.DataLoader(_val_data, batch_size=batch_size_)


n_of_classes = len(data.vocabs)
sequence_length = data.seq_length

input_size = n_of_classes
hidden_size = hidden_size
output_size = n_of_classes


m = Model_(input_size, hidden_size, output_size, n_of_classes, sequence_length, lr, n_layers=2, drop_prob=drop_prob)
m.to(device) #enable cuda

loss_list = list()
valid_loss_list = []
accuracy_list = list()
correct = []


100000
'o=c1c(=cc2ccc' ---- characters mapped to int ---- > [28 17 21  8 21  3 17 21 21  9 21 21 21]
Vocab size: 32
1
Datasize:  4859892


In [None]:
for epoch in range(epochs):
    print('')
    print('Epoch ' + str(epoch))
    
    if epoch != 0:
        loss_list.append((sum(m.current_loss) / len(m.current_loss)))
        m.reset_loss()
        
        accuracy_list.append(sum(correct) / len(correct))
        correct = []
        
    # *******  Validation ************#
        valid_loss = []
        with torch.no_grad():
            for x_valid, target_valid in datagen_validation:
                x_valid = nn.functional.one_hot(x_valid.type(torch.long), n_of_classes).type(torch.float)
                y_hat_valid = m.forward(x_valid)  
                y_valid = torch.tensor(target_valid.type(torch.long))
                valid_loss.append(m.loss(y_hat_valid,y_valid))

        print('Valid. Loss:' + str(sum(valid_loss) / len(valid_loss)))
        valid_loss_list.append(valid_loss)
        valid_loss = []
        

    # ********** Training *******************#
    for step_idx, (x,target) in enumerate(datagen):
               
        m.optimizer.zero_grad()

        x = nn.functional.one_hot(x.type(torch.long), n_of_classes).type(torch.float)
        y_hat = m.forward(x)  
        y = torch.tensor(target.type(torch.long))
        
        y_for_acc = torch.argmax(torch.nn.functional.softmax(y_hat.detach()).data, axis=1)
        correct.append(torch.sum((y_for_acc == y).float()) / len(x))
                
        
        m.backward(y_hat,y)
        m.update()
        
        if step_idx % 1000 == 0 and step_idx != 0:
            tmp = m.current_loss[-1000:]
            print('Step: ' + str(step_idx)+ ', ' +'Loss: ' + str(round(float(sum(tmp) / len(tmp)),3)) + ', ' + 'Accuracy: ' + str(round(float(sum(correct[-1000:]) / 1000),3)) )  

    torch.save(m, "/content/drive/MyDrive/AILS_mol_generation/lstm_2_layer")


Epoch 0




Step: 1000, Loss: 1.353, Accuracy: 0.571
Step: 2000, Loss: 1.028, Accuracy: 0.657
Step: 3000, Loss: 0.956, Accuracy: 0.676
Step: 4000, Loss: 0.923, Accuracy: 0.686
Step: 5000, Loss: 0.899, Accuracy: 0.692
Step: 6000, Loss: 0.873, Accuracy: 0.7
Step: 7000, Loss: 0.862, Accuracy: 0.704
Step: 8000, Loss: 0.855, Accuracy: 0.707
Step: 9000, Loss: 0.847, Accuracy: 0.709
Step: 10000, Loss: 0.832, Accuracy: 0.714
Step: 11000, Loss: 0.825, Accuracy: 0.714
Step: 12000, Loss: 0.818, Accuracy: 0.718
Step: 13000, Loss: 0.814, Accuracy: 0.717
Step: 14000, Loss: 0.813, Accuracy: 0.72
Step: 15000, Loss: 0.806, Accuracy: 0.723
Step: 16000, Loss: 0.803, Accuracy: 0.723
Step: 17000, Loss: 0.798, Accuracy: 0.724
Step: 18000, Loss: 0.797, Accuracy: 0.724
Step: 19000, Loss: 0.786, Accuracy: 0.728
Step: 20000, Loss: 0.787, Accuracy: 0.728
Step: 21000, Loss: 0.785, Accuracy: 0.728
Step: 22000, Loss: 0.782, Accuracy: 0.729
Step: 23000, Loss: 0.779, Accuracy: 0.73
Step: 24000, Loss: 0.774, Accuracy: 0.732
Step:



Valid. Loss:tensor(0.7586, device='cuda:0')
Step: 1000, Loss: 0.755, Accuracy: 0.738
Step: 2000, Loss: 0.748, Accuracy: 0.74
Step: 3000, Loss: 0.75, Accuracy: 0.738
Step: 4000, Loss: 0.756, Accuracy: 0.738
Step: 5000, Loss: 0.749, Accuracy: 0.74
Step: 6000, Loss: 0.744, Accuracy: 0.741
Step: 7000, Loss: 0.746, Accuracy: 0.74
Step: 8000, Loss: 0.745, Accuracy: 0.742
Step: 9000, Loss: 0.745, Accuracy: 0.741
Step: 10000, Loss: 0.741, Accuracy: 0.743
Step: 11000, Loss: 0.738, Accuracy: 0.743
Step: 12000, Loss: 0.734, Accuracy: 0.744
Step: 13000, Loss: 0.735, Accuracy: 0.742
Step: 14000, Loss: 0.736, Accuracy: 0.743
Step: 15000, Loss: 0.733, Accuracy: 0.746
Step: 16000, Loss: 0.73, Accuracy: 0.745
Step: 17000, Loss: 0.731, Accuracy: 0.746
Step: 18000, Loss: 0.732, Accuracy: 0.745
Step: 19000, Loss: 0.724, Accuracy: 0.748
Step: 20000, Loss: 0.725, Accuracy: 0.748
Step: 21000, Loss: 0.724, Accuracy: 0.747
Step: 22000, Loss: 0.722, Accuracy: 0.747
Step: 23000, Loss: 0.721, Accuracy: 0.749
Step

In [None]:
def predict(_model, prime="C", seq_length=24, top_k=1, n_of_chars=100, confidence=0.8):

  def softmax_pred(X):
      expo = torch.exp(X)
      expo_sum = torch.sum(torch.exp(X))
      soft = expo/expo_sum
      pred = torch.argmax(soft)
      if top_k > 1 and pred < confidence:                 # < 0.7 condition for only doing randomization only in unsure cases for text quality
          vals, topk_idx = torch.topk(soft, top_k)
          idx = torch.randint(0, top_k, (1,))
          pred = topk_idx[0][idx]
      return pred

  for i in range(n_of_chars):

      prime_len = len(prime)
      if prime_len <= seq_length_: text_as_int = np.array([data.char2idx[c] for c in prime.lower()])
      else: text_as_int = np.array([data.char2idx[c] for c in prime[-seq_length:].lower()])

      text_one_hot = nn.functional.one_hot(torch.tensor(text_as_int).type(torch.long), n_of_classes).type(torch.float)
      input_tensor = text_one_hot.reshape((1, text_one_hot.shape[0], text_one_hot.shape[1])).to(device)

      if i % 2000 == 0:
        print(i)

      output = _model.forward(input_tensor).detach()
      pred = softmax_pred(output.to(device))
      char = data.idx2char[pred]
      prime += char

  return prime



In [None]:
prime = "C"

str_ = predict(m, prime=prime, top_k=3, seq_length=seq_length_, n_of_chars=10000, confidence=0.5)

0
2000
4000
6000
8000


In [1]:
mols_pred = str_.split(" ")

print(mols_pred)
print(len(mols_pred))
gen_mol = pd.DataFrame(mols_pred)

NameError: name 'str_' is not defined

In [None]:
m = torch.load("/content/drive/MyDrive/AILS_mol_generation/lstm_2_layer")

In [None]:
!pip install fcd
!pip install rdkit-pypi

Collecting fcd
[?25l  Downloading https://files.pythonhosted.org/packages/a8/1d/75bf35ec742cbe679bfe373e5a0859a1debbd1bcc15d3cfa0930620438b2/FCD-1.1-py3-none-any.whl (53.1MB)
[K     |████████████████████████████████| 53.1MB 98kB/s 
Installing collected packages: fcd
Successfully installed fcd-1.1
Collecting rdkit-pypi
[?25l  Downloading https://files.pythonhosted.org/packages/ad/66/dcedc9498f7a3424d5acaeb97531992cc758f4219c870f95e38f9b6f58f3/rdkit_pypi-2021.3.1.4-cp37-cp37m-manylinux2014_x86_64.whl (32.8MB)
[K     |████████████████████████████████| 32.8MB 146kB/s 
[?25hInstalling collected packages: rdkit-pypi
Successfully installed rdkit-pypi-2021.3.1.4


In [None]:
from fcd import get_fcd, load_ref_model,canonical_smiles, get_predictions, calculate_frechet_distance
#https://github.com/bioinf-jku/FCD/blob/master/example.ipynb

#gen_mol = pd.read_csv(gen_mol_file,header=None)[0] #IMPORTANT: take at least 10000 molecules as FCD can vary with sample size 
gen_mol = gen_mol[0].to_list()
sample1 = np.random.choice(gen_mol, 10000, replace=False)
sample2 = np.random.choice(gen_mol, 10000, replace=False)

# get canonical smiles and filter invalid ones
can_sample1 = [w for w in canonical_smiles(sample1) if w is not None]
can_sample2 = [w for w in canonical_smiles(sample2) if w is not None]


#get CHEBMLNET activations of generated molecules 
act1 = get_predictions(model, can_sample1)
act2 = get_predictions(model, can_sample2)

mu1 = np.mean(act1, axis=0)
sigma1 = np.cov(act1.T)

mu2 = np.mean(act2, axis=0)
sigma2 = np.cov(act2.T)

fcd_score = calculate_frechet_distance(
    mu1=mu1,
    mu2=mu2, 
    sigma1=sigma1,
    sigma2=sigma2)

print('FCD: ',fcd_score)

NameError: ignored

In [None]:
def get_metric(list_):    

    with open("/content/drive/MyDrive/AILS_mol_generation/smiles_train.txt") as f:
        smiles_train = {s for s in f.read().split() if s}

    smiles_gen = list_[:10000]

    smiles_can = canonical_smiles(smiles_gen)
    smiles_valid = [s for s in smiles_can if s is not None]
    smiles_unique = set(smiles_valid)
    smiles_novel = smiles_unique - smiles_train

    validity = len(smiles_valid) / len(smiles_gen)
    uniqueness = len(smiles_unique) / len(smiles_gen)
    novelty = len(smiles_novel) / len(smiles_gen)

    # if name == 'validity':
    #     return validity
    # elif name == 'uniqueness':
    #     return uniqueness
    # elif name == 'novelty':
    #     return novelty
    # elif name != 'fcd':
    #     raise ValueError('Invalid metric: %s' % name)

    # Load precomputed test mean and covariance
    with open("/content/drive/MyDrive/AILS_mol_generation/test_stats.p", 'rb') as f:
        mean_test, cov_test = pickle.load(f)

    model = loadmodel()
    mean_gen, cov_gen = getstats(smiles_valid, model)

    fcd_value = fcd.calculate_frechet_distance(
        mu1=mean_gen,
        mu2=mean_test,
        sigma1=cov_gen,
        sigma2=cov_test)
    return fcd_value

In [None]:
f = open('/content/drive/MyDrive/AILS_mol_generation/submission.txt', "r")
f = f.readlines()