Skip to content
Browse files

use wrap_exception in controller, fix clear on kernel

  • Loading branch information...
1 parent fe4b48c commit 89abd30156590f224659e38df7d723738baa84e8 @minrk minrk committed
Showing with 55 additions and 20 deletions.
  1. +44 −16 IPython/zmq/parallel/controller.py
  2. +11 −4 IPython/zmq/parallel/streamkernel.py
View
60 IPython/zmq/parallel/controller.py
@@ -452,7 +452,7 @@ def save_task_request(self, idents, msg):
def save_task_result(self, idents, msg):
"""save the result of a completed task."""
- client_id, engine_uuid = idents[:2]
+ client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False)
except:
@@ -461,19 +461,24 @@ def save_task_result(self, idents, msg):
return
parent = msg['parent_header']
- eid = self.by_ident[engine_uuid]
if not parent:
# print msg
- # logger.warn("")
+ logger.warn("Task %r had no parent!"%msg)
return
msg_id = parent['msg_id']
self.results[msg_id] = msg
- if msg_id in self.pending and msg_id in self.tasks[eid]:
+
+ header = msg['header']
+ engine_uuid = header.get('engine', None)
+ eid = self.by_ident.get(engine_uuid, None)
+
+ if msg_id in self.pending:
self.pending.pop(msg_id)
if msg_id in self.mia:
self.mia.remove(msg_id)
- self.completed[eid].append(msg_id)
- self.tasks[eid].remove(msg_id)
+ if eid is not None and msg_id in self.tasks[eid]:
+ self.completed[eid].append(msg_id)
+ self.tasks[eid].remove(msg_id)
else:
logger.debug("task::unknown task %s finished"%msg_id)
@@ -539,16 +544,28 @@ def register_engine(self, reg, msg):
content.update(self.engine_addrs)
# check if requesting available IDs:
if queue in self.by_ident:
- content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
+ try:
+ raise KeyError("queue_id %r in use"%queue)
+ except:
+ content = wrap_exception()
elif heart in self.hearts: # need to check unique hearts?
- content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
+ try:
+ raise KeyError("heart_id %r in use"%heart)
+ except:
+ content = wrap_exception()
else:
for h, pack in self.incoming_registrations.iteritems():
if heart == h:
- content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
+ try:
+ raise KeyError("heart_id %r in use"%heart)
+ except:
+ content = wrap_exception()
break
elif queue == pack[1]:
- content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
+ try:
+ raise KeyError("queue_id %r in use"%queue)
+ except:
+ content = wrap_exception()
break
msg = self.session.send(self.registrar, "registration_reply",
@@ -566,7 +583,7 @@ def register_engine(self, reg, msg):
dc.start()
self.incoming_registrations[heart] = (eid,queue,reg,dc)
else:
- logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
+ logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
return eid
def unregister_engine(self, ident, msg):
@@ -688,14 +705,23 @@ def purge_results(self, client_id, msg):
self.results.pop(msg_id)
else:
if msg_id in self.pending:
- reply = dict(status='error', reason="msg pending: %r"%msg_id)
+ try:
+ raise IndexError("msg pending: %r"%msg_id)
+ except:
+ reply = wrap_exception()
else:
- reply = dict(status='error', reason="No such msg: %r"%msg_id)
+ try:
+ raise IndexError("No such msg: %r"%msg_id)
+ except:
+ reply = wrap_exception()
break
eids = content.get('engine_ids', [])
for eid in eids:
if eid not in self.engines:
- reply = dict(status='error', reason="No such engine: %i"%eid)
+ try:
+ raise IndexError("No such engine: %i"%eid)
+ except:
+ reply = wrap_exception()
break
msg_ids = self.completed.pop(eid)
for msg_id in msg_ids:
@@ -725,8 +751,10 @@ def get_results(self, client_id, msg):
if not statusonly:
content[msg_id] = self.results[msg_id]['content']
else:
- content = dict(status='error')
- content['reason'] = 'no such message: '+msg_id
+ try:
+ raise KeyError('No such message: '+msg_id)
+ except:
+ content = wrap_exception()
break
self.session.send(self.clientele, "result_reply", content=content,
parent=msg, ident=client_id)
View
15 IPython/zmq/parallel/streamkernel.py
@@ -143,6 +143,7 @@ def __init__(self, session, control_stream, reply_stream, pub_stream,
self.control_stream = control_stream
# self.control_socket = control_stream.socket
self.reply_stream = reply_stream
+ self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
self.task_stream = task_stream
self.pub_stream = pub_stream
self.client = client
@@ -158,7 +159,7 @@ def __init__(self, session, control_stream, reply_stream, pub_stream,
for msg_type in ['execute_request', 'complete_request', 'apply_request']:
self.queue_handlers[msg_type] = getattr(self, msg_type)
- for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
+ for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
self.control_handlers[msg_type] = getattr(self, msg_type)
#-------------------- control handlers -----------------------------
@@ -214,7 +215,7 @@ def abort_request(self, stream, ident, parent):
print(Message(reply_msg), file=sys.__stdout__)
def kill_request(self, stream, idents, parent):
- """kill ourselves. This should really be handled in an external process"""
+ """kill ourself. This should really be handled in an external process"""
self.abort_queues()
msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
content = dict(status='ok'))
@@ -225,6 +226,12 @@ def kill_request(self, stream, idents, parent):
time.sleep(1)
os.kill(os.getpid(), SIGKILL)
+ def clear_request(self, stream, idents, parent):
+ """Clear our namespace."""
+ self.user_ns = {}
+ msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
+ content = dict(status='ok'))
+
def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
msg = self.session.unpack_message(msg, content=True, copy=False)
@@ -330,7 +337,7 @@ def apply_request(self, stream, ident, parent):
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
# self.pub_stream.send(pyin_msg)
# self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
- sub = {'dependencies_met' : True}
+ sub = {'dependencies_met' : True, 'engine' : self.identity}
try:
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
@@ -382,7 +389,7 @@ def apply_request(self, stream, ident, parent):
result_buf = []
if etype is UnmetDependency:
- sub = {'dependencies_met' : False}
+ sub['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}
# reply_msg = self.session.msg(u'execute_reply', reply_content, parent)

0 comments on commit 89abd30

Please sign in to comment.
Something went wrong with that request. Please try again.