Skip to content

Commit

Permalink
Fix (another) thread leak in ThrottleExecutor
Browse files Browse the repository at this point in the history
The thread's main loop did "del executor" before waiting on the event,
but it could still hold a reference to the executor from 'to_submit'
or 'job'.

Fixes #93
  • Loading branch information
rohanpm committed Jan 2, 2019
1 parent dab5ec6 commit f6d8882
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 25 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -68,6 +68,11 @@ def fetch_urls(urls):

## Changelog

### v1.15.0

- Fixed ThrottleExecutor thread leak when `shutdown()` is never called
([#93](https://github.com/rohanpm/more-executors/issues/93))

### v1.14.0

- API break: removed `Executors.wrap` class method
Expand Down
47 changes: 25 additions & 22 deletions more_executors/throttle.py
Expand Up @@ -109,34 +109,37 @@ def _delegate_future_done(self, future):
self._event.set()


def _submit_loop(executor_ref):
while True:
executor = executor_ref()
def _submit_loop_iter(executor):
if not executor:
return

if not executor:
break
if executor._shutdown:
return

if executor._shutdown:
break
to_submit = []
with executor._lock:
while executor._to_submit:
if not executor._sem.acquire(False):
executor._log.debug("Throttled")
break
job = executor._to_submit.popleft()
executor._log.debug("Will submit: %s", job)
to_submit.append(job)

to_submit = []
with executor._lock:
while executor._to_submit:
if not executor._sem.acquire(False):
executor._log.debug("Throttled")
break
job = executor._to_submit.popleft()
executor._log.debug("Will submit: %s", job)
to_submit.append(job)
executor._log.debug("Submitting %s, throttling %s",
len(to_submit), len(executor._to_submit))

executor._log.debug("Submitting %s, throttling %s",
len(to_submit), len(executor._to_submit))
for job in to_submit:
executor._do_submit(job)

for job in to_submit:
executor._do_submit(job)
return executor._event

event = executor._event
del executor

def _submit_loop(executor_ref):
while True:
event = _submit_loop_iter(executor_ref())
if not event:
break

event.wait()
event.clear()
27 changes: 24 additions & 3 deletions tests/test_executor_threadleak.py
Expand Up @@ -10,7 +10,8 @@


def poll_noop(descriptors):
pass
for descriptor in descriptors:
descriptor.yield_result(descriptor.result)


@fixture
Expand Down Expand Up @@ -38,12 +39,19 @@ def fn():
@fixture
def ctor_with_throttle():
def fn():
return Executors.sync().with_throttle(4)
return Executors.sync().with_throttle(2)
return fn


@fixture
def ctor_with_poll_throttle():
def fn():
return Executors.sync().with_poll(poll_noop).with_throttle(2)
return fn


@fixture(params=['sync', 'thread_pool', 'with_retry', 'with_poll',
'with_throttle'])
'with_throttle', 'with_poll_throttle'])
def executor_ctor(request):
return request.getfixturevalue('ctor_' + request.param)

Expand Down Expand Up @@ -81,3 +89,16 @@ def test_no_leak_on_discarded_futures(executor_ctor):
del futures

assert_soon(no_extra_threads)


def test_no_leak_on_completed_futures(executor_ctor):
no_extra_threads = partial(assert_no_extra_threads, thread_names())

executor = executor_ctor()
futures = [executor.submit(mult2, n) for n in [10, 20, 30]]
for future in futures:
future.result()
del executor
del futures

assert_soon(no_extra_threads)

0 comments on commit f6d8882

Please sign in to comment.