<h1 align="center"> Homework assignment #10: Recurrent Neural Network, LSTM </h1>
<h2 align="center"> Assigned Nov 10 and Due Nov 21 </h2>
<h3 align="center"> MSSE 277B: Machine Learning Algorithms </h3>

1. **LSTM applied to SMILES string generation. (10 pt)** Using the SMILES string from the ANI
dataset with upto 6 heavy atoms, build a LSTM generative model that can generate new smiles string
with given initial character.

**(a) (3pt)** Process the smiles strings from ANI dataset by adding a starting character at the beginning
and an ending character at the end. Look over the dataset and define the vocabulary, use one hot
encoding to encode your smiles strings.

In [1]:
#Imports 
import pickle
import numpy as np
import torch.nn as nn
import torch
import torch.optim as optim
import torch.nn.functional as F  

In [2]:

# Load the provided dataset
file_path = 'ani_smiles.pkl'

# Function to load the dataset
def load_dataset(file_path):
    with open(file_path, 'rb') as file:
        data = pickle.load(file)
    return data

# Load the dataset
dataset = load_dataset(file_path)
dataset[:5]  # Display the first few entries for inspection


[['[',
  'H',
  ']',
  'C',
  '(',
  '[',
  'H',
  ']',
  ')',
  '(',
  '[',
  'H',
  ']',
  ')',
  '[',
  'H',
  ']'],
 ['[', 'H', ']', 'N', '(', '[', 'H', ']', ')', '[', 'H', ']'],
 ['[', 'H', ']', 'O', '[', 'H', ']'],
 ['[',
  'H',
  ']',
  'C',
  '(',
  '[',
  'H',
  ']',
  ')',
  '(',
  '[',
  'H',
  ']',
  ')',
  'C',
  '(',
  '[',
  'H',
  ']',
  ')',
  '(',
  '[',
  'H',
  ']',
  ')',
  '[',
  'H',
  ']'],
 ['[',
  'H',
  ']',
  'N',
  '(',
  '[',
  'H',
  ']',
  ')',
  'C',
  '(',
  '[',
  'H',
  ']',
  ')',
  '(',
  '[',
  'H',
  ']',
  ')',
  '[',
  'H',
  ']']]

In [3]:
# Step 1: Concatenate characters in each SMILES string
concatenated_smiles = [''.join(smiles) for smiles in dataset]

# Step 2: Add starting '^' and ending '$' characters to each SMILES string
processed_smiles = ['[' + smiles + '$' for smiles in concatenated_smiles]

# Step 3: Define the vocabulary (unique characters in the dataset)
unique_chars = set(''.join(processed_smiles))
vocabulary = sorted(list(unique_chars))  # Sort for consistency

# Display first few processed SMILES and the vocabulary
processed_smiles[:5], vocabulary


(['[[H]C([H])([H])[H]$',
  '[[H]N([H])[H]$',
  '[[H]O[H]$',
  '[[H]C([H])([H])C([H])([H])[H]$',
  '[[H]N([H])C([H])([H])[H]$'],
 ['#',
  '$',
  '(',
  ')',
  '1',
  '2',
  '=',
  'C',
  'H',
  'N',
  'O',
  '[',
  ']',
  'c',
  'n',
  'o'])

In [4]:
# Step 4: One-hot Encode the SMILES Strings

# Create a mapping from characters to indices and vice versa
char_to_index = {char: i for i, char in enumerate(vocabulary)}
index_to_char = {i: char for char, i in char_to_index.items()}

# Function to one-hot encode a SMILES string
def one_hot_encode(smiles, char_to_index, max_length):
    # Initialize a matrix of zeros with dimensions (max_length, len(vocabulary))
    encoding = np.zeros((max_length, len(char_to_index)), dtype=np.float32)

    # Encode each character in the SMILES string
    for i, char in enumerate(smiles):
        encoding[i, char_to_index[char]] = 1.0
    return encoding

# Determine the maximum length of SMILES strings for consistent encoding
max_smiles_length = max(len(s) for s in processed_smiles)

# One-hot encode the SMILES strings
encoded_smiles = [one_hot_encode(smiles, char_to_index, max_smiles_length) for smiles in processed_smiles]

# Example: Display the encoding of the first SMILES string
encoded_smiles[0], processed_smiles[0]


(array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], dtype=float32),
 '[[H]C([H])([H])[H]$')

In [5]:
# Filter out SMILES strings longer than the threshold
filtered_smiles = [s for s in processed_smiles if len(s) > 1 ]

# Update the maximum SMILES length based on the filtered dataset
max_smiles_length = max(len(s) for s in filtered_smiles)

# One-hot encode the filtered SMILES strings
encoded_smiles = [one_hot_encode(smiles, char_to_index, max_smiles_length) for smiles in filtered_smiles]

# Example: Display the encoding of the first SMILES string in the filtered dataset
encoded_smiles[0], filtered_smiles[0]


(array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], dtype=float32),
 '[[H]C([H])([H])[H]$')

**(b) (7pt)** Build a LSTM model with 1 recurrent layer. Starting with the starting character and grow
a string character by character using model prediction until it reaches a ending character. Look
at the string you grown, is it a valid SMILES string?

In [6]:

