Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/config_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,22 @@ System Partition Configuration
Supported schedulers are the following:

- ``local``: Jobs will be launched locally without using any job scheduler.
- ``oar``: Jobs will be launched using the `OAR <https://oar.imag.fr/>`__ scheduler.
- ``pbs``: Jobs will be launched using the `PBS Pro <https://en.wikipedia.org/wiki/Portable_Batch_System>`__ scheduler.
- ``torque``: Jobs will be launched using the `Torque <https://en.wikipedia.org/wiki/TORQUE>`__ scheduler.
- ``sge``: Jobs will be launched using the `Sun Grid Engine <https://arc.liv.ac.uk/SGE/htmlman/manuals.html>`__ scheduler.
- ``slurm``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
This backend requires job accounting to be enabled in the target system.
If not, you should consider using the ``squeue`` backend below.
- ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.
- ``lsf``: Jobs will be launched using the `LSF <https://www.ibm.com/docs/en/spectrum-lsf/>`__ scheduler.
- ``torque``: Jobs will be launched using the `Torque <https://en.wikipedia.org/wiki/TORQUE>`__ scheduler.

.. versionadded:: 3.7.2
Support for the SGE scheduler is added.

.. versionadded:: 3.8.2
Support for the OAR scheduler is added.

.. note::

The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific.
Expand Down
6 changes: 4 additions & 2 deletions docs/tutorial_tips_tricks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,10 @@ The following is an example of ``.gitlab-ci.yml`` file that does exactly that:
It defines two stages.
The first one, called ``generate``, will call ReFrame to generate the pipeline specification for the desired tests.
All the usual `test selection options <manpage.html#test-filtering>`__ can be used to select specific tests.
ReFrame will process them as usual, but instead of running the selected tests, it will generate the correct steps for running each test individually as a Gitlab job.
We then pass the generated CI pipeline file to second phase as an artifact and we are done!
ReFrame will process them as usual, but instead of running the selected tests, it will generate the correct steps
for running each test individually as a Gitlab job. We then pass the generated CI pipeline file to the second phase as
an artifact and we are done! If the ``image`` keyword is defined in ``.gitlab-ci.yml``, the emitted pipeline will use
the same image as the one defined in the parent pipeline.

The following figure shows one part of the automatically generated pipeline for the test graph depicted `above <#fig-deps-complex>`__.

Expand Down
5 changes: 3 additions & 2 deletions reframe/core/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
_launchers = {}
_scheduler_backend_modules = [
'reframe.core.schedulers.local',
'reframe.core.schedulers.slurm',
'reframe.core.schedulers.lsf',
'reframe.core.schedulers.pbs',
'reframe.core.schedulers.oar',
'reframe.core.schedulers.sge',
'reframe.core.schedulers.lsf'
'reframe.core.schedulers.slurm'
]
_schedulers = {}

Expand Down
190 changes: 190 additions & 0 deletions reframe/core/schedulers/oar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

#
# OAR backend
#
# - Initial version submitted by Mahendra Paipuri, INRIA
#

import functools
import os
import re
import time

import reframe.core.runtime as rt
import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import JobError, JobSchedulerError
from reframe.core.schedulers.pbs import PbsJobScheduler
from reframe.utility import seconds_to_hms


# States can be found here:
# https://github.com/oar-team/oar/blob/0fccc4fc3bb86ee935ce58effc5aec514a3e155d/sources/core/qfunctions/oarstat#L293
def oar_state_completed(state):
    '''Return :obj:`True` if every state listed in ``state`` is terminal.

    ``state`` is a comma-separated list of OAR job states; an empty or
    :obj:`None` value is treated as not completed.
    '''
    if not state:
        return False

    terminal_states = {'Error', 'Terminated'}
    return all(s in terminal_states for s in state.split(','))


def oar_state_pending(state):
    '''Return :obj:`True` if any state listed in ``state`` is still active.

    ``state`` is a comma-separated list of OAR job states; an empty or
    :obj:`None` value is treated as not pending.
    '''
    if not state:
        return False

    active_states = {
        'Waiting', 'toLaunch', 'Launching', 'Hold', 'Running',
        'toError', 'Finishing', 'Suspended', 'Resuming',
    }
    return any(s in active_states for s in state.split(','))


