Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import itertools
import os
import re
import time
from datetime import datetime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is not needed.


import reframe.core.schedulers as sched
import reframe.utility.os_ext as os_ext
from reframe.core.exceptions import (SpawnedProcessError,
JobBlockedError, JobError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need the JobBlockedError here. This is thrown by the SLURM backend when a job is indefinitely blocked due to reasons that require sysadmin intervention to be resolved.

from reframe.core.logging import getlogger
from reframe.core.schedulers.registry import register_scheduler
from reframe.settings import settings


@register_scheduler('pbs')
class PbsJob(sched.Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._prefix = '#PBS'
self._is_cancelling = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this in your implementation. We use this in the SLURM backend to support the automatic cancellation of the job if it's blocked in an unrecoverable state.

# fix for regression tests with compile
if os.path.dirname(self.command) is '.':
self._command = os.path.join(
self.workdir, os.path.basename(self.command))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best way is to emit a cd workdir at the beginning of your job script right after the preamble. Now, you are just fixing the case where your script contains just the executable, but what if it defines commands in pre_run and post_run. The user expects them to run inside the stage directory of the test.


def _emit_job_option(self, var, option, builder):
if var is not None:
if isinstance(var, tuple):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this a bit more generic by testing that var is any sequence type:

import collections.abc

if isinstance(var, collections.abc.Sequence):
    ...

builder.verbatim(self._prefix + ' ' + option.format(*var))
else:
builder.verbatim(self._prefix + ' ' + option.format(var))

def _run_command(self, cmd, timeout=None):
"""Run command cmd and re-raise any exception as a JobError."""
try:
return os_ext.run_command(cmd, check=True, timeout=timeout)
except SpawnedProcessError as e:
raise JobError(jobid=self._jobid) from e

def emit_preamble(self, builder):
self._emit_job_option(self.name, '-N "{0}"', builder)

extra_options = ''
if len(self.options):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simply check with if self.options:.

extra_options = ':' + ':'.join(self.options)

self._emit_job_option((int(self._num_tasks/self._num_tasks_per_node), self._num_tasks_per_node, self._num_tasks_per_node, extra_options),
'-lselect={0}:ncpus={1}:mpiprocs={2}{3}', builder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of comments here:

  1. This line is too long. Could you fit wrap it at 79 columns?
  2. There is not need to int() to take the integer part of the division. You may simply use // instead of /.
  3. You need not use self._num_tasks_per_node twice. You could simple repeat the corresponding placeholder in your format string: '-lselect={0}:ncpus={1}:mpiprocs={1}{2}'

self._emit_job_option('%d:%d:%d' % self.time_limit,
'-l walltime={0}', builder)

self._emit_job_option(self.sched_partition, '-q {0}', builder)

self._emit_job_option(self.stdout, '-o {0}', builder)
self._emit_job_option(self.stderr, '-e {0}', builder)

def submit(self):
cmd = 'qsub %s' % self.script_filename
completed = self._run_command(cmd, settings.job_submit_timeout)
jobid_match = re.search('^(?P<jobid>\d+)',
completed.stdout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can fit nicely in a single line.

full_jobid_match = re.search('^(?P<fjobid>\d+\.\w+\d*)$',
completed.stdout)

if not jobid_match:
raise JobError(
'could not retrieve the job id of the submitted job')

self._jobid = int(jobid_match.group('jobid'))
self._fulljobid = full_jobid_match.group('fjobid')

def wait(self):
super().wait()
intervals = itertools.cycle(settings.job_poll_intervals)

# check if the stdout file is already there (i.e. job finished)
while not os.path.isfile(self.stdout):
time.sleep(next(intervals))

def cancel(self):
super().cancel()
getlogger().debug('cancelling job (id=%s)' % self._jobid)
self._is_cancelling = True
jobid, server = self._fulljobid.split(".")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use single quotes for strings.

if server == 'pbspro':
self._run_command('qdel %s' % self._fulljobid,
settings.job_submit_timeout)
elif server == 'pbs11':
self._run_command('qdel %s@pbs11' % self._fulljobid,
settings.job_submit_timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rafa-esco I am adapting your PR in order to prepare it for merging. I am using the Torque wrappers of Slurm to test it and I was wondering, why you need this special treatment here? Couldn't qdel JOBID just work?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi!, yep, you are totally right... to be honest this is quite particular to our installation. We have two pbs servers working side by side: pbspro (the default name) and pbs11. So yes, in a normal installation with only one server, a qdel JOBID should work. So removing the if,elif,else and just go with a 'qdel %s' % self._fulljobid will be general enough for anybody.

else:
raise JobError('Did not recognize server', server)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second argument is not gonna be printed. You may first format the message and pass it to JobError. I'd suggest also to use "could not" instead of "did not".


def finished(self):
super().finished()
intervals = itertools.cycle(settings.job_poll_intervals)
if os.path.isfile(self.stdout):
return True
else:
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may simply return as return os.path.isfile(self.stdout) here.

1 change: 1 addition & 0 deletions reframe/core/schedulers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ def getscheduler(name):
# Import the schedulers modules to trigger their registration
import reframe.core.schedulers.local
import reframe.core.schedulers.slurm
import reframe.core.schedulers.pbs