Commit

Merge branch 'master' into develop
vkarak committed Mar 17, 2023
2 parents e527403 + 684a39d commit 3cc0275
Showing 15 changed files with 166 additions and 117 deletions.
10 changes: 8 additions & 2 deletions docs/config_reference.rst
@@ -207,7 +207,10 @@ System Configuration
To understand the difference between the different execution contexts, please refer to ":ref:`execution-contexts`"
For the available scheduler options, see the :attr:`~config.systems.partitions.sched_options` in the partition configuration below.

.. versionadded:: 4.0.0
.. versionadded:: 4.1

.. warning::
This option is broken in 4.0.


------------------------------
@@ -307,8 +310,10 @@ System Partition Configuration
Scheduler-specific options for this partition.
See below for the available options.

.. versionadded:: 4.0.0
.. versionadded:: 4.1

.. warning::
This option is broken in 4.0.

.. py:attribute:: systems.partitions.sched_options.ignore_reqnodenotavail
@@ -1511,6 +1516,7 @@ General Configuration

ReFrame's asynchronous execution policy will try to advance as many tests as possible in their pipeline, but some tests may take too long to proceed (e.g., due to copying of large files) blocking the advancement of previously started tests.
If this timeout value is exceeded and at least one test has progressed, ReFrame will stop processing new tests and it will try to further advance tests that have already started.
See :ref:`pipeline-timeout` for more guidance on how to set this.

:required: No
:default: ``10``
17 changes: 17 additions & 0 deletions docs/manpage.rst
@@ -1413,6 +1413,23 @@ Whenever an environment variable is associated with a configuration option, its
================================== ==================


.. envvar:: RFM_PIPELINE_TIMEOUT

   Timeout in seconds for advancing the pipeline in the asynchronous execution policy.
   See :ref:`pipeline-timeout` for more guidance on how to set this.


   .. table::
      :align: left

      ================================== ==================
      Associated command line option     N/A
      Associated configuration parameter :attr:`~config.general.pipeline_timeout`
      ================================== ==================

   .. versionadded:: 3.10.0


.. envvar:: RFM_PREFIX

General directory prefix for ReFrame-generated directories.
17 changes: 17 additions & 0 deletions docs/pipeline.rst
@@ -197,6 +197,23 @@ To control the concurrency of the ReFrame execution context, users should set th
Execution contexts were formalized.


.. _pipeline-timeout:

-------------------------------------------------------------------------------------------
Tweaking the throughput and interactivity of test jobs in the asynchronous execution policy
-------------------------------------------------------------------------------------------

ReFrame's asynchronous execution policy will iteratively cycle through all the in-flight tests and will try to advance the state (see state diagram above) of as many as possible within a given time slot.
The duration of this time slot is controlled by the :attr:`~config.general.pipeline_timeout` configuration option or the :envvar:`RFM_PIPELINE_TIMEOUT` environment variable.
If this timeout expires and at least one test has progressed, ReFrame will stop processing new tests in this time slot.
In the next time slot, it will try to further advance tests that have already started and if there is enough time left, it will also start new tests.
Essentially, a small timeout value gives preference to tests that have already started, thus pushing them quicker down their pipeline, whereas higher values give preference to overall test throughput, as more tests will be running concurrently.
The default timeout is 10 seconds in order to balance interactivity and overall throughput.

There are cases when some tests take too long to proceed (e.g., due to copying of large files) and as a result they are blocking more tests from starting their pipeline.
In these cases, a higher timeout value will help to increase the test concurrency and therefore the overall throughput.
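
As a rough illustration of the option described above, the following sketch shows a fragment of a ReFrame settings file that raises the time slot to 30 seconds. The value `30` is an arbitrary example rather than a recommendation, and the rest of the configuration (systems, environments, and so on) is assumed to be defined elsewhere.

```python
# Fragment of a ReFrame settings file (illustrative value only).
site_configuration = {
    'general': [
        {
            # Seconds per time slot for advancing in-flight tests in the
            # asynchronous execution policy; the documented default is 10.
            'pipeline_timeout': 30,
        }
    ],
}
```

The same value can be supplied for a single run through the environment instead, e.g. by exporting `RFM_PIPELINE_TIMEOUT=30` before invoking ReFrame.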


Timing the Test Pipeline
------------------------

2 changes: 1 addition & 1 deletion hpctestlib/microbenchmarks/gpu/gpu_burn.py
@@ -163,7 +163,7 @@ class gpu_burn_check(rfm.RunOnlyRegressionTest):
valid_systems = ['+gpu']
valid_prog_environs = ['+cuda', '+hip']

@run_after('init')
@run_before('run')
def set_exec_opts(self):
if self.use_dp:
self.executable_opts += ['-d']
2 changes: 1 addition & 1 deletion reframe/__init__.py
@@ -6,7 +6,7 @@
import os
import sys

VERSION = '4.2.0-dev.0'
VERSION = '4.2.0-dev.1'
INSTALL_PREFIX = os.path.normpath(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
)
41 changes: 6 additions & 35 deletions reframe/core/fields.py
@@ -7,9 +7,6 @@
# Useful descriptors for advanced operations on fields
#

import datetime
import re

import reframe.utility.typecheck as types
from reframe.core.warnings import user_deprecation_warning
from reframe.utility import ScopedDict
@@ -65,9 +62,11 @@ def __set__(self, obj, value):
class TypedField(Field):
'''Stores a field of predefined type'''

def __init__(self, main_type, *other_types, attr_name=None):
def __init__(self, main_type, *other_types,
attr_name=None, allow_implicit=False):
super().__init__(attr_name)
self._types = (main_type,) + other_types
self._allow_implicit = allow_implicit
if not all(isinstance(t, type) for t in self._types):
raise TypeError('{0} is not a sequence of types'.
format(self._types))
@@ -88,8 +87,9 @@ def __set__(self, obj, value):
self._check_type(value)
except TypeError:
raw_value = remove_convertible(value)
if raw_value is value:
# value was not convertible; reraise
if raw_value is value and not self._allow_implicit:
# value was not convertible and the field does not allow
# implicit conversions; re-raise
raise

# Try to convert value to any of the supported types
@@ -137,35 +137,6 @@ def __set__(self, obj, value):
raise ValueError('attempt to set a read-only variable')


class TimerField(TypedField):
'''Stores a timer in the form of a :class:`datetime.timedelta` object'''

def __init__(self, *other_types, attr_name=None):
super().__init__(str, int, float, *other_types, attr_name=attr_name)

def __set__(self, obj, value):
value = remove_convertible(value)
self._check_type(value)
if isinstance(value, str):
time_match = re.match(r'^((?P<days>\d+)d)?'
r'((?P<hours>\d+)h)?'
r'((?P<minutes>\d+)m)?'
r'((?P<seconds>\d+)s)?$',
value)
if not time_match:
raise ValueError('invalid format for timer field')

value = datetime.timedelta(
**{k: int(v) for k, v in time_match.groupdict().items() if v}
).total_seconds()
elif isinstance(value, float) or isinstance(value, int):
if value < 0:
raise ValueError('timer field value cannot be negative')

# Call Field's __set__() method, type checking is already performed
Field.__set__(self, obj, value)


class ScopedDictField(TypedField):
'''Stores a ScopedDict with a specific type.
8 changes: 6 additions & 2 deletions reframe/core/meta.py
@@ -830,7 +830,10 @@ def make_test(name, bases, body, methods=None, module=None, **kwargs):
.. code-block:: python
hello_cls = rfm.make_test(
from reframe.core.meta import make_test
hello_cls = make_test(
'HelloTest', (rfm.RunOnlyRegressionTest,),
{
'valid_systems': ['*'],
@@ -860,6 +863,7 @@ class HelloTest(rfm.RunOnlyRegressionTest):
.. code-block:: python
import reframe.core.builtins as builtins
from reframe.core.meta import make_test
def set_message(obj):
@@ -868,7 +872,7 @@ def set_message(obj):
def validate(obj):
return sn.assert_found(obj.message, obj.stdout)
hello_cls = rfm.make_test(
hello_cls = make_test(
'HelloTest', (rfm.RunOnlyRegressionTest,),
{
'valid_systems': ['*'],
13 changes: 6 additions & 7 deletions reframe/core/pipeline.py
@@ -625,9 +625,8 @@ def pipeline_hooks(cls):
#:
#: :type: :class:`str` or :class:`datetime.timedelta`
#: :default: :class:`None`
max_pending_time = variable(
type(None), field=fields.TimerField, value=None, loggable=True
)
max_pending_time = variable(type(None), typ.Duration, value=None,
loggable=True, allow_implicit=True)

#: Specify whether this test needs exclusive access to nodes.
#:
@@ -860,8 +859,8 @@ def pipeline_hooks(cls):
#: .. versionchanged:: 3.5.1
#: The default value is now :class:`None` and it can be set globally
#: per partition via the configuration.
time_limit = variable(type(None), field=fields.TimerField,
value=None, loggable=True)
time_limit = variable(type(None), typ.Duration, value=None,
loggable=True, allow_implicit=True)

#: .. versionadded:: 3.5.1
#:
@@ -871,8 +870,8 @@ def pipeline_hooks(cls):
#:
#: :type: :class:`str` or :class:`float` or :class:`int`
#: :default: :class:`None`
build_time_limit = variable(type(None), field=fields.TimerField,
value=None, loggable=True)
build_time_limit = variable(type(None), typ.Duration, value=None,
loggable=True, allow_implicit=True)

#: .. versionadded:: 2.8
#:
8 changes: 4 additions & 4 deletions reframe/core/schedulers/__init__.py
@@ -10,7 +10,6 @@
import abc
import time

import reframe.core.fields as fields
import reframe.core.runtime as runtime
import reframe.core.shell as shell
import reframe.utility.jsonext as jsonext
@@ -261,7 +260,8 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
time_limit = variable(type(None), field=fields.TimerField, value=None)
time_limit = variable(type(None), typ.Duration,
value=None, allow_implicit=True)

#: Maximum pending time for this job.
#:
@@ -273,8 +273,8 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
max_pending_time = variable(type(None),
field=fields.TimerField, value=None)
max_pending_time = variable(type(None), typ.Duration,
value=None, allow_implicit=True)

#: Arbitrary options to be passed to the backend job scheduler.
#:
3 changes: 2 additions & 1 deletion reframe/frontend/cli.py
@@ -924,6 +924,7 @@ def print_infoline(param, value):
'config_files': rt.site_config.sources,
'data_version': runreport.DATA_VERSION,
'hostname': socket.gethostname(),
'log_files': logging.log_files(),
'prefix_output': rt.output_prefix,
'prefix_stage': rt.stage_prefix,
'user': osext.osuser(),
@@ -950,7 +951,7 @@ def print_infoline(param, value):
print_infoline('stage directory', repr(session_info['prefix_stage']))
print_infoline('output directory', repr(session_info['prefix_output']))
print_infoline('log files',
', '.join(repr(s) for s in logging.log_files()))
', '.join(repr(s) for s in session_info['log_files']))
printer.info('')
try:
logging.getprofiler().enter_region('test processing')
2 changes: 1 addition & 1 deletion reframe/frontend/runreport.py
@@ -18,7 +18,7 @@
# The schema data version
# Major version bumps are expected to break the validation of previous schemas

DATA_VERSION = '3.0'
DATA_VERSION = '3.1'
_SCHEMA = os.path.join(rfm.INSTALL_PREFIX, 'reframe/schemas/runreport.json')


4 changes: 4 additions & 0 deletions reframe/schemas/runreport.json
@@ -116,6 +116,10 @@
},
"data_version": {"type": "string"},
"hostname": {"type": "string"},
"log_files": {
"type": "array",
"items": {"type": "string"}
},
"num_cases": {"type": "number"},
"num_failures": {"type": "number"},
"num_aborted": {"type": "number"},
64 changes: 49 additions & 15 deletions reframe/utility/typecheck.py
@@ -94,6 +94,7 @@
'''

import abc
import datetime
import re


@@ -152,19 +153,6 @@ def __call__(cls, *args, **kwargs):
# container types


class _CompositeType(abc.ABCMeta):
def __instancecheck__(cls, inst):
assert hasattr(cls, '_types') and len(cls._types) == 2
return (issubclass(type(inst), cls._types[0]) or
issubclass(type(inst), cls._types[1]))


class _InvertedType(abc.ABCMeta):
def __instancecheck__(cls, inst):
assert hasattr(cls, '_xtype')
return not issubclass(type(inst), cls._xtype)


class _BuiltinType(ConvertibleType):
def __init__(cls, name, bases, namespace):
# Make sure that the class defines `_type`
@@ -175,8 +163,7 @@ def __init__(cls, name, bases, namespace):

def __instancecheck__(cls, inst):
if hasattr(cls, '_types'):
return (issubclass(type(inst), cls._types[0]) or
issubclass(type(inst), cls._types[1]))
return any(issubclass(type(inst), t) for t in cls._types)

if hasattr(cls, '_xtype'):
return not issubclass(type(inst), cls._xtype)
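
The generalized `any(...)` check in this hunk can be tried in isolation. The sketch below is a stand-alone illustration of a metaclass `__instancecheck__` that accepts any number of types; `AnyOf` and `IntOrStr` are hypothetical names introduced here and are not part of ReFrame.

```python
class AnyOf(type):
    '''Hypothetical metaclass: an object is an "instance" of the class
    if its type is a subclass of any entry in the class's `_types`.'''

    def __instancecheck__(cls, inst):
        # Works for a tuple of any length, not just two types
        return any(issubclass(type(inst), t) for t in cls._types)


class IntOrStr(metaclass=AnyOf):
    _types = (int, str)


print(isinstance(3, IntOrStr))      # True
print(isinstance('a', IntOrStr))    # True
print(isinstance(3.0, IntOrStr))    # False
```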
@@ -424,3 +411,50 @@ def make_meta_type(name, cls, metacls=_BuiltinType):
Set = make_meta_type('Set', set, _SequenceType)
Str = make_meta_type('Str', str, _StrType)
Tuple = make_meta_type('Tuple', tuple, _TupleType)


class Duration(metaclass=ConvertibleType):
    '''A float type that represents duration in seconds.

    This type supports the following implicit conversions:

    - From integer values
    - From string values in the form of ``<days>d<hours>h<minutes>m<seconds>s``

    .. versionadded:: 4.2
    '''

    def _assert_non_negative(val):
        if val < 0:
            raise ValueError('duration cannot be negative')

        return val

    @classmethod
    def __rfm_cast_int__(cls, val):
        return Duration._assert_non_negative(float(val))

    @classmethod
    def __rfm_cast_float__(cls, val):
        return Duration._assert_non_negative(val)

    @classmethod
    def __rfm_cast_str__(cls, val):
        # First try to convert to a float
        try:
            val = float(val)
        except ValueError:
            time_match = re.match(r'^((?P<days>\d+)d)?'
                                  r'((?P<hours>\d+)h)?'
                                  r'((?P<minutes>\d+)m)?'
                                  r'((?P<seconds>\d+)s)?$',
                                  val)
            if not time_match:
                raise ValueError(f'invalid duration: {val}') from None

            val = datetime.timedelta(
                **{k: int(v) for k, v in time_match.groupdict().items() if v}
            ).total_seconds()

        return Duration._assert_non_negative(val)
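
To make the string conversion above easier to follow, here is a minimal stand-alone sketch of the same parsing logic outside ReFrame. The function name `parse_duration` is hypothetical and exists only for this illustration; the regular expression and the `datetime.timedelta` conversion mirror the code in the hunk.

```python
import datetime
import re

_DURATION_RE = re.compile(r'^((?P<days>\d+)d)?'
                          r'((?P<hours>\d+)h)?'
                          r'((?P<minutes>\d+)m)?'
                          r'((?P<seconds>\d+)s)?$')


def parse_duration(val):
    '''Hypothetical helper: convert a duration string to seconds.'''
    try:
        # Plain numeric strings such as '90' are taken as seconds
        secs = float(val)
    except ValueError:
        match = _DURATION_RE.match(val)
        if not match:
            raise ValueError(f'invalid duration: {val}') from None

        # Keep only the components that actually appeared in the string
        parts = {k: int(v) for k, v in match.groupdict().items() if v}
        secs = datetime.timedelta(**parts).total_seconds()

    if secs < 0:
        raise ValueError('duration cannot be negative')

    return secs


print(parse_duration('1d2h3m4s'))   # 93784.0
print(parse_duration('90'))         # 90.0
```

Trying the float conversion first means that a numeric string is interpreted as a number of seconds, while anything else must match the `<days>d<hours>h<minutes>m<seconds>s` pattern.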
