storage: optimize large azure chunked upload (PROJQUAY-3753) (#1387)
- Increase nginx send timeout on blobs endpoints
- Reduce Azure blob block size

TODO: Spread block uploads over multiple worker threads.
kleesc committed Jun 21, 2022
1 parent a60d0c9 commit 56b16b7
Showing 3 changed files with 27 additions and 28 deletions.
1 change: 1 addition & 0 deletions conf/nginx/server-base.conf.jnj
@@ -139,6 +139,7 @@ location ~ /v2/([^/]+)(/[^/]+)+/blobs/ {
     proxy_buffering off;
     proxy_request_buffering off;
     proxy_read_timeout 2000;
+    proxy_send_timeout 2000;
     proxy_temp_path /tmp 1 2;
 
     client_max_body_size {{ maximum_layer_size }};
50 changes: 24 additions & 26 deletions storage/azurestorage.py
@@ -11,6 +11,7 @@
 import copy
 import time
 
+import concurrent.futures
 from datetime import datetime, timedelta
 
 from azure.core.exceptions import AzureError, ResourceNotFoundError
@@ -232,41 +233,38 @@ def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, con
         total_bytes_written = 0
 
         while True:
-            current_length = length - total_bytes_written
-            max_length = (
-                min(current_length, self._max_block_size)
-                if length != READ_UNTIL_END
-                else self._max_block_size
-            )
+            # Deal with smaller blocks to avoid timeouts when staging larger blocks.
+            # Azure has a limit of 50000 blocks,
+            # which should be enough for most use cases (12800GB per blob).
+            max_length = min(self._max_block_size, 1024 * 1024 * 128)
 
             if max_length <= 0:
                 break
 
-            limited = LimitingStream(in_fp, max_length, seekable=False)
-
-            # Note: Azure fails if a zero-length block is uploaded, so we read all the data here,
-            # and, if there is none, terminate early.
-            block_data = b""
-            for chunk in iter(lambda: limited.read(31457280), b""):
-                block_data += chunk
+            bytes_written = 0
 
-            if len(block_data) == 0:
-                break
+            with io.BytesIO() as buf:
+                bytes_written += buf.write(in_fp.read(max_length))
+                buf.seek(0)
+                if bytes_written == 0:
+                    break
 
-            block_index = len(new_metadata[_BLOCKS_KEY])
-            block_id = format(block_index, "05")
-            new_metadata[_BLOCKS_KEY].append(block_id)
+                block_index = len(new_metadata[_BLOCKS_KEY])
+                block_id = format(block_index, "05")
+                new_metadata[_BLOCKS_KEY].append(block_id)
 
-            try:
-                self._blob(upload_blob_path).stage_block(
-                    block_id, block_data, validate_content=True
-                )
-            except AzureError as ae:
-                logger.exception(
-                    "Exception when trying to stream_upload_chunk block %s for %s", block_id, uuid
-                )
-                return total_bytes_written, new_metadata, ae
+                try:
+                    # TODO (kleesc): Look at doing this multithreaded
+                    self._blob(upload_blob_path).stage_block(block_id, buf, validate_content=True)
+                except AzureError as ae:
+                    raise IOError(
+                        "Exception when trying to stream_upload_chunk block %s for %s",
+                        block_id,
+                        uuid,
+                    )
 
-            bytes_written = len(block_data)
             total_bytes_written += bytes_written
             if bytes_written == 0 or bytes_written < max_length:
                 break
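
Note on the in-code TODO and the concurrent.futures import above: blocks are still staged one at a time in this commit. A rough sketch of how staging could be spread over worker threads is below; it is illustrative only, not part of the change, assumes every (block_id, data) pair fits in memory, and uses a hypothetical helper name (blob_client stands for an azure.storage.blob.BlobClient such as the one returned by self._blob(...)).

# Illustrative sketch, not part of this commit: stage already-read blocks from a
# thread pool. The final block order is fixed by the id list later passed to
# commit_block_list(), so the order in which stage_block() calls finish does not matter.
import concurrent.futures


def stage_blocks_parallel(blob_client, blocks, max_workers=4):
    """Stage an iterable of (block_id, bytes) pairs concurrently."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(blob_client.stage_block, block_id, data, validate_content=True)
            for block_id, data in blocks
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any AzureError from a worker thread

With 128MiB blocks, each in-flight block adds to memory use, so max_workers would have to be balanced against worker memory limits.
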
4 changes: 2 additions & 2 deletions storage/test/test_azure.py
@@ -78,12 +78,12 @@ def container_file(url, request):
     if query_params.get("comp") == ["block"]:
         block_id = query_params["blockid"][0]
         files[filename] = files.get(filename) or {}
-        files[filename][block_id] = request.body
+        files[filename][block_id] = request.body.read()
         return {
             "status_code": 201,
             "content": "{}",
             "headers": {
-                "Content-MD5": base64.b64encode(md5(request.body).digest()).decode("ascii"),
+                "Content-MD5": base64.b64encode(md5(files[filename][block_id]).digest()).decode("ascii"),
                 "ETag": "foo",
                 "x-ms-request-server-encrypted": "false",
                 "last-modified": "Wed, 21 Oct 2015 07:28:00 GMT",
