Skip to content

[Data] Pre-resolve filesystem in threaded download to avoid IMDS herd#62898

Open
xyuzh wants to merge 7 commits into
ray-project:masterfrom
xyuzh:fix/download-threaded-pre-resolve
Open

[Data] Pre-resolve filesystem in threaded download to avoid IMDS herd#62898
xyuzh wants to merge 7 commits into
ray-project:masterfrom
xyuzh:fix/download-threaded-pre-resolve

Conversation

@xyuzh
Copy link
Copy Markdown
Member

@xyuzh xyuzh commented Apr 23, 2026

Summary

download_bytes_threaded spawns 16 worker threads via make_async_gen. Each worker called _resolve_paths_and_filesystem independently on its first URI. When no filesystem is provided, that call constructs a fresh pyarrow.fs.S3FileSystem, which triggers an IMDS credential fetch on EC2. With N concurrent Download tasks on a node that was ~16×N near-simultaneous IMDS HTTP calls — enough to trip the per-instance rate limit and produce intermittent NoCredentialsError.

This is the second of two independent PRs. The matching fsspec credential-extraction fix — which triggers the threaded fallback whenever a session-backed filesystem can't have credentials statically extracted — is in a separate PR. Either can land first; this change is valuable on its own because the threaded path is also used when obstore is disabled (RAY_DATA_USE_OBSTORE=0) or when the URI scheme isn't obstore-supported.

Changes

  • Resolve the filesystem exactly once per (block, column) before make_async_gen, then share it across all 16 workers via default-arg binding on the inner load_uri_bytes closure. Workers now do zero filesystem inference — their per-URI calls to _resolve_paths_and_filesystem short-circuit (path normalization only; no network) because a filesystem is supplied.
  • The probe loop:
    • Skips None URIs.
    • Catches resolution exceptions and continues to the next URI.
    • Requires both paths and a non-None filesystem before breaking — otherwise _resolve_paths_and_filesystem could silently drop a bad URI and leave workers with no usable filesystem, recreating the herd.
  • If every probe fails, workers retain the legacy per-URI resolution as a last resort so a block of partially-unresolvable URIs doesn't lose the ones that might still resolve.

Test plan

  • pre-commit run passes.
  • New TestThreadedDownloadPreResolve covers:
    • test_resolves_filesystem_once_across_workers — spy on _resolve_paths_and_filesystem, 10 URIs × 16 workers → exactly 1 probe call with filesystem=None (vs. up to 16 before the fix).
    • test_supplied_filesystem_skips_probe — with a user-supplied FS, zero probe calls happen.
    • test_bad_first_uri_still_finds_filesystem — mix of None + resolvable URIs; probe keeps going past None.
    • test_all_probe_uris_unresolvable_does_not_crash — all URIs unresolvable → no crash, every result is None.
    • test_empty_block_does_not_spawn_workers — zero URIs → no probe, no workers.
    • test_probe_rejects_empty_paths_result — regression test for the ([], fs) silent-drop case: the probe must skip it and continue.
  • End-to-end under concurrent tasks with RAY_DATA_USE_OBSTORE=0, verifying pyarrow.fs.S3FileSystem.__init__ is invoked exactly once per block rather than per worker.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request optimizes the download_bytes_threaded function by pre-resolving the filesystem once per block/column. This change prevents the "IMDS thundering herd" issue where multiple worker threads would independently trigger credential fetches. The PR also introduces a suite of unit tests to verify the probe logic, including edge cases like unresolvable URIs and user-supplied filesystems. Feedback was provided to improve the efficiency of the fallback mechanism: if the initial probe fails, the worker should cache the filesystem locally once it is successfully resolved for a URI to avoid redundant resolutions for subsequent items in that worker's iterator.

Comment on lines +245 to +262
if wrapped_fs is None:
# All probe URIs failed — last-resort per-URI resolution.
# Mirrors legacy behavior so a block full of unresolvable
# URIs doesn't lose the ones that might still resolve.
resolved_paths, per_uri_fs = _resolve_paths_and_filesystem(
uri, filesystem=None
)
fs_for_read = RetryingPyFileSystem.wrap(
per_uri_fs,
retryable_errors=data_context.retried_io_errors,
)
else:
# Path normalization only — _resolve_paths_and_filesystem
# short-circuits when filesystem is supplied (no network).
resolved_paths, _ = _resolve_paths_and_filesystem(
uri, filesystem=resolved_fs
)
fs_for_read = wrapped_fs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In the fallback case where wrapped_fs is None (because the probe failed for all URIs in the block), the current implementation performs filesystem resolution and creates a new RetryingPyFileSystem wrapper for every single URI in the worker. This is less efficient than the legacy behavior which cached the resolved filesystem for subsequent URIs in the same worker.

Updating the local wrapped_fs and resolved_fs variables within the loop will ensure that once a filesystem is successfully resolved for one URI, it is reused for all remaining URIs in that worker's iterator, truly mirroring the legacy lazy-resolution behavior.

