Molecular dynamics simulations based on WGDP

In [1]:
import ase
from ase.io import read,write
from ase.io import Trajectory
from ase.optimize import BFGS
from ase.md import Langevin
from ase.md.nose_hoover_chain import NoseHooverChainNVT
from ase.md.nvtberendsen import NVTBerendsen
from ase.md.andersen import Andersen
from ase.md.bussi import Bussi
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase import units
from ase.geometry import wrap_positions
import os

from model import DPMODEL

import torch

import pandas as pd

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
ase.__version__

'3.24.0'

In [3]:
DTYPE = torch.float32
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
class Calculator(ase.calculators.calculator.Calculator):
    """DPMODEL calculator for ASE

    """
    implemented_properties = ['energy', 'forces', 'stress', 'free_energy'] #

    def __init__(self, model, species, device):
        super(Calculator, self).__init__()
        self.model = model
        self.species = species #element dict for example {'H':0,'O':1}
        self.device = device
    
    def calculate(self, atoms=None, properties=['energy', 'forces'],
                 system_changes=ase.calculators.calculator.all_changes):
        super(Calculator, self).calculate(atoms, properties, system_changes)
        boxs = torch.tensor((self.atoms.get_cell(complete=True)).reshape(3,3),dtype=DTYPE)
        zi_list = [self.species[sym] for sym in self.atoms.get_chemical_symbols()]
        numbers = torch.tensor([zi_list],dtype=torch.long)
        coords = torch.tensor((self.atoms.get_positions()).reshape(len(self.atoms),3),dtype=DTYPE)

        lattice = boxs
        inv_lattice = torch.linalg.pinv(lattice.T).T
        frac_coords = coords @ inv_lattice
        frac_coords = frac_coords % 1.0
        coords = frac_coords @ lattice.T

        boxs = boxs.reshape(1,9)
        coords = coords.reshape(1,-1)

        boxs = boxs.to(self.device)
        numbers = numbers.to(self.device)
        coords = coords.to(self.device)

        self.model.eval()

        boxs.requires_grad = False
        numbers.requires_grad = False
        coords.requires_grad = True

        self.model.zero_grad()
        predict_energy = self.model(boxs, numbers, coords)

        self.results['energy'] =  predict_energy.detach().squeeze(0).item()
        self.results['free_energy'] = predict_energy.detach().squeeze(0).item()

        if 'forces' in properties:
            dE_dxyz = torch.autograd.grad(
                predict_energy,
                coords,
                grad_outputs=torch.ones_like(predict_energy,dtype=DTYPE,device=self.device),
                retain_graph=True,
                create_graph=True,
            )[0]
            predict_force = -dE_dxyz
            self.results['forces'] = predict_force.detach().reshape(-1,3).to('cpu').numpy()

        if 'stress' in properties:
            volume = self.atoms.get_volume()
            scaling = torch.eye(3, requires_grad=True, dtype=DTYPE, device=self.device)
            stress = torch.autograd.grad(predict_energy.squeeze(), scaling)[0] / volume
            self.results['stress'] = stress.cpu().numpy()
        
        coords.requires_grad_(False)
        self.model.zero_grad()

In [5]:
def setup_md(atoms, dynamics_name, timestep, temperature, log_interval=1, trajectory_interval=1, MaxwellBoltzmann=True, logfile='md_log.txt', trajectory='md.traj',**kwargs):
    """
    Set up molecular dynamics simulations and save energy information in real time
    
    Parameters:
    atoms: ASE's Atoms object
    timestep: Time step (fs)
    temperature: Temperature (K)
    logfile: Path to the text file for saving information
    """
    # initialize molecular dynamics
    if MaxwellBoltzmann == True:
        MaxwellBoltzmannDistribution(atoms, temperature_K=temperature)

    dynamics_name = dynamics_name.lower()
    
    if dynamics_name == 'langevin':
        # Langevin
        friction = kwargs.get('friction', 0.01)
        dyn = Langevin(atoms, timestep=timestep*units.fs, temperature_K=temperature, friction=friction / units.fs)
    elif dynamics_name == 'nosehoover':
        # Nose-Hoover
        tdamp = kwargs.get("tdamp",100)
        tchain = kwargs.get("tchain",3)
        tloop = kwargs.get("tloop",2)
        dyn =  NoseHooverChainNVT(atoms, timestep=timestep*units.fs, temperature_K=temperature, tdamp=tdamp* units.fs, tchain=tchain, tloop=tloop)
    elif dynamics_name == 'nvtberendsen':
        # NVT Berendsen
        taut = kwargs.get('taut', 100)  # (fs)
        dyn =  NVTBerendsen(atoms, timestep=timestep*units.fs, temperature_K=temperature, taut=taut * units.fs)
    elif dynamics_name == 'andersen':
        # Andersen
        andersen_prob = kwargs.get('andersen_prob', 0.1)
        dyn =  Andersen(atoms, timestep=timestep*units.fs, temperature_K=temperature, andersen_prob=andersen_prob)
    elif dynamics_name == 'bussi':
        taut = kwargs.get('taut', 100)  # (fs)
        dyn = Bussi(atoms, timestep=timestep*units.fs, temperature_K=temperature, taut=taut * units.fs)
    else:
        raise ValueError(
            f"Unknown dynamic name: {dynamics_name}. "
            f"Supported options: ['langevin', 'nosehoover', 'nvtberendsen', 'andersen','bussi']")

    traj = Trajectory(trajectory, 'w', atoms)
    dyn.attach(traj.write, interval=trajectory_interval)
    
    # Initialize log file
    with open(logfile, 'w') as f:
        header = "      Step   Time[fs]      Pot[eV]  Kin[eV]      Tot[eV]   Temp[K]    Vol[Å³]"
        f.write(header + "\n")
    
    # Functions for getting dynamic information
    def get_md_info():
        step = dyn.get_number_of_steps()
        time_passed = step * timestep
        epot = atoms.get_potential_energy()
        ekin = atoms.get_kinetic_energy()
        temp = ekin / (1.5 * len(atoms) * units.kB)
        vol = atoms.get_volume()
        return step, time_passed, epot, ekin, temp, vol
    
    # Screen print function
    def print_md_info():
        step, time_passed, epot, ekin, temp, vol = get_md_info()
        print(f"{step:10d} {time_passed:10.2f} {epot:12.3f} {ekin:8.3f} "
              f"{epot+ekin:12.3f} {temp:9.2f} {vol:10.2f}")
    
    # Function for recording documents
    def log_md_info():
        step, time_passed, epot, ekin, temp, vol = get_md_info()
        with open(logfile, 'a') as f:
            f.write(f"{step:10d} {time_passed:10.2f} {epot:12.3f} {ekin:8.3f} "
              f"{epot+ekin:12.3f} {temp:9.2f} {vol:10.2f}\n")
    
    # Attach to dynamics
    dyn.attach(print_md_info, interval=log_interval)
    dyn.attach(log_md_info, interval=log_interval)
    
    return dyn


In [6]:
work_path = r"F:\DP_PROJECT_20241201\Tmp\md\nh3_md"

In [7]:
init_configuration_path = os.path.join(work_path,"init-configuration.xyz")
checkpoint_path = os.path.join(work_path,"checkpoint.pth")

In [8]:
#load model

atoms = read(init_configuration_path,format='extxyz')
# atoms.wrap(pretty_translation=True)

model = DPMODEL(symbol_features=2,embedding_layers=[15,30],Rcut=6.5,fitting_layers=[120,120,120],device=device)

checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['state_dict'])

atoms.calc = Calculator(model=model,species={"H":0,"N":1},device=device)

In [9]:
# Set Temperature and Time Step
temperature = 298  # Unit: K
timestep = 1.0  # Unit: fs
total_relax_step = 1000  # Unit: fs
total_md_step = 2000   # Unit: fs

log_interval = 1
trajectory_interval = 1

In [10]:
# minimization
# dyn = BFGS(atoms=atoms)
# dyn.run(fmax=0.05)

In [11]:
# from ase.constraints import FixAtoms
# constraint = FixAtoms(indices=[49])
# atoms.set_constraint(constraint)

In [12]:
log_path = os.path.join(work_path,"md_relax_log.txt")
traj_path = os.path.join(work_path,"md_relax_traj.traj")
dyn = setup_md(atoms, dynamics_name="nvtberendsen", taut=200, timestep=timestep, temperature=temperature, log_interval=log_interval, trajectory_interval=trajectory_interval, logfile=log_path, trajectory=traj_path)

