From aa8605acd9833ea4205b4401e8a4a9df89d2d3a7 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 12 Jul 2023 19:03:01 +0300 Subject: [PATCH 01/11] Add an SSH scheduler for running remote jobs The following are added: - Implementation of a future that wraps a spawned process - A new scheduler that can spawn reframe jobs on a remote machine accessed with SSH. --- reframe/core/backends.py | 3 +- reframe/core/schedulers/__init__.py | 11 ++ reframe/core/schedulers/local.py | 9 +- reframe/core/schedulers/ssh.py | 219 ++++++++++++++++++++++++++++ reframe/schemas/config.json | 7 +- reframe/utility/osext.py | 149 ++++++++++++++++++- unittests/test_utility.py | 176 ++++++++++++++++++++++ 7 files changed, 565 insertions(+), 9 deletions(-) create mode 100644 reframe/core/schedulers/ssh.py diff --git a/reframe/core/backends.py b/reframe/core/backends.py index a8720c2ce7..6a2924bc87 100644 --- a/reframe/core/backends.py +++ b/reframe/core/backends.py @@ -23,7 +23,8 @@ 'reframe.core.schedulers.pbs', 'reframe.core.schedulers.oar', 'reframe.core.schedulers.sge', - 'reframe.core.schedulers.slurm' + 'reframe.core.schedulers.slurm', + 'reframe.core.schedulers.ssh' ] _schedulers = {} diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index e21e33e56e..781bb031bb 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -627,3 +627,14 @@ def in_state(self, state): :returns: :class:`True` if the nodes's state matches the given one, :class:`False` otherwise. 
''' + +class AlwaysIdleNode(Node): + def __init__(self, name): + self._name = name + + @property + def name(self): + return self._name + + def in_state(self, state): + return state.casefold() == 'idle' diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 5ace2c917a..a6d969ff7c 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -12,11 +12,7 @@ import reframe.core.schedulers as sched import reframe.utility.osext as osext from reframe.core.backends import register_scheduler -from reframe.core.exceptions import JobError, ReframeError - - -class _TimeoutExpired(ReframeError): - pass +from reframe.core.exceptions import JobError class _LocalJob(sched.Job): @@ -27,6 +23,7 @@ def __init__(self, *args, **kwargs): self._f_stderr = None self._signal = None self._cancel_time = None + self.spawn_command = f'./{self._script_filename}' @property def proc(self): @@ -66,7 +63,7 @@ def submit(self, job): # we can later kill any other processes that this might spawn by just # killing this one. proc = osext.run_command_async( - os.path.abspath(job.script_filename), + job.spawn_command, stdout=f_stdout, stderr=f_stderr, start_new_session=True diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py new file mode 100644 index 0000000000..e99b3e467a --- /dev/null +++ b/reframe/core/schedulers/ssh.py @@ -0,0 +1,219 @@ +# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. 
+# +# SPDX-License-Identifier: BSD-3-Clause + +import os +import functools +import time + +import reframe.utility.osext as osext +from reframe.core.backends import register_scheduler +from reframe.core.exceptions import ConfigError, SpawnedProcessError +from reframe.core.schedulers import Job, JobScheduler, AlwaysIdleNode + + +class _SSHJob(Job): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._localdir = None + self._remotedir = None + self._host = None + self._ssh_options = [] + + # Async processes spawned for this job + self.steps = {} + + @property + def localdir(self): + return self._localdir + + @property + def remotedir(self): + return self._remotedir + + @property + def host(self): + return self._host + + @property + def ssh_options(self): + return self._ssh_options + +@register_scheduler('ssh') +class SSHJobScheduler(JobScheduler): + def __init__(self): + self._free_hosts = set(self.get_option('hosts')) + self._allocated_hosts = set() + if not self._free_hosts: + raise ConfigError(f'no hosts specified for the SSH scheduler: {self._config_prefix}') + + # Determine if rsync is available + try: + osext.run_command('rsync --version', check=True) + except SpawnedProcessError: + self._has_rsync = False + else: + self._has_rsync = True + + def _reserve_host(self, host=None): + pool = self._free_hosts if self._free_hosts else self._allocated_hosts + if host: + pool.discard(host) + self._allocated_hosts.add(host) + return host + + host = pool.pop() + self._allocated_hosts.add(host) + return host + + def make_job(self, *args, **kwargs): + return _SSHJob(*args, **kwargs) + + def emit_preamble(self, job): + return [] + + def _push_artefacts(self, job): + assert isinstance(job, _SSHJob) + options = ' '.join(job.ssh_options) + + # Create a temporary directory on the remote host and push the job artifacts + completed = osext.run_command(f'ssh -o BatchMode=yes {options} {job.host} mktemp -td rfm.XXXXXXXX', check=True) + remotedir = 
completed.stdout.strip() + + # Store the local and remote dirs + job._localdir = os.getcwd() + job._remotedir = remotedir + + if self._has_rsync: + job.steps['push'] = osext.run_command_async2( + f'rsync -az -e "ssh -o BatchMode=yes {options}" {job.localdir}/ {job.host}:{remotedir}/', check=True + ) + else: + job.steps['push'] = osext.run_command_async2( + f'scp -r -o BatchMode=yes {options} {job.localdir}/* {job.host}:{remotedir}/', shell=True, check=True + ) + + + def _pull_artefacts(self, job): + assert isinstance(job, _SSHJob) + options = ' '.join(job.ssh_options) + if self._has_rsync: + job.steps['pull'] = osext.run_command_async2( + f'rsync -az -e "ssh -o BatchMode=yes {options}" {job.host}:{job.remotedir}/ {job.localdir}/' + ) + else: + job.steps['pull'] = osext.run_command_async2( + f"scp -r -o BatchMode=yes {options} '{job.host}:{job.remotedir}/*' {job.localdir}/", shell=True + ) + + def _do_submit(self, job): + # Modify the spawn command and submit + options = ' '.join(job.ssh_options) + job.steps['exec'] = osext.run_command_async2( + f'ssh -o BatchMode=yes {options} {job.host} "cd {job.remotedir} && bash -l {job.script_filename}"' + ) + + def submit(self, job): + assert isinstance(job, _SSHJob) + + # Check if `#host` pseudo-option is specified and use this as a host, + # stripping it off the rest of the options + host = None + stripped_opts = [] + options = job.sched_access + job.options + job.cli_options + for opt in options: + if opt.startswith('#host='): + _, host = opt.split('=', maxsplit=1) + else: + stripped_opts.append(opt) + + # Host is pinned externally (`--distribute` option) + if job.pin_nodes: + host = job.pin_nodes[0] + + job._submit_time = time.time() + job._ssh_options = stripped_opts + job._host = self._reserve_host(host) + + self._push_artefacts(job) + self._do_submit(job) + self._pull_artefacts(job) + + def success(proc): + return proc.exitcode == 0 + + job.steps['push'].then( + job.steps['exec'], + when=success + ).then( + 
job.steps['pull'], + when=success + ) + job.steps['push'].start() + job._jobid = job.steps['push'].pid + + def wait(self, job): + for step in job.steps.values(): + if step.started(): + step.wait() + + def cancel(self, job): + for step in job.steps.values(): + if step.started(): + step.cancel() + + def finished(self, job): + if job.exception: + raise job.exception + + return job.state is not None + + def poll(self, *jobs): + for job in jobs: + self._poll_job(job) + + def _poll_job(self, job): + last_done = None + last_failed = None + for proc_kind, proc in job.steps.items(): + if proc.started() and proc.done(): + last_done = proc_kind + if proc.exitcode != 0: + last_failed = proc_kind + break + + if last_failed is None and last_done != 'pull': + return False + + # Either all processes were done or one failed + # Update the job info + last_proc = job.steps[last_done] + job._exitcode = last_proc.exitcode + job._exception = last_proc.exception() + job._signal = last_proc.signal + if job._exitcode == 0: + job._state = 'SUCCESS' + else: + job._state = 'FAILURE' + + exec_proc = job.steps['exec'] + if exec_proc.started(): + with osext.change_dir(job.localdir): + with open(job.stdout, 'w+') as fout, open(job.stderr, 'w+') as ferr: + fout.write(exec_proc.stdout.read()) + ferr.write(exec_proc.stderr.read()) + + return True + + def allnodes(self): + return [AlwaysIdleNode(h) for h in self._free_hosts] + + def filternodes(self, job, nodes): + options = job.sched_access + job.options + job.cli_options + for opt in options: + if opt.startswith('#host='): + _, host = opt.split('=', maxsplit=1) + return [AlwaysIdleNode(host)] + else: + return [AlwaysIdleNode(h) for h in self._free_hosts] diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index 59a1d8f9ec..c1e6cfdfd9 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -109,6 +109,10 @@ "sched_options": { "type": "object", "properties": { + "hosts": { + "type": "array", + "items": 
{"type": "string"} + }, "ignore_reqnodenotavail": {"type": "boolean"}, "job_submit_timeout": {"type": "number"}, "resubmit_on_errors": { @@ -278,7 +282,7 @@ "type": "string", "enum": [ "flux", "local", "lsf", "oar", "pbs", - "sge", "slurm", "squeue", "torque" + "sge", "slurm", "squeue", "ssh", "torque" ] }, "sched_options": {"$ref": "#/defs/sched_options"}, @@ -620,6 +624,7 @@ "systems/partitions/time_limit": null, "systems/partitions/devices": [], "systems/partitions/extras": {}, + "systems/*/sched_options/hosts": [], "systems*/sched_options/ignore_reqnodenotavail": false, "systems*/sched_options/job_submit_timeout": 60, "systems*/sched_options/resubmit_on_errors": [], diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index efd22ee639..207e90183e 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -29,6 +29,151 @@ from . import OrderedSet +class UnstartedProcError(ReframeError): + '''Raised when a process operation is attempted on an unstarted process future''' + +class _ProcFuture: + def __init__(self, check=False, *args, **kwargs): + self._proc = None + self._exitcode = None + self._signal = None + self._check = check + self._cmd_args = (args, kwargs) + self._next = [] + self._done_callbacks = [] + self._completed = False + self._cancelled = False + + def _check_started(self): + if not self.started(): + raise UnstartedProcError + + def start(self): + args, kwargs = self._cmd_args + self._proc = run_command_async(*args, **kwargs) + + if os.getsid(self._proc.pid) == self._proc.pid: + self._session = True + else: + self._session = False + + @property + def pid(self): + return self._proc.pid + + @property + def exitcode(self): + return self._exitcode + + @property + def signal(self): + return self._signal + + def cancelled(self): + return self._cancelled + + def scheduled(self): + return self._scheduled + + def is_session(self): + return self._session + + def kill(self, signum): + self._check_started() + kill_fn = os.killpg if 
self.is_session() else os.kill + kill_fn(self.pid, signum) + self._signal = signum + + def terminate(self): + self.kill(signal.SIGTERM) + + def cancel(self): + self._check_started() + if not self.cancelled(): + self.kill(signal.SIGKILL) + + self._cancelled = True + + def add_done_callback(self, func): + self._done_callbacks.append(func) + + def then(self, future, when=None): + if when is None: + when = lambda fut: True + + self._next.append((future, when)) + return future + + def started(self): + return self._proc is not None + + def _wait(self, *, nohang=False): + self._check_started() + if self._completed: + return True + + options = os.WNOHANG if nohang else 0 + try: + pid, status = os.waitpid(self.pid, options) + except OSError as e: + if e.errno == errno.ECHILD: + self._completed = True + return self._completed + else: + raise e + + if nohang and not pid: + return False + + if os.WIFEXITED(status): + self._exitcode = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + self._signal = os.WTERMSIG(status) + + self._completed = True + + # Call any done callbacks + for func in self._done_callbacks: + func(self) + + # Start the next futures in the chain + for fut, cond in self._next: + if cond(self): + fut.start() + + return self._completed + + def done(self): + self._check_started() + return self._wait(nohang=True) + + def wait(self): + self._wait() + + def exception(self): + self._wait() + if not self._check: + return + + if self._proc.returncode == 0: + return + + return SpawnedProcessError(self._proc.args, + self._proc.stdout.read(), + self._proc.stderr.read(), + self._proc.returncode) + + @property + def stdout(self): + self._wait() + return self._proc.stdout + + @property + def stderr(self): + self._wait() + return self._proc.stderr + + def run_command(cmd, check=False, timeout=None, **kwargs): '''Run command synchronously. 
@@ -102,7 +247,7 @@ def run_command_async(cmd, if log: from reframe.core.logging import getlogger - getlogger().debug2(f'[CMD] {cmd!r}') + getlogger().debug(f'[CMD] {cmd!r}') if isinstance(cmd, str) and not shell: cmd = shlex.split(cmd) @@ -115,6 +260,8 @@ def run_command_async(cmd, shell=shell, **popen_args) +def run_command_async2(*args, check=False, **kwargs): + return _ProcFuture(check, *args, **kwargs) def osuser(): '''Return the name of the current OS user. diff --git a/unittests/test_utility.py b/unittests/test_utility.py index b89236cea8..b2e995920a 100644 --- a/unittests/test_utility.py +++ b/unittests/test_utility.py @@ -6,6 +6,7 @@ import os import pytest import random +import signal import sys import time @@ -83,6 +84,181 @@ def test_command_async(): assert t_launch < 1 assert t_sleep >= 1 +def test_command_futures(): + proc = osext.run_command_async2('echo hello', shell=True) + + # Check that some operations cannot be performed on an unstarted future + with pytest.raises(osext.UnstartedProcError): + proc.done() + + with pytest.raises(osext.UnstartedProcError): + proc.cancel() + + with pytest.raises(osext.UnstartedProcError): + proc.terminate() + + with pytest.raises(osext.UnstartedProcError): + proc.wait() + + assert not proc.started() + proc.start() + assert proc.started() + assert proc.pid is not None + + # By default a process is not started as a new session + assert not proc.is_session() + + # stdout must block + assert proc.stdout.read() == 'hello\n' + assert proc.exitcode == 0 + assert proc.signal is None + + # Additional wait() should have no effect + proc.wait() + proc.wait() + + assert proc.done() + assert not proc.cancelled() + assert proc.exception() is None + + +def test_command_futures_callbacks(): + num_called = 0 + def _callback(_): + nonlocal num_called + num_called += 1 + + proc = osext.run_command_async2("echo hello", shell=True) + proc.add_done_callback(_callback) + proc.start() + while not proc.done(): + pass + + # Call explicitly 
more times + proc.done() + proc.done() + assert num_called == 1 + + +@pytest.fixture(params=['checked', 'unchecked']) +def _checked_cmd(request): + return request.param == 'checked' + +def test_command_futures_error(_checked_cmd): + proc = osext.run_command_async2("false", shell=True, check=_checked_cmd) + proc.start() + + # exception() blocks until the process is finished + if _checked_cmd: + assert isinstance(proc.exception(), SpawnedProcessError) + else: + assert proc.exception() is None + + assert proc.exitcode == 1 + assert proc.signal is None + +@pytest.fixture(params=['SIGINT', 'SIGTERM', 'SIGKILL']) +def _signal(request): + if request.param == 'SIGINT': + return signal.SIGINT + elif request.param == 'SIGTERM': + return signal.SIGTERM + elif request.param == 'SIGKILL': + return signal.SIGKILL + + assert 0 + +def test_command_futures_signal(_checked_cmd, _signal): + proc = osext.run_command_async2('sleep 3', shell=True, check=_checked_cmd) + proc.start() + if _signal == signal.SIGTERM: + proc.terminate() + elif _signal == signal.SIGKILL: + proc.cancel() + else: + proc.kill(_signal) + + proc.wait() + assert proc.done() + if _signal == signal.SIGKILL: + assert proc.cancelled() + else: + assert not proc.cancelled() + + assert proc.signal == _signal + assert proc.exitcode is None + if _checked_cmd: + assert isinstance(proc.exception(), SpawnedProcessError) + else: + assert proc.exception() is None + +def test_command_futures_chain(tmp_path): + with open(tmp_path / 'stdout.txt', 'w+') as fp: + proc0 = osext.run_command_async2('echo hello', shell=True, stdout=fp) + proc1 = osext.run_command_async2('sleep 1', shell=True, stdout=fp) + proc2 = osext.run_command_async2('sleep 1', shell=True, stdout=fp) + proc3 = osext.run_command_async2('echo world', shell=True, stdout=fp) + proc0.then(proc1) + proc0.then(proc2).then(proc3) + + all_procs = [proc0, proc1, proc2, proc3] + t_start = time.time() + proc0.start() + while not all(p.done() for p in all_procs if p.started()): + 
pass + + t_elapsed = time.time() - t_start + assert t_elapsed < 2 + assert all(p.done() for p in all_procs) + + with open(tmp_path / 'stdout.txt') as fp: + assert fp.read() == 'hello\nworld\n' + +@pytest.fixture(params=['fail_on_error', 'ignore_errors']) +def _chain_policy(request): + return request.param + +def test_command_futures_chain_cond(_chain_policy, tmp_path): + if _chain_policy == 'fail_on_error': + def cond(proc): + return proc.exitcode == 0 + else: + def cond(proc): + return True + + with open(tmp_path / 'stdout.txt', 'w+') as fp: + proc0 = osext.run_command_async2("echo hello", shell=True, stdout=fp) + proc1 = osext.run_command_async2("false", shell=True) + proc2 = osext.run_command_async2("echo world", shell=True, stdout=fp) + proc0.then(proc1).then(proc2, when=cond) + proc0.start() + proc0.wait() + proc1.wait() + if _chain_policy == 'fail_on_error': + assert not proc2.started() + else: + proc2.wait() + + with open(tmp_path / 'stdout.txt') as fp: + if _chain_policy == 'fail_on_error': + assert fp.read() == 'hello\n' + else: + assert fp.read() == 'hello\nworld\n' + + +def test_command_futures_chain_cancel(): + proc0 = osext.run_command_async2('echo hello', shell=True) + proc1 = osext.run_command_async2('sleep 1', shell=True) + proc2 = osext.run_command_async2('echo world', shell=True) + proc0.then(proc1).then(proc2) + proc0.start() + while not proc0.done(): + pass + + assert proc1.started() + proc1.cancel() + assert proc1.cancelled() + assert not proc2.started() def test_copytree(tmp_path): dir_src = tmp_path / 'src' From de7a82a519e7b0dfc56f3e8f51cc52a36a5222c3 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Fri, 25 Aug 2023 23:03:39 +0200 Subject: [PATCH 02/11] Style fixes --- reframe/core/schedulers/ssh.py | 29 ++++++++++++++++++++--------- reframe/utility/osext.py | 8 ++++++-- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index e99b3e467a..43fba82e9c 100644 
--- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -40,13 +40,15 @@ def host(self): def ssh_options(self): return self._ssh_options + @register_scheduler('ssh') class SSHJobScheduler(JobScheduler): def __init__(self): self._free_hosts = set(self.get_option('hosts')) self._allocated_hosts = set() if not self._free_hosts: - raise ConfigError(f'no hosts specified for the SSH scheduler: {self._config_prefix}') + raise ConfigError(f'no hosts specified for the SSH scheduler: ' + f'{self._config_prefix}') # Determine if rsync is available try: @@ -78,7 +80,10 @@ def _push_artefacts(self, job): options = ' '.join(job.ssh_options) # Create a temporary directory on the remote host and push the job artifacts - completed = osext.run_command(f'ssh -o BatchMode=yes {options} {job.host} mktemp -td rfm.XXXXXXXX', check=True) + completed = osext.run_command( + f'ssh -o BatchMode=yes {options} {job.host} ' + f'mktemp -td rfm.XXXXXXXX', check=True + ) remotedir = completed.stdout.strip() # Store the local and remote dirs @@ -87,31 +92,36 @@ def _push_artefacts(self, job): if self._has_rsync: job.steps['push'] = osext.run_command_async2( - f'rsync -az -e "ssh -o BatchMode=yes {options}" {job.localdir}/ {job.host}:{remotedir}/', check=True + f'rsync -az -e "ssh -o BatchMode=yes {options}" ' + f'{job.localdir}/ {job.host}:{remotedir}/', check=True ) else: job.steps['push'] = osext.run_command_async2( - f'scp -r -o BatchMode=yes {options} {job.localdir}/* {job.host}:{remotedir}/', shell=True, check=True + f'scp -r -o BatchMode=yes {options} ' + f'{job.localdir}/* {job.host}:{remotedir}/', + shell=True, check=True ) - def _pull_artefacts(self, job): assert isinstance(job, _SSHJob) options = ' '.join(job.ssh_options) if self._has_rsync: job.steps['pull'] = osext.run_command_async2( - f'rsync -az -e "ssh -o BatchMode=yes {options}" {job.host}:{job.remotedir}/ {job.localdir}/' + f'rsync -az -e "ssh -o BatchMode=yes {options}" ' + f'{job.host}:{job.remotedir}/ 
{job.localdir}/' ) else: job.steps['pull'] = osext.run_command_async2( - f"scp -r -o BatchMode=yes {options} '{job.host}:{job.remotedir}/*' {job.localdir}/", shell=True + f"scp -r -o BatchMode=yes {options} " + f"'{job.host}:{job.remotedir}/*' {job.localdir}/", shell=True ) def _do_submit(self, job): # Modify the spawn command and submit options = ' '.join(job.ssh_options) job.steps['exec'] = osext.run_command_async2( - f'ssh -o BatchMode=yes {options} {job.host} "cd {job.remotedir} && bash -l {job.script_filename}"' + f'ssh -o BatchMode=yes {options} {job.host} ' + f'"cd {job.remotedir} && bash -l {job.script_filename}"' ) def submit(self, job): @@ -200,7 +210,8 @@ def _poll_job(self, job): exec_proc = job.steps['exec'] if exec_proc.started(): with osext.change_dir(job.localdir): - with open(job.stdout, 'w+') as fout, open(job.stderr, 'w+') as ferr: + with (open(job.stdout, 'w+') as fout, + open(job.stderr, 'w+') as ferr): fout.write(exec_proc.stdout.read()) ferr.write(exec_proc.stderr.read()) diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index 207e90183e..a1f5c21a96 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -30,7 +30,9 @@ class UnstartedProcError(ReframeError): - '''Raised when a process operation is attempted on an unstarted process future''' + '''Raised when a process operation is attempted on a + not yet started process future''' + class _ProcFuture: def __init__(self, check=False, *args, **kwargs): @@ -99,7 +101,7 @@ def add_done_callback(self, func): def then(self, future, when=None): if when is None: - when = lambda fut: True + def when(fut): return True self._next.append((future, when)) return future @@ -260,9 +262,11 @@ def run_command_async(cmd, shell=shell, **popen_args) + def run_command_async2(*args, check=False, **kwargs): return _ProcFuture(check, *args, **kwargs) + def osuser(): '''Return the name of the current OS user. 
From ea8fa42e5a5b2b4663ab6403f8e13471c3f435fd Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 28 Aug 2023 00:17:26 +0200 Subject: [PATCH 03/11] Add docs about SSH scheduler --- docs/config_reference.rst | 27 ++++++++- reframe/core/schedulers/ssh.py | 4 +- reframe/utility/osext.py | 101 +++++++++++++++++++++++++++++++-- unittests/test_utility.py | 18 +++++- 4 files changed, 139 insertions(+), 11 deletions(-) diff --git a/docs/config_reference.rst b/docs/config_reference.rst index 4dbcab805a..2808244acc 100644 --- a/docs/config_reference.rst +++ b/docs/config_reference.rst @@ -260,8 +260,9 @@ System Partition Configuration The job scheduler that will be used to launch jobs on this partition. Supported schedulers are the following: - - ``local``: Jobs will be launched locally without using any job scheduler. - ``flux``: Jobs will be launched using the `Flux Framework `_ scheduler. + - ``local``: Jobs will be launched locally without using any job scheduler. + - ``lsf``: Jobs will be launched using the `LSF `__ scheduler. - ``oar``: Jobs will be launched using the `OAR `__ scheduler. - ``pbs``: Jobs will be launched using the `PBS Pro `__ scheduler. - ``sge``: Jobs will be launched using the `Sun Grid Engine `__ scheduler. @@ -270,8 +271,19 @@ System Partition Configuration If not, you should consider using the ``squeue`` backend below. - ``squeue``: Jobs will be launched using the `Slurm `__ scheduler. This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible. + - ``ssh``: Jobs will be launched on a remote host using SSH. + + The remote host will be selected from the list of hosts specified in :attr:`~systems.partitions.sched_options.hosts`. + The scheduler keeps track of the hosts that it has submitted jobs to, and it will select the next available one in a round-robin fashion. 
+ For connecting to a remote host, the options specified in :attr:`~systems.partitions.access` will be used. + + When a job is submitted with this scheduler, its stage directory will be copied over to a unique temporary directory on the remote host, then the job will be executed and, finally, any produced artifacts will be copied back. + + The contents of the stage directory are copied to the remote host either using ``rsync``, if available, or ``scp`` as a second choice. + The same :attr:`~systems.partitions.access` options will be used in those operations as well. + Please note that the connection options of ``ssh`` and ``scp`` differ and ReFrame will not attempt to translate any options between the two utilities in case ``scp`` is selected for copying to the remote host. + In this case, it is preferable to set up the host connection options in ``~/.ssh/config`` and leave :attr:`~systems.partitions.access` blank. - ``torque``: Jobs will be launched using the `Torque `__ scheduler. - - ``lsf``: Jobs will be launched using the `LSF `__ scheduler. .. versionadded:: 3.7.2 Support for the SGE scheduler is added. @@ -282,6 +294,9 @@ System Partition Configuration .. versionadded:: 3.11.0 Support for the LSF scheduler is added. + .. versionadded:: 4.4 + The ``ssh`` scheduler is added. + .. note:: The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific. @@ -337,6 +352,14 @@ System Partition Configuration .. warning:: This option is broken in 4.0. +.. py:attribute:: systems.partitions.sched_options.hosts + + :required: No + :default: ``[]`` + + List of hosts in a partition that uses the ``ssh`` scheduler. + + .. 
py:attribute:: systems.partitions.sched_options.ignore_reqnodenotavail :required: No diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 43fba82e9c..dbfb678f24 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -212,8 +212,8 @@ def _poll_job(self, job): with osext.change_dir(job.localdir): with (open(job.stdout, 'w+') as fout, open(job.stderr, 'w+') as ferr): - fout.write(exec_proc.stdout.read()) - ferr.write(exec_proc.stderr.read()) + fout.write(exec_proc.stdout().read()) + ferr.write(exec_proc.stderr().read()) return True diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index a1f5c21a96..ccc2d90365 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -35,6 +35,17 @@ class UnstartedProcError(ReframeError): class _ProcFuture: + '''A future encapsulating a command to be executed asynchronously. + + Users may not create a :class:`_ProcFuture` directly, but should use + :func:`run_command_async2` instead. + + :meta public: + + .. versionadded:: 4.4 + + ''' + def __init__(self, check=False, *args, **kwargs): self._proc = None self._exitcode = None @@ -51,6 +62,8 @@ def _check_started(self): raise UnstartedProcError def start(self): + '''Start the future, i.e. 
spawn the encapsulated command.''' + args, kwargs = self._cmd_args self._proc = run_command_async(*args, **kwargs) @@ -59,37 +72,50 @@ def start(self): else: self._session = False + return self + @property def pid(self): + '''The PID of the spawned process.''' return self._proc.pid @property def exitcode(self): + '''The exit code of the spawned process.''' return self._exitcode @property def signal(self): + '''The signal number that caused the spawned process to exit.''' return self._signal def cancelled(self): + '''Returns :obj:`True` if the future was cancelled.''' return self._cancelled - def scheduled(self): - return self._scheduled - def is_session(self): + '''Returns :obj:`True` is the spawned process is a group or session + leader.''' return self._session def kill(self, signum): + '''Send signal ``signum`` to the spawned process. + + If the process is a group or session leader, the signal will be sent + to the whole group or session. + ''' + self._check_started() kill_fn = os.killpg if self.is_session() else os.kill kill_fn(self.pid, signum) self._signal = signum def terminate(self): + '''Terminate the spawned process by sending ``SIGTERM``.''' self.kill(signal.SIGTERM) def cancel(self): + '''Cancel the spawned process by sending ``SIGKILL``.''' self._check_started() if not self.cancelled(): self.kill(signal.SIGKILL) @@ -97,16 +123,45 @@ def cancel(self): self._cancelled = True def add_done_callback(self, func): + '''Add a callback that will be called when this future is done. + + The callback function will be called with the future as its sole + argument. + ''' + if not util.is_trivially_callable(func, non_def_args=1): + raise ValueError('the callback function must ' + 'accept a single argument') + self._done_callbacks.append(func) def then(self, future, when=None): + '''Schedule another future for execution after this one. + + :arg future: a :class:`_ProcFuture` to be started after this one + finishes. 
+ :arg when: A callable that will be used as conditional for starting or + not the next future. It will be called with this future as its + sole argument and must return a boolean. If the return value is + true, then ``future`` will start execution, otherwise not. + + If ``when`` is :obj:`None`, then the next future will be executed + unconditionally. + :returns: the passed ``future``, so that multiple :func:`then` calls + can be chained. + ''' + if when is None: def when(fut): return True + if not util.is_trivially_callable(when, non_def_args=1): + raise ValueError("the 'when' function must " + "accept a single argument") + self._next.append((future, when)) return future def started(self): + '''Check if this future has started.''' return self._proc is not None def _wait(self, *, nohang=False): @@ -146,13 +201,27 @@ def _wait(self, *, nohang=False): return self._completed def done(self): + '''Check if the future has finished. + + This is a non-blocking call. + ''' self._check_started() return self._wait(nohang=True) def wait(self): + '''Wait for this future to finish.''' self._wait() def exception(self): + '''Retrieve the exception raised by this future. + + This is a blocking call and will wait until this future finishes. + + The only exception that a :func:`_ProcFuture` can return is a + :class:`SpawnedProcessError` if the ``check`` flag was set in + :func:`run_command_async2`. + ''' + self._wait() if not self._check: return @@ -165,13 +234,19 @@ def exception(self): self._proc.stderr.read(), self._proc.returncode) - @property def stdout(self): + '''Retrieve the standard output of the spawned process. + + This is a blocking call and will wait until the future finishes. + ''' self._wait() return self._proc.stdout - @property def stderr(self): + '''Retrieve the standard error of the spawned process. + + This is a blocking call and will wait until the future finishes. 
+ ''' self._wait() return self._proc.stderr @@ -264,6 +339,22 @@ def run_command_async(cmd, def run_command_async2(*args, check=False, **kwargs): + '''Return a :class:`_ProcFuture` that encapsulates a command to be + executed. + + The command to be executed will not start until the returned future is + started. + + :arg args: Any of the arguments that can be passed to + :func:`run_command_async` + :arg check: If true, flag the future with a :func:`SpawnedProcessError` + exception, upon failure. + :arg kwargs: Any of the keyword arguments that can be passed to + :func:`run_command_async`. + + .. versionadded:: 4.4 + + ''' return _ProcFuture(check, *args, **kwargs) diff --git a/unittests/test_utility.py b/unittests/test_utility.py index b2e995920a..c3d33c6101 100644 --- a/unittests/test_utility.py +++ b/unittests/test_utility.py @@ -84,6 +84,7 @@ def test_command_async(): assert t_launch < 1 assert t_sleep >= 1 + def test_command_futures(): proc = osext.run_command_async2('echo hello', shell=True) @@ -109,7 +110,7 @@ def test_command_futures(): assert not proc.is_session() # stdout must block - assert proc.stdout.read() == 'hello\n' + assert proc.stdout().read() == 'hello\n' assert proc.exitcode == 0 assert proc.signal is None @@ -124,12 +125,16 @@ def test_command_futures(): def test_command_futures_callbacks(): num_called = 0 + def _callback(_): nonlocal num_called num_called += 1 proc = osext.run_command_async2("echo hello", shell=True) proc.add_done_callback(_callback) + with pytest.raises(ValueError): + proc.add_done_callback(lambda: 1) + proc.start() while not proc.done(): pass @@ -144,6 +149,7 @@ def _callback(_): def _checked_cmd(request): return request.param == 'checked' + def test_command_futures_error(_checked_cmd): proc = osext.run_command_async2("false", shell=True, check=_checked_cmd) proc.start() @@ -157,6 +163,7 @@ def test_command_futures_error(_checked_cmd): assert proc.exitcode == 1 assert proc.signal is None + @pytest.fixture(params=['SIGINT', 
'SIGTERM', 'SIGKILL']) def _signal(request): if request.param == 'SIGINT': @@ -168,6 +175,7 @@ def _signal(request): assert 0 + def test_command_futures_signal(_checked_cmd, _signal): proc = osext.run_command_async2('sleep 3', shell=True, check=_checked_cmd) proc.start() @@ -192,6 +200,7 @@ def test_command_futures_signal(_checked_cmd, _signal): else: assert proc.exception() is None + def test_command_futures_chain(tmp_path): with open(tmp_path / 'stdout.txt', 'w+') as fp: proc0 = osext.run_command_async2('echo hello', shell=True, stdout=fp) @@ -200,7 +209,6 @@ def test_command_futures_chain(tmp_path): proc3 = osext.run_command_async2('echo world', shell=True, stdout=fp) proc0.then(proc1) proc0.then(proc2).then(proc3) - all_procs = [proc0, proc1, proc2, proc3] t_start = time.time() proc0.start() @@ -214,10 +222,12 @@ def test_command_futures_chain(tmp_path): with open(tmp_path / 'stdout.txt') as fp: assert fp.read() == 'hello\nworld\n' + @pytest.fixture(params=['fail_on_error', 'ignore_errors']) def _chain_policy(request): return request.param + def test_command_futures_chain_cond(_chain_policy, tmp_path): if _chain_policy == 'fail_on_error': def cond(proc): @@ -231,6 +241,9 @@ def cond(proc): proc1 = osext.run_command_async2("false", shell=True) proc2 = osext.run_command_async2("echo world", shell=True, stdout=fp) proc0.then(proc1).then(proc2, when=cond) + with pytest.raises(ValueError): + proc0.then(proc1, when=lambda: False) + proc0.start() proc0.wait() proc1.wait() @@ -260,6 +273,7 @@ def test_command_futures_chain_cancel(): assert proc1.cancelled() assert not proc2.started() + def test_copytree(tmp_path): dir_src = tmp_path / 'src' dir_src.mkdir() From b3a7a26f49f223f8ceca5aabec61357d3c9c7f8d Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 28 Aug 2023 22:54:21 +0200 Subject: [PATCH 04/11] Fix unit tests --- reframe/core/schedulers/local.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/reframe/core/schedulers/local.py 
b/reframe/core/schedulers/local.py index a6d969ff7c..09d671f43e 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -23,7 +23,6 @@ def __init__(self, *args, **kwargs): self._f_stderr = None self._signal = None self._cancel_time = None - self.spawn_command = f'./{self._script_filename}' @property def proc(self): @@ -63,7 +62,7 @@ def submit(self, job): # we can later kill any other processes that this might spawn by just # killing this one. proc = osext.run_command_async( - job.spawn_command, + os.path.abspath(job.script_filename), stdout=f_stdout, stderr=f_stderr, start_new_session=True From 47b15fb58f8963a8c332d07fa89e9986d9dda128 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 29 Aug 2023 00:03:08 +0200 Subject: [PATCH 05/11] Remove parenthesized with stmt --- reframe/core/schedulers/ssh.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index dbfb678f24..41be9cfd04 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -210,9 +210,10 @@ def _poll_job(self, job): exec_proc = job.steps['exec'] if exec_proc.started(): with osext.change_dir(job.localdir): - with (open(job.stdout, 'w+') as fout, - open(job.stderr, 'w+') as ferr): + with open(job.stdout, 'w+') as fout: fout.write(exec_proc.stdout().read()) + + with open(job.stderr, 'w+') as ferr: ferr.write(exec_proc.stderr().read()) return True From 4f6fe9e1a779273dcf87257cc39fedcba4bdd71d Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 25 Sep 2023 22:46:22 +0200 Subject: [PATCH 06/11] Remove unused imports --- reframe/core/schedulers/ssh.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 41be9cfd04..5a789835ac 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD-3-Clause import os -import 
functools import time import reframe.utility.osext as osext From a70944a16bd9156df74b70a2b3b16aac6584f891 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 26 Sep 2023 00:02:27 +0200 Subject: [PATCH 07/11] Update docs --- docs/config_reference.rst | 9 +++++++-- reframe/core/schedulers/ssh.py | 2 +- reframe/schemas/config.json | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/config_reference.rst b/docs/config_reference.rst index 2808244acc..39f925cbb0 100644 --- a/docs/config_reference.rst +++ b/docs/config_reference.rst @@ -273,7 +273,7 @@ System Partition Configuration This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible. - ``ssh``: Jobs will be launched on a remote host using SSH. - The remote host will be selected from the list of hosts specified in :attr:`~systems.partitions.sched_options.hosts`. + The remote host will be selected from the list of hosts specified in :attr:`~systems.partitions.sched_options.ssh_hosts`. The scheduler keeps track of the hosts that it has submitted jobs to, and it will select the next available one in a round-robin fashion. For connecting to a remote host, the options specified in :attr:`~systems.partitions.access` will be used. @@ -283,6 +283,11 @@ System Partition Configuration The same :attr:`~systems.partitions.access` options will be used in those operations as well. Please note, that the connection options of ``ssh`` and ``scp`` differ and ReFrame will not attempt to translate any options between the two utilities in case ``scp`` is selected for copying to the remote host. In this case, it is preferable to set up the host connection options in ``~/.ssh/config`` and leave :attr:`~systems.partition.access` blank. + + Job-scheduler command line options can be used to interact with the ``ssh`` backend. 
+ More specifically, if the :option:`--distribute` option is used, a test will be generated for each host listed in :attr:`~systems.partitions.sched_options.ssh_hosts`. + You can also pin a test to a specific host if you pass the ``#host`` directive to the :option:`-J` option, e.g., ``-J '#host=myhost'``. + - ``torque``: Jobs will be launched using the `Torque `__ scheduler. .. versionadded:: 3.7.2 @@ -352,7 +357,7 @@ System Partition Configuration .. warning:: This option is broken in 4.0. -.. py:attribute:: systems.partitions.sched_options.hosts +.. py:attribute:: systems.partitions.sched_options.ssh_hosts :required: No :default: ``[]`` diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 5a789835ac..3984b60a46 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -43,7 +43,7 @@ def ssh_options(self): @register_scheduler('ssh') class SSHJobScheduler(JobScheduler): def __init__(self): - self._free_hosts = set(self.get_option('hosts')) + self._free_hosts = set(self.get_option('ssh_hosts')) self._allocated_hosts = set() if not self._free_hosts: raise ConfigError(f'no hosts specified for the SSH scheduler: ' diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index ee71272deb..0cc8ff1ffa 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -619,7 +619,7 @@ "systems/partitions/time_limit": null, "systems/partitions/devices": [], "systems/partitions/extras": {}, - "systems/*/sched_options/hosts": [], + "systems/*/sched_options/ssh_hosts": [], "systems*/sched_options/ignore_reqnodenotavail": false, "systems*/sched_options/job_submit_timeout": 60, "systems*/sched_options/resubmit_on_errors": [], From fe144af7e470357be441510addbdb876f3380a37 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 28 Sep 2023 22:46:58 +0200 Subject: [PATCH 08/11] Coding style fixes --- reframe/core/schedulers/__init__.py | 1 + reframe/core/schedulers/ssh.py | 3 ++- 
reframe/utility/osext.py | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 781bb031bb..a5daee11f2 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -628,6 +628,7 @@ def in_state(self, state): :class:`False` otherwise. ''' + class AlwaysIdleNode(Node): def __init__(self, name): self._name = name diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 3984b60a46..0e1dc9137b 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -78,7 +78,8 @@ def _push_artefacts(self, job): assert isinstance(job, _SSHJob) options = ' '.join(job.ssh_options) - # Create a temporary directory on the remote host and push the job artifacts + # Create a temporary directory on the remote host and push the job + # artifacts completed = osext.run_command( f'ssh -o BatchMode=yes {options} {job.host} ' f'mktemp -td rfm.XXXXXXXX', check=True diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index ccc2d90365..53d4065158 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -43,7 +43,6 @@ class _ProcFuture: :meta public: .. 
versionadded:: 4.4 - ''' def __init__(self, check=False, *args, **kwargs): From dea4751cffcb66cd009a133f6a73d3a6051e48a9 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 28 Sep 2023 23:42:38 +0200 Subject: [PATCH 09/11] Add unit tests --- reframe/core/schedulers/ssh.py | 4 ++-- reframe/schemas/config.json | 2 +- unittests/test_schedulers.py | 28 +++++++++++++++++++++++++--- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 0e1dc9137b..03e35c6a15 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -42,8 +42,8 @@ def ssh_options(self): @register_scheduler('ssh') class SSHJobScheduler(JobScheduler): - def __init__(self): - self._free_hosts = set(self.get_option('ssh_hosts')) + def __init__(self, *, hosts=None): + self._free_hosts = set(hosts or self.get_option('ssh_hosts')) self._allocated_hosts = set() if not self._free_hosts: raise ConfigError(f'no hosts specified for the SSH scheduler: ' diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index 0cc8ff1ffa..70926f87cb 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -619,7 +619,7 @@ "systems/partitions/time_limit": null, "systems/partitions/devices": [], "systems/partitions/extras": {}, - "systems/*/sched_options/ssh_hosts": [], + "systems*/sched_options/ssh_hosts": [], "systems*/sched_options/ignore_reqnodenotavail": false, "systems*/sched_options/job_submit_timeout": 60, "systems*/sched_options/resubmit_on_errors": [], diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 797fecd2b2..c38914c972 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -26,8 +26,8 @@ def launcher(): return getlauncher('local') -@pytest.fixture(params=['flux', 'local', 'lsf', 'oar', - 'pbs', 'sge', 'slurm', 'squeue', 'torque']) +@pytest.fixture(params=['flux', 'local', 'lsf', 'oar', 'pbs', + 'sge', 'slurm', 'ssh', 
'squeue', 'torque']) def scheduler(request): try: return getscheduler(request.param) @@ -73,7 +73,13 @@ def exec_ctx(make_exec_ctx, scheduler): @pytest.fixture def make_job(scheduler, launcher, tmp_path): def _make_job(sched_opts=None, **jobargs): - sched = scheduler(**sched_opts) if sched_opts else scheduler() + if sched_opts: + sched = scheduler(**sched_opts) + elif scheduler.registered_name == 'ssh': + sched = scheduler(hosts=['localhost']) + else: + sched = scheduler() + return Job.create( sched, launcher(), name='testjob', @@ -361,6 +367,18 @@ def _expected_local_directives_no_tasks(job): return set() +def _expected_ssh_directives(job): + return set() + + +def _expected_ssh_directives_minimal(job): + return set() + + +def _expected_ssh_directives_no_tasks(job): + return set() + + def test_prepare(fake_job): sched_name = fake_job.scheduler.registered_name if sched_name == 'pbs': @@ -649,6 +667,10 @@ def test_guess_num_tasks(minimal_job, scheduler): # of the default partition through the use of `scontrol show` minimal_job.scheduler._get_default_partition = lambda: 'pdef' assert minimal_job.guess_num_tasks() == 0 + elif scheduler.registered_name == 'ssh': + minimal_job.num_tasks = 0 + minimal_job._sched_flex_alloc_nodes = 'all' + assert minimal_job.guess_num_tasks() == 1 else: with pytest.raises(NotImplementedError): minimal_job.guess_num_tasks() From 716f8517cb5027aa2ae80d67bb068cf0fd819927 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 28 Sep 2023 23:44:50 +0200 Subject: [PATCH 10/11] Coding style fixes --- reframe/utility/osext.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index 53d4065158..c6cd32f553 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -150,7 +150,8 @@ def then(self, future, when=None): ''' if when is None: - def when(fut): return True + def when(fut): + return True if not util.is_trivially_callable(when, non_def_args=1): raise 
ValueError("the 'when' function must " From a6cdc013f4d39d535bf2e2200acb10bb532a0e65 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Fri, 29 Sep 2023 22:43:03 +0200 Subject: [PATCH 11/11] Treat properly cases when `rsync` is not available --- reframe/core/schedulers/ssh.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index 03e35c6a15..e1d7727d40 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -52,7 +52,7 @@ def __init__(self, *, hosts=None): # Determine if rsync is available try: osext.run_command('rsync --version', check=True) - except SpawnedProcessError: + except (FileNotFoundError, SpawnedProcessError): self._has_rsync = False else: self._has_rsync = True