Skip to content

Commit

Permalink
Merge pull request ipython#1372 from minrk/reuse-cleanup
Browse files Browse the repository at this point in the history
ipcontroller cleans up connection files unless reuse=True

Connection files are not valid across sessions if reuse is False, but were previously preserved, which could cause engines to try to connect to the wrong ports if the Controller took too long to start and/or write new connection files.

Also shuffles signal handling around a bit, so that a clean exit occurs, rather than calling sys.exit directly in the signal handler.
  • Loading branch information
minrk committed Feb 6, 2012
2 parents 44d2863 + 89bd3d5 commit 9ec1bad
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 26 deletions.
84 changes: 59 additions & 25 deletions IPython/parallel/apps/ipcontrollerapp.py
Expand Up @@ -30,6 +30,7 @@
import sys

from multiprocessing import Process
from signal import signal, SIGINT, SIGABRT, SIGTERM

import zmq
from zmq.devices import ProcessMonitoredQueue
Expand All @@ -55,7 +56,7 @@
from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB

from IPython.parallel.util import signal_children, split_url, disambiguate_url
from IPython.parallel.util import split_url, disambiguate_url

# conditional import of MongoDB backend class

Expand Down Expand Up @@ -151,7 +152,9 @@ class IPControllerApp(BaseParallelApplication):
help="""Whether to create profile dir if it doesn't exist.""")

reuse_files = Bool(False, config=True,
help='Whether to reuse existing json connection files.'
help="""Whether to reuse existing json connection files.
If False, connection files will be removed on a clean exit.
"""
)
ssh_server = Unicode(u'', config=True,
help="""ssh url for clients to use when connecting to the Controller
Expand Down Expand Up @@ -192,6 +195,12 @@ def _cluster_id_changed(self, name, old, new):

def _use_threads_changed(self, name, old, new):
self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

write_connection_files = Bool(True,
help="""Whether to write connection files to disk.
True in all cases other than runs with `reuse_files=True` *after the first*
"""
)

aliases = Dict(aliases)
flags = Dict(flags)
Expand Down Expand Up @@ -256,14 +265,32 @@ def load_config_from_json(self):
self.ssh_server = cfg['ssh']
assert int(ports) == c.HubFactory.regport, "regport mismatch"

def cleanup_connection_files(self):
    """Delete the JSON connection files, unless they were kept for reuse.

    When ``reuse_files`` is set the files must survive this session so a
    later controller can pick them up; otherwise stale files could point
    engines at dead ports, so remove them.  Failures to delete are logged
    but never raised.
    """
    if self.reuse_files:
        self.log.debug("leaving JSON connection files for reuse")
        return
    self.log.debug("cleaning up JSON connection files")
    for json_name in [self.client_json_file, self.engine_json_file]:
        full_path = os.path.join(self.profile_dir.security_dir, json_name)
        try:
            os.remove(full_path)
        except Exception as e:
            self.log.error("Failed to cleanup connection file: %s", e)
        else:
            self.log.debug(u"removed %s", full_path)

def load_secondary_config(self):
"""secondary config, loading from JSON and setting defaults"""
if self.reuse_files:
try:
self.load_config_from_json()
except (AssertionError,IOError) as e:
self.log.error("Could not load config from JSON: %s" % e)
self.reuse_files=False
else:
# successfully loaded config from JSON, and reuse=True
# no need to wite back the same file
self.write_connection_files = False

# switch Session.key default to secure
default_secure(self.config)
self.log.debug("Config changed")
Expand All @@ -284,7 +311,7 @@ def init_hub(self):
self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)

if not self.reuse_files:
if self.write_connection_files:
# save to new json config files
f = self.factory
cdict = {'exec_key' : f.session.key.decode('ascii'),
Expand All @@ -298,7 +325,6 @@ def init_hub(self):
edict['ssh'] = self.engine_ssh_server
self.save_connection_dict(self.engine_json_file, edict)

#
def init_schedulers(self):
children = self.children
mq = import_item(str(self.mq_class))
Expand Down Expand Up @@ -367,21 +393,31 @@ def init_schedulers(self):
kwargs['in_thread'] = True
launch_scheduler(*sargs, **kwargs)

def terminate_children(self):
    """Terminate all child processes (scheduler/queue subprocesses).

    Children that are monitored queues expose their process via
    ``.launcher``; plain ``multiprocessing.Process`` children are
    terminated directly.  An ``OSError`` from ``terminate()`` means the
    child already exited, which is fine.
    """
    to_kill = [
        child.launcher if isinstance(child, ProcessMonitoredQueue) else child
        for child in self.children
        if isinstance(child, (ProcessMonitoredQueue, Process))
    ]
    if to_kill:
        self.log.critical("terminating children...")
    for proc in to_kill:
        try:
            proc.terminate()
        except OSError:
            # already dead
            pass

def save_urls(self):
    """save the registration urls to files.

    Writes ``ipcontroller-engine.url`` and ``ipcontroller-client.url``
    into the profile's security dir, each containing a single
    ``transport://ip:regport`` URL taken from the hub factory.
    """
    # NOTE: the original also bound `c = self.config`, which was never
    # used in this method; the unused local has been removed.
    sec_dir = self.profile_dir.security_dir
    cf = self.factory

    with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
        f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

    with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
        f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))

def handle_signal(self, sig, frame):
    """Signal handler: shut down cleanly instead of calling sys.exit.

    Terminates the child processes first, then stops the event loop so
    control returns to ``start()``, whose ``finally`` clause performs
    connection-file cleanup.  ``frame`` is the interrupted stack frame
    supplied by the signal machinery and is unused here.
    """
    self.log.critical("Received signal %i, shutting down", sig)
    # order matters: kill children before stopping our own loop
    self.terminate_children()
    self.loop.stop()

def init_signal(self):
    """Install :meth:`handle_signal` for the fatal signals.

    Covers SIGINT, SIGABRT, and SIGTERM so that any of them triggers
    the clean-shutdown path rather than an abrupt exit.
    """
    fatal_signals = (SIGINT, SIGABRT, SIGTERM)
    for fatal in fatal_signals:
        signal(fatal, self.handle_signal)

def do_import_statements(self):
statements = self.import_statements
for s in statements:
Expand Down Expand Up @@ -415,22 +451,20 @@ def initialize(self, argv=None):
def start(self):
    """Start the controller: hub, child processes, signals, and the loop.

    This span of the scraped diff interleaved removed lines
    (``child_procs`` collection and the ``signal_children(...)`` call,
    whose import was dropped from this file and would now raise
    NameError) with the added lines; this body is the coherent
    post-commit version without the removed code.
    """
    # Start the subprocesses:
    self.factory.start()
    # children must be started before signals are setup,
    # otherwise signal-handling will fire multiple times
    for child in self.children:
        child.start()
    self.init_signal()

    self.write_pid_file(overwrite=True)

    try:
        self.factory.loop.start()
    except KeyboardInterrupt:
        self.log.critical("Interrupted, Exiting...\n")
    finally:
        # remove connection files unless reuse_files is set
        self.cleanup_connection_files()



Expand Down
2 changes: 1 addition & 1 deletion IPython/parallel/controller/scheduler.py
Expand Up @@ -722,5 +722,5 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
try:
loop.start()
except KeyboardInterrupt:
print ("interrupted, exiting...", file=sys.__stderr__)
scheduler.log.critical("Interrupted, exiting...")

0 comments on commit 9ec1bad

Please sign in to comment.