33 changes: 22 additions & 11 deletions docs/manpage.rst
@@ -737,17 +737,15 @@ If no node can be selected, the test will be marked as a failure with an appropriate message.

Available values are the following:

- ``all``: Flexible tests will be assigned as many tasks as needed in order to span over *all* the nodes of the node pool.
- ``STATE``: Flexible tests will be assigned as many tasks as needed in order to span over the nodes that are currently in state ``STATE``.
Querying the node state and submitting the test job are two separate steps that are not executed atomically.
It is therefore possible that the number of tasks assigned does not correspond to the actual number of nodes in the given state.

If this option is not specified, the default allocation policy for flexible tests is 'idle'.
- Any positive integer: Flexible tests will be assigned as many tasks as needed in order to span over the specified number of nodes from the node pool.
- Any of the values supported by the :option:`--distribute` option.
- Any positive integer: Flexible tests will be assigned as many tasks as needed in order to span over the specified number of nodes from the node pool.

.. versionchanged:: 3.1
It is now possible to pass an arbitrary node state as a flexible node allocation parameter.

.. versionchanged:: 4.6
Align the state selection with the :option:`--distribute` option.
See the :option:`--distribute` option for more details.
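
In effect, the number of tasks of a flexible test is the number of selected nodes times the tasks per node.
A minimal sketch of the computation performed internally (names follow this change's ``filter_nodes_by_state`` helper; ``available_nodes``, ``policy`` and ``num_tasks_per_node`` are assumed given):

.. code-block:: python

   # Sketch only: select nodes according to the flexible allocation policy
   selected = filter_nodes_by_state(available_nodes, policy.lower())
   num_tasks = len(selected) * num_tasks_per_node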

---------------------------------------
Options controlling ReFrame environment
@@ -867,10 +865,12 @@ The way the tests are generated and how they interact with the test filtering options

You can optionally specify, through the ``NODESTATE`` argument, the state of the nodes to consider when distributing the tests:

- ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the nodes' state.
- ``idle``: Tests will run on all *idle* nodes of their respective valid partitions.
- ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE`` of their respective valid partitions.
If ``NODESTATE`` is not specified, ``idle`` will be assumed.
- ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the node state.
- ``avail``: Tests will run on all the nodes of their respective valid partitions that are available for running jobs.
Note that if a node is currently allocated to another job, it is still considered "available".
- ``NODESTATE``: Tests will run on all the nodes of their respective valid partitions that are exclusively in state ``NODESTATE``.
If ``NODESTATE`` is not specified, ``idle`` is assumed.
- ``NODESTATE*``: Tests will run on all the nodes of their respective valid partitions that are at least in state ``NODESTATE``.

The state of the nodes will be determined once, before beginning the
execution of the tests, so it might be different at the time the tests are actually submitted.
@@ -881,9 +881,20 @@ The way the tests are generated and how they interact with the test filtering options
.. note::
Distributing tests with dependencies is not supported, but you can distribute tests that use fixtures.

.. note::
This option is supported only for the ``local``, ``squeue``, ``slurm`` and ``ssh`` scheduler backends.

.. versionadded:: 3.11.0

.. versionadded:: 4.6

The ``avail`` argument is introduced, along with the ability to differentiate between exclusive and non-exclusive node states.

.. versionchanged:: 4.6

``--distribute=NODESTATE`` now matches nodes that are exclusively in state ``NODESTATE``, so that the default ``--distribute=idle`` will match only the Slurm nodes that are in the ``IDLE`` state exclusively.
To achieve the previous behaviour, you should use ``--distribute=idle*``.
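
A minimal sketch of the difference, mirroring in simplified form the Slurm backend's ``in_statex``/``in_state`` checks introduced by this change (the state set below is illustrative):

.. code-block:: python

   states = {'IDLE', 'DRAIN'}   # node state as reported by Slurm

   def in_statex(query):        # exclusive match: --distribute=idle
       return states == set(query.upper().split('+'))

   def in_state(query):         # non-exclusive match: --distribute=idle*
       return states >= set(query.upper().split('+'))

   in_statex('idle')   # False: the node is not *only* IDLE
   in_state('idle')    # True:  IDLE is among the node's states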


.. option:: -P, --parameterize=[TEST.]VAR=VAL0,VAL1,...

72 changes: 62 additions & 10 deletions reframe/core/schedulers/__init__.py
@@ -153,6 +153,37 @@ def log(self, message, level=DEBUG2):
        getlogger().log(level, f'[S] {self.registered_name}: {message}')


def filter_nodes_by_state(nodelist, state):
    '''Filter nodes by their state

    :arg nodelist: List of :class:`Node` instances to filter.
    :arg state: The state of the nodes.
        If ``all``, the initial list is returned untouched.
        If ``avail``, only the available nodes will be returned.
        All other values are interpreted as a state string.
        The state match is exclusive unless ``*`` is appended to the
        state string.
    :returns: the filtered node list
    '''
    if state == 'avail':
        nodelist = {n for n in nodelist if n.is_avail()}
    elif state != 'all':
        if state.endswith('*'):
            # non-exclusive state match
            state = state[:-1]
            nodelist = {
                n for n in nodelist if n.in_state(state)
            }
        else:
            nodelist = {
                n for n in nodelist if n.in_statex(state)
            }

    return nodelist
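
# A quick usage sketch (hypothetical node names; ``AlwaysIdleNode`` is
# defined later in this module):
#
#   nodes = {AlwaysIdleNode('nid001'), AlwaysIdleNode('nid002')}
#   filter_nodes_by_state(nodes, 'all')   == nodes
#   filter_nodes_by_state(nodes, 'idle')  == nodes    # exclusive state match
#   filter_nodes_by_state(nodes, 'down*') == set()    # non-exclusive match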



class Job(jsonext.JSONSerializable, metaclass=JobMeta):
    '''A job descriptor.

@@ -568,16 +599,10 @@ def guess_num_tasks(self):
        )

        # Try to guess the number of tasks now
        available_nodes = filter_nodes_by_state(
            available_nodes, self.sched_flex_alloc_nodes.lower()
        )
        available_nodes = self.scheduler.filternodes(self, available_nodes)
        if self.sched_flex_alloc_nodes.casefold() != 'all':
            available_nodes = {n for n in available_nodes
                               if n.in_state(self.sched_flex_alloc_nodes)}
            getlogger().debug(
                f'[F] Selecting nodes in state '
                f'{self.sched_flex_alloc_nodes!r}: '
                f'available nodes now: {len(available_nodes)}'
            )

        return len(available_nodes) * num_tasks_per_node

    def submit(self):
@@ -619,6 +644,15 @@ class Node(abc.ABC):
    :meta private:
    '''

    @abc.abstractmethod
    def in_statex(self, state):
        '''Returns whether the node is in the given state exclusively.

        :arg state: The node state.
        :returns: :class:`True` if the node is exclusively
            in the requested state.
        '''

    @abc.abstractmethod
    def in_state(self, state):
        '''Returns whether the node is in the given state.
@@ -628,14 +662,32 @@ def in_state(self, state):
        :class:`False` otherwise.
        '''

    @abc.abstractmethod
    def is_avail(self):
        '''Check whether the node is available for scheduling jobs.'''

    def is_down(self):
        '''Check whether the node is down.

        This is the inverse of :func:`is_avail`.
        '''
        return not self.is_avail()


class AlwaysIdleNode(Node):
    def __init__(self, name):
        self._name = name
        self._state = 'idle'

    @property
    def name(self):
        return self._name

    def is_avail(self):
        return True

    def in_statex(self, state):
        return state.lower() == self._state

    def in_state(self, state):
        return state.casefold() == 'idle'
        return self.in_statex(state)
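
# Example (illustrative): an ``AlwaysIdleNode`` always reports the 'idle'
# state:
#
#   n = AlwaysIdleNode('localhost')
#   n.is_avail()          # True
#   n.in_statex('IDLE')   # True; the comparison is case-insensitive
#   n.in_state('idle')    # True; delegates to in_statex()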
16 changes: 2 additions & 14 deletions reframe/core/schedulers/local.py
@@ -81,10 +81,10 @@ def emit_preamble(self, job):
        return []

    def allnodes(self):
        return [_LocalNode(socket.gethostname())]
        return [sched.AlwaysIdleNode(socket.gethostname())]

    def filternodes(self, job, nodes):
        return [_LocalNode(socket.gethostname())]
        return [sched.AlwaysIdleNode(socket.gethostname())]

    def _kill_all(self, job):
        '''Send SIGKILL to all the processes of the spawned job.'''
@@ -202,15 +202,3 @@ def _poll_job(self, job):
        elif os.WIFSIGNALED(status):
            job._state = 'FAILURE'
            job._signal = os.WTERMSIG(status)


class _LocalNode(sched.Node):
    def __init__(self, name):
        self._name = name

    @property
    def name(self):
        return self._name

    def in_state(self, state):
        return state.casefold() == 'idle'
9 changes: 8 additions & 1 deletion reframe/core/schedulers/slurm.py
@@ -659,8 +659,15 @@ def in_state(self, state):
        return all([self._states >= set(state.upper().split('+')),
                    self._partitions, self._active_features, self._states])

    def in_statex(self, state):
        return self._states == set(state.upper().split('+'))

    def is_avail(self):
        return any(self.in_statex(s)
                   for s in ('ALLOCATED', 'COMPLETING', 'IDLE'))

    def is_down(self):
        return bool({'DOWN', 'DRAIN', 'MAINT', 'NO_RESPOND'} & self._states)
        return not self.is_avail()
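
    # Illustration (hypothetical state sets):
    #   _states == {'IDLE'}          -> in_statex('idle') is True
    #   _states == {'IDLE', 'DRAIN'} -> in_statex('idle') is False,
    #                                   in_state('idle') is True,
    #                                   is_avail() is False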

    @property
    def active_features(self):
5 changes: 3 additions & 2 deletions reframe/frontend/cli.py
@@ -513,7 +513,7 @@ def main():

    # Test generation options
    testgen_options.add_argument(
        '--distribute', action='store', metavar='{all|STATE}',
        '--distribute', action='store', metavar='{all|avail|STATE}',
        nargs='?', const='idle',
        help=('Distribute the selected single-node jobs on every node that'
              ' is in STATE (default: "idle")')
@@ -1150,7 +1150,8 @@ def _case_failed(t):
        testcases = testcases_all

    if options.distribute:
        node_map = getallnodes(options.distribute, parsed_job_options)
        node_map = getallnodes(options.distribute.lower(),
                               parsed_job_options)

        # Remove the job options that begin with '--nodelist' and '-w', so
        # that they do not override those set from the distribute feature.
13 changes: 3 additions & 10 deletions reframe/frontend/testgenerators.py
@@ -12,12 +12,12 @@
from reframe.core.fields import make_convertible
from reframe.core.logging import getlogger, time_function
from reframe.core.meta import make_test
from reframe.core.schedulers import Job
from reframe.core.schedulers import Job, filter_nodes_by_state
from reframe.frontend.executors import generate_testcases


@time_function
def getallnodes(state='all', jobs_cli_options=None):
def getallnodes(state, jobs_cli_options=None):
    rt = runtime.runtime()
    nodes = {}
    for part in rt.system.partitions:
@@ -36,14 +36,7 @@ def getallnodes(state='all', jobs_cli_options=None):
            f'Total available nodes for {part.name}: {len(available_nodes)}'
        )

        if state.casefold() != 'all':
            available_nodes = {n for n in available_nodes
                               if n.in_state(state)}
            getlogger().debug(
                f'[F] Selecting nodes in state {state!r}: '
                f'available nodes now: {len(available_nodes)}'
            )

        available_nodes = filter_nodes_by_state(available_nodes, state)
        nodes[part.fullname] = [n.name for n in available_nodes]
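        # e.g. nodes == {'sys:part': ['nid001', 'nid002'], ...}  (illustrative)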

    return nodes