Skip to content

Commit

Permalink
Merge pull request #512 from minrk/sshproxy
Browse files Browse the repository at this point in the history
Fixes and tests for SSHProxy
  • Loading branch information
minrk committed Jul 2, 2021
2 parents c429c0b + 2652468 commit f3777c6
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 16 deletions.
35 changes: 29 additions & 6 deletions ipyparallel/cluster/app.py
Expand Up @@ -10,6 +10,7 @@
import re
import signal
import sys
from functools import partial

import zmq
from IPython.core.profiledir import ProfileDir
Expand Down Expand Up @@ -308,18 +309,39 @@ def init_cluster(self):

def init_signal(self):
# Setup signals
for signame in ("SIGUSR1", "SIGUSR2", "SIGINFO"):
try:
signum = getattr(signal, signame)
except AttributeError:
self.log.debug(f"Not forwarding {signame}")
pass
else:
self.log.debug(f"Forwarding {signame} to engines")
signal.signal(signum, self.relay_signal)
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigint_handler)

def relay_signal(self, signum, frame):
self.log.debug(f"Received signal {signum} received, relaying to engines...")
self.loop.add_callback_from_signal(partial(self.cluster.signal_engines, signum))

def sigint_handler(self, signum, frame):
return self.relay_signal(signum, frame)

def sigterm_handler(self, signum, frame):
self.log.debug(f"Received signal {signum} received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def engines_started_ok(self):
self.log.info("Engines appear to have started successfully")
self.early_shutdown = 0

async def start_engines(self):
try:
await self.cluster.start_engines(self.n)
await self.cluster.start_engines()
except:
self.log.exception("Engine start failed")
raise
self.exit(1)

if self.daemonize:
self.loop.add_callback(self.loop.stop)
Expand Down Expand Up @@ -374,10 +396,6 @@ async def stop_cluster(self, r=None):
await self.cluster.stop_cluster()
self.loop.add_callback(self.loop.stop)

def sigint_handler(self, signum, frame):
self.log.debug("SIGINT received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def start_logging(self):
# Remove old log files of the controller and engine
if self.clean_logs:
Expand Down Expand Up @@ -463,6 +481,11 @@ def controller_stopped(self, stop_data):
self.log.warning("Controller stopped. Shutting down.")
self.loop.add_callback(self.stop_cluster)

def sigint_handler(self, signum, frame):
"""Unlike engines, SIGINT shuts down `ipcluster start`"""
self.log.debug(f"Received signal {signum} received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def start(self):
"""Start the app for the start subcommand."""
# First see if the cluster is already running
Expand Down
34 changes: 32 additions & 2 deletions ipyparallel/cluster/launcher.py
Expand Up @@ -967,6 +967,26 @@ def start(self, hostname=None, user=None, port=None):
self.scp_args.append('-P')
self.scp_args.append(str(port))

# create remote profile dir
check_output(
self.ssh_cmd
+ self.ssh_args
+ [
self.location,
shlex_join(
[
self.remote_python,
"-m",
"IPython",
"profile",
"create",
"--profile-dir",
self.remote_profile_dir,
]
),
],
input=None,
)
self.send_files()
self.pid = sshx(
self.ssh_cmd + self.ssh_args + [self.location],
Expand Down Expand Up @@ -1192,7 +1212,17 @@ class SSHProxyEngineSetLauncher(SSHLauncher):
"""

n = Integer().tag(to_dict=True)
ipcluster_cmd = List(['ipcluster'], config=True)
ipcluster_cmd = List(Unicode(), config=True)

@default("ipcluster_cmd")
def _default_ipcluster_cmd(self):
return [self.remote_python, "-m", "ipyparallel.cluster"]

ipcluster_args = List(
Unicode(),
config=True,
help="""Extra CLI arguments to pass to ipcluster engines""",
)

@property
def program(self):
Expand All @@ -1207,7 +1237,7 @@ def program_args(self):
self.remote_profile_dir,
'--cluster-id',
self.cluster_id,
]
] + self.ipcluster_args

@default("to_send")
def _to_send_default(self):
Expand Down
22 changes: 14 additions & 8 deletions ipyparallel/tests/test_ssh.py
Expand Up @@ -12,23 +12,29 @@
# import tests that use engine_launcher_class fixture


@pytest.fixture
def ssh_config(ssh_key):
@pytest.fixture(params=["SSH", "SSHProxy"])
def ssh_config(ssh_key, request):
c = Config()
c.Cluster.controller_ip = '0.0.0.0'
c.Cluster.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.scp_args = c.SSHLauncher.ssh_args = [
c.Cluster.engine_launcher_class = request.param
engine_set_cfg = c[f"{request.param}EngineSetLauncher"]
engine_set_cfg.ssh_args = [
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
"-i",
ssh_key,
]
engine_set_cfg.scp_args = list(engine_set_cfg.ssh_args) # copy
engine_set_cfg.remote_python = "/opt/conda/bin/python3"
engine_set_cfg.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
engine_set_cfg.engine_args = ['--debug']
c.SSHProxyEngineSetLauncher.hostname = "127.0.0.1"
c.SSHProxyEngineSetLauncher.ssh_args.append("-p2222")
c.SSHProxyEngineSetLauncher.scp_args.append("-P2222")
c.SSHProxyEngineSetLauncher.user = "ciuser"
c.SSHEngineSetLauncher.engines = {"ciuser@127.0.0.1:2222": 4}
c.SSHEngineSetLauncher.remote_python = "/opt/conda/bin/python3"
c.SSHEngineSetLauncher.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
c.SSHEngineSetLauncher.engine_args = ['--debug']
return c


Expand All @@ -41,4 +47,4 @@ def Cluster(ssh_config, BaseCluster): # noqa: F811
# override engine_launcher_class
@pytest.fixture
def engine_launcher_class(ssh_config):
return 'SSH'
return ssh_config.Cluster.engine_launcher_class

0 comments on commit f3777c6

Please sign in to comment.