# Technical assignment for LAMAlab candidates

## Loading data and relevant imports

In [10]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
with open("data.json", "r") as f:
    data = json.load(f)
y = np.array(data["y"])
X = [np.array(point_cloud) for point_cloud in data["X"]]
assert len(X) == len(y)
assert isinstance(X,  list)


## Preliminary considerations

* The goal is to predict a scalar quantity (energy) of a 3D point cloud (xyz positions)
* This scalar quanitity is invariant to the translation and rotations of 3D point cloud, and it should be permutationally invariant with respect to the order of points inside the cloud.
* The points are not annotated: the only feature each point has is it's position
* We will assume the energy depends on the interaction between points, which depends on their relative positions, but we don't assume anything about the nature of the interactions (one-body, two-body, 3-body terms etc.)


### A general discussion how to incorporate invariances into the model
1. The simplest approach is to train a model that doesn't have any invariances encoded in its architecture. Instead, we augment the training data with rotated, translated and permutated examples from the original dataset and hope that the model will learn these invariances. However, this approach fails for multiple reasons:
    * This is highly inpracical, as there are infinitely many rotations and translations that can be used, and n! permutations among n points
    * The model will only be approximately invariant, even if we could augment the dataset with all possible transformations, because the training never converges to the true global minimum
    * Even if we could find the global minimum, most of the models can't be rotationally invariant due to their architecture

2. Transform the data based on some heuristic before passing it to the model. For instance, we could always translate the point cloud so that the geometric center is located at the origin of coordinate system. However, it's not clear how to do the similar thing for rotation. Some models are proposed to solve this task, either as self-supervised models or as a part of the larger architecture.
    * The advantage of this approach is the flexibility and expressivity of the model can be better if the model is not contrained by invariances.
    * Downsides are the artifacts model can learn based on the pretransforming heuristic and there is no clear way to achive true rotational invariance.

3. Transform the features into rotationally and translationally invariant features. In this example, we don't have any features other than the positions themselves. Therefore, instead of having the matrix of points positions, we can work with pairwase distance matrix, which is rotationally and translationaly invariant. However, we lose some infomation about angles using this approach, because mapping from the positions to pairwase distances is not unique. It might be benefitial to have more than one scalar value per atom pair instead. In this case, we can expand the distance inside a basis set. A popular choice is set of radial basis functions. i-th element of this encoding is defined as:
$$
e_i = exp(-(r-d_i)^2/ \alpha)
$$
where $\alpha$ is a hyperparameter and $d_i$ is a distance on a predefined grid.

4. To address permutational invarinace, it's enough to use some reduction along points axis, like a sum, mean, max. Some learnable permutationally invariant function can also work. This is the core idea behind graph neural networks, where permutational invariance is achieved using permutationally invariant aggregation function.

5. Another approach is to use equivariant layers in the model architecture, followed by reduction that makes it invariant. This is how the current SOTA models are deisgned. In short, we expand the point clouds in spherical harmonics basis, and then the layers are complex matematical functions that keep rotational and translational eqivariance. Output from those layers are vectors equivariant to rotation and translation. We can later reduce the vectors for each point into a single vector, making the whole model invariant to rotation, translation and permutation. This approach also works well for predicting energies, as it might be desirable to get the contribution of each point towards the results. A natural choice in case of energy is to use sum.

6. Finally, we can use already existing well-tested models like SchNet, PaiNN or similar models, without the need to implement any layer ourselves. These models are still based on the principles discussed above.

In [11]:
from lightning.pytorch import Trainer, LightningDataModule, LightningModule
from torch.utils.data import Dataset, DataLoader
import torch
class list_dataset(Dataset):
    def __init__(self, data, targets) -> None:
        super().__init__()
        self.data = [torch.tensor(example, dtype=torch.float32) for example in data]
        self.targets = torch.tensor(targets, dtype=torch.float32).view(-1,1)
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index) -> tuple:
        return (self.data[index], self.targets[index])

### Distance matrix

In [12]:
from scipy.spatial import distance_matrix
from scipy.spatial.transform import Rotation as R
def from_coords_to_distances(r: np.ndarray):
    return distance_matrix(r,r)

In [13]:
example = X[0]
rotation = R.from_euler("xyz",angles=[45,46,7],degrees=True)
rotated_example = rotation.apply(X[0])
translated_example = example + np.array([5,6,7])

In [14]:
# Check if the average pairwise distance is rotationally invariant
np.allclose(from_coords_to_distances(example), from_coords_to_distances(rotated_example))

True

