In [None]:
# import packages
# general tools
import numpy as np

import pandas as pd

import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
from sklearn.metrics import mean_squared_error, r2_score

# RDkit
from rdkit import Chem
from rdkit.Chem.rdmolops import GetAdjacencyMatrix
# Pytorch and Pytorch Geometric
import torch
from torch.nn import MSELoss
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader


There are some Warnings informing about future scipy changes.

We will suppress them here as they have no influence on the results.

In [None]:
# import warnings filter
from warnings import simplefilter
# ignore all future warnings
simplefilter(action='ignore', category=FutureWarning)

### Select processor type

In [None]:
if torch.cuda.is_available():
    device = torch.device('cuda')
    print('The code uses a GPU...')
else:
    device = torch.device('cpu')
    print('The code uses a CPU...')

### Read data

Here we use classification data on blood brain barrier (BBB) penetration.

In [None]:
tmp = pd.read_table('B3DB_regression.tsv',sep='\t')

table = tmp.loc[:,('compound_name', 'IUPAC_name', 'SMILES', 'logBB')]
table = table.dropna(subset="logBB")
table.reset_index(drop=True, inplace=True)

task = 'regression'

table

### Function for one-hot encoding

In [None]:
def one_hot_encoding(x, permitted_list):
    """
    Maps input elements x which are not in the permitted list to the last element
    of the permitted list.
    """
    if x not in permitted_list:
        x = permitted_list[-1]
    binary_encoding = [int(boolean_value) for boolean_value in list(map(lambda s: x == s, permitted_list))]
    return binary_encoding

### Function for generating atom features

The features include

- Atom element
- Number of heavy atom neighbors
- Formal charge
- Hybridization
- Is in ring
- Is part of aromatic bond
- Atomic mass
- VdW radius
- covalend radius
- optional: chirality
- optional: number of bonded hydrogens

In [None]:
def get_atom_features(atom, 
                      use_chirality = True, 
                      hydrogens_implicit = True):
    """
    Takes an RDKit atom object as input and gives a 1d-numpy array of atom features as output.
    """
    # define list of permitted atoms
    
    #permitted_list_of_atoms =  ['C','N','O','S','F','Si','P','Cl','Br','Mg','Na','Ca','Fe','As','Al','I', 'B','V','K','Tl','Yb','Sb','Sn','Ag','Pd','Co','Se','Ti','Zn', 'Li','Ge','Cu','Au','Ni','Cd','In','Mn','Zr','Cr','Pt','Hg','Pb','Unknown']
    permitted_list_of_atoms =  ['C','N','O','S','F','Si','P','Cl','Br','I', 'B', 'Unknown']
    
    if hydrogens_implicit == False:
        permitted_list_of_atoms = ['H'] + permitted_list_of_atoms
    
    # compute atom features
    
    atom_type_enc = one_hot_encoding(str(atom.GetSymbol()), permitted_list_of_atoms)
    
    n_heavy_neighbors_enc = one_hot_encoding(int(atom.GetDegree()), [0, 1, 2, 3, 4, "MoreThanFour"])
    
    formal_charge_enc = one_hot_encoding(int(atom.GetFormalCharge()), [-3, -2, -1, 0, 1, 2, 3, "Extreme"])
    
    hybridisation_type_enc = one_hot_encoding(str(atom.GetHybridization()), ["S", "SP", "SP2", "SP3", "SP3D", "SP3D2", "OTHER"])
    
    is_in_a_ring_enc = [int(atom.IsInRing())]
    
    is_aromatic_enc = [int(atom.GetIsAromatic())]
    
    atomic_mass_scaled = [float((atom.GetMass() - 10.812)/116.092)]
    
    vdw_radius_scaled = [float((Chem.GetPeriodicTable().GetRvdw(atom.GetAtomicNum()) - 1.5)/0.6)]
    
    covalent_radius_scaled = [float((Chem.GetPeriodicTable().GetRcovalent(atom.GetAtomicNum()) - 0.64)/0.76)]
    atom_feature_vector = atom_type_enc + n_heavy_neighbors_enc + formal_charge_enc + hybridisation_type_enc + is_in_a_ring_enc + is_aromatic_enc + atomic_mass_scaled + vdw_radius_scaled + covalent_radius_scaled
                                    
    if use_chirality == True:
        chirality_type_enc = one_hot_encoding(str(atom.GetChiralTag()), ["CHI_UNSPECIFIED", "CHI_TETRAHEDRAL_CW", "CHI_TETRAHEDRAL_CCW", "CHI_OTHER"])
        atom_feature_vector += chirality_type_enc
    
    if hydrogens_implicit == True:
        n_hydrogens_enc = one_hot_encoding(int(atom.GetTotalNumHs()), [0, 1, 2, 3, 4, "MoreThanFour"])
        atom_feature_vector += n_hydrogens_enc
    return np.array(atom_feature_vector)

