In [1]:
import numpy as np
import torch
from torch import nn
import torchani
import pyanitools as pya
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, KFold

# CHEM 277B - Final Project #

For this project, we will develop a supervised learning artificial neural network (ANN) that predicts the energy of a molecular system given the coordinates of the molecules in the system. While solving the Schrodinger equation is computationally expensive, we will be using machine learning to predict the system's energy without solving the equation computationally. 

## Understanding the ANI-1 Dataset ##

The ANI-1 dataset is a collection of 57,000 organic molecules with up to 8 heavy atoms. Let's look at the first subset of the dataset involving only one heavy atom. 

In [2]:
# Load ANI-1 dataset with 1 heavy atom
ani_data = pya.anidataloader(
    '/global/scratch/users/joshuablomgren/ANI-1_release/ani_gdb_s01.h5')

# Loop over each data point in the dataset
for data in ani_data:
    # Extract information from the dataset
    path = data['path']
    coordinates = data['coordinates']
    energies = data['energies']
    species = data['species']
    smiles = data['smiles']

    # Print the information for this data point
    print("Path:      ", path)
    print("  SMILES:  ", "".join(smiles))
    print("  Symbols: ", species)
    print("  Coordinates: ", coordinates.shape)
    print("  Energies:    ", energies.shape, "\n")

# Close the H5 data file
ani_data.cleanup()

Path:       /gdb11_s01/gdb11_s01-0
  SMILES:   [H]C([H])([H])[H]
  Symbols:  ['C', 'H', 'H', 'H', 'H']
  Coordinates:  (5400, 5, 3)
  Energies:     (5400,) 

Path:       /gdb11_s01/gdb11_s01-1
  SMILES:   [H]N([H])[H]
  Symbols:  ['N', 'H', 'H', 'H']
  Coordinates:  (3600, 4, 3)
  Energies:     (3600,) 

Path:       /gdb11_s01/gdb11_s01-2
  SMILES:   [H]O[H]
  Symbols:  ['O', 'H', 'H']
  Coordinates:  (1800, 3, 3)
  Energies:     (1800,) 



From the information printed above, we can see from the shape of the coordinates that each molecule has several thousand different conformations given: (number of confirmations, number of atoms, 3-dimensions). For each confirmation, the energy of the system is also given, hence the size of energies is the same as the number of confirmations.

## Subset 5 - 5 Heavy Atoms ##

In [3]:
data = pya.anidataloader('/global/scratch/users/joshuablomgren/ANI-1_release/ani_gdb_s05.h5')

In [4]:
data_iter = data.__iter__()
count = 0
count_conformations = 0

# iterate through all the molecules in data subset 5
for mol in data_iter:
    count += 1
    count_conformations += len(mol['energies'])

print(count)
print(count_conformations)

267
1813151


In subset 5 of the ANI-1 dataset, there are **267** molecules that each have 5 heavy atoms. There are a total of **1,813,151** conformations in this subset, each of the molecules having a different number of given conformations.

In [5]:
data_iter = data.__iter__()
mols = next(data_iter)
# Extract the data
P = mols['path']
X = mols['coordinates']
E = mols['energies']
S = mols['species']
sm = mols['smiles']

# Print the data
print("Path:   ", P)
print("  Smiles:      ","".join(sm))
print("  Symbols:     ", S)
print("  Coordinates: ", X.shape)
print("  Energies:    ", E.shape, "\n")

