diff --git a/minio/thread_pool.py b/minio/thread_pool.py index 52ee5668a..45e18ff41 100644 --- a/minio/thread_pool.py +++ b/minio/thread_pool.py @@ -26,12 +26,13 @@ """ -import sys from threading import Thread from .compat import queue + class Worker(Thread): """ Thread executing tasks from a given tasks queue """ + def __init__(self, tasks_queue, results_queue, exceptions_queue): Thread.__init__(self) self.tasks_queue = tasks_queue @@ -42,7 +43,7 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue): def run(self): fast_quit = False - while True: + while not self.tasks_queue.empty(): func, args, kargs = self.tasks_queue.get() if not fast_quit: try: @@ -57,12 +58,12 @@ def run(self): class ThreadPool: """ Pool of threads consuming tasks from a queue """ + def __init__(self, num_threads): self.results_queue = queue() self.exceptions_queue = queue() - self.tasks_queue = queue(num_threads) - for _ in range(num_threads): - Worker(self.tasks_queue, self.results_queue, self.exceptions_queue) + self.tasks_queue = queue() + self.num_threads = num_threads def add_task(self, func, *args, **kargs): """ Add a task to the queue """ @@ -72,6 +73,10 @@ def parallel_run(self, func, args_list): """ Add a list of tasks to the queue """ for args in args_list: self.add_task(func, args) + + for _ in range(self.num_threads): + Worker(self.tasks_queue, self.results_queue, self.exceptions_queue) + # Wait for completion of all the tasks in the queue self.tasks_queue.join() # Check if one of the thread raised an exception, if yes