In [None]:
import os 
import sys 
import re 
import numpy as np
import matplotlib.pyplot as plt

from ase.io import read, write
from wrappers.orca_wrappers import sf_orca_run, orca_load
from mace.calculators import MACECalculator
from ase.units import fs
from mace.calculators import MACECalculator
from ase.io.trajectory import Trajectory
from ase.units import fs
from ase.md.langevin import Langevin
from ase.md import MDLogger
import pandas as pd
import torch

In [15]:
class Tester:
    def __init__(self, models: list):
        self.models = models
        self.raw_atoms = {}
        self.atoms = []
    def run_orca_energies(self, out_dir, num_states):
        for i in range(1, num_states + 1): # Avoid using Iroot 0 with Spin Flip calculations (Orca 6.0 returns empty Engrad)
            orca_blocks = f'%pal nprocs 64 end \n%tddft \n Iroot {i} Nroots {num_states + 1} \nsf true \nend'
            for name, atom in self.raw_atoms.items():
                calc_dir = f'{out_dir}/state_{i}_{name}'
                if os.path.exists(f'{calc_dir}/orca.engrad'):
                    continue
                sf_orca_run(atom, calc_dir, task = 'Engrad', basis_set = 'def2-TZVPPD', func = 'wB97M-D3BJ', orca_blocks = orca_blocks)

    def load_orca_energies(self, orca_dir):
        paths = os.listdir(orca_dir)
        for path in paths:
            atoms = orca_load(f'{orca_dir}/{path}')

            state = re.search(r'state_([^_]+)', path)
            geometry_step = re.search(r'step_([^_]+)', path)

            atoms.info['state'] = state.group(1)
            atoms.info['step'] = geometry_step.group(1)
            energy = atoms.get_potential_energy()
            atoms.info['orca_energy'] = energy
            self.atoms.append(atoms)
    
    def load_atoms(self, path):
        folders = os.listdir(path)
        for folder in folders:
            full_path = f'{path}/{folder}'
            atoms = read(full_path)
            self.raw_atoms[folder[:-4]] = atoms
        
    
    def run_models(self, device):
        for model in self.models:
            calculator = MACECalculator(model_paths = model, device = device)
            model_name = model.split('_')[5]
            for atom in self.atoms:
                atom.calc = calculator
                energy = atom.get_potential_energy()
                atom.info[f'{model_name}'] = energy
    
    def calculate_rmse(self):
        for atom in self.atoms:
            orca_energy = atom.info['orca_energy']
            for model in self.models:
                model_name = model.split('_')[5]
                model_energy = atom.info[f'{model_name}']
                rmse = np.sqrt((model_energy - orca_energy) ** 2)
                atom.info[f'{model_name}_rmse'] = rmse

    def write_results(self, out_dir):
        rmses = {}
        total_rmses = {}

        for atom in self.atoms:
            for model in self.models:
                model_name = model.split('_')[5]
                state = atom.info['state']
                key = f'{model_name}-{state}'

                if key not in rmses:
                    rmses[key] = [] 
                model_rmse = atom.info[f'{model_name}_rmse']  
                rmses[key].append(model_rmse)

        for key, values in rmses.items():
            true_rmse = np.mean(values)
            total_rmses[key] = true_rmse

        df = pd.DataFrame(rmses)
        df.to_csv(f'{out_dir}')

    def plot_results(self):
        fig, ax = plt.subplots()
        rmses = {}
        model_name = model.split('_')[5]
        for atom in self.atoms:
            state = atom.info['state']
            for model in self.models:
                rmse = atom.info[f'{model_name}_rmse']
            if state not in rmses:
                rmses[state] = []
            rmses[state].append(rmse)

        for state, rmse_values in rmses.items():
            ax.hist(rmse_values, bins=20, alpha=0.5, label=f'State {state}')

        ax.set_xlabel('RMSE')
        ax.set_ylabel('Frequency')
        ax.legend()
        plt.show()

    def run_mds(self, model, atoms_path, snapshot_out_dir, traj_out_dir, num_simulations, snapshot_steps, change_head_ci = True, save_snapshots = True):
        log_steps = [5, 10, 50, 200, 500]
        num_simulations = 25
        temperature_K = 300
        friction = 0.01/fs
        timestep = 0.5*fs
        num_steps = 500
    
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        es_1_calculator = MACECalculator(model_paths=model, device=device)
        es_2_calculator = MACECalculator(model_paths=model, device=device)

        atoms = read(atoms_path)

        atoms.info['head'] = 'spin_flip_es2'
        atoms.calc = es_2_calculator

        for i in range(num_simulations):
            atoms.rattle(stdev=0.001)
            atoms.calc = es_2_calculator

            traj_file = f"{traj_out_dir}/sim_{i}.traj"
            traj = Trajectory(traj_file, mode='w')  

            snapshot_dir = f"{snapshot_out_dir}/sim_{i}"

            dyn = Langevin(atoms, timestep=timestep, temperature_K=temperature_K, friction=friction)
            for j in range(1, num_steps + 1):
                dyn.run(1)
                atoms.info['head'] = 'spin_flip_es1'
                atoms.calc = es_1_calculator
                es1_energy = atoms.get_potential_energy()

                atoms.info['head'] = 'spin_flip_es2'
                atoms.calc = es_2_calculator
                es2_energy = atoms.get_potential_energy()

                if es2_energy < es1_energy:
                    atoms.info['head'] = 'spin_flip_es1'
                    atoms.calc = es_1_calculator
                else:
                    atoms.info['head'] = 'spin_flip_es2'
                    atoms.calc = es_2_calculator

                atoms.info['es1_energy'] = es1_energy
                atoms.info['es2_energy'] = es2_energy

                traj.write(atoms)
                if save_snapshots:
                    if j in log_steps:
                        print(f"Logged step {j}")
                        snapshot_path = f"{snapshot_dir}_step_{j}.xyz"
                        write(snapshot_path, atoms)
                    
            traj.close()

    def model_mds(self, atoms_path, traj_out, change_head_ci = True):
        temperature_K = 500
        friction = 0.01/fs
        timestep = 0.5*fs
        num_steps = 500

        device = 'cpu'

        for model in self.models:
            es_1_calculator = MACECalculator(model_paths=model, device=device)
            es_2_calculator = MACECalculator(model_paths=model, device=device)

            model_name = model.split('_')[5]
    
            atoms = read(atoms_path)

            atoms.info['head'] = 'spin_flip_es2'
            atoms.calc = es_2_calculator

            traj_file = f"{traj_out}/model_{model_name}.traj"
            traj = Trajectory(traj_file, mode='w')  

            dyn = Langevin(atoms, timestep=timestep, temperature_K=temperature_K, friction=friction)
            for j in range(1, num_steps + 1):
                dyn.run(1)
                atoms.info['head'] = 'spin_flip_es1'
                atoms.calc = es_1_calculator
                es1_energy = atoms.get_potential_energy()

                atoms.info['head'] = 'spin_flip_es2'
                atoms.calc = es_2_calculator
                es2_energy = atoms.get_potential_energy()

                if es2_energy < es1_energy:
                    atoms.info['head'] = 'spin_flip_es1'
                    atoms.calc = es_1_calculator
                else:
                    atoms.info['head'] = 'spin_flip_es2'
                    atoms.calc = es_2_calculator

                atoms.info['es1_energy'] = es1_energy
                atoms.info['es2_energy'] = es2_energy

                traj.write(atoms)
                    
            traj.close()


