Skip to content

Commit

Permalink
Use separate thread pool executors when uploading chunks to avoid deadlock (#8349)

Browse files Browse the repository at this point in the history

* Use separate thread pool executors

Signed-off-by: harupy <hkawamura0130@gmail.com>

* Indent

Signed-off-by: harupy <hkawamura0130@gmail.com>

* Address comments

Signed-off-by: harupy <hkawamura0130@gmail.com>

* Remove with

Signed-off-by: harupy <hkawamura0130@gmail.com>

* Fix

Signed-off-by: harupy <hkawamura0130@gmail.com>

* Fix

Signed-off-by: harupy <hkawamura0130@gmail.com>

---------

Signed-off-by: harupy <hkawamura0130@gmail.com>
  • Loading branch information
harupy committed Apr 28, 2023
1 parent 2b50b88 commit cb5cc36
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
5 changes: 4 additions & 1 deletion mlflow/store/artifact/artifact_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ def __init__(self, artifact_uri):
# Limit the number of threads used for artifact uploads/downloads. Use at most
# constants._NUM_MAX_THREADS threads or 2 * the number of CPU cores available on the
# system (whichever is smaller)
self.thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)
self.thread_pool = self._create_thread_pool()

def _create_thread_pool(self):
    """Return a new ThreadPoolExecutor capped at ``self.max_workers`` threads.

    Factored out so subclasses (e.g. DatabricksArtifactRepository) can create
    additional, isolated executors with the same worker limit — used to give
    chunk uploads their own pool and avoid submit-and-wait deadlock when a
    file-upload task blocks on chunk-upload tasks in the same pool.
    """
    return ThreadPoolExecutor(max_workers=self.max_workers)

@abstractmethod
def log_artifact(self, local_file, artifact_path=None):
Expand Down
11 changes: 8 additions & 3 deletions mlflow/store/artifact/databricks_artifact_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ def __init__(self, artifact_uri):
self.run_relative_artifact_repo_root_path = (
"" if run_artifact_root_path == artifact_repo_root_path else run_relative_root_path
)
# Use an isolated thread pool executor for chunk uploads to avoid a deadlock
# caused by waiting for a chunk-upload task within a file-upload task.
# See https://superfastpython.com/threadpoolexecutor-deadlock/#Deadlock_1_Submit_and_Wait_for_a_Task_Within_a_Task
# for more details
self.chunk_upload_thread_pool = self._create_thread_pool()

@staticmethod
def _extract_run_id(artifact_uri):
Expand Down Expand Up @@ -263,7 +268,7 @@ def _azure_upload_file(self, credentials, local_file, artifact_path):
num_chunks = _compute_num_chunks(local_file, _MULTIPART_UPLOAD_CHUNK_SIZE)
for index in range(num_chunks):
start_byte = index * _MULTIPART_UPLOAD_CHUNK_SIZE
future = self.thread_pool.submit(
future = self.chunk_upload_thread_pool.submit(
self._azure_upload_chunk,
credentials=credentials,
headers=headers,
Expand Down Expand Up @@ -341,7 +346,7 @@ def _azure_adls_gen2_upload_file(self, credentials, local_file, artifact_path):
use_single_part_upload = num_chunks == 1
for index in range(num_chunks):
start_byte = index * _MULTIPART_UPLOAD_CHUNK_SIZE
future = self.thread_pool.submit(
future = self.chunk_upload_thread_pool.submit(
self._retryable_adls_function,
func=patch_adls_file_upload,
artifact_path=artifact_path,
Expand Down Expand Up @@ -518,7 +523,7 @@ def _upload_parts(self, local_file, create_mpu_resp):
for index, cred_info in enumerate(create_mpu_resp.upload_credential_infos):
part_number = index + 1
start_byte = index * _MULTIPART_UPLOAD_CHUNK_SIZE
future = self.thread_pool.submit(
future = self.chunk_upload_thread_pool.submit(
self._upload_part_retry,
cred_info=cred_info,
upload_id=create_mpu_resp.upload_id,
Expand Down

0 comments on commit cb5cc36

Please sign in to comment.