Skip to content

Commit

Permalink
add new parallel module
Browse files Browse the repository at this point in the history
  • Loading branch information
jochym committed Jan 25, 2024
1 parent 8ffe3ba commit c1a8d79
Showing 1 changed file with 226 additions and 0 deletions.
226 changes: 226 additions & 0 deletions hecss/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../11_parallel.ipynb.

# %% auto 0
__all__ = []

# %% ../11_parallel.ipynb 2
from fastcore.basics import patch

# %% ../11_parallel.ipynb 3
import ase
from ase.calculators.vasp import Vasp
from ase.calculators import calculator
from ase.calculators.vasp.vasp import check_atoms
from ase import units as un
import asyncio
from concurrent.futures import ThreadPoolExecutor
from tqdm.auto import tqdm
from scipy import stats
import numpy as np

# %% ../11_parallel.ipynb 4
from hecss import *
import hecss
from hecss.util import write_dfset, calc_init_xscale
from hecss.optimize import make_sampling

# %% ../11_parallel.ipynb 6
def __run_async(func, *args, **kwargs):
'''
Run async methods detecting running loop in jupyter.
'''
try:
loop = asyncio.get_running_loop()
except RuntimeError: # 'RuntimeError: There is no current event loop...'
loop = None

if loop and loop.is_running():
# print('Async event loop already running. Running in new thread.')
# Create a separate thread so we can block before returning
with ThreadPoolExecutor(1) as pool:
result = pool.submit(lambda: asyncio.run(func(*args, **kwargs))).result()
else:
# print('Starting new event loop')
result = asyncio.run(func(*args, **kwargs))
return result

# %% ../11_parallel.ipynb 7
@patch
async def _arun(self: Vasp, command=None, out=None, directory=None):
"""
Method to explicitly execute VASP in async mode
This is an asyncio version of the function.
"""
# DEBUG
# print(f'Async _run {command} in {directory}')
if command is None:
command = self.command
if directory is None:
directory = self.directory

proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=directory
)

stdout, stderr = await proc.communicate()

# DEBUG
# print(f'[{command!r} exited with {proc.returncode}]')
# if stdout:
# print(f'[stdout]\n{stdout.decode()}')
# if stderr:
# print(f'[stderr]\n{stderr.decode()}')

return proc.returncode

# %% ../11_parallel.ipynb 8
@patch
async def __calculate_aio(self: Vasp,
atoms=None,
properties=('energy', ),
system_changes=tuple(calculator.all_changes)
):
"""
Do a VASP calculation in the specified directory.
This will generate the necessary VASP input files, and then
execute VASP. After execution, the energy, forces. etc. are read
from the VASP output files.
This is an asyncio version of the function.
"""
# Check for zero-length lattice vectors and PBC
# and that we actually have an Atoms object.
check_atoms(atoms)

self.clear_results()

if atoms is not None:
self.atoms = atoms.copy()

command = self.make_command(self.command)
self.write_input(self.atoms, properties, system_changes)

with self._txt_outstream() as out:
errorcode = await self._arun(command=command,
out=out,
directory=self.directory)

if errorcode:
raise calculator.CalculationFailed(
'{} in {} returned an error: {:d}'.format(
self.name, self.directory, errorcode))

# Read results from calculation
self.update_atoms(atoms)
self.read_results()
return errorcode

# %% ../11_parallel.ipynb 9
@patch
async def __estimate_width_scale_aio(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None, nwork=5):
'''
Async/parallel version of w-estimator. Only supported for VASP.
Estimate coefficient between temperature and displacement scale (eta).
Calculate energy increase from the `n` temperatures uniformly
distributed between 0 and `Tmax` and calculate avarage $\sqrt{E-E0/T}$
which is a width scale for a given temperature:
$$
w = \\eta\\sqrt{T}
$$
which comes from the assumed approximate relationship:
$$
\\frac{E(w(T))-E_0}{T} \\approx \\mathrm{const} = \\eta^2.
$$
#### Input
* `n` - number of sampling points
* `Tmax` - max sampled temperature
* `set_scale` - set scale parameter in the class after run
* `pbar` - show progress bar during calculation
* `nwork` -
#### Output
* if wm_out : mean(eta), std(eta), wm
* else : mean(eta), std(eta)
* wm - the nx3 array of: [width, Temperature, (E-E0)/nat]
'''
results = []

async def worker(q, i):
while not q.empty():
(T, w, dx, clc) = await q.get()
try:
# print(f'Worker {i} run task')
returned = await clc.__calculate_aio(clc.atoms)
# print(f'Worker {i} done task')
results.append((T, w, dx, clc))
if pbar:
pbar.update()
except Exception as e:
print(f"Error executing task: {e}")


# print('Parallel estimate_width_scale')
nat = len(self.cryst)
dim = (nat, 3)

# try:
# [task_queue.put_nowait(asyncio.create_task(fetch_data(root_path=row['1'])))
# for index, row in run_column.iterrows()]
# await asyncio.gather(*[worker(task_queue) for _ in range(5)])
# except Exception as e:
# print(f"\nUnable to get data: {e}\n")


if pbar:
pbar.set_description('Create')

# Build the queue
structs = asyncio.Queue()
while structs.qsize() < n - len(self._eta_list):
cr = ase.Atoms(self.cryst.get_atomic_numbers(),
cell=self.cryst.get_cell(),
scaled_positions=self.cryst.get_scaled_positions(),
pbc=True,
)

T = stats.uniform.rvs(0, Tmax) # Kelvin
if not T:
continue
w = self.w_scale * np.sqrt(T)
dx = self.Q.rvs(size=dim, scale=w)

clc = self.calc.__class__()
clc.fromdict(self.calc.asdict())
clc.atoms.set_positions(self.cryst.get_positions()+dx)
try :
clc.set(directory=f'{self.directory}/w_est/{len(self._eta_list)+structs.qsize():03d}')
except AttributeError :
# Calculator is not directory-based
# Ignore the error
pass
clc.set(command=self.calc.command)
structs.put_nowait((T, w, dx, clc))
if pbar:
pbar.update()

if pbar:
pbar.reset(structs.qsize())
pbar.set_description('Collect')

if nwork is None or nwork < 1:
nwork = structs.qsize()
await asyncio.gather(*[worker(structs, _) for _ in range(nwork)])

while results:
T, w, dx, clc = results.pop()
E = clc.get_potential_energy()
i = len(self._eta_list)
self._eta_samples.append((i, i, dx, clc.get_forces(), (E-self.Ep0)/nat))
self._eta_list.append([w, T, (E-self.Ep0)/nat])

return

0 comments on commit c1a8d79

Please sign in to comment.