### Importing ANI Tools and Viewing Data

In [2]:
import pyanitools as pya
import pandas as pd
import numpy as np
import torch
from sys import exit

# Set the HDF5 file containing the data
hdf5file = 'ani_gdb_s01.h5'

# Construct the data loader class
adl = pya.anidataloader(hdf5file)

p = []
x = []
e = []
s = []
sm = []
i = 0

for data in adl:
    if (i >= 10):
        break
    # Extract the data
    p.append(data['path'])
    x.append(data['coordinates'])
    e.append(data['energies'])
    s.append(data['species'])
    sm.append(data['smiles'])
    i+=1
    



# Closes the H5 data file
adl.cleanup()


In [3]:
from pyanitools import anidataloader
# data = anidataloader("../../ANI1_dataset/ANI-1_release/ani_gdb_s07.h5")
data = anidataloader("ani_gdb_s01.h5")
data_iter = data.__iter__()

In [4]:
mols = next(data_iter)
# Extract the data
P = mols['path']
X = mols['coordinates']
E = mols['energies']
S = mols['species']
sm = mols['smiles']

# Print the data
print("Path:   ", P)
print("  Smiles:      ","".join(sm))
print("  Symbols:     ", S)
print("  Coordinates: ", X.shape)
print("  Energies:    ", E.shape, "\n")
print("  Energies: ", E)


Path:    /gdb11_s01/gdb11_s01-0
  Smiles:       [H]C([H])([H])[H]
  Symbols:      ['C', 'H', 'H', 'H', 'H']
  Coordinates:  (5400, 5, 3)
  Energies:     (5400,) 

  Energies:  [-40.48058817 -40.48311923 -40.48473545 ... -40.4961279  -40.45599721
 -40.46479283]


In [5]:
data_iter = data.__iter__()
count = 0
count_conf = 0
for mol in data_iter:
    count += 1
    count_conf += len(mol['energies'])
print(count)
print(count_conf)

3
10800


### Defining AEV Calculation

In [6]:
import numpy as np


def calc_f_C(Rij, RC):
    f_C_value = 0.5 * np.cos(np.pi * Rij / RC) + 0.5
    indicator = ((Rij <= RC) & (Rij != 0)).astype(float) # Make f_C(0)=0 to make sure the sum in distance conversion function and radial conversion function can run with j=i
    return f_C_value * indicator

def radial_component(Rijs, eta, Rs, RC=5.2):
    # Rijs is a 1d array, all other parameters are scalars
    f_C_values = calc_f_C(Rijs, RC)
    individual_components = np.exp(-eta * (Rijs - Rs) ** 2) * f_C_values
    return np.sum(individual_components)

def angular_component(Rij_vectors, Rik_vectors, zeta, theta_s, eta, Rs, RC=3.5):
    # Rij_vectors and Rik_vectors are 2d arrays with shape (n_atoms, 3), all other parameters are scalars
    # calculate theta_ijk values from vector operations
    dot_products = Rij_vectors.dot(Rik_vectors.T)
    Rij_norms = np.linalg.norm(Rij_vectors, axis=-1)
    Rik_norms = np.linalg.norm(Rik_vectors, axis=-1)
    norms = Rij_norms.reshape((-1, 1)).dot(Rik_norms.reshape((1, -1)))
    cos_values = np.clip(dot_products / (norms + 1e-8), -1, 1)
    theta_ijks = np.arccos(cos_values)
    theta_ijk_filter = (theta_ijks != 0).astype(float)
    mean_dists = (Rij_norms.reshape((-1, 1)) + Rik_norms.reshape((1, -1))) / 2
    f_C_values_Rij = calc_f_C(Rij_norms, RC)
    f_C_values_Rik = calc_f_C(Rik_norms, RC)
    f_C_values = f_C_values_Rij.reshape((-1, 1)).dot(f_C_values_Rik.reshape((1, -1)))
    individual_components = (1 + np.cos(theta_ijks - theta_s)) ** zeta * np.exp(-eta * (mean_dists - Rs) ** 2) * f_C_values * theta_ijk_filter
    return 2 ** (1 - zeta) * np.sum(individual_components)

def calc_aev(atom_types, coords, i_index):
    # atom_types are np.array of ints
    relative_coordinates = coords - coords[i_index]
    nearby_atom_indicator = np.linalg.norm(relative_coordinates, axis=-1) < 5.3
    relative_coordinates = relative_coordinates[nearby_atom_indicator]
    atom_types = atom_types[nearby_atom_indicator]
    radial_aev = np.array([radial_component(np.linalg.norm(relative_coordinates[atom_types == atom], axis=-1), eta, Rs) \
                           for atom in [0, 1, 2, 3] for eta in [16] \
                           for Rs in [0.900000,1.168750,1.437500,1.706250,1.975000,2.243750,2.51250,2.781250,3.050000,\
                                   3.318750,3.587500,3.856250,4.125000,4.39375,4.662500,4.931250]])
    angular_aev = np.array([angular_component(relative_coordinates[atom_types == atom_j], relative_coordinates[atom_types == atom_k],\
                                             zeta, theta_s, eta, Rs) \
                            for atom_j in [0, 1, 2, 3] for atom_k in range(atom_j, 4) for zeta in [32] \
                            for theta_s in [0.19634954,0.58904862,0.9817477,1.3744468,1.7671459,2.1598449,2.552544,2.945243]\
                            for eta in [8] for Rs in [0.900000,1.550000,2.200000,2.850000]])
