# Conformer Generation Pipeline

## **Step 1: Input SMILES and Reference Conformer**

In [None]:
import time

# Capture the start time
start_time: float = time.time()

In [None]:
import sys

# Add a folder to Python's import path
sys.path.append('/app')

In [None]:
from atk_conformer_generation_pipeline.utils import *
from atk_conformer_generation_pipeline.variables import *
import os
import glob
import re
import subprocess
from termcolor import colored

In [None]:
#Change the dir to /work
os.chdir("/work")
!pwd

**Change the below variables accordingly**

In [None]:
os.makedirs(output_dir, exist_ok=True)
os.chdir(output_dir)

In [None]:
!pwd

In [None]:
import sys
sys.setrecursionlimit(10000)
# set the maximum depth of the Python interpreter stack. This stack depth is crucial for recursive function calls, 
# as it limits how deep the recursion can go before causing a RecursionError.

**Importing the necessary libraries**

In [None]:
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
import shutil
import time
import os
import re
import shutil
import pandas as pd
import numpy as np
from numpy import loadtxt
import csv
from typing import *
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.colors as pltc
from matplotlib.gridspec import GridSpec

In [None]:
%%time

### Remove all files and directories created in the previous execution to avoid any confusion
# The path variables below are not defined in this notebook; they presumably come from the
# star-imports of the pipeline's `variables` module -- confirm.
# NOTE(review): this list names the *_xyz variants, whereas later cells of this notebook write
# init_conf_sdf, feasible_geometries_sdf, infeasible_geometries_sdf and cluster_reps_sdf, and
# append to "outputs.txt". None of those are listed here -- confirm whether they should be
# cleaned up as well so that stale results from a previous run cannot be picked up.

file_and_dir_to_remove: List[str]=[init_conf_xyz,opt_conf_SMILES_file,similarity_output_csv,
feasible_geometries_csv,infeasible_geometries_csv,feasible_geometries_xyz,infeasible_geometries_xyz,pairwise_RMSDs_dat,
pairwise_RMSDs_csv,cluster_reps_csv,cluster_reps_xyz,cluster_rep_prefix,cluster_reps_dir,clusters_RMSD_stats_csv,clusters_energy_stats_csv,
opt_cluster_reps_csv, opt_conf_energy_csv, opt_conf_sdf]

# remove_paths is a project helper (not defined in this notebook) that receives the whole list.
remove_paths(file_and_dir_to_remove)

# Step 2: Loading GNNIS & TD Conformers 

In [None]:
def loading_sdf_file(sdf_file: str) -> "Optional[Chem.Mol]":
    """Load every record of an SDF file as conformers of one molecule.

    The first readable record supplies the topology (atoms and bonds). The
    3D coordinates of every readable record are then appended to that
    molecule as a conformer. Records that RDKit cannot parse are skipped
    with a warning.

    Args:
        sdf_file: Path of the SDF file to read. All records are assumed to
            describe the same molecule with the same atom ordering; this is
            not verified here.

    Returns:
        The molecule carrying one conformer per readable record, with
        conformer IDs assigned consecutively by RDKit, or ``None`` when no
        record could be read.
    """
    # removeHs=False keeps the explicit hydrogens stored in the file
    supplier = Chem.SDMolSupplier(sdf_file, removeHs=False)

    mol = None
    for i, m in enumerate(supplier):
        if m is None:
            print(f"[Warning] Molecule {i} could not be read. Check formatting in SDF.")
            continue

        if mol is None:
            # The first readable record becomes the template; drop its own
            # conformer so it is re-added below like every other record.
            mol = Chem.Mol(m)
            mol.RemoveAllConformers()

        # assignId=True lets RDKit assign the conformer ID itself, so
        # pre-setting an ID on the source conformer would have no effect.
        mol.AddConformer(m.GetConformer(), assignId=True)

    if mol is None:
        print(f"[Warning] No readable molecules found in {sdf_file}.")
    return mol


mol_gnnis = loading_sdf_file("conformers_gnnis.sdf")
mol_td = loading_sdf_file("conformers_TD.sdf")

# Generating Conformers using RDKit

In [None]:
%%time

import time
import sys


