[data] DataSourceV2: enable V2 by default#63326
Code Review
This pull request enables DataSourceV2 by default and ports the nested-type fallback (ARROW-5030) to the V2 Parquet reader. It refactors predicate handling to use Ray Expr objects until the scanner boundary, allowing for better introspection. Key changes include the implementation of _iter_fragment_tables in ParquetFileReader to handle large nested columns, schema evolution support via null-filling, and per-fragment retry logic in FileReader. Feedback focuses on robustly handling schema evolution in the Parquet fallback path, specifically ensuring all unified schema columns are tracked, filtering missing columns before calling iter_batches, and selecting correct columns before casting to avoid mismatches. A typo in a static analysis comment was also noted.
## Why
Parquet files with nested columns (e.g. `list<struct<..., string>>`)
whose row groups exceed Arrow's ~2 GB chunking threshold hit
`ArrowNotImplementedError` at decode time (ARROW-5030). V1 already has a
metadata-only fallback that detects this and switches to
`pq.ParquetFile.iter_batches`. This PR ports it to V2 and makes the
decision filter-aware.
## What
**Port V1's nested-type fallback to V2.** `FileReader` grows an
`_iter_fragment_tables` hook; `ParquetFileReader` overrides it with V1's
`_needs_nested_type_fallback` metadata check, falling back to
`pq.ParquetFile.iter_batches` (with safe batch sizing, row-group
pushdown via `fragment.subset`, and per-batch row-level filtering) when
the check fires.
**Make the fallback decision filter-aware.** Previously the check looked
only at projected columns. A filter that touches a large nested column
*outside* the projection would still force the scanner to decode it for
row-level evaluation — and hit ARROW-5030. The check now sees the union
of projected + filter-referenced columns:
```python
ds.read_parquet(path).select_columns(["id"]).filter(col("nested_col").is_not_null())
# ^^^^ projection excludes nested_col
# ^^^^ but filter references it
# → fallback must trigger
```
**Carry the predicate as a Ray `Expr` instead of a pyarrow expression.**
`pyarrow.compute.Expression` is opaque (no public visitor), so we can't
extract filter columns from it after the fact. Keeping the Ray `Expr` as
the source of truth — and converting to pyarrow once, at the
scanner-kwargs boundary — lets the reader call `get_column_references`
for the union above. Touches `ArrowFileScanner.predicate`,
`FileReader.predicate`, and `push_filters` (now ANDs Ray `Expr`s).
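As a rough illustration of why a structured expression helps here, the sketch below walks a tiny expression tree to collect column names and unions them with the projection. The `Col`, `Lit`, `BinaryOp`, `column_references`, and `columns_to_check` names are hypothetical stand-ins written for this example; they are not Ray's `Expr` classes or its `get_column_references`, whose real signatures may differ.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Union


@dataclass(frozen=True)
class Col:
    """Hypothetical column reference node."""
    name: str


@dataclass(frozen=True)
class Lit:
    """Hypothetical literal node."""
    value: object


@dataclass(frozen=True)
class BinaryOp:
    """Hypothetical binary operator node (comparison or boolean AND)."""
    op: str
    left: "Node"
    right: "Node"


Node = Union[Col, Lit, BinaryOp]


def column_references(expr: Node) -> Set[str]:
    """Collect every column name mentioned anywhere in the tree."""
    if isinstance(expr, Col):
        return {expr.name}
    if isinstance(expr, BinaryOp):
        return column_references(expr.left) | column_references(expr.right)
    return set()  # literals reference no columns


def columns_to_check(projection: List[str], predicate: Optional[Node]) -> List[str]:
    """Projected columns first, then filter-only columns in sorted order."""
    if predicate is None:
        return list(projection)
    extra = sorted(column_references(predicate) - set(projection))
    return list(projection) + extra


# Projection keeps only "id", but the filter also touches "nested_col".
predicate = BinaryOp(
    "and",
    BinaryOp(">", Col("id"), Lit(5)),
    BinaryOp("!=", Col("nested_col"), Lit(None)),
)
checked = columns_to_check(["id"], predicate)
```

With an opaque expression object there is no tree to walk, so the second column would be invisible to a metadata check that only sees the projection.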
**Drop the legacy `filter=` kwarg on V2.**
`read_parquet(filter=pc.field("x") > 5)` is already deprecated. Since it
carries a raw pyarrow expression that can't be introspected, it's
silently stripped on the V2 path. Callers should use
`read_parquet(path).filter(expr=...)`.
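A minimal migration sketch, assuming `col` and `lit` are importable from `ray.data.expressions` (an assumption about the installed Ray version) and using a hypothetical column named `x`. The helper name `read_rows_above` is made up for this example.

```python
def read_rows_above(path: str, threshold: int):
    """Read a Parquet dataset and keep rows where column "x" exceeds threshold."""
    # Imported lazily so this sketch can be loaded without Ray installed.
    import ray
    from ray.data.expressions import col, lit  # assumed import path

    # Deprecated form, which the description above says is stripped on V2:
    #   ray.data.read_parquet(path, filter=pc.field("x") > threshold)
    # Chained form, which carries a Ray expression the reader can inspect:
    return ray.data.read_parquet(path).filter(expr=col("x") > lit(threshold))
```

Callers that relied on `filter=` for correctness rather than just for speed should probably move first, since the old kwarg is dropped without an error on the V2 path.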
## Tests
- `test_read_parquet_nested_type_arrow_not_implemented_fallback` — V2
skip removed (regression for
[#61675](#61675)).
- `test_read_parquet_nested_fallback_triggered_when_filter_references_nested_column`:
new, V2-only. Projects a flat column and filters on the large nested
column; asserts the fallback is invoked.
Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/ray-project/ray/pull/63175).
* #63326
* __->__ #63175
Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Goutam V. <>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Flip ``DEFAULT_USE_DATASOURCE_V2`` to ``True`` so ``ray.data.read_parquet`` routes through the V2 path unless callers opt out. Lands last so the production cut-over is isolated for an easy revert. Signed-off-by: Goutam <goutam@anyscale.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
a01a949 to 6cc5e89
Signed-off-by: goutam <goutam@anyscale.com>
Signed-off-by: goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 6657b2d.
Are we actually ready to flip this to default? Should we let it bake for a while before flipping, and raise a tip/warning telling users to flip this on themselves?
Actually nvm. Seeing no regression on the latest run: https://console.anyscale-staging.com/cld_kvedZWag2qA8i5BjxUevf5i7/prj_92c7b71w55flm6gv6imv4m6vqg/jobs/prodjob_eb3ym8edqcj6h76r1llrr47ptv/data?job-logs-section-tabs=application_logs&job-tab=ray-turbo-dashboard
No perf regression is good, but the API changes are scarier. Can we soft-deprecate the API changes instead of making a hard break?
So filter and columns are already deprecated from before.
produce identical block sequences. The ``preserve_ordering=True``
path drains the input iterator eagerly; for our use that's
bounded by the manifest size (already materialized upstream
in the listing op) so the up-front cost is ``O(num_fragments)``
Fragment-object constructions, with no per-fragment I/O.
Hmm it's not clear to me what the relationship is between preserve_ordering=True, bounding something by manifest size, and fragment-object constructions?
Cleaned up comment
# This is required so that Ray Data task retries (block reconstruction)
# produce identical block sequences.
Why are identical block sequences required? What happens when preserve_order=False?
btw, what did you have to do to get the read performance up?
Signed-off-by: Goutam <goutam@anyscale.com>
Bumped up
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…ons (#63466)
## Summary
V2's ``ParquetFileReader._arrow_scanner_kwargs`` was constructing ``pds.ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True, buffer_size=8 MiB)``. With ``pre_buffer=False``, pyarrow opens a per-column buffered stream and issues range requests lazily. On wide-schema Parquet (thousands of small columns) that turns one fragment read into thousands of S3 GETs.
The ``wide_schema_pipeline_primitives`` release test regressed from ~16s on V1 to ~154s after V2 became the default reader (#63326). Other release tests on wide schemas (``wide_schema_pipeline_{tensors,objects,nested_structs}`` and ``write_parquet``, which is read-then-write) are affected by the same mechanism.
## Evidence
``ds.stats()`` from a regressing run on ``s3://ray-benchmark-data-internal-us-west-2/wide_schema/primitives`` (27 files × 22 MB × 5000 columns):
```
Operator 2 ReadFilesParquetV2: 27 tasks executed, 40 blocks produced in 143.19s
Remote wall time: 61.73ms min, 141.62s max, 88.67s mean
Remote cpu time: 62.07ms min, 4.76s max, 2.96s mean
Peak heap memory: 448.82 MiB min, 612.21 MiB max
```
96% of per-task wall time is blocked and CPU is negligible: the classic many-small-I/O signature, not memory pressure or per-task setup cost.
## Why this only affects V2
- **V1** ``ParquetDatasource`` constructs ``pds.ParquetFragmentScanOptions(use_buffered_stream=True, buffer_size=8 MiB)`` without setting ``pre_buffer`` → pyarrow default ``True`` → one coalesced range request per fragment. Fast.
- **Anyscale internal Turbo** ``ParquetReader`` does the same: ``pre_buffer`` is left at default. No regression.
- **V2 (this PR fixes)** explicitly set ``pre_buffer=False``. On narrow schemas the difference is invisible (few large columns either way). On wide schemas it's catastrophic.
The cross-fragment memory accumulation that originally motivated disabling ``pre_buffer`` (apache/arrow#39808) is already addressed by V2's per-fragment scanner construction, so disabling ``pre_buffer`` was load-bearing on a problem that no longer needs that fix.
## Change
Drop the ``pre_buffer=False`` argument. Update the comment to record why we rely on the default. No other behavioral change.
## Test plan
- [x] pre-commit clean on the modified file
- [x] ``python/ray/data/_internal/datasource_v2/tests/test_parquet_datasource_v2.py``: 11/11 pass
- [x] ``python/ray/data/tests/datasource/test_parquet.py``: 267 passed / 6 skipped
Release tests: https://buildkite.com/ray-project/release/builds/93384#_
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
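The figures quoted in the evidence can be cross-checked with a little arithmetic. The sketch below assumes "blocked" means wall time minus CPU time, and that a lazy per-column read issues at least one request per column per file; both are assumptions made for this example, not statements from the commit.

```python
def blocked_fraction(wall_seconds: float, cpu_seconds: float) -> float:
    """Share of wall time not spent on CPU (assumed definition of "blocked")."""
    return 1.0 - cpu_seconds / wall_seconds


# Mean per-task numbers from the ds.stats() output quoted above.
mean_blocked = blocked_fraction(wall_seconds=88.67, cpu_seconds=2.96)

# Dataset shape quoted above: 27 files with 5000 columns each.
FILES = 27
COLUMNS_PER_FILE = 5000

# Assumed lower bound: one lazy range request per column per file.
lazy_requests_lower_bound = FILES * COLUMNS_PER_FILE
# One coalesced request per fragment, treating each file as one fragment.
coalesced_requests = FILES

print(f"blocked share of mean wall time: {mean_blocked:.1%}")
print(f"requests: >= {lazy_requests_lower_bound} lazy vs {coalesced_requests} coalesced")
```

Under these assumptions the blocked share lands between 96 and 97 percent, in line with the figure quoted in the commit message.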

## Why
Turn on DataSource V2 by default for `ray.data.read_parquet` (and related reads that go through the same path), so the stack that landed in prior PRs is what users get unless they opt out via `DataContext.get_current().use_datasource_v2 = False`. Keeping the default flip in a dedicated change makes revert/revert-cherry-pick straightforward.
## What
- **Default:** `DEFAULT_USE_DATASOURCE_V2 = True` in `python/ray/data/context.py`.
- **Logical plan:** `ReadFiles` no longer uses a bespoke `InitVar` + `input_op` pattern. It now takes `input_dependencies` like other operators, which removes the custom `_apply_transform` / `input_dependency` plumbing and simplifies every `replace(...)` that used to thread `input_op` through (`read_operator.py`, `read_api.py`, `limit_pushdown.py`, and unit tests). A `per_block_limit` field is declared on `ReadFiles` (always `None` here) so inherited `AbstractMap` machinery can resolve it; V2 limits still go through `scanner.push_limit` in `LimitPushdownRule`. The read stats metadata key is aligned with the operator name (`"ReadFiles"`).
- **Tests and docs for V2-by-default:** Several tests still assumed V1 behavior or patterns that V2 does not support the same way. Updates are grouped below. The `read_parquet` docstring example uses chained `.filter(expr=...)` instead of deprecated `filter=`.
## Test changes (by reason)
### V2 default + deprecated read API
- `test_parquet.py`, `test_parquet_read_partitioned_with_filter`: stop `read_parquet(..., filter=pds.field(...))`; use `read_parquet(...).filter(expr=col(...) == lit(...))`.
- `test_parquet.py`, `test_count_with_filter`: same (`.filter(expr=col(...) < lit(...))`).
- `test_predicate_pushdown.py`, `test_filter_pushdown_source_and_op`: remove `read_parquet(..., filter=pc.greater(...))`; chain `.filter(expr=col("sepal.length") > lit(5.0))` before the string-expression filter; drop unused `pyarrow.compute` import.
### `include_row_hash` + column projection (V2 ordering)
- `test_parquet.py`, `test_include_row_hash_with_column_projection` and `test_include_row_hash_existing_column_with_projection`: replace `read_parquet(..., columns=[...], include_row_hash=True)` with `read_parquet(..., include_row_hash=True).select_columns([..., "row_hash"])` so `row_hash` remains in the schema after projection.
### Empty input path (V2 fails fast)
- `test_streaming_executor.py`, `test_execution_callbacks_executor_arg`: write a one-row Parquet file under the input directory after `makedirs`. V2 raises "no files found" for an empty directory; V1 could return an empty dataset.
### Nested ARROW-5030 fallback assertion (worker vs driver)
- `test_parquet.py`, `test_read_parquet_nested_fallback_triggered_when_filter_references_nested_column`: remove `patch` + `assert_called()` on `_get_safe_batch_size_for_nested_types` (called in a Ray worker, so a driver-side patch never fires). Assert with `_resolve_read_columns(...)` and `_needs_nested_type_fallback(fragment, read_columns)` on a real fragment from the fixture.
### Logical `ReadFiles` constructor / graph shape
- `test_read_files_logical.py`: construct `ReadFiles` with `input_dependencies=[list_files_op]` instead of `input_op=list_files_op`.
- `test_read_parquet_v2.py`, `test_read_parquet_builds_list_files_read_files_chain`: assert `dag.input_dependencies[0]` is `ListFiles` instead of `dag.input_dependency`.
## Performance tuning for the V2 reader
Several knobs in the V2 Parquet read path defaulted to conservative values that left throughput on the table for typical S3+Parquet workloads. The `read_large_parquet_*` release benchmark in particular saw underutilized CPU traceable to a mix of sequential fragment reads, small per-fragment batch readahead, and a tiny per-stream buffer. Each default is bumped, and every change is gated behind an env var so the prior behavior is recoverable for memory-sensitive workloads (e.g. wide tensor columns).
### Concurrent fragment reads (`file_reader.py`)
- `RAY_DATA_READ_FILES_NUM_THREADS` env var, default `4`. When `> 1`, `_read_fragment_batches` reads fragments concurrently via `make_async_gen` instead of the previous serial loop.
- Thread count is `min(env_default, len(fragments))` so single-fragment tasks don't spin up extra threads.
- … `DataContext.execution_options.preserve_order` is set.
- Uses `preserve_ordering=True` so block ordering is deterministic across task retries (required for safe block reconstruction).
- Factors out `_read_fragments_sequential` so the threaded and sequential paths share the same fragment-read body.
### Batch readahead (`file_reader.py`)
- `_ARROW_SCANNER_BATCH_READAHEAD` is now `env_integer("RAY_DATA_ARROW_SCANNER_BATCH_READAHEAD", 8)` (was hardcoded `1`). The previous value was set to bound peak memory on jumbo tensor columns; 8 keeps decode pipelined for non-tensor workloads while remaining tunable down via env var.
### Parquet fragment buffer size (`parquet_file_reader.py`)
- `RAY_DATA_PARQUET_FRAGMENT_BUFFER_SIZE` env var, default `8 MiB`. Passed as `ParquetFragmentScanOptions.buffer_size` so `use_buffered_stream=True` issues meaningfully sized range requests against S3 instead of pyarrow's small default (~8 KiB).
## Testing
Ran the updated Parquet, predicate pushdown, streaming executor, and V2 read unit tests in `.venv` (including nested fallback and row-hash cases touched above). Performance defaults are exercised by the existing V2 read tests; release benchmarks (`read_large_parquet_fixed_size` / `_autoscaling`) verify the wall-clock improvement on the full S3 dataset.
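The concurrent-read behavior described under performance tuning can be approximated with the standard library. This is only a sketch under stated assumptions: `resolve_num_threads` and `read_fragments` are hypothetical names, `ThreadPoolExecutor.map` stands in for Ray's internal `make_async_gen`, and each fragment's batches are materialized as a list rather than streamed. Only the env var name and its default of 4 come from the description above.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, Sequence, TypeVar

F = TypeVar("F")  # fragment type
B = TypeVar("B")  # batch type


def resolve_num_threads(num_fragments: int, default: int = 4) -> int:
    """Cap the configured thread count by the number of fragments."""
    configured = int(os.environ.get("RAY_DATA_READ_FILES_NUM_THREADS", default))
    return max(1, min(configured, num_fragments))


def read_fragments(
    fragments: Sequence[F], read_one: Callable[[F], List[B]]
) -> Iterator[B]:
    """Yield batches fragment by fragment, in input order."""
    num_threads = resolve_num_threads(len(fragments))
    if num_threads <= 1:
        # Sequential path: same per-fragment body, no thread pool.
        for fragment in fragments:
            yield from read_one(fragment)
        return
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # Executor.map returns results in input order even when a later
        # fragment finishes first, so a retried task sees the same sequence.
        for batches in pool.map(read_one, fragments):
            yield from batches


# Toy usage: each "fragment" is a string that expands into two "batches".
blocks = list(read_fragments(["a", "b", "c"], lambda f: [f + "0", f + "1"]))
```

Keeping output order tied to input order is what makes the threaded path safe for block reconstruction, at the cost of a fast fragment sometimes waiting behind a slow earlier one.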