Skip to content

Commit

Permalink
allow aborting tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed May 13, 2012
1 parent 1901b79 commit 1587a7f
Showing 1 changed file with 35 additions and 42 deletions.
77 changes: 35 additions & 42 deletions IPython/zmq/ipkernel.py
Expand Up @@ -42,7 +42,7 @@
from IPython.utils.frame import extract_module_locals
from IPython.utils.jsonutil import json_clean
from IPython.utils.traitlets import (
Any, Instance, Float, Dict, CaselessStrEnum, List
Any, Instance, Float, Dict, CaselessStrEnum, List, Set
)

from entry_point import base_launch_kernel
Expand Down Expand Up @@ -84,7 +84,7 @@ def _user_ns_changed(self, name, old, new):
self.shell.init_user_ns()

# Private interface

# Time to sleep after flushing the stdout/err buffers in each execute
# cycle. While this introduces a hard limit on the minimal latency of the
# execute cycle, it helps prevent output synchronization problems for
Expand All @@ -109,6 +109,9 @@ def _user_ns_changed(self, name, old, new):
# This is a dict of port number that the kernel is listening on. It is set
# by record_ports and used by connect_request.
_recorded_ports = Dict()

# set of aborted msg_ids
aborted = Set()



Expand Down Expand Up @@ -179,6 +182,7 @@ def do_one_iteration(self):

def dispatch_message(self, socket, idents, msg, handlers):
msg_type = msg['header']['msg_type']
msg_id = msg['header']['msg_id']

# This assert will raise in versions of zeromq 2.0.7 and lesser.
# We now require 2.0.8 or above, so we can uncomment for safety.
Expand All @@ -191,6 +195,17 @@ def dispatch_message(self, socket, idents, msg, handlers):
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
self.log.debug(' Content: %s\n --->\n ', msg['content'])

# check if request has been aborted
if msg_id in self.aborted:
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
reply_msg = self.session.send(socket, reply_type, subheader=status,
content=status, parent=msg, ident=idents)
return


# Find and call actual handler for message
handler = handlers.get(msg_type, None)
if handler is None:
Expand Down Expand Up @@ -390,7 +405,7 @@ def execute_request(self, socket, ident, parent):
self.log.debug("%s", reply_msg)

if reply_msg['content']['status'] == u'error':
self._abort_queue()
self._abort_queues()

self.session.send(self.iopub_socket,
u'status',
Expand Down Expand Up @@ -548,31 +563,6 @@ def apply_request(self, socket, ident, parent):
# Control messages
#---------------------------------------------------------------------------

def abort_queues(self):
for socket in self.shell_sockets:
if socket:
self.abort_queue(socket)

def abort_queue(self, socket):
while True:
idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
if msg is None:
return

self.log.info("Aborting:")
self.log.info("%s", msg)
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
# self.reply_socket.send_json(reply_msg)
reply_msg = self.session.send(socket, reply_type,
content={'status' : 'aborted'}, parent=msg, ident=idents)
self.log.debug("%s", reply_msg)
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.05)

def abort_request(self, socket, ident, parent):
"""abort a specifig msg by id"""
msg_ids = parent['content'].get('msg_ids', None)
Expand Down Expand Up @@ -600,28 +590,31 @@ def clear_request(self, socket, idents, parent):
# Protected interface
#---------------------------------------------------------------------------

def _abort_queue(self):
def _abort_queues(self):
for socket in self.shell_sockets:
if socket:
self._abort_queue(socket)

def _abort_queue(self, socket):
while True:
try:
ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
except Exception:
self.log.warn("Invalid Message:", exc_info=True)
continue
idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
if msg is None:
break
else:
assert ident is not None, \
"Unexpected missing message part."
return

self.log.debug("Aborting:\n%s", msg)
self.log.info("Aborting:")
self.log.info("%s", msg)
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
reply_msg = self.session.send(self.shell_socket, reply_type,
{'status' : 'aborted'}, msg, ident=ident)
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
# self.reply_socket.send_json(reply_msg)
reply_msg = self.session.send(socket, reply_type,
content={'status' : 'aborted'}, parent=msg, ident=idents)
self.log.debug("%s", reply_msg)
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.1)
time.sleep(0.05)


def _no_raw_input(self):
"""Raise StdinNotImplentedError if active frontend doesn't support
Expand Down

0 comments on commit 1587a7f

Please sign in to comment.