# Shorthand for running an external command with `check=True`, so a
# non-zero exit status surfaces as an error instead of being ignored
# (mirrors the `subprocess` `check` semantics -- see osext.run_command).
_run_strict = functools.partial(osext.run_command, check=True)


@register_scheduler('oar')
class OarJobScheduler(PbsJobScheduler):
    '''Scheduler backend for the OAR batch system.

    OAR is close enough to PBS that we inherit from
    :class:`PbsJobScheduler` and override only job submission,
    cancellation and polling, which go through the OAR command-line
    tools (``oarsub``, ``oardel``, ``oarstat``).
    '''

    # In OAR resource specs, `host` is de-facto nodes and `core` is the
    # number of cores requested per node; the number of sockets can also
    # be specified using `cpu={num_sockets}`.
    TASKS_OPT = '-l /host={num_nodes}/core={num_tasks_per_node}'

    def __init__(self):
        self._prefix = '#OAR'
        self._submit_timeout = rt.runtime().get_option(
            f'schedulers/@{self.registered_name}/job_submit_timeout'
        )

    def emit_preamble(self, job):
        '''Return the list of scheduler directives for ``job``'s script.'''

        # Same reason as oarsub below: we give the full path to the
        # output and error files to avoid writing them in the working dir
        preamble = [
            self._format_option(f'-n "{job.name}"'),
            self._format_option(f'-O {os.path.join(job.workdir, job.stdout)}'),
            self._format_option(f'-E {os.path.join(job.workdir, job.stderr)}'),
        ]

        # Build the resource spec on a local copy.  The previous version
        # did `self.TASKS_OPT += ...`, which shadowed the class attribute
        # with an instance attribute and accumulated an extra
        # `,walltime=...` suffix on every call of this method.
        tasks_opt = self.TASKS_OPT
        if job.time_limit is not None:
            h, m, s = seconds_to_hms(job.time_limit)
            tasks_opt += ',walltime=%d:%d:%d' % (h, m, s)

        # Get the number of nodes in the reservation
        # NOTE(review): this yields 0 nodes if num_tasks <
        # num_tasks_per_node; presumably callers always request at least
        # one full node -- confirm.
        num_tasks_per_node = job.num_tasks_per_node or 1
        num_nodes = job.num_tasks // num_tasks_per_node

        # Emit the main resource reservation option
        options = [
            tasks_opt.format(num_nodes=num_nodes,
                             num_tasks_per_node=num_tasks_per_node)
        ]

        # Emit the rest of the options
        options += job.sched_access + job.options + job.cli_options
        for opt in options:
            if opt.startswith('#'):
                preamble.append(opt)
            else:
                preamble.append(self._format_option(opt))

        # OAR starts the job in the home directory by default
        preamble.append(f'cd {job.workdir}')
        return preamble

    def submit(self, job):
        '''Submit ``job`` with ``oarsub`` and record its job id.'''

        # For some reason the OAR job manager says that the job launching
        # dir is the working dir of the repo and not the stage dir.  A
        # workaround is to give the full path of the script to oarsub.
        job_script_fullpath = os.path.join(job.workdir, job.script_filename)

        # OAR needs -S to submit the job in batch mode
        cmd = f'oarsub -S {job_script_fullpath}'
        completed = _run_strict(cmd, timeout=self._submit_timeout)
        jobid_match = re.search(r'.*OAR_JOB_ID=(?P<jobid>\S+)',
                                completed.stdout)
        if not jobid_match:
            raise JobSchedulerError('could not retrieve the job id '
                                    'of the submitted job')

        job._jobid = jobid_match.group('jobid')
        job._submit_time = time.time()

    def cancel(self, job):
        '''Cancel ``job`` with ``oardel``.'''
        _run_strict(f'oardel {job.jobid}', timeout=self._submit_timeout)
        job._cancelled = True

    def poll(self, *jobs):
        '''Refresh the state of each job in ``jobs`` via ``oarstat``.'''
        if jobs:
            # Filter out non-jobs
            jobs = [job for job in jobs if job is not None]

        if not jobs:
            return

        for job in jobs:
            # NOTE(review): `oarstat` runs with check=True; if it exits
            # non-zero for an unknown job id, this raises before reaching
            # the "not known to scheduler" branch below -- confirm.
            completed = _run_strict(f'oarstat -fj {job.jobid}')

            # Typical oarstat -fj <job_id> output:
            # https://github.com/oar-team/oar/blob/0fccc4fc3bb86ee935ce58effc5aec514a3e155d/sources/core/qfunctions/oarstat#L310
            info = completed.stdout
            jobid_match = re.search(r'^Job_Id:\s*(?P<jobid>\S+)', info,
                                    re.MULTILINE)
            if not jobid_match or jobid_match.group('jobid') != job.jobid:
                self.log(f'Job {job.jobid} not known to scheduler, '
                         f'assuming job completed')
                job._state = 'Terminated'
                job._completed = True
                continue

            state_match = re.search(r'^\s*state = (?P<state>[A-Z]\S+)',
                                    info, re.MULTILINE)
            if not state_match:
                self.log(f'Job state not found (job info follows):\n{info}')
                continue

            job._state = state_match.group('state')
            if oar_state_completed(job.state):
                exitcode_match = re.search(r'^\s*exit_code = (?P<code>\d+)',
                                           info, re.MULTILINE)
                if exitcode_match:
                    job._exitcode = int(exitcode_match.group('code'))

                # We report a job as finished only when its stdout/stderr
                # are written back to the working directory
                stdout = os.path.join(job.workdir, job.stdout)
                stderr = os.path.join(job.workdir, job.stderr)
                out_ready = os.path.exists(stdout) and os.path.exists(stderr)
                if job.cancelled or out_ready:
                    job._completed = True
            elif oar_state_pending(job.state) and job.max_pending_time:
                if time.time() - job.submit_time >= job.max_pending_time:
                    self.cancel(job)
                    job._exception = JobError('maximum pending time exceeded',
                                              job.jobid)
