Skip to content

Commit

Permalink
Merge pull request #1154 from pyiron/flux
Browse files Browse the repository at this point in the history
Reimplement the flux interface
  • Loading branch information
samwaseda committed Jul 8, 2023
2 parents 2dd3b0b + 01c7cbf commit 74b82d1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 2 deletions.
1 change: 1 addition & 0 deletions .ci_support/environment-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies:
- gitpython
- h5io
- h5py
- jinja2
- numpy
- pandas
- pint
Expand Down
1 change: 1 addition & 0 deletions .ci_support/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies:
- gitpython =3.1.31
- h5io =0.1.7
- h5py =3.9.0
- jinja2 =3.1.2
- numpy =1.24.3
- pandas =2.0.3
- pint =0.22
Expand Down
91 changes: 89 additions & 2 deletions pyiron_base/jobs/job/runfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@
import posixpath
import subprocess

from jinja2 import Template

from pyiron_base.utils.deprecate import deprecate
from pyiron_base.jobs.job.wrapper import JobWrapper
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance


try:
import flux.job

flux_available = True
except ImportError:
flux_available = False

"""
The function job.run() inside pyiron is executed differently depending on the status of the job object. This module
introduces the most general run functions and how they are selected.
Expand Down Expand Up @@ -484,11 +493,15 @@ def run_job_with_runmode_executor(job, executor, gpus_per_slot=None):
raise NotImplementedError(
"Currently job.server.run_mode.executor does not support GenericMaster jobs."
)
if isinstance(executor, ProcessPoolExecutor):
if flux_available and isinstance(executor, flux.job.FluxExecutor):
return run_job_with_runmode_executor_flux(
job=job, executor=executor, gpus_per_slot=gpus_per_slot
)
elif isinstance(executor, ProcessPoolExecutor):
return run_job_with_runmode_executor_futures(job=job, executor=executor)
else:
raise NotImplementedError(
"Currently only concurrent.futures.ProcessPoolExecutor is supported."
"Currently only flux.job.FluxExecutor and concurrent.futures.ProcessPoolExecutor are supported."
)


Expand Down Expand Up @@ -533,6 +546,80 @@ def run_job_with_runmode_executor_futures(job, executor):
)


def run_job_with_runmode_executor_flux(job, executor, gpus_per_slot=None):
    """
    Interface for the flux.job.FluxExecutor executor. Flux is a hierarchical resource manager. It can either be used
    to replace queuing systems like SLURM or be used as a user specific queuing system within an existing allocation.
    pyiron provides two interfaces to flux, this executor interface as well as a traditional queuing system interface
    via pysqa. This executor interface is designed for the development of asynchronous simulation protocols, while the
    traditional queuing system interface simplifies the transition from other queuing systems like SLURM. The usage
    is analogous to the concurrent.futures.Executor interface:

    >>> from flux.job import FluxExecutor
    >>> job.server.executor = FluxExecutor()
    >>> job.run()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    A word of caution - flux is currently only available on Linux, for all other operating systems the
    ProcessPoolExecutor from the python standard library concurrent.futures is recommended. The advantage of flux
    over the ProcessPoolExecutor is that flux takes over the resource management, like monitoring how many cores are
    available while with the ProcessPoolExecutor this is left to the user.

    The job is submitted as a small bash script which calls the pyiron command line wrapper
    (`python -m pyiron_base.cli wrapper`). When the database is enabled the job is addressed by its job id (`-j`),
    otherwise by the location of its HDF5 file (`-f`).

    Args:
        job (GenericJob): pyiron job object
        executor (flux.job.FluxExecutor): flux executor class which implements the executor interface defined in the
                                          python concurrent.futures.Executor class.
        gpus_per_slot (int): number of GPUs per MPI rank, typically 1

    Returns:
        concurrent.futures.Future: future object to develop asynchronous simulation protocols

    Raises:
        ModuleNotFoundError: if the flux python module could not be imported
    """
    if not flux_available:
        raise ModuleNotFoundError(
            "No module named 'flux'. Running in flux mode is only available on Linux; "
            "For CPU jobs, please use `conda install -c conda-forge flux-core`; for "
            "GPU support you will additionally need "
            "`conda install -c conda-forge flux-sched libhwloc=*=cuda*`"
        )

    # Imported locally so this function does not depend on the file-level import block.
    import shlex

    # The rendered text is executed by bash, so paths are quoted to survive
    # spaces and other shell meta characters.
    working_directory = shlex.quote(str(job.working_directory))

    if not state.database.database_is_disabled:
        # Database mode: the wrapper looks the job up by its id.
        # NOTE: the template source has to be ONE string (adjacent literals are
        # concatenated); a second positional argument would be interpreted by
        # jinja2 as `block_start_string` and the command would be lost.
        executable_template = Template(
            "#!/bin/bash\n"
            "python -m pyiron_base.cli wrapper -p {{working_directory}} -j {{job_id}}"
        )
        executable_str = executable_template.render(
            working_directory=working_directory,
            job_id=str(job.job_id),
        )
        job_name = "pi_" + str(job.job_id)
    else:
        # Database-free mode: the wrapper is pointed at <file_name><h5_path>.
        executable_template = Template(
            "#!/bin/bash\n"
            "python -m pyiron_base.cli wrapper -p {{working_directory}} -f {{hdf5_location}}"
        )
        executable_str = executable_template.render(
            working_directory=working_directory,
            hdf5_location=shlex.quote(
                str(job.project_hdf5.file_name) + str(job.project_hdf5.h5_path)
            ),
        )
        job_name = "pi_" + job.job_name

    jobspec = flux.job.JobspecV1.from_batch_command(
        jobname=job_name,
        script=executable_str,
        num_nodes=1,
        cores_per_slot=1,
        gpus_per_slot=gpus_per_slot,
        num_slots=job.server.cores,
    )
    jobspec.cwd = job.project_hdf5.working_directory
    # Forward the submitting process' environment to the flux job.
    jobspec.environment = dict(os.environ)
    return executor.submit(jobspec)


def run_time_decorator(func):
def wrapper(job):
if not state.database.database_is_disabled and job.job_id is not None:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
'gitpython==3.1.31',
'h5io==0.1.7',
'h5py==3.9.0',
'jinja2==3.1.2',
'numpy==1.24.3',
'pandas==2.0.3',
'pint==0.22',
Expand Down

0 comments on commit 74b82d1

Please sign in to comment.