Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,34 @@ def pipeline_hooks(cls):

#: Number of tasks required by this test.
#:
#: If the number of tasks is set to a number ``<=0``, ReFrame will try to
#: flexibly allocate the number of tasks, based on the command line option
#: |--flex-alloc-nodes|_. A negative number is used to indicate the minimum
#: number of tasks required for the test. In this case the minimum number
#: of tasks is the absolute value of the number, while Setting
#: :attr:`num_tasks` to ``0`` is equivalent to setting it to
#: If the number of tasks is set to zero or a negative value, ReFrame will
#: try to flexibly allocate the number of tasks based on the command line
#: option :option:`--flex-alloc-nodes`. A negative number is used to
#: indicate the minimum number of tasks required for the test. In this case
#: the minimum number of tasks is the absolute value of the number, while
#: Setting :attr:`num_tasks` to zero is equivalent to setting it to
#: :attr:`-num_tasks_per_node
#: <reframe.core.pipeline.RegressionTest.num_tasks_per_node>`.
#:
#: :type: integral
#: Setting :attr:`num_tasks` to :obj:`None` has a scheduler-specific
#: interpretation, but in principle, passes the responsibility of producing
#: a correct job script to the user by setting the appropriate scheduler
#: options. More specifically, the different backends interpret the
#: :obj:`None` :attr:`num_tasks` as follows:
#:
#: - ``flux``: not applicable.
#: - ``local``: not applicable.
#: - ``lsf``: Neither the ``-nnodes`` nor the ``-n`` will be emitted.
#: - ``oar``: Resets it to ``1``.
#: - ``pbs``: Resets it to ``1``.
#: - ``sge``: not applicable.
#: - ``slurm``: Neither the ``--ntasks`` nor the ``--nodes`` option (if the
#: :attr:`~config.systems.partitions.sched_options.use_nodes_option` is
#: specified) will be emitted.
#: - ``squeue``: See ``slurm`` backend.
#: - ``torque``: See ``pbs`` backend.
#:
#: :type: integral or :obj:`None`
#: :default: ``1``
#:
#: .. note::
Expand All @@ -542,10 +560,9 @@ def pipeline_hooks(cls):
#: .. versionchanged:: 2.16
#: Negative :attr:`num_tasks` is allowed for specifying the minimum
#: number of required tasks by the test.
#:
#: .. |--flex-alloc-nodes| replace:: :attr:`--flex-alloc-nodes`
#: .. _--flex-alloc-nodes: manpage.html#cmdoption-flex-alloc-nodes
num_tasks = variable(int, value=1, loggable=True)
#: .. versionchanged:: 4.1
#: Allow :attr:`num_tasks` to be :obj:`None`.
num_tasks = variable(int, type(None), value=1, loggable=True)

#: Number of tasks per node required by this test.
#:
Expand Down
9 changes: 6 additions & 3 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks = variable(int, value=1)
#:
#: .. versionchanged:: 4.1
#: Allow :obj:`None` values
num_tasks = variable(int, type(None), value=1)

#: Number of tasks per node for this job.
#:
Expand Down Expand Up @@ -487,7 +490,7 @@ def nodelist(self):
This attribute might be useful in a flexible regression test for
determining the actual nodes that were assigned to the test.
For more information on flexible node allocation, see the
|--flex-alloc-nodes|_ command-line option
:option:`--flex-alloc-nodes` command-line option.

This attribute is *not* supported by the ``pbs`` scheduler backend.

Expand All @@ -514,7 +517,7 @@ def submit_time(self):

def prepare(self, commands, environs=None, prepare_cmds=None, **gen_opts):
environs = environs or []
if self.num_tasks <= 0:
if self.num_tasks is not None and self.num_tasks <= 0:
getlogger().debug(f'[F] Flexible node allocation requested')
num_tasks_per_node = self.num_tasks_per_node or 1
min_num_tasks = (-self.num_tasks if self.num_tasks else
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def emit_preamble(self, job):
self._format_option(job.stderr, '-e {0}'),
]

if job.num_tasks_per_node is not None:
if job.num_tasks is not None and job.num_tasks_per_node is not None:
num_nodes = job.num_tasks // job.num_tasks_per_node
preamble.append(self._format_option(num_nodes, '-nnodes {0}'))
else:
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/schedulers/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def emit_preamble(self, job):
tasks_opt += ',walltime=%d:%d:%d' % (h, m, s)

# Get number of nodes in the reservation
num_tasks = job.num_tasks or 1
num_tasks_per_node = job.num_tasks_per_node or 1
num_nodes = job.num_tasks // num_tasks_per_node
num_nodes = num_tasks // num_tasks_per_node

