forked from ipython/ipython
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests new mp.Process behavior for PR ipython#2734
- Loading branch information
Showing
1 changed file
with
183 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
"""test the IPython Kernel""" | ||
|
||
#------------------------------------------------------------------------------- | ||
# Copyright (C) 2013 The IPython Development Team | ||
# | ||
# Distributed under the terms of the BSD License. The full license is in | ||
# the file COPYING, distributed as part of this software. | ||
#------------------------------------------------------------------------------- | ||
|
||
#------------------------------------------------------------------------------- | ||
# Imports | ||
#------------------------------------------------------------------------------- | ||
|
||
import os | ||
import shutil | ||
import tempfile | ||
|
||
from Queue import Empty | ||
from contextlib import contextmanager | ||
from subprocess import PIPE | ||
|
||
import nose.tools as nt | ||
|
||
from IPython.zmq.blockingkernelmanager import BlockingKernelManager | ||
from IPython.zmq.tests.test_message_spec import execute, flush_channels | ||
from IPython.testing import decorators as dec | ||
from IPython.utils import path, py3compat | ||
|
||
#------------------------------------------------------------------------------- | ||
# Tests | ||
#------------------------------------------------------------------------------- | ||
|
||
def setup(): | ||
"""setup temporary IPYTHONDIR for tests""" | ||
global IPYTHONDIR | ||
global save_env | ||
global save_get_ipython_dir | ||
|
||
IPYTHONDIR = tempfile.mkdtemp() | ||
|
||
save_env = os.environ.copy() | ||
os.environ["IPYTHONDIR"] = IPYTHONDIR | ||
|
||
save_get_ipython_dir = path.get_ipython_dir | ||
path.get_ipython_dir = lambda : IPYTHONDIR | ||
|
||
|
||
def teardown(): | ||
path.get_ipython_dir = save_get_ipython_dir | ||
os.environ = save_env | ||
|
||
try: | ||
shutil.rmtree(IPYTHONDIR) | ||
except (OSError, IOError): | ||
# no such file | ||
pass | ||
|
||
|
||
@contextmanager | ||
def new_kernel(): | ||
"""start a kernel in a subprocess, and wait for it to be ready | ||
Returns | ||
------- | ||
kernel_manager: connected KernelManager instance | ||
""" | ||
KM = BlockingKernelManager() | ||
|
||
KM.start_kernel(stdout=PIPE, stderr=PIPE) | ||
KM.start_channels() | ||
|
||
# wait for kernel to be ready | ||
KM.shell_channel.execute("import sys") | ||
KM.shell_channel.get_msg(block=True, timeout=5) | ||
flush_channels(KM) | ||
try: | ||
yield KM | ||
finally: | ||
KM.stop_channels() | ||
KM.shutdown_kernel() | ||
|
||
|
||
def assemble_output(iopub): | ||
"""assemble stdout/err from an execution""" | ||
stdout = '' | ||
stderr = '' | ||
while True: | ||
msg = iopub.get_msg(block=True, timeout=1) | ||
msg_type = msg['msg_type'] | ||
content = msg['content'] | ||
if msg_type == 'status' and content['execution_state'] == 'idle': | ||
# idle message signals end of output | ||
break | ||
elif msg['msg_type'] == 'stream': | ||
if content['name'] == 'stdout': | ||
stdout = stdout + content['data'] | ||
elif content['name'] == 'stderr': | ||
stderr = stderr + content['data'] | ||
else: | ||
raise KeyError("bad stream: %r" % content['name']) | ||
else: | ||
# other output, ignored | ||
pass | ||
return stdout, stderr | ||
|
||
|
||
def _check_mp_mode(km, expected=False, stream="stdout"): | ||
execute(km=km, code="import sys") | ||
flush_channels(km) | ||
msg_id, content = execute(km=km, code="print (sys.stdout._check_mp_mode())") | ||
stdout, stderr = assemble_output(km.sub_channel) | ||
nt.assert_equal(eval(stdout.strip()), expected) | ||
|
||
|
||
@dec.parametric | ||
def test_simple_print(): | ||
"""simple print statement in kernel""" | ||
with new_kernel() as km: | ||
iopub = km.sub_channel | ||
|
||
msg_id, content = execute(km=km, code="print ('hi')") | ||
stdout, stderr = assemble_output(iopub) | ||
yield nt.assert_equal(stdout, 'hi\n') | ||
yield nt.assert_equal(stderr, '') | ||
yield _check_mp_mode(km, expected=False) | ||
|
||
|
||
@dec.parametric | ||
def test_subprocess_print(): | ||
"""printing from forked mp.Process""" | ||
with new_kernel() as km: | ||
iopub = km.sub_channel | ||
|
||
yield _check_mp_mode(km, expected=False) | ||
flush_channels(km) | ||
np = 5 | ||
code = '\n'.join([ | ||
"import multiprocessing as mp", | ||
"def f(x):", | ||
" print 'hello',x", | ||
"pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np, | ||
"for p in pool: p.start()", | ||
"for p in pool: p.join()" | ||
]) | ||
|
||
expected = '\n'.join([ | ||
"hello %s" % i for i in range(np) | ||
]) + '\n' | ||
|
||
msg_id, content = execute(km=km, code=code) | ||
stdout, stderr = assemble_output(iopub) | ||
yield nt.assert_equal(stdout.count("hello"), np, stdout) | ||
for n in range(np): | ||
yield nt.assert_equal(stdout.count(str(n)), 1, stdout) | ||
yield nt.assert_equal(stderr, '') | ||
yield _check_mp_mode(km, expected=True) | ||
yield _check_mp_mode(km, expected=True, stream="stderr") | ||
|
||
|
||
@dec.parametric | ||
def test_subprocess_noprint(): | ||
"""mp.Process without print doesn't trigger iostream mp_mode""" | ||
with new_kernel() as km: | ||
iopub = km.sub_channel | ||
|
||
np = 5 | ||
code = '\n'.join([ | ||
"import multiprocessing as mp", | ||
"def f(x):", | ||
" return x", | ||
"pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np, | ||
"for p in pool: p.start()", | ||
"for p in pool: p.join()" | ||
]) | ||
|
||
msg_id, content = execute(km=km, code=code) | ||
stdout, stderr = assemble_output(iopub) | ||
yield nt.assert_equal(stdout, '') | ||
yield nt.assert_equal(stderr, '') | ||
|
||
yield _check_mp_mode(km, expected=False) | ||
yield _check_mp_mode(km, expected=False, stream="stderr") | ||
|