bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool #11488

Open · pablogsal wants to merge 6 commits into master from pablogsal:bpo35493

Conversation

pablogsal (Member) commented Jan 9, 2019

This is a very simple fix for this problem using Process.sentinel. If you want a more sophisticated solution, please advise.

import multiprocessing
import time

CONCURRENCY = 1
NTASK = 100

def noop():
    pass

# maxtasksperchild=1 forces a fresh worker process per task, so the pool has
# to respawn a worker for every one of the 100 tasks.
with multiprocessing.Pool(CONCURRENCY, maxtasksperchild=1) as pool:
    start_time = time.monotonic()
    results = [pool.apply_async(noop, ()) for _ in range(NTASK)]
    for result in results:
        result.get()
    dt = time.monotonic() - start_time
    pool.terminate()
    pool.join()
print("Total: %.1f sec" % dt)

Before this PR:

Total: 10.2 sec

After this PR:

Total: 0.5 sec

https://bugs.python.org/issue35493
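For context: with maxtasksperchild=1 every one of the 100 tasks needs a fresh worker process, and the worker-handler thread only wakes up every 0.1 s to respawn workers, so roughly 100 × 0.1 s ≈ 10 s of the run is spent sleeping. The fix is to block on the workers' Process.sentinel handles instead. A minimal standalone sketch of that primitive (just the idea, not the patch itself):

import multiprocessing
from multiprocessing.connection import wait

def noop():
    pass

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=noop) for _ in range(4)]
    for p in procs:
        p.start()

    # Each Process exposes a 'sentinel' handle that becomes ready when the
    # process exits; wait() blocks until at least one sentinel is ready (or
    # the timeout expires), so no fixed-interval sleep is needed.
    pending = {p.sentinel: p for p in procs}
    while pending:
        for sentinel in wait(list(pending), timeout=1.0):
            proc = pending.pop(sentinel)
            proc.join()
            print("worker", proc.pid, "exited")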

@pablogsal pablogsal requested review from pitrou and vstinner Jan 9, 2019

@pablogsal pablogsal self-assigned this Jan 9, 2019

@pablogsal pablogsal force-pushed the pablogsal:bpo35493 branch from 14d1e7c to 55170b9 Jan 9, 2019

@@ -437,7 +443,7 @@ def _handle_workers(pool):
         # is terminated.
         while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
             pool._maintain_pool()
-            time.sleep(0.1)
+            pool._wait_for_updates(timeout=0.2)

pablogsal (Member) commented Jan 9, 2019

The timeout is needed for detecting changes in thread._state

vstinner (Member) commented Jan 10, 2019

asyncio uses the "self-pipe" pattern to wake itself up when it gets an event from a different thread or when it gets a Unix signal. Would it be possible to use a self-pipe (or something else) to wake up the wait when thread._state changes?
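For readers unfamiliar with the pattern, a rough sketch of such a self-pipe, built here from a multiprocessing.Pipe so that it can be passed to multiprocessing.connection.wait (class and method names are only illustrative, not part of this PR):

import multiprocessing
from multiprocessing.connection import wait

class SelfPipeWaiter:
    """Block on a set of sentinels, but allow another thread to wake us up."""

    def __init__(self):
        # Self-pipe: writing to one end makes the other end readable,
        # which unblocks wait() immediately.
        self._reader, self._writer = multiprocessing.Pipe(duplex=False)

    def notify(self):
        # Called from another thread, e.g. when thread._state changes.
        self._writer.send(None)

    def wait_for(self, sentinels, timeout=None):
        ready = wait([self._reader, *sentinels], timeout=timeout)
        if self._reader in ready:
            # Drain pending wake-ups so the next call blocks again.
            while self._reader.poll():
                self._reader.recv()
        return ready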

pablogsal (Member) commented Jan 12, 2019

We would also need to handle the case when pool._cache is empty.

vstinner (Member) commented Jan 14, 2019

> @vstinner I have used the self-pipe pattern to receive notifications on pool._cache and thread._state changes. I kept the 0.2 timeout to make sure the old behaviour is preserved if some change is not notified through the self._change_notifier queue (by mistake or for external reasons).

Why 0.2 and not 0.1 or 1.0? I understand that replacing 0.1 with 0.2 doubles the latency of the thread pool. Am I right?

pablogsal (Member) commented Jan 14, 2019

I used 0.2 to maintain backwards compatibility with the old behaviour. See my other comment explaining why I maintained the timeout.

pablogsal (Member) commented Jan 12, 2019

@vstinner I have used the self-pipe pattern to receive notifications on pool._cache and thread._state changes. I kept the 0.2 timeout to make sure the old behaviour is preserved if some change is not notified through the self._change_notifier queue (by mistake or for external reasons).
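Roughly, the shape of that mechanism as a self-contained toy (not the actual patch; change_notifier._reader is an internal attribute of multiprocessing.SimpleQueue, used here only so that wait() can select on it):

import multiprocessing
import threading
import time
from multiprocessing.connection import wait

def noop():
    time.sleep(0.3)

def monitor(change_notifier, worker_sentinels):
    # Wake up when a worker exits or when another thread pushes to the
    # notifier queue; the 0.2 s timeout is only a safety net in case some
    # state change was not notified.
    while True:
        wait([change_notifier._reader, *worker_sentinels], timeout=0.2)
        while not change_notifier.empty():
            if change_notifier.get() == "stop":
                return

if __name__ == "__main__":
    change_notifier = multiprocessing.SimpleQueue()
    workers = [multiprocessing.Process(target=noop) for _ in range(2)]
    for w in workers:
        w.start()
    t = threading.Thread(target=monitor,
                         args=(change_notifier, [w.sentinel for w in workers]))
    t.start()
    for w in workers:
        w.join()
    change_notifier.put("stop")   # notify the monitor thread from here
    t.join()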

@@ -596,7 +622,7 @@ def _help_stuff_finish(inqueue, task_handler, size):
             time.sleep(0)
 
     @classmethod
-    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, pool_notifier,

vstinner (Member) commented Jan 14, 2019

I would prefer to reuse the same variable name: pool_notifier => change_notifier.

bedevere-bot commented Jan 14, 2019

When you're done making the requested changes, leave the comment: I have made the requested changes; please review again.


@pablogsal pablogsal force-pushed the pablogsal:bpo35493 branch 6 times, most recently from 0046667 to 67bc914 Jan 14, 2019

@pablogsal pablogsal force-pushed the pablogsal:bpo35493 branch from 67bc914 to 61e3b60 Jan 14, 2019

@pablogsal pablogsal force-pushed the pablogsal:bpo35493 branch from 7ba415b to fc3fa24 Jan 14, 2019

vstinner (Member) left a comment

Oh wow, multiprocessing is so complex... I'm happy that @pablogsal handles it instead of me :-D

                     *self_notifier_sentinels]
        wait(sentinels, timeout=timeout)
        while not self._change_notifier.empty():
            self._change_notifier.get()

vstinner (Member) commented Jan 15, 2019

I don't think this code is safe; it looks like a race condition: https://en.wikipedia.org/wiki/Time_of_check_to_time_of_use

I suggest calling get(block=False) in a loop until you get an Empty exception.

Note: the race condition is not really critical, since it's fine if we miss a few events.
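For illustration, the suggested drain pattern with a plain queue.Queue (the pool's own queue type is discussed just below):

import queue

def drain(change_notifier):
    # Non-blocking drain: keep popping until Empty is raised, so there is no
    # window between an emptiness check and the get() that follows it.
    while True:
        try:
            change_notifier.get(block=False)
        except queue.Empty:
            break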

pitrou (Member) commented Jan 15, 2019

There is no race condition as long as this is the only thread that pops from the queue.

pablogsal (Member) commented Jan 15, 2019

Also, sadly queue.SimpleQueue has no block=False option (we could add one, but I think it is not needed).

        sentinels = [*task_queue_sentinels,
                     *worker_sentinels,
                     *self_notifier_sentinels]
        wait(sentinels, timeout=timeout)

vstinner (Member) commented Jan 15, 2019

Can you please add a comment on wait() to explain that it completes when at least one sentinel is set, and that it's important not to wait until all sentinels complete, but to exit frequently to refresh the pool.

This point is non-trivial and it surprised me when I wrote PR #11136; see my comment there:

> My change doesn't work: self._worker_state_event isn't set when a worker completes, whereas _maintain_pool() should be called frequently to check when a worker completed.

vstinner (Member) commented Jan 15, 2019

I'm not asking to document the behavior of wait(), but to be more explicit that we stop on purpose as soon as the first event completes.

pitrou (Member) commented Jan 15, 2019

It looks obvious to me, especially as the function is named wait_for_updates, but I guess it doesn't hurt to add a comment.
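One possible wording for that comment, sketched against the snippet above rather than taken from the final patch:

        sentinels = [*task_queue_sentinels,
                     *worker_sentinels,
                     *self_notifier_sentinels]
        # wait() returns as soon as *any* sentinel becomes ready (or the
        # timeout expires).  That early exit is deliberate: _maintain_pool()
        # has to run frequently to notice workers that have exited, so we
        # must not block until every sentinel is ready.
        wait(sentinels, timeout=timeout)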

@@ -0,0 +1,3 @@
+Use :func:`multiprocessing.connection.wait` instead of polling each 0.2

vstinner (Member) commented Jan 15, 2019

Oops, I clicked on the wrong button :-( Your NEWS entry still doesn't explain that only process pools are affected.

Another issue: "polling each 0.2 seconds": currently the code uses 0.1 seconds.

pablogsal (Member) commented Jan 15, 2019

Sorry, I completely forgot about that while fighting Windows issues :/

pablogsal (Member) commented Jan 15, 2019

I made some changes to eliminate some problems that I found on Windows. I have run all the multiprocessing tests in a loop with this patch manually on almost all our Windows buildbots and they pass without problems, so I think the pipe/socket solution is resilient on Windows as well.
