Skip to content

Commit

Permalink
Improve handling of download concurrency
Browse files Browse the repository at this point in the history
Don't save download concurrency defaults in the database. Make it easier
for individual plugins to override.

closes: #8897
https://pulp.plan.io/issues/8897
  • Loading branch information
dralley committed Jun 15, 2021
1 parent 282b6a5 commit 7f2e315
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES/8897.feature
@@ -0,0 +1 @@
The default ``download_concurrency`` of 10 was found to be too high for many client types, it has been reduced to 8. Additionally, where before ``download_concurrency`` would be set to a default value upon creation, it will now be set NULL (but a default value will still be used).
1 change: 1 addition & 0 deletions CHANGES/plugin_api/8897.feature
@@ -0,0 +1 @@
Added a field ``DEFAULT_DOWNLOAD_CONCURRENCY`` to the Remote base class - plugin writers can override the number of concurrent downloads for each type of remote. The default value is 8.
19 changes: 19 additions & 0 deletions pulpcore/app/migrations/0066_download_concurrency_changes.py
@@ -0,0 +1,19 @@
# Generated by Django 2.2.24 on 2021-06-13 03:14

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('core', '0065_merge_20210615_1211'),
]

operations = [
migrations.AlterField(
model_name='remote',
name='download_concurrency',
field=models.PositiveIntegerField(null=True, validators=[django.core.validators.MinValueValidator(1)]),
),
]
9 changes: 7 additions & 2 deletions pulpcore/app/models/repository.py
Expand Up @@ -256,6 +256,8 @@ class Remote(MasterModel):
ON_DEMAND = "on_demand"
STREAMED = "streamed"

DEFAULT_DOWNLOAD_CONCURRENCY = 8

POLICY_CHOICES = (
(IMMEDIATE, "When syncing, download all metadata and content now."),
(
Expand Down Expand Up @@ -288,7 +290,7 @@ class Remote(MasterModel):
proxy_username = models.TextField(null=True)
proxy_password = models.TextField(null=True)

download_concurrency = models.PositiveIntegerField(default=10)
download_concurrency = models.PositiveIntegerField(null=True, validators=[MinValueValidator(1)])
policy = models.TextField(choices=POLICY_CHOICES, default=IMMEDIATE)

total_timeout = models.FloatField(
Expand Down Expand Up @@ -323,7 +325,10 @@ def download_factory(self):
try:
return self._download_factory
except AttributeError:
self._download_factory = DownloaderFactory(self)
# if a download concurrency value was not provided (0), then use the default
# concurrency, which can be overridden on a per-remote-type basis.
concurrency = self.download_concurrency or self.DEFAULT_DOWNLOAD_CONCURRENCY
self._download_factory = DownloaderFactory(self, download_concurrency=concurrency)
return self._download_factory

@property
Expand Down
7 changes: 6 additions & 1 deletion pulpcore/app/serializers/repository.py
Expand Up @@ -136,7 +136,12 @@ class RemoteSerializer(ModelSerializer):
help_text="Timestamp of the most recent update of the remote.", read_only=True
)
download_concurrency = serializers.IntegerField(
help_text="Total number of simultaneous connections.", required=False, min_value=1
help_text=(
"Total number of simultaneous connections. If not provided or set to zero (0), "
"then the default value will be used."
),
required=False,
min_value=1,
)
policy = serializers.ChoiceField(
help_text="The policy to use when downloading content.",
Expand Down
6 changes: 4 additions & 2 deletions pulpcore/download/factory.py
Expand Up @@ -65,15 +65,17 @@ class DownloaderFactory:
to session continuation implementation in various servers.
"""

def __init__(self, remote, downloader_overrides=None):
def __init__(self, remote, downloader_overrides=None, download_concurrency=None):
"""
Args:
remote (:class:`~pulpcore.plugin.models.Remote`): The remote used to populate
downloader settings.
downloader_overrides (dict): Keyed on a scheme name, e.g. 'https' or 'ftp' and the value
is the downloader class to be used for that scheme, e.g.
{'https': MyCustomDownloader}. These override the default values.
download_concurrency (int): How many files may be downloaded concurrently.
"""
download_concurrency = download_concurrency or remote.DEFAULT_DOWNLOAD_CONCURRENCY
self._remote = remote
self._download_class_map = copy.copy(PROTOCOL_MAP)
if downloader_overrides:
Expand All @@ -85,7 +87,7 @@ def __init__(self, remote, downloader_overrides=None):
"file": self._generic,
}
self._session = self._make_aiohttp_session_from_remote()
self._semaphore = asyncio.Semaphore(value=remote.download_concurrency)
self._semaphore = asyncio.Semaphore(value=download_concurrency)
atexit.register(self._session_cleanup)

def _session_cleanup(self):
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/download/http.py
Expand Up @@ -201,9 +201,9 @@ async def _run(self, extra_data=None):
"""
Download, validate, and compute digests on the `url`. This is a coroutine.
This method is decorated with a backoff-and-retry behavior to retry HTTP 429 and
some 5XX errors. It retries with exponential backoff 10 times before allowing
a final exception to be raised.
This method is decorated with a backoff-and-retry behavior to retry some errors.
It retries with exponential backoff 10 times before allowing a final exception to
be raised.
This method provides the same return object type and documented in
:meth:`~pulpcore.plugin.download.BaseDownloader._run`.
Expand Down
Expand Up @@ -254,7 +254,7 @@ def test_read(self):
self._compare_results(self.remote_attrs, self.remote)

def test_update(self):
data = {"download_concurrency": 66, "policy": "immediate"}
data = {"download_concurrency": 23, "policy": "immediate"}
self.remotes_api.partial_update(self.remote.pulp_href, data)
time.sleep(1) # without this, the read returns the pre-patch values
new_remote = self.remotes_api.read(self.remote.pulp_href)
Expand Down

0 comments on commit 7f2e315

Please sign in to comment.