# Lab 3 Take Home Assignment - Molecule Generation with Normalizing Flows

## What are Normalizing Flows:
Normalizing Flows are generative models that use invertible neural networks to build a probability distribution. They allow exact likelihood computation and sample generation by transforming a simple base distribution (e.g., a Gaussian) into a more complex one through a sequence of invertible transformations. The precision and control afforded by Normalizing Flows make them particularly advantageous for generating molecules with highly specific attributes.
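
For reference, the likelihood computation rests on the change-of-variables formula. As a sketch, write $f$ for the invertible map that sends a base sample $z \sim p_Z$ to a data point $x = f(z)$ (these symbols are notation chosen here, not names used in the lab code). The log-density of $x$ is then:

$$\log p_X(x) = \log p_Z\left(f^{-1}(x)\right) + \log\left|\det \frac{\partial f^{-1}(x)}{\partial x}\right|$$

The first term scores the point under the base distribution, and the second term corrects for how much the transformation stretches or compresses volume around it.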

Here we will be using [SMILES](https://www.cs.tufts.edu/comp/150CSB/refs/1987%20%20SMILES,%20a%20chemical%20language%20and%20information%20system.%201.%20Introduction%20to%20methodology%20and%20encoding%20rules.pdf) [1] strings to represent molecules. SMILES (Simplified Molecular Input Line Entry System) is a string notation for representing molecules and reactions. It is a human-readable and compact way to represent a chemical structure.

We will also be using [SELFIES](https://arxiv.org/abs/1905.13741) (SELF-referencIng Embedded Strings) [2] as the molecular representation that our Normalizing Flow is trained on. Like SMILES, it represents a molecule as a string, but it is designed to be more robust: every SELFIES string decodes to a syntactically valid molecule. This makes it well suited to generative models, whose raw outputs would often be invalid if decoded as SMILES.

We will be using the [RDKit](https://www.rdkit.org/) library to work with SMILES strings. RDKit is a collection of cheminformatics and machine learning tools that can be used to work with chemical data. It is a powerful library that can be used to work with SMILES strings, molecular fingerprints, and molecular descriptors.

In this assignment, you will code the two main components of a normalizing flow:
1. The Affine transformation
2. The Normalizing flow

You will also have to implement the training loop for the normalizing flow.

The rest of the code is provided to you. You will have to fill in the missing parts of the code.

## Imports and Installs

(Yes, we import a lot of things. You can safely ignore these cells, as well as any warnings that pop up when running them.)

### Install packages that are not present on Colab

In [None]:
!pip install --pre deepchem

In [None]:
!pip install selfies

### Imports

In [None]:
import os
from typing import Sequence, Tuple

In [None]:
import torch
from torch import nn
from torch.distributions.multivariate_normal import MultivariateNormal

torch.manual_seed(21)  # Set the seed for reproducibility

In [None]:
# Typical Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tqdm
import seaborn as sns


We will be using deepchem to download and handle our dataset

In [None]:
import deepchem as dc
from deepchem.models.optimizers import Adam
from deepchem.data import NumpyDataset
from deepchem.splits import RandomSplitter
from deepchem.molnet import load_qm7

rdkit is used to handle molecular visualizations and to check molecule validity

In [None]:
import rdkit
from rdkit import Chem
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import Draw
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.*')  # suppress error messages

We use IPython Display to display molecules

In [None]:
from IPython.display import Image, display

We use selfies to convert SMILES strings into SELFIES strings, which can then be one-hot encoded into tensors for training the model

In [None]:
import selfies as sf



# Normalizing Flow

### Affine Transformation
An affine transformation is one of the simplest yet powerful transformations used in normalizing flows. It applies a linear transformation (scale and shift) to the data, which can alter the distribution in a way that increases its complexity. The affine transformation is defined as:

$y=x⋅e^{a}+b$

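where $x$ is the input, $y$ is the transformed output, and $a$ (log-scale) and $b$ (shift) are the learnable parameters of the transformation. Parameterizing the scale as $e^{a}$ keeps it strictly positive, so the transformation is always invertible.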

For transformations in normalizing flows to be useful, they must be bijective (invertible), allowing us to compute both forward and backward transformations. Additionally, the computation of the log-Jacobian determinant is crucial for calculating the change in log-density under the transformation, which is necessary for training these models using maximum likelihood.
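
To make these two requirements concrete, here is a minimal scalar sketch in plain Python (not PyTorch, and not the `Affine` class you are asked to write). The function names are hypothetical and chosen only for this illustration. For a single scalar, the Jacobian of $y=x⋅e^{a}+b$ is just $e^{a}$, so its logarithm is $a$, and the inverse direction contributes $-a$.

```python
import math

def affine_forward(x, a, b):
    """Return y = x * exp(a) + b and log|dy/dx| = a for scalar inputs."""
    return x * math.exp(a) + b, a

def affine_inverse(y, a, b):
    """Return x = (y - b) / exp(a) and log|dx/dy| = -a for scalar inputs."""
    return (y - b) / math.exp(a), -a

def std_normal_logpdf(z):
    """Log-density of the standard normal distribution at z."""
    return -0.5 * z * z - 0.5 * math.log(2.0 * math.pi)

def flow_logpdf(y, a, b):
    """Log-density of y when y = z * exp(a) + b with z ~ N(0, 1)."""
    z, inv_log_det = affine_inverse(y, a, b)
    return std_normal_logpdf(z) + inv_log_det

a, b = 0.5, -1.0
y, log_det = affine_forward(2.0, a, b)
x_back, inv_log_det = affine_inverse(y, a, b)
```

The round trip recovers the input, the two log-determinants cancel, and `flow_logpdf` agrees with the log-density of a Gaussian with mean $b$ and standard deviation $e^{a}$. With an elementwise scale in several dimensions, the log-determinant becomes the sum of the entries of $a$.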

### Task 1: Implement the Affine Transformation
Your task is to implement the `Affine` class in PyTorch, which performs an affine transformation as part of a normalizing flow. The class should include both the forward and inverse transformations, as well as the computation of the log-Jacobian determinant for both directions.

**Requirements**
1. Forward Transformation: Implement the forward method to transform an input tensor $x$ using the formula $y=x⋅e^{a}+b$. Also, compute the log-Jacobian determinant of this transformation.
2. Inverse Transformation: Implement the inverse method to reverse the transformation, computing $x$ from $y$ using the formula $x=(y−b)/e^{a}$, and calculate the log-Jacobian determinant for the inverse transformation.

In [None]:
class Affine(nn.Module):
  """Class which performs the Affine transformation.

  This transformation is based on the affinity of the base distribution with
  the target distribution. A geometric transformation is applied where
  the parameters change the scale and shift of a function (inputs).

  Normalizing Flow transformations must be bijective in order to compute
  the logarithm of the Jacobian determinant. For this reason, transformations
  must implement both a forward and an inverse pass.

  """

  def __init__(self, dim: int) -> None:
    """Create an Affine transform layer.

    Parameters
    ----------
    dim: int
      Dimensionality of each data sample (number of features).

    """

    super().__init__()
    self.dim = dim
    self.scale = nn.Parameter(torch.zeros(self.dim))
    self.shift = nn.Parameter(torch.zeros(self.dim))

  def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Performs a transformation between two different distributions. This
    particular transformation represents the following function:
    y = x * exp(a) + b, where a is the scale parameter and b performs a shift.
    This method also returns the logarithm of the Jacobian determinant,
    which is needed when inverting a transformation and computing the
    probability of the transformed samples.

    Parameters
    ----------
    x : torch.Tensor
      Tensor sample with the initial distribution data which will pass into
      the normalizing flow algorithm.

    Returns
    -------
    y : torch.Tensor
      Transformed tensor according to Affine layer with the shape of 'x'.
    log_det_jacobian : torch.Tensor
      Logarithm of the Jacobian determinant of the forward transformation.

    """
    ##############CODE STARTS HERE################
    y = 
    det_jacobian = 
    log_det_jacobian = 
    ##############CODE ENDS HERE################
    return y, log_det_jacobian

  def inverse(self, y: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Performs a transformation between two different distributions.
    This transformation represents the backward pass of the function
    mentioned before. Its mathematical representation is
    x = (y - b) / exp(a), where "a" is the scale parameter and "b" performs
    a shift. This method also returns the logarithm of the Jacobian
    determinant, which is needed when inverting a transformation and
    computing the probability of the transformed samples.

    Parameters
    ----------
    y : torch.Tensor
      Tensor sample with transformed distribution data which will be used in
      the normalizing algorithm inverse pass.

    Returns
    -------
    x : torch.Tensor
      Transformed tensor according to Affine layer with the shape of 'y'.
    inverse_log_det_jacobian : torch.Tensor
      Logarithm of the Jacobian determinant of the inverse transformation.

    """
    ##############CODE STARTS HERE################
    x = 
    det_jacobian = 
    inverse_log_det_jacobian = 
    ##############CODE ENDS HERE################
    return x, inverse_log_det_jacobian


### Normalizing Flow
Normalizing flows model complex probability distributions through a sequence of invertible transformations. Starting from a simple distribution, such as a Multivariate Normal, each step transforms the distribution in a way that becomes more complex and can better approximate the target distribution. The key feature of normalizing flows is their invertibility and the tractability of their Jacobian determinant, allowing for exact likelihood computation.
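
As a sketch of how exact likelihoods follow from stacking layers, suppose the flow is a composition $x = f_K \circ \dots \circ f_1(z_0)$ with $z_0$ drawn from the base density $p_Z$ (the symbols $f_k$, $z_k$, and $K$ are notation chosen here, not names from the lab code). Setting $z_K = x$ and $z_{k-1} = f_k^{-1}(z_k)$, the log-likelihood is the base log-density plus one log-determinant term per layer:

$$\log p_X(x) = \log p_Z(z_0) + \sum_{k=1}^{K} \log\left|\det \frac{\partial f_k^{-1}(z_k)}{\partial z_k}\right|$$

Evaluating this means walking through the layers in reverse order, from $x$ back to $z_0$, accumulating the inverse log-determinants along the way. Sampling runs the layers in the forward order, starting from a draw of the base distribution.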

### Task 2: Implementing the NormalizingFlow Class
Your task is to implement the NormalizingFlow class, which will encapsulate the process of transforming a base distribution into a complex one through a sequence of transformations. This class should be able to compute the log probability of given inputs under the transformed distribution and generate samples from the transformed distribution.

**Requirements**
1. Log Probability Calculation: Implement the log_probability method to compute the log probability of inputs under the transformed distribution.
2. Sampling: Implement the sample method to generate samples from the transformed distribution and compute their log probabilities.

In [None]:

class NormalizingFlow(nn.Module):
  """Normalizing flows are widely used to build generative models.
  This algorithm has advantages over variational autoencoders (VAE) because
  of the ease of sampling by applying invertible transformations
  (Frey, Gadepally, & Ramsundar, 2022).

  """

  def __init__(self, transform: Sequence, base_distribution, dim: int) -> None:
    """This class considers a transformation, or a composition of transformations
    functions (layers), between a base distribution and a target distribution.

    Parameters
    ----------
    transform: Sequence
      Bijective transformation/transformations which are considered the layers
      of a Normalizing Flow model.
    base_distribution: torch.distributions.Distribution
      Probability distribution to initialize the algorithm. The Multivariate Normal
      distribution is mainly used for this parameter.
    dim: int
      Dimensionality of each data sample (number of features).

    """
    super().__init__()
    self.dim = dim
    self.transforms = nn.ModuleList(transform)
    self.base_distribution = base_distribution

  def log_probability(self, inputs: torch.Tensor) -> torch.Tensor:
    """This method computes the log probability of the inputs when
    transformation/transformations are applied.

    Parameters
    ----------
    inputs: torch.Tensor
      Tensor used to evaluate the log_prob computation of the learned
      distribution.
      shape: (samples, dim)

    Returns
    -------
    log_prob: torch.Tensor
      This tensor contains the value of the log probability computed.
      shape: (samples)

    """
    ##############CODE STARTS HERE################
    log_prob = # Initialize log_prob 
    for biject in reversed(self.transforms):
      inputs, inverse_log_det_jacobian = 
      log_prob =

    log_prob = 
    ##############CODE ENDS HERE################
    return log_prob

  def sample(self, n_samples: int) -> Tuple[torch.Tensor, torch.Tensor]:
    """Performs a sampling from the transformed distribution.
    Besides the outputs (sampling), this method returns the logarithm of
    probability to obtain the outputs at the base distribution.

    Parameters
    ----------
    n_samples: int
      Number of samples to select from the transformed distribution

    Returns
    -------
    sample: tuple
      This tuple contains two torch.Tensor objects. The first represents
      a sampling of the learned distribution after the transformations have
      been applied. The second torch.Tensor is the computation of log
      probabilities of the transformed distribution.
      shape: ((samples, dim), (samples))

    """
    ##############CODE STARTS HERE################
    outputs = 
    log_prob = 

    for biject in self.transforms:
      outputs, log_det_jacobian = 
      log_prob = 
    ##############CODE ENDS HERE################
    return outputs, log_prob

# Train Function

### Task 3: Implement the train function
Your task is to implement the `train` function for the `NormalizingFlow` model. This function will perform gradient descent using the Adam optimizer to minimize the negative log-likelihood of the data. The training loop should include the following steps:
1. Compute the log probability of the training data under the transformed distribution.
2. Compute the negative log likelihood loss.
3. Compute the gradients and update the model parameters.
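
The three steps above can be illustrated on a toy problem without PyTorch. The sketch below is not the `train` function you are asked to write: it swaps Adam and autograd for plain gradient descent with hand-derived gradients, and fits a single scalar affine flow $x = z⋅e^{a}+b$ with $z \sim N(0, 1)$. All names (`nll_and_grads`, `fit`, `toy_data`) are hypothetical.

```python
import math

def nll_and_grads(data, a, b):
    """Mean negative log-likelihood of data under x = z * exp(a) + b,
    z ~ N(0, 1), together with its gradients with respect to a and b."""
    n = len(data)
    inv_var = math.exp(-2.0 * a)
    mean_sq = sum((x - b) ** 2 for x in data) / n
    mean_diff = sum(x - b for x in data) / n
    # Steps 1 and 2: log probability of the data, negated and averaged
    nll = 0.5 * mean_sq * inv_var + a + 0.5 * math.log(2.0 * math.pi)
    # Step 3 (first half): gradients derived by hand for this toy model
    grad_a = 1.0 - mean_sq * inv_var
    grad_b = -mean_diff * inv_var
    return nll, grad_a, grad_b

def fit(data, steps=5000, lr=0.05):
    """Plain gradient descent on the mean negative log-likelihood."""
    a, b = 0.0, 0.0
    losses = []
    for _ in range(steps):
        nll, grad_a, grad_b = nll_and_grads(data, a, b)
        losses.append(nll)
        # Step 3 (second half): parameter update
        a -= lr * grad_a
        b -= lr * grad_b
    return a, b, losses

toy_data = [1.0, 2.0, 3.0, 4.0, 5.0]
a_hat, b_hat, losses = fit(toy_data)
```

Minimizing the negative log-likelihood drives `b_hat` to the mean of the data and `exp(a_hat)` to its standard deviation, which is the maximum-likelihood Gaussian fit. In the lab, the optimizer and automatic differentiation take the place of the manual gradient and update lines.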



In [None]:
def train(model, data, epochs = 100, batch_size = 64):
    train_loader = torch.utils.data.DataLoader(data, batch_size=batch_size)
    optimizer = torch.optim.Adam(model.parameters())

    losses = []
    with tqdm.tqdm(range(epochs), unit=' Epoch') as tepoch:
        for epoch in tepoch:
            epoch_loss = 0  # reset the running loss at the start of every epoch
            for batch_index, training_sample in enumerate(train_loader):
                ##############CODE STARTS HERE################
                log_prob = 

                loss = 


                # ~ 3 lines of code to perform backpropagation

                epoch_loss = 
                ##############CODE ENDS HERE################
            epoch_loss /= len(train_loader)
            losses.append(np.copy(epoch_loss.detach().numpy()))
            tepoch.set_postfix(loss=epoch_loss.detach().numpy())
    return model, losses

## Training

Downloading the dataset

In [None]:
# Download from MolNet
tasks, datasets, transformers = dc.molnet.load_qm7(featurizer='ECFP')
df = pd.DataFrame(data={'smiles': datasets[0].ids})

Sampling from the dataset for training on colab

In [None]:
data = df[['smiles']].sample(2500, random_state=42)

Setup SELFIES and Helper Functions (can be safely ignored)

In [None]:
sf.set_semantic_constraints()  # reset constraints
constraints = sf.get_semantic_constraints()
constraints['?'] = 3

sf.set_semantic_constraints(constraints)

In [None]:
def preprocess_smiles(smiles):
  return sf.encoder(smiles)

def keys_int(symbol_to_int):
  d={}
  i=0
  for key in symbol_to_int.keys():
    d[i]=key
    i+=1
  return d

Adding selfies and the length of the smiles to the dataframe

In [None]:
data['selfies'] = data['smiles'].apply(preprocess_smiles)
data['len'] = data['smiles'].apply(lambda x: len(x))

In [None]:
data.sort_values(by='len').head()

In [None]:
selfies_list = np.asanyarray(data.selfies) # list of the selfies
selfies_alphabet = sf.get_alphabet_from_selfies(selfies_list) # Get the alphabet from the list of selfies
selfies_alphabet.add('[nop]')  # Add the "no operation" symbol as a padding character
selfies_alphabet.add('.') 
selfies_alphabet = list(sorted(selfies_alphabet)) # Sort the alphabet
largest_selfie_len = max(sf.len_selfies(s) for s in selfies_list) # Get the length of the largest selfies
symbol_to_int = dict((c, i) for i, c in enumerate(selfies_alphabet)) # Create a symbol to integer mapping
int_mol=keys_int(symbol_to_int) # Create a integer to symbol mapping
print(largest_selfie_len) # Print the length of the largest selfies

Converting the SELFIES to a tensor by one hot encoding

In [None]:
onehots=sf.batch_selfies_to_flat_hot(selfies_list, symbol_to_int,largest_selfie_len)
input_tensor = torch.tensor(onehots, dtype=torch.float64)
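
To show what a flat one-hot encoding looks like, here is a small pure-Python sketch. It is an illustration written for this lab text, assumed to mirror the layout that the selfies helper produces (pad with `[nop]`, one-hot encode each symbol, then concatenate the rows), and `flat_one_hot` and `toy_vocab` are hypothetical names.

```python
def flat_one_hot(tokens, vocab, pad_to, pad_token="[nop]"):
    """Encode a token list as one flat 0/1 vector of length pad_to * len(vocab)."""
    # Pad the sequence so that every molecule has the same number of symbols
    padded = tokens + [pad_token] * (pad_to - len(tokens))
    flat = []
    for token in padded:
        row = [0] * len(vocab)
        row[vocab[token]] = 1  # mark the position of this symbol
        flat.extend(row)       # append the row to the flat vector
    return flat

toy_vocab = {"[C]": 0, "[O]": 1, "[nop]": 2}
encoded = flat_one_hot(["[C]", "[O]"], toy_vocab, pad_to=3)
# encoded == [1, 0, 0, 0, 1, 0, 0, 0, 1]
```

Each group of `len(vocab)` entries describes one position in the string, so the flat vector has `pad_to * len(vocab)` entries in total.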

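Creating the noise tensor and the dequantized tensor. Dequantization turns the discrete one-hot values into continuous ones by adding uniform noise drawn from [0, 1) to every entry. A normalizing flow models a continuous density, so fitting it directly to discrete data would let it place arbitrarily high density on the discrete points. The added noise avoids this, and flooring a sample recovers the discrete value.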

In [None]:
noise_tensor = torch.rand(input_tensor.shape, dtype=torch.float64)
dequantized_data = torch.add(input_tensor, noise_tensor)
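
As a small illustration of why this noise is harmless, the pure-Python sketch below adds uniform noise from [0, 1) to a 0/1 vector and then recovers the original by flooring and clamping, which is the same post-processing applied to generated samples later in this lab. The helper names `dequantize` and `quantize` are hypothetical.

```python
import math
import random

def dequantize(one_hot, rng):
    """Add uniform noise from [0, 1) to every entry of a 0/1 vector."""
    return [value + rng.random() for value in one_hot]

def quantize(values):
    """Floor each value and clamp it to {0, 1}, undoing the added noise."""
    return [min(max(math.floor(v), 0), 1) for v in values]

rng = random.Random(0)  # fixed seed so the example is repeatable
original = [1, 0, 0, 0, 1, 0]
noisy = dequantize(original, rng)
recovered = quantize(noisy)
```

Entries that were 0 land in [0, 1) and entries that were 1 land in [1, 2), so the two groups never overlap and `recovered` equals `original`.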

Selecting only the first 35 columns of the one-hot encoded tensor to speed up training

In [None]:
dequantized_data = dequantized_data[:, :35]

Creating the Dataset and splitting the data into training and validation sets

In [None]:
ds = NumpyDataset(dequantized_data)  # Create a DeepChem dataset
splitter = RandomSplitter()
train_s, val, test = splitter.train_valid_test_split(dataset=ds, seed=42)
train_idx, val_idx, test_idx = splitter.split(dataset=ds, seed=42)

dim = len(train_s.X[0])  # length of one-hot encoded vectors
train_s.X.shape  # (2000, 35): 2000 training samples of dimension 35

In [None]:
# SMILES strings of training data
train_smiles = data['smiles'].iloc[train_idx].values

### Train the model

In [None]:
transforms = [Affine(dim)]
distribution = MultivariateNormal(torch.zeros(dim), torch.eye(dim))

model = NormalizingFlow(transforms, distribution, dim)
model, loss = train(model, dequantized_data, epochs = 500)
# plot_density(model, data)

### Sample Molecules

In [None]:
generated_samples, _ = model.sample(10)
mols = torch.floor(generated_samples)
mols = torch.clamp(mols, 0, 1)
mols_list = mols.detach().numpy().tolist()

In [None]:
import itertools

d = dict(itertools.islice(int_mol.items(), 35))
mols=sf.batch_flat_hot_to_selfies(mols_list, d)

## Checking % of valid molecules

Here we will check the percentage of valid molecules in the generated samples

First, we have to convert the molecules from the selfies representation to the SMILES representation, then we will use RDKit to check if the molecule is valid or not.

In [None]:
valid_count = 0
valid_selfies, invalid_selfies = [], []
for selfies in mols:
  try:
    # Decode SELFIES to SMILES, then let RDKit parse and sanitize it
    if Chem.MolFromSmiles(sf.decoder(selfies), sanitize=True) is not None:
      valid_count += 1
      valid_selfies.append(selfies)
    else:
      invalid_selfies.append(selfies)
  except Exception:
    # Treat decoding or parsing failures as invalid molecules
    invalid_selfies.append(selfies)
print('%.2f%% of generated samples are valid molecules.' % (100 * valid_count / len(mols)))

In [None]:
valid_smiles = [sf.decoder(vs) for vs in valid_selfies]
gen_mols = [Chem.MolFromSmiles(sf.decoder(vs)) for vs in valid_selfies]

## Helper Functions to Visualize Molecules

In [None]:
def display_images(filenames):
    """Helper to pretty-print images."""
    for file in filenames:
      display(Image(file))

def mols_to_pngs(mols, basename="generated_mol"):
    """Helper to write RDKit mols to png files."""
    filenames = []
    for i, mol in enumerate(mols):
        filename = "%s%d.png" % (basename, i)
        Draw.MolToFile(mol, filename)
        filenames.append(filename)
    return filenames

## Visualizing the Results

In [None]:
# Show at most 10 molecules; slicing avoids an IndexError when fewer are valid
display_mols = gen_mols[:10]

display_images(mols_to_pngs(display_mols))

## References:

[1] Weininger, D. (1988). SMILES, a chemical language and information system. 1. Introduction to methodology and encoding rules. Journal of chemical information and computer sciences, 28(1), 31-36.

[2] Frey, N. C., Gadepally, V., & Ramsundar, B. (2022). Fastflows: Flow-based models for molecular graph generation. arXiv preprint arXiv:2201.12419.

[3] Madhawa, K., Ishiguro, K., Nakago, K., & Abe, M. (2019). Graphnvp: An invertible flow model for generating molecular graphs. arXiv preprint arXiv:1905.11600.


This Lab was made by [Shreyas V](https://github.com/shreyasvinaya/)