diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py
index 1e804958c5..af4773e8a5 100644
--- a/reframe/core/pipeline.py
+++ b/reframe/core/pipeline.py
@@ -429,8 +429,8 @@ class RegressionTest(metaclass=RegressionTestMeta):
     #:
     #: :type: integral or :class:`None`
     #: :default: :class:`None`
-    num_tasks_per_core = fields.TypedField('num_tasks_per_core',
-                                           int, type(None))
+    num_tasks_per_core = fields.TypedField('num_tasks_per_core',
+                                           int, type(None))
 
     #: Number of tasks per socket required by this test.
     #:
@@ -450,6 +450,18 @@ class RegressionTest(metaclass=RegressionTestMeta):
     use_multithreading = fields.TypedField('use_multithreading',
                                            bool, type(None))
 
+    #: The maximum time a job can be pending before it starts running.
+    #:
+    #: The time duration is specified in the same way as for the
+    #: :attr:`time_limit` attribute.
+    #:
+    #: :type: :class:`str` or :class:`datetime.timedelta`
+    #: :default: :class:`None`
+    #:
+    #: .. versionadded:: 3.0
+    #:
+    max_pending_time = fields.TimerField('max_pending_time', type(None))
+
     #: Specify whether this test needs exclusive access to nodes.
     #:
     #: :type: boolean
@@ -714,6 +726,7 @@ def _rfm_init(self, name=None, prefix=None):
         self.num_tasks_per_socket = None
         self.use_multithreading = None
         self.exclusive_access = False
+        self.max_pending_time = None
 
         # True only if check is to be run locally
         self.local = False
@@ -996,6 +1009,7 @@ def _setup_job(self, **job_opts):
             launcher_type(),
             name='rfm_%s_job' % self.name,
             workdir=self._stagedir,
+            max_pending_time=self.max_pending_time,
             sched_access=self._current_partition.access,
             sched_exclusive_access=self.exclusive_access,
             **job_opts)
@@ -1286,7 +1300,7 @@ def check_performance(self):
 
         with os_ext.change_dir(self._stagedir):
             # Check if default reference perf values are provided and
-            # store all the variables tested in the performance check
+            # store all the variables tested in the performance check
             has_default = False
             variables = set()
             for key, ref in self.reference.items():
diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index e04e9f384f..47d4e85c9f 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -169,6 +169,7 @@ def __init__(self,
                  script_filename=None,
                  stdout=None,
                  stderr=None,
+                 max_pending_time=None,
                  sched_flex_alloc_nodes=None,
                  sched_access=[],
                  sched_account=None,
@@ -201,6 +202,7 @@ def __init__(self,
         self._script_filename = script_filename or '%s.sh' % name
         self._stdout = stdout or '%s.out' % name
         self._stderr = stderr or '%s.err' % name
+        self._max_pending_time = max_pending_time
         self._completion_time = None
 
         # Backend scheduler related information
@@ -228,6 +230,10 @@ def name(self):
     def workdir(self):
         return self._workdir
 
+    @property
+    def max_pending_time(self):
+        return self._max_pending_time
+
     @property
     def script_filename(self):
         return self._script_filename
diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py
index de62e5a6b4..74310d8247 100644
--- a/reframe/core/schedulers/pbs.py
+++ b/reframe/core/schedulers/pbs.py
@@ -122,6 +122,8 @@ def submit(self, job):
         if info:
             self._pbs_server = info[0]
 
+        self._submit_time = datetime.now()
+
     def wait(self, job):
         intervals = itertools.cycle(settings().job_poll_intervals)
         while not self.finished(job):
diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py
index 7dec0c4666..20f53c51b0 100644
--- a/reframe/core/schedulers/slurm.py
+++ b/reframe/core/schedulers/slurm.py
@@ -195,6 +195,7 @@ def submit(self, job):
                 'could not retrieve the job id of the submitted job')
 
         job.jobid = int(jobid_match.group('jobid'))
+        self._submit_time = datetime.now()
 
     def allnodes(self):
         try:
@@ -418,7 +419,14 @@ def wait(self, job):
         intervals = itertools.cycle(settings().job_poll_intervals)
         self._update_state(job)
+
         while not slurm_state_completed(job.state):
+            if job.max_pending_time and slurm_state_pending(job.state):
+                if datetime.now() - self._submit_time >= job.max_pending_time:
+                    self.cancel(job)
+                    raise JobError('maximum pending time exceeded',
+                                   jobid=job.jobid)
+
             time.sleep(next(intervals))
             self._update_state(job)
@@ -443,6 +451,12 @@ def finished(self, job):
             getlogger().debug('ignoring error during polling: %s' % e)
             return False
         else:
+            if job.max_pending_time and slurm_state_pending(job.state):
+                if datetime.now() - self._submit_time >= job.max_pending_time:
+                    self.cancel(job)
+                    raise JobError('maximum pending time exceeded',
+                                   jobid=job.jobid)
+
             return slurm_state_completed(job.state)
 
     def is_array(self, job):
@@ -473,10 +487,6 @@ def __init__(self):
     def completion_time(self, job):
         return None
 
-    def submit(self, job):
-        super().submit(job)
-        self._submit_time = datetime.now()
-
     def _update_state(self, job):
         time_from_submit = datetime.now() - self._submit_time
         rem_wait = self._squeue_delay - time_from_submit.total_seconds()
diff --git a/reframe/core/schedulers/torque.py b/reframe/core/schedulers/torque.py
index 07aba34214..47c66afe36 100644
--- a/reframe/core/schedulers/torque.py
+++ b/reframe/core/schedulers/torque.py
@@ -9,6 +9,7 @@
 #
 import re
+from datetime import datetime
 
 import reframe.utility.os_ext as os_ext
 from reframe.core.config import settings
@@ -83,4 +84,12 @@ def finished(self, job):
             getlogger().debug('ignoring error during polling: %s' % e)
             return False
         else:
+            if job.max_pending_time and job.state in ['QUEUED',
+                                                      'HELD',
+                                                      'WAITING']:
+                if datetime.now() - self._submit_time >= job.max_pending_time:
+                    self.cancel(job)
+                    raise JobError('maximum pending time exceeded',
+                                   jobid=job.jobid)
+
             return job.state == 'COMPLETED'
diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 2acd9039cb..4c72f256f8 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -11,7 +11,7 @@
 import tempfile
 import time
 import unittest
-from datetime import datetime
+from datetime import datetime, timedelta
 
 import reframe.core.runtime as rt
 import reframe.utility.os_ext as os_ext
@@ -197,6 +197,21 @@ def test_guess_num_tasks(self):
         with pytest.raises(NotImplementedError):
             self.testjob.guess_num_tasks()
 
+    # Monkey-patch `self._update_state` so that the job appears to stay
+    # pending in the queue long enough to be cancelled for exceeding
+    # the maximum pending time.
+    @fixtures.switch_to_user_runtime
+    def test_submit_max_pending_time(self):
+        self.setup_user()
+        self.parallel_cmd = 'sleep 30'
+        self.prepare()
+        self.testjob.scheduler._update_state = self._update_state
+        self.testjob._max_pending_time = timedelta(milliseconds=50)
+        self.testjob.submit()
+        with pytest.raises(JobError,
+                           match='maximum pending time exceeded'):
+            self.testjob.wait()
+
 
 class TestLocalJob(_TestJob, unittest.TestCase):
     def assertProcessDied(self, pid):
@@ -321,6 +336,10 @@ def test_guess_num_tasks(self):
         self.testjob.wait()
         assert self.testjob.num_tasks == 1
 
+    def test_submit_max_pending_time(self):
+        pytest.skip('the maximum pending time has no effect on the '
+                    'local scheduler')
+
 
 class TestSlurmJob(_TestJob, unittest.TestCase):
     @property
@@ -338,6 +357,9 @@ def sched_configured(self):
     def setup_user(self, msg=None):
         super().setup_user(msg='SLURM (with sacct) not configured')
 
+    def _update_state(self, job):
+        job.state = 'PENDING'
+
     def test_prepare(self):
         self.setup_job()
         super().test_prepare()
@@ -529,9 +551,12 @@ def test_prepare_no_cpus(self):
         assert self.expected_directives == found_directives
 
     def test_submit_timelimit(self):
-        # Skip this test for PBS, since we the minimum time limit is 1min
+        # Skip this test for PBS, since the minimum time limit is 1min
         pytest.skip("PBS minimum time limit is 60s")
 
+    def test_submit_max_pending_time(self):
+        pytest.skip('not implemented for the pbs scheduler')
+
 
 class TestTorqueJob(TestPbsJob):
     @property
@@ -561,6 +586,12 @@ def test_submit_timelimit(self):
         # Skip this test for PBS, since we the minimum time limit is 1min
         pytest.skip("Torque minimum time limit is 60s")
 
+    def _update_state(self, job):
+        job.state = 'QUEUED'
+
+    def test_submit_max_pending_time(self):
+        _TestJob.test_submit_max_pending_time(self)
+
 
 class TestSlurmFlexibleNodeAllocation(unittest.TestCase):
     def create_dummy_nodes(obj):
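
A minimal usage sketch of the new attribute from a test writer's perspective; the check class, source file, and ten-minute threshold below are illustrative assumptions, not part of this patch:

```python
import datetime

import reframe as rfm
import reframe.utility.sanity as sn


@rfm.simple_test
class MaxPendingCheck(rfm.RegressionTest):
    def __init__(self):
        self.valid_systems = ['*']
        self.valid_prog_environs = ['*']
        self.sourcepath = 'hello.c'   # hypothetical source file
        # Give up on the run if the job is still queued after ten
        # minutes; per the docstring, the attribute accepts the same
        # formats as `time_limit` (a string or `datetime.timedelta`).
        self.max_pending_time = datetime.timedelta(minutes=10)
        self.sanity_patterns = sn.assert_found(r'Hello', self.stdout)
```

If the threshold is exceeded, the scheduler backend cancels the job and `wait()`/`finished()` raise `JobError('maximum pending time exceeded')`, which surfaces as a test failure.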