Skip to content

Commit

Permalink
proxy: don't store entire blob in memory when caching
Browse files Browse the repository at this point in the history
also uses blob uploader to upload the blob to storage.
  • Loading branch information
flavianmissi committed Mar 24, 2022
1 parent 815ef44 commit c8d5fb9
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 58 deletions.
75 changes: 45 additions & 30 deletions data/registry_model/registry_proxy_model.py
Expand Up @@ -19,11 +19,18 @@
RepositoryDoesNotExist,
ManifestDoesNotExist,
TagDoesNotExist,
InvalidImageException,
)
from data.model.repository import get_repository, create_repository
from data.model.proxy_cache import get_proxy_cache_config_for_org
from data.model.storage import get_image_location_for_name, get_layer_path
from data.registry_model.blobuploader import (
create_blob_upload,
complete_when_uploaded,
BlobTooLargeException,
BlobRangeMismatchException,
BlobUploadException,
BlobDigestMismatchException,
BlobUploadSettings,
)
from data.registry_model.registry_oci_model import OCIModel
from data.registry_model.datatypes import Manifest, Tag, RepositoryReference
from image.oci import OCI_IMAGE_MANIFEST_CONTENT_TYPE, OCI_IMAGE_INDEX_CONTENT_TYPE
Expand Down Expand Up @@ -441,37 +448,45 @@ def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placement
except ImageStorage.DoesNotExist:
return None

with db_disallow_replica_use():
with db_transaction():
try:
ImageStoragePlacement.select().where(
ImageStoragePlacement.storage == blob
).get()
except ImageStoragePlacement.DoesNotExist:
try:
self._proxy.blob_exists(blob_digest)
except UpstreamRegistryError:
return None
try:
raw_blob, size = self._proxy.get_blob(blob_digest)
except UpstreamRegistryError:
return None
location_name = storage.preferred_locations[0]
path = get_layer_path(blob)
# TODO: use range requests to download the blob when the server supports it
storage.put_content([location_name], path, raw_blob)
location = get_image_location_for_name(location_name)
if blob.image_size != size:
q = ImageStorage.update(
image_size=size,
).where(ImageStorage.id == blob.id)
affected = q.execute()
if affected != 1:
return None
ImageStoragePlacement.create(storage=blob, location=location.id)
try:
ImageStoragePlacement.select().where(ImageStoragePlacement.storage == blob).get()
except ImageStoragePlacement.DoesNotExist:
try:
self._download_blob(repository_ref, blob_digest)
except BlobDigestMismatchException:
raise UpstreamRegistryError("blob digest mismatch")
except BlobTooLargeException as e:
raise UpstreamRegistryError(f"blob too large, max allowed is {e.max_allowed}")
except BlobRangeMismatchException:
raise UpstreamRegistryError("range mismatch")
except BlobUploadException:
raise UpstreamRegistryError("invalid blob upload")

return super().get_repo_blob_by_digest(repository_ref, blob_digest, include_placements)

def _download_blob(self, repo_ref: RepositoryReference, digest: str) -> int:
"""
Download blob from upstream registry and perform a monolitic upload to
Quay's own storage.
"""
expiration = (
self._config.expiration_s
if self._config.expiration_s
else app.config["PUSH_TEMP_TAG_EXPIRATION_SEC"]
)
settings = BlobUploadSettings(
maximum_blob_size=app.config["MAXIMUM_LAYER_SIZE"],
committed_blob_expiration=expiration,
)
uploader = create_blob_upload(repo_ref, storage, settings)
stream, size = self._proxy.get_blob(digest)
start_offset = 0
length = size
with complete_when_uploaded(uploader):
uploader.upload_chunk(app.config, stream, start_offset, length)
uploader.commit_to_blob(app.config, digest)
return size

