# Dimer interaction energies DESS66x8

In this notebook we will compute dimer interaction energies for the DESS66x8 dataset using the DE-FF and other mainline OpenFF force fields to compare the performance.

In [1]:
from openff.toolkit.topology import Molecule, Topology
from openff.toolkit.typing.engines.smirnoff import ForceField
from openmm import unit, openmm, app
import matplotlib.pylab as plt
import networkx as nx
from collections import defaultdict
import numpy as np
import tqdm
from io import BytesIO
import base64
from rdkit import Chem
from rdkit.Chem.Draw import rdMolDraw2D
import pandas as pd
from typing import List, Optional
import h5py
import seaborn as sns
import csv
import os
sns.set_theme()


Create a class to store our creeated OpenMM simulations for each dimer pair and add a method to compute the interaction energy for a given conformation.

In [None]:
class InteractionEnergy:
    "Calculate the interation energy for a dimer pair at a given conformation usiing openmm"

    def __init__(self, off_mol: Molecule, force_field: ForceField, system_id: int, group_id: int, title: str) -> None:
        "Build an OpenMM simulation for each fragment and the total system"
        self.system_id = system_id
        self.group_id = group_id
        self.title = title
        platform = openmm.Platform.getPlatformByName("Reference")
        # track the toal number of atoms to make sure the corrds match the topology
        self.n_atoms = off_mol.n_atoms
        graph = off_mol.to_networkx()
        # work out the disconected sub graphs as the input molecule contains both monomers 
        sub_graphs = [
            sorted(sub_graph)
            for sub_graph in nx.connected_components(graph)
        ]
        # build the individual molecules
        off_mols = []
        for i, sub_graph in enumerate(sub_graphs):
            # map the old index to the new one
            index_mapping = {}
            comp_mol = Molecule()
            for atom in sub_graph:
                new_index = comp_mol.add_atom(**off_mol.atoms[atom].to_dict())
                index_mapping[atom] = new_index
            for bond in off_mol.bonds:
                if bond.atom1_index in sub_graph and bond.atom2_index in sub_graph:
                    bond_data = {
                        "atom1": comp_mol.atoms[index_mapping[bond.atom1_index]],
                        "atom2": comp_mol.atoms[index_mapping[bond.atom2_index]],
                        "bond_order": bond.bond_order,
                        "stereochemistry": bond.stereochemistry,
                        "is_aromatic": bond.is_aromatic,
                        "fractional_bond_order": bond.fractional_bond_order,
                    }
                    comp_mol.add_bond(**bond_data)
            off_mols.append(comp_mol)
            # build an openmm simulation for each fragment
            comp_top = comp_mol.to_topology()
            system = force_field.create_openmm_system(comp_top)
            integrator = openmm.VerletIntegrator(1.0 * unit.femtoseconds)

            simulation = app.Simulation(
                comp_top.to_openmm(), system, integrator, platform
            )
            if i == 0:
                self.A = simulation
                self.A_atoms = sub_graph
                self.A_index_mapping = index_mapping

            else:
                self.B = simulation
                self.B_atoms = sub_graph
                self.B_index_mapping = index_mapping

        # now build the dimer total system
        total_off_top = Topology.from_molecules(molecules=off_mols)
        total_system = force_field.create_openmm_system(total_off_top)
        integrator = openmm.VerletIntegrator(1.0 * unit.femtoseconds)
        simulation = app.Simulation(
                total_off_top.to_openmm(), total_system, integrator, platform
            )
        self.D = simulation

    def _calculate_simulation_energy(self, simulation: app.Simulation, positions: unit.Quantity) -> float:
        "calculate the energy of a simulation and return the energy in kcal/mol"
        system = simulation.context.getSystem()
        n_vsites = sum(1 for i in range(system.getNumParticles()) if system.isVirtualSite(i))
        n_atoms = system.getNumParticles() - n_vsites
        if n_vsites > 0 and len(positions) != n_atoms + n_vsites:
            # pad the positions if we have v-sites on the water 
            new_positions = np.zeros((n_atoms + n_vsites, 3))

            i = 0 
            for j in range(system.getNumParticles()):
                if not system.isVirtualSite(i):
                    new_positions[j] = positions[i].value_in_unit(unit.angstrom)
                    i += 1

            positions = new_positions * unit.angstrom

        simulation.context.setPositions(positions)
        if n_vsites > 0:
            simulation.context.computeVirtualSites()
        state = simulation.context.getState(getEnergy=True)
        energy = state.getPotentialEnergy().value_in_unit(unit.kilocalorie_per_mole)
        return energy
    
    def calculate_interaction_energy(self, conformation: unit.Quantity) -> float:
        "Calculate the interaction energy in kcal/mol at the given system conformation"
        assert conformation.shape == (self.n_atoms, 3)
        total_energy = self._calculate_simulation_energy(simulation=self.D, positions=conformation)
        # calculate the energy of each fragment
        fragA_conformer = np.zeros((len(self.A_atoms), 3))
        for i in self.A_atoms:
            fragA_conformer[self.A_index_mapping[i]] = conformation[i]
        fragA_energy = self._calculate_simulation_energy(simulation=self.A, positions=fragA_conformer * unit.angstrom)


        fragB_conformer = np.zeros((len(self.B_atoms), 3))
        for i in self.B_atoms:
            fragB_conformer[self.B_index_mapping[i]] = conformation[i]
        fragB_energy = self._calculate_simulation_energy(simulation=self.B, positions=fragB_conformer * unit.angstrom)

        return total_energy - fragA_energy - fragB_energy
    
def plot_energies(k_points, qm_energies, ff_energies, title) -> BytesIO:
    "Plot the interaction energies with the QM refernce and return the image to be embeded"
    plt.plot(k_points, qm_energies, label="CCSD(T)", marker="o")
    for key, values in ff_energies.items():
        plt.plot(k_points, values, label=key)
    plt.legend()
    plt.xlabel("K index")
    plt.ylabel("Interaction Energy kcal/mol")
    plt.title(title)
    tmpfile = BytesIO()
    plt.savefig(tmpfile, format="svg")
    plt.close()
    return tmpfile

Transform the raw version of the DESS66x8 dataset found at [zenodo](https://zenodo.org/record/5676284) into a format easier to work with. 

In [None]:
csv_file = "DESS66x8/DESS66x8.csv"
geometries = "DESS66x8/geometries/"
db_name = "DESS66x8-NEW.hdf5"

ref_db = h5py.File(db_name, "w")

with open(csv_file) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in tqdm.tqdm(reader, ncols=80, total=reader.line_num, desc="Create ref database"):
        # print(f"building new system for {row['smiles0']} and {row['smiles1']} {row['system_id']}  {row['group_id']}")
        position_file = os.path.join(geometries, row["system_id"], f"DESS66x8_{row['geom_id']}.mol")
        off_mol = Molecule.from_file(position_file)
        ccsd_energy = float(row["cbs_CCSD(T)_all"])
        k_index = int(row["k_index"])
        name = f"{row['system_id']}-{row['group_id']}-{row['geom_id']}"
        # create a group for the entry
        group = ref_db.create_group(name=name)
        group.create_dataset("smiles", data=[off_mol.to_smiles(mapped=True)], dtype=h5py.string_dtype())
        conformations = group.create_dataset("conformation", data=[off_mol.conformers[0].value_in_unit(unit.angstrom)], dtype=np.float64)
        conformations.attrs["units"] = "angstrom"
        group.create_dataset("atomic_numbers", data=[atom.atomic_number for atom in off_mol.atoms], dtype=np.int16)
        int_energy = group.create_dataset("interaction_energy", data=[ccsd_energy], dtype=np.float64)
        int_energy.attrs["units"] = "kcal / mol"
        group.create_dataset("kindex", data=[k_index], dtype=np.int16)
        group.create_dataset("title", data=[row['system_name']], dtype=h5py.string_dtype())

Create a function that can take the hdf5 version of the dataset and calculate the interaction energy for each entry with the given force field and store the value under the title field.

In [None]:
def calculate_interaction_energies(reference_db: h5py.File, force_field: ForceField, title: str):
    """For the given reference DB of interaction energies and conformations calculate values using the input force field and store them under the title field for each configuration

    Note the DB should be opened in write mode so the calculated values can be added in place .
    """

    current_system = None
    for ref_data in tqdm.tqdm(reference_db.values(), desc=f"Calculating energies for {title}", ncols=80):
        # check if we need to construct the system again
        system_id, group_id, _ = ref_data.name.split("-")
        if current_system is not None and (current_system.system_id != system_id or current_system.group_id != group_id):
            current_system = InteractionEnergy(off_mol=Molecule.from_mapped_smiles(ref_data['smiles'].asstr()[0]), force_field=force_field, system_id=system_id, group_id=group_id, title=ref_data['title'].asstr()[0])
        elif current_system is None:
            current_system = InteractionEnergy(off_mol=Molecule.from_mapped_smiles(ref_data['smiles'].asstr()[0]), force_field=force_field, system_id=system_id, group_id=group_id, title=ref_data['title'].asstr()[0])

        # calculate the energy
        conformation = ref_data["conformation"][:].reshape((-1, 3))
        units = ref_data['conformation'].attrs['units']
        conformation *= getattr(unit, units)
        ff_energy = current_system.calculate_interaction_energy(conformation=conformation)
        # add it back to the dataset with units under the title
        try:
            calc_energy = ref_data.create_dataset(title, data=[ff_energy], dtype=np.float64)
        except ValueError:
            # if the dataset has already been created we need to just overwrite the data
            calc_energy = ref_data[title]
            calc_energy[0] = ff_energy
        
        calc_energy.attrs['units'] = "kcal / mol"



Calculate the interaction energies with DE and Sage.

In [None]:
# DEFF
de_ff = ForceField("dexp_all_fit_v1.offxml", load_plugins=True, allow_cosmetic_attributes=True)
calculate_interaction_energies(reference_db=ref_db, force_field=de_ff, title="DEFF")
# Sage
sage = ForceField("openff_unconstrained-2.0.0.offxml")
calculate_interaction_energies(reference_db=ref_db, force_field=sage, title="Sage")

In [None]:
# close the db and open in read only mode
ref_db.close()
ref_db = h5py.File(db_name, "r")

Define some analysis functions to create a pandas dataframe to compare the force field methods with QM reference data.

In [None]:
def RMSE(experiment_values: np.array, calculated_data: np.array) -> float:
    "Calculate the RMSE"
    return np.sqrt(np.mean((experiment_values - calculated_data) ** 2))

def make_interaction_image(molecule: Molecule) -> str:
        "Make an svg image of the molecules whoes interaction energy we are calculating."
        # smiles_parser = Chem.rdmolfiles.SmilesParserParams()
        # smiles_parser.removeHs = False

        smiles = molecule.to_smiles(explicit_hydrogens=False)
        rdkit_mol = Chem.MolFromSmiles(smiles)
        rdkit_mol = Chem.RemoveHs(rdkit_mol)
        Chem.rdDepictor.Compute2DCoords(rdkit_mol)
        drawer = rdMolDraw2D.MolDraw2DSVG(200, 200)
        rdMolDraw2D.PrepareAndDrawMolecule(drawer, rdkit_mol)
        drawer.FinishDrawing()
        return drawer.GetDrawingText()

def create_comparison_dataframe(reference_db: h5py.File, methods: List[str], excluded_common_id: Optional[List[str]] = None) -> pd.DataFrame:
    """
    For the given refernce database construct a pandas data frame which includes plots of the interaction energy as a function of distance for the list of methods compared to the reference.

    Args:
        reference_db: the hdf5 file with the reference geometries and energies
        methods: the list of methods the interactions have been calculated with which should be compared
        excluded_common_ids: the string of the system and group ids for systems which should be excluded from the comparison.
    """

    # group by ids
    records_by_ids = defaultdict(list)
    recored_ordering = []
    for key in reference_db.keys():
        system_id, group_id, _ = key.split("-")
        rec_id = f"{system_id}-{group_id}"
        records_by_ids[rec_id].append(key)
        index = float(reference_db[key]['title'].asstr()[0].split("_")[0])
        if (index, rec_id) not in recored_ordering:
            recored_ordering.append((index, rec_id))
         
    recored_ordering.sort(key=lambda x:x[0])
    all_data = {"smiles": [], "molecules": [], "graphs": []}
    for method in methods:
        all_data[method] = []

    if excluded_common_id is None:
        excluded_common_id = []
    # collect the data for each of the entries
    for _, rec_id in recored_ordering:
        if rec_id in excluded_common_id:
            # skip if the common id has been requested to be missed!
            continue
        record_ids = records_by_ids[rec_id]
        ref_energies = []
        ff_energies = defaultdict(list)
        kindex = []
        for record_id in record_ids:
            record = reference_db[record_id]
            ref_energies.append(record['interaction_energy'][0])
            kindex.append(record['kindex'][0])
            for method in methods:
                ff_energies[method].append(record[method][0])

        # once dont use the last record to extract the smiles and plot
        smiles = record['smiles'].asstr()[0]
        off_mol = Molecule.from_mapped_smiles(smiles)
        mol_figure = make_interaction_image(molecule=off_mol)
        graph = plot_energies(k_points=kindex, qm_energies=ref_energies, ff_energies=ff_energies, title=record['title'].asstr()[0])
        # add to the main datafram
        all_data['smiles'].append(off_mol.to_smiles(explicit_hydrogens=False))
        all_data['molecules'].append(mol_figure)
        all_data['graphs'].append(graph)
        for method in methods:
            all_data[method].append(RMSE(experiment_values=np.array(ref_energies), calculated_data=np.array(ff_energies[method])))

    return pd.DataFrame(all_data)

In [None]:
df = create_comparison_dataframe(reference_db=ref_db, reference_label="CCSD(T)", methods=["DEFF", "Sage"])

In [None]:
df

Define a function to convert the pandas dataframe into an HTML report with embedded images of the molecules which make up the dimer and plots of the interaction potential as a function of dimer separation.

In [None]:
from IPython.core.display import HTML, SVG
def create_html(data_frame: pd.DataFrame, file_name: Optional[str] = None) -> Optional[HTML]:
    "Create an HTML report for the dataframe or return the HTML object if it should be displayed in the notebook (when file name is not provided)"

    # define the helper functions for the mol figure and the graph
    def graph_to_html(graph: BytesIO) -> str:
        data = base64.b64encode(graph.getbuffer()).decode("ascii")
        return f"<img src='data:image/svg+xml;base64,{data}'/>"

    def mol_to_html(interaction_image: str) -> str:
        raw_data = base64.b64encode(interaction_image.encode()).decode()
        return f"<img src='data:image/svg+xml;base64,{raw_data}'/>"

    if file_name is not None:
        return data_frame.to_html(file_name, escape=False, formatters={"graphs": graph_to_html, "molecules": mol_to_html})
    return HTML(data_frame.to_html(escape=False, formatters={"graphs": graph_to_html, "molecules": mol_to_html}))
    

Remove any molecules in the dataset typed by unoptimised non-bonded parameters. 

In [None]:
# loop over the smiles and type the molecules, collect all ids which are valid
optimised_smirks = [
    "[#1:1]-[#6X4]", 
    "[#1:1]-[#6X4]-[#7,#8,#9,#16,#17,#35]",
    "[#1:1]-[#6X3]",
    "[#1:1]-[#6X3]~[#7,#8,#9,#16,#17,#35]",
    "[#1:1]-[#6X3](~[#7,#8,#9,#16,#17,#35])~[#7,#8,#9,#16,#17,#35]",
    "[#1:1]-[#7]",
    "[#1:1]-[#8]",
    "[#6:1]",
    "[#6X4:1]",
    "[#8:1]",
    "[#8X2H0+0:1]",
    "[#8X2H1+0:1]",
    "[#7:1]",
    "[#17:1]",
    "[#35:1]",
    "[#1]-[#8X2H2+0:1]-[#1]",
    "[#1:1]-[#8X2H2+0]-[#1]"
    ]
# group by ids
skip_ids = []
records_by_ids = defaultdict(list)
recored_ordering = []
for key in ref_db.keys():
    system_id, group_id, _ = key.split("-")
    rec_id = f"{system_id}-{group_id}"
    records_by_ids[rec_id].append(key)
    index = float(ref_db[key]['title'].asstr()[0].split("_")[0])
    if (index, rec_id) not in recored_ordering:
        recored_ordering.append((index, rec_id))
        
recored_ordering.sort(key=lambda x:x[0])
for _, rec_id in recored_ordering:
        record_ids = records_by_ids[rec_id]
        # check the coverage of the first record
        record = ref_db[record_ids[0]]
        off_mol = Molecule.from_mapped_smiles(record['smiles'].asstr()[0])
        labels = sage.label_molecules(off_mol.to_topology())[0]['vdW']
        for parameter in labels.values():
            if parameter.smirks not in optimised_smirks:
                 skip_ids.append(rec_id)
                 break

skip_ids

We find there are 7 dimer pairs which correspond to a triple bonded carbon type which should be excluded from the reported results.

To calculate the root-mean-squared error across all dimers we now collect together the individual results into a single array.

In [None]:
ff_values = defaultdict(list)
qm_values = []
per_interaction_rmse = defaultdict(list)
methods = ["DEFF", "Sage"]
for _, rec_id in recored_ordering:
    if rec_id in skip_ids:
        continue
    record_ids = records_by_ids[rec_id]
    for record_id in record_ids:
        record = ref_db[record_id]
        qm_values.append(record['interaction_energy'][0])
        for method in methods:
            ff_values[method].append(record[method][0])

Define the analysis functions to compute bootstrapped metrics.

In [None]:
# overal rmse
import scipy
from typing import Dict

def MUE(experiment_values: np.array, calculated_values: np.array) -> float:
    "Calculate the mean unsigned error between a set experimental and calculated values."
    return np.mean(np.absolute(experiment_values - calculated_values))

def CORRELATION(experiment_values: np.array, calculated_values: np.array) -> float:
    "Calculate the r squared correlation between a set experimental and calculated values."
    _, _, r_value, _, _ = scipy.stats.linregress(experiment_values, calculated_values)
    return r_value**2

def KTAU(experiment_values: np.array, calculated_values: np.array) -> float:
    "Calculate the kendal tau between a set experimental and calculated values."
    return scipy.stats.kendalltau(experiment_values, calculated_values)[0]

def MSE(experiment_values: np.array, calculated_values: np.array) -> float:
    "Calculate the mean signed error between a set experimental and calculated values."
    return np.mean(calculated_values - experiment_values)

def compute_bootstraped_statistics(refernce_values: List[float], calculated_values: List[float], bootstrap_iterations: int = 1000, interval: float = 0.95) -> Dict[str, float]:
    """
    Compute bootstraped statistics for each of the defined error metrics.
    """
    # make a list of experimental and predicted values
    experimental_values = np.array(refernce_values)
    prediction_values = np.array(calculated_values)
    mue = MUE(experiment_values=experimental_values, calculated_values=prediction_values)
    rmse = RMSE(experimental_values, prediction_values)
    correlation = CORRELATION(experimental_values, prediction_values)
    tau = KTAU(experimental_values, prediction_values)
    mse = MSE(experimental_values, prediction_values)
    samples = len(experimental_values)
    sample_statistics = defaultdict(list)
    for _ in range(bootstrap_iterations):
        sample_indicies = np.random.randint(low=0, high=samples, size=samples)
        sample_experiment_values = experimental_values[sample_indicies]
        sample_prediction_values = prediction_values[sample_indicies]

        sample_statistics["MUE"].append(MUE(experiment_values=sample_experiment_values, calculated_values=sample_prediction_values))
        sample_statistics["RMSE"].append(RMSE(experiment_values=sample_experiment_values, calculated_data=sample_prediction_values))
        sample_statistics["R^2"].append(CORRELATION(experiment_values=sample_experiment_values, calculated_values=sample_prediction_values))
        sample_statistics["KTAU"].append(KTAU(experiment_values=sample_experiment_values, calculated_values=sample_prediction_values))
        sample_statistics["MSE"].append(MSE(experiment_values=sample_experiment_values, calculated_values=sample_prediction_values))
        
     # Compute the confidence intervals.
    lower_percentile_index = int(bootstrap_iterations * (1 - interval) / 2)
    upper_percentile_index = int(bootstrap_iterations * (1 + interval) / 2)

    final_stats = {}
    for metric, base_value in [("MUE", mue), ("RMSE", rmse), ("R^2", correlation), ("KTAU", tau), ("MSE", mse)]:
        final_stats[metric] = base_value
        sorted_samples = np.sort(sample_statistics[metric])
        final_stats[metric + "_lower"] = sorted_samples[lower_percentile_index]
        final_stats[metric + "_higher"] = sorted_samples[upper_percentile_index]

    return final_stats

In [None]:
# overall RMSE
compute_bootstraped_statistics(qm_values, ff_values["Sage"])

In [None]:
compute_bootstraped_statistics(qm_values, ff_values["DEFF"])

We can now create our report for the reduced set of targets.

In [None]:
df_subset = create_comparison_dataframe(reference_db=ref_db, reference_label="", methods=["DEFF", "Sage"], excluded_common_id=skip_ids)

In [None]:
df_subset

In [None]:
create_html(data_frame=df_subset, file_name="S66x8_subset.html")

In [None]:
ref_db.close()