### Function for generating bond features

Features include

- Bond type
- Is conjugated
- Is part of ring
- Optional: Stereochemistry

In [None]:
def get_bond_features(bond, 
                      use_stereochemistry = True):
    """
    Takes an RDKit bond object as input and gives a 1d-numpy array of bond features as output.
    """
    permitted_list_of_bond_types = [Chem.rdchem.BondType.SINGLE, Chem.rdchem.BondType.DOUBLE, Chem.rdchem.BondType.TRIPLE, Chem.rdchem.BondType.AROMATIC]
    bond_type_enc = one_hot_encoding(bond.GetBondType(), permitted_list_of_bond_types)
    
    bond_is_conj_enc = [int(bond.GetIsConjugated())]
    
    bond_is_in_ring_enc = [int(bond.IsInRing())]
    
    bond_feature_vector = bond_type_enc + bond_is_conj_enc + bond_is_in_ring_enc
    
    if use_stereochemistry == True:
        stereo_type_enc = one_hot_encoding(str(bond.GetStereo()), ["STEREOZ", "STEREOE", "STEREOANY", "STEREONONE"])
        bond_feature_vector += stereo_type_enc
    return np.array(bond_feature_vector)

### Function for generating graph structure for molecule

In [None]:
def create_graph_data_from_smiles(smiles, y):
    """
    Inputs:
    
    smiles:  SMILES string of molecule
    
    Outputs:
    
    G:       torch_geometric.data.Data object which represents labeled molecular graph
    
    """
    
        
    # convert SMILES to RDKit mol object
    mol = Chem.MolFromSmiles(smiles)

    # get feature dimensions
    n_nodes = mol.GetNumAtoms()
    n_edges = 2*mol.GetNumBonds()
    unrelated_smiles = "O=O"
    unrelated_mol = Chem.MolFromSmiles(unrelated_smiles)
    n_node_features = len(get_atom_features(unrelated_mol.GetAtomWithIdx(0)))
    n_edge_features = len(get_bond_features(unrelated_mol.GetBondBetweenAtoms(0,1)))
    
    # construct node feature matrix X of shape (n_nodes, n_node_features)
    X = np.zeros((n_nodes, n_node_features))
    for atom in mol.GetAtoms():
        X[atom.GetIdx(), :] = get_atom_features(atom)
    X = torch.tensor(X, dtype = torch.float)
        
    # construct edge index array E of shape (2, n_edges)
    (rows, cols) = np.nonzero(GetAdjacencyMatrix(mol))
    torch_rows = torch.from_numpy(rows.astype(np.int64)).to(torch.long)
    torch_cols = torch.from_numpy(cols.astype(np.int64)).to(torch.long)
    E = torch.stack([torch_rows, torch_cols], dim = 0)
        
    # construct edge feature array EF of shape (n_edges, n_edge_features)
    EF = np.zeros((n_edges, n_edge_features))
    for (k, (i,j)) in enumerate(zip(rows, cols)):
        EF[k] = get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j))) 
    EF = torch.tensor(EF, dtype = torch.float)

    # construct label tensor
    y_tensor = torch.tensor(np.array([y]), dtype = torch.float)

    # construct Pytorch Geometric data object and append to data list
    G = Data(x = X, edge_index = E, edge_attr = EF, y=y_tensor)
    
    return G

### Create data list of graphs

In [None]:
# create molecular graph objects from list of SMILES x_smiles and add to table
data_list = []
for i in table.index:
    graph_data = create_graph_data_from_smiles(table.loc[i,'SMILES'], table.loc[i, 'logBB'])
    print(graph_data)
    data_list.append(graph_data)

