From 4b9572aeb1e1b1d7ee626bfe419b3651919e9aa6 Mon Sep 17 00:00:00 2001 From: "m.ghaeini" Date: Mon, 19 Feb 2018 20:29:56 +0330 Subject: [PATCH 1/4] Change ThreadPool and Worker class in thread_pool.py Fixes #595 --- minio/thread_pool.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/minio/thread_pool.py b/minio/thread_pool.py index 52ee5668a..44daf4494 100644 --- a/minio/thread_pool.py +++ b/minio/thread_pool.py @@ -37,12 +37,13 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue): self.tasks_queue = tasks_queue self.results_queue = results_queue self.exceptions_queue = exceptions_queue - self.daemon = True + self.daemon = False self.start() def run(self): fast_quit = False - while True: + + while not self.tasks_queue.empty(): func, args, kargs = self.tasks_queue.get() if not fast_quit: try: @@ -61,8 +62,7 @@ def __init__(self, num_threads): self.results_queue = queue() self.exceptions_queue = queue() self.tasks_queue = queue(num_threads) - for _ in range(num_threads): - Worker(self.tasks_queue, self.results_queue, self.exceptions_queue) + self.num_threads = num_threads def add_task(self, func, *args, **kargs): """ Add a task to the queue """ @@ -72,6 +72,10 @@ def parallel_run(self, func, args_list): """ Add a list of tasks to the queue """ for args in args_list: self.add_task(func, args) + + for _ in range(self.num_threads): + Worker(self.tasks_queue, self.results_queue, self.exceptions_queue) + # Wait for completion of all the tasks in the queue self.tasks_queue.join() # Check if one of the thread raised an exception, if yes @@ -79,7 +83,9 @@ def parallel_run(self, func, args_list): if not self.exceptions_queue.empty(): raise self.exceptions_queue.get() + def result(self): """ Return the result of all called tasks """ return self.results_queue + From 8221328dd59338e649e49a73f9f2cd092a6e7b4e Mon Sep 17 00:00:00 2001 From: "m.ghaeini" Date: Mon, 26 Feb 2018 17:47:30 +0330 Subject: [PATCH 2/4] Fix ThreadPool and Worker class in thread_pool.py Change Worker daemon field to True. remove queue self.task_queue size restriction in ThreadPool class. Fixes #595 --- minio/thread_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/minio/thread_pool.py b/minio/thread_pool.py index 44daf4494..339fa805a 100644 --- a/minio/thread_pool.py +++ b/minio/thread_pool.py @@ -37,7 +37,7 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue): self.tasks_queue = tasks_queue self.results_queue = results_queue self.exceptions_queue = exceptions_queue - self.daemon = False + self.daemon = True self.start() def run(self): @@ -61,7 +61,7 @@ class ThreadPool: def __init__(self, num_threads): self.results_queue = queue() self.exceptions_queue = queue() - self.tasks_queue = queue(num_threads) + self.tasks_queue = queue() self.num_threads = num_threads def add_task(self, func, *args, **kargs): From 6799120f3953639b5cc51ef3c728a988a90650c9 Mon Sep 17 00:00:00 2001 From: "m.ghaeini" Date: Tue, 27 Feb 2018 11:53:57 +0330 Subject: [PATCH 3/4] Clean up 2 extra space line in thread_pool.py Fixes #595 --- minio/thread_pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/minio/thread_pool.py b/minio/thread_pool.py index 339fa805a..43501e2c7 100644 --- a/minio/thread_pool.py +++ b/minio/thread_pool.py @@ -83,7 +83,6 @@ def parallel_run(self, func, args_list): if not self.exceptions_queue.empty(): raise self.exceptions_queue.get() - def result(self): """ Return the result of all called tasks """ return self.results_queue From 6e7ee8904f82a7bfc44ffb0695714ea360a9f994 Mon Sep 17 00:00:00 2001 From: "m.ghaeini" Date: Sat, 3 Mar 2018 15:17:42 +0330 Subject: [PATCH 4/4] Clean up 2 extra space line in thread_pool.py Fixes #595 --- minio/thread_pool.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/minio/thread_pool.py b/minio/thread_pool.py index 43501e2c7..45e18ff41 100644 --- a/minio/thread_pool.py +++ b/minio/thread_pool.py @@ -26,12 +26,13 @@ """ -import sys from threading import Thread from .compat import queue + class Worker(Thread): """ Thread executing tasks from a given tasks queue """ + def __init__(self, tasks_queue, results_queue, exceptions_queue): Thread.__init__(self) self.tasks_queue = tasks_queue @@ -42,7 +43,6 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue): def run(self): fast_quit = False - while not self.tasks_queue.empty(): func, args, kargs = self.tasks_queue.get() if not fast_quit: @@ -58,6 +58,7 @@ def run(self): class ThreadPool: """ Pool of threads consuming tasks from a queue """ + def __init__(self, num_threads): self.results_queue = queue() self.exceptions_queue = queue() @@ -72,7 +73,7 @@ def parallel_run(self, func, args_list): """ Add a list of tasks to the queue """ for args in args_list: self.add_task(func, args) - + for _ in range(self.num_threads): Worker(self.tasks_queue, self.results_queue, self.exceptions_queue) @@ -87,4 +88,3 @@ def result(self): """ Return the result of all called tasks """ return self.results_queue -