Skip to content

Commit

Permalink
Verify if a manifest already exists locally when syncing a remote repo.
Browse files Browse the repository at this point in the history
Closes #1047
  • Loading branch information
decko committed Nov 23, 2022
1 parent b4d757d commit f860076
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGES/1047.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a check for whether a manifest already exists locally, reducing the number of downloads from a remote registry when syncing content.
7 changes: 5 additions & 2 deletions pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from pulpcore.plugin.download import DownloaderFactory, HttpDownloader


log = getLogger(__name__)


Expand Down Expand Up @@ -51,6 +50,7 @@ async def _run(self, handle_401=True, extra_data=None):
if extra_data is not None:
headers = extra_data.get("headers", headers)
repo_name = extra_data.get("repo_name", None)
http_method = extra_data.get("http_method", "get") if extra_data is not None else "get"
this_token = self.registry_auth["bearer"]
basic_auth = self.registry_auth["basic"]
auth_headers = self.auth_header(this_token, basic_auth)
Expand All @@ -59,7 +59,10 @@ async def _run(self, handle_401=True, extra_data=None):
self.session._default_auth = None
if self.download_throttler:
await self.download_throttler.acquire()
async with self.session.get(

session_http_method = getattr(self.session, http_method)

async with session_http_method(
self.url, headers=headers, proxy=self.proxy, proxy_auth=self.proxy_auth
) as response:
try:
Expand Down
108 changes: 71 additions & 37 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,46 @@ def __init__(self, remote, signed_only):
self.manifest_dcs = []
self.signature_dcs = []

def _get_content_data_blocking(self, manifest):
    """
    Load the stored artifact of an existing manifest together with its payload.

    This performs blocking file access, so async callers wrap it in
    ``sync_to_async``.

    Args:
        manifest: A manifest whose ``contentartifact_set`` holds the saved artifact.

    Returns:
        tuple: ``(saved_artifact, content_data, raw_data)`` where ``content_data`` is
        the parsed JSON document and ``raw_data`` is the payload exactly as read from
        the artifact's file (bytes for a binary-mode file), matching the shape of what
        ``_download_artifact_data`` returns.

    Raises:
        json.JSONDecodeError: If the stored payload is not valid JSON.
    """
    saved_artifact = manifest.contentartifact_set.first().artifact
    manifest_file = saved_artifact.file
    try:
        # Read the payload itself instead of handing back the file object: callers
        # receive ``raw_data`` after the file has been closed, so a handle is useless.
        raw_data = manifest_file.read()
    finally:
        # Always release the handle, even when reading fails.
        manifest_file.close()
    content_data = json.loads(raw_data)
    return saved_artifact, content_data, raw_data

async def _download_artifact_data(self, manifest_url):
    """
    Download a manifest, store it as an artifact and parse its JSON payload.

    Args:
        manifest_url: URL the remote's downloader fetches the manifest from.

    Returns:
        tuple: ``(saved_artifact, content_data, raw_data, response)`` where
        ``raw_data`` is the downloaded bytes, ``content_data`` the parsed JSON and
        ``response`` the download result returned by the downloader.
    """
    request_options = {"headers": V2_ACCEPT_HEADERS}
    response = await self.remote.get_downloader(url=manifest_url).run(
        extra_data=request_options
    )

    downloaded_path = response.path
    with open(downloaded_path, "rb") as manifest_file:
        raw_data = manifest_file.read()

    # The artifact is created from the downloaded file, so point the attributes at it.
    artifact_attributes = response.artifact_attributes
    artifact_attributes["file"] = downloaded_path
    save_artifact = sync_to_async(_save_artifact_blocking)
    saved_artifact = await save_artifact(artifact_attributes)

    content_data = json.loads(raw_data)
    return saved_artifact, content_data, raw_data, response

async def _check_for_existing_manifest(self, download_tag):
    """
    Reuse a locally stored manifest when possible, otherwise download it.

    The awaited response's ``docker-content-digest`` header is used to look up a
    manifest with the same digest. When one is found, its saved artifact is loaded
    from storage; when the header is missing or nothing matches, the manifest is
    downloaded from the response's URL.

    Args:
        download_tag: Awaitable resolving to a response that exposes ``headers``
            and ``url``.

    Returns:
        tuple: ``(saved_artifact, content_data, raw_data, response)``. ``response`` is
        the awaited response when the manifest was found locally, or the response of
        the new download otherwise.
    """
    head_response = await download_tag
    digest = head_response.headers.get("docker-content-digest")

    existing_manifest = None
    if digest:
        # Only query the database when the registry actually reported a digest.
        matching = Manifest.objects.prefetch_related("contentartifact_set").filter(
            digest=digest
        )
        existing_manifest = await sync_to_async(matching.first)()

    if existing_manifest:
        load_local = sync_to_async(self._get_content_data_blocking)
        saved_artifact, content_data, raw_data = await load_local(existing_manifest)
        return saved_artifact, content_data, raw_data, head_response

    saved_artifact, content_data, raw_data, download_response = (
        await self._download_artifact_data(head_response.url)
    )
    return saved_artifact, content_data, raw_data, download_response

async def run(self):
"""
ContainerFirstStage.
Expand Down Expand Up @@ -112,33 +152,34 @@ async def run(self):
relative_url = "/v2/{name}/manifests/{tag}".format(
name=self.remote.namespaced_upstream_name, tag=tag_name
)
url = urljoin(self.remote.url, relative_url)
downloader = self.remote.get_downloader(url=url)
to_download.append(downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}))
tag_url = urljoin(self.remote.url, relative_url)
downloader = self.remote.get_downloader(url=tag_url)
to_download.append(
downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"})
)

async with ProgressReport(
message="Processing Tags",
code="sync.processing.tag",
total=len(tag_list),
) as pb_parsed_tags:

for download_tag in asyncio.as_completed(to_download):
dl_res = await download_tag
with open(dl_res.path, "rb") as content_file:
raw_data = content_file.read()
dl_res.artifact_attributes["file"] = dl_res.path
saved_artifact = await sync_to_async(_save_artifact_blocking)(
dl_res.artifact_attributes
)
to_download_artifact = [
self._check_for_existing_manifest(download_tag)
for download_tag in asyncio.as_completed(to_download)
]

tag_name = dl_res.url.split("/")[-1]
tag_dc = DeclarativeContent(Tag(name=tag_name))
for artifact in asyncio.as_completed(to_download_artifact):
saved_artifact, content_data, raw_data, response = await artifact

digest = response.headers.get("docker-content-digest")

content_data = json.loads(raw_data)
media_type = determine_media_type(content_data, dl_res)
digest = dl_res.artifact_attributes["sha256"]
media_type = determine_media_type(content_data, response)
validate_manifest(content_data, media_type, digest)

tag_name = response.url.split("/")[-1]
tag_dc = DeclarativeContent(Tag(name=tag_name))

if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI):
list_dc = self.create_tagged_manifest_list(
tag_name, saved_artifact, content_data, media_type
Expand Down Expand Up @@ -428,29 +469,21 @@ async def create_listed_manifest(self, manifest_data):
name=self.remote.namespaced_upstream_name, digest=digest
)
manifest_url = urljoin(self.remote.url, relative_url)
manifest = await sync_to_async(
Manifest.objects.prefetch_related("contentartifact_set").filter(digest=digest).first
)()
if manifest:

def _get_content_data_blocking():
saved_artifact = manifest.contentartifact_set.first().artifact
content_data = json.load(saved_artifact.file)
saved_artifact.file.close()
return saved_artifact, content_data

saved_artifact, content_data = await sync_to_async(_get_content_data_blocking)()

if digest and (
manifest := await sync_to_async(
Manifest.objects.prefetch_related("contentartifact_set").filter(digest=digest).first
)()
):
saved_artifact, content_data, _ = await sync_to_async(self._get_content_data_blocking)(
manifest
)

else:
downloader = self.remote.get_downloader(url=manifest_url)
dl_res = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
with open(dl_res.path, "rb") as content_file:
raw_data = content_file.read()
dl_res.artifact_attributes["file"] = dl_res.path
saved_artifact = await sync_to_async(_save_artifact_blocking)(
dl_res.artifact_attributes
saved_artifact, content_data, _, response = await self._download_artifact_data(
manifest_url
)
content_data = json.loads(raw_data)
media_type = determine_media_type(content_data, dl_res)
media_type = determine_media_type(content_data, response)
validate_manifest(content_data, media_type, digest)

manifest = Manifest(
Expand All @@ -460,6 +493,7 @@ def _get_content_data_blocking():
else 1,
media_type=manifest_data["mediaType"],
)

da = DeclarativeArtifact(
artifact=saved_artifact,
url=manifest_url,
Expand Down

0 comments on commit f860076

Please sign in to comment.