Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/running.rst
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,14 @@ All handlers accept the following set of attributes (keys) in their configuratio
It can be configured on a per test basis by overriding the :func:`info <reframe.core.pipeline.RegressionTest.info>` method of a specific regression test.
- ``check_jobid``: Prints the job or process id of the job or process associated with the currently executing regression test.
If a job or process is not yet created, ``-1`` will be printed.
- ``check_job_completion_time``: *[new in 2.21]* The completion time of the job spawned by this regression test.
This timestamp will be formatted according to ``datefmt`` (see below).
The accuracy of the timestamp depends on the backend scheduler.
The ``slurm`` scheduler backend relies on job accounting and returns the actual termination time of the job.
The rest of the backends report as completion time the moment when the framework realizes that the spawned job has finished.
In this case, the accuracy depends on the execution policy used.
If tests are executed with the serial execution policy, this is close to the real completion time, but if the asynchronous execution policy is used, it can differ significantly.
If the job completion time cannot be retrieved, ``None`` will be printed.
- ``check_name``: Prints the name of the regression test on behalf of which ReFrame is currently executing.
If ReFrame is not in the context of regression test, ``reframe`` will be printed.
- ``check_num_tasks``: The number of tasks assigned to the regression test.
Expand Down
16 changes: 16 additions & 0 deletions reframe/core/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,22 @@ def emit_load_commands(*environs):
return commands


class temp_environment:
'''Context manager to temporarily change the environment.'''

def __init__(self, modules=[], variables=[]):
self._modules = modules
self._variables = variables

def __enter__(self):
new_env = Environment('_rfm_temp_env', self._modules, self._variables)
self._environ_save, _ = load(new_env)
return new_env

def __exit__(self, exc_type, exc_value, traceback):
self._environ_save.restore()