Suggested change
if wrapped_fs is None:
# All probe URIs failed — last-resort per-URI resolution.
# Mirrors legacy behavior so a block full of unresolvable
# URIs doesn't lose the ones that might still resolve.
resolved_paths, per_uri_fs = _resolve_paths_and_filesystem(
uri, filesystem=None
)
fs_for_read = RetryingPyFileSystem.wrap(
per_uri_fs,
retryable_errors=data_context.retried_io_errors,
)
else:
# Path normalization only — _resolve_paths_and_filesystem
# short-circuits when filesystem is supplied (no network).
resolved_paths, _ = _resolve_paths_and_filesystem(
uri, filesystem=resolved_fs
)
fs_for_read = wrapped_fs
if wrapped_fs is None:
# All probe URIs failed — last-resort per-URI resolution.
# Mirrors legacy behavior so a block full of unresolvable
# URIs doesn't lose the ones that might still resolve.
resolved_paths, per_uri_fs = _resolve_paths_and_filesystem(
uri, filesystem=None
)
resolved_fs = per_uri_fs
wrapped_fs = RetryingPyFileSystem.wrap(
per_uri_fs,
retryable_errors=data_context.retried_io_errors,
)
fs_for_read = wrapped_fs
else:
# Path normalization only — _resolve_paths_and_filesystem
# short-circuits when filesystem is supplied (no network).
resolved_paths, _ = _resolve_paths_and_filesystem(
uri, filesystem=resolved_fs
)
fs_for_read = wrapped_fs

@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Apr 24, 2026
Comment thread python/ray/data/tests/unit/test_obstore_download.py Outdated
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 8fa637a. Configure here.

Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
xyuzh added a commit to xyuzh/ray that referenced this pull request Apr 24, 2026
Two issues from the CI run on PR ray-project#62898:

1. **Code bug**: when ``_resolve_paths_and_filesystem`` in the worker
   fallback returned ``(_, None)`` (silent-drop case where it returns
   structurally but yields no filesystem), the previous version cached
   ``wrapped_fs = RetryingPyFileSystem.wrap(None, ...)`` and corrupted
   every subsequent URI in that worker. Add a guard: when ``per_uri_fs``
   is None, ``continue`` without caching so the next URI gets a fresh
   inference attempt.

2. **Test bug**: ``test_all_probes_fail_fallback_caches_per_worker``
   used a fake ``_resolve_paths_and_filesystem`` that raised whenever
   ``uri == "probe-fail"``, regardless of caller. But the probe loop
   and the worker fallback both call with the same URI and the same
   ``filesystem=None`` — so the worker's resolution also raised and
   ``inference_calls`` was never populated (CI reported 0 instead of
   16). Differentiate by ``threading.get_ident()``: the probe runs in
   the test's main thread, workers run in spawned threads. Probe-thread
   calls fail; worker-thread calls succeed and append.
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
xyuzh added 6 commits April 28, 2026 09:23
``download_bytes_threaded`` spawns 16 worker threads via ``make_async_gen``.
Each worker called ``_resolve_paths_and_filesystem`` independently on its
first URI, which (when no filesystem is provided) constructs a fresh
``pyarrow.fs.S3FileSystem`` — triggering an IMDS credential fetch. With N
concurrent Download tasks on an EC2 node that was ~16*N near-simultaneous
IMDS HTTP calls, tripping the per-instance rate limit and producing
intermittent ``NoCredentialsError`` in downstream tasks.

Fix: resolve the filesystem exactly once per (block, column) before
``make_async_gen``, then share it across all 16 workers via default-arg
binding on the inner ``load_uri_bytes`` closure. Workers now do zero
filesystem inference — their per-URI calls to ``_resolve_paths_and_filesystem``
short-circuit (path normalization only; no network) because a filesystem
is supplied.

The probe loop:

- Skips ``None`` URIs.
- Catches resolution exceptions and continues to the next URI.
- Requires both ``paths`` and a non-None filesystem before breaking —
  otherwise ``_resolve_paths_and_filesystem`` could silently drop a bad
  URI and leave the workers with no usable filesystem, recreating the
  herd.

If every probe fails, workers retain the legacy per-URI resolution as a
last resort so a block of partially-unresolvable URIs doesn't lose the
ones that might still resolve.

Tests cover: one probe across 16 workers, supplied-FS skips probe, bad
first URI still finds a filesystem, all-bad URIs don't crash, empty
block doesn't spawn workers, and the ``([], fs)`` silent-drop regression.

This is the matching threaded-path half of the IMDS herd story; the
fsspec credential-extraction fix (which triggers the threaded fallback
for unextractable session-backed filesystems) is in a separate PR so
each change can be reviewed independently.
When the block-level probe fails for every URI, workers fall back to
per-URI resolution with filesystem=None. The previous version did not
update the worker-local wrapped_fs / resolved_fs after a successful
fallback resolution, so every URI in that worker re-inferred the
filesystem — recreating the repeated filesystem construction this PR
is trying to avoid (in exactly the all-probes-failed fallback case
bkthe reviewer flagged).

