# Drug Feature Extraction with Transformer Encoder

In this Jupyter notebook, a transformer encoder is used to obtain drug feature representations from the SMILES strings.

<br>

### File Requirements

The following files are required in the `data/STEP01` folder:
1. [ChEMBL_SMILES_2kk.csv](https://www.ebi.ac.uk/chembl/explore/compounds/)
2. SPE_ChEMBL_1500freq.txt - Custom tokens obtained by STEP01_SPE.ipynb

<br>

### Output
The trained model. Its file name depends on the selected training size; in this case, with 50000 SMILES strings: `models/SMILES_ENCODER_50k.pth`.

<br>

### Evaluation
Evaluation can be performed on the entire dataset, on a subset of the dataset, and on all GDSC SMILES strings.

The evaluation function calculates the accuracy of SMILES reconstruction, the average Levenshtein distance between the original and reconstructed SMILES strings, and the average length of mismatched SMILES strings.

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import torchtext; torchtext.disable_torchtext_deprecation_warning()
import math
from sklearn.model_selection import train_test_split
from torchtext.vocab import build_vocab_from_iterator
import Levenshtein

# Load the data

In [None]:
# Training data
df = pd.read_csv("data/STEP01/ChEMBL_SMILES_2kk.csv")

# Load custom tokens from SMILES Pair Encoding. The token is the first
# whitespace-separated field of each line; blank or whitespace-only lines are
# skipped so that they do not raise an IndexError.
with open("data/STEP01/SPE_ChEMBL_1500freq.txt", "r", encoding="utf-8") as f:
    custom_tokens = [line.split()[0] for line in f if line.strip()]

# Select the first 50000 SMILES strings (first column) for training and hold
# out 20% of them for validation, with a fixed seed for reproducibility
smiles_training = df.iloc[:50000, 0]
train_smiles, val_smiles = train_test_split(smiles_training, test_size=0.2, random_state=42)


# Utility functions and torchtext vocabulary creation



In [None]:
def custom_tokenizer(smiles_string, tokens=None):
    """
    Tokenizes a SMILES string by isolating every occurrence of the custom tokens.

    Each token is surrounded by spaces in turn and the result is split on
    whitespace. Tokens are applied in list order, so a later token that is a
    substring of an earlier one will split that earlier token again.

    :param smiles_string: The SMILES string to tokenize.
    :param tokens: Optional iterable of tokens to isolate. Defaults to the
        module-level ``custom_tokens`` list.
    :return: list: A list of tokens.
    """
    if tokens is None:
        tokens = custom_tokens
    for token in tokens:
        # str.replace is a no-op when the token is absent, so no membership test is needed
        smiles_string = smiles_string.replace(token, f" {token} ")
    return smiles_string.split()


def yield_tokens(data_iter):
    """
    Lazily tokenizes every SMILES string of an iterable.
    :param data_iter: An iterable of SMILES strings.
    :return: A generator producing one token list per SMILES string.
    """
    yield from (custom_tokenizer(smiles) for smiles in data_iter)


# Build the vocabulary from the tokenized training SMILES; the four special
# tokens are registered explicitly
vocab = build_vocab_from_iterator(
    yield_tokens(smiles_training),
    specials=["<pad>", "<unk>", "<sos>", "<eos>"],
)
# Tokens that are not in the vocabulary are mapped to <unk>
vocab.set_default_index(vocab["<unk>"])


def encode_smiles_to_indices(smiles):
    """
    Converts a SMILES string into vocabulary indices framed by <sos> and <eos>.
    :param smiles: The SMILES string to encode.
    :return: A list of integer indices: <sos>, one index per token, <eos>.
    """
    return [
        vocab["<sos>"],
        *(vocab[piece] for piece in custom_tokenizer(smiles)),
        vocab["<eos>"],
    ]


# Length of the longest encoded training sequence (including <sos> and <eos>);
# used as the target length when padding sequences
max_len = max(map(len, map(encode_smiles_to_indices, smiles_training)))


def pad_sequence(seq, max_len):
    """
    Right-pads a list of indices with the <pad> index up to a given length.
    :param seq: The sequence (list of indices) to pad.
    :param max_len: The target length.
    :return: A new list; sequences already at or above max_len come back unpadded.
    """
    missing = max_len - len(seq)
    # A non-positive repeat count yields an empty list, i.e. no padding
    return seq + missing * [vocab["<pad>"]]


def smiles_to_padded_tensor(smiles):
    """
    Turns a SMILES string into a padded index tensor with a leading batch axis.
    :param smiles: The SMILES string to encode.
    :return: A PyTorch tensor of shape (1, sequence length) holding the padded indices.
    """
    padded_indices = pad_sequence(encode_smiles_to_indices(smiles), max_len)
    as_tensor = torch.tensor(padded_indices)
    # Add the batch dimension in front
    return as_tensor.unsqueeze(0)

# Transformer Encoder model

In [None]:
class TransformerModel(nn.Module):
    """
    Transformer encoder that maps token indices to per-position vocabulary logits.

    The model embeds the indices, scales them by sqrt(embed_size), passes them
    through a stack of transformer encoder layers and projects every position
    back onto the vocabulary. No positional encoding is added.

    :param vocab_size: Number of entries in the vocabulary.
    :param embed_size: Embedding / model dimension.
    :param num_heads: Number of attention heads per encoder layer.
    :param num_layers: Number of stacked encoder layers.
    :param dropout: Dropout probability used inside the encoder layers.
    :param batch_first: If True (default), inputs are (batch, seq_len). The
        notebook feeds (1, seq_len) tensors, so with the previous implicit
        ``batch_first=False`` the sequence axis was treated as the batch axis
        and attention never mixed information between tokens. Pass False to
        reproduce that legacy behaviour.
    """

    def __init__(self, vocab_size, embed_size, num_heads, num_layers, dropout=0.1, batch_first=True):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # NOTE: despite its name this is an encoder layer, not a positional
        # encoder; the attribute name is kept so state_dict keys stay unchanged.
        self.pos_encoder = nn.TransformerEncoderLayer(
            embed_size,
            num_heads,
            dim_feedforward=512,
            dropout=dropout,
            batch_first=batch_first,
        )
        self.transformer_encoder = nn.TransformerEncoder(self.pos_encoder, num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.embed_size = embed_size

    def forward(self, src, src_mask=None):
        """
        Computes vocabulary logits for every position of the input.
        :param src: Integer tensor of token indices, (batch, seq_len) when batch_first is True.
        :param src_mask: Optional attention mask forwarded to the encoder.
        :return: Tensor of logits with a trailing dimension of size vocab_size.
        """
        src = self.embedding(src) * math.sqrt(self.embed_size)
        src = self.transformer_encoder(src, src_mask)
        output = self.fc_out(src)
        return output

# Training

In [None]:
def _run_epoch(model, smiles_list, criterion, device, optimizer=None):
    """
    Runs one pass over the SMILES strings and returns the mean loss per string.

    :param model: The model to run; it must already be on ``device`` and in the desired mode.
    :param smiles_list: Sized iterable of SMILES strings, processed one at a time.
    :param criterion: Loss function applied to the flattened logits and targets.
    :param device: Device the input tensors are moved to.
    :param optimizer: If given, a gradient step is taken after every string
        (training); if None, only the loss is computed (validation).
    :return: The mean loss, or NaN when ``smiles_list`` is empty.
    """
    num_samples = len(smiles_list)
    if num_samples == 0:
        # Avoid a ZeroDivisionError for an empty split
        return float("nan")

    total_loss = 0.0
    for smiles in smiles_list:
        input_seq = smiles_to_padded_tensor(smiles).to(device)
        # The model is trained to reconstruct its own input
        target_seq = input_seq.clone()
        if optimizer is not None:
            optimizer.zero_grad()
        output = model(input_seq)
        # Flatten to (positions, vocab size); the width is read from the logits themselves
        loss = criterion(output.view(-1, output.size(-1)), target_seq.view(-1))
        if optimizer is not None:
            loss.backward()
            optimizer.step()
        total_loss += loss.item()
    return total_loss / num_samples


def train_model(model, train_smiles_list, val_smiles_list, num_epochs=2, lr=0.001):
    """
    Trains the model to reconstruct SMILES token sequences and prints the losses.

    Uses Adam and a cross-entropy loss that ignores <pad> positions. Strings
    are processed one at a time (batch size 1). After every epoch the mean
    training and validation loss are printed.

    :param model: The model to train; it is moved to CUDA when available, otherwise CPU.
    :param train_smiles_list: SMILES strings used for training.
    :param val_smiles_list: SMILES strings used for validation.
    :param num_epochs: Number of passes over the training data.
    :param lr: Learning rate for Adam.
    :return: None
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])

    for epoch in range(num_epochs):
        # Training pass
        model.train()
        train_loss = _run_epoch(model, train_smiles_list, criterion, device, optimizer)

        # Validation pass (no gradients, no parameter updates)
        model.eval()
        with torch.no_grad():
            val_loss = _run_epoch(model, val_smiles_list, criterion, device)

        # Print training and validation loss
        print(f"Epoch {epoch+1}, Training Loss: {train_loss}, Validation Loss: {val_loss}")
    
    
# Instantiate the encoder (512-dim embeddings, 8 heads, 3 layers) and train it
model = TransformerModel(
    vocab_size=len(vocab),
    embed_size=512,
    num_heads=8,
    num_layers=3,
    dropout=0.1,
)
train_model(model, train_smiles, val_smiles)

In [None]:
import os

# Create the output folder first so that saving cannot fail after training
os.makedirs("models", exist_ok=True)
torch.save(model.state_dict(), "models/SMILES_ENCODER_50k.pth")

# Evaluation

In [None]:
# Utility function to decode the model output
def decode_sequence(input_seq):
    """
    Runs the global model on one encoded sequence and decodes the prediction.

    The most likely token is taken at every position. Decoding stops at the
    first <eos>; <pad>, <sos> and <eos> markers are removed from the result.

    :param input_seq: Tensor of token indices with a leading batch axis of
        size 1; only the first sequence in the batch is decoded.
    :return: The reconstructed SMILES string.
    """
    model.eval()
    # Use the device the model lives on instead of assuming CUDA is available
    device = next(model.parameters()).device
    input_seq = input_seq.to(device)
    with torch.no_grad():
        output = model(input_seq)
    # Index the batch axis explicitly; squeeze() would also collapse a
    # length-1 sequence into a scalar
    output_tokens = output.argmax(dim=-1)[0]
    decoded_tokens = []
    for token_idx in output_tokens.tolist():
        token = vocab.lookup_token(token_idx)
        if token == "<eos>":
            break
        decoded_tokens.append(token)
    decoded_smiles = "".join(decoded_tokens).replace("<pad>", "").replace("<sos>", "").replace("<eos>", "")
    return decoded_smiles


# Evaluation function
def evaluate_reconstruction(smiles_list, print_mismatches):
    """
    Measures how well the model reconstructs a collection of SMILES strings.

    :param smiles_list: Sized iterable of SMILES strings to evaluate.
    :param print_mismatches: If True, prints every original/reconstructed pair that differs.
    :return: A tuple (accuracy, average_levenshtein_distance, average_mismatched_smiles_length):
        accuracy is the percentage of exact reconstructions (NaN for empty
        input); the two averages are taken over the mismatched strings only
        and are 0 when there are none.
    """
    total_count = len(smiles_list)
    if total_count == 0:
        # Nothing to evaluate; avoid a ZeroDivisionError
        return float("nan"), 0, 0

    correct_count = 0
    total_levenshtein_distance = 0
    total_mismatched_smiles_length = 0
    mismatched_count = 0

    for smiles in smiles_list:
        input_seq = smiles_to_padded_tensor(smiles)
        decoded_smiles = decode_sequence(input_seq)

        if smiles == decoded_smiles:
            correct_count += 1
            continue

        mismatched_count += 1
        # Compare the strings as-is: SMILES are case-sensitive (the exact-match
        # test above is too), and upper-casing only the original inflated the
        # distance for every lowercase atom
        total_levenshtein_distance += Levenshtein.distance(smiles, decoded_smiles)
        total_mismatched_smiles_length += len(smiles)
        if print_mismatches:
            print(f"Original SMILES: {smiles}")
            print(f"Reconstructed SMILES: {decoded_smiles}")

    accuracy = correct_count / total_count * 100

    if mismatched_count > 0:
        average_levenshtein_distance = total_levenshtein_distance / mismatched_count
        average_mismatched_smiles_length = total_mismatched_smiles_length / mismatched_count
    else:
        average_levenshtein_distance = 0
        average_mismatched_smiles_length = 0

    return accuracy, average_levenshtein_distance, average_mismatched_smiles_length

In [None]:
# Evaluate on rows 400000-499999 of the ChEMBL data (first column).
# For evaluation on the entire dataset use: df.iloc[:, 0]
# With my computational resources, this step takes about 7 hours.
eval_data = df.iloc[400000:500000, 0]
accuracy, leven, length = evaluate_reconstruction(eval_data, print_mismatches=False)
print(
    f"Accuracy of SMILES reconstruction: {accuracy:.2f}%",
    f"Average Levenshtein distance: {leven:.2f}",
    f"Average mismatched SMILES length: {length:.2f}",
    sep="\n",
)

In [None]:
# Evaluate on the GDSC dataset: one entry per unique SMILES string
gdsc_smiles = (
    pd.read_csv("data/STEP00/ALL_SMILES.csv")
    .drop_duplicates(subset=["smiles"])["smiles"]
)

# Mismatching reconstructions are printed for inspection
accuracy, leven, length = evaluate_reconstruction(gdsc_smiles, print_mismatches=True)
print(
    f"Accuracy of SMILES reconstruction: {accuracy:.2f}%",
    f"Average Levenshtein distance: {leven:.2f}",
    f"Average mismatched SMILES length: {length:.2f}",
    sep="\n",
)