In [191]:
from pyanitools import anidataloader
data = anidataloader("ani_gdb_s01.h5")
data_iter = data.__iter__()

In [192]:
mols = next(data_iter)
# Extract the data
P = mols['path']
X = mols['coordinates']
E = mols['energies'] #energies are in hartree
S = mols['species']
sm = mols['smiles']

# Print the data
print("Path:   ", P)
print("  Smiles:      ","".join(sm))
print("  Symbols:     ", S)
print("  Coordinates: ", X.shape)
print("  Energies:    ", E.shape, "\n")


Path:    /gdb11_s01/gdb11_s01-0
  Smiles:       [H]C([H])([H])[H]
  Symbols:      ['C', 'H', 'H', 'H', 'H']
  Coordinates:  (5400, 5, 3)
  Energies:     (5400,) 



In [193]:
data_iter = data.__iter__()
count = 0
count_conf = 0
for mol in data_iter:
    count+=1
    count_conf += len(mol['energies'])
print(count)
print(count_conf)

3
10800


In [194]:
data.cleanup()

In [195]:
import numpy as np


def calc_f_C(Rij, RC):
    """Calculate the local atomic environment approximation f_c. """
    f_C_value = 0.5 * np.cos(np.pi * Rij / RC) + 0.5
    # Make f_C(0)=0 to make sure the sum in distance conversion function 
    # and radial conversion function can run with j=i
    indicator = ((Rij <= RC) & (Rij != 0)).astype(float) 
    return f_C_value * indicator
    

def radial_component(Rijs, eta, Rs, RC=5.2):
    # Rijs is a 1d array, all other parameters are scalars
    f_C_values = calc_f_C(Rijs, RC)
    individual_components = np.exp(-eta * (Rijs - Rs) ** 2) * f_C_values
    return np.sum(individual_components)

def angular_component(Rij_vectors, Rik_vectors, zeta, theta_s, eta, Rs, RC=3.5):
    # Rij_vectors and Rik_vectors are 2d arrays with shape (n_atoms, 3), all other parameters are scalars
    # calculate theta_ijk values from vector operations
    dot_products = Rij_vectors.dot(Rik_vectors.T)
    Rij_norms = np.linalg.norm(Rij_vectors, axis=-1)
    Rik_norms = np.linalg.norm(Rik_vectors, axis=-1)
    norms = Rij_norms.reshape((-1, 1)).dot(Rik_norms.reshape((1, -1)))
    cos_values = np.clip(dot_products / (norms + 1e-8), -1, 1)
    theta_ijks = np.arccos(cos_values)
    theta_ijk_filter = (theta_ijks != 0).astype(float)
    mean_dists = (Rij_norms.reshape((-1, 1)) + Rik_norms.reshape((1, -1))) / 2
    f_C_values_Rij = calc_f_C(Rij_norms, RC)
    f_C_values_Rik = calc_f_C(Rik_norms, RC)
    f_C_values = f_C_values_Rij.reshape((-1, 1)).dot(f_C_values_Rik.reshape((1, -1)))
    individual_components = (1 + np.cos(theta_ijks - theta_s)) ** zeta * \
        np.exp(-eta * (mean_dists - Rs) ** 2) * f_C_values * theta_ijk_filter
    return 2 ** (1 - zeta) * np.sum(individual_components)

def calc_aev(atom_types, coords, i_index):
    # atom_types are np.array of ints
    relative_coordinates = coords - coords[i_index]
    nearby_atom_indicator = np.linalg.norm(relative_coordinates, axis=-1) < 5.3
    relative_coordinates = relative_coordinates[nearby_atom_indicator]
    atom_types = atom_types[nearby_atom_indicator]
    radial_aev = np.array([radial_component(np.linalg.norm(relative_coordinates[atom_types == atom], 
                                                           axis=-1), eta, Rs) \
                           for atom in [0, 1, 2, 3] for eta in [16] \
                           for Rs in [0.900000,1.168750,1.437500,1.706250,1.975000,2.243750,2.51250,2.781250,3.050000,\
                                   3.318750,3.587500,3.856250,4.125000,4.39375,4.662500,4.931250]])
    angular_aev = np.array([angular_component(relative_coordinates[atom_types == atom_j], 
                                              relative_coordinates[atom_types == atom_k],\
                                             zeta, theta_s, eta, Rs) \
                            for atom_j in [0, 1, 2, 3] for atom_k in range(atom_j, 4) for zeta in [32] \
                            for theta_s in [0.19634954,0.58904862,0.9817477,1.3744468,1.7671459,2.1598449,2.552544,2.945243]\
                            for eta in [8] for Rs in [0.900000,1.550000,2.200000,2.850000]])
    print(len(radial_aev), len(angular_aev))
    return np.concatenate([radial_aev, angular_aev])

        

