Skip to content

Abhishekverma data 2225 dev#63121

Draft
abhishekverma-ray wants to merge 28 commits into
ray-project:masterfrom
abhishekverma-ray:abhishekverma-DATA-2225-dev
Draft

Abhishekverma data 2225 dev#63121
abhishekverma-ray wants to merge 28 commits into
ray-project:masterfrom
abhishekverma-ray:abhishekverma-DATA-2225-dev

Conversation

@abhishekverma-ray
Copy link
Copy Markdown

Thank you for contributing to Ray! 🚀
Please review the Ray Contribution Guide before opening a pull request.

⚠️ Remove these instructions before submitting your PR.

💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete.

Description

Briefly describe what this PR accomplishes and why it's needed.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

goutamvenkat-anyscale and others added 24 commits April 23, 2026 00:08
… filter via V1 dispatch

Extract listing from the monolithic V2 read op into a new ListFiles
source operator that owns listing, shuffling, and size-balanced
bucketing via RoundRobinPartitioner. The read op becomes ReadFiles —
consumes FileManifest blocks from ListFiles via a single
input_dependency and only runs the scanner. Driver-side schema
inference calls sample_first_file against the indexer directly.

Checkpoint support on V2 matches V1's architecture exactly. Adds
ReadFiles to Planner._CHECKPOINT_FILTER_OPS and registers
plan_read_files_op_with_checkpoint_filter in the planner's
_get_plan_fns_for_checkpointing dispatch. That wrapper mirrors V1's
plan_read_op_with_checkpoint_filter: wraps the read physical op with a
dedicated "CheckpointFilter" ActorPool MapOperator backed by the
existing _get_checkpoint_map_transformer / _CheckpointFilterFn, with
the same ids_size-proportional memory reservation and
supports_fusion=False. No behavior divergence from V1 — same op
structure, same actor lifecycle, same memory contract. Plain
plan_read_files_op stays checkpoint-unaware; attaching the filter is
solely the planner's responsibility.

IdColumnCheckpointManager.load_checkpoint gets a defensive post-clean
empty-dir pre-check so it stays a no-op when pending-checkpoint
cleanup empties the directory — V2's read_parquet raises on empty
dirs (by design) while V1 returned a zero-row dataset, so this keeps
load_checkpoint behaving uniformly under both paths.

FileReader.read is simplified: early-return on empty manifest,
file_dataset_schema lifted to cached_property, partition-value
broadcast extracted into a helper, and the partition + include_paths
synthesis loops are merged.

Tests: V2 unit tests 65/65 green, parquet broad slice 212 pass / 17
skip / 0 fail under use_datasource_v2=True, checkpoint suite 63 pass /
3 pre-existing ModuleNotFoundError failures (reproduce on master,
unrelated).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
- Register ``_internal/logical/operators/tests/`` in BUILD.bazel so the
  new ``test_read_files_logical.py`` runs in CI (was missed by existing
  globs).
- Parallelize ``ParquetDatasourceV2.infer_schema`` sample reads across
  a thread pool (up to 16 workers). Sequential ``pq.read_schema`` on
  high-latency object stores was the bottleneck for driver-side
  schema inference.
- Narrow the defensive ``except Exception`` in
  ``IdColumnCheckpointManager.load_checkpoint``'s post-clean file-info
  probe to ``except OSError`` — ``allow_not_found=True`` already
  covers the missing-path case, so other exceptions shouldn't be
  silently swallowed.
- When ``parallelism=-1`` in ``_read_datasource_v2``, default
  ``RoundRobinPartitioner.num_buckets`` to
  ``ctx.read_op_min_num_blocks`` (same floor V1's
  ``_autodetect_parallelism`` uses) so unconfigured reads don't
  collapse to a single output block.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…scovery to ``resolve_partitioning``

``ParquetDatasourceV2.infer_schema`` used to mutate ``self._partitioning``
in place so the scanner (constructed afterwards) could report the
path-discovered field names via ``SupportsPartitionPruning.partition_columns``.
That made the datasource instance implicitly non-reusable: a second
planning pass saw the resolved partitioning instead of the original
hive template, and concurrent callers could race on the assignment.

Split the responsibilities:

