In [2]:
import numpy as np
import io
import os
from tqdm import tqdm
from pathlib import Path
from natsort import natsorted 
import multiprocessing
from multiprocessing import Pool, cpu_count

from openmm.app import *
from openmm import *
from openmm.unit import *

from openff.units.openmm import from_openmm
from openff.units.openmm import to_openmm
from openff.toolkit.topology import Molecule
from openmmforcefields.generators import SMIRNOFFTemplateGenerator

from ase.io import read as aseread
from ase.io import write as asewrite
from openbabel import pybel 

 Amber FF could be used for small molecules when interfaced with OpenFF   
 See GAFF generator for more details


In [3]:
def get_amber_E_F(molecule, forcefield_xml, charge_scheme):
    """
    Process a molecule to compute its potential energy using OpenMM.

    Args:
        molecule (Molecule): The OpenFF Molecule object.
        forcefield_xml (str): The path to the force field XML file.
        charge_scheme (str): The charge scheme to use (e.g., 'gasteiger').

    Returns:
        float: The potential energy of the molecule in kJ/mol.
    """
    # Generate conformers and assign partial charges
    # molecule.generate_conformers() - do NOT generate for single-point calculations
    platform = Platform.getPlatformByName("CPU")
    molecule.assign_partial_charges(charge_scheme)
    
    
    # Create an integrator and simulation
    # Create the SMIRNOFF template generator
    smirnoff = SMIRNOFFTemplateGenerator(molecules=molecule, )
    
    # Create an OpenMM ForceField object
    forcefield = app.ForceField(forcefield_xml)
    
    # Register the SMIRNOFF template generator
    forcefield.registerTemplateGenerator(smirnoff.generator)
    
    # Convert topology and positions to openmm format 
    topology = molecule.to_topology().to_openmm()
    positions_in_nm = to_openmm(molecule.conformers[0])
    
    # Create the system
    system = forcefield.createSystem(
        topology=topology,
        nonbondedMethod=app.NoCutoff,
        constraints=None
    )
    
    # Create an integrator and simulation
    integrator = LangevinMiddleIntegrator(300*kelvin, 1/picosecond, 0.004*picoseconds)
    simulation = Simulation(topology, system, integrator, platform)
    simulation.context.setPositions(positions_in_nm)
    
    # Get the potential energy
    state = simulation.context.getState(getEnergy=True, getForces=True)
    potential_energy = state.getPotentialEnergy().value_in_unit(kilocalorie_per_mole)

    # Extract atomic forces (convert to kcal/(mol·Å))
    forces = state.getForces(asNumpy=True).value_in_unit(kilocalorie_per_mole / angstrom)
    return potential_energy, np.array(forces)

In [4]:
def worker(sdf_file):
    molecule = Molecule.from_file(sdf_file, allow_undefined_stereo=True, file_format='sdf')
    energy, forces = get_amber_E_F(molecule, 'amber14-all.xml', 'mmff94')
    return sdf_file, energy, forces 

def parallel_process(sdf_files, N=32):
    with Pool(processes=N) as pool: # set reasonable amount but LESS than all cores
# Wrap the iterator with tqdm for a progress bar; pool.imap preserves order.
        results = list(tqdm(pool.imap(worker, sdf_files), total=len(sdf_files)))
    return results


In [5]:
def create_sdf_batch(atoms_list, output_dir=None, bond_order_pair=None, new_bond_order=2):
    """
    Convert a list of ASE Atoms objects into a list of in-memory SDF file-like objects.
    Optionally save each SDF string to a file in the specified output directory.
    Additionally, if a bond_order_pair is provided, modify the bond order between these
    atoms using OpenBabel before generating the SDF.
    
    With simple command tool conversion or on-the-fly generation withoit fixing N=N bond, Sage potential from OpenFF produces:
    >> The OpenFF Toolkit does not currently support parsing molecules with S- and P-block radicals. 
    >> Found 1 radical electrons on molecule [H]c1c([H])c([H])c([N][N]c2c([H])c([H])c([H])c([H])c2[H])c([H])c1[H].

    text
    On-the-fly conversion saves processing time when transferring data. Oftentimes, it is
    unnecessary to keep SDF files on disk.

    Args:
        atoms_list (list): List of ASE Atoms objects.
        output_dir (str, optional): Directory in which to save SDF files.
        bond_order_pair (tuple, optional): A tuple (atom1_index, atom2_index) specifying the
            pair of atoms whose bond order will be modified. Use 1-indexed positions (as required
            by OpenBabel's OBMol.GetAtom method). If None, no bond order modifications are made.
        new_bond_order (int, optional): The new bond order to set for the specified pair (default is 2).

    Returns:
        list: A list of io.BytesIO objects containing SDF data for each molecule.
    """
    printed_message = False 
    sdf_list = []

