From 2f7e83fb19caf2918df9c8333d1cbc994bc25229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Wed, 21 Apr 2021 16:30:37 +0100 Subject: [PATCH 01/12] Minimally working SGE scheduler --- reframe/core/backends.py | 3 +- reframe/core/schedulers/registry.py | 1 + reframe/core/schedulers/sge.py | 218 ++++++++++++++++++++++++++++ reframe/schemas/config.json | 4 +- 4 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 reframe/core/schedulers/sge.py diff --git a/reframe/core/backends.py b/reframe/core/backends.py index 00eb84bd7d..04e9c1d8b5 100644 --- a/reframe/core/backends.py +++ b/reframe/core/backends.py @@ -19,7 +19,8 @@ _scheduler_backend_modules = [ 'reframe.core.schedulers.local', 'reframe.core.schedulers.slurm', - 'reframe.core.schedulers.pbs' + 'reframe.core.schedulers.pbs', + 'reframe.core.schedulers.sge', ] _schedulers = {} diff --git a/reframe/core/schedulers/registry.py b/reframe/core/schedulers/registry.py index 835a1d95c4..cf41168afc 100644 --- a/reframe/core/schedulers/registry.py +++ b/reframe/core/schedulers/registry.py @@ -38,4 +38,5 @@ def getscheduler(name): import reframe.core.schedulers.local # noqa: F401, F403 import reframe.core.schedulers.slurm # noqa: F401, F403 import reframe.core.schedulers.pbs # noqa: F401, F403 +import reframe.core.schedulers.sge # noqa: F401, F403 import reframe.core.schedulers.torque # noqa: F401, F403 diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py new file mode 100644 index 0000000000..8d4d4f5dc1 --- /dev/null +++ b/reframe/core/schedulers/sge.py @@ -0,0 +1,218 @@ +# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +# +# SGE backend +# +# - Initial version submitted by Mosè Giordano, UCL (based on the PBS backend) +# + +import functools +import os +import itertools +import re +import time + +import reframe.core.runtime as rt +import reframe.core.schedulers as sched +import reframe.utility.osext as osext +from reframe.core.backends import register_scheduler +from reframe.core.exceptions import JobError, JobSchedulerError +from reframe.utility import seconds_to_hms + + +# Time to wait after a job is finished for its standard output/error to be +# written to the corresponding files. +# FIXME: Consider making this a configuration parameter +SGE_OUTPUT_WRITEBACK_WAIT = 3 + + +# Minimum amount of time between its submission and its cancellation. If you +# immediately cancel a SGE job after submission, its output files may never +# appear in the output causing the wait() to hang. 
+# FIXME: Consider making this a configuration parameter +SGE_CANCEL_DELAY = 3 + + +_run_strict = functools.partial(osext.run_command, check=True) + + +JOB_STATES = { + 'Q': 'QUEUED', + 'H': 'HELD', + 'R': 'RUNNING', + 'E': 'EXITING', + 'T': 'MOVED', + 'W': 'WAITING', + 'S': 'SUSPENDED', + 'C': 'COMPLETED', +} + + +class _SgeJob(sched.Job): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._cancelled = False + self._completed = False + + @property + def cancelled(self): + return self._cancelled + + @property + def completed(self): + return self._completed + + +@register_scheduler('sge') +class SgeJobScheduler(sched.JobScheduler): + TASKS_OPT = ('-l select={num_nodes}:mpiprocs={num_tasks_per_node}' + ':ncpus={num_cpus_per_node}') + + def __init__(self): + self._prefix = '#$' + self._submit_timeout = rt.runtime().get_option( + f'schedulers/@{self.registered_name}/job_submit_timeout' + ) + + def _emit_lselect_option(self, job): + num_tasks_per_node = job.num_tasks_per_node or 1 + num_cpus_per_task = job.num_cpus_per_task or 1 + num_nodes = job.num_tasks // num_tasks_per_node + num_cpus_per_node = num_tasks_per_node * num_cpus_per_task + select_opt = '' + self.TASKS_OPT.format( + num_nodes=num_nodes, + num_tasks_per_node=num_tasks_per_node, + num_cpus_per_node=num_cpus_per_node + ) + + # Options starting with `-` are emitted in separate lines + rem_opts = [] + verb_opts = [] + for opt in (*job.sched_access, *job.options, *job.cli_options): + if opt.startswith('-'): + rem_opts.append(opt) + elif opt.startswith('#'): + verb_opts.append(opt) + else: + select_opt += ':' + opt + + return [self._format_option(select_opt), + *(self._format_option(opt) for opt in rem_opts), + *verb_opts] + + def _format_option(self, option): + return self._prefix + ' ' + option + + def make_job(self, *args, **kwargs): + return _SgeJob(*args, **kwargs) + + def emit_preamble(self, job): + preamble = [ + self._format_option('-N "%s"' % job.name), + self._format_option('-o %s' % job.stdout), + self._format_option('-e %s' % job.stderr), + self._format_option('-wd %s' % job.workdir), + ] + + if job.time_limit is not None: + h, m, s = seconds_to_hms(job.time_limit) + preamble.append( + self._format_option('-l h_rt=%d:%d:%d' % (h, m, s))) + + preamble += self._emit_lselect_option(job) + + return preamble + + def allnodes(self): + raise NotImplementedError('sge backend does not support node listing') + + def filternodes(self, job, nodes): + raise NotImplementedError('sge backend does not support ' + 'node filtering') + + def submit(self, job): + # `-o` and `-e` options are only recognized in command line by the PBS, + # SGE, and Slurm wrappers. 
+ cmd = f'qsub -o {job.stdout} -e {job.stderr} {job.script_filename}' + completed = _run_strict(cmd, timeout=self._submit_timeout) + jobid_match = re.search(r'^Your job (?P\S+)', completed.stdout) + if not jobid_match: + raise JobSchedulerError('could not retrieve the job id ' + 'of the submitted job') + + job._jobid = jobid_match.group('jobid') + job._submit_time = time.time() + + def wait(self, job): + intervals = itertools.cycle([1, 2, 3]) + while not self.finished(job): + self.poll(job) + time.sleep(next(intervals)) + + def cancel(self, job): + time_from_submit = time.time() - job.submit_time + if time_from_submit < SGE_CANCEL_DELAY: + time.sleep(SGE_CANCEL_DELAY - time_from_submit) + + _run_strict(f'qdel {job.jobid}', timeout=self._submit_timeout) + job._cancelled = True + + def finished(self, job): + if job.exception: + raise job.exception + + return job.completed + + def poll(self, *jobs): + if jobs: + # Filter out non-jobs + jobs = [job for job in jobs if job is not None] + + if not jobs: + return + + completed = osext.run_command( + f'qstat -f -j {",".join(job.jobid for job in jobs)}' + ) + + # If qstat cannot find any of the job IDs, it will return 1. + # Otherwise, it will return with return code 0 and print information + # only for the jobs it could find. + if completed.returncode == 1: + self.log('Return code is 1: jobids not known by scheduler, ' + 'assuming all jobs completed') + for job in jobs: + job._state = 'COMPLETED' + job._completed = True + + return + + if completed.returncode != 0: + raise JobSchedulerError( + f'qstat failed with exit code {completed.returncode} ' + f'(standard error follows):\n{completed.stderr}' + ) + + # Store information for each job separately + jobinfo = {} + for job_raw_info in completed.stdout.split('\n\n'): + jobid_match = re.search( + r'^job_number:\s*(?P\S+)', job_raw_info, re.MULTILINE + ) + if jobid_match: + jobid = jobid_match.group('jobid') + jobinfo[jobid] = job_raw_info + + for job in jobs: + if job.jobid not in jobinfo: + self.log(f'Job {job.jobid} not known to scheduler, ' + f'assuming job completed') + job._state = 'COMPLETED' + job._completed = True + continue + + job._state = 'RUNNING' diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index 391b91187a..8a027541a9 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -232,7 +232,7 @@ "type": "string", "enum": [ "local", "pbs", "slurm", - "squeue", "torque" + "sge", "squeue", "torque" ] }, "launcher": { @@ -357,7 +357,7 @@ "properties": { "name": { "type": "string", - "enum": ["local", "pbs", "slurm", "squeue", "torque"] + "enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"] }, "ignore_reqnodenotavail": {"type": "boolean"}, "resubmit_on_errors": { From db90f78a8ab87b091b0773339a27311208a15e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Sun, 2 May 2021 19:18:18 +0100 Subject: [PATCH 02/12] Use XML output --- reframe/core/schedulers/sge.py | 77 ++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 8d4d4f5dc1..ba71b04846 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -10,16 +10,16 @@ # import functools -import os import itertools import re import time +import xml.etree.ElementTree as ET import reframe.core.runtime as rt import reframe.core.schedulers as sched import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from 
reframe.core.exceptions import JobError, JobSchedulerError +from reframe.core.exceptions import JobSchedulerError from reframe.utility import seconds_to_hms @@ -39,18 +39,6 @@ _run_strict = functools.partial(osext.run_command, check=True) -JOB_STATES = { - 'Q': 'QUEUED', - 'H': 'HELD', - 'R': 'RUNNING', - 'E': 'EXITING', - 'T': 'MOVED', - 'W': 'WAITING', - 'S': 'SUSPENDED', - 'C': 'COMPLETED', -} - - class _SgeJob(sched.Job): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -175,37 +163,56 @@ def poll(self, *jobs): if not jobs: return + user = osext.osuser() completed = osext.run_command( - f'qstat -f -j {",".join(job.jobid for job in jobs)}' + f'qstat -xml -u {user}' ) - # If qstat cannot find any of the job IDs, it will return 1. - # Otherwise, it will return with return code 0 and print information - # only for the jobs it could find. - if completed.returncode == 1: - self.log('Return code is 1: jobids not known by scheduler, ' - 'assuming all jobs completed') - for job in jobs: - job._state = 'COMPLETED' - job._completed = True - - return - if completed.returncode != 0: raise JobSchedulerError( f'qstat failed with exit code {completed.returncode} ' f'(standard error follows):\n{completed.stderr}' ) + root = ET.fromstring(completed.stdout) + # Store information for each job separately jobinfo = {} - for job_raw_info in completed.stdout.split('\n\n'): - jobid_match = re.search( - r'^job_number:\s*(?P\S+)', job_raw_info, re.MULTILINE - ) - if jobid_match: - jobid = jobid_match.group('jobid') - jobinfo[jobid] = job_raw_info + for queue_info in root: + + # Reads the XML and prints jobs with status belonging to user. + if queue_info is None: + raise JobSchedulerError('Decomposition error!\n') + + for job_list in queue_info: + if job_list.find("JB_owner").text != user: + # Not a job of this user. + continue + + job_number = job_list.find("JB_job_number").text + + if job_number not in [job.jobid for job in jobs]: + # Not a reframe job. 
+ continue + + state = job_list.find("state").text + + # For the list of known statuses see `man 5 sge_status` + # (https://arc.liv.ac.uk/SGE/htmlman/htmlman5/sge_status.html) + if state in ['r', 'hr', 't', 'Rr', 'Rt']: + jobinfo[job_number] = 'RUNNING' + elif state in ['qw', 'Rq', 'hqw', 'hRwq']: + jobinfo[job_number] = 'PENDING' + elif state in ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', + 'Rts', 'RS', 'RtS', 'RT', 'RtT']: + jobinfo[job_number] = 'SUSPENDED' + elif state in ['Eqw', 'Ehqw', 'EhRqw']: + jobinfo[job_number] = 'ERROR' + elif state in ['dr', 'dt', 'dRr', 'dRt', 'ds', + 'dS', 'dT', 'dRs', 'dRS', 'dRT']: + jobinfo[job_number] = 'DELETING' + elif state == 'z': + jobinfo[job_number] = 'COMPLETED' for job in jobs: if job.jobid not in jobinfo: @@ -215,4 +222,4 @@ def poll(self, *jobs): job._completed = True continue - job._state = 'RUNNING' + job._state = jobinfo[job.jobid] From aae98c48e402252472bbf0087a82527665495604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Wed, 5 May 2021 15:17:50 +0100 Subject: [PATCH 03/12] Inherit `SgeJobScheduler` from `PbsJobScheduler` --- reframe/core/schedulers/sge.py | 67 +--------------------------------- 1 file changed, 2 insertions(+), 65 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index ba71b04846..9dd9cb6f2a 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -10,52 +10,22 @@ # import functools -import itertools import re import time import xml.etree.ElementTree as ET import reframe.core.runtime as rt -import reframe.core.schedulers as sched import reframe.utility.osext as osext from reframe.core.backends import register_scheduler from reframe.core.exceptions import JobSchedulerError from reframe.utility import seconds_to_hms - - -# Time to wait after a job is finished for its standard output/error to be -# written to the corresponding files. -# FIXME: Consider making this a configuration parameter -SGE_OUTPUT_WRITEBACK_WAIT = 3 - - -# Minimum amount of time between its submission and its cancellation. If you -# immediately cancel a SGE job after submission, its output files may never -# appear in the output causing the wait() to hang. 
-# FIXME: Consider making this a configuration parameter -SGE_CANCEL_DELAY = 3 - +from reframe.core.schedulers.pbs import PbsJobScheduler _run_strict = functools.partial(osext.run_command, check=True) -class _SgeJob(sched.Job): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._cancelled = False - self._completed = False - - @property - def cancelled(self): - return self._cancelled - - @property - def completed(self): - return self._completed - - @register_scheduler('sge') -class SgeJobScheduler(sched.JobScheduler): +class SgeJobScheduler(PbsJobScheduler): TASKS_OPT = ('-l select={num_nodes}:mpiprocs={num_tasks_per_node}' ':ncpus={num_cpus_per_node}') @@ -92,12 +62,6 @@ def _emit_lselect_option(self, job): *(self._format_option(opt) for opt in rem_opts), *verb_opts] - def _format_option(self, option): - return self._prefix + ' ' + option - - def make_job(self, *args, **kwargs): - return _SgeJob(*args, **kwargs) - def emit_preamble(self, job): preamble = [ self._format_option('-N "%s"' % job.name), @@ -115,13 +79,6 @@ def emit_preamble(self, job): return preamble - def allnodes(self): - raise NotImplementedError('sge backend does not support node listing') - - def filternodes(self, job, nodes): - raise NotImplementedError('sge backend does not support ' - 'node filtering') - def submit(self, job): # `-o` and `-e` options are only recognized in command line by the PBS, # SGE, and Slurm wrappers. @@ -135,26 +92,6 @@ def submit(self, job): job._jobid = jobid_match.group('jobid') job._submit_time = time.time() - def wait(self, job): - intervals = itertools.cycle([1, 2, 3]) - while not self.finished(job): - self.poll(job) - time.sleep(next(intervals)) - - def cancel(self, job): - time_from_submit = time.time() - job.submit_time - if time_from_submit < SGE_CANCEL_DELAY: - time.sleep(SGE_CANCEL_DELAY - time_from_submit) - - _run_strict(f'qdel {job.jobid}', timeout=self._submit_timeout) - job._cancelled = True - - def finished(self, job): - if job.exception: - raise job.exception - - return job.completed - def poll(self, *jobs): if jobs: # Filter out non-jobs From 624c44e2b2ae64fbf383d03a1aed0ae925effca1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 17 May 2021 16:26:36 +0100 Subject: [PATCH 04/12] Apply suggestions from code review Co-authored-by: Vasileios Karakasis --- reframe/core/backends.py | 2 +- reframe/core/schedulers/sge.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reframe/core/backends.py b/reframe/core/backends.py index 04e9c1d8b5..fda7555f1e 100644 --- a/reframe/core/backends.py +++ b/reframe/core/backends.py @@ -20,7 +20,7 @@ 'reframe.core.schedulers.local', 'reframe.core.schedulers.slurm', 'reframe.core.schedulers.pbs', - 'reframe.core.schedulers.sge', + 'reframe.core.schedulers.sge' ] _schedulers = {} diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 9dd9cb6f2a..6a480284c7 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -18,8 +18,8 @@ import reframe.utility.osext as osext from reframe.core.backends import register_scheduler from reframe.core.exceptions import JobSchedulerError -from reframe.utility import seconds_to_hms from reframe.core.schedulers.pbs import PbsJobScheduler +from reframe.utility import seconds_to_hms _run_strict = functools.partial(osext.run_command, check=True) @@ -126,7 +126,7 @@ def poll(self, *jobs): # Not a job of this user. 
continue - job_number = job_list.find("JB_job_number").text + jobid = job_list.find("JB_job_number").text if job_number not in [job.jobid for job in jobs]: # Not a reframe job. From dc60d736f733742c6d2c1f7d7646bccbd212a494 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 17 May 2021 23:56:40 +0200 Subject: [PATCH 05/12] Some polling improvements to the SGE scheduler --- reframe/core/schedulers/sge.py | 54 ++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 6a480284c7..20d5d006c0 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -101,22 +101,25 @@ def poll(self, *jobs): return user = osext.osuser() - completed = osext.run_command( - f'qstat -xml -u {user}' - ) - + completed = osext.run_command(f'qstat -xml -u {user}') if completed.returncode != 0: raise JobSchedulerError( f'qstat failed with exit code {completed.returncode} ' f'(standard error follows):\n{completed.stderr}' ) + # Index the jobs to poll on their jobid + jobs_to_poll = {job.jobid: job for job in jobs} + + # Parse the XML root = ET.fromstring(completed.stdout) - # Store information for each job separately - jobinfo = {} - for queue_info in root: + # We are iterating over the returned XML and update the status of the + # jobs relevant to ReFrame; the naming convention of variables matches + # that of SGE's XML output + known_jobs = set() # jobs known to the SGE scheduler + for queue_info in root: # Reads the XML and prints jobs with status belonging to user. if queue_info is None: raise JobSchedulerError('Decomposition error!\n') @@ -127,36 +130,35 @@ def poll(self, *jobs): continue jobid = job_list.find("JB_job_number").text - - if job_number not in [job.jobid for job in jobs]: - # Not a reframe job. 
+ if job_number not in jobs_to_poll: + # Not a reframe job continue state = job_list.find("state").text + job = jobs_to_poll[job_number] + known_jobs.add(job) # For the list of known statuses see `man 5 sge_status` # (https://arc.liv.ac.uk/SGE/htmlman/htmlman5/sge_status.html) if state in ['r', 'hr', 't', 'Rr', 'Rt']: - jobinfo[job_number] = 'RUNNING' + job._state = 'RUNNING' elif state in ['qw', 'Rq', 'hqw', 'hRwq']: - jobinfo[job_number] = 'PENDING' + job._state = 'PENDING' elif state in ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', 'Rts', 'RS', 'RtS', 'RT', 'RtT']: - jobinfo[job_number] = 'SUSPENDED' + job._state = 'SUSPENDED' elif state in ['Eqw', 'Ehqw', 'EhRqw']: - jobinfo[job_number] = 'ERROR' + job._state = 'ERROR' elif state in ['dr', 'dt', 'dRr', 'dRt', 'ds', 'dS', 'dT', 'dRs', 'dRS', 'dRT']: - jobinfo[job_number] = 'DELETING' + job._state = 'DELETING' elif state == 'z': - jobinfo[job_number] = 'COMPLETED' - - for job in jobs: - if job.jobid not in jobinfo: - self.log(f'Job {job.jobid} not known to scheduler, ' - f'assuming job completed') - job._state = 'COMPLETED' - job._completed = True - continue - - job._state = jobinfo[job.jobid] + job._state = 'COMPLETED' + + # Mark any "unknown" job as completed + unknown_jobs = set(jobs) - known_jobs + for job in unknown_jobs: + self.log(f'Job {job.jobid} not known to scheduler, ' + f'assuming job completed') + job._state = 'COMPLETED' + job._completed = True From 1f6cfb3351b2150795321565f17377a466574e2c Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 18 May 2021 00:09:20 +0200 Subject: [PATCH 06/12] Simplify completion assessment --- reframe/core/schedulers/sge.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 20d5d006c0..f596da9fc1 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -161,4 +161,9 @@ def poll(self, *jobs): self.log(f'Job {job.jobid} not known to scheduler, ' f'assuming job completed') job._state = 'COMPLETED' - job._completed = True + + def finished(self, job): + if job.exception: + raise job.exception + + return job.state == 'COMPLETED' From d90431cacf792e7a586246ea0d5810a6fb93586d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Tue, 18 May 2021 18:38:56 +0100 Subject: [PATCH 07/12] Fix variable names and improve error message --- reframe/core/schedulers/sge.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index f596da9fc1..312180b062 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -122,7 +122,7 @@ def poll(self, *jobs): for queue_info in root: # Reads the XML and prints jobs with status belonging to user. 
if queue_info is None: - raise JobSchedulerError('Decomposition error!\n') + raise JobSchedulerError('could not retrieve queue information!\n') for job_list in queue_info: if job_list.find("JB_owner").text != user: @@ -130,12 +130,12 @@ def poll(self, *jobs): continue jobid = job_list.find("JB_job_number").text - if job_number not in jobs_to_poll: + if jobid not in jobs_to_poll: # Not a reframe job continue state = job_list.find("state").text - job = jobs_to_poll[job_number] + job = jobs_to_poll[jobid] known_jobs.add(job) # For the list of known statuses see `man 5 sge_status` From ab5fcc2b7d88a730aee81623d98d625176c5cce1 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 19 May 2021 00:04:07 +0200 Subject: [PATCH 08/12] Update documentation --- docs/config_reference.rst | 5 +++++ reframe/core/schedulers/sge.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/config_reference.rst b/docs/config_reference.rst index c9ecaf8c08..314b14a369 100644 --- a/docs/config_reference.rst +++ b/docs/config_reference.rst @@ -212,12 +212,17 @@ System Partition Configuration - ``local``: Jobs will be launched locally without using any job scheduler. - ``pbs``: Jobs will be launched using the `PBS Pro `__ scheduler. - ``torque``: Jobs will be launched using the `Torque `__ scheduler. + - ``sge``: Jobs will be launched using the `Sun Grid Engine `__ scheduler. - ``slurm``: Jobs will be launched using the `Slurm `__ scheduler. This backend requires job accounting to be enabled in the target system. If not, you should consider using the ``squeue`` backend below. - ``squeue``: Jobs will be launched using the `Slurm `__ scheduler. This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible. + .. versionadded:: 3.6.1 + Support for the SGE scheduler. + + .. js:attribute:: .systems[].partitions[].launcher :required: Yes diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 312180b062..17234dccb8 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -122,7 +122,7 @@ def poll(self, *jobs): for queue_info in root: # Reads the XML and prints jobs with status belonging to user. 
if queue_info is None: - raise JobSchedulerError('could not retrieve queue information!\n') + raise JobSchedulerError('could not retrieve queue information') for job_list in queue_info: if job_list.find("JB_owner").text != user: From b95439824a974661e56aac158bea062581513717 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 19 May 2021 00:12:15 +0200 Subject: [PATCH 09/12] WIP: Add unit test --- unittests/test_schedulers.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 6928e7efa4..cce867731f 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -26,7 +26,7 @@ def launcher(): return getlauncher('local') -@pytest.fixture(params=['slurm', 'squeue', 'local', 'pbs', 'torque']) +@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque']) def scheduler(request): return getscheduler(request.param) @@ -132,6 +132,25 @@ def assert_job_script_sanity(job): 'echo postrun'] == matches +def _expected_sge_directives(job): + num_nodes = job.num_tasks // job.num_tasks_per_node + num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node + return set([ + '#$ -N "testjob"', + '#$ -l walltime=0:5:0', + '#$ -o %s' % job.stdout, + '#$ -e %s' % job.stderr, + '#$ -l select=%s:mpiprocs=%s:ncpus=%s' + ':mem=100GB:cpu_type=haswell' % (num_nodes, + job.num_tasks_per_node, + num_cpus_per_node), + '#$ --account=spam', + '#$ --gres=gpu:4', + '#DW jobdw capacity=100GB', + '#DW stage_in source=/foo' + ]) + + def _expected_slurm_directives(job): return set([ '#SBATCH --job-name="testjob"', From 833357cc1c70387fd90d10d3a494d78f7434b356 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 20 May 2021 00:05:30 +0200 Subject: [PATCH 10/12] WIP: Add unit test --- reframe/core/schedulers/sge.py | 11 ++++++----- unittests/test_schedulers.py | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 17234dccb8..1944d32283 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -64,16 +64,17 @@ def _emit_lselect_option(self, job): def emit_preamble(self, job): preamble = [ - self._format_option('-N "%s"' % job.name), - self._format_option('-o %s' % job.stdout), - self._format_option('-e %s' % job.stderr), - self._format_option('-wd %s' % job.workdir), + self._format_option(f'-N "{job.name}"'), + self._format_option(f'-o {job.stdout}'), + self._format_option(f'-e {job.stderr}'), + self._format_option(f'-wd {job.workdir}') ] if job.time_limit is not None: h, m, s = seconds_to_hms(job.time_limit) preamble.append( - self._format_option('-l h_rt=%d:%d:%d' % (h, m, s))) + self._format_option(f'-l h_rt=%d:%d:%d' % (h, m, s)) + ) preamble += self._emit_lselect_option(job) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index cce867731f..b8a9c58433 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -137,7 +137,7 @@ def _expected_sge_directives(job): num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node return set([ '#$ -N "testjob"', - '#$ -l walltime=0:5:0', + '#$ -l h_rt=0:5:0', '#$ -o %s' % job.stdout, '#$ -e %s' % job.stderr, '#$ -l select=%s:mpiprocs=%s:ncpus=%s' @@ -224,7 +224,7 @@ def test_prepare(fake_job): prepare_job(fake_job) with open(fake_job.script_filename) as fp: - found_directives = set(re.findall(r'^\#\w+ .*', fp.read(), + found_directives = set(re.findall(r'^\#\S+ .*', 
fp.read(), re.MULTILINE)) expected_directives = globals()[f'_expected_{sched_name}_directives'] From f364ab5995fa5705cdad0bb69665d061c66906c0 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 22 Jul 2021 23:45:16 +0200 Subject: [PATCH 11/12] Remove `-lselect` option from the SGE scheduler + add unit tests --- reframe/core/schedulers/sge.py | 38 +++++++--------------------------- unittests/test_schedulers.py | 21 ++++++++----------- 2 files changed, 16 insertions(+), 43 deletions(-) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index 1944d32283..14965dc7e4 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -26,42 +26,12 @@ @register_scheduler('sge') class SgeJobScheduler(PbsJobScheduler): - TASKS_OPT = ('-l select={num_nodes}:mpiprocs={num_tasks_per_node}' - ':ncpus={num_cpus_per_node}') - def __init__(self): self._prefix = '#$' self._submit_timeout = rt.runtime().get_option( f'schedulers/@{self.registered_name}/job_submit_timeout' ) - def _emit_lselect_option(self, job): - num_tasks_per_node = job.num_tasks_per_node or 1 - num_cpus_per_task = job.num_cpus_per_task or 1 - num_nodes = job.num_tasks // num_tasks_per_node - num_cpus_per_node = num_tasks_per_node * num_cpus_per_task - select_opt = '' - self.TASKS_OPT.format( - num_nodes=num_nodes, - num_tasks_per_node=num_tasks_per_node, - num_cpus_per_node=num_cpus_per_node - ) - - # Options starting with `-` are emitted in separate lines - rem_opts = [] - verb_opts = [] - for opt in (*job.sched_access, *job.options, *job.cli_options): - if opt.startswith('-'): - rem_opts.append(opt) - elif opt.startswith('#'): - verb_opts.append(opt) - else: - select_opt += ':' + opt - - return [self._format_option(select_opt), - *(self._format_option(opt) for opt in rem_opts), - *verb_opts] - def emit_preamble(self, job): preamble = [ self._format_option(f'-N "{job.name}"'), @@ -76,7 +46,13 @@ def emit_preamble(self, job): self._format_option(f'-l h_rt=%d:%d:%d' % (h, m, s)) ) - preamble += self._emit_lselect_option(job) + # Emit the rest of the options + options = job.options + job.cli_options + for opt in options: + if opt.startswith('#'): + preamble.append(opt) + else: + preamble.append(self._format_option(opt)) return preamble diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index b8a9c58433..169beeae40 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -136,18 +136,15 @@ def _expected_sge_directives(job): num_nodes = job.num_tasks // job.num_tasks_per_node num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node return set([ - '#$ -N "testjob"', - '#$ -l h_rt=0:5:0', - '#$ -o %s' % job.stdout, - '#$ -e %s' % job.stderr, - '#$ -l select=%s:mpiprocs=%s:ncpus=%s' - ':mem=100GB:cpu_type=haswell' % (num_nodes, - job.num_tasks_per_node, - num_cpus_per_node), - '#$ --account=spam', - '#$ --gres=gpu:4', - '#DW jobdw capacity=100GB', - '#DW stage_in source=/foo' + f'#$ -N "testjob"', + f'#$ -l h_rt=0:5:0', + f'#$ -o {job.stdout}', + f'#$ -e {job.stderr}', + f'#$ -wd {job.workdir}', + f'#$ --gres=gpu:4', + f'#$ --account=spam', + f'#DW jobdw capacity=100GB', + f'#DW stage_in source=/foo' ]) From 393fc9eabaf25a967481045286602ce77a4d986f Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 26 Jul 2021 22:11:13 +0200 Subject: [PATCH 12/12] Document how slots can be defined and used with the SGE backend --- docs/config_reference.rst | 45 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 
deletions(-) diff --git a/docs/config_reference.rst b/docs/config_reference.rst index 314b14a369..d1b3c60231 100644 --- a/docs/config_reference.rst +++ b/docs/config_reference.rst @@ -219,8 +219,49 @@ System Partition Configuration - ``squeue``: Jobs will be launched using the `Slurm `__ scheduler. This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible. - .. versionadded:: 3.6.1 - Support for the SGE scheduler. + .. versionadded:: 3.7.2 + Support for the SGE scheduler is added. + + .. note:: + + The way that multi-node jobs are submitted using the SGE scheduler can be very site-specific. + For this reason, the ``sge`` scheduler backend does not try to interpret any related arguments, e.g., ``num_tasks``, ``num_tasks_per_node``, etc. + Users must specify how these resources are to be requested by setting the :js:attr:`resources` partition configuration parameter and then request them from inside a test using the :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` test attribute. + Here is an example configuration for a system partition named ``foo`` that defines different ways for submitting MPI-only, OpenMP-only and MPI+OpenMP jobs: + + .. code-block:: python + + { + 'name': 'foo', + 'scheduler': 'sge', + 'resources': [ + { + 'name': 'smp', + 'options': ['-pe smp {num_slots}'] + }, + { + 'name': 'mpi', + 'options': ['-pe mpi {num_slots}'] + }, + { + 'name': 'mpismp', + 'options': ['-pe mpismp {num_slots}'] + } + ] + } + + Each test can then request the different types of slots as follows: + + .. code-block:: python + + self.extra_resources = { + 'smp': {'num_slots': self.num_cpus_per_task}, + 'mpi': {'num_slots': self.num_tasks}, + 'mpismp': {'num_slots': self.num_tasks*self.num_cpus_per_task} + } + + Notice that defining :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` does not make the test non-portable to other systems that have different schedulers; + the :py:attr:`extra_resources` will simply be ignored in this case and the scheduler backend will interpret the different test fields in the appropriate way. .. js:attribute:: .systems[].partitions[].launcher
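As a sketch of how these slot definitions are consumed end-to-end, the following ReFrame 3.x-style run-only test requests slots from the ``mpi`` parallel environment of the example ``foo`` partition shown above; the test name, programming environment, executable, and task count are illustrative assumptions, not part of the patch series:

.. code-block:: python

   import reframe as rfm
   import reframe.utility.sanity as sn


   @rfm.simple_test
   class SgeSlotsCheck(rfm.RunOnlyRegressionTest):
       def __init__(self):
           # Hypothetical system:partition and environment names
           self.valid_systems = ['foo:default']
           self.valid_prog_environs = ['builtin']
           self.executable = 'echo'
           self.executable_opts = ['hello from SGE']
           self.num_tasks = 16
           self.num_cpus_per_task = 1

           # Ask for one slot per MPI task from the `mpi` parallel
           # environment declared in the partition's `resources` section,
           # so that the `sge` backend can emit the corresponding
           # `-pe mpi 16` directive in the job script preamble.
           self.extra_resources = {
               'mpi': {'num_slots': self.num_tasks}
           }
           self.sanity_patterns = sn.assert_found(r'hello from SGE', self.stdout)

On a partition that uses a different scheduler and does not define an ``mpi`` resource, the request is simply ignored, as noted in the documentation above.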