Skip to content

Commit

Permalink
Merge pull request #2778 from vkarak/feat/no-num-tasks
Browse files Browse the repository at this point in the history
[feat] Allow `num_tasks` to be `None`
  • Loading branch information
vkarak committed Feb 14, 2023
2 parents 6c3d163 + 7e1e808 commit b6e14c6
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 25 deletions.
39 changes: 28 additions & 11 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,34 @@ def pipeline_hooks(cls):

#: Number of tasks required by this test.
#:
#: If the number of tasks is set to a number ``<=0``, ReFrame will try to
#: flexibly allocate the number of tasks, based on the command line option
#: |--flex-alloc-nodes|_. A negative number is used to indicate the minimum
#: number of tasks required for the test. In this case the minimum number
#: of tasks is the absolute value of the number, while Setting
#: :attr:`num_tasks` to ``0`` is equivalent to setting it to
#: If the number of tasks is set to zero or a negative value, ReFrame will
#: try to flexibly allocate the number of tasks based on the command line
#: option :option:`--flex-alloc-nodes`. A negative number is used to
#: indicate the minimum number of tasks required for the test. In this case
#: the minimum number of tasks is the absolute value of the number, while
#: Setting :attr:`num_tasks` to zero is equivalent to setting it to
#: :attr:`-num_tasks_per_node
#: <reframe.core.pipeline.RegressionTest.num_tasks_per_node>`.
#:
#: :type: integral
#: Setting :attr:`num_tasks` to :obj:`None` has a scheduler-specific
#: interpretation, but in principle, passes the responsibility of producing
#: a correct job script to the user by setting the appropriate scheduler
#: options. More specifically, the different backends interpret the
#: :obj:`None` :attr:`num_tasks` as follows:
#:
#: - ``flux``: not applicable.
#: - ``local``: not applicable.
#: - ``lsf``: Neither the ``-nnodes`` nor the ``-n`` will be emitted.
#: - ``oar``: Resets it to ``1``.
#: - ``pbs``: Resets it to ``1``.
#: - ``sge``: not applicable.
#: - ``slurm``: Neither the ``--ntasks`` nor the ``--nodes`` option (if the
#: :attr:`~config.systems.partitions.sched_options.use_nodes_option` is
#: specified) will be emitted.
#: - ``squeue``: See ``slurm`` backend.
#: - ``torque``: See ``pbs`` backend.
#:
#: :type: integral or :obj:`None`
#: :default: ``1``
#:
#: .. note::
Expand All @@ -542,10 +560,9 @@ def pipeline_hooks(cls):
#: .. versionchanged:: 2.16
#: Negative :attr:`num_tasks` is allowed for specifying the minimum
#: number of required tasks by the test.
#:
#: .. |--flex-alloc-nodes| replace:: :attr:`--flex-alloc-nodes`
#: .. _--flex-alloc-nodes: manpage.html#cmdoption-flex-alloc-nodes
num_tasks = variable(int, value=1, loggable=True)
#: .. versionchanged:: 4.1
#: Allow :attr:`num_tasks` to be :obj:`None`.
num_tasks = variable(int, type(None), value=1, loggable=True)

#: Number of tasks per node required by this test.
#:
Expand Down
9 changes: 6 additions & 3 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks = variable(int, value=1)
#:
#: .. versionchanged:: 4.1
#: Allow :obj:`None` values
num_tasks = variable(int, type(None), value=1)

#: Number of tasks per node for this job.
#:
Expand Down Expand Up @@ -487,7 +490,7 @@ def nodelist(self):
This attribute might be useful in a flexible regression test for
determining the actual nodes that were assigned to the test.
For more information on flexible node allocation, see the
|--flex-alloc-nodes|_ command-line option
:option:`--flex-alloc-nodes` command-line option.
This attribute is *not* supported by the ``pbs`` scheduler backend.
Expand All @@ -514,7 +517,7 @@ def submit_time(self):

def prepare(self, commands, environs=None, prepare_cmds=None, **gen_opts):
environs = environs or []
if self.num_tasks <= 0:
if self.num_tasks is not None and self.num_tasks <= 0:
getlogger().debug(f'[F] Flexible node allocation requested')
num_tasks_per_node = self.num_tasks_per_node or 1
min_num_tasks = (-self.num_tasks if self.num_tasks else
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def emit_preamble(self, job):
self._format_option(job.stderr, '-e {0}'),
]

if job.num_tasks_per_node is not None:
if job.num_tasks is not None and job.num_tasks_per_node is not None:
num_nodes = job.num_tasks // job.num_tasks_per_node
preamble.append(self._format_option(num_nodes, '-nnodes {0}'))
else:
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/schedulers/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def emit_preamble(self, job):
tasks_opt += ',walltime=%d:%d:%d' % (h, m, s)

# Get number of nodes in the reservation
num_tasks = job.num_tasks or 1
num_tasks_per_node = job.num_tasks_per_node or 1
num_nodes = job.num_tasks // num_tasks_per_node
num_nodes = num_tasks // num_tasks_per_node

