Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES/plugin_api/6201.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added the ability for plugins to dispatch a task that adds pull-through content to an associated
repository.

Set the class variable `PULL_THROUGH_SUPPORTED = True` on the plugin's repository model to enable
this feature. Plugins can also customize the dispatched task by supplying their own
`pull_through_add_content` method on their repository model.
28 changes: 28 additions & 0 deletions pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Repository(MasterModel):
TYPE = "repository"
CONTENT_TYPES = []
REMOTE_TYPES = []
PULL_THROUGH_SUPPORTED = False

name = models.TextField(db_index=True)
pulp_labels = HStoreField(default=dict)
Expand Down Expand Up @@ -353,6 +354,33 @@ def protected_versions(self):

return qs.distinct()

def pull_through_add_content(self, content_artifact):
    """
    Dispatch a task that adds pull-through content to this repository.

    The default behavior adds the content unit referenced by ``content_artifact`` to the
    repository. Plugins can override this method when they need something more involved, e.g.
    adding several related content units within a single task.

    Args:
        content_artifact (pulpcore.app.models.ContentArtifact): the content artifact whose
            content should be added

    Returns:
        Optional(Task): the dispatched task, or None when there is no content id or the
            content already has a non-removed entry in this repository
    """
    content_pk = content_artifact.content_id
    # Membership entries for this content in this repository that have not been removed.
    current_membership = RepositoryContent.objects.filter(
        repository=self, content__pk=content_pk, version_removed__isnull=True
    )
    if content_pk and not current_membership.exists():
        # Imported here rather than at module level, as in the original implementation.
        from pulpcore.plugin.tasking import dispatch, add_and_remove

        task_kwargs = {
            "repository_pk": self.pk,
            "add_content_units": [content_pk],
            "remove_content_units": [],
        }
        return dispatch(
            add_and_remove, kwargs=task_kwargs, exclusive_resources=[self], immediate=True
        )
    return None

@hook(AFTER_UPDATE, when="retain_repo_versions", has_changed=True)
def _cleanup_old_versions_hook(self):
# Do not attempt to clean up anything, while there is a transaction involving repo versions
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/content/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@

from .handler import Handler # noqa: E402: module level not at top of file
from .instrumentation import instrumentation # noqa: E402: module level not at top of file
from .authentication import authenticate # noqa: E402: module level not at top of file
from .authentication import authenticate, guid # noqa: E402: module level not at top of file


log = logging.getLogger(__name__)

if settings.OTEL_ENABLED:
app = web.Application(middlewares=[authenticate, instrumentation()])
app = web.Application(middlewares=[guid, authenticate, instrumentation()])
else:
app = web.Application(middlewares=[authenticate])
app = web.Application(middlewares=[guid, authenticate])

CONTENT_MODULE_NAME = "content"

Expand Down
9 changes: 9 additions & 0 deletions pulpcore/content/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from django.conf import settings
from django.db.utils import InterfaceError, DatabaseError
from django.http.request import HttpRequest
from django_guid import set_guid
from django_guid.utils import generate_guid
from rest_framework.views import APIView
from rest_framework.exceptions import APIException

Expand All @@ -18,6 +20,13 @@
_ = gettext.gettext


@middleware
async def guid(request, handler):
    """Set a newly generated django_guid, then pass the request on to the next handler."""
    request_guid = generate_guid()
    set_guid(request_guid)
    response = await handler(request)
    return response


Comment on lines +23 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a good change in general. Maybe that's a different story, but should we look for the cid header in the request to allow following correlations even through content access?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, yeah. We would also need to expose the value so it can be used in logging for the content app; currently, this change is solely to allow dispatching tasks from the content app.

@middleware
async def authenticate(request, handler):
"""Authenticates the request to the content app using the DRF authentication classes"""
Expand Down
35 changes: 24 additions & 11 deletions pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ async def _match_and_stream(self, path, request):
repo_version = distro.repository_version

if repository:
repository = await repository.acast()
# Search for publication serving the latest (last complete) version
if not publication:
try:
Expand Down Expand Up @@ -895,8 +896,11 @@ async def _match_and_stream(self, path, request):
.filter(remote=remote, url=url)
.afirst()
):
# Try to stream the ContentArtifact if already created
ca = ra.content_artifact
# Try to add content to repository if present & supported
if repository and repository.PULL_THROUGH_SUPPORTED:
await sync_to_async(repository.pull_through_add_content)(ca)
# Try to stream the ContentArtifact if already created
if ca.artifact:
return await self._serve_content_artifact(ca, headers, request)
else:
Expand All @@ -916,6 +920,7 @@ async def _match_and_stream(self, path, request):
StreamResponse(headers=headers),
ra,
save_artifact=save_artifact,
repository=repository,
)
except ClientResponseError as ce:

Expand Down Expand Up @@ -1008,11 +1013,12 @@ def _save_artifact(self, download_result, remote_artifact, request=None):
request (aiohttp.web.Request) The request.

Returns:
The associated [pulpcore.plugin.models.Artifact][].
A dictionary of created ContentArtifact objects by relative path.
"""
content_artifact = remote_artifact.content_artifact
remote = remote_artifact.remote
artifact = Artifact(**download_result.artifact_attributes, file=download_result.path)
cas = []
with transaction.atomic():
try:
with transaction.atomic():
Expand Down Expand Up @@ -1040,7 +1046,6 @@ def _save_artifact(self, download_result, remote_artifact, request=None):
c_type = remote.get_remote_artifact_content_type(rel_path)
artifacts = {rel_path: artifact}
content = c_type.init_from_artifact_and_relative_path(artifact, rel_path)
cas = []
if isinstance(content, tuple):
content, artifacts = content
try:
Expand Down Expand Up @@ -1070,21 +1075,21 @@ def _save_artifact(self, download_result, remote_artifact, request=None):
# Now try to save RemoteArtifacts for each ContentArtifact
for ca in cas:
if url := remote.get_remote_artifact_url(ca.relative_path, request=request):
remote_artifact = RemoteArtifact(
remote=remote, content_artifact=ca, url=url
)
ra = RemoteArtifact(remote=remote, content_artifact=ca, url=url)
try:
with transaction.atomic():
remote_artifact.save()
ra.save()
except IntegrityError:
# Remote artifact must have already been saved during a parallel request
log.info(f"RemoteArtifact for {url} already exists.")

else:
# Normal on-demand downloading, update CA to point to new saved Artifact
content_artifact.artifact = artifact
content_artifact.save()
return artifact
ret = {content_artifact.relative_path: content_artifact}
if cas:
ret.update({ca.relative_path: ca for ca in cas})
return ret

async def _serve_content_artifact(self, content_artifact, headers, request):
"""
Expand Down Expand Up @@ -1165,7 +1170,9 @@ def _build_url(**kwargs):
else:
raise NotImplementedError()

