Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NAS-111779 / 12.0 / improve IoThreadPoolExecutor (by yocalebo) #7381

Merged
merged 2 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/middlewared/middlewared/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def __init__(
self.startup_seq_path = startup_seq_path
self.app = None
self.loop = None
self.run_in_thread_executor = IoThreadPoolExecutor('IoThread', 20)
self.run_in_thread_executor = IoThreadPoolExecutor()
self.__thread_id = threading.get_ident()
# Spawn new processes for ProcessPool instead of forking
multiprocessing.set_start_method('spawn')
Expand Down Expand Up @@ -1117,6 +1117,14 @@ async def run_in_executor(self, pool, method, *args, **kwargs):
Also used to run non thread safe libraries (using a ProcessPool)
"""
loop = asyncio.get_event_loop()
if isinstance(pool, IoThreadPoolExecutor) and self.run_in_thread_executor.no_idle_threads:
# this means the IoThreadPool has no idle threads so instead of blocking the
# main event loop, we'll spin up single-use threads until the threadpool gets
# some more idle thread(s)
self.logger.trace('Calling %r in single-use thread', method)
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as exc:
return await loop.run_in_executor(exc, functools.partial(method, *args, **kwargs))

return await loop.run_in_executor(pool, functools.partial(method, *args, **kwargs))

async def _run_in_conn_threadpool(self, method, *args, **kwargs):
Expand Down
147 changes: 23 additions & 124 deletions src/middlewared/middlewared/utils/io_thread_pool_executor.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,30 @@
from concurrent.futures import _base
import itertools
import logging
import queue
import random
import threading
from os import cpu_count
from concurrent.futures import ThreadPoolExecutor

import middlewared.utils.osc as osc
from middlewared.utils.osc import set_thread_name

logger = logging.getLogger(__name__)

class IoThreadPoolExecutor(ThreadPoolExecutor):

class WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._initializer = set_thread_name('IoThread')

def run(self):
if not self.future.set_running_or_notify_cancel():
return
# we set these to 21 or 33 respectively so that we
# always have a 1 idle thread buffer when we check
# the semaphore which should help prevent a non-fatal
# race condition with the caller of this method
# minimally we have 21 - 1 thread available
# on large cpu count systems we set it to 33 - 1 (to match upstream)
self._max_workers = 21 if ((cpu_count() or 1) + 4) < 32 else 33

try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
@property
def no_idle_threads(self):
# note, this is "technically" an implementation
# detail of the threading.Semaphore class so upstream
# can change this variable at any time so I'm noting
# it here so my future self doesn't pull their hair
# out when this occurs :)


class Worker:
    """One pool thread: repeatedly asks its executor for work and runs it."""

    def __init__(self, name, executor):
        self.busy = False
        self.name = name
        self.executor = executor

        # The thread starts immediately and lives until get_work_item
        # tells it to retire by returning None.
        self.thread = threading.Thread(name=self.name, daemon=True, target=self._target)
        self.thread.start()

    def _target(self):
        osc.set_thread_name(self.name)
        try:
            while True:
                item = self.executor.get_work_item(self)
                if item is None:
                    break
                item.run()
                # Drop the reference before blocking for more work so the
                # finished item (and its Future's result) can be collected.
                del item
        except Exception:
            logger.critical("Exception in worker", exc_info=True)
        finally:
            self.executor.remove_worker(self)

    def __repr__(self):
        state = ' busy' if self.busy else ''
        return f"<Worker {self.name}{state}>"


class IoThreadPoolExecutor(_base.Executor):
    """Elastic thread-pool executor.

    Keeps at least ``min_workers`` worker threads alive; a new worker is
    started whenever a task is submitted while no worker is idle, and
    surplus idle workers retire themselves after a randomized timeout
    (see ``get_work_item``).
    """

    def __init__(self, thread_name_prefix, min_workers):
        # Worker threads are named f'{thread_name_prefix}-{N}' with a
        # monotonically increasing N from this counter.
        self.thread_name_prefix = thread_name_prefix
        self.counter = itertools.count()

        # Unbounded FIFO of pending WorkItem objects.
        self.work_queue = queue.Queue()

        # Floor on pool size: the pool starts with exactly this many
        # workers and never shrinks below it.
        self.min_workers = min_workers
        self.workers = []
        # Guards self.workers membership and the Worker.busy flags.
        self.workers_busy_lock = threading.Lock()
        for i in range(self.min_workers):
            self._start_worker()

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) and return a Future for its result."""
        future = _base.Future()
        work_item = WorkItem(future, fn, args, kwargs)

        self.work_queue.put(work_item)

        # If every current worker is busy, spin up a new one so this item
        # does not have to wait; the worker is created outside the lock to
        # keep the critical section small.
        start_worker = False
        with self.workers_busy_lock:
            if not any([not worker.busy for worker in self.workers]):
                # NOTE(review): logger.trace is not a stdlib logging method;
                # presumably a project-added TRACE level — verify.
                logger.trace("Starting new worker in namespace %r because there are no free workers",
                             self.thread_name_prefix)
                start_worker = True
        if start_worker:
            self._start_worker()

        return future

    def _start_worker(self):
        # Worker starts its own thread in __init__.
        worker = Worker(f'{self.thread_name_prefix}-{next(self.counter)}', self)
        self.workers.append(worker)

    def get_work_item(self, worker):
        """Block until there is work for *worker*.

        Returns the next WorkItem, or None when *worker* should shut down
        (more than ``min_workers`` workers are idle). Called only from the
        worker's own thread.
        """
        # Mark ourselves idle before blocking on the queue.
        with self.workers_busy_lock:
            worker.busy = False

        while True:
            # When surplus idle workers exist, wait with a randomized
            # timeout so idle workers don't all give up at the same instant.
            timeout = None
            # NOTE(review): self.workers is read here without holding
            # workers_busy_lock — presumably relying on the GIL; confirm.
            free_workers = sum([1 for worker in self.workers if not worker.busy])
            if free_workers > self.min_workers:
                logger.trace("Will probably need to shutdown %r because there are %d free workers",
                             worker, free_workers)
                timeout = random.uniform(4.0, 6.0)

            try:
                work_item = self.work_queue.get(True, timeout)
            except queue.Empty:
                # Timed out while surplus workers existed; re-check under
                # the lock before actually retiring this worker.
                with self.workers_busy_lock:
                    free_workers = sum([1 for worker in self.workers if not worker.busy])
                    if free_workers > self.min_workers:
                        logger.trace("Shutting down %r because there are %d free workers", worker, free_workers)
                        self.remove_worker(worker)
                        return None

                # Else, other worker has been shut down and now the number of workers is correct, let's run another
                # iteration of this (now, probably with infinite timeout)
            else:
                with self.workers_busy_lock:
                    worker.busy = True

                return work_item

    def remove_worker(self, worker):
        # Idempotent: the worker may already have been removed by
        # get_work_item before Worker._target's finally clause runs.
        try:
            self.workers.remove(worker)
        except ValueError:
            pass
# give ourselves a single idle thread buffer
return self._idle_semaphore._value - 1 <= 1