Merge b10e5bd into 442136d
egabancho committed Apr 25, 2020
2 parents 442136d + b10e5bd commit 62c3fd8
Showing 6 changed files with 115 additions and 43 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,12 @@
Changes
=======

Version 1.0.3 (released 2020-04-25)

- Allows dynamic part sizes for multipart uploads.
- Adds new configuration variables defining the default part size and the
  maximum number of parts (see the sketch below).
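
As an illustrative example only (the names are the real configuration
variables defined in invenio_s3/config.py below; the values are not
recommendations), an application could override the new defaults like so:

# Illustrative instance configuration overriding the new defaults.
S3_DEFAULT_BLOCK_SIZE = 10 * 2**20     # start multipart uploads with 10 MiB parts
S3_MAXIMUM_NUMBER_OF_PARTS = 10000     # S3's documented upper limit on parts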

Version 1.0.2 (released 2020-02-17)

- Fixes typos in configuration variables and cached properties.
13 changes: 12 additions & 1 deletion invenio_s3/config.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, 2019 Esteban J. G. Gabancho.
# Copyright (C) 2018, 2019, 2020 Esteban J. G. Gabancho.
#
# Invenio-S3 is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
@@ -63,3 +63,14 @@
<https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#configuration-file>`_
for more information.
"""

S3_MAXIMUM_NUMBER_OF_PARTS = 10000
"""Maximum number of parts to be used.
See `AWS Multipart Upload Overview
<https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html>`_ for more
information.
"""

S3_DEFAULT_BLOCK_SIZE = 5 * 2**20
"""Default block size value used to send multi-part uploads to S3.
Typically 5Mb is minimum allowed by the API."""
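
Together, the two settings trade part size against part count. Below is a
minimal sketch of the rule the storage applies (the helper name
effective_block_size is ours, not part of the package; the real logic lives
in the set_blocksize decorator in storage.py further down):

S3_MAXIMUM_NUMBER_OF_PARTS = 10000
S3_DEFAULT_BLOCK_SIZE = 5 * 2**20  # 5 MiB


def effective_block_size(size=None):
    """Part size the storage would use for an upload of ``size`` bytes."""
    computed = (
        size // S3_MAXIMUM_NUMBER_OF_PARTS  # Integer division
        if size
        else S3_DEFAULT_BLOCK_SIZE
    )
    # The storage only ever grows its block size, never shrinks it.
    return max(computed, S3_DEFAULT_BLOCK_SIZE)


effective_block_size()             # 5242880 -> ~5 MiB default
effective_block_size(100 * 2**30)  # 10737418 -> ~10.2 MiB parts for 100 GiB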
100 changes: 72 additions & 28 deletions invenio_s3/storage.py
@@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, 2019 Esteban J. G. Gabancho.
# Copyright (C) 2018, 2019, 2020 Esteban J. G. Gabancho.
#
# Invenio-S3 is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""S3 file storage interface."""
from __future__ import absolute_import, print_function
from __future__ import absolute_import, division, print_function

from functools import partial
from functools import partial, wraps

import s3fs
from flask import current_app
@@ -17,11 +17,30 @@
from .helpers import redirect_stream


def set_blocksize(f):
"""Decorator to set the correct block size according to file size."""
@wraps(f)
def inner(self, *args, **kwargs):
size = kwargs.get('size', None)
block_size = (
size // current_app.config['S3_MAXIMUM_NUMBER_OF_PARTS']  # Integer division
if size
else current_app.config['S3_DEFAULT_BLOCK_SIZE']
)

if block_size > self.block_size:
self.block_size = block_size
return f(self, *args, **kwargs)

return inner
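
Note that the decorator reads ``size`` from the keyword arguments only, so
callers must pass the file size as ``size=...`` for the dynamic calculation
to kick in. A toy illustration (not part of the commit; it assumes a Flask
application context with the two configuration values from config.py):

class Toy:
    # Mirrors the block_size attribute S3FSFileStorage keeps (5 MiB default).
    block_size = 5 * 2**20

    @set_blocksize
    def save(self, stream, size=None):
        return self.block_size


toy = Toy()
toy.save(None)                    # 5242880 -> no keyword size, default kept
toy.save(None, size=100 * 2**30)  # 10737418 -> grown to size // maximum parts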


class S3FSFileStorage(PyFSFileStorage):
"""File system storage using Amazon S3 API for accessing files."""

def __init__(self, fileurl, **kwargs):
"""Storage initialization."""
self.block_size = current_app.config['S3_DEFAULT_BLOCK_SIZE']
super(S3FSFileStorage, self).__init__(fileurl, **kwargs)

def _get_fs(self, *args, **kwargs):
@@ -30,10 +49,11 @@ def _get_fs(self, *args, **kwargs):
return super(S3FSFileStorage, self)._get_fs(*args, **kwargs)

info = current_app.extensions['invenio-s3'].init_s3fs_info
fs = s3fs.S3FileSystem(**info)
fs = s3fs.S3FileSystem(default_block_size=self.block_size, **info)

return (fs, self.fileurl)

@set_blocksize
def initialize(self, size=0):
"""Initialize file on storage and truncate to given size."""
fs, path = self._get_fs()
@@ -46,8 +66,9 @@
to_write = size
fs_chunk_size = fp.blocksize # Force write every time
while to_write > 0:
current_chunk_size = (to_write if to_write <= fs_chunk_size
else fs_chunk_size)
current_chunk_size = (
to_write if to_write <= fs_chunk_size else fs_chunk_size
)
fp.write(b'\0' * current_chunk_size)
to_write -= current_chunk_size
except Exception:
@@ -68,23 +89,30 @@ def delete(self):
fs.rm(path)
return True

def update(self,
incoming_stream,
seek=0,
size=None,
chunk_size=None,
progress_callback=None):
@set_blocksize
def update(
self,
incoming_stream,
seek=0,
size=None,
chunk_size=None,
progress_callback=None,
):
"""Update a file in the file system."""
old_fp = self.open(mode='rb')
updated_fp = S3FSFileStorage(
self.fileurl, size=self._size).open(mode='wb')
updated_fp = S3FSFileStorage(self.fileurl, size=self._size).open(
mode='wb'
)
try:
if seek >= 0:
to_write = seek
fs_chunk_size = updated_fp.blocksize
while to_write > 0:
current_chunk_size = (to_write if to_write <= fs_chunk_size
else fs_chunk_size)
current_chunk_size = (
to_write
if to_write <= fs_chunk_size
else fs_chunk_size
)
updated_fp.write(old_fp.read(current_chunk_size))
to_write -= current_chunk_size

@@ -93,15 +121,19 @@ def update(self,
updated_fp,
chunk_size=chunk_size,
size=size,
progress_callback=progress_callback)
progress_callback=progress_callback,
)

if (bytes_written + seek) < self._size:
old_fp.seek((bytes_written + seek))
to_write = self._size - (bytes_written + seek)
fs_chunk_size = updated_fp.blocksize
while to_write > 0:
current_chunk_size = (to_write if to_write <= fs_chunk_size
else fs_chunk_size)
current_chunk_size = (
to_write
if to_write <= fs_chunk_size
else fs_chunk_size
)
updated_fp.write(old_fp.read(current_chunk_size))
to_write -= current_chunk_size
finally:
@@ -110,19 +142,22 @@

return bytes_written, checksum

def send_file(self,
filename,
mimetype=None,
restricted=True,
checksum=None,
trusted=False,
chunk_size=None,
as_attachment=False):
def send_file(
self,
filename,
mimetype=None,
restricted=True,
checksum=None,
trusted=False,
chunk_size=None,
as_attachment=False,
):
"""Send the file to the client."""
try:
fs, path = self._get_fs()
s3_url_builder = partial(
fs.url, path, expires=current_app.config['S3_URL_EXPIRATION'])
fs.url, path, expires=current_app.config['S3_URL_EXPIRATION']
)

return redirect_stream(
s3_url_builder,
@@ -135,6 +170,7 @@ def send_file(self,
except Exception as e:
raise StorageError('Could not send file: {}'.format(e))

@set_blocksize
def copy(self, src, *args, **kwargs):
"""Copy data from another file instance.
@@ -147,6 +183,14 @@
else:
super(S3FSFileStorage, self).copy(src, *args, **kwargs)

@set_blocksize
def save(self, *args, **kwargs):
"""Save incoming stream to storage.
Overrides the parent method only so that the correct block size can be set.
"""
return super(S3FSFileStorage, self).save(*args, **kwargs)
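
With the decorated methods in place, a caller only needs to pass the total
upload size for the part size to adapt. A hedged usage sketch (bucket URL and
payload are illustrative), mirroring the new test_block_size test below:

from io import BytesIO

from flask import current_app

# Inside a Flask application context with Invenio-S3 configured.
storage = S3FSFileStorage('s3://bucket/path/to/data')
data = b'a' * (4 * current_app.config['S3_DEFAULT_BLOCK_SIZE'])
uri, size, checksum = storage.save(BytesIO(data), size=len(data))
# storage.block_size has grown if size // S3_MAXIMUM_NUMBER_OF_PARTS
# exceeded the default block size.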


def s3fs_storage_factory(**kwargs):
"""File storage factory for S3."""
2 changes: 1 addition & 1 deletion invenio_s3/version.py
@@ -13,4 +13,4 @@

from __future__ import absolute_import, print_function

__version__ = '1.0.2'
__version__ = '1.0.3'
12 changes: 6 additions & 6 deletions setup.py
@@ -16,23 +16,23 @@
tests_require = [
'check-manifest>=0.25',
'coverage>=5.0',
'invenio-base>=1.2.0',
'invenio-app>=1.2.0',
'invenio-db[all]>=1.0.2',
'invenio-base>=1.2.2',
'invenio-app>=1.2.3',
'invenio-db[all]>=1.0.4',
'isort>=4.3.3',
'moto>=1.3.7',
'pydocstyle>=1.0.0',
'pytest-cache>=1.0',
'pytest-cov>=1.8.0',
'pytest-invenio>=1.0.4,<1.1.0',
'pytest-invenio>=1.3.0',
'pytest-pep8>=1.0.6',
'pytest>=3.8.0',
'pytest>=4.6.4,<5.0.0',
'redis>=2.10.5',
]

extras_require = {
'docs': [
'Sphinx>=1.5.1',
'Sphinx>=1.5.1,<3.0.2',
],
'tests': tests_require,
}
25 changes: 18 additions & 7 deletions tests/test_storage.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, 2019 Esteban J. G. Gabancho.
# Copyright (C) 2018, 2019, 2020 Esteban J. G. Gabancho.
#
# Invenio-S3 is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
@@ -15,8 +15,6 @@
from io import BytesIO

import pytest
import requests
from flask import url_for
from invenio_files_rest.errors import FileSizeError, StorageError, \
UnexpectedFileSizeError
from invenio_files_rest.limiters import FileSizeLimit
@@ -112,7 +110,6 @@ def test_delete(s3_bucket, s3fs_testpath, s3fs):
))
def test_save(s3_bucket, s3fs_testpath, s3fs, get_md5, data):
"""Test save."""
print(len(data))
uri, size, checksum = s3fs.save(BytesIO(data))
assert uri == s3fs_testpath
assert size == len(data)
@@ -134,7 +131,7 @@ def test_save_failcleanup(s3fs, s3fs_testpath, get_md5):
data = b'somedata'

def fail_callback(total, size):
assert exists(s3fs_testpath)
assert fs.exists(s3fs_testpath)
raise Exception('Something bad happened')

pytest.raises(
@@ -233,9 +230,8 @@ def test_update(s3fs, get_md5, file_size):

def test_update_fail(s3fs, s3fs_testpath, get_md5):
"""Test update of file."""

def fail_callback(total, size):
assert exists(s3fs_testpath)
assert fs.exists(s3fs_testpath)
raise Exception('Something bad happened')

s3fs.initialize(size=100)
@@ -367,3 +363,18 @@ def test_non_unicode_filename(base_app, s3fs):
'żółć.txt', mimetype='text/plain', checksum=checksum)
assert res.status_code == 302
assert res.headers['Content-Disposition'] == 'inline'


def test_block_size(appctx, s3_bucket, s3fs_testpath, s3fs, get_md5):
"""Test block size update on the S3FS client."""
# Set file size to 4 times the default block size
data = b'a' * appctx.config['S3_DEFAULT_BLOCK_SIZE'] * 4
# Set the maximum number of parts to two
appctx.config['S3_MAXIMUM_NUMBER_OF_PARTS'] = 2
uri, size, checksum = s3fs.save(BytesIO(data),
size=len(data))
# The block size should be 2 times the default block size
assert s3fs.block_size == appctx.config['S3_DEFAULT_BLOCK_SIZE'] * 2
assert uri == s3fs_testpath
assert size == len(data)
assert checksum == get_md5(data)
