Skip to content

Commit

Permalink
Merge pull request ipython#2791 from minrk/forksafe_nomp
Browse files Browse the repository at this point in the history
forward stdout from forked processes

Uses zmq instead of multiprocessing (mp) to forward the output, because multiprocessing has too many issues.

- messages are sent via PUSH/PULL from subprocesses
- messages are sent at flush time, not at write time
- subprocess messages
- no threads, no sync events, etc.

some basic tests are included

closes ipython#2438
  • Loading branch information
minrk committed Apr 11, 2013
2 parents cbde3bb + 1e1ca28 commit a9cedfc
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 19 deletions.
4 changes: 2 additions & 2 deletions IPython/kernel/inprocess/ipkernel.py
Expand Up @@ -140,11 +140,11 @@ def _shell_class_default(self):

def _stdout_default(self):
    """Build the default stdout stream for the in-process kernel.

    The stream is created with ``pipe=False`` so that no subprocess
    forwarding pipe is set up for it.
    """
    from IPython.kernel.zmq.iostream import OutStream
    # Only the pipe=False form is kept: the earlier return without the
    # keyword made this line unreachable.
    return OutStream(self.session, self.iopub_socket, u'stdout', pipe=False)

def _stderr_default(self):
    """Build the default stderr stream for the in-process kernel.

    The stream is created with ``pipe=False`` so that no subprocess
    forwarding pipe is set up for it.
    """
    from IPython.kernel.zmq.iostream import OutStream
    # Only the pipe=False form is kept: the earlier return without the
    # keyword made this line unreachable.
    return OutStream(self.session, self.iopub_socket, u'stderr', pipe=False)

#-----------------------------------------------------------------------------
# Interactive shell subclass
Expand Down
14 changes: 9 additions & 5 deletions IPython/kernel/tests/test_message_spec.py
Expand Up @@ -46,9 +46,11 @@ def teardown():
KM.shutdown_kernel()


def flush_channels():
def flush_channels(km=None):
if km is None:
km = KM
"""flush any messages waiting on the queue"""
for channel in (KM.shell_channel, KM.iopub_channel):
for channel in (km.shell_channel, km.iopub_channel):
while True:
try:
msg = channel.get_msg(block=True, timeout=0.1)
Expand All @@ -58,10 +60,12 @@ def flush_channels():
list(validate_message(msg))


def execute(code='', **kwargs):
def execute(code='', km=None, **kwargs):
"""wrapper for doing common steps for validating an execution request"""
shell = KM.shell_channel
sub = KM.iopub_channel
if km is None:
km = KM
shell = km.shell_channel
sub = km.iopub_channel

msg_id = shell.execute(code=code, **kwargs)
reply = shell.get_msg(timeout=2)
Expand Down
144 changes: 133 additions & 11 deletions IPython/kernel/zmq/iostream.py
@@ -1,16 +1,33 @@
"""wrappers for stdout/stderr forwarding over zmq
"""

#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

import sys
import time
import os
import threading
import uuid
from io import StringIO

from session import extract_header, Message
import zmq

from session import extract_header

from IPython.utils import io, text
from IPython.utils import py3compat

#-----------------------------------------------------------------------------
# Globals
#-----------------------------------------------------------------------------

# Values returned by OutStream._check_mp_mode():
# MASTER - running in the process that created the stream, or the
#          subprocess pipe is disabled.
MASTER = 0
# CHILD - running in a different (forked) process; output is pushed to the
#         master over the zmq pipe.
CHILD = 1

#-----------------------------------------------------------------------------
# Stream classes
#-----------------------------------------------------------------------------
Expand All @@ -19,39 +36,126 @@ class OutStream(object):
"""A file like object that publishes the stream to a 0MQ PUB socket."""

# Upper bound on the number of pipe messages read from subprocesses in a
# single _flush_from_subprocesses() call.
_subprocess_flush_limit = 256
# The time interval between automatic flushes, in seconds.
flush_interval = 0.05
# Passed as `ident` when flush() sends the stream message.
topic=None

def __init__(self, session, pub_socket, name):
def __init__(self, session, pub_socket, name, pipe=True):
self.encoding = 'UTF-8'
self.session = session
self.pub_socket = pub_socket
self.name = name
self.parent_header = {}
self._new_buffer()
self._buffer_lock = threading.Lock()
self._master_pid = os.getpid()
self._master_thread = threading.current_thread().ident
self._pipe_pid = os.getpid()
self._pipe_flag = pipe
if pipe:
self._setup_pipe_in()

def _setup_pipe_in(self):
    """Bind the PULL socket on which subprocess output is received.

    Sets ``_pipe_uuid``, ``_pipe_in``, ``_pipe_port`` and ``_pipe_poller``.
    """
    context = self.pub_socket.context

    # every pipe message must carry this token as its first frame
    self._pipe_uuid = uuid.uuid4().bytes

    self._pipe_in = context.socket(zmq.PULL)
    self._pipe_in.linger = 0
    port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
    self._pipe_port = port

    # poller used to check for pending pipe messages
    self._pipe_poller = zmq.Poller()
    self._pipe_poller.register(self._pipe_in, zmq.POLLIN)