Fix: after the first fallback resolution succeeds, assign the result to
the worker-local ``wrapped_fs`` / ``resolved_fs`` variables (default-arg
bindings are regular function locals, so mutation propagates to the
next iteration of the loop).

Regression test: 32 "probe-fail" URIs across 16 workers. Under the fix,
inference happens exactly 16 times (once per worker) instead of 32.
Also shortens several test names that otherwise generated CI log
filenames over the Linux 255-byte limit (``File name too long`` error).
Test-only cleanup. Same coverage, less boilerplate.

- Added ``_spy_resolve`` helper that wraps ``_resolve_paths_and_filesystem``
  with a call counter, returning ``(patch_cm, probe_calls, normalize_calls)``
  so individual tests don't each redefine the tracking side-effect.
- Parametrized ``test_bad_first_uri_probes_past`` + ``test_probe_skips_empty_paths``
  into a single ``test_probe_loop_skips_unusable`` with two param ids
  (``none-in-list``, ``empty-paths-result``) — both exercise the probe
  loop's rejection of unusable results, just with different input shapes.
- Merged ``test_all_probes_fail_no_crash`` + ``test_fallback_caches_fs_in_worker``
  into ``test_all_probes_fail_fallback_caches_per_worker``: both cover the
  all-probes-failed fallback path, one verifying we don't crash and the
  other verifying the per-worker cache. With 32 probe-failing URIs across
  16 workers, asserting ``len(inference_calls) == 16`` exercises both.

Net: TestThreadedDownloadPreResolve 7 -> 5 tests.
Two issues from the CI run on PR ray-project#62898:

1. **Code bug**: when ``_resolve_paths_and_filesystem`` in the worker
   fallback returned ``(_, None)`` (silent-drop case where it returns
   structurally but yields no filesystem), the previous version cached
   ``wrapped_fs = RetryingPyFileSystem.wrap(None, ...)`` and corrupted
   every subsequent URI in that worker. Add a guard: when ``per_uri_fs``
   is None, ``continue`` without caching so the next URI gets a fresh
   inference attempt.

2. **Test bug**: ``test_all_probes_fail_fallback_caches_per_worker``
   used a fake ``_resolve_paths_and_filesystem`` that raised whenever
   ``uri == "probe-fail"``, regardless of caller. But the probe loop
   and the worker fallback both call with the same URI and the same
   ``filesystem=None`` — so the worker's resolution also raised and
   ``inference_calls`` was never populated (CI reported 0 instead of
   16). Differentiate by ``threading.get_ident()``: the probe runs in
   the test's main thread, workers run in spawned threads. Probe-thread
   calls fail; worker-thread calls succeed and append.
…ilure

Reviewer point: "If the probe already iterated every URI and failed,
why would the worker fallback succeed on the same URIs with the same
filesystem=None?" Correct — it wouldn't.

In deterministic conditions, the per-worker fallback is dead code:
``_resolve_paths_and_filesystem(uri, None)`` either succeeds or fails
the same way regardless of which thread calls it. In transient-failure
conditions (IMDS rate-limited the probe, recovers later), having 16
workers each retry their first URI is exactly the per-worker FS
construction this PR is designed to prevent — partially recreating the
IMDS herd.

Replace the fallback with a short-circuit: if the probe iterates every
URI in the block and resolves none of them, fill the column with None
and skip the worker pool. Yields the same observable result (workers
were going to fail to read all URIs anyway) with strictly fewer
filesystem-construction attempts. Logs a warning at the column level
so the user sees what happened.

Test: ``test_all_probes_fail_fallback_caches_per_worker`` (which had to
use threading.get_ident() to make the probe fail but the worker
succeed for the same URI — a smell that confirmed the fallback was
testing nothing real) is replaced with
``test_all_probes_fail_yields_none``: assert exactly N probe calls,
zero worker resolution calls, every URI yields None.
@xyuzh xyuzh force-pushed the fix/download-threaded-pre-resolve branch from 70c22bc to 5e82cc6 Compare April 28, 2026 16:23
@xyuzh
Copy link
Copy Markdown
Member Author

xyuzh commented May 5, 2026

@goutamvenkat-anyscale

@xyuzh xyuzh added the go add ONLY when ready to merge, run all tests label May 5, 2026
@xyuzh xyuzh requested a review from bveeramani May 11, 2026 22:12
When a user passes an fsspec filesystem (e.g. s3fs) to Download, the
pre-resolve path stored it directly in resolved_fs and handed it to
RetryingPyFileSystem.wrap. The retrying handler forwards open_input_stream
to the inner FS, but fsspec filesystems don't expose that method, so
reading any URI failed with AttributeError. Normalize through
_validate_and_wrap_filesystem so fsspec inputs become
PyFileSystem(FSSpecHandler(fs)) before wrapping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label May 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests stale The issue is stale. It will be closed within 7 days unless there are further conversation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants