Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reframe/core/launchers/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def command(self, job):
if job.num_cpus_per_task:
ret += ['--cpus-per-task=%s' % str(job.num_cpus_per_task)]

if job.sched_exclusive_access:
if job.exclusive_access:
ret += ['--exclusive']

if job.use_smt is not None:
Expand Down
4 changes: 2 additions & 2 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,9 +1514,7 @@ def _setup_job(self, name, force_local=False, **job_opts):
launcher,
name=name,
workdir=self._stagedir,
max_pending_time=self.max_pending_time,
sched_access=self._current_partition.access,
sched_exclusive_access=self.exclusive_access,
**job_opts)

def _setup_perf_logging(self):
Expand Down Expand Up @@ -1768,6 +1766,8 @@ def run(self):
self.job.time_limit = (self.time_limit or rt.runtime().get_option(
f'systems/0/partitions/@{self.current_partition.name}/time_limit')
)
self.job.max_pending_time = self.max_pending_time
self.job.exclusive_access = self.exclusive_access
exec_cmd = [self.job.launcher.run_command(self.job),
self.executable, *self.executable_opts]

Expand Down
152 changes: 120 additions & 32 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
from reframe.core.exceptions import JobError, JobNotStartedError
from reframe.core.launchers import JobLauncher
from reframe.core.logging import getlogger, DEBUG2
from reframe.core.meta import RegressionTestMeta


class JobMeta(RegressionTestMeta, abc.ABCMeta):
'''Job metaclass.'''


class JobScheduler(abc.ABC):
Expand Down Expand Up @@ -112,7 +117,7 @@ def log(self, message, level=DEBUG2):
getlogger().log(level, f'[S] {self.registered_name}: {message}')


class Job(jsonext.JSONSerializable):
class Job(jsonext.JSONSerializable, metaclass=JobMeta):
'''A job descriptor.

A job descriptor is created by the framework after the "setup" phase and
Expand All @@ -123,19 +128,120 @@ class Job(jsonext.JSONSerializable):

'''

num_tasks = fields.TypedField(int)
num_tasks_per_node = fields.TypedField(int, type(None))
num_tasks_per_core = fields.TypedField(int, type(None))
num_tasks_per_socket = fields.TypedField(int, type(None))
num_cpus_per_task = fields.TypedField(int, type(None))
use_smt = fields.TypedField(bool, type(None))
time_limit = fields.TimerField(type(None))
#: Number of tasks for this job.
#:
#: :type: integral
#: :default: ``1``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks = variable(int, value=1)

#: Number of tasks per node for this job.
#:
#: :type: integral or :class:`NoneType`
#: :default: ``None``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks_per_node = variable(int, type(None), value=None)

#: Number of tasks per core for this job.
#:
#: :type: integral or :class:`NoneType`
#: :default: ``None``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks_per_core = variable(int, type(None), value=None)

#: Number of tasks per socket for this job.
#:
#: :type: integral or :class:`NoneType`
#: :default: ``None``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_tasks_per_socket = variable(int, type(None), value=None)

#: Number of processing elements associated with each task for this job.
#:
#: :type: integral or :class:`NoneType`
#: :default: ``None``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
num_cpus_per_task = variable(int, type(None), value=None)

#: Enable SMT for this job.
#:
#: :type: :class:`bool` or :class:`NoneType`
#: :default: ``None``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
use_smt = variable(bool, type(None), value=None)

#: Request exclusive access on the nodes for this job.
#:
#: :type: :class:`bool`
#: :default: ``False``
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
exclusive_access = variable(bool, value=False)

#: Time limit for this job.
#:
#: See :attr:`reframe.core.pipeline.RegressionTest.time_limit` for more
#: details.
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
time_limit = variable(type(None), field=fields.TimerField, value=None)

#: Maximum pending time for this job.
#:
#: See :attr:`reframe.core.pipeline.RegressionTest.max_pending_time` for
#: more details.
#:
#: .. note::
#: This attribute is set by the framework just before submitting the job
#: based on the test information.
#:
#: .. versionadded:: 3.11.0
max_pending_time = variable(type(None),
field=fields.TimerField, value=None)

#: Options to be passed to the backend job scheduler.
#: Arbitrary options to be passed to the backend job scheduler.
#:
#: :type: :class:`List[str]`
#: :default: ``[]``
options = fields.TypedField(typ.List[str])
options = variable(typ.List[str], value=[])

#: The (parallel) program launcher that will be used to launch the
#: (parallel) executable of this job.
Expand All @@ -157,7 +263,7 @@ class Job(jsonext.JSONSerializable):
#: self.job.launcher = getlauncher('local')()
#:
#: :type: :class:`reframe.core.launchers.JobLauncher`
launcher = fields.TypedField(JobLauncher)
launcher = variable(JobLauncher)

