From 3a34e362786bc3428033f5120e815ca8385b94fe Mon Sep 17 00:00:00 2001
From: Soroush Bassam
Date: Thu, 6 Nov 2025 17:46:11 -0800
Subject: [PATCH 1/7] feat: Increase file size limit from 25GB to 50.1GB

---
 src/together/constants.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/together/constants.py b/src/together/constants.py
index b92a585..bd00fc3 100644
--- a/src/together/constants.py
+++ b/src/together/constants.py
@@ -20,13 +20,13 @@
 
 # Multipart upload constants
 MIN_PART_SIZE_MB = 5  # Minimum part size (S3 requirement)
-TARGET_PART_SIZE_MB = 100  # Target part size for optimal performance
-MAX_MULTIPART_PARTS = 250  # Maximum parts per upload (S3 limit)
+TARGET_PART_SIZE_MB = 250  # Target part size for large files
+MAX_MULTIPART_PARTS = 250  # Maximum parts per upload
 MULTIPART_UPLOAD_TIMEOUT = 300  # Timeout in seconds for uploading each part
 MULTIPART_THRESHOLD_GB = 5.0  # threshold for switching to multipart upload
 
 # maximum number of GB sized files we support finetuning for
-MAX_FILE_SIZE_GB = 25.0
+MAX_FILE_SIZE_GB = 50.1
 
 # Messages
 

From 3f2c68eaf24318154882d6e7ee87793d1bee4f54 Mon Sep 17 00:00:00 2001
From: Soroush Bassam
Date: Thu, 6 Nov 2025 18:12:25 -0800
Subject: [PATCH 2/7] feat: Increase file size limit to 50.1GB - tests passing

---
 tests/unit/test_multipart_upload_manager.py | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/tests/unit/test_multipart_upload_manager.py b/tests/unit/test_multipart_upload_manager.py
index 7c10dfd..d036682 100644
--- a/tests/unit/test_multipart_upload_manager.py
+++ b/tests/unit/test_multipart_upload_manager.py
@@ -74,8 +74,8 @@ def test_calculate_parts_medium_file(self, manager):
         file_size = 500 * 1024 * 1024  # 500MB
         part_size, num_parts = manager._calculate_parts(file_size)
 
-        expected_parts = 5  # 500MB / 100MB = 5 parts
-        expected_part_size = 100 * 1024 * 1024  # 100MB
+        expected_parts = 2  # 500MB / 250MB = 2 parts
+        expected_part_size = 250 * 1024 * 1024  # 250MB
         assert num_parts == expected_parts
         assert part_size == expected_part_size
 
@@ -85,10 +85,11 @@ def test_calculate_parts_large_file(self, manager):
         file_size = 50 * 1024 * 1024 * 1024  # 50GB
         part_size, num_parts = manager._calculate_parts(file_size)
 
-        # Should use maximum parts and scale part size
-        assert num_parts == MAX_MULTIPART_PARTS  # 250
-        expected_part_size = file_size // MAX_MULTIPART_PARTS
-        assert part_size >= expected_part_size
+        # With 250MB target part size, 50GB should use ~205 parts
+        expected_parts = 205  # 50GB / 250MB ≈ 205 parts
+        assert num_parts == expected_parts
+        # Part size should be close to target (within rounding)
+        assert part_size >= 249 * 1024 * 1024  # At least 249MB (allowing for rounding)
 
     def test_calculate_parts_respects_minimum_part_size(self, manager):
         """Test that minimum part size is respected"""
@@ -303,7 +304,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
     def test_file_size_exceeds_limit_raises_error(self, mock_stat, manager):
         """Test that files exceeding size limit raise FileTypeError with clear message"""
         # Setup - file size over limit
-        file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB)  # 26GB
+        file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB)  # 51.1GB
         mock_stat.return_value.st_size = file_size
 
         # Should raise FileTypeError with descriptive message
@@ -311,7 +312,7 @@ def test_file_size_exceeds_limit_raises_error(self, mock_stat, manager):
             manager.upload("test-url", Path("test.jsonl"), FilePurpose.FineTune)
 
         error_message = str(exc_info.value)
-        assert "26.0GB exceeds maximum supported size of 25.0GB" in error_message
+        assert "51.1GB exceeds maximum supported size of 50.1GB" in error_message
 
     @patch.object(MultipartUploadManager, "_initiate_upload")
     @patch.object(MultipartUploadManager, "_upload_parts_concurrent")
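
The expected values above pin down the part-sizing arithmetic implied by the constants in PATCH 1. A standalone sketch that reproduces them (calculate_parts is a hypothetical helper, not the SDK's actual _calculate_parts; the ceiling-division strategy is an assumption):

    MIN_PART_SIZE_MB = 5
    TARGET_PART_SIZE_MB = 250
    MAX_MULTIPART_PARTS = 250

    def calculate_parts(file_size: int) -> tuple[int, int]:
        """Return (part_size_bytes, num_parts) consistent with the tests above."""
        target = TARGET_PART_SIZE_MB * 1024 * 1024
        # Integer ceiling division (-(-a // b)) avoids float error on multi-GB sizes
        num_parts = max(1, min(-(-file_size // target), MAX_MULTIPART_PARTS))
        part_size = max(-(-file_size // num_parts), MIN_PART_SIZE_MB * 1024 * 1024)
        return part_size, num_parts

    assert calculate_parts(500 * 1024**2) == (250 * 1024**2, 2)  # 500MB -> 2 parts
    assert calculate_parts(50 * 1024**3)[1] == 205               # 50GB -> 205 parts
    assert calculate_parts(50 * 1024**3)[0] >= 249 * 1024**2     # near-target part size

Under this scheme the new 50.1GB ceiling stays comfortably inside S3's limits: ceil(50.1GB / 250MB) = 206 parts, well under the 250-part cap.
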
From c1b27a568fac894716c99752deb0e02e5521664f Mon Sep 17 00:00:00 2001
From: Soroush Bassam
Date: Fri, 7 Nov 2025 15:35:30 -0800
Subject: [PATCH 3/7] fix: Optimize memory and timeout handling for 50GB+ files

---
 src/together/filemanager.py | 49 +++++++++++++++++++++++++------------
 src/together/utils/files.py | 11 +++++++--
 2 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/src/together/filemanager.py b/src/together/filemanager.py
index f139e9b..c3cb8b2 100644
--- a/src/together/filemanager.py
+++ b/src/together/filemanager.py
@@ -212,6 +212,7 @@ def download(
             ),
             remaining_retries=MAX_RETRIES,
             stream=True,
+            request_timeout=3600,
         )
 
         try:
@@ -522,29 +523,47 @@ def _upload_parts_concurrent(
 
         with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
             with tqdm(total=len(parts), desc="Uploading parts", unit="part") as pbar:
-                future_to_part = {}
-
                 with open(file, "rb") as f:
-                    for part_info in parts:
+                    future_to_part = {}
+                    part_index = 0
+
+                    # Submit initial batch limited by max_concurrent_parts
+                    for i in range(min(self.max_concurrent_parts, len(parts))):
+                        part_info = parts[part_index]
                         f.seek((part_info["PartNumber"] - 1) * part_size)
                         part_data = f.read(part_size)
 
                         future = executor.submit(
                             self._upload_single_part, part_info, part_data
                         )
                         future_to_part[future] = part_info["PartNumber"]
-
-                # Collect results
-                for future in as_completed(future_to_part):
-                    part_number = future_to_part[future]
-                    try:
-                        etag = future.result()
-                        completed_parts.append(
-                            {"part_number": part_number, "etag": etag}
-                        )
-                        pbar.update(1)
-                    except Exception as e:
-                        raise Exception(f"Failed to upload part {part_number}: {e}")
+                        part_index += 1
+
+                    # Process completions and submit new parts (sliding window)
+                    while future_to_part:
+                        done_future = next(as_completed(future_to_part))
+                        part_number = future_to_part.pop(done_future)
+
+                        try:
+                            etag = done_future.result()
+                            completed_parts.append(
+                                {"part_number": part_number, "etag": etag}
+                            )
+                            pbar.update(1)
+                        except Exception as e:
+                            raise Exception(f"Failed to upload part {part_number}: {e}")
+
+                        # Submit next part if available
+                        if part_index < len(parts):
+                            part_info = parts[part_index]
+                            f.seek((part_info["PartNumber"] - 1) * part_size)
+                            part_data = f.read(part_size)
+
+                            future = executor.submit(
+                                self._upload_single_part, part_info, part_data
+                            )
+                            future_to_part[future] = part_info["PartNumber"]
+                            part_index += 1
 
         completed_parts.sort(key=lambda x: x["part_number"])
         return completed_parts
diff --git a/src/together/utils/files.py b/src/together/utils/files.py
index 8f18bc6..669cc1d 100644
--- a/src/together/utils/files.py
+++ b/src/together/utils/files.py
@@ -7,6 +7,7 @@
 from traceback import format_exc
 from typing import Any, Dict, List
 
+from tqdm import tqdm
 from together.constants import (
     MAX_FILE_SIZE_GB,
@@ -363,14 +364,20 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
         Dict[str, Any]: A dictionary with the results of the check.
     """
     report_dict: Dict[str, Any] = {}
+
     try:
+        # Stream file in chunks to avoid loading entire file into memory
+        chunk_size = 8192  # 8KB chunks
         with file.open(encoding="utf-8") as f:
-            f.read()
+            for chunk in iter(lambda: f.read(chunk_size), ""):
+                pass  # UTF-8 decoding happens automatically during read
+
        report_dict["utf8"] = True
     except UnicodeDecodeError as e:
         report_dict["utf8"] = False
         report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
         report_dict["is_check_passed"] = False
+
     return report_dict
 
 
@@ -470,7 +477,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
     with file.open() as f:
         idx = -1
         try:
-            for idx, line in enumerate(f):
+            for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
                 json_line = json.loads(line)
 
                 if not isinstance(json_line, dict):
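
The rewrite above bounds memory with a sliding window: at most max_concurrent_parts part buffers (~250MB each at the new target size) are in flight at once, and each completion frees a slot for the next read-and-submit. The same pattern in isolation (illustrative names, not the SDK code):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_windowed(items, worker, max_in_flight=4):
        """Apply worker to items with at most max_in_flight futures alive."""
        results = []
        with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
            pending = {}  # future -> index of the item it is processing
            index = 0
            # Prime the window
            while index < len(items) and len(pending) < max_in_flight:
                pending[executor.submit(worker, items[index])] = index
                index += 1
            # Drain one completion, then top the window back up
            while pending:
                done = next(as_completed(pending))
                results.append((pending.pop(done), done.result()))
                if index < len(items):
                    pending[executor.submit(worker, items[index])] = index
                    index += 1
        return [value for _, value in sorted(results)]

    print(run_windowed(list(range(10)), lambda x: x * x))

The final sort mirrors the completed_parts.sort(...) in the patch: completions arrive out of order, but the multipart completion step needs the parts back in order.
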
""" report_dict: Dict[str, Any] = {} + try: + # Stream file in chunks to avoid loading entire file into memory + chunk_size = 8192 # 8KB chunks with file.open(encoding="utf-8") as f: - f.read() + for chunk in iter(lambda: f.read(chunk_size), ""): + pass # UTF-8 decoding happens automatically during read + report_dict["utf8"] = True except UnicodeDecodeError as e: report_dict["utf8"] = False report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}." report_dict["is_check_passed"] = False + return report_dict @@ -470,7 +477,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]: with file.open() as f: idx = -1 try: - for idx, line in enumerate(f): + for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"): json_line = json.loads(line) if not isinstance(json_line, dict): From 774c20eaa1c45a79900ce1d0def1ce39fc336b97 Mon Sep 17 00:00:00 2001 From: Soroush Bassam Date: Fri, 7 Nov 2025 17:23:06 -0800 Subject: [PATCH 4/7] fixed test --- tests/unit/test_multipart_upload_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_multipart_upload_manager.py b/tests/unit/test_multipart_upload_manager.py index d036682..e2dea8e 100644 --- a/tests/unit/test_multipart_upload_manager.py +++ b/tests/unit/test_multipart_upload_manager.py @@ -276,7 +276,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager): # Mock as_completed with patch( "together.filemanager.as_completed", - return_value=[mock_future1, mock_future2], + side_effect=[iter([mock_future1]), iter([mock_future2])], ): upload_info = { "parts": [ From f953eed64ef9489ede65d2083c8f7185f9c60700 Mon Sep 17 00:00:00 2001 From: Soroush Bassam Date: Fri, 7 Nov 2025 17:32:27 -0800 Subject: [PATCH 5/7] removed unnecessary comments --- src/together/constants.py | 2 +- src/together/utils/files.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/together/constants.py b/src/together/constants.py index bd00fc3..5e0c912 100644 --- a/src/together/constants.py +++ b/src/together/constants.py @@ -20,7 +20,7 @@ # Multipart upload constants MIN_PART_SIZE_MB = 5 # Minimum part size (S3 requirement) -TARGET_PART_SIZE_MB = 250 # Target part size for large files +TARGET_PART_SIZE_MB = 250 # Target part size MAX_MULTIPART_PARTS = 250 # Maximum parts per upload MULTIPART_UPLOAD_TIMEOUT = 300 # Timeout in seconds for uploading each part MULTIPART_THRESHOLD_GB = 5.0 # threshold for switching to multipart upload diff --git a/src/together/utils/files.py b/src/together/utils/files.py index 669cc1d..9cba190 100644 --- a/src/together/utils/files.py +++ b/src/together/utils/files.py @@ -366,11 +366,10 @@ def _check_utf8(file: Path) -> Dict[str, Any]: report_dict: Dict[str, Any] = {} try: - # Stream file in chunks to avoid loading entire file into memory - chunk_size = 8192 # 8KB chunks + chunk_size = 8192 with file.open(encoding="utf-8") as f: for chunk in iter(lambda: f.read(chunk_size), ""): - pass # UTF-8 decoding happens automatically during read + pass report_dict["utf8"] = True except UnicodeDecodeError as e: From f6d6ca0900554757286bce217319afd1bf1df3b5 Mon Sep 17 00:00:00 2001 From: Soroush Bassam Date: Wed, 19 Nov 2025 20:56:35 -0800 Subject: [PATCH 6/7] refactor: Address code review comments --- src/together/filemanager.py | 30 +++++++++++++++++------------- src/together/utils/files.py | 4 ++-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/together/filemanager.py b/src/together/filemanager.py index 
From f6d6ca0900554757286bce217319afd1bf1df3b5 Mon Sep 17 00:00:00 2001
From: Soroush Bassam
Date: Wed, 19 Nov 2025 20:56:35 -0800
Subject: [PATCH 6/7] refactor: Address code review comments

---
 src/together/filemanager.py | 30 +++++++++++++++++-------------
 src/together/utils/files.py |  4 ++--
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/src/together/filemanager.py b/src/together/filemanager.py
index c3cb8b2..6bab6c2 100644
--- a/src/together/filemanager.py
+++ b/src/together/filemanager.py
@@ -6,10 +6,10 @@
 import stat
 import tempfile
 import uuid
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from functools import partial
 from pathlib import Path
-from typing import Any, Dict, List, Tuple
+from typing import Any, BinaryIO, Dict, List, Tuple
 
 import requests
 from filelock import FileLock
@@ -513,6 +513,18 @@ def _initiate_upload(
 
         return response.data
 
+    def _submit_part(
+        self,
+        executor: ThreadPoolExecutor,
+        f: BinaryIO,
+        part_info: Dict[str, Any],
+        part_size: int,
+    ) -> Future[str]:
+        """Submit a single part for upload and return the future"""
+        f.seek((part_info["PartNumber"] - 1) * part_size)
+        part_data = f.read(part_size)
+        return executor.submit(self._upload_single_part, part_info, part_data)
+
     def _upload_parts_concurrent(
         self, file: Path, upload_info: Dict[str, Any], part_size: int
     ) -> List[Dict[str, Any]]:
@@ -530,12 +542,7 @@ def _upload_parts_concurrent(
                     # Submit initial batch limited by max_concurrent_parts
                     for i in range(min(self.max_concurrent_parts, len(parts))):
                         part_info = parts[part_index]
-                        f.seek((part_info["PartNumber"] - 1) * part_size)
-                        part_data = f.read(part_size)
-
-                        future = executor.submit(
-                            self._upload_single_part, part_info, part_data
-                        )
+                        future = self._submit_part(executor, f, part_info, part_size)
                         future_to_part[future] = part_info["PartNumber"]
                         part_index += 1
 
@@ -556,11 +563,8 @@ def _upload_parts_concurrent(
                         # Submit next part if available
                         if part_index < len(parts):
                             part_info = parts[part_index]
-                            f.seek((part_info["PartNumber"] - 1) * part_size)
-                            part_data = f.read(part_size)
-
-                            future = executor.submit(
-                                self._upload_single_part, part_info, part_data
+                            future = self._submit_part(
+                                executor, f, part_info, part_size
                             )
                             future_to_part[future] = part_info["PartNumber"]
                             part_index += 1
diff --git a/src/together/utils/files.py b/src/together/utils/files.py
index 9cba190..3734753 100644
--- a/src/together/utils/files.py
+++ b/src/together/utils/files.py
@@ -366,9 +366,9 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
     report_dict: Dict[str, Any] = {}
 
     try:
-        chunk_size = 8192
+        # Dry-run UTF-8 decode: iterate through file to validate encoding
         with file.open(encoding="utf-8") as f:
-            for chunk in iter(lambda: f.read(chunk_size), ""):
+            for _ in f:
                 pass
 
         report_dict["utf8"] = True

From d104aa1d8f0600a4ff7d330367be3b86c2841e77 Mon Sep 17 00:00:00 2001
From: Soroush Bassam
Date: Thu, 20 Nov 2025 12:05:05 -0800
Subject: [PATCH 7/7] address pr feedback

---
 src/together/filemanager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/together/filemanager.py b/src/together/filemanager.py
index 6bab6c2..d3c16b8 100644
--- a/src/together/filemanager.py
+++ b/src/together/filemanager.py
@@ -540,7 +540,7 @@ def _upload_parts_concurrent(
                     part_index = 0
 
                     # Submit initial batch limited by max_concurrent_parts
-                    for i in range(min(self.max_concurrent_parts, len(parts))):
+                    for _ in range(min(self.max_concurrent_parts, len(parts))):
                         part_info = parts[part_index]
                         future = self._submit_part(executor, f, part_info, part_size)
                         future_to_part[future] = part_info["PartNumber"]
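
With the series applied, _check_utf8 leans on lazy decoding: iterating a text-mode handle decodes one line at a time, so an invalid byte raises UnicodeDecodeError without the file ever being fully buffered. A self-contained check of that behavior (the temporary file is for illustration only):

    import os
    import tempfile

    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
        tmp.write(b"valid line\n\xff invalid\n")  # \xff never starts a UTF-8 sequence
        path = tmp.name

    try:
        with open(path, encoding="utf-8") as f:
            for _ in f:  # decodes lazily, line by line
                pass
        print("utf8: True")
    except UnicodeDecodeError as e:
        print(f"utf8: False ({e})")
    finally:
        os.remove(path)

One trade-off relative to the 8KB-chunk version this replaces: a pathological file with no newlines is still materialized as a single giant "line".
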