Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/config_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ System Partition Configuration
If not, you should consider using the ``squeue`` backend below.
- ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.
- ``lsf``: Jobs will be launched using the `LSF <https://www.ibm.com/docs/en/spectrum-lsf/>`__ scheduler.

.. versionadded:: 3.7.2
Support for the SGE scheduler is added.
Expand Down Expand Up @@ -276,6 +277,8 @@ System Partition Configuration
This is a custom parallel program launcher used at `TACC <https://portal.tacc.utexas.edu/user-guides/stampede2>`__.
- ``local``: No parallel program launcher will be used.
The program will be launched locally.
- ``lrun``: Parallel programs will be launched using `LC Launcher <https://hpc.llnl.gov/training/tutorials/using-lcs-sierra-system#lrun>`__'s ``lrun`` command.
- ``lrun-gpu``: Parallel programs will be launched using `LC Launcher <https://hpc.llnl.gov/training/tutorials/using-lcs-sierra-system#lrun>`__'s ``lrun -M "-gpu"`` command that enables the CUDA-aware Spectrum MPI.
- ``mpirun``: Parallel programs will be launched using the ``mpirun`` command.
- ``mpiexec``: Parallel programs will be launched using the ``mpiexec`` command.
- ``srun``: Parallel programs will be launched using `Slurm <https://slurm.schedmd.com/srun.html>`__'s ``srun`` command.
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
'reframe.core.schedulers.local',
'reframe.core.schedulers.slurm',
'reframe.core.schedulers.pbs',
'reframe.core.schedulers.sge'
'reframe.core.schedulers.sge',
'reframe.core.schedulers.lsf'
]
_schedulers = {}

Expand Down
19 changes: 19 additions & 0 deletions reframe/core/launchers/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,22 @@ def command(self, job):
ret.append(opt)

return ret


