From 6948945d55890fc31b379781d0b0454df2bd44af Mon Sep 17 00:00:00 2001 From: THIBAULT OBER Date: Thu, 18 Aug 2016 09:37:40 +0000 Subject: [PATCH 1/2] threadpool v2, destroy thread when needed and create good number of thread in enqueue --- pelix/threadpool.py | 47 ++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/pelix/threadpool.py b/pelix/threadpool.py index bc6031bc..7e1fc0f7 100644 --- a/pelix/threadpool.py +++ b/pelix/threadpool.py @@ -24,6 +24,8 @@ See the License for the specific language governing permissions and limitations under the License. """ +from sys import dont_write_bytecode +dont_write_bytecode # Standard library import logging @@ -212,9 +214,10 @@ def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60, # Thread count self._thread_id = 0 - # Current number of threads, active and alive + # Current number of threads, active and alive, and number of task waiting self.__nb_threads = 0 self.__nb_active_threads = 0 + self.__nb_pending_task = 0 def start(self): """ @@ -231,15 +234,20 @@ def start(self): nb_pending_tasks = self._queue.qsize() if nb_pending_tasks > self._max_threads: nb_threads = self._max_threads + nb_pending_tasks = self.max_threads elif nb_pending_tasks < self._min_threads: nb_threads = self._min_threads else: nb_threads = nb_pending_tasks # Create the threads - for _ in range(nb_threads): + for _ in range(0,nb_pending_tasks): + self.__nb_pending_task += 1 self.__start_thread() - + for _ in range(0,nb_threads-nb_pending_tasks): + self.__start_thread() + + def __start_thread(self): """ Starts a new thread, if possible @@ -259,9 +267,14 @@ def __start_thread(self): thread = threading.Thread(target=self.__run, name=name) thread.daemon = True - self._threads.append(thread) - thread.start() - return True + try: + self.__nb_threads += 1 + thread.start() + self._threads.append(thread) + return True + except (RuntimeError, OSError): + self.__nb_threads -= 1 + return False def stop(self): """ @@ -321,8 +334,9 @@ def enqueue(self, method, *args, **kwargs): # Add the task to the queue self._queue.put((method, args, kwargs, future), True, self._timeout) - - if self.__nb_active_threads == self.__nb_threads: + self.__nb_pending_task += 1 + + if self.__nb_pending_task > self.__nb_threads: # All threads are taken: start a new one self.__start_thread() @@ -370,9 +384,7 @@ def __run(self): """ The main loop """ - with self.__lock: - self.__nb_threads += 1 - + while not self._done_event.is_set(): try: # Wait for an action (blocking) @@ -389,7 +401,6 @@ def __run(self): else: with self.__lock: self.__nb_active_threads += 1 - # Extract elements method, args, kwargs, future = task try: @@ -403,14 +414,18 @@ def __run(self): self._queue.task_done() # Thread is not active anymore - self.__nb_active_threads -= 1 + with self.__lock: + self.__nb_pending_task -= 1 + self.__nb_active_threads -= 1 # Clean up thread if necessary with self.__lock: - if self.__nb_threads > self._min_threads: - # No more work for this thread, and we're above the - # minimum number of threads: stop this one + if self.__nb_threads > self._min_threads and self.__nb_threads - self.__nb_active_threads > self._queue.qsize(): + # No more work for this thread + # if there are more non active_thread than task + # and we're above the minimum number of threads: stop this one self.__nb_threads -= 1 + #print("fin lock remaining threads {}".format(self.__nb_threads)) return with self.__lock: From eaf849c43cf10bd7dd8444b18aafb8da6db347b9 Mon Sep 17 00:00:00 2001 From: Thomas Calmant Date: Fri, 19 Aug 2016 13:25:19 +0200 Subject: [PATCH 2/2] Code review of pelix.threadpool * Corrected format according to PEP-8 * Corrected typo (self.max_threads => self._max_threads) * Remove "no bytecode" flag --- pelix/threadpool.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pelix/threadpool.py b/pelix/threadpool.py index 7e1fc0f7..145b9179 100644 --- a/pelix/threadpool.py +++ b/pelix/threadpool.py @@ -24,8 +24,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -from sys import dont_write_bytecode -dont_write_bytecode # Standard library import logging @@ -214,7 +212,8 @@ def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60, # Thread count self._thread_id = 0 - # Current number of threads, active and alive, and number of task waiting + # Current number of threads, active and alive, + # and number of task waiting self.__nb_threads = 0 self.__nb_active_threads = 0 self.__nb_pending_task = 0 @@ -234,20 +233,19 @@ def start(self): nb_pending_tasks = self._queue.qsize() if nb_pending_tasks > self._max_threads: nb_threads = self._max_threads - nb_pending_tasks = self.max_threads + nb_pending_tasks = self._max_threads elif nb_pending_tasks < self._min_threads: nb_threads = self._min_threads else: nb_threads = nb_pending_tasks # Create the threads - for _ in range(0,nb_pending_tasks): + for _ in range(nb_pending_tasks): self.__nb_pending_task += 1 self.__start_thread() - for _ in range(0,nb_threads-nb_pending_tasks): + for _ in range(nb_threads-nb_pending_tasks): self.__start_thread() - - + def __start_thread(self): """ Starts a new thread, if possible @@ -335,7 +333,7 @@ def enqueue(self, method, *args, **kwargs): self._queue.put((method, args, kwargs, future), True, self._timeout) self.__nb_pending_task += 1 - + if self.__nb_pending_task > self.__nb_threads: # All threads are taken: start a new one self.__start_thread() @@ -384,7 +382,6 @@ def __run(self): """ The main loop """ - while not self._done_event.is_set(): try: # Wait for an action (blocking) @@ -420,12 +417,14 @@ def __run(self): # Clean up thread if necessary with self.__lock: - if self.__nb_threads > self._min_threads and self.__nb_threads - self.__nb_active_threads > self._queue.qsize(): + extra_threads = self.__nb_threads - self.__nb_active_threads + if self.__nb_threads > self._min_threads \ + and extra_threads > self._queue.qsize(): # No more work for this thread # if there are more non active_thread than task - # and we're above the minimum number of threads: stop this one + # and we're above the minimum number of threads: + # stop this one self.__nb_threads -= 1 - #print("fin lock remaining threads {}".format(self.__nb_threads)) return with self.__lock: