Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Taught import to be able to use fewer than all available workers. #4192

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/4068.bugfix
@@ -0,0 +1 @@
Taught pulp-import to be able to use a subset of available worker-threads.
4 changes: 4 additions & 0 deletions pulpcore/app/settings.py
Expand Up @@ -294,6 +294,10 @@

HIDE_GUARDED_DISTRIBUTIONS = False

# Maximum percentage of the available workers that pulp-import will use at a time.
# By default, use all available workers.
IMPORT_WORKERS_PERCENT = 100

# HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
# Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html
from dynaconf import DjangoDynaconf, Validator # noqa
Expand Down
24 changes: 20 additions & 4 deletions pulpcore/app/tasks/importer.py
Expand Up @@ -8,6 +8,7 @@
from gettext import gettext as _
from logging import getLogger

from django.conf import settings
from django.core.files.storage import default_storage
from django.db.models import F
from naya.json import stream_array, tokenize
Expand All @@ -28,6 +29,7 @@
Repository,
Task,
TaskGroup,
Worker,
)
from pulpcore.app.modelresource import (
ArtifactResource,
Expand Down Expand Up @@ -506,6 +508,18 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
default_storage.save(base_path, f)

# Now import repositories, in parallel.

# We want to be able to limit the number of available workers that import will consume,
# so that pulp can continue to do other work while an import is running. We accomplish
# this by creating a reserved-resource string for each repo-import task, based on that
# repo's index in the dispatch loop, modulo the number of workers to consume.
#
# By default (when the setting is not set), import will continue to use 100% of the
# available workers.
import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100))
total_workers = Worker.objects.online_workers().count()
import_workers = max(1, int(total_workers * (import_workers_percent / 100.0)))

with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file:
data = json.load(repo_data_file)
gpr = GroupProgressReport(
Expand All @@ -517,14 +531,16 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
)
gpr.save()

for src_repo in data:
for index, src_repo in enumerate(data):
# Lock the repo we're importing-into
dest_repo_name = _get_destination_repo_name(importer, src_repo["name"])

# pulpcore-worker limiter
worker_rsrc = f"import-worker-{index % import_workers}"
exclusive_resources = [worker_rsrc]
try:
dest_repo = Repository.objects.get(name=dest_repo_name)
except Repository.DoesNotExist:
if create_repositories:
exclusive_resources = []
dest_repo_pk = ""
else:
log.warning(
Expand All @@ -534,7 +550,7 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
)
continue
else:
exclusive_resources = [dest_repo]
exclusive_resources.append(dest_repo)
dest_repo_pk = dest_repo.pk

dispatch(
Expand Down