In [196]:
import numpy as np
import matplotlib.pyplot as plt
r_ij = np.linspace(0,10,500)
rc = 5.2
# plt.plot(r_ij,[calc_f_C(r,rc)for r in r_ij]);


In [197]:
mapping={"H":0, "C":1, "N":2, "O":3}
elements= np.array([mapping[atom] for atom in S])
elements


array([1, 0, 0, 0, 0])

In [198]:
from functools import wraps
from time import time

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r  took: %2.4f sec' % (f.__name__,  te-ts))
        return result
    return wrap


In [199]:
# from torch.optim import SGD, Adam
# import torch.nn.functional as F
# import random
# from tqdm import tqdm
# import math
# import torch
# import torch.nn as nn
# architecture = {"input_size": 1, "hidden1": 128, "hidden2": 128, "hidden3": 64, "output_size": 64} 
# class ANI(nn.Module):
#     def __init__(self):
#         super(ANI, self).__init__()
#         self.sub_nets = nn.ModuleDict({
#             'C': ANI_sub(architecture),
#             'H': ANI_sub(architecture),
#             'N': ANI_sub(architecture),
#             'O': ANI_sub(architecture)
#         })
#         # self.network = create_network(input_dim)

#     def forward(self, aevs, atom_types):
#         atomic_energies = []
#         for atom_type, sub in self.sub_nets.items():
#             atom = (atom_types == atom_type).unsqueeze(-1)
#             atom_aev = aevs * atom
#             atomic_energies[atom_type] = sub(atom_aev)
#         # total_energies = torch.sum(atomic_energies,dim=...)
#         total_energies = torch.stack(list(atomic_energies.values())).sum(dim=0)
#         return total_energies


# class ANI_sub(nn.Module):
#     def __init__(self, architecture):
#         super().__init__()
#         self.fc1 = nn.Linear(architecture['input_size'], architecture['hidden1'])
#         self.act1 = nn.Tanh()  
#         self.fc2 = nn.Linear(architecture['hidden1'], architecture['hidden2'])
#         self.act2 = nn.Tanh() 
#         self.fc3 = nn.Linear(architecture['hidden2'], architecture['hidden3'])
#         self.act3 = nn.Tanh() 
#         self.fc4 = nn.Linear(architecture['hidden3'], architecture['output_size'])

        

#     def forward(self, aev):
#         x = self.fc1(aev)
#         x = self.act1(x)
#         x = self.fc2(x)
#         x = self.act2(x)
#         x = self.fc3(x)
#         x = self.act3(x)
#         x = self.fc4(x)
#         atomic_energy = x.squeeze(dim=1)
#         return atomic_energy
      


# # def create_network(input_dim):
# #     return nn.Sequential(
# #         nn.Linear(input_dim, 128),
# #         nn.SiLU(),
# #         nn.Linear(128, 128),
# #         nn.SiLU(),
# #         nn.Linear(128, 64),
# #         nn.SiLU(),
# #         nn.Linear(64, 1)
# #     )

# # element_networks = nn.ModuleDict({
# #     'H': create_network(aev_dim),
# #     'C': create_network(aev_dim),
# #     'N': create_network(aev_dim),
# #     'O': create_network(aev_dim)
# # })


In [189]:
elements

array([1, 0, 0, 0, 0])

In [None]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
Caevs = [] 
Haevs = [] 
Naevs = [] 
Oaevs = [] 

# for i in range(len(S)): 
#     aev = calc_aev(elements, X[i], i)
#     # scaled_aev = scaler.fit_transform(aev) 
#     aevs.append(aev)

aevs = []
for coords in X:
    for atom_index in range(len(elements)):
        aev = calc_aev(elements, coords, atom_index)
        aevs.append(aev)

all_aevs = np.array(aevs)




64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320
64 320

In [956]:
from torch.optim import SGD, Adam
import torch.nn.functional as F
import random
from tqdm import tqdm
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import math 

def create_chunks(complete_list, chunk_size=None, num_chunks=None):
    '''
    Cut a list into multiple chunks, each having chunk_size (the last chunk might be less than chunk_size)
    or having a total of num_chunk chunks
    '''
    chunks = []
    if num_chunks is None:
        num_chunks = math.ceil(len(complete_list) / chunk_size)
    elif chunk_size is None:
        chunk_size = math.ceil(len(complete_list) / num_chunks)
    for i in range(num_chunks):
        chunks.append(complete_list[i * chunk_size: (i + 1) * chunk_size])
    return chunks

