forked from ACEsuit/mace
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mixed_system.py
318 lines (276 loc) · 11 KB
/
mixed_system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import sys
from ase.io import read
import torch
import time
from argparse import ArgumentParser
import numpy as np
import sys
from rdkit import Chem
from ase import Atoms
from openmm.openmm import System
from openmm import unit
from openmm import Platform, LangevinMiddleIntegrator
from openmmtools.integrators import AlchemicalNonequilibriumLangevinIntegrator
from openmm.app import (
Simulation,
Topology,
StateDataReporter,
ForceField,
PDBReporter,
PDBFile,
HBonds,
Modeller,
PME,
)
from typing import List, Optional, Tuple
from openmm.unit import nanometer, nanometers, molar, angstrom
from openmm.unit import kelvin, picosecond, femtosecond, kilojoule_per_mole
from openff.toolkit.topology import Molecule
from openmmforcefields.generators import SMIRNOFFTemplateGenerator
from mace.calculators.openmm import MacePotentialImplFactory
from openmmml import MLPotential
from openmmtools.openmm_torch.repex import (
MixedSystemConstructor,
RepexConstructor,
get_atoms_from_resname,
)
from tempfile import mkstemp
import os
import logging
def get_xyz_from_mol(mol):
xyz = np.zeros((mol.GetNumAtoms(), 3))
conf = mol.GetConformer()
for i in range(conf.GetNumAtoms()):
position = conf.GetAtomPosition(i)
xyz[i, 0] = position.x
xyz[i, 1] = position.y
xyz[i, 2] = position.z
return xyz
MLPotential.registerImplFactory("mace", MacePotentialImplFactory())
torch.set_default_dtype(torch.float64)
# platform = Platform.getPlatformByName("CUDA")
# platform.setPropertyDefaultValue("DeterministicForces", "true")
logger = logging.getLogger("INFO")
class MixedSystem:
def __init__(
self,
file: str,
smiles: str,
model_path: str,
forcefields: List[str],
resname: str,
padding: float,
ionicStrength: float,
nonbondedCutoff: float,
potential: str,
temperature: float,
repex_storage_path: str,
friction_coeff: float = 1.0,
timestep: float = 1,
) -> None:
self.forcefields = forcefields
self.padding = padding
self.ionicStrength = ionicStrength
self.nonbondedCutoff = nonbondedCutoff
self.resname = resname
self.potential = potential
self.temperature = temperature
self.friction_coeff = friction_coeff / picosecond
self.timestep = timestep * femtosecond
self.repex_storage_path = repex_storage_path
self.mixed_system, self.modeller = self.create_mixed_system(
file=file, smiles=smiles, model_path=model_path
)
def initialize_mm_forcefield(self, molecule: Optional[Molecule] = None):
forcefield = ForceField(*self.forcefields)
if molecule is not None:
logging.info("registering smirnoff template generator")
smirnoff = SMIRNOFFTemplateGenerator(molecules=molecule)
forcefield.registerTemplateGenerator(smirnoff.generator)
return forcefield
def initialize_ase_atoms_from_smiles(self, smiles: str) -> Tuple[Atoms, Molecule]:
molecule = Molecule.from_smiles(smiles)
_, tmpfile = mkstemp(suffix="xyz")
molecule._to_xyz_file(tmpfile)
atoms = read(tmpfile)
os.remove(tmpfile)
return atoms, molecule
def create_mixed_system(
self,
file: str,
model_path: str,
smiles: str = None,
pure_ml_system: bool = False,
) -> Tuple[System, Modeller]:
"""Creates the mixed system from a purely mm system
:param str file: input pdb file
:param str smiles: smiles of the small molecule, only required when passed as part of the complex
:param str model_path: path to the mace model
:return Tuple[System, Modeller]: return mixed system and the modeller for topology + position access by downstream methods
"""
# initialize the ase atoms for MACE
atoms, molecule = self.initialize_ase_atoms_from_smiles(smiles)
# Handle a complex, passed as a pdb file
if file.endswith(".pdb"):
turn_off_constraints = False
input_file = PDBFile(file)
# if pure_ml_system specified, we just need to parse the input file
if not pure_ml_system:
modeller = Modeller(input_file.topology, input_file.positions)
# Handle a ligand, passed as an sdf, override the Molecule initialized from smiles
elif file.endswith(".sdf"):
turn_off_constraints = True
molecule = Molecule.from_file(file)
# Hold positions in nanometers
positions = get_xyz_from_mol(molecule.to_rdkit()) / 10
modeller = Modeller(molecule.to_topology().to_openmm(), positions)
if pure_ml_system:
# we have the input_file, create the system directly from the mace potential
modeller = None
ml_potential = MLPotential("mace")
ml_system = ml_potential.createSystem(
input_file.getTopology(), atoms_obj=atoms, filename=model_path
)
return ml_system, modeller
else:
forcefield = self.initialize_mm_forcefield(molecule)
modeller.addSolvent(
forcefield,
padding=self.padding * nanometers,
ionicStrength=self.ionicStrength * molar,
neutralize=True,
)
omm_box_vecs = modeller.topology.getPeriodicBoxVectors()
atoms.set_cell(
[
omm_box_vecs[0][0].value_in_unit(angstrom),
omm_box_vecs[1][1].value_in_unit(angstrom),
omm_box_vecs[2][2].value_in_unit(angstrom),
]
)
mm_system = forcefield.createSystem(
modeller.topology,
nonbondedMethod=PME,
nonbondedCutoff=self.nonbondedCutoff * nanometer,
# TODO: why are the constarints causing us issues
# Empirically it looks like we avoid integrator issues with the MCMC if we have constraints on for the ligand, and off for the protein
constraints=HBonds if not turn_off_constraints else None,
# constraints=HBonds,
# rigidWater=True,
)
mixed_system = MixedSystemConstructor(
system=mm_system,
topology=modeller.topology,
nnpify_resname=self.resname,
nnp_potential=self.potential,
atoms_obj=atoms,
filename=model_path,
).mixed_system
return mixed_system, modeller
# def create_pure_ml_system(self, file: str, model_path: str) -> System:
# """Calls the createSystem from the ml potential directly, instead of the mixed system. Useful for materials systems etc where openMM cannot generate parameters in the first place
# """
# input_file = PDBFile(file)
# self.
def run_mixed_md(self, steps: int, interval: int, output_file: str) -> float:
"""Runs plain MD on the mixed system, writes a pdb trajectory
:param int steps: number of steps to run the simulation for
:param int interval: reportInterval attached to reporters
"""
integrator = LangevinMiddleIntegrator(
self.temperature, self.friction_coeff, self.timestep
)
simulation = Simulation(
self.modeller.topology,
self.mixed_system,
integrator,
platformProperties={"Precision": "Mixed"},
)
simulation.context.setPositions(self.modeller.getPositions())
logging.info("Minimising energy")
simulation.minimizeEnergy()
reporter = StateDataReporter(
file=sys.stdout,
reportInterval=interval,
step=True,
time=True,
potentialEnergy=True,
temperature=True,
speed=True,
)
simulation.reporters.append(reporter)
simulation.reporters.append(
PDBReporter(
file=output_file,
reportInterval=interval,
)
)
simulation.step(steps)
state = simulation.context.getState(getEnergy=True)
energy_2 = state.getPotentialEnergy().value_in_unit(kilojoule_per_mole)
return energy_2
def run_replex_equilibrium_fep(self, replicas: int, restart: bool) -> None:
sampler = RepexConstructor(
mixed_system=self.mixed_system,
initial_positions=self.modeller.getPositions(),
# repex_storage_file="./out_complex.nc",
temperature=self.temperature * kelvin,
n_states=replicas,
restart=restart,
storage_kwargs={
"storage": self.repex_storage_path,
"checkpoint_interval": 100,
"analysis_particle_indices": get_atoms_from_resname(
topology=self.modeller.topology, resname=self.resname
),
},
).sampler
if not restart:
logging.info("Minimizing system...")
t1 = time.time()
sampler.minimize()
logging.info(f"Minimised system in {time.time() - t1} seconds")
sampler.run()
def run_neq_switching(self, steps: int, interval: int) -> float:
"""Compute the protocol work performed by switching from the MM description to the MM/ML through lambda_interpolate
:param int steps: number of steps in non-equilibrium switching simulation
:param int interval: reporterInterval
:return float: protocol work from the integrator
"""
alchemical_functions = {"lambda_interpolate": "lambda"}
integrator = AlchemicalNonequilibriumLangevinIntegrator(
alchemical_functions=alchemical_functions,
nsteps_neq=steps,
temperature=self.temperature,
collision_rate=self.friction_coeff,
timestep=self.timestep,
)
simulation = Simulation(
self.modeller.topology,
self.mixed_system,
integrator,
platformProperties={"Precision": "Mixed"},
)
simulation.context.setPositions(self.modeller.getPositions())
logging.info("Minimising energy")
simulation.minimizeEnergy()
reporter = StateDataReporter(
file=sys.stdout,
reportInterval=interval,
step=True,
time=True,
potentialEnergy=True,
temperature=True,
speed=True,
totalSteps=steps,
remainingTime=True,
)
simulation.reporters.append(reporter)
# Append the snapshots to the pdb file
simulation.reporters.append(
PDBReporter("output_frames.pdb", steps / 80, enforcePeriodicBox=True)
)
# We need to take the final state
simulation.step(steps)
protocol_work = (integrator.get_protocol_work(dimensionless=True),)
return protocol_work