class ProgEnvironment(Environment):
'''A class representing a programming environment.

Expand Down
8 changes: 8 additions & 0 deletions reframe/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ def __init__(self, logger=None, check=None):
{
'check_name': 'reframe',
'check_jobid': '-1',
'check_job_completion_time': None,
'check_info': 'reframe',
'check_system': None,
'check_partition': None,
Expand Down Expand Up @@ -426,6 +427,13 @@ def _update_check_extras(self):

if self.check.job:
self.extra['check_jobid'] = self.check.job.jobid
if self.check.job.completion_time:
# Use the logging handlers' date format to format
# completion_time
# NOTE: All handlers use the same date format
fmt = self.logger.handlers[0].formatter.datefmt
ct = self.check.job.completion_time.strftime(fmt)
self.extra['check_job_completion_time'] = ct

def log_performance(self, level, tag, value, ref,
low_thres, upper_thres, unit=None, *, msg=None):
Expand Down
19 changes: 17 additions & 2 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import abc
from datetime import datetime

import reframe.core.environments as env
import reframe.core.fields as fields
Expand All @@ -14,6 +15,10 @@


class JobScheduler(abc.ABC):
@abc.abstractmethod
def completion_time(self, job):
pass

@abc.abstractmethod
def emit_preamble(self, job):
pass
Expand Down Expand Up @@ -189,6 +194,7 @@ def __init__(self,
self._script_filename = script_filename or '%s.sh' % name
self._stdout = stdout or '%s.out' % name
self._stderr = stderr or '%s.err' % name
self._completion_time = None

# Backend scheduler related information
self._sched_flex_alloc_nodes = sched_flex_alloc_nodes
Expand Down Expand Up @@ -259,6 +265,10 @@ def sched_account(self):
def sched_exclusive_access(self):
return self._sched_exclusive_access

@property
def completion_time(self):
return self.scheduler.completion_time(self) or self._completion_time

def prepare(self, commands, environs=None, **gen_opts):
environs = environs or []
if self.num_tasks <= 0:
Expand Down Expand Up @@ -321,7 +331,8 @@ def wait(self):
if self.jobid is None:
raise JobNotStartedError('cannot wait an unstarted job')

return self.scheduler.wait(self)
self.scheduler.wait(self)
self._completion_time = self._completion_time or datetime.now()

def cancel(self):
if self.jobid is None:
Expand All @@ -333,7 +344,11 @@ def finished(self):
if self.jobid is None:
raise JobNotStartedError('cannot poll an unstarted job')

return self.scheduler.finished(self)
done = self.scheduler.finished(self)
if done:
self._completion_time = self._completion_time or datetime.now()

return done


class Node(abc.ABC):
Expand Down
3 changes: 3 additions & 0 deletions reframe/core/schedulers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def __init__(self):
self._f_stdout = None
self._f_stderr = None

def completion_time(self, job):
return None

def submit(self, job):
# `chmod +x' first, because we will execute the script locally
os.chmod(job.script_filename,
Expand Down
3 changes: 3 additions & 0 deletions reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def __init__(self):
# Optional part of the job id refering to the PBS server
self._pbs_server = None

def completion_time(self, job):
return None

def _emit_lselect_option(self, job):
num_tasks_per_node = job.num_tasks_per_node or 1
num_cpus_per_task = job.num_cpus_per_task or 1
Expand Down
48 changes: 40 additions & 8 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from contextlib import suppress
from datetime import datetime

import reframe.core.environments as env
import reframe.core.schedulers as sched
import reframe.utility.os_ext as os_ext
from reframe.core.config import settings
Expand Down Expand Up @@ -69,6 +70,13 @@ class SlurmJobScheduler(sched.JobScheduler):
# standard job state polling using sacct.
SACCT_SQUEUE_RATIO = 10

# This matches the format for both normal jobs as well as job arrays.
# For job arrays the job_id has one of the following formats:
# * <job_id>_<array_task_id>
# * <job_id>_[<array_task_id_start>-<array_task_id_end>]
# See (`Job Array Support<https://slurm.schedmd.com/job_array.html`__)
_state_patt = r'\d+(?:_\d+|_\[\d+-\d+\])?'

def __init__(self):
self._prefix = '#SBATCH'

Expand All @@ -88,6 +96,32 @@ def __init__(self):
self._is_cancelling = False
self._is_job_array = None
self._update_state_count = 0
self._completion_time = None

def completion_time(self, job):
if (self._completion_time or
not slurm_state_completed(job.state)):
return self._completion_time

with env.temp_environment(variables={'SLURM_TIME_FORMAT': 'standard'}):
completed = os_ext.run_command(
'sacct -S %s -P -j %s -o jobid,end' %
(datetime.now().strftime('%F'), job.jobid),
log=False
)

state_match = list(re.finditer(
r'^(?P<jobid>%s)\|(?P<end>\S+)' % self._state_patt,
completed.stdout, re.MULTILINE))
if not state_match:
return None

self._completion_time = max(
datetime.strptime(s.group('end'), '%Y-%m-%dT%H:%M:%S')
for s in state_match
)

return self._completion_time

def _format_option(self, var, option):
if var is not None:
Expand Down Expand Up @@ -296,14 +330,9 @@ def _update_state(self, job):
)
self._update_state_count += 1

# This matches the format for both normal jobs as well as job arrays.
# For job arrays the job_id has one of the following formats:
# * <job_id>_<array_task_id>
# * <job_id>_[<array_task_id_start>-<array_task_id_end>]
# See (`Job Array Support<https://slurm.schedmd.com/job_array.html`__)
state_match = list(re.finditer(
r'^(?P<jobid>\d+(?:_\d+|_\[\d+-\d+\])?)\|(?P<state>\S+)([^\|]*)\|'
r'(?P<exitcode>\d+)\:(?P<signal>\d+)\|(?P<nodespec>.*)',
r'^(?P<jobid>%s)\|(?P<state>\S+)([^\|]*)\|(?P<exitcode>\d+)\:'
r'(?P<signal>\d+)\|(?P<nodespec>.*)' % self._state_patt,
completed.stdout, re.MULTILINE))
if not state_match:
getlogger().debug('job state not matched (stdout follows)\n%s' %
Expand All @@ -312,7 +341,7 @@ def _update_state(self, job):

# Join the states with ',' in case of job arrays
job.state = ','.join(s.group('state') for s in state_match)
if not self._update_state_count % SlurmJobScheduler.SACCT_SQUEUE_RATIO:
if not self._update_state_count % self.SACCT_SQUEUE_RATIO:
self._cancel_if_blocked(job)

if slurm_state_completed(job.state):
Expand Down Expand Up @@ -438,6 +467,9 @@ def __init__(self):
self._squeue_delay = 2
self._cancelled = False

def completion_time(self, job):
return None

def submit(self, job):
super().submit(job)
self._submit_time = datetime.now()
Expand Down
11 changes: 7 additions & 4 deletions reframe/utility/os_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
SpawnedProcessTimeout)


def run_command(cmd, check=False, timeout=None, shell=False):
def run_command(cmd, check=False, timeout=None, shell=False, log=True):
try:
proc = run_command_async(cmd, shell=shell, start_new_session=True)
proc = run_command_async(cmd, shell=shell, start_new_session=True,
log=log)
proc_stdout, proc_stderr = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired as e:
os.killpg(proc.pid, signal.SIGKILL)
Expand Down Expand Up @@ -65,11 +66,13 @@ def run_command_async(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
log=True,
**popen_args):
# Import logger here to avoid unnecessary circular dependencies
from reframe.core.logging import getlogger
if log:
from reframe.core.logging import getlogger
getlogger().debug('executing OS command: ' + cmd)

getlogger().debug('executing OS command: ' + cmd)
if not shell:
cmd = shlex.split(cmd)

Expand Down
10 changes: 10 additions & 0 deletions unittests/test_environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ def test_load_restore(self):

assert not self.environ.is_loaded

@fixtures.switch_to_user_runtime
def test_temp_environment(self):
self.setup_modules_system()
with env.temp_environment(
['testmod_foo'], {'_var0': 'val2', '_var3': 'val3'}
) as environ:
assert environ.is_loaded

assert not environ.is_loaded

@fixtures.switch_to_user_runtime
def test_load_already_present(self):
self.setup_modules_system()
Expand Down
4 changes: 4 additions & 0 deletions unittests/test_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@


class FakeJobScheduler(JobScheduler):
@property
def completion_time(self, job):
pass

def emit_preamble(self, job):
pass

Expand Down