Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions minio/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@

"""

import sys
from threading import Thread
from .compat import queue


class Worker(Thread):
""" Thread executing tasks from a given tasks queue """

def __init__(self, tasks_queue, results_queue, exceptions_queue):
Thread.__init__(self)
self.tasks_queue = tasks_queue
Expand All @@ -42,7 +43,7 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue):

def run(self):
fast_quit = False
while True:
while not self.tasks_queue.empty():
func, args, kargs = self.tasks_queue.get()
if not fast_quit:
try:
Expand All @@ -57,12 +58,12 @@ def run(self):

class ThreadPool:
""" Pool of threads consuming tasks from a queue """

def __init__(self, num_threads):
self.results_queue = queue()
self.exceptions_queue = queue()
self.tasks_queue = queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks_queue, self.results_queue, self.exceptions_queue)
self.tasks_queue = queue()
self.num_threads = num_threads

def add_task(self, func, *args, **kargs):
""" Add a task to the queue """
Expand All @@ -72,6 +73,10 @@ def parallel_run(self, func, args_list):
""" Add a list of tasks to the queue """
for args in args_list:
self.add_task(func, args)

for _ in range(self.num_threads):
Worker(self.tasks_queue, self.results_queue, self.exceptions_queue)

# Wait for completion of all the tasks in the queue
self.tasks_queue.join()
# Check if one of the thread raised an exception, if yes
Expand Down