Skip to content

Commit

Permalink
Store content in the Redis cache
Browse files Browse the repository at this point in the history
ref #9500
Required PR: pulp/pulpcore#1751
  • Loading branch information
lubosmj committed Dec 3, 2021
1 parent 39cb487 commit 3fac4c4
Show file tree
Hide file tree
Showing 12 changed files with 412 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGES/9500.feature
@@ -0,0 +1 @@
Enabled caching responses from the registry.
125 changes: 125 additions & 0 deletions pulp_container/app/cache.py
@@ -0,0 +1,125 @@
from aiohttp.web_exceptions import HTTPForbidden

from pulpcore.plugin.cache import CacheKeys, ContentCache, SyncContentCache

from pulp_container.app.models import ContainerDistribution

ACCEPT_HEADER_KEY = "accept_header"


class RegistryCache:
"""A class that overrides the default key specs."""

def __init__(self, base_key=None, expires_ttl=None, auth=None):
"""Initialize the parent class with the plugin's specific keys."""
updated_keys = (CacheKeys.path, CacheKeys.method, ACCEPT_HEADER_KEY)
super().__init__(base_key=base_key, expires_ttl=expires_ttl, keys=updated_keys, auth=auth)


class RegistryContentCache(RegistryCache, ContentCache):
"""A wrapper around the Redis content cache handler tailored for the content application."""

def make_key(self, request):
"""Make a key composed of the request's path, method, host, and accept header."""
all_keys = {
CacheKeys.path: request.path,
CacheKeys.method: request.method,
CacheKeys.host: request.url.host,
ACCEPT_HEADER_KEY: request.headers["accept"],
}
key = ":".join(all_keys[k] for k in self.keys)
return key


class RegistryApiCache(RegistryCache, SyncContentCache):
"""A wrapper around the Redis content cache handler tailored for the registry API."""

def make_key(self, request):
"""Make a key composed of the request's path, method, host, and accept header."""
all_keys = {
CacheKeys.path: request.path,
CacheKeys.method: request.method,
CacheKeys.host: request.get_host(),
ACCEPT_HEADER_KEY: request.headers["accept"],
}
key = ":".join(all_keys[k] for k in self.keys)
return key


def auth_cached(request, cached, base_key):
"""
Authentication check for the cached stream_content handler
Args:
request (:class:`aiohttp.web.request`): The request from the client.
cached (:class:`CacheAiohttp`): The Pulp cache
base_key (str): The base_key associated with this response
"""
guard_key = "DISTRO#GUARD#PRESENT"
present = cached.get(guard_key, base_key=base_key)
if present == b"True" or present is None:
path = request.resolver_match.kwargs["path"]
distro = ContainerDistribution.objects.select_related(
"repository", "repository_version", "publication", "remote"
).get(base_path=path)
try:
guard = _permit(request, distro)
except HTTPForbidden:
guard = True
raise
finally:
if not present:
cached.set(guard_key, str(guard), base_key=base_key)


def _permit(request, distribution):
"""
Permit the request.
Authorization is delegated to the optional content-guard associated with the distribution.
Args:
request (:class:`aiohttp.web.Request`): A request for a published file.
distribution (detail of :class:`pulpcore.plugin.models.Distribution`): The matched
distribution.
Raises:
:class:`aiohttp.web_exceptions.HTTPForbidden`: When not permitted.
"""
guard = distribution.content_guard
if not guard:
return False
try:
guard.cast().permit(request)
except PermissionError as pe:
raise HTTPForbidden(reason=str(pe))
return True


def find_base_path_cached(request, cached):
"""
Finds the base-path to use for the base-key in the cache
Args:
request (:class:`aiohttp.web.request`): The request from the client.
cached (:class:`CacheAiohttp`): The Pulp cache
Returns:
str: The base-path associated with this request
"""
path = request.resolver_match.kwargs["path"]
base_paths = [path]
multiplied_base_paths = []
for i, base_path in enumerate(base_paths):
copied_by_index_base_path = [base_path for _ in range(i + 1)]
multiplied_base_paths.extend(copied_by_index_base_path)
index_p1 = cached.exists(base_key=multiplied_base_paths)
if index_p1:
return base_paths[index_p1 - 1]
else:
distro = ContainerDistribution.objects.select_related(
"repository", "repository_version", "publication", "remote"
).get(base_path=path)
return distro.base_path
24 changes: 14 additions & 10 deletions pulp_container/app/models.py
Expand Up @@ -9,6 +9,7 @@
from urllib.parse import urlparse

from django.db import models
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.postgres import fields
from django.shortcuts import redirect
Expand Down Expand Up @@ -598,17 +599,20 @@ def permit(self, request):
"""
Permit preauthenticated redirects from pulp-api.
"""
try:
signed_url = request.url
validate_token = request.query["validate_token"]
hex_salt, hex_digest = validate_token.split(":", 1)
salt = bytes.fromhex(hex_salt)
digest = bytes.fromhex(hex_digest)
url = re.sub(r"\?validate_token=.*$", "", str(signed_url))
if not digest == self._get_digest(salt, url):
if settings.DEFAULT_FILE_STORAGE == "storages.backends.s3boto3.S3Boto3Storage":
pass
else:
try:
signed_url = request.url
validate_token = request.query["validate_token"]
hex_salt, hex_digest = validate_token.split(":", 1)
salt = bytes.fromhex(hex_salt)
digest = bytes.fromhex(hex_digest)
url = re.sub(r"\?validate_token=.*$", "", str(signed_url))
if not digest == self._get_digest(salt, url):
raise PermissionError("Access not authenticated")
except (KeyError, ValueError):
raise PermissionError("Access not authenticated")
except (KeyError, ValueError):
raise PermissionError("Access not authenticated")

