Skip to content

Commit

Permalink
Introduce Pulp Resource Name (PRN) into task resource locking
Browse files Browse the repository at this point in the history
fixes: #4315
  • Loading branch information
gerrod3 authored and dralley committed Mar 26, 2024
1 parent fc8b15c commit b7a2b6b
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGES/4315.deprecation
@@ -0,0 +1 @@
Task filter ``reserved_resources_record`` has been deprecated and is planned for removal in pulpcore 3.55.
2 changes: 2 additions & 0 deletions CHANGES/4315.misc
@@ -0,0 +1,2 @@
Added new task resource locking method ``get_prn`` that does not change with different settings.
``get_url`` is planned to be phased out of resource locking by pulpcore 3.55.
8 changes: 4 additions & 4 deletions pulpcore/app/replica.py
Expand Up @@ -60,7 +60,7 @@ def __init__(self, pulp_ctx, task_group, tls_settings):
# TODO check and compare this to distribution locking on the distribution viewset.
if settings.DOMAIN_ENABLED:
uri = f"/{self.domain.name}{uri}"
self.distros_uri = uri
self.distros_uris = [uri, f"pdrn:{self.domain.pulp_id}:distributions"]

@staticmethod
def needs_update(fields_dict, model_instance):
Expand Down Expand Up @@ -176,7 +176,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
general_update,
task_group=self.task_group,
shared_resources=[repository],
exclusive_resources=[self.distros_uri],
exclusive_resources=self.distros_uris,
args=(distro.pk, self.app_label, self.distribution_serializer_name),
kwargs={
"data": distribution_data,
Expand All @@ -190,7 +190,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
general_create,
task_group=self.task_group,
shared_resources=[repository],
exclusive_resources=[self.distros_uri],
exclusive_resources=self.distros_uris,
args=(self.app_label, self.distribution_serializer_name),
kwargs={"data": distribution_data},
)
Expand Down Expand Up @@ -221,7 +221,7 @@ def remove_missing(self, names):
dispatch(
general_multi_delete,
task_group=self.task_group,
exclusive_resources=[self.distros_uri],
exclusive_resources=self.distros_uris,
args=(distribution_ids,),
)

Expand Down
43 changes: 41 additions & 2 deletions pulpcore/app/util.py
Expand Up @@ -13,7 +13,7 @@
import gnupg

from django.conf import settings
from django.db.models import Sum
from django.db.models import Model, Sum
from django.urls import Resolver404, resolve, reverse
from opentelemetry import metrics

Expand All @@ -32,7 +32,7 @@
def get_url(model, domain=None):
"""
Get a resource url for the specified model instance or class. This returns the path component of
the resource URI. This is used in our resource locking/reservation code to identify resources.
the resource URI.
Args:
model (django.models.Model): A model instance or class.
Expand All @@ -59,6 +59,45 @@ def get_url(model, domain=None):
return reverse(get_view_name_for_model(model, view_action), kwargs=kwargs)


def get_prn(instance=None, uri=None):
    """
    Build the Pulp Resource Name (PRN) that identifies a model instance.

    A PRN plays the same role as an HREF (it uniquely identifies a resource), but it is
    guaranteed not to change with API_ROOT or DOMAIN_ENABLED. The resource locking/reservation
    code uses it to identify resources.

    Format:
        prn:model-label-lower:pk

    Examples:
        instance=FileRepository(pk=123) -> prn:file.filerepository:123
        instance=Artifact(pk=abc) -> prn:core.artifact:abc
        uri=/rerouted/api/v3/repositories/rpm/rpm/123/versions/2/ -> prn:core.repositoryversion:abc
        uri=/pulp/foodomain/api/v3/content/ansible/role/123/ -> prn:ansible.role:123

    Args:
        instance Optional(django.models.Model): A model instance.
        uri Optional(str): A resource URI. When truthy it takes precedence over ``instance``.

    Returns:
        prn (str): The PRN of the passed in resource

    Raises:
        ValidationError: If the resource (given directly or resolved from ``uri``) is not a
            Model instance, which includes the case where neither argument is supplied.
    """
    if uri:
        # Imported at call time rather than at module level -- presumably to avoid a circular
        # import, since the viewsets package itself imports from this module.
        from pulpcore.app.viewsets import NamedModelViewSet

        instance = NamedModelViewSet.get_resource(uri)

    if not isinstance(instance, Model):
        raise ValidationError(_("instance({}) must be a Model").format(instance))

    # Master models are cast first so the label is taken from the object ``cast()`` returns.
    resource = instance.cast() if isinstance(instance, models.MasterModel) else instance
    return f"prn:{resource._meta.label_lower}:{resource.pk}"


def extract_pk(uri):
"""
Resolve a resource URI to a simple PK value.
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/views/orphans.py
Expand Up @@ -23,6 +23,7 @@ def delete(self, request, format=None):
uri = "/api/v3/orphans/cleanup/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
task = dispatch(orphan_cleanup, exclusive_resources=[uri])
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:orphans"]
task = dispatch(orphan_cleanup, exclusive_resources=exclusive_resources)

