From ab9f6a75adc14a31436a60214e165e1915a65fef Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 2 Oct 2009 16:58:57 +0200 Subject: [PATCH] remove py.execnet, substitute py.execnet usages with "execnet" ones. --HG-- branch : trunk --- bin-for-dist/gensetup.py | 2 +- bin-for-dist/test_install.py | 2 +- conftest.py | 9 +- contrib/svn-sync-repo.py | 4 +- contrib/sysinfo.py | 2 +- doc/changelog.txt | 2 + doc/execnet.txt | 264 +----- doc/test/customize.txt | 2 +- doc/test/funcargs.txt | 2 +- example/execnet/popen_read_multiple.py | 4 +- example/execnet/redirect_remote_output.py | 2 +- example/execnet/svn-sync-repo.py | 2 +- example/execnet/sysinfo.py | 2 +- example/funcarg/mysetup2/conftest.py | 2 +- py/__init__.py | 25 +- py/execnet/__init__.py | 1 - py/execnet/gateway.py | 354 -------- py/execnet/gateway_base.py | 757 ------------------ py/execnet/multi.py | 71 -- py/execnet/rsync.py | 201 ----- py/execnet/rsync_remote.py | 92 --- py/execnet/script/__init__.py | 1 - py/execnet/script/loop_socketserver.py | 14 - py/execnet/script/quitserver.py | 16 - py/execnet/script/shell.py | 85 -- py/execnet/script/socketserver.py | 102 --- py/execnet/script/socketserverservice.py | 91 --- py/execnet/script/xx.py | 9 - py/execnet/serializer.py | 272 ------- py/execnet/xspec.py | 79 -- py/path/gateway/channeltest.py | 2 +- py/path/gateway/channeltest2.py | 4 +- py/test/config.py | 3 +- py/test/defaultconftest.py | 2 +- py/test/dist/gwmanage.py | 13 +- py/test/dist/mypickle.py | 2 +- py/test/looponfail/remote.py | 5 +- py/test/plugin/pytest_execnetcleanup.py | 41 - setup.py | 12 +- testing/execnet/__init__.py | 1 - testing/execnet/conftest.py | 46 -- testing/execnet/test_basics.py | 198 ----- testing/execnet/test_gateway.py | 545 ------------- testing/execnet/test_multi.py | 58 -- testing/execnet/test_rsync.py | 148 ---- testing/execnet/test_serializer.py | 179 ----- testing/execnet/test_xspec.py | 151 ---- testing/pytest/dist/test_dsession.py | 3 +- testing/pytest/dist/test_gwmanage.py | 5 +- testing/pytest/dist/test_mypickle.py | 3 +- testing/pytest/dist/test_txnode.py | 5 +- .../plugin/test_pytest_execnetcleanup.py | 12 - testing/pytest/plugin/test_pytest_terminal.py | 5 +- testing/pytest/test_pickling.py | 3 +- 54 files changed, 66 insertions(+), 3851 deletions(-) delete mode 100644 py/execnet/__init__.py delete mode 100644 py/execnet/gateway.py delete mode 100644 py/execnet/gateway_base.py delete mode 100644 py/execnet/multi.py delete mode 100644 py/execnet/rsync.py delete mode 100644 py/execnet/rsync_remote.py delete mode 100644 py/execnet/script/__init__.py delete mode 100644 py/execnet/script/loop_socketserver.py delete mode 100644 py/execnet/script/quitserver.py delete mode 100755 py/execnet/script/shell.py delete mode 100755 py/execnet/script/socketserver.py delete mode 100644 py/execnet/script/socketserverservice.py delete mode 100644 py/execnet/script/xx.py delete mode 100755 py/execnet/serializer.py delete mode 100644 py/execnet/xspec.py delete mode 100644 py/test/plugin/pytest_execnetcleanup.py delete mode 100644 testing/execnet/__init__.py delete mode 100644 testing/execnet/conftest.py delete mode 100644 testing/execnet/test_basics.py delete mode 100644 testing/execnet/test_gateway.py delete mode 100644 testing/execnet/test_multi.py delete mode 100644 testing/execnet/test_rsync.py delete mode 100755 testing/execnet/test_serializer.py delete mode 100644 testing/execnet/test_xspec.py delete mode 100644 testing/pytest/plugin/test_pytest_execnetcleanup.py diff --git a/bin-for-dist/gensetup.py b/bin-for-dist/gensetup.py index 5c6d9b2762b..8e13e064498 100644 --- a/bin-for-dist/gensetup.py +++ b/bin-for-dist/gensetup.py @@ -3,7 +3,7 @@ sys.path.insert(0, sys.argv[1]) import py -toolpath = py.magic.autopath() +toolpath = py.path.local(__file__) binpath = py.path.local(py.__file__).dirpath('bin') def error(msg): diff --git a/bin-for-dist/test_install.py b/bin-for-dist/test_install.py index 3d258792dfb..14ada3c488f 100644 --- a/bin-for-dist/test_install.py +++ b/bin-for-dist/test_install.py @@ -78,7 +78,7 @@ def create(self, sitepackages=False): def makegateway(self): python = self._cmd('python') - return py.execnet.makegateway("popen//python=%s" %(python,)) + return execnet.makegateway("popen//python=%s" %(python,)) def pcall(self, cmd, *args, **kw): self.ensure() diff --git a/conftest.py b/conftest.py index a06dfc85059..c35968219bf 100644 --- a/conftest.py +++ b/conftest.py @@ -17,15 +17,18 @@ def pytest_addoption(parser): def pytest_funcarg__specssh(request): return getspecssh(request.config) -def pytest_funcarg__specsocket(request): - return getsocketspec(request.config) +def getgspecs(config=None): + if config is None: + config = py.test.config + return [execnet.XSpec(spec) + for spec in config.getvalueorskip("gspecs")] # configuration information for tests def getgspecs(config=None): if config is None: config = py.test.config - return [py.execnet.XSpec(spec) + return [execnet.XSpec(spec) for spec in config.getvalueorskip("gspecs")] def getspecssh(config=None): diff --git a/contrib/svn-sync-repo.py b/contrib/svn-sync-repo.py index 18a07888628..1d1fb775d3b 100644 --- a/contrib/svn-sync-repo.py +++ b/contrib/svn-sync-repo.py @@ -3,7 +3,7 @@ """ small utility for hot-syncing a svn repository through ssh. -uses py.execnet. +uses execnet. """ @@ -105,7 +105,7 @@ def get_svn_youngest(repo): return int(rev) def getgateway(host, keyfile=None): - return py.execnet.SshGateway(host, identity=keyfile) + return execnet.SshGateway(host, identity=keyfile) if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/contrib/sysinfo.py b/contrib/sysinfo.py index 0c360c423cb..a4c44dd9829 100644 --- a/contrib/sysinfo.py +++ b/contrib/sysinfo.py @@ -95,7 +95,7 @@ def error(*args): def getinfo(sshname, ssh_config=None, loginfo=sys.stdout): debug("connecting to", sshname) try: - gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config) + gw = execnet.SshGateway(sshname, ssh_config=ssh_config) except IOError: error("could not get sshagteway", sshname) else: diff --git a/doc/changelog.txt b/doc/changelog.txt index ede46b4ff52..cadbb794b38 100644 --- a/doc/changelog.txt +++ b/doc/changelog.txt @@ -1,6 +1,8 @@ Changes between 1.0.x and 'trunk' ===================================== +* remove py.execnet code and substitute all usages with 'execnet' proper + * fix issue50 - cached_setup now caches more to expectations for test functions with multiple arguments. diff --git a/doc/execnet.txt b/doc/execnet.txt index 07c29762cbf..4ca1a30d38c 100644 --- a/doc/execnet.txt +++ b/doc/execnet.txt @@ -2,263 +2,11 @@ py.execnet: *elastic* distributed programming ============================================================================== -``execnet`` helps you to: +Since pylib 1.1 "py.execnet" is separated out of hte lib and now +available through the standalone `execnet standalone package`_. -* ad-hoc instantiate local or remote Python Processes -* send code for execution in one or many processes -* send and receive data between processes through channels - -One of it's unique features is that it uses a **zero-install** -technique: no manual installation steps are required on -remote places, only a basic working Python interpreter -and some input/output connection to it. - -There is a `EuroPython2009 talk`_ from July 2009 with -examples and some pictures. - -.. contents:: - :local: - :depth: 2 - -.. _`EuroPython2009 talk`: http://codespeak.net/download/py/ep2009-execnet.pdf - -Gateways: immediately spawn local or remote process -=================================================== - -In order to send code to a remote place or a subprocess -you need to instantiate a so-called Gateway object. -There are currently three Gateway classes: - -* :api:`py.execnet.PopenGateway` to open a subprocess - on the local machine. Useful for making use - of multiple processors to to contain code execution - in a separated environment. - -* :api:`py.execnet.SshGateway` to connect to - a remote ssh server and distribute execution to it. - -* :api:`py.execnet.SocketGateway` a way to connect to - a remote Socket based server. *Note* that this method - requires a manually started - :source:py/execnet/script/socketserver.py - script. You can run this "server script" without - having the py lib installed on the remote system - and you can setup it up as permanent service. - - -remote_exec: execute source code remotely -=================================================== - -All gateways offer remote code execution via this high level function:: - - def remote_exec(source): - """return channel object for communicating with the asynchronously - executing 'source' code which will have a corresponding 'channel' - object in its executing namespace.""" - -With `remote_exec` you send source code to the other -side and get both a local and a remote Channel_ object, -which you can use to have the local and remote site -communicate data in a structured way. Here is -an example for reading the PID:: - - >>> import py - >>> gw = py.execnet.PopenGateway() - >>> channel = gw.remote_exec(""" - ... import os - ... channel.send(os.getpid()) - ... """) - >>> remote_pid = channel.receive() - >>> remote_pid != py.std.os.getpid() - True - -.. _`Channel`: -.. _`channel-api`: -.. _`exchange data`: - -Channels: bidirectionally exchange data between hosts -======================================================= - -A channel object allows to send and receive data between -two asynchronously running programs. When calling -`remote_exec` you will get a channel object back and -the code fragment running on the other side will -see a channel object in its global namespace. - -Here is the interface of channel objects:: - - # - # API for sending and receiving anonymous values - # - channel.send(item): - sends the given item to the other side of the channel, - possibly blocking if the sender queue is full. - Note that items need to be marshallable (all basic - python types are). - - channel.receive(): - receives an item that was sent from the other side, - possibly blocking if there is none. - Note that exceptions from the other side will be - reraised as gateway.RemoteError exceptions containing - a textual representation of the remote traceback. - - channel.waitclose(timeout=None): - wait until this channel is closed. Note that a closed - channel may still hold items that will be received or - send. Note that exceptions from the other side will be - reraised as gateway.RemoteError exceptions containing - a textual representation of the remote traceback. - - channel.close(): - close this channel on both the local and the remote side. - A remote side blocking on receive() on this channel - will get woken up and see an EOFError exception. - - -.. _xspec: - - -XSpec: string specification for gateway type and configuration -=============================================================== - -``py.execnet`` supports a simple extensible format for -specifying and configuring Gateways for remote execution. -You can use a string specification to instantiate a new gateway, -for example a new SshGateway:: - - gateway = py.execnet.makegateway("ssh=myhost") - -Let's look at some examples for valid specifications. -Specification for an ssh connection to `wyvern`, running on python2.4 in the (newly created) 'mycache' subdirectory:: - - ssh=wyvern//python=python2.4//chdir=mycache - -Specification of a python2.5 subprocess; with a low CPU priority ("nice" level). Current dir will be the current dir of the instantiator (that's true for all 'popen' specifications unless they specify 'chdir'):: - - popen//python=2.5//nice=20 - -Specification of a Python Socket server process that listens on 192.168.1.4:8888; current dir will be the 'pyexecnet-cache' sub directory which is used a default for all remote processes:: - - socket=192.168.1.4:8888 - -More generally, a specification string has this general format:: - - key1=value1//key2=value2//key3=value3 - -If you omit a value, a boolean true value is assumed. Currently -the following key/values are supported: - -* ``popen`` for a PopenGateway -* ``ssh=host`` for a SshGateway -* ``socket=address:port`` for a SocketGateway -* ``python=executable`` for specifying Python executables -* ``chdir=path`` change remote working dir to given relative or absolute path -* ``nice=value`` decrease remote nice level if platforms supports it - - -Examples of py.execnet usage -=============================================================== - -Compare cwd() of Popen Gateways ----------------------------------------- - -A PopenGateway has the same working directory as the instantiatior:: - - >>> import py, os - >>> gw = py.execnet.PopenGateway() - >>> ch = gw.remote_exec("import os; channel.send(os.getcwd())") - >>> res = ch.receive() - >>> assert res == os.getcwd() - >>> gw.exit() - -Synchronously receive results from two sub processes ------------------------------------------------------ - -Use MultiChannels for receiving multiple results from remote code:: - - >>> import py - >>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)") - >>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)") - >>> mch = py.execnet.MultiChannel([ch1, ch2]) - >>> l = mch.receive_each() - >>> assert len(l) == 2 - >>> assert 1 in l - >>> assert 2 in l - -Asynchronously receive results from two sub processes ------------------------------------------------------ - -Use ``MultiChannel.make_receive_queue()`` for asynchronously receiving -multiple results from remote code. This standard Queue provides -``(channel, result)`` tuples which allows to determine where -a result comes from:: - - >>> import py - >>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)") - >>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)") - >>> mch = py.execnet.MultiChannel([ch1, ch2]) - >>> queue = mch.make_receive_queue() - >>> chan1, res1 = queue.get() # you may also specify a timeout - >>> chan2, res2 = queue.get() - >>> res1 + res2 - 3 - >>> assert chan1 in (ch1, ch2) - >>> assert chan2 in (ch1, ch2) - >>> assert chan1 != chan2 - -Receive file contents from remote SSH account ------------------------------------------------------ - -Here is a small program that you can use to retrieve -contents of remote files:: - - import py - # open a gateway to a fresh child process - gw = py.execnet.SshGateway('codespeak.net') - channel = gw.remote_exec(""" - for fn in channel: - f = open(fn, 'rb') - channel.send(f.read()) - f.close() - """) - - for fn in somefilelist: - channel.send(fn) - content = channel.receive() - # process content - - # later you can exit / close down the gateway - gw.exit() - - -Instantiate a socket server in a new subprocess ------------------------------------------------------ - -The following example opens a PopenGateway, i.e. a python -child process, and starts a socket server within that process -and then opens a second gateway to the freshly started -socketserver:: - - import py - - popengw = py.execnet.PopenGateway() - socketgw = py.execnet.SocketGateway.new_remote(popengw, ("127.0.0.1", 0)) - - print socketgw._rinfo() # print some info about the remote environment - - -Sending a module / checking if run through remote_exec --------------------------------------------------------------- - -You can pass a module object to ``remote_exec`` in which case -its source code will be sent. No dependencies will be transferred -so the module must be self-contained or only use modules that are -installed on the "other" side. Module code can detect if it is -running in a remote_exec situation by checking for the special -``__name__`` attribute like this:: - - if __name__ == '__channelexec__': - # ... call module functions ... - +If you have usages of the "py.execnet.*" 1.0 API you can likely +rename all occurences of the string ``py.execnet.`` with the +string ``execnet.``. +.. _`execnet standalone package`: http://codespeak.net/execnet diff --git a/doc/test/customize.txt b/doc/test/customize.txt index 8a1fd5808dc..542a72317fd 100644 --- a/doc/test/customize.txt +++ b/doc/test/customize.txt @@ -364,7 +364,7 @@ remote environment. For this you can implement the newgateway hook: def pytest_gwmanage_newgateway(gateway, platinfo): """ called after a gateway is instantiated. """ -The ``gateway`` object here has a ``spec`` attribute which is an ``py.execnet.XSpec`` +The ``gateway`` object here has a ``spec`` attribute which is an ``execnet.XSpec`` object, which has attributes that map key/values as specified from a ``--txspec`` option. The platinfo object is a dictionary with information about the remote process: diff --git a/doc/test/funcargs.txt b/doc/test/funcargs.txt index 0613a96f857..c6b6873ec75 100644 --- a/doc/test/funcargs.txt +++ b/doc/test/funcargs.txt @@ -165,7 +165,7 @@ and to offer a new mysetup method: host = self.config.option.ssh if host is None: py.test.skip("specify ssh host with --ssh") - return py.execnet.SshGateway(host) + return execnet.SshGateway(host) Now any test function can use the ``mysetup.getsshconnection()`` method like this: diff --git a/example/execnet/popen_read_multiple.py b/example/execnet/popen_read_multiple.py index 0f5e2a5155f..34ffbb65dc8 100644 --- a/example/execnet/popen_read_multiple.py +++ b/example/execnet/popen_read_multiple.py @@ -9,7 +9,7 @@ channels = [] for i in range(NUM_PROCESSES): - gw = py.execnet.PopenGateway() # or use SSH or socket gateways + gw = execnet.PopenGateway() # or use SSH or socket gateways channel = gw.remote_exec(""" import time secs = channel.receive() @@ -19,7 +19,7 @@ channels.append(channel) print "*** instantiated subprocess", gw -mc = py.execnet.MultiChannel(channels) +mc = execnet.MultiChannel(channels) queue = mc.make_receive_queue() print "***", "verifying that timeout on receiving results from blocked subprocesses works" diff --git a/example/execnet/redirect_remote_output.py b/example/execnet/redirect_remote_output.py index 86f303f4328..e36d41462a4 100644 --- a/example/execnet/redirect_remote_output.py +++ b/example/execnet/redirect_remote_output.py @@ -10,7 +10,7 @@ import py -gw = py.execnet.PopenGateway() +gw = execnet.PopenGateway() outchan = gw.remote_exec(""" import sys diff --git a/example/execnet/svn-sync-repo.py b/example/execnet/svn-sync-repo.py index 1d4b5dbf2d2..5e677a5f69b 100644 --- a/example/execnet/svn-sync-repo.py +++ b/example/execnet/svn-sync-repo.py @@ -82,7 +82,7 @@ def get_svn_youngest(repo): return int(rev) def getgateway(host, keyfile=None): - return py.execnet.SshGateway(host, identity=keyfile) + return execnet.SshGateway(host, identity=keyfile) if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/example/execnet/sysinfo.py b/example/execnet/sysinfo.py index 0c360c423cb..a4c44dd9829 100644 --- a/example/execnet/sysinfo.py +++ b/example/execnet/sysinfo.py @@ -95,7 +95,7 @@ def error(*args): def getinfo(sshname, ssh_config=None, loginfo=sys.stdout): debug("connecting to", sshname) try: - gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config) + gw = execnet.SshGateway(sshname, ssh_config=ssh_config) except IOError: error("could not get sshagteway", sshname) else: diff --git a/example/funcarg/mysetup2/conftest.py b/example/funcarg/mysetup2/conftest.py index 8f9580ca238..ccce424500c 100644 --- a/example/funcarg/mysetup2/conftest.py +++ b/example/funcarg/mysetup2/conftest.py @@ -20,5 +20,5 @@ def getsshconnection(self): host = self.config.option.ssh if host is None: py.test.skip("specify ssh host with --ssh") - return py.execnet.SshGateway(host) + return execnet.SshGateway(host) diff --git a/py/__init__.py b/py/__init__.py index f1c403a322a..13a302327e4 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -1,22 +1,20 @@ # -*- coding: utf-8 -*- """ -advanced testing and development support library: +advanced testing and development support library: - `py.test`_: cross-project testing tool with many advanced features -- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes -- `py.path`_: path abstractions over local and subversion files +- `py.path`_: path abstractions over local and subversion files - `py.code`_: dynamic code compile and traceback printing support -Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6. +Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1. For questions please check out http://pylib.org/contact.html .. _`py.test`: http://pylib.org/test.html -.. _`py.execnet`: http://pylib.org/execnet.html .. _`py.path`: http://pylib.org/path.html .. _`py.code`: http://pylib.org/code.html -(c) Holger Krekel and others, 2009 +(c) Holger Krekel and others, 2009 """ from py.initpkg import initpkg trunk = "trunk" @@ -159,21 +157,6 @@ 'builtin.execfile' : ('./builtin/builtin31.py', 'execfile'), 'builtin.callable' : ('./builtin/builtin31.py', 'callable'), - # gateways into remote contexts - 'execnet.__doc__' : ('./execnet/__init__.py', '__doc__'), - 'execnet._HookSpecs' : ('./execnet/gateway_base.py', 'ExecnetAPI'), - 'execnet.SocketGateway' : ('./execnet/gateway.py', 'SocketGateway'), - 'execnet.PopenGateway' : ('./execnet/gateway.py', 'PopenGateway'), - 'execnet.SshGateway' : ('./execnet/gateway.py', 'SshGateway'), - 'execnet.HostNotFound' : ('./execnet/gateway.py', 'HostNotFound'), - 'execnet.XSpec' : ('./execnet/xspec.py', 'XSpec'), - 'execnet.makegateway' : ('./execnet/xspec.py', 'makegateway'), - 'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'), - 'execnet.MultiChannel' : ('./execnet/multi.py', 'MultiChannel'), - - # execnet scripts - 'execnet.RSync' : ('./execnet/rsync.py', 'RSync'), - # input-output helping 'io.__doc__' : ('./io/__init__.py', '__doc__'), 'io.dupfile' : ('./io/capture.py', 'dupfile'), diff --git a/py/execnet/__init__.py b/py/execnet/__init__.py deleted file mode 100644 index 603aa384e37..00000000000 --- a/py/execnet/__init__.py +++ /dev/null @@ -1 +0,0 @@ -""" ad-hoc networking mechanism """ diff --git a/py/execnet/gateway.py b/py/execnet/gateway.py deleted file mode 100644 index 29d9608e338..00000000000 --- a/py/execnet/gateway.py +++ /dev/null @@ -1,354 +0,0 @@ -""" -gateway code for initiating popen, socket and ssh connections. -(c) 2004-2009, Holger Krekel and others -""" - -import sys, os, inspect, socket, atexit, weakref -import py -from py.__.execnet.gateway_base import Message, Popen2IO, SocketIO -from py.__.execnet import gateway_base - -debug = False - -class GatewayCleanup: - def __init__(self): - self._activegateways = weakref.WeakKeyDictionary() - atexit.register(self.cleanup_atexit) - - def register(self, gateway): - assert gateway not in self._activegateways - self._activegateways[gateway] = True - - def unregister(self, gateway): - del self._activegateways[gateway] - - def cleanup_atexit(self): - if debug: - debug.writeslines(["="*20, "cleaning up", "=" * 20]) - debug.flush() - for gw in list(self._activegateways): - gw.exit() - #gw.join() # should work as well - -class ExecnetAPI: - def pyexecnet_gateway_init(self, gateway): - """ signal initialisation of new gateway. """ - def pyexecnet_gateway_exit(self, gateway): - """ signal exitting of gateway. """ - -class InitiatingGateway(gateway_base.BaseGateway): - """ initialize gateways on both sides of a inputoutput object. """ - # XXX put the next two global variables into an Execnet object - # which intiaties gateways and passes in appropriate values. - _cleanup = GatewayCleanup() - hook = ExecnetAPI() - - def __init__(self, io): - self._remote_bootstrap_gateway(io) - super(InitiatingGateway, self).__init__(io=io, _startcount=1) - self._initreceive() - self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry) - self.hook.pyexecnet_gateway_init(gateway=self) - self._cleanup.register(self) - - def __repr__(self): - """ return string representing gateway type and status. """ - if hasattr(self, 'remoteaddress'): - addr = '[%s]' % (self.remoteaddress,) - else: - addr = '' - try: - r = (self._receiverthread.isAlive() and "receiving" or - "not receiving") - s = "sending" # XXX - i = len(self._channelfactory.channels()) - except AttributeError: - r = s = "uninitialized" - i = "no" - return "<%s%s %s/%s (%s active channels)>" %( - self.__class__.__name__, addr, r, s, i) - - def exit(self): - """ Try to stop all exec and IO activity. """ - try: - self._cleanup.unregister(self) - except KeyError: - return # we assume it's already happened - self._stopexec() - self._stopsend() - self.hook.pyexecnet_gateway_exit(gateway=self) - - def _remote_bootstrap_gateway(self, io, extra=''): - """ return Gateway with a asynchronously remotely - initialized counterpart Gateway (which may or may not succeed). - Note that the other sides gateways starts enumerating - its channels with even numbers while the sender - gateway starts with odd numbers. This allows to - uniquely identify channels across both sides. - """ - bootstrap = [extra] - bootstrap += [inspect.getsource(gateway_base)] - bootstrap += [io.server_stmt, - "io.write('1'.encode('ascii'))", - "SlaveGateway(io=io, _startcount=2).serve()", - ] - source = "\n".join(bootstrap) - self._trace("sending gateway bootstrap code") - #open("/tmp/bootstrap.py", 'w').write(source) - repr_source = repr(source) + "\n" - io.write(repr_source.encode('ascii')) - s = io.read(1) - assert s == "1".encode('ascii') - - def _rinfo(self, update=False): - """ return some sys/env information from remote. """ - if update or not hasattr(self, '_cache_rinfo'): - ch = self.remote_exec(rinfo_source) - self._cache_rinfo = RInfo(**ch.receive()) - return self._cache_rinfo - - def remote_exec(self, source): - """ return channel object and connect it to a remote - execution thread where the given 'source' executes - and has the sister 'channel' object in its global - namespace. - """ - source = str(py.code.Source(source)) - channel = self.newchannel() - self._send(Message.CHANNEL_OPEN(channel.id, source)) - return channel - - def remote_init_threads(self, num=None): - """ start up to 'num' threads for subsequent - remote_exec() invocations to allow concurrent - execution. - """ - if hasattr(self, '_remotechannelthread'): - raise IOError("remote threads already running") - from py.__.thread import pool - source = py.code.Source(pool, """ - execpool = WorkerPool(maxthreads=%r) - gw = channel.gateway - while 1: - task = gw._execqueue.get() - if task is None: - gw._stopsend() - execpool.shutdown() - execpool.join() - raise gw._StopExecLoop - execpool.dispatch(gw.executetask, task) - """ % num) - self._remotechannelthread = self.remote_exec(source) - - def _remote_redirect(self, stdout=None, stderr=None): - """ return a handle representing a redirection of a remote - end's stdout to a local file object. with handle.close() - the redirection will be reverted. - """ - # XXX implement a remote_exec_in_globals(...) - # to send ThreadOut implementation over - clist = [] - for name, out in ('stdout', stdout), ('stderr', stderr): - if out: - outchannel = self.newchannel() - outchannel.setcallback(getattr(out, 'write', out)) - channel = self.remote_exec(""" - import sys - outchannel = channel.receive() - ThreadOut(sys, %r).setdefaultwriter(outchannel.send) - """ % name) - channel.send(outchannel) - clist.append(channel) - for c in clist: - c.waitclose() - class Handle: - def close(_): - for name, out in ('stdout', stdout), ('stderr', stderr): - if out: - c = self.remote_exec(""" - import sys - channel.gateway._ThreadOut(sys, %r).resetdefault() - """ % name) - c.waitclose() - return Handle() - - - -class RInfo: - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - def __repr__(self): - info = ", ".join(["%s=%s" % item - for item in self.__dict__.items()]) - return "" % info - -rinfo_source = """ -import sys, os -channel.send(dict( - executable = sys.executable, - version_info = tuple([sys.version_info[i] for i in range(5)]), - platform = sys.platform, - cwd = os.getcwd(), - pid = os.getpid(), -)) -""" - -class PopenCmdGateway(InitiatingGateway): - def __init__(self, args): - from subprocess import Popen, PIPE - self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE) - io = Popen2IO(p.stdin, p.stdout) - super(PopenCmdGateway, self).__init__(io=io) - - def exit(self): - super(PopenCmdGateway, self).exit() - self._popen.poll() - -popen_bootstrapline = "import sys ; exec(eval(sys.stdin.readline()))" -class PopenGateway(PopenCmdGateway): - """ This Gateway provides interaction with a newly started - python subprocess. - """ - def __init__(self, python=None): - """ instantiate a gateway to a subprocess - started with the given 'python' executable. - """ - if not python: - python = sys.executable - args = [str(python), '-c', popen_bootstrapline] - super(PopenGateway, self).__init__(args) - - def _remote_bootstrap_gateway(self, io, extra=''): - # have the subprocess use the same PYTHONPATH and py lib - x = py.path.local(py.__file__).dirpath().dirpath() - ppath = os.environ.get('PYTHONPATH', '') - plist = [str(x)] + ppath.split(':') - s = "\n".join([extra, - "import sys ; sys.path[:0] = %r" % (plist,), - "import os ; os.environ['PYTHONPATH'] = %r" % ppath, - inspect.getsource(stdouterrin_setnull), - "stdouterrin_setnull()", - "" - ]) - super(PopenGateway, self)._remote_bootstrap_gateway(io, s) - -class SocketGateway(InitiatingGateway): - """ This Gateway provides interaction with a remote process - by connecting to a specified socket. On the remote - side you need to manually start a small script - (py/execnet/script/socketserver.py) that accepts - SocketGateway connections. - """ - def __init__(self, host, port): - """ instantiate a gateway to a process accessed - via a host/port specified socket. - """ - self.host = host = str(host) - self.port = port = int(port) - self.remoteaddress = '%s:%d' % (self.host, self.port) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sock.connect((host, port)) - except socket.gaierror: - raise HostNotFound(str(sys.exc_info()[1])) - io = SocketIO(sock) - super(SocketGateway, self).__init__(io=io) - - def new_remote(cls, gateway, hostport=None): - """ return a new (connected) socket gateway, instatiated - indirectly through the given 'gateway'. - """ - if hostport is None: - host, port = ('', 0) # XXX works on all platforms? - else: - host, port = hostport - mydir = py.path.local(__file__).dirpath() - socketserverbootstrap = py.code.Source( - mydir.join('script', 'socketserver.py').read('r'), """ - import socket - sock = bind_and_listen((%r, %r)) - port = sock.getsockname() - channel.send(port) - startserver(sock) - """ % (host, port) - ) - # execute the above socketserverbootstrap on the other side - channel = gateway.remote_exec(socketserverbootstrap) - (realhost, realport) = channel.receive() - #gateway._trace("new_remote received" - # "port=%r, hostname = %r" %(realport, hostname)) - return py.execnet.SocketGateway(host, realport) - new_remote = classmethod(new_remote) - -class HostNotFound(Exception): - pass - -class SshGateway(PopenCmdGateway): - """ This Gateway provides interaction with a remote Python process, - established via the 'ssh' command line binary. - The remote side needs to have a Python interpreter executable. - """ - - def __init__(self, sshaddress, remotepython=None, ssh_config=None): - """ instantiate a remote ssh process with the - given 'sshaddress' and remotepython version. - you may specify an ssh_config file. - """ - self.remoteaddress = sshaddress - if remotepython is None: - remotepython = "python" - args = ['ssh', '-C' ] - if ssh_config is not None: - args.extend(['-F', str(ssh_config)]) - remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline) - args.extend([sshaddress, remotecmd]) - super(SshGateway, self).__init__(args) - - def _remote_bootstrap_gateway(self, io, s=""): - extra = "\n".join([ - str(py.code.Source(stdouterrin_setnull)), - "stdouterrin_setnull()", - s, - ]) - try: - super(SshGateway, self)._remote_bootstrap_gateway(io, extra) - except EOFError: - ret = self._popen.wait() - if ret == 255: - raise HostNotFound(self.remoteaddress) - -def stdouterrin_setnull(): - """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null. - note that this function may run remotely without py lib support. - """ - # complete confusion (this is independent from the sys.stdout - # and sys.stderr redirection that gateway.remote_exec() can do) - # note that we redirect fd 2 on win too, since for some reason that - # blocks there, while it works (sending to stderr if possible else - # ignoring) on *nix - import sys, os - if not hasattr(os, 'dup'): # jython - return - try: - devnull = os.devnull - except AttributeError: - if os.name == 'nt': - devnull = 'NUL' - else: - devnull = '/dev/null' - # stdin - sys.stdin = os.fdopen(os.dup(0), 'r', 1) - fd = os.open(devnull, os.O_RDONLY) - os.dup2(fd, 0) - os.close(fd) - - # stdout - sys.stdout = os.fdopen(os.dup(1), 'w', 1) - fd = os.open(devnull, os.O_WRONLY) - os.dup2(fd, 1) - - # stderr for win32 - if os.name == 'nt': - sys.stderr = os.fdopen(os.dup(2), 'w', 1) - os.dup2(fd, 2) - os.close(fd) diff --git a/py/execnet/gateway_base.py b/py/execnet/gateway_base.py deleted file mode 100644 index fbb9ff8e7ee..00000000000 --- a/py/execnet/gateway_base.py +++ /dev/null @@ -1,757 +0,0 @@ -""" -base execnet gateway code, a quick overview. - -the code of this module is sent to the "other side" -as a means of bootstrapping a Gateway object -capable of receiving and executing code, -and routing data through channels. - -Gateways operate on InputOutput objects offering -a write and a read(n) method. - -Once bootstrapped a higher level protocol -based on Messages is used. Messages are serialized -to and from InputOutput objects. The details of this protocol -are locally defined in this module. There is no need -for standardizing or versioning the protocol. - -After bootstrapping the BaseGateway opens a receiver thread which -accepts encoded messages and triggers actions to interpret them. -Sending of channel data items happens directly through -write operations to InputOutput objects so there is no -separate thread. - -Code execution messages are put into an execqueue from -which they will be taken for execution. gateway.serve() -will take and execute such items, one by one. This means -that by incoming default execution is single-threaded. - -The receiver thread terminates if the remote side sends -a gateway termination message or if the IO-connection drops. -It puts an end symbol into the execqueue so -that serve() can cleanly finish as well. - -(C) 2004-2009 Holger Krekel, Armin Rigo and others -""" -import sys, os, weakref -import threading, traceback, socket, struct -try: - import queue -except ImportError: - import Queue as queue - -if sys.version_info > (3, 0): - exec("""def do_exec(co, loc): - exec(co, loc)""") - unicode = str -else: - exec("""def do_exec(co, loc): - exec co in loc""") - bytes = str - - -def str(*args): - raise EnvironmentError( - "use unicode or bytes, not cross-python ambigous 'str'") - -default_encoding = "UTF-8" -sysex = (KeyboardInterrupt, SystemExit) - -debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w') - - -# ___________________________________________________________________________ -# -# input output classes -# ___________________________________________________________________________ - -class SocketIO: - server_stmt = "io = SocketIO(clientsock)" - - error = (socket.error, EOFError) - def __init__(self, sock): - self.sock = sock - try: - sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY - except socket.error: - e = sys.exc_info()[1] - sys.stderr.write("WARNING: cannot set socketoption") - self.readable = self.writeable = True - - def read(self, numbytes): - "Read exactly 'bytes' bytes from the socket." - buf = bytes() - while len(buf) < numbytes: - t = self.sock.recv(numbytes - len(buf)) - if not t: - raise EOFError - buf += t - return buf - - def write(self, data): - assert isinstance(data, bytes) - self.sock.sendall(data) - - def close_read(self): - if self.readable: - try: - self.sock.shutdown(0) - except socket.error: - pass - self.readable = None - def close_write(self): - if self.writeable: - try: - self.sock.shutdown(1) - except socket.error: - pass - self.writeable = None - -class Popen2IO: - server_stmt = """ -import os, sys, tempfile -io = Popen2IO(sys.stdout, sys.stdin) -sys.stdout = tempfile.TemporaryFile('w') -sys.stdin = tempfile.TemporaryFile('r') -""" - error = (IOError, OSError, EOFError) - - def __init__(self, outfile, infile): - # we need raw byte streams - self.outfile, self.infile = outfile, infile - if sys.platform == "win32": - import msvcrt - msvcrt.setmode(infile.fileno(), os.O_BINARY) - msvcrt.setmode(outfile.fileno(), os.O_BINARY) - self.readable = self.writeable = True - - def read(self, numbytes): - """Read exactly 'numbytes' bytes from the pipe. """ - try: - data = self.infile.buffer.read(numbytes) - except AttributeError: - data = self.infile.read(numbytes) - if len(data) < numbytes: - raise EOFError - return data - - def write(self, data): - """write out all data bytes. """ - assert isinstance(data, bytes) - try: - self.outfile.buffer.write(data) - except AttributeError: - self.outfile.write(data) - self.outfile.flush() - - def close_read(self): - if self.readable: - self.infile.close() - self.readable = None - - def close_write(self): - try: - self.outfile.close() - except EnvironmentError: - pass - self.writeable = None - -# ___________________________________________________________________________ -# -# Messages -# ___________________________________________________________________________ -# the header format -HDR_FORMAT = "!hhii" -HDR_SIZE = struct.calcsize(HDR_FORMAT) - -is3k = sys.version_info >= (3,0) - -class Message: - """ encapsulates Messages and their wire protocol. """ - _types = {} - def __init__(self, channelid=0, data=''): - self.channelid = channelid - self.data = data - - def writeto(self, io): - # XXX marshal.dumps doesn't work for exchanging data across Python - # version :-((( XXX check this statement wrt python2.4 through 3.1 - data = self.data - if isinstance(data, bytes): - dataformat = 1 + int(is3k) - else: - if isinstance(data, unicode): - dataformat = 3 - else: - data = repr(self.data) # argh - dataformat = 4 - data = data.encode(default_encoding) - header = struct.pack(HDR_FORMAT, self.msgtype, dataformat, - self.channelid, len(data)) - io.write(header + data) - - def readfrom(cls, io): - header = io.read(HDR_SIZE) - (msgtype, dataformat, - senderid, stringlen) = struct.unpack(HDR_FORMAT, header) - data = io.read(stringlen) - if dataformat == 1: - if is3k: - # remote was python2-str, we are 3k-text - data = data.decode(default_encoding) - elif dataformat == 2: - # remote was python3-bytes - pass - else: - data = data.decode(default_encoding) - if dataformat == 3: - pass - elif dataformat == 4: - data = eval(data, {}) # reversed argh - else: - raise ValueError("bad data format") - return cls._types[msgtype](senderid, data) - readfrom = classmethod(readfrom) - - def __repr__(self): - r = repr(self.data) - if len(r) > 50: - return "" %(self.__class__.__name__, - self.channelid, len(r)) - else: - return "" %(self.__class__.__name__, - self.channelid, self.data) - -def _setupmessages(): - class CHANNEL_OPEN(Message): - def received(self, gateway): - channel = gateway._channelfactory.new(self.channelid) - gateway._local_schedulexec(channel=channel, sourcetask=self.data) - - class CHANNEL_NEW(Message): - def received(self, gateway): - """ receive a remotely created new (sub)channel. """ - newid = self.data - newchannel = gateway._channelfactory.new(newid) - gateway._channelfactory._local_receive(self.channelid, newchannel) - - class CHANNEL_DATA(Message): - def received(self, gateway): - gateway._channelfactory._local_receive(self.channelid, self.data) - - class CHANNEL_CLOSE(Message): - def received(self, gateway): - gateway._channelfactory._local_close(self.channelid) - - class CHANNEL_CLOSE_ERROR(Message): - def received(self, gateway): - remote_error = gateway._channelfactory.RemoteError(self.data) - gateway._channelfactory._local_close(self.channelid, remote_error) - - class CHANNEL_LAST_MESSAGE(Message): - def received(self, gateway): - gateway._channelfactory._local_close(self.channelid, sendonly=True) - - classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA, - CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE] - - for i, cls in enumerate(classes): - Message._types[i] = cls - cls.msgtype = i - setattr(Message, cls.__name__, cls) - -_setupmessages() - -def geterrortext(excinfo): - try: - l = traceback.format_exception(*excinfo) - errortext = "".join(l) - except sysex: - raise - except: - errortext = '%s: %s' % (excinfo[0].__name__, - excinfo[1]) - return errortext - -class RemoteError(EOFError): - """ Contains an Exceptions from the other side. """ - def __init__(self, formatted): - self.formatted = formatted - EOFError.__init__(self) - - def __str__(self): - return self.formatted - - def __repr__(self): - return "%s: %s" %(self.__class__.__name__, self.formatted) - - def warn(self): - # XXX do this better - sys.stderr.write("Warning: unhandled %r\n" % (self,)) - - -NO_ENDMARKER_WANTED = object() - -class Channel(object): - """Communication channel between two possibly remote threads of code. """ - RemoteError = RemoteError - - def __init__(self, gateway, id): - assert isinstance(id, int) - self.gateway = gateway - self.id = id - self._items = queue.Queue() - self._closed = False - self._receiveclosed = threading.Event() - self._remoteerrors = [] - - def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): - # we first execute the callback on all already received - # items. We need to hold the receivelock to prevent - # race conditions with newly arriving items. - # after having cleared the queue we register - # the callback only if the channel is not closed already. - _callbacks = self.gateway._channelfactory._callbacks - _receivelock = self.gateway._receivelock - _receivelock.acquire() - try: - if self._items is None: - raise IOError("%r has callback already registered" %(self,)) - items = self._items - self._items = None - while 1: - try: - olditem = items.get(block=False) - except queue.Empty: - if not (self._closed or self._receiveclosed.isSet()): - _callbacks[self.id] = (callback, endmarker) - break - else: - if olditem is ENDMARKER: - items.put(olditem) # for other receivers - if endmarker is not NO_ENDMARKER_WANTED: - callback(endmarker) - break - else: - callback(olditem) - finally: - _receivelock.release() - - def __repr__(self): - flag = self.isclosed() and "closed" or "open" - return "" % (self.id, flag) - - def __del__(self): - if self.gateway is None: # can be None in tests - return - self.gateway._trace("Channel(%d).__del__" % self.id) - # no multithreading issues here, because we have the last ref to 'self' - if self._closed: - # state transition "closed" --> "deleted" - for error in self._remoteerrors: - error.warn() - elif self._receiveclosed.isSet(): - # state transition "sendonly" --> "deleted" - # the remote channel is already in "deleted" state, nothing to do - pass - else: - # state transition "opened" --> "deleted" - if self._items is None: # has_callback - Msg = Message.CHANNEL_LAST_MESSAGE - else: - Msg = Message.CHANNEL_CLOSE - self.gateway._send(Msg(self.id)) - - def _getremoteerror(self): - try: - return self._remoteerrors.pop(0) - except IndexError: - return None - - # - # public API for channel objects - # - def isclosed(self): - """ return True if the channel is closed. A closed - channel may still hold items. - """ - return self._closed - - def makefile(self, mode='w', proxyclose=False): - """ return a file-like object. - mode: 'w' for writes, 'r' for reads - proxyclose: if true file.close() will - trigger a channel.close() call. - """ - if mode == "w": - return ChannelFileWrite(channel=self, proxyclose=proxyclose) - elif mode == "r": - return ChannelFileRead(channel=self, proxyclose=proxyclose) - raise ValueError("mode %r not availabe" %(mode,)) - - def close(self, error=None): - """ close down this channel on both sides. """ - if not self._closed: - # state transition "opened/sendonly" --> "closed" - # threads warning: the channel might be closed under our feet, - # but it's never damaging to send too many CHANNEL_CLOSE messages - put = self.gateway._send - if error is not None: - put(Message.CHANNEL_CLOSE_ERROR(self.id, error)) - else: - put(Message.CHANNEL_CLOSE(self.id)) - if isinstance(error, RemoteError): - self._remoteerrors.append(error) - self._closed = True # --> "closed" - self._receiveclosed.set() - queue = self._items - if queue is not None: - queue.put(ENDMARKER) - self.gateway._channelfactory._no_longer_opened(self.id) - - def waitclose(self, timeout=None): - """ wait until this channel is closed (or the remote side - otherwise signalled that no more data was being sent). - The channel may still hold receiveable items, but not receive - more. waitclose() reraises exceptions from executing code on - the other side as channel.RemoteErrors containing a a textual - representation of the remote traceback. - """ - self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state - if not self._receiveclosed.isSet(): - raise IOError("Timeout") - error = self._getremoteerror() - if error: - raise error - - def send(self, item): - """sends the given item to the other side of the channel, - possibly blocking if the sender queue is full. - Note that an item needs to be marshallable. - """ - if self.isclosed(): - raise IOError("cannot send to %r" %(self,)) - if isinstance(item, Channel): - data = Message.CHANNEL_NEW(self.id, item.id) - else: - data = Message.CHANNEL_DATA(self.id, item) - self.gateway._send(data) - - def receive(self): - """receives an item that was sent from the other side, - possibly blocking if there is none. - Note that exceptions from the other side will be - reraised as channel.RemoteError exceptions containing - a textual representation of the remote traceback. - """ - queue = self._items - if queue is None: - raise IOError("calling receive() on channel with receiver callback") - x = queue.get() - if x is ENDMARKER: - queue.put(x) # for other receivers - raise self._getremoteerror() or EOFError() - else: - return x - - def __iter__(self): - return self - - def next(self): - try: - return self.receive() - except EOFError: - raise StopIteration - __next__ = next - -ENDMARKER = object() - -class ChannelFactory(object): - RemoteError = RemoteError - - def __init__(self, gateway, startcount=1): - self._channels = weakref.WeakValueDictionary() - self._callbacks = {} - self._writelock = threading.Lock() - self.gateway = gateway - self.count = startcount - self.finished = False - - def new(self, id=None): - """ create a new Channel with 'id' (or create new id if None). """ - self._writelock.acquire() - try: - if self.finished: - raise IOError("connexion already closed: %s" % (self.gateway,)) - if id is None: - id = self.count - self.count += 2 - channel = Channel(self.gateway, id) - self._channels[id] = channel - return channel - finally: - self._writelock.release() - - def channels(self): - return list(self._channels.values()) - - # - # internal methods, called from the receiver thread - # - def _no_longer_opened(self, id): - try: - del self._channels[id] - except KeyError: - pass - try: - callback, endmarker = self._callbacks.pop(id) - except KeyError: - pass - else: - if endmarker is not NO_ENDMARKER_WANTED: - callback(endmarker) - - def _local_close(self, id, remoteerror=None, sendonly=False): - channel = self._channels.get(id) - if channel is None: - # channel already in "deleted" state - if remoteerror: - remoteerror.warn() - else: - # state transition to "closed" state - if remoteerror: - channel._remoteerrors.append(remoteerror) - if not sendonly: # otherwise #--> "sendonly" - channel._closed = True # --> "closed" - channel._receiveclosed.set() - queue = channel._items - if queue is not None: - queue.put(ENDMARKER) - self._no_longer_opened(id) - - def _local_receive(self, id, data): - # executes in receiver thread - try: - callback, endmarker = self._callbacks[id] - except KeyError: - channel = self._channels.get(id) - queue = channel and channel._items - if queue is None: - pass # drop data - else: - queue.put(data) - else: - callback(data) # even if channel may be already closed - - def _finished_receiving(self): - self._writelock.acquire() - try: - self.finished = True - finally: - self._writelock.release() - for id in list(self._channels): - self._local_close(id, sendonly=True) - for id in list(self._callbacks): - self._no_longer_opened(id) - -class ChannelFile(object): - def __init__(self, channel, proxyclose=True): - self.channel = channel - self._proxyclose = proxyclose - - def close(self): - if self._proxyclose: - self.channel.close() - - def __repr__(self): - state = self.channel.isclosed() and 'closed' or 'open' - return '' %(self.channel.id, state) - -class ChannelFileWrite(ChannelFile): - def write(self, out): - self.channel.send(out) - - def flush(self): - pass - -class ChannelFileRead(ChannelFile): - def __init__(self, channel, proxyclose=True): - super(ChannelFileRead, self).__init__(channel, proxyclose) - self._buffer = "" - - def read(self, n): - while len(self._buffer) < n: - try: - self._buffer += self.channel.receive() - except EOFError: - self.close() - break - ret = self._buffer[:n] - self._buffer = self._buffer[n:] - return ret - - def readline(self): - i = self._buffer.find("\n") - if i != -1: - return self.read(i+1) - line = self.read(len(self._buffer)+1) - while line and line[-1] != "\n": - c = self.read(1) - if not c: - break - line += c - return line - -class BaseGateway(object): - exc_info = sys.exc_info - - class _StopExecLoop(Exception): - pass - - def __init__(self, io, _startcount=2): - """ initialize core gateway, using the given inputoutput object. - """ - self._io = io - self._channelfactory = ChannelFactory(self, _startcount) - self._receivelock = threading.RLock() - - def _initreceive(self): - self._receiverthread = threading.Thread(name="receiver", - target=self._thread_receiver) - self._receiverthread.setDaemon(1) - self._receiverthread.start() - - def _trace(self, msg): - if debug: - try: - debug.write(unicode(msg) + "\n") - debug.flush() - except sysex: - raise - except: - sys.stderr.write("exception during tracing\n") - - def _thread_receiver(self): - """ thread to read and handle Messages half-sync-half-async. """ - self._trace("starting to receive") - try: - while 1: - try: - msg = Message.readfrom(self._io) - self._trace("received <- %r" % msg) - _receivelock = self._receivelock - _receivelock.acquire() - try: - msg.received(self) - finally: - _receivelock.release() - except sysex: - break - except EOFError: - break - except: - self._trace(geterrortext(self.exc_info())) - break - finally: - # XXX we need to signal fatal error states to - # channels/callbacks, particularly ones - # where the other side just died. - self._stopexec() - try: - self._stopsend() - except IOError: - self._trace('IOError on _stopsend()') - self._channelfactory._finished_receiving() - if threading: # might be None during shutdown/finalization - self._trace('leaving %r' % threading.currentThread()) - - def _send(self, msg): - if msg is None: - self._io.close_write() - else: - try: - msg.writeto(self._io) - except: - excinfo = self.exc_info() - self._trace(geterrortext(excinfo)) - else: - self._trace('sent -> %r' % msg) - - def _stopsend(self): - self._send(None) - - def _stopexec(self): - pass - - def _local_schedulexec(self, channel, sourcetask): - channel.close("execution disallowed") - - # _____________________________________________________________________ - # - # High Level Interface - # _____________________________________________________________________ - # - def newchannel(self): - """ return new channel object. """ - return self._channelfactory.new() - - def join(self, joinexec=True): - """ Wait for all IO (and by default all execution activity) - to stop. the joinexec parameter is obsolete. - """ - current = threading.currentThread() - if self._receiverthread.isAlive(): - self._trace("joining receiver thread") - self._receiverthread.join() - -class SlaveGateway(BaseGateway): - def _stopexec(self): - self._execqueue.put(None) - - def _local_schedulexec(self, channel, sourcetask): - self._execqueue.put((channel, sourcetask)) - - def serve(self, joining=True): - self._execqueue = queue.Queue() - self._initreceive() - try: - while 1: - item = self._execqueue.get() - if item is None: - self._stopsend() - break - try: - self.executetask(item) - except self._StopExecLoop: - break - finally: - self._trace("serve") - if joining: - self.join() - - def executetask(self, item): - """ execute channel/source items. """ - channel, source = item - try: - loc = { 'channel' : channel, '__name__': '__channelexec__'} - #open("task.py", 'w').write(source) - self._trace("execution starts: %s" % repr(source)[:50]) - try: - co = compile(source+'\n', '', 'exec') - do_exec(co, loc) - finally: - self._trace("execution finished") - except sysex: - pass - except self._StopExecLoop: - channel.close() - raise - except: - excinfo = self.exc_info() - self._trace("got exception %s" % excinfo[1]) - errortext = geterrortext(excinfo) - channel.close(errortext) - else: - channel.close() - diff --git a/py/execnet/multi.py b/py/execnet/multi.py deleted file mode 100644 index 0c8009136cb..00000000000 --- a/py/execnet/multi.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -Support for working with multiple channels and gateways - -(c) 2008-2009, Holger Krekel and others -""" -import py -try: - import queue -except ImportError: - import Queue as queue - -NO_ENDMARKER_WANTED = object() - -class MultiGateway: - def __init__(self, gateways): - self.gateways = gateways - def remote_exec(self, source): - channels = [] - for gw in self.gateways: - channels.append(gw.remote_exec(source)) - return MultiChannel(channels) - def exit(self): - for gw in self.gateways: - gw.exit() - -class MultiChannel: - def __init__(self, channels): - self._channels = channels - - def send_each(self, item): - for ch in self._channels: - ch.send(item) - - def receive_each(self, withchannel=False): - assert not hasattr(self, '_queue') - l = [] - for ch in self._channels: - obj = ch.receive() - if withchannel: - l.append((ch, obj)) - else: - l.append(obj) - return l - - def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): - try: - return self._queue - except AttributeError: - self._queue = queue.Queue() - for ch in self._channels: - def putreceived(obj, channel=ch): - self._queue.put((channel, obj)) - if endmarker is NO_ENDMARKER_WANTED: - ch.setcallback(putreceived) - else: - ch.setcallback(putreceived, endmarker=endmarker) - return self._queue - - - def waitclose(self): - first = None - for ch in self._channels: - try: - ch.waitclose() - except ch.RemoteError: - if first is None: - first = py.std.sys.exc_info() - if first: - py.builtin._reraise(first[0], first[1], first[2]) - - diff --git a/py/execnet/rsync.py b/py/execnet/rsync.py deleted file mode 100644 index f2fceb2afbf..00000000000 --- a/py/execnet/rsync.py +++ /dev/null @@ -1,201 +0,0 @@ -""" -1:N rsync implemenation on top of execnet. - -(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski -""" -import py, os, stat - -md5 = py.builtin._tryimport('hashlib', 'md5').md5 -Queue = py.builtin._tryimport('queue', 'Queue').Queue - -class RSync(object): - """ This class allows to send a directory structure (recursively) - to one or multiple remote filesystems. - - There is limited support for symlinks, which means that symlinks - pointing to the sourcetree will be send "as is" while external - symlinks will be just copied (regardless of existance of such - a path on remote side). - """ - def __init__(self, sourcedir, callback=None, verbose=True): - self._sourcedir = str(sourcedir) - self._verbose = verbose - assert callback is None or py.builtin.callable(callback) - self._callback = callback - self._channels = {} - self._receivequeue = Queue() - self._links = [] - - def filter(self, path): - return True - - def _end_of_channel(self, channel): - if channel in self._channels: - # too early! we must have got an error - channel.waitclose() - # or else we raise one - raise IOError('connection unexpectedly closed: %s ' % ( - channel.gateway,)) - - def _process_link(self, channel): - for link in self._links: - channel.send(link) - # completion marker, this host is done - channel.send(42) - - def _done(self, channel): - """ Call all callbacks - """ - finishedcallback = self._channels.pop(channel) - if finishedcallback: - finishedcallback() - - def _list_done(self, channel): - # sum up all to send - if self._callback: - s = sum([self._paths[i] for i in self._to_send[channel]]) - self._callback("list", s, channel) - - def _send_item(self, channel, data): - """ Send one item - """ - modified_rel_path, checksum = data - modifiedpath = os.path.join(self._sourcedir, *modified_rel_path) - try: - f = open(modifiedpath, 'rb') - data = f.read() - except IOError: - data = None - - # provide info to progress callback function - modified_rel_path = "/".join(modified_rel_path) - if data is not None: - self._paths[modified_rel_path] = len(data) - else: - self._paths[modified_rel_path] = 0 - if channel not in self._to_send: - self._to_send[channel] = [] - self._to_send[channel].append(modified_rel_path) - #print "sending", modified_rel_path, data and len(data) or 0, checksum - - if data is not None: - f.close() - if checksum is not None and checksum == md5(data).digest(): - data = None # not really modified - else: - self._report_send_file(channel.gateway, modified_rel_path) - channel.send(data) - - def _report_send_file(self, gateway, modified_rel_path): - if self._verbose: - print("%s <= %s" %(gateway, modified_rel_path)) - - def send(self, raises=True): - """ Sends a sourcedir to all added targets. Flag indicates - whether to raise an error or return in case of lack of - targets - """ - if not self._channels: - if raises: - raise IOError("no targets available, maybe you " - "are trying call send() twice?") - return - # normalize a trailing '/' away - self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x')) - # send directory structure and file timestamps/sizes - self._send_directory_structure(self._sourcedir) - - # paths and to_send are only used for doing - # progress-related callbacks - self._paths = {} - self._to_send = {} - - # send modified file to clients - while self._channels: - channel, req = self._receivequeue.get() - if req is None: - self._end_of_channel(channel) - else: - command, data = req - if command == "links": - self._process_link(channel) - elif command == "done": - self._done(channel) - elif command == "ack": - if self._callback: - self._callback("ack", self._paths[data], channel) - elif command == "list_done": - self._list_done(channel) - elif command == "send": - self._send_item(channel, data) - del data - else: - assert "Unknown command %s" % command - - def add_target(self, gateway, destdir, - finishedcallback=None, **options): - """ Adds a remote target specified via a 'gateway' - and a remote destination directory. - """ - assert finishedcallback is None or py.builtin.callable(finishedcallback) - for name in options: - assert name in ('delete',) - def itemcallback(req): - self._receivequeue.put((channel, req)) - channel = gateway.remote_exec(REMOTE_SOURCE) - channel.setcallback(itemcallback, endmarker = None) - channel.send((str(destdir), options)) - self._channels[channel] = finishedcallback - - def _broadcast(self, msg): - for channel in self._channels: - channel.send(msg) - - def _send_link(self, basename, linkpoint): - self._links.append(("link", basename, linkpoint)) - - def _send_directory(self, path): - # dir: send a list of entries - names = [] - subpaths = [] - for name in os.listdir(path): - p = os.path.join(path, name) - if self.filter(p): - names.append(name) - subpaths.append(p) - self._broadcast(names) - for p in subpaths: - self._send_directory_structure(p) - - def _send_link_structure(self, path): - linkpoint = os.readlink(path) - basename = path[len(self._sourcedir) + 1:] - if not linkpoint.startswith(os.sep): - # relative link, just send it - # XXX: do sth with ../ links - self._send_link(basename, linkpoint) - elif linkpoint.startswith(self._sourcedir): - self._send_link(basename, linkpoint[len(self._sourcedir) + 1:]) - else: - self._send_link(basename, linkpoint) - self._broadcast(None) - - def _send_directory_structure(self, path): - try: - st = os.lstat(path) - except OSError: - self._broadcast((0, 0)) - return - if stat.S_ISREG(st.st_mode): - # regular file: send a timestamp/size pair - self._broadcast((st.st_mtime, st.st_size)) - elif stat.S_ISDIR(st.st_mode): - self._send_directory(path) - elif stat.S_ISLNK(st.st_mode): - self._send_link_structure(path) - else: - raise ValueError("cannot sync %r" % (path,)) - -REMOTE_SOURCE = py.path.local(__file__).dirpath().\ - join('rsync_remote.py').open().read() + "\nf()" - diff --git a/py/execnet/rsync_remote.py b/py/execnet/rsync_remote.py deleted file mode 100644 index 945cb820481..00000000000 --- a/py/execnet/rsync_remote.py +++ /dev/null @@ -1,92 +0,0 @@ -def f(): - import os, stat, shutil - try: - from hashlib import md5 - except ImportError: - from md5 import md5 - destdir, options = channel.receive() - modifiedfiles = [] - - def remove(path): - assert path.startswith(destdir) - try: - os.unlink(path) - except OSError: - # assume it's a dir - shutil.rmtree(path) - - def receive_directory_structure(path, relcomponents): - try: - st = os.lstat(path) - except OSError: - st = None - msg = channel.receive() - if isinstance(msg, list): - if st and not stat.S_ISDIR(st.st_mode): - os.unlink(path) - st = None - if not st: - os.makedirs(path) - entrynames = {} - for entryname in msg: - receive_directory_structure(os.path.join(path, entryname), - relcomponents + [entryname]) - entrynames[entryname] = True - if options.get('delete'): - for othername in os.listdir(path): - if othername not in entrynames: - otherpath = os.path.join(path, othername) - remove(otherpath) - elif msg is not None: - checksum = None - if st: - if stat.S_ISREG(st.st_mode): - msg_mtime, msg_size = msg - if msg_size != st.st_size: - pass - elif msg_mtime != st.st_mtime: - f = open(path, 'rb') - checksum = md5(f.read()).digest() - f.close() - else: - return # already fine - else: - remove(path) - channel.send(("send", (relcomponents, checksum))) - modifiedfiles.append((path, msg)) - receive_directory_structure(destdir, []) - - STRICT_CHECK = False # seems most useful this way for py.test - channel.send(("list_done", None)) - - for path, (time, size) in modifiedfiles: - data = channel.receive() - channel.send(("ack", path[len(destdir) + 1:])) - if data is not None: - if STRICT_CHECK and len(data) != size: - raise IOError('file modified during rsync: %r' % (path,)) - f = open(path, 'wb') - f.write(data) - f.close() - try: - os.utime(path, (time, time)) - except OSError: - pass - del data - channel.send(("links", None)) - - msg = channel.receive() - while msg is not 42: - # we get symlink - _type, relpath, linkpoint = msg - assert _type == "link" - path = os.path.join(destdir, relpath) - try: - remove(path) - except OSError: - pass - - os.symlink(os.path.join(destdir, linkpoint), path) - msg = channel.receive() - channel.send(("done", None)) - diff --git a/py/execnet/script/__init__.py b/py/execnet/script/__init__.py deleted file mode 100644 index 792d6005489..00000000000 --- a/py/execnet/script/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/py/execnet/script/loop_socketserver.py b/py/execnet/script/loop_socketserver.py deleted file mode 100644 index 44896b67b25..00000000000 --- a/py/execnet/script/loop_socketserver.py +++ /dev/null @@ -1,14 +0,0 @@ - -import os, sys -import subprocess - -if __name__ == '__main__': - directory = os.path.dirname(os.path.abspath(sys.argv[0])) - script = os.path.join(directory, 'socketserver.py') - while 1: - cmdlist = ["python", script] - cmdlist.extend(sys.argv[1:]) - text = "starting subcommand: " + " ".join(cmdlist) - print(text) - process = subprocess.Popen(cmdlist) - process.wait() diff --git a/py/execnet/script/quitserver.py b/py/execnet/script/quitserver.py deleted file mode 100644 index 5b7ebdb9d58..00000000000 --- a/py/execnet/script/quitserver.py +++ /dev/null @@ -1,16 +0,0 @@ -""" - - send a "quit" signal to a remote server - -""" - -import sys -import socket - -hostport = sys.argv[1] -host, port = hostport.split(':') -hostport = (host, int(port)) - -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -sock.connect(hostport) -sock.sendall('"raise KeyboardInterrupt"\n') diff --git a/py/execnet/script/shell.py b/py/execnet/script/shell.py deleted file mode 100755 index 9196f419f54..00000000000 --- a/py/execnet/script/shell.py +++ /dev/null @@ -1,85 +0,0 @@ -#! /usr/bin/env python -""" -a remote python shell - -for injection into startserver.py -""" -import sys, os, socket, select - -try: - clientsock -except NameError: - print("client side starting") - import sys - host, port = sys.argv[1].split(':') - port = int(port) - myself = open(os.path.abspath(sys.argv[0]), 'rU').read() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - sock.sendall(repr(myself)+'\n') - print("send boot string") - inputlist = [ sock, sys.stdin ] - try: - while 1: - r,w,e = select.select(inputlist, [], []) - if sys.stdin in r: - line = raw_input() - sock.sendall(line + '\n') - if sock in r: - line = sock.recv(4096) - sys.stdout.write(line) - sys.stdout.flush() - except: - import traceback - print(traceback.print_exc()) - - sys.exit(1) - -print("server side starting") -# server side -# -from traceback import print_exc -from threading import Thread - -class promptagent(Thread): - def __init__(self, clientsock): - Thread.__init__(self) - self.clientsock = clientsock - - def run(self): - print("Entering thread prompt loop") - clientfile = self.clientsock.makefile('w') - - filein = self.clientsock.makefile('r') - loc = self.clientsock.getsockname() - - while 1: - try: - clientfile.write('%s %s >>> ' % loc) - clientfile.flush() - line = filein.readline() - if len(line)==0: raise EOFError("nothing") - #print >>sys.stderr,"got line: " + line - if line.strip(): - oldout, olderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = clientfile, clientfile - try: - try: - exec(compile(line + '\n','', 'single')) - except: - print_exc() - finally: - sys.stdout=oldout - sys.stderr=olderr - clientfile.flush() - except EOFError: - e = sys.exc_info()[1] - sys.stderr.write("connection close, prompt thread returns") - break - #print >>sys.stdout, "".join(apply(format_exception,sys.exc_info())) - - self.clientsock.close() - -prompter = promptagent(clientsock) -prompter.start() -print("promptagent - thread started") diff --git a/py/execnet/script/socketserver.py b/py/execnet/script/socketserver.py deleted file mode 100755 index c8e3a2e76bb..00000000000 --- a/py/execnet/script/socketserver.py +++ /dev/null @@ -1,102 +0,0 @@ -#! /usr/bin/env python - -""" - start socket based minimal readline exec server -""" -# this part of the program only executes on the server side -# - -progname = 'socket_readline_exec_server-1.2' - -import sys, socket, os -try: - import fcntl -except ImportError: - fcntl = None - -debug = 0 - -if debug: # and not os.isatty(sys.stdin.fileno()): - f = open('/tmp/execnet-socket-pyout.log', 'w') - old = sys.stdout, sys.stderr - sys.stdout = sys.stderr = f - #import py - #compile = py.code.compile - -def print_(*args): - print(" ".join(str(arg) for arg in args)) - -if sys.version_info > (3, 0): - exec("""def exec_(source, locs): - exec(source, locs)""") -else: - exec("""def exec_(source, locs): - exec source in locs""") - -def exec_from_one_connection(serversock): - print_(progname, 'Entering Accept loop', serversock.getsockname()) - clientsock,address = serversock.accept() - print_(progname, 'got new connection from %s %s' % address) - clientfile = clientsock.makefile('rb') - print_("reading line") - # rstrip so that we can use \r\n for telnet testing - source = clientfile.readline().rstrip() - clientfile.close() - g = {'clientsock' : clientsock, 'address' : address} - source = eval(source) - if source: - co = compile(source+'\n', source, 'exec') - print_(progname, 'compiled source, executing') - try: - exec_(co, g) - finally: - print_(progname, 'finished executing code') - # background thread might hold a reference to this (!?) - #clientsock.close() - -def bind_and_listen(hostport): - if isinstance(hostport, str): - host, port = hostport.split(':') - hostport = (host, int(port)) - serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # set close-on-exec - if hasattr(fcntl, 'FD_CLOEXEC'): - old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD) - fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) - # allow the address to be re-used in a reasonable amount of time - if os.name == 'posix' and sys.platform != 'cygwin': - serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - serversock.bind(hostport) - serversock.listen(5) - return serversock - -def startserver(serversock, loop=False): - try: - while 1: - try: - exec_from_one_connection(serversock) - except (KeyboardInterrupt, SystemExit): - raise - except: - if debug: - import traceback - traceback.print_exc() - else: - excinfo = sys.exc_info() - print_("got exception", excinfo[1]) - if not loop: - break - finally: - print_("leaving socketserver execloop") - serversock.shutdown(2) - -if __name__ == '__main__': - import sys - if len(sys.argv)>1: - hostport = sys.argv[1] - else: - hostport = ':8888' - serversock = bind_and_listen(hostport) - startserver(serversock, loop=False) - diff --git a/py/execnet/script/socketserverservice.py b/py/execnet/script/socketserverservice.py deleted file mode 100644 index e1d6cab7acc..00000000000 --- a/py/execnet/script/socketserverservice.py +++ /dev/null @@ -1,91 +0,0 @@ -""" -A windows service wrapper for the py.execnet socketserver. - -To use, run: - python socketserverservice.py register - net start ExecNetSocketServer -""" - -import sys -import os -import time -import win32serviceutil -import win32service -import win32event -import win32evtlogutil -import servicemanager -import threading -import socketserver - - -appname = 'ExecNetSocketServer' - - -class SocketServerService(win32serviceutil.ServiceFramework): - _svc_name_ = appname - _svc_display_name_ = "%s" % appname - _svc_deps_ = ["EventLog"] - def __init__(self, args): - # The exe-file has messages for the Event Log Viewer. - # Register the exe-file as event source. - # - # Probably it would be better if this is done at installation time, - # so that it also could be removed if the service is uninstalled. - # Unfortunately it cannot be done in the 'if __name__ == "__main__"' - # block below, because the 'frozen' exe-file does not run this code. - # - win32evtlogutil.AddSourceToRegistry(self._svc_display_name_, - servicemanager.__file__, - "Application") - win32serviceutil.ServiceFramework.__init__(self, args) - self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) - self.WAIT_TIME = 1000 # in milliseconds - - - def SvcStop(self): - self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) - win32event.SetEvent(self.hWaitStop) - - - def SvcDoRun(self): - # Redirect stdout and stderr to prevent "IOError: [Errno 9] - # Bad file descriptor". Windows services don't have functional - # output streams. - sys.stdout = sys.stderr = open('nul', 'w') - - # Write a 'started' event to the event log... - win32evtlogutil.ReportEvent(self._svc_display_name_, - servicemanager.PYS_SERVICE_STARTED, - 0, # category - servicemanager.EVENTLOG_INFORMATION_TYPE, - (self._svc_name_, '')) - print("Begin: %s" % (self._svc_display_name_)) - - hostport = ':8888' - print('Starting py.execnet SocketServer on %s' % hostport) - serversock = socketserver.bind_and_listen(hostport) - thread = threading.Thread(target=socketserver.startserver, - args=(serversock,), - kwargs={'loop':True}) - thread.setDaemon(True) - thread.start() - - # wait to be stopped or self.WAIT_TIME to pass - while True: - result = win32event.WaitForSingleObject(self.hWaitStop, - self.WAIT_TIME) - if result == win32event.WAIT_OBJECT_0: - break - - # write a 'stopped' event to the event log. - win32evtlogutil.ReportEvent(self._svc_display_name_, - servicemanager.PYS_SERVICE_STOPPED, - 0, # category - servicemanager.EVENTLOG_INFORMATION_TYPE, - (self._svc_name_, '')) - print("End: %s" % appname) - - -if __name__ == '__main__': - # Note that this code will not be run in the 'frozen' exe-file!!! - win32serviceutil.HandleCommandLine(SocketServerService) diff --git a/py/execnet/script/xx.py b/py/execnet/script/xx.py deleted file mode 100644 index 931e4b7fad1..00000000000 --- a/py/execnet/script/xx.py +++ /dev/null @@ -1,9 +0,0 @@ -import rlcompleter2 -rlcompleter2.setup() - -import register, sys -try: - hostport = sys.argv[1] -except: - hostport = ':8888' -gw = register.ServerGateway(hostport) diff --git a/py/execnet/serializer.py b/py/execnet/serializer.py deleted file mode 100755 index 710597139f5..00000000000 --- a/py/execnet/serializer.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Simple marshal format (based on pickle) designed to work across Python versions. -""" - -import sys -import struct - -_INPY3 = _REALLY_PY3 = sys.version_info > (3, 0) - -class SerializeError(Exception): - pass - -class SerializationError(SerializeError): - """Error while serializing an object.""" - -class UnserializableType(SerializationError): - """Can't serialize a type.""" - -class UnserializationError(SerializeError): - """Error while unserializing an object.""" - -class VersionMismatch(UnserializationError): - """Data from a previous or later format.""" - -class Corruption(UnserializationError): - """The pickle format appears to have been corrupted.""" - -if _INPY3: - def b(s): - return s.encode("ascii") -else: - b = str - -FOUR_BYTE_INT_MAX = 2147483647 - -_int4_format = struct.Struct("!i") -_float_format = struct.Struct("!d") - -# Protocol constants -VERSION_NUMBER = 1 -VERSION = b(chr(VERSION_NUMBER)) -PY2STRING = b('s') -PY3STRING = b('t') -UNICODE = b('u') -BYTES = b('b') -NEWLIST = b('l') -BUILDTUPLE = b('T') -SETITEM = b('m') -NEWDICT = b('d') -INT = b('i') -FLOAT = b('f') -STOP = b('S') - -class CrossVersionOptions(object): - pass - -class Serializer(object): - - def __init__(self, stream): - self.stream = stream - - def save(self, obj): - self.stream.write(VERSION) - self._save(obj) - self.stream.write(STOP) - - def _save(self, obj): - tp = type(obj) - try: - dispatch = self.dispatch[tp] - except KeyError: - raise UnserializableType("can't serialize %s" % (tp,)) - dispatch(self, obj) - - dispatch = {} - - def save_bytes(self, bytes_): - self.stream.write(BYTES) - self._write_byte_sequence(bytes_) - dispatch[bytes] = save_bytes - - if _INPY3: - def save_string(self, s): - self.stream.write(PY3STRING) - self._write_unicode_string(s) - else: - def save_string(self, s): - self.stream.write(PY2STRING) - self._write_byte_sequence(s) - - def save_unicode(self, s): - self.stream.write(UNICODE) - self._write_unicode_string(s) - dispatch[unicode] = save_unicode - dispatch[str] = save_string - - def _write_unicode_string(self, s): - try: - as_bytes = s.encode("utf-8") - except UnicodeEncodeError: - raise SerializationError("strings must be utf-8 encodable") - self._write_byte_sequence(as_bytes) - - def _write_byte_sequence(self, bytes_): - self._write_int4(len(bytes_), "string is too long") - self.stream.write(bytes_) - - def save_int(self, i): - self.stream.write(INT) - self._write_int4(i) - dispatch[int] = save_int - - def save_float(self, flt): - self.stream.write(FLOAT) - self.stream.write(_float_format.pack(flt)) - dispatch[float] = save_float - - def _write_int4(self, i, error="int must be less than %i" % - (FOUR_BYTE_INT_MAX,)): - if i > FOUR_BYTE_INT_MAX: - raise SerializationError(error) - self.stream.write(_int4_format.pack(i)) - - def save_list(self, L): - self.stream.write(NEWLIST) - self._write_int4(len(L), "list is too long") - for i, item in enumerate(L): - self._write_setitem(i, item) - dispatch[list] = save_list - - def _write_setitem(self, key, value): - self._save(key) - self._save(value) - self.stream.write(SETITEM) - - def save_dict(self, d): - self.stream.write(NEWDICT) - for key, value in d.items(): - self._write_setitem(key, value) - dispatch[dict] = save_dict - - def save_tuple(self, tup): - for item in tup: - self._save(item) - self.stream.write(BUILDTUPLE) - self._write_int4(len(tup), "tuple is too long") - dispatch[tuple] = save_tuple - - -class _UnserializationOptions(object): - pass - -class _Py2UnserializationOptions(_UnserializationOptions): - - def __init__(self, py3_strings_as_str=False): - self.py3_strings_as_str = py3_strings_as_str - -class _Py3UnserializationOptions(_UnserializationOptions): - - def __init__(self, py2_strings_as_str=False): - self.py2_strings_as_str = py2_strings_as_str - -if _INPY3: - UnserializationOptions = _Py3UnserializationOptions -else: - UnserializationOptions = _Py2UnserializationOptions - -class _Stop(Exception): - pass - -class Unserializer(object): - - def __init__(self, stream, options=UnserializationOptions()): - self.stream = stream - self.options = options - - def load(self): - self.stack = [] - version = ord(self.stream.read(1)) - if version != VERSION_NUMBER: - raise VersionMismatch("%i != %i" % (version, VERSION_NUMBER)) - try: - while True: - opcode = self.stream.read(1) - if not opcode: - raise EOFError - try: - loader = self.opcodes[opcode] - except KeyError: - raise Corruption("unkown opcode %s" % (opcode,)) - loader(self) - except _Stop: - if len(self.stack) != 1: - raise UnserializationError("internal unserialization error") - return self.stack[0] - else: - raise Corruption("didn't get STOP") - - opcodes = {} - - def load_int(self): - i = self._read_int4() - self.stack.append(i) - opcodes[INT] = load_int - - def load_float(self): - binary = self.stream.read(_float_format.size) - self.stack.append(_float_format.unpack(binary)[0]) - opcodes[FLOAT] = load_float - - def _read_int4(self): - return _int4_format.unpack(self.stream.read(4))[0] - - def _read_byte_string(self): - length = self._read_int4() - as_bytes = self.stream.read(length) - return as_bytes - - def load_py3string(self): - as_bytes = self._read_byte_string() - if not _INPY3 and self.options.py3_strings_as_str: - # XXX Should we try to decode into latin-1? - self.stack.append(as_bytes) - else: - self.stack.append(as_bytes.decode("utf-8")) - opcodes[PY3STRING] = load_py3string - - def load_py2string(self): - as_bytes = self._read_byte_string() - if _INPY3 and self.options.py2_strings_as_str: - s = as_bytes.decode("latin-1") - else: - s = as_bytes - self.stack.append(s) - opcodes[PY2STRING] = load_py2string - - def load_bytes(self): - s = self._read_byte_string() - self.stack.append(s) - opcodes[BYTES] = load_bytes - - def load_unicode(self): - self.stack.append(self._read_byte_string().decode("utf-8")) - opcodes[UNICODE] = load_unicode - - def load_newlist(self): - length = self._read_int4() - self.stack.append([None] * length) - opcodes[NEWLIST] = load_newlist - - def load_setitem(self): - if len(self.stack) < 3: - raise Corruption("not enough items for setitem") - value = self.stack.pop() - key = self.stack.pop() - self.stack[-1][key] = value - opcodes[SETITEM] = load_setitem - - def load_newdict(self): - self.stack.append({}) - opcodes[NEWDICT] = load_newdict - - def load_buildtuple(self): - length = self._read_int4() - tup = tuple(self.stack[-length:]) - del self.stack[-length:] - self.stack.append(tup) - opcodes[BUILDTUPLE] = load_buildtuple - - def load_stop(self): - raise _Stop - opcodes[STOP] = load_stop diff --git a/py/execnet/xspec.py b/py/execnet/xspec.py deleted file mode 100644 index 9c5497df3ea..00000000000 --- a/py/execnet/xspec.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -(c) 2008-2009, holger krekel -""" -import py - -class XSpec: - """ Execution Specification: key1=value1//key2=value2 ... - * keys need to be unique within the specification scope - * neither key nor value are allowed to contain "//" - * keys are not allowed to contain "=" - * keys are not allowed to start with underscore - * if no "=value" is given, assume a boolean True value - """ - # XXX allow customization, for only allow specific key names - popen = ssh = socket = python = chdir = nice = None - - def __init__(self, string): - self._spec = string - for keyvalue in string.split("//"): - i = keyvalue.find("=") - if i == -1: - key, value = keyvalue, True - else: - key, value = keyvalue[:i], keyvalue[i+1:] - if key[0] == "_": - raise AttributeError("%r not a valid XSpec key" % key) - if key in self.__dict__: - raise ValueError("duplicate key: %r in %r" %(key, string)) - setattr(self, key, value) - - def __getattr__(self, name): - if name[0] == "_": - raise AttributeError(name) - return None - - def __repr__(self): - return "" %(self._spec,) - def __str__(self): - return self._spec - - def __hash__(self): - return hash(self._spec) - def __eq__(self, other): - return self._spec == getattr(other, '_spec', None) - def __ne__(self, other): - return self._spec != getattr(other, '_spec', None) - - def _samefilesystem(self): - return bool(self.popen and not self.chdir) - -def makegateway(spec): - if not isinstance(spec, XSpec): - spec = XSpec(spec) - if spec.popen: - gw = py.execnet.PopenGateway(python=spec.python) - elif spec.ssh: - gw = py.execnet.SshGateway(spec.ssh, remotepython=spec.python) - elif spec.socket: - assert not spec.python, "socket: specifying python executables not supported" - hostport = spec.socket.split(":") - gw = py.execnet.SocketGateway(*hostport) - else: - raise ValueError("no gateway type found for %r" % (spec._spec,)) - gw.spec = spec - if spec.chdir or spec.nice: - channel = gw.remote_exec(""" - import os - path, nice = channel.receive() - if path: - if not os.path.exists(path): - os.mkdir(path) - os.chdir(path) - if nice and hasattr(os, 'nice'): - os.nice(nice) - """) - nice = spec.nice and int(spec.nice) or 0 - channel.send((spec.chdir, nice)) - channel.waitclose() - return gw diff --git a/py/path/gateway/channeltest.py b/py/path/gateway/channeltest.py index f6d951d485e..ac821aeb921 100644 --- a/py/path/gateway/channeltest.py +++ b/py/path/gateway/channeltest.py @@ -52,7 +52,7 @@ def serve(self): if __name__ == '__main__': import py - gw = py.execnet.PopenGateway() + gw = execnet.PopenGateway() channel = gw._channelfactory.new() srv = PathServer(channel) c = gw.remote_exec(""" diff --git a/py/path/gateway/channeltest2.py b/py/path/gateway/channeltest2.py index f15ac94d1c4..827abb7d32b 100644 --- a/py/path/gateway/channeltest2.py +++ b/py/path/gateway/channeltest2.py @@ -11,8 +11,8 @@ ''' -#gw = py.execnet.SshGateway('codespeak.net') -gw = py.execnet.PopenGateway() +#gw = execnet.SshGateway('codespeak.net') +gw = execnet.PopenGateway() gw.remote_init_threads(5) c = gw.remote_exec(SRC, stdout=py.std.sys.stdout, stderr=py.std.sys.stderr) subchannel = gw._channelfactory.new() diff --git a/py/test/config.py b/py/test/config.py index 083f61e0dd3..ed2b88cb344 100644 --- a/py/test/config.py +++ b/py/test/config.py @@ -252,7 +252,8 @@ def getxspecs(self): xspeclist.extend([xspec[i+1:]] * num) if not xspeclist: raise self.Error("MISSING test execution (tx) nodes: please specify --tx") - return [py.execnet.XSpec(x) for x in xspeclist] + import execnet + return [execnet.XSpec(x) for x in xspeclist] def getrsyncdirs(self): config = self diff --git a/py/test/defaultconftest.py b/py/test/defaultconftest.py index 4ebeaa4a42b..f837b2c08f1 100644 --- a/py/test/defaultconftest.py +++ b/py/test/defaultconftest.py @@ -10,5 +10,5 @@ Function = py.test.collect.Function Instance = py.test.collect.Instance -pytest_plugins = "default runner capture terminal keyword xfail tmpdir execnetcleanup monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split() +pytest_plugins = "default runner capture terminal keyword xfail tmpdir monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split() diff --git a/py/test/dist/gwmanage.py b/py/test/dist/gwmanage.py index c78b8db1522..00aa44c0bd0 100644 --- a/py/test/dist/gwmanage.py +++ b/py/test/dist/gwmanage.py @@ -4,7 +4,8 @@ import py import sys, os -from py.__.execnet.gateway_base import RemoteError +import execnet +from execnet.gateway_base import RemoteError class GatewayManager: RemoteError = RemoteError @@ -13,8 +14,8 @@ def __init__(self, specs, hook, defaultchdir="pyexecnetcache"): self.specs = [] self.hook = hook for spec in specs: - if not isinstance(spec, py.execnet.XSpec): - spec = py.execnet.XSpec(spec) + if not isinstance(spec, execnet.XSpec): + spec = execnet.XSpec(spec) if not spec.chdir and not spec.popen: spec.chdir = defaultchdir self.specs.append(spec) @@ -22,7 +23,7 @@ def __init__(self, specs, hook, defaultchdir="pyexecnetcache"): def makegateways(self): assert not self.gateways for spec in self.specs: - gw = py.execnet.makegateway(spec) + gw = execnet.makegateway(spec) self.gateways.append(gw) gw.id = "[%s]" % len(self.gateways) self.hook.pytest_gwmanage_newgateway( @@ -39,7 +40,7 @@ def getgateways(self, remote=True, inplacelocal=True): else: if remote: l.append(gw) - return py.execnet.MultiGateway(gateways=l) + return execnet.MultiGateway(gateways=l) def multi_exec(self, source, inplacelocal=True): """ remote execute code on all gateways. @@ -87,7 +88,7 @@ def exit(self): gw = self.gateways.pop() gw.exit() -class HostRSync(py.execnet.RSync): +class HostRSync(execnet.RSync): """ RSyncer that filters out common files """ def __init__(self, sourcedir, *args, **kwargs): diff --git a/py/test/dist/mypickle.py b/py/test/dist/mypickle.py index 38688319673..d1aa14842a5 100644 --- a/py/test/dist/mypickle.py +++ b/py/test/dist/mypickle.py @@ -13,7 +13,7 @@ """ import py -from py.__.execnet.gateway_base import Channel +from execnet.gateway_base import Channel import sys, os, struct #debug = open("log-mypickle-%d" % os.getpid(), 'w') diff --git a/py/test/looponfail/remote.py b/py/test/looponfail/remote.py index 7967652a79d..dee4452816d 100644 --- a/py/test/looponfail/remote.py +++ b/py/test/looponfail/remote.py @@ -7,10 +7,9 @@ otherwise changes to source code can crash the controlling process which should never happen. """ - -from __future__ import generators import py import sys +import execnet from py.__.test.session import Session from py.__.test.dist.mypickle import PickleChannel from py.__.test.looponfail import util @@ -55,7 +54,7 @@ def trace(self, *args): py.builtin.print_("RemoteControl:", msg) def initgateway(self): - return py.execnet.PopenGateway() + return execnet.PopenGateway() def setup(self, out=None): if out is None: diff --git a/py/test/plugin/pytest_execnetcleanup.py b/py/test/plugin/pytest_execnetcleanup.py deleted file mode 100644 index a10bf378652..00000000000 --- a/py/test/plugin/pytest_execnetcleanup.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -cleanup execnet gateways during test function runs. -""" -import py - -pytest_plugins = "xfail" - -def pytest_configure(config): - config.pluginmanager.register(Execnetcleanup()) - -class Execnetcleanup: - _gateways = None - def __init__(self, debug=False): - self._debug = debug - - def pyexecnet_gateway_init(self, gateway): - if self._gateways is not None: - self._gateways.append(gateway) - - def pyexecnet_gateway_exit(self, gateway): - if self._gateways is not None: - self._gateways.remove(gateway) - - def pytest_sessionstart(self, session): - self._gateways = [] - - def pytest_sessionfinish(self, session, exitstatus): - l = [] - for gw in self._gateways: - gw.exit() - l.append(gw) - #for gw in l: - # gw.join() - - def pytest_pyfunc_call(self, __multicall__, pyfuncitem): - if self._gateways is not None: - gateways = self._gateways[:] - res = __multicall__.execute() - while len(self._gateways) > len(gateways): - self._gateways[-1].exit() - return res diff --git a/setup.py b/setup.py index 8d541b084f0..d6c9596e7b6 100644 --- a/setup.py +++ b/setup.py @@ -8,22 +8,20 @@ long_description = """ -advanced testing and development support library: +advanced testing and development support library: - `py.test`_: cross-project testing tool with many advanced features -- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes -- `py.path`_: path abstractions over local and subversion files +- `py.path`_: path abstractions over local and subversion files - `py.code`_: dynamic code compile and traceback printing support -Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6. +Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1. For questions please check out http://pylib.org/contact.html .. _`py.test`: http://pylib.org/test.html -.. _`py.execnet`: http://pylib.org/execnet.html .. _`py.path`: http://pylib.org/path.html .. _`py.code`: http://pylib.org/code.html -(c) Holger Krekel and others, 2009 +(c) Holger Krekel and others, 2009 """ trunk = 'trunk' @@ -63,8 +61,6 @@ def main(): 'py.cmdline', 'py.code', 'py.compat', - 'py.execnet', - 'py.execnet.script', 'py.io', 'py.log', 'py.path', diff --git a/testing/execnet/__init__.py b/testing/execnet/__init__.py deleted file mode 100644 index 792d6005489..00000000000 --- a/testing/execnet/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/testing/execnet/conftest.py b/testing/execnet/conftest.py deleted file mode 100644 index 94c89503eba..00000000000 --- a/testing/execnet/conftest.py +++ /dev/null @@ -1,46 +0,0 @@ -import py - -def pytest_generate_tests(metafunc): - if 'gw' in metafunc.funcargnames: - if hasattr(metafunc.cls, 'gwtype'): - gwtypes = [metafunc.cls.gwtype] - else: - gwtypes = ['popen', 'socket', 'ssh'] - for gwtype in gwtypes: - metafunc.addcall(id=gwtype, param=gwtype) - -def pytest_funcarg__gw(request): - scope = "session" - if request.param == "popen": - return request.cached_setup( - setup=py.execnet.PopenGateway, - teardown=lambda gw: gw.exit(), - extrakey=request.param, - scope=scope) - elif request.param == "socket": - return request.cached_setup( - setup=setup_socket_gateway, - teardown=teardown_socket_gateway, - extrakey=request.param, - scope=scope) - elif request.param == "ssh": - return request.cached_setup( - setup=lambda: setup_ssh_gateway(request), - teardown=lambda gw: gw.exit(), - extrakey=request.param, - scope=scope) - -def setup_socket_gateway(): - proxygw = py.execnet.PopenGateway() - gw = py.execnet.SocketGateway.new_remote(proxygw, ("127.0.0.1", 0)) - gw.proxygw = proxygw - return gw - -def teardown_socket_gateway(gw): - gw.exit() - gw.proxygw.exit() - -def setup_ssh_gateway(request): - sshhost = request.getfuncargvalue('specssh').ssh - gw = py.execnet.SshGateway(sshhost) - return gw diff --git a/testing/execnet/test_basics.py b/testing/execnet/test_basics.py deleted file mode 100644 index edba7cbddff..00000000000 --- a/testing/execnet/test_basics.py +++ /dev/null @@ -1,198 +0,0 @@ - -import py -import sys, os, subprocess, inspect -from py.__.execnet import gateway_base, gateway -from py.__.execnet.gateway_base import Message, Channel, ChannelFactory - -def test_subprocess_interaction(anypython): - line = gateway.popen_bootstrapline - compile(line, 'xyz', 'exec') - args = [str(anypython), '-c', line] - popen = subprocess.Popen(args, bufsize=0, stderr=subprocess.STDOUT, - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - def send(line): - popen.stdin.write(line.encode('ascii')) - if sys.version_info > (3,0): # 3k still buffers - popen.stdin.flush() - def receive(): - return popen.stdout.readline().decode('ascii') - - try: - source = py.code.Source(read_write_loop, "read_write_loop()") - repr_source = repr(str(source)) + "\n" - sendline = repr_source - send(sendline) - s = receive() - assert s == "ok\n" - send("hello\n") - s = receive() - assert s == "received: hello\n" - send("world\n") - s = receive() - assert s == "received: world\n" - finally: - popen.stdin.close() - popen.stdout.close() - popen.wait() - -def read_write_loop(): - import os, sys - sys.stdout.write("ok\n") - sys.stdout.flush() - while 1: - try: - line = sys.stdin.readline() - sys.stdout.write("received: %s" % line) - sys.stdout.flush() - except (IOError, EOFError): - break - -def pytest_generate_tests(metafunc): - if 'anypython' in metafunc.funcargnames: - for name in 'python3.1', 'python2.4', 'python2.5', 'python2.6': - metafunc.addcall(id=name, param=name) - -def pytest_funcarg__anypython(request): - name = request.param - executable = py.path.local.sysfind(name) - if executable is None: - py.test.skip("no %s found" % (name,)) - return executable - -def test_io_message(anypython, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - try: - from io import BytesIO - except ImportError: - from StringIO import StringIO as BytesIO - import tempfile - temp_out = BytesIO() - temp_in = BytesIO() - io = Popen2IO(temp_out, temp_in) - for i, msg_cls in Message._types.items(): - print ("checking %s %s" %(i, msg_cls)) - for data in "hello", "hello".encode('ascii'): - msg1 = msg_cls(i, data) - msg1.writeto(io) - x = io.outfile.getvalue() - io.outfile.truncate(0) - io.outfile.seek(0) - io.infile.seek(0) - io.infile.write(x) - io.infile.seek(0) - msg2 = Message.readfrom(io) - assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data - print ("all passed") - """)) - #out = py.process.cmdexec("%s %s" %(executable,check)) - out = anypython.sysexec(check) - print (out) - assert "all passed" in out - -def test_popen_io(anypython, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - do_exec(Popen2IO.server_stmt, globals()) - io.write("hello".encode('ascii')) - s = io.read(1) - assert s == "x".encode('ascii') - """)) - from subprocess import Popen, PIPE - args = [str(anypython), str(check)] - proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) - proc.stdin.write("x".encode('ascii')) - stdout, stderr = proc.communicate() - print (stderr) - ret = proc.wait() - assert "hello".encode('ascii') in stdout - - -def test_rinfo_source(anypython, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(""" - class Channel: - def send(self, data): - assert eval(repr(data), {}) == data - channel = Channel() - """, gateway.rinfo_source, """ - print ('all passed') - """)) - out = anypython.sysexec(check) - print (out) - assert "all passed" in out - -def test_geterrortext(anypython, tmpdir): - check = tmpdir.join("check.py") - check.write(py.code.Source(gateway_base, """ - class Arg: - pass - errortext = geterrortext((Arg, "1", 4)) - assert "Arg" in errortext - import sys - try: - raise ValueError("17") - except ValueError: - excinfo = sys.exc_info() - s = geterrortext(excinfo) - assert "17" in s - print ("all passed") - """)) - out = anypython.sysexec(check) - print (out) - assert "all passed" in out - -def test_stdouterrin_setnull(): - cap = py.io.StdCaptureFD() - from py.__.execnet.gateway import stdouterrin_setnull - stdouterrin_setnull() - import os - os.write(1, "hello".encode('ascii')) - if os.name == "nt": - os.write(2, "world") - os.read(0, 1) - out, err = cap.reset() - assert not out - assert not err - - -class TestMessage: - def test_wire_protocol(self): - for cls in Message._types.values(): - one = py.io.BytesIO() - data = '23'.encode('ascii') - cls(42, data).writeto(one) - two = py.io.BytesIO(one.getvalue()) - msg = Message.readfrom(two) - assert isinstance(msg, cls) - assert msg.channelid == 42 - assert msg.data == data - assert isinstance(repr(msg), str) - # == "" %(msg.__class__.__name__, ) - -class TestPureChannel: - def setup_method(self, method): - self.fac = ChannelFactory(None) - - def test_factory_create(self): - chan1 = self.fac.new() - assert chan1.id == 1 - chan2 = self.fac.new() - assert chan2.id == 3 - - def test_factory_getitem(self): - chan1 = self.fac.new() - assert self.fac._channels[chan1.id] == chan1 - chan2 = self.fac.new() - assert self.fac._channels[chan2.id] == chan2 - - def test_channel_timeouterror(self): - channel = self.fac.new() - py.test.raises(IOError, channel.waitclose, timeout=0.01) - - def test_channel_makefile_incompatmode(self): - channel = self.fac.new() - py.test.raises(ValueError, 'channel.makefile("rw")') - - diff --git a/testing/execnet/test_gateway.py b/testing/execnet/test_gateway.py deleted file mode 100644 index 90ad2992663..00000000000 --- a/testing/execnet/test_gateway.py +++ /dev/null @@ -1,545 +0,0 @@ -""" -mostly functional tests of gateways. -""" -import os, sys, time -import py -from py.__.execnet import gateway_base, gateway -queue = py.builtin._tryimport('queue', 'Queue') - -TESTTIMEOUT = 10.0 # seconds - -class TestBasicRemoteExecution: - def test_correct_setup(self, gw): - assert gw._receiverthread.isAlive() - - def test_repr_doesnt_crash(self, gw): - assert isinstance(repr(gw), str) - - def test_attribute__name__(self, gw): - channel = gw.remote_exec("channel.send(__name__)") - name = channel.receive() - assert name == "__channelexec__" - - def test_correct_setup_no_py(self, gw): - channel = gw.remote_exec(""" - import sys - channel.send(list(sys.modules)) - """) - remotemodules = channel.receive() - assert 'py' not in remotemodules, ( - "py should not be imported on remote side") - - def test_remote_exec_waitclose(self, gw): - channel = gw.remote_exec('pass') - channel.waitclose(TESTTIMEOUT) - - def test_remote_exec_waitclose_2(self, gw): - channel = gw.remote_exec('def gccycle(): pass') - channel.waitclose(TESTTIMEOUT) - - def test_remote_exec_waitclose_noarg(self, gw): - channel = gw.remote_exec('pass') - channel.waitclose() - - def test_remote_exec_error_after_close(self, gw): - channel = gw.remote_exec('pass') - channel.waitclose(TESTTIMEOUT) - py.test.raises(IOError, channel.send, 0) - - def test_remote_exec_channel_anonymous(self, gw): - channel = gw.remote_exec(''' - obj = channel.receive() - channel.send(obj) - ''') - channel.send(42) - result = channel.receive() - assert result == 42 - -class TestChannelBasicBehaviour: - def test_channel_close_and_then_receive_error(self, gw): - channel = gw.remote_exec('raise ValueError') - py.test.raises(channel.RemoteError, channel.receive) - - def test_channel_finish_and_then_EOFError(self, gw): - channel = gw.remote_exec('channel.send(42)') - x = channel.receive() - assert x == 42 - py.test.raises(EOFError, channel.receive) - py.test.raises(EOFError, channel.receive) - py.test.raises(EOFError, channel.receive) - - def test_channel_close_and_then_receive_error_multiple(self, gw): - channel = gw.remote_exec('channel.send(42) ; raise ValueError') - x = channel.receive() - assert x == 42 - py.test.raises(channel.RemoteError, channel.receive) - - def test_channel__local_close(self, gw): - channel = gw._channelfactory.new() - gw._channelfactory._local_close(channel.id) - channel.waitclose(0.1) - - def test_channel__local_close_error(self, gw): - channel = gw._channelfactory.new() - gw._channelfactory._local_close(channel.id, - channel.RemoteError("error")) - py.test.raises(channel.RemoteError, channel.waitclose, 0.01) - - def test_channel_error_reporting(self, gw): - channel = gw.remote_exec('def foo():\n return foobar()\nfoo()\n') - try: - channel.receive() - except channel.RemoteError: - e = sys.exc_info()[1] - assert str(e).startswith('Traceback (most recent call last):') - assert str(e).find('NameError: global name \'foobar\' ' - 'is not defined') > -1 - else: - py.test.fail('No exception raised') - - def test_channel_syntax_error(self, gw): - # missing colon - channel = gw.remote_exec('def foo()\n return 1\nfoo()\n') - try: - channel.receive() - except channel.RemoteError: - e = sys.exc_info()[1] - assert str(e).startswith('Traceback (most recent call last):') - assert str(e).find('SyntaxError') > -1 - - def test_channel_iter(self, gw): - channel = gw.remote_exec(""" - for x in range(3): - channel.send(x) - """) - l = list(channel) - assert l == [0, 1, 2] - - def test_channel_passing_over_channel(self, gw): - channel = gw.remote_exec(''' - c = channel.gateway.newchannel() - channel.send(c) - c.send(42) - ''') - c = channel.receive() - x = c.receive() - assert x == 42 - - # check that the both sides previous channels are really gone - channel.waitclose(TESTTIMEOUT) - #assert c.id not in gw._channelfactory - newchan = gw.remote_exec(''' - assert %d not in channel.gateway._channelfactory._channels - ''' % (channel.id)) - newchan.waitclose(TESTTIMEOUT) - assert channel.id not in gw._channelfactory._channels - - def test_channel_receiver_callback(self, gw): - l = [] - #channel = gw.newchannel(receiver=l.append) - channel = gw.remote_exec(source=''' - channel.send(42) - channel.send(13) - channel.send(channel.gateway.newchannel()) - ''') - channel.setcallback(callback=l.append) - py.test.raises(IOError, channel.receive) - channel.waitclose(TESTTIMEOUT) - assert len(l) == 3 - assert l[:2] == [42,13] - assert isinstance(l[2], channel.__class__) - - def test_channel_callback_after_receive(self, gw): - l = [] - channel = gw.remote_exec(source=''' - channel.send(42) - channel.send(13) - channel.send(channel.gateway.newchannel()) - ''') - x = channel.receive() - assert x == 42 - channel.setcallback(callback=l.append) - py.test.raises(IOError, channel.receive) - channel.waitclose(TESTTIMEOUT) - assert len(l) == 2 - assert l[0] == 13 - assert isinstance(l[1], channel.__class__) - - def test_waiting_for_callbacks(self, gw): - l = [] - def callback(msg): - import time; time.sleep(0.2) - l.append(msg) - channel = gw.remote_exec(source=''' - channel.send(42) - ''') - channel.setcallback(callback) - channel.waitclose(TESTTIMEOUT) - assert l == [42] - - def test_channel_callback_stays_active(self, gw): - self.check_channel_callback_stays_active(gw, earlyfree=True) - - def check_channel_callback_stays_active(self, gw, earlyfree=True): - # with 'earlyfree==True', this tests the "sendonly" channel state. - l = [] - channel = gw.remote_exec(source=''' - try: - import thread - except ImportError: - import _thread as thread - import time - def producer(subchannel): - for i in range(5): - time.sleep(0.15) - subchannel.send(i*100) - channel2 = channel.receive() - thread.start_new_thread(producer, (channel2,)) - del channel2 - ''') - subchannel = gw.newchannel() - subchannel.setcallback(l.append) - channel.send(subchannel) - if earlyfree: - subchannel = None - counter = 100 - while len(l) < 5: - if subchannel and subchannel.isclosed(): - break - counter -= 1 - print(counter) - if not counter: - py.test.fail("timed out waiting for the answer[%d]" % len(l)) - time.sleep(0.04) # busy-wait - assert l == [0, 100, 200, 300, 400] - return subchannel - - def test_channel_callback_remote_freed(self, gw): - channel = self.check_channel_callback_stays_active(gw, earlyfree=False) - # freed automatically at the end of producer() - channel.waitclose(TESTTIMEOUT) - - def test_channel_endmarker_callback(self, gw): - l = [] - channel = gw.remote_exec(source=''' - channel.send(42) - channel.send(13) - channel.send(channel.gateway.newchannel()) - ''') - channel.setcallback(l.append, 999) - py.test.raises(IOError, channel.receive) - channel.waitclose(TESTTIMEOUT) - assert len(l) == 4 - assert l[:2] == [42,13] - assert isinstance(l[2], channel.__class__) - assert l[3] == 999 - - def test_channel_endmarker_callback_error(self, gw): - q = queue.Queue() - channel = gw.remote_exec(source=''' - raise ValueError() - ''') - channel.setcallback(q.put, endmarker=999) - val = q.get(TESTTIMEOUT) - assert val == 999 - err = channel._getremoteerror() - assert err - assert str(err).find("ValueError") != -1 - - @py.test.mark.xfail - def test_remote_redirect_stdout(self, gw): - out = py.io.TextIO() - handle = gw._remote_redirect(stdout=out) - c = gw.remote_exec("print 42") - c.waitclose(TESTTIMEOUT) - handle.close() - s = out.getvalue() - assert s.strip() == "42" - - @py.test.mark.xfail - def test_remote_exec_redirect_multi(self, gw): - num = 3 - l = [[] for x in range(num)] - channels = [gw.remote_exec("print %d" % i, - stdout=l[i].append) - for i in range(num)] - for x in channels: - x.waitclose(TESTTIMEOUT) - - for i in range(num): - subl = l[i] - assert subl - s = subl[0] - assert s.strip() == str(i) - -class TestChannelFile: - def test_channel_file_write(self, gw): - channel = gw.remote_exec(""" - f = channel.makefile() - f.write("hello world\\n") - f.close() - channel.send(42) - """) - first = channel.receive() - assert first.strip() == 'hello world' - second = channel.receive() - assert second == 42 - - def test_channel_file_write_error(self, gw): - channel = gw.remote_exec("pass") - f = channel.makefile() - channel.waitclose(TESTTIMEOUT) - py.test.raises(IOError, f.write, 'hello') - - def test_channel_file_proxyclose(self, gw): - channel = gw.remote_exec(""" - f = channel.makefile(proxyclose=True) - f.write("hello world") - f.close() - channel.send(42) - """) - first = channel.receive() - assert first.strip() == 'hello world' - py.test.raises(EOFError, channel.receive) - - def test_channel_file_read(self, gw): - channel = gw.remote_exec(""" - f = channel.makefile(mode='r') - s = f.read(2) - channel.send(s) - s = f.read(5) - channel.send(s) - """) - channel.send("xyabcde") - s1 = channel.receive() - s2 = channel.receive() - assert s1 == "xy" - assert s2 == "abcde" - - def test_channel_file_read_empty(self, gw): - channel = gw.remote_exec("pass") - f = channel.makefile(mode="r") - s = f.read(3) - assert s == "" - s = f.read(5) - assert s == "" - - def test_channel_file_readline_remote(self, gw): - channel = gw.remote_exec(""" - channel.send('123\\n45') - """) - channel.waitclose(TESTTIMEOUT) - f = channel.makefile(mode="r") - s = f.readline() - assert s == "123\n" - s = f.readline() - assert s == "45" - - def test_channel_makefile_incompatmode(self, gw): - channel = gw.newchannel() - py.test.raises(ValueError, 'channel.makefile("rw")') - - def test_confusion_from_os_write_stdout(self, gw): - channel = gw.remote_exec(""" - import os - os.write(1, 'confusion!'.encode('ascii')) - channel.send(channel.receive() * 6) - channel.send(channel.receive() * 6) - """) - channel.send(3) - res = channel.receive() - assert res == 18 - channel.send(7) - res = channel.receive() - assert res == 42 - - def test_confusion_from_os_write_stderr(self, gw): - channel = gw.remote_exec(""" - import os - os.write(2, 'test'.encode('ascii')) - channel.send(channel.receive() * 6) - channel.send(channel.receive() * 6) - """) - channel.send(3) - res = channel.receive() - assert res == 18 - channel.send(7) - res = channel.receive() - assert res == 42 - - def test__rinfo(self, gw): - rinfo = gw._rinfo() - assert rinfo.executable - assert rinfo.cwd - assert rinfo.version_info - s = repr(rinfo) - old = gw.remote_exec(""" - import os.path - cwd = os.getcwd() - channel.send(os.path.basename(cwd)) - os.chdir('..') - """).receive() - try: - rinfo2 = gw._rinfo() - assert rinfo2.cwd == rinfo.cwd - rinfo3 = gw._rinfo(update=True) - assert rinfo3.cwd != rinfo2.cwd - finally: - gw._cache_rinfo = rinfo - gw.remote_exec("import os ; os.chdir(%r)" % old).waitclose() - -def test_join_blocked_execution_gateway(): - gateway = py.execnet.PopenGateway() - channel = gateway.remote_exec(""" - import time - time.sleep(5.0) - """) - def doit(): - gateway.exit() - gateway.join(joinexec=True) - return 17 - - pool = py._thread.WorkerPool() - reply = pool.dispatch(doit) - x = reply.get(timeout=1.0) - assert x == 17 - -class TestPopenGateway: - gwtype = 'popen' - - def test_chdir_separation(self, tmpdir): - old = tmpdir.chdir() - try: - gw = py.execnet.PopenGateway() - finally: - waschangedir = old.chdir() - c = gw.remote_exec("import os ; channel.send(os.getcwd())") - x = c.receive() - assert x == str(waschangedir) - - def test_many_popen(self): - num = 4 - l = [] - for i in range(num): - l.append(py.execnet.PopenGateway()) - channels = [] - for gw in l: - channel = gw.remote_exec("""channel.send(42)""") - channels.append(channel) -## try: -## while channels: -## channel = channels.pop() -## try: -## ret = channel.receive() -## assert ret == 42 -## finally: -## channel.gateway.exit() -## finally: -## for x in channels: -## x.gateway.exit() - while channels: - channel = channels.pop() - ret = channel.receive() - assert ret == 42 - - def test_rinfo_popen(self, gw): - rinfo = gw._rinfo() - assert rinfo.executable == py.std.sys.executable - assert rinfo.cwd == py.std.os.getcwd() - assert rinfo.version_info == py.std.sys.version_info - - def test_gateway_init_event(self, _pytest): - rec = _pytest.gethookrecorder(gateway.ExecnetAPI) - gw = py.execnet.PopenGateway() - call = rec.popcall("pyexecnet_gateway_init") - assert call.gateway == gw - gw.exit() - call = rec.popcall("pyexecnet_gateway_exit") - assert call.gateway == gw - - @py.test.mark.xfail # "fix needed: dying remote process does not cause waitclose() to fail" - def test_waitclose_on_remote_killed(self): - gw = py.execnet.PopenGateway() - channel = gw.remote_exec(""" - import os - import time - channel.send(os.getpid()) - while 1: - channel.send("#" * 100) - """) - remotepid = channel.receive() - py.process.kill(remotepid) - py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)") - py.test.raises(EOFError, channel.send, None) - py.test.raises(EOFError, channel.receive) - -@py.test.mark.xfail -def test_endmarker_delivery_on_remote_killterm(): - if not hasattr(py.std.os, 'kill'): - py.test.skip("no os.kill()") - gw = py.execnet.PopenGateway() - try: - q = queue.Queue() - channel = gw.remote_exec(source=''' - import os - os.kill(os.getpid(), 15) - ''') - channel.setcallback(q.put, endmarker=999) - val = q.get(TESTTIMEOUT) - assert val == 999 - err = channel._getremoteerror() - finally: - gw.exit() - assert "killed" in str(err) - assert "15" in str(err) - - -def test_socket_gw_host_not_found(gw): - py.test.raises(py.execnet.HostNotFound, - 'py.execnet.SocketGateway("qowieuqowe", 9000)' - ) - -class TestSshPopenGateway: - gwtype = "ssh" - - def test_sshconfig_config_parsing(self, monkeypatch): - import subprocess - l = [] - monkeypatch.setattr(subprocess, 'Popen', - lambda *args, **kwargs: l.append(args[0])) - py.test.raises(AttributeError, - """py.execnet.SshGateway("xyz", ssh_config='qwe')""") - assert len(l) == 1 - popen_args = l[0] - i = popen_args.index('-F') - assert popen_args[i+1] == "qwe" - - def test_sshaddress(self, gw, specssh): - assert gw.remoteaddress == specssh.ssh - - def test_host_not_found(self): - py.test.raises(py.execnet.HostNotFound, - "py.execnet.SshGateway('nowhere.codespeak.net')") - -class TestThreads: - def test_threads(self): - gw = py.execnet.PopenGateway() - gw.remote_init_threads(3) - c1 = gw.remote_exec("channel.send(channel.receive())") - c2 = gw.remote_exec("channel.send(channel.receive())") - c2.send(1) - res = c2.receive() - assert res == 1 - c1.send(42) - res = c1.receive() - assert res == 42 - - def test_threads_twice(self): - gw = py.execnet.PopenGateway() - gw.remote_init_threads(3) - py.test.raises(IOError, gw.remote_init_threads, 3) - - -def test_nodebug(): - from py.__.execnet import gateway_base - assert not gateway_base.debug diff --git a/testing/execnet/test_multi.py b/testing/execnet/test_multi.py deleted file mode 100644 index d35dd1e861f..00000000000 --- a/testing/execnet/test_multi.py +++ /dev/null @@ -1,58 +0,0 @@ -""" - tests for - - multi channels and multi gateways - -""" - -import py - -class TestMultiChannelAndGateway: - def test_multichannel_receive_each(self): - class pseudochannel: - def receive(self): - return 12 - - pc1 = pseudochannel() - pc2 = pseudochannel() - multichannel = py.execnet.MultiChannel([pc1, pc2]) - l = multichannel.receive_each(withchannel=True) - assert len(l) == 2 - assert l == [(pc1, 12), (pc2, 12)] - l = multichannel.receive_each(withchannel=False) - assert l == [12,12] - - def test_multichannel_send_each(self): - l = [py.execnet.PopenGateway() for x in range(2)] - gm = py.execnet.MultiGateway(l) - mc = gm.remote_exec(""" - import os - channel.send(channel.receive() + 1) - """) - mc.send_each(41) - l = mc.receive_each() - assert l == [42,42] - - def test_multichannel_receive_queue_for_two_subprocesses(self): - l = [py.execnet.PopenGateway() for x in range(2)] - gm = py.execnet.MultiGateway(l) - mc = gm.remote_exec(""" - import os - channel.send(os.getpid()) - """) - queue = mc.make_receive_queue() - ch, item = queue.get(timeout=10) - ch2, item2 = queue.get(timeout=10) - assert ch != ch2 - assert ch.gateway != ch2.gateway - assert item != item2 - mc.waitclose() - - def test_multichannel_waitclose(self): - l = [] - class pseudochannel: - def waitclose(self): - l.append(0) - multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()]) - multichannel.waitclose() - assert len(l) == 2 - diff --git a/testing/execnet/test_rsync.py b/testing/execnet/test_rsync.py deleted file mode 100644 index 3c1f50282b4..00000000000 --- a/testing/execnet/test_rsync.py +++ /dev/null @@ -1,148 +0,0 @@ -import py -from py.execnet import RSync - - -def pytest_funcarg__gw1(request): - return request.cached_setup( - setup=py.execnet.PopenGateway, - teardown=lambda val: val.exit(), - scope="module" - ) -pytest_funcarg__gw2 = pytest_funcarg__gw1 - -def pytest_funcarg__dirs(request): - t = request.getfuncargvalue('tmpdir') - class dirs: - source = t.join("source") - dest1 = t.join("dest1") - dest2 = t.join("dest2") - return dirs - -class TestRSync: - def test_notargets(self, dirs): - rsync = RSync(dirs.source) - py.test.raises(IOError, "rsync.send()") - assert rsync.send(raises=False) is None - - def test_dirsync(self, dirs, gw1, gw2): - dest = dirs.dest1 - dest2 = dirs.dest2 - source = dirs.source - - for s in ('content1', 'content2', 'content2-a-bit-longer'): - source.ensure('subdir', 'file1').write(s) - rsync = RSync(dirs.source) - rsync.add_target(gw1, dest) - rsync.add_target(gw2, dest2) - rsync.send() - assert dest.join('subdir').check(dir=1) - assert dest.join('subdir', 'file1').check(file=1) - assert dest.join('subdir', 'file1').read() == s - assert dest2.join('subdir').check(dir=1) - assert dest2.join('subdir', 'file1').check(file=1) - assert dest2.join('subdir', 'file1').read() == s - for x in dest, dest2: - fn = x.join("subdir", "file1") - fn.setmtime(0) - - source.join('subdir').remove('file1') - rsync = RSync(source) - rsync.add_target(gw2, dest2) - rsync.add_target(gw1, dest) - rsync.send() - assert dest.join('subdir', 'file1').check(file=1) - assert dest2.join('subdir', 'file1').check(file=1) - rsync = RSync(source) - rsync.add_target(gw1, dest, delete=True) - rsync.add_target(gw2, dest2) - rsync.send() - assert not dest.join('subdir', 'file1').check() - assert dest2.join('subdir', 'file1').check() - - def test_dirsync_twice(self, dirs, gw1, gw2): - source = dirs.source - source.ensure("hello") - rsync = RSync(source) - rsync.add_target(gw1, dirs.dest1) - rsync.send() - assert dirs.dest1.join('hello').check() - py.test.raises(IOError, "rsync.send()") - assert rsync.send(raises=False) is None - rsync.add_target(gw1, dirs.dest2) - rsync.send() - assert dirs.dest2.join('hello').check() - py.test.raises(IOError, "rsync.send()") - assert rsync.send(raises=False) is None - - def test_rsync_default_reporting(self, capsys, dirs, gw1): - source = dirs.source - source.ensure("hello") - rsync = RSync(source) - rsync.add_target(gw1, dirs.dest1) - rsync.send() - out, err = capsys.readouterr() - assert out.find("hello") != -1 - - def test_rsync_non_verbose(self, capsys, dirs, gw1): - source = dirs.source - source.ensure("hello") - rsync = RSync(source, verbose=False) - rsync.add_target(gw1, dirs.dest1) - rsync.send() - out, err = capsys.readouterr() - assert not out - assert not err - - def test_symlink_rsync(self, dirs, gw1): - if py.std.sys.platform == 'win32': - py.test.skip("symlinks are unsupported on Windows.") - source = dirs.source - dest = dirs.dest1 - dirs.source.ensure("existant") - source.join("rellink").mksymlinkto(source.join("existant"), absolute=0) - source.join('abslink').mksymlinkto(source.join("existant")) - - rsync = RSync(source) - rsync.add_target(gw1, dest) - rsync.send() - - assert dest.join('rellink').readlink() == dest.join("existant") - assert dest.join('abslink').readlink() == dest.join("existant") - - def test_callback(self, dirs, gw1): - dest = dirs.dest1 - source = dirs.source - source.ensure("existant").write("a" * 100) - source.ensure("existant2").write("a" * 10) - total = {} - def callback(cmd, lgt, channel): - total[(cmd, lgt)] = True - - rsync = RSync(source, callback=callback) - #rsync = RSync() - rsync.add_target(gw1, dest) - rsync.send() - - assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True} - - def test_file_disappearing(self, dirs, gw1): - dest = dirs.dest1 - source = dirs.source - source.ensure("ex").write("a" * 100) - source.ensure("ex2").write("a" * 100) - - class DRsync(RSync): - def filter(self, x): - assert x != source - if x.endswith("ex2"): - self.x = 1 - source.join("ex2").remove() - return True - - rsync = DRsync(source) - rsync.add_target(gw1, dest) - rsync.send() - assert rsync.x == 1 - assert len(dest.listdir()) == 1 - assert len(source.listdir()) == 1 - diff --git a/testing/execnet/test_serializer.py b/testing/execnet/test_serializer.py deleted file mode 100755 index d03e703c736..00000000000 --- a/testing/execnet/test_serializer.py +++ /dev/null @@ -1,179 +0,0 @@ -# -*- coding: utf-8 -*- -import sys -import os -import tempfile -import subprocess -import py -from py.__.execnet import serializer - - -def _find_version(suffix=""): - name = "python" + suffix - executable = py.path.local.sysfind(name) - if executable is None: - py.test.skip("can't find a %r executable" % (name,)) - return executable - -def setup_module(mod): - mod.TEMPDIR = py.path.local(tempfile.mkdtemp()) - if sys.version_info > (3, 0): - mod._py3_wrapper = PythonWrapper(py.path.local(sys.executable)) - mod._py2_wrapper = PythonWrapper(_find_version()) - else: - mod._py3_wrapper = PythonWrapper(_find_version("3")) - mod._py2_wrapper = PythonWrapper(py.path.local(sys.executable)) - mod._old_pypath = os.environ.get("PYTHONPATH") - pylib = str(py.path.local(py.__file__).dirpath().join("..")) - os.environ["PYTHONPATH"] = pylib - -def teardown_module(mod): - TEMPDIR.remove(True) - if _old_pypath is not None: - os.environ["PYTHONPATH"] = _old_pypath - - -class PythonWrapper(object): - - def __init__(self, executable): - self.executable = executable - - def dump(self, obj_rep): - script_file = TEMPDIR.join("dump.py") - script_file.write(""" -from py.__.execnet import serializer -import sys -if sys.version_info > (3, 0): # Need binary output - sys.stdout = sys.stdout.detach() -saver = serializer.Serializer(sys.stdout) -saver.save(%s)""" % (obj_rep,)) - return self.executable.sysexec(script_file) - - def load(self, data, option_args=""): - script_file = TEMPDIR.join("load.py") - script_file.write(r""" -from py.__.execnet import serializer -import sys -if sys.version_info > (3, 0): - sys.stdin = sys.stdin.detach() -options = serializer.UnserializationOptions(%s) -loader = serializer.Unserializer(sys.stdin, options) -obj = loader.load() -sys.stdout.write(type(obj).__name__ + "\n") -sys.stdout.write(repr(obj))""" % (option_args,)) - popen = subprocess.Popen([str(self.executable), str(script_file)], - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE) - stdout, stderr = popen.communicate(data.encode("latin-1")) - ret = popen.returncode - if ret: - raise py.process.cmdexec.Error(ret, ret, str(self.executable), - stdout, stderr) - return [s.decode("ascii") for s in stdout.splitlines()] - - def __repr__(self): - return "" % (self.executable,) - - -def pytest_funcarg__py2(request): - return _py2_wrapper - -def pytest_funcarg__py3(request): - return _py3_wrapper - -def pytest_funcarg__dump(request): - py_dump = request.getfuncargvalue(request.param[0]) - return py_dump.dump - -def pytest_funcarg__load(request): - py_dump = request.getfuncargvalue(request.param[1]) - return py_dump.load - -def pytest_generate_tests(metafunc): - if 'dump' in metafunc.funcargnames and 'load' in metafunc.funcargnames: - pys = 'py2', 'py3' - for dump in pys: - for load in pys: - param = (dump, load) - conversion = '%s to %s'%param - if 'repr' not in metafunc.funcargnames: - metafunc.addcall(id=conversion, param=param) - else: - for tp, repr in simple_tests.items(): - metafunc.addcall( - id='%s:%s'%(tp, conversion), - param=param, - funcargs={'tp_name':tp, 'repr':repr}, - ) - - -simple_tests = { -# type: expected before/after repr - 'int': '4', - 'float':'3.25', - 'list': '[1, 2, 3]', - 'tuple': '(1, 2, 3)', - 'dict': '{6: 2, (1, 2, 3): 32}', -} - -def test_simple(tp_name, repr, dump, load): - p = dump(repr) - tp , v = load(p) - assert tp == tp_name - assert v == repr - - -@py.test.mark.xfail -# I'm not sure if we need the complexity. -def test_recursive_list(py2, py3): - l = [1, 2, 3] - l.append(l) - p = py2.dump(l) - tp, rep = py2.load(l) - assert tp == "list" - -def test_bigint_should_fail(): - py.test.raises(serializer.SerializationError, - serializer.Serializer(py.io.BytesIO()).save, - 123456678900) - -def test_bytes(py2, py3): - p = py3.dump("b'hi'") - tp, v = py2.load(p) - assert tp == "str" - assert v == "'hi'" - tp, v = py3.load(p) - assert tp == "bytes" - assert v == "b'hi'" - -def test_string(py2, py3): - p = py2.dump("'xyz'") - tp, s = py2.load(p) - assert tp == "str" - assert s == "'xyz'" - tp, s = py3.load(p) - assert tp == "bytes" - assert s == "b'xyz'" - tp, s = py3.load(p, "True") - assert tp == "str" - assert s == "'xyz'" - p = py3.dump("'xyz'") - tp, s = py2.load(p, True) - assert tp == "str" - assert s == "'xyz'" - -def test_unicode(py2, py3): - p = py2.dump("u'hi'") - tp, s = py2.load(p) - assert tp == "unicode" - assert s == "u'hi'" - tp, s = py3.load(p) - assert tp == "str" - assert s == "'hi'" - p = py3.dump("'hi'") - tp, s = py3.load(p) - assert tp == "str" - assert s == "'hi'" - tp, s = py2.load(p) - assert tp == "unicode" - assert s == "u'hi'" diff --git a/testing/execnet/test_xspec.py b/testing/execnet/test_xspec.py deleted file mode 100644 index 5ccf16dd720..00000000000 --- a/testing/execnet/test_xspec.py +++ /dev/null @@ -1,151 +0,0 @@ -import py - -XSpec = py.execnet.XSpec - -class TestXSpec: - def test_norm_attributes(self): - spec = XSpec("socket=192.168.102.2:8888//python=c:/this/python2.5//chdir=d:\hello") - assert spec.socket == "192.168.102.2:8888" - assert spec.python == "c:/this/python2.5" - assert spec.chdir == "d:\hello" - assert spec.nice is None - assert not hasattr(spec, '_xyz') - - py.test.raises(AttributeError, "spec._hello") - - spec = XSpec("socket=192.168.102.2:8888//python=python2.5//nice=3") - assert spec.socket == "192.168.102.2:8888" - assert spec.python == "python2.5" - assert spec.chdir is None - assert spec.nice == "3" - - spec = XSpec("ssh=user@host//chdir=/hello/this//python=/usr/bin/python2.5") - assert spec.ssh == "user@host" - assert spec.python == "/usr/bin/python2.5" - assert spec.chdir == "/hello/this" - - spec = XSpec("popen") - assert spec.popen == True - - def test__samefilesystem(self): - assert XSpec("popen")._samefilesystem() - assert XSpec("popen//python=123")._samefilesystem() - assert not XSpec("popen//chdir=hello")._samefilesystem() - - def test__spec_spec(self): - for x in ("popen", "popen//python=this"): - assert XSpec(x)._spec == x - - def test_samekeyword_twice_raises(self): - py.test.raises(ValueError, "XSpec('popen//popen')") - py.test.raises(ValueError, "XSpec('popen//popen=123')") - - def test_unknown_keys_allowed(self): - xspec = XSpec("hello=3") - assert xspec.hello == '3' - - def test_repr_and_string(self): - for x in ("popen", "popen//python=this"): - assert repr(XSpec(x)).find("popen") != -1 - assert str(XSpec(x)) == x - - def test_hash_equality(self): - assert XSpec("popen") == XSpec("popen") - assert hash(XSpec("popen")) == hash(XSpec("popen")) - assert XSpec("popen//python=123") != XSpec("popen") - assert hash(XSpec("socket=hello:8080")) != hash(XSpec("popen")) - -class TestMakegateway: - def test_no_type(self): - py.test.raises(ValueError, "py.execnet.makegateway('hello')") - - def test_popen(self): - gw = py.execnet.makegateway("popen") - assert gw.spec.python == None - rinfo = gw._rinfo() - assert rinfo.executable == py.std.sys.executable - assert rinfo.cwd == py.std.os.getcwd() - assert rinfo.version_info == py.std.sys.version_info - - def test_popen_nice(self): - gw = py.execnet.makegateway("popen//nice=5") - remotenice = gw.remote_exec(""" - import os - if hasattr(os, 'nice'): - channel.send(os.nice(0)) - else: - channel.send(None) - """).receive() - if remotenice is not None: - assert remotenice == 5 - - def test_popen_explicit(self): - gw = py.execnet.makegateway("popen//python=%s" % py.std.sys.executable) - assert gw.spec.python == py.std.sys.executable - rinfo = gw._rinfo() - assert rinfo.executable == py.std.sys.executable - assert rinfo.cwd == py.std.os.getcwd() - assert rinfo.version_info == py.std.sys.version_info - - def test_popen_cpython25(self): - for trypath in ('python2.5', r'C:\Python25\python.exe'): - cpython25 = py.path.local.sysfind(trypath) - if cpython25 is not None: - cpython25 = cpython25.realpath() - break - else: - py.test.skip("cpython2.5 not found") - gw = py.execnet.makegateway("popen//python=%s" % cpython25) - rinfo = gw._rinfo() - if py.std.sys.platform != "darwin": # it's confusing there - assert rinfo.executable == cpython25 - assert rinfo.cwd == py.std.os.getcwd() - assert rinfo.version_info[:2] == (2,5) - - def test_popen_cpython26(self): - for trypath in ('python2.6', r'C:\Python26\python.exe'): - cpython26 = py.path.local.sysfind(trypath) - if cpython26 is not None: - break - else: - py.test.skip("cpython2.6 not found") - gw = py.execnet.makegateway("popen//python=%s" % cpython26) - rinfo = gw._rinfo() - assert rinfo.executable == cpython26 - assert rinfo.cwd == py.std.os.getcwd() - assert rinfo.version_info[:2] == (2,6) - - def test_popen_chdir_absolute(self, testdir): - gw = py.execnet.makegateway("popen//chdir=%s" % testdir.tmpdir) - rinfo = gw._rinfo() - assert rinfo.cwd == str(testdir.tmpdir.realpath()) - - def test_popen_chdir_newsub(self, testdir): - testdir.chdir() - gw = py.execnet.makegateway("popen//chdir=hello") - rinfo = gw._rinfo() - assert rinfo.cwd == str(testdir.tmpdir.join("hello").realpath()) - - def test_ssh(self, specssh): - sshhost = specssh.ssh - gw = py.execnet.makegateway("ssh=%s" % sshhost) - rinfo = gw._rinfo() - gw2 = py.execnet.SshGateway(sshhost) - rinfo2 = gw2._rinfo() - assert rinfo.executable == rinfo2.executable - assert rinfo.cwd == rinfo2.cwd - assert rinfo.version_info == rinfo2.version_info - - def test_socket(self, specsocket): - gw = py.execnet.makegateway("socket=%s" % specsocket.socket) - rinfo = gw._rinfo() - assert rinfo.executable - assert rinfo.cwd - assert rinfo.version_info - # we cannot instantiate a second gateway - - #gw2 = py.execnet.SocketGateway(*specsocket.socket.split(":")) - #rinfo2 = gw2._rinfo() - #assert rinfo.executable == rinfo2.executable - #assert rinfo.cwd == rinfo2.cwd - #assert rinfo.version_info == rinfo2.version_info diff --git a/testing/pytest/dist/test_dsession.py b/testing/pytest/dist/test_dsession.py index f4489e4e4a6..1a0078a6712 100644 --- a/testing/pytest/dist/test_dsession.py +++ b/testing/pytest/dist/test_dsession.py @@ -1,8 +1,9 @@ from py.__.test.dist.dsession import DSession from py.__.test import outcome import py +import execnet -XSpec = py.execnet.XSpec +XSpec = execnet.XSpec def run(item, node, excinfo=None): runner = item.config.pluginmanager.getplugin("runner") diff --git a/testing/pytest/dist/test_gwmanage.py b/testing/pytest/dist/test_gwmanage.py index b64bb09ff12..45224931749 100644 --- a/testing/pytest/dist/test_gwmanage.py +++ b/testing/pytest/dist/test_gwmanage.py @@ -9,6 +9,7 @@ import os from py.__.test.dist.gwmanage import GatewayManager, HostRSync from py.__.test.plugin import hookspec +import execnet def pytest_funcarg__hookrecorder(request): _pytest = request.getfuncargvalue('_pytest') @@ -35,7 +36,7 @@ def test_popen_makegateway_events(self, hook, hookrecorder, _pytest): hm = GatewayManager(["popen"] * 2, hook) hm.makegateways() call = hookrecorder.popcall("pytest_gwmanage_newgateway") - assert call.gateway.spec == py.execnet.XSpec("popen") + assert call.gateway.spec == execnet.XSpec("popen") assert call.gateway.id == "[1]" assert call.platinfo.executable == call.gateway._rinfo().executable call = hookrecorder.popcall("pytest_gwmanage_newgateway") @@ -149,7 +150,7 @@ def test_hrsync_filter(self, mysetup): def test_hrsync_one_host(self, mysetup): source, dest = mysetup.source, mysetup.dest - gw = py.execnet.makegateway("popen//chdir=%s" % dest) + gw = execnet.makegateway("popen//chdir=%s" % dest) finished = [] rsync = HostRSync(source) rsync.add_target_host(gw, finished=lambda: finished.append(1)) diff --git a/testing/pytest/dist/test_mypickle.py b/testing/pytest/dist/test_mypickle.py index 863795cec46..623dfc91477 100644 --- a/testing/pytest/dist/test_mypickle.py +++ b/testing/pytest/dist/test_mypickle.py @@ -1,6 +1,7 @@ import py import sys +import execnet Queue = py.builtin._tryimport('queue', 'Queue').Queue @@ -117,7 +118,7 @@ def test_self_memoize(): TESTTIMEOUT = 2.0 class TestPickleChannelFunctional: def setup_class(cls): - cls.gw = py.execnet.PopenGateway() + cls.gw = execnet.PopenGateway() cls.gw.remote_init_threads(5) def test_popen_send_instance(self): diff --git a/testing/pytest/dist/test_txnode.py b/testing/pytest/dist/test_txnode.py index 9a03766cfd9..23935b2eea3 100644 --- a/testing/pytest/dist/test_txnode.py +++ b/testing/pytest/dist/test_txnode.py @@ -1,5 +1,6 @@ import py +import execnet from py.__.test.dist.txnode import TXNode queue = py.builtin._tryimport("queue", "Queue") Queue = queue.Queue @@ -46,8 +47,8 @@ def makenode(self, config=None): config = py.test.config._reparse([]) self.config = config self.queue = Queue() - self.xspec = py.execnet.XSpec("popen") - self.gateway = py.execnet.makegateway(self.xspec) + self.xspec = execnet.XSpec("popen") + self.gateway = execnet.makegateway(self.xspec) self.id += 1 self.gateway.id = str(self.id) self.node = TXNode(self.gateway, self.config, putevent=self.queue.put) diff --git a/testing/pytest/plugin/test_pytest_execnetcleanup.py b/testing/pytest/plugin/test_pytest_execnetcleanup.py deleted file mode 100644 index b9d001216e4..00000000000 --- a/testing/pytest/plugin/test_pytest_execnetcleanup.py +++ /dev/null @@ -1,12 +0,0 @@ -def test_execnetplugin(testdir): - reprec = testdir.inline_runsource(""" - import py - import sys - def test_hello(): - sys._gw = py.execnet.PopenGateway() - def test_world(): - assert hasattr(sys, '_gw') - assert sys._gw not in sys._gw._cleanup._activegateways - - """, "-s", "--debug") - reprec.assertoutcome(passed=2) diff --git a/testing/pytest/plugin/test_pytest_terminal.py b/testing/pytest/plugin/test_pytest_terminal.py index b9ccae34c1f..c2e09bb37d3 100644 --- a/testing/pytest/plugin/test_pytest_terminal.py +++ b/testing/pytest/plugin/test_pytest_terminal.py @@ -105,6 +105,7 @@ def test_internalerror(self, testdir, linecomp): ]) def test_gwmanage_events(self, testdir, linecomp): + execnet = py.test.importorskip("execnet") modcol = testdir.getmodulecol(""" def test_one(): pass @@ -113,10 +114,10 @@ def test_one(): rep = TerminalReporter(modcol.config, file=linecomp.stringio) class gw1: id = "X1" - spec = py.execnet.XSpec("popen") + spec = execnet.XSpec("popen") class gw2: id = "X2" - spec = py.execnet.XSpec("popen") + spec = execnet.XSpec("popen") class rinfo: version_info = (2, 5, 1, 'final', 0) executable = "hello" diff --git a/testing/pytest/test_pickling.py b/testing/pytest/test_pickling.py index 0096b1f09da..0bffcbcf8e1 100644 --- a/testing/pytest/test_pickling.py +++ b/testing/pytest/test_pickling.py @@ -182,8 +182,9 @@ def test_config_and_collector_pickling(self, testdir): old.chdir() def test_config__setstate__wired_correctly_in_childprocess(testdir): + execnet = py.test.importorskip("execnet") from py.__.test.dist.mypickle import PickleChannel - gw = py.execnet.PopenGateway() + gw = execnet.PopenGateway() channel = gw.remote_exec(""" import py from py.__.test.dist.mypickle import PickleChannel