Skip to content

Commit

Permalink
Improve handling of download concurrency
Browse files Browse the repository at this point in the history
Don't save download concurrency defaults in the database. Make it easier
for individual plugins to override.

closes: #8897
https://pulp.plan.io/issues/8897
  • Loading branch information
dralley committed Jun 16, 2021
1 parent 282b6a5 commit 620e60f
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES/8897.feature
@@ -0,0 +1 @@
Where before ``download_concurrency`` would previously be set to a default value upon creation, it will now be set NULL (but a default value will still be used).
1 change: 1 addition & 0 deletions CHANGES/plugin_api/8897.feature
@@ -0,0 +1 @@
Added a field ``DEFAULT_DOWNLOAD_CONCURRENCY`` to the Remote base class - plugin writers can override the number of concurrent downloads for each type of remote. The default value is 10.
19 changes: 19 additions & 0 deletions pulpcore/app/migrations/0066_download_concurrency_changes.py
@@ -0,0 +1,19 @@
# Generated by Django 2.2.24 on 2021-06-13 03:14

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('core', '0065_merge_20210615_1211'),
]

operations = [
migrations.AlterField(
model_name='remote',
name='download_concurrency',
field=models.PositiveIntegerField(null=True, validators=[django.core.validators.MinValueValidator(1, 'Download concurrency must be at least 1')]),
),
]
8 changes: 6 additions & 2 deletions pulpcore/app/models/repository.py
Expand Up @@ -240,7 +240,7 @@ class Remote(MasterModel):
username (models.TextField): The username to be used for authentication when syncing.
password (models.TextField): The password to be used for authentication when syncing.
download_concurrency (models.PositiveIntegerField): Total number of
simultaneous connections.
simultaneous connections allowed to any remote during a sync.
policy (models.TextField): The policy to use when downloading content.
total_timeout (models.FloatField): Value for aiohttp.ClientTimeout.total on connections
connect_timeout (models.FloatField): Value for aiohttp.ClientTimeout.connect
Expand All @@ -256,6 +256,8 @@ class Remote(MasterModel):
ON_DEMAND = "on_demand"
STREAMED = "streamed"

DEFAULT_DOWNLOAD_CONCURRENCY = 10

POLICY_CHOICES = (
(IMMEDIATE, "When syncing, download all metadata and content now."),
(
Expand Down Expand Up @@ -288,7 +290,9 @@ class Remote(MasterModel):
proxy_username = models.TextField(null=True)
proxy_password = models.TextField(null=True)

download_concurrency = models.PositiveIntegerField(default=10)
download_concurrency = models.PositiveIntegerField(
null=True, validators=[MinValueValidator(1, "Download concurrency must be at least 1")]
)
policy = models.TextField(choices=POLICY_CHOICES, default=IMMEDIATE)

total_timeout = models.FloatField(
Expand Down
8 changes: 7 additions & 1 deletion pulpcore/app/serializers/repository.py
Expand Up @@ -136,7 +136,13 @@ class RemoteSerializer(ModelSerializer):
help_text="Timestamp of the most recent update of the remote.", read_only=True
)
download_concurrency = serializers.IntegerField(
help_text="Total number of simultaneous connections.", required=False, min_value=1
help_text=(
"Total number of simultaneous connections. If not set then the default "
"value will be used."
),
allow_null=True,
required=False,
min_value=1,
)
policy = serializers.ChoiceField(
help_text="The policy to use when downloading content.",
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/download/factory.py
Expand Up @@ -74,6 +74,8 @@ def __init__(self, remote, downloader_overrides=None):
is the downloader class to be used for that scheme, e.g.
{'https': MyCustomDownloader}. These override the default values.
"""
download_concurrency = remote.download_concurrency or remote.DEFAULT_DOWNLOAD_CONCURRENCY

self._remote = remote
self._download_class_map = copy.copy(PROTOCOL_MAP)
if downloader_overrides:
Expand All @@ -85,7 +87,7 @@ def __init__(self, remote, downloader_overrides=None):
"file": self._generic,
}
self._session = self._make_aiohttp_session_from_remote()
self._semaphore = asyncio.Semaphore(value=remote.download_concurrency)
self._semaphore = asyncio.Semaphore(value=download_concurrency)
atexit.register(self._session_cleanup)

def _session_cleanup(self):
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/download/http.py
Expand Up @@ -201,9 +201,9 @@ async def _run(self, extra_data=None):
"""
Download, validate, and compute digests on the `url`. This is a coroutine.
This method is decorated with a backoff-and-retry behavior to retry HTTP 429 and
some 5XX errors. It retries with exponential backoff 10 times before allowing
a final exception to be raised.
This method is decorated with a backoff-and-retry behavior to retry some errors.
It retries with exponential backoff 10 times before allowing a final exception to
be raised.
This method provides the same return object type and documented in
:meth:`~pulpcore.plugin.download.BaseDownloader._run`.
Expand Down
Expand Up @@ -254,7 +254,7 @@ def test_read(self):
self._compare_results(self.remote_attrs, self.remote)

def test_update(self):
data = {"download_concurrency": 66, "policy": "immediate"}
data = {"download_concurrency": 23, "policy": "immediate"}
self.remotes_api.partial_update(self.remote.pulp_href, data)
time.sleep(1) # without this, the read returns the pre-patch values
new_remote = self.remotes_api.read(self.remote.pulp_href)
Expand Down

0 comments on commit 620e60f

Please sign in to comment.