class Trainer():
    def __init__(self, models, optimizer_type, learning_rates, epochs, batch_size,\
                 input_transform=lambda x: x, max_tries=10, lr_decay=0.5, lr_min=1e-5):
        """ The class for training the model
        models: list of models, one for each atom in the molecule 
        optimizer_type: 'adam' or 'sgd'
        learning_rates: list of learning rates for each network 
        epochs: lsit of epochs 
        batch_size: int
        input_transform: func
        max_tries: int, # of epochs with no improvement, then reduce lr 
        lr_decay: float, factor to reduce lr by 
        lr_min: float, minimum learning rate threshold
        """
        # self.model = model
        self.models = models 
        self.optimizer_type = optimizer_type
        self.learning_rates = learning_rates
        self.optimizers = []
        for model, lr in zip(self.models, self.learning_rates): 
            if self.optimizer_type == "sgd":
                optimizer = SGD(model.parameters(), lr, momentum=0.9)
            elif self.optimizer_type == "adam":
                optimizer = Adam(model.parameters(), lr, betas=(0.9, 0.999))
            self.optimizers.append(optimizer)
        
        self.epochs = epochs
        self.batch_size = batch_size
        self.input_transform = input_transform
        self.max_tries = max_tries 
        self.lr_decay = lr_decay
        self.lr_min = lr_min 
    
        
    def input_transform(self, inputs): 
        return torch.tensor(inputs, dtype=torch.float)

    
    @staticmethod
    def rmse(predictions, targets):
        """To calculate accuracy"""
        return np.sqrt(((predictions - targets) ** 2).mean())
    
    @staticmethod
    def sum_outputs(outputs):
        return torch.stack(outputs).sum(dim=0)

    @timing
    def train(self, aevs, outputs, val_aevs, val_outputs, learning_rates, early_stop=False, l2=False, silent=False):
        """ Train the models with the specified inputs/outputs. 
        aevs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        val_nputs: np.array, The shape of input_transform(val_input) should be (ndata,nfeatures)
        val_outputs: np.array shape (ndata,)
        early_stop: bool
        l2: bool
        silent: bool. Controls whether or not to print the train and val error during training
        @return
        a dictionary of arrays with train and val losses and accuracies
        """
        
        losses = []
        # accuracies = []
        val_losses = []
        # val_accuracies = []
        lowest_val_loss = np.inf 
        
        
        # loop through the different models and their parameters, and their set of inputs 
        for model, optimizer, X_train, y_train, X_val, y_val, learning_rate, epoch in zip\
                                (self.models, self.optimizers, aevs, outputs, val_aevs, val_outputs, learning_rates, self.epochs):
            inputs = self.input_transform(X_train)
            inputs = torch.tensor(X_train, dtype=torch.float)
            outputs = torch.tensor(y_train, dtype=torch.float32) 
            val_inputs = torch.tensor(X_val, dtype=torch.float)
            val_outputs = torch.tensor(y_val, dtype=torch.float32) 
            no_improvement = 0 


            for n_epoch in tqdm(range(epoch), leave=False):
                model.train()
                batch_indices = list(range(len(X_train)))
                random.shuffle(batch_indices)
                batch_indices = list(range(inputs.shape[0]))
                batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
                epoch_loss = 0
                epoch_acc = 0
                rmse = 0 
                

                for batch in batch_indices:
                    batch_importance = len(batch) / len(outputs)  # ratio of this batch
                    batch_input = torch.tensor(X_train[batch], dtype=torch.float)
                    batch_input = batch_input.view(-1, X_train.shape[1])  # reshape input to (batch_size, input_dim)
                    batch_output = torch.tensor(y_train[batch], dtype=torch.float32)
           
                 
                    batch_predictions = model(batch_input)
#                     loss = nn.MSELoss()(batch_predictions, batch_output)

