Skip to content
Browse files

Clients can now shutdown the controller.

  • Loading branch information...
1 parent 7289c9f commit ffd81f5788a50a42870256f8d51dd6911d753d5a @minrk minrk committed Nov 22, 2010
View
18 IPython/zmq/parallel/client.py
@@ -554,20 +554,32 @@ def abort(self, msg_ids = None, targets=None, block=None):
@spinfirst
@defaultblock
- def shutdown(self, targets=None, restart=False, block=None):
- """Terminates one or more engine processes."""
+ def shutdown(self, targets=None, restart=False, controller=False, block=None):
+ """Terminates one or more engine processes, optionally including the controller."""
+ if controller:
+ targets = 'all'
targets = self._build_targets(targets)[0]
for t in targets:
self.session.send(self._control_socket, 'shutdown_request',
content={'restart':restart},ident=t)
error = False
- if self.block:
+ if block or controller:
for i in range(len(targets)):
idents,msg = self.session.recv(self._control_socket,0)
if self.debug:
pprint(msg)
if msg['content']['status'] != 'ok':
error = ss.unwrap_exception(msg['content'])
+
+ if controller:
+ time.sleep(0.25)
+ self.session.send(self._query_socket, 'shutdown_request')
+ idents,msg = self.session.recv(self._query_socket, 0)
+ if self.debug:
+ pprint(msg)
+ if msg['content']['status'] != 'ok':
+ error = ss.unwrap_exception(msg['content'])
+
if error:
return error
View
24 IPython/zmq/parallel/controller.py
@@ -15,13 +15,15 @@
#-----------------------------------------------------------------------------
from __future__ import print_function
+import sys
import os
from datetime import datetime
import logging
+import time
+import uuid
import zmq
from zmq.eventloop import zmqstream, ioloop
-import uuid
# internal:
from IPython.zmq.log import logger # a Logger object
@@ -232,6 +234,7 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
'purge_request': self.purge_results,
'load_request': self.check_load,
'resubmit_request': self.resubmit_task,
+ 'shutdown_request': self.shutdown_request,
}
self.registrar_handlers = {'registration_request' : self.register_engine,
@@ -716,6 +719,24 @@ def _purge_stalled_registration(self, heart):
# Client Requests
#-------------------------------------------------------------------------
+ def shutdown_request(self, client_id, msg):
+ """handle shutdown request."""
+ # s = self.context.socket(zmq.XREQ)
+ # s.connect(self.client_connections['mux'])
+ # time.sleep(0.1)
+ # for eid,ec in self.engines.iteritems():
+ # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
+ # time.sleep(1)
+ self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
+ dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
+ dc.start()
+
+ def _shutdown(self):
+ logger.info("controller::controller shutting down.")
+ time.sleep(0.1)
+ sys.exit(0)
+
+
def check_load(self, client_id, msg):
content = msg['content']
try:
@@ -1018,6 +1039,7 @@ def main(argv=None):
signal_children(children)
con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
+ dc.start()
loop.start()
View
39 IPython/zmq/parallel/streamkernel.py
@@ -29,7 +29,7 @@
from IPython.zmq.log import logger # a Logger object
from streamsession import StreamSession, Message, extract_header, serialize_object,\
- unpack_apply_message, ISO8601
+ unpack_apply_message, ISO8601, wrap_exception
from dependency import UnmetDependency
import heartmonitor
from client import Client
@@ -53,6 +53,7 @@ class Kernel(HasTraits):
task_stream = Instance(zmqstream.ZMQStream)
iopub_stream = Instance(zmqstream.ZMQStream)
client = Instance(Client)
+ loop = Instance(ioloop.IOLoop)
def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
@@ -127,15 +128,21 @@ def abort_request(self, stream, ident, parent):
def shutdown_request(self, stream, ident, parent):
"""kill ourself. This should really be handled in an external process"""
- self.abort_queues()
- content = dict(parent['content'])
+ try:
+ self.abort_queues()
+ except:
+ content = wrap_exception()
+ else:
+ content = dict(parent['content'])
+ content['status'] = 'ok'
msg = self.session.send(stream, 'shutdown_reply',
content=content, parent=parent, ident=ident)
# msg = self.session.send(self.pub_socket, 'shutdown_reply',
# content, parent, ident)
# print >> sys.__stdout__, msg
- time.sleep(0.1)
- sys.exit(0)
+ # time.sleep(0.2)
+ dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
+ dc.start()
def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
@@ -207,15 +214,7 @@ def execute_request(self, stream, ident, parent):
sys.displayhook.set_parent(parent)
exec comp_code in self.user_ns, self.user_ns
except:
- # result = u'error'
- etype, evalue, tb = sys.exc_info()
- tb = traceback.format_exception(etype, evalue, tb)
- exc_content = {
- u'status' : u'error',
- u'traceback' : tb,
- u'etype' : unicode(etype),
- u'evalue' : unicode(evalue)
- }
+ exc_content = wrap_exception()
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
reply_content = exc_content
@@ -292,15 +291,7 @@ def apply_request(self, stream, ident, parent):
packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
except:
- result = u'error'
- etype, evalue, tb = sys.exc_info()
- tb = traceback.format_exception(etype, evalue, tb)
- exc_content = {
- u'status' : u'error',
- u'traceback' : tb,
- u'etype' : unicode(etype),
- u'evalue' : unicode(evalue)
- }
+ exc_content = wrap_exception()
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
reply_content = exc_content
@@ -426,7 +417,7 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
kernel = Kernel(session=session, control_stream=control_stream,
shell_streams=shell_streams, iopub_stream=iopub_stream,
- client=client)
+ client=client, loop=loop)
kernel.start()
return loop, c, kernel
View
6 IPython/zmq/parallel/streamsession.py
@@ -65,9 +65,9 @@ def wrap_exception():
tb = traceback.format_exception(etype, evalue, tb)
exc_content = {
'status' : 'error',
- 'traceback' : str(tb),
- 'etype' : str(etype),
- 'evalue' : str(evalue)
+ 'traceback' : tb.encode('utf8'),
+ 'etype' : etype.encode('utf8'),
+ 'evalue' : evalue.encode('utf8')
}
return exc_content

0 comments on commit ffd81f5

Please sign in to comment.
Something went wrong with that request. Please try again.