# generate_conformers / mmff_optimize_conformers / save_conformers_to_sdf are not defined in this
# notebook; presumably they come from the star-import of the pipeline's `utils` module -- confirm.
mol_rdkit_ini: Chem.Mol= generate_conformers(inp_smiles, num_conf_rdkit)  # Call the function to generate conformers

### Optimize the generated conformers and save the optimized coordinates
# The second value returned by mmff_optimize_conformers is discarded here.
mol_rdkit, _ = mmff_optimize_conformers(mol_rdkit_ini)     # Call the function to optimize conformers
save_conformers_to_sdf(mol_rdkit,"conformers_RDKit.sdf")

# Combining all the conformers from TD, GNNIS and RDKit

In [None]:
def simple_combine_conformers(mol1: "Chem.Mol", mol2: "Chem.Mol") -> "Chem.Mol":
    """Return a copy of ``mol1`` holding the conformers of ``mol1`` then ``mol2``.

    Neither input is modified. Conformers are copied in storage order and
    RDKit assigns fresh consecutive IDs in the combined molecule.

    Args:
        mol1: Molecule providing the topology and the first set of conformers.
        mol2: Molecule providing the second set of conformers. It is assumed
            to have the same atoms in the same order as ``mol1``; this is
            not verified here.

    Returns:
        A new molecule containing every conformer of both inputs.
    """
    combined_mol = Chem.Mol(mol1)
    combined_mol.RemoveAllConformers()

    # Iterate the conformer objects directly: GetConformer(i) looks a
    # conformer up by its ID, which only matches the position when the IDs
    # happen to be 0..n-1.
    for source in (mol1, mol2):
        for conf in source.GetConformers():
            combined_mol.AddConformer(conf, assignId=True)

    return combined_mol


# Usage
mol_1 = simple_combine_conformers(mol_rdkit, mol_gnnis)
mol_2 = simple_combine_conformers(mol_1, mol_td)
save_conformers_to_sdf(mol_2, init_conf_sdf)

# Applying cis/trans filter

In [None]:
!python /app/dihedral_filter.py "{inp_smiles}" {init_conf_sdf} filtered_initial_generated_conformers.sdf
#Reading it as mol
mol=loading_sdf_file("filtered_initial_generated_conformers.sdf")

In [None]:
# Find the number of atoms in the molecule
num_atoms_generated_conf: int = mol.GetNumAtoms()

# Step 3: Calculating Energy of the Conformers using ANI2x

In [None]:
import torchani
from rdkit.Geometry import Point3D
import torch

