diff --git a/docs/config_reference.rst b/docs/config_reference.rst
index cbb9373032..bb6064ad40 100644
--- a/docs/config_reference.rst
+++ b/docs/config_reference.rst
@@ -260,8 +260,9 @@ System Partition Configuration
The job scheduler that will be used to launch jobs on this partition.
Supported schedulers are the following:
- - ``local``: Jobs will be launched locally without using any job scheduler.
- ``flux``: Jobs will be launched using the `Flux Framework `_ scheduler.
+ - ``local``: Jobs will be launched locally without using any job scheduler.
+ - ``lsf``: Jobs will be launched using the `LSF `__ scheduler.
- ``oar``: Jobs will be launched using the `OAR `__ scheduler.
- ``pbs``: Jobs will be launched using the `PBS Pro `__ scheduler.
- ``sge``: Jobs will be launched using the `Sun Grid Engine `__ scheduler.
@@ -270,8 +271,24 @@ System Partition Configuration
If not, you should consider using the ``squeue`` backend below.
- ``squeue``: Jobs will be launched using the `Slurm `__ scheduler.
This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.
+ - ``ssh``: Jobs will be launched on a remote host using SSH.
+
+ The remote host will be selected from the list of hosts specified in :attr:`~systems.partitions.sched_options.ssh_hosts`.
+ The scheduler keeps track of the hosts that it has submitted jobs to, and it will select the next available one in a round-robin fashion.
+ For connecting to a remote host, the options specified in :attr:`~systems.partitions.access` will be used.
+
+ When a job is submitted with this scheduler, its stage directory will be copied over to a unique temporary directory on the remote host, then the job will be executed and, finally, any produced artifacts will be copied back.
+
+     The contents of the stage directory are copied to the remote host using ``rsync``, if available, or ``scp`` as a fallback.
+ The same :attr:`~systems.partitions.access` options will be used in those operations as well.
+     Note that the connection options of ``ssh`` and ``scp`` differ and ReFrame will not attempt to translate any options between the two utilities in case ``scp`` is selected for copying to the remote host.
+     In this case, it is preferable to set up the host connection options in ``~/.ssh/config`` and leave :attr:`~systems.partitions.access` blank.
+
+ Job-scheduler command line options can be used to interact with the ``ssh`` backend.
+ More specifically, if the :option:`--distribute` option is used, a test will be generated for each host listed in :attr:`~systems.partitions.sched_options.ssh_hosts`.
+     You can also pin a test to a specific host by passing the ``#host`` pseudo-option to the :option:`-J` option, e.g., ``-J '#host=myhost'``.
+
- ``torque``: Jobs will be launched using the `Torque `__ scheduler.
- - ``lsf``: Jobs will be launched using the `LSF `__ scheduler.
.. versionadded:: 3.7.2
Support for the SGE scheduler is added.
@@ -282,6 +299,9 @@ System Partition Configuration
.. versionadded:: 3.11.0
Support for the LSF scheduler is added.
+ .. versionadded:: 4.4
+ The ``ssh`` scheduler is added.
+
.. note::
The way that multiple node jobs are submitted using the SGE scheduler can be very site-specific.
@@ -337,6 +357,14 @@ System Partition Configuration
.. warning::
This option is broken in 4.0.
+.. py:attribute:: systems.partitions.sched_options.ssh_hosts
+
+ :required: No
+ :default: ``[]``
+
+ List of hosts in a partition that uses the ``ssh`` scheduler.
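+
+   A minimal sketch of a partition that launches its jobs over SSH on two hosts could look as follows (the hostnames are illustrative):
+
+   .. code-block:: python
+
+      {
+          'name': 'remote',
+          'scheduler': 'ssh',
+          'launcher': 'local',
+          'sched_options': {
+              'ssh_hosts': ['host1', 'host2']
+          }
+      }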
+
+
.. py:attribute:: systems.partitions.sched_options.ignore_reqnodenotavail
:required: No
diff --git a/reframe/core/backends.py b/reframe/core/backends.py
index a8720c2ce7..6a2924bc87 100644
--- a/reframe/core/backends.py
+++ b/reframe/core/backends.py
@@ -23,7 +23,8 @@
'reframe.core.schedulers.pbs',
'reframe.core.schedulers.oar',
'reframe.core.schedulers.sge',
- 'reframe.core.schedulers.slurm'
+ 'reframe.core.schedulers.slurm',
+ 'reframe.core.schedulers.ssh'
]
_schedulers = {}
diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index e21e33e56e..a5daee11f2 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -627,3 +627,15 @@ def in_state(self, state):
        :returns: :class:`True` if the node's state matches the given one,
:class:`False` otherwise.
'''
+
+
+class AlwaysIdleNode(Node):
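+    '''A node that is always reported to be in the ``idle`` state.
+
+    Schedulers that have no notion of node state, such as the ``ssh``
+    scheduler, may use it as a placeholder node type.
+    '''
+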
+ def __init__(self, name):
+ self._name = name
+
+ @property
+ def name(self):
+ return self._name
+
+ def in_state(self, state):
+ return state.casefold() == 'idle'
diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py
index 5ace2c917a..09d671f43e 100644
--- a/reframe/core/schedulers/local.py
+++ b/reframe/core/schedulers/local.py
@@ -12,11 +12,7 @@
import reframe.core.schedulers as sched
import reframe.utility.osext as osext
from reframe.core.backends import register_scheduler
-from reframe.core.exceptions import JobError, ReframeError
-
-
-class _TimeoutExpired(ReframeError):
- pass
+from reframe.core.exceptions import JobError
class _LocalJob(sched.Job):
diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py
new file mode 100644
index 0000000000..e1d7727d40
--- /dev/null
+++ b/reframe/core/schedulers/ssh.py
@@ -0,0 +1,231 @@
+# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import os
+import time
+
+import reframe.utility.osext as osext
+from reframe.core.backends import register_scheduler
+from reframe.core.exceptions import ConfigError, SpawnedProcessError
+from reframe.core.schedulers import Job, JobScheduler, AlwaysIdleNode
+
+
+class _SSHJob(Job):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._localdir = None
+ self._remotedir = None
+ self._host = None
+ self._ssh_options = []
+
+ # Async processes spawned for this job
+ self.steps = {}
+
+ @property
+ def localdir(self):
+ return self._localdir
+
+ @property
+ def remotedir(self):
+ return self._remotedir
+
+ @property
+ def host(self):
+ return self._host
+
+ @property
+ def ssh_options(self):
+ return self._ssh_options
+
+
+@register_scheduler('ssh')
+class SSHJobScheduler(JobScheduler):
+ def __init__(self, *, hosts=None):
+ self._free_hosts = set(hosts or self.get_option('ssh_hosts'))
+ self._allocated_hosts = set()
+ if not self._free_hosts:
+ raise ConfigError(f'no hosts specified for the SSH scheduler: '
+ f'{self._config_prefix}')
+
+ # Determine if rsync is available
+ try:
+ osext.run_command('rsync --version', check=True)
+ except (FileNotFoundError, SpawnedProcessError):
+ self._has_rsync = False
+ else:
+ self._has_rsync = True
+
+ def _reserve_host(self, host=None):
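+        # Prefer hosts that have no jobs assigned to them yet; once every
+        # host has been allocated, start reusing hosts from the allocated
+        # pool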
+ pool = self._free_hosts if self._free_hosts else self._allocated_hosts
+ if host:
+ pool.discard(host)
+ self._allocated_hosts.add(host)
+ return host
+
+ host = pool.pop()
+ self._allocated_hosts.add(host)
+ return host
+
+ def make_job(self, *args, **kwargs):
+ return _SSHJob(*args, **kwargs)
+
+ def emit_preamble(self, job):
+ return []
+
+ def _push_artefacts(self, job):
+ assert isinstance(job, _SSHJob)
+ options = ' '.join(job.ssh_options)
+
+ # Create a temporary directory on the remote host and push the job
+ # artifacts
+ completed = osext.run_command(
+ f'ssh -o BatchMode=yes {options} {job.host} '
+ f'mktemp -td rfm.XXXXXXXX', check=True
+ )
+ remotedir = completed.stdout.strip()
+
+ # Store the local and remote dirs
+ job._localdir = os.getcwd()
+ job._remotedir = remotedir
+
+ if self._has_rsync:
+ job.steps['push'] = osext.run_command_async2(
+ f'rsync -az -e "ssh -o BatchMode=yes {options}" '
+ f'{job.localdir}/ {job.host}:{remotedir}/', check=True
+ )
+ else:
+ job.steps['push'] = osext.run_command_async2(
+ f'scp -r -o BatchMode=yes {options} '
+ f'{job.localdir}/* {job.host}:{remotedir}/',
+ shell=True, check=True
+ )
+
+ def _pull_artefacts(self, job):
+ assert isinstance(job, _SSHJob)
+ options = ' '.join(job.ssh_options)
+ if self._has_rsync:
+ job.steps['pull'] = osext.run_command_async2(
+ f'rsync -az -e "ssh -o BatchMode=yes {options}" '
+ f'{job.host}:{job.remotedir}/ {job.localdir}/'
+ )
+ else:
+ job.steps['pull'] = osext.run_command_async2(
+ f"scp -r -o BatchMode=yes {options} "
+ f"'{job.host}:{job.remotedir}/*' {job.localdir}/", shell=True
+ )
+
+ def _do_submit(self, job):
+ # Modify the spawn command and submit
+ options = ' '.join(job.ssh_options)
+ job.steps['exec'] = osext.run_command_async2(
+ f'ssh -o BatchMode=yes {options} {job.host} '
+ f'"cd {job.remotedir} && bash -l {job.script_filename}"'
+ )
+
+ def submit(self, job):
+ assert isinstance(job, _SSHJob)
+
+        # Check if the `#host` pseudo-option is specified and, if so, use it
+        # as the host, stripping it from the remaining options
+ host = None
+ stripped_opts = []
+ options = job.sched_access + job.options + job.cli_options
+ for opt in options:
+ if opt.startswith('#host='):
+ _, host = opt.split('=', maxsplit=1)
+ else:
+ stripped_opts.append(opt)
+
+ # Host is pinned externally (`--distribute` option)
+ if job.pin_nodes:
+ host = job.pin_nodes[0]
+
+ job._submit_time = time.time()
+ job._ssh_options = stripped_opts
+ job._host = self._reserve_host(host)
+
+ self._push_artefacts(job)
+ self._do_submit(job)
+ self._pull_artefacts(job)
+
+ def success(proc):
+ return proc.exitcode == 0
+
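+        # Chain the steps so that 'exec' starts only if 'push' succeeds and
+        # 'pull' starts only if 'exec' succeeds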
+ job.steps['push'].then(
+ job.steps['exec'],
+ when=success
+ ).then(
+ job.steps['pull'],
+ when=success
+ )
+ job.steps['push'].start()
+ job._jobid = job.steps['push'].pid
+
+ def wait(self, job):
+ for step in job.steps.values():
+ if step.started():
+ step.wait()
+
+ def cancel(self, job):
+ for step in job.steps.values():
+ if step.started():
+ step.cancel()
+
+ def finished(self, job):
+ if job.exception:
+ raise job.exception
+
+ return job.state is not None
+
+ def poll(self, *jobs):
+ for job in jobs:
+ self._poll_job(job)
+
+ def _poll_job(self, job):
+ last_done = None
+ last_failed = None
+ for proc_kind, proc in job.steps.items():
+ if proc.started() and proc.done():
+ last_done = proc_kind
+ if proc.exitcode != 0:
+ last_failed = proc_kind
+ break
+
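+        # The job is still in progress: no step has failed and the final
+        # 'pull' step has not completed yet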
+ if last_failed is None and last_done != 'pull':
+ return False
+
+ # Either all processes were done or one failed
+ # Update the job info
+ last_proc = job.steps[last_done]
+ job._exitcode = last_proc.exitcode
+ job._exception = last_proc.exception()
+ job._signal = last_proc.signal
+ if job._exitcode == 0:
+ job._state = 'SUCCESS'
+ else:
+ job._state = 'FAILURE'
+
+ exec_proc = job.steps['exec']
+ if exec_proc.started():
+ with osext.change_dir(job.localdir):
+ with open(job.stdout, 'w+') as fout:
+ fout.write(exec_proc.stdout().read())
+
+ with open(job.stderr, 'w+') as ferr:
+ ferr.write(exec_proc.stderr().read())
+
+ return True
+
+ def allnodes(self):
+ return [AlwaysIdleNode(h) for h in self._free_hosts]
+
+ def filternodes(self, job, nodes):
+ options = job.sched_access + job.options + job.cli_options
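+        # Pin the job to the host requested with the '#host' pseudo-option,
+        # if present; the 'else' clause of the loop runs if no such option
+        # is found and returns all the free hosts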
+ for opt in options:
+ if opt.startswith('#host='):
+ _, host = opt.split('=', maxsplit=1)
+ return [AlwaysIdleNode(host)]
+ else:
+ return [AlwaysIdleNode(h) for h in self._free_hosts]
diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json
index c699062426..70926f87cb 100644
--- a/reframe/schemas/config.json
+++ b/reframe/schemas/config.json
@@ -109,6 +109,10 @@
"sched_options": {
"type": "object",
"properties": {
+ "hosts": {
+ "type": "array",
+ "items": {"type": "string"}
+ },
"ignore_reqnodenotavail": {"type": "boolean"},
"job_submit_timeout": {"type": "number"},
"resubmit_on_errors": {
@@ -615,6 +619,7 @@
"systems/partitions/time_limit": null,
"systems/partitions/devices": [],
"systems/partitions/extras": {},
+ "systems*/sched_options/ssh_hosts": [],
"systems*/sched_options/ignore_reqnodenotavail": false,
"systems*/sched_options/job_submit_timeout": 60,
"systems*/sched_options/resubmit_on_errors": [],
diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py
index efd22ee639..c6cd32f553 100644
--- a/reframe/utility/osext.py
+++ b/reframe/utility/osext.py
@@ -29,6 +29,228 @@
from . import OrderedSet
+class UnstartedProcError(ReframeError):
+ '''Raised when a process operation is attempted on a
+ not yet started process future'''
+
+
+class _ProcFuture:
+ '''A future encapsulating a command to be executed asynchronously.
+
+ Users may not create a :class:`_ProcFuture` directly, but should use
+ :func:`run_command_async2` instead.
+
+ :meta public:
+
+ .. versionadded:: 4.4
+ '''
+
+ def __init__(self, check=False, *args, **kwargs):
+ self._proc = None
+ self._exitcode = None
+ self._signal = None
+ self._check = check
+ self._cmd_args = (args, kwargs)
+ self._next = []
+ self._done_callbacks = []
+ self._completed = False
+ self._cancelled = False
+
+ def _check_started(self):
+ if not self.started():
+ raise UnstartedProcError
+
+ def start(self):
+ '''Start the future, i.e. spawn the encapsulated command.'''
+
+ args, kwargs = self._cmd_args
+ self._proc = run_command_async(*args, **kwargs)
+
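+        # The spawned process is a session leader if its PID equals its
+        # session ID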
+ if os.getsid(self._proc.pid) == self._proc.pid:
+ self._session = True
+ else:
+ self._session = False
+
+ return self
+
+ @property
+ def pid(self):
+ '''The PID of the spawned process.'''
+ return self._proc.pid
+
+ @property
+ def exitcode(self):
+ '''The exit code of the spawned process.'''
+ return self._exitcode
+
+ @property
+ def signal(self):
+ '''The signal number that caused the spawned process to exit.'''
+ return self._signal
+
+ def cancelled(self):
+ '''Returns :obj:`True` if the future was cancelled.'''
+ return self._cancelled
+
+ def is_session(self):
+        '''Returns :obj:`True` if the spawned process is a group or session
+ leader.'''
+ return self._session
+
+ def kill(self, signum):
+ '''Send signal ``signum`` to the spawned process.
+
+ If the process is a group or session leader, the signal will be sent
+ to the whole group or session.
+ '''
+
+ self._check_started()
+ kill_fn = os.killpg if self.is_session() else os.kill
+ kill_fn(self.pid, signum)
+ self._signal = signum
+
+ def terminate(self):
+ '''Terminate the spawned process by sending ``SIGTERM``.'''
+ self.kill(signal.SIGTERM)
+
+ def cancel(self):
+ '''Cancel the spawned process by sending ``SIGKILL``.'''
+ self._check_started()
+ if not self.cancelled():
+ self.kill(signal.SIGKILL)
+
+ self._cancelled = True
+
+ def add_done_callback(self, func):
+ '''Add a callback that will be called when this future is done.
+
+ The callback function will be called with the future as its sole
+ argument.
+ '''
+ if not util.is_trivially_callable(func, non_def_args=1):
+ raise ValueError('the callback function must '
+ 'accept a single argument')
+
+ self._done_callbacks.append(func)
+
+ def then(self, future, when=None):
+ '''Schedule another future for execution after this one.
+
+ :arg future: a :class:`_ProcFuture` to be started after this one
+ finishes.
+ :arg when: A callable that will be used as conditional for starting or
+ not the next future. It will be called with this future as its
+ sole argument and must return a boolean. If the return value is
+ true, then ``future`` will start execution, otherwise not.
+
+ If ``when`` is :obj:`None`, then the next future will be executed
+ unconditionally.
+ :returns: the passed ``future``, so that multiple :func:`then` calls
+ can be chained.
+ '''
+
+ if when is None:
+ def when(fut):
+ return True
+
+ if not util.is_trivially_callable(when, non_def_args=1):
+ raise ValueError("the 'when' function must "
+ "accept a single argument")
+
+ self._next.append((future, when))
+ return future
+
+ def started(self):
+ '''Check if this future has started.'''
+ return self._proc is not None
+
+ def _wait(self, *, nohang=False):
+ self._check_started()
+ if self._completed:
+ return True
+
+ options = os.WNOHANG if nohang else 0
+ try:
+ pid, status = os.waitpid(self.pid, options)
+ except OSError as e:
+ if e.errno == errno.ECHILD:
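+                # The child process no longer exists, i.e., it has already
+                # been waited upon, so simply mark this future as completed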
+ self._completed = True
+ return self._completed
+ else:
+                raise
+
+ if nohang and not pid:
+ return False
+
+ if os.WIFEXITED(status):
+ self._exitcode = os.WEXITSTATUS(status)
+ elif os.WIFSIGNALED(status):
+ self._signal = os.WTERMSIG(status)
+
+ self._completed = True
+
+ # Call any done callbacks
+ for func in self._done_callbacks:
+ func(self)
+
+ # Start the next futures in the chain
+ for fut, cond in self._next:
+ if cond(self):
+ fut.start()
+
+ return self._completed
+
+ def done(self):
+ '''Check if the future has finished.
+
+ This is a non-blocking call.
+ '''
+ self._check_started()
+ return self._wait(nohang=True)
+
+ def wait(self):
+ '''Wait for this future to finish.'''
+ self._wait()
+
+ def exception(self):
+ '''Retrieve the exception raised by this future.
+
+ This is a blocking call and will wait until this future finishes.
+
+        The only exception that a :class:`_ProcFuture` can return is a
+        :class:`SpawnedProcessError` if the ``check`` flag was set in
+ :func:`run_command_async2`.
+ '''
+
+ self._wait()
+ if not self._check:
+ return
+
+        if self._exitcode == 0:
+ return
+
+ return SpawnedProcessError(self._proc.args,
+ self._proc.stdout.read(),
+ self._proc.stderr.read(),
+                                   self._exitcode)
+
+ def stdout(self):
+ '''Retrieve the standard output of the spawned process.
+
+ This is a blocking call and will wait until the future finishes.
+ '''
+ self._wait()
+ return self._proc.stdout
+
+ def stderr(self):
+ '''Retrieve the standard error of the spawned process.
+
+ This is a blocking call and will wait until the future finishes.
+ '''
+ self._wait()
+ return self._proc.stderr
+
+
def run_command(cmd, check=False, timeout=None, **kwargs):
'''Run command synchronously.
@@ -102,7 +324,7 @@ def run_command_async(cmd,
if log:
from reframe.core.logging import getlogger
- getlogger().debug2(f'[CMD] {cmd!r}')
+ getlogger().debug(f'[CMD] {cmd!r}')
if isinstance(cmd, str) and not shell:
cmd = shlex.split(cmd)
@@ -116,6 +338,26 @@ def run_command_async(cmd,
**popen_args)
+def run_command_async2(*args, check=False, **kwargs):
+ '''Return a :class:`_ProcFuture` that encapsulates a command to be
+ executed.
+
+ The command to be executed will not start until the returned future is
+ started.
+
+    :arg args: Any of the arguments that can be passed to
+        :func:`run_command_async`.
+    :arg check: If true, the future will be flagged with a
+        :class:`SpawnedProcessError` exception upon failure.
+ :arg kwargs: Any of the keyword arguments that can be passed to
+ :func:`run_command_async`.
+
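+    The following is a minimal usage sketch that chains two commands, so
+    that the second one runs only if the first one succeeds:
+
+    .. code-block:: python
+
+       first = run_command_async2('echo hello', shell=True)
+       second = run_command_async2('echo world', shell=True)
+       first.then(second, when=lambda fut: fut.exitcode == 0)
+       first.start()   # spawn the first command
+       first.wait()    # 'second' is started upon successful completion
+       second.wait()
+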
+ .. versionadded:: 4.4
+
+ '''
+ return _ProcFuture(check, *args, **kwargs)
+
+
def osuser():
'''Return the name of the current OS user.
diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py
index 797fecd2b2..c38914c972 100644
--- a/unittests/test_schedulers.py
+++ b/unittests/test_schedulers.py
@@ -26,8 +26,8 @@ def launcher():
return getlauncher('local')
-@pytest.fixture(params=['flux', 'local', 'lsf', 'oar',
- 'pbs', 'sge', 'slurm', 'squeue', 'torque'])
+@pytest.fixture(params=['flux', 'local', 'lsf', 'oar', 'pbs',
+ 'sge', 'slurm', 'ssh', 'squeue', 'torque'])
def scheduler(request):
try:
return getscheduler(request.param)
@@ -73,7 +73,13 @@ def exec_ctx(make_exec_ctx, scheduler):
@pytest.fixture
def make_job(scheduler, launcher, tmp_path):
def _make_job(sched_opts=None, **jobargs):
- sched = scheduler(**sched_opts) if sched_opts else scheduler()
+ if sched_opts:
+ sched = scheduler(**sched_opts)
+ elif scheduler.registered_name == 'ssh':
+ sched = scheduler(hosts=['localhost'])
+ else:
+ sched = scheduler()
+
return Job.create(
sched, launcher(),
name='testjob',
@@ -361,6 +367,18 @@ def _expected_local_directives_no_tasks(job):
return set()
+def _expected_ssh_directives(job):
+ return set()
+
+
+def _expected_ssh_directives_minimal(job):
+ return set()
+
+
+def _expected_ssh_directives_no_tasks(job):
+ return set()
+
+
def test_prepare(fake_job):
sched_name = fake_job.scheduler.registered_name
if sched_name == 'pbs':
@@ -649,6 +667,10 @@ def test_guess_num_tasks(minimal_job, scheduler):
# of the default partition through the use of `scontrol show`
minimal_job.scheduler._get_default_partition = lambda: 'pdef'
assert minimal_job.guess_num_tasks() == 0
+ elif scheduler.registered_name == 'ssh':
+ minimal_job.num_tasks = 0
+ minimal_job._sched_flex_alloc_nodes = 'all'
+ assert minimal_job.guess_num_tasks() == 1
else:
with pytest.raises(NotImplementedError):
minimal_job.guess_num_tasks()
diff --git a/unittests/test_utility.py b/unittests/test_utility.py
index b89236cea8..c3d33c6101 100644
--- a/unittests/test_utility.py
+++ b/unittests/test_utility.py
@@ -6,6 +6,7 @@
import os
import pytest
import random
+import signal
import sys
import time
@@ -84,6 +85,195 @@ def test_command_async():
assert t_sleep >= 1
+def test_command_futures():
+ proc = osext.run_command_async2('echo hello', shell=True)
+
+ # Check that some operations cannot be performed on an unstarted future
+ with pytest.raises(osext.UnstartedProcError):
+ proc.done()
+
+ with pytest.raises(osext.UnstartedProcError):
+ proc.cancel()
+
+ with pytest.raises(osext.UnstartedProcError):
+ proc.terminate()
+
+ with pytest.raises(osext.UnstartedProcError):
+ proc.wait()
+
+ assert not proc.started()
+ proc.start()
+ assert proc.started()
+ assert proc.pid is not None
+
+ # By default a process is not started as a new session
+ assert not proc.is_session()
+
+ # stdout must block
+ assert proc.stdout().read() == 'hello\n'
+ assert proc.exitcode == 0
+ assert proc.signal is None
+
+ # Additional wait() should have no effect
+ proc.wait()
+ proc.wait()
+
+ assert proc.done()
+ assert not proc.cancelled()
+ assert proc.exception() is None
+
+
+def test_command_futures_callbacks():
+ num_called = 0
+
+ def _callback(_):
+ nonlocal num_called
+ num_called += 1
+
+ proc = osext.run_command_async2("echo hello", shell=True)
+ proc.add_done_callback(_callback)
+ with pytest.raises(ValueError):
+ proc.add_done_callback(lambda: 1)
+
+ proc.start()
+ while not proc.done():
+ pass
+
+ # Call explicitly more times
+ proc.done()
+ proc.done()
+ assert num_called == 1
+
+
+@pytest.fixture(params=['checked', 'unchecked'])
+def _checked_cmd(request):
+ return request.param == 'checked'
+
+
+def test_command_futures_error(_checked_cmd):
+ proc = osext.run_command_async2("false", shell=True, check=_checked_cmd)
+ proc.start()
+
+ # exception() blocks until the process is finished
+ if _checked_cmd:
+ assert isinstance(proc.exception(), SpawnedProcessError)
+ else:
+ assert proc.exception() is None
+
+ assert proc.exitcode == 1
+ assert proc.signal is None
+
+
+@pytest.fixture(params=['SIGINT', 'SIGTERM', 'SIGKILL'])
+def _signal(request):
+ if request.param == 'SIGINT':
+ return signal.SIGINT
+ elif request.param == 'SIGTERM':
+ return signal.SIGTERM
+ elif request.param == 'SIGKILL':
+ return signal.SIGKILL
+
+ assert 0
+
+
+def test_command_futures_signal(_checked_cmd, _signal):
+ proc = osext.run_command_async2('sleep 3', shell=True, check=_checked_cmd)
+ proc.start()
+ if _signal == signal.SIGTERM:
+ proc.terminate()
+ elif _signal == signal.SIGKILL:
+ proc.cancel()
+ else:
+ proc.kill(_signal)
+
+ proc.wait()
+ assert proc.done()
+ if _signal == signal.SIGKILL:
+ assert proc.cancelled()
+ else:
+ assert not proc.cancelled()
+
+ assert proc.signal == _signal
+ assert proc.exitcode is None
+ if _checked_cmd:
+ assert isinstance(proc.exception(), SpawnedProcessError)
+ else:
+ assert proc.exception() is None
+
+
+def test_command_futures_chain(tmp_path):
+ with open(tmp_path / 'stdout.txt', 'w+') as fp:
+ proc0 = osext.run_command_async2('echo hello', shell=True, stdout=fp)
+ proc1 = osext.run_command_async2('sleep 1', shell=True, stdout=fp)
+ proc2 = osext.run_command_async2('sleep 1', shell=True, stdout=fp)
+ proc3 = osext.run_command_async2('echo world', shell=True, stdout=fp)
+ proc0.then(proc1)
+ proc0.then(proc2).then(proc3)
+ all_procs = [proc0, proc1, proc2, proc3]
+ t_start = time.time()
+ proc0.start()
+ while not all(p.done() for p in all_procs if p.started()):
+ pass
+
+ t_elapsed = time.time() - t_start
+ assert t_elapsed < 2
+ assert all(p.done() for p in all_procs)
+
+ with open(tmp_path / 'stdout.txt') as fp:
+ assert fp.read() == 'hello\nworld\n'
+
+
+@pytest.fixture(params=['fail_on_error', 'ignore_errors'])
+def _chain_policy(request):
+ return request.param
+
+
+def test_command_futures_chain_cond(_chain_policy, tmp_path):
+ if _chain_policy == 'fail_on_error':
+ def cond(proc):
+ return proc.exitcode == 0
+ else:
+ def cond(proc):
+ return True
+
+ with open(tmp_path / 'stdout.txt', 'w+') as fp:
+ proc0 = osext.run_command_async2("echo hello", shell=True, stdout=fp)
+ proc1 = osext.run_command_async2("false", shell=True)
+ proc2 = osext.run_command_async2("echo world", shell=True, stdout=fp)
+ proc0.then(proc1).then(proc2, when=cond)
+ with pytest.raises(ValueError):
+ proc0.then(proc1, when=lambda: False)
+
+ proc0.start()
+ proc0.wait()
+ proc1.wait()
+ if _chain_policy == 'fail_on_error':
+ assert not proc2.started()
+ else:
+ proc2.wait()
+
+ with open(tmp_path / 'stdout.txt') as fp:
+ if _chain_policy == 'fail_on_error':
+ assert fp.read() == 'hello\n'
+ else:
+ assert fp.read() == 'hello\nworld\n'
+
+
+def test_command_futures_chain_cancel():
+ proc0 = osext.run_command_async2('echo hello', shell=True)
+ proc1 = osext.run_command_async2('sleep 1', shell=True)
+ proc2 = osext.run_command_async2('echo world', shell=True)
+ proc0.then(proc1).then(proc2)
+ proc0.start()
+ while not proc0.done():
+ pass
+
+ assert proc1.started()
+ proc1.cancel()
+ assert proc1.cancelled()
+ assert not proc2.started()
+
+
def test_copytree(tmp_path):
dir_src = tmp_path / 'src'
dir_src.mkdir()