Permalink
Browse files

add basic print tests for kernel

tests new mp.Process behavior for PR #2734
  • Loading branch information...
1 parent ddc672f commit 36b4d8c4d1e90febe52e3f7cfacb7c9bc73f4c0d @minrk committed Jan 3, 2013
Showing with 183 additions and 0 deletions.
  1. +183 −0 IPython/zmq/tests/test_kernel.py
@@ -0,0 +1,183 @@
+"""test the IPython Kernel"""
+
+#-------------------------------------------------------------------------------
+# Copyright (C) 2013 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-------------------------------------------------------------------------------
+
+#-------------------------------------------------------------------------------
+# Imports
+#-------------------------------------------------------------------------------
+
+import os
+import shutil
+import tempfile
+
+from Queue import Empty
+from contextlib import contextmanager
+from subprocess import PIPE
+
+import nose.tools as nt
+
+from IPython.zmq.blockingkernelmanager import BlockingKernelManager
+from IPython.zmq.tests.test_message_spec import execute, flush_channels
+from IPython.testing import decorators as dec
+from IPython.utils import path, py3compat
+
+#-------------------------------------------------------------------------------
+# Tests
+#-------------------------------------------------------------------------------
+
+def setup():
+ """setup temporary IPYTHONDIR for tests"""
+ global IPYTHONDIR
+ global save_env
+ global save_get_ipython_dir
+
+ IPYTHONDIR = tempfile.mkdtemp()
+
+ save_env = os.environ.copy()
+ os.environ["IPYTHONDIR"] = IPYTHONDIR
+
+ save_get_ipython_dir = path.get_ipython_dir
+ path.get_ipython_dir = lambda : IPYTHONDIR
+
+
+def teardown():
+ path.get_ipython_dir = save_get_ipython_dir
+ os.environ = save_env
+
+ try:
+ shutil.rmtree(IPYTHONDIR)
+ except (OSError, IOError):
+ # no such file
+ pass
+
+
+@contextmanager
+def new_kernel():
+ """start a kernel in a subprocess, and wait for it to be ready
+
+ Returns
+ -------
+ kernel_manager: connected KernelManager instance
+ """
+ KM = BlockingKernelManager()
+
+ KM.start_kernel(stdout=PIPE, stderr=PIPE)
+ KM.start_channels()
+
+ # wait for kernel to be ready
+ KM.shell_channel.execute("import sys")
+ KM.shell_channel.get_msg(block=True, timeout=5)
+ flush_channels(KM)
+ try:
+ yield KM
+ finally:
+ KM.stop_channels()
+ KM.shutdown_kernel()
+
+
+def assemble_output(iopub):
+ """assemble stdout/err from an execution"""
+ stdout = ''
+ stderr = ''
+ while True:
+ msg = iopub.get_msg(block=True, timeout=1)
+ msg_type = msg['msg_type']
+ content = msg['content']
+ if msg_type == 'status' and content['execution_state'] == 'idle':
+ # idle message signals end of output
+ break
+ elif msg['msg_type'] == 'stream':
+ if content['name'] == 'stdout':
+ stdout = stdout + content['data']
+ elif content['name'] == 'stderr':
+ stderr = stderr + content['data']
+ else:
+ raise KeyError("bad stream: %r" % content['name'])
+ else:
+ # other output, ignored
+ pass
+ return stdout, stderr
+
+
+def _check_mp_mode(km, expected=False, stream="stdout"):
+ execute(km=km, code="import sys")
+ flush_channels(km)
+ msg_id, content = execute(km=km, code="print sys.stdout._check_mp_mode()")
+ stdout, stderr = assemble_output(km.sub_channel)
+ nt.assert_equal(eval(stdout.strip()), expected)
+
+
+@dec.parametric
+def test_simple_print():
+ """simple print statement in kernel"""
+ with new_kernel() as km:
+ iopub = km.sub_channel
+
+ msg_id, content = execute(km=km, code="print ('hi')")
+ stdout, stderr = assemble_output(iopub)
+ yield nt.assert_equal(stdout, 'hi\n')
+ yield nt.assert_equal(stderr, '')
+ yield _check_mp_mode(km, expected=False)
+
+
+@dec.parametric
+def test_subprocess_print():
+ """printing from forked mp.Process"""
+ with new_kernel() as km:
+ iopub = km.sub_channel
+
+ yield _check_mp_mode(km, expected=False)
+ flush_channels(km)
+ np = 5
+ code = '\n'.join([
+ "import multiprocessing as mp",
+ "def f(x):",
+ " print 'hello',x",
+ "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
+ "for p in pool: p.start()",
+ "for p in pool: p.join()"
+ ])
+
+ expected = '\n'.join([
+ "hello %s" % i for i in range(np)
+ ]) + '\n'
+
+ msg_id, content = execute(km=km, code=code)
+ stdout, stderr = assemble_output(iopub)
+ yield nt.assert_equal(stdout.count("hello"), np, stdout)
+ for n in range(np):
+ yield nt.assert_equal(stdout.count(str(n)), 1, stdout)
+ yield nt.assert_equal(stderr, '')
+ yield _check_mp_mode(km, expected=True)
+ yield _check_mp_mode(km, expected=True, stream="stderr")
+
+
+@dec.parametric
+def test_subprocess_noprint():
+ """mp.Process without print doesn't trigger iostream mp_mode"""
+ with new_kernel() as km:
+ iopub = km.sub_channel
+
+ np = 5
+ code = '\n'.join([
+ "import multiprocessing as mp",
+ "def f(x):",
+ " return x",
+ "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
+ "for p in pool: p.start()",
+ "for p in pool: p.join()"
+ ])
+
+ msg_id, content = execute(km=km, code=code)
+ stdout, stderr = assemble_output(iopub)
+ yield nt.assert_equal(stdout, '')
+ yield nt.assert_equal(stderr, '')
+
+ yield _check_mp_mode(km, expected=False)
+ yield _check_mp_mode(km, expected=False, stream="stderr")
+

0 comments on commit 36b4d8c

Please sign in to comment.