#     print(len(radial_aev), len(angular_aev))
    return np.concatenate([radial_aev, angular_aev])

        

### Installing Torchani and AEVComputer

In [None]:
!pip install torchani

In [11]:
import torchani
from torchani import AEVComputer
Rcr = 5.2
EtaR = torch.tensor([16], dtype=torch.float)
ShfR = torch.tensor([0.900000,1.168750,1.437500,1.706250,1.975000,2.243750,2.51250,2.781250,3.050000,3.318750,3.587500,3.856250,4.125000,4.39375,4.662500,4.931250])
Rca = 3.5
EtaA = torch.tensor([8], dtype=torch.float)
ShfA = torch.tensor([0.900000,1.550000,2.200000,2.850000], dtype=torch.float)
ShfZ = torch.tensor([0.19634954,0.58904862,0.9817477,1.3744468,1.7671459,2.1598449,2.552544,2.945243]) 
Zeta = torch.tensor([32], dtype=torch.float)
species_order = ['H', 'C', 'N', 'O']
num_species = len(species_order)

aev_computer = torchani.AEVComputer(Rcr, Rca, EtaR, ShfR, EtaA, ShfA, ShfZ, Zeta, num_species)


In [12]:
aev_converter = torchani.AEVComputer(Rcr, Rca, EtaR, ShfR, EtaA, Zeta, ShfA, ShfZ, num_species)
mapping = {"H": 0, "C" : 1, "N": 2, "O": 3}
species = np.array([mapping[atom] for atom in S])
species = np.tile(species, (X.shape[0], 1))
species = torch.tensor(species)
X = torch.tensor(X)

In [13]:
aev_output = aev_computer((species, X)) #SPecies: (Number, Atoms) A in [0, 1, 2, 3] Coords: (N, A, 3) Output : (N, A, 384)
aev_output[1].shape

torch.Size([5400, 5, 384])

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


### Defining MLP Model with Torch

In [15]:
import torch
from torch import nn

class ANI(nn.Module):
    def __init__(self):
        super().__init__()
        self.sub_nets = nn.ModuleDict({
            "C": ANI_sub(),
            "H": ANI_sub(),
            "N": ANI_sub(),
            "O": ANI_sub()})

        print(self.sub_nets)
    def forward(self, aevs, atom_types):
        atomic_energies = [self.sub_nets[atom_types][i][aevs[i]] for i in range(len(aevs))]
        
        total_energies = torch.sum(atomic_energies,dim=...)
        return total_energies

class ANI_sub(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(384, 128),
            nn.GELU(),
            nn.Linear(128, 128),
            nn.GELU(),
            nn.Linear(128, 64),
            nn.GELU(),
            nn.Linear(64, 1)
        )

    def forward(self, aev):
        atomic_energy = self.layers[aev]
        return atomic_energy


In [20]:
from torchsummary import summary
model = ANI()
print(model)
#summary(model, [(1, 384,), elements])