def ani_optimize_conformers(opt_mol: Chem.Mol, model_name: str = 'ANI2x', 
                           max_iter: int = 50, batch_size: int = 20) -> Tuple[Chem.Mol, Dict[int, float]]:
    """Optimize the conformers of a molecule with an ANI potential, batch by batch.

    Conformers are processed in batches of ``batch_size``. For each batch a
    single L-BFGS step (up to ``max_iter`` inner iterations) minimizes the
    *sum* of the ANI energies of the batch with respect to all coordinates.
    The optimized coordinates are written back into ``opt_mol`` in place.

    Args:
        opt_mol (Chem.Mol): The molecule with conformers to optimize. Its
            conformer coordinates are overwritten.
        model_name (str): ANI model to use; 'ANI1ccx' and 'ANI2x' are supported.
        max_iter (int): Maximum L-BFGS iterations per batch.
        batch_size (int): Number of conformers optimized together.

    Returns:
        Tuple[Chem.Mol, Dict[int, float]]: The same molecule object that was
        passed in, and a dictionary mapping conformer ID to the ANI energy
        of the optimized conformer in kcal/mol.

    Raises:
        ValueError: If ``batch_size`` is not positive, ``model_name`` is not
            supported, or the molecule contains an element the model
            cannot encode.
    """
    # Conversion factor from Hartree to kcal/mol
    HARTREE_TO_KCAL_MOL = 627.509474

    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Set up device and load ANI model ONCE
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    if model_name == 'ANI1ccx':
        model = torchani.models.ANI1ccx().to(device)
    elif model_name == 'ANI2x':
        model = torchani.models.ANI2x().to(device)
    else:
        raise ValueError(f"Unsupported ANI model: {model_name}")

    # Evaluation mode: no training-time behaviour is wanted here
    model.eval()

    # Conformer ID -> ANI energy (kcal/mol) of the optimized conformer
    ani_energies: Dict[int, float] = {}

    # Encode the element of every atom ONCE. The symbols are passed as a list
    # (one entry per atom): joining them into a single string would split
    # two-letter symbols such as "Cl" into separate characters.
    symbols = [atom.GetSymbol() for atom in opt_mol.GetAtoms()]
    try:
        species = model.species_to_tensor(symbols).to(device).unsqueeze(0)
    except Exception as exc:
        raise ValueError("ANI model does not support all atom types in the molecule") from exc

    # Work on the conformer objects themselves rather than on range(n):
    # GetConformer(i) is an ID lookup and IDs need not be 0..n-1.
    conformers = list(opt_mol.GetConformers())
    num_conformers = len(conformers)
    print(f"Optimizing {num_conformers} conformers...")

    # Process conformers in batches for better memory management
    for batch_start in range(0, num_conformers, batch_size):
        batch_confs = conformers[batch_start:batch_start + batch_size]
        batch_end = batch_start + len(batch_confs)
        print(f"Processing conformers {batch_start}-{batch_end-1}")

        # (n_conformers_in_batch, n_atoms, 3) leaf tensor that L-BFGS updates
        batch_coordinates = torch.stack([
            torch.tensor(conf.GetPositions(), dtype=torch.float32, device=device)
            for conf in batch_confs
        ]).requires_grad_(True)
        batch_species = species.repeat(len(batch_confs), 1)

        optimizer = torch.optim.LBFGS([batch_coordinates],
                                      max_iter=max_iter,
                                      tolerance_grad=1e-4,  # Looser convergence for speed
                                      tolerance_change=1e-6)

        def closure():
            # L-BFGS re-evaluates the objective several times per step
            optimizer.zero_grad()
            _, energies = model((batch_species, batch_coordinates))
            total_energy = energies.sum()
            total_energy.backward()
            return total_energy

        # Optimize the batch
        batch_t0 = time.perf_counter()
        optimizer.step(closure)
        print(f"Batch optimization took {time.perf_counter() - batch_t0:.2f} seconds")

        # Final energies at the optimized geometry (no gradients needed)
        with torch.no_grad():
            _, final_energies = model((batch_species, batch_coordinates))

        # detach() is required: the tensor is a leaf with requires_grad=True,
        # and on a CPU device .cpu() returns that same tensor, on which
        # .numpy() raises.
        optimized_coords = batch_coordinates.detach().cpu().numpy()
        energies_hartree = final_energies.tolist()

        for conf, coords, energy_hartree in zip(batch_confs, optimized_coords, energies_hartree):
            # Write the optimized coordinates back into the molecule
            for atom_idx, pos in enumerate(coords):
                conf.SetAtomPosition(atom_idx, Point3D(float(pos[0]), float(pos[1]), float(pos[2])))

            # Store energy converted to kcal/mol, keyed by the real conformer ID
            ani_energies[conf.GetId()] = energy_hartree * HARTREE_TO_KCAL_MOL

        # Clear GPU cache to free memory
        if device.type == 'cuda':
            torch.cuda.empty_cache()

    return opt_mol, ani_energies


In [None]:
%%time

### Optimize the generated conformers and save the optimized coordinates
# ani_optimize_conformers writes the optimized coordinates into the molecule it is given and
# returns that same object, so `mol` and `opt_mol` refer to the same (modified) molecule afterwards.
opt_mol, ani_energies = ani_optimize_conformers(mol)     # Call the function to optimize conformers
save_conformers_to_sdf(opt_mol,opt_conf_sdf)


# Number of optimized conformers; passed to process_conformers in a later cell
num_opt_conf: int= opt_mol.GetNumConformers()

### Save the energies of optimized to a CSV file
# ani_energies maps conformer id -> energy in kcal/mol; one CSV row is written per conformer
ani_energies_items : List[Tuple[int, float]] = list(ani_energies.items())
energy_DF: pd.DataFrame = pd.DataFrame(ani_energies_items, columns=['conformer_id', 'energy_in_kcalpermol'])
energy_DF.to_csv(opt_conf_energy_csv, index=False)

In [None]:
%%time

### Convert the 3D geometries of the conformers into SMILES and save them
convert_conformers_to_smiles(opt_conf_sdf,opt_conf_SMILES_file)

In [None]:
### Process optimized conformers to calculate Tanimoto similarity and separate feasible and infeasible geometries.
infeasible_geom_DF, energy_DF=process_conformers(opt_conf_SMILES_file,opt_conf_sdf,feasible_geometries_sdf,infeasible_geometries_sdf,similarity_output_csv,infeasible_geometries_csv,inp_smiles,num_opt_conf,energy_DF)


In [None]:
%%time

### Calculate the numbers of conformers with feasible and infeasible geometries
# energy_DF and infeasible_geom_DF are the two dataframes returned by process_conformers in the
# previous cell; their row counts are used as the feasible / infeasible conformer counts.
num_feasible_geom: int = len(energy_DF)
num_infeasible_geom: int = len(infeasible_geom_DF)

# Opened in append mode: every execution of this cell adds another line to outputs.txt
with open("outputs.txt", 'a') as file:
    file.write(f'Number_of_feasible_geometries: {num_feasible_geom}\n')
    
print("Number of conformers with infeasible geometries:", num_infeasible_geom)
print("Number of conformers with feasible geometries:", num_feasible_geom)
print("Total number of conformers for which the geometry feasibility was checked:", num_infeasible_geom+num_feasible_geom)
#print("Total number of conformers generated:", num_conf)

In [None]:
%%time

### Calculate the relative energies of conformers and write the results to a CSV file.
rel_energy_DF: pd.DataFrame=calculate_relative_energies(energy_DF,feasible_geometries_csv)


In [None]:
%%time

# Keep a handle on the figure so it can be saved after plt.show()
fig = plt.figure(figsize=(4, 4))

### Plot the relative energy distribution for conformers with feasible geometries
n_bins=10
plt.hist(rel_energy_DF['rel_energy_in_kcalpermol'], bins=n_bins, density=False, color='black', histtype='step', fill=False, lw=2)
# density=False: the histogram shows the count of occurrences in each bin (density=True would
# normalize it so that the area under the histogram integrates to 1).
# histtype='step' is used here instead of the default 'bar' (traditional bar histogram).
plt.xlabel('Rel. ANI-2x Energy (kcal/mol)')
plt.ylabel('Count')
plt.title('GRT-ani')
plt.grid(False)

### Show the plot
plt.show()

### Save figure
# NOTE(review): unlike the other savefig calls in this notebook, this file name has no extension;
# presumably matplotlib falls back to its default output format -- confirm the resulting file name.
fig.savefig("rel_ANI-2x_energies-count_histogram", bbox_inches='tight', pad_inches=0.04, transparent = False)

## **Step 4: Calculating RMSD Matrix**

Using the Open Babel `obrms` command to calculate the root-mean-square deviation (RMSD) between the feasible geometries present in an SDF file.

In [None]:
%%time

### Run obrms on the concatenated sdf file of conformers with feasible geometries to compute RMSD matrix
calculate_rmsd(feasible_geometries_sdf,pairwise_RMSDs_dat)

In [None]:
%%time

from scipy.spatial.distance import squareform, is_valid_dm

### Read the pairwise RMSD matrix from the output of obrms; it is supposed to be a hollow, symmetric matrix
# The first column of the file is used as the row index (index_col=0) and there is no header row,
# so the remaining columns form the square matrix.
rmsd_matrix_DF: pd.DataFrame = pd.read_csv(pairwise_RMSDs_dat, header=None, index_col=0)

### Convert the pairwise RMSD matrix into a numpy float-type 2D array
rmsd_matrix: np.ndarray = rmsd_matrix_DF.to_numpy(dtype=float)

### Round the matrix elements to two decimal places to avoid possible asymmetry in the matrix due to insignificant numerical errors
rmsd_matrix_2DP: np.ndarray  = np.round(rmsd_matrix, 2)

# Force the matrix to be symmetric by averaging it with its transpose.
# Note that averaging two different 2-decimal values can yield a third decimal place.
rmsd_matrix_2DP = (rmsd_matrix_2DP + rmsd_matrix_2DP.T) / 2