# Emit main resource reservation option
options = [tasks_opt.format(
Expand Down
3 changes: 2 additions & 1 deletion reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ def __init__(self):
self._submit_timeout = self.get_option('job_submit_timeout')

def _emit_lselect_option(self, job):
num_tasks = job.num_tasks or 1
num_tasks_per_node = job.num_tasks_per_node or 1
num_cpus_per_task = job.num_cpus_per_task or 1
num_nodes = job.num_tasks // num_tasks_per_node
num_nodes = num_tasks // num_tasks_per_node
num_cpus_per_node = num_tasks_per_node * num_cpus_per_task
select_opt = self.TASKS_OPT.format(
num_nodes=num_nodes,
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def emit_preamble(self, job):
self._format_option(job.exclusive_access, '--exclusive')
)

if self._use_nodes_opt:
if self._use_nodes_opt and job.num_tasks is not None:
num_nodes = job.num_tasks // job.num_tasks_per_node
preamble.append(self._format_option(num_nodes, '--nodes={0}'))

Expand Down
72 changes: 65 additions & 7 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def minimal_job(make_job):
return make_job()


@pytest.fixture
def no_tasks_job(make_job):
ret = make_job()
ret.num_tasks = None
return ret


@pytest.fixture
def fake_job(make_job):
ret = make_job(sched_options=['--account=spam'])
Expand Down Expand Up @@ -157,6 +164,23 @@ def _expected_lsf_directives(job):
])


def _expected_lsf_directives_minimal(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}',
f'#BSUB -n {job.num_tasks}'
])


def _expected_lsf_directives_no_tasks(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}'
])


def _expected_flux_directives(job):
return set()

Expand All @@ -165,13 +189,8 @@ def _expected_flux_directives_minimal(job):
return set()


def _expected_lsf_directives_minimal(job):
return set([
f'#BSUB -J testjob',
f'#BSUB -o {job.stdout}',
f'#BSUB -e {job.stderr}',
f'#BSUB -n {job.num_tasks}'
])
def _expected_flux_directives_no_tasks(job):
return set()


def _expected_sge_directives(job):
Expand Down Expand Up @@ -199,6 +218,9 @@ def _expected_sge_directives_minimal(job):
])


_expected_sge_directives_no_tasks = _expected_sge_directives_minimal


def _expected_slurm_directives(job):
return set([
'#SBATCH --job-name="testjob"',
Expand Down Expand Up @@ -229,8 +251,17 @@ def _expected_slurm_directives_minimal(job):
])


def _expected_slurm_directives_no_tasks(job):
return set([
'#SBATCH --job-name="testjob"',
'#SBATCH --output=%s' % job.stdout,
'#SBATCH --error=%s' % job.stderr,
])


_expected_squeue_directives = _expected_slurm_directives
_expected_squeue_directives_minimal = _expected_slurm_directives_minimal
_expected_squeue_directives_no_tasks = _expected_slurm_directives_no_tasks


def _expected_pbs_directives(job):
Expand Down Expand Up @@ -258,6 +289,9 @@ def _expected_pbs_directives_minimal(job):
])


_expected_pbs_directives_no_tasks = _expected_pbs_directives_minimal


def _expected_torque_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
Expand All @@ -284,6 +318,9 @@ def _expected_torque_directives_minimal(job):
])


_expected_torque_directives_no_tasks = _expected_torque_directives_minimal


def _expected_oar_directives(job):
num_nodes = job.num_tasks // job.num_tasks_per_node
num_tasks_per_node = job.num_tasks_per_node
Expand All @@ -308,6 +345,9 @@ def _expected_oar_directives_minimal(job):
])


_expected_oar_directives_no_tasks = _expected_oar_directives_minimal


def _expected_local_directives(job):
return set()

Expand All @@ -316,6 +356,10 @@ def _expected_local_directives_minimal(job):
return set()


def _expected_local_directives_no_tasks(job):
return set()


def test_prepare(fake_job):
sched_name = fake_job.scheduler.registered_name
if sched_name == 'pbs':
Expand Down Expand Up @@ -347,6 +391,20 @@ def test_prepare_minimal(minimal_job):
assert expected_directives(minimal_job) == found_directives


def test_prepare_no_tasks(no_tasks_job):
prepare_job(no_tasks_job)
with open(no_tasks_job.script_filename) as fp:
found_directives = set(re.findall(r'^\#\S+ .*', fp.read(),
re.MULTILINE))

sched_name = no_tasks_job.scheduler.registered_name
expected_directives = globals()[
f'_expected_{sched_name}_directives_no_tasks'
]
assert_job_script_sanity(no_tasks_job)
assert expected_directives(no_tasks_job) == found_directives


def test_prepare_no_exclusive(make_job, slurm_only):
job = make_job()
job.exclusive_access = False
Expand Down

0 comments on commit b6e14c6

Please sign in to comment.