# In this notebook, we define a simple Python-based Molecular Dynamics engine which will allow for the use of ML-based Ab-Initio Molecular Dynamics Analogues.

In [86]:
import numpy as np
import os
import sys
import matplotlib.pyplot as plt
import numpy.random as rand
import numpy.linalg as la

## Change this according to your local environment

In [87]:
sys.path.append('/Users/steven/Documents/Research/Projects/MLED/ML-electron-density/util')

First, we define the engine which will power the whole simulation.

The engine object needs to keep track of every atom's individual position, and will propagate the system forward by computing the forces acting on each atom and then propagating forward by a time step.

In [88]:
class MD_engine():
    
    def __init__(self,input_atoms=[],cell=np.eye(3), dx=.1,verbosity=1,model='SHO',store_trajectory=True,
                espresso_config=None,thermo_config=None):
        """
        Initialize the features of the system, which include:
        atoms: Atoms in the unit cell
        cell:  Vectors which define the dimensions of the unit cell
        dx:    Perturbation distance used to evaluate the gradient in potential energy
        model: The energy model used
        time:  Internal timer for simulations
        """
        self.verbosity = verbosity

        self.atoms = []  # List of atoms 
        for atom in input_atoms:
            if self.verbosity==5:
                print('Loading in',atom)
            self.add_atom(atom)
        
        self.dx = dx
        self.cell = np.array(cell)
        
        self.model = model

        self.time = 0.
        
        self.store_trajectory=store_trajectory
        
        if store_trajectory:
            self.trajs=[]
            for n in range(len(self.atoms)):
                self.trajs.append([])
                
        if espresso_config is not None:
            self.espresso_config = espresso_config
        if thermo_config is not None:
            self.thermo_config = thermo_config
        
    def add_atom(self,atom):
        """
        Helper function which adds atoms to the simulation
        """
        if self.verbosity==4:
            print("Adding atom",atom)
        self.atoms.append(atom)    
    

    def get_config_energy(self,atoms):
        """
        The positions of the atoms are passed to a given model of choice,
        in which the positions are converted into the configuration which is
        appropriate to be parsed by the model.

        For instance, the positions may need to be converted into 
        atom-centered symmetry functions for the Gaussian Process model. By
        passing in the positions, the necessary mapping is performed, and then
        the appropriate model is called.
        """ 
        
        positions=[atom.position for atom in self.atoms]
        
        if self.model=='SHO':
            return SHO_energy(atoms)
    
        #######
        # Below are 'hooks' where we will later plug in to our machine learning models
        #######
        if self.model=='GP':
            config = GP_config(positions)
            return GP_energy(config)

        if self.model=="KRR":
            config = KRR_config(positions)
            return GP_energy(config)
        
        
        if self.model=='LJ':
            return LJ_energy(atoms)
    
    def update_atom_forces(self):
        """
        Perturbs the atoms by a small amount dx in each direction
        and models the gradient (and thus the force) 
        via a finite-difference approximation.
        """
        
        if self.model=='AIMD':
            results=run_espresso(self.atoms,self.cell)
            force_list = results['forces']
            for n  in range(len(force_list)):
                self.atoms[n].force=list(results['forces'][n])
            return
        
        E0 = self.get_config_energy(self.atoms)
        atoms = self.atoms
        for atom in atoms:
            for coord in range(3):
                atom.position[coord] += self.dx
                Eplus = self.get_config_energy(atoms)
                
                atom.position[coord] -= 2*self.dx
                Eminus = self.get_config_energy(atoms)
                atom.position[coord] += self.dx
            
                atom.force[coord] = -first_derivative_2nd(Eminus,Eplus,self.dx)
                if self.verbosity==5:
                    print("Just set force on atom's coordinate",coord," to be ", -first_derivative_2nd(Eminus,Eplus,self.dx))
            
            atom.apply_constraint()
        
        
    def take_timestep(self,dt,method='Verlet'):
        
        temp_num=0.
        #####
        # Bad first order Euler method
        # Undesirable because accumulates error at order dt**2 which 
        # adds up quickly for long simulations
        #####
        if method=='FO_Euler':
            for atom in atoms:
                for coord in range(3):
                    atom.prev_pos[coord] = np.copy(atom.position[coord])
                    atom.position[coord] += atom.velocity[coord]*dt+ atom.force[coord]*dt**2/atom.mass
                    atom.velocity[coord] += atom.force[coord]*dt/atom.mass
                    
        ######
        # Superior Verlet integration
        # Citation:  https://en.wikipedia.org/wiki/Verlet_integration
        ######
        elif method=='Verlet':
            for atom in self.atoms:
                for coord in range(3):
                    temp_num = np.copy(atom.position[coord])
                    atom.position[coord] = 2*atom.position[coord] - atom.prev_pos[coord] + atom.force[coord]*dt**2/atom.mass
                    atom.velocity[coord] += atom.force[coord]*dt/atom.mass     
                    atom.prev_pos[coord] = np.copy(temp_num)
                    
                    

        
        
        if self.store_trajectory:
            for n in range(len(self.atoms)):
                self.trajs[n].append(np.copy(self.atoms[n].position))
        self.time+=dt
    
    def run(self,tf,dt):
        """
        Handles timestepping; at each step, calculates the force and then
        advances via the take_timestep method.
        
        """
        if self.time==0:
            self.update_atom_forces()
            self.take_timestep(dt,method='FO_Euler')
            self.assert_boundary_conditions()
        while self.time<tf:
            self.update_atom_forces()
            self.take_timestep(dt)
            self.assert_boundary_conditions()

            

            
            if self.verbosity>=3:
                self.print_positions()
            
            
            if self.model=='GP':
            
                valid = self.gauge_uncertainty(self.atoms,self.threshold)
                if valid:                    
                    continue
                else:
                    take_timestep(-dt)
                    self.time -=dt
                    call_dft()
                    train_model()
                

        

    def gauge_uncertainty(self,positions,threshold):
        """
        For later implementation with the Gaussian Process model.
        Will check to see if the uncertainty of the model's prediction of the energy
        within a given configuration is within an acceptable bound given by threshold.
        
        If it is unacceptable, the current configuration is exported into a Quantum ESPRESSO run,
        which is then added into the ML dataset and the model is re-trained.
        """
        config=GP_config(positions)
        sigma=get_config_uncertainty(config)
        if sigma>threshold:
            return False
        else:
            return True
        
    def assert_boundary_conditions(self):
        """
        We seek to have our atoms be entirely within the unit cell, that is,
        for bravais lattice vectors a1 a2 and a3, we want our atoms to be at positions
        
         x= a a1 + b a2 + c a3
         where a, b, c in [0,1)
         
         So in order to impose this condition, we invert the matrix equation such that
        
          [a11 a12 a13] [a] = [x1]
          [a21 a22 a23] [b] = [x2]
          [a31 a32 a33] [c] = [x3]
          
          And if we find that a, b, c not in [0,1) we modulo by 1.
        """
        a1 = self.cell[0]
        a2 = self.cell[1]
        a3 = self.cell[2]
        print(a1)
        print(a2)
        print(a3)
        for atom in self.atoms:
            
            coords= np.dot(la.inv(self.cell),atom.position)
            if self.verbosity==5:
                print('Atom positions before BC check:',atom.position)
                print('Resultant coords:',coords)
            
            if any([coord>1.0 for coord in coords]) or any([coord<0.0 for coord in coords]):
                if self.verbosity==4: print('Atom positions before BC check:',atom.position)

                atom.position= a1 * (coords[0]%1) + a2 * (coords[1]%1) + a3 * (coords[2]%1 )
                if self.verbosity==4: print("BC updated position:",atom.position)
            
    def print_positions(self,forces=True):
        
        print("T=",self.time)
        for n in range(len(self.atoms)):
            
            pos=self.atoms[n].position
            force=self.atoms[n].force
            if forces:
                print('Atom %d:'%n,pos[0],pos[1],pos[2], ' Force:', force[0],force[1],force[2])
            else:
                print('Atom %d:'%n,pos[0],pos[1],pos[2])


