[data] DataSourceV2: V2 ARROW-5030 nested-type fallback #63175

Merged
goutamvenkat-anyscale merged 1 commit into ray-project:master from goutamvenkat-anyscale:pr63175
May 13, 2026
Conversation

@goutamvenkat-anyscale
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale commented May 7, 2026

Why

Parquet files with nested columns (e.g. list<struct<..., string>>) whose row groups exceed Arrow's ~2 GB chunking threshold hit ArrowNotImplementedError at decode time (ARROW-5030). V1 already has a metadata-only fallback that detects this and switches to pq.ParquetFile.iter_batches. This PR ports it to V2 and makes the decision filter-aware.

What

Port V1's nested-type fallback to V2. FileReader grows an _iter_fragment_tables hook; ParquetFileReader overrides it with V1's _needs_nested_type_fallback metadata check, falling back to pq.ParquetFile.iter_batches (with safe batch sizing, row-group pushdown via fragment.subset, and per-batch row-level filtering) when the check fires.
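
As a rough illustration of what a metadata-only check plus "safe batch sizing" could look like, here is a minimal sketch in plain Python. The `ColumnChunk` and `RowGroup` classes, the function names, and the exact threshold value are assumptions made for this example; the PR page does not show the real `_needs_nested_type_fallback` logic, so this should not be read as its implementation.

```python
from dataclasses import dataclass
from typing import List, Optional, Set

# Assumed threshold: the description only says "~2 GB"; 2**31 - 1 is illustrative.
ARROW_CHUNK_LIMIT_BYTES = 2**31 - 1


@dataclass
class ColumnChunk:
    """Hypothetical stand-in for one column chunk's Parquet metadata."""
    column: str              # top-level column name
    is_nested: bool          # True for list/struct/map columns
    uncompressed_bytes: int


@dataclass
class RowGroup:
    """Hypothetical stand-in for one row group's metadata."""
    num_rows: int
    chunks: List[ColumnChunk]


def _selected(rg: RowGroup, columns: Optional[Set[str]]) -> List[ColumnChunk]:
    # columns=None is treated as "all columns" in this sketch.
    return [c for c in rg.chunks if columns is None or c.column in columns]


def needs_fallback(row_groups, columns=None, limit=ARROW_CHUNK_LIMIT_BYTES) -> bool:
    """True if any selected nested column chunk is larger than the limit."""
    return any(
        c.is_nested and c.uncompressed_bytes > limit
        for rg in row_groups
        for c in _selected(rg, columns)
    )


def safe_batch_size(rg, columns=None, limit=ARROW_CHUNK_LIMIT_BYTES) -> int:
    """Rows per batch so the largest selected chunk's share stays under the limit."""
    largest = max((c.uncompressed_bytes for c in _selected(rg, columns)), default=0)
    if largest == 0 or rg.num_rows == 0:
        return max(rg.num_rows, 1)
    bytes_per_row = largest / rg.num_rows
    return max(1, min(rg.num_rows, int(limit / bytes_per_row)))
```

In a real reader, a value like this would presumably be passed as the `batch_size` argument of `pq.ParquetFile.iter_batches`; scoping both functions to the selected columns keeps flat-only reads on the normal scanner path.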

Make the fallback decision filter-aware. Previously the check looked only at projected columns. A filter that touches a large nested column outside the projection would still force the scanner to decode it for row-level evaluation — and hit ARROW-5030. The check now sees the union of projected + filter-referenced columns:

ds.read_parquet(path).select_columns(["id"]).filter(col("nested_col").is_not_null())
#                    ^^^^ projection excludes nested_col
#                                              ^^^^ but filter references it
# → fallback must trigger
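
The union described above can be sketched as a small helper. The function name and the convention that `None` means "all columns" are assumptions for illustration, not names taken from the PR.

```python
from typing import Iterable, Optional, Set


def columns_to_check(
    projected: Optional[Iterable[str]],
    filter_columns: Optional[Iterable[str]],
) -> Optional[Set[str]]:
    """Columns the scanner has to decode: projection plus filter references.

    A projection of None is assumed to mean "read every column", in which
    case there is nothing to add and None is returned unchanged.
    """
    if projected is None:
        return None
    return set(projected) | set(filter_columns or ())


# Projection is ["id"], but the filter references "nested_col",
# so the fallback check has to look at both.
to_check = sorted(columns_to_check(["id"], ["nested_col"]))
```

With only the projection considered, `nested_col` would never reach the metadata check even though the scanner still has to decode it to evaluate the filter.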

Carry the predicate as a Ray Expr instead of a pyarrow expression. pyarrow.compute.Expression is opaque (no public visitor), so we can't extract filter columns from it after the fact. Keeping the Ray Expr as the source of truth — and converting to pyarrow once, at the scanner-kwargs boundary — lets the reader call get_column_references for the union above. Touches ArrowFileScanner.predicate, FileReader.predicate, and push_filters (now ANDs Ray Exprs).
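
To show why an introspectable expression tree helps, here is a toy sketch. `Col`, `Lit`, `BinOp`, `column_references`, and `and_all` are hypothetical stand-ins written for this example; they are not Ray's `Expr` classes or its `get_column_references`, and the one-time conversion to a pyarrow expression is deliberately left out.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Union


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Lit:
    value: object


@dataclass(frozen=True)
class BinOp:
    op: str          # e.g. ">", "==", "and"
    left: "Node"
    right: "Node"


Node = Union[Col, Lit, BinOp]


def column_references(expr: Node) -> Set[str]:
    """Walk the tree and collect every column name it mentions."""
    if isinstance(expr, Col):
        return {expr.name}
    if isinstance(expr, BinOp):
        return column_references(expr.left) | column_references(expr.right)
    return set()  # literals reference no columns


def and_all(predicates: List[Node]) -> Optional[Node]:
    """Combine pushed-down filters with AND, keeping them introspectable."""
    result: Optional[Node] = None
    for p in predicates:
        result = p if result is None else BinOp("and", result, p)
    return result


pushed = and_all([
    BinOp(">", Col("x"), Lit(5)),
    BinOp("!=", Col("nested_col"), Lit(None)),
])
referenced = column_references(pushed)
```

Because the combined predicate is still a tree the reader owns, the referenced columns can be recovered at any point before the expression is handed to the scanner.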

Drop the legacy filter= kwarg on V2. read_parquet(filter=pc.field("x") > 5) is already deprecated. Since it carries a raw pyarrow expression that can't be introspected, it's silently stripped on the V2 path. Callers should use read_parquet(path).filter(expr=...).

Tests

  • test_read_parquet_nested_type_arrow_not_implemented_fallback — V2 skip removed (regression for #61675).
  • test_read_parquet_nested_fallback_triggered_when_filter_references_nested_column — new, V2-only. Projects a flat column and filters on the large nested column; asserts the fallback is invoked.

Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>


Stack created with Sapling. Best reviewed with ReviewStack.

@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner May 7, 2026 00:39
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the FileReader to utilize per-fragment scanners instead of a single aggregate scanner, which prevents issues with cross-fragment casts for variable-shape tensor extensions. It also ports the nested-type fallback for Parquet files (addressing ARROW-5030) to the DataSourceV2 path and includes version-specific handling for pyarrow readahead parameters. The review feedback identifies a critical issue where the current retry logic could lead to duplicate data if a transient error occurs during fragment iteration. Additionally, suggestions were made to optimize logging keys to prevent log flooding and to document performance trade-offs when using the fallback reader with filters.
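
The duplicate-data concern can be illustrated with a small sketch: if a retry restarts a partially consumed iterator from the beginning, rows already yielded are emitted again. One way to avoid that, assuming the source yields the same items in the same order on every restart, is to skip what was already delivered. The helper name and its parameters are hypothetical, and this is not necessarily how the PR resolved the issue.

```python
from typing import Callable, Iterable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def iter_with_retry(
    make_iter: Callable[[], Iterable[T]],
    max_retries: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (IOError,),
) -> Iterator[T]:
    """Retry a restartable iterator without re-emitting delivered items."""
    emitted = 0   # number of items already handed to the consumer
    attempts = 0
    while True:
        try:
            for i, item in enumerate(make_iter()):
                if i < emitted:
                    continue  # delivered before the failure; skip on replay
                emitted += 1
                yield item
            return
        except retry_on:
            attempts += 1
            if attempts > max_retries:
                raise


calls = {"n": 0}


def flaky_fragment():
    """Fails once after yielding two items, then succeeds on the next attempt."""
    calls["n"] += 1
    yield "a"
    yield "b"
    if calls["n"] == 1:
        raise IOError("transient")
    yield "c"


rows = list(iter_with_retry(flaky_fragment))
```

A naive retry that simply re-iterated from the start would produce `a, b, a, b, c` here.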

Comment thread python/ray/data/_internal/datasource_v2/readers/file_reader.py Outdated
Comment thread python/ray/data/_internal/datasource_v2/readers/file_reader.py Outdated
Comment thread python/ray/data/_internal/datasource_v2/readers/parquet_file_reader.py Outdated
Comment thread python/ray/data/_internal/datasource_v2/readers/file_reader.py Outdated
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label May 7, 2026
@goutamvenkat-anyscale goutamvenkat-anyscale added the go add ONLY when ready to merge, run all tests label May 7, 2026
@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63175 branch 2 times, most recently from db3fce8 to 9638cec Compare May 8, 2026 17:47
Comment thread python/ray/data/_internal/datasource_v2/readers/parquet_file_reader.py Outdated
@abhishekverma-ray

The refactor is structurally sensible (per-fragment scanners with a swappable _iter_fragment_tables hook) and delivers the headline fix. The main concerns are incomplete V1 parity in three places that materially change behavior — filter columns are ignored when deciding to enter the fallback (#1), caller schema alignment is dropped on the fallback path (#2), and filtered fallback reads materialize the entire file (#3) — together with no new tests for filter / limit / schema / partitioning interactions. The commit advertises itself as a 1:1 port, but the differences from V1 are non-trivial and worth catching before merge.

@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63175 branch 3 times, most recently from 5779966 to ce7a3e1 Compare May 11, 2026 22:30
Comment thread python/ray/data/_internal/datasource_v2/readers/parquet_file_reader.py Outdated
@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63175 branch 2 times, most recently from d456a1b to 02cdef8 Compare May 11, 2026 22:55
Comment thread python/ray/data/_internal/datasource_v2/readers/file_reader.py Outdated
Comment thread python/ray/data/read_api.py Outdated
@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63175 branch 2 times, most recently from a9b419f to 3a7a85a Compare May 12, 2026 21:19

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 3a7a85a. Configure here.

Comment thread python/ray/data/_internal/logical/operators/read_operator.py Outdated
@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63175 branch 2 times, most recently from 63b30ac to 52ec75e Compare May 12, 2026 23:07
get_column_references,
)

columns = scanner_kwargs.get("columns")
Contributor

Is this also pc.Expression?

yield from super()._iter_fragment_tables(fragment, scanner_kwargs)
return

if log_once(f"parquet_nested_fallback_v2:{fragment.path}"):
Contributor

might be noisy?

Contributor Author

Yeah, I left this in on purpose so that the user knows which file the fallback path was used for.
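
The trade-off in this thread comes down to the key passed to `log_once`. The sketch below uses a stand-in `log_once` written for this example (a set of seen keys); it is an assumption about the general behaviour of such a helper, not the one imported in the PR.

```python
from typing import Set

_seen_keys: Set[str] = set()


def log_once(key: str) -> bool:
    """Return True the first time a key is seen, False afterwards (stand-in)."""
    if key in _seen_keys:
        return False
    _seen_keys.add(key)
    return True


paths = ["a.parquet", "b.parquet", "a.parquet"]

# Per-file key: one message per distinct file, so the user can tell which
# files took the fallback path, at the cost of one line per file.
logged_per_file = [p for p in paths if log_once(f"parquet_nested_fallback_v2:{p}")]

# Single shared key: at most one message in total, quieter but less specific.
logged_shared = [p for p in paths if log_once("parquet_nested_fallback_v2")]
```

For a dataset with many affected files, the per-file key grows the log linearly with the number of files, which is the noise the reviewer is asking about.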


# Scope the safe batch-size calculation to the columns actually being
# decoded so we don't shrink batches based on columns we won't read.
leaf_indices = (
Contributor

The name leaf_indices is unclear to me. What does it refer to?

Contributor Author

For example, for struct columns the leaf indices refer to the keys inside the struct.

If I had address: struct<street: string, zip: int32>

then the leaves would be address.street, address.zip, etc.
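
The idea of leaves can be made concrete with a toy schema, where nested dicts stand in for structs. The `leaf_paths` helper and the dict-based schema are assumptions for illustration only; a real reader would take this information from the Parquet file metadata.

```python
from typing import Dict, List, Union

Schema = Dict[str, Union[str, "Schema"]]


def leaf_paths(schema: Schema, prefix: str = "") -> List[str]:
    """Flatten a nested schema into dotted leaf paths, in declaration order."""
    leaves: List[str] = []
    for name, typ in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(typ, dict):
            leaves.extend(leaf_paths(typ, path))  # struct: recurse into fields
        else:
            leaves.append(path)                   # primitive: this is a leaf
    return leaves


schema: Schema = {
    "id": "int64",
    "address": {"street": "string", "zip": "int32"},
}

leaves = leaf_paths(schema)
# leaves == ["id", "address.street", "address.zip"]

# Leaf indices belonging to the top-level columns actually being read.
selected_columns = {"address"}
leaf_indices = [
    i for i, path in enumerate(leaves) if path.split(".")[0] in selected_columns
]
# leaf_indices == [1, 2]
```

Restricting a size calculation to these indices means a large column that is not being read does not shrink the batch size.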

# in that case and let the per-batch filter (post null-fill) do
# all the row-dropping.
fragment_physical_columns = set(fragment.physical_schema.names)
filter_touches_missing_column = filter_columns is not None and any(
Contributor

What about inverting this, e.g. `filter_columns_all_present_in_schema` or something like that, and then below you do `if filter_Expr is not None and filter_columns_all_present_in_schema`?

# hit ARROW-5030 in the normal scanner path.
filter_columns = (
get_column_references(self._predicate)
if self._predicate is not None
Contributor

I see these `is None` checks. A couple of them are fine, but when there are many I think it reads more easily if they are separated out like this:

filter_columns = None
if self._predicate is not None:
    filter_columns = get_column_references(self._predicate)

Contributor Author

Will do as a follow-up cleanup change.

@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 1fd5697 into ray-project:master May 13, 2026
6 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the pr63175 branch May 13, 2026 22:15
am-kinetica pushed a commit to kineticadb/ray that referenced this pull request May 14, 2026
…63175)

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/ray-project/ray/pull/63175).
* ray-project#63326
* __->__ ray-project#63175

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Goutam V. <>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: anindyam1969 <amukherjee@kinetica.com>