In [None]:
model = 'testing_suite/thymine/models/thymine_md_compiled.model'
atoms = 'thymine/isolated_system/geometry_optimisations/length_3.0_angle_70.57/orca.xyz'
snapshot_out = 'testing_suite/thymine/mds/snapshots'
traj_out = 'testing_suite/thymine/mds/trajectories'
orca_energy_path = 'testing_suite/thymine/mds/snapshots_orca_energies'


tester = Tester([model])
# tester.run_mds(model, atoms, snapshot_out, traj_out, 50, [10, 50, 100, 200, 500], change_head_ci=True)
tester.load_atoms(snapshot_out)
tester.run_orca_energies(orca_energy_path, 2)

In [None]:
model_path = 'testing_suite/thymine/models'
models = [f'{model_path}/{model}' for model in os.listdir(model_path)]
tester = Tester(models)
atoms = 'testing_suite/thymine/mds/md_geom.xyz'
traj_out = 'testing_suite/thymine/mds/model_trajectories'

tester.model_mds(atoms, traj_out)

In [3]:
!export LC_ALL="POSIX"

In [None]:
model_path = 'testing_suite/thymine/models'
orca_energy_path = 'testing_suite/thymine/mds/snapshots_orca_energies'
models = [f'{model_path}/{model}' for model in os.listdir(model_path)]
tester = Tester(models)
tester.load_orca_energies(orca_energy_path)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
tester.run_models(device)
tester.calculate_rmse()

out_dir = 'testing_suite/thymine/results/results.csv'
tester.write_results(out_dir)