Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Taught import to be able to use fewer than all available workers. #4191

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/4068.bugfix
@@ -0,0 +1 @@
Taught pulp-import to be able to use a subset of available worker-threads.
4 changes: 4 additions & 0 deletions pulpcore/app/settings.py
Expand Up @@ -296,6 +296,10 @@

DEFAULT_AUTO_FIELD = "django.db.models.AutoField"

# Maximum percentage of available workers that pulp-import will use at a time.
# By default, use all available workers.
IMPORT_WORKERS_PERCENT = 100

# HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
# Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html
import dynaconf # noqa
Expand Down
21 changes: 20 additions & 1 deletion pulpcore/app/tasks/importer.py
Expand Up @@ -8,6 +8,7 @@
from gettext import gettext as _
from logging import getLogger

from django.conf import settings
from django.core.files.storage import default_storage
from django.db.models import F
from naya.json import stream_array, tokenize
Expand All @@ -28,6 +29,7 @@
Repository,
Task,
TaskGroup,
Worker,
)
from pulpcore.app.modelresource import (
ArtifactResource,
Expand Down Expand Up @@ -482,6 +484,18 @@ def validate_and_assemble(toc_filename):
default_storage.save(base_path, f)

# Now import repositories, in parallel.

# We want to be able to limit the number of available-workers that import will consume,
# so that pulp can continue to work while doing an import. We accomplish this by creating
# a reserved-resource string for each repo-import-task based on that repo's index in
# the dispatch loop, mod number-of-workers-to-consume.
#
# By default (when the setting is not set), import will continue to use 100% of the
# available workers.
import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100))
total_workers = Worker.objects.online_workers().count()
import_workers = max(1, int(total_workers * (import_workers_percent / 100.0)))

with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file:
data = json.load(repo_data_file)
gpr = GroupProgressReport(
Expand All @@ -493,7 +507,10 @@ def validate_and_assemble(toc_filename):
)
gpr.save()

for src_repo in data:
for index, src_repo in enumerate(data):
# pulpcore-worker limiter
worker_rsrc = f"import-worker-{index % import_workers}"
exclusive_resources = [worker_rsrc]
try:
dest_repo = _destination_repo(importer, src_repo["name"])
except Repository.DoesNotExist:
Expand All @@ -503,6 +520,8 @@ def validate_and_assemble(toc_filename):
)
)
continue
else:
exclusive_resources.append(dest_repo)

dispatch(
import_repository_version,
Expand Down