## Set up SHO configuration and Energy functions

Note that this SHO energy is a very simple model which only has the atoms oscillating about their initial positions; there is no interactions between the atoms whatsoever. This is merely built as a test of the MD engine to ensure that it is running and propagating the atoms correctly.

In [89]:
def SHO_config(atoms):
    return [atom.position for atom in atoms]

def SHO_energy(atoms,kx=10.0,ky=1.0,kz=1.0):
    
    if type(kx) is list :
        if len(kx)!=len(atoms):
            print("Warning! Number of Kx supplied is not equal to number of atoms")
    elif type(kx) is float:
        kx=[kx]*len(atoms)
    
    if type(ky) is list:
        if len(ky)!=len(atoms):
            print("Warning! Number of Ky supplied is not equal to number of atoms")
    elif type(ky) is float:
        ky=[ky]*len(atoms)
     
    
    if type(kz) is list:
        if len(kz)!=len(atoms):
            print("Warning! Number of Kz supplied is not equal to number of atoms")
    elif type(kz) is float:
        kz=[kz]*len(atoms)
    
    init_pos  = [atom.initial_pos for atom in atoms]
    positions = [atom.position for atom in atoms]
    
    K = [kx,ky,kz]
    
    E=0
    for m in range(len(atoms)):
        for n in range(3):
            E+= K[n][m] * (init_pos[m][n]- positions[m][n])**2
            
            
    return E
            
    


