Skip to content

Commit

Permalink
Fixes Artifact redownloading bug
Browse files Browse the repository at this point in the history
The `sync_to_async_iterable` wraps the Artifact queryset, but unlike
querysets, it can't be reused. This causes subsequent iterations
through it to not actually iterate.

closes #9542
  • Loading branch information
mdellweg authored and bmbouter committed Nov 30, 2021
1 parent fcb355c commit a943156
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES/9542.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug where Artifacts were being downloaded even if they were already saved in Pulp.
33 changes: 19 additions & 14 deletions pulpcore/plugin/sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from asgiref.sync import sync_to_async


# This is useful for querysets which don't have async support yet. Django querysets issue a db call
# when the iterator for them is requested, so we need that to be wrapped in `sync_to_async` also.
iter_async = sync_to_async(iter)


Expand All @@ -12,21 +14,24 @@ def next_async(it):
raise StopAsyncIteration


async def sync_to_async_iterable(sync_iterable):
def sync_to_async_iterable(sync_iterable):
"""
Utility method which runs each iteration of a synchronous iterable in a threadpool. It also
sets a threadlocal inside the thread so calls to AsyncToSync can escape it. The implementation
relies on `asgiref.sync.sync_to_async`. thread_sensitive parameter for sync_to_async defaults
to True. This code will run in the same thread as any outer code. This is needed for
underlying Python code that is not threadsafe (for example, code which handles database
connections).
Creates an async iterable.
The returned iterator is able to be reused and iterated through multiple times.
Args:
sync_iterable (iter): A synchronous iterable such as a QuerySet.
sync_iterable: An iterable to be asynchronously iterated through.
"""
sync_iterator = await iter_async(sync_iterable)
while True:
try:
yield await next_async(sync_iterator)
except StopAsyncIteration:
return

class _Wrapper:
def __aiter__(self):
self.sync_iterator = None
return self

async def __anext__(self):
if self.sync_iterator is None:
self.sync_iterator = await iter_async(sync_iterable)
return await next_async(self.sync_iterator)

return _Wrapper()

0 comments on commit a943156

Please sign in to comment.