def convert_manifest(
self,
manifest,
Expand Down
56 changes: 37 additions & 19 deletions endpoints/v2/test/test_blob.py
@@ -1,12 +1,9 @@
import io
import gzip
import json
import unittest
import hashlib
import pytest
from unittest.mock import MagicMock, patch

from mock import patch
from flask import url_for
from playhouse.test_utils import assert_query_count

Expand All @@ -15,14 +12,14 @@
from data import model
from data.cache import InMemoryDataModelCache, NoopDataModelCache
from data.cache.test.test_cache import TEST_CACHE_CONFIG
from data.database import ImageStorageLocation, ManifestBlob, ImageStorage, ImageStoragePlacement
from data.database import ImageStorageLocation, ImageStorage, ImageStoragePlacement
from data.registry_model import registry_model
from data.registry_model.registry_proxy_model import ProxyModel
from data.model.storage import get_layer_path
from digest.digest_tools import sha256_digest
from endpoints.test.shared import conduct_call
from image.docker.schema2 import DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
from image.shared.schemas import parse_manifest_from_bytes
from proxy import Proxy
from image.docker.schema2.manifest import DockerSchema2ManifestBuilder
from util.bytes import Bytes
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from test.fixtures import *
Expand Down Expand Up @@ -54,7 +51,8 @@ class TestBlobPullThroughStorage:
image_name = "library/hello-world"
repository = f"{orgname}/{image_name}"
tag = "14"
digest = "sha256:2db29710123e3e53a794f2694094b9b4338aa9ee5c40b930cb8063a1be392c54"
# digest for 'test'. matches the one used in proxy/fixtures.py
digest = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
config = None
org = None
manifest = None
Expand Down Expand Up @@ -98,20 +96,40 @@ def setup(self, client, app, proxy_manifest_response):
self.repo_ref = registry_model.lookup_repository(self.orgname, self.image_name)
assert self.repo_ref is not None

def get_blob(layer):
content = Bytes.for_string_or_unicode(layer).as_encoded_str()
digest = str(sha256_digest(content))
blob = model.blob.store_blob_record_and_temp_link(
self.orgname,
self.image_name,
digest,
ImageStorageLocation.get(name="local_us"),
len(content),
120,
)
storage.put_content(["local_us"], get_layer_path(blob), content)
return blob, digest

if self.manifest is None:
proxy_mock = proxy_manifest_response(
self.tag, HELLO_WORLD_SCHEMA2_MANIFEST_JSON, DOCKER_SCHEMA2_MANIFEST_CONTENT_TYPE
layer1 = json.dumps(
{
"config": {},
"rootfs": {"type": "layers", "diff_ids": []},
"history": [{}],
}
)
with patch(
"data.registry_model.registry_proxy_model.Proxy", MagicMock(return_value=proxy_mock)
):
proxy_model = ProxyModel(
self.orgname,
self.image_name,
self.user,
)
tag = proxy_model.get_repo_tag(self.repo_ref, self.tag)
self.manifest = tag.manifest
_, config_digest = get_blob(layer1)
layer2 = "test"
_, blob_digest = get_blob(layer2)
builder = DockerSchema2ManifestBuilder()
builder.set_config_digest(config_digest, len(layer1.encode("utf-8")))
builder.add_layer(blob_digest, len(layer2.encode("utf-8")))
manifest = builder.build()
created_manifest = model.oci.manifest.get_or_create_manifest(
self.repo_ref.id, manifest, storage
)
self.manifest = created_manifest.manifest
assert self.digest == blob_digest
assert self.manifest is not None

if self.blob is None:
Expand Down
11 changes: 3 additions & 8 deletions proxy/__init__.py
Expand Up @@ -3,16 +3,13 @@
Registries following the distribution spec are supported.
"""
from __future__ import annotations
from typing import Any
import re
import json

import requests

from app import model_cache
from data.cache import cache_key
from data.database import ProxyCacheConfig
from digest.digest_tools import sha256_digest


WWW_AUTHENTICATE_REGEX = re.compile(r'(\w+)[=] ?"?([^",]+)"?')
Expand Down Expand Up @@ -94,12 +91,10 @@ def get_blob(self, digest: str):
resp = self.get(
url,
allow_redirects=True,
stream=True,
)
size = resp.headers.get("content-length")
content_digest = sha256_digest(resp.content)
if digest != content_digest:
UpstreamRegistryError(f"digest mismatch, expected {digest}, got {content_digest}")
return resp.content, size
size = resp.headers.get("content-length", -1)
return resp.raw, int(size)

def blob_exists(self, digest: str):
url = f"{self.base_url}/v2/{self._repo}/blobs/{digest}"
Expand Down
3 changes: 2 additions & 1 deletion proxy/fixtures.py
@@ -1,3 +1,4 @@
import io
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -29,7 +30,7 @@ def mock_blob_exists(digest):
return {"status": 200}

def mock_get_blob(digest):
return b"test", 4
return io.BytesIO(b"test"), 4

proxy_mock = MagicMock()
proxy_mock.manifest_exists.side_effect = mock_manifest_exists
Expand Down

0 comments on commit c8d5fb9

Please sign in to comment.