# Define the LSTM model
class SMILESLSTM(nn.Module):
    def __init__(self, input_size, hidden_dim, output_size, n_layers=1):
        super(SMILESLSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.lstm = nn.LSTM(input_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        return (torch.zeros(self.n_layers, batch_size, self.hidden_dim),
                torch.zeros(self.n_layers, batch_size, self.hidden_dim))

# Model parameters
input_size = len(vocabulary)  # Size of the one-hot encoded vectors
hidden_dim = 128  # Number of features in the hidden state
output_size = len(vocabulary)  # Size of the output (same as vocabulary size)

# Create the LSTM model
lstm_model = SMILESLSTM(input_size, hidden_dim, output_size)

# Display the model summary
lstm_model


SMILESLSTM(
  (lstm): LSTM(16, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=16, bias=True)
)

In [7]:
encoded_smiles_array = np.array(encoded_smiles)

from sklearn.model_selection import train_test_split

# Split the data into training (80%) and validation (20%) sets
train_data, val_data = train_test_split(encoded_smiles_array, test_size=0.2, random_state=42)


In [8]:

# Hyperparameters
learning_rate = 0.001
num_epochs = 50  # Number of epochs for training
batch_size = 64  # Batch size for training

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=learning_rate)

# Training and Validation Loop
for epoch in range(num_epochs):
    lstm_model.train()  # Set model to training mode
    train_loss = 0.0

    # Training Phase
    for i in range(0, len(train_data), batch_size):
        batch = train_data[i:i+batch_size]
        input_seq = torch.tensor([s[:-1] for s in batch], dtype=torch.float32)

        # Extracting the indices of the 'hot' elements for the target sequence
        target_seq = torch.tensor([np.argmax(s, axis=1) for s in batch], dtype=torch.long)
        target_seq = target_seq[:, 1:].contiguous().view(-1)  # Exclude the first character and flatten

        hidden = lstm_model.init_hidden(len(batch))
        optimizer.zero_grad()

        output, _ = lstm_model(input_seq, hidden)
        loss = criterion(output.view(-1, len(vocabulary)), target_seq)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Validation Phase
    lstm_model.eval()  # Set model to evaluation mode
    val_loss = 0.0
    with torch.no_grad():
        for i in range(0, len(val_data), batch_size):
            batch = val_data[i:i+batch_size]
            input_seq = torch.tensor([s[:-1] for s in batch], dtype=torch.float32)
            
            # Extracting the indices of the 'hot' elements for the target sequence
            target_seq = torch.tensor([np.argmax(s, axis=1) for s in batch], dtype=torch.long)
            target_seq = target_seq[:, 1:].contiguous().view(-1)  # Exclude the first character and flatten

            hidden = lstm_model.init_hidden(len(batch))
            output, _ = lstm_model(input_seq, hidden)
            loss = criterion(output.view(-1, len(vocabulary)), target_seq)

            val_loss += loss.item()

    avg_train_loss = train_loss / len(train_data)
    avg_val_loss = val_loss / len(val_data)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss}, Validation Loss: {avg_val_loss}")


  input_seq = torch.tensor([s[:-1] for s in batch], dtype=torch.float32)


Epoch 1/50, Training Loss: 0.038311253917419304, Validation Loss: 0.03180019868969244
Epoch 2/50, Training Loss: 0.028995955226111546, Validation Loss: 0.029307341171523272
Epoch 3/50, Training Loss: 0.025205014115673, Validation Loss: 0.025906890125597937
Epoch 4/50, Training Loss: 0.02236939899328738, Validation Loss: 0.02250235801362722
Epoch 5/50, Training Loss: 0.021146081766839755, Validation Loss: 0.021307654636727888
Epoch 6/50, Training Loss: 0.019124165261532627, Validation Loss: 0.01872154851417757
Epoch 7/50, Training Loss: 0.016204854526088736, Validation Loss: 0.015651222002708305
Epoch 8/50, Training Loss: 0.013310098547046468, Validation Loss: 0.012590631758425869
Epoch 9/50, Training Loss: 0.011028224381349854, Validation Loss: 0.010738335423550363
Epoch 10/50, Training Loss: 0.009678573594928462, Validation Loss: 0.0094645068807117
Epoch 11/50, Training Loss: 0.008739324949555477, Validation Loss: 0.008642628368011301
Epoch 12/50, Training Loss: 0.010616025198145775, 

In [11]:
def generate_smiles(model, start_token='[', max_length=100):
    model.eval()  # Switch model to evaluation mode
    initial_input = one_hot_encode(start_token, char_to_index, 1)
    initial_input = torch.tensor(initial_input).unsqueeze(0)  # Add batch dimension

    # Initialize hidden state
    hidden = model.init_hidden(1)

    # Generate SMILES
    smiles = start_token
    for _ in range(max_length):
        output, hidden = model(initial_input, hidden)
        probabilities = F.softmax(output[0, -1], dim=0).detach().numpy()
        next_char_index = np.random.choice(len(vocabulary), p=probabilities)
        next_char = index_to_char[next_char_index]

        if next_char == '$':  # End token
            break

        smiles += next_char
        initial_input = one_hot_encode(next_char, char_to_index, 1)
        initial_input = torch.tensor(initial_input).unsqueeze(0)  # Add batch dimension

    return smiles

# Generate a new SMILES string
new_smiles = generate_smiles(lstm_model)
print("Generated SMILES:", new_smiles)


Generated SMILES: [[H]OC([H])([H])C1([H])C([H])([H])[H]
