Skip to content
Permalink
Browse files

Port synchronizing to stages api

  • Loading branch information...
mdellweg committed Mar 3, 2019
1 parent 2589e0c commit c9f3eec8c3a7899b4234b68938d59c4bc1cfdfbe
@@ -1,45 +1,43 @@
import asyncio
import json
import logging
import math

from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED
from asyncio import FIRST_COMPLETED
from contextlib import suppress
from gettext import gettext as _
from urllib.parse import urlparse, urlencode, parse_qs

import asyncio
from django.db.models import Q

from pulpcore.plugin.models import Artifact, RepositoryVersion, Repository, ProgressBar
from pulpcore.plugin.changeset import (
BatchIterator,
ChangeSet,
PendingArtifact,
PendingContent,
SizedIterable)
from pulpcore.plugin.tasking import WorkingDirectory

from urllib.parse import (
parse_qs,
urlencode,
urlparse,
)

from pulpcore.plugin.models import (
Artifact,
ProgressBar,
Remote,
Repository,
)
from pulpcore.plugin.stages import (
DeclarativeArtifact,
DeclarativeContent,
DeclarativeVersion,
Stage,
)
from pulp_ansible.app.models import AnsibleRole, AnsibleRoleVersion, AnsibleRemote


log = logging.getLogger(__name__)


# The natural key.
Key = namedtuple('Key', ('namespace', 'name', 'version'))

# The set of Key to be added and removed.
Delta = namedtuple('Delta', ('additions', 'removals'))

# The Github URL template to fetch a .tar.gz file from
GITHUB_URL = 'https://github.com/%s/%s/archive/%s.tar.gz'

# default results per page. used to calculate number of pages
PAGE_SIZE = 10


def synchronize(remote_pk, repository_pk):
def synchronize(remote_pk, repository_pk, mirror=False):
"""
Sync content from the remote repository.
@@ -48,46 +46,95 @@ def synchronize(remote_pk, repository_pk):
Args:
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
Raises:
ValueError: When remote has no url specified.
ValueError: If the remote does not specify a URL to sync.
"""
remote = AnsibleRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
base_version = RepositoryVersion.latest(repository)

if not remote.url:
raise ValueError(_('A remote must have a url specified to synchronize.'))

with WorkingDirectory():
with RepositoryVersion.create(repository) as new_version:
log.info(
_('Synchronizing: repository=%(r)s remote=%(p)s'),
{
'r': repository.name,
'p': remote.name
})
roles = fetch_roles(remote)
content = fetch_content(base_version)
delta = find_delta(roles, content)
additions = build_additions(remote, roles, delta)
removals = build_removals(base_version, delta)
changeset = ChangeSet(
remote=remote,
repository_version=new_version,
additions=additions,
removals=removals)
for report in changeset.apply():
if not log.isEnabledFor(logging.DEBUG):
continue
log.debug(
_('Applied: repository=%(r)s remote=%(p)s change:%(c)s'),
{
'r': repository.name,
'p': remote.name,
'c': report,
})
log.info(
_('Synchronizing: repository=%(r)s remote=%(p)s'),
{
'r': repository.name,
'p': remote.name,
},
)
first_stage = AnsibleFirstStage(remote)
d_version = DeclarativeVersion(first_stage, repository, mirror=mirror)
d_version.create()


class AnsibleFirstStage(Stage):
"""
The first stage of a pulp_ansible sync pipeline.
"""

def __init__(self, remote):
"""
The first stage of a pulp_ansible sync pipeline.
Args:
remote (AnsibleRemote): The remote data to be used when syncing
"""
super().__init__()
self.remote = remote

# Interpret download policy
self.deferred_download = (self.remote.policy != Remote.IMMEDIATE)

async def run(self):
"""
Build and emit `DeclarativeContent` from the ansible metadata.
"""

with ProgressBar(message='Downloading Metadata') as pb:
roles = await fetch_roles(self.remote)
pb.increment()

with ProgressBar(message='Parsing Metadata') as pb:
pending = []
for metadata in roles:
role = AnsibleRole(name=metadata['name'], namespace=metadata['namespace'])
d_content = DeclarativeContent(content=role, d_artifacts=[], does_batch=False)
pending.append(asyncio.ensure_future(self._add_role_versions(d_content.get_or_create_future(), metadata)))
await self.put(d_content)
await asyncio.gather(*pending)

async def _add_role_versions(self, role_future, metadata):
role = await role_future
log.info(role)
log.info(metadata)
for version in metadata['summary_fields']['versions']:
url = GITHUB_URL % (
metadata['github_user'],
metadata['github_repo'],
version['name'],
)
role_version = AnsibleRoleVersion(version=version['name'], role=role)
relative_path = "%s/%s/%s.tar.gz" % (
metadata['namespace'],
metadata['name'],
version['name'],
)
d_artifact = DeclarativeArtifact(
artifact=Artifact(),
url=url,
relative_path=relative_path,
remote=self.remote,
deferred_download=self.deferred_download,
)
d_content = DeclarativeContent(
content=role_version,
d_artifacts=[d_artifact],
)
await self.put(d_content)


def parse_roles(metadata):
@@ -114,7 +161,7 @@ def parse_roles(metadata):
return roles


