3 changes: 1 addition & 2 deletions reframe/core/backends.py
@@ -19,8 +19,7 @@
 _scheduler_backend_modules = [
     'reframe.core.schedulers.local',
     'reframe.core.schedulers.slurm',
-    'reframe.core.schedulers.pbs',
-    'reframe.core.schedulers.torque'
+    'reframe.core.schedulers.pbs'
 ]
 _schedulers = {}
 
124 changes: 108 additions & 16 deletions reframe/core/schedulers/pbs.py
@@ -20,7 +20,7 @@
 import reframe.utility.osext as osext
 from reframe.core.backends import register_scheduler
 from reframe.core.config import settings
-from reframe.core.exceptions import JobSchedulerError
+from reframe.core.exceptions import (JobError, JobSchedulerError)
 from reframe.utility import seconds_to_hms
 

Expand All @@ -40,6 +40,18 @@
_run_strict = functools.partial(osext.run_command, check=True)


JOB_STATES = {
'Q': 'QUEUED',
'H': 'HELD',
'R': 'RUNNING',
'E': 'EXITING',
'T': 'MOVED',
'W': 'WAITING',
'S': 'SUSPENDED',
'C': 'COMPLETED',
}


class _PbsJob(sched.Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -156,24 +168,104 @@ def finished(self, job):
 
         return job.completed
 
-    def _poll_job(self, job):
-        if job is None:
+    def _update_nodelist(self, job, nodespec):
+        if job.nodelist is not None:
             return
 
-        with osext.change_dir(job.workdir):
-            output_ready = (os.path.exists(job.stdout) and
-                            os.path.exists(job.stderr))
+        job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
+        job._nodelist.sort()
 
-        done = job.cancelled or output_ready
-        if done:
-            t_now = time.time()
-            if job.completion_time is None:
-                job._completion_time = t_now
+    def poll(self, *jobs):
+        if jobs:
+            # Filter out non-jobs
+            jobs = [job for job in jobs if job is not None]
 
-            time_from_finish = t_now - job.completion_time
-            if time_from_finish > PBS_OUTPUT_WRITEBACK_WAIT:
-                job._completed = True
+        if not jobs:
+            return
 
+        completed = osext.run_command(
+            f'qstat -f {" ".join(job.jobid for job in jobs)}'
+        )
+
+        # Depending on the configuration, completed jobs will remain on the job
+        # list for a limited time, or be removed upon completion.
+        # If qstat cannot find any of the job IDs, it will return 153.
+        # Otherwise, it will return with return code 0 and print information
+        # only for the jobs it could find.
+        if completed.returncode == 153:
+            self.log('Return code is 153: jobids not known by scheduler, '
+                     'assuming all jobs completed')
+            for job in jobs:
+                job._state = 'COMPLETED'
+
+            return
+
+        if completed.returncode != 0:
+            raise JobSchedulerError(
+                f'qstat failed with exit code {completed.returncode} '
+                f'(standard error follows):\n{completed.stderr}'
+            )
+
+        # Store information for each job separately
+        jobinfo = {}
+        for job_raw_info in completed.stdout.split('\n\n'):
+            jobid_match = re.search(
+                r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
+            )
+            if jobid_match:
+                jobid = jobid_match.group('jobid')
+                jobinfo[jobid] = job_raw_info
+
-    def poll(self, *jobs):
         for job in jobs:
-            self._poll_job(job)
+            if job.jobid not in jobinfo:
+                self.log(f'Job {job.jobid} not known to scheduler, '
+                         f'assuming job completed')
+                job._state = 'COMPLETED'
+                job._completed = True
+                continue
+
+            info = jobinfo[job.jobid]
+            state_match = re.search(
+                r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
+            )
+            if not state_match:
+                self.log(f'Job state not found (job info follows):\n{info}')
+                continue
+
+            state = state_match.group('state')
+            job._state = JOB_STATES[state]
+            nodelist_match = re.search(
+                r'exec_host = (?P<nodespec>[\S\t\n]+)',
+                info, re.MULTILINE
+            )
+            if nodelist_match:
+                nodespec = nodelist_match.group('nodespec')
+                nodespec = re.sub(r'[\n\t]*', '', nodespec)
+                self._update_nodelist(job, nodespec)
+
+            if job.state == 'COMPLETED':
+                exitcode_match = re.search(
+                    r'^\s*exit_status = (?P<code>\d+)',
+                    info, re.MULTILINE,
+                )
+                if exitcode_match:
+                    job._exitcode = int(exitcode_match.group('code'))
+
+            # We report a job as finished only when its stdout/stderr are
+            # written back to the working directory
+            stdout = os.path.join(job.workdir, job.stdout)
+            stderr = os.path.join(job.workdir, job.stderr)
+            out_ready = os.path.exists(stdout) and os.path.exists(stderr)
+            done = job.cancelled or out_ready
+            if done:
+                job._completed = True
+            elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
+                  job.max_pending_time):
+                if (time.time() - job.submit_time >= job.max_pending_time):
+                    self.cancel(job)
+                    job._exception = JobError('maximum pending time exceeded')
+
+
+@register_scheduler('torque')
+class TorqueJobScheduler(PbsJobScheduler):
+    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'
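
For context, here is a minimal sketch of the record format the new `poll()` assumes from `qstat -f`. The sample text below is fabricated for illustration and only contains the fields the regular expressions above look for (`Job Id`, `job_state`, `exec_host`, `exit_status`); real PBS/Torque output carries many more attributes.

```python
import re

# Hypothetical qstat -f record, shaped only after the fields poll() parses.
sample = (
    'Job Id: 12345.pbs-server\n'
    '    job_state = C\n'
    '    exec_host = node001/0+node001/1+node002/0\n'
    '    exit_status = 0\n'
)

jobid = re.search(r'^Job Id:\s*(?P<jobid>\S+)', sample, re.MULTILINE)
state = re.search(r'^\s*job_state = (?P<state>[A-Z])', sample, re.MULTILINE)
nodes = re.search(r'exec_host = (?P<nodespec>[\S\t\n]+)', sample, re.MULTILINE)

# Same normalisation as _update_nodelist(): drop line wraps, strip the
# per-node CPU slot ('/0') and sort; duplicates are kept, as in the scheduler.
nodespec = re.sub(r'[\n\t]*', '', nodes.group('nodespec'))
nodelist = sorted(x.split('/')[0] for x in nodespec.split('+'))

print(jobid.group('jobid'))   # 12345.pbs-server
print(state.group('state'))   # 'C' -> JOB_STATES['C'] == 'COMPLETED'
print(nodelist)               # ['node001', 'node001', 'node002']
```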
132 changes: 0 additions & 132 deletions reframe/core/schedulers/torque.py

This file was deleted.
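
Deleting this file does not remove Torque support: the Torque scheduler is now just the `TorqueJobScheduler` subclass registered from `pbs.py` (see the end of the diff above), and the backend registry resolves schedulers by name regardless of which module defines them. Below is a simplified sketch of that registration pattern, not ReFrame's exact `backends.py` code.

```python
# Simplified stand-in for the register/lookup pattern in
# reframe/core/backends.py; the real module also imports every entry of
# _scheduler_backend_modules so that the decorators below actually run.
_schedulers = {}


def register_scheduler(name):
    def deco(cls):
        _schedulers[name] = cls
        return cls
    return deco


@register_scheduler('pbs')
class PbsJobScheduler:
    pass  # submission and the qstat-based poll() live here in the real code


# Registered from the same module, so no separate torque.py is needed.
@register_scheduler('torque')
class TorqueJobScheduler(PbsJobScheduler):
    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'


assert _schedulers['torque'] is TorqueJobScheduler
```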

4 changes: 2 additions & 2 deletions unittests/test_schedulers.py
@@ -455,7 +455,7 @@ def test_guess_num_tasks(minimal_job, scheduler):
 
 
 def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
-    if scheduler.registered_name in ('local', 'pbs'):
+    if scheduler.registered_name in ('local',):
         pytest.skip(f"max_pending_time not supported by the "
                     f"'{scheduler.registered_name}' scheduler")

@@ -467,7 +467,7 @@ def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
         def state(self):
             if scheduler.registered_name in ('slurm', 'squeue'):
                 return 'PENDING'
-            elif scheduler.registered_name == 'torque':
+            elif scheduler.registered_name in ('pbs', 'torque'):
                 return 'QUEUED'
             else:
                 # This should not happen