- New ``DataSourceV2.resolve_partitioning(sample) -> Optional[Any]``
  base method (default returns ``None``). ``ParquetDatasourceV2``
  overrides it to return a fresh ``Partitioning`` with
  path-discovered ``field_names`` populated — reading from a sample
  but not writing back to ``self``.
- ``infer_schema`` now calls ``resolve_partitioning`` for the
  schema-augmentation step only; the datasource instance is untouched.
- ``create_scanner`` reads the resolved partitioning from
  ``options["partitioning"]``, falling back to ``self._partitioning``
  for direct users of the API.
- ``_read_datasource_v2`` calls ``resolve_partitioning`` on the sample
  once and threads the result into ``create_scanner`` — the only place
  that needs to know the resolved names.

Net effect: the datasource is immutable across reads, and the
sample-derived partitioning is an explicit plan-time value rather
than mutable instance state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
The helper samples up to ``max_files=16`` paths (not one), so the
prior name was misleading. Straight rename — no behavior change.
Updates the definition, all import sites (``read_api.py``, the logical
operator tests) and the ``ValueError`` message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…`; per-fragment reads for variable-shape tensors

``read_parquet`` was raising ``NotImplementedError`` for ``_block_udf``
and ``tensor_column_schema`` on the V2 path, skipping a batch of V1
tests. Wire them through:

- ``ReadFiles`` gets an optional ``block_udf: Callable[[Block], Block]``
  field. ``plan_read_files_op`` applies it after
  ``reader.read(manifest)`` and before column renames so the UDF sees
  on-disk column names (V1 ``ParquetDatasource`` semantics).
- ``_read_datasource_v2`` accepts a ``block_udf`` kwarg and stores it on
  the logical op.
- ``ReadFiles.infer_schema`` probes the UDF's schema effect via a dummy
  empty table (mirrors V1's ``dummy_table`` trick) so ``ds.schema()``
  reflects post-transform types before materialization. The scanner
  keeps the *pre-UDF* schema so pyarrow sees the raw on-disk types.
- ``read_parquet`` drops the two ``NotImplementedError`` raises;
  ``tensor_column_schema`` is already folded into ``_block_udf`` by
  ``_resolve_parquet_args`` so no extra handling is needed.

While un-skipping V1 tests, a second issue surfaced:
``test_multiple_files_with_ragged_arrays`` was failing because
``pds.dataset(paths).scanner().scan_batches()`` forces a cross-fragment
schema unification inside pyarrow. That unification casts per-file
``ArrowTensorTypeV2(shape=X)`` to the unified type and pyarrow refuses
extension-to-extension casts — "One can first cast to the storage
type, then to the extension type". V1 avoids this by iterating
``fragment.to_batches`` per fragment.

Port the pattern: ``FileReader._read_fragment_batches`` builds a
per-fragment scanner with that fragment's ``physical_schema`` so
pyarrow keeps the native per-file type. Downstream concat handles
heterogeneous block schemas, same as V1. The caller-supplied
``file_dataset_schema`` still applies for the common all-null
first-column case, and steps aside when any extension column is
present.

Tests: V2 unit 64/64, parquet broad slice 103 pass / 1 skip / 0 fail,
checkpoint suite 63 pass / 3 pre-existing ``ModuleNotFoundError``
failures (reproduced on master).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…ss-version pq.read_schema, test op-name match

Three CI issues surfaced against this branch:

1. ``test_arrow_batch_gt_2gb`` (test_jumbo_arrow_block) failed with
   ``TypeError: read_schema() got an unexpected keyword argument
   'filesystem'``. ``pyarrow.parquet.read_schema`` only picked up the
   ``filesystem=`` kwarg in recent pyarrow releases; the CI wheel
   predates it. Open the file through ``filesystem.open_input_file``
   and hand the file handle to ``read_schema`` — cross-version safe,
   same semantics, preserves the thread-pool parallelism.

2. ``test_read_write_local_node`` and
   ``test_read_write_local_node_ray_client`` failed because
   ``ParquetDatasourceV2`` resolves ``local://`` paths (strips the
   scheme) before anyone checks for local-scheme, so
   ``_read_datasource_v2`` couldn't pin reads to the driver.

   Fix mirrors V1's ``Datasource.supports_distributed_reads``:

   - ``DataSourceV2`` base class exposes a ``supports_distributed_reads``
     property (default ``True``) backed by a ``_supports_distributed_reads``
     attr — other V2 datasources can reuse the hook without
     reimplementing it.
   - ``ParquetDatasourceV2.__init__`` flips the attr based on
     ``_is_local_scheme(paths)`` against the *original* paths, before
     ``_resolve_paths_and_filesystem`` strips the scheme.
   - ``_read_datasource_v2`` consults the property and, when
     ``False``: sets ``ray_remote_args['label_selector']`` keyed by
     ``ray._raylet.RAY_NODE_ID_KEY`` (label selector API per PR
     ray-project#54940), drops any prior ``scheduling_strategy``, and raises the
     V1 ``ValueError`` under Ray Client.
   - The now-unused ``FileScanner.compute_local_scheduling`` helper is
     removed; local-read pinning flows entirely through the datasource
     property and ``_read_datasource_v2``.

3. ``test_limit_operator_memory_leak_fix`` grepped the topology for an
   op whose name contained ``ReadParquet``. V2's op is
   ``ReadFilesParquetV2`` (no substring match). Loosen the check to
   match both V1 and V2 names.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
pa.Table.rename_columns only accepts a dict on PyArrow >= 17. Forwarding
the rename dict through BlockAccessor was crashing on PyArrow 9 with
'tried to rename a table of N columns but only K names were provided'.
Call the existing _DatasourceProjectionPushdownMixin._apply_rename helper
that V1 uses, which expands to a positional list of new names.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Address several review findings on the V2 ReadFiles path:

- ``ReadFiles.apply_predicate`` no longer short-circuits to ``self`` when a
  predicate references a partition column. Reuses (and fixes) V1's
  ``_split_predicate_by_columns`` to route AND-conjuncts to
  ``Scanner.push_filters`` (data) and ``Scanner.prune_partitions``
  (partition), and keeps unsplittable mixed-column conjuncts as a
  ``Filter`` above the new ``ReadFiles`` instead of dropping them.

- Extend ``_SplitPredicateResult`` with ``residual_predicate``. The prior
  helper returned ``(None, None)`` for unsplittable mixed conjuncts inside
  an enclosing AND chain, which let V1 push the splittable parts and
  silently over-include rows. V1 ``ParquetDatasource.apply_predicate``
  now returns ``self`` when a residual exists, so the surrounding
  ``Filter`` stays intact.

- Rewrite ``SetReadParallelismRule._apply_v2``'s docstring to match the
  code: there is no sample-based size extrapolation; ``mem_size=None`` is
  passed unconditionally because the upstream ``ListFiles`` op rebalances
  buckets at execution time.

- Fix ``ParquetScanner.read_schema()`` schema/data mismatch: it appended
  ``"path"`` whenever ``include_paths=True`` even after a projection had
  pruned ``"path"`` away, while the file reader only synthesized columns
  listed in ``self.columns``. Gate the append on ``"path"`` surviving the
  projection.

- Improve the ``columns=`` deprecation message in ``read_parquet`` (V2
  path) so callers combining ``columns=`` with ``include_paths=True`` know
  V1 implicitly retained ``"path"`` and that ``select_columns([...])`` is
  literal.

- Extract ``INCLUDE_PATHS_COLUMN_NAME`` constant in
  ``readers/file_reader.py`` and reuse it across the V2 datasource,
  scanner, and reader instead of repeating the ``"path"`` string literal.

Signed-off-by: Goutam <goutam@anyscale.com>
V2's ParquetFileReader always read via fragment.scanner(), which raises
ArrowNotImplementedError ("Nested data conversions not implemented for
chunked array outputs") on Parquet files whose nested columns wrapping
variable-length leaves (string/binary/etc.) exceed Arrow's ~2GB chunking
threshold per row group (ARROW-5030). V1 already handles this via a
metadata-only fallback that switches to pq.ParquetFile.iter_batches with
a safe batch size derived from row-group metadata; this commit ports
that fallback to the V2 path.

- file_reader.py: convert ``_read_fragment_batches`` from staticmethod
  to instance method and split per-fragment iteration into a new
  ``_iter_fragment_tables(fragment, scanner_kwargs)`` hook so subclasses
  can swap the scanner-based path for a format-specific reader.

- parquet_file_reader.py: override ``_iter_fragment_tables`` to use V1's
  ``_needs_nested_type_fallback`` metadata check; when triggered, read
  via ``pq.ParquetFile.iter_batches`` with the safe batch size from
  ``_get_safe_batch_size_for_nested_types`` scoped to the projected
  leaf indices. Apply row-group-level predicate pushdown via
  ``fragment.subset(filter=)`` and the row-level filter per-batch
  post-read (``iter_batches`` doesn't accept a filter expression). When
  a filter is present we read all columns and project after filtering,
  since field references can't be reliably extracted from a
  ``pyarrow.compute.Expression``.

- test_parquet.py: drop the V2 skip from
  ``test_read_parquet_nested_type_arrow_not_implemented_fallback``
  (regression test for ray-project#61675); it now exercises the V2
  fallback end-to-end. The companion
  ``test_read_parquet_nested_fallback_skipped_when_only_flat_columns_selected``
  also passes on V2 unchanged.

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
- ``plan_read_files_op_with_checkpoint_filter`` now defers to V1's
  ``create_checkpoint_filter_op`` instead of reimplementing the not-found
  short-circuit, ``IdColumnCheckpointManager`` load, and actor-pool wrap.
- Extract ``_resolve_read_remote_args`` for the label-selector +
  ``scheduling_strategy`` + ``merge_resources_to_ray_remote_args`` block
  shared by ``read_datasource`` and ``_read_datasource_v2``. V1 keeps its
  ``_validate_head_node_resources_for_local_scheduling`` call; V2 keeps
  its Ray Client raise.
- ``_read_datasource_v2`` uses ``_build_pruners`` from ``listing_utils``
  instead of constructing the pruner list inline.
- Fix pyrefly suppression on ``pq.ParquetFile(filesystem=...)`` — the
  ``# pyrefly: ignore`` comment must sit on the kwarg's own line.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…avoid OOM scenarios. Epsilon value might need some refinement by iteratively testing.
…EMORY_EPS_BYTES via environment for testing.
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the DataSourceV2 architecture for Parquet reads, decoupling file listing from data reading through new ListFiles and ReadFiles logical operators. The implementation adds support for driver-side schema sampling, advanced predicate pushdown with residual filtering, and memory-aware task scheduling. Review feedback points out potential performance bottlenecks: the I/O overhead of reading Parquet footers for all files during listing and the throughput impact of a conservative batch readahead default. Furthermore, the reviewer identified a bug in the UDF schema probing logic where a missing try-except block could lead to failures when UDFs are applied to empty tables.

paths = manifest.paths
out = np.empty(len(paths), dtype=np.int64)
for i, path in enumerate(paths):
out[i] = _parquet_max_uncompressed_row_group_bytes(path, self._filesystem)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Reading the Parquet footer for every file during the listing stage to populate max_uncompressed_row_group_size can significantly impact performance, especially for datasets with a large number of files (e.g., 100k+ files on S3). This information is currently only used for memory hints in ReadFiles tasks. Consider making this optional or using a sampling approach to avoid the I/O overhead during listing.

# dummy-table trick. Falls back to the scanner schema if the
# probe fails — the UDF may require a non-empty input.
if self.block_udf is not None:
transformed = self.block_udf(schema.empty_table()).schema
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The code calls self.block_udf(schema.empty_table()) to probe the schema after a UDF. However, some UDFs might fail when given an empty table. The docstring mentions falling back to the scanner schema if the probe fails, but the implementation is missing a try-except block to handle this.

        if self.block_udf is not None:
            try:
                transformed = self.block_udf(schema.empty_table()).schema
                schema = transformed.with_metadata(schema.metadata)
            except Exception:
                # Fallback to scanner schema if the UDF requires non-empty input
                pass

# uncompressed batches (jumbo tensor columns can run to multi-GB per
# batch, and pyarrow's default 16-batch readahead would retain all of
# them).
_ARROW_SCANNER_BATCH_READAHEAD = 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Setting _ARROW_SCANNER_BATCH_READAHEAD to 1 is very conservative compared to PyArrow's default of 16. While this bounds driver/worker memory for jumbo tensor columns, it may significantly reduce throughput for standard datasets by limiting pipelining. Consider making this configurable via DataContext or increasing the default for non-tensor workloads.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants