Skip to content

Commit

Permalink
Merge 7cbc6eb into 2e88e22
Browse files Browse the repository at this point in the history
  • Loading branch information
ptthiem committed May 15, 2014
2 parents 2e88e22 + 7cbc6eb commit 2c0d290
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 5 deletions.
41 changes: 41 additions & 0 deletions docs/plugins/mp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,29 @@ config file::
``processes`` or pass :option:`-N`, the number of processes
defaults to the number of CPUs available.
Should one wish to specify the use of internet sockets for
interprocess communications, specify the ``bind_address``
setting in the ``[multiprocess]`` section of the config file,
for example::

[multiprocess]
bind_address = 127.0.0.1:1024

This will bind to port 1024 of ``127.0.0.1``. Also::

[multiprocess]
bind_address = 127.1.2.3

will bind to any random open port on ``127.1.2.3``. Any internet
address or host-name which python can recognize as such, bind, *and*
connect is acceptable. While ``0.0.0.0`` can be use for listening,
it is not necessarily an address to which the OS can connect. When
the port address is 0 or omitted, a random open port is used. If
the setting is omitted or or blank, then sockets are not used unless
nose is being executed on Windows. In which case, an address on
the loop back interface and a random port are used. Whenever used,
processes employ a random shared key for authentication.

Guidelines for Test Authors
---------------------------

Expand Down Expand Up @@ -228,6 +251,24 @@ Interacting with Users

If you're not doing that, start!

Possible Issues On Windows
--------------------------

On windows, there are a few know bugs with respect to multiprocessing.

First, on python 2.X or old versions of 3.X, if the __main__ module
accessing nose2 is a __main__.py, an assertion in python code module
``multiprocessing.forking`` may fail. The bug for 3.2 is
http://bugs.python.org/issue10845.

Secondly, python on windows does not use fork(). It bootstraps from a
separate interpreter invocation. In certain contexts, the "value" for
a parameter will be taken as a "count" and subprocess use this to build
the flag for the command-line. E.g., If this value is 2 billion
(like a hash seed), subprocess.py may attempt to built a 2gig string,
and possibly throw a MemoryError exception. The related bug is
http://bugs.python.org/issue20954.

Reference
---------

Expand Down
77 changes: 75 additions & 2 deletions nose2/plugins/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import multiprocessing
import select
import unittest
import collections

import os
import sys
import six

import multiprocessing.connection as connection
from nose2 import events, loader, result, runner, session, util

log = logging.getLogger(__name__)
Expand All @@ -19,12 +23,38 @@ def __init__(self):
self.testRunTimeout = self.config.as_float('test-run-timeout', 60.0)
self.procs = self.config.as_int(
'processes', multiprocessing.cpu_count())
self.setAddress(self.config.as_str('bind_address', None))

self.cases = {}

def setProcs(self, num):
self.procs = int(num[0]) # FIXME merge n fix
self.register()

def setAddress(self, address):
if address is None or address.strip() == '':
address = []
else:
address = [x.strip() for x in address.split(':')[:2]]

#Background: On Windows, select.select only works on sockets. So the
#ability to select a bindable address and optionally port for the mp
#plugin was added. Pipes should support a form of select, but this
#would require using pywin32. There are altnernatives but all have
#some kind of downside. An alternative might be creating a connection
#like object using a shared queue for incomings events.
self.bind_host = None
self.bind_port = 0

if sys.platform == "win32" or address:
self.bind_host = '127.116.157.163'
if address and address[0]:
self.bind_host = address[0]

self.bind_port = 0
if len(address) >= 2:
self.bind_port = int(address[1])

def pluginsLoaded(self, event):
self.addMethods('registerInSubprocess', 'startSubprocess',
'stopSubprocess')
Expand Down Expand Up @@ -56,7 +86,7 @@ def _runmp(self, test, result):
try:
remote_events = conn.recv()
except EOFError:
# probably dead
# probably dead/12
log.warning("Subprocess connection closed unexpectedly")
continue # XXX or die?

Expand Down Expand Up @@ -88,16 +118,56 @@ def _runmp(self, test, result):
for proc, _ in procs:
proc.join()

def _prepConns(self):
"""
If the bind_host is not none, return:
(multiprocessing.connection.Listener, (address, port, authkey))
else:
(parent_connection, child_connection)
For the former case: accept must be called on the listener. In order
to get a Connection object for the socket.
"""
if self.bind_host is not None:
#prevent "accidental" wire crossing
authkey = os.urandom(20)
address = (self.bind_host, self.bind_port)
listener = connection.Listener(address, authkey=authkey)
return (listener, listener.address + (authkey,))
else:
return multiprocessing.Pipe()

def _acceptConns(self, parent_conn):
"""
When listener is is a connection.Listener instance: accept the next
incoming connection. However, a timeout mechanism is needed. Since,
this functionality was added to support mp over inet sockets, will
assume a Socket based listen and will accept the private _socket
member to get a low_level socket to do a select on.
"""
if isinstance(parent_conn, connection.Listener):
#ick private interface
rdrs = [parent_conn._listener._socket]
readable, _, _ = select.select(rdrs, [], [],
self.testRunTimeout)
if readable:
return parent_conn.accept()
else:
raise RuntimeError('MP: Socket Connection Failed')
else:
return parent_conn

def _startProcs(self):
# XXX create session export
session_export = self._exportSession()
procs = []
for i in range(0, self.procs):
parent_conn, child_conn = multiprocessing.Pipe()
parent_conn, child_conn = self._prepConns()
proc = multiprocessing.Process(
target=procserver, args=(session_export, child_conn))
proc.daemon = True
proc.start()
parent_conn = self._acceptConns(parent_conn)
procs.append((proc, parent_conn))
return procs

Expand Down Expand Up @@ -200,6 +270,9 @@ def procserver(session_export, conn):
plugin.register()
rlog.debug("Registered %s in subprocess", plugin)

if isinstance(conn, collections.Sequence):
conn = connection.Client(conn[:2], authkey=conn[2])

event = SubprocessEvent(loader_, result_, runner_, ssn.plugins, conn)
res = ssn.hooks.startSubprocess(event)
if event.handled and not res:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[multiprocess]
bind_address = 127.1.2.3
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
def check(_):
pass


def test():
for i in range(0, 600):
yield check, i

50 changes: 48 additions & 2 deletions nose2/tests/functional/test_mp_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from nose2.plugins import buffer
from nose2.plugins.loader import discovery, testcases
from nose2.tests._common import FunctionalTestCase, support_file, Conn
import multiprocessing
import threading
import time
from multiprocessing import connection


class TestMpPlugin(FunctionalTestCase):
Expand Down Expand Up @@ -71,6 +75,39 @@ def test_flatten_respects_class_fixtures(self):
'test_cf_testcase.Test3',
])

def test_conn_prep(self):
self.plugin.bind_host = None
(parent_conn, child_conn) = self.plugin._prepConns()
(parent_pipe, child_pipe) = multiprocessing.Pipe()
self.assertIsInstance(parent_conn, type(parent_pipe))
self.assertIsInstance(child_conn, type(child_pipe))

self.plugin.bind_host = "127.0.0.1"
self.plugin.bind_port = 0
(parent_conn, child_conn) = self.plugin._prepConns()
self.assertIsInstance(parent_conn, connection.Listener)
self.assertIsInstance(child_conn, tuple)
self.assertEqual(parent_conn.address, child_conn[:2])

def test_conn_accept(self):
(parent_conn, child_conn) = multiprocessing.Pipe()
self.assertEqual(self.plugin._acceptConns(parent_conn), parent_conn)

listener = connection.Listener(('127.0.0.1', 0))
with self.assertRaises(RuntimeError):
self.plugin._acceptConns(listener)

def fake_client(address):
client = connection.Client(address)
time.sleep(10)
client.close()

t = threading.Thread(target=fake_client, args=(listener.address,))
t.start()
conn = self.plugin._acceptConns(listener)
self.assertTrue(hasattr(conn, "send"))
self.assertTrue(hasattr(conn, "recv"))


class TestProcserver(FunctionalTestCase):

Expand Down Expand Up @@ -127,8 +164,6 @@ def test_dispatch_tests_receive_events(self):
self.assertEqual(getattr(event, attr), val)


@unittest.skipIf(sys.platform == 'win32',
'unable to use select.select with pipes on Windows')
class MPPluginTestRuns(FunctionalTestCase):

def test_tests_in_package(self):
Expand Down Expand Up @@ -177,3 +212,14 @@ def test_large_number_of_tests_stresstest(self):
self.assertTestRunOutputMatches(proc, stderr='Ran 600 tests')
self.assertEqual(proc.poll(), 0)

def test_socket_stresstest(self):
proc = self.runIn(
'scenario/many_tests_socket',
'-v',
'-c scenario/many_test_socket/nose2.cfg',
'--plugin=nose2.plugins.mp',
'--plugin=nose2.plugins.loader.generators',
'-N=1')
self.assertTestRunOutputMatches(proc, stderr='Ran 600 tests')
self.assertEqual(proc.poll(), 0)

30 changes: 30 additions & 0 deletions nose2/tests/unit/test_mp_plugin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from nose2 import session
from nose2.tests._common import TestCase, Conn
from nose2.plugins import mp
import sys


class TestMPPlugin(TestCase):
Expand Down Expand Up @@ -28,3 +29,32 @@ def test_recording_plugin_interface(self):
rpi.loadTestsFromModule(None)
rpi.loadTestsFromTestCase(None)
self.assertEqual(rpi.flush(), [('setTestOutcome', None)])

def test_address(self):
platform = sys.platform
try:
sys.platform = "linux"
host = "1.2.3.4"
port = 245
self.plugin.setAddress(host)
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
(host, 0))
self.plugin.setAddress("%s:%i" % (host, port))
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
(host, port))
self.plugin.setAddress(None)
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
(None, 0))
sys.platform = "win32"
self.plugin.setAddress(host)
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
(host, 0))
self.plugin.setAddress("%s:%i" % (host, port))
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
(host, port))
self.plugin.setAddress(None)
self.assertEqual((self.plugin.bind_host, self.plugin.bind_port),
("127.116.157.163", 0))
finally:
sys.platform = platform

43 changes: 43 additions & 0 deletions tox-win32.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[tox]
envlist=py26,py27,py32,py33,pypy,docs,self27,cov27

# Default settings for py27, py32, py33 and pypy
[testenv]
deps=-r{toxinidir}/requirements.txt
commands=python -m unittest discover []

[testenv:jython]
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements-py26.txt
commands=unit2 discover []

[testenv:docs]
basepython=python2.7
changedir=docs
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements-docs.txt
commands=sphinx-build -b html -d {envtmpdir}/doctrees . {envtmpdir}/html

[testenv:py26]
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements-py26.txt
commands=python unit_workaround.py discover []

[testenv:py27]
deps=-r{toxinidir}/requirements.txt
commands=python unit_workaround.py discover []

[testenv:self27]
basepython=python2.7
deps=-r{toxinidir}/requirements.txt
setenv=PYTHONPATH={toxinidir}
commands=python unit_workaround.py discover []

[testenv:cov27]
basepython=python2.7
deps=coverage>=3.3
-r{toxinidir}/requirements.txt
commands=coverage erase
coverage run unit_workaround.py discover []
coverage report --include=*nose2* --omit=*nose2/tests*
coverage html -d cover --include=*nose2* --omit=*nose2/tests*
6 changes: 5 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ commands=sphinx-build -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
[testenv:py26]
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements-py26.txt
commands=unit2 discover []
commands=python -m unittest discover []

[testenv:py27]
deps=-r{toxinidir}/requirements.txt
commands=python -m unittest discover []

[testenv:self27]
basepython=python2.7
Expand Down
29 changes: 29 additions & 0 deletions unit_workaround.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Python 2.7 Multiprocessing Unittest workaround.
=====================================================================
Due the manner in which multiprocessing is handled on windows
and the fact that __main__.py are actually called __main__
This workaround bypasses the fact that the calling unittest
script is called __main__
http://bugs.python.org/issue10845
This should be fine for python 3.2+, however, 2.7 and before will
not likely see a fix. This only affects the unittests called by tox.
Also with python 2.6 , the windows balks on unit2 (the sh script) not
being a valid executable.
"""
try:
import unittest2
from unittest2.main import main, TestProgram, USAGE_AS_MAIN
except ImportError:
import unittest
from unittest.main import main, TestProgram, USAGE_AS_MAIN

TestProgram.USAGE = USAGE_AS_MAIN

if __name__ == "__main__":
main(module=None)

0 comments on commit 2c0d290

Please sign in to comment.