In [15]:
# Check if the average pairwise distance is translationally invariant
np.allclose(from_coords_to_distances(example), from_coords_to_distances(translated_example))

True

However, we still didn't achieve permutational invariance. In order to do that, we can do a simple reduction over point axis:

In [16]:
rng = np.random.default_rng()
permuted_example = example[rng.permutation(example.shape[0]),:]
print(np.sum(from_coords_to_distances(example),axis=(0,1)))
print(np.sum(from_coords_to_distances(permuted_example),axis=(0,1)))

367.12981471716637
367.12981471716637


In order to achieve rotational invariance, we had to lower our input feature dimensions from NxNx3 to NxN. To get permutational invariance we had to lower the dimensionality to 1. However, trying to predict a scalar from a single feature is not going to work, so we have to move to GNN framework to get permutational invariance and to use more sofisticated ways to get rotationally invariant atomic (or edge) features. It's time to move to more advanced atomic descriptors that are rotationally and translationally invariant.
### SOAP descriptor

In [17]:
from dscribe.descriptors import SOAP
from ase import Atoms
species = ["H"]
r_cut = 6.0 # distance cutoff
n_max = 8 # number of radial basis functions
l_max = 6 # number of spherical harmonics

# Setting up the SOAP descriptor
soap = SOAP(
    species=species,
    periodic=False,
    r_cut=r_cut,
    n_max=n_max,
    l_max=l_max,
)
dim = soap.get_number_of_features()


In [18]:
# Convert our point cloud to ase atoms object
def cloud_to_ase(x: np.ndarray)-> Atoms:
    return Atoms(symbols=["H"] * x.shape[0],positions=x)

X_soap = soap.create([cloud_to_ase(x) for x in X ], n_jobs=7)

In [28]:
example = soap.create(cloud_to_ase(X[0]))
rotation = R.from_euler("xyz",angles=[45,46,7],degrees=True)
rotated_example = soap.create(cloud_to_ase(rotation.apply(X[0]))) 
translated_example = soap.create(cloud_to_ase(X[0] + np.array([5,6,7])))

In [31]:
# check for rotational invariance
np.allclose(example,rotated_example)

array([[ 1.70896060e-02,  8.40539773e-02,  2.06599738e-01, ...,
         3.76073734e-01, -3.11124842e-01,  2.58194933e-01],
       [ 1.71178016e-02,  8.40629731e-02,  2.06935492e-01, ...,
         3.03293363e-01, -2.47287011e-01,  2.02978732e-01],
       [ 1.73409536e-02,  8.41256937e-02,  2.10212706e-01, ...,
         9.91709154e-11, -3.88799513e-11,  1.52428825e-11],
       [ 1.73427907e-02,  8.41256108e-02,  2.10243665e-01, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [ 1.72801489e-02,  8.41297401e-02,  2.09158197e-01, ...,
         3.20626788e-05, -1.46836198e-05,  6.72460494e-06],
       [ 1.70725476e-02,  8.40572568e-02,  2.06143088e-01, ...,
         1.96111897e-01, -1.41992399e-01,  1.03082929e-01]])

In [19]:
class Datamodule(LightningDataModule):
    def __init__(self, X: list[np.ndarray],y: np.ndarray, batch_size: int = 1) -> None:
        super().__init__()
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X,y, test_size=0.1, shuffle=True, random_state=42)
        self.batch_size = batch_size
    def setup(self, stage: str) -> None:
        self.train_data = list_dataset(self.X_train, self.y_train)
        self.test_data = list_dataset(self.X_test, self.y_test)

    def train_dataloader(self) -> DataLoader:
        return DataLoader(self.train_data, batch_size=self.batch_size, num_workers=7, shuffle=True)
    
    def test_dataloader(self) -> DataLoader:
        return DataLoader(self.test_data, batch_size=self.batch_size, num_workers=7)

