In [1]:
# general imports
import pickle
import random

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
# Imports from the openff toolkit
from openff.toolkit.topology import Molecule
from openff.toolkit.typing.engines.smirnoff import ForceField

# import from openmm and ecosystem
from openmmml import MLPotential
import openmm as mm
from openmm.app import Simulation
from openmm import unit

forcefield = ForceField('openff_unconstrained-2.0.0.offxml')

from pymbar import BAR, EXP
from tqdm import tqdm

### define units
distance_unit = unit.angstrom
time_unit = unit.femtoseconds
speed_unit = distance_unit / time_unit

# constants
stepsize = 1 * time_unit
collision_rate = 1 / unit.picosecond
temperature = 300 * unit.kelvin

platform = 'cuda'
num_threads = 4
torch.set_num_threads(num_threads)

# generate a molecule using openff
###################
name = 'acetylacetone'
###################
molecule = Molecule.from_smiles('CC(C(C)=O)C(C)=O', hydrogens_are_explicit=False)
molecule.generate_conformers()



In [None]:
def get_positions(sim):
    """get position of system in a state"""
    return sim.context.getState(getPositions=True).getPositions(asNumpy=True)

def collect_samples(sim, n_samples=1_000, n_steps_per_sample=10_000, lamb:float=0.0):
    """generate samples using a classical FF"""
    sim.context.setParameter('lambda', lamb)

    print(f'Generate samples with mixed System: {lamb=}, {n_samples=}, {n_steps_per_sample=}')   
    samples = []
    for _ in tqdm(range(n_samples)):
        sim.step(n_steps_per_sample)
        samples.append(get_positions(sim))
    return samples

def create_mm_system(molecule):
    """given a molecule it creates an openMM system and topology instance"""
    topology = molecule.to_topology()
    system = forcefield.create_openmm_system(topology)
    return system, topology


In [None]:
potential = MLPotential('ani2x')
# generate a molecule using openff
system, topology = create_mm_system(molecule)
# define integrator
integrator = mm.LangevinIntegrator(temperature, collision_rate, stepsize)
# define the atoms that are calculated using both potentials
ml_atoms = [atom.topology_atom_index for atom in topology.topology_atoms]
ml_system = potential.createMixedSystem(topology.to_openmm(), system, ml_atoms, interpolate=True)    

platform = mm.Platform.getPlatformByName('Reference')

sim = Simulation(topology, ml_system, integrator, platform=platform)
sim.context.setPositions(molecule.conformers[0])

In [None]:
#mm_samples = collect_samples(sim, n_samples=1000, n_steps_per_sample=5_000, lamb=0.0)
#qml_samples = collect_samples(sim, n_samples=1000, n_steps_per_sample=5_000, lamb=1.0)
#pickle.dump(mm_samples, open(f'{name}_mm_samples.pickle', 'wb+'))
#pickle.dump(qml_samples, open('{name}_qml_samples.pickle', 'wb+'))

In [None]:
mm_samples = pickle.load(open(f'data_for_neq/{name}_mm_samples.pickle', 'rb'))
qml_samples = pickle.load(open(f'data_for_neq/{name}_qml_samples.pickle', 'rb'))

In [None]:
def perform_switching(lambdas:list, samples:list, nr_of_switches:int=50)->list:
    """performs NEQ switching using the lambda sheme passed from randomly dranw samples"""
    
    # list  of work values
    ws = []
    # start with switch
    for _ in tqdm(range(nr_of_switches)):
        # select a random sample
        x = np.array(random.choice(samples).value_in_unit(distance_unit)) * distance_unit
        # initialize work
        w = 0.0
        # set position    
        sim.context.setPositions(x)
        
        # perform NEQ switching
        for idx_lamb in range(1,len(lambdas)):
            # set lambda parameter
            sim.context.setParameter('lambda', lambdas[idx_lamb])
            # perform 1 simulation step
            sim.step(1)
            # calculate work
            # evaluate u_t(x_t) - u_{t-1}(x_t)
            # calculate u_t(x_t)
            u_now = sim.context.getState(getEnergy=True).getPotentialEnergy()
            # calculate u_{t-1}(x_t)
            sim.context.setParameter('lambda', lambdas[idx_lamb-1])
            u_before = sim.context.getState(getEnergy=True).getPotentialEnergy()
            # add to accumulated work
            w += (u_now - u_before).value_in_unit(unit.kilojoule_per_mole)

        ws.append(w)
    return np.array(ws) * unit.kilojoule_per_mole

In [None]:
# NEQ
switching_length = 5_001
nr_of_switches = 500

lambs = np.linspace(0,1,switching_length)
ws_from_mm_to_qml = perform_switching(lambs, samples=mm_samples,nr_of_switches=nr_of_switches)
pickle.dump(ws_from_mm_to_qml, open(f'data_for_neq/neq_ws_from_mm_to_qml_{name}.pickle', 'wb+'))
lambs = np.linspace(1,0,switching_length)
ws_from_qml_to_mm = perform_switching(lambs, samples=qml_samples,nr_of_switches=nr_of_switches)
pickle.dump(ws_from_mm_to_qml, open(f'data_for_neq/neq_ws_from_qml_to_mm_{name}.pickle', 'wb+'))

In [None]:
print(f"Crooks' equation: {BAR(ws_from_mm_to_qml, ws_from_qml_to_mm)}")
print(f"Jarzynski's equation: {EXP(ws_from_mm_to_qml)}")

In [None]:
# instantenious swichting (FEP)
switching_length = 2
nr_of_switches = 500

lambs = np.linspace(0,1,switching_length)
ws_from_mm_to_qml = perform_switching(lambs, samples=mm_samples,nr_of_switches=nr_of_switches)
lambs = np.linspace(1,0,switching_length)
ws_from_qml_to_mm = perform_switching(lambs, samples=qml_samples,nr_of_switches=nr_of_switches)
print(f'FEP: From MM to QML: {EXP(ws_from_mm_to_qml)}')
print(f'FEP: From MM to QML: {EXP(ws_from_qml_to_mm)}')
print(f"BAR: {BAR(ws_from_mm_to_qml, ws_from_qml_to_mm)}")

In [None]:
plt.hist(ws_from_mm_to_qml - np.average(ws_from_mm_to_qml))
plt.hist(ws_from_qml_to_mm + np.average(ws_from_qml_to_mm))
plt.show()