def fetch_roles(remote):
async def fetch_roles(remote):
"""
Fetch the roles in a remote repository.
@@ -139,21 +186,20 @@ def parse_metadata(path):
page_count = math.ceil(float(metadata['count']) / float(PAGE_SIZE))
return page_count, parse_roles(metadata)

downloader = remote.get_downloader(role_page_url(remote))
downloader.fetch()
downloader = remote.get_downloader(url=role_page_url(remote))
result = await downloader.run()

page_count, roles = parse_metadata(downloader.path)
page_count, roles = parse_metadata(result.path)

progress_bar = ProgressBar(message='Parsing Pages from Galaxy Roles API', total=page_count,
done=1, state='running')
progress_bar.save()

def downloader_coroutines():
for page in range(2, page_count + 1):
downloader = remote.get_downloader(role_page_url(remote, page))
downloader = remote.get_downloader(url=role_page_url(remote, page))
yield downloader.run()

loop = asyncio.get_event_loop()
downloaders = downloader_coroutines()

not_done = set()
@@ -164,8 +210,7 @@ def downloader_coroutines():
while True:
if not_done == set():
break
done, not_done = loop.run_until_complete(asyncio.wait(not_done,
return_when=FIRST_COMPLETED))
done, not_done = await asyncio.wait(not_done, return_when=FIRST_COMPLETED)
for item in done:
download_result = item.result()
new_page_count, new_roles = parse_metadata(download_result.path)
@@ -178,120 +223,3 @@ def downloader_coroutines():
progress_bar.save()

return roles


def fetch_content(base_version):
"""
Fetch the AnsibleRoleVersions contained in the (base) repository version.
Args:
base_version (RepositoryVersion): A repository version.
Returns:
set: A set of Key contained in the (base) repository version.
"""
content = set()
if base_version:
for role_version in AnsibleRoleVersion.objects.filter(pk__in=base_version.content):
key = Key(name=role_version.role.name, namespace=role_version.role.namespace,
version=role_version.version)
content.add(key)
return content


def find_delta(roles, content, mirror=True):
"""
Find the content that needs to be added and removed.
Args:
roles (list): A list of roles from a remote repository
content: (set): The set of natural keys for content contained in the (base)
repository version.
mirror (bool): The delta should include changes needed to ensure the content
contained within the pulp repository is exactly the same as the
content contained within the remote repository.
Returns:
Delta: The set of Key to be added and removed.
"""
remote_content = set()
for r in roles:
for version in r['summary_fields']['versions']:
role = Key(name=r['name'],
namespace=r['namespace'],
version=version['name'])
remote_content.add(role)
additions = (remote_content - content)
if mirror:
removals = (content - remote_content)
else:
removals = set()
return Delta(additions, removals)


def build_additions(remote, roles, delta):
"""
Build the content to be added.
Args:
remote (AnsibleRemote): A remote.
roles (list): The list of role dict from Galaxy
delta (Delta): The set of Key to be added and removed.
Returns:
SizedIterable: The PendingContent to be added to the repository.
"""
def generate():
for metadata in roles:
role, _ = AnsibleRole.objects.get_or_create(name=metadata['name'],
namespace=metadata['namespace'])

for version in metadata['summary_fields']['versions']:
key = Key(name=metadata['name'],
namespace=metadata['namespace'],
version=version['name'])

if key not in delta.additions:
continue

url = GITHUB_URL % (metadata['github_user'], metadata['github_repo'],
version['name'])
role_version = AnsibleRoleVersion(version=version['name'], role=role)
path = "%s/%s/%s.tar.gz" % (metadata['namespace'], metadata['name'],
version['name'])
artifact = Artifact()
content = PendingContent(
role_version,
artifacts={
PendingArtifact(artifact, url, path)
})
yield content
return SizedIterable(generate(), len(delta.additions))


def build_removals(base_version, delta):
"""
Build the content to be removed.
Args:
base_version (RepositoryVersion): The base repository version.
delta (Delta): The set of Key to be added and removed.
Returns:
SizedIterable: The AnsibleRoleVersion to be removed from the repository.
"""
def generate():
for removals in BatchIterator(delta.removals):
q = Q()
for key in removals:
role = AnsibleRoleVersion.objects.get(name=key.name, namespace=key.namespace)
q |= Q(ansibleroleversion__role_id=role.pk, ansibleroleversion__version=key.version)
q_set = base_version.content.filter(q)
q_set = q_set.only('id')
for file in q_set:
yield file
return SizedIterable(generate(), len(delta.removals))
@@ -10,7 +10,8 @@
DOWNLOAD_POLICIES = ['cache_only', 'immediate', 'on_demand']
"""Allowed download policies for this plugin."""

ANSIBLE_ROLE_CONTENT_NAME = 'ansible.ansible-role-version'
ANSIBLE_ROLE_NAME = 'ansible.ansible-role'
ANSIBLE_ROLE_VERSION_NAME = 'ansible.ansible-role-version'

ANSIBLE_ROLE_CONTENT_PATH = urljoin(CONTENT_PATH, 'ansible/roles/')
ANSIBLE_ROLE_VERSION_CONTENT_PATH = urljoin(CONTENT_PATH, 'ansible/roles/versions/')
@@ -29,12 +30,13 @@
ANSIBLE_PULP_FIXTURE_URL = urljoin(ANSIBLE_GALAXY_URL, NAMESPACE_PULP)

ANSIBLE_FIXTURE_CONTENT_SUMMARY = {
ANSIBLE_ROLE_CONTENT_NAME: 5,
ANSIBLE_ROLE_NAME: 5,
ANSIBLE_ROLE_VERSION_NAME: 5,
}
ANSIBLE_FIXTURE_COUNT = 5

ANSIBLE_PULP_FIXTURE_CONTENT_SUMMARY = {
ANSIBLE_ROLE_CONTENT_NAME: 3,
ANSIBLE_ROLE_VERSION_NAME: 3,
}
ANSIBLE_PULP_FIXTURE_COUNT = 3

0 comments on commit c9f3eec

Please sign in to comment.
You can’t perform that action at this time.