[Data][2/n][Dsv2] Add DataSourceV2 core API, scanner/reader framework, and optimizer mixins (#61615)
Conversation
Code Review
This pull request introduces the core API for DataSourceV2, including the DataSourceV2, Scanner, and Reader abstractions, along with mixins for optimizer pushdowns. This is a foundational change for unifying data source access in Ray Data. The new API is well-structured and modular.
My review focuses on ensuring the new abstractions are sound and maintainable. I've identified a few issues related to code duplication and circular dependencies that should be resolved to improve the design. I also found a critical bug in SamplingInMemorySizeEstimator, which calls a non-existent method, as well as an incorrect type hint. Please see the detailed comments for suggestions.
python/ray/data/_internal/datasource_v2/readers/in_memory_size_estimator.py
7af78ef to 403bd79 (compare)
```python
# reading the file. So, we only estimate the encoding ratio if we don't
# already have one.
self._encoding_ratio = self._estimate_encoding_ratio(path, file_size)
break
```
Loop iterates all files needlessly when ratio exists
Low Severity
When self._encoding_ratio is already set from a previous call, the for loop iterates through every file in the manifest doing nothing, because the break on line 59 is inside the if self._encoding_ratio is None block. The intent is to skip estimation entirely when the ratio is known, but the loop only terminates early when the condition is true. The guard or the break needs to be restructured so the loop is skipped when the ratio is already computed.
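A minimal sketch of the suggested restructuring, with the guard hoisted out of the loop so the file list is never iterated once the ratio is cached. The class and method names here are stand-ins for illustration, not the PR's actual API; `files_visited` is added only to make the skipped loop observable.

```python
class RatioCache:
    """Hypothetical stand-in for the estimator's ratio caching."""

    def __init__(self, estimate_fn):
        self._estimate_fn = estimate_fn
        self._encoding_ratio = None
        self.files_visited = 0  # instrumentation: shows the loop is skipped

    def encoding_ratio(self, files):
        # Guard hoisted out of the loop: once the ratio is cached, the
        # file list is not iterated at all on subsequent calls.
        if self._encoding_ratio is None:
            for path, file_size in files:
                self.files_visited += 1
                self._encoding_ratio = self._estimate_fn(path, file_size)
                break
        return self._encoding_ratio
```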
```python
# reading the file. So, we only estimate the encoding ratio if we don't
# already have one.
self._encoding_ratio = self._estimate_encoding_ratio(path, file_size)
break
```
Unconditional break skips valid files for ratio estimation
Medium Severity
The break on line 59 is unconditional — it exits the loop after trying only the first file, even when _estimate_encoding_ratio returns None (e.g., file has zero size or produces no data). If the first file can't yield a valid ratio but subsequent files could, the estimator falls back to a 1:1 ratio unnecessarily. The break needs to be conditional on self._encoding_ratio is not None.
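A sketch of the conditional exit, extracted into a standalone helper so the retry behavior is visible. The function name and signature are assumptions for illustration only; the fallback value mirrors the 1:1 ratio described in the comment above.

```python
def estimate_ratio(files, estimate_fn):
    """Try files in order; stop at the first file yielding a valid ratio."""
    for path, file_size in files:
        ratio = estimate_fn(path, file_size)
        if ratio is not None:
            # Conditional exit: only stop once a usable ratio was found,
            # so a zero-size or empty first file does not end the search.
            return ratio
    # Fall back to a 1:1 ratio only after every file failed to yield one.
    return 1.0
```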
e6d421d to c0e8101 (compare)
c0e8101 to 410ae0d (compare)
```python
@DeveloperAPI
class DataSourceV2(ABC, Generic[InputBucket]):
```
Where will it be used?
It will be used in subsequent PRs, when I introduce it for Parquet.
```python
...

class SamplingInMemorySizeEstimator(InMemorySizeEstimator):
```
This is just a copy, no changes, right?
The FileManifest construction and the read_files signature are different, but the rest is the same.
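One way to address the duplication discussed here would be to keep the shared sampling logic in a single base class and inject the two differing pieces (manifest construction and the read call) as overridable hooks. All names below are hypothetical, sketched only to illustrate the shape of such a refactor.

```python
class SamplingEstimatorBase:
    """Shared sampling logic; the differing pieces become hooks."""

    def _build_manifest(self, path, file_size):
        raise NotImplementedError  # differs between the two variants

    def _read_sample(self, manifest):
        raise NotImplementedError  # differs between the two variants

    def estimate_encoding_ratio(self, path, file_size):
        # Common logic lives here once instead of being copied per variant.
        if not file_size:
            return None
        manifest = self._build_manifest(path, file_size)
        in_memory_size = self._read_sample(manifest)
        return in_memory_size / file_size


class V2Variant(SamplingEstimatorBase):
    def _build_manifest(self, path, file_size):
        return [(path, file_size)]

    def _read_sample(self, manifest):
        return 20  # pretend the sampled batch is 20 bytes in memory
```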
410ae0d to bb31ddb (compare)
37fa501 to 42dfa66 (compare)
…izer mixins

Signed-off-by: Goutam <goutam@anyscale.com>
42dfa66 to a67b865 (compare)
```python
    Returns:
        Iterator[pa.Table]: Iterator of PyArrow Tables containing the read data.
    """
    ...
```
FileReader is concrete but read() returns None
Medium Severity
FileReader overrides Reader's @abstractmethod read() with a body of ..., which returns None. This makes FileReader a concrete, instantiable class whose read() silently violates the Iterator[pa.Table] return contract. SamplingInMemorySizeEstimator calls self._reader.read(manifest) and passes the result to next(), which will raise TypeError if the reader is a plain FileReader rather than a subclass with a real implementation. Marking FileReader as abstract or adding @abstractmethod to read() would prevent this.
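A minimal sketch of the suggested fix: re-declaring `read()` with `@abstractmethod` keeps `FileReader` non-instantiable, so a plain `FileReader` can never silently return None. The classes below are simplified stand-ins (element types reduced to `Any` to avoid the pyarrow dependency), not the PR's actual definitions.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class Reader(ABC):
    """Minimal stand-in for the PR's Reader base class."""

    @abstractmethod
    def read(self, bucket: Any) -> Iterator[Any]:
        ...


class FileReader(Reader):
    """Re-declaring read() as abstract keeps FileReader non-instantiable."""

    @abstractmethod
    def read(self, bucket: Any) -> Iterator[Any]:
        ...
```

With this in place, `FileReader()` raises TypeError instead of producing an object whose `read()` violates the iterator contract.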
…, and optimizer mixins (ray-project#61615)

## Description

- DataSourceV2 (datasource_v2.py) -- Top-level entry point that ties together file indexing, schema inference, size estimation, and scanner creation. Datasources declare their category (file-based, database, data lake, etc.) to enable category-specific optimizations.
- Scanner / FileScanner (scanners/) -- Immutable, clonable abstraction representing a configured read plan. Scanner is responsible for partitioning input into parallel work units (plan()) and creating Reader instances. FileScanner provides a default plan() that distributes files evenly across a target parallelism.
- Reader / FileReader (readers/) -- Worker-side execution: receives an InputBucket (e.g., FileManifest) and yields Arrow tables. FileReader is wired for column pruning, filter pushdown, and limit support via PyArrow's Dataset API.
- InMemorySizeEstimator / SamplingInMemorySizeEstimator (readers/in_memory_size_estimator.py) -- Estimates in-memory data sizes to inform partitioning. The sampling variant reads a single file to estimate the encoding ratio (on-disk vs. in-memory) and applies it to the rest, with a 1:1 fallback for multi-batch files.
- Logical optimizer mixins (logical_optimizers.py) -- Declarative capability interfaces (SupportsFilterPushdown, SupportsColumnPruning, SupportsLimitPushdown, SupportsPartitionPruning) that scanners can implement to advertise which optimizations they support. The execution engine can introspect these to apply pushdowns automatically.

```
DataSourceV2
├── FileIndexer (discovery: what files exist?)
├── Scanner (planning: how to partition & what pushdowns?)
│   └── Reader (execution: read data from a partition)
└── SizeEstimator (cost model: how big is the data in memory?)
```

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Pedro Jeronimo <pedro.jeronimo@tecnico.ulisboa.pt>
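The capability-mixin pattern described above can be sketched as follows. Only the mixin class names come from the PR; the method names, the example scanner, and `apply_pushdowns` are assumptions invented for illustration.

```python
class SupportsColumnPruning:
    """Capability mixin (name from the PR); method name is assumed."""

    def prune_columns(self, columns):
        raise NotImplementedError


class SupportsLimitPushdown:
    """Capability mixin (name from the PR); method name is assumed."""

    def push_limit(self, limit):
        raise NotImplementedError


class ColumnPruningScanner(SupportsColumnPruning):
    """Hypothetical scanner advertising only column pruning."""

    def __init__(self):
        self.columns = None

    def prune_columns(self, columns):
        self.columns = columns


def apply_pushdowns(scanner, columns=None, limit=None):
    # The engine introspects the mixins and applies only the
    # optimizations the scanner advertises, skipping the rest.
    if columns is not None and isinstance(scanner, SupportsColumnPruning):
        scanner.prune_columns(columns)
    if limit is not None and isinstance(scanner, SupportsLimitPushdown):
        scanner.push_limit(limit)
    return scanner
```

The design choice here is that scanners opt in declaratively: adding a new pushdown means implementing a mixin, not changing the engine's dispatch logic.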