reframe/core/schedulers/__init__.py (1 addition, 18 deletions)
@@ -14,23 +14,6 @@
 from reframe.core.logging import getlogger
 
 
-class JobState:
-    def __init__(self, state):
-        self._state = state
-
-    def __repr__(self):
-        return debug.repr(self)
-
-    def __eq__(self, other):
-        if not isinstance(other, type(self)):
-            return NotImplemented
-
-        return self._state == other._state
-
-    def __str__(self):
-        return self._state
-
-
 class Job(abc.ABC):
     """A job descriptor.
 
@@ -53,7 +36,7 @@ class Job(abc.ABC):
 
     _jobid = fields.TypedField('_jobid', int, type(None))
     _exitcode = fields.TypedField('_exitcode', int, type(None))
-    _state = fields.TypedField('_state', JobState, type(None))
+    _state = fields.TypedField('_state', str, type(None))
 
     # The sched_* arguments are exposed also to the frontend
    def __init__(self,
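
The JobState wrapper deleted above stores only a state string and compares equal by that string, so switching the _state field to a plain str keeps the comparison and display semantics. A minimal standalone sketch (not ReFrame code) of the equivalence:

class _OldJobState:
    # Trimmed copy of the deleted JobState wrapper, kept only for comparison.
    def __init__(self, state):
        self._state = state

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented

        return self._state == other._state

    def __str__(self):
        return self._state


old, new = _OldJobState('TIMEOUT'), 'TIMEOUT'

# The wrapper and the plain string express the same comparisons.
assert old == _OldJobState('TIMEOUT')
assert new == 'TIMEOUT'
assert str(old) == new
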
reframe/core/schedulers/local.py (3 additions, 13 deletions)
@@ -13,16 +13,6 @@
 from reframe.core.schedulers.registry import register_scheduler
 
 
-# Local job states
-class LocalJobState(sched.JobState):
-    pass
-
-
-LOCAL_JOB_SUCCESS = LocalJobState('SUCCESS')
-LOCAL_JOB_FAILURE = LocalJobState('FAILURE')
-LOCAL_JOB_TIMEOUT = LocalJobState('TIMEOUT')
-
-
 class _TimeoutExpired(ReframeError):
     pass
 
@@ -153,12 +143,12 @@ def wait(self):
             self._wait_all(timeout)
             self._exitcode = self._proc.returncode
             if self._exitcode != 0:
-                self._state = LOCAL_JOB_FAILURE
+                self._state = 'FAILURE'
             else:
-                self._state = LOCAL_JOB_SUCCESS
+                self._state = 'SUCCESS'
         except (_TimeoutExpired, subprocess.TimeoutExpired):
             getlogger().debug('job timed out')
-            self._state = LOCAL_JOB_TIMEOUT
+            self._state = 'TIMEOUT'
         finally:
             # Cleanup all the processes of this job
             self._kill_all()
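
After this change the local scheduler reports its state as one of the plain strings 'SUCCESS', 'FAILURE' or 'TIMEOUT'. A self-contained sketch of the same classification logic, using a bare subprocess instead of ReFrame's LocalJob machinery (wait_and_classify is a hypothetical helper, not part of ReFrame):

import subprocess


def wait_and_classify(cmd, timeout=None):
    # Mirror of the try/except above: non-zero exit -> 'FAILURE',
    # zero exit -> 'SUCCESS', expired timeout -> 'TIMEOUT'.
    proc = subprocess.Popen(cmd, shell=True)
    try:
        exitcode = proc.wait(timeout=timeout)
        return ('SUCCESS' if exitcode == 0 else 'FAILURE'), exitcode
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        return 'TIMEOUT', None


assert wait_and_classify('true') == ('SUCCESS', 0)
assert wait_and_classify('false')[0] == 'FAILURE'
assert wait_and_classify('sleep 5', timeout=0.1)[0] == 'TIMEOUT'
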
reframe/core/schedulers/slurm.py (42 additions, 37 deletions)
@@ -14,24 +14,39 @@
 from reframe.core.schedulers.registry import register_scheduler
 
 
-class SlurmJobState(sched.JobState):
-    pass
-
-
-# Slurm Job states
-SLURM_JOB_BOOT_FAIL = SlurmJobState('BOOT_FAIL')
-SLURM_JOB_CANCELLED = SlurmJobState('CANCELLED')
-SLURM_JOB_COMPLETED = SlurmJobState('COMPLETED')
-SLURM_JOB_CONFIGURING = SlurmJobState('CONFIGURING')
-SLURM_JOB_COMPLETING = SlurmJobState('COMPLETING')
-SLURM_JOB_FAILED = SlurmJobState('FAILED')
-SLURM_JOB_NODE_FAILED = SlurmJobState('NODE_FAILED')
-SLURM_JOB_PENDING = SlurmJobState('PENDING')
-SLURM_JOB_PREEMPTED = SlurmJobState('PREEMPTED')
-SLURM_JOB_RESIZING = SlurmJobState('RESIZING')
-SLURM_JOB_RUNNING = SlurmJobState('RUNNING')
-SLURM_JOB_SUSPENDED = SlurmJobState('SUSPENDED')
-SLURM_JOB_TIMEOUT = SlurmJobState('TIMEOUT')
+def slurm_state_completed(state):
+    completion_states = {
+        'BOOT_FAIL',
+        'CANCELLED',
+        'COMPLETED',
+        'CONFIGURING',
+        'COMPLETING',
+        'DEADLINE',
+        'FAILED',
+        'NODE_FAIL',
+        'OUT_OF_MEMORY',
+        'PREEMPTED',
+        'TIMEOUT',
+    }
+    return state in completion_states
+
+
+def slurm_state_pending(state):
+    pending_states = {
+        'PENDING',
+        'RESV_DEL_HOLD',
+        'REQUEUE_FED',
+        'REQUEUE_HOLD',
+        'REQUEUED',
+        'RESIZING',
+        'REVOKED',
+        'SIGNALING',
+        'SPECIAL_EXIT',
+        'STAGE_OUT',
+        'STOPPED',
+        'SUSPENDED',
+    }
+    return state in pending_states
 
 
 _run_strict = functools.partial(os_ext.run_command, check=True)
@@ -49,15 +64,6 @@ class SlurmJob(sched.Job):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._prefix = '#SBATCH'
-        self._completion_states = [SLURM_JOB_BOOT_FAIL,
-                                   SLURM_JOB_CANCELLED,
-                                   SLURM_JOB_COMPLETED,
-                                   SLURM_JOB_FAILED,
-                                   SLURM_JOB_NODE_FAILED,
-                                   SLURM_JOB_PREEMPTED,
-                                   SLURM_JOB_TIMEOUT]
-        self._pending_states = [SLURM_JOB_CONFIGURING,
-                                SLURM_JOB_PENDING]
 
         # Reasons to cancel a pending job: if the job is expected to remain
         # pending for a much longer time then usual (mostly if a sysadmin
@@ -277,17 +283,17 @@ def _update_state(self):
                               completed.stdout)
             return
 
-        self._state = SlurmJobState(state_match.group('state'))
+        self._state = state_match.group('state')
         if not self._update_state_count % SlurmJob.SACCT_SQUEUE_RATIO:
             self._cancel_if_blocked()
 
-        if self._state in self._completion_states:
+        if slurm_state_completed(self._state):
             self._exitcode = int(state_match.group('exitcode'))
 
         self._set_nodelist(state_match.group('nodespec'))
 
     def _cancel_if_blocked(self):
-        if self._is_cancelling or self._state not in self._pending_states:
+        if self._is_cancelling or not slurm_state_pending(self._state):
             return
 
         completed = _run_strict('squeue -h -j %s -o %%r' % self._jobid)
@@ -340,12 +346,12 @@ def wait(self):
         super().wait()
 
         # Quickly return in case we have finished already
-        if self._state in self._completion_states:
+        if slurm_state_completed(self._state):
             return
 
         intervals = itertools.cycle(settings().job_poll_intervals)
         self._update_state()
-        while self._state not in self._completion_states:
+        while not slurm_state_completed(self._state):
             time.sleep(next(intervals))
             self._update_state()
 
@@ -369,7 +375,7 @@ def finished(self):
             getlogger().debug('ignoring error during polling: %s' % e)
             return False
         else:
-            return self._state in self._completion_states
+            return slurm_state_completed(self._state)
 
 
 @register_scheduler('squeue')
@@ -401,18 +407,17 @@ def _update_state(self):
                                 r'(?P<reason>.+)', completed.stdout)
         if state_match is None:
             # Assume that job has finished
-            self._state = (SLURM_JOB_CANCELLED if self._cancelled
-                           else SLURM_JOB_COMPLETED)
+            self._state = 'CANCELLED' if self._cancelled else 'COMPLETED'
 
             # Set exit code manually, if not set already by the polling
             if self._exitcode is None:
                 self._exitcode = 0
 
             return
 
-        self._state = SlurmJobState(state_match.group('state'))
+        self._state = state_match.group('state')
         self._set_nodelist(state_match.group('nodespec'))
-        if not self._is_cancelling and self._state in self._pending_states:
+        if not self._is_cancelling and slurm_state_pending(self._state):
             self._check_and_cancel(state_match.group('reason'))
 
     def cancel(self):
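
The two helpers added to slurm.py are plain functions over Slurm state strings, so the polling pattern used in SlurmJob.wait() can be sketched without the class. In the standalone sketch below, poll_state and the canned _FAKE_SACCT_STATES are hypothetical stand-ins for parsing sacct output; slurm_state_completed is copied from the diff above:

import itertools
import time


def slurm_state_completed(state):
    # Same completion set as the helper added in reframe/core/schedulers/slurm.py
    return state in {
        'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'CONFIGURING', 'COMPLETING',
        'DEADLINE', 'FAILED', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED',
        'TIMEOUT',
    }


# Hypothetical stand-in for polling `sacct -j <jobid>`.
_FAKE_SACCT_STATES = iter(['PENDING', 'RUNNING', 'RUNNING', 'COMPLETED'])


def poll_state():
    return next(_FAKE_SACCT_STATES)


def wait_until_completed(poll_intervals=(0.1, 0.2, 0.3)):
    # Same shape as the polling loop in SlurmJob.wait() after this change.
    intervals = itertools.cycle(poll_intervals)
    state = poll_state()
    while not slurm_state_completed(state):
        time.sleep(next(intervals))
        state = poll_state()
    return state


assert wait_until_completed() == 'COMPLETED'
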
unittests/test_schedulers.py (4 additions, 12 deletions)
@@ -206,10 +206,8 @@ def test_submit(self):
         self.assertEqual([socket.gethostname()], self.testjob.nodelist)
 
     def test_submit_timelimit(self):
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         super().test_submit_timelimit()
-        self.assertEqual(self.testjob.state, LOCAL_JOB_TIMEOUT)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
     def test_cancel_with_grace(self):
         # This test emulates a spawned process that ignores the SIGTERM signal
@@ -223,8 +221,6 @@ def test_cancel_with_grace(self):
         # killed immediately after the grace period of 2 seconds expires.
         #
         # We also check that the additional spawned process is also killed.
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         self.parallel_cmd = 'sleep 5 &'
         self.pre_run = ['trap -- "" TERM']
         self.post_run = ['echo $!', 'wait']
@@ -249,7 +245,7 @@
 
         self.assertGreaterEqual(t_grace.total_seconds(), 2)
         self.assertLess(t_grace.total_seconds(), 5)
-        self.assertEqual(LOCAL_JOB_TIMEOUT, self.testjob.state)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
         # Verify that the spawned sleep is killed, too
         self.assertProcessDied(sleep_pid)
@@ -266,8 +262,6 @@ def test_cancel_term_ignore(self):
         # spawned sleep will ignore it. We need to make sure that our
         # implementation grants the sleep process a grace period and then
         # kills it.
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         self.pre_run = []
         self.post_run = []
         self.parallel_cmd = os.path.join(fixtures.TEST_RESOURCES_CHECKS,
@@ -290,7 +284,7 @@
             sleep_pid = int(f.read())
 
         self.assertGreaterEqual(t_grace.total_seconds(), 2)
-        self.assertEqual(LOCAL_JOB_TIMEOUT, self.testjob.state)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
         # Verify that the spawned sleep is killed, too
         self.assertProcessDied(sleep_pid)
@@ -384,10 +378,8 @@ def test_submit_timelimit(self):
         self.skipTest("SLURM's minimum time limit is 60s")
 
     def test_cancel(self):
-        from reframe.core.schedulers.slurm import SLURM_JOB_CANCELLED
-
         super().test_cancel()
-        self.assertEqual(self.testjob.state, SLURM_JOB_CANCELLED)
+        self.assertEqual(self.testjob.state, 'CANCELLED')
 
     def test_guess_num_tasks(self):
         self.testjob._num_tasks = 0