Commit

Refactor existing stages to use the async batches iterator
gmbnomis committed Sep 20, 2018
1 parent 28ed635 commit 93f4ac2
Showing 3 changed files with 7 additions and 118 deletions.
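Each stage previously drained its input queue by hand and watched for the None end-of-stream sentinel; this commit replaces that boilerplate with a shared `self.batches(in_q)` async iterator. The helper itself is not part of this diff, so the following is only an editorial sketch of the behaviour the deleted code implies (the `Stage` base-class name is an assumption): gather whatever is ready on `in_q` into a batch, strip the sentinel, and stop after it arrives.

import asyncio


class Stage:
    """Illustrative base class; the real batches() helper is defined outside this commit."""

    async def batches(self, in_q):
        shutdown = False
        while not shutdown:
            batch = []
            # Block until at least one item arrives, then drain whatever else is queued.
            batch.append(await in_q.get())
            while True:
                try:
                    batch.append(in_q.get_nowait())
                except asyncio.QueueEmpty:
                    break
            # None is put onto the queue last, so it can only be the final item.
            if batch[-1] is None:
                shutdown = True
                batch = batch[:-1]
            if batch:
                yield batch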
48 changes: 2 additions & 46 deletions plugin/pulpcore/plugin/stages/artifact_stages.py
@@ -45,26 +45,9 @@ async def __call__(self, in_q, out_q):
         Returns:
             The coroutine for this stage.
         """
-        batch = []
-        shutdown = False
-        while True:
-            try:
-                content = in_q.get_nowait()
-            except asyncio.QueueEmpty:
-                if not batch:
-                    content = await in_q.get()
-                    batch.append(content)
-                    continue
-            else:
-                batch.append(content)
-                continue
-
+        async for batch in self.batches(in_q):
             all_artifacts_q = Q(pk=None)
             for content in batch:
-                if content is None:
-                    shutdown = True
-                    continue
-
                 for declarative_artifact in content.d_artifacts:
                     one_artifact_q = Q()
                     for digest_name in declarative_artifact.artifact.DIGEST_FIELDS:
@@ -77,21 +60,14 @@ async def __call__(self, in_q, out_q):
 
             for artifact in Artifact.objects.filter(all_artifacts_q):
                 for content in batch:
-                    if content is None:
-                        continue
                     for declarative_artifact in content.d_artifacts:
                         for digest_name in artifact.DIGEST_FIELDS:
                             digest_value = getattr(declarative_artifact.artifact, digest_name)
                             if digest_value and digest_value == getattr(artifact, digest_name):
                                 declarative_artifact.artifact = artifact
                                 break
             for content in batch:
-                if content is None:
-                    continue
                 await out_q.put(content)
-            batch = []
-            if shutdown:
-                break
         await out_q.put(None)


@@ -259,25 +235,9 @@ async def __call__(self, in_q, out_q):
             The coroutine for this stage.
         """
         storage_backend = DefaultStorage()
-        shutdown = False
-        batch = []
-        while not shutdown:
-            try:
-                content = in_q.get_nowait()
-            except asyncio.QueueEmpty:
-                if not batch:
-                    content = await in_q.get()
-                    batch.append(content)
-                    continue
-            else:
-                batch.append(content)
-                continue
-
+        async for batch in self.batches(in_q):
             artifacts_to_save = []
             for declarative_content in batch:
-                if declarative_content is None:
-                    shutdown = True
-                    break
                 for declarative_artifact in declarative_content.d_artifacts:
                     if declarative_artifact.artifact.pk is None:
                         src_path = str(declarative_artifact.artifact.file)
@@ -292,10 +252,6 @@ async def __call__(self, in_q, out_q):
             Artifact.objects.bulk_create(artifacts_to_save)
 
             for declarative_content in batch:
-                if declarative_content is None:
-                    continue
                 await out_q.put(declarative_content)
 
-            batch = []
-
         await out_q.put(None)
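With the iterator in place, every refactored `__call__` in this commit collapses to the same skeleton. The class below is an editorial condensation rather than code from this commit (the name and the placeholder body are illustrative), reusing the `Stage` sketch above:

class ExampleBatchingStage(Stage):

    async def __call__(self, in_q, out_q):
        async for batch in self.batches(in_q):
            # Each batch is a non-empty list of DeclarativeContent; the None
            # sentinel never appears here, so the per-item checks are gone.
            for declarative_content in batch:
                ...  # bulk query/save/associate work happens once per batch
                await out_q.put(declarative_content)
        # Tell the next stage that this one has finished.
        await out_q.put(None)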
23 changes: 1 addition & 22 deletions plugin/pulpcore/plugin/stages/association_stages.py
@@ -1,4 +1,3 @@
-import asyncio
 from collections import defaultdict
 
 from django.db.models import Q
@@ -52,25 +51,9 @@ async def __call__(self, in_q, out_q):
             The coroutine for this stage.
         """
         with ProgressBar(message='Associating Content') as pb:
-            batch = []
-            shutdown = False
-            while True:
-                try:
-                    declarative_content = in_q.get_nowait()
-                except asyncio.QueueEmpty:
-                    if not batch and not shutdown:
-                        declarative_content = await in_q.get()
-                        batch.append(declarative_content)
-                        continue
-                else:
-                    batch.append(declarative_content)
-                    continue
-
+            async for batch in self.batches(in_q):
                 content_q_by_type = defaultdict(lambda: Q(pk=None))
                 for declarative_content in batch:
-                    if declarative_content is None:
-                        shutdown = True
-                        continue
                     try:
                         unit_key = declarative_content.content.natural_key()
                         self.unit_keys_by_type[type(declarative_content.content)].remove(unit_key)
@@ -86,10 +69,6 @@ async def __call__(self, in_q, out_q):
                     pb.done = pb.done + queryset.count()
                     pb.save()
 
-                if shutdown:
-                    break
-                batch = []
-
         for unit_type, ids in self.unit_keys_by_type.items():
             if ids:
                 units_to_unassociate = Q()
54 changes: 4 additions & 50 deletions plugin/pulpcore/plugin/stages/content_unit_stages.py
@@ -1,4 +1,3 @@
-import asyncio
 from collections import defaultdict
 
 from django.db import transaction
@@ -44,35 +43,16 @@ async def __call__(self, in_q, out_q):
         Returns:
             The coroutine for this stage.
         """
-        batch = []
-        shutdown = False
-        while True:
-            try:
-                content = in_q.get_nowait()
-            except asyncio.QueueEmpty:
-                if not batch:
-                    content = await in_q.get()
-                    batch.append(content)
-                    continue
-            else:
-                batch.append(content)
-                continue
-
+        async for batch in self.batches(in_q):
             content_q_by_type = defaultdict(lambda: Q(pk=None))
             for declarative_content in batch:
-                if declarative_content is None:
-                    shutdown = True
-                    continue
-
                 model_type = type(declarative_content.content)
                 unit_key = declarative_content.content.natural_key_dict()
                 content_q_by_type[model_type] = content_q_by_type[model_type] | Q(**unit_key)
 
             for model_type in content_q_by_type.keys():
                 for result in model_type.objects.filter(content_q_by_type[model_type]):
                     for declarative_content in batch:
-                        if declarative_content is None:
-                            continue
                         not_same_unit = False
                         for field in result.natural_key_fields():
                             in_memory_digest_value = getattr(declarative_content.content, field)
@@ -84,9 +64,6 @@ async def __call__(self, in_q, out_q):
                             declarative_content.content = result
             for declarative_content in batch:
                 await out_q.put(declarative_content)
-            batch = []
-            if shutdown:
-                break
         await out_q.put(None)


@@ -124,30 +101,14 @@ async def __call__(self, in_q, out_q):
         Returns:
             The coroutine for this stage.
         """
-        batch = []
-        shutdown = False
-        while True:
-            try:
-                declarative_content = in_q.get_nowait()
-            except asyncio.QueueEmpty:
-                if not batch and not shutdown:
-                    declarative_content = await in_q.get()
-                    batch.append(declarative_content)
-                    continue
-            else:
-                batch.append(declarative_content)
-                continue
-
+        async for batch in self.batches(in_q):
             content_artifact_bulk = []
             remote_artifact_bulk = []
             remote_artifact_map = {}
 
             with transaction.atomic():
                 await self._pre_save(batch)
                 for declarative_content in batch:
-                    if declarative_content is None:
-                        shutdown = True
-                        continue
                     if declarative_content.content.pk is None:
                         declarative_content.content.save()
                         for declarative_artifact in declarative_content.d_artifacts:
@@ -185,20 +146,14 @@ async def __call__(self, in_q, out_q):
                 await self._post_save(batch)
 
             for declarative_content in batch:
-                if declarative_content is None:
-                    continue
                 await out_q.put(declarative_content)
-            if shutdown:
-                break
-            batch = []
         await out_q.put(None)
 
     async def _pre_save(self, batch):
         """
         A hook plugin-writers can override to save related objects prior to content unit saving.
 
-        This is run within the same transaction as the content unit saving. As with the Stages API
-        it's possible one item in batch is None indicating it is the end of the pipeline.
+        This is run within the same transaction as the content unit saving.
 
         Args:
             batch (list of :class:`~pulpcore.plugin.stages.DeclarativeContent`): The batch of
@@ -211,8 +166,7 @@ async def _post_save(self, batch):
         """
         A hook plugin-writers can override to save related objects after content unit saving.
 
-        This is run within the same transaction as the content unit saving. As with the Stages API
-        it's possible one item in batch is None indicating it is the end of the pipeline.
+        This is run within the same transaction as the content unit saving.
 
         Args:
             batch (list of :class:`~pulpcore.plugin.stages.DeclarativeContent`): The batch of
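Since `batches()` now consumes the sentinel, the `_pre_save` and `_post_save` docstrings drop the note about a possible None item, and plugin-writer overrides can iterate the batch directly. A hypothetical override under that assumption (the subclass, the `ContentUnitSaver` base-class name, and the `ExampleRelation` model are illustrative, not part of this commit):

class ExampleContentUnitSaver(ContentUnitSaver):

    async def _post_save(self, batch):
        # Runs in the same transaction as the content unit saving; every item
        # is a DeclarativeContent, never the None sentinel.
        relations = [
            ExampleRelation(content=declarative_content.content)
            for declarative_content in batch
        ]
        ExampleRelation.objects.bulk_create(relations)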
