diff --git a/ipyparallel/cluster/app.py b/ipyparallel/cluster/app.py
index 8c7a52119..ab1ec2d15 100755
--- a/ipyparallel/cluster/app.py
+++ b/ipyparallel/cluster/app.py
@@ -10,6 +10,7 @@
 import re
 import signal
 import sys
+from functools import partial
 
 import zmq
 from IPython.core.profiledir import ProfileDir
@@ -308,7 +309,31 @@ def init_cluster(self):
 
     def init_signal(self):
         # Setup signals
+        for signame in ("SIGUSR1", "SIGUSR2", "SIGINFO"):
+            try:
+                signum = getattr(signal, signame)
+            except AttributeError:
+                self.log.debug(f"Not forwarding {signame}")
+                pass
+            else:
+                self.log.debug(f"Forwarding {signame} to engines")
+                signal.signal(signum, self.relay_signal)
         signal.signal(signal.SIGINT, self.sigint_handler)
+        signal.signal(signal.SIGTERM, self.sigterm_handler)
+
+    def relay_signal(self, signum, frame):
+        """Schedule forwarding of the received signal to the engines"""
+        self.log.debug(f"Received signal {signum} received, relaying to engines...")
+        self.loop.add_callback_from_signal(partial(self.cluster.signal_engines, signum))
+
+    def sigint_handler(self, signum, frame):
+        """Relay SIGINT to the engines rather than stopping the cluster"""
+        return self.relay_signal(signum, frame)
+
+    def sigterm_handler(self, signum, frame):
+        """Schedule a cluster shutdown when SIGTERM is received"""
+        self.log.debug(f"Received signal {signum} received, stopping launchers...")
+        self.loop.add_callback_from_signal(self.stop_cluster)
 
     def engines_started_ok(self):
         self.log.info("Engines appear to have started successfully")
@@ -316,10 +341,10 @@ def engines_started_ok(self):
 
     async def start_engines(self):
         try:
-            await self.cluster.start_engines(self.n)
+            await self.cluster.start_engines()
         except:
             self.log.exception("Engine start failed")
-            raise
+            self.exit(1)
 
         if self.daemonize:
             self.loop.add_callback(self.loop.stop)
@@ -374,10 +399,6 @@ async def stop_cluster(self, r=None):
         await self.cluster.stop_cluster()
         self.loop.add_callback(self.loop.stop)
 
-    def sigint_handler(self, signum, frame):
-        self.log.debug("SIGINT received, stopping launchers...")
-        self.loop.add_callback_from_signal(self.stop_cluster)
-
     def start_logging(self):
         # Remove old log files of the controller and engine
         if self.clean_logs:
@@ -463,6 +484,11 @@ def controller_stopped(self, stop_data):
         self.log.warning("Controller stopped. Shutting down.")
         self.loop.add_callback(self.stop_cluster)
 
+    def sigint_handler(self, signum, frame):
+        """Unlike engines, SIGINT shuts down `ipcluster start`"""
+        self.log.debug(f"Received signal {signum} received, stopping launchers...")
+        self.loop.add_callback_from_signal(self.stop_cluster)
+
     def start(self):
         """Start the app for the start subcommand."""
         # First see if the cluster is already running
diff --git a/ipyparallel/cluster/launcher.py b/ipyparallel/cluster/launcher.py
index fe04b49b4..aeaa38b12 100644
--- a/ipyparallel/cluster/launcher.py
+++ b/ipyparallel/cluster/launcher.py
@@ -967,6 +967,26 @@ def start(self, hostname=None, user=None, port=None):
             self.scp_args.append('-P')
             self.scp_args.append(str(port))
 
+        # create remote profile dir
+        check_output(
+            self.ssh_cmd
+            + self.ssh_args
+            + [
+                self.location,
+                shlex_join(
+                    [
+                        self.remote_python,
+                        "-m",
+                        "IPython",
+                        "profile",
+                        "create",
+                        "--profile-dir",
+                        self.remote_profile_dir,
+                    ]
+                ),
+            ],
+            input=None,
+        )
         self.send_files()
         self.pid = sshx(
             self.ssh_cmd + self.ssh_args + [self.location],
@@ -1192,7 +1212,17 @@ class SSHProxyEngineSetLauncher(SSHLauncher):
     """
 
     n = Integer().tag(to_dict=True)
-    ipcluster_cmd = List(['ipcluster'], config=True)
+    ipcluster_cmd = List(Unicode(), config=True)
+
+    @default("ipcluster_cmd")
+    def _default_ipcluster_cmd(self):
+        return [self.remote_python, "-m", "ipyparallel.cluster"]
+
+    ipcluster_args = List(
+        Unicode(),
+        config=True,
+        help="""Extra CLI arguments to pass to ipcluster engines""",
+    )
 
     @property
     def program(self):
@@ -1207,7 +1237,7 @@ def program_args(self):
             self.remote_profile_dir,
             '--cluster-id',
             self.cluster_id,
-        ]
+        ] + self.ipcluster_args
 
     @default("to_send")
     def _to_send_default(self):
diff --git a/ipyparallel/tests/test_ssh.py b/ipyparallel/tests/test_ssh.py
index 216fdb724..850c94fb0 100644
--- a/ipyparallel/tests/test_ssh.py
+++ b/ipyparallel/tests/test_ssh.py
@@ -12,12 +12,13 @@
 # import tests that use engine_launcher_class fixture
 
 
-@pytest.fixture
-def ssh_config(ssh_key):
+@pytest.fixture(params=["SSH", "SSHProxy"])
+def ssh_config(ssh_key, request):
     c = Config()
     c.Cluster.controller_ip = '0.0.0.0'
-    c.Cluster.engine_launcher_class = 'SSH'
-    c.SSHEngineSetLauncher.scp_args = c.SSHLauncher.ssh_args = [
+    c.Cluster.engine_launcher_class = request.param
+    engine_set_cfg = c[f"{request.param}EngineSetLauncher"]
+    engine_set_cfg.ssh_args = [
         "-o",
         "UserKnownHostsFile=/dev/null",
         "-o",
@@ -25,10 +26,15 @@ def ssh_config(ssh_key):
         "-i",
         ssh_key,
     ]
+    engine_set_cfg.scp_args = list(engine_set_cfg.ssh_args)  # copy
+    engine_set_cfg.remote_python = "/opt/conda/bin/python3"
+    engine_set_cfg.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
+    engine_set_cfg.engine_args = ['--debug']
+    c.SSHProxyEngineSetLauncher.hostname = "127.0.0.1"
+    c.SSHProxyEngineSetLauncher.ssh_args.append("-p2222")
+    c.SSHProxyEngineSetLauncher.scp_args.append("-P2222")
+    c.SSHProxyEngineSetLauncher.user = "ciuser"
     c.SSHEngineSetLauncher.engines = {"ciuser@127.0.0.1:2222": 4}
-    c.SSHEngineSetLauncher.remote_python = "/opt/conda/bin/python3"
-    c.SSHEngineSetLauncher.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
-    c.SSHEngineSetLauncher.engine_args = ['--debug']
     return c
 
 
@@ -41,4 +47,4 @@ def Cluster(ssh_config, BaseCluster):  # noqa: F811
 # override engine_launcher_class
 @pytest.fixture
 def engine_launcher_class(ssh_config):
-    return 'SSH'
+    return ssh_config.Cluster.engine_launcher_class