In [13]:
print("="*100)
print("      Step   Time[fs]      Pot[eV]  Kin[eV]      Tot[eV]   Temp[K]    Vol[Å³]")
print("="*100)

#md_relax
dyn.run(steps=total_relax_step)

      Step   Time[fs]      Pot[eV]  Kin[eV]      Tot[eV]   Temp[K]    Vol[Å³]
         0       0.00    -7990.666    3.546    -7987.119    274.36    1000.00
         1       1.00    -7990.505    3.509    -7986.996    271.45    1000.00
         2       2.00    -7990.466    3.468    -7986.999    268.28    1000.00
         3       3.00    -7990.492    3.488    -7987.004    269.82    1000.00
         4       4.00    -7990.489    3.496    -7986.994    270.43    1000.00
         5       5.00    -7990.544    3.541    -7987.003    273.97    1000.00
         6       6.00    -7990.549    3.550    -7986.999    274.67    1000.00
         7       7.00    -7990.545    3.559    -7986.987    275.32    1000.00
         8       8.00    -7990.665    3.663    -7987.003    283.34    1000.00
         9       9.00    -7990.682    3.675    -7987.007    284.28    1000.00
        10      10.00    -7990.509    3.522    -7986.987    272.49    1000.00
        11      11.00    -7990.392    3.413    -7986.979    264.

True

In [14]:
log_path = os.path.join(work_path,"md_log.txt")
traj_path = os.path.join(work_path,"md_traj.traj")

dyn = setup_md(atoms, dynamics_name="nosehoover", tdamp=200, tchain=3, tloop=2, timestep=timestep, temperature=temperature, MaxwellBoltzmann=False, log_interval=log_interval, trajectory_interval=trajectory_interval, logfile=log_path, trajectory=traj_path)

In [15]:
print("="*100)
print("      Step   Time[fs]      Pot[eV]  Kin[eV]      Tot[eV]   Temp[K]    Vol[Å³]")
print("="*100)

#md
dyn.run(steps=total_md_step)

      Step   Time[fs]      Pot[eV]  Kin[eV]      Tot[eV]   Temp[K]    Vol[Å³]
         0       0.00    -7990.452    3.714    -7986.738    287.35    1000.00
         1       1.00    -7990.434    3.689    -7986.744    285.41    1000.00
         2       2.00    -7990.406    3.659    -7986.747    283.04    1000.00
         3       3.00    -7990.332    3.589    -7986.743    277.63    1000.00
         4       4.00    -7990.283    3.544    -7986.739    274.19    1000.00
         5       5.00    -7990.341    3.603    -7986.738    278.75    1000.00
         6       6.00    -7990.510    3.767    -7986.743    291.41    1000.00
         7       7.00    -7990.691    3.944    -7986.747    305.13    1000.00
         8       8.00    -7990.798    4.050    -7986.749    313.29    1000.00
         9       9.00    -7990.801    4.055    -7986.746    313.69    1000.00
        10      10.00    -7990.749    4.007    -7986.742    309.97    1000.00
        11      11.00    -7990.705    3.959    -7986.745    306.

True

In [16]:
def wrap_trajectory(input_file, output_file):
    """
    Read the trajectory file and wrap the atomic positions of each frame back into the PBC.
    
    Parameters:
        input_file: Input trajectory filename (supports multiple formats such as .traj, .xyz, .pdb, etc.)
        output_file: Output trajectory filename
    """
    # Read trajectory file
    traj = read(input_file, index=':')
    
    # Process each frame
    wrapped_frames = []
    for i, atoms in enumerate(traj):
        
        cell = atoms.get_cell()
        positions = atoms.get_positions()
        
        wrapped_positions = wrap_positions(positions, cell, pretty_translation=False)
        
        atoms.set_positions(wrapped_positions)
        wrapped_frames.append(atoms)
    
    # Save results
    write(output_file, wrapped_frames, format="extxyz")
    

In [17]:
traj_path = os.path.join(work_path,"md_traj.traj")
md_traj_xyz_save_path = os.path.join(work_path,"md_traj.xyz")
wrap_trajectory(traj_path,md_traj_xyz_save_path)

# Write the trajectory to the extxyz file
trajectory = read(traj_path, index=':')
write(md_traj_xyz_save_path, trajectory, format='extxyz')