Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify StreamCapturer for subprocess testing #4456

Merged
merged 2 commits into from Oct 30, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions IPython/kernel/tests/utils.py
Expand Up @@ -43,11 +43,10 @@
def start_new_kernel(argv=None):
"""start a new kernel, and return its Manager and Client"""
km = KernelManager()
kwargs = dict(stdout=PIPE, stderr=STDOUT)
kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT)
if argv:
kwargs['extra_arguments'] = argv
km.start_kernel(**kwargs)
nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno())
nose.ipy_stream_capturer.ensure_started()
kc = km.client()
kc.start_channels()
Expand Down
4 changes: 1 addition & 3 deletions IPython/parallel/tests/__init__.py
Expand Up @@ -38,15 +38,14 @@ class TestProcessLauncher(LocalProcessLauncher):
def start(self):
if self.state == 'before':
self.process = Popen(self.args,
stdout=PIPE, stderr=STDOUT,
stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT,
env=os.environ,
cwd=self.work_dir
)
self.notify_start(self.process.pid)
self.poll = self.process.poll
# Store stdout & stderr to show with failing tests.
# This is defined in IPython.testing.iptest
nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno())
nose.ipy_stream_capturer.ensure_started()
else:
s = 'The process was already started and has state: %r' % self.state
Expand Down Expand Up @@ -114,7 +113,6 @@ def teardown():
time.sleep(1)
while launchers:
p = launchers.pop()
nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno())
if p.poll() is None:
try:
p.stop()
Expand Down
60 changes: 20 additions & 40 deletions IPython/testing/iptest.py
Expand Up @@ -31,7 +31,6 @@
from io import BytesIO
import os
import os.path as path
from select import select
import sys
from threading import Thread, Lock, Event
import warnings
Expand Down Expand Up @@ -365,68 +364,51 @@ def __init__(self):
super(StreamCapturer, self).__init__()
self.streams = []
self.buffer = BytesIO()
self.streams_lock = Lock()
self.readfd, self.writefd = os.pipe()
self.buffer_lock = Lock()
self.stream_added = Event()
self.stop = Event()

def run(self):
self.started = True
while not self.stop.is_set():
with self.streams_lock:
streams = self.streams

if not streams:
self.stream_added.wait(timeout=1)
self.stream_added.clear()
continue
while not self.stop.is_set():
chunk = os.read(self.readfd, 1024)

ready = select(streams, [], [], 0.5)[0]
dead = []
with self.buffer_lock:
for fd in ready:
try:
self.buffer.write(os.read(fd, 1024))
except OSError as e:
import errno
if e.errno == errno.EBADF:
dead.append(fd)
else:
raise

with self.streams_lock:
for fd in dead:
self.streams.remove(fd)

def add_stream(self, fd):
with self.streams_lock:
self.streams.append(fd)
self.stream_added.set()
self.buffer.write(chunk)

def remove_stream(self, fd):
with self.streams_lock:
self.streams.remove(fd)

os.close(self.readfd)
os.close(self.writefd)

def reset_buffer(self):
with self.buffer_lock:
self.buffer.truncate(0)
self.buffer.seek(0)

def get_buffer(self):
with self.buffer_lock:
return self.buffer.getvalue()

def ensure_started(self):
if not self.started:
self.start()

def halt(self):
"""Safely stop the thread."""
if not self.started:
return

self.stop.set()
os.write(self.writefd, b'wake up') # Ensure we're not locked in a read()
self.join()

class SubprocessStreamCapturePlugin(Plugin):
name='subprocstreams'
def __init__(self):
Plugin.__init__(self)
self.stream_capturer = StreamCapturer()
# This is ugly, but distant parts of the test machinery need to be able
# to add streams, so we make the object globally accessible.
# to redirect streams, so we make the object globally accessible.
nose.ipy_stream_capturer = self.stream_capturer

def configure(self, options, config):
Expand Down Expand Up @@ -454,9 +436,7 @@ def formatFailure(self, test, err):
formatError = formatFailure

def finalize(self, result):
if self.stream_capturer.started:
self.stream_capturer.stop.set()
self.stream_capturer.join()
self.stream_capturer.halt()


def run_iptest():
Expand Down