Skip to content

Commit

Permalink
Adopt new "shared_resources" / "exclusive_resources" API
Browse files Browse the repository at this point in the history
  • Loading branch information
dralley committed Aug 26, 2021
1 parent 213ab1b commit 28b3417
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES/9255.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved the parallelism of copy operations.
24 changes: 17 additions & 7 deletions pulp_rpm/app/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def sync(self, request, pk):

result = dispatch(
tasks.synchronize,
[repository, remote],
shared_resources=[remote],
exclusive_resources=[repository],
kwargs={
"mirror": mirror,
"remote_pk": str(remote.pk),
Expand Down Expand Up @@ -257,7 +258,7 @@ def create(self, request):

result = dispatch(
tasks.publish,
[repository_version.repository],
exclusive_resources=[repository_version.repository],
kwargs={
"repository_version_pk": repository_version.pk,
"metadata_signing_service": signing_service_pk,
Expand Down Expand Up @@ -303,10 +304,14 @@ def create(self, request):
dependency_solving = serializer.validated_data["dependency_solving"]
config = serializer.validated_data["config"]

config, repos = self._process_config(config)
config, shared_repos, exclusive_repos = self._process_config(config)

async_result = dispatch(
tasks.copy_content, repos, args=[config, dependency_solving], kwargs={}
tasks.copy_content,
shared_resources=shared_repos,
exclusive_resources=exclusive_repos,
args=[config, dependency_solving],
kwargs={},
)
return OperationPostponedResponse(async_result, request)

Expand All @@ -318,7 +323,11 @@ def _process_config(self, config):
repos so that the task can lock on them.
"""
result = []
repos = []
# exclusive use of the destination repos is needed since new repository versions are being
# created, but source repos can be accessed in a read-only fashion in parallel, so long
# as there are no simultaneous modifications.
shared_repos = []
exclusive_repos = []

for entry in config:
r = dict()
Expand All @@ -328,7 +337,8 @@ def _process_config(self, config):
dest_repo = NamedModelViewSet().get_resource(entry["dest_repo"], RpmRepository)
r["source_repo_version"] = source_version.pk
r["dest_repo"] = dest_repo.pk
repos.extend((source_version.repository, dest_repo))
shared_repos.append(source_version.repository)
exclusive_repos.append(dest_repo)

if "dest_base_version" in entry:
try:
Expand All @@ -347,7 +357,7 @@ def _process_config(self, config):
r["content"].append(NamedModelViewSet().extract_pk(c))
result.append(r)

return result, repos
return result, shared_repos, exclusive_repos


class PackageGroupViewSet(ReadOnlyContentViewSet):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ django_readonly_field
jsonschema>=3.0
libcomps~=0.1.15
productmd~=1.33.0
pulpcore>=3.15.0.dev
pulpcore>=3.15.0
PyGObject~=3.22
solv~=0.7.17
aiohttp_xmlrpc

0 comments on commit 28b3417

Please sign in to comment.