# Validate the result as a distance matrix; with throw=False, is_valid_dm returns False instead of raising.
# NOTE(review): is_valid_dm presumably checks more than symmetry (e.g. the zero diagonal), so the
# error message below may understate the cause -- confirm against the SciPy documentation.
if not is_valid_dm(rmsd_matrix_2DP, throw=False):
    raise ValueError("The provided RMSD matrix is not symmetric even after rounding and forcing symmetry.")

In [None]:
%%time

### Convert the pairwise distance matrix to its condensed form; write the pairwise RMSDs from the condensed matrix into a CSV file
from scipy.spatial.distance import squareform
# condensed_matrix is reused by later cells (RMSD histogram and hierarchical clustering)
condensed_matrix: np.ndarray  = squareform(rmsd_matrix_2DP)
pairwise_RMSDs_DF: pd.DataFrame = pd.DataFrame(condensed_matrix)
# Single-column CSV with the header 'pairwise_RMSD' and no index column
pairwise_RMSDs_DF.to_csv(pairwise_RMSDs_csv, header=['pairwise_RMSD'], index=False)

In [None]:
%%time

### Plot the distribution of pairwise RMSDs
# Keep a handle on the figure so it can be saved after plt.show()
fig = plt.figure(figsize=(4, 4))

### Plot the histograms
# density=True: the y axis is a probability density rather than a count (see the y label below)
plt.hist(condensed_matrix, bins=8, density=True, color='black', fill=False, lw=2)

### Format the axes
plt.xlabel(r'RMSD ($\AA)$')
plt.ylabel('Probability Density')
plt.title('Pairwise RMSDs')
plt.grid(False)

### Show the plot
plt.show()

### Save figure
fig.savefig("pairwise_rmsd_distribution-PD.png", bbox_inches='tight', pad_inches=0.04, transparent = False)

## **Step 4: Hierarchical Cluster**

Clustering the generated conformers using hierarchical clustering with the `ward` linkage method. The number of clusters is chosen from the silhouette score and capped at 5.

In [None]:
%%time

### Perform hierarchical clustering with 'ward' linkage method on the condensed version of pairwise distance matrix
import scipy.cluster.hierarchy as sch
linkage_matrix_ward: np.ndarray = sch.linkage(condensed_matrix, method='ward')

In [None]:
%%time

### A few settings to export the image of the plot
plt.style.use('default')
fig = plt.figure(figsize=(8, 6))

### Plot the dendrogram to visualize the hierarchical clustering structure
# no_labels=True: leaf labels are suppressed on the x axis
sch.dendrogram(linkage_matrix_ward, no_labels=True)
plt.title('Dendrogram with Ward Linkage Method')
plt.xlabel('Conformers')
plt.ylabel('Distance')

### Show the plot
plt.show()

### Save figure
fig.savefig("hierarchical_clustering_dendogram-ward.png", bbox_inches='tight', pad_inches=0.04, transparent = False)

In [None]:
%%time

### A few settings to export the image of the plot
# plt.style.use('~/matplotlib_templates/single_column.mplstyle')
fig = plt.figure(figsize=(4, 4))

### Determine the optimal number of clusters using silhouette score; the original pairwise RMSD matrix must be used for this
from sklearn.metrics import silhouette_score
from scipy.cluster.hierarchy import linkage, fcluster
from collections import Counter


## Calculate silhouette score for different numbers of clusters
# The silhouette score is only defined when the number of distinct labels lies between 2 and
# n_samples - 1. With few conformers, fcluster can return one cluster per conformer for the larger
# requested counts, which would make silhouette_score raise; such counts get -inf instead.
num_samples = rmsd_matrix_2DP.shape[0]
sil_scores = []
range_n_clusters = list(range(2, 101))     # Try different numbers of clusters
for n_clusters in range_n_clusters:
    cluster_labels = fcluster(linkage_matrix_ward, n_clusters, criterion='maxclust')
    cluster_counts = Counter(cluster_labels)
    # print(cluster_counts)

    # Score only clusterings with a valid number of distinct clusters
    if 1 < len(cluster_counts) < num_samples:
        sil_score = silhouette_score(rmsd_matrix_2DP, cluster_labels, metric='precomputed')
        sil_scores.append(sil_score)
    else:
        sil_scores.append(float('-inf'))  # Very low score so this count is never selected as optimal


