Metadata Fetching Improvements
* New progress reporting has been added that shows the page-by-page
fetching from Galaxy's role API.

* The page fetching from Galaxy is now asynchronous with a
parallelization of 20 (a standalone sketch follows these notes).

* Adds retry support, with logging, for 504 errors when fetching Role
metadata.

https://pulp.plan.io/issues/3180
re #3180
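
The three notes above land in one fetch_roles pattern, visible in the synchronizing.py diff below: fetch page 1 up front to learn the page count, keep a window of 20 page downloads in flight with asyncio.wait(..., return_when=FIRST_COMPLETED), requeue any page that fails with a 504, and report progress as each page lands. What follows is a minimal, self-contained sketch of that pattern only, not the commit's code: fetch_page, GatewayTimeout, PAGE_COUNT, WINDOW, and _flaky_pages are hypothetical stand-ins for the real pulpcore downloaders, aiohttp's ClientResponseError, and the Galaxy page count, and print() stands in for the ProgressBar and log.warning calls.

    import asyncio
    from concurrent.futures import FIRST_COMPLETED
    from contextlib import suppress

    PAGE_COUNT = 50  # hypothetical; the real value comes from 'num_pages' on page 1
    WINDOW = 20      # the parallelization of 20 described above


    class GatewayTimeout(Exception):
        """Stand-in for aiohttp's ClientResponseError with status == 504."""

        def __init__(self, page):
            self.page = page


    _flaky_pages = {7, 23}  # pretend these pages time out once before succeeding


    async def fetch_page(page):
        """Stand-in for one page download; the real code awaits an HTTP downloader."""
        await asyncio.sleep(0.01)
        if page in _flaky_pages:
            _flaky_pages.discard(page)
            raise GatewayTimeout(page)
        return {'page': page, 'results': []}


    def fetch_all_pages():
        loop = asyncio.get_event_loop()
        pending_pages = (fetch_page(page) for page in range(2, PAGE_COUNT + 1))

        # Prime a window of up to WINDOW in-flight downloads.
        not_done = set()
        with suppress(StopIteration):
            for _ in range(WINDOW):
                not_done.add(asyncio.ensure_future(next(pending_pages)))

        results = []
        fetched = 1  # page 1 was fetched up front to learn the page count
        while not_done:
            done, not_done = loop.run_until_complete(
                asyncio.wait(not_done, return_when=FIRST_COMPLETED))
            for task in done:
                try:
                    results.append(task.result())
                except GatewayTimeout as exc:
                    # The real code logs a warning and requeues the failing URL.
                    print('page %d emitted a 504, retrying' % exc.page)
                    not_done.add(asyncio.ensure_future(fetch_page(exc.page)))
                    continue
                fetched += 1
                print('fetched %d/%d pages' % (fetched, PAGE_COUNT))
                # Backfill the window so up to WINDOW downloads stay in flight.
                with suppress(StopIteration):
                    not_done.add(asyncio.ensure_future(next(pending_pages)))
        return results


    if __name__ == '__main__':
        fetch_all_pages()

Backfilling one new download per completed task is what holds the window at 20 without ever materializing every page coroutine at once; the retry path skips the backfill because the requeued page already occupies a slot.
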
Brian Bouterse committed Mar 29, 2018
1 parent 89de207 commit d0eb9d1
Showing 2 changed files with 86 additions and 45 deletions.
34 changes: 15 additions & 19 deletions README.rst
@@ -11,33 +11,29 @@ Install ``pulpcore``
 --------------------
 
 Follow the `installation
-instructions <docs.pulpproject.org/en/3.0/nightly/installation/instructions.html>`__
-provided with pulpcore.
+instructions <https://docs.pulpproject.org/en/3.0/nightly/installation/instructions.html>`__
+for pulpcore.
 
 Install plugin
 --------------
 
-Define installation steps here.
+Install from PyPI
+~~~~~~~~~~~~~~~~~
+
+TODO: publish on PyPI
 
 From source
 ~~~~~~~~~~~
 
-1) sudo -u pulp -i
-2) source ~/pulpvenv/bin/activate
-3) git clone https://github.com/pulp/pulp\_ansible.git
-4) cd pulp\_ansible
-5) python setup.py develop
-6) pulp-manager makemigrations pulp\_ansible
-7) pulp-manager migrate pulp\_ansible
-8) django-admin runserver
-9) sudo systemctl restart pulp\_resource\_manager
-10) sudo systemctl restart pulp\_worker@1
-11) sudo systemctl restart pulp\_worker@2
-
-Install from PyPI
-~~~~~~~~~~~~~~~~~
-
-Define installation steps here.
+0) source ~/pulpvenv/bin/activate
+1) git clone https://github.com/pulp/pulp\_ansible.git
+2) cd pulp\_ansible
+3) python setup.py develop
+4) pulp-manager makemigrations pulp\_ansible
+5) pulp-manager migrate pulp\_ansible
+6) django-admin runserver
+7) sudo systemctl restart pulp\_resource\_manager
+8) sudo systemctl restart pulp\_worker@1
 
 
 Create a repository ``foo``
97 changes: 71 additions & 26 deletions pulp_ansible/app/tasks/synchronizing.py
@@ -1,17 +1,18 @@
 import json
 import logging
 import os
 
+from aiohttp.client_exceptions import ClientResponseError
 from collections import namedtuple
+from concurrent.futures import FIRST_COMPLETED
+from contextlib import suppress
 from gettext import gettext as _
-from urllib.parse import urlparse, urlunparse, urlencode, parse_qs
+from urllib.parse import urlparse, urlencode, parse_qs
 
+import asyncio
+from aiohttp import ClientSession
 from celery import shared_task
 from django.db.models import Q
 
-from pulpcore.plugin.models import Artifact, RepositoryVersion, Repository
+from pulpcore.plugin.models import Artifact, RepositoryVersion, Repository, ProgressBar
 from pulpcore.plugin.changeset import (
     BatchIterator,
     ChangeSet,
@@ -32,10 +33,7 @@
 # The set of Key to be added and removed.
 Delta = namedtuple('Delta', ('additions', 'removals'))
 
-
-# the roles per page when fetching the list of roles
-PAGE_SIZE = 1000
 
 # The Github URL template to fetch a .tar.gz file from
 GITHUB_URL = 'https://github.com/%s/%s/archive/%s.tar.gz'
 
@@ -124,29 +122,76 @@ def fetch_roles(importer):
     """
     page_count = 0
 
-    def role_url(importer, page=1, page_size=PAGE_SIZE):
+    def role_page_url(importer, page=1):
         parsed = urlparse(importer.feed_url)
-        new_query = {**parse_qs(parsed.query), **{'page': page, 'page_size': page_size}}
-        parsed._replace(query=urlencode(new_query))
-        return urlunparse(parsed)
+        new_query = parse_qs(parsed.query)
+        new_query['page'] = page
+        return parsed.scheme + '://' + parsed.netloc + parsed.path + '?' + urlencode(new_query, doseq=True)
 
     def parse_metadata(path):
-        nonlocal page_count
-
         metadata = json.load(open(path))
         page_count = metadata['num_pages']
-        return parse_roles(metadata)
-
-    url = importer
-    downloader = importer.get_downloader(role_url(importer))
-    downloader.fetch()
-    roles = parse_metadata(downloader.path)
-
-    # TODO: make sure this loop runs asynchronously
-    for page in range(2, page_count + 1):
-        downloader = importer.get_downloader(role_url(importer, page))
-        downloader.fetch()
-        roles.extend(parse_metadata(downloader.path))
+        return page_count, parse_roles(metadata)
+
+    while True:
+        downloader = importer.get_downloader(role_page_url(importer))
+        try:
+            downloader.fetch()
+        except ClientResponseError as aiohttp_exc:
+            if aiohttp_exc.status == 504:
+                msg = _("Download of '{url}' emitted a {error_code} retrying...")
+                error_msg = msg.format(url=aiohttp_exc.request_info.url,
+                                       error_code=aiohttp_exc.status)
+                log.warning(error_msg)
+            else:
+                raise
+        else:
+            break
+
+    page_count, roles = parse_metadata(downloader.path)
+
+    progress_bar = ProgressBar(message='Parsing Pages from Galaxy Roles API', total=page_count,
+                               done=1, state='running')
+    progress_bar.save()
+
+    def downloader_coroutines():
+        for page in range(2, page_count + 1):
+            downloader = importer.get_downloader(role_page_url(importer, page))
+            yield downloader.run()
+
+    loop = asyncio.get_event_loop()
+    downloaders = downloader_coroutines()
+
+    not_done = set()
+    with suppress(StopIteration):
+        for i in range(20):
+            not_done.add(next(downloaders))
+
+    while True:
+        if not_done == set():
+            break
+        done, not_done = loop.run_until_complete(asyncio.wait(not_done, return_when=FIRST_COMPLETED))
+        for item in done:
+            try:
+                download_result = item.result()
+            except ClientResponseError as aiohttp_exc:
+                if aiohttp_exc.status == 504:
+                    msg = _("Download of '{url}' emitted a {error_code} retrying...")
+                    error_msg = msg.format(url=aiohttp_exc.request_info.url,
+                                           error_code=aiohttp_exc.status)
+                    log.warning(error_msg)
+                    new_downloader = importer.get_downloader(str(aiohttp_exc.request_info.url)).run()
+                    not_done.add(new_downloader)
+                    continue
+                raise
+            new_page_count, new_roles = parse_metadata(download_result.path)
+            roles.extend(new_roles)
+            progress_bar.increment()
+            with suppress(StopIteration):
+                not_done.add(next(downloaders))
+
+    progress_bar.state = 'completed'
+    progress_bar.save()
 
     return roles