In [20]:
from typing import Any
from torch import nn
import torch.nn.functional as F
class SOAP_FFN(LightningModule):
    def __init__(self, dim: int = 252, hidden_size: int = 256, num_hidden: int = 3, lr: float = 3e-4) -> None:
        super().__init__()
        layers = [nn.Linear(dim,hidden_size), nn.ReLU()]
        for _ in range(num_hidden):
            layers.extend([nn.Linear(hidden_size,hidden_size), nn.ReLU()])
        layers.append(nn.Linear(hidden_size,1))
        self.ffn = nn.Sequential(*layers)
        self.lr = lr
    def forward(self, x: torch.Tensor) -> Any:
        return torch.sum(self.ffn(x),axis=1)
    
    def training_step(self, batch) -> Any:
        x, y = batch
        y_hat = self.forward(x)
        loss = F.mse_loss(y_hat.squeeze(),y.squeeze())
        self.log("train_loss", loss, prog_bar=True)
        return loss
    
    def test_step(self, batch) -> Any:
        x, y = batch
        y_hat = self.forward(x)
        test_loss = F.mse_loss(y_hat.squeeze(),y.squeeze())
        self.log("test_loss", test_loss, prog_bar=True,on_epoch=True)

    def configure_optimizers(self) -> torch.optim.Optimizer:
        adam = torch.optim.Adam(self.parameters(),lr= self.lr)
        return adam

In [21]:
trainer = Trainer(max_epochs=5)
model = SOAP_FFN(dim,hidden_size=256,num_hidden=2)
datamodule = Datamodule(X_soap,y)
trainer.fit(model,datamodule)
trainer.test(model,datamodule)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
2024-04-26 19:31:36.128279: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-26 19:31:36.128381: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-26 19:31:36.165913: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-26 19:31:36.255827: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instruc

Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss': 0.03297505900263786}]

### Geometric deep learning

The downside of any atomic descriptor is that it's not trainable. Some problems might require different transformations of xyz to invariant coordinates in order to achieve good performance with less parameters. Here I will use SchNet model, the model used as the baseline to compare performance of any new geometric NN for quantum chemistry.

In [22]:
from typing import Any
from numpy import ndarray
from torch_geometric.nn.models import SchNet
from torch_geometric.loader import DataLoader as PyGDataLoader
from torch_geometric.data import Data
class SchNetModule(LightningModule):
    def __init__(self,lr, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.model = SchNet(*args)
        self.lr = lr

    def forward(self, batch) -> Any:
        atom_nums = torch.ones(batch.x.shape[0], dtype=torch.int32)
        return self.model.forward(atom_nums,batch.x,batch.batch)
    
    def training_step(self, batch) -> Any:
        y_hat = self.forward(batch)
        loss = F.mse_loss(y_hat.squeeze(),batch.y.squeeze())
        self.log("train_loss", loss, prog_bar=True)
        return loss
    
    def test_step(self, batch) -> Any:
        y_hat = self.forward(batch)
        test_loss = F.mse_loss(y_hat.squeeze(),batch.y.squeeze())
        self.log("test_loss", test_loss, prog_bar=True,on_epoch=True)

    def configure_optimizers(self) -> torch.optim.Optimizer:
        adam = torch.optim.Adam(self.parameters(),lr= self.lr)
        return adam
    
class PyGDataModule(LightningDataModule):
    def __init__(self, X: list[np.ndarray],y: np.ndarray, batch_size: int = 1) -> None:
        super().__init__()
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X,y, test_size=0.1, shuffle=True, random_state=42)
        self.batch_size = batch_size
    def setup(self, stage: str) -> None:
        self.train_data = [Data(x=torch.tensor(x,dtype=torch.float32),y=torch.tensor(y,dtype=torch.float32)) for x,y in zip(self.X_train, self.y_train)]
        self.test_data = [Data(x=torch.tensor(x,dtype=torch.float32),y=torch.tensor(y,dtype=torch.float32)) for x,y in zip(self.X_test, self.y_test)]
    def train_dataloader(self) -> PyGDataLoader:
        return PyGDataLoader(self.train_data, batch_size=self.batch_size, num_workers=1, shuffle=True)
    
    def test_dataloader(self) -> PyGDataLoader:
        return PyGDataLoader(self.test_data, batch_size=self.batch_size, num_workers=1)


In [23]:
trainer = Trainer(max_epochs=5)
model = SchNetModule(lr=3e-4)
datamodule = PyGDataModule(X, y, batch_size=4)
trainer.fit(model,datamodule)
trainer.test(model,datamodule)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs



  | Name  | Type   | Params
---------------------------------
0 | model | SchNet | 455 K 
---------------------------------
455 K     Trainable params
0         Non-trainable params
455 K     Total params
1.823     Total estimated model params size (MB)
/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.
/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/utilities/data.py:77: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 53. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.
/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/utilities/data.py:77: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 69. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.
/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/utilities/data.py:77: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 47. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.
/home/zarko/miniconda3/envs/THESIS/lib/python3.11/site-packages/lightning/pytorch/utilities/data.py:77: Trying to infer the `batch_size` from an ambiguous collection. The batch size we

[{'test_loss': 0.01617174781858921}]