In [1]:
%cd ../

e:\python\atomization-energy-regression


In [4]:
from src.data_processing.molecule import get_molecule_name 
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_absolute_error

In [5]:
# Load the QM7 .mat file into a dict of numpy arrays.
# A forward slash is used as the separator: it works on Windows and POSIX
# alike, whereas 'dataset\qm7.mat' depended on '\q' being an unrecognised
# (and now warned-about) escape sequence and only resolved on Windows.
data = scipy.io.loadmat('dataset/qm7.mat')
X = data['X'] # Coulomb matrices -> 7165 x 23 x 23
T = data['T'].T.squeeze() # atomization energies Y -> 7165 x 1 (size-1 axes dropped by squeeze)
Z = data['Z'] # atomic charge -> 7165 x 23
R = data['R'] # cartesian coordinates -> 7165 x 23 x 3
data_train, data_test = {}, {}
# Project helper; its return value is not documented in this notebook.
molecule_name = get_molecule_name(data)


In [6]:
def feature_engineer(data):
    """Build one flat feature vector per molecule and scale the targets.

    Parameters
    ----------
    data : mapping
        Must provide the entries 'X' (Coulomb matrices), 'T' (atomization
        energies), 'Z' (atomic charges) and 'R' (cartesian coordinates),
        each indexed by molecule along the first axis.

    Returns
    -------
    features_vector : list of 1-D numpy arrays
        For every molecule, the concatenation of: eigenvalues of its Coulomb
        matrix, degree centrality of the graph built from that matrix, the
        row/column-sorted Coulomb matrix (flattened), the atomic charges, and
        the per-axis mean and standard deviation of the coordinates.
    y_scaled : numpy array
        Targets divided by ``y_scaling_factor``.
    y_scaling_factor : scalar
        Largest absolute target value; multiply predictions by it to undo
        the scaling.
    """
    coulomb_mats = data['X']
    energies = data['T'].T.squeeze()
    charges = data['Z']
    coords = data['R']

    # Scale targets by their largest magnitude.
    y = np.transpose(energies)
    y_scaling_factor = np.max(np.absolute(y))
    y_scaled = y / y_scaling_factor

    features_vector = []
    for x, z, r in zip(coulomb_mats, charges, coords):
        # Order rows by their L2 norm, then sort the entries inside each row.
        # Fancy indexing returns a copy, so the in-place sort leaves `x` intact.
        sorted_idx = np.argsort(np.linalg.norm(x, axis=1))
        order_x = x[sorted_idx, :]
        order_x.sort(axis=1)

        # nx.from_numpy_matrix was removed in NetworkX 3.0; from_numpy_array is
        # the supported equivalent for plain ndarrays.
        graph = nx.from_numpy_array(x)
        centrality = list(nx.degree_centrality(graph).values())

        # NOTE(review): eigvals returns the eigenvalues in no particular order;
        # if the matrices are symmetric, np.linalg.eigvalsh would give a sorted,
        # guaranteed-real spectrum. Left unchanged to keep features identical.
        features_vector.append(np.concatenate((
            np.linalg.eigvals(x),
            centrality,
            order_x.flatten(),
            z,
            r.mean(axis=0),
            r.std(axis=0),
        )))
    return features_vector, y_scaled, y_scaling_factor

# Run the feature pipeline once and materialise its outputs as arrays.
# `min_max_scaler` receives the third return value of feature_engineer,
# i.e. the scalar the targets were divided by (not a scaler object).
features_list, targets_scaled, min_max_scaler = feature_engineer(data)
X = np.asarray(features_list)
Y = np.asarray(targets_scaled)

In [71]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR
from src.solver import make_optimizer, make_scheduler
# import List
class MLP(nn.Module):
    """Fully connected regressor that emits one scalar per input row.

    For ``hidden_sizes = [h0, h1, ..., hk]`` the layers are, in order:

        fc.fc0            Linear(input_size, h0)
        fc.fc{i}, relu{i} Linear(h{i-1}, h{i}) followed by ReLU, i = 1..k
        sigmoid           Sigmoid
        fc3               Linear(hk, 1)

    Parameters
    ----------
    input_size : int
        Number of features per sample.
    hidden_sizes : sequence of int
        Widths of the hidden layers; must contain at least one entry.

    NOTE(review): ``fc0`` has no activation after it, so when more than one
    hidden size is given ``fc0`` and ``fc1`` compose into a single affine map.
    In that case the Sigmoid also only ever sees ReLU outputs (>= 0) and
    therefore emits values in [0.5, 1). Both are left as-is so that model
    outputs do not change; confirm whether this is intended.
    The output head keeps the attribute name ``fc3`` (distinct from the
    ``fc.fc3`` entry inside the Sequential) so parameter names are unchanged.
    """

    def __init__(self, input_size, hidden_sizes):
        super().__init__()
        self.fc = nn.Sequential()
        self.fc.add_module("fc0", nn.Linear(input_size, hidden_sizes[0]))
        # Pair consecutive widths to build the remaining hidden layers.
        layer_dims = zip(hidden_sizes[:-1], hidden_sizes[1:])
        for idx, (in_size, out_size) in enumerate(layer_dims, start=1):
            self.fc.add_module(f"fc{idx}", nn.Linear(in_size, out_size))
            self.fc.add_module(f"relu{idx}", nn.ReLU())

        self.sigmoid = nn.Sigmoid()
        self.fc3 = nn.Linear(hidden_sizes[-1], 1)

    def forward(self, x):
        """Map a (batch, input_size) tensor to a (batch, 1) prediction."""
        hidden = self.fc(x)
        squashed = self.sigmoid(hidden)
        return self.fc3(squashed)

