Skip to content

Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather #3207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

dgegen
Copy link

@dgegen dgegen commented Jul 5, 2025

As described in #3196, I encountered issues opening Zarr v3 arrays stored over SFTP using fsspec. Specifically, python would freeze opening zarr arrays.

Root Cause

The issue stems from the use of asyncio.gather in zarr.core.array.get_array_metadata, which attempts to read multiple metadata files (e.g., .zarray, .zattrs, zarr.json) concurrently. This works well for truly asynchronous filesystems, but breaks when using systems like SFTPFileSystem, which does not seem to be concurrency-safe in async contexts (potentially relying on blocking I/O internally or managing connection states using global locks) leading to deadlocks or indefinite hangs when asyncio.gather is used to perform multiple reads simultaneously.

Solution

To address this, I’ve implemented a fallback to sequential reads for filesystems that are not concurrency-safe. The logic is as follows: For non asynchronous file systems, the user sets store.fs.asynchronous=False. The helper function is_concurrency_safe(store_path: StorePath) -> bool, checks this getattr(fs, "asynchronous", True). If True asyncio.gather is used, else we fall back to sequential await. This Preserves the performance benefit of concurrent reads for safe filesystems (e.g., local disk, S3, GCS), while preventing deadlocks and improved robustness when using backends like SFTP.

These changes may not address all scenarios not asynchronous file systems could cause issues, as there are several other instances of asyncio.gather in zarr.core.array and zarr.core.group. However, I opted to focus on this specific problem first, as enabling the opening of arrays and groups is likely the highest priority, and I wanted to discuss this approach before making too many changes.

I look forward to hearing your thoughts and seeing this issue resolved!

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.rst
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

@github-actions github-actions bot added the needs release notes Automatically applied to PRs which haven't added release notes label Jul 5, 2025
@d-v-b
Copy link
Contributor

d-v-b commented Jul 5, 2025

Good detective work here! I think the ideal solution would keep store implementation details confined to the store classes themselves. So instead of the solution here, what if we override the get_many method on the fsspec store to include the logic you have added here, and then use that method instead of multiple gets

@dgegen
Copy link
Author

dgegen commented Jul 5, 2025

Very good point! Perhaps something along these lines?

class StorePath:
    # ...
    async def _is_concurrency_save(self):
        fs = getattr(self.store, "fs", None)
        return getattr(fs, "asynchronous", True)

    async def get_many(
        self,
        *suffixes : str,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ):
        tasks = [
            (self / suffix).get(prototype=prototype, byte_range=byte_range) for suffix in suffixes
        ]
        if await self._is_concurrency_save():
            return await gather(*tasks)
        else:
            results = []
            for task in tasks:
                result = await task
                results.append(result)
            return results
            
class FsspecStore:
    # ...
    async def _get_many(
        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
        if getattr(self.fs, "asynchronous", True):
            async for result in super()._get_many(requests=requests):
                yield result
        else:
            for key, prototype, byte_range in requests:
                value = await self.get(key, prototype, byte_range)
                yield (key, value)
                

Copy link

codecov bot commented Jul 10, 2025

Codecov Report

Attention: Patch coverage is 84.21053% with 3 lines in your changes missing coverage. Please review.

Project coverage is 94.73%. Comparing base (ea4d7e9) to head (7f72217).

Files with missing lines Patch % Lines
src/zarr/storage/_fsspec.py 57.14% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3207      +/-   ##
==========================================
- Coverage   94.76%   94.73%   -0.03%     
==========================================
  Files          78       78              
  Lines        8710     8726      +16     
==========================================
+ Hits         8254     8267      +13     
- Misses        456      459       +3     
Files with missing lines Coverage Δ
src/zarr/abc/store.py 95.80% <ø> (ø)
src/zarr/core/array.py 98.52% <100.00%> (ø)
src/zarr/core/group.py 94.87% <100.00%> (ø)
src/zarr/storage/_common.py 92.74% <100.00%> (+0.03%) ⬆️
src/zarr/testing/store.py 100.00% <100.00%> (ø)
src/zarr/storage/_fsspec.py 88.15% <57.14%> (-1.50%) ⬇️
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions github-actions bot removed the needs release notes Automatically applied to PRs which haven't added release notes label Jul 10, 2025
Comment on lines 285 to 298
async def _is_concurrency_save(self):
fs = getattr(self.store, "fs", None)
return getattr(fs, "asynchronous", True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a method specific to the fsspec store. the fsspec store should call it inside _get_many in order to chose which implementation to use.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Comment on lines 3480 to 3482
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets bind _join_paths([path, X]) to a variable so that we don't call the _join_paths function so many times. for example:

zarray_path = _join_paths([path, ZARRAY_JSON])
...

- Introduced `Store.get_many_ordered` and `StorePath.get_many_ordered` to
  retrieve multiple metadata files in a single call, optimizing the retrieval
  process and reducing overhead. `get_many_ordered` is used in
  `get_array_metadata`.
- Modified `FsspecStore._get_many` to conditionally use `asyncio.gather`
  based on the concurrency safety of the underlying file system, enhancing
  compatibility with synchronous file systems by avoiding deadlocks when
  accessing metadata concurrently.
- Modified `_read_metadata_v2` to use `Store._get_many`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants