diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index a5b3d27453..089026126b 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -14,23 +14,6 @@ from reframe.core.logging import getlogger
 
 
-class JobState:
-    def __init__(self, state):
-        self._state = state
-
-    def __repr__(self):
-        return debug.repr(self)
-
-    def __eq__(self, other):
-        if not isinstance(other, type(self)):
-            return NotImplemented
-
-        return self._state == other._state
-
-    def __str__(self):
-        return self._state
-
-
 class Job(abc.ABC):
     """A job descriptor.
 
@@ -53,7 +36,7 @@ class Job(abc.ABC):
     _jobid = fields.TypedField('_jobid', int, type(None))
     _exitcode = fields.TypedField('_exitcode', int, type(None))
-    _state = fields.TypedField('_state', JobState, type(None))
+    _state = fields.TypedField('_state', str, type(None))
 
     # The sched_* arguments are exposed also to the frontend
     def __init__(self,
diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py
index b2d4ac0612..673e151882 100644
--- a/reframe/core/schedulers/local.py
+++ b/reframe/core/schedulers/local.py
@@ -13,16 +13,6 @@ from reframe.core.schedulers.registry import register_scheduler
 
 
-# Local job states
-class LocalJobState(sched.JobState):
-    pass
-
-
-LOCAL_JOB_SUCCESS = LocalJobState('SUCCESS')
-LOCAL_JOB_FAILURE = LocalJobState('FAILURE')
-LOCAL_JOB_TIMEOUT = LocalJobState('TIMEOUT')
-
-
 class _TimeoutExpired(ReframeError):
     pass
 
@@ -153,12 +143,12 @@ def wait(self):
             self._wait_all(timeout)
             self._exitcode = self._proc.returncode
             if self._exitcode != 0:
-                self._state = LOCAL_JOB_FAILURE
+                self._state = 'FAILURE'
             else:
-                self._state = LOCAL_JOB_SUCCESS
+                self._state = 'SUCCESS'
         except (_TimeoutExpired, subprocess.TimeoutExpired):
             getlogger().debug('job timed out')
-            self._state = LOCAL_JOB_TIMEOUT
+            self._state = 'TIMEOUT'
         finally:
             # Cleanup all the processes of this job
             self._kill_all()
diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py
index 321da8b519..9da8edd98b 100644
--- a/reframe/core/schedulers/slurm.py
+++ b/reframe/core/schedulers/slurm.py
@@ -14,24 +14,39 @@ from reframe.core.schedulers.registry import register_scheduler
 
 
-class SlurmJobState(sched.JobState):
-    pass
-
-
-# Slurm Job states
-SLURM_JOB_BOOT_FAIL = SlurmJobState('BOOT_FAIL')
-SLURM_JOB_CANCELLED = SlurmJobState('CANCELLED')
-SLURM_JOB_COMPLETED = SlurmJobState('COMPLETED')
-SLURM_JOB_CONFIGURING = SlurmJobState('CONFIGURING')
-SLURM_JOB_COMPLETING = SlurmJobState('COMPLETING')
-SLURM_JOB_FAILED = SlurmJobState('FAILED')
-SLURM_JOB_NODE_FAILED = SlurmJobState('NODE_FAILED')
-SLURM_JOB_PENDING = SlurmJobState('PENDING')
-SLURM_JOB_PREEMPTED = SlurmJobState('PREEMPTED')
-SLURM_JOB_RESIZING = SlurmJobState('RESIZING')
-SLURM_JOB_RUNNING = SlurmJobState('RUNNING')
-SLURM_JOB_SUSPENDED = SlurmJobState('SUSPENDED')
-SLURM_JOB_TIMEOUT = SlurmJobState('TIMEOUT')
+def slurm_state_completed(state):
+    completion_states = {
+        'BOOT_FAIL',
+        'CANCELLED',
+        'COMPLETED',
+        'CONFIGURING',
+        'COMPLETING',
+        'DEADLINE',
+        'FAILED',
+        'NODE_FAIL',
+        'OUT_OF_MEMORY',
+        'PREEMPTED',
+        'TIMEOUT',
+    }
+    return state in completion_states
+
+
+def slurm_state_pending(state):
+    pending_states = {
+        'PENDING',
+        'RESV_DEL_HOLD',
+        'REQUEUE_FED',
+        'REQUEUE_HOLD',
+        'REQUEUED',
+        'RESIZING',
+        'REVOKED',
+        'SIGNALING',
+        'SPECIAL_EXIT',
+        'STAGE_OUT',
+        'STOPPED',
+        'SUSPENDED',
+    }
+    return state in pending_states
 
 
 _run_strict = functools.partial(os_ext.run_command, check=True)
 
@@ -49,15 +64,6 @@ class SlurmJob(sched.Job):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._prefix = '#SBATCH'
-        self._completion_states = [SLURM_JOB_BOOT_FAIL,
-                                   SLURM_JOB_CANCELLED,
-                                   SLURM_JOB_COMPLETED,
-                                   SLURM_JOB_FAILED,
-                                   SLURM_JOB_NODE_FAILED,
-                                   SLURM_JOB_PREEMPTED,
-                                   SLURM_JOB_TIMEOUT]
-        self._pending_states = [SLURM_JOB_CONFIGURING,
-                                SLURM_JOB_PENDING]
 
         # Reasons to cancel a pending job: if the job is expected to remain
         # pending for a much longer time then usual (mostly if a sysadmin
@@ -277,17 +283,17 @@ def _update_state(self):
                               completed.stdout)
             return
 
-        self._state = SlurmJobState(state_match.group('state'))
+        self._state = state_match.group('state')
         if not self._update_state_count % SlurmJob.SACCT_SQUEUE_RATIO:
             self._cancel_if_blocked()
 
-        if self._state in self._completion_states:
+        if slurm_state_completed(self._state):
             self._exitcode = int(state_match.group('exitcode'))
 
         self._set_nodelist(state_match.group('nodespec'))
 
     def _cancel_if_blocked(self):
-        if self._is_cancelling or self._state not in self._pending_states:
+        if self._is_cancelling or not slurm_state_pending(self._state):
             return
 
         completed = _run_strict('squeue -h -j %s -o %%r' % self._jobid)
@@ -340,12 +346,12 @@ def wait(self):
         super().wait()
 
         # Quickly return in case we have finished already
-        if self._state in self._completion_states:
+        if slurm_state_completed(self._state):
             return
 
         intervals = itertools.cycle(settings().job_poll_intervals)
         self._update_state()
-        while self._state not in self._completion_states:
+        while not slurm_state_completed(self._state):
             time.sleep(next(intervals))
             self._update_state()
 
@@ -369,7 +375,7 @@ def finished(self):
             getlogger().debug('ignoring error during polling: %s' % e)
             return False
         else:
-            return self._state in self._completion_states
+            return slurm_state_completed(self._state)
 
 
 @register_scheduler('squeue')
@@ -401,8 +407,7 @@ def _update_state(self):
                                 r'(?P<reason>.+)', completed.stdout)
         if state_match is None:
             # Assume that job has finished
-            self._state = (SLURM_JOB_CANCELLED if self._cancelled
-                           else SLURM_JOB_COMPLETED)
+            self._state = 'CANCELLED' if self._cancelled else 'COMPLETED'
 
             # Set exit code manually, if not set already by the polling
             if self._exitcode is None:
@@ -410,9 +415,9 @@ def _update_state(self):
 
             return
 
-        self._state = SlurmJobState(state_match.group('state'))
+        self._state = state_match.group('state')
         self._set_nodelist(state_match.group('nodespec'))
-        if not self._is_cancelling and self._state in self._pending_states:
+        if not self._is_cancelling and slurm_state_pending(self._state):
             self._check_and_cancel(state_match.group('reason'))
 
     def cancel(self):
diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 71dd558642..b13d62f84a 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -206,10 +206,8 @@ def test_submit(self):
         self.assertEqual([socket.gethostname()], self.testjob.nodelist)
 
     def test_submit_timelimit(self):
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         super().test_submit_timelimit()
-        self.assertEqual(self.testjob.state, LOCAL_JOB_TIMEOUT)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
     def test_cancel_with_grace(self):
         # This test emulates a spawned process that ignores the SIGTERM signal
@@ -223,8 +221,6 @@ def test_cancel_with_grace(self):
         # killed immediately after the grace period of 2 seconds expires.
         #
         # We also check that the additional spawned process is also killed.
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         self.parallel_cmd = 'sleep 5 &'
         self.pre_run = ['trap -- "" TERM']
         self.post_run = ['echo $!', 'wait']
@@ -249,7 +245,7 @@ def test_cancel_with_grace(self):
 
         self.assertGreaterEqual(t_grace.total_seconds(), 2)
         self.assertLess(t_grace.total_seconds(), 5)
-        self.assertEqual(LOCAL_JOB_TIMEOUT, self.testjob.state)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
         # Verify that the spawned sleep is killed, too
         self.assertProcessDied(sleep_pid)
@@ -266,8 +262,6 @@ def test_cancel_term_ignore(self):
         # spawned sleep will ignore it. We need to make sure that our
         # implementation grants the sleep process a grace period and then
         # kills it.
-        from reframe.core.schedulers.local import LOCAL_JOB_TIMEOUT
-
         self.pre_run = []
         self.post_run = []
         self.parallel_cmd = os.path.join(fixtures.TEST_RESOURCES_CHECKS,
@@ -290,7 +284,7 @@ def test_cancel_term_ignore(self):
             sleep_pid = int(f.read())
 
         self.assertGreaterEqual(t_grace.total_seconds(), 2)
-        self.assertEqual(LOCAL_JOB_TIMEOUT, self.testjob.state)
+        self.assertEqual(self.testjob.state, 'TIMEOUT')
 
         # Verify that the spawned sleep is killed, too
         self.assertProcessDied(sleep_pid)
@@ -384,10 +378,8 @@ def test_submit_timelimit(self):
         self.skipTest("SLURM's minimum time limit is 60s")
 
     def test_cancel(self):
-        from reframe.core.schedulers.slurm import SLURM_JOB_CANCELLED
-
         super().test_cancel()
-        self.assertEqual(self.testjob.state, SLURM_JOB_CANCELLED)
+        self.assertEqual(self.testjob.state, 'CANCELLED')
 
     def test_guess_num_tasks(self):
         self.testjob._num_tasks = 0