In [None]:
# Solver settings passed to the src.solver factory helpers in the training cell.
cfg = dict(
    OPTIMIZER_NAME='SGD',
    MOMENTUM=0.9,
    BASE_LR=0.0001,
    WEIGHT_DECAY=1e-4,
    BIAS_LR_FACTOR=2,
    WEIGHT_DECAY_BIAS=1e-4,
    WARMUP_METHOD='CosineAnnealing',
)

#   IMS_PER_BATCH: 64
#   STEPS: [2, 4]
#   GAMMA: 0.
#   WARMUP_FACTOR: 0.01
#   WARMUP_EPOCHS: 5
#   LARGE_FC_LR: False
#   CHECKPOINT_PERIOD: 70
#   LOG_PERIOD: 200
#   EVAL_PERIOD: 5
#   WEIGHT_DECAY:  1e-4
#   WEIGHT_DECAY_BIAS: 1e-4
#   BIAS_LR_FACTOR: 2
#   FP16_ENABLED: True
#   WARMUP_METHOD: CosineAnnealing
#   SEED: 507
#   IMS_PER_BATCH: 96
#   EVAL_PERIOD: 1

In [None]:
from torch.utils.data import DataLoader, TensorDataset

# --- Hyper-parameters -------------------------------------------------------
NUM_FEATURES = X.shape[1]
print(NUM_FEATURES)
learning_rate = 0.001  # only referenced by the commented-out AdamW alternative
num_epochs = 500
batch_size = 100

# --- Model, loss, optimizer, LR schedule --------------------------------------
model = MLP(NUM_FEATURES, [512,256,64,32])
criterion = nn.MSELoss()
# optimizer = optim.AdamW(model.parameters(), lr=learning_rate,)
# scheduler = StepLR(optimizer, step_size=25, gamma=0.1)
# The optimizer must come from make_optimizer; the previous code called
# make_scheduler(cfg, model) here, so `optimizer` was never an optimizer.
optimizer = make_optimizer(cfg, model)
scheduler = make_scheduler(cfg, optimizer)

# --- Train / test split -------------------------------------------------------
# Row 0 of data['P'] supplies the indices held out for testing; everything
# else is used for training.
split = data['P'][0]
mask = np.zeros(Y.size, dtype=bool)
mask[split] = True

X_train = torch.from_numpy(X[~mask]).float()
y_train = torch.from_numpy(Y[~mask]).float().view(-1, 1)
X_test = torch.from_numpy(X[mask]).float()
y_test = torch.from_numpy(Y[mask]).float().view(-1, 1)
print(X_train.shape)

train_loader = DataLoader(dataset=TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

# Summed absolute error, so the epoch MAE can be weighted by sample count
# (the last batch is usually smaller than the others).
abs_error_sum_fn = nn.L1Loss(reduction='sum')

# Train the model
check = {}
for epoch in range(num_epochs):
    model.train()
    for i, (feature, labels) in enumerate(train_loader):
        # Forward pass
        outputs = model(feature)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Validate: exact mean absolute error over the whole held-out set,
    # measured on the scaled targets.
    model.eval()
    abs_error_total = 0.0
    with torch.no_grad():
        for i, (feature, labels) in enumerate(test_loader):
            outputs = model(feature)
            abs_error_total += abs_error_sum_fn(outputs, labels).item()
    val_loss = abs_error_total / len(test_loader.dataset)
    print("Validation: ", val_loss)
    print(f'Validation: Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(test_loader)}], Loss: {val_loss:.4f}')

    # Advance the LR schedule once per epoch, after the optimizer updates.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']
    print("Learning rate: ", current_lr)