In [2]:
from rdkit import Chem
import numpy as np
from rdkit.Chem import rdmolops
from rdkit.Chem import rdDistGeom
class Graph:
    def __init__(self, molecule_smiles:str, node_vec_len:int, max_atoms:int=None):
        self.smiles = molecule_smiles
        self.node_vec_len = node_vec_len
        self.max_atoms = max_atoms
        self.smiles_to_mol()
        if self.mol is not None:
            self.smiles_to_graph()

    def smiles_to_mol(self):
        mol = Chem.MolFromSmiles(self.smiles)
        if mol is None:
            self.mol = None
        self.mol =Chem.AddHs(mol)

    def smiles_to_graph(self):
        atoms = self.mol.GetAtoms()

        if self.max_atoms is None:
            n_atoms = len(list(atoms))
        else:
            n_atoms = self.max_atoms
        
        node_mat = np.zeros((n_atoms, self.node_vec_len))

        for atom in atoms:
            atom_idx = atom.GetIdx()
            atom_no = atom.GetAtomicNum()
            node_mat[atom_idx, atom_no]= 1
          

        adj_mat = rdmolops.GetAdjacencyMatrix(self.mol)
        self.std_adj_mat = np.copy(adj_mat)

        dist_mat = rdDistGeom.GetDistanceMatrix(self.mol)
        dist_mat[dist_mat == 0.] = 1

        adj_mat = adj_mat *(1/dist_mat)

        dim_add = n_atoms - adj_mat.shape[0]
        adj_mat = np.pad(adj_mat, pad_width= ((0, dim_add), (0, dim_add)), mode= 'constant')
        adj_mat = adj_mat + np.eye(n_atoms)

        self.node_mat = node_mat
        self.adj_mat = adj_mat




In [3]:
from torch.utils.data import Dataset
import pandas as pd
import torch
class GraphData(Dataset):
    def __init__(self, dataset_path:str, node_vec_len:int, max_atoms:int=None):
        self.node_vec_len = node_vec_len
        self.max_atoms = max_atoms
        df = pd.read_csv(dataset_path)
        self.indices = df.index.to_list()
        self.smiles = df['smiles'].to_list()
        self.outputs = df['measured log solubility in mols per litre'].to_list()

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx:int):
        smile = self.smiles[idx]
        graph = Graph(smile, self.node_vec_len, self.max_atoms)
        node_mat = torch.Tensor(graph.node_mat)
        adj_mat = torch.Tensor(graph.adj_mat)
        output = torch.Tensor([self.outputs[idx]])
        return (node_mat, adj_mat), output , smile


def collate_graph_dataset(dataset: Dataset):
    node_mats = []
    adj_mats = []
    outputs = []
    smiles = []
    for (node_mat, adj_mat), output, smile in dataset:
        node_mats.append(node_mat)
        adj_mats.append(adj_mat)
        outputs.append(output)
        smiles.append(smile)
    
    node_mats_tensor = torch.cat(node_mats, dim=0)
    adj_mats_tensor = torch.cat(adj_mats, dim=0)
    outputs_tensor = torch.cat(outputs, dim=0)

    return (node_mats_tensor, adj_mats_tensor), outputs_tensor, smiles

In [4]:
import torch.nn as nn

class ConvolutionLayer(nn.Module):
    def __inin__(self, node_in_len, node_out_len):
        super(ConvolutionLayer, self).__init__()
        self.conv_linear = nn.Linear(node_in_len, node_out_len)
        self.con_activation = nn.LeakyReLU()

    def forward(self, node_mat, adj_mat):
        n_neighbors = adj_mat.sum(dim= -1, keepdim= True)
        self.idx_mat = torch.eye(adj_mat.shape[-2], adj_mat.shape[-1], device=n_neighbors.device)
        idx_mat = self.ids_mat.unsqueeze(0).expand(*adj_mat.shape)
        inv_deg_mat = torch.mul(idx_mat, 1/n_neighbors)
        node_feats = torch.bmm(inv_deg_mat,adj_mat)
        node_feats = torch.bmm(node_feats, node_mat)
        node_feats = self.conv_linear(node_feats)
        node_feats = self.con_activation(node_feats)
        return node_feats


class PoolingLayer(nn.Module):
    def __init__(self):
        super(PoolingLayer, self).__init__()

    def forward(self, node_feats):
        pooled_node_feats = node_feats.mean(dim=1)
        return pooled_node_feats

