Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less memory allocation with put_object() #930

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions minio/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

"""

from threading import Thread
from threading import Thread, BoundedSemaphore

from .compat import PYTHON2

Expand Down Expand Up @@ -57,13 +57,15 @@ def run(self):
# No exception detected in any thread,
# continue the execution.
if self.exceptions_queue.empty():
# Execute the task
func, args, kargs, cleanup_func = task
try:
# Execute the task
func, args, kargs = task
result = func(*args, **kargs)
self.results_queue.put(result)
except Exception as ex: # pylint: disable=broad-except
self.exceptions_queue.put(ex)
finally:
cleanup_func()
# Mark this task as done, whether an exception happened or not
self.tasks_queue.task_done()

Expand All @@ -74,12 +76,20 @@ class ThreadPool:
def __init__(self, num_threads):
self.results_queue = Queue()
self.exceptions_queue = Queue()
self.tasks_queue = Queue(num_threads)
self.tasks_queue = Queue()
self.sem = BoundedSemaphore(num_threads)
self.num_threads = num_threads

def add_task(self, func, *args, **kargs):
""" Add a task to the queue """
self.tasks_queue.put((func, args, kargs))
"""
Add a task to the queue. Calling this function can block
until workers have a room for processing new tasks. Blocking
the caller also prevents the latter from allocating a lot of
memory while workers are still busy running their assigned tasks.
"""
self.sem.acquire()
cleanup_func = self.sem.release
self.tasks_queue.put((func, args, kargs, cleanup_func))

def start_parallel(self):
""" Prepare threads to run tasks"""
Expand Down