Skip to content

Commit

Permalink
improve process cleanup on Windows
Browse files Browse the repository at this point in the history
* use system taskkill call to shutdown process trees on Windows
* use apps.launchers in tests, to make use of the fix

Also remove unnecessary and sometimes wrong assert in test_asyncresult

As of this commit, all parallel tests pass on Windows
  • Loading branch information
minrk committed Apr 20, 2011
1 parent c005f92 commit 1a93dba
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 39 deletions.
38 changes: 20 additions & 18 deletions IPython/parallel/apps/launcher.py
Expand Up @@ -21,13 +21,21 @@
import re
import stat

# signal imports, handling various platforms, versions

from signal import SIGINT, SIGTERM
try:
from signal import SIGKILL
except ImportError:
# windows
# Windows
SIGKILL=SIGTERM

try:
# Windows >= 2.7, 3.2
from signal import CTRL_C_EVENT as SIGINT
except ImportError:
pass

from subprocess import Popen, PIPE, STDOUT
try:
from subprocess import check_output
Expand All @@ -51,24 +59,10 @@ def check_output(*args, **kwargs):

from .win32support import forward_read_events

# load winhpcjob only on Windows
try:
from .winhpcjob import (
IPControllerTask, IPEngineTask,
IPControllerJob, IPEngineSetJob
)
except ImportError:
pass
from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob

WINDOWS = os.name == 'nt'

if WINDOWS:
try:
# >= 2.7, 3.2
from signal import CTRL_C_EVENT as SIGINT
except ImportError:
pass

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -282,11 +276,19 @@ def stop(self):

def signal(self, sig):
if self.state == 'running':
self.process.send_signal(sig)
if WINDOWS and sig != SIGINT:
# use Windows tree-kill for better child cleanup
check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
else:
self.process.send_signal(sig)

def interrupt_then_kill(self, delay=2.0):
"""Send INT, wait a delay and then send KILL."""
self.signal(SIGINT)
try:
self.signal(SIGINT)
except Exception:
self.log.debug("interrupt failed")
pass
self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
self.killer.start()

Expand Down
2 changes: 0 additions & 2 deletions IPython/parallel/apps/winhpcjob.py
Expand Up @@ -16,8 +16,6 @@
# Imports
#-----------------------------------------------------------------------------

from __future__ import with_statement

import os
import re
import uuid
Expand Down
57 changes: 42 additions & 15 deletions IPython/parallel/tests/__init__.py
Expand Up @@ -14,21 +14,46 @@
import os
import tempfile
import time
from subprocess import Popen, PIPE, STDOUT
from subprocess import Popen

from IPython.utils.path import get_ipython_dir
from IPython.parallel import Client
from IPython.parallel.apps.launcher import (LocalProcessLauncher,
ipengine_cmd_argv,
ipcontroller_cmd_argv,
SIGKILL)

processes = []
blackhole = tempfile.TemporaryFile()
# globals
launchers = []
blackhole = open(os.devnull, 'w')

# Launcher class
class TestProcessLauncher(LocalProcessLauncher):
"""subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
def start(self):
if self.state == 'before':
self.process = Popen(self.args,
stdout=blackhole, stderr=blackhole,
env=os.environ,
cwd=self.work_dir
)
self.notify_start(self.process.pid)
self.poll = self.process.poll
else:
s = 'The process was already started and has state: %r' % self.state
raise ProcessStateError(s)

# nose setup/teardown

def setup():
cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
processes.append(cp)
engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
cp = TestProcessLauncher()
cp.cmd_and_args = ipcontroller_cmd_argv + \
['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
cp.start()
launchers.append(cp)
cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
tic = time.time()
while not os.path.exists(engine_json) or not os.path.exists(client_json):
if cp.poll() is not None:
Expand All @@ -44,9 +69,10 @@ def add_engines(n=1, profile='iptest'):
base = len(rc)
eps = []
for i in range(n):
ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
# ep.start()
processes.append(ep)
ep = TestProcessLauncher()
ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
ep.start()
launchers.append(ep)
eps.append(ep)
tic = time.time()
while len(rc) < base+n:
Expand All @@ -61,20 +87,21 @@ def add_engines(n=1, profile='iptest'):

def teardown():
time.sleep(1)
while processes:
p = processes.pop()
while launchers:
p = launchers.pop()
if p.poll() is None:
try:
p.terminate()
p.stop()
except Exception, e:
print e
pass
if p.poll() is None:
time.sleep(.25)
if p.poll() is None:
try:
print 'killing'
p.kill()
print 'cleaning up test process...'
p.signal(SIGKILL)
except:
print "couldn't shutdown process: ", p
blackhole.close()

7 changes: 4 additions & 3 deletions IPython/parallel/tests/clienttest.py
Expand Up @@ -20,7 +20,8 @@

from IPython.parallel import error
from IPython.parallel import Client
from IPython.parallel.tests import processes,add_engines

from IPython.parallel.tests import launchers, add_engines

# simple tasks for use in apply tests

Expand Down Expand Up @@ -112,8 +113,8 @@ def setUp(self):
def tearDown(self):
# self.client.clear(block=True)
# close fds:
for e in filter(lambda e: e.poll() is not None, processes):
processes.remove(e)
for e in filter(lambda e: e.poll() is not None, launchers):
launchers.remove(e)

# allow flushing of incoming messages to prevent crash on socket close
self.client.wait(timeout=2)
Expand Down
1 change: 0 additions & 1 deletion IPython/parallel/tests/test_asyncresult.py
Expand Up @@ -38,7 +38,6 @@ def test_single_result(self):

def test_get_after_done(self):
ar = self.client[-1].apply_async(lambda : 42)
self.assertFalse(ar.ready())
ar.wait()
self.assertTrue(ar.ready())
self.assertEquals(ar.get(), 42)
Expand Down

0 comments on commit 1a93dba

Please sign in to comment.