This repository has been archived by the owner on Oct 28, 2019. It is now read-only.

Commit

Implement single blocking asyncio.wait for ArtifactDownloader
The ArtifactDownloader had two locations where it could block: dequeuing from
the upstream stage's queue and waiting for downloads to finish.

While downloads were ongoing, no new work could be accepted, even when the
number of concurrent downloads was below the limit.

Fix this by waiting on both upstream content units and ongoing downloads at
the same time.  Additionally, refactor the huge method into a "runner" class
that keeps the run's state as instance members.  This allows the functionality
to be split into smaller methods without passing a large set of parameters
around.
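
To make the pattern concrete, here is a minimal sketch (the `handle()`
coroutine is a hypothetical stand-in for the real per-content work; this is an
illustration, not the code from this commit).  The task reading from the
upstream queue lives in the same `pending` set as the in-flight work, so a
single `asyncio.wait` blocks on both at once:

import asyncio


async def stage(in_q, out_q):
    pending = set()
    get_task = asyncio.ensure_future(in_q.get())  # listen on the upstream queue...
    pending.add(get_task)
    while pending:
        # ...while also waiting for in-flight work, all in one blocking call
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task is get_task:
                item = task.result()
                if item is None:
                    get_task = None  # upstream stage finished: stop dequeuing
                else:
                    pending.add(asyncio.ensure_future(handle(item, out_q)))
                    get_task = asyncio.ensure_future(in_q.get())
                    pending.add(get_task)
    await out_q.put(None)


async def handle(item, out_q):
    await asyncio.sleep(0.1)  # stand-in for an actual download
    await out_q.put(item)

With this shape, the stage keeps accepting new content while downloads are
running, which is exactly what the old two-wait structure could not do.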

Restructure the tasks: each task now handles one content instance in the form
of a single coroutine.  The coroutine computes the artifacts to download,
downloads them, and updates the content instance.  The overall number of
concurrent downloads is limited by a semaphore shared across all of these
coroutines.
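
Distilled to its core, the limiting mechanism looks like the following sketch
(placeholder sleeps instead of real transfers; all names are illustrative):

import asyncio


async def download_with_limit(semaphore, url):
    async with semaphore:  # at most `semaphore`'s initial value transfers run at once
        await asyncio.sleep(0.1)  # stand-in for the real download
        return url


async def handle_content(semaphore, urls):
    # One coroutine per content unit: fetch all of its artifacts,
    # each transfer individually gated by the shared semaphore.
    return await asyncio.gather(*(download_with_limit(semaphore, u) for u in urls))

Because the semaphore is shared by all content-handling coroutines, the total
number of running downloads never exceeds its value, however many coroutines
are pending.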

Add unit tests using simulated downloads and clocks to verify the dynamic
behavior of ArtifactDownloader.
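
(The tests are in the second changed file, whose diff is not shown below.  As
an illustration only, a simulated download can be an instrumented coroutine
that records peak concurrency without doing real I/O, roughly like this
hypothetical sketch:)

import asyncio
import unittest


class FakeDownloadTest(unittest.TestCase):

    def test_peak_concurrency(self):
        async def scenario():
            running, peak = 0, 0

            async def fake_download():
                nonlocal running, peak
                running += 1
                peak = max(peak, running)
                await asyncio.sleep(0)  # yield to the event loop; no real I/O
                running -= 1

            await asyncio.gather(*(fake_download() for _ in range(5)))
            return peak

        loop = asyncio.new_event_loop()
        try:
            self.assertEqual(loop.run_until_complete(scenario()), 5)
        finally:
            loop.close()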

closes #4018
https://pulp.plan.io/issues/4018
gmbnomis committed Sep 21, 2018
1 parent 5c42692 commit d413ef6
Showing 2 changed files with 442 additions and 88 deletions.
254 changes: 166 additions & 88 deletions plugin/pulpcore/plugin/stages/artifact_stages.py
@@ -1,4 +1,5 @@
import asyncio
import logging

from django.core.files import File
from django.core.files.storage import DefaultStorage
@@ -8,6 +9,8 @@

from .api import Stage

log = logging.getLogger(__name__)


class QueryExistingArtifacts(Stage):
    """
@@ -71,6 +74,159 @@ async def __call__(self, in_q, out_q):
        await out_q.put(None)


class ArtifactDownloaderRunner():
    """
    This class encapsulates an actual run of the ArtifactDownloader stage.

    As there is a lot of state to keep during the run, a run is modelled as an
    instance of :class:`ArtifactDownloaderRunner`, which stores the state as
    instance members.  Call `run()` to actually do the work.

    Args:
        in_q (:class:`asyncio.Queue`): The queue to receive
            :class:`~pulpcore.plugin.stages.DeclarativeContent` objects from.
        out_q (:class:`asyncio.Queue`): The queue to put
            :class:`~pulpcore.plugin.stages.DeclarativeContent` objects into.
        max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will
            run.
        max_concurrent_content (int): The maximum number of
            :class:`~pulpcore.plugin.stages.DeclarativeContent` instances to handle simultaneously.
    """

    def __init__(self, in_q, out_q, max_concurrent_downloads, max_concurrent_content):
        self.in_q = in_q
        self.out_q = out_q
        self.max_concurrent_downloads = max_concurrent_downloads
        self.max_concurrent_content = max_concurrent_content

    @property
    def saturated(self):
        """Whether the maximum number of content instances is being handled."""
        return len(self._pending) >= self.max_concurrent_content

    @property
    def shutdown(self):
        """Whether the previous stage has finished and no new content will arrive."""
        return self._content_get_task is None

    async def run(self):
        """
        The coroutine doing the stage's work.
        """
        #: (set): The set of unfinished tasks.  Contains the content
        #  handler tasks and may contain `self._content_get_task`.
        self._pending = set()

        #: (:class:`asyncio.Task`): The task that gets new content from `in_q`.
        #  Set to None if the stage is shutdown.
        self._content_get_task = self._add_to_pending(self.in_q.get())

        #: (:class:`asyncio.Semaphore`): Semaphore controlling the number of concurrent downloads
        self._download_semaphore = asyncio.Semaphore(value=self.max_concurrent_downloads)

        with ProgressBar(message='Downloading Artifacts') as pb:
            try:
                while self._pending:
                    done, self._pending = await asyncio.wait(self._pending,
                                                             return_when=asyncio.FIRST_COMPLETED)
                    for task in done:
                        if task is self._content_get_task:
                            content = task.result()
                            if content is None:
                                # previous stage is finished and we retrieved all
                                # content instances: shutdown
                                self._content_get_task = None
                            else:
                                self._add_to_pending(self._handle_content_unit(content))
                        else:
                            download_count = task.result()
                            pb.done += download_count
                            pb.save()

                    if not self.shutdown:
                        if not self.saturated and self._content_get_task not in self._pending:
                            self._content_get_task = self._add_to_pending(self.in_q.get())
            except asyncio.CancelledError:
                # asyncio.wait does not cancel its tasks when cancelled; we need to do this
                for future in self._pending:
                    future.cancel()
                raise

        await self.out_q.put(None)

    def _add_to_pending(self, coro):
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        return task

    async def _handle_content_unit(self, content):
        """Handle one content unit.

        Returns:
            The number of downloads
        """
        downloaders_for_content = self._downloaders_for_content(content)
        if downloaders_for_content:
            downloads = await asyncio.gather(*downloaders_for_content)
            self._update_content(content, downloads)
        await self.out_q.put(content)
        return len(downloaders_for_content)

    def _downloaders_for_content(self, content):
        """
        Compute a list of downloader coroutines, one for each artifact to download for `content`.

        When run, a downloader coroutine must acquire the download semaphore before downloading.

        Returns:
            List of downloader coroutines (may be empty)
        """
        async def download_with_limit(semaphore, downloader, url):
            # `url` is passed in explicitly; referencing the loop variable
            # `declarative_artifact` here would log the wrong URL because
            # closures bind it late.
            async with semaphore:
                log.debug("ArtifactDownloader: Start download of '%s'", url)
                result = await downloader.run()
                log.debug("ArtifactDownloader: Downloaded: '%s'", url)
                return result

        downloaders_for_content = []
        for declarative_artifact in content.d_artifacts:
            if declarative_artifact.artifact.pk is None:
                # this artifact needs to be downloaded
                expected_digests = {}
                validation_kwargs = {}
                for digest_name in declarative_artifact.artifact.DIGEST_FIELDS:
                    digest_value = getattr(declarative_artifact.artifact, digest_name)
                    if digest_value:
                        expected_digests[digest_name] = digest_value
                if expected_digests:
                    validation_kwargs['expected_digests'] = expected_digests
                if declarative_artifact.artifact.size:
                    expected_size = declarative_artifact.artifact.size
                    validation_kwargs['expected_size'] = expected_size
                downloader = declarative_artifact.remote.get_downloader(
                    declarative_artifact.url,
                    **validation_kwargs
                )
                downloaders_for_content.append(
                    download_with_limit(self._download_semaphore, downloader,
                                        declarative_artifact.url)
                )
        return downloaders_for_content

    def _update_content(self, content, downloads):
        """Update the content using the download results."""
        for download_result in downloads:

            def url_lookup(x):
                return x.url == download_result.url

            d_artifact = list(filter(url_lookup, content.d_artifacts))[0]
            if d_artifact.artifact.pk is None:
                new_artifact = Artifact(
                    **download_result.artifact_attributes,
                    file=download_result.path
                )
                d_artifact.artifact = new_artifact


class ArtifactDownloader(Stage):
    """
    A Stages API stage to download :class:`~pulpcore.plugin.models.Artifact` files, but don't save
@@ -88,18 +244,23 @@ class ArtifactDownloader(Stage):
    This stage creates a ProgressBar named 'Downloading Artifacts' that counts the number of
    downloads completed.  Since it's a stream, the total count isn't known until it's finished.

    This stage drains all available items from `in_q` and starts as many downloaders as possible
    (up to `max_concurrent_downloads`).

    Args:
        max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will
            run.  Default is 100.
        max_concurrent_content (int): The maximum number of
            :class:`~pulpcore.plugin.stages.DeclarativeContent` instances to handle simultaneously.
            Default is 200.
        args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
        kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
    """

    def __init__(self, max_concurrent_downloads=100, max_concurrent_content=200, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_concurrent_downloads = max_concurrent_downloads
        self.max_concurrent_content = max_concurrent_content

    async def __call__(self, in_q, out_q):
        """
@@ -116,92 +277,9 @@ async def __call__(self, in_q, out_q):
        Returns:
            The coroutine for this stage.
        """
        pending = set()
        unhandled_content = []
        outstanding_downloads = 0
        shutdown = False
        saturated = False
        with ProgressBar(message='Downloading Artifacts') as pb:
            while True:
                if not saturated:
                    try:
                        content = in_q.get_nowait()
                    except asyncio.QueueEmpty:
                        if not unhandled_content and not shutdown and not pending:
                            content = await in_q.get()
                            unhandled_content.append(content)
                            continue
                    else:
                        unhandled_content.append(content)
                        continue

                for i, content in enumerate(unhandled_content):
                    if content is None:
                        shutdown = True
                        continue
                    downloaders_for_content = []
                    for declarative_artifact in content.d_artifacts:
                        if declarative_artifact.artifact.pk is None:
                            # this needs to be downloaded
                            expected_digests = {}
                            validation_kwargs = {}
                            for digest_name in declarative_artifact.artifact.DIGEST_FIELDS:
                                digest_value = getattr(declarative_artifact.artifact, digest_name)
                                if digest_value:
                                    expected_digests[digest_name] = digest_value
                            if expected_digests:
                                validation_kwargs['expected_digests'] = expected_digests
                            if declarative_artifact.artifact.size:
                                expected_size = declarative_artifact.artifact.size
                                validation_kwargs['expected_size'] = expected_size
                            downloader = declarative_artifact.remote.get_downloader(
                                declarative_artifact.url,
                                **validation_kwargs
                            )
                            next_future = asyncio.ensure_future(downloader.run())
                            downloaders_for_content.append(next_future)
                    if not downloaders_for_content:
                        await out_q.put(content)
                        continue

                    async def return_content_for_downloader(c):
                        return c
                    outstanding_downloads = outstanding_downloads + len(downloaders_for_content)
                    downloaders_for_content.append(return_content_for_downloader(content))
                    pending.add(asyncio.gather(*downloaders_for_content))
                    if outstanding_downloads > self.max_concurrent_downloads:
                        saturated = True
                        unhandled_content = unhandled_content[i + 1:]  # remove handled content
                        break
                else:
                    unhandled_content = []

                if pending:
                    done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    for gathered_downloaders in done:
                        one_units_downloads = gathered_downloaders.result()
                        content = one_units_downloads[-1]
                        for download_result in one_units_downloads[:-1]:
                            def url_lookup(x):
                                return x.url == download_result.url
                            d_artifact = list(filter(url_lookup, content.d_artifacts))[0]
                            if d_artifact.artifact.pk is None:
                                new_artifact = Artifact(
                                    **download_result.artifact_attributes,
                                    file=download_result.path
                                )
                                d_artifact.artifact = new_artifact
                            pb.done = pb.done + 1
                            pb.save()
                            outstanding_downloads = outstanding_downloads - 1
                        await out_q.put(content)
                else:
                    if shutdown:
                        break

                if outstanding_downloads < self.max_concurrent_downloads:
                    saturated = False
        await out_q.put(None)
        runner = ArtifactDownloaderRunner(in_q, out_q, self.max_concurrent_downloads,
                                          self.max_concurrent_content)
        await runner.run()


class ArtifactSaver(Stage):