In [5]:
class ChemGCN(nn.Module):
    def __init__(self, node_vec_len, node_feats_len, hiden_feats_len,n_cov, n_hidden,n_outputs, p_dropout=0.0):
        super(ChemGCN, self).__init__()
        self.init_transorm = nn.Linear(node_vec_len,node_feats_len)
        self.conv_layers = nn.ModuleList([ConvolutionLayer(node_feats_len, node_feats_len) for _ in range(n_cov)])
        self.pooling = PoolingLayer()
        pooled_node_feats_len = node_feats_len
        self.pooled_activation = nn.LeakyReLU()
        self.pooled_to_hidden = nn.Linear(pooled_node_feats_len, hiden_feats_len)
        self.hidden_layer = nn.Linear(hiden_feats_len, hiden_feats_len)
        self.hidden_activation = nn.LeakyReLU()
        self.dropout = nn.Dropout(p_dropout)
        self.n_hidden = n_hidden
        if self.n_hidden > 1:
            self.hidden_layers = nn.ModuleList([self.hiddenlayer for _ in range(n_hidden-1)])
            self.hidden_activation_layers = nn.ModuleList([self.hidden_activation for _ in range(n_hidden-1)])
            self.dropouts = nn.ModuleList([self.dropout for _ in range(n_hidden-1)])
        self.hidden_to_output = nn.Linear(hiden_feats_len, n_outputs)

    def forward(self, node_mat, adj_mat):
        node_feats = self.init_transorm(node_mat)
        for conv_layer in self.conv_layers:
            node_feats = conv_layer(node_feats, adj_mat)
        pooled_node_feats = self.pooling(node_feats)
        pooled_node_feats = self.pooled_activation(pooled_node_feats)

        pooled_node_feats = self.pooled_to_hidden(pooled_node_feats)
        pooled_node_feats = self.hidden_activation(pooled_node_feats)
        pooled_node_feats = self.dropout(pooled_node_feats)

        if self.n_hidden > 1:
            for hidden_layer, hidden_activation, dropout in zip(self.hidden_layers, self.hidden_activation_layers, self.dropouts):
                pooled_node_feats = hidden_layer(pooled_node_feats)
                pooled_node_feats = hidden_activation(pooled_node_feats)
                pooled_node_feats = dropout(pooled_node_feats)
        output = self.hidden_to_output(pooled_node_feats)
        return output

In [6]:
class Standardizer:
    def __init__(self, x):
        self.mean = torch.mean(x)
        self.std = torch.std(x)
    def standarize(self, x):
        z = (x - self.mean)/self.std
        return z
    def restore(self, z):
        x = z * self.std + self.mean
        return x
    def state(self):
        return {'mean': self.mean, 'std': self.std}
        
    def load(self, state):
        self.mean = state['mean']
        self.std = state['std']

In [7]:
from sklearn.metrics import mean_absolute_error
def train_model(epoch, model, train_loader, optimizer, loss_func, standardizer,max_atoms,node_vec_len, device):
    avg_loss = 0
    avg_mea = 0
    count = 0
    model.train()
    for i, dataset in enumerate(train_loader):
        node_mat = dataset[0][0]
        adj_mat = dataset[0][1]
        output = dataset[1]

        first_dim = int(torch.numel(node_mat)/(max_atoms*node_vec_len))
        node_mat = node_mat.view(first_dim, max_atoms, node_vec_len)
        adj_mat = adj_mat.view(first_dim, max_atoms, max_atoms)

        node_mat = standardizer.standarize(node_mat)
        adj_mat = standardizer.standarize(adj_mat)

        output_std = standardizer.standarize(output)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        nn_preds = model(node_mat.to(device), adj_mat.to(device))
        loss = loss_func(nn_preds, output_std.to(device))
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        prediction = standardizer.restore(nn_preds.detach().cpu())
        mae = mean_absolute_error(prediction, output) 
        avg_mae += mae
        avg_loss += loss
        count += 1
    avg_loss /= count
    avg_mae /= count
    print(f"Epoch: {epoch}, Loss: {avg_loss}, MAE: {avg_mae}")
    return avg_loss, avg_mae

In [None]:
np.random.seed(0)
torch.manual.seed(0)

max_atoms = 200
node_vec_len = 60
train_size = 0.7
batch_size = 32
hidden_nodes = 60
n_conv_layers = 4
n_hidden_layers = 2
learning_rate = 0.01
n_epochs = 50

from pathlib import Path
from torch.utils.data import SubsetRandomSampler
from torch.utils.data import DataLoader
main_path = Path(__file__).resolve().parent
dataset_path = main_path / 'data' / 'solubility.csv'
dataset = GraphData(dataset_path, node_vec_len, max_atoms)
dataset_indices = np.arange(0,len(dataset), 1)
train_size = int(np.round(train_size*len(dataset)))
test_size = len(dataset) - train_size
train_indices = np.random.choice(dataset_indices, train_size, replace=False)
test_indices = np.array(list(set(dataset_indices) - set(train_indices)))
train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler, collate_fn = collate_graph_dataset)
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler, collate_fn = collate_graph_dataset)


model = ChemGCN(node_vec_len,hidden_nodes, hidden_nodes, n_conv_layers, n_hidden_layers, n_outputs=1, p_dropout=0.1)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_func = torch.nn.MSELoss()
outputs = [dataset[i][1] for i in range(len(dataset))]
standardizer = Standardizer(torch.Tensor(outputs))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

loss = []
mae = []
epoch = []

for i in range(n_epochs):
    l, m = train_model(i, model, train_loader, optimizer, loss_func, standardizer, max_atoms, node_vec_len, device)
    loss.append(l)
    mae.append(m)
    epoch.append(i)

with torch.no_grad():
    test_loss, test_mae = model(model, test_loader, loss_func, standardizer, max_atoms, node_vec_len, device)
    print(f'Training loss: {loss[-1]:.4f}, Training MAE: {mae[-1]:.4f}')
    print(f"Test Loss: {test_loss:.4f}, Test MAE: {test_mae:.4f}")  