import logging
import time

from contextlib import contextmanager
from collections import namedtuple

import bitmath
import rehash

from prometheus_client import Counter, Histogram

from data.registry_model import registry_model
from data.database import CloseForLongOperation, db_transaction
from digest import digest_tools
from util.registry.filelike import wrap_with_handler, StreamSlice
from util.registry.gzipstream import calculate_size_handler

logger = logging.getLogger(__name__)


chunk_upload_duration = Histogram(
    "quay_chunk_upload_duration_seconds",
    "number of seconds for a chunk to be uploaded to the registry",
    labelnames=["region"],
)

pushed_bytes_total = Counter(
    "quay_registry_image_pushed_bytes_total", "number of bytes pushed to the registry"
)


BLOB_CONTENT_TYPE = "application/octet-stream"

class BlobUploadException(Exception):
    """
    Base for all exceptions raised when uploading blobs.
    """


class BlobRangeMismatchException(BlobUploadException):
    """
    Exception raised if the range to be uploaded does not match.
    """


class BlobDigestMismatchException(BlobUploadException):
    """
    Exception raised if the digest requested does not match that of the contents uploaded.
    """


class BlobTooLargeException(BlobUploadException):
    """
    Exception raised if the data uploaded exceeds the maximum_blob_size.
    """

    def __init__(self, uploaded, max_allowed):
        super(BlobTooLargeException, self).__init__()
        self.uploaded = uploaded
        self.max_allowed = max_allowed


BlobUploadSettings = namedtuple(
    "BlobUploadSettings",
    ["maximum_blob_size", "committed_blob_expiration"],
)

def create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers=None):
    """
    Creates a new blob upload in the specified repository and returns a manager for interacting
    with that upload.

    Returns None if a new blob upload could not be started.
    """
    location_name = storage.preferred_locations[0]
    new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
    blob_upload = registry_model.create_blob_upload(
        repository_ref, new_upload_uuid, location_name, upload_metadata
    )
    if blob_upload is None:
        return None

    return _BlobUploadManager(
        repository_ref, blob_upload, settings, storage, extra_blob_stream_handlers
    )


def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settings):
    """
    Retrieves the manager for an in-progress blob upload with the specified ID under the given
    repository, or None if no such upload exists.
    """
    blob_upload = registry_model.lookup_blob_upload(repository_ref, blob_upload_id)
    if blob_upload is None:
        return None

    return _BlobUploadManager(repository_ref, blob_upload, settings, storage)

@contextmanager
def complete_when_uploaded(blob_upload):
    """
    Wraps the given blob upload in a context manager that completes the upload when the context
    closes.
    """
    try:
        yield blob_upload
    except Exception:
        logger.exception("Exception when uploading blob `%s`", blob_upload.blob_upload_id)
        raise
    finally:
        # Cancel the upload if something went wrong or it was not committed to a blob.
        if blob_upload.committed_blob is None:
            blob_upload.cancel_upload()

@contextmanager
def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None):
    """
    Starts a new blob upload in the specified repository and yields a manager for interacting with
    that upload.

    When the context manager completes, the blob upload is deleted, whether committed to a blob or
    not. Yields None if a blob upload could not be started.
    """
    assert repository_ref is not None

    created = create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers)
    if not created:
        yield None
        return

    try:
        yield created
    except Exception:
        logger.exception("Exception when uploading blob `%s`", created.blob_upload_id)
        raise
    finally:
        # Cancel the upload if something went wrong or it was not committed to a blob.
        if created.committed_blob is None:
            created.cancel_upload()

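
# Example usage (a minimal sketch: `repo_ref`, `storage`, `app_config`, `chunk_stream`, and the
# parsed `expected_digest` are assumed to come from the caller's request context):
#
#   settings = BlobUploadSettings(maximum_blob_size="5G", committed_blob_expiration=3600)
#   with upload_blob(repo_ref, storage, settings) as manager:
#       if manager is not None:
#           manager.upload_chunk(app_config, chunk_stream)
#           blob = manager.commit_to_blob(app_config, expected_digest)
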
class _BlobUploadManager(object):
    """
    Defines a helper class for easily interacting with blob uploads in progress, including handling
    of database and storage calls.
    """

    def __init__(
        self, repository_ref, blob_upload, settings, storage, extra_blob_stream_handlers=None
    ):
        assert repository_ref is not None
        assert blob_upload is not None

        self.repository_ref = repository_ref
        self.blob_upload = blob_upload
        self.settings = settings
        self.storage = storage
        self.extra_blob_stream_handlers = extra_blob_stream_handlers
        self.committed_blob = None

    @property
    def blob_upload_id(self):
        """
        Returns the unique ID for the blob upload.
        """
        return self.blob_upload.upload_id
    def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1):
        """
        Uploads a chunk of data found in the given input file-like interface. start_offset and
        length are optional and should match a range header if any was given.

        Returns the total number of bytes uploaded after this upload has completed. Raises a
        BlobUploadException if the upload failed.
        """
        assert start_offset is not None
        assert length is not None

        if start_offset > 0 and start_offset > self.blob_upload.byte_count:
            logger.error("start_offset provided greater than blob_upload.byte_count")
            raise BlobRangeMismatchException()

        # Ensure that we won't go over the allowed maximum size for blobs.
        max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size)
        uploaded = bitmath.Byte(length + start_offset)
        if length > -1 and uploaded > max_blob_size:
            raise BlobTooLargeException(uploaded=uploaded.bytes, max_allowed=max_blob_size.bytes)

        location_set = {self.blob_upload.location_name}

        upload_error = None
        with CloseForLongOperation(app_config):
            if start_offset > 0 and start_offset < self.blob_upload.byte_count:
                # Skip the bytes which were received on a previous push, which are already stored
                # and included in the sha calculation.
                overlap_size = self.blob_upload.byte_count - start_offset
                input_fp = StreamSlice(input_fp, overlap_size)

                # Update our upload bounds to reflect the skipped portion of the overlap.
                start_offset = self.blob_upload.byte_count
                length = max(length - overlap_size, 0)
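                # (Illustrative example: with 100 bytes already stored and a client
                # resending from offset 60, overlap_size is 40, so the first 40 bytes
                # of the incoming stream are discarded, start_offset becomes 100, and
                # length shrinks by 40.)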

            # We use this to escape early in case we have already processed all of the bytes the
            # user wants to upload.
            if length == 0:
                return self.blob_upload.byte_count

            input_fp = wrap_with_handler(input_fp, self.blob_upload.sha_state.update)

            if self.extra_blob_stream_handlers:
                for handler in self.extra_blob_stream_handlers:
                    input_fp = wrap_with_handler(input_fp, handler)

            # If this is the first chunk and we're starting at the 0 offset, add a handler to
            # gunzip the stream so we can determine the uncompressed size. We'll throw out this
            # data if another chunk comes in, but in the common case the docker client only sends
            # one chunk.
            size_info = None
            if start_offset == 0 and self.blob_upload.chunk_count == 0:
                size_info, fn = calculate_size_handler()
                input_fp = wrap_with_handler(input_fp, fn)

            start_time = time.time()
            length_written, new_metadata, upload_error = self.storage.stream_upload_chunk(
                location_set,
                self.blob_upload.upload_id,
                start_offset,
                length,
                input_fp,
                self.blob_upload.storage_metadata,
                content_type=BLOB_CONTENT_TYPE,
            )

            if upload_error is not None:
                logger.error("storage.stream_upload_chunk returned error %s", upload_error)
                raise BlobUploadException(upload_error)

            # Update the chunk upload duration and pushed bytes metrics.
            chunk_upload_duration.labels(list(location_set)[0]).observe(time.time() - start_time)
            pushed_bytes_total.inc(length_written)

        # Ensure we have not gone beyond the max layer size.
        new_blob_bytes = self.blob_upload.byte_count + length_written
        new_blob_size = bitmath.Byte(new_blob_bytes)
        if new_blob_size > max_blob_size:
            raise BlobTooLargeException(
                uploaded=new_blob_size.bytes, max_allowed=max_blob_size.bytes
            )

        # If we determined an uncompressed size and this is the first chunk, add it to the blob.
        # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
        uncompressed_byte_count = self.blob_upload.uncompressed_byte_count
        if size_info is not None and self.blob_upload.chunk_count == 0 and size_info.is_valid:
            uncompressed_byte_count = size_info.uncompressed_size
        elif length_written > 0:
            # Otherwise, if we wrote some bytes and the above conditions were not met, then we
            # don't know the uncompressed size.
            uncompressed_byte_count = None

        self.blob_upload = registry_model.update_blob_upload(
            self.blob_upload,
            uncompressed_byte_count,
            new_metadata,
            new_blob_bytes,
            self.blob_upload.chunk_count + 1,
            self.blob_upload.sha_state,
        )
        if self.blob_upload is None:
            raise BlobUploadException("Could not complete upload of chunk")

        return new_blob_bytes
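
    # Example of a resumable, ranged push (a sketch; the offsets and stream names are
    # illustrative and must match the HTTP Content-Range header sent by the client):
    #
    #   total = manager.upload_chunk(app_config, first_stream, start_offset=0, length=65536)
    #   total = manager.upload_chunk(app_config, next_stream, start_offset=total, length=65536)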

    def cancel_upload(self):
        """
        Cancels the blob upload, deleting any data uploaded and removing the upload itself.
        """
        if self.blob_upload is None:
            return

        # Tell storage to cancel the chunked upload, deleting its contents.
        self.storage.cancel_chunked_upload(
            {self.blob_upload.location_name},
            self.blob_upload.upload_id,
            self.blob_upload.storage_metadata,
        )

        # Remove the blob upload record itself.
        registry_model.delete_blob_upload(self.blob_upload)

    def commit_to_blob(self, app_config, expected_digest=None):
        """
        Commits the blob upload to a blob under the repository. The resulting blob will be marked
        to not be GCed for some period of time (as configured by `committed_blob_expiration`).

        If expected_digest is specified, the content digest of the data uploaded for the blob is
        compared to that given and, if it does not match, a BlobDigestMismatchException is raised.
        The digest given must be of type `Digest` and not a string.
        """
        # Compare the content digest.
        if expected_digest is not None:
            self._validate_digest(expected_digest)

        # Finalize the storage.
        storage_already_existed = self._finalize_blob_storage(app_config)

        # Convert the upload to a blob.
        computed_digest_str = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)

        with db_transaction():
            blob = registry_model.commit_blob_upload(
                self.blob_upload, computed_digest_str, self.settings.committed_blob_expiration
            )
            if blob is None:
                return None

        self.committed_blob = blob
        return blob
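
    # Callers typically parse the client-supplied digest string before committing (a
    # sketch; this assumes digest_tools exposes a Digest.parse_digest helper that raises
    # InvalidDigestException on malformed input):
    #
    #   parsed = digest_tools.Digest.parse_digest(digest_str)
    #   manager.commit_to_blob(app_config, expected_digest=parsed)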

    def _validate_digest(self, expected_digest):
        """
        Verifies that the digest's SHA matches that of the uploaded data.
        """
        try:
            computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)
            if not digest_tools.digests_equal(computed_digest, expected_digest):
                logger.error(
                    "Digest mismatch for upload %s: Expected digest %s, found digest %s",
                    self.blob_upload.upload_id,
                    expected_digest,
                    computed_digest,
                )
                raise BlobDigestMismatchException()
        except digest_tools.InvalidDigestException:
            raise BlobDigestMismatchException()

    def _finalize_blob_storage(self, app_config):
        """
        When an upload is successful, this ends the uploading process from the storage's
        perspective.

        Returns True if the blob already existed.
        """
        computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)
        final_blob_location = digest_tools.content_path(computed_digest)

        # Close the database connection before we perform this operation, as it can take a while
        # and we shouldn't hold the connection during that time.
        with CloseForLongOperation(app_config):
            # Move the storage into place, or if this was a re-upload, cancel it.
            already_existed = self.storage.exists(
                {self.blob_upload.location_name}, final_blob_location
            )
            if already_existed:
                # It already existed, so clean up our upload, which served as proof that the
                # uploader had the blob.
                self.storage.cancel_chunked_upload(
                    {self.blob_upload.location_name},
                    self.blob_upload.upload_id,
                    self.blob_upload.storage_metadata,
                )
            else:
                # We were the first ones to upload this image (at least to this location),
                # so copy it into place.
                self.storage.complete_chunked_upload(
                    {self.blob_upload.location_name},
                    self.blob_upload.upload_id,
                    final_blob_location,
                    self.blob_upload.storage_metadata,
                )

        return already_existed