Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/config_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,58 @@ System Partition Configuration
- ``local``: Jobs will be launched locally without using any job scheduler.
- ``pbs``: Jobs will be launched using the `PBS Pro <https://en.wikipedia.org/wiki/Portable_Batch_System>`__ scheduler.
- ``torque``: Jobs will be launched using the `Torque <https://en.wikipedia.org/wiki/TORQUE>`__ scheduler.
- ``sge``: Jobs will be launched using the `Sun Grid Engine <https://arc.liv.ac.uk/SGE/htmlman/manuals.html>`__ scheduler.
- ``slurm``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
This backend requires job accounting to be enabled in the target system.
If not, you should consider using the ``squeue`` backend below.
- ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.

.. versionadded:: 3.7.2
Support for the SGE scheduler is added.

.. note::

The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific.
For this reason, the ``sge`` scheduler backend does not try to interpret any related arguments, e.g., ``num_tasks``, ``num_tasks_per_node`` etc.
Users must specify how these resources are to be requested by setting the :js:attr:`resources` partition configuration parameter and then request them from inside a test using the :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` test attribute.
Here is an example configuration for a system partition named ``foo`` that defines different ways for submitting MPI-only, OpenMP-only and MPI+OpenMP jobs:

.. code-block:: python

{
'name': 'foo',
'scheduler': 'sge',
'resources': [
{
'name': 'smp',
'options': ['-pe smp {num_slots}']
},
{
'name': 'mpi',
'options': ['-pe mpi {num_slots}']
},
{
'name': 'mpismp',
'options': ['-pe mpismp {num_slots}']
}
]
}

Each test then can request the different type of slots as follows:

.. code-block:: python

self.extra_resources = {
'smp': {'num_slots': self.num_cpus_per_task},
'mpi': {'num_slots': self.num_tasks},
'mpismp': {'num_slots': self.num_tasks*self.num_cpus_per_task}
}

Notice that defining :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` does not make the test non-portable to other systems that have different schedulers;
the :py:attr:`extra_resources` will be simply ignored in this case and the scheduler backend will interpret the different test fields in the appropriate way.


.. js:attribute:: .systems[].partitions[].launcher

:required: Yes
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
_scheduler_backend_modules = [
'reframe.core.schedulers.local',
'reframe.core.schedulers.slurm',
'reframe.core.schedulers.pbs'
'reframe.core.schedulers.pbs',
'reframe.core.schedulers.sge'
]
_schedulers = {}

Expand Down
1 change: 1 addition & 0 deletions reframe/core/schedulers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ def getscheduler(name):
import reframe.core.schedulers.local # noqa: F401, F403
import reframe.core.schedulers.slurm # noqa: F401, F403
import reframe.core.schedulers.pbs # noqa: F401, F403
import reframe.core.schedulers.sge # noqa: F401, F403
import reframe.core.schedulers.torque # noqa: F401, F403
146 changes: 146 additions & 0 deletions reframe/core/schedulers/sge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

#
# SGE backend
#
# - Initial version submitted by Mosè Giordano, UCL (based on the PBS backend)
#

import functools
import re
import time
import xml.etree.ElementTree as ET

import reframe.core.runtime as rt
import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import JobSchedulerError
from reframe.core.schedulers.pbs import PbsJobScheduler
from reframe.utility import seconds_to_hms

_run_strict = functools.partial(osext.run_command, check=True)


