32 changes: 30 additions & 2 deletions docs/config_reference.rst
@@ -260,8 +260,9 @@ System Partition Configuration
The job scheduler that will be used to launch jobs on this partition.
Supported schedulers are the following:

- ``flux``: Jobs will be launched using the `Flux Framework <https://flux-framework.org/>`_ scheduler.
- ``local``: Jobs will be launched locally without using any job scheduler.
- ``lsf``: Jobs will be launched using the `LSF <https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=lsf-session-scheduler>`__ scheduler.
- ``oar``: Jobs will be launched using the `OAR <https://oar.imag.fr/>`__ scheduler.
- ``pbs``: Jobs will be launched using the `PBS Pro <https://en.wikipedia.org/wiki/Portable_Batch_System>`__ scheduler.
- ``sge``: Jobs will be launched using the `Sun Grid Engine <https://arc.liv.ac.uk/SGE/htmlman/manuals.html>`__ scheduler.
@@ -270,8 +271,24 @@ System Partition Configuration
  If not, you should consider using the ``squeue`` backend below.
- ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
  This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.
- ``ssh``: Jobs will be launched on a remote host using SSH.

  The remote host will be selected from the list of hosts specified in :attr:`~systems.partitions.sched_options.ssh_hosts`.
  The scheduler keeps track of the hosts that it has submitted jobs to and selects the next available one in a round-robin fashion.
  For connecting to a remote host, the options specified in :attr:`~systems.partitions.access` will be used.

  When a job is submitted with this scheduler, its stage directory will be copied over to a unique temporary directory on the remote host, then the job will be executed and, finally, any produced artifacts will be copied back.

  The contents of the stage directory are copied to the remote host using ``rsync``, if available, or ``scp`` as a fallback.
  The same :attr:`~systems.partitions.access` options will be used in those operations as well.
  Please note that the connection options of ``ssh`` and ``scp`` differ, and ReFrame will not attempt to translate any options between the two utilities if ``scp`` is selected for copying to the remote host.
  In this case, it is preferable to set up the host connection options in ``~/.ssh/config`` and leave :attr:`~systems.partitions.access` blank.

  Job-scheduler command line options can be used to interact with the ``ssh`` backend.
  More specifically, if the :option:`--distribute` option is used, a test will be generated for each host listed in :attr:`~systems.partitions.sched_options.ssh_hosts`.
  You can also pin a test to a specific host by passing the ``#host`` directive to the :option:`-J` option, e.g., ``-J '#host=myhost'``.

- ``torque``: Jobs will be launched using the `Torque <https://en.wikipedia.org/wiki/TORQUE>`__ scheduler.

.. versionadded:: 3.7.2
   Support for the SGE scheduler is added.
@@ -282,6 +299,9 @@ System Partition Configuration
.. versionadded:: 3.11.0
   Support for the LSF scheduler is added.

.. versionadded:: 4.4
   The ``ssh`` scheduler is added.

.. note::

   The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific.
@@ -337,6 +357,14 @@ System Partition Configuration
.. warning::
   This option is broken in 4.0.

.. py:attribute:: systems.partitions.sched_options.ssh_hosts

   :required: No
   :default: ``[]``

   List of hosts in a partition that uses the ``ssh`` scheduler.
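For illustration, a partition using the ``ssh`` scheduler could be configured as follows. This is a hypothetical fragment: the system, partition, and host names are invented, and a real configuration would carry additional entries.

```python
# Hypothetical ReFrame site configuration fragment
site_configuration = {
    'systems': [{
        'name': 'testbed',
        'hostnames': ['login.*'],
        'partitions': [{
            'name': 'remote',
            'scheduler': 'ssh',
            'launcher': 'local',
            'environs': ['builtin'],
            'sched_options': {
                # Hosts that the ssh scheduler cycles through round-robin
                'ssh_hosts': ['vm0', 'vm1', 'vm2']
            }
        }]
    }]
}
```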


.. py:attribute:: systems.partitions.sched_options.ignore_reqnodenotavail

   :required: No
3 changes: 2 additions & 1 deletion reframe/core/backends.py
@@ -23,7 +23,8 @@
    'reframe.core.schedulers.pbs',
    'reframe.core.schedulers.oar',
    'reframe.core.schedulers.sge',
    'reframe.core.schedulers.slurm',
    'reframe.core.schedulers.ssh'
]
_schedulers = {}

12 changes: 12 additions & 0 deletions reframe/core/schedulers/__init__.py
@@ -627,3 +627,15 @@ def in_state(self, state):
        :returns: :class:`True` if the node's state matches the given one,
            :class:`False` otherwise.
        '''


class AlwaysIdleNode(Node):
    def __init__(self, name):
        self._name = name

    @property
    def name(self):
        return self._name

    def in_state(self, state):
        return state.casefold() == 'idle'
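Standalone, the new class behaves as follows (copied here without the ``Node`` base class so that it runs in isolation):

```python
class AlwaysIdleNode:
    """A placeholder node that always reports the 'idle' state."""

    def __init__(self, name):
        self._name = name

    @property
    def name(self):
        return self._name

    def in_state(self, state):
        # Case-insensitive comparison against 'idle'
        return state.casefold() == 'idle'


node = AlwaysIdleNode('vm0')
print(node.name)              # vm0
print(node.in_state('IDLE'))  # True
print(node.in_state('down'))  # False
```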
6 changes: 1 addition & 5 deletions reframe/core/schedulers/local.py
@@ -12,11 +12,7 @@
import reframe.core.schedulers as sched
import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import JobError, ReframeError


class _TimeoutExpired(ReframeError):
    pass
from reframe.core.exceptions import JobError


class _LocalJob(sched.Job):
231 changes: 231 additions & 0 deletions reframe/core/schedulers/ssh.py
@@ -0,0 +1,231 @@
# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import os
import time

import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
from reframe.core.exceptions import ConfigError, SpawnedProcessError
from reframe.core.schedulers import Job, JobScheduler, AlwaysIdleNode


class _SSHJob(Job):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._localdir = None
        self._remotedir = None
        self._host = None
        self._ssh_options = []

        # Async processes spawned for this job
        self.steps = {}

    @property
    def localdir(self):
        return self._localdir

    @property
    def remotedir(self):
        return self._remotedir

    @property
    def host(self):
        return self._host

    @property
    def ssh_options(self):
        return self._ssh_options


@register_scheduler('ssh')
class SSHJobScheduler(JobScheduler):
    def __init__(self, *, hosts=None):
        self._free_hosts = set(hosts or self.get_option('ssh_hosts'))
        self._allocated_hosts = set()
        if not self._free_hosts:
            raise ConfigError(f'no hosts specified for the SSH scheduler: '
                              f'{self._config_prefix}')

        # Determine if rsync is available
        try:
            osext.run_command('rsync --version', check=True)
        except (FileNotFoundError, SpawnedProcessError):
            self._has_rsync = False
        else:
            self._has_rsync = True

    def _reserve_host(self, host=None):
        pool = self._free_hosts if self._free_hosts else self._allocated_hosts
        if host:
            pool.discard(host)
            self._allocated_hosts.add(host)
            return host

        host = pool.pop()
        self._allocated_hosts.add(host)
        return host

    def make_job(self, *args, **kwargs):
        return _SSHJob(*args, **kwargs)

    def emit_preamble(self, job):
        return []

    def _push_artefacts(self, job):
        assert isinstance(job, _SSHJob)
        options = ' '.join(job.ssh_options)

        # Create a temporary directory on the remote host and push the job
        # artifacts
        completed = osext.run_command(
            f'ssh -o BatchMode=yes {options} {job.host} '
            f'mktemp -td rfm.XXXXXXXX', check=True
        )
        remotedir = completed.stdout.strip()

        # Store the local and remote dirs
        job._localdir = os.getcwd()
        job._remotedir = remotedir

        if self._has_rsync:
            job.steps['push'] = osext.run_command_async2(
                f'rsync -az -e "ssh -o BatchMode=yes {options}" '
                f'{job.localdir}/ {job.host}:{remotedir}/', check=True
            )
        else:
            job.steps['push'] = osext.run_command_async2(
                f'scp -r -o BatchMode=yes {options} '
                f'{job.localdir}/* {job.host}:{remotedir}/',
                shell=True, check=True
            )

    def _pull_artefacts(self, job):
        assert isinstance(job, _SSHJob)
        options = ' '.join(job.ssh_options)
        if self._has_rsync:
            job.steps['pull'] = osext.run_command_async2(
                f'rsync -az -e "ssh -o BatchMode=yes {options}" '
                f'{job.host}:{job.remotedir}/ {job.localdir}/'
            )
        else:
            job.steps['pull'] = osext.run_command_async2(
                f"scp -r -o BatchMode=yes {options} "
                f"'{job.host}:{job.remotedir}/*' {job.localdir}/", shell=True
            )

    def _do_submit(self, job):
        # Modify the spawn command and submit
        options = ' '.join(job.ssh_options)
        job.steps['exec'] = osext.run_command_async2(
            f'ssh -o BatchMode=yes {options} {job.host} '
            f'"cd {job.remotedir} && bash -l {job.script_filename}"'
        )

    def submit(self, job):
        assert isinstance(job, _SSHJob)

        # Check if the `#host` pseudo-option is specified and use it as the
        # host, stripping it off the rest of the options
        host = None
        stripped_opts = []
        options = job.sched_access + job.options + job.cli_options
        for opt in options:
            if opt.startswith('#host='):
                _, host = opt.split('=', maxsplit=1)
            else:
                stripped_opts.append(opt)

        # Host is pinned externally (`--distribute` option)
        if job.pin_nodes:
            host = job.pin_nodes[0]

        job._submit_time = time.time()
        job._ssh_options = stripped_opts
        job._host = self._reserve_host(host)

        self._push_artefacts(job)
        self._do_submit(job)
        self._pull_artefacts(job)

        def success(proc):
            return proc.exitcode == 0

        job.steps['push'].then(
            job.steps['exec'],
            when=success
        ).then(
            job.steps['pull'],
            when=success
        )
        job.steps['push'].start()
        job._jobid = job.steps['push'].pid

    def wait(self, job):
        for step in job.steps.values():
            if step.started():
                step.wait()

    def cancel(self, job):
        for step in job.steps.values():
            if step.started():
                step.cancel()

    def finished(self, job):
        if job.exception:
            raise job.exception

        return job.state is not None

    def poll(self, *jobs):
        for job in jobs:
            self._poll_job(job)

    def _poll_job(self, job):
        last_done = None
        last_failed = None
        for proc_kind, proc in job.steps.items():
            if proc.started() and proc.done():
                last_done = proc_kind
                if proc.exitcode != 0:
                    last_failed = proc_kind
                    break

        if last_failed is None and last_done != 'pull':
            return False

        # Either all processes were done or one failed
        # Update the job info
        last_proc = job.steps[last_done]
        job._exitcode = last_proc.exitcode
        job._exception = last_proc.exception()
        job._signal = last_proc.signal
        if job._exitcode == 0:
            job._state = 'SUCCESS'
        else:
            job._state = 'FAILURE'

        exec_proc = job.steps['exec']
        if exec_proc.started():
            with osext.change_dir(job.localdir):
                with open(job.stdout, 'w+') as fout:
                    fout.write(exec_proc.stdout().read())

                with open(job.stderr, 'w+') as ferr:
                    ferr.write(exec_proc.stderr().read())

        return True

    def allnodes(self):
        return [AlwaysIdleNode(h) for h in self._free_hosts]

    def filternodes(self, job, nodes):
        options = job.sched_access + job.options + job.cli_options
        for opt in options:
            if opt.startswith('#host='):
                _, host = opt.split('=', maxsplit=1)
                return [AlwaysIdleNode(host)]
        else:
            return [AlwaysIdleNode(h) for h in self._free_hosts]
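The ``push → exec → pull`` pipeline in ``submit`` relies on the conditional chaining API of ``osext.run_command_async2``. Below is a toy synchronous analogue of that pattern; it is not the real ``osext`` API, and the ``Step`` class and its method signatures are invented for illustration.

```python
class Step:
    """A toy deferred step supporting conditional chaining via then()."""

    def __init__(self, fn):
        self._fn = fn
        self._next = []           # (step, predicate) pairs
        self.exitcode = None

    def then(self, step, when=lambda s: True):
        # Register a follow-up step and return it, so calls can be chained
        self._next.append((step, when))
        return step

    def start(self):
        self.exitcode = self._fn()
        for step, when in self._next:
            if when(self):
                step.start()


def success(proc):
    return proc.exitcode == 0


log = []
push = Step(lambda: log.append('push') or 0)
run = Step(lambda: log.append('exec') or 0)
pull = Step(lambda: log.append('pull') or 0)

# Mirror the wiring in submit(): exec runs only if push succeeded,
# pull only if exec succeeded.
push.then(run, when=success).then(pull, when=success)
push.start()
print(log)  # ['push', 'exec', 'pull']
```

If a step reports a non-zero exit code, the chain stops there, which is exactly why ``_poll_job`` can treat the last finished step as the job's outcome.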
5 changes: 5 additions & 0 deletions reframe/schemas/config.json
@@ -109,6 +109,10 @@
"sched_options": {
    "type": "object",
    "properties": {
        "ssh_hosts": {
            "type": "array",
            "items": {"type": "string"}
        },
        "ignore_reqnodenotavail": {"type": "boolean"},
        "job_submit_timeout": {"type": "number"},
        "resubmit_on_errors": {
@@ -615,6 +619,7 @@
"systems/partitions/time_limit": null,
"systems/partitions/devices": [],
"systems/partitions/extras": {},
"systems*/sched_options/ssh_hosts": [],
"systems*/sched_options/ignore_reqnodenotavail": false,
"systems*/sched_options/job_submit_timeout": 60,
"systems*/sched_options/resubmit_on_errors": [],