From 1fe0f32f16f7c1afbfce14b3aad8d6d0c157ae2e Mon Sep 17 00:00:00 2001 From: rafael Date: Tue, 10 Dec 2019 13:43:56 +0100 Subject: [PATCH 01/10] add max queuing time to jobs --- reframe/core/pipeline.py | 10 ++++++++++ reframe/core/schedulers/__init__.py | 10 ++++++++-- reframe/core/schedulers/slurm.py | 9 ++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 93d6d3b45c..1391fce6e3 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -421,6 +421,14 @@ class RegressionTest(metaclass=RegressionTestMeta): use_multithreading = fields.TypedField('use_multithreading', bool, type(None)) + #: Specify the maximum time the job can be in the queue before running + #: + #: Ignored if :class:`None`. + #: + #: :type: integral or :class:`None` + #: :default: :class:`None + max_queuing_time = fields.TypedField('max_queuing_time', int, type(None)) + #: Specify whether this test needs exclusive access to nodes. 
#: #: :type: boolean @@ -670,6 +678,7 @@ def _rfm_init(self, name=None, prefix=None): self.num_tasks_per_socket = None self.use_multithreading = None self.exclusive_access = False + self.max_queuing_time = None # True only if check is to be run locally self.local = False @@ -957,6 +966,7 @@ def _setup_job(self, **job_opts): launcher_type(), name='rfm_%s_job' % self.name, workdir=self._stagedir, + max_queuing_time = self.max_queuing_time, sched_access=self._current_partition.access, sched_exclusive_access=self.exclusive_access, **job_opts) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 622822b642..4f47adb4a6 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -31,7 +31,7 @@ def submit(self, job): pass @abc.abstractmethod - def wait(self, job): + def wait(self, job, max_queuing_time): pass @abc.abstractmethod @@ -154,6 +154,7 @@ class Job: def __init__(self, name, workdir='.', + max_queuing_time = None, script_filename=None, stdout=None, stderr=None, @@ -186,6 +187,7 @@ def __init__(self, self._name = name self._workdir = workdir + self._max_queuing_time = max_queuing_time self._script_filename = script_filename or '%s.sh' % name self._stdout = stdout or '%s.out' % name self._stderr = stderr or '%s.err' % name @@ -215,6 +217,10 @@ def name(self): def workdir(self): return self._workdir + @property + def max_queuing_time(self): + return self._max_queuing_time + @property def script_filename(self): return self._script_filename @@ -321,7 +327,7 @@ def wait(self): if self.jobid is None: raise JobNotStartedError('cannot wait an unstarted job') - return self.scheduler.wait(self) + return self.scheduler.wait(self, self.max_queuing_time) def cancel(self): if self.jobid is None: diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 37e729f6dd..6699ffd21f 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -376,7 +376,7 @@ 
def _check_and_cancel(self, job, reason_descr): raise JobBlockedError(reason_msg, jobid=job.jobid) - def wait(self, job): + def wait(self, job, max_queuing_time): # Quickly return in case we have finished already if slurm_state_completed(job.state): if self.is_array(job): @@ -386,7 +386,14 @@ def wait(self, job): intervals = itertools.cycle(settings().job_poll_intervals) self._update_state(job) + start_pending = time.time() while not slurm_state_completed(job.state): + if max_queuing_time: + if slurm_state_pending(job.state): + if time.time() - start_pending > max_queuing_time: + self.cancel(job) + raise JobError('maximum queuing time exceeded') + time.sleep(next(intervals)) self._update_state(job) From 44bac0908e22d407dd7c07044d9aa342528cb4ad Mon Sep 17 00:00:00 2001 From: rafael Date: Fri, 13 Dec 2019 15:12:48 +0100 Subject: [PATCH 02/10] fix comments --- reframe/core/pipeline.py | 10 ++++++---- reframe/core/schedulers/__init__.py | 12 ++++++------ reframe/core/schedulers/slurm.py | 6 ++++++ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 1391fce6e3..61e180469d 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -423,11 +423,13 @@ class RegressionTest(metaclass=RegressionTestMeta): #: Specify the maximum time the job can be in the queue before running #: - #: Ignored if :class:`None`. + #: The time is specified as a three-tuple in the form ``(hh, mm, ss)``, + #: with ``hh >= 0``, ``0 <= mm <= 59`` and ``0 <= ss <= 59``. + #: If set to :class:`None`, no maximum queuing time will be set. #: #: :type: integral or :class:`None` #: :default: :class:`None - max_queuing_time = fields.TypedField('max_queuing_time', int, type(None)) + max_queue_time = fields.TimerField('max_queue_time', type(None)) #: Specify whether this test needs exclusive access to nodes. 
#: @@ -678,7 +680,7 @@ def _rfm_init(self, name=None, prefix=None): self.num_tasks_per_socket = None self.use_multithreading = None self.exclusive_access = False - self.max_queuing_time = None + self.max_queue_time = None # True only if check is to be run locally self.local = False @@ -966,7 +968,7 @@ def _setup_job(self, **job_opts): launcher_type(), name='rfm_%s_job' % self.name, workdir=self._stagedir, - max_queuing_time = self.max_queuing_time, + max_queue_time = self.max_queue_time, sched_access=self._current_partition.access, sched_exclusive_access=self.exclusive_access, **job_opts) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 4f47adb4a6..9736bc2760 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -31,7 +31,7 @@ def submit(self, job): pass @abc.abstractmethod - def wait(self, job, max_queuing_time): + def wait(self, job, max_queue_time): pass @abc.abstractmethod @@ -154,7 +154,7 @@ class Job: def __init__(self, name, workdir='.', - max_queuing_time = None, + max_queue_time = None, script_filename=None, stdout=None, stderr=None, @@ -187,7 +187,7 @@ def __init__(self, self._name = name self._workdir = workdir - self._max_queuing_time = max_queuing_time + self._max_queue_time = max_queue_time self._script_filename = script_filename or '%s.sh' % name self._stdout = stdout or '%s.out' % name self._stderr = stderr or '%s.err' % name @@ -218,8 +218,8 @@ def workdir(self): return self._workdir @property - def max_queuing_time(self): - return self._max_queuing_time + def max_queue_time(self): + return self._max_queue_time @property def script_filename(self): @@ -327,7 +327,7 @@ def wait(self): if self.jobid is None: raise JobNotStartedError('cannot wait an unstarted job') - return self.scheduler.wait(self, self.max_queuing_time) + return self.scheduler.wait(self, self.max_queue_time) def cancel(self): if self.jobid is None: diff --git a/reframe/core/schedulers/slurm.py 
b/reframe/core/schedulers/slurm.py index 6699ffd21f..dea96b7f1f 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -387,6 +387,12 @@ def wait(self, job, max_queuing_time): intervals = itertools.cycle(settings().job_poll_intervals) self._update_state(job) start_pending = time.time() + if max_queuing_time is not None: + h, m, s = max_queuing_time + max_queuing_time = h * 3600 + m * 60 + s + else: + max_queuing_time = 0 + while not slurm_state_completed(job.state): if max_queuing_time: if slurm_state_pending(job.state): From 244dbd1d028977c465e0069fb60047af28f31812 Mon Sep 17 00:00:00 2001 From: rafael Date: Mon, 16 Dec 2019 16:56:16 +0100 Subject: [PATCH 03/10] fix comments --- reframe/core/pipeline.py | 14 ++++++-------- reframe/core/schedulers/__init__.py | 12 ++++++------ reframe/core/schedulers/local.py | 4 ++-- reframe/core/schedulers/pbs.py | 15 ++++++++++++++- reframe/core/schedulers/slurm.py | 16 ++++++++-------- unittests/test_schedulers.py | 1 + 6 files changed, 37 insertions(+), 25 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 61e180469d..3b100ceb9b 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -421,15 +421,13 @@ class RegressionTest(metaclass=RegressionTestMeta): use_multithreading = fields.TypedField('use_multithreading', bool, type(None)) - #: Specify the maximum time the job can be in the queue before running + #: Specify the maximum time the job can be pending before starting running #: - #: The time is specified as a three-tuple in the form ``(hh, mm, ss)``, - #: with ``hh >= 0``, ``0 <= mm <= 59`` and ``0 <= ss <= 59``. - #: If set to :class:`None`, no maximum queuing time will be set. + #: The time is specified with the same format of `time_limit`. 
#: - #: :type: integral or :class:`None` + #: :type: :class:`tuple[int]` #: :default: :class:`None - max_queue_time = fields.TimerField('max_queue_time', type(None)) + max_pending_time = fields.TimerField('max_pending_time', type(None)) #: Specify whether this test needs exclusive access to nodes. #: @@ -680,7 +678,7 @@ def _rfm_init(self, name=None, prefix=None): self.num_tasks_per_socket = None self.use_multithreading = None self.exclusive_access = False - self.max_queue_time = None + self.max_pending_time = None # True only if check is to be run locally self.local = False @@ -968,7 +966,7 @@ def _setup_job(self, **job_opts): launcher_type(), name='rfm_%s_job' % self.name, workdir=self._stagedir, - max_queue_time = self.max_queue_time, + max_pending_time=self.max_pending_time, sched_access=self._current_partition.access, sched_exclusive_access=self.exclusive_access, **job_opts) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 9736bc2760..35538e3637 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -31,7 +31,7 @@ def submit(self, job): pass @abc.abstractmethod - def wait(self, job, max_queue_time): + def wait(self, job, max_pending_time): pass @abc.abstractmethod @@ -154,7 +154,7 @@ class Job: def __init__(self, name, workdir='.', - max_queue_time = None, + max_pending_time=None, script_filename=None, stdout=None, stderr=None, @@ -187,7 +187,7 @@ def __init__(self, self._name = name self._workdir = workdir - self._max_queue_time = max_queue_time + self._max_pending_time = max_pending_time self._script_filename = script_filename or '%s.sh' % name self._stdout = stdout or '%s.out' % name self._stderr = stderr or '%s.err' % name @@ -218,8 +218,8 @@ def workdir(self): return self._workdir @property - def max_queue_time(self): - return self._max_queue_time + def max_pending_time(self): + return self._max_pending_time @property def script_filename(self): @@ -327,7 +327,7 @@ def 
wait(self): if self.jobid is None: raise JobNotStartedError('cannot wait an unstarted job') - return self.scheduler.wait(self, self.max_queue_time) + return self.scheduler.wait(self, self.max_pending_time) def cancel(self): if self.jobid is None: diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 561ebf2ca0..dd28723a4f 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -118,9 +118,9 @@ def cancel(self, job): # Set the time limit to the grace period and let wait() do the final # killing job.time_limit = (0, 0, self._cancel_grace_period) - self.wait(job) + self.wait(job, None) - def wait(self, job): + def wait(self, job, max_pending_time): '''Wait for the spawned job to finish. As soon as the parent job process finishes, all of its spawned diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py index 7edb61ae25..21569890b6 100644 --- a/reframe/core/schedulers/pbs.py +++ b/reframe/core/schedulers/pbs.py @@ -107,9 +107,22 @@ def submit(self, job): if info: self._pbs_server = info[0] - def wait(self, job): + def wait(self, job, max_pending_time): intervals = itertools.cycle(settings().job_poll_intervals) + start_pending = time.time() + if max_pending_time is not None: + h, m, s = max_pending_time + max_pending_time = h * 3600 + m * 60 + s + else: + max_pending_time = 0 + while not self.finished(job): + if max_pending_time: + if slurm_state_pending(job.state): + if time.time() - start_pending > max_pending_time: + self.cancel(job) + raise JobError('maximum pending time exceeded') + time.sleep(next(intervals)) def cancel(self, job): diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index dea96b7f1f..12857ee58f 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -376,7 +376,7 @@ def _check_and_cancel(self, job, reason_descr): raise JobBlockedError(reason_msg, jobid=job.jobid) - def wait(self, job, 
max_queuing_time): + def wait(self, job, max_pending_time): # Quickly return in case we have finished already if slurm_state_completed(job.state): if self.is_array(job): @@ -387,18 +387,18 @@ def wait(self, job, max_queuing_time): intervals = itertools.cycle(settings().job_poll_intervals) self._update_state(job) start_pending = time.time() - if max_queuing_time is not None: - h, m, s = max_queuing_time - max_queuing_time = h * 3600 + m * 60 + s + if max_pending_time is not None: + h, m, s = max_pending_time + max_pending_time = h * 3600 + m * 60 + s else: - max_queuing_time = 0 + max_pending_time = 0 while not slurm_state_completed(job.state): - if max_queuing_time: + if max_pending_time: if slurm_state_pending(job.state): - if time.time() - start_pending > max_queuing_time: + if time.time() - start_pending > max_pending_time: self.cancel(job) - raise JobError('maximum queuing time exceeded') + raise JobError('maximum pending time exceeded') time.sleep(next(intervals)) self._update_state(job) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 47b2508270..5c8fb9f39d 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -26,6 +26,7 @@ def setUp(self): self.scheduler, self.launcher, name='testjob', workdir=self.workdir, + max_pending_time=None, script_filename=os_ext.mkstemp_path( dir=self.workdir, suffix='.sh' ), From a8007ff1829eee40179411bddd2a6d9542d479c1 Mon Sep 17 00:00:00 2001 From: rafael Date: Tue, 25 Feb 2020 23:38:21 +0100 Subject: [PATCH 04/10] minor fixes --- reframe/core/schedulers/__init__.py | 2 +- reframe/core/schedulers/slurm.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 6ed4588786..5185187b56 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -344,8 +344,8 @@ def wait(self): if self.jobid is None: raise JobNotStartedError('cannot wait an unstarted 
job') + self.scheduler.wait(self, self.max_pending_time) self._completion_time = self._completion_time or time.time() - return self.scheduler.wait(self, self.max_pending_time) def cancel(self): if self.jobid is None: diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 4976f68ba7..a8e8ff63b0 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -489,9 +489,6 @@ def __init__(self): def completion_time(self, job): return None - def submit(self, job): - super().submit(job) - def _update_state(self, job): time_from_submit = datetime.now() - self._submit_time rem_wait = self._squeue_delay - time_from_submit.total_seconds() From 06c61da3849ccf16c8f652ac0748ff05b582ba14 Mon Sep 17 00:00:00 2001 From: rafael Date: Thu, 5 Mar 2020 09:26:07 +0100 Subject: [PATCH 05/10] fix comments --- reframe/core/pipeline.py | 6 +++--- reframe/core/schedulers/__init__.py | 12 ++++++------ reframe/core/schedulers/local.py | 4 ++-- reframe/core/schedulers/slurm.py | 12 ++++++------ 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 825b5936b3..78b5dd316f 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -450,15 +450,15 @@ class RegressionTest(metaclass=RegressionTestMeta): use_multithreading = fields.TypedField('use_multithreading', bool, type(None)) - #: Specify the maximum time the job can be pending before starting running + #: The maximum time a job can be pending before it starts running. #: - #: The time is specified with the same format of `time_limit`. + #: Time duration is specified in the same way as for the :attr:`time_limit` attribute. #: #: :type: :class:`str` or :class:`datetime.timedelta` #: :default: :class:`None #: #: .. note:: #: ..
versionchanged:: 3.0 #: max_pending_time = fields.TimerField('max_pending_time', type(None)) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 5185187b56..47d4e85c9f 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -43,7 +43,7 @@ def submit(self, job): pass @abc.abstractmethod - def wait(self, job, max_pending_time): + def wait(self, job): pass @abc.abstractmethod @@ -51,7 +51,7 @@ def cancel(self, job): pass @abc.abstractmethod - def finished(self, job, max_pending_time): + def finished(self, job): pass @@ -166,10 +166,10 @@ class Job: def __init__(self, name, workdir='.', - max_pending_time=None, script_filename=None, stdout=None, stderr=None, + max_pending_time=None, sched_flex_alloc_nodes=None, sched_access=[], sched_account=None, @@ -199,10 +199,10 @@ def __init__(self, self._name = name self._workdir = workdir - self._max_pending_time = max_pending_time self._script_filename = script_filename or '%s.sh' % name self._stdout = stdout or '%s.out' % name self._stderr = stderr or '%s.err' % name + self._max_pending_time = max_pending_time self._completion_time = None # Backend scheduler related information @@ -344,7 +344,7 @@ def wait(self): if self.jobid is None: raise JobNotStartedError('cannot wait an unstarted job') - self.scheduler.wait(self, self.max_pending_time) + self.scheduler.wait(self) self._completion_time = self._completion_time or time.time() def cancel(self): @@ -357,7 +357,7 @@ def finished(self): if self.jobid is None: raise JobNotStartedError('cannot poll an unstarted job') - done = self.scheduler.finished(self, self.max_pending_time) + done = self.scheduler.finished(self) if done: self._completion_time = self._completion_time or time.time() diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 90f1e2b210..4d88b11dc0 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -128,7 +128,7 @@ def 
cancel(self, job): job.time_limit = timedelta(seconds=self._cancel_grace_period) self.wait(job, None) - def wait(self, job, max_pending_time): + def wait(self, job): '''Wait for the spawned job to finish. As soon as the parent job process finishes, all of its spawned @@ -164,7 +164,7 @@ def wait(self, job, max_pending_time): self._f_stdout.close() self._f_stderr.close() - def finished(self, job, max_pending_time): + def finished(self, job): '''Check if the spawned process has finished. This function does not wait the process. It just queries its state. If diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index a8e8ff63b0..f218607305 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -409,7 +409,7 @@ def _check_and_cancel(self, job, reason_descr): raise JobBlockedError(reason_msg, jobid=job.jobid) - def wait(self, job, max_pending_time): + def wait(self, job): # Quickly return in case we have finished already if slurm_state_completed(job.state): if self.is_array(job): @@ -421,9 +421,9 @@ def wait(self, job, max_pending_time): self._update_state(job) while not slurm_state_completed(job.state): - if max_pending_time: + if job.max_pending_time: if slurm_state_pending(job.state): - if datetime.now() - self._submit_time > max_pending_time: + if datetime.now() - self._submit_time >= job.max_pending_time: self.cancel(job) raise JobError('maximum pending time exceeded', jobid=job.jobid) @@ -440,7 +440,7 @@ def cancel(self, job): timeout=settings().job_submit_timeout) self._is_cancelling = True - def finished(self, job, max_pending_time): + def finished(self, job): try: self._update_state(job) except JobBlockedError: @@ -452,9 +452,9 @@ def finished(self, job, max_pending_time): getlogger().debug('ignoring error during polling: %s' % e) return False else: - if max_pending_time: + if job.max_pending_time: if slurm_state_pending(job.state): - if datetime.now() - self._submit_time > max_pending_time: + if 
datetime.now() - self._submit_time >= job.max_pending_time: self.cancel(job) raise JobError('maximum pending time exceeded', jobid=job.jobid) From ceb8a4d886bfe14d4d7f46bee0b1fb1ee064fbf9 Mon Sep 17 00:00:00 2001 From: rafael Date: Fri, 6 Mar 2020 10:26:55 +0100 Subject: [PATCH 06/10] fix comments and add unittest --- reframe/core/schedulers/local.py | 2 +- reframe/core/schedulers/pbs.py | 15 +++------------ unittests/test_schedulers.py | 19 +++++++++++++++++-- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 4d88b11dc0..6d4bbf7730 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -126,7 +126,7 @@ def cancel(self, job): # Set the time limit to the grace period and let wait() do the final # killing job.time_limit = timedelta(seconds=self._cancel_grace_period) - self.wait(job, None) + self.wait(job) def wait(self, job): '''Wait for the spawned job to finish. diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py index 812ce03ebb..de62e5a6b4 100644 --- a/reframe/core/schedulers/pbs.py +++ b/reframe/core/schedulers/pbs.py @@ -122,11 +122,9 @@ def submit(self, job): if info: self._pbs_server = info[0] - self._submit_time = datetime.now() - - def wait(self, job, max_pending_time): + def wait(self, job): intervals = itertools.cycle(settings().job_poll_intervals) - while not self.finished(job, max_pending_time): + while not self.finished(job): time.sleep(next(intervals)) def cancel(self, job): @@ -138,7 +136,7 @@ def cancel(self, job): getlogger().debug('cancelling job (id=%s)' % jobid) _run_strict('qdel %s' % jobid, timeout=settings().job_submit_timeout) - def finished(self, job, max_pending_time): + def finished(self, job): with os_ext.change_dir(job.workdir): done = os.path.exists(job.stdout) and os.path.exists(job.stderr) @@ -147,11 +145,4 @@ def finished(self, job, max_pending_time): self._time_finished = 
self._time_finished or t_now time_from_finish = (t_now - self._time_finished).total_seconds() - if max_pending_time: - if slurm_state_pending(job.state): - if datetime.now() - self._submit_time > max_pending_time: - self.cancel(job) - raise JobError('maximum pending time exceeded', - jobid=job.jobid) - return done and time_from_finish > PBS_OUTPUT_WRITEBACK_WAIT diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index a2ab5f1b05..2d01e0ed28 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -11,7 +11,7 @@ import tempfile import time import unittest -from datetime import datetime +from datetime import datetime, timedelta import reframe.core.runtime as rt import reframe.utility.os_ext as os_ext @@ -32,7 +32,6 @@ def setUp(self): self.scheduler, self.launcher, name='testjob', workdir=self.workdir, - max_pending_time=None, script_filename=os_ext.mkstemp_path( dir=self.workdir, suffix='.sh' ), @@ -339,6 +338,9 @@ def sched_configured(self): def setup_user(self, msg=None): super().setup_user(msg='SLURM (with sacct) not configured') + def _update_state(self, job): + job.state = 'PENDING' + def test_prepare(self): self.setup_job() super().test_prepare() @@ -437,6 +439,19 @@ def test_submit_job_array(self): assert all([re.search('Task id: 0', output), re.search('Task id: 1', output)]) + # Monkey patch `self._update_state` to simulate that the job is + # pending on the queue for enough time so it can be canceled due + # to exceeding the maximum pending time + @fixtures.switch_to_user_runtime + def test_submit_max_pending_time(self): + self.setup_user() + self.prepare() + self.testjob.scheduler._update_state = self._update_state + self.testjob._max_pending_time = timedelta(seconds=5) + self.testjob.submit() + with pytest.raises(JobError): + self.testjob.wait() + class TestSqueueJob(TestSlurmJob): @property From 96c3a57d3bcf6cf70b8dd1aa5dcd6eeba3036aae Mon Sep 17 00:00:00 2001 From: rafael Date: Fri, 6 Mar 2020 17:10:42 +0100 
Subject: [PATCH 07/10] fix comments and add unittest --- reframe/core/schedulers/pbs.py | 2 ++ reframe/core/schedulers/torque.py | 8 ++++++++ unittests/test_schedulers.py | 18 ++++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py index de62e5a6b4..74310d8247 100644 --- a/reframe/core/schedulers/pbs.py +++ b/reframe/core/schedulers/pbs.py @@ -122,6 +122,8 @@ def submit(self, job): if info: self._pbs_server = info[0] + self._submit_time = datetime.now() + def wait(self, job): intervals = itertools.cycle(settings().job_poll_intervals) while not self.finished(job): diff --git a/reframe/core/schedulers/torque.py b/reframe/core/schedulers/torque.py index 07aba34214..e126130ef9 100644 --- a/reframe/core/schedulers/torque.py +++ b/reframe/core/schedulers/torque.py @@ -9,6 +9,7 @@ # import re +from datetime import datetime import reframe.utility.os_ext as os_ext from reframe.core.config import settings @@ -83,4 +84,11 @@ def finished(self, job): getlogger().debug('ignoring error during polling: %s' % e) return False else: + if job.max_pending_time: + if job.state in ['QUEUED', 'HELD', 'WAITING']: + if datetime.now() - self._submit_time >= job.max_pending_time: + self.cancel(job) + raise JobError('maximum pending time exceeded', + jobid=job.jobid) + return job.state == 'COMPLETED' diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 2d01e0ed28..d81993ffb8 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -445,6 +445,7 @@ def test_submit_job_array(self): @fixtures.switch_to_user_runtime def test_submit_max_pending_time(self): self.setup_user() + self.parallel_cmd = 'sleep 30' self.prepare() self.testjob.scheduler._update_state = self._update_state self.testjob._max_pending_time = timedelta(seconds=5) @@ -577,6 +578,23 @@ def test_submit_timelimit(self): # Skip this test for PBS, since we the minimum time limit is 1min pytest.skip("Torque minimum 
time limit is 60s") + def _update_state(self, job): + job.state = 'QUEUED' + + # Monkey patch `self._update_state` to simulate that the job is + # pending on the queue for enough time so it can be canceled due + # to exceeding the maximum pending time + @fixtures.switch_to_user_runtime + def test_submit_max_pending_time(self): + self.setup_user() + self.parallel_cmd = 'sleep 30' + self.prepare() + self.testjob.scheduler._update_state = self._update_state + self.testjob._max_pending_time = timedelta(seconds=5) + self.testjob.submit() + with pytest.raises(JobError): + self.testjob.wait() + class TestSlurmFlexibleNodeAllocation(unittest.TestCase): def create_dummy_nodes(obj): From 1f9c8d135d84cff6012eae8fe7f941d1429aa32e Mon Sep 17 00:00:00 2001 From: rafael Date: Mon, 9 Mar 2020 07:43:36 +0100 Subject: [PATCH 08/10] fix comments --- reframe/core/pipeline.py | 2 +- reframe/core/schedulers/slurm.py | 22 ++++++++++------------ reframe/core/schedulers/torque.py | 13 +++++++------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 78b5dd316f..8d62f0e6e1 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -430,7 +430,7 @@ class RegressionTest(metaclass=RegressionTestMeta): #: :type: integral or :class:`None` #: :default: :class:`None` num_tasks_per_core = fields.TypedField('num_tasks_per_core', - int, type(None)) + int, type(None)) #: Number of tasks per socket required by this test. 
#: diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index f218607305..20f53c51b0 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -421,12 +421,11 @@ def wait(self, job): self._update_state(job) while not slurm_state_completed(job.state): - if job.max_pending_time: - if slurm_state_pending(job.state): - if datetime.now() - self._submit_time >= job.max_pending_time: - self.cancel(job) - raise JobError('maximum pending time exceeded', - jobid=job.jobid) + if job.max_pending_time and slurm_state_pending(job.state): + if datetime.now() - self._submit_time >= job.max_pending_time: + self.cancel(job) + raise JobError('maximum pending time exceeded', + jobid=job.jobid) time.sleep(next(intervals)) self._update_state(job) @@ -452,12 +451,11 @@ def finished(self, job): getlogger().debug('ignoring error during polling: %s' % e) return False else: - if job.max_pending_time: - if slurm_state_pending(job.state): - if datetime.now() - self._submit_time >= job.max_pending_time: - self.cancel(job) - raise JobError('maximum pending time exceeded', - jobid=job.jobid) + if job.max_pending_time and slurm_state_pending(job.state): + if datetime.now() - self._submit_time >= job.max_pending_time: + self.cancel(job) + raise JobError('maximum pending time exceeded', + jobid=job.jobid) return slurm_state_completed(job.state) diff --git a/reframe/core/schedulers/torque.py b/reframe/core/schedulers/torque.py index e126130ef9..47c66afe36 100644 --- a/reframe/core/schedulers/torque.py +++ b/reframe/core/schedulers/torque.py @@ -84,11 +84,12 @@ def finished(self, job): getlogger().debug('ignoring error during polling: %s' % e) return False else: - if job.max_pending_time: - if job.state in ['QUEUED', 'HELD', 'WAITING']: - if datetime.now() - self._submit_time >= job.max_pending_time: - self.cancel(job) - raise JobError('maximum pending time exceeded', - jobid=job.jobid) + if job.max_pending_time and job.state in ['QUEUED', + 
'HELD', + 'WAITING']: + if datetime.now() - self._submit_time >= job.max_pending_time: + self.cancel(job) + raise JobError('maximum pending time exceeded', + jobid=job.jobid) return job.state == 'COMPLETED' From e77a66a73aa24bd85b5a1fcfe50e62f7b83720be Mon Sep 17 00:00:00 2001 From: rafael Date: Mon, 9 Mar 2020 14:39:15 +0100 Subject: [PATCH 09/10] fix comments --- unittests/test_schedulers.py | 51 ++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index d81993ffb8..4b42f1fc2a 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -197,6 +197,20 @@ def test_guess_num_tasks(self): with pytest.raises(NotImplementedError): self.testjob.guess_num_tasks() + # Monkey patch `self._update_state` to simulate that the job is + # pending on the queue for enough time so it can be canceled due + # to exceeding the maximum pending time + @fixtures.switch_to_user_runtime + def test_submit_max_pending_time(self): + self.setup_user() + self.parallel_cmd = 'sleep 30' + self.prepare() + self.testjob.scheduler._update_state = self._update_state + self.testjob._max_pending_time = timedelta(seconds=5) + self.testjob.submit() + with pytest.raises(JobError): + self.testjob.wait() + class TestLocalJob(_TestJob, unittest.TestCase): def assertProcessDied(self, pid): @@ -321,6 +335,10 @@ def test_guess_num_tasks(self): self.testjob.wait() assert self.testjob.num_tasks == 1 + def test_submit_max_pending_time(self): + pytest.skip('the maximum pending time has no effect on the ' + 'local scheduler') + class TestSlurmJob(_TestJob, unittest.TestCase): @property @@ -439,20 +457,6 @@ def test_submit_job_array(self): assert all([re.search('Task id: 0', output), re.search('Task id: 1', output)]) - # Monkey patch `self._update_state` to simulate that the job is - # pending on the queue for enough time so it can be canceled due - # to exceeding the maximum pending time - 
@fixtures.switch_to_user_runtime - def test_submit_max_pending_time(self): - self.setup_user() - self.parallel_cmd = 'sleep 30' - self.prepare() - self.testjob.scheduler._update_state = self._update_state - self.testjob._max_pending_time = timedelta(seconds=5) - self.testjob.submit() - with pytest.raises(JobError): - self.testjob.wait() - class TestSqueueJob(TestSlurmJob): @property @@ -546,9 +550,12 @@ def test_prepare_no_cpus(self): assert self.expected_directives == found_directives def test_submit_timelimit(self): - # Skip this test for PBS, since we the minimum time limit is 1min + # Skip this test for PBS, since the minimum time limit is 1min pytest.skip("PBS minimum time limit is 60s") + def test_submit_max_pending_time(self): + pytest.skip('not implemented for the pbs scheduler') + class TestTorqueJob(TestPbsJob): @property @@ -581,20 +588,6 @@ def test_submit_timelimit(self): def _update_state(self, job): job.state = 'QUEUED' - # Monkey patch `self._update_state` to simulate that the job is - # pending on the queue for enough time so it can be canceled due - # to exceeding the maximum pending time - @fixtures.switch_to_user_runtime - def test_submit_max_pending_time(self): - self.setup_user() - self.parallel_cmd = 'sleep 30' - self.prepare() - self.testjob.scheduler._update_state = self._update_state - self.testjob._max_pending_time = timedelta(seconds=5) - self.testjob.submit() - with pytest.raises(JobError): - self.testjob.wait() - class TestSlurmFlexibleNodeAllocation(unittest.TestCase): def create_dummy_nodes(obj): From bb3a026e8ddaf85b83d08e6b65e7e9f454b90857 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 11 Mar 2020 20:31:45 +0100 Subject: [PATCH 10/10] Address PR comments Also: - Enable max_pending_time unit test for the Torque backend. 
--- unittests/test_schedulers.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 4b42f1fc2a..4c72f256f8 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -206,9 +206,10 @@ def test_submit_max_pending_time(self): self.parallel_cmd = 'sleep 30' self.prepare() self.testjob.scheduler._update_state = self._update_state - self.testjob._max_pending_time = timedelta(seconds=5) + self.testjob._max_pending_time = timedelta(milliseconds=50) self.testjob.submit() - with pytest.raises(JobError): + with pytest.raises(JobError, + match='maximum pending time exceeded'): self.testjob.wait() @@ -588,6 +589,9 @@ def test_submit_timelimit(self): def _update_state(self, job): job.state = 'QUEUED' + def test_submit_max_pending_time(self): + _TestJob.test_submit_max_pending_time(self) + class TestSlurmFlexibleNodeAllocation(unittest.TestCase): def create_dummy_nodes(obj):