@register_scheduler('sge')
class SgeJobScheduler(PbsJobScheduler):
    '''Scheduler backend for the Sun Grid Engine (SGE).

    This backend reuses the PBS backend's job-script generation and
    cancellation machinery, overriding only the directive prefix,
    submission, polling and completion checks.
    '''

    def __init__(self):
        # SGE job script directives are prefixed with `#$`
        self._prefix = '#$'
        self._submit_timeout = rt.runtime().get_option(
            f'schedulers/@{self.registered_name}/job_submit_timeout'
        )

    def emit_preamble(self, job):
        '''Return the list of scheduler directives for ``job``'s script.'''

        preamble = [
            self._format_option(f'-N "{job.name}"'),
            self._format_option(f'-o {job.stdout}'),
            self._format_option(f'-e {job.stderr}'),
            self._format_option(f'-wd {job.workdir}')
        ]

        if job.time_limit is not None:
            h, m, s = seconds_to_hms(job.time_limit)
            preamble.append(
                self._format_option(f'-l h_rt={h}:{m}:{s}')
            )

        # Emit the rest of the options; options starting with `#` are
        # passed through verbatim (e.g., `#DW` DataWarp directives).
        options = job.options + job.cli_options
        for opt in options:
            if opt.startswith('#'):
                preamble.append(opt)
            else:
                preamble.append(self._format_option(opt))

        return preamble

    def submit(self, job):
        '''Submit ``job`` with ``qsub`` and record its job id.

        The `-o` and `-e` options are also passed on the command line,
        since they are recognized there by SGE's ``qsub``.
        '''
        cmd = f'qsub -o {job.stdout} -e {job.stderr} {job.script_filename}'
        completed = _run_strict(cmd, timeout=self._submit_timeout)

        # `qsub` acknowledges submissions with `Your job <id> ...`
        jobid_match = re.search(r'^Your job (?P<jobid>\S+)', completed.stdout)
        if not jobid_match:
            raise JobSchedulerError('could not retrieve the job id '
                                    'of the submitted job')

        job._jobid = jobid_match.group('jobid')
        job._submit_time = time.time()

    def poll(self, *jobs):
        '''Update the state of ``jobs`` by querying ``qstat``.'''

        if jobs:
            # Filter out non-jobs
            jobs = [job for job in jobs if job is not None]

        if not jobs:
            return

        user = osext.osuser()
        completed = osext.run_command(f'qstat -xml -u {user}')
        if completed.returncode != 0:
            raise JobSchedulerError(
                f'qstat failed with exit code {completed.returncode} '
                f'(standard error follows):\n{completed.stderr}'
            )

        # Index the jobs to poll on their jobid
        jobs_to_poll = {job.jobid: job for job in jobs}

        # Parse the XML
        root = ET.fromstring(completed.stdout)

        # We are iterating over the returned XML and update the status of
        # the jobs relevant to ReFrame; the naming convention of variables
        # matches that of SGE's XML output

        known_jobs = set()   # jobs known to the SGE scheduler
        for queue_info in root:
            for job_list in queue_info:
                # Guard against malformed or unexpected entries missing
                # the elements we rely on
                owner = job_list.find('JB_owner')
                if owner is None or owner.text != user:
                    # Not a job of this user
                    continue

                jobid_elem = job_list.find('JB_job_number')
                if jobid_elem is None or jobid_elem.text not in jobs_to_poll:
                    # Not a ReFrame job
                    continue

                state_elem = job_list.find('state')
                if state_elem is None:
                    continue

                state = state_elem.text
                job = jobs_to_poll[jobid_elem.text]
                known_jobs.add(job)

                # For the list of known statuses see `man 5 sge_status`
                # (https://arc.liv.ac.uk/SGE/htmlman/htmlman5/sge_status.html)
                if state in ['r', 'hr', 't', 'Rr', 'Rt']:
                    job._state = 'RUNNING'
                elif state in ['qw', 'Rq', 'hqw', 'hRwq']:
                    job._state = 'PENDING'
                elif state in ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs',
                               'Rts', 'RS', 'RtS', 'RT', 'RtT']:
                    job._state = 'SUSPENDED'
                elif state in ['Eqw', 'Ehqw', 'EhRqw']:
                    job._state = 'ERROR'
                elif state in ['dr', 'dt', 'dRr', 'dRt', 'ds',
                               'dS', 'dT', 'dRs', 'dRS', 'dRT']:
                    job._state = 'DELETING'
                elif state == 'z':
                    job._state = 'COMPLETED'

        # Mark any job unknown to the scheduler as completed: SGE drops
        # finished jobs from the `qstat` output
        unknown_jobs = set(jobs) - known_jobs
        for job in unknown_jobs:
            self.log(f'Job {job.jobid} not known to scheduler, '
                     f'assuming job completed')
            job._state = 'COMPLETED'

    def finished(self, job):
        '''Return :obj:`True` if ``job`` has completed.

        Re-raises any exception attached to the job first.
        '''
        if job.exception:
            raise job.exception

        return job.state == 'COMPLETED'
4 changes: 2 additions & 2 deletions reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@
"type": "string",
"enum": [
"local", "pbs", "slurm",
"squeue", "torque"
"sge", "squeue", "torque"
]
},
"launcher": {
Expand Down Expand Up @@ -370,7 +370,7 @@
"properties": {
"name": {
"type": "string",
"enum": ["local", "pbs", "slurm", "squeue", "torque"]
"enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"]
},
"ignore_reqnodenotavail": {"type": "boolean"},
"resubmit_on_errors": {
Expand Down
20 changes: 18 additions & 2 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def launcher():
return getlauncher('local')


@pytest.fixture(params=['slurm', 'squeue', 'local', 'pbs', 'torque'])
@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque'])
def scheduler(request):
return getscheduler(request.param)

Expand Down Expand Up @@ -132,6 +132,22 @@ def assert_job_script_sanity(job):
'echo postrun'] == matches


def _expected_sge_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
return set([
f'#$ -N "testjob"',
f'#$ -l h_rt=0:5:0',
f'#$ -o {job.stdout}',
f'#$ -e {job.stderr}',
f'#$ -wd {job.workdir}',
f'#$ --gres=gpu:4',
f'#$ --account=spam',
f'#DW jobdw capacity=100GB',
f'#DW stage_in source=/foo'
])


def _expected_slurm_directives(job):
return set([
'#SBATCH --job-name="testjob"',
Expand Down Expand Up @@ -205,7 +221,7 @@ def test_prepare(fake_job):

prepare_job(fake_job)
with open(fake_job.script_filename) as fp:
found_directives = set(re.findall(r'^\#\w+ .*', fp.read(),
found_directives = set(re.findall(r'^\#\S+ .*', fp.read(),
re.MULTILINE))

expected_directives = globals()[f'_expected_{sched_name}_directives']
Expand Down