Fix sleeping processes #295
Conversation
@jonashen did you test this on your machine? |
Oops, good point. Sorry. |
@@ -27,7 +27,7 @@ dependencies: | |||
- ipdb | |||
- ipywidgets | |||
- jsonmerge | |||
- joblib==0.10.3 | |||
- joblib<0.13,>=0.12 |
(garage) garage (fix_sleeping_proc *) $ pip install joblib<0.13,>=0.12
bash: 0.13,: No such file or directory
try pip install "joblib<0.13,>=0.12"
it works
Is there a reason we're specifying a version here? The installed version (0.12.2) is the same one I get if I simply call pip install joblib.
We are being more careful with the major changes introduced in joblib, so we are restricting the updates of this library to only minor changes.
The semantic versioning scheme used by many open source libraries:
XX.YY.ZZ
XX changes -- no backwards compatibility. literally anything can happen. whole packages can disappear.
YY changes -- backwards compatible for the same value of XX. features may be added but not removed. most increments to YY will be small changes but they may sometimes be backwards-incompatible, especially for libraries <1.0.
ZZ changes -- bug fixes/maintenance within a release only. generally no new features.
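For illustration, the range in the diff above (`joblib<0.13,>=0.12`) can be read as a half-open interval over version tuples. This is a minimal sketch that assumes purely numeric dotted versions; `parse_version` and `in_range` are hypothetical helpers, not pip's actual specifier logic (which follows PEP 440).

```python
def parse_version(text):
    """Turn '0.12.2' into (0, 12, 2). Assumes numeric components only."""
    return tuple(int(part) for part in text.split("."))


def in_range(version, lower="0.12", upper="0.13"):
    """True when lower <= version < upper, compared component by component."""
    return parse_version(lower) <= parse_version(version) < parse_version(upper)


if __name__ == "__main__":
    for candidate in ("0.10.3", "0.12.2", "0.13.0"):
        print(candidate, in_range(candidate))
```

Under this reading, any 0.12.ZZ bug-fix release is accepted, while the old 0.10.3 pin and a future 0.13 release are not.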
@@ -84,6 +84,9 @@ def shutdown(self): | |||
if not Plotter.enable: | |||
return | |||
if self._process and self._process.is_alive(): | |||
while not self._queue.empty(): | |||
self._queue.get() | |||
self._queue.task_done() |
Traceback (most recent call last):
File "/Users/jonathon/Documents/garage/garage/garage/plotter/plotter.py", line 89, in shutdown
self._queue.task_done()
AttributeError: 'Queue' object has no attribute 'task_done'
Use JoinableQueue instead of Queue. I think Python 3.6 cleaned up multiprocessing a little bit.
Done
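A minimal sketch of the drain step with `multiprocessing.JoinableQueue`, which (unlike `multiprocessing.Queue`) provides `task_done()`. The helper name `drain` and the timeout value are assumptions for illustration, not the code in this PR. It uses `get(timeout=...)` rather than `empty()` because the multiprocessing documentation describes `empty()` as not reliable.

```python
import multiprocessing as mp
import queue


def drain(q, timeout=0.5):
    """Remove every pending item from a JoinableQueue and mark each one done.

    Returns the number of items removed.
    """
    drained = 0
    while True:
        try:
            q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived within the timeout: treat the queue as drained.
            return drained
        q.task_done()  # one task_done() per item actually retrieved
        drained += 1
```

After `drain(q)` returns, `q.join()` should no longer block on the items that were pending, because each retrieved item has been matched with a `task_done()` call.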
@@ -2,9 +2,9 @@ | |||
import ast |
(garage) garage (fix_sleeping_proc *) $ python scripts/run_experiment.py
2018-08-17 09:45:25.883375 PDT | tensorboard data will be logged into:/Users/jonathon/Documents/garage/garage/data/experiment_2018_08_17_09_45_25_880884_PDT_95e7c
Traceback (most recent call last):
File "scripts/run_experiment.py", line 201, in <module>
run_experiment(sys.argv)
File "scripts/run_experiment.py", line 187, in run_experiment
data = pickle.loads(base64.b64decode(args.args_data))
File "/anaconda2/envs/garage/lib/python3.6/base64.py", line 80, in b64decode
s = _bytes_from_decode_data(s)
File "/anaconda2/envs/garage/lib/python3.6/base64.py", line 46, in _bytes_from_decode_data
"string, not %r" % s.__class__.__name__) from None
TypeError: argument should be a bytes-like object or ASCII string, not 'NoneType'
You forgot an argument. run_experiment.py is usually called by garage.misc.instrument.run_experiment
garage/tf/plotter/plotter.py
Outdated
@@ -98,6 +98,9 @@ def _start_worker(self): | |||
|
|||
def shutdown(self): | |||
if self.worker_thread.is_alive(): | |||
while not self.queue.empty(): | |||
self.queue.get() | |||
self.queue.task_done() |
Traceback (most recent call last):
File "examples/tf/trpo_cartpole.py", line 25, in <module>
algo.train()
File "/Users/jonathon/Documents/garage/garage/garage/tf/algos/batch_polopt.py", line 146, in train
self.shutdown_worker()
File "/Users/jonathon/Documents/garage/garage/garage/tf/algos/batch_polopt.py", line 101, in shutdown_worker
self.plotter.shutdown()
File "/Users/jonathon/Documents/garage/garage/garage/tf/plotter/plotter.py", line 103, in shutdown
self.queue.task_done()
File "/anaconda2/envs/garage/lib/python3.6/queue.py", line 68, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/anaconda2/envs/garage/lib/python3.6/queue.py", line 68, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
I fixed this by moving the previously inserted task_done calls to the right places: right after each task is completed in the worker thread.
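The `ValueError` in the traceback above is what `queue.Queue` raises when `task_done()` is called more often than `get()`. A minimal sketch of the pairing described in the fix, assuming a worker thread and a `None` sentinel (both hypothetical here, not the PR's plotter code):

```python
import queue
import threading

STOP = None  # hypothetical sentinel telling the worker to exit


def worker(q, results):
    """Consume messages until STOP, calling task_done() once per get()."""
    while True:
        msg = q.get()
        try:
            if msg is STOP:
                return
            results.append(msg * 2)  # stand-in for the real plotting work
        finally:
            q.task_done()  # runs for every retrieved item, including STOP


if __name__ == "__main__":
    q = queue.Queue()
    results = []
    thread = threading.Thread(target=worker, args=(q, results))
    thread.start()
    for item in (1, 2, 3, STOP):
        q.put(item)
    q.join()  # returns once every put() has a matching task_done()
    thread.join()
    print(results)
```

Keeping `task_done()` in a `finally` next to the `get()` makes it hard to call it too many or too few times.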
I wasn't sure how to test the sampler.
Maybe a good exercise is to think about how to test this bug, since it's so severe and recurring. You can use subprocess in Python to spawn a new process and send it signals (e.g. SIGINT). I think the standard library should also have the tools to crawl the spawned process tree and make sure everything terminates. You can test the sampler by
Seeing as this is also a script to reliably reproduce this bug on all platforms, I propose that we write it and then just wrap it in a unit test so this doesn't happen again. |
There is a nice tutorial here about using process groups to jail fork trees created by subprocess. It might be of use.
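The testing idea above could be sketched roughly as follows, using only the standard library on POSIX. The child is started in its own session so that it leads a new process group, the parent waits for a readiness marker on stdout, sends SIGINT to the whole group, and then probes whether any process is left in the group. The names `READY_MARKER`, `interrupt_when_ready`, and `group_is_empty` are assumptions for illustration; this is not the test that was eventually added in this PR.

```python
import os
import signal
import subprocess
import sys

READY_MARKER = "ready"  # hypothetical marker the child prints once it is set up


def interrupt_when_ready(cmd, timeout=10.0):
    """Start cmd in its own session, wait for the marker, then SIGINT its group."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        universal_newlines=True,
        start_new_session=True,  # child becomes leader of a new process group
    )
    for line in proc.stdout:
        if READY_MARKER in line:
            break
    os.killpg(proc.pid, signal.SIGINT)  # for a session leader, pgid == pid
    returncode = proc.wait(timeout=timeout)
    proc.stdout.close()
    return proc.pid, returncode


def group_is_empty(pgid):
    """True when no process remains in the group (signal 0 only probes)."""
    try:
        os.killpg(pgid, 0)
    except ProcessLookupError:
        return True
    return False
```

A test would call `interrupt_when_ready(...)` on the experiment script and then assert `group_is_empty(pgid)`. Any forked descendant that survives the interrupt and stays in the group makes that assertion fail.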
|
f0c384c
to
597ac9d
Compare
Please create a test fixture that can reproduce (and then prove fixed) this bug before we submit. |
scripts/run_experiment.py
Outdated
if args.seed is not None: | ||
set_seed(args.seed) | ||
|
||
sigint_hdlr = signal.getsignal(signal.SIGINT) | ||
|
||
def terminte_sampler(signum, frame): |
terminte --> terminate
scripts/run_experiment.py
Outdated
|
||
def terminte_sampler(signum, frame): | ||
parallel_sampler.terminate() | ||
parallel_sampler.join() |
What if parallel_sampler hangs after terminate? Can we join() with a timeout and take more drastic termination steps if it hangs?
Ok SGTM. We can test this and see how robust it is before going to greater lengths.
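For reference, the "join with a timeout, then escalate" idea raised above might look like the sketch below. It is shown with a plain `multiprocessing.Process` rather than the PR's parallel sampler, and `stop_process` is a hypothetical helper. Note that `Process.kill()` only exists from Python 3.7, so on the Python 3.6 environment seen in the tracebacks a different escalation step (for example `os.kill` with `SIGKILL`) would be needed.

```python
import multiprocessing as mp
import time


def stop_process(proc, timeout=2.0):
    """Ask proc to stop; escalate to SIGKILL if it is still alive after timeout."""
    proc.terminate()    # sends SIGTERM on POSIX
    proc.join(timeout)  # returns after timeout even if proc is still running
    if proc.is_alive():
        proc.kill()     # SIGKILL cannot be caught or ignored
        proc.join()
    return proc.exitcode
```

It must be called from the process that started `proc`, since `join()` asserts that only the parent joins its children.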
Actually, calling join here is wrong, since the signal handler is called by the Python main thread and join has an assert call to make sure it is the parent process that is trying to join.
Also, I found some handling that I did for this in a previous fix here. I will add the join call there instead of overriding the signal handler.
scripts/run_experiment.py
Outdated
|
||
signal.signal(signal.SIGINT, terminte_sampler) | ||
|
||
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT]) |
can you make this a context manager?
# SIGINT unblocked here
with mask_signal(signal.SIGINT):
# SIGINT blocked in here
# do uninterruptible stuff
# SIGINT unblocked again out here
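One way the requested context manager could be written, as a sketch on top of `signal.pthread_sigmask` (POSIX only). The name `mask_signal` comes from the comment above; the body is an assumption, not the implementation that landed in this PR.

```python
import contextlib
import signal


@contextlib.contextmanager
def mask_signal(*signums):
    """Block the given signals inside the with-block, then restore the old mask."""
    # pthread_sigmask returns the mask that was in effect before the call.
    old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, signums)
    try:
        yield
    finally:
        # Restore the previous mask exactly, even if the body raised.
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
```

Restoring with `SIG_SETMASK` instead of `SIG_UNBLOCK` keeps a signal blocked if the caller had already blocked it before entering the block. A signal that arrives while blocked stays pending and is delivered once the mask is restored.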
scripts/run_experiment.py
Outdated
if args.seed is not None: | ||
set_seed(args.seed) | ||
|
||
sigint_hdlr = signal.getsignal(signal.SIGINT) |
Please include a large block comment explaining why this is necessary and how it works.
It seems to me that it might be more appropriate to implement this inside stateful_pool if possible.
Can you verify that this bug https://bugs.python.org/issue8296 doesn't affect Python 3.5+? Otherwise we may need to change stateful_pool to only use _async mappers. |
Regarding the bug mentioned above, I ran an experiment with Python 3.6.6 based on this example:
Without interrupting the execution, the outcome is:
Interrupting the execution:
Even though the child processes catch the KeyboardInterrupt, they become joinable and the parent process exits without leaving any zombie processes hanging around.
Alright. To merge, this still needs a test which reproduces the bug (before the change) and verifies it's gone after.
I've been thinking of a way to test this change. My algorithm would be the following:
To make sure we're trying this test in different execution points, we can run the above sequence for a certain number of iterations, but please let me know what could be improved. One of the things I'm not so sure about is how to detect that the child processes under the parallel sampler are running. How I'm currently doing this in my sandbox is reading stdout and catching the string "Populated", which is written once all the child processes finished their initialization. Please let me know if you have a better idea to do this. |
Some thoughts on this plan:
|
597ac9d
to
6717887
Compare
I added the test, but now I'm wondering about three things:
|
6717887
to
b17dde5
Compare
It seems my test is not behaving the same way as in my working area. As shown at the bottom of these logs, the test hangs.
2da09c0
to
f93c359
Compare
c97ef82
to
ad23f72
Compare
Does |
Okay, I have added the corresponding calls to shutdown the parallel sampler and plotters in run_experiment. However, when run_experiment runs under test_sigint_theano, a bug happens in some tests, where some sleeping processes remain and the corresponding user warning is produced. The output comes with a traceback that points to the fork of the process that is remaining:
It seems the problem is related to the use of psutil.wait_procs in both run_experiment and test_sigint_theano at the same time, but I need to look further into this. |
garage/plotter/plotter.py
Outdated
|
||
class Plotter: | ||
|
||
# Static variable used to disable the plotter | ||
enable = True | ||
|
||
def __init__(self): | ||
def __init__(self, standalone=False): | ||
__plotters__.append(self) |
Don't use a dunder (__foo__). Those are reserved for Python internals. __plotters should be fine.
garage/plotter/plotter.py
Outdated
@@ -21,13 +21,16 @@ class Op(Enum): | |||
|
|||
Message = namedtuple("Message", ["op", "args", "kwargs"]) | |||
|
|||
__plotters__ = [] |
This can be a class member of Plotter rather than package-global.
See https://stackoverflow.com/a/12102666 for a useful pattern.
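A minimal sketch of keeping the registry on the class instead of at module level. The attribute name `_plotters` and the accessor `get_plotters` are assumptions for illustration; this is not necessarily the pattern from the linked answer. A single leading underscore is used because a double-underscore name inside a class body would be name-mangled.

```python
class Plotter:
    # Class-level registry shared by all instances (hypothetical name).
    _plotters = []

    def __init__(self, standalone=False):
        self.standalone = standalone
        Plotter._plotters.append(self)

    @classmethod
    def get_plotters(cls):
        """Return a copy so callers cannot mutate the registry by accident."""
        return list(cls._plotters)
```

A caller such as run_experiment could then iterate over `Plotter.get_plotters()` without importing a package-global list.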
scripts/run_experiment.py
Outdated
@@ -186,5 +202,34 @@ def run_experiment(argv): | |||
logger.pop_prefix() | |||
|
|||
|
|||
def child_proc_shutdown(shutdown_sampler=False): |
If you change the Plotter API to use close() (or the sampler API to use shutdown()) then this could be called as
def child_proc_shutdown(children):
run_exp_proc = psutil.Process()
alive = run_exp_proc.children(recursive=True)
for c in children:
c.shutdown()
# etc...
# example at the callsite
child_proc_shutdown(__plotters__ + [parallel_sampler])
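The point of the suggestion above is that one loop can shut down plotters and the sampler alike once they share a method name. A standard-library sketch of that loop, with `shutdown_all` as a hypothetical helper that keeps going when one child fails so the remaining children are still shut down:

```python
def shutdown_all(children):
    """Call shutdown() on every child; collect failures instead of stopping early."""
    failures = []
    for child in children:
        try:
            child.shutdown()
        except Exception as exc:  # keep going so one failure does not leak the rest
            failures.append((child, exc))
    return failures
```

Any object with a `shutdown()` method works here, which is the duck-typing the uniform API is meant to enable.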
scripts/travisci/check_flake8.sh
Outdated
@@ -2,7 +2,7 @@ | |||
|
|||
# Python packages considered local to the project, for the purposes of import | |||
# order checking. Comma-delimited. | |||
garage_packages="garage,sandbox,examples,tests" | |||
garage_packages="tests,garage,sandbox,examples,contrib" |
contrib was removed for a reason: it no longer exists
setup.cfg
Outdated
@@ -4,7 +4,7 @@ | |||
# style, this rule is ignored. | |||
ignore = W503 | |||
import-order-style = google | |||
application-import-names = sandbox,garage,examples,tests | |||
application-import-names = tests,sandbox,garage,examples,contrib |
Remove contrib
tests/fixtures/theano/__init__.py
Outdated
@@ -0,0 +1,4 @@ | |||
from tests.fixtures.theano.batch_polopt_instrumented import \ |
PEP8 says to use () to split lines instead of \
tests/fixtures/theano/__init__.py
Outdated
@@ -0,0 +1,4 @@ | |||
from tests.fixtures.theano.batch_polopt_instrumented import \ | |||
InstrumentedBatchPolopt | |||
from tests.fixtures.theano.npo_instrumented import InstrumentedNPO |
Package names should be instrumented_batchpolopt, instrumented_npo and instrumented_trpo to match class names.
tests/sampler/test_sigint_theano.py
Outdated
@@ -0,0 +1,81 @@ | |||
from enum import Enum |
Please put this file in tests/integration_tests
tests/sampler/test_sigint_theano.py
Outdated
@@ -0,0 +1,81 @@ | |||
from enum import Enum | |||
from multiprocessing.connection import Listener | |||
import os |
Can you add an equivalent for TensorFlow?
tests/sampler/test_sigint_theano.py
Outdated
class TestSigInt(unittest.TestCase): | ||
def test_sigint(self): | ||
"""Interrupt the experiment in different stages of its lifecyle.""" | ||
for stage in list(ExpLifecycle): |
use a parameterized test instead of for loop https://nose2.readthedocs.io/en/latest/params.html
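The link above points to nose2's parameterized tests. As a stand-in that needs only the standard library, `unittest`'s `subTest` gives a similar benefit: each stage is reported separately, and a failure in one stage does not hide the others. The enum members and `interrupt_at` below are hypothetical placeholders, since the real ones are not shown in this thread.

```python
import unittest
from enum import Enum


class ExpLifecycle(Enum):
    """Hypothetical stages of the experiment life cycle."""
    START = 1
    OPTIMIZE = 2
    SHUTDOWN = 3


def interrupt_at(stage):
    """Placeholder for: run the experiment, SIGINT it at `stage`, report clean exit."""
    return True


class TestSigInt(unittest.TestCase):
    def test_sigint(self):
        """Interrupt the experiment at each stage of its life cycle."""
        for stage in ExpLifecycle:
            with self.subTest(stage=stage):
                self.assertTrue(interrupt_at(stage))
```

This still contains a loop, so it is not a drop-in replacement for the decorator-based style the reviewer asks for. It only illustrates per-stage reporting.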
tests/sampler/test_sigint_theano.py
Outdated
os.kill(child.pid, signal.SIGINT) | ||
|
||
if not clean_exit: | ||
raise AssertionError(colorize(error_msg, "red")) |
Don't raise exceptions in tests. Just use assert.
Codecov Report
@@ Coverage Diff @@
## master #295 +/- ##
==========================================
- Coverage 61.41% 61.41% -0.01%
==========================================
Files 213 213
Lines 14312 14328 +16
==========================================
+ Hits 8790 8799 +9
- Misses 5522 5529 +7
|
The joblib package, which provides the MemmappingPool, has been updated to pick up fixes for any bugs that could produce the sleeping processes in the parallel sampler. The environment variable JOBLIB_START_METHOD has also been removed since joblib no longer implements it. However, if run_experiment is interrupted during the optimization steps, the sleeping processes are still produced. To fix the problem, the child processes of the parallel sampler ignore SIGINT so they're not killed while holding a lock that is also acquired by the parent process, which avoids a deadlock. To make sure the child processes are terminated, the SIGINT handler in the parent process is overridden to call the terminate and join functions of the process pool. The process (thread in TF) used in Plotter is terminated by registering the shutdown method with atexit, but one important missing step was to clean the Queue that interacts with the worker process.
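The "children ignore SIGINT" part of this description can be sketched with a pool initializer. This example uses `multiprocessing.Pool` as a stand-in, since the PR's own pool setup is not shown in this thread; `ignore_sigint` and `make_pool` are hypothetical names.

```python
import multiprocessing as mp
import signal


def ignore_sigint():
    """Run once in each worker: Ctrl-C is then handled by the parent only."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def make_pool(n_workers, context=None):
    """Create a pool whose workers ignore SIGINT (context defaults to the platform's)."""
    ctx = context or mp.get_context()
    return ctx.Pool(n_workers, initializer=ignore_sigint)
```

With the workers ignoring SIGINT, an interrupt cannot kill a worker while it holds a shared lock. The parent remains responsible for shutting the pool down when it receives the signal.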
The class BatchPolopt has been overridden as BatchPoloptCallback to notify the test of the different stages in the experiment life cycle so it can be interrupted with SIGINT. The test makes sure that no child processes remain after the SIGINT is sent; otherwise it raises an assertion error listing the processes that didn't die. Also, the context manager MasksSignals has been created to handle the masking of SIGINT in parallel_sampler.
Also, some Codacy issues were resolved, as well as some legacy pylint and flake8 issues.
All plotters are appended to a static list so they are easily reachable from run_experiment to call shutdown on them. Also, terminate was replaced by close to shut down the parallel sampler, since terminate calls join and may block the shutdown in run_experiment. To check that all processes have died after a timeout, a loop that polls for alive processes was implemented. A warning message for processes that remain after shutdown is printed so users of garage can reopen the corresponding issue.
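The polling loop described above might look roughly like the sketch below. Earlier comments in this thread mention psutil.wait_procs for this purpose; this version uses only the standard library and works with any objects that expose `is_alive()`. The name `wait_until_dead` and its defaults are assumptions for illustration.

```python
import time
import warnings


def wait_until_dead(procs, timeout=5.0, interval=0.1):
    """Poll until every process has exited or the timeout elapses.

    Returns the processes that are still alive, warning if there are any.
    """
    deadline = time.monotonic() + timeout
    alive = [p for p in procs if p.is_alive()]
    while alive and time.monotonic() < deadline:
        time.sleep(interval)
        alive = [p for p in alive if p.is_alive()]
    if alive:
        warnings.warn(
            "{} process(es) still alive after shutdown".format(len(alive)),
            UserWarning,
        )
    return alive
```

Returning the survivors lets the caller decide whether to escalate or only report them.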
Other changes include the file renaming of instrumented policy optimizers, as well as renaming the shutdown method in plotters to close in order to close all processes under run_experiment with the same method.
Otherwise, comparing two enumerations assigned to variables does not work.
d3adb1a
to
c3e41b9
Compare
Codecov Report
@@ Coverage Diff @@
## master #295 +/- ##
==========================================
- Coverage 61.32% 61.31% -0.01%
==========================================
Files 213 213
Lines 14316 14332 +16
==========================================
+ Hits 8779 8788 +9
- Misses 5537 5544 +7
|
A process known as the semaphore tracker is spawned from the run_experiment process, but we cannot stop it since it ignores SIGINT and SIGTERM and we don't have access to it. Therefore, it's removed from the list of children to wait for in both run_experiment and test_sigint. Another process spawned from run_experiment is the Manager, which owns multiprocessing objects such as RLocks and counters used during the run_collect method in the StatefulPool class. This process was also making the warning message appear and the test fail. However, the manager has a shutdown method to terminate the process, so we can verify that it terminates.
NICE |