Various Windows-related fixes to IPython.parallel #374

Merged
merged 11 commits into from

3 participants

@minrk
Owner

This is a pull request containing fixes that, when merged, should hopefully close #365.

Issues fixed:

  • multiprocessing does not propagate Config objects properly, so they are coerced to dict and back for Scheduler launch
  • on Windows, select cannot poll non-socket FDs, so subprocess IO is forwarded over zmq sockets in ipcluster
  • SIGINT caused a crash when shutting down subprocesses in ipcluster, so it is replaced with CTRL_C_EVENT on Windows
  • allow win32api to be missing on import of IPython.parallel.apps.launcher
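The signal-handling fallbacks in the first bullets amount to the following sketch (mirroring the changes to launcher.py in this PR):

```python
# Sketch of the cross-platform signal fallbacks: Windows has no SIGKILL,
# and (on Python >= 2.7 / 3.2) subprocesses there are interrupted with
# CTRL_C_EVENT rather than SIGINT, which crashes shutdown.
from signal import SIGINT, SIGTERM

try:
    from signal import SIGKILL
except ImportError:
    # Windows: SIGTERM is the closest available substitute
    SIGKILL = SIGTERM

try:
    # Windows, Python >= 2.7 / 3.2: substitute CTRL_C_EVENT for SIGINT
    from signal import CTRL_C_EVENT as SIGINT
except ImportError:
    pass
```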

The removal of the module-specific default serialization selection in StreamSession is not Windows specific, but is done for simplicity. While the stdlib json in 2.6 is extremely slow compared to jsonlib, the chance of having defaults differ between machines is too annoying to keep this behavior.
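The simplified serialization default is essentially this (a sketch using the names from the streamsession.py diff, not the full module):

```python
# Sketch of the simplified default in streamsession.py: always use the
# stdlib json packer/unpacker, so machines agree on the wire format by
# default instead of selecting per-machine based on jsonlib availability.
import json

json_packer = lambda obj: json.dumps(obj)
json_unpacker = lambda s: json.loads(s)

default_packer = json_packer
default_unpacker = json_unpacker
```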

@fperez
Owner

Any feedback on the list about this?

It passes the test suite (modulo a failing test that's also in master) for me, but I only tested on Linux :) Unfortunately I can't test on Windows for a few days, with all the moving coming up...

@minrk
Owner

Real usage seems to be working fine, but several tests don't pass on Windows, so I need to look into those.

@fperez
Owner
@minrk
Owner

All tests now pass on Windows. I believe there are still some signaling / shutdown / process cleanup issues to work out, but it's very close at least.

@minrk
Owner

Now have 100% tests passing on my Windows VMs, and I can't seem to make processes hang around longer than they should, so I think this is ready for review/merge.

@ellisonbg
Owner

Min, have you tested this branch on a non-Windows system? I have often found that fixing Windows bugs leads to non-Windows bugs. Other than that, I think this is fine.

minrk added some commits
@minrk minrk always default to json packer in streamsession b491840
@minrk minrk handle potentially absent win32api in launcher.py b5a7d66
@minrk minrk forward subprocess IO over zmq on Windows
This is required on Windows, because select can only poll
sockets, unlike [all] other platforms
3e8626a
@minrk minrk interrupt windows subprocesses with CTRL-C instead of SIGINT
CTRL_C_EVENT is new in 2.7
22552e5
@minrk minrk pass config obj to Scheduler as dict
This is another Windows fix.  Windows doesn't properly
preserve the config object across the multiprocessing spawn.
4312fde
@minrk minrk remove @default_block from client, in favor of clearer implementation
also remove dead _execute/_push/_pull methods from client after transition to View as primary API provider
b191e9a
@minrk minrk Start each test with clear engine namespaces
closes gh-386
0d3d807
@minrk minrk use crash that will also kill a Windows engine
crash is from the Python test suite

Windows crash dialog is suppressed prior to crash

also fixed 'assertRaisesRemote' message, which had names backwards
151aca8
@minrk minrk move check for any engines into _build_targets
This prevents load-balancing apply submissions from raising NoEnginesRegistered
c005f92
@minrk minrk improve process cleanup on Windows
* use system taskkill call to shutdown process trees on Windows
* use apps.launchers in tests, to make use of the fix

Also remove unnecessary and sometimes wrong assert in test_asyncresult

As of this commit, all parallel tests pass on Windows
1a93dba
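The Config round-trip from commit 4312fde works like this sketch: coerce to a plain dict before the multiprocessing spawn, rebuild on the other side. The `Config` class here is a minimal stand-in for IPython's (which lives in IPython.config.loader); the real launch_scheduler takes more arguments.

```python
# Sketch of the Config round-trip fix: Windows' spawn-based
# multiprocessing does not propagate Config objects properly, so pass a
# plain dict and unwrap it back into a Config in the child process.
from multiprocessing import Process

class Config(dict):
    """Minimal stand-in for IPython.config.loader.Config."""

def launch_scheduler(config=None):
    if config:
        # unwrap dict back into Config on the spawned side
        config = Config(config)
    return config

config = Config(scheme='lru')
# coerce the Config to a dict for the Scheduler launch
q = Process(target=launch_scheduler, kwargs=dict(config=dict(config)))
```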
@minrk
Owner

I have indeed. The one change I haven't made is making IPython.parallel depend on pyzmq-2.1.5 on Windows, which it does require, due to fixes in zmq.poll and zmqstream.flush.

However, we haven't released pyzmq-2.1.5, so I don't know what exactly I should do there. Should I add the dependency, noting that the code has not been released and really means 2.1dev? Also, should I make the dependency consistent, and bump the dep to 2.1.5 across the board, or make it different for Windows and non-Windows?

@ellisonbg
Owner

There was a problem with zmq 2.1.5, so we should wait on releasing pyzmq 2.1.5 as well. At some level, we simply need to wait for zmq 2.1.5; until that comes out and we can release 2.1.5, we should probably put a dep of 2.1dev. Not sure of a better way of handling it. Obviously, simple messages/docs informing the user of the dep are going to be better than a formal dependency in code.

@minrk minrk depend on pyzmq-2.1dev on Windows
This is a temporary dependency, and should be changed to the next release of pyzmq when it arrives.
877366d
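The resulting platform-conditional version gate (as added to IPython/parallel/__init__.py) can be sketched as a function; note it uses plain string comparison of version strings, as the PR does:

```python
# Sketch of the platform-conditional dependency check: Windows requires
# pyzmq git master ('2.1dev'), other platforms pyzmq >= 2.1.4.
import os

def check_pyzmq(version):
    if os.name == 'nt':
        if version < '2.1dev':
            raise ImportError("On Windows, IPython.parallel depends on "
                              "bugfixes only in current git master of pyzmq")
    elif version < '2.1.4':
        raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4")
```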
@minrk minrk merged commit 877366d into from
@damianavila damianavila referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@minrk minrk referenced this pull request
Merged

DB fixes and Scheduler HWM #389

View
8 IPython/parallel/__init__.py
@@ -10,9 +10,15 @@
# Imports
#-----------------------------------------------------------------------------
+import os
import zmq
-if zmq.__version__ < '2.1.4':
+
+if os.name == 'nt':
+ if zmq.__version__ < '2.1dev':
+ raise ImportError("On Windows, IPython.parallel depends on bugfixes only "
+ "in current git master of pyzmq, you appear to have %s"%zmq.__version__)
+elif zmq.__version__ < '2.1.4':
raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4, you appear to have %s"%zmq.__version__)
from IPython.utils.pickleutil import Reference
View
63 IPython/parallel/apps/launcher.py
@@ -21,12 +21,21 @@
import re
import stat
+# signal imports, handling various platforms, versions
+
from signal import SIGINT, SIGTERM
try:
from signal import SIGKILL
except ImportError:
+ # Windows
SIGKILL=SIGTERM
+try:
+ # Windows >= 2.7, 3.2
+ from signal import CTRL_C_EVENT as SIGINT
+except ImportError:
+ pass
+
from subprocess import Popen, PIPE, STDOUT
try:
from subprocess import check_output
@@ -48,15 +57,11 @@ def check_output(*args, **kwargs):
from IPython.parallel.factory import LoggingFactory
-# load winhpcjob only on Windows
-try:
- from .winhpcjob import (
- IPControllerTask, IPEngineTask,
- IPControllerJob, IPEngineSetJob
- )
-except ImportError:
- pass
+from .win32support import forward_read_events
+
+from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
+WINDOWS = os.name == 'nt'
#-----------------------------------------------------------------------------
# Paths to the kernel apps
@@ -251,9 +256,14 @@ def start(self):
env=os.environ,
cwd=self.work_dir
)
-
- self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
- self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
+ if WINDOWS:
+ self.stdout = forward_read_events(self.process.stdout)
+ self.stderr = forward_read_events(self.process.stderr)
+ else:
+ self.stdout = self.process.stdout.fileno()
+ self.stderr = self.process.stderr.fileno()
+ self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
+ self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
self.poller.start()
self.notify_start(self.process.pid)
@@ -266,18 +276,29 @@ def stop(self):
def signal(self, sig):
if self.state == 'running':
- self.process.send_signal(sig)
+ if WINDOWS and sig != SIGINT:
+ # use Windows tree-kill for better child cleanup
+ check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
+ else:
+ self.process.send_signal(sig)
def interrupt_then_kill(self, delay=2.0):
"""Send INT, wait a delay and then send KILL."""
- self.signal(SIGINT)
+ try:
+ self.signal(SIGINT)
+ except Exception:
+ self.log.debug("interrupt failed")
+ pass
self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
self.killer.start()
# callbacks, etc:
def handle_stdout(self, fd, events):
- line = self.process.stdout.readline()
+ if WINDOWS:
+ line = self.stdout.recv()
+ else:
+ line = self.process.stdout.readline()
# a stopped process will be readable but return empty strings
if line:
self.log.info(line[:-1])
@@ -285,7 +306,10 @@ def handle_stdout(self, fd, events):
self.poll()
def handle_stderr(self, fd, events):
- line = self.process.stderr.readline()
+ if WINDOWS:
+ line = self.stderr.recv()
+ else:
+ line = self.process.stderr.readline()
# a stopped process will be readable but return empty strings
if line:
self.log.error(line[:-1])
@@ -296,8 +320,8 @@ def poll(self):
status = self.process.poll()
if status is not None:
self.poller.stop()
- self.loop.remove_handler(self.process.stdout.fileno())
- self.loop.remove_handler(self.process.stderr.fileno())
+ self.loop.remove_handler(self.stdout)
+ self.loop.remove_handler(self.stderr)
self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
@@ -588,10 +612,11 @@ def start(self, n, cluster_dir):
# This is only used on Windows.
def find_job_cmd():
- if os.name=='nt':
+ if WINDOWS:
try:
return find_cmd('job')
- except FindCmdError:
+ except (FindCmdError, ImportError):
+ # ImportError will be raised if win32api is not installed
return 'job'
else:
return 'job'
View
67 IPython/parallel/apps/win32support.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+"""Utility for forwarding file read events over a zmq socket.
+
+This is necessary because select on Windows only supports sockets, not regular file descriptors."""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import uuid
+import zmq
+
+from threading import Thread
+
+#-----------------------------------------------------------------------------
+# Code
+#-----------------------------------------------------------------------------
+
+class ForwarderThread(Thread):
+ def __init__(self, sock, fd):
+ Thread.__init__(self)
+ self.daemon=True
+ self.sock = sock
+ self.fd = fd
+
+ def run(self):
+ """loop through lines in self.fd, and send them over self.sock"""
+ line = self.fd.readline()
+ # allow for files opened in unicode mode
+ if isinstance(line, unicode):
+ send = self.sock.send_unicode
+ else:
+ send = self.sock.send
+ while line:
+ send(line)
+ line = self.fd.readline()
+ # line == '' means EOF
+ self.fd.close()
+ self.sock.close()
+
+def forward_read_events(fd, context=None):
+ """forward read events from an FD over a socket.
+
+ This method wraps a file in a socket pair, so it can
+ be polled for read events by select (specifically zmq.eventloop.ioloop)
+ """
+ if context is None:
+ context = zmq.Context.instance()
+ push = context.socket(zmq.PUSH)
+ push.setsockopt(zmq.LINGER, -1)
+ pull = context.socket(zmq.PULL)
+ addr='inproc://%s'%uuid.uuid4()
+ push.bind(addr)
+ pull.connect(addr)
+ forwarder = ForwarderThread(push, fd)
+ forwarder.start()
+ return pull
+
+
+__all__ = ['forward_read_events']
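The new win32support module above targets Python 2 (note the `unicode` check). A hypothetical Python 3 rendering of the same pattern, usable anywhere pyzmq is installed, might look like:

```python
# Hypothetical Python 3 sketch of forward_read_events: a thread pumps
# lines from a file object into an inproc PUSH socket, so the paired
# PULL socket can be polled like a socket by zmq's ioloop (select on
# Windows only handles sockets, not pipes).
import io
import uuid
from threading import Thread

import zmq

def forward_read_events(fd, context=None):
    if context is None:
        context = zmq.Context.instance()
    push = context.socket(zmq.PUSH)
    pull = context.socket(zmq.PULL)
    addr = 'inproc://%s' % uuid.uuid4()
    push.bind(addr)
    pull.connect(addr)

    def pump():
        # readline() returns '' at EOF, ending the loop
        for line in iter(fd.readline, ''):
            push.send_string(line)
        fd.close()
        push.close()

    Thread(target=pump, daemon=True).start()
    return pull

# demo with an in-memory file standing in for a subprocess pipe
sock = forward_read_events(io.StringIO("hello\n"))
```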
View
2  IPython/parallel/apps/winhpcjob.py
@@ -16,8 +16,6 @@
# Imports
#-----------------------------------------------------------------------------
-from __future__ import with_statement
-
import os
import re
import uuid
View
88 IPython/parallel/client/client.py
@@ -47,19 +47,6 @@ def spin_first(f, self, *args, **kwargs):
self.spin()
return f(self, *args, **kwargs)
-@decorator
-def default_block(f, self, *args, **kwargs):
- """Default to self.block; preserve self.block."""
- block = kwargs.get('block',None)
- block = self.block if block is None else block
- saveblock = self.block
- self.block = block
- try:
- ret = f(self, *args, **kwargs)
- finally:
- self.block = saveblock
- return ret
-
#--------------------------------------------------------------------------
# Classes
@@ -377,6 +364,11 @@ def _build_targets(self, targets):
"""Turn valid target IDs or 'all' into two lists:
(int_ids, uuids).
"""
+ if not self._ids:
+ # flush notification socket if no engines yet, just in case
+ if not self.ids:
+ raise error.NoEnginesRegistered("Can't build targets without any engines")
+
if targets is None:
targets = self._ids
elif isinstance(targets, str):
@@ -387,7 +379,7 @@ def _build_targets(self, targets):
elif isinstance(targets, int):
if targets < 0:
targets = self.ids[targets]
- if targets not in self.ids:
+ if targets not in self._ids:
raise IndexError("No such engine: %i"%targets)
targets = [targets]
@@ -788,14 +780,14 @@ def wait(self, jobs=None, timeout=-1):
#--------------------------------------------------------------------------
@spin_first
- @default_block
def clear(self, targets=None, block=None):
"""Clear the namespace in target(s)."""
+ block = self.block if block is None else block
targets = self._build_targets(targets)[0]
for t in targets:
self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
error = False
- if self.block:
+ if block:
self._flush_ignored_control()
for i in range(len(targets)):
idents,msg = self.session.recv(self._control_socket,0)
@@ -810,7 +802,6 @@ def clear(self, targets=None, block=None):
@spin_first
- @default_block
def abort(self, jobs=None, targets=None, block=None):
"""Abort specific jobs from the execution queues of target(s).
@@ -825,6 +816,7 @@ def abort(self, jobs=None, targets=None, block=None):
"""
+ block = self.block if block is None else block
targets = self._build_targets(targets)[0]
msg_ids = []
if isinstance(jobs, (basestring,AsyncResult)):
@@ -842,7 +834,7 @@ def abort(self, jobs=None, targets=None, block=None):
self.session.send(self._control_socket, 'abort_request',
content=content, ident=t)
error = False
- if self.block:
+ if block:
self._flush_ignored_control()
for i in range(len(targets)):
idents,msg = self.session.recv(self._control_socket,0)
@@ -856,9 +848,9 @@ def abort(self, jobs=None, targets=None, block=None):
raise error
@spin_first
- @default_block
def shutdown(self, targets=None, restart=False, hub=False, block=None):
"""Terminates one or more engine processes, optionally including the hub."""
+ block = self.block if block is None else block
if hub:
targets = 'all'
targets = self._build_targets(targets)[0]
@@ -890,29 +882,9 @@ def shutdown(self, targets=None, restart=False, hub=False, block=None):
raise error
#--------------------------------------------------------------------------
- # Execution methods
+ # Execution related methods
#--------------------------------------------------------------------------
- @default_block
- def _execute(self, code, targets='all', block=None):
- """Executes `code` on `targets` in blocking or nonblocking manner.
-
- ``execute`` is always `bound` (affects engine namespace)
-
- Parameters
- ----------
-
- code : str
- the code string to be executed
- targets : int/str/list of ints/strs
- the engines on which to execute
- default : all
- block : bool
- whether or not to wait until done to return
- default: self.block
- """
- return self[targets].execute(code, block=block)
-
def _maybe_raise(self, result):
"""wrapper for maybe raising an exception if apply failed."""
if isinstance(result, error.RemoteError):
@@ -943,13 +915,6 @@ def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None,
if not isinstance(subheader, dict):
raise TypeError("subheader must be dict, not %s"%type(subheader))
- if not self._ids:
- # flush notification socket if no engines yet
- any_ids = self.ids
- if not any_ids:
- raise error.NoEnginesRegistered("Can't execute without any connected engines.")
- # enforce types of f,args,kwargs
-
bufs = util.pack_apply_message(f,args,kwargs)
msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
@@ -1008,38 +973,10 @@ def direct_view(self, targets='all'):
return DirectView(client=self, socket=self._mux_socket, targets=targets)
#--------------------------------------------------------------------------
- # Data movement (TO BE REMOVED)
- #--------------------------------------------------------------------------
-
- @default_block
- def _push(self, ns, targets='all', block=None, track=False):
- """Push the contents of `ns` into the namespace on `target`"""
- if not isinstance(ns, dict):
- raise TypeError("Must be a dict, not %s"%type(ns))
- result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
- if not block:
- return result
-
- @default_block
- def _pull(self, keys, targets='all', block=None):
- """Pull objects from `target`'s namespace by `keys`"""
- if isinstance(keys, basestring):
- pass
- elif isinstance(keys, (list,tuple,set)):
- for key in keys:
- if not isinstance(key, basestring):
- raise TypeError("keys must be str, not type %r"%type(key))
- else:
- raise TypeError("keys must be strs, not %r"%keys)
- result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
- return result
-
- #--------------------------------------------------------------------------
# Query methods
#--------------------------------------------------------------------------
@spin_first
- @default_block
def get_result(self, indices_or_msg_ids=None, block=None):
"""Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
@@ -1077,6 +1014,7 @@ def get_result(self, indices_or_msg_ids=None, block=None):
A subclass of AsyncResult that retrieves results from the Hub
"""
+ block = self.block if block is None else block
if indices_or_msg_ids is None:
indices_or_msg_ids = -1
View
6 IPython/parallel/controller/controller.py
@@ -109,8 +109,10 @@ def construct_schedulers(self):
else:
self.log.info("task::using Python %s Task scheduler"%self.scheme)
- sargs = (self.client_info['task'][1], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
- kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
+ sargs = (self.client_info['task'][1], self.engine_info['task'],
+ self.monitor_url, self.client_info['notification'])
+ kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level,
+ config=dict(self.config))
q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
View
6 IPython/parallel/controller/scheduler.py
@@ -34,6 +34,7 @@
# local imports
from IPython.external.decorator import decorator
+from IPython.config.loader import Config
from IPython.utils.traitlets import Instance, Dict, List, Set
from IPython.parallel import error
@@ -557,9 +558,12 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname=
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
+ if config:
+ # unwrap dict back into Config
+ config = Config(config)
+
ctx = zmq.Context()
loop = ioloop.IOLoop()
- print (in_addr, out_addr, mon_addr, not_addr)
ins = ZMQStream(ctx.socket(zmq.XREP),loop)
ins.setsockopt(zmq.IDENTITY, identity)
ins.bind(in_addr)
View
20 IPython/parallel/streamsession.py
@@ -27,18 +27,6 @@
from .util import ISO8601
-# packer priority: jsonlib[2], cPickle, simplejson/json, pickle
-json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
-if json_name in ('jsonlib', 'jsonlib2'):
- use_json = True
-elif json_name:
- if cPickle is None:
- use_json = True
- else:
- use_json = False
-else:
- use_json = False
-
def squash_unicode(obj):
if isinstance(obj,dict):
for key in obj.keys():
@@ -58,12 +46,8 @@ def squash_unicode(obj):
pickle_packer = lambda o: pickle.dumps(o,-1)
pickle_unpacker = pickle.loads
-if use_json:
- default_packer = json_packer
- default_unpacker = json_unpacker
-else:
- default_packer = pickle_packer
- default_unpacker = pickle_unpacker
+default_packer = json_packer
+default_unpacker = json_unpacker
DELIM="<IDS|MSG>"
View
57 IPython/parallel/tests/__init__.py
@@ -14,21 +14,46 @@
import os
import tempfile
import time
-from subprocess import Popen, PIPE, STDOUT
+from subprocess import Popen
from IPython.utils.path import get_ipython_dir
from IPython.parallel import Client
+from IPython.parallel.apps.launcher import (LocalProcessLauncher,
+ ipengine_cmd_argv,
+ ipcontroller_cmd_argv,
+ SIGKILL)
-processes = []
-blackhole = tempfile.TemporaryFile()
+# globals
+launchers = []
+blackhole = open(os.devnull, 'w')
+
+# Launcher class
+class TestProcessLauncher(LocalProcessLauncher):
+ """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
+ def start(self):
+ if self.state == 'before':
+ self.process = Popen(self.args,
+ stdout=blackhole, stderr=blackhole,
+ env=os.environ,
+ cwd=self.work_dir
+ )
+ self.notify_start(self.process.pid)
+ self.poll = self.process.poll
+ else:
+ s = 'The process was already started and has state: %r' % self.state
+ raise ProcessStateError(s)
# nose setup/teardown
def setup():
- cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
- processes.append(cp)
- engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
- client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
+ cp = TestProcessLauncher()
+ cp.cmd_and_args = ipcontroller_cmd_argv + \
+ ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
+ cp.start()
+ launchers.append(cp)
+ cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
+ engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
+ client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
tic = time.time()
while not os.path.exists(engine_json) or not os.path.exists(client_json):
if cp.poll() is not None:
@@ -44,9 +69,10 @@ def add_engines(n=1, profile='iptest'):
base = len(rc)
eps = []
for i in range(n):
- ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
- # ep.start()
- processes.append(ep)
+ ep = TestProcessLauncher()
+ ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
+ ep.start()
+ launchers.append(ep)
eps.append(ep)
tic = time.time()
while len(rc) < base+n:
@@ -61,11 +87,11 @@ def add_engines(n=1, profile='iptest'):
def teardown():
time.sleep(1)
- while processes:
- p = processes.pop()
+ while launchers:
+ p = launchers.pop()
if p.poll() is None:
try:
- p.terminate()
+ p.stop()
except Exception, e:
print e
pass
@@ -73,8 +99,9 @@ def teardown():
time.sleep(.25)
if p.poll() is None:
try:
- print 'killing'
- p.kill()
+ print 'cleaning up test process...'
+ p.signal(SIGKILL)
except:
print "couldn't shutdown process: ", p
+ blackhole.close()
View
22 IPython/parallel/tests/clienttest.py
@@ -20,7 +20,8 @@
from IPython.parallel import error
from IPython.parallel import Client
-from IPython.parallel.tests import processes,add_engines
+
+from IPython.parallel.tests import launchers, add_engines
# simple tasks for use in apply tests
@@ -29,6 +30,17 @@ def segfault():
import ctypes
ctypes.memset(-1,0,1)
+def crash():
+ """from stdlib crashers in the test suite"""
+ import types
+ if sys.platform.startswith('win'):
+ import ctypes
+ ctypes.windll.kernel32.SetErrorMode(0x0002);
+
+ co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
+ (), (), (), '', '', 1, b'')
+ exec(co)
+
def wait(n):
"""sleep for a time"""
import time
@@ -86,21 +98,23 @@ def assertRaisesRemote(self, etype, f, *args, **kwargs):
except error.CompositeError as e:
e.raise_exception()
except error.RemoteError as e:
- self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__))
+ self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
else:
self.fail("should have raised a RemoteError")
def setUp(self):
BaseZMQTestCase.setUp(self)
self.client = self.connect_client()
+ # start every test with clean engine namespaces:
+ self.client.clear(block=True)
self.base_engine_count=len(self.client.ids)
self.engines=[]
def tearDown(self):
# self.client.clear(block=True)
# close fds:
- for e in filter(lambda e: e.poll() is not None, processes):
- processes.remove(e)
+ for e in filter(lambda e: e.poll() is not None, launchers):
+ launchers.remove(e)
# allow flushing of incoming messages to prevent crash on socket close
self.client.wait(timeout=2)
View
1  IPython/parallel/tests/test_asyncresult.py
@@ -38,7 +38,6 @@ def test_single_result(self):
def test_get_after_done(self):
ar = self.client[-1].apply_async(lambda : 42)
- self.assertFalse(ar.ready())
ar.wait()
self.assertTrue(ar.ready())
self.assertEquals(ar.get(), 42)
View
4 IPython/parallel/tests/test_client.py
@@ -85,8 +85,8 @@ def test_clear(self):
v.push(dict(a=5))
v.pull('a')
id0 = self.client.ids[-1]
- self.client.clear(targets=id0)
- self.client[:-1].pull('a')
+ self.client.clear(targets=id0, block=True)
+ a = self.client[:-1].get('a')
self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
self.client.clear(block=True)
for i in self.client.ids:
View
21 IPython/parallel/tests/test_view.py
@@ -25,33 +25,37 @@
from IPython.parallel.tests import add_engines
-from .clienttest import ClusterTestCase, segfault, wait, skip_without
+from .clienttest import ClusterTestCase, crash, wait, skip_without
def setup():
add_engines(3)
class TestView(ClusterTestCase):
- def test_segfault_task(self):
+ def test_crash_task(self):
"""test graceful handling of engine death (balanced)"""
# self.add_engines(1)
- ar = self.client[-1].apply_async(segfault)
+ ar = self.client[-1].apply_async(crash)
self.assertRaisesRemote(error.EngineError, ar.get)
eid = ar.engine_id
- while eid in self.client.ids:
+ tic = time.time()
+ while eid in self.client.ids and time.time()-tic < 5:
time.sleep(.01)
self.client.spin()
+ self.assertFalse(eid in self.client.ids, "Engine should have died")
- def test_segfault_mux(self):
+ def test_crash_mux(self):
"""test graceful handling of engine death (direct)"""
# self.add_engines(1)
eid = self.client.ids[-1]
- ar = self.client[eid].apply_async(segfault)
+ ar = self.client[eid].apply_async(crash)
self.assertRaisesRemote(error.EngineError, ar.get)
eid = ar.engine_id
- while eid in self.client.ids:
+ tic = time.time()
+ while eid in self.client.ids and time.time()-tic < 5:
time.sleep(.01)
self.client.spin()
+ self.assertFalse(eid in self.client.ids, "Engine should have died")
def test_push_pull(self):
"""test pushing and pulling"""
@@ -298,7 +302,8 @@ def test_importer(self):
def findall(pat, s):
# this globals() step isn't necessary in real code
# only to prevent a closure in the test
- return globals()['re'].findall(pat, s)
+ re = globals()['re']
+ return re.findall(pat, s)
self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
View
5 IPython/testing/iptest.py
@@ -106,7 +106,10 @@ def test_for(mod, min_version=None):
have['pymongo'] = test_for('pymongo')
have['wx'] = test_for('wx')
have['wx.aui'] = test_for('wx.aui')
-have['zmq'] = test_for('zmq', '2.1.4')
+if os.name == 'nt':
+ have['zmq'] = test_for('zmq', '2.1dev')
+else:
+ have['zmq'] = test_for('zmq', '2.1.4')
have['qt'] = test_for('IPython.external.qt')
#-----------------------------------------------------------------------------