Merged
53 commits
5f06bfa
Add pin_nodes attribute to Job
ekouts Feb 4, 2022
f943d19
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Feb 16, 2022
06cd315
Remove forgotten print
ekouts Feb 17, 2022
ea2891b
Add special parameter with all the nodes
ekouts Mar 2, 2022
ba05735
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 2, 2022
82b9963
Fix small typo
ekouts Mar 2, 2022
a377f57
Filter nodes by partition
ekouts Mar 3, 2022
eac9d18
Add --flex-alloc-singlenode
ekouts Mar 3, 2022
a121d5a
Check all node parameters before making testcase
ekouts Mar 3, 2022
b4651ad
Update --flex-alloc-singlenode behaviour
ekouts Mar 4, 2022
6bdd243
Fix formatting issues
ekouts Mar 4, 2022
0900c18
Take into account cli job options in NodeTestParam
ekouts Mar 4, 2022
ab7749f
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 18, 2022
405fc3c
Merge branch 'refactor/fixture-instantiation' of https://github.com/v…
ekouts Mar 18, 2022
bacd9ca
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 20, 2022
9e26477
Update flex-alloc-singlenode implementation
ekouts Mar 20, 2022
4e3f1ad
Set the prefix in dynamically created tests
ekouts Mar 20, 2022
d95b5ac
Replace testcases_all in flex_alloc_singlenode
ekouts Mar 29, 2022
cc5705b
Add basic unittests
ekouts Mar 29, 2022
7fd49a5
Add more unittest with fixtures
ekouts Apr 7, 2022
46c2096
Address PR comments
ekouts Apr 7, 2022
78b13e9
Remove _rfm_dynamic_test_prefix attribute
ekouts Apr 7, 2022
9cd3648
Rename to dummy_job
ekouts Apr 7, 2022
5e2c4f5
Address PR comments
ekouts Apr 7, 2022
a5cfbb9
Remove unnecessary check
ekouts Apr 7, 2022
f51df33
Address PR comments
ekouts Apr 7, 2022
5cbff99
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 7, 2022
f0c38e2
Fix unittest after master merging
ekouts Apr 7, 2022
5861e68
Update documentation for the distribute cli option
ekouts Apr 8, 2022
7401bae
Set versionadded for pin_nodes attribute
ekouts Apr 8, 2022
b99ab6d
Merge branch 'feat/extend-validx-syntax' of https://github.com/vkarak…
ekouts Apr 8, 2022
62c65c5
Remove skip_system_check and skip_prgenv_check from generate_testcases
ekouts Apr 8, 2022
ceb6e1b
Fix pep8 issues
ekouts Apr 8, 2022
f188029
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 8, 2022
4ac94b5
Address PR comments
ekouts Apr 12, 2022
4aeba8e
Move distribute_tests and getallnodes to a separate module
ekouts Apr 12, 2022
91f699a
Remove unused imports
ekouts Apr 12, 2022
4810a5e
Remove empty line
ekouts Apr 12, 2022
994354d
Remove incorrect rerun message
ekouts Apr 12, 2022
a15a0b0
Address PR comments
ekouts Apr 12, 2022
a0ed9b3
Add formatting function for nodelist
ekouts Apr 12, 2022
9e69b34
Add default to distribute option
ekouts Apr 12, 2022
7ef2a45
Make test_distribute_testcases stricter
ekouts Apr 13, 2022
7c9e5fa
Remove _D_ from class name
ekouts Apr 13, 2022
212cd8b
Address comments
ekouts Apr 13, 2022
19b6aac
Split long line
ekouts Apr 13, 2022
9f6d4ed
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 13, 2022
8f7c399
Fix bug in cli options
ekouts Apr 13, 2022
6e56af7
Small fix
ekouts Apr 13, 2022
1eef8b5
Update documentation
Apr 13, 2022
dacd357
Merge branch 'feat/alt_flex_alloc' of github.com:ekouts/reframe into …
Apr 13, 2022
cf56231
Code style fixes
Apr 13, 2022
631be2e
Rename distribute_tests.py to distribute.py
Apr 13, 2022
30 changes: 30 additions & 0 deletions docs/manpage.rst
@@ -379,6 +379,36 @@ Options controlling ReFrame execution

.. versionadded:: 3.2

.. option:: --distribute[=NODESTATE]

Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions.

ReFrame will parameterize and run the tests on the selected nodes.
Effectively, it will dynamically create new tests that inherit from the original tests and add a new parameter named ``$nid``, which contains the list of nodes that the test must run on.
The new tests are named using the pattern ``{orig_test_basename}_{partition_fullname}``.

When determining the list of nodes on which to distribute the selected tests, ReFrame will take into account any job options passed through the :option:`-J` option.

You can optionally specify the state of the nodes to consider when distributing the tests through the ``NODESTATE`` argument:

- ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the nodes' state.
- ``idle``: Tests will run on all *idle* nodes of their respective valid partitions.
- ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE`` of their respective valid partitions.
If ``NODESTATE`` is not specified, ``idle`` will be assumed.

The state of the nodes will be determined once, before beginning the
execution of the tests, so it might be different at the time the tests are actually submitted.

.. note::
Currently, only single-node jobs can be distributed, and only the local and Slurm-based backends support this feature.

.. note::
Distributing tests with dependencies is not supported.
However, you can distribute tests that use fixtures.


.. versionadded:: 3.11.0

.. option:: --exec-policy=POLICY

The execution policy to be used for running tests.
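As a rough illustration of the ``--distribute`` expansion described in the manpage addition above, the sketch below derives the generated class names and the ``$nid`` values from a node map. The `expand` helper, the partition names and the node names are hypothetical; only the naming pattern and the one-node-per-value layout are taken from this PR.

```python
# Hypothetical stand-in for the mapping built for --distribute:
# partition full name -> names of the nodes in the requested state.
node_map = {
    'daint:gpu': ['nid00001', 'nid00002'],
    'daint:mc': ['nid00010'],
}


def expand(test_basename, node_map):
    '''Return {generated class name: list of $nid values}.'''
    expanded = {}
    for part_fullname, nodes in node_map.items():
        # ':' is replaced so that the result is a valid class name
        cls_name = f'{test_basename}_{part_fullname.replace(":", "_")}'
        # Each parameter value is a one-element node list
        expanded[cls_name] = [[n] for n in nodes]

    return expanded


generated = expand('HelloTest', node_map)
for name, nid_values in generated.items():
    print(name, nid_values)
```

Each one-element list corresponds to one single-node job pinned to that node.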
4 changes: 2 additions & 2 deletions reframe/core/meta.py
@@ -705,7 +705,7 @@ class MyTest(rfm.RegressionTest):
...

# Get the raw info for variant 0
-MyTest.get_variant_info(0, recursive=True)
+MyTest.get_variant_info(0, recurse=True)
# {
# 'params': {'p1': 'a'},
# 'fixtures': {
@@ -846,7 +846,7 @@ def make_test(name, bases, body, methods=None, **kwargs):
class HelloTest(rfm.RunOnlyRegressionTest):
valid_systems = ['*']
valid_prog_environs = ['*']
-executable = 'echo',
+executable = 'echo'
sanity_patterns: sn.assert_true(1)

hello_cls = HelloTest
12 changes: 12 additions & 0 deletions reframe/core/schedulers/__init__.py
@@ -265,6 +265,18 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: :type: :class:`reframe.core.launchers.JobLauncher`
launcher = variable(JobLauncher)

#: Pin the jobs on the given nodes.
#:
#: The list of nodes will be transformed to a suitable string and be
#: passed to the scheduler's options. Currently it will have an effect
#: only for the Slurm scheduler.
#:
#: :type: :class:`List[str]`
#: :default: ``[]``
#:
#: .. versionadded:: 3.11.0
pin_nodes = variable(typ.List[str], value=[])

# The sched_* arguments are exposed also to the frontend
def __init__(self,
name,
4 changes: 4 additions & 0 deletions reframe/core/schedulers/local.py
@@ -202,5 +202,9 @@ class _LocalNode(sched.Node):
def __init__(self, name):
self._name = name

@property
def name(self):
return self._name

def in_state(self, state):
return state.casefold() == 'idle'
16 changes: 15 additions & 1 deletion reframe/core/schedulers/slurm.py
@@ -7,6 +7,7 @@
import glob
import itertools
import re
import shlex
import time
from argparse import ArgumentParser
from contextlib import suppress
@@ -19,7 +20,7 @@
JobBlockedError,
JobError,
JobSchedulerError)
-from reframe.utility import seconds_to_hms
+from reframe.utility import nodelist_abbrev, seconds_to_hms


def slurm_state_completed(state):
@@ -192,6 +193,14 @@ def emit_preamble(self, job):
else:
hint = 'multithread' if job.use_smt else 'nomultithread'

if job.pin_nodes:
preamble.append(
self._format_option(
nodelist_abbrev(job.pin_nodes),
'--nodelist={0}'
)
)

for opt in job.sched_access:
if not opt.strip().startswith(('-C', '--constraint')):
preamble.append('%s %s' % (self._prefix, opt))
@@ -297,6 +306,11 @@ def filternodes(self, job, nodes):
# create a mutable list out of the immutable SequenceView that
# sched_access is
options = job.sched_access + job.options + job.cli_options

# Lexically split all the arguments in the options list, so that
# entries such as '--option foo' are treated correctly.
options = list(itertools.chain.from_iterable(shlex.split(opt)
for opt in options))
option_parser = ArgumentParser()
option_parser.add_argument('--reservation')
option_parser.add_argument('-p', '--partition')
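The effect of the `shlex.split` step added to `filternodes` can be sketched in isolation as follows. This is a minimal sketch assuming only the two parser arguments visible in the hunk; the sample option strings are made up, and `parse_known_args` is used here purely for illustration.

```python
import itertools
import shlex
from argparse import ArgumentParser

# Hypothetical job options; the second one packs option and value
# into a single string, as '--option foo' does in the comment above.
options = ['--partition=gpu', '--reservation foo']

parser = ArgumentParser()
parser.add_argument('--reservation')
parser.add_argument('-p', '--partition')

# Without splitting, '--reservation foo' is a single token that does
# not match the '--reservation' option.
raw, _ = parser.parse_known_args(options)

# Split every entry lexically and flatten the result into one list
flat = list(itertools.chain.from_iterable(shlex.split(opt)
                                          for opt in options))
parsed, _ = parser.parse_known_args(flat)

print(flat)
print(raw.reservation, parsed.reservation, parsed.partition)
```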
56 changes: 41 additions & 15 deletions reframe/frontend/cli.py
@@ -30,11 +30,12 @@
import reframe.utility.typecheck as typ


-from reframe.frontend.printer import PrettyPrinter
-from reframe.frontend.loader import RegressionCheckLoader
+from reframe.frontend.distribute import distribute_tests, getallnodes
from reframe.frontend.executors.policies import (SerialExecutionPolicy,
AsynchronousExecutionPolicy)
from reframe.frontend.executors import Runner, generate_testcases
+from reframe.frontend.loader import RegressionCheckLoader
+from reframe.frontend.printer import PrettyPrinter


def format_env(envvars):
@@ -370,6 +371,12 @@ def main():
'--disable-hook', action='append', metavar='NAME', dest='hooks',
default=[], help='Disable a pipeline hook for this run'
)
run_options.add_argument(
'--distribute', action='store', metavar='{all|STATE}',
nargs='?', const='idle',
help=('Distribute the selected single-node jobs on every node that '
'is in STATE (default: "idle")')
)
run_options.add_argument(
'--exec-policy', metavar='POLICY', action='store',
choices=['async', 'serial'], default='async',
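The `nargs='?'` and `const='idle'` combination used for `--distribute` gives three distinct outcomes, which the following sketch shows. It uses the standard-library `argparse` as a stand-in, on the assumption that ReFrame's own argument parser wrapper forwards these keyword arguments unchanged.

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument(
    '--distribute', action='store', metavar='{all|STATE}',
    nargs='?', const='idle'
)

# Option absent: the feature stays off
absent = parser.parse_args([]).distribute

# Option given without a value: `const` is used
bare = parser.parse_args(['--distribute']).distribute

# Option given with an explicit value
explicit = parser.parse_args(['--distribute=all']).distribute

print(absent, bare, explicit)
```

Because the absent case yields `None`, a plain truthiness check such as `if options.distribute:` is enough to tell whether the feature was requested.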
@@ -933,6 +940,19 @@ def print_infoline(param, value):
print_infoline('output directory', repr(session_info['prefix_output']))
printer.info('')
try:
# Need to parse the cli options before loading the tests
parsed_job_options = []
for opt in options.job_options:
opt_split = opt.split('=', maxsplit=1)
optstr = opt_split[0]
valstr = opt_split[1] if len(opt_split) > 1 else ''
if opt.startswith('-') or opt.startswith('#'):
parsed_job_options.append(opt)
elif len(optstr) == 1:
parsed_job_options.append(f'-{optstr} {valstr}')
else:
parsed_job_options.append(f'--{optstr} {valstr}')

# Locate and load checks; `force=True` is not needed for normal
# invocations from the command line and has practically no effect, but
# it is needed to better emulate the behavior of running reframe's CLI
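The job option normalisation that this hunk moves ahead of test loading can be exercised on its own. The sketch below wraps the same logic in a hypothetical `normalize_job_options` helper (the PR keeps it inline in `main()`); the sample inputs are invented.

```python
def normalize_job_options(job_options):
    '''Turn job option values into scheduler option strings.'''
    parsed = []
    for opt in job_options:
        opt_split = opt.split('=', maxsplit=1)
        optstr = opt_split[0]
        valstr = opt_split[1] if len(opt_split) > 1 else ''
        if opt.startswith('-') or opt.startswith('#'):
            # Already a full option or a raw directive: keep as is
            parsed.append(opt)
        elif len(optstr) == 1:
            # Single letter: short option
            parsed.append(f'-{optstr} {valstr}')
        else:
            # Anything else: long option
            parsed.append(f'--{optstr} {valstr}')

    return parsed


normalized = normalize_job_options(
    ['mem=120GB', 'p=debug', '--account=proj', '#DW jobdw', 'exclusive']
)
print(normalized)
```

Note that an option without a value, such as `exclusive`, ends up with a trailing space, since the value string is empty.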
@@ -1015,6 +1035,22 @@ def _case_failed(t):
f'{len(testcases)} remaining'
)

if options.distribute:
node_map = getallnodes(options.distribute, parsed_job_options)

# Remove the job options that begin with '--nodelist' and '-w', so
# that they do not override those set from the distribute feature.
#
# NOTE: This is Slurm-specific. When support of distributing tests
# is added to other scheduler backends, this needs to be updated,
# too.
parsed_job_options = [
x for x in parsed_job_options
if (not x.startswith('-w') and not x.startswith('--nodelist'))
]
testcases = distribute_tests(testcases, node_map)
testcases_all = testcases

# Prepare for running
printer.debug('Building and validating the full test DAG')
testgraph, skipped_cases = dependencies.build_deps(testcases_all)
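The node-list filtering in the hunk above can be checked with a few sample options. This is a small sketch with made-up option strings; it uses the tuple form of `str.startswith`, which is equivalent to the two chained checks in the diff.

```python
# Hypothetical, already normalized job options
parsed_job_options = [
    '--nodelist=nid00001', '-w nid00002', '--mem 120GB', '-p debug'
]

# Drop anything that would pin nodes, so that the node list set by the
# distribute feature is the only one passed to the scheduler.
filtered = [x for x in parsed_job_options
            if not x.startswith(('-w', '--nodelist'))]

print(filtered)
```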
@@ -1194,18 +1230,6 @@ def module_unuse(*paths):
sched_flex_alloc_nodes = options.flex_alloc_nodes

exec_policy.sched_flex_alloc_nodes = sched_flex_alloc_nodes
-parsed_job_options = []
-for opt in options.job_options:
-opt_split = opt.split('=', maxsplit=1)
-optstr = opt_split[0]
-valstr = opt_split[1] if len(opt_split) > 1 else ''
-if opt.startswith('-') or opt.startswith('#'):
-parsed_job_options.append(opt)
-elif len(optstr) == 1:
-parsed_job_options.append(f'-{optstr} {valstr}')
-else:
-parsed_job_options.append(f'--{optstr} {valstr}')

exec_policy.sched_options = parsed_job_options
if options.maxfail < 0:
raise errors.ConfigError(
@@ -1236,7 +1260,9 @@ def module_unuse(*paths):
success = True
if runner.stats.failed():
success = False
-runner.stats.print_failure_report(printer)
+runner.stats.print_failure_report(
+printer, not options.distribute
+)
if options.failure_stats:
runner.stats.print_failure_stats(printer)

126 changes: 126 additions & 0 deletions reframe/frontend/distribute.py
@@ -0,0 +1,126 @@
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause


import reframe.core.builtins as builtins
import reframe.core.runtime as runtime
import reframe.utility as util

from reframe.core.decorators import TestRegistry
from reframe.core.logging import getlogger
from reframe.core.meta import make_test
from reframe.core.schedulers import Job
from reframe.frontend.executors import generate_testcases


def getallnodes(state='all', jobs_cli_options=None):
rt = runtime.runtime()
nodes = {}
for part in rt.system.partitions:
# This job will not be submitted, it's used only to filter
# the nodes based on the partition configuration
dummy_job = Job.create(part.scheduler,
part.launcher_type(),
name='placeholder-job',
sched_access=part.access,
sched_options=jobs_cli_options)

available_nodes = part.scheduler.allnodes()
available_nodes = part.scheduler.filternodes(dummy_job,
available_nodes)
getlogger().debug(
f'Total available nodes for {part.name}: {len(available_nodes)}'
)

if state.casefold() != 'all':
available_nodes = {n for n in available_nodes
if n.in_state(state)}
getlogger().debug(
f'[F] Selecting nodes in state {state!r}: '
f'available nodes now: {len(available_nodes)}'
)

nodes[part.fullname] = [n.name for n in available_nodes]

return nodes


def _rfm_pin_run_nodes(obj):
nodelist = getattr(obj, '$nid')
if not obj.local:
obj.job.pin_nodes = nodelist


def _rfm_pin_build_nodes(obj):
pin_nodes = getattr(obj, '$nid')
if not obj.local and not obj.build_locally:
obj.build_job.pin_nodes = pin_nodes


def make_valid_systems_hook(systems):
'''Returns a function to be used as a hook that sets the
valid systems.

Since valid_systems change for each generated test, we need to
generate different post-init hooks for each one of them.
'''
def _rfm_set_valid_systems(obj):
obj.valid_systems = systems

return _rfm_set_valid_systems


def distribute_tests(testcases, node_map):
'''Returns new testcases that are parameterized to run on every node of
their partitions, based on the node map.
'''
tmp_registry = TestRegistry()
new_checks = []
# We don't want to register the same check for every environment
# per partition
check_part_combs = set()
for tc in testcases:
check, partition, _ = tc
candidate_comb = (check.unique_name, partition.fullname)
if check.is_fixture() or candidate_comb in check_part_combs:
continue

check_part_combs.add(candidate_comb)
cls = type(check)
variant_info = cls.get_variant_info(
check.variant_num, recurse=True
)
nc = make_test(
f'{cls.__name__}_{partition.fullname.replace(":", "_")}',
(cls,),
{
'valid_systems': [partition.fullname],
'$nid': builtins.parameter(
[[n] for n in node_map[partition.fullname]],
fmt=util.nodelist_abbrev
)
},
methods=[
builtins.run_before('run')(_rfm_pin_run_nodes),
builtins.run_before('compile')(_rfm_pin_build_nodes),
# We re-set the valid systems in a hook to make sure that they
# will not be overwritten by a parent post-init hook
builtins.run_after('init')(
make_valid_systems_hook([partition.fullname])
),
]
)
# We have to set the prefix manually
nc._rfm_custom_prefix = check.prefix

for i in range(nc.num_variants):
# Check if this variant should be instantiated
vinfo = nc.get_variant_info(i, recurse=True)
vinfo['params'].pop('$nid')
if vinfo == variant_info:
tmp_registry.add(nc, variant_num=i)

new_checks = tmp_registry.instantiate_all()
return generate_testcases(new_checks)
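The variant filtering at the end of `distribute_tests` may be easier to follow in a toy model. The sketch below does not use ReFrame at all: `variant_infos` is a hypothetical helper that enumerates parameter combinations, the variant info is reduced to its `'params'` entry (the real one also carries fixture information), and the variant numbering is only an assumption of this model.

```python
import itertools


def variant_infos(params):
    '''Enumerate all parameter combinations as {'params': {...}} dicts.'''
    names = list(params)
    return [{'params': dict(zip(names, values))}
            for values in itertools.product(*params.values())]


# Original test: one parameter; the user selected the variant p1='a'
orig_params = {'p1': ['a', 'b']}
selected = {'params': {'p1': 'a'}}

# Generated test: same parameters plus one '$nid' value per node
new_params = {**orig_params, '$nid': [['nid0'], ['nid1']]}

kept = []
for i, vinfo in enumerate(variant_infos(new_params)):
    # Ignore '$nid' when comparing with the originally selected variant
    nid = vinfo['params'].pop('$nid')
    if vinfo == selected:
        kept.append((i, nid))

print(kept)
```

Only the variants that match the originally selected one are kept, once per node, so a test that was filtered down to a single variant is not silently expanded back to all of its variants.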