Skip to content

Commit

Permalink
backports Artifact redownloading bug
Browse files Browse the repository at this point in the history
The `sync_to_async_iterable` wraps the Artifact queryset, but unlike
querysets, it can't be reused. This causes subsequent iterations
through it to not actually iterate.

backports #9542

fixes #9596

(cherry picked from commit a943156)
  • Loading branch information
bmbouter committed Dec 2, 2021
1 parent 68412d5 commit 20c1a97
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGES/9596.bugfix
@@ -0,0 +1,2 @@
Fixed bug where Artifacts were being downloaded even if they were already saved in Pulp.
(backported from #9542)
33 changes: 19 additions & 14 deletions pulpcore/plugin/sync.py
@@ -1,6 +1,8 @@
from asgiref.sync import sync_to_async


# This is useful for querysets which don't have async support yet. Django querysets issue a db call
# when the iterator for them is requested, so we need that to be wrapped in `sync_to_async` also.
iter_async = sync_to_async(iter)


Expand All @@ -12,21 +14,24 @@ def next_async(it):
raise StopAsyncIteration


async def sync_to_async_iterable(sync_iterable):
def sync_to_async_iterable(sync_iterable):
"""
Utility method which runs each iteration of a synchronous iterable in a threadpool. It also
sets a threadlocal inside the thread so calls to AsyncToSync can escape it. The implementation
relies on `asgiref.sync.sync_to_async`. thread_sensitive parameter for sync_to_async defaults
to True. This code will run in the same thread as any outer code. This is needed for
underlying Python code that is not threadsafe (for example, code which handles database
connections).
Creates an async iterable.
The returned iterator is able to be reused and iterated through multiple times.
Args:
sync_iterable (iter): A synchronous iterable such as a QuerySet.
sync_iterable: An iterable to be asynchronously iterated through.
"""
sync_iterator = await iter_async(sync_iterable)
while True:
try:
yield await next_async(sync_iterator)
except StopAsyncIteration:
return

class _Wrapper:
def __aiter__(self):
self.sync_iterator = None
return self

async def __anext__(self):
if self.sync_iterator is None:
self.sync_iterator = await iter_async(sync_iterable)
return await next_async(self.sync_iterator)

return _Wrapper()

0 comments on commit 20c1a97

Please sign in to comment.