# Create the output directory if requested.
    if output_dir is not None:
        os.makedirs(output_dir, exist_ok=True)

    for i, atoms in enumerate(atoms_list):
    # Write the ASE Atoms object to an in-memory XYZ string.
        xyz_buffer = io.StringIO()
        asewrite(xyz_buffer, atoms, format="xyz")
        xyz_str = xyz_buffer.getvalue()

        # Convert the XYZ string to a molecule using Pybel.
        mol = pybel.readstring("xyz", xyz_str)

        # If a bond_order_pair is provided, fix the bond order.
        if bond_order_pair is not None:
            # Get the underlying OBMol structure.
            obmol = mol.OBMol
            obmol.ConnectTheDots()
            obmol.PerceiveBondOrders()

            # Retrieve the atoms using the provided (1-indexed) indices.
            atom1 = obmol.GetAtom(bond_order_pair[0])
            atom2 = obmol.GetAtom(bond_order_pair[1])
            bond = obmol.GetBond(atom1, atom2)
            bond = obmol.GetBond(obmol.GetAtom(bond_order_pair[0]), obmol.GetAtom(bond_order_pair[1]))
    
            if bond is None:
                if not printed_message:
                    print(f"No bond found between atoms {bond_order_pair[0]} and {bond_order_pair[1]} in molecule {i}.")
                    printed_message = True  # Set the flag so that this message is printed only once.
            else:
                bond.SetBondOrder(new_bond_order)
                if not printed_message:
                    print(f"Molecule {i}: Set bond order between atoms {bond_order_pair[0]} and {bond_order_pair[1]} to {new_bond_order}.\n")
                    print('This message will be printed once, but all files will be modified.')
                    printed_message = True  # Again, only print once.

        # Write the molecule to an SDF string.
        sdf_str = mol.write("sdf")
        sdf_bytes = sdf_str.encode("utf-8")
        sdf_buffer = io.BytesIO(sdf_bytes)
        sdf_list.append(sdf_buffer)

        # Optionally, save the SDF file to disk.
        if output_dir is not None:
            filename = os.path.join(output_dir, f"molecule_{i}.sdf")
            with open(filename, "w") as f:
                f.write(sdf_str)

    return sdf_list

In [6]:
def process_batch(atoms_list, N=32, bond_order_pair=None, new_bond_order=2):
    """
    Process a batch of atoms to compute shifted energies and forces.
    Parameters:
    - atoms_list: List of atoms to process.
    - N: Number of parallel processes.
    - bond_order_pair: Optional parameter for bond order.
    - new_bond_order: Optional parameter for new bond order.
    """
    # Create SDF files from the list of atoms    
    sdf_files = create_sdf_batch(atoms_list, bond_order_pair=bond_order_pair, new_bond_order=new_bond_order)
    results = parallel_process(sdf_files, N=N)
    
    E_amber = [energy for _, energy, _ in results] #unpack energies from list of tuples
    E_amber = np.array(E_amber)
    E_amber_shifted = E_amber - np.mean(E_amber)
    forces = np.array([force for _, _, force in results])

    return E_amber_shifted, forces

### Get single point energies for alanine dipeptide


In [7]:
ad_path = Path('data/AD/thermal_MD_10k/DFT-logs')
ad_atoms_list = [aseread(x) for x in natsorted(ad_path.iterdir()) if x.is_file() and x.suffix == '.log']

In [8]:
ad_results = process_batch(ad_atoms_list, N=48)

100%|███████████████████████████████████████████████████████████████████████| 10000/10000 [03:32<00:00, 47.04it/s]


In [9]:
np.save('data/AD/thermal_MD_10k/ad_E_amber_kcal_mol.npy', ad_results[0])
np.save('data/AD/thermal_MD_10k/ad_F_amber_kcal_mol_A.npy', ad_results[1])

### Get single point energies for azobenzene


In [10]:
az_path = Path('data/AZ/thermal_MD_10k/DFT-logs')
az_atoms_list = [aseread(x) for x in natsorted(az_path.iterdir()) if x.is_file() and x.suffix == '.log']

In [11]:
len(az_atoms_list)

10000

In [12]:
az_results = process_batch(az_atoms_list, N=48, bond_order_pair=(12, 13), new_bond_order=2)
# you can always check atom ordeing in any DFT log: for example, langevin_300K_AZ-cis_frame_0000.log
# openbabel starts indexing from 1 (not 0 like Python or ASE)

Molecule 0: Set bond order between atoms 12 and 13 to 2.

This message will be printed once, but all files will be modified.


100%|███████████████████████████████████████████████████████████████████████| 10000/10000 [03:43<00:00, 44.79it/s]


In [14]:
az_results[0][:10]

array([-16.55494802,  -4.90555354,  -5.30487768,   5.30358135,
       -11.37924605,  -2.50679347,  -4.31165088,  -6.52711469,
         0.27021311,  -0.867263  ])

In [15]:
np.save('data/AZ/thermal_MD_10k/az_E_amber_kcal_mol.npy', az_results[0])
np.save('data/AZ/thermal_MD_10k/az_F_amber_kcal_mol_A.npy', az_results[1])

In [None]:
### Get single point energies for azobenzene ISOMERIZATION paths

In [16]:
cs_path = Path('data/AZ/cs-inversion/AZ_cs-DFT_inversion_path.xyz')
cs_atoms_list = aseread(cs_path, index=":")

In [17]:
len(cs_atoms_list)

179

In [18]:
cs_iso_results = process_batch(cs_atoms_list, N=48, bond_order_pair=(12, 13), new_bond_order=2)
# sae atom order as in DFT calculations above

Molecule 0: Set bond order between atoms 12 and 13 to 2.

This message will be printed once, but all files will be modified.


100%|███████████████████████████████████████████████████████████████████████████| 179/179 [00:06<00:00, 26.49it/s]


In [19]:
np.save('data/AZ/cs-inversion/cs_inv_E_amber_kcal_mol.npy', cs_iso_results[0])

In [22]:
# same exercise for inversion open-shell pathway
os_path = Path('data/AZ/os-rotation/AZ_os-DFT_rotation_path.xyz')
os_atoms_list = aseread(os_path, index=":")
print(len(os_atoms_list))

270


In [24]:
os_results = process_batch(os_atoms_list, N=48, bond_order_pair=(1, 13), new_bond_order=2)
# different order of atoms 

Molecule 0: Set bond order between atoms 1 and 13 to 2.

This message will be printed once, but all files will be modified.


100%|███████████████████████████████████████████████████████████████████████████| 270/270 [00:08<00:00, 31.39it/s]


In [25]:
np.save('data/AZ/os-rotation/os_rot_E_amber_kcal_mol.npy', os_results[0])