n_node_features = data_list[0]['x'][0].shape[0]
print(n_node_features)
n_edge_features = data_list[0]['edge_attr'][0].shape[0]
print(n_edge_features)



In [None]:

print(data_list[0]['x'][0])
print(data_list[0]['edge_index'])
print(data_list[0]['edge_attr'])
# create dataloader for training
#dataloader = DataLoader(dataset = data_list, batch_size = 2**7)


### Split data into training and test set

Here, we use random splitting

In [None]:
# train test split
import random

debug_mode = False
if debug_mode:
    fo1 = open("debug1.txt", "w")
    for i in data_list:
        fo1.write('{}\n'.format(i['x']))
        fo1.write('{}\n'.format(i['edge_index']))
        fo1.write('{}\n'.format(i['edge_attr']))
        fo1.write('{}\n'.format(i['y']))
    fo1.close()

# shuffle data
random.shuffle(data_list)

if debug_mode:
    fo1 = open("debug2.txt", "w")
    for i in data_list:
        fo1.write('{}\n'.format(i['x']))
        fo1.write('{}\n'.format(i['edge_index']))
        fo1.write('{}\n'.format(i['edge_attr']))
        fo1.write('{}\n'.format(i['y']))
    fo1.close()

num_data = len(data_list)
num_train = int(0.6*num_data)
num_valid = int(0.2*num_data)
num_test = int(num_data - num_train - num_valid)

train_data = data_list[:num_train]
valid_data = data_list[num_train:num_train+num_valid]
test_data = data_list[num_train+num_valid:]


print(num_data, len(train_data), len(valid_data), len(test_data))
#print(train_data)

### Generate Datasets for pytorch

We generate three instances (training, validation and test set) of a class Data that is derived from the pytorch class Dataset.

Then, we initiate three dataloaders for the three instances that will be used to provide data to the training, validation and test phases.

In [None]:
   
batch_size = 128

# Instantiate training, validation and test data
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(dataset=valid_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)
    


### Class for Message pass

Message pass is here performed using GCN layers where neighboring node features are first transformed by a weight matrix, normalized by their degree, and finally summed up. Lastly, we apply the bias vector to the aggregated output. This formula can be divided into the following steps:

1. Initial embedding of node features

2. Add self-loops to the adjacency matrix.

3. Linearly transform node feature matrix.

4. Compute normalization coefficients.

5. Normalize node features in 

6. Sum up neighboring node features ("add" aggregation).

7. Apply a final bias vector.

8. Readout: Generate graph embedding from node embeddings

9. FCNN for generating regression output

$$
\mathbf{x}_i^{(k)} = \sum_{j \in \mathcal{N}(i) \cup \{ i \}} \frac{1}{\sqrt{\deg(i)} \cdot \sqrt{\deg(j)}} \cdot \left( \mathbf{W}^{\top} \cdot \mathbf{x}_j^{(k-1)} \right) + \mathbf{b}
$$

In [None]:
#from torch_geometric.nn import GCNConv
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import add_self_loops, degree
from torch_geometric.nn.pool import global_mean_pool, global_add_pool
import torch.nn.functional as F
from torch.nn import Linear, Parameter

output_size = 128
radius = 2


class GNN_MPNN(MessagePassing):
    def __init__(self, in_channels, out_channels, radius):
        super().__init__(aggr='add')  # "Add" aggregation (Step 5).
        self.embedding = Linear(in_channels, out_channels, bias=True)
        self.lin = Linear(out_channels, out_channels, bias=False)
        self.bias = Parameter(torch.Tensor(out_channels))
        self.lin_readout = Linear(out_channels, 1, bias=True)
        self.radius = radius

        self.reset_parameters()

    def reset_parameters(self):
        self.embedding.reset_parameters()
        self.lin.reset_parameters()
        self.lin_readout.reset_parameters()
        self.bias.data.zero_()

    def forward(self, x, edge_index, batch):
        # x has shape [N, in_channels]
        # edge_index has shape [2, E]
        # Step 1: Initial embedding
        x = self.embedding(x)

        # Step 2: Add self-loops to the adjacency matrix.
        #edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))

        for r in range(self.radius):
            # Step 3: Linearly transform node feature matrix.
            x = self.lin(x)
            #x = F.relu(x)

            # Step 4: Compute normalization.
            row, col = edge_index
            deg = degree(col, x.size(0), dtype=x.dtype)
            deg_inv_sqrt = deg.pow(-0.5)
            deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
            norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

            # Step 5-6: Start propagating messages.
            x = self.propagate(edge_index, x=x, norm=norm)

            # Step 7: Apply a final bias vector.
            x += self.bias

        # Step 8: Readout
        xr = global_mean_pool(x, batch)
        
        # Step 9: Regression
        out = self.lin_readout(xr)

        return out

    def message(self, x_j, norm):
        # x_j has shape [E, out_channels]

        # Step 5: Normalize node features.
        return norm.view(-1, 1) * x_j



