20 changes: 17 additions & 3 deletions reframe/core/pipeline.py
@@ -429,8 +429,8 @@ class RegressionTest(metaclass=RegressionTestMeta):
#:
#: :type: integral or :class:`None`
#: :default: :class:`None`
num_tasks_per_core = fields.TypedField('num_tasks_per_core',
int, type(None))
num_tasks_per_core = fields.TypedField('num_tasks_per_core',
int, type(None))

#: Number of tasks per socket required by this test.
#:
@@ -450,6 +450,18 @@ class RegressionTest(metaclass=RegressionTestMeta):
use_multithreading = fields.TypedField('use_multithreading',
bool, type(None))

#: The maximum time a job can be pending before it starts running.
#:
#: The time duration is specified in the same way as for the
#: :attr:`time_limit` attribute.
#:
#: :type: :class:`str` or :class:`datetime.timedelta`
#: :default: :class:`None`
#:
#: .. versionadded:: 3.0
#:
max_pending_time = fields.TimerField('max_pending_time', type(None))

#: Specify whether this test needs exclusive access to nodes.
#:
#: :type: boolean
@@ -714,6 +726,7 @@ def _rfm_init(self, name=None, prefix=None):
self.num_tasks_per_socket = None
self.use_multithreading = None
self.exclusive_access = False
self.max_pending_time = None

# True only if check is to be run locally
self.local = False
@@ -996,6 +1009,7 @@ def _setup_job(self, **job_opts):
launcher_type(),
name='rfm_%s_job' % self.name,
workdir=self._stagedir,
max_pending_time=self.max_pending_time,
sched_access=self._current_partition.access,
sched_exclusive_access=self.exclusive_access,
**job_opts)
@@ -1286,7 +1300,7 @@ def check_performance(self):

with os_ext.change_dir(self._stagedir):
# Check if default reference perf values are provided and
# store all the variables tested in the performance check
# store all the variables tested in the performance check
has_default = False
variables = set()
for key, ref in self.reference.items():
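For context, the new max_pending_time attribute documented in the hunk above is set like any other test attribute. The snippet below is only an illustrative sketch, not part of this change set: the test class, executable and sanity pattern are made up, and it assumes the ReFrame 3.0 decorator-based test syntax.

import datetime

import reframe as rfm
import reframe.utility.sanity as sn


@rfm.simple_test
class HostnameCheck(rfm.RunOnlyRegressionTest):
    def __init__(self):
        self.valid_systems = ['*']
        self.valid_prog_environs = ['*']
        self.executable = 'hostname'
        # Cancel the job if it has not started running within 10 minutes;
        # a duration string in the time_limit format should also be accepted.
        self.max_pending_time = datetime.timedelta(minutes=10)
        self.sanity_patterns = sn.assert_found(r'\S+', self.stdout)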
6 changes: 6 additions & 0 deletions reframe/core/schedulers/__init__.py
@@ -169,6 +169,7 @@ def __init__(self,
script_filename=None,
stdout=None,
stderr=None,
max_pending_time=None,
sched_flex_alloc_nodes=None,
sched_access=[],
sched_account=None,
@@ -201,6 +202,7 @@ def __init__(self,
self._script_filename = script_filename or '%s.sh' % name
self._stdout = stdout or '%s.out' % name
self._stderr = stderr or '%s.err' % name
self._max_pending_time = max_pending_time
self._completion_time = None

# Backend scheduler related information
@@ -228,6 +230,10 @@ def name(self):
def workdir(self):
return self._workdir

@property
def max_pending_time(self):
return self._max_pending_time

@property
def script_filename(self):
return self._script_filename
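With the limit stored on the Job object and exposed through the read-only max_pending_time property, each backend follows the same recipe: record the submission time in submit() and, while polling, cancel the job once the elapsed pending time exceeds the limit. A condensed sketch of that recipe is shown below; the helper name is made up, and the real checks live in the Slurm and Torque backends further down.

from datetime import datetime

from reframe.core.exceptions import JobError


def check_pending_timeout(sched, job, is_pending):
    # Hypothetical helper illustrating the pattern used by the backends:
    # do nothing if no limit was set or the job is no longer pending.
    if job.max_pending_time and is_pending:
        if datetime.now() - sched._submit_time >= job.max_pending_time:
            sched.cancel(job)
            raise JobError('maximum pending time exceeded', jobid=job.jobid)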
2 changes: 2 additions & 0 deletions reframe/core/schedulers/pbs.py
@@ -122,6 +122,8 @@ def submit(self, job):
if info:
self._pbs_server = info[0]

self._submit_time = datetime.now()

def wait(self, job):
intervals = itertools.cycle(settings().job_poll_intervals)
while not self.finished(job):
18 changes: 14 additions & 4 deletions reframe/core/schedulers/slurm.py
@@ -195,6 +195,7 @@ def submit(self, job):
'could not retrieve the job id of the submitted job')

job.jobid = int(jobid_match.group('jobid'))
self._submit_time = datetime.now()

def allnodes(self):
try:
@@ -418,7 +419,14 @@ def wait(self, job):

intervals = itertools.cycle(settings().job_poll_intervals)
self._update_state(job)

while not slurm_state_completed(job.state):
if job.max_pending_time and slurm_state_pending(job.state):
if datetime.now() - self._submit_time >= job.max_pending_time:
self.cancel(job)
raise JobError('maximum pending time exceeded',
jobid=job.jobid)

time.sleep(next(intervals))
self._update_state(job)

@@ -443,6 +451,12 @@ def finished(self, job):
getlogger().debug('ignoring error during polling: %s' % e)
return False
else:
if job.max_pending_time and slurm_state_pending(job.state):
if datetime.now() - self._submit_time >= job.max_pending_time:
self.cancel(job)
raise JobError('maximum pending time exceeded',
jobid=job.jobid)

return slurm_state_completed(job.state)

def is_array(self, job):
@@ -473,10 +487,6 @@ def __init__(self):
def completion_time(self, job):
return None

def submit(self, job):
super().submit(job)
self._submit_time = datetime.now()

def _update_state(self, job):
time_from_submit = datetime.now() - self._submit_time
rem_wait = self._squeue_delay - time_from_submit.total_seconds()
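From the caller's point of view, exceeding the limit surfaces as a JobError raised from wait() (or finished()) after the backend has already cancelled the job. A minimal handling sketch, assuming a job object that the framework has already prepared:

from reframe.core.exceptions import JobError


def run_job(job):
    # `job` is assumed to be a scheduler Job instance already set up by
    # the framework, with max_pending_time set on it.
    try:
        job.submit()
        job.wait()
    except JobError as err:
        # The backend has already cancelled the job at this point.
        print('job exceeded its maximum pending time:', err)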
9 changes: 9 additions & 0 deletions reframe/core/schedulers/torque.py
@@ -9,6 +9,7 @@
#

import re
from datetime import datetime

import reframe.utility.os_ext as os_ext
from reframe.core.config import settings
@@ -83,4 +84,12 @@ def finished(self, job):
getlogger().debug('ignoring error during polling: %s' % e)
return False
else:
if job.max_pending_time and job.state in ['QUEUED',
'HELD',
'WAITING']:
if datetime.now() - self._submit_time >= job.max_pending_time:
self.cancel(job)
raise JobError('maximum pending time exceeded',
jobid=job.jobid)

return job.state == 'COMPLETED'
35 changes: 33 additions & 2 deletions unittests/test_schedulers.py
@@ -11,7 +11,7 @@
import tempfile
import time
import unittest
from datetime import datetime
from datetime import datetime, timedelta

import reframe.core.runtime as rt
import reframe.utility.os_ext as os_ext
@@ -197,6 +197,21 @@ def test_guess_num_tasks(self):
with pytest.raises(NotImplementedError):
self.testjob.guess_num_tasks()

# Monkey-patch `self._update_state` to simulate a job that stays
# pending in the queue long enough to be canceled for exceeding
# the maximum pending time
@fixtures.switch_to_user_runtime
def test_submit_max_pending_time(self):
self.setup_user()
self.parallel_cmd = 'sleep 30'
self.prepare()
self.testjob.scheduler._update_state = self._update_state
self.testjob._max_pending_time = timedelta(milliseconds=50)
self.testjob.submit()
with pytest.raises(JobError,
match='maximum pending time exceeded'):
self.testjob.wait()


class TestLocalJob(_TestJob, unittest.TestCase):
def assertProcessDied(self, pid):
@@ -321,6 +336,10 @@ def test_guess_num_tasks(self):
self.testjob.wait()
assert self.testjob.num_tasks == 1

def test_submit_max_pending_time(self):
pytest.skip('the maximum pending time has no effect on the '
'local scheduler')


class TestSlurmJob(_TestJob, unittest.TestCase):
@property
@@ -338,6 +357,9 @@ def sched_configured(self):
def setup_user(self, msg=None):
super().setup_user(msg='SLURM (with sacct) not configured')

def _update_state(self, job):
job.state = 'PENDING'

def test_prepare(self):
self.setup_job()
super().test_prepare()
@@ -529,9 +551,12 @@ def test_prepare_no_cpus(self):
assert self.expected_directives == found_directives

def test_submit_timelimit(self):
# Skip this test for PBS, since we the minimum time limit is 1min
# Skip this test for PBS, since the minimum time limit is 1min
pytest.skip("PBS minimum time limit is 60s")

def test_submit_max_pending_time(self):
pytest.skip('not implemented for the pbs scheduler')


class TestTorqueJob(TestPbsJob):
@property
@@ -561,6 +586,12 @@ def test_submit_timelimit(self):
# Skip this test for Torque, since the minimum time limit is 1min
pytest.skip("Torque minimum time limit is 60s")

def _update_state(self, job):
job.state = 'QUEUED'

def test_submit_max_pending_time(self):
_TestJob.test_submit_max_pending_time(self)


class TestSlurmFlexibleNodeAllocation(unittest.TestCase):
def create_dummy_nodes(obj):