6 changes: 3 additions & 3 deletions reframe/core/schedulers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def getscheduler(name):

# Import the schedulers modules to trigger their registration
import reframe.core.schedulers.local # noqa: F401, F403
import reframe.core.schedulers.slurm # noqa: F401, F403
import reframe.core.schedulers.lsf # noqa: F401, F403
import reframe.core.schedulers.oar # noqa: F401, F403
import reframe.core.schedulers.pbs # noqa: F401, F403
import reframe.core.schedulers.sge # noqa: F401, F403
import reframe.core.schedulers.torque # noqa: F401, F403
import reframe.core.schedulers.lsf # noqa: F401, F403
import reframe.core.schedulers.slurm # noqa: F401, F403
7 changes: 4 additions & 3 deletions reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@
"scheduler": {
"type": "string",
"enum": [
"local", "pbs", "slurm",
"sge", "squeue", "torque", "lsf"
"local", "lsf", "oar", "pbs",
"sge", "slurm", "squeue", "torque"
]
},
"launcher": {
Expand Down Expand Up @@ -370,7 +370,8 @@
"properties": {
"name": {
"type": "string",
"enum": ["local", "pbs", "sge", "slurm", "squeue", "torque", "lsf"]
"enum": ["local", "lsf", "oar", "pbs",
"sge", "slurm", "squeue", "torque"]
},
"ignore_reqnodenotavail": {"type": "boolean"},
"resubmit_on_errors": {
Expand Down
19 changes: 17 additions & 2 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def launcher():
return getlauncher('local')


@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local',
'pbs', 'torque', 'lsf'])
@pytest.fixture(params=['local', 'lsf', 'oar', 'pbs',
'sge', 'slurm', 'squeue', 'torque'])
def scheduler(request):
return getscheduler(request.param)

Expand Down Expand Up @@ -226,6 +226,21 @@ def _expected_torque_directives(job):
])


def _expected_oar_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_tasks_per_node = job.num_tasks_per_node
return set([
f'#OAR -n "testjob"',
f'#OAR -O {job.stdout}',
f'#OAR -E {job.stderr}',
f'#OAR -l /host={num_nodes}/core={num_tasks_per_node},walltime=0:5:0',
f'#OAR --account=spam',
f'#OAR --gres=gpu:4',
f'#DW jobdw capacity=100GB',
f'#DW stage_in source=/foo'
])


def _expected_local_directives(job):
return set()

Expand Down