storage.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, 2019, 2020, 2021 Esteban J. G. Gabancho.
#
# Invenio-S3 is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""S3 file storage interface."""
from functools import partial, wraps
from math import ceil

import s3fs
from flask import current_app
from invenio_files_rest.errors import StorageError
from invenio_files_rest.storage import PyFSFileStorage, pyfs_storage_factory

from .helpers import redirect_stream


def set_blocksize(f):
    """Decorator to set the correct block size according to file size."""

    @wraps(f)
    def inner(self, *args, **kwargs):
        size = kwargs.get("size", None)
        block_size = (
            ceil(size / current_app.config["S3_MAXIMUM_NUMBER_OF_PARTS"])
            if size
            else current_app.config["S3_DEFAULT_BLOCK_SIZE"]
        )
        if block_size > self.block_size:
            self.block_size = block_size
        return f(self, *args, **kwargs)

    return inner
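
# Illustrative note (an added example, not part of the original module):
# assuming Invenio-S3's default settings of S3_MAXIMUM_NUMBER_OF_PARTS = 10000
# (the S3 multipart-upload part limit) and a 5 MiB S3_DEFAULT_BLOCK_SIZE, a
# hypothetical 50 GiB upload gives
#     ceil(53687091200 / 10000) = 5368710 bytes (~5.12 MiB) per part,
# so the decorator raises ``self.block_size`` above the default before calling
# the wrapped method; uploads small enough to fit in 10000 default-sized parts
# keep the default block size.
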
class S3FSFileStorage(PyFSFileStorage):
    """File system storage using Amazon S3 API for accessing files."""

    def __init__(self, fileurl, **kwargs):
        """Storage initialization."""
        self.block_size = current_app.config["S3_DEFAULT_BLOCK_SIZE"]
        super(S3FSFileStorage, self).__init__(fileurl, **kwargs)

    def _get_fs(self, *args, **kwargs):
        """Get PyFilesystem instance and S3 real path."""
        if not self.fileurl.startswith("s3://"):
            return super(S3FSFileStorage, self)._get_fs(*args, **kwargs)

        info = current_app.extensions["invenio-s3"].init_s3fs_info
        fs = s3fs.S3FileSystem(default_block_size=self.block_size, **info)

        return (fs, self.fileurl)
    @set_blocksize
    def initialize(self, size=0):
        """Initialize file on storage and truncate to given size."""
        fs, path = self._get_fs()
        # Remove any previous object at this path before (re)creating it.
        if fs.exists(path):
            fs.rm(path)
        fp = fs.open(path, mode="wb")
        try:
            to_write = size
            fs_chunk_size = fp.blocksize  # Force a write on every iteration.
            while to_write > 0:
                current_chunk_size = (
                    to_write if to_write <= fs_chunk_size else fs_chunk_size
                )
                fp.write(b"\0" * current_chunk_size)
                to_write -= current_chunk_size
        except Exception:
            fp.close()
            self.delete()
            raise
        finally:
            fp.close()

        self._size = size

        return self.fileurl, size, None
    def delete(self):
        """Delete a file."""
        fs, path = self._get_fs()
        if fs.exists(path):
            fs.rm(path)
        return True
    @set_blocksize
    def update(
        self,
        incoming_stream,
        seek=0,
        size=None,
        chunk_size=None,
        progress_callback=None,
    ):
        """Update a file in the file system."""
        old_fp = self.open(mode="rb")
        updated_fp = S3FSFileStorage(self.fileurl, size=self._size).open(mode="wb")
        try:
            # Copy the existing head of the file (the first ``seek`` bytes)
            # from the old object before writing the incoming stream.
            if seek >= 0:
                to_write = seek
                fs_chunk_size = updated_fp.blocksize
                while to_write > 0:
                    current_chunk_size = (
                        to_write if to_write <= fs_chunk_size else fs_chunk_size
                    )
                    updated_fp.write(old_fp.read(current_chunk_size))
                    to_write -= current_chunk_size

            bytes_written, checksum = self._write_stream(
                incoming_stream,
                updated_fp,
                chunk_size=chunk_size,
                size=size,
                progress_callback=progress_callback,
            )

            # If the incoming stream did not reach the previous end of the
            # file, copy the remaining tail from the old object.
            if (bytes_written + seek) < self._size:
                old_fp.seek(bytes_written + seek)
                to_write = self._size - (bytes_written + seek)
                fs_chunk_size = updated_fp.blocksize
                while to_write > 0:
                    current_chunk_size = (
                        to_write if to_write <= fs_chunk_size else fs_chunk_size
                    )
                    updated_fp.write(old_fp.read(current_chunk_size))
                    to_write -= current_chunk_size
        finally:
            old_fp.close()
            updated_fp.close()

        return bytes_written, checksum
    def send_file(
        self,
        filename,
        mimetype=None,
        restricted=True,
        checksum=None,
        trusted=False,
        chunk_size=None,
        as_attachment=False,
    ):
        """Send the file to the client."""
        try:
            fs, path = self._get_fs()
            # Defer building the pre-signed S3 URL until ``redirect_stream``
            # actually needs it.
            s3_url_builder = partial(
                fs.url, path, expires=current_app.config["S3_URL_EXPIRATION"]
            )

            return redirect_stream(
                s3_url_builder,
                filename,
                mimetype=mimetype,
                restricted=restricted,
                trusted=trusted,
                as_attachment=as_attachment,
            )
        except Exception as e:
            raise StorageError("Could not send file: {}".format(e))
    @set_blocksize
    def copy(self, src, *args, **kwargs):
        """Copy data from another file instance.

        If the source is an S3 stored object, the copy happens on the S3
        server side; otherwise the normal ``FileStorage`` copy method is
        used.
        """
        if src.fileurl.startswith("s3://"):
            fs, path = self._get_fs()
            fs.copy(src.fileurl, path)
        else:
            super(S3FSFileStorage, self).copy(src, *args, **kwargs)

    @set_blocksize
    def save(self, *args, **kwargs):
        """Save incoming stream to storage.

        Overridden only so the ``set_blocksize`` decorator can pick the
        correct block size before delegating to the parent implementation.
        """
        return super(S3FSFileStorage, self).save(*args, **kwargs)
def s3fs_storage_factory(**kwargs):
    """File storage factory for S3."""
    return pyfs_storage_factory(filestorage_class=S3FSFileStorage, **kwargs)
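
# Illustrative usage sketch, stated as an assumption rather than taken from
# this module: Invenio-Files-REST resolves its storage factory from
# application configuration, so wiring this module in would typically look
# like::
#
#     FILES_REST_STORAGE_FACTORY = "invenio_s3.s3fs_storage_factory"
#
# With that setting, file instances whose URI starts with ``s3://`` are
# handled by ``S3FSFileStorage`` (block size tuning, server-side copies,
# pre-signed URL redirects), while other URIs keep the plain
# ``PyFSFileStorage`` behaviour via ``_get_fs``.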