[data] Detect non-contiguous keys in read_webdataset #63407
`_group_by_keys` flushes the in-progress sample whenever the tar entry's sample key changes. That works only if entries sharing a sample key are adjacent in the tar, which is the invariant the WebDataset spec requires. If a key reappears after its run has ended (e.g. `001.jpg, 002.jpg, 001.txt`), the function produces fragmented partial samples instead of the intended grouped sample, and the user gets no signal that their data was misread.

This change tracks the keys that have already been flushed; when the parser sees a known key start a fresh run, it raises a `ValueError` naming the offending key and the tar URL, with concrete fix instructions (`tar --sort=name` / pre-sort with `find | sort | tar -T -`). The existing same-run duplicate-suffix check is unchanged.

Closes ray-project#44068

Signed-off-by: lonexreb <reach2shubhankar@gmail.com>
Code Review
This pull request updates the WebDataset datasource to enforce the requirement that entries sharing a sample key must be contiguous within the tar archive. It introduces a `seen_keys` set to track processed keys and raises a `ValueError` with actionable advice if a non-contiguous key is encountered, preventing silent data loss. Corresponding regression tests have been added to verify this behavior. The review feedback identifies a potential robustness issue where a consumer could modify the yielded dictionary, leading to a `KeyError`, and suggests capturing the key in a local variable to prevent this.
    if current_sample is not None:
        if _valid_sample(current_sample):
            current_sample.update(meta)
            yield current_sample
        seen_keys.add(current_sample["__key__"])
To ensure robustness, it is recommended to extract the key into a local variable before yielding the sample. Since `current_sample` is a dictionary yielded to the consumer (which may include user-provided decoders), the consumer could potentially modify or clear the dictionary. If the `__key__` field is removed, the subsequent call to `seen_keys.add(current_sample["__key__"])` would raise a `KeyError` when the generator resumes.
    -if current_sample is not None:
    -    if _valid_sample(current_sample):
    -        current_sample.update(meta)
    -        yield current_sample
    -    seen_keys.add(current_sample["__key__"])
    +if current_sample is not None:
    +    last_key = current_sample["__key__"]
    +    if _valid_sample(current_sample):
    +        current_sample.update(meta)
    +        yield current_sample
    +    seen_keys.add(last_key)
Good catch, applied in f3fc228. Captured `last_key = current_sample["__key__"]` before the `yield` exactly as suggested, and added `test_webdataset_consumer_mutates_yielded_sample`, which clears the dict between the first and second `next(gen)` calls to lock the contract in.
Address gemini-code-assist review on ray-project#63407: `_group_by_keys` yields `current_sample` to the consumer (which may include user-supplied decoders) and then, on resume, reads `current_sample["__key__"]` to update `seen_keys`. If the consumer mutates or clears the dict between the yield and the resume, the bookkeeping raises a `KeyError`. Capture the key in a local variable before the yield. Add a focused regression test that wipes the yielded dict and verifies the generator continues.

Signed-off-by: lonexreb <reach2shubhankar@gmail.com>
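The hazard addressed by this follow-up commit can be reproduced in isolation. The following is a minimal sketch, not Ray's `_group_by_keys`: the names `group_unsafe`, `group_safe`, and `consume_and_clear` are hypothetical, and grouping is reduced to one dict per key so that only the yield/resume ordering is shown.

```python
def group_unsafe(keys):
    """Yield one dict per key, reading the key back from the dict after the yield."""
    seen_keys = set()
    for key in keys:
        sample = {"__key__": key}
        yield sample
        # This line runs only when the consumer requests the next item.
        # By then the consumer may already have mutated `sample`.
        seen_keys.add(sample["__key__"])


def group_safe(keys):
    """Same bookkeeping, but the key is captured in a local before yielding."""
    seen_keys = set()
    for key in keys:
        sample = {"__key__": key}
        last_key = sample["__key__"]
        yield sample
        # `last_key` is unaffected by whatever the consumer did to `sample`.
        seen_keys.add(last_key)


def consume_and_clear(gen):
    """Hypothetical consumer that wipes each yielded dict after reading its key."""
    keys = []
    for sample in gen:
        keys.append(sample["__key__"])
        sample.clear()
    return keys
```

With `group_safe`, `consume_and_clear` runs to completion; with `group_unsafe`, the generator raises `KeyError` as soon as it resumes after the first cleared sample.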
Description
`ray.data.read_webdataset` silently produces fragmented samples when a tar archive's entries are not sorted by sample key. Per the WebDataset specification, entries sharing a sample key (the base name before the first `.`) must be contiguous in the tar. Ray's `_group_by_keys` reader relies on that invariant: it flushes the in-progress sample whenever the key changes.

If a key reappears in a later run (e.g. `001.jpg, 002.jpg, 001.txt`), the current code flushes:

- `{001: jpg}` when it sees `002.jpg`
- `{002: jpg}` when it sees `001.txt`
- `{001: txt}` at EOF

The user sees three partial samples instead of two complete ones, with no warning and no error. There's a duplicate-suffix check, but it only fires within a single contiguous run, so it doesn't catch this case.
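The fragmentation described above can be illustrated with a simplified grouper. This is a sketch under stated assumptions, not the Ray implementation: `group_by_key_change` is a hypothetical name, file names are assumed to have no directory component, and each sample records only its suffixes rather than file contents.

```python
def group_by_key_change(fnames):
    """Group tar entry names into samples, flushing whenever the key changes."""
    current = None
    for fname in fnames:
        # Sample key = base name before the first "."; the rest is the suffix.
        key, _, suffix = fname.partition(".")
        if current is None or key != current["__key__"]:
            if current is not None:
                yield current  # flush the in-progress sample
            current = {"__key__": key, "suffixes": []}
        current["suffixes"].append(suffix)
    if current is not None:
        yield current  # flush the last sample at EOF


names = ["001.jpg", "002.jpg", "001.txt"]
fragmented = list(group_by_key_change(names))      # key 001 is split in two
grouped = list(group_by_key_change(sorted(names)))  # key 001 is one sample
```

Feeding the unsorted names yields three samples, with key `001` appearing twice; sorting the names first makes the entries for `001` adjacent, so two samples come out.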
Fix
In `_group_by_keys`, track the set of sample keys that have already been flushed. When the parser sees a known key start a fresh run, raise a `ValueError` that:

- names the offending key and the tar `__url__`.
- gives concrete fix instructions: `tar --sort=name ...` or `find ... | sort | tar -T - ...`.

The existing same-run duplicate-suffix check is unchanged. No detection logic relies on buffering or sorting the full tar, so streaming behavior is preserved.
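The detection approach can be sketched as follows. This is an illustrative reduction, not the code merged in the PR: `group_strict` and its `url` parameter are hypothetical, the error text is invented apart from the phrase "not contiguous", and the duplicate-suffix check is omitted.

```python
def group_strict(entries, url="<unknown>"):
    """Group (fname, data) pairs by key; raise if a flushed key starts a new run."""
    seen_keys = set()  # keys whose run has already ended
    current = None
    for fname, data in entries:
        key, _, suffix = fname.partition(".")
        if current is None or key != current["__key__"]:
            if current is not None:
                last_key = current["__key__"]  # capture before yielding
                yield current
                seen_keys.add(last_key)
            if key in seen_keys:
                raise ValueError(
                    f"Sample key {key!r} is not contiguous in {url}; "
                    "re-create the tar with entries sorted by name."
                )
            current = {"__key__": key, "__url__": url}
        current[suffix] = data
    if current is not None:
        yield current


good = [("001.jpg", b"a"), ("001.txt", b"b"), ("002.jpg", b"c")]
bad = [("001.jpg", b"a"), ("002.jpg", b"c"), ("001.txt", b"b")]
good_keys = [s["__key__"] for s in group_strict(good, url="shard.tar")]
```

Only the set of finished keys is kept in memory, so entries are still processed one at a time; consuming `group_strict(bad)` raises once `001.txt` is reached.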
Related issues
Closes #44068
Additional information
Test plan
Three tests in `test_webdataset.py`:

- `test_webdataset_non_contiguous_keys_raises`: feeds `_group_by_keys` an iterator where key `001` reappears non-contiguously; asserts `ValueError` with `"not contiguous"`.
- `test_webdataset_contiguous_keys_no_false_positive`: feeds a well-formed sequence; asserts the parser still emits the two expected sample keys, so the new detection logic does not regress the happy path.
- `test_webdataset_duplicate_suffix_within_run_still_raises`: locked in via the existing focused-test runner; the pre-existing duplicate-suffix-within-a-run error path is preserved.

All three pass locally against a clean Ray install.
`ruff check` is clean on both modified files (the only outstanding `ruff format` diff in the repo is on pre-existing code I didn't touch).