Path:    /gdb11_s05/gdb11_s05-0
  Smiles:       [H]N([H])C(C([H])([H])[H])(C([H])([H])[H])C([H])([H])[H]
  Symbols:      ['C', 'C', 'C', 'C', 'N', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H']
  Coordinates:  (10080, 16, 3)
  Energies:     (10080,) 



<br>

## Atomic Environment Vectors (AEV) ##

AEVs, Atomic Environment Vectors, are feature vectors that encode the local chemical environment around an atom in a molecule. We will be using AEVs to transform the initial atomic configurations using symmetry functions before putting them in our neural network. In order to train a neural network for predicting molecular properties, it is important to ensure that the network is invariant to translations, rotations, and permutations of the atoms in a molecule, which can be achieved through computing AEVs. 

The `torchani` library provides a module called AEV Computer that computes the AEVs (Atomic Environment Vectors) of molecular structures. The `torchani.AEVComputer` class is initialized with certain parameters such as Rcr (cutoff radius for radial symmetry functions), Rca (cutoff radius for angular symmetry functions), and a set of hyperparameters such as EtaR, EtaA, Zeta, ShfR, ShfA, and ShfZ, which are used to compute the AEVs of molecular structures.

In [6]:
Rcr = 5.2
EtaR = torch.tensor([16], dtype=torch.float)
ShfR = torch.tensor([0.900000,1.168750,1.437500,1.706250,1.975000,2.243750,2.51250,2.781250,3.050000,3.318750,3.587500,3.856250,4.125000,4.39375,4.662500,4.931250])
Rca = 3.5
EtaA = torch.tensor([8], dtype=torch.float)
ShfA = torch.tensor([0.900000,1.550000,2.200000,2.850000], dtype=torch.float)
ShfZ = torch.tensor([0.19634954,0.58904862,0.9817477,1.3744468,1.7671459,2.1598449,2.552544,2.945243]) 
Zeta = torch.tensor([32], dtype=torch.float)
symbols_order = ['H', 'C', 'N', 'O']
num_symbols = len(symbols_order) # number of different atom symbols

aev_computer = torchani.AEVComputer(Rcr, Rca, EtaR, ShfR, EtaA, Zeta, ShfA, ShfZ, num_symbols)

In [7]:
mapping = {"H": 0, "C": 1, "N": 2, "O": 3} # map atom types to integers
symbols = np.array([mapping[atom] for atom in S]) #S the same S as symbols
symbols = np.tile(symbols, (X.shape[0], 1)) 
symbols = torch.tensor(symbols)
X = torch.tensor(X)

In [8]:
aevs = aev_computer((symbols, X))[1]
# input: tuple
# symbols.shape: (Number of conformations, mapped atoms)
# X.shape: (Number of conformations, atoms, 3)
# output: SpeciesAEV object
# output[0] : symbols
# output[1].shape: (Number of conformations, number of atoms, num of AEV vectors)

In [9]:
aevs.shape

torch.Size([10080, 16, 384])

The `AEVComputer` returns a SpeciesAEV object that contains the mapped atomic symbols as well as the AEVs for each atom. In this case, 384 AEVs are generated for each atom and are stored as `aevs`.

## Create AEVs for All Molecules in Subset 2 ##

In [10]:
# AEV Computer hyperparameters
Rcr = 5.2
EtaR = torch.tensor([16], dtype=torch.float)
ShfR = torch.tensor([0.900000,1.168750,1.437500,1.706250,1.975000,2.243750,2.51250,2.781250,3.050000,3.318750,3.587500,3.856250,4.125000,4.39375,4.662500,4.931250])
Rca = 3.5
EtaA = torch.tensor([8], dtype=torch.float)
ShfA = torch.tensor([0.900000,1.550000,2.200000,2.850000], dtype=torch.float)
ShfZ = torch.tensor([0.19634954,0.58904862,0.9817477,1.3744468,1.7671459,2.1598449,2.552544,2.945243]) 
Zeta = torch.tensor([32], dtype=torch.float)

symbols_order = ['H', 'C', 'N', 'O']
num_symbols = len(symbols_order) # number of different atom symbols

aev_computer = torchani.AEVComputer(Rcr, Rca, EtaR, ShfR, EtaA, Zeta, ShfA, ShfZ, num_symbols)


data = pya.anidataloader('/global/scratch/users/joshuablomgren/ANI-1_release/ani_gdb_s02.h5')
data_iter = data.__iter__()

# create lists to store the AEVs and energies for all molecules
energies = []
aevs_all = []

mapping = {"H": 0, "C": 1, "N": 2, "O": 3} # map atom types to integers

# iterate through all the molecules in data subset 5
for mol in data_iter:
    X = torch.tensor(mol['coordinates'])
    S = mol['species']
    symbols = np.array([mapping[atom] for atom in S])
    symbols = np.tile(symbols, (X.shape[0], 1)) 
    symbols = torch.tensor(symbols)
    aevs = aev_computer((symbols, X))
    aevs_all.append(aevs)
    
    E = mols['energies']
    energies.append(E)

print(len(aevs_all))

13


In [13]:
len(energies)

13

In [14]:
len(aevs_all[0][1])

8469

The list `all_aevs` contains the SpeciesAEV objects for all molecules in the sub-dataset. Because there are 61 molecules in subset 4, there are 61 SpeciesAEV objects. The SpeciesAEV object is a tuple that contains the mapped atom symbols for each conformation as well as the AEVs for each atom of these configurations. 

Energies contains the list of all energies for each conformation. 

## Neural Network ##

In [15]:
training_data = aevs_all[:int(len(aevs_all)*0.8)]
training_labels = energies[:int(len(energies)*0.8)]
testing_data = aevs_all[int(len(aevs_all)*0.8):]
testing_labels = energies[int(len(energies)*0.8):]

In [16]:
print(f'Number of training molecules: {len(training_data)}')
print(f'Species + AEVs: {len(training_data[0])}')
print(f'Number of conformations: {len(training_data[0][0])}')
print(f'Number of atoms: {len(training_data[0][0][0])}')
print(f'Number of energies: {len(training_labels)}')

Number of training molecules: 10
Species + AEVs: 2
Number of conformations: 8469
Number of atoms: 8
Number of energies: 10


In [19]:
class ANI(nn.Module):
    def __init__(self, architecture):
        super().__init__()
        self.sub_nets = nn.ModuleDict({"C": ANI_sub(architecture), 
                                       "H": ANI_sub(architecture), 
                                       "N": ANI_sub(architecture), 
                                       "O": ANI_sub(architecture)})
        self.atom_types = {"C": 0, "H": 1, "N": 2, "O": 3}

    def forward(self, aevs, atom_types):
        atomic_energies = []
        for i in range(len(aevs)):
            atomic_energy = self.sub_nets[atom_types[i]](aevs[i])
            atomic_energies.append(atomic_energy)

        total_energies = torch.sum(atomic_energies, dim=0)
        return total_energies

class ANI_sub(nn.Module):
    def __init__(self, architecture):
        super().__init__()
        self.fc_layers = nn.ModuleList([])
        for i in range(len(architecture)-1):
            self.fc_layers.append(nn.Linear(architecture[i], architecture[i+1]))
            self.fc_layers.append(nn.ReLU())

        self.fc_layers = self.fc_layers[:-1]
        self.final_layer = nn.Linear(architecture[-2], 1)

    def forward(self, aev):
        x = aev
        for layer in self.fc_layers:
            x = layer(x)

        atomic_energy = self.final_layer(x)
        return atomic_energy

In [21]:
from functools import wraps
from time import time

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r  took: %2.4f sec' % (f.__name__,  te-ts))
        return result
    return wrap

For Loss, I am planning to use Mean Squared Error since the model is predicting a specific value rather than classifying the data. I will need to implement a threshold when testing the accuracy of the model as to how close the prediction is to the actual energy value.

In [22]:
def create_chunks(complete_list, chunk_size=None, num_chunks=None):
    '''
    Cut a list into multiple chunks, each having chunk_size (the last chunk might be less than chunk_size) or having a total of num_chunk chunks
    '''
    chunks = []
    if num_chunks is None:
        num_chunks = math.ceil(len(complete_list) / chunk_size)
    elif chunk_size is None:
        chunk_size = math.ceil(len(complete_list) / num_chunks)
    for i in range(num_chunks):
        chunks.append(complete_list[i * chunk_size: (i + 1) * chunk_size])
    return chunks

class Trainer():
    def __init__(self, model, optimizer_type, learning_rate, epoch, batch_size, input_transform=lambda x: x):
        """ The class for training the model
        model: nn.Module
            A pytorch model
        optimizer_type: 'adam' or 'sgd'
        learning_rate: float
        epoch: int
        batch_size: int
        input_transform: func
            transforming input. Can do reshape here
        """
        self.model = model
        if optimizer_type == "sgd":
            self.optimizer = SGD(model.parameters(), learning_rate,momentum=0.9)
        elif optimizer_type == "adam":
            self.optimizer = Adam(model.parameters(), learning_rate)
            
        self.epoch = epoch
        self.batch_size = batch_size
        self.input_transform = input_transform


    @timing
    def train(self, inputs, outputs, val_inputs, val_outputs,early_stop=False,l2=False,silent=False):
        """ train self.model with specified arguments
        inputs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        val_nputs: np.array, The shape of input_transform(val_input) should be (ndata,nfeatures)
        val_outputs: np.array shape (ndata,)
        early_stop: bool
        l2: bool
        silent: bool. Controls whether or not to print the train and val error during training
        
        @return
        a dictionary of arrays with train and val losses and accuracies
        """
        ### convert data to correct shape using self.input_transform ###
        inputs = self.input_transform(inputs)
        val_inputs = self.input_transform(val_inputs)
        ### convert data to torch tensors ###
        inputs = torch.tensor(inputs, dtype=torch.float)
        outputs = torch.tensor(outputs, dtype=torch.int64)
        
        losses = []
        accuracies = []
        val_losses = []
        val_accuracies = []
        weights = self.model.state_dict()
        lowest_val_loss = np.inf
        
        for n_epoch in tqdm(range(self.epoch), leave=False):
            self.model.train()
            batch_indices = list(range(inputs.shape[0]))
            random.shuffle(batch_indices)
            batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
            epoch_loss = 0
            epoch_acc = 0
            for batch in batch_indices:
                batch_importance = len(batch) / len(outputs)
                batch_input = inputs[batch]
                batch_output = outputs[batch]
                ### make prediction and compute loss with loss function of your choice on this batch ###
                batch_predictions = self.model(batch_input)
                loss = nn.MSELoss()(batch_predictions, batch_output)
                if l2:
                    ### Compute the loss with L2 regularization ###
                    l2_lambda = 1e-5
                    l2_norm = sum([p.pow(2.0).sum() for p in self.model.parameters()])
                    loss = loss + l2_lambda * l2_norm
                ### Backpropagation ###
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                ### Compute epoch_loss and epoch_acc
                epoch_loss += loss.detach().item() * batch_importance
                pred = torch.argmax(batch_predictions, axis=-1)
                acc = torch.sum(pred == batch_output) / len(batch_output)
                epoch_acc += acc.detach().item() * batch_importance
            val_loss, val_acc = self.evaluate(val_inputs, val_outputs, print_acc=False)
            if n_epoch % 10 == 0 and not silent: 
                print("Epoch %d/%d - Loss: %.3f - Acc: %.3f" % (n_epoch + 1, self.epoch, 
                                                                epoch_loss, epoch_acc))
                print("              Val_loss: %.3f - Val_acc: %.3f" % (val_loss, val_acc))
            losses.append(epoch_loss)
            accuracies.append(epoch_acc)
            val_losses.append(val_loss)
            val_accuracies.append(val_acc)
            if early_stop:
                if val_loss < lowest_val_loss:
                    lowest_val_loss = val_loss
                    weights = self.model.state_dict()

        if early_stop:
            self.model.load_state_dict(weights)    

        return {"losses": losses, "accuracies": accuracies, "val_losses": val_losses, 
                "val_accuracies": val_accuracies}
        
    def evaluate(self, inputs, outputs, print_acc=True):
        """ evaluate model on provided input and output
        inputs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        print_acc: bool
        
        @return
        losses: float
        acc: float
        """
        ### convert data to correct shape using self.input_transform ###
        inputs = self.input_transform(inputs)
        ### convert data to torch tensors ###
        inputs = torch.tensor(inputs, dtype=torch.float)
        outputs = torch.tensor(outputs, dtype=torch.int64)
        
        self.model.eval() 
        batch_indices = list(range(inputs.shape[0]))
        batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
        acc = 0
        losses = 0
        for batch in batch_indices:
            batch_importance = len(batch) / len(outputs)
            batch_input = inputs[batch]
            batch_output = outputs[batch]
            with torch.no_grad():
                ### Compute predictions and loss ###
                batch_predictions = self.model(batch_input)
                loss = nn.MSELoss()(batch_predictions, batch_output)
            pred = torch.argmax(batch_predictions, axis=-1)
            batch_acc = torch.sum(pred == batch_output) / len(batch_output)
            losses += loss.detach().item() * batch_importance
            acc += batch_acc.detach().item() * batch_importance
        if print_acc:
            print("Accuracy: %.3f" % acc)
        return losses, acc