In-process kernel support (take 2) #2397

Closed
wants to merge 14 commits into
from
+1,384 −397
Split
@@ -0,0 +1,260 @@
+""" Defines a KernelManager that provides signals and slots.
+"""
+
+# System library imports.
+from IPython.external.qt import QtCore
+
+# IPython imports.
+from IPython.utils.traitlets import HasTraits, Type
+from util import MetaQObjectHasTraits, SuperQObject
+
+
+class ChannelQObject(SuperQObject):
+
+ # Emitted when the channel is started.
+ started = QtCore.Signal()
+
+ # Emitted when the channel is stopped.
+ stopped = QtCore.Signal()
+
+ #---------------------------------------------------------------------------
+ # Channel interface
+ #---------------------------------------------------------------------------
+
+ def start(self):
+ """ Reimplemented to emit signal.
+ """
+ super(ChannelQObject, self).start()
+ self.started.emit()
+
+ def stop(self):
+ """ Reimplemented to emit signal.
+ """
+ super(ChannelQObject, self).stop()
+ self.stopped.emit()
+
+ #---------------------------------------------------------------------------
+ # InProcessChannel interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers_later(self, *args, **kwds):
+ """ Call the message handlers later.
+ """
+ do_later = lambda: self.call_handlers(*args, **kwds)
+ QtCore.QTimer.singleShot(0, do_later)
+
+ def process_events(self):
+ """ Process any pending GUI events.
+ """
+ QtCore.QCoreApplication.instance().processEvents()
+
+
+class QtShellChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when a reply has been received for the corresponding request
+ # type.
+ execute_reply = QtCore.Signal(object)
+ complete_reply = QtCore.Signal(object)
+ object_info_reply = QtCore.Signal(object)
+ history_reply = QtCore.Signal(object)
+
+ # Emitted when the first reply comes back.
+ first_reply = QtCore.Signal()
+
+ # Used by the first_reply signal logic to determine if a reply is the
+ # first.
+ _handlers_called = False
+
+ #---------------------------------------------------------------------------
+ # 'ShellSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ signal = getattr(self, msg_type, None)
+ if signal:
+ signal.emit(msg)
+
+ if not self._handlers_called:
+ self.first_reply.emit()
+ self._handlers_called = True
+
+ #---------------------------------------------------------------------------
+ # 'QtShellChannelMixin' interface
+ #---------------------------------------------------------------------------
+
+ def reset_first_reply(self):
+ """ Reset the first_reply signal to fire again on the next reply.
+ """
+ self._handlers_called = False
+
+
+class QtSubChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'stream' is received.
+ stream_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyin' is received.
+ pyin_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyout' is received.
+ pyout_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyerr' is received.
+ pyerr_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'display_data' is received
+ display_data_received = QtCore.Signal(object)
+
+ # Emitted when a crash report message is received from the kernel's
+ # last-resort sys.excepthook.
+ crash_received = QtCore.Signal(object)
+
+ # Emitted when a shutdown is noticed.
+ shutdown_reply_received = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'SubSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ signal = getattr(self, msg_type + '_received', None)
+ if signal:
+ signal.emit(msg)
+ elif msg_type in ('stdout', 'stderr'):
+ self.stream_received.emit(msg)
+
+ def flush(self):
+ """ Reimplemented to ensure that signals are dispatched immediately.
+ """
+ super(QtSubChannelMixin, self).flush()
+ QtCore.QCoreApplication.instance().processEvents()
+
+
+class QtStdInChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when an input request is received.
+ input_requested = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'StdInSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ if msg_type == 'input_request':
+ self.input_requested.emit(msg)
+
+
+class QtHBChannelMixin(ChannelQObject):
+
+ # Emitted when the kernel has died.
+ kernel_died = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'HBSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, since_last_heartbeat):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.kernel_died.emit(since_last_heartbeat)
+
+
+class QtKernelManagerMixin(HasTraits, SuperQObject):
+ """ A KernelManager that provides signals and slots.
+ """
+
+ __metaclass__ = MetaQObjectHasTraits
+
+ # Emitted when the kernel manager has started listening.
+ started_kernel = QtCore.Signal()
+
+ # Emitted when the kernel manager has started listening.
+ started_channels = QtCore.Signal()
+
+ # Emitted when the kernel manager has stopped listening.
+ stopped_channels = QtCore.Signal()
+
+ # Use Qt-specific channel classes that emit signals.
+ sub_channel_class = Type(QtSubChannelMixin)
+ shell_channel_class = Type(QtShellChannelMixin)
+ stdin_channel_class = Type(QtStdInChannelMixin)
+ hb_channel_class = Type(QtHBChannelMixin)
+
+ #---------------------------------------------------------------------------
+ # 'KernelManager' interface
+ #---------------------------------------------------------------------------
+
+ #------ Kernel process management ------------------------------------------
+
+ def start_kernel(self, *args, **kw):
+ """ Reimplemented for proper heartbeat management.
+ """
+ if self._shell_channel is not None:
+ self._shell_channel.reset_first_reply()
+ super(QtKernelManagerMixin, self).start_kernel(*args, **kw)
+ self.started_kernel.emit()
+
+ #------ Channel management -------------------------------------------------
+
+ def start_channels(self, *args, **kw):
+ """ Reimplemented to emit signal.
+ """
+ super(QtKernelManagerMixin, self).start_channels(*args, **kw)
+ self.started_channels.emit()
+
+ def stop_channels(self):
+ """ Reimplemented to emit signal.
+ """
+ super(QtKernelManagerMixin, self).stop_channels()
+ self.stopped_channels.emit()
+
+ @property
+ def shell_channel(self):
+ """ Reimplemented for proper heartbeat management.
+ """
+ if self._shell_channel is None:
+ self._shell_channel = super(QtKernelManagerMixin,self).shell_channel
+ self._shell_channel.first_reply.connect(self._first_reply)
+ return self._shell_channel
+
+ #---------------------------------------------------------------------------
+ # Protected interface
+ #---------------------------------------------------------------------------
+
+ def _first_reply(self):
+ """ Unpauses the heartbeat channel when the first reply is received on
+ the execute channel. Note that this will *not* start the heartbeat
+ channel if it is not already running!
+ """
+ if self._hb_channel is not None:
+ self._hb_channel.unpause()
@@ -0,0 +1,33 @@
+""" Defines an in-process KernelManager with signals and slots.
+"""
+
+# Local imports.
+from IPython.inprocess.kernelmanager import \
+ ShellInProcessChannel, SubInProcessChannel, StdInInProcessChannel, \
+ HBInProcessChannel, InProcessKernelManager
+from IPython.utils.traitlets import Type
+from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
+ QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
+
+
+class QtShellInProcessChannel(QtShellChannelMixin, ShellInProcessChannel):
+ pass
+
+class QtSubInProcessChannel(QtSubChannelMixin, SubInProcessChannel):
+ pass
+
+class QtStdInInProcessChannel(QtStdInChannelMixin, StdInInProcessChannel):
+ pass
+
+class QtHBInProcessChannel(QtHBChannelMixin, HBInProcessChannel):
+ pass
+
+
+class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
+ """ An in-process KernelManager with signals and slots.
+ """
+
+ sub_channel_class = Type(QtSubInProcessChannel)
+ shell_channel_class = Type(QtShellInProcessChannel)
+ stdin_channel_class = Type(QtStdInInProcessChannel)
+ hb_channel_class = Type(QtHBInProcessChannel)
Oops, something went wrong.