Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and tests for SSHProxy #512

Merged
merged 2 commits into from Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 29 additions & 6 deletions ipyparallel/cluster/app.py
Expand Up @@ -10,6 +10,7 @@
import re
import signal
import sys
from functools import partial

import zmq
from IPython.core.profiledir import ProfileDir
Expand Down Expand Up @@ -308,18 +309,39 @@ def init_cluster(self):

def init_signal(self):
# Setup signals
for signame in ("SIGUSR1", "SIGUSR2", "SIGINFO"):
try:
signum = getattr(signal, signame)
except AttributeError:
self.log.debug(f"Not forwarding {signame}")
pass
else:
self.log.debug(f"Forwarding {signame} to engines")
signal.signal(signum, self.relay_signal)
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigint_handler)

def relay_signal(self, signum, frame):
self.log.debug(f"Received signal {signum} received, relaying to engines...")
self.loop.add_callback_from_signal(partial(self.cluster.signal_engines, signum))

def sigint_handler(self, signum, frame):
return self.relay_signal(signum, frame)

def sigterm_handler(self, signum, frame):
self.log.debug(f"Received signal {signum} received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def engines_started_ok(self):
self.log.info("Engines appear to have started successfully")
self.early_shutdown = 0

async def start_engines(self):
try:
await self.cluster.start_engines(self.n)
await self.cluster.start_engines()
except:
self.log.exception("Engine start failed")
raise
self.exit(1)

if self.daemonize:
self.loop.add_callback(self.loop.stop)
Expand Down Expand Up @@ -374,10 +396,6 @@ async def stop_cluster(self, r=None):
await self.cluster.stop_cluster()
self.loop.add_callback(self.loop.stop)

def sigint_handler(self, signum, frame):
self.log.debug("SIGINT received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def start_logging(self):
# Remove old log files of the controller and engine
if self.clean_logs:
Expand Down Expand Up @@ -463,6 +481,11 @@ def controller_stopped(self, stop_data):
self.log.warning("Controller stopped. Shutting down.")
self.loop.add_callback(self.stop_cluster)

def sigint_handler(self, signum, frame):
"""Unlike engines, SIGINT shuts down `ipcluster start`"""
self.log.debug(f"Received signal {signum} received, stopping launchers...")
self.loop.add_callback_from_signal(self.stop_cluster)

def start(self):
"""Start the app for the start subcommand."""
# First see if the cluster is already running
Expand Down
34 changes: 32 additions & 2 deletions ipyparallel/cluster/launcher.py
Expand Up @@ -967,6 +967,26 @@ def start(self, hostname=None, user=None, port=None):
self.scp_args.append('-P')
self.scp_args.append(str(port))

# create remote profile dir
check_output(
self.ssh_cmd
+ self.ssh_args
+ [
self.location,
shlex_join(
[
self.remote_python,
"-m",
"IPython",
"profile",
"create",
"--profile-dir",
self.remote_profile_dir,
]
),
],
input=None,
)
self.send_files()
self.pid = sshx(
self.ssh_cmd + self.ssh_args + [self.location],
Expand Down Expand Up @@ -1192,7 +1212,17 @@ class SSHProxyEngineSetLauncher(SSHLauncher):
"""

n = Integer().tag(to_dict=True)
ipcluster_cmd = List(['ipcluster'], config=True)
ipcluster_cmd = List(Unicode(), config=True)

@default("ipcluster_cmd")
def _default_ipcluster_cmd(self):
return [self.remote_python, "-m", "ipyparallel.cluster"]

ipcluster_args = List(
Unicode(),
config=True,
help="""Extra CLI arguments to pass to ipcluster engines""",
)

@property
def program(self):
Expand All @@ -1207,7 +1237,7 @@ def program_args(self):
self.remote_profile_dir,
'--cluster-id',
self.cluster_id,
]
] + self.ipcluster_args

@default("to_send")
def _to_send_default(self):
Expand Down
22 changes: 14 additions & 8 deletions ipyparallel/tests/test_ssh.py
Expand Up @@ -12,23 +12,29 @@
# import tests that use engine_launcher_class fixture


@pytest.fixture
def ssh_config(ssh_key):
@pytest.fixture(params=["SSH", "SSHProxy"])
def ssh_config(ssh_key, request):
c = Config()
c.Cluster.controller_ip = '0.0.0.0'
c.Cluster.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.scp_args = c.SSHLauncher.ssh_args = [
c.Cluster.engine_launcher_class = request.param
engine_set_cfg = c[f"{request.param}EngineSetLauncher"]
engine_set_cfg.ssh_args = [
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
"-i",
ssh_key,
]
engine_set_cfg.scp_args = list(engine_set_cfg.ssh_args) # copy
engine_set_cfg.remote_python = "/opt/conda/bin/python3"
engine_set_cfg.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
engine_set_cfg.engine_args = ['--debug']
c.SSHProxyEngineSetLauncher.hostname = "127.0.0.1"
c.SSHProxyEngineSetLauncher.ssh_args.append("-p2222")
c.SSHProxyEngineSetLauncher.scp_args.append("-P2222")
c.SSHProxyEngineSetLauncher.user = "ciuser"
c.SSHEngineSetLauncher.engines = {"ciuser@127.0.0.1:2222": 4}
c.SSHEngineSetLauncher.remote_python = "/opt/conda/bin/python3"
c.SSHEngineSetLauncher.remote_profile_dir = "/home/ciuser/.ipython/profile_default"
c.SSHEngineSetLauncher.engine_args = ['--debug']
return c


Expand All @@ -41,4 +47,4 @@ def Cluster(ssh_config, BaseCluster): # noqa: F811
# override engine_launcher_class
@pytest.fixture
def engine_launcher_class(ssh_config):
return 'SSH'
return ssh_config.Cluster.engine_launcher_class