# The sched_* arguments are also exposed to the frontend
def __init__(self,
Expand All @@ -166,34 +272,20 @@ def __init__(self,
script_filename=None,
stdout=None,
stderr=None,
max_pending_time=None,
sched_flex_alloc_nodes=None,
sched_access=[],
sched_exclusive_access=None,
sched_options=None):

# Mutable fields
self.num_tasks = 1
self.num_tasks_per_node = None
self.num_tasks_per_core = None
self.num_tasks_per_socket = None
self.num_cpus_per_task = None
self.use_smt = None
self.time_limit = None
self.cli_options = list(sched_options) if sched_options else []
self.options = []

self._cli_options = list(sched_options) if sched_options else []
self._name = name
self._workdir = workdir
self._script_filename = script_filename or '%s.sh' % name
self._stdout = stdout or '%s.out' % name
self._stderr = stderr or '%s.err' % name
self._max_pending_time = max_pending_time

# Backend scheduler related information
self._sched_flex_alloc_nodes = sched_flex_alloc_nodes
self._sched_access = sched_access
self._sched_exclusive_access = sched_exclusive_access

# Live job information; to be filled during job's lifetime by the
# scheduler
Expand Down Expand Up @@ -224,8 +316,8 @@ def workdir(self):
return self._workdir

@property
def max_pending_time(self):
return self._max_pending_time
def cli_options(self):
return self._cli_options

@property
def script_filename(self):
Expand All @@ -247,10 +339,6 @@ def sched_flex_alloc_nodes(self):
def sched_access(self):
return self._sched_access

@property
def sched_exclusive_access(self):
return self._sched_exclusive_access

@property
def completion_time(self):
'''The completion time of this job as a floating point number
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def emit_preamble(self, job):
else:
preamble.append(self._prefix + ' ' + opt)

if job.sched_exclusive_access:
if job.exclusive_access:
preamble.append(f'{self._prefix} -x')

# Filter out empty statements before returning
Expand Down
4 changes: 2 additions & 2 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ def emit_preamble(self, job):
self._format_option('%d:%d:%d' % (h, m, s), '--time={0}')
)

if job.sched_exclusive_access:
if job.exclusive_access:
preamble.append(
self._format_option(job.sched_exclusive_access, '--exclusive')
self._format_option(job.exclusive_access, '--exclusive')
)

if self._use_nodes_opt:
Expand Down
2 changes: 1 addition & 1 deletion unittests/test_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def job(make_job, launcher):
stdout='fake_stdout',
stderr='fake_stderr',
sched_access=access,
sched_exclusive_access='fake_exclude_access',
sched_options=['--fake'])
job.num_tasks = 4
job.num_tasks_per_node = 2
Expand All @@ -85,6 +84,7 @@ def job(make_job, launcher):
job.num_cpus_per_task = 2
job.use_smt = True
job.time_limit = '10m'
job.exclusive_access = True
job.options = ['--gres=gpu:4', '#DW jobdw anything']
job.launcher.options = ['--foo']
return job
Expand Down
11 changes: 6 additions & 5 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ def minimal_job(make_job):

@pytest.fixture
def fake_job(make_job):
ret = make_job(sched_exclusive_access=True,
sched_options=['--account=spam'])
ret = make_job(sched_options=['--account=spam'])
ret.time_limit = '5m'
ret.num_tasks = 16
ret.num_tasks_per_node = 2
ret.num_tasks_per_core = 1
ret.num_tasks_per_socket = 1
ret.num_cpus_per_task = 18
ret.use_smt = True
ret.exclusive_access = True
ret.options += ['--gres=gpu:4',
'#DW jobdw capacity=100GB',
'#DW stage_in source=/foo']
Expand Down Expand Up @@ -332,7 +332,8 @@ def test_prepare_minimal(minimal_job):


def test_prepare_no_exclusive(make_job, slurm_only):
job = make_job(sched_exclusive_access=False)
job = make_job()
job.exclusive_access = False
prepare_job(job)
with open(job.script_filename) as fp:
assert re.search(r'--exclusive', fp.read()) is None
Expand Down Expand Up @@ -566,8 +567,8 @@ def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
pytest.skip(f"max_pending_time not supported by the "
f"'{scheduler.registered_name}' scheduler")

minimal_job = make_job(sched_access=exec_ctx.access,
max_pending_time=0.05)
minimal_job = make_job(sched_access=exec_ctx.access)
minimal_job.max_pending_time = 0.05

# Monkey-patch the Job's state property to pretend that the job is always
# pending
Expand Down