async def _stream_remote_artifact(self, request, response, remote_artifact, save_artifact=True):
async def _stream_remote_artifact(
self, request, response, remote_artifact, save_artifact=True, repository=None
):
"""
Stream and save a RemoteArtifact.

Expand All @@ -1175,6 +1182,8 @@ async def _stream_remote_artifact(self, request, response, remote_artifact, save
remote_artifact (pulpcore.plugin.models.RemoteArtifact) The RemoteArtifact
to fetch and then stream back to the client
save_artifact (bool): Override the save behavior on the streamed RemoteArtifact
repository (:class:`~pulpcore.plugin.models.Repository`): An optional repository to save
the content to if supported

Raises:
[aiohttp.web.HTTPNotFound][] when no
Expand Down Expand Up @@ -1309,9 +1318,13 @@ async def finalize():
artifacts_size_counter.add(size)

if save_artifact and remote.policy != Remote.STREAMED:
await asyncio.shield(
content_artifacts = await asyncio.shield(
sync_to_async(self._save_artifact)(download_result, remote_artifact, request)
)
# Try to add content to repository if present & supported
if repository and repository.PULL_THROUGH_SUPPORTED:
ca = content_artifacts[remote_artifact.content_artifact.relative_path]
await sync_to_async(repository.pull_through_add_content)(ca)
await response.write_eof()

if response.status == 404:
Expand Down
85 changes: 69 additions & 16 deletions pulpcore/tests/unit/content/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,27 @@ def checkpoint_publication_2(repo_version_3, noncheckpoint_publication):
def test_save_artifact(c1, ra1, download_result_mock):
"""Artifact needs to be created."""
handler = Handler()
new_artifact = handler._save_artifact(download_result_mock, ra1)
content_artifacts = handler._save_artifact(download_result_mock, ra1)
c1 = Content.objects.get(pk=c1.pk)
assert new_artifact is not None
assert c1._artifacts.get().pk == new_artifact.pk
assert content_artifacts is not None
assert ra1.content_artifact.relative_path in content_artifacts
artifact = content_artifacts[ra1.content_artifact.relative_path].artifact
assert c1._artifacts.get().pk == artifact.pk


def test_save_artifact_artifact_already_exists(c2, ra1, ra2, download_result_mock):
"""Artifact turns out to already exist."""
cch = Handler()
new_artifact = cch._save_artifact(download_result_mock, ra1)
new_content_artifacts = cch._save_artifact(download_result_mock, ra1)

existing_artifact = cch._save_artifact(download_result_mock, ra2)
existing_content_artifacts = cch._save_artifact(download_result_mock, ra2)
c2 = Content.objects.get(pk=c2.pk)
assert existing_artifact.pk == new_artifact.pk
assert c2._artifacts.get().pk == existing_artifact.pk
assert ra1.content_artifact.relative_path in new_content_artifacts
assert ra2.content_artifact.relative_path in existing_content_artifacts
new_artifact = new_content_artifacts[ra1.content_artifact.relative_path]
existing_artifact = existing_content_artifacts[ra2.content_artifact.relative_path]
assert new_artifact.artifact.pk == existing_artifact.artifact.pk
assert c2._artifacts.get().pk == existing_artifact.artifact.pk


# Test pull through features
Expand Down Expand Up @@ -176,9 +182,15 @@ async def create_remote_artifact(remote, ca):
)


async def create_distribution(remote):
async def create_repository():
return await Repository.objects.acreate(name=str(uuid.uuid4()))


async def create_distribution(remote, repository=None):
name = str(uuid.uuid4())
return await Distribution.objects.acreate(name=name, base_path=name, remote=remote)
return await Distribution.objects.acreate(
name=name, base_path=name, remote=remote, repository=repository
)


@pytest.mark.asyncio
Expand Down Expand Up @@ -285,7 +297,8 @@ def test_pull_through_save_single_artifact_content(
ra = RemoteArtifact(url=f"{remote123.url}/c123", remote=remote123, content_artifact=ca)

# Content is saved during handler._save_artifact
artifact = handler._save_artifact(download_result_mock, ra, request=request123)
content_artifacts = handler._save_artifact(download_result_mock, ra, request=request123)
artifact = content_artifacts[ra.content_artifact.relative_path].artifact

remote123.get_remote_artifact_content_type.assert_called_once_with("c123")
content_init_mock.assert_called_once_with(artifact, "c123")
Expand Down Expand Up @@ -319,14 +332,16 @@ def content_init(art, path):
ca = ContentArtifact(relative_path="c123")
ra = RemoteArtifact(url=f"{remote123.url}/c123", remote=remote123, content_artifact=ca)

artifact = handler._save_artifact(download_result_mock, ra, request123)

ca = artifact.content_memberships.first()
assert ca.content is not None
content_artifacts = handler._save_artifact(download_result_mock, ra, request123)
ca1 = content_artifacts["c123"]
ca2 = content_artifacts["c123abc"]
assert ca1.content is not None
assert ca2.content == ca1.content
assert ca1.artifact == artifact123

artifacts = set(ca.content._artifacts.all())
artifacts = set(ca1.content._artifacts.all())
assert len(artifacts) == 2
assert {artifact, artifact123} == artifacts
assert {ca2.artifact, artifact123} == artifacts


@pytest.mark.django_db
Expand Down Expand Up @@ -446,3 +461,41 @@ def test_handle_checkpoint_before_first_ts(
)
with pytest.raises(PathNotResolved):
Handler._select_checkpoint_publication(checkpoint_distribution, f"{request_ts}/")


@pytest.mark.asyncio
@pytest.mark.django_db
async def test_pull_through_repository_add(request123, monkeypatch):
    """Check that the handler calls pull_through_add_content only when the repo supports it."""
    handler = Handler()
    stream_mock = AsyncMock()
    handler._stream_content_artifact = stream_mock

    content = await create_content()
    ca = await create_content_artifact(content)
    remote = await create_remote()
    await create_remote_artifact(remote, ca)
    repo = await create_repository()
    monkeypatch.setattr(Remote, "get_remote_artifact_content_type", Mock(return_value=Content))
    # Patched on the class, so every Repository instance resolves to this same mock.
    add_content_mock = Mock()
    monkeypatch.setattr(Repository, "pull_through_add_content", add_content_mock)
    distro = await create_distribution(remote, repository=repo)
    request_path = f"{distro.base_path}/c123"

    try:
        # With Repository.PULL_THROUGH_SUPPORTED=False the content is streamed but never added
        await handler._match_and_stream(request_path, request123)
        stream_mock.assert_called_once()
        assert ca in stream_mock.call_args[0]
        add_content_mock.assert_not_called()

        # After enabling PULL_THROUGH_SUPPORTED the method must be called with the CA
        monkeypatch.setattr(Repository, "PULL_THROUGH_SUPPORTED", True)
        stream_mock.reset_mock()
        await handler._match_and_stream(request_path, request123)
        stream_mock.assert_called_once()
        assert ca in stream_mock.call_args[0]
        add_content_mock.assert_called_once()
        assert ca in add_content_mock.call_args[0]
    finally:
        # Remove the objects created above, in the same order as before.
        await content.adelete()
        await repo.adelete()
        await remote.adelete()
        await distro.adelete()
Comment on lines +497 to +501
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need that. Django tests usually run inside a transaction that is never committed, so the created objects are rolled back automatically.