Skip to content

Commit

Permalink
Add connection_limit to a Remote
Browse files Browse the repository at this point in the history
Use aiohttp's `limit` option to limit a total number of TCP connections,
instead of using `asyncio.Semaphore` directly.

closes #4040
https://pulp.plan.io/issues/4040
  • Loading branch information
goosemania committed Oct 5, 2018
1 parent 1355cde commit 19b0c26
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 113 deletions.
3 changes: 3 additions & 0 deletions plugin/pulpcore/plugin/download/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def _make_aiohttp_session_from_remote(self):
if self._remote.ssl_validation:
tcp_conn_opts['verify_ssl'] = self._remote.ssl_validation

if self._remote.connection_limit:
tcp_conn_opts['limit'] = self._remote.connection_limit

conn = aiohttp.TCPConnector(**tcp_conn_opts)

auth_options = {}
Expand Down
32 changes: 6 additions & 26 deletions plugin/pulpcore/plugin/stages/artifact_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,13 @@ class ArtifactDownloaderRunner():
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects from.
out_q (:class:`asyncio.Queue`): The queue to put
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects into.
max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will
run.
max_concurrent_content (int): The maximum number of
:class:`~pulpcore.plugin.stages.DeclarativeContent` instances to handle simultaneously.
"""

def __init__(self, in_q, out_q, max_concurrent_downloads, max_concurrent_content):
def __init__(self, in_q, out_q, max_concurrent_content):
self.in_q = in_q
self.out_q = out_q
self.max_concurrent_downloads = max_concurrent_downloads
self.max_concurrent_content = max_concurrent_content

@property
Expand All @@ -119,9 +116,6 @@ async def run(self):
# Set to None if stage is shutdown.
self._content_get_task = self._add_to_pending(self.in_q.get())

#: (:class:`asyncio.Semaphore`): Semaphore controlling the number of concurrent downloads
self._download_semaphore = asyncio.Semaphore(value=self.max_concurrent_downloads)

with ProgressBar(message='Downloading Artifacts') as pb:
try:
while self._pending:
Expand Down Expand Up @@ -174,18 +168,9 @@ def _downloaders_for_content(self, content):
"""
Compute a list of downloader coroutines, one for each artifact to download for `content`.
When run, the downloader coroutine needs to get the download semaphore before downloading.
Returns:
List of downloader coroutines (may be empty)
"""
async def download_with_limit(semaphore, downloader):
async with semaphore:
log.debug("ArtifactDownloader: Start download of '%s'", declarative_artifact.url)
result = await downloader.run()
log.debug("ArtifactDownloader: Downloaded: '%s'", declarative_artifact.url)
return result

downloaders_for_content = []
for declarative_artifact in content.d_artifacts:
if declarative_artifact.artifact.pk is None:
Expand All @@ -205,9 +190,8 @@ async def download_with_limit(semaphore, downloader):
declarative_artifact.url,
**validation_kwargs
)
downloaders_for_content.append(
download_with_limit(self._download_semaphore, downloader)
)
downloaders_for_content.append(downloader.run())

return downloaders_for_content

def _update_content(self, content, downloads):
Expand Down Expand Up @@ -243,21 +227,18 @@ class ArtifactDownloader(Stage):
downloads completed. Since it's a stream the total count isn't known until it's finished.
This stage drains all available items from `in_q` and starts as many downloaders as possible
(up to `max_concurrent_downloads`)
(up to `connection_limit` set on a Remote)
Args:
max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will
run. Default is 100.
max_concurrent_content (int): The maximum number of
:class:`~pulpcore.plugin.stages.DeclarativeContent` instances to handle simultaneously.
Default is 200.
args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
"""

def __init__(self, max_concurrent_downloads=200, max_concurrent_content=200, *args, **kwargs):
def __init__(self, max_concurrent_content=200, *args, **kwargs):
super().__init__(*args, **kwargs)
self.max_concurrent_downloads = max_concurrent_downloads
self.max_concurrent_content = max_concurrent_content

async def __call__(self, in_q, out_q):
Expand All @@ -275,8 +256,7 @@ async def __call__(self, in_q, out_q):
Returns:
The coroutine for this stage.
"""
runner = ArtifactDownloaderRunner(in_q, out_q, self.max_concurrent_downloads,
self.max_concurrent_content)
runner = ArtifactDownloaderRunner(in_q, out_q, self.max_concurrent_content)
await runner.run()


Expand Down
124 changes: 38 additions & 86 deletions plugin/tests/unit/stages/test_artifactdownloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ def queue_dc(self, in_q, delays=[]):
dc = DeclarativeContent(content=mock.Mock(), d_artifacts=das)
in_q.put_nowait(dc)

async def download_task(self, in_q, out_q,
max_concurrent_downloads=2, max_concurrent_content=4):
async def download_task(self, in_q, out_q, max_concurrent_content=3):
"""
A coroutine running the downloader stage with a mocked ProgressBar.
Expand All @@ -90,8 +89,7 @@ async def download_task(self, in_q, out_q,
"""
with mock.patch('pulpcore.plugin.stages.artifact_stages.ProgressBar') as pb:
pb.return_value.__enter__.return_value.done = 0
ad = ArtifactDownloader(max_concurrent_downloads=max_concurrent_downloads,
max_concurrent_content=max_concurrent_content)
ad = ArtifactDownloader(max_concurrent_content=max_concurrent_content)
await ad(in_q, out_q)
return pb.return_value.__enter__.return_value.done

Expand All @@ -100,38 +98,38 @@ async def test_downloads(self):
out_q = asyncio.Queue()
download_task = self.loop.create_task(self.download_task(in_q, out_q))

# Create 12 content units, every second one must be downloaded.
# The downloads take 1, 3, 5, 7, 9, 11 seconds; content units
# 0, 2, ..., 10 do not need downloads.
for i in range(12):
self.queue_dc(in_q, delays=[i if i % 2 else None])
# Create 28 content units, every third one must be downloaded.
# The downloads take 0, 3, 6,..., 27 seconds; content units
# 1, 2, 4, 5, ..., 26 do not need downloads.
for i in range(28):
self.queue_dc(in_q, delays=[i if not i % 3 else None])
in_q.put_nowait(None)

# At 0.5 seconds
await self.advance_to(0.5)
# 1 and 3 are running
# 1, 3, 5, and 7 are "in_flight"
self.assertEqual(DownloaderMock.running, 2)
# non-downloads 0, 2, ..., 6 forwarded
self.assertEqual(out_q.qsize(), 4)
# 8 - 11 + None are waiting to be picked up
self.assertEqual(in_q.qsize(), 5)
# 3, 6 and 9 are running. 0 is finished
self.assertEqual(DownloaderMock.running, 3)
# non-downloads 1, 2, 4, 5, 7, 8 are forwarded
self.assertEqual(out_q.qsize(), 7)
# 9 - 26 + None are waiting to be picked up
self.assertEqual(in_q.qsize(), 19)

# Two downloads run in parallel. The most asymmetric way
# to schedule the remaining downloads is:
# 1 + 5 + 7: finished after 13 seconds
# 3 + 9 + 11: finished after 23 seconds
for t in range(1, 13): # until 12.5 seconds two downloads must run
# 3 + 12 + 21: finished after 36 seconds
# 6 + 15 + 24: finished after 45 seconds
# 9 + 18 + 27: finished after 54 seconds
for t in range(1, 36): # until 35.5 seconds three downloads must run
await self.advance_to(t + 0.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(DownloaderMock.running, 3)

# At 23.5 seconds, the stage is done at the latest
await self.advance_to(23.5)
# At 54.5 seconds, the stage is done at the latest
await self.advance_to(54.5)
self.assertEqual(DownloaderMock.running, 0)
self.assertEqual(DownloaderMock.downloads, 6)
self.assertEqual(DownloaderMock.downloads, 10)
self.assertEqual(download_task.result(), DownloaderMock.downloads)
self.assertEqual(in_q.qsize(), 0)
self.assertEqual(out_q.qsize(), 13)
self.assertEqual(out_q.qsize(), 29)

async def test_multi_artifact_downloads(self):
in_q = asyncio.Queue()
Expand All @@ -140,78 +138,32 @@ async def test_multi_artifact_downloads(self):
self.queue_dc(in_q, delays=[]) # must be forwarded to next stage immediately
self.queue_dc(in_q, delays=[1])
self.queue_dc(in_q, delays=[2, 2])
self.queue_dc(in_q, delays=[2])
self.queue_dc(in_q, delays=[2, None]) # schedules only one download
in_q.put_nowait(None)
# At 0.5 seconds, two content units are downloading with two
# At 0.5 seconds, three content units are downloading with four
# downloads overall
await self.advance_to(0.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(DownloaderMock.running, 4)
self.assertEqual(out_q.qsize(), 1)
# At 1.5 seconds, the download for the first content unit has completed
# At 1.5 seconds, the download for the first content unit has completed.
# At 1 second, the download of the fourth content unit is started
await self.advance_to(1.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(DownloaderMock.running, 4)
self.assertEqual(out_q.qsize(), 2)
# At 2.5 seconds, the first download for the second content unit has
# completed. At 1 second, either the second download of the second content unit, or the
# first download of the third unit is started
# At 2.5 seconds, the downloads for the second and the third content unit
# have completed
await self.advance_to(2.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(out_q.qsize(), 2)
# At 3.5 seconds, only one of the artifacts is left
await self.advance_to(3.5)
self.assertEqual(DownloaderMock.running, 1)
self.assertEqual(out_q.qsize(), 3)

# At 4.5 seconds, stage must be done
await self.advance_to(4.5)
self.assertEqual(DownloaderMock.running, 0)
self.assertEqual(DownloaderMock.downloads, 4)
self.assertEqual(download_task.result(), DownloaderMock.downloads)
self.assertEqual(in_q.qsize(), 0)
self.assertEqual(out_q.qsize(), 5)

async def test_download_stall(self):
in_q = asyncio.Queue()
out_q = asyncio.Queue()
download_task = self.loop.create_task(self.download_task(in_q, out_q))

self.queue_dc(in_q, delays=[1, 1])
self.queue_dc(in_q, delays=[1, 1])

# At 0.5 seconds, the first content unit is downloading with two
# downloads overall
await self.advance_to(0.5)
self.assertEqual(DownloaderMock.running, 2)

# At 1.5 seconds, the downloads for the second content are running
await self.advance_to(1.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(out_q.qsize(), 1)

# At 2.5 second all content units are completed and the stage is waiting
# for input
await self.advance_to(2.5)
self.assertEqual(DownloaderMock.running, 0)
self.assertEqual(out_q.qsize(), 2)

# A new content unit arrives
self.queue_dc(in_q, delays=[1, 1])

# At 3 seconds, download must be running for it
await self.advance_to(3)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(out_q.qsize(), 2)

# Upstream stage completes
in_q.put_nowait(None)

await self.advance_to(4)
self.assertEqual(out_q.qsize(), 4)

# At 3.5 seconds, stage must be done
await self.advance_to(3.5)
self.assertEqual(DownloaderMock.running, 0)
self.assertEqual(DownloaderMock.downloads, 6)
self.assertEqual(DownloaderMock.downloads, 5)
self.assertEqual(download_task.result(), DownloaderMock.downloads)
self.assertEqual(in_q.qsize(), 0)
self.assertEqual(out_q.qsize(), 4)
self.assertEqual(out_q.qsize(), 6)

async def test_sparse_batches_dont_block_stage(self):
"""Regression test for issue https://pulp.plan.io/issues/4018."""
Expand Down Expand Up @@ -262,9 +214,9 @@ async def test_cancel(self):
self.queue_dc(in_q, delays=[100])
in_q.put_nowait(None)

# After 0.5 seconds, the first two downloads must have started
# After 0.5 seconds, the three downloads must have started
await self.advance_to(0.5)
self.assertEqual(DownloaderMock.running, 2)
self.assertEqual(DownloaderMock.running, 3)

download_task.cancel()

Expand All @@ -273,4 +225,4 @@ async def test_cancel(self):
with self.assertRaises(asyncio.CancelledError):
download_task.result()
self.assertEqual(DownloaderMock.running, 0)
self.assertEqual(DownloaderMock.canceled, 2)
self.assertEqual(DownloaderMock.canceled, 3)
2 changes: 2 additions & 0 deletions pulpcore/pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Remote(MasterModel):
username (models.TextField): The username to be used for authentication when syncing.
password (models.TextField): The password to be used for authentication when syncing.
last_synced (models.DatetimeField): Timestamp of the most recent successful sync.
connection_limit (models.PositiveIntegerField): Total number of simultaneous connections.
Relations:
Expand Down Expand Up @@ -103,6 +104,7 @@ def tls_storage_path(self, name):
username = models.TextField(blank=True)
password = models.TextField(blank=True)
last_synced = models.DateTimeField(blank=True, null=True)
connection_limit = models.PositiveIntegerField(default=5)

class Meta:
default_related_name = 'remotes'
Expand Down
7 changes: 6 additions & 1 deletion pulpcore/pulpcore/app/serializers/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,19 @@ class RemoteSerializer(MasterModelSerializer):
help_text='Timestamp of the most recent update of the remote.',
read_only=True
)
connection_limit = serializers.IntegerField(
help_text='Total number of simultaneous connections.',
required=False,
min_value=1
)

class Meta:
abstract = True
model = models.Remote
fields = MasterModelSerializer.Meta.fields + (
'name', 'url', 'validate', 'ssl_ca_certificate', 'ssl_client_certificate',
'ssl_client_key', 'ssl_validation', 'proxy_url', 'username', 'password', 'last_synced',
'last_updated',)
'last_updated', 'connection_limit')


class RepositorySyncURLSerializer(serializers.Serializer):
Expand Down

0 comments on commit 19b0c26

Please sign in to comment.