def preauthenticate_url(self, url, salt=None):
"""
Expand Down
25 changes: 19 additions & 6 deletions pulp_container/app/redirects.py
Expand Up @@ -5,6 +5,7 @@
from django.http import Http404
from django.shortcuts import redirect

from pulp_container.app.cache import RegistryApiCache, find_base_path_cached, auth_cached
from pulp_container.app.utils import get_accepted_media_types
from pulp_container.constants import BLOB_CONTENT_TYPE, MEDIA_TYPE

Expand Down Expand Up @@ -36,19 +37,19 @@ class FileStorageRedirects(CommonRedirects):
A class which contains methods used for redirecting to the default django's file storage.
"""

def issue_tag_redirect(self, tag):
def issue_tag_redirect(self, request, tag):
"""
Issue a redirect for the passed tag.
"""
return self.redirect_to_content_app("manifests", tag.name)

def issue_manifest_redirect(self, manifest):
def issue_manifest_redirect(self, request, manifest):
"""
Issue a redirect for the passed manifest.
"""
return self.redirect_to_content_app("manifests", manifest.digest)

def issue_blob_redirect(self, blob):
def issue_blob_redirect(self, request, blob):
"""
Issue a redirect for the passed blob.
"""
Expand All @@ -60,7 +61,11 @@ class S3StorageRedirects(CommonRedirects):
A class that implements methods for the direct retrieval of manifest objects.
"""

def issue_tag_redirect(self, tag):
@RegistryApiCache(
base_key=lambda req, cac: find_base_path_cached(req, cac),
auth=lambda req, cac, bk: auth_cached(req, cac, bk),
)
def issue_tag_redirect(self, request, tag):
"""
Issue a redirect or perform a schema conversion if an accepted media type requires it.
"""
Expand All @@ -75,7 +80,11 @@ def issue_tag_redirect(self, tag):
# execute the schema conversion
return self.redirect_to_content_app("manifests", tag.name)

def issue_manifest_redirect(self, manifest):
@RegistryApiCache(
base_key=lambda req, cac: find_base_path_cached(req, cac),
auth=lambda req, cac, bk: auth_cached(req, cac, bk),
)
def issue_manifest_redirect(self, request, manifest):
"""
Directly redirect to an associated manifest's artifact.
"""
Expand All @@ -92,7 +101,11 @@ def redirect_to_artifact(self, content_name, manifest, manifest_media_type):

return self.redirect_to_object_storage(artifact, manifest_media_type)

def issue_blob_redirect(self, blob):
@RegistryApiCache(
base_key=lambda req, cac: find_base_path_cached(req, cac),
auth=lambda req, cac, bk: auth_cached(req, cac, bk),
)
def issue_blob_redirect(self, request, blob):
"""
Redirect to the passed blob or stream content when an associated artifact is not present.
"""
Expand Down
10 changes: 10 additions & 0 deletions pulp_container/app/registry.py
Expand Up @@ -10,6 +10,8 @@

from pulpcore.plugin.content import Handler, PathNotResolved
from pulpcore.plugin.models import ContentArtifact

from pulp_container.app.cache import RegistryContentCache
from pulp_container.app.models import ContainerDistribution, Tag, Blob
from pulp_container.app.schema_convert import Schema2toSchema1ConverterWrapper
from pulp_container.app.utils import get_accepted_media_types
Expand Down Expand Up @@ -76,6 +78,10 @@ async def _dispatch(file, headers):
file_response = web.FileResponse(path, headers=full_headers)
return file_response

@RegistryContentCache(
base_key=lambda req, cac: Registry.find_base_path_cached(req, cac),
auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk),
)
async def get_tag(self, request):
"""
Match the path and stream either Manifest or ManifestList.
Expand Down Expand Up @@ -200,6 +206,10 @@ async def dispatch_converted_schema(tag, accepted_media_types, path):
}
return web.Response(text=result.text, headers=response_headers)

@RegistryContentCache(
base_key=lambda req, cac: Registry.find_base_path_cached(req, cac),
auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk),
)
async def get_by_digest(self, request):
"""
Return a response to the "GET" action.
Expand Down
6 changes: 3 additions & 3 deletions pulp_container/app/registry_api.py
Expand Up @@ -805,7 +805,7 @@ def get(self, request, path, pk=None):
if pk == EMPTY_BLOB:
return redirects.redirect_to_content_app("blobs", pk)
raise BlobNotFound(digest=pk)
return redirects.issue_blob_redirect(blob)
return redirects.issue_blob_redirect(request, blob)


class Manifests(RedirectsMixin, ContainerRegistryApiMixin, ViewSet):
Expand Down Expand Up @@ -834,13 +834,13 @@ def get(self, request, path, pk=None):
try:
if pk[:7] != "sha256:":
tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content)
return redirects.issue_tag_redirect(tag)
return redirects.issue_tag_redirect(request, tag)
else:
manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content)
except (models.Tag.DoesNotExist, models.Manifest.DoesNotExist):
raise ManifestNotFound(reference=pk)

return redirects.issue_manifest_redirect(manifest)
return redirects.issue_manifest_redirect(request, manifest)

def put(self, request, path, pk=None):
"""
Expand Down

0 comments on commit 3fac4c4

Please sign in to comment.