Permalink
Browse files

forward subprocess IO over zmq on Windows

This is required on Windows, because select can only poll
sockets, unlike [all] other platforms
  • Loading branch information...
1 parent b5a7d66 commit 3e8626a453a821c3779ee863e0e25c05aa91afcc @minrk committed Apr 13, 2011
Showing with 89 additions and 9 deletions.
  1. +22 −9 IPython/parallel/apps/launcher.py
  2. +67 −0 IPython/parallel/apps/win32support.py
@@ -48,6 +48,8 @@ def check_output(*args, **kwargs):
from IPython.parallel.factory import LoggingFactory
+from .win32support import forward_read_events
+
# load winhpcjob only on Windows
try:
from .winhpcjob import (
@@ -57,7 +59,7 @@ def check_output(*args, **kwargs):
except ImportError:
pass
-
+WINDOWS = os.name == 'nt'
#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------
@@ -251,9 +253,14 @@ def start(self):
env=os.environ,
cwd=self.work_dir
)
-
- self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
- self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
+ if WINDOWS:
+ self.stdout = forward_read_events(self.process.stdout)
+ self.stderr = forward_read_events(self.process.stderr)
+ else:
+ self.stdout = self.process.stdout.fileno()
+ self.stderr = self.process.stderr.fileno()
+ self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
+ self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
self.poller.start()
self.notify_start(self.process.pid)
@@ -277,15 +284,21 @@ def interrupt_then_kill(self, delay=2.0):
# callbacks, etc:
def handle_stdout(self, fd, events):
- line = self.process.stdout.readline()
+ if WINDOWS:
+ line = self.stdout.recv()
+ else:
+ line = self.process.stdout.readline()
# a stopped process will be readable but return empty strings
if line:
self.log.info(line[:-1])
else:
self.poll()
def handle_stderr(self, fd, events):
- line = self.process.stderr.readline()
+ if WINDOWS:
+ line = self.stderr.recv()
+ else:
+ line = self.process.stderr.readline()
# a stopped process will be readable but return empty strings
if line:
self.log.error(line[:-1])
@@ -296,8 +309,8 @@ def poll(self):
status = self.process.poll()
if status is not None:
self.poller.stop()
- self.loop.remove_handler(self.process.stdout.fileno())
- self.loop.remove_handler(self.process.stderr.fileno())
+ self.loop.remove_handler(self.stdout)
+ self.loop.remove_handler(self.stderr)
self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
@@ -588,7 +601,7 @@ def start(self, n, cluster_dir):
# This is only used on Windows.
def find_job_cmd():
- if os.name=='nt':
+ if WINDOWS:
try:
return find_cmd('job')
except (FindCmdError, ImportError):
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+"""Utility for forwarding file read events over a zmq socket.
+
+This is necessary because select on Windows only supports"""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import uuid
+import zmq
+
+from threading import Thread
+
+#-----------------------------------------------------------------------------
+# Code
+#-----------------------------------------------------------------------------
+
+class ForwarderThread(Thread):
+ def __init__(self, sock, fd):
+ Thread.__init__(self)
+ self.daemon=True
+ self.sock = sock
+ self.fd = fd
+
+ def run(self):
+ """loop through lines in self.fd, and send them over self.sock"""
+ line = self.fd.readline()
+ # allow for files opened in unicode mode
+ if isinstance(line, unicode):
+ send = self.sock.send_unicode
+ else:
+ send = self.sock.send
+ while line:
+ send(line)
+ line = self.fd.readline()
+ # line == '' means EOF
+ self.fd.close()
+ self.sock.close()
+
+def forward_read_events(fd, context=None):
+ """forward read events from an FD over a socket.
+
+ This method wraps a file in a socket pair, so it can
+ be polled for read events by select (specifically zmq.eventloop.ioloop)
+ """
+ if context is None:
+ context = zmq.Context.instance()
+ push = context.socket(zmq.PUSH)
+ push.setsockopt(zmq.LINGER, -1)
+ pull = context.socket(zmq.PULL)
+ addr='inproc://%s'%uuid.uuid4()
+ push.bind(addr)
+ pull.connect(addr)
+ forwarder = ForwarderThread(push, fd)
+ forwarder.start()
+ return pull
+
+
+__all__ = ['forward_read_events']

0 comments on commit 3e8626a

Please sign in to comment.