Install prereqs:

In [13]:
!pip install pandas numpy torch torchvision SmilesPE

# Install Tokenizer
!pip install SmilesPE

^C
[31mERROR: Operation cancelled by user[0m[31m
[0m

Check if GPU is available

In [14]:
import torch

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using device: {device}')

# memory allocation fix
# import os

# os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:50'

Using device: cuda


Load in datasets for training and testing. Using ZINC Drugs Clean, 1 million randomly sampled, for training. Using Massbank MS Positive for training.

In [15]:
import pandas as pd
import gc

# Define the function to divide DataFrame into chunks
def divide_into_chunks(df, chunk_size):
    chunks = [df[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
    return chunks

# Load the full training data
raw_training = pd.read_csv('zinc_training.csv', skiprows=[724671])

# Randomly sample entries from the raw_training DataFrame
train_sample = raw_training.sample(n=100, random_state=42)

# Clear the memory occupied by raw_training
del raw_training
gc.collect()

# Create chunks from train_chunk
train_chunks = divide_into_chunks(train_sample, chunk_size=100)

raw_testing = pd.read_csv('massbank_testing.csv', skiprows=[410])

test_sample = raw_testing.sample(n=100, random_state=42)

# Clear the memory occupied by raw_training
del raw_testing
gc.collect()

test_chunks = divide_into_chunks(test_sample, chunk_size=100)

print("Done! 🙌")

Done! 🙌


Data Processing

Preprocessing the SMILES data, using SMILES Pair Encoding. Link to encoding library and vocabulary list: https://github.com/XinhaoLi74/SmilesPE

Preprocessing the mass spectra data using binning procedure, categorizing each peak into fixed bins.

In [16]:
import codecs
from SmilesPE.tokenizer import SPE_Tokenizer
import numpy as np
import pandas as pd
import gc

# Path to your SPE vocabulary file
spe_vob_path = 'SPE_ChEMBL.txt'

# Open the SPE vocabulary file and initialize the tokenizer
with codecs.open(spe_vob_path, encoding='utf-8') as spe_vob:
    spe = SPE_Tokenizer(spe_vob)

# Function for encoding SMILES data
def preprocess_smiles(smiles_data, tokenizer):
    return smiles_data.apply(tokenizer.tokenize)
    
# Function for encoding spectra data
def encode_mass_spectra(spectra, max_mz=500, bin_precision=0.01, max_peaks=100):
    if isinstance(spectra, str):
        spectra = np.array([float(x) for x in spectra.strip('[]').split(', ') if float(x) <= max_mz])
    elif isinstance(spectra, list):
        spectra = np.array([x for x in spectra if x <= max_mz])
    else:
        raise ValueError("Spectra data must be either a string or a list of floats")

    if len(spectra) > max_peaks:
        spectra = spectra[:max_peaks]

    bin_indices = np.floor(spectra / bin_precision).astype(int)
    encoded = np.zeros(int(max_mz / bin_precision))
    encoded[bin_indices] = 1

    return encoded


# Function for processing each chunk, with garbage collection
def process_chunks(chunks, smiles_col, spectra_col, tokenizer):
    processed_chunks = []
    for chunk in chunks:
        # Create a copy of the chunk to avoid SettingWithCopyWarning
        working_chunk = chunk.copy()

        # Process the data
        working_chunk['tokenized_smiles'] = preprocess_smiles(working_chunk[smiles_col], tokenizer)
        working_chunk['encoded_spectra'] = working_chunk[spectra_col].apply(encode_mass_spectra)

        # Append the processed chunk to the list
        processed_chunks.append(working_chunk)

        # Free up memory
        del working_chunk
        gc.collect()
        # torch.cuda.empty_cache()  # Optionally clear unused GPU memory after processing

    return processed_chunks

# Process training and testing data
processed_train_chunks = process_chunks(train_chunks, 'smiles', 'METFRAG_MZ', spe)
processed_test_chunks = process_chunks(test_chunks, 'smiles', 'spectrum', spe)

# Inspect tokenized SMILES and encoded spectra from the first row of the first training chunk
first_train_row = processed_train_chunks[0].iloc[0]
print("Tokenized SMILES (Training):", first_train_row['tokenized_smiles'])
print("Encoded Spectra (Training):", first_train_row['encoded_spectra'])

# Similar inspection for the testing data
first_test_row = processed_test_chunks[0].iloc[0]
print("\nTokenized SMILES (Testing):", first_test_row['tokenized_smiles'])
print("Encoded Spectra (Testing):", first_test_row['encoded_spectra'])


print("Done! 🙌")


Tokenized SMILES (Training): Cc1n nc2n1 CCC[C@H]2 NC(=O) c3cc( cn 3C) C#N
Encoded Spectra (Training): [0. 0. 0. ... 0. 0. 0.]

Tokenized SMILES (Testing): c( c2)ccc( c2) NC(=O) C(= C(C) 1) S(=O)(=O) CCO1
Encoded Spectra (Testing): [0. 0. 0. ... 0. 0. 0.]
Done! 🙌


In [17]:
# Function for encoding spectra data, vectorized with numpy
def encode_mass_spectra_vectorized(spectra, max_mz=500, bin_precision=0.01, max_peaks=100):
    # Handle both string and list input
    if isinstance(spectra, str):
        spectra = np.array([float(x) for x in spectra.strip('[]').split(', ') if float(x) <= max_mz])
    elif isinstance(spectra, list):
        spectra = np.array(spectra)
    else:
        raise ValueError("Spectra data must be either a string or a list of floats")

    # Ensure spectra are within the specified m/z range
    spectra = spectra[spectra <= max_mz]

    # Limit the number of peaks
    if len(spectra) > max_peaks:
        # Sort spectra and select the highest intensity peaks (assuming higher intensity means more important)
        spectra = np.sort(spectra)[-max_peaks:]

    # Binning procedure
    bin_indices = np.floor(spectra / bin_precision).astype(int)
    bin_indices = bin_indices[bin_indices < int(max_mz / bin_precision)] # Ensure indices are within range

    # Create a one-hot encoded vector for each peak
    encoded = np.zeros(int(max_mz / bin_precision))
    np.put(encoded, bin_indices, 1)

    return encoded

# Function for encoding spectra data, using GPU
def encode_mass_spectra_gpu(spectra, max_mz=500, bin_precision=0.05, max_peaks=50, device='cuda'):
    if isinstance(spectra, str):
        spectra = [float(x) for x in spectra.strip('[]').split(', ') if float(x) <= max_mz]
    elif not isinstance(spectra, list):
        raise ValueError("Spectra data must be either a string or a list of floats")

    # Convert to tensor and use half-precision floats
    spectra = torch.tensor(spectra, device=device, dtype=torch.float16)

    spectra = spectra[spectra <= max_mz]

    if spectra.size(0) > max_peaks:
        spectra, _ = torch.sort(spectra, descending=True)
        spectra = spectra[:max_peaks]

    bin_indices = torch.floor(spectra / bin_precision).type(torch.int64)  # int64 for indices
    bin_indices = bin_indices[bin_indices < int(max_mz / bin_precision)]

    # Use a half-precision float tensor for the encoded array
    encoded = torch.zeros(int(max_mz / bin_precision), device=device, dtype=torch.float16)
    encoded.scatter_(0, bin_indices, 1)

    return encoded

Concatenate Chunks

In [18]:
# Concatenate processed training data chunks
processed_train_df = pd.concat(processed_train_chunks, ignore_index=True)

# Concatenate processed testing data chunks
processed_test_df = pd.concat(processed_test_chunks, ignore_index=True)

print(processed_train_df)

print("Done! 🙌")

                                               smiles  \
0               Cc1nnc2n1CCC[C@H]2NC(=O)c3cc(cn3C)C#N   
1   Cc1cc(sc1C(=O)N2CCC[C@@H](C2)NS(=O)(=O)C)NC(=O...   
2   Cc1c(nc(s1)NC(=O)C[C@H]2c3ccccc3C(=O)O2)c4ccc(...   
3   COc1ccc2c(c1)CCCN2C(=O)c3ccc(cc3)CS(=O)(=O)c4c...   
4     Cc1ccccc1Cc2nc(no2)C[NH+]3CCC4(CC3)OC[C@H](O4)C   
..                                                ...   
95  Cc1ccc(c(c1)N(C(C)C)C(=O)[C@@H]2Cc3ccccc3N2C(=...   
96       CCc1nnc(s1)NC(=O)[C@H]2[C@@H](C2(C)C)C=C(C)C   
97    Cc1c(cccc1Cl)NC(=O)CN(c2cc(ccc2OC)Cl)S(=O)(=O)C   
98  CCCc1cc(n(n1)C)C(=O)N2CC[C@@H]3CC[C@H](C2)[NH+]3C   
99                           Cc1c(cnn1C)CNc2cccc(c2)I   

                                           METFRAG_MZ  \
0   [81.03215, 85.05224, 91.02907, 93.02091, 95.04...   
1   [81.987175, 84.08082, 85.052246, 85.08865, 86....   
2   [82.98242, 87.03149, 90.046425, 93.03351, 97.9...   
3   [85.05224, 90.04643, 91.05426, 92.02567, 104.0...   
4   [83.02399, 83.023994, 84.0

Tranformer and Positional Encoding

In [19]:
# Load the SPE vocabulary file and create a mapping from tokens to IDs
spe_vob_path = 'SPE_ChEMBL.txt'
with open(spe_vob_path, 'r', encoding='utf-8') as file:
    tokens = [line.strip() for line in file.readlines()]
token_to_id = {token: idx for idx, token in enumerate(tokens)}

# Add a padding token to the token_to_id dictionary
pad_token = '<PAD>'
if pad_token not in token_to_id:
    token_to_id[pad_token] = len(token_to_id)
pad_token_id = token_to_id[pad_token]

# Function to convert tokenized SMILES to IDs
def smiles_to_ids(tokenized_smiles, token_to_id):
    return [token_to_id[token] if token in token_to_id else token_to_id['<UNK>'] for token in tokenized_smiles]

# Add an unknown token to the token_to_id dictionary if it doesn't exist
if '<UNK>' not in token_to_id:
    token_to_id['<UNK>'] = len(token_to_id)

import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

# Hyperparameters
d_model = 512  # The number of expected features in the encoder/decoder inputs
nhead = 8  # The number of heads in the multiheadattention models
num_encoder_layers = 3  # The number of sub-encoder-layers in the encoder
num_decoder_layers = 3  # The number of sub-decoder-layers in the decoder
dim_feedforward = 2048  # The dimension of the feedforward network model
dropout = 0.1  # The dropout value
batch_size = 64  # Batch size for training

class TransformerModel(nn.Module):
    def __init__(self, num_tokens, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.encoder_embedding = nn.Embedding(num_tokens, d_model, padding_idx=pad_token_id)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)
        self.decoder_embedding = nn.Embedding(num_tokens, d_model, padding_idx=pad_token_id)
        self.pos_decoder = PositionalEncoding(d_model, dropout)
        self.output_layer = nn.Linear(d_model, num_tokens)
        
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):
        src = self.encoder_embedding(src) * math.sqrt(d_model)
        src = self.pos_encoder(src)
        tgt = self.decoder_embedding(tgt) * math.sqrt(d_model)
        tgt = self.pos_decoder(tgt)
        output = self.transformer(src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        output = self.output_layer(output)
        return output

# Initialize the transformer model
# num_tokens = 3002 + 1 +1  # Plus one for the padding token
num_tokens = len(token_to_id)
transformer_model = TransformerModel(num_tokens, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)



print("Initialized 🛠")

Initialized 🛠


Create datasets

In [20]:
class SMILESToSpectraDataset(Dataset):
    def __init__(self, smiles, spectra, token_to_id, max_length=128):
        self.smiles = smiles
        self.spectra = spectra
        self.token_to_id = token_to_id
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles)

    def __getitem__(self, idx):
        # Tokenize the SMILES string and convert to IDs
        tokenized_smiles = spe.tokenize(self.smiles[idx]).split()
        smiles_ids = smiles_to_ids(tokenized_smiles, self.token_to_id)
        encoded_spectra = self.spectra[idx]
        
        # Calculate padding lengths, ensuring non-negative values
        smiles_padding_length = max(self.max_length - len(smiles_ids), 0)
        spectra_padding_length = max(self.max_length - len(encoded_spectra), 0)
        
        # Padding with the ID of the pad_token
        padded_smiles_ids = np.pad(smiles_ids, (0, smiles_padding_length), mode='constant', constant_values=pad_token_id)
        padded_spectra = np.pad(encoded_spectra, (0, spectra_padding_length), mode='constant', constant_values=0)
        
        # Convert to tensors
        src = torch.tensor(padded_smiles_ids, dtype=torch.long)
        tgt = torch.tensor(padded_spectra, dtype=torch.float)
        
        return src, tgt

# Create datasets
train_dataset = SMILESToSpectraDataset(processed_train_df['smiles'].tolist(), processed_train_df['encoded_spectra'].tolist(), token_to_id)
test_dataset = SMILESToSpectraDataset(processed_test_df['smiles'].tolist(), processed_test_df['encoded_spectra'].tolist(), token_to_id)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


print("Done 🙌")

Done 🙌


Training loop

In [None]:
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

def create_padding_mask(seq, pad_token_id):
    return (seq == pad_token_id).transpose(0, 1)

# Example usage:
# src_mask = generate_square_subsequent_mask(src_seq_length)
# tgt_mask = generate_square_subsequent_mask(tgt_seq_length)
# src_padding_mask = create_padding_mask(src, pad_token_id)
# tgt_padding_mask = create_padding_mask(tgt, pad_token_id)

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(transformer_model.parameters())

# Training loop
def train_epoch(model, train_loader, criterion, optimizer, pad_token_id):
    model.train()
    total_loss = 0
    for src, tgt in train_loader:
        src_seq_length, tgt_seq_length = src.size(1), tgt.size(1)
        
        # Create masks
        src_mask = generate_square_subsequent_mask(src_seq_length).to(src.device)
        tgt_mask = generate_square_subsequent_mask(tgt_seq_length).to(tgt.device)
        src_padding_mask = create_padding_mask(src, pad_token_id).to(src.device)
        tgt_padding_mask = create_padding_mask(tgt, pad_token_id).to(tgt.device)
        memory_key_padding_mask = src_padding_mask.clone()

        optimizer.zero_grad()
        
        # Pass the masks to the forward method
        output = model(src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        
        # Compute the loss; we may need to adjust the target to match the output shape
        loss = criterion(output.view(-1, output.size(-1)), tgt.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)

num_epochs = 10
for epoch in range(num_epochs):
    loss = train_epoch(transformer_model, train_loader, criterion, optimizer, pad_token_id)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

In [None]:
# Evaluation loop
def evaluate(model, test_loader, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for src, tgt in test_loader:
            output = model(src, tgt)
            loss = criterion(output, tgt)
            total_loss += loss.item()
    return total_loss / len(test_loader)

# Evaluate the model
test_loss = evaluate(transformer_model, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}")

View model's output on validation set

In [None]:
import torch.nn.functional as F

# Function to convert logits to binary predictions
def logits_to_binary_predictions(logits, threshold=0.5):
    probabilities = torch.sigmoid(logits)
    binary_predictions = (probabilities >= threshold).int()
    return binary_predictions

# Function to print model predictions and actual values
def print_model_predictions(model, data_loader, tokenizer, num_samples=5):
    model.eval()
    samples_printed = 0
    with torch.no_grad():
        for src, tgt in data_loader:
            output = model(src, tgt)
            binary_predictions = logits_to_binary_predictions(output)
            
            # Convert tokenized SMILES back to string
            smiles = [''.join(tokenizer.decode(token)) for token in src]
            
            # Print the results
            for i in range(src.size(0)):
                if samples_printed >= num_samples:
                    break
                print(f"SMILES: {smiles[i]}")
                print(f"Predicted Spectra: {binary_predictions[i].tolist()}")
                print(f"Actual Spectra: {tgt[i].tolist()}\n")
                samples_printed += 1
            if samples_printed >= num_samples:
                break

# Print model predictions on the validation set
print_model_predictions(transformer_model, test_loader, spe, num_samples=100)