### Function for evaluating data using dataloaders

In [None]:
def evalute_data(model, loader):
    with torch.no_grad():
        data_eval = []
        for (k, batch) in enumerate(loader):
            output = model(batch.x, batch.edge_index, batch.batch)
            for i in output[:,0]:
                data_eval.append(i)
    return data_eval


### Training

In [None]:
num_epochs = 500
loss_values = []
loss_epoch_train = []
loss_epoch_valid = []

model = GNN_MPNN(n_node_features, output_size, radius).to(device)
# define loss function
loss_function = MSELoss()
# define optimiser
optimiser = torch.optim.Adam(model.parameters(), lr = 1e-3)

# set model to training mode
model.train()

# loop over training epochs
for epoch in range(num_epochs):
    loss_epoch = 0
    # loop over minibatches for training
    for (k, batch) in enumerate(train_dataloader):
        # set past gradient to zero
        optimiser.zero_grad()
        # compute current value of loss function via forward pass
        output = model(batch.x, batch.edge_index, batch.batch)
        loss_function_value = loss_function(output[:,0], batch.y)
        
        # loss value for epoch
        loss_epoch += batch.x.shape[0]*loss_function_value.item()

        # compute current gradient via backward pass
        loss_function_value.backward()
        # update model weights using gradient and optimisation method
        optimiser.step()

    
    # save loss for each epoch    
    loss_epoch_train.append([epoch, loss_epoch/len(train_data)])
    # validation loss
    with torch.no_grad():
        loss_epoch_v = 0
        for (k, batch) in enumerate(valid_dataloader):
            output = model(batch.x, batch.edge_index, batch.batch)
            loss_function_value = loss_function(output[:,0], batch.y)
            loss_epoch_v += batch.x.shape[0]*loss_function_value.item()
        loss_epoch_valid.append([epoch, loss_epoch_v])
    
    print("% 5d % .3f % .3f" % (epoch, loss_epoch/len(train_data), loss_epoch_v/len(valid_data)))

torch.save(model, "model_saved.pt")

print("Training Complete")

#### Plot training loss as function of number of batches (here: One epoch = 10 batches)


In [None]:
fig, ax = plt.subplots(figsize=(15,5))
#plt.plot(step, np.array(loss_values))
loss_epoch_train_np = np.asanyarray(loss_epoch_train)
plt.plot(loss_epoch_train_np[:,0], loss_epoch_train_np[:,1], "-", c="r")
plt.title("Step-wise Loss")
plt.xlabel("Batches")
plt.ylabel("Loss")
plt.show()

#### Correlation between experimental and predicted logBB for training set

In [None]:
model = torch.load("model_saved.pt")

with torch.no_grad():
    pred_train = evalute_data(model, train_dataloader)
    ground_truth = [i.y[0] for i in train_data]
#print(pred_train)
#print(ground_truth)


# Mean squared error
print("Mean squared error: %.2f" % mean_squared_error(ground_truth, pred_train))
# The coefficient of determination: 1 is perfect prediction
print("Coefficient of determination: %.2f" % r2_score(ground_truth, pred_train))

# Plot outputs
plt.figure(figsize=(7, 7))
plt.scatter(ground_truth, pred_train, color="black", s=10)
plt.plot([-3.0, 2.0], [-3.0, 2.0], color="black", linewidth=3)

plt.xlabel("ground truth")
plt.ylabel("predicted")

plt.show()