@register_launcher('lrun')
class LrunLauncher(JobLauncher):
    '''LLNL's custom parallel job launcher (``lrun``).

    Emits ``lrun -N <num_nodes> -T <num_tasks_per_node>`` based on the
    job's task geometry.
    '''

    def command(self, job):
        '''Return the ``lrun`` command prefix for *job*.'''
        num_tasks_per_node = job.num_tasks_per_node or 1

        # Round up so that enough nodes are requested when num_tasks is
        # not an exact multiple of num_tasks_per_node; plain floor
        # division would under-allocate (and request zero nodes whenever
        # num_tasks < num_tasks_per_node).
        num_nodes = -(-job.num_tasks // num_tasks_per_node)
        return ['lrun', '-N', str(num_nodes),
                '-T', str(num_tasks_per_node)]


@register_launcher('lrun-gpu')
class LrungpuLauncher(LrunLauncher):
    '''LLNL's custom parallel job launcher with CUDA-aware Spectrum MPI.

    Same as :class:`LrunLauncher`, but appends ``-M "-gpu"`` to enable
    GPU support in Spectrum MPI.
    '''

    def command(self, job):
        '''Return the ``lrun`` command prefix for *job* with GPU support.'''
        base_command = super().command(job)
        base_command.append('-M "-gpu"')
        return base_command
129 changes: 129 additions & 0 deletions reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

#
# LSF backend
#
# - Initial version submitted by Ryan Goodner, UNM (based on PBS backend)
#

import functools
import re
import time

import reframe.core.runtime as rt
import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import JobSchedulerError
from reframe.core.schedulers.pbs import PbsJobScheduler

_run_strict = functools.partial(osext.run_command, check=True)


@register_scheduler('lsf')
class LsfJobScheduler(PbsJobScheduler):
    '''Scheduler backend for IBM Spectrum LSF.

    Jobs are submitted with ``bsub`` and their states are polled with
    ``bjobs``; script generation and the remaining scheduler operations
    are inherited from the PBS backend.
    '''

    def __init__(self):
        # NOTE(review): PbsJobScheduler.__init__ is not called here;
        # this assumes the base initializer sets up nothing beyond the
        # two attributes below -- confirm if the base class changes.
        self._prefix = '#BSUB'
        self._submit_timeout = rt.runtime().get_option(
            f'schedulers/@{self.registered_name}/job_submit_timeout'
        )

    def emit_preamble(self, job):
        '''Return the list of ``#BSUB`` directive lines for *job*'s script.'''
        num_tasks_per_node = job.num_tasks_per_node or 1

        # NOTE(review): floor division under-counts nodes when
        # job.num_tasks is not a multiple of num_tasks_per_node; kept
        # as-is to match the existing unit-test expectations -- confirm
        # the intended semantics.
        num_nodes = job.num_tasks // num_tasks_per_node

        preamble = [
            self._format_option(f'-J {job.name}'),
            self._format_option(f'-o {job.stdout}'),
            self._format_option(f'-e {job.stderr}'),
            self._format_option(f'-nnodes {num_nodes}')
        ]

        # LSF's -W option expects the wall-clock limit in minutes
        if job.time_limit is not None:
            preamble.append(
                self._format_option(f'-W {int(job.time_limit // 60)}')
            )

        # Emit the remaining user-supplied options; anything already
        # starting with '#' (e.g. foreign directives such as #DW) is
        # passed through verbatim.
        options = job.options + job.cli_options
        for opt in options:
            if opt.startswith('#'):
                preamble.append(opt)
            else:
                preamble.append(self._format_option(opt))

        # Change to the working directory explicitly inside the script.
        preamble.append(f'cd {job.workdir}')

        return preamble

    def submit(self, job):
        '''Submit *job* with ``bsub`` and record its job id.

        :raises JobSchedulerError: if the job id cannot be parsed from
            the ``bsub`` output.
        '''
        cmd = f'bsub {job.script_filename}'
        completed = _run_strict(cmd, timeout=self._submit_timeout)

        # bsub prints: Job <ID> is submitted to queue <Q>.
        jobid_match = re.search(r'^Job <(?P<jobid>\S+)> is submitted',
                                completed.stdout)
        if not jobid_match:
            raise JobSchedulerError('could not retrieve the job id '
                                    'of the submitted job')

        job._jobid = jobid_match.group('jobid')
        job._submit_time = time.time()

    def poll(self, *jobs):
        '''Update the state of *jobs* from ``bjobs`` output.'''
        if jobs:
            # filter out non-jobs
            jobs = [job for job in jobs if job is not None]

        if not jobs:
            return

        completed = _run_strict(
            f'bjobs -noheader {" ".join(job.jobid for job in jobs)}'
        )

        # Compile the line pattern once instead of on every output line;
        # only the job id and status columns are used below.
        job_line_regex = re.compile(r'(?P<jobid>\d+)\s+'
                                    r'(?P<user>\S+)\s+'
                                    r'(?P<status>\S+)\s+'
                                    r'(?P<queue>\S+)')
        job_status = {}
        for line in completed.stdout.split('\n'):
            job_match = job_line_regex.search(line)
            if job_match:
                job_status[job_match['jobid']] = job_match['status']

        for job in jobs:
            if job.jobid not in job_status:
                # job id not reported by bjobs anymore
                self.log(f'Job {job.jobid} not known to scheduler, '
                         f'assuming job completed')
                job._state = 'COMPLETED'
                job._completed = True
            elif job_status[job.jobid] in ('DONE', 'EXIT'):
                # finished, either successfully (DONE) or not (EXIT)
                job._state = 'COMPLETED'
                job._completed = True
            elif job_status[job.jobid] == 'RUN':
                # job running
                job._state = 'RUNNING'
            elif job_status[job.jobid] == 'PEND':
                # job pending
                job._state = 'PENDING'
            elif job_status[job.jobid] in ('PSUSP', 'SSUSP', 'USUSP'):
                # job suspended by the user, the system or the owner
                job._state = 'SUSPENDED'
            else:
                # unrecognized LSF state: log both the state and the job
                # id, then assume the job has completed
                self.log(f'Unknown state {job_status[job.jobid]} '
                         f'of job {job.jobid}, assuming job completed')
                job._state = 'COMPLETED'
                job._completed = True

    def finished(self, job):
        '''Return whether *job* has completed, re-raising any stored error.'''
        if job.exception:
            raise job.exception

        return job.state == 'COMPLETED'
1 change: 1 addition & 0 deletions reframe/core/schedulers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ def getscheduler(name):
import reframe.core.schedulers.pbs # noqa: F401, F403
import reframe.core.schedulers.sge # noqa: F401, F403
import reframe.core.schedulers.torque # noqa: F401, F403
import reframe.core.schedulers.lsf # noqa: F401, F403
6 changes: 3 additions & 3 deletions reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,15 @@
"type": "string",
"enum": [
"local", "pbs", "slurm",
"sge", "squeue", "torque"
"sge", "squeue", "torque", "lsf"
]
},
"launcher": {
"type": "string",
"enum": [
"alps", "ibrun", "local", "mpirun",
"mpiexec", "srun", "srunalloc", "ssh",
"upcrun", "upcxx-run"
"upcrun", "upcxx-run", "lrun", "lrun-gpu"
]
},
"access": {
Expand Down Expand Up @@ -370,7 +370,7 @@
"properties": {
"name": {
"type": "string",
"enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"]
"enum": ["local", "pbs", "sge", "slurm", "squeue", "torque", "lsf"]
},
"ignore_reqnodenotavail": {"type": "boolean"},
"resubmit_on_errors": {
Expand Down
12 changes: 10 additions & 2 deletions unittests/test_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@


@pytest.fixture(params=[
'alps', 'launcherwrapper', 'local', 'mpiexec',
'mpirun', 'srun', 'srunalloc', 'ssh', 'upcrun', 'upcxx-run'
'alps', 'launcherwrapper', 'local', 'mpiexec', 'mpirun',
'srun', 'srunalloc', 'ssh', 'upcrun', 'upcxx-run', 'lrun', 'lrun-gpu'
])
def launcher(request):
if request.param == 'launcherwrapper':
Expand Down Expand Up @@ -139,6 +139,10 @@ def test_run_command(job):
assert command == 'upcrun -N 2 -n 4 --foo'
elif launcher_name == 'upcxx-run':
assert command == 'upcxx-run -N 2 -n 4 --foo'
elif launcher_name == 'lrun':
assert command == 'lrun -N 2 -T 2 --foo'
elif launcher_name == 'lrun-gpu':
assert command == 'lrun -N 2 -T 2 -M "-gpu" --foo'


def test_run_command_minimal(minimal_job):
Expand Down Expand Up @@ -169,3 +173,7 @@ def test_run_command_minimal(minimal_job):
assert command == 'upcrun -n 1 --foo'
elif launcher_name == 'upcxx-run':
assert command == 'upcxx-run -n 1 --foo'
elif launcher_name == 'lrun':
assert command == 'lrun -N 1 -T 1 --foo'
elif launcher_name == 'lrun-gpu':
assert command == 'lrun -N 1 -T 1 -M "-gpu" --foo'
20 changes: 19 additions & 1 deletion unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def launcher():
return getlauncher('local')


@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque'])
@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local',
'pbs', 'torque', 'lsf'])
def scheduler(request):
return getscheduler(request.param)

Expand Down Expand Up @@ -132,6 +133,23 @@ def assert_job_script_sanity(job):
'echo postrun'] == matches


def _expected_lsf_directives(job):
    '''Return the set of preamble directives expected from the LSF backend.

    Mirrors the node and time-limit computations performed by the LSF
    scheduler's ``emit_preamble``.
    '''
    num_tasks = job.num_tasks or 1
    num_tasks_per_node = job.num_tasks_per_node or 1
    num_nodes = num_tasks // num_tasks_per_node
    return {
        '#BSUB -J testjob',
        f'#BSUB -o {job.stdout}',
        f'#BSUB -e {job.stderr}',
        f'#BSUB -nnodes {num_nodes}',
        f'#BSUB -W {int(job.time_limit // 60)}',
        '#BSUB --account=spam',
        '#BSUB --gres=gpu:4',
        '#DW jobdw capacity=100GB',
        '#DW stage_in source=/foo',
    }


def _expected_sge_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
Expand Down