Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Refactor kernel managers in preparation for the EmbeddedKernel.

  • Loading branch information...
commit e367f3e4adb14a46b620abec20de8d321d883e53 1 parent e1a4864
@epatters epatters authored
View
0  IPython/embedded/__init__.py
No changes.
View
75 IPython/embedded/blockingkernelmanager.py
@@ -0,0 +1,75 @@
+""" Implements a fully blocking kernel manager.
+
+Useful for test suites and blocking terminal interfaces.
+"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2012 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING.txt, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+from __future__ import print_function
+
+# Standard library imports.
+import Queue
+from threading import Event
+
+# Local imports.
+from IPython.utils.traitlets import Type
+from kernelmanager import EmbeddedKernelManager, ShellEmbeddedChannel, \
+ SubEmbeddedChannel, StdInEmbeddedChannel
+
+#-----------------------------------------------------------------------------
+# Utility classes
+#-----------------------------------------------------------------------------
+
+class BlockingChannelMixin(object):
+
+ def __init__(self, *args, **kwds):
+ super(BlockingChannelMixin, self).__init__(*args, **kwds)
+ self._in_queue = Queue.Queue()
+
+ def call_handlers(self, msg):
+ self._in_queue.put(msg)
+
+ def get_msg(self, block=True, timeout=None):
+ """ Gets a message if there is one that is ready. """
+ return self._in_queue.get(block, timeout)
+
+ def get_msgs(self):
+ """ Get all messages that are currently ready. """
+ msgs = []
+ while True:
+ try:
+ msgs.append(self.get_msg(block=False))
+ except Queue.Empty:
+ break
+ return msgs
+
+ def msg_ready(self):
+ """ Is there a message that has been received? """
+ return not self._in_queue.empty()
+
+#-----------------------------------------------------------------------------
+# Blocking kernel manager
+#-----------------------------------------------------------------------------
+
+class BlockingShellEmbeddedChannel(BlockingChannelMixin, ShellEmbeddedChannel):
+ pass
+
+class BlockingSubEmbeddedChannel(BlockingChannelMixin, SubEmbeddedChannel):
+ pass
+
+class BlockingStdInEmbeddedChannel(BlockingChannelMixin, StdInEmbeddedChannel):
+ pass
+
+class BlockingEmbeddedKernelManager(EmbeddedKernelManager):
+
+ # The classes to use for the various channels.
+ shell_channel_class = Type(BlockingShellEmbeddedChannel)
+ sub_channel_class = Type(BlockingSubEmbeddedChannel)
+ stdin_channel_class = Type(BlockingStdInEmbeddedChannel)
View
22 IPython/embedded/ipkernel.py
@@ -0,0 +1,22 @@
+""" An embedded (in-process) kernel. """
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2012 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+# Local imports.
+from IPython.zmq.ipkernel import Kernel
+
+#-----------------------------------------------------------------------------
+# Main kernel class
+#-----------------------------------------------------------------------------
+
+class EmbeddedKernel(Kernel):
+ pass
View
398 IPython/embedded/kernelmanager.py
@@ -0,0 +1,398 @@
+""" A kernel manager for embedded (in-process) kernels. """
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2012 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+# Local imports.
+from IPython.config.loader import Config
+from IPython.utils.traitlets import HasTraits, Any, Instance, Type
+
+#-----------------------------------------------------------------------------
+# Channel classes
+#-----------------------------------------------------------------------------
+
+class EmbeddedChannel(object):
+ """ Base class for embedded channels.
+ """
+
+ def __init__(self, manager):
+ super(EmbeddedChannel, self).__init__()
+ self.manager = manager
+ self._is_alive = False
+
+ #--------------------------------------------------------------------------
+ # Channel interface
+ #--------------------------------------------------------------------------
+
+ def is_alive(self):
+ return self._is_alive
+
+ def start(self):
+ self._is_alive = True
+
+ def stop(self):
+ self._is_alive = False
+
+ def call_handlers(self, msg):
+ """ This method is called in the main thread when a message arrives.
+
+ Subclasses should override this method to handle incoming messages.
+ """
+ raise NotImplementedError('call_handlers must be defined in a subclass.')
+
+ #--------------------------------------------------------------------------
+ # EmbeddedChannel interface
+ #--------------------------------------------------------------------------
+
+ def call_handlers_later(self, *args, **kwds):
+ """ Call the message handlers later.
+
+ The default implementation just calls the handlers immediately, but this
+ method exists so that GUI toolkits can defer calling the handlers until
+ after the event loop has run, as expected by GUI frontends.
+ """
+ self.call_handlers(*args, **kwds)
+
+ def process_events(self):
+ """ Process any pending GUI events.
+
+ This method will be never be called from a frontend without an event
+ loop (e.g., a terminal frontend).
+ """
+ raise NotImplementedError
+
+
+class ShellEmbeddedChannel(EmbeddedChannel):
+ """The DEALER channel for issues request/replies to the kernel.
+ """
+
+ # flag for whether execute requests should be allowed to call raw_input
+ allow_stdin = True
+
+ def execute(self, code, silent=False, store_history=True,
+ user_variables=[], user_expressions={}, allow_stdin=None):
+ """Execute code in the kernel.
+
+ Parameters
+ ----------
+ code : str
+ A string of Python code.
+
+ silent : bool, optional (default False)
+ If set, the kernel will execute the code as quietly possible, and
+ will force store_history to be False.
+
+ store_history : bool, optional (default True)
+ If set, the kernel will store command history. This is forced
+ to be False if silent is True.
+
+ user_variables : list, optional
+ A list of variable names to pull from the user's namespace. They
+ will come back as a dict with these names as keys and their
+ :func:`repr` as values.
+
+ user_expressions : dict, optional
+ A dict mapping names to expressions to be evaluated in the user's
+ dict. The expression values are returned as strings formatted using
+ :func:`repr`.
+
+ allow_stdin : bool, optional (default self.allow_stdin)
+ Flag for whether the kernel can send stdin requests to frontends.
+
+ Some frontends (e.g. the Notebook) do not support stdin requests.
+ If raw_input is called from code executed from such a frontend, a
+ StdinNotImplementedError will be raised.
+
+ Returns
+ -------
+ The msg_id of the message sent.
+ """
+ raise NotImplementedError
+
+ def complete(self, text, line, cursor_pos, block=None):
+ """Tab complete text in the kernel's namespace.
+
+ Parameters
+ ----------
+ text : str
+ The text to complete.
+ line : str
+ The full line of text that is the surrounding context for the
+ text to complete.
+ cursor_pos : int
+ The position of the cursor in the line where the completion was
+ requested.
+ block : str, optional
+ The full block of code in which the completion is being requested.
+
+ Returns
+ -------
+ The msg_id of the message sent.
+ """
+ raise NotImplementedError
+
+ def object_info(self, oname, detail_level=0):
+ """Get metadata information about an object.
+
+ Parameters
+ ----------
+ oname : str
+ A string specifying the object name.
+ detail_level : int, optional
+ The level of detail for the introspection (0-2)
+
+ Returns
+ -------
+ The msg_id of the message sent.
+ """
+ raise NotImplementedError
+
+ def history(self, raw=True, output=False, hist_access_type='range', **kwds):
+ """Get entries from the history list.
+
+ Parameters
+ ----------
+ raw : bool
+ If True, return the raw input.
+ output : bool
+ If True, then return the output as well.
+ hist_access_type : str
+ 'range' (fill in session, start and stop params), 'tail' (fill in n)
+ or 'search' (fill in pattern param).
+
+ session : int
+ For a range request, the session from which to get lines. Session
+ numbers are positive integers; negative ones count back from the
+ current session.
+ start : int
+ The first line number of a history range.
+ stop : int
+ The final (excluded) line number of a history range.
+
+ n : int
+ The number of lines of history to get for a tail request.
+
+ pattern : str
+ The glob-syntax pattern for a search request.
+
+ Returns
+ -------
+ The msg_id of the message sent.
+ """
+ raise NotImplementedError
+
+ def shutdown(self, restart=False):
+ """ Request an immediate kernel shutdown.
+
+ A dummy method for the embedded kernel.
+ """
+ # FIXME: What to do here?
+ raise NotImplementedError('Shutdown not supported for embedded kernel')
+
+
+class SubEmbeddedChannel(EmbeddedChannel):
+ """The SUB channel which listens for messages that the kernel publishes.
+ """
+
+ def flush(self, timeout=1.0):
+ """ Immediately processes all pending messages on the SUB channel.
+
+ A dummy method for the embedded kernel.
+ """
+ pass
+
+
+class StdInEmbeddedChannel(EmbeddedChannel):
+ """ A reply channel to handle raw_input requests that the kernel makes. """
+
+ def input(self, string):
+ """ Send a string of raw input to the kernel.
+ """
+ raise NotImplementedError
+
+
+class HBEmbeddedChannel(EmbeddedChannel):
+ """ A dummy heartbeat channel. """
+
+ time_to_dead = 3.0
+
+ def __init__(self, *args, **kwds):
+ super(HBEmbeddedChannel, self).__init__(*args, **kwds)
+ self._pause = True
+
+ def pause(self):
+ """ Pause the heartbeat. """
+ self._pause = True
+
+ def unpause(self):
+ """ Unpause the heartbeat. """
+ self._pause = False
+
+ def is_beating(self):
+ """ Is the heartbeat running and responsive (and not paused). """
+ return not self._pause
+
+
+#-----------------------------------------------------------------------------
+# Main kernel manager class
+#-----------------------------------------------------------------------------
+
+class EmbeddedKernelManager(HasTraits):
+ """ A manager for an embedded kernel.
+
+ This class implements most of the interface of
+ ``IPython.zmq.kernelmanager.KernelManager`` and allows (asynchronous)
+ frontends to be used seamlessly with an in-process kernel.
+ """
+ # Config object for passing to child configurables
+ config = Instance(Config)
+
+ # The Session to use for building messages.
+ session = Instance('IPython.zmq.session.Session')
+ def _session_default(self):
+ from IPython.zmq.session import Session
+ return Session(config=self.config)
+
+ # The kernel process with which the KernelManager is communicating.
+ kernel = Instance('IPython.embedded.ipkernel.EmbeddedKernel')
+
+ # The classes to use for the various channels.
+ shell_channel_class = Type(ShellEmbeddedChannel)
+ sub_channel_class = Type(SubEmbeddedChannel)
+ stdin_channel_class = Type(StdInEmbeddedChannel)
+ hb_channel_class = Type(HBEmbeddedChannel)
+
+ # Protected traits.
+ _shell_channel = Any
+ _sub_channel = Any
+ _stdin_channel = Any
+ _hb_channel = Any
+
+ #--------------------------------------------------------------------------
+ # Channel management methods:
+ #--------------------------------------------------------------------------
+
+ def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
+ """ Starts the channels for this kernel.
+ """
+ if shell:
+ self.shell_channel.start()
+ if sub:
+ self.sub_channel.start()
+ if stdin:
+ self.stdin_channel.start()
+ self.shell_channel.allow_stdin = True
+ else:
+ self.shell_channel.allow_stdin = False
+ if hb:
+ self.hb_channel.start()
+
+ def stop_channels(self):
+ """ Stops all the running channels for this kernel.
+ """
+ if self.shell_channel.is_alive():
+ self.shell_channel.stop()
+ if self.sub_channel.is_alive():
+ self.sub_channel.stop()
+ if self.stdin_channel.is_alive():
+ self.stdin_channel.stop()
+ if self.hb_channel.is_alive():
+ self.hb_channel.stop()
+
+ @property
+ def channels_running(self):
+ """ Are any of the channels created and running? """
+ return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
+ self.stdin_channel.is_alive() or self.hb_channel.is_alive())
+
+ #--------------------------------------------------------------------------
+ # Kernel management methods:
+ #--------------------------------------------------------------------------
+
+ def start_kernel(self, **kwds):
+ """ Starts a kernel process and configures the manager to use it.
+ """
+ from IPython.embedded.ipkernel import EmbeddedKernel
+ self.kernel = EmbeddedKernel()
+ self.kernel.frontends.append(self)
+
+ def shutdown_kernel(self):
+ """ Attempts to the stop the kernel process cleanly. If the kernel
+ cannot be stopped and the kernel is local, it is killed.
+ """
+ self.kill_kernel()
+
+ def restart_kernel(self, now=False, **kwds):
+ """ Restarts a kernel with the arguments that were used to launch it.
+
+ The 'now' parameter is ignored.
+ """
+ self.shutdown_kernel()
+ self.start_kernel(**kwds)
+
+ @property
+ def has_kernel(self):
+ """ Returns whether a kernel process has been specified for the kernel
+ manager.
+ """
+ return self.kernel is not None
+
+ def kill_kernel(self):
+ """ Kill the running kernel.
+ """
+ self.kernel.frontends.remove(self)
+ self.kernel = None
+
+ def interrupt_kernel(self):
+ """ Interrupts the kernel. """
+ raise NotImplementedError("Cannot interrupt embedded kernel.")
+
+ def signal_kernel(self, signum):
+ """ Sends a signal to the kernel. """
+ raise NotImplementedError("Cannot signal embedded kernel.")
+
+ @property
+ def is_alive(self):
+ """ Is the kernel process still running? """
+ return True
+
+ #--------------------------------------------------------------------------
+ # Channels used for communication with the kernel:
+ #--------------------------------------------------------------------------
+
+ @property
+ def shell_channel(self):
+ """Get the REQ socket channel object to make requests of the kernel."""
+ if self._shell_channel is None:
+ self._shell_channel = self.shell_channel_class(self)
+ return self._shell_channel
+
+ @property
+ def sub_channel(self):
+ """Get the SUB socket channel object."""
+ if self._sub_channel is None:
+ self._sub_channel = self.sub_channel_class(self)
+ return self._sub_channel
+
+ @property
+ def stdin_channel(self):
+ """Get the REP socket channel object to handle stdin (raw_input)."""
+ if self._stdin_channel is None:
+ self._stdin_channel = self.stdin_channel_class(self)
+ return self._stdin_channel
+
+ @property
+ def hb_channel(self):
+ """Get the heartbeat socket channel object to check that the
+ kernel is alive."""
+ if self._hb_channel is None:
+ self._hb_channel = self.hb_channel_class(self)
+ return self._hb_channel
View
258 IPython/frontend/qt/base_kernelmanager.py
@@ -0,0 +1,258 @@
+""" Defines a KernelManager that provides signals and slots.
+"""
+
+# System library imports.
+from IPython.external.qt import QtCore
+
+# IPython imports.
+from IPython.utils.traitlets import Type
+from util import SuperQObject
+
+
+class ChannelQObject(SuperQObject):
+
+ # Emitted when the channel is started.
+ started = QtCore.Signal()
+
+ # Emitted when the channel is stopped.
+ stopped = QtCore.Signal()
+
+ #---------------------------------------------------------------------------
+ # Channel interface
+ #---------------------------------------------------------------------------
+
+ def start(self):
+ """ Reimplemented to emit signal.
+ """
+ super(ChannelQObject, self).start()
+ self.started.emit()
+
+ def stop(self):
+ """ Reimplemented to emit signal.
+ """
+ super(ChannelQObject, self).stop()
+ self.stopped.emit()
+
+ #---------------------------------------------------------------------------
+ # EmbeddedChannel interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers_later(self, *args, **kwds):
+ """ Call the message handlers later.
+ """
+ do_later = lambda: self.call_handlers(*args, **kwds)
+ QtCore.QTimer.singleShot(0, do_later)
+
+ def process_events(self):
+ """ Process any pending GUI events.
+ """
+ QtCore.QCoreApplication.instance().processEvents()
+
+
+class QtShellChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when a reply has been received for the corresponding request
+ # type.
+ execute_reply = QtCore.Signal(object)
+ complete_reply = QtCore.Signal(object)
+ object_info_reply = QtCore.Signal(object)
+ history_reply = QtCore.Signal(object)
+
+ # Emitted when the first reply comes back.
+ first_reply = QtCore.Signal()
+
+ # Used by the first_reply signal logic to determine if a reply is the
+ # first.
+ _handlers_called = False
+
+ #---------------------------------------------------------------------------
+ # 'ShellSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ signal = getattr(self, msg_type, None)
+ if signal:
+ signal.emit(msg)
+
+ if not self._handlers_called:
+ self.first_reply.emit()
+ self._handlers_called = True
+
+ #---------------------------------------------------------------------------
+ # 'QtShellChannelMixin' interface
+ #---------------------------------------------------------------------------
+
+ def reset_first_reply(self):
+ """ Reset the first_reply signal to fire again on the next reply.
+ """
+ self._handlers_called = False
+
+
+class QtSubChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'stream' is received.
+ stream_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyin' is received.
+ pyin_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyout' is received.
+ pyout_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'pyerr' is received.
+ pyerr_received = QtCore.Signal(object)
+
+ # Emitted when a message of type 'display_data' is received
+ display_data_received = QtCore.Signal(object)
+
+ # Emitted when a crash report message is received from the kernel's
+ # last-resort sys.excepthook.
+ crash_received = QtCore.Signal(object)
+
+ # Emitted when a shutdown is noticed.
+ shutdown_reply_received = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'SubSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ signal = getattr(self, msg_type + '_received', None)
+ if signal:
+ signal.emit(msg)
+ elif msg_type in ('stdout', 'stderr'):
+ self.stream_received.emit(msg)
+
+ def flush(self):
+ """ Reimplemented to ensure that signals are dispatched immediately.
+ """
+ super(QtSubChannelMixin, self).flush()
+ QtCore.QCoreApplication.instance().processEvents()
+
+
+class QtStdInChannelMixin(ChannelQObject):
+
+ # Emitted when any message is received.
+ message_received = QtCore.Signal(object)
+
+ # Emitted when an input request is received.
+ input_requested = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'StdInSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, msg):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.message_received.emit(msg)
+
+ # Emit signals for specialized message types.
+ msg_type = msg['header']['msg_type']
+ if msg_type == 'input_request':
+ self.input_requested.emit(msg)
+
+
+class QtHBChannelMixin(ChannelQObject):
+
+ # Emitted when the kernel has died.
+ kernel_died = QtCore.Signal(object)
+
+ #---------------------------------------------------------------------------
+ # 'HBSocketChannel' interface
+ #---------------------------------------------------------------------------
+
+ def call_handlers(self, since_last_heartbeat):
+ """ Reimplemented to emit signals instead of making callbacks.
+ """
+ # Emit the generic signal.
+ self.kernel_died.emit(since_last_heartbeat)
+
+
+class QtKernelManagerMixin(object):
+ """ A KernelManager that provides signals and slots.
+ """
+
+ # Emitted when the kernel manager has started listening.
+ started_kernel = QtCore.Signal()
+
+ # Emitted when the kernel manager has started listening.
+ started_channels = QtCore.Signal()
+
+ # Emitted when the kernel manager has stopped listening.
+ stopped_channels = QtCore.Signal()
+
+ # Use Qt-specific channel classes that emit signals.
+ sub_channel_class = Type(QtSubChannelMixin)
+ shell_channel_class = Type(QtShellChannelMixin)
+ stdin_channel_class = Type(QtStdInChannelMixin)
+ hb_channel_class = Type(QtHBChannelMixin)
+
+ #---------------------------------------------------------------------------
+ # 'KernelManager' interface
+ #---------------------------------------------------------------------------
+
+ #------ Kernel process management ------------------------------------------
+
+ def start_kernel(self, *args, **kw):
+ """ Reimplemented for proper heartbeat management.
+ """
+ if self._shell_channel is not None:
+ self._shell_channel.reset_first_reply()
+ super(QtKernelManagerMixin, self).start_kernel(*args, **kw)
+ self.started_kernel.emit()
+
+ #------ Channel management -------------------------------------------------
+
+ def start_channels(self, *args, **kw):
+ """ Reimplemented to emit signal.
+ """
+ super(QtKernelManagerMixin, self).start_channels(*args, **kw)
+ self.started_channels.emit()
+
+ def stop_channels(self):
+ """ Reimplemented to emit signal.
+ """
+ super(QtKernelManagerMixin, self).stop_channels()
+ self.stopped_channels.emit()
+
+ @property
+ def shell_channel(self):
+ """ Reimplemented for proper heartbeat management.
+ """
+ if self._shell_channel is None:
+ self._shell_channel = super(QtKernelManagerMixin,self).shell_channel
+ self._shell_channel.first_reply.connect(self._first_reply)
+ return self._shell_channel
+
+ #---------------------------------------------------------------------------
+ # Protected interface
+ #---------------------------------------------------------------------------
+
+ def _first_reply(self):
+ """ Unpauses the heartbeat channel when the first reply is received on
+ the execute channel. Note that this will *not* start the heartbeat
+ channel if it is not already running!
+ """
+ if self._hb_channel is not None:
+ self._hb_channel.unpause()
View
37 IPython/frontend/qt/embedded_kernelmanager.py
@@ -0,0 +1,37 @@
+""" Defines an embedded KernelManager that provides signals and slots.
+"""
+
+# Local imports.
+from IPython.embedded.kernelmanager import \
+ ShellEmbeddedChannel, SubEmbeddedChannel, StdInEmbeddedChannel, \
+ HBEmbeddedChannel, EmbeddedKernelManager
+from IPython.utils.traitlets import Type
+from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
+ QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
+from util import MetaQObjectHasTraits, SuperQObject
+
+
+class QtShellEmbeddedChannel(QtShellChannelMixin, ShellEmbeddedChannel):
+ pass
+
+class QtSubEmbeddedChannel(QtSubChannelMixin, SubEmbeddedChannel):
+ pass
+
+class QtStdInEmbeddedChannel(QtStdInChannelMixin, StdInEmbeddedChannel):
+ pass
+
+class QtHBEmbeddedChannel(QtHBChannelMixin, HBEmbeddedChannel):
+ pass
+
+
+class QtEmbeddedKernelManager(QtKernelManagerMixin,
+ EmbeddedKernelManager, SuperQObject):
+ """ An embedded KernelManager that provides signals and slots.
+ """
+
+ __metaclass__ = MetaQObjectHasTraits
+
+ sub_channel_class = Type(QtSubEmbeddedChannel)
+ shell_channel_class = Type(QtShellEmbeddedChannel)
+ stdin_channel_class = Type(QtStdInEmbeddedChannel)
+ hb_channel_class = Type(QtHBEmbeddedChannel)
View
240 IPython/frontend/qt/kernelmanager.py
@@ -1,247 +1,35 @@
""" Defines a KernelManager that provides signals and slots.
"""
-# System library imports.
-from IPython.external.qt import QtCore
-
-# IPython imports.
+# Local imports.
from IPython.utils.traitlets import Type
-from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
- ShellSocketChannel, StdInSocketChannel, HBSocketChannel
+from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
+ StdInSocketChannel, HBSocketChannel, KernelManager
+from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
+ QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
from util import MetaQObjectHasTraits, SuperQObject
-class SocketChannelQObject(SuperQObject):
-
- # Emitted when the channel is started.
- started = QtCore.Signal()
-
- # Emitted when the channel is stopped.
- stopped = QtCore.Signal()
-
- #---------------------------------------------------------------------------
- # 'ZMQSocketChannel' interface
- #---------------------------------------------------------------------------
-
- def start(self):
- """ Reimplemented to emit signal.
- """
- super(SocketChannelQObject, self).start()
- self.started.emit()
-
- def stop(self):
- """ Reimplemented to emit signal.
- """
- super(SocketChannelQObject, self).stop()
- self.stopped.emit()
-
-
-class QtShellSocketChannel(SocketChannelQObject, ShellSocketChannel):
-
- # Emitted when any message is received.
- message_received = QtCore.Signal(object)
-
- # Emitted when a reply has been received for the corresponding request
- # type.
- execute_reply = QtCore.Signal(object)
- complete_reply = QtCore.Signal(object)
- object_info_reply = QtCore.Signal(object)
- history_reply = QtCore.Signal(object)
-
- # Emitted when the first reply comes back.
- first_reply = QtCore.Signal()
-
- # Used by the first_reply signal logic to determine if a reply is the
- # first.
- _handlers_called = False
-
- #---------------------------------------------------------------------------
- # 'ShellSocketChannel' interface
- #---------------------------------------------------------------------------
-
- def call_handlers(self, msg):
- """ Reimplemented to emit signals instead of making callbacks.
- """
- # Emit the generic signal.
- self.message_received.emit(msg)
-
- # Emit signals for specialized message types.
- msg_type = msg['header']['msg_type']
- signal = getattr(self, msg_type, None)
- if signal:
- signal.emit(msg)
-
- if not self._handlers_called:
- self.first_reply.emit()
- self._handlers_called = True
-
- #---------------------------------------------------------------------------
- # 'QtShellSocketChannel' interface
- #---------------------------------------------------------------------------
-
- def reset_first_reply(self):
- """ Reset the first_reply signal to fire again on the next reply.
- """
- self._handlers_called = False
-
-
-class QtSubSocketChannel(SocketChannelQObject, SubSocketChannel):
-
- # Emitted when any message is received.
- message_received = QtCore.Signal(object)
-
- # Emitted when a message of type 'stream' is received.
- stream_received = QtCore.Signal(object)
-
- # Emitted when a message of type 'pyin' is received.
- pyin_received = QtCore.Signal(object)
-
- # Emitted when a message of type 'pyout' is received.
- pyout_received = QtCore.Signal(object)
-
- # Emitted when a message of type 'pyerr' is received.
- pyerr_received = QtCore.Signal(object)
-
- # Emitted when a message of type 'display_data' is received
- display_data_received = QtCore.Signal(object)
+class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
+ pass
- # Emitted when a crash report message is received from the kernel's
- # last-resort sys.excepthook.
- crash_received = QtCore.Signal(object)
+class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
+ pass
- # Emitted when a shutdown is noticed.
- shutdown_reply_received = QtCore.Signal(object)
+class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
+ pass
- #---------------------------------------------------------------------------
- # 'SubSocketChannel' interface
- #---------------------------------------------------------------------------
+class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
+ pass
- def call_handlers(self, msg):
- """ Reimplemented to emit signals instead of making callbacks.
- """
- # Emit the generic signal.
- self.message_received.emit(msg)
- # Emit signals for specialized message types.
- msg_type = msg['header']['msg_type']
- signal = getattr(self, msg_type + '_received', None)
- if signal:
- signal.emit(msg)
- elif msg_type in ('stdout', 'stderr'):
- self.stream_received.emit(msg)
- def flush(self):
- """ Reimplemented to ensure that signals are dispatched immediately.
- """
- super(QtSubSocketChannel, self).flush()
- QtCore.QCoreApplication.instance().processEvents()
-
-
-class QtStdInSocketChannel(SocketChannelQObject, StdInSocketChannel):
-
- # Emitted when any message is received.
- message_received = QtCore.Signal(object)
-
- # Emitted when an input request is received.
- input_requested = QtCore.Signal(object)
-
- #---------------------------------------------------------------------------
- # 'StdInSocketChannel' interface
- #---------------------------------------------------------------------------
-
- def call_handlers(self, msg):
- """ Reimplemented to emit signals instead of making callbacks.
- """
- # Emit the generic signal.
- self.message_received.emit(msg)
-
- # Emit signals for specialized message types.
- msg_type = msg['header']['msg_type']
- if msg_type == 'input_request':
- self.input_requested.emit(msg)
-
-
-class QtHBSocketChannel(SocketChannelQObject, HBSocketChannel):
-
- # Emitted when the kernel has died.
- kernel_died = QtCore.Signal(object)
-
- #---------------------------------------------------------------------------
- # 'HBSocketChannel' interface
- #---------------------------------------------------------------------------
-
- def call_handlers(self, since_last_heartbeat):
- """ Reimplemented to emit signals instead of making callbacks.
- """
- # Emit the generic signal.
- self.kernel_died.emit(since_last_heartbeat)
-
-
-class QtKernelManager(KernelManager, SuperQObject):
+class QtKernelManager(QtKernelManagerMixin, KernelManager, SuperQObject):
""" A KernelManager that provides signals and slots.
"""
__metaclass__ = MetaQObjectHasTraits
- # Emitted when the kernel manager has started listening.
- started_kernel = QtCore.Signal()
-
- # Emitted when the kernel manager has started listening.
- started_channels = QtCore.Signal()
-
- # Emitted when the kernel manager has stopped listening.
- stopped_channels = QtCore.Signal()
-
- # Use Qt-specific channel classes that emit signals.
sub_channel_class = Type(QtSubSocketChannel)
shell_channel_class = Type(QtShellSocketChannel)
stdin_channel_class = Type(QtStdInSocketChannel)
hb_channel_class = Type(QtHBSocketChannel)
-
- #---------------------------------------------------------------------------
- # 'KernelManager' interface
- #---------------------------------------------------------------------------
-
- #------ Kernel process management ------------------------------------------
-
- def start_kernel(self, *args, **kw):
- """ Reimplemented for proper heartbeat management.
- """
- if self._shell_channel is not None:
- self._shell_channel.reset_first_reply()
- super(QtKernelManager, self).start_kernel(*args, **kw)
- self.started_kernel.emit()
-
- #------ Channel management -------------------------------------------------
-
- def start_channels(self, *args, **kw):
- """ Reimplemented to emit signal.
- """
- super(QtKernelManager, self).start_channels(*args, **kw)
- self.started_channels.emit()
-
- def stop_channels(self):
- """ Reimplemented to emit signal.
- """
- super(QtKernelManager, self).stop_channels()
- self.stopped_channels.emit()
-
- @property
- def shell_channel(self):
- """ Reimplemented for proper heartbeat management.
- """
- if self._shell_channel is None:
- self._shell_channel = super(QtKernelManager, self).shell_channel
- self._shell_channel.first_reply.connect(self._first_reply)
- return self._shell_channel
-
- #---------------------------------------------------------------------------
- # Protected interface
- #---------------------------------------------------------------------------
-
- def _first_reply(self):
- """ Unpauses the heartbeat channel when the first reply is received on
- the execute channel. Note that this will *not* start the heartbeat
- channel if it is not already running!
- """
- if self._hb_channel is not None:
- self._hb_channel.unpause()
View
129 IPython/zmq/blockingkernelmanager.py
@@ -1,9 +1,9 @@
-"""Implement a fully blocking kernel manager.
+""" Implements a fully blocking kernel manager.
Useful for test suites and blocking terminal interfaces.
"""
#-----------------------------------------------------------------------------
-# Copyright (C) 2010-2011 The IPython Development Team
+# Copyright (C) 2010-2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING.txt, distributed as part of this software.
@@ -12,125 +12,25 @@
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
-from __future__ import print_function
-# Stdlib
-from Queue import Queue, Empty
-from threading import Event
-
-# Our own
-from IPython.utils import io
+# Local imports.
+from IPython.embedded.blockingkernelmanager import BlockingChannelMixin
from IPython.utils.traitlets import Type
-
-from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
- ShellSocketChannel, StdInSocketChannel)
+from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
+ ShellSocketChannel, StdInSocketChannel
#-----------------------------------------------------------------------------
-# Functions and classes
+# Blocking kernel manager
#-----------------------------------------------------------------------------
-class BlockingSubSocketChannel(SubSocketChannel):
-
- def __init__(self, context, session, address=None):
- super(BlockingSubSocketChannel, self).__init__(context, session,
- address)
- self._in_queue = Queue()
-
- def call_handlers(self, msg):
- #io.rprint('[[Sub]]', msg) # dbg
- self._in_queue.put(msg)
-
- def msg_ready(self):
- """Is there a message that has been received?"""
- if self._in_queue.qsize() == 0:
- return False
- else:
- return True
-
- def get_msg(self, block=True, timeout=None):
- """Get a message if there is one that is ready."""
- if block and timeout is None:
- # never use timeout=None, because get
- # becomes uninterruptible
- timeout = 1e6
- return self._in_queue.get(block, timeout)
-
- def get_msgs(self):
- """Get all messages that are currently ready."""
- msgs = []
- while True:
- try:
- msgs.append(self.get_msg(block=False))
- except Empty:
- break
- return msgs
-
-
-class BlockingShellSocketChannel(ShellSocketChannel):
+class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
+ pass
- def __init__(self, context, session, address=None):
- super(BlockingShellSocketChannel, self).__init__(context, session,
- address)
- self._in_queue = Queue()
-
- def call_handlers(self, msg):
- #io.rprint('[[Shell]]', msg) # dbg
- self._in_queue.put(msg)
-
- def msg_ready(self):
- """Is there a message that has been received?"""
- if self._in_queue.qsize() == 0:
- return False
- else:
- return True
-
- def get_msg(self, block=True, timeout=None):
- """Get a message if there is one that is ready."""
- if block and timeout is None:
- # never use timeout=None, because get
- # becomes uninterruptible
- timeout = 1e6
- return self._in_queue.get(block, timeout)
-
- def get_msgs(self):
- """Get all messages that are currently ready."""
- msgs = []
- while True:
- try:
- msgs.append(self.get_msg(block=False))
- except Empty:
- break
- return msgs
-
-
-class BlockingStdInSocketChannel(StdInSocketChannel):
-
- def __init__(self, context, session, address=None):
- super(BlockingStdInSocketChannel, self).__init__(context, session, address)
- self._in_queue = Queue()
-
- def call_handlers(self, msg):
- #io.rprint('[[Rep]]', msg) # dbg
- self._in_queue.put(msg)
-
- def get_msg(self, block=True, timeout=None):
- "Gets a message if there is one that is ready."
- return self._in_queue.get(block, timeout)
-
- def get_msgs(self):
- """Get all messages that are currently ready."""
- msgs = []
- while True:
- try:
- msgs.append(self.get_msg(block=False))
- except Empty:
- break
- return msgs
-
- def msg_ready(self):
- "Is there a message that has been received?"
- return not self._in_queue.empty()
+class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
+ pass
+class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
+ pass
class BlockingHBSocketChannel(HBSocketChannel):
@@ -140,10 +40,9 @@ class BlockingHBSocketChannel(HBSocketChannel):
time_to_dead = 1.
def call_handlers(self, since_last_heartbeat):
- """pause beating on missed heartbeat"""
+ """ Pause beating on missed heartbeat. """
pass
-
class BlockingKernelManager(KernelManager):
# The classes to use for the various channels.
View
8 IPython/zmq/kernelmanager.py
@@ -654,6 +654,8 @@ def _context_default(self):
# The Session to use for communication with the kernel.
session = Instance(Session)
+ def _session_default(self):
+ return Session(config=self.config)
# The kernel process with which the KernelManager is communicating.
kernel = Instance(Popen)
@@ -682,16 +684,10 @@ def _ip_changed(self, name, old, new):
_stdin_channel = Any
_hb_channel = Any
_connection_file_written=Bool(False)
-
- def __init__(self, **kwargs):
- super(KernelManager, self).__init__(**kwargs)
- if self.session is None:
- self.session = Session(config=self.config)
def __del__(self):
self.cleanup_connection_file()
-
#--------------------------------------------------------------------------
# Channel management methods:
#--------------------------------------------------------------------------
Please sign in to comment.
Something went wrong with that request. Please try again.