[Data] Fix credential drop and IMDS herd in download expression #62894
xyuzh wants to merge 2 commits into
Two independent bugs in the `download` expression pipeline that compound
under concurrent S3 workloads on EC2:
1. **Silent credential drop (obstore path).** For fsspec S3 filesystems
with Okta/STS/profile-based auth, static attrs (`key`/`secret`/`token`,
`storage_options`) are `None` — credentials live on
`fs.session.get_credentials()` and may rotate. Extraction returned
`{}`, obstore's `from_url` got no keys, and obstore fell back to its
own credential chain (IMDS on EC2). Users never saw a warning.
2. **IMDS thundering herd (threaded fallback).** `download_bytes_threaded`
spawned 16 worker threads, each calling `_resolve_paths_and_filesystem`
independently on its first URI. With N concurrent Download tasks on a
node that was ~16xN simultaneous IMDS calls, tripping the per-instance
rate limit and producing intermittent `NoCredentialsError`.
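To make the arithmetic in item 2 concrete, here is a minimal standard-library sketch. `FakeCredentialEndpoint` and `resolve_filesystem` are hypothetical stand-ins for IMDS and for filesystem resolution; they are not Ray Data or PyArrow APIs. The sketch only counts credential fetches when each of 16 workers resolves on its own versus when resolution happens once before the workers start.

```python
import threading

NUM_WORKERS = 16  # worker count cited in the description above


class FakeCredentialEndpoint:
    """Hypothetical stand-in for IMDS that counts credential fetches."""

    def __init__(self):
        self._lock = threading.Lock()
        self.calls = 0

    def fetch(self):
        with self._lock:
            self.calls += 1
        return {"access_key_id": "fake"}


def resolve_filesystem(endpoint):
    """Hypothetical resolver: building a filesystem costs one fetch."""
    return {"credentials": endpoint.fetch()}


def _run_workers(target, uris):
    # Split the URIs round-robin across NUM_WORKERS threads and wait for all.
    threads = [
        threading.Thread(target=target, args=(uris[i::NUM_WORKERS],))
        for i in range(NUM_WORKERS)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def download_per_worker(endpoint, uris):
    """Old shape: each worker resolves its own filesystem on its first URI."""

    def worker(chunk):
        if not chunk:
            return
        fs = resolve_filesystem(endpoint)  # one fetch per worker
        for _uri in chunk:
            _ = fs  # a real worker would read the URI through fs here

    _run_workers(worker, uris)


def download_shared(endpoint, uris):
    """New shape: resolve once, then share the result with every worker."""
    fs = resolve_filesystem(endpoint)  # exactly one fetch for the whole block

    def worker(chunk):
        for _uri in chunk:
            _ = fs

    _run_workers(worker, uris)


uris = [f"s3://bucket/key-{i}" for i in range(64)]
old, new = FakeCredentialEndpoint(), FakeCredentialEndpoint()
download_per_worker(old, uris)
download_shared(new, uris)
print(old.calls, new.calls)  # 16 1
```

With N such tasks running on one node, the first count scales to roughly 16×N while the second scales to N.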
Changes:
- `_extract_credentials_from_filesystem` now returns `Optional[Dict]`:
`None` signals "route to threaded" (user's FS stays authoritative).
New `_frozen_s3fs_credentials` helper snapshots via
`session.get_credentials().get_frozen_credentials()` (sync boto3 and
async aiobotocore), so Okta/STS/profile-based credentials reach obstore.
Unrecognized non-None filesystems also return `None` per the contract.
- New `_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)` centralizes
the dispatch decision with a one-shot WARNING per filesystem when
credentials can't be extracted.
- `plan_download_op` pre-decides obstore-vs-threaded at plan time and
routes `AsyncPartitionActor` accordingly (no silent downgrade).
- `download_bytes_threaded` resolves the filesystem exactly once per
(block, column) before `make_async_gen`, then shares it across all 16
workers — no per-worker IMDS storm. The probe loop requires both
`paths` and `fs` to be usable before breaking, avoiding a leak-through
when `_resolve_paths_and_filesystem` silently drops a bad URI.
- `download_bytes_async` extracts credentials once in sync context and
threads the kwargs through to `_download_uris_with_obstore` (fixes a
correctness issue where aiobotocore's async `get_credentials` couldn't
run from inside an existing event loop).
- `AsyncPartitionActor` fails closed with a clear `RuntimeError` if it's
ever constructed with an unextractable filesystem (dispatch bug guard).
Tests: added coverage for session-based credential snapshotting (boto3
sync + aiobotocore async), warning dedup, dispatch routing on
unextractable filesystems, and the threaded pre-resolve (one probe
across workers, supplied-FS skips probe, bad-first-URI handling,
empty-paths regression, empty-block safety, fail-closed actor init).
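The `download_bytes_async` change listed earlier (extract credentials once in sync context, then pass the kwargs along) addresses a failure mode that has a generic asyncio analogue. The sketch below is not aiobotocore or Ray Data code; `fetch_credentials_async`, `snapshot_credentials_sync`, `download_all`, and `download_bytes` are hypothetical names. It shows that driving an async lookup with `asyncio.run` works from synchronous code and raises `RuntimeError` when attempted from inside a running event loop.

```python
import asyncio


async def fetch_credentials_async():
    """Hypothetical async credential lookup (a stand-in, not aiobotocore)."""
    await asyncio.sleep(0)
    return {"access_key_id": "AKIAEXAMPLE"}


def snapshot_credentials_sync():
    """Drive the async lookup to completion from synchronous code."""
    coro = fetch_credentials_async()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning if a loop is running
        raise


async def download_all(uris, fs_kwargs):
    """Async download stage: credentials arrive as plain, pre-resolved kwargs."""
    await asyncio.sleep(0)
    return [(uri, fs_kwargs["access_key_id"]) for uri in uris]


def download_bytes(uris):
    # Extract first, while no event loop is running, then enter the loop.
    fs_kwargs = snapshot_credentials_sync()
    return asyncio.run(download_all(uris, fs_kwargs))


async def extract_inside_loop():
    # Anti-pattern: the sync helper is called while a loop is already running.
    return snapshot_credentials_sync()


print(download_bytes(["s3://bucket/key"]))
try:
    asyncio.run(extract_inside_loop())
except RuntimeError as exc:
    print("failed inside loop:", exc)
```

Passing a plain dict into the coroutine also means the download stage never needs to touch the session object.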
Code Review
This pull request enhances the obstore download path by adding support for session-backed s3fs credentials (Okta/STS) and introducing a routing logic that falls back to threaded PyArrow downloads when credentials cannot be statically extracted. It also mitigates the "IMDS thundering herd" problem in the threaded path by pre-resolving the filesystem once per block. Reviewers noted that obstore support could be extended to GCS and Azure fsspec protocols and raised concerns about a potential regression in handling mixed-scheme blocks due to the new "sticky" filesystem behavior in threaded downloads.
    if protocol not in ("s3", "s3a"):
        # Non-S3 fsspec: obstore can't use these. Callers usually filter
        # this out via _obstore_filesystem_requires_threaded_download, but
        # direct callers must also route to the threaded path.
        return None
The comment states that obstore cannot use non-S3 fsspec protocols, but obstore (via the underlying object_store Rust crate) does support GCS and Azure. While the current credential extraction logic is focused on S3, it might be worth clarifying the comment or considering future support for GCS/Azure fsspec protocols to avoid unnecessary fallbacks to the threaded path when obstore could potentially be used.
    resolved_paths, _ = _resolve_paths_and_filesystem(
        uri, filesystem=resolved_fs
    )
    fs_for_read = wrapped_fs
The implementation of download_bytes_threaded now uses a single pre-resolved filesystem (resolved_fs) for all URIs in a block to mitigate the IMDS thundering herd. This introduces a 'sticky' filesystem behavior per block. If a block contains mixed-scheme URIs (e.g., S3 and GCS), URIs that are incompatible with the first resolved filesystem will fail to download, as _resolve_paths_and_filesystem will return the incompatible input filesystem when one is provided (per its documented behavior). While mixed-scheme blocks are rare in Ray Data, this is a regression in flexibility compared to the previous per-worker resolution. Consider if a per-scheme cache within the worker would be a more robust alternative to support mixed-scheme blocks while still avoiding the thundering herd.
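One possible shape for the per-scheme cache suggested above is sketched here. It is an assumption about what such a cache could look like, not code from the PR: `PerSchemeFilesystemCache` and `fake_resolver` are hypothetical, and the resolver is any callable that maps a URI to a filesystem object.

```python
import threading
from urllib.parse import urlparse


class PerSchemeFilesystemCache:
    """Resolve at most one filesystem per URI scheme, shared across threads."""

    def __init__(self, resolver):
        self._resolver = resolver  # hypothetical callable: uri -> filesystem
        self._lock = threading.Lock()
        self._by_scheme = {}

    def get(self, uri):
        scheme = urlparse(uri).scheme or "file"
        # Holding the lock during resolution serializes first-time lookups,
        # so concurrent workers cannot all resolve the same scheme at once.
        with self._lock:
            if scheme not in self._by_scheme:
                self._by_scheme[scheme] = self._resolver(uri)
            return self._by_scheme[scheme]


calls = []


def fake_resolver(uri):
    scheme = urlparse(uri).scheme
    calls.append(scheme)  # record each real resolution
    return f"<filesystem for {scheme}>"


cache = PerSchemeFilesystemCache(fake_resolver)
mixed_uris = ["s3://bucket/a", "gs://bucket/b"] * 8  # 16 lookups, 2 schemes
threads = [threading.Thread(target=cache.get, args=(u,)) for u in mixed_uris]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(calls))  # ['gs', 's3']
```

The trade-off is that a slow resolution for one scheme briefly blocks lookups for the others; a per-scheme lock would avoid that at the cost of more bookkeeping.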
Direct-caller guard: if _download_uris_with_obstore is invoked with a
filesystem whose credentials can't be statically extracted, raise
RuntimeError instead of coercing extraction's None to {}. The old
coercion silently handed obstore's ambient credential chain (IMDS/env)
to any internal/test caller that bypassed _plan_obstore_routing —
exactly the silent-drop contract the new routing was designed to
prevent. filesystem=None still legitimately yields fs_kwargs={}.
Adds a regression test covering the new raise.
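The difference between coercing `None` to `{}` and failing closed can be sketched in a few lines. Both function names and the `extract` callable are hypothetical; only the behaviour (raise `RuntimeError` for an unextractable filesystem, keep `{}` for `filesystem=None`) comes from the commit message above.

```python
from typing import Any, Callable, Dict, Optional

Extractor = Callable[[Any], Optional[Dict[str, str]]]


def fs_kwargs_coercing(filesystem: Any, extract: Extractor) -> Dict[str, str]:
    """Old shape: None collapses to {}, so ambient credentials take over."""
    return extract(filesystem) or {}


def fs_kwargs_fail_closed(filesystem: Any, extract: Extractor) -> Dict[str, str]:
    """New shape: an unextractable filesystem is an error, not an empty dict."""
    if filesystem is None:
        return {}  # no user filesystem was supplied, so {} is legitimate
    kwargs = extract(filesystem)
    if kwargs is None:
        raise RuntimeError(
            "Credentials could not be extracted from the supplied filesystem; "
            "route this download through the threaded path instead."
        )
    return kwargs


def unextractable(_fs):
    return None  # simulates a filesystem whose credentials cannot be read


user_fs = object()
print(fs_kwargs_coercing(user_fs, unextractable))  # {}
try:
    fs_kwargs_fail_closed(user_fs, unextractable)
except RuntimeError as exc:
    print("raised:", exc)
```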
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 53bfece.
    # Unrecognized non-None filesystem: route to threaded so the user's FS
    # stays authoritative. Obstore with empty kwargs would silently use its
    # own credential chain, dropping any configuration on the user's FS.
    return None
Unrecognized native filesystems unnecessarily forced to threaded path
Low Severity
_extract_credentials_from_filesystem now returns None for all unrecognized non-None filesystems (e.g. LocalFileSystem), whereas it previously returned {}. This causes _plan_obstore_routing to route these to the threaded path and emit a misleading warning about "S3 credentials" — even though obstore natively supports file:// URIs and no credentials are needed. Users explicitly passing pafs.LocalFileSystem() would see an unnecessary performance downgrade and a confusing warning.
Splitting into two independent PRs for easier review:
The two bugs are independent; either PR can land first. Closing this combined PR in favor of the split.


Summary
Fixes two independent bugs in the `download` expression pipeline that compound under concurrent S3 workloads on EC2.

Bug 1: Silent credential drop (obstore path)
When a user passes an fsspec `S3FileSystem` with Okta/STS/profile-based auth (wrapped as `PyFileSystem(FSSpecHandler(s3fs_fs))`), `_extract_credentials_from_filesystem` read only static attrs (`key`/`secret`/`token`, `storage_options`). Those attrs are `None` for session-backed s3fs: credentials live on `fs.session.get_credentials()` and may rotate. The function returned `{}`, obstore's `from_url` got no keys, and obstore silently fell back to its own credential chain (IMDS on EC2). Users never saw a warning.

Bug 2: IMDS thundering herd (threaded fallback)
`download_bytes_threaded` spawned 16 worker threads via `make_async_gen`. Each worker independently called `_resolve_paths_and_filesystem(uri, cached_fs=None)` on its first URI, constructing a fresh `pyarrow.fs.S3FileSystem` and triggering an IMDS credential fetch. With N concurrent Download tasks on a node, that was ~16×N near-simultaneous IMDS HTTP calls, enough to trip the per-instance rate limit and produce intermittent `NoCredentialsError`.

Changes
- `_extract_credentials_from_filesystem` now returns `Optional[Dict]`. `None` signals "route to threaded" so the user's filesystem stays authoritative. New `_frozen_s3fs_credentials` helper snapshots via `session.get_credentials().get_frozen_credentials()` for both sync (botocore) and async (aiobotocore) sessions. Unrecognized non-`None` filesystems also return `None` per the new contract.
- `_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)` centralizes dispatch with a one-shot `WARNING` per filesystem when credentials can't be extracted.
- `plan_download_op` pre-decides obstore-vs-threaded at plan time and routes `AsyncPartitionActor` accordingly, with no silent downgrade.
- `download_bytes_threaded` resolves the filesystem exactly once per (block, column) before `make_async_gen` and shares it across all 16 workers via closure default-arg binding. The probe loop requires both `paths` and `fs` to be usable before breaking, avoiding leak-through when `_resolve_paths_and_filesystem` silently drops a bad URI.
- `download_bytes_async` extracts credentials once in sync context and threads kwargs through to `_download_uris_with_obstore`. Fixes a correctness issue where aiobotocore's async `get_credentials` couldn't run from inside an existing event loop.
- `AsyncPartitionActor` fails closed with a clear `RuntimeError` if it's ever constructed with an unextractable filesystem (dispatch-bug guard).

Test plan
- `pre-commit run` passes on all changed files (ruff, pydoclint, black, docstyle, semgrep, import order, mock-method / logger checks).
- `test_obstore_download.py`:
  - `TestSessionBackedFsspecCredentials`: boto3 sync session, aiobotocore async session, `_session` fallback for older s3fs, unresolvable session returns `None`, no-access-key returns `None`, `anon=True` skips session, static attrs win over session.
  - `TestPlanObstoreRouting`: `None` filesystem → obstore, non-S3 fsspec → threaded, unextractable fsspec-S3 → threaded with warning, warning dedup (same FS twice = one warning), extractable fsspec-S3 → obstore, `AsyncPartitionActor` raises on unextractable creds.
  - `TestThreadedDownloadPreResolve`: exactly one probe across 16 workers, supplied-FS skips probe, bad-first-URI still finds FS, all-bad URIs don't crash, empty block doesn't spawn workers, regression test for `([], fs)` probe-break bug.
- `S3FileSystem` over ~100 URIs (no `NoCredentialsError`, moto sees signed requests).
- `RAY_DATA_USE_OBSTORE=0` over concurrent tasks, verifying `S3FileSystem.__init__` is invoked exactly once per block rather than per worker.
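The "closure default-arg binding" mentioned under Changes refers to a general Python idiom, sketched below. This is not the PR's code; both factory functions are hypothetical and only illustrate why binding a value as a default argument pins it at definition time, while a plain closure looks the variable up when the function is called.

```python
def make_workers_late_binding(filesystems):
    """Plain closures: every worker sees whatever `fs` holds at call time."""
    workers = []
    for fs in filesystems:
        def worker(uri):
            return (uri, fs)  # `fs` is looked up when worker() runs
        workers.append(worker)
    return workers


def make_workers_default_arg(filesystems):
    """Default-arg binding: each worker keeps the `fs` it was defined with."""
    workers = []
    for fs in filesystems:
        def worker(uri, fs=fs):  # value captured at definition time
            return (uri, fs)
        workers.append(worker)
    return workers


late = make_workers_late_binding(["fs-a", "fs-b"])
bound = make_workers_default_arg(["fs-a", "fs-b"])
print([w("uri")[1] for w in late])   # ['fs-b', 'fs-b']
print([w("uri")[1] for w in bound])  # ['fs-a', 'fs-b']
```

For a filesystem resolved once per block, the default-arg form guarantees that every worker reads through that exact object even if the enclosing variable is reassigned later.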