#                     if l2:
#                         l2_norm = sum([p.pow(2.0).sum() for p in model.parameters()])
#                         loss = loss + 1e-5 * l2_norm

                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    epoch_loss += loss.detach().cpu().item() * batch_importance

            # tot_predictions = torch.sum(model(torch.tensor(X_train, dtype=torch.float)).detach(), dim=1)
            # tot_rmse = self.rmse(tot_predictions, torch.tensor(y_train, dtype=torch.float32))
            final_output = model(torch.tensor(X_train, dtype=torch.float)).detach()
            tot_predictions = torch.sum(final_output, dim=0)

            val_loss = self.evaluate(model, val_aevs, val_outputs, print_acc=False)

            if val_loss < lowest_val_loss:
                lowest_val_loss = val_loss
                weights = model.state_dict()
            else: 
                no_improvement += 1 

            if no_improvement >= self.max_tries:
                # reduce learning rate and reset count (new try) 
                learning_rate *= self.lr_decay 
                learning_rate = max(learning_rate, self.lr_min)
                optimizer.param_groups[0]['lr'] = learning_rate
                no_improvement = 0

            if n_epoch % 10 == 0 and not silent: 
                print("Model: %d - Epoch %d/%d - Loss: %.3f" % (self.models.index(model) + 1, n_epoch + 1, epoch, epoch_loss))
                print("              Val_loss: %.3f" % val_loss)
            losses.append(epoch_loss)
            # accuracies.append(epoch_acc)
            val_losses.append(val_loss)
            # val_accuracies.append(val_acc)

        if early_stop:
            model.load_state_dict(weights)    

        return {"losses": losses, "val_losses": val_losses}
    
    def evaluate(self, model, inputs, outputs, print_acc=True):
        """ evaluate model on provided input and output
        inputs: np.array, The shape of input_transform(input) should be (ndata,nfeatures)
        outputs: np.array shape (ndata,)
        print_acc: bool
        
        @return
        losses: float
        rmse: float (rmse) 
        """
        inputs = self.input_transform(inputs)
        inputs = torch.tensor(inputs, dtype=torch.float)  # Convert inputs to a tensor
        outputs = torch.tensor(outputs, dtype=torch.float32)
        model.eval()
        batch_indices = list(range(inputs.shape[0]))
        batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
        rmse = 0
        losses = 0
        for batch in batch_indices:
            batch_importance = len(batch) / len(outputs)
            batch_input = inputs[batch]
            batch_output = outputs[batch]
            with torch.no_grad():
                batch_predictions = model(batch_input)
                tot_batch_predictions = torch.sum(batch_predictions, dim=0)
                tot_batch_predictions = tot_batch_predictions.view(-1) 
                batch_output = batch_output.view(-1) 
                loss = nn.MSELoss()(tot_batch_predictions, batch_output)
            batch_rmse = self.rmse(batch_predictions, batch_output)
            losses += loss.detach().cpu().item() * batch_importance
            # rmse += batch_rmse * batch_importance

        if print_acc:
            print("rmse: %.3f" % losses)
        return losses

    

# Training

In [957]:
from torch.optim import SGD, Adam
import torch.nn.functional as F
import random
from tqdm import tqdm
import math
import torch
import torch.nn as nn
architecture = {"input_size": 384, "hidden1": 128, "hidden2": 128, "hidden3": 64, "output_size": 1} 

class ANI_single(nn.Module):
    def __init__(self, input_dim):
        super(ANI_single, self).__init__()
        self.fc1 = nn.Linear(input_dim, architecture['hidden1'])
        self.act1 = nn.Tanh()  
        self.fc2 = nn.Linear(architecture['hidden1'], architecture['hidden2'])
        self.act2 = nn.Tanh() 
        self.fc3 = nn.Linear(architecture['hidden2'], architecture['hidden3'])
        self.act3 = nn.Tanh() 
        self.fc4 = nn.Linear(architecture['hidden3'], 1)

    def forward(self, aev):
        x = self.fc1(aev)
 
        x = self.act1(x)
        x = self.fc2(x)
        x = self.act2(x)
        x = self.fc3(x)
        x = self.act3(x)
        x = self.fc4(x)
        atomic_energy = x.squeeze(dim=1)
        return atomic_energy
 


In [958]:
len(aevs)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
all_aevs = scaler.fit_transform(all_aevs)

y=E
scaler_y = StandardScaler()
# y = scaler_y.fit_transform(y.reshape(-1, 1)).flatten()
y = scaler_y.fit_transform(y.reshape(-1,1)).flatten()

C = all_aevs[0:5400]
H1 = all_aevs[5400:10800]
H2 = all_aevs[10800:16200]
H3 = all_aevs[16200:21600]
H4 = all_aevs[21600:27000]
aevs_norm = np.array([C, H1, H2, H3, H4])
molecule_aevs = {'C': C, 'H1': H1, 'H2': H2, 'H3': H3, 'H4': H4} 

In [959]:
from sklearn.model_selection import train_test_split

def split_train_test(aevs, y, test_size=0.2, random_state=42):
    aevs_train, aevs_test, y_train, y_test = train_test_split(aevs, y, test_size=test_size, random_state=random_state)
    X_train, X_val, y_train, y_val = split_train_val(aevs_train, y_train)
    return X_train, X_val, y_train, y_val, aevs_test, y_test 

def split_train_val(aevs_train, y_train, test_size=0.2, random_state=42):
    X_train, X_val, y_train, y_val = train_test_split(aevs_train, y_train, test_size=test_size, random_state=random_state)
    return X_train, X_val, y_train, y_val

