Imap from ThreadPool behaves unexpectedly #72885
Comments
Consider the following code:

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(10)

    def gen():
        yield 1 + '1'  # here is an error

    print(list(pool.imap(str, gen())))  # prints []
    print(list(pool.map(str, gen())))  # raises TypeError

The difference is that the line with imap prints an empty list, while the line with map raises an exception, as expected.

Change the above snippet by adding an additional yield statement:

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(10)

    def gen():
        yield 1
        yield 1 + '1'  # here is an error

    print(list(pool.imap(str, gen())))  # raises TypeError
    print(list(pool.map(str, gen())))  # also would raise TypeError

So now both map and imap raise the exception, as expected. Therefore I suppose the behavior of imap shown in the first case is wrong.
To tie in the example given by @elias in bpo-28625, this inconsistency in behavior is not limited to ThreadPool -- it appears with a process Pool as well:

    from multiprocessing import Pool

    def double(x):
        return 2 * x

    def get_numbers():
        raise Exception("oops")
        yield 1
        yield 2

    >>> list(Pool(processes=2).imap(double, get_numbers()))  # returns list
    []
    >>> list(Pool(processes=2).map(double, get_numbers()))
    Traceback (most recent call last):
      ...
    Exception: oops

    def get_numbers_differently():
        yield 1
        raise Exception("oops")
        yield 2

    >>> list(Pool(processes=2).imap(double, get_numbers_differently()))  # now we see exception
    Traceback (most recent call last):
      ...
    Exception: oops
This inconsistent behavior in imap on both Pool and ThreadPool is not what I would expect.
Though it still lacks a proper test, I'm attaching a preliminary patch to address the problematic behavior in 3.5/3.6/default in the hopes that others might help test it.
Hi Davin, could it be fixed like this?

    diff -r 05a728e1da15 Lib/multiprocessing/pool.py
    --- a/Lib/multiprocessing/pool.py   Wed Nov 16 16:35:53 2016 -0800
    +++ b/Lib/multiprocessing/pool.py   Thu Nov 17 16:35:38 2016 +0800
    @@ -398,7 +398,7 @@
                 except Exception as ex:
                     job, ind = task[:2] if task else (0, 0)
                     if job in cache:
    -                    cache[job]._set(ind + 1, (False, ex))
    +                    cache[job]._set(ind + 1 if task else 0, (False, ex))
                     if set_length:
                         util.debug('doing set_length()')
                         set_length(i+1)

It seems to me the bug is that _handle_tasks doesn't treat the exception correctly if it occurs on the very first item. It always calls _set(ind + 1), because when an exception occurs, task is normally the previous task and the + 1 is needed. But if the exception occurs on the very first item, task is None and the + 1 is not needed. I am not very sure, but the reported cases work correctly now:

    list(Pool(processes=2).imap(double, get_numbers()))  # raises error now
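
The off-by-one described above can be illustrated with a deliberately simplified, single-threaded toy model. This is only a sketch: `ToyOrderedResults` and `simulate` are hypothetical names invented here, not part of `multiprocessing`, and the model ignores threading, job ids, and length bookkeeping. It only shows why an exception filed under index 1 is never delivered when nothing was ever filed under index 0.

```python
class ToyOrderedResults:
    """Hypothetical stand-in for an ordered result collector (not the real pool code)."""

    def __init__(self):
        self.next_index = 0  # index of the next outcome to hand out
        self.ready = []      # outcomes already in delivery order
        self.parked = {}     # outcomes that arrived ahead of their turn

    def set(self, index, outcome):
        # An outcome is an (ok, payload) pair, mirroring the (False, ex) tuple in the diff.
        if index != self.next_index:
            self.parked[index] = outcome
            return
        self.ready.append(outcome)
        self.next_index += 1
        # Release any parked outcomes that are now next in line.
        while self.next_index in self.parked:
            self.ready.append(self.parked.pop(self.next_index))
            self.next_index += 1

    def drain(self):
        """Return delivered values in order, raising the first delivered exception."""
        values = []
        for ok, payload in self.ready:
            if not ok:
                raise payload
            values.append(payload)
        return values


def simulate(items_before_failure, guard_first_item):
    """Feed some successful items, then report an input failure."""
    results = ToyOrderedResults()
    last_index = None  # plays the role of `task` being None
    for index in range(items_before_failure):
        results.set(index, (True, index))
        last_index = index
    if last_index is None:
        # Unguarded: mimics the unconditional `ind + 1` with ind == 0.
        error_index = 0 if guard_first_item else 1
    else:
        error_index = last_index + 1
    results.set(error_index, (False, TypeError("input iterable failed")))
    return results.drain()
```

In this model, `simulate(0, guard_first_item=False)` returns an empty list because the error stays parked, while `simulate(0, guard_first_item=True)` and `simulate(1, guard_first_item=False)` both raise the `TypeError`.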
What's more, this case seems non-reentrant. Since there is no task in this case, the job id is always taken to be 0, which is not necessarily true. This means that after the first time, we cannot even set the exception.
Here is a patch that is just an attempt. I don't quite like the implementation, but I cannot figure out a better solution. The examples in this issue and bpo-28696 seem to work, and no tests currently fail.
@xiang.zhang: Your patch looks to be introducing a number of changes to the structure of the data being passed around between threads and even monitored/indirectly shared across processes. It's looking increasingly high risk to me. We already have logic for handling exceptions arising during jobs but the one situation overlooked in this logic is if the exception occurs "quickly in an unfortunate order", meaning the exception is encountered and reported before any of the other individual tasks can complete and respond with a result. This oversight of logic can be addressed a couple of ways:
I regard option 1 as being potentially a bit fragile and option 2 as introducing non-trivial complexity and risk. With option 3, there's effectively no risk and no measurable cost getting to the truth of what has actually happened.
That's why I say even I don't like it. To solve an edge case, some long-established code has to be changed. Sigh.
I think you mean your patch. But how does it solve the reentrant problem? Yeah, actually it's not reentrant. If the problematic job is not the first job with id 0, then the exception won't be set. With your patch, repeatedly execute print(list(pool.imap(str, gen()))). Only the first time is there an exception.
@xiang.zhang: Nice catch -- thank you for pointing out the additional issue that arises when trying to provoke the issue twice in a row. The module attribute

I didn't like any of my previously suggested approaches, but I think there's a 4th option:

Now when we provide a problematic generator/iterable to imap, the exception it triggers is caught and the resulting exception passed through the system to make use of the logic that is already in place. This same issue can arise for imap_unordered() as well as imap() and can be addressed in the same manner.

Attaching another preliminary patch that still lacks formal tests but I'll attach crude versions of tests momentarily. If we're still missing some use case or other logic path, now's the time to find it.
Attaching promised crude tests.
This approach is good. 2 points about the patch:

    from multiprocessing import Pool

    def double(x):
        return 2 * x

    class buggy:
        def __iter__(self):
            return self
        def __next__(self):
            raise Exception('oops')
        def __len__(self):
            return 1

    list(Pool(processes=2).map(double, buggy()))
    list(Pool(processes=2).map(double, buggy()))  # hangs
Since the only thing I know about the multiprocessing internals is what I just read in the source code trying to debug my imap_unordered call, I'll add the following example, not knowing whether this is already covered by everything you have until now.

    import multiprocessing.pool

    def gen():
        raise Exception('generator exception')
        yield 1
        yield 2

    for i in range(3):
        with multiprocessing.pool.ThreadPool(3) as pool:
            try:
                print(list(pool.imap_unordered(lambda x: x*2, gen())))
            except Exception as e:
                print(e)

This only prints 'generator exception' once for the first iteration. For the following iterations imap_unordered returns an empty list. This is the case for both Pool and ThreadPool.
Davin, I propose a PR to solve this problem based on your patch. Hope you are willing to review and let's finish this. ;-)