-
Notifications
You must be signed in to change notification settings - Fork 74
/
inject.py
236 lines (177 loc) · 7.7 KB
/
inject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import asyncio
from typing import Optional, Callable, BinaryIO, Dict, Any
from botocore.exceptions import ClientError
from boto3 import utils
from boto3.s3.transfer import S3TransferConfig
def inject_s3_transfer_methods(class_attributes, **kwargs):
    """Attach the async transfer helpers to the S3 client class.

    Invoked by boto3's event system while the client class is being
    assembled; each coroutine below becomes a method of the generated
    client (e.g. ``client.upload_file(...)``).
    """
    transfer_methods = {
        'upload_file': upload_file,
        'download_file': download_file,
        'copy': copy,
        'upload_fileobj': upload_fileobj,
        'download_fileobj': download_fileobj,
    }
    # dicts preserve insertion order, so attributes are injected in the
    # same sequence as the original one-call-per-method version.
    for attribute_name, method in transfer_methods.items():
        utils.inject_attribute(class_attributes, attribute_name, method)
async def download_file(self, Bucket, Key, Filename, ExtraArgs=None, Callback=None, Config=None):
    """Download an S3 object to a local file.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.meta.client.download_file('mybucket', 'hello.txt', '/tmp/hello.txt')

    Similar behavior as S3Transfer's download_file() method, except that
    parameters are capitalized. Opens *Filename* for binary writing and
    delegates the actual transfer to download_fileobj().
    """
    with open(Filename, 'wb') as fileobj:
        await download_fileobj(
            self, Bucket, Key, fileobj,
            ExtraArgs=ExtraArgs, Callback=Callback, Config=Config,
        )
async def download_fileobj(self, Bucket, Key, Fileobj, ExtraArgs=None, Callback=None, Config=None):
    """Download an object from S3 to a file-like object.

    The file-like object must be in binary mode.
    Usage::

        import boto3
        s3 = boto3.client('s3')
        with open('filename', 'wb') as data:
            s3.download_fileobj('mybucket', 'mykey', data)

    :type Fileobj: a file-like object
    :param Fileobj: A file-like object to download into. At a minimum, it must
        implement the `write` method and must accept bytes.
    :type Bucket: str
    :param Bucket: The name of the bucket to download from.
    :type Key: str
    :param Key: The name of the key to download from.
    :type ExtraArgs: dict
    :param ExtraArgs: Extra arguments that may be passed to the
        client operation. (Currently unused by this implementation.)
    :type Callback: method
    :param Callback: A method which takes a number of bytes transferred to
        be periodically called during the download.
    :type Config: boto3.s3.transfer.TransferConfig
    :param Config: The transfer configuration to be used when performing the
        download. (Currently unused by this implementation.)
    :raises ClientError: with a 404 code when the key does not exist, so the
        failure looks the same as boto3's download_file.
    """
    try:
        resp = await self.get_object(Bucket=Bucket, Key=Key)
    except ClientError as err:
        if err.response['Error']['Code'] == 'NoSuchKey':
            # Convert to 404 so it looks the same as when boto3.download_file fails.
            raise ClientError(
                {'Error': {'Code': '404', 'Message': 'Not Found'}},
                'HeadObject',
            ) from err
        raise

    body = resp['Body']
    # Stream the body in fixed-size chunks; an empty read signals end of stream.
    while True:
        data = await body.read(4096)
        if not data:
            break
        if Callback:
            try:
                Callback(len(data))
            except Exception:  # progress reporting is best-effort; never abort the download
                pass
        Fileobj.write(data)
        # Yield control so large downloads don't starve other tasks on the loop.
        await asyncio.sleep(0.0)
