[data] fix parquet scanner memory accumulation (apache/arrow#39808) #62745
Conversation
Code Review
This pull request introduces a mechanism to pass format-specific scanner options to the underlying PyArrow dataset scanner. In the Parquet reader, pre-buffering is disabled and lazy caching is enabled to reduce peak memory consumption, albeit with a potential impact on read throughput. Feedback suggests making these scanner configurations adjustable via environment variables or the DataContext to allow users to optimize for their specific performance and memory requirements.
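As a hedged illustration of the reviewer's suggestion, a minimal sketch of exposing the knob through an environment variable. The variable name and helper below are hypothetical and are not part of this PR, which hard-codes the tuned defaults:

```python
import os

import pyarrow.dataset as pds

# Hypothetical knob (not in this PR): opt back into pre-buffering when
# throughput matters more than peak memory.
_PRE_BUFFER = os.environ.get("RAY_DATA_PARQUET_PRE_BUFFER", "0") == "1"


def parquet_scan_options() -> pds.ParquetFragmentScanOptions:
    # Defaults match the memory-friendly settings shipped in this PR.
    return pds.ParquetFragmentScanOptions(
        pre_buffer=_PRE_BUFFER,
        use_buffered_stream=not _PRE_BUFFER,
    )
```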
/gemini review
Code Review
This pull request introduces a mechanism to inject format-specific scanner arguments into the FileReader class via a new _arrow_scanner_kwargs method. The ParquetFileReader implementation utilizes this to optimize memory usage by disabling pre-buffering and configuring a buffered stream with lazy caching, effectively capping peak memory near one row group. Additionally, override decorators were added to several methods in ParquetFileReader for improved clarity and type safety. I have no feedback to provide.
PyArrow's Dataset.scanner defaults to pre_buffer=True, which holds a whole fragment's worth of decoded column chunks resident before yielding batches. Peak memory ends up scaling with fragment size rather than batch size — the behavior reported in apache/arrow#39808.

Inject ParquetFragmentScanOptions with pre_buffer=False, use_buffered_stream=True, and CacheOptions(lazy=True, prefetch_limit=0) via a new _scanner_kwargs() hook on FileReader, overridden in ParquetFileReader. CSV/JSON/feather paths are unchanged. Trades ~2x S3 read throughput for ~10-40x lower peak memory.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
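A minimal sketch of the hook shape described in this commit message; the class and method names follow the text above, and the exact dict keys are assumptions rather than the actual diff:

```python
import pyarrow as pa
import pyarrow.dataset as pds


class FileReader:
    def _scanner_kwargs(self) -> dict:
        # Base readers pass no format-specific scanner options.
        return {}


class ParquetFileReader(FileReader):
    def _scanner_kwargs(self) -> dict:
        # Stream column chunks lazily instead of pre-buffering the whole
        # fragment, so peak memory tracks the batch/row-group size.
        return {
            "fragment_scan_options": pds.ParquetFragmentScanOptions(
                pre_buffer=False,
                use_buffered_stream=True,
                cache_options=pa.CacheOptions(lazy=True, prefetch_limit=0),
            )
        }
```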
The previous commit disabled pre_buffer to stop total_allocated_bytes from climbing monotonically across batches, but also gave up pipelining between S3 I/O and decode — making S3 reads ~2x slower.

Add a small readahead pipeline (2 batches, 1 fragment) on top of the tuned fragment scan options. Peak memory still plateaus early and stays flat for the rest of the scan (leak pattern eliminated), and throughput matches V1's fragment.to_batches path (~9s on the 497 MB S3 file). Make the scanner_kwargs hook on FileReader override batch_readahead instead of conflicting with it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
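Illustrative sketch only: how the tuned scan options and the small readahead pipeline described above would combine when the scanner is built (the call site and exact kwargs in FileReader may differ):

```python
import pyarrow.dataset as pds


def make_tuned_scanner(dataset: pds.Dataset, batch_size: int) -> pds.Scanner:
    # A 2-batch / 1-fragment readahead keeps S3 reads and decode overlapped
    # without re-introducing fragment-sized buffering.
    return dataset.scanner(
        batch_size=batch_size,
        batch_readahead=2,
        fragment_readahead=1,
        fragment_scan_options=pds.ParquetFragmentScanOptions(
            pre_buffer=False,
            use_buffered_stream=True,
        ),
    )
```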
Force-pushed from e28b601 to c3c916d.
batch_readahead=2 peaks at 116 MB on the common_voice audio fixture; batch_readahead=1 peaks at 76 MB (~35% less) with identical throughput (n=3 S3 runs, medians within noise). Since FileReader base kwargs already set batch_readahead=1, drop the override entirely.

Signed-off-by: Goutam <goutam@anyscale.com>
Force-pushed from 671d90d to b2690ba.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit b2690ba.
[data] fix parquet scanner memory accumulation (apache/arrow#39808) (ray-project#62745)

## Summary

Tests whether [apache/arrow#39808](apache/arrow#39808) ("Dataset.to_batches accumulates memory usage and leaks") reproduces in Ray Data V2's parquet read path, and ships a fix.

**Finding**: V2's production read path (`dataset.scanner(...).scan_batches()`) reproduces the reported accumulation pattern — `pa.total_allocated_bytes()` climbs monotonically through the scan, peaking near full fragment size. V1's `fragment.to_batches` path has a similarly bad peak (different shape).

**Fix**: inject `ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True)` plus `fragment_readahead=1` via a new `_arrow_scanner_kwargs()` hook on `FileReader`, overridden by `ParquetFileReader`. `batch_readahead` stays at the `FileReader` base default of `1`. CSV/JSON/feather unchanged.

## Test

Adapts the [progression test from the Arrow issue](apache/arrow#39808 (comment)): record every time `pa.total_allocated_bytes()` hits a new maximum during a scan. Monotonic growth through the scan is the pathological signature.

The progression was run through **the exact production iteration path** (`dataset.scanner(...).scan_batches()`, reconstructing `pa.Table.from_batches([tagged.record_batch])` per batch — same as `FileReader._read_batches`). The `dataset.to_batches()` rows from the arrow issue are included for comparison; they understate peak by ~13%, likely because `TaggedRecordBatch` holds a fragment reference that delays buffer release.

Public S3 file (`s3://ray-example-data/common_voice_17/parquet/5.parquet`, 497 MB, 113 row groups of audio blobs), pyarrow 23.0.0, `batch_size=1000`. Script at the bottom.

| path | mode | new-max events | peak alloc | signature |
|---|---|---:|---:|---|
| `scanner().scan_batches()` (prod) | **scanner_default** (current) | 24 | 571.9 MB | **Monotonic growth through batch 95/113** — reproduces arrow#39808 |
| `scanner().scan_batches()` (prod) | **scanner_tuned** (this PR) | **2** | **75.3 MB** | One-shot climb, peak at batch 102, no accumulation |
| `dataset.to_batches()` (arrow issue) | default | 40 | 506.6 MB | climbing through batch 112 |
| `dataset.to_batches()` (arrow issue) | tuned_ra2 | 3 | 101.9 MB | plateau at batch 46 |
| `fragment.to_batches()` (V1) | fragment | 4 | 915.4 MB | fast spike to ~2× file size by batch 28, then flat |
| `ParquetFile.iter_batches()` (ref) | pqfile | 3 | 59.2 MB | saturates at batch 4, flat |

Sample progressions through the production path:

**scanner_default — accumulation pattern**

```
batch    1: new max =    125.1 MB
batch   21: new max =    224.7 MB
batch   49: new max =    359.4 MB
batch   70: new max =    430.6 MB
batch   91: new max =    533.0 MB
batch   95: new max =    571.9 MB   ← still climbing at end of scan
Final (after del): 4.4 MB, peak=571.9 MB
```

**scanner_tuned — pattern eliminated**

```
batch    1: new max =     67.1 MB
batch  102: new max =     75.3 MB   ← peak (no new max for remaining 11 batches)
Final (after del): 4.4 MB, peak=75.3 MB
```

The ~67 MB initial alloc in `scanner_tuned` is the one-time readahead pipeline fill (1 batch + 1 fragment), not accumulation.

## Throughput (same file, 3 runs per mode, reporting min / median / max elapsed)

| path | mode | min | median | max |
|---|---|---:|---:|---:|
| `scanner().scan_batches()` (prod) | **scanner_default** (current) | 8.24 s | 9.79 s | 19.38 s |
| `scanner().scan_batches()` (prod) | **scanner_tuned** (this PR) | 9.16 s | 10.04 s | 11.29 s |
| `dataset.to_batches()` | default | 9.32 s | 10.31 s | 13.34 s |
| `dataset.to_batches()` | tuned_ra2 | 9.67 s | 9.80 s | 11.60 s |
| `fragment.to_batches()` (V1) | fragment | 8.54 s | 9.70 s | 10.17 s |
| `ParquetFile.iter_batches()` (ref) | pqfile | 24.06 s | 24.35 s | 27.11 s |

Within noise, `scanner_tuned` matches `scanner_default`, V1 `fragment`, and `tuned_ra2` — the fix doesn't regress throughput. The tighter max for `scanner_tuned` (11.29 s vs `scanner_default`'s 19.38 s) hints at lower tail-latency variance, but n=3 is too small to claim that firmly.

## Caveats

- `pa.total_allocated_bytes()` reads only arrow's default memory pool. Leaks outside it (direct `malloc`, STL containers, pyarrow Python wrappers, AWS SDK state) would not appear in this counter — process RSS would still grow. Not observed for `tuned_ra2` in a separate 5-iteration run, but that test is allocator-dependent (macOS / mimalloc).

<details>
<summary>Test script (<code>scan_alloc_progression.py</code>)</summary>

```python
"""Arrow#39808 progression test — parquet scan allocation over time.

Records every time pa.total_allocated_bytes() hits a new maximum during
iteration. Monotonic growth through the scan is the pathological signature
reported in apache/arrow#39808.

Modes:
  default          pds.dataset.to_batches()                 [as in arrow issue]
  tuned_ra2        pds.dataset.to_batches() + tuned kwargs  [as in arrow issue]
  scanner_default  dataset.scanner().scan_batches()         [mirrors Ray V2 prod baseline]
  scanner_tuned    dataset.scanner(...tuned).scan_batches() [mirrors Ray V2 prod w/ fix]
  fragment         fragment.to_batches()                    [V1 path]
  pqfile           ParquetFile.iter_batches()               [reference]

`scanner_tuned` replicates ParquetFileReader._arrow_scanner_kwargs() on the
current branch exactly, and also reconstructs pa.Table per batch the same way
FileReader._read_batches does, since TaggedRecordBatch holds a fragment ref
that could affect buffer lifetime.
"""
from __future__ import annotations

import argparse
import gc

import pyarrow as pa
import pyarrow.dataset as pds
import pyarrow.fs as pafs
import pyarrow.parquet as pq

BUCKET = "ray-example-data"
KEY = "common_voice_17/parquet/5.parquet"
REGION = "us-west-2"


def _tuned_fso() -> pds.ParquetFragmentScanOptions:
    return pds.ParquetFragmentScanOptions(
        pre_buffer=False,
        use_buffered_stream=True,
    )


def _dataset(fs) -> pds.Dataset:
    return pds.dataset(f"{BUCKET}/{KEY}", filesystem=fs, format="parquet")


def _scan_batches_as_tables(scanner: pds.Scanner):
    # Mirrors FileReader._read_batches: tagged batch -> pa.Table.from_batches.
    for tagged in scanner.scan_batches():
        yield pa.Table.from_batches([tagged.record_batch])


def build(mode: str, batch_size: int):
    fs = pafs.S3FileSystem(anonymous=True, region=REGION)
    common = dict(batch_size=batch_size, batch_readahead=0, fragment_readahead=0)
    if mode == "default":
        return _dataset(fs).to_batches(**common)
    if mode == "tuned_ra2":
        kw = dict(common, batch_readahead=2, fragment_readahead=1)
        return _dataset(fs).to_batches(fragment_scan_options=_tuned_fso(), **kw)
    if mode == "scanner_default":
        scanner = _dataset(fs).scanner(
            columns=None,
            filter=None,
            batch_size=batch_size,
            batch_readahead=1,
        )
        return _scan_batches_as_tables(scanner)
    if mode == "scanner_tuned":
        scanner = _dataset(fs).scanner(
            columns=None,
            filter=None,
            batch_size=batch_size,
            batch_readahead=1,
            fragment_readahead=1,
            fragment_scan_options=_tuned_fso(),
        )
        return _scan_batches_as_tables(scanner)
    if mode == "fragment":
        ds = _dataset(fs)
        return next(iter(ds.get_fragments())).to_batches(
            batch_size=batch_size,
            use_threads=True,
        )
    if mode == "pqfile":
        return pq.ParquetFile(f"{BUCKET}/{KEY}", filesystem=fs).iter_batches(
            batch_size=batch_size
        )
    raise ValueError(mode)


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "mode",
        choices=[
            "default",
            "tuned_ra2",
            "scanner_default",
            "scanner_tuned",
            "fragment",
            "pqfile",
        ],
    )
    ap.add_argument("--batch-size", type=int, default=1000)
    args = ap.parse_args()

    gc.collect()
    base = pa.total_allocated_bytes()
    batches = build(args.mode, args.batch_size)

    max_alloc = 0
    n = 0
    print(f"=== {args.mode} ===")
    for batch in batches:
        alloc = pa.total_allocated_bytes() - base
        n += 1
        if alloc > max_alloc:
            max_alloc = alloc
            print(f" batch {n:>4}: new max = {alloc/1e6:>8.1f} MB")

    del batches
    gc.collect()
    final = pa.total_allocated_bytes() - base
    print(
        f" Final (after del): {final/1e6:.1f} MB, batches={n}, peak={max_alloc/1e6:.1f} MB"
    )


if __name__ == "__main__":
    main()
```

Run with: `for mode in scanner_default scanner_tuned default tuned_ra2 fragment pqfile; do python scan_alloc_progression.py $mode; done`

</details>

## Test plan

- [x] Within-iteration progression test through the production `scanner().scan_batches()` path — `scanner_default` reproduces arrow#39808 (571.9 MB peak, climbing through batch 95/113); `scanner_tuned` eliminates it (75.3 MB peak, 2 new-max events).
- [x] Within-iteration progression test through `dataset.to_batches()` (arrow-issue repro path) — same shape.
- [x] E2E smoke: `ParquetFileReader.read()` on 300 MB local fixture — peak alloc 315 MB → 35 MB.
- [ ] Optional: Linux rerun to confirm RSS behavior independent of macOS/mimalloc.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
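For the unchecked Linux/RSS item in the test plan, a minimal sketch of an RSS check to complement `pa.total_allocated_bytes()`; it assumes psutil is installed and reuses the `build()` helper from the script above:

```python
import gc

import psutil

from scan_alloc_progression import build  # helper from the script above


def rss_mb() -> float:
    return psutil.Process().memory_info().rss / 1e6


def check_rss(mode: str = "scanner_tuned", batch_size: int = 1000) -> None:
    # RSS growth without matching pa.total_allocated_bytes() growth would
    # point at allocations outside arrow's default pool (malloc, AWS SDK, ...).
    start = rss_mb()
    peak = start
    for _ in build(mode, batch_size):
        peak = max(peak, rss_mb())
    gc.collect()
    print(f"start={start:.1f} MB  peak={peak:.1f} MB  end={rss_mb():.1f} MB")
```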
