Use DeclarativeVersion
This shows pulp_file being used with DeclarativeVersion, which is powered by the Stages API.

Added:
* a coroutine that performs metadata fetching and reports progress on
  the metadata being processed.

Removed:
* differencing code since that is handled by DeclarativeVersion now
Brian Bouterse committed Jul 5, 2018
1 parent 5c16d67 commit c23bec0
174 changes: 26 additions & 148 deletions pulp_file/app/tasks/synchronizing.py
@@ -1,20 +1,14 @@
+import asyncio
 import logging
 import os
 
-from collections import namedtuple
 from gettext import gettext as _
 from urllib.parse import urlparse, urlunparse
 
-from django.db.models import Q
-
-from pulpcore.plugin.models import Artifact, RepositoryVersion, Repository
-from pulpcore.plugin.changeset import (
-    BatchIterator,
-    ChangeSet,
-    PendingArtifact,
-    PendingContent,
-    SizedIterable)
-from pulpcore.plugin.tasking import WorkingDirectory
+from pulpcore.plugin.models import (
+    Artifact, ProgressBar, RepositoryVersion, Repository
+)
+from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, DeclarativeVersion
 
 from pulp_file.app.models import FileContent, FileRemote
 from pulp_file.manifest import Manifest
@@ -23,13 +17,6 @@
 log = logging.getLogger(__name__)
 
 
-# The natural key.
-Key = namedtuple('Key', ('relative_path', 'digest'))
-
-# The set of Key to be added and removed.
-Delta = namedtuple('Delta', ('additions', 'removals'))
-
-
 def synchronize(remote_pk, repository_pk):
     """
     Create a new version of the repository that is synchronized with the remote
@@ -44,148 +31,39 @@ def synchronize(remote_pk, repository_pk):
     """
     remote = FileRemote.objects.get(pk=remote_pk)
     repository = Repository.objects.get(pk=repository_pk)
-    base_version = RepositoryVersion.latest(repository)
 
     if not remote.url:
-        raise ValueError(_('An remote must have a url specified to synchronize.'))
+        raise ValueError(_('A remote must have a url specified to synchronize.'))
 
-    with WorkingDirectory():
-        with RepositoryVersion.create(repository) as new_version:
-            log.info(
-                _('Synchronizing: repository=%(r)s remote=%(p)s'),
-                {
-                    'r': repository.name,
-                    'p': remote.name
-                })
-            manifest = fetch_manifest(remote)
-            content = fetch_content(base_version)
-            delta = find_delta(manifest, content)
-            additions = build_additions(remote, manifest, delta)
-            removals = build_removals(base_version, delta)
-            changeset = ChangeSet(
-                remote=remote,
-                repository_version=new_version,
-                additions=additions,
-                removals=removals)
-            for report in changeset.apply():
-                if not log.isEnabledFor(logging.DEBUG):
-                    continue
-                log.debug(
-                    _('Applied: repository=%(r)s remote=%(p)s change:%(c)s'),
-                    {
-                        'r': repository.name,
-                        'p': remote.name,
-                        'c': report,
-                    })
+    out_q = asyncio.Queue(maxsize=100)  # restricts the number of content units in memory
+    asyncio.ensure_future(fetch_manifest(remote, out_q))  # Schedule the "fetching" stage

gmbnomis (Contributor) commented on Jul 13, 2018:
I think the first stage future should not be scheduled by the plugin (and if it is scheduled by the plugin, it should not be scheduled like this):

If fetch_manifest throws an exception, there is nothing that will catch it (Python usually emits a warning message when it destroys a future that has a stored exception). What's worse: fetch_manifest will not generate the None entry that signals completion in this case. Thus, the remainder of the pipeline will wait forever (unless Python is able to detect that no progress is being made(?)).

IMHO, the easiest solution for this is to make the future part of the asyncio.gather call in pulpcore. Using default parameters, gather immediately propagates exceptions. (I am not sure what happens to the remaining futures, though. Maybe one needs to cancel them.)

Additionally, I think the plugin writer should not create the first queue. The creation of the queues should be part of the pipeline (the type of the queue might change in the future, for example).

Thus, I think the plugin API for the first stage could be:

  • a coroutine function taking a queue, e.g. async def some_function(out_q), which is passed to DeclarativeVersion: DeclarativeVersion(some_function, repository).create(). DeclarativeVersion then creates all the queues and passes the respective queue as a parameter to the function when creating the future for it. Or:
  • a class with an abstract coroutine method async def gen_declarative_content(self, out_q) that the plugin writer will need to implement.

The advantage of the latter is that it is easier to store additional parameters (like remote) in the class case (the coroutine function case would probably need to create a partial function). Both options are sketched below.
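For illustration, a small runnable sketch of the first option. Everything in it (the create() stand-in for DeclarativeVersion.create(), the drain() stage, the example URL) is hypothetical wiring from this discussion, not pulpcore API:

```python
import asyncio
from functools import partial


async def fetch_manifest(remote, out_q):
    # Stand-in for real work: emit items, then the None completion sentinel.
    await out_q.put('entry from %s' % remote)
    await out_q.put(None)


async def drain(in_q):
    # Minimal downstream stage: consume until the sentinel arrives.
    while await in_q.get() is not None:
        pass


def create(first_stage):
    """Stand-in for DeclarativeVersion.create(): the pipeline owns the queue."""
    out_q = asyncio.Queue(maxsize=100)
    loop = asyncio.get_event_loop()
    # gather() propagates a first-stage exception instead of orphaning it.
    loop.run_until_complete(asyncio.gather(first_stage(out_q), drain(out_q)))


# Extra parameters such as `remote` are bound with functools.partial.
create(partial(fetch_manifest, 'http://example.com/PULP_MANIFEST'))
```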

bmbouter (Member) replied on Jul 22, 2018:

@gmbnomis I agree completely. I've gone with the second option, so additional commits on this branch introduce a class called FirstStage. I'm going to open a PR this week for the entire work. I'll link you to it to hopefully get more input. Thanks for the comments!
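A minimal runnable sketch of that class-based shape follows; FirstStage, gen_declarative_content, and the toy pipeline() are hypothetical names from the discussion above, not the API this commit ships. It also demonstrates the failure mode gmbnomis describes and how asyncio.gather() surfaces it:

```python
import asyncio


class FirstStage:
    """Abstract first stage as proposed above (hypothetical API)."""

    def __init__(self, remote):
        self.remote = remote  # extra parameters live on the instance

    async def gen_declarative_content(self, out_q):
        raise NotImplementedError


class BrokenFirstStage(FirstStage):
    async def gen_declarative_content(self, out_q):
        # Fails before ever putting the None sentinel on the queue.
        raise RuntimeError('manifest download failed')


async def pipeline(first_stage):
    out_q = asyncio.Queue(maxsize=100)  # queue creation belongs to the pipeline

    async def consumer(in_q):
        while await in_q.get() is not None:  # None signals completion
            pass

    # gather() re-raises the stage's exception at this await point; a bare
    # ensure_future() would store it unretrieved while consumer() waits forever.
    await asyncio.gather(first_stage.gen_declarative_content(out_q), consumer(out_q))


try:
    asyncio.get_event_loop().run_until_complete(pipeline(BrokenFirstStage(remote=None)))
except RuntimeError as err:
    print('pipeline failed:', err)  # remaining stage tasks may still need cancelling
```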

+    DeclarativeVersion(out_q, repository).create()
 
 
-def fetch_manifest(remote):
+async def fetch_manifest(remote, out_q):
     """
     Fetch (download) the manifest.
 
     Args:
-        remote (FileRemote): An remote.
+        remote (FileRemote): The remote data to be used when syncing
+        out_q (asyncio.Queue): The out_q to send DeclarativeContent objects to
     """
-    downloader = remote.get_downloader(remote.url)
-    downloader.fetch()
-    return Manifest(downloader.path)
-
-
-def fetch_content(base_version):
-    """
-    Fetch the FileContent contained in the (base) repository version.
-
-    Args:
-        base_version (RepositoryVersion): A repository version.
-
-    Returns:
-        set: A set of Key contained in the (base) repository version.
-    """
-    content = set()
-    if base_version:
-        for file in FileContent.objects.filter(pk__in=base_version.content):
-            key = Key(relative_path=file.relative_path, digest=file.digest)
-            content.add(key)
-    return content
-
-
-def find_delta(manifest, content, mirror=True):
-    """
-    Find the content that needs to be added and removed.
-
-    Args:
-        manifest (Manifest): The downloaded manifest.
-        content: (set): The set of natural keys for content contained in the (base)
-            repository version.
-        mirror (bool): The delta should include changes needed to ensure the content
-            contained within the pulp repository is exactly the same as the
-            content contained within the remote repository.
-
-    Returns:
-        Delta: The set of Key to be added and removed.
-    """
-    remote_content = set(
-        [
-            Key(relative_path=e.relative_path, digest=e.digest) for e in manifest.read()
-        ])
-    additions = (remote_content - content)
-    if mirror:
-        removals = (content - remote_content)
-    else:
-        removals = set()
-    return Delta(additions, removals)
-
-
-def build_additions(remote, manifest, delta):
-    """
-    Build the content to be added.
-
-    Args:
-        remote (FileRemote): An remote.
-        manifest (Manifest): The downloaded manifest.
-        delta (Delta): The set of Key to be added and removed.
-
-    Returns:
-        SizedIterable: The PendingContent to be added to the repository.
-    """
-    def generate():
-        for entry in manifest.read():
-            key = Key(relative_path=entry.relative_path, digest=entry.digest)
-            if key not in delta.additions:
-                continue
-            path = os.path.join(root_dir, entry.relative_path)
-            url = urlunparse(parsed_url._replace(path=path))
-            file = FileContent(relative_path=entry.relative_path, digest=entry.digest)
-            artifact = Artifact(size=entry.size, sha256=entry.digest)
-            content = PendingContent(
-                file,
-                artifacts={
-                    PendingArtifact(artifact, url, entry.relative_path)
-                })
-            yield content
-    parsed_url = urlparse(remote.url)
-    root_dir = os.path.dirname(parsed_url.path)
-    return SizedIterable(generate(), len(delta.additions))
-
-
-def build_removals(base_version, delta):
-    """
-    Build the content to be removed.
-
-    Args:
-        base_version (RepositoryVersion): The base repository version.
-        delta (Delta): The set of Key to be added and removed.
-
-    Returns:
-        SizedIterable: The FileContent to be removed from the repository.
-    """
-    def generate():
-        for removals in BatchIterator(delta.removals):
-            q = Q()
-            for key in removals:
-                q |= Q(filecontent__relative_path=key.relative_path, filecontent__digest=key.digest)
-            q_set = base_version.content.filter(q)
-            q_set = q_set.only('id')
-            for file in q_set:
-                yield file
-    return SizedIterable(generate(), len(delta.removals))
+    with ProgressBar(message='Downloading Metadata') as pb:
+        parsed_url = urlparse(remote.url)
+        root_dir = os.path.dirname(parsed_url.path)
+        downloader = remote.get_downloader(remote.url)
+        result = await downloader.run()
+        pb.increment()
+
+    with ProgressBar(message='Parsing Metadata') as pb:
+        manifest = Manifest(result.path)
+        for entry in manifest.read():
+            path = os.path.join(root_dir, entry.relative_path)
+            url = urlunparse(parsed_url._replace(path=path))
+            file = FileContent(relative_path=entry.relative_path, digest=entry.digest)
+            artifact = Artifact(size=entry.size, sha256=entry.digest)
+            da = DeclarativeArtifact(artifact, url, entry.relative_path, remote)
+            dc = DeclarativeContent(content=file, d_artifacts=[da])
+            pb.increment()
+            await out_q.put(dc)
+    await out_q.put(None)
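Aside: the two ProgressBar blocks in the new fetch_manifest() use pulpcore's context-manager pattern for progress reporting. A minimal sketch of just that pattern (the message string mirrors the diff; the loop body is a stand-in, and this only runs inside a Pulp task where the database is available):

```python
from pulpcore.plugin.models import ProgressBar

# Entering the context persists the bar, increment() bumps its completed
# count for users watching the task, and exiting finalizes its state.
with ProgressBar(message='Parsing Metadata') as pb:
    for entry in range(3):  # stand-in for manifest.read()
        pb.increment()
```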
