diff --git a/docs/config_reference.rst b/docs/config_reference.rst
index 0355c3586c..0c7b27d50b 100644
--- a/docs/config_reference.rst
+++ b/docs/config_reference.rst
@@ -218,6 +218,7 @@ System Partition Configuration
   If not, you should consider using the ``squeue`` backend below.
 - ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
   This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.
+- ``lsf``: Jobs will be launched using the `LSF `__ scheduler.
 
 .. versionadded:: 3.7.2
    Support for the SGE scheduler is added.
 
@@ -276,6 +277,8 @@ System Partition Configuration
   This is a custom parallel program launcher used at `TACC <https://www.tacc.utexas.edu/>`__.
 - ``local``: No parallel program launcher will be used.
   The program will be launched locally.
+- ``lrun``: Parallel programs will be launched using `LC Launcher `__'s ``lrun`` command.
+- ``lrun-gpu``: Parallel programs will be launched using `LC Launcher `__'s ``lrun -M "-gpu"`` command, which enables CUDA-aware Spectrum MPI.
 - ``mpirun``: Parallel programs will be launched using the ``mpirun`` command.
 - ``mpiexec``: Parallel programs will be launched using the ``mpiexec`` command.
 - ``srun``: Parallel programs will be launched using `Slurm <https://www.schedmd.com/>`__'s ``srun`` command.
diff --git a/reframe/core/backends.py b/reframe/core/backends.py
index fda7555f1e..4a8205b897 100644
--- a/reframe/core/backends.py
+++ b/reframe/core/backends.py
@@ -20,7 +20,8 @@
     'reframe.core.schedulers.local',
     'reframe.core.schedulers.slurm',
     'reframe.core.schedulers.pbs',
-    'reframe.core.schedulers.sge'
+    'reframe.core.schedulers.sge',
+    'reframe.core.schedulers.lsf'
 ]
 
 _schedulers = {}
diff --git a/reframe/core/launchers/mpi.py b/reframe/core/launchers/mpi.py
index 1364b02afc..ad62b85376 100644
--- a/reframe/core/launchers/mpi.py
+++ b/reframe/core/launchers/mpi.py
@@ -124,3 +124,22 @@ def command(self, job):
             ret.append(opt)
 
         return ret
+
+
+@register_launcher('lrun')
+class LrunLauncher(JobLauncher):
+    '''LLNL's custom parallel job launcher'''
+
+    def command(self, job):
+        num_tasks_per_node = job.num_tasks_per_node or 1
+        num_nodes = job.num_tasks // num_tasks_per_node
+        return ['lrun', '-N', str(num_nodes),
+                '-T', str(num_tasks_per_node)]
+
+
+@register_launcher('lrun-gpu')
+class LrungpuLauncher(LrunLauncher):
+    '''LLNL's custom parallel job launcher with CUDA-aware Spectrum MPI'''
+
+    def command(self, job):
+        return super().command(job) + ['-M "-gpu"']
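For illustration, here is how the new launchers translate a job's task geometry into a command line. This is a standalone sketch, assuming a ReFrame checkout on `PYTHONPATH`; `FakeJob` is a made-up stand-in exposing only the two attributes that `command()` reads:

```python
# Hypothetical sketch: FakeJob mimics just the attributes used by
# LrunLauncher.command(); it is not a real ReFrame Job object.
from reframe.core.backends import getlauncher


class FakeJob:
    num_tasks = 4           # total ranks
    num_tasks_per_node = 2  # ranks per node -> 2 nodes


job = FakeJob()
print(getlauncher('lrun')().command(job))
# ['lrun', '-N', '2', '-T', '2']
print(getlauncher('lrun-gpu')().command(job))
# ['lrun', '-N', '2', '-T', '2', '-M "-gpu"']
```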
diff --git a/reframe/core/schedulers/lsf.py b/reframe/core/schedulers/lsf.py
new file mode 100644
index 0000000000..8197988d72
--- /dev/null
+++ b/reframe/core/schedulers/lsf.py
@@ -0,0 +1,129 @@
+# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+#
+# LSF backend
+#
+# - Initial version submitted by Ryan Goodner, UNM (based on PBS backend)
+#
+
+import functools
+import re
+import time
+
+import reframe.core.runtime as rt
+import reframe.utility.osext as osext
+from reframe.core.backends import register_scheduler
+from reframe.core.exceptions import JobSchedulerError
+from reframe.core.schedulers.pbs import PbsJobScheduler
+
+_run_strict = functools.partial(osext.run_command, check=True)
+
+
+@register_scheduler('lsf')
+class LsfJobScheduler(PbsJobScheduler):
+    def __init__(self):
+        self._prefix = '#BSUB'
+        self._submit_timeout = rt.runtime().get_option(
+            f'schedulers/@{self.registered_name}/job_submit_timeout'
+        )
+
+    def emit_preamble(self, job):
+        num_tasks_per_node = job.num_tasks_per_node or 1
+        num_nodes = job.num_tasks // num_tasks_per_node
+
+        preamble = [
+            self._format_option(f'-J {job.name}'),
+            self._format_option(f'-o {job.stdout}'),
+            self._format_option(f'-e {job.stderr}'),
+            self._format_option(f'-nnodes {num_nodes}')
+        ]
+
+        # add job time limit in minutes
+        if job.time_limit is not None:
+            preamble.append(
+                self._format_option(f'-W {int(job.time_limit // 60)}')
+            )
+
+        # emit the rest of the options
+        options = job.options + job.cli_options
+        for opt in options:
+            if opt.startswith('#'):
+                preamble.append(opt)
+            else:
+                preamble.append(self._format_option(opt))
+
+        # change to working dir with cd
+        preamble.append(f'cd {job.workdir}')
+
+        return preamble
+
+    def submit(self, job):
+        cmd = f'bsub {job.script_filename}'
+        completed = _run_strict(cmd, timeout=self._submit_timeout)
+        jobid_match = re.search(r'^Job <(?P<jobid>\S+)> is submitted',
+                                completed.stdout)
+        if not jobid_match:
+            raise JobSchedulerError('could not retrieve the job id '
+                                    'of the submitted job')
+
+        job._jobid = jobid_match.group('jobid')
+        job._submit_time = time.time()
+
+    def poll(self, *jobs):
+        if jobs:
+            # filter out non-jobs
+            jobs = [job for job in jobs if job is not None]
+
+        if not jobs:
+            return
+
+        completed = _run_strict(
+            f'bjobs -noheader {" ".join(job.jobid for job in jobs)}'
+        )
+        job_status = {}
+        job_status_lines = completed.stdout.split('\n')
+
+        for line in job_status_lines:
+            job_regex = (r'(?P<jobid>\d+)\s+'
+                         r'(?P<user>\S+)\s+'
+                         r'(?P<status>\S+)\s+'
+                         r'(?P<queue>\S+)')
+            job_match = re.search(job_regex, line)
+            if job_match:
+                job_status[job_match['jobid']] = job_match['status']
+
+        for job in jobs:
+            if job.jobid not in job_status:
+                # job id not found
+                self.log(f'Job {job.jobid} not known to scheduler, '
+                         f'assuming job completed')
+                job._state = 'COMPLETED'
+                job._completed = True
+            elif job_status[job.jobid] in ('DONE', 'EXIT'):
+                # job done
+                job._state = 'COMPLETED'
+                job._completed = True
+            elif job_status[job.jobid] == 'RUN':
+                # job running
+                job._state = 'RUNNING'
+            elif job_status[job.jobid] == 'PEND':
+                # job pending
+                job._state = 'PENDING'
+            elif job_status[job.jobid] in ['PSUSP', 'SSUSP', 'USUSP']:
+                # job suspended
+                job._state = 'SUSPENDED'
+            else:
+                # job status unknown
+                self.log(f'Job state {job_status[job.jobid]} not known, '
+                         f'assuming job completed')
+                job._state = 'COMPLETED'
+                job._completed = True
+
+    def finished(self, job):
+        if job.exception:
+            raise job.exception
+
+        return job.state == 'COMPLETED'
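To make the `poll()` parsing above concrete, the snippet below runs the same regular expression over canned `bjobs -noheader` output. The sample job lines are invented for the example, and only the `jobid` and `status` groups are actually consumed by the backend; the `user` and `queue` group names merely mirror the second and fourth `bjobs` columns:

```python
import re

# Invented sample of `bjobs -noheader` output; the real columns are
# JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME.
sample = ('1042 alice RUN  normal login1 node07 testjob Mar  3 12:00\n'
          '1043 bob   PEND normal login1   -    testjob Mar  3 12:01\n')

job_regex = (r'(?P<jobid>\d+)\s+'
             r'(?P<user>\S+)\s+'
             r'(?P<status>\S+)\s+'
             r'(?P<queue>\S+)')

for line in sample.splitlines():
    m = re.search(job_regex, line)
    if m:
        print(m['jobid'], m['status'])

# Output:
# 1042 RUN
# 1043 PEND
```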
diff --git a/reframe/core/schedulers/registry.py b/reframe/core/schedulers/registry.py
index cf41168afc..8da99255e1 100644
--- a/reframe/core/schedulers/registry.py
+++ b/reframe/core/schedulers/registry.py
@@ -40,3 +40,4 @@ def getscheduler(name):
 import reframe.core.schedulers.pbs     # noqa: F401, F403
 import reframe.core.schedulers.sge     # noqa: F401, F403
 import reframe.core.schedulers.torque  # noqa: F401, F403
+import reframe.core.schedulers.lsf     # noqa: F401, F403
diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json
index 8b1e3e990a..70d82e8fc8 100644
--- a/reframe/schemas/config.json
+++ b/reframe/schemas/config.json
@@ -248,7 +248,7 @@
                     "type": "string",
                     "enum": [
                         "local", "pbs", "slurm",
-                        "sge", "squeue", "torque"
+                        "sge", "squeue", "torque", "lsf"
                     ]
                 },
                 "launcher": {
@@ -256,7 +256,7 @@
                     "enum": [
                         "alps", "ibrun", "local", "mpirun",
                         "mpiexec", "srun", "srunalloc", "ssh",
-                        "upcrun", "upcxx-run"
+                        "upcrun", "upcxx-run", "lrun", "lrun-gpu"
                     ]
                 },
                 "access": {
@@ -370,7 +370,7 @@
             "properties": {
                 "name": {
                     "type": "string",
-                    "enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"]
+                    "enum": ["local", "pbs", "sge", "slurm", "squeue", "torque", "lsf"]
                 },
                 "ignore_reqnodenotavail": {"type": "boolean"},
                 "resubmit_on_errors": {
diff --git a/unittests/test_launchers.py b/unittests/test_launchers.py
index 4f09d2594e..903906c1a8 100644
--- a/unittests/test_launchers.py
+++ b/unittests/test_launchers.py
@@ -11,8 +11,8 @@
 
 
 @pytest.fixture(params=[
-    'alps', 'launcherwrapper', 'local', 'mpiexec',
-    'mpirun', 'srun', 'srunalloc', 'ssh', 'upcrun', 'upcxx-run'
+    'alps', 'launcherwrapper', 'local', 'mpiexec', 'mpirun',
+    'srun', 'srunalloc', 'ssh', 'upcrun', 'upcxx-run', 'lrun', 'lrun-gpu'
 ])
 def launcher(request):
     if request.param == 'launcherwrapper':
@@ -139,6 +139,10 @@ def test_run_command(job):
         assert command == 'upcrun -N 2 -n 4 --foo'
     elif launcher_name == 'upcxx-run':
         assert command == 'upcxx-run -N 2 -n 4 --foo'
+    elif launcher_name == 'lrun':
+        assert command == 'lrun -N 2 -T 2 --foo'
+    elif launcher_name == 'lrun-gpu':
+        assert command == 'lrun -N 2 -T 2 -M "-gpu" --foo'
 
 
 def test_run_command_minimal(minimal_job):
@@ -169,3 +173,7 @@ def test_run_command_minimal(minimal_job):
         assert command == 'upcrun -n 1 --foo'
     elif launcher_name == 'upcxx-run':
         assert command == 'upcxx-run -n 1 --foo'
+    elif launcher_name == 'lrun':
+        assert command == 'lrun -N 1 -T 1 --foo'
+    elif launcher_name == 'lrun-gpu':
+        assert command == 'lrun -N 1 -T 1 -M "-gpu" --foo'
diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 007945c0a3..8f082e2548 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -26,7 +26,8 @@ def launcher():
     return getlauncher('local')
 
 
-@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque'])
+@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local',
+                        'pbs', 'torque', 'lsf'])
 def scheduler(request):
     return getscheduler(request.param)
 
@@ -132,6 +133,23 @@ def assert_job_script_sanity(job):
             'echo postrun'] == matches
 
 
+def _expected_lsf_directives(job):
+    num_tasks = job.num_tasks or 1
+    num_tasks_per_node = job.num_tasks_per_node or 1
+    num_nodes = int(num_tasks // num_tasks_per_node)
+    return set([
+        f'#BSUB -J testjob',
+        f'#BSUB -o {job.stdout}',
+        f'#BSUB -e {job.stderr}',
+        f'#BSUB -nnodes {num_nodes}',
+        f'#BSUB -W {int(job.time_limit // 60)}',
+        f'#BSUB --account=spam',
+        f'#BSUB --gres=gpu:4',
+        f'#DW jobdw capacity=100GB',
+        f'#DW stage_in source=/foo',
+    ])
+
+
 def _expected_sge_directives(job):
     num_nodes = job.num_tasks // job.num_tasks_per_node
     num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
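With the schema extended, a site configuration can now select the new backend and launchers. Below is a minimal, hypothetical settings fragment; the system name, hostname pattern, and partition layout are made up, and only the `scheduler` and `launcher` values come from the enums added above:

```python
# Hypothetical site configuration fragment; see docs/config_reference.rst
# for the full set of partition options.
site_configuration = {
    'systems': [
        {
            'name': 'lassen-like',           # made-up system name
            'descr': 'LSF system with LC launchers',
            'hostnames': [r'login\d+'],      # made-up hostname pattern
            'partitions': [
                {
                    'name': 'gpu',
                    'descr': 'GPU partition',
                    'scheduler': 'lsf',      # new LSF backend
                    'launcher': 'lrun-gpu',  # lrun -M "-gpu"
                    'environs': ['builtin'],
                },
            ],
        },
    ],
}
```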