Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather
#3207
Good detective work here! I think the ideal solution would keep store implementation details confined to the store classes themselves. So instead of the solution here, what if we override the `get_many` method on the fsspec store to include the logic you have added here, and then use that method instead of multiple `get` calls?
Very good point! Perhaps something along these lines?

class StorePath:
    # ...
    async def _is_concurrency_save(self):
        fs = getattr(self.store, "fs", None)
        return getattr(fs, "asynchronous", True)

    async def get_many(
        self,
        *suffixes: str,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ):
        tasks = [
            (self / suffix).get(prototype=prototype, byte_range=byte_range) for suffix in suffixes
        ]
        if await self._is_concurrency_save():
            return await gather(*tasks)
        else:
            results = []
            for task in tasks:
                result = await task
                results.append(result)
            return results

class FsspecStore:
    # ...
    async def _get_many(
        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
        if getattr(self.fs, "asynchronous", True):
            async for result in super()._get_many(requests=requests):
                yield result
        else:
            for key, prototype, byte_range in requests:
                value = await self.get(key, prototype, byte_range)
                yield (key, value)
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3207      +/-   ##
==========================================
- Coverage   94.76%   94.73%   -0.03%
==========================================
  Files          78       78
  Lines        8710     8726      +16
==========================================
+ Hits         8254     8267      +13
- Misses        456      459       +3
src/zarr/storage/_common.py
Outdated
    async def _is_concurrency_save(self):
        fs = getattr(self.store, "fs", None)
        return getattr(fs, "asynchronous", True)
This should be a method specific to the fsspec store. The fsspec store should call it inside `_get_many` in order to choose which implementation to use.
Done!
src/zarr/core/group.py
Outdated
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
Let's bind `_join_paths([path, X])` to a variable so that we don't call the `_join_paths` function so many times. For example:
zarray_path = _join_paths([path, ZARRAY_JSON])
...
- Introduced `Store.get_many_ordered` and `StorePath.get_many_ordered` to retrieve multiple metadata files in a single call, optimizing the retrieval process and reducing overhead. `get_many_ordered` is used in `get_array_metadata`.
- Modified `FsspecStore._get_many` to conditionally use `asyncio.gather` based on the concurrency safety of the underlying file system, enhancing compatibility with synchronous file systems by avoiding deadlocks when accessing metadata concurrently.
- Modified `_read_metadata_v2` to use `Store._get_many`
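To make the idea of an ordered multi-get concrete, here is a minimal, self-contained sketch. It is not the code from this PR: `fake_get_many`, the in-memory `data` dict, and the standalone `get_many_ordered` function are hypothetical stand-ins, and the assumption that results may arrive in a different order than they were requested is mine.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Sequence


async def fake_get_many(
    keys: Sequence[str], data: dict[str, bytes]
) -> AsyncIterator[tuple[str, bytes | None]]:
    """Hypothetical stand-in for a store's multi-get.

    Yields (key, value) pairs in reverse request order to emulate
    results that complete out of order. Missing keys yield None.
    """
    for key in reversed(keys):
        await asyncio.sleep(0)  # give the event loop a chance to switch tasks
        yield key, data.get(key)


async def get_many_ordered(
    keys: Sequence[str], data: dict[str, bytes]
) -> tuple[bytes | None, ...]:
    """Collect all results, then return them in the order the keys were requested."""
    found = {key: value async for key, value in fake_get_many(keys, data)}
    return tuple(found[key] for key in keys)


def demo() -> tuple[bytes | None, ...]:
    data = {"zarr.json": b"{}", ".zattrs": b"attrs"}
    return asyncio.run(get_many_ordered(["zarr.json", ".zarray", ".zattrs"], data))
```

The caller can unpack the returned tuple positionally, so it does not need to care in which order the underlying reads finished.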
As described in #3196, I encountered issues opening Zarr v3 arrays stored over SFTP using `fsspec`. Specifically, Python would freeze when opening Zarr arrays.

Root Cause
The issue stems from the use of `asyncio.gather` in `zarr.core.array.get_array_metadata`, which attempts to read multiple metadata files (e.g., `.zarray`, `.zattrs`, `zarr.json`) concurrently. This works well for truly asynchronous filesystems, but breaks with filesystems like `SFTPFileSystem`, which does not seem to be concurrency-safe in async contexts (potentially because it relies on blocking I/O internally or manages connection state using global locks). This leads to deadlocks or indefinite hangs when `asyncio.gather` is used to perform multiple reads simultaneously.

Solution
To address this, I've implemented a fallback to sequential reads for filesystems that are not concurrency-safe. The logic is as follows: for non-asynchronous filesystems, the user sets `store.fs.asynchronous=False`. The helper function `is_concurrency_safe(store_path: StorePath) -> bool` checks this via `getattr(fs, "asynchronous", True)`. If it returns `True`, `asyncio.gather` is used; otherwise we fall back to sequential `await`. This preserves the performance benefit of concurrent reads for safe filesystems (e.g., local disk, S3, GCS), while preventing deadlocks and improving robustness when using backends like SFTP.

These changes may not address all scenarios in which non-asynchronous filesystems could cause issues, as there are several other instances of `asyncio.gather` in `zarr.core.array` and `zarr.core.group`. However, I opted to focus on this specific problem first, as enabling the opening of arrays and groups is likely the highest priority, and I wanted to discuss this approach before making too many changes.

I look forward to hearing your thoughts and seeing this issue resolved!
TODO:
docs/user-guide/*.rst
changes/