async def upload_fileobj(self, Fileobj: BinaryIO, Bucket: str, Key: str,
                         ExtraArgs: Optional[Dict[str, Any]] = None,
                         Callback: Optional[Callable[[int], None]] = None,
                         Config: Optional['S3TransferConfig'] = None):
    """Upload a file-like object to S3 via a sequential multipart upload.

    The file-like object must be in binary mode. Parts are read and sent
    one at a time; on any failure the multipart upload is aborted so S3
    does not keep (and bill for) orphaned parts.

    Usage::

        import boto3
        s3 = boto3.client('s3')
        with open('filename', 'rb') as data:
            s3.upload_fileobj(data, 'mybucket', 'mykey')

    :type Fileobj: a file-like object
    :param Fileobj: A file-like object to upload. At a minimum, it must
        implement the `read` method, and must return bytes. An async
        `read` (e.g. an aiofiles object) is also supported.
    :type Bucket: str
    :param Bucket: The name of the bucket to upload to.
    :type Key: str
    :param Key: The name of the key to upload to.
    :type ExtraArgs: dict
    :param ExtraArgs: Extra arguments that may be passed to the
        client operation (forwarded to create_multipart_upload).
    :type Callback: method
    :param Callback: A method which takes a number of bytes transferred to
        be periodically called during the upload.
    :type Config: boto3.s3.transfer.TransferConfig
    :param Config: The transfer configuration to be used when performing the
        upload; only multipart_chunksize and io_chunksize are honored here.
    """
    if not ExtraArgs:
        ExtraArgs = {}

    # Parts are uploaded sequentially (no worker queue); defaults mirror
    # boto3's TransferConfig (8 MiB parts, 256 KiB reads).
    multipart_chunksize = 8388608 if Config is None else Config.multipart_chunksize
    io_chunksize = 262144 if Config is None else Config.io_chunksize

    # Start the multipart upload.
    resp = await self.create_multipart_upload(Bucket=Bucket, Key=Key, **ExtraArgs)
    upload_id = resp['UploadId']

    part = 0
    parts = []
    running = True
    sent_bytes = 0
    try:
        while running:
            part += 1
            multipart_payload = b''
            # Accumulate small reads until we have one full part.
            while len(multipart_payload) < multipart_chunksize:
                if asyncio.iscoroutinefunction(Fileobj.read):  # handles if we pass in an aiofiles obj
                    data = await Fileobj.read(io_chunksize)
                else:
                    data = Fileobj.read(io_chunksize)
                if data == b'':  # end of file
                    running = False
                    break
                multipart_payload += data

            # When the input length is an exact multiple of the chunk size the
            # final iteration reads nothing; don't upload a zero-byte trailing
            # part.  (An empty *first* part is still uploaded so that empty
            # files produce a valid single-part upload.)
            if multipart_payload == b'' and parts:
                break

            # Submit the part to S3.
            resp = await self.upload_part(
                Body=multipart_payload,
                Bucket=Bucket,
                Key=Key,
                PartNumber=part,
                UploadId=upload_id
            )
            parts.append({'ETag': resp['ETag'], 'PartNumber': part})

            sent_bytes += len(multipart_payload)
            if Callback is not None:
                try:
                    Callback(sent_bytes)
                except Exception:  # progress reporting is best-effort only
                    pass

        # All parts submitted; finalize the object.
        await self.complete_multipart_upload(
            Bucket=Bucket,
            Key=Key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
    except BaseException:
        # Abort on any failure (including cancellation) so no orphaned
        # parts remain, then let the original exception propagate.
        await self.abort_multipart_upload(
            Bucket=Bucket,
            Key=Key,
            UploadId=upload_id
        )
        raise
async def upload_file(self, Filename, Bucket, Key, ExtraArgs=None, Callback=None, Config=None):
    """Upload a local file to an S3 object.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.meta.client.upload_file('/tmp/hello.txt', 'mybucket', 'hello.txt')

    Similar behavior as S3Transfer's upload_file() method, except that
    parameters are capitalized. Opens *Filename* for binary reading and
    delegates the actual transfer to upload_fileobj().
    """
    with open(Filename, 'rb') as fileobj:
        await upload_fileobj(
            self, fileobj, Bucket, Key,
            ExtraArgs=ExtraArgs, Callback=Callback, Config=Config,
        )
async def copy(self, CopySource, Bucket, Key, ExtraArgs=None, Callback=None, Config=None):
    """Copy an object from one S3 location to another.

    NOTE: unlike boto3's managed copy, this streams the source object
    through this client (get_object + upload_fileobj) rather than doing a
    server-side copy, so the object's bytes transit the local machine.

    :type CopySource: dict
    :param CopySource: The source to copy; must contain 'Bucket' and 'Key'.
    :type Bucket: str
    :param Bucket: The name of the destination bucket.
    :type Key: str
    :param Key: The name of the destination key.
    :param ExtraArgs: Forwarded to upload_fileobj().
    :param Callback: Forwarded to upload_fileobj().
    :param Config: Forwarded to upload_fileobj().
    :raises ClientError: with a 404 code when the source key does not
        exist, matching the error boto3's download_file surfaces.
    """
    # NOTE(review): asserts vanish under `python -O`; kept (rather than
    # switched to an explicit raise) so callers see the same exception type.
    assert 'Bucket' in CopySource
    assert 'Key' in CopySource

    try:
        resp = await self.get_object(Bucket=CopySource['Bucket'], Key=CopySource['Key'])
    except ClientError as err:
        if err.response['Error']['Code'] == 'NoSuchKey':
            # Convert to 404 so it looks the same as when boto3.download_file fails.
            raise ClientError(
                {'Error': {'Code': '404', 'Message': 'Not Found'}},
                'HeadObject',
            ) from err
        raise

    # The streaming body exposes an async read(), which upload_fileobj supports.
    file_obj = resp['Body']
    await self.upload_fileobj(file_obj, Bucket, Key, ExtraArgs=ExtraArgs, Callback=Callback, Config=Config)