TypeError: cannot pickle '_thread.RLock' object with ProcessPoolExecutor #429

Open
davetapley opened this issue Jan 13, 2024 · 2 comments

@davetapley

Here's a weird one.

I'm using ProcessPoolExecutor, and sometimes that causes contention on my databases.
I already use tenacity elsewhere, so I figured I could just wrap the call with retry_if_exception_type(IOException).

But when IOException keeps being raised until the retries run out, I get TypeError: cannot pickle '_thread.RLock' object.

MRE:

from concurrent.futures import ProcessPoolExecutor, wait
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_fixed)

I = TypeVar('I')
O = TypeVar('O')


class IOException(Exception):
    ...


@dataclass
class Job(Generic[I, O]):
    f: Callable[[I], O]

    @retry(retry=retry_if_exception_type(IOException),
           wait=wait_fixed(0.1),
           stop=stop_after_attempt(10))
    def __call__(self, x: I):
        return self.f(x)


def good_job(x: int) -> int:
    return x * 2


def fail_job(x: int) -> int:
    raise IOException('nope')


data = [1, 2, 3, 4, 5]

with ProcessPoolExecutor() as ex:
    jobs = [ex.submit(Job(good_job), x) for x in data]
    wait(jobs)
    print([job.result() for job in jobs])

with ProcessPoolExecutor() as ex:
    jobs = [ex.submit(Job(fail_job), x) for x in data]
    wait(jobs)
    print([job.result() for job in jobs])

Output:

[2, 4, 6, 8, 10]
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/python/3.11.6/lib/python3.11/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 23, in __call__
    return self.f(x)
           ^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 31, in fail_job
    raise IOException('nope')
IOException: nope

The above exception was the direct cause of the following exception:

tenacity.RetryError: RetryError[<Future at 0x7f0a37898e50 state=finished raised IOException>]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/process.py", line 217, in _sendback_result
    result_queue.put(_ResultItem(work_id, result=result,
  File "/opt/python/3.11.6/lib/python3.11/multiprocessing/queues.py", line 371, in put
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/workspaces/ng/tenancity.py", line 43, in <module>
    print([job.result() for job in jobs])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/ng/tenancity.py", line 43, in <listcomp>
    print([job.result() for job in jobs])
           ^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/python/3.11.6/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
TypeError: cannot pickle '_thread.RLock' object
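The last frames show where it breaks. The worker has to pickle the exception to send it back to the parent, and the exception it is sending is a RetryError that holds the final attempt as a Future (its repr above reads RetryError[<Future at 0x7f0a37898e50 state=finished raised IOException>]). A concurrent.futures.Future owns a threading.Condition, which in turn wraps an RLock. The stdlib-only sketch below reproduces that chain without tenacity or a process pool. It assumes tenacity's Future keeps the same lock as the stdlib Future it appears to build on. pickle_error is a hypothetical helper, and Exception(failed_attempt) only imitates a wrapper that carries a Future in its args.

```python
import pickle
import threading
from concurrent.futures import Future
from typing import Optional


def pickle_error(obj: object) -> Optional[str]:
    """Hypothetical helper: return pickle's TypeError message, or None on success."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


# A bare re-entrant lock cannot be pickled.
print(pickle_error(threading.RLock()))

# A finished Future still owns a threading.Condition built on an RLock.
failed_attempt: Future = Future()
failed_attempt.set_exception(OSError("nope"))
print(pickle_error(failed_attempt))

# An exception that carries the Future in its args inherits the problem
# (a stand-in for the RetryError in the traceback, not tenacity's class).
print(pickle_error(Exception(failed_attempt)))

# The underlying exception on its own pickles fine (this prints None).
print(pickle_error(OSError("nope")))
```

The last line is the contrast that matters: the original exception is picklable, and only the wrapper around it is not.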
@spolloni

spolloni commented Jan 18, 2024

We're also hitting this, but with multiprocessing.Pool instead. Interestingly, setting reraise=True avoids the issue and makes the code behave as expected.

Here is @davetapley's MRE, adapted for multiprocessing.Pool:

from multiprocessing import get_context
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_fixed)

I = TypeVar('I')
O = TypeVar('O')

class IOException(Exception):
    ...


@dataclass
class Job(Generic[I, O]):
    f: Callable[[I], O]

    @retry(
        retry=retry_if_exception_type(IOException),
        wait=wait_fixed(0.1),
        stop=stop_after_attempt(10),
        #reraise=True,
    )
    def __call__(self, x: I):
        return self.f(x)


def good_job(x: int) -> int:
    return x * 2


def fail_job(x: int) -> int:
    raise IOException('nope')


data = [1, 2, 3, 4, 5]

if __name__ == '__main__':
    with get_context('spawn').Pool() as pool:
        for result in pool.imap_unordered(Job(good_job), data):
            print(result)

    with get_context('spawn').Pool() as pool:
        for result in pool.imap_unordered(Job(fail_job), data):
            print(result)

Output with reraise=False:

multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x1023aa320>'. Reason: 'TypeError("cannot pickle '_thread.RLock' object")'

Output with reraise=True:

__main__.IOException: nope

@spolloni

^ The issue also seems unrelated to retry_if_exception_type and wait_fixed, since removing them still produces the same error.
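That points at the wrapper exception itself rather than any particular retry or wait strategy. As a sketch of how such a wrapper could be made picklable, the classes below use a hypothetical stand-in, WrappedRetryError, rather than tenacity's real RetryError; PicklableRetryError and _rebuild are likewise illustrative names. The sketch assumes the last attempt finished with an exception.

```python
import pickle
from concurrent.futures import Future
from typing import Any, Tuple


class WrappedRetryError(Exception):
    """Hypothetical stand-in for tenacity.RetryError: keeps the last attempt
    (a Future) as an attribute and in args, as the traceback's repr suggests."""

    def __init__(self, last_attempt: Future) -> None:
        self.last_attempt = last_attempt
        super().__init__(last_attempt)


class PicklableRetryError(WrappedRetryError):
    """Same wrapper, but pickled as just the last attempt's exception."""

    def __reduce__(self) -> Tuple[Any, ...]:
        # Assumes the last attempt is finished, so exception() returns
        # immediately instead of blocking.
        return (_rebuild, (self.last_attempt.exception(),))


def _rebuild(exc: BaseException) -> PicklableRetryError:
    # Wrap the transported exception in a fresh, already-finished Future.
    fut: Future = Future()
    fut.set_exception(exc)
    return PicklableRetryError(fut)
```

The design choice is to ship only the underlying exception and rebuild a new finished Future on the receiving side, so the original Future and its lock never have to be serialized.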

sergei-maertens added a commit to open-formulieren/open-forms that referenced this issue Apr 8, 2024
The retry error from tenacity uses threading locks under the hood,
which cannot be pickled. Specifying reraise=True avoids this
particular codepath.

See upstream issues:

* jd/tenacity#429
* jd/tenacity#182
* jd/tenacity#147
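Besides reraise=True, another caller-side option is to make sure nothing unpicklable ever leaves the worker. The decorator below is a hypothetical helper: ensure_picklable_errors and RemoteError are illustrative names, not part of tenacity or the standard library. It passes picklable exceptions through unchanged and replaces anything else with a plain exception that carries only the type name and message.

```python
import functools
import pickle
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class RemoteError(Exception):
    """Hypothetical picklable replacement for an exception that cannot be pickled."""


def ensure_picklable_errors(fn: F) -> F:
    """Hypothetical decorator: re-raise fn's exceptions in a picklable form."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            try:
                pickle.dumps(exc)
            except Exception:
                # Keep only the type name and message; drop state such as a
                # Future (and its lock) stored in the exception's args.
                raise RemoteError(f"{type(exc).__name__}: {exc}") from None
            # The exception pickles cleanly, so re-raise it unchanged.
            raise

    return wrapper  # type: ignore[return-value]
```

In the MRE this would sit above @retry(...) on Job.__call__, so that it sees the final RetryError. The trade-off is that the parent then receives RemoteError instead of the original exception type, which reraise=True preserves.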