ModuleDict(
  (C): ANI_sub(
    (layers): Sequential(
      (0): Linear(in_features=384, out_features=128, bias=True)
      (1): GELU(approximate='none')
      (2): Linear(in_features=128, out_features=128, bias=True)
      (3): GELU(approximate='none')
      (4): Linear(in_features=128, out_features=64, bias=True)
      (5): GELU(approximate='none')
      (6): Linear(in_features=64, out_features=1, bias=True)
    )
  )
  (H): ANI_sub(
    (layers): Sequential(
      (0): Linear(in_features=384, out_features=128, bias=True)
      (1): GELU(approximate='none')
      (2): Linear(in_features=128, out_features=128, bias=True)
      (3): GELU(approximate='none')
      (4): Linear(in_features=128, out_features=64, bias=True)
      (5): GELU(approximate='none')
      (6): Linear(in_features=64, out_features=1, bias=True)
    )
  )
  (N): ANI_sub(
    (layers): Sequential(
      (0): Linear(in_features=384, out_features=128, bias=True)
      (1): GELU(approximate='none')
      (2): Linear(in_f

### Defining Torch Trainer Class

In [17]:
from functools import wraps
from time import time

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r  took: %2.4f sec' % (f.__name__,  te-ts))
        return result
    return wrap

In [18]:
from torch.optim import SGD, Adam
import torch.nn.functional as F
import random
from tqdm import tqdm
import math
from sklearn.model_selection import train_test_split, KFold

def create_chunks(complete_list, chunk_size=None, num_chunks=None):
    '''
    Cut a list into multiple chunks, each having chunk_size (the last chunk might be less than chunk_size) or having a total of num_chunk chunks
    '''
    chunks = []
    if num_chunks is None:
        num_chunks = math.ceil(len(complete_list) / chunk_size)
    elif chunk_size is None:
        chunk_size = math.ceil(len(complete_list) / num_chunks)
    for i in range(num_chunks):
        chunks.append(complete_list[i * chunk_size: (i + 1) * chunk_size])
    return chunks

class Trainer():
    def __init__(self, model, optimizer_type, learning_rate, epoch, batch_size, input_transform=lambda x: x,):
        """ The class for training the model
        model: nn.Module
            A pytorch model
        optimizer_type: 'adam' or 'sgd'
        learning_rate: float
        epoch: int
        batch_size: int
        input_transform: func
            transforming input. Can do reshape here
        """
        self.model = model
        if optimizer_type == "sgd":
            self.optimizer = SGD(model.parameters(), learning_rate,momentum=0.9)
        elif optimizer_type == "adam":
            self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
            
        self.epoch = epoch
        self.batch_size = batch_size
        self.input_transform = input_transform
        self.learning_rate = learning_rate


    @timing
    def train(self, inputs, outputs, val_inputs, val_outputs,early_stop=False,l2=False,silent=False):
        """ train self.model with specified arguments
        inputs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        val_nputs: np.array, The shape of input_transform(val_input) should be (ndata,nfeatures)
        val_outputs: np.array shape (ndata,)
        early_stop: bool
        l2: bool
        silent: bool. Controls whether or not to print the train and val error during training
        
        @return
        a dictionary of arrays with train and val losses and accuracies
        """
        ### convert data to tensor of correct shape and type here ###
        inputs = torch.tensor(inputs, dtype=torch.float32)
        outputs = torch.tensor(outputs, dtype=torch.long)
        val_inputs = torch.tensor(val_inputs, dtype=torch.float32)
        val_outputs = torch.tensor(val_outputs, dtype=torch.long)
        
        losses = []
        accuracies = []
        val_losses = []
        val_accuracies = []
        weights = self.model.state_dict()
        lowest_val_loss = np.inf
        
        for n_epoch in tqdm(range(self.epoch), leave=False):
            self.model.train()
            batch_indices = list(range(inputs.shape[0]))
            random.shuffle(batch_indices)
            batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
            epoch_loss = 0
            epoch_acc = 0
            for batch in batch_indices:
                batch_importance = len(batch) / len(outputs)
                batch_input = inputs[batch]
                batch_output = outputs[batch]
                ### make prediction and compute loss with loss function of your choice on this batch ###
                batch_predictions = self.model.forward(batch_input)
                loss_func = nn.CrossEntropyLoss()
                loss = loss_func(batch_predictions, batch_output)
                if l2:
                    ### Compute the loss with L2 regularization ###
                    self.optimizer = torch.optim.Adam(model.parameters(), lr = self.learning_rate, weight_decay = 1e-5)
                    loss = loss_func(batch_predictions, batch_output)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                ### Compute epoch_loss and epoch_acc
            epoch_loss, epoch_acc = self.evaluate(inputs, outputs)  
            val_loss, val_acc = self.evaluate(val_inputs, val_outputs, print_acc=False)
            if n_epoch % 10 ==0 and not silent: 
                print("Epoch %d/%d - Loss: %.3f - Acc: %.3f" % (n_epoch + 1, self.epoch, epoch_loss, epoch_acc))
                print("              Val_loss: %.3f - Val_acc: %.3f" % (val_loss, val_acc))
            losses.append(epoch_loss.detach().numpy())
            accuracies.append(epoch_acc)
            val_losses.append(val_loss.detach().numpy())
            val_accuracies.append(val_acc)
            if early_stop:
                if val_loss < lowest_val_loss:
                    lowest_val_loss = val_loss
                    weights = self.model.state_dict()

        if early_stop:
            self.model.load_state_dict(weights)    

        return {"losses": losses, "accuracies": accuracies, "val_losses": val_losses, "val_accuracies": val_accuracies}
        
    def evaluate(self, inputs, outputs, print_acc=False):
        """ evaluate model on provided input and output
        inputs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        print_acc: bool
        
        @return
        losses: float
        acc: float
        """

        inputs = torch.tensor(inputs, dtype=torch.float32)
        outputs = torch.tensor(outputs, dtype=torch.long)

        loss_func = nn.CrossEntropyLoss()
        
        pred = self.model.forward(inputs)

        losses = loss_func(pred, outputs)
        #print("pred = ", pred)
        #print("truth = " ,outputs)
        
        sum = 0
        for i in range(len(outputs)):
            if outputs[i] == torch.argmax(pred[i]):
                sum += 1
        acc = sum / len(outputs)
        if print_acc:
            print("Accuracy: %.3f" % acc)
        return losses, acc