Skip to content

Commit

Permalink
Updating to accomidate FirstStage
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Bouterse committed Jul 31, 2018
1 parent b653889 commit 55d76df
Showing 1 changed file with 40 additions and 29 deletions.
69 changes: 40 additions & 29 deletions pulp_file/app/tasks/synchronizing.py
Expand Up @@ -8,7 +8,9 @@
from pulpcore.plugin.models import (
Artifact, ProgressBar, RepositoryVersion, Repository
)
from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, DeclarativeVersion
from pulpcore.plugin.stages import (
DeclarativeArtifact, DeclarativeContent, DeclarativeVersion, FirstStage
)

from pulp_file.app.models import FileContent, FileRemote
from pulp_file.manifest import Manifest
Expand All @@ -35,35 +37,44 @@ def synchronize(remote_pk, repository_pk):
if not remote.url:
raise ValueError(_('A remote must have a url specified to synchronize.'))

out_q = asyncio.Queue(maxsize=100) # restricts the number of content units in memory
asyncio.ensure_future(fetch_manifest(remote, out_q)) # Schedule the "fetching" stage
DeclarativeVersion(out_q, repository).create()
first_stage = FileFirstStage(remote)
DeclarativeVersion(first_stage, repository).create()


async def fetch_manifest(remote, out_q):
"""
Fetch (download) the manifest.
class FileFirstStage(FirstStage):

Args:
remote (FileRemote): The remote data to be used when syncing
out_q (asyncio.Queue): The out_q to send DeclarativeContent objects to
"""
with ProgressBar(message='Downloading Metadata') as pb:
parsed_url = urlparse(remote.url)
root_dir = os.path.dirname(parsed_url.path)
downloader = remote.get_downloader(remote.url)
result = await downloader.run()
pb.increment()

with ProgressBar(message='Parsing Metadata') as pb:
manifest = Manifest(result.path)
for entry in manifest.read():
path = os.path.join(root_dir, entry.relative_path)
url = urlunparse(parsed_url._replace(path=path))
file = FileContent(relative_path=entry.relative_path, digest=entry.digest)
artifact = Artifact(size=entry.size, sha256=entry.digest)
da = DeclarativeArtifact(artifact, url, entry.relative_path, remote)
dc = DeclarativeContent(content=file, d_artifacts=[da])
def __init__(self, remote):
"""
Args:
remote (FileRemote): The remote data to be used when syncing
"""
self.remote = remote

async def gen_declarative_content(self, in_q, out_q):
"""
Build and emit `DeclarativeContent` from the Manifest data.
Args:
in_q (asyncio.Queue): Unused because the first stage doesn't read from an input queue.
out_q (asyncio.Queue): The out_q to send `DeclarativeContent` objects to
"""
with ProgressBar(message='Downloading Metadata') as pb:
parsed_url = urlparse(self.remote.url)
root_dir = os.path.dirname(parsed_url.path)
downloader = self.remote.get_downloader(self.remote.url)
result = await downloader.run()
pb.increment()
await out_q.put(dc)
await out_q.put(None)

with ProgressBar(message='Parsing Metadata') as pb:
manifest = Manifest(result.path)
for entry in manifest.read():
path = os.path.join(root_dir, entry.relative_path)
url = urlunparse(parsed_url._replace(path=path))
file = FileContent(relative_path=entry.relative_path, digest=entry.digest)
artifact = Artifact(size=entry.size, sha256=entry.digest)
da = DeclarativeArtifact(artifact, url, entry.relative_path, self.remote)
dc = DeclarativeContent(content=file, d_artifacts=[da])
pb.increment()
await out_q.put(dc)
await out_q.put(None)

0 comments on commit 55d76df

Please sign in to comment.