bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error #3895

Merged
merged 15 commits into python:master on Jan 5, 2018

5 participants
@tomMoral
Contributor

tomMoral commented Oct 5, 2017

When using concurrent.futures.ProcessPoolExecutor with objects that fail to pickle or unpickle, several situations result in a deadlock, with the interpreter frozen.

This is the case in several scenarios, for instance these three scripts. This PR proposes to test the different failure scenarios and fix the ones that result in deadlocks.

Overall, the goal is to make concurrent.futures.ProcessPoolExecutor more robust to faulty user code.

This work was done as part of tomMoral/loky#48, in collaboration with @ogrisel, with the intent of re-using the executor in multiple independent parts of the program. See #1013 for more details.

https://bugs.python.org/issue31699

@tomMoral tomMoral requested a review from rhettinger as a code owner Oct 5, 2017

@tomMoral tomMoral changed the title Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error Oct 5, 2017

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from 5865996 to 69d774f Oct 5, 2017

@pitrou

Member

pitrou commented Oct 5, 2017

At a high level, I'm a bit surprised by the strategy adopted by this PR. Isn't it possible to keep a functioning executor in case an error occurs when (un)pickling? What does multiprocessing.Pool do (I don't remember)?

The crashing process problem is separate IMHO (and arguably less likely), though it does deserve to be handled.

@tomMoral

Contributor

tomMoral commented Oct 5, 2017

For unpickling errors, there is currently no way to get the work_id of the failure once the error has occurred, which is why we resorted to flagging the executor as broken. Another strategy that could solve this would be to change the _ResultItem/_CallItem objects to have a safe unpickle, guaranteed to give back at least the work_id, so that in case of failure we can simply attach the exception to that job. But I am not sure how to do it.
Note that multiprocessing.Pool is also broken when a result fails to unpickle (see here).
If you think this is necessary for this PR, I can look into it.

For pickling errors, this is much simpler, and I should add something for result pickling errors to make sure we do not break the executor in this case. The pickling error for the call item is already handled that way.
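The "safe unpickle" idea above could look roughly like the following sketch. The class names and method are hypothetical (they are not the PR's actual implementation): the work_id is stored as a plain attribute while the result payload is pickled separately, so the work_id survives even when the payload fails to unpickle.

```python
import pickle

def _raise():
    raise RuntimeError("boom at unpickle")

class _RaiseAtUnpickle:
    """Payload whose unpickling always fails."""
    def __reduce__(self):
        return (_raise, ())

class _SafeResultItem:
    """Hypothetical sketch: work_id and payload are pickled separately,
    so the work_id is recoverable even if the payload is not."""
    def __init__(self, work_id, result):
        self.work_id = work_id
        self.payload = pickle.dumps(result)  # pickled eagerly, worker side

    def load_result(self):
        try:
            return self.work_id, pickle.loads(self.payload), None
        except BaseException as exc:
            # Unpickling failed, but we still know which job it was for,
            # so the exception can be attached to that future alone.
            return self.work_id, None, exc

item = pickle.loads(pickle.dumps(_SafeResultItem(42, _RaiseAtUnpickle())))
work_id, result, exc = item.load_result()
print(work_id, type(exc).__name__)  # -> 42 RuntimeError
```

With this shape, the queue management thread could fail only the affected future instead of flagging the whole executor as broken; the concern raised below about being agnostic to the Pickler in use still applies.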

@tomMoral

Contributor

tomMoral commented Oct 5, 2017

After thinking about it a bit, my main concern with safe pickling/unpickling of _ResultItem/_CallItem is that we are agnostic to the Pickler objects used in the call_queue and results_queue. This should be the _ForkingPickler, but it can be changed depending on the context, as we do in the loky project. So I am not sure it is the right solution.

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch 2 times, most recently from 8427d36 to c70c4d6 Oct 23, 2017

@ogrisel

Contributor

ogrisel commented Oct 31, 2017

@tomMoral can you please summarize the changes you made in the last commit? The tests deadlock now. I think this is no longer robust to the ExitAtUnpickle case (which I think is fine, but it should then be removed from the test suite).

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from c70c4d6 to 3c74cb2 Nov 2, 2017

@tomMoral

Contributor

tomMoral commented Nov 2, 2017

@ogrisel In the last commit, I just removed the crash detection thread and the cases in the test suite that are no longer handled. It works locally, but it seems to fail deterministically on Travis with clang. I cannot reproduce it, even with a Docker image from Travis.

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch 2 times, most recently from e8595c5 to 7ad6721 Nov 3, 2017

@ogrisel

Contributor

ogrisel commented Nov 3, 2017

I managed to reproduce the deadlock (without -j4 and with --timeout=300):

test_crash (test.test_concurrent_futures.ProcessPoolForkserverExecutorDeadlockTest) ... error at task pickle
exit at task unpickle
error at task unpickle
crash at task unpickle
crash during func execution on worker
exit during func execution on worker
error during func execution on worker
crash during result pickle on worker
exit during result pickle on worker
error during result pickle on worker
error during result unpickle in result_handler

Timeout (0:05:00)!
Thread 0x00007fa7a0a88700 (most recent call first):
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 296 in wait
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/queues.py", line 224 in _feed
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 865 in run
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 917 in _bootstrap_inner
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 885 in _bootstrap

Thread 0x00007fa7a24a1700 (most recent call first):
  File "/home/parietal/ogrisel/cpython/Lib/selectors.py", line 415 in select
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/connection.py", line 920 in wait
  File "/home/parietal/ogrisel/cpython/Lib/concurrent/futures/process.py", line 308 in _queue_management_worker
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 865 in run
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 917 in _bootstrap_inner
  File "/home/parietal/ogrisel/cpython/Lib/threading.py", line 885 in _bootstrap

Thread 0x00007fa7a748b700 (most recent call first):
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/synchronize.py", line 96 in __enter__
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/queues.py", line 357 in put
  File "/home/parietal/ogrisel/cpython/Lib/concurrent/futures/process.py", line 567 in shutdown
  File "/home/parietal/ogrisel/cpython/Lib/test/test_concurrent_futures.py", line 873 in test_crash
  File "/home/parietal/ogrisel/cpython/Lib/unittest/case.py", line 615 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/case.py", line 663 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/runner.py", line 176 in run
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 1892 in _run_suite
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 1936 in run_unittest
  File "/home/parietal/ogrisel/cpython/Lib/test/test_concurrent_futures.py", line 1185 in test_main
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 2068 in decorator
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/runtest.py", line 176 in runtest_inner
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/runtest.py", line 140 in runtest
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 379 in run_tests_sequential
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 458 in run_tests
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 536 in _main
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 510 in main
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 585 in main
  File "/home/parietal/ogrisel/cpython/Lib/test/__main__.py", line 2 in <module>
  File "/home/parietal/ogrisel/cpython/Lib/runpy.py", line 85 in _run_code
  File "/home/parietal/ogrisel/cpython/Lib/runpy.py", line 193 in _run_module_as_main
@ogrisel

Contributor

ogrisel commented Nov 3, 2017

Here is another run; this time the GC of the executor triggers the deadlock (via the weakref callback):

test_crash (test.test_concurrent_futures.ProcessPoolForkserverExecutorDeadlockTest) ... error at task pickle
exit at task unpickle
error at task unpickle
crash at task unpickle
crash during func execution on worker
exit during func execution on worker
error during func execution on worker
crash during result pickle on worker
exit during result pickle on worker
error during result pickle on worker
error during result unpickle in result_handler
Timeout (0:01:00)!
Thread 0x00007f8bf9956700 (most recent call first):
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/synchronize.py", line 96 in __enter__
  File "/home/parietal/ogrisel/cpython/Lib/multiprocessing/queues.py", line 357 in put
  File "/home/parietal/ogrisel/cpython/Lib/concurrent/futures/process.py", line 486 in weakref_cb
  File "/home/parietal/ogrisel/cpython/Lib/unittest/case.py", line 643 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/case.py", line 663 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 122 in run
  File "/home/parietal/ogrisel/cpython/Lib/unittest/suite.py", line 84 in __call__
  File "/home/parietal/ogrisel/cpython/Lib/unittest/runner.py", line 176 in run
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 1892 in _run_suite
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 1936 in run_unittest
  File "/home/parietal/ogrisel/cpython/Lib/test/test_concurrent_futures.py", line 1186 in test_main
  File "/home/parietal/ogrisel/cpython/Lib/test/support/__init__.py", line 2068 in decorator
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/runtest.py", line 176 in runtest_inner
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/runtest.py", line 140 in runtest
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 379 in run_tests_sequential
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 458 in run_tests
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 536 in _main
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 510 in main
  File "/home/parietal/ogrisel/cpython/Lib/test/libregrtest/main.py", line 585 in main
  File "/home/parietal/ogrisel/cpython/Lib/test/__main__.py", line 2 in <module>
  File "/home/parietal/ogrisel/cpython/Lib/runpy.py", line 85 in _run_code
  File "/home/parietal/ogrisel/cpython/Lib/runpy.py", line 193 in _run_module_as_main

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from 7ad6721 to 7d886ad Nov 3, 2017

@ogrisel

Contributor

ogrisel commented Nov 3, 2017

Note: I also had to use taskset -c 0 python -m test ... to trigger the race condition. The problem is the result queue's lock, which has not yet been released when the worker process is terminated.
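The failure mode described here can be reproduced in isolation. This is a minimal sketch (not code from the PR): a child process acquires a multiprocessing lock and is then killed before releasing it, leaving the lock permanently held, which is exactly why a blocking put() on a queue guarded by that lock deadlocks.

```python
import multiprocessing as mp
import time

def _hold_lock(lock, started):
    lock.acquire()   # take the lock and never release it
    started.set()
    time.sleep(30)   # killed long before this returns

def demo():
    lock = mp.Lock()
    started = mp.Event()
    p = mp.Process(target=_hold_lock, args=(lock, started))
    p.start()
    started.wait()   # the child now holds the lock
    p.kill()         # simulate the crashing worker
    p.join()
    # The lock is left acquired by the dead process: any blocking put()
    # on a queue guarded by it would now hang forever.
    return lock.acquire(timeout=0.1)

if __name__ == "__main__":
    print("lock still free:", demo())  # -> lock still free: False
```

Pinning the test to one CPU with taskset just widens the window in which the worker is killed while holding the lock.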

@ogrisel

Contributor

ogrisel commented Nov 7, 2017

@tomMoral this PR now conflicts with master.

tomMoral added some commits Feb 18, 2017

TST no deadlocks with pickling crashes
- TST crash in CallItem unpickling
- TST crash in func call run
- TST crash in result pickling
the tests include crashes with PythonError/SystemExit/SegFault

Also add more tests for race condition when a worker crashes

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from 3de1ee7 to 04e1dc1 Nov 9, 2017

@tomMoral

Contributor

tomMoral commented Nov 9, 2017

This PR has been rebased on master and I think it is ready for review.

@@ -75,12 +76,36 @@
_threads_queues = weakref.WeakKeyDictionary()
_global_shutdown = False


# This constants control the maximal wakeup. If a job is submitted to the


@ogrisel

ogrisel Nov 21, 2017

Contributor

Typo: "This constant controls"

if reader in ready:
result_item = reader.recv()
# Wait for a result to be ready in the result_queue while checking
# that worker process are still running.


@ogrisel

ogrisel Nov 21, 2017

Contributor

typo: "that all worker processes are still running"

# result_queue state. This avoid deadlocks caused by the non
# transmission of wakeup signal when a worker died with the
# _result_queue write lock.
self._wakeup = _Sentinel()


@ogrisel

ogrisel Nov 21, 2017

Contributor

I think this should better be renamed to _queue_management_thread_sentinel to be more explicit.

self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
_threads_queues[self._queue_management_thread] = self._wakeup


@ogrisel

ogrisel Nov 21, 2017

Contributor

_threads_queues should be renamed to _threads_sentinels.

p.terminate()
executor.shutdown(wait=True)
print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
self.fail(f"Deadlock executor:\n\n{tb}")


@ogrisel

ogrisel Nov 21, 2017

Contributor

Phrasing: "Executor deadlock:"

from signal import SIGTERM as SIGKILL
try:
os.kill(pid, SIGKILL)
time.sleep(.01)


@ogrisel

ogrisel Nov 21, 2017

Contributor

Why is this sleep necessary? Shouldn't it be part of the caller instead? Or alternatively, kill several times until the OS replies that the process is dead:

n_trials = 3

for i in range(n_trials):
    try:
        os.kill(pid, SIGKILL)
    except (ProcessLookupError, PermissionError):
        break
    time.sleep(0.01)
else:
    raise RuntimeError(f"Could not kill process {pid} after {n_trials} trials")


@tomMoral

tomMoral Nov 22, 2017

Contributor

This sleep increases the chances for this process to be killed by another process, and gives time to make sure that the BrokenProcessPool error occurs before the end of the current function.
I think that if a process fails to kill another one, it means the target was already shut down, so we should not retry multiple times.

# Give some time for the Executor to detect the failure
time.sleep(.5)

def test_crash_races(self):


@pitrou

pitrou Nov 29, 2017

Member

Is it important to have this test in addition to test_crash?
If so, then I suggest you look at test_killed_child to see how to kill a child in a safe and cross-platform way.


@tomMoral

tomMoral Dec 1, 2017

Contributor

Indeed it is redundant. I removed it.

@ogrisel

Contributor

ogrisel commented Nov 30, 2017

When googling the "fatal: Invalid symmetric difference expression" error message on travis I found this post by @pitrou on the python committers mailing list:

https://mail.python.org/pipermail/python-committers/2017-July/004717.html

but no one replied :) Maybe try a rebase.

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from 04089b6 to 919156a Dec 1, 2017

@tomMoral

Contributor

tomMoral commented Dec 1, 2017

Here is a small benchmark of a large number of very short tasks, to measure the overhead of task management.

from time import time
import statistics
from concurrent.futures import ProcessPoolExecutor

def identity(x):
    return x

def main():
    durations = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        for _ in range(10):
            t0 = time()
            list(executor.map(identity, range(int(1e4))))
            dt = time() - t0
            durations.append(dt)
            print(f"dt: {dt:.3f}s")
    mean = statistics.mean(durations)
    std = statistics.stdev(durations)
    print(f"{mean:.3f}s (+/-{std:.3f}s)")

if __name__ == '__main__':
    main()

Result on master: 2.300s (+/-0.065s)
Result on this PR: 2.169s (+/-0.069s)

The small improvement is probably due to the use of a lockless, independent communication channel to wake up the queue_management_thread when submitting new tasks.
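The lockless wakeup channel mentioned above can be sketched with an OS pipe. This is an assumption about the mechanism, not the PR's actual code (only the _Sentinel name appears in the diff): submitters write one byte to the pipe, and the management thread multiplexes the pipe's read end together with the result queue's file descriptor in a single selector, so no lock is shared with the worker processes.

```python
import os
import selectors

class _Sentinel:
    """Sketch of a lock-free wakeup channel (implementation assumed)."""
    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wakeup(self):
        # Writing a single byte never blocks and takes no lock, so a
        # crashed worker cannot leave this channel in a stuck state.
        os.write(self._write_fd, b"x")

    def clear(self):
        # Drain accumulated wakeup bytes after handling them.
        os.read(self._read_fd, 512)

def demo_wakeup():
    sentinel = _Sentinel()
    sel = selectors.DefaultSelector()
    sel.register(sentinel._read_fd, selectors.EVENT_READ)
    sentinel.wakeup()
    ready = sel.select(timeout=1)   # returns as soon as the byte lands
    sel.close()
    return len(ready) == 1

print("woken:", demo_wakeup())  # -> woken: True
```

Because the write side never blocks, submit() gets faster than signalling through the locked result queue, which would explain the benchmark numbers above.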

@ogrisel

Contributor

ogrisel commented Dec 4, 2017

Note that the "fatal: Invalid symmetric difference expression" error message on travis is no longer occurring in the latest commits.

@@ -149,8 +149,6 @@ def run(self):
def run_tests_multiprocess(regrtest):
output = queue.Queue()
pending = MultiprocessIterator(regrtest.tests)
test_timeout = regrtest.ns.timeout


@pitrou

pitrou Dec 4, 2017

Member

I don't think we want to remove this in this PR. If there's a problem with parallel tests and the --timeout option, better open a separate issue about it.


@tomMoral

tomMoral Dec 4, 2017

Contributor

I removed the commit from the PR and I will open another one for this change.

tb = f.read()
for p in executor._processes.values():
p.terminate()
executor.shutdown(wait=True)


@pitrou

pitrou Dec 4, 2017

Member

If the executor is in a deadlock state, shutdown(wait=True) will never return, right?


@tomMoral

tomMoral Dec 4, 2017

Contributor

As we kill the processes just before, this should be safe. In this case, if the queue_management_worker is alive, it will flag the ProcessPoolExecutor as broken and clean up the state; if it is not, the call to shutdown won't block and the resources will still be freed.

import faulthandler
from tempfile import TemporaryFile
with TemporaryFile(mode="w+") as f:
faulthandler.dump_traceback(file=f)


@pitrou

pitrou Dec 4, 2017

Member

Why not file=sys.stderr?


@ogrisel

ogrisel Dec 4, 2017

Contributor

If I recall correctly, faulthandler actually needs a system-level file handle, not just a Python-level file object.


@tomMoral

tomMoral Dec 4, 2017

Contributor

This part of the code is called under test.support.captured_stderr. In that case, the sys.stderr object does not have a fileno, and faulthandler.dump_traceback fails. So we resorted to this to allow getting the traceback into the exception. Another option is to dump the traceback on sys.stdout.


@pitrou

pitrou Dec 4, 2017

Member

I see. Do you think this whole code is still useful? I understand it was useful for you to debug the PR, but now that it seems to pass, is there a reason for keeping it?


@tomMoral

tomMoral Dec 4, 2017

Contributor

The reason to keep it is to have an informative test report in case a regression introduces a deadlock. As deadlocks can happen randomly, it is always good to have a traceback as soon as one happens. So I would rather keep it.
But we could move it to sys.stdout if you think the use of a tempfile introduces unnecessary complexity. The information would still be available, but in a degraded form: in the test stdout rather than in the test report.
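The constraint being discussed can be checked directly. A short sketch (not test-suite code): faulthandler needs a real OS-level file descriptor, so an in-memory buffer like the one installed by captured_stderr is rejected, while a temporary file works and lets the dump be read back into the failure message.

```python
import faulthandler
import io
from tempfile import TemporaryFile

# An in-memory buffer, as sys.stderr is under test.support.captured_stderr:
# its fileno() raises, and faulthandler refuses it.
try:
    faulthandler.dump_traceback(file=io.StringIO())
    needs_real_fd = False
except io.UnsupportedOperation:
    needs_real_fd = True

# A temporary file exposes a real descriptor, so the dump succeeds and
# its contents can be re-read for the test report.
with TemporaryFile(mode="w+") as f:
    faulthandler.dump_traceback(file=f)
    f.seek(0)
    tb = f.read()

print(needs_real_fd, len(tb) > 0)  # -> True True
```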

@tomMoral tomMoral force-pushed the tomMoral:PR_robust_failures branch from 5da1f92 to 874f263 Dec 4, 2017

@ogrisel

Contributor

ogrisel commented Jan 4, 2018

@pitrou I think all the comments have been addressed in this PR.

@pitrou
Member

pitrou left a comment

@ogrisel thanks for pinging me (did you amend previous commits? I didn't see any recent commits). There are still a couple of nits left.

@@ -94,6 +95,7 @@ def tearDown(self):


class ExecutorMixin:
timeout = 30


@pitrou

pitrou Jan 4, 2018

Member

This doesn't seem used anymore.

@@ -116,6 +118,8 @@ def setUp(self):
self._prime_executor()

def tearDown(self):
# Remove the reference to self.timer to avoid the thread_cleanup


@pitrou

pitrou Jan 4, 2018

Member

Also this comment doesn't seem necessary anymore. Or am I missing something?

call_queue.close()
# Make sure that the queue management thread was properly finished
# and the queue was closed by the shutdown process
queue_management_thread.join()


@pitrou

pitrou Jan 4, 2018

Member

It's already joined 6 lines above. The second join() call is a no-op, no?

@@ -759,6 +766,180 @@ def test_ressources_gced_in_workers(self):
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))

def hide_process_stderr():
import io
setattr(sys, "stderr", io.StringIO())


@pitrou

pitrou Jan 4, 2018

Member

Is there a subtlety that I'm missing, or can this simply be written as sys.stderr = io.StringIO()?



class ExitAtPickle(object):
"""Bad object that triggers a segfault at pickling time."""


@pitrou

pitrou Jan 4, 2018

Member

The docstring is wrong here.



class ErrorAtPickle(object):
"""Bad object that triggers a segfault at pickling time."""


@pitrou

pitrou Jan 4, 2018

Member

And here as well.



class ErrorAtUnpickle(object):
"""Bad object that triggers a process exit at unpickling time."""


@pitrou

pitrou Jan 4, 2018

Member

And here too :-)


@tomMoral

tomMoral Jan 5, 2018

Contributor

The classic copy/paste mistake... Thanks for pointing it out! :-)

executor.shutdown(wait=True)

@classmethod
def _test_getpid(cls, a):


@pitrou

pitrou Jan 4, 2018

Member

This doesn't seem used anymore?



class ExecutorDeadlockTest:
# If ExecutorDeadlockTest takes more than 100secs to complete, it is very


@pitrou

pitrou Jan 4, 2018

Member

"100secs" isn't in sync with the number below.


@tomMoral

tomMoral Jan 5, 2018

Contributor

The comment was a duplicate from the one 3 lines below so I removed it.

@bedevere-bot


bedevere-bot commented Jan 4, 2018

A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers that would be appreciated.

Once you have made the requested changes, please leave a comment on this pull request containing the phrase I have made the requested changes; please review again. I will then notify any core developers who have left a review that you're ready for them to take another look at this pull request.

@pitrou

pitrou approved these changes Jan 5, 2018

@pitrou

Member

pitrou commented Jan 5, 2018

@tomMoral thanks for the updates! This is looking good to me now.

@pitrou pitrou merged commit 94459fd into python:master Jan 5, 2018

3 checks passed

bedevere/issue-number Issue number 31699 found
Details
bedevere/news News entry found in Misc/NEWS.d
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@ogrisel

Contributor

ogrisel commented Jan 5, 2018

Great, thanks for the review @pitrou!

@tomMoral tomMoral deleted the tomMoral:PR_robust_failures branch Jan 11, 2018

@tomMoral tomMoral referenced this pull request Apr 9, 2018

Closed

Port robust ProcessPoolExecutor to `concurrent.futures` #48

7 of 11 tasks complete