Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def prepare_job(job, command='hostname',
pre_run = pre_run or ['echo prerun']
post_run = post_run or ['echo postrun']
prepare_cmds = prepare_cmds or ['echo prepare']
with rt.module_use('unittests/modules'):
with rt.module_use(test_util.TEST_MODULES):
job.prepare(
[
*pre_run,
Expand All @@ -127,6 +127,11 @@ def prepare_job(job, command='hostname',
)


def submit_job(job):
with rt.module_use(test_util.TEST_MODULES):
job.submit()


def assert_job_script_sanity(job):
'''Assert the sanity of the produced script file.'''
with open(job.script_filename) as fp:
Expand Down Expand Up @@ -386,7 +391,7 @@ def test_submit(make_job, exec_ctx):
minimal_job = make_job(sched_access=exec_ctx.access)
prepare_job(minimal_job)
assert minimal_job.nodelist is None
minimal_job.submit()
submit_job(minimal_job)
assert minimal_job.jobid is not None
minimal_job.wait()

Expand All @@ -401,14 +406,19 @@ def test_submit(make_job, exec_ctx):
num_tasks_per_node = minimal_job.num_tasks_per_node or 1
num_nodes = minimal_job.num_tasks // num_tasks_per_node
assert num_nodes == len(minimal_job.nodelist)
assert 0 == minimal_job.exitcode

# Handle the case where the exitcode was not reported by the scheduler
assert minimal_job.exitcode is None or 0 == minimal_job.exitcode

with open(minimal_job.stderr) as stderr:
assert not stderr.read().strip()


def test_submit_timelimit(minimal_job, local_only):
minimal_job.time_limit = '2s'
prepare_job(minimal_job, 'sleep 10')
t_job = time.time()
minimal_job.submit()
submit_job(minimal_job)
assert minimal_job.jobid is not None
minimal_job.wait()
t_job = time.time() - t_job
Expand All @@ -430,7 +440,7 @@ def test_submit_job_array(make_job, slurm_only, exec_ctx):
job = make_job(sched_access=exec_ctx.access)
job.options = ['--array=0-1']
prepare_job(job, command='echo "Task id: ${SLURM_ARRAY_TASK_ID}"')
job.submit()
submit_job(job)
job.wait()
if job.scheduler.registered_name == 'slurm':
assert job.exitcode == 0
Expand All @@ -445,7 +455,7 @@ def test_cancel(make_job, exec_ctx):
prepare_job(minimal_job, 'sleep 30')
t_job = time.time()

minimal_job.submit()
submit_job(minimal_job)
minimal_job.cancel()

# We give some time to the local scheduler for the TERM signal to be
Expand Down Expand Up @@ -484,7 +494,7 @@ def test_wait_before_submit(minimal_job):
def test_finished(make_job, exec_ctx):
minimal_job = make_job(sched_access=exec_ctx.access)
prepare_job(minimal_job, 'sleep 2')
minimal_job.submit()
submit_job(minimal_job)
assert not minimal_job.finished()
minimal_job.wait()

Expand All @@ -498,7 +508,7 @@ def test_finished_before_submit(minimal_job):
def test_finished_raises_error(make_job, exec_ctx):
minimal_job = make_job(sched_access=exec_ctx.access)
prepare_job(minimal_job, 'echo hello')
minimal_job.submit()
submit_job(minimal_job)
minimal_job.wait()

# Emulate an error during polling and verify that it is raised correctly
Expand Down Expand Up @@ -555,7 +565,7 @@ def test_guess_num_tasks(minimal_job, scheduler):
minimal_job.num_tasks = 0
minimal_job._sched_flex_alloc_nodes = 'idle'
prepare_job(minimal_job)
minimal_job.submit()
submit_job(minimal_job)
minimal_job.wait()
assert minimal_job.num_tasks == 1
elif scheduler.registered_name in ('slurm', 'squeue'):
Expand Down Expand Up @@ -596,7 +606,7 @@ def state(self):

type(minimal_job).state = property(state)
prepare_job(minimal_job, 'sleep 30')
minimal_job.submit()
submit_job(minimal_job)
with pytest.raises(JobError,
match='maximum pending time exceeded'):
minimal_job.wait()
Expand Down Expand Up @@ -649,7 +659,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only):
pre_run=['trap -- "" TERM'],
post_run=['echo $!', 'wait'],
prepare_cmds=[''])
minimal_job.submit()
submit_job(minimal_job)

# Stall a bit here to let the the spawned process start and install its
# signal handler for SIGTERM
Expand Down Expand Up @@ -696,7 +706,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only):
pre_run=[''],
post_run=[''],
prepare_cmds=[''])
minimal_job.submit()
submit_job(minimal_job)

# Stall a bit here to let the the spawned process start and install its
# signal handler for SIGTERM
Expand Down