diff --git a/reframe/core/launchers/mpi.py b/reframe/core/launchers/mpi.py index 21d29ef696..633a1c0561 100644 --- a/reframe/core/launchers/mpi.py +++ b/reframe/core/launchers/mpi.py @@ -110,7 +110,7 @@ def command(self, job): if job.num_cpus_per_task: ret += ['--cpus-per-task=%s' % str(job.num_cpus_per_task)] - if job.sched_exclusive_access: + if job.exclusive_access: ret += ['--exclusive'] if job.use_smt is not None: diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 094fe86ca5..828248dbc8 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1514,9 +1514,7 @@ def _setup_job(self, name, force_local=False, **job_opts): launcher, name=name, workdir=self._stagedir, - max_pending_time=self.max_pending_time, sched_access=self._current_partition.access, - sched_exclusive_access=self.exclusive_access, **job_opts) def _setup_perf_logging(self): @@ -1768,6 +1766,8 @@ def run(self): self.job.time_limit = (self.time_limit or rt.runtime().get_option( f'systems/0/partitions/@{self.current_partition.name}/time_limit') ) + self.job.max_pending_time = self.max_pending_time + self.job.exclusive_access = self.exclusive_access exec_cmd = [self.job.launcher.run_command(self.job), self.executable, *self.executable_opts] diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index c8a7530c52..3b58cc384d 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -18,6 +18,11 @@ from reframe.core.exceptions import JobError, JobNotStartedError from reframe.core.launchers import JobLauncher from reframe.core.logging import getlogger, DEBUG2 +from reframe.core.meta import RegressionTestMeta + + +class JobMeta(RegressionTestMeta, abc.ABCMeta): + '''Job metaclass.''' class JobScheduler(abc.ABC): @@ -112,7 +117,7 @@ def log(self, message, level=DEBUG2): getlogger().log(level, f'[S] {self.registered_name}: {message}') -class Job(jsonext.JSONSerializable): +class Job(jsonext.JSONSerializable, metaclass=JobMeta): '''A job descriptor. A job descriptor is created by the framework after the "setup" phase and @@ -123,19 +128,120 @@ class Job(jsonext.JSONSerializable): ''' - num_tasks = fields.TypedField(int) - num_tasks_per_node = fields.TypedField(int, type(None)) - num_tasks_per_core = fields.TypedField(int, type(None)) - num_tasks_per_socket = fields.TypedField(int, type(None)) - num_cpus_per_task = fields.TypedField(int, type(None)) - use_smt = fields.TypedField(bool, type(None)) - time_limit = fields.TimerField(type(None)) + #: Number of tasks for this job. + #: + #: :type: integral + #: :default: ``1`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + num_tasks = variable(int, value=1) + + #: Number of tasks per node for this job. + #: + #: :type: integral or :class:`NoneType` + #: :default: ``None`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + num_tasks_per_node = variable(int, type(None), value=None) + + #: Number of tasks per core for this job. + #: + #: :type: integral or :class:`NoneType` + #: :default: ``None`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + num_tasks_per_core = variable(int, type(None), value=None) + + #: Number of tasks per socket for this job. + #: + #: :type: integral or :class:`NoneType` + #: :default: ``None`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + num_tasks_per_socket = variable(int, type(None), value=None) + + #: Number of processing elements associated with each task for this job. + #: + #: :type: integral or :class:`NoneType` + #: :default: ``None`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + num_cpus_per_task = variable(int, type(None), value=None) + + #: Enable SMT for this job. + #: + #: :type: :class:`bool` or :class:`NoneType` + #: :default: ``None`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + use_smt = variable(bool, type(None), value=None) + + #: Request exclusive access on the nodes for this job. + #: + #: :type: :class:`bool` + #: :default: ``false`` + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + exclusive_access = variable(bool, value=False) + + #: Time limit for this job. + #: + #: See :attr:`reframe.core.pipeline.RegressionTest.time_limit` for more + #: details. + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + time_limit = variable(type(None), field=fields.TimerField, value=None) + + #: Maximum pending time for this job. + #: + #: See :attr:`reframe.core.pipeline.RegressionTest.max_pending_time` for + #: more details. + #: + #: .. note:: + #: This attribute is set by the framework just before submitting the job + #: based on the test information. + #: + #: .. versionadded:: 3.11.0 + max_pending_time = variable(type(None), + field=fields.TimerField, value=None) - #: Options to be passed to the backend job scheduler. + #: Arbitrary options to be passed to the backend job scheduler. #: #: :type: :class:`List[str]` #: :default: ``[]`` - options = fields.TypedField(typ.List[str]) + options = variable(typ.List[str], value=[]) #: The (parallel) program launcher that will be used to launch the #: (parallel) executable of this job. @@ -157,7 +263,7 @@ class Job(jsonext.JSONSerializable): #: self.job.launcher = getlauncher('local')() #: #: :type: :class:`reframe.core.launchers.JobLauncher` - launcher = fields.TypedField(JobLauncher) + launcher = variable(JobLauncher) # The sched_* arguments are exposed also to the frontend def __init__(self, @@ -166,34 +272,20 @@ def __init__(self, script_filename=None, stdout=None, stderr=None, - max_pending_time=None, sched_flex_alloc_nodes=None, sched_access=[], - sched_exclusive_access=None, sched_options=None): - # Mutable fields - self.num_tasks = 1 - self.num_tasks_per_node = None - self.num_tasks_per_core = None - self.num_tasks_per_socket = None - self.num_cpus_per_task = None - self.use_smt = None - self.time_limit = None - self.cli_options = list(sched_options) if sched_options else [] - self.options = [] - + self._cli_options = list(sched_options) if sched_options else [] self._name = name self._workdir = workdir self._script_filename = script_filename or '%s.sh' % name self._stdout = stdout or '%s.out' % name self._stderr = stderr or '%s.err' % name - self._max_pending_time = max_pending_time # Backend scheduler related information self._sched_flex_alloc_nodes = sched_flex_alloc_nodes self._sched_access = sched_access - self._sched_exclusive_access = sched_exclusive_access # Live job information; to be filled during job's lifetime by the # scheduler @@ -224,8 +316,8 @@ def workdir(self): return self._workdir @property - def max_pending_time(self): - return self._max_pending_time + def cli_options(self): + return self._cli_options @property def script_filename(self): @@ -247,10 +339,6 @@ def sched_flex_alloc_nodes(self): def sched_access(self): return self._sched_access - @property - def sched_exclusive_access(self): - return self._sched_exclusive_access - @property def completion_time(self): '''The completion time of this job as a floating point number diff --git a/reframe/core/schedulers/lsf.py b/reframe/core/schedulers/lsf.py index d12e0d46cd..10d42ff93b 100644 --- a/reframe/core/schedulers/lsf.py +++ b/reframe/core/schedulers/lsf.py @@ -71,7 +71,7 @@ def emit_preamble(self, job): else: preamble.append(self._prefix + ' ' + opt) - if job.sched_exclusive_access: + if job.exclusive_access: preamble.append(f'{self._prefix} -x') # Filter out empty statements before returning diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 01ebf1eb8c..3ac3deac7f 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -178,9 +178,9 @@ def emit_preamble(self, job): self._format_option('%d:%d:%d' % (h, m, s), '--time={0}') ) - if job.sched_exclusive_access: + if job.exclusive_access: preamble.append( - self._format_option(job.sched_exclusive_access, '--exclusive') + self._format_option(job.exclusive_access, '--exclusive') ) if self._use_nodes_opt: diff --git a/unittests/test_launchers.py b/unittests/test_launchers.py index a913b43bf6..66cd975317 100644 --- a/unittests/test_launchers.py +++ b/unittests/test_launchers.py @@ -76,7 +76,6 @@ def job(make_job, launcher): stdout='fake_stdout', stderr='fake_stderr', sched_access=access, - sched_exclusive_access='fake_exclude_access', sched_options=['--fake']) job.num_tasks = 4 job.num_tasks_per_node = 2 @@ -85,6 +84,7 @@ def job(make_job, launcher): job.num_cpus_per_task = 2 job.use_smt = True job.time_limit = '10m' + job.exclusive_access = True job.options = ['--gres=gpu:4', '#DW jobdw anything'] job.launcher.options = ['--foo'] return job diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 6e25466722..a5b1ae161e 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -90,8 +90,7 @@ def minimal_job(make_job): @pytest.fixture def fake_job(make_job): - ret = make_job(sched_exclusive_access=True, - sched_options=['--account=spam']) + ret = make_job(sched_options=['--account=spam']) ret.time_limit = '5m' ret.num_tasks = 16 ret.num_tasks_per_node = 2 @@ -99,6 +98,7 @@ def fake_job(make_job): ret.num_tasks_per_socket = 1 ret.num_cpus_per_task = 18 ret.use_smt = True + ret.exclusive_access = True ret.options += ['--gres=gpu:4', '#DW jobdw capacity=100GB', '#DW stage_in source=/foo'] @@ -332,7 +332,8 @@ def test_prepare_minimal(minimal_job): def test_prepare_no_exclusive(make_job, slurm_only): - job = make_job(sched_exclusive_access=False) + job = make_job() + job.exclusive_access = False prepare_job(job) with open(job.script_filename) as fp: assert re.search(r'--exclusive', fp.read()) is None @@ -566,8 +567,8 @@ def test_submit_max_pending_time(make_job, exec_ctx, scheduler): pytest.skip(f"max_pending_time not supported by the " f"'{scheduler.registered_name}' scheduler") - minimal_job = make_job(sched_access=exec_ctx.access, - max_pending_time=0.05) + minimal_job = make_job(sched_access=exec_ctx.access) + minimal_job.max_pending_time = 0.05 # Monkey-patch the Job's state property to pretend that the job is always # pending