From 330c7b4b2616d04e6787d868cdf10de677fde983 Mon Sep 17 00:00:00 2001
From: Eirini Koutsaniti
Date: Fri, 20 Nov 2020 08:43:15 +0100
Subject: [PATCH 1/5] Align implementations of the PBS and the Torque
 scheduler regarding the use of `qstat`

---
 reframe/core/schedulers/pbs.py    | 117 ++++++++++++++++++++++++++----
 reframe/core/schedulers/torque.py | 109 ----------------------------
 unittests/test_schedulers.py      |   2 +-
 3 files changed, 103 insertions(+), 125 deletions(-)

diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py
index a44097d1c2..afaedd642b 100644
--- a/reframe/core/schedulers/pbs.py
+++ b/reframe/core/schedulers/pbs.py
@@ -40,6 +40,18 @@
 _run_strict = functools.partial(osext.run_command, check=True)
 
 
+JOB_STATES = {
+    'Q': 'QUEUED',
+    'H': 'HELD',
+    'R': 'RUNNING',
+    'E': 'EXITING',
+    'T': 'MOVED',
+    'W': 'WAITING',
+    'S': 'SUSPENDED',
+    'C': 'COMPLETED',
+}
+
+
 class _PbsJob(sched.Job):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -156,24 +168,99 @@ def finished(self, job):
 
         return job.completed
 
-    def _poll_job(self, job):
-        if job is None:
+    def _update_nodelist(self, job, nodespec):
+        if job.nodelist is not None:
             return
 
-        with osext.change_dir(job.workdir):
-            output_ready = (os.path.exists(job.stdout) and
-                            os.path.exists(job.stderr))
+        job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
+        job._nodelist.sort()
 
-        done = job.cancelled or output_ready
-        if done:
-            t_now = time.time()
-            if job.completion_time is None:
-                job._completion_time = t_now
+    def poll(self, *jobs):
+        if jobs:
+            # Filter out non-jobs
+            jobs = [job for job in jobs if job is not None]
 
-            time_from_finish = t_now - job.completion_time
-            if time_from_finish > PBS_OUTPUT_WRITEBACK_WAIT:
-                job._completed = True
+        if not jobs:
+            return
+
+        completed = osext.run_command(
+            f'qstat -f {" ".join(job.jobid for job in jobs)}'
+        )
+
+        # Depending on the configuration, completed jobs will remain on the job
+        # list for a limited time, or be removed upon completion.
+        # If qstat cannot find any of the job IDs, it will return 153.
+        # Otherwise, it will return with return code 0 and print information
+        # only for the jobs it could find.
+        if completed.returncode == 153:
+            self.log('Return code is 153: jobids not known by scheduler, '
+                     'assuming all jobs completed')
+            for job in jobs:
+                job._state = 'COMPLETED'
+
+            return
+
+        if completed.returncode != 0:
+            raise JobSchedulerError(
+                f'qstat failed with exit code {completed.returncode} '
+                f'(standard error follows):\n{completed.stderr}'
+            )
+
+        # Store information for each job separately
+        jobinfo = {}
+        for job_raw_info in completed.stdout.split('\n\n'):
+            jobid_match = re.search(
+                r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
+            )
+            if jobid_match:
+                jobid = jobid_match.group('jobid')
+                jobinfo[jobid] = job_raw_info
 
-    def poll(self, *jobs):
         for job in jobs:
-            self._poll_job(job)
+            if job.jobid not in jobinfo:
+                self.log(f'Job {job.jobid} not known to scheduler, '
+                         f'assuming job completed')
+                job._state = 'COMPLETED'
+                job._completed = True
+                continue
+
+            info = jobinfo[job.jobid]
+            state_match = re.search(
+                r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
+            )
+            if not state_match:
+                self.log(f'Job state not found (job info follows):\n{info}')
+                continue
+
+            state = state_match.group('state')
+            job._state = JOB_STATES[state]
+            nodelist_match = re.search(
+                r'exec_host = (?P<nodespec>[\S\t\n]+)',
+                info, re.MULTILINE
+            )
+            if nodelist_match:
+                nodespec = nodelist_match.group('nodespec')
+                nodespec = re.sub(r'[\n\t]*', '', nodespec)
+                self._update_nodelist(job, nodespec)
+
+            if job.state == 'COMPLETED':
+                exitcode_match = re.search(
+                    r'^\s*exit_status = (?P<code>\d+)',
+                    info, re.MULTILINE,
+                )
+                if exitcode_match:
+                    job._exitcode = int(exitcode_match.group('code'))
+
+            # We report a job as finished only when its stdout/stderr are
+            # written back to the working directory
+            stdout = os.path.join(job.workdir, job.stdout)
+            stderr = os.path.join(job.workdir, job.stderr)
+            out_ready = os.path.exists(stdout) and os.path.exists(stderr)
+            done = job.cancelled or out_ready
+            if done:
+                job._completed = True
+            elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
+                  job.max_pending_time):
+                if (time.time() - job.submit_time >= job.max_pending_time):
+                    self.cancel(job)
+                    job._exception = JobError('maximum pending time exceeded')

diff --git a/reframe/core/schedulers/torque.py b/reframe/core/schedulers/torque.py
index 995a6185a8..71b04b144a 100644
--- a/reframe/core/schedulers/torque.py
+++ b/reframe/core/schedulers/torque.py
@@ -18,115 +18,6 @@
 from reframe.core.schedulers.pbs import PbsJobScheduler, _run_strict
 
 
-JOB_STATES = {
-    'Q': 'QUEUED',
-    'H': 'HELD',
-    'R': 'RUNNING',
-    'E': 'EXITING',
-    'T': 'MOVED',
-    'W': 'WAITING',
-    'S': 'SUSPENDED',
-    'C': 'COMPLETED',
-}
-
-
 @register_scheduler('torque')
 class TorqueJobScheduler(PbsJobScheduler):
     TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'
-
-    def _update_nodelist(self, job, nodespec):
-        if job.nodelist is not None:
-            return
-
-        job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
-        job._nodelist.sort()
-
-    def poll(self, *jobs):
-        if jobs:
-            # Filter out non-jobs
-            jobs = [job for job in jobs if job is not None]
-
-        if not jobs:
-            return
-
-        completed = osext.run_command(
-            f'qstat -f {" ".join(job.jobid for job in jobs)}'
-        )
-
-        # Depending on the configuration, completed jobs will remain on the job
-        # list for a limited time, or be removed upon completion.
-        # If qstat cannot find any of the job IDs, it will return 153.
-        # Otherwise, it will return with return code 0 and print information
-        # only for the jobs it could find.
-        if completed.returncode == 153:
-            self.log('Return code is 153: jobids not known by scheduler, '
-                     'assuming all jobs completed')
-            for job in jobs:
-                job._state = 'COMPLETED'
-
-            return
-
-        if completed.returncode != 0:
-            raise JobSchedulerError(
-                f'qstat failed with exit code {completed.returncode} '
-                f'(standard error follows):\n{completed.stderr}'
-            )
-
-        # Store information for each job separately
-        jobinfo = {}
-        for job_raw_info in completed.stdout.split('\n\n'):
-            jobid_match = re.search(
-                r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
-            )
-            if jobid_match:
-                jobid = jobid_match.group('jobid')
-                jobinfo[jobid] = job_raw_info
-
-        for job in jobs:
-            if job.jobid not in jobinfo:
-                self.log(f'Job {job.jobid} not known to scheduler, '
-                         f'assuming job completed')
-                job._state = 'COMPLETED'
-                job._completed = True
-                continue
-
-            info = jobinfo[job.jobid]
-            state_match = re.search(
-                r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
-            )
-            if not state_match:
-                self.log(f'Job state not found (job info follows):\n{info}')
-                continue
-
-            state = state_match.group('state')
-            job._state = JOB_STATES[state]
-            nodelist_match = re.search(
-                r'exec_host = (?P<nodespec>[\S\t\n]+)',
-                info, re.MULTILINE
-            )
-            if nodelist_match:
-                nodespec = nodelist_match.group('nodespec')
-                nodespec = re.sub(r'[\n\t]*', '', nodespec)
-                self._update_nodelist(job, nodespec)
-
-            if job.state == 'COMPLETED':
-                exitcode_match = re.search(
-                    r'^\s*exit_status = (?P<code>\d+)',
-                    info, re.MULTILINE,
-                )
-                if exitcode_match:
-                    job._exitcode = int(exitcode_match.group('code'))
-
-            # We report a job as finished only when its stdout/stderr are
-            # written back to the working directory
-            stdout = os.path.join(job.workdir, job.stdout)
-            stderr = os.path.join(job.workdir, job.stderr)
-            out_ready = os.path.exists(stdout) and os.path.exists(stderr)
-            done = job.cancelled or out_ready
-            if done:
-                job._completed = True
-            elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
-                  job.max_pending_time):
-                if (time.time() - job.submit_time >= job.max_pending_time):
-                    self.cancel(job)
-                    job._exception = JobError('maximum pending time exceeded')

diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 23caa7e6ca..4f6f52e06c 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -455,7 +455,7 @@ def test_guess_num_tasks(minimal_job, scheduler):
 
 
 def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
-    if scheduler.registered_name in ('local', 'pbs'):
+    if scheduler.registered_name in ('local',):
         pytest.skip(f"max_pending_time not supported by the "
                     f"'{scheduler.registered_name}' scheduler")
 

From 8009e4cc96cc54fe580b3ec959e9aec163b5bc2c Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Fri, 20 Nov 2020 23:02:53 +0100
Subject: [PATCH 2/5] Move Torque scheduler backend in pbs.py

---
 reframe/core/schedulers/pbs.py    |  5 +++++
 reframe/core/schedulers/torque.py | 23 -----------------------
 2 files changed, 5 insertions(+), 23 deletions(-)
 delete mode 100644 reframe/core/schedulers/torque.py

diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py
index afaedd642b..504f668af6 100644
--- a/reframe/core/schedulers/pbs.py
+++ b/reframe/core/schedulers/pbs.py
@@ -264,3 +264,8 @@ def poll(self, *jobs):
                 if (time.time() - job.submit_time >= job.max_pending_time):
                     self.cancel(job)
                     job._exception = JobError('maximum pending time exceeded')
+
+
+@register_scheduler('torque')
+class TorqueJobScheduler(PbsJobScheduler):
+    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'

diff --git a/reframe/core/schedulers/torque.py b/reframe/core/schedulers/torque.py
deleted file mode 100644
index 71b04b144a..0000000000
--- a/reframe/core/schedulers/torque.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2016-2020 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
-# ReFrame Project Developers. See the top-level LICENSE file for details.
-#
-# SPDX-License-Identifier: BSD-3-Clause
-#
-# Torque backend
-#
-# - Initial version submitted by Samuel Moors, Vrije Universiteit Brussel (VUB)
-#
-
-import re
-import os
-import time
-
-import reframe.utility.osext as osext
-from reframe.core.backends import register_scheduler
-from reframe.core.exceptions import JobError, JobSchedulerError
-from reframe.core.schedulers.pbs import PbsJobScheduler, _run_strict
-
-
-@register_scheduler('torque')
-class TorqueJobScheduler(PbsJobScheduler):
-    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'

From 0bc96f40167644a0cdc2989a8a1d1c4e9983bfad Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Fri, 20 Nov 2020 23:10:10 +0100
Subject: [PATCH 3/5] Fix unit tests

---
 reframe/core/backends.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/reframe/core/backends.py b/reframe/core/backends.py
index 30ec9e90ad..587913d707 100644
--- a/reframe/core/backends.py
+++ b/reframe/core/backends.py
@@ -19,8 +19,7 @@
 _scheduler_backend_modules = [
     'reframe.core.schedulers.local',
     'reframe.core.schedulers.slurm',
-    'reframe.core.schedulers.pbs',
-    'reframe.core.schedulers.torque'
+    'reframe.core.schedulers.pbs'
 ]
 
 _schedulers = {}

From cc71854e40b8cb36083aaa85755af334316ba3aa Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Fri, 20 Nov 2020 23:35:21 +0100
Subject: [PATCH 4/5] Fix unit tests

---
 reframe/core/schedulers/pbs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py
index 504f668af6..a5e4abeb30 100644
--- a/reframe/core/schedulers/pbs.py
+++ b/reframe/core/schedulers/pbs.py
@@ -20,7 +20,7 @@
 import reframe.utility.osext as osext
 from reframe.core.backends import register_scheduler
 from reframe.core.config import settings
-from reframe.core.exceptions import JobSchedulerError
+from reframe.core.exceptions import (JobError, JobSchedulerError)
 from reframe.utility import seconds_to_hms

From 3c441d55a7daf9db43b8f9a7e9449dbef061337f Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Sat, 21 Nov 2020 00:03:48 +0100
Subject: [PATCH 5/5] Fix PBS unit tests

---
 unittests/test_schedulers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 4f6f52e06c..7d5ecc710e 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -467,7 +467,7 @@ def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
     def state(self):
         if scheduler.registered_name in ('slurm', 'squeue'):
             return 'PENDING'
-        elif scheduler.registered_name == 'torque':
+        elif scheduler.registered_name in ('pbs', 'torque'):
             return 'QUEUED'
         else:
             # This should not happen
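
Note: below is a minimal standalone sketch of the `qstat -f` parsing that
patch 1 adds to `PbsJobScheduler.poll()`, runnable without a PBS/Torque
installation. The sample `qstat` output, server name, and job IDs are
hypothetical, made up for illustration; the regular expressions and the
state table are the ones from the patch.

    import re

    # Hypothetical `qstat -f` output; a real server prints many more fields
    QSTAT_OUTPUT = '''Job Id: 12345.pbs-server
        job_state = C
        exit_status = 0
        exec_host = nid001/0+nid002/0

    Job Id: 12346.pbs-server
        job_state = R
        exec_host = nid003/0
    '''

    JOB_STATES = {
        'Q': 'QUEUED', 'H': 'HELD', 'R': 'RUNNING', 'E': 'EXITING',
        'T': 'MOVED', 'W': 'WAITING', 'S': 'SUSPENDED', 'C': 'COMPLETED',
    }

    # `qstat -f` separates job records with a blank line; index them by job ID
    jobinfo = {}
    for raw in QSTAT_OUTPUT.split('\n\n'):
        m = re.search(r'^Job Id:\s*(?P<jobid>\S+)', raw, re.MULTILINE)
        if m:
            jobinfo[m.group('jobid')] = raw

    for jobid, info in jobinfo.items():
        state = re.search(r'^\s*job_state = (?P<state>[A-Z])',
                          info, re.MULTILINE)
        nodes = re.search(r'exec_host = (?P<nodespec>[\S\t\n]+)',
                          info, re.MULTILINE)
        nodelist = []
        if nodes:
            # exec_host looks like 'node1/cpu+node2/cpu'; keep the hosts only
            nodespec = re.sub(r'[\n\t]*', '', nodes.group('nodespec'))
            nodelist = sorted(x.split('/')[0] for x in nodespec.split('+'))
        print(jobid, JOB_STATES[state.group('state')], nodelist)

Running the sketch prints one line per job, e.g.
`12345.pbs-server COMPLETED ['nid001', 'nid002']`, which mirrors how the
unified `poll()` derives `job._state` and `job._nodelist` for both backends.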