Introducing FirstStage
@gmbnomis suggested some changes, so this PR implements them.

pulp/pulp_file@c23bec0#r29701942

It has a bug in it, which I need to resolve.
It also doesn't save the files correctly. I am still making Artifact
save compatible with bulk_create().

https://pulp.plan.io/issues/3844
re #3844
Brian Bouterse committed Jul 31, 2018
1 parent 54c628c commit 4523f5f
Showing 1 changed file with 98 additions and 42 deletions.
140 changes: 98 additions & 42 deletions plugin/pulpcore/plugin/stages.py
@@ -111,6 +111,32 @@ async def create_pipeline(stages, in_q=None, maxsize=100):
await asyncio.gather(*futures)
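
For orientation, here is a minimal sketch of how a pipeline of this shape can be wired together. The body of `create_pipeline()` is collapsed in this diff, so the queue-chaining below is an assumption modeled on its signature, not the code from this commit:

    import asyncio

    async def create_pipeline_sketch(stages, in_q=None, maxsize=100):
        # Hypothetical reconstruction: chain each stage to the next with a
        # bounded queue, so at most ~maxsize items sit between any two stages.
        futures = []
        for stage in stages:
            out_q = asyncio.Queue(maxsize=maxsize)
            futures.append(stage(in_q, out_q))
            in_q = out_q  # this stage's output queue feeds the next stage
        await asyncio.gather(*futures)

With `in_q=None` by default, the first stage in the list receives no usable input queue, which matches the FirstStage interface introduced below.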


class FirstStage:
    """
    A class plugin writers can subclass and use as the first stage of a DeclarativeVersion
    pipeline.

    To use this class, the plugin writer needs to:

    1. Subclass it and implement the `gen_declarative_content()` method.
    2. Pass the instantiated subclass to `DeclarativeVersion`.
    """

    async def gen_declarative_content(self, in_q, out_q):
        """
        A Stages API compatible coroutine for `DeclarativeVersion` to use as the first stage.

        This must be implemented on the subclass.

        Args:
            in_q (asyncio.Queue): Unused, because the first stage does not read from an input
                queue.
            out_q (asyncio.Queue): The queue to put `DeclarativeContent` into; put `None` last
                to signal the end of the stream.
        """
        raise NotImplementedError('A plugin writer needs to implement this')
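
As a quick illustration, a subclass only needs to fill in `gen_declarative_content()`. `ExampleContent` and the hard-coded entry below are hypothetical stand-ins; a fuller example appears in the DeclarativeVersion docstring further down:

    class ExampleFirstStage(FirstStage):

        def __init__(self, remote):
            self.remote = remote

        async def gen_declarative_content(self, in_q, out_q):
            # in_q is unused; emit one DeclarativeContent per unit, then None.
            for entry in [{'relative_path': 'a.txt'}]:  # stand-in for parsed metadata
                unit = ExampleContent(**entry)
                await out_q.put(DeclarativeContent(content=unit, d_artifacts=[]))
            await out_q.put(None)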


async def query_existing_artifacts(in_q, out_q):
    """
    Stages API stage replacing DeclarativeArtifact.artifact with already-saved Artifacts.
@@ -184,7 +210,7 @@ async def query_existing_artifacts(in_q, out_q):
    await out_q.put(None)
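
The stage's body is collapsed above; as a rough sketch of the pattern only (the digest-based lookup is an assumption, not the code from this commit):

    async def query_existing_artifacts_sketch(in_q, out_q):
        # For each declared artifact, swap in an already-saved Artifact row
        # when one with the same sha256 digest exists.
        while True:
            declarative_content = await in_q.get()
            if declarative_content is None:
                break
            for declarative_artifact in declarative_content.d_artifacts:
                existing = Artifact.objects.filter(
                    sha256=declarative_artifact.artifact.sha256
                ).first()
                if existing is not None:
                    declarative_artifact.artifact = existing
            await out_q.put(declarative_content)
        await out_q.put(None)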


def artifact_downloader_factory(max_concurrent_downloads=1):
def artifact_downloader_factory(max_concurrent_downloads=100):
"""
A factory returning a Stages API stage to download missing Artifact files, but don't call save()
Expand Down Expand Up @@ -269,7 +295,10 @@ async def return_content_for_downloader(c):
            content = results[-1]
            for declarative_artifact in content.d_artifacts:
                if declarative_artifact.artifact._state.adding:
                    new_artifact = Artifact(**download_result.artifact_attributes)
                    new_artifact = Artifact(
                        **download_result.artifact_attributes,
                        file=download_result.path
                    )
                    declarative_artifact.artifact = new_artifact
            pb.done = pb.done + len(content.d_artifacts)
            outstanding_downloads = outstanding_downloads - len(content.d_artifacts)
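
The factory's full body is collapsed in this diff. A sketch of the overall shape, assuming a semaphore bound on concurrency and deliberately simplified to download per content unit (the real stage overlaps downloads across units and drives a progress bar):

    import asyncio

    def artifact_downloader_sketch(max_concurrent_downloads=100):
        async def artifact_downloader(in_q, out_q):
            semaphore = asyncio.Semaphore(max_concurrent_downloads)

            async def download(declarative_artifact):
                # Assumed API, mirroring the remote.get_downloader(url) call
                # shown in the docstring examples below.
                async with semaphore:
                    downloader = declarative_artifact.remote.get_downloader(
                        declarative_artifact.url
                    )
                    return await downloader.run()

            while True:
                declarative_content = await in_q.get()
                if declarative_content is None:
                    break
                for declarative_artifact in declarative_content.d_artifacts:
                    if declarative_artifact.artifact._state.adding:
                        result = await download(declarative_artifact)
                        declarative_artifact.artifact = Artifact(
                            **result.artifact_attributes, file=result.path
                        )
                await out_q.put(declarative_content)
            await out_q.put(None)

        return artifact_downloader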
@@ -296,22 +325,43 @@ async def artifact_saver(in_q, out_q):
    sent to `out_q` after all of its `~pulpcore.plugin.stages.DeclarativeArtifact` objects have
    been handled.

    This stage batches items from `in_q` and saves their Artifacts with `bulk_create()`.

    Args:
        in_q: `~pulpcore.plugin.stages.DeclarativeContent`
        out_q: `~pulpcore.plugin.stages.DeclarativeContent`

    Returns:
        The artifact_saver stage as a coroutine to be included in a pipeline.
    """
    while True:
        declarative_content = await in_q.get()
        if declarative_content is None:
            break
        for declarative_artifact in declarative_content.d_artifacts:
            declarative_artifact.artifact.save()
        await out_q.put(declarative_content)
    shutdown = False
    declarative_content_list = []
    while not shutdown:
        try:
            content = in_q.get_nowait()
        except asyncio.QueueEmpty:
            if not declarative_content_list:
                content = await in_q.get()
                declarative_content_list.append(content)
                continue
        else:
            declarative_content_list.append(content)
            continue

        artifacts_to_save = []
        for declarative_content in declarative_content_list:
            if declarative_content is None:
                shutdown = True
                break
            for declarative_artifact in declarative_content.d_artifacts:
                pass  # TODO move the file into place
                artifacts_to_save.append(declarative_artifact.artifact)

        Artifact.objects.bulk_create(artifacts_to_save)

        for declarative_content in declarative_content_list:
            if declarative_content is None:
                continue
            await out_q.put(declarative_content)
        declarative_content_list = []  # reset the batch so items are not saved twice

    await out_q.put(None)
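
The drain-then-batch pattern above (grab with get_nowait() until the queue is empty, block only when the batch is empty) could be factored into a helper. A sketch, not part of this commit:

    async def drain_batch(in_q):
        # Block for at least one item, then collect everything else already
        # waiting; the returned batch ends with None on pipeline shutdown.
        batch = [await in_q.get()]
        while batch[-1] is not None:
            try:
                batch.append(in_q.get_nowait())
            except asyncio.QueueEmpty:
                break
        return batch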


@@ -599,14 +649,13 @@ async def end_stage(in_q, out_q):

class DeclarativeVersion:

    def __init__(self, in_q, repository, sync_mode='mirror'):
    def __init__(self, first_stage, repository, sync_mode='mirror'):
        """
        A pipeline that creates a new RepositoryVersion from a stream of DeclarativeContent
        objects.

        The plugin writer needs to create a `~pulpcore.plugin.stages.DeclarativeContent` object
        for each Content unit that should exist in the new RepositoryVersion. Each
        `~pulpcore.plugin.stages.DeclarativeContent` object is put into the pipeline via the
        `in_q` object.
        The plugin writer needs to specify a first_stage that will create a
        `~pulpcore.plugin.stages.DeclarativeContent` object for each Content unit that should
        exist in the new RepositoryVersion.

        The pipeline stages perform the following steps:
@@ -619,35 +668,41 @@ def __init__(self, in_q, repository, sync_mode='mirror'):
        7. Associate all content units with the new repository version.
        8. Unassociate any content units not declared in the stream (only when sync_mode='mirror')
        To do this, the plugin writer should create a coroutine which downloads metadata, creates
        corresponding DeclarativeContent objects, and puts them into the `asyncio.Queue` to send
        them down the pipeline. For example:

        >>> async def fetch_metadata(remote, out_q):
        >>>     downloader = remote.get_downloader(remote.url)
        >>>     result = await downloader.run()
        >>>     for entry in read_my_metadata_file_somehow(result.path):
        >>>         unit = MyContent(entry)  # make the content unit in memory-only
        >>>         artifact = Artifact(entry)  # make Artifact in memory-only
        >>>         da = DeclarativeArtifact(artifact, url, entry.relative_path, remote)
        >>>         dc = DeclarativeContent(content=unit, d_artifacts=[da])
        >>>         await out_q.put(dc)
        >>>     await out_q.put(None)
        To use your coroutine with the pipeline you have to:

        1. Create the asyncio.Queue the pipeline should listen on
        2. Schedule your coroutine using `ensure_future()`
        3. Create the `DeclarativeVersion` and call its `create()` method

        To do this, the plugin writer should subclass the FirstStage class and define its
        `gen_declarative_content()` coroutine. This coroutine should download metadata, create
        the corresponding DeclarativeContent objects, and put them into the `asyncio.Queue` to
        send them down the pipeline. For example:
        >>> class MyFirstStage(FirstStage):
        >>>
        >>>     def __init__(self, remote):
        >>>         self.remote = remote
        >>>
        >>>     async def gen_declarative_content(self, in_q, out_q):
        >>>         downloader = self.remote.get_downloader(self.remote.url)
        >>>         result = await downloader.run()
        >>>         for entry in read_my_metadata_file_somehow(result.path):
        >>>             unit = MyContent(entry)  # make the content unit in memory-only
        >>>             artifact = Artifact(entry)  # make Artifact in memory-only
        >>>             da = DeclarativeArtifact(artifact, url, entry.relative_path, self.remote)
        >>>             dc = DeclarativeContent(content=unit, d_artifacts=[da])
        >>>             await out_q.put(dc)
        >>>         await out_q.put(None)
        To use your first stage with the pipeline you have to instantiate the subclass and pass
        it to `DeclarativeVersion`:

        1. Create an instance of the FirstStage subclass
        2. Create the `DeclarativeVersion` instance, passing the FirstStage subclass instance to it
        3. Call the `create()` method on your `DeclarativeVersion` instance
        Here is an example:

        >>> out_q = asyncio.Queue(maxsize=100)  # restricts the number of content units in memory
        >>> asyncio.ensure_future(fetch_metadata(remote, out_q))  # schedule the "fetching" stage
        >>> DeclarativeVersion(out_q, repository).create()
        >>> first_stage = FileFirstStage(remote)
        >>> DeclarativeVersion(first_stage, repository).create()
        Args:
            in_q (asyncio.Queue): The queue to get DeclarativeContent from
            first_stage (FirstStage): The first stage to receive `DeclarativeContent` from
            repository (Repository): The repository receiving the new version
            sync_mode (str): 'mirror' removes content units from the RepositoryVersion that are
                not queued to DeclarativeVersion. 'additive' only adds content units queued to
@@ -660,7 +715,7 @@ def __init__(self, in_q, repository, sync_mode='mirror'):
        if sync_mode != 'mirror' and sync_mode != 'additive':
            msg = _("'sync_mode' must either be 'mirror' or 'additive' not '{sync_mode}'")
            raise ValueError(msg.format(sync_mode=sync_mode))
        self.in_q = in_q
        self.first_stage = first_stage
        self.repository = repository
        self.sync_mode = sync_mode

@@ -672,6 +727,7 @@ def create(self):
        with RepositoryVersion.create(self.repository) as new_version:
            loop = asyncio.get_event_loop()
            stages = [
                self.first_stage.gen_declarative_content,
                query_existing_artifacts, artifact_downloader_factory(), artifact_saver,
                query_existing_content_units, content_unit_saver,
                content_unit_association_factory(new_version)
@@ -680,5 +736,5 @@
                stages.append(end_stage)
            elif self.sync_mode == 'mirror':
                stages.extend([content_unit_unassociation_factory(new_version), end_stage])
            pipeline = create_pipeline(stages, self.in_q)
            pipeline = create_pipeline(stages)
            loop.run_until_complete(pipeline)
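
Putting it together, a plugin's sync task would now look roughly like this. FileFirstStage and the surrounding sync() signature are assumptions based on the docstring example, not code from this commit:

    def sync(remote, repository):
        first_stage = FileFirstStage(remote)
        DeclarativeVersion(first_stage, repository, sync_mode='mirror').create()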