def make_models(molecule_aevs): 
    models = [] 
    for aev in molecule_aevs.values(): 
        model = ANI_single(aev.shape[1])
        models.append(model) 
    return models 

def make_splits(aevs, y): 
    X_train_set, X_val_set, y_train_set, y_val_set = [], [], [], []
    for i in range(len(aevs)): 
        aevs_train, aevs_test, y_train, y_test = train_test_split(aevs[i], y, test_size=0.2, random_state=42)
        X_train, X_val, y_train, y_val = train_test_split(aevs_train, y_train, test_size=0.2, random_state=42)
        X_train_set.append(X_train) 
        X_val_set.append(X_val) 
        y_train_set.append(y_train) 
        y_val_set.append(y_val) 
    return X_train_set, X_val_set, y_train_set, y_val_set

In [966]:
aev_dim = len(aev)
optimizer_type = 'adam'
learning_rate = 0.001 
epoch = 100
batch_size= 128 
models = make_models(molecule_aevs) 
learning_rates = [0.001, 0.001, 0.001, 0.001, 0.001]
epochs = [100,100,100,100,100]
X_train_set, X_val_set, y_train_set, y_val_set = make_splits(aevs_norm, y)

trainer = Trainer(models, optimizer_type, learning_rates, epochs, batch_size, input_transform=lambda x: x,)
# results = trainer.train(X_train_set, y_train_set, X_val_set, y_val_set, learning_rates)


5

In [970]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
X_train_set = torch.tensor(X_train_set, dtype=torch.float32)
y_train_set = torch.tensor(y_train_set, dtype=torch.float32)
X_val_set = torch.tensor(X_val_set, dtype=torch.float32)
y_val_set = torch.tensor(y_val_set, dtype=torch.float32)

batch_size= 128 
train_dataset = TensorDataset(X_train_set[0], y_train_set[0])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = TensorDataset(X_val_set[0], y_val_set[0])
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
model1 = ANI_single(384) 



def simple_train(model, train_dataset, train_loader, val_dataset, val_loader, epochs, learning_rate): 
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        model.train()
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for val_inputs, val_targets in val_loader:
                val_outputs = model(val_inputs)
                val_loss += criterion(val_outputs, val_targets).item()

        val_loss /= len(val_loader)
type(X_train_set[0])

  X_train_set = torch.tensor(X_train_set, dtype=torch.float32)
  y_train_set = torch.tensor(y_train_set, dtype=torch.float32)
  X_val_set = torch.tensor(X_val_set, dtype=torch.float32)
  y_val_set = torch.tensor(y_val_set, dtype=torch.float32)


torch.Tensor

In [None]:
# final_predictions = torch.zeros_like(predictions_ex)  # Initialize with zeros
# for model in models:
#     model.eval()  
#     model_predictions = model(X_test)  

#     final_predictions += model_predictions

In [None]:

from torch.optim import SGD, Adam
import torch.nn.functional as F
import random
from tqdm import tqdm
import math
import torch
import torch.nn as nn

class ANIv2(nn.Module):
    def __init__(self, aev_dim, sub_nets):
        super().__init__()
        self.sub_nets = nn.ModuleDict(sub_nets)

    def forward(self, aevs, atom_types):
        atomic_energies = {}
        
        
        for atom_type, net in self.sub_nets.items():
            atom = (atom_types == int(atom_type)).unsqueeze(-1)
            atom_aev = aevs * atom
            atomic_energies[atom_type] = net(atom_aev)

        total_energies = torch.cat(list(atomic_energies.values()), dim=0).sum(dim=0)

        return total_energies
    
H_network = torch.nn.Sequential(
    torch.nn.Linear(aev_dim, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 1)
)

C_network = torch.nn.Sequential(
    torch.nn.Linear(aev_dim, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 1)
)

N_network = torch.nn.Sequential(
    torch.nn.Linear(aev_dim, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 1)
)

O_network = torch.nn.Sequential(
    torch.nn.Linear(aev_dim, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 128),
    torch.nn.Tanh(),
    torch.nn.Linear(128, 64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 1)
)
sub_nets = {'1': H_network, '0': C_network, '2': N_network, '3': O_network}


architecture = {"input_size": 1, "hidden1": 128, "hidden2": 128, "hidden3": 64, "output_size": 64} 



#     def forward(self, aev):
#         x = self.fc1(aev)
#         x = self.act1(x)
#         x = self.fc2(x)
#         x = self.act2(x)
#         x = self.fc3(x)
#         x = self.act3(x)
#         x = self.fc4(x)
#         atomic_energy = x.squeeze(dim=1)
#         return atomic_energy
      


In [None]:


