Updating stages w/ some clearer names
Brian Bouterse committed Aug 2, 2018
1 parent 05bb5f8 commit 082e1e4
Showing 3 changed files with 43 additions and 43 deletions.
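
All three files rename the list that buffers items pulled from in_q (declarative_content, declarative_content_list, incoming_content) to the clearer batch or unhandled_content. The loop they share drains the queue without blocking, only awaits when the batch is empty, and treats None as the shutdown sentinel. A minimal sketch of that shared pattern using the new name (batching_stage is an illustrative name, not part of this commit):

    import asyncio


    async def batching_stage(in_q, out_q):
        """Drain in_q into a batch, handle the whole batch, then emit downstream."""
        batch = []
        shutdown = False
        while True:
            try:
                content = in_q.get_nowait()
            except asyncio.QueueEmpty:
                if not batch:
                    # Nothing buffered yet, so block until at least one item arrives.
                    content = await in_q.get()
                    batch.append(content)
                    continue
            else:
                batch.append(content)
                continue

            # in_q is momentarily empty and the batch is non-empty: process it.
            for content in batch:
                if content is None:       # None is the shutdown sentinel
                    shutdown = True
                    continue
                await out_q.put(content)  # a real stage would do its batched work here
            batch = []
            if shutdown:
                break
        await out_q.put(None)             # forward the sentinel to the next stage

Batching this way lets the database-facing stages below issue one query or one bulk_create per batch instead of one per unit.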
44 changes: 22 additions & 22 deletions plugin/pulpcore/plugin/stages/artifact_stages.py

@@ -38,22 +38,22 @@ async def query_existing_artifacts(in_q, out_q):
     Returns:
         The query_existing_artifacts stage as a coroutine to be included in a pipeline.
     """
-    declarative_content = []
+    batch = []
     shutdown = False
     while True:
         try:
             content = in_q.get_nowait()
         except asyncio.QueueEmpty:
-            if not declarative_content:
+            if not batch:
                 content = await in_q.get()
-                declarative_content.append(content)
+                batch.append(content)
                 continue
         else:
-            declarative_content.append(content)
+            batch.append(content)
             continue

         all_artifacts_q = Q(pk=None)
-        for content in declarative_content:
+        for content in batch:
             if content is None:
                 shutdown = True
                 continue
@@ -68,19 +68,19 @@ async def query_existing_artifacts(in_q, out_q):
                 all_artifacts_q |= one_artifact_q

         for artifact in Artifact.objects.filter(all_artifacts_q):
-            for content in declarative_content:
+            for content in batch:
                 if content is None:
                     continue
                 for declarative_artifact in content.d_artifacts:
                     for digest_name in artifact.DIGEST_FIELDS:
                         digest_value = getattr(declarative_artifact.artifact, digest_name)
                         if digest_value and digest_value == getattr(artifact, digest_name):
                             declarative_artifact.artifact = artifact
-        for content in declarative_content:
+        for content in batch:
             if content is None:
                 continue
             await out_q.put(content)
-        declarative_content = []
+        batch = []
         if shutdown:
             break
     await out_q.put(None)
@@ -135,7 +135,7 @@ async def stage(self, in_q, out_q):
         """
         pending = set()
-        incoming_content = []
+        unhandled_content = []
         outstanding_downloads = 0
         shutdown = False
         saturated = False
@@ -145,15 +145,15 @@
             try:
                 content = in_q.get_nowait()
             except asyncio.QueueEmpty:
-                if not incoming_content and not shutdown and not pending:
+                if not unhandled_content and not shutdown and not pending:
                     content = await in_q.get()
-                    incoming_content.append(content)
+                    unhandled_content.append(content)
                     continue
             else:
-                incoming_content.append(content)
+                unhandled_content.append(content)
                 continue

-            for i, content in enumerate(incoming_content):
+            for i, content in enumerate(unhandled_content):
                 if content is None:
                     shutdown = True
                     continue
@@ -177,10 +177,10 @@ async def return_content_for_downloader(c):
                 pending.add(asyncio.gather(*downloaders_for_content))
                 if outstanding_downloads > self.max_concurrent_downloads:
                     saturated = True
-                    incoming_content = incoming_content[i + 1:]  # remove handled content
+                    unhandled_content = unhandled_content[i + 1:]  # remove handled content
                     break
             else:
-                incoming_content = []
+                unhandled_content = []

             if pending:
                 done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
@@ -233,21 +233,21 @@ async def artifact_saver(in_q, out_q):
         The artifact_saver stage as a coroutine to be included in a pipeline.
     """
     shutdown = False
-    declarative_content_list = []
+    batch = []
     while not shutdown:
         try:
             content = in_q.get_nowait()
         except asyncio.QueueEmpty:
-            if not declarative_content_list:
+            if not batch:
                 content = await in_q.get()
-                declarative_content_list.append(content)
+                batch.append(content)
                 continue
         else:
-            declarative_content_list.append(content)
+            batch.append(content)
             continue

         artifacts_to_save = []
-        for declarative_content in declarative_content_list:
+        for declarative_content in batch:
             if declarative_content is None:
                 shutdown = True
                 break
@@ -264,11 +264,11 @@
         if artifacts_to_save:
             Artifact.objects.bulk_create(artifacts_to_save)

-        for declarative_content in declarative_content_list:
+        for declarative_content in batch:
             if declarative_content is None:
                 continue
             await out_q.put(declarative_content)

-        declarative_content_list = []
+        batch = []

     await out_q.put(None)
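
Each docstring above describes its function as a coroutine to be included in a pipeline. The pipeline runner itself is not part of this diff; a rough sketch of how such stages could be chained with asyncio queues (run_pipeline and its signature are assumptions for illustration, not pulpcore's actual API):

    import asyncio


    async def run_pipeline(seed_q, stages, maxsize=100):
        """Connect stage coroutines with queues and run them concurrently.

        Hypothetical helper for illustration; not the pulpcore plugin API.
        """
        in_q = seed_q
        tasks = []
        for stage in stages:
            out_q = asyncio.Queue(maxsize=maxsize)
            tasks.append(asyncio.ensure_future(stage(in_q, out_q)))
            in_q = out_q
        await asyncio.gather(*tasks)
        return in_q  # the last out_q; its final item is the None sentinel

Because every stage forwards the None sentinel it receives, the whole chain drains and shuts down cleanly once the seed queue is exhausted.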
12 changes: 6 additions & 6 deletions plugin/pulpcore/plugin/stages/association_stages.py

@@ -52,22 +52,22 @@ async def stage(self, in_q, out_q):
             :class:`~pulpcore.plugin.models.Content` type.
         """
         with ProgressBar(message='Associating Content') as pb:
-            declarative_content_list = []
+            batch = []
             shutdown = False
             while True:
                 try:
                     declarative_content = in_q.get_nowait()
                 except asyncio.QueueEmpty:
-                    if not declarative_content_list and not shutdown:
+                    if not batch and not shutdown:
                         declarative_content = await in_q.get()
-                        declarative_content_list.append(declarative_content)
+                        batch.append(declarative_content)
                         continue
                 else:
-                    declarative_content_list.append(declarative_content)
+                    batch.append(declarative_content)
                     continue

                 content_q_by_type = defaultdict(lambda: Q(pk=None))
-                for declarative_content in declarative_content_list:
+                for declarative_content in batch:
                     if declarative_content is None:
                         shutdown = True
                         continue
@@ -88,7 +88,7 @@ async def stage(self, in_q, out_q):

                 if shutdown:
                     break
-                declarative_content_list = []
+                batch = []

             for unit_type, ids in self.unit_keys_by_type.items():
                 if ids:
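
Both this stage and the content-unit stages below collect one OR'd Q object per content model so an entire batch can be checked against the database with a single query per type. A sketch of that lookup-building step (build_unit_queries and the key_fields mapping are illustrative assumptions; the real code derives the unit key from the model itself):

    from collections import defaultdict

    from django.db.models import Q


    def build_unit_queries(batch, key_fields):
        """Return {model_class: Q} covering every unit key present in the batch.

        key_fields maps each model class to the field names that form its unit key;
        this mapping is an assumption made for the sketch.
        """
        content_q_by_type = defaultdict(lambda: Q(pk=None))  # Q(pk=None) matches nothing
        for declarative_content in batch:
            if declarative_content is None:  # shutdown sentinel
                continue
            unit = declarative_content.content
            unit_key = {name: getattr(unit, name) for name in key_fields[type(unit)]}
            content_q_by_type[type(unit)] |= Q(**unit_key)
        return content_q_by_type

model_type.objects.filter(content_q_by_type[model_type]) then fetches every already-saved unit for the batch in one round trip per model.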
30 changes: 15 additions & 15 deletions plugin/pulpcore/plugin/stages/content_unit_stages.py

@@ -37,22 +37,22 @@ async def query_existing_content_units(in_q, out_q):
     Returns:
         The query_existing_content_units stage as a coroutine to be included in a pipeline.
     """
-    declarative_content_list = []
+    batch = []
     shutdown = False
     while True:
         try:
             content = in_q.get_nowait()
         except asyncio.QueueEmpty:
-            if not declarative_content_list:
+            if not batch:
                 content = await in_q.get()
-                declarative_content_list.append(content)
+                batch.append(content)
                 continue
         else:
-            declarative_content_list.append(content)
+            batch.append(content)
             continue

         content_q_by_type = defaultdict(lambda: Q(pk=None))
-        for declarative_content in declarative_content_list:
+        for declarative_content in batch:
             if declarative_content is None:
                 shutdown = True
                 continue
@@ -63,7 +63,7 @@

         for model_type in content_q_by_type.keys():
             for result in model_type.objects.filter(content_q_by_type[model_type]):
-                for declarative_content in declarative_content_list:
+                for declarative_content in batch:
                     if declarative_content is None:
                         continue
                     not_same_unit = False
@@ -74,9 +74,9 @@
                     if not_same_unit:
                         continue
                     declarative_content.content = result
-        for declarative_content in declarative_content_list:
+        for declarative_content in batch:
             await out_q.put(declarative_content)
-        declarative_content_list = []
+        batch = []
         if shutdown:
             break
     await out_q.put(None)
@@ -111,26 +111,26 @@ async def content_unit_saver(in_q, out_q):
     Returns:
         The content_unit_saver stage as a coroutine to be included in a pipeline.
     """
-    declarative_content_list = []
+    batch = []
     shutdown = False
     while True:
         try:
             declarative_content = in_q.get_nowait()
         except asyncio.QueueEmpty:
-            if not declarative_content_list and not shutdown:
+            if not batch and not shutdown:
                 declarative_content = await in_q.get()
-                declarative_content_list.append(declarative_content)
+                batch.append(declarative_content)
                 continue
         else:
-            declarative_content_list.append(declarative_content)
+            batch.append(declarative_content)
             continue

         content_artifact_bulk = []
         remote_artifact_bulk = []
         remote_artifact_map = {}

         with transaction.atomic():
-            for declarative_content in declarative_content_list:
+            for declarative_content in batch:
                 if declarative_content is None:
                     shutdown = True
                     continue
@@ -166,11 +166,11 @@

             RemoteArtifact.objects.bulk_create(remote_artifact_bulk)

-        for declarative_content in declarative_content_list:
+        for declarative_content in batch:
             if declarative_content is None:
                 continue
             await out_q.put(declarative_content)
         if shutdown:
             break
-        declarative_content_list = []
+        batch = []
     await out_q.put(None)
