diff --git a/docs/config_reference.rst b/docs/config_reference.rst index 9bdc4a3c28..6b2b720f67 100644 --- a/docs/config_reference.rst +++ b/docs/config_reference.rst @@ -212,12 +212,58 @@ System Partition Configuration - ``local``: Jobs will be launched locally without using any job scheduler. - ``pbs``: Jobs will be launched using the `PBS Pro `__ scheduler. - ``torque``: Jobs will be launched using the `Torque `__ scheduler. + - ``sge``: Jobs will be launched using the `Sun Grid Engine `__ scheduler. - ``slurm``: Jobs will be launched using the `Slurm `__ scheduler. This backend requires job accounting to be enabled in the target system. If not, you should consider using the ``squeue`` backend below. - ``squeue``: Jobs will be launched using the `Slurm `__ scheduler. This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible. + .. versionadded:: 3.7.2 + Support for the SGE scheduler is added. + + .. note:: + + The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific. + For this reason, the ``sge`` scheduler backend does not try to interpret any related arguments, e.g., ``num_tasks``, ``num_tasks_per_node`` etc. + Users must specify how these resources are to be requested by setting the :js:attr:`resources` partition configuration parameter and then request them from inside a test using the :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` test attribute. + Here is an example configuration for a system partition named ``foo`` that defines different ways for submitting MPI-only, OpenMP-only and MPI+OpenMP jobs: + + .. 
code-block:: python + + { + 'name': 'foo', + 'scheduler': 'sge', + 'resources': [ + { + 'name': 'smp', + 'options': ['-pe smp {num_slots}'] + }, + { + 'name': 'mpi', + 'options': ['-pe mpi {num_slots}'] + }, + { + 'name': 'mpismp', + 'options': ['-pe mpismp {num_slots}'] + } + ] + } + + Each test then can request the different types of slots as follows: + + .. code-block:: python + + self.extra_resources = { + 'smp': {'num_slots': self.num_cpus_per_task}, + 'mpi': {'num_slots': self.num_tasks}, + 'mpismp': {'num_slots': self.num_tasks*self.num_cpus_per_task} + } + + Notice that defining :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` does not make the test non-portable to other systems that have different schedulers; + the :py:attr:`extra_resources` will be simply ignored in this case and the scheduler backend will interpret the different test fields in the appropriate way. + + .. js:attribute:: .systems[].partitions[].launcher :required: Yes diff --git a/reframe/core/backends.py b/reframe/core/backends.py index 00eb84bd7d..fda7555f1e 100644 --- a/reframe/core/backends.py +++ b/reframe/core/backends.py @@ -19,7 +19,8 @@ _scheduler_backend_modules = [ 'reframe.core.schedulers.local', 'reframe.core.schedulers.slurm', - 'reframe.core.schedulers.pbs' + 'reframe.core.schedulers.pbs', + 'reframe.core.schedulers.sge' ] _schedulers = {} diff --git a/reframe/core/schedulers/registry.py b/reframe/core/schedulers/registry.py index 835a1d95c4..cf41168afc 100644 --- a/reframe/core/schedulers/registry.py +++ b/reframe/core/schedulers/registry.py @@ -38,4 +38,5 @@ def getscheduler(name): import reframe.core.schedulers.local # noqa: F401, F403 import reframe.core.schedulers.slurm # noqa: F401, F403 import reframe.core.schedulers.pbs # noqa: F401, F403 +import reframe.core.schedulers.sge # noqa: F401, F403 import reframe.core.schedulers.torque # noqa: F401, F403 diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py new file mode 100644 index 
0000000000..14965dc7e4 --- /dev/null +++ b/reframe/core/schedulers/sge.py @@ -0,0 +1,146 @@ +# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +# +# SGE backend +# +# - Initial version submitted by Mosè Giordano, UCL (based on the PBS backend) +# + +import functools +import re +import time +import xml.etree.ElementTree as ET + +import reframe.core.runtime as rt +import reframe.utility.osext as osext +from reframe.core.backends import register_scheduler +from reframe.core.exceptions import JobSchedulerError +from reframe.core.schedulers.pbs import PbsJobScheduler +from reframe.utility import seconds_to_hms + +_run_strict = functools.partial(osext.run_command, check=True) + + +@register_scheduler('sge') +class SgeJobScheduler(PbsJobScheduler): + def __init__(self): + self._prefix = '#$' + self._submit_timeout = rt.runtime().get_option( + f'schedulers/@{self.registered_name}/job_submit_timeout' + ) + + def emit_preamble(self, job): + preamble = [ + self._format_option(f'-N "{job.name}"'), + self._format_option(f'-o {job.stdout}'), + self._format_option(f'-e {job.stderr}'), + self._format_option(f'-wd {job.workdir}') + ] + + if job.time_limit is not None: + h, m, s = seconds_to_hms(job.time_limit) + preamble.append( + self._format_option(f'-l h_rt=%d:%d:%d' % (h, m, s)) + ) + + # Emit the rest of the options + options = job.options + job.cli_options + for opt in options: + if opt.startswith('#'): + preamble.append(opt) + else: + preamble.append(self._format_option(opt)) + + return preamble + + def submit(self, job): + # `-o` and `-e` options are only recognized in command line by the PBS, + # SGE, and Slurm wrappers. 
+ cmd = f'qsub -o {job.stdout} -e {job.stderr} {job.script_filename}' + completed = _run_strict(cmd, timeout=self._submit_timeout) + jobid_match = re.search(r'^Your job (?P<jobid>\S+)', completed.stdout) + if not jobid_match: + raise JobSchedulerError('could not retrieve the job id ' + 'of the submitted job') + + job._jobid = jobid_match.group('jobid') + job._submit_time = time.time() + + def poll(self, *jobs): + if jobs: + # Filter out non-jobs + jobs = [job for job in jobs if job is not None] + + if not jobs: + return + + user = osext.osuser() + completed = osext.run_command(f'qstat -xml -u {user}') + if completed.returncode != 0: + raise JobSchedulerError( + f'qstat failed with exit code {completed.returncode} ' + f'(standard error follows):\n{completed.stderr}' + ) + + # Index the jobs to poll on their jobid + jobs_to_poll = {job.jobid: job for job in jobs} + + # Parse the XML + root = ET.fromstring(completed.stdout) + + # We are iterating over the returned XML and update the status of the + # jobs relevant to ReFrame; the naming convention of variables matches + # that of SGE's XML output + + known_jobs = set() # jobs known to the SGE scheduler + for queue_info in root: + # Reads the XML and prints jobs with status belonging to user. + if queue_info is None: + raise JobSchedulerError('could not retrieve queue information') + + for job_list in queue_info: + if job_list.find("JB_owner").text != user: + # Not a job of this user. 
+ continue + + jobid = job_list.find("JB_job_number").text + if jobid not in jobs_to_poll: + # Not a reframe job + continue + + state = job_list.find("state").text + job = jobs_to_poll[jobid] + known_jobs.add(job) + + # For the list of known statuses see `man 5 sge_status` + # (https://arc.liv.ac.uk/SGE/htmlman/htmlman5/sge_status.html) + if state in ['r', 'hr', 't', 'Rr', 'Rt']: + job._state = 'RUNNING' + elif state in ['qw', 'Rq', 'hqw', 'hRwq']: + job._state = 'PENDING' + elif state in ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', + 'Rts', 'RS', 'RtS', 'RT', 'RtT']: + job._state = 'SUSPENDED' + elif state in ['Eqw', 'Ehqw', 'EhRqw']: + job._state = 'ERROR' + elif state in ['dr', 'dt', 'dRr', 'dRt', 'ds', + 'dS', 'dT', 'dRs', 'dRS', 'dRT']: + job._state = 'DELETING' + elif state == 'z': + job._state = 'COMPLETED' + + # Mark any "unknown" job as completed + unknown_jobs = set(jobs) - known_jobs + for job in unknown_jobs: + self.log(f'Job {job.jobid} not known to scheduler, ' + f'assuming job completed') + job._state = 'COMPLETED' + + def finished(self, job): + if job.exception: + raise job.exception + + return job.state == 'COMPLETED' diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index 294fba0a32..8b1e3e990a 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -248,7 +248,7 @@ "type": "string", "enum": [ "local", "pbs", "slurm", - "squeue", "torque" + "sge", "squeue", "torque" ] }, "launcher": { @@ -370,7 +370,7 @@ "properties": { "name": { "type": "string", - "enum": ["local", "pbs", "slurm", "squeue", "torque"] + "enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"] }, "ignore_reqnodenotavail": {"type": "boolean"}, "resubmit_on_errors": { diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 6928e7efa4..169beeae40 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -26,7 +26,7 @@ def launcher(): return getlauncher('local') -@pytest.fixture(params=['slurm', 
'squeue', 'local', 'pbs', 'torque']) +@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque']) def scheduler(request): return getscheduler(request.param) @@ -132,6 +132,22 @@ def assert_job_script_sanity(job): 'echo postrun'] == matches +def _expected_sge_directives(job): + num_nodes = job.num_tasks // job.num_tasks_per_node + num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node + return set([ + f'#$ -N "testjob"', + f'#$ -l h_rt=0:5:0', + f'#$ -o {job.stdout}', + f'#$ -e {job.stderr}', + f'#$ -wd {job.workdir}', + f'#$ --gres=gpu:4', + f'#$ --account=spam', + f'#DW jobdw capacity=100GB', + f'#DW stage_in source=/foo' + ]) + + def _expected_slurm_directives(job): return set([ '#SBATCH --job-name="testjob"', @@ -205,7 +221,7 @@ def test_prepare(fake_job): prepare_job(fake_job) with open(fake_job.script_filename) as fp: - found_directives = set(re.findall(r'^\#\w+ .*', fp.read(), + found_directives = set(re.findall(r'^\#\S+ .*', fp.read(), re.MULTILINE)) expected_directives = globals()[f'_expected_{sched_name}_directives']