Skip to content

Commit

Permalink
Prepare for parallel sampler
Browse files Browse the repository at this point in the history
Some reorganization of parallel width estimator. New approach to splitting methods into files (multiple notebooks exporting to single .py file).
  • Loading branch information
jochym committed Apr 22, 2024
1 parent 7e0c7b8 commit 404616f
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 419 deletions.
179 changes: 111 additions & 68 deletions 11_core.ipynb

Large diffs are not rendered by default.

353 changes: 141 additions & 212 deletions 11_parallel.ipynb

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions hecss/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
'hecss.cli.reshape_sample': ('cli.html#reshape_sample', 'hecss/cli.py'),
'hecss.cli.run_cli_cmd': ('cli.html#run_cli_cmd', 'hecss/cli.py')},
'hecss.core': { 'hecss.core.HECSS': ('core.html#hecss', 'hecss/core.py'),
'hecss.core.HECSS.__estimate_width_scale_ser': ('core.html#hecss.__estimate_width_scale_ser', 'hecss/core.py'),
'hecss.core.HECSS.__estimate_width_scale_aio': ( 'parallel.html#hecss.__estimate_width_scale_aio',
'hecss/core.py'),
'hecss.core.HECSS.__get_calculator': ('core.html#hecss.__get_calculator', 'hecss/core.py'),
'hecss.core.HECSS.__init__': ('core.html#hecss.__init__', 'hecss/core.py'),
'hecss.core.HECSS._sampler': ('core.html#hecss._sampler', 'hecss/core.py'),
'hecss.core.HECSS._estimate_width_scale_aio': ( 'parallel.html#hecss._estimate_width_scale_aio',
'hecss/core.py'),
'hecss.core.HECSS._estimate_width_scale_ser': ('core.html#hecss._estimate_width_scale_ser', 'hecss/core.py'),
'hecss.core.HECSS._sampler_aio': ('parallel.html#hecss._sampler_aio', 'hecss/core.py'),
'hecss.core.HECSS._sampler_ser': ('core.html#hecss._sampler_ser', 'hecss/core.py'),
'hecss.core.HECSS.estimate_width_scale': ('core.html#hecss.estimate_width_scale', 'hecss/core.py'),
'hecss.core.HECSS.generate': ('core.html#hecss.generate', 'hecss/core.py'),
'hecss.core.HECSS.print_xs': ('core.html#hecss.print_xs', 'hecss/core.py'),
Expand Down Expand Up @@ -48,9 +53,7 @@
'hecss.monitor.show_dc_conv': ('monitor.html#show_dc_conv', 'hecss/monitor.py')},
'hecss.optimize': { 'hecss.optimize.get_sample_weights': ('optimize.html#get_sample_weights', 'hecss/optimize.py'),
'hecss.optimize.make_sampling': ('optimize.html#make_sampling', 'hecss/optimize.py')},
'hecss.parallel': { 'hecss.parallel.HECSS.__estimate_width_scale_aio': ( 'parallel.html#hecss.__estimate_width_scale_aio',
'hecss/parallel.py'),
'hecss.parallel.Vasp.__calculate_aio': ('parallel.html#vasp.__calculate_aio', 'hecss/parallel.py'),
'hecss.parallel': { 'hecss.parallel.Vasp.__calculate_aio': ('parallel.html#vasp.__calculate_aio', 'hecss/parallel.py'),
'hecss.parallel.Vasp._arun': ('parallel.html#vasp._arun', 'hecss/parallel.py'),
'hecss.parallel.__run_async': ('parallel.html#__run_async', 'hecss/parallel.py')},
'hecss.planner': {'hecss.planner.plan_T_scan': ('planner.html#plan_t_scan', 'hecss/planner.py')},
Expand Down
212 changes: 188 additions & 24 deletions hecss/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
from spglib import find_primitive, get_symmetry_dataset

# %% ../11_core.ipynb 4
import hecss
from hecss import *

# %% ../11_core.ipynb 5
_disp_dists = {
'normal' : stats.norm,
'logistic' : stats.logistic,
Expand All @@ -37,7 +41,7 @@
}


# %% ../11_core.ipynb 5
# %% ../11_core.ipynb 6
class HECSS:
'''
Class encapsulating the sampling and weight generation
Expand Down Expand Up @@ -129,7 +133,7 @@ def print_xs(self, c, s):



# %% ../11_core.ipynb 6
# %% ../11_core.ipynb 7
@patch
def __get_calculator(self: HECSS):
'''
Expand All @@ -139,9 +143,9 @@ def __get_calculator(self: HECSS):
'''
return self.calc() if callable(self.calc) else self.calc

# %% ../11_core.ipynb 7
# %% ../11_core.ipynb 8
@patch
def __estimate_width_scale_ser(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None):
def _estimate_width_scale_ser(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None):
'''
Serial version of w-estimator.
Estimate coefficient between temperature and displacement scale (eta).
Expand Down Expand Up @@ -201,9 +205,10 @@ def __estimate_width_scale_ser(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=
pbar.update()


# %% ../11_core.ipynb 8
# %% ../11_core.ipynb 9
@patch
def estimate_width_scale(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None, nwork=None):
def estimate_width_scale(self: HECSS, n=1, Tmax=600,
set_scale=True, pbar=None, nwork=None):
'''
Estimate coefficient between temperature and displacement scale (eta).
Calculate energy increase from the `n` temperatures uniformly
Expand All @@ -230,7 +235,7 @@ def estimate_width_scale(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None,
* else : mean(eta), std(eta)
* wm - the nx3 array of: [width, Temperature, (E-E0)/nat]
'''

if self.Ep0 is None:
self.Ep0 = self.cryst.get_potential_energy()
E0 = self.Ep0
Expand All @@ -248,17 +253,17 @@ def estimate_width_scale(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None,
pbar.update(len(self._eta_list))

# Execute async/parallel version if possible
if nwork is not None:
if self.calc.name == 'vasp':
from hecss.parallel import __run_async
__run_async(self.__estimate_width_scale_aio, n, Tmax, set_scale, pbar, nwork)
else :
try :
# Raises NotImplementedError exception for unsupported calculators
# and for nwork=None call (i.e. serial version)
self._estimate_width_scale_aio(n, Tmax, set_scale, pbar, nwork)
except NotImplementedError:
if nwork is not None:
# Warn if the call was for parallel version with unsupported
# calculator. Silent, if the call was for serial version.
print('WARNING: Parallel execution only supported for VASP.')
print('Running serial version')
# Or serial for other calculators or if requested
else :
self.__estimate_width_scale_ser(n, Tmax, set_scale, pbar)

self._estimate_width_scale_ser(n, Tmax, set_scale, pbar)

wm = np.array(self._eta_list).T
pathlib.Path(f'{self.directory}/w_est/').mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -286,9 +291,9 @@ def estimate_width_scale(self: HECSS, n=1, Tmax=600, set_scale=True, pbar=None,

return m, y.std(), xscale

# %% ../11_core.ipynb 9
# %% ../11_core.ipynb 10
@patch
def _sampler(self: HECSS, T_goal, N=None, delta_sample=0.01, sigma=2,
def _sampler_ser(self: HECSS, T_goal, N=None, delta_sample=0.01, sigma=2,
eqdelta=0.05, eqsigma=0.2, xi=1, chi=1,
modify=None, modify_args=None, symprec=1e-5,
width_list=None, dofmu_list=None, xscale_list=None,
Expand Down Expand Up @@ -546,11 +551,11 @@ def _sampler(self: HECSS, T_goal, N=None, delta_sample=0.01, sigma=2,
# print('Generator terminated')
break

# %% ../11_core.ipynb 10
# %% ../11_core.ipynb 11
@patch
def sample(self: HECSS, T, N, sentinel=None, sentinel_args={}, **kwargs):
def sample(self: HECSS, T, N, sentinel=None, sentinel_args={}, nwork=None, **kwargs):
'''
Generate N samples using `HECSS._sampler` generator.
Generate N samples using `HECSS._sampler_(ser/aio)` generator.
`sentinel` parameter is a call-back function
which is called on every sample to decide if the
iteration should be stopped early. If it returns
Expand Down Expand Up @@ -638,7 +643,14 @@ def sample(self: HECSS, T, N, sentinel=None, sentinel_args={}, **kwargs):
if T in self.samplers:
generator = self.samplers[T]
else :
generator = self._sampler(T, **kwargs)
# Execute async/parallel version if possible
try :
# Try parallel version first
# Raises NotImplementedError if called for serial work
# (i.e. nwork==None) or unsupported calculator
generator = self._sampler_aio(T, **kwargs, nwork=nwork)
except NotImplementedError:
generator = self._sampler_ser(T, **kwargs)
self.samplers[T] = generator

for smpl in generator:
Expand All @@ -654,10 +666,10 @@ def sample(self: HECSS, T, N, sentinel=None, sentinel_args={}, **kwargs):
self._pbar=None
return smpls

# %% ../11_core.ipynb 11
# %% ../11_core.ipynb 12
from hecss.optimize import make_sampling

# %% ../11_core.ipynb 12
# %% ../11_core.ipynb 13
@patch
def generate(self: HECSS, S, T=None, sigma_scale=1.0, border=False, probTH=0.25,
Nmul=4, N=None, nonzero_w=True, debug=False):
Expand Down Expand Up @@ -699,3 +711,155 @@ def generate(self: HECSS, S, T=None, sigma_scale=1.0, border=False, probTH=0.25,
return make_sampling(S, T, sigma_scale=sigma_scale, border=border, probTH=probTH,
Nmul=Nmul, N=N, nonzero_w=nonzero_w, debug=debug)



# %% ../11_parallel.ipynb 13
@patch
async def __estimate_width_scale_aio(self: HECSS, n=1, Tmax=600,
                                     set_scale=True, pbar=None, nwork=5):
    '''
    Async/parallel version of w-estimator. Only supported for VASP.
    Estimate coefficient between temperature and displacement scale (eta).
    Calculate energy increase from the `n` temperatures uniformly
    distributed between 0 and `Tmax` and calculate average $\\sqrt{(E-E_0)/T}$
    which is a width scale for a given temperature:
    $$
        w = \\eta\\sqrt{T}
    $$
    which comes from the assumed approximate relationship:
    $$
        \\frac{E(w(T))-E_0}{T} \\approx \\mathrm{const} = \\eta^2.
    $$
    The displaced structures are prepared up front and put on a queue,
    which is then drained by `nwork` concurrent workers. Each worker
    awaits the `__calculate_aio` method of the calculator copy attached
    to the structure. A task which raises is reported on stdout and
    dropped, so fewer than the requested number of points may be recorded.

    #### Input
    * `n`         - target total number of sampling points; only
                    `n - len(self._eta_list)` new points are queued
    * `Tmax`      - max sampled temperature
    * `set_scale` - not used by this routine; kept so the signature
                    matches the serial estimator
    * `pbar`      - progress bar updated during calculation; must provide
                    `set_description`, `update` and `reset`
                    (presumably a tqdm bar - confirm against caller)
    * `nwork`     - number of concurrent workers; `None` or a value
                    below 1 means one worker per queued structure

    #### Output
    Nothing is returned. For every finished calculation:
    * `[w, T, (E-E0)/nat]` is appended to `self._eta_list`
    * `(i, i, dx, forces, (E-E0)/nat)` is appended to `self._eta_samples`
    '''
    results = []

    async def worker(q, i):
        # Drain the shared queue; `i` only identifies the worker.
        while not q.empty():
            (T, w, dx, clc) = await q.get()
            try:
                await clc.__calculate_aio(clc.atoms)
                results.append((T, w, dx, clc))
                if pbar:
                    pbar.update()
            except Exception as e:
                # Best effort: report the failed task and carry on
                # with the remaining ones.
                print(f"Error executing task: {e}")

    nat = len(self.cryst)
    dim = (nat, 3)

    if pbar:
        pbar.set_description('Create')

    # Build the queue
    structs = asyncio.Queue()
    while structs.qsize() < n - len(self._eta_list):
        T = stats.uniform.rvs(0, Tmax)  # Kelvin
        if not T:
            # T == 0 gives zero width; draw again.
            continue
        w = self.w_scale * np.sqrt(T)
        dx = self.Q.rvs(size=dim, scale=w)

        # Each task gets its own calculator, rebuilt from the
        # dictionary form of the master calculator.
        clc = self.calc.__class__()
        clc.fromdict(self.calc.asdict())
        clc.atoms.set_positions(self.cryst.get_positions()+dx)
        try:
            clc.set(directory=f'{self.directory}/w_est/{len(self._eta_list)+structs.qsize():03d}')
        except AttributeError:
            # Calculator is not directory-based
            # Ignore the error
            pass
        clc.set(command=self.calc.command)
        structs.put_nowait((T, w, dx, clc))
        if pbar:
            pbar.update()

    if pbar:
        pbar.reset(structs.qsize())
        pbar.set_description('Collect')

    if nwork is None or nwork < 1:
        nwork = structs.qsize()
    await asyncio.gather(*[worker(structs, _) for _ in range(nwork)])

    # Move the finished calculations into the estimator records.
    while results:
        T, w, dx, clc = results.pop()
        E = clc.get_potential_energy()
        i = len(self._eta_list)
        self._eta_samples.append((i, i, dx, clc.get_forces(), (E-self.Ep0)/nat))
        self._eta_list.append([w, T, (E-self.Ep0)/nat])

    return

# %% ../11_parallel.ipynb 14
@patch
def _estimate_width_scale_aio(self: HECSS, n=1, Tmax=600,
                              set_scale=True, pbar=None, nwork=5):
    '''
    Synchronous entry point to the asynchronous width estimator.

    Hands the `__estimate_width_scale_aio` coroutine method, together
    with all the arguments, to `__run_async` when the calculator name
    is on the supported list. Otherwise `NotImplementedError` is raised
    so the caller can switch to the serial estimator:

    * `nwork` is `None` - serial run requested, raised without any message
    * calculator not supported - raised after printing a warning
    '''
    # No worker count means the serial version was asked for.
    if nwork is None:
        raise NotImplementedError

    # Parallel run requested with a calculator which cannot do it.
    if self.calc.name not in ('vasp',):
        print('WARNING: Parallel execution supported only for some calculators.')
        print('Using serial version')
        raise NotImplementedError

    __run_async(self.__estimate_width_scale_aio, n, Tmax, set_scale, pbar, nwork)

# %% ../11_parallel.ipynb 35
@patch
def _sampler_aio(self: HECSS, T_goal, N=None, delta_sample=0.01, sigma=2,
                 eqdelta=0.05, eqsigma=0.2, xi=1, chi=1,
                 modify=None, modify_args=None, symprec=1e-5,
                 width_list=None, dofmu_list=None, xscale_list=None,
                 verb=True, nwork=None):
    '''
    Placeholder runner for the parallel version of the sampler.

    No parallel sampler is wired in here: every call ends with
    `NotImplementedError`. The only difference between the cases is
    the message:

    * `nwork` is `None` - silent (the calculator is not inspected)
    * calculator named `vasp` - silent
    * any other calculator - a two-line warning is printed first
    '''
    # Warn only when a parallel run was requested for a calculator
    # outside of the supported list.
    if nwork is not None and self.calc.name not in ('vasp',):
        print('WARNING: Parallel execution supported only for some calculators.')
        print('Using serial version')

    raise NotImplementedError
Loading

0 comments on commit 404616f

Please sign in to comment.