def create_chunks(complete_list, chunk_size=None, num_chunks=None):
    '''
    Cut a list into multiple chunks, each having chunk_size (the last chunk might be less than chunk_size)
    or having a total of num_chunk chunks
    '''
    chunks = []
    if num_chunks is None:
        num_chunks = math.ceil(len(complete_list) / chunk_size)
    elif chunk_size is None:
        chunk_size = math.ceil(len(complete_list) / num_chunks)
    for i in range(num_chunks):
        chunks.append(complete_list[i * chunk_size: (i + 1) * chunk_size])
    return chunks

class Trainer2():
    def __init__(self, atom_networks, optimizer_type, learning_rate, epoch, batch_size,
                 input_transform=lambda aevs, atom_types: {atom_type: aev for atom_type, aev in zip(atom_types, aevs)}, max_tries=10, lr_decay=0.5, lr_min=1e-5):
        """ The class for training the model
        atom_networks: dict
            keys are atom types, and values are corresponding neural networks.
        optimizer_type: 'adam' or 'sgd'
        learning_rate: float
        epoch: int
        batch_size: int
        input_transform: func
        max_tries: int, # of epochs with no improvement, then reduce lr 
        lr_decay: float, factor to reduce lr by 
        lr_min: float, minimum learning rate threshold
        """
        self.atom_networks = atom_networks
        self.optimizer_type = optimizer_type
        self.learning_rate = learning_rate
        self.epoch = epoch
        self.batch_size = batch_size
        self.input_transform = input_transform
        self.max_tries = max_tries 
        self.lr_decay = lr_decay
        self.lr_min = lr_min 
        self.optimizer = {f'network_{i}': self.initialize_optimizer(atom_network) for i,\
                          atom_network in enumerate(self.atom_networks)}

    def initialize_optimizer(self,atom_network):
        if self.optimizer_type == "sgd":
            return SGD(atom_network.parameters(), self.learning_rate, momentum=0.9)
        elif self.optimizer_type == "adam":
            return Adam(atom_network.parameters(), self.learning_rate, betas=(0.9, 0.999))

    def input_transform(self, aevs, atom_types):
        transformed_inputs = {}
        for atom_type, aev in zip(atom_types, aevs):
            aev_tens = torch.tensor(aev, dtype=torch.float)
            transformed_inputs[atom_type] = aev_tens
        return transformed_inputs

    def train(self, aevs, atom_types, outputs, val_aevs, val_atom_types, val_outputs,\
              early_stop=False, l2=False, silent=False):
        """ Train the model with specified arguments
        inputs: 
        outputs: np.array shape (ndata,)
        val_inputs: 
        val_outputs: np.array shape (ndata,)
        early_stop: bool
        l2: bool
        silent: bool. Controls whether or not to print the train and val error during training
        @return
        a dictionary of arrays with train and val losses and accuracies
        """
        inputs = self.input_transform(aevs, atom_types) 
        inputs = {atom_type: torch.tensor(atom_inputs, dtype=torch.float) for atom_type,\
                  atom_inputs in inputs.items()}
        outputs = torch.tensor(outputs, dtype=torch.float32)
        val_inputs = self.input_transform(aevs, atom_types) 
        val_inputs = {atom_type: torch.tensor(atom_inputs, dtype=torch.float) for atom_type,\
                      atom_inputs in val_inputs.items()}
        val_outputs = torch.tensor(val_outputs, dtype=torch.float32)

        losses = {atom_type: [] for atom_type in self.atom_networks}
        accuracies = {atom_type: [] for atom_type in self.atom_networks}
        val_losses = {atom_type: [] for atom_type in self.atom_networks}
        val_rmse = {atom_type: [] for atom_type in self.atom_networks}
        weights = {f'network_{i}': atom_network.state_dict() for i, atom_network in enumerate(self.atom_networks)}
        lowest_val_loss = {atom_type: np.inf for atom_type in self.atom_networks}
        no_improvement = {atom_type: 0 for atom_type in self.atom_networks}

        for n_epoch in tqdm(range(self.epoch), leave=False):
            for atom_type, atom_network in enumerate(self.atom_networks):
                atom_network.train()
                optimizer = self.optimizer[atom_type]
                learning_rate = self.learning_rates[atom_type]
                batch_indices = list(range(inputs[atom_type].shape[0]))
                random.shuffle(batch_indices)
                batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
                epoch_loss = 0
                epoch_acc = 0
                for batch in batch_indices:
                    batch_importance = len(batch) / len(outputs)  # ratio of this batch
                    batch_input = inputs[atom_type][batch]
                    batch_output = outputs[batch]
                    # make prediction and compute loss with loss function of your choice on this batch
                    batch_predictions = atom_network(batch_input)
                    loss = nn.MSELoss()(batch_predictions, batch_output)

                    if l2:
                        l2_norm = sum([p.pow(2.0).sum() for p in atom_network.parameters()])
                        loss = loss + 1e-5 * l2_norm
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    epoch_loss += loss.detach().cpu().item() * batch_importance

                val_losses, val_rmse = self.evaluate(val_aevs, val_outputs, atom_types=val_atom_types, print_acc=False)

                if val_loss < lowest_val_loss[f'network_{i}']:
                    lowest_val_loss[f'network_{i}'] = val_loss
                    weights[f'network_{i}'] = atom_network.state_dict()
                else:
                    no_improvement[atom_type] += 1
                if no_improvement[atom_type] >= self.max_tries:
                    # reduce learning rate and reset count (new try)
                    self.learning_rates[atom_type] *= self.lr_decay
                    learning_rate = max(self.learning_rates[atom_type], self.lr_min)
                    self.optimizer[atom_type].param_groups[0]['lr'] = learning_rate
                    no_improvement[atom_type] = 0
                if n_epoch % 10 == 0 and not silent:
                    print(f"Epoch {n_epoch + 1}/{self.epoch} - Atom Type: {atom_type} - Loss: {epoch_loss:.3f} - Acc: {epoch_acc:.3f}")
                    print(f"              Val_loss: {val_loss:.3f} - Val_acc: {val_acc:.3f}")

                losses[atom_type].append(epoch_loss)
                accuracies[atom_type].append(epoch_acc)
                val_losses[atom_type].append(val_loss)
                val_accuracies[atom_type].append(val_acc)

        if early_stop:
            for atom_type, atom_network in self.atom_networks.items():
                atom_network.load_state_dict(weights[atom_type])

        return {"losses": losses, "accuracies": accuracies, "val_losses": val_losses, "val_accuracies": val_accuracies}

    def evaluate(self, inputs, outputs, atom_types=None, print_acc=True):
        """ Evaluate model on provided input and output
        inputs: dict
            keys are atom types, and values are input data for each atom type
        outputs: np.array shape (ndata,)
        print_acc: bool

        @return
        losses: float
        rmse: float (rmse)
        """
        inputs = self.input_transform(inputs, atom_types) 
        inputs = {atom_type: torch.tensor(atom_inputs, dtype=torch.float) for atom_type, atom_inputs in inputs.items()}
        outputs = torch.tensor(outputs, dtype=torch.float32)
        atom_types_to_evaluate = atom_types if atom_types else self.atom_networks.keys()

        for atom_type in atom_types_to_evaluate:
            self.model.eval()
            batch_indices = list(range(inputs[next(iter(inputs))].shape[0]))
            batch_indices = create_chunks(batch_indices, chunk_size=self.batch_size)
            rmse = 0
            losses = 0
            for batch in batch_indices:
                batch_importance = len(batch) / len(outputs)
                batch_input = {atom_type: atom_inputs[batch] for atom_type, atom_inputs in inputs.items()}
                batch_output = outputs[batch]
                with torch.no_grad():
                    batch_predictions = {atom_type: atom_network(batch_input[atom_type]) for atom_type, atom_network in self.atom_networks.items()}
                    loss = sum([nn.MSELoss()(batch_predictions[atom_type], batch_output) for atom_type in self.atom_networks])
                batch_rmse = sum([self.rmse(batch_predictions[atom_type], batch_output) for atom_type in self.atom_networks])
                losses += loss.detach().cpu().item() * batch_importance
                rmse += batch_rmse * batch_importance

            if print_acc:
                print("rmse: %.3f" % rmse)

        return losses, rmse