## Plot the Silhouette scores
plt.plot(range_n_clusters, sil_scores, marker='o', color='black', fillstyle='none', ms=2, lw=2)
plt.xlabel("Number of Clusters")
plt.ylabel("Silhouette Score")
plt.title("Ward Linkage")
plt.axis([-5, 105, -0.05, 1.05])
plt.xticks([0, 25, 50, 75, 100])
plt.yticks([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])

### Show the plot
plt.show()

### Save figure
fig.savefig("silhouette_score_vs_num_clust-ward.png", bbox_inches='tight', pad_inches=0.04, transparent = False)

## Find the optimal number of clusters based on the maximum value of silhouette score and printing it
# np.argmax returns the first maximum, so ties resolve to the smallest number of clusters
max_sil_score = np.max(sil_scores)
optimal_clusters = range_n_clusters[np.argmax(sil_scores)]
#print(f"The optimal number of clusters is {optimal_clusters} with a silhouette score of {max_sil_score}")

# Additional logic to prevent printing an invalid result
if max_sil_score == float('-inf'):
    print("Clustering failed to produce more than one cluster for any tested value. No optimal number of clusters found.")
else:
    print(f"The optimal number of clusters is {optimal_clusters} with a silhouette score of {max_sil_score}")

In [None]:
%%time

from scipy.cluster.hierarchy import linkage, fcluster

## Use the silhouette-optimal number of clusters, but never more than 5
num_clusters = min(optimal_clusters, 5)

## Cut the Ward tree so that every conformer receives a cluster label
cluster_labels: np.ndarray = fcluster(linkage_matrix_ward, num_clusters, criterion='maxclust')

## Start with one empty member list per cluster label (labels run from 1 to num_clusters)
clusters: Dict[int, List[int]] = {}
for cluster_id in range(1, num_clusters + 1):
    clusters[cluster_id] = []

## File every conformer, by its position in cluster_labels, under its cluster label
for index, label in enumerate(cluster_labels):
    members = clusters[label]
    members.append(index)     # Positions are stored, not the raw data
    

## **Step 5: Identifying Cluster Representative**

Identifying the minimum energy conformer within each cluster as its representative

In [None]:
%%time

### Pick the lowest-energy conformer of every non-empty cluster as that cluster's representative

## One single-row dataframe is collected per non-empty cluster
cluster_reps_list: List[pd.DataFrame] = []

for clust_label, clust_elements in clusters.items():
    # A cluster label without members has no representative
    if not clust_elements:
        continue

    # NOTE(review): clust_elements holds positions from the cluster labelling, while .loc selects
    # by index label; this presumably relies on rel_energy_DF having a default 0..n-1 index in the
    # same order as the RMSD matrix -- confirm.
    clust_DF: pd.DataFrame = rel_energy_DF.loc[clust_elements]     # Relative energies of the cluster members

    # Row label of the member with the lowest relative energy
    min_energy_index: int = clust_DF['rel_energy_in_kcalpermol'].idxmin()

    # Single-row dataframe of the representative, tagged with the label of its cluster
    min_energy_DF: pd.DataFrame = clust_DF.loc[[min_energy_index]].assign(cluster_id=clust_label)
    # print(min_energy_DF)
    cluster_reps_list.append(min_energy_DF)

## Stack the representatives of all clusters into a single dataframe with a fresh index
cluster_reps_DF: pd.DataFrame = pd.concat(cluster_reps_list, ignore_index=True)

## Order the cluster representatives by 'conformer_id' and write the ordered dataframe to a csv file
sorted_cluster_reps_DF: pd.DataFrame = cluster_reps_DF.sort_values(by='conformer_id', ascending=True)
sorted_cluster_reps_DF.to_csv(cluster_reps_csv, index=False)

In [None]:
%%time

###  Write the coordinates of cluster representative conformers to SDF files.
write_cluster_representatives(opt_conf_sdf,cluster_reps_dir,cluster_reps_sdf,sorted_cluster_reps_DF,cluster_reps_DF,cluster_rep_prefix,conf_extension)

