Skip to content

Commit

Permalink
Merge branch 'add_remote_file_uploader' into use_remote_uploader_v2_f…
Browse files Browse the repository at this point in the history
…or_checkpointing
  • Loading branch information
bigning committed May 30, 2024
2 parents 78e8c00 + 97588c8 commit deaec50
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions composer/utils/remote_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import time
import uuid
from concurrent.futures import Future, ProcessPoolExecutor
from typing import Any, List, Tuple
from typing import List

from composer.utils.dist import get_local_rank
from composer.utils.file_helpers import (
Expand All @@ -29,8 +29,8 @@

def _upload_worker(
remote_folder: str,
file_queue: multiprocessing.Queue,
is_closed_event: multiprocessing.Event,
file_queue: queue.Queue,
is_closed_event: multiprocessing.Event, # pyright: ignore[reportGeneralTypeIssues]
num_attempts: int,
parent_process_id: int,
) -> None:
Expand Down Expand Up @@ -101,7 +101,8 @@ def __init__(
num_concurrent_uploads: int = 1,
num_attempts: int = 3,
):
assert num_concurrent_uploads >= 1, f'num_concurrent_uploads must be >= 1, got {num_concurrent_uploads}'
if num_concurrent_uploads < 1:
raise ValueError(f'num_concurrent_uploads must be >=1, but got {num_concurrent_uploads}')

# A folder to use for staging uploads
self._tempdir = tempfile.TemporaryDirectory()
Expand All @@ -111,7 +112,7 @@ def __init__(

mp_ctx = multiprocessing.get_context('spawn')
manager = mp_ctx.Manager()
self.file_queue: multiprocessing.Queue[Any] = manager.Queue()
self.file_queue: queue.Queue = manager.Queue()
self.executor = ProcessPoolExecutor(
max_workers=num_concurrent_uploads,
mp_context=mp_ctx,
Expand All @@ -137,7 +138,6 @@ def upload_file_async(
overwrite: bool,
):
"""Async call to submit a job for uploading."""

# Copy file to staging folder
copied_path = os.path.join(self._upload_staging_folder, str(uuid.uuid4()))
os.makedirs(self._upload_staging_folder, exist_ok=True)
Expand Down

0 comments on commit deaec50

Please sign in to comment.