In [503]:

aev_dim = len(aevs[0])
n_aevs  = len(all_aevs) 
methane = ANIv2(aev_dim, sub_nets)


In [504]:
import tensorflow as tf 
atom_types = torch.tensor(elements)
# array([1, 0, 0, 0, 0])
C_atoms = np.ones(5400) 
Hs = np.zeros(5400)
H_atoms = np.zeros(21600) 
atom_types = np.concatenate([C_atoms, H_atoms])
# len(all_aevs), len(atom_types)
# Xtrain, Xtest, atom_train, atom_test, y_train, y_test = train_test_split(all_aevs, atom_types, y, test_size=0.2, random_state=42)
# aevs_train, aevs_val, atoms_train, atoms_val,outputs_train, outputs_val = train_test_split(Xtrain, atom_trainy_train, test_size=0.2, random_state=42)

In [505]:
import torch
import torch.nn as nn
from torch.optim import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
Ctrain, Ctest, atoms_train, atoms_test, y_trainC, y_testC = train_test_split(C, C_atoms, y, test_size=0.2, random_state=42)
H1train, H1test, H1atoms_train, H1atoms_test, y_trainH1, y_testH1 = train_test_split(H1, Hs, y, test_size=0.2, random_state=42)
H2train, H2test, H2atoms_train, H2atoms_test, y_trainH2, y_testH2 = train_test_split(H2, Hs, y, test_size=0.2, random_state=42)
H3train, H3test, H3atoms_train, H3atoms_test, y_trainH3, y_testH3 = train_test_split(H3, Hs, y, test_size=0.2, random_state=42)
H4train, H4test, H4atoms_train, H4atoms_test, y_trainH4, y_testH4 = train_test_split(H4, Hs, y, test_size=0.2, random_state=42)