## Set up LJ Energy Functions

In [90]:
def LJ_energy(atoms, rm=.5, eps=10., debug=False):
    E=0.
    for at in atoms:
        for at2 in atoms:
            if at.fingerprint!=at2.fingerprint:
                
                disp = la.norm(at.position-at2.position)
                if debug:
                    print('Current LJ disp between atoms is:',disp)
                
                E+= .5 * eps * ( (rm/disp)**12 - 2* (rm/disp)**6)
                
    return E

## Define the 'Atom' class

A list of atom objects are contained within the engine and are iterated over within every timestep.

In [94]:
class Atom():
    mass_dict = {'H':1.0, "Al":26.981539, "Si": 28.0855}
    
    def __init__(self,position=[0.,0.,0.], velocity=[0.,0.,0.],force=[0.,0.,0.],initial_pos=[0,0,0],mass=None,
                element='',constraint=[False,False,False]):

        self.position = np.array(position)
        self.prev_pos = np.array(self.position)
        self.velocity = np.array(velocity)
        self.force   = np.array(force)
        self.element  = element
        
        self.constraint = constraint
        
        self.fingerprint = rand.rand()  # This is how I tell atoms apart. Easier than indexing them manually...
                                        

        if self.element in self.mass_dict.keys() and mass==None:   
            self.mass = self.mass_dict[self.element]
        elif mass!=None:
            self.mass = mass
        elif mass==None:
            self.mass = 1.0


        #############
        ## Used for testing with a simple harmonic oscillator potential
        ## in which the force is merely the displacement squared from initial position
        ############
        self.initial_pos = np.array(initial_pos)
        
        self.parameters={'position': self.position,
                         'velocity': self.velocity,
                         'force'  : self.force,
                         'mass'    : self.mass,
                         'element' : self.element,
                         'constraint' :self.constraint,
                         'initial_pos':self.initial_pos}
        

    
    def __str__(self):
        return str(self.parameters)
    
    def get_position(self):
        return self.position
    def get_velocity(self):
        return self.velocity
    def get_force(self):
        return self.force

    def apply_constraint(self):
        for n in range(3):
            if self.constraint[n]:
                self.velocity[n] = 0.
                self.force[n]   = 0.
    

# Quantum Cafe: Where we brew ESPRESSO