def _setup_pipe_out(self):
    """Open the PUSH socket this process uses to reach the master's pipe.

    Records the current pid in ``_pipe_pid`` and creates ``_pipe_out``
    together with the lock that guards it.
    """
    # must be new context after fork
    fresh_context = zmq.Context()
    self._pipe_pid = os.getpid()
    self._pipe_out = fresh_context.socket(zmq.PUSH)
    self._pipe_out_lock = threading.Lock()
    endpoint = "tcp://127.0.0.1:%i" % self._pipe_port
    self._pipe_out.connect(endpoint)

def _is_master_process(self):
return os.getpid() == self._master_pid

def _is_master_thread(self):
return threading.current_thread().ident == self._master_thread

def _have_pipe_out(self):
return os.getpid() == self._pipe_pid

def _check_mp_mode(self):
    """Detect a fork and report which side of it we are on.

    Returns MASTER when the pipe is disabled or we are in the master
    process, CHILD otherwise. On the first call in a new process the
    outgoing pipe is created.
    """
    if not self._pipe_flag or self._is_master_process():
        return MASTER

    if not self._have_pipe_out():
        # No out pipe for this pid yet: clear the buffer (the returned
        # data is ignored) and set up a new out pipe.
        self._flush_buffer()
        self._setup_pipe_out()
    return CHILD

def set_parent(self, parent):
    """Store the header extracted from `parent` as ``parent_header``."""
    header = extract_header(parent)
    self.parent_header = header

def close(self):
self.pub_socket = None

def _flush_from_subprocesses(self):
"""flush possible pub data from subprocesses into my buffer"""
if not self._pipe_flag or not self._is_master_process():
return
for i in range(self._subprocess_flush_limit):
if self._pipe_poller.poll(0):
msg = self._pipe_in.recv_multipart()
if msg[0] != self._pipe_uuid:
continue
else:
self._buffer.write(msg[1].decode(self.encoding, 'replace'))
# this always means a flush,
# so reset our timer
self._start = 0
else:
break

def flush(self):
#io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
"""trigger actual zmq send"""
if self.pub_socket is None:
raise ValueError(u'I/O operation on closed file')
else:
data = self._buffer.getvalue()

mp_mode = self._check_mp_mode()

if mp_mode != CHILD:
# we are master
if not self._is_master_thread():
# sub-threads must not trigger flush,
# but at least they can force the timer.
self._start = 0
return

self._flush_from_subprocesses()
data = self._flush_buffer()

if data:
content = {u'name':self.name, u'data':data}
msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header, ident=self.topic)

if hasattr(self.pub_socket, 'flush'):
# socket itself has flush (presumably ZMQStream)
self.pub_socket.flush()
self._buffer.close()
self._new_buffer()
else:
with self._pipe_out_lock:
string = self._flush_buffer()
tracker = self._pipe_out.send_multipart([
self._pipe_uuid,
string.encode(self.encoding, 'replace'),
], copy=False, track=True)
try:
tracker.wait(1)
except:
pass

def isatty(self):
return False
Expand All @@ -75,10 +179,19 @@ def write(self, string):
# Make sure that we're handling unicode
if not isinstance(string, unicode):
string = string.decode(self.encoding, 'replace')


is_child = (self._check_mp_mode() == CHILD)
self._buffer.write(string)
if is_child:
# newlines imply flush in subprocesses
# mp.Pool cannot be trusted to flush promptly (or ever),
# and this helps.
if '\n' in string:
self.flush()
# do we want to check subprocess flushes on write?
# self._flush_from_subprocesses()
current_time = time.time()
if self._start <= 0:
if self._start < 0:
self._start = current_time
elif current_time - self._start > self.flush_interval:
self.flush()
Expand All @@ -90,6 +203,15 @@ def writelines(self, sequence):
for string in sequence:
self.write(string)

def _flush_buffer(self):
"""clear the current buffer and return the current buffer data"""
data = u''
if self._buffer is not None:
data = self._buffer.getvalue()
self._buffer.close()
self._new_buffer()
return data

def _new_buffer(self):
self._buffer = StringIO()
self._start = -1
11 changes: 10 additions & 1 deletion IPython/kernel/zmq/session.py
Expand Up @@ -45,6 +45,7 @@

from IPython.config.application import Application, boolean_flag
from IPython.config.configurable import Configurable, LoggingConfigurable
from IPython.utils import io
from IPython.utils.importstring import import_item
from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
from IPython.utils.py3compat import str_to_bytes
Expand Down Expand Up @@ -318,6 +319,9 @@ def _keyfile_changed(self, name, old, new):
with open(new, 'rb') as f:
self.key = f.read().strip()

# for protecting against sends from forks
pid = Integer()

# serialization traits:

pack = Any(default_packer) # the actual packer function
Expand All @@ -341,6 +345,7 @@ def _unpack_changed(self, name, old, new):
Containers larger than this are pickled outright.
"""
)


def __init__(self, **kwargs):
"""create a Session object
Expand Down Expand Up @@ -382,6 +387,7 @@ def __init__(self, **kwargs):
self.none = self.pack({})
# ensure self._session_default() if necessary, so bsession is defined:
self.session
self.pid = os.getpid()

@property
def msg_id(self):
Expand Down Expand Up @@ -568,7 +574,10 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
else:
msg = self.msg(msg_or_type, content=content, parent=parent,
header=header, metadata=metadata)

if not os.getpid() == self.pid:
io.rprint("WARNING: attempted to send message from fork")
io.rprint(msg)
return
buffers = [] if buffers is None else buffers
to_send = self.serialize(msg, ident)
to_send.extend(buffers)
Expand Down

0 comments on commit a9cedfc

Please sign in to comment.