Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reframe/core/systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def json(self):
'descr': self._descr,
'hostnames': self._hostnames,
'modules_system': self._modules_system.name,
'modules': [m.name for m in self._preload_env.modules],
'modules': [m for m in self._preload_env.modules],
'variables': [
[name, value]
for name, value in self._preload_env.variables.items()
Expand Down
4 changes: 4 additions & 0 deletions reframe/frontend/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import reframe.frontend.dependency as dependency
from reframe.core.exceptions import (AbortTaskError, JobNotStartedError,
ReframeForceExitError, TaskExit)
from reframe.core.schedulers.local import LocalJobScheduler
from reframe.frontend.printer import PrettyPrinter
from reframe.frontend.statistics import TestStats

Expand Down Expand Up @@ -452,6 +453,9 @@ def __init__(self):
self.printer = None
self.strict_check = False

# Local scheduler for running forced local jobs
self.local_scheduler = LocalJobScheduler()

# Scheduler options
self.sched_flex_alloc_nodes = None
self.sched_options = []
Expand Down
27 changes: 24 additions & 3 deletions reframe/frontend/executors/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,15 @@ def runcase(self, case):
task.compile()
task.compile_wait()
task.run()

# Pick the right scheduler
if task.check.local:
sched = self.local_scheduler
else:
sched = partition.scheduler

while True:
partition.scheduler.poll(task.check.job)
sched.poll(task.check.job)
if task.run_complete():
break

Expand Down Expand Up @@ -349,14 +356,28 @@ def runcase(self, case):
def _poll_tasks(self):
'''Update the counts of running checks per partition.'''

def split_jobs(tasks):
    '''Partition the jobs of *tasks* into forced-local and normal ones.

    Returns a pair ``(forced_local, normal)`` of job lists, where
    ``forced_local`` holds the jobs of checks flagged as local and
    ``normal`` holds the rest.
    '''
    forced_local = [t.check.job for t in tasks if t.check.local]
    normal = [t.check.job for t in tasks if not t.check.local]
    return forced_local, normal

getlogger().debug('updating counts for running test cases')
for part in self._partitions:
partname = part.fullname
num_tasks = len(self._running_tasks[partname])
getlogger().debug(f'polling {num_tasks} task(s) in {partname!r}')
part.scheduler.poll(
*(task.check.job for task in self._running_tasks[partname])
forced_local_jobs, part_jobs = split_jobs(
self._running_tasks[partname]
)
part.scheduler.poll(*part_jobs)
self.local_scheduler.poll(*forced_local_jobs)

# Trigger notifications for finished jobs
for t in self._running_tasks[partname]:
Expand Down
9 changes: 9 additions & 0 deletions unittests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ def test_check_success(run_reframe, tmp_path, logfile):
assert os.path.exists(tmp_path / 'report.json')


def test_check_success_force_local(run_reframe, tmp_path, logfile):
    # Run against a system whose partition uses a non-local scheduler and
    # force local execution via the `--force-local` option.
    rc, out, _ = run_reframe(system='testsys:gpu', local=True)
    assert 'PASSED' in out
    assert 'FAILED' not in out
    assert rc == 0


def test_report_file_with_sessionid(run_reframe, tmp_path):
returncode, stdout, _ = run_reframe(
more_options=[
Expand Down
16 changes: 14 additions & 2 deletions unittests/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def common_exec_ctx(temp_runtime):
yield from temp_runtime(fixtures.TEST_CONFIG_FILE, 'generic')


@pytest.fixture
def testsys_exec_ctx(temp_runtime):
    # Execution context bound to the 'testsys:gpu' partition; presumably this
    # partition uses a non-local scheduler (see the forced-local tests that
    # request this fixture) — confirm against the test config file.
    yield from temp_runtime(fixtures.TEST_CONFIG_FILE, 'testsys:gpu')


@pytest.fixture(params=[policies.SerialExecutionPolicy,
policies.AsynchronousExecutionPolicy])
def make_runner(request):
Expand Down Expand Up @@ -230,15 +235,22 @@ def test_strict_performance_check(make_runner, make_cases, common_exec_ctx):
assert 1 == num_failures_stage(runner, 'cleanup')


def test_force_local_execution(make_runner, make_cases, common_exec_ctx):
# We explicitly ask for a system with a non-local scheduler here, to make sure
# that the execution policies behave correctly with forced local tests
def test_force_local_execution(make_runner, make_cases, testsys_exec_ctx):
runner = make_runner()
runner.policy.force_local = True
runner.runall(make_cases([HelloTest()]))
test = HelloTest()
test.valid_prog_environs = ['builtin-gcc']

runner.runall(make_cases([test]))
assert_runall(runner)
stats = runner.stats
for t in stats.tasks():
assert t.check.local

assert not stats.failures()


def test_kbd_interrupt_within_test(make_runner, make_cases, common_exec_ctx):
runner = make_runner()
Expand Down
8 changes: 4 additions & 4 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only):

t_grace = datetime.now()
minimal_job.cancel()
time.sleep(0.01)
time.sleep(0.1)
minimal_job.wait()
t_grace = datetime.now() - t_grace

Expand All @@ -530,7 +530,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only):

# Verify that the spawned sleep is killed, too, but back off a bit in
# order to allow the sleep process to wake up and get the signal
time.sleep(0.01)
time.sleep(0.1)
assert_process_died(sleep_pid)


Expand All @@ -556,7 +556,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only):

# Stall a bit here to let the spawned process start and install its
# signal handler for SIGTERM
time.sleep(.1)
time.sleep(1)

t_grace = datetime.now()
minimal_job.cancel()
Expand All @@ -574,7 +574,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only):

# Verify that the spawned sleep is killed, too, but back off a bit in
# order to allow the sleep process to wake up and get the signal
time.sleep(0.01)
time.sleep(0.1)
assert_process_died(sleep_pid)


Expand Down