Skip to content

Commit

Permalink
Merge branch 'Paltoquet-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
tcalmant committed Aug 19, 2016
2 parents f7a70b0 + eaf849c commit 2dae44b
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions pelix/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,11 @@ def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60,
# Thread count
self._thread_id = 0

# Current number of threads, active and alive
# Current number of threads, active and alive,
# and number of task waiting
self.__nb_threads = 0
self.__nb_active_threads = 0
self.__nb_pending_task = 0

def start(self):
"""
Expand All @@ -231,13 +233,17 @@ def start(self):
nb_pending_tasks = self._queue.qsize()
if nb_pending_tasks > self._max_threads:
nb_threads = self._max_threads
nb_pending_tasks = self._max_threads
elif nb_pending_tasks < self._min_threads:
nb_threads = self._min_threads
else:
nb_threads = nb_pending_tasks

# Create the threads
for _ in range(nb_threads):
for _ in range(nb_pending_tasks):
self.__nb_pending_task += 1
self.__start_thread()
for _ in range(nb_threads-nb_pending_tasks):
self.__start_thread()

def __start_thread(self):
Expand All @@ -259,9 +265,14 @@ def __start_thread(self):

thread = threading.Thread(target=self.__run, name=name)
thread.daemon = True
self._threads.append(thread)
thread.start()
return True
try:
self.__nb_threads += 1
thread.start()
self._threads.append(thread)
return True
except (RuntimeError, OSError):
self.__nb_threads -= 1
return False

def stop(self):
"""
Expand Down Expand Up @@ -321,8 +332,9 @@ def enqueue(self, method, *args, **kwargs):
# Add the task to the queue
self._queue.put((method, args, kwargs, future), True,
self._timeout)
self.__nb_pending_task += 1

if self.__nb_active_threads == self.__nb_threads:
if self.__nb_pending_task > self.__nb_threads:
# All threads are taken: start a new one
self.__start_thread()

Expand Down Expand Up @@ -370,9 +382,6 @@ def __run(self):
"""
The main loop
"""
with self.__lock:
self.__nb_threads += 1

while not self._done_event.is_set():
try:
# Wait for an action (blocking)
Expand All @@ -389,7 +398,6 @@ def __run(self):
else:
with self.__lock:
self.__nb_active_threads += 1

# Extract elements
method, args, kwargs, future = task
try:
Expand All @@ -403,13 +411,19 @@ def __run(self):
self._queue.task_done()

# Thread is not active anymore
self.__nb_active_threads -= 1
with self.__lock:
self.__nb_pending_task -= 1
self.__nb_active_threads -= 1

# Clean up thread if necessary
with self.__lock:
if self.__nb_threads > self._min_threads:
# No more work for this thread, and we're above the
# minimum number of threads: stop this one
extra_threads = self.__nb_threads - self.__nb_active_threads
if self.__nb_threads > self._min_threads \
and extra_threads > self._queue.qsize():
# No more work for this thread
# if there are more non active_thread than task
# and we're above the minimum number of threads:
# stop this one
self.__nb_threads -= 1
return

Expand Down

0 comments on commit 2dae44b

Please sign in to comment.