fix: fix write s3 file 80GB limit issue (#376)
* fix: fix write s3 file 80GB limit issue

* make lint happy

* perf params order

---------

Co-authored-by: liyang <liyang@msh.team>
Co-authored-by: penghongyang <penghongyang@megvii.com>
3 people committed Jul 9, 2024
1 parent 6571a5c commit 18c4b2f
Showing 5 changed files with 21 additions and 9 deletions.
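
The 80GB figure in the title follows from Amazon S3's hard limit of 10,000 parts per multipart upload: with megfile's default 8MB block size, a single object tops out just under 80GB. A back-of-the-envelope check of that ceiling and of what a larger minimum block size buys (the numbers below are illustrative, not part of the diff):

```python
# S3 allows at most 10,000 parts per multipart upload, so the per-part
# block size caps the total size of a single uploaded object.
S3_MAX_PARTS = 10_000             # documented AWS limit
DEFAULT_BLOCK_SIZE = 8 * 2**20    # megfile's default block size: 8MB

print(S3_MAX_PARTS * DEFAULT_BLOCK_SIZE / 2**30)  # ~78 GiB cap with 8MB parts
print(S3_MAX_PARTS * (128 * 2**20) / 2**40)       # ~1.2 TiB cap with 128MB parts
```

Raising the minimum write block size, which this commit makes configurable, lifts the ceiling proportionally.
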
3 changes: 3 additions & 0 deletions docs/configuration/common.md
@@ -3,6 +3,9 @@ Common Configuration
 
 ### Environment configurations
 - `MEGFILE_BLOCK_SIZE`: default block size of read and write operate, unit is bytes, default is `8MB`
+- `MEGFILE_MIN_BLOCK_SIZE`:
+  - min write block size, unit is bytes, default is equal to `MEGFILE_BLOCK_SIZE`
+  - If you need write big size file, you should set `MEGFILE_MIN_BLOCK_SIZE` to a big value.
+- `MEGFILE_MAX_BLOCK_SIZE`: max write block size, unit is bytes, default is `128MB`
 - `MEGFILE_MAX_BUFFER_SIZE`: max read buffer size, unit is bytes, default is `128MB`
 - `MEGFILE_MAX_WORKERS`: max threads will be used, default is `32`
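
A usage sketch for the new variable; the bucket and key are placeholders, and the value must be exported before megfile is imported because `megfile/config.py` (next file in this diff) reads it at import time:

```python
import os

# Must be set before importing megfile: config.py reads it at import time.
os.environ['MEGFILE_MIN_BLOCK_SIZE'] = str(128 * 2**20)  # 128MB write blocks

from megfile import smart_open

# Placeholder URL; with 128MB parts a single object can grow well past 80GB.
with smart_open('s3://my-bucket/very-large-file.bin', 'wb') as f:
    f.write(b'example payload')
```
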
3 changes: 3 additions & 0 deletions megfile/config.py
@@ -24,6 +24,9 @@
 DEFAULT_MAX_BUFFER_SIZE = 128 * 2**20
 DEFAULT_BLOCK_CAPACITY = 16
 
+DEFAULT_MIN_BLOCK_SIZE = int(
+    os.getenv('MEGFILE_MIN_BLOCK_SIZE') or DEFAULT_BLOCK_SIZE)
+
 if os.getenv('MEGFILE_MAX_BLOCK_SIZE'):
     DEFAULT_MAX_BLOCK_SIZE = int(os.environ['MEGFILE_MAX_BLOCK_SIZE'])
     if DEFAULT_MAX_BLOCK_SIZE < DEFAULT_BLOCK_SIZE:
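
A minimal sketch for checking the effective defaults at runtime, assuming any `MEGFILE_*` overrides were exported before the import:

```python
from megfile.config import (
    DEFAULT_BLOCK_SIZE,      # 8MB unless MEGFILE_BLOCK_SIZE overrides it
    DEFAULT_MAX_BLOCK_SIZE,  # 128MB unless MEGFILE_MAX_BLOCK_SIZE overrides it
    DEFAULT_MIN_BLOCK_SIZE,  # falls back to DEFAULT_BLOCK_SIZE when unset
)

print(DEFAULT_BLOCK_SIZE, DEFAULT_MIN_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
```
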
4 changes: 2 additions & 2 deletions megfile/lib/s3_buffered_writer.py
@@ -5,7 +5,7 @@
 from threading import Lock
 from typing import NamedTuple, Optional
 
-from megfile.config import BACKOFF_FACTOR, BACKOFF_INITIAL, DEFAULT_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MAX_BUFFER_SIZE, GLOBAL_MAX_WORKERS
+from megfile.config import BACKOFF_FACTOR, BACKOFF_INITIAL, DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MAX_BUFFER_SIZE, DEFAULT_MIN_BLOCK_SIZE, GLOBAL_MAX_WORKERS
 from megfile.errors import raise_s3_error
 from megfile.interfaces import Writable
 from megfile.utils import get_human_size, process_local
@@ -42,7 +42,7 @@ def __init__(
             key: str,
             *,
             s3_client,
-            block_size: int = DEFAULT_BLOCK_SIZE,
+            block_size: int = DEFAULT_MIN_BLOCK_SIZE,
             max_block_size: int = DEFAULT_MAX_BLOCK_SIZE,
             max_buffer_size: int = DEFAULT_MAX_BUFFER_SIZE,
             max_workers: Optional[int] = None,
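
With this change the writer's `block_size` defaults to `DEFAULT_MIN_BLOCK_SIZE` instead of `DEFAULT_BLOCK_SIZE`, and it can still be set explicitly. A hedged construction sketch, assuming a boto3 S3 client and placeholder bucket/key (the high-level `smart_open`/`s3_buffered_open` entry points remain the usual way in):

```python
import boto3

from megfile.lib.s3_buffered_writer import S3BufferedWriter

writer = S3BufferedWriter(
    'my-bucket',                  # placeholder bucket
    'path/to/huge-file.bin',      # placeholder key
    s3_client=boto3.client('s3'),
    block_size=64 * 2**20,        # start multipart parts at 64MB
    max_block_size=128 * 2**20,   # upper bound for a single part
)
writer.write(b'example payload')
writer.close()                    # finalize the write
```
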
4 changes: 2 additions & 2 deletions megfile/lib/s3_limited_seekable_writer.py
@@ -3,7 +3,7 @@
 from logging import getLogger as get_logger
 from typing import Optional
 
-from megfile.config import DEFAULT_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MAX_BUFFER_SIZE
+from megfile.config import DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MAX_BUFFER_SIZE, DEFAULT_MIN_BLOCK_SIZE
 from megfile.errors import raise_s3_error
 from megfile.interfaces import Seekable
 from megfile.lib.s3_buffered_writer import S3BufferedWriter
@@ -25,7 +25,7 @@ def __init__(
             key: str,
             *,
             s3_client,
-            block_size: int = DEFAULT_BLOCK_SIZE,
+            block_size: int = DEFAULT_MIN_BLOCK_SIZE,
             head_block_size: Optional[int] = None,
             tail_block_size: Optional[int] = None,
             max_block_size: int = DEFAULT_MAX_BLOCK_SIZE,
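
Per the `s3_buffered_open` docstring further down, the limited-seekable handle allows seeking within the file's head and tail parts while the rest is streamed out in order. A sketch using the parameters visible above; sizes, bucket and key are illustrative placeholders:

```python
import boto3

from megfile.lib.s3_limited_seekable_writer import S3LimitedSeekableWriter

writer = S3LimitedSeekableWriter(
    'my-bucket',                  # placeholder bucket
    'path/to/archive.tar',        # placeholder key
    s3_client=boto3.client('s3'),
    block_size=32 * 2**20,        # now defaults to DEFAULT_MIN_BLOCK_SIZE
    head_block_size=16 * 2**20,   # seekable head region
    tail_block_size=16 * 2**20,   # seekable tail region
)
writer.write(b'placeholder header and payload')
writer.close()
```
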
16 changes: 11 additions & 5 deletions megfile/s3_path.py
@@ -12,7 +12,7 @@
 import botocore
 from botocore.awsrequest import AWSResponse
 
-from megfile.config import DEFAULT_BLOCK_SIZE, GLOBAL_MAX_WORKERS, S3_CLIENT_CACHE_MODE, S3_MAX_RETRY_TIMES
+from megfile.config import DEFAULT_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MIN_BLOCK_SIZE, GLOBAL_MAX_WORKERS, S3_CLIENT_CACHE_MODE, S3_MAX_RETRY_TIMES
 from megfile.errors import S3BucketNotFoundError, S3ConfigError, S3FileExistsError, S3FileNotFoundError, S3IsADirectoryError, S3NameTooLongError, S3NotADirectoryError, S3NotALinkError, S3PermissionError, S3UnknownError, SameFileError, UnsupportedError, _create_missing_ok_generator
 from megfile.errors import _logger as error_logger
 from megfile.errors import patch_method, raise_s3_error, s3_error_code_should_retry, s3_should_retry, translate_fs_error, translate_s3_error
@@ -783,7 +783,9 @@ def s3_buffered_open(
         limited_seekable: bool = False,
         buffered: bool = False,
         share_cache_key: Optional[str] = None,
-        cache_path: Optional[str] = None) -> IO:
+        cache_path: Optional[str] = None,
+        min_block_size: int = DEFAULT_MIN_BLOCK_SIZE,
+        max_block_size: int = DEFAULT_MAX_BLOCK_SIZE) -> IO:
     '''Open an asynchronous prefetch reader, to support fast sequential read
 
     .. note ::
@@ -796,7 +798,9 @@ def s3_buffered_open(
     :param max_concurrency: Max download thread number, None by default
     :param max_buffer_size: Max cached buffer size in memory, 128MB by default
-    :param block_size: Size of single block, 8MB by default. Each block will be uploaded or downloaded by single thread.
+    :param min_block_size: Min size of single block, default is same as block_size. Each block will be downloaded by single thread.
+    :param max_block_size: Max size of single block, 128MB by default. Each block will be downloaded by single thread.
+    :param block_size: Size of single block, 8MB by default. Each block will be uploaded by single thread.
     :param limited_seekable: If write-handle supports limited seek (both file head part and tail part can seek block_size). Notes: This parameter are valid only for write-handle. Read-handle support arbitrary seek
     :returns: An opened S3PrefetchReader object
     :raises: S3FileNotFoundError
@@ -872,17 +876,19 @@ def s3_buffered_open(
                 key,
                 s3_client=client,
                 max_workers=max_concurrency,
+                block_size=min_block_size,
+                max_block_size=max_block_size,
                 max_buffer_size=max_buffer_size,
-                block_size=block_size,
                 profile_name=s3_url._profile_name)
         else:
             writer = S3BufferedWriter(
                 bucket,
                 key,
                 s3_client=client,
                 max_workers=max_concurrency,
+                block_size=min_block_size,
+                max_block_size=max_block_size,
                 max_buffer_size=max_buffer_size,
-                block_size=block_size,
                 profile_name=s3_url._profile_name)
         if buffered or _is_pickle(writer):
             writer = io.BufferedWriter(writer)  # type: ignore
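
The two new keyword arguments can also be passed to `s3_buffered_open` directly; a sketch assuming the usual `(path, mode)` positional arguments and a placeholder URL:

```python
from megfile.s3_path import s3_buffered_open

# min_block_size/max_block_size are the parameters added in this commit;
# larger parts raise the 10,000-part ceiling on total object size.
with s3_buffered_open(
        's3://my-bucket/dataset.bin', 'wb',
        min_block_size=64 * 2**20,    # first parts start at 64MB
        max_block_size=256 * 2**20,   # parts may grow up to 256MB
) as f:
    f.write(b'example payload')
```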