# Emit main resource reservation option
options = [tasks_opt.format(
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ def __init__(self):
self._submit_timeout = self.get_option('job_submit_timeout')

def _emit_lselect_option(self, job):
num_tasks = job.num_tasks or 1
num_tasks_per_node = job.num_tasks_per_node or 1
num_cpus_per_task = job.num_cpus_per_task or 1
num_nodes = job.num_tasks // num_tasks_per_node
num_nodes = num_tasks // num_tasks_per_node
num_cpus_per_node = num_tasks_per_node * num_cpus_per_task
select_opt = self.TASKS_OPT.format(
num_nodes=num_nodes,
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def emit_preamble(self, job):
self._format_option(job.exclusive_access, '--exclusive')
)

if self._use_nodes_opt:
if self._use_nodes_opt and job.num_tasks is not None:
num_nodes = job.num_tasks // job.num_tasks_per_node
preamble.append(self._format_option(num_nodes, '--nodes={0}'))

Expand Down
72 changes: 65 additions & 7 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def minimal_job(make_job):
return make_job()


@pytest.fixture
def no_tasks_job(make_job):
ret = make_job()
ret.num_tasks = None
return ret


@pytest.fixture
def fake_job(make_job):
ret = make_job(sched_options=['--account=spam'])
Expand Down Expand Up @@ -157,6 +164,23 @@ def _expected_lsf_directives(job):
])


def _expected_lsf_directives_minimal(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}',
f'#BSUB -n {job.num_tasks}'
])


def _expected_lsf_directives_no_tasks(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}'
])


def _expected_flux_directives(job):
return set()

Expand All @@ -165,13 +189,8 @@ def _expected_flux_directives_minimal(job):
return set()


def _expected_lsf_directives_minimal(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}',
f'#BSUB -n {job.num_tasks}'
])
def _expected_flux_directives_no_tasks(job):
return set()


def _expected_sge_directives(job):
Expand Down Expand Up @@ -199,6 +218,9 @@ def _expected_sge_directives_minimal(job):
])


_expected_sge_directives_no_tasks = _expected_sge_directives_minimal


def _expected_slurm_directives(job):
return set([
'#SBATCH --job-name="testjob"',
Expand Down Expand Up @@ -229,8 +251,17 @@ def _expected_slurm_directives_minimal(job):
])


def _expected_slurm_directives_no_tasks(job):
return set([
'#SBATCH --job-name="testjob"',
'#SBATCH --output=%s' % job.stdout,
'#SBATCH --error=%s' % job.stderr,
])


_expected_squeue_directives = _expected_slurm_directives
_expected_squeue_directives_minimal = _expected_slurm_directives_minimal
_expected_squeue_directives_no_tasks = _expected_slurm_directives_no_tasks


def _expected_pbs_directives(job):
Expand Down Expand Up @@ -258,6 +289,9 @@ def _expected_pbs_directives_minimal(job):
])


_expected_pbs_directives_no_tasks = _expected_pbs_directives_minimal


def _expected_torque_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
Expand All @@ -284,6 +318,9 @@ def _expected_torque_directives_minimal(job):
])


_expected_torque_directives_no_tasks = _expected_torque_directives_minimal


def _expected_oar_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_tasks_per_node = job.num_tasks_per_node
Expand All @@ -308,6 +345,9 @@ def _expected_oar_directives_minimal(job):
])


_expected_oar_directives_no_tasks = _expected_oar_directives_minimal


def _expected_local_directives(job):
return set()

Expand All @@ -316,6 +356,10 @@ def _expected_local_directives_minimal(job):
return set()


def _expected_local_directives_no_tasks(job):
return set()


def test_prepare(fake_job):
sched_name = fake_job.scheduler.registered_name
if sched_name == 'pbs':
Expand Down Expand Up @@ -347,6 +391,20 @@ def test_prepare_minimal(minimal_job):
assert expected_directives(minimal_job) == found_directives


def test_prepare_no_tasks(no_tasks_job):
prepare_job(no_tasks_job)
with open(no_tasks_job.script_filename) as fp:
found_directives = set(re.findall(r'^\#\S+ .*', fp.read(),
re.MULTILINE))

sched_name = no_tasks_job.scheduler.registered_name
expected_directives = globals()[
f'_expected_{sched_name}_directives_no_tasks'
]
assert_job_script_sanity(no_tasks_job)
assert expected_directives(no_tasks_job) == found_directives


def test_prepare_no_exclusive(make_job, slurm_only):
job = make_job()
job.exclusive_access = False
Expand Down