In [506]:
aevs_train_C, aevs_val_C, atoms_trainC, atoms_valC, outputs_train_C, outputs_val_C = train_test_split(Ctrain, atoms_train,\
                                                                                    y_trainC, test_size=0.2, random_state=42)
aevs_train_H1, aevs_val_H1, atoms_trainH1, atoms_valH1, outputs_train_H1, outputs_val_H1 = train_test_split(H1train,\
                                                                    H1atoms_train, y_trainH1, test_size=0.2, random_state=42)
aevs_train_H2, aevs_val_H2, atoms_trainH2, atoms_valH2, outputs_train_H2, outputs_val_H2 = train_test_split(H2train,\
                                                                    H2atoms_train, y_trainH2, test_size=0.2, random_state=42)
aevs_train_H3, aevs_val_H3, atoms_trainH3, atoms_valH3, outputs_train_H3, outputs_val_H3 = train_test_split(H3train,\
                                                                    H3atoms_train, y_trainH3, test_size=0.2, random_state=42)
aevs_train_H4, aevs_val_H4, atoms_trainH4, atoms_valH4, outputs_train_H4, outputs_val_H4 = train_test_split(H4train,\
                                                                    H4atoms_train, y_trainH4, test_size=0.2, random_state=42)
# trainer_C = Trainer2(C_network, optimizer_type, learning_rate, epoch, batch_size)
# trainer_H1 = Trainer2(H_network, optimizer_type, learning_rate, epoch, batch_size)
# trainer_H2 = Trainer2(H_network, optimizer_type, learning_rate, epoch, batch_size)
# trainer_H3 = Trainer2(H_network, optimizer_type, learning_rate, epoch, batch_size)
# trainer_H4 = Trainer2(H_network, optimizer_type, learning_rate, epoch, batch_size)


In [509]:
# training without trainer class 
aevs_traintens = torch.tensor(aevs_train_C, dtype=torch.float) 
atoms_traintens = torch.tensor(atoms_trainC, dtype=torch.float)
outputs_traintens= torch.tensor(outputs_train_C, dtype=torch.float)
aevs_valtens = torch.tensor(aevs_val_C, dtype=torch.float)
atoms_valtens = torch.tensor(atoms_valC, dtype=torch.float)
outputs_valtens = torch.tensor(outputs_val_C, dtype=torch.float)

model = ANIv2(aev_dim=384, sub_nets=sub_nets) 
loss_fn = nn.MSELoss() 
optimizer = optimizer = Adam(model.parameters(), lr=0.001)
for epoch in range(15): 
    model.train() 
    optimizer.zero_grad() 
    predictions = model(aevs_traintens, atoms_traintens)
    loss = loss_fn(predictions, outputs_traintens) 
    loss.backward()
    optimizer.step() 
    model.eval() 
    with torch.no_grad(): 
        val_predictions = model(aevs_valtens, atoms_valtens)
        val_loss = loss_fn(val_predictions, outputs_valtens) 

  return F.mse_loss(input, target, reduction=self.reduction)


In [None]:
# results_C = trainer_C.train(aevs_train_C, atoms_trainC, outputs_train_C, aevs_val_C,atoms_valC, outputs_val_C)
# results_H1 = trainer_H1.train(aevs_train_H1, atoms_trainH1, outputs_train_H1, aevs_val_H1, atoms_valH1, outputs_val_H1)
# results_H2 = trainer_H2.train(aevs_train_H2, atoms_trainH2, outputs_train_H2, aevs_val_H2, atoms_valH2, outputs_val_H2)
# results_H3 = trainer_H3.train(aevs_train_H3, atoms_trainH3, outputs_train_H3, aevs_val_H3, atoms_valH3, outputs_val_H3)
# results_H4 = trainer_H4.train(aevs_train_H4, atoms_trainH4, outputs_train_H4, aevs_val_H4, atoms_valH4, outputs_val_H4)
# aevs, atom_types, outputs, val_aevs, val_atom_types, val_outputs,\
#               early_stop=False, l2=False, silent=False):