diff --git a/src/together/constants.py b/src/together/constants.py
index b92a585..5e0c912 100644
--- a/src/together/constants.py
+++ b/src/together/constants.py
@@ -20,13 +20,13 @@
 # Multipart upload constants
 MIN_PART_SIZE_MB = 5  # Minimum part size (S3 requirement)
-TARGET_PART_SIZE_MB = 100  # Target part size for optimal performance
-MAX_MULTIPART_PARTS = 250  # Maximum parts per upload (S3 limit)
+TARGET_PART_SIZE_MB = 250  # Target part size
+MAX_MULTIPART_PARTS = 250  # Maximum parts per upload
 MULTIPART_UPLOAD_TIMEOUT = 300  # Timeout in seconds for uploading each part
 MULTIPART_THRESHOLD_GB = 5.0  # threshold for switching to multipart upload
 
 # maximum number of GB sized files we support finetuning for
-MAX_FILE_SIZE_GB = 25.0
+MAX_FILE_SIZE_GB = 50.1
 
 # Messages
diff --git a/src/together/filemanager.py b/src/together/filemanager.py
index f139e9b..d3c16b8 100644
--- a/src/together/filemanager.py
+++ b/src/together/filemanager.py
@@ -6,10 +6,10 @@
 import stat
 import tempfile
 import uuid
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from functools import partial
 from pathlib import Path
-from typing import Any, Dict, List, Tuple
+from typing import Any, BinaryIO, Dict, List, Tuple
 
 import requests
 from filelock import FileLock
@@ -212,6 +212,7 @@ def download(
             ),
             remaining_retries=MAX_RETRIES,
             stream=True,
+            request_timeout=3600,
         )
 
         try:
@@ -512,6 +513,18 @@ def _initiate_upload(
 
         return response.data
 
+    def _submit_part(
+        self,
+        executor: ThreadPoolExecutor,
+        f: BinaryIO,
+        part_info: Dict[str, Any],
+        part_size: int,
+    ) -> Future[str]:
+        """Submit a single part for upload and return the future"""
+        f.seek((part_info["PartNumber"] - 1) * part_size)
+        part_data = f.read(part_size)
+        return executor.submit(self._upload_single_part, part_info, part_data)
+
     def _upload_parts_concurrent(
         self, file: Path, upload_info: Dict[str, Any], part_size: int
     ) -> List[Dict[str, Any]]:
@@ -522,29 +535,39 @@ def _upload_parts_concurrent(
         self, file: Path, upload_info: Dict[str, Any], part_size: int
     ) -> List[Dict[str, Any]]:
         with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
             with tqdm(total=len(parts), desc="Uploading parts", unit="part") as pbar:
-                future_to_part = {}
-
                 with open(file, "rb") as f:
-                    for part_info in parts:
-                        f.seek((part_info["PartNumber"] - 1) * part_size)
-                        part_data = f.read(part_size)
+                    future_to_part = {}
+                    part_index = 0
 
-                        future = executor.submit(
-                            self._upload_single_part, part_info, part_data
-                        )
+                    # Submit initial batch limited by max_concurrent_parts
+                    for _ in range(min(self.max_concurrent_parts, len(parts))):
+                        part_info = parts[part_index]
+                        future = self._submit_part(executor, f, part_info, part_size)
                         future_to_part[future] = part_info["PartNumber"]
-
-                # Collect results
-                for future in as_completed(future_to_part):
-                    part_number = future_to_part[future]
-                    try:
-                        etag = future.result()
-                        completed_parts.append(
-                            {"part_number": part_number, "etag": etag}
-                        )
-                        pbar.update(1)
-                    except Exception as e:
-                        raise Exception(f"Failed to upload part {part_number}: {e}")
+                        part_index += 1
+
+                    # Process completions and submit new parts (sliding window)
+                    while future_to_part:
+                        done_future = next(as_completed(future_to_part))
+                        part_number = future_to_part.pop(done_future)
+
+                        try:
+                            etag = done_future.result()
+                            completed_parts.append(
+                                {"part_number": part_number, "etag": etag}
+                            )
+                            pbar.update(1)
+                        except Exception as e:
+                            raise Exception(f"Failed to upload part {part_number}: {e}")
+
+                        # Submit next part if available
+                        if part_index < len(parts):
+                            part_info = parts[part_index]
+                            future = self._submit_part(
+                                executor, f, part_info, part_size
+                            )
+                            future_to_part[future] = part_info["PartNumber"]
+                            part_index += 1
 
         completed_parts.sort(key=lambda x: x["part_number"])
         return completed_parts
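Note on the filemanager.py refactor above: instead of reading every part up front, the new loop keeps at most max_concurrent_parts futures in flight and reads the next part from disk only when a slot frees up, so peak buffered data is roughly max_concurrent_parts * part_size bytes rather than the whole file. Below is a minimal standalone sketch of that sliding-window pattern; upload_part and read_part are illustrative stand-ins, not together SDK functions.

# Standalone sketch of the bounded "sliding window" submission pattern used in
# _upload_parts_concurrent above. upload_part() is a hypothetical stand-in for
# the real per-part uploader; it only fabricates an ETag.
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, BinaryIO, Dict, List


def upload_part(part_number: int, data: bytes) -> str:
    return f"etag-{part_number}"


def read_part(f: BinaryIO, part_number: int, part_size: int) -> bytes:
    # Parts are 1-indexed: seek to the part's byte offset and read one slice.
    f.seek((part_number - 1) * part_size)
    return f.read(part_size)


def upload_sliding_window(
    file: Path, num_parts: int, part_size: int, max_concurrent: int = 4
) -> List[Dict[str, Any]]:
    completed: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        with file.open("rb") as f:
            in_flight = {}  # future -> part number
            next_part = 1

            # Prime the window: only max_concurrent parts are read and queued.
            while next_part <= min(max_concurrent, num_parts):
                data = read_part(f, next_part, part_size)
                in_flight[executor.submit(upload_part, next_part, data)] = next_part
                next_part += 1

            # Each completion frees a slot before the next part is read, so
            # memory stays near max_concurrent * part_size bytes.
            while in_flight:
                done = next(as_completed(in_flight))
                part_number = in_flight.pop(done)
                completed.append({"part_number": part_number, "etag": done.result()})

                if next_part <= num_parts:
                    data = read_part(f, next_part, part_size)
                    in_flight[executor.submit(upload_part, next_part, data)] = next_part
                    next_part += 1

    # Parts finish out of order, so restore part order before completion.
    completed.sort(key=lambda p: p["part_number"])
    return completed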
diff --git a/src/together/utils/files.py b/src/together/utils/files.py
index 8f18bc6..3734753 100644
--- a/src/together/utils/files.py
+++ b/src/together/utils/files.py
@@ -7,6 +7,7 @@
 from traceback import format_exc
 from typing import Any, Dict, List
 
+from tqdm import tqdm
 
 from together.constants import (
     MAX_FILE_SIZE_GB,
@@ -363,14 +364,19 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
         Dict[str, Any]: A dictionary with the results of the check.
     """
     report_dict: Dict[str, Any] = {}
+
     try:
+        # Dry-run UTF-8 decode: iterate through file to validate encoding
        with file.open(encoding="utf-8") as f:
-            f.read()
+            for _ in f:
+                pass
+
         report_dict["utf8"] = True
     except UnicodeDecodeError as e:
         report_dict["utf8"] = False
         report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
         report_dict["is_check_passed"] = False
+
     return report_dict
@@ -470,7 +476,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
     with file.open() as f:
         idx = -1
         try:
-            for idx, line in enumerate(f):
+            for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
                 json_line = json.loads(line)
 
                 if not isinstance(json_line, dict):
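Note on the files.py changes above: both push validation toward streaming. The UTF-8 check now decodes line by line instead of pulling the whole file into memory with f.read(), and the JSONL loop gets a tqdm progress readout even though the total line count is unknown up front. A rough self-contained sketch of the same two ideas, using illustrative helper names (is_utf8, count_jsonl_records) rather than the SDK's internal _check_* functions:

# Sketch of streaming validation, mirroring the two edits above.
# is_utf8() and count_jsonl_records() are illustrative names only.
import json
from pathlib import Path

from tqdm import tqdm


def is_utf8(file: Path) -> bool:
    try:
        with file.open(encoding="utf-8") as f:
            for _ in f:  # decoding happens as each line is read; memory stays flat
                pass
        return True
    except UnicodeDecodeError:
        return False


def count_jsonl_records(file: Path) -> int:
    """Count JSON-object lines, failing fast on the first malformed one."""
    count = 0
    with file.open() as f:
        # With no known total, tqdm shows a running line count instead of a bar.
        for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError(f"Line {idx + 1} is not a JSON object")
            count += 1
    return count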
diff --git a/tests/unit/test_multipart_upload_manager.py b/tests/unit/test_multipart_upload_manager.py
index 7c10dfd..e2dea8e 100644
--- a/tests/unit/test_multipart_upload_manager.py
+++ b/tests/unit/test_multipart_upload_manager.py
@@ -74,8 +74,8 @@ def test_calculate_parts_medium_file(self, manager):
         file_size = 500 * 1024 * 1024  # 500MB
         part_size, num_parts = manager._calculate_parts(file_size)
 
-        expected_parts = 5  # 500MB / 100MB = 5 parts
-        expected_part_size = 100 * 1024 * 1024  # 100MB
+        expected_parts = 2  # 500MB / 250MB = 2 parts
+        expected_part_size = 250 * 1024 * 1024  # 250MB
         assert num_parts == expected_parts
         assert part_size == expected_part_size
 
@@ -85,10 +85,11 @@ def test_calculate_parts_large_file(self, manager):
         file_size = 50 * 1024 * 1024 * 1024  # 50GB
         part_size, num_parts = manager._calculate_parts(file_size)
 
-        # Should use maximum parts and scale part size
-        assert num_parts == MAX_MULTIPART_PARTS  # 250
-        expected_part_size = file_size // MAX_MULTIPART_PARTS
-        assert part_size >= expected_part_size
+        # With 250MB target part size, 50GB should use ~205 parts
+        expected_parts = 205  # 50GB / 250MB ≈ 205 parts
+        assert num_parts == expected_parts
+        # Part size should be close to target (within rounding)
+        assert part_size >= 249 * 1024 * 1024  # At least 249MB (allowing for rounding)
 
     def test_calculate_parts_respects_minimum_part_size(self, manager):
         """Test that minimum part size is respected"""
@@ -275,7 +276,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
         # Mock as_completed
         with patch(
             "together.filemanager.as_completed",
-            return_value=[mock_future1, mock_future2],
+            side_effect=[iter([mock_future1]), iter([mock_future2])],
         ):
             upload_info = {
                 "parts": [
@@ -303,7 +304,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
     def test_file_size_exceeds_limit_raises_error(self, mock_stat, manager):
         """Test that files exceeding size limit raise FileTypeError with clear message"""
         # Setup - file size over limit
-        file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB)  # 26GB
+        file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB)  # 51.1GB
         mock_stat.return_value.st_size = file_size
 
         # Should raise FileTypeError with descriptive message
@@ -311,7 +312,7 @@
             manager.upload("test-url", Path("test.jsonl"), FilePurpose.FineTune)
 
         error_message = str(exc_info.value)
-        assert "26.0GB exceeds maximum supported size of 25.0GB" in error_message
+        assert "51.1GB exceeds maximum supported size of 50.1GB" in error_message
 
     @patch.object(MultipartUploadManager, "_initiate_upload")
     @patch.object(MultipartUploadManager, "_upload_parts_concurrent")
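Note on the updated test expectations: they follow from plain arithmetic on the new constants. 500 MB / 250 MB = 2 parts, and 50 GB / 250 MB = 204.8, which rounds up to 205 parts of roughly 250 MB each, still below the 250-part cap. _calculate_parts itself is not part of this diff, so the sketch below is only an assumed implementation that reproduces those numbers, not the SDK's actual code.

# Assumed sketch of the part-size arithmetic the tests expect; the real
# MultipartUploadManager._calculate_parts may differ. Constants mirror
# src/together/constants.py.
import math
from typing import Tuple

MIN_PART_SIZE_MB = 5
TARGET_PART_SIZE_MB = 250
MAX_MULTIPART_PARTS = 250
MB = 1024 * 1024


def calculate_parts(file_size: int) -> Tuple[int, int]:
    """Return (part_size_bytes, num_parts) for a multipart upload."""
    part_size = TARGET_PART_SIZE_MB * MB
    num_parts = math.ceil(file_size / part_size)

    if num_parts > MAX_MULTIPART_PARTS:
        # Too many parts at the target size: grow the part size to fit the cap.
        num_parts = MAX_MULTIPART_PARTS
        part_size = math.ceil(file_size / num_parts)

    part_size = max(part_size, MIN_PART_SIZE_MB * MB)
    return part_size, num_parts


print(calculate_parts(500 * MB))        # (262144000, 2)   -> 2 parts of 250MB
print(calculate_parts(50 * 1024 * MB))  # (262144000, 205) -> 205 parts of ~250MB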