return OperationPostponedResponse(task, request)
5 changes: 4 additions & 1 deletion pulpcore/app/views/repair.py
Expand Up @@ -30,6 +30,9 @@ def post(self, request):
uri = "/api/v3/repair/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
task = dispatch(repair_all_artifacts, exclusive_resources=[uri], args=[verify_checksums])
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:repair"]
task = dispatch(
repair_all_artifacts, exclusive_resources=exclusive_resources, args=[verify_checksums]
)

return OperationPostponedResponse(task, request)
4 changes: 4 additions & 0 deletions pulpcore/app/viewsets/custom_filters.py
Expand Up @@ -16,6 +16,7 @@

from pulpcore.app.models import ContentArtifact, RepositoryVersion, Publication
from pulpcore.app.viewsets import NamedModelViewSet
from pulpcore.app.loggers import deprecation_logger


class ReservedResourcesFilter(Filter):
Expand Down Expand Up @@ -83,6 +84,9 @@ def filter(self, qs, value):
Returns:
django.db.models.query.QuerySet: Queryset filtered by the reserved resource
"""
deprecation_logger.warning(
"This filter is deprecated. Please use reserved_resources(__in) instead."
)

if value is None:
# a value was not supplied by a user
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/viewsets/orphans.py
Expand Up @@ -29,10 +29,11 @@ def cleanup(self, request):
uri = "/api/v3/orphans/cleanup/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:orphans"]

task = dispatch(
orphan_cleanup,
exclusive_resources=[uri],
exclusive_resources=exclusive_resources,
kwargs={"content_pks": content_pks, "orphan_protection_time": orphan_protection_time},
)

Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/viewsets/publication.py
Expand Up @@ -41,6 +41,7 @@
LabelFilter,
RepositoryVersionFilter,
)
from pulpcore.app.util import get_domain


class PublicationContentFilter(Filter):
Expand Down Expand Up @@ -543,7 +544,7 @@ def get_queryset(self):

def async_reserved_resources(self, instance):
"""Return resource that locks all Distributions."""
return ["/api/v3/distributions/"]
return ["/api/v3/distributions/", f"pdrn:{get_domain().pulp_id}:distributions"]


class ListDistributionViewSet(BaseDistributionViewSet, mixins.ListModelMixin):
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/viewsets/reclaim.py
Expand Up @@ -42,7 +42,7 @@ def reclaim(self, request):
uri = "/api/v3/repositories/reclaim_space/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
exclusive_resources = [uri]
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:reclaim_space"]

task = dispatch(
reclaim_space,
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/viewsets/replica.py
Expand Up @@ -126,10 +126,11 @@ def replicate(self, request, pk):
uri = "/api/v3/servers/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:servers"]

dispatch(
replicate_distributions,
exclusive_resources=[uri],
exclusive_resources=exclusive_resources,
kwargs={"server_pk": pk},
task_group=task_group,
)
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/app/viewsets/task.py
Expand Up @@ -46,7 +46,9 @@
class TaskFilter(BaseFilterSet):
# This filter is deprecated and badly documented, but we need to keep it for compatibility
# reasons
reserved_resources_record = ReservedResourcesRecordFilter()
reserved_resources_record = ReservedResourcesRecordFilter(
help_text=_("Deprecated, will be removed in pulpcore 3.55. Use reserved_resources instead.")
)
created_resources = CreatedResourcesFilter()
# Non model field filters
reserved_resources = ReservedResourcesFilter(exclusive=True, shared=True)
Expand Down
10 changes: 8 additions & 2 deletions pulpcore/tasking/tasks.py
Expand Up @@ -13,7 +13,7 @@
from django_guid import get_guid
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task
from pulpcore.app.util import current_task, get_domain, get_url
from pulpcore.app.util import current_task, get_domain, get_prn, get_url
from pulpcore.constants import (
TASK_FINAL_STATES,
TASK_INCOMPLETE_STATES,
Expand All @@ -30,7 +30,11 @@ def _validate_and_get_resources(resources):
if isinstance(r, str):
resource_set.add(r)
elif isinstance(r, Model):
# TODO: In pulpcore 3.55 remove get_url from this list
# If 3.55 requires downtime then this line can be removed with no further changes
# Else, we must update the scheduling logic to account for prior tasks using get_url
resource_set.add(get_url(r))
resource_set.add(get_prn(r))
elif r is None:
# Silently drop None values
pass
Expand Down Expand Up @@ -146,8 +150,10 @@ def dispatch(
shared_resources = _validate_and_get_resources(shared_resources)

# A task that is exclusive on a domain will block all tasks within that domain
domain_prn = get_prn(get_domain())
domain_url = get_url(get_domain())
if domain_url not in exclusive_resources:
if not (domain_prn in exclusive_resources or domain_url in exclusive_resources):
shared_resources.append(domain_prn)
shared_resources.append(domain_url)
resources = exclusive_resources + [f"shared:{resource}" for resource in shared_resources]

Expand Down
89 changes: 76 additions & 13 deletions pulpcore/tests/functional/api/using_plugin/test_tasks.py
@@ -1,5 +1,6 @@
"""Test that operations can be performed over tasks."""
import json
import subprocess
from urllib.parse import urljoin
from uuid import uuid4

Expand Down Expand Up @@ -46,7 +47,6 @@ def setup_filter_fixture(
file_repo,
file_remote_ssl_factory,
basic_manifest_path,
tasks_api_client,
monitor_task,
):
remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand")
Expand All @@ -59,29 +59,31 @@ def setup_filter_fixture(
repo_update_action = file_bindings.RepositoriesFileApi.partial_update(
file_repo.pulp_href, {"description": str(uuid4())}
)
repo_update_task = tasks_api_client.read(repo_update_action.task)
repo_update_task = monitor_task(repo_update_action.task)

return (repo_sync_task, repo_update_task)
return repo_sync_task, repo_update_task, file_repo, remote


def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_client):
"""Filter all tasks by a particular reserved resource."""
repo_sync_task, repo_update_task = setup_filter_fixture
reserved_resources_record = repo_update_task.reserved_resources_record[0]
repo_sync_task, repo_update_task, _, _ = setup_filter_fixture
for resource in repo_update_task.reserved_resources_record:
if "/api/v3/repositories/file/file/" in resource:
reserved_resources_record = resource
break
else:
assert False, "File repository not found in reserved_resources_record"

results = tasks_api_client.list(reserved_resources_record=[reserved_resources_record]).results
# Why reserved_resources_record parameter needs to be a list here? ^
results = tasks_api_client.list(reserved_resources=reserved_resources_record).results

assert results[0].pulp_href == repo_update_task.pulp_href
assert len(results) == 2

# Filter all tasks by a non-existing reserved resource.
with pytest.raises(ApiException) as ctx:
tasks_api_client.list(
reserved_resources_record=["a_resource_should_be_never_named_like_this"]
)

assert ctx.value.status == 400
results = tasks_api_client.list(
reserved_resources="a_resource_should_be_never_named_like_this"
).results
assert len(results) == 0

# Filter all tasks by a particular created resource.
created_resources = repo_sync_task.created_resources[0]
Expand All @@ -97,3 +99,64 @@ def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_clie
tasks_api_client.list(created_resources=created_resources)

assert ctx.value.status == 404


def get_prn(uri):
    """
    Resolve a resource URI to its PRN by calling ``pulpcore.app.util.get_prn`` in a
    ``pulpcore-manager shell`` subprocess.

    Args:
        uri (str): The resource URI (e.g. a ``pulp_href``) to convert.

    Returns:
        str: The PRN printed by the subprocess, stripped of surrounding whitespace.

    Raises:
        AssertionError: If the subprocess exits non-zero; the message carries its stderr.
    """
    # ``!r`` renders the URI as a properly quoted and escaped Python literal, so a value
    # containing quotes or backslashes cannot break (or alter) the generated snippet.
    commands = f"from pulpcore.app.util import get_prn; print(get_prn(uri={uri!r}));"
    process = subprocess.run(["pulpcore-manager", "shell", "-c", commands], capture_output=True)

    # Surface the subprocess' stderr so a failure is diagnosable from the test report.
    assert process.returncode == 0, process.stderr.decode(errors="replace")
    return process.stdout.decode().strip()


def test_reserved_resources_filter(setup_filter_fixture, tasks_api_client):
    """
    Exercise the ``reserved_resources``, ``shared_resources`` and ``exclusive_resources`` task
    filters (and their ``__in`` variants) with both HREFs and PRNs.

    The fixture supplies a sync task and an update task for the same file repository, plus the
    repository and the remote used for the sync.
    """
    sync_task, update_task, repo, remote = setup_filter_fixture
    both_tasks = {sync_task.pulp_href, update_task.pulp_href}
    repo_href, remote_href = repo.pulp_href, remote.pulp_href

    repo_prn = get_prn(repo_href)
    remote_prn = get_prn(remote_href)

    def hrefs_of(page):
        """Collect the hrefs of every task on a result page."""
        return {task.pulp_href for task in page.results}

    # Sanity check, TODO: remove pulp_href from filter checks in pulpcore 3.55
    sync_record = sync_task.reserved_resources_record
    update_record = update_task.reserved_resources_record
    assert repo_prn in sync_record
    assert f"shared:{remote_prn}" in sync_record
    assert repo_prn in update_record
    assert remote_prn not in update_record

    # reserved_resources filter: the repository matches both tasks by href or by PRN.
    page = tasks_api_client.list(reserved_resources=repo_href)
    assert page.count == 2
    assert hrefs_of(page) == both_tasks
    page = tasks_api_client.list(reserved_resources=repo_prn)
    assert hrefs_of(page) == both_tasks
    # Mixing an href and a PRN is expected to match only the sync task.
    page = tasks_api_client.list(reserved_resources__in=[repo_href, remote_prn])
    assert page.count == 1
    assert page.results[0].pulp_href == sync_task.pulp_href

    # shared_resources filter: only the remote is matched, and only for the sync task.
    page = tasks_api_client.list(shared_resources=repo_href)
    assert page.count == 0
    page = tasks_api_client.list(shared_resources=remote_href)
    assert page.count == 1
    assert page.results[0].pulp_href == sync_task.pulp_href
    page = tasks_api_client.list(shared_resources=repo_prn)
    assert page.count == 0
    page = tasks_api_client.list(shared_resources=remote_prn)
    assert page.count == 1
    assert page.results[0].pulp_href == sync_task.pulp_href
    page = tasks_api_client.list(shared_resources__in=[repo_prn, remote_href])
    assert page.count == 0

    # exclusive_resources filter: only the repository is matched, for both tasks.
    page = tasks_api_client.list(exclusive_resources=remote_href)
    assert page.count == 0
    page = tasks_api_client.list(exclusive_resources=repo_href)
    assert page.count == 2
    assert hrefs_of(page) == both_tasks
    page = tasks_api_client.list(exclusive_resources=remote_prn)
    assert page.count == 0
    page = tasks_api_client.list(exclusive_resources=repo_prn)
    assert hrefs_of(page) == both_tasks
    page = tasks_api_client.list(exclusive_resources__in=[repo_prn, remote_prn])
    assert page.count == 0

0 comments on commit b7a2b6b

Please sign in to comment.