In the ESPRESSO config object, tune the parameters according to your system. You could use environment variables like so (defined in your home directory's .bash_profile) or just hard-code them into the notebook below.

This is going to be important for letting the Gaussian Process model interface with ESPRESSO, in that positions will be printed corresponding to the 

In [92]:
from project_pwscf import *
from objects import *
from project_objects import *
from ase import Atoms

class ESPRESSO_config(object):
    workdir   = os.environ['PROJDIR']+'/AIMD'
    run_pwscf = os.environ['PWSCF_COMMAND']
    pseudopotentials={"H":PseudoPotential(path=os.environ["ESPRESSO_PSEUDO"], ptype='uspp', element='H',
                                            functional='GGA', name='H.pbe-kjpaw.UPF'),
                      "Si": PseudoPotential(path=os.environ["ESPRESSO_PSEUDO"], ptype='uspp', element='Si',
                                            functional='GGA', name='Si.pz-vbc.UPF'),
                      'Al': os.environ['ESPRESSO_PSEUDO']+'/Al.pz-vbc.UPF'}
    
    molecule=True
    
    nk=8
    
    
    
config1 = ESPRESSO_config()
config2 = ESPRESSO_config()
config2.molecule=False
print(config1.molecule)
    
def run_espresso(atoms,cell,qe_config=config1,ecut=90,molecule=True,stepcount=0):

    
    pseudopots={}
    elements = [atom.element for atom in atoms]
    for element in elements:
        pseudopots[element] = qe_config.pseudopotentials[element]
    
    ase_struc = Atoms(symbols= [atom.element for atom in atoms],
                     positions=[atom.position for atom in atoms],
                     cell = cell,
                     pbc=[0,0,0] if molecule else [1,1,1])
    
    struc = Struc(ase2struc(ase_struc))


    if qe_config.molecule:
        kpts = Kpoints(gridsize=[1, 1, 1], option='gamma', offset=False)
    else:
        nk=qe_config.nk
        kpts = Kpoints(gridsize=[nk,nk,nk], option='automatic',offset=False)
        
    dirname = 'step'+str(stepcount)
    runpath = Dir(path=os.path.join(os.environ['PROJDIR'], "AIMD", dirname))
    input_params = PWscf_inparam({
        'CONTROL': {
                   'prefix': 'AIMD',
            'calculation': 'scf',
            'pseudo_dir': os.environ['ESPRESSO_PSEUDO'],
            'outdir': runpath.path,
#            'wfcdir': runpath.path,
            'disk_io': 'low',
            'tprnfor' : True,
            'wf_collect' :False
        },
        'SYSTEM': {
            'ecutwfc': ecut,
            'ecutrho': ecut * 4,
#			'nspin': 4 if 'rel' in potname else 1,
#			'occupations': 'smearing',
#			'smearing': 'mp',
#            'degauss': 0.02,
            'noinv': False
            #'lspinorb':True if 'rel' in potname else False,
             },
        'ELECTRONS': {
            'diagonalization': 'david',
            'mixing_beta': 0.5,
            'conv_thr': 1e-7
        },
        'IONS': {},
        'CELL': {},
        })

    output_file = run_qe_pwscf(runpath=runpath, struc=struc,  pseudopots=pseudopots,
                                   params=input_params, kpoints=kpts,
                                    ncpu=1)
    output = parse_qe_pwscf_output(outfile=output_file)

    return output
    
        
        
    
    

True


In [93]:
Hatom3 = Atom(position=[0,0,0], element='H')
Hatom4 = Atom(position=[.5,0,0], element='H')

atoms2  = [Hatom3,Hatom4]
#eng = MD_engine(cell=[[2,0,0],[0,2.0,0],[0,0,1.5]],input_atoms=atoms2,verbosity=0,model='LJ',dx=.001)

    
results=run_espresso(atoms2,cell=[[2,0,0],[0,5,0],[0,0,1.5]],ecut=90,molecule=True,qe_config=config1)

NameError: name 'mass_dict' is not defined

In [None]:
print(results['forces'])

In [None]:
def first_derivative_2nd(fm,fp,h):
    """
    Computes the second-order accurate finite difference form of the first derivative
    which is (  fp/2 - fm/2)/(h)
    as seen on Wikipedia: https://en.wikipedia.org/wiki/Finite_difference_coefficient
    """
    if h==0:
        print("Warning... Trying to divide by zero. Derivative will diverge.")
    return (fp-fm)/float(2*h)

def first_derivative_4th(fmm,fm,fp,fpp,h):
    """
    Computes the fourth-order accurate finite difference form of the first derivative
    which is (fmm/12  - 2 fm /3 + 2 fp /3 - fpp /12)/h
    as seen on Wikipedia: https://en.wikipedia.org/wiki/Finite_difference_coefficient
    """
    
    if h==0:
        print("Warning... Trying to divide by zero. Derivative will diverge.")
        
    return (fmm/12. - 2*fm/3. +  2*fp/3. - fpp/12.)/float(h)



# Testing Ground

In [None]:

Hatom1 = Atom(position=[.26,0,0],initial_pos=[.25,0,0], element='H')
Hatom2 = Atom(position=[.75,0,0], element='H')

atoms  = [Hatom1,Hatom2]
#atoms = [Hatom1]
#eng = MD_engine(cell=[[1,0,0],[0,1,0],[0,0,1]],input_atoms=atoms,verbosity=0,model='LJ',dx=.001)


#eng.run(.5,.001)

#print([x for x in eng.trajs[0]])
#plt.plot(np.linspace(10,.1,1000/2),[x[0] for x in eng.trajs[0]])

#plt.show()

In [None]:
Hatom3 = Atom(position=[5,0,8], element='H')
atoms2  = [Hatom3]
#eng = MD_engine(cell=[[1,.5,0],[1,-.5,0],[0,0,1.5]],input_atoms=atoms2,verbosity=0,model='LJ',dx=.001)

#eng.assert_boundary_conditions()
#print(eng.atoms)

In [None]:
Hatom1 = Atom(position=[.24,0,0], element='H')
Hatom2 = Atom(position=[.75,0,0], element='H')

atoms  = [Hatom1,Hatom2]
#atoms = [Hatom1]
#eng = MD_engine(cell=[[5,0,0],[0,1,0],[0,0,1]],input_atoms=atoms,verbosity=0,model='AIMD',dx=.001)
#eng.run(.5,.001)




# Silicon Test
(Some code borrowed from Jon Vandermause)

In [None]:
# define scf parameters
pref = 'si'
pseudo_dir = '/n/home03/jonpvandermause/qe-6.2.1/pseudo'
pseudo_dir = '/Users/steven/Documents/Schoolwork/CDMAT275/ESPRESSO/qe-6.0'
outdir='/n/home03/jonpvandermause/Cluster/Si_Supercell_SCF'
alat = 5.431 # lattice parameter of si in angstrom
ecut = 18.0 # plane wave cutoff energy
nk = 8 # size of kpoint grid
dim = 4 # size of supercell
nat = 2 * dim**3


pw_loc = '/n/home03/jonpvandermause/qe-6.2.1/bin/pw.x'
in_name = 'si.scf.in'
out_name = 'si.scf.out'
sh_name = 'Si_Super.sh'
partition = 'kaxiras'
memory = 1000
email = 'torrisi@g.harvard.edu'


def make_struc_super(alat, dim):
    unitcell = crystal('Si', [(0, 0, 0)], spacegroup=227, \
                       cellpar=[alat, alat, alat, 90, 90, 90], \
                      primitive_cell = True)
    multiplier = np.identity(3) * dim
    ase_supercell = make_supercell(unitcell, multiplier)
    structure = Struc(ase2struc(ase_supercell))
    return structure

In [97]:
from ase.spacegroup import crystal
from ase.build import *

Sicrystal=make_struc_super(alat,dim=2)
cell=Sicrystal.cell
SiAtoms=[]
for at in Sicrystal.positions:
    SiAtoms.append(Atom(element='Si',position=list(np.array(at[1])*1.02)))
    
print(len(SiAtoms))



config2= ESPRESSO_config()

config2.molecule=False
config2.nk=4

eng3 = MD_engine(cell=Sicrystal.cell,input_atoms=SiAtoms,verbosity=3,model='AIMD',dx=.001,espresso_config=config2)

t0=5
dt=1
eng3.run(t0,dt)


16


KeyboardInterrupt: 

In [None]:
length=len([x[0] for x in eng3.trajs[0]])
plt.plot(np.linspace(t0,dt,length),[x[0] for x in eng3.trajs[0]])
