diff --git a/README.md b/README.md index 9840ec85..456a6558 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,11 @@ def fetch_urls(urls): ## Changelog +### v1.15.0 + +- Fixed ThrottleExecutor thread leak when `shutdown()` is never called + ([#93](https://github.com/rohanpm/more-executors/issues/93)) + ### v1.14.0 - API break: removed `Executors.wrap` class method diff --git a/more_executors/throttle.py b/more_executors/throttle.py index 9b896859..2d4d8274 100644 --- a/more_executors/throttle.py +++ b/more_executors/throttle.py @@ -109,34 +109,37 @@ def _delegate_future_done(self, future): self._event.set() -def _submit_loop(executor_ref): - while True: - executor = executor_ref() +def _submit_loop_iter(executor): + if not executor: + return - if not executor: - break + if executor._shutdown: + return - if executor._shutdown: - break + to_submit = [] + with executor._lock: + while executor._to_submit: + if not executor._sem.acquire(False): + executor._log.debug("Throttled") + break + job = executor._to_submit.popleft() + executor._log.debug("Will submit: %s", job) + to_submit.append(job) - to_submit = [] - with executor._lock: - while executor._to_submit: - if not executor._sem.acquire(False): - executor._log.debug("Throttled") - break - job = executor._to_submit.popleft() - executor._log.debug("Will submit: %s", job) - to_submit.append(job) + executor._log.debug("Submitting %s, throttling %s", + len(to_submit), len(executor._to_submit)) - executor._log.debug("Submitting %s, throttling %s", - len(to_submit), len(executor._to_submit)) + for job in to_submit: + executor._do_submit(job) - for job in to_submit: - executor._do_submit(job) + return executor._event - event = executor._event - del executor + +def _submit_loop(executor_ref): + while True: + event = _submit_loop_iter(executor_ref()) + if not event: + break event.wait() event.clear() diff --git a/tests/test_executor_threadleak.py b/tests/test_executor_threadleak.py index 20e7b888..eda1ff11 100644 --- a/tests/test_executor_threadleak.py +++ b/tests/test_executor_threadleak.py @@ -10,7 +10,8 @@ def poll_noop(descriptors): - pass + for descriptor in descriptors: + descriptor.yield_result(descriptor.result) @fixture @@ -38,12 +39,19 @@ def fn(): @fixture def ctor_with_throttle(): def fn(): - return Executors.sync().with_throttle(4) + return Executors.sync().with_throttle(2) + return fn + + +@fixture +def ctor_with_poll_throttle(): + def fn(): + return Executors.sync().with_poll(poll_noop).with_throttle(2) return fn @fixture(params=['sync', 'thread_pool', 'with_retry', 'with_poll', - 'with_throttle']) + 'with_throttle', 'with_poll_throttle']) def executor_ctor(request): return request.getfixturevalue('ctor_' + request.param) @@ -81,3 +89,16 @@ def test_no_leak_on_discarded_futures(executor_ctor): del futures assert_soon(no_extra_threads) + + +def test_no_leak_on_completed_futures(executor_ctor): + no_extra_threads = partial(assert_no_extra_threads, thread_names()) + + executor = executor_ctor() + futures = [executor.submit(mult2, n) for n in [10, 20, 30]] + for future in futures: + future.result() + del executor + del futures + + assert_soon(no_extra_threads)