In [4]:

import ast

import pandas as pd
import numpy as np
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint
from rdkit import Chem
from rdkit.Chem import Descriptors, rdMolDescriptors, Crippen
from rdkit.Chem.Draw import MolsToGridImage
import matplotlib.pyplot as plt
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from rdkit import Chem, RDLogger
from rdkit.Chem import BondType
from rdkit.Chem.Draw import MolsToGridImage

RDLogger.DisableLog("rdApp.*")

In [5]:
d1 = pd.read_csv('smiles1_qed.csv')
d2 = pd.read_csv('smiles2_qed.csv')
d3 = pd.read_csv('smiles3_qed.csv')

frames = [d1, d2, d3]
df = pd.concat(frames)

df = df.drop_duplicates(subset=['smiles'])

df.head()

Unnamed: 0,smiles,qed
0,CC(C)(C)c1ccc2occ(CC(=O)Nc3ccccc3F)c2c1\n,0.731901
1,C[C@@H]1CC(Nc2cncc(-c3nncn3C)c2)C[C@@H](C)C1\n,0.941112
2,N#Cc1ccc(-c2ccc(O[C@@H](C(=O)N3CCCC3)c3ccccc3)...,0.626105
3,CCOC(=O)[C@@H]1CCCN(C(=O)c2nc(-c3ccc(C)cc3)n3c...,0.716225
4,N#CC1=C(SCC(=O)Nc2cccc(Cl)c2)N=C([O-])[C@H](C#...,0.809572


In [6]:
SMILE_CHARSET = '["C", "B", "F", "I", "H", "O", "N", "S", "P", "Cl", "Br"]'

bond_mapping = {"SINGLE": 0, "DOUBLE": 1, "TRIPLE": 2, "AROMATIC": 3}
bond_mapping.update(
    {0: BondType.SINGLE, 1: BondType.DOUBLE, 2: BondType.TRIPLE, 3: BondType.AROMATIC}
)
SMILE_CHARSET = ast.literal_eval(SMILE_CHARSET)

MAX_MOLSIZE = max(df["smiles"].str.len())
SMILE_to_index = dict((c, i) for i, c in enumerate(SMILE_CHARSET))
index_to_SMILE = dict((i, c) for i, c in enumerate(SMILE_CHARSET))
atom_mapping = dict(SMILE_to_index)
atom_mapping.update(index_to_SMILE)

BATCH_SIZE = 100
EPOCHS = 100

VAE_LR = 5e-4
NUM_ATOMS = 120  # Maximum number of atoms

ATOM_DIM = len(SMILE_CHARSET)  # Number of atom types
BOND_DIM = 4 + 1  # Number of bond types
LATENT_DIM = 435  # Size of the latent space

In [7]:
def smiles_to_graph(smiles):
    # Converts SMILES to molecule object
    molecule = Chem.MolFromSmiles(smiles)

    # Initialize adjacency and feature tensor
    adjacency = np.zeros((BOND_DIM, NUM_ATOMS, NUM_ATOMS), "float32")
    features = np.zeros((NUM_ATOMS, ATOM_DIM), "float32")

    # loop over each atom in molecule
    for atom in molecule.GetAtoms():
        i = atom.GetIdx()
        atom_type = atom_mapping[atom.GetSymbol()]
        features[i] = np.eye(ATOM_DIM)[atom_type]
        # loop over one-hop neighbors
        for neighbor in atom.GetNeighbors():
            j = neighbor.GetIdx()
            bond = molecule.GetBondBetweenAtoms(i, j)
            bond_type_idx = bond_mapping[bond.GetBondType().name]
            adjacency[bond_type_idx, [i, j], [j, i]] = 1

    # Where no bond, add 1 to last channel (indicating "non-bond")
    # Notice: channels-first
    adjacency[-1, np.sum(adjacency, axis=0) == 0] = 1

    # Where no atom, add 1 to last column (indicating "non-atom")
    features[np.where(np.sum(features, axis=1) == 0)[0], -1] = 1

    return adjacency, features


def graph_to_molecule(graph):
    # Unpack graph
    adjacency, features = graph

    # RWMol is a molecule object intended to be edited
    molecule = Chem.RWMol()

    # Remove "no atoms" & atoms with no bonds
    keep_idx = np.where(
        (np.argmax(features, axis=1) != ATOM_DIM - 1)
        & (np.sum(adjacency[:-1], axis=(0, 1)) != 0)
    )[0]
    features = features[keep_idx]
    adjacency = adjacency[:, keep_idx, :][:, :, keep_idx]

    # Add atoms to molecule
    for atom_type_idx in np.argmax(features, axis=1):
        atom = Chem.Atom(atom_mapping[atom_type_idx])
        _ = molecule.AddAtom(atom)

    # Add bonds between atoms in molecule; based on the upper triangles
    # of the [symmetric] adjacency tensor
    (bonds_ij, atoms_i, atoms_j) = np.where(np.triu(adjacency) == 1)
    for (bond_ij, atom_i, atom_j) in zip(bonds_ij, atoms_i, atoms_j):
        if atom_i == atom_j or bond_ij == BOND_DIM - 1:
            continue
        bond_type = bond_mapping[bond_ij]
        molecule.AddBond(int(atom_i), int(atom_j), bond_type)

    # Sanitize the molecule; for more information on sanitization, see
    # https://www.rdkit.org/docs/RDKit_Book.html#molecular-sanitization
    flag = Chem.SanitizeMol(molecule, catchErrors=True)
    # Let's be strict. If sanitization fails, return None
    if flag != Chem.SanitizeFlags.SANITIZE_NONE:
        return None

    return molecule

In [8]:
data = d3
data.dropna(inplace=True)

In [9]:

train_df = data.sample(frac=0.75, random_state=42)
val_df = data.drop(train_df.index)

train_df.reset_index(drop=True, inplace=True)
val_df.reset_index(drop=True, inplace=True)
train_df = data.sample(frac=0.75, random_state=42)  # random state is a seed value
train_df.reset_index(drop=True, inplace=True)

In [10]:

class MoleculeDataset(Dataset):
    def __init__(self, df, smiles_to_graph):
        self.adjacency_tensors = []
        self.feature_tensors = []
        self.qed_tensors = []

        for idx in range(len(df)):
            adjacency, features = smiles_to_graph(df.iloc[idx]['smiles'])
            qed = df.iloc[idx]['qed']
            self.adjacency_tensors.append(torch.tensor(adjacency, dtype=torch.float32))
            self.feature_tensors.append(torch.tensor(features, dtype=torch.float32))
            self.qed_tensors.append(torch.tensor([qed], dtype=torch.float32))

    def __len__(self):
        return len(self.qed_tensors)

    def __getitem__(self, idx):
        return self.adjacency_tensors[idx], self.feature_tensors[idx], self.qed_tensors[idx]

# Example usage
train_dataset = MoleculeDataset(train_df, smiles_to_graph)
val_dataset = MoleculeDataset(val_df, smiles_to_graph)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class RelationalGraphConvLayer(nn.Module):
    def __init__(self, in_channels, out_channels, use_bias=False, activation=F.relu):
        super(RelationalGraphConvLayer, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_bias = use_bias
        self.activation = activation
        
        # Initialize the weights
        self.kernel = nn.Parameter(torch.Tensor(in_channels, out_channels))
        if use_bias:
            self.bias = nn.Parameter(torch.Tensor(out_channels))
        else:
            self.register_parameter('bias', None)
        
        self.reset_parameters()
    
    #kernel act as the weights 
    def reset_parameters(self):
        # Initialize parameters similarly to 'glorot_uniform' in Keras
        nn.init.xavier_uniform_(self.kernel)
        if self.use_bias:
            nn.init.zeros_(self.bias)
    
    def forward(self, adjacency, features):
        # Assuming 'adjacency' is [batch_size, bond_dim, num_atoms, num_atoms]
        # and 'features' is [batch_size, num_atoms, atom_dim]
        # We want to aggregate information from neighbors, apply linear transformation, and then activation.
        
        # Aggregate neighbor features
        # Note: Depending on the exact format of your adjacency matrix, you may need to adjust this operation.
        support = torch.matmul(adjacency, features.unsqueeze(1))  # [batch_size, bond_dim, num_atoms, atom_dim]
        
        # Apply linear transformation
        output = torch.einsum('bnad,df->bnaf', support, self.kernel)  # [batch_size, bond_dim, num_atoms, out_channels]
        
        if self.use_bias:
            output += self.bias
        
        # Reduce bond types dimension
        output = torch.sum(output, dim=1)  # [batch_size, num_atoms, out_channels]
        
        # Apply non-linear transformation
        if self.activation is not None:
            output = self.activation(output)
        
        return output


In [12]:
val_adjacency, val_features, val_qed = [], [], []
for idx in range(12):
    v_adjacency, v_features = smiles_to_graph(val_df.iloc[idx]["smiles"])
    v_qed = val_df.iloc[idx]["qed"]
    val_adjacency.append(torch.tensor(v_adjacency, dtype=torch.float32))
    val_features.append(torch.tensor(v_features, dtype=torch.float32))
    val_qed.append(torch.tensor(v_qed, dtype=torch.float32))

# Convert lists to PyTorch tensors
val_adjacency = torch.stack(val_adjacency)
val_features = torch.stack(val_features)
val_qed = torch.stack(val_qed).unsqueeze(-1)  # Adding an extra dime

In [13]:

class GlobalAveragePooling1D(nn.Module):
    def forward(self, x):
        return torch.mean(x, dim=1)

class Encoder(nn.Module):
    def __init__(self, gconv_units, latent_dim, feature_shape, dense_units, dropout_rate):
        super(Encoder, self).__init__()
        self.gconv_layers = nn.ModuleList([RelationalGraphConvLayer(feature_shape[1], gconv_units[0])])
        for i in range(1, len(gconv_units)):
            self.gconv_layers.append(RelationalGraphConvLayer(gconv_units[i-1], gconv_units[i]))
        self.global_avg_pool = GlobalAveragePooling1D()
        self.dense_layers = nn.ModuleList([nn.Linear(gconv_units[-1], dense_units[0])])
        for i in range(1, len(dense_units)):
            self.dense_layers.append(nn.Linear(dense_units[i-1], dense_units[i]))
        self.dropout = nn.Dropout(dropout_rate)
        self.z_mean = nn.Linear(dense_units[-1], latent_dim)
        self.log_var = nn.Linear(dense_units[-1], latent_dim)
    
    def forward(self, adjacency, features):
        x = features
        for gconv in self.gconv_layers:
            x = gconv(adjacency, x)
        x = self.global_avg_pool(x)
        for dense in self.dense_layers:
            x = F.relu(dense(x))
            x = self.dropout(x)
        z_mean = self.z_mean(x)
        log_var = self.log_var(x)
        return z_mean, log_var

class Decoder(nn.Module):
    def __init__(self, latent_dim, adjacency_shape, feature_shape, dense_units, dropout_rate):
        super(Decoder, self).__init__()
        self.dense_layers = nn.ModuleList([nn.Linear(latent_dim, dense_units[0])])
        for i in range(1, len(dense_units)):
            self.dense_layers.append(nn.Linear(dense_units[i-1], dense_units[i]))
        self.dropout = nn.Dropout(dropout_rate)
        self.adjacency_shape = adjacency_shape
        self.feature_shape = feature_shape
        self.adj_decoder = nn.Linear(dense_units[-1], np.prod(adjacency_shape))
        self.feature_decoder = nn.Linear(dense_units[-1], np.prod(feature_shape))
    
    def forward(self, z):
        x = z
        for dense in self.dense_layers:
            x = F.relu(dense(x))  # Changed to ReLU for consistency
            x = self.dropout(x)
        x_adj = self.adj_decoder(x).view(-1, *self.adjacency_shape)
        x_features = self.feature_decoder(x).view(-1, *self.feature_shape)
        return torch.softmax(x_adj, dim=1), torch.softmax(x_features, dim=2)


In [15]:


class Sampling(nn.Module):
    def forward(self, inputs):
        z_mean, z_log_var = inputs
        batch, dim = z_log_var.size()
        epsilon = torch.randn_like(z_log_var)  # Generates a tensor of random numbers from a normal distribution with the same size as z_log_var
        return z_mean + torch.exp(0.5 * z_log_var) * epsilon


In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

class MoleculeGenerator(nn.Module):
    def __init__(self, encoder, decoder, max_len):
        super(MoleculeGenerator, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.property_prediction_layer = nn.Linear(max_len, 1)  # Assuming max_len aligns with the expected input size for property prediction
        
    def forward(self, adjacency_tensor, feature_tensor):
        z_mean, z_log_var = self.encoder(adjacency_tensor, feature_tensor)
        z = self._reparameterize(z_mean, z_log_var)
        gen_adjacency, gen_features = self.decoder(z)
        qed_pred = self.property_prediction_layer(z)  # Example usage, may need adjustment
        return gen_adjacency, gen_features, qed_pred, z_mean, z_log_var

    def _reparameterize(self, z_mean, z_log_var):
        std = torch.exp(0.5*z_log_var)
        eps = torch.randn_like(std)
        return z_mean + eps*std
    
    # Custom loss function to be defined based on your needs
    def loss_function(self, *args, **kwargs):
        # Implement loss calculation
        pass

In [17]:
def train(model, train_loader, val_loader, epochs=10):
    optimizer = Adam(model.parameters())
    model.train()
    
    for epoch in range(epochs):
        for adjacency_tensor, feature_tensor, qed_tensor in train_loader:
            optimizer.zero_grad()
            gen_adjacency, gen_features, qed_pred, z_mean, z_log_var = model(adjacency_tensor, feature_tensor)
            loss = model.loss_function(gen_adjacency, gen_features, qed_tensor, qed_pred, z_mean, z_log_var)
            loss.backward()
            optimizer.step()
        
        # Implement validation step
        # Update your loss tracking variables here

# Assuming `train_loader` and `val_loader` are PyTorch DataLoader instances containing your training and validation data

In [18]:
def vae_loss(reconstructed_adjacency, reconstructed_features, true_adjacency, true_features, z_mean, z_log_var, qed_pred, true_qed):
    # Reconstruction loss
    recon_loss_adj = F.binary_cross_entropy(reconstructed_adjacency, true_adjacency, reduction='sum')
    recon_loss_feat = F.binary_cross_entropy(reconstructed_features, true_features, reduction='sum')
    
    # Property prediction loss (if applicable)
    qed_loss = F.mse_loss(qed_pred, true_qed)
    
    # KL divergence
    kl_loss = -0.5 * torch.sum(1 + z_log_var - z_mean.pow(2) - z_log_var.exp())
    
    return recon_loss_adj + recon_loss_feat + kl_loss + qed_loss


In [19]:
def train_vae(model, train_loader, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for data in train_loader:
            adjacency_tensor, feature_tensor, qed_tensor = data
            optimizer.zero_grad()
            
            # Forward pass
            gen_adjacency, gen_features, qed_pred, z_mean, z_log_var = model(adjacency_tensor, feature_tensor)
            
            # Loss computation
            loss = vae_loss(gen_adjacency, gen_features, adjacency_tensor, feature_tensor, z_mean, z_log_var, qed_pred, qed_tensor)
            total_loss += loss.item()
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
        
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(train_loader)}")

# Model instantiation
encoder = Encoder(gconv_units=[9], latent_dim=LATENT_DIM, feature_shape=(NUM_ATOMS, ATOM_DIM), dense_units=[512], dropout_rate=0.0)
decoder = Decoder(latent_dim=LATENT_DIM, adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS), feature_shape=(NUM_ATOMS, ATOM_DIM), dense_units=[128, 256, 512], dropout_rate=0.2)
model = MoleculeGenerator(encoder, decoder, MAX_MOLSIZE)

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=VAE_LR)

# Assuming `train_loader` is defined
train_vae(model, train_loader, optimizer, epochs=EPOCHS)


IndexError: tuple index out of range